ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.84
Committed: Thu May 17 10:49:49 2007 UTC (17 years, 11 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_5_1_pre4
Changes since 1.83: +2 -1 lines
Log Message:
move setting of SCRAM_ARCH before actual creation of scram project

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.83 # import DataDiscovery
8     # import DataDiscovery_DBS2
9     # import DataLocation
10 slacapra 1.1 import Scram
11    
12 slacapra 1.70 import os, string, re, shutil, glob
13 slacapra 1.1
14     class Cmssw(JobType):
15 gutsche 1.38 def __init__(self, cfg_params, ncjobs):
16 slacapra 1.1 JobType.__init__(self, 'CMSSW')
17     common.logger.debug(3,'CMSSW::__init__')
18    
19 gutsche 1.3 # Marco.
20     self._params = {}
21     self.cfg_params = cfg_params
22 gutsche 1.38
23 gutsche 1.72 try:
24     self.MaxTarBallSize = float(self.cfg_params['EDG.maxtarballsize'])
25     except KeyError:
26 slacapra 1.82 self.MaxTarBallSize = 9.5
27 gutsche 1.72
28 gutsche 1.44 # number of jobs requested to be created, limit obj splitting
29 gutsche 1.38 self.ncjobs = ncjobs
30    
31 slacapra 1.1 log = common.logger
32    
33     self.scram = Scram.Scram(cfg_params)
34     self.additional_inbox_files = []
35     self.scriptExe = ''
36     self.executable = ''
37 slacapra 1.71 self.executable_arch = self.scram.getArch()
38 slacapra 1.1 self.tgz_name = 'default.tgz'
39 corvo 1.56 self.scriptName = 'CMSSW.sh'
40 spiga 1.42 self.pset = '' #scrip use case Da
41     self.datasetPath = '' #scrip use case Da
42 gutsche 1.3
43 gutsche 1.50 # set FJR file name
44     self.fjrFileName = 'crab_fjr.xml'
45    
46 slacapra 1.1 self.version = self.scram.getSWVersion()
47 slacapra 1.55 common.taskDB.setDict('codeVersion',self.version)
48 gutsche 1.5 self.setParam_('application', self.version)
49 slacapra 1.47
50 slacapra 1.1 ### collect Data cards
51 gutsche 1.66
52     ## get DBS mode
53     try:
54 slacapra 1.80 self.use_dbs_1 = int(self.cfg_params['CMSSW.use_dbs_1'])
55 gutsche 1.66 except KeyError:
56 slacapra 1.80 self.use_dbs_1 = 0
57 gutsche 1.66
58 slacapra 1.1 try:
59 slacapra 1.9 tmp = cfg_params['CMSSW.datasetpath']
60     log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
61     if string.lower(tmp)=='none':
62     self.datasetPath = None
63 slacapra 1.21 self.selectNoInput = 1
64 slacapra 1.9 else:
65     self.datasetPath = tmp
66 slacapra 1.21 self.selectNoInput = 0
67 slacapra 1.1 except KeyError:
68 gutsche 1.3 msg = "Error: datasetpath not defined "
69 slacapra 1.1 raise CrabException(msg)
70 gutsche 1.5
71     # ML monitoring
72     # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
73 slacapra 1.9 if not self.datasetPath:
74     self.setParam_('dataset', 'None')
75     self.setParam_('owner', 'None')
76     else:
77     datasetpath_split = self.datasetPath.split("/")
78 slacapra 1.80 if self.use_dbs_1 == 1 :
79     self.setParam_('dataset', datasetpath_split[1])
80     self.setParam_('owner', datasetpath_split[-1])
81     else:
82     self.setParam_('dataset', datasetpath_split[1])
83     self.setParam_('owner', datasetpath_split[2])
84 gutsche 1.8 self.setTaskid_()
85     self.setParam_('taskId', self.cfg_params['taskId'])
86 gutsche 1.5
87 slacapra 1.1 self.dataTiers = []
88    
89     ## now the application
90     try:
91     self.executable = cfg_params['CMSSW.executable']
92 gutsche 1.5 self.setParam_('exe', self.executable)
93 slacapra 1.1 log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
94     msg = "Default executable cmsRun overridden. Switch to " + self.executable
95     log.debug(3,msg)
96     except KeyError:
97     self.executable = 'cmsRun'
98 gutsche 1.5 self.setParam_('exe', self.executable)
99 slacapra 1.1 msg = "User executable not defined. Use cmsRun"
100     log.debug(3,msg)
101     pass
102    
103     try:
104     self.pset = cfg_params['CMSSW.pset']
105     log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
106 spiga 1.42 if self.pset.lower() != 'none' :
107     if (not os.path.exists(self.pset)):
108     raise CrabException("User defined PSet file "+self.pset+" does not exist")
109     else:
110     self.pset = None
111 slacapra 1.1 except KeyError:
112     raise CrabException("PSet file missing. Cannot run cmsRun ")
113    
114     # output files
115 slacapra 1.53 ## stuff which must be returned always via sandbox
116     self.output_file_sandbox = []
117    
118     # add fjr report by default via sandbox
119     self.output_file_sandbox.append(self.fjrFileName)
120    
121     # other output files to be returned via sandbox or copied to SE
122 slacapra 1.1 try:
123     self.output_file = []
124     tmp = cfg_params['CMSSW.output_file']
125     if tmp != '':
126     tmpOutFiles = string.split(cfg_params['CMSSW.output_file'],',')
127     log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
128     for tmp in tmpOutFiles:
129     tmp=string.strip(tmp)
130     self.output_file.append(tmp)
131     pass
132     else:
133 gutsche 1.50 log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")
134 slacapra 1.1 pass
135     pass
136     except KeyError:
137 gutsche 1.50 log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")
138 slacapra 1.1 pass
139    
140     # script_exe file as additional file in inputSandbox
141     try:
142 slacapra 1.10 self.scriptExe = cfg_params['USER.script_exe']
143     if self.scriptExe != '':
144     if not os.path.isfile(self.scriptExe):
145 slacapra 1.64 msg ="ERROR. file "+self.scriptExe+" not found"
146 slacapra 1.10 raise CrabException(msg)
147 spiga 1.42 self.additional_inbox_files.append(string.strip(self.scriptExe))
148 slacapra 1.1 except KeyError:
149 spiga 1.42 self.scriptExe = ''
150 slacapra 1.70
151 spiga 1.42 #CarlosDaniele
152     if self.datasetPath == None and self.pset == None and self.scriptExe == '' :
153 slacapra 1.70 msg ="Error. script_exe not defined"
154 spiga 1.42 raise CrabException(msg)
155    
156 slacapra 1.1 ## additional input files
157     try:
158 slacapra 1.29 tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
159 slacapra 1.70 for tmp in tmpAddFiles:
160     tmp = string.strip(tmp)
161     dirname = ''
162     if not tmp[0]=="/": dirname = "."
163 corvo 1.79 files = glob.glob(os.path.join(dirname, tmp))
164 slacapra 1.70 for file in files:
165     if not os.path.exists(file):
166     raise CrabException("Additional input file not found: "+file)
167 slacapra 1.45 pass
168 corvo 1.79 storedFile = common.work_space.pathForTgz()+file
169 slacapra 1.70 shutil.copyfile(file, storedFile)
170     self.additional_inbox_files.append(string.strip(storedFile))
171 slacapra 1.1 pass
172     pass
173 slacapra 1.70 common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
174 slacapra 1.1 except KeyError:
175     pass
176    
177 slacapra 1.9 # files per job
178 slacapra 1.1 try:
179 gutsche 1.35 if (cfg_params['CMSSW.files_per_jobs']):
180     raise CrabException("files_per_jobs no longer supported. Quitting.")
181 gutsche 1.3 except KeyError:
182 gutsche 1.35 pass
183 gutsche 1.3
184 slacapra 1.9 ## Events per job
185 gutsche 1.3 try:
186 slacapra 1.10 self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
187 slacapra 1.9 self.selectEventsPerJob = 1
188 gutsche 1.3 except KeyError:
189 slacapra 1.9 self.eventsPerJob = -1
190     self.selectEventsPerJob = 0
191    
192 slacapra 1.22 ## number of jobs
193     try:
194     self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
195     self.selectNumberOfJobs = 1
196     except KeyError:
197     self.theNumberOfJobs = 0
198     self.selectNumberOfJobs = 0
199 slacapra 1.10
200 gutsche 1.35 try:
201     self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
202     self.selectTotalNumberEvents = 1
203     except KeyError:
204     self.total_number_of_events = 0
205     self.selectTotalNumberEvents = 0
206    
207 spiga 1.42 if self.pset != None: #CarlosDaniele
208     if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
209     msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
210     raise CrabException(msg)
211     else:
212     if (self.selectNumberOfJobs == 0):
213     msg = 'Must specify number_of_jobs.'
214     raise CrabException(msg)
215 gutsche 1.35
216 slacapra 1.22 ## source seed for pythia
217     try:
218     self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
219     except KeyError:
220 slacapra 1.23 self.sourceSeed = None
221     common.logger.debug(5,"No seed given")
222 slacapra 1.22
223 slacapra 1.28 try:
224     self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
225     except KeyError:
226     self.sourceSeedVtx = None
227     common.logger.debug(5,"No vertex seed given")
228 spiga 1.57 try:
229     self.firstRun = int(cfg_params['CMSSW.first_run'])
230     except KeyError:
231     self.firstRun = None
232     common.logger.debug(5,"No first run given")
233 spiga 1.42 if self.pset != None: #CarlosDaniele
234     self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset
235 gutsche 1.3
236 slacapra 1.1 #DBSDLS-start
237     ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
238     self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
239     self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
240 gutsche 1.35 self.jobDestination=[] # Site destination(s) for each job (list of lists)
241 slacapra 1.1 ## Perform the data location and discovery (based on DBS/DLS)
242 slacapra 1.9 ## SL: Don't if NONE is specified as input (pythia use case)
243 gutsche 1.35 blockSites = {}
244 slacapra 1.9 if self.datasetPath:
245 gutsche 1.35 blockSites = self.DataDiscoveryAndLocation(cfg_params)
246 slacapra 1.1 #DBSDLS-end
247    
248     self.tgzNameWithPath = self.getTarBall(self.executable)
249 slacapra 1.10
250 slacapra 1.9 ## Select Splitting
251 spiga 1.42 if self.selectNoInput:
252     if self.pset == None: #CarlosDaniele
253     self.jobSplittingForScript()
254     else:
255     self.jobSplittingNoInput()
256 corvo 1.56 else:
257     self.jobSplittingByBlocks(blockSites)
258 gutsche 1.5
259 slacapra 1.22 # modify Pset
260 spiga 1.42 if self.pset != None: #CarlosDaniele
261 corvo 1.79 try:
262     if (self.datasetPath): # standard job
263     # allow to processa a fraction of events in a file
264     self.PsetEdit.inputModule("INPUT")
265     self.PsetEdit.maxEvent("INPUTMAXEVENTS")
266     self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
267     else: # pythia like job
268     self.PsetEdit.maxEvent(self.eventsPerJob)
269     if (self.firstRun):
270     self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN") #First Run
271     if (self.sourceSeed) :
272     self.PsetEdit.pythiaSeed("INPUT")
273     if (self.sourceSeedVtx) :
274     self.PsetEdit.pythiaSeedVtx("INPUTVTX")
275     # add FrameworkJobReport to parameter-set
276     self.PsetEdit.addCrabFJR(self.fjrFileName)
277     self.PsetEdit.psetWriter(self.configFilename())
278     except:
279     msg='Error while manipuliating ParameterSet: exiting...'
280     raise CrabException(msg)
281 gutsche 1.3
282 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
283    
284 slacapra 1.83 import DataDiscovery
285     import DataDiscovery_DBS2
286     import DataLocation
287 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
288    
289     datasetPath=self.datasetPath
290    
291 slacapra 1.1 ## Contact the DBS
292 slacapra 1.41 common.logger.message("Contacting DBS...")
293 slacapra 1.1 try:
294 gutsche 1.66
295 slacapra 1.80 if self.use_dbs_1 == 1 :
296     self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
297     else :
298 gutsche 1.66 self.pubdata=DataDiscovery_DBS2.DataDiscovery_DBS2(datasetPath, cfg_params)
299 slacapra 1.1 self.pubdata.fetchDBSInfo()
300    
301 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
302 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
303     raise CrabException(msg)
304 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
305 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
306     raise CrabException(msg)
307 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
308 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
309 slacapra 1.1 raise CrabException(msg)
310 gutsche 1.67 except DataDiscovery_DBS2.NotExistingDatasetError_DBS2, ex :
311     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
312     raise CrabException(msg)
313     except DataDiscovery_DBS2.NoDataTierinProvenanceError_DBS2, ex :
314     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
315     raise CrabException(msg)
316     except DataDiscovery_DBS2.DataDiscoveryError_DBS2, ex:
317     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
318     raise CrabException(msg)
319 slacapra 1.1
320     ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
321 gutsche 1.3 common.logger.message("Required data are :"+self.datasetPath)
322    
323 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
324 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
325     self.eventsbyfile=self.pubdata.getEventsPerFile()
326 gutsche 1.3
327 slacapra 1.1 ## get max number of events
328     self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
329 gutsche 1.44 common.logger.message("The number of available events is %s\n"%self.maxEvents)
330 slacapra 1.1
331 slacapra 1.41 common.logger.message("Contacting DLS...")
332 slacapra 1.1 ## Contact the DLS and build a list of sites hosting the fileblocks
333     try:
334 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
335 gutsche 1.6 dataloc.fetchDLSInfo()
336 slacapra 1.41 except DataLocation.DataLocationError , ex:
337 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
338     raise CrabException(msg)
339    
340    
341 gutsche 1.35 sites = dataloc.getSites()
342     allSites = []
343     listSites = sites.values()
344 slacapra 1.63 for listSite in listSites:
345     for oneSite in listSite:
346 gutsche 1.35 allSites.append(oneSite)
347     allSites = self.uniquelist(allSites)
348 gutsche 1.3
349 gutsche 1.35 common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
350     common.logger.debug(6, "List of Sites: "+str(allSites))
351     return sites
352 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting over the blocks of the input dataset. A job never
    spans more than one block; it lists one or more consecutive files of
    that block and may start part-way into its first file.

    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs ('all' or a job cap)
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs (also stored in self.ncjobs)
          self.list_of_args - one entry per job: [escaped file-list string,
              number of events to process ('-1' when the job ends with the
              last file of its block), number of events to skip]
    RAISES: CrabException if events_per_job or number_of_jobs is not positive.
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (eventsPerJobRequested <= 0):
            # a non-positive value would never let the file loop consume events
            raise CrabException("events_per_job must be a positive number, got "+str(eventsPerJobRequested))
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    if (self.selectNumberOfJobs and self.theNumberOfJobs <= 0):
        raise CrabException("number_of_jobs must be a positive number, got "+str(self.theNumberOfJobs))

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        # at least one event per job: with more jobs than events a zero
        # value would produce endless empty jobs
        eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        if block not in self.eventsbyblock:
            # no event count known for this block: skip it
            continue

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = "\\{"
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # number of events to skip at the start of the job's first file
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            fileName = files[fileCount]
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += '\\\"' + fileName + '\\\"\\,'
                    newFile = 0
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

            # events still available to the current job
            eventsInJob = filesEventCount - jobSkipEventCount

            # if less events in file remain than eventsPerJobRequested
            if ( eventsInJob < eventsPerJobRequested ) :
                # if last file in block
                if ( fileCount == numFilesInBlock-1 ) :
                    # end job using last file, use remaining events in block.
                    # Only close a job that actually lists files: the last
                    # file may have been skipped for an unknown event count.
                    if ( parString != "\\{" ) :
                        fullString = parString[:-2] + '\\}'
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsInJob)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsInJob
                        eventsRemaining = eventsRemaining - eventsInJob
                    # reset counters and touch new file
                    jobSkipEventCount = 0
                    parString = "\\{"
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else :
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( eventsInJob == eventsPerJobRequested ) :
                # close job and touch new file
                fullString = parString[:-2] + '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                parString = "\\{"
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                fullString = parString[:-2] + '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # skip for the current (last) file: events already consumed from it
                # = eventsPerJobRequested minus events taken from the earlier files
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[fileName]
                parString = "\\{" + '\\\"' + fileName + '\\\"\\,'
            # END if
        # END while (iterate over files in the block)
    # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
530    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of events per job, for tasks
    without an input dataset (pythia-like generation).

    Exactly two of events_per_job / number_of_jobs / total_number_of_events
    are expected to be selected (checked in __init__); the third value is
    derived here.
    SETS: self.total_number_of_jobs;
          self.list_of_args - one list of string arguments per job:
              [first run or job index, (pythia seed, (vertex seed))],
              each configured number having the job index appended as digits;
          self.jobDestination - one [""] entry per job (any site);
          may also update self.total_number_of_events or self.eventsPerJob.
    RAISES: CrabException for a negative total number of events, or for a
            non-positive events_per_job / number_of_jobs.
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.eventsPerJob <= 0):
            # would otherwise divide by zero or yield a negative job count
            raise CrabException('events_per_job must be a positive number, got '+str(self.eventsPerJob))
        if (self.selectTotalNumberEvents):
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif (self.selectNumberOfJobs) :
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
    elif (self.selectNumberOfJobs) :
        if (self.theNumberOfJobs <= 0):
            raise CrabException('number_of_jobs must be a positive number, got '+str(self.theNumberOfJobs))
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # arguments are "<configured number><job index>" strings
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        jobIndex = str(i)
        if (self.firstRun):
            ## pythia first run
            args = [str(self.firstRun)+jobIndex]
        else:
            ## no first run
            args = [jobIndex]
        if (self.sourceSeed):
            ## pythia random seed
            args.append(str(self.sourceSeed)+jobIndex)
            if (self.sourceSeedVtx):
                ## vertex random seed (only used together with the pythia seed)
                args.append(str(self.sourceSeedVtx)+jobIndex)
        self.list_of_args.append(args)

    return
604    
605 spiga 1.42
def jobSplittingForScript(self):
    """
    Split a script-only task (no input dataset, no PSet) into
    self.theNumberOfJobs jobs.

    SETS: self.total_number_of_jobs; self.list_of_args, where each job's
    only argument is its index as a string; and appends one [""]
    destination per job to self.jobDestination.
    """
    njobs = self.theNumberOfJobs
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(njobs)+' jobs in total ')

    self.total_number_of_jobs = njobs
    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))
    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    # no input data, so an empty destination lets a job go to any site
    self.jobDestination.extend([[""] for jobIndex in range(njobs)])
    # no random seed: the job index is the single argument
    self.list_of_args = [[str(jobIndex)] for jobIndex in range(njobs)]
    return
628    
def split(self, jobParams):
    """
    Record every job's arguments and destination in the job database.

    jobParams -- list grown by one slot per job; slots 0..njobs-1 are
                 then set to the corresponding entry of self.list_of_args.
    """
    common.jobDB.load()
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args

    # create the empty structure: one slot per job
    jobParams.extend([""] * njobs)

    for job in range(njobs):
        jobParams[job] = arglist[job]
        common.jobDB.setArguments(job, jobParams[job])
        destination = self.jobDestination[job]
        common.logger.debug(5,"Job "+str(job)+" Destination: "+str(destination))
        common.jobDB.setDestination(job, destination)

    common.jobDB.save()
    return
649    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored for job nj as one string, each argument
    followed by a single space ('' when there are none). sched is unused.
    """
    return ''.join([str(arg) + " " for arg in common.jobDB.arguments(nj)])
655 gutsche 1.3
def numberOfJobs(self):
    """Return the total number of jobs computed by the splitting step."""
    njobs = self.total_number_of_jobs
    return njobs
659    
def getTarBall(self, exe):
    """
    Return the path of the tarball with the user's libraries and
    executable, building it via buildTar_(exe) only when no file exists
    at that path yet.
    """
    # path is relative to the task's tgz area (used by the BOSS XML files)
    self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    if not os.path.exists(self.tgzNameWithPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()
    # already built: reuse it
    return self.tgzNameWithPath
677    
678     def buildTar_(self, executable):
679    
680     # First of all declare the user Scram area
681     swArea = self.scram.getSWArea_()
682     #print "swArea = ", swArea
683 slacapra 1.63 # swVersion = self.scram.getSWVersion()
684     # print "swVersion = ", swVersion
685 slacapra 1.1 swReleaseTop = self.scram.getReleaseTop_()
686     #print "swReleaseTop = ", swReleaseTop
687    
688     ## check if working area is release top
689     if swReleaseTop == '' or swArea == swReleaseTop:
690     return
691    
692 slacapra 1.61 import tarfile
693     try: # create tar ball
694     tar = tarfile.open(self.tgzNameWithPath, "w:gz")
695     ## First find the executable
696 corvo 1.79 if (self.executable != ''):
697 slacapra 1.61 exeWithPath = self.scram.findFile_(executable)
698     if ( not exeWithPath ):
699     raise CrabException('User executable '+executable+' not found')
700    
701     ## then check if it's private or not
702     if exeWithPath.find(swReleaseTop) == -1:
703     # the exe is private, so we must ship
704     common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
705     path = swArea+'/'
706 corvo 1.79 exe = string.replace(exeWithPath, path,'')
707     tar.add(path+exe,exe)
708 slacapra 1.61 pass
709     else:
710     # the exe is from release, we'll find it on WN
711     pass
712    
713     ## Now get the libraries: only those in local working area
714     libDir = 'lib'
715     lib = swArea+'/' +libDir
716     common.logger.debug(5,"lib "+lib+" to be tarred")
717     if os.path.exists(lib):
718     tar.add(lib,libDir)
719    
720     ## Now check if module dir is present
721     moduleDir = 'module'
722     module = swArea + '/' + moduleDir
723     if os.path.isdir(module):
724     tar.add(module,moduleDir)
725    
726     ## Now check if any data dir(s) is present
727     swAreaLen=len(swArea)
728     for root, dirs, files in os.walk(swArea):
729     if "data" in dirs:
730     common.logger.debug(5,"data "+root+"/data"+" to be tarred")
731     tar.add(root+"/data",root[swAreaLen:]+"/data")
732    
733     ## Add ProdAgent dir to tar
734     paDir = 'ProdAgentApi'
735     pa = os.environ['CRABDIR'] + '/' + 'ProdAgentApi'
736     if os.path.isdir(pa):
737     tar.add(pa,paDir)
738    
739     common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
740     tar.close()
741     except :
742     raise CrabException('Could not create tar-ball')
743 gutsche 1.72
744     ## check for tarball size
745     tarballinfo = os.stat(self.tgzNameWithPath)
746     if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
747     raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
748    
749 slacapra 1.61 ## create tar-ball with ML stuff
750 corvo 1.58 self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
751 slacapra 1.61 try:
752     tar = tarfile.open(self.MLtgzfile, "w:gz")
753     path=os.environ['CRABDIR'] + '/python/'
754     for file in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
755     tar.add(path+file,file)
756     common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
757     tar.close()
758     except :
759 corvo 1.58 raise CrabException('Could not create ML files tar-ball')
760    
761 slacapra 1.1 return
762    
def wsSetupEnvironment(self, nj):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The generated shell fragment, in order:
      - sets up the middleware-specific CMS environment: LCG directly, or,
        on OSG, after creating and entering a private working directory
        under $OSG_WN_TMP;
      - exports SCRAM_ARCH, creates the CMSSW project area for self.version
        and evaluates its runtime environment;
      - aborts with exit code 50113 when $nargs is lower than 2;
      - when a pset is configured, writes pset.cfg from the job config file,
        substituting per-job values read from the shell array 'args';
      - copies every additional input-sandbox file into the work area.

    Each failure branch records its exit code in $RUNTIME_AREA/$repo and,
    on OSG, removes the working directory before 'exit 1'.
    NOTE(review): $nargs, args, $repo, $middleware and dumpStatus are
    presumably defined by the surrounding job wrapper (not visible here).

    nj -- index of the job in common.job_list
    """
    # Prepare JobType-independent part
    txt = ''

    ## OLI_Daniele at this level middleware already known
    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    # SCRAM_ARCH is exported before the project area is created.
    txt += 'echo "Setting SCRAM_ARCH='+self.executable_arch+'"\n'
    txt += 'export SCRAM_ARCH='+self.executable_arch+'\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # Report the configured version, not a hard-coded release name.
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    # The shell expands $nargs itself; no string concatenation is involved.
    txt += ' echo "SET_EXE_ENV 1 ==> ERROR Too few arguments $nargs"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.pset != None: #CarlosDaniele
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if (self.datasetPath): # standard job
            # args[1..3] = input files, max events, events to skip
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
        else: # pythia like job
            # The backslashes are escaped so sed receives its word-boundary
            # operators \< and \> unchanged.
            if (self.sourceSeed):
                txt += 'FirstRun=${args[1]}\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'
                txt += 'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
            else:
                txt += '# Copy untouched pset\n'
                txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
            if (self.sourceSeed):
                txt += 'Seed=${args[2]}\n'
                txt += 'echo "Seed: <$Seed>"\n'
                txt += 'sed "s#\\<INPUT\\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n'
                if (self.sourceSeedVtx):
                    txt += 'VtxSeed=${args[3]}\n'
                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                    txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
                else:
                    txt += 'mv tmp_2.cfg pset.cfg\n'
            else:
                txt += 'mv tmp_1.cfg pset.cfg\n'

    # Only the base name is shipped, so copy from $RUNTIME_AREA by that name.
    for file in self.additional_inbox_files:
        relFile = file.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset != None: #CarlosDaniele
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
930    
def wsBuildExe(self, nj=0):
    """
    Put in the script the commands to build an executable
    or a library.

    If the user tar-ball (self.tgzNameWithPath) exists locally, the returned
    fragment untars it from $RUNTIME_AREA and prepends ProdAgentApi to
    PYTHONPATH. If the untar fails, the exit status is reported like every
    other failure in the wrapper (JobExitCode, dumpStatus, monitoring ids).
    On OSG the working directory is then removed, and the job exits with 1.
    When there is no tar-ball, an empty string is returned.

    nj -- unused; kept for interface compatibility.
    """
    txt = ""
    if not os.path.isfile(self.tgzNameWithPath):
        return txt

    tgz = os.path.basename(self.tgzNameWithPath)
    txt += 'echo "tar xzvf $RUNTIME_AREA/'+tgz+'"\n'
    txt += 'tar xzvf $RUNTIME_AREA/'+tgz+'\n'
    txt += 'untar_status=$? \n'
    txt += 'if [ $untar_status -ne 0 ]; then \n'
    txt += ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n'
    txt += ' echo "JOB_EXIT_STATUS = $untar_status" \n'
    txt += ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n'
    # Same reporting sequence as the other failure branches of the wrapper;
    # it was missing here, so the untar status was never dumped.
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50999"\n'
    txt += ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' \n'
    txt += ' exit 1 \n'
    txt += 'else \n'
    txt += ' echo "Successful untar" \n'
    txt += 'fi \n'
    txt += '\n'
    txt += 'echo "Include ProdAgentApi in PYTHONPATH"\n'
    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=ProdAgentApi\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n'
    txt += 'fi\n'
    txt += '\n'
    return txt
978    
def modifySteeringCards(self, nj):
    """
    Hook for rewriting the user-provided card into the share dir.

    Nothing is modified for CMSSW jobs; the method only exists to satisfy
    the job-type interface and always returns None.
    """
    return None
984    
def executableName(self):
    """
    Return the command the job wrapper runs.

    A configured user script is run through "sh " (note the trailing
    space); otherwise the configured executable is returned unchanged.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "
990 slacapra 1.1
def executableArgs(self):
    """
    Return the argument string that follows executableName().

    For a user script this is the script name followed by " $NJob";
    otherwise it is the fixed option string " -p pset.cfg".
    """
    script = self.scriptExe
    if not script:
        return " -p pset.cfg"
    return script + " $NJob"
996 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    Entries, in order:
      - the user tar-ball, if it exists on disk;
      - the ML (monitoring) tar-ball, if one was built and exists on disk;
      - the job config file, when a pset is configured;
      - every additional input file.

    nj -- unused; kept for interface compatibility.
    """
    inp_box = []
    ## code
    if os.path.isfile(self.tgzNameWithPath):
        inp_box.append(self.tgzNameWithPath)
    # The tar-ball builder assigns MLtgzfile only after its early return for
    # a release-top work area, so the attribute may be missing here (unless
    # it is initialised elsewhere, not visible in this chunk).
    ml_tgz = getattr(self, 'MLtgzfile', None)
    if ml_tgz and os.path.isfile(ml_tgz):
        inp_box.append(ml_tgz)
    ## config
    if self.pset is not None:
        inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
    ## additional input files
    inp_box.extend(self.additional_inbox_files)
    return inp_box
1016    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Every declared output file (self.output_file followed by
    self.output_file_sandbox) is listed under its numbered name, using the
    1-based job number nj + 1.
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1028    
def prepareSteeringCards(self):
    """
    Hook for the initial modification of the user's steering card file.

    CMSSW jobs need no such preparation, so this is a no-op returning None.
    """
    pass
1034    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every declared output file (self.output_file followed by
    self.output_file_sandbox), the fragment copies it to $RUNTIME_AREA under
    its numbered name ('$NJob' inserted before the extension). When the
    file is missing, an error is printed. With the condor_g scheduler on
    OSG, a dummy file is written in its place.

    Afterwards the fragment returns to $RUNTIME_AREA, removes the OSG
    working directory, and sets the shell variable file_list to the numbered
    names of self.output_file only (the files that may go to an SE).

    nj -- unused; the numbering uses the shell variable $NJob.
    """
    txt = '\n'
    txt += '# directory content\n'
    txt += 'ls \n'

    for fileWithSuffix in (self.output_file + self.output_file_sandbox):
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        txt += '\n'
        txt += '# check output file\n'
        txt += 'ls '+fileWithSuffix+'\n'
        txt += 'ls_result=$?\n'
        txt += 'if [ $ls_result -ne 0 ] ; then\n'
        txt += ' echo "ERROR: Problem with output file"\n'
        if common.scheduler.boss_scheduler_name == 'condor_g':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'else\n'
        txt += ' cp '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
        txt += 'fi\n'

    # One cd is enough; the previous version emitted this line twice.
    txt += 'cd $RUNTIME_AREA\n'
    ### OLI_DANIELE
    txt += 'if [ $middleware == OSG ]; then\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n'
    txt += ' echo "JOB_EXIT_STATUS = 60999"\n'
    txt += ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += 'fi\n'
    txt += '\n'

    ## Add to filelist only files to be possibly copied to SE
    file_list = ' '.join([self.numberFile_(f, '$NJob') for f in self.output_file])
    txt += 'file_list="'+file_list+'"\n'
    return txt
1089    
def numberFile_(self, file, txt):
    """
    append _'txt' before last extension of a file

    Only the text after the last '.' counts as the extension:
      numberFile_('out.root', '3') -> 'out_3.root'
      numberFile_('a.b.c', '5')    -> 'a.b_5.c'
      numberFile_('noext', '1')    -> 'noext_1'
    """
    # str methods replace the string-module split() used before, which no
    # longer exists in Python 3; the result is unchanged.
    dot = file.rfind('.')
    if dot == -1:
        return file + '_' + txt
    # file[dot:] keeps the '.' together with the extension.
    return file[:dot] + '_' + txt + file[dot:]
1107    
def getRequirements(self, nj=None):
    """
    return job requirements to add to jdl files

    The expression always requires other.GlueHostNetworkAdapterOutboundIP.
    When a CMSSW version is set, it also requires the CE to publish
    "VO-cms-<version>" in other.GlueHostApplicationSoftwareRunTimeEnvironment.
    Clauses are joined with ' && ', so an empty version no longer leaves a
    dangling leading operator.

    nj -- unused; defaults to None rather than a shared mutable list.
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    return ' && '.join(clauses)
1121 gutsche 1.3
def configFilename(self):
    """Return the job config file name: self.name() with '.cfg' appended."""
    suffix = '.cfg'
    return self.name() + suffix
1125    
1126     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns the part of a job script that prepares the CMS execution
    environment on OSG and is common to all CMS jobs.

    The fragment sources cmsset_default.sh from $GRID3_APP_DIR/cmssoft or,
    failing that, from $OSG_APP/cmssoft/cms, passing self.version to it.
    If neither file exists, it reports exit code 10020 and removes the OSG
    working directory (reporting 10017 if that removal fails). It then
    exits with status 1.
    """
    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    # No exit here: an 'exit 1' at this point made the working-directory
    # cleanup below unreachable. The single 'exit 1' at the end of this
    # branch stops the job after cleaning up.
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'
    return txt
1170    
1171     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Build the LCG branch of the CMS environment setup in the job script.

    The fragment checks that $VO_CMS_SW_DIR is set (else exit code 10031).
    It then checks that $VO_CMS_SW_DIR/cmsset_default.sh is non-empty
    (else 10020), sources it, and fails with 10032 if sourcing returns
    non-zero. Each failure reports its code and stops with 'exit 1'.
    """
    def abort_lines(code):
        # Common tail of every failure branch: append the exit code to the
        # report file, dump it, remove it, re-append the monitoring ids,
        # and stop the job.
        return [
            ' echo "JobExitCode=%s" | tee -a $RUNTIME_AREA/$repo\n' % code,
            ' dumpStatus $RUNTIME_AREA/$repo\n',
            ' rm -f $RUNTIME_AREA/$repo \n',
            ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
            ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
            ' exit 1\n',
        ]

    lines = [' \n',
             ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
             ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
             ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
             ' echo "JOB_EXIT_STATUS = 10031" \n']
    lines += abort_lines(10031)
    lines += [' else\n',
              ' echo "Sourcing environment... "\n',
              ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
              ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
              ' echo "JOB_EXIT_STATUS = 10020"\n']
    lines += abort_lines(10020)
    lines += [' fi\n',
              ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
              ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
              ' result=$?\n',
              ' if [ $result -ne 0 ]; then\n',
              ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
              ' echo "JOB_EXIT_STATUS = 10032"\n']
    lines += abort_lines(10032)
    lines += [' fi\n',
              ' fi\n',
              ' \n',
              ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
              ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n']
    return ''.join(lines)
1218 gutsche 1.5
def setParam_(self, param, value):
    """Store value under the key param in self._params."""
    params = self._params
    params[param] = value
1221    
def getParams(self):
    """Return the parameter dict itself (not a copy)."""
    params = self._params
    return params
1224 gutsche 1.8
def setTaskid_(self):
    """Copy the 'taskId' entry of the configuration into self._taskId."""
    task_id = self.cfg_params['taskId']
    self._taskId = task_id
1227    
def getTaskid(self):
    """Return the task id previously stored by setTaskid_()."""
    task_id = self._taskId
    return task_id
1230 gutsche 1.35
1231     #######################################################################
def uniquelist(self, old):
    """
    remove duplicates from a list

    Returns a new list holding each element of old once, in order of first
    appearance. Elements must be hashable. Previously dict.keys() was
    returned: its order is arbitrary in Python 2, and in Python 3 it is a
    view rather than a list.
    """
    seen = {}
    result = []
    for e in old:
        if e not in seen:
            seen[e] = 1
            result.append(e)
    return result