ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.61
Committed: Tue Jan 16 17:19:05 2007 UTC (18 years, 3 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.60: +67 -72 lines
Log Message:
rewrite the creation of tar-ball files using Python's tarfile module rather than opening a shell and running tar -czvf. Moreover, there is no need to copy anything into swArea, neither for the ProdAgent files nor for the ML files.

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 slacapra 1.22 import math
6 slacapra 1.1 import common
7 gutsche 1.3 import PsetManipulator
8 slacapra 1.1
9 slacapra 1.41 import DBSInfo
10     import DataDiscovery
11     import DataLocation
12 slacapra 1.1 import Scram
13    
14 corvo 1.56 import glob, os, string, re, shutil
15 slacapra 1.1
16     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure the CMSSW job type and split the task into jobs.

    Reads the [CMSSW] and [USER] settings from cfg_params (keys of the form
    'SECTION.key') and validates them. It then runs DBS/DLS discovery when
    a dataset is given, builds the user tar-ball and performs the job
    splitting. When a parameter-set is used, it also writes the edited
    parameter-set.

    cfg_params: dict-like configuration; a missing key raises KeyError.
    ncjobs: number of jobs requested to be created ('all' or a number);
        it limits the splitting done by jobSplittingByBlocks.
    Raises CrabException for missing or inconsistent configuration.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    # Marco.
    self._params = {}
    self.cfg_params = cfg_params

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''         # script use case (Daniele)
    self.datasetPath = ''  # script use case (Daniele)

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards
    try:
        tmp = cfg_params['CMSSW.datasetpath']
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        # no input data (e.g. pythia generation or a user script)
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        self.setParam_('dataset', datasetpath_split[1])
        self.setParam_('owner', datasetpath_split[-1])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)
    else:
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        # script use case: no parameter-set
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    # add fjr report by default via sandbox
    self.output_file_sandbox = [self.fjrFileName]

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    noOutputMsg = "No output file defined: only stdout/err and the CRAB Framework Job Report will be available"
    try:
        outFilesCfg = cfg_params['CMSSW.output_file']
    except KeyError:
        outFilesCfg = ''
    if outFilesCfg != '':
        tmpOutFiles = outFilesCfg.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outFile in tmpOutFiles:
            outFile = outFile.strip()
            # skip empty entries (e.g. from a trailing comma)
            if outFile:
                self.output_file.append(outFile)
    if not self.output_file:
        log.message(noOutputMsg)

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
    except KeyError:
        self.scriptExe = ''
    if self.scriptExe != '':
        if not os.path.isfile(self.scriptExe):
            msg ="WARNING. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())
    #CarlosDaniele: without dataset and pset, a user script is mandatory
    if self.datasetPath is None and self.pset is None and self.scriptExe == '':
        msg ="WARNING. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    try:
        addFilesCfg = cfg_params['USER.additional_input_files']
    except KeyError:
        pass
    else:
        for addFile in addFilesCfg.split(','):
            addFile = addFile.strip()
            # skip empty entries (e.g. from a trailing comma)
            if not addFile:
                continue
            # check the SOURCE before copying (copyfile would raise a raw IOError)
            if not os.path.isfile(addFile):
                raise CrabException("Additional input file not found: "+addFile)
            # share/ is flat: ship the file under its base name, otherwise
            # absolute paths or sub-directories produce an invalid destination
            baseName = os.path.basename(addFile)
            shutil.copyfile(addFile, common.work_space.shareDir()+baseName)
            files = common.work_space.pathForTgz()+'share/' + baseName
            self.additional_inbox_files.append(files.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    # files per job
    try:
        if (cfg_params['CMSSW.files_per_jobs']):
            raise CrabException("files_per_jobs no longer supported. Quitting.")
    except KeyError:
        pass

    ## Events per job
    try:
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None: #CarlosDaniele
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        # script use case: splitting is purely by number of jobs
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")
    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5,"No first run given")
    if self.pset is not None: #CarlosDaniele
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[] # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None: #CarlosDaniele
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None: #CarlosDaniele
        try:
            if (self.datasetPath): # standard job
                # allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else: # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if (self.firstRun):
                    self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN") #First Run
                if (self.sourceSeed) :
                    self.PsetEdit.pythiaSeed("INPUT")
                    if (self.sourceSeedVtx) :
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except Exception:
            msg='Error while manipuliating ParameterSet: exiting...'
            raise CrabException(msg)
267 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Query DBS for the content of self.datasetPath and DLS for its hosting sites.

    SETS: self.pubdata, self.filesbyblock, self.eventsbyblock,
          self.eventsbyfile, self.maxEvents (all from the DataDiscovery object).
    RETURNS: the mapping returned by DataLocation.getSites(); its values are
             iterated as lists of sites (presumably keyed by block — confirm
             against DataLocation).
    RAISES: CrabException when the DBS or DLS query fails.
    """
    # sys.exc_info() binds the exception portably (no Python-2-only "except X, ex")
    import sys

    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath=self.datasetPath

    ## TODO: data tiers are not configurable yet ("".split(',') gives [''])
    dataTiersList = ""
    dataTiers = dataTiersList.split(',')

    ## Contact the DBS
    common.logger.message("Contacting DBS...")
    try:
        self.pubdata=DataDiscovery.DataDiscovery(datasetPath, dataTiers, cfg_params)
        self.pubdata.fetchDBSInfo()
    except (DataDiscovery.NotExistingDatasetError, DataDiscovery.NoDataTierinProvenanceError):
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
        raise CrabException(msg)
    except DataDiscovery.DataDiscoveryError:
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Discovery in DBS %s'%ex.getErrorMessage()
        raise CrabException(msg)

    ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
    common.logger.message("Required data are :"+self.datasetPath)

    self.filesbyblock=self.pubdata.getFiles()
    self.eventsbyblock=self.pubdata.getEventsPerBlock()
    self.eventsbyfile=self.pubdata.getEventsPerFile()

    ## get max number of events
    self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
    common.logger.message("The number of available events is %s\n"%self.maxEvents)

    common.logger.message("Contacting DLS...")
    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError:
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
        raise CrabException(msg)

    sites = dataloc.getSites()
    # flatten the per-block site lists (no shadowing of the builtin "list")
    allSites = []
    for siteList in sites.values():
        allSites.extend(siteList)
    allSites = self.uniquelist(allSites)

    common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
    common.logger.debug(6, "List of Sites: "+str(allSites))
    return sites
331 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs ('all' or the maximum number of jobs to create)
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs (and self.ncjobs) - Total # of jobs
          self.list_of_args - per job [escaped file list, max events ('-1' = all
                              remaining), events to skip in the first file]
    RAISES: CrabException when the splitting parameters do not define both a total
            number of events and a number of events per job, or are not positive.
    """

    # ---- Handle the possible job splitting configurations ---- #
    totalEventsRequested = None
    eventsPerJobRequested = None
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        # a non-positive value would keep the file loop below from ever consuming events
        if (eventsPerJobRequested < 1):
            raise CrabException('events_per_job must be a positive integer.')
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
    if (totalEventsRequested is None or
        (eventsPerJobRequested is None and not self.selectNumberOfJobs)):
        msg = 'Must define two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if (self.theNumberOfJobs < 1):
            raise CrabException('number_of_jobs must be a positive integer.')
        # at least one event per job: 0 (fewer events than jobs) would make the
        # file loop below create empty jobs forever
        eventsPerJobRequested = max(1, int(eventsRemaining/self.theNumberOfJobs))

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events   ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        # escaped file list handed to the job: \{\"f1\"\,\"f2\"\,  (closed with \})
        parString = "\\{"
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # events of the first file of the job already used by previous jobs
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            fileName = files[fileCount]
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")
                else:
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += '\\"' + fileName + '\\"\\,'
                    newFile = 0

            # if less events in file remain than eventsPerJobRequested
            if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
                # if last file in block
                if ( fileCount == numFilesInBlock-1 ) :
                    if (parString != "\\{"):
                        # end job using last file, use remaining events in block
                        fullString = parString[:-2] + '\\}'
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                    # else: no file with a known number of events is pending, so
                    # no job (with an empty file list) is created
                    # reset counters and touch new file
                    jobSkipEventCount = 0
                    parString = "\\{"
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else :
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                # close job and touch new file
                fullString = parString[:-2] + '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                parString = "\\{"
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                fullString = parString[:-2] + '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[fileName]
                parString = "\\{" + '\\"' + fileName + '\\"\\,'

    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
509    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of events per job, for tasks
    without input data (e.g. pythia generation).

    Two of total_number_of_events, events_per_job and number_of_jobs are
    expected to be set (checked in __init__); the third is derived here.

    SETS: self.total_number_of_jobs; self.eventsPerJob or
          self.total_number_of_events when they are derived;
          self.jobDestination (one [""] per job, empty so the xml is valid);
          self.list_of_args: per job i, [str(firstRun)+str(i) or str(i)],
          followed by str(pythia_seed)+str(i) and then str(vtx_seed)+str(i)
          when those seeds are configured (vtx only together with pythia).
    RAISES: CrabException for a negative total or a non-positive divisor.
    """
    # events_per_job + number_of_jobs: the total is implied. Without this the
    # total stayed 0, so total/eventsPerJob produced zero jobs.
    if (self.selectEventsPerJob and self.selectNumberOfJobs and not self.selectTotalNumberEvents):
        self.total_number_of_events = self.theNumberOfJobs * self.eventsPerJob

    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.eventsPerJob <= 0):
            raise CrabException('events_per_job must be a positive integer.')
        self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
    elif (self.selectNumberOfJobs) :
        if (self.theNumberOfJobs <= 0):
            raise CrabException('number_of_jobs must be a positive integer.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # arguments: run number (or job index), then the per-job random seeds
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        if (self.firstRun):
            ## pythia first run
            args = [str(self.firstRun)+str(i)]
        else:
            ## no first run
            args = [str(i)]
        if (self.sourceSeed):
            ## pythia random seed
            args.append(str(self.sourceSeed)+str(i))
            if (self.sourceSeedVtx):
                ## vertex random seed
                args.append(str(self.sourceSeedVtx)+str(i))
        self.list_of_args.append(args)

    return
578    
579 spiga 1.42
def jobSplittingForScript(self):#CarlosDaniele
    """
    Split a script-only task (no dataset, no parameter-set) purely by the
    requested number of jobs.

    SETS: self.total_number_of_jobs = self.theNumberOfJobs;
          self.jobDestination gets one [""] (any site) per job;
          self.list_of_args = [[str(0)], [str(1)], ...] (job index as only argument).
    """
    requested = self.theNumberOfJobs
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(requested)+' jobs in total ')

    self.total_number_of_jobs = requested
    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))
    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    jobIndices = range(self.total_number_of_jobs)
    # no input data: any site will do, and no random seed is passed
    self.jobDestination.extend([[""] for _ in jobIndices])
    self.list_of_args = [[str(idx)] for idx in jobIndices]
    return
602    
def split(self, jobParams):
    """
    Record every job's arguments and destination in the job database.

    jobParams: list that is extended by one slot per job; slot k (counted
        from the start of the list) is then set to job k's argument list.
    """
    common.jobDB.load()
    #### Fabio
    nJobs = self.total_number_of_jobs
    argLists = self.list_of_args
    # create the empty structure, then fill it job by job
    jobParams.extend([""] * nJobs)

    for jobIndex in range(nJobs):
        jobParams[jobIndex] = argLists[jobIndex]
        common.jobDB.setArguments(jobIndex, jobParams[jobIndex])
        destination = self.jobDestination[jobIndex]
        common.logger.debug(5,"Job "+str(jobIndex)+" Destination: "+str(destination))
        common.jobDB.setDestination(jobIndex, destination)

    common.jobDB.save()
    return
623    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored for job nj as a single string: each argument
    converted with str() and followed by one space. sched is not used.
    """
    pieces = [str(arg) + " " for arg in common.jobDB.arguments(nj)]
    return "".join(pieces)
629 gutsche 1.3
def numberOfJobs(self):
    """Return the total number of jobs produced by the job splitting (Fabio)."""
    total = self.total_number_of_jobs
    return total
633    
def getTarBall(self, exe):
    """
    Return the path of the tar-ball with the user libraries and executable.

    The path (relative, for the Boss XML files) is stored in
    self.tgzNameWithPath. If no file exists there yet, buildTar_(exe) is
    called first and the stripped path is returned.
    """
    # Marco. Let's start to use relative path for Boss XML files
    self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name

    if not os.path.exists(self.tgzNameWithPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # already built: just hand it back
    return self.tgzNameWithPath
651    
def buildTar_(self, executable):
    """
    Build the tar-balls shipped with every job.

    * self.tgzNameWithPath contains:
      - the user executable, if it is not part of the release;
      - lib/, module/ and every data/ directory of the user SCRAM area;
      - $CRABDIR/ProdAgentApi.
      It is skipped when there is no release top or the working area is
      the release area itself.
    * self.MLtgzfile (share/MLfiles.tgz) contains the monitoring/report
      python files taken from $CRABDIR/python. It is always built.

    executable: name of the user executable; '' means none to look up.
    Raises CrabException if a tar-ball cannot be written. The partially
    written file is removed so that getTarBall() never reuses it.
    """
    import tarfile

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## only a private working area (not the release top) has user files to ship
    if swReleaseTop != '' and swArea != swReleaseTop:
        tar = None
        try: # create tar ball
            tar = tarfile.open(self.tgzNameWithPath, "w:gz")
            ## First find the executable
            if (executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    exe = exeWithPath.replace(path,'')
                    tar.add(path+exe,exe)
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen=len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")
                    # tar.add is recursive: don't walk into data/ again, or a
                    # nested data/ directory would be added a second time
                    dirs.remove("data")

            ## Add ProdAgent dir to tar
            paDir = 'ProdAgentApi'
            pa = os.environ['CRABDIR'] + '/' + 'ProdAgentApi'
            if os.path.isdir(pa):
                tar.add(pa,paDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
            tar.close()
        except CrabException:
            # keep the specific message (e.g. executable not found)
            self._discardTarBall_(tar, self.tgzNameWithPath)
            raise
        except Exception:
            self._discardTarBall_(tar, self.tgzNameWithPath)
            raise CrabException('Could not create tar-ball')

    ## create tar-ball with ML stuff (needed whatever the working area is)
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    tar = None
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        path=os.environ['CRABDIR'] + '/python/'
        for fileName in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
            tar.add(path+fileName,fileName)
        common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        tar.close()
    except Exception:
        self._discardTarBall_(tar, self.MLtgzfile)
        raise CrabException('Could not create ML files tar-ball')

    return

def _discardTarBall_(self, tar, tgzName):
    """Best-effort: close a half-written tar-ball and delete its file."""
    # Errors are ignored on purpose: the caller is about to raise a
    # CrabException describing the real failure.
    if tar is not None:
        try:
            tar.close()
        except Exception:
            pass
    try:
        if os.path.exists(tgzName):
            os.remove(tgzName)
    except OSError:
        pass
738    
def wsSetupEnvironment(self, nj):
    """
    Return the part of the job wrapper script that prepares the execution
    environment for job 'nj'.

    The generated shell text:
      * sets up the middleware-specific CMS environment (LCG, or OSG after
        creating a private working directory under $OSG_WN_TMP),
      * creates the SCRAM project area for self.version and evaluates its
        runtime environment,
      * aborts if the wrapper received fewer than 2 arguments,
      * when a pset is configured, writes pset.cfg by substituting wrapper
        arguments into the job's configuration file,
      * copies each additional input-sandbox file into the current directory.

    NOTE(review): $middleware, $RUNTIME_AREA, $repo, $nargs, the 'args'
    array and the dumpStatus function are presumably defined earlier in the
    wrapper script -- they are not defined here.

    @param nj: index of the job in common.job_list
    @return: the shell fragment as a string
    """
    txt = ''

    ## Middleware-specific CMS environment
    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    ## JobType-specific part: SCRAM project area for the requested release
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # Report the release that was actually requested, not a fixed one.
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'cd '+self.version+'\n'
    ### grep needed for a bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    ## Argument check (first argument is always the job number)
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    # Let the shell expand $nargs inside one double-quoted message
    # (the old text printed stray '+' signs around the count).
    txt += ' echo "SET_EXE_ENV 1 ==> ERROR Too few arguments: $nargs"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    ## Job-specific part: build pset.cfg (skipped in user-script mode)
    if self.pset is not None:  # CarlosDaniele
        job = common.job_list[nj]
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if self.datasetPath:
            # Standard job: input files, max events and skip events come
            # from wrapper arguments 1..3 and replace the pset placeholders.
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
        else:
            # Generator-like job without input data: optionally substitute
            # first run, seed and vertex seed from wrapper arguments 1..3.
            if self.sourceSeed:
                txt += 'FirstRun=${args[1]}\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'
                txt += 'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
            else:
                txt += '# Copy untouched pset\n'
                txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
            if self.sourceSeed:
                txt += 'Seed=${args[2]}\n'
                txt += 'echo "Seed: <$Seed>"\n'
                txt += 'sed "s#\\<INPUT\\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n'
                if self.sourceSeedVtx:
                    txt += 'VtxSeed=${args[3]}\n'
                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                    txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
                else:
                    txt += 'mv tmp_2.cfg pset.cfg\n'
            else:
                txt += 'mv tmp_1.cfg pset.cfg\n'

    ## Copy additional input-sandbox files into the current directory
    for inbox_file in self.additional_inbox_files:
        relFile = inbox_file.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset is not None:  # CarlosDaniele
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'
        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
904    
def wsBuildExe(self, nj):
    """
    Return the shell fragment that unpacks the user-code tar-ball on the
    worker node and prepends ProdAgentApi to PYTHONPATH.

    If self.tgzNameWithPath is not an existing local file, nothing is
    emitted (empty string). On untar failure the fragment records the exit
    code, removes the OSG working directory when on OSG, and exits.
    'nj' is accepted for interface compatibility and not used.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tgz = os.path.basename(self.tgzNameWithPath)
    lines = [
        'echo "tar xzvf $RUNTIME_AREA/' + tgz + '"\n',
        'tar xzvf $RUNTIME_AREA/' + tgz + '\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n',
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = 50999"\n',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        # ProdAgentApi ships inside the tar-ball; make it importable.
        'echo "Include ProdAgentApi in PYTHONPATH"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=ProdAgentApi\n',
        'else\n',
        ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n',
        'fi\n',
        '\n',
    ]
    return "".join(lines)
952    
def modifySteeringCards(self, nj):
    """
    Hook for rewriting the user's card into the share dir for job 'nj'.

    Not implemented for this job type: does nothing and returns None.
    """
    return None
958    
def executableName(self):
    """
    Return the executable the job wrapper should launch.

    Without a pset (user-script mode) the job runs a shell script, so the
    string "sh " is returned; otherwise self.executable is returned.
    """
    if self.pset is None:  # CarlosDaniele: user-script use case
        return "sh "
    return self.executable
964 slacapra 1.1
def executableArgs(self):
    """
    Return the argument string passed to the executable.

    In user-script mode (no pset) the script gets the job number via the
    wrapper variable $NJob; otherwise the executable is pointed at the
    generated pset.cfg.
    """
    if self.pset is None:  # CarlosDaniele: user-script use case
        return self.scriptExe + " $NJob"
    return " -p pset.cfg"
970 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of local files to put in the JDL input sandbox.

    Contains, in order:
      * the user-code tar-ball, if it exists on disk;
      * the ML files tar-ball (report/dashboard helper scripts), if it has
        been set and exists on disk;
      * the job config file from the work space, when a pset is used.

    'nj' is accepted for interface compatibility and not used.
    """
    inp_box = []
    ## code: MLtgzfile is only assigned by the tar-ball building code, so
    ## tolerate it being missing instead of raising AttributeError
    ml_tgz = getattr(self, 'MLtgzfile', None)
    for tgz in (self.tgzNameWithPath, ml_tgz):
        if tgz and os.path.isfile(tgz):
            inp_box.append(tgz)
    ## config
    if self.pset is not None:  # CarlosDaniele
        inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
    return inp_box
990    
def outputSandbox(self, nj):
    """
    Return the JDL output-sandbox file names for job 'nj'.

    Each declared output file (self.output_file followed by
    self.output_file_sandbox) is decorated with the 1-based job number
    via numberFile_.
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1002    
def prepareSteeringCards(self):
    """
    Hook for initial edits of the user's steering card file.

    Nothing to do for this job type; returns None.
    """
    return None
1008    
def wsRenameOutput(self, nj):
    """
    Return the shell fragment that collects the job's output files.

    For every declared output file (self.output_file followed by
    self.output_file_sandbox) the fragment checks with 'ls' that the file
    exists and copies it to $RUNTIME_AREA under the name produced by
    numberFile_(file, '$NJob'). With the condor_g scheduler on OSG, a missing
    file is replaced by a dummy one. The fragment then returns to
    $RUNTIME_AREA, removes the OSG working directory, and sets the shell
    variable file_list to the numbered names of self.output_file only (the
    files that may be copied to an SE).

    'nj' is accepted for interface compatibility and not used.
    """
    txt = '\n'
    txt += '# directory content\n'
    txt += 'ls \n'

    for fileWithSuffix in (self.output_file + self.output_file_sandbox):
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        txt += '\n'
        txt += '# check output file\n'
        txt += 'ls '+fileWithSuffix+'\n'
        txt += 'ls_result=$?\n'
        txt += 'if [ $ls_result -ne 0 ] ; then\n'
        txt += ' echo "ERROR: Problem with output file"\n'
        if common.scheduler.boss_scheduler_name == 'condor_g':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'else\n'
        txt += ' cp '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
        txt += 'fi\n'

    # Go back to the runtime area once before cleaning up.
    txt += 'cd $RUNTIME_AREA\n'
    ### OLI_DANIELE: remove the private OSG working directory
    txt += 'if [ $middleware == OSG ]; then\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n'
    txt += ' echo "JOB_EXIT_STATUS = 60999"\n'
    txt += ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += 'fi\n'
    txt += '\n'

    ## Only files in self.output_file may possibly be copied to an SE
    file_list = ' '.join([self.numberFile_(name, '$NJob') for name in self.output_file])
    txt += 'file_list="'+file_list+'"\n'

    return txt
1063    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of a file name.

    Examples: 'out.root' -> 'out_<txt>.root', 'a.b.c' -> 'a.b_<txt>.c';
    a name containing no '.' just gets the suffix: 'out' -> 'out_<txt>'.

    @param file: file name to decorate
    @param txt: text to insert (e.g. a job number or the literal '$NJob')
    @return: the decorated file name
    """
    # str methods instead of the Python-2-only string.split() function
    parts = file.split(".")
    if len(parts) > 1:
        # everything before the last '.' is the stem, the rest the extension
        return ".".join(parts[:-1]) + '_' + txt + "." + parts[-1]
    return file + '_' + txt
1081    
def getRequirements(self):
    """
    Return the job requirements expression to add to the JDL files.

    When self.version is set, the expression requires the tag
    'VO-cms-<version>' in other.GlueHostApplicationSoftwareRunTimeEnvironment.
    It always requires (other.GlueHostNetworkAdapterOutboundIP). Terms are
    joined with ' && '.
    """
    terms = []
    if self.version:
        terms.append('Member("VO-cms-' + self.version +
                     '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    terms.append('(other.GlueHostNetworkAdapterOutboundIP)')
    # Joining avoids a dangling leading '&&' when no version is known.
    return ' && '.join(terms)
1095 gutsche 1.3
def configFilename(self):
    """Return the config file name: the job-type name followed by '.cfg'."""
    base = self.name()
    return base + '.cfg'
1099    
1100     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the shell fragment that sets up the CMS environment on an OSG
    worker node.

    cmsset_default.sh is sourced, with self.version as argument, from
    $GRID3_APP_DIR/cmssoft or else from $OSG_APP/cmssoft/cms. If neither
    file exists, the fragment:
      * reports exit code 10020,
      * removes the OSG working directory (reporting 10017 if that fails),
      * exits.
    """
    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    # Do not exit yet: an 'exit 1' here made the working-directory cleanup
    # below unreachable. The single exit follows the cleanup.
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1144    
1145     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the shell fragment that sets up the CMS environment on an LCG
    worker node.

    The fragment checks that $VO_CMS_SW_DIR is set (10031). It then checks
    that $VO_CMS_SW_DIR/cmsset_default.sh exists and is non-empty (10020),
    sources it and checks the result (10032). Finally it picks SCRAM_ARCH
    from /etc/redhat-release, failing with 10033 for an unrecognised OS.
    Every failure reports its code and exits.
    """
    # Shared tail of each failure branch: dump the status, reset the report
    # file with the monitoring IDs, and abort.
    abort = [
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
    ]

    lines = [
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
    ] + abort + [
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
    ] + abort + [
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
    ] + abort + [
        ' fi\n',
        ' fi\n',
        ' \n',
        ' string=`cat /etc/redhat-release`\n',
        ' echo $string\n',
        ' if [[ $string = *alhalla* ]]; then\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n',
        ' export SCRAM_ARCH=slc3_ia32_gcc323\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' else\n',
        ' echo "SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized"\n',
        ' echo "JOB_EXIT_STATUS = 10033"\n',
        ' echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n',
    ] + abort + [
        ' fi\n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    ]
    return ''.join(lines)
1209 gutsche 1.5
def setParam_(self, param, value):
    """Store 'value' under key 'param' in the job-type parameter dict."""
    self._params.update({param: value})
1212    
def getParams(self):
    """Return the job-type parameter dict itself (not a copy)."""
    params = self._params
    return params
1215 gutsche 1.8
def setTaskid_(self):
    """Copy cfg_params['taskId'] onto the instance as _taskId."""
    task_id = self.cfg_params['taskId']
    self._taskId = task_id
1218    
def getTaskid(self):
    """Return the task id stored by setTaskid_ (AttributeError if never set)."""
    return getattr(self, '_taskId')
1221 gutsche 1.35
1222     #######################################################################
def uniquelist(self, old):
    """
    Remove duplicates from a sequence.

    @param old: iterable of hashable items
    @return: a new list holding each distinct item once. The order follows
             dict key order: first occurrence on Python 3.7+, unspecified on
             older interpreters.
    """
    # list(...) so callers always get a list; on Python 3 dict.keys()
    # would be a view object, not a list.
    return list(dict.fromkeys(old))