ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.76
Committed: Tue Apr 17 14:58:29 2007 UTC (18 years ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.75: +6 -1 lines
Log Message:
fix bug in checking existence of additional_input_files

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.41 import DataDiscovery
8 gutsche 1.66 import DataDiscovery_DBS2
9 slacapra 1.41 import DataLocation
10 slacapra 1.1 import Scram
11    
12 slacapra 1.70 import os, string, re, shutil, glob
13 slacapra 1.1
14     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure a CMSSW job type from the user configuration.

    ARGUMENTS: cfg_params - CRAB configuration, indexed by 'SECTION.key'
               ncjobs     - number of jobs requested to be created; it limits
                            the job splitting (jobSplittingByBlocks also
                            accepts the string 'all')
    SETS:      the configuration attributes read below, the input sandbox
               tar-ball (self.tgzNameWithPath) and, through the selected
               jobSplitting* method, the list of jobs
    RAISES:    CrabException when a mandatory parameter is missing, a
               referenced file does not exist, or the splitting parameters
               are inconsistent
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    # Marco.
    self._params = {}
    self.cfg_params = cfg_params

    # maximum size (in MB) accepted for the input sandbox tar-ball
    try:
        self.MaxTarBallSize = float(self.cfg_params['EDG.maxtarballsize'])
    except KeyError:
        self.MaxTarBallSize = 100.0

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''            #scrip use case Da
    self.datasetPath = ''     #scrip use case Da

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards

    ## get DBS mode
    try:
        self.use_dbs_2 = int(self.cfg_params['CMSSW.use_dbs_2'])
    except KeyError:
        self.use_dbs_2 = 0

    try:
        tmp = cfg_params['CMSSW.datasetpath']
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        if self.use_dbs_2 == 1:
            ownerIndex = 2
            neededFields = 3
        else:
            ownerIndex = -1
            neededFields = 2
        # report a malformed path as a configuration error rather than
        # letting the indexing below fail with a bare IndexError
        if len(datasetpath_split) < neededFields:
            raise CrabException("Error: malformed datasetpath "+self.datasetPath)
        self.setParam_('dataset', datasetpath_split[1])
        self.setParam_('owner', datasetpath_split[ownerIndex])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none':
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        # 'none' selects the script-only use case
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    try:
        tmp = cfg_params['CMSSW.output_file']
    except KeyError:
        tmp = ''
    if tmp != '':
        tmpOutFiles = tmp.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outFile in tmpOutFiles:
            self.output_file.append(outFile.strip())
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
    except KeyError:
        self.scriptExe = ''
    if self.scriptExe != '':
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    #CarlosDaniele
    if self.datasetPath is None and self.pset is None and self.scriptExe == '':
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    # Only the lookup is guarded, so that a KeyError raised while the
    # files are being copied is not mistaken for "parameter not given".
    try:
        addFilesCard = cfg_params['USER.additional_input_files']
    except KeyError:
        addFilesCard = None
    if addFilesCard is not None:
        for tmp in addFilesCard.split(','):
            tmp = tmp.strip()
            if not tmp:
                # empty card or stray comma: nothing to ship for this entry
                continue
            dirname = ''
            if not tmp[0]=="/": dirname = "."
            files = []
            if tmp.find("*") > -1:
                files = glob.glob(os.path.join(dirname, tmp))
                if len(files)==0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                files.append(tmp)
            for inputFile in files:
                if not os.path.exists(inputFile):
                    raise CrabException("Additional input file not found: "+inputFile)
                # store under the bare file name: appending the full user
                # path would point outside (or below) the share directory
                storedFile = common.work_space.shareDir()+os.path.basename(inputFile)
                shutil.copyfile(inputFile, storedFile)
                self.additional_inbox_files.append(storedFile.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    # files per job
    try:
        if (cfg_params['CMSSW.files_per_jobs']):
            raise CrabException("files_per_jobs no longer supported. Quitting.")
    except KeyError:
        pass

    ## Events per job
    try:
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None: #CarlosDaniele
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")
    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5,"No first run given")
    if self.pset is not None: #CarlosDaniele
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None: #CarlosDaniele
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None: #CarlosDaniele
        try:
            if (self.datasetPath): # standard job
                # allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else:  # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if (self.firstRun):
                    self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN")  #First Run
                if (self.sourceSeed) :
                    self.PsetEdit.pythiaSeed("INPUT")
                    if (self.sourceSeedVtx) :
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except CrabException:
            # already carries a specific message: do not overwrite it
            raise
        except Exception:
            msg='Error while manipuliating ParameterSet: exiting...'
            raise CrabException(msg)
287 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Query DBS for the content of self.datasetPath and DLS for its location.

    ARGUMENT: cfg_params - CRAB configuration, handed over to the
              discovery and location clients
    SETS:     self.pubdata, self.filesbyblock, self.eventsbyblock,
              self.eventsbyfile, self.maxEvents
    RETURNS:  the object returned by DataLocation.getSites(); its values
              are iterated here as lists of sites
    RAISES:   CrabException when the DBS or the DLS query fails
    """
    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath=self.datasetPath

    # Every discovery failure, whichever DBS client raised it, is
    # reported to the user in exactly the same way.
    discoveryErrors = (
        DataDiscovery.NotExistingDatasetError,
        DataDiscovery.NoDataTierinProvenanceError,
        DataDiscovery.DataDiscoveryError,
        DataDiscovery_DBS2.NotExistingDatasetError_DBS2,
        DataDiscovery_DBS2.NoDataTierinProvenanceError_DBS2,
        DataDiscovery_DBS2.DataDiscoveryError_DBS2,
    )

    ## Contact the DBS
    common.logger.message("Contacting DBS...")
    try:
        if self.use_dbs_2 == 1 :
            self.pubdata=DataDiscovery_DBS2.DataDiscovery_DBS2(datasetPath, cfg_params)
        else :
            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
        self.pubdata.fetchDBSInfo()
    except discoveryErrors as ex:
        msg = 'ERROR ***: failed Data Discovery in DBS :  %s'%ex.getErrorMessage()
        raise CrabException(msg)

    ## get list of all required data in the form of dbs paths  (dbs path = /dataset/datatier/owner)
    common.logger.message("Required data are :"+self.datasetPath)

    self.filesbyblock=self.pubdata.getFiles()
    self.eventsbyblock=self.pubdata.getEventsPerBlock()
    self.eventsbyfile=self.pubdata.getEventsPerFile()

    ## get max number of events
    self.maxEvents=self.pubdata.getMaxEvents() ##  self.maxEvents used in Creator.py
    common.logger.message("The number of available events is %s\n"%self.maxEvents)

    common.logger.message("Contacting DLS...")
    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc=DataLocation.DataLocation(list(self.filesbyblock.keys()),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError as ex:
        msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
        raise CrabException(msg)

    sites = dataloc.getSites()

    # flatten the per-block site lists, only to report them to the user
    allSites = []
    for listSite in sites.values():
        allSites.extend(listSite)
    allSites = self.uniquelist(allSites)

    common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
    common.logger.debug(6, "List of Sites: "+str(allSites))
    return sites
355 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs (also stored in self.ncjobs)
          self.list_of_args - per job: [file list string, max events, skip events]
    RAISES: CrabException if the splitting parameters cannot produce jobs
            holding at least one event each
    """

    # ---- Handle the possible job splitting configurations ---- #
    totalEventsRequested = None
    eventsPerJobRequested = None
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    if totalEventsRequested is None:
        raise CrabException('Cannot split by blocks: define total_number_of_events, or events_per_job together with number_of_jobs.')

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report the effective request, which is not total_number_of_events
        # when it was derived from events_per_job * number_of_jobs
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive number.')
        # never less than one event per job: with zero the loop below
        # would emit jobs forever without consuming any event
        eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

    if eventsPerJobRequested is None:
        raise CrabException('Cannot split by blocks: define events_per_job or number_of_jobs.')
    if eventsRemaining > 0 and eventsPerJobRequested < 1:
        raise CrabException('events_per_job must be a positive number.')

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    emptyFileList = "\\{"

    def closeJob(fileList, maxEventsArg, skipEvents, eventsInJob, jobNumber, destination, note):
        # Record the arguments and the destination of one finished job.
        # fileList ends with the two-character separator, dropped here.
        fullString = fileList[:-2] + '\\}'
        list_of_lists.append([fullString,str(maxEventsArg),str(skipEvents)])
        common.logger.debug(3,"Job "+str(jobNumber)+" can run over "+str(eventsInJob)+" events"+note+".")
        self.jobDestination.append(destination)
        common.logger.debug(5,"Job "+str(jobNumber)+" Destination: "+str(self.jobDestination[-1]))

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        # blocks without event information are ignored
        if block not in self.eventsbyblock:
            continue

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = emptyFileList
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            fileName = files[fileCount]
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += '\\"' + fileName + '\\"\\,'
                    newFile = 0
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

            eventsAvailable = filesEventCount - jobSkipEventCount

            # if less events in file remain than eventsPerJobRequested
            if ( eventsAvailable < eventsPerJobRequested ) :
                # if last file in block
                if ( fileCount == numFilesInBlock-1 ) :
                    # end job using last file, use remaining events in block;
                    # nothing to close if every file so far was skipped
                    if parString != emptyFileList:
                        closeJob(parString, -1, jobSkipEventCount, eventsAvailable,
                                 jobCount+1, blockSites[block], " (last file in block)")
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsAvailable
                        eventsRemaining = eventsRemaining - eventsAvailable
                    # reset counter
                    jobSkipEventCount = 0
                    # reset file
                    parString = emptyFileList
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else :
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( eventsAvailable == eventsPerJobRequested ) :
                # close job and touch new file
                closeJob(parString, eventsPerJobRequested, jobSkipEventCount, eventsPerJobRequested,
                         jobCount+1, blockSites[block], "")
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                parString = emptyFileList
                filesEventCount = 0
                newFile = 1
                fileCount += 1

            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                closeJob(parString, eventsPerJobRequested, jobSkipEventCount, eventsPerJobRequested,
                         jobCount+1, blockSites[block], "")
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[fileName]
                parString = emptyFileList
                parString += '\\"' + fileName + '\\"\\,'
            # END if
        # END while (iterate over files in the block)
    # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
533    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job

    REQUIRES: self.selectEventsPerJob, self.selectTotalNumberEvents,
              self.selectNumberOfJobs, self.eventsPerJob,
              self.total_number_of_events, self.theNumberOfJobs,
              self.firstRun, self.sourceSeed, self.sourceSeedVtx
    SETS: self.total_number_of_jobs, and whichever of self.eventsPerJob /
          self.total_number_of_events is derived from the other two
          self.list_of_args - per job: [first run or job index,
                              pythia seed (if given), vertex seed (if both given)],
                              each value with the job index appended
          self.jobDestination - one [""] entry appended per job
    RAISES: CrabException for a negative total number of events or a
            non-positive divisor
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            # eventsPerJob is a divisor here: reject zero/negative values
            # instead of failing with ZeroDivisionError
            if self.eventsPerJob < 1:
                raise CrabException('events_per_job must be a positive number.')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs =self.theNumberOfJobs
            self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)

    elif (self.selectNumberOfJobs) :
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive number.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check  '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # argument is seed number.$i
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        if (self.firstRun):
            ## pythia first run
            arguments = [str(self.firstRun)+str(i)]
        else:
            ## no first run
            arguments = [str(i)]
        if (self.sourceSeed):
            ## pythia random seed
            arguments.append(str(self.sourceSeed)+str(i))
            if (self.sourceSeedVtx):
                ## vtx random seed, used only together with the pythia seed
                arguments.append(str(self.sourceSeedVtx)+str(i))
        self.list_of_args.append(arguments)

    return
607    
608 spiga 1.42
def jobSplittingForScript(self):#CarlosDaniele
    """
    Perform job splitting based on number of job

    SETS: self.total_number_of_jobs - copied from self.theNumberOfJobs
          self.list_of_args - one [job index as string] per job
          self.jobDestination - one [""] entry appended per job
    """
    logger = common.logger
    logger.debug(5,'Splitting per job')
    logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.total_number_of_jobs = self.theNumberOfJobs
    njobs = self.total_number_of_jobs

    logger.debug(5,'N jobs  '+str(njobs))

    logger.message(str(njobs)+' jobs can be created')

    # Since there is no input, any site is good: the destination is left
    # empty. The only argument of a job is its index (no random seed).
    jobIndexes = range(njobs)
    self.list_of_args = [[str(index)] for index in jobIndexes]
    self.jobDestination.extend([[""] for index in jobIndexes])
    return
631    
def split(self, jobParams):
    """
    Store arguments and destination of every job in the job database.

    ARGUMENT: jobParams - list, grown in place by one slot per job; the
              first self.total_number_of_jobs slots receive the job arguments
    REQUIRES: self.total_number_of_jobs, self.list_of_args, self.jobDestination
    """
    jobDB = common.jobDB
    jobDB.load()
    #### Fabio
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args

    # create the empty structure
    jobParams.extend([""] * njobs)

    for job in range(njobs):
        jobArguments = arglist[job]
        jobParams[job] = jobArguments
        jobDB.setArguments(job, jobArguments)
        destination = self.jobDestination[job]
        common.logger.debug(5,"Job "+str(job)+" Destination: "+str(destination))
        jobDB.setDestination(job, destination)

    jobDB.save()
    return
652    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in the job database for job nj as one
    string, each argument followed by a blank. sched is not used.
    """
    pieces = [str(argument)+" " for argument in common.jobDB.arguments(nj)]
    return ''.join(pieces)
658 gutsche 1.3
def numberOfJobs(self):
    """
    Return the total number of jobs (self.total_number_of_jobs).
    """
    # Fabio
    njobs = self.total_number_of_jobs
    return njobs
662    
def getTarBall(self, exe):
    """
    Return the TarBall with lib and exe

    The path is stored in self.tgzNameWithPath. An existing file at that
    path is returned as it is; otherwise buildTar_ is asked to create it.
    """
    #
    # Marco. Let's start to use relative path for Boss XML files
    #
    self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name

    if not os.path.exists(self.tgzNameWithPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # if it exist, just return it
    return self.tgzNameWithPath
680    
def buildTar_(self, executable):
    """
    Build the input sandbox tar-balls in the task share directory.

    ARGUMENT: executable - name of the user executable; '' means none
    CREATES:  self.tgzNameWithPath - private executable (if any), the
              'lib' and 'module' directories and every 'data' directory
              of the user project area, plus the ProdAgentApi directory
              self.MLtgzfile - the monitoring scripts taken from
              $CRABDIR/python
    Nothing is created when the working area is the release top.
    RAISES:   CrabException if the executable is not found, a tar-ball
              cannot be written, or the sandbox exceeds self.MaxTarBallSize MB
    """
    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    import tarfile

    def discardPartial(path):
        # getTarBall reuses any file found at this path, so a half-written
        # archive must not be left behind after a failure
        if os.path.exists(path):
            try:
                os.remove(path)
            except OSError:
                pass

    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if (executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    # distinguish case when script is in user project area or given by full path somewhere else
                    if exeWithPath.find(path) >= 0 :
                        exe = exeWithPath.replace(path,'')
                        tar.add(path+exe,os.path.basename(executable))
                    else :
                        tar.add(exeWithPath,os.path.basename(executable))
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen=len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ## Add ProdAgent dir to tar
            paDir = 'ProdAgentApi'
            pa = os.environ['CRABDIR'] + '/' + 'ProdAgentApi'
            if os.path.isdir(pa):
                tar.add(pa,paDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # release the file handle whether or not the archive is complete
            tar.close()
    except CrabException:
        # keep the specific message (e.g. executable not found)
        discardPartial(self.tgzNameWithPath)
        raise
    except Exception:
        discardPartial(self.tgzNameWithPath)
        raise CrabException('Could not create tar-ball')

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path=os.environ['CRABDIR'] + '/python/'
            for mlFile in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
                tar.add(path+mlFile,mlFile)
            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except Exception:
        raise CrabException('Could not create ML files tar-ball')

    return
769    
def wsSetupEnvironment(self, nj):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The returned shell text, in order:
      * sets up the CMS environment for the LCG or OSG middleware
        (on OSG a temporary working directory is created first);
      * creates the CMSSW project area for self.version and evaluates
        the scram runtime environment;
      * checks that the wrapper got at least two arguments;
      * if a pset is used, substitutes the per-job arguments into the
        job configuration file, producing pset.cfg;
      * copies the additional input files into the working directory.
    """
    # Status reporting lines emitted after every fatal error message.
    report_status = [
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
    ]

    script = []

    # Prepare JobType-independent part
    ## OLI_Daniele at this level middleware already known
    script.append('if [ $middleware == LCG ]; then \n')
    script.append(self.wsSetupCMSLCGEnvironment_())
    script.extend([
        'elif [ $middleware == OSG ]; then\n',
        ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n',
        ' echo "Created working directory: $WORKING_DIR"\n',
        ' if [ ! -d $WORKING_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10016"\n',
        ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n',
    ])
    script.extend(report_status)
    script.extend([
        ' exit 1\n',
        ' fi\n',
        '\n',
        ' echo "Change to working directory: $WORKING_DIR"\n',
        ' cd $WORKING_DIR\n',
    ])
    script.append(self.wsSetupCMSOSGEnvironment_())
    script.append('fi\n')

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    script.extend([
        '\n\n',
        'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n',
        scram+' project CMSSW '+self.version+'\n',
        'status=$?\n',
        'if [ $status != 0 ] ; then\n',
        ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n',
        ' echo "JOB_EXIT_STATUS = 10034"\n',
        ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n',
    ])
    script.extend(report_status)
    ## OLI_Daniele
    script.extend([
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        # report the release actually requested, not a fixed one
        ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10018"\n',
        ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n',
    ])
    script.extend(report_status)
    script.extend([
        ' fi\n',
        ' fi \n',
        ' exit 1 \n',
        'fi \n',
        'echo "CMSSW_VERSION = '+self.version+'"\n',
        'export SCRAM_ARCH='+self.executable_arch+'\n',
        'cd '+self.version+'\n',
        ### needed grep for bug in scramv1 ###
        scram+' runtime -sh\n',
        'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n',
        'echo $PATH\n',
    ])

    # Handle the arguments:
    script.extend([
        "\n",
        "## number of arguments (first argument always jobnumber)\n",
        "\n",
        "if [ $nargs -lt 2 ]\n",
        "then\n",
        " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n",
        ' echo "JOB_EXIT_STATUS = 50113"\n',
        ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n',
    ])
    script.extend(report_status)
    ## OLI_Daniele
    script.extend([
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n',
        ' echo "JOB_EXIT_STATUS = 50114"\n',
        ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n',
    ])
    script.extend(report_status)
    script.extend([
        ' fi\n',
        ' fi \n',
        " exit 1\n",
        "fi\n",
        "\n",
    ])

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.pset is not None: #CarlosDaniele
        pset = os.path.basename(job.configFilename())
        script.append('\n')
        if self.datasetPath: # standard job
            script.extend([
                'InputFiles=${args[1]}\n',
                'MaxEvents=${args[2]}\n',
                'SkipEvents=${args[3]}\n',
                'echo "Inputfiles:<$InputFiles>"\n',
                'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n',
                'echo "MaxEvents:<$MaxEvents>"\n',
                'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n',
                'echo "SkipEvents:<$SkipEvents>"\n',
                'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n',
            ])
        elif self.sourceSeed: # pythia like job with random seeds
            script.extend([
                'FirstRun=${args[1]}\n',
                'echo "FirstRun: <$FirstRun>"\n',
                'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n',
                'Seed=${args[2]}\n',
                'echo "Seed: <$Seed>"\n',
                'sed "s#\\<INPUT\\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n',
            ])
            if self.sourceSeedVtx:
                script.extend([
                    'VtxSeed=${args[3]}\n',
                    'echo "VtxSeed: <$VtxSeed>"\n',
                    'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n',
                ])
            else:
                script.append('mv tmp_2.cfg pset.cfg\n')
        else: # pythia like job, nothing to substitute
            script.extend([
                '# Copy untouched pset\n',
                'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n',
                'mv tmp_1.cfg pset.cfg\n',
            ])

    # bring the additional input files next to the job and make them
    # executable; only the file name (no directory) exists on the WN
    for inboxFile in self.additional_inbox_files:
        relFile = inboxFile.split("/")[-1]
        script.extend([
            'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n',
            ' cp $RUNTIME_AREA/'+relFile+' .\n',
            ' chmod +x '+relFile+'\n',
            'fi\n',
        ])

    if self.pset is not None: #CarlosDaniele
        script.extend([
            'echo "### END JOB SETUP ENVIRONMENT ###"\n\n',
            '\n',
            'echo "***** cat pset.cfg *********"\n',
            'cat pset.cfg\n',
            'echo "****** end pset.cfg ********"\n',
            '\n',
        ])
    return ''.join(script)
936    
def wsBuildExe(self, nj=0):
    """
    Put in the script the commands to build an executable
    or a library.

    When the code tar-ball self.tgzNameWithPath exists, the returned
    shell text unpacks it from $RUNTIME_AREA (aborting the job if the
    untar fails) and puts ProdAgentApi in PYTHONPATH. Otherwise an
    empty string is returned.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    pieces = [
        'echo "tar xzvf $RUNTIME_AREA/'+tarball+'"\n',
        'tar xzvf $RUNTIME_AREA/'+tarball+'\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n',
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = 50999"\n',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        '\n',
        'echo "Include ProdAgentApi in PYTHONPATH"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=ProdAgentApi\n',
        'else\n',
        ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}\n',
        'fi\n',
        '\n',
    ]
    return "".join(pieces)
984    
def modifySteeringCards(self, nj):
    """
    Hook for modifying the card provided by the user and writing a
    new card into the share dir.

    This job type has nothing to do here: the method is empty and
    returns None for every job number 'nj'.
    """
    return None
990    
def executableName(self):
    """
    Return the command the job wrapper has to run: "sh " when a user
    script (self.scriptExe) is set, self.executable otherwise.
    """
    if not self.scriptExe:
        return self.executable
    # a user script is run through the shell  #CarlosDaniele
    return "sh "
996 slacapra 1.1
def executableArgs(self):
    """
    Return the arguments for the command given by executableName():
    the user script followed by " $NJob" when self.scriptExe is set,
    " -p pset.cfg" otherwise.
    """
    if not self.scriptExe:
        return " -p pset.cfg"
    #CarlosDaniele
    return self.scriptExe + " $NJob"
1002 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    The list holds, in this order: the code tar-ball and the ML
    tar-ball (each only if the file exists), the job configuration
    file (only if a pset is used) and the additional input files.
    """
    sandbox = []
    ## code
    for tarball in (self.tgzNameWithPath, self.MLtgzfile):
        if os.path.isfile(tarball):
            sandbox.append(tarball)
    ## config
    if self.pset is not None:
        cfg = common.work_space.pathForTgz() + 'job/' + self.configFilename()
        sandbox.append(cfg)
    ## additional input files
    sandbox.extend(self.additional_inbox_files)
    return sandbox
1022    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Every user declared output file (self.output_file followed by
    self.output_file_sandbox) is tagged through numberFile_ with the
    1-based job number, i.e. nj + 1.
    """
    ## User Declared output files
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1034    
def prepareSteeringCards(self):
    """
    Hook for the initial modifications of the user's steering card
    file. Nothing is done for this job type; None is returned.
    """
    return None
1040    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every declared output file the script checks that the file was
    produced and copies it to $RUNTIME_AREA under its job-numbered name
    (numberFile_ with '$NJob'). With the condor_g scheduler a dummy
    output file is prepared on OSG when the real one is missing.
    Afterwards the OSG working directory is removed, and the shell
    variable file_list is set to the numbered names of
    self.output_file only (the files that may be copied to a SE).
    """
    script = ['\n', '# directory content\n', 'ls \n']

    for produced in self.output_file + self.output_file_sandbox:
        numbered = self.numberFile_(produced, '$NJob')
        script.extend([
            '\n',
            '# check output file\n',
            'ls '+produced+'\n',
            'ls_result=$?\n',
            'if [ $ls_result -ne 0 ] ; then\n',
            ' echo "ERROR: Problem with output file"\n',
        ])
        if common.scheduler.boss_scheduler_name == 'condor_g':
            script.extend([
                ' if [ $middleware == OSG ]; then \n',
                ' echo "prepare dummy output file"\n',
                ' echo "Processing of job output failed" > $RUNTIME_AREA/'+numbered+'\n',
                ' fi \n',
            ])
        script.extend([
            'else\n',
            ' cp '+produced+' $RUNTIME_AREA/'+numbered+'\n',
            'fi\n',
        ])

    script.extend([
        'cd $RUNTIME_AREA\n',
        'cd $RUNTIME_AREA\n',
        ### OLI_DANIELE
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n',
        ' echo "JOB_EXIT_STATUS = 60999"\n',
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])

    ## Add to filelist only files to be possibly copied to SE
    numbered_names = [self.numberFile_(name, '$NJob') for name in self.output_file]
    script.append('file_list="'+' '.join(numbered_names)+'"\n')

    return ''.join(script)
1095    
def numberFile_(self, file, txt):
    """
    append _'txt' before last extension of a file

    Only the last path component is inspected, so a dot inside a
    directory name is never taken for the extension separator:
      'out.root'        -> 'out_<txt>.root'
      'a.b.root'        -> 'a.b_<txt>.root'
      'out'             -> 'out_<txt>'
      'dir.v1/out.root' -> 'dir.v1/out_<txt>.root'
      'dir.v1/out'      -> 'dir.v1/out_<txt>'

    file -- file name, possibly with a '/'-separated directory part
    txt  -- string to insert (e.g. a job number or '$NJob')
    """
    # keep the directory part untouched, byte for byte
    cut = file.rfind('/') + 1
    head, base = file[:cut], file[cut:]

    dot = base.rfind('.')
    if dot < 0:
        # no extension: just append "_txt"
        return head + base + '_' + txt
    # insert "_txt" right before the last extension
    return head + base[:dot] + '_' + txt + base[dot:]
1113    
def getRequirements(self, nj=[]):
    """
    return job requirements to add to jdl files

    The expression asks for the VO-cms-<version> software tag (only
    when self.version is set) and, always, for outbound network
    connectivity. 'nj' is not used.
    """
    software = ''
    if self.version:
        software = ('Member("VO-cms-' + self.version +
                    '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')

    return software + ' && (other.GlueHostNetworkAdapterOutboundIP)'
1127 gutsche 1.3
def configFilename(self):
    """ return the config filename: the job type name plus '.cfg' """
    base = self.name()
    return base + '.cfg'
1131    
1132     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns part of a job script which prepares
    the execution environment and which is common for all CMS jobs
    (OSG flavour).

    The script sources cmsset_default.sh, with self.version as
    argument, from $GRID3_APP_DIR/cmssoft or, failing that, from
    $OSG_APP/cmssoft/cms. If neither file exists the job reports exit
    code 10020, removes the working directory (reporting 10017 if that
    fails) and only then exits: the script must not exit before the
    cleanup, otherwise the cleanup commands can never run.
    """
    report_status = (
        ' dumpStatus $RUNTIME_AREA/$repo\n'
        ' rm -f $RUNTIME_AREA/$repo \n'
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    )

    pieces = [
        '\n',
        ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n',
        ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n',
        ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n',
        ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n',
        ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n',
        ' else\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
        report_status,
        # no 'exit' here: the working directory is cleaned up first
        '\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' echo "JOB_EXIT_STATUS = 10017"\n',
        ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n',
        report_status,
        ' fi\n',
        '\n',
        ' exit 1\n',
        ' fi\n',
        '\n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo " END SETUP CMS OSG ENVIRONMENT "\n',
    ]

    return ''.join(pieces)
1176    
1177     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Returns part of a job script which prepares
    the execution environment and which is common for all CMS jobs
    (LCG flavour).

    The script requires $VO_CMS_SW_DIR to be set (exit code 10031
    otherwise), requires a non-empty $VO_CMS_SW_DIR/cmsset_default.sh
    (10020) and sources it (10032 if sourcing fails).
    """
    report_status = (
        ' dumpStatus $RUNTIME_AREA/$repo\n'
        ' rm -f $RUNTIME_AREA/$repo \n'
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
        ' exit 1\n'
    )

    pieces = [
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
        report_status,
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
        report_status,
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
        report_status,
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    ]
    return ''.join(pieces)
1224 gutsche 1.5
def setParam_(self, param, value):
    """ store 'value' under the key 'param' in the parameter dictionary """
    self._params.update({param: value})
1227    
def getParams(self):
    """ return the parameter dictionary filled through setParam_ (not a copy) """
    params = self._params
    return params
1230 gutsche 1.8
def setTaskid_(self):
    """ read the task id from the 'taskId' entry of the configuration parameters """
    taskId = self.cfg_params['taskId']
    self._taskId = taskId
1233    
def getTaskid(self):
    """ return the task id stored by setTaskid_ """
    taskId = self._taskId
    return taskId
1236 gutsche 1.35
1237     #######################################################################
def uniquelist(self, old):
    """
    remove duplicates from a list

    Returns a new list holding each element of 'old' once, in the
    order of first appearance. The elements must be hashable.
    """
    seen = {}
    unique = []
    for element in old:
        if element not in seen:
            seen[element] = 0
            unique.append(element)
    return unique