ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.49
Committed: Wed Mar 2 10:48:46 2011 UTC (14 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_8_dash
Changes since 1.48: +19 -19 lines
Log Message:
send files and block info to the dashboard

File Contents

# User Rev Content
1 ewv 1.27
# CVS keyword strings: the text between the '$' markers is expanded by CVS
# itself, so these values are not meant to be edited by hand.
__revision__ = '$Id: Splitter.py,v 1.48 2010/12/01 15:44:09 ewv Exp $'
__version__ = '$Revision: 1.48 $'
4 ewv 1.27
5 spiga 1.1 import common
6     from crab_exceptions import *
7     from crab_util import *
8 ewv 1.26
9     from WMCore.DataStructs.File import File
10     from WMCore.DataStructs.Fileset import Fileset
11     from WMCore.DataStructs.Run import Run
12     from WMCore.DataStructs.Subscription import Subscription
13     from WMCore.DataStructs.Workflow import Workflow
14     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 ewv 1.43 try: # Can remove when CMSSW 3.7 and earlier are dropped
17     from FWCore.PythonUtilities.LumiList import LumiList
18     except ImportError:
19     from LumiList import LumiList
20 spiga 1.1
21     class JobSplitter:
def __init__(self, cfg_params, args):
    """
    Store the CRAB configuration and the splitting inputs, reset the
    lumi-splitting state and build the SE black/white list parser.

    cfg_params -- configuration mapping (e.g. 'GRID.se_white_list')
    args       -- splitting inputs; the splitting methods read keys such
                  as 'blockSites' and 'pubdata' from it
    """
    self.cfg_params = cfg_params
    self.args = args

    # Lumi-splitting defaults; checkLumiSettings() may override them.
    self.lumisPerJob = -1
    self.totalNLumis = 0
    self.theNumberOfJobs = 0
    self.limitNJobs = False
    self.limitTotalLumis = False
    self.limitJobLumis = False

    # The SE white/black lists drive the site-screening parser.
    self.seWhiteList = cfg_params.get('GRID.se_white_list', [])
    blackList = cfg_params.get('GRID.se_black_list', [])
    self.blackWhiteListParser = SEBlackWhiteListParser(
        self.seWhiteList, blackList, common.logger())

    # File used to store/read the analyzed fileblocks. The user may point
    # it elsewhere with CMSSW.fileblocks_file.
    defaultBlocksFile = common.work_space.shareDir() + 'AnalyzedBlocks.txt'
    blocksFile = self.cfg_params.get('CMSSW.fileblocks_file', defaultBlocksFile)
    self.fileBlocks_FileName = os.path.abspath(blocksFile)
42    
43 spiga 1.1
def checkUserSettings(self):
    """
    Read the event-based splitting options from the configuration.

    For CMSSW.events_per_job, CMSSW.number_of_jobs and
    CMSSW.total_number_of_events, store the integer value (or its default:
    -1, 0 and 0 respectively) and a 0/1 flag telling whether the user set it.

    Raises CrabException when number_of_jobs and a total_number_of_events
    other than -1 are both given and there are fewer events than jobs.
    """
    cfg = self.cfg_params

    ## Events per job
    if 'CMSSW.events_per_job' in cfg:
        self.eventsPerJob, self.selectEventsPerJob = int(cfg['CMSSW.events_per_job']), 1
    else:
        self.eventsPerJob, self.selectEventsPerJob = -1, 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in cfg:
        self.theNumberOfJobs, self.selectNumberOfJobs = int(cfg['CMSSW.number_of_jobs']), 1
    else:
        self.theNumberOfJobs, self.selectNumberOfJobs = 0, 0

    ## total number of events
    if 'CMSSW.total_number_of_events' not in cfg:
        self.total_number_of_events, self.selectTotalNumberEvents = 0, 0
        return

    self.total_number_of_events = int(cfg['CMSSW.total_number_of_events'])
    self.selectTotalNumberEvents = 1
    # -1 means "all events", which can always feed any number of jobs
    tooFewEvents = (self.total_number_of_events != -1 and
                    self.total_number_of_events < self.theNumberOfJobs)
    if self.selectNumberOfJobs == 1 and tooFewEvents:
        msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
        raise CrabException(msg)
    return
73 spiga 1.1
def checkLumiSettings(self):
    """
    Check that the user gave enough information to split by lumi section,
    and derive the totals implied by it.

    Exactly two of CMSSW.lumis_per_job, CMSSW.number_of_jobs and
    CMSSW.total_number_of_lumis must be set, otherwise CrabException is
    raised.  When lumis_per_job and number_of_jobs are both set, the total
    number of lumis becomes their product.
    """
    cfg = self.cfg_params
    nGiven = 0

    if 'CMSSW.lumis_per_job' in cfg:
        nGiven += 1
        self.lumisPerJob = int(cfg['CMSSW.lumis_per_job'])
        self.limitJobLumis = True

    if 'CMSSW.number_of_jobs' in cfg:
        nGiven += 1
        self.theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])
        self.limitNJobs = True

    if 'CMSSW.total_number_of_lumis' in cfg:
        nGiven += 1
        self.totalNLumis = int(cfg['CMSSW.total_number_of_lumis'])
        # -1 lifts the limit on the total number of lumis
        self.limitTotalLumis = self.totalNLumis != -1

    if nGiven != 2:
        msg = 'When splitting by lumi section you must specify two and only two of:\n'
        msg += ' number_of_jobs, lumis_per_job, total_number_of_lumis'
        raise CrabException(msg)

    if self.limitNJobs and self.limitJobLumis:
        self.limitTotalLumis = True
        self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs

    # Has the user specified runselection?
    if 'CMSSW.runselection' in cfg:
        common.logger.info('You have specified runselection and split by lumi.')
        common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
    return
108 ewv 1.27
def ComputeSubBlockSites(self, blockSites):
    """
    Keep only the blocks accepted by the SE white list.

    blockSites -- dict mapping a block name to its list of hosting sites
    Returns a new dict with the (block, sites) pairs of every block for
    which the white-list check returns a non-empty result; the site lists
    are copied as they are, not filtered.
    Raises CrabException when no block is kept.
    """
    checkWhiteList = self.blackWhiteListParser.checkWhiteList
    kept = dict((blockName, siteList)
                for blockName, siteList in blockSites.items()
                if checkWhiteList(siteList))
    if not kept:
        msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
        raise CrabException(msg)
    return kept
120    
121 spiga 1.1 ########################################################################
def jobSplittingByEvent(self):
    """
    Perform job splitting by number of events.  Jobs run over an integer
    number of files and, unless CMSSW.no_block_boundary=1, no more than one
    block.

    READS:   self.args['blockSites'] (block -> list of host sites) and
             self.args['pubdata'] (files, events and parents per block/file);
             the event-splitting options through checkUserSettings().
    SETS:    self.eventsbyblock, self.eventsbyfile, self.parentFiles,
             self.maxEvents, self.useParent and
             self.ncjobs / self.total_number_of_jobs (total # of jobs).
    RETURNS: dict with 'params' (argument names), 'args' (one list per job:
             input files, [parent files], MaxEvents, SkipEvents, block),
             'jobDestination' (site list per job) and 'njobs'.
    RAISES:  CrabException for inconsistent splitting options.
    """
    jobDestination = []
    self.checkUserSettings()
    if ((self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2):
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)
    # A non-positive events_per_job would make the file loop below close
    # empty jobs over and over without consuming any event.
    if self.selectEventsPerJob and self.eventsPerJob < 1:
        msg = 'events_per_job must be a positive number of events.'
        raise CrabException(msg)

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']
    filesbyblock = pubdata.getFiles()

    self.eventsbyblock = pubdata.getEventsPerBlock()
    self.eventsbyfile = pubdata.getEventsPerFile()
    self.parentFiles = pubdata.getParent()

    ## get max number of events
    self.maxEvents = pubdata.getMaxEvents()

    self.useParent = int(self.cfg_params.get('CMSSW.use_parent', 0))
    noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary', 0))

    if noBboundary == 1:
        if self.total_number_of_events == -1:
            msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
            msg += '\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
            raise CrabException(msg)
        if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
            msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
            msg += "\tPlease set se_white_list with the site's storage element name."
            raise CrabException(msg)
        blockSites = self.ComputeSubBlockSites(blockSites)

    # ---- Handle the possible job splitting configurations ---- #
    if self.selectTotalNumberEvents:
        totalEventsRequested = self.total_number_of_events
    if self.selectEventsPerJob:
        eventsPerJobRequested = self.eventsPerJob
        if self.selectNumberOfJobs:
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if totalEventsRequested == -1:
        eventsRemaining = self.maxEvents
    # If user requested more events than are in the dataset
    elif totalEventsRequested > self.maxEvents:
        eventsRemaining = self.maxEvents
        # report the request actually computed (total_number_of_events is
        # 0 when events_per_job and number_of_jobs were given)
        common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if self.selectTotalNumberEvents and self.selectNumberOfJobs:
        # At least one event per job: with fewer events than jobs a zero
        # would make the file loop below spin without progress.
        eventsPerJobRequested = max(1, int(eventsRemaining/self.theNumberOfJobs))

    if self.selectNumberOfJobs:
        common.logger.info("May not create the exact number_of_jobs requested.")

    # old... to remove Daniele
    totalNumberOfJobs = 999999999

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # block -> list of the (1-based) job numbers created for it
    jobsOfBlock = {}

    parString = ""
    pString = ""
    filesEventCount = 0
    # per-job report, accumulated over all blocks and logged once at the end
    msg = '\n'

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if block not in self.eventsbyblock:
            continue
        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug('Events in Block File '+str(numEventsInBlock))

        files = filesbyblock[block]
        numFilesInBlock = len(files)
        if numFilesInBlock <= 0:
            continue
        fileCount = 0
        if noBboundary == 0:  # DD
            # ---- New block => New job ---- #
            parString = ""
            pString = ""
            # counter for number of events in files currently worked on
            filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        while (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs):
            lfn = files[fileCount]
            if self.useParent == 1:
                parent = self.parentFiles[lfn]
                common.logger.log(10-1, "File "+str(lfn)+" has the following parents: "+str(parent))
            if newFile:
                try:
                    numEventsInFile = self.eventsbyfile[lfn]
                except KeyError:
                    common.logger.info("File "+str(lfn)+" has unknown number of events: skipping")
                else:
                    common.logger.log(10-1, "File "+str(lfn)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += lfn + ','
                    if self.useParent == 1:
                        for f in parent:
                            pString += f + ','
                    newFile = 0

            eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
            # if less events in file remain than eventsPerJobRequested
            if filesEventCount - jobSkipEventCount < eventsPerJobRequested:
                if noBboundary == 1:  ## DD
                    newFile = 1
                    fileCount += 1
                elif fileCount == numFilesInBlock-1:
                    # last file in block: end job using the remaining events
                    # in the block, close job and touch new file
                    fullString = parString[:-1]
                    if self.useParent == 1:
                        fullParentString = pString[:-1]
                        list_of_lists.append([fullString, fullParentString, str(-1), str(jobSkipEventCount), block])
                    else:
                        list_of_lists.append([fullString, str(-1), str(jobSkipEventCount), block])
                    msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
                    jobDestination.append(blockSites[block])
                    msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                    # fill jobs of block dictionary
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                    eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                    jobSkipEventCount = 0
                    # reset file
                    pString = ""
                    parString = ""
                    filesEventCount = 0
                    newFile = 1
                    fileCount += 1
                else:
                    # go to next file
                    newFile = 1
                    fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif filesEventCount - jobSkipEventCount == eventsPerJobRequested:
                # close job and touch new file
                fullString = parString[:-1]
                if self.useParent == 1:
                    fullParentString = pString[:-1]
                    list_of_lists.append([fullString, fullParentString, str(eventsPerJobRequested), str(jobSkipEventCount), block])
                else:
                    list_of_lists.append([fullString, str(eventsPerJobRequested), str(jobSkipEventCount), block])
                msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                jobDestination.append(blockSites[block])
                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                jobsOfBlock[block].append(jobCount+1)
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                pString = ""
                parString = ""
                filesEventCount = 0
                newFile = 1
                fileCount += 1
            # if more events in file remain than eventsPerJobRequested
            else:
                # close job but don't touch new file
                fullString = parString[:-1]
                if self.useParent == 1:
                    fullParentString = pString[:-1]
                    list_of_lists.append([fullString, fullParentString, str(eventsPerJobRequested), str(jobSkipEventCount), block])
                else:
                    list_of_lists.append([fullString, str(eventsPerJobRequested), str(jobSkipEventCount), block])
                msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                jobDestination.append(blockSites[block])
                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                jobsOfBlock[block].append(jobCount+1)
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[lfn])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[lfn]
                pString_tmp = ''
                if self.useParent == 1:
                    for f in parent:
                        pString_tmp += f + ','
                pString = pString_tmp
                parString = lfn + ','
        # END while (iterate over files in the block)
    # END while (iterate over blocks in the dataset)
    common.logger.debug(msg)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
        common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # skip check on block with no sites  DD
    if noBboundary == 0:
        self.checkBlockNoSite(blocks, jobsOfBlock, blockSites)

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    if self.useParent:
        dictOut['params'] = ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = self.total_number_of_jobs

    return dictOut
378    
def checkBlockNoSite(self, blocks, jobsOfBlock, blockSites):
    """
    Print the jobs and candidate destination sites of every block, keep
    trace of the blocks no allowed site hosts, and warn about them.

    blocks      -- block names, in the order the splitter visited them
    jobsOfBlock -- block name -> list of job numbers created for it
    blockSites  -- block name -> list of hosting storage elements

    Blocks that appear in jobsOfBlock and keep at least one site after the
    SE black/white lists are written, one per line, to
    self.fileBlocks_FileName.

    Raises CrabException when none of those blocks keeps an allowed site.
    """
    parser = self.blackWhiteListParser
    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"
    noSiteBlock = []   # job ranges of the blocks without allowed sites
    bloskNoSite = []   # 1-based numbers of the blocks without allowed sites
    allBlock = []      # 1-based numbers of all the blocks listed
    saveFblocks = ''

    blockCounter = 0
    for block in blocks:
        if block not in jobsOfBlock:
            continue
        blockCounter += 1
        allBlock.append(blockCounter)
        sites = parser.checkWhiteList(parser.checkBlackList(blockSites[block], [block]), [block])
        screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter, spanRanges(jobsOfBlock[block]),
                                                             ', '.join(SE2CMS(sites)))
        if len(sites) == 0:
            noSiteBlock.append(spanRanges(jobsOfBlock[block]))
            bloskNoSite.append(blockCounter)
        else:
            saveFblocks += str(block) + '\n'
    # write the list of usable blocks once, after all blocks are screened
    writeTXTfile(self, self.fileBlocks_FileName, saveFblocks)

    common.logger.info(screenOutput)
    if not bloskNoSite:
        return

    msg = 'WARNING: No sites are hosting any part of data for block:\n '
    msg += ' ' + ', '.join([str(b) for b in bloskNoSite])
    msg += '\n\t\tRelated jobs:\n '
    msg += ','.join([str(r) for r in noSiteBlock])
    msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'

    # hints about user white lists that may explain the missing sites
    hint = ''
    if 'GRID.se_white_list' in self.cfg_params:
        hint += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
        hint += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
        hint += '\tPlease check if the dataset is available at this site!)\n'
    if 'GRID.ce_white_list' in self.cfg_params:
        hint += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
        hint += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
        hint += '\tPlease check if the dataset is available at this site!)\n'

    common.logger.info(msg + hint)

    # No block at all can run: stop here (hints reported once, at the end).
    if bloskNoSite == allBlock:
        raise CrabException(msg + 'Requested jobs cannot be Created! \n' + hint)

    return
443    
444    
445     ########################################################################
def jobSplittingByRun(self):
    """
    Split the task into jobs with the WMCore 'RunBased' splitting algorithm.

    Files whose block is not a key of self.args['blockSites'] are ignored.
    When CMSSW.number_of_jobs is set, at most that many jobs are created.

    Returns a dict with 'params', 'args' (per job: comma-separated input
    files, -1 as MaxEvents, 0 as SkipEvents and the job's block),
    'jobDestination' (site list per job) and 'njobs'.
    """
    self.checkUserSettings()
    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    if self.selectNumberOfJobs == 0:
        self.theNumberOfJobs = 9999999

    thefiles = Fileset(name='FilesToSplit')
    for f in pubdata.getListFiles():
        block = f['Block']['Name']
        try:
            f['Block']['StorageElementList'].extend(blockSites[block])
        except KeyError:
            # block not in blockSites: the file cannot be used
            continue
        wmbsFile = File(f['LogicalFileName'])
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        runNum = f['RunsList'][0]['RunNumber']
        wmbsFile.addRun(Run(runNumber=runNum))
        thefiles.addFile(wmbsFile)

    work = Workflow()
    subs = Subscription(fileset=thefiles,
                        workflow=work,
                        split_algo='RunBased',
                        type="Processing")
    splitter = SplitterFactory()
    jobfactory = splitter(subs)

    # loop over all runs
    list_of_lists = []
    jobDestination = []
    list_of_blocks = []   # one block per job, aligned with jobDestination
    count = 0
    for jobGroup in jobfactory():
        if count >= self.theNumberOfJobs:
            break
        res = self.getJobInfo(jobGroup)
        fullString = ','.join(res['lfns'])
        jobBlock = res['block']
        list_of_blocks.append(jobBlock)
        # each job lists only its own block, not those of earlier jobs
        list_of_lists.append([fullString, str(-1), str(0), jobBlock])
        # need to check single file location
        jobDestination.append(res['locations'])
        count += 1

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = count
    self.cacheBlocks(list_of_blocks, jobDestination)

    return dictOut
513    
def getJobInfo(self, jobGroup):
    """
    Collect the input files, location and block of a WMCore job group.

    jobGroup -- object whose .jobs each provide getFiles(); every file is a
                mapping with 'lfn', 'locations' and 'block' keys
    Returns a dict with:
      'lfns'      -- LFNs of every file of every job, in order
      'locations' -- locations of the first file of the group
      'block'     -- block of the first file of the group (the key is
                     absent only when the group contains no file)
    """
    res = {}
    lfns = []
    locations = []
    firstFile = True
    for job in jobGroup.jobs:
        for jobFile in job.getFiles():
            lfns.append(jobFile['lfn'])
            if firstFile:
                # the first file decides where the group runs; its block is
                # recorded even when it has no location, so callers always
                # find 'block' for a non-empty group
                locations.extend(jobFile['locations'])
                res['block'] = jobFile['block']
                firstFile = False
    res['lfns'] = lfns
    res['locations'] = locations
    return res
530    
531 spiga 1.1 ########################################################################
def prepareSplittingNoInput(self):
    """
    Derive the number of jobs (and whichever of events-per-job / total
    events is missing) for splitting without input data.

    Logs the options that were selected, then:
      * events_per_job + total_number_of_events -> jobs = total / per job
      * events_per_job + number_of_jobs         -> total = jobs * per job
      * number_of_jobs without events_per_job   -> per job = total / jobs
    Raises CrabException when total_number_of_events is negative.
    """
    perJobSet = self.selectEventsPerJob
    nJobsSet = self.selectNumberOfJobs

    if perJobSet:
        common.logger.info('Required %s events per job ' % (self.eventsPerJob,))
    if nJobsSet:
        common.logger.info('Required %s jobs in total ' % (self.theNumberOfJobs,))
    if self.selectTotalNumberEvents:
        common.logger.info('Required %s events in total ' % (self.total_number_of_events,))

    if self.total_number_of_events < 0:
        raise CrabException('Cannot split jobs per Events with "-1" as total number of events')

    if perJobSet and self.selectTotalNumberEvents:
        self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
    elif perJobSet and nJobsSet:
        self.total_number_of_jobs = self.theNumberOfJobs
        self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)
    elif (not perJobSet) and nJobsSet:
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
556    
557 spiga 1.23
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job, for tasks with
    no input dataset.

    Reads self.args['managedGenerators'] and self.args['generator'] and the
    optional CMSSW.first_lumi option (default 1).
    Returns a dict with 'params' (argument names), 'args' (one argument list
    per job), 'jobDestination' (one [""] per job: any site is good) and
    'njobs'.  Raises CrabException unless exactly two of
    total_number_of_events, events_per_job and number_of_jobs are given.
    """
    common.logger.debug('Splitting per events')
    self.checkUserSettings()
    destinations = []
    nSelected = self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs
    if nSelected != 2:
        raise CrabException('Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.')

    managedGenerators = self.args['managedGenerators']
    generator = self.args['generator']
    firstLumi = self.cfg_params.get('CMSSW.first_lumi', 1)
    isManaged = generator in managedGenerators

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs %s' % (self.total_number_of_jobs,))

    # is there any remainder?
    doable = int(self.total_number_of_jobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - doable
    common.logger.debug('Check %s' % (check,))

    common.logger.info('%s jobs can be created, each for %s for a total of %s events'
                       % (self.total_number_of_jobs, self.eventsPerJob,
                          self.total_number_of_jobs*self.eventsPerJob))
    if check > 0:
        common.logger.info('Warning: asked %s but can do only %s'
                           % (self.total_number_of_events, doable))

    # one argument list per job (seed number.$i)
    self.list_of_args = []
    for jobIndex in range(self.total_number_of_jobs):
        # Since there is no input, any site is good; the entry must be
        # empty to correctly write the XML
        destinations.append([""])
        jobArgs = []
        if firstLumi:  # Pythia first lumi
            jobArgs.append(str(int(firstLumi) + jobIndex))
        if isManaged:
            jobArgs.append(generator)
            # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
            if generator == 'comphep' and jobIndex == 0:
                jobArgs.append('1')
            else:
                jobArgs.append(str(jobIndex*self.eventsPerJob))
        jobArgs.append(str(self.eventsPerJob))
        self.list_of_args.append(jobArgs)

    # argument names matching the lists built above
    if firstLumi and isManaged:
        params = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
    elif firstLumi:
        params = ['FirstLumi','MaxEvents']
    elif isManaged:
        params = ['Generator', 'FirstEvent', 'MaxEvents']
    else:
        params = ['MaxEvents']

    return {'params': params,
            'args': self.list_of_args,
            'jobDestination': destinations,
            'njobs': self.total_number_of_jobs}
619    
620    
def jobSplittingForScript(self):
    """
    Perform job splitting based on number of jobs, for script tasks without
    input data.

    Requires CMSSW.number_of_jobs; raises CrabException when it is missing.
    Returns a dict with 'params' (['MaxEvents']), 'args' (one list per job:
    [events per job], or [] when events per job is 0), 'jobDestination'
    (one [""] per job) and 'njobs'.
    """
    self.checkUserSettings()
    if self.selectNumberOfJobs == 0:
        msg = 'must specify number_of_jobs.'
        # was `crabexception`, an undefined name that turned this user
        # error into a NameError
        raise CrabException(msg)
    jobDestination = []
    common.logger.debug('Splitting per job')
    common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs '+str(self.total_number_of_jobs))

    common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

    # one argument list per job, so args stay aligned with the jobs
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        args = []
        jobDestination.append([""])
        if self.eventsPerJob != 0:
            args.append(str(self.eventsPerJob))
        self.list_of_args.append(args)

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['MaxEvents']
    dictOut['args'] = self.list_of_args
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = self.total_number_of_jobs
    return dictOut
657    
658 ewv 1.22
659     def jobSplittingByLumi(self):
660 spiga 1.1 """
661 ewv 1.26 Split task into jobs by Lumi section paying attention to which
662     lumis should be run (according to the analysis dataset).
663     This uses WMBS job splitting which does not split files over jobs
664     so the job will have AT LEAST as many lumis as requested, perhaps
665     more
666 spiga 1.1 """
667 spiga 1.46 self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
668 ewv 1.26 common.logger.debug('Splitting by Lumi')
669 ewv 1.27 self.checkLumiSettings()
670 ewv 1.26
671     blockSites = self.args['blockSites']
672     pubdata = self.args['pubdata']
673    
674     lumisPerFile = pubdata.getLumis()
675 spiga 1.46 self.parentFiles=pubdata.getParent()
676 ewv 1.26 # Make the list of WMBS files for job splitter
677     fileList = pubdata.getListFiles()
678 ewv 1.45 wmFileList = []
679 ewv 1.26 for jobFile in fileList:
680     block = jobFile['Block']['Name']
681     try:
682     jobFile['Block']['StorageElementList'].extend(blockSites[block])
683     except:
684     continue
685     wmbsFile = File(jobFile['LogicalFileName'])
686 ewv 1.42 if not blockSites[block]:
687     wmbsFile['locations'].add('Nowhere')
688 ewv 1.26 [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
689     wmbsFile['block'] = block
690     for lumi in lumisPerFile[jobFile['LogicalFileName']]:
691     wmbsFile.addRun(Run(lumi[0], lumi[1]))
692 ewv 1.45 wmFileList.append(wmbsFile)
693    
694     fileSet = set(wmFileList)
695     thefiles = Fileset(name='FilesToSplit', files = fileSet)
696 ewv 1.26
697 ewv 1.27 # Create the factory and workflow
698 ewv 1.26 work = Workflow()
699     subs = Subscription(fileset = thefiles, workflow = work,
700     split_algo = 'LumiBased', type = "Processing")
701     splitter = SplitterFactory()
702     jobFactory = splitter(subs)
703    
704     list_of_lists = []
705     jobDestination = []
706     jobCount = 0
707 ewv 1.27 lumisCreated = 0
708 spiga 1.39 list_of_blocks = []
709 ewv 1.27 if not self.limitJobLumis:
710 ewv 1.48 if self.totalNLumis > 0:
711 ewv 1.47 self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
712     else:
713     self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
714 ewv 1.27 common.logger.info('Each job will process about %s lumis.' %
715     self.lumisPerJob)
716    
717 ewv 1.38 for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
718 ewv 1.26 for job in jobGroup.jobs:
719 ewv 1.27 if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
720 ewv 1.44 common.logger.info('Requested number of jobs reached.')
721 ewv 1.27 break
722     if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
723 ewv 1.44 common.logger.info('Requested number of lumis reached.')
724 ewv 1.27 break
725     lumis = []
726     lfns = []
727 spiga 1.46 if self.useParent==1:
728 ewv 1.47 parentlfns = []
729 spiga 1.46 pString =""
730    
731 ewv 1.27 locations = []
732 spiga 1.39 blocks = []
733 ewv 1.27 firstFile = True
734     # Collect information from all the files
735     for jobFile in job.getFiles():
736 ewv 1.34 doFile = False
737 ewv 1.27 if firstFile: # Get locations from first file in the job
738     for loc in jobFile['locations']:
739     locations.append(loc)
740 spiga 1.39 blocks.append(jobFile['block'])
741 ewv 1.27 firstFile = False
742     # Accumulate Lumis from all files
743     for lumiList in jobFile['runs']:
744     theRun = lumiList.run
745     for theLumi in list(lumiList):
746 ewv 1.35 if (not self.limitTotalLumis) or \
747 ewv 1.38 (lumisCreated < self.totalNLumis):
748 ewv 1.34 doFile = True
749 ewv 1.36 lumisCreated += 1
750 ewv 1.34 lumis.append( (theRun, theLumi) )
751     if doFile:
752     lfns.append(jobFile['lfn'])
753 spiga 1.46 if self.useParent==1:
754     parent = self.parentFiles[jobFile['lfn']]
755     for p in parent :
756     pString += p + ','
757 ewv 1.27 fileString = ','.join(lfns)
758 spiga 1.33 lumiLister = LumiList(lumis = lumis)
759     lumiString = lumiLister.getCMSSWString()
760 spiga 1.49 blockString=','.join(blocks)
761 spiga 1.46 if self.useParent==1:
762     common.logger.debug("Files: "+fileString+" with the following parents: "+pString[:-1])
763     pfileString = pString[:-1]
764 spiga 1.49 list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
765 ewv 1.47 else:
766 spiga 1.49 list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
767 spiga 1.39 list_of_blocks.append(blocks)
768 ewv 1.27 jobDestination.append(locations)
769     jobCount += 1
770     common.logger.debug('Job %s will run on %s files and %s lumis '
771     % (jobCount, len(lfns), len(lumis) ))
772    
773     common.logger.info('%s jobs created to run on %s lumis' %
774     (jobCount, lumisCreated))
775 ewv 1.26
776     # Prepare dict output matching back to non-WMBS job creation
777     dictOut = {}
778 spiga 1.49 dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
779 ewv 1.47 if self.useParent==1:
780 spiga 1.49 dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
781 ewv 1.26 dictOut['args'] = list_of_lists
782     dictOut['jobDestination'] = jobDestination
783     dictOut['njobs'] = jobCount
784 ewv 1.41 self.cacheBlocks(list_of_blocks,jobDestination)
785 spiga 1.39
786 ewv 1.26 return dictOut
787    
def cacheBlocks(self, blocks, destinations):
    """
    Save the names of the blocks used by the created jobs to
    self.fileBlocks_FileName.

    blocks       -- one entry per job; each entry is the list of block
                    names used by that job
    destinations -- one entry per job, in the same order as `blocks`;
                    each entry is the list of locations for that job

    A job's blocks are written only if at least one of its locations
    survives the SE black list followed by the SE white list.  One block
    name is written per line; a block used by several jobs is written
    once for each such job.
    """
    parser = self.blackWhiteListParser
    lines = []
    # The lists are built in lockstep by the caller (one append to each per
    # job), so pairing them with zip matches the original index-based loop.
    for jobBlocks, jobSites in zip(blocks, destinations):
        # Drop black-listed locations first, then keep only white-listed ones.
        allowedSites = parser.checkWhiteList(parser.checkBlackList(jobSites))
        if len(allowedSites) == 0:
            # No usable location for this job: do not record its blocks.
            continue
        lines.extend(str(block) + '\n' for block in jobBlocks)
    # Build the file contents once instead of repeated string concatenation.
    writeTXTfile(self, self.fileBlocks_FileName, ''.join(lines))
797 ewv 1.26
def Algos(self):
    """
    Return the splitting-type table: a dict mapping each supported
    splitting-type name to the bound method that implements it.
    """
    return dict(
        EventBased=self.jobSplittingByEvent,
        RunBased=self.jobSplittingByRun,
        LumiBased=self.jobSplittingByLumi,
        NoInput=self.jobSplittingNoInput,
        ForScript=self.jobSplittingForScript,
    )
810