ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.67
Committed: Wed Feb 7 16:47:45 2007 UTC (18 years, 2 months ago) by gutsche
Content type: text/x-python
Branch: MAIN
Changes since 1.66: +9 -1 lines
Log Message:
Add dbs_version card for steering the api version of the selected dbs server, default v00_00_05. Improved exception handling

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.1
8 slacapra 1.41 import DataDiscovery
9 gutsche 1.66 import DataDiscovery_DBS2
10 slacapra 1.41 import DataLocation
11 slacapra 1.1 import Scram
12    
13 slacapra 1.63 import os, string, re, shutil
14 slacapra 1.1
15     class Cmssw(JobType):
16 gutsche 1.38 def __init__(self, cfg_params, ncjobs):
17 slacapra 1.1 JobType.__init__(self, 'CMSSW')
18     common.logger.debug(3,'CMSSW::__init__')
19    
20 gutsche 1.3 # Marco.
21     self._params = {}
22     self.cfg_params = cfg_params
23 gutsche 1.38
24 gutsche 1.44 # number of jobs requested to be created, limit obj splitting
25 gutsche 1.38 self.ncjobs = ncjobs
26    
27 slacapra 1.1 log = common.logger
28    
29     self.scram = Scram.Scram(cfg_params)
30     self.additional_inbox_files = []
31     self.scriptExe = ''
32     self.executable = ''
33     self.tgz_name = 'default.tgz'
34 corvo 1.56 self.scriptName = 'CMSSW.sh'
35 spiga 1.42 self.pset = '' #scrip use case Da
36     self.datasetPath = '' #scrip use case Da
37 gutsche 1.3
38 gutsche 1.50 # set FJR file name
39     self.fjrFileName = 'crab_fjr.xml'
40    
41 slacapra 1.1 self.version = self.scram.getSWVersion()
42 slacapra 1.55 common.taskDB.setDict('codeVersion',self.version)
43 gutsche 1.5 self.setParam_('application', self.version)
44 slacapra 1.47
45 slacapra 1.1 ### collect Data cards
46 gutsche 1.66
47     ## get DBS mode
48     self.use_dbs_2 = 0
49     try:
50     self.use_dbs_2 = int(self.cfg_params['CMSSW.use_dbs_2'])
51     except KeyError:
52     self.use_dbs_2 = 0
53    
54 slacapra 1.1 try:
55 slacapra 1.9 tmp = cfg_params['CMSSW.datasetpath']
56     log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
57     if string.lower(tmp)=='none':
58     self.datasetPath = None
59 slacapra 1.21 self.selectNoInput = 1
60 slacapra 1.9 else:
61     self.datasetPath = tmp
62 slacapra 1.21 self.selectNoInput = 0
63 slacapra 1.1 except KeyError:
64 gutsche 1.3 msg = "Error: datasetpath not defined "
65 slacapra 1.1 raise CrabException(msg)
66 gutsche 1.5
67     # ML monitoring
68     # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
69 slacapra 1.9 if not self.datasetPath:
70     self.setParam_('dataset', 'None')
71     self.setParam_('owner', 'None')
72     else:
73     datasetpath_split = self.datasetPath.split("/")
74     self.setParam_('dataset', datasetpath_split[1])
75     self.setParam_('owner', datasetpath_split[-1])
76    
77 gutsche 1.8 self.setTaskid_()
78     self.setParam_('taskId', self.cfg_params['taskId'])
79 gutsche 1.5
80 slacapra 1.1 self.dataTiers = []
81    
82     ## now the application
83     try:
84     self.executable = cfg_params['CMSSW.executable']
85 gutsche 1.5 self.setParam_('exe', self.executable)
86 slacapra 1.1 log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
87     msg = "Default executable cmsRun overridden. Switch to " + self.executable
88     log.debug(3,msg)
89     except KeyError:
90     self.executable = 'cmsRun'
91 gutsche 1.5 self.setParam_('exe', self.executable)
92 slacapra 1.1 msg = "User executable not defined. Use cmsRun"
93     log.debug(3,msg)
94     pass
95    
96     try:
97     self.pset = cfg_params['CMSSW.pset']
98     log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
99 spiga 1.42 if self.pset.lower() != 'none' :
100     if (not os.path.exists(self.pset)):
101     raise CrabException("User defined PSet file "+self.pset+" does not exist")
102     else:
103     self.pset = None
104 slacapra 1.1 except KeyError:
105     raise CrabException("PSet file missing. Cannot run cmsRun ")
106    
107     # output files
108 slacapra 1.53 ## stuff which must be returned always via sandbox
109     self.output_file_sandbox = []
110    
111     # add fjr report by default via sandbox
112     self.output_file_sandbox.append(self.fjrFileName)
113    
114     # other output files to be returned via sandbox or copied to SE
115 slacapra 1.1 try:
116     self.output_file = []
117     tmp = cfg_params['CMSSW.output_file']
118     if tmp != '':
119     tmpOutFiles = string.split(cfg_params['CMSSW.output_file'],',')
120     log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
121     for tmp in tmpOutFiles:
122     tmp=string.strip(tmp)
123     self.output_file.append(tmp)
124     pass
125     else:
126 gutsche 1.50 log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")
127 slacapra 1.1 pass
128     pass
129     except KeyError:
130 gutsche 1.50 log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")
131 slacapra 1.1 pass
132    
133     # script_exe file as additional file in inputSandbox
134     try:
135 slacapra 1.10 self.scriptExe = cfg_params['USER.script_exe']
136     if self.scriptExe != '':
137     if not os.path.isfile(self.scriptExe):
138 slacapra 1.64 msg ="ERROR. file "+self.scriptExe+" not found"
139 slacapra 1.10 raise CrabException(msg)
140 spiga 1.42 self.additional_inbox_files.append(string.strip(self.scriptExe))
141 slacapra 1.1 except KeyError:
142 spiga 1.42 self.scriptExe = ''
143     #CarlosDaniele
144     if self.datasetPath == None and self.pset == None and self.scriptExe == '' :
145     msg ="WARNING. script_exe not defined"
146     raise CrabException(msg)
147    
148 slacapra 1.1 ## additional input files
149     try:
150 slacapra 1.29 tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
151 slacapra 1.64 common.logger.debug(5,"Additional input files: "+str(tmpAddFiles))
152     for tmpFile in tmpAddFiles:
153     tmpFile = string.strip(tmpFile)
154     if not os.path.exists(tmpFile):
155     raise CrabException("Additional input file not found: "+tmpFile)
156 slacapra 1.45 pass
157 slacapra 1.64 storedFile = common.work_space.shareDir()+ tmpFile
158     shutil.copyfile(tmpFile, storedFile)
159     self.additional_inbox_files.append(string.strip(storedFile))
160 slacapra 1.1 pass
161 slacapra 1.64 common.logger.debug(5,"Inbox files so far : "+str(self.additional_inbox_files))
162 slacapra 1.1 pass
163     except KeyError:
164     pass
165    
166 slacapra 1.9 # files per job
167 slacapra 1.1 try:
168 gutsche 1.35 if (cfg_params['CMSSW.files_per_jobs']):
169     raise CrabException("files_per_jobs no longer supported. Quitting.")
170 gutsche 1.3 except KeyError:
171 gutsche 1.35 pass
172 gutsche 1.3
173 slacapra 1.9 ## Events per job
174 gutsche 1.3 try:
175 slacapra 1.10 self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
176 slacapra 1.9 self.selectEventsPerJob = 1
177 gutsche 1.3 except KeyError:
178 slacapra 1.9 self.eventsPerJob = -1
179     self.selectEventsPerJob = 0
180    
181 slacapra 1.22 ## number of jobs
182     try:
183     self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
184     self.selectNumberOfJobs = 1
185     except KeyError:
186     self.theNumberOfJobs = 0
187     self.selectNumberOfJobs = 0
188 slacapra 1.10
189 gutsche 1.35 try:
190     self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
191     self.selectTotalNumberEvents = 1
192     except KeyError:
193     self.total_number_of_events = 0
194     self.selectTotalNumberEvents = 0
195    
196 spiga 1.42 if self.pset != None: #CarlosDaniele
197     if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
198     msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
199     raise CrabException(msg)
200     else:
201     if (self.selectNumberOfJobs == 0):
202     msg = 'Must specify number_of_jobs.'
203     raise CrabException(msg)
204 gutsche 1.35
205 slacapra 1.22 ## source seed for pythia
206     try:
207     self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
208     except KeyError:
209 slacapra 1.23 self.sourceSeed = None
210     common.logger.debug(5,"No seed given")
211 slacapra 1.22
212 slacapra 1.28 try:
213     self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
214     except KeyError:
215     self.sourceSeedVtx = None
216     common.logger.debug(5,"No vertex seed given")
217 spiga 1.57 try:
218     self.firstRun = int(cfg_params['CMSSW.first_run'])
219     except KeyError:
220     self.firstRun = None
221     common.logger.debug(5,"No first run given")
222 spiga 1.42 if self.pset != None: #CarlosDaniele
223     self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset
224 gutsche 1.3
225 slacapra 1.1 #DBSDLS-start
226     ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
227     self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
228     self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
229 gutsche 1.35 self.jobDestination=[] # Site destination(s) for each job (list of lists)
230 slacapra 1.1 ## Perform the data location and discovery (based on DBS/DLS)
231 slacapra 1.9 ## SL: Don't if NONE is specified as input (pythia use case)
232 gutsche 1.35 blockSites = {}
233 slacapra 1.9 if self.datasetPath:
234 gutsche 1.35 blockSites = self.DataDiscoveryAndLocation(cfg_params)
235 slacapra 1.1 #DBSDLS-end
236    
237     self.tgzNameWithPath = self.getTarBall(self.executable)
238 slacapra 1.10
239 slacapra 1.9 ## Select Splitting
240 spiga 1.42 if self.selectNoInput:
241     if self.pset == None: #CarlosDaniele
242     self.jobSplittingForScript()
243     else:
244     self.jobSplittingNoInput()
245 corvo 1.56 else:
246     self.jobSplittingByBlocks(blockSites)
247 gutsche 1.5
248 slacapra 1.22 # modify Pset
249 spiga 1.42 if self.pset != None: #CarlosDaniele
250     try:
251     if (self.datasetPath): # standard job
252     # allow to processa a fraction of events in a file
253     self.PsetEdit.inputModule("INPUT")
254     self.PsetEdit.maxEvent("INPUTMAXEVENTS")
255     self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
256     else: # pythia like job
257     self.PsetEdit.maxEvent(self.eventsPerJob)
258 spiga 1.57 if (self.firstRun):
259     self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN") #First Run
260 spiga 1.42 if (self.sourceSeed) :
261     self.PsetEdit.pythiaSeed("INPUT")
262     if (self.sourceSeedVtx) :
263     self.PsetEdit.pythiaSeedVtx("INPUTVTX")
264 gutsche 1.50 # add FrameworkJobReport to parameter-set
265     self.PsetEdit.addCrabFJR(self.fjrFileName)
266 spiga 1.42 self.PsetEdit.psetWriter(self.configFilename())
267     except:
268     msg='Error while manipuliating ParameterSet: exiting...'
269     raise CrabException(msg)
270 gutsche 1.3
271 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
272    
273 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
274    
275     datasetPath=self.datasetPath
276    
277 slacapra 1.1 ## Contact the DBS
278 slacapra 1.41 common.logger.message("Contacting DBS...")
279 slacapra 1.1 try:
280 gutsche 1.66
281     if self.use_dbs_2 == 1 :
282     self.pubdata=DataDiscovery_DBS2.DataDiscovery_DBS2(datasetPath, cfg_params)
283     else :
284     self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
285 slacapra 1.1 self.pubdata.fetchDBSInfo()
286    
287 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
288 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
289     raise CrabException(msg)
290 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
291 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
292     raise CrabException(msg)
293 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
294 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
295 slacapra 1.1 raise CrabException(msg)
296 gutsche 1.67 except DataDiscovery_DBS2.NotExistingDatasetError_DBS2, ex :
297     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
298     raise CrabException(msg)
299     except DataDiscovery_DBS2.NoDataTierinProvenanceError_DBS2, ex :
300     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
301     raise CrabException(msg)
302     except DataDiscovery_DBS2.DataDiscoveryError_DBS2, ex:
303     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
304     raise CrabException(msg)
305 slacapra 1.1
306     ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
307 gutsche 1.3 common.logger.message("Required data are :"+self.datasetPath)
308    
309 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
310 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
311     self.eventsbyfile=self.pubdata.getEventsPerFile()
312 gutsche 1.3
313 slacapra 1.1 ## get max number of events
314     self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
315 gutsche 1.44 common.logger.message("The number of available events is %s\n"%self.maxEvents)
316 slacapra 1.1
317 slacapra 1.41 common.logger.message("Contacting DLS...")
318 slacapra 1.1 ## Contact the DLS and build a list of sites hosting the fileblocks
319     try:
320 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
321 gutsche 1.6 dataloc.fetchDLSInfo()
322 slacapra 1.41 except DataLocation.DataLocationError , ex:
323 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
324     raise CrabException(msg)
325    
326    
327 gutsche 1.35 sites = dataloc.getSites()
328     allSites = []
329     listSites = sites.values()
330 slacapra 1.63 for listSite in listSites:
331     for oneSite in listSite:
332 gutsche 1.35 allSites.append(oneSite)
333     allSites = self.uniquelist(allSites)
334 gutsche 1.3
335 gutsche 1.35 common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
336     common.logger.debug(6, "List of Sites: "+str(allSites))
337     return sites
338 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs, self.ncjobs - Total # of jobs
          self.list_of_args - one [file list string, max events, skip events] entry per job
    RAISES: CrabException if the cards lead to less than one event per job
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report the effective request: total_number_of_events is 0 when
        # the user gave events_per_job and number_of_jobs instead
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if (self.theNumberOfJobs == 0):
            raise CrabException('number_of_jobs must be different from zero.')
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    # With less than one event per job the file loop below would keep
    # emitting jobs without ever consuming events.
    if (eventsRemaining > 0 and eventsPerJobRequested < 1):
        raise CrabException('Cannot split: the requested cards give less than one event per job.')

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = blockSites.keys()
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = "\\{"
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            currentFile = files[fileCount]
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[currentFile]
                    common.logger.debug(6, "File "+str(currentFile)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += '\\\"' + currentFile + '\\\"\,'
                    newFile = 0
                except KeyError:
                    common.logger.message("File "+str(currentFile)+" has unknown number of events: skipping")

            # if less events in file remain than eventsPerJobRequested
            if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
                # if last file in block
                if ( fileCount == numFilesInBlock-1 ) :
                    # Only close a job if at least one file was collected;
                    # otherwise (all pending files skipped) the job would
                    # get an empty, malformed file list.
                    if ( parString != "\\{" ) :
                        # end job using last file, use remaining events in block
                        # ([:-2] drops the trailing '\,' separator)
                        fullString = parString[:-2]
                        fullString += '\\}'
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        # update counters
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                    jobSkipEventCount = 0
                    # reset file
                    parString = "\\{"
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else :
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                # close job and touch new file
                fullString = parString[:-2]
                fullString += '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                parString = "\\{"
                filesEventCount = 0
                newFile = 1
                fileCount += 1

            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                fullString = parString[:-2]
                fullString += '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[currentFile])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[currentFile]
                parString = "\\{"
                parString += '\\\"' + currentFile + '\\\"\,'
            pass # END if
        pass # END while (iterate over files in the block)
    pass # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
516    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job
    (no input dataset).

    REQUIRES: self.selectEventsPerJob, self.selectTotalNumberEvents,
              self.selectNumberOfJobs, self.eventsPerJob,
              self.theNumberOfJobs, self.total_number_of_events,
              self.firstRun, self.sourceSeed, self.sourceSeedVtx
    SETS: self.total_number_of_jobs, and depending on the cards given
          self.total_number_of_events or self.eventsPerJob
          self.jobDestination - one [""] entry appended per job
          self.list_of_args - per job: [run-or-index, pythia seed, vtx seed],
                              the seeds only when configured
    RAISES: CrabException for a negative total number of events or a
            zero divisor in the splitting
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            if (self.eventsPerJob == 0):
                raise CrabException('events_per_job must be different from zero.')
            self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs =self.theNumberOfJobs
            self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)

    elif (self.selectNumberOfJobs) :
        self.total_number_of_jobs = self.theNumberOfJobs
        if (self.total_number_of_jobs == 0):
            raise CrabException('number_of_jobs must be different from zero.')
        self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # Each numeric argument is the configured value with the job index
    # appended as decimal digits (string concatenation, not addition).
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml

        if (self.firstRun):
            ## pythia first run
            jobArgs = [str(self.firstRun)+str(i)]
        else:
            ## no first run: plain job index
            jobArgs = [str(i)]

        if (self.sourceSeed):
            ## pythia random seed
            jobArgs.append(str(self.sourceSeed)+str(i))
            if (self.sourceSeedVtx):
                ## vtx random seed, only used together with the pythia seed
                jobArgs.append(str(self.sourceSeedVtx)+str(i))

        self.list_of_args.append(jobArgs)

    return
590    
591 spiga 1.42
def jobSplittingForScript(self):#CarlosDaniele
    """
    Perform job splitting based on number of job: create exactly
    self.theNumberOfJobs jobs.

    SETS: self.total_number_of_jobs
          self.list_of_args - [[ '0' ], [ '1' ], ...], the job index as string
          self.jobDestination - one [""] entry appended per job
    """
    njobs = self.theNumberOfJobs

    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(njobs)+' jobs in total ')

    self.total_number_of_jobs = njobs

    common.logger.debug(5,'N jobs '+str(njobs))
    common.logger.message(str(njobs)+' jobs can be created')

    jobIndices = range(njobs)

    # the only argument of each job is its own index
    self.list_of_args = [[str(idx)] for idx in jobIndices]

    # Since there is no input, any site is good: empty destination per job
    self.jobDestination.extend([[""] for idx in jobIndices])
    return
614    
def split(self, jobParams):
    """
    Store the arguments and destination of every job in the job DB.

    ARGUMENT: jobParams - list-like container, modified in place: one
                          entry is appended per job, then entries
                          0..njobs-1 are set to the job arguments
    REQUIRES: self.total_number_of_jobs, self.list_of_args,
              self.jobDestination
    """
    common.jobDB.load()

    njobs = self.total_number_of_jobs
    arglist = self.list_of_args

    # create the empty structure
    created = 0
    while created < njobs:
        jobParams.append("")
        created += 1

    for job in range(njobs):
        jobArgs = arglist[job]
        jobParams[job] = jobArgs
        common.jobDB.setArguments(job, jobArgs)

        destination = self.jobDestination[job]
        common.logger.debug(5,"Job "+str(job)+" Destination: "+str(destination))
        common.jobDB.setDestination(job, destination)

    common.jobDB.save()
    return
635    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in the job DB for job 'nj' as a single
    string, each argument followed by one blank.

    'sched' is accepted but not used here.
    """
    pieces = [str(arg)+" " for arg in common.jobDB.arguments(nj)]
    return ''.join(pieces)
641 gutsche 1.3
def numberOfJobs(self):
    """
    Return the total number of jobs as stored in
    self.total_number_of_jobs by the job splitting methods.
    """
    njobs = self.total_number_of_jobs
    return njobs
645    
def getTarBall(self, exe):
    """
    Return the TarBall with lib and exe.

    The path is <pathForTgz()>share/<self.tgz_name> and is stored in
    self.tgzNameWithPath. An existing file is returned as is;
    otherwise buildTar_(exe) is called first.
    """
    # relative path, as used for the Boss XML files
    tarball = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarball

    if not os.path.exists(tarball):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return string.strip(self.tgzNameWithPath)

    # if it exist, just return it
    return tarball
663    
664     def buildTar_(self, executable):
665    
666     # First of all declare the user Scram area
667     swArea = self.scram.getSWArea_()
668     #print "swArea = ", swArea
669 slacapra 1.63 # swVersion = self.scram.getSWVersion()
670     # print "swVersion = ", swVersion
671 slacapra 1.1 swReleaseTop = self.scram.getReleaseTop_()
672     #print "swReleaseTop = ", swReleaseTop
673    
674     ## check if working area is release top
675     if swReleaseTop == '' or swArea == swReleaseTop:
676     return
677    
678 slacapra 1.61 import tarfile
679     try: # create tar ball
680     tar = tarfile.open(self.tgzNameWithPath, "w:gz")
681     ## First find the executable
682     if (self.executable != ''):
683     exeWithPath = self.scram.findFile_(executable)
684     if ( not exeWithPath ):
685     raise CrabException('User executable '+executable+' not found')
686    
687     ## then check if it's private or not
688     if exeWithPath.find(swReleaseTop) == -1:
689     # the exe is private, so we must ship
690     common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
691     path = swArea+'/'
692     exe = string.replace(exeWithPath, path,'')
693     tar.add(path+exe,exe)
694     pass
695     else:
696     # the exe is from release, we'll find it on WN
697     pass
698    
699     ## Now get the libraries: only those in local working area
700     libDir = 'lib'
701     lib = swArea+'/' +libDir
702     common.logger.debug(5,"lib "+lib+" to be tarred")
703     if os.path.exists(lib):
704     tar.add(lib,libDir)
705    
706     ## Now check if module dir is present
707     moduleDir = 'module'
708     module = swArea + '/' + moduleDir
709     if os.path.isdir(module):
710     tar.add(module,moduleDir)
711    
712     ## Now check if any data dir(s) is present
713     swAreaLen=len(swArea)
714     for root, dirs, files in os.walk(swArea):
715     if "data" in dirs:
716     common.logger.debug(5,"data "+root+"/data"+" to be tarred")
717     tar.add(root+"/data",root[swAreaLen:]+"/data")
718    
719     ## Add ProdAgent dir to tar
720     paDir = 'ProdAgentApi'
721     pa = os.environ['CRABDIR'] + '/' + 'ProdAgentApi'
722     if os.path.isdir(pa):
723     tar.add(pa,paDir)
724    
725     common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
726     tar.close()
727     except :
728     raise CrabException('Could not create tar-ball')
729 corvo 1.56
730 slacapra 1.61 ## create tar-ball with ML stuff
731 corvo 1.58 self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
732 slacapra 1.61 try:
733     tar = tarfile.open(self.MLtgzfile, "w:gz")
734     path=os.environ['CRABDIR'] + '/python/'
735     for file in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
736     tar.add(path+file,file)
737     common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
738     tar.close()
739     except :
740 corvo 1.58 raise CrabException('Could not create ML files tar-ball')
741    
742 slacapra 1.1 return
743    
744     def wsSetupEnvironment(self, nj):
745     """
746     Returns part of a job script which prepares
747     the execution environment for the job 'nj'.
748     """
749     # Prepare JobType-independent part
750 gutsche 1.3 txt = ''
751    
752     ## OLI_Daniele at this level middleware already known
753    
754     txt += 'if [ $middleware == LCG ]; then \n'
755     txt += self.wsSetupCMSLCGEnvironment_()
756     txt += 'elif [ $middleware == OSG ]; then\n'
757 gutsche 1.43 txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
758     txt += ' echo "Created working directory: $WORKING_DIR"\n'
759 gutsche 1.3 txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
760 gutsche 1.7 txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
761     txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
762     txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
763     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
764 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
765     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
766     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
767 gutsche 1.3 txt += ' exit 1\n'
768     txt += ' fi\n'
769     txt += '\n'
770     txt += ' echo "Change to working directory: $WORKING_DIR"\n'
771     txt += ' cd $WORKING_DIR\n'
772     txt += self.wsSetupCMSOSGEnvironment_()
773     txt += 'fi\n'
774 slacapra 1.1
775     # Prepare JobType-specific part
776     scram = self.scram.commandName()
777     txt += '\n\n'
778     txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
779     txt += scram+' project CMSSW '+self.version+'\n'
780     txt += 'status=$?\n'
781     txt += 'if [ $status != 0 ] ; then\n'
782 gutsche 1.7 txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
783 gutsche 1.3 txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
784 gutsche 1.7 txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
785 slacapra 1.1 txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
786 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
787     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
788     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
789 gutsche 1.3 ## OLI_Daniele
790     txt += ' if [ $middleware == OSG ]; then \n'
791     txt += ' echo "Remove working directory: $WORKING_DIR"\n'
792     txt += ' cd $RUNTIME_AREA\n'
793     txt += ' /bin/rm -rf $WORKING_DIR\n'
794     txt += ' if [ -d $WORKING_DIR ] ;then\n'
795 gutsche 1.7 txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW CMSSW_0_6_1 not found on `hostname`"\n'
796     txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
797     txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
798     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
799 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
800     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
801     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
802 gutsche 1.3 txt += ' fi\n'
803     txt += ' fi \n'
804     txt += ' exit 1 \n'
805 slacapra 1.1 txt += 'fi \n'
806     txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
807     txt += 'cd '+self.version+'\n'
808     ### needed grep for bug in scramv1 ###
809 corvo 1.58 txt += scram+' runtime -sh\n'
810 slacapra 1.1 txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
811 corvo 1.58 txt += 'echo $PATH\n'
812 slacapra 1.1
813     # Handle the arguments:
814     txt += "\n"
815 gutsche 1.7 txt += "## number of arguments (first argument always jobnumber)\n"
816 slacapra 1.1 txt += "\n"
817 mkirn 1.32 # txt += "narg=$#\n"
818     txt += "if [ $nargs -lt 2 ]\n"
819 slacapra 1.1 txt += "then\n"
820 mkirn 1.33 txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
821 gutsche 1.3 txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
822 gutsche 1.7 txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
823 slacapra 1.1 txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
824 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
825     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
826     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
827 gutsche 1.3 ## OLI_Daniele
828     txt += ' if [ $middleware == OSG ]; then \n'
829     txt += ' echo "Remove working directory: $WORKING_DIR"\n'
830     txt += ' cd $RUNTIME_AREA\n'
831     txt += ' /bin/rm -rf $WORKING_DIR\n'
832     txt += ' if [ -d $WORKING_DIR ] ;then\n'
833 gutsche 1.7 txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
834     txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
835     txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
836     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
837 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
838     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
839     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
840 gutsche 1.3 txt += ' fi\n'
841     txt += ' fi \n'
842 slacapra 1.1 txt += " exit 1\n"
843     txt += "fi\n"
844     txt += "\n"
845    
846     # Prepare job-specific part
847     job = common.job_list[nj]
848 spiga 1.42 if self.pset != None: #CarlosDaniele
849     pset = os.path.basename(job.configFilename())
850     txt += '\n'
851     if (self.datasetPath): # standard job
852     #txt += 'InputFiles=$2\n'
853     txt += 'InputFiles=${args[1]}\n'
854     txt += 'MaxEvents=${args[2]}\n'
855     txt += 'SkipEvents=${args[3]}\n'
856     txt += 'echo "Inputfiles:<$InputFiles>"\n'
857     txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
858     txt += 'echo "MaxEvents:<$MaxEvents>"\n'
859 gutsche 1.46 txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
860 spiga 1.42 txt += 'echo "SkipEvents:<$SkipEvents>"\n'
861 gutsche 1.46 txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
862 spiga 1.42 else: # pythia like job
863     if (self.sourceSeed):
864 spiga 1.57 txt += 'FirstRun=${args[1]}\n'
865     txt += 'echo "FirstRun: <$FirstRun>"\n'
866     txt += 'sed "s#\<INPUTFIRSTRUN\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
867     else:
868     txt += '# Copy untouched pset\n'
869     txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
870     if (self.sourceSeed):
871 spiga 1.42 # txt += 'Seed=$2\n'
872 spiga 1.57 txt += 'Seed=${args[2]}\n'
873 spiga 1.42 txt += 'echo "Seed: <$Seed>"\n'
874 spiga 1.57 txt += 'sed "s#\<INPUT\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n'
875 spiga 1.42 if (self.sourceSeedVtx):
876     # txt += 'VtxSeed=$3\n'
877 spiga 1.57 txt += 'VtxSeed=${args[3]}\n'
878 spiga 1.42 txt += 'echo "VtxSeed: <$VtxSeed>"\n'
879 spiga 1.57 txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
880 spiga 1.42 else:
881 spiga 1.57 txt += 'mv tmp_2.cfg pset.cfg\n'
882 slacapra 1.28 else:
883 spiga 1.57 txt += 'mv tmp_1.cfg pset.cfg\n'
884     # txt += '# Copy untouched pset\n'
885     # txt += 'cp $RUNTIME_AREA/'+pset+' pset.cfg\n'
886 slacapra 1.24
887 slacapra 1.1
888     if len(self.additional_inbox_files) > 0:
889     for file in self.additional_inbox_files:
890 mkirn 1.31 relFile = file.split("/")[-1]
891     txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
892     txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
893     txt += ' chmod +x '+relFile+'\n'
894 slacapra 1.1 txt += 'fi\n'
895     pass
896    
897 spiga 1.42 if self.pset != None: #CarlosDaniele
898     txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'
899    
900     txt += '\n'
901     txt += 'echo "***** cat pset.cfg *********"\n'
902     txt += 'cat pset.cfg\n'
903     txt += 'echo "****** end pset.cfg ********"\n'
904     txt += '\n'
905     # txt += 'echo "***** cat pset1.cfg *********"\n'
906     # txt += 'cat pset1.cfg\n'
907     # txt += 'echo "****** end pset1.cfg ********"\n'
908 gutsche 1.3 return txt
909    
910 slacapra 1.63 def wsBuildExe(self, nj=0):
911 gutsche 1.3 """
912     Put in the script the commands to build an executable
913     or a library.
914     """
915    
916     txt = ""
917    
918     if os.path.isfile(self.tgzNameWithPath):
919     txt += 'echo "tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'"\n'
920     txt += 'tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
921     txt += 'untar_status=$? \n'
922     txt += 'if [ $untar_status -ne 0 ]; then \n'
923     txt += ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n'
924     txt += ' echo "JOB_EXIT_STATUS = $untar_status" \n'
925 gutsche 1.7 txt += ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n'
926 gutsche 1.3 txt += ' if [ $middleware == OSG ]; then \n'
927     txt += ' echo "Remove working directory: $WORKING_DIR"\n'
928     txt += ' cd $RUNTIME_AREA\n'
929     txt += ' /bin/rm -rf $WORKING_DIR\n'
930     txt += ' if [ -d $WORKING_DIR ] ;then\n'
931 gutsche 1.13 txt += ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n'
932     txt += ' echo "JOB_EXIT_STATUS = 50999"\n'
933     txt += ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n'
934     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
935     txt += ' rm -f $RUNTIME_AREA/$repo \n'
936     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
937     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
938 gutsche 1.3 txt += ' fi\n'
939     txt += ' fi \n'
940     txt += ' \n'
941 gutsche 1.7 txt += ' exit 1 \n'
942 gutsche 1.3 txt += 'else \n'
943     txt += ' echo "Successful untar" \n'
944     txt += 'fi \n'
945 gutsche 1.50 txt += '\n'
946     txt += 'echo "Include ProdAgentApi in PYTHONPATH"\n'
947     txt += 'if [ -z "$PYTHONPATH" ]; then\n'
948     txt += ' export PYTHONPATH=ProdAgentApi\n'
949     txt += 'else\n'
950     txt += ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n'
951     txt += 'fi\n'
952     txt += '\n'
953    
954 gutsche 1.3 pass
955    
956 slacapra 1.1 return txt
957    
958     def modifySteeringCards(self, nj):
959     """
960     modify the card provided by the user,
961     writing a new card into share dir
962     """
963    
964     def executableName(self):
965 spiga 1.42 if self.pset == None: #CarlosDaniele
966     return "sh "
967     else:
968     return self.executable
969 slacapra 1.1
970     def executableArgs(self):
971 spiga 1.42 if self.pset == None:#CarlosDaniele
972     return self.scriptExe + " $NJob"
973     else:
974     return " -p pset.cfg"
975 slacapra 1.1
976     def inputSandbox(self, nj):
977     """
978     Returns a list of filenames to be put in JDL input sandbox.
979     """
980     inp_box = []
981 slacapra 1.53 # # dict added to delete duplicate from input sandbox file list
982     # seen = {}
983 slacapra 1.1 ## code
984     if os.path.isfile(self.tgzNameWithPath):
985     inp_box.append(self.tgzNameWithPath)
986 corvo 1.58 if os.path.isfile(self.MLtgzfile):
987     inp_box.append(self.MLtgzfile)
988 slacapra 1.1 ## config
989 spiga 1.42 if not self.pset is None: #CarlosDaniele
990 corvo 1.56 inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
991 slacapra 1.1 ## additional input files
992 gutsche 1.3 #for file in self.additional_inbox_files:
993     # inp_box.append(common.work_space.cwdDir()+file)
994 slacapra 1.1 return inp_box
995    
996     def outputSandbox(self, nj):
997     """
998     Returns a list of filenames to be put in JDL output sandbox.
999     """
1000     out_box = []
1001    
1002     ## User Declared output files
1003 slacapra 1.54 for out in (self.output_file+self.output_file_sandbox):
1004 slacapra 1.1 n_out = nj + 1
1005     out_box.append(self.numberFile_(out,str(n_out)))
1006     return out_box
1007    
1008     def prepareSteeringCards(self):
1009     """
1010     Make initial modifications of the user's steering card file.
1011     """
1012     return
1013    
1014     def wsRenameOutput(self, nj):
1015     """
1016     Returns part of a job script which renames the produced files.
1017     """
1018    
1019     txt = '\n'
1020 gutsche 1.7 txt += '# directory content\n'
1021     txt += 'ls \n'
1022 slacapra 1.54
1023     for fileWithSuffix in (self.output_file+self.output_file_sandbox):
1024 slacapra 1.1 output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
1025     txt += '\n'
1026 gutsche 1.7 txt += '# check output file\n'
1027 slacapra 1.1 txt += 'ls '+fileWithSuffix+'\n'
1028 fanzago 1.18 txt += 'ls_result=$?\n'
1029     txt += 'if [ $ls_result -ne 0 ] ; then\n'
1030     txt += ' echo "ERROR: Problem with output file"\n'
1031 gutsche 1.7 if common.scheduler.boss_scheduler_name == 'condor_g':
1032     txt += ' if [ $middleware == OSG ]; then \n'
1033     txt += ' echo "prepare dummy output file"\n'
1034     txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
1035     txt += ' fi \n'
1036 slacapra 1.1 txt += 'else\n'
1037     txt += ' cp '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
1038     txt += 'fi\n'
1039    
1040 gutsche 1.7 txt += 'cd $RUNTIME_AREA\n'
1041 fanzago 1.18 txt += 'cd $RUNTIME_AREA\n'
1042 gutsche 1.3 ### OLI_DANIELE
1043     txt += 'if [ $middleware == OSG ]; then\n'
1044     txt += ' cd $RUNTIME_AREA\n'
1045     txt += ' echo "Remove working directory: $WORKING_DIR"\n'
1046     txt += ' /bin/rm -rf $WORKING_DIR\n'
1047     txt += ' if [ -d $WORKING_DIR ] ;then\n'
1048 gutsche 1.7 txt += ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n'
1049     txt += ' echo "JOB_EXIT_STATUS = 60999"\n'
1050     txt += ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n'
1051     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
1052 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
1053     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1054     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1055 gutsche 1.3 txt += ' fi\n'
1056     txt += 'fi\n'
1057     txt += '\n'
1058 slacapra 1.54
1059     file_list = ''
1060     ## Add to filelist only files to be possibly copied to SE
1061     for fileWithSuffix in self.output_file:
1062     output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
1063     file_list=file_list+output_file_num+' '
1064     file_list=file_list[:-1]
1065     txt += 'file_list="'+file_list+'"\n'
1066    
1067 slacapra 1.1 return txt
1068    
1069     def numberFile_(self, file, txt):
1070     """
1071     append _'txt' before last extension of a file
1072     """
1073     p = string.split(file,".")
1074     # take away last extension
1075     name = p[0]
1076     for x in p[1:-1]:
1077     name=name+"."+x
1078     # add "_txt"
1079     if len(p)>1:
1080     ext = p[len(p)-1]
1081     result = name + '_' + txt + "." + ext
1082     else:
1083     result = name + '_' + txt
1084    
1085     return result
1086    
1087 slacapra 1.63 def getRequirements(self, nj=[]):
1088 slacapra 1.1 """
1089     return job requirements to add to jdl files
1090     """
1091     req = ''
1092 slacapra 1.47 if self.version:
1093 slacapra 1.10 req='Member("VO-cms-' + \
1094 slacapra 1.47 self.version + \
1095 slacapra 1.10 '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
1096 gutsche 1.35
1097     req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)'
1098    
1099 slacapra 1.1 return req
1100 gutsche 1.3
1101     def configFilename(self):
1102     """ return the config filename """
1103     return self.name()+'.cfg'
1104    
1105     ### OLI_DANIELE
1106     def wsSetupCMSOSGEnvironment_(self):
1107     """
1108     Returns part of a job script which is prepares
1109     the execution environment and which is common for all CMS jobs.
1110     """
1111     txt = '\n'
1112     txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
1113     txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
1114     txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
1115     txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
1116 mkirn 1.40 txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
1117     txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
1118     txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
1119 gutsche 1.3 txt += ' else\n'
1120 mkirn 1.40 txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
1121 gutsche 1.3 txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
1122     txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
1123     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
1124 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
1125     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1126     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1127 gutsche 1.7 txt += ' exit 1\n'
1128 gutsche 1.3 txt += '\n'
1129     txt += ' echo "Remove working directory: $WORKING_DIR"\n'
1130     txt += ' cd $RUNTIME_AREA\n'
1131     txt += ' /bin/rm -rf $WORKING_DIR\n'
1132     txt += ' if [ -d $WORKING_DIR ] ;then\n'
1133 mkirn 1.40 txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
1134 gutsche 1.7 txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
1135     txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
1136     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
1137 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
1138     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1139     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1140 gutsche 1.3 txt += ' fi\n'
1141     txt += '\n'
1142 gutsche 1.7 txt += ' exit 1\n'
1143 gutsche 1.3 txt += ' fi\n'
1144     txt += '\n'
1145     txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
1146     txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'
1147    
1148     return txt
1149    
1150     ### OLI_DANIELE
1151     def wsSetupCMSLCGEnvironment_(self):
1152     """
1153     Returns part of a job script which is prepares
1154     the execution environment and which is common for all CMS jobs.
1155     """
1156     txt = ' \n'
1157     txt += ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n'
1158     txt += ' if [ ! $VO_CMS_SW_DIR ] ;then\n'
1159     txt += ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n'
1160     txt += ' echo "JOB_EXIT_STATUS = 10031" \n'
1161     txt += ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n'
1162     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
1163 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
1164     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1165     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1166 gutsche 1.7 txt += ' exit 1\n'
1167 gutsche 1.3 txt += ' else\n'
1168     txt += ' echo "Sourcing environment... "\n'
1169     txt += ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n'
1170     txt += ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n'
1171     txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
1172     txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
1173     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
1174 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
1175     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1176     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1177 gutsche 1.7 txt += ' exit 1\n'
1178 gutsche 1.3 txt += ' fi\n'
1179     txt += ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1180     txt += ' source $VO_CMS_SW_DIR/cmsset_default.sh\n'
1181     txt += ' result=$?\n'
1182     txt += ' if [ $result -ne 0 ]; then\n'
1183     txt += ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1184     txt += ' echo "JOB_EXIT_STATUS = 10032"\n'
1185     txt += ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n'
1186     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
1187 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
1188     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1189     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1190 gutsche 1.7 txt += ' exit 1\n'
1191 gutsche 1.3 txt += ' fi\n'
1192     txt += ' fi\n'
1193     txt += ' \n'
1194     txt += ' string=`cat /etc/redhat-release`\n'
1195     txt += ' echo $string\n'
1196     txt += ' if [[ $string = *alhalla* ]]; then\n'
1197     txt += ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n'
1198     txt += ' elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n'
1199     txt += ' export SCRAM_ARCH=slc3_ia32_gcc323\n'
1200     txt += ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n'
1201     txt += ' else\n'
1202 gutsche 1.7 txt += ' echo "SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized"\n'
1203 gutsche 1.3 txt += ' echo "JOB_EXIT_STATUS = 10033"\n'
1204     txt += ' echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n'
1205     txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
1206 gutsche 1.13 txt += ' rm -f $RUNTIME_AREA/$repo \n'
1207     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
1208     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
1209 gutsche 1.7 txt += ' exit 1\n'
1210 gutsche 1.3 txt += ' fi\n'
1211     txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
1212     txt += ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n'
1213     return txt
1214 gutsche 1.5
1215     def setParam_(self, param, value):
1216     self._params[param] = value
1217    
1218     def getParams(self):
1219     return self._params
1220 gutsche 1.8
1221     def setTaskid_(self):
1222     self._taskId = self.cfg_params['taskId']
1223    
1224     def getTaskid(self):
1225     return self._taskId
1226 gutsche 1.35
1227     #######################################################################
1228     def uniquelist(self, old):
1229     """
1230     remove duplicates from a list
1231     """
1232     nd={}
1233     for e in old:
1234     nd[e]=0
1235     return nd.keys()