ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.53
Committed: Thu Jun 16 06:22:12 2011 UTC (13 years, 10 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_0, CRAB_2_8_0_pre1, CRAB_2_7_10_pre3, CRAB_2_7_9_patch2_pre1, CRAB_2_7_10_pre2, CRAB_2_7_10_pre1, CRAB_2_7_9_patch1, CRAB_2_7_9, CRAB_2_7_9_pre5, CRAB_2_7_9_pre4, CRAB_2_7_9_pre3, CRAB_2_7_9_pre2, CRAB_2_7_9_pre1
Changes since 1.52: +10 -3 lines
Log Message:
 fixing NoWhere issue. Details in savannah bug #74415

File Contents

# User Rev Content
1 ewv 1.27
2 spiga 1.53 __revision__ = "$Id: Splitter.py,v 1.52 2011/05/09 11:51:28 spiga Exp $"
3     __version__ = "$Revision: 1.52 $"
4 ewv 1.27
5 spiga 1.1 import common
6     from crab_exceptions import *
7     from crab_util import *
8 ewv 1.26
9     from WMCore.DataStructs.File import File
10     from WMCore.DataStructs.Fileset import Fileset
11     from WMCore.DataStructs.Run import Run
12     from WMCore.DataStructs.Subscription import Subscription
13     from WMCore.DataStructs.Workflow import Workflow
14     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 ewv 1.43 try: # Can remove when CMSSW 3.7 and earlier are dropped
17     from FWCore.PythonUtilities.LumiList import LumiList
18     except ImportError:
19     from LumiList import LumiList
20 spiga 1.1
21     class JobSplitter:
def __init__(self, cfg_params, args):
    """
    Keep the task configuration and prepare the state shared by the
    splitting algorithms.

    cfg_params : configuration mapping (read with .get / has_key)
    args       : dictionary of inputs used by the splitting methods
                 (for instance 'blockSites' and 'pubdata')
    """
    self.cfg_params = cfg_params
    self.args = args

    # Lumi-splitting state; checkLumiSettings() overwrites these.
    self.lumisPerJob, self.totalNLumis, self.theNumberOfJobs = -1, 0, 0
    self.limitNJobs = self.limitTotalLumis = self.limitJobLumis = False

    # Storage element white/black lists feed the site filter.
    whiteList = cfg_params.get('GRID.se_white_list', [])
    blackList = cfg_params.get('GRID.se_black_list', [])
    self.seWhiteList = whiteList
    self.blackWhiteListParser = SEBlackWhiteListParser(whiteList, blackList, common.logger())

    # File used to store/read the analyzed fileBlocks; the user may
    # ask for a non default one with CMSSW.fileblocks_file.
    fallbackName = common.work_space.shareDir() + 'AnalyzedBlocks.txt'
    requestedName = cfg_params.get('CMSSW.fileblocks_file', fallbackName)
    self.fileBlocks_FileName = os.path.abspath(requestedName)
42    
43 spiga 1.1
def checkUserSettings(self):
    """
    Read the event based splitting options from the configuration.

    For CMSSW.events_per_job, CMSSW.number_of_jobs and
    CMSSW.total_number_of_events the integer value and a 0/1 "select"
    flag are stored on the instance. A missing option gets flag 0 and
    the default value -1, 0 and 0 respectively.

    Raises CrabException when number_of_jobs and total_number_of_events
    are both given, the total is not -1 and it is smaller than the
    number of jobs.
    """
    cfg = self.cfg_params

    # Events per job
    if not cfg.has_key('CMSSW.events_per_job'):
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0
    else:
        self.eventsPerJob = int(cfg['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1

    # Number of jobs
    if not cfg.has_key('CMSSW.number_of_jobs'):
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0
    else:
        self.theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1

    # Total number of events
    if not cfg.has_key('CMSSW.total_number_of_events'):
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0
        return

    self.total_number_of_events = int(cfg['CMSSW.total_number_of_events'])
    self.selectTotalNumberEvents = 1

    # The consistency check only applies when the number of jobs was
    # requested too, and -1 (all events) is always accepted.
    if self.selectNumberOfJobs != 1:
        return
    if self.total_number_of_events == -1:
        return
    if int(self.total_number_of_events) < int(self.theNumberOfJobs):
        raise CrabException('Must specify at least one event per job. total_number_of_events > number_of_jobs ')

    return
73 spiga 1.1
def checkLumiSettings(self):
    """
    Validate and store the options for splitting by lumi section.

    Exactly two of CMSSW.lumis_per_job, CMSSW.number_of_jobs and
    CMSSW.total_number_of_lumis must be configured, otherwise a
    CrabException is raised. When both the number of jobs and the
    lumis per job are given the total number of lumis is derived from
    their product.
    """
    cfg = self.cfg_params
    nGiven = 0

    if cfg.has_key('CMSSW.lumis_per_job'):
        self.lumisPerJob = int(cfg['CMSSW.lumis_per_job'])
        self.limitJobLumis = True
        nGiven = nGiven + 1

    if cfg.has_key('CMSSW.number_of_jobs'):
        self.theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])
        self.limitNJobs = True
        nGiven = nGiven + 1

    if cfg.has_key('CMSSW.total_number_of_lumis'):
        self.totalNLumis = int(cfg['CMSSW.total_number_of_lumis'])
        # -1 means "no limit on the total"
        self.limitTotalLumis = (self.totalNLumis != -1)
        nGiven = nGiven + 1

    if nGiven != 2:
        raise CrabException('When splitting by lumi section you must specify two and only two of:\n'
                            ' number_of_jobs, lumis_per_job, total_number_of_lumis')

    if self.limitNJobs and self.limitJobLumis:
        self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
        self.limitTotalLumis = True

    # Tell the user how a runselection combines with the lumi split.
    if cfg.has_key('CMSSW.runselection'):
        for note in ('You have specified runselection and split by lumi.',
                     'Good lumi list will be the intersection of runselection and lumimask or ADS (if any).'):
            common.logger.info(note)
    return
108 ewv 1.27
def ComputeSubBlockSites( self, blockSites ):
    """
    Return the entries of blockSites for which the white list check
    of the host sites gives a non-empty result. The site list of a
    kept block is passed through unchanged.

    Raises CrabException when no block is kept.
    """
    kept = {}
    for block, hosts in blockSites.iteritems():
        if not self.blackWhiteListParser.checkWhiteList(hosts):
            continue
        kept[block] = hosts

    if not kept:
        raise CrabException('WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList)
    return kept
120    
121 spiga 1.1 ########################################################################
def jobSplittingByEvent( self ):
    """
    Perform job splitting by number of events.

    Jobs run over an integer number of files and, unless
    CMSSW.no_block_boundary is 1, over no more than one block.

    READS:   self.args['blockSites'] - dictionary with blocks as keys and
                 list of host sites as values
             self.args['pubdata'] - provides the files per block, the
                 parents and the event counts
             the values stored by checkUserSettings()
    SETS:    self.eventsbyblock, self.eventsbyfile, self.parentFiles,
             self.maxEvents, self.useParent
             self.ncjobs, self.total_number_of_jobs - Total # of jobs
    RETURNS: dictionary with
             'params' - names of the per job arguments
             'args' - per job argument values (a list of lists)
             'jobDestination' - Site destination(s) for each job (a list of lists)
             'njobs' - Total # of jobs
    RAISES:  CrabException when the splitting options are inconsistent
             (not exactly two of the three options, number_of_jobs or
             events_per_job below 1, invalid no_block_boundary setup).
    """

    jobDestination=[]
    self.checkUserSettings()
    if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']
    filesbyblock=pubdata.getFiles()

    self.eventsbyblock=pubdata.getEventsPerBlock()
    self.eventsbyfile=pubdata.getEventsPerFile()
    self.parentFiles=pubdata.getParent()

    ## get max number of events
    self.maxEvents=pubdata.getMaxEvents()

    self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
    noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))

    if noBboundary == 1:
        if self.total_number_of_events== -1:
            msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
            msg +='\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
            raise CrabException(msg)
        if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
            msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
            msg += "\tPlease set se_white_list with the site's storage element name."
            raise CrabException(msg)
        blockSites = self.ComputeSubBlockSites(blockSites)

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # Report the effective request: self.total_number_of_events is 0
        # when the total comes from number_of_jobs * events_per_job.
        common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be at least 1.')
        # Never go below one event per job: with 0 the loop below would
        # keep closing empty jobs without consuming any event.
        eventsPerJobRequested = max(int(eventsRemaining/self.theNumberOfJobs), 1)

    if (eventsRemaining > 0 and eventsPerJobRequested < 1):
        raise CrabException('events_per_job must be at least 1.')

    if (self.selectNumberOfJobs):
        common.logger.info("May not create the exact number_of_jobs requested.")

    # old... to remove Daniele
    totalNumberOfJobs = 999999999

    blocks = blockSites.keys()
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    parString = ""
    pString = ""
    filesEventCount = 0
    # debug lines describing every job, logged once at the end
    jobNotes = []

    def recordJob(jobNumber, jobBlock, fileString, parentString, maxEvents, skipEvents, note):
        # Store arguments, destination and bookkeeping of one closed job.
        jobArgs = [fileString[:-1]]
        if self.useParent==1:
            jobArgs.append(parentString[:-1])
        jobArgs.extend([str(maxEvents), str(skipEvents), jobBlock])
        list_of_lists.append(jobArgs)
        jobDestination.append(blockSites[jobBlock])
        jobNotes.append(note)
        jobNotes.append("Job %s Destination: %s\n"%(str(jobNumber),str(SE2CMS(jobDestination[-1]))))
        jobsOfBlock[jobBlock].append(jobNumber)

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        # blocks without event information are not split
        if not self.eventsbyblock.has_key(block):
            continue

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug('Events in Block File '+str(numEventsInBlock))

        files = filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0
        if noBboundary == 0: # DD
            # ---- New block => New job ---- #
            parString = ""
            pString=""
            # counter for number of events in files currently worked on
            filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            lfn = files[fileCount]
            if self.useParent==1:
                parent = self.parentFiles[lfn]
                common.logger.log(10-1, "File "+str(lfn)+" has the following parents: "+str(parent))
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[lfn]
                    common.logger.log(10-1, "File "+str(lfn)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += lfn + ','
                    if self.useParent==1:
                        for f in parent :
                            pString += f + ','
                    newFile = 0
                except KeyError:
                    common.logger.info("File "+str(lfn)+" has unknown number of events: skipping")

            eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
            eventsAvailable = filesEventCount - jobSkipEventCount

            # if less events in file remain than eventsPerJobRequested
            if ( eventsAvailable < eventsPerJobRequested):
                if noBboundary == 0 and fileCount == numFilesInBlock-1:
                    # last file in block: end job using the remaining
                    # events in block, then touch new file
                    note = "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(eventsAvailable))
                    recordJob(jobCount+1, block, parString, pString, -1, jobSkipEventCount, note)
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + eventsAvailable
                    eventsRemaining = eventsRemaining - eventsAvailable
                    jobSkipEventCount = 0
                    # reset file
                    pString = ""
                    parString = ""
                    filesEventCount = 0
                # in every case go to next file
                newFile = 1
                fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( eventsAvailable == eventsPerJobRequested ) :
                # close job and touch new file
                note = "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                recordJob(jobCount+1, block, parString, pString, eventsPerJobRequested, jobSkipEventCount, note)
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                pString = ""
                parString = ""
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                note = "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                recordJob(jobCount+1, block, parString, pString, eventsPerJobRequested, jobSkipEventCount, note)
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[lfn])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[lfn]
                pString = ''
                if self.useParent==1:
                    for f in parent :
                        pString += f + ','
                parString = lfn + ','

    # one debug record with the jobs of all the blocks
    msg = ''
    if jobNotes:
        msg = '\n' + ''.join(jobNotes)
    common.logger.debug(msg)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # skip check on block with no sites DD
    if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)

    # prepare dict output
    dictOut = {}
    dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=self.total_number_of_jobs

    return dictOut
378    
379     # keep trace of block with no sites to print a warning at the end
380    
def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
    """
    Report the destination sites of every block that got jobs and
    warn about the blocks left without any allowed site.

    blocks      : the blocks, in the order used to number them
    jobsOfBlock : dictionary block -> list of job numbers
    blockSites  : dictionary block -> list of host sites

    The names of the blocks with at least one allowed site are written
    to self.fileBlocks_FileName.

    Raises CrabException when no block with jobs has an allowed site
    (this includes the case where no block got any job).
    """
    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"
    noSiteBlock = []
    bloskNoSite = []
    allBlock = []

    blockCounter = 0
    saveFblocks =''
    for block in blocks:
        if block not in jobsOfBlock:
            continue
        blockCounter += 1
        allBlock.append( blockCounter )
        notBlackListed = self.blackWhiteListParser.checkBlackList(blockSites[block],[block])
        sites = self.blackWhiteListParser.checkWhiteList(notBlackListed,[block])
        screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                                                              ', '.join(SE2CMS(sites)))
        if len(sites) == 0:
            noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
            bloskNoSite.append( blockCounter )
        else:
            saveFblocks += str(block)+'\n'
            writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)

    common.logger.info(screenOutput)

    # White list reminder, appended to the warning and to the error.
    whiteListHint = ''
    if self.cfg_params.has_key('GRID.se_white_list'):
        whiteListHint += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
        whiteListHint += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
        whiteListHint += '\tPlease check if the dataset is available at this site!)'
    if self.cfg_params.has_key('GRID.ce_white_list'):
        whiteListHint += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
        whiteListHint += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
        whiteListHint += '\tPlease check if the dataset is available at this site!)\n'

    # msg must exist even without any warning: it is extended below
    # when every block (possibly none at all) is without sites.
    msg = ''
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        virgola = ""
        if len(bloskNoSite) > 1:
            virgola = ","
        msg += ''.join([' ' + str(number) + virgola for number in bloskNoSite])
        msg += '\n\t\tRelated jobs:\n '
        virgola = ""
        if len(noSiteBlock) > 1:
            virgola = ","
        msg += ''.join([str(range_jobs) + virgola for range_jobs in noSiteBlock])
        msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
        msg += whiteListHint

        common.logger.info(msg)

    if bloskNoSite == allBlock:
        msg += 'Requested jobs cannot be Created! \n'
        msg += whiteListHint
        raise CrabException(msg)

    return
443    
444    
445     ########################################################################
def jobSplittingByRun(self):
    """
    Split the task with the WMCore 'RunBased' algorithm: every job
    group returned by the job factory becomes one job.

    READS:   self.args['blockSites'] - dictionary block -> host sites
             self.args['pubdata'] - provides the list of files
             CMSSW.number_of_jobs (through checkUserSettings) as an
             upper limit on the number of jobs
    RETURNS: dictionary with 'params', 'args', 'jobDestination' and
             'njobs'; the job blocks and destinations are also handed
             to self.cacheBlocks().
    """

    self.checkUserSettings()
    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    if self.selectNumberOfJobs == 0 :
        # no limit requested
        self.theNumberOfJobs = 9999999

    thefiles = Fileset(name='FilesToSplit')
    fileList = pubdata.getListFiles()
    for f in fileList:
        block = f['Block']['Name']
        try:
            f['Block']['StorageElementList'].extend(blockSites[block])
        except Exception:
            # Best effort: a file whose block has no usable site
            # information is left out of the splitting.
            continue
        wmbsFile = File(f['LogicalFileName'])
        if not blockSites[block]:
            msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
            msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
            common.logger.debug(msg)
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        runNum = f['RunsList'][0]['RunNumber']
        myRun = Run(runNumber=runNum)
        wmbsFile.addRun( myRun )
        thefiles.addFile(
            wmbsFile
            )

    work = Workflow()
    subs = Subscription(
    fileset = thefiles,
    workflow = work,
    split_algo = 'RunBased',
    type = "Processing")
    splitter = SplitterFactory()
    jobfactory = splitter(subs)

    #loop over all runs
    list_of_lists = []
    jobDestination = []
    list_of_blocks = []
    count = 0
    for jobGroup in jobfactory():
        if count < self.theNumberOfJobs:
            res = self.getJobInfo(jobGroup)
            fullString = ','.join(res['lfns'])
            # InputBlocks describes this job only; list_of_blocks keeps
            # growing across jobs and is meant for cacheBlocks().
            blockString = res['block']
            list_of_blocks.append(blockString)
            list_of_lists.append([fullString,str(-1),str(0),blockString])
            #need to check single file location
            jobDestination.append(res['locations'])
            count +=1
    # prepare dict output
    dictOut = {}
    dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=count
    self.cacheBlocks(list_of_blocks,jobDestination)

    return dictOut
517    
def getJobInfo( self,jobGroup ):
    """
    Collect the information of all the jobs of a job group.

    Returns a dictionary with
        'lfns'      - the 'lfn' of every file of every job
        'locations' - the locations of the first file
        'block'     - the block of the first file; the key is absent
                      only when the group has no file at all
    """
    res = {}
    lfns = []
    locations = []
    firstFile = True
    for job in jobGroup.jobs:
        for jobFile in job.getFiles():
            lfns.append(jobFile['lfn'])
            if firstFile:
                # Destination and block come from the first file. The
                # block is stored even when that file has no location,
                # so that callers can always read res['block'].
                res['block'] = jobFile['block']
                for loc in jobFile['locations']:
                    locations.append(loc)
                firstFile = False
    res['lfns'] = lfns
    res['locations'] = locations
    return res
534    
535 spiga 1.1 ########################################################################
def prepareSplittingNoInput(self):
    """
    Derive the missing splitting quantity for jobs without input data.

    Uses the values stored by checkUserSettings() and sets
    self.total_number_of_jobs plus, depending on what the user gave,
    self.total_number_of_events or self.eventsPerJob.

    Raises CrabException when the total number of events is negative
    or when the quantity used as divisor (events_per_job or
    number_of_jobs) is smaller than 1.
    """
    if (self.selectEventsPerJob):
        common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
    if (self.selectNumberOfJobs):
        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if (self.selectTotalNumberEvents):
        common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            # events_per_job is the divisor: refuse 0 (division error)
            # and negative values (negative number of jobs).
            if self.eventsPerJob < 1:
                raise CrabException('events_per_job must be at least 1.')
            self.total_number_of_jobs = int(self.total_number_of_events//self.eventsPerJob)
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs =self.theNumberOfJobs
            self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)

    elif (self.selectNumberOfJobs) :
        # number_of_jobs is the divisor here
        if self.theNumberOfJobs < 1:
            raise CrabException('number_of_jobs must be at least 1.')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events//self.total_number_of_jobs)
560    
561 spiga 1.23
def jobSplittingNoInput(self):
    """
    Split a task without input data into jobs of a fixed number of
    events.

    Returns a dictionary with 'params' (argument names), 'args' (one
    argument list per job), 'jobDestination' (one [""] per job) and
    'njobs'. Raises CrabException unless exactly two of
    total_number_of_events, events_per_job and number_of_jobs are set.
    """
    common.logger.debug('Splitting per events')
    self.checkUserSettings()
    nSelected = self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs
    if nSelected != 2:
        raise CrabException('Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.')

    managedGenerators = self.args['managedGenerators']
    generator = self.args['generator']
    firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs %s' % str(self.total_number_of_jobs))

    # is there any remainder?
    coveredEvents = int(self.total_number_of_jobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - coveredEvents

    common.logger.debug('Check %s' % str(check))

    common.logger.info('%s jobs can be created, each for %s for a total of %s events'
                       % (str(self.total_number_of_jobs), str(self.eventsPerJob),
                          str(self.total_number_of_jobs*self.eventsPerJob)))
    if check > 0:
        common.logger.info('Warning: asked %s but can do only %s'
                           % (str(self.total_number_of_events), str(coveredEvents)))

    isManaged = generator in managedGenerators

    # one argument list per job
    jobDestination = []
    self.list_of_args = []
    for jobIndex in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        jobDestination.append([""]) # must be empty to correctly write the XML
        jobArgs = []
        if firstLumi: # Pythia first lumi
            jobArgs.append(str(int(firstLumi)+jobIndex))
        if isManaged:
            jobArgs.append(generator)
            # COMPHEP wants event #'s like 1,100,200,300
            if generator == 'comphep' and jobIndex == 0:
                firstEvent = '1'
            else:
                firstEvent = str(jobIndex*self.eventsPerJob)
            jobArgs.append(firstEvent)
        jobArgs.append(str(self.eventsPerJob))
        self.list_of_args.append(jobArgs)

    # argument names, in the same order as the values above
    paramNames = []
    if firstLumi:
        paramNames.append('FirstLumi')
    if isManaged:
        paramNames.extend(['Generator', 'FirstEvent'])
    paramNames.append('MaxEvents')

    # prepare dict output
    dictOut = {}
    dictOut['params'] = paramNames
    dictOut['args'] = self.list_of_args
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = self.total_number_of_jobs

    return dictOut
623    
624    
def jobSplittingForScript(self):
    """
    Perform job splitting based on number of job

    number_of_jobs is mandatory. Returns a dictionary with 'params',
    'args' (one list per job, holding the events per job unless that
    is 0), 'jobDestination' (one [""] per job) and 'njobs'.

    Raises CrabException when number_of_jobs is not specified.
    """
    self.checkUserSettings()
    if (self.selectNumberOfJobs == 0):
        msg = 'must specify number_of_jobs.'
        # the exception class is CrabException; the lower case name
        # used before does not exist and ended in a NameError
        raise CrabException(msg)
    jobDestination = []
    common.logger.debug('Splitting per job')
    common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs '+str(self.total_number_of_jobs))

    common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

    # one (possibly empty) argument list per job
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        args=[]
        jobDestination.append([""])
        if self.eventsPerJob != 0 :
            args.append(str(self.eventsPerJob))
        self.list_of_args.append(args)

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['MaxEvents']
    dictOut['args'] = self.list_of_args
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=self.total_number_of_jobs
    return dictOut
661    
662 ewv 1.22
def jobSplittingByLumi(self):
    """
    Split task into jobs by Lumi section paying attention to which
    lumis should be run (according to the analysis dataset).
    This uses WMBS job splitting which does not split files over jobs
    so the job will have AT LEAST as many lumis as requested, perhaps
    more.

    Inputs are taken from the instance:
      * self.args['blockSites']: indexed by block name, gives the
        locations hosting that block.
      * self.args['pubdata']: provides getLumis(), getParent(),
        getListFiles() and getMaxLumis().
      * self.cfg_params['CMSSW.use_parent']: when 1, the parent files of
        every input file are added to the job arguments.

    Side effects: sets self.useParent and self.parentFiles, may set
    self.lumisPerJob, and calls self.cacheBlocks() with the blocks and
    destinations of the created jobs.

    Returns a dict with the keys:
      * 'params': names of the per-job arguments.
      * 'args': one list of strings per job, ordered as 'params'.
      * 'jobDestination': one list of locations per job.
      * 'njobs': number of jobs created.
    """
    self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
    common.logger.debug('Splitting by Lumi')
    self.checkLumiSettings()

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    lumisPerFile = pubdata.getLumis()
    self.parentFiles = pubdata.getParent()
    # Make the list of WMBS files for job splitter
    fileList = pubdata.getListFiles()
    wmFileList = []
    for jobFile in fileList:
        block = jobFile['Block']['Name']
        try:
            jobFile['Block']['StorageElementList'].extend(blockSites[block])
        except Exception:
            # Best effort: a file whose site list cannot be attached is
            # left out of the splitting. Only Exception is caught so that
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            common.logger.debug('Skipping a file of block %s: unable to attach its site list' % block)
            continue
        wmbsFile = File(jobFile['LogicalFileName'])
        if not blockSites[block]:
            msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
            msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
            common.logger.debug(msg)
            # The file is still handed to the splitter, with an empty
            # location set; no 'Nowhere' placeholder location is added.
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        for lumi in lumisPerFile[jobFile['LogicalFileName']]:
            wmbsFile.addRun(Run(lumi[0], lumi[1]))
        wmFileList.append(wmbsFile)

    fileSet = set(wmFileList)
    thefiles = Fileset(name='FilesToSplit', files = fileSet)

    # Create the factory and workflow
    work = Workflow()
    subs = Subscription(fileset = thefiles, workflow = work,
                        split_algo = 'LumiBased', type = "Processing")
    splitter = SplitterFactory()
    jobFactory = splitter(subs)

    list_of_lists = []
    jobDestination = []
    jobCount = 0
    lumisCreated = 0
    list_of_blocks = []
    if not self.limitJobLumis:
        # No explicit lumis-per-job: derive it from the number of jobs.
        # NOTE(review): assumes self.theNumberOfJobs is non-zero here,
        # presumably guaranteed by checkLumiSettings() - confirm.
        if self.totalNLumis > 0:
            self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
        else:
            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
        common.logger.info('Each job will process about %s lumis.' %
                            self.lumisPerJob)

    # Both counters only grow, so once a limit is hit no later job group
    # can create a job: leave both loops instead of re-checking (and
    # re-logging) the same limit for every remaining job group.
    limitReached = False
    for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
        for job in jobGroup.jobs:
            if self.limitNJobs and jobCount >= self.theNumberOfJobs:
                common.logger.info('Requested number of jobs reached.')
                limitReached = True
                break
            if self.limitTotalLumis and lumisCreated >= self.totalNLumis:
                common.logger.info('Requested number of lumis reached.')
                limitReached = True
                break
            lumis = []
            lfns = []
            parentLfns = []
            locations = []
            blocks = []
            firstFile = True
            # Collect information from all the files
            for jobFile in job.getFiles():
                doFile = False
                if firstFile: # Get locations from first file in the job
                    for loc in jobFile['locations']:
                        locations.append(loc)
                    blocks.append(jobFile['block'])
                    firstFile = False
                # Accumulate Lumis from all files
                for lumiList in jobFile['runs']:
                    theRun = lumiList.run
                    for theLumi in list(lumiList):
                        if (not self.limitTotalLumis) or \
                           (lumisCreated < self.totalNLumis):
                            doFile = True
                            lumisCreated += 1
                            lumis.append( (theRun, theLumi) )
                # A file is listed only if at least one of its lumis was
                # actually assigned to this job.
                if doFile:
                    lfns.append(jobFile['lfn'])
                    if self.useParent == 1:
                        parentLfns.extend(self.parentFiles[jobFile['lfn']])
            fileString = ','.join(lfns)
            lumiLister = LumiList(lumis = lumis)
            lumiString = lumiLister.getCMSSWString()
            blockString = ','.join(blocks)
            # str(-1) and str(0) fill the 'MaxEvents' and 'SkipEvents'
            # slots of dictOut['params'] below.
            if self.useParent == 1:
                pfileString = ','.join(parentLfns)
                common.logger.debug("Files: "+fileString+" with the following parents: "+pfileString)
                list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
            else:
                list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
            list_of_blocks.append(blocks)
            jobDestination.append(locations)
            jobCount += 1
            common.logger.debug('Job %s will run on %s files and %s lumis '
                % (jobCount, len(lfns), len(lumis) ))
        if limitReached:
            break

    common.logger.info('%s jobs created to run on %s lumis' %
                          (jobCount, lumisCreated))

    # Prepare dict output matching back to non-WMBS job creation
    dictOut = {}
    dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
    if self.useParent == 1:
        dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = jobCount
    self.cacheBlocks(list_of_blocks,jobDestination)

    return dictOut
794    
def cacheBlocks(self, blocks,destinations):
    """
    Save the block names of the jobs that still have a destination.

    blocks[i] holds the block names of job i and destinations[i] its
    candidate destinations. The destinations are passed through the
    black list and then the white list of self.blackWhiteListParser;
    the blocks of a job are kept only when that filtering leaves at
    least one site. The kept names are written, one per line, to
    self.fileBlocks_FileName through writeTXTfile().
    """
    keptLines = []
    for jobIndex, jobBlocks in enumerate(blocks):
        notBlackListed = self.blackWhiteListParser.checkBlackList(destinations[jobIndex])
        allowedSites = self.blackWhiteListParser.checkWhiteList(notBlackListed)
        if len(allowedSites) == 0:
            # every destination of this job was filtered out
            continue
        keptLines.extend(str(blockName) + '\n' for blockName in jobBlocks)
    writeTXTfile(self, self.fileBlocks_FileName, ''.join(keptLines))
804 ewv 1.26
def Algos(self):
    """
    Return the splittingType -> splitting method lookup table.

    The keys are the splitting type names and the values are the
    corresponding jobSplitting* attributes of this object.
    """
    splittingTable = (
        ('EventBased', self.jobSplittingByEvent),
        ('RunBased', self.jobSplittingByRun),
        ('LumiBased', self.jobSplittingByLumi),
        ('NoInput', self.jobSplittingNoInput),
        ('ForScript', self.jobSplittingForScript),
    )
    return dict(splittingTable)
817