ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.81
Committed: Fri May 11 10:57:12 2007 UTC (17 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.80: +0 -6 lines
Log Message:
 removed some line added by cvs update?? (<<<<<<< cms_cmssw.py)

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.41 import DataDiscovery
8 gutsche 1.66 import DataDiscovery_DBS2
9 slacapra 1.41 import DataLocation
10 slacapra 1.1 import Scram
11    
12 slacapra 1.70 import os, string, re, shutil, glob
13 slacapra 1.1
14     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure the CMSSW job type from the user configuration.

    Reads the EDG/CMSSW/USER cards, validates them, runs data discovery
    and location when a dataset is given, prepares the tarball, performs
    the job splitting and finally writes the edited parameter set.

    ARGUMENTS: cfg_params - configuration cards indexed by 'SECTION.key'
               ncjobs     - number of jobs requested to be created
                            (limits the splitting; may be 'all')
    RAISES:    CrabException for missing or inconsistent configuration
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3, 'CMSSW::__init__')

    self._params = {}
    self.cfg_params = cfg_params

    # maximum allowed size (MB) of the input sandbox tarball
    try:
        self.MaxTarBallSize = float(self.cfg_params['EDG.maxtarballsize'])
    except KeyError:
        self.MaxTarBallSize = 100.0

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''         # script use case
    self.datasetPath = ''  # script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion', self.version)
    self.setParam_('application', self.version)

    ### collect Data cards

    ## get DBS mode
    try:
        self.use_dbs_1 = int(self.cfg_params['CMSSW.use_dbs_1'])
    except KeyError:
        self.use_dbs_1 = 0

    # Only the card lookup is guarded, so that a KeyError raised by the
    # logger or by setParam_ is not mistaken for a missing card.
    try:
        tmp = cfg_params['CMSSW.datasetpath']
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if string.lower(tmp) == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        try:
            dataset = datasetpath_split[1]
            if self.use_dbs_1 == 1:
                owner = datasetpath_split[-1]
            else:
                owner = datasetpath_split[2]
        except IndexError:
            # report a malformed path as a configuration error
            raise CrabException("Error: malformed datasetpath "+self.datasetPath)
        self.setParam_('dataset', dataset)
        self.setParam_('owner', owner)
    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        log.debug(3, "User executable not defined. Use cmsRun")
    else:
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        log.debug(3, "Default executable cmsRun overridden. Switch to " + self.executable)

    try:
        self.pset = cfg_params['CMSSW.pset']
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() == 'none':
        self.pset = None
    elif not os.path.exists(self.pset):
        raise CrabException("User defined PSet file "+self.pset+" does not exist")

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    try:
        outputCard = cfg_params['CMSSW.output_file']
    except KeyError:
        outputCard = ''
    if outputCard != '':
        tmpOutFiles = string.split(outputCard, ',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outFile in tmpOutFiles:
            self.output_file.append(string.strip(outFile))
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
    except KeyError:
        self.scriptExe = ''
    if self.scriptExe != '':
        if not os.path.isfile(self.scriptExe):
            msg = "ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(string.strip(self.scriptExe))

    # without dataset and without pset a user script is mandatory
    if self.datasetPath is None and self.pset is None and self.scriptExe == '':
        msg = "Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    try:
        addFilesCard = cfg_params['USER.additional_input_files']
    except KeyError:
        addFilesCard = None
    if addFilesCard is not None:
        for pattern in string.split(addFilesCard, ','):
            pattern = string.strip(pattern)
            if pattern == '':
                # empty entry (e.g. trailing comma): nothing to ship
                continue
            # relative patterns are expanded from the current directory
            dirname = ''
            if not pattern[0] == "/":
                dirname = "."
            files = glob.glob(os.path.join(dirname, pattern))
            if not files:
                # glob only returns existing paths, so a missing file
                # shows up as an empty match list
                raise CrabException("Additional input file not found: "+pattern)
            for inFile in files:
                # store under the plain file name: appending the full
                # (possibly absolute) path would point to a directory
                # that does not exist in the work space
                storedFile = common.work_space.pathForTgz()+os.path.basename(inFile)
                shutil.copyfile(inFile, storedFile)
                self.additional_inbox_files.append(string.strip(storedFile))
        common.logger.debug(5, "Additional input files: "+str(self.additional_inbox_files))

    # files per job
    try:
        filesPerJobs = cfg_params['CMSSW.files_per_jobs']
    except KeyError:
        filesPerJobs = None
    if filesPerJobs:
        raise CrabException("files_per_jobs no longer supported. Quitting.")

    ## Events per job
    try:
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events
    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if self.selectNumberOfJobs == 0:
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5, "No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5, "No vertex seed given")

    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5, "No first run given")

    if self.pset is not None:
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset)

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents = 0        # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths = {}        # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination = []  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            if self.datasetPath:  # standard job
                # allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else:  # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if self.firstRun:
                    self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN")
                if self.sourceSeed:
                    self.PsetEdit.pythiaSeed("INPUT")
                    if self.sourceSeedVtx:
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except (CrabException, KeyboardInterrupt, SystemExit):
            # keep user interrupts and already meaningful errors intact
            raise
        except Exception:
            msg = 'Error while manipuliating ParameterSet: exiting...'
            raise CrabException(msg)
281 gutsche 1.3
282 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
283    
284 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
285    
286     datasetPath=self.datasetPath
287    
288 slacapra 1.1 ## Contact the DBS
289 slacapra 1.41 common.logger.message("Contacting DBS...")
290 slacapra 1.1 try:
291 gutsche 1.66
292 slacapra 1.80 if self.use_dbs_1 == 1 :
293     self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
294     else :
295 gutsche 1.66 self.pubdata=DataDiscovery_DBS2.DataDiscovery_DBS2(datasetPath, cfg_params)
296 slacapra 1.1 self.pubdata.fetchDBSInfo()
297    
298 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
299 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
300     raise CrabException(msg)
301 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
302 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
303     raise CrabException(msg)
304 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
305 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
306 slacapra 1.1 raise CrabException(msg)
307 gutsche 1.67 except DataDiscovery_DBS2.NotExistingDatasetError_DBS2, ex :
308     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
309     raise CrabException(msg)
310     except DataDiscovery_DBS2.NoDataTierinProvenanceError_DBS2, ex :
311     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
312     raise CrabException(msg)
313     except DataDiscovery_DBS2.DataDiscoveryError_DBS2, ex:
314     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
315     raise CrabException(msg)
316 slacapra 1.1
317     ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
318 gutsche 1.3 common.logger.message("Required data are :"+self.datasetPath)
319    
320 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
321 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
322     self.eventsbyfile=self.pubdata.getEventsPerFile()
323 gutsche 1.3
324 slacapra 1.1 ## get max number of events
325     self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
326 gutsche 1.44 common.logger.message("The number of available events is %s\n"%self.maxEvents)
327 slacapra 1.1
328 slacapra 1.41 common.logger.message("Contacting DLS...")
329 slacapra 1.1 ## Contact the DLS and build a list of sites hosting the fileblocks
330     try:
331 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
332 gutsche 1.6 dataloc.fetchDLSInfo()
333 slacapra 1.41 except DataLocation.DataLocationError , ex:
334 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
335     raise CrabException(msg)
336    
337    
338 gutsche 1.35 sites = dataloc.getSites()
339     allSites = []
340     listSites = sites.values()
341 slacapra 1.63 for listSite in listSites:
342     for oneSite in listSite:
343 gutsche 1.35 allSites.append(oneSite)
344     allSites = self.uniquelist(allSites)
345 gutsche 1.3
346 gutsche 1.35 common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
347     common.logger.debug(6, "List of Sites: "+str(allSites))
348     return sites
349 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs, self.ncjobs - Total # of jobs
          self.list_of_args - [file list, max events, skip events] for each job
    RAISES: CrabException if the splitting parameters are incomplete or
            would give jobs with less than one event
    """
    splittingMsg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'

    # ---- Handle the possible job splitting configurations ---- #
    totalEventsRequested = None
    eventsPerJobRequested = None
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
    if totalEventsRequested is None:
        raise CrabException(splittingMsg)

    if totalEventsRequested == -1:
        # user requested all the events in the dataset
        eventsRemaining = self.maxEvents
    elif totalEventsRequested > self.maxEvents:
        # user requested more events than are in the dataset
        eventsRemaining = self.maxEvents
        # report the effective request: total_number_of_events is not
        # set when events_per_job and number_of_jobs were given
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    else:
        # user requested less events than are in the dataset
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive number.')
        eventsPerJobRequested = eventsRemaining // self.theNumberOfJobs

    if eventsPerJobRequested is None:
        raise CrabException(splittingMsg)
    if eventsRemaining > 0 and eventsPerJobRequested < 1:
        # a non-positive job size never consumes events, so the loops
        # below would only stop at the job count limit
        raise CrabException('Cannot split by events: less than one event per job requested.')

    if self.selectNumberOfJobs:
        common.logger.message("May not create the exact number_of_jobs requested.")

    if self.ncjobs == 'all':
        totalNumberOfJobs = 999999999
    else:
        totalNumberOfJobs = self.ncjobs

    blocks = blockSites.keys()
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # file list syntax: \{\"file1\"\,\"file2\"\}
    emptyFileList = "\\{"

    def closeJob(jobNumber, fileList, maxEvents, skipEvents, destination, comment):
        # Record one job. fileList ends with the two character
        # separator '\,' which is replaced by the closing bracket.
        list_of_lists.append([fileList[:-2] + '\\}', str(maxEvents), str(skipEvents)])
        common.logger.debug(3,"Job "+str(jobNumber)+" can run over "+comment)
        self.jobDestination.append(destination)
        common.logger.debug(5,"Job "+str(jobNumber)+" Destination: "+str(destination))

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while eventsRemaining > 0 and blockCount < numBlocksInDataset and jobCount < totalNumberOfJobs:
        block = blocks[blockCount]
        blockCount += 1

        # blocks without event information are not used
        if block not in self.eventsbyblock:
            continue

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if numFilesInBlock <= 0:
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = emptyFileList
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # number of events to skip at the beginning of the current job
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while eventsRemaining > 0 and fileCount < numFilesInBlock and jobCount < totalNumberOfJobs:
            fileName = files[fileCount]
            if newFile:
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += '\\"' + fileName + '\\"\\,'
                    newFile = 0
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

            eventsInJob = filesEventCount - jobSkipEventCount

            if eventsInJob < eventsPerJobRequested:
                # less events remain in the files than requested per job
                if fileCount == numFilesInBlock-1:
                    # last file in block: end the job here and let it
                    # run over all the remaining events (-1).
                    # If no file could be added (unknown event counts)
                    # there is nothing to run on and no job is created.
                    if parString != emptyFileList:
                        closeJob(jobCount+1, parString, -1, jobSkipEventCount, blockSites[block],
                                 str(eventsInJob)+" events (last file in block).")
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsInJob
                        eventsRemaining = eventsRemaining - eventsInJob
                    # reset counters and file list
                    jobSkipEventCount = 0
                    parString = emptyFileList
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else:
                    # go to next file
                    newFile = 1
                    fileCount += 1
            elif eventsInJob == eventsPerJobRequested:
                # files hold exactly one job: close job and touch new file
                closeJob(jobCount+1, parString, eventsPerJobRequested, jobSkipEventCount, blockSites[block],
                         str(eventsPerJobRequested)+" events.")
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # reset counters and file list
                jobSkipEventCount = 0
                parString = emptyFileList
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            else:
                # more events remain than requested per job:
                # close job but don't touch new file
                closeJob(jobCount+1, parString, eventsPerJobRequested, jobSkipEventCount, blockSites[block],
                         str(eventsPerJobRequested)+" events.")
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # the next job starts inside the current (last) file:
                # skip the events of that file used by the job just closed
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[fileName]
                parString = emptyFileList
                parString += '\\"' + fileName + '\\"\\,'

    self.ncjobs = self.total_number_of_jobs = jobCount
    if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
527    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job

    REQUIRES: self.selectEventsPerJob, self.selectTotalNumberEvents,
              self.selectNumberOfJobs, self.eventsPerJob,
              self.total_number_of_events, self.theNumberOfJobs,
              self.firstRun, self.sourceSeed, self.sourceSeedVtx
    SETS: self.total_number_of_jobs, self.list_of_args and, depending on
          the configuration, self.eventsPerJob or self.total_number_of_events;
          appends one destination per job to self.jobDestination
    RAISES: CrabException if the requested splitting cannot be computed
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        msg = 'Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if self.selectEventsPerJob:
        if self.selectTotalNumberEvents:
            if self.eventsPerJob < 1:
                # would divide by zero or give a negative job count
                raise CrabException('events_per_job must be a positive number.')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
    elif self.selectNumberOfJobs:
        if self.theNumberOfJobs < 1:
            # would divide by zero or give a negative job size
            raise CrabException('number_of_jobs must be a positive number.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # Arguments of job i: the first run (or plain job index), then the
    # pythia seed and the vertex seed when given. Each value is the
    # configured number with the job index appended as a string.
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml

        if self.firstRun:
            ## pythia first run
            jobArgs = [str(self.firstRun)+str(i)]
        else:
            ## no first run
            jobArgs = [str(i)]

        if self.sourceSeed:
            ## pythia random seed
            jobArgs.append(str(self.sourceSeed)+str(i))
            if self.sourceSeedVtx:
                ## the vertex seed is only used together with the pythia seed
                jobArgs.append(str(self.sourceSeedVtx)+str(i))

        self.list_of_args.append(jobArgs)

    return
601    
602 spiga 1.42
def jobSplittingForScript(self):
    """
    Perform job splitting based on number of job

    SETS: self.total_number_of_jobs (taken from self.theNumberOfJobs) and
          self.list_of_args; appends one destination per job to
          self.jobDestination
    """
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    njobs = self.theNumberOfJobs
    self.total_number_of_jobs = njobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    # the only argument of a job is its index
    self.list_of_args = []
    jobIndex = 0
    while jobIndex < self.total_number_of_jobs:
        ## Since there is no input, any site is good
        self.jobDestination.append([""])
        ## no random seed
        self.list_of_args.append([str(jobIndex)])
        jobIndex += 1
    return
625    
def split(self, jobParams):
    """
    Store arguments and destination of every job in the job DB.

    ARGUMENT: jobParams - list which gets one new entry per job; its
              first self.total_number_of_jobs entries are then set to
              the argument lists in self.list_of_args
    """
    common.jobDB.load()
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args

    # create the empty structure
    jobParams.extend([""] * njobs)

    for job in range(njobs):
        jobArgs = arglist[job]
        jobParams[job] = jobArgs
        common.jobDB.setArguments(job, jobParams[job])
        destination = self.jobDestination[job]
        common.logger.debug(5,"Job "+str(job)+" Destination: "+str(destination))
        common.jobDB.setDestination(job, self.jobDestination[job])

    common.jobDB.save()
    return
646    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in the job DB for job nj as one string,
    each argument followed by a blank. sched is not used here.
    """
    pieces = [str(arg)+" " for arg in common.jobDB.arguments(nj)]
    return ''.join(pieces)
652 gutsche 1.3
def numberOfJobs(self):
    """
    Return the total number of jobs set by the job splitting.
    """
    njobs = self.total_number_of_jobs
    return njobs
656    
def getTarBall(self, exe):
    """
    Return the TarBall with lib and exe

    The path is stored in self.tgzNameWithPath. An already existing
    tarball is reused, otherwise buildTar_ is called for exe.
    """
    # Marco. Let's start to use relative path for Boss XML files
    tarBall = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarBall

    if not os.path.exists(tarBall):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # if it exist, just return it
    return self.tgzNameWithPath
674    
def buildTar_(self, executable):
    """
    Build the input sandbox tarball (self.tgzNameWithPath) with the
    private executable, lib, module and data directories of the user
    Scram area plus the ProdAgentApi directory, then build the tarball
    with the ML (monitoring) python files.

    Nothing is built when the working area is the release top.

    ARGUMENT: executable - name of the executable to look for
    SETS:     self.MLtgzfile
    RAISES:   CrabException if a tarball cannot be created, if the
              executable is not found or if the sandbox is larger than
              self.MaxTarBallSize MB
    """

    def discard(path):
        # Remove an unusable tarball, so that getTarBall does not find
        # it and reuse it at the next call.
        try:
            if os.path.exists(path):
                os.remove(path)
        except OSError:
            pass

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    # NOTE(review): this also skips the ML files tarball and leaves
    # self.MLtgzfile unset - confirm that this is intended
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    import tarfile

    tarBallReady = 0
    try:
        try: # create tar ball
            tar = tarfile.open(self.tgzNameWithPath, "w:gz")
            try:
                ## First find the executable
                if self.executable != '':
                    exeWithPath = self.scram.findFile_(executable)
                    if not exeWithPath:
                        raise CrabException('User executable '+executable+' not found')

                    ## then check if it's private or not
                    if exeWithPath.find(swReleaseTop) == -1:
                        # the exe is private, so we must ship
                        common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                        path = swArea+'/'
                        exe = string.replace(exeWithPath, path,'')
                        tar.add(path+exe,exe)
                    # else: the exe is from release, we'll find it on WN

                ## Now get the libraries: only those in local working area
                libDir = 'lib'
                lib = swArea+'/' +libDir
                common.logger.debug(5,"lib "+lib+" to be tarred")
                if os.path.exists(lib):
                    tar.add(lib,libDir)

                ## Now check if module dir is present
                moduleDir = 'module'
                module = swArea + '/' + moduleDir
                if os.path.isdir(module):
                    tar.add(module,moduleDir)

                ## Now check if any data dir(s) is present
                swAreaLen = len(swArea)
                for root, dirs, files in os.walk(swArea):
                    if "data" in dirs:
                        common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                        tar.add(root+"/data",root[swAreaLen:]+"/data")

                ## Add ProdAgent dir to tar
                paDir = 'ProdAgentApi'
                pa = os.environ['CRABDIR'] + '/' + paDir
                if os.path.isdir(pa):
                    tar.add(pa,paDir)

                common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
            finally:
                # close the archive also when adding files failed
                tar.close()
            tarBallReady = 1
        except (CrabException, KeyboardInterrupt, SystemExit):
            # keep user interrupts and already meaningful errors intact
            raise
        except Exception:
            raise CrabException('Could not create tar-ball')
    finally:
        if not tarBallReady:
            discard(self.tgzNameWithPath)

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if tarballinfo.st_size > self.MaxTarBallSize*1024*1024:
        discard(self.tgzNameWithPath)
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    mlTarBallReady = 0
    try:
        try:
            tar = tarfile.open(self.MLtgzfile, "w:gz")
            try:
                path = os.environ['CRABDIR'] + '/python/'
                for mlFile in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
                    tar.add(path+mlFile,mlFile)
                common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
            finally:
                tar.close()
            mlTarBallReady = 1
        except (CrabException, KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            raise CrabException('Could not create ML files tar-ball')
    finally:
        if not mlTarBallReady:
            discard(self.MLtgzfile)

    return
759    
def wsSetupEnvironment(self, nj):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The returned text is shell code made of three parts:
      * the middleware (LCG / OSG) specific CMS setup,
      * the creation of the CMSSW project area for self.version and the
        check on the number of wrapper arguments,
      * the job specific part: the parameter-set placeholders are replaced
        with the values taken from the wrapper arguments and the
        additional input files are copied into the working area.

    'nj' is used as index into common.job_list.
    """
    # Prepare JobType-independent part
    txt = ''

    ## OLI_Daniele at this level middleware already known

    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # report the release actually requested, not a hard-coded one
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'export SCRAM_ARCH='+self.executable_arch+'\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.pset is not None: #CarlosDaniele
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if (self.datasetPath): # standard job
            # input files, max events and skip events are wrapper arguments 1..3
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
        elif (self.sourceSeed): # pythia like job with random seeds
            # '\\<' is a literal backslash followed by '<' (sed word boundary)
            txt += 'FirstRun=${args[1]}\n'
            txt += 'echo "FirstRun: <$FirstRun>"\n'
            txt += 'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
            txt += 'Seed=${args[2]}\n'
            txt += 'echo "Seed: <$Seed>"\n'
            txt += 'sed "s#\\<INPUT\\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n'
            if (self.sourceSeedVtx):
                txt += 'VtxSeed=${args[3]}\n'
                txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
            else:
                txt += 'mv tmp_2.cfg pset.cfg\n'
        else: # pythia like job without seeds
            txt += '# Copy untouched pset\n'
            txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
            txt += 'mv tmp_1.cfg pset.cfg\n'

    # copy the additional input files (by base name) into the working area
    for inbox_file in self.additional_inbox_files:
        relFile = inbox_file.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset is not None: #CarlosDaniele
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
926    
def wsBuildExe(self, nj=0):
    """
    Put in the script the commands to unpack the user tar-ball
    (if it exists) and to add ProdAgentApi to PYTHONPATH.

    Returns an empty string when self.tgzNameWithPath is not a file.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    pieces = [
        'echo "tar xzvf $RUNTIME_AREA/' + tarball + '"\n',
        'tar xzvf $RUNTIME_AREA/' + tarball + '\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n',
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = 50999"\n',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        # the tar-ball ships the ProdAgentApi directory
        'echo "Include ProdAgentApi in PYTHONPATH"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=ProdAgentApi\n',
        'else\n',
        ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n',
        'fi\n',
        '\n',
    ]
    return "".join(pieces)
974    
def modifySteeringCards(self, nj):
    """
    Modify the card provided by the user, writing a new card
    into the share dir.  Nothing is done for this job type.
    """
    return None
980    
def executableName(self):
    """
    Return the command used to start the job: the shell when a user
    script (self.scriptExe) is set, self.executable otherwise.
    """
    if not self.scriptExe:
        return self.executable
    #CarlosDaniele: the user script is run through the shell
    return "sh "
986 slacapra 1.1
def executableArgs(self):
    """
    Return the arguments passed to the executable: the user script
    followed by the job number when self.scriptExe is set, the
    parameter-set option otherwise.
    """
    if not self.scriptExe:
        return " -p pset.cfg"
    #CarlosDaniele
    return self.scriptExe + " $NJob"
992 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    The list holds, in this order: the code tar-ball and the ML
    tar-ball (each only if the file exists), the job configuration
    file (only if self.pset is not None) and the additional input files.
    """
    ## code
    sandbox = [tarball
               for tarball in (self.tgzNameWithPath, self.MLtgzfile)
               if os.path.isfile(tarball)]
    ## config
    if self.pset is not None:
        job_dir = common.work_space.pathForTgz() + 'job/'
        sandbox.append(job_dir + self.configFilename())
    ## additional input files
    sandbox.extend(self.additional_inbox_files)
    return sandbox
1012    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Every user declared output file (self.output_file followed by
    self.output_file_sandbox) is numbered with 'nj' + 1.
    """
    job_number = str(nj + 1)
    ## User Declared output files
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, job_number) for name in declared]
1024    
def prepareSteeringCards(self):
    """
    Make initial modifications of the user's steering card file.
    Nothing is done for this job type.
    """
    return None
1030    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every declared output file the script copies it to
    $RUNTIME_AREA under its numbered name, then (on OSG) removes the
    working directory and finally defines 'file_list' with the
    numbered names of the files in self.output_file.
    """
    parts = ['\n',
             '# directory content\n',
             'ls \n']

    for produced in (self.output_file + self.output_file_sandbox):
        numbered = self.numberFile_(produced, '$NJob')
        parts.append('\n')
        parts.append('# check output file\n')
        parts.append('ls ' + produced + '\n')
        parts.append('ls_result=$?\n')
        parts.append('if [ $ls_result -ne 0 ] ; then\n')
        parts.append(' echo "ERROR: Problem with output file"\n')
        if common.scheduler.boss_scheduler_name == 'condor_g':
            parts.append(' if [ $middleware == OSG ]; then \n')
            parts.append(' echo "prepare dummy output file"\n')
            parts.append(' echo "Processing of job output failed" > $RUNTIME_AREA/' + numbered + '\n')
            parts.append(' fi \n')
        parts.append('else\n')
        parts.append(' cp ' + produced + ' $RUNTIME_AREA/' + numbered + '\n')
        parts.append('fi\n')

    parts.extend([
        'cd $RUNTIME_AREA\n',
        'cd $RUNTIME_AREA\n',
        ### OLI_DANIELE
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n',
        ' echo "JOB_EXIT_STATUS = 60999"\n',
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])

    ## Add to filelist only files to be possibly copied to SE
    to_copy = [self.numberFile_(name, '$NJob') for name in self.output_file]
    parts.append('file_list="' + ' '.join(to_copy) + '"\n')

    return ''.join(parts)
1085    
def numberFile_(self, file, txt):
    """
    append _'txt' before last extension of a file

    Only the last path component is inspected, so a dot inside a
    directory name is never taken for the extension separator:
      'out.root'   -> 'out_<txt>.root'
      'a.b.c'      -> 'a.b_<txt>.c'
      'noext'      -> 'noext_<txt>'
      'dir.d/out'  -> 'dir.d/out_<txt>'
    """
    # split off the directory part (kept untouched, trailing '/' included)
    base_start = file.rfind("/") + 1
    directory = file[:base_start]
    base = file[base_start:]

    dot = base.rfind(".")
    if dot == -1:
        # no extension: simply append "_txt"
        return directory + base + '_' + txt
    # insert "_txt" right before the last extension
    return directory + base[:dot] + '_' + txt + base[dot:]
1103    
def getRequirements(self, nj=[]):
    """
    return job requirements to add to jdl files

    The software tag for self.version (when set) is followed by the
    requirement on outbound connectivity.
    """
    outbound = ' && (other.GlueHostNetworkAdapterOutboundIP)'
    if not self.version:
        return '' + outbound

    software = ('Member("VO-cms-' + self.version +
                '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    return software + outbound
1117 gutsche 1.3
def configFilename(self):
    """ return the config filename: the job type name plus '.cfg' """
    job_type_name = self.name()
    return job_type_name + '.cfg'
1121    
1122     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns part of a job script which prepares
    the execution environment and which is common for all CMS jobs.

    The script sources cmsset_default.sh for self.version from
    $GRID3_APP_DIR or, failing that, from $OSG_APP.  When neither file
    exists the error 10020 is reported, the working directory is
    removed (error 10017 if that fails) and the job exits.
    """
    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    # no 'exit 1' here: the job must first clean up the working
    # directory below, it exits right after the cleanup
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1166    
1167     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Returns part of a job script which prepares
    the execution environment and which is common for all CMS jobs.

    The script checks $VO_CMS_SW_DIR, checks and sources
    $VO_CMS_SW_DIR/cmsset_default.sh and exits with the errors
    10031, 10020 or 10032 when one of these steps fails.
    """
    # lines emitted after every error message, right before 'exit 1'
    report_and_exit = [
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
    ]

    script = [
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    script += report_and_exit
    script += [
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    script += report_and_exit
    script += [
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    script += report_and_exit
    script += [
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    ]
    return ''.join(script)
1214 gutsche 1.5
def setParam_(self, param, value):
    """ store 'value' under the key 'param' in the job type parameters """
    parameters = self._params
    parameters[param] = value
1217    
def getParams(self):
    """ return the dictionary of the job type parameters (not a copy) """
    parameters = self._params
    return parameters
1220 gutsche 1.8
def setTaskid_(self):
    """ read the task identifier from the 'taskId' configuration parameter """
    task_id = self.cfg_params['taskId']
    self._taskId = task_id
1223    
def getTaskid(self):
    """ return the task identifier stored in self._taskId """
    task_id = self._taskId
    return task_id
1226 gutsche 1.35
1227     #######################################################################
def uniquelist(self, old):
    """
    remove duplicates from a list

    Returns a new list holding every element of 'old' once, in the
    order of first appearance.  The elements must be hashable.
    """
    seen = {}
    unique = []
    for element in old:
        if element not in seen:
            seen[element] = 0
            unique.append(element)
    return unique