ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.57
Committed: Fri Dec 29 09:25:31 2006 UTC (18 years, 4 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.56: +45 -14 lines
Log Message:
added "first run" parameter management for pythia job

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 slacapra 1.22 import math
6 slacapra 1.1 import common
7 gutsche 1.3 import PsetManipulator
8 slacapra 1.1
9 slacapra 1.41 import DBSInfo
10     import DataDiscovery
11     import DataLocation
12 slacapra 1.1 import Scram
13    
14 corvo 1.56 import glob, os, string, re, shutil
15 slacapra 1.1
16     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure a CMSSW job type from the CRAB configuration.

    Reads the CMSSW.* and USER.* keys of cfg_params, contacts DBS/DLS
    when a dataset is given, builds the user tar-ball, splits the task
    into jobs and writes the modified parameter-set.

    cfg_params: CRAB configuration mapping ('SECTION.key' -> value).
    ncjobs:     number of jobs requested to be created ('all' or an int);
                it limits the splitting.

    Raises CrabException on any configuration or PSet error.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    # Marco.
    self._params = {}
    self.cfg_params = cfg_params

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''          # script use case Da
    self.datasetPath = ''   # script use case Da

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards
    try:
        tmp = cfg_params['CMSSW.datasetpath']
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    # 'none' (any case) means: no input dataset (pythia/script use case)
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        self.setParam_('dataset', datasetpath_split[1])
        self.setParam_('owner', datasetpath_split[-1])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    ## parameter-set: 'none' (any case) means a pure script job
    try:
        self.pset = cfg_params['CMSSW.pset']
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none' :
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    try:
        tmp = cfg_params['CMSSW.output_file']
    except KeyError:
        tmp = ''
    for outFile in tmp.split(','):
        outFile = outFile.strip()
        # skip empty entries (e.g. produced by a trailing comma)
        if outFile:
            self.output_file.append(outFile)
    if self.output_file:
        log.debug(7, 'cmssw::cmssw(): output files '+str(self.output_file))
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
    except KeyError:
        self.scriptExe = ''
    if self.scriptExe != '':
        if not os.path.isfile(self.scriptExe):
            msg ="WARNING. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())
    #CarlosDaniele: without dataset and without pset a user script is mandatory
    if self.datasetPath is None and self.pset is None and self.scriptExe == '' :
        msg ="WARNING. script_exe not defined"
        raise CrabException(msg)

    ## additional input files (comma separated, glob patterns allowed;
    ## relative names are resolved against the current directory)
    try:
        tmpAddFiles = cfg_params['USER.additional_input_files'].split(',')
    except KeyError:
        tmpAddFiles = []
    for tmp in tmpAddFiles:
        tmp = tmp.strip()
        # ignore empty entries ("a, ,b" or a trailing comma); indexing
        # tmp[0] below would otherwise raise IndexError
        if not tmp:
            continue
        dirname = ''
        if not tmp.startswith("/"): dirname = "."
        files = glob.glob(os.path.join(dirname, tmp))
        # glob only returns existing paths: an unmatched name must be
        # reported here, otherwise the file is silently left out
        if not files:
            raise CrabException("Additional input file not found: "+tmp)
        for addFile in files:
            self.additional_inbox_files.append(addFile.strip())
    common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    # files per job
    try:
        if (cfg_params['CMSSW.files_per_jobs']):
            raise CrabException("files_per_jobs no longer supported. Quitting.")
    except KeyError:
        pass

    ## Events per job
    try:
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events
    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None: #CarlosDaniele
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")

    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5,"No first run given")

    if self.pset is not None: #CarlosDaniele
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[] # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None: #CarlosDaniele
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None: #CarlosDaniele
        try:
            if (self.datasetPath): # standard job
                # allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else: # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if (self.firstRun):
                    self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN") #First Run
                if (self.sourceSeed) :
                    self.PsetEdit.pythiaSeed("INPUT")
                    if (self.sourceSeedVtx) :
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except Exception:
            # keep the underlying cause in the message instead of hiding it
            # (the former bare except also swallowed KeyboardInterrupt)
            import sys
            msg='Error while manipuliating ParameterSet: exiting... ('+str(sys.exc_info()[1])+')'
            raise CrabException(msg)
267 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Query DBS for the content of self.datasetPath and DLS for its location.

    Sets self.pubdata, self.filesbyblock, self.eventsbyblock,
    self.eventsbyfile and self.maxEvents.

    Returns the mapping from dataloc.getSites(); jobSplittingByBlocks
    uses it as {fileblock: [hosting sites]}.

    Raises CrabException when the DBS or DLS query fails.
    """
    import sys

    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath=self.datasetPath

    ## TODO: data-tier selection is not implemented; the empty string is
    ## split into [''] and handed to DataDiscovery as is.
    dataTiersList = ""
    dataTiers = dataTiersList.split(',')

    ## Contact the DBS
    common.logger.message("Contacting DBS...")
    try:
        self.pubdata=DataDiscovery.DataDiscovery(datasetPath, dataTiers, cfg_params)
        self.pubdata.fetchDBSInfo()
    except (DataDiscovery.NotExistingDatasetError, DataDiscovery.NoDataTierinProvenanceError):
        # sys.exc_info() keeps this valid on both Python 2 and 3
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
        raise CrabException(msg)
    except DataDiscovery.DataDiscoveryError:
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Discovery in DBS %s'%ex.getErrorMessage()
        raise CrabException(msg)

    ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
    common.logger.message("Required data are :"+self.datasetPath)

    self.filesbyblock=self.pubdata.getFiles()
    self.eventsbyblock=self.pubdata.getEventsPerBlock()
    self.eventsbyfile=self.pubdata.getEventsPerFile()

    ## get max number of events
    self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
    common.logger.message("The number of available events is %s\n"%self.maxEvents)

    common.logger.message("Contacting DLS...")
    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError:
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
        raise CrabException(msg)

    sites = dataloc.getSites()
    # flatten the per-block site lists, then drop duplicates
    allSites = []
    for siteList in sites.values():
        allSites.extend(siteList)
    allSites = self.uniquelist(allSites)

    common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
    common.logger.debug(6, "List of Sites: "+str(allSites))
    return sites
331 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs (and self.ncjobs) - Total # of jobs
          self.list_of_args - per job [file list string, max events ('-1' = all), events to skip]
    RAISES: CrabException if the configuration does not determine both the
            total number of events and the number of events per job.
    """
    # ---- Handle the possible job splitting configurations ---- #
    totalEventsRequested = None
    eventsPerJobRequested = None
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        # events_per_job + number_of_jobs fixes the total
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    if totalEventsRequested is None:
        # e.g. script mode with a dataset and only number_of_jobs given
        msg = 'Splitting by blocks needs two of total_number_of_events, events_per_job and number_of_jobs.'
        raise CrabException(msg)

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    if eventsPerJobRequested is None:
        msg = 'Splitting by blocks needs two of total_number_of_events, events_per_job and number_of_jobs.'
        raise CrabException(msg)

    # A chunk size < 1 (e.g. fewer events than requested jobs) would make the
    # file loop below emit zero-event jobs without ever advancing.
    if eventsPerJobRequested < 1:
        eventsPerJobRequested = 1

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    # list() keeps positional indexing valid whatever keys() returns
    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        # file list is built as \{\"f1\"\,\"f2\"\, ... and closed with \}
        parString = "\\{"
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            dataFile = files[fileCount]
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[dataFile]
                except KeyError:
                    common.logger.message("File "+str(dataFile)+" has unknown number of events: skipping")
                else:
                    common.logger.debug(6, "File "+str(dataFile)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += '\\"' + dataFile + '\\"\\,'
                    newFile = 0

            eventsInJob = filesEventCount - jobSkipEventCount

            # if less events in file remain than eventsPerJobRequested
            if ( eventsInJob < eventsPerJobRequested ) :
                # if last file in block
                if ( fileCount == numFilesInBlock-1 ) :
                    # end job using last file, use remaining events in block;
                    # but never emit a job without files (possible when the
                    # last file has an unknown number of events)
                    if parString != "\\{":
                        fullString = parString[:-2] + '\\}'
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsInJob)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsInJob
                        eventsRemaining = eventsRemaining - eventsInJob
                    # reset counters and file list, touch new file
                    jobSkipEventCount = 0
                    parString = "\\{"
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else :
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( eventsInJob == eventsPerJobRequested ) :
                # close job and touch new file
                fullString = parString[:-2] + '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                parString = "\\{"
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                fullString = parString[:-2] + '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # events of the current (last) file already used by this and
                # previous jobs become the skip count of the next job
                lastFileEvents = self.eventsbyfile[dataFile]
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - lastFileEvents)
                # remove all but the last file
                filesEventCount = lastFileEvents
                parString = "\\{" + '\\"' + dataFile + '\\"\\,'
        # END while (iterate over files in the block)
    # END while (iterate over blocks in the dataset)

    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
509    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job, for jobs
    without an input dataset (pythia-like generation).

    Exactly two of total_number_of_events / events_per_job /
    number_of_jobs are expected (validated in __init__).

    SETS: self.total_number_of_jobs, self.eventsPerJob (when derived),
          self.total_number_of_events (when derived), self.jobDestination,
          self.list_of_args - per job [run] or [run, seed] or
          [run, seed, vtxSeed]. run is str(firstRun)+str(i) (or str(i)
          without a first run); each seed is str(seed)+str(i).
    RAISES: CrabException if total_number_of_events is negative.
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob and self.selectNumberOfJobs):
        # total_number_of_events was not given (it is 0): derive it instead
        # of computing int(0/eventsPerJob) == 0 jobs
        self.total_number_of_jobs = self.theNumberOfJobs
        self.total_number_of_events = self.theNumberOfJobs * self.eventsPerJob
    elif (self.selectEventsPerJob):
        self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
    elif (self.selectNumberOfJobs) :
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # arguments are "<value><job index>" strings
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        jobIndex = str(i)
        # a firstRun of 0 is treated as "not given", as in __init__
        if (self.firstRun):
            ## pythia first run
            args = [str(self.firstRun)+jobIndex]
        else:
            ## no first run
            args = [jobIndex]
        if (self.sourceSeed):
            ## pythia random seed
            args.append(str(self.sourceSeed)+jobIndex)
            # NOTE(review): the vertex seed is only passed together with
            # the pythia seed, matching the PSet editing in __init__
            if (self.sourceSeedVtx):
                args.append(str(self.sourceSeedVtx)+jobIndex)
        self.list_of_args.append(args)

    return
578    
579 spiga 1.42
def jobSplittingForScript(self):#CarlosDaniele
    """
    Split a script-only task (no dataset, no parameter-set) into
    self.theNumberOfJobs jobs.

    Each job gets its index (as a string) as sole argument and an empty
    destination, since no input data constrains the site.
    """
    nJobs = self.theNumberOfJobs
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(nJobs)+' jobs in total ')

    self.total_number_of_jobs = nJobs
    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))
    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    # one argument per job (its index); no site preference
    self.list_of_args = [[str(jobIndex)] for jobIndex in range(nJobs)]
    self.jobDestination.extend([[""] for jobIndex in range(nJobs)])
    return
602    
def split(self, jobParams):
    """
    Register the per-job arguments and destinations in common.jobDB.

    jobParams is extended in place with total_number_of_jobs entries.
    Its first total_number_of_jobs slots are then set to the matching
    element of self.list_of_args.
    """
    common.jobDB.load()
    #### Fabio
    nJobs = self.total_number_of_jobs
    argList = self.list_of_args

    # create the empty structure
    jobParams.extend([""] * nJobs)

    for jobIndex in range(nJobs):
        jobParams[jobIndex] = argList[jobIndex]
        common.jobDB.setArguments(jobIndex, jobParams[jobIndex])
        destination = self.jobDestination[jobIndex]
        common.logger.debug(5,"Job "+str(jobIndex)+" Destination: "+str(destination))
        common.jobDB.setDestination(jobIndex, destination)

    common.jobDB.save()
    return
623    
def getJobTypeArguments(self, nj, sched):
    """
    Return the stored arguments of job nj as a single string.

    Each argument is followed by one space, including the last one.
    sched is not used.
    """
    return ''.join([str(arg) + " " for arg in common.jobDB.arguments(nj)])
629 gutsche 1.3
def numberOfJobs(self):
    """Return the number of jobs computed by the splitting step (Fabio)."""
    nJobs = self.total_number_of_jobs
    return nJobs
633    
def getTarBall(self, exe):
    """
    Return the path of the tar-ball with user libraries and executable.

    The path is '<work_space.pathForTgz()>share/<tgz_name>' (relative
    paths for the Boss XML files, Marco) and is also stored in
    self.tgzNameWithPath. If the file does not exist yet, it is built
    by buildTar_(exe), and the returned path is whitespace-stripped.
    """
    self.tgzNameWithPath = common.work_space.pathForTgz() + 'share/' + self.tgz_name

    if not os.path.exists(self.tgzNameWithPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return string.strip(self.tgzNameWithPath)

    # already there: reuse it
    return self.tgzNameWithPath
651    
def buildTar_(self, executable):
    """
    Create the gzipped tar-ball self.tgzNameWithPath from the user's
    local SCRAM area.

    Contents (all relative to the SCRAM area):
      - the executable, if findFile_ locates it outside the release top;
      - lib/, module/ and src/Data/ when present;
      - ProdAgentApi/, copied first from $CRABDIR;
      - a fixed set of CRAB helper scripts copied from $CRABDIR/python.

    Does nothing when the release top is unknown ('') or equals the
    working area.

    Raises CrabException if the executable cannot be found or the tar
    command produces no output. Raises KeyError if CRABDIR is not set
    in the environment.
    """
    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    filesToBeTarred = []
    ## First find the executable
    if (self.executable != ''):
        exeWithPath = self.scram.findFile_(executable)
        if ( not exeWithPath ):
            raise CrabException('User executable '+executable+' not found')

        ## then check if it's private or not
        if exeWithPath.find(swReleaseTop) == -1:
            # the exe is private, so we must ship it
            common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
            exe = exeWithPath.replace(swArea+'/', '')
            filesToBeTarred.append(exe)
        # otherwise the exe is from the release: it will be found on the WN

    ## Now get the libraries: only those in local working area
    libDir = 'lib'
    lib = swArea+'/' +libDir
    common.logger.debug(5,"lib "+lib+" to be tarred")
    if os.path.exists(lib):
        filesToBeTarred.append(libDir)

    ## Now check if module dir is present
    moduleDir = 'module'
    if os.path.isdir(swArea + '/' + moduleDir):
        filesToBeTarred.append(moduleDir)

    ## Now check if the Data dir is present
    dataDir = 'src/Data/'
    if os.path.isdir(swArea + '/' + dataDir):
        filesToBeTarred.append(dataDir)

    crabDir = os.environ['CRABDIR']

    ## copy ProdAgent dir to swArea ('\cp' bypasses a possible cp alias)
    paDir = 'ProdAgentApi'
    cmd = '\\cp -rf ' + crabDir + '/' + paDir + ' ' + swArea
    cmd_out = runCommand(cmd)
    if cmd_out != '':
        common.logger.message('ProdAgentApi directory could not be copied to local CMSSW project directory.')
        common.logger.message('No FrameworkJobreport parsing is possible on the WorkerNode.')

    ## Now check if the ProdAgentApi dir is present in the SCRAM area
    if os.path.isdir(swArea + '/' + paDir):
        filesToBeTarred.append(paDir)

    ## ship the CRAB helper scripts
    for helperScript in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
        shutil.copyfile(crabDir+'/python/'+helperScript, swArea+'/'+helperScript)
        filesToBeTarred.append(helperScript)

    ## Create the tar-ball
    if len(filesToBeTarred)>0:
        tarcmd = 'tar zcvf ' + self.tgzNameWithPath + ' -C ' + swArea + ' ' + ' '.join(filesToBeTarred) + ' '
        cout = runCommand(tarcmd)
        # 'v' makes tar list the archived files, so empty output means failure
        if not cout:
            raise CrabException('Could not create tar-ball')
    else:
        common.logger.debug(5,"No files to be to be tarred")

    return
734    
def wsSetupEnvironment(self, nj):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The generated shell code:
      - sets up the CMS environment for the running middleware (LCG, or
        OSG inside a freshly created scratch directory);
      - runs '<scram> project CMSSW <self.version>', enters the project
        and evaluates its runtime environment;
      - aborts when the wrapper received fewer than 2 arguments ($nargs);
      - for CMSSW jobs (self.pset is not None) writes pset.cfg from the
        shipped configuration, substituting the per-job arguments
        ${args[1..3]};
      - copies the additional inbox files from $RUNTIME_AREA.

    Every fatal branch publishes its exit code through $RUNTIME_AREA/$repo
    and exits with status 1.

    @param nj: index of the job in common.job_list (only read when
               self.pset is not None).
    @return: the shell code as a string.
    """

    def exitReport(code):
        # Shared tail of every fatal branch: publish the exit code, dump the
        # report file, then re-seed it with the monitoring identifiers.
        return (' echo "JOB_EXIT_STATUS = %d"\n' % code +
                ' echo "JobExitCode=%d" | tee -a $RUNTIME_AREA/$repo\n' % code +
                ' dumpStatus $RUNTIME_AREA/$repo\n' +
                ' rm -f $RUNTIME_AREA/$repo \n' +
                ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n' +
                ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n')

    def osgCleanup(code, reason):
        # On OSG remove the scratch working directory before exiting; if it
        # survives the removal, report `code` with the message `reason`.
        return (' if [ $middleware == OSG ]; then \n' +
                ' echo "Remove working directory: $WORKING_DIR"\n' +
                ' cd $RUNTIME_AREA\n' +
                ' /bin/rm -rf $WORKING_DIR\n' +
                ' if [ -d $WORKING_DIR ] ;then\n' +
                ' echo "' + reason + '"\n' +
                exitReport(code) +
                ' fi\n' +
                ' fi \n')

    # Prepare JobType-independent part ($middleware is already known here)
    txt = ''
    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += exitReport(10016)
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += exitReport(10034)
    # report the version actually requested (was hard-coded CMSSW_0_6_1)
    txt += osgCleanup(10018, 'SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`')
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    # expand $nargs inside the message (the old Python-style '+$nargs+'
    # concatenation was printed literally by the shell)
    txt += ' echo "SET_EXE_ENV 1 ==> ERROR Too few arguments: $nargs"\n'
    txt += exitReport(50113)
    txt += osgCleanup(50114, 'SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper')
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part: build pset.cfg for CMSSW jobs
    if self.pset is not None: #CarlosDaniele
        job = common.job_list[nj]
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if self.datasetPath: # standard job
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
        elif self.sourceSeed: # pythia like job with seed(s) and first run
            txt += 'FirstRun=${args[1]}\n'
            txt += 'echo "FirstRun: <$FirstRun>"\n'
            # '\\<' / '\\>' emit sed's word-boundary anchors \< and \>
            txt += 'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
            txt += 'Seed=${args[2]}\n'
            txt += 'echo "Seed: <$Seed>"\n'
            txt += 'sed "s#\\<INPUT\\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n'
            if self.sourceSeedVtx:
                txt += 'VtxSeed=${args[3]}\n'
                txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
            else:
                txt += 'mv tmp_2.cfg pset.cfg\n'
        else: # pythia like job without seeds
            txt += '# Copy untouched pset\n'
            txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
            txt += 'mv tmp_1.cfg pset.cfg\n'

    for inboxFile in self.additional_inbox_files:
        relFile = inboxFile.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset is not None: #CarlosDaniele
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'
        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
898    
def wsBuildExe(self, nj):
    """
    Put in the script the commands to build an executable
    or a library.

    When the code tarball self.tgzNameWithPath exists locally, the
    generated code unpacks its basename from $RUNTIME_AREA and prepends
    ProdAgentApi to PYTHONPATH. On an untar failure it reports the tar
    exit status through $RUNTIME_AREA/$repo (like every other fatal path,
    including dumpStatus), cleans the OSG working directory and exits 1.
    Returns an empty string when there is no tarball.

    @param nj: job index (unused).
    """
    txt = ""

    if os.path.isfile(self.tgzNameWithPath):
        tgzName = os.path.basename(self.tgzNameWithPath)
        txt += 'echo "tar xzvf $RUNTIME_AREA/'+tgzName+'"\n'
        txt += 'tar xzvf $RUNTIME_AREA/'+tgzName+'\n'
        txt += 'untar_status=$? \n'
        txt += 'if [ $untar_status -ne 0 ]; then \n'
        txt += ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n'
        txt += ' echo "JOB_EXIT_STATUS = $untar_status" \n'
        txt += ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n'
        # dump the failure report, as all other fatal branches do, then
        # re-seed the report with the monitoring identifiers
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        txt += ' rm -f $RUNTIME_AREA/$repo \n'
        txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
        txt += ' if [ $middleware == OSG ]; then \n'
        txt += ' echo "Remove working directory: $WORKING_DIR"\n'
        txt += ' cd $RUNTIME_AREA\n'
        txt += ' /bin/rm -rf $WORKING_DIR\n'
        txt += ' if [ -d $WORKING_DIR ] ;then\n'
        txt += ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n'
        txt += ' echo "JOB_EXIT_STATUS = 50999"\n'
        txt += ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        txt += ' rm -f $RUNTIME_AREA/$repo \n'
        txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
        txt += ' fi\n'
        txt += ' fi \n'
        txt += ' \n'
        txt += ' exit 1 \n'
        txt += 'else \n'
        txt += ' echo "Successful untar" \n'
        txt += 'fi \n'
        txt += '\n'
        # ProdAgentApi is shipped inside the tarball (relative path)
        txt += 'echo "Include ProdAgentApi in PYTHONPATH"\n'
        txt += 'if [ -z "$PYTHONPATH" ]; then\n'
        txt += ' export PYTHONPATH=ProdAgentApi\n'
        txt += 'else\n'
        txt += ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n'
        txt += 'fi\n'
        txt += '\n'

    return txt
946    
def modifySteeringCards(self, nj):
    """
    Hook for modifying the user-provided card of job 'nj' and writing the
    new card into the share directory.

    This implementation performs no modification and returns None.
    """
    return None
952    
def executableName(self):
    """
    Return the executable to put in the job wrapper.

    Without a CMSSW parameter set (self.pset is None) the user script
    given by executableArgs() is run through "sh "; otherwise the
    configured self.executable is returned.
    """
    if self.pset is None: #CarlosDaniele
        return "sh "
    return self.executable
958 slacapra 1.1
def executableArgs(self):
    """
    Return the argument string for the executable from executableName().

    Script mode (self.pset is None): the user script followed by the job
    number placeholder $NJob. CMSSW mode: point the executable at the
    pset.cfg generated by the job wrapper.
    """
    if self.pset is None: #CarlosDaniele
        return self.scriptExe + " $NJob"
    return " -p pset.cfg"
964 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    Contains the code tarball (if it exists locally) and, for CMSSW jobs
    (self.pset is not None), the job configuration file from the
    work space 'job/' directory.

    @param nj: job index (unused).
    """
    inp_box = []
    ## code
    if os.path.isfile(self.tgzNameWithPath):
        inp_box.append(self.tgzNameWithPath)
    ## config
    if self.pset is not None: #CarlosDaniele
        inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
    # NOTE(review): self.additional_inbox_files are not added here;
    # presumably they reach the worker node another way - confirm.
    return inp_box
982    
def outputSandbox(self, nj):
    """
    Build the JDL output-sandbox file list for job 'nj'.

    Every declared output (self.output_file followed by
    self.output_file_sandbox) is suffixed with the 1-based job number
    nj + 1 through numberFile_.
    """
    jobNumber = str(nj + 1)
    return [self.numberFile_(name, jobNumber)
            for name in self.output_file + self.output_file_sandbox]
994    
def prepareSteeringCards(self):
    """
    Hook for initial modifications of the user's steering card file.

    This implementation performs no modification and returns None.
    """
    return None
1000    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For each declared output (self.output_file + self.output_file_sandbox)
    the generated code copies the file to $RUNTIME_AREA under its
    numberFile_(name, '$NJob') name. When a file is missing it prints an
    error and, with the condor_g scheduler on OSG, writes a dummy file in
    its place. It then returns to $RUNTIME_AREA, removes the OSG working
    directory (reporting 60999 if that fails), and sets $file_list to the
    numbered names of self.output_file only (the candidates for SE copy).

    @param nj: job index (unused; numbering uses the $NJob shell variable).
    """
    txt = '\n'
    txt += '# directory content\n'
    txt += 'ls \n'

    for fileWithSuffix in (self.output_file+self.output_file_sandbox):
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        txt += '\n'
        txt += '# check output file\n'
        txt += 'ls '+fileWithSuffix+'\n'
        txt += 'ls_result=$?\n'
        txt += 'if [ $ls_result -ne 0 ] ; then\n'
        txt += ' echo "ERROR: Problem with output file"\n'
        if common.scheduler.boss_scheduler_name == 'condor_g':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'else\n'
        txt += ' cp '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
        txt += 'fi\n'

    # a single cd is enough (the command used to be emitted twice)
    txt += 'cd $RUNTIME_AREA\n'
    ### OLI_DANIELE
    txt += 'if [ $middleware == OSG ]; then\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n'
    txt += ' echo "JOB_EXIT_STATUS = 60999"\n'
    txt += ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += 'fi\n'
    txt += '\n'

    ## Add to filelist only files to be possibly copied to SE
    file_list = ' '.join([self.numberFile_(name, '$NJob')
                          for name in self.output_file])
    txt += 'file_list="'+file_list+'"\n'

    return txt
1055    
def numberFile_(self, file, txt):
    """
    append _'txt' before last extension of a file

    e.g. numberFile_('out.root', '3') -> 'out_3.root'
         numberFile_('a.b.c', 'N')    -> 'a.b_N.c'
         numberFile_('noext', '3')    -> 'noext_3'

    Uses the str.split method rather than the Python-2-only
    string.split module function.
    """
    parts = file.split('.')
    if len(parts) > 1:
        # everything before the last dot keeps its inner dots
        return '.'.join(parts[:-1]) + '_' + txt + '.' + parts[-1]
    return file + '_' + txt
1073    
def getRequirements(self):
    """
    return job requirements to add to jdl files

    The requirement is the ' && ' conjunction of:
      - Member("VO-cms-<version>", ...RunTimeEnvironment), only when
        self.version is set;
      - (other.GlueHostNetworkAdapterOutboundIP), always.
    Joining the clauses avoids an expression that starts with a dangling
    ' && ' when no version is known.
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' +
                       self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    return ' && '.join(clauses)
1087 gutsche 1.3
def configFilename(self):
    """Return the job configuration file name: the job type name plus '.cfg'."""
    return ''.join((self.name(), '.cfg'))
1091    
### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns part of a job script which prepares
    the execution environment and which is common for all CMS jobs (OSG).

    The generated code sources cmsset_default.sh (passing self.version)
    from $GRID3_APP_DIR/cmssoft or, failing that, from
    $OSG_APP/cmssoft/cms. If neither file exists it reports exit code
    10020, removes the OSG working directory (reporting 10017 if that
    removal fails) and then exits with status 1.
    """
    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    # Do not exit yet: the working-directory cleanup below must run first;
    # the single 'exit 1' follows it.
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1136    
### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Build the shell fragment that prepares the CMS environment on an LCG
    worker node; it is common to all CMS jobs.

    The generated code checks $VO_CMS_SW_DIR, sources its
    cmsset_default.sh and sets SCRAM_ARCH from /etc/redhat-release.
    Each failure (codes 10031, 10020, 10032, 10033) prints a SET_CMS_ENV
    error, records the code in $RUNTIME_AREA/$repo, dumps and re-seeds
    that report with the monitoring ids, and exits with status 1.

    @return: the shell code as a single string.
    """
    def abort(code, message, statusTail=''):
        # Lines of one fatal branch; statusTail reproduces the trailing
        # blank after the JOB_EXIT_STATUS echo of the first branch.
        return [' echo "%s"\n' % message,
                ' echo "JOB_EXIT_STATUS = %d"%s\n' % (code, statusTail),
                ' echo "JobExitCode=%d" | tee -a $RUNTIME_AREA/$repo\n' % code,
                ' dumpStatus $RUNTIME_AREA/$repo\n',
                ' rm -f $RUNTIME_AREA/$repo \n',
                ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
                ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
                ' exit 1\n']

    lines = [' \n',
             ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
             ' if [ ! $VO_CMS_SW_DIR ] ;then\n']
    lines += abort(10031, 'SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`', ' ')
    lines += [' else\n',
              ' echo "Sourcing environment... "\n',
              ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n']
    lines += abort(10020, 'SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR')
    lines += [' fi\n',
              ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
              ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
              ' result=$?\n',
              ' if [ $result -ne 0 ]; then\n']
    lines += abort(10032, 'SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh')
    lines += [' fi\n',
              ' fi\n',
              ' \n',
              ' string=`cat /etc/redhat-release`\n',
              ' echo $string\n',
              ' if [[ $string = *alhalla* ]]; then\n',
              ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
              ' elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n',
              ' export SCRAM_ARCH=slc3_ia32_gcc323\n',
              ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
              ' else\n']
    lines += abort(10033, 'SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized')
    lines += [' fi\n',
              ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
              ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n']
    return ''.join(lines)
1201 gutsche 1.5
def setParam_(self, param, value):
    """Store `value` under key `param` in this job type's parameter dict."""
    params = self._params
    params[param] = value
1204    
def getParams(self):
    """Return the parameter dict filled by setParam_ (the object itself, not a copy)."""
    params = self._params
    return params
1207 gutsche 1.8
def setTaskid_(self):
    """Copy cfg_params['taskId'] onto the instance as _taskId (KeyError if absent)."""
    taskId = self.cfg_params['taskId']
    self._taskId = taskId
1210    
def getTaskid(self):
    """Return the task id stored by setTaskid_ (AttributeError if never set)."""
    taskId = self._taskId
    return taskId
1213 gutsche 1.35
#######################################################################
def uniquelist(self, old):
    """
    remove duplicates from a list

    Returns a new list with each distinct (hashable) element of `old`
    exactly once, in order of first occurrence. Building the list
    explicitly keeps the result a real list (dict.keys() is a view on
    Python 3) and makes the order deterministic.
    """
    seen = {}
    unique = []
    for e in old:
        if e not in seen:
            seen[e] = 1
            unique.append(e)
    return unique