ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.85
Committed: Fri May 18 08:42:39 2007 UTC (17 years, 11 months ago) by corvo
Content type: text/x-python
Branch: MAIN
Changes since 1.84: +115 -89 lines
Log Message:
Again the fix on additional input files and relative paths

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 corvo 1.85 import DataDiscovery
8     import DataDiscovery_DBS2
9     import DataLocation
10 slacapra 1.1 import Scram
11    
12 slacapra 1.70 import os, string, re, shutil, glob
13 slacapra 1.1
14     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure a CMSSW job type from the user configuration.

    ARGUMENTS: cfg_params - mapping of 'SECTION.key' names to string values
               ncjobs     - number of jobs requested to be created; stored in
                            self.ncjobs and used to limit the block splitting
    RAISES:    CrabException when datasetpath or pset is missing, when a
               referenced file does not exist, or when the splitting
               parameters are inconsistent.

    The constructor reads the configuration, performs data discovery when a
    dataset is given, builds the user tar-ball, splits the task into jobs
    and finally rewrites the parameter set.

    NOTE(review): this block was reconstructed from a listing that had lost
    its indentation; the nesting of the vtx/g4/mix seed edits under the
    pythia seed test should be confirmed against the repository copy.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self._params = {}
    self.cfg_params = cfg_params

    # maximum size (MB) accepted for the input sandbox tar-ball
    try:
        self.MaxTarBallSize = float(self.cfg_params['EDG.maxtarballsize'])
    except KeyError:
        self.MaxTarBallSize = 9.5 # actual (23-Apr-2007) limit is 10 Mb

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''         # may become None in the script use case
    self.datasetPath = ''  # may become None in the script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards

    ## get DBS mode
    try:
        self.use_dbs_2 = int(self.cfg_params['CMSSW.use_dbs_2'])
    except KeyError:
        self.use_dbs_2 = 0

    try:
        tmp = cfg_params['CMSSW.datasetpath']
        log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
        if string.lower(tmp)=='none':
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = tmp
            self.selectNoInput = 0
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        if self.use_dbs_2 == 1 :
            self.setParam_('dataset', datasetpath_split[1])
            self.setParam_('owner', datasetpath_split[2])
        else :
            self.setParam_('dataset', datasetpath_split[1])
            self.setParam_('owner', datasetpath_split[-1])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
        log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
        if self.pset.lower() != 'none' :
            if (not os.path.exists(self.pset)):
                raise CrabException("User defined PSet file "+self.pset+" does not exist")
        else:
            self.pset = None
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    try:
        self.output_file = []
        tmp = cfg_params['CMSSW.output_file']
        if tmp != '':
            tmpOutFiles = string.split(cfg_params['CMSSW.output_file'],',')
            log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
            for tmp in tmpOutFiles:
                tmp=string.strip(tmp)
                self.output_file.append(tmp)
        else:
            log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")
    except KeyError:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
        if self.scriptExe != '':
            if not os.path.isfile(self.scriptExe):
                msg ="ERROR. file "+self.scriptExe+" not found"
                raise CrabException(msg)
            self.additional_inbox_files.append(string.strip(self.scriptExe))
    except KeyError:
        self.scriptExe = ''

    # without dataset and without pset the job can only run a user script
    if self.datasetPath is None and self.pset is None and self.scriptExe == '' :
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    try:
        tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
        for tmp in tmpAddFiles:
            tmp = string.strip(tmp)
            if not tmp:
                # an unset value or a stray comma yields an empty entry;
                # indexing tmp[0] below would raise IndexError on it
                continue
            dirname = ''
            if not tmp[0]=="/": dirname = "."
            files = []
            if string.find(tmp,"*")>-1:
                files = glob.glob(os.path.join(dirname, tmp))
                if len(files)==0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                files.append(tmp)
            for inputFile in files:
                if not os.path.exists(inputFile):
                    raise CrabException("Additional input file not found: "+inputFile)
                # copy the file into the share area and ship the copy
                fname = string.split(inputFile, '/')[-1]
                storedFile = common.work_space.pathForTgz()+'share/'+fname
                shutil.copyfile(inputFile, storedFile)
                self.additional_inbox_files.append(string.strip(storedFile))
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
    except KeyError:
        pass

    # files per job
    try:
        if (cfg_params['CMSSW.files_per_jobs']):
            raise CrabException("files_per_jobs no longer supported. Quitting.")
    except KeyError:
        pass

    ## Events per job
    try:
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")

    try:
        self.sourceSeedG4 = int(cfg_params['CMSSW.g4_seed'])
    except KeyError:
        self.sourceSeedG4 = None
        common.logger.debug(5,"No g4 sim hits seed given")

    try:
        self.sourceSeedMix = int(cfg_params['CMSSW.mix_seed'])
    except KeyError:
        self.sourceSeedMix = None
        common.logger.debug(5,"No mix seed given")

    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5,"No first run given")
    if self.pset is not None:
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset)

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[] # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        if (self.datasetPath): # standard job
            # allow to process a fraction of events in a file
            self.PsetEdit.inputModule("INPUT")
            self.PsetEdit.maxEvent("INPUTMAXEVENTS")
            self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
        else: # pythia like job
            self.PsetEdit.maxEvent(self.eventsPerJob)
            if (self.firstRun):
                self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN") #First Run
            if (self.sourceSeed) :
                self.PsetEdit.pythiaSeed("INPUT")
                # NOTE(review): the three seeds below are assumed to be
                # nested under the pythia seed test -- confirm
                if (self.sourceSeedVtx) :
                    self.PsetEdit.vtxSeed("INPUTVTX")
                if (self.sourceSeedG4) :
                    self.PsetEdit.g4Seed("INPUTG4")
                if (self.sourceSeedMix) :
                    self.PsetEdit.mixSeed("INPUTMIX")
        # add FrameworkJobReport to parameter-set
        self.PsetEdit.addCrabFJR(self.fjrFileName)
        self.PsetEdit.psetWriter(self.configFilename())
306 gutsche 1.3
307 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
308    
309 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
310    
311     datasetPath=self.datasetPath
312    
313 slacapra 1.1 ## Contact the DBS
314 slacapra 1.41 common.logger.message("Contacting DBS...")
315 slacapra 1.1 try:
316 gutsche 1.66
317 corvo 1.85 if self.use_dbs_2 == 1 :
318     self.pubdata=DataDiscovery_DBS2.DataDiscovery_DBS2(datasetPath, cfg_params)
319     else :
320 slacapra 1.80 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
321 slacapra 1.1 self.pubdata.fetchDBSInfo()
322    
323 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
324 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
325     raise CrabException(msg)
326 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
327 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
328     raise CrabException(msg)
329 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
330 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
331 slacapra 1.1 raise CrabException(msg)
332 gutsche 1.67 except DataDiscovery_DBS2.NotExistingDatasetError_DBS2, ex :
333     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
334     raise CrabException(msg)
335     except DataDiscovery_DBS2.NoDataTierinProvenanceError_DBS2, ex :
336     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
337     raise CrabException(msg)
338     except DataDiscovery_DBS2.DataDiscoveryError_DBS2, ex:
339     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
340     raise CrabException(msg)
341 slacapra 1.1
342     ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
343 gutsche 1.3 common.logger.message("Required data are :"+self.datasetPath)
344    
345 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
346 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
347     self.eventsbyfile=self.pubdata.getEventsPerFile()
348 gutsche 1.3
349 slacapra 1.1 ## get max number of events
350     self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
351 gutsche 1.44 common.logger.message("The number of available events is %s\n"%self.maxEvents)
352 slacapra 1.1
353 slacapra 1.41 common.logger.message("Contacting DLS...")
354 slacapra 1.1 ## Contact the DLS and build a list of sites hosting the fileblocks
355     try:
356 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
357 gutsche 1.6 dataloc.fetchDLSInfo()
358 slacapra 1.41 except DataLocation.DataLocationError , ex:
359 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
360     raise CrabException(msg)
361    
362    
363 gutsche 1.35 sites = dataloc.getSites()
364     allSites = []
365     listSites = sites.values()
366 slacapra 1.63 for listSite in listSites:
367     for oneSite in listSite:
368 gutsche 1.35 allSites.append(oneSite)
369     allSites = self.uniquelist(allSites)
370 gutsche 1.3
371 gutsche 1.35 common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
372     common.logger.debug(6, "List of Sites: "+str(allSites))
373     return sites
374 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs (also copied to self.ncjobs)
          self.list_of_args - one [file list string, max events, skip events] entry per job

    Exactly two of the three select* flags are expected to be set; with
    fewer, totalEventsRequested / eventsPerJobRequested stay unbound.
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report the effective request: self.total_number_of_events is 0 when the
        # request was expressed as number_of_jobs * events_per_job
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = blockSites.keys()
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        if self.eventsbyblock.has_key(block) :
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = self.filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0

            # ---- New block => New job ---- #
            parString = "\\{"
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                fileName = files[fileCount]
                if newFile :
                    try:
                        numEventsInFile = self.eventsbyfile[fileName]
                        common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += '\\\"' + fileName + '\\\"\,'
                        newFile = 0
                    except KeyError:
                        common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
                    # if last file in block
                    if ( fileCount == numFilesInBlock-1 ) :
                        # end job using last file, use remaining events in block
                        # close job and touch new file
                        fullString = parString[:-2]  # drop the trailing "\,"
                        fullString += '\\}'
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                        jobSkipEventCount = 0
                        # reset file
                        parString = "\\{"
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else :
                        # go to next file
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                    # close job and touch new file
                    fullString = parString[:-2]
                    fullString += '\\}'
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    parString = "\\{"
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    fullString = parString[:-2]
                    fullString += '\\}'
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[fileName]
                    parString = "\\{"
                    parString += '\\\"' + fileName + '\\\"\,'
            # END while (iterate over files in the block)
    # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
552    
def jobSplittingNoInput(self):
    """
    Split a task without input dataset according to the event counts.

    REQUIRES: self.eventsPerJob, self.theNumberOfJobs, self.total_number_of_events
              and the matching select* flags
    SETS:     self.total_number_of_jobs, self.list_of_args (one argument list
              per job) and, depending on the flags, self.eventsPerJob or
              self.total_number_of_events; appends one [""] destination per
              job to self.jobDestination
    RAISES:   CrabException when the total number of events is negative

    NOTE(review): the listing this was reconstructed from had lost its
    indentation; the vtx/g4/mix seeds are assumed to be nested under the
    pythia seed test -- confirm.
    """
    logger = common.logger
    logger.debug(5,'Splitting per events')
    logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        raise CrabException('Cannot split jobs per Events with "-1" as total number of events')

    if self.selectEventsPerJob:
        if self.selectTotalNumberEvents:
            self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
    elif self.selectNumberOfJobs:
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

    nJobs = self.total_number_of_jobs
    perJob = self.eventsPerJob
    logger.debug(5,'N jobs '+str(nJobs))

    # is there any remainder?
    achievable = int(nJobs)*perJob
    check = int(self.total_number_of_events) - achievable

    logger.debug(5,'Check '+str(check))

    logger.message(str(nJobs)+' jobs can be created, each for '+str(perJob)+' for a total of '+str(nJobs*perJob)+' events')
    if check > 0:
        logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(achievable))

    # every argument is a base number with the job index appended as text
    self.list_of_args = []
    for jobIndex in range(nJobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        suffix = str(jobIndex)
        if self.firstRun:
            ## pythia first run
            jobArgs = [str(self.firstRun)+suffix]
        else:
            ## no first run
            jobArgs = [suffix]
        if self.sourceSeed:
            jobArgs.append(str(self.sourceSeed)+suffix)
            ## + vtx, G4 and Mix random seeds, in this order
            for extraSeed in (self.sourceSeedVtx, self.sourceSeedG4, self.sourceSeedMix):
                if extraSeed:
                    jobArgs.append(str(extraSeed)+suffix)
        self.list_of_args.append(jobArgs)

    return
619    
620 spiga 1.42
def jobSplittingForScript(self):
    """
    Split a script-only task: one job per requested job number.

    SETS: self.total_number_of_jobs (copied from self.theNumberOfJobs) and
          self.list_of_args (the job index, as a one-element list, per job);
          appends one [""] destination per job to self.jobDestination
    """
    requested = self.theNumberOfJobs
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(requested)+' jobs in total ')

    self.total_number_of_jobs = requested
    jobTotal = self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(jobTotal))
    common.logger.message(str(jobTotal)+' jobs can be created')

    # the only argument is the job index; there is no random seed
    jobIndexes = range(jobTotal)
    ## Since there is no input, any site is good
    self.jobDestination.extend([[""] for unused in jobIndexes])
    self.list_of_args = [[str(index)] for index in jobIndexes]
    return
643    
def split(self, jobParams):
    """
    Store the arguments and destination of every job in the job DB.

    ARGUMENT: jobParams - list that is extended by one entry per job; its
              first self.total_number_of_jobs entries are overwritten with
              the per-job argument lists from self.list_of_args
    """
    common.jobDB.load()
    jobTotal = self.total_number_of_jobs
    perJobArgs = self.list_of_args

    # create the empty structure
    jobParams.extend([""] * jobTotal)

    for index in range(jobTotal):
        jobParams[index] = perJobArgs[index]
        common.jobDB.setArguments(index, jobParams[index])
        destination = self.jobDestination[index]
        common.logger.debug(5,"Job "+str(index)+" Destination: "+str(destination))
        common.jobDB.setDestination(index, destination)

    common.jobDB.save()
    return
664    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in the job DB for job nj as one string,
    each argument followed by a single blank. sched is not used.
    """
    pieces = [str(argument) + " " for argument in common.jobDB.arguments(nj)]
    return ''.join(pieces)
670 gutsche 1.3
def numberOfJobs(self):
    """
    Return the total number of jobs computed by the job splitting.
    """
    jobTotal = self.total_number_of_jobs
    return jobTotal
674    
def getTarBall(self, exe):
    """
    Return the path of the tar-ball holding the user libraries and executable.

    The path is built from the work-space tgz area plus 'share/' and stored
    in self.tgzNameWithPath. An already existing file is reused; otherwise
    buildTar_ is called with exe to create it.
    """
    # relative path, as used for the Boss XML files
    tarballPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarballPath

    if not os.path.exists(tarballPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return string.strip(self.tgzNameWithPath)

    # if it exists, just return it
    return self.tgzNameWithPath
692    
def buildTar_(self, executable):
    """
    Build the tarballs shipped in the job input sandbox.

    Two archives are written:
      * self.tgzNameWithPath: the private user executable (if any) plus
        the 'lib', 'module' and 'data' directories of the user Scram
        area and the ProdAgentApi directory found under $CRABDIR;
      * self.MLtgzfile: the monitoring helper scripts found under
        $CRABDIR/python.

    Nothing is built when the working area is the release area itself.

    Raises CrabException when the executable cannot be found, when an
    archive cannot be written, or when the user tarball is larger than
    self.MaxTarBallSize (in MB).
    """
    import sys
    import tarfile

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top: nothing private to ship.
    # NOTE(review): on this early return self.MLtgzfile is not assigned
    # by this method -- confirm it is initialised elsewhere.
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if executable != '':
                exeWithPath = self.scram.findFile_(executable)
                if not exeWithPath:
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not; an exe coming from
                ## the release is found on the WN and is not shipped.
                if exeWithPath.find(swReleaseTop) == -1:
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    # distinguish case when script is in user project area
                    # or given by full path somewhere else
                    if exeWithPath.find(path) >= 0:
                        exe = exeWithPath.replace(path, '')
                        tar.add(path+exe, os.path.basename(executable))
                    else:
                        tar.add(exeWithPath, os.path.basename(executable))

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib, libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module, moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen = len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data", root[swAreaLen:]+"/data")
                    # tar.add() is recursive: do not descend into the
                    # directory just added, or nested 'data' directories
                    # would be stored a second time.
                    dirs.remove("data")

            ## Add ProdAgent dir to tar
            paDir = 'ProdAgentApi'
            pa = os.environ['CRABDIR'] + '/' + paDir
            if os.path.isdir(pa):
                tar.add(pa, paDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # Always release the file handle, also on failure.
            tar.close()
    except CrabException:
        # Keep the specific message (e.g. executable not found).
        raise
    except Exception:
        raise CrabException('Could not create tar-ball: '+str(sys.exc_info()[1]))

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if tarballinfo.st_size > self.MaxTarBallSize*1024*1024:
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path = os.environ['CRABDIR'] + '/python/'
            for mlFile in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
                tar.add(path+mlFile, mlFile)
            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except Exception:
        raise CrabException('Could not create ML files tar-ball: '+str(sys.exc_info()[1]))

    return
781    
def wsSetupEnvironment(self, nj):
    """
    Return the part of the job wrapper script which prepares the
    execution environment for the job 'nj'.

    The generated shell text:
      * sets up the CMS environment for the LCG or OSG middleware
        (on OSG a temporary working directory is created first);
      * creates and enters a Scram project area for self.version;
      * checks the number of wrapper arguments;
      * if a parameter set is used, copies the job configuration file,
        substitutes the per-job placeholders with the wrapper arguments
        and renames it to pset.cfg;
      * copies the additional input files from $RUNTIME_AREA.
    """
    # Shell lines emitted after every fatal exit code: dump the status
    # file, remove it and start it again with the monitoring ids.
    statusReport = (
        ' dumpStatus $RUNTIME_AREA/$repo\n'
        ' rm -f $RUNTIME_AREA/$repo \n'
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    )

    # Prepare JobType-independent part
    txt = ''

    ## OLI_Daniele at this level middleware already known

    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += statusReport
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += statusReport
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # Report the release actually requested instead of a fixed name.
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += statusReport
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'export SCRAM_ARCH='+self.executable_arch+'\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += statusReport
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += statusReport
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.pset is not None: #CarlosDaniele
        pset = os.path.basename(job.configFilename())

        def sedLine(pattern, shellVar):
            # Shell line replacing 'pattern' in the job config file with
            # the value of the shell variable 'shellVar'.
            return 'sed "s#'+pattern+'#$'+shellVar+'#" '+pset+' > tmp && mv -f tmp '+pset+'\n'

        txt += '\n'
        txt += 'cp $RUNTIME_AREA/'+pset+' .\n'
        if self.datasetPath: # standard job
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += sedLine("{'INPUT'}", 'InputFiles')
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += sedLine('INPUTMAXEVENTS', 'MaxEvents')
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += sedLine('INPUTSKIPEVENTS', 'SkipEvents')
        else: # pythia like job
            # Position of the next wrapper argument to consume.
            seedIndex = 1
            if self.firstRun:
                txt += 'FirstRun=${args['+str(seedIndex)+']}\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'
                txt += sedLine('\\<INPUTFIRSTRUN\\>', 'FirstRun')
                seedIndex += 1
            if self.sourceSeed:
                txt += 'Seed=${args['+str(seedIndex)+']}\n'
                txt += 'echo "Seed: <$Seed>"\n'
                txt += sedLine('\\<INPUT\\>', 'Seed')
                seedIndex += 1
                ## the following seeds are not always present
                # NOTE(review): the optional seeds are handled only when
                # the main seed is set -- confirm this matches the order
                # in which the job splitter fills the argument list.
                if self.sourceSeedVtx:
                    txt += 'VtxSeed=${args['+str(seedIndex)+']}\n'
                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                    txt += sedLine('\\<INPUTVTX\\>', 'VtxSeed')
                    seedIndex += 1
                if self.sourceSeedG4:
                    txt += 'G4Seed=${args['+str(seedIndex)+']}\n'
                    txt += 'echo "G4Seed: <$G4Seed>"\n'
                    txt += sedLine('\\<INPUTG4\\>', 'G4Seed')
                    seedIndex += 1
                if self.sourceSeedMix:
                    txt += 'mixSeed=${args['+str(seedIndex)+']}\n'
                    txt += 'echo "MixSeed: <$mixSeed>"\n'
                    txt += sedLine('\\<INPUTMIX\\>', 'mixSeed')
                    seedIndex += 1
        txt += 'mv -f '+pset+' pset.cfg\n'

    # Additional input files are looked up by base name in $RUNTIME_AREA.
    for file in self.additional_inbox_files:
        relFile = file.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset is not None: #CarlosDaniele
        # pset.cfg exists only when a parameter set was prepared above.
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'
        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
956    
def wsBuildExe(self, nj=0):
    """
    Return the script commands which unpack the user tarball on the
    worker node and add ProdAgentApi to PYTHONPATH.

    An empty string is returned when the tarball self.tgzNameWithPath
    does not exist on the submitting host.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    script = [
        'echo "tar xzvf $RUNTIME_AREA/'+tarball+'"\n',
        'tar xzvf $RUNTIME_AREA/'+tarball+'\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n',
        # On OSG the temporary working directory is removed before exit.
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = 50999"\n',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        'echo "Include ProdAgentApi in PYTHONPATH"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=ProdAgentApi\n',
        'else\n',
        ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n',
        'fi\n',
        '\n',
    ]
    return "".join(script)
1004    
def modifySteeringCards(self, nj):
    """
    Placeholder for the per-job modification of the user card.

    Nothing is done for this job type; the method has no body and
    returns None.
    """
    return None
1010    
def executableName(self):
    """
    Return the command the wrapper runs: "sh " when a user script
    (self.scriptExe) is configured, otherwise self.executable.
    """
    if not self.scriptExe: #CarlosDaniele
        return self.executable
    return "sh "
1016 slacapra 1.1
def executableArgs(self):
    """
    Return the arguments passed to the command given by executableName():
    the user script followed by $NJob when self.scriptExe is set,
    otherwise the option selecting pset.cfg.
    """
    if not self.scriptExe: #CarlosDaniele
        return " -p pset.cfg"
    return self.scriptExe + " $NJob"
1022 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    The list holds, in this order: the user tarball and the ML files
    tarball (each only if the file exists), the job configuration file
    (only if a parameter set is used) and the additional input files.
    """
    inp_box = []
    ## code
    if os.path.isfile(self.tgzNameWithPath):
        inp_box.append(self.tgzNameWithPath)
    # buildTar_ returns before assigning MLtgzfile when the working area
    # is the release area, so the attribute may be missing here.
    mlTarball = getattr(self, 'MLtgzfile', None)
    if mlTarball and os.path.isfile(mlTarball):
        inp_box.append(mlTarball)
    ## config
    if self.pset is not None:
        inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
    ## additional input files
    inp_box.extend(self.additional_inbox_files)
    return inp_box
1042    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Every user declared output file (self.output_file followed by
    self.output_file_sandbox) is tagged through numberFile_ with nj+1.
    """
    ## User Declared output files
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1054    
def prepareSteeringCards(self):
    """
    Hook for the initial modifications of the user's steering card
    file; nothing is done here and None is returned.
    """
    return None
1060    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every declared output file the script checks that the file
    exists and copies it to $RUNTIME_AREA under its job-numbered name;
    it then removes the OSG working directory and defines 'file_list'
    with the numbered names of the files in self.output_file.
    """
    script = ['\n', '# directory content\n', 'ls \n']

    for produced in (self.output_file+self.output_file_sandbox):
        numbered = self.numberFile_(produced, '$NJob')
        script.append('\n')
        script.append('# check output file\n')
        script.append('ls '+produced+'\n')
        script.append('ls_result=$?\n')
        script.append('if [ $ls_result -ne 0 ] ; then\n')
        script.append(' echo "ERROR: Problem with output file"\n')
        if common.scheduler.boss_scheduler_name == 'condor_g':
            script.append(' if [ $middleware == OSG ]; then \n')
            script.append(' echo "prepare dummy output file"\n')
            script.append(' echo "Processing of job output failed" > $RUNTIME_AREA/'+numbered+'\n')
            script.append(' fi \n')
        script.append('else\n')
        script.append(' cp '+produced+' $RUNTIME_AREA/'+numbered+'\n')
        script.append('fi\n')

    script.extend([
        'cd $RUNTIME_AREA\n',
        'cd $RUNTIME_AREA\n',
        ### OLI_DANIELE
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n',
        ' echo "JOB_EXIT_STATUS = 60999"\n',
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])

    ## Add to filelist only files to be possibly copied to SE
    toStore = [self.numberFile_(name, '$NJob') for name in self.output_file]
    script.append('file_list="'+' '.join(toStore)+'"\n')

    return ''.join(script)
1115    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of a file name.

    'file' is split at its last dot: "out.root" with txt "3" gives
    "out_3.root".  A name without any dot simply gets '_' + txt
    appended.
    """
    # Plain string methods replace the deprecated string.split().
    lastDot = file.rfind(".")
    if lastDot < 0:
        # no extension at all
        return file + '_' + txt
    # file[lastDot:] still carries the leading dot of the extension
    return file[:lastDot] + '_' + txt + file[lastDot:]
1133    
def getRequirements(self, nj=[]):
    """
    Return the job requirements to add to jdl files.

    The expression asks for the software tag of self.version (when a
    version is set) and for outbound connectivity of the host.  'nj' is
    accepted for interface compatibility and is neither read nor
    modified.
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    # Joining avoids an expression starting with ' && ' when there is
    # no version clause.
    return ' && '.join(clauses)
1147 gutsche 1.3
def configFilename(self):
    """ return the config filename: the job type name plus '.cfg' """
    baseName = self.name()
    return baseName + '.cfg'
1151    
1152     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns the part of a job script which prepares the execution
    environment on OSG and which is common for all CMS jobs.

    The script sources cmsset_default.sh for self.version from
    $GRID3_APP_DIR or, failing that, from $OSG_APP.  When neither file
    exists it reports exit code 10020, removes the working directory
    and exits.
    """
    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    # No 'exit' here: the script must first reach the removal of the
    # working directory below; it exits right after it.
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' rm -f $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
    txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1196    
1197     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Returns the part of a job script which prepares the execution
    environment on LCG and which is common for all CMS jobs.

    The script requires $VO_CMS_SW_DIR to be set and sources
    $VO_CMS_SW_DIR/cmsset_default.sh; each failure is reported with its
    own exit code (10031, 10020, 10032) before the script exits.
    """
    script = (
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    )
    return ''.join(script)
1244 gutsche 1.5
def setParam_(self, param, value):
    """ store 'value' under the key 'param' in the parameter dictionary """
    params = self._params
    params[param] = value
1247    
def getParams(self):
    """ return the parameter dictionary itself (not a copy) """
    params = self._params
    return params
1250 gutsche 1.8
def setTaskid_(self):
    """ remember the task id read from the 'taskId' configuration key """
    taskId = self.cfg_params['taskId']
    self._taskId = taskId
1253    
def getTaskid(self):
    """ return the task id stored by setTaskid_ """
    taskId = self._taskId
    return taskId
1256 gutsche 1.35
1257     #######################################################################
def uniquelist(self, old):
    """
    Remove duplicates from a list.

    The elements of 'old' must be hashable.  A new list is returned
    and 'old' is left untouched; the order of the result is the
    iteration order of a dictionary keyed by the elements.
    """
    # list() guarantees a real list where dict.keys() is only a view.
    return list(dict.fromkeys(old))