ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.26
Committed: Wed Jul 29 21:42:16 2009 UTC (15 years, 9 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.25: +132 -13 lines
Log Message:
Split ADS, still some values hard coded

File Contents

# User Rev Content
1 spiga 1.1 import common
2 ewv 1.26 from sets import Set
3 spiga 1.1 from crab_exceptions import *
4     from crab_util import *
5 ewv 1.26
6     from WMCore.DataStructs.File import File
7     from WMCore.DataStructs.Fileset import Fileset
8     from WMCore.DataStructs.Run import Run
9     from WMCore.DataStructs.Subscription import Subscription
10     from WMCore.DataStructs.Workflow import Workflow
11     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
12 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
13    
class JobSplitter:
    """
    Collection of the job splitting algorithms available to a task.

    Every algorithm returns a dictionary with the keys
        'params'         - names of the per-job arguments
        'args'           - one list of argument values per job
        'jobDestination' - one list of destinations per job
        'njobs'          - number of jobs created
    Algos() maps the splitting type name to the method implementing it.
    """

    def __init__(self, cfg_params, args):
        """
        cfg_params: mapping with the user configuration (keys such as
                    'CMSSW.events_per_job' or 'GRID.se_white_list')
        args:       mapping with the inputs of the splitting algorithms
                    ('blockSites', 'pubdata', 'managedGenerators',
                    'generator'), read by the individual methods
        """
        self.cfg_params = cfg_params
        self.args = args
        #self.maxEvents
        # init BlackWhiteListParser
        self.seWhiteList = cfg_params.get('GRID.se_white_list', [])
        seBlackList = cfg_params.get('GRID.se_black_list', [])
        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())

    def checkUserSettings(self):
        """
        Read the three splitting parameters from the configuration.

        SETS: self.eventsPerJob / self.selectEventsPerJob
              self.theNumberOfJobs / self.selectNumberOfJobs
              self.total_number_of_events / self.selectTotalNumberEvents
        Each select* flag is 1 when the user supplied the parameter and 0
        otherwise (the value then holds a default: -1, 0 and 0).
        RAISES: CrabException when both total_number_of_events (other than
                -1) and number_of_jobs are given and there are fewer
                events than jobs.
        """
        ## Events per job
        if 'CMSSW.events_per_job' in self.cfg_params:
            self.eventsPerJob = int(self.cfg_params['CMSSW.events_per_job'])
            self.selectEventsPerJob = 1
        else:
            self.eventsPerJob = -1
            self.selectEventsPerJob = 0

        ## number of jobs
        if 'CMSSW.number_of_jobs' in self.cfg_params:
            self.theNumberOfJobs = int(self.cfg_params['CMSSW.number_of_jobs'])
            self.selectNumberOfJobs = 1
        else:
            self.theNumberOfJobs = 0
            self.selectNumberOfJobs = 0

        ## total number of events
        if 'CMSSW.total_number_of_events' in self.cfg_params:
            self.total_number_of_events = int(self.cfg_params['CMSSW.total_number_of_events'])
            self.selectTotalNumberEvents = 1
            if self.selectNumberOfJobs == 1:
                # -1 means "all events", so it cannot be compared with the job count
                if (self.total_number_of_events != -1) and self.total_number_of_events < self.theNumberOfJobs:
                    msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                    raise CrabException(msg)
        else:
            self.total_number_of_events = 0
            self.selectTotalNumberEvents = 0

    def ComputeSubBlockSites(self, blockSites):
        """
        Return the subset of blockSites (block -> list of sites) whose
        sites pass the white list check of the BlackWhiteListParser.
        The original site list of a retained block is kept unchanged.
        RAISES: CrabException when no block is left.
        """
        sub_blockSites = {}
        for block, sites in blockSites.items():
            if self.blackWhiteListParser.checkWhiteList(sites):
                sub_blockSites[block] = sites
        if len(sub_blockSites) < 1:
            msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
            raise CrabException(msg)
        return sub_blockSites

    ########################################################################
    def jobSplittingByEvent(self):
        """
        Perform job splitting by number of events. Jobs run over an
        integer number of files. With CMSSW.no_block_boundary=0 (default)
        a job never spans more than one block; with no_block_boundary=1
        the file list is carried over from one block to the next.
        REQUIRES: self.args['blockSites'] - dictionary with blocks as keys
                      and list of host sites as values
                  self.args['pubdata'] - object providing getFiles(),
                      getEventsPerBlock(), getEventsPerFile(), getParent()
                      and getMaxEvents()
                  exactly two of total_number_of_events, events_per_job
                      and number_of_jobs in the configuration
        SETS: self.eventsbyblock, self.eventsbyfile, self.parentFiles,
              self.maxEvents, self.useParent
              self.ncjobs, self.total_number_of_jobs - Total # of jobs
        RETURNS: dictionary with 'params', 'args' (file(s), optional
                 parent file(s), max events and skip events per job),
                 'jobDestination' (a list of lists) and 'njobs'
        RAISES: CrabException on inconsistent settings
        """

        jobDestination = []
        self.checkUserSettings()
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']
        filesbyblock = pubdata.getFiles()

        self.eventsbyblock = pubdata.getEventsPerBlock()
        self.eventsbyfile = pubdata.getEventsPerFile()
        self.parentFiles = pubdata.getParent()

        ## get max number of events
        self.maxEvents = pubdata.getMaxEvents()

        self.useParent = int(self.cfg_params.get('CMSSW.use_parent', 0))
        noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary', 0))

        if noBboundary == 1:
            if self.total_number_of_events == -1:
                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
                msg += '\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
                raise CrabException(msg)
            # the default white list is an empty list, a configured one is a
            # comma separated string: the length test must come first
            if len(self.seWhiteList) == 0 or len(self.seWhiteList.split(',')) != 1:
                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
                msg += "\tPlease set se_white_list with the site's storage element name."
                raise CrabException(msg)
            blockSites = self.ComputeSubBlockSites(blockSites)

        # ---- Handle the possible job splitting configurations ---- #
        if self.selectTotalNumberEvents:
            totalEventsRequested = self.total_number_of_events
        if self.selectEventsPerJob:
            eventsPerJobRequested = self.eventsPerJob
            if self.selectNumberOfJobs:
                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

        # If user requested all the events in the dataset
        if totalEventsRequested == -1:
            eventsRemaining = self.maxEvents
        # If user requested more events than are in the dataset
        elif totalEventsRequested > self.maxEvents:
            eventsRemaining = self.maxEvents
            # report the effective request: total_number_of_events is 0 when
            # the user gave number_of_jobs and events_per_job instead
            common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
        # If user requested less events than are in the dataset
        else:
            eventsRemaining = totalEventsRequested

        # If user requested more events per job than are in the dataset
        if self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents:
            eventsPerJobRequested = self.maxEvents

        # For user info at end
        totalEventCount = 0

        if self.selectTotalNumberEvents and self.selectNumberOfJobs:
            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

        # With less than one event per job the loops below would keep closing
        # empty jobs without ever consuming eventsRemaining
        if eventsRemaining > 0 and eventsPerJobRequested < 1:
            msg = 'Must specify at least one event per job. Got %s event(s) per job.'%eventsPerJobRequested
            raise CrabException(msg)

        if self.selectNumberOfJobs:
            common.logger.info("May not create the exact number_of_jobs requested.")

        # old... to remove Daniele
        totalNumberOfJobs = 999999999

        blocks = list(blockSites.keys())
        blockCount = 0
        # Backup variable in case self.maxEvents counted events in a non-included block
        numBlocksInDataset = len(blocks)

        jobCount = 0
        list_of_lists = []

        # dictionary tracking which jobs belong to which block
        jobsOfBlock = {}

        parString = ""
        pString = ""
        filesEventCount = 0
        msg = ''

        # ---- Iterate over the blocks in the dataset until ---- #
        # ---- we've met the requested total # of events    ---- #
        while (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs):
            block = blocks[blockCount]
            blockCount += 1
            if block not in jobsOfBlock:
                jobsOfBlock[block] = []

            if block in self.eventsbyblock:
                numEventsInBlock = self.eventsbyblock[block]
                common.logger.debug('Events in Block File '+str(numEventsInBlock))

                files = filesbyblock[block]
                numFilesInBlock = len(files)
                if numFilesInBlock <= 0:
                    continue
                fileCount = 0
                if noBboundary == 0: # DD
                    # ---- New block => New job ---- #
                    parString = ""
                    pString = ""
                    # counter for number of events in files currently worked on
                    filesEventCount = 0
                # flag if next while loop should touch new file
                newFile = 1
                # job event counter
                jobSkipEventCount = 0

                # ---- Iterate over the files in the block until we've met the requested ---- #
                # ---- total # of events or we've gone over all the files in this block  ---- #
                # accumulate (instead of resetting) so that the debug output
                # after the loops covers every block
                msg += '\n'
                while (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs):
                    lfn = files[fileCount]
                    if self.useParent == 1:
                        parent = self.parentFiles[lfn]
                        common.logger.log(10-1, "File "+str(lfn)+" has the following parents: "+str(parent))
                    if newFile:
                        try:
                            numEventsInFile = self.eventsbyfile[lfn]
                            common.logger.log(10-1, "File "+str(lfn)+" has "+str(numEventsInFile)+" events")
                            # increase filesEventCount
                            filesEventCount += numEventsInFile
                            # Add file to current job
                            parString += lfn + ','
                            if self.useParent == 1:
                                for f in parent:
                                    pString += f + ','
                            newFile = 0
                        except KeyError:
                            common.logger.info("File "+str(lfn)+" has unknown number of events: skipping")

                    eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                    # if less events in file remain than eventsPerJobRequested
                    if filesEventCount - jobSkipEventCount < eventsPerJobRequested:
                        if noBboundary == 1: ## DD
                            newFile = 1
                            fileCount += 1
                        else:
                            # if last file in block
                            if fileCount == numFilesInBlock-1:
                                # end job using last file, use remaining events in block
                                # close job and touch new file
                                fullString = parString[:-1]
                                if self.useParent == 1:
                                    fullParentString = pString[:-1]
                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
                                else:
                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
                                msg += "Job %s can run over %s  events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
                                jobDestination.append(blockSites[block])
                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                                # fill jobs of block dictionary
                                jobsOfBlock[block].append(jobCount+1)
                                # reset counter
                                jobCount = jobCount + 1
                                totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                                eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                                jobSkipEventCount = 0
                                # reset file
                                pString = ""
                                parString = ""
                                filesEventCount = 0
                                newFile = 1
                                fileCount += 1
                            else:
                                # go to next file
                                newFile = 1
                                fileCount += 1
                    # if events in file equal to eventsPerJobRequested
                    elif filesEventCount - jobSkipEventCount == eventsPerJobRequested:
                        # close job and touch new file
                        fullString = parString[:-1]
                        if self.useParent == 1:
                            fullParentString = pString[:-1]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                        jobDestination.append(blockSites[block])
                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        jobSkipEventCount = 0
                        # reset file
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1

                    # if more events in file remain than eventsPerJobRequested
                    else:
                        # close job but don't touch new file
                        fullString = parString[:-1]
                        if self.useParent == 1:
                            fullParentString = pString[:-1]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                        jobDestination.append(blockSites[block])
                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        jobsOfBlock[block].append(jobCount+1)
                        # increase counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        # calculate skip events for last file
                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[lfn])
                        # remove all but the last file
                        filesEventCount = self.eventsbyfile[lfn]
                        pString_tmp = ''
                        if self.useParent == 1:
                            for f in parent:
                                pString_tmp += f + ','
                        pString = pString_tmp
                        parString = lfn + ','
                # END while (iterate over files in the block)
        # END while (iterate over blocks in the dataset)
        common.logger.debug(msg)
        self.ncjobs = self.total_number_of_jobs = jobCount
        if eventsRemaining > 0 and jobCount < totalNumberOfJobs:
            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

        # skip check on block with no sites  DD
        if noBboundary == 0:
            self.checkBlockNoSite(blocks, jobsOfBlock, blockSites)

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        if self.useParent:
            dictOut['params'] = ['InputFiles','ParentFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    def checkBlockNoSite(self, blocks, jobsOfBlock, blockSites):
        """
        Log, for every block that got jobs, the jobs and the sites left
        after black and white list filtering; warn about the blocks left
        without any site.
        ARGUMENTS: blocks - block names, in processing order
                   jobsOfBlock - dictionary block -> list of job numbers
                   blockSites - dictionary block -> list of host sites
        RAISES: CrabException('No jobs created') when every listed block
                is without sites (also when no block got jobs at all).
        """
        # screen output
        screenOutput = "List of jobs and available destination sites:\n\n"
        noSiteBlock = []
        bloskNoSite = []
        allBlock = []

        blockCounter = 0
        for block in blocks:
            if block in jobsOfBlock:
                blockCounter += 1
                allBlock.append(blockCounter)
                sites = self.blackWhiteListParser.checkWhiteList(
                    self.blackWhiteListParser.checkBlackList(blockSites[block], [block]), [block])
                screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                    ', '.join(SE2CMS(sites)))
                if len(sites) == 0:
                    noSiteBlock.append(spanRanges(jobsOfBlock[block]))
                    bloskNoSite.append(blockCounter)

        common.logger.info(screenOutput)
        if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
            msg = 'WARNING: No sites are hosting any part of data for block:\n '
            # separator ("virgola" = comma) only when there are several entries
            virgola = ""
            if len(bloskNoSite) > 1:
                virgola = ","
            for block in bloskNoSite:
                msg += ' ' + str(block) + virgola
            msg += '\n\t\tRelated jobs:\n '
            virgola = ""
            if len(noSiteBlock) > 1:
                virgola = ","
            for range_jobs in noSiteBlock:
                msg += str(range_jobs) + virgola
            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
            if 'GRID.se_white_list' in self.cfg_params:
                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += '\tPlease check if the dataset is available at this site!)'
            if 'GRID.ce_white_list' in self.cfg_params:
                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += '\tPlease check if the dataset is available at this site!)\n'

            common.logger.info(msg)

        if bloskNoSite == allBlock:
            raise CrabException('No jobs created')

        return

    ########################################################################
    def jobSplittingByRun(self):
        """
        Perform job splitting by run, using the WMCore 'RunBased'
        splitting algorithm: one job per job group returned by the
        splitter, up to number_of_jobs when the user set it.
        REQUIRES: self.args['blockSites'], self.args['pubdata']
                  (pubdata.getListFiles() provides the file records)
        RETURNS: dictionary with 'params', 'args', 'jobDestination', 'njobs'
        """

        self.checkUserSettings()
        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        if self.selectNumberOfJobs == 0:
            self.theNumberOfJobs = 9999999
        thefiles = Fileset(name='FilesToSplit')
        fileList = pubdata.getListFiles()
        for f in fileList:
            block = f['Block']['Name']
            # best effort: files whose block cannot be matched to sites
            # are left out of the splitting
            try:
                f['Block']['StorageElementList'].extend(blockSites[block])
            except Exception:
                continue
            wmbsFile = File(f['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            # only the first run of the file is considered
            runNum = f['RunsList'][0]['RunNumber']
            wmbsFile.addRun(Run(runNumber=runNum))
            thefiles.addFile(wmbsFile)

        work = Workflow()
        subs = Subscription(
            fileset = thefiles,
            workflow = work,
            split_algo = 'RunBased',
            type = "Processing")
        splitter = SplitterFactory()
        jobfactory = splitter(subs)

        #loop over all runs
        list_of_lists = []
        jobDestination = []
        count = 0
        for jobGroup in jobfactory():
            if count < self.theNumberOfJobs:
                res = self.getJobInfo(jobGroup)
                fullString = ','.join(res['lfns'])
                # MaxEvents=-1 and SkipEvents=0: run over whole files
                list_of_lists.append([fullString,str(-1),str(0)])
                #need to check single file location
                jobDestination.append(res['locations'])
                count += 1
        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['InputFiles','MaxEvents','SkipEvents']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = count

        return dictOut

    def getJobInfo(self, jobGroup):
        """
        Collect the files of all jobs of a job group.
        RETURNS: dictionary with
                 'lfns' - the 'lfn' of every file of every job
                 'locations' - the 'locations' of the first file only
        """
        lfns = []
        locations = []
        filesSeen = 0
        for job in jobGroup.jobs:
            for jobFile in job.getFiles():
                lfns.append(jobFile['lfn'])
                # locations are taken from the very first file only
                if filesSeen < 1:
                    for loc in jobFile['locations']:
                        locations.append(loc)
                filesSeen = filesSeen + 1
                ### the check on the locations should be added here
        res = {}
        res['lfns'] = lfns
        res['locations'] = locations
        return res

    ########################################################################
    def prepareSplittingNoInput(self):
        """
        Derive the missing splitting quantity from the ones the user gave.
        REQUIRES: checkUserSettings() to have been called
        SETS: self.total_number_of_jobs, plus self.total_number_of_events
              or self.eventsPerJob depending on which one was not given
        RAISES: CrabException when total_number_of_events is negative
        """
        if self.selectEventsPerJob:
            common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
        if self.selectNumberOfJobs:
            common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
        if self.selectTotalNumberEvents:
            common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')

        if self.total_number_of_events < 0:
            msg = 'Cannot split jobs per Events with "-1" as total number of events'
            raise CrabException(msg)

        if self.selectEventsPerJob:
            if self.selectTotalNumberEvents:
                self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
            elif self.selectNumberOfJobs:
                self.total_number_of_jobs = self.theNumberOfJobs
                self.total_number_of_events = int(self.theNumberOfJobs*self.eventsPerJob)

        elif self.selectNumberOfJobs:
            self.total_number_of_jobs = self.theNumberOfJobs
            self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)

    def jobSplittingNoInput(self):
        """
        Perform job splitting based on number of event per job
        (no input dataset).
        REQUIRES: self.args['managedGenerators'], self.args['generator'],
                  exactly two of total_number_of_events, events_per_job
                  and number_of_jobs in the configuration
        SETS: self.list_of_args
        RETURNS: dictionary with 'params', 'args', 'jobDestination', 'njobs'
        """
        common.logger.debug('Splitting per events')
        self.checkUserSettings()
        jobDestination = []
        if (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2:
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        managedGenerators = self.args['managedGenerators']
        generator = self.args['generator']
        firstRun = self.cfg_params.get('CMSSW.first_run', None)

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))

        # is there any remainder?
        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

        common.logger.debug('Check  '+str(check))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
        if check > 0:
            common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            ## Since there is no input, any site is good
            jobDestination.append([""]) #must be empty to write correctly the xml
            args = []
            if firstRun:
                ## pythia first run
                # NOTE(review): this concatenates the digits of first_run and
                # of the job index (1 and 3 give "13"); confirm whether the
                # sum int(firstRun)+i was intended
                args.append(str(firstRun)+str(i))
            if generator in managedGenerators:
                args.append(generator)
                if generator == 'comphep' and i == 0:
                    # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                    args.append('1')
                else:
                    args.append(str(i*self.eventsPerJob))
            args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)

        # prepare dict output: the parameter names follow the order in
        # which the arguments were appended above
        dictOut = {}
        dictOut['params'] = ['MaxEvents']
        if firstRun:
            dictOut['params'] = ['FirstRun','MaxEvents']
            if generator in managedGenerators:
                dictOut['params'] = ['FirstRun', 'Generator', 'FirstEvent', 'MaxEvents']
        else:
            if generator in managedGenerators:
                dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs

        return dictOut

    def jobSplittingForScript(self):
        """
        Perform job splitting based on number of job
        REQUIRES: number_of_jobs in the configuration
        SETS: self.list_of_args
        RETURNS: dictionary with 'params', 'args', 'jobDestination', 'njobs'
        RAISES: CrabException when number_of_jobs is missing
        """
        self.checkUserSettings()
        if self.selectNumberOfJobs == 0:
            msg = 'must specify  number_of_jobs.'
            raise CrabException(msg)
        jobDestination = []
        common.logger.debug('Splitting per job')
        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

        self.prepareSplittingNoInput()

        common.logger.debug('N jobs  '+str(self.total_number_of_jobs))

        common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

        # argument is seed number.$i
        self.list_of_args = []
        for i in range(self.total_number_of_jobs):
            args = []
            jobDestination.append([""])
            # no MaxEvents argument when no event count could be derived
            if self.eventsPerJob != 0:
                args.append(str(self.eventsPerJob))
            self.list_of_args.append(args)

        # prepare dict output
        dictOut = {}
        dictOut['params'] = ['MaxEvents']
        dictOut['args'] = self.list_of_args
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = self.total_number_of_jobs
        return dictOut

    def jobSplittingByLumi(self):
        """
        Split task into jobs by Lumi section paying attention to which
        lumis should be run (according to the analysis dataset).
        This uses WMBS job splitting which does not split files over jobs
        so the job will have AT LEAST as many lumis as requested, perhaps
        more
        REQUIRES: self.args['blockSites'], self.args['pubdata']
                  (pubdata.getLumis() and pubdata.getListFiles())
        RETURNS: dictionary with 'params', 'args', 'jobDestination', 'njobs'
        """

        common.logger.debug('Splitting by Lumi')
        self.checkUserSettings() # FIXME need one for lumis

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        if self.selectNumberOfJobs == 0:
            self.theNumberOfJobs = 9999999
        lumisPerFile = pubdata.getLumis()

        # Make the list of WMBS files for job splitter
        fileList = pubdata.getListFiles()
        thefiles = Fileset(name='FilesToSplit')
        for jobFile in fileList:
            block = jobFile['Block']['Name']
            # best effort: files whose block cannot be matched to sites
            # are left out of the splitting
            try:
                jobFile['Block']['StorageElementList'].extend(blockSites[block])
            except Exception:
                continue
            wmbsFile = File(jobFile['LogicalFileName'])
            for site in blockSites[block]:
                wmbsFile['locations'].add(site)
            wmbsFile['block'] = block
            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
                wmbsFile.addRun(Run(lumi[0], lumi[1]))
            thefiles.addFile(wmbsFile)

        work = Workflow()
        subs = Subscription(fileset = thefiles, workflow = work,
                            split_algo = 'LumiBased', type = "Processing")
        splitter = SplitterFactory()
        jobFactory = splitter(subs)

        list_of_lists = []
        jobDestination = []
        jobCount = 0
        # NOTE(review): hard coded for now; this overrides both the user's
        # number_of_jobs and the 9999999 default set above
        self.theNumberOfJobs = 20 #FIXME
        for jobGroup in jobFactory(lumis_per_job = 50): #FIXME
            for job in jobGroup.jobs:
                if jobCount < self.theNumberOfJobs:
                    lumis = []
                    lfns = []
                    locations = []
                    firstFile = True
                    # Collect information from all the files
                    for jobFile in job.getFiles():
                        if firstFile:  # Get locations from first file in the job
                            for loc in jobFile['locations']:
                                locations.append(loc)
                            firstFile = False
                        # Accumulate Lumis from all files
                        for lumiList in jobFile['runs']:
                            theRun = lumiList.run
                            for theLumi in list(lumiList):
                                lumis.append( (theRun, theLumi) )

                        lfns.append(jobFile['lfn'])
                    fileString = ','.join(lfns)
                    lumiString = compressLumiString(lumis)
                    common.logger.debug('Job %s will run on %s files and %s lumis '
                                      % (jobCount, len(lfns), len(lumis) ))
                    list_of_lists.append([fileString, str(-1), str(0), lumiString])

                    jobDestination.append(locations)
                    jobCount += 1
        # Prepare dict output matching back to non-WMBS job creation
        dictOut = {}
        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = jobCount

        return dictOut

    def Algos(self):
        """
        Define key splittingType matrix
        RETURNS: dictionary splitting type name -> bound splitting method
        """
        SplitAlogs = {
            'EventBased' : self.jobSplittingByEvent,
            'RunBased'   : self.jobSplittingByRun,
            'LumiBased'  : self.jobSplittingByLumi,
            'NoInput'    : self.jobSplittingNoInput,
            'ForScript'  : self.jobSplittingForScript
        }
        return SplitAlogs
684    
685 ewv 1.26
686    
def compressLumiString(lumis):
    """
    Turn a collection of 2-tuples of run/lumi numbers into a string of the
    format R1:L1,R2:L2-R3:L3 which is acceptable to CMSSW LumiBlockRange
    variable

    Consecutive lumis of the same run are merged into one R:L-R:L range,
    repeated run/lumi pairs are counted once. The input is not modified
    (it is sorted into a copy) and may be any iterable.
    RETURNS: the comma separated ranges, '' for an empty input
    """

    def formatRange(first, last):
        # a single lumi is written "run:lumi", a span "run:lumi-run:lumi"
        text = ':'.join(map(str, first))
        if first != last:
            text += '-' + ':'.join(map(str, last))
        return text

    parts = []
    startRange = None
    endRange = None

    for lumiBlock in sorted(lumis):
        if startRange is None: # This is the first one
            startRange = lumiBlock
            endRange = lumiBlock
        elif lumiBlock == endRange: # Same Lumi (different files?)
            pass
        elif lumiBlock[0] == endRange[0] and lumiBlock[1] == endRange[1] + 1: # This is a continuation
            endRange = lumiBlock
        else: # This is the start of a new range
            parts.append(formatRange(startRange, endRange))
            startRange = lumiBlock
            endRange = lumiBlock

    # Put out what's left
    if startRange is not None:
        parts.append(formatRange(startRange, endRange))

    return ','.join(parts)
723    
724