ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.184
Committed: Wed Apr 30 19:41:29 2008 UTC (17 years ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre18, CRAB_2_2_0_pre17, CRAB_2_2_0_pre16
Changes since 1.183: +22 -31 lines
Log Message:
Finalize support for 2_1_x and supplying python config file in crab.cfg

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
13 mcinquil 1.144 def __init__(self, cfg_params, ncjobs):
14 slacapra 1.1 JobType.__init__(self, 'CMSSW')
15     common.logger.debug(3,'CMSSW::__init__')
16    
17 mcinquil 1.140 self.argsList = []
18 mcinquil 1.144
19 gutsche 1.3 self._params = {}
20     self.cfg_params = cfg_params
21 fanzago 1.115 # init BlackWhiteListParser
22     self.blackWhiteListParser = BlackWhiteListParser(cfg_params)
23    
24 slacapra 1.153 self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))
25 gutsche 1.72
26 gutsche 1.44 # number of jobs requested to be created, limit obj splitting
27 gutsche 1.38 self.ncjobs = ncjobs
28    
29 slacapra 1.1 log = common.logger
30 ewv 1.131
31 slacapra 1.1 self.scram = Scram.Scram(cfg_params)
32     self.additional_inbox_files = []
33     self.scriptExe = ''
34     self.executable = ''
35 slacapra 1.71 self.executable_arch = self.scram.getArch()
36 slacapra 1.1 self.tgz_name = 'default.tgz'
37 corvo 1.56 self.scriptName = 'CMSSW.sh'
38 ewv 1.131 self.pset = '' #scrip use case Da
39 spiga 1.42 self.datasetPath = '' #scrip use case Da
40 gutsche 1.3
41 gutsche 1.50 # set FJR file name
42     self.fjrFileName = 'crab_fjr.xml'
43    
44 slacapra 1.1 self.version = self.scram.getSWVersion()
45 ewv 1.182 version_array = self.version.split('_')
46 ewv 1.184 self.CMSSW_major = 0
47     self.CMSSW_minor = 0
48     self.CMSSW_patch = 0
49 ewv 1.182 try:
50 ewv 1.184 self.CMSSW_major = int(version_array[1])
51     self.CMSSW_minor = int(version_array[2])
52     self.CMSSW_patch = int(version_array[3])
53 ewv 1.182 except:
54 ewv 1.184 msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!"
55 ewv 1.182 raise CrabException(msg)
56    
57 slacapra 1.1 ### collect Data cards
58 gutsche 1.66
59 slacapra 1.153 if not cfg_params.has_key('CMSSW.datasetpath'):
60 ewv 1.131 msg = "Error: datasetpath not defined "
61 slacapra 1.1 raise CrabException(msg)
62 slacapra 1.153 tmp = cfg_params['CMSSW.datasetpath']
63     log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
64     if string.lower(tmp)=='none':
65     self.datasetPath = None
66     self.selectNoInput = 1
67     else:
68     self.datasetPath = tmp
69     self.selectNoInput = 0
70 gutsche 1.5
71 slacapra 1.1 self.dataTiers = []
72    
73     ## now the application
74 slacapra 1.153 self.executable = cfg_params.get('CMSSW.executable','cmsRun')
75     log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
76 slacapra 1.1
77 slacapra 1.153 if not cfg_params.has_key('CMSSW.pset'):
78 slacapra 1.1 raise CrabException("PSet file missing. Cannot run cmsRun ")
79 slacapra 1.153 self.pset = cfg_params['CMSSW.pset']
80     log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
81     if self.pset.lower() != 'none' :
82     if (not os.path.exists(self.pset)):
83     raise CrabException("User defined PSet file "+self.pset+" does not exist")
84     else:
85     self.pset = None
86 slacapra 1.1
87     # output files
88 slacapra 1.53 ## stuff which must be returned always via sandbox
89     self.output_file_sandbox = []
90    
91     # add fjr report by default via sandbox
92     self.output_file_sandbox.append(self.fjrFileName)
93    
94     # other output files to be returned via sandbox or copied to SE
95 slacapra 1.153 self.output_file = []
96     tmp = cfg_params.get('CMSSW.output_file',None)
97     if tmp :
98     tmpOutFiles = string.split(tmp,',')
99     log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
100     for tmp in tmpOutFiles:
101     tmp=string.strip(tmp)
102     self.output_file.append(tmp)
103 slacapra 1.1 pass
104 slacapra 1.153 else:
105 gutsche 1.92 log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")
106 slacapra 1.153 pass
107 slacapra 1.1
108     # script_exe file as additional file in inputSandbox
109 slacapra 1.153 self.scriptExe = cfg_params.get('USER.script_exe',None)
110     if self.scriptExe :
111 slacapra 1.176 if not os.path.isfile(self.scriptExe):
112     msg ="ERROR. file "+self.scriptExe+" not found"
113     raise CrabException(msg)
114     self.additional_inbox_files.append(string.strip(self.scriptExe))
115 slacapra 1.70
116 spiga 1.42 #CarlosDaniele
117     if self.datasetPath == None and self.pset == None and self.scriptExe == '' :
118 slacapra 1.176 msg ="Error. script_exe not defined"
119     raise CrabException(msg)
120 spiga 1.42
121 slacapra 1.1 ## additional input files
122 slacapra 1.153 if cfg_params.has_key('USER.additional_input_files'):
123 slacapra 1.29 tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
124 slacapra 1.70 for tmp in tmpAddFiles:
125     tmp = string.strip(tmp)
126     dirname = ''
127     if not tmp[0]=="/": dirname = "."
128 corvo 1.85 files = []
129     if string.find(tmp,"*")>-1:
130     files = glob.glob(os.path.join(dirname, tmp))
131     if len(files)==0:
132     raise CrabException("No additional input file found with this pattern: "+tmp)
133     else:
134     files.append(tmp)
135 slacapra 1.70 for file in files:
136     if not os.path.exists(file):
137     raise CrabException("Additional input file not found: "+file)
138 slacapra 1.45 pass
139 slacapra 1.105 # fname = string.split(file, '/')[-1]
140     # storedFile = common.work_space.pathForTgz()+'share/'+fname
141     # shutil.copyfile(file, storedFile)
142     self.additional_inbox_files.append(string.strip(file))
143 slacapra 1.1 pass
144     pass
145 slacapra 1.70 common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
146 slacapra 1.153 pass
147 gutsche 1.3
148 slacapra 1.9 ## Events per job
149 slacapra 1.153 if cfg_params.has_key('CMSSW.events_per_job'):
150 slacapra 1.10 self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
151 slacapra 1.9 self.selectEventsPerJob = 1
152 slacapra 1.153 else:
153 slacapra 1.9 self.eventsPerJob = -1
154     self.selectEventsPerJob = 0
155 ewv 1.131
156 slacapra 1.22 ## number of jobs
157 slacapra 1.153 if cfg_params.has_key('CMSSW.number_of_jobs'):
158 slacapra 1.22 self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
159     self.selectNumberOfJobs = 1
160 slacapra 1.153 else:
161 slacapra 1.22 self.theNumberOfJobs = 0
162     self.selectNumberOfJobs = 0
163 slacapra 1.10
164 slacapra 1.153 if cfg_params.has_key('CMSSW.total_number_of_events'):
165 gutsche 1.35 self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
166     self.selectTotalNumberEvents = 1
167 slacapra 1.153 else:
168 gutsche 1.35 self.total_number_of_events = 0
169     self.selectTotalNumberEvents = 0
170    
171 ewv 1.131 if self.pset != None: #CarlosDaniele
172 spiga 1.42 if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
173     msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
174     raise CrabException(msg)
175     else:
176     if (self.selectNumberOfJobs == 0):
177     msg = 'Must specify number_of_jobs.'
178     raise CrabException(msg)
179 gutsche 1.35
180 ewv 1.160 ## New method of dealing with seeds
181     self.incrementSeeds = []
182     self.preserveSeeds = []
183     if cfg_params.has_key('CMSSW.preserve_seeds'):
184     tmpList = cfg_params['CMSSW.preserve_seeds'].split(',')
185     for tmp in tmpList:
186     tmp.strip()
187     self.preserveSeeds.append(tmp)
188     if cfg_params.has_key('CMSSW.increment_seeds'):
189     tmpList = cfg_params['CMSSW.increment_seeds'].split(',')
190     for tmp in tmpList:
191     tmp.strip()
192     self.incrementSeeds.append(tmp)
193    
194     ## Old method of dealing with seeds
195     ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
196     ## remove
197 slacapra 1.153 self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
198 ewv 1.160 if self.sourceSeed:
199 slacapra 1.177 print "pythia_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
200     self.incrementSeeds.append('sourceSeed')
201 slacapra 1.153
202     self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
203 ewv 1.160 if self.sourceSeedVtx:
204 slacapra 1.177 print "vtx_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
205     self.incrementSeeds.append('VtxSmeared')
206 slacapra 1.22
207 slacapra 1.153 self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
208 ewv 1.160 if self.sourceSeedG4:
209 slacapra 1.177 print "g4_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
210     self.incrementSeeds.append('g4SimHits')
211 slacapra 1.90
212 slacapra 1.153 self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
213 ewv 1.160 if self.sourceSeedMix:
214 slacapra 1.177 print "mix_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
215     self.incrementSeeds.append('mix')
216 slacapra 1.90
217 slacapra 1.153 self.firstRun = cfg_params.get('CMSSW.first_run',None)
218 slacapra 1.90
219 spiga 1.42 if self.pset != None: #CarlosDaniele
220 ewv 1.131 import PsetManipulator as pp
221 slacapra 1.97 PsetEdit = pp.PsetManipulator(self.pset) #Daniele Pset
222 gutsche 1.3
223 ewv 1.147 # Copy/return
224    
225 slacapra 1.153 self.copy_data = int(cfg_params.get('USER.copy_data',0))
226     self.return_data = int(cfg_params.get('USER.return_data',0))
227 ewv 1.147
228 slacapra 1.1 #DBSDLS-start
229 ewv 1.131 ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
230 slacapra 1.1 self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
231     self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
232 gutsche 1.35 self.jobDestination=[] # Site destination(s) for each job (list of lists)
233 slacapra 1.1 ## Perform the data location and discovery (based on DBS/DLS)
234 slacapra 1.9 ## SL: Don't if NONE is specified as input (pythia use case)
235 gutsche 1.35 blockSites = {}
236 slacapra 1.9 if self.datasetPath:
237 gutsche 1.35 blockSites = self.DataDiscoveryAndLocation(cfg_params)
238 ewv 1.131 #DBSDLS-end
239 slacapra 1.1
240 ewv 1.131
241 slacapra 1.9 ## Select Splitting
242 ewv 1.131 if self.selectNoInput:
243 spiga 1.42 if self.pset == None: #CarlosDaniele
244     self.jobSplittingForScript()
245     else:
246     self.jobSplittingNoInput()
247 gutsche 1.92 else:
248 corvo 1.56 self.jobSplittingByBlocks(blockSites)
249 gutsche 1.5
250 slacapra 1.22 # modify Pset
251 spiga 1.42 if self.pset != None: #CarlosDaniele
252 slacapra 1.86 try:
253 ewv 1.160 # Add FrameworkJobReport to parameter-set, set max events.
254     # Reset later for data jobs by writeCFG which does all modifications
255 ewv 1.182 PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5
256 ewv 1.160 PsetEdit.maxEvent(self.eventsPerJob)
257 slacapra 1.90 PsetEdit.psetWriter(self.configFilename())
258 slacapra 1.86 except:
259 ewv 1.184 msg='Error while manipulating ParameterSet: exiting...'
260 slacapra 1.86 raise CrabException(msg)
261 spiga 1.179 self.tgzNameWithPath = self.getTarBall(self.executable)
262 gutsche 1.3
263 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
264    
265 slacapra 1.86 import DataDiscovery
266     import DataLocation
267 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
268    
269     datasetPath=self.datasetPath
270    
271 slacapra 1.1 ## Contact the DBS
272 gutsche 1.92 common.logger.message("Contacting Data Discovery Services ...")
273 slacapra 1.1 try:
274 slacapra 1.137 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
275 slacapra 1.1 self.pubdata.fetchDBSInfo()
276    
277 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
278 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
279     raise CrabException(msg)
280 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
281 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
282     raise CrabException(msg)
283 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
284 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
285 slacapra 1.1 raise CrabException(msg)
286    
287 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
288 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
289     self.eventsbyfile=self.pubdata.getEventsPerFile()
290 gutsche 1.3
291 slacapra 1.1 ## get max number of events
292 ewv 1.131 self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
293 slacapra 1.1
294     ## Contact the DLS and build a list of sites hosting the fileblocks
295     try:
296 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
297 gutsche 1.6 dataloc.fetchDLSInfo()
298 slacapra 1.41 except DataLocation.DataLocationError , ex:
299 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
300     raise CrabException(msg)
301 ewv 1.131
302 slacapra 1.1
303 gutsche 1.35 sites = dataloc.getSites()
304     allSites = []
305     listSites = sites.values()
306 slacapra 1.63 for listSite in listSites:
307     for oneSite in listSite:
308 gutsche 1.35 allSites.append(oneSite)
309     allSites = self.uniquelist(allSites)
310 gutsche 1.3
311 gutsche 1.92 # screen output
312     common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
313    
314 gutsche 1.35 return sites
315 ewv 1.131
316 ewv 1.170 # to Be Removed DS -- BL
317 spiga 1.165 # def setArgsList(self, argsList):
318     # self.argsList = argsList
319 mcinquil 1.140
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs (self.ncjobs is set to the same value)
          self.list_of_args - one [files, maxEvents, skipEvents] list of strings per job

    NOTE(review): nesting was reconstructed from a listing without
    indentation - confirm against the repository copy.
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # Report the effective request: self.total_number_of_events is 0
        # when the total comes from number_of_jobs * events_per_job.
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    # 'all' means no limit on the number of jobs to create
    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = blockSites.keys()
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock.keys() :
            jobsOfBlock[block] = []

        if self.eventsbyblock.has_key(block) :
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = self.filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0

            # ---- New block => New job ---- #
            parString = ""
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                file = files[fileCount]
                if newFile :
                    try:
                        numEventsInFile = self.eventsbyfile[file]
                        common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += '\\\"' + file + '\\\"\,'
                        newFile = 0
                    except KeyError:
                        # NOTE(review): the file is not added to the job, but the
                        # branches below still run for it - confirm this is intended.
                        common.logger.message("File "+str(file)+" has unknown number of events: skipping")

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                    # if last file in block
                    if ( fileCount == numFilesInBlock-1 ) :
                        # end job using last file, use remaining events in block
                        # close job and touch new file
                        fullString = parString[:-2]  # drop the trailing '\,'
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        # fill jobs of block dictionary
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                        jobSkipEventCount = 0
                        # reset file
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else :
                        # go to next file
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                    # close job and touch new file
                    fullString = parString[:-2]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    fullString = parString[:-2]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[file]
                    parString = '\\\"' + file + '\\\"\,'
                pass # END if
            pass # END while (iterate over files in the block)
    pass # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []   # job ranges of the blocks without sites
    bloskNoSite = []   # sequential numbers of the blocks without sites

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock.keys() :
            blockCounter += 1
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)))
            if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)) == 0:
                noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
                bloskNoSite.append( blockCounter )

    common.logger.message(screenOutput)
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        virgola = ""
        if len(bloskNoSite) > 1:
            virgola = ","
        for block in bloskNoSite:
            msg += ' ' + str(block) + virgola
        msg += '\n Related jobs:\n '
        virgola = ""
        if len(noSiteBlock) > 1:
            virgola = ","
        for range_jobs in noSiteBlock:
            msg += str(range_jobs) + virgola
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        if self.cfg_params.has_key('EDG.se_white_list'):
            msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'
        if self.cfg_params.has_key('EDG.ce_white_list'):
            msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
544    
def jobSplittingNoInput(self):
    """
    Split a task without input dataset using the event counts only.

    REQUIRES: self.selectEventsPerJob, self.selectNumberOfJobs,
              self.selectTotalNumberEvents, self.eventsPerJob,
              self.theNumberOfJobs, self.total_number_of_events,
              self.firstRun
    SETS:     self.total_number_of_jobs, self.list_of_args and,
              depending on the options given, self.eventsPerJob or
              self.total_number_of_events; appends one [""] entry per
              job to self.jobDestination
    RAISES:   CrabException if the total number of events is negative
    """
    logger = common.logger
    logger.debug(5,'Splitting per events')

    # echo what the user asked for
    if self.selectEventsPerJob:
        logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if self.selectNumberOfJobs:
        logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if self.selectTotalNumberEvents:
        logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        raise CrabException('Cannot split jobs per Events with "-1" as total number of events')

    # derive the missing quantity from the two that were given
    if self.selectEventsPerJob:
        if self.selectTotalNumberEvents:
            self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
    elif self.selectNumberOfJobs:
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

    logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # events left over because the division above is not exact
    leftover = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    logger.debug(5,'Check '+str(leftover))

    logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if leftover > 0:
        logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # one argument list per job
    self.list_of_args = []
    for jobIndex in range(self.total_number_of_jobs):
        # no input data, so no site constraint; must be empty to write correctly the xml
        self.jobDestination.append([""])
        jobArgs = []
        if self.firstRun:
            # first-run value followed by the job index, concatenated as text
            jobArgs.append(str(self.firstRun)+str(jobIndex))
        self.list_of_args.append(jobArgs)

    return
596    
597 spiga 1.42
def jobSplittingForScript(self):
    """
    Split a script-only task: the number of jobs is taken directly
    from self.theNumberOfJobs.

    SETS: self.total_number_of_jobs, self.list_of_args (one [str(i)]
          per job); appends one [""] entry per job to
          self.jobDestination
    """
    logger = common.logger
    logger.debug(5,'Splitting per job')
    logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    nJobs = self.theNumberOfJobs
    self.total_number_of_jobs = nJobs

    logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    # the only argument of each job is its 0-based index
    self.list_of_args = []
    for jobIndex in range(self.total_number_of_jobs):
        # no input data, so no site constraint
        self.jobDestination.append([""])
        self.list_of_args.append([str(jobIndex)])
    return
620    
def split(self, jobParams):
    """
    Fill jobParams with the per-job argument lists and store the
    arguments and destinations of all jobs through common._db.

    ARGUMENT: jobParams - list that receives one entry per job
    REQUIRES: self.total_number_of_jobs, self.list_of_args,
              self.jobDestination
    SETS:     self.argsList - length of the first job's argument list
              plus one
    """
    totalJobs = self.total_number_of_jobs
    perJobArgs = self.list_of_args

    # create the empty structure
    jobParams.extend([""] * totalJobs)

    jobIds = []
    jobRecords = []
    for idx in range(totalJobs):
        jobParams[idx] = perJobArgs[idx]
        jobNumber = idx + 1   # job ids are 1-based
        jobIds.append(jobNumber)

        # space-separated arguments, empty when the job has none
        argString = ''
        if len(jobParams[idx]):
            argString += ' '.join(jobParams[idx])

        record = {}
        record['arguments'] = str(jobNumber)+' '+argString
        record['dlsDestination'] = self.jobDestination[idx]
        jobRecords.append(record)

        common.logger.debug(5, "Job "+str(idx)+" Arguments: "+str(jobNumber)+" "+argString+"\n"
                            +" Destination: "+str(self.jobDestination[idx]))

    # single update call for all jobs
    common._db.updateJob_(jobIds,jobRecords)
    self.argsList = (len(jobParams[0])+1)

    return
653 ewv 1.131
def numberOfJobs(self):
    """
    Return self.total_number_of_jobs as set by the job splitting.
    """
    totalJobs = self.total_number_of_jobs
    return totalJobs
657    
def getTarBall(self, exe):
    """
    Return the path of the tarball with lib and exe, building it
    through self.buildTar_(exe) when the file does not exist yet.

    SETS: self.tgzNameWithPath
    """
    # relative path below the work space, used for the Boss XML files
    tarballPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarballPath

    if not os.path.exists(tarballPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return string.strip(self.tgzNameWithPath)

    # it already exists: just return it
    return self.tgzNameWithPath
675    
def buildTar_(self, executable):
    """
    Create the gzipped tarball self.tgzNameWithPath shipped with the jobs.

    Nothing is built when SCRAM reports no release top or when the working
    area is the release top itself.  Otherwise the tarball collects, when
    present: the private user executable, the 'lib' and 'module'
    directories of the working area, every 'data' directory found below it,
    the job configuration file, the ProdCommon directory and the helper
    scripts located under $CRABDIR, and the additional input files.

    As a side effect self.dataExist records whether any 'data' directory
    was packed.

    executable -- name of the user executable, looked up through SCRAM.

    Raises CrabException when the executable is not found, when the tarball
    cannot be written (the underlying error is part of the message) or when
    the tarball is larger than self.MaxTarBallSize megabytes.
    """
    import sys
    import tarfile

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
        return

    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if self.executable != '':
                exeWithPath = self.scram.findFile_(executable)
                if not exeWithPath:
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    areaPrefix = swArea+'/'
                    # distinguish case when script is in user project area or given by full path somewhere else
                    if exeWithPath.find(areaPrefix) >= 0:
                        relativeExe = exeWithPath.replace(areaPrefix, '')
                        tar.add(areaPrefix+relativeExe, relativeExe)
                    else:
                        tar.add(exeWithPath, os.path.basename(executable))
                # otherwise the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib, libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module, moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen = len(swArea)
            self.dataExist = False
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    self.dataExist = True
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data", root[swAreaLen:]+"/data")

            ### CMSSW ParameterSet
            if self.pset is not None:
                cfgName = self.configFilename()
                tar.add(common.work_space.jobDir()+cfgName, cfgName)
                common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ## Add ProdCommon dir to tar
            prodcommonDir = 'ProdCommon'
            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
            if os.path.isdir(prodcommonPath):
                tar.add(prodcommonPath, prodcommonDir)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### ML stuff
            ML_file_list = ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
            pythonDir = os.environ['CRABDIR'] + '/python/'
            for fname in ML_file_list:
                tar.add(pythonDir+fname, fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### Utils
            Utils_file_list = ['parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']
            for fname in Utils_file_list:
                tar.add(pythonDir+fname, fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### AdditionalFiles (stored under their bare file name)
            for fname in self.additional_inbox_files:
                tar.add(fname, fname.split('/')[-1])
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # Release the file handle even when packing stops half-way.
            tar.close()
    except CrabException:
        # Already a meaningful diagnostic (e.g. executable not found):
        # let it through instead of masking it with the generic message.
        raise
    except Exception:
        # sys.exc_info() keeps the cause without version-specific syntax.
        raise CrabException('Could not create tar-ball: '+str(sys.exc_info()[1]))

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
778    
779 slacapra 1.61 ## create tar-ball with ML stuff
780 slacapra 1.97
def wsSetupEnvironment(self, nj=0):
    """
    Return the job-script fragment that prepares the execution
    environment of job number 'nj'.

    The fragment sets up the LCG or OSG environment, creates the CMSSW
    project area with scram, checks the number of wrapper arguments,
    exports the dataset related variables and, when a parameter set is
    defined, copies it into place and computes its hash.
    """
    # Name the parameter set gets on the worker node
    if self.CMSSW_major >= 3 or (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1):
        psetName = 'pset.py'
    else:
        psetName = 'pset.cfg'

    script = []
    emit = script.append

    # Prepare JobType-independent part
    emit('\n#Written by cms_cmssw::wsSetupEnvironment\n')
    emit('echo ">>> setup environment"\n')
    emit('if [ $middleware == LCG ]; then \n')
    emit(self.wsSetupCMSLCGEnvironment_())
    emit('elif [ $middleware == OSG ]; then\n')
    emit(' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n')
    emit(' if [ ! $? == 0 ] ;then\n')
    emit(' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n')
    emit(' job_exit_code=10016\n')
    emit(' func_exit\n')
    emit(' fi\n')
    emit(' echo ">>> Created working directory: $WORKING_DIR"\n')
    emit('\n')
    emit(' echo "Change to working directory: $WORKING_DIR"\n')
    emit(' cd $WORKING_DIR\n')
    emit(' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n')
    emit(self.wsSetupCMSOSGEnvironment_())
    emit('fi\n')

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    release = self.version
    emit('\n\n')
    emit('echo ">>> specific cmssw setup environment:"\n')
    emit('echo "CMSSW_VERSION = '+release+'"\n')
    emit(scram+' project CMSSW '+release+'\n')
    emit('status=$?\n')
    emit('if [ $status != 0 ] ; then\n')
    emit(' echo "ERROR ==> CMSSW '+release+' not found on `hostname`" \n')
    emit(' job_exit_code=10034\n')
    emit(' func_exit\n')
    emit('fi \n')
    emit('cd '+release+'\n')
    emit('SOFTWARE_DIR=`pwd`\n')
    emit('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n')
    emit('eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n')
    emit('if [ $? != 0 ] ; then\n')
    emit(' echo "ERROR ==> Problem with the command: "\n')
    emit(' echo "eval \\`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \\` at `hostname`"\n')
    emit(' job_exit_code=10034\n')
    emit(' func_exit\n')
    emit('fi \n')

    # Handle the arguments:
    emit("\n")
    emit("## number of arguments (first argument always jobnumber)\n")
    emit("\n")
    emit("if [ $nargs -lt "+str(self.argsList)+" ]\n")
    emit("then\n")
    emit(" echo 'ERROR ==> Too few arguments' +$nargs+ \n")
    emit(' job_exit_code=50113\n')
    emit(" func_exit\n")
    emit("fi\n")
    emit("\n")

    # Prepare job-specific part
    job = common.job_list[nj]

    ### FEDE FOR DBS OUTPUT PUBLICATION
    if self.datasetPath:
        emit('\n')
        emit('DatasetPath='+self.datasetPath+'\n')
        pathFields = self.datasetPath.split("/")
        emit('PrimaryDataset='+pathFields[1]+'\n')
        emit('DataTier='+pathFields[2]+'\n')
        emit('ApplicationFamily=cmsRun\n')
    else:
        emit('DatasetPath=MCDataTier\n')
        emit('PrimaryDataset=null\n')
        emit('DataTier=null\n')
        emit('ApplicationFamily=MCDataTier\n')

    if self.pset is not None:
        pset = os.path.basename(job.configFilename())
        emit('\n')
        emit('cp $RUNTIME_AREA/'+pset+' .\n')
        if self.datasetPath:
            # standard job
            emit('InputFiles=${args[1]}; export InputFiles\n')
            emit('MaxEvents=${args[2]}; export MaxEvents\n')
            emit('SkipEvents=${args[3]}; export SkipEvents\n')
            emit('echo "Inputfiles:<$InputFiles>"\n')
            emit('echo "MaxEvents:<$MaxEvents>"\n')
            emit('echo "SkipEvents:<$SkipEvents>"\n')
        else:
            # pythia like job
            emit('PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n')
            emit('IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n')
            emit('echo "PreserveSeeds: <$PreserveSeeds>"\n')
            emit('echo "IncrementSeeds:<$IncrementSeeds>"\n')
            if self.firstRun:
                emit('FirstRun=${args[1]}; export FirstRun\n')
                emit('echo "FirstRun: <$FirstRun>"\n')

        emit('mv -f ' + pset + ' ' + psetName + '\n')

        # FUTURE: Can simply for 2_1_x and higher
        emit('\n')
        emit('echo "***** cat ' + psetName + ' *********"\n')
        emit('cat ' + psetName + '\n')
        emit('echo "****** end ' + psetName + ' ********"\n')
        emit('\n')
        emit('PSETHASH=`edmConfigHash < ' + psetName + '` \n')
        emit('echo "PSETHASH = $PSETHASH" \n')
        emit('\n')

    return ''.join(script)
898 slacapra 1.176
def wsUntarSoftware(self, nj=0):
    """
    Return the job-script fragment that unpacks the software tarball.

    When self.tgzNameWithPath is an existing file, the fragment untars
    $RUNTIME_AREA/<tarball name>, aborts the job through func_exit if the
    untar fails and puts $RUNTIME_AREA/ProdCommon on PYTHONPATH.  Without
    a tarball only the header comment is returned.

    nj -- job number; not used by this fragment.
    """
    txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'

    if os.path.isfile(self.tgzNameWithPath):
        tarName = os.path.basename(self.tgzNameWithPath)
        txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+tarName+' :" \n'
        txt += 'tar xzvf $RUNTIME_AREA/'+tarName+'\n'
        # $? must be read right after tar: any command in between (the
        # directory listing used to sit here) would overwrite it and hide
        # an untar failure.
        txt += 'untar_status=$? \n'
        txt += 'ls -Al \n'
        txt += 'if [ $untar_status -ne 0 ]; then \n'
        txt += ' echo "ERROR ==> Untarring .tgz file failed"\n'
        txt += ' job_exit_code=$untar_status\n'
        txt += ' func_exit\n'
        txt += 'else \n'
        txt += ' echo "Successful untar" \n'
        txt += 'fi \n'
        txt += '\n'
        txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
        txt += 'if [ -z "$PYTHONPATH" ]; then\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
        txt += 'else\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
        txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
        txt += 'fi\n'
        txt += '\n'

    return txt
932 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Return the job-script fragment that moves the unpacked user software
    from $RUNTIME_AREA into the current directory.

    It replaces lib/ and module/, moves src/ when data directories were
    packed, moves every additional input file and ProdCommon, then puts
    $SOFTWARE_DIR/ProdCommon on PYTHONPATH.

    nj -- job number; not used by this fragment.
    """
    txt = '\n#Written by cms_cmssw::wsBuildExe\n'
    txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n'

    txt += 'rm -r lib/ module/ \n'
    txt += 'mv $RUNTIME_AREA/lib/ . \n'
    txt += 'mv $RUNTIME_AREA/module/ . \n'
    # dataExist is assigned while the tarball is being built; when that step
    # did not run the flag is missing, which is treated as "no data dirs"
    # instead of failing with AttributeError.
    if getattr(self, 'dataExist', False):
        txt += 'mv $RUNTIME_AREA/src/ . \n'
    for inboxFile in self.additional_inbox_files:
        # The tarball stores these files under their bare name, so that is
        # where they are found after the untar.
        txt += 'mv $RUNTIME_AREA/'+os.path.basename(inboxFile)+' . \n'
    txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'

    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
    txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
    txt += 'fi\n'
    txt += '\n'

    return txt
960 slacapra 1.1
def modifySteeringCards(self, nj):
    """
    Hook for modifying the card provided by the user and writing a new
    card into the share dir.

    This implementation has no body: nothing is done and None is returned.
    """
    return None
966 ewv 1.131
def executableName(self):
    """
    Return the command that starts the job: "sh " when a user script
    (self.scriptExe) is set, otherwise self.executable.
    """
    if not self.scriptExe:
        return self.executable
    # CarlosDaniele: the user script is run through the shell
    return "sh "
972 slacapra 1.1
def executableArgs(self):
    """
    Return the argument string passed to the executable.

    With a user script the arguments are the script name followed by
    " $NJob".  Otherwise they are the framework job report option (for
    CMSSW 1_5 and later) followed by the parameter-set option, which
    names pset.py for major version 2 and later and pset.cfg before.
    """
    # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
    if self.scriptExe: #CarlosDaniele
        return self.scriptExe + " $NJob"

    pieces = []
    # Framework job report
    if self.CMSSW_major >= 2 or (self.CMSSW_major >= 1 and self.CMSSW_minor >= 5):
        pieces.append(" -j $RUNTIME_AREA/crab_fjr_$NJob.xml")
    # Type of config file
    if self.CMSSW_major >= 2:
        pieces.append(" -p pset.py")
    else:
        pieces.append(" -p pset.cfg")
    return "".join(pieces)
989 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of files to be put in the JDL input sandbox:
    the software tarball, when it exists on disk, followed by the job
    wrapper script located in the 'job/' directory of the work space.
    """
    sandbox = []
    tarball = self.tgzNameWithPath
    if os.path.isfile(tarball):
        sandbox.append(tarball)
    # The task database holds the wrapper script name
    scriptName = str(common._db.queryTask('scriptName'))
    wrapperPath = common.work_space.pathForTgz() + 'job/' + os.path.basename(scriptName)
    sandbox.append(wrapperPath)
    return sandbox
1003    
def outputSandbox(self, nj):
    """
    Return the list of files to be put in the JDL output sandbox.

    Every user declared output file (self.output_file followed by
    self.output_file_sandbox) is tagged with the 1-based job number.
    """
    jobTag = str(nj + 1)
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, jobTag) for name in declared]
1015    
def prepareSteeringCards(self):
    """
    Hook for the initial modifications of the user's steering card file.

    Nothing is done here; the method returns None.
    """
    return None
1021    
def wsRenameOutput(self, nj):
    """
    Return the job-script fragment that renames the produced files.

    Each file of self.output_file is renamed to its job-numbered name
    ('$NJob' tag) and linked under its plain name in $RUNTIME_AREA; a
    missing file sets job_exit_code=60302.  The numbered names are also
    exported to the script as the 'file_list' variable.
    """
    out = []
    emit = out.append

    emit('\n#Written by cms_cmssw::wsRenameOutput\n')
    emit('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n')
    emit('echo ">>> current directory content:"\n')
    emit('ls \n')
    emit('\n')

    numberedNames = []
    for plainName in self.output_file:
        numberedName = self.numberFile_(plainName, '$NJob')
        numberedNames.append(numberedName)
        emit('\n')
        emit('# check output file\n')
        emit('if [ -e ./'+plainName+' ] ; then\n')
        if self.copy_data == 1:
            # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            emit(' mv '+plainName+' '+numberedName+'\n')
            emit(' ln -s `pwd`/'+numberedName+' $RUNTIME_AREA/'+plainName+'\n')
        else:
            emit(' mv '+plainName+' $RUNTIME_AREA/'+numberedName+'\n')
            emit(' ln -s $RUNTIME_AREA/'+numberedName+' $RUNTIME_AREA/'+plainName+'\n')
        emit('else\n')
        emit(' job_exit_code=60302\n')
        emit(' echo "WARNING: Output file '+plainName+' not found"\n')
        if common.scheduler.name().upper() == 'CONDOR_G':
            emit(' if [ $middleware == OSG ]; then \n')
            emit(' echo "prepare dummy output file"\n')
            emit(' echo "Processing of job output failed" > $RUNTIME_AREA/'+numberedName+'\n')
            emit(' fi \n')
        emit('fi\n')

    emit('file_list="'+' '.join(numberedNames)+'"\n')
    emit('\n')
    emit('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n')
    emit('echo ">>> current directory content:"\n')
    emit('ls \n')
    emit('\n')
    emit('cd $RUNTIME_AREA\n')
    emit('echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n')
    return ''.join(out)
1066    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of 'file'.

    The name is cut at its last '.'; without any '.' the tag is simply
    appended.  Example: ('out.root', '3') gives 'out_3.root'.
    """
    pieces = file.rsplit(".", 1)
    if len(pieces) == 1:
        # no extension at all
        return file + '_' + txt
    stem, ext = pieces
    return stem + '_' + txt + "." + ext
1084    
def getRequirements(self, nj=None):
    """
    Return the job requirements expression to add to the jdl files.

    The expression is the ' && ' conjunction of: the CMSSW version tag
    (when self.version is set), the architecture tag (when
    self.executable_arch is set), outbound network connectivity and, for
    the 'glitecoll' scheduler, the CE production status.

    nj -- accepted for interface compatibility; not used.
    """
    runtimeEnv = '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'

    # Collect the clauses and join them afterwards, so that an empty
    # version cannot leave a dangling ' && ' at the start of the expression.
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version + runtimeEnv)
    ## SL add requirement for OS version
    if self.executable_arch:
        clauses.append('Member("VO-cms-' + self.executable_arch + runtimeEnv)

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    if common.scheduler.name() == "glitecoll":
        clauses.append('other.GlueCEStateStatus == "Production" ')

    return ' && '.join(clauses)
1106 gutsche 1.3
def configFilename(self):
    """
    Return the config filename: the job type name followed by '.py' for
    CMSSW 2_1 and later, by '.cfg' for earlier versions.
    """
    # FUTURE: Can remove cfg mode for CMSSW >= 2_1_x
    pythonConfig = self.CMSSW_major >= 3 or (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1)
    if pythonConfig:
        suffix = '.py'
    else:
        suffix = '.cfg'
    return self.name() + suffix
1114 gutsche 1.3
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the job-script fragment that sets up the CMS environment on
    OSG: it exports SCRAM_ARCH and sources
    $OSG_APP/cmssoft/cms/cmsset_default.sh for self.version, leaving the
    job through func_exit (exit code 10020) when that file is missing.
    """
    arch = self.executable_arch
    fragment = [
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n',
        ' else\n',
        ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    ]
    return ''.join(fragment)
1138 ewv 1.131
1139 gutsche 1.3 ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the job-script fragment that sets up the CMS environment on
    LCG: it exports SCRAM_ARCH and BUILD_ARCH and sources
    $VO_CMS_SW_DIR/cmsset_default.sh, leaving the job through func_exit
    when the software dir is unset (10031), the file is missing or empty
    (10020) or sourcing it fails (10032).
    """
    arch = self.executable_arch
    fragment = [
        '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n',
        ' echo ">>> setup CMS LCG environment:"\n',
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' export BUILD_ARCH='+arch+'\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n',
        ' job_exit_code=10031\n',
        ' func_exit\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' job_exit_code=10032\n',
        ' func_exit\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "==> setup cms environment ok"\n',
    ]
    return ''.join(fragment)
1173 gutsche 1.5
1174 ewv 1.131 ### FEDE FOR DBS OUTPUT PUBLICATION
def modifyReport(self, nj):
    """
    Return the job-script fragment that modifies the FrameworkJob Report.

    Unless 'USER.publish_data' is 1 in the configuration only the header
    comment is returned.  Otherwise the fragment runs ModifyJobReport.py
    from ProdCommon on crab_fjr_$NJob.xml and replaces the report with
    the new one when that succeeds.
    """
    header = '\n#Written by cms_cmssw::modifyReport\n'
    if int(self.cfg_params.get('USER.publish_data',0)) != 1:
        return header

    processedDataset = self.cfg_params['USER.publish_data_name']
    LFNBaseName = LFNBase(processedDataset)
    modifier = '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py'
    command = modifier + ' $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'

    fragment = [
        header,
        'if [ $copy_exit_status -eq 0 ]; then\n',
        ' FOR_LFN=%s_${PSETHASH}/\n'%(LFNBaseName),
        'else\n',
        ' FOR_LFN=/copy_problems/ \n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        'fi\n',
        'echo ">>> Modify Job Report:" \n',
        'chmod a+x ' + modifier + '\n',
        'ProcessedDataset='+processedDataset+'\n',
        'echo "ProcessedDataset = $ProcessedDataset"\n',
        'echo "SE = $SE"\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'echo "FOR_LFN = $FOR_LFN" \n',
        'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n',
        'echo "' + command + '"\n',
        command + '\n',
        'modifyReport_result=$?\n',
        'if [ $modifyReport_result -ne 0 ]; then\n',
        ' modifyReport_result=70500\n',
        ' job_exit_code=$modifyReport_result\n',
        ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "WARNING: Problem with ModifyJobReport"\n',
        'else\n',
        ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        'fi\n',
    ]
    return ''.join(fragment)
1214 fanzago 1.99
def setParam_(self, param, value):
    """
    Store 'value' under the key 'param' in the parameter dictionary.
    """
    params = self._params
    params[param] = value
1217    
def getParams(self):
    """
    Return the parameter dictionary itself (not a copy).
    """
    params = self._params
    return params
1220 gutsche 1.8
def uniquelist(self, old):
    """
    Return the distinct elements of 'old'.

    The elements are used as dictionary keys, so they must be hashable.
    """
    distinct = dict.fromkeys(old, 0)
    return distinct.keys()
1229 mcinquil 1.121
def outList(self):
    """
    Return the job-script fragment listing the files expected in the
    output sandbox.

    The list holds the job-numbered sandbox files (plus the regular
    output files when self.return_data is 1) followed by the job stdout
    and stderr; it is echoed and exported as 'filesToCheck'.
    """
    if self.return_data == 1:
        expected = self.output_file + self.output_file_sandbox
    else:
        expected = self.output_file_sandbox

    names = [self.numberFile_(name, '$NJob') for name in expected]
    names.append('CMSSW_$NJob.stdout')
    names.append('CMSSW_$NJob.stderr')
    joined = ' '.join(names)

    fragment = [
        'echo ">>> list of expected files on output sandbox"\n',
        'echo "output files: '+joined+'"\n',
        'filesToCheck="'+joined+'"\n',
        'export filesToCheck\n',
    ]
    return ''.join(fragment)