ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.185
Committed: Wed May 7 14:56:24 2008 UTC (16 years, 11 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre19
Changes since 1.184: +1 -0 lines
Log Message:
Adapt again to new random # rules in CMSSW_2_1_x

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Set up the CMSSW job type from the user configuration.

    Reads and validates the CMSSW.*, USER.* and EDG.* entries of cfg_params,
    runs data discovery/location when an input dataset is given, splits the
    task into jobs, adds the CRAB job report and the max-events setting to the
    parameter set, and finally builds (or reuses) the user tarball.

    ARGUMENTS: cfg_params - CRAB configuration, keyed by 'SECTION.key'
               ncjobs     - number of jobs requested to be created ('all' means
                            no limit); used to limit the splitting
    RAISES:    CrabException on missing or inconsistent configuration
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''          # script use case (Daniele)
    self.datasetPath = ''   # script use case (Daniele)

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    # Split the release name on '_' and read fields 1..3 as major/minor/patch
    self.version = self.scram.getSWVersion()
    version_array = self.version.split('_')
    self.CMSSW_major = 0
    self.CMSSW_minor = 0
    self.CMSSW_patch = 0
    try:
        self.CMSSW_major = int(version_array[1])
        self.CMSSW_minor = int(version_array[2])
        self.CMSSW_patch = int(version_array[3])
    except (IndexError, ValueError):
        msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!"
        raise CrabException(msg)

    ### collect Data cards

    if not cfg_params.has_key('CMSSW.datasetpath'):
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    self.dataTiers = []

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if not cfg_params.has_key('CMSSW.pset'):
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none' :
        if (not os.path.exists(self.pset)):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp :
        tmpOutFiles = tmp.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outFile in tmpOutFiles:
            outFile = outFile.strip()
            # skip empty entries, e.g. the one left by a trailing comma
            if outFile:
                self.output_file.append(outFile)
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe :
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    #CarlosDaniele
    # Without dataset and pset the user script is the only thing to run.
    # scriptExe is None (not '') when USER.script_exe is absent, so test truthiness.
    if self.datasetPath is None and self.pset is None and not self.scriptExe :
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    if cfg_params.has_key('USER.additional_input_files'):
        tmpAddFiles = cfg_params['USER.additional_input_files'].split(',')
        for tmp in tmpAddFiles:
            tmp = tmp.strip()
            # skip empty entries instead of failing on tmp[0]
            if not tmp:
                continue
            dirname = ''
            if not tmp.startswith("/"): dirname = "."
            files = []
            if tmp.find("*") > -1:
                # wildcard: expand relative patterns from the current directory
                files = glob.glob(os.path.join(dirname, tmp))
                if len(files)==0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                files.append(tmp)
            for addFile in files:
                if not os.path.exists(addFile):
                    raise CrabException("Additional input file not found: "+addFile)
                self.additional_inbox_files.append(addFile.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if cfg_params.has_key('CMSSW.events_per_job'):
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if cfg_params.has_key('CMSSW.number_of_jobs'):
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    if cfg_params.has_key('CMSSW.total_number_of_events'):
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None: #CarlosDaniele
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        # script use case: splitting is done by number of jobs only
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## New method of dealing with seeds
    self.incrementSeeds = []
    self.preserveSeeds = []
    if cfg_params.has_key('CMSSW.preserve_seeds'):
        for tmp in cfg_params['CMSSW.preserve_seeds'].split(','):
            tmp = tmp.strip()
            if tmp:
                self.preserveSeeds.append(tmp)
    if cfg_params.has_key('CMSSW.increment_seeds'):
        for tmp in cfg_params['CMSSW.increment_seeds'].split(','):
            tmp = tmp.strip()
            if tmp:
                self.incrementSeeds.append(tmp)

    ## Old method of dealing with seeds
    ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
    ## remove
    # (single-string prints: same output as the former two-item print statements)
    self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
    if self.sourceSeed:
        print("pythia_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
        self.incrementSeeds.append('sourceSeed')
        self.incrementSeeds.append('theSource')

    self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
    if self.sourceSeedVtx:
        print("vtx_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
        self.incrementSeeds.append('VtxSmeared')

    self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
    if self.sourceSeedG4:
        print("g4_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
        self.incrementSeeds.append('g4SimHits')

    self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
    if self.sourceSeedMix:
        print("mix_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
        self.incrementSeeds.append('mix')

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    if self.pset is not None: #CarlosDaniele
        import PsetManipulator as pp
        PsetEdit = pp.PsetManipulator(self.pset) #Daniele Pset

    # Copy/return

    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0          # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}          # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]    # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None: #CarlosDaniele
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None: #CarlosDaniele
        try:
            # Add FrameworkJobReport to parameter-set, set max events.
            # Reset later for data jobs by writeCFG which does all modifications
            PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5
            PsetEdit.maxEvent(self.eventsPerJob)
            PsetEdit.psetWriter(self.configFilename())
        except Exception:
            msg='Error while manipulating ParameterSet: exiting...'
            raise CrabException(msg)
    self.tgzNameWithPath = self.getTarBall(self.executable)
264 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
265    
266 slacapra 1.86 import DataDiscovery
267     import DataLocation
268 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
269    
270     datasetPath=self.datasetPath
271    
272 slacapra 1.1 ## Contact the DBS
273 gutsche 1.92 common.logger.message("Contacting Data Discovery Services ...")
274 slacapra 1.1 try:
275 slacapra 1.137 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
276 slacapra 1.1 self.pubdata.fetchDBSInfo()
277    
278 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
279 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
280     raise CrabException(msg)
281 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
282 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
283     raise CrabException(msg)
284 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
285 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
286 slacapra 1.1 raise CrabException(msg)
287    
288 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
289 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
290     self.eventsbyfile=self.pubdata.getEventsPerFile()
291 gutsche 1.3
292 slacapra 1.1 ## get max number of events
293 ewv 1.131 self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
294 slacapra 1.1
295     ## Contact the DLS and build a list of sites hosting the fileblocks
296     try:
297 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
298 gutsche 1.6 dataloc.fetchDLSInfo()
299 slacapra 1.41 except DataLocation.DataLocationError , ex:
300 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
301     raise CrabException(msg)
302 ewv 1.131
303 slacapra 1.1
304 gutsche 1.35 sites = dataloc.getSites()
305     allSites = []
306     listSites = sites.values()
307 slacapra 1.63 for listSite in listSites:
308     for oneSite in listSite:
309 gutsche 1.35 allSites.append(oneSite)
310     allSites = self.uniquelist(allSites)
311 gutsche 1.3
312 gutsche 1.92 # screen output
313     common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
314    
315 gutsche 1.35 return sites
316 ewv 1.131
317 ewv 1.170 # to Be Removed DS -- BL
318 spiga 1.165 # def setArgsList(self, argsList):
319     # self.argsList = argsList
320 mcinquil 1.140
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. A job never spans more than one block; it may start
    part-way through its first file (events to skip).
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs (self.ncjobs gets the same value)
          self.list_of_args - per job: [quoted file list, max events ('-1' = all
                              remaining), events to skip]
    RAISES: CrabException if number_of_jobs or events_per_job is not positive
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive number')
        # At least one event per job: with fewer events than jobs the integer
        # division gives 0 and the loops below would never consume any event.
        eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

    # a non-positive job size would create jobs forever without consuming events
    if (eventsRemaining > 0 and eventsPerJobRequested <= 0):
        raise CrabException('events_per_job must be a positive number')

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if self.eventsbyblock.has_key(block) :
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = self.filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0

            # ---- New block => New job ---- #
            parString = ""
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                fileName = files[fileCount]
                if newFile :
                    try:
                        numEventsInFile = self.eventsbyfile[fileName]
                    except KeyError:
                        common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")
                    else:
                        common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job: entries are \"name\" followed by \,
                        parString += '\\\"' + fileName + '\\\"\\,'
                        newFile = 0

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                    # if last file in block
                    if ( fileCount == numFilesInBlock-1 ) :
                        # end job using last file, use remaining events in block.
                        # parString is empty when no file since the last job had a
                        # known event count: do not create a job without input files.
                        if parString:
                            fullString = parString[:-2]
                            list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                            common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                            self.jobDestination.append(blockSites[block])
                            common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                            # fill jobs of block dictionary
                            jobsOfBlock[block].append(jobCount+1)
                            # reset counter
                            jobCount = jobCount + 1
                            totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                            eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                        jobSkipEventCount = 0
                        # reset file
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else :
                        # go to next file
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                    # close job and touch new file
                    fullString = parString[:-2]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    fullString = parString[:-2]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[fileName]
                    parString = '\\\"' + fileName + '\\\"\\,'
                # END if
            # END while (iterate over files in the block)
    # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []
    blocksNoSite = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            # sites left after applying the black list, then the white list
            allowedSites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                ','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
                blocksNoSite.append( blockCounter )

    common.logger.message(screenOutput)
    if len(noSiteBlock) > 0 and len(blocksNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        msg += ' ' + ', '.join([str(blockNumber) for blockNumber in blocksNoSite])
        msg += '\n Related jobs:\n '
        msg += ','.join([str(range_jobs) for range_jobs in noSiteBlock])
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        if self.cfg_params.has_key('EDG.se_white_list'):
            msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'
        if self.cfg_params.has_key('EDG.ce_white_list'):
            msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job (no input dataset).

    Two of events_per_job / number_of_jobs / total_number_of_events are given;
    the missing one is derived. Events of total_number_of_events that do not
    fill a whole job are dropped, with a warning.
    SETS: self.total_number_of_jobs, plus self.eventsPerJob or
          self.total_number_of_events when derived; self.jobDestination (one
          empty destination per job); self.list_of_args (per job: [first run]
          when CMSSW.first_run is set, otherwise [])
    RAISES: CrabException for a negative total, or a non-positive
            events_per_job / number_of_jobs used as divisor
    """
    common.logger.debug(5,'Splitting per events')

    if (self.selectEventsPerJob):
        common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if (self.selectNumberOfJobs):
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if (self.selectTotalNumberEvents):
        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            if self.eventsPerJob <= 0:
                raise CrabException('events_per_job must be a positive number')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs =self.theNumberOfJobs
            self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)

    elif (self.selectNumberOfJobs) :
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive number')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # one argument list per job: only the first run number, when configured
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        args=[]
        if (self.firstRun):
            ## pythia first run
            # NOTE(review): string concatenation (e.g. '1' + '12' -> '112'),
            # not firstRun + i; kept as-is - confirm the intended numbering
            args.append(str(self.firstRun)+str(i))
        self.list_of_args.append(args)

    return

def jobSplittingForScript(self):#CarlosDaniele
    """
    Split a script-only task (no dataset, no parameter set) by number of jobs.

    Every job gets an empty destination (no input data, so any site will do)
    and a single argument: its own 0-based index as a string.
    SETS: self.total_number_of_jobs, self.jobDestination, self.list_of_args
    """
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    nJobs = self.theNumberOfJobs
    self.total_number_of_jobs = nJobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    # a fresh [""] per job: an empty destination, no random seed argument
    self.list_of_args = []
    self.jobDestination.extend([[""] for _ in range(nJobs)])
    self.list_of_args.extend([[str(jobIndex)] for jobIndex in range(nJobs)])
    return

def split(self, jobParams):
    """
    Fill jobParams with each job's argument list and register the jobs in the task DB.

    jobParams (expected empty on entry) gets one entry per job, taken from
    self.list_of_args. For every job a record with 'arguments'
    ('<job number> <args...>', job numbers starting at 1) and 'dlsDestination'
    is built; all records are stored with one common._db.updateJob_ call.
    Finally self.argsList is set to the number of arguments per job, job
    number included.
    """
    #### Fabio
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    for i in range(njobs):
        jobParams.append("")

    listID=[]
    listField=[]
    for job in range(njobs):
        jobParams[job] = arglist[job]
        listID.append(job+1)
        job_ToSave ={}
        argu = ' '.join(jobParams[job])
        job_ToSave['arguments']= str(job+1)+' '+argu## new BL--DS
        job_ToSave['dlsDestination']= self.jobDestination[job]## new BL--DS
        listField.append(job_ToSave)
        msg="Job "+str(job)+" Arguments: "+str(job+1)+" "+argu+"\n" \
            +" Destination: "+str(self.jobDestination[job])
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField)## new BL--DS
    ## Pay Attention Here....DS--BL
    # arguments per job: the job number plus the splitting arguments;
    # an empty task has no jobParams[0], so only the job number counts
    if jobParams:
        self.argsList = (len(jobParams[0])+1)
    else:
        self.argsList = 1

    return

655 gutsche 1.3 def numberOfJobs(self):
656     # Fabio
657     return self.total_number_of_jobs
658    
def getTarBall(self, exe):
    """
    Return the path of the TarBall with the user's libraries and executable.

    The path is <work-space tgz dir>share/<tgz_name> and is stored in
    self.tgzNameWithPath. An existing tarball is reused as-is; otherwise it is
    built with buildTar_(exe).
    """
    # Marco. Let's start to use relative path for Boss XML files
    tarPath = common.work_space.pathForTgz() + 'share/' + self.tgz_name
    self.tgzNameWithPath = tarPath

    if not os.path.exists(tarPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # already built: just return it
    return tarPath

677     def buildTar_(self, executable):
678    
679     # First of all declare the user Scram area
680     swArea = self.scram.getSWArea_()
681     #print "swArea = ", swArea
682 slacapra 1.63 # swVersion = self.scram.getSWVersion()
683     # print "swVersion = ", swVersion
684 slacapra 1.1 swReleaseTop = self.scram.getReleaseTop_()
685     #print "swReleaseTop = ", swReleaseTop
686 ewv 1.131
687 slacapra 1.1 ## check if working area is release top
688     if swReleaseTop == '' or swArea == swReleaseTop:
689 afanfani 1.172 common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
690 slacapra 1.1 return
691    
692 slacapra 1.61 import tarfile
693     try: # create tar ball
694     tar = tarfile.open(self.tgzNameWithPath, "w:gz")
695     ## First find the executable
696 slacapra 1.86 if (self.executable != ''):
697 slacapra 1.61 exeWithPath = self.scram.findFile_(executable)
698     if ( not exeWithPath ):
699     raise CrabException('User executable '+executable+' not found')
700 ewv 1.131
701 slacapra 1.61 ## then check if it's private or not
702     if exeWithPath.find(swReleaseTop) == -1:
703     # the exe is private, so we must ship
704     common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
705     path = swArea+'/'
706 corvo 1.85 # distinguish case when script is in user project area or given by full path somewhere else
707     if exeWithPath.find(path) >= 0 :
708     exe = string.replace(exeWithPath, path,'')
709 slacapra 1.129 tar.add(path+exe,exe)
710 corvo 1.85 else :
711     tar.add(exeWithPath,os.path.basename(executable))
712 slacapra 1.61 pass
713     else:
714     # the exe is from release, we'll find it on WN
715     pass
716 ewv 1.131
717 slacapra 1.61 ## Now get the libraries: only those in local working area
718     libDir = 'lib'
719     lib = swArea+'/' +libDir
720     common.logger.debug(5,"lib "+lib+" to be tarred")
721     if os.path.exists(lib):
722     tar.add(lib,libDir)
723 ewv 1.131
724 slacapra 1.61 ## Now check if module dir is present
725     moduleDir = 'module'
726     module = swArea + '/' + moduleDir
727     if os.path.isdir(module):
728     tar.add(module,moduleDir)
729    
730     ## Now check if any data dir(s) is present
731     swAreaLen=len(swArea)
732 spiga 1.179 self.dataExist = False
733 slacapra 1.61 for root, dirs, files in os.walk(swArea):
734     if "data" in dirs:
735 spiga 1.179 self.dataExist=True
736 slacapra 1.61 common.logger.debug(5,"data "+root+"/data"+" to be tarred")
737     tar.add(root+"/data",root[swAreaLen:]+"/data")
738 ewv 1.182
739 spiga 1.179 ### CMSSW ParameterSet
740     if not self.pset is None:
741     cfg_file = common.work_space.jobDir()+self.configFilename()
742 ewv 1.182 tar.add(cfg_file,self.configFilename())
743 spiga 1.179 common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
744 slacapra 1.61
745 fanzago 1.93
746 fanzago 1.152 ## Add ProdCommon dir to tar
747 fanzago 1.93 prodcommonDir = 'ProdCommon'
748     prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
749     if os.path.isdir(prodcommonPath):
750     tar.add(prodcommonPath,prodcommonDir)
751 spiga 1.179 common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
752    
753     ##### ML stuff
754     ML_file_list=['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
755     path=os.environ['CRABDIR'] + '/python/'
756     for file in ML_file_list:
757     tar.add(path+file,file)
758     common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
759    
760     ##### Utils
761     Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']
762     for file in Utils_file_list:
763     tar.add(path+file,file)
764     common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
765 ewv 1.131
766 ewv 1.182 ##### AdditionalFiles
767 spiga 1.179 for file in self.additional_inbox_files:
768     tar.add(file,string.split(file,'/')[-1])
769 slacapra 1.61 common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
770 ewv 1.182
771 slacapra 1.61 tar.close()
772     except :
773     raise CrabException('Could not create tar-ball')
774 gutsche 1.72
775     ## check for tarball size
776     tarballinfo = os.stat(self.tgzNameWithPath)
777     if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
778     raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
779    
780 slacapra 1.61 ## create tar-ball with ML stuff
781 slacapra 1.97
782 spiga 1.165 def wsSetupEnvironment(self, nj=0):
783 slacapra 1.1 """
784     Returns part of a job script which prepares
785     the execution environment for the job 'nj'.
786     """
787 ewv 1.184 if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
788     psetName = 'pset.py'
789     else:
790     psetName = 'pset.cfg'
791 slacapra 1.1 # Prepare JobType-independent part
792 ewv 1.160 txt = '\n#Written by cms_cmssw::wsSetupEnvironment\n'
793 fanzago 1.133 txt += 'echo ">>> setup environment"\n'
794 ewv 1.131 txt += 'if [ $middleware == LCG ]; then \n'
795 gutsche 1.3 txt += self.wsSetupCMSLCGEnvironment_()
796     txt += 'elif [ $middleware == OSG ]; then\n'
797 gutsche 1.43 txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
798 ewv 1.132 txt += ' if [ ! $? == 0 ] ;then\n'
799 fanzago 1.161 txt += ' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
800     txt += ' job_exit_code=10016\n'
801     txt += ' func_exit\n'
802 gutsche 1.3 txt += ' fi\n'
803 fanzago 1.133 txt += ' echo ">>> Created working directory: $WORKING_DIR"\n'
804 gutsche 1.3 txt += '\n'
805     txt += ' echo "Change to working directory: $WORKING_DIR"\n'
806     txt += ' cd $WORKING_DIR\n'
807 fanzago 1.133 txt += ' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n'
808 ewv 1.131 txt += self.wsSetupCMSOSGEnvironment_()
809 fanzago 1.133 #txt += ' echo "### Set SCRAM ARCH to ' + self.executable_arch + ' ###"\n'
810     #txt += ' export SCRAM_ARCH='+self.executable_arch+'\n'
811 gutsche 1.3 txt += 'fi\n'
812 slacapra 1.1
813     # Prepare JobType-specific part
814     scram = self.scram.commandName()
815     txt += '\n\n'
816 fanzago 1.133 txt += 'echo ">>> specific cmssw setup environment:"\n'
817     txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
818 slacapra 1.1 txt += scram+' project CMSSW '+self.version+'\n'
819     txt += 'status=$?\n'
820     txt += 'if [ $status != 0 ] ; then\n'
821 fanzago 1.161 txt += ' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n'
822     txt += ' job_exit_code=10034\n'
823 fanzago 1.163 txt += ' func_exit\n'
824 slacapra 1.1 txt += 'fi \n'
825     txt += 'cd '+self.version+'\n'
826 fanzago 1.99 txt += 'SOFTWARE_DIR=`pwd`\n'
827 fanzago 1.133 txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
828 slacapra 1.1 txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
829 fanzago 1.180 txt += 'if [ $? != 0 ] ; then\n'
830     txt += ' echo "ERROR ==> Problem with the command: "\n'
831     txt += ' echo "eval \`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \` at `hostname`"\n'
832     txt += ' job_exit_code=10034\n'
833     txt += ' func_exit\n'
834     txt += 'fi \n'
835 slacapra 1.1 # Handle the arguments:
836     txt += "\n"
837 gutsche 1.7 txt += "## number of arguments (first argument always jobnumber)\n"
838 slacapra 1.1 txt += "\n"
839 spiga 1.165 txt += "if [ $nargs -lt "+str(self.argsList)+" ]\n"
840 slacapra 1.1 txt += "then\n"
841 fanzago 1.161 txt += " echo 'ERROR ==> Too few arguments' +$nargs+ \n"
842     txt += ' job_exit_code=50113\n'
843     txt += " func_exit\n"
844 slacapra 1.1 txt += "fi\n"
845     txt += "\n"
846    
847     # Prepare job-specific part
848     job = common.job_list[nj]
849 fanzago 1.93 ### FEDE FOR DBS OUTPUT PUBLICATION
850 ewv 1.131 if (self.datasetPath):
851 fanzago 1.93 txt += '\n'
852     txt += 'DatasetPath='+self.datasetPath+'\n'
853    
854     datasetpath_split = self.datasetPath.split("/")
855 ewv 1.131
856 fanzago 1.93 txt += 'PrimaryDataset='+datasetpath_split[1]+'\n'
857     txt += 'DataTier='+datasetpath_split[2]+'\n'
858 fanzago 1.96 txt += 'ApplicationFamily=cmsRun\n'
859 fanzago 1.93
860     else:
861     txt += 'DatasetPath=MCDataTier\n'
862     txt += 'PrimaryDataset=null\n'
863     txt += 'DataTier=null\n'
864     txt += 'ApplicationFamily=MCDataTier\n'
865 ewv 1.170 if self.pset != None:
866 spiga 1.42 pset = os.path.basename(job.configFilename())
867     txt += '\n'
868 spiga 1.95 txt += 'cp $RUNTIME_AREA/'+pset+' .\n'
869 spiga 1.42 if (self.datasetPath): # standard job
870 ewv 1.160 txt += 'InputFiles=${args[1]}; export InputFiles\n'
871     txt += 'MaxEvents=${args[2]}; export MaxEvents\n'
872     txt += 'SkipEvents=${args[3]}; export SkipEvents\n'
873 spiga 1.42 txt += 'echo "Inputfiles:<$InputFiles>"\n'
874     txt += 'echo "MaxEvents:<$MaxEvents>"\n'
875     txt += 'echo "SkipEvents:<$SkipEvents>"\n'
876     else: # pythia like job
877 ewv 1.160 txt += 'PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n'
878     txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
879     txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
880     txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
881 slacapra 1.90 if (self.firstRun):
882 ewv 1.160 txt += 'FirstRun=${args[1]}; export FirstRun\n'
883 spiga 1.57 txt += 'echo "FirstRun: <$FirstRun>"\n'
884 slacapra 1.90
885 ewv 1.184 txt += 'mv -f ' + pset + ' ' + psetName + '\n'
886 slacapra 1.1
887    
888 fanzago 1.163 if self.pset != None:
889 ewv 1.184 # FUTURE: Can simply for 2_1_x and higher
890 spiga 1.42 txt += '\n'
891 ewv 1.184 txt += 'echo "***** cat ' + psetName + ' *********"\n'
892     txt += 'cat ' + psetName + '\n'
893     txt += 'echo "****** end ' + psetName + ' ********"\n'
894 spiga 1.42 txt += '\n'
895 ewv 1.184 txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n'
896 fanzago 1.94 txt += 'echo "PSETHASH = $PSETHASH" \n'
897 fanzago 1.93 txt += '\n'
898 gutsche 1.3 return txt
899 slacapra 1.176
900 fanzago 1.166 def wsUntarSoftware(self, nj=0):
901 gutsche 1.3 """
902     Put in the script the commands to build an executable
903     or a library.
904     """
905    
906 fanzago 1.166 txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'
907 gutsche 1.3
908     if os.path.isfile(self.tgzNameWithPath):
909 fanzago 1.133 txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n'
910 gutsche 1.3 txt += 'tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
911 spiga 1.179 txt += 'ls -Al \n'
912 gutsche 1.3 txt += 'untar_status=$? \n'
913     txt += 'if [ $untar_status -ne 0 ]; then \n'
914 fanzago 1.161 txt += ' echo "ERROR ==> Untarring .tgz file failed"\n'
915     txt += ' job_exit_code=$untar_status\n'
916     txt += ' func_exit\n'
917 gutsche 1.3 txt += 'else \n'
918     txt += ' echo "Successful untar" \n'
919     txt += 'fi \n'
920 gutsche 1.50 txt += '\n'
921 fanzago 1.152 txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
922 gutsche 1.50 txt += 'if [ -z "$PYTHONPATH" ]; then\n'
923 fanzago 1.166 txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
924 gutsche 1.50 txt += 'else\n'
925 fanzago 1.166 txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
926 fanzago 1.93 txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
927 gutsche 1.50 txt += 'fi\n'
928     txt += '\n'
929    
930 gutsche 1.3 pass
931 ewv 1.131
932 slacapra 1.1 return txt
933 ewv 1.170
934 fanzago 1.166 def wsBuildExe(self, nj=0):
935     """
936     Put in the script the commands to build an executable
937     or a library.
938     """
939    
940     txt = '\n#Written by cms_cmssw::wsBuildExe\n'
941     txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n'
942    
943 ewv 1.170 txt += 'rm -r lib/ module/ \n'
944     txt += 'mv $RUNTIME_AREA/lib/ . \n'
945     txt += 'mv $RUNTIME_AREA/module/ . \n'
946 spiga 1.179 if self.dataExist == True: txt += 'mv $RUNTIME_AREA/src/ . \n'
947 ewv 1.182 if len(self.additional_inbox_files)>0:
948 spiga 1.179 for file in self.additional_inbox_files:
949     txt += 'mv $RUNTIME_AREA/'+file+' . \n'
950 ewv 1.170 txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'
951    
952 fanzago 1.166 txt += 'if [ -z "$PYTHONPATH" ]; then\n'
953     txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
954     txt += 'else\n'
955     txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
956     txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
957     txt += 'fi\n'
958     txt += '\n'
959    
960     return txt
961 slacapra 1.1
962     def modifySteeringCards(self, nj):
963     """
964 ewv 1.131 modify the card provided by the user,
965 slacapra 1.1 writing a new card into share dir
966     """
967 ewv 1.131
968 slacapra 1.1 def executableName(self):
969 slacapra 1.70 if self.scriptExe: #CarlosDaniele
970 spiga 1.42 return "sh "
971     else:
972     return self.executable
973 slacapra 1.1
974     def executableArgs(self):
975 ewv 1.160 # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
976 slacapra 1.70 if self.scriptExe:#CarlosDaniele
977 spiga 1.42 return self.scriptExe + " $NJob"
978 fanzago 1.115 else:
979 ewv 1.160 ex_args = ""
980 ewv 1.171 # FUTURE: This tests the CMSSW version. Can remove code as versions deprecated
981 ewv 1.160 # Framework job report
982 ewv 1.184 if (self.CMSSW_major >= 1 and self.CMSSW_minor >= 5) or (self.CMSSW_major >= 2):
983 fanzago 1.166 ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
984 ewv 1.184 # Type of config file
985     if self.CMSSW_major >= 2 :
986 ewv 1.171 ex_args += " -p pset.py"
987 fanzago 1.115 else:
988 ewv 1.160 ex_args += " -p pset.cfg"
989     return ex_args
990 slacapra 1.1
991     def inputSandbox(self, nj):
992     """
993     Returns a list of filenames to be put in JDL input sandbox.
994     """
995     inp_box = []
996 slacapra 1.53 # # dict added to delete duplicate from input sandbox file list
997     # seen = {}
998 slacapra 1.1 ## code
999     if os.path.isfile(self.tgzNameWithPath):
1000     inp_box.append(self.tgzNameWithPath)
1001 spiga 1.168 wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
1002     inp_box.append(common.work_space.pathForTgz() +'job/'+ wrapper)
1003 slacapra 1.1 return inp_box
1004    
1005     def outputSandbox(self, nj):
1006     """
1007     Returns a list of filenames to be put in JDL output sandbox.
1008     """
1009     out_box = []
1010    
1011     ## User Declared output files
1012 slacapra 1.54 for out in (self.output_file+self.output_file_sandbox):
1013 ewv 1.131 n_out = nj + 1
1014 slacapra 1.1 out_box.append(self.numberFile_(out,str(n_out)))
1015     return out_box
1016    
1017     def prepareSteeringCards(self):
1018     """
1019     Make initial modifications of the user's steering card file.
1020     """
1021     return
1022    
1023     def wsRenameOutput(self, nj):
1024     """
1025     Returns part of a job script which renames the produced files.
1026     """
1027    
1028 ewv 1.160 txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
1029 fanzago 1.148 txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
1030     txt += 'echo ">>> current directory content:"\n'
1031 gutsche 1.7 txt += 'ls \n'
1032 fanzago 1.145 txt += '\n'
1033 slacapra 1.54
1034 fanzago 1.128 for fileWithSuffix in (self.output_file):
1035 slacapra 1.1 output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
1036     txt += '\n'
1037 gutsche 1.7 txt += '# check output file\n'
1038 slacapra 1.106 txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
1039 ewv 1.147 if (self.copy_data == 1): # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
1040     txt += ' mv '+fileWithSuffix+' '+output_file_num+'\n'
1041     txt += ' ln -s `pwd`/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
1042     else:
1043     txt += ' mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
1044     txt += ' ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
1045 slacapra 1.106 txt += 'else\n'
1046 fanzago 1.161 txt += ' job_exit_code=60302\n'
1047     txt += ' echo "WARNING: Output file '+fileWithSuffix+' not found"\n'
1048 ewv 1.156 if common.scheduler.name().upper() == 'CONDOR_G':
1049 gutsche 1.7 txt += ' if [ $middleware == OSG ]; then \n'
1050     txt += ' echo "prepare dummy output file"\n'
1051     txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
1052     txt += ' fi \n'
1053 slacapra 1.1 txt += 'fi\n'
1054 slacapra 1.105 file_list = []
1055     for fileWithSuffix in (self.output_file):
1056     file_list.append(self.numberFile_(fileWithSuffix, '$NJob'))
1057 ewv 1.131
1058 slacapra 1.105 txt += 'file_list="'+string.join(file_list,' ')+'"\n'
1059 fanzago 1.149 txt += '\n'
1060 fanzago 1.148 txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
1061     txt += 'echo ">>> current directory content:"\n'
1062     txt += 'ls \n'
1063     txt += '\n'
1064 gutsche 1.7 txt += 'cd $RUNTIME_AREA\n'
1065 fanzago 1.133 txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
1066 slacapra 1.1 return txt
1067    
1068     def numberFile_(self, file, txt):
1069     """
1070     append _'txt' before last extension of a file
1071     """
1072     p = string.split(file,".")
1073     # take away last extension
1074     name = p[0]
1075     for x in p[1:-1]:
1076 slacapra 1.90 name=name+"."+x
1077 slacapra 1.1 # add "_txt"
1078     if len(p)>1:
1079 slacapra 1.90 ext = p[len(p)-1]
1080     result = name + '_' + txt + "." + ext
1081 slacapra 1.1 else:
1082 slacapra 1.90 result = name + '_' + txt
1083 ewv 1.131
1084 slacapra 1.1 return result
1085    
1086 slacapra 1.63 def getRequirements(self, nj=[]):
1087 slacapra 1.1 """
1088 ewv 1.131 return job requirements to add to jdl files
1089 slacapra 1.1 """
1090     req = ''
1091 slacapra 1.47 if self.version:
1092 slacapra 1.10 req='Member("VO-cms-' + \
1093 slacapra 1.47 self.version + \
1094 slacapra 1.10 '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
1095 farinafa 1.111 ## SL add requirement for OS version only if SL4
1096     #reSL4 = re.compile( r'slc4' )
1097 slacapra 1.109 if self.executable_arch: # and reSL4.search(self.executable_arch):
1098 gutsche 1.107 req+=' && Member("VO-cms-' + \
1099 slacapra 1.105 self.executable_arch + \
1100     '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
1101 gutsche 1.35
1102     req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)'
1103 afanfani 1.158 if common.scheduler.name() == "glitecoll":
1104     req += ' && other.GlueCEStateStatus == "Production" '
1105 gutsche 1.35
1106 slacapra 1.1 return req
1107 gutsche 1.3
1108     def configFilename(self):
1109     """ return the config filename """
1110 ewv 1.182 # FUTURE: Can remove cfg mode for CMSSW >= 2_1_x
1111 ewv 1.184 if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
1112 ewv 1.182 return self.name()+'.py'
1113     else:
1114     return self.name()+'.cfg'
1115 gutsche 1.3
1116     def wsSetupCMSOSGEnvironment_(self):
1117     """
1118     Returns part of a job script which is prepares
1119     the execution environment and which is common for all CMS jobs.
1120     """
1121 ewv 1.160 txt = '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n'
1122     txt += ' echo ">>> setup CMS OSG environment:"\n'
1123 fanzago 1.133 txt += ' echo "set SCRAM ARCH to ' + self.executable_arch + '"\n'
1124     txt += ' export SCRAM_ARCH='+self.executable_arch+'\n'
1125 fanzago 1.136 txt += ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n'
1126 ewv 1.135 txt += ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
1127 mkirn 1.40 txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
1128 fanzago 1.133 txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
1129     txt += ' else\n'
1130 fanzago 1.161 txt += ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
1131     txt += ' job_exit_code=10020\n'
1132     txt += ' func_exit\n'
1133 fanzago 1.133 txt += ' fi\n'
1134 gutsche 1.3 txt += '\n'
1135 fanzago 1.161 txt += ' echo "==> setup cms environment ok"\n'
1136 fanzago 1.136 txt += ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n'
1137 gutsche 1.3
1138     return txt
1139 ewv 1.131
1140 gutsche 1.3 ### OLI_DANIELE
1141     def wsSetupCMSLCGEnvironment_(self):
1142     """
1143     Returns part of a job script which is prepares
1144     the execution environment and which is common for all CMS jobs.
1145     """
1146 ewv 1.160 txt = '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n'
1147     txt += ' echo ">>> setup CMS LCG environment:"\n'
1148 fanzago 1.133 txt += ' echo "set SCRAM ARCH and BUILD_ARCH to ' + self.executable_arch + ' ###"\n'
1149     txt += ' export SCRAM_ARCH='+self.executable_arch+'\n'
1150     txt += ' export BUILD_ARCH='+self.executable_arch+'\n'
1151     txt += ' if [ ! $VO_CMS_SW_DIR ] ;then\n'
1152 fanzago 1.161 txt += ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n'
1153     txt += ' job_exit_code=10031\n'
1154     txt += ' func_exit\n'
1155 fanzago 1.133 txt += ' else\n'
1156     txt += ' echo "Sourcing environment... "\n'
1157     txt += ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n'
1158 fanzago 1.161 txt += ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n'
1159     txt += ' job_exit_code=10020\n'
1160     txt += ' func_exit\n'
1161 fanzago 1.133 txt += ' fi\n'
1162     txt += ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1163     txt += ' source $VO_CMS_SW_DIR/cmsset_default.sh\n'
1164     txt += ' result=$?\n'
1165     txt += ' if [ $result -ne 0 ]; then\n'
1166 fanzago 1.161 txt += ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1167     txt += ' job_exit_code=10032\n'
1168     txt += ' func_exit\n'
1169 fanzago 1.133 txt += ' fi\n'
1170     txt += ' fi\n'
1171     txt += ' \n'
1172 fanzago 1.161 txt += ' echo "==> setup cms environment ok"\n'
1173 gutsche 1.3 return txt
1174 gutsche 1.5
1175 ewv 1.131 ### FEDE FOR DBS OUTPUT PUBLICATION
1176 fanzago 1.93 def modifyReport(self, nj):
1177     """
1178 ewv 1.131 insert the part of the script that modifies the FrameworkJob Report
1179 fanzago 1.93 """
1180 fanzago 1.94
1181 ewv 1.160 txt = '\n#Written by cms_cmssw::modifyReport\n'
1182 slacapra 1.176 publish_data = int(self.cfg_params.get('USER.publish_data',0))
1183 ewv 1.131 if (publish_data == 1):
1184 fanzago 1.94 processedDataset = self.cfg_params['USER.publish_data_name']
1185 fanzago 1.173 LFNBaseName = LFNBase(processedDataset)
1186 fanzago 1.175
1187     txt += 'if [ $copy_exit_status -eq 0 ]; then\n'
1188 fanzago 1.173 txt += ' FOR_LFN=%s_${PSETHASH}/\n'%(LFNBaseName)
1189 fanzago 1.175 txt += 'else\n'
1190     txt += ' FOR_LFN=/copy_problems/ \n'
1191     txt += ' SE=""\n'
1192     txt += ' SE_PATH=""\n'
1193     txt += 'fi\n'
1194 ewv 1.182
1195 fanzago 1.175 txt += 'echo ">>> Modify Job Report:" \n'
1196     txt += 'chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n'
1197     txt += 'ProcessedDataset='+processedDataset+'\n'
1198     txt += 'echo "ProcessedDataset = $ProcessedDataset"\n'
1199     txt += 'echo "SE = $SE"\n'
1200     txt += 'echo "SE_PATH = $SE_PATH"\n'
1201     txt += 'echo "FOR_LFN = $FOR_LFN" \n'
1202     txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n'
1203     txt += 'echo "$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n'
1204     txt += '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH\n'
1205     txt += 'modifyReport_result=$?\n'
1206     txt += 'if [ $modifyReport_result -ne 0 ]; then\n'
1207     txt += ' modifyReport_result=70500\n'
1208     txt += ' job_exit_code=$modifyReport_result\n'
1209     txt += ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n'
1210     txt += ' echo "WARNING: Problem with ModifyJobReport"\n'
1211     txt += 'else\n'
1212     txt += ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
1213 spiga 1.103 txt += 'fi\n'
1214 fanzago 1.93 return txt
1215 fanzago 1.99
1216 gutsche 1.5 def setParam_(self, param, value):
1217     self._params[param] = value
1218    
1219     def getParams(self):
1220     return self._params
1221 gutsche 1.8
1222 gutsche 1.35 def uniquelist(self, old):
1223     """
1224     remove duplicates from a list
1225     """
1226     nd={}
1227     for e in old:
1228     nd[e]=0
1229     return nd.keys()
1230 mcinquil 1.121
1231 spiga 1.169 def outList(self):
1232 mcinquil 1.121 """
1233     check the dimension of the output files
1234     """
1235 spiga 1.169 txt = ''
1236     txt += 'echo ">>> list of expected files on output sandbox"\n'
1237 mcinquil 1.121 listOutFiles = []
1238 ewv 1.170 stdout = 'CMSSW_$NJob.stdout'
1239 spiga 1.169 stderr = 'CMSSW_$NJob.stderr'
1240 fanzago 1.148 if (self.return_data == 1):
1241 spiga 1.157 for file in (self.output_file+self.output_file_sandbox):
1242     listOutFiles.append(self.numberFile_(file, '$NJob'))
1243 spiga 1.169 listOutFiles.append(stdout)
1244     listOutFiles.append(stderr)
1245 ewv 1.156 else:
1246 spiga 1.157 for file in (self.output_file_sandbox):
1247     listOutFiles.append(self.numberFile_(file, '$NJob'))
1248 spiga 1.169 listOutFiles.append(stdout)
1249     listOutFiles.append(stderr)
1250 fanzago 1.161 txt += 'echo "output files: '+string.join(listOutFiles,' ')+'"\n'
1251 spiga 1.157 txt += 'filesToCheck="'+string.join(listOutFiles,' ')+'"\n'
1252 spiga 1.169 txt += 'export filesToCheck\n'
1253 ewv 1.170 return txt