ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.75
Committed: Sun Apr 8 23:50:33 2007 UTC (18 years ago) by gutsche
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_5_0
Changes since 1.74: +6 -2 lines
Log Message:
DBS tag changed to latest tag: DBS_1_0_0_pre6
DBS default URL changed to   : http://cmsdbsprod.cern.ch/cms_dbs_int_global/servlet/DBSServlet
DBS api version changed to   : v00_00_06

new format of datasetPath: /primaryDataset/processedDataset/Tier

Parsing for ML report adapted accordingly

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.41 import DataDiscovery
8 gutsche 1.66 import DataDiscovery_DBS2
9 slacapra 1.41 import DataLocation
10 slacapra 1.1 import Scram
11    
12 slacapra 1.70 import os, string, re, shutil, glob
13 slacapra 1.1
14     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure a CMSSW job type from the CRAB configuration.

    Reads the [CMSSW]/[USER]/[EDG] parameters, discovers and locates the
    input data (when a dataset is given), builds the sandbox tarball,
    splits the task into jobs and writes the modified parameter-set.

    cfg_params: configuration mapping keyed as 'SECTION.key'.
    ncjobs: 'all' or the maximum number of jobs to create; it limits the
            block-based splitting.

    Raises CrabException on missing or inconsistent configuration.
    """
    import sys

    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    # Marco.
    self._params = {}
    self.cfg_params = cfg_params

    # maximum input sandbox size in MB (checked in buildTar_)
    try:
        self.MaxTarBallSize = float(self.cfg_params['EDG.maxtarballsize'])
    except KeyError:
        self.MaxTarBallSize = 100.0

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''         #scrip use case Da
    self.datasetPath = ''  #scrip use case Da

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()
    common.taskDB.setDict('codeVersion',self.version)
    self.setParam_('application', self.version)

    ### collect Data cards

    ## get DBS mode
    try:
        self.use_dbs_2 = int(self.cfg_params['CMSSW.use_dbs_2'])
    except KeyError:
        self.use_dbs_2 = 0

    try:
        tmp = cfg_params['CMSSW.datasetpath']
        log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
        if tmp.lower()=='none':
            self.datasetPath = None
            self.selectNoInput = 1
        else:
            self.datasetPath = tmp
            self.selectNoInput = 0
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)

    # ML monitoring
    # dataset path style: /primaryDataset/processedDataset/Tier
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        if self.use_dbs_2 == 1 :
            ownerIndex = 2
            minParts = 3
        else :
            ownerIndex = -1
            minParts = 2
        # fail with a clear message instead of an IndexError on a malformed path
        if len(datasetpath_split) < minParts:
            raise CrabException("Error: datasetpath "+self.datasetPath+" is not a valid dataset path")
        self.setParam_('dataset', datasetpath_split[1])
        self.setParam_('owner', datasetpath_split[ownerIndex])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    # parameter-set: a file that must exist, or 'none' (script-only use case)
    try:
        self.pset = cfg_params['CMSSW.pset']
        log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
        if self.pset.lower() != 'none' :
            if (not os.path.exists(self.pset)):
                raise CrabException("User defined PSet file "+self.pset+" does not exist")
        else:
            self.pset = None
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    try:
        outFiles = cfg_params['CMSSW.output_file']
    except KeyError:
        outFiles = ''
    if outFiles != '':
        tmpOutFiles = outFiles.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outFile in tmpOutFiles:
            outFile = outFile.strip()
            # ignore empty entries such as the one left by a trailing comma
            if outFile:
                self.output_file.append(outFile)
    if not self.output_file:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
        if self.scriptExe != '':
            if not os.path.isfile(self.scriptExe):
                msg ="ERROR. file "+self.scriptExe+" not found"
                raise CrabException(msg)
            self.additional_inbox_files.append(self.scriptExe.strip())
    except KeyError:
        self.scriptExe = ''

    #CarlosDaniele: with neither dataset nor pset, a user script is mandatory
    if self.datasetPath is None and self.pset is None and self.scriptExe == '' :
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files (comma separated, glob patterns allowed)
    try:
        addFiles = cfg_params['USER.additional_input_files']
    except KeyError:
        addFiles = None
    if addFiles is not None:
        for pattern in addFiles.split(','):
            pattern = pattern.strip()
            # skip empty entries instead of crashing on an empty pattern
            if not pattern:
                continue
            # glob only returns existing paths: no match means a missing file
            files = glob.glob(pattern)
            if not files:
                raise CrabException("Additional input file not found: "+pattern)
            for fileName in files:
                # share/ has no sub-directories: store every file under its base name
                storedFile = os.path.join(common.work_space.shareDir(), os.path.basename(fileName))
                shutil.copyfile(fileName, storedFile)
                self.additional_inbox_files.append(storedFile.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    # files per job
    try:
        if (cfg_params['CMSSW.files_per_jobs']):
            raise CrabException("files_per_jobs no longer supported.  Quitting.")
    except KeyError:
        pass

    ## Events per job
    try:
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    try:
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    except KeyError:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    except KeyError:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None: #CarlosDaniele
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## source seed for pythia
    try:
        self.sourceSeed = int(cfg_params['CMSSW.pythia_seed'])
    except KeyError:
        self.sourceSeed = None
        common.logger.debug(5,"No seed given")

    try:
        self.sourceSeedVtx = int(cfg_params['CMSSW.vtx_seed'])
    except KeyError:
        self.sourceSeedVtx = None
        common.logger.debug(5,"No vertex seed given")
    try:
        self.firstRun = int(cfg_params['CMSSW.first_run'])
    except KeyError:
        self.firstRun = None
        common.logger.debug(5,"No first run given")
    if self.pset is not None: #CarlosDaniele
        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None: #CarlosDaniele
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset: the INPUT* placeholders are presumably substituted per job
    # by the job wrapper (not visible here)
    if self.pset is not None: #CarlosDaniele
        try:
            if (self.datasetPath): # standard job
                # allow to process a fraction of events in a file
                self.PsetEdit.inputModule("INPUT")
                self.PsetEdit.maxEvent("INPUTMAXEVENTS")
                self.PsetEdit.skipEvent("INPUTSKIPEVENTS")
            else: # pythia like job
                self.PsetEdit.maxEvent(self.eventsPerJob)
                if (self.firstRun):
                    self.PsetEdit.pythiaFirstRun("INPUTFIRSTRUN")  #First Run
                if (self.sourceSeed) :
                    self.PsetEdit.pythiaSeed("INPUT")
                    # the vertex seed is only passed to jobs together with the pythia seed
                    if (self.sourceSeedVtx) :
                        self.PsetEdit.pythiaSeedVtx("INPUTVTX")
            # add FrameworkJobReport to parameter-set
            self.PsetEdit.addCrabFJR(self.fjrFileName)
            self.PsetEdit.psetWriter(self.configFilename())
        except Exception:
            # keep the underlying reason instead of hiding it
            msg='Error while manipuliating ParameterSet: exiting...\n'+str(sys.exc_info()[1])
            raise CrabException(msg)
282 gutsche 1.3
283 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
284    
285 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
286    
287     datasetPath=self.datasetPath
288    
289 slacapra 1.1 ## Contact the DBS
290 slacapra 1.41 common.logger.message("Contacting DBS...")
291 slacapra 1.1 try:
292 gutsche 1.66
293     if self.use_dbs_2 == 1 :
294     self.pubdata=DataDiscovery_DBS2.DataDiscovery_DBS2(datasetPath, cfg_params)
295     else :
296     self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
297 slacapra 1.1 self.pubdata.fetchDBSInfo()
298    
299 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
300 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
301     raise CrabException(msg)
302 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
303 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
304     raise CrabException(msg)
305 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
306 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
307 slacapra 1.1 raise CrabException(msg)
308 gutsche 1.67 except DataDiscovery_DBS2.NotExistingDatasetError_DBS2, ex :
309     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
310     raise CrabException(msg)
311     except DataDiscovery_DBS2.NoDataTierinProvenanceError_DBS2, ex :
312     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
313     raise CrabException(msg)
314     except DataDiscovery_DBS2.DataDiscoveryError_DBS2, ex:
315     msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
316     raise CrabException(msg)
317 slacapra 1.1
318     ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
319 gutsche 1.3 common.logger.message("Required data are :"+self.datasetPath)
320    
321 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
322 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
323     self.eventsbyfile=self.pubdata.getEventsPerFile()
324 gutsche 1.3
325 slacapra 1.1 ## get max number of events
326     self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
327 gutsche 1.44 common.logger.message("The number of available events is %s\n"%self.maxEvents)
328 slacapra 1.1
329 slacapra 1.41 common.logger.message("Contacting DLS...")
330 slacapra 1.1 ## Contact the DLS and build a list of sites hosting the fileblocks
331     try:
332 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
333 gutsche 1.6 dataloc.fetchDLSInfo()
334 slacapra 1.41 except DataLocation.DataLocationError , ex:
335 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
336     raise CrabException(msg)
337    
338    
339 gutsche 1.35 sites = dataloc.getSites()
340     allSites = []
341     listSites = sites.values()
342 slacapra 1.63 for listSite in listSites:
343     for oneSite in listSite:
344 gutsche 1.35 allSites.append(oneSite)
345     allSites = self.uniquelist(allSites)
346 gutsche 1.3
347 gutsche 1.35 common.logger.message("Sites ("+str(len(allSites))+") hosting part/all of dataset: "+str(allSites))
348     common.logger.debug(6, "List of Sites: "+str(allSites))
349     return sites
350 gutsche 1.3
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs ('all' or the maximum number of jobs to create)
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs (self.ncjobs is set to the same value)
          self.list_of_args - per job: [escaped file list, max events ("-1" = all
                              remaining), number of events to skip in the first file]
    RAISES: CrabException when the splitting parameters do not determine the
            total number of events, or would give less than one event per job
            (which would otherwise make the file loop below never terminate).
    """

    def fileListString(fileNames):
        # Escaped list handed to the job: \{\"f1\"\,\"f2\"\}
        quoted = []
        for oneName in fileNames:
            quoted.append('\\"' + oneName + '\\"')
        return '\\{' + '\\,'.join(quoted) + '\\}'

    # ---- Handle the possible job splitting configurations ---- #
    totalEventsRequested = None
    eventsPerJobRequested = None
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    if totalEventsRequested is None:
        raise CrabException('Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.')

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive integer.')
        # at least one event per job, otherwise no event would ever be consumed
        eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

    if eventsRemaining > 0 and (eventsPerJobRequested is None or eventsPerJobRequested < 1):
        raise CrabException('Cannot split: the number of events per job must be at least 1.')

    # For user info at end
    totalEventCount = 0

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1

        # blocks without an event count from DBS are skipped
        if block not in self.eventsbyblock:
            continue
        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        jobFiles = []           # files of the job being built
        filesEventCount = 0     # events contained in jobFiles
        newFile = 1             # next iteration must read a new file
        jobSkipEventCount = 0   # events of jobFiles[0] already used by earlier jobs

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            fileName = files[fileCount]
            if newFile :
                if fileName in self.eventsbyfile:
                    numEventsInFile = self.eventsbyfile[fileName]
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    jobFiles.append(fileName)
                    newFile = 0
                else:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

            eventsInJob = filesEventCount - jobSkipEventCount

            # if less events in file remain than eventsPerJobRequested
            if ( eventsInJob < eventsPerJobRequested ) :
                if ( fileCount == numFilesInBlock-1 ) :
                    # last file in block: end the job with the remaining events of the block;
                    # no job at all if it would contain no file (e.g. unknown event count)
                    if jobFiles:
                        list_of_lists.append([fileListString(jobFiles),str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsInJob)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[-1]))
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsInJob
                        eventsRemaining = eventsRemaining - eventsInJob
                    # reset job and touch new file
                    jobSkipEventCount = 0
                    jobFiles = []
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else :
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( eventsInJob == eventsPerJobRequested ) :
                # close job and touch new file
                list_of_lists.append([fileListString(jobFiles),str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[-1]))
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                jobFiles = []
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                list_of_lists.append([fileListString(jobFiles),str(eventsPerJobRequested),str(jobSkipEventCount)])
                common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                self.jobDestination.append(blockSites[block])
                common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[-1]))
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # events of the current (last) file consumed by the jobs closed so far
                currentFileEvents = self.eventsbyfile[fileName]
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - currentFileEvents)
                # the next job starts with the current file only
                filesEventCount = currentFileEvents
                jobFiles = [fileName]

    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message("\n"+str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    self.list_of_args = list_of_lists
    return
528    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job, for tasks
    without an input dataset (e.g. pythia generation).

    Two of events_per_job / number_of_jobs / total_number_of_events are
    given and the third is derived from them.
    SETS: self.total_number_of_jobs
          self.list_of_args - per job [run], [run, seed] or [run, seed, vtxSeed];
                              each value is the configured number (first run,
                              pythia seed, vertex seed) with the job index appended
                              as a string (the job index alone when no first run)
          self.jobDestination - one unconstrained [""] entry per job
          and, depending on the inputs, self.total_number_of_events or self.eventsPerJob
    RAISES: CrabException for a negative total number of events or a
            non-positive events_per_job / number_of_jobs used as a divisor.
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            if self.eventsPerJob <= 0:
                raise CrabException('events_per_job must be a positive integer')
            self.total_number_of_jobs = int(self.total_number_of_events // self.eventsPerJob)
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs =self.theNumberOfJobs
            self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)

    elif (self.selectNumberOfJobs) :
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive integer')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events // self.total_number_of_jobs)

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # arguments are first run / seeds with the job index appended
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        suffix = str(i)
        if (self.firstRun):
            ## pythia first run
            jobArgs = [str(self.firstRun) + suffix]
        else:
            ## no first run
            jobArgs = [suffix]
        if (self.sourceSeed):
            ## pythia random seed, optionally followed by the vertex seed
            jobArgs.append(str(self.sourceSeed) + suffix)
            if (self.sourceSeedVtx):
                jobArgs.append(str(self.sourceSeedVtx) + suffix)
        self.list_of_args.append(jobArgs)

    return
602    
603 spiga 1.42
def jobSplittingForScript(self):#CarlosDaniele
    """
    Split a script-only task (no dataset, no parameter-set) by the
    requested number_of_jobs alone.

    Every job receives its index as single argument and an empty
    destination entry, so it is not bound to any site.
    """
    nJobs = self.theNumberOfJobs
    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(nJobs)+' jobs in total ')

    self.total_number_of_jobs = nJobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))
    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    # one fresh [""] per job: no site constraint (a shared list object is avoided)
    self.jobDestination.extend([[""] for index in range(nJobs)])
    # no random seed: the only argument is the job index
    self.list_of_args = [[str(index)] for index in range(nJobs)]
    return
626    
def split(self, jobParams):
    """
    Register each job's arguments and destination in common.jobDB.

    jobParams is extended by one slot per job and positions
    0..total_number_of_jobs-1 receive the jobs' argument lists, so an
    empty list is presumably expected from the caller.
    """
    common.jobDB.load()
    #### Fabio
    argLists = self.list_of_args
    nJobs = self.total_number_of_jobs

    # create the empty structure, then fill it job by job
    jobParams.extend([""] * nJobs)
    for index in range(nJobs):
        jobParams[index] = argLists[index]
        common.jobDB.setArguments(index, jobParams[index])
        destination = self.jobDestination[index]
        common.logger.debug(5,"Job "+str(index)+" Destination: "+str(destination))
        common.jobDB.setDestination(index, destination)

    common.jobDB.save()
    return
647    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in common.jobDB for job nj as one string,
    each argument followed by a single space. sched is not used.
    """
    pieces = []
    for argument in common.jobDB.arguments(nj):
        pieces.append(str(argument) + " ")
    return "".join(pieces)
653 gutsche 1.3
def numberOfJobs(self):
    """Return the number of jobs produced by the splitting step."""
    # Fabio
    njobs = self.total_number_of_jobs
    return njobs
657    
def getTarBall(self, exe):
    """
    Return the TarBall with lib and exe.

    The tarball path is <pathForTgz()>share/<self.tgz_name> and is stored in
    self.tgzNameWithPath. An existing tarball is reused as-is; otherwise
    buildTar_(exe) is called and the (whitespace-stripped) path returned.
    """
    # Marco. Let's start to use relative path for Boss XML files
    self.tgzNameWithPath = common.work_space.pathForTgz() + 'share/' + self.tgz_name
    if not os.path.exists(self.tgzNameWithPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()
    # if it exist, just return it
    return self.tgzNameWithPath
675    
def buildTar_(self, executable):
    """
    Build the input-sandbox tarball self.tgzNameWithPath and the monitoring
    tarball self.MLtgzfile.

    The sandbox tarball receives the user executable (only when it is not
    part of the release), lib/, module/ and every data/ directory of the
    local Scram area, plus $CRABDIR/ProdAgentApi when present. Nothing is
    built when the working area is the release top (or that is unknown).

    Raises CrabException when the executable is not found, when a tarball
    cannot be written, or when the sandbox exceeds self.MaxTarBallSize MB.
    A partially written sandbox tarball is removed on failure, so that
    getTarBall() does not reuse it later.
    """
    import sys
    import tarfile

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    failure = None
    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if (executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship it; exeWithPath is its
                    # full path whether it lives in the project area or elsewhere
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    tar.add(exeWithPath,os.path.basename(executable))
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen=len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ## Add ProdAgent dir to tar
            paDir = 'ProdAgentApi'
            pa = os.environ['CRABDIR'] + '/' + 'ProdAgentApi'
            if os.path.isdir(pa):
                tar.add(pa,paDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except CrabException:
        # keep specific messages such as "User executable ... not found"
        failure = sys.exc_info()[1]
    except Exception:
        failure = CrabException('Could not create tar-ball: '+str(sys.exc_info()[1]))

    if failure is not None:
        # do not leave a truncated tarball behind: getTarBall() would reuse it
        try:
            if os.path.exists(self.tgzNameWithPath):
                os.remove(self.tgzNameWithPath)
        except OSError:
            pass
        raise failure

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path=os.environ['CRABDIR'] + '/python/'
            for fileName in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py']:
                tar.add(path+fileName,fileName)
            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except Exception:
        raise CrabException('Could not create ML files tar-ball: '+str(sys.exc_info()[1]))

    return
764    
def wsSetupEnvironment(self, nj):
    """
    Return the part of the job wrapper script that prepares the execution
    environment for job 'nj'.

    The generated shell code:
      * sets up the CMS environment for the LCG or OSG middleware (on OSG it
        first creates and enters a private $WORKING_DIR under $OSG_WN_TMP);
      * creates the CMSSW project area for self.version, exports SCRAM_ARCH
        and evaluates the scram runtime environment;
      * aborts when the wrapper received fewer than two arguments ($nargs);
      * when a pset is used, builds pset.cfg from the job's config file by
        substituting per-job values taken from the wrapper arguments (input
        files / max events / skip events for dataset jobs, first run and
        seeds for generator-like jobs);
      * copies each additional input-sandbox file into the working
        directory and makes it executable.

    The failure branches emitted here record their exit code in
    $RUNTIME_AREA/$repo, call dumpStatus, remove the OSG working directory
    where one was already entered, and exit with status 1.
    """
    def exitReport(code):
        # Shell lines that record fatal exit 'code' in the job report file,
        # dump it, and append the monitoring identifiers.
        return (' echo "JOB_EXIT_STATUS = ' + code + '"\n' +
                ' echo "JobExitCode=' + code + '" | tee -a $RUNTIME_AREA/$repo\n' +
                ' dumpStatus $RUNTIME_AREA/$repo\n' +
                ' rm -f $RUNTIME_AREA/$repo \n' +
                ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n' +
                ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n')

    def osgCleanup(code, message):
        # On OSG, leave and delete $WORKING_DIR; report 'code' if it survives.
        return (' if [ $middleware == OSG ]; then \n' +
                ' echo "Remove working directory: $WORKING_DIR"\n' +
                ' cd $RUNTIME_AREA\n' +
                ' /bin/rm -rf $WORKING_DIR\n' +
                ' if [ -d $WORKING_DIR ] ;then\n' +
                ' echo "' + message + '"\n' +
                exitReport(code) +
                ' fi\n' +
                ' fi \n')

    # Prepare JobType-independent part (middleware is already known here)
    txt = ''
    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' echo "Created working directory: $WORKING_DIR"\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += exitReport('10016')
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += exitReport('10034')
    # report the CMSSW version that was actually requested
    txt += osgCleanup('10018', 'SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`')
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'export SCRAM_ARCH='+self.executable_arch+'\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += scram+' runtime -sh\n'
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    txt += 'echo $PATH\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt 2 ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
    txt += exitReport('50113')
    txt += osgCleanup('50114', 'SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper')
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.pset is not None: #CarlosDaniele
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        if self.datasetPath: # standard job
            txt += 'InputFiles=${args[1]}\n'
            txt += 'MaxEvents=${args[2]}\n'
            txt += 'SkipEvents=${args[3]}\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset_tmp_1.cfg\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'sed "s#INPUTMAXEVENTS#$MaxEvents#" pset_tmp_1.cfg > pset_tmp_2.cfg\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
            txt += 'sed "s#INPUTSKIPEVENTS#$SkipEvents#" pset_tmp_2.cfg > pset.cfg\n'
        else: # pythia like job
            if self.sourceSeed:
                txt += 'FirstRun=${args[1]}\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'
                # '\\<' / '\\>' are sed word boundaries
                txt += 'sed "s#\\<INPUTFIRSTRUN\\>#$FirstRun#" $RUNTIME_AREA/'+pset+' > tmp_1.cfg\n'
            else:
                txt += '# Copy untouched pset\n'
                txt += 'cp $RUNTIME_AREA/'+pset+' tmp_1.cfg\n'
            if self.sourceSeed:
                txt += 'Seed=${args[2]}\n'
                txt += 'echo "Seed: <$Seed>"\n'
                txt += 'sed "s#\\<INPUT\\>#$Seed#" tmp_1.cfg > tmp_2.cfg\n'
                if self.sourceSeedVtx:
                    txt += 'VtxSeed=${args[3]}\n'
                    txt += 'echo "VtxSeed: <$VtxSeed>"\n'
                    txt += 'sed "s#INPUTVTX#$VtxSeed#" tmp_2.cfg > pset.cfg\n'
                else:
                    txt += 'mv tmp_2.cfg pset.cfg\n'
            else:
                txt += 'mv tmp_1.cfg pset.cfg\n'

    for inboxFile in self.additional_inbox_files:
        relFile = inboxFile.split("/")[-1]
        txt += 'if [ -e $RUNTIME_AREA/'+relFile+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+relFile+' .\n'
        txt += ' chmod +x '+relFile+'\n'
        txt += 'fi\n'

    if self.pset is not None: #CarlosDaniele
        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
    return txt
931    
def wsBuildExe(self, nj=0):
    """
    Return the shell fragment that unpacks the user tarball on the worker node.

    The fragment is emitted only when self.tgzNameWithPath exists locally;
    otherwise an empty string is returned. On untar failure the wrapper
    records the exit status, removes the OSG working directory (reporting
    50999 if that fails) and exits; on success ProdAgentApi is prepended to
    PYTHONPATH. 'nj' is accepted for interface compatibility and unused.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    lines = [
        'echo "tar xzvf $RUNTIME_AREA/' + tarball + '"',
        'tar xzvf $RUNTIME_AREA/' + tarball,
        'untar_status=$? ',
        'if [ $untar_status -ne 0 ]; then ',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"',
        ' echo "JOB_EXIT_STATUS = $untar_status" ',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo',
        ' if [ $middleware == OSG ]; then ',
        ' echo "Remove working directory: $WORKING_DIR"',
        ' cd $RUNTIME_AREA',
        ' /bin/rm -rf $WORKING_DIR',
        ' if [ -d $WORKING_DIR ] ;then',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"',
        ' echo "JOB_EXIT_STATUS = 50999"',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo',
        ' dumpStatus $RUNTIME_AREA/$repo',
        ' rm -f $RUNTIME_AREA/$repo ',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo ',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo',
        ' fi',
        ' fi ',
        ' ',
        ' exit 1 ',
        'else ',
        ' echo "Successful untar" ',
        'fi ',
        '',
        'echo "Include ProdAgentApi in PYTHONPATH"',
        'if [ -z "$PYTHONPATH" ]; then',
        ' export PYTHONPATH=ProdAgentApi',
        'else',
        ' export PYTHONPATH=ProdAgentApi:${PYTHONPATH}',
        'fi',
        '',
    ]
    # every entry is one shell line; the trailing '' yields the final blank line
    return '\n'.join(lines) + '\n'
979    
def modifySteeringCards(self, nj):
    """
    Hook for writing a modified copy of the user's card for job 'nj' into
    the share dir.

    Currently a no-op for CMSSW; per-job values are instead substituted
    into pset.cfg by the shell code generated in wsSetupEnvironment.
    """
    return None
985    
def executableName(self):
    """
    Return the program the job wrapper invokes.

    A user script (self.scriptExe) is run through "sh "; otherwise the
    configured executable is returned unchanged.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "
991 slacapra 1.1
def executableArgs(self):
    """
    Return the argument string passed to the executable.

    For a user script this is the script name followed by the shell
    variable $NJob; otherwise cmsRun-style " -p pset.cfg".
    """
    if not self.scriptExe:
        return " -p pset.cfg"
    return self.scriptExe + " $NJob"
997 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of local files to put in the JDL input sandbox.

    Order: the user tarball and the monitoring-files tarball (each only
    when it exists on disk), then the job configuration file from the
    workspace 'job/' directory when a pset is used, then every additional
    input file. 'nj' is accepted for interface compatibility and unused.
    """
    inp_box = []
    ## code
    # buildTar_ only assigns self.MLtgzfile after its early-return check, so
    # the attribute may be missing here (NOTE(review): __init__ is not
    # visible from this chunk; getattr keeps us safe either way).
    for tarball in (self.tgzNameWithPath, getattr(self, 'MLtgzfile', None)):
        if tarball and os.path.isfile(tarball):
            inp_box.append(tarball)
    ## config
    if self.pset is not None:
        inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
    ## additional input files
    inp_box.extend(self.additional_inbox_files)
    return inp_box
1017    
def outputSandbox(self, nj):
    """
    Return the list of filenames to put in the JDL output sandbox.

    Every declared output (self.output_file followed by
    self.output_file_sandbox) is listed under the per-job name produced by
    numberFile_ with the number str(nj + 1).
    """
    declared = self.output_file + self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1029    
def prepareSteeringCards(self):
    """
    Hook for initial modifications of the user's steering card file.

    Nothing to prepare for CMSSW; always returns None.
    """
    return None
1035    
def wsRenameOutput(self, nj):
    """
    Return the shell fragment that collects the job outputs.

    For each declared output (self.output_file followed by
    self.output_file_sandbox) the fragment checks that the file exists and
    copies it into $RUNTIME_AREA under its per-job name (numberFile_ with
    '$NJob'); with the condor_g scheduler on OSG a placeholder file is
    written when the output is missing. It then returns to $RUNTIME_AREA,
    removes the OSG working directory (reporting 60999 if that fails), and
    finally sets the shell variable file_list to the space-separated
    per-job names of the self.output_file entries only. 'nj' is unused.
    """
    parts = ['\n', '# directory content\n', 'ls \n']

    for produced in self.output_file + self.output_file_sandbox:
        renamed = self.numberFile_(produced, '$NJob')
        parts.append('\n')
        parts.append('# check output file\n')
        parts.append('ls ' + produced + '\n')
        parts.append('ls_result=$?\n')
        parts.append('if [ $ls_result -ne 0 ] ; then\n')
        parts.append(' echo "ERROR: Problem with output file"\n')
        if common.scheduler.boss_scheduler_name == 'condor_g':
            parts.append(' if [ $middleware == OSG ]; then \n')
            parts.append(' echo "prepare dummy output file"\n')
            parts.append(' echo "Processing of job output failed" > $RUNTIME_AREA/' + renamed + '\n')
            parts.append(' fi \n')
        parts.append('else\n')
        parts.append(' cp ' + produced + ' $RUNTIME_AREA/' + renamed + '\n')
        parts.append('fi\n')

    parts.extend([
        'cd $RUNTIME_AREA\n',
        'cd $RUNTIME_AREA\n',
        # OSG: clean up the private working directory
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n',
        ' echo "JOB_EXIT_STATUS = 60999"\n',
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])

    ## only files that may later be copied to the SE go into file_list
    stageOut = [self.numberFile_(name, '$NJob') for name in self.output_file]
    parts.append('file_list="' + ' '.join(stageOut) + '"\n')

    return ''.join(parts)
1090    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of 'file'.

    'out.root' -> 'out_<txt>.root', 'a.b.c' -> 'a.b_<txt>.c'; a name with
    no '.' at all simply gets '_<txt>' appended.
    """
    lastDot = file.rfind(".")
    if lastDot < 0:
        return file + '_' + txt
    # keep everything from the last dot (the extension, dot included)
    return file[:lastDot] + '_' + txt + file[lastDot:]
1108    
def getRequirements(self, nj=None):
    """
    Return the Requirements expression to add to the JDL files.

    When self.version is set the CE must publish the 'VO-cms-<version>'
    software tag; in every case other.GlueHostNetworkAdapterOutboundIP is
    required. Clauses are joined with ' && ', so no dangling operator is
    produced when no version is known. 'nj' is accepted for interface
    compatibility and ignored.
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    return ' && '.join(clauses)
1122 gutsche 1.3
def configFilename(self):
    """Return the job-type config filename: the value of self.name() plus '.cfg'."""
    base = self.name()
    return base + '.cfg'
1126    
1127     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the shell fragment that sets up the CMS software environment on
    an OSG worker node.

    It sources cmsset_default.sh (passing self.version) from
    $GRID3_APP_DIR/cmssoft or, failing that, from $OSG_APP/cmssoft/cms.
    If neither exists it reports exit code 10020, removes the OSG working
    directory (reporting 10017 if that fails) and only then exits with
    status 1.
    """
    def exitReport(code):
        # Shell lines that record fatal exit 'code' in the job report file,
        # dump it, and append the monitoring identifiers.
        return (' echo "JOB_EXIT_STATUS = ' + code + '"\n' +
                ' echo "JobExitCode=' + code + '" | tee -a $RUNTIME_AREA/$repo\n' +
                ' dumpStatus $RUNTIME_AREA/$repo\n' +
                ' rm -f $RUNTIME_AREA/$repo \n' +
                ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n' +
                ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n')

    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += exitReport('10020')
    # No 'exit 1' here: the working directory must be removed before the
    # job exits, otherwise the cleanup below would be unreachable.
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
    txt += exitReport('10017')
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
1171    
1172     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the shell fragment that sets up the CMS software environment on
    an LCG worker node.

    The fragment requires $VO_CMS_SW_DIR to be set and to contain a
    non-empty cmsset_default.sh, which it then sources. Each failure
    (codes 10031, 10020, 10032) is recorded in $RUNTIME_AREA/$repo, dumped
    with dumpStatus, and ends the job with exit status 1.
    """
    def reportAndExit(code):
        # Lines following the JOB_EXIT_STATUS echo of a fatal error 'code'.
        return ''.join([
            ' echo "JobExitCode=' + code + '" | tee -a $RUNTIME_AREA/$repo\n',
            ' dumpStatus $RUNTIME_AREA/$repo\n',
            ' rm -f $RUNTIME_AREA/$repo \n',
            ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
            ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
            ' exit 1\n',
        ])

    fragments = [
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        reportAndExit('10031'),
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        reportAndExit('10020'),
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        reportAndExit('10032'),
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    ]
    return ''.join(fragments)
1219 gutsche 1.5
def setParam_(self, param, value):
    """Store 'value' under key 'param' in the parameter dict self._params."""
    params = self._params
    params[param] = value
1222    
def getParams(self):
    """Return the parameter dict itself (not a copy) filled by setParam_."""
    params = self._params
    return params
1225 gutsche 1.8
def setTaskid_(self):
    """Copy cfg_params['taskId'] into self._taskId (KeyError if it is absent)."""
    taskId = self.cfg_params['taskId']
    self._taskId = taskId
1228    
def getTaskid(self):
    """Return the task identifier stored by setTaskid_."""
    taskId = self._taskId
    return taskId
1231 gutsche 1.35
1232     #######################################################################
def uniquelist(self, old):
    """
    Return a new list with the duplicates of 'old' removed.

    Elements must be hashable. The first occurrence of each element is kept
    and the original order is preserved; the result is always a real list
    (not a dict-keys view), so callers can index or append to it.
    """
    seen = {}
    unique = []
    for item in old:
        if item not in seen:
            seen[item] = True
            unique.append(item)
    return unique