ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.80
Committed: Thu May 10 14:34:13 2007 UTC (17 years, 11 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.79: +14 -5 lines
Log Message:
DBS2 is default

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.41 import DataDiscovery
8 gutsche 1.66 import DataDiscovery_DBS2
9 slacapra 1.41 import DataLocation
10 slacapra 1.1 import Scram
11    
12 slacapra 1.70 import os, string, re, shutil, glob
13 slacapra 1.1
14     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure the CMSSW job type from the user configuration.

    Reads the EDG.*, CMSSW.* and USER.* cards from cfg_params, runs data
    discovery/location when a dataset is given, builds the user tarball,
    performs the job splitting and finally writes the modified
    parameter-set.

    ARGUMENTS: cfg_params - mapping of configuration keys to values
               ncjobs     - number of jobs requested to be created
                            (compared against 'all' by the block splitting)
    RAISES:    CrabException on missing/invalid configuration, on missing
               input files and on parameter-set manipulation failures
    """
    import sys

    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self._params = {}
    self.cfg_params = cfg_params

    # upper limit checked against the tarball size in buildTar_ (in MB)
    try:
        self.MaxTarBallSize = float(self.cfg_params['EDG.maxtarballsize'])
    except KeyError:
        self.MaxTarBallSize = 100.0

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''          # script use case
    self.datasetPath = ''   # script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards

    ## get DBS mode (DBS2 unless CMSSW.use_dbs_1 says otherwise)
    try:
        self.use_dbs_1 = int(self.cfg_params['CMSSW.use_dbs_1'])
    except KeyError:
        self.use_dbs_1 = 0

    try:
        tmp = cfg_params['CMSSW.datasetpath']
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        # The committed file carried unresolved merge-conflict markers here;
        # resolved in favour of revision 1.80: the owner is the last path
        # element in DBS1 mode and the third element otherwise.
        if self.use_dbs_1 == 1:
            ownerIndex = -1
        else:
            ownerIndex = 2
        try:
            dataset = datasetpath_split[1]
            owner = datasetpath_split[ownerIndex]
        except IndexError:
            raise CrabException("Error: malformed datasetpath "+self.datasetPath)
        self.setParam_('dataset', dataset)
        self.setParam_('owner', owner)

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)
    else:
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    try:
        outputCard = cfg_params['CMSSW.output_file']
    except KeyError:
        outputCard = ''
    if outputCard != '':
        tmpOutFiles = outputCard.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outName in tmpOutFiles:
            self.output_file.append(outName.strip())
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
    except KeyError:
        self.scriptExe = ''
    if self.scriptExe != '':
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    # without dataset and without pset only a user script can be run
    if self.datasetPath is None and self.pset is None and self.scriptExe == '':
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    try:
        addFilesCard = cfg_params['USER.additional_input_files']
    except KeyError:
        addFilesCard = None
    if addFilesCard is not None:
        for pattern in addFilesCard.split(','):
            pattern = pattern.strip()
            if pattern == '':
                # empty entry (e.g. trailing comma): nothing to ship
                continue
            dirname = ''
            if not pattern.startswith("/"):
                dirname = "."
            matches = glob.glob(os.path.join(dirname, pattern))
            if not matches:
                # glob silently returns [] for a missing file, so the
                # existence check has to be done on the match list
                raise CrabException("Additional input file not found: "+pattern)
            for inputFile in matches:
                if not os.path.exists(inputFile):
                    raise CrabException("Additional input file not found: "+inputFile)
                # NOTE(review): the match keeps its './' or absolute prefix, so
                # the destination directory may not exist - confirm intent.
                storedFile = common.work_space.pathForTgz()+inputFile
                shutil.copyfile(inputFile, storedFile)
                self.additional_inbox_files.append(storedFile.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    # files per job
    try:
        filesPerJobs = cfg_params['CMSSW.files_per_jobs']
    except KeyError:
        filesPerJobs = None
    if filesPerJobs:
        raise CrabException("files_per_jobs no longer supported. Quitting.")

    ## Events per job
    try:
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        nSelected = self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs
        if nSelected != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if self.selectNumberOfJobs == 0:
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")

    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5,"No first run given")

    if self.pset is not None:
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset)

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents = 0        # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths = {}        # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination = []  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            if self.datasetPath:
                # standard job: allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else:
                # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if self.firstRun:
                    self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN")
                if self.sourceSeed:
                    self.PsetEdit.pythiaSeed("INPUT")
                    if self.sourceSeedVtx:
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except Exception:
            # 'except Exception' rather than a bare except so that an
            # interrupt is not converted; the cause is kept in the debug log
            msg='Error while manipuliating ParameterSet: exiting...'
            common.logger.debug(3, msg+' '+str(sys.exc_info()[1]))
            raise CrabException(msg)
287 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Query DBS for the content of self.datasetPath and DLS for its location.

    ARGUMENT: cfg_params - configuration passed on to the discovery and
                           location objects
    SETS:     self.pubdata, self.filesbyblock, self.eventsbyblock,
              self.eventsbyfile, self.maxEvents
    RETURNS:  the object returned by dataloc.getSites(); the block
              splitting reads it as a dictionary block -> list of sites
    RAISES:   CrabException when data discovery or data location fails
    """
    import sys

    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath = self.datasetPath

    ## Contact the DBS
    common.logger.message("Contacting DBS...")
    try:
        if self.use_dbs_1 == 1:
            self.pubdata = DataDiscovery.DataDiscovery(datasetPath, cfg_params)
        else:
            self.pubdata = DataDiscovery_DBS2.DataDiscovery_DBS2(datasetPath, cfg_params)
        self.pubdata.fetchDBSInfo()
    except (DataDiscovery.NotExistingDatasetError,
            DataDiscovery.NoDataTierinProvenanceError,
            DataDiscovery.DataDiscoveryError,
            DataDiscovery_DBS2.NotExistingDatasetError_DBS2,
            DataDiscovery_DBS2.NoDataTierinProvenanceError_DBS2,
            DataDiscovery_DBS2.DataDiscoveryError_DBS2):
        # all discovery errors are reported the same way; sys.exc_info()
        # is used so the clause parses on every Python version
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
        raise CrabException(msg)

    ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
    common.logger.message("Required data are :"+self.datasetPath)

    self.filesbyblock = self.pubdata.getFiles()
    self.eventsbyblock = self.pubdata.getEventsPerBlock()
    self.eventsbyfile = self.pubdata.getEventsPerFile()

    ## get max number of events
    self.maxEvents = self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
    common.logger.message("The number of available events is %s\n"%self.maxEvents)

    common.logger.message("Contacting DLS...")
    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc = DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError:
        ex = sys.exc_info()[1]
        msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
        raise CrabException(msg)

    sites = dataloc.getSites()

    # flatten the per-block site lists for the user summary
    allSites = []
    for listSite in sites.values():
        for oneSite in listSite:
            allSites.append(oneSite)
    allSites = self.uniquelist(allSites)

    common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
    common.logger.debug(6, "List of Sites: "+str(allSites))
    return sites
355 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.

    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS:     self.jobDestination - Site destination(s) for each job (a list of lists)
              self.total_number_of_jobs, self.ncjobs - Total # of jobs created
              self.list_of_args - per job: [file list string, max events, skip events]
    RAISES:   CrabException if the splitting parameters do not allow a
              positive number of events per job
    """

    # ---- Handle the possible job splitting configurations ---- #
    totalEventsRequested = None
    eventsPerJobRequested = None
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
    if totalEventsRequested is None:
        raise CrabException('Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.')

    if totalEventsRequested == -1:
        # user requested all the events in the dataset
        eventsRemaining = self.maxEvents
    elif totalEventsRequested > self.maxEvents:
        # user requested more events than are in the dataset
        eventsRemaining = self.maxEvents
        # report the effective request: total_number_of_events is 0 when the
        # total was derived from number_of_jobs * events_per_job
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    else:
        # user requested less events than are in the dataset
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive number.')
        eventsPerJobRequested = eventsRemaining // self.theNumberOfJobs

    if self.selectNumberOfJobs:
        common.logger.message("May not create the exact number_of_jobs requested.")

    # With less than one event per job the file loop below would keep
    # closing zero-event jobs without ever consuming eventsRemaining.
    if eventsRemaining > 0 and (eventsPerJobRequested is None or eventsPerJobRequested < 1):
        raise CrabException('Cannot split: the number of events per job must be at least 1.')

    if self.ncjobs == 'all':
        totalNumberOfJobs = 999999999
    else:
        totalNumberOfJobs = self.ncjobs

    jobCount = 0
    list_of_lists = []

    def closeJob(jobNumber, jobFiles, maxEvents, skipEvents, sites, summary):
        # Record one job: its argument triple and its destination sites.
        quoted = ['\\"' + name + '\\"' for name in jobFiles]
        fullString = '\\{' + '\\,'.join(quoted) + '\\}'
        list_of_lists.append([fullString, str(maxEvents), str(skipEvents)])
        common.logger.debug(3,"Job "+str(jobNumber)+" can run over "+summary)
        self.jobDestination.append(sites)
        common.logger.debug(5,"Job "+str(jobNumber)+" Destination: "+str(sites))

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    for block in list(blockSites.keys()):
        if not (eventsRemaining > 0 and jobCount < totalNumberOfJobs):
            break
        if block not in self.eventsbyblock:
            continue

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if numFilesInBlock <= 0:
            continue
        sites = blockSites[block]
        fileCount = 0

        # ---- New block => New job ---- #
        jobFiles = []
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # events of the first file in jobFiles already used by earlier jobs
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while eventsRemaining > 0 and fileCount < numFilesInBlock and jobCount < totalNumberOfJobs:
            fileName = files[fileCount]
            if newFile:
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                except KeyError:
                    # the file is not added; with no events added the first
                    # branch below moves on to the next file
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")
                else:
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    filesEventCount += numEventsInFile
                    jobFiles.append(fileName)
                    newFile = 0

            eventsAvailable = filesEventCount - jobSkipEventCount

            if eventsAvailable < eventsPerJobRequested:
                # fewer events collected than one job needs
                if fileCount == numFilesInBlock-1:
                    # last file in block: end the job with what is left,
                    # unless every candidate file had to be skipped
                    if jobFiles:
                        closeJob(jobCount+1, jobFiles, -1, jobSkipEventCount, sites,
                                 str(eventsAvailable)+" events (last file in block).")
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsAvailable
                        eventsRemaining = eventsRemaining - eventsAvailable
                    jobSkipEventCount = 0
                    jobFiles = []
                    filesEventCount = 0
                newFile = 1
                fileCount += 1
            elif eventsAvailable == eventsPerJobRequested:
                # exact fit: close job and touch new file
                closeJob(jobCount+1, jobFiles, eventsPerJobRequested, jobSkipEventCount, sites,
                         str(eventsPerJobRequested)+" events.")
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                jobFiles = []
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            else:
                # more events than one job needs: close job but stay on this file
                closeJob(jobCount+1, jobFiles, eventsPerJobRequested, jobSkipEventCount, sites,
                         str(eventsPerJobRequested)+" events.")
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # events of the current file consumed so far = events the
                # next job has to skip in it
                eventsInThisFile = self.eventsbyfile[fileName]
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - eventsInThisFile)
                # the next job starts with only the current file
                filesEventCount = eventsInThisFile
                jobFiles = [fileName]

    self.ncjobs = self.total_number_of_jobs = jobCount
    if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
533    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job.

    REQUIRES: self.selectEventsPerJob, self.selectTotalNumberEvents,
              self.selectNumberOfJobs, self.eventsPerJob, self.theNumberOfJobs,
              self.total_number_of_events, self.firstRun, self.sourceSeed,
              self.sourceSeedVtx
    SETS:     self.total_number_of_jobs, and whichever of
              self.total_number_of_events / self.eventsPerJob was not given,
              self.list_of_args - per job: [run(+index)] optionally followed
              by the pythia seed and the vertex seed, each with the job
              index appended,
              self.jobDestination - one [""] entry appended per job
    RAISES:   CrabException for a negative total number of events or a
              non-positive divisor
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if self.selectEventsPerJob:
        if self.selectTotalNumberEvents:
            if self.eventsPerJob < 1:
                raise CrabException('events_per_job must be a positive number.')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
    elif self.selectNumberOfJobs:
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive number.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    eventsCovered = int(self.total_number_of_jobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - eventsCovered

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(eventsCovered))

    # first argument is the run prefix (if any) followed by the job index
    if self.firstRun:
        runPrefix = str(self.firstRun)
    else:
        runPrefix = ''

    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        index = str(i)
        jobArgs = [runPrefix + index]
        if self.sourceSeed:
            ## pythia random seed
            jobArgs.append(str(self.sourceSeed) + index)
            if self.sourceSeedVtx:
                ## vtx random seed, only passed together with the pythia seed
                jobArgs.append(str(self.sourceSeedVtx) + index)
        self.list_of_args.append(jobArgs)

    return
607    
608 spiga 1.42
def jobSplittingForScript(self):
    """
    Perform job splitting based on number of job.

    REQUIRES: self.theNumberOfJobs
    SETS:     self.total_number_of_jobs,
              self.list_of_args - one [str(job index)] entry per job,
              self.jobDestination - one [""] entry appended per job
    """
    nJobs = self.theNumberOfJobs

    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(nJobs)+' jobs in total ')

    self.total_number_of_jobs = nJobs

    common.logger.debug(5,'N jobs '+str(nJobs))
    common.logger.message(str(nJobs)+' jobs can be created')

    # the only argument of each job is its index; no random seed
    jobIndices = range(nJobs)
    self.list_of_args = [[str(i)] for i in jobIndices]
    ## Since there is no input, any site is good: empty destination
    self.jobDestination.extend([[""] for i in jobIndices])
    return
631    
def split(self, jobParams):
    """
    Store the arguments and destination of every job in the job DB.

    ARGUMENT: jobParams - list; one entry per job is appended and the
                          first self.total_number_of_jobs entries are set
                          to the job arguments
    REQUIRES: self.total_number_of_jobs, self.list_of_args, self.jobDestination
    """
    common.jobDB.load()

    njobs = self.total_number_of_jobs
    arglist = self.list_of_args

    # create the empty structure
    jobParams.extend([""] * njobs)

    for job in range(njobs):
        jobParams[job] = arglist[job]
        common.jobDB.setArguments(job, jobParams[job])
        destination = self.jobDestination[job]
        common.logger.debug(5,"Job "+str(job)+" Destination: "+str(destination))
        common.jobDB.setDestination(job, destination)

    common.jobDB.save()
    return
652    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in the job DB for job 'nj' as a single
    string, each argument followed by one blank. 'sched' is not used.
    """
    return ''.join([str(arg) + " " for arg in common.jobDB.arguments(nj)])
658 gutsche 1.3
def numberOfJobs(self):
    """
    Return the total number of jobs set by the job splitting.
    """
    njobs = self.total_number_of_jobs
    return njobs
662    
def getTarBall(self, exe):
    """
    Return the path of the tarball with lib and exe.

    The path is <pathForTgz>share/<self.tgz_name> and is also stored in
    self.tgzNameWithPath. An already existing file is returned as it is;
    otherwise buildTar_(exe) is called first.
    """
    # relative path, as used for the Boss XML files
    tarball = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarball

    if not os.path.exists(tarball):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # if it exist, just return it
    return tarball
680    
def buildTar_(self, executable):
    """
    Build the user tarball (self.tgzNameWithPath) and the ML tarball.

    Nothing is done when the working area is the release top. Otherwise the
    user tarball gets the private executable (if any), the 'lib' and
    'module' directories of the working area, every 'data' directory found
    below it and $CRABDIR/ProdAgentApi; its size is then checked against
    self.MaxTarBallSize (MB). The ML tarball (self.MLtgzfile) gets the
    monitoring scripts from $CRABDIR/python.

    ARGUMENT: executable - name of the user executable to look for
    RAISES:   CrabException if the executable is not found, a tarball
              cannot be created or the user tarball is too large
    """
    import sys
    import tarfile

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            # NOTE(review): the guard reads self.executable while the lookup
            # uses the 'executable' argument - confirm they always agree.
            if self.executable != '':
                exeWithPath = self.scram.findFile_(executable)
                if not exeWithPath:
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    exe = exeWithPath.replace(path,'')
                    tar.add(path+exe,exe)
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen = len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ## Add ProdAgent dir to tar
            paDir = 'ProdAgentApi'
            pa = os.environ['CRABDIR'] + '/' + 'ProdAgentApi'
            if os.path.isdir(pa):
                tar.add(pa,paDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # release the file handle on success and on failure
            tar.close()
    except Exception:
        err = sys.exc_info()[1]
        # getTarBall() reuses any existing file, so a truncated tarball
        # must not be left behind
        if os.path.exists(self.tgzNameWithPath):
            os.remove(self.tgzNameWithPath)
        if isinstance(err, CrabException):
            # keep the specific message (e.g. executable not found)
            raise err
        common.logger.debug(3,'Could not create tar-ball: '+str(err))
        raise CrabException('Could not create tar-ball')

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path = os.environ['CRABDIR'] + '/python/'
            for mlFile in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
                tar.add(path+mlFile,mlFile)
            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except Exception:
        common.logger.debug(3,'Could not create ML files tar-ball: '+str(sys.exc_info()[1]))
        raise CrabException('Could not create ML files tar-ball')

    return
765    
def wsSetupEnvironment(self, nj):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The returned shell text is made of:
     - the middleware (LCG or OSG) specific CMS setup,
     - the creation of the CMSSW project area for self.version and
       the evaluation of its scram runtime environment,
     - a check on the number of wrapper arguments,
     - when a pset is defined, the sed commands which replace the
       placeholders in the job configuration file with the wrapper
       arguments and write the result to pset.cfg,
     - the copy of the additional input files to the working dir.
    """
    # Status-report boilerplate emitted on every failure path
    dumpReport  = ' dumpStatus $RUNTIME_AREA/$repo\n'
    dumpReport += ' rm -f $RUNTIME_AREA/$repo \n'
    dumpReport += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    dumpReport += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'

    # Prepare JobType-independent part
    txt = ''

    ## OLI_Daniele at this level middleware already known

    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dumpReport
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dumpReport
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # report the release really requested, not a hard-coded one
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dumpReport
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'export SCRAM_ARCH='+self.executable_arch+'\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dumpReport
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dumpReport
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.pset is not None: #CarlosDaniele
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if (self.datasetPath): # standard job
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
        else: # pythia like job
            if (self.sourceSeed):
                txt += 'FirstRun=${args[1]}\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'
                # the backslashes are doubled so that sed receives \< and \>
                txt += 'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
            else:
                txt += '# Copy untouched pset\n'
                txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
            if (self.sourceSeed):
                txt += 'Seed=${args[2]}\n'
                txt += 'echo "Seed: <$Seed>"\n'
                txt += 'sed "s#\\<INPUT\\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n'
                if (self.sourceSeedVtx):
                    txt += 'VtxSeed=${args[3]}\n'
                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                    txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
                else:
                    txt += 'mv tmp_2.cfg pset.cfg\n'
            else:
                txt += 'mv tmp_1.cfg pset.cfg\n'

    # Copy the additional input files (when shipped) to the working dir
    for inboxFile in self.additional_inbox_files:
        relFile = inboxFile.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset is not None: #CarlosDaniele
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

        # dump the final configuration in the job log
        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
932    
def wsBuildExe(self, nj=0):
    """
    Put in the script the commands to build an executable
    or a library.

    When the sandbox tarball self.tgzNameWithPath exists, the returned
    shell text untars it from $RUNTIME_AREA, handles an untar failure
    and adds ProdAgentApi to PYTHONPATH.  Otherwise an empty string
    is returned.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    script = [
        'echo "tar xzvf $RUNTIME_AREA/' + tarball + '"\n',
        'tar xzvf $RUNTIME_AREA/' + tarball + '\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n',
        # on OSG the private working directory has to be removed
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = 50999"\n',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        'echo "Include ProdAgentApi in PYTHONPATH"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=ProdAgentApi\n',
        'else\n',
        ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n',
        'fi\n',
        '\n',
    ]
    return "".join(script)
980    
def modifySteeringCards(self, nj):
    """
    Modify the card provided by the user and write the new card
    into the share dir.

    Nothing is done here: the method has no body and returns None.
    """
    return None
986    
def executableName(self):
    """
    Return the command the job wrapper has to run: "sh " when a user
    script (self.scriptExe) is defined, self.executable otherwise.
    """
    if not self.scriptExe:
        return self.executable
    #CarlosDaniele: the user script is run through the shell
    return "sh "
992 slacapra 1.1
def executableArgs(self):
    """
    Return the arguments of the command given by executableName():
    the user script followed by " $NJob" when self.scriptExe is
    defined, " -p pset.cfg" otherwise.
    """
    if not self.scriptExe:
        return " -p pset.cfg"
    #CarlosDaniele
    return self.scriptExe + " $NJob"
998 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    The list holds, in this order: the code tarball and the ML
    tarball (each one only if the file exists), the job configuration
    file (only if a pset is defined) and the additional input files.
    """
    sandbox = []
    ## code
    for tarball in (self.tgzNameWithPath, self.MLtgzfile):
        if os.path.isfile(tarball):
            sandbox.append(tarball)
    ## config
    if self.pset is not None:
        sandbox.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
    ## additional input files
    sandbox.extend(self.additional_inbox_files)
    return sandbox
1018    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Every user declared output file (self.output_file followed by
    self.output_file_sandbox) is tagged with the job number nj+1.
    """
    ## User Declared output files
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1030    
def prepareSteeringCards(self):
    """
    Make initial modifications of the user's steering card file.

    Nothing is done here: the method simply returns None.
    """
    return None
1036    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every declared output file the script checks that the file
    exists and copies it to $RUNTIME_AREA under its job-numbered name
    (for the condor_g scheduler a dummy file is prepared on OSG when
    the output is missing).  The script then goes back to
    $RUNTIME_AREA, removes the OSG working directory and defines
    file_list with the numbered names of self.output_file.
    """
    script = ['\n', '# directory content\n', 'ls \n']

    for produced in (self.output_file + self.output_file_sandbox):
        numbered = self.numberFile_(produced, '$NJob')
        script.append('\n')
        script.append('# check output file\n')
        script.append('ls ' + produced + '\n')
        script.append('ls_result=$?\n')
        script.append('if [ $ls_result -ne 0 ] ; then\n')
        script.append(' echo "ERROR: Problem with output file"\n')
        if common.scheduler.boss_scheduler_name == 'condor_g':
            script.extend([
                ' if [ $middleware == OSG ]; then \n',
                ' echo "prepare dummy output file"\n',
                ' echo "Processing of job output failed" > $RUNTIME_AREA/' + numbered + '\n',
                ' fi \n',
            ])
        script.append('else\n')
        script.append(' cp ' + produced + ' $RUNTIME_AREA/' + numbered + '\n')
        script.append('fi\n')

    script.extend([
        'cd $RUNTIME_AREA\n',
        'cd $RUNTIME_AREA\n',
        ### OLI_DANIELE
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n',
        ' echo "JOB_EXIT_STATUS = 60999"\n',
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])

    ## Add to filelist only files to be possibly copied to SE
    toStore = [self.numberFile_(name, '$NJob') for name in self.output_file]
    script.append('file_list="' + ' '.join(toStore) + '"\n')

    return ''.join(script)
1091    
def numberFile_(self, file, txt):
    """
    append _'txt' before last extension of a file

    The name is cut at its last '.', e.g. ('out.root', '5') gives
    'out_5.root' and ('a.b.c', '1') gives 'a.b_1.c'.  When the name
    contains no '.', _'txt' is simply appended.
    """
    # string methods replace the deprecated 'string' module functions
    lastDot = file.rfind(".")
    if lastDot == -1:
        # no extension at all
        return file + '_' + txt
    # file[lastDot:] is the extension, leading dot included
    return file[:lastDot] + '_' + txt + file[lastDot:]
1109    
def getRequirements(self, nj=[]):
    """
    return job requirements to add to jdl files

    The expression asks for the software tag of self.version (only
    when a version is set) and for outbound connectivity; the clauses
    are joined with ' && '.  The nj argument is not used.
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' +
                       self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')

    # joining avoids a dangling leading ' && ' when no version is set
    return ' && '.join(clauses)
1123 gutsche 1.3
def configFilename(self):
    """ return the config filename: the job type name plus '.cfg' """
    cfgName = self.name() + '.cfg'
    return cfgName
1127    
1128     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns part of a job script which prepares
    the execution environment and which is common for all CMS jobs.

    On OSG the script sources cmsset_default.sh (passing self.version)
    from $GRID3_APP_DIR or, if not found there, from $OSG_APP.  When
    neither file exists the failure is reported, the working
    directory is removed and the job exits with status 1.
    """
    # Status-report boilerplate emitted on every failure path
    dumpReport  = ' dumpStatus $RUNTIME_AREA/$repo\n'
    dumpReport += ' rm -f $RUNTIME_AREA/$repo \n'
    dumpReport += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    dumpReport += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'

    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dumpReport
    # no 'exit 1' here: the job must first remove its working
    # directory, it exits at the end of this branch
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += dumpReport
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1172    
1173     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Returns part of a job script which prepares
    the execution environment and which is common for all CMS jobs.

    On LCG the script checks that $VO_CMS_SW_DIR is set and that it
    holds a non-empty cmsset_default.sh, then sources that file.
    Every failure is reported and ends the job with status 1.
    """
    # Status-report boilerplate emitted before every 'exit 1'
    report = ''.join((
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
    ))

    script = (
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
        report,
        ' exit 1\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
        report,
        ' exit 1\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
        report,
        ' exit 1\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    )
    return ''.join(script)
1220 gutsche 1.5
def setParam_(self, param, value):
    """ store 'value' under the key 'param' in the job type parameters """
    self._params.update({param: value})
1223    
def getParams(self):
    """ return the dictionary of job type parameters (not a copy) """
    params = self._params
    return params
1226 gutsche 1.8
def setTaskid_(self):
    """ read the task identifier from the 'taskId' configuration parameter """
    taskId = self.cfg_params['taskId']
    self._taskId = taskId
1229    
def getTaskid(self):
    """ return the task identifier stored by setTaskid_() """
    taskId = self._taskId
    return taskId
1232 gutsche 1.35
1233     #######################################################################
def uniquelist(self, old):
    """
    remove duplicates from a list

    Returns a new list holding every element of 'old' once, in the
    order of its first occurrence.  Elements must be hashable.
    """
    seen = {}
    unique = []
    for e in old:
        if e not in seen:
            seen[e] = 0
            unique.append(e)
    return unique