ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.62
Committed: Wed Jan 17 13:37:25 2007 UTC (18 years, 3 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.61: +0 -7 lines
Log Message:
cleanup

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 slacapra 1.22 import math
6 slacapra 1.1 import common
7 gutsche 1.3 import PsetManipulator
8 slacapra 1.1
9 slacapra 1.41 import DBSInfo
10     import DataDiscovery
11     import DataLocation
12 slacapra 1.1 import Scram
13    
14 corvo 1.56 import glob, os, string, re, shutil
15 slacapra 1.1
16     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure the CMSSW job type from the user configuration.

    cfg_params: configuration mapping; the 'CMSSW.*', 'USER.*' and
                'taskId' entries are read here.
    ncjobs:     number of jobs requested to be created ('all' or a
                number); it limits the job splitting.

    Besides reading the configuration, this queries DBS/DLS (when a
    dataset is given), builds the user tar-ball, performs the job
    splitting and writes the edited ParameterSet.

    Raises CrabException for missing or inconsistent parameters, missing
    files, or failures while editing the ParameterSet.
    """
    import sys

    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    # Marco.
    self._params = {}
    self.cfg_params = cfg_params

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''          # None later means "script use case" (no PSet)
    self.datasetPath = ''   # None later means "no input dataset"

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards
    try:
        tmp = cfg_params['CMSSW.datasetpath']
    except KeyError:
        raise CrabException("Error: datasetpath not defined ")
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        self.setParam_('dataset', datasetpath_split[1])
        self.setParam_('owner', datasetpath_split[-1])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        log.debug(3,"User executable not defined. Use cmsRun")
    else:
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        log.debug(3,"Default executable cmsRun overridden. Switch to " + self.executable)

    ## ParameterSet: 'none' selects the script use case
    try:
        self.pset = cfg_params['CMSSW.pset']
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox; the framework
    ## job report is always among them
    self.output_file_sandbox = [self.fjrFileName]

    # other output files to be returned via sandbox or copied to SE;
    # empty entries (e.g. from a trailing comma) are ignored
    self.output_file = []
    try:
        outFiles = cfg_params['CMSSW.output_file']
    except KeyError:
        outFiles = ''
    for outFile in outFiles.split(','):
        outFile = outFile.strip()
        if outFile:
            self.output_file.append(outFile)
    if self.output_file:
        log.debug(7, 'cmssw::cmssw(): output files '+str(self.output_file))
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
    except KeyError:
        self.scriptExe = ''
    if self.scriptExe != '':
        if not os.path.isfile(self.scriptExe):
            raise CrabException("WARNING. file "+self.scriptExe+" not found")
        self.additional_inbox_files.append(self.scriptExe.strip())

    # without dataset and PSet the user script is the only thing to run
    if self.datasetPath is None and self.pset is None and self.scriptExe == '':
        raise CrabException("WARNING. script_exe not defined")

    ## additional input files: each one is copied into the task share
    ## directory (under its base name) and shipped from there
    try:
        addFiles = cfg_params['USER.additional_input_files']
    except KeyError:
        addFiles = None
    if addFiles is not None:
        for addFile in addFiles.split(','):
            addFile = addFile.strip()
            if not addFile:
                continue
            # check the source first, so a typo gives a clear CRAB error
            if not os.path.isfile(addFile):
                raise CrabException("Additional input file not found: "+addFile)
            # use the base name: an absolute or nested path cannot be
            # appended to the share directory path
            baseName = os.path.basename(addFile)
            shutil.copyfile(addFile, common.work_space.shareDir()+baseName)
            files = common.work_space.pathForTgz()+'share/'+baseName
            if not os.path.exists(files):
                raise CrabException("Additional input file not found: "+files)
            self.additional_inbox_files.append(files)
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    # files per job
    try:
        if cfg_params['CMSSW.files_per_jobs']:
            raise CrabException("files_per_jobs no longer supported. Quitting.")
    except KeyError:
        pass

    ## Events per job
    try:
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events
    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            raise CrabException('Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.')
    else:
        # script use case: only the number of jobs matters
        if self.selectNumberOfJobs == 0:
            raise CrabException('Must specify number_of_jobs.')

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")

    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5,"No first run given")

    if self.pset is not None:
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset)

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0         # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}         # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]   # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset (after splitting: jobSplittingNoInput may set eventsPerJob)
    if self.pset is not None:
        try:
            if self.datasetPath: # standard job
                # allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else: # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if self.firstRun:
                    self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN") #First Run
                if self.sourceSeed:
                    self.PsetEdit.pythiaSeed("INPUT")
                    if self.sourceSeedVtx:
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except Exception:
            # keep the underlying reason in the message instead of hiding it
            ex = sys.exc_info()[1]
            raise CrabException('Error while manipulating ParameterSet: exiting... ('+str(ex)+')')
267 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Query DBS for the content of self.datasetPath, then query DLS for the
    sites hosting each of its file blocks.

    Sets self.pubdata, self.filesbyblock, self.eventsbyblock,
    self.eventsbyfile and self.maxEvents.

    Returns the block -> sites mapping from DataLocation.getSites().
    Raises CrabException when the DBS or DLS query fails.
    """
    import sys

    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath=self.datasetPath

    ## TODO: data tiers are not configurable yet (this yields [''])
    dataTiersList = ""
    dataTiers = dataTiersList.split(',')

    ## Contact the DBS
    common.logger.message("Contacting DBS...")
    try:
        self.pubdata=DataDiscovery.DataDiscovery(datasetPath, dataTiers, cfg_params)
        self.pubdata.fetchDBSInfo()
    # the specific errors are listed before the generic DataDiscoveryError
    except (DataDiscovery.NotExistingDatasetError,
            DataDiscovery.NoDataTierinProvenanceError):
        ex = sys.exc_info()[1]
        raise CrabException('ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage())
    except DataDiscovery.DataDiscoveryError:
        ex = sys.exc_info()[1]
        raise CrabException('ERROR ***: failed Data Discovery in DBS %s'%ex.getErrorMessage())

    common.logger.message("Required data are :"+self.datasetPath)

    self.filesbyblock=self.pubdata.getFiles()
    self.eventsbyblock=self.pubdata.getEventsPerBlock()
    self.eventsbyfile=self.pubdata.getEventsPerFile()

    ## get max number of events
    self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
    common.logger.message("The number of available events is %s\n"%self.maxEvents)

    ## Contact the DLS and build a list of sites hosting the fileblocks
    common.logger.message("Contacting DLS...")
    try:
        dataloc=DataLocation.DataLocation(list(self.filesbyblock.keys()),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError:
        ex = sys.exc_info()[1]
        raise CrabException('ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage())

    sites = dataloc.getSites()

    # flatten the per-block site lists (without shadowing the builtin list)
    allSites = []
    for blockSiteList in sites.values():
        allSites.extend(blockSiteList)
    allSites = self.uniquelist(allSites)

    common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
    common.logger.debug(6, "List of Sites: "+str(allSites))
    return sites
331 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs (and self.ncjobs) - Total # of jobs
          self.list_of_args - [file list, max events, events to skip] for each job
    RAISES: CrabException if number_of_jobs or the events per job is not positive.

    Each job asks for min(events per job, events still requested) events,
    so the requested total is never exceeded. A job whose block runs out of
    files before the job is full is closed with max events -1, meaning
    "run over whatever remains in its files".
    """
    def escapedFileList(fileNames):
        # escaped list for the job wrapper, e.g. \{\"a.root\"\,\"b.root\"\}
        return '\\{' + '\\,'.join(['\\"' + f + '\\"' for f in fileNames]) + '\\}'

    # ---- Handle the possible job splitting configurations ---- #
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if totalEventsRequested == -1:
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif totalEventsRequested > self.maxEvents:
        eventsRemaining = self.maxEvents
        common.logger.message("Requested "+str(totalEventsRequested)+" events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive integer.')
        # at least one event per job, otherwise no job would ever consume
        # events and the loop below would never finish
        eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

    if eventsRemaining > 0 and eventsPerJobRequested <= 0:
        raise CrabException('events_per_job must be a positive integer.')

    if self.selectNumberOfJobs:
        common.logger.message("May not create the exact number_of_jobs requested.")

    # upper limit on the number of jobs to create
    if self.ncjobs == 'all':
        totalNumberOfJobs = 999999999
    else:
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)
    blockCount = 0

    jobCount = 0
    totalEventCount = 0   # for user info at end
    list_of_lists = []

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while (eventsRemaining > 0 and blockCount < numBlocksInDataset and
           jobCount < totalNumberOfJobs):
        block = blocks[blockCount]
        blockCount += 1

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if numFilesInBlock <= 0:
            continue

        # ---- New block => New job ---- #
        fileCount = 0
        jobFiles = []          # files of the job being built
        filesEventCount = 0    # events contained in jobFiles
        jobSkipEventCount = 0  # events to skip at the start of jobFiles[0]
        newFile = 1            # files[fileCount] not yet added to jobFiles

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while (eventsRemaining > 0 and fileCount < numFilesInBlock and
               jobCount < totalNumberOfJobs):
            fileName = files[fileCount]
            if newFile:
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")
                    fileCount += 1
                    continue
                common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                filesEventCount += numEventsInFile
                jobFiles.append(fileName)
                newFile = 0

            # events the current job can still use / events it should get
            eventsAvailable = filesEventCount - jobSkipEventCount
            jobEvents = min(eventsPerJobRequested, eventsRemaining)

            if eventsAvailable < jobEvents:
                # not enough events yet: add the next file to this job
                newFile = 1
                fileCount += 1
                continue

            # enough events: close the job
            list_of_lists.append([escapedFileList(jobFiles), str(jobEvents), str(jobSkipEventCount)])
            common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(jobEvents)+" events.")
            self.jobDestination.append(blockSites[block])
            common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(blockSites[block]))
            jobCount += 1
            totalEventCount += jobEvents
            eventsRemaining -= jobEvents

            if eventsAvailable == jobEvents:
                # files exactly consumed: the next job starts with a new file
                jobFiles = []
                filesEventCount = 0
                jobSkipEventCount = 0
                newFile = 1
                fileCount += 1
            else:
                # last file only partly consumed: the next job starts in it,
                # skipping the events this job already takes from it
                lastFileEvents = self.eventsbyfile[fileName]
                jobSkipEventCount = jobEvents - (filesEventCount - jobSkipEventCount - lastFileEvents)
                filesEventCount = lastFileEvents
                jobFiles = [fileName]
        # END while (iterate over files in the block)

        # ---- Block exhausted while a job was still being filled: close it ---- #
        # ---- on the remaining events of its files (never with no files).   ---- #
        eventsAvailable = filesEventCount - jobSkipEventCount
        if (jobFiles and fileCount >= numFilesInBlock and eventsAvailable > 0 and
                eventsRemaining > 0 and jobCount < totalNumberOfJobs):
            list_of_lists.append([escapedFileList(jobFiles), str(-1), str(jobSkipEventCount)])
            common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsAvailable)+" events (last file in block).")
            self.jobDestination.append(blockSites[block])
            common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(blockSites[block]))
            jobCount += 1
            totalEventCount += eventsAvailable
            eventsRemaining -= eventsAvailable
    # END while (iterate over blocks in the dataset)

    self.ncjobs = self.total_number_of_jobs = jobCount
    if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
509    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job, for jobs
    without input data (e.g. pythia generation).

    Exactly two of total_number_of_events, events_per_job and
    number_of_jobs are configured; the third is derived here.

    Sets self.total_number_of_jobs and self.list_of_args, and may set
    self.eventsPerJob or self.total_number_of_events. It also appends one
    empty destination per job to self.jobDestination.

    Each job's arguments are: str(first_run)+str(i) (or str(i) without a
    first run), then str(pythia_seed)+str(i) if a seed is given, then
    str(vtx_seed)+str(i) if a vertex seed is also given.

    Raises CrabException for a negative total or non-positive
    events_per_job / number_of_jobs.
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        raise CrabException('Cannot split jobs per Events with "-1" as total number of events')
    if self.selectEventsPerJob and self.eventsPerJob <= 0:
        raise CrabException('events_per_job must be a positive integer.')
    if self.selectNumberOfJobs and self.theNumberOfJobs <= 0:
        raise CrabException('number_of_jobs must be a positive integer.')

    if self.selectEventsPerJob and self.selectNumberOfJobs:
        # total_number_of_events not given: it follows from the other two
        self.total_number_of_jobs = self.theNumberOfJobs
        self.total_number_of_events = self.theNumberOfJobs * self.eventsPerJob
    elif self.selectEventsPerJob:
        self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
    elif self.selectNumberOfJobs:
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # arguments are run/seed numbers with the job index appended
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good;
        ## the entry must be empty to write the xml correctly
        self.jobDestination.append([""])
        if self.firstRun:
            args = [str(self.firstRun) + str(i)]
        else:
            args = [str(i)]
        if self.sourceSeed:
            args.append(str(self.sourceSeed) + str(i))
            # the vertex seed is only used together with the pythia seed
            if self.sourceSeedVtx:
                args.append(str(self.sourceSeedVtx) + str(i))
        self.list_of_args.append(args)

    return
578    
579 spiga 1.42
def jobSplittingForScript(self):
    """
    Split a script-only task (no ParameterSet, no input dataset) into
    self.theNumberOfJobs jobs.

    Sets self.total_number_of_jobs and self.list_of_args; job k receives
    the single argument str(k). One empty destination per job is appended
    to self.jobDestination, since without input data any site will do.
    """
    nJobs = self.theNumberOfJobs
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(nJobs)+' jobs in total ')

    self.total_number_of_jobs = nJobs
    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))
    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    # one (fresh) empty destination entry and one index argument per job
    self.jobDestination.extend([[""] for _unused in range(self.total_number_of_jobs)])
    self.list_of_args = [[str(k)] for k in range(self.total_number_of_jobs)]
    return
602    
def split(self, jobParams):
    """
    Store the per-job arguments and destinations computed by the splitting
    step in jobParams and in common.jobDB.

    jobParams: list that gets one new entry per job appended; these
               entries are then filled from the front of the list.
    """
    common.jobDB.load()
    nJobs = self.total_number_of_jobs
    argsPerJob = self.list_of_args

    # first reserve one slot per job ...
    for _slot in range(nJobs):
        jobParams.append("")

    # ... then fill the slots and record everything in the job DB
    for idx in range(nJobs):
        jobParams[idx] = argsPerJob[idx]
        common.jobDB.setArguments(idx, jobParams[idx])
        destination = self.jobDestination[idx]
        common.logger.debug(5,"Job "+str(idx)+" Destination: "+str(destination))
        common.jobDB.setDestination(idx, destination)

    common.jobDB.save()
    return
623    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in common.jobDB for job nj as a single
    string, each argument followed by one space. sched is not used.
    """
    return ''.join([str(arg) + " " for arg in common.jobDB.arguments(nj)])
629 gutsche 1.3
def numberOfJobs(self):
    """Return the number of jobs produced by the splitting step."""
    nJobs = self.total_number_of_jobs
    return nJobs
633    
def getTarBall(self, exe):
    """
    Return the path of the tar-ball with the user libraries and executable.

    The path lies in the task share directory (relative path, as used in
    the Boss XML files) and is stored in self.tgzNameWithPath. The tar-ball
    is built with buildTar_ only when it does not exist yet.
    """
    self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name

    alreadyBuilt = os.path.exists(self.tgzNameWithPath)
    if not alreadyBuilt:
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()
    return self.tgzNameWithPath
651    
def buildTar_(self, executable):
    """
    Build the tar-balls shipped with the jobs.

    - self.tgzNameWithPath: the private executable, lib/, module/, every
      data/ directory of the user SCRAM area, and $CRABDIR/ProdAgentApi.
      It is only built when the user works in a private area (not
      directly in the release area).
    - self.MLtgzfile (share/MLfiles.tgz): the monitoring helper scripts
      from $CRABDIR/python. It is always built.

    Raises CrabException if the executable cannot be found or a tar-ball
    cannot be written. A partially written tar-ball is removed, so that
    getTarBall does not pick it up later.
    """
    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top: nothing private to ship
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(5,"Working area is the release area: user tar-ball not created")
    else:
        fillUserTar = lambda tar: self.addUserAreaToTar_(tar, executable, swArea, swReleaseTop)
        self.writeTgz_(self.tgzNameWithPath, fillUserTar, 'Could not create tar-ball')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    self.writeTgz_(self.MLtgzfile, self.addMLFilesToTar_, 'Could not create ML files tar-ball')
    return

def addUserAreaToTar_(self, tar, executable, swArea, swReleaseTop):
    """Add the private parts of the SCRAM area swArea (and ProdAgentApi) to tar."""
    ## First find the executable
    if executable != '':
        exeWithPath = self.scram.findFile_(executable)
        if not exeWithPath:
            raise CrabException('User executable '+executable+' not found')

        ## then check if it's private or not
        if exeWithPath.find(swReleaseTop) == -1:
            # the exe is private, so we must ship it
            common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
            path = swArea+'/'
            exe = exeWithPath.replace(path,'')
            tar.add(path+exe,exe)
        # else: the exe is from the release, it will be found on the WN

    ## Now get the libraries: only those in local working area
    libDir = 'lib'
    lib = swArea+'/'+libDir
    common.logger.debug(5,"lib "+lib+" to be tarred")
    if os.path.exists(lib):
        tar.add(lib,libDir)

    ## Now check if module dir is present
    moduleDir = 'module'
    module = swArea+'/'+moduleDir
    if os.path.isdir(module):
        tar.add(module,moduleDir)

    ## Now check if any data dir(s) is present (stored relative to swArea)
    swAreaLen = len(swArea)
    for root, dirs, files in os.walk(swArea):
        if "data" in dirs:
            common.logger.debug(5,"data "+root+"/data"+" to be tarred")
            tar.add(root+"/data",root[swAreaLen:]+"/data")

    ## Add ProdAgent dir to tar
    paDir = 'ProdAgentApi'
    pa = os.environ['CRABDIR']+'/'+paDir
    if os.path.isdir(pa):
        tar.add(pa,paDir)

def addMLFilesToTar_(self, tar):
    """Add the monitoring (ML/Dashboard) helper scripts of $CRABDIR/python to tar."""
    path = os.environ['CRABDIR']+'/python/'
    for fileName in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
        tar.add(path+fileName,fileName)

def writeTgz_(self, tgzName, fill, errorMsg):
    """
    Create the gzipped tar tgzName and let fill(tar) add its members.

    The tar file is always closed. On failure the partial file is removed.
    A CrabException raised by fill propagates unchanged; any other error
    becomes CrabException(errorMsg + ': ' + reason).
    """
    import sys
    import tarfile
    try:
        tar = tarfile.open(tgzName, "w:gz")
        try:
            fill(tar)
            common.logger.debug(5,"Files added to "+tgzName+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except CrabException:
        ex = sys.exc_info()[1]
        self.removeTgz_(tgzName)
        raise ex
    except Exception:
        ex = sys.exc_info()[1]
        self.removeTgz_(tgzName)
        raise CrabException(errorMsg+': '+str(ex))

def removeTgz_(self, tgzName):
    """Best-effort removal of a partially written tar-ball."""
    try:
        if os.path.exists(tgzName):
            os.remove(tgzName)
    except OSError:
        common.logger.debug(5,"Could not remove partial tar-ball "+tgzName)
731    
def wsSetupEnvironment(self, nj):
    """
    Return the part of the job wrapper script that prepares the execution
    environment for job 'nj'.

    The generated bash snippet:
      * sets up the CMS environment for the middleware named in $middleware
        (LCG, or OSG where a private working directory is created first),
      * creates the SCRAM project area for self.version and evaluates its
        runtime environment,
      * aborts if fewer than two arguments were given ($nargs, presumably
        set earlier by the wrapper -- not visible here),
      * when a pset is configured, writes pset.cfg from the job's config
        file, substituting input files / max events / skip events (data
        jobs) or first run / seeds (generator jobs) from ${args[...]},
      * copies the additional input-sandbox files into the working dir.

    Every failure writes JobExitCode to $RUNTIME_AREA/$repo, calls
    dumpStatus, removes the OSG working directory when applicable and
    exits 1.

    @param nj: index of the job in common.job_list
    @return: the bash snippet as a string
    """
    # Prepare JobType-independent part
    txt = ''

    ## OLI_Daniele at this level middleware already known

    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # Report the actual release instead of a hard-coded CMSSW_0_6_1.
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    # Expand $nargs inside a double-quoted shell string; the previous
    # "' +$nargs+ " form printed literal plus signs around the number.
    txt += ' echo "SET_EXE_ENV 1 ==> ERROR Too few arguments: $nargs"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.pset is not None: #CarlosDaniele
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if (self.datasetPath): # standard job
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
        else: # pythia like job
            if (self.sourceSeed):
                txt += 'FirstRun=${args[1]}\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'
                # Raw string: the backslashes are sed word-boundary anchors.
                txt += r'sed "s#\<INPUTFIRSTRUN\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
            else:
                txt += '# Copy untouched pset\n'
                txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
            if (self.sourceSeed):
                txt += 'Seed=${args[2]}\n'
                txt += 'echo "Seed: <$Seed>"\n'
                txt += r'sed "s#\<INPUT\>#$Seed#" tmp_1.cfg > tmp_2.cfg' + '\n'
                if (self.sourceSeedVtx):
                    txt += 'VtxSeed=${args[3]}\n'
                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                    txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
                else:
                    txt += 'mv tmp_2.cfg pset.cfg\n'
            else:
                txt += 'mv tmp_1.cfg pset.cfg\n'

    # Copy the additional input-sandbox files (shipped to $RUNTIME_AREA)
    # into the working directory and make them executable.
    for inboxFile in self.additional_inbox_files:
        relFile = inboxFile.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset is not None: #CarlosDaniele
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
897    
def wsBuildExe(self, nj):
    """
    Return the part of the job wrapper script that unpacks the user code.

    If the tar-ball self.tgzNameWithPath exists, the generated bash snippet
    untars it from $RUNTIME_AREA and then prepends the ProdAgentApi
    directory to PYTHONPATH. On an untar failure it reports the exit code
    (JobExitCode + dumpStatus, like the other error paths of the wrapper),
    removes the OSG working directory when running on OSG (reporting 50999
    if that fails) and exits 1. If the tar-ball does not exist, an empty
    string is returned.

    @param nj: job index (not used)
    @return: the bash snippet as a string
    """
    txt = ""
    if not os.path.isfile(self.tgzNameWithPath):
        return txt

    tgzName = os.path.basename(self.tgzNameWithPath)
    txt += 'echo "tar xzvf $RUNTIME_AREA/'+tgzName+'"\n'
    txt += 'tar xzvf $RUNTIME_AREA/'+tgzName+'\n'
    txt += 'untar_status=$? \n'
    txt += 'if [ $untar_status -ne 0 ]; then \n'
    txt += ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n'
    txt += ' echo "JOB_EXIT_STATUS = $untar_status" \n'
    txt += ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n'
    # Dump the report before exiting, as every other failure path does;
    # previously the code was only dumped if the OSG cleanup itself failed.
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50999"\n'
    txt += ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' \n'
    txt += ' exit 1 \n'
    txt += 'else \n'
    txt += ' echo "Successful untar" \n'
    txt += 'fi \n'
    txt += '\n'
    # The tar-ball ships the ProdAgentApi directory; make it importable.
    txt += 'echo "Include ProdAgentApi in PYTHONPATH"\n'
    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=ProdAgentApi\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n'
    txt += 'fi\n'
    txt += '\n'
    return txt
945    
def modifySteeringCards(self, nj):
    """
    Hook meant to rewrite the user's card for job 'nj' into the share
    directory. The CMSSW job type does not need it: this is a no-op.
    """
    return None
951    
def executableName(self):
    """
    Return what the job wrapper executes: self.executable when a pset is
    configured, otherwise "sh " (a user script is run through the shell).
    """
    if self.pset is not None:
        return self.executable
    return "sh "
957 slacapra 1.1
def executableArgs(self):
    """
    Return the command-line arguments for the executable: " -p pset.cfg"
    when a pset is configured, otherwise the user script followed by the
    shell variable $NJob.
    """
    if self.pset is not None:
        return " -p pset.cfg"
    return self.scriptExe + " $NJob"
963 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of files for the JDL input sandbox of job 'nj'.

    The list holds, in this order: the user-code tar-ball and the ML
    tar-ball (each only if it exists on disk), then, when a pset is
    configured, the job configuration file from the task's 'job/' dir.
    """
    sandbox = [tgz for tgz in (self.tgzNameWithPath, self.MLtgzfile)
               if os.path.isfile(tgz)]
    if self.pset is not None: #CarlosDaniele
        jobDir = common.work_space.pathForTgz() + 'job/'
        sandbox.append(jobDir + self.configFilename())
    return sandbox
983    
def outputSandbox(self, nj):
    """
    Return the list of files for the JDL output sandbox of job 'nj'.

    Every declared output file (self.output_file followed by
    self.output_file_sandbox) is renamed with the 1-based job number
    through numberFile_.
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
995    
def prepareSteeringCards(self):
    """
    Hook for initial modifications of the user's steering card.
    Nothing to do for CMSSW.
    """
    pass
1001    
def wsRenameOutput(self, nj):
    """
    Return the part of the job wrapper script that collects output files.

    For every declared output file (self.output_file followed by
    self.output_file_sandbox) the generated bash snippet checks that it
    exists in the current directory and copies it to $RUNTIME_AREA under
    its job-numbered name (numberFile_ with the shell variable $NJob). With
    the condor_g scheduler, on OSG a dummy file is written instead when the
    output is missing. The snippet then returns to $RUNTIME_AREA, removes
    the OSG working directory (reporting 60999 if that fails) and sets the
    shell variable file_list to the numbered names of self.output_file only,
    i.e. the files that may be copied to a storage element.

    @param nj: job index (not used; the wrapper's $NJob is used instead)
    @return: the bash snippet as a string
    """
    txt = '\n'
    txt += '# directory content\n'
    txt += 'ls \n'

    for fileWithSuffix in (self.output_file+self.output_file_sandbox):
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        txt += '\n'
        txt += '# check output file\n'
        txt += 'ls '+fileWithSuffix+'\n'
        txt += 'ls_result=$?\n'
        txt += 'if [ $ls_result -ne 0 ] ; then\n'
        txt += ' echo "ERROR: Problem with output file"\n'
        if common.scheduler.boss_scheduler_name == 'condor_g':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'else\n'
        txt += ' cp '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
        txt += 'fi\n'

    # Go back to the runtime area once (this used to be emitted twice).
    txt += 'cd $RUNTIME_AREA\n'
    ### OLI_DANIELE
    txt += 'if [ $middleware == OSG ]; then\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n'
    txt += ' echo "JOB_EXIT_STATUS = 60999"\n'
    txt += ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += 'fi\n'
    txt += '\n'

    ## Add to filelist only files to be possibly copied to SE
    file_list = ' '.join([self.numberFile_(f, '$NJob') for f in self.output_file])
    txt += 'file_list="'+file_list+'"\n'

    return txt
1056    
def numberFile_(self, file, txt):
    """
    Insert '_<txt>' before the last extension of a file name.

    Examples: ('out.root', '3') -> 'out_3.root',
    ('a.b.c', '$NJob') -> 'a.b_$NJob.c', ('noext', '1') -> 'noext_1'.

    @param file: file name (only the last '.'-separated suffix is treated
                 as the extension)
    @param txt: text to insert
    @return: the new file name
    """
    # str.rpartition replaces the deprecated string.split module function
    # (gone in Python 3) and yields exactly the same split.
    name, dot, ext = file.rpartition('.')
    if not dot:
        # no extension at all
        return file + '_' + txt
    return name + '_' + txt + '.' + ext
1074    
def getRequirements(self):
    """
    Return the job requirements to add to the JDL files.

    The expression always requires (other.GlueHostNetworkAdapterOutboundIP).
    When self.version is set it also requires the 'VO-cms-<version>' tag in
    other.GlueHostApplicationSoftwareRunTimeEnvironment, joined with ' && '.
    """
    outboundIP = '(other.GlueHostNetworkAdapterOutboundIP)'
    if not self.version:
        # Avoid a dangling leading ' && ' when no software version is known.
        return outboundIP
    return ('Member("VO-cms-' + self.version +
            '", other.GlueHostApplicationSoftwareRunTimeEnvironment)' +
            ' && ' + outboundIP)
1088 gutsche 1.3
def configFilename(self):
    """Return the configuration file name: the job type name plus '.cfg'."""
    baseName = self.name()
    return baseName + '.cfg'
1092    
1093     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of the job wrapper script that sets up the CMS
    software environment on an OSG worker node.

    The generated bash snippet sources cmsset_default.sh, passing
    self.version as argument, from $GRID3_APP_DIR/cmssoft or otherwise from
    $OSG_APP/cmssoft/cms. If neither file exists it reports exit code
    10020, removes the OSG working directory (reporting 10017 if that
    fails) and exits 1.

    @return: the bash snippet as a string
    """
    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    # No 'exit 1' here: an early exit made the working-directory cleanup
    # below unreachable. The single exit follows the cleanup.
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1137    
1138     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of the job wrapper script that sets up the CMS
    software environment on an LCG worker node.

    The generated bash snippet:
      * fails with 10031 if $VO_CMS_SW_DIR is empty/unset,
      * fails with 10020 if $VO_CMS_SW_DIR/cmsset_default.sh is missing or
        empty, and with 10032 if sourcing it returns non-zero,
      * reads /etc/redhat-release: keeps SCRAM_ARCH when it matches
        *alhalla*, exports SCRAM_ARCH=slc3_ia32_gcc323 when it matches
        *Enterprise* or *cientific*, and fails with 10033 otherwise.
    Each failure writes JobExitCode to $RUNTIME_AREA/$repo, calls
    dumpStatus and exits 1.

    @return: the bash snippet as a string
    """
    txt = ' \n'
    txt += ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n'
    # Quoted -z test: the former unquoted '[ ! $VO_CMS_SW_DIR ]' only worked
    # by accident when empty and broke on values containing spaces.
    txt += ' if [ -z "$VO_CMS_SW_DIR" ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10031" \n'
    txt += ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' else\n'
    txt += ' echo "Sourcing environment... "\n'
    txt += ' if [ ! -s "$VO_CMS_SW_DIR/cmsset_default.sh" ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
    txt += ' source "$VO_CMS_SW_DIR/cmsset_default.sh"\n'
    txt += ' result=$?\n'
    txt += ' if [ $result -ne 0 ]; then\n'
    txt += ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10032"\n'
    txt += ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += ' fi\n'
    txt += ' \n'
    txt += ' string=`cat /etc/redhat-release`\n'
    txt += ' echo $string\n'
    txt += ' if [[ $string = *alhalla* ]]; then\n'
    txt += ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n'
    txt += ' elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n'
    txt += ' export SCRAM_ARCH=slc3_ia32_gcc323\n'
    txt += ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10033"\n'
    txt += ' echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n'
    return txt
1202 gutsche 1.5
def setParam_(self, param, value):
    """Record 'value' under key 'param' in this job type's parameter dict."""
    params = self._params
    params[param] = value
1205    
def getParams(self):
    """Return the parameter dict filled by setParam_ (the object itself, not a copy)."""
    params = self._params
    return params
1208 gutsche 1.8
def setTaskid_(self):
    """Store cfg_params['taskId'] as the task id (KeyError if it is missing)."""
    taskId = self.cfg_params['taskId']
    self._taskId = taskId
1211    
def getTaskid(self):
    """Return the task id previously stored by setTaskid_."""
    taskId = self._taskId
    return taskId
1214 gutsche 1.35
1215     #######################################################################
def uniquelist(self, old):
    """
    Return a list of the distinct elements of 'old'.

    Elements keep the order of their first occurrence; they must be
    hashable. Unlike the former dict.keys() result, this is a real list
    (not a Python 3 view) with a deterministic order.
    """
    seen = {}
    result = []
    for e in old:
        if e not in seen:
            seen[e] = 0
            result.append(e)
    return result