ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.61
Committed: Thu Sep 5 14:42:35 2013 UTC (11 years, 7 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, HEAD
Changes since 1.60: +17 -4 lines
Log Message:
make sure T1's are blacklisted as needed https://savannah.cern.ch/bugs/index.php?102400

File Contents

# Content
1
2 __revision__ = "$Id: Splitter.py,v 1.60 2013/05/22 15:57:41 belforte Exp $"
3 __version__ = "$Revision: 1.60 $"
4
5 import common
6 from crab_exceptions import *
7 from crab_util import *
8
9 from WMCore.DataStructs.File import File
10 from WMCore.DataStructs.Fileset import Fileset
11 from WMCore.DataStructs.Run import Run
12 from WMCore.DataStructs.Subscription import Subscription
13 from WMCore.DataStructs.Workflow import Workflow
14 from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 try: # Can remove when CMSSW 3.7 and earlier are dropped
17 from FWCore.PythonUtilities.LumiList import LumiList
18 except ImportError:
19 from LumiList import LumiList
20
21 class JobSplitter:
    def __init__( self, cfg_params, args ):
        """
        Set up the splitter: read the SE black/white lists from the user
        configuration, build the SEBlackWhiteListParser, and apply both lists
        to the block->SE-locations map passed in args['blockSites'].

        cfg_params: dict-like user configuration (GRID.*, CMSSW.* keys)
        args:       task arguments; args['blockSites'] is filtered in place
        """

        self.cfg_params = cfg_params
        self.args=args

        # Lumi-splitting limits; filled in later by checkLumiSettings()
        self.lumisPerJob = -1
        self.totalNLumis = 0
        self.theNumberOfJobs = 0
        self.limitNJobs = False
        self.limitTotalLumis = False
        self.limitJobLumis = False

        #self.maxEvents
        # init BlackWhiteListParser
        # lists may come in either as a comma-separated string or already as a list
        self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
        if type(self.seWhiteList) == type("string"):
            self.seWhiteList = self.seWhiteList.split(',')
        seBlackList = cfg_params.get('GRID.se_black_list',[])
        if type(seBlackList) == type("string"):
            seBlackList = seBlackList.split(',')
        if common.scheduler.name().upper() == 'REMOTEGLIDEIN' :
            # use central black list (fetched over HTTP unless the user opted out)
            removeBList = cfg_params.get("GRID.remove_default_blacklist", 0 )
            blackAnaOps = None
            if int(removeBList) == 0:
                # Downloader presumably comes from crab_util — TODO confirm
                blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
                result = blacklist.config("site_black_list.conf").strip().split(',')
                # NOTE(review): split() never returns None, so this check always
                # succeeds; the else branch looks unreachable — confirm intent
                if result != None:
                    blackAnaOps = result
                    common.logger.debug("Enforced black list: %s "%blackAnaOps)
                else:
                    common.logger.info("WARNING: Skipping default black list!")
            if int(removeBList) == 0 and blackAnaOps:
                seBlackList += blackAnaOps

        self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())

        if seBlackList != []:
            common.logger.info("SE black list applied to data location: %s" %\
                            seBlackList)
        if self.seWhiteList != []:
            common.logger.info("SE white list applied to data location: %s" %\
                            self.seWhiteList)
        # apply BW list: filter each block's SE list through black then white list
        blockSites=args['blockSites']
        common.logger.debug("List of blocks and used locations (SE):")
        for block,dlsDest in blockSites.iteritems():
            noBsites=self.blackWhiteListParser.checkBlackList(dlsDest)
            sites=self.blackWhiteListParser.checkWhiteList(noBsites)
            # only overwrite when something survives the filtering
            if sites : blockSites[block]=sites
            common.logger.debug("%s : %s" % (block,sites))
        args['blockSites']=blockSites

        ## check if has been asked for a non default file to store/read analyzed fileBlocks
        defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
        self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
78
79
80 def checkUserSettings(self):
81 ## Events per job
82 if self.cfg_params.has_key('CMSSW.events_per_job'):
83 self.eventsPerJob =int( self.cfg_params['CMSSW.events_per_job'])
84 self.selectEventsPerJob = 1
85 else:
86 self.eventsPerJob = -1
87 self.selectEventsPerJob = 0
88
89 ## number of jobs
90 if self.cfg_params.has_key('CMSSW.number_of_jobs'):
91 self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
92 self.selectNumberOfJobs = 1
93 else:
94 self.theNumberOfJobs = 0
95 self.selectNumberOfJobs = 0
96
97 if self.cfg_params.has_key('CMSSW.total_number_of_events'):
98 self.total_number_of_events = int(self.cfg_params['CMSSW.total_number_of_events'])
99 self.selectTotalNumberEvents = 1
100 if self.selectNumberOfJobs == 1:
101 if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs):
102 msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
103 raise CrabException(msg)
104 else:
105 self.total_number_of_events = 0
106 self.selectTotalNumberEvents = 0
107
108 return
109
110 def checkLumiSettings(self):
111 """
112 Check to make sure the user has specified enough information to
113 perform splitting by Lumis to run the job
114 """
115 settings = 0
116 if self.cfg_params.has_key('CMSSW.lumis_per_job'):
117 self.lumisPerJob =int( self.cfg_params['CMSSW.lumis_per_job'])
118 self.limitJobLumis = True
119 settings += 1
120
121 if self.cfg_params.has_key('CMSSW.number_of_jobs'):
122 self.theNumberOfJobs =int( self.cfg_params['CMSSW.number_of_jobs'])
123 self.limitNJobs = True
124 settings += 1
125
126 if self.cfg_params.has_key('CMSSW.total_number_of_lumis'):
127 self.totalNLumis = int(self.cfg_params['CMSSW.total_number_of_lumis'])
128 self.limitTotalLumis = (self.totalNLumis != -1)
129 settings += 1
130
131 if settings != 2:
132 msg = 'When splitting by lumi section you must specify two and only two of:\n'
133 msg += ' number_of_jobs, lumis_per_job, total_number_of_lumis'
134 raise CrabException(msg)
135 if self.limitNJobs and self.limitJobLumis:
136 self.limitTotalLumis = True
137 self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs
138
139 # Has the user specified runselection?
140 if (self.cfg_params.has_key('CMSSW.runselection')):
141 common.logger.info('You have specified runselection and split by lumi.')
142 common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
143 return
144
145 def ComputeSubBlockSites( self, blockSites ):
146 """
147 """
148
149 sub_blockSites = {}
150 for k,v in blockSites.iteritems():
151 sites=self.blackWhiteListParser.checkWhiteList(v)
152 if sites : sub_blockSites[k]=v
153 if len(sub_blockSites) < 1:
154 msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
155 raise CrabException(msg)
156 return sub_blockSites
157
158 ########################################################################
    def jobSplittingByEvent( self ):
        """
        Perform job splitting. Jobs run over an integer number of files
        and no more than one block.
        ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
        REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
                  self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
                  self.maxEvents, self.filesbyblock
        SETS: jobDestination - Site destination(s) for each job (a list of lists)
              self.total_number_of_jobs - Total # of jobs
              self.list_of_args - File(s) job will run on (a list of lists)

        Returns a dict with 'params', 'args' (one entry per job:
        [files, (parents,) maxEvents, skipEvents, block]), 'jobDestination'
        and 'njobs'.
        """


        jobDestination=[]
        self.checkUserSettings()
        # exactly two of the three event-splitting settings must be given
        if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
            msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
            raise CrabException(msg)

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']
        filesbyblock=pubdata.getFiles()

        self.eventsbyblock=pubdata.getEventsPerBlock()
        self.eventsbyfile=pubdata.getEventsPerFile()
        self.parentFiles=pubdata.getParent()

        ## get max number of events
        self.maxEvents=pubdata.getMaxEvents()

        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
        noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))

        if noBboundary == 1:
            # crossing block boundaries needs an explicit event total and a single site
            if self.total_number_of_events== -1:
                msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
                msg +='\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
                raise CrabException(msg)
            if len(self.seWhiteList) != 1:
                msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
                msg += "\tPlease set se_white_list with the site's storage element name."
                raise CrabException(msg)
            blockSites = self.ComputeSubBlockSites(blockSites)

        # ---- Handle the possible job splitting configurations ---- #
        if (self.selectTotalNumberEvents):
            totalEventsRequested = self.total_number_of_events
        if (self.selectEventsPerJob):
            eventsPerJobRequested = self.eventsPerJob
            if (self.selectNumberOfJobs):
                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

        # If user requested all the events in the dataset
        if (totalEventsRequested == -1):
            eventsRemaining=self.maxEvents
        # If user requested more events than are in the dataset
        elif (totalEventsRequested > self.maxEvents):
            eventsRemaining = self.maxEvents
            common.logger.info("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
        # If user requested less events than are in the dataset
        else:
            eventsRemaining = totalEventsRequested

        # If user requested more events per job than are in the dataset
        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
            eventsPerJobRequested = self.maxEvents

        # For user info at end
        totalEventCount = 0

        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)

        if (self.selectNumberOfJobs):
            common.logger.info("May not create the exact number_of_jobs requested.")

        if (self.theNumberOfJobs < 0):
            common.logger.info("ERROR: Negative number_of_jobs requested. Will result in no jobs.")

        # old... to remove Daniele
        totalNumberOfJobs = 999999999

        blocks = blockSites.keys()
        blockCount = 0
        # Backup variable in case self.maxEvents counted events in a non-included block
        numBlocksInDataset = len(blocks)

        jobCount = 0
        list_of_lists = []

        # list tracking which jobs are in which jobs belong to which block
        jobsOfBlock = {}

        parString = ""
        pString = ""
        filesEventCount = 0
        msg=''

        # ---- Iterate over the blocks in the dataset until ---- #
        # ---- we've met the requested total # of events ---- #
        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
            block = blocks[blockCount]
            blockCount += 1
            if block not in jobsOfBlock.keys() :
                jobsOfBlock[block] = []

            if self.eventsbyblock.has_key(block) :
                numEventsInBlock = self.eventsbyblock[block]
                common.logger.debug('Events in Block File '+str(numEventsInBlock))

                files = filesbyblock[block]
                numFilesInBlock = len(files)
                if (numFilesInBlock <= 0):
                    continue
                fileCount = 0
                if noBboundary == 0: # DD
                    # ---- New block => New job ---- #
                    parString = ""
                    pString=""
                    # counter for number of events in files currently worked on
                    filesEventCount = 0
                # flag if next while loop should touch new file
                newFile = 1
                # job event counter
                jobSkipEventCount = 0

                # ---- Iterate over the files in the block until we've met the requested ---- #
                # ---- total # of events or we've gone over all the files in this block ---- #
                # NOTE(review): msg is reset per block, so only the last block's
                # summary reaches the debug log below — confirm if intended
                msg='\n'
                while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
                    file = files[fileCount]
                    if self.useParent==1:
                        parent = self.parentFiles[file]
                        common.logger.log(10-1, "File "+str(file)+" has the following parents: "+str(parent))
                    if newFile :
                        try:
                            numEventsInFile = self.eventsbyfile[file]
                            common.logger.log(10-1, "File "+str(file)+" has "+str(numEventsInFile)+" events")
                            # increase filesEventCount
                            filesEventCount += numEventsInFile
                            # Add file to current job
                            parString += file + ','
                            if self.useParent==1:
                                for f in parent :
                                    pString += f + ','
                            newFile = 0
                        except KeyError:
                            common.logger.info("File "+str(file)+" has unknown number of events: skipping")

                    eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
                    # if less events in file remain than eventsPerJobRequested
                    if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                        if noBboundary == 1: ## DD
                            newFile = 1
                            fileCount += 1
                        else:
                            # if last file in block
                            if ( fileCount == numFilesInBlock-1 ) :
                                # end job using last file, use remaining events in block
                                # close job and touch new file
                                fullString = parString[:-1]
                                if self.useParent==1:
                                    fullParentString = pString[:-1]
                                    list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
                                else:
                                    list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
                                msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
                                jobDestination.append(blockSites[block])
                                msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                                # fill jobs of block dictionary
                                jobsOfBlock[block].append(jobCount+1)
                                # reset counter
                                jobCount = jobCount + 1
                                totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                                eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                                jobSkipEventCount = 0
                                # reset file
                                pString = ""
                                parString = ""
                                filesEventCount = 0
                                newFile = 1
                                fileCount += 1
                            else :
                                # go to next file
                                newFile = 1
                                fileCount += 1
                    # if events in file equal to eventsPerJobRequested
                    elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                        # close job and touch new file
                        fullString = parString[:-1]
                        if self.useParent==1:
                            fullParentString = pString[:-1]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                        jobDestination.append(blockSites[block])
                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        jobSkipEventCount = 0
                        # reset file
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1

                    # if more events in file remain than eventsPerJobRequested
                    else :
                        # close job but don't touch new file
                        fullString = parString[:-1]
                        if self.useParent==1:
                            fullParentString = pString[:-1]
                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                        else:
                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                        msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                        jobDestination.append(blockSites[block])
                        msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        jobsOfBlock[block].append(jobCount+1)
                        # increase counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + eventsPerJobRequested
                        eventsRemaining = eventsRemaining - eventsPerJobRequested
                        # calculate skip events for last file
                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
                        # remove all but the last file
                        filesEventCount = self.eventsbyfile[file]
                        pString_tmp=''
                        if self.useParent==1:
                            for f in parent : pString_tmp += f + ','
                        pString = pString_tmp
                        parString = file + ','
                    pass # END if
                pass # END while (iterate over files in the block)
            pass # END while (iterate over blocks in the dataset)
        common.logger.debug(msg)
        self.ncjobs = self.total_number_of_jobs = jobCount
        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
            common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
        common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

        # skip check on block with no sites DD
        if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)

        # prepare dict output
        dictOut = {}
        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
        if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs']=self.total_number_of_jobs

        return dictOut
419
420 # keep trace of block with no sites to print a warning at the end
421
    def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
        """
        Report which blocks have no allowed site after black/white list
        filtering, persist the usable blocks to fileBlocks_FileName, and
        raise CrabException when NO block at all has an allowed site.

        blocks:      list of block names considered by the splitter
        jobsOfBlock: dict block -> list of job numbers created for it
        blockSites:  dict block -> list of SE locations
        """
        # screen output
        screenOutput = "List of jobs and available destination sites:\n\n"
        noSiteBlock = []
        bloskNoSite = []
        allBlock = []

        blockCounter = 0
        saveFblocks =''
        for block in blocks:
            # only blocks that actually produced jobs are reported
            if block in jobsOfBlock.keys() :
                blockCounter += 1
                allBlock.append( blockCounter )
                # re-apply black then white list to this block's locations
                sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
                screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                    ', '.join(SE2CMS(sites)))
                if len(sites) == 0:
                    noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
                    bloskNoSite.append( blockCounter )
                else:
                    saveFblocks += str(block)+'\n'
        # record blocks that do have at least one allowed site
        writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)

        common.logger.info(screenOutput)
        if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
            msg = 'WARNING: No sites are hosting any part of data for block:\n '
            virgola = ""
            if len(bloskNoSite) > 1:
                virgola = ","
            for block in bloskNoSite:
                msg += ' ' + str(block) + virgola
            msg += '\n\t\tRelated jobs:\n '
            virgola = ""
            if len(noSiteBlock) > 1:
                virgola = ","
            for range_jobs in noSiteBlock:
                msg += str(range_jobs) + virgola
            msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
            if self.cfg_params.has_key('GRID.se_white_list'):
                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += '\tPlease check if the dataset is available at this site!)'
            if self.cfg_params.has_key('GRID.ce_white_list'):
                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += '\tPlease check if the dataset is available at this site!)\n'

            common.logger.info(msg)

        # every block lost its sites: the task cannot be created at all
        if bloskNoSite == allBlock:
            msg = 'Requested jobs cannot be Created! \n'
            if self.cfg_params.has_key('GRID.se_white_list'):
                msg += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += '\tPlease check if the dataset is available at this site!)'
            if self.cfg_params.has_key('GRID.ce_white_list'):
                msg += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
                msg += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
                msg += '\tPlease check if the dataset is available at this site!)\n'
            raise CrabException(msg)

        return
484
485
486 ########################################################################
    def jobSplittingByRun(self):
        """
        Split the task so that each job processes the files of one run,
        using the WMCore 'RunBased' splitting algorithm.

        Returns a dict with 'params', 'args', 'jobDestination' and 'njobs'.
        """

        self.checkUserSettings()
        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        if self.selectNumberOfJobs == 0 :
            # no explicit limit: effectively unlimited
            self.theNumberOfJobs = 9999999
        blocks = {}
        runList = []
        thefiles = Fileset(name='FilesToSplit')
        fileList = pubdata.getListFiles()
        for f in fileList:
            block = f['Block']['Name']
            try:
                f['Block']['StorageElementList'].extend(blockSites[block])
            except:
                # block without known locations: skip its files entirely
                continue
            wmbsFile = File(f['LogicalFileName'])
            if not blockSites[block]:
                msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
                common.logger.debug(msg)
            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
            wmbsFile['block'] = block
            runNum = f['RunsList'][0]['RunNumber']
            runList.append(runNum)
            myRun = Run(runNumber=runNum)
            wmbsFile.addRun( myRun )
            thefiles.addFile(
                wmbsFile
                )

        work = Workflow()
        subs = Subscription(
            fileset = thefiles,
            workflow = work,
            split_algo = 'RunBased',
            type = "Processing")
        splitter = SplitterFactory()
        jobfactory = splitter(subs)

        #loop over all runs
        list_of_lists = []
        jobDestination = []
        list_of_blocks = []
        count = 0
        for jobGroup in jobfactory():
            if count < self.theNumberOfJobs:
                res = self.getJobInfo(jobGroup)
                parString = ''
                for file in res['lfns']:
                    parString += file + ','
                list_of_blocks.append(res['block'])
                fullString = parString[:-1]
                # NOTE(review): blockString joins ALL blocks collected so far,
                # so later jobs carry earlier jobs' blocks too — confirm intent
                blockString=','.join(list_of_blocks)
                list_of_lists.append([fullString,str(-1),str(0),blockString])
                #need to check single file location
                jobDestination.append(res['locations'])
                count +=1
        # prepare dict output
        dictOut = {}
        dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs']=count
        self.cacheBlocks(list_of_blocks,jobDestination)

        return dictOut
558
559 def getJobInfo( self,jobGroup ):
560 res = {}
561 lfns = []
562 locations = []
563 tmp_check=0
564 for job in jobGroup.jobs:
565 for file in job.getFiles():
566 lfns.append(file['lfn'])
567 for loc in file['locations']:
568 if tmp_check < 1 :
569 locations.append(loc)
570 res['block']= file['block']
571 tmp_check = tmp_check + 1
572 res['lfns'] = lfns
573 res['locations'] = locations
574 return res
575
576 ########################################################################
577 def prepareSplittingNoInput(self):
578 """
579 """
580 if (self.selectEventsPerJob):
581 common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
582 if (self.selectNumberOfJobs):
583 common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
584 if (self.selectTotalNumberEvents):
585 common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')
586
587 if (self.total_number_of_events < 0):
588 msg='Cannot split jobs per Events with "-1" as total number of events'
589 raise CrabException(msg)
590
591 if (self.selectEventsPerJob):
592 if (self.selectTotalNumberEvents):
593 self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
594 elif(self.selectNumberOfJobs) :
595 self.total_number_of_jobs =self.theNumberOfJobs
596 self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)
597
598 elif (self.selectNumberOfJobs) :
599 self.total_number_of_jobs = self.theNumberOfJobs
600 self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
601
602
603 def jobSplittingNoInput(self):
604 """
605 Perform job splitting based on number of event per job
606 """
607 common.logger.debug('Splitting per events')
608 self.checkUserSettings()
609 jobDestination=[]
610 if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
611 msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
612 raise CrabException(msg)
613
614 managedGenerators =self.args['managedGenerators']
615 generator = self.args['generator']
616 firstLumi = int(self.cfg_params.get('CMSSW.first_lumi', 1))
617
618 self.prepareSplittingNoInput()
619
620 common.logger.debug('N jobs '+str(self.total_number_of_jobs))
621
622 # is there any remainder?
623 check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
624
625 common.logger.debug('Check '+str(check))
626
627 common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
628 if check > 0:
629 common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
630
631 # argument is seed number.$i
632 self.list_of_args = []
633 for i in range(self.total_number_of_jobs):
634 ## Since there is no input, any site is good
635 jobDestination.append([""]) # must be empty to correctly write the XML
636 args=[]
637 if (firstLumi): # Pythia first lumi
638 args.append(str(int(firstLumi)+i))
639 if (generator in managedGenerators):
640 args.append(generator)
641 if (generator == 'comphep' and i == 0):
642 # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
643 args.append('1')
644 else:
645 args.append(str(i*self.eventsPerJob))
646 args.append(str(self.eventsPerJob))
647 self.list_of_args.append(args)
648 # prepare dict output
649
650 dictOut = {}
651 dictOut['params'] = ['MaxEvents']
652 if (firstLumi):
653 dictOut['params'] = ['FirstLumi','MaxEvents']
654 if (generator in managedGenerators):
655 dictOut['params'] = ['FirstLumi', 'Generator', 'FirstEvent', 'MaxEvents']
656 else:
657 if (generator in managedGenerators) :
658 dictOut['params'] = ['Generator', 'FirstEvent', 'MaxEvents']
659 dictOut['args'] = self.list_of_args
660 dictOut['jobDestination'] = jobDestination
661 dictOut['njobs']=self.total_number_of_jobs
662
663 return dictOut
664
665
666 def jobSplittingForScript(self):
667 """
668 Perform job splitting based on number of job
669 """
670 self.checkUserSettings()
671 if (self.selectNumberOfJobs == 0):
672 msg = 'must specify number_of_jobs.'
673 raise crabexception(msg)
674 jobDestination = []
675 common.logger.debug('Splitting per job')
676 common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
677
678 # self.total_number_of_jobs = self.theNumberOfJobs
679
680 self.prepareSplittingNoInput()
681
682 common.logger.debug('N jobs '+str(self.total_number_of_jobs))
683
684 common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')
685
686 # argument is seed number.$i
687 self.list_of_args = []
688 for i in range(self.total_number_of_jobs):
689 args=[]
690 jobDestination.append([""])
691 if self.eventsPerJob != 0 :
692 args.append(str(self.eventsPerJob))
693 self.list_of_args.append(args)
694
695 # prepare dict output
696 dictOut = {}
697 dictOut['params'] = ['MaxEvents']
698 dictOut['args'] = self.list_of_args
699 dictOut['jobDestination'] = jobDestination
700 dictOut['njobs']=self.total_number_of_jobs
701 return dictOut
702
703
    def jobSplittingByLumi(self):
        """
        Split task into jobs by Lumi section paying attention to which
        lumis should be run (according to the analysis dataset).
        This uses WMBS job splitting which does not split files over jobs
        so the job will have AT LEAST as many lumis as requested, perhaps
        more
        """
        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
        common.logger.debug('Splitting by Lumi')
        self.checkLumiSettings()

        blockSites = self.args['blockSites']
        pubdata = self.args['pubdata']

        lumisPerFile = pubdata.getLumis()
        self.parentFiles=pubdata.getParent()
        # Make the list of WMBS files for job splitter
        fileList = pubdata.getListFiles()
        wmFileList = []
        for jobFile in fileList:
            block = jobFile['Block']['Name']
            try:
                jobFile['Block']['StorageElementList'].extend(blockSites[block])
            except:
                # block with no known locations: skip its files
                continue
            wmbsFile = File(jobFile['LogicalFileName'])
            if not blockSites[block]:
                msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
                msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
                common.logger.debug(msg)
                # wmbsFile['locations'].add('Nowhere')
            [ wmbsFile['locations'].add(x) for x in blockSites[block] ]
            wmbsFile['block'] = block
            # register every (run, lumi) pair the file contains
            for lumi in lumisPerFile[jobFile['LogicalFileName']]:
                wmbsFile.addRun(Run(lumi[0], lumi[1]))
            wmFileList.append(wmbsFile)

        fileSet = set(wmFileList)
        thefiles = Fileset(name='FilesToSplit', files = fileSet)

        # Create the factory and workflow
        work = Workflow()
        subs = Subscription(fileset = thefiles, workflow = work,
                            split_algo = 'LumiBased', type = "Processing")
        splitter = SplitterFactory()
        jobFactory = splitter(subs)

        list_of_lists = []
        jobDestination = []
        jobCount = 0
        lumisCreated = 0
        list_of_blocks = []
        if not self.limitJobLumis:
            # lumis_per_job not given: derive it from the job count
            if self.totalNLumis > 0:
                self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs,1)
            else:
                self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
            common.logger.info('Each job will process about %s lumis.' %
                                self.lumisPerJob)

        for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
            for job in jobGroup.jobs:
                if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
                    common.logger.info('Requested number of jobs reached.')
                    break
                if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
                    common.logger.info('Requested number of lumis reached.')
                    break
                lumis = []
                lfns = []
                if self.useParent==1:
                    parentlfns = []
                    pString =""

                locations = []
                blocks = []
                firstFile = True
                # Collect information from all the files
                for jobFile in job.getFiles():
                    doFile = False
                    if firstFile: # Get locations from first file in the job
                        for loc in jobFile['locations']:
                            locations.append(loc)
                        blocks.append(jobFile['block'])
                        firstFile = False
                    # Accumulate Lumis from all files
                    for lumiList in jobFile['runs']:
                        theRun = lumiList.run
                        for theLumi in list(lumiList):
                            if (not self.limitTotalLumis) or \
                               (lumisCreated < self.totalNLumis):
                                doFile = True
                                lumisCreated += 1
                                lumis.append( (theRun, theLumi) )
                    # only files that contributed at least one lumi are kept
                    if doFile:
                        lfns.append(jobFile['lfn'])
                        if self.useParent==1:
                            parent = self.parentFiles[jobFile['lfn']]
                            for p in parent :
                                pString += p + ','
                fileString = ','.join(lfns)
                lumiLister = LumiList(lumis = lumis)
                lumiString = lumiLister.getCMSSWString()
                blockString=','.join(blocks)
                if self.useParent==1:
                    common.logger.debug("Files: "+fileString+" with the following parents: "+pString[:-1])
                    pfileString = pString[:-1]
                    list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString,blockString])
                else:
                    list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
                list_of_blocks.append(blocks)
                jobDestination.append(locations)
                jobCount += 1
                common.logger.debug('Job %s will run on %s files and %s lumis '
                    % (jobCount, len(lfns), len(lumis) ))

        common.logger.info('%s jobs created to run on %s lumis' %
                          (jobCount, lumisCreated))

        # Prepare dict output matching back to non-WMBS job creation
        dictOut = {}
        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
        if self.useParent==1:
            dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
        dictOut['args'] = list_of_lists
        dictOut['jobDestination'] = jobDestination
        dictOut['njobs'] = jobCount
        self.cacheBlocks(list_of_blocks,jobDestination)

        return dictOut
835
836 def cacheBlocks(self, blocks,destinations):
837
838 saveFblocks=''
839 for i in range(len(blocks)):
840 sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(destinations[i]))
841 if len(sites) != 0:
842 for block in blocks[i]:
843 saveFblocks += str(block)+'\n'
844 writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)
845
846 def Algos(self):
847 """
848 Define key splittingType matrix
849 """
850 SplitAlogs = {
851 'EventBased' : self.jobSplittingByEvent,
852 'RunBased' : self.jobSplittingByRun,
853 'LumiBased' : self.jobSplittingByLumi,
854 'NoInput' : self.jobSplittingNoInput,
855 'ForScript' : self.jobSplittingForScript
856 }
857 return SplitAlogs
858