ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.199
Committed: Thu May 29 23:08:35 2008 UTC (16 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_1_pre4
Changes since 1.198: +6 -3 lines
Log Message:
moved some ls -Al to high level debug

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure a CMSSW job type from the user configuration.

    cfg_params: mapping queried with 'SECTION.key' names
                (e.g. 'CMSSW.datasetpath', 'USER.script_exe').
    ncjobs:     number of jobs requested to be created; it limits the
                job splitting.

    Besides storing the configuration, the constructor runs the data
    discovery (when a dataset is given), performs the job splitting,
    writes the modified parameter set and builds the tarball.

    Raises CrabException when a mandatory parameter is missing or the
    requested combination of parameters is inconsistent.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''
    self.datasetPath = ''

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    # The version string is split on '_' and fields 1..3 are read as
    # major / minor / patch numbers.
    self.version = self.scram.getSWVersion()
    version_array = self.version.split('_')
    self.CMSSW_major = 0
    self.CMSSW_minor = 0
    self.CMSSW_patch = 0
    try:
        self.CMSSW_major = int(version_array[1])
        self.CMSSW_minor = int(version_array[2])
        self.CMSSW_patch = int(version_array[3])
    except (IndexError, ValueError):
        # too few fields, or a field that is not a number
        msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!"
        raise CrabException(msg)

    ### collect Data cards

    if not cfg_params.has_key('CMSSW.datasetpath'):
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if string.lower(tmp)=='none':
        # no input dataset
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    self.dataTiers = []
    self.debugWrap = ''
    self.debug_wrapper = cfg_params.get('USER.debug_wrapper',False)
    if self.debug_wrapper: self.debugWrap='--debug'

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if not cfg_params.has_key('CMSSW.pset'):
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none' :
        if (not os.path.exists(self.pset)):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp :
        tmpOutFiles = string.split(tmp,',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for tmp in tmpOutFiles:
            self.output_file.append(string.strip(tmp))
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe :
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(string.strip(self.scriptExe))

    # With neither a dataset nor a parameter set there must be a user script.
    # scriptExe is None (not '') when the parameter is absent, so test
    # truthiness rather than comparing with the empty string.
    if self.datasetPath is None and self.pset is None and not self.scriptExe :
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    if cfg_params.has_key('USER.additional_input_files'):
        tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
        for tmp in tmpAddFiles:
            tmp = string.strip(tmp)
            if not tmp:
                # empty entry (e.g. trailing comma): nothing to add
                continue
            dirname = ''
            if not tmp[0]=="/": dirname = "."
            inputFiles = []
            if string.find(tmp,"*")>-1:
                # wildcard: expand it and insist on at least one match
                inputFiles = glob.glob(os.path.join(dirname, tmp))
                if len(inputFiles)==0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                inputFiles.append(tmp)
            for inputFile in inputFiles:
                if not os.path.exists(inputFile):
                    raise CrabException("Additional input file not found: "+inputFile)
                self.additional_inbox_files.append(string.strip(inputFile))
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if cfg_params.has_key('CMSSW.events_per_job'):
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if cfg_params.has_key('CMSSW.number_of_jobs'):
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    if cfg_params.has_key('CMSSW.total_number_of_events'):
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
        if self.selectNumberOfJobs == 1:
            # -1 means "all events" for the block-based splitting, so it
            # must not be compared with the number of jobs.
            if self.total_number_of_events != -1 and self.total_number_of_events < self.theNumberOfJobs:
                msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                raise CrabException(msg)
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## New method of dealing with seeds
    self.incrementSeeds = []
    self.preserveSeeds = []
    if cfg_params.has_key('CMSSW.preserve_seeds'):
        for tmp in cfg_params['CMSSW.preserve_seeds'].split(','):
            # str.strip() returns a new string: store the stripped name
            self.preserveSeeds.append(tmp.strip())
    if cfg_params.has_key('CMSSW.increment_seeds'):
        for tmp in cfg_params['CMSSW.increment_seeds'].split(','):
            self.incrementSeeds.append(tmp.strip())

    ## Old method of dealing with seeds
    ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
    ## remove
    deprecated = " is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds."

    self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
    if self.sourceSeed:
        print("pythia_seed" + deprecated)
        self.incrementSeeds.append('sourceSeed')
        self.incrementSeeds.append('theSource')

    self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
    if self.sourceSeedVtx:
        print("vtx_seed" + deprecated)
        self.incrementSeeds.append('VtxSmeared')

    self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
    if self.sourceSeedG4:
        print("g4_seed" + deprecated)
        self.incrementSeeds.append('g4SimHits')

    self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
    if self.sourceSeedMix:
        print("mix_seed" + deprecated)
        self.incrementSeeds.append('mix')

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    if self.pset is not None: #CarlosDaniele
        import PsetManipulator as pp
        PsetEdit = pp.PsetManipulator(self.pset) #Daniele Pset

    # Copy/return

    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            # Add FrameworkJobReport to parameter-set, set max events.
            # Reset later for data jobs by writeCFG which does all modifications
            PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5
            PsetEdit.maxEvent(self.eventsPerJob)
            PsetEdit.psetWriter(self.configFilename())
        except Exception:
            # keep the underlying cause visible instead of hiding it
            import sys
            msg='Error while manipulating ParameterSet: exiting...'
            msg += '\n' + str(sys.exc_info()[1])
            raise CrabException(msg)
    self.tgzNameWithPath = self.getTarBall(self.executable)
265 gutsche 1.3
266 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
267    
268 slacapra 1.86 import DataDiscovery
269     import DataLocation
270 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
271    
272     datasetPath=self.datasetPath
273    
274 slacapra 1.1 ## Contact the DBS
275 gutsche 1.92 common.logger.message("Contacting Data Discovery Services ...")
276 slacapra 1.1 try:
277 slacapra 1.137 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
278 slacapra 1.1 self.pubdata.fetchDBSInfo()
279    
280 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
281 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
282     raise CrabException(msg)
283 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
284 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
285     raise CrabException(msg)
286 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
287 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
288 slacapra 1.1 raise CrabException(msg)
289    
290 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
291 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
292     self.eventsbyfile=self.pubdata.getEventsPerFile()
293 gutsche 1.3
294 slacapra 1.1 ## get max number of events
295 ewv 1.192 self.maxEvents=self.pubdata.getMaxEvents()
296 slacapra 1.1
297     ## Contact the DLS and build a list of sites hosting the fileblocks
298     try:
299 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
300 gutsche 1.6 dataloc.fetchDLSInfo()
301 slacapra 1.41 except DataLocation.DataLocationError , ex:
302 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
303     raise CrabException(msg)
304 ewv 1.131
305 slacapra 1.1
306 gutsche 1.35 sites = dataloc.getSites()
307     allSites = []
308     listSites = sites.values()
309 slacapra 1.63 for listSite in listSites:
310     for oneSite in listSite:
311 gutsche 1.35 allSites.append(oneSite)
312     allSites = self.uniquelist(allSites)
313 gutsche 1.3
314 gutsche 1.92 # screen output
315     common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
316    
317 gutsche 1.35 return sites
318 ewv 1.131
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. A job never spans more than one block; a file
    holding more events than the per-job quota is shared by consecutive
    jobs through a skip-events offset.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs
          self.ncjobs - overwritten with the number of jobs actually created
          self.list_of_args - one [file list, max events, skip events] entry per job
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report the effective request: total_number_of_events is 0 when the
        # total is derived from number_of_jobs * events_per_job
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        # Keep at least one event per job: with a quota of zero the file
        # loop below would close jobs without ever consuming events.
        eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    # indexed below, so make sure it is a real list
    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if self.eventsbyblock.has_key(block) :
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = self.filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0

            # ---- New block => New job ---- #
            parString = ""
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                fileName = files[fileCount]
                if newFile :
                    try:
                        numEventsInFile = self.eventsbyfile[fileName]
                        common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += '\\\"' + fileName + '\\\"\,'
                        newFile = 0
                    except KeyError:
                        common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                    # if last file in block
                    if ( fileCount == numFilesInBlock-1 ) :
                        # end job using last file, use remaining events in block
                        # close job and touch new file
                        # ([:-2] drops the trailing backslash-comma separator)
                        fullString = parString[:-2]
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        # fill jobs of block dictionary
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                        jobSkipEventCount = 0
                        # reset file
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else :
                        # go to next file
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                    # close job and touch new file
                    fullString = parString[:-2]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    fullString = parString[:-2]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[fileName]
                    parString = '\\\"' + fileName + '\\\"\,'

    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []
    bloskNoSite = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            jobRange = spanRanges(jobsOfBlock[block])
            # sites left after applying the black list and then the white list;
            # evaluated once per block and reused for output and for the check
            allowedSites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,jobRange,
                ','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append( jobRange )
                bloskNoSite.append( blockCounter )

    common.logger.message(screenOutput)
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        virgola = ""
        if len(bloskNoSite) > 1:
            virgola = ","
        for block in bloskNoSite:
            msg += ' ' + str(block) + virgola
        msg += '\n Related jobs:\n '
        virgola = ""
        if len(noSiteBlock) > 1:
            virgola = ","
        for range_jobs in noSiteBlock:
            msg += str(range_jobs) + virgola
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        if self.cfg_params.has_key('EDG.se_white_list'):
            msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'
        if self.cfg_params.has_key('EDG.ce_white_list'):
            msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
543    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job

    REQUIRES: self.selectEventsPerJob, self.selectNumberOfJobs,
              self.selectTotalNumberEvents and the matching values
              (self.eventsPerJob, self.theNumberOfJobs,
              self.total_number_of_events), self.firstRun
    SETS: self.total_number_of_jobs, self.list_of_args and, depending on
          which two quantities were given, self.eventsPerJob or
          self.total_number_of_events; appends one destination per job
          to self.jobDestination.
    RAISES: CrabException if the total number of events is negative or
            the splitting would divide by zero.
    """
    common.logger.debug(5,'Splitting per events')

    if (self.selectEventsPerJob):
        common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if (self.selectNumberOfJobs):
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if (self.selectTotalNumberEvents):
        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            if self.eventsPerJob == 0:
                # would otherwise end in an uncaught ZeroDivisionError
                raise CrabException('Cannot split jobs: events_per_job must not be 0')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs =self.theNumberOfJobs
            self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)

    elif (self.selectNumberOfJobs) :
        if self.theNumberOfJobs == 0:
            # would otherwise end in an uncaught ZeroDivisionError
            raise CrabException('Cannot split jobs: number_of_jobs must not be 0')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check  '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # the only per-job argument is the first-run value followed by the job index
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        args=[]
        if (self.firstRun):
            ## pythia first run
            args.append(str(self.firstRun)+str(i))
        self.list_of_args.append(args)

    return
595    
596 spiga 1.42
def jobSplittingForScript(self):
    """
    Split purely by job count: create self.theNumberOfJobs jobs, each
    with an empty destination and its own index as the only argument.
    """
    log = common.logger
    log.debug(5,'Splitting per job')
    log.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    njobs = self.theNumberOfJobs
    self.total_number_of_jobs = njobs

    log.debug(5,'N jobs '+str(njobs))

    log.message(str(njobs)+' jobs can be created')

    # one argument list and one destination per job
    self.list_of_args = [[str(index)] for index in range(njobs)]
    self.jobDestination.extend([[""] for index in range(njobs)])
    return
616    
def split(self, jobParams):
    """
    Copy the per-job argument lists produced by the job splitting into
    jobParams and store arguments and destination of every job through
    common._db.updateJob_.

    jobParams: list that is extended in place; entry i receives
               self.list_of_args[i].
    SETS: self.argsList - number of arguments of the first job plus one
          (the job number put in front of the arguments); left untouched
          when jobParams ends up empty.
    """
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    for i in range(njobs):
        jobParams.append("")

    listID=[]
    listField=[]
    for job in range(njobs):
        jobParams[job] = arglist[job]
        # job identifiers are 1-based
        jobId = job+1
        listID.append(jobId)
        argu = ' '.join(jobParams[job])
        job_ToSave ={}
        job_ToSave['arguments']= str(jobId)+' '+argu
        job_ToSave['dlsDestination']= self.jobDestination[job]
        listField.append(job_ToSave)
        # use the same 1-based number as the stored job id and arguments
        msg="Job "+str(jobId)+" Arguments: "+str(jobId)+" "+argu+"\n" \
            +" Destination: "+str(self.jobDestination[job])
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField)
    if jobParams:
        # with no jobs at all there is no first entry to measure
        self.argsList = (len(jobParams[0])+1)

    return
645 ewv 1.131
def numberOfJobs(self):
    """
    Return the total number of jobs stored in self.total_number_of_jobs.
    """
    njobs = self.total_number_of_jobs
    return njobs
648    
def getTarBall(self, exe):
    """
    Return the path of the tarball with lib and exe, building the
    tarball first when no file exists at that path yet.
    """
    self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    if not os.path.exists(self.tgzNameWithPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # already there: hand back the existing file
    return self.tgzNameWithPath
661    
662     def buildTar_(self, executable):
663    
664     # First of all declare the user Scram area
665     swArea = self.scram.getSWArea_()
666     swReleaseTop = self.scram.getReleaseTop_()
667 ewv 1.131
668 slacapra 1.1 ## check if working area is release top
669     if swReleaseTop == '' or swArea == swReleaseTop:
670 afanfani 1.172 common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
671 slacapra 1.1 return
672    
673 slacapra 1.61 import tarfile
674     try: # create tar ball
675     tar = tarfile.open(self.tgzNameWithPath, "w:gz")
676     ## First find the executable
677 slacapra 1.86 if (self.executable != ''):
678 slacapra 1.61 exeWithPath = self.scram.findFile_(executable)
679     if ( not exeWithPath ):
680     raise CrabException('User executable '+executable+' not found')
681 ewv 1.131
682 slacapra 1.61 ## then check if it's private or not
683     if exeWithPath.find(swReleaseTop) == -1:
684     # the exe is private, so we must ship
685     common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
686     path = swArea+'/'
687 corvo 1.85 # distinguish case when script is in user project area or given by full path somewhere else
688     if exeWithPath.find(path) >= 0 :
689     exe = string.replace(exeWithPath, path,'')
690 slacapra 1.129 tar.add(path+exe,exe)
691 corvo 1.85 else :
692     tar.add(exeWithPath,os.path.basename(executable))
693 slacapra 1.61 pass
694     else:
695     # the exe is from release, we'll find it on WN
696     pass
697 ewv 1.131
698 slacapra 1.61 ## Now get the libraries: only those in local working area
699     libDir = 'lib'
700     lib = swArea+'/' +libDir
701     common.logger.debug(5,"lib "+lib+" to be tarred")
702     if os.path.exists(lib):
703     tar.add(lib,libDir)
704 ewv 1.131
705 slacapra 1.61 ## Now check if module dir is present
706     moduleDir = 'module'
707     module = swArea + '/' + moduleDir
708     if os.path.isdir(module):
709     tar.add(module,moduleDir)
710    
711     ## Now check if any data dir(s) is present
712     swAreaLen=len(swArea)
713 spiga 1.179 self.dataExist = False
714 slacapra 1.61 for root, dirs, files in os.walk(swArea):
715     if "data" in dirs:
716 spiga 1.179 self.dataExist=True
717 slacapra 1.61 common.logger.debug(5,"data "+root+"/data"+" to be tarred")
718     tar.add(root+"/data",root[swAreaLen:]+"/data")
719 ewv 1.182
720 spiga 1.179 ### CMSSW ParameterSet
721     if not self.pset is None:
722     cfg_file = common.work_space.jobDir()+self.configFilename()
723 ewv 1.182 tar.add(cfg_file,self.configFilename())
724 spiga 1.179 common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
725 slacapra 1.61
726 fanzago 1.93
727 fanzago 1.152 ## Add ProdCommon dir to tar
728 fanzago 1.93 prodcommonDir = 'ProdCommon'
729     prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
730     if os.path.isdir(prodcommonPath):
731     tar.add(prodcommonPath,prodcommonDir)
732 spiga 1.179 common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
733    
734     ##### ML stuff
735     ML_file_list=['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
736     path=os.environ['CRABDIR'] + '/python/'
737     for file in ML_file_list:
738     tar.add(path+file,file)
739     common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
740    
741     ##### Utils
742     Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']
743     for file in Utils_file_list:
744     tar.add(path+file,file)
745     common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
746 ewv 1.131
747 ewv 1.182 ##### AdditionalFiles
748 spiga 1.179 for file in self.additional_inbox_files:
749     tar.add(file,string.split(file,'/')[-1])
750 slacapra 1.61 common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
751 ewv 1.182
752 slacapra 1.61 tar.close()
753     except :
754     raise CrabException('Could not create tar-ball')
755 gutsche 1.72
756     ## check for tarball size
757     tarballinfo = os.stat(self.tgzNameWithPath)
758     if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
759     raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
760    
761 slacapra 1.61 ## create tar-ball with ML stuff
762 slacapra 1.97
def wsSetupEnvironment(self, nj=0):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The generated shell text sets up the LCG or OSG environment, creates the
    CMSSW project area with scram, checks the number of arguments against
    self.argsList, exports the dataset variables and, when self.pset is not
    None, copies the job configuration into place and computes PSETHASH.

    nj -- index of the job in common.job_list.
    """
    # name the configuration gets on the worker node
    if self.CMSSW_major >= 3 or (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1):
        psetName = 'pset.py'
    else:
        psetName = 'pset.cfg'

    # Prepare JobType-independent part
    txt = (
        '\n#Written by cms_cmssw::wsSetupEnvironment\n'
        'echo ">>> setup environment"\n'
        'if [ $middleware == LCG ]; then \n'
    )
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += (
        'elif [ $middleware == OSG ]; then\n'
        ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
        ' if [ ! $? == 0 ] ;then\n'
        ' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
        ' job_exit_code=10016\n'
        ' func_exit\n'
        ' fi\n'
        ' echo ">>> Created working directory: $WORKING_DIR"\n'
        '\n'
        ' echo "Change to working directory: $WORKING_DIR"\n'
        ' cd $WORKING_DIR\n'
        ' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n'
    )
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo ">>> specific cmssw setup environment:"\n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += (
        'status=$?\n'
        'if [ $status != 0 ] ; then\n'
    )
    txt += ' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n'
    txt += (
        ' job_exit_code=10034\n'
        ' func_exit\n'
        'fi \n'
    )
    txt += 'cd '+self.version+'\n'
    txt += (
        'SOFTWARE_DIR=`pwd`\n'
        'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    )
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += (
        'if [ $? != 0 ] ; then\n'
        ' echo "ERROR ==> Problem with the command: "\n'
    )
    # '\\`' yields a literal backslash + backtick in the script; the backslash
    # is doubled here because '\`' is not a valid Python escape sequence.
    txt += ' echo "eval \\`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \\` at `hostname`"\n'
    txt += (
        ' job_exit_code=10034\n'
        ' func_exit\n'
        'fi \n'
    )

    # Handle the arguments:
    txt += (
        "\n"
        "## number of arguments (first argument always jobnumber)\n"
        "\n"
    )
    txt += "if [ $nargs -lt "+str(self.argsList)+" ]\n"
    txt += (
        "then\n"
        " echo 'ERROR ==> Too few arguments' +$nargs+ \n"
        ' job_exit_code=50113\n'
        " func_exit\n"
        "fi\n"
        "\n"
    )

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.datasetPath:
        # the dataset path is split on '/': element 1 is exported as the
        # primary dataset and element 2 as the data tier
        datasetpath_split = self.datasetPath.split("/")
        txt += '\n'
        txt += 'DatasetPath='+self.datasetPath+'\n'
        txt += 'PrimaryDataset='+datasetpath_split[1]+'\n'
        txt += 'DataTier='+datasetpath_split[2]+'\n'
        txt += 'ApplicationFamily=cmsRun\n'
    else:
        txt += (
            'DatasetPath=MCDataTier\n'
            'PrimaryDataset=null\n'
            'DataTier=null\n'
            'ApplicationFamily=MCDataTier\n'
        )

    if self.pset is not None:
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        txt += 'cp $RUNTIME_AREA/'+pset+' .\n'
        if self.datasetPath:  # standard job
            txt += (
                'InputFiles=${args[1]}; export InputFiles\n'
                'MaxEvents=${args[2]}; export MaxEvents\n'
                'SkipEvents=${args[3]}; export SkipEvents\n'
                'echo "Inputfiles:<$InputFiles>"\n'
                'echo "MaxEvents:<$MaxEvents>"\n'
                'echo "SkipEvents:<$SkipEvents>"\n'
            )
        else:  # pythia like job
            txt += 'PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n'
            txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
            txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
            txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
            if self.firstRun:
                txt += 'FirstRun=${args[1]}; export FirstRun\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'

        txt += 'mv -f ' + pset + ' ' + psetName + '\n'

        # FUTURE: Can simplify for 2_1_x and higher
        txt += '\n'
        if self.debug_wrapper:
            txt += 'echo "***** cat ' + psetName + ' *********"\n'
            txt += 'cat ' + psetName + '\n'
            txt += 'echo "****** end ' + psetName + ' ********"\n'
            txt += '\n'
        txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n'
        txt += 'echo "PSETHASH = $PSETHASH" \n'
        txt += '\n'
    return txt
878 slacapra 1.176
def wsUntarSoftware(self, nj=0):
    """
    Put in the script the commands to unpack the input tarball and to add
    the unpacked ProdCommon directory to PYTHONPATH.

    Only the header comment is returned when self.tgzNameWithPath is not an
    existing file.

    nj -- job index (not used here).
    """
    txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'

    if not os.path.isfile(self.tgzNameWithPath):
        return txt

    tarball = os.path.basename(self.tgzNameWithPath)
    txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+tarball+' :" \n'
    txt += 'tar xzvf $RUNTIME_AREA/'+tarball+'\n'
    # $? must be read right after tar: any command in between (such as the
    # debug listing below) would overwrite it with its own exit status.
    txt += 'untar_status=$? \n'
    if self.debug_wrapper:
        txt += 'ls -Al \n'
    txt += (
        'if [ $untar_status -ne 0 ]; then \n'
        ' echo "ERROR ==> Untarring .tgz file failed"\n'
        ' job_exit_code=$untar_status\n'
        ' func_exit\n'
        'else \n'
        ' echo "Successful untar" \n'
        'fi \n'
        '\n'
        'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
        'if [ -z "$PYTHONPATH" ]; then\n'
        ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
        'else\n'
        ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
        'echo "PYTHONPATH=$PYTHONPATH"\n'
        'fi\n'
        '\n'
    )
    return txt
913 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Return the script fragment that moves the unpacked software directories
    (lib, module, optionally src, the additional input files and ProdCommon)
    from $RUNTIME_AREA into the current directory and puts
    $SOFTWARE_DIR/ProdCommon on PYTHONPATH.

    nj -- job index (not used here).
    """
    # NOTE(review): self.dataExist is assigned in buildTar_(); confirm it is
    # always set before this method runs.
    pieces = [
        '\n#Written by cms_cmssw::wsBuildExe\n',
        'echo ">>> moving CMSSW software directories in `pwd`" \n',
        'rm -r lib/ module/ \n',
        'mv $RUNTIME_AREA/lib/ . \n',
        'mv $RUNTIME_AREA/module/ . \n',
    ]
    if self.dataExist == True:
        pieces.append('rm -r src/ \n')
        pieces.append('mv $RUNTIME_AREA/src/ . \n')
    for extra in self.additional_inbox_files:
        pieces.append('mv $RUNTIME_AREA/'+os.path.basename(extra)+' . \n')
    pieces.append('mv $RUNTIME_AREA/ProdCommon/ . \n')

    pieces.extend([
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n',
        'else\n',
        ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n',
        'echo "PYTHONPATH=$PYTHONPATH"\n',
        'fi\n',
        '\n',
    ])
    return ''.join(pieces)
943 slacapra 1.1
def modifySteeringCards(self, nj):
    """
    Hook meant to modify the card provided by the user and write a new
    card into the share dir; this implementation does nothing.

    nj -- job index (not used here).
    """
    return None
949 ewv 1.131
def executableName(self):
    """
    Return self.executable, or the literal "sh " when self.scriptExe is set.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "
955 slacapra 1.1
def executableArgs(self):
    """
    Return the argument string for the executable.

    With a user script (self.scriptExe) the arguments are the script name
    followed by " $NJob".  Otherwise they are assembled from the CMSSW
    version: the framework job report option for 1_5 and later, and the
    name of the configuration file.
    """
    # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
    if self.scriptExe:
        return self.scriptExe + " $NJob"

    options = []
    # Framework job report
    if self.CMSSW_major >= 2 or (self.CMSSW_major >= 1 and self.CMSSW_minor >= 5):
        options.append(" -j $RUNTIME_AREA/crab_fjr_$NJob.xml")
    # Type of config file
    # NOTE(review): pset.py is selected from major version 2 on, while
    # wsSetupEnvironment()/configFilename() switch at 2_1 -- confirm this
    # difference is intended.
    if self.CMSSW_major >= 2:
        options.append(" -p pset.py")
    else:
        options.append(" -p pset.cfg")
    return "".join(options)
972 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox: the tarball
    (only when it exists as a file) followed by the job wrapper script.

    nj -- job index (not used here).
    """
    wrapper_name = os.path.basename(str(common._db.queryTask('scriptName')))
    wrapper_path = common.work_space.pathForTgz() + 'job/' + wrapper_name
    if os.path.isfile(self.tgzNameWithPath):
        return [self.tgzNameWithPath, wrapper_path]
    return [wrapper_path]
983    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Every user declared output file (self.output_file followed by
    self.output_file_sandbox) is numbered with nj + 1 through numberFile_().
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
995    
def prepareSteeringCards(self):
    """
    Hook for initial modifications of the user's steering card file;
    this implementation does nothing.
    """
    return None
1001    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every name in self.output_file the script moves the file to its
    numbered name (see numberFile_) or, when it is missing, sets
    job_exit_code=60302.  The numbered names are also exported to the
    script as the shell variable file_list.

    nj -- job index (not used here; the script uses $NJob).
    """
    txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    if self.debug_wrapper:
        txt += 'ls -Al\n'
    txt += '\n'

    numbered = []
    for fileWithSuffix in self.output_file:
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        numbered.append(output_file_num)
        txt += '\n'
        txt += '# check output file\n'
        txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
        if self.copy_data == 1:
            # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            # (no symlink into $RUNTIME_AREA is created in this branch)
            txt += ' mv '+fileWithSuffix+' '+output_file_num+'\n'
        else:
            txt += ' mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        txt += 'else\n'
        txt += ' job_exit_code=60302\n'
        txt += ' echo "WARNING: Output file '+fileWithSuffix+' not found"\n'
        if common.scheduler.name().upper() == 'CONDOR_G':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'fi\n'

    # str.join instead of the deprecated string.join module function
    txt += 'file_list="'+' '.join(numbered)+'"\n'
    txt += '\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    if self.debug_wrapper:
        txt += 'ls -Al\n'
    txt += '\n'
    txt += 'cd $RUNTIME_AREA\n'
    txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
    return txt
1048    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of a file name.

    Only the last path component is inspected for an extension, so dots in
    directory names (including './' and '../') never count as extension
    separators.  A name without extension simply gets '_' + txt appended.

    file -- file name, optionally with a leading directory part.
    txt  -- text to insert (e.g. a job number or '$NJob').
    """
    cut = file.rfind('/') + 1
    folder, leaf = file[:cut], file[cut:]
    if '.' in leaf:
        stem, ext = leaf.rsplit('.', 1)
        return folder + stem + '_' + txt + '.' + ext
    return folder + leaf + '_' + txt
1066    
def getRequirements(self, nj=[]):
    """
    Return the job requirements expression to add to jdl files.

    The expression is the ' && ' conjunction of: a software tag for
    self.version (when set), a software tag for self.executable_arch (when
    set), the outbound-IP requirement and, for the "glitecoll" scheduler,
    a CE state requirement.  Clauses are joined, so the result never
    starts with a dangling ' && ' when self.version is empty.

    nj -- not used; the default value is never modified.
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    if self.executable_arch:
        clauses.append('Member("VO-cms-' + self.executable_arch +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')

    req = ' && '.join(clauses)
    if common.scheduler.name() == "glitecoll":
        req += ' && other.GlueCEStateStatus == "Production" '
    return req
1086 gutsche 1.3
def configFilename(self):
    """
    Return the config filename: self.name() with extension '.py' for
    CMSSW 2_1 and later, '.cfg' for older versions.
    """
    # FUTURE: Can remove cfg mode for CMSSW >= 2_1_x
    uses_python_cfg = (self.CMSSW_major >= 3) or (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1)
    if not uses_python_cfg:
        return self.name()+'.cfg'
    return self.name()+'.py'
1094 gutsche 1.3
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the OSG branch of the job script environment setup: export
    SCRAM_ARCH from self.executable_arch and source
    $OSG_APP/cmssoft/cms/cmsset_default.sh for self.version, leaving the
    job with exit code 10020 when that file is missing.
    """
    arch = self.executable_arch
    chunks = [
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n',
        ' else\n',
        ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    ]
    return ''.join(chunks)
1118 ewv 1.131
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the LCG branch of the job script environment setup: export
    SCRAM_ARCH and BUILD_ARCH from self.executable_arch and source
    $VO_CMS_SW_DIR/cmsset_default.sh, leaving the job with exit code 10031,
    10020 or 10032 when the directory, the file or the sourcing fails.
    """
    arch = self.executable_arch
    head = '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n'
    head += ' echo ">>> setup CMS LCG environment:"\n'
    head += ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n'
    head += ' export SCRAM_ARCH='+arch+'\n'
    head += ' export BUILD_ARCH='+arch+'\n'
    tail = (
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n'
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n'
        ' job_exit_code=10031\n'
        ' func_exit\n'
        ' else\n'
        ' echo "Sourcing environment... "\n'
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n'
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n'
        ' job_exit_code=10020\n'
        ' func_exit\n'
        ' fi\n'
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n'
        ' result=$?\n'
        ' if [ $result -ne 0 ]; then\n'
        ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
        ' job_exit_code=10032\n'
        ' func_exit\n'
        ' fi\n'
        ' fi\n'
        ' \n'
        ' echo "==> setup cms environment ok"\n'
    )
    return head + tail
1152 gutsche 1.5
def modifyReport(self, nj):
    """
    Return the part of the script that modifies the FrameworkJob Report.

    Only the header comment is returned unless the 'USER.publish_data'
    entry of self.cfg_params equals 1; in that case the script runs
    ProdCommon's ModifyJobReport.py on crab_fjr_$NJob.xml.

    nj -- job index (not used here; the script uses $NJob).
    """
    script = '\n#Written by cms_cmssw::modifyReport\n'
    if int(self.cfg_params.get('USER.publish_data', 0)) != 1:
        return script

    processedDataset = self.cfg_params['USER.publish_data_name']
    lfn_base = LFNBase(processedDataset)

    body = [
        'if [ $copy_exit_status -eq 0 ]; then\n',
        ' FOR_LFN=%s_${PSETHASH}/\n' % (lfn_base),
        'else\n',
        ' FOR_LFN=/copy_problems/ \n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        'fi\n',
        'echo ">>> Modify Job Report:" \n',
        'chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n',
        'ProcessedDataset='+processedDataset+'\n',
        'echo "ProcessedDataset = $ProcessedDataset"\n',
        'echo "SE = $SE"\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'echo "FOR_LFN = $FOR_LFN" \n',
        'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n',
        'echo "$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n',
        '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH\n',
        'modifyReport_result=$?\n',
        'if [ $modifyReport_result -ne 0 ]; then\n',
        ' modifyReport_result=70500\n',
        ' job_exit_code=$modifyReport_result\n',
        ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "WARNING: Problem with ModifyJobReport"\n',
        'else\n',
        ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        'fi\n',
    ]
    return script + ''.join(body)
1191 fanzago 1.99
def wsParseFJR(self):
    """
    Return the script fragment that parses the FrameworkJobReport
    (crab_fjr_$NJob.xml) with parseCrabFjr.py to obtain the exit status of
    the executable and, when self.datasetPath is set, to verify that the
    list of processed files matches the list of input files.

    The fragment ends by storing the result in job_exit_code.
    """
    parser = 'python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml'

    txt = '\n#Written by cms_cmssw::wsParseFJR\n'
    txt += 'echo ">>> Parse FrameworkJobReport crab_fjr.xml"\n'
    txt += 'if [ -s $RUNTIME_AREA/crab_fjr_$NJob.xml ]; then\n'
    txt += ' if [ -s $RUNTIME_AREA/parseCrabFjr.py ]; then\n'
    txt += ' cmd_out=`'+parser+' --dashboard $MonitorID,$MonitorJobID '+self.debugWrap+'`\n'
    if self.debug_wrapper:
        txt += ' echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n'
    txt += ' executable_exit_status=`'+parser+' --exitcode`\n'
    txt += ' if [ $executable_exit_status -eq 50115 ];then\n'
    txt += ' echo ">>> crab_fjr.xml contents: "\n'
    # the report file is crab_fjr_$NJob.xml: the '$' is required so that
    # the shell expands the job number
    txt += ' cat $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
    txt += ' echo "Wrong FrameworkJobReport --> does not contain useful info. ExitStatus: $executable_exit_status"\n'
    txt += ' elif [ $executable_exit_status -eq -999 ];then\n'
    txt += ' echo "ExitStatus from FrameworkJobReport not available. not available. Using exit code of executable from command line."\n'
    txt += ' else\n'
    txt += ' echo "Extracted ExitStatus from FrameworkJobReport parsing output: $executable_exit_status"\n'
    txt += ' fi\n'
    txt += ' else\n'
    txt += ' echo "CRAB python script to parse CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
    txt += ' fi\n'
    #### Patch to check input data reading for CMSSW16x Hopefully we-ll remove it asap

    if self.datasetPath:
        # VERIFY PROCESSED DATA
        txt += ' if [ $executable_exit_status -eq 0 ];then\n'
        txt += ' echo ">>> Verify list of processed files:"\n'
        txt += ' echo $InputFiles |tr -d \'\\\\\' |tr \',\' \'\\n\'|tr -d \'"\' > input-files.txt\n'
        # no backticks here: the parser output itself must be redirected to
        # the file, not executed as a command
        txt += ' '+parser+' --lfn > processed-files.txt\n'
        txt += ' cat input-files.txt | sort | uniq > tmp.txt\n'
        txt += ' mv tmp.txt input-files.txt\n'
        txt += ' echo "cat input-files.txt"\n'
        txt += ' echo "----------------------"\n'
        txt += ' cat input-files.txt\n'
        txt += ' cat processed-files.txt | sort | uniq > tmp.txt\n'
        txt += ' mv tmp.txt processed-files.txt\n'
        txt += ' echo "----------------------"\n'
        txt += ' echo "cat processed-files.txt"\n'
        txt += ' echo "----------------------"\n'
        txt += ' cat processed-files.txt\n'
        txt += ' echo "----------------------"\n'
        txt += ' diff -q input-files.txt processed-files.txt\n'
        txt += ' fileverify_status=$?\n'
        txt += ' if [ $fileverify_status -ne 0 ]; then\n'
        txt += ' executable_exit_status=30001\n'
        txt += ' echo "ERROR ==> not all input files processed"\n'
        txt += ' echo " ==> list of processed files from crab_fjr.xml differs from list in pset.cfg"\n'
        txt += ' echo " ==> diff input-files.txt processed-files.txt"\n'
        txt += ' fi\n'
        txt += ' fi\n'
    txt += '\n'
    txt += 'else\n'
    txt += ' echo "CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
    txt += 'fi\n'
    txt += '\n'
    txt += 'echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
    txt += 'echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
    txt += 'job_exit_code=$executable_exit_status\n'

    return txt
1255    
def setParam_(self, param, value):
    """
    Store value under the key param in the self._params dictionary.
    """
    self._params.update({param: value})
1258    
def getParams(self):
    """
    Return the self._params dictionary itself (not a copy).
    """
    stored = self._params
    return stored
1261 gutsche 1.8
def uniquelist(self, old):
    """
    Remove duplicates from a list.

    Returns a new list with the (hashable) elements of old, each kept once,
    in the order of their first occurrence.  The result is always a real
    list, independent of what dict.keys() returns on the running Python.
    """
    seen = {}
    unique = []
    for element in old:
        if element not in seen:
            seen[element] = 0
            unique.append(element)
    return unique
1270 mcinquil 1.121
def outList(self):
    """
    Return the script fragment that lists the files expected in the output
    sandbox and exports them as the shell variable filesToCheck.

    With self.return_data == 1 the list holds the numbered names of
    self.output_file and self.output_file_sandbox, otherwise only those of
    self.output_file_sandbox; the job stdout and stderr names are always
    appended.
    """
    if self.return_data == 1:
        produced = self.output_file + self.output_file_sandbox
    else:
        produced = self.output_file_sandbox

    listOutFiles = [self.numberFile_(name, '$NJob') for name in produced]
    listOutFiles.append('CMSSW_$NJob.stdout')
    listOutFiles.append('CMSSW_$NJob.stderr')

    # str.join instead of the deprecated string.join module function
    joined = ' '.join(listOutFiles)
    txt = ''
    txt += 'echo ">>> list of expected files on output sandbox"\n'
    txt += 'echo "output files: '+joined+'"\n'
    txt += 'filesToCheck="'+joined+'"\n'
    txt += 'export filesToCheck\n'
    return txt