ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.65
Committed: Sat Jan 20 11:16:45 2007 UTC (18 years, 3 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_5_0_pre6
Changes since 1.64: +7 -2 lines
Log Message:
added last job splitting combination (event_per_job & number_of_job) for NON input mode

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.1
8 slacapra 1.41 import DataDiscovery
9     import DataLocation
10 slacapra 1.1 import Scram
11    
12 slacapra 1.63 import os, string, re, shutil
13 slacapra 1.1
14     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure a CMSSW job type from the user configuration, discover the
    input data (if any), build the tar-ball, split the task into jobs and
    write the modified parameter set.

    ARGUMENTS: cfg_params - mapping indexed by 'SECTION.key' strings
                            (missing keys are expected to raise KeyError)
               ncjobs     - number of jobs requested to be created; the
                            value 'all' is handled in jobSplittingByBlocks
    RAISES:    CrabException when a mandatory option is missing, a
               referenced file does not exist, the splitting options are
               inconsistent or the parameter set cannot be written.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self._params = {}
    self.cfg_params = cfg_params

    # number of jobs requested to be created, limits the job splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''         # script use case
    self.datasetPath = ''  # script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards
    try:
        tmp = cfg_params['CMSSW.datasetpath']
        log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
        if tmp.lower() == 'none':
            # no input dataset: generator-like or script-only task
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = tmp
            self.selectNoInput = 0
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        self.setParam_('dataset', datasetpath_split[1])
        self.setParam_('owner', datasetpath_split[-1])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
        log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
        if self.pset.lower() != 'none' :
            if not os.path.exists(self.pset):
                raise CrabException("User defined PSet file "+self.pset+" does not exist")
        else:
            # 'none' selects the script-only use case
            self.pset = None
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    try:
        self.output_file = []
        tmp = cfg_params['CMSSW.output_file']
        if tmp != '':
            tmpOutFiles = tmp.split(',')
            log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
            for outFile in tmpOutFiles:
                self.output_file.append(outFile.strip())
        else:
            log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")
    except KeyError:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
        if self.scriptExe != '':
            if not os.path.isfile(self.scriptExe):
                msg ="ERROR. file "+self.scriptExe+" not found"
                raise CrabException(msg)
            self.additional_inbox_files.append(self.scriptExe.strip())
    except KeyError:
        self.scriptExe = ''

    # without dataset and without parameter set a user script is mandatory
    if self.datasetPath is None and self.pset is None and self.scriptExe == '' :
        msg ="WARNING. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    try:
        tmpAddFiles = cfg_params['USER.additional_input_files'].split(',')
        common.logger.debug(5,"Additional input files: "+str(tmpAddFiles))
        for tmpFile in tmpAddFiles:
            tmpFile = tmpFile.strip()
            if tmpFile == '':
                # empty option value or stray comma: nothing to ship
                continue
            if not os.path.exists(tmpFile):
                raise CrabException("Additional input file not found: "+tmpFile)
            # store under the bare file name: appending the full user path
            # to the share directory breaks for absolute or nested paths
            storedFile = common.work_space.shareDir() + os.path.basename(tmpFile)
            shutil.copyfile(tmpFile, storedFile)
            self.additional_inbox_files.append(storedFile.strip())
        common.logger.debug(5,"Inbox files so far : "+str(self.additional_inbox_files))
    except KeyError:
        pass

    # files per job
    try:
        if (cfg_params['CMSSW.files_per_jobs']):
            raise CrabException("files_per_jobs no longer supported. Quitting.")
    except KeyError:
        pass

    ## Events per job
    try:
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events
    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        # with a parameter set exactly two of the three splitting options are needed
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        # script-only use case: only the number of jobs matters
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")

    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5,"No first run given")

    if self.pset is not None:
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset)

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0        # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}        # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            if (self.datasetPath): # standard job
                # allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else:  # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if (self.firstRun):
                    self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN")  #First Run
                if (self.sourceSeed) :
                    self.PsetEdit.pythiaSeed("INPUT")
                    # NOTE(review): nesting of the vertex seed under the
                    # pythia seed mirrors jobSplittingNoInput - confirm
                    if (self.sourceSeedVtx) :
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except:
            # any failure while editing the parameter set is reported uniformly
            msg='Error while manipuliating ParameterSet: exiting...'
            raise CrabException(msg)
261 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Query DBS for the content of self.datasetPath and DLS for the sites
    hosting its file blocks.

    ARGUMENT: cfg_params - user configuration, handed over unchanged to
              DataDiscovery and DataLocation
    SETS:     self.pubdata, self.filesbyblock, self.eventsbyblock,
              self.eventsbyfile, self.maxEvents
    RETURNS:  the object returned by DataLocation.getSites(); its values()
              are iterated here as lists of sites
    RAISES:   CrabException when data discovery or data location fails
    """
    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    ## TODO: data tiers are not configurable yet, so [''] is passed on
    tierNames = "".split(',')

    ## Contact the DBS
    common.logger.message("Contacting DBS...")
    try:
        self.pubdata = DataDiscovery.DataDiscovery(self.datasetPath, tierNames, cfg_params)
        self.pubdata.fetchDBSInfo()
    except DataDiscovery.NotExistingDatasetError as ex:
        raise CrabException('ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage())
    except DataDiscovery.NoDataTierinProvenanceError as ex:
        raise CrabException('ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage())
    except DataDiscovery.DataDiscoveryError as ex:
        raise CrabException('ERROR ***: failed Data Discovery in DBS %s'%ex.getErrorMessage())

    common.logger.message("Required data are :"+self.datasetPath)

    # per-block and per-file bookkeeping used later by the job splitting
    self.filesbyblock = self.pubdata.getFiles()
    self.eventsbyblock = self.pubdata.getEventsPerBlock()
    self.eventsbyfile = self.pubdata.getEventsPerFile()

    ## max number of events, also used in Creator.py
    self.maxEvents = self.pubdata.getMaxEvents()
    common.logger.message("The number of available events is %s\n"%self.maxEvents)

    ## Contact the DLS and build a list of sites hosting the fileblocks
    common.logger.message("Contacting DLS...")
    try:
        locator = DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
        locator.fetchDLSInfo()
    except DataLocation.DataLocationError as ex:
        raise CrabException('ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage())

    sites = locator.getSites()

    # flatten the per-block site lists and drop duplicates for the summary
    hostingSites = self.uniquelist(
        [oneSite for siteList in sites.values() for oneSite in siteList])

    common.logger.message("Sites ("+str(len(hostingSites))+") hosting part/all of dataset: "+str(hostingSites))
    common.logger.debug(6, "List of Sites: "+str(hostingSites))
    return sites
325 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs, self.ncjobs - Total # of jobs
          self.list_of_args - per job: [file list string, max events, skip events]
    RAISES: CrabException when the resulting number of events per job is
            not positive or number_of_jobs is not positive
    """

    # ---- Handle the possible job splitting configurations ---- #
    # NOTE(review): if only number_of_jobs is selected, neither
    # totalEventsRequested nor eventsPerJobRequested is assigned below and
    # the first use fails - confirm callers always select two options.
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if totalEventsRequested == -1:
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif totalEventsRequested > self.maxEvents:
        eventsRemaining = self.maxEvents
        # report the total actually requested; self.total_number_of_events
        # is 0 when events_per_job and number_of_jobs were given
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive integer.')
        eventsPerJobRequested = eventsRemaining // self.theNumberOfJobs

    # a non-positive job size never consumes events, so the loops below
    # would keep creating empty jobs until the job limit is reached
    if eventsRemaining > 0 and eventsPerJobRequested <= 0:
        raise CrabException('Cannot split jobs: number of events per job must be positive, got '+str(eventsPerJobRequested)+'.')

    if self.selectNumberOfJobs:
        common.logger.message("May not create the exact number_of_jobs requested.")

    if self.ncjobs == 'all':
        totalNumberOfJobs = 999999999
    else:
        totalNumberOfJobs = self.ncjobs

    # list() so that blocks can be indexed whatever keys() returns
    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # escaped fragments of the per-job file list argument
    emptyFileList = "\\{"
    fileListEnd = '\\}'
    quote = '\\"'
    separator = '\\,'

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = emptyFileList
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # number of events to skip at the start of the current job
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            fileName = files[fileCount]
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += quote + fileName + quote + separator
                    newFile = 0
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

            eventsInJob = filesEventCount - jobSkipEventCount

            # if less events in file remain than eventsPerJobRequested
            if eventsInJob < eventsPerJobRequested:
                # if last file in block
                if fileCount == numFilesInBlock-1:
                    # end job using last file, use remaining events in block;
                    # no job is created when no file with a known number of
                    # events was collected (the file list would be empty)
                    if parString != emptyFileList:
                        # drop the trailing separator and close the list
                        fullString = parString[:-2] + fileListEnd
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsInJob)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(blockSites[block]))
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsInJob
                        eventsRemaining = eventsRemaining - eventsInJob
                    # reset counters and touch new file
                    jobSkipEventCount = 0
                    parString = emptyFileList
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else:
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif eventsInJob == eventsPerJobRequested:
                # close job and touch new file
                fullString = parString[:-2] + fileListEnd
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(blockSites[block]))
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # reset counters and touch new file
                jobSkipEventCount = 0
                parString = emptyFileList
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            # if more events in file remain than eventsPerJobRequested
            else:
                # close job but don't touch new file
                fullString = parString[:-2] + fileListEnd
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(blockSites[block]))
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequested
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                # remove all but the last file, which the next job continues with
                filesEventCount = self.eventsbyfile[fileName]
                parString = emptyFileList + quote + fileName + quote + separator

    self.ncjobs = self.total_number_of_jobs = jobCount
    if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
503    
def jobSplittingNoInput(self):
    """
    Perform job splitting for jobs without input data, based on the two
    selected options among events per job, number of jobs and total
    number of events.
    REQUIRES: self.selectEventsPerJob, self.selectNumberOfJobs,
              self.selectTotalNumberEvents, self.eventsPerJob,
              self.theNumberOfJobs, self.total_number_of_events,
              self.firstRun, self.sourceSeed, self.sourceSeedVtx
    SETS: self.total_number_of_jobs, and depending on the selection
          self.total_number_of_events or self.eventsPerJob
          self.list_of_args - per job: [first run or job index] followed
                              by the pythia seed and the vertex seed when
                              configured, each with the job index appended
          self.jobDestination - one [""] entry appended per job
    RAISES: CrabException for a negative total number of events or a
            non-positive divisor (events_per_job / number_of_jobs)
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if self.selectEventsPerJob:
        if self.selectTotalNumberEvents:
            # guard the division below
            if self.eventsPerJob <= 0:
                raise CrabException('events_per_job must be a positive integer.')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
    elif self.selectNumberOfJobs:
        # guard the division below
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive integer.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    eventsCovered = int(self.total_number_of_jobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - eventsCovered

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(eventsCovered))

    # arguments are the configured numbers with the job index appended
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        if self.firstRun:
            ## pythia first run
            jobArgs = [str(self.firstRun)+str(i)]
        else:
            ## no first run
            jobArgs = [str(i)]
        if self.sourceSeed:
            ## pythia random seed
            jobArgs.append(str(self.sourceSeed)+str(i))
            if self.sourceSeedVtx:
                ## vtx random seed, only used together with the pythia seed
                jobArgs.append(str(self.sourceSeedVtx)+str(i))
        self.list_of_args.append(jobArgs)

    return
577    
578 spiga 1.42
def jobSplittingForScript(self):
    """
    Perform job splitting based only on the requested number of jobs.
    SETS: self.total_number_of_jobs - copy of self.theNumberOfJobs
          self.list_of_args - one [job index as string] entry per job
          self.jobDestination - one [""] entry appended per job
    """
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    njobs = self.theNumberOfJobs
    self.total_number_of_jobs = njobs

    common.logger.debug(5,'N jobs '+str(njobs))
    common.logger.message(str(njobs)+' jobs can be created')

    # the only argument of a job is its index
    self.list_of_args = [[str(index)] for index in range(njobs)]
    # no input data, so no site restriction: one fresh [""] list per job
    self.jobDestination.extend([[""] for index in range(njobs)])
    return
601    
def split(self, jobParams):
    """
    Store the arguments and the destination of every job in the job DB.
    ARGUMENT: jobParams - list; self.total_number_of_jobs empty entries
              are appended, then the entries 0..njobs-1 are overwritten
              with the per-job arguments from self.list_of_args
    REQUIRES: self.total_number_of_jobs, self.list_of_args,
              self.jobDestination
    """
    common.jobDB.load()

    njobs = self.total_number_of_jobs
    arglist = self.list_of_args

    # create the empty structure
    jobParams.extend([""] * njobs)

    for job in range(njobs):
        jobArgs = arglist[job]
        jobParams[job] = jobArgs
        common.jobDB.setArguments(job, jobArgs)

        destination = self.jobDestination[job]
        common.logger.debug(5,"Job "+str(job)+" Destination: "+str(destination))
        common.jobDB.setDestination(job, destination)

    common.jobDB.save()
    return
622    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in the job DB for job 'nj' as one string,
    every argument followed by a single blank. 'sched' is not used.
    """
    return ''.join([str(arg) + " " for arg in common.jobDB.arguments(nj)])
628 gutsche 1.3
def numberOfJobs(self):
    """
    Return the total number of jobs determined by the job splitting.
    """
    njobs = self.total_number_of_jobs
    return njobs
632    
def getTarBall(self, exe):
    """
    Return the path of the tar-ball with lib and exe, building it first
    when it does not exist yet.
    SETS: self.tgzNameWithPath
    """
    # relative path, to be used in the Boss XML files
    tarBallPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarBallPath

    if not os.path.exists(tarBallPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # it exists already, just return it
    return tarBallPath
650    
def buildTar_(self, executable):
    """
    Build the gzipped tar-balls shipped with the jobs:
    self.tgzNameWithPath with the private executable, the 'lib' and
    'module' directories, every 'data' directory of the user working area
    and the ProdAgentApi directory, and self.MLtgzfile with the
    monitoring scripts taken from $CRABDIR/python.
    Nothing is built when the working area is the release top.

    ARGUMENT: executable - name of the executable looked up via scram
    SETS:     self.MLtgzfile
    RAISES:   CrabException when the executable is not found or a
              tar-ball cannot be created; an incomplete tar-ball is
              removed so that it is not picked up as valid later on.
    """
    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    import tarfile

    def discardPartial(path):
        # best-effort removal of an incomplete archive
        try:
            if os.path.exists(path):
                os.remove(path)
        except OSError:
            pass

    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if (self.executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if swReleaseTop not in exeWithPath:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    exe = exeWithPath.replace(path,'')
                    tar.add(path+exe,exe)
                # otherwise the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen=len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    # archive name is the path relative to the working area
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ## Add ProdAgent dir to tar
            paDir = 'ProdAgentApi'
            pa = os.environ['CRABDIR'] + '/' + paDir
            if os.path.isdir(pa):
                tar.add(pa,paDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # always release the file handle, also on failure
            tar.close()
    except CrabException:
        # keep the specific message (e.g. executable not found)
        discardPartial(self.tgzNameWithPath)
        raise
    except:
        discardPartial(self.tgzNameWithPath)
        raise CrabException('Could not create tar-ball')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path=os.environ['CRABDIR'] + '/python/'
            for mlFile in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
                tar.add(path+mlFile,mlFile)
            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except:
        discardPartial(self.MLtgzfile)
        raise CrabException('Could not create ML files tar-ball')

    return
730    
def wsSetupEnvironment(self, nj):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The returned shell text contains, in order:
      - the middleware dependent (LCG or OSG) CMS environment setup,
      - the scram project/runtime setup for self.version,
      - a check on the number of arguments given to the wrapper,
      - when self.pset is not None, the sed substitutions which turn the
        job configuration file into pset.cfg,
      - the copy of the additional input files into the working directory.
    """
    # Shell lines written after every JobExitCode record
    dump_status = (
        ' dumpStatus $RUNTIME_AREA/$repo\n'
        ' rm -f $RUNTIME_AREA/$repo \n'
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    )

    # Prepare JobType-independent part
    txt = ''

    ## at this level the middleware is already known
    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # report the release actually requested, not a hard-coded one
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.pset is not None:
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if self.datasetPath:
            # standard job: input files, max events and skip events
            # are the wrapper arguments 1, 2 and 3
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
        elif self.sourceSeed:
            # pythia like job with seeds: first run and seed are the
            # wrapper arguments 1 and 2, the optional vertex seed is 3
            txt += 'FirstRun=${args[1]}\n'
            txt += 'echo "FirstRun: <$FirstRun>"\n'
            txt += 'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
            txt += 'Seed=${args[2]}\n'
            txt += 'echo "Seed: <$Seed>"\n'
            txt += 'sed "s#\\<INPUT\\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n'
            if self.sourceSeedVtx:
                txt += 'VtxSeed=${args[3]}\n'
                txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
            else:
                txt += 'mv tmp_2.cfg pset.cfg\n'
        else:
            # pythia like job without seeds: the pset is used as it is
            txt += '# Copy untouched pset\n'
            txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
            txt += 'mv tmp_1.cfg pset.cfg\n'

    # additional input files are looked for by basename in $RUNTIME_AREA
    for inbox_file in self.additional_inbox_files:
        relFile = inbox_file.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset is not None:
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
896    
def wsBuildExe(self, nj=0):
    """
    Put in the script the commands to build an executable
    or a library.

    Returns the shell text which unpacks the tar-ball named by
    self.tgzNameWithPath from $RUNTIME_AREA and prepends ProdAgentApi to
    PYTHONPATH, or an empty string when that tar-ball does not exist.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        # no tar-ball on the submitting side: nothing to do on the WN
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    pieces = [
        'echo "tar xzvf $RUNTIME_AREA/' + tarball + '"\n',
        'tar xzvf $RUNTIME_AREA/' + tarball + '\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n',
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = 50999"\n',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        # make the ProdAgentApi directory of the tar-ball importable
        'echo "Include ProdAgentApi in PYTHONPATH"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=ProdAgentApi\n',
        'else\n',
        ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n',
        'fi\n',
        '\n',
    ]
    return "".join(pieces)
944    
def modifySteeringCards(self, nj):
    """
    Modify the card provided by the user and write the new card
    into the share dir.

    Nothing is done for this job type: the method is an empty hook.
    """
    return
950    
def executableName(self):
    """
    Return the command used to start the job: "sh " when no pset is
    defined (self.pset is None), otherwise self.executable.
    """
    # None is a singleton: test identity, not equality
    if self.pset is None:
        return "sh "
    return self.executable
956 slacapra 1.1
def executableArgs(self):
    """
    Return the arguments given to the executable: the user script
    followed by " $NJob" when no pset is defined (self.pset is None),
    otherwise " -p pset.cfg".
    """
    # None is a singleton: test identity, not equality
    if self.pset is None:
        return self.scriptExe + " $NJob"
    return " -p pset.cfg"
962 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    The list holds the code tar-ball and the ML tar-ball when they exist
    on disk, followed by the job configuration file when a pset is
    defined (self.pset is not None).
    """
    ## code: only the tar-balls which were really created
    tarballs = (self.tgzNameWithPath, self.MLtgzfile)
    sandbox = [tgz for tgz in tarballs if os.path.isfile(tgz)]
    ## config
    if self.pset is not None:
        cfg_path = common.work_space.pathForTgz() + 'job/' + self.configFilename()
        sandbox.append(cfg_path)
    return sandbox
982    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Each user declared output file (self.output_file followed by
    self.output_file_sandbox) is numbered with nj + 1.
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(out, str(nj + 1)) for out in declared]
994    
def prepareSteeringCards(self):
    """
    Make the initial modifications of the user's steering card file.

    Nothing is done for this job type: the method is an empty hook.
    """
    return None
1000    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every declared output file the script copies it to $RUNTIME_AREA
    under its numbered name, then removes the OSG working directory and
    finally sets the shell variable file_list to the numbered names of
    self.output_file.
    """
    script = ['\n', '# directory content\n', 'ls \n']

    for produced in (self.output_file + self.output_file_sandbox):
        numbered = self.numberFile_(produced, '$NJob')
        script.append('\n')
        script.append('# check output file\n')
        script.append('ls ' + produced + '\n')
        script.append('ls_result=$?\n')
        script.append('if [ $ls_result -ne 0 ] ; then\n')
        script.append(' echo "ERROR: Problem with output file"\n')
        if common.scheduler.boss_scheduler_name == 'condor_g':
            # with condor_g on OSG a dummy file replaces the missing output
            script.append(' if [ $middleware == OSG ]; then \n')
            script.append(' echo "prepare dummy output file"\n')
            script.append(' echo "Processing of job output failed" > $RUNTIME_AREA/' + numbered + '\n')
            script.append(' fi \n')
        script.append('else\n')
        script.append(' cp ' + produced + ' $RUNTIME_AREA/' + numbered + '\n')
        script.append('fi\n')

    script.extend([
        'cd $RUNTIME_AREA\n',
        'cd $RUNTIME_AREA\n',
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n',
        ' echo "JOB_EXIT_STATUS = 60999"\n',
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])

    ## Add to filelist only files to be possibly copied to SE
    for_se = [self.numberFile_(produced, '$NJob') for produced in self.output_file]
    script.append('file_list="' + ' '.join(for_se) + '"\n')

    return ''.join(script)
1055    
def numberFile_(self, file, txt):
    """
    append _'txt' before last extension of a file

    'file' is the file name and 'txt' the text to insert: "out.root"
    with "3" gives "out_3.root", "a.b.c" gives "a.b_3.c". A name without
    any extension simply gets "_txt" appended.
    """
    # str.split is used instead of the deprecated string.split()
    # function, which does not exist any more in Python 3
    parts = file.split(".")
    if len(parts) > 1:
        # everything before the last dot, then "_txt", then the extension
        return ".".join(parts[:-1]) + '_' + txt + "." + parts[-1]
    return file + '_' + txt
1073    
def getRequirements(self, nj=[]):
    """
    return job requirements to add to jdl files

    The requirement on the software tag "VO-cms-<version>" is added only
    when self.version is set; the requirement on outbound connectivity
    is always present. 'nj' is not used (and never modified).
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    # joining avoids a dangling leading ' && ' when no version is set
    return ' && '.join(clauses)
1087 gutsche 1.3
def configFilename(self):
    """ return the config filename: the job type name plus '.cfg' """
    base = self.name()
    return base + '.cfg'
1091    
1092     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns part of a job script which prepares
    the execution environment and which is common for all CMS jobs
    running on OSG.

    The script sources cmsset_default.sh for self.version from
    $GRID3_APP_DIR or from $OSG_APP. When neither file exists it reports
    error 10020, removes the working directory (reporting 10017 if that
    fails) and exits with status 1.
    """
    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    # no 'exit 1' here: the script must first remove the working
    # directory below, the exit comes after the cleanup
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1136    
1137     ### OLI_DANIELE
1138     def wsSetupCMSLCGEnvironment_(self):
1139     """
1140     Returns part of a job script which is prepares
1141     the execution environment and which is common for all CMS jobs.
1142     """
1143     txt = ' \n'
1144     txt += ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n'
1145     txt += ' if [ ! $VO_CMS_SW_DIR ] ;then\n'
1146     txt += ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n'
1147     txt += ' echo "JOB_EXIT_STATUS = 10031" \n'
1148     txt += ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n'
1149     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
1150 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
1151     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1152     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1153 gutsche 1.7 txt += ' exit 1\n'
1154 gutsche 1.3 txt += ' else\n'
1155     txt += ' echo "Sourcing environment... "\n'
1156     txt += ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n'
1157     txt += ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n'
1158     txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
1159     txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
1160     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
1161 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
1162     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1163     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1164 gutsche 1.7 txt += ' exit 1\n'
1165 gutsche 1.3 txt += ' fi\n'
1166     txt += ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1167     txt += ' source $VO_CMS_SW_DIR/cmsset_default.sh\n'
1168     txt += ' result=$?\n'
1169     txt += ' if [ $result -ne 0 ]; then\n'
1170     txt += ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1171     txt += ' echo "JOB_EXIT_STATUS = 10032"\n'
1172     txt += ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n'
1173     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
1174 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
1175     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1176     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1177 gutsche 1.7 txt += ' exit 1\n'
1178 gutsche 1.3 txt += ' fi\n'
1179     txt += ' fi\n'
1180     txt += ' \n'
1181     txt += ' string=`cat /etc/redhat-release`\n'
1182     txt += ' echo $string\n'
1183     txt += ' if [[ $string = *alhalla* ]]; then\n'
1184     txt += ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n'
1185     txt += ' elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n'
1186     txt += ' export SCRAM_ARCH=slc3_ia32_gcc323\n'
1187     txt += ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n'
1188     txt += ' else\n'
1189 gutsche 1.7 txt += ' echo "SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized"\n'
1190 gutsche 1.3 txt += ' echo "JOB_EXIT_STATUS = 10033"\n'
1191     txt += ' echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n'
1192     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
1193 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
1194     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1195     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1196 gutsche 1.7 txt += ' exit 1\n'
1197 gutsche 1.3 txt += ' fi\n'
1198     txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
1199     txt += ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n'
1200     return txt
1201 gutsche 1.5
def setParam_(self, param, value):
    """ store 'value' under the key 'param' in the job type parameters """
    params = self._params
    params[param] = value
1204    
def getParams(self):
    """ return the dictionary of the job type parameters (not a copy) """
    params = self._params
    return params
1207 gutsche 1.8
def setTaskid_(self):
    """ read the task id from the 'taskId' key of the configuration """
    task_id = self.cfg_params['taskId']
    self._taskId = task_id
1210    
def getTaskid(self):
    """ return the task id stored in self._taskId """
    task_id = self._taskId
    return task_id
1213 gutsche 1.35
1214     #######################################################################
def uniquelist(self, old):
    """
    remove duplicates from a list

    Returns a new list holding each element of 'old' once, in the order
    of its first occurrence. The elements must be hashable.
    """
    seen = {}
    unique = []
    for e in old:
        # the dictionary gives a fast membership test, the list keeps
        # the order (dict.keys() has no defined order on old Pythons
        # and is not a list on Python 3)
        if e not in seen:
            seen[e] = 0
            unique.append(e)
    return unique