ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.58
Committed: Fri May 3 19:11:04 2013 UTC (11 years, 11 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_7_patch1
Changes since 1.57: +8 -4 lines
Log Message:
fix problem with black list format https://savannah.cern.ch/bugs/index.php#comment2

File Contents

# User Rev Content
1 ewv 1.27
2 belforte 1.58 __revision__ = "$Id: Splitter.py,v 1.57 2013/04/23 09:41:13 belforte Exp $"
3     __version__ = "$Revision: 1.57 $"
4 ewv 1.27
5 spiga 1.1 import common
6     from crab_exceptions import *
7     from crab_util import *
8 ewv 1.26
9     from WMCore.DataStructs.File import File
10     from WMCore.DataStructs.Fileset import Fileset
11     from WMCore.DataStructs.Run import Run
12     from WMCore.DataStructs.Subscription import Subscription
13     from WMCore.DataStructs.Workflow import Workflow
14     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 ewv 1.43 try: # Can remove when CMSSW 3.7 and earlier are dropped
17     from FWCore.PythonUtilities.LumiList import LumiList
18     except ImportError:
19     from LumiList import LumiList
20 spiga 1.1
21     class JobSplitter:
def __init__( self, cfg_params, args ):
    """
    Store the configuration and build the SE black/white list parser.

    cfg_params -- mapping of CRAB configuration keys to values (read with
                  .get()).  NOTE(review): list-like options such as
                  GRID.se_black_list look like comma-separated strings when
                  set by the user -- confirm against the config reader.
    args       -- splitter inputs; stored untouched in self.args.
    """
    self.cfg_params = cfg_params
    self.args = args

    # lumi-splitting state, filled in later by checkLumiSettings()
    self.lumisPerJob = -1
    self.totalNLumis = 0
    self.theNumberOfJobs = 0
    self.limitNJobs = False
    self.limitTotalLumis = False
    self.limitJobLumis = False

    #self.maxEvents
    # init BlackWhiteListParser
    self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
    seBlackList = cfg_params.get('GRID.se_black_list',[])
    if common.scheduler.name().upper() == 'REMOTEGLIDEIN' :
        # use central black list
        removeBList = cfg_params.get("GRID.remove_default_blacklist", 0 )
        useCentralList = (int(removeBList) == 0)
        blackAnaOps = None
        if useCentralList:
            blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
            result = blacklist.config("site_black_list.conf")
            if result is not None:
                blackAnaOps = result
            common.logger.debug("Enforced black list: %s "%blackAnaOps)
        else:
            common.logger.info("WARNING: Skipping default black list!")
        if useCentralList and blackAnaOps:
            # The user value may be a comma-separated string; "str += list"
            # raises TypeError, so turn it into a list first.
            if hasattr(seBlackList, 'split'):
                seBlackList = seBlackList.split(',')
            # Build a new list instead of extending in place so that a
            # list-valued entry of cfg_params is never modified.
            seBlackList = list(seBlackList) + blackAnaOps.strip().split(',')

    if seBlackList != []:
        common.logger.info("SE black list applied to data location: %s" %\
                       seBlackList)
    if self.seWhiteList != []:
        common.logger.info("SE white list applied to data location: %s" %\
                       self.seWhiteList)
    self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())

    ## check if has been asked for a non default file to store/read analyzed fileBlocks
    defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
    self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
63    
64 spiga 1.1
def checkUserSettings(self):
    """
    Read the event-based splitting options from self.cfg_params.

    Sets, for each of CMSSW.events_per_job, CMSSW.number_of_jobs and
    CMSSW.total_number_of_events, the integer value and a 0/1 "select"
    flag telling whether the option was present:
        eventsPerJob           / selectEventsPerJob       (default -1)
        theNumberOfJobs        / selectNumberOfJobs       (default 0)
        total_number_of_events / selectTotalNumberEvents  (default 0)

    Raises CrabException when both a total number of events (other than
    -1) and a number of jobs are given and there are fewer events than jobs.
    """
    cfg = self.cfg_params

    ## Events per job
    if 'CMSSW.events_per_job' in cfg:
        self.eventsPerJob = int(cfg['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in cfg:
        self.theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events
    if 'CMSSW.total_number_of_events' in cfg:
        self.total_number_of_events = int(cfg['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
        if self.selectNumberOfJobs == 1:
            # -1 means "all events", so it is exempt from the check
            if (self.total_number_of_events != -1) and self.total_number_of_events < self.theNumberOfJobs:
                msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                raise CrabException(msg)
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    return
94 spiga 1.1
def checkLumiSettings(self):
    """
    Check to make sure the user has specified enough information to
    perform splitting by Lumis to run the job

    Reads CMSSW.lumis_per_job, CMSSW.number_of_jobs and
    CMSSW.total_number_of_lumis from self.cfg_params and sets
    lumisPerJob/limitJobLumis, theNumberOfJobs/limitNJobs and
    totalNLumis/limitTotalLumis accordingly.  Exactly two of the three
    options must be present, otherwise CrabException is raised.
    """
    cfg = self.cfg_params
    settings = 0
    if 'CMSSW.lumis_per_job' in cfg:
        self.lumisPerJob = int(cfg['CMSSW.lumis_per_job'])
        self.limitJobLumis = True
        settings += 1

    if 'CMSSW.number_of_jobs' in cfg:
        self.theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])
        self.limitNJobs = True
        settings += 1

    if 'CMSSW.total_number_of_lumis' in cfg:
        self.totalNLumis = int(cfg['CMSSW.total_number_of_lumis'])
        # -1 means "no limit on the total"
        self.limitTotalLumis = (self.totalNLumis != -1)
        settings += 1

    if settings != 2:
        msg = 'When splitting by lumi section you must specify two and only two of:\n'
        msg += ' number_of_jobs, lumis_per_job, total_number_of_lumis'
        raise CrabException(msg)
    if self.limitNJobs and self.limitJobLumis:
        # jobs x lumis-per-job fixes the total
        self.limitTotalLumis = True
        self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs

    # Has the user specified runselection?
    if 'CMSSW.runselection' in cfg:
        common.logger.info('You have specified runselection and split by lumi.')
        common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
    return
129 ewv 1.27
def ComputeSubBlockSites( self, blockSites ):
    """
    Keep only the blocks hosted on at least one white-listed site.

    blockSites -- dictionary with blocks as keys and the list of host
                  sites as values.

    Returns a new dictionary holding the entries of blockSites for which
    self.blackWhiteListParser.checkWhiteList() returns a non-empty result;
    the site lists themselves are kept unfiltered.
    Raises CrabException when no block survives.
    """
    sub_blockSites = {}
    for block, sites in blockSites.items():
        if self.blackWhiteListParser.checkWhiteList(sites):
            sub_blockSites[block] = sites
    if not sub_blockSites:
        msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
        raise CrabException(msg)
    return sub_blockSites
141    
142 spiga 1.1 ########################################################################
def jobSplittingByEvent( self ):
    """
    Perform job splitting by number of events.

    Unless CMSSW.no_block_boundary is 1, a job never spans more than one
    block.  A job receives a comma-separated list of files plus a maximum
    number of events and a number of events to skip.

    READS:    self.args['blockSites'] - dictionary with blocks as keys and
                                        list of host sites as values
              self.args['pubdata']    - provides getFiles(),
                                        getEventsPerBlock(), getEventsPerFile(),
                                        getParent() and getMaxEvents()
              the values set by checkUserSettings(); exactly two of
              total_number_of_events, events_per_job, number_of_jobs
              must have been given, else CrabException is raised
    SETS:     self.eventsbyblock, self.eventsbyfile, self.parentFiles,
              self.maxEvents, self.useParent,
              self.ncjobs / self.total_number_of_jobs - Total # of jobs
    RETURNS:  dictionary with keys
              'params'         - names of the per-job arguments
              'args'           - one list per job:
                                 [files, (parent files), MaxEvents, SkipEvents, block]
              'jobDestination' - Site destination(s) for each job (a list of lists)
              'njobs'          - number of jobs created
    """

    jobDestination=[]
    self.checkUserSettings()
    if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']
    filesbyblock=pubdata.getFiles()

    self.eventsbyblock=pubdata.getEventsPerBlock()
    self.eventsbyfile=pubdata.getEventsPerFile()
    self.parentFiles=pubdata.getParent()

    ## get max number of events
    self.maxEvents=pubdata.getMaxEvents()

    self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
    noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))

    if noBboundary == 1:
        # ignoring block boundaries needs an explicit event total ...
        if self.total_number_of_events== -1:
            msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
            msg +='\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
            raise CrabException(msg)
        # ... and exactly one white-listed storage element
        if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
            msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
            msg += "\tPlease set se_white_list with the site's storage element name."
            raise CrabException(msg)
        blockSites = self.ComputeSubBlockSites(blockSites)

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    # events per job derived from the total when it was not given directly
    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

    if (self.selectNumberOfJobs):
        common.logger.info("May not create the exact number_of_jobs requested.")

    if (self.theNumberOfJobs < 0):
        common.logger.info("ERROR: Negative number_of_jobs requested. Will result in no jobs.")

    # old... to remove Daniele
    # (effectively "no limit": the jobCount < totalNumberOfJobs tests below
    # only stop at this very large value)
    totalNumberOfJobs = 999999999

    blocks = blockSites.keys()
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # parString accumulates the files of the job being built,
    # pString the corresponding parent files
    parString = ""
    pString = ""
    filesEventCount = 0
    msg=''

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock.keys() :
            jobsOfBlock[block] = []

        if self.eventsbyblock.has_key(block) :
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug('Events in Block File '+str(numEventsInBlock))

            files = filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0
            if noBboundary == 0: # DD
                # ---- New block => New job ---- #
                parString = ""
                pString=""
                # counter for number of events in files currently worked on
                filesEventCount = 0
            # NOTE(review): the listing this was recovered from lost its
            # indentation; the two resets below are placed outside the
            # "noBboundary == 0" branch because nothing else initialises
            # them before the file loop reads them -- confirm against the
            # repository copy.
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block  ---- #
            msg='\n'
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                file = files[fileCount]
                if self.useParent==1:
                    parent = self.parentFiles[file]
                    common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
                if newFile :
                    try:
                        numEventsInFile = self.eventsbyfile[file]
                        common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += file + ','
                        if self.useParent==1:
                            for f in parent :
                                pString += f + ','
                        newFile = 0
                    except KeyError:
                        common.logger.info("File "+str(file)+" has unknown number of events: skipping")

                # never ask a job for more events than are still wanted overall
                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                    if noBboundary == 1: ## DD
                        # block boundaries ignored: just keep adding files
                        newFile = 1
                        fileCount += 1
                    else:
                        # if last file in block
                        if ( fileCount == numFilesInBlock-1 ) :
                            # end job using last file, use remaining events in block
                            # close job and touch new file
                            fullString = parString[:-1]
                            if self.useParent==1:
                                fullParentString = pString[:-1]
                                list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
                            else:
                                list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
                            msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
                            jobDestination.append(blockSites[block])
                            msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                            # fill jobs of block dictionary
                            jobsOfBlock[block].append(jobCount+1)
                            # reset counter
                            jobCount = jobCount + 1
                            totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                            eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                            jobSkipEventCount = 0
                            # reset file
                            pString = ""
                            parString = ""
                            filesEventCount = 0
                            newFile = 1
                            fileCount += 1
                        else :
                            # go to next file
                            newFile = 1
                            fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                    # close job and touch new file
                    fullString = parString[:-1]
                    if self.useParent==1:
                        fullParentString = pString[:-1]
                        list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                    else:
                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                    msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                    jobDestination.append(blockSites[block])
                    msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    pString = ""
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1

                # if more events in file remain than eventsPerJobRequested
                else :
                    # close job but don't touch new file
                    fullString = parString[:-1]
                    if self.useParent==1:
                        fullParentString = pString[:-1]
                        list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                    else:
                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                    msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                    jobDestination.append(blockSites[block])
                    msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                    jobsOfBlock[block].append(jobCount+1)
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[file]
                    pString_tmp=''
                    if self.useParent==1:
                        for f in parent : pString_tmp += f + ','
                    pString = pString_tmp
                    parString = file + ','
                pass # END if
            pass # END while (iterate over files in the block)
        pass # END while (iterate over blocks in the dataset)
    common.logger.debug(msg)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # skip check on block with no sites  DD
    if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)

    # prepare dict output
    dictOut = {}
    dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=self.total_number_of_jobs

    return dictOut
402    
403     # keep track of blocks with no sites so that a warning can be printed at the end
404    
def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
    """
    Report, per block, the jobs created and the sites able to run them.

    blocks      -- iterable of block names
    jobsOfBlock -- dictionary block -> list of job numbers; blocks that
                   are not a key of it are ignored
    blockSites  -- dictionary block -> list of host sites

    For every considered block the sites are passed through the black
    list and then the white list of self.blackWhiteListParser.  Blocks
    that keep at least one site are appended to the text written to
    self.fileBlocks_FileName via writeTXTfile(); the others are listed
    in a warning.  Raises CrabException when every considered block is
    left without sites.
    """
    def whiteListHint():
        # Text appended to both the warning and the final error: one
        # paragraph per configured white list (SE first, then CE).
        hint = ''
        for kind, key, tail in (('SE', 'GRID.se_white_list', ''),
                                ('CE', 'GRID.ce_white_list', '\n')):
            if key in self.cfg_params:
                hint += '\tWARNING: ' + kind + ' White List: '+self.cfg_params[key]+'\n'
                hint += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                hint += '\tPlease check if the dataset is available at this site!)' + tail
        return hint

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"
    jobRangesNoSite = []    # job ranges of the blocks left without sites
    blocksNoSite = []       # running index of those blocks
    allBlock = []           # running index of every considered block

    parser = self.blackWhiteListParser
    blockCounter = 0
    saveFblocks =''
    for block in blocks:
        if block in jobsOfBlock:
            blockCounter += 1
            allBlock.append( blockCounter )
            notBlackListed = parser.checkBlackList(blockSites[block],[block])
            sites = parser.checkWhiteList(notBlackListed,[block])
            screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                ', '.join(SE2CMS(sites)))
            if len(sites) == 0:
                jobRangesNoSite.append( spanRanges(jobsOfBlock[block]) )
                blocksNoSite.append( blockCounter )
            else:
                saveFblocks += str(block)+'\n'
                # NOTE(review): rewritten after each usable block; nesting
                # inferred from a listing that lost its indentation.
                writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)

    common.logger.info(screenOutput)
    if len(jobRangesNoSite) > 0 and len(blocksNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        # a comma follows every entry (including the last) when there is
        # more than one -- kept as is so the emitted text does not change
        virgola = ""
        if len(blocksNoSite) > 1:
            virgola = ","
        for blockIndex in blocksNoSite:
            msg += ' ' + str(blockIndex) + virgola
        msg += '\n\t\tRelated jobs:\n '
        virgola = ""
        if len(jobRangesNoSite) > 1:
            virgola = ","
        for range_jobs in jobRangesNoSite:
            msg += str(range_jobs) + virgola
        msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
        msg += whiteListHint()

        common.logger.info(msg)

    if blocksNoSite == allBlock:
        msg = 'Requested jobs cannot be Created! \n'
        msg += whiteListHint()
        raise CrabException(msg)

    return
467    
468    
469     ########################################################################
def jobSplittingByRun(self):
    """
    Split the task into jobs with the WMCore 'RunBased' algorithm.

    Reads self.args['blockSites'] (block -> list of host sites) and
    self.args['pubdata'] (provides getListFiles()).  At most
    self.theNumberOfJobs jobs are created (9999999 when number_of_jobs
    was not given).

    Returns a dictionary with keys 'params', 'args', 'jobDestination'
    and 'njobs'; also calls self.cacheBlocks() with the blocks and
    destinations of the created jobs.
    """

    self.checkUserSettings()
    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    if self.selectNumberOfJobs == 0 :
        self.theNumberOfJobs = 9999999

    # Make the WMBS fileset handed to the job splitter
    thefiles = Fileset(name='FilesToSplit')
    for f in pubdata.getListFiles():
        block = f['Block']['Name']
        try:
            f['Block']['StorageElementList'].extend(blockSites[block])
        except Exception:
            # Best effort: files whose block cannot be matched to a site
            # list are left out.  "Exception" rather than a bare except so
            # that KeyboardInterrupt/SystemExit still propagate.
            continue
        wmbsFile = File(f['LogicalFileName'])
        if not blockSites[block]:
            msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
            msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
            common.logger.debug(msg)
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        # only the first run of the file is registered
        runNum = f['RunsList'][0]['RunNumber']
        wmbsFile.addRun( Run(runNumber=runNum) )
        thefiles.addFile( wmbsFile )

    work = Workflow()
    subs = Subscription(
                fileset = thefiles,
                workflow = work,
                split_algo = 'RunBased',
                type = "Processing")
    splitter = SplitterFactory()
    jobfactory = splitter(subs)

    #loop over all runs
    list_of_lists = []
    jobDestination = []
    list_of_blocks = []
    count = 0
    for jobGroup in jobfactory():
        if count < self.theNumberOfJobs:
            res = self.getJobInfo(jobGroup)
            list_of_blocks.append(res['block'])
            fullString = ','.join(res['lfns'])
            # NOTE(review): list_of_blocks grows across jobs, so each job's
            # InputBlocks also names the blocks of all earlier jobs; kept
            # as found -- confirm whether that is intended.
            blockString = ','.join(list_of_blocks)
            list_of_lists.append([fullString,str(-1),str(0),blockString])
            #need to check single file location
            jobDestination.append(res['locations'])
            count +=1

    # prepare dict output
    dictOut = {}
    dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=count
    self.cacheBlocks(list_of_blocks,jobDestination)

    return dictOut
541    
def getJobInfo( self,jobGroup ):
    """
    Summarise a job group.

    jobGroup -- object with a 'jobs' attribute; each job provides
                getFiles(), returning mappings with the keys 'lfn',
                'locations' and 'block'.

    Returns a dictionary with
        'lfns'      - the lfn of every file of every job, in order
        'locations' - the locations of the first file only
        'block'     - the block of the first file (absent when the group
                      holds no file at all)
    """
    res = {}
    lfns = []
    locations = []
    firstFile = True
    for job in jobGroup.jobs:
        for jobFile in job.getFiles():
            lfns.append(jobFile['lfn'])
            if firstFile:
                # Take block and locations from the first file.  The block
                # is recorded even when that file has no location, so that
                # callers reading res['block'] do not hit a KeyError.
                locations.extend(jobFile['locations'])
                res['block'] = jobFile['block']
                firstFile = False
    res['lfns'] = lfns
    res['locations'] = locations
    return res
558    
559 spiga 1.1 ########################################################################
def prepareSplittingNoInput(self):
    """
    Derive the missing splitting quantity for tasks without input data.

    Uses the values and select flags set by checkUserSettings() and
    fills in whichever of self.total_number_of_jobs,
    self.total_number_of_events and self.eventsPerJob was not given.
    Divisions are integer (floor) divisions.

    Raises CrabException when the total number of events is negative, or
    when the quantity to divide by (events per job / number of jobs) is 0.
    """
    if (self.selectEventsPerJob):
        common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
    if (self.selectNumberOfJobs):
        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if (self.selectTotalNumberEvents):
        common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            if self.eventsPerJob == 0:
                # would otherwise surface as a bare ZeroDivisionError
                raise CrabException('Cannot split jobs per Events with events_per_job = 0')
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

    elif (self.selectNumberOfJobs) :
        self.total_number_of_jobs = self.theNumberOfJobs
        if self.total_number_of_jobs == 0:
            # would otherwise surface as a bare ZeroDivisionError
            raise CrabException('Cannot split jobs with number_of_jobs = 0')
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs
584    
585 spiga 1.23
def jobSplittingNoInput(self):
    """
    Split a task that has no input dataset into jobs of equal event count.

    Exactly two of total_number_of_events, events_per_job and
    number_of_jobs must be configured (CrabException otherwise).  Reads
    self.args['managedGenerators'], self.args['generator'] and the
    CMSSW.first_lumi option (default 1).

    Sets self.list_of_args and returns a dictionary with keys 'params',
    'args', 'jobDestination' and 'njobs'.
    """
    common.logger.debug('Splitting per events')
    self.checkUserSettings()
    howManySelected = (self.selectTotalNumberEvents
                       + self.selectEventsPerJob
                       + self.selectNumberOfJobs)
    if howManySelected != 2:
        raise CrabException('Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.')

    managedGenerators = self.args['managedGenerators']
    generator = self.args['generator']
    firstLumi = int(self.cfg_params.get('CMSSW.first_lumi', 1))

    self.prepareSplittingNoInput()

    nJobs = self.total_number_of_jobs
    common.logger.debug('N jobs '+str(nJobs))

    # events that can actually be produced, and what is left over
    achievable = int(nJobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - achievable

    common.logger.debug('Check '+str(check))

    common.logger.info(str(nJobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(nJobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(achievable))

    withGenerator = generator in managedGenerators

    # argument is seed number.$i
    jobDestination = []
    self.list_of_args = []
    for jobIndex in range(nJobs):
        ## Since there is no input, any site is good
        jobDestination.append([""]) # must be empty to correctly write the XML
        jobArgs = []
        if firstLumi: # Pythia first lumi
            jobArgs.append(str(int(firstLumi)+jobIndex))
        if withGenerator:
            jobArgs.append(generator)
            # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
            if generator == 'comphep' and jobIndex == 0:
                firstEvent = '1'
            else:
                firstEvent = str(jobIndex*self.eventsPerJob)
            jobArgs.append(firstEvent)
        jobArgs.append(str(self.eventsPerJob))
        self.list_of_args.append(jobArgs)

    # parameter names, in the same order as the per-job arguments above
    params = []
    if firstLumi:
        params.append('FirstLumi')
    if withGenerator:
        params.extend(['Generator', 'FirstEvent'])
    params.append('MaxEvents')

    # prepare dict output
    return {
        'params': params,
        'args': self.list_of_args,
        'jobDestination': jobDestination,
        'njobs': self.total_number_of_jobs,
    }
647    
648    
def jobSplittingForScript(self):
    """
    Perform job splitting based on number of job

    CMSSW.number_of_jobs is mandatory (CrabException otherwise).  Each
    job gets the events-per-job value as its only argument, or no
    argument at all when that value is 0.

    Sets self.list_of_args and returns a dictionary with keys 'params',
    'args', 'jobDestination' and 'njobs'.
    """
    self.checkUserSettings()
    if (self.selectNumberOfJobs == 0):
        msg = 'must specify number_of_jobs.'
        # was "crabexception", an undefined name that produced a NameError
        # instead of the intended error
        raise CrabException(msg)
    jobDestination = []
    common.logger.debug('Splitting per job')
    common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

#    self.total_number_of_jobs = self.theNumberOfJobs

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs '+str(self.total_number_of_jobs))

    common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

    # argument is seed number.$i
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        args=[]
        # no input data, so no site constraint
        jobDestination.append([""])
        if self.eventsPerJob != 0 :
            args.append(str(self.eventsPerJob))
        self.list_of_args.append(args)

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['MaxEvents']
    dictOut['args'] = self.list_of_args
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=self.total_number_of_jobs
    return dictOut
685    
686 ewv 1.22
def jobSplittingByLumi(self):
    """
    Split task into jobs by Lumi section paying attention to which
    lumis should be run (according to the analysis dataset).
    This uses WMBS job splitting which does not split files over jobs
    so the job will have AT LEAST as many lumis as requested, perhaps
    more.

    Reads 'blockSites' and 'pubdata' from self.args and the
    'CMSSW.use_parent' option from self.cfg_params.  Sets self.useParent,
    self.parentFiles and, unless self.limitJobLumis is set,
    self.lumisPerJob.  The blocks of the created jobs are handed to
    self.cacheBlocks() before returning.

    Returns a dict with the keys:
      'params'         -- names of the per-job arguments
      'args'           -- one list of argument strings per job, in the
                          order given by 'params'
      'jobDestination' -- one list of locations per job
      'njobs'          -- number of jobs created
    """
    self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
    common.logger.debug('Splitting by Lumi')
    self.checkLumiSettings()

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    lumisPerFile = pubdata.getLumis()
    self.parentFiles = pubdata.getParent()

    # Make the list of WMBS files for job splitter
    wmFileList = []
    for jobFile in pubdata.getListFiles():
        block = jobFile['Block']['Name']
        try:
            jobFile['Block']['StorageElementList'].extend(blockSites[block])
        except (KeyError, AttributeError, TypeError):
            # Best effort: a file whose block has no usable site
            # information is left out of the splitting.  Only lookup
            # errors are caught so that interrupts still propagate.
            continue
        sites = blockSites[block]
        wmbsFile = File(jobFile['LogicalFileName'])
        if not sites:
            msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
            msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
            common.logger.debug(msg)
        for site in sites:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        for lumi in lumisPerFile[jobFile['LogicalFileName']]:
            wmbsFile.addRun(Run(lumi[0], lumi[1]))
        wmFileList.append(wmbsFile)

    fileSet = set(wmFileList)
    thefiles = Fileset(name='FilesToSplit', files = fileSet)

    # Create the factory and workflow
    work = Workflow()
    subs = Subscription(fileset = thefiles, workflow = work,
                        split_algo = 'LumiBased', type = "Processing")
    splitter = SplitterFactory()
    jobFactory = splitter(subs)

    list_of_lists = []
    jobDestination = []
    jobCount = 0
    lumisCreated = 0
    list_of_blocks = []
    if not self.limitJobLumis:
        # No explicit lumis-per-job: derive it from the requested totals.
        if self.totalNLumis > 0:
            self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
        else:
            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
        common.logger.info('Each job will process about %s lumis.' %
                           self.lumisPerJob)

    useParent = (self.useParent == 1)
    # Once a limit is hit it stays hit (the counters never decrease), so
    # stop scanning the remaining job groups instead of re-testing and
    # re-logging the same message for each of them.
    limitReached = False
    for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
        for job in jobGroup.jobs:
            if self.limitNJobs and jobCount >= self.theNumberOfJobs:
                common.logger.info('Requested number of jobs reached.')
                limitReached = True
                break
            if self.limitTotalLumis and lumisCreated >= self.totalNLumis:
                common.logger.info('Requested number of lumis reached.')
                limitReached = True
                break

            lumis = []
            lfns = []
            parentlfns = []
            locations = []
            blocks = []
            firstFile = True
            # Collect information from all the files
            for jobFile in job.getFiles():
                doFile = False
                if firstFile:  # Get locations from first file in the job
                    locations.extend(jobFile['locations'])
                    blocks.append(jobFile['block'])
                    firstFile = False
                # Accumulate Lumis from all files
                for lumiList in jobFile['runs']:
                    theRun = lumiList.run
                    for theLumi in lumiList:
                        if (not self.limitTotalLumis) or \
                           (lumisCreated < self.totalNLumis):
                            doFile = True
                            lumisCreated += 1
                            lumis.append( (theRun, theLumi) )
                # A file is listed only if at least one of its lumis was taken
                if doFile:
                    lfns.append(jobFile['lfn'])
                    if useParent:
                        parentlfns.extend(self.parentFiles[jobFile['lfn']])

            fileString = ','.join(lfns)
            lumiLister = LumiList(lumis = lumis)
            lumiString = lumiLister.getCMSSWString()
            blockString = ','.join(blocks)
            if useParent:
                pfileString = ','.join(parentlfns)
                common.logger.debug("Files: "+fileString+" with the following parents: "+pfileString)
                list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString, blockString])
            else:
                list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
            list_of_blocks.append(blocks)
            jobDestination.append(locations)
            jobCount += 1
            common.logger.debug('Job %s will run on %s files and %s lumis '
                                % (jobCount, len(lfns), len(lumis) ))
        if limitReached:
            break

    common.logger.info('%s jobs created to run on %s lumis' %
                       (jobCount, lumisCreated))

    # Prepare dict output matching back to non-WMBS job creation
    if useParent:
        params = ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
    else:
        params = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
    dictOut = {}
    dictOut['params'] = params
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = jobCount
    self.cacheBlocks(list_of_blocks,jobDestination)

    return dictOut
818    
def cacheBlocks(self, blocks,destinations):
    """
    Write the names of the blocks that are still usable to the blocks
    cache file.

    blocks       -- one list of block names per job
    destinations -- one list of sites per job, parallel to `blocks`

    A job's blocks are recorded only when at least one of its
    destinations is left after self.blackWhiteListParser has applied
    the black list and then the white list.  The result, one block name
    per line, is handed to writeTXTfile() for self.fileBlocks_FileName.
    """
    parser = self.blackWhiteListParser
    lines = []
    # Index into `destinations` (rather than zip) so that a length
    # mismatch still fails loudly instead of being silently truncated.
    for i, jobBlocks in enumerate(blocks):
        sites = parser.checkWhiteList(parser.checkBlackList(destinations[i]))
        if len(sites) != 0:
            for block in jobBlocks:
                lines.append(str(block)+'\n')
    # Assemble the file content in one pass
    saveFblocks = ''.join(lines)
    writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
828 ewv 1.26
def Algos(self):
    """
    Return the splitting-type table: a dict mapping each splitting
    type name to the bound method that implements it.
    """
    handlers = (
        ('EventBased', self.jobSplittingByEvent),
        ('RunBased', self.jobSplittingByRun),
        ('LumiBased', self.jobSplittingByLumi),
        ('NoInput', self.jobSplittingNoInput),
        ('ForScript', self.jobSplittingForScript),
    )
    return dict(handlers)
841