ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.268
Committed: Tue Jan 20 17:06:45 2009 UTC (16 years, 3 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_4, CRAB_2_4_4_pre6, CRAB_2_4_4_pre5, CRAB_2_4_4_pre4, CRAB_2_4_4_pre3
Changes since 1.267: +5 -0 lines
Log Message:
added warning if no output files are defined nor in the crab.cfg neither in the cmssw configuration

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 ewv 1.250 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
6 slacapra 1.1 import common
7     import Scram
8    
9 slacapra 1.105 import os, string, glob
10 slacapra 1.1
11     class Cmssw(JobType):
12 spiga 1.208 def __init__(self, cfg_params, ncjobs,skip_blocks, isNew):
13 slacapra 1.1 JobType.__init__(self, 'CMSSW')
14     common.logger.debug(3,'CMSSW::__init__')
15 spiga 1.208 self.skip_blocks = skip_blocks
16 mcinquil 1.140 self.argsList = []
17 mcinquil 1.144
18 gutsche 1.3 self._params = {}
19     self.cfg_params = cfg_params
20 ewv 1.254
21 fanzago 1.115 # init BlackWhiteListParser
22 ewv 1.254 seWhiteList = cfg_params.get('EDG.se_white_list',[])
23     seBlackList = cfg_params.get('EDG.se_black_list',[])
24     self.blackWhiteListParser = SEBlackWhiteListParser(seWhiteList, seBlackList, common.logger)
25 fanzago 1.115
26 spiga 1.234 ### Temporary patch to automatically skip the ISB size check:
27     server=self.cfg_params.get('CRAB.server_name',None)
28 ewv 1.250 size = 9.5
29 spiga 1.249 if server or common.scheduler.name().upper() in ['LSF','CAF']: size = 99999
30 spiga 1.234 ### D.S.
31     self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',size))
32 gutsche 1.72
33 gutsche 1.44 # number of jobs requested to be created, limit obj splitting
34 gutsche 1.38 self.ncjobs = ncjobs
35    
36 slacapra 1.1 log = common.logger
37 ewv 1.131
38 slacapra 1.1 self.scram = Scram.Scram(cfg_params)
39     self.additional_inbox_files = []
40     self.scriptExe = ''
41     self.executable = ''
42 slacapra 1.71 self.executable_arch = self.scram.getArch()
43 slacapra 1.1 self.tgz_name = 'default.tgz'
44 corvo 1.56 self.scriptName = 'CMSSW.sh'
45 ewv 1.192 self.pset = ''
46 spiga 1.187 self.datasetPath = ''
47 gutsche 1.3
48 gutsche 1.50 # set FJR file name
49     self.fjrFileName = 'crab_fjr.xml'
50    
51 slacapra 1.1 self.version = self.scram.getSWVersion()
52 ewv 1.182 version_array = self.version.split('_')
53 ewv 1.184 self.CMSSW_major = 0
54     self.CMSSW_minor = 0
55     self.CMSSW_patch = 0
56 ewv 1.182 try:
57 ewv 1.184 self.CMSSW_major = int(version_array[1])
58     self.CMSSW_minor = int(version_array[2])
59     self.CMSSW_patch = int(version_array[3])
60 ewv 1.182 except:
61 ewv 1.184 msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!"
62 ewv 1.182 raise CrabException(msg)
63    
64 slacapra 1.1 ### collect Data cards
65 gutsche 1.66
66 slacapra 1.153 if not cfg_params.has_key('CMSSW.datasetpath'):
67 ewv 1.131 msg = "Error: datasetpath not defined "
68 slacapra 1.1 raise CrabException(msg)
69 ewv 1.226
70 fanzago 1.221 ### Temporary: added to remove input file control in the case of PU
71 farinafa 1.224 self.dataset_pu = cfg_params.get('CMSSW.dataset_pu', None)
72 ewv 1.226
73 slacapra 1.153 tmp = cfg_params['CMSSW.datasetpath']
74     log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
75 spiga 1.236
76     if tmp =='':
77     msg = "Error: datasetpath not defined "
78     raise CrabException(msg)
79     elif string.lower(tmp)=='none':
80 slacapra 1.153 self.datasetPath = None
81     self.selectNoInput = 1
82     else:
83     self.datasetPath = tmp
84     self.selectNoInput = 0
85 gutsche 1.5
86 slacapra 1.1 self.dataTiers = []
87 spiga 1.197 self.debugWrap = ''
88     self.debug_wrapper = cfg_params.get('USER.debug_wrapper',False)
89     if self.debug_wrapper: self.debugWrap='--debug'
90 slacapra 1.1 ## now the application
91 ewv 1.262 self.managedGenerators = ['madgraph','comphep']
92 ewv 1.258 self.generator = cfg_params.get('CMSSW.generator','pythia').lower()
93 slacapra 1.153 self.executable = cfg_params.get('CMSSW.executable','cmsRun')
94     log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
95 slacapra 1.1
96 slacapra 1.153 if not cfg_params.has_key('CMSSW.pset'):
97 slacapra 1.1 raise CrabException("PSet file missing. Cannot run cmsRun ")
98 slacapra 1.153 self.pset = cfg_params['CMSSW.pset']
99     log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
100     if self.pset.lower() != 'none' :
101     if (not os.path.exists(self.pset)):
102     raise CrabException("User defined PSet file "+self.pset+" does not exist")
103     else:
104     self.pset = None
105 slacapra 1.1
106     # output files
107 slacapra 1.53 ## stuff which must be returned always via sandbox
108     self.output_file_sandbox = []
109    
110     # add fjr report by default via sandbox
111     self.output_file_sandbox.append(self.fjrFileName)
112    
113     # other output files to be returned via sandbox or copied to SE
114 mcinquil 1.216 outfileflag = False
115 slacapra 1.153 self.output_file = []
116     tmp = cfg_params.get('CMSSW.output_file',None)
117     if tmp :
118 slacapra 1.207 self.output_file = [x.strip() for x in tmp.split(',')]
119 mcinquil 1.216 outfileflag = True #output found
120     #else:
121     # log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")
122 slacapra 1.1
123     # script_exe file as additional file in inputSandbox
124 slacapra 1.153 self.scriptExe = cfg_params.get('USER.script_exe',None)
125     if self.scriptExe :
126 slacapra 1.176 if not os.path.isfile(self.scriptExe):
127     msg ="ERROR. file "+self.scriptExe+" not found"
128     raise CrabException(msg)
129     self.additional_inbox_files.append(string.strip(self.scriptExe))
130 slacapra 1.70
131 spiga 1.42 if self.datasetPath == None and self.pset == None and self.scriptExe == '' :
132 slacapra 1.176 msg ="Error. script_exe not defined"
133     raise CrabException(msg)
134 spiga 1.42
135 ewv 1.226 # use parent files...
136 spiga 1.204 self.useParent = self.cfg_params.get('CMSSW.use_parent',False)
137    
138 slacapra 1.1 ## additional input files
139 slacapra 1.153 if cfg_params.has_key('USER.additional_input_files'):
140 slacapra 1.29 tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
141 slacapra 1.70 for tmp in tmpAddFiles:
142     tmp = string.strip(tmp)
143     dirname = ''
144     if not tmp[0]=="/": dirname = "."
145 corvo 1.85 files = []
146     if string.find(tmp,"*")>-1:
147     files = glob.glob(os.path.join(dirname, tmp))
148     if len(files)==0:
149     raise CrabException("No additional input file found with this pattern: "+tmp)
150     else:
151     files.append(tmp)
152 slacapra 1.70 for file in files:
153     if not os.path.exists(file):
154     raise CrabException("Additional input file not found: "+file)
155 slacapra 1.45 pass
156 slacapra 1.105 self.additional_inbox_files.append(string.strip(file))
157 slacapra 1.1 pass
158     pass
159 slacapra 1.70 common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
160 slacapra 1.153 pass
161 gutsche 1.3
162 slacapra 1.9 ## Events per job
163 slacapra 1.153 if cfg_params.has_key('CMSSW.events_per_job'):
164 slacapra 1.10 self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
165 slacapra 1.9 self.selectEventsPerJob = 1
166 slacapra 1.153 else:
167 slacapra 1.9 self.eventsPerJob = -1
168     self.selectEventsPerJob = 0
169 ewv 1.131
170 slacapra 1.22 ## number of jobs
171 slacapra 1.153 if cfg_params.has_key('CMSSW.number_of_jobs'):
172 slacapra 1.22 self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
173     self.selectNumberOfJobs = 1
174 slacapra 1.153 else:
175 slacapra 1.22 self.theNumberOfJobs = 0
176     self.selectNumberOfJobs = 0
177 slacapra 1.10
178 slacapra 1.153 if cfg_params.has_key('CMSSW.total_number_of_events'):
179 gutsche 1.35 self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
180     self.selectTotalNumberEvents = 1
181 spiga 1.193 if self.selectNumberOfJobs == 1:
182 spiga 1.202 if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs):
183 spiga 1.193 msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
184     raise CrabException(msg)
185 slacapra 1.153 else:
186 gutsche 1.35 self.total_number_of_events = 0
187     self.selectTotalNumberEvents = 0
188    
189 spiga 1.187 if self.pset != None:
190 spiga 1.42 if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
191     msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
192     raise CrabException(msg)
193     else:
194     if (self.selectNumberOfJobs == 0):
195     msg = 'Must specify number_of_jobs.'
196     raise CrabException(msg)
197 gutsche 1.35
198 ewv 1.160 ## New method of dealing with seeds
199     self.incrementSeeds = []
200     self.preserveSeeds = []
201     if cfg_params.has_key('CMSSW.preserve_seeds'):
202     tmpList = cfg_params['CMSSW.preserve_seeds'].split(',')
203     for tmp in tmpList:
204     tmp.strip()
205     self.preserveSeeds.append(tmp)
206     if cfg_params.has_key('CMSSW.increment_seeds'):
207     tmpList = cfg_params['CMSSW.increment_seeds'].split(',')
208     for tmp in tmpList:
209     tmp.strip()
210     self.incrementSeeds.append(tmp)
211    
212 ewv 1.227 ## FUTURE: Can remove in CRAB 2.4.0
213     self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
214 slacapra 1.153 self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
215 ewv 1.227 self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
216 slacapra 1.153 self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
217 ewv 1.227 if self.sourceSeed or self.sourceSeedVtx or self.sourceSeedG4 or self.sourceSeedMix:
218     msg = 'pythia_seed, vtx_seed, g4_seed, and mix_seed are no longer valid settings. You must use increment_seeds or preserve_seeds'
219     raise CrabException(msg)
220 slacapra 1.90
221 slacapra 1.153 self.firstRun = cfg_params.get('CMSSW.first_run',None)
222 slacapra 1.90
223 ewv 1.147 # Copy/return
224 slacapra 1.153 self.copy_data = int(cfg_params.get('USER.copy_data',0))
225     self.return_data = int(cfg_params.get('USER.return_data',0))
226 ewv 1.147
227 slacapra 1.1 #DBSDLS-start
228 ewv 1.131 ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
229 slacapra 1.1 self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
230     self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
231 gutsche 1.35 self.jobDestination=[] # Site destination(s) for each job (list of lists)
232 slacapra 1.1 ## Perform the data location and discovery (based on DBS/DLS)
233 slacapra 1.9 ## SL: Don't if NONE is specified as input (pythia use case)
234 gutsche 1.35 blockSites = {}
235 slacapra 1.9 if self.datasetPath:
236 gutsche 1.35 blockSites = self.DataDiscoveryAndLocation(cfg_params)
237 ewv 1.131 #DBSDLS-end
238 spiga 1.266
239     noBboundary = int(cfg_params.get('CMSSW.no_block_boundary',0))
240 slacapra 1.9 ## Select Splitting
241 ewv 1.131 if self.selectNoInput:
242 spiga 1.187 if self.pset == None:
243 spiga 1.42 self.jobSplittingForScript()
244     else:
245     self.jobSplittingNoInput()
246 spiga 1.266 elif noBboundary == 1:
247 afanfani 1.237 self.jobSplittingNoBlockBoundary(blockSites)
248 gutsche 1.92 else:
249 corvo 1.56 self.jobSplittingByBlocks(blockSites)
250 gutsche 1.5
251 spiga 1.208 # modify Pset only the first time
252     if isNew:
253     if self.pset != None:
254     import PsetManipulator as pp
255     PsetEdit = pp.PsetManipulator(self.pset)
256     try:
257     # Add FrameworkJobReport to parameter-set, set max events.
258     # Reset later for data jobs by writeCFG which does all modifications
259     PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5
260     PsetEdit.maxEvent(self.eventsPerJob)
261 ewv 1.265 PsetEdit.skipEvent(0)
262 spiga 1.208 PsetEdit.psetWriter(self.configFilename())
263 slacapra 1.215 ## If present, add TFileService to output files
264     if not int(cfg_params.get('CMSSW.skip_TFileService_output',0)):
265     tfsOutput = PsetEdit.getTFileService()
266 ewv 1.226 if tfsOutput:
267 slacapra 1.215 if tfsOutput in self.output_file:
268     common.logger.debug(5,"Output from TFileService "+tfsOutput+" already in output files")
269     else:
270 mcinquil 1.216 outfileflag = True #output found
271 slacapra 1.215 self.output_file.append(tfsOutput)
272     common.logger.message("Adding "+tfsOutput+" to output files (from TFileService)")
273 slacapra 1.218 pass
274     pass
275     ## If present and requested, add PoolOutputModule to output files
276 slacapra 1.219 if int(cfg_params.get('CMSSW.get_edm_output',0)):
277 slacapra 1.218 edmOutput = PsetEdit.getPoolOutputModule()
278 ewv 1.226 if edmOutput:
279 slacapra 1.218 if edmOutput in self.output_file:
280     common.logger.debug(5,"Output from PoolOutputModule "+edmOutput+" already in output files")
281     else:
282     self.output_file.append(edmOutput)
283     common.logger.message("Adding "+edmOutput+" to output files (from PoolOutputModule)")
284     pass
285     pass
286 slacapra 1.215 except CrabException:
287 spiga 1.208 msg='Error while manipulating ParameterSet: exiting...'
288     raise CrabException(msg)
289 ewv 1.226 ## Prepare inputSandbox TarBall (only the first time)
290 spiga 1.208 self.tgzNameWithPath = self.getTarBall(self.executable)
291 gutsche 1.3
292 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
293    
294 slacapra 1.86 import DataDiscovery
295     import DataLocation
296 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
297    
298     datasetPath=self.datasetPath
299    
300 slacapra 1.1 ## Contact the DBS
301 gutsche 1.92 common.logger.message("Contacting Data Discovery Services ...")
302 slacapra 1.1 try:
303 spiga 1.208 self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params,self.skip_blocks)
304 slacapra 1.1 self.pubdata.fetchDBSInfo()
305    
306 slacapra 1.41 except DataDiscovery.NotExistingDatasetError, ex :
307 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
308     raise CrabException(msg)
309 slacapra 1.41 except DataDiscovery.NoDataTierinProvenanceError, ex :
310 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
311     raise CrabException(msg)
312 slacapra 1.41 except DataDiscovery.DataDiscoveryError, ex:
313 gutsche 1.66 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
314 slacapra 1.1 raise CrabException(msg)
315    
316 gutsche 1.35 self.filesbyblock=self.pubdata.getFiles()
317 mkirn 1.37 self.eventsbyblock=self.pubdata.getEventsPerBlock()
318     self.eventsbyfile=self.pubdata.getEventsPerFile()
319 spiga 1.204 self.parentFiles=self.pubdata.getParent()
320 gutsche 1.3
321 slacapra 1.1 ## get max number of events
322 ewv 1.192 self.maxEvents=self.pubdata.getMaxEvents()
323 slacapra 1.1
324     ## Contact the DLS and build a list of sites hosting the fileblocks
325     try:
326 slacapra 1.41 dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
327 gutsche 1.6 dataloc.fetchDLSInfo()
328 slacapra 1.263
329 slacapra 1.41 except DataLocation.DataLocationError , ex:
330 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
331     raise CrabException(msg)
332 ewv 1.131
333 slacapra 1.1
334 gutsche 1.35 sites = dataloc.getSites()
335 slacapra 1.264 if len(sites)==0:
336 spiga 1.267 msg = 'ERROR ***: no location for any of the blocks of this dataset: \n\t %s \n'%datasetPath
337     msg += "\tMaybe the dataset is located only at T1's (or at T0), where analysis jobs are not allowed\n"
338     msg += "\tPlease check DataDiscovery page https://cmsweb.cern.ch/dbs_discovery/\n"
339 slacapra 1.264 raise CrabException(msg)
340    
341 gutsche 1.35 allSites = []
342     listSites = sites.values()
343 slacapra 1.63 for listSite in listSites:
344     for oneSite in listSite:
345 gutsche 1.35 allSites.append(oneSite)
346     allSites = self.uniquelist(allSites)
347 gutsche 1.3
348 gutsche 1.92 # screen output
349     common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
350    
351 gutsche 1.35 return sites
352 ewv 1.131
353 gutsche 1.35 def jobSplittingByBlocks(self, blockSites):
354 slacapra 1.9 """
355 gutsche 1.35 Perform job splitting. Jobs run over an integer number of files
356     and no more than one block.
357     ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
358     REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
359     self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
360     self.maxEvents, self.filesbyblock
361     SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
362     self.total_number_of_jobs - Total # of jobs
363     self.list_of_args - File(s) job will run on (a list of lists)
364     """
365    
366     # ---- Handle the possible job splitting configurations ---- #
367     if (self.selectTotalNumberEvents):
368     totalEventsRequested = self.total_number_of_events
369     if (self.selectEventsPerJob):
370     eventsPerJobRequested = self.eventsPerJob
371     if (self.selectNumberOfJobs):
372     totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
373    
374     # If user requested all the events in the dataset
375     if (totalEventsRequested == -1):
376     eventsRemaining=self.maxEvents
377     # If user requested more events than are in the dataset
378     elif (totalEventsRequested > self.maxEvents):
379     eventsRemaining = self.maxEvents
380     common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
381     # If user requested less events than are in the dataset
382     else:
383     eventsRemaining = totalEventsRequested
384 slacapra 1.22
385 slacapra 1.41 # If user requested more events per job than are in the dataset
386     if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
387     eventsPerJobRequested = self.maxEvents
388    
389 gutsche 1.35 # For user info at end
390     totalEventCount = 0
391 gutsche 1.3
392 gutsche 1.35 if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
393     eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
394 slacapra 1.22
395 gutsche 1.35 if (self.selectNumberOfJobs):
396     common.logger.message("May not create the exact number_of_jobs requested.")
397 slacapra 1.23
398 gutsche 1.38 if ( self.ncjobs == 'all' ) :
399     totalNumberOfJobs = 999999999
400     else :
401     totalNumberOfJobs = self.ncjobs
402 ewv 1.131
403 gutsche 1.35 blocks = blockSites.keys()
404     blockCount = 0
405     # Backup variable in case self.maxEvents counted events in a non-included block
406     numBlocksInDataset = len(blocks)
407 gutsche 1.3
408 gutsche 1.35 jobCount = 0
409     list_of_lists = []
410 gutsche 1.3
411 gutsche 1.92 # list tracking which jobs are in which jobs belong to which block
412     jobsOfBlock = {}
413    
414 gutsche 1.35 # ---- Iterate over the blocks in the dataset until ---- #
415     # ---- we've met the requested total # of events ---- #
416 gutsche 1.38 while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
417 gutsche 1.35 block = blocks[blockCount]
418 gutsche 1.44 blockCount += 1
419 gutsche 1.104 if block not in jobsOfBlock.keys() :
420     jobsOfBlock[block] = []
421 ewv 1.131
422 gutsche 1.68 if self.eventsbyblock.has_key(block) :
423     numEventsInBlock = self.eventsbyblock[block]
424     common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
425 ewv 1.131
426 gutsche 1.68 files = self.filesbyblock[block]
427     numFilesInBlock = len(files)
428     if (numFilesInBlock <= 0):
429     continue
430     fileCount = 0
431    
432     # ---- New block => New job ---- #
433 ewv 1.131 parString = ""
434 gutsche 1.68 # counter for number of events in files currently worked on
435     filesEventCount = 0
436     # flag if next while loop should touch new file
437     newFile = 1
438     # job event counter
439     jobSkipEventCount = 0
440 ewv 1.131
441 gutsche 1.68 # ---- Iterate over the files in the block until we've met the requested ---- #
442     # ---- total # of events or we've gone over all the files in this block ---- #
443 spiga 1.204 pString=''
444 gutsche 1.68 while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
445     file = files[fileCount]
446 spiga 1.204 if self.useParent:
447     parent = self.parentFiles[file]
448     for f in parent :
449     pString += '\\\"' + f + '\\\"\,'
450     common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
451     common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
452 gutsche 1.68 if newFile :
453     try:
454     numEventsInFile = self.eventsbyfile[file]
455     common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
456     # increase filesEventCount
457     filesEventCount += numEventsInFile
458     # Add file to current job
459     parString += '\\\"' + file + '\\\"\,'
460     newFile = 0
461     except KeyError:
462     common.logger.message("File "+str(file)+" has unknown number of events: skipping")
463 ewv 1.131
464 slacapra 1.177 eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
465 gutsche 1.68 # if less events in file remain than eventsPerJobRequested
466 slacapra 1.177 if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
467 gutsche 1.68 # if last file in block
468     if ( fileCount == numFilesInBlock-1 ) :
469     # end job using last file, use remaining events in block
470     # close job and touch new file
471     fullString = parString[:-2]
472 spiga 1.204 if self.useParent:
473     fullParentString = pString[:-2]
474     list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
475     else:
476     list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
477 gutsche 1.68 common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
478     self.jobDestination.append(blockSites[block])
479     common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
480 gutsche 1.92 # fill jobs of block dictionary
481 gutsche 1.104 jobsOfBlock[block].append(jobCount+1)
482 gutsche 1.68 # reset counter
483     jobCount = jobCount + 1
484     totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
485     eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
486     jobSkipEventCount = 0
487     # reset file
488 spiga 1.204 pString = ""
489 ewv 1.131 parString = ""
490 gutsche 1.68 filesEventCount = 0
491     newFile = 1
492     fileCount += 1
493     else :
494     # go to next file
495     newFile = 1
496     fileCount += 1
497     # if events in file equal to eventsPerJobRequested
498     elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
499 gutsche 1.38 # close job and touch new file
500     fullString = parString[:-2]
501 spiga 1.204 if self.useParent:
502     fullParentString = pString[:-2]
503     list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
504 ewv 1.226 else:
505 spiga 1.204 list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
506 gutsche 1.68 common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
507 gutsche 1.38 self.jobDestination.append(blockSites[block])
508     common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
509 gutsche 1.104 jobsOfBlock[block].append(jobCount+1)
510 gutsche 1.38 # reset counter
511     jobCount = jobCount + 1
512 gutsche 1.68 totalEventCount = totalEventCount + eventsPerJobRequested
513     eventsRemaining = eventsRemaining - eventsPerJobRequested
514 gutsche 1.38 jobSkipEventCount = 0
515     # reset file
516 spiga 1.204 pString = ""
517 ewv 1.131 parString = ""
518 gutsche 1.38 filesEventCount = 0
519     newFile = 1
520     fileCount += 1
521 ewv 1.131
522 gutsche 1.68 # if more events in file remain than eventsPerJobRequested
523 gutsche 1.38 else :
524 gutsche 1.68 # close job but don't touch new file
525     fullString = parString[:-2]
526 spiga 1.204 if self.useParent:
527     fullParentString = pString[:-2]
528     list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
529     else:
530     list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
531 gutsche 1.68 common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
532     self.jobDestination.append(blockSites[block])
533     common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
534 gutsche 1.104 jobsOfBlock[block].append(jobCount+1)
535 gutsche 1.68 # increase counter
536     jobCount = jobCount + 1
537     totalEventCount = totalEventCount + eventsPerJobRequested
538     eventsRemaining = eventsRemaining - eventsPerJobRequested
539     # calculate skip events for last file
540     # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
541     jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
542     # remove all but the last file
543     filesEventCount = self.eventsbyfile[file]
544 spiga 1.204 if self.useParent:
545     for f in parent : pString += '\\\"' + f + '\\\"\,'
546 ewv 1.160 parString = '\\\"' + file + '\\\"\,'
547 gutsche 1.68 pass # END if
548     pass # END while (iterate over files in the block)
549 gutsche 1.35 pass # END while (iterate over blocks in the dataset)
550 slacapra 1.41 self.ncjobs = self.total_number_of_jobs = jobCount
551 gutsche 1.38 if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
552 gutsche 1.35 common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
553 gutsche 1.92 common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
554 ewv 1.131
555 gutsche 1.92 # screen output
556     screenOutput = "List of jobs and available destination sites:\n\n"
557    
558 mcinquil 1.124 # keep trace of block with no sites to print a warning at the end
559     noSiteBlock = []
560     bloskNoSite = []
561    
562 gutsche 1.92 blockCounter = 0
563 gutsche 1.104 for block in blocks:
564     if block in jobsOfBlock.keys() :
565     blockCounter += 1
566 slacapra 1.176 screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
567     ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)))
568 mcinquil 1.124 if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)) == 0:
569 ewv 1.131 noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
570 mcinquil 1.124 bloskNoSite.append( blockCounter )
571 ewv 1.131
572 mcinquil 1.124 common.logger.message(screenOutput)
573 fanzago 1.127 if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
574 mcinquil 1.126 msg = 'WARNING: No sites are hosting any part of data for block:\n '
575     virgola = ""
576     if len(bloskNoSite) > 1:
577     virgola = ","
578     for block in bloskNoSite:
579     msg += ' ' + str(block) + virgola
580     msg += '\n Related jobs:\n '
581     virgola = ""
582     if len(noSiteBlock) > 1:
583     virgola = ","
584     for range_jobs in noSiteBlock:
585     msg += str(range_jobs) + virgola
586     msg += '\n will not be submitted and this block of data can not be analyzed!\n'
587 slacapra 1.155 if self.cfg_params.has_key('EDG.se_white_list'):
588     msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
589     msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
590     msg += 'Please check if the dataset is available at this site!)\n'
591     if self.cfg_params.has_key('EDG.ce_white_list'):
592     msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
593     msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
594     msg += 'Please check if the dataset is available at this site!)\n'
595    
596 mcinquil 1.126 common.logger.message(msg)
597 gutsche 1.92
598 slacapra 1.9 self.list_of_args = list_of_lists
599     return
600    
601 afanfani 1.237 def jobSplittingNoBlockBoundary(self,blockSites):
602     """
603     """
604     # ---- Handle the possible job splitting configurations ---- #
605     if (self.selectTotalNumberEvents):
606     totalEventsRequested = self.total_number_of_events
607     if (self.selectEventsPerJob):
608     eventsPerJobRequested = self.eventsPerJob
609     if (self.selectNumberOfJobs):
610     totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
611 ewv 1.250
612 afanfani 1.237 # If user requested all the events in the dataset
613     if (totalEventsRequested == -1):
614     eventsRemaining=self.maxEvents
615     # If user requested more events than are in the dataset
616     elif (totalEventsRequested > self.maxEvents):
617     eventsRemaining = self.maxEvents
618     common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
619     # If user requested less events than are in the dataset
620     else:
621     eventsRemaining = totalEventsRequested
622 ewv 1.250
623 afanfani 1.237 # If user requested more events per job than are in the dataset
624     if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
625     eventsPerJobRequested = self.maxEvents
626 ewv 1.250
627 afanfani 1.237 # For user info at end
628     totalEventCount = 0
629    
630     if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
631     eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
632 ewv 1.250
633 afanfani 1.237 if (self.selectNumberOfJobs):
634     common.logger.message("May not create the exact number_of_jobs requested.")
635 ewv 1.250
636 afanfani 1.237 if ( self.ncjobs == 'all' ) :
637     totalNumberOfJobs = 999999999
638     else :
639     totalNumberOfJobs = self.ncjobs
640 ewv 1.250
641 afanfani 1.237 blocks = blockSites.keys()
642     blockCount = 0
643     # Backup variable in case self.maxEvents counted events in a non-included block
644     numBlocksInDataset = len(blocks)
645 ewv 1.250
646 afanfani 1.237 jobCount = 0
647     list_of_lists = []
648    
649     #AF
650     #AF do not reset input files and event count on block boundary
651     #AF
652     parString=""
653     filesEventCount = 0
654     #AF
655    
656     # list tracking which jobs are in which jobs belong to which block
657     jobsOfBlock = {}
658     while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
659     block = blocks[blockCount]
660     blockCount += 1
661     if block not in jobsOfBlock.keys() :
662     jobsOfBlock[block] = []
663    
664     if self.eventsbyblock.has_key(block) :
665     numEventsInBlock = self.eventsbyblock[block]
666     common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
667     files = self.filesbyblock[block]
668     numFilesInBlock = len(files)
669     if (numFilesInBlock <= 0):
670     continue
671     fileCount = 0
672     #AF
673     #AF do not reset input files and event count of block boundary
674     #AF
675     ## ---- New block => New job ---- #
676     #parString = ""
677     # counter for number of events in files currently worked on
678     #filesEventCount = 0
679     #AF
680     # flag if next while loop should touch new file
681     newFile = 1
682     # job event counter
683     jobSkipEventCount = 0
684    
685     # ---- Iterate over the files in the block until we've met the requested ---- #
686     # ---- total # of events or we've gone over all the files in this block ---- #
687     pString=''
688     while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
689     file = files[fileCount]
690     if self.useParent:
691     parent = self.parentFiles[file]
692     for f in parent :
693     pString += '\\\"' + f + '\\\"\,'
694     common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
695     common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
696     if newFile :
697     try:
698     numEventsInFile = self.eventsbyfile[file]
699     common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
700     # increase filesEventCount
701 ewv 1.250 filesEventCount += numEventsInFile
702 afanfani 1.237 # Add file to current job
703     parString += '\\\"' + file + '\\\"\,'
704     newFile = 0
705     except KeyError:
706     common.logger.message("File "+str(file)+" has unknown number of events: skipping")
707     eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
708 ewv 1.250 #common.logger.message("AF filesEventCount %s - jobSkipEventCount %s "%(filesEventCount,jobSkipEventCount))
709 afanfani 1.237 # if less events in file remain than eventsPerJobRequested
710     if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
711     #AF
712     #AF skip fileboundary part
713     #AF
714     # go to next file
715     newFile = 1
716     fileCount += 1
717     # if events in file equal to eventsPerJobRequested
718     elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
719     # close job and touch new file
720     fullString = parString[:-2]
721     if self.useParent:
722     fullParentString = pString[:-2]
723     list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
724     else:
725     list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
726     common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
727     self.jobDestination.append(blockSites[block])
728     common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
729     jobsOfBlock[block].append(jobCount+1)
730     # reset counter
731     jobCount = jobCount + 1
732     totalEventCount = totalEventCount + eventsPerJobRequested
733     eventsRemaining = eventsRemaining - eventsPerJobRequested
734     jobSkipEventCount = 0
735     # reset file
736     pString = ""
737     parString = ""
738     filesEventCount = 0
739     newFile = 1
740     fileCount += 1
741    
742     # if more events in file remain than eventsPerJobRequested
743     else :
744     # close job but don't touch new file
745     fullString = parString[:-2]
746     if self.useParent:
747     fullParentString = pString[:-2]
748     list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
749     else:
750     list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
751     common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
752     self.jobDestination.append(blockSites[block])
753     common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
754     jobsOfBlock[block].append(jobCount+1)
755     # increase counter
756     jobCount = jobCount + 1
757     totalEventCount = totalEventCount + eventsPerJobRequested
758     eventsRemaining = eventsRemaining - eventsPerJobRequested
759     # calculate skip events for last file
760     # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
761     jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
762     # remove all but the last file
763     filesEventCount = self.eventsbyfile[file]
764     if self.useParent:
765     for f in parent : pString += '\\\"' + f + '\\\"\,'
766     parString = '\\\"' + file + '\\\"\,'
767     pass # END if
768     pass # END while (iterate over files in the block)
769     pass # END while (iterate over blocks in the dataset)
770     self.ncjobs = self.total_number_of_jobs = jobCount
771     if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
772     common.logger.message("eventsRemaining "+str(eventsRemaining))
773     common.logger.message("jobCount "+str(jobCount))
774     common.logger.message(" totalNumberOfJobs "+str(totalNumberOfJobs))
775     common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
776     common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
777    
778     # screen output
779     screenOutput = "List of jobs and available destination sites:\n\n"
780    
781     #AF
782 ewv 1.250 #AF skip check on block with no sites
783     #AF
784 afanfani 1.237 self.list_of_args = list_of_lists
785    
786     return
787    
788    
789    
790 slacapra 1.21 def jobSplittingNoInput(self):
791 slacapra 1.9 """
792     Perform job splitting based on number of event per job
793     """
794     common.logger.debug(5,'Splitting per events')
795 fanzago 1.130
796 ewv 1.131 if (self.selectEventsPerJob):
797 fanzago 1.130 common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
798     if (self.selectNumberOfJobs):
799     common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
800     if (self.selectTotalNumberEvents):
801     common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
802 slacapra 1.9
803 slacapra 1.10 if (self.total_number_of_events < 0):
804     msg='Cannot split jobs per Events with "-1" as total number of events'
805     raise CrabException(msg)
806    
807 slacapra 1.22 if (self.selectEventsPerJob):
808 spiga 1.65 if (self.selectTotalNumberEvents):
809     self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
810 ewv 1.131 elif(self.selectNumberOfJobs) :
811 spiga 1.65 self.total_number_of_jobs =self.theNumberOfJobs
812 ewv 1.131 self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)
813 spiga 1.65
814 slacapra 1.22 elif (self.selectNumberOfJobs) :
815     self.total_number_of_jobs = self.theNumberOfJobs
816     self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
817 ewv 1.131
818 slacapra 1.9 common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))
819    
820     # is there any remainder?
821     check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
822    
823     common.logger.debug(5,'Check '+str(check))
824    
825 gutsche 1.35 common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
826 slacapra 1.9 if check > 0:
827 gutsche 1.35 common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
828 slacapra 1.9
829 slacapra 1.10 # argument is seed number.$i
830 slacapra 1.9 self.list_of_args = []
831     for i in range(self.total_number_of_jobs):
832 gutsche 1.35 ## Since there is no input, any site is good
833 ewv 1.131 self.jobDestination.append([""]) #must be empty to write correctly the xml
834 slacapra 1.90 args=[]
835 spiga 1.57 if (self.firstRun):
836 slacapra 1.138 ## pythia first run
837 slacapra 1.90 args.append(str(self.firstRun)+str(i))
838 ewv 1.258 if (self.generator in self.managedGenerators):
839 ewv 1.262 if (self.generator == 'comphep' and i == 0):
840     # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
841     args.append('1')
842     else:
843     args.append(str(i*self.eventsPerJob))
844 slacapra 1.90 self.list_of_args.append(args)
845 gutsche 1.3 return
846    
847 spiga 1.42
848 spiga 1.187 def jobSplittingForScript(self):
849 spiga 1.42 """
850     Perform job splitting based on number of job
851     """
852     common.logger.debug(5,'Splitting per job')
853     common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
854    
855     self.total_number_of_jobs = self.theNumberOfJobs
856    
857     common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))
858    
859     common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
860    
861     # argument is seed number.$i
862     self.list_of_args = []
863     for i in range(self.total_number_of_jobs):
864     self.jobDestination.append([""])
865     self.list_of_args.append([str(i)])
866     return
867    
868 spiga 1.208 def split(self, jobParams,firstJobID):
869 ewv 1.131
870 gutsche 1.3 njobs = self.total_number_of_jobs
871 slacapra 1.9 arglist = self.list_of_args
872 slacapra 1.263 if njobs==0:
873     raise CrabException("Ask to split "+str(njobs)+" jobs: aborting")
874    
875 gutsche 1.3 # create the empty structure
876     for i in range(njobs):
877     jobParams.append("")
878 ewv 1.131
879 spiga 1.165 listID=[]
880     listField=[]
881 spiga 1.208 for id in range(njobs):
882     job = id + int(firstJobID)
883     jobParams[id] = arglist[id]
884 spiga 1.167 listID.append(job+1)
885 spiga 1.162 job_ToSave ={}
886 spiga 1.169 concString = ' '
887 spiga 1.165 argu=''
888 spiga 1.208 if len(jobParams[id]):
889     argu += concString.join(jobParams[id] )
890 spiga 1.187 job_ToSave['arguments']= str(job+1)+' '+argu
891 spiga 1.208 job_ToSave['dlsDestination']= self.jobDestination[id]
892 spiga 1.165 listField.append(job_ToSave)
893 spiga 1.169 msg="Job "+str(job)+" Arguments: "+str(job+1)+" "+argu+"\n" \
894 spiga 1.208 +" Destination: "+str(self.jobDestination[id])
895 spiga 1.165 common.logger.debug(5,msg)
896 spiga 1.187 common._db.updateJob_(listID,listField)
897 spiga 1.181 self.argsList = (len(jobParams[0])+1)
898 gutsche 1.3
899     return
900 ewv 1.131
901 gutsche 1.3 def numberOfJobs(self):
902     return self.total_number_of_jobs
903    
904 slacapra 1.1 def getTarBall(self, exe):
905     """
906     Return the TarBall with lib and exe
907     """
908 slacapra 1.242 self.tgzNameWithPath = common.work_space.pathForTgz()+self.tgz_name
909 slacapra 1.1 if os.path.exists(self.tgzNameWithPath):
910     return self.tgzNameWithPath
911    
912     # Prepare a tar gzipped file with user binaries.
913     self.buildTar_(exe)
914    
915     return string.strip(self.tgzNameWithPath)
916    
917     def buildTar_(self, executable):
918    
919     # First of all declare the user Scram area
920     swArea = self.scram.getSWArea_()
921     swReleaseTop = self.scram.getReleaseTop_()
922 ewv 1.131
923 slacapra 1.1 ## check if working area is release top
924     if swReleaseTop == '' or swArea == swReleaseTop:
925 afanfani 1.172 common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
926 slacapra 1.1 return
927    
928 slacapra 1.61 import tarfile
929     try: # create tar ball
930     tar = tarfile.open(self.tgzNameWithPath, "w:gz")
931     ## First find the executable
932 slacapra 1.86 if (self.executable != ''):
933 slacapra 1.61 exeWithPath = self.scram.findFile_(executable)
934     if ( not exeWithPath ):
935     raise CrabException('User executable '+executable+' not found')
936 ewv 1.131
937 slacapra 1.61 ## then check if it's private or not
938     if exeWithPath.find(swReleaseTop) == -1:
939     # the exe is private, so we must ship
940     common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
941     path = swArea+'/'
942 corvo 1.85 # distinguish case when script is in user project area or given by full path somewhere else
943     if exeWithPath.find(path) >= 0 :
944     exe = string.replace(exeWithPath, path,'')
945 slacapra 1.129 tar.add(path+exe,exe)
946 corvo 1.85 else :
947     tar.add(exeWithPath,os.path.basename(executable))
948 slacapra 1.61 pass
949     else:
950     # the exe is from release, we'll find it on WN
951     pass
952 ewv 1.131
953 slacapra 1.61 ## Now get the libraries: only those in local working area
954 slacapra 1.256 tar.dereference=True
955 slacapra 1.61 libDir = 'lib'
956     lib = swArea+'/' +libDir
957     common.logger.debug(5,"lib "+lib+" to be tarred")
958     if os.path.exists(lib):
959     tar.add(lib,libDir)
960 ewv 1.131
961 slacapra 1.61 ## Now check if module dir is present
962     moduleDir = 'module'
963     module = swArea + '/' + moduleDir
964     if os.path.isdir(module):
965     tar.add(module,moduleDir)
966 slacapra 1.256 tar.dereference=False
967 slacapra 1.61
968     ## Now check if any data dir(s) is present
969 spiga 1.179 self.dataExist = False
970 slacapra 1.212 todo_list = [(i, i) for i in os.listdir(swArea+"/src")]
971 slacapra 1.206 while len(todo_list):
972     entry, name = todo_list.pop()
973 slacapra 1.211 if name.startswith('crab_0_') or name.startswith('.') or name == 'CVS':
974 slacapra 1.206 continue
975 slacapra 1.212 if os.path.isdir(swArea+"/src/"+entry):
976 slacapra 1.206 entryPath = entry + '/'
977 slacapra 1.212 todo_list += [(entryPath + i, i) for i in os.listdir(swArea+"/src/"+entry)]
978 slacapra 1.206 if name == 'data':
979     self.dataExist=True
980     common.logger.debug(5,"data "+entry+" to be tarred")
981 slacapra 1.212 tar.add(swArea+"/src/"+entry,"src/"+entry)
982 slacapra 1.206 pass
983     pass
984 ewv 1.182
985 spiga 1.179 ### CMSSW ParameterSet
986     if not self.pset is None:
987     cfg_file = common.work_space.jobDir()+self.configFilename()
988 ewv 1.182 tar.add(cfg_file,self.configFilename())
989 slacapra 1.61
990 fanzago 1.93
991 fanzago 1.152 ## Add ProdCommon dir to tar
992 slacapra 1.211 prodcommonDir = './'
993     prodcommonPath = os.environ['CRABDIR'] + '/' + 'external/'
994 spiga 1.244 neededStuff = ['ProdCommon/__init__.py','ProdCommon/FwkJobRep', 'ProdCommon/CMSConfigTools', \
995     'ProdCommon/Core', 'ProdCommon/MCPayloads', 'IMProv', 'ProdCommon/Storage']
996 slacapra 1.214 for file in neededStuff:
997     tar.add(prodcommonPath+file,prodcommonDir+file)
998 spiga 1.179
999     ##### ML stuff
1000     ML_file_list=['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
1001     path=os.environ['CRABDIR'] + '/python/'
1002     for file in ML_file_list:
1003     tar.add(path+file,file)
1004    
1005     ##### Utils
1006 spiga 1.238 Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'fillCrabFjr.py','cmscp.py']
1007 spiga 1.179 for file in Utils_file_list:
1008     tar.add(path+file,file)
1009 ewv 1.131
1010 ewv 1.182 ##### AdditionalFiles
1011 slacapra 1.253 tar.dereference=True
1012 spiga 1.179 for file in self.additional_inbox_files:
1013     tar.add(file,string.split(file,'/')[-1])
1014 slacapra 1.253 tar.dereference=False
1015 slacapra 1.263 common.logger.debug(5,"Files in "+self.tgzNameWithPath+" : "+str(tar.getnames()))
1016 ewv 1.182
1017 slacapra 1.61 tar.close()
1018 mcinquil 1.241 except IOError, exc:
1019     common.logger.write(str(exc))
1020 slacapra 1.220 raise CrabException('Could not create tar-ball '+self.tgzNameWithPath)
1021 mcinquil 1.241 except tarfile.TarError, exc:
1022     common.logger.write(str(exc))
1023 slacapra 1.206 raise CrabException('Could not create tar-ball '+self.tgzNameWithPath)
1024 gutsche 1.72
1025     ## check for tarball size
1026     tarballinfo = os.stat(self.tgzNameWithPath)
1027     if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
1028 spiga 1.238 msg = 'Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) \
1029 ewv 1.250 +'MB input sandbox limit \n'
1030 spiga 1.238 msg += ' and not supported by the direct GRID submission system.\n'
1031     msg += ' Please use the CRAB server mode by setting server_name=<NAME> in section [CRAB] of your crab.cfg.\n'
1032     msg += ' For further infos please see https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#CRABSERVER_for_Users'
1033     raise CrabException(msg)
1034 gutsche 1.72
1035 slacapra 1.61 ## create tar-ball with ML stuff
1036 slacapra 1.97
1037 spiga 1.165 def wsSetupEnvironment(self, nj=0):
1038 slacapra 1.1 """
1039     Returns part of a job script which prepares
1040     the execution environment for the job 'nj'.
1041     """
1042 ewv 1.184 if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
1043     psetName = 'pset.py'
1044     else:
1045     psetName = 'pset.cfg'
1046 slacapra 1.1 # Prepare JobType-independent part
1047 ewv 1.160 txt = '\n#Written by cms_cmssw::wsSetupEnvironment\n'
1048 fanzago 1.133 txt += 'echo ">>> setup environment"\n'
1049 ewv 1.131 txt += 'if [ $middleware == LCG ]; then \n'
1050 gutsche 1.3 txt += self.wsSetupCMSLCGEnvironment_()
1051     txt += 'elif [ $middleware == OSG ]; then\n'
1052 gutsche 1.43 txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
1053 ewv 1.132 txt += ' if [ ! $? == 0 ] ;then\n'
1054 fanzago 1.161 txt += ' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
1055     txt += ' job_exit_code=10016\n'
1056     txt += ' func_exit\n'
1057 gutsche 1.3 txt += ' fi\n'
1058 fanzago 1.133 txt += ' echo ">>> Created working directory: $WORKING_DIR"\n'
1059 gutsche 1.3 txt += '\n'
1060     txt += ' echo "Change to working directory: $WORKING_DIR"\n'
1061     txt += ' cd $WORKING_DIR\n'
1062 fanzago 1.133 txt += ' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n'
1063 ewv 1.131 txt += self.wsSetupCMSOSGEnvironment_()
1064 gutsche 1.3 txt += 'fi\n'
1065 slacapra 1.1
1066     # Prepare JobType-specific part
1067     scram = self.scram.commandName()
1068     txt += '\n\n'
1069 fanzago 1.133 txt += 'echo ">>> specific cmssw setup environment:"\n'
1070     txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
1071 slacapra 1.1 txt += scram+' project CMSSW '+self.version+'\n'
1072     txt += 'status=$?\n'
1073     txt += 'if [ $status != 0 ] ; then\n'
1074 fanzago 1.161 txt += ' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n'
1075     txt += ' job_exit_code=10034\n'
1076 fanzago 1.163 txt += ' func_exit\n'
1077 slacapra 1.1 txt += 'fi \n'
1078     txt += 'cd '+self.version+'\n'
1079 fanzago 1.99 txt += 'SOFTWARE_DIR=`pwd`\n'
1080 fanzago 1.133 txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
1081 slacapra 1.1 txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
1082 fanzago 1.180 txt += 'if [ $? != 0 ] ; then\n'
1083     txt += ' echo "ERROR ==> Problem with the command: "\n'
1084     txt += ' echo "eval \`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \` at `hostname`"\n'
1085     txt += ' job_exit_code=10034\n'
1086     txt += ' func_exit\n'
1087     txt += 'fi \n'
1088 slacapra 1.1 # Handle the arguments:
1089     txt += "\n"
1090 gutsche 1.7 txt += "## number of arguments (first argument always jobnumber)\n"
1091 slacapra 1.1 txt += "\n"
1092 spiga 1.165 txt += "if [ $nargs -lt "+str(self.argsList)+" ]\n"
1093 slacapra 1.1 txt += "then\n"
1094 fanzago 1.161 txt += " echo 'ERROR ==> Too few arguments' +$nargs+ \n"
1095     txt += ' job_exit_code=50113\n'
1096     txt += " func_exit\n"
1097 slacapra 1.1 txt += "fi\n"
1098     txt += "\n"
1099    
1100     # Prepare job-specific part
1101     job = common.job_list[nj]
1102 ewv 1.131 if (self.datasetPath):
1103 spiga 1.238 self.primaryDataset = self.datasetPath.split("/")[1]
1104     DataTier = self.datasetPath.split("/")[2]
1105 fanzago 1.93 txt += '\n'
1106     txt += 'DatasetPath='+self.datasetPath+'\n'
1107    
1108 spiga 1.238 txt += 'PrimaryDataset='+self.primaryDataset +'\n'
1109     txt += 'DataTier='+DataTier+'\n'
1110 fanzago 1.96 txt += 'ApplicationFamily=cmsRun\n'
1111 fanzago 1.93
1112     else:
1113 ewv 1.250 self.primaryDataset = 'null'
1114 fanzago 1.93 txt += 'DatasetPath=MCDataTier\n'
1115     txt += 'PrimaryDataset=null\n'
1116     txt += 'DataTier=null\n'
1117     txt += 'ApplicationFamily=MCDataTier\n'
1118 ewv 1.170 if self.pset != None:
1119 spiga 1.42 pset = os.path.basename(job.configFilename())
1120     txt += '\n'
1121 spiga 1.95 txt += 'cp $RUNTIME_AREA/'+pset+' .\n'
1122 spiga 1.42 if (self.datasetPath): # standard job
1123 ewv 1.160 txt += 'InputFiles=${args[1]}; export InputFiles\n'
1124 ewv 1.226 if (self.useParent):
1125 spiga 1.204 txt += 'ParentFiles=${args[2]}; export ParentFiles\n'
1126     txt += 'MaxEvents=${args[3]}; export MaxEvents\n'
1127     txt += 'SkipEvents=${args[4]}; export SkipEvents\n'
1128     else:
1129     txt += 'MaxEvents=${args[2]}; export MaxEvents\n'
1130     txt += 'SkipEvents=${args[3]}; export SkipEvents\n'
1131 spiga 1.42 txt += 'echo "Inputfiles:<$InputFiles>"\n'
1132 spiga 1.204 if (self.useParent): txt += 'echo "ParentFiles:<$ParentFiles>"\n'
1133 spiga 1.42 txt += 'echo "MaxEvents:<$MaxEvents>"\n'
1134     txt += 'echo "SkipEvents:<$SkipEvents>"\n'
1135     else: # pythia like job
1136 ewv 1.258 argNum = 1
1137 ewv 1.160 txt += 'PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n'
1138     txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
1139     txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
1140     txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
1141 slacapra 1.90 if (self.firstRun):
1142 ewv 1.258 txt += 'export FirstRun=${args[%s]}\n' % argNum
1143 spiga 1.57 txt += 'echo "FirstRun: <$FirstRun>"\n'
1144 ewv 1.258 argNum += 1
1145 ewv 1.262 if (self.generator == 'madgraph'):
1146 ewv 1.259 txt += 'export FirstEvent=${args[%s]}\n' % argNum
1147     txt += 'echo "FirstEvent:<$FirstEvent>"\n'
1148     argNum += 1
1149 ewv 1.262 elif (self.generator == 'comphep'):
1150     txt += 'export CompHEPFirstEvent=${args[%s]}\n' % argNum
1151     txt += 'echo "CompHEPFirstEvent:<$CompHEPFirstEvent>"\n'
1152     argNum += 1
1153 slacapra 1.90
1154 ewv 1.184 txt += 'mv -f ' + pset + ' ' + psetName + '\n'
1155 slacapra 1.1
1156    
1157 fanzago 1.163 if self.pset != None:
1158 ewv 1.184 # FUTURE: Can simply for 2_1_x and higher
1159 spiga 1.42 txt += '\n'
1160 spiga 1.197 if self.debug_wrapper==True:
1161 spiga 1.188 txt += 'echo "***** cat ' + psetName + ' *********"\n'
1162     txt += 'cat ' + psetName + '\n'
1163     txt += 'echo "****** end ' + psetName + ' ********"\n'
1164     txt += '\n'
1165 ewv 1.226 if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
1166     txt += 'PSETHASH=`edmConfigHash ' + psetName + '` \n'
1167     else:
1168     txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n'
1169 fanzago 1.94 txt += 'echo "PSETHASH = $PSETHASH" \n'
1170 fanzago 1.93 txt += '\n'
1171 gutsche 1.3 return txt
1172 slacapra 1.176
1173 fanzago 1.166 def wsUntarSoftware(self, nj=0):
1174 gutsche 1.3 """
1175     Put in the script the commands to build an executable
1176     or a library.
1177     """
1178    
1179 fanzago 1.166 txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'
1180 gutsche 1.3
1181     if os.path.isfile(self.tgzNameWithPath):
1182 fanzago 1.133 txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n'
1183 slacapra 1.255 txt += 'tar xzf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
1184 spiga 1.199 if self.debug_wrapper:
1185 slacapra 1.255 txt += 'tar tzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
1186 spiga 1.199 txt += 'ls -Al \n'
1187 gutsche 1.3 txt += 'untar_status=$? \n'
1188     txt += 'if [ $untar_status -ne 0 ]; then \n'
1189 fanzago 1.161 txt += ' echo "ERROR ==> Untarring .tgz file failed"\n'
1190     txt += ' job_exit_code=$untar_status\n'
1191     txt += ' func_exit\n'
1192 gutsche 1.3 txt += 'else \n'
1193     txt += ' echo "Successful untar" \n'
1194     txt += 'fi \n'
1195 gutsche 1.50 txt += '\n'
1196 slacapra 1.211 txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n'
1197 gutsche 1.50 txt += 'if [ -z "$PYTHONPATH" ]; then\n'
1198 slacapra 1.211 txt += ' export PYTHONPATH=$RUNTIME_AREA/\n'
1199 gutsche 1.50 txt += 'else\n'
1200 slacapra 1.211 txt += ' export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n'
1201 fanzago 1.93 txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
1202 gutsche 1.50 txt += 'fi\n'
1203     txt += '\n'
1204    
1205 gutsche 1.3 pass
1206 ewv 1.131
1207 slacapra 1.1 return txt
1208 ewv 1.170
1209 fanzago 1.166 def wsBuildExe(self, nj=0):
1210     """
1211     Put in the script the commands to build an executable
1212     or a library.
1213     """
1214    
1215     txt = '\n#Written by cms_cmssw::wsBuildExe\n'
1216     txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n'
1217    
1218 ewv 1.170 txt += 'rm -r lib/ module/ \n'
1219     txt += 'mv $RUNTIME_AREA/lib/ . \n'
1220     txt += 'mv $RUNTIME_AREA/module/ . \n'
1221 spiga 1.186 if self.dataExist == True:
1222     txt += 'rm -r src/ \n'
1223     txt += 'mv $RUNTIME_AREA/src/ . \n'
1224 ewv 1.182 if len(self.additional_inbox_files)>0:
1225 spiga 1.179 for file in self.additional_inbox_files:
1226 spiga 1.191 txt += 'mv $RUNTIME_AREA/'+os.path.basename(file)+' . \n'
1227 slacapra 1.214 # txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'
1228     # txt += 'mv $RUNTIME_AREA/IMProv/ . \n'
1229 ewv 1.170
1230 slacapra 1.211 txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n'
1231 fanzago 1.166 txt += 'if [ -z "$PYTHONPATH" ]; then\n'
1232 slacapra 1.211 txt += ' export PYTHONPATH=$RUNTIME_AREA/\n'
1233 fanzago 1.166 txt += 'else\n'
1234 slacapra 1.211 txt += ' export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n'
1235 fanzago 1.166 txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
1236     txt += 'fi\n'
1237     txt += '\n'
1238    
1239     return txt
1240 slacapra 1.1
1241 ewv 1.131
1242 slacapra 1.1 def executableName(self):
1243 ewv 1.192 if self.scriptExe:
1244 spiga 1.42 return "sh "
1245     else:
1246     return self.executable
1247 slacapra 1.1
1248     def executableArgs(self):
1249 ewv 1.160 # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
1250 slacapra 1.70 if self.scriptExe:#CarlosDaniele
1251 spiga 1.42 return self.scriptExe + " $NJob"
1252 fanzago 1.115 else:
1253 ewv 1.160 ex_args = ""
1254 ewv 1.171 # FUTURE: This tests the CMSSW version. Can remove code as versions deprecated
1255 ewv 1.160 # Framework job report
1256 ewv 1.184 if (self.CMSSW_major >= 1 and self.CMSSW_minor >= 5) or (self.CMSSW_major >= 2):
1257 fanzago 1.166 ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
1258 ewv 1.184 # Type of config file
1259     if self.CMSSW_major >= 2 :
1260 ewv 1.171 ex_args += " -p pset.py"
1261 fanzago 1.115 else:
1262 ewv 1.160 ex_args += " -p pset.cfg"
1263     return ex_args
1264 slacapra 1.1
1265     def inputSandbox(self, nj):
1266     """
1267     Returns a list of filenames to be put in JDL input sandbox.
1268     """
1269     inp_box = []
1270     if os.path.isfile(self.tgzNameWithPath):
1271     inp_box.append(self.tgzNameWithPath)
1272 spiga 1.243 inp_box.append(common.work_space.jobDir() + self.scriptName)
1273 slacapra 1.1 return inp_box
1274    
1275     def outputSandbox(self, nj):
1276     """
1277     Returns a list of filenames to be put in JDL output sandbox.
1278     """
1279     out_box = []
1280    
1281     ## User Declared output files
1282 slacapra 1.54 for out in (self.output_file+self.output_file_sandbox):
1283 ewv 1.131 n_out = nj + 1
1284 slacapra 1.207 out_box.append(numberFile(out,str(n_out)))
1285 slacapra 1.1 return out_box
1286    
1287    
1288     def wsRenameOutput(self, nj):
1289     """
1290     Returns part of a job script which renames the produced files.
1291     """
1292    
1293 ewv 1.160 txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
1294 fanzago 1.148 txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
1295     txt += 'echo ">>> current directory content:"\n'
1296 ewv 1.226 if self.debug_wrapper:
1297 spiga 1.199 txt += 'ls -Al\n'
1298 fanzago 1.145 txt += '\n'
1299 slacapra 1.54
1300 fanzago 1.128 for fileWithSuffix in (self.output_file):
1301 slacapra 1.207 output_file_num = numberFile(fileWithSuffix, '$NJob')
1302 slacapra 1.1 txt += '\n'
1303 gutsche 1.7 txt += '# check output file\n'
1304 slacapra 1.106 txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
1305 ewv 1.147 if (self.copy_data == 1): # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
1306     txt += ' mv '+fileWithSuffix+' '+output_file_num+'\n'
1307 spiga 1.209 txt += ' ln -s `pwd`/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
1308 ewv 1.147 else:
1309     txt += ' mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
1310     txt += ' ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
1311 slacapra 1.106 txt += 'else\n'
1312 fanzago 1.161 txt += ' job_exit_code=60302\n'
1313     txt += ' echo "WARNING: Output file '+fileWithSuffix+' not found"\n'
1314 ewv 1.156 if common.scheduler.name().upper() == 'CONDOR_G':
1315 gutsche 1.7 txt += ' if [ $middleware == OSG ]; then \n'
1316     txt += ' echo "prepare dummy output file"\n'
1317     txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
1318     txt += ' fi \n'
1319 slacapra 1.1 txt += 'fi\n'
1320 slacapra 1.105 file_list = []
1321     for fileWithSuffix in (self.output_file):
1322 spiga 1.246 file_list.append(numberFile('$SOFTWARE_DIR/'+fileWithSuffix, '$NJob'))
1323 ewv 1.131
1324 spiga 1.245 txt += 'file_list="'+string.join(file_list,',')+'"\n'
1325 fanzago 1.149 txt += '\n'
1326 fanzago 1.148 txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
1327     txt += 'echo ">>> current directory content:"\n'
1328 ewv 1.226 if self.debug_wrapper:
1329 spiga 1.199 txt += 'ls -Al\n'
1330 fanzago 1.148 txt += '\n'
1331 gutsche 1.7 txt += 'cd $RUNTIME_AREA\n'
1332 fanzago 1.133 txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
1333 slacapra 1.1 return txt
1334    
1335 slacapra 1.63 def getRequirements(self, nj=[]):
1336 slacapra 1.1 """
1337 ewv 1.131 return job requirements to add to jdl files
1338 slacapra 1.1 """
1339     req = ''
1340 slacapra 1.47 if self.version:
1341 slacapra 1.10 req='Member("VO-cms-' + \
1342 slacapra 1.47 self.version + \
1343 slacapra 1.10 '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
1344 ewv 1.192 if self.executable_arch:
1345 gutsche 1.107 req+=' && Member("VO-cms-' + \
1346 slacapra 1.105 self.executable_arch + \
1347     '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
1348 gutsche 1.35
1349     req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)'
1350 afanfani 1.229 if ( common.scheduler.name() == "glitecoll" ) or ( common.scheduler.name() == "glite"):
1351 afanfani 1.158 req += ' && other.GlueCEStateStatus == "Production" '
1352 gutsche 1.35
1353 slacapra 1.1 return req
1354 gutsche 1.3
1355     def configFilename(self):
1356     """ return the config filename """
1357 ewv 1.182 # FUTURE: Can remove cfg mode for CMSSW >= 2_1_x
1358 ewv 1.184 if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
1359 ewv 1.182 return self.name()+'.py'
1360     else:
1361     return self.name()+'.cfg'
1362 gutsche 1.3
1363     def wsSetupCMSOSGEnvironment_(self):
1364     """
1365     Returns part of a job script which is prepares
1366     the execution environment and which is common for all CMS jobs.
1367     """
1368 ewv 1.160 txt = '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n'
1369     txt += ' echo ">>> setup CMS OSG environment:"\n'
1370 fanzago 1.133 txt += ' echo "set SCRAM ARCH to ' + self.executable_arch + '"\n'
1371     txt += ' export SCRAM_ARCH='+self.executable_arch+'\n'
1372 fanzago 1.136 txt += ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n'
1373 ewv 1.135 txt += ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
1374 mkirn 1.40 txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
1375 fanzago 1.133 txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
1376     txt += ' else\n'
1377 fanzago 1.161 txt += ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
1378     txt += ' job_exit_code=10020\n'
1379     txt += ' func_exit\n'
1380 fanzago 1.133 txt += ' fi\n'
1381 gutsche 1.3 txt += '\n'
1382 fanzago 1.161 txt += ' echo "==> setup cms environment ok"\n'
1383 fanzago 1.136 txt += ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n'
1384 gutsche 1.3
1385     return txt
1386 ewv 1.131
1387 gutsche 1.3 def wsSetupCMSLCGEnvironment_(self):
1388     """
1389     Returns part of a job script which is prepares
1390     the execution environment and which is common for all CMS jobs.
1391     """
1392 ewv 1.160 txt = '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n'
1393     txt += ' echo ">>> setup CMS LCG environment:"\n'
1394 fanzago 1.133 txt += ' echo "set SCRAM ARCH and BUILD_ARCH to ' + self.executable_arch + ' ###"\n'
1395     txt += ' export SCRAM_ARCH='+self.executable_arch+'\n'
1396     txt += ' export BUILD_ARCH='+self.executable_arch+'\n'
1397     txt += ' if [ ! $VO_CMS_SW_DIR ] ;then\n'
1398 fanzago 1.161 txt += ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n'
1399     txt += ' job_exit_code=10031\n'
1400     txt += ' func_exit\n'
1401 fanzago 1.133 txt += ' else\n'
1402     txt += ' echo "Sourcing environment... "\n'
1403     txt += ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n'
1404 fanzago 1.161 txt += ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n'
1405     txt += ' job_exit_code=10020\n'
1406     txt += ' func_exit\n'
1407 fanzago 1.133 txt += ' fi\n'
1408     txt += ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1409     txt += ' source $VO_CMS_SW_DIR/cmsset_default.sh\n'
1410     txt += ' result=$?\n'
1411     txt += ' if [ $result -ne 0 ]; then\n'
1412 fanzago 1.161 txt += ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1413     txt += ' job_exit_code=10032\n'
1414     txt += ' func_exit\n'
1415 fanzago 1.133 txt += ' fi\n'
1416     txt += ' fi\n'
1417     txt += ' \n'
1418 fanzago 1.161 txt += ' echo "==> setup cms environment ok"\n'
1419 gutsche 1.3 return txt
1420 gutsche 1.5
1421 spiga 1.238 def wsModifyReport(self, nj):
1422 fanzago 1.93 """
1423 ewv 1.131 insert the part of the script that modifies the FrameworkJob Report
1424 fanzago 1.93 """
1425 spiga 1.238 txt = '\n#Written by cms_cmssw::wsModifyReport\n'
1426 slacapra 1.176 publish_data = int(self.cfg_params.get('USER.publish_data',0))
1427 ewv 1.131 if (publish_data == 1):
1428 ewv 1.250
1429 fanzago 1.248 processedDataset = self.cfg_params['USER.publish_data_name']
1430 spiga 1.238
1431     txt += 'if [ $StageOutExitStatus -eq 0 ]; then\n'
1432 fanzago 1.248 txt += ' FOR_LFN=$LFNBaseName\n'
1433 fanzago 1.175 txt += 'else\n'
1434     txt += ' FOR_LFN=/copy_problems/ \n'
1435     txt += ' SE=""\n'
1436     txt += ' SE_PATH=""\n'
1437     txt += 'fi\n'
1438 ewv 1.182
1439 fanzago 1.175 txt += 'echo ">>> Modify Job Report:" \n'
1440 fanzago 1.217 txt += 'chmod a+x $RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py\n'
1441 fanzago 1.248 txt += 'ProcessedDataset='+processedDataset+'\n'
1442     #txt += 'ProcessedDataset=$procDataset \n'
1443 fanzago 1.175 txt += 'echo "ProcessedDataset = $ProcessedDataset"\n'
1444     txt += 'echo "SE = $SE"\n'
1445     txt += 'echo "SE_PATH = $SE_PATH"\n'
1446     txt += 'echo "FOR_LFN = $FOR_LFN" \n'
1447     txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n'
1448 spiga 1.238 args = '$RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier ' \
1449 fanzago 1.248 '$USER-$ProcessedDataset-$PSETHASH $ApplicationFamily '+ \
1450 fanzago 1.247 ' $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH'
1451     txt += 'echo "$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)+'"\n'
1452     txt += '$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)+'\n'
1453 fanzago 1.175 txt += 'modifyReport_result=$?\n'
1454     txt += 'if [ $modifyReport_result -ne 0 ]; then\n'
1455     txt += ' modifyReport_result=70500\n'
1456     txt += ' job_exit_code=$modifyReport_result\n'
1457     txt += ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n'
1458     txt += ' echo "WARNING: Problem with ModifyJobReport"\n'
1459     txt += 'else\n'
1460     txt += ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
1461 spiga 1.103 txt += 'fi\n'
1462 fanzago 1.93 return txt
1463 fanzago 1.99
1464 ewv 1.192 def wsParseFJR(self):
1465 spiga 1.189 """
1466 ewv 1.192 Parse the FrameworkJobReport to obtain useful infos
1467 spiga 1.189 """
1468     txt = '\n#Written by cms_cmssw::wsParseFJR\n'
1469     txt += 'echo ">>> Parse FrameworkJobReport crab_fjr.xml"\n'
1470     txt += 'if [ -s $RUNTIME_AREA/crab_fjr_$NJob.xml ]; then\n'
1471     txt += ' if [ -s $RUNTIME_AREA/parseCrabFjr.py ]; then\n'
1472 spiga 1.197 txt += ' cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --dashboard $MonitorID,$MonitorJobID '+self.debugWrap+'`\n'
1473     if self.debug_wrapper :
1474     txt += ' echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n'
1475     txt += ' executable_exit_status=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --exitcode`\n'
1476 spiga 1.189 txt += ' if [ $executable_exit_status -eq 50115 ];then\n'
1477     txt += ' echo ">>> crab_fjr.xml contents: "\n'
1478 spiga 1.222 txt += ' cat $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
1479 spiga 1.189 txt += ' echo "Wrong FrameworkJobReport --> does not contain useful info. ExitStatus: $executable_exit_status"\n'
1480 spiga 1.197 txt += ' elif [ $executable_exit_status -eq -999 ];then\n'
1481     txt += ' echo "ExitStatus from FrameworkJobReport not available. not available. Using exit code of executable from command line."\n'
1482 spiga 1.189 txt += ' else\n'
1483     txt += ' echo "Extracted ExitStatus from FrameworkJobReport parsing output: $executable_exit_status"\n'
1484     txt += ' fi\n'
1485     txt += ' else\n'
1486     txt += ' echo "CRAB python script to parse CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
1487     txt += ' fi\n'
1488     #### Patch to check input data reading for CMSSW16x Hopefully we-ll remove it asap
1489 spiga 1.232 txt += ' if [ $executable_exit_status -eq 0 ];then\n'
1490     txt += ' echo ">>> Executable succeded $executable_exit_status"\n'
1491 spiga 1.233 if (self.datasetPath and not (self.dataset_pu or self.useParent)) :
1492 spiga 1.189 # VERIFY PROCESSED DATA
1493     txt += ' echo ">>> Verify list of processed files:"\n'
1494 ewv 1.196 txt += ' echo $InputFiles |tr -d \'\\\\\' |tr \',\' \'\\n\'|tr -d \'"\' > input-files.txt\n'
1495 spiga 1.200 txt += ' python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --lfn > processed-files.txt\n'
1496 spiga 1.189 txt += ' cat input-files.txt | sort | uniq > tmp.txt\n'
1497     txt += ' mv tmp.txt input-files.txt\n'
1498     txt += ' echo "cat input-files.txt"\n'
1499     txt += ' echo "----------------------"\n'
1500     txt += ' cat input-files.txt\n'
1501     txt += ' cat processed-files.txt | sort | uniq > tmp.txt\n'
1502     txt += ' mv tmp.txt processed-files.txt\n'
1503     txt += ' echo "----------------------"\n'
1504     txt += ' echo "cat processed-files.txt"\n'
1505     txt += ' echo "----------------------"\n'
1506     txt += ' cat processed-files.txt\n'
1507     txt += ' echo "----------------------"\n'
1508     txt += ' diff -q input-files.txt processed-files.txt\n'
1509     txt += ' fileverify_status=$?\n'
1510     txt += ' if [ $fileverify_status -ne 0 ]; then\n'
1511     txt += ' executable_exit_status=30001\n'
1512     txt += ' echo "ERROR ==> not all input files processed"\n'
1513     txt += ' echo " ==> list of processed files from crab_fjr.xml differs from list in pset.cfg"\n'
1514     txt += ' echo " ==> diff input-files.txt processed-files.txt"\n'
1515     txt += ' fi\n'
1516 spiga 1.232 txt += ' elif [ $executable_exit_status -ne 0 ] || [ $executable_exit_status -ne 50015 ] || [ $executable_exit_status -ne 50017 ];then\n'
1517     txt += ' echo ">>> Executable failed $executable_exit_status"\n'
1518 spiga 1.251 txt += ' echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
1519     txt += ' echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
1520     txt += ' job_exit_code=$executable_exit_status\n'
1521 spiga 1.232 txt += ' func_exit\n'
1522     txt += ' fi\n'
1523     txt += '\n'
1524 spiga 1.189 txt += 'else\n'
1525     txt += ' echo "CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
1526     txt += 'fi\n'
1527     txt += '\n'
1528     txt += 'echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
1529     txt += 'echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
1530     txt += 'job_exit_code=$executable_exit_status\n'
1531    
1532     return txt
1533    
1534 gutsche 1.5 def setParam_(self, param, value):
1535     self._params[param] = value
1536    
1537     def getParams(self):
1538     return self._params
1539 gutsche 1.8
1540 gutsche 1.35 def uniquelist(self, old):
1541     """
1542     remove duplicates from a list
1543     """
1544     nd={}
1545     for e in old:
1546     nd[e]=0
1547     return nd.keys()
1548 mcinquil 1.121
1549 spiga 1.257 def outList(self,list=False):
1550 mcinquil 1.121 """
1551     check the dimension of the output files
1552     """
1553 spiga 1.169 txt = ''
1554     txt += 'echo ">>> list of expected files on output sandbox"\n'
1555 mcinquil 1.121 listOutFiles = []
1556 ewv 1.170 stdout = 'CMSSW_$NJob.stdout'
1557 spiga 1.169 stderr = 'CMSSW_$NJob.stderr'
1558 spiga 1.268 if len(self.output_file) <= 0:
1559     msg ="WARNING: no output files name have been defined!!\n"
1560     msg+="\tno output files will be reported back/staged\n"
1561     common.logger.message(msg)
1562 fanzago 1.148 if (self.return_data == 1):
1563 spiga 1.157 for file in (self.output_file+self.output_file_sandbox):
1564 slacapra 1.207 listOutFiles.append(numberFile(file, '$NJob'))
1565 spiga 1.169 listOutFiles.append(stdout)
1566     listOutFiles.append(stderr)
1567 ewv 1.156 else:
1568 spiga 1.157 for file in (self.output_file_sandbox):
1569 slacapra 1.207 listOutFiles.append(numberFile(file, '$NJob'))
1570 spiga 1.169 listOutFiles.append(stdout)
1571     listOutFiles.append(stderr)
1572 fanzago 1.161 txt += 'echo "output files: '+string.join(listOutFiles,' ')+'"\n'
1573 spiga 1.157 txt += 'filesToCheck="'+string.join(listOutFiles,' ')+'"\n'
1574 spiga 1.169 txt += 'export filesToCheck\n'
1575 spiga 1.268
1576 spiga 1.257 if list : return self.output_file
1577 ewv 1.170 return txt