ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.187
Committed: Mon May 26 16:53:39 2008 UTC (16 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.186: +12 -47 lines
Log Message:
removed some confusing comments

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Read and validate the CMSSW/USER configuration, perform the job
    splitting and prepare the user tarball.

    cfg_params: dictionary-like configuration keyed by 'SECTION.parameter'
                (e.g. 'CMSSW.datasetpath', 'USER.script_exe').
    ncjobs:     number of jobs requested to be created ('all' or a number);
                used to limit the splitting.
    Raises CrabException for missing/invalid configuration.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''
    self.datasetPath = ''

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    # Split a version string of the form CMSSW_<major>_<minor>_<patch>...
    self.version = self.scram.getSWVersion()
    version_array = self.version.split('_')
    self.CMSSW_major = 0
    self.CMSSW_minor = 0
    self.CMSSW_patch = 0
    try:
        self.CMSSW_major = int(version_array[1])
        self.CMSSW_minor = int(version_array[2])
        self.CMSSW_patch = int(version_array[3])
    except (IndexError, ValueError):
        # too few '_' fields, or a field that is not an integer
        msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!"
        raise CrabException(msg)

    ### collect Data cards

    if not cfg_params.has_key('CMSSW.datasetpath'):
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        # no input dataset (e.g. event generation)
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    self.dataTiers = []

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if not cfg_params.has_key('CMSSW.pset'):
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        # pset = none: the job runs a user script instead of cmsRun+PSet
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp:
        tmpOutFiles = string.split(tmp,',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outFile in tmpOutFiles:
            outFile = string.strip(outFile)
            # skip empty entries, e.g. from a trailing comma
            if outFile:
                self.output_file.append(outFile)
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe:
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(string.strip(self.scriptExe))

    # With neither dataset nor PSet, a user script is mandatory.
    # scriptExe is None (not '') when USER.script_exe is absent.
    if self.datasetPath is None and self.pset is None and not self.scriptExe:
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    if cfg_params.has_key('USER.additional_input_files'):
        tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
        for tmp in tmpAddFiles:
            tmp = string.strip(tmp)
            if not tmp:
                # tolerate empty entries, e.g. from a trailing comma
                continue
            # relative names/patterns are resolved against the current directory
            dirname = ''
            if not tmp.startswith("/"):
                dirname = "."
            files = []
            if tmp.find("*") > -1:
                files = glob.glob(os.path.join(dirname, tmp))
                if len(files) == 0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                files.append(tmp)
            for fileName in files:
                if not os.path.exists(fileName):
                    raise CrabException("Additional input file not found: "+fileName)
                self.additional_inbox_files.append(string.strip(fileName))
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if cfg_params.has_key('CMSSW.events_per_job'):
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if cfg_params.has_key('CMSSW.number_of_jobs'):
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    if cfg_params.has_key('CMSSW.total_number_of_events'):
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        # cmsRun jobs: the third splitting parameter is derived from the other two
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if self.selectNumberOfJobs == 0:
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## New method of dealing with seeds
    # str.strip() returns a new string, so the stripped name must be stored
    self.incrementSeeds = []
    self.preserveSeeds = []
    if cfg_params.has_key('CMSSW.preserve_seeds'):
        for seedName in cfg_params['CMSSW.preserve_seeds'].split(','):
            seedName = seedName.strip()
            if seedName:
                self.preserveSeeds.append(seedName)
    if cfg_params.has_key('CMSSW.increment_seeds'):
        for seedName in cfg_params['CMSSW.increment_seeds'].split(','):
            seedName = seedName.strip()
            if seedName:
                self.incrementSeeds.append(seedName)

    ## Old method of dealing with seeds
    ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
    ## remove
    self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
    if self.sourceSeed:
        print("pythia_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
        self.incrementSeeds.append('sourceSeed')
        self.incrementSeeds.append('theSource')

    self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
    if self.sourceSeedVtx:
        print("vtx_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
        self.incrementSeeds.append('VtxSmeared')

    self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
    if self.sourceSeedG4:
        print("g4_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
        self.incrementSeeds.append('g4SimHits')

    self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
    if self.sourceSeedMix:
        print("mix_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
        self.incrementSeeds.append('mix')

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    if self.pset is not None:
        import PsetManipulator as pp
        PsetEdit = pp.PsetManipulator(self.pset)

    # Copy/return
    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents = 0 # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths = {} # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination = [] # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            # Add FrameworkJobReport to parameter-set, set max events.
            # Reset later for data jobs by writeCFG which does all modifications
            PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5
            PsetEdit.maxEvent(self.eventsPerJob)
            PsetEdit.psetWriter(self.configFilename())
        except Exception:
            msg='Error while manipulating ParameterSet: exiting...'
            raise CrabException(msg)
    self.tgzNameWithPath = self.getTarBall(self.executable)
259 gutsche 1.3
260 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
261    
262 slacapra 1.86 import DataDiscovery
263     import DataLocation
264 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
265    
266     datasetPath=self.datasetPath
267    
268 slacapra 1.1 ## Contact the DBS
269 gutsche 1.92 common.logger.message("Contacting Data Discovery Services ...")
270 slacapra 1.1 try:
271 slacapra 1.137 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
272 slacapra 1.1 self.pubdata.fetchDBSInfo()
273    
274 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
275 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
276     raise CrabException(msg)
277 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
278 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
279     raise CrabException(msg)
280 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
281 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
282 slacapra 1.1 raise CrabException(msg)
283    
284 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
285 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
286     self.eventsbyfile=self.pubdata.getEventsPerFile()
287 gutsche 1.3
288 slacapra 1.1 ## get max number of events
289 spiga 1.187 self.maxEvents=self.pubdata.getMaxEvents()
290 slacapra 1.1
291     ## Contact the DLS and build a list of sites hosting the fileblocks
292     try:
293 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
294 gutsche 1.6 dataloc.fetchDLSInfo()
295 slacapra 1.41 except DataLocation.DataLocationError , ex:
296 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
297     raise CrabException(msg)
298 ewv 1.131
299 slacapra 1.1
300 gutsche 1.35 sites = dataloc.getSites()
301     allSites = []
302     listSites = sites.values()
303 slacapra 1.63 for listSite in listSites:
304     for oneSite in listSite:
305 gutsche 1.35 allSites.append(oneSite)
306     allSites = self.uniquelist(allSites)
307 gutsche 1.3
308 gutsche 1.92 # screen output
309     common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
310    
311 gutsche 1.35 return sites
312 ewv 1.131
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs (and self.ncjobs) - Total # of jobs
          self.list_of_args - per job: [quoted file list, #events (-1 = all
                              remaining events of its files), #events to skip]
    RAISES: CrabException if fewer than two of total_number_of_events /
            events_per_job / number_of_jobs are set, or if events_per_job or
            number_of_jobs is not positive.
    """
    # ---- Validate the splitting configuration ---- #
    # Fewer than two parameters leave the totals undefined.
    nSelected = self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs
    if nSelected < 2:
        msg = 'Must define two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)
    # A non-positive per-job size would make the splitting loop below never advance
    if self.selectEventsPerJob and self.eventsPerJob < 1:
        raise CrabException('events_per_job must be a positive number.')
    if self.selectNumberOfJobs and self.theNumberOfJobs < 1:
        raise CrabException('number_of_jobs must be a positive number.')

    # ---- Handle the possible job splitting configurations ---- #
    totalEventsRequested = None
    eventsPerJobRequested = None
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if totalEventsRequested == -1:
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif totalEventsRequested > self.maxEvents:
        eventsRemaining = self.maxEvents
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        # at least one event per job, even with fewer events than jobs
        eventsPerJobRequested = max(1, int(eventsRemaining/self.theNumberOfJobs))

    if self.selectNumberOfJobs:
        common.logger.message("May not create the exact number_of_jobs requested.")

    if self.ncjobs == 'all':
        totalNumberOfJobs = 999999999
    else:
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # block -> list of (1-based) job numbers created from that block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events ---- #
    while (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if self.eventsbyblock.has_key(block):
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if numFilesInBlock <= 0:
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = ""
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next loop iteration should touch a new file
        newFile = 1
        # number of events of the first file that the current job must skip
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block ---- #
        while (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs):
            file = files[fileCount]
            if newFile:
                try:
                    numEventsInFile = self.eventsbyfile[file]
                except KeyError:
                    common.logger.message("File "+str(file)+" has unknown number of events: skipping")
                else:
                    common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
                    filesEventCount += numEventsInFile
                    # Add file to current job; entries are separated by '\,'
                    parString += '\\\"' + file + '\\\"\\,'
                    newFile = 0

            eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
            availableEvents = filesEventCount - jobSkipEventCount
            isLastFile = (fileCount == numFilesInBlock - 1)

            if availableEvents < eventsPerJobRequested and not isLastFile:
                # not enough events yet: add the next file to the current job
                newFile = 1
                fileCount += 1
                continue

            if availableEvents < eventsPerJobRequested:
                # last file in block: the job takes all remaining events of its files
                jobEvents = -1
                usedEvents = availableEvents
                note = " (last file in block)."
            else:
                jobEvents = eventsPerJobRequested
                usedEvents = eventsPerJobRequested
                note = "."

            # close the job (never emit a job without any input file)
            if parString:
                fullString = parString[:-2]   # drop the trailing '\,'
                list_of_lists.append([fullString, str(jobEvents), str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(usedEvents)+" events"+note)
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[-1]))
                jobsOfBlock[block].append(jobCount+1)
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + usedEvents
                eventsRemaining = eventsRemaining - usedEvents

            if availableEvents > eventsPerJobRequested:
                # the current file has events left: the next job starts in this
                # same file, skipping the events already assigned to this job
                jobSkipEventCount = eventsPerJobRequested - (availableEvents - self.eventsbyfile[file])
                # keep only the current (last) file for the next job
                filesEventCount = self.eventsbyfile[file]
                parString = '\\\"' + file + '\\\"\\,'
            else:
                # all events of the job's files are used: move to the next file
                jobSkipEventCount = 0
                parString = ""
                filesEventCount = 0
                newFile = 1
                fileCount += 1

    self.ncjobs = self.total_number_of_jobs = jobCount
    if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep track of blocks with no allowed site, to print a warning at the end
    noSiteJobRanges = []
    noSiteBlockNumbers = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            allowedSites = self.blackWhiteListParser.checkWhiteList(
                self.blackWhiteListParser.checkBlackList(blockSites[block], block), block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter, spanRanges(jobsOfBlock[block]),
                                                                  ','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteJobRanges.append(spanRanges(jobsOfBlock[block]))
                noSiteBlockNumbers.append(blockCounter)

    common.logger.message(screenOutput)
    if noSiteJobRanges and noSiteBlockNumbers:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        msg += ' ' + ', '.join([str(b) for b in noSiteBlockNumbers])
        msg += '\n Related jobs:\n '
        msg += ','.join([str(r) for r in noSiteJobRanges])
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        if self.cfg_params.has_key('EDG.se_white_list'):
            msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'
        if self.cfg_params.has_key('EDG.ce_white_list'):
            msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
537    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of events per job, for tasks
    without an input dataset.

    Derives the missing one of events_per_job / number_of_jobs /
    total_number_of_events from the two that were configured. Sets
    self.total_number_of_jobs (and self.total_number_of_events or
    self.eventsPerJob when derived), appends one empty destination per job
    to self.jobDestination and fills self.list_of_args.
    Raises CrabException for a negative total number of events, or a
    non-positive events_per_job / number_of_jobs (which would otherwise
    divide by zero).
    """
    common.logger.debug(5,'Splitting per events')

    if self.selectEventsPerJob:
        common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if self.selectNumberOfJobs:
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if self.selectTotalNumberEvents:
        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if self.selectEventsPerJob:
        if self.eventsPerJob <= 0:
            raise CrabException('events_per_job must be a positive number.')
        if self.selectTotalNumberEvents:
            self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

    elif self.selectNumberOfJobs:
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive number.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder? (integer division may drop events)
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # argument is seed number.$i
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        args = []
        if self.firstRun:
            ## pythia first run: string concatenation of first_run and job index
            args.append(str(self.firstRun)+str(i))
        self.list_of_args.append(args)

    return
589    
590 spiga 1.42
def jobSplittingForScript(self):
    """
    Split a script-only task (no dataset, no PSet) purely by number_of_jobs.

    Sets self.total_number_of_jobs to self.theNumberOfJobs, appends one
    empty destination ([""]) per job to self.jobDestination and sets
    self.list_of_args so that job i gets the single argument str(i).
    """
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.total_number_of_jobs = self.theNumberOfJobs
    njobs = self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(njobs))
    common.logger.message(str(njobs)+' jobs can be created')

    # no input data: every job gets an empty destination list
    self.jobDestination.extend([[""] for _ in range(njobs)])
    # the only argument of each job is its 0-based index
    self.list_of_args = [[str(idx)] for idx in range(njobs)]
    return
610    
def split(self, jobParams):
    """
    Copy the per-job argument lists into jobParams and register each job's
    arguments and destination with common._db.updateJob_.

    jobParams: list extended in place by self.total_number_of_jobs entries;
               entry i is set to self.list_of_args[i].
    Sets self.argsList to the argument count of the first job (job number
    plus its splitting arguments) when jobParams is not empty.
    """
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    for i in range(njobs):
        jobParams.append("")

    listID = []
    listField = []
    for job in range(njobs):
        jobParams[job] = arglist[job]
        listID.append(job+1)
        # job number first, then the splitting arguments, space separated
        argu = ' '.join(jobParams[job])
        job_ToSave = {}
        job_ToSave['arguments'] = str(job+1)+' '+argu
        job_ToSave['dlsDestination'] = self.jobDestination[job]
        listField.append(job_ToSave)
        msg = "Job "+str(job+1)+" Arguments: "+str(job+1)+" "+argu+"\n" \
              +" Destination: "+str(self.jobDestination[job])
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField)
    # with no jobs at all jobParams[0] does not exist
    if jobParams:
        self.argsList = (len(jobParams[0])+1)

    return
639 ewv 1.131
def numberOfJobs(self):
    """Return self.total_number_of_jobs, the job count set by the splitting."""
    njobs = self.total_number_of_jobs
    return njobs
642    
def getTarBall(self, exe):
    """
    Return the path of the tarball with user libraries and executable.

    The path is common.work_space.pathForTgz() + 'share/' + self.tgz_name
    and is also stored in self.tgzNameWithPath. An existing file is reused
    and its path returned unchanged; otherwise self.buildTar_(exe) is called
    first and the whitespace-stripped path is returned.
    """
    self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    if not os.path.exists(self.tgzNameWithPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return string.strip(self.tgzNameWithPath)
    return self.tgzNameWithPath
655    
def buildTar_(self, executable):
    """
    Build the gzipped input-sandbox tarball at self.tgzNameWithPath.

    Packed into the tarball:
      - the user executable, when self.executable is set and the file found
        by Scram is not under the release top (i.e. it is private);
      - lib/ and module/ of the local Scram project area, when present;
      - every data/ directory found below the project area;
      - the job parameter-set, when self.pset is not None;
      - the ProdCommon package from $CRABDIR, when present;
      - CRAB helper scripts from $CRABDIR/python;
      - every file of self.additional_inbox_files, under its basename.

    Nothing is built when the working area is the release top itself.

    Side effect: sets self.dataExist to True when at least one data/
    directory was packed and to False otherwise (also on the early return).

    Raises:
        CrabException: the executable cannot be found, the tarball cannot
            be written, or it is larger than self.MaxTarBallSize MB.
    """
    # Initialise before any early return: wsBuildExe reads this flag.
    self.dataExist = False

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
        return

    import tarfile
    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if (self.executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    # distinguish case when script is in user project area or given by full path somewhere else
                    if exeWithPath.find(path) >= 0 :
                        exe = exeWithPath.replace(path,'')
                        tar.add(path+exe,exe)
                    else :
                        tar.add(exeWithPath,os.path.basename(executable))
                # else: the exe is from the release, it will be found on the WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen=len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    self.dataExist=True
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ### CMSSW ParameterSet
            if self.pset is not None:
                cfg_file = common.work_space.jobDir()+self.configFilename()
                tar.add(cfg_file,self.configFilename())
                common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ## Add ProdCommon dir to tar
            prodcommonDir = 'ProdCommon'
            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
            if os.path.isdir(prodcommonPath):
                tar.add(prodcommonPath,prodcommonDir)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### ML stuff
            ML_file_list=['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
            path=os.environ['CRABDIR'] + '/python/'
            for fname in ML_file_list:
                tar.add(path+fname,fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### Utils
            Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']
            for fname in Utils_file_list:
                tar.add(path+fname,fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### AdditionalFiles (stored under their basename)
            for fname in self.additional_inbox_files:
                tar.add(fname,os.path.basename(fname))
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # Always release the file handle, also when packing fails.
            tar.close()
    except CrabException:
        # Keep specific messages (e.g. missing user executable) intact.
        raise
    except Exception:
        raise CrabException('Could not create tar-ball')

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
756 slacapra 1.97
def wsSetupEnvironment(self, nj=0):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The generated shell fragment:
      - sets up the CMS software environment (LCG or OSG flavour, chosen at
        run time from $middleware; on OSG a private $WORKING_DIR is created
        and entered first);
      - creates and enters the CMSSW project area with scram and evaluates
        its runtime environment;
      - aborts (job_exit_code=50113) when fewer than self.argsList
        arguments were passed to the script;
      - sets DatasetPath, PrimaryDataset, DataTier and ApplicationFamily;
      - when a parameter set is used, copies it from $RUNTIME_AREA, exports
        the per-job arguments (input files / events for dataset jobs, seeds
        and optional first run for generation jobs), renames it to pset.py
        (CMSSW >= 2_1_x) or pset.cfg, prints it and stores its
        edmConfigHash in $PSETHASH.
    """
    # Same version threshold as configFilename().
    if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
        psetName = 'pset.py'
    else:
        psetName = 'pset.cfg'
    # Prepare JobType-independent part
    txt = '\n#Written by cms_cmssw::wsSetupEnvironment\n'
    txt += 'echo ">>> setup environment"\n'
    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' if [ ! $? == 0 ] ;then\n'
    txt += ' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' job_exit_code=10016\n'
    txt += ' func_exit\n'
    txt += ' fi\n'
    txt += ' echo ">>> Created working directory: $WORKING_DIR"\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += ' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo ">>> specific cmssw setup environment:"\n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' job_exit_code=10034\n'
    txt += ' func_exit\n'
    txt += 'fi \n'
    txt += 'cd '+self.version+'\n'
    txt += 'SOFTWARE_DIR=`pwd`\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'if [ $? != 0 ] ; then\n'
    txt += ' echo "ERROR ==> Problem with the command: "\n'
    txt += ' echo "eval \\`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \\` at `hostname`"\n'
    txt += ' job_exit_code=10034\n'
    txt += ' func_exit\n'
    txt += 'fi \n'
    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt "+str(self.argsList)+" ]\n"
    txt += "then\n"
    # Report the actual argument count (the old message printed a stray
    # '+N+' token instead).
    txt += ' echo "ERROR ==> Too few arguments: $nargs (expected at least '+str(self.argsList)+')"\n'
    txt += ' job_exit_code=50113\n'
    txt += " func_exit\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    if (self.datasetPath):
        txt += '\n'
        txt += 'DatasetPath='+self.datasetPath+'\n'

        # NOTE(review): assumes datasetPath looks like /A/B/... and that
        # field 1 is the primary dataset and field 2 the data tier --
        # confirm against the DBS path layout in use.
        datasetpath_split = self.datasetPath.split("/")

        txt += 'PrimaryDataset='+datasetpath_split[1]+'\n'
        txt += 'DataTier='+datasetpath_split[2]+'\n'
        txt += 'ApplicationFamily=cmsRun\n'

    else:
        txt += 'DatasetPath=MCDataTier\n'
        txt += 'PrimaryDataset=null\n'
        txt += 'DataTier=null\n'
        txt += 'ApplicationFamily=MCDataTier\n'
    if self.pset is not None:
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        txt += 'cp $RUNTIME_AREA/'+pset+' .\n'
        if (self.datasetPath): # standard job
            txt += 'InputFiles=${args[1]}; export InputFiles\n'
            txt += 'MaxEvents=${args[2]}; export MaxEvents\n'
            txt += 'SkipEvents=${args[3]}; export SkipEvents\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
        else: # pythia like job
            txt += 'PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n'
            txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
            txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
            txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
            if (self.firstRun):
                txt += 'FirstRun=${args[1]}; export FirstRun\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'

        txt += 'mv -f ' + pset + ' ' + psetName + '\n'

    if self.pset is not None:
        # FUTURE: Can simply for 2_1_x and higher
        txt += '\n'
        txt += 'echo "***** cat ' + psetName + ' *********"\n'
        txt += 'cat ' + psetName + '\n'
        txt += 'echo "****** end ' + psetName + ' ********"\n'
        txt += '\n'
        txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n'
        txt += 'echo "PSETHASH = $PSETHASH" \n'
        txt += '\n'
    return txt
871 slacapra 1.176
def wsUntarSoftware(self, nj=0):
    """
    Return the job-script fragment that unpacks the input-sandbox tarball.

    When self.tgzNameWithPath exists locally, the fragment extracts it in
    the current directory from $RUNTIME_AREA, aborts through func_exit with
    tar's exit code on failure, and prepends $RUNTIME_AREA/ProdCommon to
    PYTHONPATH. Otherwise only the header comment is returned.
    nj is accepted for interface compatibility and is not used.
    """
    txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'

    if os.path.isfile(self.tgzNameWithPath):
        tgz = os.path.basename(self.tgzNameWithPath)
        txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+tgz+' :" \n'
        txt += 'tar xzvf $RUNTIME_AREA/'+tgz+'\n'
        # Capture tar's exit status immediately: any command in between
        # (such as the listing below) would overwrite $?.
        txt += 'untar_status=$? \n'
        txt += 'ls -Al \n'
        txt += 'if [ $untar_status -ne 0 ]; then \n'
        txt += ' echo "ERROR ==> Untarring .tgz file failed"\n'
        txt += ' job_exit_code=$untar_status\n'
        txt += ' func_exit\n'
        txt += 'else \n'
        txt += ' echo "Successful untar" \n'
        txt += 'fi \n'
        txt += '\n'
        txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
        txt += 'if [ -z "$PYTHONPATH" ]; then\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
        txt += 'else\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
        txt += 'fi\n'
        # Print the resulting path in both branches.
        txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
        txt += '\n'

    return txt
905 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Return the job-script fragment that moves the untarred user software
    from $RUNTIME_AREA into the current CMSSW project area.

    Moved: lib/ and module/ (replacing the local ones), src/ when data
    directories were packed (self.dataExist), every additional input file
    and ProdCommon/, which is then prepended to PYTHONPATH.
    nj is accepted for interface compatibility and is not used.
    """
    txt = '\n#Written by cms_cmssw::wsBuildExe\n'
    txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n'

    txt += 'rm -r lib/ module/ \n'
    txt += 'mv $RUNTIME_AREA/lib/ . \n'
    txt += 'mv $RUNTIME_AREA/module/ . \n'
    # dataExist is set by buildTar_; when the tarball was not built in this
    # session the flag is missing, so default to False (keep local src/).
    if getattr(self, 'dataExist', False):
        txt += 'rm -r src/ \n'
        txt += 'mv $RUNTIME_AREA/src/ . \n'
    for fname in self.additional_inbox_files:
        # buildTar_ stores additional files under their basename, which is
        # therefore their location in $RUNTIME_AREA after untarring.
        txt += 'mv $RUNTIME_AREA/'+os.path.basename(fname)+' . \n'
    txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'

    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
    txt += 'fi\n'
    # Print the resulting path in both branches.
    txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
    txt += '\n'

    return txt
935 slacapra 1.1
def modifySteeringCards(self, nj):
    """
    Hook for rewriting the user's card into the share dir.

    CMSSW jobs need no per-job card modification, so this does nothing and
    returns None.
    """
    return None
941 ewv 1.131
def executableName(self):
    """
    Return the command that launches the job payload: "sh " when a user
    script (self.scriptExe) is configured, self.executable otherwise.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "
947 slacapra 1.1
def executableArgs(self):
    """
    Return the argument string for the job executable.

    For a user script (self.scriptExe) this is the script name followed by
    the job-number placeholder. For cmsRun it requests a framework job
    report (CMSSW >= 1_5_x) and names the parameter-set file: pset.py for
    CMSSW >= 2_1_x, pset.cfg otherwise.
    """
    if self.scriptExe:
        return self.scriptExe + " $NJob"

    major = self.CMSSW_major
    minor = self.CMSSW_minor
    ex_args = ""
    # FUTURE: This tests the CMSSW version. Can remove code as versions deprecated
    # Framework job report
    if (major >= 1 and minor >= 5) or (major >= 2):
        ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
    # Type of config file: must use the same threshold as
    # wsSetupEnvironment/configFilename, which produce pset.cfg for 2_0_x.
    if (major >= 2 and minor >= 1) or (major >= 3):
        ex_args += " -p pset.py"
    else:
        ex_args += " -p pset.cfg"
    return ex_args
964 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    The list holds the input tarball (only if it exists locally) followed
    by the job wrapper script from the work space 'job/' directory.
    nj is not used.
    """
    tarball = self.tgzNameWithPath
    if os.path.isfile(tarball):
        sandbox = [tarball]
    else:
        sandbox = []
    wrapperName = os.path.basename(str(common._db.queryTask('scriptName')))
    sandbox.append(common.work_space.pathForTgz() + 'job/' + wrapperName)
    return sandbox
975    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Every user-declared output file (self.output_file followed by
    self.output_file_sandbox) is numbered with the 1-based job index nj+1.
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
987    
def prepareSteeringCards(self):
    """
    Hook for initial modifications of the user's steering card file.

    Nothing is required for CMSSW jobs; returns None.
    """
    pass
993    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    Each declared output file (self.output_file) found in the current
    directory is renamed to its job-numbered name (numberFile_ with $NJob),
    and a symlink with the plain name is left in $RUNTIME_AREA. With
    copy_data == 1 the numbered file stays in the current directory;
    otherwise it is moved into $RUNTIME_AREA. A missing file sets
    job_exit_code=60302; for the CONDOR_G scheduler on OSG a dummy output
    file is also written. Finally $file_list is set and the script changes
    directory to $RUNTIME_AREA.
    """
    listing = ('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
               'echo ">>> current directory content:"\n'
               'ls \n'
               '\n')
    chunks = ['\n#Written by cms_cmssw::wsRenameOutput\n', listing]
    numbered_names = []
    for plain in self.output_file:
        numbered = self.numberFile_(plain, '$NJob')
        numbered_names.append(numbered)
        chunks.append('\n')
        chunks.append('# check output file\n')
        chunks.append('if [ -e ./' + plain + ' ] ; then\n')
        if self.copy_data == 1:
            # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            chunks.append(' mv ' + plain + ' ' + numbered + '\n')
            chunks.append(' ln -s `pwd`/' + numbered + ' $RUNTIME_AREA/' + plain + '\n')
        else:
            chunks.append(' mv ' + plain + ' $RUNTIME_AREA/' + numbered + '\n')
            chunks.append(' ln -s $RUNTIME_AREA/' + numbered + ' $RUNTIME_AREA/' + plain + '\n')
        chunks.append('else\n')
        chunks.append(' job_exit_code=60302\n')
        chunks.append(' echo "WARNING: Output file ' + plain + ' not found"\n')
        if common.scheduler.name().upper() == 'CONDOR_G':
            chunks.append(' if [ $middleware == OSG ]; then \n')
            chunks.append(' echo "prepare dummy output file"\n')
            chunks.append(' echo "Processing of job output failed" > $RUNTIME_AREA/' + numbered + '\n')
            chunks.append(' fi \n')
        chunks.append('fi\n')

    chunks.append('file_list="' + ' '.join(numbered_names) + '"\n')
    chunks.append('\n')
    chunks.append(listing)
    chunks.append('cd $RUNTIME_AREA\n')
    chunks.append('echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n')
    return ''.join(chunks)
1038    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of the name ``file``.

    'out.root' -> 'out_<txt>.root', 'a.tar.gz' -> 'a.tar_<txt>.gz';
    a name without any '.' simply gets '_<txt>' appended.
    """
    pieces = file.split(".")
    if len(pieces) == 1:
        return file + '_' + txt
    stem = ".".join(pieces[:-1])
    return stem + '_' + txt + "." + pieces[-1]
1056    
def getRequirements(self, nj=None):
    """
    return job requirements to add to jdl files

    The result joins with ' && ':
      - the CMSSW release tag published by the CE (when self.version is
        set), followed by the architecture tag (when self.executable_arch
        is also set);
      - the outbound-IP requirement;
      - for the "glitecoll" scheduler, a CE-in-production requirement.

    nj is accepted for interface compatibility and is not used (the default
    is None instead of a shared mutable list).
    """
    runtimeEnv = '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version + runtimeEnv)
        if self.executable_arch:
            clauses.append('Member("VO-cms-' + self.executable_arch + runtimeEnv)

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    if common.scheduler.name() == "glitecoll":
        clauses.append('other.GlueCEStateStatus == "Production" ')

    # Joining (rather than always prefixing ' && ') avoids a malformed
    # expression that starts with '&&' when no release tag is known.
    return ' && '.join(clauses)
1076 gutsche 1.3
def configFilename(self):
    """
    Return the parameter-set file name: self.name() plus '.py' for
    CMSSW >= 2_1_x (python configuration) or '.cfg' for older releases.
    """
    # FUTURE: Can remove cfg mode for CMSSW >= 2_1_x
    major = self.CMSSW_major
    usesPython = (major >= 2 and self.CMSSW_minor >= 1) or (major >= 3)
    if usesPython:
        suffix = '.py'
    else:
        suffix = '.cfg'
    return self.name() + suffix
1084 gutsche 1.3
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the OSG flavour of the common CMS environment set-up.

    The fragment exports SCRAM_ARCH=self.executable_arch and sources
    $OSG_APP/cmssoft/cms/cmsset_default.sh with self.version as argument;
    when that file is missing it sets job_exit_code=10020 and calls
    func_exit.
    """
    arch = self.executable_arch
    setup = '$OSG_APP/cmssoft/cms/cmsset_default.sh'
    pieces = [
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH=' + arch + '\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f ' + setup + ' ] ;then\n',
        ' # Use ' + setup + ' to setup cms software\n',
        ' source ' + setup + ' ' + self.version + '\n',
        ' else\n',
        ' echo "ERROR ==> ' + setup + ' file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    ]
    return ''.join(pieces)
1108 ewv 1.131
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the LCG flavour of the common CMS environment set-up.

    The fragment exports SCRAM_ARCH and BUILD_ARCH (both
    self.executable_arch) and sources $VO_CMS_SW_DIR/cmsset_default.sh.
    Failures end the job through func_exit with job_exit_code 10031 (no
    $VO_CMS_SW_DIR), 10020 (setup file missing/empty) or 10032 (sourcing
    failed).
    """
    arch = self.executable_arch
    cmsset = '$VO_CMS_SW_DIR/cmsset_default.sh'
    pieces = [
        '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n',
        ' echo ">>> setup CMS LCG environment:"\n',
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n',
        ' export SCRAM_ARCH=' + arch + '\n',
        ' export BUILD_ARCH=' + arch + '\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n',
        ' job_exit_code=10031\n',
        ' func_exit\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s ' + cmsset + ' ] ;then\n',
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        ' echo "sourcing ' + cmsset + '"\n',
        ' source ' + cmsset + '\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "ERROR ==> problem sourcing ' + cmsset + '"\n',
        ' job_exit_code=10032\n',
        ' func_exit\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "==> setup cms environment ok"\n',
    ]
    return ''.join(pieces)
1142 gutsche 1.5
def modifyReport(self, nj):
    """
    Return the job-script fragment that rewrites the framework job report.

    Only when USER.publish_data is 1: sets FOR_LFN from LFNBase of
    USER.publish_data_name plus $PSETHASH (or /copy_problems/, clearing SE
    and SE_PATH, when the stage-out failed) and runs ProdCommon's
    ModifyJobReport.py on crab_fjr_$NJob.xml. A failure sets
    job_exit_code=70500; on success the new report replaces the old one.
    Otherwise only the header comment is returned. nj is not used.
    """
    header = '\n#Written by cms_cmssw::modifyReport\n'
    if int(self.cfg_params.get('USER.publish_data', 0)) != 1:
        return header

    processedDataset = self.cfg_params['USER.publish_data_name']
    lfnBase = LFNBase(processedDataset)
    modifier = '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py'
    command = (modifier + ' $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN'
               ' $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily'
               ' $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH')
    pieces = [
        header,
        'if [ $copy_exit_status -eq 0 ]; then\n',
        ' FOR_LFN=%s_${PSETHASH}/\n' % lfnBase,
        'else\n',
        ' FOR_LFN=/copy_problems/ \n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        'fi\n',
        'echo ">>> Modify Job Report:" \n',
        'chmod a+x ' + modifier + '\n',
        'ProcessedDataset=' + processedDataset + '\n',
        'echo "ProcessedDataset = $ProcessedDataset"\n',
        'echo "SE = $SE"\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'echo "FOR_LFN = $FOR_LFN" \n',
        'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n',
        'echo "' + command + '"\n',
        command + '\n',
        'modifyReport_result=$?\n',
        'if [ $modifyReport_result -ne 0 ]; then\n',
        ' modifyReport_result=70500\n',
        ' job_exit_code=$modifyReport_result\n',
        ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "WARNING: Problem with ModifyJobReport"\n',
        'else\n',
        ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        'fi\n',
    ]
    return ''.join(pieces)
1182 fanzago 1.99
def setParam_(self, param, value):
    """Store value under the key param in this job type's parameter map."""
    params = self._params
    params[param] = value
1185    
def getParams(self):
    """Return the parameter map filled via setParam_ (the live dict, not a copy)."""
    params = self._params
    return params
1188 gutsche 1.8
def uniquelist(self, old):
    """
    remove duplicates from a list

    Returns a new list holding the first occurrence of each element of
    ``old``, in the original order, so the result is deterministic and is
    always a real list. Elements must be hashable.
    """
    seen = {}
    result = []
    for item in old:
        if item not in seen:
            seen[item] = True
            result.append(item)
    return result
1197 mcinquil 1.121
def outList(self):
    """
    Return the job-script fragment listing the expected output-sandbox files.

    The list holds the job-numbered sandbox files (preceded by the declared
    output files when return_data == 1) followed by the CMSSW stdout and
    stderr names; it is echoed and exported as $filesToCheck.
    """
    if self.return_data == 1:
        declared = self.output_file + self.output_file_sandbox
    else:
        declared = self.output_file_sandbox
    expected = [self.numberFile_(name, '$NJob') for name in declared]
    expected.extend(['CMSSW_$NJob.stdout', 'CMSSW_$NJob.stderr'])
    joined = ' '.join(expected)
    return ('echo ">>> list of expected files on output sandbox"\n'
            + 'echo "output files: ' + joined + '"\n'
            + 'filesToCheck="' + joined + '"\n'
            + 'export filesToCheck\n')