ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.60
Committed: Tue Jan 16 15:32:33 2007 UTC (18 years, 3 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.59: +7 -7 lines
Log Message:
add all data dir found in swarea to tar-ball

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 slacapra 1.22 import math
6 slacapra 1.1 import common
7 gutsche 1.3 import PsetManipulator
8 slacapra 1.1
9 slacapra 1.41 import DBSInfo
10     import DataDiscovery
11     import DataLocation
12 slacapra 1.1 import Scram
13    
14 corvo 1.56 import glob, os, string, re, shutil
15 slacapra 1.1
16     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Set up a CMSSW job type from the CRAB configuration.

    Reads the CMSSW.* and USER.* keys of cfg_params, runs data discovery
    and location when a dataset is given, prepares the user tar-ball,
    splits the task into jobs and, when a parameter set is used, writes
    the modified parameter set.

    ARGUMENTS: cfg_params - configuration mapping indexed by 'SECTION.key' strings
               ncjobs     - number of jobs requested to be created
                            (jobSplittingByBlocks compares it against 'all')
    RAISES:    CrabException on a missing or inconsistent configuration, on a
               missing input file, or on a failure while editing the
               parameter set
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    # Marco.
    self._params = {}
    self.cfg_params = cfg_params

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''          # script use case
    self.datasetPath = ''   # script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards
    try:
        tmp = cfg_params['CMSSW.datasetpath']
        log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
        if tmp.lower() == 'none':
            # no input dataset (generation / script use case)
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = tmp
            self.selectNoInput = 0
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        self.setParam_('dataset', datasetpath_split[1])
        self.setParam_('owner', datasetpath_split[-1])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
        log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
        if self.pset.lower() != 'none' :
            if not os.path.exists(self.pset):
                raise CrabException("User defined PSet file "+self.pset+" does not exist")
        else:
            # 'none' selects the script-only use case
            self.pset = None
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    try:
        self.output_file = []

        tmp = cfg_params['CMSSW.output_file']
        if tmp != '':
            tmpOutFiles = tmp.split(',')
            log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
            for tmp in tmpOutFiles:
                self.output_file.append(tmp.strip())
        else:
            log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")
    except KeyError:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
        if self.scriptExe != '':
            if not os.path.isfile(self.scriptExe):
                msg ="WARNING. file "+self.scriptExe+" not found"
                raise CrabException(msg)
            self.additional_inbox_files.append(self.scriptExe.strip())
    except KeyError:
        self.scriptExe = ''
    #CarlosDaniele
    # with neither a dataset nor a parameter set there must be a script to run
    if self.datasetPath is None and self.pset is None and self.scriptExe == '' :
        msg ="WARNING. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    try:
        tmpAddFiles = cfg_params['USER.additional_input_files'].split(',')
        for tmp in tmpAddFiles:
            tmp = tmp.strip()
            if tmp == '':
                # tolerate empty entries such as a trailing comma
                continue
            # check the source first so that a missing file is reported as a
            # CrabException rather than as an IOError from copyfile
            if not os.path.exists(tmp):
                raise CrabException("Additional input file not found: "+tmp)
            # store under the bare file name: appending a path that contains
            # directories to the share dir gives a destination that does not exist
            fname = os.path.basename(tmp)
            shutil.copyfile(tmp, common.work_space.shareDir()+fname)
            files = common.work_space.pathForTgz()+'share/' + fname
            if not os.path.exists(files):
                raise CrabException("Additional input file not found: "+files)
            self.additional_inbox_files.append(files.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
    except KeyError:
        pass

    # files per job
    try:
        if (cfg_params['CMSSW.files_per_jobs']):
            raise CrabException("files_per_jobs no longer supported. Quitting.")
    except KeyError:
        pass

    ## Events per job
    try:
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None: #CarlosDaniele
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")
    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5,"No first run given")
    if self.pset is not None: #CarlosDaniele
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None: #CarlosDaniele
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None: #CarlosDaniele
        try:
            if (self.datasetPath): # standard job
                # allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else:  # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if (self.firstRun):
                    self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN")  #First Run
                if (self.sourceSeed) :
                    self.PsetEdit.pythiaSeed("INPUT")
                    if (self.sourceSeedVtx) :
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except Exception:
            # report the underlying error instead of hiding it behind a
            # generic message; sys.exc_info() keeps this free of the
            # version-specific "except ..., name" / "except ... as name" forms
            import sys
            msg='Error while manipuliating ParameterSet: exiting...'
            cause = sys.exc_info()[1]
            if cause is not None:
                msg = msg + ' (' + str(cause) + ')'
            raise CrabException(msg)
267 gutsche 1.3
268 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
269    
270 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
271    
272     datasetPath=self.datasetPath
273    
274     ## TODO
275     dataTiersList = ""
276     dataTiers = dataTiersList.split(',')
277 slacapra 1.1
278     ## Contact the DBS
279 slacapra 1.41 common.logger.message("Contacting DBS...")
280 slacapra 1.1 try:
281 slacapra 1.41 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, dataTiers, cfg_params)
282 slacapra 1.1 self.pubdata.fetchDBSInfo()
283    
284 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
285 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
286     raise CrabException(msg)
287    
288 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
289 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
290     raise CrabException(msg)
291 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
292 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS %s'%ex.getErrorMessage()
293     raise CrabException(msg)
294    
295     ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
296 gutsche 1.3 ## self.DBSPaths=self.pubdata.getDBSPaths()
297     common.logger.message("Required data are :"+self.datasetPath)
298    
299 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
300 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
301     self.eventsbyfile=self.pubdata.getEventsPerFile()
302 slacapra 1.41 # print str(self.filesbyblock)
303     # print 'self.eventsbyfile',len(self.eventsbyfile)
304     # print str(self.eventsbyfile)
305 gutsche 1.3
306 slacapra 1.1 ## get max number of events
307     self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
308 gutsche 1.44 common.logger.message("The number of available events is %s\n"%self.maxEvents)
309 slacapra 1.1
310 slacapra 1.41 common.logger.message("Contacting DLS...")
311 slacapra 1.1 ## Contact the DLS and build a list of sites hosting the fileblocks
312     try:
313 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
314 gutsche 1.6 dataloc.fetchDLSInfo()
315 slacapra 1.41 except DataLocation.DataLocationError , ex:
316 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
317     raise CrabException(msg)
318    
319    
320 gutsche 1.35 sites = dataloc.getSites()
321     allSites = []
322     listSites = sites.values()
323     for list in listSites:
324     for oneSite in list:
325     allSites.append(oneSite)
326     allSites = self.uniquelist(allSites)
327 gutsche 1.3
328 gutsche 1.35 common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
329     common.logger.debug(6, "List of Sites: "+str(allSites))
330     return sites
331 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs
          self.ncjobs - overwritten with the number of jobs actually created
          self.list_of_args - [file list, max events, skip events] per job (a list of lists)
    RAISES: CrabException if number_of_jobs is not positive or if the resulting
            number of events per job is smaller than one
    """

    # ---- Handle the possible job splitting configurations ---- #
    # NOTE(review): totalEventsRequested is only bound when total_number_of_events,
    # or both events_per_job and number_of_jobs, were configured - confirm that
    # callers never reach this method otherwise.
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report the total actually requested: self.total_number_of_events is 0
        # when the total was derived from number_of_jobs * events_per_job
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive integer.')
        eventsPerJobRequested = eventsRemaining // self.theNumberOfJobs

    # with less than one event per job no job ever consumes events, so the
    # loops below would only stop at the job limit
    if (eventsRemaining > 0 and eventsPerJobRequested < 1):
        raise CrabException('Cannot split jobs with '+str(eventsPerJobRequested)+' events per job: at least one event per job is needed.')

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # opening of the (shell-escaped) file list of a job
    emptyParString = "\\{"

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = emptyParString
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            fileName = files[fileCount]
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += '\\"' + fileName + '\\"\\,'
                    newFile = 0
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

            # if less events in file remain than eventsPerJobRequested
            if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
                # if last file in block, and at least one file was collected:
                # a skipped file must not produce a job with an empty file list
                if ( fileCount == numFilesInBlock-1 and parString != emptyParString ) :
                    # end job using last file, use remaining events in block
                    # close job and touch new file
                    fullString = parString[:-2]
                    fullString += '\\}'
                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                    eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                    jobSkipEventCount = 0
                    # reset file
                    parString = emptyParString
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else :
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                # close job and touch new file
                fullString = parString[:-2]
                fullString += '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                parString = emptyParString
                filesEventCount = 0
                newFile = 1
                fileCount += 1

            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                fullString = parString[:-2]
                fullString += '\\}'
                list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[fileName]
                parString = emptyParString
                parString += '\\"' + fileName + '\\"\\,'
            # END if
        # END while (iterate over files in the block)
    # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
509    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job

    REQUIRES: self.selectEventsPerJob, self.selectNumberOfJobs,
              self.selectTotalNumberEvents, self.eventsPerJob,
              self.theNumberOfJobs, self.total_number_of_events,
              self.firstRun, self.sourceSeed, self.sourceSeedVtx
    SETS: self.total_number_of_jobs, self.eventsPerJob or
          self.total_number_of_events (whichever was not configured),
          self.list_of_args - per job [run, seed, vertex seed], the last two
                              only when the matching seed is configured
          self.jobDestination - one [""] entry appended per job
    RAISES: CrabException for a negative total number of events or a
            non-positive events_per_job / number_of_jobs used as divisor
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if self.eventsPerJob < 1:
            raise CrabException('events_per_job must be a positive integer.')
        if (self.selectNumberOfJobs and not self.selectTotalNumberEvents):
            # events_per_job + number_of_jobs: the total is derived, not given
            # (dividing the unset total of 0 would give zero jobs)
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = self.theNumberOfJobs * self.eventsPerJob
        else:
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
    elif (self.selectNumberOfJobs) :
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive integer.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # every argument is a configured number with the job index appended
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        if (self.firstRun):
            ## pythia first run
            jobArgs = [str(self.firstRun)+str(i)]
        else:
            ## no first run
            jobArgs = [str(i)]
        if (self.sourceSeed):
            ## pythia random seed
            jobArgs.append(str(self.sourceSeed)+str(i))
            if (self.sourceSeedVtx):
                ## vtx random seed, only used together with the pythia seed
                jobArgs.append(str(self.sourceSeedVtx)+str(i))
        self.list_of_args.append(jobArgs)

    return
578    
579 spiga 1.42
def jobSplittingForScript(self):#CarlosDaniele
    """
    Perform job splitting based on number of job

    Creates self.theNumberOfJobs jobs; each gets its own index as the only
    argument and an empty destination.
    SETS: self.total_number_of_jobs, self.list_of_args; appends one [""]
          entry per job to self.jobDestination
    """
    log = common.logger
    log.debug(5,'Splitting per job')
    log.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.total_number_of_jobs = self.theNumberOfJobs

    log.debug(5,'N jobs '+str(self.total_number_of_jobs))
    log.message(str(self.total_number_of_jobs)+' jobs can be created')

    # the only argument of a job is its index
    jobArguments = []
    self.list_of_args = jobArguments
    for jobIndex in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""])
        ## no random seed
        jobArguments.append([str(jobIndex)])
    return
602    
def split(self, jobParams):
    """
    Store the arguments and destination of every job in the job DB.

    ARGUMENT: jobParams - list that receives one entry per job: an empty
                          placeholder is appended for each job, then the
                          first self.total_number_of_jobs entries are set
                          to the matching entry of self.list_of_args
    REQUIRES: self.total_number_of_jobs, self.list_of_args, self.jobDestination
    """
    common.jobDB.load()

    jobTotal = self.total_number_of_jobs
    argsPerJob = self.list_of_args

    # create the empty structure
    for _ in range(jobTotal):
        jobParams.append("")

    for jobIndex in range(jobTotal):
        jobParams[jobIndex] = argsPerJob[jobIndex]
        common.jobDB.setArguments(jobIndex, jobParams[jobIndex])
        destination = self.jobDestination[jobIndex]
        common.logger.debug(5,"Job "+str(jobIndex)+" Destination: "+str(destination))
        common.jobDB.setDestination(jobIndex, self.jobDestination[jobIndex])

    common.jobDB.save()
    return
623    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in the job DB for job 'nj' as one string,
    each argument followed by a blank. 'sched' is not used here.
    """
    return ''.join([str(argument) + " " for argument in common.jobDB.arguments(nj)])
629 gutsche 1.3
def numberOfJobs(self):
    """
    Return the total number of jobs set by the job splitting
    (self.total_number_of_jobs).
    """
    # Fabio
    jobTotal = self.total_number_of_jobs
    return jobTotal
633    
def getTarBall(self, exe):
    """
    Return the TarBall with lib and exe

    The tar-ball path is <pathForTgz()>share/<self.tgz_name>; it is stored
    in self.tgzNameWithPath. An existing file is reused, otherwise
    buildTar_(exe) is called to prepare it.
    """
    # Marco. Let's start to use relative path for Boss XML files
    tarBallPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarBallPath

    if not os.path.exists(tarBallPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # it exists already, just return it
    return tarBallPath
651    
def buildTar_(self, executable):
    """
    Build the tar-balls shipped with the jobs.

    Creates <pathForTgz()>share/MLfiles.tgz with the monitoring/report
    scripts taken from $CRABDIR/python and, if anything from the user
    working area has to be shipped (private executable, lib, module, data
    directories, ProdAgentApi), the tar-ball self.tgzNameWithPath.
    Nothing is done when the working area is the release top or no release
    top is known.

    ARGUMENT: executable - name of the executable to look up in the scram area
    SETS:     self.MLtgzfile
    RAISES:   CrabException if the executable is not found or a tar command
              produces no output
    """
    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    # all entries are relative to swArea: tar is run with '-C swArea'
    filesToBeTarred = []

    ## First find the executable
    if (self.executable != ''):
        exeWithPath = self.scram.findFile_(executable)
        if ( not exeWithPath ):
            raise CrabException('User executable '+executable+' not found')

        ## then check if it's private or not
        if exeWithPath.find(swReleaseTop) == -1:
            # the exe is private, so we must ship
            common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
            filesToBeTarred.append(exeWithPath.replace(swArea+'/', ''))
        # else: the exe is from release, we'll find it on WN

    ## Now get the libraries: only those in local working area
    libDir = 'lib'
    lib = swArea+'/' +libDir
    common.logger.debug(5,"lib "+lib+" to be tarred")
    if os.path.exists(lib):
        filesToBeTarred.append(libDir)

    ## Now check if module dir is present
    moduleDir = 'module'
    if os.path.isdir(swArea + '/' + moduleDir):
        filesToBeTarred.append(moduleDir)

    ## Now check if any data dir(s) is present
    for root, dirs, files in os.walk(swArea):
        if "data" in dirs:
            common.logger.debug(5,"data "+root+"/data"+" to be tarred")
            # store the path relative to swArea like every other entry: an
            # absolute path would be archived under its full path and would
            # not unpack into the project area
            dataDir = (root+"/data")[len(swArea):].lstrip('/')
            filesToBeTarred.append(dataDir)

    ## copy ProdAgent dir to swArea
    # the leading backslash is part of the shell command
    cmd = '\\cp -rf ' + os.environ['CRABDIR'] + '/ProdAgentApi ' + swArea
    cmd_out = runCommand(cmd)
    if cmd_out != '':
        common.logger.message('ProdAgentApi directory could not be copied to local CMSSW project directory.')
        common.logger.message('No FrameworkJobreport parsing is possible on the WorkerNode.')

    ## Add ProdAgent dir to tar
    paDir = 'ProdAgentApi'
    if os.path.isdir(swArea + '/' + paDir):
        filesToBeTarred.append(paDir)

    ## monitoring and report scripts go into their own tar-ball
    mlFiles = ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'

    tarcmd = 'tar zhcvf ' + self.MLtgzfile + ' -C ' + os.environ['CRABDIR'] + '/python/' + ' '
    tarcmd = tarcmd + ' '.join(mlFiles) + ' '
    # tar runs verbosely, so an empty/None output is treated as a failure
    cout = runCommand(tarcmd)
    if not cout:
        raise CrabException('Could not create ML files tar-ball')

    ## Create the tar-ball
    if len(filesToBeTarred)>0:
        tarcmd = 'tar zhcvf ' + self.tgzNameWithPath + ' -C ' + swArea + ' '
        tarcmd = tarcmd + ' '.join(filesToBeTarred) + ' '
        cout = runCommand(tarcmd)
        if not cout:
            raise CrabException('Could not create tar-ball')
    else:
        common.logger.debug(5,"No files to be to be tarred")

    return
743    
def wsSetupEnvironment(self, nj):
    """
    Return the part of the job wrapper script which prepares the
    execution environment for job number 'nj'.

    The generated shell text is made of, in order:
      - the middleware specific setup (LCG or OSG), taken from
        wsSetupCMSLCGEnvironment_() / wsSetupCMSOSGEnvironment_();
      - the creation of the CMSSW project area via scram and the
        evaluation of its runtime environment;
      - a check on the number of wrapper arguments;
      - when a pset is defined, the sed commands which fill the job
        arguments into the configuration file and write pset.cfg;
      - the copy of any additional input sandbox file into the
        working area.

    nj -- index of the job in common.job_list
    Returns the shell script fragment as a string.
    """
    # Prepare JobType-independent part
    txt = ''

    ## OLI_Daniele at this level middleware already known

    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # Report the version actually requested, not a hard-coded release name.
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    # NOTE(review): the nesting below was reconstructed from statement
    # order (the reviewed listing carried no indentation) -- confirm
    # against the repository copy.
    job = common.job_list[nj]
    if self.pset is not None: #CarlosDaniele
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if self.datasetPath: # standard job
            # wrapper arguments: 1 = input files, 2 = max events, 3 = skip events
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
        elif self.sourceSeed: # pythia like job with seeds
            # wrapper arguments: 1 = first run, 2 = seed, 3 = vertex seed
            txt += 'FirstRun=${args[1]}\n'
            txt += 'echo "FirstRun: <$FirstRun>"\n'
            txt += 'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
            txt += 'Seed=${args[2]}\n'
            txt += 'echo "Seed: <$Seed>"\n'
            txt += 'sed "s#\\<INPUT\\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n'
            if self.sourceSeedVtx:
                txt += 'VtxSeed=${args[3]}\n'
                txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
            else:
                txt += 'mv tmp_2.cfg pset.cfg\n'
        else: # pythia like job without seeds
            txt += '# Copy untouched pset\n'
            txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
            txt += 'mv tmp_1.cfg pset.cfg\n'

    # Bring the additional input sandbox files into the working area
    for inbox_file in self.additional_inbox_files:
        relFile = inbox_file.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset is not None: #CarlosDaniele
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

        # Dump the final configuration into the job log
        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
909    
def wsBuildExe(self, nj):
    """
    Return the script commands which unpack the user code tar-ball on
    the worker node and add ProdAgentApi to PYTHONPATH.

    An empty string is returned when the tar-ball self.tgzNameWithPath
    does not exist on the submitting host.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    pieces = [
        'echo "tar xzvf $RUNTIME_AREA/', tarball, '"\n',
        'tar xzvf $RUNTIME_AREA/', tarball, '\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n',
        # on OSG the temporary working directory has to be removed
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = 50999"\n',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        # make the unpacked ProdAgentApi importable by python
        'echo "Include ProdAgentApi in PYTHONPATH"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=ProdAgentApi\n',
        'else\n',
        ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n',
        'fi\n',
        '\n',
    ]
    return "".join(pieces)
957    
def modifySteeringCards(self, nj):
    """
    Hook meant to modify the user provided card for job 'nj' and
    write the new card into the share dir.

    Nothing is done here: the method has no body and returns None.
    """
    return None
963    
def executableName(self):
    """
    Return the command used to start the job: "sh " when no pset is
    defined (script use case), self.executable otherwise.
    """
    if self.pset is not None:
        return self.executable
    return "sh " #CarlosDaniele
969 slacapra 1.1
def executableArgs(self):
    """
    Return the arguments passed to the executable: the user script
    followed by " $NJob" when no pset is defined, " -p pset.cfg"
    otherwise.
    """
    if self.pset is not None:
        return " -p pset.cfg"
    return self.scriptExe + " $NJob" #CarlosDaniele
975 slacapra 1.1
def inputSandbox(self, nj):
    """
    Build the list of filenames to be put in the JDL input sandbox.

    The code tar-ball and the ML files tar-ball are listed only if
    they exist on disk; the job configuration file is added whenever
    a pset is defined.
    """
    ## code: keep only the tar-balls which are really there
    tarballs = (self.tgzNameWithPath, self.MLtgzfile)
    inp_box = [tarball for tarball in tarballs if os.path.isfile(tarball)]
    ## config
    if self.pset is not None: #CarlosDaniele
        cfg_dir = common.work_space.pathForTgz() + 'job/'
        inp_box.append(cfg_dir + self.configFilename())
    return inp_box
995    
def outputSandbox(self, nj):
    """
    Build the list of filenames to be put in the JDL output sandbox.

    Every user declared output file (self.output_file followed by
    self.output_file_sandbox) is numbered with nj+1.
    """
    ## User Declared output files
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1007    
def prepareSteeringCards(self):
    """
    Hook for the initial modifications of the user's steering card
    file. Nothing is done here; None is returned.
    """
    return None
1013    
def wsRenameOutput(self, nj):
    """
    Return the part of the job script which renames the produced files.

    For each declared output file (self.output_file followed by
    self.output_file_sandbox) the script checks that the file exists
    and copies it to $RUNTIME_AREA under its numbered name (see
    numberFile_, numbered with '$NJob'). With the condor_g scheduler a
    dummy output file is prepared on OSG when the real one is missing.
    The script then moves back to $RUNTIME_AREA, removes the OSG
    working directory and defines the shell variable file_list, which
    holds only the numbered names of self.output_file.

    nj -- job index (not used to build the text)
    Returns the shell script fragment as a string.
    """

    txt = '\n'
    txt += '# directory content\n'
    txt += 'ls \n'

    for fileWithSuffix in (self.output_file+self.output_file_sandbox):
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        txt += '\n'
        txt += '# check output file\n'
        txt += 'ls '+fileWithSuffix+'\n'
        txt += 'ls_result=$?\n'
        txt += 'if [ $ls_result -ne 0 ] ; then\n'
        txt += ' echo "ERROR: Problem with output file"\n'
        if common.scheduler.boss_scheduler_name == 'condor_g':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'else\n'
        txt += ' cp '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
        txt += 'fi\n'

    # a single cd is enough (it used to be emitted twice in a row)
    txt += 'cd $RUNTIME_AREA\n'
    ### OLI_DANIELE
    txt += 'if [ $middleware == OSG ]; then\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n'
    txt += ' echo "JOB_EXIT_STATUS = 60999"\n'
    txt += ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += 'fi\n'
    txt += '\n'

    ## Add to filelist only files to be possibly copied to SE
    numbered = [self.numberFile_(name, '$NJob') for name in self.output_file]
    txt += 'file_list="'+' '.join(numbered)+'"\n'

    return txt
1068    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of a file name.

    Only the last path component is searched for the extension, so a
    dot inside a directory name is left alone. A name without
    extension simply gets '_' + txt appended.

    file -- file name, possibly with a leading path
    txt  -- string to insert (e.g. a job number or '$NJob')
    Returns the new name, e.g. 'out.root' -> 'out_3.root'.
    """
    # splitext keeps the dot with the extension ('' when there is none)
    name, ext = os.path.splitext(file)
    return name + '_' + txt + ext
1086    
def getRequirements(self):
    """
    Return the job requirements to add to jdl files.

    The expression asks for a site publishing the VO-cms-<version>
    software tag (only when self.version is set) and for outbound
    connectivity. The clauses are joined with ' && ', so no dangling
    operator is produced when the version is empty.
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    return ' && '.join(clauses)
1100 gutsche 1.3
def configFilename(self):
    """
    Return the name of the configuration file: the value of
    self.name() followed by '.cfg'.
    """
    base = self.name()
    return base + '.cfg'
1104    
1105     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of a job script which prepares the execution
    environment on OSG and which is common for all CMS jobs.

    The script sources cmsset_default.sh from $GRID3_APP_DIR/cmssoft
    or, failing that, from $OSG_APP/cmssoft/cms, passing self.version
    as argument. When neither file exists the job reports exit code
    10020, removes $WORKING_DIR (reporting 10017 if that fails) and
    only then exits; no 'exit' is emitted ahead of the cleanup, which
    would make it unreachable.
    """
    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    # the working directory must be cleaned up before leaving
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1149    
1150     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of a job script which prepares the execution
    environment on LCG and which is common for all CMS jobs.

    The script checks $VO_CMS_SW_DIR, sources cmsset_default.sh found
    there and looks at /etc/redhat-release to decide about SCRAM_ARCH.
    Each failure reports its own exit code (10031, 10020, 10032,
    10033) and exits.
    """
    # lines emitted after every failure report
    report = (' dumpStatus $RUNTIME_AREA/$repo\n'
              ' rm -f $RUNTIME_AREA/$repo \n'
              ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
              ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n')
    pieces = [
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        # software area defined?
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
        report,
        ' exit 1\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
        report,
        ' exit 1\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
        report,
        ' exit 1\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        # operating system dependent architecture
        ' string=`cat /etc/redhat-release`\n',
        ' echo $string\n',
        ' if [[ $string = *alhalla* ]]; then\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n',
        ' export SCRAM_ARCH=slc3_ia32_gcc323\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' else\n',
        ' echo "SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized"\n',
        ' echo "JOB_EXIT_STATUS = 10033"\n',
        ' echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n',
        report,
        ' exit 1\n',
        ' fi\n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    ]
    return ''.join(pieces)
1214 gutsche 1.5
def setParam_(self, param, value):
    """
    Store 'value' under the key 'param' in the parameter dictionary
    of this job type (self._params).
    """
    params = self._params
    params[param] = value
1217    
def getParams(self):
    """
    Return the parameter dictionary of this job type (the very same
    object kept in self._params, not a copy).
    """
    params = self._params
    return params
1220 gutsche 1.8
def setTaskid_(self):
    """
    Read the task identifier from the 'taskId' entry of
    self.cfg_params and keep it in self._taskId.
    """
    task_id = self.cfg_params['taskId']
    self._taskId = task_id
1223    
def getTaskid(self):
    """
    Return the task identifier kept in self._taskId.
    """
    task_id = self._taskId
    return task_id
1226 gutsche 1.35
1227     #######################################################################
def uniquelist(self, old):
    """
    Remove the duplicates from the sequence 'old'.

    The elements are used as dictionary keys (so they must be
    hashable) and the keys of that dictionary are returned.
    """
    return dict.fromkeys(old, 0).keys()