ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.181
Committed: Tue Apr 29 08:14:55 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.180: +1 -1 lines
Log Message:
 bug fix

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure a CMSSW job type from the user configuration.

    Reads and validates the CMSSW.*, USER.* and EDG.* entries of
    cfg_params, runs data discovery when a dataset is given, splits the
    task into jobs, writes the modified parameter set and prepares the
    user tarball.

    ARGUMENTS: cfg_params - mapping from 'SECTION.key' to the configured value
               ncjobs     - number of jobs requested to be created; used as
                            upper limit by the job splitting
    RAISES: CrabException when a mandatory parameter is missing, a
            referenced file does not exist, the splitting parameters are
            inconsistent or the parameter set cannot be modified
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''
    self.datasetPath = ''

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()

    ### collect Data cards

    if 'CMSSW.datasetpath' not in cfg_params:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    # the literal value "none" (any case) selects the no-input use case
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    self.dataTiers = []

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if 'CMSSW.pset' not in cfg_params:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        # "none" means: no parameter set, a user script is run instead
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp:
        tmpOutFiles = tmp.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outFile in tmpOutFiles:
            self.output_file.append(outFile.strip())
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe:
        if not os.path.isfile(self.scriptExe):
            msg = "ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    # Without dataset and without parameter set the only thing left to run
    # is the user script.  scriptExe is None (not '') when the parameter is
    # missing, so test its truth value instead of comparing against ''.
    if self.datasetPath is None and self.pset is None and not self.scriptExe:
        msg = "Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    if 'USER.additional_input_files' in cfg_params:
        for tmp in cfg_params['USER.additional_input_files'].split(','):
            tmp = tmp.strip()
            if not tmp:
                # empty entry (e.g. trailing comma): nothing to add
                continue
            dirname = ''
            if not tmp[0] == "/":
                dirname = "."
            files = []
            if tmp.find("*") > -1:
                files = glob.glob(os.path.join(dirname, tmp))
                if len(files) == 0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                files.append(tmp)
            for inputFile in files:
                if not os.path.exists(inputFile):
                    raise CrabException("Additional input file not found: "+inputFile)
                self.additional_inbox_files.append(inputFile.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if 'CMSSW.events_per_job' in cfg_params:
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in cfg_params:
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    if 'CMSSW.total_number_of_events' in cfg_params:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if self.selectNumberOfJobs == 0:
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## New method of dealing with seeds
    # str.strip() returns a new string, so the stripped value has to be
    # the one that is stored.
    self.incrementSeeds = []
    self.preserveSeeds = []
    if 'CMSSW.preserve_seeds' in cfg_params:
        for tmp in cfg_params['CMSSW.preserve_seeds'].split(','):
            self.preserveSeeds.append(tmp.strip())
    if 'CMSSW.increment_seeds' in cfg_params:
        for tmp in cfg_params['CMSSW.increment_seeds'].split(','):
            self.incrementSeeds.append(tmp.strip())

    ## Old method of dealing with seeds
    ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
    ## remove
    # (deprecated parameter, attribute keeping its value, name added to increment_seeds)
    deprecatedSeeds = [('pythia_seed', 'sourceSeed',    'sourceSeed'),
                       ('vtx_seed',    'sourceSeedVtx', 'VtxSmeared'),
                       ('g4_seed',     'sourceSeedG4',  'g4SimHits'),
                       ('mix_seed',    'sourceSeedMix', 'mix')]
    for (param, attribute, seedName) in deprecatedSeeds:
        value = cfg_params.get('CMSSW.'+param,None)
        setattr(self, attribute, value)
        if value:
            print(param+" is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
            self.incrementSeeds.append(seedName)

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    if self.pset is not None:
        import PsetManipulator as pp
        PsetEdit = pp.PsetManipulator(self.pset)

    # Copy/return

    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents = 0      # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths = {}      # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination = []  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            # Add FrameworkJobReport to parameter-set, set max events.
            # Reset later for data jobs by writeCFG which does all modifications
            PsetEdit.addCrabFJR(self.fjrFileName)
            PsetEdit.maxEvent(self.eventsPerJob)
            PsetEdit.psetWriter(self.configFilename())
        except Exception:
            # report the underlying problem instead of hiding it
            import sys
            msg = 'Error while manipuliating ParameterSet: exiting...'
            msg += '\n' + str(sys.exc_info()[1])
            raise CrabException(msg)
    self.tgzNameWithPath = self.getTarBall(self.executable)
266 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Query data discovery for the content of self.datasetPath and data
    location for the sites hosting its blocks.

    ARGUMENT: cfg_params - user configuration, handed to both services
    SETS: self.pubdata, self.filesbyblock, self.eventsbyblock,
          self.eventsbyfile, self.maxEvents
    RETURNS: whatever DataLocation.getSites() returns; the caller passes it
             on to jobSplittingByBlocks as the block -> sites mapping
    RAISES: CrabException when either service reports an error
    """
    import sys
    import DataDiscovery
    import DataLocation
    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    dataset = self.datasetPath

    ## Contact the DBS
    common.logger.message("Contacting Data Discovery Services ...")
    try:
        self.pubdata = DataDiscovery.DataDiscovery(dataset, cfg_params)
        self.pubdata.fetchDBSInfo()
    except (DataDiscovery.NotExistingDatasetError,
            DataDiscovery.NoDataTierinProvenanceError,
            DataDiscovery.DataDiscoveryError):
        # all three discovery failures are reported the same way
        failure = sys.exc_info()[1]
        raise CrabException('ERROR ***: failed Data Discovery in DBS : %s'%failure.getErrorMessage())

    self.filesbyblock = self.pubdata.getFiles()
    self.eventsbyblock = self.pubdata.getEventsPerBlock()
    self.eventsbyfile = self.pubdata.getEventsPerFile()

    ## get max number of events
    self.maxEvents = self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py

    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc = DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError:
        failure = sys.exc_info()[1]
        raise CrabException('ERROR ***: failed Data Location in DLS \n %s '%failure.getErrorMessage())

    sites = dataloc.getSites()

    # Flatten the per-block site lists into one list without duplicates.
    # NOTE(review): allSites is not used after this point in the method.
    allSites = []
    for hostingSites in sites.values():
        allSites.extend(hostingSites)
    allSites = self.uniquelist(allSites)

    # screen output
    nBlocks = len(self.filesbyblock.keys())
    common.logger.message("Requested dataset: " + dataset + " has " + str(self.maxEvents) + " events in " + str(nBlocks) + " blocks.\n")

    return sites
319 ewv 1.131
320 ewv 1.170 # to Be Removed DS -- BL
321 spiga 1.165 # def setArgsList(self, argsList):
322     # self.argsList = argsList
323 mcinquil 1.140
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs
          self.ncjobs - set to the number of jobs actually created
          self.list_of_args - per job: [file list string, max events, skip events]
    RAISES: CrabException when the splitting parameters cannot produce jobs
            with at least one event each
    """

    # ---- Handle the possible job splitting configurations ---- #
    totalEventsRequested = None
    eventsPerJobRequested = None
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
    if totalEventsRequested is None:
        msg = 'Cannot split by blocks: define total_number_of_events, or both events_per_job and number_of_jobs.'
        raise CrabException(msg)

    # If user requested all the events in the dataset
    if totalEventsRequested == -1:
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif totalEventsRequested > self.maxEvents:
        eventsRemaining = self.maxEvents
        # report the total actually requested: total_number_of_events is 0
        # when the user gave number_of_jobs and events_per_job instead
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be at least 1.')
        eventsPerJobRequested = eventsRemaining // self.theNumberOfJobs

    if self.selectNumberOfJobs:
        common.logger.message("May not create the exact number_of_jobs requested.")

    # Every job closed in the loop below must consume at least one event,
    # otherwise eventsRemaining never shrinks and jobs are created until
    # the job limit is reached.
    if eventsRemaining > 0 and (eventsPerJobRequested is None or eventsPerJobRequested < 1):
        msg = 'Cannot split by blocks: the requested splitting gives '+str(eventsPerJobRequested)+' events per job. '
        msg += 'Check total_number_of_events, events_per_job and number_of_jobs.'
        raise CrabException(msg)

    if self.ncjobs == 'all':
        totalNumberOfJobs = 999999999
    else:
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while eventsRemaining > 0 and blockCount < numBlocksInDataset and jobCount < totalNumberOfJobs:
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        # blocks without event information are not split
        if block not in self.eventsbyblock:
            continue
        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if numFilesInBlock <= 0:
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = ""
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while eventsRemaining > 0 and fileCount < numFilesInBlock and jobCount < totalNumberOfJobs:
            fileName = files[fileCount]
            if newFile:
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job: each entry is \"name\"\,
                    parString += '\\"' + fileName + '\\"\\,'
                    newFile = 0
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

            eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
            # events of the collected files not yet assigned to a job
            eventsAvailable = filesEventCount - jobSkipEventCount

            # if less events in file remain than eventsPerJobRequested
            if eventsAvailable < eventsPerJobRequested:
                # if last file in block
                if fileCount == numFilesInBlock-1:
                    # end job using last file, use remaining events in block
                    # close job and touch new file
                    fullString = parString[:-2]   # drop the trailing \,
                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsAvailable)+" events (last file in block).")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    # fill jobs of block dictionary
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsAvailable
                    eventsRemaining = eventsRemaining - eventsAvailable
                    jobSkipEventCount = 0
                    # reset file
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else:
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif eventsAvailable == eventsPerJobRequested:
                # close job and touch new file
                fullString = parString[:-2]
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                jobsOfBlock[block].append(jobCount+1)
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                parString = ""
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            # if more events in file remain than eventsPerJobRequested
            else:
                # close job but don't touch new file
                fullString = parString[:-2]
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                jobsOfBlock[block].append(jobCount+1)
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (eventsAvailable - self.eventsbyfile[fileName])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[fileName]
                parString = '\\"' + fileName + '\\"\\,'

    self.ncjobs = self.total_number_of_jobs = jobCount
    if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []
    bloskNoSite = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            # sites left after black list and white list, computed once per block
            allowedSites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                ','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
                bloskNoSite.append( blockCounter )

    common.logger.message(screenOutput)
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        virgola = ""
        if len(bloskNoSite) > 1:
            virgola = ","
        for block in bloskNoSite:
            msg += ' ' + str(block) + virgola
        msg += '\n Related jobs:\n '
        virgola = ""
        if len(noSiteBlock) > 1:
            virgola = ","
        for range_jobs in noSiteBlock:
            msg += str(range_jobs) + virgola
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        if 'EDG.se_white_list' in self.cfg_params:
            msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'
        if 'EDG.ce_white_list' in self.cfg_params:
            msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
548    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job

    REQUIRES: self.selectEventsPerJob, self.selectNumberOfJobs,
              self.selectTotalNumberEvents, self.eventsPerJob,
              self.theNumberOfJobs, self.total_number_of_events, self.firstRun
    SETS: self.total_number_of_jobs, self.list_of_args, and, depending on
          which two parameters were given, self.total_number_of_events or
          self.eventsPerJob; appends one entry per job to self.jobDestination
    RAISES: CrabException for a negative total number of events or a
            divisor (events_per_job / number_of_jobs) smaller than 1
    """
    common.logger.debug(5,'Splitting per events')

    if self.selectEventsPerJob:
        common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if self.selectNumberOfJobs:
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if self.selectTotalNumberEvents:
        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if self.selectEventsPerJob:
        if self.selectTotalNumberEvents:
            # events_per_job is the divisor here: refuse values that would
            # divide by zero or give a negative number of jobs
            if self.eventsPerJob < 1:
                raise CrabException('events_per_job must be at least 1 to split total_number_of_events.')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

    elif self.selectNumberOfJobs:
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be at least 1 to split total_number_of_events.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # one argument list per job
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        args = []
        if self.firstRun:
            ## pythia first run
            # NOTE(review): first_run and the job index are concatenated as
            # strings, not added as numbers -- confirm this is intended.
            args.append(str(self.firstRun)+str(i))
        self.list_of_args.append(args)

    return
600    
601 spiga 1.42
def jobSplittingForScript(self):
    """
    Perform job splitting based on number of job

    Creates self.theNumberOfJobs jobs.  Each job gets its own index as
    only argument and an empty destination (no input data, so no site
    constraint).
    SETS: self.total_number_of_jobs, self.list_of_args; appends one entry
          per job to self.jobDestination
    """
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.total_number_of_jobs = self.theNumberOfJobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    jobIndexes = range(self.total_number_of_jobs)
    # argument is the job index, no random seed
    self.list_of_args = [[str(index)] for index in jobIndexes]
    # Since there is no input, any site is good: one fresh [""] per job
    self.jobDestination.extend([[""] for index in jobIndexes])
    return
624    
def split(self, jobParams):
    """
    Store the outcome of the job splitting.

    Fills jobParams with one argument list per job (taken from
    self.list_of_args) and records, for every job, its argument string
    and destination through common._db.updateJob_.

    ARGUMENT: jobParams - list extended in place; entry i receives the
              argument list of job i+1
    REQUIRES: self.total_number_of_jobs, self.list_of_args, self.jobDestination
    SETS: self.argsList - number of arguments of a job, job id included
          (left untouched when no job was created)
    """
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    for i in range(njobs):
        jobParams.append("")

    listID = []
    listField = []
    for job in range(njobs):
        jobParams[job] = arglist[job]
        listID.append(job+1)
        # the job id (1-based) is always the first argument
        argu = ' '.join(jobParams[job])
        job_ToSave = {}
        job_ToSave['arguments'] = str(job+1)+' '+argu
        job_ToSave['dlsDestination'] = self.jobDestination[job]
        listField.append(job_ToSave)
        msg = "Job "+str(job)+" Arguments: "+str(job+1)+" "+argu+"\n" \
            +" Destination: "+str(self.jobDestination[job])
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField)
    # jobParams is empty when the splitting produced no job: there is no
    # first entry to measure in that case
    if len(jobParams) > 0:
        self.argsList = (len(jobParams[0])+1)

    return
657 ewv 1.131
def numberOfJobs(self):
    """
    Return the total number of jobs stored in self.total_number_of_jobs.
    """
    jobTotal = self.total_number_of_jobs
    return jobTotal
661    
def getTarBall(self, exe):
    """
    Return the TarBall with lib and exe

    The tarball path is common.work_space.pathForTgz() + 'share/' +
    self.tgz_name and is also stored in self.tgzNameWithPath.  An existing
    file is reused; otherwise buildTar_ is asked to create it.

    ARGUMENT: exe - executable handed to buildTar_
    """
    # relative path, as used for the Boss XML files
    tarballPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarballPath

    if not os.path.exists(tarballPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # if it exist, just return it
    return tarballPath
679    
def buildTar_(self, executable):
    """
    Create the gzipped tarball ``self.tgzNameWithPath`` for the job.

    Nothing is built when the SCRAM working area is the release top (or
    when no release top is reported).  Otherwise the archive receives, in
    this order: the user executable if it does not live in the release,
    the ``lib`` and ``module`` directories of the working area, every
    ``data`` directory found below the working area, the job configuration
    file, the ``ProdCommon`` directory and a fixed list of helper scripts
    taken from ``$CRABDIR``, and finally ``self.additional_inbox_files``.

    ``self.dataExist`` is set to tell whether a ``data`` directory was
    packed.

    executable -- name of the user executable to look up through SCRAM.

    Raises CrabException if the executable is not found, if the archive
    cannot be written, or if it is larger than ``self.MaxTarBallSize`` MB.
    """
    import sys
    import tarfile

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    # Define the flag up front so it exists even on the early return below.
    self.dataExist = False

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
        return

    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if self.executable != '':
                exeWithPath = self.scram.findFile_(executable)
                if not exeWithPath:
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    # distinguish case when script is in user project area
                    # or given by full path somewhere else
                    if exeWithPath.find(path) >= 0:
                        exe = exeWithPath.replace(path, '')
                        tar.add(path+exe, exe)
                    else:
                        tar.add(exeWithPath, os.path.basename(executable))
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/'+libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib, libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea+'/'+moduleDir
            if os.path.isdir(module):
                tar.add(module, moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen = len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    self.dataExist = True
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data", root[swAreaLen:]+"/data")

            ### CMSSW ParameterSet
            if self.pset is not None:
                cfg_file = common.work_space.jobDir()+self.configFilename()
                tar.add(cfg_file, self.configFilename())
                common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ## Add ProdCommon dir to tar
            prodcommonDir = 'ProdCommon'
            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
            if os.path.isdir(prodcommonPath):
                tar.add(prodcommonPath, prodcommonDir)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### ML stuff
            ML_file_list = ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
            path = os.environ['CRABDIR'] + '/python/'
            for name in ML_file_list:
                tar.add(path+name, name)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### Utils
            Utils_file_list = ['parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']
            for name in Utils_file_list:
                tar.add(path+name, name)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### AdditionalFiles
            for name in self.additional_inbox_files:
                tar.add(name, name.split('/')[-1])
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # Release the archive handle on success and on failure alike.
            tar.close()
    except Exception:
        failure = sys.exc_info()[1]
        # Do not leave a truncated archive behind: an existing file at this
        # path would make a later build look unnecessary.
        try:
            if os.path.exists(self.tgzNameWithPath):
                os.remove(self.tgzNameWithPath)
        except OSError:
            pass
        if isinstance(failure, CrabException):
            # Keep the specific message (e.g. executable not found).
            raise failure
        raise CrabException('Could not create tar-ball: '+str(failure))

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
782    
783 slacapra 1.61 ## create tar-ball with ML stuff
784 slacapra 1.97
def wsSetupEnvironment(self, nj=0):
    """
    Return the part of the job script that prepares the execution
    environment for job 'nj'.

    The text is made of a middleware-dependent section (LCG or OSG), the
    SCRAM project setup for ``self.version``, a check on the number of
    arguments against ``self.argsList``, the dataset related variables and,
    when a parameter set is defined, the commands handling ``pset.cfg``.
    """
    # NOTE(review): block nesting was rebuilt from a listing without
    # indentation -- confirm against the repository copy.
    out = []
    add = out.append

    # Prepare JobType-independent part
    add('\n#Written by cms_cmssw::wsSetupEnvironment\n')
    add('echo ">>> setup environment"\n')
    add('if [ $middleware == LCG ]; then \n')
    add(self.wsSetupCMSLCGEnvironment_())
    add('elif [ $middleware == OSG ]; then\n')
    add(' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n')
    add(' if [ ! $? == 0 ] ;then\n')
    add(' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n')
    add(' job_exit_code=10016\n')
    add(' func_exit\n')
    add(' fi\n')
    add(' echo ">>> Created working directory: $WORKING_DIR"\n')
    add('\n')
    add(' echo "Change to working directory: $WORKING_DIR"\n')
    add(' cd $WORKING_DIR\n')
    add(' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n')
    add(self.wsSetupCMSOSGEnvironment_())
    add('fi\n')

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    add('\n\n')
    add('echo ">>> specific cmssw setup environment:"\n')
    add('echo "CMSSW_VERSION = '+self.version+'"\n')
    add(scram+' project CMSSW '+self.version+'\n')
    add('status=$?\n')
    add('if [ $status != 0 ] ; then\n')
    add(' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n')
    add(' job_exit_code=10034\n')
    add(' func_exit\n')
    add('fi \n')
    add('cd '+self.version+'\n')
    add('SOFTWARE_DIR=`pwd`\n')
    add('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n')
    add('eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n')
    add('if [ $? != 0 ] ; then\n')
    add(' echo "ERROR ==> Problem with the command: "\n')
    add(' echo "eval \`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \` at `hostname`"\n')
    add(' job_exit_code=10034\n')
    add(' func_exit\n')
    add('fi \n')

    # Handle the arguments:
    add("\n")
    add("## number of arguments (first argument always jobnumber)\n")
    add("\n")
    add("if [ $nargs -lt "+str(self.argsList)+" ]\n")
    add("then\n")
    add(" echo 'ERROR ==> Too few arguments' +$nargs+ \n")
    add(' job_exit_code=50113\n')
    add(" func_exit\n")
    add("fi\n")
    add("\n")

    # Prepare job-specific part
    job = common.job_list[nj]

    ### FEDE FOR DBS OUTPUT PUBLICATION
    if self.datasetPath:
        pieces = self.datasetPath.split("/")
        add('\n')
        add('DatasetPath='+self.datasetPath+'\n')
        add('PrimaryDataset='+pieces[1]+'\n')
        add('DataTier='+pieces[2]+'\n')
        add('ApplicationFamily=cmsRun\n')
    else:
        add('DatasetPath=MCDataTier\n')
        add('PrimaryDataset=null\n')
        add('DataTier=null\n')
        add('ApplicationFamily=MCDataTier\n')

    if self.pset is None:
        return ''.join(out)

    cfg_name = os.path.basename(job.configFilename())
    add('\n')
    add('cp $RUNTIME_AREA/'+cfg_name+' .\n')
    if self.datasetPath: # standard job
        add('InputFiles=${args[1]}; export InputFiles\n')
        add('MaxEvents=${args[2]}; export MaxEvents\n')
        add('SkipEvents=${args[3]}; export SkipEvents\n')
        add('echo "Inputfiles:<$InputFiles>"\n')
        add('echo "MaxEvents:<$MaxEvents>"\n')
        add('echo "SkipEvents:<$SkipEvents>"\n')
    else: # pythia like job
        add('PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n')
        add('IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n')
        add('echo "PreserveSeeds: <$PreserveSeeds>"\n')
        add('echo "IncrementSeeds:<$IncrementSeeds>"\n')
        if self.firstRun:
            add('FirstRun=${args[1]}; export FirstRun\n')
            add('echo "FirstRun: <$FirstRun>"\n')

    add('mv -f '+cfg_name+' pset.cfg\n')

    add('\n')
    add('echo "***** cat pset.cfg *********"\n')
    add('cat pset.cfg\n')
    add('echo "****** end pset.cfg ********"\n')
    add('\n')
    add('PSETHASH=`EdmConfigHash < pset.cfg` \n')
    add('echo "PSETHASH = $PSETHASH" \n')
    add('\n')
    return ''.join(out)
897 slacapra 1.176
def wsUntarSoftware(self, nj=0):
    """
    Return the part of the job script that unpacks the software tarball.

    When no file exists at ``self.tgzNameWithPath`` only the header comment
    is returned.  Otherwise the text untars the archive from
    ``$RUNTIME_AREA``, aborts the job through ``func_exit`` if ``tar``
    failed, and puts ``$RUNTIME_AREA/ProdCommon`` in ``PYTHONPATH``.

    nj -- job index; not used by this method.
    """
    txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'

    if not os.path.isfile(self.tgzNameWithPath):
        return txt

    tgz = os.path.basename(self.tgzNameWithPath)
    lines = [
        'echo ">>> tar xzvf $RUNTIME_AREA/'+tgz+' :" \n',
        'tar xzvf $RUNTIME_AREA/'+tgz+'\n',
        # Capture the status right after tar: any command in between
        # (the listing used to sit here) would overwrite $?.
        'untar_status=$? \n',
        'ls -Al \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "ERROR ==> Untarring .tgz file failed"\n',
        ' job_exit_code=$untar_status\n',
        ' func_exit\n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        'echo ">>> Include ProdCommon in PYTHONPATH:"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n',
        'else\n',
        ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n',
        'echo "PYTHONPATH=$PYTHONPATH"\n',
        'fi\n',
        '\n',
    ]
    return txt + ''.join(lines)
931 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Return the part of the job script that moves the unpacked software
    directories from ``$RUNTIME_AREA`` into the current directory and
    adds ``$SOFTWARE_DIR/ProdCommon`` to ``PYTHONPATH``.

    nj -- job index; not used by this method.
    """
    script = [
        '\n#Written by cms_cmssw::wsBuildExe\n',
        'echo ">>> moving CMSSW software directories in `pwd`" \n',
        'rm -r lib/ module/ \n',
        'mv $RUNTIME_AREA/lib/ . \n',
        'mv $RUNTIME_AREA/module/ . \n',
    ]

    # buildTar_ sets the flag only when it actually builds the archive;
    # treat a missing attribute as "no data directory" instead of failing.
    # NOTE(review): confirm that False is the right default when an
    # already existing tarball is reused.
    if getattr(self, 'dataExist', False):
        script.append('mv $RUNTIME_AREA/src/ . \n')

    # buildTar_ stores each additional file under its base name, so that
    # is the name present in $RUNTIME_AREA after untarring.
    for path in self.additional_inbox_files:
        script.append('mv $RUNTIME_AREA/'+path.split('/')[-1]+' . \n')

    script.extend([
        'mv $RUNTIME_AREA/ProdCommon/ . \n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n',
        'else\n',
        ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n',
        'echo "PYTHONPATH=$PYTHONPATH"\n',
        'fi\n',
        '\n',
    ])
    return ''.join(script)
959 slacapra 1.1
def modifySteeringCards(self, nj):
    """
    Modify the card provided by the user, writing a new card into the
    share dir.

    The body is empty: the method does nothing and returns None.
    """
    return None
965 ewv 1.131
def executableName(self):
    """
    Return the command used to start the job: ``"sh "`` when a user
    script (``self.scriptExe``) is set, ``self.executable`` otherwise.
    """
    if not self.scriptExe:
        return self.executable
    # CarlosDaniele: the user script is run through the shell
    return "sh "
971 slacapra 1.1
def executableArgs(self):
    """
    Return the argument string passed to the job executable.

    With a user script (``self.scriptExe``) the arguments are the script
    name followed by ``$NJob``.  Otherwise they depend on the CMSSW
    release reported by SCRAM: ``-j <framework job report>`` from release
    1_5 on, and ``-p pset.py`` from major release 2 on (``-p pset.cfg``
    before).

    Raises CrabException if the release string does not carry integer
    major and minor numbers.
    """
    # FUTURE: This function tests the CMSSW version. Can be simplified as
    # we drop support for old versions
    if self.scriptExe: #CarlosDaniele
        return self.scriptExe + " $NJob"

    version = self.scram.getSWVersion()
    fields = version.split('_')
    try:
        release = (int(fields[1]), int(fields[2]))
    except (IndexError, ValueError):
        msg = "Cannot parse CMSSW version string: " + version + " for major and minor release number!"
        raise CrabException(msg)

    ex_args = ""
    # Framework job report.  Compare (major, minor) as a pair: testing the
    # two numbers separately would drop the option for e.g. 2_0_x.
    if release >= (1, 5):
        ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
    # Type of cfg file
    if release[0] >= 2:
        ex_args += " -p pset.py"
    else:
        ex_args += " -p pset.cfg"
    return ex_args
998 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of file names to be put in the JDL input sandbox:
    the software tarball, if the file exists, followed by the job wrapper
    script whose name is read from the task database.
    """
    # NOTE(review): nesting rebuilt from a listing without indentation;
    # the wrapper is assumed to be added unconditionally -- confirm.
    sandbox = []
    tarball = self.tgzNameWithPath
    if os.path.isfile(tarball):
        sandbox.append(tarball)

    script_name = str(common._db.queryTask('scriptName'))
    wrapper = os.path.basename(script_name)
    sandbox.append(common.work_space.pathForTgz() +'job/'+ wrapper)
    return sandbox
1012    
def outputSandbox(self, nj):
    """
    Return the list of file names to be put in the JDL output sandbox.

    Every user declared output file (``self.output_file`` followed by
    ``self.output_file_sandbox``) is tagged through ``numberFile_`` with
    the job number ``nj + 1``.
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1024    
def prepareSteeringCards(self):
    """
    Make initial modifications of the user's steering card file.

    Nothing is done here; the method returns None.
    """
    return None
1030    
def wsRenameOutput(self, nj):
    """
    Return the part of the job script that renames the produced files.

    For every entry of ``self.output_file`` the script checks that the
    file exists, moves it to its job-numbered name (built by
    ``numberFile_`` with ``$NJob``) and links the original name to it in
    ``$RUNTIME_AREA``; a missing file sets ``job_exit_code=60302``.  The
    numbered names are also exported to the script as ``file_list``.

    nj -- job index; not used by this method.
    """
    txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    txt += 'ls \n'
    txt += '\n'

    # Numbered names, collected while the per-file checks are written.
    file_list = []
    for fileWithSuffix in self.output_file:
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        file_list.append(output_file_num)
        txt += '\n'
        txt += '# check output file\n'
        txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
        if self.copy_data == 1: # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            txt += ' mv '+fileWithSuffix+' '+output_file_num+'\n'
            txt += ' ln -s `pwd`/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        else:
            txt += ' mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        txt += 'else\n'
        txt += ' job_exit_code=60302\n'
        txt += ' echo "WARNING: Output file '+fileWithSuffix+' not found"\n'
        if common.scheduler.name().upper() == 'CONDOR_G':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'fi\n'

    # str.join instead of the string.join helper (gone in Python 3).
    txt += 'file_list="'+' '.join(file_list)+'"\n'
    txt += '\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    txt += 'ls \n'
    txt += '\n'
    txt += 'cd $RUNTIME_AREA\n'
    txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
    return txt
1075    
def numberFile_(self, file, txt):
    """
    Insert ``_<txt>`` before the last extension of a file name.

    ``numberFile_('out.root', '3')`` gives ``'out_3.root'``; a name
    without any dot simply gets ``_<txt>`` appended.

    file -- file name to tag.
    txt  -- text placed after the underscore.
    """
    # Locate the last dot directly instead of splitting on every dot with
    # the deprecated string.split and gluing the pieces back together.
    dot = file.rfind('.')
    if dot < 0:
        return file + '_' + txt
    return file[:dot] + '_' + txt + file[dot:]
1093    
def getRequirements(self, nj=None):
    """
    Return the job requirements expression to add to the JDL files.

    The expression joins with ``&&`` a software tag for ``self.version``
    and one for ``self.executable_arch`` (each only if set), the outbound
    connectivity clause and, for the "glitecoll" scheduler, the
    "Production" CE status clause.

    nj -- not used by this method; the default is None rather than a
          shared mutable list.
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    ## SL add requirement for OS version only if SL4
    #reSL4 = re.compile( r'slc4' )
    if self.executable_arch: # and reSL4.search(self.executable_arch):
        clauses.append('Member("VO-cms-' + self.executable_arch +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    if common.scheduler.name() == "glitecoll":
        # the trailing blank is part of the text produced so far
        clauses.append('other.GlueCEStateStatus == "Production" ')

    # Joining the clauses avoids an expression starting with a dangling
    # ' && ' when no version is set.
    return ' && '.join(clauses)
1115 gutsche 1.3
def configFilename(self):
    """
    Return the config file name: the value of ``self.name()`` followed
    by ``.cfg``.
    """
    base = self.name()
    return base + '.cfg'
1119    
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of the job script that prepares the CMS execution
    environment on OSG: it exports ``SCRAM_ARCH`` and sources
    ``$OSG_APP/cmssoft/cms/cmsset_default.sh`` for ``self.version``,
    leaving through ``func_exit`` with code 10020 if that file is missing.
    """
    arch = self.executable_arch
    pieces = (
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n',
        ' else\n',
        ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    )
    return ''.join(pieces)
1143 ewv 1.131
1144 gutsche 1.3 ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of the job script that prepares the CMS execution
    environment on LCG: it exports ``SCRAM_ARCH`` and ``BUILD_ARCH`` and
    sources ``$VO_CMS_SW_DIR/cmsset_default.sh``, leaving through
    ``func_exit`` with code 10031, 10020 or 10032 on the failures checked.
    """
    arch = self.executable_arch
    pieces = (
        '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n',
        ' echo ">>> setup CMS LCG environment:"\n',
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' export BUILD_ARCH='+arch+'\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n',
        ' job_exit_code=10031\n',
        ' func_exit\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' job_exit_code=10032\n',
        ' func_exit\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "==> setup cms environment ok"\n',
    )
    return ''.join(pieces)
1178 gutsche 1.5
1179 ewv 1.131 ### FEDE FOR DBS OUTPUT PUBLICATION
def modifyReport(self, nj):
    """
    Return the part of the job script that modifies the FrameworkJob
    Report.

    Only the header comment is returned unless the ``USER.publish_data``
    configuration parameter equals 1.  In that case the script sets
    ``FOR_LFN`` according to ``$copy_exit_status``, runs
    ``ModifyJobReport.py`` on the job report and, on success, replaces the
    report with ``NewFrameworkJobReport.xml``.
    """
    header = '\n#Written by cms_cmssw::modifyReport\n'
    if int(self.cfg_params.get('USER.publish_data',0)) != 1:
        return header

    processedDataset = self.cfg_params['USER.publish_data_name']
    lfn_base = LFNBase(processedDataset)

    # The same command line is first echoed, then executed.
    command = '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'

    body = [
        'if [ $copy_exit_status -eq 0 ]; then\n',
        ' FOR_LFN=%s_${PSETHASH}/\n'%(lfn_base),
        'else\n',
        ' FOR_LFN=/copy_problems/ \n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        'fi\n',
        'echo ">>> Modify Job Report:" \n',
        'chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n',
        'ProcessedDataset='+processedDataset+'\n',
        'echo "ProcessedDataset = $ProcessedDataset"\n',
        'echo "SE = $SE"\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'echo "FOR_LFN = $FOR_LFN" \n',
        'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n',
        'echo "' + command + '"\n',
        command + '\n',
        'modifyReport_result=$?\n',
        'if [ $modifyReport_result -ne 0 ]; then\n',
        ' modifyReport_result=70500\n',
        ' job_exit_code=$modifyReport_result\n',
        ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "WARNING: Problem with ModifyJobReport"\n',
        'else\n',
        ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        'fi\n',
    ]
    return header + ''.join(body)
1219 fanzago 1.99
def setParam_(self, param, value):
    """
    Store ``value`` under the key ``param`` in ``self._params``.
    """
    self._params.update({param: value})
1222    
def getParams(self):
    """
    Return the ``self._params`` dictionary itself (not a copy).
    """
    params = self._params
    return params
1225 gutsche 1.8
def uniquelist(self, old):
    """
    Remove duplicates from a list.

    Return a new list holding each element of ``old`` once, in order of
    first appearance.  Elements must be hashable.
    """
    # A real list is built explicitly: dict.keys() has no defined order on
    # old interpreters and is not a list at all on Python 3.
    seen = {}
    unique = []
    for item in old:
        if item not in seen:
            seen[item] = 0
            unique.append(item)
    return unique
1234 mcinquil 1.121
def outList(self):
    """
    Return the part of the job script that lists the files expected in
    the output sandbox and exports them as ``filesToCheck``.

    The list holds the job-numbered names of ``self.output_file_sandbox``,
    preceded by those of ``self.output_file`` when ``self.return_data``
    is 1, and always ends with the job stdout and stderr names.
    """
    if self.return_data == 1:
        declared = self.output_file + self.output_file_sandbox
    else:
        declared = self.output_file_sandbox

    listOutFiles = [self.numberFile_(name, '$NJob') for name in declared]
    listOutFiles.append('CMSSW_$NJob.stdout')
    listOutFiles.append('CMSSW_$NJob.stderr')

    # str.join instead of the string.join helper (gone in Python 3).
    joined = ' '.join(listOutFiles)
    txt = ''
    txt += 'echo ">>> list of expected files on output sandbox"\n'
    txt += 'echo "output files: '+joined+'"\n'
    txt += 'filesToCheck="'+joined+'"\n'
    txt += 'export filesToCheck\n'
    return txt