ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.239
Committed: Sun Sep 21 10:31:52 2008 UTC (16 years, 7 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.238: +0 -1 lines
Log Message:
Removed the dependence on LFNBase (reduces the number of SiteDB queries). Added cmscp to the ISB. Improved the ISB-size message by adding a reference to the "how to use server" twiki. Changed modifyReport: it must start with "ws", following the related convention. Minor code cleaning.

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 ewv 1.228 from BlackWhiteListParser import SEBlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8    
9 slacapra 1.105 import os, string, glob
10 slacapra 1.1
11     class Cmssw(JobType):
def __init__(self, cfg_params, ncjobs,skip_blocks, isNew):
    """
    Configure a CMSSW job type from the user configuration.

    Reads and validates the relevant keys of cfg_params, performs data
    discovery/location when a dataset is given, runs the selected job
    splitting and, when isNew is true, writes the manipulated
    parameter-set and builds the input sandbox tarball.

    ARGUMENTS: cfg_params  - configuration mapping ('SECTION.key' -> value)
               ncjobs      - number of jobs requested to be created
               skip_blocks - forwarded to the data discovery step
               isNew       - true when the task is being created for the
                             first time (pset and tarball are prepared
                             only in that case)
    RAISES:    CrabException on any invalid or inconsistent setting
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')
    self.skip_blocks = skip_blocks

    self.argsList = []

    self._params = {}
    self.cfg_params = cfg_params
    # init BlackWhiteListParser
    self.blackWhiteListParser = SEBlackWhiteListParser(cfg_params)

    ### Temporary patch to automatically skip the ISB size check:
    ### when a server is configured the default limit is made huge.
    size = 9.5
    if self.cfg_params.get('CRAB.server_name',None):
        size = 99999
    ### D.S.
    self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',size))

    # number of jobs requested to be created, limit obj splitting
    self.ncjobs = ncjobs

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.executable_arch = self.scram.getArch()
    self.tgz_name = 'default.tgz'
    self.scriptName = 'CMSSW.sh'
    self.pset = ''
    self.datasetPath = ''

    # set FJR file name
    self.fjrFileName = 'crab_fjr.xml'

    # The version string is split on '_' and fields 1..3 are taken as
    # major/minor/patch numbers.
    self.version = self.scram.getSWVersion()
    version_array = self.version.split('_')
    self.CMSSW_major = 0
    self.CMSSW_minor = 0
    self.CMSSW_patch = 0
    try:
        self.CMSSW_major = int(version_array[1])
        self.CMSSW_minor = int(version_array[2])
        self.CMSSW_patch = int(version_array[3])
    except (IndexError, ValueError):
        # Too few fields or a non-numeric field; anything else is a real
        # programming error and must not be hidden.
        msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!"
        raise CrabException(msg)

    ### collect Data cards

    if not cfg_params.has_key('CMSSW.datasetpath'):
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)

    ### Temporary: added to remove input file control in the case of PU
    self.dataset_pu = cfg_params.get('CMSSW.dataset_pu', None)

    tmp = cfg_params['CMSSW.datasetpath']
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)

    if tmp =='':
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    elif tmp.lower()=='none':
        # No input dataset
        self.datasetPath = None
        self.selectNoInput = 1
    else:
        self.datasetPath = tmp
        self.selectNoInput = 0

    self.dataTiers = []
    self.debugWrap = ''
    self.debug_wrapper = cfg_params.get('USER.debug_wrapper',False)
    if self.debug_wrapper: self.debugWrap='--debug'

    ## now the application
    self.executable = cfg_params.get('CMSSW.executable','cmsRun')
    log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)

    if not cfg_params.has_key('CMSSW.pset'):
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    self.pset = cfg_params['CMSSW.pset']
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if self.pset.lower() != 'none' :
        if (not os.path.exists(self.pset)):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    else:
        self.pset = None

    # output files
    ## stuff which must be returned always via sandbox;
    ## the fjr report is added by default
    self.output_file_sandbox = [self.fjrFileName]

    # other output files to be returned via sandbox or copied to SE
    self.output_file = []
    tmp = cfg_params.get('CMSSW.output_file',None)
    if tmp :
        self.output_file = [x.strip() for x in tmp.split(',')]

    # script_exe file as additional file in inputSandbox
    self.scriptExe = cfg_params.get('USER.script_exe',None)
    if self.scriptExe :
        if not os.path.isfile(self.scriptExe):
            msg ="ERROR. file "+self.scriptExe+" not found"
            raise CrabException(msg)
        self.additional_inbox_files.append(self.scriptExe.strip())

    # scriptExe is None (not '') when the key is missing, so test its
    # truth value: with neither dataset, pset nor script there is
    # nothing to run.
    if self.datasetPath is None and self.pset is None and not self.scriptExe :
        msg ="Error. script_exe not defined"
        raise CrabException(msg)

    # use parent files...
    self.useParent = self.cfg_params.get('CMSSW.use_parent',False)

    ## additional input files
    if cfg_params.has_key('USER.additional_input_files'):
        for entry in cfg_params['USER.additional_input_files'].split(','):
            entry = entry.strip()
            if not entry:
                # tolerate empty entries such as a trailing comma
                continue
            dirname = ''
            if not entry[0]=="/": dirname = "."
            if entry.find("*")>-1:
                # wildcard: expand it and require at least one match
                matches = glob.glob(os.path.join(dirname, entry))
                if len(matches)==0:
                    raise CrabException("No additional input file found with this pattern: "+entry)
            else:
                matches = [entry]
            for inputFile in matches:
                if not os.path.exists(inputFile):
                    raise CrabException("Additional input file not found: "+inputFile)
                self.additional_inbox_files.append(inputFile.strip())
        common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))

    ## Events per job
    if cfg_params.has_key('CMSSW.events_per_job'):
        self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if cfg_params.has_key('CMSSW.number_of_jobs'):
        self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events (-1 means "all events")
    if cfg_params.has_key('CMSSW.total_number_of_events'):
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
        if self.selectNumberOfJobs == 1:
            if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs):
                msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                raise CrabException(msg)
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    if self.pset is not None:
        # with a pset exactly two of the three splitting parameters
        # must be given; the third one is derived
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)
    else:
        if (self.selectNumberOfJobs == 0):
            msg = 'Must specify number_of_jobs.'
            raise CrabException(msg)

    ## New method of dealing with seeds
    # str.strip() returns a new string, so the stripped value is what
    # must be stored.
    self.incrementSeeds = []
    self.preserveSeeds = []
    if cfg_params.has_key('CMSSW.preserve_seeds'):
        for tmp in cfg_params['CMSSW.preserve_seeds'].split(','):
            self.preserveSeeds.append(tmp.strip())
    if cfg_params.has_key('CMSSW.increment_seeds'):
        for tmp in cfg_params['CMSSW.increment_seeds'].split(','):
            self.incrementSeeds.append(tmp.strip())

    ## FUTURE: Can remove in CRAB 2.4.0
    self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
    self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
    self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
    self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
    if self.sourceSeed or self.sourceSeedVtx or self.sourceSeedG4 or self.sourceSeedMix:
        msg = 'pythia_seed, vtx_seed, g4_seed, and mix_seed are no longer valid settings. You must use increment_seeds or preserve_seeds'
        raise CrabException(msg)

    self.firstRun = cfg_params.get('CMSSW.first_run',None)

    # Copy/return
    self.copy_data = int(cfg_params.get('USER.copy_data',0))
    self.return_data = int(cfg_params.get('USER.return_data',0))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    self.jobDestination=[]  # Site destination(s) for each job (list of lists)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    blockSites = {}
    if self.datasetPath:
        blockSites = self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    ## Select Splitting
    if self.selectNoInput:
        if self.pset is None:
            self.jobSplittingForScript()
        else:
            self.jobSplittingNoInput()
    elif int(cfg_params.get('CMSSW.noblockboundary',0)):
        # converted with int() like the other 0/1 flags, so that the
        # string "0" read from the configuration disables the option
        self.jobSplittingNoBlockBoundary(blockSites)
    else:
        self.jobSplittingByBlocks(blockSites)

    # modify Pset only the first time
    if isNew:
        if self.pset is not None:
            import PsetManipulator as pp
            PsetEdit = pp.PsetManipulator(self.pset)
            try:
                # Add FrameworkJobReport to parameter-set, set max events.
                # Reset later for data jobs by writeCFG which does all modifications
                PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5
                PsetEdit.maxEvent(self.eventsPerJob)
                PsetEdit.psetWriter(self.configFilename())
                ## If present, add TFileService to output files
                if not int(cfg_params.get('CMSSW.skip_TFileService_output',0)):
                    tfsOutput = PsetEdit.getTFileService()
                    if tfsOutput:
                        if tfsOutput in self.output_file:
                            common.logger.debug(5,"Output from TFileService "+tfsOutput+" already in output files")
                        else:
                            self.output_file.append(tfsOutput)
                            common.logger.message("Adding "+tfsOutput+" to output files (from TFileService)")
                ## If present and requested, add PoolOutputModule to output files
                if int(cfg_params.get('CMSSW.get_edm_output',0)):
                    edmOutput = PsetEdit.getPoolOutputModule()
                    if edmOutput:
                        if edmOutput in self.output_file:
                            common.logger.debug(5,"Output from PoolOutputModule "+edmOutput+" already in output files")
                        else:
                            self.output_file.append(edmOutput)
                            common.logger.message("Adding "+edmOutput+" to output files (from PoolOutputModule)")
            except CrabException:
                msg='Error while manipulating ParameterSet: exiting...'
                raise CrabException(msg)
        ## Prepare inputSandbox TarBall (only the first time)
        self.tgzNameWithPath = self.getTarBall(self.executable)
285 gutsche 1.3
286 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
287    
288 slacapra 1.86 import DataDiscovery
289     import DataLocation
290 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
291    
292     datasetPath=self.datasetPath
293    
294 slacapra 1.1 ## Contact the DBS
295 gutsche 1.92 common.logger.message("Contacting Data Discovery Services ...")
296 slacapra 1.1 try:
297 spiga 1.208 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params,self.skip_blocks)
298 slacapra 1.1 self.pubdata.fetchDBSInfo()
299    
300 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
301 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
302     raise CrabException(msg)
303 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
304 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
305     raise CrabException(msg)
306 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
307 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
308 slacapra 1.1 raise CrabException(msg)
309    
310 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
311 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
312     self.eventsbyfile=self.pubdata.getEventsPerFile()
313 spiga 1.204 self.parentFiles=self.pubdata.getParent()
314 gutsche 1.3
315 slacapra 1.1 ## get max number of events
316 ewv 1.192 self.maxEvents=self.pubdata.getMaxEvents()
317 slacapra 1.1
318     ## Contact the DLS and build a list of sites hosting the fileblocks
319     try:
320 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
321 gutsche 1.6 dataloc.fetchDLSInfo()
322 slacapra 1.41 except DataLocation.DataLocationError , ex:
323 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
324     raise CrabException(msg)
325 ewv 1.131
326 slacapra 1.1
327 gutsche 1.35 sites = dataloc.getSites()
328     allSites = []
329     listSites = sites.values()
330 slacapra 1.63 for listSite in listSites:
331     for oneSite in listSite:
332 gutsche 1.35 allSites.append(oneSite)
333     allSites = self.uniquelist(allSites)
334 gutsche 1.3
335 gutsche 1.92 # screen output
336     common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
337    
338 gutsche 1.35 return sites
339 ewv 1.131
def jobSplittingByBlocks(self, blockSites):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block.
    ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
    REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
              self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
              self.maxEvents, self.filesbyblock, self.eventsbyblock, self.eventsbyfile,
              self.parentFiles (only when self.useParent is set)
    SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
          self.total_number_of_jobs - Total # of jobs (also stored in self.ncjobs)
          self.list_of_args - one list per job: input files, [parent files,]
                              max events (-1 = up to the end of the files),
                              events to skip
    """

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # Report the value actually compared: total_number_of_events is 0
        # when the total was derived from number_of_jobs * events_per_job.
        common.logger.message("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    if (self.selectNumberOfJobs):
        common.logger.message("May not create the exact number_of_jobs requested.")

    if ( self.ncjobs == 'all' ) :
        totalNumberOfJobs = 999999999
    else :
        totalNumberOfJobs = self.ncjobs

    blocks = blockSites.keys()
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if self.eventsbyblock.has_key(block) :
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))

            files = self.filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0

            # ---- New block => New job ---- #
            # escaped, comma-terminated list of the input files of the
            # job under construction
            parString = ""
            # same, for the parent files (used only with useParent)
            pString = ""
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                currentFile = files[fileCount]
                if newFile :
                    if self.useParent:
                        parent = self.parentFiles[currentFile]
                        common.logger.debug(6, "File "+str(currentFile)+" has the following parents: "+str(parent))
                        common.logger.write("File "+str(currentFile)+" has the following parents: "+str(parent))
                    try:
                        numEventsInFile = self.eventsbyfile[currentFile]
                        common.logger.debug(6, "File "+str(currentFile)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += '\\\"' + currentFile + '\\\"\,'
                        # Add its parents exactly once, together with
                        # the file itself (not on every loop pass).
                        if self.useParent:
                            for f in parent :
                                pString += '\\\"' + f + '\\\"\,'
                        newFile = 0
                    except KeyError:
                        common.logger.message("File "+str(currentFile)+" has unknown number of events: skipping")

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # events of the collected files not yet assigned to a job
                eventsAvailable = filesEventCount - jobSkipEventCount

                # if less events in file remain than eventsPerJobRequested
                if ( eventsAvailable < eventsPerJobRequested):
                    # if last file in block
                    if ( fileCount == numFilesInBlock-1 ) :
                        # end job using last file, use remaining events in block
                        # close job and touch new file
                        # ([:-2] drops the trailing backslash-comma separator)
                        jobArgs = [parString[:-2]]
                        if self.useParent:
                            jobArgs.append(pString[:-2])
                        jobArgs += [str(-1),str(jobSkipEventCount)]
                        list_of_lists.append(jobArgs)
                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsAvailable)+" events (last file in block).")
                        self.jobDestination.append(blockSites[block])
                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                        # fill jobs of block dictionary
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsAvailable
                        eventsRemaining = eventsRemaining - eventsAvailable
                        jobSkipEventCount = 0
                        # reset file
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else :
                        # go to next file
                        newFile = 1
                        fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( eventsAvailable == eventsPerJobRequested ) :
                    # close job and touch new file
                    jobArgs = [parString[:-2]]
                    if self.useParent:
                        jobArgs.append(pString[:-2])
                    jobArgs += [str(eventsPerJobRequested),str(jobSkipEventCount)]
                    list_of_lists.append(jobArgs)
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    pString = ""
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    jobArgs = [parString[:-2]]
                    if self.useParent:
                        jobArgs.append(pString[:-2])
                    jobArgs += [str(eventsPerJobRequested),str(jobSkipEventCount)]
                    list_of_lists.append(jobArgs)
                    common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
                    self.jobDestination.append(blockSites[block])
                    common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
                    jobsOfBlock[block].append(jobCount+1)
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[currentFile])
                    # remove all but the last file: the next job starts
                    # inside it, so both lists restart from this file only
                    filesEventCount = self.eventsbyfile[currentFile]
                    pString = ""
                    if self.useParent:
                        for f in self.parentFiles[currentFile] :
                            pString += '\\\"' + f + '\\\"\,'
                    parString = '\\\"' + currentFile + '\\\"\,'
                pass # END if
            pass # END while (iterate over files in the block)
    pass # END while (iterate over blocks in the dataset)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"

    # keep trace of block with no sites to print a warning at the end
    noSiteBlock = []
    bloskNoSite = []

    blockCounter = 0
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            # sites left after applying the black list and then the
            # white list (computed once per block)
            allowedSites = self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                ','.join(allowedSites))
            if len(allowedSites) == 0:
                noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
                bloskNoSite.append( blockCounter )

    common.logger.message(screenOutput)
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        virgola = ""
        if len(bloskNoSite) > 1:
            virgola = ","
        for block in bloskNoSite:
            msg += ' ' + str(block) + virgola
        msg += '\n Related jobs:\n '
        virgola = ""
        if len(noSiteBlock) > 1:
            virgola = ","
        for range_jobs in noSiteBlock:
            msg += str(range_jobs) + virgola
        msg += '\n will not be submitted and this block of data can not be analyzed!\n'
        if self.cfg_params.has_key('EDG.se_white_list'):
            msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'
        if self.cfg_params.has_key('EDG.ce_white_list'):
            msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
            msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            msg += 'Please check if the dataset is available at this site!)\n'

        common.logger.message(msg)

    self.list_of_args = list_of_lists
    return
587    
588 afanfani 1.237 def jobSplittingNoBlockBoundary(self,blockSites):
589     """
590     """
591     # ---- Handle the possible job splitting configurations ---- #
592     if (self.selectTotalNumberEvents):
593     totalEventsRequested = self.total_number_of_events
594     if (self.selectEventsPerJob):
595     eventsPerJobRequested = self.eventsPerJob
596     if (self.selectNumberOfJobs):
597     totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
598    
599     # If user requested all the events in the dataset
600     if (totalEventsRequested == -1):
601     eventsRemaining=self.maxEvents
602     # If user requested more events than are in the dataset
603     elif (totalEventsRequested > self.maxEvents):
604     eventsRemaining = self.maxEvents
605     common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
606     # If user requested less events than are in the dataset
607     else:
608     eventsRemaining = totalEventsRequested
609    
610     # If user requested more events per job than are in the dataset
611     if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
612     eventsPerJobRequested = self.maxEvents
613    
614     # For user info at end
615     totalEventCount = 0
616    
617     if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
618     eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
619    
620     if (self.selectNumberOfJobs):
621     common.logger.message("May not create the exact number_of_jobs requested.")
622    
623     if ( self.ncjobs == 'all' ) :
624     totalNumberOfJobs = 999999999
625     else :
626     totalNumberOfJobs = self.ncjobs
627    
628     blocks = blockSites.keys()
629     blockCount = 0
630     # Backup variable in case self.maxEvents counted events in a non-included block
631     numBlocksInDataset = len(blocks)
632    
633     jobCount = 0
634     list_of_lists = []
635    
636     #AF
637     #AF do not reset input files and event count on block boundary
638     #AF
639     parString=""
640     filesEventCount = 0
641     #AF
642    
643     # list tracking which jobs are in which jobs belong to which block
644     jobsOfBlock = {}
645     while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
646     block = blocks[blockCount]
647     blockCount += 1
648     if block not in jobsOfBlock.keys() :
649     jobsOfBlock[block] = []
650    
651     if self.eventsbyblock.has_key(block) :
652     numEventsInBlock = self.eventsbyblock[block]
653     common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
654     files = self.filesbyblock[block]
655     numFilesInBlock = len(files)
656     if (numFilesInBlock <= 0):
657     continue
658     fileCount = 0
659     #AF
660     #AF do not reset input files and event count of block boundary
661     #AF
662     ## ---- New block => New job ---- #
663     #parString = ""
664     # counter for number of events in files currently worked on
665     #filesEventCount = 0
666     #AF
667     # flag if next while loop should touch new file
668     newFile = 1
669     # job event counter
670     jobSkipEventCount = 0
671    
672     # ---- Iterate over the files in the block until we've met the requested ---- #
673     # ---- total # of events or we've gone over all the files in this block ---- #
674     pString=''
675     while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
676     file = files[fileCount]
677     if self.useParent:
678     parent = self.parentFiles[file]
679     for f in parent :
680     pString += '\\\"' + f + '\\\"\,'
681     common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
682     common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
683     if newFile :
684     try:
685     numEventsInFile = self.eventsbyfile[file]
686     common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
687     # increase filesEventCount
688     filesEventCount += numEventsInFile
689     # Add file to current job
690     parString += '\\\"' + file + '\\\"\,'
691     newFile = 0
692     except KeyError:
693     common.logger.message("File "+str(file)+" has unknown number of events: skipping")
694     eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
695     #common.logger.message("AF filesEventCount %s - jobSkipEventCount %s "%(filesEventCount,jobSkipEventCount))
696     # if less events in file remain than eventsPerJobRequested
697     if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
698     #AF
699     #AF skip fileboundary part
700     #AF
701     # go to next file
702     newFile = 1
703     fileCount += 1
704     # if events in file equal to eventsPerJobRequested
705     elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
706     # close job and touch new file
707     fullString = parString[:-2]
708     if self.useParent:
709     fullParentString = pString[:-2]
710     list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
711     else:
712     list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
713     common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
714     self.jobDestination.append(blockSites[block])
715     common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
716     jobsOfBlock[block].append(jobCount+1)
717     # reset counter
718     jobCount = jobCount + 1
719     totalEventCount = totalEventCount + eventsPerJobRequested
720     eventsRemaining = eventsRemaining - eventsPerJobRequested
721     jobSkipEventCount = 0
722     # reset file
723     pString = ""
724     parString = ""
725     filesEventCount = 0
726     newFile = 1
727     fileCount += 1
728    
729     # if more events in file remain than eventsPerJobRequested
730     else :
731     # close job but don't touch new file
732     fullString = parString[:-2]
733     if self.useParent:
734     fullParentString = pString[:-2]
735     list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
736     else:
737     list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
738     common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
739     self.jobDestination.append(blockSites[block])
740     common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
741     jobsOfBlock[block].append(jobCount+1)
742     # increase counter
743     jobCount = jobCount + 1
744     totalEventCount = totalEventCount + eventsPerJobRequested
745     eventsRemaining = eventsRemaining - eventsPerJobRequested
746     # calculate skip events for last file
747     # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
748     jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
749     # remove all but the last file
750     filesEventCount = self.eventsbyfile[file]
751     if self.useParent:
752     for f in parent : pString += '\\\"' + f + '\\\"\,'
753     parString = '\\\"' + file + '\\\"\,'
754     pass # END if
755     pass # END while (iterate over files in the block)
756     pass # END while (iterate over blocks in the dataset)
757     self.ncjobs = self.total_number_of_jobs = jobCount
758     if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
759     common.logger.message("eventsRemaining "+str(eventsRemaining))
760     common.logger.message("jobCount "+str(jobCount))
761     common.logger.message(" totalNumberOfJobs "+str(totalNumberOfJobs))
762     common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
763     common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
764    
765     # screen output
766     screenOutput = "List of jobs and available destination sites:\n\n"
767    
768     #AF
769     #AF skip check on block with no sites
770     #AF
771     self.list_of_args = list_of_lists
772    
773     return
774    
775    
776    
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of events per job, for jobs
    that read no input dataset.

    Sets self.total_number_of_jobs (and, depending on which parameters
    were selected, self.eventsPerJob or self.total_number_of_events),
    rebuilds self.list_of_args with one argument list per job and appends
    one empty destination per job to self.jobDestination.

    Raises CrabException when the total number of events is negative.
    """
    log = common.logger
    log.debug(5,'Splitting per events')

    # Echo back whichever splitting parameters were selected.
    if self.selectEventsPerJob:
        log.message('Required '+str(self.eventsPerJob)+' events per job ')
    if self.selectNumberOfJobs:
        log.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if self.selectTotalNumberEvents:
        log.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.total_number_of_events < 0:
        raise CrabException('Cannot split jobs per Events with "-1" as total number of events')

    # Derive the missing quantity from the two that were given.
    perJobSelected = self.selectEventsPerJob
    if perJobSelected and self.selectTotalNumberEvents:
        self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
    elif perJobSelected and self.selectNumberOfJobs:
        self.total_number_of_jobs = self.theNumberOfJobs
        self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
    elif (not perJobSelected) and self.selectNumberOfJobs:
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

    log.debug(5,'N jobs '+str(self.total_number_of_jobs))

    # Events that do not fit into a whole number of jobs.
    leftover = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
    log.debug(5,'Check '+str(leftover))

    log.message(str(self.total_number_of_jobs)+' jobs can be created, each for '
                +str(self.eventsPerJob)+' for a total of '
                +str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if leftover > 0:
        log.message('Warning: asked '+str(self.total_number_of_events)
                    +' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # One argument list per job; the only possible argument is the first run.
    self.list_of_args = []
    for jobIndex in range(self.total_number_of_jobs):
        # No input data: the destination is left empty (any site is good),
        # which is also what the xml writer expects.
        self.jobDestination.append([""])
        if self.firstRun:
            # pythia first run: firstRun with the job index appended as text
            jobArgs = [str(self.firstRun)+str(jobIndex)]
        else:
            jobArgs = []
        self.list_of_args.append(jobArgs)

    return
828    
829 spiga 1.42
def jobSplittingForScript(self):
    """
    Perform job splitting based on the requested number of jobs only.

    Sets self.total_number_of_jobs from self.theNumberOfJobs, rebuilds
    self.list_of_args with the job index (as a string) as the single
    argument of each job, and appends one empty destination per job to
    self.jobDestination.
    """
    log = common.logger
    log.debug(5,'Splitting per job')
    log.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.total_number_of_jobs = self.theNumberOfJobs
    jobsAsText = str(self.total_number_of_jobs)

    log.debug(5,'N jobs '+jobsAsText)
    log.message(jobsAsText+' jobs can be created')

    # the single argument of each job is its index
    self.list_of_args = []
    for jobIndex in range(self.total_number_of_jobs):
        self.jobDestination.append([""])
        self.list_of_args.append([str(jobIndex)])
    return
849    
def split(self, jobParams,firstJobID):
    """
    Fill jobParams with the per-job argument lists and store the job
    arguments and destinations through common._db.updateJob_.

    jobParams  -- list that receives one entry per job; it is extended
                  in place with self.total_number_of_jobs entries and the
                  first entries are overwritten with self.list_of_args.
    firstJobID -- offset (convertible with int()) added to the job index
                  to obtain the stored job ID.

    Also sets self.argsList to the number of arguments of a job, counting
    the job number as the first one.
    """
    njobs = self.total_number_of_jobs
    arglist = self.list_of_args
    # create the empty structure
    for i in range(njobs):
        jobParams.append("")

    listID=[]
    listField=[]
    for idx in range(njobs):
        # stored job IDs are 1-based
        jobID = idx + int(firstJobID) + 1
        jobParams[idx] = arglist[idx]
        listID.append(jobID)

        # joining an empty argument list yields '', as before
        argu = ' '.join(jobParams[idx])
        destination = self.jobDestination[idx]
        listField.append({'arguments': str(jobID)+' '+argu,
                          'dlsDestination': destination})

        # Label the job with the same 1-based ID that is saved, so the
        # debug output matches the stored arguments.
        msg = "Job "+str(jobID)+" Arguments: "+str(jobID)+" "+argu+"\n" \
              +" Destination: "+str(destination)
        common.logger.debug(5,msg)
    common._db.updateJob_(listID,listField)

    if jobParams:
        self.argsList = (len(jobParams[0])+1)
    else:
        # no job at all: only the job number would be passed
        self.argsList = 1

    return
879 ewv 1.131
def numberOfJobs(self):
    """
    Return the total number of jobs stored in self.total_number_of_jobs.
    """
    jobCount = self.total_number_of_jobs
    return jobCount
882    
def getTarBall(self, exe):
    """
    Return the path of the tarball with lib and exe, building it first
    when it does not exist yet.

    The path is stored in self.tgzNameWithPath. An already existing
    tarball is returned as is; otherwise buildTar_(exe) is called and the
    stored path is returned with surrounding whitespace stripped.
    """
    tarballPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
    self.tgzNameWithPath = tarballPath

    if not os.path.exists(tarballPath):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    return tarballPath
895    
def buildTar_(self, executable):
    """
    Create the gzipped tarball self.tgzNameWithPath with the user code
    and the helper files.

    Nothing is built when the working area is the release top (or no
    release top is known). Otherwise the tarball receives: the user
    executable when it is not part of the release, the local 'lib' and
    'module' directories, every 'data' directory found below 'src', the
    parameter set (when self.pset is set), the ProdCommon/IMProv pieces,
    the monitoring and utility scripts from $CRABDIR/python and the
    additional input files.

    Raises CrabException when the executable cannot be found, when the
    tarball cannot be written, or when it is larger than
    self.MaxTarBallSize megabytes. When writing fails the partial tarball
    is removed so that it is not mistaken for a valid one later.
    """
    import tarfile

    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
        return

    tar = None
    finished = False
    # Nested try blocks (rather than try/except/finally) keep the code
    # usable on old interpreters.
    try:
        try: # create tar ball
            tar = tarfile.open(self.tgzNameWithPath, "w:gz")
            ## First find the executable
            if (self.executable != ''):
                exeWithPath = self.scram.findFile_(executable)
                if ( not exeWithPath ):
                    raise CrabException('User executable '+executable+' not found')

                ## then check if it's private or not
                if exeWithPath.find(swReleaseTop) == -1:
                    # the exe is private, so we must ship
                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
                    path = swArea+'/'
                    # distinguish case when script is in user project area or given by full path somewhere else
                    if exeWithPath.find(path) >= 0 :
                        exe = exeWithPath.replace(path,'')
                        tar.add(path+exe,exe)
                    else :
                        tar.add(exeWithPath,os.path.basename(executable))
                # else: the exe is from release, we'll find it on WN

            ## Now get the libraries: only those in local working area
            libDir = 'lib'
            lib = swArea+'/' +libDir
            common.logger.debug(5,"lib "+lib+" to be tarred")
            if os.path.exists(lib):
                tar.add(lib,libDir)

            ## Now check if module dir is present
            moduleDir = 'module'
            module = swArea + '/' + moduleDir
            if os.path.isdir(module):
                tar.add(module,moduleDir)

            ## Now check if any data dir(s) is present
            # Walk src/ with an explicit stack of (relative path, basename).
            self.dataExist = False
            todo_list = [(i, i) for i in os.listdir(swArea+"/src")]
            while len(todo_list):
                entry, name = todo_list.pop()
                if name.startswith('crab_0_') or name.startswith('.') or name == 'CVS':
                    continue
                if os.path.isdir(swArea+"/src/"+entry):
                    entryPath = entry + '/'
                    todo_list += [(entryPath + i, i) for i in os.listdir(swArea+"/src/"+entry)]
                    if name == 'data':
                        self.dataExist=True
                        common.logger.debug(5,"data "+entry+" to be tarred")
                        tar.add(swArea+"/src/"+entry,"src/"+entry)

            ### CMSSW ParameterSet
            if not self.pset is None:
                cfg_file = common.work_space.jobDir()+self.configFilename()
                tar.add(cfg_file,self.configFilename())
                common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ## Add ProdCommon dir to tar
            prodcommonDir = './'
            prodcommonPath = os.environ['CRABDIR'] + '/' + 'external/'
            neededStuff = ['ProdCommon/__init__.py','ProdCommon/FwkJobRep', 'ProdCommon/CMSConfigTools','ProdCommon/Core','ProdCommon/MCPayloads', 'IMProv']
            for fname in neededStuff:
                tar.add(prodcommonPath+fname,prodcommonDir+fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### ML stuff
            ML_file_list=['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
            path=os.environ['CRABDIR'] + '/python/'
            for fname in ML_file_list:
                tar.add(path+fname,fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### Utils
            Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'fillCrabFjr.py','cmscp.py']
            for fname in Utils_file_list:
                tar.add(path+fname,fname)
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            ##### AdditionalFiles
            for fname in self.additional_inbox_files:
                tar.add(fname,fname.split('/')[-1])
            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))

            tar.close()
            finished = True
        except IOError:
            raise CrabException('Could not create tar-ball '+self.tgzNameWithPath)
        except tarfile.TarError:
            raise CrabException('Could not create tar-ball '+self.tgzNameWithPath)
    finally:
        if not finished:
            # Something went wrong: release the handle and drop the partial
            # file. Cleanup errors are ignored on purpose so that they do
            # not hide the exception that brought us here.
            if tar is not None:
                try:
                    tar.close()
                except (IOError, OSError, tarfile.TarError):
                    pass
            try:
                if os.path.exists(self.tgzNameWithPath):
                    os.remove(self.tgzNameWithPath)
            except OSError:
                pass

    ## check for tarball size
    tarballinfo = os.stat(self.tgzNameWithPath)
    if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
        msg = 'Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) \
              + 'MB input sandbox limit \n'
        msg += ' and not supported by the direct GRID submission system.\n'
        msg += ' Please use the CRAB server mode by setting server_name=<NAME> in section [CRAB] of your crab.cfg.\n'
        msg += ' For further infos please see https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#CRABSERVER_for_Users'
        raise CrabException(msg)
1010 gutsche 1.72
1011 slacapra 1.61 ## create tar-ball with ML stuff
1012 slacapra 1.97
def wsSetupEnvironment(self, nj=0):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The text sets up the LCG or OSG environment, creates the CMSSW
    project area, checks the number of wrapper arguments, exports the
    dataset related variables and, when a parameter set is used, copies
    it next to the job and computes its hash.
    As a side effect self.primaryDataset is set.
    """
    # python configuration files are used from CMSSW 2_1 on
    pythonPset = (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3)
    if pythonPset:
        psetName = 'pset.py'
    else:
        psetName = 'pset.cfg'

    pieces = []
    emit = pieces.append

    # Prepare JobType-independent part
    emit('\n#Written by cms_cmssw::wsSetupEnvironment\n')
    emit('echo ">>> setup environment"\n')
    emit('if [ $middleware == LCG ]; then \n')
    emit(self.wsSetupCMSLCGEnvironment_())
    emit('elif [ $middleware == OSG ]; then\n')
    emit(' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n')
    emit(' if [ ! $? == 0 ] ;then\n')
    emit(' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n')
    emit(' job_exit_code=10016\n')
    emit(' func_exit\n')
    emit(' fi\n')
    emit(' echo ">>> Created working directory: $WORKING_DIR"\n')
    emit('\n')
    emit(' echo "Change to working directory: $WORKING_DIR"\n')
    emit(' cd $WORKING_DIR\n')
    emit(' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n')
    emit(self.wsSetupCMSOSGEnvironment_())
    emit('fi\n')

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    emit('\n\n')
    emit('echo ">>> specific cmssw setup environment:"\n')
    emit('echo "CMSSW_VERSION = '+self.version+'"\n')
    emit(scram+' project CMSSW '+self.version+'\n')
    emit('status=$?\n')
    emit('if [ $status != 0 ] ; then\n')
    emit(' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n')
    emit(' job_exit_code=10034\n')
    emit(' func_exit\n')
    emit('fi \n')
    emit('cd '+self.version+'\n')
    emit('SOFTWARE_DIR=`pwd`\n')
    emit('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n')
    emit('eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n')
    emit('if [ $? != 0 ] ; then\n')
    emit(' echo "ERROR ==> Problem with the command: "\n')
    emit(' echo "eval \\`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \\` at `hostname`"\n')
    emit(' job_exit_code=10034\n')
    emit(' func_exit\n')
    emit('fi \n')

    # Handle the arguments:
    emit("\n")
    emit("## number of arguments (first argument always jobnumber)\n")
    emit("\n")
    emit("if [ $nargs -lt "+str(self.argsList)+" ]\n")
    emit("then\n")
    emit(" echo 'ERROR ==> Too few arguments' +$nargs+ \n")
    emit(' job_exit_code=50113\n')
    emit(" func_exit\n")
    emit("fi\n")
    emit("\n")

    # Prepare job-specific part
    job = common.job_list[nj]
    if self.datasetPath:
        # the dataset path is split on '/': field 1 is the primary
        # dataset, field 2 the data tier
        pathFields = self.datasetPath.split("/")
        self.primaryDataset = pathFields[1]
        DataTier = pathFields[2]
        emit('\n')
        emit('DatasetPath='+self.datasetPath+'\n')
        emit('PrimaryDataset='+self.primaryDataset +'\n')
        emit('DataTier='+DataTier+'\n')
        emit('ApplicationFamily=cmsRun\n')
    else:
        self.primaryDataset = 'null'
        emit('DatasetPath=MCDataTier\n')
        emit('PrimaryDataset=null\n')
        emit('DataTier=null\n')
        emit('ApplicationFamily=MCDataTier\n')

    if self.pset is not None:
        pset = os.path.basename(job.configFilename())
        emit('\n')
        emit('cp $RUNTIME_AREA/'+pset+' .\n')
        if self.datasetPath: # standard job
            emit('InputFiles=${args[1]}; export InputFiles\n')
            # with parent files the event arguments shift by one position
            if self.useParent:
                emit('ParentFiles=${args[2]}; export ParentFiles\n')
                emit('MaxEvents=${args[3]}; export MaxEvents\n')
                emit('SkipEvents=${args[4]}; export SkipEvents\n')
            else:
                emit('MaxEvents=${args[2]}; export MaxEvents\n')
                emit('SkipEvents=${args[3]}; export SkipEvents\n')
            emit('echo "Inputfiles:<$InputFiles>"\n')
            if self.useParent:
                emit('echo "ParentFiles:<$ParentFiles>"\n')
            emit('echo "MaxEvents:<$MaxEvents>"\n')
            emit('echo "SkipEvents:<$SkipEvents>"\n')
        else: # pythia like job
            emit('PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n')
            emit('IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n')
            emit('echo "PreserveSeeds: <$PreserveSeeds>"\n')
            emit('echo "IncrementSeeds:<$IncrementSeeds>"\n')
            if self.firstRun:
                emit('FirstRun=${args[1]}; export FirstRun\n')
                emit('echo "FirstRun: <$FirstRun>"\n')

        emit('mv -f ' + pset + ' ' + psetName + '\n')

        # FUTURE: Can simply for 2_1_x and higher
        emit('\n')
        if self.debug_wrapper==True:
            emit('echo "***** cat ' + psetName + ' *********"\n')
            emit('cat ' + psetName + '\n')
            emit('echo "****** end ' + psetName + ' ********"\n')
            emit('\n')
        # the python configuration is passed as an argument, the old
        # style one through standard input
        if pythonPset:
            emit('PSETHASH=`edmConfigHash ' + psetName + '` \n')
        else:
            emit('PSETHASH=`edmConfigHash < ' + psetName + '` \n')
        emit('echo "PSETHASH = $PSETHASH" \n')
        emit('\n')

    return ''.join(pieces)
1138 slacapra 1.176
def wsUntarSoftware(self, nj=0):
    """
    Put in the script the commands to untar the input tarball and to
    add $RUNTIME_AREA to PYTHONPATH.

    The commands are written only when the tarball
    (self.tgzNameWithPath) exists at the time the script is generated;
    otherwise only the header comment is returned.
    The argument nj is not used.
    """

    txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'

    if os.path.isfile(self.tgzNameWithPath):
        tarball = os.path.basename(self.tgzNameWithPath)
        txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+tarball+' :" \n'
        txt += 'tar xzvf $RUNTIME_AREA/'+tarball+'\n'
        # Record the exit status right after tar: any command in between
        # (such as the debug listing below) would overwrite $?.
        txt += 'untar_status=$? \n'
        if self.debug_wrapper:
            txt += 'ls -Al \n'
        txt += 'if [ $untar_status -ne 0 ]; then \n'
        txt += ' echo "ERROR ==> Untarring .tgz file failed"\n'
        txt += ' job_exit_code=$untar_status\n'
        txt += ' func_exit\n'
        txt += 'else \n'
        txt += ' echo "Successful untar" \n'
        txt += 'fi \n'
        txt += '\n'
        txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n'
        txt += 'if [ -z "$PYTHONPATH" ]; then\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/\n'
        txt += 'else\n'
        txt += ' export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n'
        txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
        txt += 'fi\n'
        txt += '\n'

    return txt
1173 ewv 1.170
def wsBuildExe(self, nj=0):
    """
    Put in the script the commands that move the untarred software
    directories and additional files from $RUNTIME_AREA into the
    current directory, and that add $RUNTIME_AREA to PYTHONPATH.
    The argument nj is not used.
    """
    lines = [
        '\n#Written by cms_cmssw::wsBuildExe\n',
        'echo ">>> moving CMSSW software directories in `pwd`" \n',
        'rm -r lib/ module/ \n',
        'mv $RUNTIME_AREA/lib/ . \n',
        'mv $RUNTIME_AREA/module/ . \n',
    ]

    # src/ is moved only when a data directory was found for the tarball
    if self.dataExist == True:
        lines.append('rm -r src/ \n')
        lines.append('mv $RUNTIME_AREA/src/ . \n')

    for extraFile in self.additional_inbox_files:
        lines.append('mv $RUNTIME_AREA/'+os.path.basename(extraFile)+' . \n')

    lines.extend([
        'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n',
        'if [ -z "$PYTHONPATH" ]; then\n',
        ' export PYTHONPATH=$RUNTIME_AREA/\n',
        'else\n',
        ' export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n',
        'echo "PYTHONPATH=$PYTHONPATH"\n',
        'fi\n',
        '\n',
    ])

    return ''.join(lines)
1205 slacapra 1.1
1206 ewv 1.131
def executableName(self):
    """
    Return the command used to start the job: "sh " when a user script
    (self.scriptExe) is set, self.executable otherwise.
    """
    if not self.scriptExe:
        return self.executable
    return "sh "
1212 slacapra 1.1
def executableArgs(self):
    """
    Return the arguments for the command given by executableName().

    With a user script the arguments are the script itself followed by
    $NJob. Otherwise they depend on the CMSSW version: the framework job
    report option is added from 1_5 on, and the parameter set is
    pset.py from major version 2 on, pset.cfg before.
    """
    # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
    if self.scriptExe:
        return self.scriptExe + " $NJob"

    options = []
    # Framework job report
    if (self.CMSSW_major >= 1 and self.CMSSW_minor >= 5) or (self.CMSSW_major >= 2):
        options.append(" -j $RUNTIME_AREA/crab_fjr_$NJob.xml")
    # Type of config file
    if self.CMSSW_major >= 2:
        options.append(" -p pset.py")
    else:
        options.append(" -p pset.cfg")
    return "".join(options)
1229 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox:
    the tarball, when it exists on disk, followed by the job wrapper
    script. The argument nj is not used.
    """
    if os.path.isfile(self.tgzNameWithPath):
        sandbox = [self.tgzNameWithPath]
    else:
        sandbox = []

    # the wrapper is looked up by the base name of the task script
    scriptName = str(common._db.queryTask('scriptName'))
    wrapperPath = common.work_space.pathForTgz() +'job/'+ os.path.basename(scriptName)
    sandbox.append(wrapperPath)
    return sandbox
1240    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Every user declared output file (self.output_file followed by
    self.output_file_sandbox) is passed through numberFile together
    with nj+1 as a string.
    """
    ## User Declared output files
    declared = self.output_file+self.output_file_sandbox
    return [numberFile(outName, str(nj + 1)) for outName in declared]
1252    
1253    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every file in self.output_file the script moves it to the name
    given by numberFile(file, '$NJob') and links the original name to
    it; a missing file sets job_exit_code=60302. The numbered names are
    exported in the shell variable file_list and the script finally
    changes to $RUNTIME_AREA. The argument nj is not used.
    """
    out = []
    add = out.append

    add('\n#Written by cms_cmssw::wsRenameOutput\n')
    add('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n')
    add('echo ">>> current directory content:"\n')
    if self.debug_wrapper:
        add('ls -Al\n')
    add('\n')

    for produced in self.output_file:
        numbered = numberFile(produced, '$NJob')
        add('\n')
        add('# check output file\n')
        add('if [ -e ./'+produced+' ] ; then\n')
        if self.copy_data == 1: # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
            add(' mv '+produced+' '+numbered+'\n')
            add(' ln -s `pwd`/'+numbered+' $RUNTIME_AREA/'+produced+'\n')
        else:
            add(' mv '+produced+' $RUNTIME_AREA/'+numbered+'\n')
            add(' ln -s $RUNTIME_AREA/'+numbered+' $RUNTIME_AREA/'+produced+'\n')
        add('else\n')
        add(' job_exit_code=60302\n')
        add(' echo "WARNING: Output file '+produced+' not found"\n')
        if common.scheduler.name().upper() == 'CONDOR_G':
            add(' if [ $middleware == OSG ]; then \n')
            add(' echo "prepare dummy output file"\n')
            add(' echo "Processing of job output failed" > $RUNTIME_AREA/'+numbered+'\n')
            add(' fi \n')
        add('fi\n')

    # space separated list of the numbered output names
    numberedNames = [numberFile(produced, '$NJob') for produced in self.output_file]
    add('file_list="'+' '.join(numberedNames)+'"\n')
    add('\n')
    add('echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n')
    add('echo ">>> current directory content:"\n')
    if self.debug_wrapper:
        add('ls -Al\n')
    add('\n')
    add('cd $RUNTIME_AREA\n')
    add('echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n')
    return ''.join(out)
1300    
def getRequirements(self, nj=[]):
    """
    return job requirements to add to jdl files

    The requirement is the conjunction (' && ') of: the CMSSW version
    tag and the architecture tag, each only when set; outbound network
    connectivity; and, for the glite and glitecoll schedulers, a
    computing element in "Production" status.
    The clauses are joined rather than concatenated so that a missing
    version cannot leave a dangling ' && ' at the start of the expression.
    The argument nj is not used (and its default is never modified).
    """
    runtimeEnv = 'other.GlueHostApplicationSoftwareRunTimeEnvironment'
    clauses = []
    if self.version:
        clauses.append('Member("VO-cms-' + self.version + '", ' + runtimeEnv + ')')
    if self.executable_arch:
        clauses.append('Member("VO-cms-' + self.executable_arch + '", ' + runtimeEnv + ')')

    clauses.append('(other.GlueHostNetworkAdapterOutboundIP)')
    if common.scheduler.name() in ("glitecoll", "glite"):
        # the trailing blank is kept as in the expression written so far
        clauses.append('other.GlueCEStateStatus == "Production" ')

    return ' && '.join(clauses)
1320 gutsche 1.3
def configFilename(self):
    """
    return the config filename: self.name() with the extension '.py'
    from CMSSW 2_1 on and '.cfg' for older versions
    """
    # FUTURE: Can remove cfg mode for CMSSW >= 2_1_x
    pythonConfig = (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3)
    if pythonConfig:
        extension = '.py'
    else:
        extension = '.cfg'
    return self.name()+extension
1328 gutsche 1.3
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns the part of a job script which prepares the CMS software
    environment on OSG: it exports SCRAM_ARCH (self.executable_arch)
    and sources $OSG_APP/cmssoft/cms/cmsset_default.sh for
    self.version, leaving with exit code 10020 when that file is missing.
    """
    arch = self.executable_arch
    lines = (
        '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n',
        ' echo ">>> setup CMS OSG environment:"\n',
        ' echo "set SCRAM ARCH to ' + arch + '"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
        ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n',
        ' else\n',
        ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        ' echo "==> setup cms environment ok"\n',
        ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n',
    )
    return ''.join(lines)
1352 ewv 1.131
def wsSetupCMSLCGEnvironment_(self):
    """
    Returns the part of a job script which prepares the CMS software
    environment on LCG: it exports SCRAM_ARCH and BUILD_ARCH
    (self.executable_arch) and sources $VO_CMS_SW_DIR/cmsset_default.sh,
    leaving with exit code 10031, 10020 or 10032 when the software
    directory is not defined, the file is missing or empty, or sourcing
    it fails.
    """
    arch = self.executable_arch

    intro = (
        '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n',
        ' echo ">>> setup CMS LCG environment:"\n',
    )
    # the only lines depending on the job type
    archSetup = (
        ' echo "set SCRAM ARCH and BUILD_ARCH to ' + arch + ' ###"\n',
        ' export SCRAM_ARCH='+arch+'\n',
        ' export BUILD_ARCH='+arch+'\n',
    )
    sourcing = (
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n',
        ' job_exit_code=10031\n',
        ' func_exit\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' job_exit_code=10020\n',
        ' func_exit\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' job_exit_code=10032\n',
        ' func_exit\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        ' echo "==> setup cms environment ok"\n',
    )
    return ''.join(intro + archSetup + sourcing)
1386 gutsche 1.5
def wsModifyReport(self, nj):
    """
    Build the job-script fragment that rewrites the FrameworkJobReport
    by running ProdCommon/FwkJobRep/ModifyJobReport.py.

    nj is accepted for interface compatibility and is not used here.

    When the 'USER.publish_data' configuration value is not 1 only the
    header comment is returned.  Otherwise the fragment:
      * chooses FOR_LFN from $StageOutExitStatus,
      * sets ProcessedDataset from $procDataset,
      * echoes and then runs ModifyJobReport.py,
      * on failure sets job_exit_code=70500, on success moves
        NewFrameworkJobReport.xml over $RUNTIME_AREA/crab_fjr_$NJob.xml.

    Returns the fragment as a string.
    """
    txt = '\n#Written by cms_cmssw::wsModifyReport\n'
    publish_data = int(self.cfg_params.get('USER.publish_data', 0))
    if publish_data != 1:
        return txt

    modifier = '$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py'
    # The argument string deliberately carries no quote or newline: the
    # echo line and the real command each add their own terminator, so the
    # executed command no longer ends with a dangling double quote.
    # NOTE(review): '$User -$ProcessedDataset-$PSETHASH' contains a blank
    # after $User and therefore expands to two shell words -- confirm
    # against the argument list ModifyJobReport.py expects.
    args = '$RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier ' \
           '$User -$ProcessedDataset-$PSETHASH $ApplicationFamily ' \
           ' $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'

    txt += 'if [ $StageOutExitStatus -eq 0 ]; then\n'
    txt += ' FOR_LFN=$LFNBaseName/${PSETHASH}/\n'
    txt += 'else\n'
    txt += ' FOR_LFN=/copy_problems/ \n'
    txt += ' SE=""\n'
    txt += ' SE_PATH=""\n'
    txt += 'fi\n'

    txt += 'echo ">>> Modify Job Report:" \n'
    txt += 'chmod a+x ' + modifier + '\n'
    # No blank after '=': with one, the shell assigns an empty value and
    # tries to execute $procDataset as a command.
    txt += 'ProcessedDataset=$procDataset\n'
    txt += 'echo "ProcessedDataset = $ProcessedDataset"\n'
    txt += 'echo "SE = $SE"\n'
    txt += 'echo "SE_PATH = $SE_PATH"\n'
    txt += 'echo "FOR_LFN = $FOR_LFN" \n'
    txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n'
    txt += 'echo "' + modifier + ' ' + args + '"\n'
    txt += modifier + ' ' + args + '\n'
    txt += 'modifyReport_result=$?\n'
    txt += 'if [ $modifyReport_result -ne 0 ]; then\n'
    txt += ' modifyReport_result=70500\n'
    txt += ' job_exit_code=$modifyReport_result\n'
    txt += ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' echo "WARNING: Problem with ModifyJobReport"\n'
    txt += 'else\n'
    txt += ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
    txt += 'fi\n'
    return txt
1426 fanzago 1.99
def wsParseFJR(self):
    """
    Build the job-script fragment that parses the FrameworkJobReport
    ($RUNTIME_AREA/crab_fjr_$NJob.xml) with parseCrabFjr.py.

    The fragment:
      * reports to the dashboard and extracts executable_exit_status,
      * when self.debug_wrapper is true, echoes the parser output,
      * when self.datasetPath is set and neither self.dataset_pu nor
        self.useParent is, compares the list of input files with the
        list of processed files and sets exit status 30001 on mismatch,
      * invokes func_exit when the executable failed,
      * finally records the exit status in $RUNTIME_AREA/$repo and in
        job_exit_code.

    Returns the fragment as a string.
    """
    parts = [
        '\n#Written by cms_cmssw::wsParseFJR\n',
        'echo ">>> Parse FrameworkJobReport crab_fjr.xml"\n',
        'if [ -s $RUNTIME_AREA/crab_fjr_$NJob.xml ]; then\n',
        ' if [ -s $RUNTIME_AREA/parseCrabFjr.py ]; then\n',
        ' cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --dashboard $MonitorID,$MonitorJobID ' + self.debugWrap + '`\n',
    ]
    if self.debug_wrapper:
        parts.append(' echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n')
    parts.extend([
        ' executable_exit_status=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --exitcode`\n',
        ' if [ $executable_exit_status -eq 50115 ];then\n',
        ' echo ">>> crab_fjr.xml contents: "\n',
        ' cat $RUNTIME_AREA/crab_fjr_$NJob.xml\n',
        ' echo "Wrong FrameworkJobReport --> does not contain useful info. ExitStatus: $executable_exit_status"\n',
        ' elif [ $executable_exit_status -eq -999 ];then\n',
        # The message used to repeat "not available." twice.
        ' echo "ExitStatus from FrameworkJobReport not available. Using exit code of executable from command line."\n',
        ' else\n',
        ' echo "Extracted ExitStatus from FrameworkJobReport parsing output: $executable_exit_status"\n',
        ' fi\n',
        ' else\n',
        ' echo "CRAB python script to parse CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n',
        ' fi\n',
        #### Patch to check input data reading for CMSSW16x Hopefully we-ll remove it asap
        ' if [ $executable_exit_status -eq 0 ];then\n',
        ' echo ">>> Executable succeded $executable_exit_status"\n',
    ])
    if self.datasetPath and not (self.dataset_pu or self.useParent):
        # VERIFY PROCESSED DATA
        parts.extend([
            ' echo ">>> Verify list of processed files:"\n',
            ' echo $InputFiles |tr -d \'\\\\\' |tr \',\' \'\\n\'|tr -d \'"\' > input-files.txt\n',
            ' python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --lfn > processed-files.txt\n',
            ' cat input-files.txt | sort | uniq > tmp.txt\n',
            ' mv tmp.txt input-files.txt\n',
            ' echo "cat input-files.txt"\n',
            ' echo "----------------------"\n',
            ' cat input-files.txt\n',
            ' cat processed-files.txt | sort | uniq > tmp.txt\n',
            ' mv tmp.txt processed-files.txt\n',
            ' echo "----------------------"\n',
            ' echo "cat processed-files.txt"\n',
            ' echo "----------------------"\n',
            ' cat processed-files.txt\n',
            ' echo "----------------------"\n',
            ' diff -q input-files.txt processed-files.txt\n',
            ' fileverify_status=$?\n',
            ' if [ $fileverify_status -ne 0 ]; then\n',
            ' executable_exit_status=30001\n',
            ' echo "ERROR ==> not all input files processed"\n',
            ' echo " ==> list of processed files from crab_fjr.xml differs from list in pset.cfg"\n',
            ' echo " ==> diff input-files.txt processed-files.txt"\n',
            ' fi\n',
        ])
    parts.extend([
        # The three tests must be AND-ed: chained with '||' the condition
        # is always true and the two excluded codes never take effect.
        # NOTE(review): 50015/50017 are kept as written; the branch above
        # tests 50115, so these may be typos for 50115/50117 -- confirm.
        ' elif [ $executable_exit_status -ne 0 ] && [ $executable_exit_status -ne 50015 ] && [ $executable_exit_status -ne 50017 ];then\n',
        ' echo ">>> Executable failed $executable_exit_status"\n',
        ' func_exit\n',
        ' fi\n',
        '\n',
        'else\n',
        ' echo "CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n',
        'fi\n',
        '\n',
        'echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n',
        'echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n',
        'job_exit_code=$executable_exit_status\n',
    ])
    return ''.join(parts)
1493    
def setParam_(self, param, value):
    """
    Store value under the key param in the self._params dictionary,
    replacing any value previously stored under that key.
    """
    self._params[param] = value
1496    
def getParams(self):
    """
    Return the self._params dictionary itself (not a copy).
    """
    return self._params
1499 gutsche 1.8
def uniquelist(self, old):
    """
    Return the distinct elements of old.

    The elements are used as dictionary keys, so they must be hashable;
    the result is whatever the dictionary's keys() yields.
    """
    return dict.fromkeys(old, 0).keys()
1508 mcinquil 1.121
def outList(self):
    """
    Build the job-script fragment that declares which files are expected
    in the output sandbox.

    The list holds the job-numbered names (via numberFile) of
    self.output_file_sandbox -- preceded by those of self.output_file when
    self.return_data is 1 -- followed by the CMSSW stdout and stderr
    files.  The fragment echoes the list and exports it as filesToCheck.

    Returns the fragment as a string.
    """
    if self.return_data == 1:
        wanted = self.output_file + self.output_file_sandbox
    else:
        wanted = self.output_file_sandbox

    expected = [numberFile(name, '$NJob') for name in wanted]
    expected += ['CMSSW_$NJob.stdout', 'CMSSW_$NJob.stderr']
    joined = ' '.join(expected)

    return ''.join([
        'echo ">>> list of expected files on output sandbox"\n',
        'echo "output files: ' + joined + '"\n',
        'filesToCheck="' + joined + '"\n',
        'export filesToCheck\n',
    ])