ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.175
Committed: Tue Apr 8 13:59:39 2008 UTC (17 years ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre5
Changes since 1.174: +26 -21 lines
Log Message:
re-added the copy_problem LFN for data publication

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.115 from BlackWhiteListParser import BlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8 fanzago 1.173 from LFNBaseName import *
9 slacapra 1.1
10 slacapra 1.105 import os, string, glob
11 slacapra 1.1
12     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs):
    """
    Configure the CMSSW job type from the user configuration and split the task into jobs.

    The constructor reads and validates the CMSSW.*, USER.* and EDG.* cards,
    runs the data discovery/location when a dataset is requested, obtains
    the tarball name, performs the job splitting and finally writes the
    modified parameter set.

    ARGUMENT: cfg_params: mapping of 'SECTION.key' configuration cards to their values
              ncjobs: number of jobs requested to be created ('all' for no limit)
    RAISES:   CrabException if a mandatory card is missing, a referenced file
              does not exist, the splitting cards are inconsistent or the
              parameter set cannot be modified
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.additional_tgz_name = 'additional.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''         # script use case
    self.datasetPath = ''  # script use case

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    self.version = self.scram.getSWVersion()

    #
    # Try to block creation in case of arch/version mismatch
    #

    # a = string.split(self.version, "_")
    #
    # if int(a[1]) == 1 and (int(a[2]) < 5 and self.executable_arch.find('slc4') == 0):
    # msg = "Warning: You are using %s version of CMSSW with %s architecture. \n--> Did you compile your libraries with SLC3? Otherwise you can find some problems running on SLC4 Grid nodes.\n"%(self.version, self.executable_arch)
    # common.logger.message(msg)
    # if int(a[1]) == 1 and (int(a[2]) >= 5 and self.executable_arch.find('slc3') == 0):
    # msg = "Error: CMS does not support %s with %s architecture"%(self.version, self.executable_arch)
    # raise CrabException(msg)
    #

    self.setParam_('application', self.version)

    ### collect Data cards

    if 'CMSSW.datasetpath' not in cfg_params:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    if tmp.lower() == 'none':
        # no input dataset (generation or script use case)
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        self.setParam_('datasetFull', self.datasetPath)
        datasetpath_split = self.datasetPath.split("/")
        if len(datasetpath_split) > 2:
            # standard style
            self.setParam_('dataset', datasetpath_split[1])
            self.setParam_('owner', datasetpath_split[2])
        else:
            # not enough '/' separated fields: report the whole string
            self.setParam_('dataset', self.datasetPath)
            self.setParam_('owner', self.datasetPath)

    self.setParam_('taskId', common._db.queryTask('name')) ## new BL--DS

    self.dataTiers = []

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    self.setParam_('exe', self.executable)
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if 'CMSSW.pset' not in cfg_params:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none' :
        if not os.path.exists(self.pset):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox
    self.output_file_sandbox = []

    # add fjr report by default via sandbox
    self.output_file_sandbox.append(self.fjrFileName)

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp:
        tmpOutFiles = tmp.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for outFile in tmpOutFiles:
            self.output_file.append(outFile.strip())
    else:
        log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe:
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    # Script use case: without dataset and without pset a script is mandatory.
    # scriptExe is None (not '') when the card is missing, so test its truth value.
    if self.datasetPath is None and self.pset is None and not self.scriptExe:
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    ## additional input files
    if 'USER.additional_input_files' in cfg_params:
        tmpAddFiles = cfg_params['USER.additional_input_files'].split(',')
        for tmp in tmpAddFiles:
            tmp = tmp.strip()
            if not tmp:
                # empty entry (e.g. a trailing comma): nothing to add
                continue
            dirname = ''
            if not tmp[0]=="/": dirname = "."
            if tmp.find("*") > -1:
                # wildcard: expand it, relative patterns are anchored to '.'
                files = glob.glob(os.path.join(dirname, tmp))
                if len(files)==0:
                    raise CrabException("No additional input file found with this pattern: "+tmp)
            else:
                files = [tmp]
            for fileName in files:
                if not os.path.exists(fileName):
                    raise CrabException("Additional input file not found: "+fileName)
                # fname = string.split(file, '/')[-1]
                # storedFile = common.work_space.pathForTgz()+'share/'+fname
                # shutil.copyfile(file, storedFile)
                self.additional_inbox_files.append(fileName.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if 'CMSSW.events_per_job' in cfg_params:
        self.eventsPerJob = int(cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in cfg_params:
        self.theNumberOfJobs = int(cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    if 'CMSSW.total_number_of_events' in cfg_params:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        # with a parameter set exactly two of the three splitting cards are needed
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if self.selectNumberOfJobs == 0:
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## New method of dealing with seeds
    self.incrementSeeds = []
    self.preserveSeeds = []
    if 'CMSSW.preserve_seeds' in cfg_params:
        for tmp in cfg_params['CMSSW.preserve_seeds'].split(','):
            # str.strip() returns a new string: store the stripped name
            self.preserveSeeds.append(tmp.strip())
    if 'CMSSW.increment_seeds' in cfg_params:
        for tmp in cfg_params['CMSSW.increment_seeds'].split(','):
            self.incrementSeeds.append(tmp.strip())

    ## Old method of dealing with seeds
    ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
    ## remove
    # (configuration card, attribute keeping its value, name added to increment_seeds)
    deprecatedSeeds = (
        ('pythia_seed', 'sourceSeed',    'sourceSeed'),
        ('vtx_seed',    'sourceSeedVtx', 'VtxSmeared'),
        ('g4_seed',     'sourceSeedG4',  'g4SimHits'),
        ('mix_seed',    'sourceSeedMix', 'mix'),
    )
    for card, attribute, seedName in deprecatedSeeds:
        value = cfg_params.get('CMSSW.'+card, None)
        setattr(self, attribute, value)
        if value:
            log.message(card+" is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\nAdded to increment_seeds.")
            self.incrementSeeds.append(seedName)

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    if self.pset is not None:
        import PsetManipulator as pp
        PsetEdit = pp.PsetManipulator(self.pset)

    # Copy/return

    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[] # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset
    if self.pset is not None:
        try:
            # Add FrameworkJobReport to parameter-set, set max events.
            # Reset later for data jobs by writeCFG which does all modifications
            PsetEdit.addCrabFJR(self.fjrFileName)
            PsetEdit.maxEvent(self.eventsPerJob)
            PsetEdit.psetWriter(self.configFilename())
        except Exception as ex:
            # keep the original failure visible instead of hiding it
            msg = 'Error while manipuliating ParameterSet: exiting...'
            raise CrabException(msg+'\n'+str(ex))
288 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Contact DBS for the content of self.datasetPath and DLS for the sites hosting it.

    ARGUMENT: cfg_params: configuration cards, passed on to DataDiscovery and DataLocation
    SETS:     self.pubdata, self.filesbyblock, self.eventsbyblock,
              self.eventsbyfile, self.maxEvents
    RETURNS:  the result of DataLocation.getSites(); jobSplittingByBlocks
              consumes it as a dictionary with blocks as keys and the list
              of host sites as values
    RAISES:   CrabException if the data discovery or the data location fails
    """
    import DataDiscovery
    import DataLocation
    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath=self.datasetPath

    ## Contact the DBS
    common.logger.message("Contacting Data Discovery Services ...")
    try:
        self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
        self.pubdata.fetchDBSInfo()
    except (DataDiscovery.NotExistingDatasetError,
            DataDiscovery.NoDataTierinProvenanceError,
            DataDiscovery.DataDiscoveryError) as ex:
        # all discovery failures are reported to the user in the same way
        msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
        raise CrabException(msg)

    self.filesbyblock=self.pubdata.getFiles()
    self.eventsbyblock=self.pubdata.getEventsPerBlock()
    self.eventsbyfile=self.pubdata.getEventsPerFile()

    ## get max number of events
    self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py

    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError as ex:
        msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
        raise CrabException(msg)

    sites = dataloc.getSites()

    # screen output
    common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock)) + " blocks.\n")

    return sites
341 ewv 1.131
342 ewv 1.170 # to Be Removed DS -- BL
343 spiga 1.165 # def setArgsList(self, argsList):
344     # self.argsList = argsList
345 mcinquil 1.140
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.ncjobs
    SETS:     self.jobDestination - Site destination(s) for each job (a list of lists)
              self.total_number_of_jobs - Total # of jobs
              self.ncjobs - overwritten with the number of jobs actually created
              self.list_of_args - per job: [file list string, max events, events to skip]
    RAISES:   CrabException if the splitting cards do not define a positive
              number of events per job
    """

    # ---- Handle the possible job splitting configurations ---- #
    totalEventsRequested = None
    eventsPerJobRequested = None
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        if self.eventsPerJob < 1:
            # a non positive value would create jobs without ever consuming events
            raise CrabException('events_per_job must be a positive integer.')
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    if totalEventsRequested is None:
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    # If user requested all the events in the dataset
    if totalEventsRequested == -1:
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif totalEventsRequested > self.maxEvents:
        eventsRemaining = self.maxEvents
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive integer.')
        # at least one event per job, otherwise the file loop can not advance
        eventsPerJobRequested = max(1, eventsRemaining // self.theNumberOfJobs)

    if eventsPerJobRequested is None:
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    if self.selectNumberOfJobs:
        common.logger.message("May not create the exact number_of_jobs requested.")

    if self.ncjobs == 'all':
        totalNumberOfJobs = 999999999
    else:
        totalNumberOfJobs = self.ncjobs

    blocks = list(blockSites.keys())

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    def closeJob(jobNumber, block, fileString, maxEventsArg, skipEvents, eventsInJob, note):
        # Register one finished job: its arguments, its destination and its block.
        # fileString ends with the two character separator '\,' which is dropped.
        list_of_lists.append([fileString[:-2], str(maxEventsArg), str(skipEvents)])
        common.logger.debug(3,"Job "+str(jobNumber)+" can run over "+str(eventsInJob)+note)
        self.jobDestination.append(blockSites[block])
        common.logger.debug(5,"Job "+str(jobNumber)+" Destination: "+str(self.jobDestination[-1]))
        jobsOfBlock[block].append(jobNumber)

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events ---- #
    for block in blocks:
        if not (eventsRemaining > 0 and jobCount < totalNumberOfJobs):
            break
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if block not in self.eventsbyblock:
            continue
        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

        files = self.filesbyblock[block]
        numFilesInBlock = len(files)
        if numFilesInBlock <= 0:
            continue
        fileCount = 0

        # ---- New block => New job ---- #
        parString = ""
        # counter for number of events in files currently worked on
        filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block ---- #
        while eventsRemaining > 0 and fileCount < numFilesInBlock and jobCount < totalNumberOfJobs:
            fileName = files[fileCount]
            if newFile:
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                except KeyError:
                    common.logger.message("File "+str(fileName)+" has unknown number of events: skipping")
                else:
                    common.logger.debug(6, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job: \"name\"\,
                    parString += '\\"' + fileName + '\\"\\,'
                    newFile = 0

            eventsInJob = filesEventCount - jobSkipEventCount

            # if less events in file remain than eventsPerJobRequested
            if eventsInJob < eventsPerJobRequested:
                # if last file in block
                if fileCount == numFilesInBlock-1:
                    # end job using last file, use remaining events in block;
                    # no job is created when no usable file was collected
                    if parString:
                        closeJob(jobCount+1, block, parString, -1, jobSkipEventCount,
                                 eventsInJob, " events (last file in block).")
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsInJob
                        eventsRemaining = eventsRemaining - eventsInJob
                    # reset counter and file
                    jobSkipEventCount = 0
                    parString = ""
                    filesEventCount = 0
                # go to next file
                newFile = 1
                fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif eventsInJob == eventsPerJobRequested:
                # close job and touch new file
                closeJob(jobCount+1, block, parString, eventsPerJobRequested, jobSkipEventCount,
                         eventsPerJobRequested, " events.")
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # reset counter and file
                jobSkipEventCount = 0
                parString = ""
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            # if more events in file remain than eventsPerJobRequested
            else:
                # close job but don't touch new file
                closeJob(jobCount+1, block, parString, eventsPerJobRequested, jobSkipEventCount,
                         eventsPerJobRequested, " events.")
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file:
                # events of the last file already used by the jobs closed so far
                jobSkipEventCount = eventsPerJobRequested - (eventsInJob - self.eventsbyfile[fileName])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[fileName]
                parString = '\\"' + fileName + '\\"\\,'

    self.ncjobs = self.total_number_of_jobs = jobCount
    if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []
    bloskNoSite = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            jobRanges = spanRanges(jobsOfBlock[block])
            # sites left after applying the black list and then the white list
            allowedSites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,jobRanges,','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append( jobRanges )
                bloskNoSite.append( blockCounter )

    common.logger.message(screenOutput)
    if noSiteBlock:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        separator = ""
        if len(bloskNoSite) > 1:
            separator = ","
        for blockNumber in bloskNoSite:
            msg += ' ' + str(blockNumber) + separator
        msg += '\n Related jobs:\n '
        separator = ""
        if len(noSiteBlock) > 1:
            separator = ","
        for range_jobs in noSiteBlock:
            msg += str(range_jobs) + separator
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        for card, label in (('EDG.se_white_list', 'SE'), ('EDG.ce_white_list', 'CE')):
            if card in self.cfg_params:
                msg += 'WARNING: '+label+' White List: '+self.cfg_params[card]+'\n'
                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
570    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job
    REQUIRES: self.selectEventsPerJob, self.selectNumberOfJobs, self.selectTotalNumberEvents,
              self.eventsPerJob, self.theNumberOfJobs, self.total_number_of_events,
              self.firstRun
    SETS:     self.total_number_of_jobs, self.list_of_args and, depending on the
              cards given, self.total_number_of_events or self.eventsPerJob;
              appends one destination per job to self.jobDestination
    RAISES:   CrabException if the total number of events is negative or the
              divisor of the splitting (events_per_job or number_of_jobs) is not positive
    """
    common.logger.debug(5,'Splitting per events')

    if self.selectEventsPerJob:
        common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    if self.selectNumberOfJobs:
        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if self.selectTotalNumberEvents:
        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if self.selectEventsPerJob:
        if self.selectTotalNumberEvents:
            if self.eventsPerJob < 1:
                # would otherwise end in a ZeroDivisionError or a negative job count
                raise CrabException('events_per_job must be a positive integer.')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

    elif self.selectNumberOfJobs:
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be a positive integer.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs

    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    eventsCovered = int(self.total_number_of_jobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - eventsCovered

    common.logger.debug(5,'Check '+str(check))

    common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(eventsCovered))

    # argument is seed number.$i
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        self.jobDestination.append([""]) #must be empty to write correctly the xml
        args=[]
        if self.firstRun:
            ## pythia first run: the job index is appended to the first_run string
            args.append(str(self.firstRun)+str(i))
        self.list_of_args.append(args)

    return
622    
623 spiga 1.42
def jobSplittingForScript(self):#CarlosDaniele
    """
    Perform job splitting based on number of job

    Creates self.theNumberOfJobs jobs. Each job gets an empty destination
    and its own zero based index (as a string) as only argument.
    """
    logger = common.logger
    logger.debug(5,'Splitting per job')
    logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.total_number_of_jobs = self.theNumberOfJobs
    jobsAsText = str(self.total_number_of_jobs)

    logger.debug(5,'N jobs '+jobsAsText)
    logger.message(jobsAsText+' jobs can be created')

    # argument is seed number.$i
    arguments = self.list_of_args = []
    destinations = self.jobDestination
    for index in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good: keep the destination empty
        destinations.append([""])
        ## no random seed
        arguments.append([str(index)])
    return
646    
def split(self, jobParams):
    """
    Fill jobParams with the arguments of every job and store them in the task DB.

    ARGUMENT: jobParams: list receiving one entry (the list of arguments) per job
    REQUIRES: self.total_number_of_jobs, self.list_of_args, self.jobDestination
    SETS:     self.argsList - number of arguments of a job, the job number included
    """
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    for i in range(njobs):
        jobParams.append("")

    listID=[]
    listField=[]
    for job in range(njobs):
        jobParams[job] = arglist[job]
        # job identifiers start from 1
        jobId = job+1
        listID.append(jobId)
        argu = ' '.join(jobParams[job])
        job_ToSave = {}
        job_ToSave['arguments'] = str(jobId)+' '+argu ## new BL--DS
        job_ToSave['dlsDestination'] = self.jobDestination[job] ## new BL--DS
        listField.append(job_ToSave)
        msg="Job "+str(jobId)+" Arguments: "+str(jobId)+" "+argu+"\n" \
            +" Destination: "+str(self.jobDestination[job])
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField) ## new BL--DS
    ## Pay Attention Here....DS--BL
    # The first job is used as reference: index 1 does not exist for a
    # single-job task.
    if njobs > 0:
        self.argsList = (len(jobParams[0])+1)

    return
679 spiga 1.165 #
680     # def getJobTypeArguments(self, nj, sched):
681     # result = ''
682     # jobs=[]
683 ewv 1.170 # jobs.append(nj)
684 spiga 1.165 # for i in common._db.queryJob('arguments',jobs):## BL--DS
685     # result=result+str(i)+" "
686     # return result
687 ewv 1.131
def numberOfJobs(self):
    """
    Return the value stored in total_number_of_jobs.
    """
    total = self.total_number_of_jobs
    return total
691    
def getTarBall(self, exe):
    """
    Return the TarBall with lib and exe, building it when missing.

    exe -- executable name passed on to buildTar_

    The tarball path is stored in self.tgzNameWithPath. A file already
    present at that path is returned as it is; otherwise buildTar_ is
    called first.
    """
    #
    # Marco. Let's start to use relative path for Boss XML files
    #
    self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    # if it exist, just return it
    if os.path.exists(self.tgzNameWithPath):
        return self.tgzNameWithPath

    # Prepare a tar gzipped file with user binaries.
    self.buildTar_(exe)

    # str.strip() instead of the deprecated string.strip() function
    return self.tgzNameWithPath.strip()
709    
def buildTar_(self, executable):
    """
    Build the tarball with the user code and the one with the ML files.

    executable -- name of the user executable, looked up through scram
                  when self.executable is not empty

    Nothing is built when scram reports no release top or when the user
    area is the release top. Otherwise self.tgzNameWithPath receives, in
    this order: the executable (only if it does not come from the
    release), the 'lib' and 'module' directories of the user area, every
    'data' directory found below the user area and the ProdCommon
    directory found under $CRABDIR. Afterwards self.MLtgzfile is created
    with a fixed list of scripts from $CRABDIR/python/.

    Raises CrabException when the executable is not found, when a
    tarball cannot be created, or when the first tarball is larger than
    self.MaxTarBallSize (compared in MB).
    """
    import tarfile

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
        return

    try: # create tar ball
        tar = tarfile.open(self.tgzNameWithPath, "w:gz")
        try:
            ## First find the executable
            if (self.executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    # distinguish case when script is in user project area or given by full path somewhere else
                    if exeWithPath.find(path) >= 0 :
                        exe = exeWithPath.replace(path,'')
                        tar.add(path+exe,exe)
                    else :
                        tar.add(exeWithPath,os.path.basename(executable))
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            swAreaLen=len(swArea)
            for root, dirs, files in os.walk(swArea):
                if "data" in dirs:
                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
                    tar.add(root+"/data",root[swAreaLen:]+"/data")

            ## Add ProdCommon dir to tar
            prodcommonDir = 'ProdCommon'
            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
            if os.path.isdir(prodcommonPath):
                tar.add(prodcommonPath,prodcommonDir)

            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
        finally:
            # release the file handle also when something went wrong
            tar.close()
    except CrabException:
        # keep the specific message (e.g. executable not found)
        raise
    except Exception:
        raise CrabException('Could not create tar-ball')

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')

    ## create tar-ball with ML stuff
    self.MLtgzfile = common.work_space.pathForTgz()+'share/MLfiles.tgz'
    try:
        tar = tarfile.open(self.MLtgzfile, "w:gz")
        try:
            path=os.environ['CRABDIR'] + '/python/'
            for name in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']:
                tar.add(path+name,name)
            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
        finally:
            tar.close()
    except Exception:
        raise CrabException('Could not create ML files tar-ball')

    return
800 ewv 1.131
def additionalInputFileTgz(self):
    """
    Put all additional files into a tar ball and return its name

    Every entry of self.additional_inbox_files is stored under the last
    component of its path. The archive is always closed, also when
    adding a file fails.
    """
    import tarfile
    tarName = common.work_space.pathForTgz()+'share/'+self.additional_tgz_name
    tar = tarfile.open(tarName, "w:gz")
    try:
        for path in self.additional_inbox_files:
            tar.add(path, path.split('/')[-1])
        common.logger.debug(5,"Files added to "+self.additional_tgz_name+" : "+str(tar.getnames()))
    finally:
        tar.close()
    return tarName
813    
def wsSetupEnvironment(self, nj=0):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    nj -- index into common.job_list of the job the script is written for

    The text contains, in order: the middleware specific setup (LCG or
    OSG), the scram project creation for self.version, the check on the
    number of arguments against self.argsList, the dataset variables
    and, when a pset is defined, the copy of the job configuration file
    to pset.cfg together with the job arguments. The additional input
    tarball is unpacked when additional files were declared.
    """
    # Prepare JobType-independent part
    txt = '\n#Written by cms_cmssw::wsSetupEnvironment\n'
    txt += 'echo ">>> setup environment"\n'
    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
    txt += ' if [ ! $? == 0 ] ;then\n'
    txt += ' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' job_exit_code=10016\n'
    txt += ' func_exit\n'
    txt += ' fi\n'
    txt += ' echo ">>> Created working directory: $WORKING_DIR"\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += ' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo ">>> specific cmssw setup environment:"\n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' job_exit_code=10034\n'
    txt += ' func_exit\n'
    txt += 'fi \n'
    txt += 'cd '+self.version+'\n'
    ########## FEDE FOR DBS2 ######################
    txt += 'SOFTWARE_DIR=`pwd`\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    ###############################################
    ### needed grep for bug in scramv1 ###
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "if [ $nargs -lt "+str(self.argsList)+" ]\n"
    txt += "then\n"
    txt += " echo 'ERROR ==> Too few arguments' +$nargs+ \n"
    txt += ' job_exit_code=50113\n'
    txt += " func_exit\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    ### FEDE FOR DBS OUTPUT PUBLICATION
    if (self.datasetPath):
        txt += '\n'
        txt += 'DatasetPath='+self.datasetPath+'\n'

        datasetpath_split = self.datasetPath.split("/")

        txt += 'PrimaryDataset='+datasetpath_split[1]+'\n'
        txt += 'DataTier='+datasetpath_split[2]+'\n'
        txt += 'ApplicationFamily=cmsRun\n'

    else:
        txt += 'DatasetPath=MCDataTier\n'
        txt += 'PrimaryDataset=null\n'
        txt += 'DataTier=null\n'
        txt += 'ApplicationFamily=MCDataTier\n'
    if self.pset is not None:
        pset = os.path.basename(job.configFilename())
        txt += '\n'
        txt += 'cp $RUNTIME_AREA/'+pset+' .\n'
        if (self.datasetPath): # standard job
            txt += 'InputFiles=${args[1]}; export InputFiles\n'
            txt += 'MaxEvents=${args[2]}; export MaxEvents\n'
            txt += 'SkipEvents=${args[3]}; export SkipEvents\n'
            txt += 'echo "Inputfiles:<$InputFiles>"\n'
            txt += 'echo "MaxEvents:<$MaxEvents>"\n'
            txt += 'echo "SkipEvents:<$SkipEvents>"\n'
        else: # pythia like job
            txt += 'PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n'
            txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
            txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
            txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
            if (self.firstRun):
                txt += 'FirstRun=${args[1]}; export FirstRun\n'
                txt += 'echo "FirstRun: <$FirstRun>"\n'

        txt += 'mv -f '+pset+' pset.cfg\n'

    if len(self.additional_inbox_files) > 0:
        txt += 'if [ -e $RUNTIME_AREA/'+self.additional_tgz_name+' ] ; then\n'
        txt += ' tar xzvf $RUNTIME_AREA/'+self.additional_tgz_name+'\n'
        txt += 'fi\n'

    if self.pset is not None:
        txt += '\n'
        txt += 'echo "***** cat pset.cfg *********"\n'
        txt += 'cat pset.cfg\n'
        txt += 'echo "****** end pset.cfg ********"\n'
        txt += '\n'
        txt += 'PSETHASH=`EdmConfigHash < pset.cfg` \n'
        txt += 'echo "PSETHASH = $PSETHASH" \n'
        txt += '\n'
    return txt
def wsUntarSoftware(self, nj=0):
    """
    Put in the script the commands to unpack the software tarball.

    nj -- job number (not used)

    When the file self.tgzNameWithPath exists, the returned text untars
    it from $RUNTIME_AREA, calls func_exit if that fails and puts
    $RUNTIME_AREA/ProdCommon in front of PYTHONPATH. Without the file
    only the header comment is returned.
    """

    txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'

    if os.path.isfile(self.tgzNameWithPath):
        tgz = os.path.basename(self.tgzNameWithPath)
        txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+tgz+' :" \n'
        txt += 'tar xzvf $RUNTIME_AREA/'+tgz+'\n'
        txt += 'untar_status=$? \n'
        txt += 'if [ $untar_status -ne 0 ]; then \n'
        txt += ' echo "ERROR ==> Untarring .tgz file failed"\n'
        txt += ' job_exit_code=$untar_status\n'
        txt += ' func_exit\n'
        txt += 'else \n'
        txt += ' echo "Successful untar" \n'
        txt += 'fi \n'
        txt += '\n'
        txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
        txt += 'if [ -z "$PYTHONPATH" ]; then\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
        txt += 'else\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
        txt += 'fi\n'
        # printed after the 'fi' so that the value shows up in both cases
        txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
        txt += '\n'

    return txt
960 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Put in the script the commands to move the software directories.

    nj -- job number (not used)

    The returned text removes lib/ and module/ from the current
    directory, moves lib/, module/ and ProdCommon/ there from
    $RUNTIME_AREA and puts $SOFTWARE_DIR/ProdCommon in front of
    PYTHONPATH.
    """

    txt = '\n#Written by cms_cmssw::wsBuildExe\n'
    txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n'

    txt += 'rm -r lib/ module/ \n'
    txt += 'mv $RUNTIME_AREA/lib/ . \n'
    txt += 'mv $RUNTIME_AREA/module/ . \n'
    txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'

    txt += 'if [ -z "$PYTHONPATH" ]; then\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
    txt += 'else\n'
    txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
    txt += 'fi\n'
    # printed after the 'fi' so that the value shows up in both cases
    txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
    txt += '\n'

    return txt
984 slacapra 1.1
def modifySteeringCards(self, nj):
    """
    Hook meant to modify the card provided by the user and to write a
    new card into the share dir; nothing is done here.
    """
    return None
990 ewv 1.131
def executableName(self):
    """
    Return "sh " when a user script (scriptExe) is set, otherwise the
    configured executable.
    """
    if not self.scriptExe: #CarlosDaniele
        return self.executable
    return "sh "
996 slacapra 1.1
def executableArgs(self):
    """
    Return the command line arguments of the executable.

    With a user script the arguments are the script name followed by
    $NJob. Otherwise they depend on the release reported by scram
    (a string like CMSSW_<major>_<minor>_...):
      - from release 1_5 on, the framework job report option is added;
      - from major release 2 on, pset.py is used instead of pset.cfg.

    Raises CrabException when major and minor cannot be read from the
    version string.
    """
    # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
    if self.scriptExe:#CarlosDaniele
        return self.scriptExe + " $NJob"

    version_array = self.scram.getSWVersion().split('_')
    try:
        major = int(version_array[1])
        minor = int(version_array[2])
    except (IndexError, ValueError):
        msg = "Cannot parse CMSSW version string: " + "_".join(version_array) + " for major and minor release number!"
        raise CrabException(msg)

    ex_args = ""
    # FUTURE: This tests the CMSSW version. Can remove code as versions deprecated
    # Framework job report
    # Compared as a pair: "major >= 1 and minor >= 5" would wrongly
    # exclude releases like 2_0_x.
    if (major, minor) >= (1, 5):
        ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
    # Type of cfg file
    if major >= 2 :
        ex_args += " -p pset.py"
    else:
        ex_args += " -p pset.cfg"
    return ex_args
1023 slacapra 1.1
def inputSandbox(self, nj):
    """
    Build the list of files that go into the JDL input sandbox.

    In order: the software and ML tarballs (only those present on
    disk), the job configuration file when a pset is defined, the
    tarball with the additional input files and the wrapper script
    whose name is read from the task DB.
    """
    job_dir = common.work_space.pathForTgz() + 'job/'
    ## code
    sandbox = [tgz for tgz in (self.tgzNameWithPath, self.MLtgzfile)
               if os.path.isfile(tgz)]
    ## config
    if self.pset is not None:
        sandbox.append(job_dir + self.configFilename())
    ## additional input files
    sandbox.append(self.additionalInputFileTgz())
    ## executable
    script_name = str(common._db.queryTask('scriptName'))
    sandbox.append(job_dir + os.path.basename(script_name))
    return sandbox
1046    
def outputSandbox(self, nj):
    """
    Build the list of files that go into the JDL output sandbox.

    Every user declared output file (output_file followed by
    output_file_sandbox) is listed with the job number nj+1 inserted
    through numberFile_.
    """
    ## User Declared output files
    declared = self.output_file+self.output_file_sandbox
    return [self.numberFile_(name, str(nj + 1)) for name in declared]
1058    
def prepareSteeringCards(self):
    """
    Hook for the initial modifications of the user's steering card
    file; nothing is done here.
    """
    return None
1064    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    nj -- job number (not used, the script relies on $NJob)

    For every entry of self.output_file the script renames the file to
    its numbered name (see numberFile_) and links it in $RUNTIME_AREA;
    a missing file sets job_exit_code=60302. The numbered names are
    also exported to the script as the file_list variable.
    """

    txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    txt += 'ls \n'
    txt += '\n'

    file_list = []
    for fileWithSuffix in (self.output_file):
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        file_list.append(output_file_num)
        txt += '\n'
        txt += '# check output file\n'
        txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
        if (self.copy_data == 1): # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            txt += ' mv '+fileWithSuffix+' '+output_file_num+'\n'
            txt += ' ln -s `pwd`/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        else:
            txt += ' mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
        txt += 'else\n'
        txt += ' job_exit_code=60302\n'
        txt += ' echo "WARNING: Output file '+fileWithSuffix+' not found"\n'
        if common.scheduler.name().upper() == 'CONDOR_G':
            txt += ' if [ $middleware == OSG ]; then \n'
            txt += ' echo "prepare dummy output file"\n'
            txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
            txt += ' fi \n'
        txt += 'fi\n'

    # str.join() instead of the deprecated string.join() function
    txt += 'file_list="'+' '.join(file_list)+'"\n'
    txt += '\n'
    txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
    txt += 'echo ">>> current directory content:"\n'
    txt += 'ls \n'
    txt += '\n'
    txt += 'cd $RUNTIME_AREA\n'
    txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
    return txt
1109    
def numberFile_(self, file, txt):
    """
    append _'txt' before last extension of a file

    file -- file name
    txt  -- text to insert

    'out.root' with '3' gives 'out_3.root'; a name without any dot
    simply gets '_txt' appended.
    """
    # str.split() instead of the deprecated string.split() function
    parts = file.split(".")
    if len(parts) > 1:
        # everything up to the last dot, then _txt, then the extension
        return ".".join(parts[:-1]) + '_' + txt + "." + parts[-1]
    return parts[0] + '_' + txt
1127    
def getRequirements(self, nj=[]):
    """
    Build the requirement expression to add to the jdl files.

    The expression asks for the software tags of self.version and
    self.executable_arch (each only when set) and for outbound
    connectivity; with the "glitecoll" scheduler the CE must also be
    in "Production" status.
    """
    runtime_env = '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version + runtime_env)
    ## SL add requirement for OS version only if SL4
    #reSL4 = re.compile( r'slc4' )
    if self.executable_arch: # and reSL4.search(self.executable_arch):
        clauses.append(' && Member("VO-cms-' + self.executable_arch + runtime_env)

    clauses.append(' && (other.GlueHostNetworkAdapterOutboundIP)')
    if common.scheduler.name() == "glitecoll":
        clauses.append(' && other.GlueCEStateStatus == "Production" ')

    return ''.join(clauses)
1149 gutsche 1.3
def configFilename(self):
    """ return the config filename: the value of name() plus '.cfg' """
    return ''.join((self.name(), '.cfg'))
1153    
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns the part of a job script which prepares the CMS
    environment on OSG: SCRAM_ARCH is exported from executable_arch and
    $OSG_APP/cmssoft/cms/cmsset_default.sh is sourced for self.version
    (the job exits with code 10020 when that file is missing).
    """
    arch = self.executable_arch
    pieces = [
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n',
        ' else\n',
        ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    ]
    return ''.join(pieces)
1177 ewv 1.131
1178 gutsche 1.3 ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Returns the part of a job script which prepares the CMS
    environment on LCG: SCRAM_ARCH and BUILD_ARCH are exported from
    executable_arch and $VO_CMS_SW_DIR/cmsset_default.sh is sourced.
    The job exits with code 10031, 10020 or 10032 when the software
    dir is not set, the file is missing or sourcing fails.
    """
    arch = self.executable_arch
    pieces = [
        '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n',
        ' echo ">>> setup CMS LCG environment:"\n',
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' export BUILD_ARCH='+arch+'\n',
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n',
        ' job_exit_code=10031\n',
        ' func_exit\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' job_exit_code=10032\n',
        ' func_exit\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "==> setup cms environment ok"\n',
    ]
    return ''.join(pieces)
1212 gutsche 1.5
1213 ewv 1.131 ### FEDE FOR DBS OUTPUT PUBLICATION
def modifyReport(self, nj):
    """
    Return the part of the job script that modifies the FrameworkJob
    Report.

    nj -- job number (not used, the script relies on $NJob)

    Only the header comment is returned unless USER.publish_data is 1.
    In that case the script chooses FOR_LFN from the copy exit status,
    runs ModifyJobReport.py on crab_fjr_$NJob.xml and, on success,
    replaces the report with NewFrameworkJobReport.xml.
    """
    header = '\n#Written by cms_cmssw::modifyReport\n'
    if int(self.cfg_params.get('USER.publish_data', 0)) != 1:
        return header

    processedDataset = self.cfg_params['USER.publish_data_name']
    lfn_base = LFNBase(processedDataset)
    command = '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'

    lines = [
        header,
        'if [ $copy_exit_status -eq 0 ]; then\n',
        ' FOR_LFN=%s_${PSETHASH}/\n' % lfn_base,
        'else\n',
        ' FOR_LFN=/copy_problems/ \n',
        ' SE=""\n',
        ' SE_PATH=""\n',
        'fi\n',
        'echo ">>> Modify Job Report:" \n',
        'chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n',
        'ProcessedDataset='+processedDataset+'\n',
        'echo "ProcessedDataset = $ProcessedDataset"\n',
        'echo "SE = $SE"\n',
        'echo "SE_PATH = $SE_PATH"\n',
        'echo "FOR_LFN = $FOR_LFN" \n',
        'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n',
        # the command is first echoed, then executed
        'echo "' + command + '"\n',
        command + '\n',
        'modifyReport_result=$?\n',
        'if [ $modifyReport_result -ne 0 ]; then\n',
        ' modifyReport_result=70500\n',
        ' job_exit_code=$modifyReport_result\n',
        ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "WARNING: Problem with ModifyJobReport"\n',
        'else\n',
        ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        'fi\n',
    ]
    return ''.join(lines)
1256 fanzago 1.99
def setParam_(self, param, value):
    """
    Store value under the key param in the _params dictionary.
    """
    self._params.update({param: value})
1259    
def getParams(self):
    """
    Return the _params dictionary itself (not a copy).
    """
    params = self._params
    return params
1262 gutsche 1.8
def uniquelist(self, old):
    """
    remove duplicates from a list

    The elements are used as dictionary keys and the keys of that
    dictionary are returned, in whatever order the dictionary yields
    them.
    """
    return dict.fromkeys(old, 0).keys()
1271 mcinquil 1.121
def outList(self):
    """
    Return the part of the job script listing the expected output files.

    The list holds the numbered names (see numberFile_) of the files in
    output_file_sandbox, preceded by those in output_file when
    return_data is 1, followed by the job stdout and stderr. It is
    echoed and exported to the script as the filesToCheck variable.
    """
    txt = ''
    txt += 'echo ">>> list of expected files on output sandbox"\n'
    if (self.return_data == 1):
        expected = self.output_file+self.output_file_sandbox
    else:
        expected = self.output_file_sandbox
    listOutFiles = [self.numberFile_(name, '$NJob') for name in expected]
    listOutFiles.append('CMSSW_$NJob.stdout')
    listOutFiles.append('CMSSW_$NJob.stderr')
    # str.join() instead of the deprecated string.join() function
    joined = ' '.join(listOutFiles)
    txt += 'echo "output files: '+joined+'"\n'
    txt += 'filesToCheck="'+joined+'"\n'
    txt += 'export filesToCheck\n'
    return txt