ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.191
Committed: Tue May 27 22:14:26 2008 UTC (16 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: PRODCOMMON_0_10_7_testCS2
Changes since 1.190: +1 -1 lines
Log Message:
fix for additional input file

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Set up a CMSSW job type from the CRAB configuration, split the task
    into jobs and prepare the input sandbox tarball.

    ARGUMENTS: cfg_params: CRAB configuration, accessed with
                           has_key()/get()/[] using 'SECTION.key' names
                           (CMSSW.*, USER.*, EDG.*)
               ncjobs:     number of jobs requested to be created ('all'
                           or a number); limits the block based splitting
    RAISES:    CrabException if the CMSSW version string cannot be parsed,
               a mandatory parameter (datasetpath, pset) is missing, a
               configured file does not exist, the splitting parameters
               are inconsistent or the ParameterSet cannot be modified
    SETS:      the splitting results (self.total_number_of_jobs,
               self.list_of_args, self.jobDestination) through the
               jobSplitting* methods, and self.tgzNameWithPath
    """
    import sys

    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''
    self.datasetPath = ''

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    # The version string is split on '_' and fields 1..3 must be integers
    self.version = self.scram.getSWVersion()
    version_array = self.version.split('_')
    self.CMSSW_major = 0
    self.CMSSW_minor = 0
    self.CMSSW_patch = 0
    try:
        self.CMSSW_major = int(version_array[1])
        self.CMSSW_minor = int(version_array[2])
        self.CMSSW_patch = int(version_array[3])
    except (IndexError, ValueError):
        # too few fields or a non numeric field
        msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!"
        raise CrabException(msg)

    ### collect Data cards

    if not cfg_params.has_key('CMSSW.datasetpath'):
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    # datasetpath 'none' (any case) selects a job without input data
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    self.dataTiers = []

    self.debug_pset = cfg_params.get('USER.debug_pset',False)

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if not cfg_params.has_key('CMSSW.pset'):
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    # pset 'none' (any case) means that no ParameterSet is used
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp:
        tmpOutFiles = tmp.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        self.output_file = [name.strip() for name in tmpOutFiles]
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe:
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    # scriptExe is None (not '') when USER.script_exe is absent, so test
    # its truth value: comparing with '' could never detect a missing script
    if self.datasetPath is None and self.pset is None and not self.scriptExe:
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    if cfg_params.has_key('USER.additional_input_files'):
        for entry in cfg_params['USER.additional_input_files'].split(','):
            entry = entry.strip()
            if not entry:
                # empty item (e.g. trailing comma): nothing to add
                continue
            # relative patterns are globbed starting from the current directory
            dirname = ''
            if not entry.startswith("/"):
                dirname = "."
            if entry.find("*") > -1:
                files = glob.glob(os.path.join(dirname, entry))
                if len(files) == 0:
                    raise CrabException("No additional input file found with this pattern: "+entry)
            else:
                files = [entry]
            for path in files:
                if not os.path.exists(path):
                    raise CrabException("Additional input file not found: "+path)
                self.additional_inbox_files.append(path.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if cfg_params.has_key('CMSSW.events_per_job'):
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if cfg_params.has_key('CMSSW.number_of_jobs'):
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    if cfg_params.has_key('CMSSW.total_number_of_events'):
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        # with a ParameterSet exactly two of the three quantities fix the splitting
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if self.selectNumberOfJobs == 0:
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## New method of dealing with seeds
    self.incrementSeeds = []
    self.preserveSeeds = []
    # str.strip() returns a new string: keep the stripped names
    if cfg_params.has_key('CMSSW.preserve_seeds'):
        for tmp in cfg_params['CMSSW.preserve_seeds'].split(','):
            self.preserveSeeds.append(tmp.strip())
    if cfg_params.has_key('CMSSW.increment_seeds'):
        for tmp in cfg_params['CMSSW.increment_seeds'].split(','):
            self.incrementSeeds.append(tmp.strip())

    ## Old method of dealing with seeds
    ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
    ## remove
    # (deprecated parameter, attribute keeping its value, names added to increment_seeds)
    deprecatedSeeds = [
        ('pythia_seed', 'sourceSeed',    ['sourceSeed', 'theSource']),
        ('vtx_seed',    'sourceSeedVtx', ['VtxSmeared']),
        ('g4_seed',     'sourceSeedG4',  ['g4SimHits']),
        ('mix_seed',    'sourceSeedMix', ['mix']),
    ]
    for seedKey, seedAttr, seedNames in deprecatedSeeds:
        seedValue = cfg_params.get('CMSSW.'+seedKey,None)
        setattr(self, seedAttr, seedValue)
        if seedValue:
            print(seedKey+" is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n"
                  "Added to increment_seeds.")
            self.incrementSeeds.extend(seedNames)

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    if self.pset is not None: #CarlosDaniele
        import PsetManipulator as pp
        PsetEdit = pp.PsetManipulator(self.pset) #Daniele Pset

    # Copy/return

    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            # Add FrameworkJobReport to parameter-set, set max events.
            # Reset later for data jobs by writeCFG which does all modifications
            PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5
            PsetEdit.maxEvent(self.eventsPerJob)
            PsetEdit.psetWriter(self.configFilename())
        except Exception:
            # keep the original cause in the debug log before reporting the generic error
            common.logger.debug(3,'ParameterSet manipulation failed: '+str(sys.exc_info()[1]))
            msg='Error while manipulating ParameterSet: exiting...'
            raise CrabException(msg)
    self.tgzNameWithPath = self.getTarBall(self.executable)
261 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Query the data discovery (DBS) and data location (DLS) services for
    self.datasetPath.

    ARGUMENT: cfg_params: CRAB configuration handed over to DataDiscovery
              and DataLocation
    RETURNS:  the result of DataLocation.getSites(); it is used by
              jobSplittingByBlocks as a dictionary with blocks as keys and
              list of host sites as values
    RAISES:   CrabException if the data discovery or the data location fails
    SETS:     self.pubdata, self.filesbyblock, self.eventsbyblock,
              self.eventsbyfile, self.maxEvents
    """
    import sys
    import DataDiscovery
    import DataLocation
    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath=self.datasetPath

    ## Contact the DBS
    common.logger.message("Contacting Data Discovery Services ...")
    try:
        self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
        self.pubdata.fetchDBSInfo()
    except (DataDiscovery.NotExistingDatasetError,
            DataDiscovery.NoDataTierinProvenanceError,
            DataDiscovery.DataDiscoveryError):
        # the three failures are reported in exactly the same way;
        # sys.exc_info() gives the exception on every Python version
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
        raise CrabException(msg)

    self.filesbyblock=self.pubdata.getFiles()
    self.eventsbyblock=self.pubdata.getEventsPerBlock()
    self.eventsbyfile=self.pubdata.getEventsPerFile()

    ## get max number of events
    self.maxEvents=self.pubdata.getMaxEvents()

    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError:
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
        raise CrabException(msg)

    sites = dataloc.getSites()

    # screen output
    common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")

    return sites
314 ewv 1.131
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs (also copied to self.ncjobs)
          self.list_of_args - per job [file list string, max events, skip events]
    RAISES: CrabException if events_per_job or number_of_jobs is selected but not positive
    """

    # ---- Handle the possible job splitting configurations ---- #
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        # a job with less than one event never consumes the requested
        # events: the file loop below would only stop at the job limit
        if self.eventsPerJob < 1:
            raise CrabException('events_per_job must be a positive integer.')
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if totalEventsRequested == -1:
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif totalEventsRequested > self.maxEvents:
        eventsRemaining = self.maxEvents
        # report the effective request: total_number_of_events is 0 when the
        # user gave number_of_jobs and events_per_job instead
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive integer.')
        # at least one event per job, otherwise no job would make progress
        eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

    if self.selectNumberOfJobs:
        common.logger.message("May not create the exact number_of_jobs requested.")

    if self.ncjobs == 'all':
        totalNumberOfJobs = 999999999
    else:
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        # blocks without event information are not split
        if not self.eventsbyblock.has_key(block):
            continue
        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if numFilesInBlock <= 0:
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = ""
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs):
            fileName = files[fileCount]
            if newFile:
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")
                else:
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job (entry is \"name\"\, )
                    parString += '\\"' + fileName + '\\"\\,'
                    newFile = 0

            eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
            # events the current job can still read from its files
            eventsInJobFiles = filesEventCount - jobSkipEventCount
            # if less events in file remain than eventsPerJobRequested
            if eventsInJobFiles < eventsPerJobRequested:
                # if last file in block
                if fileCount == numFilesInBlock-1:
                    # end job using last file, use remaining events in block
                    # close job and touch new file; [:-2] drops the trailing \,
                    fullString = parString[:-2]
                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsInJobFiles)+" events (last file in block).")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    # fill jobs of block dictionary
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsInJobFiles
                    eventsRemaining = eventsRemaining - eventsInJobFiles
                    jobSkipEventCount = 0
                    # reset file
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else:
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif eventsInJobFiles == eventsPerJobRequested:
                # close job and touch new file
                fullString = parString[:-2]
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                jobsOfBlock[block].append(jobCount+1)
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                parString = ""
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            # if more events in file remain than eventsPerJobRequested
            else:
                # close job but don't touch new file
                fullString = parString[:-2]
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                jobsOfBlock[block].append(jobCount+1)
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[fileName]
                parString = '\\"' + fileName + '\\"\\,'

    self.ncjobs = self.total_number_of_jobs = jobCount
    if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []
    bloskNoSite = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            jobRanges = spanRanges(jobsOfBlock[block])
            # sites left after black list and white list filtering (computed once per block)
            allowedSites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,jobRanges,','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append( jobRanges )
                bloskNoSite.append( blockCounter )

    common.logger.message(screenOutput)
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        virgola = ""
        if len(bloskNoSite) > 1:
            virgola = ","
        for block in bloskNoSite:
            msg += ' ' + str(block) + virgola
        msg += '\n Related jobs:\n '
        virgola = ""
        if len(noSiteBlock) > 1:
            virgola = ","
        for range_jobs in noSiteBlock:
            msg += str(range_jobs) + virgola
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        if self.cfg_params.has_key('EDG.se_white_list'):
            msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'
        if self.cfg_params.has_key('EDG.ce_white_list'):
            msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
539    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job
    REQUIRES: self.selectEventsPerJob, self.selectNumberOfJobs, self.selectTotalNumberEvents,
              self.eventsPerJob, self.theNumberOfJobs, self.total_number_of_events,
              self.firstRun
    SETS: self.total_number_of_jobs, self.list_of_args, self.jobDestination and,
          depending on the selected parameters, self.eventsPerJob or
          self.total_number_of_events
    RAISES: CrabException if total_number_of_events is negative or if the
            value used to divide the events (events_per_job or
            number_of_jobs) is not positive
    """
    common.logger.debug(5,'Splitting per events')

    if self.selectEventsPerJob:
        common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if self.selectNumberOfJobs:
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if self.selectTotalNumberEvents:
        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if self.selectEventsPerJob:
        if self.selectTotalNumberEvents:
            # explicit error instead of a ZeroDivisionError
            if self.eventsPerJob < 1:
                raise CrabException('events_per_job must be a positive integer.')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

    elif self.selectNumberOfJobs:
        # explicit error instead of a ZeroDivisionError
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive integer.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    eventsCovered = int(self.total_number_of_jobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - eventsCovered

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(eventsCovered))

    # argument is seed number.$i
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        args=[]
        if self.firstRun:
            ## pythia first run: the job index is appended to the configured value as text
            args.append(str(self.firstRun)+str(i))
        self.list_of_args.append(args)

    return
591    
592 spiga 1.42
def jobSplittingForScript(self):
    """
    Split a script-only task: one job per requested job, no events involved.
    REQUIRES: self.theNumberOfJobs
    SETS: self.total_number_of_jobs, self.list_of_args (the job index as
          only argument) and one empty destination per job appended to
          self.jobDestination
    """
    logger = common.logger
    requestedJobs = self.theNumberOfJobs

    logger.debug(5,'Splitting per job')
    logger.message('Required '+str(requestedJobs)+' jobs in total ')

    self.total_number_of_jobs = requestedJobs

    logger.debug(5,'N jobs '+str(requestedJobs))
    logger.message(str(requestedJobs)+' jobs can be created')

    # argument is seed number.$i
    jobIndexes = range(requestedJobs)
    self.list_of_args = [[str(index)] for index in jobIndexes]
    # no input data: every job gets its own empty destination list
    self.jobDestination.extend([[""] for index in jobIndexes])
    return
612    
def split(self, jobParams):
    """
    Fill jobParams with the arguments of every job and store arguments and
    destination of each job through common._db.updateJob_.
    ARGUMENT: jobParams: list extended with one entry per job; entry i
              receives self.list_of_args[i]
    REQUIRES: self.total_number_of_jobs, self.list_of_args, self.jobDestination
    SETS: self.argsList - number of arguments of the first job plus one
          (the job number put in front of the arguments); left untouched
          when jobParams ends up empty
    """
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    for i in range(njobs):
        jobParams.append("")

    listID=[]
    listField=[]
    for job in range(njobs):
        jobParams[job] = arglist[job]
        # job identifiers start from 1
        jobId = job+1
        listID.append(jobId)
        # joining an empty argument list gives an empty string
        argu = ' '.join(jobParams[job])
        job_ToSave = {}
        job_ToSave['arguments'] = str(jobId)+' '+argu
        job_ToSave['dlsDestination'] = self.jobDestination[job]
        listField.append(job_ToSave)
        # label the job with the same 1-based id that is saved above
        msg="Job "+str(jobId)+" Arguments: "+str(jobId)+" "+argu+"\n" \
            +" Destination: "+str(self.jobDestination[job])
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField)
    # with no job at all there is no first entry to measure
    if jobParams:
        self.argsList = (len(jobParams[0])+1)

    return
641 ewv 1.131
def numberOfJobs(self):
    """
    Return self.total_number_of_jobs, the number of jobs set by the job splitting
    """
    njobs = self.total_number_of_jobs
    return njobs
644    
def getTarBall(self, exe):
    """
    Return the path of the tarball with lib and exe.
    The path is stored in self.tgzNameWithPath; buildTar_(exe) is called
    only when no file exists at that path yet.
    """
    tarballPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarballPath

    if not os.path.exists(tarballPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # tarball already there: hand back the stored path as it is
    return self.tgzNameWithPath
657    
def buildTar_(self, executable):
    """
    Build the gzipped tarball self.tgzNameWithPath shipped with the job.

    When present, the following are packed: the private user executable,
    the 'lib' and 'module' directories and every 'data' directory of the
    user SCRAM area, the job configuration file, the ProdCommon package,
    the monitoring and utility scripts under $CRABDIR/python and the
    additional input files.

    Nothing is built when the working area is the release top.
    self.dataExist tells afterwards whether a 'data' directory was packed.

    Raises CrabException when the user executable is not found, when the
    archive cannot be written, or when it is larger than
    self.MaxTarBallSize (MB).
    """
    import sys
    import tarfile

    # Define the flag up front so that it exists on every exit path.
    self.dataExist = False

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
        return

    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if self.executable != '':
                exeWithPath = self.scram.findFile_(executable)
                if not exeWithPath:
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    # distinguish case when script is in user project area or given by full path somewhere else
                    if exeWithPath.find(path) >= 0:
                        exe = exeWithPath.replace(path, '')
                        tar.add(path+exe, exe)
                    else:
                        tar.add(exeWithPath, os.path.basename(executable))
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/'+libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib, libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea+'/'+moduleDir
            if os.path.isdir(module):
                tar.add(module, moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen = len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    self.dataExist = True
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data", root[swAreaLen:]+"/data")

            ### CMSSW ParameterSet
            if self.pset is not None:
                cfg_file = common.work_space.jobDir()+self.configFilename()
                tar.add(cfg_file, self.configFilename())
                common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ## Add ProdCommon dir to tar
            prodcommonDir = 'ProdCommon'
            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
            if os.path.isdir(prodcommonPath):
                tar.add(prodcommonPath, prodcommonDir)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### ML stuff
            ML_file_list = ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
            path = os.environ['CRABDIR'] + '/python/'
            for fname in ML_file_list:
                tar.add(path+fname, fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### Utils
            Utils_file_list = ['parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']
            for fname in Utils_file_list:
                tar.add(path+fname, fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### AdditionalFiles
            for fname in self.additional_inbox_files:
                tar.add(fname, fname.split('/')[-1])
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # Always release the file handle, also when packing failed.
            tar.close()
    except CrabException:
        # Already carries a precise message (e.g. executable not found).
        raise
    except Exception:
        # sys.exc_info() keeps this valid on old and new interpreters alike.
        raise CrabException('Could not create tar-ball: '+str(sys.exc_info()[1]))

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if tarballinfo.st_size > self.MaxTarBallSize*1024*1024:
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
756    
757 slacapra 1.61 ## create tar-ball with ML stuff
758 slacapra 1.97
def wsSetupEnvironment(self, nj=0):
    """
    Build the part of the job script that prepares the execution
    environment for the job 'nj' and return it as a string.
    """
    if (self.CMSSW_major >= 3) or (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1):
        psetName = 'pset.py'
    else:
        psetName = 'pset.cfg'

    out = []
    add = out.append

    # Prepare JobType-independent part
    add('\n#Written by cms_cmssw::wsSetupEnvironment\n')
    add('echo ">>> setup environment"\n')
    add('if [ $middleware == LCG ]; then \n')
    add(self.wsSetupCMSLCGEnvironment_())
    add('elif [ $middleware == OSG ]; then\n')
    add(' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n')
    add(' if [ ! $? == 0 ] ;then\n')
    add(' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n')
    add(' job_exit_code=10016\n')
    add(' func_exit\n')
    add(' fi\n')
    add(' echo ">>> Created working directory: $WORKING_DIR"\n')
    add('\n')
    add(' echo "Change to working directory: $WORKING_DIR"\n')
    add(' cd $WORKING_DIR\n')
    add(' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n')
    add(self.wsSetupCMSOSGEnvironment_())
    add('fi\n')

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    add('\n\n')
    add('echo ">>> specific cmssw setup environment:"\n')
    add('echo "CMSSW_VERSION = ' + self.version + '"\n')
    add(scram + ' project CMSSW ' + self.version + '\n')
    add('status=$?\n')
    add('if [ $status != 0 ] ; then\n')
    add(' echo "ERROR ==> CMSSW ' + self.version + ' not found on `hostname`" \n')
    add(' job_exit_code=10034\n')
    add(' func_exit\n')
    add('fi \n')
    add('cd ' + self.version + '\n')
    add('SOFTWARE_DIR=`pwd`\n')
    add('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n')
    add('eval `' + scram + ' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n')
    add('if [ $? != 0 ] ; then\n')
    add(' echo "ERROR ==> Problem with the command: "\n')
    add(' echo "eval \\`' + scram + ' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \\` at `hostname`"\n')
    add(' job_exit_code=10034\n')
    add(' func_exit\n')
    add('fi \n')

    # Handle the arguments:
    add("\n")
    add("## number of arguments (first argument always jobnumber)\n")
    add("\n")
    add("if [ $nargs -lt " + str(self.argsList) + " ]\n")
    add("then\n")
    add(" echo 'ERROR ==> Too few arguments' +$nargs+ \n")
    add(' job_exit_code=50113\n')
    add(" func_exit\n")
    add("fi\n")
    add("\n")

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.datasetPath:
        pieces = self.datasetPath.split("/")
        add('\n')
        add('DatasetPath=' + self.datasetPath + '\n')
        add('PrimaryDataset=' + pieces[1] + '\n')
        add('DataTier=' + pieces[2] + '\n')
        add('ApplicationFamily=cmsRun\n')
    else:
        add('DatasetPath=MCDataTier\n')
        add('PrimaryDataset=null\n')
        add('DataTier=null\n')
        add('ApplicationFamily=MCDataTier\n')

    if self.pset != None:
        pset = os.path.basename(job.configFilename())
        add('\n')
        add('cp $RUNTIME_AREA/' + pset + ' .\n')
        if self.datasetPath:
            # standard job
            add('InputFiles=${args[1]}; export InputFiles\n')
            add('MaxEvents=${args[2]}; export MaxEvents\n')
            add('SkipEvents=${args[3]}; export SkipEvents\n')
            add('echo "Inputfiles:<$InputFiles>"\n')
            add('echo "MaxEvents:<$MaxEvents>"\n')
            add('echo "SkipEvents:<$SkipEvents>"\n')
        else:
            # pythia like job
            add('PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n')
            add('IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n')
            add('echo "PreserveSeeds: <$PreserveSeeds>"\n')
            add('echo "IncrementSeeds:<$IncrementSeeds>"\n')
            if self.firstRun:
                add('FirstRun=${args[1]}; export FirstRun\n')
                add('echo "FirstRun: <$FirstRun>"\n')

        add('mv -f ' + pset + ' ' + psetName + '\n')

    if self.pset != None:
        # FUTURE: Can simply for 2_1_x and higher
        add('\n')
        if self.debug_pset == True:
            add('echo "***** cat ' + psetName + ' *********"\n')
            add('cat ' + psetName + '\n')
            add('echo "****** end ' + psetName + ' ********"\n')
            add('\n')
        add('PSETHASH=`edmConfigHash < ' + psetName + '` \n')
        add('echo "PSETHASH = $PSETHASH" \n')
        add('\n')

    return ''.join(out)
874 slacapra 1.176
def wsUntarSoftware(self, nj=0):
    """
    Return the part of the job script that unpacks the software tarball
    in the runtime area and puts ProdCommon in PYTHONPATH.

    Only the header comment is returned when self.tgzNameWithPath is not
    an existing file.
    """
    txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'

    if not os.path.isfile(self.tgzNameWithPath):
        return txt

    tarball = os.path.basename(self.tgzNameWithPath)
    txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+tarball+' :" \n'
    txt += 'tar xzvf $RUNTIME_AREA/'+tarball+'\n'
    # $? must be read right after tar: any command in between (such as
    # the listing below) would overwrite it with its own exit status.
    txt += 'untar_status=$? \n'
    txt += 'ls -Al \n'
    txt += 'if [ $untar_status -ne 0 ]; then \n'
    txt += ' echo "ERROR ==> Untarring .tgz file failed"\n'
    txt += ' job_exit_code=$untar_status\n'
    txt += ' func_exit\n'
    txt += 'else \n'
    txt += ' echo "Successful untar" \n'
    txt += 'fi \n'
    txt += '\n'
    txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
    txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
    txt += 'fi\n'
    txt += '\n'

    return txt
908 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Return the part of the job script that moves the unpacked user
    software (lib, module, optionally src, the additional input files and
    ProdCommon) from the runtime area into the current directory and
    puts ProdCommon in PYTHONPATH.
    """
    txt = '\n#Written by cms_cmssw::wsBuildExe\n'
    txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n'

    txt += 'rm -r lib/ module/ \n'
    txt += 'mv $RUNTIME_AREA/lib/ . \n'
    txt += 'mv $RUNTIME_AREA/module/ . \n'
    # dataExist is only assigned while the tarball is being built; treat
    # a missing flag as "no data directory packed" instead of failing.
    if getattr(self, 'dataExist', False) == True:
        txt += 'rm -r src/ \n'
        txt += 'mv $RUNTIME_AREA/src/ . \n'
    for fname in self.additional_inbox_files:
        txt += 'mv $RUNTIME_AREA/'+os.path.basename(fname)+' . \n'
    txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'

    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
    txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
    txt += 'fi\n'
    txt += '\n'

    return txt
938 slacapra 1.1
def modifySteeringCards(self, nj):
    """
    Placeholder for modifying the card provided by the user and
    writing a new card into share dir; nothing is done here.
    """
    return None
944 ewv 1.131
def executableName(self):
    """
    Return the command that starts the job: "sh " when a user script
    (self.scriptExe) is set, self.executable otherwise.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "
950 slacapra 1.1
def executableArgs(self):
    """
    Return the argument string passed to the executable.

    With a user script the string is the script name followed by
    " $NJob". Otherwise it is made of the framework job report option
    (CMSSW 1_5 or newer) and the configuration file option.
    """
    # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
    if self.scriptExe:#CarlosDaniele
        return self.scriptExe + " $NJob"

    options = []
    # FUTURE: This tests the CMSSW version. Can remove code as versions deprecated
    # Framework job report
    if (self.CMSSW_major >= 2) or (self.CMSSW_major >= 1 and self.CMSSW_minor >= 5):
        options.append(" -j $RUNTIME_AREA/crab_fjr_$NJob.xml")
    # Type of config file
    if self.CMSSW_major >= 2:
        options.append(" -p pset.py")
    else:
        options.append(" -p pset.cfg")
    return "".join(options)
967 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of file names to be put in the JDL input sandbox:
    the software tarball, when it exists on disk, followed by the job
    wrapper script.
    """
    tarball = self.tgzNameWithPath
    if os.path.isfile(tarball):
        inp_box = [tarball]
    else:
        inp_box = []
    script_name = str(common._db.queryTask('scriptName'))
    inp_box.append(common.work_space.pathForTgz() + 'job/' + os.path.basename(script_name))
    return inp_box
978    
def outputSandbox(self, nj):
    """
    Return the list of file names to be put in the JDL output sandbox.

    Every user declared output file gets the job number (nj + 1)
    inserted through numberFile_().
    """
    ## User Declared output files
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
990    
def prepareSteeringCards(self):
    """
    Placeholder for the initial modifications of the user's steering
    card file; nothing is done here.
    """
    return None
996    
def wsRenameOutput(self, nj):
    """
    Return the part of the job script which renames the produced files,
    appending the job number to each name in self.output_file.
    """
    listing = ('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
               'echo ">>> current directory content:"\n'
               'ls \n'
               '\n')

    chunks = ['\n#Written by cms_cmssw::wsRenameOutput\n', listing]

    for produced in self.output_file:
        numbered = self.numberFile_(produced, '$NJob')
        chunks.append('\n')
        chunks.append('# check output file\n')
        chunks.append('if [ -e ./' + produced + ' ] ; then\n')
        if self.copy_data == 1:
            # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            chunks.append(' mv ' + produced + ' ' + numbered + '\n')
            chunks.append(' ln -s `pwd`/' + numbered + ' $RUNTIME_AREA/' + produced + '\n')
        else:
            chunks.append(' mv ' + produced + ' $RUNTIME_AREA/' + numbered + '\n')
            chunks.append(' ln -s $RUNTIME_AREA/' + numbered + ' $RUNTIME_AREA/' + produced + '\n')
        chunks.append('else\n')
        chunks.append(' job_exit_code=60302\n')
        chunks.append(' echo "WARNING: Output file ' + produced + ' not found"\n')
        if common.scheduler.name().upper() == 'CONDOR_G':
            chunks.append(' if [ $middleware == OSG ]; then \n')
            chunks.append(' echo "prepare dummy output file"\n')
            chunks.append(' echo "Processing of job output failed" > $RUNTIME_AREA/' + numbered + '\n')
            chunks.append(' fi \n')
        chunks.append('fi\n')

    file_list = [self.numberFile_(produced, '$NJob') for produced in self.output_file]

    chunks.append('file_list="' + ' '.join(file_list) + '"\n')
    chunks.append('\n')
    chunks.append(listing)
    chunks.append('cd $RUNTIME_AREA\n')
    chunks.append('echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n')
    return ''.join(chunks)
1041    
def numberFile_(self, file, txt):
    """
    Return 'file' with "_<txt>" inserted before its last extension.

    Only the last path component is inspected for an extension, so a
    dot in a directory name ("dir.v1/out", "../out") is left untouched.
    A name without extension simply gets "_<txt>" appended.
    """
    # splitext splits on the last dot of the final path component only
    name, ext = os.path.splitext(file)
    return name + '_' + txt + ext
1059    
def getRequirements(self, nj=[]):
    """
    Return the job requirements to add to jdl files.

    The expression is the ' && ' conjunction of: the CMSSW version tag
    (when self.version is set), the architecture tag (when
    self.executable_arch is set), the outbound connectivity request and,
    for the "glitecoll" scheduler, the Production state of the CE.
    'nj' is not used.
    """
    tag = 'Member("VO-cms-%s", other.GlueHostApplicationSoftwareRunTimeEnvironment)'

    # Collect the clauses first: joining them afterwards never leaves a
    # dangling '&&' at the start when version or architecture is empty.
    clauses = []
    if self.version:
        clauses.append(tag % self.version)
    if self.executable_arch:
        clauses.append(tag % self.executable_arch)

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    if common.scheduler.name() == "glitecoll":
        clauses.append('other.GlueCEStateStatus == "Production" ')

    return ' && '.join(clauses)
1079 gutsche 1.3
def configFilename(self):
    """
    Return the config filename: the job type name followed by '.py'
    from CMSSW 2_1 on, by '.cfg' for older versions.
    """
    # FUTURE: Can remove cfg mode for CMSSW >= 2_1_x
    python_config = (self.CMSSW_major >= 3) or (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1)
    suffix = '.cfg'
    if python_config:
        suffix = '.py'
    return self.name() + suffix
1087 gutsche 1.3
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of the job script which prepares the CMS execution
    environment on OSG: it exports SCRAM_ARCH and sources
    $OSG_APP/cmssoft/cms/cmsset_default.sh for the requested version.
    """
    arch = self.executable_arch
    pieces = [
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH=' + arch + '\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh ' + self.version + '\n',
        ' else\n',
        ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    ]
    return ''.join(pieces)
1111 ewv 1.131
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of the job script which prepares the CMS execution
    environment on LCG: it exports SCRAM_ARCH and BUILD_ARCH and sources
    $VO_CMS_SW_DIR/cmsset_default.sh.
    """
    arch = self.executable_arch
    pieces = [
        '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n',
        ' echo ">>> setup CMS LCG environment:"\n',
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n',
        ' export SCRAM_ARCH=' + arch + '\n',
        ' export BUILD_ARCH=' + arch + '\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n',
        ' job_exit_code=10031\n',
        ' func_exit\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' job_exit_code=10032\n',
        ' func_exit\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "==> setup cms environment ok"\n',
    ]
    return ''.join(pieces)
1145 gutsche 1.5
def modifyReport(self, nj):
    """
    Return the part of the job script that modifies the FrameworkJob
    Report. Apart from the header comment, commands are produced only
    when USER.publish_data is set to 1 in the configuration.
    """
    header = '\n#Written by cms_cmssw::modifyReport\n'
    if int(self.cfg_params.get('USER.publish_data',0)) != 1:
        return header

    processedDataset = self.cfg_params['USER.publish_data_name']
    lfn_base = LFNBase(processedDataset)

    script = '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py'
    command = script + ' $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'

    pieces = [
        header,
        'if [ $copy_exit_status -eq 0 ]; then\n',
        ' FOR_LFN=%s_${PSETHASH}/\n'%(lfn_base),
        'else\n',
        ' FOR_LFN=/copy_problems/ \n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        'fi\n',
        'echo ">>> Modify Job Report:" \n',
        'chmod a+x ' + script + '\n',
        'ProcessedDataset=' + processedDataset + '\n',
        'echo "ProcessedDataset = $ProcessedDataset"\n',
        'echo "SE = $SE"\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'echo "FOR_LFN = $FOR_LFN" \n',
        'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n',
        'echo "' + command + '"\n',
        command + '\n',
        'modifyReport_result=$?\n',
        'if [ $modifyReport_result -ne 0 ]; then\n',
        ' modifyReport_result=70500\n',
        ' job_exit_code=$modifyReport_result\n',
        ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "WARNING: Problem with ModifyJobReport"\n',
        'else\n',
        ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        'fi\n',
    ]
    return ''.join(pieces)
1184 fanzago 1.99
def wsParseFJR(self):
    """
    Return the part of the job script that parses the FrameworkJobReport
    to obtain the exit status of the executable.

    When self.datasetPath is set, the script also compares the list of
    input files with the files reported as processed and sets the exit
    status to 30001 when the two lists differ.
    """
    txt = '\n#Written by cms_cmssw::wsParseFJR\n'
    txt += 'echo ">>> Parse FrameworkJobReport crab_fjr.xml"\n'
    txt += 'if [ -s $RUNTIME_AREA/crab_fjr_$NJob.xml ]; then\n'
    txt += ' if [ -s $RUNTIME_AREA/parseCrabFjr.py ]; then\n'
    txt += ' cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --MonitorID $MonitorID --MonitorJobID $MonitorJobID`\n'
    txt += ' echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n'
    # The awk programs are single-quoted in the shell so that $1 and $NF
    # reach awk untouched, and the blank field separator is kept inside
    # one Python literal instead of splitting it in two.
    txt += " executable_exit_status=`echo $cmd_out | awk -F\\; '{print $1}' | awk -F ' ' '{print $NF}'`\n"
    txt += ' if [ $executable_exit_status -eq 50115 ];then\n'
    txt += ' echo ">>> crab_fjr.xml contents: "\n'
    txt += ' cat $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
    txt += ' echo "Wrong FrameworkJobReport --> does not contain useful info. ExitStatus: $executable_exit_status"\n'
    txt += ' else\n'
    txt += ' echo "Extracted ExitStatus from FrameworkJobReport parsing output: $executable_exit_status"\n'
    txt += ' fi\n'
    txt += ' else\n'
    txt += ' echo "CRAB python script to parse CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
    txt += ' fi\n'
    #### Patch to check input data reading for CMSSW16x Hopefully we-ll remove it asap

    if self.datasetPath:
        # VERIFY PROCESSED DATA
        txt += ' if [ $executable_exit_status -eq 0 ];then\n'
        txt += ' echo ">>> Verify list of processed files:"\n'
        # drop backslashes, put one file per line, drop double quotes
        txt += " echo $InputFiles |tr -d '\\\\' |tr ',' '\\n'|tr -d '\"' > input-files.txt\n"
        txt += ' grep LFN $RUNTIME_AREA/crab_fjr_$NJob.xml |cut -d">" -f2|cut -d"<" -f1|grep "/" > processed-files.txt\n'
        txt += ' cat input-files.txt | sort | uniq > tmp.txt\n'
        txt += ' mv tmp.txt input-files.txt\n'
        txt += ' echo "cat input-files.txt"\n'
        txt += ' echo "----------------------"\n'
        txt += ' cat input-files.txt\n'
        txt += ' cat processed-files.txt | sort | uniq > tmp.txt\n'
        txt += ' mv tmp.txt processed-files.txt\n'
        txt += ' echo "----------------------"\n'
        txt += ' echo "cat processed-files.txt"\n'
        txt += ' echo "----------------------"\n'
        txt += ' cat processed-files.txt\n'
        txt += ' echo "----------------------"\n'
        txt += ' diff -q input-files.txt processed-files.txt\n'
        txt += ' fileverify_status=$?\n'
        txt += ' if [ $fileverify_status -ne 0 ]; then\n'
        txt += ' executable_exit_status=30001\n'
        txt += ' echo "ERROR ==> not all input files processed"\n'
        txt += ' echo " ==> list of processed files from crab_fjr.xml differs from list in pset.cfg"\n'
        txt += ' echo " ==> diff input-files.txt processed-files.txt"\n'
        txt += ' fi\n'
        txt += ' fi\n'
    txt += '\n'
    txt += 'else\n'
    txt += ' echo "CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
    txt += 'fi\n'
    txt += '\n'
    txt += 'echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
    txt += 'echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
    txt += 'job_exit_code=$executable_exit_status\n'

    return txt
1245    
def setParam_(self, param, value):
    """
    Store 'value' under the key 'param' in the parameter dictionary.
    """
    self._params.update({param: value})
1248    
def getParams(self):
    """
    Return the parameter dictionary itself (not a copy).
    """
    params = self._params
    return params
1251 gutsche 1.8
def uniquelist(self, old):
    """
    Return a new list with the elements of 'old' without duplicates.

    The first occurrence of each element is kept and the original order
    is preserved. The elements have to be hashable.
    """
    seen = {}
    unique = []
    for e in old:
        if e not in seen:
            seen[e] = 0
            unique.append(e)
    return unique
1260 mcinquil 1.121
def outList(self):
    """
    Return the part of the job script that lists the files expected in
    the output sandbox and exports them as 'filesToCheck'.

    The user output files are included only when self.return_data is 1;
    the sandbox files and the job stdout/stderr are always listed.
    """
    if self.return_data == 1:
        expected = self.output_file + self.output_file_sandbox
    else:
        expected = self.output_file_sandbox

    listOutFiles = [self.numberFile_(name, '$NJob') for name in expected]
    listOutFiles.append('CMSSW_$NJob.stdout')
    listOutFiles.append('CMSSW_$NJob.stderr')

    joined = ' '.join(listOutFiles)
    pieces = [
        'echo ">>> list of expected files on output sandbox"\n',
        'echo "output files: ' + joined + '"\n',
        'filesToCheck="' + joined + '"\n',
        'export filesToCheck\n',
    ]
    return ''.join(pieces)