ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.70
Committed: Wed Feb 28 14:17:24 2007 UTC (18 years, 2 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.69: +20 -17 lines
Log Message:
re-establish submission and execution of user script

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.41 import DataDiscovery
8 gutsche 1.66 import DataDiscovery_DBS2
9 slacapra 1.41 import DataLocation
10 slacapra 1.1 import Scram
11    
12 slacapra 1.70 import os, string, re, shutil, glob
13 slacapra 1.1
14     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Set up a CMSSW job type from the user configuration.

    Reads the CMSSW.* and USER.* cards, validates them, runs the data
    discovery and location when a dataset is given, prepares the
    tarball, splits the task into jobs and, when a parameter set is
    used, writes the modified parameter set.

    ARGUMENTS: cfg_params - dictionary-like object holding the configuration cards
               ncjobs     - number of jobs requested to be created
    RAISES:    CrabException when datasetpath or pset is not defined, when
               the pset, the script_exe or an additional input file is
               not found, when files_per_jobs is used, when the splitting
               cards are inconsistent or when the parameter set cannot
               be modified
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    # Marco.
    self._params = {}
    self.cfg_params = cfg_params

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''         # script use case
    self.datasetPath = ''  # script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards

    ## get DBS mode
    self.use_dbs_2 = 0
    try:
        self.use_dbs_2 = int(self.cfg_params['CMSSW.use_dbs_2'])
    except KeyError:
        self.use_dbs_2 = 0

    try:
        tmp = cfg_params['CMSSW.datasetpath']
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        # no input dataset
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        self.setParam_('dataset', datasetpath_split[1])
        self.setParam_('owner', datasetpath_split[-1])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none' :
        if (not os.path.exists(self.pset)):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        # 'none' means no parameter set is used
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    try:
        tmp = cfg_params['CMSSW.output_file']
        if tmp != '':
            tmpOutFiles = tmp.split(',')
            log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
            for tmp in tmpOutFiles:
                self.output_file.append(tmp.strip())
        else:
            log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")
    except KeyError:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
        if self.scriptExe != '':
            if not os.path.isfile(self.scriptExe):
                msg ="ERROR. file "+self.scriptExe+" not found"
                raise CrabException(msg)
            self.additional_inbox_files.append(self.scriptExe.strip())
    except KeyError:
        self.scriptExe = ''

    #CarlosDaniele
    # with neither a dataset nor a parameter set a user script is required
    if self.datasetPath is None and self.pset is None and self.scriptExe == '' :
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    try:
        tmpAddFiles = cfg_params['USER.additional_input_files'].split(',')
        for tmp in tmpAddFiles:
            tmp = tmp.strip()
            # an empty entry (e.g. a trailing comma) names no file
            if tmp == '':
                continue
            dirname = ''
            if not tmp[0]=="/": dirname = "."
            files = glob.glob(os.path.join(dirname, tmp))
            # glob only returns existing paths, so a missing file or a
            # pattern without matches shows up as an empty result
            if not files:
                raise CrabException("Additional input file not found: "+tmp)
            for fname in files:
                # store the copy under its bare name: the share directory
                # has no sub-directories mirroring the original location
                storedFile = os.path.join(common.work_space.shareDir(), os.path.basename(fname))
                shutil.copyfile(fname, storedFile)
                self.additional_inbox_files.append(storedFile.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
    except KeyError:
        pass

    # files per job
    try:
        if (cfg_params['CMSSW.files_per_jobs']):
            raise CrabException("files_per_jobs no longer supported. Quitting.")
    except KeyError:
        pass

    ## Events per job
    try:
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None: #CarlosDaniele
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")
    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5,"No first run given")
    if self.pset is not None: #CarlosDaniele
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None: #CarlosDaniele
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None: #CarlosDaniele
        try:
            if (self.datasetPath): # standard job
                # allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else:  # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if (self.firstRun):
                    self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN")  #First Run
                if (self.sourceSeed) :
                    self.PsetEdit.pythiaSeed("INPUT")
                    if (self.sourceSeedVtx) :
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except (KeyboardInterrupt, SystemExit):
            # a user interrupt is not a parameter-set problem
            raise
        except:
            msg='Error while manipuliating ParameterSet: exiting...'
            raise CrabException(msg)
273 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Query DBS for the content of self.datasetPath and DLS for the sites
    hosting its file blocks.

    ARGUMENT: cfg_params: configuration passed on to the discovery and
              location objects
    SETS:     self.pubdata, self.filesbyblock, self.eventsbyblock,
              self.eventsbyfile, self.maxEvents
    RETURNS:  the object returned by DataLocation.getSites(); its values
              are iterated here as collections of sites
    RAISES:   CrabException when the data discovery or the data location
              fails
    """
    # sys.exc_info() gives the active exception without relying on
    # version-specific "except ..., name" syntax
    import sys

    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath=self.datasetPath

    ## Contact the DBS
    common.logger.message("Contacting DBS...")
    try:
        if self.use_dbs_2 == 1 :
            self.pubdata=DataDiscovery_DBS2.DataDiscovery_DBS2(datasetPath, cfg_params)
        else :
            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
        self.pubdata.fetchDBSInfo()
    except (DataDiscovery.NotExistingDatasetError,
            DataDiscovery.NoDataTierinProvenanceError,
            DataDiscovery.DataDiscoveryError,
            DataDiscovery_DBS2.NotExistingDatasetError_DBS2,
            DataDiscovery_DBS2.NoDataTierinProvenanceError_DBS2,
            DataDiscovery_DBS2.DataDiscoveryError_DBS2):
        # every discovery failure is reported the same way
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
        raise CrabException(msg)

    ## get list of all required data in the form of dbs paths  (dbs path = /dataset/datatier/owner)
    common.logger.message("Required data are :"+self.datasetPath)

    self.filesbyblock=self.pubdata.getFiles()
    self.eventsbyblock=self.pubdata.getEventsPerBlock()
    self.eventsbyfile=self.pubdata.getEventsPerFile()

    ## get max number of events
    self.maxEvents=self.pubdata.getMaxEvents() ##  self.maxEvents used in Creator.py
    common.logger.message("The number of available events is %s\n"%self.maxEvents)

    common.logger.message("Contacting DLS...")
    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError:
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
        raise CrabException(msg)

    sites = dataloc.getSites()
    # flatten the per-block site lists, then drop duplicates for the report
    allSites = []
    for listSite in sites.values():
        allSites.extend(listSite)
    allSites = self.uniquelist(allSites)

    common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
    common.logger.debug(6, "List of Sites: "+str(allSites))
    return sites
341 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs
          self.ncjobs - set to the same value as self.total_number_of_jobs
          self.list_of_args - one [file list, max events, skip events] entry per job
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report the computed request: total_number_of_events is 0 when the
        # request was given as events_per_job and number_of_jobs
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = blockSites.keys()
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # file list with no file collected yet
    emptyParString = "\\{"

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        if block in self.eventsbyblock :
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = self.filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0

            # ---- New block => New job ---- #
            parString = emptyParString
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                file = files[fileCount]
                if newFile :
                    try:
                        numEventsInFile = self.eventsbyfile[file]
                        common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += '\\\"' + file + '\\\"\,'
                        newFile = 0
                    except KeyError:
                        common.logger.message("File "+str(file)+" has unknown number of events: skipping")

                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
                    # if last file in block
                    if ( fileCount == numFilesInBlock-1 ) :
                        # end job using last file, use remaining events in block
                        # a job is only closed when at least one file was
                        # collected: if all pending files were skipped there
                        # is nothing to run on
                        if parString != emptyParString :
                            # strip the trailing separator and close the list
                            fullString = parString[:-2]
                            fullString += '\\}'
                            list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                            common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                            self.jobDestination.append(blockSites[block])
                            common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                            # update counters
                            jobCount = jobCount + 1
                            totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                            eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                        jobSkipEventCount = 0
                        # reset file
                        parString = emptyParString
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else :
                        # go to next file
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                    # close job and touch new file
                    fullString = parString[:-2]
                    fullString += '\\}'
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    parString = emptyParString
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    fullString = parString[:-2]
                    fullString += '\\}'
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[file]
                    parString = emptyParString
                    parString += '\\\"' + file + '\\\"\,'
                pass # END if
            pass # END while (iterate over files in the block)
    pass # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
519    
def jobSplittingNoInput(self):
    """
    Split a task without input dataset, based on the number of events per job.

    REQUIRES: self.selectEventsPerJob, self.selectTotalNumberEvents,
              self.selectNumberOfJobs, self.eventsPerJob,
              self.theNumberOfJobs, self.total_number_of_events,
              self.firstRun, self.sourceSeed, self.sourceSeedVtx
    SETS: self.total_number_of_jobs, self.list_of_args and, depending on
          the cards given, self.total_number_of_events or self.eventsPerJob;
          appends one destination per job to self.jobDestination
    RAISES: CrabException when the total number of events is negative
    """
    log = common.logger
    log.debug(5,'Splitting per events')
    log.message('Required '+str(self.eventsPerJob)+' events per job ')
    log.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    log.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        raise CrabException('Cannot split jobs per Events with "-1" as total number of events')

    # derive the missing quantity from the two which were given
    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs =self.theNumberOfJobs
            self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)
    elif (self.selectNumberOfJobs) :
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

    log.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    eventsCovered = int(self.total_number_of_jobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - eventsCovered

    log.debug(5,'Check '+str(check))

    log.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        log.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(eventsCovered))

    # arguments of job i: job index (prefixed by the first run when given),
    # then the seeds, each with the job index appended
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml

        jobIndex = str(i)
        if (self.firstRun):
            ## pythia first run
            jobArgs = [str(self.firstRun)+jobIndex]
        else:
            ## no first run
            jobArgs = [jobIndex]

        if (self.sourceSeed):
            ## pythia random seed
            jobArgs.append(str(self.sourceSeed)+jobIndex)
            if (self.sourceSeedVtx):
                ## vtx random seed, only used together with the pythia one
                jobArgs.append(str(self.sourceSeedVtx)+jobIndex)

        self.list_of_args.append(jobArgs)

    return
593    
594 spiga 1.42
def jobSplittingForScript(self):#CarlosDaniele
    """
    Split the task into the requested number of jobs.

    REQUIRES: self.theNumberOfJobs
    SETS: self.total_number_of_jobs, self.list_of_args (one [job index]
          entry per job); appends one destination per job to
          self.jobDestination
    """
    log = common.logger
    log.debug(5,'Splitting per job')
    log.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    njobs = self.theNumberOfJobs
    self.total_number_of_jobs = njobs

    log.debug(5,'N jobs '+str(njobs))
    log.message(str(njobs)+' jobs can be created')

    jobIndexes = range(njobs)
    ## Since there is no input, any site is good: every job gets its
    ## own [""] destination
    self.jobDestination.extend([[""] for i in jobIndexes])
    # the only argument is the job index (no random seed)
    self.list_of_args = [[str(i)] for i in jobIndexes]
    return
617    
def split(self, jobParams):
    """
    Store arguments and destination of every job in the job DB.

    ARGUMENT: jobParams: list which gets one entry per job appended; the
              first self.total_number_of_jobs entries are then set to
              the job arguments
    REQUIRES: self.total_number_of_jobs, self.list_of_args, self.jobDestination
    """
    common.jobDB.load()
    #### Fabio
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args

    # create the empty structure
    jobParams.extend([""] * njobs)

    for job in range(njobs):
        jobArgs = arglist[job]
        destination = self.jobDestination[job]
        jobParams[job] = jobArgs
        common.jobDB.setArguments(job, jobArgs)
        common.logger.debug(5,"Job "+str(job)+" Destination: "+str(destination))
        common.jobDB.setDestination(job, destination)

    common.jobDB.save()
    return
638    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in the job DB for job nj as one string,
    each argument followed by a blank. sched is not used.
    """
    return ''.join([str(arg)+" " for arg in common.jobDB.arguments(nj)])
644 gutsche 1.3
def numberOfJobs(self):
    """
    Return self.total_number_of_jobs.
    """
    # Fabio
    njobs = self.total_number_of_jobs
    return njobs
648    
def getTarBall(self, exe):
    """
    Return the path of the tarball with lib and exe, building it first
    when it is not there yet.

    ARGUMENT: exe: executable handed to buildTar_
    SETS: self.tgzNameWithPath
    """
    #
    # Marco. Let's start to use relative path for Boss XML files
    #
    tarball = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarball

    if not os.path.exists(tarball):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return string.strip(self.tgzNameWithPath)

    # it exists already: just return it
    return tarball
666    
def buildTar_(self, executable):
    """
    Build the tarball with the user code and the tarball with the ML files.

    Nothing is built when the release top is empty or equal to the user
    Scram area. Otherwise self.tgzNameWithPath receives the private
    executable, the lib and module directories, every "data" directory
    found below the Scram area and the ProdAgentApi directory, and
    self.MLtgzfile (set here) receives the ML python files.

    ARGUMENT: executable: name of the user executable to look for
    RAISES: CrabException when the executable is not found or a tarball
            cannot be created; a tarball left incomplete by a failure
            is removed
    """

    def discard(path):
        # getTarBall only checks that the tarball exists, so an
        # incomplete one must not be left behind
        try:
            os.remove(path)
        except OSError:
            pass

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    import tarfile
    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if (self.executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    exe = string.replace(exeWithPath, path,'')
                    tar.add(path+exe,exe)
                # otherwise the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen=len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ## Add ProdAgent dir to tar
            paDir = 'ProdAgentApi'
            pa = os.environ['CRABDIR'] + '/' + paDir
            if os.path.isdir(pa):
                tar.add(pa,paDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # release the file handle on failure too
            tar.close()
    except (CrabException, KeyboardInterrupt, SystemExit):
        # keep the specific message (e.g. executable not found) and
        # let a user interrupt through
        discard(self.tgzNameWithPath)
        raise
    except :
        discard(self.tgzNameWithPath)
        raise CrabException('Could not create tar-ball')

    ## create tar-ball with ML stuff
    self.MLtgzfile =  common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path=os.environ['CRABDIR'] + '/python/'
            for mlFile in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
                tar.add(path+mlFile,mlFile)
            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except (CrabException, KeyboardInterrupt, SystemExit):
        discard(self.MLtgzfile)
        raise
    except :
        discard(self.MLtgzfile)
        raise CrabException('Could not create ML files tar-ball')

    return
746    
def wsSetupEnvironment(self, nj):
    """
    Return the part of the job script which prepares the execution
    environment for the job 'nj'.

    The fragment is assembled in this order:
      1. middleware-specific CMS setup (LCG or OSG branch of $middleware),
      2. creation of the CMSSW project area for self.version with scram
         and evaluation of its runtime environment,
      3. a check that the wrapper received at least two arguments,
      4. when self.pset is not None, creation of pset.cfg from the job
         configuration file by substituting the wrapper arguments,
      5. copy of every additional input file found in $RUNTIME_AREA.

    nj -- index of the job inside common.job_list

    Returns the shell fragment as a single string.
    """
    # Lines repeated after every reported failure: dump the status, then
    # recreate the $repo file with only the monitoring identifiers.
    dump_status = (
        ' dumpStatus $RUNTIME_AREA/$repo\n'
        ' rm -f $RUNTIME_AREA/$repo \n'
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    )

    # Prepare JobType-independent part
    # (at this level the middleware is already known)
    txt = ''
    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # Report the version actually requested; this message used to carry a
    # hard-coded release name.
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.pset is not None:
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if self.datasetPath:
            # standard job: input files, max events and skip events are
            # taken from the wrapper arguments and substituted in turn
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
        elif self.sourceSeed:
            # pythia like job with seeds: first run, then seed, then the
            # optional vertex seed.  Backslashes are doubled so that the
            # generated sed patterns keep their literal \< and \>.
            txt += 'FirstRun=${args[1]}\n'
            txt += 'echo "FirstRun: <$FirstRun>"\n'
            txt += 'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
            txt += 'Seed=${args[2]}\n'
            txt += 'echo "Seed: <$Seed>"\n'
            txt += 'sed "s#\\<INPUT\\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n'
            if self.sourceSeedVtx:
                txt += 'VtxSeed=${args[3]}\n'
                txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
            else:
                txt += 'mv tmp_2.cfg pset.cfg\n'
        else:
            # pythia like job without seeds: the pset is used untouched
            txt += '# Copy untouched pset\n'
            txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
            txt += 'mv tmp_1.cfg pset.cfg\n'

    # Additional input files are looked up by their base name in
    # $RUNTIME_AREA, copied to the current directory and made executable.
    for inbox_file in self.additional_inbox_files:
        relFile = inbox_file.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset is not None:
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
912    
def wsBuildExe(self, nj=0):
    """
    Return the script commands which unpack the user tar-ball on the
    worker node and put ProdAgentApi in PYTHONPATH.

    An empty string is returned when the file self.tgzNameWithPath does
    not exist.  The argument 'nj' is not used.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    pieces = [
        'echo "tar xzvf $RUNTIME_AREA/' + tarball + '"\n',
        'tar xzvf $RUNTIME_AREA/' + tarball + '\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n',
        # on OSG the working directory is removed before leaving
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = 50999"\n',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        'echo "Include ProdAgentApi in PYTHONPATH"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=ProdAgentApi\n',
        'else\n',
        ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n',
        'fi\n',
        '\n',
    ]
    return "".join(pieces)
960    
def modifySteeringCards(self, nj):
    """
    Modify the card provided by the user, writing a new card into the
    share dir.

    This implementation has no statements: nothing is modified and
    None is returned.
    """
    return None
966    
def executableName(self):
    """
    Return the command the job has to run: "sh " when a user script
    (self.scriptExe) is set, self.executable otherwise.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "
972 slacapra 1.1
def executableArgs(self):
    """
    Return the arguments for the command given by executableName():
    the user script followed by " $NJob" when self.scriptExe is set,
    " -p pset.cfg" otherwise.
    """
    if not self.scriptExe:
        return " -p pset.cfg"
    return self.scriptExe + " $NJob"
978 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of filenames to be put in the JDL input sandbox.

    The list holds, in this order: the code tar-ball and the ML
    tar-ball (each only if the file exists), the job configuration
    file (only if self.pset is not None) and every additional input
    file.  The argument 'nj' is not used.
    """
    ## code
    sandbox = [tgz for tgz in (self.tgzNameWithPath, self.MLtgzfile)
               if os.path.isfile(tgz)]
    ## config
    if self.pset is not None:
        sandbox.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
    ## additional input files
    sandbox.extend(self.additional_inbox_files)
    return sandbox
998    
def outputSandbox(self, nj):
    """
    Return the list of filenames to be put in the JDL output sandbox.

    Every user declared output file (self.output_file followed by
    self.output_file_sandbox) is numbered with nj + 1 through
    numberFile_().
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1010    
def prepareSteeringCards(self):
    """
    Make the initial modifications of the user's steering card file.

    Nothing is done here; the method returns None.
    """
    return None
1016    
def wsRenameOutput(self, nj):
    """
    Return the part of the job script which renames the produced files.

    For every declared output file (self.output_file followed by
    self.output_file_sandbox) the fragment checks the file with 'ls' and
    copies it to $RUNTIME_AREA under its numbered name.  It then goes
    back to $RUNTIME_AREA, removes the OSG working directory and defines
    'file_list' with the numbered names of self.output_file only.
    The argument 'nj' is not used: files are numbered with '$NJob'.
    """
    parts = ['\n', '# directory content\n', 'ls \n']

    for produced in self.output_file + self.output_file_sandbox:
        numbered = self.numberFile_(produced, '$NJob')
        parts.extend([
            '\n',
            '# check output file\n',
            'ls ' + produced + '\n',
            'ls_result=$?\n',
            'if [ $ls_result -ne 0 ] ; then\n',
            ' echo "ERROR: Problem with output file"\n',
        ])
        # with the condor_g scheduler a dummy output file is written on OSG
        if common.scheduler.boss_scheduler_name == 'condor_g':
            parts.extend([
                ' if [ $middleware == OSG ]; then \n',
                ' echo "prepare dummy output file"\n',
                ' echo "Processing of job output failed" > $RUNTIME_AREA/' + numbered + '\n',
                ' fi \n',
            ])
        parts.extend([
            'else\n',
            ' cp ' + produced + ' $RUNTIME_AREA/' + numbered + '\n',
            'fi\n',
        ])

    parts.extend([
        'cd $RUNTIME_AREA\n',
        'cd $RUNTIME_AREA\n',
        # cleanup of the OSG working directory
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n',
        ' echo "JOB_EXIT_STATUS = 60999"\n',
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])

    ## Add to filelist only files to be possibly copied to SE
    to_copy = [self.numberFile_(name, '$NJob') for name in self.output_file]
    parts.append('file_list="' + ' '.join(to_copy) + '"\n')

    return ''.join(parts)
1071    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of a file name.

    file -- file name, e.g. 'out.root'
    txt  -- text to insert, e.g. '3' or '$NJob'

    Returns e.g. 'out_3.root'.  When the name contains no '.', the
    suffix is simply appended ('out' -> 'out_3').
    """
    # Only the last dot separates the extension; str.rfind replaces the
    # deprecated string.split() plus manual re-join of the pieces.
    dot = file.rfind('.')
    if dot < 0:
        return file + '_' + txt
    return file[:dot] + '_' + txt + file[dot:]
1089    
def getRequirements(self, nj=[]):
    """
    Return the job requirements to add to the jdl files.

    The requirement on the software tag 'VO-cms-<version>' is added only
    when self.version is set; the requirement on
    other.GlueHostNetworkAdapterOutboundIP is always present.  The
    clauses are joined with ' && ', so the expression never starts with a
    dangling '&&' when self.version is empty.

    The argument 'nj' is not used (and its default is never modified).
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' +
                       self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    return ' && '.join(clauses)
1103 gutsche 1.3
def configFilename(self):
    """
    Return the config filename: the value of self.name() followed by
    the '.cfg' extension.
    """
    base = self.name()
    return base + '.cfg'
1107    
1108     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of the job script which prepares the execution
    environment on OSG and which is common for all CMS jobs.

    The fragment sources cmsset_default.sh (with self.version as
    argument) from $GRID3_APP_DIR/cmssoft or, failing that, from
    $OSG_APP/cmssoft/cms.  When neither file exists it reports exit code
    10020, removes $WORKING_DIR (reporting 10017 if the removal fails)
    and only then exits: an 'exit 1' that used to be emitted right after
    the 10020 report made this cleanup unreachable.
    """
    # Lines repeated after every reported failure: dump the status, then
    # recreate the $repo file with only the monitoring identifiers.
    dump_status = (
        ' dumpStatus $RUNTIME_AREA/$repo\n'
        ' rm -f $RUNTIME_AREA/$repo \n'
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    )

    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    # no 'exit' here: the working directory has to be removed first
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dump_status
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1152    
1153     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of the job script which prepares the execution
    environment on LCG and which is common for all CMS jobs.

    The fragment checks $VO_CMS_SW_DIR, sources cmsset_default.sh from
    it and selects SCRAM_ARCH from the content of /etc/redhat-release.
    Each failure is reported with its own exit code (10031, 10020,
    10032, 10033) and followed by 'exit 1'.
    """
    # Lines repeated after every reported failure.
    status_dump = [
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
    ]

    # software directory not defined
    script = [
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    script += status_dump

    # cmsset_default.sh missing or empty
    script += [
        ' exit 1\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    script += status_dump

    # sourcing cmsset_default.sh failed
    script += [
        ' exit 1\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    script += status_dump

    # choice of SCRAM_ARCH from the OS release string
    script += [
        ' exit 1\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' string=`cat /etc/redhat-release`\n',
        ' echo $string\n',
        ' if [[ $string = *alhalla* ]]; then\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n',
        ' export SCRAM_ARCH=slc3_ia32_gcc323\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' else\n',
        ' echo "SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized"\n',
        ' echo "JOB_EXIT_STATUS = 10033"\n',
        ' echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    script += status_dump

    script += [
        ' exit 1\n',
        ' fi\n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    ]
    return ''.join(script)
1217 gutsche 1.5
def setParam_(self, param, value):
    """
    Store 'value' under the key 'param' in the parameter dictionary
    self._params, replacing any value already stored for that key.
    """
    params = self._params
    params[param] = value
1220    
def getParams(self):
    """
    Return the parameter dictionary self._params itself (not a copy).
    """
    params = self._params
    return params
1223 gutsche 1.8
def setTaskid_(self):
    """
    Read the task identifier from the 'taskId' entry of self.cfg_params
    and keep it in self._taskId.
    """
    task_id = self.cfg_params['taskId']
    self._taskId = task_id
1226    
def getTaskid(self):
    """
    Return the task identifier kept in self._taskId.
    """
    task_id = self._taskId
    return task_id
1229 gutsche 1.35
1230     #######################################################################
def uniquelist(self, old):
    """
    Remove duplicates from a list.

    old -- iterable of hashable elements

    Returns a new list holding each element once, in order of first
    appearance.  A real list is returned (not a view on dictionary keys)
    and the result no longer depends on dictionary ordering.
    """
    seen = {}
    unique = []
    for e in old:
        if e not in seen:
            seen[e] = 0
            unique.append(e)
    return unique