ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.179
Committed: Sun Apr 20 15:25:42 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre10
Changes since 1.178: +32 -41 lines
Log Message:
now the input sandbox is just one tarball plus the wrapper script

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
13 mcinquil 1.144 def __init__(self, cfg_params, ncjobs):
14 slacapra 1.1 JobType.__init__(self, 'CMSSW')
15     common.logger.debug(3,'CMSSW::__init__')
16    
17 mcinquil 1.140 self.argsList = []
18 mcinquil 1.144
19 gutsche 1.3 self._params = {}
20     self.cfg_params = cfg_params
21 fanzago 1.115 # init BlackWhiteListParser
22     self.blackWhiteListParser = BlackWhiteListParser(cfg_params)
23    
24 slacapra 1.153 self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))
25 gutsche 1.72
26 gutsche 1.44 # number of jobs requested to be created, limit obj splitting
27 gutsche 1.38 self.ncjobs = ncjobs
28    
29 slacapra 1.1 log = common.logger
30 ewv 1.131
31 slacapra 1.1 self.scram = Scram.Scram(cfg_params)
32     self.additional_inbox_files = []
33     self.scriptExe = ''
34     self.executable = ''
35 slacapra 1.71 self.executable_arch = self.scram.getArch()
36 slacapra 1.1 self.tgz_name = 'default.tgz'
37 corvo 1.56 self.scriptName = 'CMSSW.sh'
38 ewv 1.131 self.pset = '' #scrip use case Da
39 spiga 1.42 self.datasetPath = '' #scrip use case Da
40 gutsche 1.3
41 gutsche 1.50 # set FJR file name
42     self.fjrFileName = 'crab_fjr.xml'
43    
44 slacapra 1.1 self.version = self.scram.getSWVersion()
45 ewv 1.131
46 spiga 1.114 #
47     # Try to block creation in case of arch/version mismatch
48     #
49    
50 spiga 1.162 # a = string.split(self.version, "_")
51     #
52     # if int(a[1]) == 1 and (int(a[2]) < 5 and self.executable_arch.find('slc4') == 0):
53     # msg = "Warning: You are using %s version of CMSSW with %s architecture. \n--> Did you compile your libraries with SLC3? Otherwise you can find some problems running on SLC4 Grid nodes.\n"%(self.version, self.executable_arch)
54     # common.logger.message(msg)
55     # if int(a[1]) == 1 and (int(a[2]) >= 5 and self.executable_arch.find('slc3') == 0):
56     # msg = "Error: CMS does not support %s with %s architecture"%(self.version, self.executable_arch)
57     # raise CrabException(msg)
58     #
59 ewv 1.170
60 slacapra 1.47
61 slacapra 1.1 ### collect Data cards
62 gutsche 1.66
63 slacapra 1.153 if not cfg_params.has_key('CMSSW.datasetpath'):
64 ewv 1.131 msg = "Error: datasetpath not defined "
65 slacapra 1.1 raise CrabException(msg)
66 slacapra 1.153 tmp = cfg_params['CMSSW.datasetpath']
67     log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
68     if string.lower(tmp)=='none':
69     self.datasetPath = None
70     self.selectNoInput = 1
71     else:
72     self.datasetPath = tmp
73     self.selectNoInput = 0
74 gutsche 1.5
75 slacapra 1.1 self.dataTiers = []
76    
77     ## now the application
78 slacapra 1.153 self.executable = cfg_params.get('CMSSW.executable','cmsRun')
79     log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
80 slacapra 1.1
81 slacapra 1.153 if not cfg_params.has_key('CMSSW.pset'):
82 slacapra 1.1 raise CrabException("PSet file missing. Cannot run cmsRun ")
83 slacapra 1.153 self.pset = cfg_params['CMSSW.pset']
84     log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
85     if self.pset.lower() != 'none' :
86     if (not os.path.exists(self.pset)):
87     raise CrabException("User defined PSet file "+self.pset+" does not exist")
88     else:
89     self.pset = None
90 slacapra 1.1
91     # output files
92 slacapra 1.53 ## stuff which must be returned always via sandbox
93     self.output_file_sandbox = []
94    
95     # add fjr report by default via sandbox
96     self.output_file_sandbox.append(self.fjrFileName)
97    
98     # other output files to be returned via sandbox or copied to SE
99 slacapra 1.153 self.output_file = []
100     tmp = cfg_params.get('CMSSW.output_file',None)
101     if tmp :
102     tmpOutFiles = string.split(tmp,',')
103     log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
104     for tmp in tmpOutFiles:
105     tmp=string.strip(tmp)
106     self.output_file.append(tmp)
107 slacapra 1.1 pass
108 slacapra 1.153 else:
109 gutsche 1.92 log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")
110 slacapra 1.153 pass
111 slacapra 1.1
112     # script_exe file as additional file in inputSandbox
113 slacapra 1.153 self.scriptExe = cfg_params.get('USER.script_exe',None)
114     if self.scriptExe :
115 slacapra 1.176 if not os.path.isfile(self.scriptExe):
116     msg ="ERROR. file "+self.scriptExe+" not found"
117     raise CrabException(msg)
118     self.additional_inbox_files.append(string.strip(self.scriptExe))
119 slacapra 1.70
120 spiga 1.42 #CarlosDaniele
121     if self.datasetPath == None and self.pset == None and self.scriptExe == '' :
122 slacapra 1.176 msg ="Error. script_exe not defined"
123     raise CrabException(msg)
124 spiga 1.42
125 slacapra 1.1 ## additional input files
126 slacapra 1.153 if cfg_params.has_key('USER.additional_input_files'):
127 slacapra 1.29 tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
128 slacapra 1.70 for tmp in tmpAddFiles:
129     tmp = string.strip(tmp)
130     dirname = ''
131     if not tmp[0]=="/": dirname = "."
132 corvo 1.85 files = []
133     if string.find(tmp,"*")>-1:
134     files = glob.glob(os.path.join(dirname, tmp))
135     if len(files)==0:
136     raise CrabException("No additional input file found with this pattern: "+tmp)
137     else:
138     files.append(tmp)
139 slacapra 1.70 for file in files:
140     if not os.path.exists(file):
141     raise CrabException("Additional input file not found: "+file)
142 slacapra 1.45 pass
143 slacapra 1.105 # fname = string.split(file, '/')[-1]
144     # storedFile = common.work_space.pathForTgz()+'share/'+fname
145     # shutil.copyfile(file, storedFile)
146     self.additional_inbox_files.append(string.strip(file))
147 slacapra 1.1 pass
148     pass
149 slacapra 1.70 common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
150 slacapra 1.153 pass
151 gutsche 1.3
152 slacapra 1.9 ## Events per job
153 slacapra 1.153 if cfg_params.has_key('CMSSW.events_per_job'):
154 slacapra 1.10 self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
155 slacapra 1.9 self.selectEventsPerJob = 1
156 slacapra 1.153 else:
157 slacapra 1.9 self.eventsPerJob = -1
158     self.selectEventsPerJob = 0
159 ewv 1.131
160 slacapra 1.22 ## number of jobs
161 slacapra 1.153 if cfg_params.has_key('CMSSW.number_of_jobs'):
162 slacapra 1.22 self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
163     self.selectNumberOfJobs = 1
164 slacapra 1.153 else:
165 slacapra 1.22 self.theNumberOfJobs = 0
166     self.selectNumberOfJobs = 0
167 slacapra 1.10
168 slacapra 1.153 if cfg_params.has_key('CMSSW.total_number_of_events'):
169 gutsche 1.35 self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
170     self.selectTotalNumberEvents = 1
171 slacapra 1.153 else:
172 gutsche 1.35 self.total_number_of_events = 0
173     self.selectTotalNumberEvents = 0
174    
175 ewv 1.131 if self.pset != None: #CarlosDaniele
176 spiga 1.42 if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
177     msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
178     raise CrabException(msg)
179     else:
180     if (self.selectNumberOfJobs == 0):
181     msg = 'Must specify number_of_jobs.'
182     raise CrabException(msg)
183 gutsche 1.35
184 ewv 1.160 ## New method of dealing with seeds
185     self.incrementSeeds = []
186     self.preserveSeeds = []
187     if cfg_params.has_key('CMSSW.preserve_seeds'):
188     tmpList = cfg_params['CMSSW.preserve_seeds'].split(',')
189     for tmp in tmpList:
190     tmp.strip()
191     self.preserveSeeds.append(tmp)
192     if cfg_params.has_key('CMSSW.increment_seeds'):
193     tmpList = cfg_params['CMSSW.increment_seeds'].split(',')
194     for tmp in tmpList:
195     tmp.strip()
196     self.incrementSeeds.append(tmp)
197    
198     ## Old method of dealing with seeds
199     ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
200     ## remove
201 slacapra 1.153 self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
202 ewv 1.160 if self.sourceSeed:
203 slacapra 1.177 print "pythia_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
204     self.incrementSeeds.append('sourceSeed')
205 slacapra 1.153
206     self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
207 ewv 1.160 if self.sourceSeedVtx:
208 slacapra 1.177 print "vtx_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
209     self.incrementSeeds.append('VtxSmeared')
210 slacapra 1.22
211 slacapra 1.153 self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
212 ewv 1.160 if self.sourceSeedG4:
213 slacapra 1.177 print "g4_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
214     self.incrementSeeds.append('g4SimHits')
215 slacapra 1.90
216 slacapra 1.153 self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
217 ewv 1.160 if self.sourceSeedMix:
218 slacapra 1.177 print "mix_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
219     self.incrementSeeds.append('mix')
220 slacapra 1.90
221 slacapra 1.153 self.firstRun = cfg_params.get('CMSSW.first_run',None)
222 slacapra 1.90
223 spiga 1.42 if self.pset != None: #CarlosDaniele
224 ewv 1.131 import PsetManipulator as pp
225 slacapra 1.97 PsetEdit = pp.PsetManipulator(self.pset) #Daniele Pset
226 gutsche 1.3
227 ewv 1.147 # Copy/return
228    
229 slacapra 1.153 self.copy_data = int(cfg_params.get('USER.copy_data',0))
230     self.return_data = int(cfg_params.get('USER.return_data',0))
231 ewv 1.147
232 slacapra 1.1 #DBSDLS-start
233 ewv 1.131 ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
234 slacapra 1.1 self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
235     self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
236 gutsche 1.35 self.jobDestination=[] # Site destination(s) for each job (list of lists)
237 slacapra 1.1 ## Perform the data location and discovery (based on DBS/DLS)
238 slacapra 1.9 ## SL: Don't if NONE is specified as input (pythia use case)
239 gutsche 1.35 blockSites = {}
240 slacapra 1.9 if self.datasetPath:
241 gutsche 1.35 blockSites = self.DataDiscoveryAndLocation(cfg_params)
242 ewv 1.131 #DBSDLS-end
243 slacapra 1.1
244 ewv 1.131
245 slacapra 1.9 ## Select Splitting
246 ewv 1.131 if self.selectNoInput:
247 spiga 1.42 if self.pset == None: #CarlosDaniele
248     self.jobSplittingForScript()
249     else:
250     self.jobSplittingNoInput()
251 gutsche 1.92 else:
252 corvo 1.56 self.jobSplittingByBlocks(blockSites)
253 gutsche 1.5
254 slacapra 1.22 # modify Pset
255 spiga 1.42 if self.pset != None: #CarlosDaniele
256 slacapra 1.86 try:
257 ewv 1.160 # Add FrameworkJobReport to parameter-set, set max events.
258     # Reset later for data jobs by writeCFG which does all modifications
259 slacapra 1.90 PsetEdit.addCrabFJR(self.fjrFileName)
260 ewv 1.160 PsetEdit.maxEvent(self.eventsPerJob)
261 slacapra 1.90 PsetEdit.psetWriter(self.configFilename())
262 slacapra 1.86 except:
263     msg='Error while manipuliating ParameterSet: exiting...'
264     raise CrabException(msg)
265 spiga 1.179 self.tgzNameWithPath = self.getTarBall(self.executable)
266 gutsche 1.3
267 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
268    
269 slacapra 1.86 import DataDiscovery
270     import DataLocation
271 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
272    
273     datasetPath=self.datasetPath
274    
275 slacapra 1.1 ## Contact the DBS
276 gutsche 1.92 common.logger.message("Contacting Data Discovery Services ...")
277 slacapra 1.1 try:
278 slacapra 1.137 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
279 slacapra 1.1 self.pubdata.fetchDBSInfo()
280    
281 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
282 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
283     raise CrabException(msg)
284 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
285 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
286     raise CrabException(msg)
287 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
288 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
289 slacapra 1.1 raise CrabException(msg)
290    
291 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
292 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
293     self.eventsbyfile=self.pubdata.getEventsPerFile()
294 gutsche 1.3
295 slacapra 1.1 ## get max number of events
296 ewv 1.131 self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
297 slacapra 1.1
298     ## Contact the DLS and build a list of sites hosting the fileblocks
299     try:
300 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
301 gutsche 1.6 dataloc.fetchDLSInfo()
302 slacapra 1.41 except DataLocation.DataLocationError , ex:
303 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
304     raise CrabException(msg)
305 ewv 1.131
306 slacapra 1.1
307 gutsche 1.35 sites = dataloc.getSites()
308     allSites = []
309     listSites = sites.values()
310 slacapra 1.63 for listSite in listSites:
311     for oneSite in listSite:
312 gutsche 1.35 allSites.append(oneSite)
313     allSites = self.uniquelist(allSites)
314 gutsche 1.3
315 gutsche 1.92 # screen output
316     common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
317    
318 gutsche 1.35 return sites
319 ewv 1.131
320 ewv 1.170 # to Be Removed DS -- BL
321 spiga 1.165 # def setArgsList(self, argsList):
322     # self.argsList = argsList
323 mcinquil 1.140
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs (also stored in self.ncjobs)
          self.list_of_args - per job: [file list string, max events, skip events]
    RAISES: CrabException if events remain to be processed but the resulting
            number of events per job is smaller than one
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report the effective request: total_number_of_events is 0 when the
        # user gave events_per_job and number_of_jobs instead
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    # With less than one event per job the file loop below never consumes
    # events and keeps emitting jobs until the job cap is reached.
    if (eventsRemaining > 0 and eventsPerJobRequested < 1):
        msg = 'Cannot split by blocks with less than one event per job (got '+str(eventsPerJobRequested)+').'
        raise CrabException(msg)

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = blockSites.keys()
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock.keys() :
            jobsOfBlock[block] = []

        if self.eventsbyblock.has_key(block) :
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = self.filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0

            # ---- New block => New job ---- #
            parString = ""
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                file = files[fileCount]
                if newFile :
                    try:
                        numEventsInFile = self.eventsbyfile[file]
                        common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += '\\\"' + file + '\\\"\,'
                        newFile = 0
                    except KeyError:
                        common.logger.message("File "+str(file)+" has unknown number of events: skipping")

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                    # if last file in block
                    if ( fileCount == numFilesInBlock-1 ) :
                        # end job using last file, use remaining events in block
                        # close job and touch new file
                        # ([:-2] drops the trailing '\,' separator)
                        fullString = parString[:-2]
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        # fill jobs of block dictionary
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                        jobSkipEventCount = 0
                        # reset file
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else :
                        # go to next file
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                    # close job and touch new file
                    fullString = parString[:-2]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    fullString = parString[:-2]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[file]
                    parString = '\\\"' + file + '\\\"\,'
                pass # END if
            pass # END while (iterate over files in the block)
    pass # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []
    bloskNoSite = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock.keys() :
            blockCounter += 1
            # apply black list, then white list, once per block
            allowedSites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                ','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
                bloskNoSite.append( blockCounter )

    common.logger.message(screenOutput)
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n                '
        virgola = ""
        if len(bloskNoSite) > 1:
            virgola = ","
        for block in bloskNoSite:
            msg += ' ' + str(block) + virgola
        msg += '\n               Related jobs:\n                 '
        virgola = ""
        if len(noSiteBlock) > 1:
            virgola = ","
        for range_jobs in noSiteBlock:
            msg += str(range_jobs) + virgola
        msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
        if self.cfg_params.has_key('EDG.se_white_list'):
            msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'
        if self.cfg_params.has_key('EDG.ce_white_list'):
            msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
548    
def jobSplittingNoInput(self):
    """
    Split a task without input dataset, based on the number of events per job.
    REQUIRES: self.selectEventsPerJob, self.selectNumberOfJobs, self.selectTotalNumberEvents,
              self.eventsPerJob, self.theNumberOfJobs, self.total_number_of_events, self.firstRun
    SETS: self.total_number_of_jobs, self.list_of_args,
          self.eventsPerJob or self.total_number_of_events (whichever was not given);
          appends one [""] destination per job to self.jobDestination
    RAISES: CrabException if the total number of events is negative
    """
    log = common.logger
    log.debug(5,'Splitting per events')

    if self.selectEventsPerJob:
        log.message('Required '+str(self.eventsPerJob)+' events per job ')
    if self.selectNumberOfJobs:
        log.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if self.selectTotalNumberEvents:
        log.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        raise CrabException('Cannot split jobs per Events with "-1" as total number of events')

    # derive the missing quantity from the two that were configured
    if self.selectEventsPerJob:
        if self.selectTotalNumberEvents:
            self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
    elif self.selectNumberOfJobs:
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

    log.debug(5,'N jobs  '+str(self.total_number_of_jobs))

    # events that do not fit in an integer number of jobs
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    log.debug(5,'Check  '+str(check))

    log.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        log.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    self.list_of_args = []
    for jobIndex in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""])  # must be empty to write correctly the xml
        if self.firstRun:
            ## pythia first run: first_run with the job index appended as text
            jobArgs = [str(self.firstRun)+str(jobIndex)]
        else:
            jobArgs = []
        self.list_of_args.append(jobArgs)

    return
600    
601 spiga 1.42
def jobSplittingForScript(self):
    """
    Split a script-only task into the requested number of jobs.
    REQUIRES: self.theNumberOfJobs
    SETS: self.total_number_of_jobs, self.list_of_args (one [str(index)] per job);
          appends one [""] destination per job to self.jobDestination
    """
    log = common.logger
    log.debug(5,'Splitting per job')
    log.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.total_number_of_jobs = self.theNumberOfJobs
    nJobs = self.total_number_of_jobs

    log.debug(5,'N jobs  '+str(nJobs))

    log.message(str(nJobs)+' jobs can be created')

    ## Since there is no input, any site is good
    for jobIndex in range(nJobs):
        self.jobDestination.append([""])

    # the only argument of each job is its 0-based index (no random seed)
    self.list_of_args = [[str(jobIndex)] for jobIndex in range(nJobs)]
    return
624    
def split(self, jobParams):
    """
    Store the arguments and destinations of every job.

    ARGUMENT: jobParams - list, modified in place: one entry per job is
              appended and entry i is then set to the argument list of job i
    REQUIRES: self.total_number_of_jobs, self.list_of_args, self.jobDestination
    SETS: self.argsList - number of arguments of the first job plus one
          (the job number prepended to the argument string); left untouched
          when there are no jobs
    SIDE EFFECT: calls common._db.updateJob_ once with the list of job ids
          (1-based) and the list of {'arguments', 'dlsDestination'} dictionaries
    """
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    for i in range(njobs):
        jobParams.append("")

    listID=[]
    listField=[]
    for job in range(njobs):
        jobParams[job] = arglist[job]
        listID.append(job+1)
        job_ToSave ={}
        concString = ' '
        argu=''
        if len(jobParams[job]):
            argu += concString.join(jobParams[job] )
        # the 1-based job number is always the first argument
        job_ToSave['arguments']= str(job+1)+' '+argu
        job_ToSave['dlsDestination']= self.jobDestination[job]
        listField.append(job_ToSave)
        msg="Job "+str(job)+" Arguments:   "+str(job+1)+" "+argu+"\n"  \
            +"                     Destination: "+str(self.jobDestination[job])
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField)
    # Use the first job as reference: indexing jobParams[1] raised an
    # IndexError for tasks made of a single job.
    if njobs > 0:
        self.argsList = (len(jobParams[0])+1)

    return
657 ewv 1.131
def numberOfJobs(self):
    """
    Return the total number of jobs stored in self.total_number_of_jobs.
    """
    totalJobs = self.total_number_of_jobs
    return totalJobs
661    
def getTarBall(self, exe):
    """
    Return the path of the tarball with lib and exe, building it first
    when it is not there yet.
    ARGUMENT: exe - executable name, handed to self.buildTar_
    SETS: self.tgzNameWithPath
    """
    # Path for the Boss XML files: <pathForTgz()>share/<tgz_name>
    tarballPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarballPath

    if not os.path.exists(tarballPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return string.strip(self.tgzNameWithPath)

    # if it exists, just return it
    return self.tgzNameWithPath
679    
def buildTar_(self, executable):
    """
    Build the gzipped tarball self.tgzNameWithPath to be shipped with the jobs.

    The tarball collects, when present: the private user executable, the
    'lib' and 'module' directories of the user area, every 'data' directory
    found below the user area, the parameter set file, the ProdCommon
    directory and a fixed list of helper scripts found under $CRABDIR, plus
    the files listed in self.additional_inbox_files.

    Nothing is built when the user area coincides with the release top (or
    the release top is empty).

    Raises CrabException if the executable is not found, if the tarball
    cannot be created, or if it is larger than self.MaxTarBallSize MB.
    """
    # Set before any early return, so the attribute always exists
    # after this method has been called.
    self.dataExist = False

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
        return

    import tarfile
    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if (self.executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    # distinguish case when script is in user project area or given by full path somewhere else
                    if exeWithPath.find(path) >= 0 :
                        exe = exeWithPath.replace(path,'')
                        tar.add(path+exe,exe)
                    else :
                        tar.add(exeWithPath,os.path.basename(executable))
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen=len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    self.dataExist=True
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ### CMSSW ParameterSet
            if not self.pset is None:
                cfg_file = common.work_space.jobDir()+self.configFilename()
                tar.add(cfg_file,self.configFilename())
                common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ## Add ProdCommon dir to tar
            prodcommonDir = 'ProdCommon'
            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
            if os.path.isdir(prodcommonPath):
                tar.add(prodcommonPath,prodcommonDir)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### ML stuff
            ML_file_list=['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
            path=os.environ['CRABDIR'] + '/python/'
            for fname in ML_file_list:
                tar.add(path+fname,fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### Utils
            Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']
            for fname in Utils_file_list:
                tar.add(path+fname,fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### AdditionalFiles: stored under their base name
            for fname in self.additional_inbox_files:
                tar.add(fname,fname.split('/')[-1])
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # always release the file handle, also when something went wrong
            tar.close()
    except CrabException:
        # keep the specific message (e.g. executable not found)
        raise
    except Exception:
        raise CrabException('Could not create tar-ball')

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
782    
783 slacapra 1.61 ## create tar-ball with ML stuff
784 slacapra 1.97
def wsSetupEnvironment(self, nj=0):
    """
    Return the part of the job script which prepares the execution
    environment for the job 'nj'.

    The text contains, in order: the middleware dependent setup (LCG or
    OSG), the scram project creation for self.version, the check on the
    number of arguments, the dataset related variables and, when a
    parameter set is defined, the commands handling pset.cfg.
    """
    chunks = []
    add = chunks.append

    # Prepare JobType-independent part
    add('\n#Written by cms_cmssw::wsSetupEnvironment\n')
    add('echo ">>> setup environment"\n')
    add('if [ $middleware == LCG ]; then \n')
    add(self.wsSetupCMSLCGEnvironment_())
    add('elif [ $middleware == OSG ]; then\n')
    add(' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n')
    add(' if [ ! $? == 0 ] ;then\n')
    add(' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n')
    add(' job_exit_code=10016\n')
    add(' func_exit\n')
    add(' fi\n')
    add(' echo ">>> Created working directory: $WORKING_DIR"\n')
    add('\n')
    add(' echo "Change to working directory: $WORKING_DIR"\n')
    add(' cd $WORKING_DIR\n')
    add(' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n')
    add(self.wsSetupCMSOSGEnvironment_())
    add('fi\n')

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    add('\n\n')
    add('echo ">>> specific cmssw setup environment:"\n')
    add('echo "CMSSW_VERSION = '+self.version+'"\n')
    add(scram+' project CMSSW '+self.version+'\n')
    add('status=$?\n')
    add('if [ $status != 0 ] ; then\n')
    add(' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n')
    add(' job_exit_code=10034\n')
    add(' func_exit\n')
    add('fi \n')
    add('cd '+self.version+'\n')
    add('SOFTWARE_DIR=`pwd`\n')
    add('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n')
    ### needed grep for bug in scramv1 ###
    add('eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n')

    # Handle the arguments:
    add("\n")
    add("## number of arguments (first argument always jobnumber)\n")
    add("\n")
    add("if [ $nargs -lt "+str(self.argsList)+" ]\n")
    add("then\n")
    add(" echo 'ERROR ==> Too few arguments' +$nargs+ \n")
    add(' job_exit_code=50113\n')
    add(" func_exit\n")
    add("fi\n")
    add("\n")

    # Prepare job-specific part
    job = common.job_list[nj]

    # variables used for the DBS output publication
    if not self.datasetPath:
        add('DatasetPath=MCDataTier\n')
        add('PrimaryDataset=null\n')
        add('DataTier=null\n')
        add('ApplicationFamily=MCDataTier\n')
    else:
        add('\n')
        add('DatasetPath='+self.datasetPath+'\n')
        pieces = self.datasetPath.split("/")
        add('PrimaryDataset='+pieces[1]+'\n')
        add('DataTier='+pieces[2]+'\n')
        add('ApplicationFamily=cmsRun\n')

    if self.pset is not None:
        pset = os.path.basename(job.configFilename())
        add('\n')
        add('cp $RUNTIME_AREA/'+pset+' .\n')
        if self.datasetPath:
            # standard job
            add('InputFiles=${args[1]}; export InputFiles\n')
            add('MaxEvents=${args[2]}; export MaxEvents\n')
            add('SkipEvents=${args[3]}; export SkipEvents\n')
            add('echo "Inputfiles:<$InputFiles>"\n')
            add('echo "MaxEvents:<$MaxEvents>"\n')
            add('echo "SkipEvents:<$SkipEvents>"\n')
        else:
            # pythia like job
            add('PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n')
            add('IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n')
            add('echo "PreserveSeeds: <$PreserveSeeds>"\n')
            add('echo "IncrementSeeds:<$IncrementSeeds>"\n')
            if self.firstRun:
                add('FirstRun=${args[1]}; export FirstRun\n')
                add('echo "FirstRun: <$FirstRun>"\n')

        add('mv -f '+pset+' pset.cfg\n')

        # dump the parameter set and compute its hash
        add('\n')
        add('echo "***** cat pset.cfg *********"\n')
        add('cat pset.cfg\n')
        add('echo "****** end pset.cfg ********"\n')
        add('\n')
        add('PSETHASH=`EdmConfigHash < pset.cfg` \n')
        add('echo "PSETHASH = $PSETHASH" \n')
        add('\n')

    return ''.join(chunks)
894 slacapra 1.176
def wsUntarSoftware(self, nj=0):
    """
    Return the part of the job script which unpacks the software tarball.

    When the file self.tgzNameWithPath exists, the text contains the
    commands to untar it from $RUNTIME_AREA, to check the exit status of
    tar and to add ProdCommon to PYTHONPATH.  Otherwise only the header
    comment is returned.
    """
    txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'

    if os.path.isfile(self.tgzNameWithPath):
        tarball = os.path.basename(self.tgzNameWithPath)
        txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+tarball+' :" \n'
        txt += 'tar xzvf $RUNTIME_AREA/'+tarball+'\n'
        # $? must be read right after tar: any command in between
        # (the directory listing, for instance) would overwrite it
        txt += 'untar_status=$? \n'
        txt += 'ls -Al \n'
        txt += 'if [ $untar_status -ne 0 ]; then \n'
        txt += ' echo "ERROR ==> Untarring .tgz file failed"\n'
        txt += ' job_exit_code=$untar_status\n'
        txt += ' func_exit\n'
        txt += 'else \n'
        txt += ' echo "Successful untar" \n'
        txt += 'fi \n'
        txt += '\n'
        txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
        txt += 'if [ -z "$PYTHONPATH" ]; then\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
        txt += 'else\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
        txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
        txt += 'fi\n'
        txt += '\n'

    return txt
928 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Return the part of the job script which moves the unpacked user
    software (lib, module, data directories, additional files and
    ProdCommon) from $RUNTIME_AREA into the current directory and adds
    ProdCommon to PYTHONPATH.
    """
    txt = '\n#Written by cms_cmssw::wsBuildExe\n'
    txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n'

    txt += 'rm -r lib/ module/ \n'
    txt += 'mv $RUNTIME_AREA/lib/ . \n'
    txt += 'mv $RUNTIME_AREA/module/ . \n'

    # dataExist is set by buildTar_ only: it is missing when the tarball
    # was already there and has not been rebuilt
    if getattr(self, 'dataExist', False):
        txt += 'mv $RUNTIME_AREA/src/ . \n'

    # additional files are stored in the tarball under their base name,
    # so this is the name they have once unpacked
    for fname in self.additional_inbox_files:
        txt += 'mv $RUNTIME_AREA/'+fname.split('/')[-1]+' . \n'

    txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'

    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
    txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
    txt += 'fi\n'
    txt += '\n'

    return txt
956 slacapra 1.1
def modifySteeringCards(self, nj):
    """
    Placeholder for the modification of the user card (meant to write a
    new card into the share dir): nothing is done here.
    """
    return None
962 ewv 1.131
def executableName(self):
    """
    Return the command to be run: "sh " when a user script
    (self.scriptExe) is set, self.executable otherwise.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "
968 slacapra 1.1
def executableArgs(self):
    """
    Return the arguments to be passed to the executable.

    With a user script (self.scriptExe) the arguments are the script name
    followed by $NJob.  Otherwise they depend on the version returned by
    self.scram.getSWVersion():
      - from 1_5 on, the framework job report option (-j) is added;
      - from 2_x on the parameter set is pset.py, before it is pset.cfg.

    Raises CrabException when major and minor release numbers cannot be
    extracted from the version string.
    """
    # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
    if self.scriptExe:
        return self.scriptExe + " $NJob"

    version_array = self.scram.getSWVersion().split('_')
    try:
        major = int(version_array[1])
        minor = int(version_array[2])
    except (IndexError, ValueError):
        msg = "Cannot parse CMSSW version string: " + "_".join(version_array) + " for major and minor release number!"
        raise CrabException(msg)

    ex_args = ""
    # Framework job report: compare (major, minor) as a whole, otherwise
    # releases such as 2_0 (minor below 5) would lose the option
    if (major, minor) >= (1, 5):
        ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
    # Type of cfg file
    if major >= 2:
        ex_args += " -p pset.py"
    else:
        ex_args += " -p pset.cfg"
    return ex_args
995 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of file names to be put in the JDL input sandbox:
    the software tarball (only if the file exists) followed by the wrapper
    script, whose name is read from the task DB ('scriptName').
    """
    sandbox = []

    tarball = self.tgzNameWithPath
    if os.path.isfile(tarball):
        sandbox.append(tarball)

    script = str(common._db.queryTask('scriptName'))
    wrapper = os.path.basename(script)
    sandbox.append(common.work_space.pathForTgz() +'job/'+ wrapper)
    return sandbox
1009    
def outputSandbox(self, nj):
    """
    Return the list of file names to be put in the JDL output sandbox.

    Every name in self.output_file and self.output_file_sandbox is
    numbered (see numberFile_) with nj+1.
    """
    ## User Declared output files
    declared = self.output_file+self.output_file_sandbox
    return [self.numberFile_(out, str(nj + 1)) for out in declared]
1021    
def prepareSteeringCards(self):
    """
    Placeholder for the initial modifications of the user's steering
    card file: nothing is done here.
    """
    return None
1027    
def wsRenameOutput(self, nj):
    """
    Return the part of the job script which renames the produced files.

    For every name in self.output_file the script checks that the file
    exists, renames it with the job number ($NJob) and links it in
    $RUNTIME_AREA; the list of the numbered names is stored in the shell
    variable file_list.  The script ends by moving to $RUNTIME_AREA.
    """
    # commands showing the current directory, used before and after the renaming
    listing = [
        'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n',
        'echo ">>> current directory content:"\n',
        'ls \n',
        '\n',
    ]

    chunks = ['\n#Written by cms_cmssw::wsRenameOutput\n']
    chunks.extend(listing)

    numbered = []
    for fileWithSuffix in self.output_file:
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        numbered.append(output_file_num)

        chunks.append('\n')
        chunks.append('# check output file\n')
        chunks.append('if [ -e ./'+fileWithSuffix+' ] ; then\n')
        if self.copy_data != 1:
            chunks.append(' mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n')
            chunks.append(' ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n')
        else:
            # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            chunks.append(' mv '+fileWithSuffix+' '+output_file_num+'\n')
            chunks.append(' ln -s `pwd`/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n')
        chunks.append('else\n')
        chunks.append(' job_exit_code=60302\n')
        chunks.append(' echo "WARNING: Output file '+fileWithSuffix+' not found"\n')
        if common.scheduler.name().upper() == 'CONDOR_G':
            chunks.append(' if [ $middleware == OSG ]; then \n')
            chunks.append(' echo "prepare dummy output file"\n')
            chunks.append(' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n')
            chunks.append(' fi \n')
        chunks.append('fi\n')

    chunks.append('file_list="'+' '.join(numbered)+'"\n')
    chunks.append('\n')
    chunks.extend(listing)
    chunks.append('cd $RUNTIME_AREA\n')
    chunks.append('echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n')
    return ''.join(chunks)
1072    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of the file name
    (or append it when the name has no extension).
    """
    pieces = file.split(".")
    if len(pieces) == 1:
        # no extension at all
        return pieces[0] + '_' + txt

    # everything but the last extension, then the suffix, then the extension
    stem = ".".join(pieces[:-1])
    return stem + '_' + txt + "." + pieces[-1]
1090    
def getRequirements(self, nj=[]):
    """
    Return the job requirements to be added to the jdl files.

    The expression is the '&&' of: the software tag for self.version and
    the one for self.executable_arch (each only if set), the outbound
    connectivity and, for the glitecoll scheduler, the CE production
    status.  The argument nj is not used.
    """
    # The clauses are collected and joined at the end, so that a missing
    # version does not leave a dangling '&&' in front of the expression.
    clauses = []

    if self.version:
        clauses.append('Member("VO-cms-' + \
            self.version + \
            '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')

    if self.executable_arch:
        clauses.append('Member("VO-cms-' + \
            self.executable_arch + \
            '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')

    if common.scheduler.name() == "glitecoll":
        clauses.append('other.GlueCEStateStatus == "Production" ')

    return ' && '.join(clauses)
1112 gutsche 1.3
def configFilename(self):
    """Return the config file name: self.name() followed by '.cfg'."""
    base = self.name()
    return base+'.cfg'
1116    
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of the job script which prepares the CMS environment
    on OSG: SCRAM_ARCH is set to self.executable_arch and
    $OSG_APP/cmssoft/cms/cmsset_default.sh is sourced for self.version.
    """
    arch = self.executable_arch
    chunks = [
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n',
        ' else\n',
        ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    ]
    return ''.join(chunks)
1140 ewv 1.131
1141 gutsche 1.3 ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of the job script which prepares the CMS environment
    on LCG: SCRAM_ARCH and BUILD_ARCH are set to self.executable_arch and
    $VO_CMS_SW_DIR/cmsset_default.sh is sourced, with an error exit for
    each step that can fail.
    """
    arch = self.executable_arch
    chunks = [
        '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n',
        ' echo ">>> setup CMS LCG environment:"\n',
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' export BUILD_ARCH='+arch+'\n',
        # software area not defined on the worker node
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n',
        ' job_exit_code=10031\n',
        ' func_exit\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        # setup script missing or empty
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' job_exit_code=10032\n',
        ' func_exit\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "==> setup cms environment ok"\n',
    ]
    return ''.join(chunks)
1175 gutsche 1.5
1176 ewv 1.131 ### FEDE FOR DBS OUTPUT PUBLICATION
def modifyReport(self, nj):
    """
    Return the part of the job script which modifies the FrameworkJob
    Report.

    The commands are produced only when USER.publish_data is 1 in
    self.cfg_params; otherwise only the header comment is returned.
    """
    header = '\n#Written by cms_cmssw::modifyReport\n'

    publish_data = int(self.cfg_params.get('USER.publish_data',0))
    if publish_data != 1:
        return header

    processedDataset = self.cfg_params['USER.publish_data_name']
    LFNBaseName = LFNBase(processedDataset)

    # the same command line is first echoed and then executed
    command = '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'

    chunks = [
        header,
        'if [ $copy_exit_status -eq 0 ]; then\n',
        ' FOR_LFN=%s_${PSETHASH}/\n'%(LFNBaseName),
        'else\n',
        ' FOR_LFN=/copy_problems/ \n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        'fi\n',
        'echo ">>> Modify Job Report:" \n',
        'chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n',
        'ProcessedDataset='+processedDataset+'\n',
        'echo "ProcessedDataset = $ProcessedDataset"\n',
        'echo "SE = $SE"\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'echo "FOR_LFN = $FOR_LFN" \n',
        'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n',
        'echo "'+command+'"\n',
        command+'\n',
        'modifyReport_result=$?\n',
        'if [ $modifyReport_result -ne 0 ]; then\n',
        ' modifyReport_result=70500\n',
        ' job_exit_code=$modifyReport_result\n',
        ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "WARNING: Problem with ModifyJobReport"\n',
        'else\n',
        ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        'fi\n',
    ]
    return ''.join(chunks)
1216 fanzago 1.99
def setParam_(self, param, value):
    """Store value under the key param in self._params."""
    params = self._params
    params[param] = value
1219    
def getParams(self):
    """Return self._params (the object itself, not a copy)."""
    params = self._params
    return params
1222 gutsche 1.8
def uniquelist(self, old):
    """
    Remove duplicates from a list.

    Return a new list with the elements of old, each one taken once, in
    the order of their first occurrence.  The elements must be hashable.
    """
    seen = {}
    unique = []
    for e in old:
        if e not in seen:
            seen[e] = 0
            unique.append(e)
    return unique
1231 mcinquil 1.121
def outList(self):
    """
    Return the part of the job script which declares the files expected
    in the output sandbox (shell variable filesToCheck).

    The list holds the numbered sandbox files (plus the numbered output
    files when self.return_data is 1) followed by the job stdout and
    stderr.
    """
    if self.return_data == 1:
        declared = self.output_file+self.output_file_sandbox
    else:
        declared = self.output_file_sandbox

    listOutFiles = [self.numberFile_(name, '$NJob') for name in declared]
    listOutFiles.append('CMSSW_$NJob.stdout')
    listOutFiles.append('CMSSW_$NJob.stderr')

    expected = ' '.join(listOutFiles)
    return ''.join([
        'echo ">>> list of expected files on output sandbox"\n',
        'echo "output files: '+expected+'"\n',
        'filesToCheck="'+expected+'"\n',
        'export filesToCheck\n',
    ])