ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.50
Committed: Mon Oct 9 23:25:10 2006 UTC (18 years, 6 months ago) by gutsche
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_4_0_pre3, CRAB_1_4_0_pre2
Changes since 1.49: +32 -3 lines
Log Message:
Added the FrameworkJobReport crab_fjr.xml to the output sandbox by default; copy the ProdAgentApi directory from $CRABDIR to the swArea of the local user CMSSW project; include ProdAgentApi in PYTHONPATH on the WN.

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 slacapra 1.22 import math
6 slacapra 1.1 import common
7 gutsche 1.3 import PsetManipulator
8 slacapra 1.1
9 slacapra 1.41 import DBSInfo
10     import DataDiscovery
11     import DataLocation
12 slacapra 1.1 import Scram
13    
14 slacapra 1.45 import glob, os, string, re
15 slacapra 1.1
16     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure a CMSSW job type from the user configuration.

    Reads the CMSSW.* and USER.* cards from cfg_params, runs data
    discovery and location when a dataset path is given, prepares the
    user tar-ball, splits the task into jobs and finally writes the
    modified parameter set.

    ARGUMENTS: cfg_params - mapping of configuration card names to values
               ncjobs     - number of jobs requested to be created
                            (compared against 'all' during block
                            splitting); limits the job splitting
    RAISES:    CrabException on missing/invalid cards, on additional
               input files that cannot be found and on failures while
               manipulating the parameter set.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    # Marco.
    self._params = {}
    self.cfg_params = cfg_params

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.tgz_name = 'default.tgz'
    self.pset = ''          # script use case
    self.datasetPath = ''   # script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    self.setParam_('application', self.version)

    ### collect Data cards
    try:
        tmp = cfg_params['CMSSW.datasetpath']
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        # no input dataset (generation-like or script-only job)
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        self.setParam_('dataset', datasetpath_split[1])
        self.setParam_('owner', datasetpath_split[-1])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        # 'none' means: no parameter set, only a user script is run
        self.pset = None

    # output files
    # the framework job report is always part of the output sandbox
    self.output_file = [self.fjrFileName]
    try:
        tmp = cfg_params['CMSSW.output_file']
    except KeyError:
        tmp = ''
    if tmp != '':
        tmpOutFiles = tmp.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for tmp in tmpOutFiles:
            self.output_file.append(tmp.strip())
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
        if self.scriptExe != '':
            if not os.path.isfile(self.scriptExe):
                msg ="WARNING. file "+self.scriptExe+" not found"
                raise CrabException(msg)
            self.additional_inbox_files.append(self.scriptExe.strip())
    except KeyError:
        self.scriptExe = ''
    #CarlosDaniele
    # without dataset and without pset there must be a script to run
    if self.datasetPath is None and self.pset is None and self.scriptExe == '':
        msg ="WARNING. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    try:
        addFilesCard = cfg_params['USER.additional_input_files']
    except KeyError:
        addFilesCard = None
    if addFilesCard is not None:
        for tmp in addFilesCard.split(','):
            # strip before globbing so that "a, b" finds "b", and skip
            # empty entries (empty card, trailing comma)
            tmp = tmp.strip()
            if not tmp:
                continue
            dirname = ''
            if not tmp.startswith("/"):
                dirname = "."
            files = glob.glob(os.path.join(dirname, tmp))
            # glob only returns existing paths, so an empty result is the
            # only way to notice a missing file or a pattern without match
            if not files:
                raise CrabException("Additional input file not found: "+tmp)
            for file in files:
                self.additional_inbox_files.append(file.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    # files per job
    try:
        if (cfg_params['CMSSW.files_per_jobs']):
            raise CrabException("files_per_jobs no longer supported. Quitting.")
    except KeyError:
        pass

    ## Events per job
    try:
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None: #CarlosDaniele
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        # script-only use case: only the number of jobs matters
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")
    if self.pset is not None: #CarlosDaniele
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None: #CarlosDaniele
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None: #CarlosDaniele
        try:
            if (self.datasetPath): # standard job
                # allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else:  # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if (self.sourceSeed) :
                    self.PsetEdit.pythiaSeed("INPUT")
                    if (self.sourceSeedVtx) :
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except CrabException:
            # already carries a meaningful message: do not hide it
            raise
        except Exception:
            import sys
            msg='Error while manipuliating ParameterSet: exiting...'
            # keep the user-visible message, but do not lose the cause
            common.logger.debug(3, msg+' ('+str(sys.exc_info()[1])+')')
            raise CrabException(msg)
251 gutsche 1.3
252 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
253    
254 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
255    
256     datasetPath=self.datasetPath
257    
258     ## TODO
259     dataTiersList = ""
260     dataTiers = dataTiersList.split(',')
261 slacapra 1.1
262     ## Contact the DBS
263 slacapra 1.41 common.logger.message("Contacting DBS...")
264 slacapra 1.1 try:
265 slacapra 1.41 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, dataTiers, cfg_params)
266 slacapra 1.1 self.pubdata.fetchDBSInfo()
267    
268 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
269 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
270     raise CrabException(msg)
271    
272 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
273 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
274     raise CrabException(msg)
275 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
276 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS %s'%ex.getErrorMessage()
277     raise CrabException(msg)
278    
279     ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
280 gutsche 1.3 ## self.DBSPaths=self.pubdata.getDBSPaths()
281     common.logger.message("Required data are :"+self.datasetPath)
282    
283 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
284 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
285     self.eventsbyfile=self.pubdata.getEventsPerFile()
286 slacapra 1.41 # print str(self.filesbyblock)
287     # print 'self.eventsbyfile',len(self.eventsbyfile)
288     # print str(self.eventsbyfile)
289 gutsche 1.3
290 slacapra 1.1 ## get max number of events
291     self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
292 gutsche 1.44 common.logger.message("The number of available events is %s\n"%self.maxEvents)
293 slacapra 1.1
294 slacapra 1.41 common.logger.message("Contacting DLS...")
295 slacapra 1.1 ## Contact the DLS and build a list of sites hosting the fileblocks
296     try:
297 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
298 gutsche 1.6 dataloc.fetchDLSInfo()
299 slacapra 1.41 except DataLocation.DataLocationError , ex:
300 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
301     raise CrabException(msg)
302    
303    
304 gutsche 1.35 sites = dataloc.getSites()
305     allSites = []
306     listSites = sites.values()
307     for list in listSites:
308     for oneSite in list:
309     allSites.append(oneSite)
310     allSites = self.uniquelist(allSites)
311 gutsche 1.3
312 gutsche 1.35 common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
313     common.logger.debug(6, "List of Sites: "+str(allSites))
314     return sites
315 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs, self.ncjobs - Total # of jobs
          self.list_of_args - per job: [file list string, max events, skip events]
    RAISES: CrabException if number_of_jobs has to be used as divisor but is not positive
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report the effective request: self.total_number_of_events is 0
        # when the total was derived from number_of_jobs * events_per_job
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive integer.')
        eventsPerJobRequested = eventsRemaining // self.theNumberOfJobs
        # fewer events than jobs: a zero-event request would never consume
        # eventsRemaining and the loops below would emit empty jobs
        if eventsPerJobRequested < 1:
            eventsPerJobRequested = 1

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = "\\{"
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            file = files[fileCount]
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[file]
                    common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += '\\"' + file + '\\"\\,'
                    newFile = 0
                except KeyError:
                    common.logger.message("File "+str(file)+" has unknown number of events: skipping")

            # if less events in file remain than eventsPerJobRequested
            if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
                # if last file in block
                if ( fileCount == numFilesInBlock-1 ) :
                    if parString == "\\{":
                        # every file of the pending job had an unknown event
                        # count: there is nothing to run on, so no job is made
                        fileCount += 1
                        continue
                    # end job using last file, use remaining events in block
                    # close job and touch new file
                    # (the slice drops the trailing '\,' separator)
                    fullString = parString[:-2]
                    fullString += '\\}'
                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                    eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                    jobSkipEventCount = 0
                    # reset file
                    parString = "\\{"
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else :
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                # close job and touch new file
                fullString = parString[:-2]
                fullString += '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                parString = "\\{"
                filesEventCount = 0
                newFile = 1
                fileCount += 1

            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                fullString = parString[:-2]
                fullString += '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[file]
                parString = "\\{"
                parString += '\\"' + file + '\\"\\,'
            # END if
        # END while (iterate over files in the block)
    # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
493    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job, for jobs
    without input dataset.

    Exactly two of total_number_of_events, events_per_job and
    number_of_jobs are selected (checked in the constructor); the third
    quantity is derived here.
    SETS: self.total_number_of_jobs, self.eventsPerJob,
          self.total_number_of_events (when derived),
          self.list_of_args - per job the seed argument(s) or the job index
          self.jobDestination - one [""] entry per job
    RAISES: CrabException for a negative total number of events or a
            non-positive divisor
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if self.eventsPerJob < 1:
            raise CrabException('events_per_job must be a positive integer.')
        if (self.selectNumberOfJobs and not self.selectTotalNumberEvents):
            # events_per_job + number_of_jobs: the total is not given by
            # the user (it is still 0 here) and has to be derived
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = self.theNumberOfJobs * self.eventsPerJob
        else:
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
    elif (self.selectNumberOfJobs) :
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive integer.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check  '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # argument is seed number.$i
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        if (self.sourceSeed):
            if (self.sourceSeedVtx):
                ## pythia + vtx random seed
                self.list_of_args.append([
                                          str(self.sourceSeed)+str(i),
                                          str(self.sourceSeedVtx)+str(i)
                                          ])
            else:
                ## only pythia random seed
                self.list_of_args.append([(str(self.sourceSeed)+str(i))])
        else:
            ## no random seed
            self.list_of_args.append([str(i)])

    return
546    
547 spiga 1.42
def jobSplittingForScript(self):#CarlosDaniele
    """
    Split a script-only task: create exactly the requested number of jobs.

    SETS: self.total_number_of_jobs - taken from self.theNumberOfJobs
          self.list_of_args - one [job index as string] entry per job
          self.jobDestination - gets one [""] entry appended per job
    """
    requested = self.theNumberOfJobs

    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(requested)+' jobs in total ')

    self.total_number_of_jobs = requested

    common.logger.debug(5,'N jobs  '+str(requested))
    common.logger.message(str(requested)+' jobs can be created')

    # the only argument of each job is its index; there is no random seed
    self.list_of_args = [[str(index)] for index in range(requested)]
    # there is no input, so no destination is imposed on any job
    self.jobDestination.extend([[""] for index in range(requested)])
    return
570    
def split(self, jobParams):
    """
    Store the arguments and the destination of every job in the job DB.

    ARGUMENT: jobParams - list; one entry per job is appended to it and
              the first self.total_number_of_jobs entries are set to the
              job arguments
    REQUIRES: self.total_number_of_jobs, self.list_of_args, self.jobDestination
    """
    common.jobDB.load()

    jobTotal = self.total_number_of_jobs
    perJobArgs = self.list_of_args

    # create the empty structure first, then fill it job by job
    jobParams.extend([""] * jobTotal)

    for index in range(jobTotal):
        jobParams[index] = perJobArgs[index]
        common.jobDB.setArguments(index, jobParams[index])
        destination = self.jobDestination[index]
        common.logger.debug(5,"Job "+str(index)+" Destination: "+str(destination))
        common.jobDB.setDestination(index, destination)

    common.jobDB.save()
    return
591    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in the job DB for job 'nj' as one string,
    every argument followed by a blank. 'sched' is not used.
    """
    pieces = [str(argument)+" " for argument in common.jobDB.arguments(nj)]
    return ''.join(pieces)
597 gutsche 1.3
def numberOfJobs(self):
    """
    Return the total number of jobs determined by the job splitting.
    """
    total = self.total_number_of_jobs
    return total
601    
def getTarBall(self, exe):
    """
    Return the name (with path) of the tar-ball holding user lib and exe.

    The name is the share directory of the work space followed by
    self.tgz_name. An existing tar-ball is reused; otherwise buildTar_
    is asked to prepare it.
    """
    self.tgzNameWithPath = common.work_space.shareDir()+self.tgz_name

    if not os.path.exists(self.tgzNameWithPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # it exists already: just hand it back
    return self.tgzNameWithPath
616    
def buildTar_(self, executable):
    """
    Create the tar-ball self.tgzNameWithPath with the private user code.

    Collected, relative to the user scram area: a private executable,
    the 'lib', 'module' and 'src/Data/' directories when present, and
    the ProdAgentApi directory, which is first copied from $CRABDIR into
    the scram area. Nothing is done when the working area is the release
    top or when there is nothing to ship.

    ARGUMENT: executable - name of the executable to look for
    RAISES:   CrabException if the executable is not found or the
              tar-ball cannot be created
    """
    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    filesToBeTarred = []
    ## First find the executable
    if (self.executable != ''):
        exeWithPath = self.scram.findFile_(executable)
        if ( not exeWithPath ):
            raise CrabException('User executable '+executable+' not found')

        ## then check if it's private or not
        if exeWithPath.find(swReleaseTop) == -1:
            # the exe is private, so we must ship
            common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
            path = swArea+'/'
            # store it relative to the scram area, tar runs from there
            exe = exeWithPath.replace(path,'')
            filesToBeTarred.append(exe)
        # else: the exe is from release, we'll find it on WN

    ## Now get the libraries: only those in local working area
    libDir = 'lib'
    lib = swArea+'/' +libDir
    common.logger.debug(5,"lib "+lib+" to be tarred")
    if os.path.exists(lib):
        filesToBeTarred.append(libDir)

    ## Now check if module dir is present
    moduleDir = 'module'
    if os.path.isdir(swArea+'/'+moduleDir):
        filesToBeTarred.append(moduleDir)

    ## Now check if the Data dir is present
    dataDir = 'src/Data/'
    if os.path.isdir(swArea+'/'+dataDir):
        filesToBeTarred.append(dataDir)

    ## copy ProdAgent dir to swArea
    ## (the leading backslash is passed to the shell as part of the command)
    cmd = '\\cp -rf ' + os.environ['CRABDIR'] + '/ProdAgentApi ' + swArea
    cmd_out = runCommand(cmd)
    # anything but empty output is taken as a failed copy
    if cmd_out != '':
        common.logger.message('ProdAgentApi directory could not be copied to local CMSSW project directory.')
        common.logger.message('No FrameworkJobreport parsing is possible on the WorkerNode.')

    ## Now check if the ProdAgentApi dir is present
    paDir = 'ProdAgentApi'
    if os.path.isdir(swArea+'/'+paDir):
        filesToBeTarred.append(paDir)

    ## Create the tar-ball
    if len(filesToBeTarred)>0:
        cwd = os.getcwd()
        os.chdir(swArea)
        try:
            tarcmd = 'tar zcvf ' + self.tgzNameWithPath + ' '
            for line in filesToBeTarred:
                tarcmd = tarcmd + line + ' '
            cout = runCommand(tarcmd)
            if not cout:
                raise CrabException('Could not create tar-ball')
        finally:
            # go back to where we came from, also when tar failed
            os.chdir(cwd)
    else:
        common.logger.debug(5,"No files to be to be tarred")

    return
695    
def wsSetupEnvironment(self, nj):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    'nj' is used as an index into common.job_list to look up the job
    whose configuration file name is substituted into the script.
    The returned value is a single string of shell commands.
    """
    # Shell fragments are collected in order and joined once at the end.
    script = []
    emit = script.append

    # ---- JobType-independent part -------------------------------------
    # The generated script tests $middleware itself, so both the LCG and
    # the OSG setup fragments are always emitted.
    emit('if [ $middleware == LCG ]; then \n')
    emit(self.wsSetupCMSLCGEnvironment_())
    emit('elif [ $middleware == OSG ]; then\n')
    emit(' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n')
    emit(' echo "Created working directory: $WORKING_DIR"\n')
    emit(' if [ ! -d $WORKING_DIR ] ;then\n')
    emit(' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n')
    emit(' echo "JOB_EXIT_STATUS = 10016"\n')
    emit(' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n')
    emit(' dumpStatus $RUNTIME_AREA/$repo\n')
    emit(' rm -f $RUNTIME_AREA/$repo \n')
    emit(' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n')
    emit(' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n')
    emit(' exit 1\n')
    emit(' fi\n')
    emit('\n')
    emit(' echo "Change to working directory: $WORKING_DIR"\n')
    emit(' cd $WORKING_DIR\n')
    emit(self.wsSetupCMSOSGEnvironment_())
    emit('fi\n')

    # ---- JobType-specific part: create and enter the CMSSW project ----
    scram = self.scram.commandName()
    version = self.version
    emit('\n\n')
    emit('echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n')
    emit(scram+' project CMSSW '+version+'\n')
    emit('status=$?\n')
    emit('if [ $status != 0 ] ; then\n')
    emit(' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+version+' not found on `hostname`" \n')
    emit(' echo "JOB_EXIT_STATUS = 10034"\n')
    emit(' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n')
    emit(' dumpStatus $RUNTIME_AREA/$repo\n')
    emit(' rm -f $RUNTIME_AREA/$repo \n')
    emit(' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n')
    emit(' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n')
    emit(' if [ $middleware == OSG ]; then \n')
    emit(' echo "Remove working directory: $WORKING_DIR"\n')
    emit(' cd $RUNTIME_AREA\n')
    emit(' /bin/rm -rf $WORKING_DIR\n')
    emit(' if [ -d $WORKING_DIR ] ;then\n')
    # Report the version actually requested; this message used to carry
    # a hard-coded release name.
    emit(' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+version+' not found on `hostname`"\n')
    emit(' echo "JOB_EXIT_STATUS = 10018"\n')
    emit(' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n')
    emit(' dumpStatus $RUNTIME_AREA/$repo\n')
    emit(' rm -f $RUNTIME_AREA/$repo \n')
    emit(' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n')
    emit(' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n')
    emit(' fi\n')
    emit(' fi \n')
    emit(' exit 1 \n')
    emit('fi \n')
    emit('echo "CMSSW_VERSION = '+version+'"\n')
    emit('cd '+version+'\n')
    ### needed grep for bug in scramv1 ###
    emit('eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n')

    # ---- Argument check (the wrapper provides $nargs and $args) -------
    emit("\n")
    emit("## number of arguments (first argument always jobnumber)\n")
    emit("\n")
    emit("if [ $nargs -lt 2 ]\n")
    emit("then\n")
    emit(" echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n")
    emit(' echo "JOB_EXIT_STATUS = 50113"\n')
    emit(' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n')
    emit(' dumpStatus $RUNTIME_AREA/$repo\n')
    emit(' rm -f $RUNTIME_AREA/$repo \n')
    emit(' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n')
    emit(' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n')
    emit(' if [ $middleware == OSG ]; then \n')
    emit(' echo "Remove working directory: $WORKING_DIR"\n')
    emit(' cd $RUNTIME_AREA\n')
    emit(' /bin/rm -rf $WORKING_DIR\n')
    emit(' if [ -d $WORKING_DIR ] ;then\n')
    emit(' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n')
    emit(' echo "JOB_EXIT_STATUS = 50114"\n')
    emit(' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n')
    emit(' dumpStatus $RUNTIME_AREA/$repo\n')
    emit(' rm -f $RUNTIME_AREA/$repo \n')
    emit(' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n')
    emit(' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n')
    emit(' fi\n')
    emit(' fi \n')
    emit(" exit 1\n")
    emit("fi\n")
    emit("\n")

    # ---- Job-specific part: produce pset.cfg from the job's config ----
    job = common.job_list[nj]
    if self.pset is not None: #CarlosDaniele
        pset = os.path.basename(job.configFilename())
        emit('\n')
        if self.datasetPath:
            # standard job: input files, max events and skip events are
            # substituted into the config in three sed passes
            emit('InputFiles=${args[1]}\n')
            emit('MaxEvents=${args[2]}\n')
            emit('SkipEvents=${args[3]}\n')
            emit('echo "Inputfiles:<$InputFiles>"\n')
            emit('sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n')
            emit('echo "MaxEvents:<$MaxEvents>"\n')
            emit('sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n')
            emit('echo "SkipEvents:<$SkipEvents>"\n')
            emit('sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n')
        elif self.sourceSeed:
            # pythia like job: substitute the random seed(s)
            emit('Seed=${args[1]}\n')
            emit('echo "Seed: <$Seed>"\n')
            emit('sed "s#\<INPUT\>#$Seed#" $RUNTIME_AREA/'+pset+' > tmp.cfg\n')
            if self.sourceSeedVtx:
                emit('VtxSeed=${args[2]}\n')
                emit('echo "VtxSeed: <$VtxSeed>"\n')
                emit('sed "s#INPUTVTX#$VtxSeed#" tmp.cfg > pset.cfg\n')
            else:
                emit('mv tmp.cfg pset.cfg\n')
        else:
            emit('# Copy untouched pset\n')
            emit('cp $RUNTIME_AREA/'+pset+' pset.cfg\n')

    # Copy any additional input files next to the job and make them
    # executable; only the base name is used on the worker node.
    for inbox_file in self.additional_inbox_files:
        relFile = inbox_file.split("/")[-1]
        emit('if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n')
        emit(' cp $RUNTIME_AREA/'+relFile+' .\n')
        emit(' chmod +x '+relFile+'\n')
        emit('fi\n')

    if self.pset is not None: #CarlosDaniele
        # Dump the final configuration into the job log.
        emit('echo "### END JOB SETUP ENVIRONMENT ###"\n\n')
        emit('\n')
        emit('echo "***** cat pset.cfg *********"\n')
        emit('cat pset.cfg\n')
        emit('echo "****** end pset.cfg ********"\n')
        emit('\n')

    return ''.join(script)
851    
def wsBuildExe(self, nj):
    """
    Put in the script the commands to build an executable
    or a library.

    Returns an empty string when the user code archive
    (self.tgzNameWithPath) does not exist; otherwise returns the shell
    commands that unpack it on the worker node and put ProdAgentApi on
    PYTHONPATH.
    """
    # Nothing to unpack: no commands are generated at all.
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    archive = os.path.basename(self.tgzNameWithPath)
    pieces = [
        'echo "tar xzvf $RUNTIME_AREA/'+archive+'"\n',
        'tar xzvf $RUNTIME_AREA/'+archive+'\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n',
        # on OSG the temporary working directory is removed before exit
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = 50999"\n',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        'echo "Include ProdAgentApi in PYTHONPATH"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=ProdAgentApi\n',
        'else\n',
        ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n',
        'fi\n',
        '\n',
    ]
    return "".join(pieces)
899    
def modifySteeringCards(self, nj):
    """
    Modify the card provided by the user,
    writing a new card into share dir.

    This job type has no implementation for this step: the method does
    nothing with 'nj' and returns None.
    """
    return None
905    
def executableName(self):
    """
    Return the command used to start the job.

    When no parameter set is configured (self.pset is None) this is
    "sh " (with a trailing blank); otherwise it is self.executable.
    """
    # Identity test: None is a singleton, so "is" is the correct check.
    if self.pset is None: #CarlosDaniele
        return "sh "
    return self.executable
911 slacapra 1.1
def executableArgs(self):
    """
    Return the argument string passed to the command from
    executableName().

    Without a parameter set (self.pset is None) this is the user script
    followed by " $NJob"; otherwise it is " -p pset.cfg".
    """
    # Identity test: None is a singleton, so "is" is the correct check.
    if self.pset is None: #CarlosDaniele
        return self.scriptExe + " $NJob"
    return " -p pset.cfg"
917 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    The list holds the user code archive (only if that file exists)
    followed by the configuration file of job 'nj' (only if a
    parameter set is configured).
    """
    sandbox = []

    ## code
    tarball = self.tgzNameWithPath
    if os.path.isfile(tarball):
        sandbox.append(tarball)

    ## config
    if self.pset is not None: #CarlosDaniele
        sandbox.append(common.job_list[nj].configFilename())

    # additional input files are deliberately not added here
    return sandbox
935    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Every user-declared output file (self.output_file) is returned with
    the 1-based job number inserted before its last extension by
    self.numberFile_().  'nj' is the 0-based job index.
    """
    # Job numbers in file names start at 1, job indices at 0.
    job_number = str(nj + 1)

    ## User Declared output files
    return [self.numberFile_(out, job_number) for out in self.output_file]
951    
def prepareSteeringCards(self):
    """
    Make initial modifications of the user's steering card file.

    Nothing is done for this job type; the method returns None.
    """
    return None
957    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every entry of self.output_file the script checks that the file
    exists and copies it to $RUNTIME_AREA under its job-numbered name
    (built with self.numberFile_ and the shell variable $NJob).  The
    numbered names are exported to the script as $file_list, and on OSG
    the working directory is removed afterwards.
    """
    chunks = ['\n', '# directory content\n', 'ls \n']
    numbered_names = []

    for fileWithSuffix in self.output_file:
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        numbered_names.append(output_file_num)
        chunks.extend([
            '\n',
            '# check output file\n',
            'ls '+fileWithSuffix+'\n',
            'ls_result=$?\n',
            'if [ $ls_result -ne 0 ] ; then\n',
            ' echo "ERROR: Problem with output file"\n',
        ])
        ### OLI_DANIELE
        # with the condor_g scheduler a placeholder file is written on
        # OSG when the real output is missing
        if common.scheduler.boss_scheduler_name == 'condor_g':
            chunks.extend([
                ' if [ $middleware == OSG ]; then \n',
                ' echo "prepare dummy output file"\n',
                ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n',
                ' fi \n',
            ])
        chunks.extend([
            'else\n',
            ' cp '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n',
            'fi\n',
        ])

    # blank-separated list of the numbered output files
    file_list = ' '.join(numbered_names)
    chunks.extend([
        'cd $RUNTIME_AREA\n',
        'file_list="'+file_list+'"\n',
        'cd $RUNTIME_AREA\n',
        ### OLI_DANIELE
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n',
        ' echo "JOB_EXIT_STATUS = 60999"\n',
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])
    return ''.join(chunks)
1011    
def numberFile_(self, file, txt):
    """
    append _'txt' before last extension of a file

    'file' is the file name and 'txt' the text to insert; both are
    strings.  A name without any '.' simply gets '_txt' appended, e.g.
    'out.root' -> 'out_1.root', 'a.b.c' -> 'a.b_1.c', 'out' -> 'out_1'.
    """
    # Use the str method rather than the string-module function, which
    # only exists in Python 2.
    pieces = file.split(".")
    if len(pieces) > 1:
        # everything before the last dot, then the tag, then the extension
        return ".".join(pieces[:-1]) + '_' + txt + "." + pieces[-1]
    return file + '_' + txt
1029    
def getRequirements(self):
    """
    return job requirements to add to jdl files

    The result is a single expression whose clauses are joined with
    ' && ': a software-tag clause for self.version (only when a version
    is set) and the outbound-connectivity clause (always).
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' +
                       self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')

    # Joining avoids a dangling leading ' && ' when no version is set.
    return ' && '.join(clauses)
1043 gutsche 1.3
def configFilename(self):
    """ return the config filename: the job type name plus '.cfg' """
    base = self.name()
    return base + '.cfg'
1047    
1048     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns part of a job script which prepares
    the execution environment and which is common for all CMS jobs
    (OSG flavour).

    The script sources cmsset_default.sh for self.version from
    $GRID3_APP_DIR or, failing that, from $OSG_APP.  When neither file
    exists it reports error 10020, removes the working directory
    (reporting 10017 if that fails) and exits with status 1.
    """
    version = self.version
    pieces = [
        '\n',
        ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n',
        ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n',
        ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n',
        ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+version+'\n',
        ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+version+'\n',
        ' else\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        # No 'exit 1' at this point: exiting here would skip the removal
        # of the working directory below.  The script exits after the
        # cleanup instead.
        '\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' echo "JOB_EXIT_STATUS = 10017"\n',
        ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        '\n',
        ' exit 1\n',
        ' fi\n',
        '\n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo " END SETUP CMS OSG ENVIRONMENT "\n',
    ]
    return ''.join(pieces)
1092    
1093     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Returns part of a job script which prepares
    the execution environment and which is common for all CMS jobs
    (LCG flavour).

    The script checks $VO_CMS_SW_DIR, sources cmsset_default.sh from it
    and inspects /etc/redhat-release to decide about SCRAM_ARCH; each
    failure is reported with its own exit code and ends the job with
    status 1.  The returned text does not depend on the instance.
    """
    pieces = (
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        # --- software area must be defined ---
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' else\n',
        # --- setup script must exist and be sourced successfully ---
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        # --- operating system check / SCRAM_ARCH ---
        ' string=`cat /etc/redhat-release`\n',
        ' echo $string\n',
        ' if [[ $string = *alhalla* ]]; then\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n',
        ' export SCRAM_ARCH=slc3_ia32_gcc323\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' else\n',
        ' echo "SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized"\n',
        ' echo "JOB_EXIT_STATUS = 10033"\n',
        ' echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' fi\n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    )
    return ''.join(pieces)
1157 gutsche 1.5
def setParam_(self, param, value):
    """ store 'value' under the key 'param' in the job type parameters """
    params = self._params
    params[param] = value
1160    
def getParams(self):
    """ return the dictionary of job type parameters (not a copy) """
    params = self._params
    return params
1163 gutsche 1.8
def setTaskid_(self):
    """ remember the 'taskId' entry of the configuration parameters """
    task_id = self.cfg_params['taskId']
    self._taskId = task_id
1166    
def getTaskid(self):
    """ return the task id stored by setTaskid_() """
    task_id = self._taskId
    return task_id
1169 gutsche 1.35
1170     #######################################################################
def uniquelist(self, old):
    """
    remove duplicates from a list

    Returns a new list holding each element of 'old' once, in the order
    of first appearance.  Elements must be hashable.
    """
    seen = {}
    unique = []
    for e in old:
        # keep only the first occurrence so the result order is stable
        if e not in seen:
            seen[e] = 0
            unique.append(e)
    return unique