ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.177
Committed: Thu Apr 17 16:08:28 2008 UTC (17 years ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: bp_osg_bdii, CRAB_2_2_0_pre9
Branch point for: osg_bdii
Changes since 1.176: +10 -11 lines
Log Message:
fix bug in splitting, in case remaining event per block are less than events per job

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure a CMSSW job type from the user's CRAB configuration.

    Reads and validates the CMSSW/USER/EDG cards, runs data discovery for
    the requested dataset (if any), prepares the user tarball, splits the
    task into jobs and finally writes the modified parameter-set.

    ARGUMENT: cfg_params: mapping of 'SECTION.key' -> value holding the
                          user configuration
              ncjobs:     number of jobs requested to be created; limits the
                          splitting (jobSplittingByBlocks also accepts 'all')
    RAISES:   CrabException when a mandatory card is missing, the cards are
              inconsistent, a referenced file does not exist or the
              parameter-set cannot be written
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.additional_tgz_name = 'additional.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''         # script use case
    self.datasetPath = ''  # script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()

    self.setParam_('application', self.version)

    ### collect Data cards
    if 'CMSSW.datasetpath' not in cfg_params:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        try:
            datasetpath_split = self.datasetPath.split("/")
            # standard style
            self.setParam_('datasetFull', self.datasetPath)
            self.setParam_('dataset', datasetpath_split[1])
            self.setParam_('owner', datasetpath_split[2])
        except IndexError:
            # path has fewer than two '/'-separated fields after the
            # leading slash: fall back to the full path for both entries
            self.setParam_('dataset', self.datasetPath)
            self.setParam_('owner', self.datasetPath)

    self.setParam_('taskId', common._db.queryTask('name'))

    self.dataTiers = []

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    self.setParam_('exe', self.executable)
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if 'CMSSW.pset' not in cfg_params:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    # (the framework job report is always returned this way)
    self.output_file_sandbox = [self.fjrFileName]

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp:
        tmpOutFiles = tmp.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outFile in tmpOutFiles:
            self.output_file.append(outFile.strip())
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe:
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    # Without dataset and without pset a user script is mandatory.
    # scriptExe is None (not '') when the card is absent, so test truthiness.
    if self.datasetPath is None and self.pset is None and not self.scriptExe:
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    if 'USER.additional_input_files' in cfg_params:
        tmpAddFiles = cfg_params['USER.additional_input_files'].split(',')
        for tmp in tmpAddFiles:
            tmp = tmp.strip()
            if not tmp:
                # empty entry, e.g. produced by a trailing comma
                continue
            dirname = ''
            if not tmp[0]=="/": dirname = "."
            inputFiles = []
            if "*" in tmp:
                inputFiles = glob.glob(os.path.join(dirname, tmp))
                if len(inputFiles)==0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                inputFiles.append(tmp)
            for inputFile in inputFiles:
                if not os.path.exists(inputFile):
                    raise CrabException("Additional input file not found: "+inputFile)
                self.additional_inbox_files.append(inputFile.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if 'CMSSW.events_per_job' in cfg_params:
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in cfg_params:
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events
    if 'CMSSW.total_number_of_events' in cfg_params:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if self.selectNumberOfJobs == 0:
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## New method of dealing with seeds
    self.incrementSeeds = []
    self.preserveSeeds = []
    if 'CMSSW.preserve_seeds' in cfg_params:
        for tmp in cfg_params['CMSSW.preserve_seeds'].split(','):
            # str.strip() returns a new string: store the stripped value
            self.preserveSeeds.append(tmp.strip())
    if 'CMSSW.increment_seeds' in cfg_params:
        for tmp in cfg_params['CMSSW.increment_seeds'].split(','):
            self.incrementSeeds.append(tmp.strip())

    ## Old method of dealing with seeds
    ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
    ## remove
    # (attribute to set, configuration card, card name shown to the user, seed added to increment_seeds)
    deprecatedSeeds = (
        ('sourceSeed',    'CMSSW.pythia_seed', 'pythia_seed', 'sourceSeed'),
        ('sourceSeedVtx', 'CMSSW.vtx_seed',    'vtx_seed',    'VtxSmeared'),
        ('sourceSeedG4',  'CMSSW.g4_seed',     'g4_seed',     'g4SimHits'),
        ('sourceSeedMix', 'CMSSW.mix_seed',    'mix_seed',    'mix'),
    )
    for attrName, cardName, shownName, seedName in deprecatedSeeds:
        seedValue = cfg_params.get(cardName, None)
        setattr(self, attrName, seedValue)
        if seedValue:
            print(shownName + " is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
            self.incrementSeeds.append(seedName)

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    PsetEdit = None
    if self.pset is not None:
        import PsetManipulator as pp
        PsetEdit = pp.PsetManipulator(self.pset)

    # Copy/return
    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0       # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}       # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[] # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            # Add FrameworkJobReport to parameter-set, set max events.
            # Reset later for data jobs by writeCFG which does all modifications
            PsetEdit.addCrabFJR(self.fjrFileName)
            PsetEdit.maxEvent(self.eventsPerJob)
            PsetEdit.psetWriter(self.configFilename())
        except Exception:
            msg='Error while manipuliating ParameterSet: exiting...'
            raise CrabException(msg)
288 gutsche 1.3
289 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
290    
291 slacapra 1.86 import DataDiscovery
292     import DataLocation
293 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
294    
295     datasetPath=self.datasetPath
296    
297 slacapra 1.1 ## Contact the DBS
298 gutsche 1.92 common.logger.message("Contacting Data Discovery Services ...")
299 slacapra 1.1 try:
300 slacapra 1.137 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
301 slacapra 1.1 self.pubdata.fetchDBSInfo()
302    
303 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
304 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
305     raise CrabException(msg)
306 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
307 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
308     raise CrabException(msg)
309 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
310 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
311 slacapra 1.1 raise CrabException(msg)
312    
313 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
314 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
315     self.eventsbyfile=self.pubdata.getEventsPerFile()
316 gutsche 1.3
317 slacapra 1.1 ## get max number of events
318 ewv 1.131 self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
319 slacapra 1.1
320     ## Contact the DLS and build a list of sites hosting the fileblocks
321     try:
322 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
323 gutsche 1.6 dataloc.fetchDLSInfo()
324 slacapra 1.41 except DataLocation.DataLocationError , ex:
325 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
326     raise CrabException(msg)
327 ewv 1.131
328 slacapra 1.1
329 gutsche 1.35 sites = dataloc.getSites()
330     allSites = []
331     listSites = sites.values()
332 slacapra 1.63 for listSite in listSites:
333     for oneSite in listSite:
334 gutsche 1.35 allSites.append(oneSite)
335     allSites = self.uniquelist(allSites)
336 gutsche 1.3
337 gutsche 1.92 # screen output
338     common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
339    
340 gutsche 1.35 return sites
341 ewv 1.131
342 ewv 1.170 # to Be Removed DS -- BL
343 spiga 1.165 # def setArgsList(self, argsList):
344     # self.argsList = argsList
345 mcinquil 1.140
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs (self.ncjobs is set to the same value)
          self.list_of_args - one [files, max events, skip events] list of strings per job
    RAISES: CrabException if the splitting cards do not allow to compute a
            positive number of events per job
    """

    # ---- Handle the possible job splitting configurations ---- #
    totalEventsRequested = None
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
    if totalEventsRequested is None:
        # neither total_number_of_events nor (events_per_job and number_of_jobs)
        msg = 'Cannot split by events: define total_number_of_events, or both events_per_job and number_of_jobs.'
        raise CrabException(msg)

    # If user requested all the events in the dataset
    if totalEventsRequested == -1:
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif totalEventsRequested > self.maxEvents:
        eventsRemaining = self.maxEvents
        # report the effective request: total_number_of_events is 0 when the
        # total was derived from number_of_jobs * events_per_job
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive number.')
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    # A non-positive value would make the file loop below emit empty jobs
    # without ever consuming events.
    if eventsRemaining > 0 and eventsPerJobRequested <= 0:
        msg = 'Cannot split '+str(eventsRemaining)+' events in jobs of '+str(eventsPerJobRequested)+' events: check events_per_job and number_of_jobs.'
        raise CrabException(msg)

    if self.selectNumberOfJobs:
        common.logger.message("May not create the exact number_of_jobs requested.")

    if self.ncjobs == 'all':
        totalNumberOfJobs = 999999999
    else:
        totalNumberOfJobs = self.ncjobs

    blocks = blockSites.keys()
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if block in self.eventsbyblock:
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = self.filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0

            # ---- New block => New job ---- #
            parString = ""
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                fileName = files[fileCount]
                if newFile:
                    try:
                        numEventsInFile = self.eventsbyfile[fileName]
                        common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += '\\\"' + fileName + '\\\"\,'
                        newFile = 0
                    except KeyError:
                        common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

                # never ask for more events than are still to be assigned
                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                    # if last file in block
                    if ( fileCount == numFilesInBlock-1 ):
                        # end job using last file, use remaining events in block
                        # close job and touch new file
                        # (the [:-2] drops the trailing separator added after each file)
                        fullString = parString[:-2]
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        # fill jobs of block dictionary
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                        jobSkipEventCount = 0
                        # reset file
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else:
                        # go to next file
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ):
                    # close job and touch new file
                    fullString = parString[:-2]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else:
                    # close job but don't touch new file
                    fullString = parString[:-2]
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[fileName]
                    parString = '\\\"' + fileName + '\\\"\,'
                # END if
            # END while (iterate over files in the block)
    # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []
    bloskNoSite = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            # sites left after applying the black list and then the white list
            allowedSites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                ','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
                bloskNoSite.append( blockCounter )

    common.logger.message(screenOutput)
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        virgola = ""
        if len(bloskNoSite) > 1:
            virgola = ","
        for block in bloskNoSite:
            msg += ' ' + str(block) + virgola
        msg += '\n Related jobs:\n '
        virgola = ""
        if len(noSiteBlock) > 1:
            virgola = ","
        for range_jobs in noSiteBlock:
            msg += str(range_jobs) + virgola
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        if 'EDG.se_white_list' in self.cfg_params:
            msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'
        if 'EDG.ce_white_list' in self.cfg_params:
            msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
570    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job
    (no input dataset: the jobs are not bound to any site).

    REQUIRES: self.selectEventsPerJob, self.selectNumberOfJobs,
              self.selectTotalNumberEvents, self.eventsPerJob,
              self.theNumberOfJobs, self.total_number_of_events, self.firstRun
    SETS:     self.total_number_of_jobs, self.list_of_args and, depending on
              the cards given, self.total_number_of_events or self.eventsPerJob;
              appends one destination per job to self.jobDestination
    RAISES:   CrabException if the cards do not allow a valid splitting
    """
    common.logger.debug(5,'Splitting per events')

    if (self.selectEventsPerJob):
        common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if (self.selectNumberOfJobs):
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if (self.selectTotalNumberEvents):
        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    unsplittable = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            # guard the division below
            if self.eventsPerJob <= 0:
                raise CrabException('events_per_job must be a positive number.')
            self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
        elif (self.selectNumberOfJobs):
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
        else:
            raise CrabException(unsplittable)
    elif (self.selectNumberOfJobs):
        # guard the division below
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive number.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
    else:
        raise CrabException(unsplittable)

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # one (possibly empty) argument list per job
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        args=[]
        if (self.firstRun):
            ## pythia first run: first_run card and job index, concatenated as strings
            args.append(str(self.firstRun)+str(i))
        self.list_of_args.append(args)

    return
622    
623 spiga 1.42
def jobSplittingForScript(self):
    """
    Split a script-only task: create exactly self.theNumberOfJobs jobs.

    SETS: self.total_number_of_jobs - number of jobs
          self.list_of_args - one single-element list per job, holding the
                              0-based job index as a string
          self.jobDestination - gets one [""] entry appended per job
    """
    log = common.logger
    log.debug(5,'Splitting per job')
    log.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    njobs = self.theNumberOfJobs
    self.total_number_of_jobs = njobs

    log.debug(5,'N jobs '+str(njobs))
    log.message(str(njobs)+' jobs can be created')

    jobIndexes = range(njobs)
    ## Since there is no input, any site is good: the destination is left empty
    self.jobDestination.extend([[""] for index in jobIndexes])
    ## no random seed: the only argument is the job index
    self.list_of_args = [[str(index)] for index in jobIndexes]
    return
646    
def split(self, jobParams):
    """
    Store arguments and destination of every job in the task database.

    ARGUMENT: jobParams: list which gets one entry appended per job; the
                         first self.total_number_of_jobs entries are then set
                         to the per-job argument lists
    REQUIRES: self.total_number_of_jobs, self.list_of_args, self.jobDestination
    SETS:     self.argsList - number of arguments of a job (its argument list
                              plus the leading job number); left untouched
                              when there are no jobs
    """
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    for i in range(njobs):
        jobParams.append("")

    listID=[]
    listField=[]
    for job in range(njobs):
        jobParams[job] = arglist[job]
        # job identifiers are 1-based
        listID.append(job+1)
        job_ToSave ={}
        concString = ' '
        argu=''
        if len(jobParams[job]):
            argu += concString.join(jobParams[job] )
        job_ToSave['arguments']= str(job+1)+' '+argu
        job_ToSave['dlsDestination']= self.jobDestination[job]
        listField.append(job_ToSave)
        msg="Job "+str(job)+" Arguments: "+str(job+1)+" "+argu+"\n" \
            +" Destination: "+str(self.jobDestination[job])
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField)
    # Count the arguments on the first job: indexing the second one fails
    # for a task made of a single job.
    if njobs > 0:
        self.argsList = (len(jobParams[0])+1)

    return
679 ewv 1.131
def numberOfJobs(self):
    """
    Return the job count stored in total_number_of_jobs.
    """
    njobs = self.total_number_of_jobs
    return njobs
683    
def getTarBall(self, exe):
    """
    Return the path of the tarball holding the user libraries and executable.

    The path is stored in self.tgzNameWithPath.  When a file already
    exists at that path it is returned untouched; otherwise buildTar_
    is asked to create it first.
    """
    # Marco. Let's start to use relative path for Boss XML files
    self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name

    if not os.path.exists(self.tgzNameWithPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # if it exist, just return it
    return self.tgzNameWithPath
701    
def buildTar_(self, executable):
    """
    Create the input-sandbox tarballs for the task.

    Two archives are written:
      * self.tgzNameWithPath: the private executable (when it is not
        part of the release), the 'lib' and 'module' directories of the
        user SCRAM area, every 'data' directory found below that area,
        and the ProdCommon directory under $CRABDIR;
      * self.MLtgzfile: a fixed list of helper scripts taken from
        $CRABDIR/python.

    Nothing is done when the working area is the release top or when
    no release top is reported.

    Raises CrabException when the executable cannot be found, when an
    archive cannot be written, or when the first archive is larger than
    self.MaxTarBallSize megabytes.
    """
    import tarfile

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
        return

    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if (self.executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    # distinguish case when script is in user project area or given by full path somewhere else
                    if exeWithPath.find(path) >= 0 :
                        exe = exeWithPath.replace(path,'')
                        tar.add(path+exe,exe)
                    else :
                        tar.add(exeWithPath,os.path.basename(executable))
                # otherwise the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen=len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ## Add ProdCommon dir to tar
            prodcommonDir = 'ProdCommon'
            prodcommonPath = os.environ['CRABDIR'] + '/' + prodcommonDir
            if os.path.isdir(prodcommonPath):
                tar.add(prodcommonPath,prodcommonDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # Always release the file handle, even when an add() failed.
            tar.close()
    except CrabException:
        # Keep the specific message (e.g. executable not found) instead of
        # replacing it with the generic one below.  The truncated archive
        # is removed because getTarBall returns any existing file as-is.
        if os.path.exists(self.tgzNameWithPath):
            os.remove(self.tgzNameWithPath)
        raise
    except Exception:
        if os.path.exists(self.tgzNameWithPath):
            os.remove(self.tgzNameWithPath)
        raise CrabException('Could not create tar-ball')

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path=os.environ['CRABDIR'] + '/python/'
            for name in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']:
                tar.add(path+name,name)
            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except Exception:
        raise CrabException('Could not create ML files tar-ball')

    return
792 ewv 1.131
def additionalInputFileTgz(self):
    """
    Put all additional files into a tar ball and return its name.

    Every entry of self.additional_inbox_files is stored under the last
    '/'-separated component of its path.  The archive is rewritten on
    each call.
    """
    import tarfile
    tarName = common.work_space.pathForTgz()+'share/'+self.additional_tgz_name
    tar = tarfile.open(tarName, "w:gz")
    try:
        for path in self.additional_inbox_files:
            tar.add(path, path.split('/')[-1])
        common.logger.debug(5,"Files added to "+self.additional_tgz_name+" : "+str(tar.getnames()))
    finally:
        # Release the handle even when one of the files cannot be added.
        tar.close()
    return tarName
805    
def wsSetupEnvironment(self, nj=0):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The text is assembled from: the middleware-specific setup (LCG or
    OSG), the SCRAM project creation for self.version, the check on the
    number of wrapper arguments (self.argsList), the dataset
    bookkeeping variables, and - when a parameter set is configured -
    the commands that copy it to pset.cfg and compute PSETHASH.
    """
    # NOTE(review): block nesting below was reconstructed from a listing
    # that lost its indentation - confirm against the repository copy.

    # Prepare JobType-independent part
    out = [
        '\n#Written by cms_cmssw::wsSetupEnvironment\n',
        'echo ">>> setup environment"\n',
        'if [ $middleware == LCG ]; then \n',
        self.wsSetupCMSLCGEnvironment_(),
        'elif [ $middleware == OSG ]; then\n',
        ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n',
        ' if [ ! $? == 0 ] ;then\n',
        ' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n',
        ' job_exit_code=10016\n',
        ' func_exit\n',
        ' fi\n',
        ' echo ">>> Created working directory: $WORKING_DIR"\n',
        '\n',
        ' echo "Change to working directory: $WORKING_DIR"\n',
        ' cd $WORKING_DIR\n',
        ' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n',
        self.wsSetupCMSOSGEnvironment_(),
        'fi\n',
    ]

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    out += [
        '\n\n',
        'echo ">>> specific cmssw setup environment:"\n',
        'echo "CMSSW_VERSION = '+self.version+'"\n',
        scram+' project CMSSW '+self.version+'\n',
        'status=$?\n',
        'if [ $status != 0 ] ; then\n',
        ' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n',
        ' job_exit_code=10034\n',
        ' func_exit\n',
        'fi \n',
        'cd '+self.version+'\n',
        ########## FEDE FOR DBS2 ######################
        'SOFTWARE_DIR=`pwd`\n',
        'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n',
        ### needed grep for bug in scramv1 ###
        'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n',
        # Handle the arguments:
        "\n",
        "## number of arguments (first argument always jobnumber)\n",
        "\n",
        "if [ $nargs -lt "+str(self.argsList)+" ]\n",
        "then\n",
        " echo 'ERROR ==> Too few arguments' +$nargs+ \n",
        ' job_exit_code=50113\n',
        " func_exit\n",
        "fi\n",
        "\n",
    ]

    # Prepare job-specific part
    job = common.job_list[nj]
    ### FEDE FOR DBS OUTPUT PUBLICATION
    if self.datasetPath:
        out.append('\n')
        out.append('DatasetPath='+self.datasetPath+'\n')
        datasetpath_split = self.datasetPath.split("/")
        out.append('PrimaryDataset='+datasetpath_split[1]+'\n')
        out.append('DataTier='+datasetpath_split[2]+'\n')
        out.append('ApplicationFamily=cmsRun\n')
    else:
        out += [
            'DatasetPath=MCDataTier\n',
            'PrimaryDataset=null\n',
            'DataTier=null\n',
            'ApplicationFamily=MCDataTier\n',
        ]

    if self.pset is not None:
        pset = os.path.basename(job.configFilename())
        out.append('\n')
        out.append('cp $RUNTIME_AREA/'+pset+' .\n')
        if self.datasetPath: # standard job
            out += [
                'InputFiles=${args[1]}; export InputFiles\n',
                'MaxEvents=${args[2]}; export MaxEvents\n',
                'SkipEvents=${args[3]}; export SkipEvents\n',
                'echo "Inputfiles:<$InputFiles>"\n',
                'echo "MaxEvents:<$MaxEvents>"\n',
                'echo "SkipEvents:<$SkipEvents>"\n',
            ]
        else: # pythia like job
            out += [
                'PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n',
                'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n',
                'echo "PreserveSeeds: <$PreserveSeeds>"\n',
                'echo "IncrementSeeds:<$IncrementSeeds>"\n',
            ]
            if self.firstRun:
                out.append('FirstRun=${args[1]}; export FirstRun\n')
                out.append('echo "FirstRun: <$FirstRun>"\n')

        out.append('mv -f '+pset+' pset.cfg\n')

    if self.additional_inbox_files:
        out += [
            'if [ -e $RUNTIME_AREA/'+self.additional_tgz_name+' ] ; then\n',
            ' tar xzvf $RUNTIME_AREA/'+self.additional_tgz_name+'\n',
            'fi\n',
        ]

    if self.pset is not None:
        out += [
            '\n',
            'echo "***** cat pset.cfg *********"\n',
            'cat pset.cfg\n',
            'echo "****** end pset.cfg ********"\n',
            '\n',
            'PSETHASH=`EdmConfigHash < pset.cfg` \n',
            'echo "PSETHASH = $PSETHASH" \n',
            '\n',
        ]
    return ''.join(out)
920 slacapra 1.176
def wsUntarSoftware(self, nj=0):
    """
    Return the script fragment that unpacks the software tarball.

    When no file exists at self.tgzNameWithPath only the header comment
    is returned.  Otherwise the fragment untars the archive from
    $RUNTIME_AREA, exits through func_exit on failure, and puts
    $RUNTIME_AREA/ProdCommon in front of PYTHONPATH.
    """
    header = '\n#Written by cms_cmssw::wsUntarSoftware\n'
    if not os.path.isfile(self.tgzNameWithPath):
        return header

    pieces = [
        header,
        'echo ">>> tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n',
        'tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "ERROR ==> Untarring .tgz file failed"\n',
        ' job_exit_code=$untar_status\n',
        ' func_exit\n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        'echo ">>> Include ProdCommon in PYTHONPATH:"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n',
        'else\n',
        ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n',
        'echo "PYTHONPATH=$PYTHONPATH"\n',
        'fi\n',
        '\n',
    ]
    return ''.join(pieces)
953 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Return the script fragment that moves the unpacked software into place.

    The fragment replaces the local lib/ and module/ directories with
    the ones from $RUNTIME_AREA, moves ProdCommon alongside them and
    puts $SOFTWARE_DIR/ProdCommon in front of PYTHONPATH.
    """
    fragment = (
        '\n#Written by cms_cmssw::wsBuildExe\n',
        'echo ">>> moving CMSSW software directories in `pwd`" \n',
        'rm -r lib/ module/ \n',
        'mv $RUNTIME_AREA/lib/ . \n',
        'mv $RUNTIME_AREA/module/ . \n',
        'mv $RUNTIME_AREA/ProdCommon/ . \n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n',
        'else\n',
        ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n',
        'echo "PYTHONPATH=$PYTHONPATH"\n',
        'fi\n',
        '\n',
    )
    return ''.join(fragment)
977 slacapra 1.1
def modifySteeringCards(self, nj):
    """
    Placeholder for rewriting the user-provided card into the share dir.

    This job type does nothing here; the method returns None.
    """
    return None
983 ewv 1.131
def executableName(self):
    """
    Return the command used to launch the job payload.

    "sh " when a user script (self.scriptExe) is configured, otherwise
    self.executable.
    """
    if not self.scriptExe:
        return self.executable
    return "sh " #CarlosDaniele
989 slacapra 1.1
def executableArgs(self):
    """
    Return the command-line arguments for the job payload.

    With a user script configured the result is the script name
    followed by " $NJob".  Otherwise the CMSSW version reported by
    self.scram.getSWVersion() (expected as NAME_major_minor_...) decides:
      * releases 1_5 and later get " -j $RUNTIME_AREA/crab_fjr_$NJob.xml";
      * major release 2 and later use " -p pset.py", older ones
        " -p pset.cfg".

    Raises CrabException when major/minor cannot be parsed as integers.
    """
    # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
    if self.scriptExe:#CarlosDaniele
        return self.scriptExe + " $NJob"

    version_array = self.scram.getSWVersion().split('_')
    try:
        major = int(version_array[1])
        minor = int(version_array[2])
    except (IndexError, ValueError):
        msg = "Cannot parse CMSSW version string: " + "_".join(version_array) + " for major and minor release number!"
        raise CrabException(msg)

    ex_args = ""
    # FUTURE: This tests the CMSSW version. Can remove code as versions deprecated
    # Framework job report: compare (major, minor) as a pair so that
    # e.g. 2_0 counts as newer than 1_5.
    if (major, minor) >= (1, 5):
        ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
    # Type of cfg file
    if major >= 2:
        ex_args += " -p pset.py"
    else:
        ex_args += " -p pset.cfg"
    return ex_args
1016 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of file names to ship in the JDL input sandbox.

    In order: the software tarball and the ML tarball (each only when
    the file exists), the job configuration file (when a parameter set
    is configured), the additional-files tarball, and the wrapper
    script whose name is read from the task DB.
    """
    sandbox = []

    ## code
    for archive in (self.tgzNameWithPath, self.MLtgzfile):
        if os.path.isfile(archive):
            sandbox.append(archive)

    ## config
    if self.pset is not None:
        sandbox.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())

    ## additional input files
    sandbox.append(self.additionalInputFileTgz())

    ## executable
    script_name = str(common._db.queryTask('scriptName'))
    sandbox.append(common.work_space.pathForTgz() +'job/'+ os.path.basename(script_name))
    return sandbox
1039    
def outputSandbox(self, nj):
    """
    Return the list of file names expected in the JDL output sandbox.

    Each user-declared output file (self.output_file followed by
    self.output_file_sandbox) is tagged with the 1-based job number
    through numberFile_.
    """
    declared = self.output_file+self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1051    
def prepareSteeringCards(self):
    """
    Hook for initial changes to the user's steering card file.

    Nothing is done for this job type; the method returns None.
    """
    return None
1057    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every entry of self.output_file the fragment checks that the
    file exists, renames it to its job-numbered form (see numberFile_)
    and links it under $RUNTIME_AREA; where the file ends up depends on
    self.copy_data.  A missing file sets job_exit_code=60302.  The
    numbered names are exported to the script as $file_list and the
    fragment ends by changing to $RUNTIME_AREA.
    """
    txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    txt += 'ls \n'
    txt += '\n'

    # Numbered names are collected here and reused for $file_list.
    file_list = []
    for fileWithSuffix in self.output_file:
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        file_list.append(output_file_num)
        txt += '\n'
        txt += '# check output file\n'
        txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
        if (self.copy_data == 1): # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            txt += ' mv '+fileWithSuffix+' '+output_file_num+'\n'
            txt += ' ln -s `pwd`/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        else:
            txt += ' mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        txt += 'else\n'
        txt += ' job_exit_code=60302\n'
        txt += ' echo "WARNING: Output file '+fileWithSuffix+' not found"\n'
        if common.scheduler.name().upper() == 'CONDOR_G':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'fi\n'

    txt += 'file_list="'+' '.join(file_list)+'"\n'
    txt += '\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    txt += 'ls \n'
    txt += '\n'
    txt += 'cd $RUNTIME_AREA\n'
    txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
    return txt
1102    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of a file name.

    'out.root' with txt '3' gives 'out_3.root'; a name without any
    extension gets the suffix appended ('out' -> 'out_3').  Only a dot
    in the final path component counts as an extension separator, so
    'dir.v2/out' becomes 'dir.v2/out_3'.
    """
    dot = file.rfind('.')
    # A dot that precedes the last '/' belongs to a directory name.
    if dot < 0 or dot < file.rfind('/'):
        return file + '_' + txt
    return file[:dot] + '_' + txt + file[dot:]
1120    
def getRequirements(self, nj=[]):
    """
    Return the job requirements expression to add to jdl files.

    The expression is the ' && '-joined list of: a software tag for
    self.version (when set), a software tag for self.executable_arch
    (when set), the outbound-IP requirement, and - for the "glitecoll"
    scheduler only - the production-status requirement.  The nj
    argument is not used.
    """
    # NOTE(review): the version and architecture tests are taken to be
    # independent; the listing this was rebuilt from lost its nesting.
    sw_tag_tail = '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version + sw_tag_tail)
    ## SL add requirement for OS version only if SL4
    if self.executable_arch:
        clauses.append('Member("VO-cms-' + self.executable_arch + sw_tag_tail)

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    if common.scheduler.name() == "glitecoll":
        clauses.append('other.GlueCEStateStatus == "Production" ')

    # Joining avoids a dangling leading ' && ' when no version is set.
    return ' && '.join(clauses)
1142 gutsche 1.3
def configFilename(self):
    """ return the config filename: the job type name plus '.cfg' """
    base = self.name()
    return base+'.cfg'
1146    
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of a job script which prepares the execution
    environment on OSG and which is common for all CMS jobs.

    The fragment exports SCRAM_ARCH from self.executable_arch and
    sources $OSG_APP/cmssoft/cms/cmsset_default.sh for self.version,
    exiting through func_exit when that file is missing.
    """
    arch = self.executable_arch
    fragment = [
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n',
        ' else\n',
        ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    ]
    return ''.join(fragment)
1170 ewv 1.131
1171 gutsche 1.3 ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of a job script which prepares the execution
    environment on LCG and which is common for all CMS jobs.

    The fragment exports SCRAM_ARCH and BUILD_ARCH from
    self.executable_arch and sources $VO_CMS_SW_DIR/cmsset_default.sh,
    exiting through func_exit when the software dir or the file is
    missing or when sourcing fails.
    """
    arch = self.executable_arch
    fragment = [
        '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n',
        ' echo ">>> setup CMS LCG environment:"\n',
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' export BUILD_ARCH='+arch+'\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n',
        ' job_exit_code=10031\n',
        ' func_exit\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' job_exit_code=10032\n',
        ' func_exit\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "==> setup cms environment ok"\n',
    ]
    return ''.join(fragment)
1205 gutsche 1.5
1206 ewv 1.131 ### FEDE FOR DBS OUTPUT PUBLICATION
def modifyReport(self, nj):
    """
    Return the part of the script that modifies the FrameworkJob Report.

    Unless the 'USER.publish_data' configuration value is 1 only the
    header comment is returned.  Otherwise the fragment sets FOR_LFN
    (from LFNBase of 'USER.publish_data_name', or /copy_problems/ when
    the copy failed), runs ModifyJobReport.py on the job report and
    records exit code 70500 when that script fails.
    """
    header = '\n#Written by cms_cmssw::modifyReport\n'
    if int(self.cfg_params.get('USER.publish_data',0)) != 1:
        return header

    processedDataset = self.cfg_params['USER.publish_data_name']
    lfn_base = LFNBase(processedDataset)

    script = '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py'
    command = script + ' $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'

    fragment = [
        header,
        'if [ $copy_exit_status -eq 0 ]; then\n',
        ' FOR_LFN=%s_${PSETHASH}/\n'%(lfn_base),
        'else\n',
        ' FOR_LFN=/copy_problems/ \n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        'fi\n',
        'echo ">>> Modify Job Report:" \n',
        'chmod a+x ' + script + '\n',
        'ProcessedDataset='+processedDataset+'\n',
        'echo "ProcessedDataset = $ProcessedDataset"\n',
        'echo "SE = $SE"\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'echo "FOR_LFN = $FOR_LFN" \n',
        'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n',
        'echo "' + command + '"\n',
        command + '\n',
        'modifyReport_result=$?\n',
        'if [ $modifyReport_result -ne 0 ]; then\n',
        ' modifyReport_result=70500\n',
        ' job_exit_code=$modifyReport_result\n',
        ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "WARNING: Problem with ModifyJobReport"\n',
        'else\n',
        ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        'fi\n',
    ]
    return ''.join(fragment)
1246 fanzago 1.99
def setParam_(self, param, value):
    """
    Store value under the key param in the job type parameter dict.
    """
    self._params.update({param: value})
1249    
def getParams(self):
    """
    Return the job type parameter dict itself (not a copy).
    """
    params = self._params
    return params
1252 gutsche 1.8
def uniquelist(self, old):
    """
    Remove duplicates from a list.

    Returns a new list holding each element of old once, in order of
    first appearance.  Elements must be hashable.
    """
    seen = {}
    unique = []
    for e in old:
        if e not in seen:
            seen[e] = 0
            unique.append(e)
    return unique
1261 mcinquil 1.121
def outList(self):
    """
    Return the script fragment listing the files expected in the output sandbox.

    The list holds the job-numbered form (see numberFile_) of
    self.output_file_sandbox - preceded by self.output_file when
    self.return_data is 1 - followed by the job's stdout and stderr
    names.  The fragment echoes the list and exports it as
    $filesToCheck.
    """
    if self.return_data == 1:
        declared = self.output_file+self.output_file_sandbox
    else:
        declared = self.output_file_sandbox

    listOutFiles = [self.numberFile_(name, '$NJob') for name in declared]
    listOutFiles.append('CMSSW_$NJob.stdout')
    listOutFiles.append('CMSSW_$NJob.stderr')
    joined = ' '.join(listOutFiles)

    txt = ''
    txt += 'echo ">>> list of expected files on output sandbox"\n'
    txt += 'echo "output files: '+joined+'"\n'
    txt += 'filesToCheck="'+joined+'"\n'
    txt += 'export filesToCheck\n'
    return txt