ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.178
Committed: Sun Apr 20 09:34:40 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.177: +0 -21 lines
Log Message:
 Reimplemented dashboard communication, again sending both pre- and post-submission info, plus extensive reorganization of the Submitter code.

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Read the CMSSW/USER configuration, locate the input data, build the
    user tarball, split the task into jobs and prepare the PSet.

    cfg_params: dict-like configuration keyed 'SECTION.option'
                (accessed with has_key()/get()/[]).
    ncjobs:     number of jobs to create, or 'all'; limits block splitting.

    Raises CrabException on any configuration inconsistency.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    def splitList(value):
        # Split a comma-separated option, strip blanks around each entry
        # and drop empty entries (e.g. produced by a trailing comma).
        return [item.strip() for item in value.split(',') if item.strip()]

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.additional_tgz_name = 'additional.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''          # script use case
    self.datasetPath = ''   # script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()

    ### collect Data cards

    if not cfg_params.has_key('CMSSW.datasetpath'):
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    self.dataTiers = []

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if not cfg_params.has_key('CMSSW.pset'):
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp:
        tmpOutFiles = splitList(tmp)
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        self.output_file.extend(tmpOutFiles)
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe:
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    # A task with neither input data nor PSet can only run a user script.
    # scriptExe is None (not '') when USER.script_exe is missing, hence `not`.
    if self.datasetPath is None and self.pset is None and not self.scriptExe:
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files (comma separated, '*' patterns are globbed)
    if cfg_params.has_key('USER.additional_input_files'):
        for tmp in splitList(cfg_params['USER.additional_input_files']):
            dirname = ''
            if not tmp.startswith("/"):
                dirname = "."
            files = []
            if '*' in tmp:
                files = glob.glob(os.path.join(dirname, tmp))
                if len(files) == 0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                files.append(tmp)
            for fname in files:
                if not os.path.exists(fname):
                    raise CrabException("Additional input file not found: "+fname)
                self.additional_inbox_files.append(fname.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if cfg_params.has_key('CMSSW.events_per_job'):
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if cfg_params.has_key('CMSSW.number_of_jobs'):
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    if cfg_params.has_key('CMSSW.total_number_of_events'):
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        # PSet jobs: the third splitting parameter is derived from the other two
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        # script jobs are split only by number of jobs
        if self.selectNumberOfJobs == 0:
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## New method of dealing with seeds
    self.incrementSeeds = []
    self.preserveSeeds = []
    if cfg_params.has_key('CMSSW.preserve_seeds'):
        self.preserveSeeds.extend(splitList(cfg_params['CMSSW.preserve_seeds']))
    if cfg_params.has_key('CMSSW.increment_seeds'):
        self.incrementSeeds.extend(splitList(cfg_params['CMSSW.increment_seeds']))

    ## Old method of dealing with seeds
    ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
    ## remove
    # (option name, attribute kept for other code, seed added to increment_seeds)
    deprecatedSeeds = (
        ('pythia_seed', 'sourceSeed',    'sourceSeed'),
        ('vtx_seed',    'sourceSeedVtx', 'VtxSmeared'),
        ('g4_seed',     'sourceSeedG4',  'g4SimHits'),
        ('mix_seed',    'sourceSeedMix', 'mix'),
        )
    for option, attribute, seedName in deprecatedSeeds:
        value = cfg_params.get('CMSSW.'+option,None)
        setattr(self, attribute, value)
        if value:
            common.logger.message(option+" is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
            self.incrementSeeds.append(seedName)

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    # Load the user PSet; it is modified and written out after splitting.
    if self.pset is not None:
        import PsetManipulator as pp
        PsetEdit = pp.PsetManipulator(self.pset)

    # Copy/return
    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0       # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}       # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[] # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            # Add FrameworkJobReport to parameter-set, set max events.
            # Reset later for data jobs by writeCFG which does all modifications
            PsetEdit.addCrabFJR(self.fjrFileName)
            PsetEdit.maxEvent(self.eventsPerJob)
            PsetEdit.psetWriter(self.configFilename())
        except:
            # any failure while editing the PSet aborts task creation
            msg='Error while manipuliating ParameterSet: exiting...'
            raise CrabException(msg)
267 gutsche 1.3
268 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
269    
270 slacapra 1.86 import DataDiscovery
271     import DataLocation
272 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
273    
274     datasetPath=self.datasetPath
275    
276 slacapra 1.1 ## Contact the DBS
277 gutsche 1.92 common.logger.message("Contacting Data Discovery Services ...")
278 slacapra 1.1 try:
279 slacapra 1.137 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
280 slacapra 1.1 self.pubdata.fetchDBSInfo()
281    
282 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
283 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
284     raise CrabException(msg)
285 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
286 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
287     raise CrabException(msg)
288 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
289 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
290 slacapra 1.1 raise CrabException(msg)
291    
292 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
293 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
294     self.eventsbyfile=self.pubdata.getEventsPerFile()
295 gutsche 1.3
296 slacapra 1.1 ## get max number of events
297 ewv 1.131 self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
298 slacapra 1.1
299     ## Contact the DLS and build a list of sites hosting the fileblocks
300     try:
301 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
302 gutsche 1.6 dataloc.fetchDLSInfo()
303 slacapra 1.41 except DataLocation.DataLocationError , ex:
304 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
305     raise CrabException(msg)
306 ewv 1.131
307 slacapra 1.1
308 gutsche 1.35 sites = dataloc.getSites()
309     allSites = []
310     listSites = sites.values()
311 slacapra 1.63 for listSite in listSites:
312     for oneSite in listSite:
313 gutsche 1.35 allSites.append(oneSite)
314     allSites = self.uniquelist(allSites)
315 gutsche 1.3
316 gutsche 1.92 # screen output
317     common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
318    
319 gutsche 1.35 return sites
320 ewv 1.131
321 ewv 1.170 # to Be Removed DS -- BL
322 spiga 1.165 # def setArgsList(self, argsList):
323     # self.argsList = argsList
324 mcinquil 1.140
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs (and self.ncjobs) - Total # of jobs
          self.list_of_args - [file list, max events, skip events] per job
    RAISES: CrabException if the events requested per job is not positive
            while events remain to be split (the loop could never end).
    """
    # Each file is emitted as \"<file>\"\, ; the trailing \, is cut when
    # the job is closed.
    fileQuote = '\\"'
    fileSep = '\\,'

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        # With fewer events than jobs the integer division yields 0; ask for
        # at least one event per job so that every job consumes events.
        eventsPerJobRequested = max(1, int(eventsRemaining/self.theNumberOfJobs))

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    # A non-positive request never reduces eventsRemaining: refuse it
    # instead of looping forever.
    if (eventsRemaining > 0 and eventsPerJobRequested < 1):
        msg = 'Cannot split jobs with '+str(eventsPerJobRequested)+' events per job'
        raise CrabException(msg)

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # block -> list of (1-based) job numbers created from that block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if self.eventsbyblock.has_key(block) :
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = self.filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0

            # ---- New block => New job ---- #
            parString = ""
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # events of the first file the current job must skip
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                fileName = files[fileCount]
                if newFile :
                    try:
                        numEventsInFile = self.eventsbyfile[fileName]
                    except KeyError:
                        common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")
                    else:
                        common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += fileQuote + fileName + fileQuote + fileSep
                        newFile = 0

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                    # if last file in block and the pending job holds at least
                    # one file (all its files may have had unknown event counts)
                    if ( fileCount == numFilesInBlock-1 and parString ) :
                        # end job using last file, use remaining events in block
                        # close job and touch new file
                        fullString = parString[:-len(fileSep)]
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(blockSites[block]))
                        # fill jobs of block dictionary
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                        jobSkipEventCount = 0
                        # reset file
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else :
                        # go to next file (leaves the block after its last file)
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                    # close job and touch new file
                    fullString = parString[:-len(fileSep)]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(blockSites[block]))
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    fullString = parString[:-len(fileSep)]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(blockSites[block]))
                    jobsOfBlock[block].append(jobCount+1)
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[fileName]
                    parString = fileQuote + fileName + fileQuote + fileSep

    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep track of blocks with no allowed site, to print a warning at the end
    noSiteJobRanges = []
    noSiteBlockNumbers = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            # black list first, then white list; computed once per block
            allowedSites = self.blackWhiteListParser.checkWhiteList(
                self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                                                                   ','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteJobRanges.append( spanRanges(jobsOfBlock[block]) )
                noSiteBlockNumbers.append( blockCounter )

    common.logger.message(screenOutput)
    if noSiteJobRanges:
        msg = 'WARNING: No sites are hosting any part of data for block:\n    '
        msg += ', '.join([str(number) for number in noSiteBlockNumbers])
        msg += '\n    Related jobs:\n    '
        msg += ','.join([str(jobRange) for jobRange in noSiteJobRanges])
        msg += '\n    will not be submitted and this block of data can not be analyzed!\n'
        if self.cfg_params.has_key('EDG.se_white_list'):
            msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'
        if self.cfg_params.has_key('EDG.ce_white_list'):
            msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
549    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of events per job, for a task
    with a PSet but no input dataset.

    Of events_per_job, number_of_jobs and total_number_of_events the two
    chosen by the user determine the third.

    SETS: self.total_number_of_jobs, and self.total_number_of_events or
          self.eventsPerJob when derived; self.list_of_args (one list per
          job, holding the first-run argument when first_run is set);
          appends one empty destination per job to self.jobDestination.
    RAISES: CrabException for a negative total_number_of_events, or when
            a non-positive events_per_job / number_of_jobs would be
            divided by.
    """
    common.logger.debug(5,'Splitting per events')

    if (self.selectEventsPerJob):
        common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if (self.selectNumberOfJobs):
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if (self.selectTotalNumberEvents):
        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            if (self.eventsPerJob <= 0):
                msg = 'Cannot split jobs with '+str(self.eventsPerJob)+' events per job'
                raise CrabException(msg)
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif (self.selectNumberOfJobs):
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
    elif (self.selectNumberOfJobs):
        if (self.theNumberOfJobs <= 0):
            msg = 'Cannot split '+str(self.total_number_of_events)+' events into '+str(self.theNumberOfJobs)+' jobs'
            raise CrabException(msg)
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) # must be empty to write correctly the xml
        args = []
        if (self.firstRun):
            ## pythia first run: first_run with the job index appended as TEXT
            # NOTE(review): string concatenation, not addition -- confirm intended
            args.append(str(self.firstRun)+str(i))
        self.list_of_args.append(args)

    return
601    
602 spiga 1.42
def jobSplittingForScript(self):
    """
    Split a script-only task (no input dataset, no PSet) into exactly the
    number_of_jobs requested by the user.

    SETS: self.total_number_of_jobs; self.list_of_args, where job i gets
          the single argument str(i); appends one empty destination per
          job to self.jobDestination (no site constraint).
    """
    log = common.logger
    log.debug(5,'Splitting per job')
    log.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    nJobs = self.theNumberOfJobs
    self.total_number_of_jobs = nJobs

    log.debug(5,'N jobs '+str(nJobs))
    log.message(str(nJobs)+' jobs can be created')

    self.list_of_args = []
    for index in range(nJobs):
        # nothing to read, so any site will do; no random seed either
        self.jobDestination.append([""])
        self.list_of_args.append([str(index)])
    return
625    
def split(self, jobParams):
    """
    Fill jobParams with the per-job argument lists produced by the
    splitting, and store each job's arguments and destination in the
    task database in one batch.

    jobParams: list, expected empty; njobs entries are appended and
               entry i is set to self.list_of_args[i].
    SETS: self.argsList = number of arguments per job + 1 (the job
          number that is prepended to the arguments).
    """
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    for i in range(njobs):
        jobParams.append("")

    listID = []
    listField = []
    for job in range(njobs):
        jobParams[job] = arglist[job]
        listID.append(job+1)
        argu = ' '.join(jobParams[job])
        job_ToSave = {}
        # the 1-based job number is always the first argument
        job_ToSave['arguments'] = str(job+1)+' '+argu
        job_ToSave['dlsDestination'] = self.jobDestination[job]
        listField.append(job_ToSave)
        msg = "Job "+str(job)+" Arguments: "+str(job+1)+" "+argu+"\n" \
              +" Destination: "+str(self.jobDestination[job])
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField)

    # All jobs of a task share the same argument layout, so job 0 is
    # representative (indexing job 1 failed for single-job tasks).
    if njobs > 0:
        self.argsList = len(jobParams[0]) + 1

    return
658 ewv 1.131
def numberOfJobs(self):
    """Return how many jobs the splitting step created."""
    nJobs = self.total_number_of_jobs
    return nJobs
662    
def getTarBall(self, exe):
    """
    Return the path of the tarball holding the user libraries and
    executable, building it with buildTar_(exe) only if it does not
    already exist in the working space's share/ area.
    """
    # Original note (Marco): start using relative paths for the Boss XML files.
    self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name

    if not os.path.exists(self.tgzNameWithPath):
        # Prepare a tar gzipped file with user binaries, then return the
        # (whitespace-stripped) path attribute as it stands after the build.
        self.buildTar_(exe)
        return string.strip(self.tgzNameWithPath)

    # already built: reuse it as is
    return self.tgzNameWithPath
680    
def buildTar_(self, executable):
    """
    Build the input-sandbox tarballs for the job.

    Creates self.tgzNameWithPath containing the user's private executable
    (if self.executable is set and it is not taken from the release),
    lib/, module/, every data/ directory found under the local project
    area and CRAB's ProdCommon package, then checks its size against
    self.MaxTarBallSize (MB). Finally creates self.MLtgzfile with the
    CRAB helper scripts shipped to the worker node.

    Nothing is built when the working area is the release top itself.

    Raises CrabException when the executable cannot be found, when any
    file cannot be archived, or when the tarball exceeds the size limit.
    """
    import tarfile

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(3, "swArea = " + swArea + " swReleaseTop =" + swReleaseTop)
        return

    try:  # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if self.executable != '':
                exeWithPath = self.scram.findFile_(executable)
                if not exeWithPath:
                    raise CrabException('User executable ' + executable + ' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5, "Exe " + exeWithPath + " to be tarred")
                    path = swArea + '/'
                    # distinguish case when script is in user project area or given by full path somewhere else
                    if exeWithPath.find(path) >= 0:
                        exe = exeWithPath.replace(path, '')
                        tar.add(path + exe, exe)
                    else:
                        tar.add(exeWithPath, os.path.basename(executable))
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea + '/' + libDir
            common.logger.debug(5, "lib " + lib + " to be tarred")
            if os.path.exists(lib):
                tar.add(lib, libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module, moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen = len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5, "data " + root + "/data" + " to be tarred")
                    tar.add(root + "/data", root[swAreaLen:] + "/data")
                    # tar.add is recursive: do not walk into the directory
                    # just archived, or nested data/ dirs get added twice
                    dirs.remove("data")

            ## Add ProdCommon dir to tar
            prodcommonDir = 'ProdCommon'
            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
            if os.path.isdir(prodcommonPath):
                tar.add(prodcommonPath, prodcommonDir)

            common.logger.debug(5, "Files added to " + self.tgzNameWithPath + " : " + str(tar.getnames()))
        finally:
            # release the archive even when adding a file failed
            tar.close()
    except CrabException:
        # keep specific diagnostics such as "User executable ... not found"
        raise
    except Exception:
        raise CrabException('Could not create tar-ball')

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if tarballinfo.st_size > self.MaxTarBallSize * 1024 * 1024:
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz() + 'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path = os.environ['CRABDIR'] + '/python/'
            for mlFile in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py', 'writeCfg.py', 'JobReportErrorCode.py']:
                tar.add(path + mlFile, mlFile)
            common.logger.debug(5, "Files added to " + self.MLtgzfile + " : " + str(tar.getnames()))
        finally:
            tar.close()
    except Exception:
        raise CrabException('Could not create ML files tar-ball')

    return
771 ewv 1.131
def additionalInputFileTgz(self):
    """
    Put all additional input files into a tarball and return its path.

    Every file in self.additional_inbox_files is stored under its last
    '/'-separated path component in
    <work_space tgz path>/share/<self.additional_tgz_name>.
    """
    import tarfile
    tarName = common.work_space.pathForTgz() + 'share/' + self.additional_tgz_name
    tar = tarfile.open(tarName, "w:gz")
    try:
        for inboxFile in self.additional_inbox_files:
            tar.add(inboxFile, inboxFile.split('/')[-1])
        common.logger.debug(5, "Files added to " + self.additional_tgz_name + " : " + str(tar.getnames()))
    finally:
        # close the archive even if a listed file is missing, so the
        # handle is not leaked
        tar.close()
    return tarName
784    
def wsSetupEnvironment(self, nj=0):
    """
    Return the part of the job wrapper script that prepares the
    execution environment for job 'nj'.

    The generated shell code:
      - sets up the CMS environment for the LCG or OSG middleware (on OSG
        it first creates and enters a private $WORKING_DIR);
      - creates the CMSSW project area with scram and enters it
        ($SOFTWARE_DIR), exiting with job_exit_code=10034 on failure;
      - checks that at least self.argsList arguments were received
        (job_exit_code=50113 otherwise);
      - exports the dataset description used for output publication;
      - when a pset is configured, copies it in as pset.cfg, exports the
        per-job values it reads and computes PSETHASH;
      - unpacks the additional input files tarball, if any.
    """
    out = []
    emit = out.append

    # --- JobType-independent part: middleware-specific CMS environment ---
    emit('\n#Written by cms_cmssw::wsSetupEnvironment\n')
    emit('echo ">>> setup environment"\n')
    emit('if [ $middleware == LCG ]; then \n')
    emit(self.wsSetupCMSLCGEnvironment_())
    emit('elif [ $middleware == OSG ]; then\n')
    emit(' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n')
    emit(' if [ ! $? == 0 ] ;then\n')
    emit(' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n')
    emit(' job_exit_code=10016\n')
    emit(' func_exit\n')
    emit(' fi\n')
    emit(' echo ">>> Created working directory: $WORKING_DIR"\n')
    emit('\n')
    emit(' echo "Change to working directory: $WORKING_DIR"\n')
    emit(' cd $WORKING_DIR\n')
    emit(' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n')
    emit(self.wsSetupCMSOSGEnvironment_())
    emit('fi\n')

    # --- JobType-specific part: create and enter the CMSSW project area ---
    scram = self.scram.commandName()
    version = self.version
    emit('\n\n')
    emit('echo ">>> specific cmssw setup environment:"\n')
    emit('echo "CMSSW_VERSION = ' + version + '"\n')
    emit(scram + ' project CMSSW ' + version + '\n')
    emit('status=$?\n')
    emit('if [ $status != 0 ] ; then\n')
    emit(' echo "ERROR ==> CMSSW ' + version + ' not found on `hostname`" \n')
    emit(' job_exit_code=10034\n')
    emit(' func_exit\n')
    emit('fi \n')
    emit('cd ' + version + '\n')
    emit('SOFTWARE_DIR=`pwd`\n')
    emit('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n')
    # the grep is a workaround for a scramv1 bug
    emit('eval `' + scram + ' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n')

    # --- argument count check (first argument is always the job number) ---
    emit("\n")
    emit("## number of arguments (first argument always jobnumber)\n")
    emit("\n")
    emit("if [ $nargs -lt " + str(self.argsList) + " ]\n")
    emit("then\n")
    emit(" echo 'ERROR ==> Too few arguments' +$nargs+ \n")
    emit(' job_exit_code=50113\n')
    emit(" func_exit\n")
    emit("fi\n")
    emit("\n")

    # --- job-specific part ---
    job = common.job_list[nj]

    # dataset description used by DBS output publication
    if self.datasetPath:
        # NOTE(review): for a "/Primary/Processed/Tier" path split("/")
        # gives ['', Primary, Processed, Tier], so index 2 is the processed
        # component; confirm that is what DataTier should carry.
        pieces = self.datasetPath.split("/")
        emit('\n')
        emit('DatasetPath=' + self.datasetPath + '\n')
        emit('PrimaryDataset=' + pieces[1] + '\n')
        emit('DataTier=' + pieces[2] + '\n')
        emit('ApplicationFamily=cmsRun\n')
    else:
        emit('DatasetPath=MCDataTier\n')
        emit('PrimaryDataset=null\n')
        emit('DataTier=null\n')
        emit('ApplicationFamily=MCDataTier\n')

    hasPset = self.pset is not None
    if hasPset:
        pset = os.path.basename(job.configFilename())
        emit('\n')
        emit('cp $RUNTIME_AREA/' + pset + ' .\n')
        if self.datasetPath:
            # standard job: input files and event window come from the arguments
            emit('InputFiles=${args[1]}; export InputFiles\n')
            emit('MaxEvents=${args[2]}; export MaxEvents\n')
            emit('SkipEvents=${args[3]}; export SkipEvents\n')
            emit('echo "Inputfiles:<$InputFiles>"\n')
            emit('echo "MaxEvents:<$MaxEvents>"\n')
            emit('echo "SkipEvents:<$SkipEvents>"\n')
        else:
            # pythia like job: random seed handling
            emit('PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n')
            emit('IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n')
            emit('echo "PreserveSeeds: <$PreserveSeeds>"\n')
            emit('echo "IncrementSeeds:<$IncrementSeeds>"\n')
            if self.firstRun:
                emit('FirstRun=${args[1]}; export FirstRun\n')
                emit('echo "FirstRun: <$FirstRun>"\n')
        emit('mv -f ' + pset + ' pset.cfg\n')

    if self.additional_inbox_files:
        extraTgz = self.additional_tgz_name
        emit('if [ -e $RUNTIME_AREA/' + extraTgz + ' ] ; then\n')
        emit(' tar xzvf $RUNTIME_AREA/' + extraTgz + '\n')
        emit('fi\n')

    if hasPset:
        emit('\n')
        emit('echo "***** cat pset.cfg *********"\n')
        emit('cat pset.cfg\n')
        emit('echo "****** end pset.cfg ********"\n')
        emit('\n')
        emit('PSETHASH=`EdmConfigHash < pset.cfg` \n')
        emit('echo "PSETHASH = $PSETHASH" \n')
        emit('\n')

    return ''.join(out)
899 slacapra 1.176
def wsUntarSoftware(self, nj=0):
    """
    Return the part of the job wrapper that unpacks the user tarball from
    $RUNTIME_AREA (exiting with the tar status on failure) and prepends
    $RUNTIME_AREA/ProdCommon to PYTHONPATH.

    When no user tarball was built only the header comment is returned.
    """
    header = '\n#Written by cms_cmssw::wsUntarSoftware\n'
    if not os.path.isfile(self.tgzNameWithPath):
        return header

    tgzName = os.path.basename(self.tgzNameWithPath)
    body = [
        'echo ">>> tar xzvf $RUNTIME_AREA/' + tgzName + ' :" \n',
        'tar xzvf $RUNTIME_AREA/' + tgzName + '\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "ERROR ==> Untarring .tgz file failed"\n',
        ' job_exit_code=$untar_status\n',
        ' func_exit\n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        'echo ">>> Include ProdCommon in PYTHONPATH:"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n',
        'else\n',
        ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n',
        'echo "PYTHONPATH=$PYTHONPATH"\n',
        'fi\n',
        '\n',
    ]
    return header + ''.join(body)
932 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Return the part of the job wrapper that replaces the lib/ and module/
    directories of the CMSSW area with the user's copies unpacked in
    $RUNTIME_AREA, moves ProdCommon alongside them and prepends
    $SOFTWARE_DIR/ProdCommon to PYTHONPATH.
    """
    script = [
        '\n#Written by cms_cmssw::wsBuildExe\n',
        'echo ">>> moving CMSSW software directories in `pwd`" \n',
        'rm -r lib/ module/ \n',
        'mv $RUNTIME_AREA/lib/ . \n',
        'mv $RUNTIME_AREA/module/ . \n',
        'mv $RUNTIME_AREA/ProdCommon/ . \n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n',
        'else\n',
        ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n',
        'echo "PYTHONPATH=$PYTHONPATH"\n',
        'fi\n',
        '\n',
    ]
    return ''.join(script)
956 slacapra 1.1
def modifySteeringCards(self, nj):
    """
    Hook for modifying the user-provided card of job nj and writing the
    new card into the share dir. CMSSW jobs need no modification, so this
    does nothing and returns None.
    """
    return None
962 ewv 1.131
def executableName(self):
    """
    Return the command that starts the job: "sh " when a user script
    (self.scriptExe) drives the job, otherwise self.executable.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "  #CarlosDaniele
968 slacapra 1.1
def executableArgs(self):
    """
    Return the command-line arguments passed to the job executable.

    With a user script (self.scriptExe) this is the script name followed
    by " $NJob". Otherwise cmsRun options are built from the CMSSW
    release reported by scram:
      - " -j $RUNTIME_AREA/crab_fjr_$NJob.xml" (framework job report) for
        every release from 1_5 on, including all 2_x and later releases;
      - " -p pset.py" for releases >= 2_0, " -p pset.cfg" before.

    Raises CrabException if the major/minor release numbers cannot be
    parsed from the version string.
    """
    # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
    if self.scriptExe:  #CarlosDaniele
        return self.scriptExe + " $NJob"

    version_array = self.scram.getSWVersion().split('_')
    try:
        major = int(version_array[1])
        minor = int(version_array[2])
    except (IndexError, ValueError):
        msg = "Cannot parse CMSSW version string: " + "_".join(version_array) + " for major and minor release number!"
        raise CrabException(msg)

    ex_args = ""
    # Framework job report, available since 1_5. The former test
    # "major >= 1 and minor >= 5" wrongly dropped it for e.g. 2_0_x.
    if major > 1 or (major == 1 and minor >= 5):
        ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
    # Type of cfg file
    if major >= 2:
        ex_args += " -p pset.py"
    else:
        ex_args += " -p pset.cfg"
    return ex_args
995 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of files for the JDL input sandbox of job nj: the
    user tarball and the ML helper tarball (each only if it exists), the
    job config (when a pset is used), the additional-files tarball and
    the job wrapper script.
    """
    inp_box = [tgz for tgz in (self.tgzNameWithPath, self.MLtgzfile)
               if os.path.isfile(tgz)]

    ## config
    if self.pset is not None:
        inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())

    ## additional input files
    inp_box.append(self.additionalInputFileTgz())

    ## executable (job wrapper stored in the task DB)
    wrapperName = os.path.basename(str(common._db.queryTask('scriptName')))
    inp_box.append(common.work_space.pathForTgz() + 'job/' + wrapperName)
    return inp_box
1018    
def outputSandbox(self, nj):
    """
    Return the list of files for the JDL output sandbox of job nj: every
    user-declared output file (output_file, then output_file_sandbox)
    numbered with the 1-based job number.
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1030    
def prepareSteeringCards(self):
    """
    Hook for initial modifications of the user's steering card file.
    Nothing is needed for CMSSW, so this returns None.
    """
    return None
1036    
def wsRenameOutput(self, nj):
    """
    Return the part of the job wrapper that renames each produced output
    file with the job number ($NJob) and links it into $RUNTIME_AREA.

    With self.copy_data == 1 the numbered file stays in the current
    directory and only a symlink is made in $RUNTIME_AREA; otherwise the
    file is moved there. A missing file sets job_exit_code=60302 and, for
    the CONDOR_G scheduler on OSG, writes a dummy file. The code also sets
    file_list to the numbered names and finally changes to $RUNTIME_AREA.
    """
    listing = ('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
               + 'echo ">>> current directory content:"\n'
               + 'ls \n')
    txt = '\n#Written by cms_cmssw::wsRenameOutput\n' + listing + '\n'

    numbered = []
    for fileWithSuffix in self.output_file:
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        numbered.append(output_file_num)
        txt += '\n# check output file\n'
        txt += 'if [ -e ./' + fileWithSuffix + ' ] ; then\n'
        if self.copy_data == 1:
            # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            txt += ' mv ' + fileWithSuffix + ' ' + output_file_num + '\n'
            txt += ' ln -s `pwd`/' + output_file_num + ' $RUNTIME_AREA/' + fileWithSuffix + '\n'
        else:
            txt += ' mv ' + fileWithSuffix + ' $RUNTIME_AREA/' + output_file_num + '\n'
            txt += ' ln -s $RUNTIME_AREA/' + output_file_num + ' $RUNTIME_AREA/' + fileWithSuffix + '\n'
        txt += 'else\n'
        txt += ' job_exit_code=60302\n'
        txt += ' echo "WARNING: Output file ' + fileWithSuffix + ' not found"\n'
        if common.scheduler.name().upper() == 'CONDOR_G':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/' + output_file_num + '\n'
            txt += ' fi \n'
        txt += 'fi\n'

    txt += 'file_list="' + ' '.join(numbered) + '"\n'
    txt += '\n' + listing + '\n'
    txt += 'cd $RUNTIME_AREA\n'
    txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
    return txt
1081    
def numberFile_(self, file, txt):
    """
    Return 'file' with '_' + txt inserted before its last extension,
    e.g. ('out.root', '3') -> 'out_3.root'. A name without any dot just
    gets '_' + txt appended.
    """
    lastDot = file.rfind('.')
    if lastDot < 0:
        return file + '_' + txt
    # everything before the last dot is the name, the rest (dot included)
    # is the extension that must stay at the end
    return file[:lastDot] + '_' + txt + file[lastDot:]
1099    
def getRequirements(self, nj=None):
    """
    Return the job requirements to add to the JDL files.

    The clauses below are joined with ' && ':
      - the CMSSW release tag, when self.version is set, followed by the
        SCRAM architecture tag when self.executable_arch is also set;
      - outbound network connectivity of the worker node;
      - for the glitecoll scheduler, CE status "Production".

    nj is unused; it is kept for interface compatibility (it previously
    defaulted to a shared mutable list).
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
        ## SL add requirement for OS version
        if self.executable_arch:
            clauses.append('Member("VO-cms-' + self.executable_arch +
                           '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    if common.scheduler.name() == "glitecoll":
        clauses.append('other.GlueCEStateStatus == "Production" ')

    # Joining avoids the dangling leading " && " the old concatenation
    # produced when self.version was empty; the result is unchanged
    # whenever the version is set.
    return ' && '.join(clauses)
1121 gutsche 1.3
def configFilename(self):
    """ return the config filename: the job type name plus '.cfg' """
    base = self.name()
    return base + '.cfg'
1125    
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of the job wrapper that prepares the CMS software
    environment on an OSG worker node: exports SCRAM_ARCH and sources
    $OSG_APP/cmssoft/cms/cmsset_default.sh for self.version, exiting with
    job_exit_code=10020 when that file is missing.
    """
    arch = self.executable_arch
    script = [
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH=' + arch + '\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh ' + self.version + '\n',
        ' else\n',
        ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    ]
    return ''.join(script)
1149 ewv 1.131
1150 gutsche 1.3 ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of the job wrapper that prepares the CMS software
    environment on an LCG worker node: exports SCRAM_ARCH and BUILD_ARCH
    and sources $VO_CMS_SW_DIR/cmsset_default.sh. Exits with
    job_exit_code 10031 when $VO_CMS_SW_DIR is unset, 10020 when the
    setup file is missing or empty, and 10032 when sourcing fails.
    """
    arch = self.executable_arch
    script = [
        '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n',
        ' echo ">>> setup CMS LCG environment:"\n',
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n',
        ' export SCRAM_ARCH=' + arch + '\n',
        ' export BUILD_ARCH=' + arch + '\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n',
        ' job_exit_code=10031\n',
        ' func_exit\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' job_exit_code=10032\n',
        ' func_exit\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "==> setup cms environment ok"\n',
    ]
    return ''.join(script)
1184 gutsche 1.5
1185 ewv 1.131 ### FEDE FOR DBS OUTPUT PUBLICATION
def modifyReport(self, nj):
    """
    Return the part of the job wrapper that rewrites the framework job
    report for DBS output publication.

    Only when USER.publish_data is 1: FOR_LFN is set from the LFN base of
    USER.publish_data_name plus $PSETHASH when the stage-out succeeded
    ($copy_exit_status 0), or to /copy_problems/ with empty SE and
    SE_PATH otherwise. ProdCommon's ModifyJobReport.py is then run on
    crab_fjr_$NJob.xml; on success NewFrameworkJobReport.xml replaces it,
    on failure job_exit_code becomes 70500. In all other cases only the
    header comment is returned.
    """
    header = '\n#Written by cms_cmssw::modifyReport\n'
    publish_data = int(self.cfg_params.get('USER.publish_data', 0))
    if publish_data != 1:
        return header

    processedDataset = self.cfg_params['USER.publish_data_name']
    LFNBaseName = LFNBase(processedDataset)
    modifyCmd = ('$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py'
                 + ' $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset'
                 + ' $DataTier $ProcessedDataset $ApplicationFamily $executable'
                 + ' $CMSSW_VERSION $PSETHASH $SE $SE_PATH')

    script = [
        header,
        'if [ $copy_exit_status -eq 0 ]; then\n',
        ' FOR_LFN=%s_${PSETHASH}/\n' % LFNBaseName,
        'else\n',
        ' FOR_LFN=/copy_problems/ \n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        'fi\n',
        'echo ">>> Modify Job Report:" \n',
        'chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n',
        'ProcessedDataset=' + processedDataset + '\n',
        'echo "ProcessedDataset = $ProcessedDataset"\n',
        'echo "SE = $SE"\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'echo "FOR_LFN = $FOR_LFN" \n',
        'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n',
        'echo "' + modifyCmd + '"\n',
        modifyCmd + '\n',
        'modifyReport_result=$?\n',
        'if [ $modifyReport_result -ne 0 ]; then\n',
        ' modifyReport_result=70500\n',
        ' job_exit_code=$modifyReport_result\n',
        ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "WARNING: Problem with ModifyJobReport"\n',
        'else\n',
        ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        'fi\n',
    ]
    return ''.join(script)
1225 fanzago 1.99
def setParam_(self, param, value):
    """Store 'value' under the key 'param' in this job type's parameter dict."""
    params = self._params
    params[param] = value
1228    
def getParams(self):
    """Return the live (not copied) dict of parameters stored via setParam_."""
    params = self._params
    return params
1231 gutsche 1.8
def uniquelist(self, old):
    """
    Return a new list with the duplicates of 'old' removed, keeping the
    first occurrence of each element in its original order.

    Elements must be hashable. The previous dict-based version returned
    the keys in arbitrary order on Python 2, and a keys view rather than
    a list on Python 3.
    """
    seen = {}
    unique = []
    for element in old:
        if element not in seen:
            seen[element] = None
            unique.append(element)
    return unique
1240 mcinquil 1.121
def outList(self):
    """
    Return shell code that prints the files expected in the output
    sandbox and exports them as $filesToCheck.

    With self.return_data == 1 these are all user output files plus the
    sandbox-only ones; otherwise only the sandbox-only ones. All are
    numbered with $NJob and followed by CMSSW_$NJob.stdout/.stderr.
    """
    if self.return_data == 1:
        declared = self.output_file + self.output_file_sandbox
    else:
        declared = self.output_file_sandbox
    expected = [self.numberFile_(name, '$NJob') for name in declared]
    expected.append('CMSSW_$NJob.stdout')
    expected.append('CMSSW_$NJob.stderr')
    joined = ' '.join(expected)
    return ('echo ">>> list of expected files on output sandbox"\n'
            + 'echo "output files: ' + joined + '"\n'
            + 'filesToCheck="' + joined + '"\n'
            + 'export filesToCheck\n')