ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.72
Committed: Tue Mar 27 22:29:22 2007 UTC (18 years, 1 month ago) by gutsche
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_5_0_pre9
Changes since 1.71: +11 -2 lines
Log Message:
Provide check for default.tgz tarball size to limit input sandbox size
for all submission systems.

Unlike EGEE, Condor_G does not check the sandbox size, so a size check on the
default.tgz tarball (which now also contains the data directories) was introduced.

The default is 100 MB.

It can be changed by:

[EDG]

maxtarballsize = X

where X is the size in MB (float).

The switch is deliberately left undocumented in the template crab.cfg so as not to mislead users.

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.41 import DataDiscovery
8 gutsche 1.66 import DataDiscovery_DBS2
9 slacapra 1.41 import DataLocation
10 slacapra 1.1 import Scram
11    
12 slacapra 1.70 import os, string, re, shutil, glob
13 slacapra 1.1
14     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure the CMSSW job type from the user configuration.

    ARGUMENTS:
      cfg_params: mapping of 'SECTION.key' names to string values,
                  accessed with []; missing keys raise KeyError
      ncjobs:     number of jobs requested to be created (or 'all');
                  limits the job splitting

    The constructor reads the configuration, runs data discovery and
    location when a dataset is given, builds the input sandbox tarball,
    performs the job splitting and writes the modified parameter set.

    RAISES: CrabException on missing/invalid mandatory parameters, on
            missing files and on failures while editing the PSet.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self._params = {}
    self.cfg_params = cfg_params

    # maximal size (MB) of the default.tgz input sandbox tarball
    try:
        self.MaxTarBallSize = float(self.cfg_params['EDG.maxtarballsize'])
    except KeyError:
        self.MaxTarBallSize = 100.0

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''         # set to None below for the script use case
    self.datasetPath = ''  # set to None below when no input is requested

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards

    ## get DBS mode
    try:
        self.use_dbs_2 = int(self.cfg_params['CMSSW.use_dbs_2'])
    except KeyError:
        self.use_dbs_2 = 0

    try:
        tmp = cfg_params['CMSSW.datasetpath']
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        self.setParam_('dataset', datasetpath_split[1])
        self.setParam_('owner', datasetpath_split[-1])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    try:
        outputSpec = cfg_params['CMSSW.output_file']
    except KeyError:
        outputSpec = ''
    if outputSpec != '':
        tmpOutFiles = outputSpec.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outName in tmpOutFiles:
            self.output_file.append(outName.strip())
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
    except KeyError:
        self.scriptExe = ''
    if self.scriptExe != '':
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    # without dataset and without PSet a user script is mandatory
    if self.datasetPath is None and self.pset is None and self.scriptExe == '':
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files (comma separated, glob patterns allowed)
    try:
        addFilesSpec = cfg_params['USER.additional_input_files']
        haveAddFiles = 1
    except KeyError:
        addFilesSpec = ''
        haveAddFiles = 0
    if haveAddFiles:
        for pattern in addFilesSpec.split(','):
            pattern = pattern.strip()
            if pattern == '':
                # tolerate empty entries such as a trailing comma
                continue
            # relative patterns are resolved against the current directory
            if pattern[0] == "/":
                searchPattern = pattern
            else:
                searchPattern = os.path.join(".", pattern)
            matches = glob.glob(searchPattern)
            # glob only returns existing paths, so "not found" means
            # that the pattern did not match anything
            if not matches:
                raise CrabException("Additional input file not found: "+pattern)
            for match in matches:
                # store the copy flat in the share directory: the source
                # path may contain directories which do not exist there
                storedFile = os.path.join(common.work_space.shareDir(), os.path.basename(match))
                shutil.copyfile(match, storedFile)
                self.additional_inbox_files.append(storedFile.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    # files per job
    try:
        if (cfg_params['CMSSW.files_per_jobs']):
            raise CrabException("files_per_jobs no longer supported. Quitting.")
    except KeyError:
        pass

    ## Events per job
    try:
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")

    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5,"No first run given")

    if self.pset is not None:
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset)

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            if (self.datasetPath): # standard job
                # allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else:  # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if (self.firstRun):
                    self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN")  #First Run
                if (self.sourceSeed) :
                    self.PsetEdit.pythiaSeed("INPUT")
                    if (self.sourceSeedVtx) :
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except CrabException:
            # already a meaningful CRAB error: do not mask it
            raise
        except Exception:
            # report the underlying cause instead of swallowing it
            import sys
            cause = sys.exc_info()[1]
            msg='Error while manipuliating ParameterSet: exiting...'
            msg += ' ('+str(cause)+')'
            raise CrabException(msg)
278 gutsche 1.3
279 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
280    
281 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
282    
283     datasetPath=self.datasetPath
284    
285 slacapra 1.1 ## Contact the DBS
286 slacapra 1.41 common.logger.message("Contacting DBS...")
287 slacapra 1.1 try:
288 gutsche 1.66
289     if self.use_dbs_2 == 1 :
290     self.pubdata=DataDiscovery_DBS2.DataDiscovery_DBS2(datasetPath, cfg_params)
291     else :
292     self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
293 slacapra 1.1 self.pubdata.fetchDBSInfo()
294    
295 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
296 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
297     raise CrabException(msg)
298 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
299 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
300     raise CrabException(msg)
301 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
302 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
303 slacapra 1.1 raise CrabException(msg)
304 gutsche 1.67 except DataDiscovery_DBS2.NotExistingDatasetError_DBS2, ex :
305     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
306     raise CrabException(msg)
307     except DataDiscovery_DBS2.NoDataTierinProvenanceError_DBS2, ex :
308     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
309     raise CrabException(msg)
310     except DataDiscovery_DBS2.DataDiscoveryError_DBS2, ex:
311     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
312     raise CrabException(msg)
313 slacapra 1.1
314     ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
315 gutsche 1.3 common.logger.message("Required data are :"+self.datasetPath)
316    
317 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
318 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
319     self.eventsbyfile=self.pubdata.getEventsPerFile()
320 gutsche 1.3
321 slacapra 1.1 ## get max number of events
322     self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
323 gutsche 1.44 common.logger.message("The number of available events is %s\n"%self.maxEvents)
324 slacapra 1.1
325 slacapra 1.41 common.logger.message("Contacting DLS...")
326 slacapra 1.1 ## Contact the DLS and build a list of sites hosting the fileblocks
327     try:
328 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
329 gutsche 1.6 dataloc.fetchDLSInfo()
330 slacapra 1.41 except DataLocation.DataLocationError , ex:
331 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
332     raise CrabException(msg)
333    
334    
335 gutsche 1.35 sites = dataloc.getSites()
336     allSites = []
337     listSites = sites.values()
338 slacapra 1.63 for listSite in listSites:
339     for oneSite in listSite:
340 gutsche 1.35 allSites.append(oneSite)
341     allSites = self.uniquelist(allSites)
342 gutsche 1.3
343 gutsche 1.35 common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
344     common.logger.debug(6, "List of Sites: "+str(allSites))
345     return sites
346 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs
          self.list_of_args - File(s) job will run on (a list of lists)
    RAISES: CrabException when the requested splitting would lead to jobs
            running over less than one event
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report the total actually requested: self.total_number_of_events
        # is 0 when the total is derived from number_of_jobs*events_per_job
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if (self.theNumberOfJobs < 1):
            raise CrabException('number_of_jobs must be at least 1.')
        eventsPerJobRequested = eventsRemaining // self.theNumberOfJobs

    # With less than one event per job the loops below would emit
    # zero-event jobs without consuming events until the job limit is hit.
    if (eventsRemaining > 0 and eventsPerJobRequested < 1):
        raise CrabException('Cannot split '+str(eventsRemaining)+' events: the resulting number of events per job is '+str(eventsPerJobRequested)+'.')

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        # blocks without event information are ignored
        if block in self.eventsbyblock :
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = self.filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0

            # ---- New block => New job ---- #
            parString = "\\{"
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                fileName = files[fileCount]
                if newFile :
                    try:
                        numEventsInFile = self.eventsbyfile[fileName]
                        common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += '\\\"' + fileName + '\\\"\,'
                        newFile = 0
                    except KeyError:
                        # NOTE(review): the file is not added to the job but the
                        # branches below still run with the previous counters
                        common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
                    # if last file in block
                    if ( fileCount == numFilesInBlock-1 ) :
                        # end job using last file, use remaining events in block
                        # close job and touch new file
                        # ([:-2] drops the trailing '\,' separator)
                        fullString = parString[:-2]
                        fullString += '\\}'
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                        jobSkipEventCount = 0
                        # reset file
                        parString = "\\{"
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else :
                        # go to next file
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                    # close job and touch new file
                    fullString = parString[:-2]
                    fullString += '\\}'
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    parString = "\\{"
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    fullString = parString[:-2]
                    fullString += '\\}'
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[fileName]
                    parString = "\\{"
                    parString += '\\\"' + fileName + '\\\"\,'

    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
524    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job
    (no input dataset).

    REQUIRES: self.selectEventsPerJob, self.selectTotalNumberEvents,
              self.selectNumberOfJobs, self.eventsPerJob,
              self.theNumberOfJobs, self.total_number_of_events,
              self.firstRun, self.sourceSeed, self.sourceSeedVtx
    SETS: self.total_number_of_jobs, self.eventsPerJob or
          self.total_number_of_events (whichever is derived),
          self.jobDestination, self.list_of_args
    RAISES: CrabException for a negative total number of events or a
            non-positive divisor (events_per_job / number_of_jobs)
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            # guard the division: 0 would raise ZeroDivisionError and a
            # negative value would silently give no jobs at all
            if (self.eventsPerJob < 1):
                raise CrabException('events_per_job must be at least 1.')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs =self.theNumberOfJobs
            self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)

    elif (self.selectNumberOfJobs) :
        if (self.theNumberOfJobs < 1):
            raise CrabException('number_of_jobs must be at least 1.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check  '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # Per job argument list: [run-or-index, pythia seed, vertex seed],
    # the seeds being present only when configured. Every value is the
    # configured number with the job index appended as a string.
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        if (self.firstRun):
            ## pythia first run
            args = [str(self.firstRun)+str(i)]
        else:
            ## no first run
            args = [str(i)]
        if (self.sourceSeed):
            ## pythia random seed
            args.append(str(self.sourceSeed)+str(i))
            if (self.sourceSeedVtx):
                ## vtx random seed (only used together with the pythia seed)
                args.append(str(self.sourceSeedVtx)+str(i))
        self.list_of_args.append(args)

    return
598    
599 spiga 1.42
def jobSplittingForScript(self):
    """
    Perform job splitting based on number of job: one job per requested
    job, with the job index as its only argument.

    SETS: self.total_number_of_jobs, self.list_of_args and appends one
          empty destination per job to self.jobDestination
    """
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    nJobs = self.theNumberOfJobs
    self.total_number_of_jobs = nJobs

    common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    # Since there is no input, any site is good: the destination is left
    # empty. A fresh list is created for every job.
    self.jobDestination.extend([[""] for jobIndex in range(nJobs)])
    # the only argument of each job is its index
    self.list_of_args = [[str(jobIndex)] for jobIndex in range(nJobs)]
    return
622    
def split(self, jobParams):
    """
    Store arguments and destination of every job in the job DB.

    jobParams: list which is extended by one entry per job; the first
               self.total_number_of_jobs entries are then set to the
               per-job argument lists computed by the job splitting
    """
    common.jobDB.load()

    jobTotal = self.total_number_of_jobs
    perJobArgs = self.list_of_args

    # create the empty structure
    jobParams.extend([""] * jobTotal)

    jobIndex = 0
    while jobIndex < jobTotal:
        jobParams[jobIndex] = perJobArgs[jobIndex]
        common.jobDB.setArguments(jobIndex, jobParams[jobIndex])
        destination = self.jobDestination[jobIndex]
        common.logger.debug(5,"Job "+str(jobIndex)+" Destination: "+str(destination))
        common.jobDB.setDestination(jobIndex, self.jobDestination[jobIndex])
        jobIndex += 1

    common.jobDB.save()
    return
643    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in the job DB for job 'nj' as one
    string, every argument followed by a blank. 'sched' is not used.
    """
    pieces = [str(argument)+" " for argument in common.jobDB.arguments(nj)]
    return ''.join(pieces)
649 gutsche 1.3
def numberOfJobs(self):
    """
    Return the total number of jobs determined by the job splitting.
    """
    jobTotal = self.total_number_of_jobs
    return jobTotal
653    
def getTarBall(self, exe):
    """
    Return the TarBall with lib and exe.

    The tarball path (relative, for use in the Boss XML files) is stored
    in self.tgzNameWithPath; the tarball is only built through
    buildTar_(exe) when no file exists at that path yet.
    """
    self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name

    if not os.path.exists(self.tgzNameWithPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # it exists already: just return it
    return self.tgzNameWithPath
671    
def buildTar_(self, executable):
    """
    Build the input sandbox tarballs.

    Creates self.tgzNameWithPath with the private executable, the 'lib'
    and 'module' directories and every 'data' directory of the user
    scram area plus the ProdAgentApi directory, checks it against
    self.MaxTarBallSize (MB) and then creates the tarball with the
    monitoring (ML) files, whose path is stored in self.MLtgzfile.

    Nothing is done when the working area is the release top.

    RAISES: CrabException when the executable is not found, when a
            tarball cannot be created or when the sandbox is too large.
            A tarball which failed to build or is too large is removed,
            so that getTarBall() does not pick it up later.
    """
    import sys
    import tarfile

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if (self.executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    exe = exeWithPath.replace(path,'')
                    tar.add(path+exe,exe)
                # otherwise the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen=len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ## Add ProdAgent dir to tar
            paDir = 'ProdAgentApi'
            pa = os.environ['CRABDIR'] + '/' + 'ProdAgentApi'
            if os.path.isdir(pa):
                tar.add(pa,paDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # always release the file handle, also on failure
            tar.close()
    except CrabException:
        # keep the specific message (e.g. executable not found) and do
        # not leave a partial tarball behind
        if os.path.exists(self.tgzNameWithPath):
            os.remove(self.tgzNameWithPath)
        raise
    except Exception:
        cause = sys.exc_info()[1]
        if os.path.exists(self.tgzNameWithPath):
            os.remove(self.tgzNameWithPath)
        raise CrabException('Could not create tar-ball: '+str(cause))

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        # remove the oversized tarball: an existing tarball is reused by
        # getTarBall() without any size check
        os.remove(self.tgzNameWithPath)
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile =  common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path=os.environ['CRABDIR'] + '/python/'
            for mlFile in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
                tar.add(path+mlFile,mlFile)
            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except Exception:
        cause = sys.exc_info()[1]
        raise CrabException('Could not create ML files tar-ball: '+str(cause))

    return
756    
def wsSetupEnvironment(self, nj):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The returned shell text, in order:
      * runs the LCG or OSG CMS environment setup depending on
        $middleware (on OSG a temporary working directory is created),
      * creates the CMSSW project area for self.version and evaluates
        the scram runtime environment,
      * checks that at least two wrapper arguments were given,
      * when self.pset is set, fills the job arguments into the
        parameter set and writes it to pset.cfg,
      * copies every additional input sandbox file into the working
        directory and makes it executable.
    """
    # Shell fragment repeated after every fatal error: publish the
    # status file, then start a new one with the monitoring ids.
    report = (' dumpStatus $RUNTIME_AREA/$repo\n'
              ' rm -f $RUNTIME_AREA/$repo \n'
              ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
              ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n')
    # Shell fragment opening the OSG working directory cleanup.
    osg_cleanup = (' if [ $middleware == OSG ]; then \n'
                   ' echo "Remove working directory: $WORKING_DIR"\n'
                   ' cd $RUNTIME_AREA\n'
                   ' /bin/rm -rf $WORKING_DIR\n'
                   ' if [ -d $WORKING_DIR ] ;then\n')

    # Prepare JobType-independent part
    txt = ''

    ## OLI_Daniele at this level middleware already known

    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += report
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += report
    ## OLI_Daniele
    txt += osg_cleanup
    # report the version really requested (was hard-coded CMSSW_0_6_1)
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += report
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'export SCRAM_ARCH='+self.executable_arch+'\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += report
    ## OLI_Daniele
    txt += osg_cleanup
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += report
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.pset is not None: #CarlosDaniele
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if self.datasetPath: # standard job
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
        else: # pythia like job
            if self.sourceSeed:
                txt += 'FirstRun=${args[1]}\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'
                txt += 'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
            else:
                txt += '# Copy untouched pset\n'
                txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
            if self.sourceSeed:
                txt += 'Seed=${args[2]}\n'
                txt += 'echo "Seed: <$Seed>"\n'
                txt += 'sed "s#\\<INPUT\\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n'
                if self.sourceSeedVtx:
                    txt += 'VtxSeed=${args[3]}\n'
                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                    txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
                else:
                    txt += 'mv tmp_2.cfg pset.cfg\n'
            else:
                txt += 'mv tmp_1.cfg pset.cfg\n'

    for inboxFile in self.additional_inbox_files:
        # only the file name is known on the worker node
        relFile = inboxFile.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset is not None: #CarlosDaniele
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
923    
def wsBuildExe(self, nj=0):
    """
    Return the job script commands which unpack the user tarball
    on the worker node and put ProdAgentApi in PYTHONPATH.

    An empty string is returned when the tarball self.tgzNameWithPath
    does not exist; 'nj' is not used.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    lines = [
        # unpack and check the exit status of tar
        'echo "tar xzvf $RUNTIME_AREA/' + tarball + '"\n',
        'tar xzvf $RUNTIME_AREA/' + tarball + '\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n',
        # on OSG the temporary working directory has to be removed
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = 50999"\n',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        # the ProdAgentApi directory comes with the tarball
        'echo "Include ProdAgentApi in PYTHONPATH"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=ProdAgentApi\n',
        'else\n',
        ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n',
        'fi\n',
        '\n',
    ]
    return "".join(lines)
971    
def modifySteeringCards(self, nj):
    """
    Hook for modifying the card provided by the user and writing
    a new card into the share dir.

    This job type does nothing here: 'nj' is ignored and None is
    returned.
    """
    return None
977    
def executableName(self):
    """
    Return the command that runs the job: "sh " when a user script
    (self.scriptExe) is configured, otherwise self.executable.
    """
    if not self.scriptExe:
        return self.executable
    #CarlosDaniele: a user script is run through the shell
    return "sh "
983 slacapra 1.1
def executableArgs(self):
    """
    Return the arguments for the job command: the user script
    followed by " $NJob" when self.scriptExe is set, otherwise
    " -p pset.cfg".
    """
    if not self.scriptExe:
        return " -p pset.cfg"
    #CarlosDaniele: the script gets the job number as argument
    return self.scriptExe + " $NJob"
989 slacapra 1.1
def inputSandbox(self, nj):
    """
    Build the list of files for the JDL input sandbox.

    The list holds, in this order: the code tarball and the ML
    tarball (each only when the file exists), the job configuration
    file when self.pset is set, and all additional input files.
    'nj' is not used.
    """
    ## code
    sandbox = [tarball
               for tarball in (self.tgzNameWithPath, self.MLtgzfile)
               if os.path.isfile(tarball)]
    ## config
    if self.pset is not None:
        sandbox.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
    ## additional input files
    sandbox.extend(self.additional_inbox_files)
    return sandbox
1009    
def outputSandbox(self, nj):
    """
    Build the list of files for the JDL output sandbox.

    Every user declared output file (self.output_file followed by
    self.output_file_sandbox) is numbered with nj + 1 through
    numberFile_.
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1021    
def prepareSteeringCards(self):
    """
    Hook for the initial modifications of the user's steering card
    file; nothing is done for this job type and None is returned.
    """
    return None
1027    
def wsRenameOutput(self, nj):
    """
    Return the job script commands which rename the produced files.

    For each declared output file the script copies it to
    $RUNTIME_AREA under its numbered name (numberFile_ with '$NJob').
    The script then removes the OSG working directory and defines
    'file_list' with the numbered names of self.output_file only.
    'nj' is not used.
    """
    parts = ['\n',
             '# directory content\n',
             'ls \n']

    for produced in (self.output_file + self.output_file_sandbox):
        numbered = self.numberFile_(produced, '$NJob')
        parts.append('\n')
        parts.append('# check output file\n')
        parts.append('ls ' + produced + '\n')
        parts.append('ls_result=$?\n')
        parts.append('if [ $ls_result -ne 0 ] ; then\n')
        parts.append(' echo "ERROR: Problem with output file"\n')
        if common.scheduler.boss_scheduler_name == 'condor_g':
            # a dummy file stands in for the missing output
            parts.append(' if [ $middleware == OSG ]; then \n')
            parts.append(' echo "prepare dummy output file"\n')
            parts.append(' echo "Processing of job output failed" > $RUNTIME_AREA/' + numbered + '\n')
            parts.append(' fi \n')
        parts.append('else\n')
        parts.append(' cp ' + produced + ' $RUNTIME_AREA/' + numbered + '\n')
        parts.append('fi\n')

    parts.extend([
        'cd $RUNTIME_AREA\n',
        'cd $RUNTIME_AREA\n',
        ### OLI_DANIELE
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n',
        ' echo "JOB_EXIT_STATUS = 60999"\n',
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])

    ## Add to filelist only files to be possibly copied to SE
    to_copy = [self.numberFile_(name, '$NJob') for name in self.output_file]
    parts.append('file_list="' + ' '.join(to_copy) + '"\n')

    return ''.join(parts)
1082    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of a file name.

    'out.root' becomes 'out_<txt>.root'; a name without extension
    gets the suffix appended ('out' becomes 'out_<txt>').  Only dots
    in the last path component count as extension separators, so
    'dir.v1/out' becomes 'dir.v1/out_<txt>'.

    file -- file name (may include directories), a string
    txt  -- text to insert, a string
    Returns the new name as a string.
    """
    last_slash = file.rfind('/')
    last_dot = file.rfind('.')
    if last_dot <= last_slash:
        # no extension in the file name itself (also covers "no dot")
        return file + '_' + txt
    return file[:last_dot] + '_' + txt + file[last_dot:]
1100    
def getRequirements(self, nj=[]):
    """
    Return the job requirements to add to the jdl files.

    The expression requires outbound network connectivity and, when
    self.version is set, the matching "VO-cms-<version>" software tag.
    The clauses are joined with ' && ', so an empty version no longer
    yields an expression starting with a dangling ' && '.

    'nj' is not used; its default is kept for callers and is never
    modified here.
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' +
                       self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')

    return ' && '.join(clauses)
1114 gutsche 1.3
def configFilename(self):
    """ return the config filename: the result of self.name() plus '.cfg' """
    base = self.name()
    return base + '.cfg'
1118    
1119     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns part of a job script which prepares the execution
    environment on OSG and which is common for all CMS jobs.

    The script sources cmsset_default.sh for self.version from
    $GRID3_APP_DIR/cmssoft or, failing that, from $OSG_APP/cmssoft/cms.
    When neither file exists it reports exit code 10020, removes
    $WORKING_DIR (reporting 10017 if that fails) and exits.
    """
    # Shell fragment repeated after every fatal error: publish the
    # status file, then start a new one with the monitoring ids.
    report = (' dumpStatus $RUNTIME_AREA/$repo\n'
              ' rm -f $RUNTIME_AREA/$repo \n'
              ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
              ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n')

    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += report
    # No 'exit 1' here: it used to stop the script before the
    # working directory cleanup below could run.
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += report
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1163    
1164     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Returns part of a job script which prepares the execution
    environment on LCG and which is common for all CMS jobs.

    The script checks $VO_CMS_SW_DIR (exit code 10031), checks that
    cmsset_default.sh is there and not empty (10020) and sources it
    (10032 when sourcing fails).
    """
    # status report emitted before each 'exit 1'
    report = [
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
    ]

    script = [
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    script += report
    script += [
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    script += report
    script += [
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    script += report
    script += [
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    ]
    return ''.join(script)
1211 gutsche 1.5
def setParam_(self, param, value):
    """ store 'value' under the key 'param' in the parameter dictionary """
    params = self._params
    params[param] = value
1214    
def getParams(self):
    """ return the parameter dictionary itself (not a copy) """
    params = self._params
    return params
1217 gutsche 1.8
def setTaskid_(self):
    """ read the 'taskId' entry of cfg_params and keep it as the task id """
    task_id = self.cfg_params['taskId']
    self._taskId = task_id
1220    
def getTaskid(self):
    """ return the task id stored by setTaskid_ """
    task_id = self._taskId
    return task_id
1223 gutsche 1.35
1224     #######################################################################
def uniquelist(self, old):
    """
    remove duplicates from a list

    old -- iterable of hashable items
    Returns a new list with each item once, in the order of its
    first occurrence.  (Returning dict.keys() gave an arbitrary
    order, and is not a list on Python 3.)
    """
    seen = {}
    unique = []
    for e in old:
        if e not in seen:
            seen[e] = 0
            unique.append(e)
    return unique