ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.176
Committed: Fri Apr 11 14:54:23 2008 UTC (17 years ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre8, CRAB_2_2_0_pre7
Changes since 1.175: +10 -19 lines
Log Message:
Many changes to have LSF working with BossLite
Introduce Killer class to handle -kill which works again
Work_space::res() return the correct output directory also in case user has set a non default one, Likewise for logDir()
USER.outputdir is not to be used anywhere outside workspace class
Some cleanup in submit logic, to reduce call of Scheduler specific classes from Submitter.py
crab -clean works as well (well, almost, still need to remove twice the directory)
Fill startDirectory and outputDirectory to Task
GetOutput checks status and not schedulerStatus (not standard)
Some cleanup in the use of BlackWhiteListParser
No explicit check of scheduler concrete type in Submitter at listMatch level: move different behaviour in SchedulerXYZ implementation
Plus other things I'm forgetting...

Stefano

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure a CMSSW job type from the user's CRAB configuration.

    ARGUMENTS:
      cfg_params - mapping of 'SECTION.key' configuration names to values
      ncjobs     - number of jobs requested to be created ('all' or a
                   number); used to limit the job splitting
    RAISES: CrabException when a mandatory parameter (datasetpath, pset,
            splitting parameters) is missing, when a referenced file does
            not exist, or when the parameter set cannot be rewritten.

    Besides storing the configuration this performs data discovery (when
    a dataset is given), prepares the tarball, runs the job splitting and
    writes the modified parameter set.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.additional_tgz_name = 'additional.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''         # script use case
    self.datasetPath = ''  # script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()

    # NOTE: the check that used to block creation on an arch/version
    # mismatch is disabled.

    self.setParam_('application', self.version)

    ### collect Data cards

    if 'CMSSW.datasetpath' not in cfg_params:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        self.setParam_('datasetFull', self.datasetPath)
        datasetpath_split = self.datasetPath.split("/")
        if len(datasetpath_split) > 2:
            # standard style
            self.setParam_('dataset', datasetpath_split[1])
            self.setParam_('owner', datasetpath_split[2])
        else:
            # path without the expected components: report it unsplit
            self.setParam_('dataset', self.datasetPath)
            self.setParam_('owner', self.datasetPath)

    self.setParam_('taskId', common._db.queryTask('name')) ## new BL--DS

    self.dataTiers = []

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    self.setParam_('exe', self.executable)
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if 'CMSSW.pset' not in cfg_params:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none' :
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp :
        tmpOutFiles = tmp.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outFile in tmpOutFiles:
            self.output_file.append(outFile.strip())
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe :
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    # With neither a dataset nor a pset the job can only run a user script.
    # scriptExe is None (not '') when the option is absent, so test its
    # truth value rather than comparing with the empty string.
    if self.datasetPath is None and self.pset is None and not self.scriptExe:
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    if 'USER.additional_input_files' in cfg_params:
        tmpAddFiles = cfg_params['USER.additional_input_files'].split(',')
        for tmp in tmpAddFiles:
            tmp = tmp.strip()
            if not tmp:
                # empty entry (e.g. trailing comma): nothing to add
                continue
            dirname = ''
            if not tmp[0]=="/": dirname = "."
            if tmp.find("*") > -1:
                files = glob.glob(os.path.join(dirname, tmp))
                if len(files)==0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                files = [tmp]
            for fname in files:
                if not os.path.exists(fname):
                    raise CrabException("Additional input file not found: "+fname)
                self.additional_inbox_files.append(fname.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if 'CMSSW.events_per_job' in cfg_params:
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in cfg_params:
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    if 'CMSSW.total_number_of_events' in cfg_params:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## New method of dealing with seeds
    self.incrementSeeds = []
    self.preserveSeeds = []
    if 'CMSSW.preserve_seeds' in cfg_params:
        for seed in cfg_params['CMSSW.preserve_seeds'].split(','):
            # str.strip() returns a new string: keep the stripped value
            self.preserveSeeds.append(seed.strip())
    if 'CMSSW.increment_seeds' in cfg_params:
        for seed in cfg_params['CMSSW.increment_seeds'].split(','):
            self.incrementSeeds.append(seed.strip())

    ## Old method of dealing with seeds
    ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
    ## remove
    self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
    self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
    self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
    self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
    deprecatedSeeds = (
        ('pythia_seed', self.sourceSeed,    'sourceSeed'),
        ('vtx_seed',    self.sourceSeedVtx, 'VtxSmeared'),
        ('g4_seed',     self.sourceSeedG4,  'g4SimHits'),
        ('mix_seed',    self.sourceSeedMix, 'mix'),
    )
    for optionName, optionValue, seedName in deprecatedSeeds:
        if optionValue:
            # reported through the logger, like the other user messages here
            log.message(optionName+" is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n"
                        +"Added to increment_seeds.")
            self.incrementSeeds.append(seedName)

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    if self.pset is not None:
        import PsetManipulator as pp
        PsetEdit = pp.PsetManipulator(self.pset)

    # Copy/return

    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            # Add FrameworkJobReport to parameter-set, set max events.
            # Reset later for data jobs by writeCFG which does all modifications
            PsetEdit.addCrabFJR(self.fjrFileName)
            PsetEdit.maxEvent(self.eventsPerJob)
            PsetEdit.psetWriter(self.configFilename())
        except Exception:
            # "Exception" rather than a bare except, so that interpreter
            # exits and keyboard interrupts are not reported as a
            # parameter-set problem.
            msg='Error while manipuliating ParameterSet: exiting...'
            raise CrabException(msg)
288 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Contact the data discovery service (DBS) and the data location
    service (DLS) for self.datasetPath.

    ARGUMENT: cfg_params - configuration, handed to both services
    SETS: self.pubdata, self.filesbyblock, self.eventsbyblock,
          self.eventsbyfile, self.maxEvents
    RETURNS: the result of dataloc.getSites() (its values are iterated
             here as lists of sites)
    RAISES: CrabException if either service reports an error
    """
    import DataDiscovery
    import DataLocation
    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath = self.datasetPath

    ## Contact the DBS
    common.logger.message("Contacting Data Discovery Services ...")
    try:
        self.pubdata = DataDiscovery.DataDiscovery(datasetPath, cfg_params)
        self.pubdata.fetchDBSInfo()
    except (DataDiscovery.NotExistingDatasetError,
            DataDiscovery.NoDataTierinProvenanceError,
            DataDiscovery.DataDiscoveryError) as ex:
        # all three failures are reported with the same message
        raise CrabException('ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage())

    publishedData = self.pubdata
    self.filesbyblock = publishedData.getFiles()
    self.eventsbyblock = publishedData.getEventsPerBlock()
    self.eventsbyfile = publishedData.getEventsPerFile()

    ## get max number of events
    self.maxEvents = publishedData.getMaxEvents() ## self.maxEvents used in Creator.py

    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc = DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError as ex:
        raise CrabException('ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage())

    sites = dataloc.getSites()

    # flatten the per-block site lists and drop duplicates
    allSites = []
    for sitesOfBlock in sites.values():
        allSites.extend(sitesOfBlock)
    allSites = self.uniquelist(allSites)

    # screen output
    nBlocks = len(self.filesbyblock.keys())
    common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(nBlocks) + " blocks.\n")

    return sites
341 ewv 1.131
342 ewv 1.170 # to Be Removed DS -- BL
343 spiga 1.165 # def setArgsList(self, argsList):
344     # self.argsList = argsList
345 mcinquil 1.140
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs, self.blackWhiteListParser, self.cfg_params
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs, self.ncjobs - Total # of jobs
          self.list_of_args - per job a list of three strings: the file list,
                              the number of events ('-1' for the job closing
                              a block) and the number of events to skip
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report the effective request: total_number_of_events is 0 when the
        # total was derived from number_of_jobs * events_per_job
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    # indexable snapshot of the block names
    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if block in self.eventsbyblock:
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = self.filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0

            # ---- New block => New job ---- #
            parString = ""
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                fileName = files[fileCount]
                if newFile :
                    try:
                        numEventsInFile = self.eventsbyfile[fileName]
                        common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job; each entry is \"name\"\,
                        parString += '\\"' + fileName + '\\"\\,'
                        newFile = 0
                    except KeyError:
                        common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")

                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
                    # if last file in block
                    if ( fileCount == numFilesInBlock-1 ) :
                        # end job using last file, use remaining events in block
                        # close job and touch new file
                        fullString = parString[:-2]  # drop the trailing \,
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        # fill jobs of block dictionary
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                        jobSkipEventCount = 0
                        # reset file
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else :
                        # go to next file
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                    # close job and touch new file
                    fullString = parString[:-2]  # drop the trailing \,
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    fullString = parString[:-2]  # drop the trailing \,
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[fileName]
                    parString = '\\"' + fileName + '\\"\\,'
                # END if
            # END while (iterate over files in the block)
    # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []
    bloskNoSite = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            # filter the hosting sites once and reuse the result below
            allowedSites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            jobRange = spanRanges(jobsOfBlock[block])
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,jobRange,','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append( jobRange )
                bloskNoSite.append( blockCounter )

    common.logger.message(screenOutput)
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        # join() puts separators only between items (no trailing comma)
        msg += ' ' + ', '.join([str(blockNumber) for blockNumber in bloskNoSite])
        msg += '\n Related jobs:\n '
        msg += ','.join([str(range_jobs) for range_jobs in noSiteBlock])
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        for listKey, listLabel in (('EDG.se_white_list', 'SE'), ('EDG.ce_white_list', 'CE')):
            if listKey in self.cfg_params:
                msg += 'WARNING: '+listLabel+' White List: '+self.cfg_params[listKey]+'\n'
                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
571    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job
    REQUIRES: self.selectEventsPerJob, self.selectNumberOfJobs,
              self.selectTotalNumberEvents, self.eventsPerJob,
              self.theNumberOfJobs, self.total_number_of_events,
              self.firstRun
    SETS: self.total_number_of_jobs, and whichever of
          self.total_number_of_events / self.eventsPerJob was not given
          self.list_of_args - one argument list per job
          self.jobDestination - one [""] entry appended per job
    RAISES: CrabException if the total number of events is negative or
            the value used as divisor is not positive
    """
    common.logger.debug(5,'Splitting per events')

    if (self.selectEventsPerJob):
        common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if (self.selectNumberOfJobs):
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if (self.selectTotalNumberEvents):
        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            # guard the division: report a configuration error instead of
            # failing with ZeroDivisionError or producing a negative count
            if self.eventsPerJob < 1:
                raise CrabException('Cannot split jobs per Events: events_per_job must be greater than zero')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

    elif (self.selectNumberOfJobs) :
        if self.theNumberOfJobs < 1:
            raise CrabException('Cannot split jobs per Events: number_of_jobs must be greater than zero')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    eventsCovered = int(self.total_number_of_jobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - eventsCovered

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(eventsCovered))

    # the only per-job argument is the first run (first_run followed by the job index), if requested
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        args=[]
        if (self.firstRun):
            ## pythia first run
            args.append(str(self.firstRun)+str(i))
        self.list_of_args.append(args)

    return
623    
624 spiga 1.42
def jobSplittingForScript(self):#CarlosDaniele
    """
    Perform job splitting based on number of job: every requested job
    gets its own index as single argument and an empty destination.
    """
    requestedJobs = self.theNumberOfJobs

    common.logger.debug(5,'Splitting per job')
    common.logger.message('Required '+str(requestedJobs)+' jobs in total ')

    self.total_number_of_jobs = requestedJobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))
    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')

    jobIndices = range(self.total_number_of_jobs)
    ## Since there is no input, any site is good: one fresh [""] per job
    self.jobDestination.extend([[""] for jobIndex in jobIndices])
    ## no random seed: the argument is just the job index
    self.list_of_args = [[str(jobIndex)] for jobIndex in jobIndices]
    return
647    
def split(self, jobParams):
    """
    Fill jobParams with the argument list of every job and store the
    job arguments and destinations through common._db.

    ARGUMENT: jobParams - list, extended in place; on return entry i
              holds the argument list of job i
    REQUIRES: self.total_number_of_jobs, self.list_of_args,
              self.jobDestination
    SETS: self.argsList - number of arguments of a job (its own
          arguments plus the job number); left untouched when no job
          was created
    """
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    for i in range(njobs):
        jobParams.append("")

    listID=[]
    listField=[]
    for job in range(njobs):
        jobParams[job] = arglist[job]
        # job identifiers are 1-based
        listID.append(job+1)
        argu = ' '.join(jobParams[job])
        job_ToSave = {}
        job_ToSave['arguments'] = str(job+1)+' '+argu ## new BL--DS
        job_ToSave['dlsDestination'] = self.jobDestination[job] ## new BL--DS
        listField.append(job_ToSave)
        msg="Job "+str(job)+" Arguments: "+str(job+1)+" "+argu+"\n" \
            +" Destination: "+str(self.jobDestination[job])
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField) ## new BL--DS
    ## Pay Attention Here....DS--BL
    # Use the first job: indexing job 1 fails when a single job is created.
    if njobs > 0:
        self.argsList = (len(jobParams[0])+1)

    return
680 ewv 1.131
def numberOfJobs(self):
    """Return the number of jobs stored in self.total_number_of_jobs."""
    njobs = self.total_number_of_jobs
    return njobs
684    
def getTarBall(self, exe):
    """
    Return the path of the tarball holding the user libraries and
    executable, creating it through buildTar_ when it is not on disk yet.

    The path (work space tgz dir + 'share/' + self.tgz_name) is also
    stored in self.tgzNameWithPath.
    """
    # Marco. Let's start to use relative path for Boss XML files
    tgz = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tgz

    if not os.path.exists(tgz):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # it already exists: just return it
    return tgz
702    
def buildTar_(self, executable):
    """
    Create the tarballs that are shipped with the jobs.

    Two gzipped archives are written:
      * self.tgzNameWithPath: the user executable (only when it does not
        live in the release area), the 'lib' and 'module' directories of
        the user scram area, every 'data' directory found below that
        area and the ProdCommon directory found under $CRABDIR;
      * self.MLtgzfile: a fixed list of helper scripts taken from
        $CRABDIR/python.

    Nothing is created when the release top is empty or equal to the
    user area.

    executable -- name of the user executable, resolved via self.scram.

    Raises CrabException when the executable cannot be found, when an
    archive cannot be written or when the first archive is bigger than
    self.MaxTarBallSize MB.
    """
    import sys
    import tarfile

    def discard(path):
        # Best-effort removal of a partially written archive: getTarBall()
        # returns any existing file as-is, so it must not stay on disk.
        try:
            if os.path.exists(path):
                os.remove(path)
        except OSError:
            pass

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
        return

    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if (self.executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    # distinguish case when script is in user project area or given by full path somewhere else
                    if exeWithPath.find(path) >= 0 :
                        exe = exeWithPath.replace(path,'')
                        tar.add(path+exe,exe)
                    else :
                        tar.add(exeWithPath,os.path.basename(executable))
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen=len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ## Add ProdCommon dir to tar
            prodcommonDir = 'ProdCommon'
            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
            if os.path.isdir(prodcommonPath):
                tar.add(prodcommonPath,prodcommonDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # release the file handle also when something went wrong
            tar.close()
    except CrabException:
        # keep the specific message (e.g. executable not found)
        discard(self.tgzNameWithPath)
        raise
    except:
        reason = str(sys.exc_info()[1])
        discard(self.tgzNameWithPath)
        raise CrabException('Could not create tar-ball: '+reason)

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path=os.environ['CRABDIR'] + '/python/'
            for fname in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']:
                tar.add(path+fname,fname)
            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except:
        reason = str(sys.exc_info()[1])
        discard(self.MLtgzfile)
        raise CrabException('Could not create ML files tar-ball: '+reason)

    return
793 ewv 1.131
def additionalInputFileTgz(self):
    """
    Put all additional files into a tar ball and return its name.

    Every entry of self.additional_inbox_files is stored under its base
    name (the part after the last '/'). The archive is written to the
    work space tgz dir + 'share/' + self.additional_tgz_name.
    """
    import tarfile
    tarName = common.work_space.pathForTgz()+'share/'+self.additional_tgz_name
    tar = tarfile.open(tarName, "w:gz")
    try:
        for fname in self.additional_inbox_files:
            tar.add(fname, fname.split('/')[-1])
        common.logger.debug(5,"Files added to "+self.additional_tgz_name+" : "+str(tar.getnames()))
    finally:
        # close the archive even when a file could not be added
        tar.close()
    return tarName
806    
def wsSetupEnvironment(self, nj=0):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    nj -- index into common.job_list; only used to obtain the name of the
          job configuration file when self.pset is set.

    The returned shell text, in order: sets up the CMS environment for
    the LCG or OSG middleware, creates the CMSSW project area with scram,
    checks the number of arguments against self.argsList, exports the
    dataset related variables, installs the job pset as pset.cfg, unpacks
    the additional input tarball and computes PSETHASH.
    """
    # Prepare JobType-independent part
    txt = '\n#Written by cms_cmssw::wsSetupEnvironment\n'
    txt += 'echo ">>> setup environment"\n'
    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    # on OSG the job runs in a temporary directory created under $OSG_WN_TMP
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' if [ ! $? == 0 ] ;then\n'
    txt += ' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' job_exit_code=10016\n'
    txt += ' func_exit\n'
    txt += ' fi\n'
    txt += ' echo ">>> Created working directory: $WORKING_DIR"\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += ' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    #txt += ' echo "### Set SCRAM ARCH to ' + self.executable_arch + ' ###"\n'
    #txt += ' export SCRAM_ARCH='+self.executable_arch+'\n'
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo ">>> specific cmssw setup environment:"\n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' job_exit_code=10034\n'
    txt += ' func_exit\n'
    txt += 'fi \n'
    txt += 'cd '+self.version+'\n'
    ########## FEDE FOR DBS2 ######################
    # SOFTWARE_DIR is the project area just created by scram
    txt += 'SOFTWARE_DIR=`pwd`\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    ###############################################
    ### needed grep for bug in scramv1 ###
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    # self.argsList is the expected argument count computed by split()
    txt += "if [ $nargs -lt "+str(self.argsList)+" ]\n"
    txt += "then\n"
    txt += " echo 'ERROR ==> Too few arguments' +$nargs+ \n"
    txt += ' job_exit_code=50113\n'
    txt += " func_exit\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    ### FEDE FOR DBS OUTPUT PUBLICATION
    if (self.datasetPath):
        txt += '\n'
        txt += 'DatasetPath='+self.datasetPath+'\n'

        # NOTE(review): element 0 of the split is whatever precedes the first
        # '/', so this assumes datasetPath starts with '/' -- confirm
        datasetpath_split = self.datasetPath.split("/")

        txt += 'PrimaryDataset='+datasetpath_split[1]+'\n'
        txt += 'DataTier='+datasetpath_split[2]+'\n'
        txt += 'ApplicationFamily=cmsRun\n'

    else:
        # no input dataset: fixed placeholder values are exported
        txt += 'DatasetPath=MCDataTier\n'
        txt += 'PrimaryDataset=null\n'
        txt += 'DataTier=null\n'
        txt += 'ApplicationFamily=MCDataTier\n'
    if self.pset != None:
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        txt += 'cp $RUNTIME_AREA/'+pset+' .\n'
        if (self.datasetPath): # standard job
            txt += 'InputFiles=${args[1]}; export InputFiles\n'
            txt += 'MaxEvents=${args[2]}; export MaxEvents\n'
            txt += 'SkipEvents=${args[3]}; export SkipEvents\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
        else: # pythia like job
            txt += 'PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n'
            txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
            txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
            txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
            if (self.firstRun):
                txt += 'FirstRun=${args[1]}; export FirstRun\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'

        # from here on the job configuration is always called pset.cfg
        txt += 'mv -f '+pset+' pset.cfg\n'

    if len(self.additional_inbox_files) > 0:
        txt += 'if [ -e $RUNTIME_AREA/'+self.additional_tgz_name+' ] ; then\n'
        txt += ' tar xzvf $RUNTIME_AREA/'+self.additional_tgz_name+'\n'
        txt += 'fi\n'
        pass

    if self.pset != None:
        # dump the configuration in the job log and hash it
        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
        txt += 'PSETHASH=`EdmConfigHash < pset.cfg` \n'
        txt += 'echo "PSETHASH = $PSETHASH" \n'
        txt += '\n'
    return txt
921 slacapra 1.176
def wsUntarSoftware(self, nj=0):
    """
    Return the part of the job script that unpacks the software tarball.

    When self.tgzNameWithPath exists on the submitting machine, the
    script untars the file with the same base name from $RUNTIME_AREA
    (calling func_exit with the tar status on failure) and prepends
    $RUNTIME_AREA/ProdCommon to PYTHONPATH. Otherwise only the header
    comment is returned.

    nj -- unused.
    """

    txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'

    if os.path.isfile(self.tgzNameWithPath):
        tgz = os.path.basename(self.tgzNameWithPath)
        txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+tgz+' :" \n'
        txt += 'tar xzvf $RUNTIME_AREA/'+tgz+'\n'
        txt += 'untar_status=$? \n'
        txt += 'if [ $untar_status -ne 0 ]; then \n'
        txt += ' echo "ERROR ==> Untarring .tgz file failed"\n'
        txt += ' job_exit_code=$untar_status\n'
        txt += ' func_exit\n'
        txt += 'else \n'
        txt += ' echo "Successful untar" \n'
        txt += 'fi \n'
        txt += '\n'
        txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
        txt += 'if [ -z "$PYTHONPATH" ]; then\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
        txt += 'else\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
        txt += 'fi\n'
        # report the value after the if/else: it used to be echoed only
        # from the else branch, i.e. never when PYTHONPATH started empty
        txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
        txt += '\n'

    return txt
954 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Return the part of the job script that moves the unpacked user
    software into the current directory.

    The script removes the local lib/ and module/ directories, moves
    lib/, module/ and ProdCommon/ from $RUNTIME_AREA into the current
    directory and prepends $SOFTWARE_DIR/ProdCommon to PYTHONPATH.

    nj -- unused.
    """

    txt = '\n#Written by cms_cmssw::wsBuildExe\n'
    txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n'

    txt += 'rm -r lib/ module/ \n'
    txt += 'mv $RUNTIME_AREA/lib/ . \n'
    txt += 'mv $RUNTIME_AREA/module/ . \n'
    txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'

    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
    txt += 'fi\n'
    # report the value after the if/else: it used to be echoed only from
    # the else branch, i.e. never when PYTHONPATH started empty
    txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
    txt += '\n'

    return txt
978 slacapra 1.1
def modifySteeringCards(self, nj):
    """
    modify the card provided by the user,
    writing a new card into share dir

    NOTE: this method has no body besides this docstring, so calling it
    does nothing and returns None; 'nj' is not used.
    """
984 ewv 1.131
def executableName(self):
    """
    Return the command the job wrapper starts: "sh " (trailing blank
    included) when self.scriptExe is set, self.executable otherwise.
    """
    if not self.scriptExe: #CarlosDaniele
        return self.executable
    return "sh "
990 slacapra 1.1
def executableArgs(self):
    """
    Return the command line arguments for the job executable.

    With a user script (self.scriptExe) the arguments are the script name
    followed by " $NJob". Otherwise they depend on the CMSSW version
    reported by self.scram.getSWVersion() (expected form X_<major>_<minor>_...):
      * " -j $RUNTIME_AREA/crab_fjr_$NJob.xml" from release 1_5 on;
      * " -p pset.py" from major release 2 on, " -p pset.cfg" before.

    Raises CrabException when major/minor cannot be parsed.
    """
    # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
    if self.scriptExe:#CarlosDaniele
        return self.scriptExe + " $NJob"

    version_array = self.scram.getSWVersion().split('_')
    try:
        major = int(version_array[1])
        minor = int(version_array[2])
    except (IndexError, ValueError):
        msg = "Cannot parse CMSSW version string: " + "_".join(version_array) + " for major and minor release number!"
        raise CrabException(msg)

    ex_args = ""
    # FUTURE: This tests the CMSSW version. Can remove code as versions deprecated
    # Framework job report: compare (major, minor) as a pair, otherwise
    # releases such as 2_0 (minor < 5) would lose the -j option
    if (major, minor) >= (1, 5):
        ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
    # Type of cfg file
    if major >= 2 :
        ex_args += " -p pset.py"
    else:
        ex_args += " -p pset.cfg"
    return ex_args
1017 slacapra 1.1
def inputSandbox(self, nj):
    """
    Build the list of files for the JDL input sandbox.

    In order: the software tarball and the ML tarball (each only if the
    file exists), the job configuration file (only if self.pset is set),
    the tarball of additional input files (created by this call) and the
    wrapper script whose name is read from the task database.

    nj -- unused.
    """
    ## code: only the tarballs that are really on disk
    inp_box = [tgz for tgz in (self.tgzNameWithPath, self.MLtgzfile)
               if os.path.isfile(tgz)]

    ## config
    if self.pset is not None:
        inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())

    ## additional input files
    inp_box.append(self.additionalInputFileTgz())

    ## executable
    script_name = str(common._db.queryTask('scriptName'))
    inp_box.append(common.work_space.pathForTgz() +'job/'+ os.path.basename(script_name))
    return inp_box
1040    
def outputSandbox(self, nj):
    """
    Build the list of files for the JDL output sandbox: every user
    declared output file (self.output_file followed by
    self.output_file_sandbox) tagged with the job number nj+1 through
    numberFile_.
    """
    declared = self.output_file+self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1052    
def prepareSteeringCards(self):
    """
    Hook for initial modifications of the user's steering card file.
    Nothing is done here; None is returned.
    """
    return None
1058    
def wsRenameOutput(self, nj):
    """
    Return the part of the job script that renames the produced files.

    For each entry of self.output_file the script checks that the file
    exists, renames it to its numbered name (see numberFile_, suffix
    $NJob) and links the original name in $RUNTIME_AREA to it; a missing
    file sets job_exit_code=60302. The script finally defines file_list
    with all numbered names and changes to $RUNTIME_AREA.

    nj -- unused.
    """
    # listing of the software directory, emitted before and after the renaming
    listing = 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n' \
              + 'echo ">>> current directory content:"\n' \
              + 'ls \n' \
              + '\n'

    txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
    txt += listing

    for fileWithSuffix in self.output_file:
        numbered = self.numberFile_(fileWithSuffix, '$NJob')
        if self.copy_data == 1:
            # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            move_to = numbered
            link_from = '`pwd`/'+numbered
        else:
            move_to = '$RUNTIME_AREA/'+numbered
            link_from = '$RUNTIME_AREA/'+numbered

        txt += '\n'
        txt += '# check output file\n'
        txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
        txt += ' mv '+fileWithSuffix+' '+move_to+'\n'
        txt += ' ln -s '+link_from+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        txt += 'else\n'
        txt += ' job_exit_code=60302\n'
        txt += ' echo "WARNING: Output file '+fileWithSuffix+' not found"\n'
        if common.scheduler.name().upper() == 'CONDOR_G':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+numbered+'\n'
            txt += ' fi \n'
        txt += 'fi\n'

    file_list = [self.numberFile_(name, '$NJob') for name in self.output_file]

    txt += 'file_list="'+' '.join(file_list)+'"\n'
    txt += '\n'
    txt += listing
    txt += 'cd $RUNTIME_AREA\n'
    txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
    return txt
1103    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of a file name.

    'out.root' becomes 'out_<txt>.root'; a name without any '.' simply
    gets '_<txt>' appended.
    """
    last_dot = file.rfind(".")
    if last_dot < 0:
        # no extension at all
        return file + '_' + txt

    # keep everything up to the last dot, then the tag, then ".ext"
    return file[:last_dot] + '_' + txt + file[last_dot:]
1121    
def getRequirements(self, nj=[]):
    """
    return job requirements to add to jdl files

    The expression is the ' && ' conjunction of: the software tag for
    self.version (if set), the software tag for self.executable_arch (if
    set), outbound connectivity of the worker node and, for the
    'glitecoll' scheduler only, the CE being in Production state.

    nj -- unused (the default is never modified).
    """
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + \
            self.version + \
            '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    ## SL add requirement for OS version only if SL4
    #reSL4 = re.compile( r'slc4' )
    if self.executable_arch: # and reSL4.search(self.executable_arch):
        clauses.append('Member("VO-cms-' + \
            self.executable_arch + \
            '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    if common.scheduler.name() == "glitecoll":
        clauses.append('other.GlueCEStateStatus == "Production" ')

    # Joining the clauses (instead of prefixing each one with ' && ')
    # avoids an expression starting with a dangling '&&' when
    # self.version is empty.
    return ' && '.join(clauses)
1143 gutsche 1.3
def configFilename(self):
    """Return the config file name: the value of self.name() plus '.cfg'."""
    base = self.name()
    return base+'.cfg'
1147    
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of the job script that prepares the CMS environment
    on OSG: export SCRAM_ARCH (self.executable_arch) and source
    $OSG_APP/cmssoft/cms/cmsset_default.sh with self.version as argument;
    a missing setup file ends the job with exit code 10020.
    """
    arch = self.executable_arch
    setup = '$OSG_APP/cmssoft/cms/cmsset_default.sh'
    pieces = [
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f '+setup+' ] ;then\n',
        ' # Use '+setup+' to setup cms software\n',
        ' source '+setup+' '+self.version+'\n',
        ' else\n',
        ' echo "ERROR ==> '+setup+' file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    ]
    return ''.join(pieces)
1171 ewv 1.131
1172 gutsche 1.3 ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of the job script that prepares the CMS environment
    on LCG: export SCRAM_ARCH and BUILD_ARCH (self.executable_arch) and
    source $VO_CMS_SW_DIR/cmsset_default.sh. The generated script ends
    the job with exit code 10031 (software dir not defined), 10020
    (setup file missing or empty) or 10032 (sourcing failed).
    """
    arch = self.executable_arch
    pieces = [
        '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n',
        ' echo ">>> setup CMS LCG environment:"\n',
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' export BUILD_ARCH='+arch+'\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n',
        ' job_exit_code=10031\n',
        ' func_exit\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' job_exit_code=10032\n',
        ' func_exit\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "==> setup cms environment ok"\n',
    ]
    return ''.join(pieces)
1206 gutsche 1.5
1207 ewv 1.131 ### FEDE FOR DBS OUTPUT PUBLICATION
def modifyReport(self, nj):
    """
    Return the part of the job script that modifies the FrameworkJob
    Report.

    Only the header comment is returned unless the configuration
    parameter USER.publish_data equals 1. In that case the script sets
    FOR_LFN (built from LFNBase(USER.publish_data_name) on a successful
    copy, '/copy_problems/' otherwise), runs ModifyJobReport.py on
    crab_fjr_$NJob.xml and replaces the report with the new one; a
    failure sets job_exit_code to 70500.

    nj -- unused.
    """
    txt = '\n#Written by cms_cmssw::modifyReport\n'
    if int(self.cfg_params.get('USER.publish_data',0)) != 1:
        return txt

    processedDataset = self.cfg_params['USER.publish_data_name']
    lfn_base = LFNBase(processedDataset)

    modifier = '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py'
    # the very same command line is first echoed, then executed
    command = modifier + ' $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'

    script = [
        'if [ $copy_exit_status -eq 0 ]; then\n',
        ' FOR_LFN=%s_${PSETHASH}/\n'%(lfn_base),
        'else\n',
        ' FOR_LFN=/copy_problems/ \n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        'fi\n',
        'echo ">>> Modify Job Report:" \n',
        'chmod a+x '+modifier+'\n',
        'ProcessedDataset='+processedDataset+'\n',
        'echo "ProcessedDataset = $ProcessedDataset"\n',
        'echo "SE = $SE"\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'echo "FOR_LFN = $FOR_LFN" \n',
        'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n',
        'echo "'+command+'"\n',
        command+'\n',
        'modifyReport_result=$?\n',
        'if [ $modifyReport_result -ne 0 ]; then\n',
        ' modifyReport_result=70500\n',
        ' job_exit_code=$modifyReport_result\n',
        ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "WARNING: Problem with ModifyJobReport"\n',
        'else\n',
        ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        'fi\n',
    ]
    return txt + ''.join(script)
1247 fanzago 1.99
def setParam_(self, param, value):
    """Store 'value' under the key 'param' in the self._params dictionary."""
    params = self._params
    params[param] = value
1250    
def getParams(self):
    """Return the self._params dictionary itself (not a copy)."""
    params = self._params
    return params
1253 gutsche 1.8
def uniquelist(self, old):
    """
    Return the distinct elements of 'old' (the keys of a dictionary built
    from them, so elements must be hashable and order is that of the
    dictionary).
    """
    return dict.fromkeys(old).keys()
1262 mcinquil 1.121
def outList(self):
    """
    Return the part of the job script that declares the files expected
    in the output sandbox.

    The list holds the numbered names (see numberFile_, suffix $NJob) of
    self.output_file + self.output_file_sandbox when self.return_data is
    1, of self.output_file_sandbox alone otherwise, followed by the job
    stdout and stderr. It is echoed and exported as filesToCheck.
    """
    if self.return_data == 1:
        expected = self.output_file+self.output_file_sandbox
    else:
        expected = self.output_file_sandbox

    names = [self.numberFile_(name, '$NJob') for name in expected]
    # standard output and error are always part of the sandbox
    names += ['CMSSW_$NJob.stdout', 'CMSSW_$NJob.stderr']
    joined = ' '.join(names)

    txt = 'echo ">>> list of expected files on output sandbox"\n'
    txt += 'echo "output files: '+joined+'"\n'
    txt += 'filesToCheck="'+joined+'"\n'
    txt += 'export filesToCheck\n'
    return txt