ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.183
Committed: Wed Apr 30 18:21:07 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre15
Changes since 1.182: +1 -1 lines
Log Message:
changed variable major --> self.major_version

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure a CMSSW job type from the CRAB configuration.

    Reads the CMSSW/USER/EDG keys of cfg_params, validates them, runs data
    discovery (when a dataset is given), performs the job splitting, writes
    the modified parameter set and prepares the user tarball.

    ARGUMENTS: cfg_params - dictionary-like configuration keyed by
                            'SECTION.name' strings
               ncjobs     - number of jobs requested to be created
                            (limits the splitting; may be 'all')
    RAISES:    CrabException on missing/invalid configuration, unparsable
               CMSSW version, missing files or failed PSet manipulation.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''          # stays None-able: script use case
    self.datasetPath = ''   # stays None-able: script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    # Extract major/minor release numbers from the version string,
    # which is split on '_' (fields 1 and 2 must be integers).
    self.version = self.scram.getSWVersion()
    version_array = self.version.split('_')
    self.major_version = 0
    self.minor_version = 0
    try:
        self.major_version = int(version_array[1])
        self.minor_version = int(version_array[2])
    except (IndexError, ValueError):
        msg = "Cannot parse CMSSW version string: " + "_".join(version_array) + " for major and minor release number!"
        raise CrabException(msg)

    # NOTE: the former arch/version mismatch check is disabled.

    ### collect Data cards

    if 'CMSSW.datasetpath' not in cfg_params:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    self.dataTiers = []

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if 'CMSSW.pset' not in cfg_params:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox;
    ## the framework job report is added by default
    self.output_file_sandbox = [self.fjrFileName]

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp:
        tmpOutFiles = tmp.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        self.output_file.extend([name.strip() for name in tmpOutFiles])
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe:
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    # Script use case: with neither dataset nor PSet a script is mandatory.
    # scriptExe is None (not '') when the key is absent, so test truthiness.
    if self.datasetPath is None and self.pset is None and not self.scriptExe:
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    if 'USER.additional_input_files' in cfg_params:
        tmpAddFiles = cfg_params['USER.additional_input_files'].split(',')
        for tmp in tmpAddFiles:
            tmp = tmp.strip()
            if not tmp:
                # empty entry (e.g. trailing comma): nothing to add
                continue
            dirname = ''
            if not tmp.startswith("/"):
                dirname = "."
            files = []
            if '*' in tmp:
                # wildcard: expand it, relative patterns start from '.'
                files = glob.glob(os.path.join(dirname, tmp))
                if len(files) == 0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                files.append(tmp)
            for fileName in files:
                if not os.path.exists(fileName):
                    raise CrabException("Additional input file not found: "+fileName)
                self.additional_inbox_files.append(fileName.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if 'CMSSW.events_per_job' in cfg_params:
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in cfg_params:
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events
    if 'CMSSW.total_number_of_events' in cfg_params:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if self.selectNumberOfJobs == 0:
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## New method of dealing with seeds
    ## (str.strip returns a new string, so the result must be kept)
    self.incrementSeeds = []
    self.preserveSeeds = []
    if 'CMSSW.preserve_seeds' in cfg_params:
        for tmp in cfg_params['CMSSW.preserve_seeds'].split(','):
            self.preserveSeeds.append(tmp.strip())
    if 'CMSSW.increment_seeds' in cfg_params:
        for tmp in cfg_params['CMSSW.increment_seeds'].split(','):
            self.incrementSeeds.append(tmp.strip())

    ## Old method of dealing with seeds
    ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions
    ## after a couple of CRAB releases and then remove
    self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
    self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
    self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
    self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
    deprecatedSeeds = (
        (self.sourceSeed,    'pythia_seed', 'sourceSeed'),
        (self.sourceSeedVtx, 'vtx_seed',    'VtxSmeared'),
        (self.sourceSeedG4,  'g4_seed',     'g4SimHits'),
        (self.sourceSeedMix, 'mix_seed',    'mix'),
    )
    for seedValue, paramName, seedLabel in deprecatedSeeds:
        if seedValue:
            print(paramName + " is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n" + "Added to increment_seeds.")
            self.incrementSeeds.append(seedLabel)

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    if self.pset is not None:
        import PsetManipulator as pp
        PsetEdit = pp.PsetManipulator(self.pset)

    # Copy/return
    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents = 0        # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths = {}        # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination = []  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            # Add FrameworkJobReport to parameter-set, set max events.
            # Reset later for data jobs by writeCFG which does all modifications
            PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5
            PsetEdit.maxEvent(self.eventsPerJob)
            PsetEdit.psetWriter(self.configFilename())
        except Exception:
            msg='Error while manipuliating ParameterSet: exiting...'
            raise CrabException(msg)
    self.tgzNameWithPath = self.getTarBall(self.executable)
276 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Contact the data discovery (DBS) and data location (DLS) services
    for self.datasetPath.

    ARGUMENT: cfg_params - configuration passed on to DataDiscovery and
                           DataLocation
    SETS:     self.pubdata, self.filesbyblock, self.eventsbyblock,
              self.eventsbyfile, self.maxEvents
    RETURNS:  the result of DataLocation.getSites() (handed to
              jobSplittingByBlocks as its blockSites argument)
    RAISES:   CrabException if discovery or location fails
    """
    import sys
    import DataDiscovery
    import DataLocation
    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath = self.datasetPath

    ## Contact the DBS
    common.logger.message("Contacting Data Discovery Services ...")
    try:
        self.pubdata = DataDiscovery.DataDiscovery(datasetPath, cfg_params)
        self.pubdata.fetchDBSInfo()
    except (DataDiscovery.NotExistingDatasetError,
            DataDiscovery.NoDataTierinProvenanceError,
            DataDiscovery.DataDiscoveryError):
        # All three failures are reported identically. sys.exc_info() is
        # used instead of binding the exception so the clause parses on
        # every Python version.
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
        raise CrabException(msg)

    self.filesbyblock = self.pubdata.getFiles()
    self.eventsbyblock = self.pubdata.getEventsPerBlock()
    self.eventsbyfile = self.pubdata.getEventsPerFile()

    ## get max number of events
    self.maxEvents = self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py

    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc = DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError:
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
        raise CrabException(msg)

    sites = dataloc.getSites()

    # screen output
    common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock)) + " blocks.\n")

    return sites
329 ewv 1.131
330 ewv 1.170 # to Be Removed DS -- BL
331 spiga 1.165 # def setArgsList(self, argsList):
332     # self.argsList = argsList
333 mcinquil 1.140
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs, self.ncjobs - Total # of jobs
          self.list_of_args - per job: [file list string, max events, skip events]
    """

    # ---- Handle the possible job splitting configurations ---- #
    # (exactly two of the three select* flags are expected to be set)
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    if totalEventsRequested == -1:
        # user requested all the events in the dataset
        eventsRemaining = self.maxEvents
    elif totalEventsRequested > self.maxEvents:
        # user requested more events than are in the dataset;
        # report the total actually requested, however it was specified
        eventsRemaining = self.maxEvents
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    else:
        # user requested less events than are in the dataset
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    if self.selectNumberOfJobs:
        common.logger.message("May not create the exact number_of_jobs requested.")

    if self.ncjobs == 'all':
        totalNumberOfJobs = 999999999
    else:
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        # blocks without event information are not processed
        if block not in self.eventsbyblock:
            continue

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if numFilesInBlock <= 0:
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = ""
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # number of events the current job skips at the start of its first file
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs):
            fileName = files[fileCount]
            if newFile:
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")
                else:
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job: \"name\"\, (escaped for the job wrapper)
                    parString += '\\"' + fileName + '\\"\\,'
                    newFile = 0

            eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
            # events the files collected so far can still give to this job
            availableEvents = filesEventCount - jobSkipEventCount

            if availableEvents < eventsPerJobRequested:
                # less events in file(s) remain than eventsPerJobRequested
                if fileCount == numFilesInBlock-1:
                    # last file in block: end job using the remaining
                    # events in the block (-1), then touch a new file
                    fullString = parString[:-2]   # drop trailing \,
                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(availableEvents)+" events (last file in block).")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    # fill jobs of block dictionary
                    jobsOfBlock[block].append(jobCount+1)
                    # update counters
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + availableEvents
                    eventsRemaining = eventsRemaining - availableEvents
                    jobSkipEventCount = 0
                    # reset file
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else:
                    # go to next file
                    newFile = 1
                    fileCount += 1
            elif availableEvents == eventsPerJobRequested:
                # events in file(s) equal to eventsPerJobRequested:
                # close job and touch new file
                fullString = parString[:-2]   # drop trailing \,
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                jobsOfBlock[block].append(jobCount+1)
                # update counters
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                parString = ""
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            else:
                # more events in file(s) remain than eventsPerJobRequested:
                # close job but don't touch new file
                fullString = parString[:-2]   # drop trailing \,
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                jobsOfBlock[block].append(jobCount+1)
                # update counters
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for the last file: the next job
                # starts in the current file after the events just used
                eventsInLastFile = self.eventsbyfile[fileName]
                jobSkipEventCount = eventsPerJobRequested - (availableEvents - eventsInLastFile)
                # remove all but the last file
                filesEventCount = eventsInLastFile
                parString = '\\"' + fileName + '\\"\\,'

    self.ncjobs = self.total_number_of_jobs = jobCount
    if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []
    bloskNoSite = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            jobRanges = spanRanges(jobsOfBlock[block])
            # apply black list then white list once and reuse the result
            allowedSites = self.blackWhiteListParser.checkWhiteList(
                self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,jobRanges,','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append(jobRanges)
                bloskNoSite.append(blockCounter)

    common.logger.message(screenOutput)
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        msg += ','.join([' ' + str(blockNumber) for blockNumber in bloskNoSite])
        msg += '\n Related jobs:\n '
        msg += ','.join([str(range_jobs) for range_jobs in noSiteBlock])
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        if 'EDG.se_white_list' in self.cfg_params:
            msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'
        if 'EDG.ce_white_list' in self.cfg_params:
            msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
558    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job
    (no input dataset).

    REQUIRES: self.selectEventsPerJob, self.selectNumberOfJobs,
              self.selectTotalNumberEvents and the matching values
    SETS:     self.total_number_of_jobs, self.eventsPerJob or
              self.total_number_of_events (whichever was not given),
              self.list_of_args, and appends to self.jobDestination
    RAISES:   CrabException for a negative total number of events or a
              non-positive divisor (events_per_job / number_of_jobs)
    """
    common.logger.debug(5,'Splitting per events')

    if self.selectEventsPerJob:
        common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if self.selectNumberOfJobs:
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if self.selectTotalNumberEvents:
        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if self.selectEventsPerJob:
        if self.selectTotalNumberEvents:
            # guard the division below: report a configuration error
            # instead of failing with ZeroDivisionError
            if self.eventsPerJob <= 0:
                raise CrabException('events_per_job must be a positive number.')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

    elif self.selectNumberOfJobs:
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive number.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    eventsCovered = int(self.total_number_of_jobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - eventsCovered

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(eventsCovered))

    # one argument list per job; it only carries the first-run value
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        args = []
        if self.firstRun:
            ## pythia first run: configured value with the job index appended as text
            args.append(str(self.firstRun)+str(i))
        self.list_of_args.append(args)

    return
610    
611 spiga 1.42
def jobSplittingForScript(self):
    """
    Split purely by job count (script use case, no PSet and no dataset).

    Creates self.theNumberOfJobs jobs; each job receives its own
    zero-based index as single argument and an empty destination.
    """
    logger = common.logger
    logger.debug(5,'Splitting per job')
    logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.total_number_of_jobs = self.theNumberOfJobs
    jobTotal = self.total_number_of_jobs

    logger.debug(5,'N jobs '+str(jobTotal))
    logger.message(str(jobTotal)+' jobs can be created')

    # No input data, so any site is good: destination stays empty.
    # No random seed either: the only argument is the job index.
    argumentLists = []
    for index in range(jobTotal):
        self.jobDestination.append([""])
        argumentLists.append([str(index)])
    self.list_of_args = argumentLists
    return
634    
def split(self, jobParams):
    """
    Fill jobParams with the per-job argument lists and store them in the DB.

    ARGUMENT: jobParams - list, extended with one entry per job; entry i
                          is set to self.list_of_args[i]
    REQUIRES: self.total_number_of_jobs, self.list_of_args,
              self.jobDestination
    SETS:     self.argsList - number of arguments of a job (arguments of
              the first job plus the job number)
    """
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    jobParams.extend([""] * njobs)

    listID = []
    listField = []
    for job in range(njobs):
        jobParams[job] = arglist[job]
        # job identifiers handed to the DB are 1-based
        listID.append(job+1)
        argu = ' '.join(jobParams[job])
        job_ToSave = {}
        job_ToSave['arguments'] = str(job+1)+' '+argu
        job_ToSave['dlsDestination'] = self.jobDestination[job]
        listField.append(job_ToSave)
        msg = "Job "+str(job)+" Arguments: "+str(job+1)+" "+argu+"\n" \
            +" Destination: "+str(self.jobDestination[job])
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField)
    ## Pay Attention Here: argsList is a count, not a list
    if njobs > 0:
        self.argsList = len(jobParams[0]) + 1
    else:
        # no job was created: only the job number would be passed
        self.argsList = 1

    return
667 ewv 1.131
def numberOfJobs(self):
    """Return the total number of jobs computed by the job splitting."""
    jobTotal = self.total_number_of_jobs
    return jobTotal
671    
def getTarBall(self, exe):
    """
    Return the path of the tarball holding the user libs and executable.

    The path is <pathForTgz>share/<tgz_name>; it is stored in
    self.tgzNameWithPath. An existing tarball is reused, otherwise
    buildTar_ is asked to prepare it for the given executable.
    """
    # Relative path is used on purpose (Boss XML files)
    tarballPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarballPath

    if not os.path.exists(tarballPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # if it exists, just return it
    return tarballPath
689    
def buildTar_(self, executable):
    """
    Create the gzipped tarball self.tgzNameWithPath shipped with the jobs.

    Contents, in the order they are added:
      * the user executable, when self.executable is set and the file
        found by scram is not located under the release top,
      * the 'lib' and 'module' directories of the user area, when present,
      * every directory named 'data' found below the user area,
      * the CMSSW parameter set from the job directory, when self.pset is
        not None,
      * the ProdCommon directory under $CRABDIR, when present,
      * the monitoring and utility scripts under $CRABDIR/python,
      * every file of self.additional_inbox_files, stored by base name.

    Nothing is built when no release top is known or when the user area
    is the release top itself.  self.dataExist tells afterwards whether a
    'data' directory was packed; it is False on every other path.

    Raises CrabException when the executable is not found, when the
    archive cannot be written, or when the finished tarball is larger
    than self.MaxTarBallSize megabytes.
    """
    import tarfile

    # Defined before any early return so that later readers of the flag
    # never hit a missing attribute.
    self.dataExist = False

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
        return

    try:
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if self.executable != '':
                exeWithPath = self.scram.findFile_(executable)
                if not exeWithPath:
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    # distinguish case when script is in user project area
                    # or given by full path somewhere else
                    if exeWithPath.find(path) >= 0:
                        exe = exeWithPath.replace(path, '')
                        tar.add(path+exe,exe)
                    else:
                        tar.add(exeWithPath,os.path.basename(executable))
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/'+libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea+'/'+moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen = len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    self.dataExist = True
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ### CMSSW ParameterSet
            if self.pset is not None:
                cfg_file = common.work_space.jobDir()+self.configFilename()
                tar.add(cfg_file,self.configFilename())
                common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ## Add ProdCommon dir to tar
            prodcommonDir = 'ProdCommon'
            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
            if os.path.isdir(prodcommonPath):
                tar.add(prodcommonPath,prodcommonDir)
                common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### ML stuff
            ML_file_list = ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
            path = os.environ['CRABDIR'] + '/python/'
            for fname in ML_file_list:
                tar.add(path+fname,fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### Utils
            Utils_file_list = ['parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']
            for fname in Utils_file_list:
                tar.add(path+fname,fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### AdditionalFiles
            for fname in self.additional_inbox_files:
                tar.add(fname,fname.split('/')[-1])
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # release the file handle whether or not packing succeeded
            tar.close()
    except CrabException:
        # keep the specific message (e.g. executable not found)
        raise
    except Exception as exc:
        raise CrabException('Could not create tar-ball: '+str(exc))

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if tarballinfo.st_size > self.MaxTarBallSize*1024*1024:
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
792    
793 slacapra 1.61 ## create tar-ball with ML stuff
794 slacapra 1.97
def wsSetupEnvironment(self, nj=0):
    """
    Return the part of the job script that prepares the execution
    environment for job 'nj'.

    The generated shell text, in order:
      * runs the LCG or the OSG site setup depending on $middleware (on
        OSG a temporary working directory is created first),
      * creates the CMSSW project area with scram and loads its runtime
        environment,
      * checks that at least self.argsList arguments were passed,
      * defines DatasetPath, PrimaryDataset, DataTier and
        ApplicationFamily,
      * when self.pset is not None: copies the parameter set to pset.cfg,
        exports the per-job arguments, prints the file and computes
        PSETHASH.
    """
    out = []
    emit = out.append

    # ---- JobType-independent part ----
    emit('\n#Written by cms_cmssw::wsSetupEnvironment\n')
    emit('echo ">>> setup environment"\n')
    emit('if [ $middleware == LCG ]; then \n')
    emit(self.wsSetupCMSLCGEnvironment_())
    emit('elif [ $middleware == OSG ]; then\n')
    emit(' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n')
    emit(' if [ ! $? == 0 ] ;then\n')
    emit(' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n')
    emit(' job_exit_code=10016\n')
    emit(' func_exit\n')
    emit(' fi\n')
    emit(' echo ">>> Created working directory: $WORKING_DIR"\n')
    emit('\n')
    emit(' echo "Change to working directory: $WORKING_DIR"\n')
    emit(' cd $WORKING_DIR\n')
    emit(' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n')
    emit(self.wsSetupCMSOSGEnvironment_())
    emit('fi\n')

    # ---- JobType-specific part: CMSSW project area ----
    scram = self.scram.commandName()
    emit('\n\n')
    emit('echo ">>> specific cmssw setup environment:"\n')
    emit('echo "CMSSW_VERSION = '+self.version+'"\n')
    emit(scram+' project CMSSW '+self.version+'\n')
    emit('status=$?\n')
    emit('if [ $status != 0 ] ; then\n')
    emit(' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n')
    emit(' job_exit_code=10034\n')
    emit(' func_exit\n')
    emit('fi \n')
    emit('cd '+self.version+'\n')
    emit('SOFTWARE_DIR=`pwd`\n')
    emit('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n')
    emit('eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n')
    emit('if [ $? != 0 ] ; then\n')
    emit(' echo "ERROR ==> Problem with the command: "\n')
    # '\\`' writes a backslash followed by a backtick into the script
    emit(' echo "eval \\`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \\` at `hostname`"\n')
    emit(' job_exit_code=10034\n')
    emit(' func_exit\n')
    emit('fi \n')

    # ---- argument count check ----
    emit("\n")
    emit("## number of arguments (first argument always jobnumber)\n")
    emit("\n")
    emit("if [ $nargs -lt "+str(self.argsList)+" ]\n")
    emit("then\n")
    emit(" echo 'ERROR ==> Too few arguments' +$nargs+ \n")
    emit(' job_exit_code=50113\n')
    emit(" func_exit\n")
    emit("fi\n")
    emit("\n")

    # ---- job-specific part ----
    job = common.job_list[nj]

    # dataset variables, used for DBS output publication
    if self.datasetPath:
        emit('\n')
        emit('DatasetPath='+self.datasetPath+'\n')
        fields = self.datasetPath.split("/")
        emit('PrimaryDataset='+fields[1]+'\n')
        emit('DataTier='+fields[2]+'\n')
        emit('ApplicationFamily=cmsRun\n')
    else:
        emit('DatasetPath=MCDataTier\n')
        emit('PrimaryDataset=null\n')
        emit('DataTier=null\n')
        emit('ApplicationFamily=MCDataTier\n')

    if self.pset is not None:
        pset = os.path.basename(job.configFilename())
        emit('\n')
        emit('cp $RUNTIME_AREA/'+pset+' .\n')
        if self.datasetPath:
            # standard job
            emit('InputFiles=${args[1]}; export InputFiles\n')
            emit('MaxEvents=${args[2]}; export MaxEvents\n')
            emit('SkipEvents=${args[3]}; export SkipEvents\n')
            emit('echo "Inputfiles:<$InputFiles>"\n')
            emit('echo "MaxEvents:<$MaxEvents>"\n')
            emit('echo "SkipEvents:<$SkipEvents>"\n')
        else:
            # pythia like job
            emit('PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n')
            emit('IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n')
            emit('echo "PreserveSeeds: <$PreserveSeeds>"\n')
            emit('echo "IncrementSeeds:<$IncrementSeeds>"\n')
            if self.firstRun:
                emit('FirstRun=${args[1]}; export FirstRun\n')
                emit('echo "FirstRun: <$FirstRun>"\n')

        emit('mv -f '+pset+' pset.cfg\n')

        # dump the parameter set and compute its hash
        emit('\n')
        emit('echo "***** cat pset.cfg *********"\n')
        emit('cat pset.cfg\n')
        emit('echo "****** end pset.cfg ********"\n')
        emit('\n')
        emit('PSETHASH=`EdmConfigHash < pset.cfg` \n')
        emit('echo "PSETHASH = $PSETHASH" \n')
        emit('\n')

    return ''.join(out)
907 slacapra 1.176
def wsUntarSoftware(self, nj=0):
    """
    Return the part of the job script that unpacks the user tarball.

    When the file self.tgzNameWithPath exists on the submitting side, the
    generated shell text untars $RUNTIME_AREA/<tarball base name>, aborts
    the job through func_exit if tar failed, and puts
    $RUNTIME_AREA/ProdCommon in front of PYTHONPATH.  Otherwise only the
    header comment is returned.

    'nj' is accepted for interface compatibility and is not used.
    """
    txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'

    if os.path.isfile(self.tgzNameWithPath):
        tarball = os.path.basename(self.tgzNameWithPath)
        txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+tarball+' :" \n'
        txt += 'tar xzvf $RUNTIME_AREA/'+tarball+'\n'
        # $? must be read right after tar: any command in between (the
        # directory listing was) would overwrite it with its own status
        txt += 'untar_status=$? \n'
        txt += 'ls -Al \n'
        txt += 'if [ $untar_status -ne 0 ]; then \n'
        txt += ' echo "ERROR ==> Untarring .tgz file failed"\n'
        txt += ' job_exit_code=$untar_status\n'
        txt += ' func_exit\n'
        txt += 'else \n'
        txt += ' echo "Successful untar" \n'
        txt += 'fi \n'
        txt += '\n'
        txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
        txt += 'if [ -z "$PYTHONPATH" ]; then\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
        txt += 'else\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
        txt += 'fi\n'
        # report the resulting value whichever branch was taken
        txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
        txt += '\n'

    return txt
941 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Return the part of the job script that moves the unpacked user
    software from $RUNTIME_AREA into the current directory.

    The generated shell text replaces lib/ and module/, moves src/ when
    self.dataExist is set, moves every file of
    self.additional_inbox_files (by base name, which is how buildTar_
    stores them in the tarball), moves ProdCommon/ and puts
    $SOFTWARE_DIR/ProdCommon in front of PYTHONPATH.

    'nj' is accepted for interface compatibility and is not used.
    """
    txt = '\n#Written by cms_cmssw::wsBuildExe\n'
    txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n'

    txt += 'rm -r lib/ module/ \n'
    txt += 'mv $RUNTIME_AREA/lib/ . \n'
    txt += 'mv $RUNTIME_AREA/module/ . \n'
    # dataExist is only assigned by buildTar_, which is skipped when the
    # tarball already exists: treat a missing flag as "no data directory"
    if getattr(self, 'dataExist', False):
        txt += 'mv $RUNTIME_AREA/src/ . \n'
    for path in self.additional_inbox_files:
        # the tarball holds the file under its base name only
        txt += 'mv $RUNTIME_AREA/'+os.path.basename(path)+' . \n'
    txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'

    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
    txt += 'fi\n'
    # report the resulting value whichever branch was taken
    txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
    txt += '\n'

    return txt
969 slacapra 1.1
def modifySteeringCards(self, nj):
    """
    Modify the card provided by the user, writing a new card into the
    share dir.  This implementation has no body and returns None.
    """
    return None
975 ewv 1.131
def executableName(self):
    """
    Return the command that starts the job: "sh " when a user script
    (self.scriptExe) is configured, self.executable otherwise.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "
981 slacapra 1.1
def executableArgs(self):
    """
    Return the argument string passed to the job executable.

    With a user script (self.scriptExe) the arguments are the script name
    followed by " $NJob".  Otherwise they depend on the CMSSW version:
    the framework job report option is added for 1_5 and later, and the
    configuration file is pset.py from major version 2, pset.cfg before.
    """
    # FUTURE: This function tests the CMSSW version. Can be simplified as
    # support for old versions is dropped
    if self.scriptExe:
        return self.scriptExe + " $NJob"

    pieces = []

    # Framework job report
    writes_fjr = (self.major_version >= 2) or \
                 (self.major_version >= 1 and self.minor_version >= 5)
    if writes_fjr:
        pieces.append(" -j $RUNTIME_AREA/crab_fjr_$NJob.xml")

    # Type of cfg file
    if self.major_version >= 2:
        pieces.append(" -p pset.py")
    else:
        pieces.append(" -p pset.cfg")

    return "".join(pieces)
998 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of file names to be put in the JDL input sandbox:
    the user tarball, when the file exists, followed by the wrapper
    script located in the 'job/' directory of the work space.
    """
    sandbox = []
    tarball = self.tgzNameWithPath
    if os.path.isfile(tarball):
        sandbox.append(tarball)

    # the wrapper script name is stored in the task database
    script_name = str(common._db.queryTask('scriptName'))
    wrapper_path = common.work_space.pathForTgz() + 'job/' + os.path.basename(script_name)
    sandbox.append(wrapper_path)
    return sandbox
1012    
def outputSandbox(self, nj):
    """
    Return the list of file names to be put in the JDL output sandbox:
    every user declared output file (self.output_file followed by
    self.output_file_sandbox) tagged with the 1-based job number.
    """
    ## User Declared output files
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1024    
def prepareSteeringCards(self):
    """
    Make the initial modifications of the user's steering card file.
    Nothing is done by this implementation; None is returned.
    """
    return None
1030    
def wsRenameOutput(self, nj):
    """
    Return the part of the job script that renames the produced files.

    For every file of self.output_file the generated shell text checks
    that the file exists, renames it to its job-numbered name (built
    with numberFile_ and '$NJob') and leaves a symbolic link with the
    original name in $RUNTIME_AREA.  With self.copy_data == 1 the file
    stays in the current directory, otherwise it is moved to
    $RUNTIME_AREA.  A missing file sets job_exit_code=60302; under the
    CONDOR_G scheduler a dummy output file is additionally written on
    OSG.  The shell variable file_list receives all numbered names and
    the script finally changes to $RUNTIME_AREA.

    'nj' is accepted for interface compatibility and is not used.
    """
    txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    txt += 'ls \n'
    txt += '\n'

    file_list = []
    for fileWithSuffix in self.output_file:
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        file_list.append(output_file_num)
        txt += '\n'
        txt += '# check output file\n'
        txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
        if self.copy_data == 1:
            # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            txt += ' mv '+fileWithSuffix+' '+output_file_num+'\n'
            txt += ' ln -s `pwd`/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        else:
            txt += ' mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        txt += 'else\n'
        txt += ' job_exit_code=60302\n'
        txt += ' echo "WARNING: Output file '+fileWithSuffix+' not found"\n'
        if common.scheduler.name().upper() == 'CONDOR_G':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'fi\n'

    # str.join replaces the Python-2-only string.join() function
    txt += 'file_list="'+' '.join(file_list)+'"\n'
    txt += '\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    txt += 'ls \n'
    txt += '\n'
    txt += 'cd $RUNTIME_AREA\n'
    txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
    return txt
1075    
def numberFile_(self, file, txt):
    """
    Return 'file' with "_" + txt inserted before its last extension.

    "out.root" with txt "3" gives "out_3.root"; a name without extension
    simply gets "_3" appended.  Only the last path component is searched
    for the extension, so a dot inside a directory name does not move
    the suffix into the directory part.
    """
    # os.path.splitext keeps directory components intact, unlike a split
    # of the whole string on "."
    stem, ext = os.path.splitext(file)
    return stem + '_' + txt + ext
1093    
def getRequirements(self, nj=[]):
    """
    Return the job requirements expression to add to the JDL file.

    The expression is the '&&' conjunction of:
      * the software tag "VO-cms-<version>", when self.version is set,
      * the architecture tag "VO-cms-<arch>", when self.executable_arch
        is set,
      * outbound network connectivity of the worker node,
      * the CE being in "Production" state, for the glitecoll scheduler.

    The clauses are joined explicitly so that the expression never
    starts with a dangling '&&' when the version is not set.

    'nj' is accepted for interface compatibility and is not used.
    """
    runtime_env = 'other.GlueHostApplicationSoftwareRunTimeEnvironment'
    clauses = []

    if self.version:
        clauses.append('Member("VO-cms-' + self.version + '", ' + runtime_env + ')')

    if self.executable_arch:
        clauses.append('Member("VO-cms-' + self.executable_arch + '", ' + runtime_env + ')')

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')

    if common.scheduler.name() == "glitecoll":
        # trailing blank kept from the original expression
        clauses.append('other.GlueCEStateStatus == "Production" ')

    return ' && '.join(clauses)
1115 gutsche 1.3
def configFilename(self):
    """
    Return the config file name: self.name() with the extension '.py'
    for CMSSW 2_1 and later, '.cfg' for older versions.
    """
    # FUTURE: Can remove cfg mode for CMSSW >= 2_1_x
    python_cfg = (self.major_version >= 3) or \
                 (self.major_version >= 2 and self.minor_version >= 1)
    if python_cfg:
        extension = '.py'
    else:
        extension = '.cfg'
    return self.name() + extension
1123 gutsche 1.3
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of the job script that prepares the CMS environment
    on OSG: it exports SCRAM_ARCH and sources
    $OSG_APP/cmssoft/cms/cmsset_default.sh for self.version, leaving the
    job through func_exit with code 10020 when that file is missing.
    """
    arch = self.executable_arch
    script = [
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH=' + arch + '\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh ' + self.version + '\n',
        ' else\n',
        ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    ]
    return ''.join(script)
1147 ewv 1.131
1148 gutsche 1.3 ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of the job script that prepares the CMS environment
    on LCG: it exports SCRAM_ARCH and BUILD_ARCH and sources
    $VO_CMS_SW_DIR/cmsset_default.sh.  The generated text leaves the job
    through func_exit with code 10031 when $VO_CMS_SW_DIR is not set,
    10020 when the setup file is missing or empty and 10032 when
    sourcing it fails.
    """
    arch = self.executable_arch
    script = [
        '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n',
        ' echo ">>> setup CMS LCG environment:"\n',
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n',
        ' export SCRAM_ARCH=' + arch + '\n',
        ' export BUILD_ARCH=' + arch + '\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n',
        ' job_exit_code=10031\n',
        ' func_exit\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' job_exit_code=10032\n',
        ' func_exit\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "==> setup cms environment ok"\n',
    ]
    return ''.join(script)
1182 gutsche 1.5
1183 ewv 1.131 ### FEDE FOR DBS OUTPUT PUBLICATION
def modifyReport(self, nj):
    """
    Return the part of the job script that modifies the FrameworkJob
    Report for the output publication.

    Only the header comment is returned unless the configuration
    parameter USER.publish_data equals 1.  In that case the generated
    shell text sets FOR_LFN according to $copy_exit_status, runs
    ModifyJobReport.py from ProdCommon on crab_fjr_$NJob.xml and, when
    that succeeds, replaces the report with NewFrameworkJobReport.xml;
    on failure job_exit_code is set to 70500.

    'nj' is accepted for interface compatibility and is not used.
    """
    header = '\n#Written by cms_cmssw::modifyReport\n'
    if int(self.cfg_params.get('USER.publish_data',0)) != 1:
        return header

    processedDataset = self.cfg_params['USER.publish_data_name']
    lfn_base = LFNBase(processedDataset)

    modify_tool = '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py'
    modify_cmd = modify_tool + ' $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'

    script = [
        header,
        'if [ $copy_exit_status -eq 0 ]; then\n',
        ' FOR_LFN=%s_${PSETHASH}/\n' % (lfn_base),
        'else\n',
        ' FOR_LFN=/copy_problems/ \n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        'fi\n',
        'echo ">>> Modify Job Report:" \n',
        'chmod a+x ' + modify_tool + '\n',
        'ProcessedDataset=' + processedDataset + '\n',
        'echo "ProcessedDataset = $ProcessedDataset"\n',
        'echo "SE = $SE"\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'echo "FOR_LFN = $FOR_LFN" \n',
        'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n',
        'echo "' + modify_cmd + '"\n',
        modify_cmd + '\n',
        'modifyReport_result=$?\n',
        'if [ $modifyReport_result -ne 0 ]; then\n',
        ' modifyReport_result=70500\n',
        ' job_exit_code=$modifyReport_result\n',
        ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "WARNING: Problem with ModifyJobReport"\n',
        'else\n',
        ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        'fi\n',
    ]
    return ''.join(script)
1223 fanzago 1.99
def setParam_(self, param, value):
    """Store 'value' under the key 'param' in self._params."""
    params = self._params
    params[param] = value
1226    
def getParams(self):
    """Return the self._params dictionary itself (not a copy)."""
    params = self._params
    return params
1229 gutsche 1.8
def uniquelist(self, old):
    """
    Return a new list with the duplicates of 'old' removed.

    The first occurrence of every element is kept and the original order
    is preserved, so the result is deterministic.  The elements must be
    hashable, as before.
    """
    seen = {}
    unique = []
    for item in old:
        if item not in seen:
            seen[item] = 0
            unique.append(item)
    return unique
1238 mcinquil 1.121
def outList(self):
    """
    Return the part of the job script that declares the files expected
    in the output sandbox.

    The list holds the job-numbered names (numberFile_ with '$NJob') of
    self.output_file_sandbox, preceded by those of self.output_file when
    self.return_data == 1, followed by the job stdout and stderr.  The
    generated shell text echoes the list and exports it as filesToCheck.
    """
    if self.return_data == 1:
        expected = self.output_file + self.output_file_sandbox
    else:
        expected = self.output_file_sandbox

    listOutFiles = [self.numberFile_(name, '$NJob') for name in expected]
    # stdout and stderr of the job are always part of the sandbox
    listOutFiles.append('CMSSW_$NJob.stdout')
    listOutFiles.append('CMSSW_$NJob.stderr')

    # str.join replaces the Python-2-only string.join() function
    joined = ' '.join(listOutFiles)

    txt = ''
    txt += 'echo ">>> list of expected files on output sandbox"\n'
    txt += 'echo "output files: '+joined+'"\n'
    txt += 'filesToCheck="'+joined+'"\n'
    txt += 'export filesToCheck\n'
    return txt