ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.55
Committed: Tue Mar 13 21:49:55 2012 UTC (13 years, 1 month ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_2_pre3, CRAB_2_8_2_pre2, CRAB_2_8_2_pre1, CRAB_2_8_1
Changes since 1.54: +5 -2 lines
Log Message:
add message for negative number_of_jobs

File Contents

# User Rev Content
1 ewv 1.27
# CVS keyword strings ($Id$ / $Revision$) identifying the revision of this module.
__revision__ = "$Id: Splitter.py,v 1.54 2012/03/06 19:22:59 spiga Exp $"
__version__ = "$Revision: 1.54 $"
4 ewv 1.27
5 spiga 1.1 import common
6     from crab_exceptions import *
7     from crab_util import *
8 ewv 1.26
9     from WMCore.DataStructs.File import File
10     from WMCore.DataStructs.Fileset import Fileset
11     from WMCore.DataStructs.Run import Run
12     from WMCore.DataStructs.Subscription import Subscription
13     from WMCore.DataStructs.Workflow import Workflow
14     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 ewv 1.43 try: # Can remove when CMSSW 3.7 and earlier are dropped
17     from FWCore.PythonUtilities.LumiList import LumiList
18     except ImportError:
19     from LumiList import LumiList
20 spiga 1.1
21     class JobSplitter:
def __init__(self, cfg_params, args):
    """
    Keep the CRAB configuration and the splitting inputs, and build the
    storage-element black/white list parser used to screen block sites.

    cfg_params -- mapping of 'SECTION.key' configuration values
    args       -- splitting inputs, indexed by keys such as 'blockSites'
                  and 'pubdata' by the splitting methods
    """
    self.cfg_params = cfg_params
    self.args = args

    # Lumi-splitting state; overwritten by checkLumiSettings()
    self.lumisPerJob = -1
    self.totalNLumis = 0
    self.theNumberOfJobs = 0
    self.limitNJobs = self.limitTotalLumis = self.limitJobLumis = False

    # Storage-element white/black lists (empty list when not configured)
    self.seWhiteList = cfg_params.get('GRID.se_white_list', [])
    blackList = cfg_params.get('GRID.se_black_list', [])
    self.blackWhiteListParser = SEBlackWhiteListParser(
        self.seWhiteList, blackList, common.logger())

    # File used to store/read the analyzed fileBlocks; the user may
    # override the default location inside the share directory.
    fallback = common.work_space.shareDir() + 'AnalyzedBlocks.txt'
    chosen = cfg_params.get('CMSSW.fileblocks_file', fallback)
    self.fileBlocks_FileName = os.path.abspath(chosen)
28     self.theNumberOfJobs = 0
29     self.limitNJobs = False
30     self.limitTotalLumis = False
31     self.limitJobLumis = False
32    
33 spiga 1.1 #self.maxEvents
34     # init BlackWhiteListParser
35 spiga 1.24 self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
36 spiga 1.14 seBlackList = cfg_params.get('GRID.se_black_list',[])
37 spiga 1.24 self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())
38 spiga 1.1
39 spiga 1.39 ## check if has been asked for a non default file to store/read analyzed fileBlocks
40     defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
41     self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
42    
43 spiga 1.1
def checkUserSettings(self):
    """
    Read the event-based splitting options from the configuration.

    Sets eventsPerJob/selectEventsPerJob, theNumberOfJobs/selectNumberOfJobs
    and total_number_of_events/selectTotalNumberEvents. Each select* flag is
    1 when the corresponding CMSSW option is present and 0 otherwise; when
    absent, eventsPerJob defaults to -1 and the other values to 0.

    Raises CrabException if number_of_jobs is given but is not positive, or
    if total_number_of_events (other than -1, meaning "all") is smaller
    than number_of_jobs.
    """
    params = self.cfg_params

    ## Events per job
    if 'CMSSW.events_per_job' in params:
        self.eventsPerJob = int(params['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in params:
        self.theNumberOfJobs = int(params['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
        # A zero or negative job count cannot be honoured: the splitters
        # divide by it or would loop without consuming events.
        if self.theNumberOfJobs <= 0:
            msg = 'number_of_jobs must be a positive integer (got %d).' % self.theNumberOfJobs
            raise CrabException(msg)
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events (-1 means all events)
    if 'CMSSW.total_number_of_events' in params:
        self.total_number_of_events = int(params['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
        if (self.selectNumberOfJobs == 1 and self.total_number_of_events != -1
                and self.total_number_of_events < self.theNumberOfJobs):
            msg = 'Must specify at least one event per job: total_number_of_events must be >= number_of_jobs'
            raise CrabException(msg)
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    return
73 spiga 1.1
def checkLumiSettings(self):
    """
    Check that the user has specified enough information to perform
    splitting by lumi sections.

    Exactly two of number_of_jobs, lumis_per_job and total_number_of_lumis
    must be given. Sets lumisPerJob/limitJobLumis, theNumberOfJobs/limitNJobs
    and totalNLumis/limitTotalLumis; when number_of_jobs and lumis_per_job
    are both given, the total is derived as their product.

    Raises CrabException if the combination is invalid, or if
    number_of_jobs or lumis_per_job is not positive.
    """
    params = self.cfg_params
    settings = 0
    if 'CMSSW.lumis_per_job' in params:
        self.lumisPerJob = int(params['CMSSW.lumis_per_job'])
        self.limitJobLumis = True
        settings += 1

    if 'CMSSW.number_of_jobs' in params:
        self.theNumberOfJobs = int(params['CMSSW.number_of_jobs'])
        self.limitNJobs = True
        settings += 1

    if 'CMSSW.total_number_of_lumis' in params:
        self.totalNLumis = int(params['CMSSW.total_number_of_lumis'])
        # -1 means "all lumis", i.e. no limit on the total
        self.limitTotalLumis = (self.totalNLumis != -1)
        settings += 1

    if settings != 2:
        msg = 'When splitting by lumi section you must specify two and only two of:\n'
        msg += ' number_of_jobs, lumis_per_job, total_number_of_lumis'
        raise CrabException(msg)

    # The lumi splitter divides by number_of_jobs and passes lumis_per_job
    # to the job factory: non-positive values would crash or misbehave there.
    if self.limitNJobs and self.theNumberOfJobs <= 0:
        raise CrabException('number_of_jobs must be a positive integer (got %d).' % self.theNumberOfJobs)
    if self.limitJobLumis and self.lumisPerJob <= 0:
        raise CrabException('lumis_per_job must be a positive integer (got %d).' % self.lumisPerJob)

    if self.limitNJobs and self.limitJobLumis:
        self.limitTotalLumis = True
        self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs

    # Has the user specified runselection?
    if 'CMSSW.runselection' in params:
        common.logger.info('You have specified runselection and split by lumi.')
        common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
    return
108 ewv 1.27
def ComputeSubBlockSites( self, blockSites ):
    """
    Keep only the blocks for which the white-list parser accepts at least
    one of their hosting sites.

    blockSites -- dictionary mapping block name to its list of sites
    Returns a new dictionary with the retained blocks; each keeps its full,
    unfiltered site list.
    Raises CrabException when no block passes the white list.
    """
    accepts = self.blackWhiteListParser.checkWhiteList
    kept = dict((blk, siteList) for blk, siteList in blockSites.items()
                if accepts(siteList))
    if not kept:
        msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
        raise CrabException(msg)
    return kept
120    
########################################################################
def jobSplittingByEvent( self ):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block (unless CMSSW.no_block_boundary=1).

    Reads from self.args:
      'blockSites' -- dictionary with blocks as keys and list of host sites as values
      'pubdata'    -- dataset information (files, events per block/file, parents)
    REQUIRES: exactly two of total_number_of_events, events_per_job and
              number_of_jobs (read via self.checkUserSettings()).
    SETS: self.total_number_of_jobs / self.ncjobs - Total # of jobs
          self.maxEvents, self.eventsbyblock, self.eventsbyfile,
          self.parentFiles, self.useParent
    RETURNS: dictionary with
      'params'         -- names of the per-job arguments
      'args'           -- one argument list per job (files, [parents], MaxEvents, SkipEvents, block)
      'jobDestination' -- site list for each job
      'njobs'          -- number of jobs created
    RAISES: CrabException for inconsistent splitting settings.
    """

    jobDestination = []
    self.checkUserSettings()
    if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']
    filesbyblock = pubdata.getFiles()

    self.eventsbyblock = pubdata.getEventsPerBlock()
    self.eventsbyfile = pubdata.getEventsPerFile()
    self.parentFiles = pubdata.getParent()

    ## get max number of events
    self.maxEvents = pubdata.getMaxEvents()

    self.useParent = int(self.cfg_params.get('CMSSW.use_parent', 0))
    noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary', 0))

    if noBboundary == 1:
        if self.total_number_of_events == -1:
            msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
            msg += '\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
            raise CrabException(msg)
        if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
            msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
            msg += "\tPlease set se_white_list with the site's storage element name."
            raise CrabException(msg)
        blockSites = self.ComputeSubBlockSites(blockSites)

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report what was actually requested (total_number_of_events is 0
        # when the total was derived from events_per_job * number_of_jobs)
        common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectNumberOfJobs):
        common.logger.info("May not create the exact number_of_jobs requested.")
        if (self.theNumberOfJobs <= 0):
            # Nothing can be split sensibly: make sure no job is produced
            # instead of dividing by zero or looping on negative job sizes.
            common.logger.info("ERROR: Non-positive number_of_jobs requested. Will result in no jobs.")
            eventsRemaining = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if (self.theNumberOfJobs > 0):
            # At least one event per job: a zero-sized job never consumes
            # events and the loop below would only stop at the job cap.
            eventsPerJobRequested = max(eventsRemaining // self.theNumberOfJobs, 1)
        else:
            eventsPerJobRequested = 0

    # Hard cap on the number of jobs (legacy safety limit)
    totalNumberOfJobs = 999999999

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # tracks which jobs belong to which block
    jobsOfBlock = {}

    parString = ""
    pString = ""
    filesEventCount = 0
    msg = ''

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if block in self.eventsbyblock:
            numEventsInBlock = self.eventsbyblock[block]
            common.logger.debug('Events in Block File '+str(numEventsInBlock))

            files = filesbyblock[block]
            numFilesInBlock = len(files)
            if (numFilesInBlock <= 0):
                continue
            fileCount = 0
            if noBboundary == 0: # DD
                # ---- New block => New job ---- #
                parString = ""
                pString = ""
                # counter for number of events in files currently worked on
                filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

            # ---- Iterate over the files in the block until we've met the requested ---- #
            # ---- total # of events or we've gone over all the files in this block ---- #
            msg = '\n'
            while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                fileName = files[fileCount]
                if self.useParent == 1:
                    parent = self.parentFiles[fileName]
                    common.logger.log(10-1, "File "+str(fileName)+" has the following parents: "+str(parent))
                if newFile:
                    try:
                        numEventsInFile = self.eventsbyfile[fileName]
                        common.logger.log(10-1, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                        # increase filesEventCount
                        filesEventCount += numEventsInFile
                        # Add file to current job
                        parString += fileName + ','
                        if self.useParent == 1:
                            for f in parent:
                                pString += f + ','
                        newFile = 0
                    except KeyError:
                        common.logger.info("File "+str(fileName)+" has unknown number of events: skipping")

                eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                # if less events in file remain than eventsPerJobRequested
                if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                    if noBboundary == 1: ## DD
                        newFile = 1
                        fileCount += 1
                    else:
                        # if last file in block
                        if ( fileCount == numFilesInBlock-1 ):
                            # end job using last file, use remaining events in block
                            # close job and touch new file
                            fullString = parString[:-1]
                            if self.useParent == 1:
                                fullParentString = pString[:-1]
                                list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
                            else:
                                list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
                            msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
                            jobDestination.append(blockSites[block])
                            msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                            # fill jobs of block dictionary
                            jobsOfBlock[block].append(jobCount+1)
                            # reset counter
                            jobCount = jobCount + 1
                            totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                            eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                            jobSkipEventCount = 0
                            # reset file
                            pString = ""
                            parString = ""
                            filesEventCount = 0
                            newFile = 1
                            fileCount += 1
                        else:
                            # go to next file
                            newFile = 1
                            fileCount += 1
                # if events in file equal to eventsPerJobRequested
                elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ):
                    # close job and touch new file
                    fullString = parString[:-1]
                    if self.useParent == 1:
                        fullParentString = pString[:-1]
                        list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                    else:
                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                    msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                    jobDestination.append(blockSites[block])
                    msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    jobSkipEventCount = 0
                    # reset file
                    pString = ""
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                # if more events in file remain than eventsPerJobRequested
                else:
                    # close job but don't touch new file
                    fullString = parString[:-1]
                    if self.useParent == 1:
                        fullParentString = pString[:-1]
                        list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                    else:
                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                    msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                    jobDestination.append(blockSites[block])
                    msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                    jobsOfBlock[block].append(jobCount+1)
                    # increase counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsPerJobRequested
                    eventsRemaining = eventsRemaining - eventsPerJobRequested
                    # calculate skip events for last file
                    # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                    jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                    # remove all but the last file
                    filesEventCount = self.eventsbyfile[fileName]
                    pString_tmp = ''
                    if self.useParent == 1:
                        for f in parent:
                            pString_tmp += f + ','
                    pString = pString_tmp
                    parString = fileName + ','
    common.logger.debug(msg)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # skip check on block with no sites DD
    if noBboundary == 0:
        self.checkBlockNoSite(blocks, jobsOfBlock, blockSites)

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    if self.useParent:
        dictOut['params'] = ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = self.total_number_of_jobs

    return dictOut
381    
# keep trace of block with no sites to print a warning at the end

def checkBlockNoSite(self, blocks, jobsOfBlock, blockSites):
    """
    Report which allowed sites host each block that received jobs.

    It also writes the usable blocks to self.fileBlocks_FileName and warns
    about blocks that no allowed site hosts.

    blocks      -- ordered list of block names considered for splitting
    jobsOfBlock -- dictionary block name -> list of job numbers in that block
    blockSites  -- dictionary block name -> list of hosting sites
    Raises CrabException when none of the blocks with jobs has an allowed site.
    """
    def addWhiteListHints(text):
        # Append a hint for each configured white list (SE, then CE); every
        # hint ends with a newline so consecutive hints stay on separate lines.
        for key, label in (('GRID.se_white_list', 'SE'), ('GRID.ce_white_list', 'CE')):
            if key in self.cfg_params:
                text += '\tWARNING: %s White List: ' % label + self.cfg_params[key] + '\n'
                text += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                text += '\tPlease check if the dataset is available at this site!)\n'
        return text

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"
    noSiteJobRanges = []   # job ranges of blocks without any allowed site
    noSiteBlocks = []      # counters of blocks without any allowed site
    allBlocks = []         # counters of all blocks that received jobs
    savedBlocks = ''       # blocks with at least one allowed site, one per line

    blockCounter = 0
    for block in blocks:
        if block not in jobsOfBlock:
            continue
        blockCounter += 1
        allBlocks.append(blockCounter)
        sites = self.blackWhiteListParser.checkWhiteList(
            self.blackWhiteListParser.checkBlackList(blockSites[block], [block]), [block])
        screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter, spanRanges(jobsOfBlock[block]),
                                                               ', '.join(SE2CMS(sites)))
        if len(sites) == 0:
            noSiteJobRanges.append(spanRanges(jobsOfBlock[block]))
            noSiteBlocks.append(blockCounter)
        else:
            savedBlocks += str(block) + '\n'
    writeTXTfile(self, self.fileBlocks_FileName, savedBlocks)

    common.logger.info(screenOutput)
    if noSiteJobRanges and noSiteBlocks:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        msg += ' ' + ', '.join([str(b) for b in noSiteBlocks])
        msg += '\n\t\tRelated jobs:\n '
        msg += ', '.join([str(r) for r in noSiteJobRanges])
        msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
        msg = addWhiteListHints(msg)
        common.logger.info(msg)

    # No block with jobs can be sent anywhere: nothing can be submitted.
    if noSiteBlocks == allBlocks:
        msg = 'Requested jobs cannot be Created! \n'
        msg = addWhiteListHints(msg)
        raise CrabException(msg)

    return
446    
447    
########################################################################
def jobSplittingByRun(self):
    """
    Split the dataset into jobs with the WMCore 'RunBased' splitting algorithm.

    Reads self.args['blockSites'] (block -> hosting sites) and
    self.args['pubdata'] (dataset information). Files whose block cannot
    be attached to its sites are skipped. When number_of_jobs is set, at
    most that many jobs are created.

    Returns a dictionary with 'params', 'args' (one
    [InputFiles, MaxEvents, SkipEvents, InputBlocks] list per job),
    'jobDestination' and 'njobs'.
    """
    self.checkUserSettings()
    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    if self.selectNumberOfJobs == 0:
        self.theNumberOfJobs = 9999999
    thefiles = Fileset(name='FilesToSplit')
    fileList = pubdata.getListFiles()
    for f in fileList:
        block = f['Block']['Name']
        try:
            f['Block']['StorageElementList'].extend(blockSites[block])
        except Exception:
            # e.g. KeyError when the block is missing from blockSites: skip the file
            continue
        wmbsFile = File(f['LogicalFileName'])
        if not blockSites[block]:
            msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
            msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
            common.logger.debug(msg)
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        runNum = f['RunsList'][0]['RunNumber']
        wmbsFile.addRun(Run(runNumber=runNum))
        thefiles.addFile(wmbsFile)

    work = Workflow()
    subs = Subscription(
        fileset = thefiles,
        workflow = work,
        split_algo = 'RunBased',
        type = "Processing")
    splitter = SplitterFactory()
    jobfactory = splitter(subs)

    #loop over all runs
    list_of_lists = []
    jobDestination = []
    list_of_blocks = []   # block of each job, in job order (for cacheBlocks)
    count = 0
    for jobGroup in jobfactory():
        if count >= self.theNumberOfJobs:
            break
        res = self.getJobInfo(jobGroup)
        fullString = ','.join(res['lfns'])
        list_of_blocks.append(res['block'])
        # InputBlocks lists this job's own block only; joining list_of_blocks
        # here would also list the blocks of every earlier job.
        list_of_lists.append([fullString, str(-1), str(0), res['block']])
        #need to check single file location
        jobDestination.append(res['locations'])
        count += 1

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = count
    self.cacheBlocks(list_of_blocks, jobDestination)

    return dictOut
520    
def getJobInfo( self,jobGroup ):
    """
    Collect the input description of one job group.

    jobGroup -- object whose .jobs each provide getFiles(), returning file
                records with 'lfn', 'locations' and 'block' entries
    Returns a dictionary with
      'lfns'      -- LFNs of all files of all jobs in the group, in order
      'locations' -- locations of the first file only
      'block'     -- block of that same first file; set whenever the group
                     has at least one file, even if it has no locations
    """
    res = {}
    lfns = []
    locations = []
    first = True
    for job in jobGroup.jobs:
        for jobFile in job.getFiles():
            lfns.append(jobFile['lfn'])
            if first:
                # destination and block are both taken from the first file
                locations.extend(jobFile['locations'])
                res['block'] = jobFile['block']
                first = False
    res['lfns'] = lfns
    res['locations'] = locations
    return res
537    
########################################################################
def prepareSplittingNoInput(self):
    """
    Derive total_number_of_jobs for splittings without an input dataset.

    It also fills in whichever of eventsPerJob / total_number_of_events was
    not given. Uses the select* flags and values set by checkUserSettings().

    Raises CrabException if the total number of events is negative, or if a
    value used as a divisor (events_per_job, number_of_jobs) is not positive.
    """
    if (self.selectEventsPerJob):
        common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
    if (self.selectNumberOfJobs):
        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if (self.selectTotalNumberEvents):
        common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg = 'Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            if self.eventsPerJob <= 0:
                raise CrabException('events_per_job must be a positive integer (got %d).' % self.eventsPerJob)
            self.total_number_of_jobs = self.total_number_of_events // self.eventsPerJob
        elif (self.selectNumberOfJobs):
            self.total_number_of_jobs = self.theNumberOfJobs
            self.total_number_of_events = self.theNumberOfJobs * self.eventsPerJob

    elif (self.selectNumberOfJobs):
        if self.theNumberOfJobs <= 0:
            raise CrabException('number_of_jobs must be a positive integer (got %d).' % self.theNumberOfJobs)
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = self.total_number_of_events // self.total_number_of_jobs
563    
564 spiga 1.23
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of events per job, for tasks
    without an input dataset.

    Exactly two of total_number_of_events, events_per_job and number_of_jobs
    must be set. Each job's arguments are, in order:
      - the first lumi, when CMSSW.first_lumi is truthy (default 1);
      - for generators listed in self.args['managedGenerators'], the
        generator name and the first event;
      - MaxEvents.
    Returns a dictionary with 'params', 'args', 'jobDestination', 'njobs'.
    Raises CrabException for an invalid combination of settings.
    """
    common.logger.debug('Splitting per events')
    self.checkUserSettings()
    jobDestination = []
    if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    managedGenerators = self.args['managedGenerators']
    generator = self.args['generator']
    firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs '+str(self.total_number_of_jobs))

    # is there any remainder?
    eventsCovered = int(self.total_number_of_jobs) * self.eventsPerJob
    check = int(self.total_number_of_events) - eventsCovered

    common.logger.debug('Check '+str(check))

    common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' events for a total of '+str(eventsCovered)+' events')
    if check > 0:
        common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(eventsCovered))

    isManaged = generator in managedGenerators
    # one argument list per job, laid out as described in the docstring
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        jobDestination.append([""]) # must be empty to correctly write the XML
        args = []
        if (firstLumi): # Pythia first lumi
            args.append(str(int(firstLumi)+i))
        if isManaged:
            args.append(generator)
            if (generator == 'comphep' and i == 0):
                # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                args.append('1')
            else:
                args.append(str(i*self.eventsPerJob))
        args.append(str(self.eventsPerJob))
        self.list_of_args.append(args)

    # prepare dict output: parameter names mirror the argument layout above
    params = []
    if firstLumi:
        params.append('FirstLumi')
    if isManaged:
        params.extend(['Generator', 'FirstEvent'])
    params.append('MaxEvents')

    dictOut = {}
    dictOut['params'] = params
    dictOut['args'] = self.list_of_args
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = self.total_number_of_jobs

    return dictOut
626    
627    
def jobSplittingForScript(self):
    """
    Perform job splitting based on the number of jobs, for tasks without
    input data.

    number_of_jobs is mandatory. When the derived events-per-job value is
    non-zero, it is passed to every job as its only argument (MaxEvents).
    Returns a dictionary with 'params', 'args', 'jobDestination', 'njobs'.
    Raises CrabException when number_of_jobs is not specified.
    """
    self.checkUserSettings()
    if (self.selectNumberOfJobs == 0):
        msg = 'must specify number_of_jobs.'
        raise CrabException(msg)
    jobDestination = []
    common.logger.debug('Splitting per job')
    common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs '+str(self.total_number_of_jobs))

    common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

    # one argument list per job; MaxEvents only when it is non-zero
    self.list_of_args = []
    for _ in range(self.total_number_of_jobs):
        args = []
        # no input data, so any site is acceptable
        jobDestination.append([""])
        if self.eventsPerJob != 0:
            args.append(str(self.eventsPerJob))
        self.list_of_args.append(args)

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['MaxEvents']
    dictOut['args'] = self.list_of_args
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = self.total_number_of_jobs
    return dictOut
664    
665 ewv 1.22
def jobSplittingByLumi(self):
    """
    Split task into jobs by Lumi section paying attention to which
    lumis should be run (according to the analysis dataset).
    This uses WMBS job splitting which does not split files over jobs
    so the job will have AT LEAST as many lumis as requested, perhaps
    more.

    Inputs are read from self.args['blockSites'] (block name -> sites)
    and self.args['pubdata']. As side effects this sets self.useParent,
    self.parentFiles and, unless self.limitJobLumis is set,
    self.lumisPerJob; it also calls self.cacheBlocks().

    Returns a dict with keys:
      'params'         - names of the per-job arguments
      'args'           - one argument list per job, ordered like 'params'
      'jobDestination' - per job, the locations of the job's first file
      'njobs'          - number of jobs created
    """
    self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
    common.logger.debug('Splitting by Lumi')
    self.checkLumiSettings()

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    lumisPerFile = pubdata.getLumis()
    self.parentFiles = pubdata.getParent()

    # Make the list of WMBS files for the job splitter
    wmFileList = []
    for jobFile in pubdata.getListFiles():
        block = jobFile['Block']['Name']
        try:
            jobFile['Block']['StorageElementList'].extend(blockSites[block])
        except (KeyError, TypeError, AttributeError):
            # No usable site information for this file/block: skip the
            # file (was a bare except, which also trapped Ctrl-C).
            continue
        wmbsFile = File(jobFile['LogicalFileName'])
        if not blockSites[block]:
            # The file is still added below, just with no locations.
            msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
            msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
            common.logger.debug(msg)
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        for lumi in lumisPerFile[jobFile['LogicalFileName']]:
            wmbsFile.addRun(Run(lumi[0], lumi[1]))
        wmFileList.append(wmbsFile)

    thefiles = Fileset(name='FilesToSplit', files = set(wmFileList))

    # Create the factory and workflow
    work = Workflow()
    subs = Subscription(fileset = thefiles, workflow = work,
                        split_algo = 'LumiBased', type = "Processing")
    splitter = SplitterFactory()
    jobFactory = splitter(subs)

    list_of_lists = []
    jobDestination = []
    list_of_blocks = []
    jobCount = 0
    lumisCreated = 0
    if not self.limitJobLumis:
        if self.totalNLumis > 0:
            self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
        else:
            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
        common.logger.info('Each job will process about %s lumis.' %
                           self.lumisPerJob)

    # Set when a job/lumi limit is hit, so that BOTH loops stop; a plain
    # `break` only left the inner loop and the message was re-logged for
    # every remaining job group.
    limitReached = False
    for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
        for job in jobGroup.jobs:
            if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
                common.logger.info('Requested number of jobs reached.')
                limitReached = True
                break
            if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
                common.logger.info('Requested number of lumis reached.')
                limitReached = True
                break
            lumis = []
            lfns = []
            parentlfns = []
            locations = []
            blocks = []
            firstFile = True
            # Collect information from all the files
            for jobFile in job.getFiles():
                doFile = False
                if firstFile: # Get locations and block from first file in the job
                    locations.extend(jobFile['locations'])
                    blocks.append(jobFile['block'])
                    firstFile = False
                # Accumulate Lumis from all files, honouring the total lumi limit
                for lumiList in jobFile['runs']:
                    theRun = lumiList.run
                    for theLumi in lumiList:
                        if (not self.limitTotalLumis) or \
                           (lumisCreated < self.totalNLumis):
                            doFile = True
                            lumisCreated += 1
                            lumis.append( (theRun, theLumi) )
                # Only keep files that contributed at least one lumi
                if doFile:
                    lfns.append(jobFile['lfn'])
                    if self.useParent==1:
                        parentlfns.extend(self.parentFiles[jobFile['lfn']])
            fileString = ','.join(lfns)
            lumiString = LumiList(lumis = lumis).getCMSSWString()
            blockString = ','.join(blocks)
            if self.useParent==1:
                pfileString = ','.join(parentlfns)
                common.logger.debug("Files: "+fileString+" with the following parents: "+pfileString)
                list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString, blockString])
            else:
                list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
            list_of_blocks.append(blocks)
            jobDestination.append(locations)
            jobCount += 1
            common.logger.debug('Job %s will run on %s files and %s lumis '
                                % (jobCount, len(lfns), len(lumis) ))
        if limitReached:
            break

    common.logger.info('%s jobs created to run on %s lumis' %
                       (jobCount, lumisCreated))

    # Prepare dict output matching back to non-WMBS job creation
    dictOut = {}
    if self.useParent==1:
        dictOut['params'] = ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
    else:
        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = jobCount
    self.cacheBlocks(list_of_blocks,jobDestination)

    return dictOut
797    
def cacheBlocks(self, blocks,destinations):
    """
    Write the block names of runnable jobs to self.fileBlocks_FileName.

    blocks[i] is the list of block names for job i and destinations[i] is
    that job's list of candidate sites. Job i's blocks are written (one per
    line) only when at least one of its sites passes the black list and
    then the white list of self.blackWhiteListParser.
    """
    parser = self.blackWhiteListParser
    lines = []
    for idx, jobBlocks in enumerate(blocks):
        allowedSites = parser.checkWhiteList(parser.checkBlackList(destinations[idx]))
        if len(allowedSites) == 0:
            # every candidate site was filtered out: do not cache this job's blocks
            continue
        lines.extend(str(blockName) + '\n' for blockName in jobBlocks)
    writeTXTfile(self, self.fileBlocks_FileName , ''.join(lines))
807 ewv 1.26
def Algos(self):
    """
    Return the lookup table from splittingType name to the bound method
    that performs that kind of job splitting.
    """
    algorithms = dict(
        EventBased=self.jobSplittingByEvent,
        RunBased=self.jobSplittingByRun,
        LumiBased=self.jobSplittingByLumi,
        NoInput=self.jobSplittingNoInput,
        ForScript=self.jobSplittingForScript,
    )
    return algorithms
820