ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.154
Committed: Fri Jan 18 18:41:04 2008 UTC (17 years, 3 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_1_0_pre4, CRAB_2_1_0_pre3
Changes since 1.153: +2 -4 lines
Log Message:
Redesign the inheritance tree of the Scheduler classes, and remove the SchedulerBoss one.
Introduce SchedulerGrid and SchedulerLocal as base classes for Grid and local schedulers, respectively.
All interactions with BOSS are done via the Boss class, whose (unique) instance is owned by the Scheduler.
These changes are done to reduce code duplication.

Plus other minor modifications and cosmetics, as usual.

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8    
9 slacapra 1.105 import os, string, glob
10 slacapra 1.1
11     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Build the CMSSW job type from the user configuration.

    Reads the CMSSW.*, USER.* and EDG.maxtarballsize entries of cfg_params,
    validates them, runs data discovery/location when an input dataset is
    given, prepares the user tarball, splits the task into jobs and, when a
    parameter-set is used, writes the edited parameter-set.

    cfg_params -- dictionary of 'SECTION.key' -> value from the crab config
    ncjobs     -- number of jobs requested at creation time ('all' or a
                  number); used to limit the splitting over blocks
    Raises CrabException on missing or inconsistent configuration.
    """
    import sys

    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.additional_tgz_name = 'additional.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''          # script use case
    self.datasetPath = ''   # script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()

    #
    # Try to block creation in case of arch/version mismatch
    # (version is expected as CMSSW_<major>_<minor>_...)
    #
    a = self.version.split("_")

    if int(a[1]) == 1 and (int(a[2]) < 5 and self.executable_arch.find('slc4') == 0):
        msg = "Warning: You are using %s version of CMSSW with %s architecture. \n--> Did you compile your libraries with SLC3? Otherwise you can find some problems running on SLC4 Grid nodes.\n"%(self.version, self.executable_arch)
        common.logger.message(msg)
    if int(a[1]) == 1 and (int(a[2]) >= 5 and self.executable_arch.find('slc3') == 0):
        msg = "Error: CMS does not support %s with %s architecture"%(self.version, self.executable_arch)
        raise CrabException(msg)

    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards

    if 'CMSSW.datasetpath' not in cfg_params:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower()=='none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        try:
            # standard style
            self.setParam_('datasetFull', self.datasetPath)
            self.setParam_('dataset', datasetpath_split[1])
            self.setParam_('owner', datasetpath_split[2])
        except IndexError:
            # dataset path without the /<dataset>/<owner>/... components
            self.setParam_('dataset', self.datasetPath)
            self.setParam_('owner', self.datasetPath)

    self.setParam_('taskId', common.taskDB.dict('taskId'))

    self.dataTiers = []

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    self.setParam_('exe', self.executable)
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if 'CMSSW.pset' not in cfg_params:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none' :
        if (not os.path.exists(self.pset)):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp :
        tmpOutFiles = tmp.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outFile in tmpOutFiles:
            outFile = outFile.strip()
            # skip empty entries, e.g. produced by a trailing comma
            if outFile:
                self.output_file.append(outFile)
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe :
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    # Nothing to run: no dataset, no parameter-set and no user script.
    # script_exe defaults to None above, so test "not set" rather than == ''.
    if self.datasetPath is None and self.pset is None and not self.scriptExe :
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    if 'USER.additional_input_files' in cfg_params:
        tmpAddFiles = cfg_params['USER.additional_input_files'].split(',')
        for tmp in tmpAddFiles:
            tmp = tmp.strip()
            # skip empty entries (e.g. a trailing comma) instead of failing on tmp[0]
            if not tmp:
                continue
            dirname = ''
            if not tmp.startswith("/"): dirname = "."
            files = []
            if "*" in tmp:
                files = glob.glob(os.path.join(dirname, tmp))
                if len(files)==0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                files.append(tmp)
            for addFile in files:
                if not os.path.exists(addFile):
                    raise CrabException("Additional input file not found: "+addFile)
                self.additional_inbox_files.append(addFile.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if 'CMSSW.events_per_job' in cfg_params:
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in cfg_params:
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    if 'CMSSW.total_number_of_events' in cfg_params:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None: #CarlosDaniele
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)

    self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)

    self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)

    self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    if self.pset is not None: #CarlosDaniele
        import PsetManipulator as pp
        PsetEdit = pp.PsetManipulator(self.pset) #Daniele Pset

    # Copy/return

    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None: #CarlosDaniele
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None: #CarlosDaniele
        try:
            if (self.datasetPath): # standard job
                # allow to process a fraction of events in a file
                PsetEdit.inputModule("INPUTFILE")
                PsetEdit.maxEvent(0)
                PsetEdit.skipEvent(0)
            else:  # pythia like job
                PsetEdit.maxEvent(self.eventsPerJob)
                if (self.firstRun):
                    PsetEdit.pythiaFirstRun(0)  #First Run
                if (self.sourceSeed) :
                    PsetEdit.pythiaSeed(0)
                    # extra seeds are only used together with the pythia seed
                    if (self.sourceSeedVtx) :
                        PsetEdit.vtxSeed(0)
                    if (self.sourceSeedG4) :
                        PsetEdit.g4Seed(0)
                    if (self.sourceSeedMix) :
                        PsetEdit.mixSeed(0)
            # add FrameworkJobReport to parameter-set
            PsetEdit.addCrabFJR(self.fjrFileName)
            PsetEdit.psetWriter(self.configFilename())
        except:
            # deliberate catch-all (PsetManipulator errors are untyped here),
            # but keep the underlying cause in the message instead of dropping it
            msg='Error while manipulating ParameterSet: %s\nexiting...' % str(sys.exc_info()[1])
            raise CrabException(msg)
274 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Discover the content of self.datasetPath in DBS and the sites hosting
    its file blocks in DLS.

    Sets self.pubdata, self.filesbyblock, self.eventsbyblock,
    self.eventsbyfile and self.maxEvents (the latter is used in Creator.py).
    Returns the result of DataLocation.getSites(); the caller uses it as a
    {block: [sites]} dictionary.
    Raises CrabException if data discovery or data location fails.
    """
    import sys
    import DataDiscovery
    import DataLocation
    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath=self.datasetPath

    ## Contact the DBS
    common.logger.message("Contacting Data Discovery Services ...")
    try:
        self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
        self.pubdata.fetchDBSInfo()
    except (DataDiscovery.NotExistingDatasetError,
            DataDiscovery.NoDataTierinProvenanceError,
            DataDiscovery.DataDiscoveryError):
        # all three failures are reported the same way; sys.exc_info() keeps
        # the binding valid on both old ("except X, e") and new interpreters
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
        raise CrabException(msg)

    self.filesbyblock=self.pubdata.getFiles()
    self.eventsbyblock=self.pubdata.getEventsPerBlock()
    self.eventsbyfile=self.pubdata.getEventsPerFile()

    ## get max number of events
    self.maxEvents=self.pubdata.getMaxEvents() ##  self.maxEvents used in Creator.py

    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError:
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
        raise CrabException(msg)

    sites = dataloc.getSites()

    # screen output
    common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock)) + " blocks.\n")

    return sites
327 ewv 1.131
def setArgsList(self, argsList):
    """Store the per-job argument list supplied by the caller (no copy is made)."""
    self.argsList = argsList
330    
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs (and self.ncjobs) - Total # of jobs
          self.list_of_args - per job: [quoted file list, # events (-1 = all
                              remaining in the files), # events to skip]
    RAISES: CrabException if fewer than two of total_number_of_events,
            events_per_job and number_of_jobs are set, or if the number of
            events per job (or number_of_jobs) is not positive.
    """
    nSelected = self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs
    if nSelected < 2:
        # otherwise totalEventsRequested / eventsPerJobRequested are undefined below
        raise CrabException('Must define at least two of total_number_of_events, events_per_job, or number_of_jobs.')

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive number.')
        # at least one event per job: with 0 the loops below would keep
        # creating empty jobs without ever consuming eventsRemaining
        eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

    if eventsRemaining > 0 and eventsPerJobRequested <= 0:
        # same runaway-loop risk as above for a user-given events_per_job
        raise CrabException('events_per_job must be a positive number.')

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # job numbers (1-based) created for each block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if block not in self.eventsbyblock:
            continue

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = ""
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            fileName = files[fileCount]
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")
                else:
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job as  \"<file>\"\,  (the trailing \, is cut when the job is closed)
                    parString += '\\"' + fileName + '\\"\\,'
                    newFile = 0

            # if less events in file remain than eventsPerJobRequested
            if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
                # if last file in block
                if ( fileCount == numFilesInBlock-1 ) :
                    # end job using last file, use remaining events in block;
                    # no job if none of its files had a known event count
                    if parString:
                        fullString = parString[:-2]
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[-1]))
                        # fill jobs of block dictionary
                        jobsOfBlock[block].append(jobCount+1)
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                    # reset counters and move past the last file
                    jobSkipEventCount = 0
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else :
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                # close job and touch new file
                fullString = parString[:-2]
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[-1]))
                jobsOfBlock[block].append(jobCount+1)
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                parString = ""
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                fullString = parString[:-2]
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[-1]))
                jobsOfBlock[block].append(jobCount+1)
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[fileName]
                parString = '\\"' + fileName + '\\"\\,'

    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []
    blockNoSite = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            # sites hosting the block that pass the black and white lists
            allowedSites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
                blockNoSite.append( blockCounter )

    common.logger.message(screenOutput)
    if blockNoSite:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        msg += ' ' + ', '.join([str(b) for b in blockNoSite])
        msg += '\n Related jobs:\n '
        msg += ','.join([str(r) for r in noSiteBlock])
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
547    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of events per job, for tasks
    without an input dataset (e.g. generation jobs).

    Two of events_per_job / number_of_jobs / total_number_of_events are used
    to derive the third. Sets self.total_number_of_jobs and
    self.list_of_args (per job: first run and seeds, each suffixed with the
    job index), appends an empty destination per job to self.jobDestination,
    and may update self.total_number_of_events or self.eventsPerJob.
    Raises CrabException for a negative total number of events or a
    non-positive divisor (events_per_job / number_of_jobs).
    """
    common.logger.debug(5,'Splitting per events')

    if (self.selectEventsPerJob):
        common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if (self.selectNumberOfJobs):
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if (self.selectTotalNumberEvents):
        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            # guard the division (ZeroDivisionError / negative job count)
            if self.eventsPerJob <= 0:
                raise CrabException('events_per_job must be a positive number.')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif (self.selectNumberOfJobs) :
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
    elif (self.selectNumberOfJobs) :
        # guard the division (ZeroDivisionError / negative job count)
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive number.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # arguments are <value><job index> for first run and each seed
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        jobIndex = str(i)
        args = []
        if (self.firstRun):
            ## pythia first run
            args.append(str(self.firstRun) + jobIndex)
        if (self.sourceSeed):
            args.append(str(self.sourceSeed) + jobIndex)
            ## + vtx, G4 and mix random seeds, only together with the pythia seed
            for extraSeed in (self.sourceSeedVtx, self.sourceSeedG4, self.sourceSeedMix):
                if extraSeed:
                    args.append(str(extraSeed) + jobIndex)
        self.list_of_args.append(args)

    return
613    
614 spiga 1.42
def jobSplittingForScript(self):#CarlosDaniele
    """
    Split a script-only task (no input dataset, no parameter-set) into
    self.theNumberOfJobs jobs.

    Each job gets its own index (as a string) as its only argument and an
    empty destination, i.e. it may run at any site.
    """
    nJobs = self.theNumberOfJobs
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(nJobs)+' jobs in total ')

    self.total_number_of_jobs = nJobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    # no input data and no random seed: the job index is the whole argument list
    indices = range(self.total_number_of_jobs)
    self.list_of_args = [[str(jobIndex)] for jobIndex in indices]
    # one fresh, empty destination list per job
    self.jobDestination.extend([[""] for _ in indices])
    return
637    
def split(self, jobParams):
    """
    Record every job's arguments and destination in the job database.

    jobParams -- caller-owned list: it is extended with one slot per job and
                 its first self.total_number_of_jobs entries are set to the
                 corresponding entries of self.list_of_args.
    """
    common.jobDB.load()
    nJobs = self.total_number_of_jobs
    perJobArgs = self.list_of_args

    # reserve one slot per job, then fill the slots by job index
    jobParams.extend([""] * nJobs)

    for jobIndex in range(nJobs):
        jobParams[jobIndex] = perJobArgs[jobIndex]
        common.jobDB.setArguments(jobIndex, jobParams[jobIndex])
        destination = self.jobDestination[jobIndex]
        common.logger.debug(5,"Job "+str(jobIndex)+" Destination: "+str(destination))
        common.jobDB.setDestination(jobIndex, destination)

    common.jobDB.save()
    return
658 ewv 1.131
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored for job nj as one string in which every
    argument is followed by a single space ('' when there are none).
    The sched parameter is not used.
    """
    pieces = [str(arg) + " " for arg in common.jobDB.arguments(nj)]
    return "".join(pieces)
664 ewv 1.131
def numberOfJobs(self):
    """Return the total number of jobs computed by the job splitting."""
    nJobs = self.total_number_of_jobs
    return nJobs
668    
def getTarBall(self, exe):
    """
    Return the path of the tarball holding the user executable and libraries.

    The path is <work space tgz path>share/<self.tgz_name> and is also stored
    in self.tgzNameWithPath. An existing tarball is reused as is; otherwise
    it is built with buildTar_(exe) and the (stripped) path is returned.
    """
    # Marco: relative path is used for the Boss XML files
    self.tgzNameWithPath = common.work_space.pathForTgz() + 'share/' + self.tgz_name

    # reuse a tarball that already exists
    if os.path.exists(self.tgzNameWithPath):
        return self.tgzNameWithPath

    # otherwise prepare a tar gzipped file with the user binaries
    self.buildTar_(exe)
    return self.tgzNameWithPath.strip()
686    
def buildTar_(self, executable):
    """
    Build the input-sandbox tarballs for the task.

    Two gzipped archives are written:
      * self.tgzNameWithPath: the user executable when it is not taken from
        the release, the lib/ and module/ directories of the user SCRAM area,
        every data/ directory found below that area, and $CRABDIR/ProdCommon;
      * self.MLtgzfile (share/MLfiles.tgz): report.py, DashboardAPI.py,
        Logger.py, ProcInfo.py, apmon.py and parseCrabFjr.py from
        $CRABDIR/python.
    Nothing is built when the working area is the release area itself.

    executable -- name of the user executable to look up with scram.

    Raises CrabException when the executable cannot be found, when either
    archive cannot be written, or when the main tarball is larger than
    self.MaxTarBallSize MB. If writing the main tarball fails, the partial
    file is removed so that a later getTarBall() call does not reuse it.
    """
    import tarfile

    # User SCRAM area and the release it is based on.
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    built = False
    try:
        try:
            tar = tarfile.open(self.tgzNameWithPath, "w:gz")
            try:
                ## First find the executable
                if self.executable != '':
                    exeWithPath = self.scram.findFile_(executable)
                    if not exeWithPath:
                        raise CrabException('User executable '+executable+' not found')

                    ## then check if it's private or not
                    if exeWithPath.find(swReleaseTop) == -1:
                        # the exe is private, so we must ship it
                        common.logger.debug(5, "Exe "+exeWithPath+" to be tarred")
                        path = swArea+'/'
                        # distinguish a script in the user project area from
                        # one given by full path somewhere else
                        if exeWithPath.find(path) >= 0:
                            exe = exeWithPath.replace(path, '')
                            tar.add(path+exe, exe)
                        else:
                            tar.add(exeWithPath, os.path.basename(executable))
                    # otherwise the exe is from the release and is found on the WN

                ## Now get the libraries: only those in local working area
                libDir = 'lib'
                lib = swArea+'/'+libDir
                common.logger.debug(5, "lib "+lib+" to be tarred")
                if os.path.exists(lib):
                    tar.add(lib, libDir)

                ## Now check if module dir is present
                moduleDir = 'module'
                module = swArea+'/'+moduleDir
                if os.path.isdir(module):
                    tar.add(module, moduleDir)

                ## Now check if any data dir(s) is present
                swAreaLen = len(swArea)
                for root, dirs, _files in os.walk(swArea):
                    if "data" in dirs:
                        common.logger.debug(5, "data "+root+"/data"+" to be tarred")
                        tar.add(root+"/data", root[swAreaLen:]+"/data")
                        # tar.add is recursive: do not descend into this data
                        # dir, or nested data/ dirs would be added twice
                        dirs.remove("data")

                ## Add ProdCommon dir to tar
                prodcommonDir = 'ProdCommon'
                prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
                if os.path.isdir(prodcommonPath):
                    tar.add(prodcommonPath, prodcommonDir)

                common.logger.debug(5, "Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
            finally:
                # always close, so the handle does not leak on error
                tar.close()
            built = True
        except CrabException:
            # keep the specific message (e.g. executable not found)
            raise
        except Exception:
            raise CrabException('Could not create tar-ball')
    finally:
        if not built and os.path.exists(self.tgzNameWithPath):
            try:
                os.remove(self.tgzNameWithPath)
            except OSError:
                # best effort: the original error is more useful than this one
                pass

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if tarballinfo.st_size > self.MaxTarBallSize*1024*1024:
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path = os.environ['CRABDIR'] + '/python/'
            for fileName in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
                tar.add(path+fileName, fileName)
            common.logger.debug(5, "Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except Exception:
        raise CrabException('Could not create ML files tar-ball')
782 ewv 1.131
def additionalInputFileTgz(self):
    """
    Put all additional input files into a gzipped tar ball and return its path.

    The archive is <pathForTgz()>share/<additional_tgz_name>. Each file from
    self.additional_inbox_files is stored under its base name (the part after
    the last '/'). The archive is written even when the list is empty.
    """
    import tarfile
    tarName = common.work_space.pathForTgz() + 'share/' + self.additional_tgz_name
    tar = tarfile.open(tarName, "w:gz")
    try:
        for fileName in self.additional_inbox_files:
            tar.add(fileName, fileName.split('/')[-1])
        common.logger.debug(5, "Files added to " + self.additional_tgz_name + " : " + str(tar.getnames()))
    finally:
        # close (and flush) the archive even if adding a file failed
        tar.close()
    return tarName
795    
def wsSetupEnvironment(self, nj):
    """
    Return the part of the job wrapper script that prepares the execution
    environment for job nj.

    The generated shell text:
      * sets up the CMS environment for LCG or OSG middleware; on OSG it first
        creates and enters a temporary $WORKING_DIR under $OSG_WN_TMP;
      * creates the CMSSW project area with scram and loads its runtime env;
      * aborts if the job got fewer arguments than listed in self.argsList[nj];
      * exports the publication variables DatasetPath, PrimaryDataset,
        DataTier and ApplicationFamily;
      * if a pset is used, copies it, substitutes the per-job values passed as
        positional arguments (input files/events/skip for data jobs; first run
        and random seeds otherwise), renames it to pset.cfg, prints it and
        computes PSETHASH;
      * unpacks the additional input-file tarball if there is one.
    Each error branch prints an exit code, appends it to $RUNTIME_AREA/$repo
    and exits 1. After the working dir exists, the error branches also
    remove $WORKING_DIR on OSG.
    """
    # Prepare JobType-independent part
    txt = ''
    txt += 'echo ">>> setup environment"\n'
    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' if [ ! $? == 0 ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += ' echo ">>> Created working directory: $WORKING_DIR"\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += ' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo ">>> specific cmssw setup environment:"\n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
    txt += ' echo ">>> Remove working directory: $WORKING_DIR"\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # report the release actually requested (used to be a hard-coded CMSSW_0_6_1)
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'cd '+self.version+'\n'
    txt += 'SOFTWARE_DIR=`pwd`\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    # the grep works around a bug in scramv1
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt "+str(len(self.argsList[nj].split()))+" ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
    txt += ' echo ">>> Remove working directory: $WORKING_DIR"\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    # variables used later for DBS output publication
    if self.datasetPath:
        txt += '\n'
        txt += 'DatasetPath='+self.datasetPath+'\n'
        datasetpath_split = self.datasetPath.split("/")
        txt += 'PrimaryDataset='+datasetpath_split[1]+'\n'
        txt += 'DataTier='+datasetpath_split[2]+'\n'
        txt += 'ApplicationFamily=cmsRun\n'
    else:
        txt += 'DatasetPath=MCDataTier\n'
        txt += 'PrimaryDataset=null\n'
        txt += 'DataTier=null\n'
        txt += 'ApplicationFamily=MCDataTier\n'

    if self.pset is not None:
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        txt += 'cp $RUNTIME_AREA/'+pset+' .\n'
        if self.datasetPath:  # standard job
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#\'INPUTFILE\'#$InputFiles#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#int32 input = 0#int32 input = $MaxEvents#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#uint32 skipEvents = 0#uint32 skipEvents = $SkipEvents#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
        else:  # pythia like job
            # positional index of the next per-job value in the script args
            seedIndex = 1
            if self.firstRun:
                txt += 'FirstRun=${args['+str(seedIndex)+']}\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'
                txt += 'sed "s#uint32 firstRun = 0#uint32 firstRun = $FirstRun#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
                seedIndex = seedIndex + 1

            if self.sourceSeed:
                txt += 'Seed=${args['+str(seedIndex)+']}\n'
                txt += 'sed "s#uint32 sourceSeed = 0#uint32 sourceSeed = $Seed#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
                seedIndex = seedIndex + 1
                ## the following seeds are not always present
                if self.sourceSeedVtx:
                    txt += 'VtxSeed=${args['+str(seedIndex)+']}\n'
                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                    txt += 'sed "s#uint32 VtxSmeared = 0#uint32 VtxSmeared = $VtxSeed#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
                    seedIndex += 1
                if self.sourceSeedG4:
                    txt += 'G4Seed=${args['+str(seedIndex)+']}\n'
                    txt += 'echo "G4Seed: <$G4Seed>"\n'
                    txt += 'sed "s#uint32 g4SimHits = 0#uint32 g4SimHits = $G4Seed#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
                    seedIndex += 1
                if self.sourceSeedMix:
                    txt += 'mixSeed=${args['+str(seedIndex)+']}\n'
                    txt += 'echo "MixSeed: <$mixSeed>"\n'
                    txt += 'sed "s#uint32 mix = 0#uint32 mix = $mixSeed#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
                    seedIndex += 1
        txt += 'mv -f '+pset+' pset.cfg\n'

    if len(self.additional_inbox_files) > 0:
        txt += 'if [ -e $RUNTIME_AREA/'+self.additional_tgz_name+' ] ; then\n'
        txt += ' tar xzvf $RUNTIME_AREA/'+self.additional_tgz_name+'\n'
        txt += 'fi\n'

    if self.pset is not None:
        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
        # config hash, used for DBS output publication
        txt += 'PSETHASH=`EdmConfigHash < pset.cfg` \n'
        txt += 'echo "PSETHASH = $PSETHASH" \n'
        txt += '\n'
    return txt
966    
def wsBuildExe(self, nj=0):
    """
    Return the part of the job script that unpacks the user tarball and puts
    ProdCommon (shipped inside it) on PYTHONPATH.

    Returns an empty string when the user tarball (self.tgzNameWithPath)
    does not exist. When untarring fails, the script reports the tar exit
    status, dumps it to $RUNTIME_AREA/$repo, cleans $WORKING_DIR on OSG and
    exits 1. nj is accepted for interface compatibility and is not used.
    """
    txt = ""
    if not os.path.isfile(self.tgzNameWithPath):
        return txt

    tarball = os.path.basename(self.tgzNameWithPath)
    txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+tarball+' :" \n'
    txt += 'tar xzvf $RUNTIME_AREA/'+tarball+'\n'
    txt += 'untar_status=$? \n'
    txt += 'if [ $untar_status -ne 0 ]; then \n'
    txt += ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n'
    txt += ' echo "JOB_EXIT_STATUS = $untar_status" \n'
    txt += ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n'
    # dump the status like every other error branch of the wrapper does
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
    txt += ' echo ">>> Remove working directory: $WORKING_DIR"\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50999"\n'
    txt += ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' \n'
    txt += ' exit 1 \n'
    txt += 'else \n'
    txt += ' echo "Successful untar" \n'
    txt += 'fi \n'
    txt += '\n'
    txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
    txt += 'fi\n'
    # print the resulting value in both branches (it used to be printed only
    # when PYTHONPATH was already set)
    txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
    txt += '\n'
    return txt
1016    
def modifySteeringCards(self, nj):
    """
    Hook meant to modify the card provided by the user for job nj, writing
    a new card into the share dir. It does nothing and returns None.
    """
    return None
1022 ewv 1.131
def executableName(self):
    """
    Return the command the wrapper runs: "sh " when a user script
    (self.scriptExe) is configured, otherwise self.executable.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "
1028 slacapra 1.1
def executableArgs(self):
    """
    Return the command-line arguments appended to the executable.

    * With a user script: "<scriptExe> $NJob".
    * Otherwise the cmsRun arguments " -p pset.cfg", prefixed with
      " -j <fjrFileName>" for CMSSW releases >= 1_5_X.

    Raises CrabException if the CMSSW version string cannot be parsed into
    major and minor release numbers.
    """
    if self.scriptExe:
        return self.scriptExe + " $NJob"

    version_array = self.scram.getSWVersion().split('_')
    try:
        major = int(version_array[1])
        minor = int(version_array[2])
    except (IndexError, ValueError):
        msg = "Cannot parse CMSSW version string: " + "_".join(version_array) + " for major and minor release number!"
        raise CrabException(msg)

    # Compare (major, minor) as a pair: the old test "major >= 1 and
    # minor >= 5" wrongly rejected releases such as CMSSW_2_0_X.
    if (major, minor) >= (1, 5):
        return " -j " + self.fjrFileName + " -p pset.cfg"
    return " -p pset.cfg"
1047 slacapra 1.1
1048     def inputSandbox(self, nj):
1049     """
1050     Returns a list of filenames to be put in JDL input sandbox.
1051     """
1052     inp_box = []
1053 slacapra 1.53 # # dict added to delete duplicate from input sandbox file list
1054     # seen = {}
1055 slacapra 1.1 ## code
1056     if os.path.isfile(self.tgzNameWithPath):
1057     inp_box.append(self.tgzNameWithPath)
1058 corvo 1.58 if os.path.isfile(self.MLtgzfile):
1059     inp_box.append(self.MLtgzfile)
1060 slacapra 1.1 ## config
1061 slacapra 1.70 if not self.pset is None:
1062 corvo 1.56 inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
1063 slacapra 1.1 ## additional input files
1064 slacapra 1.97 tgz = self.additionalInputFileTgz()
1065     inp_box.append(tgz)
1066 slacapra 1.1 return inp_box
1067    
def outputSandbox(self, nj):
    """
    Return the list of files to put in the JDL output sandbox for job nj:
    every declared output file (output_file, then output_file_sandbox),
    renamed via numberFile_ with the 1-based job number.
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1079    
def prepareSteeringCards(self):
    """
    Hook for initial modifications of the user's steering card file.
    It does nothing and returns None.
    """
    pass
1085    
def wsRenameOutput(self, nj):
    """
    Return the part of the job script that renames the produced output files.

    For every file in output_file_sandbox and output_file, the script moves
    ./<file> to its job-numbered name (numberFile_(file, '$NJob')) and leaves
    a symlink under the original name in $RUNTIME_AREA. With
    copy_data == 1, output_file entries stay in the current directory and
    only the symlink goes to $RUNTIME_AREA; otherwise they are moved to
    $RUNTIME_AREA.

    A missing file sets exit_status=60302; for output_file entries it also
    sets output_exit_status. With the CONDOR_G scheduler on OSG, a dummy file
    is written in its place.

    Finally, $file_list is set to the job-numbered names of output_file and
    the script changes to $RUNTIME_AREA. nj is not used: the job number is
    substituted at run time through $NJob.
    """
    txt = '\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    txt += 'ls \n'
    txt += '\n'

    txt += 'output_exit_status=0\n'

    for fileWithSuffix in self.output_file_sandbox:
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        txt += '\n'
        txt += '# check output file\n'
        txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
        txt += ' mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
        txt += ' ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        txt += 'else\n'
        txt += ' exit_status=60302\n'
        txt += ' echo "ERROR: Output file '+fileWithSuffix+' not found"\n'
        if common.scheduler.name() == 'CONDOR_G':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'fi\n'

    # job-numbered names of output_file, collected while generating the script
    file_list = []
    for fileWithSuffix in self.output_file:
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        file_list.append(output_file_num)
        txt += '\n'
        txt += '# check output file\n'
        txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
        if self.copy_data == 1:
            # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            txt += ' mv '+fileWithSuffix+' '+output_file_num+'\n'
            txt += ' ln -s `pwd`/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        else:
            txt += ' mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        txt += 'else\n'
        txt += ' exit_status=60302\n'
        txt += ' echo "ERROR: Output file '+fileWithSuffix+' not found"\n'
        txt += ' echo "JOB_EXIT_STATUS = $exit_status"\n'
        txt += ' output_exit_status=$exit_status\n'
        if common.scheduler.name() == 'CONDOR_G':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'fi\n'

    # str.join instead of the deprecated string.join (absent on Python 3)
    txt += 'file_list="'+' '.join(file_list)+'"\n'
    txt += '\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    txt += 'ls \n'
    txt += '\n'
    txt += 'cd $RUNTIME_AREA\n'
    txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
    return txt
1151    
def numberFile_(self, file, txt):
    """
    Return file with '_' + txt inserted before its last extension.

    'out.root' -> 'out_<txt>.root', 'a.b.c' -> 'a.b_<txt>.c'; a name
    without extension just gets '_<txt>' appended. Only the last path
    component is searched for the extension, so dots in directory names are
    left alone ('dir.v1/out' -> 'dir.v1/out_<txt>').

    (The parameter name `file` is kept for interface compatibility.)
    """
    dot = file.rfind('.')
    if dot <= file.rfind('/'):
        # no '.' in the base name: no extension to preserve
        return file + '_' + txt
    return file[:dot] + '_' + txt + file[dot:]
1169    
def getRequirements(self, nj=None):
    """
    Return the job requirements to add to the JDL files.

    Clauses are joined with ' && ':
      * Member("VO-cms-<version>", ...) and, when an architecture is known,
        Member("VO-cms-<executable_arch>", ...) -- only if self.version is set;
      * (other.GlueHostNetworkAdapterOutboundIP), always.
    nj is accepted for interface compatibility and is not used.
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
        if self.executable_arch:
            clauses.append('Member("VO-cms-' + self.executable_arch +
                           '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    # joining avoids the leading " && " that string concatenation produced
    # when no version was set
    return ' && '.join(clauses)
1189 gutsche 1.3
def configFilename(self):
    """Return the job config file name: the job type name plus '.cfg'."""
    stem = self.name()
    return stem + '.cfg'
1193    
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the OSG part of the job script that exports SCRAM_ARCH
    (self.executable_arch) and sources
    $OSG_APP/cmssoft/cms/cmsset_default.sh with self.version.

    If that setup script is missing, the generated text reports error 10020,
    goes back to $RUNTIME_AREA, removes $WORKING_DIR (reporting 10017 if it
    is still there) and exits 1.
    """
    arch = self.executable_arch
    lines = [
        ' echo ">>> setup CMS OSG environment:"',
        ' echo "set SCRAM ARCH to ' + arch + '"',
        ' export SCRAM_ARCH=' + arch,
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"',
        ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh ' + self.version,
        ' else',
        ' echo "SET_CMS_ENV 10020 ==> ERROR $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"',
        ' echo "JOB_EXIT_STATUS = 10020"',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo',
        ' dumpStatus $RUNTIME_AREA/$repo',
        '',
        ' cd $RUNTIME_AREA',
        ' echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"',
        ' echo ">>> Remove working directory: $WORKING_DIR"',
        ' /bin/rm -rf $WORKING_DIR',
        ' if [ -d $WORKING_DIR ] ;then',
        ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"',
        ' echo "JOB_EXIT_STATUS = 10017"',
        ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo',
        ' dumpStatus $RUNTIME_AREA/$repo',
        ' fi',
        '',
        ' exit 1',
        ' fi',
        '',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"',
    ]
    # every line of the generated script is newline-terminated
    return '\n'.join(lines) + '\n'
1230 ewv 1.131
### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the LCG part of the job script that exports SCRAM_ARCH and
    BUILD_ARCH (self.executable_arch) and sources
    $VO_CMS_SW_DIR/cmsset_default.sh.

    The generated text exits 1 with code 10031 if $VO_CMS_SW_DIR is unset,
    10020 if cmsset_default.sh is missing or empty, and 10032 if sourcing it
    fails.
    """
    arch = self.executable_arch
    script_lines = (
        ' echo ">>> setup CMS LCG environment:"',
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"',
        ' export SCRAM_ARCH=' + arch,
        ' export BUILD_ARCH=' + arch,
        ' if [ ! $VO_CMS_SW_DIR ] ;then',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"',
        ' echo "JOB_EXIT_STATUS = 10031" ',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo',
        ' dumpStatus $RUNTIME_AREA/$repo',
        ' exit 1',
        ' else',
        ' echo "Sourcing environment... "',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"',
        ' echo "JOB_EXIT_STATUS = 10020"',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo',
        ' dumpStatus $RUNTIME_AREA/$repo',
        ' exit 1',
        ' fi',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh',
        ' result=$?',
        ' if [ $result -ne 0 ]; then',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"',
        ' echo "JOB_EXIT_STATUS = 10032"',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo',
        ' dumpStatus $RUNTIME_AREA/$repo',
        ' exit 1',
        ' fi',
        ' fi',
        ' ',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"',
    )
    return ''.join([line + '\n' for line in script_lines])
1270 gutsche 1.5
### FEDE FOR DBS OUTPUT PUBLICATION
def modifyReport(self, nj):
    """
    Return the part of the job script that adapts the Framework Job Report
    for DBS publication.

    Only when cfg_params['USER.publish_data'] is 1 does the generated text
    run ProdCommon's ModifyJobReport.py on crab_fjr_$NJob.xml. The LFN prefix
    is derived from $SE_PATH and the dataset info comes from shell variables.
    On success the report is replaced with NewFrameworkJobReport.xml; on
    failure exit_status=1 is set. Otherwise the text only echoes that no
    publication is required. nj is not used: the job number is substituted
    at run time through $NJob.

    Raises CrabException if publication is requested but
    USER.publish_data_name is not set.
    """
    txt = ''
    try:
        publish_data = int(self.cfg_params['USER.publish_data'])
    except KeyError:
        publish_data = 0
    if publish_data != 1:
        txt += 'echo "no data publication required"\n'
        return txt

    try:
        processedDataset = self.cfg_params['USER.publish_data_name']
    except KeyError:
        # report a configuration error instead of crashing with a KeyError
        raise CrabException('USER.publish_data_name must be set when USER.publish_data = 1')

    txt += 'echo ">>> Modify Job Report:" \n'
    txt += 'chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n'

    txt += 'if [ -z "$SE" ]; then\n'
    txt += ' SE="" \n'
    txt += 'fi \n'
    txt += 'if [ -z "$SE_PATH" ]; then\n'
    txt += ' SE_PATH="" \n'
    txt += 'fi \n'
    txt += 'echo "SE = $SE"\n'
    txt += 'echo "SE_PATH = $SE_PATH"\n'

    txt += 'ProcessedDataset='+processedDataset+'\n'
    #### LFN=/store/user/<user>/processedDataset_PSETHASH
    txt += 'if [ "$SE_PATH" == "" ]; then\n'
    txt += ' FOR_LFN=/copy_problems/ \n'
    txt += 'else \n'
    txt += ' tmp=`echo $SE_PATH | awk -F \'store\' \'{print$2}\'` \n'
    # NOTE: the '/store' LFN prefix is hard-coded
    txt += ' FOR_LFN=/store$tmp \n'
    txt += 'fi \n'
    txt += 'echo "ProcessedDataset = $ProcessedDataset"\n'
    txt += 'echo "FOR_LFN = $FOR_LFN" \n'
    txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n'

    # the command is echoed first, then executed
    modify_cmd = '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'
    txt += 'echo "' + modify_cmd + '"\n'
    txt += modify_cmd + '\n'

    txt += 'modifyReport_result=$?\n'
    txt += 'echo modifyReport_result = $modifyReport_result\n'
    txt += 'if [ $modifyReport_result -ne 0 ]; then\n'
    txt += ' exit_status=1\n'
    txt += ' echo "ERROR: Problem with ModifyJobReport"\n'
    txt += 'else\n'
    txt += ' mv NewFrameworkJobReport.xml crab_fjr_$NJob.xml\n'
    txt += 'fi\n'
    return txt
1326 fanzago 1.99
def cleanEnv(self):
    """
    Return the part of the job script that, on OSG, goes back to
    $RUNTIME_AREA and removes $WORKING_DIR, reporting exit code 60999 if
    the directory is still present afterwards. Other middleware is left
    untouched.
    """
    osg_cleanup = (
        ' cd $RUNTIME_AREA\n'
        ' echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
        ' echo ">>> Remove working directory: $WORKING_DIR"\n'
        ' /bin/rm -rf $WORKING_DIR\n'
        ' if [ -d $WORKING_DIR ] ;then\n'
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n'
        ' echo "JOB_EXIT_STATUS = 60999"\n'
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n'
        ' dumpStatus $RUNTIME_AREA/$repo\n'
        ' fi\n'
    )
    return 'if [ $middleware == OSG ]; then\n' + osg_cleanup + 'fi\n' + '\n'
1343 fanzago 1.93
def setParam_(self, param, value):
    """Store value under the key param in this job type's parameter dict."""
    params = self._params
    params[param] = value
1346    
def getParams(self):
    """Return the parameter dict itself (not a copy)."""
    params = self._params
    return params
1349 gutsche 1.8
def uniquelist(self, old):
    """
    Return a list of the elements of old with duplicates removed.

    The first occurrence of each element is kept, in its original order.
    Elements must be hashable.
    """
    seen = {}
    result = []
    for item in old:
        if item not in seen:
            seen[item] = 1
            result.append(item)
    # always a real list, in first-seen order (dict.keys() gave an arbitrary
    # order, and a keys view rather than a list on Python 3)
    return result
1358 mcinquil 1.121
1359    
1360     def checkOut(self, limit):
1361     """
1362     check the dimension of the output files
1363     """
1364 mcinquil 1.142 txt = 'echo ">>> Starting output sandbox limit check :"\n'
1365 mcinquil 1.121 allOutFiles = ""
1366     listOutFiles = []
1367 slacapra 1.151 txt += 'stdoutFile=`ls *stdout` \n'
1368     txt += 'stderrFile=`ls *stderr` \n'
1369 fanzago 1.148 if (self.return_data == 1):
1370     for fileOut in (self.output_file+self.output_file_sandbox):
1371     allOutFiles = allOutFiles + " " + self.numberFile_(fileOut, '$NJob') + " $stdoutFile $stderrFile"
1372     else:
1373     for fileOut in (self.output_file_sandbox):
1374     txt += 'echo " '+fileOut+'";\n'
1375     allOutFiles = allOutFiles + " " + self.numberFile_(fileOut, '$NJob') + " $stdoutFile $stderrFile"
1376 mcinquil 1.121 txt += 'echo "OUTPUT files: '+str(allOutFiles)+'";\n'
1377     txt += 'ls -gGhrta;\n'
1378     txt += 'sum=0;\n'
1379     txt += 'for file in '+str(allOutFiles)+' ; do\n'
1380     txt += ' if [ -e $file ]; then\n'
1381     txt += ' tt=`ls -gGrta $file | awk \'{ print $3 }\'`\n'
1382     txt += ' sum=`expr $sum + $tt`\n'
1383     txt += ' else\n'
1384     txt += ' echo "WARNING: output file $file not found!"\n'
1385     txt += ' fi\n'
1386     txt += 'done\n'
1387     txt += 'echo "Total Output dimension: $sum";\n'
1388     txt += 'limit='+str(limit)+';\n'
1389     txt += 'echo "OUTPUT FILES LIMIT SET TO: $limit";\n'
1390     txt += 'if [ $limit -lt $sum ]; then\n'
1391     txt += ' echo "WARNING: output files have to big size - something will be lost;"\n'
1392     txt += ' echo " checking the output file sizes..."\n'
1393     txt += ' tot=0;\n'
1394 mcinquil 1.143 txt += ' for filefile in '+str(allOutFiles)+' ; do\n'
1395     txt += ' dimFile=`ls -gGrta $filefile | awk \'{ print $3 }\';`\n'
1396 mcinquil 1.121 txt += ' tot=`expr $tot + $tt`;\n'
1397 mcinquil 1.143 txt += ' if [ $limit -lt $dimFile ]; then\n'
1398     txt += ' echo "deleting file: $filefile";\n'
1399     txt += ' rm -f $filefile\n'
1400     txt += ' elif [ $limit -lt $tot ]; then\n'
1401     txt += ' echo "deleting file: $filefile";\n'
1402     txt += ' rm -f $filefile\n'
1403     txt += ' else\n'
1404     txt += ' echo "saving file: $filefile"\n'
1405 mcinquil 1.121 txt += ' fi\n'
1406     txt += ' done\n'
1407 mcinquil 1.143
1408 mcinquil 1.121 txt += ' ls -agGhrt;\n'
1409     txt += ' echo "WARNING: output files are too big in dimension: can not put in the output_sandbox.";\n'
1410     txt += ' echo "JOB_EXIT_STATUS = 70000";\n'
1411     txt += ' exit_status=70000;\n'
1412     txt += 'else'
1413     txt += ' echo "Total Output dimension $sum is fine.";\n'
1414     txt += 'fi\n'
1415 fanzago 1.133 txt += 'echo "Ending output sandbox limit check"\n'
1416 mcinquil 1.121 return txt