ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.92
Committed: Tue Jun 19 16:21:49 2007 UTC (17 years, 10 months ago) by gutsche
Content type: text/x-python
Branch: MAIN
Changes since 1.91: +47 -23 lines
Log Message:
Changed job creation and output to be independent of black and white lists.

All jobs are created after data discovery and job splitting, using black and
white lists independently of the entries in the destination list of jobs.
If a job does not have any valid destination sites, it is nonetheless created.

After job splitting, a screen summary lists all blocks, the jobs belonging to each block,
and the sites available for each block.

The user can submit a chosen number (x) of jobs; in addition to the normal job submission,
CRAB skips all jobs that have an empty destination list.

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6     import Scram
7    
8 slacapra 1.70 import os, string, re, shutil, glob
9 slacapra 1.1
10     class Cmssw(JobType):
11 gutsche 1.38 def __init__(self, cfg_params, ncjobs):
12 slacapra 1.1 JobType.__init__(self, 'CMSSW')
13     common.logger.debug(3,'CMSSW::__init__')
14    
15 gutsche 1.3 # Marco.
16     self._params = {}
17     self.cfg_params = cfg_params
18 gutsche 1.38
19 gutsche 1.72 try:
20     self.MaxTarBallSize = float(self.cfg_params['EDG.maxtarballsize'])
21     except KeyError:
22 slacapra 1.86 self.MaxTarBallSize = 9.5
23 gutsche 1.72
24 gutsche 1.44 # number of jobs requested to be created, limit obj splitting
25 gutsche 1.38 self.ncjobs = ncjobs
26    
27 slacapra 1.1 log = common.logger
28    
29     self.scram = Scram.Scram(cfg_params)
30     self.additional_inbox_files = []
31     self.scriptExe = ''
32     self.executable = ''
33 slacapra 1.71 self.executable_arch = self.scram.getArch()
34 slacapra 1.1 self.tgz_name = 'default.tgz'
35 corvo 1.56 self.scriptName = 'CMSSW.sh'
36 spiga 1.42 self.pset = '' #scrip use case Da
37     self.datasetPath = '' #scrip use case Da
38 gutsche 1.3
39 gutsche 1.50 # set FJR file name
40     self.fjrFileName = 'crab_fjr.xml'
41    
42 slacapra 1.1 self.version = self.scram.getSWVersion()
43 slacapra 1.55 common.taskDB.setDict('codeVersion',self.version)
44 gutsche 1.5 self.setParam_('application', self.version)
45 slacapra 1.47
46 slacapra 1.1 ### collect Data cards
47 gutsche 1.66
48     ## get DBS mode
49     try:
50 slacapra 1.86 self.use_dbs_1 = int(self.cfg_params['CMSSW.use_dbs_1'])
51 gutsche 1.66 except KeyError:
52 slacapra 1.86 self.use_dbs_1 = 0
53 gutsche 1.66
54 slacapra 1.1 try:
55 slacapra 1.9 tmp = cfg_params['CMSSW.datasetpath']
56     log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
57     if string.lower(tmp)=='none':
58     self.datasetPath = None
59 slacapra 1.21 self.selectNoInput = 1
60 slacapra 1.9 else:
61     self.datasetPath = tmp
62 slacapra 1.21 self.selectNoInput = 0
63 slacapra 1.1 except KeyError:
64 gutsche 1.3 msg = "Error: datasetpath not defined "
65 slacapra 1.1 raise CrabException(msg)
66 gutsche 1.5
67     # ML monitoring
68     # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
69 slacapra 1.9 if not self.datasetPath:
70     self.setParam_('dataset', 'None')
71     self.setParam_('owner', 'None')
72     else:
73 gutsche 1.92 try:
74     datasetpath_split = self.datasetPath.split("/")
75     # standard style
76     if self.use_dbs_1 == 1 :
77     self.setParam_('dataset', datasetpath_split[1])
78     self.setParam_('owner', datasetpath_split[-1])
79     else:
80     self.setParam_('dataset', datasetpath_split[1])
81     self.setParam_('owner', datasetpath_split[2])
82     except:
83     self.setParam_('dataset', self.datasetPath)
84     self.setParam_('owner', self.datasetPath)
85    
86 gutsche 1.8 self.setTaskid_()
87     self.setParam_('taskId', self.cfg_params['taskId'])
88 gutsche 1.5
89 slacapra 1.1 self.dataTiers = []
90    
91     ## now the application
92     try:
93     self.executable = cfg_params['CMSSW.executable']
94 gutsche 1.5 self.setParam_('exe', self.executable)
95 slacapra 1.1 log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
96     msg = "Default executable cmsRun overridden. Switch to " + self.executable
97     log.debug(3,msg)
98     except KeyError:
99     self.executable = 'cmsRun'
100 gutsche 1.5 self.setParam_('exe', self.executable)
101 slacapra 1.1 msg = "User executable not defined. Use cmsRun"
102     log.debug(3,msg)
103     pass
104    
105     try:
106     self.pset = cfg_params['CMSSW.pset']
107     log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
108 spiga 1.42 if self.pset.lower() != 'none' :
109     if (not os.path.exists(self.pset)):
110     raise CrabException("User defined PSet file "+self.pset+" does not exist")
111     else:
112     self.pset = None
113 slacapra 1.1 except KeyError:
114     raise CrabException("PSet file missing. Cannot run cmsRun ")
115    
116     # output files
117 slacapra 1.53 ## stuff which must be returned always via sandbox
118     self.output_file_sandbox = []
119    
120     # add fjr report by default via sandbox
121     self.output_file_sandbox.append(self.fjrFileName)
122    
123     # other output files to be returned via sandbox or copied to SE
124 slacapra 1.1 try:
125     self.output_file = []
126     tmp = cfg_params['CMSSW.output_file']
127     if tmp != '':
128     tmpOutFiles = string.split(cfg_params['CMSSW.output_file'],',')
129     log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
130     for tmp in tmpOutFiles:
131     tmp=string.strip(tmp)
132     self.output_file.append(tmp)
133     pass
134     else:
135 gutsche 1.92 log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")
136 slacapra 1.1 pass
137     pass
138     except KeyError:
139 gutsche 1.92 log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")
140 slacapra 1.1 pass
141    
142     # script_exe file as additional file in inputSandbox
143     try:
144 slacapra 1.10 self.scriptExe = cfg_params['USER.script_exe']
145     if self.scriptExe != '':
146     if not os.path.isfile(self.scriptExe):
147 slacapra 1.64 msg ="ERROR. file "+self.scriptExe+" not found"
148 slacapra 1.10 raise CrabException(msg)
149 spiga 1.42 self.additional_inbox_files.append(string.strip(self.scriptExe))
150 slacapra 1.1 except KeyError:
151 spiga 1.42 self.scriptExe = ''
152 slacapra 1.70
153 spiga 1.42 #CarlosDaniele
154     if self.datasetPath == None and self.pset == None and self.scriptExe == '' :
155 slacapra 1.70 msg ="Error. script_exe not defined"
156 spiga 1.42 raise CrabException(msg)
157    
158 slacapra 1.1 ## additional input files
159     try:
160 slacapra 1.29 tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
161 slacapra 1.70 for tmp in tmpAddFiles:
162     tmp = string.strip(tmp)
163     dirname = ''
164     if not tmp[0]=="/": dirname = "."
165 corvo 1.85 files = []
166     if string.find(tmp,"*")>-1:
167     files = glob.glob(os.path.join(dirname, tmp))
168     if len(files)==0:
169     raise CrabException("No additional input file found with this pattern: "+tmp)
170     else:
171     files.append(tmp)
172 slacapra 1.70 for file in files:
173     if not os.path.exists(file):
174     raise CrabException("Additional input file not found: "+file)
175 slacapra 1.45 pass
176 corvo 1.85 fname = string.split(file, '/')[-1]
177     storedFile = common.work_space.pathForTgz()+'share/'+fname
178 slacapra 1.70 shutil.copyfile(file, storedFile)
179     self.additional_inbox_files.append(string.strip(storedFile))
180 slacapra 1.1 pass
181     pass
182 slacapra 1.70 common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
183 slacapra 1.1 except KeyError:
184     pass
185    
186 slacapra 1.9 # files per job
187 slacapra 1.1 try:
188 gutsche 1.35 if (cfg_params['CMSSW.files_per_jobs']):
189     raise CrabException("files_per_jobs no longer supported. Quitting.")
190 gutsche 1.3 except KeyError:
191 gutsche 1.35 pass
192 gutsche 1.3
193 slacapra 1.9 ## Events per job
194 gutsche 1.3 try:
195 slacapra 1.10 self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
196 slacapra 1.9 self.selectEventsPerJob = 1
197 gutsche 1.3 except KeyError:
198 slacapra 1.9 self.eventsPerJob = -1
199     self.selectEventsPerJob = 0
200    
201 slacapra 1.22 ## number of jobs
202     try:
203     self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
204     self.selectNumberOfJobs = 1
205     except KeyError:
206     self.theNumberOfJobs = 0
207     self.selectNumberOfJobs = 0
208 slacapra 1.10
209 gutsche 1.35 try:
210     self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
211     self.selectTotalNumberEvents = 1
212     except KeyError:
213     self.total_number_of_events = 0
214     self.selectTotalNumberEvents = 0
215    
216 spiga 1.42 if self.pset != None: #CarlosDaniele
217     if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
218     msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
219     raise CrabException(msg)
220     else:
221     if (self.selectNumberOfJobs == 0):
222     msg = 'Must specify number_of_jobs.'
223     raise CrabException(msg)
224 gutsche 1.35
225 slacapra 1.22 ## source seed for pythia
226     try:
227     self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
228     except KeyError:
229 slacapra 1.23 self.sourceSeed = None
230     common.logger.debug(5,"No seed given")
231 slacapra 1.22
232 slacapra 1.28 try:
233     self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
234     except KeyError:
235     self.sourceSeedVtx = None
236     common.logger.debug(5,"No vertex seed given")
237 slacapra 1.90
238     try:
239     self.sourceSeedG4 = int(cfg_params['CMSSW.g4_seed'])
240     except KeyError:
241     self.sourceSeedG4 = None
242     common.logger.debug(5,"No g4 sim hits seed given")
243    
244     try:
245     self.sourceSeedMix = int(cfg_params['CMSSW.mix_seed'])
246     except KeyError:
247     self.sourceSeedMix = None
248     common.logger.debug(5,"No mix seed given")
249    
250 spiga 1.57 try:
251     self.firstRun = int(cfg_params['CMSSW.first_run'])
252     except KeyError:
253     self.firstRun = None
254     common.logger.debug(5,"No first run given")
255 spiga 1.42 if self.pset != None: #CarlosDaniele
256 slacapra 1.90 import PsetManipulator
257     PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset
258 gutsche 1.3
259 slacapra 1.1 #DBSDLS-start
260     ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
261     self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
262     self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
263 gutsche 1.35 self.jobDestination=[] # Site destination(s) for each job (list of lists)
264 slacapra 1.1 ## Perform the data location and discovery (based on DBS/DLS)
265 slacapra 1.9 ## SL: Don't if NONE is specified as input (pythia use case)
266 gutsche 1.35 blockSites = {}
267 slacapra 1.9 if self.datasetPath:
268 gutsche 1.35 blockSites = self.DataDiscoveryAndLocation(cfg_params)
269 slacapra 1.1 #DBSDLS-end
270    
271     self.tgzNameWithPath = self.getTarBall(self.executable)
272 slacapra 1.10
273 slacapra 1.9 ## Select Splitting
274 spiga 1.42 if self.selectNoInput:
275     if self.pset == None: #CarlosDaniele
276     self.jobSplittingForScript()
277     else:
278     self.jobSplittingNoInput()
279 gutsche 1.92 else:
280 corvo 1.56 self.jobSplittingByBlocks(blockSites)
281 gutsche 1.5
282 slacapra 1.22 # modify Pset
283 spiga 1.42 if self.pset != None: #CarlosDaniele
284 slacapra 1.86 try:
285     if (self.datasetPath): # standard job
286     # allow to processa a fraction of events in a file
287 slacapra 1.90 PsetEdit.inputModule("INPUT")
288     PsetEdit.maxEvent("INPUTMAXEVENTS")
289     PsetEdit.skipEvent("INPUTSKIPEVENTS")
290 slacapra 1.86 else: # pythia like job
291 slacapra 1.90 PsetEdit.maxEvent(self.eventsPerJob)
292 slacapra 1.86 if (self.firstRun):
293 slacapra 1.90 PsetEdit.pythiaFirstRun("INPUTFIRSTRUN") #First Run
294 slacapra 1.86 if (self.sourceSeed) :
295 slacapra 1.90 PsetEdit.pythiaSeed("INPUT")
296 slacapra 1.86 if (self.sourceSeedVtx) :
297 slacapra 1.90 PsetEdit.vtxSeed("INPUTVTX")
298     if (self.sourceSeedG4) :
299     self.PsetEdit.g4Seed("INPUTG4")
300     if (self.sourceSeedMix) :
301     self.PsetEdit.mixSeed("INPUTMIX")
302 slacapra 1.86 # add FrameworkJobReport to parameter-set
303 slacapra 1.90 PsetEdit.addCrabFJR(self.fjrFileName)
304     PsetEdit.psetWriter(self.configFilename())
305 slacapra 1.86 except:
306     msg='Error while manipuliating ParameterSet: exiting...'
307     raise CrabException(msg)
308 gutsche 1.3
309 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
310    
311 slacapra 1.86 import DataDiscovery
312     import DataDiscovery_DBS2
313     import DataLocation
314 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
315    
316     datasetPath=self.datasetPath
317    
318 slacapra 1.1 ## Contact the DBS
319 gutsche 1.92 common.logger.message("Contacting Data Discovery Services ...")
320 slacapra 1.1 try:
321 gutsche 1.66
322 slacapra 1.86 if self.use_dbs_1 == 1 :
323     self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
324     else :
325 corvo 1.85 self.pubdata=DataDiscovery_DBS2.DataDiscovery_DBS2(datasetPath, cfg_params)
326 slacapra 1.1 self.pubdata.fetchDBSInfo()
327    
328 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
329 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
330     raise CrabException(msg)
331 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
332 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
333     raise CrabException(msg)
334 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
335 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
336 slacapra 1.1 raise CrabException(msg)
337 gutsche 1.67 except DataDiscovery_DBS2.NotExistingDatasetError_DBS2, ex :
338     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
339     raise CrabException(msg)
340     except DataDiscovery_DBS2.NoDataTierinProvenanceError_DBS2, ex :
341     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
342     raise CrabException(msg)
343     except DataDiscovery_DBS2.DataDiscoveryError_DBS2, ex:
344     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
345     raise CrabException(msg)
346 slacapra 1.1
347 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
348 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
349     self.eventsbyfile=self.pubdata.getEventsPerFile()
350 gutsche 1.3
351 slacapra 1.1 ## get max number of events
352     self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
353    
354     ## Contact the DLS and build a list of sites hosting the fileblocks
355     try:
356 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
357 gutsche 1.6 dataloc.fetchDLSInfo()
358 slacapra 1.41 except DataLocation.DataLocationError , ex:
359 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
360     raise CrabException(msg)
361    
362    
363 gutsche 1.35 sites = dataloc.getSites()
364     allSites = []
365     listSites = sites.values()
366 slacapra 1.63 for listSite in listSites:
367     for oneSite in listSite:
368 gutsche 1.35 allSites.append(oneSite)
369     allSites = self.uniquelist(allSites)
370 gutsche 1.3
371 gutsche 1.92 # screen output
372     common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
373    
374 gutsche 1.35 return sites
375 gutsche 1.3
376 gutsche 1.35 def jobSplittingByBlocks(self, blockSites):
377 slacapra 1.9 """
378 gutsche 1.35 Perform job splitting. Jobs run over an integer number of files
379     and no more than one block.
380     ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
381     REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
382     self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
383     self.maxEvents, self.filesbyblock
384     SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
385     self.total_number_of_jobs - Total # of jobs
386     self.list_of_args - File(s) job will run on (a list of lists)
387     """
388    
389     # ---- Handle the possible job splitting configurations ---- #
390     if (self.selectTotalNumberEvents):
391     totalEventsRequested = self.total_number_of_events
392     if (self.selectEventsPerJob):
393     eventsPerJobRequested = self.eventsPerJob
394     if (self.selectNumberOfJobs):
395     totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
396    
397     # If user requested all the events in the dataset
398     if (totalEventsRequested == -1):
399     eventsRemaining=self.maxEvents
400     # If user requested more events than are in the dataset
401     elif (totalEventsRequested > self.maxEvents):
402     eventsRemaining = self.maxEvents
403     common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
404     # If user requested less events than are in the dataset
405     else:
406     eventsRemaining = totalEventsRequested
407 slacapra 1.22
408 slacapra 1.41 # If user requested more events per job than are in the dataset
409     if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
410     eventsPerJobRequested = self.maxEvents
411    
412 gutsche 1.35 # For user info at end
413     totalEventCount = 0
414 gutsche 1.3
415 gutsche 1.35 if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
416     eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
417 slacapra 1.22
418 gutsche 1.35 if (self.selectNumberOfJobs):
419     common.logger.message("May not create the exact number_of_jobs requested.")
420 slacapra 1.23
421 gutsche 1.38 if ( self.ncjobs == 'all' ) :
422     totalNumberOfJobs = 999999999
423     else :
424     totalNumberOfJobs = self.ncjobs
425    
426    
427 gutsche 1.35 blocks = blockSites.keys()
428     blockCount = 0
429     # Backup variable in case self.maxEvents counted events in a non-included block
430     numBlocksInDataset = len(blocks)
431 gutsche 1.3
432 gutsche 1.35 jobCount = 0
433     list_of_lists = []
434 gutsche 1.3
435 gutsche 1.92 # list tracking which jobs are in which jobs belong to which block
436     jobsOfBlock = {}
437    
438 gutsche 1.35 # ---- Iterate over the blocks in the dataset until ---- #
439     # ---- we've met the requested total # of events ---- #
440 gutsche 1.38 while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
441 gutsche 1.35 block = blocks[blockCount]
442 gutsche 1.44 blockCount += 1
443    
444 gutsche 1.68 if self.eventsbyblock.has_key(block) :
445     numEventsInBlock = self.eventsbyblock[block]
446     common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
447 slacapra 1.9
448 gutsche 1.68 files = self.filesbyblock[block]
449     numFilesInBlock = len(files)
450     if (numFilesInBlock <= 0):
451     continue
452     fileCount = 0
453    
454     # ---- New block => New job ---- #
455     parString = "\\{"
456     # counter for number of events in files currently worked on
457     filesEventCount = 0
458     # flag if next while loop should touch new file
459     newFile = 1
460     # job event counter
461     jobSkipEventCount = 0
462 slacapra 1.9
463 gutsche 1.68 # ---- Iterate over the files in the block until we've met the requested ---- #
464     # ---- total # of events or we've gone over all the files in this block ---- #
465     while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
466     file = files[fileCount]
467     if newFile :
468     try:
469     numEventsInFile = self.eventsbyfile[file]
470     common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
471     # increase filesEventCount
472     filesEventCount += numEventsInFile
473     # Add file to current job
474     parString += '\\\"' + file + '\\\"\,'
475     newFile = 0
476     except KeyError:
477     common.logger.message("File "+str(file)+" has unknown number of events: skipping")
478 slacapra 1.41
479 gutsche 1.38
480 gutsche 1.68 # if less events in file remain than eventsPerJobRequested
481     if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
482     # if last file in block
483     if ( fileCount == numFilesInBlock-1 ) :
484     # end job using last file, use remaining events in block
485     # close job and touch new file
486     fullString = parString[:-2]
487     fullString += '\\}'
488     list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
489     common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
490     self.jobDestination.append(blockSites[block])
491     common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
492 gutsche 1.92 # fill jobs of block dictionary
493     if block in jobsOfBlock.keys() :
494     jobsOfBlock[block].append(jobCount+1)
495     else:
496     jobsOfBlock[block] = [jobCount+1]
497 gutsche 1.68 # reset counter
498     jobCount = jobCount + 1
499     totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
500     eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
501     jobSkipEventCount = 0
502     # reset file
503     parString = "\\{"
504     filesEventCount = 0
505     newFile = 1
506     fileCount += 1
507     else :
508     # go to next file
509     newFile = 1
510     fileCount += 1
511     # if events in file equal to eventsPerJobRequested
512     elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
513 gutsche 1.38 # close job and touch new file
514     fullString = parString[:-2]
515     fullString += '\\}'
516 gutsche 1.68 list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
517     common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
518 gutsche 1.38 self.jobDestination.append(blockSites[block])
519     common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
520 gutsche 1.92 if block in jobsOfBlock.keys() :
521     jobsOfBlock[block].append(jobCount+1)
522     else:
523     jobsOfBlock[block] = [jobCount+1]
524 gutsche 1.38 # reset counter
525     jobCount = jobCount + 1
526 gutsche 1.68 totalEventCount = totalEventCount + eventsPerJobRequested
527     eventsRemaining = eventsRemaining - eventsPerJobRequested
528 gutsche 1.38 jobSkipEventCount = 0
529     # reset file
530     parString = "\\{"
531     filesEventCount = 0
532     newFile = 1
533     fileCount += 1
534 gutsche 1.68
535     # if more events in file remain than eventsPerJobRequested
536 gutsche 1.38 else :
537 gutsche 1.68 # close job but don't touch new file
538     fullString = parString[:-2]
539     fullString += '\\}'
540     list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
541     common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
542     self.jobDestination.append(blockSites[block])
543     common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
544 gutsche 1.92 if block in jobsOfBlock.keys() :
545     jobsOfBlock[block].append(jobCount+1)
546     else:
547     jobsOfBlock[block] = [jobCount+1]
548 gutsche 1.68 # increase counter
549     jobCount = jobCount + 1
550     totalEventCount = totalEventCount + eventsPerJobRequested
551     eventsRemaining = eventsRemaining - eventsPerJobRequested
552     # calculate skip events for last file
553     # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
554     jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
555     # remove all but the last file
556     filesEventCount = self.eventsbyfile[file]
557     parString = "\\{"
558     parString += '\\\"' + file + '\\\"\,'
559     pass # END if
560     pass # END while (iterate over files in the block)
561 gutsche 1.35 pass # END while (iterate over blocks in the dataset)
562 slacapra 1.41 self.ncjobs = self.total_number_of_jobs = jobCount
563 gutsche 1.38 if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
564 gutsche 1.35 common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
565 gutsche 1.92 common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
566 slacapra 1.22
567 gutsche 1.92 # screen output
568     screenOutput = "List of jobs and available destination sites:\n\n"
569    
570     blockCounter = 0
571     for block in jobsOfBlock.keys():
572     blockCounter += 1
573     screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),','.join(blockSites[block]))
574    
575     common.logger.message(screenOutput)
576    
577 slacapra 1.9 self.list_of_args = list_of_lists
578     return
579    
580 slacapra 1.21 def jobSplittingNoInput(self):
581 slacapra 1.9 """
582     Perform job splitting based on number of event per job
583     """
584     common.logger.debug(5,'Splitting per events')
585     common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
586 slacapra 1.22 common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
587 slacapra 1.9 common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
588    
589 slacapra 1.10 if (self.total_number_of_events < 0):
590     msg='Cannot split jobs per Events with "-1" as total number of events'
591     raise CrabException(msg)
592    
593 slacapra 1.22 if (self.selectEventsPerJob):
594 spiga 1.65 if (self.selectTotalNumberEvents):
595     self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
596     elif(self.selectNumberOfJobs) :
597     self.total_number_of_jobs =self.theNumberOfJobs
598     self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)
599    
600 slacapra 1.22 elif (self.selectNumberOfJobs) :
601     self.total_number_of_jobs = self.theNumberOfJobs
602     self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
603 spiga 1.65
604 slacapra 1.9 common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))
605    
606     # is there any remainder?
607     check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
608    
609     common.logger.debug(5,'Check '+str(check))
610    
611 gutsche 1.35 common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
612 slacapra 1.9 if check > 0:
613 gutsche 1.35 common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
614 slacapra 1.9
615 slacapra 1.10 # argument is seed number.$i
616 slacapra 1.9 self.list_of_args = []
617     for i in range(self.total_number_of_jobs):
618 gutsche 1.35 ## Since there is no input, any site is good
619 slacapra 1.86 # self.jobDestination.append(["Any"])
620 spiga 1.42 self.jobDestination.append([""]) #must be empty to write correctly the xml
621 slacapra 1.90 args=[]
622 spiga 1.57 if (self.firstRun):
623     ## pythia first run
624 slacapra 1.86 #self.list_of_args.append([(str(self.firstRun)+str(i))])
625 slacapra 1.90 args.append(str(self.firstRun)+str(i))
626 spiga 1.57 else:
627     ## no first run
628 slacapra 1.86 #self.list_of_args.append([str(i)])
629 slacapra 1.90 args.append(str(i))
630 slacapra 1.23 if (self.sourceSeed):
631 slacapra 1.90 args.append(str(self.sourceSeed)+str(i))
632 slacapra 1.28 if (self.sourceSeedVtx):
633 slacapra 1.90 ## + vtx random seed
634     args.append(str(self.sourceSeedVtx)+str(i))
635     if (self.sourceSeedG4):
636     ## + G4 random seed
637     args.append(str(self.sourceSeedG4)+str(i))
638     if (self.sourceSeedMix):
639     ## + Mix random seed
640     args.append(str(self.sourceSeedMix)+str(i))
641     pass
642     pass
643     self.list_of_args.append(args)
644     pass
645 slacapra 1.86
646 slacapra 1.90 # print self.list_of_args
647 gutsche 1.3
648     return
649    
650 spiga 1.42
651     def jobSplittingForScript(self):#CarlosDaniele
652     """
653     Perform job splitting based on number of job
654     """
655     common.logger.debug(5,'Splitting per job')
656     common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
657    
658     self.total_number_of_jobs = self.theNumberOfJobs
659    
660     common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))
661    
662     common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
663    
664     # argument is seed number.$i
665     self.list_of_args = []
666     for i in range(self.total_number_of_jobs):
667     ## Since there is no input, any site is good
668     # self.jobDestination.append(["Any"])
669     self.jobDestination.append([""])
670     ## no random seed
671     self.list_of_args.append([str(i)])
672     return
673    
674 gutsche 1.3 def split(self, jobParams):
675    
676     common.jobDB.load()
677     #### Fabio
678     njobs = self.total_number_of_jobs
679 slacapra 1.9 arglist = self.list_of_args
680 gutsche 1.3 # create the empty structure
681     for i in range(njobs):
682     jobParams.append("")
683    
684     for job in range(njobs):
685 slacapra 1.17 jobParams[job] = arglist[job]
686     # print str(arglist[job])
687     # print jobParams[job]
688 gutsche 1.3 common.jobDB.setArguments(job, jobParams[job])
689 gutsche 1.35 common.logger.debug(5,"Job "+str(job)+" Destination: "+str(self.jobDestination[job]))
690     common.jobDB.setDestination(job, self.jobDestination[job])
691 gutsche 1.3
692     common.jobDB.save()
693     return
694    
695     def getJobTypeArguments(self, nj, sched):
696 slacapra 1.17 result = ''
697     for i in common.jobDB.arguments(nj):
698     result=result+str(i)+" "
699     return result
700 gutsche 1.3
701     def numberOfJobs(self):
702     # Fabio
703     return self.total_number_of_jobs
704    
705 slacapra 1.1 def getTarBall(self, exe):
706     """
707     Return the TarBall with lib and exe
708     """
709    
710     # if it exist, just return it
711 corvo 1.56 #
712     # Marco. Let's start to use relative path for Boss XML files
713     #
714     self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
715 slacapra 1.1 if os.path.exists(self.tgzNameWithPath):
716     return self.tgzNameWithPath
717    
718     # Prepare a tar gzipped file with user binaries.
719     self.buildTar_(exe)
720    
721     return string.strip(self.tgzNameWithPath)
722    
def buildTar_(self, executable):
    """
    Build the input-sandbox tar-balls of the job.

    self.tgzNameWithPath receives, from the user SCRAM area:
      - the executable, when self.executable is set and it lies outside the release;
      - lib/ and module/;
      - every data/ directory found below the area;
      - $CRABDIR/ProdAgentApi.
    The tar-ball is then checked against self.MaxTarBallSize (MB). Finally
    share/MLfiles.tgz is written with the monitoring helper scripts taken
    from $CRABDIR/python.

    Nothing is built when the working area is the release top.

    Raises CrabException when the executable cannot be found, a tar-ball
    cannot be written, or the sandbox exceeds the size limit.
    """
    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    import tarfile
    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if self.executable != '':
                exeWithPath = self.scram.findFile_(executable)
                if not exeWithPath:
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # The exe is private, so we must ship it. It is added from its
                    # full path whether it lives in the user project area or
                    # somewhere else; rebuilding it from swArea+'/' produced a
                    # wrong path whenever that prefix occurred mid-path.
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    tar.add(exeWithPath, os.path.basename(executable))
                # else: the exe is from the release, it is found on the WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea + '/' + libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib, libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module, moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen = len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data", root[swAreaLen:]+"/data")

            ## Add ProdAgent dir to tar
            paDir = 'ProdAgentApi'
            pa = os.environ['CRABDIR'] + '/' + paDir
            if os.path.isdir(pa):
                tar.add(pa, paDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # Release the file handle even when adding a file failed.
            tar.close()
    except CrabException:
        # Keep specific messages such as "User executable ... not found".
        raise
    except Exception:
        raise CrabException('Could not create tar-ball')

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if tarballinfo.st_size > self.MaxTarBallSize*1024*1024:
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path = os.environ['CRABDIR'] + '/python/'
            for mlFile in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
                tar.add(path+mlFile, mlFile)
            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except Exception:
        raise CrabException('Could not create ML files tar-ball')

    return
811    
def wsSetupEnvironment(self, nj):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The fragment:
      - sets up the CMS environment (LCG or OSG; on OSG it first creates
        and enters a private working directory);
      - creates and enters the CMSSW project area for self.version;
      - checks that at least 2 wrapper arguments were given;
      - when a pset is used, substitutes the per-job values (input files
        and event ranges, or run number and random seeds) into the config
        and renames it to pset.cfg;
      - copies the additional input files into the working directory.
    Every failure branch reports a JobExitCode and exits 1.
    """
    # Prepare JobType-independent part
    txt = ''

    ## OLI_Daniele at this level middleware already known

    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += 'echo "Setting SCRAM_ARCH='+self.executable_arch+'"\n'
    txt += 'export SCRAM_ARCH='+self.executable_arch+'\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # Report the release actually requested (was a stale hard-coded CMSSW_0_6_1).
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.pset is not None: #CarlosDaniele
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if self.datasetPath: # standard job
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
        else: # pythia like job
            # Positional wrapper argument consumed by the next optional value.
            # \\< and \\> below are sed word boundaries; the backslash is
            # doubled so Python keeps it literally.
            seedIndex = 1
            if self.firstRun:
                txt += 'FirstRun=${args['+str(seedIndex)+']}\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'
                txt += 'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
                seedIndex = seedIndex + 1

            if self.sourceSeed:
                txt += 'Seed=${args['+str(seedIndex)+']}\n'
                txt += 'sed "s#\\<INPUT\\>#$Seed#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
                seedIndex = seedIndex + 1
                ## the following seeds are not always present
                if self.sourceSeedVtx:
                    txt += 'VtxSeed=${args['+str(seedIndex)+']}\n'
                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                    txt += 'sed "s#\\<INPUTVTX\\>#$VtxSeed#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
                    seedIndex += 1
                if self.sourceSeedG4:
                    txt += 'G4Seed=${args['+str(seedIndex)+']}\n'
                    txt += 'echo "G4Seed: <$G4Seed>"\n'
                    txt += 'sed "s#\\<INPUTG4\\>#$G4Seed#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
                    seedIndex += 1
                if self.sourceSeedMix:
                    txt += 'mixSeed=${args['+str(seedIndex)+']}\n'
                    txt += 'echo "MixSeed: <$mixSeed>"\n'
                    txt += 'sed "s#\\<INPUTMIX\\>#$mixSeed#" '+pset+' > tmp && mv -f tmp '+pset+'\n'
                    seedIndex += 1
        txt += 'mv -f '+pset+' pset.cfg\n'

    if len(self.additional_inbox_files) > 0:
        for inboxFile in self.additional_inbox_files:
            relFile = inboxFile.split("/")[-1]
            txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
            txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
            txt += ' chmod +x '+relFile+'\n'
            txt += 'fi\n'

    if self.pset is not None: #CarlosDaniele
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
982    
def wsBuildExe(self, nj=0):
    """
    Put in the script the commands to build an executable
    or a library.

    When the user tar-ball self.tgzNameWithPath exists locally, the
    returned shell fragment untars it from $RUNTIME_AREA and prepends
    ProdAgentApi to PYTHONPATH. If untarring fails, the job status is
    reported, the OSG working directory is removed, and the job exits 1.
    Without a tar-ball an empty string is returned. nj is not used.
    """
    txt = ""
    if not os.path.isfile(self.tgzNameWithPath):
        return txt

    tgzName = os.path.basename(self.tgzNameWithPath)
    txt += 'echo "tar xzvf $RUNTIME_AREA/'+tgzName+'"\n'
    txt += 'tar xzvf $RUNTIME_AREA/'+tgzName+'\n'
    txt += 'untar_status=$? \n'
    txt += 'if [ $untar_status -ne 0 ]; then \n'
    txt += ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n'
    txt += ' echo "JOB_EXIT_STATUS = $untar_status" \n'
    txt += ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n'
    # Publish the failure like every other error branch of the wrapper;
    # previously the status was dumped only if the OSG cleanup also failed.
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50999"\n'
    txt += ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' \n'
    txt += ' exit 1 \n'
    txt += 'else \n'
    txt += ' echo "Successful untar" \n'
    txt += 'fi \n'
    txt += '\n'
    txt += 'echo "Include ProdAgentApi in PYTHONPATH"\n'
    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=ProdAgentApi\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n'
    txt += 'fi\n'
    txt += '\n'

    return txt
1030    
def modifySteeringCards(self, nj):
    """
    Hook for modifying the user's card for job nj and writing the new
    card into the share dir. It performs no action for this job type and
    returns None.
    """
    return None
1036    
def executableName(self):
    """
    Return the command run by the job: "sh " when a user script
    (self.scriptExe) is configured, otherwise self.executable.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "
1042 slacapra 1.1
def executableArgs(self):
    """
    Return the arguments for the command from executableName(): the user
    script followed by " $NJob" when self.scriptExe is set, otherwise the
    option selecting pset.cfg.
    """
    if not self.scriptExe:
        return " -p pset.cfg"
    return self.scriptExe + " $NJob"
1048 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    The list holds, in order:
      - the user tar-ball and the ML tar-ball, each only if it exists on disk;
      - the job config file under <work space>job/, when a pset is used;
      - every additional input file.
    nj is not used.
    """
    inp_box = []
    for tarball in (self.tgzNameWithPath, self.MLtgzfile):
        if os.path.isfile(tarball):
            inp_box.append(tarball)
    if self.pset is not None:
        inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
    inp_box.extend(self.additional_inbox_files)
    return inp_box
1068    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Every declared output file (self.output_file followed by
    self.output_file_sandbox) is renamed with numberFile_ using the
    1-based job number nj + 1.
    """
    return [self.numberFile_(out, str(nj + 1))
            for out in self.output_file + self.output_file_sandbox]
1080    
def prepareSteeringCards(self):
    """
    Make initial modifications of the user's steering card file.
    No modifications are performed for this job type; returns None.
    """
    return None
1086    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For each declared output file (self.output_file then
    self.output_file_sandbox) the script:
      - lists it;
      - if missing, sets exit_status=60302 (and, for the condor_g
        scheduler on OSG, writes a dummy file into $RUNTIME_AREA);
      - otherwise copies it to $RUNTIME_AREA with '$NJob' inserted by
        numberFile_.
    The script then returns to $RUNTIME_AREA, removes the OSG working
    directory, and sets file_list to the renamed self.output_file entries.
    """
    parts = ['\n', '# directory content\n', 'ls \n']

    for fileWithSuffix in self.output_file + self.output_file_sandbox:
        numbered = self.numberFile_(fileWithSuffix, '$NJob')
        parts.append('\n')
        parts.append('# check output file\n')
        parts.append('ls ' + fileWithSuffix + '\n')
        parts.append('ls_result=$?\n')
        parts.append('if [ $ls_result -ne 0 ] ; then\n')
        parts.append(' exit_status=60302\n')
        parts.append(' echo "ERROR: Problem with output file"\n')
        if common.scheduler.boss_scheduler_name == 'condor_g':
            parts.extend([
                ' if [ $middleware == OSG ]; then \n',
                ' echo "prepare dummy output file"\n',
                ' echo "Processing of job output failed" > $RUNTIME_AREA/' + numbered + '\n',
                ' fi \n',
            ])
        parts.append('else\n')
        parts.append(' cp ' + fileWithSuffix + ' $RUNTIME_AREA/' + numbered + '\n')
        parts.append('fi\n')

    parts.append('cd $RUNTIME_AREA\n')
    parts.append('cd $RUNTIME_AREA\n')
    ### OLI_DANIELE
    parts.extend([
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n',
        ' echo "JOB_EXIT_STATUS = 60999"\n',
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])

    ## Only files that may be copied to the SE go into file_list
    seFiles = [self.numberFile_(f, '$NJob') for f in self.output_file]
    parts.append('file_list="' + ' '.join(seFiles) + '"\n')

    return ''.join(parts)
1145    
def numberFile_(self, file, txt):
    """
    append _'txt' before last extension of a file

    'out.root' -> 'out_<txt>.root'; a name without a dot simply gets
    '_<txt>' appended. Only the text after the last '.' is treated as the
    extension.
    """
    lastDot = file.rfind(".")
    if lastDot < 0:
        return file + '_' + txt
    return file[:lastDot] + '_' + txt + file[lastDot:]
1163    
def getRequirements(self, nj=None):
    """
    return job requirements to add to jdl files

    When self.version is set, the CE must list "VO-cms-<version>" in
    GlueHostApplicationSoftwareRunTimeEnvironment. Outbound IP
    connectivity (GlueHostNetworkAdapterOutboundIP) is always required.
    The terms are joined with '&&'. nj is not used; its default is None
    instead of a shared mutable list.
    """
    terms = []
    if self.version:
        terms.append('Member("VO-cms-' + self.version +
                     '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    terms.append('(other.GlueHostNetworkAdapterOutboundIP)')
    # Join only between terms so that an empty version does not leave a
    # dangling leading ' && ' in the JDL Requirements expression.
    return ' && '.join(terms)
1181 gutsche 1.3
def configFilename(self):
    """ return the config filename: the job type name followed by '.cfg' """
    suffix = '.cfg'
    return self.name() + suffix
1185    
1186     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns part of a job script which is prepares
    the execution environment and which is common for all CMS jobs.

    On OSG the fragment sets SCRAM_ARCH and sources cmsset_default.sh for
    self.version, from $GRID3_APP_DIR/cmssoft or, failing that, from
    $OSG_APP/cmssoft/cms. If neither exists it reports exit code 10020,
    removes $WORKING_DIR (10017 if that fails), and exits 1.
    """
    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' export SCRAM_ARCH='+self.executable_arch+'\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' export SCRAM_ARCH='+self.executable_arch+'\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    # No 'exit 1' here: an early exit made the working-directory cleanup
    # below unreachable and left $WORKING_DIR behind on the WN.
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1232    
1233     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Returns part of a job script which is prepares
    the execution environment and which is common for all CMS jobs.

    On LCG the fragment requires $VO_CMS_SW_DIR (exit code 10031) and a
    non-empty $VO_CMS_SW_DIR/cmsset_default.sh (10020), then sources it
    (10032 on a non-zero status). Each failure reports its code and exits 1.
    """
    # Tail shared by every failure branch:
    #   1. dump the status;
    #   2. remove $RUNTIME_AREA/$repo;
    #   3. append the monitoring ids to it again;
    #   4. exit 1.
    failTail = [
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
    ]

    lines = [
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    lines += failTail
    lines += [
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    lines += failTail
    lines += [
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    lines += failTail
    lines += [
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    ]
    return ''.join(lines)
1280 gutsche 1.5
def setParam_(self, param, value):
    """Store value under the key param in the self._params dictionary."""
    params = self._params
    params[param] = value
1283    
def getParams(self):
    """Return the self._params dictionary itself (not a copy)."""
    params = self._params
    return params
1286 gutsche 1.8
def setTaskid_(self):
    """Copy cfg_params['taskId'] into self._taskId (KeyError if absent)."""
    taskId = self.cfg_params['taskId']
    self._taskId = taskId
1289    
def getTaskid(self):
    """Return the task id stored in self._taskId by setTaskid_."""
    taskId = self._taskId
    return taskId
1292 gutsche 1.35
1293     #######################################################################
def uniquelist(self, old):
    """
    remove duplicates from a list

    Returns a new list with each element of old kept once, at the position
    of its first occurrence. Elements must be hashable. Unlike the previous
    nd.keys() implementation, the order is deterministic and the result is
    always a real list.
    """
    seen = {}
    result = []
    for e in old:
        if e not in seen:
            seen[e] = 1
            result.append(e)
    return result