ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.59
Committed: Sat May 4 10:37:07 2013 UTC (11 years, 11 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_7_patch2
Changes since 1.58: +8 -4 lines
Log Message:
final (hope) fix for https://savannah.cern.ch/bugs/index.php?101186

File Contents

# User Rev Content
1 ewv 1.27
2 belforte 1.59 __revision__ = "$Id: Splitter.py,v 1.58 2013/05/03 19:11:04 belforte Exp $"
3     __version__ = "$Revision: 1.58 $"
4 ewv 1.27
5 spiga 1.1 import common
6     from crab_exceptions import *
7     from crab_util import *
8 ewv 1.26
9     from WMCore.DataStructs.File import File
10     from WMCore.DataStructs.Fileset import Fileset
11     from WMCore.DataStructs.Run import Run
12     from WMCore.DataStructs.Subscription import Subscription
13     from WMCore.DataStructs.Workflow import Workflow
14     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 ewv 1.43 try: # Can remove when CMSSW 3.7 and earlier are dropped
17     from FWCore.PythonUtilities.LumiList import LumiList
18     except ImportError:
19     from LumiList import LumiList
20 spiga 1.1
21     class JobSplitter:
def __init__( self, cfg_params, args ):
    """
    Store the configuration and set up the SE black/white list parser.

    cfg_params -- mapping of CRAB configuration keys to values
    args       -- dictionary of inputs read later by the splitting
                  methods (e.g. 'blockSites', 'pubdata')
    """
    self.cfg_params = cfg_params
    self.args=args

    # Defaults for lumi based splitting, overridden by checkLumiSettings()
    self.lumisPerJob = -1
    self.totalNLumis = 0
    self.theNumberOfJobs = 0
    self.limitNJobs = False
    self.limitTotalLumis = False
    self.limitJobLumis = False

    #self.maxEvents
    # init BlackWhiteListParser
    # The lists may come from the configuration as comma separated strings
    self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
    if isinstance(self.seWhiteList, str):
        self.seWhiteList = self.seWhiteList.split(',')
    seBlackList = cfg_params.get('GRID.se_black_list',[])
    if isinstance(seBlackList, str):
        seBlackList = seBlackList.split(',')
    if common.scheduler.name().upper() == 'REMOTEGLIDEIN' :
        # use central black list
        removeBList = cfg_params.get("GRID.remove_default_blacklist", 0 )
        blackAnaOps = None
        if int(removeBList) == 0:
            blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
            result = blacklist.config("site_black_list.conf")
            # Check for a missing result BEFORE calling string methods on it
            if result is not None:
                blackAnaOps = result.strip().split(',')
            common.logger.debug("Enforced black list: %s "%blackAnaOps)
        else:
            common.logger.info("WARNING: Skipping default black list!")
        if int(removeBList) == 0 and blackAnaOps:
            # Build a new list so a list owned by cfg_params is not modified
            seBlackList = seBlackList + blackAnaOps

    if seBlackList:
        common.logger.info("SE black list applied to data location: %s" %\
                           seBlackList)
    if self.seWhiteList:
        common.logger.info("SE white list applied to data location: %s" %\
                           self.seWhiteList)
    self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())

    ## check if has been asked for a non default file to store/read analyzed fileBlocks
    defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
    self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
67    
68 spiga 1.1
def checkUserSettings(self):
    """
    Read the event based splitting parameters from the configuration.

    For each of CMSSW.events_per_job, CMSSW.number_of_jobs and
    CMSSW.total_number_of_events sets the integer value and a 0/1
    "select" flag telling whether the user provided it:
      self.eventsPerJob / self.selectEventsPerJob (value -1 if absent)
      self.theNumberOfJobs / self.selectNumberOfJobs (value 0 if absent)
      self.total_number_of_events / self.selectTotalNumberEvents
      (value 0 if absent)

    Raises CrabException when both a total number of events (other than
    -1) and a number of jobs are given and there are fewer events than
    jobs.
    """
    cfg = self.cfg_params

    ## Events per job
    if 'CMSSW.events_per_job' in cfg:
        self.eventsPerJob = int(cfg['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in cfg:
        self.theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events
    if 'CMSSW.total_number_of_events' in cfg:
        self.total_number_of_events = int(cfg['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
        if self.selectNumberOfJobs == 1:
            # -1 means "all events", so it is exempt from the check
            if (self.total_number_of_events != -1) and self.total_number_of_events < self.theNumberOfJobs:
                msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                raise CrabException(msg)
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    return
98 spiga 1.1
def checkLumiSettings(self):
    """
    Check to make sure the user has specified enough information to
    perform splitting by Lumis to run the job.

    Reads CMSSW.lumis_per_job, CMSSW.number_of_jobs and
    CMSSW.total_number_of_lumis and sets self.lumisPerJob,
    self.theNumberOfJobs, self.totalNLumis and the matching
    self.limitJobLumis / self.limitNJobs / self.limitTotalLumis flags.

    Raises CrabException unless exactly two of the three parameters
    are present.
    """
    cfg = self.cfg_params
    settings = 0

    if 'CMSSW.lumis_per_job' in cfg:
        self.lumisPerJob = int(cfg['CMSSW.lumis_per_job'])
        self.limitJobLumis = True
        settings += 1

    if 'CMSSW.number_of_jobs' in cfg:
        self.theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])
        self.limitNJobs = True
        settings += 1

    if 'CMSSW.total_number_of_lumis' in cfg:
        self.totalNLumis = int(cfg['CMSSW.total_number_of_lumis'])
        # -1 means no limit on the total
        self.limitTotalLumis = (self.totalNLumis != -1)
        settings += 1

    if settings != 2:
        msg = 'When splitting by lumi section you must specify two and only two of:\n'
        msg += ' number_of_jobs, lumis_per_job, total_number_of_lumis'
        raise CrabException(msg)
    if self.limitNJobs and self.limitJobLumis:
        # Both per-job size and job count given: the total follows
        self.limitTotalLumis = True
        self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs

    # Has the user specified runselection?
    if 'CMSSW.runselection' in cfg:
        common.logger.info('You have specified runselection and split by lumi.')
        common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
    return
133 ewv 1.27
def ComputeSubBlockSites( self, blockSites ):
    """
    Return the subset of blockSites accepted by the SE white list.

    blockSites -- dictionary with blocks as keys and the list of
                  hosting sites as values

    A block is kept, with its full original site list, when
    self.blackWhiteListParser.checkWhiteList() returns a non-empty
    result for its sites.  Raises CrabException if no block is kept.
    """
    sub_blockSites = {}
    for block, sites in blockSites.items():
        if self.blackWhiteListParser.checkWhiteList(sites):
            sub_blockSites[block] = sites
    if not sub_blockSites:
        msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
        raise CrabException(msg)
    return sub_blockSites
145    
146 spiga 1.1 ########################################################################
def jobSplittingByEvent( self ):
    """
    Perform job splitting by number of events.

    Files are walked block by block and assigned to jobs until the
    requested number of events is reached.  A job may start part way
    through a file (SkipEvents).  Unless CMSSW.no_block_boundary is 1,
    a job is closed at the end of each block so that no job spans
    more than one block.

    READS:  self.args['blockSites'] - dictionary with blocks as keys
                                      and list of host sites as values
            self.args['pubdata']    - provides files, events per
                                      block/file, parents, max events
            CMSSW.use_parent, CMSSW.no_block_boundary and the values
            set by checkUserSettings()
    SETS:   self.eventsbyblock, self.eventsbyfile, self.parentFiles,
            self.maxEvents, self.useParent,
            self.ncjobs and self.total_number_of_jobs - Total # of jobs
    RETURNS: dictionary with keys
            'params'         - names of the per-job arguments
            'args'           - per-job argument lists (a list of lists)
            'jobDestination' - Site destination(s) for each job
                               (a list of lists)
            'njobs'          - number of jobs created
    RAISES: CrabException on inconsistent user settings
    """

    jobDestination=[]
    self.checkUserSettings()
    if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']
    filesbyblock=pubdata.getFiles()

    self.eventsbyblock=pubdata.getEventsPerBlock()
    self.eventsbyfile=pubdata.getEventsPerFile()
    self.parentFiles=pubdata.getParent()

    ## get max number of events
    self.maxEvents=pubdata.getMaxEvents()

    self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
    noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))

    if noBboundary == 1:
        if self.total_number_of_events== -1:
            msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
            msg +='\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
            raise CrabException(msg)
        # self.seWhiteList is a list (it is split in __init__); a plain
        # comma separated string is still accepted here
        whiteList = self.seWhiteList
        if isinstance(whiteList, str):
            whiteList = whiteList.split(',')
        whiteList = [se for se in whiteList if se]
        if len(whiteList) != 1:
            msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
            msg += "\tPlease set se_white_list with the site's storage element name."
            raise CrabException(msg)
        blockSites = self.ComputeSubBlockSites(blockSites)

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # Report the effective request, which may have been derived from
        # number_of_jobs * events_per_job rather than given directly
        common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        # Floor division keeps the Python 2 integer semantics
        eventsPerJobRequested = int(eventsRemaining // self.theNumberOfJobs)

    if (self.selectNumberOfJobs):
        common.logger.info("May not create the exact number_of_jobs requested.")

    if (self.theNumberOfJobs < 0):
        common.logger.info("ERROR: Negative number_of_jobs requested. Will result in no jobs.")

    # old... to remove Daniele
    totalNumberOfJobs = 999999999

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    # State of the job currently being filled.  It is initialised here
    # so it is always defined, and it is only reset per block when jobs
    # must respect block boundaries: a job spanning blocks has to keep
    # its file list and skip counter across the boundary.
    parString = ""
    pString = ""
    filesEventCount = 0
    newFile = 1
    jobSkipEventCount = 0
    msg=''

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        if block not in self.eventsbyblock:
            continue
        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug('Events in Block File '+str(numEventsInBlock))

        files = filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0
        if noBboundary == 0: # DD
            # ---- New block => New job ---- #
            parString = ""
            pString=""
            # counter for number of events in files currently worked on
            filesEventCount = 0
            # flag if next while loop should touch new file
            newFile = 1
            # job event counter
            jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block ---- #
        msg='\n'
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            lfn = files[fileCount]
            if self.useParent==1:
                parent = self.parentFiles[lfn]
                common.logger.log(10-1, "File "+str(lfn)+" has the following parents: "+str(parent))
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[lfn]
                    common.logger.log(10-1, "File "+str(lfn)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += lfn + ','
                    if self.useParent==1:
                        for parentLfn in parent :
                            pString += parentLfn + ','
                    newFile = 0
                except KeyError:
                    common.logger.info("File "+str(lfn)+" has unknown number of events: skipping")

            eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
            # if less events in file remain than eventsPerJobRequested
            if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                if noBboundary == 1: ## DD
                    newFile = 1
                    fileCount += 1
                else:
                    # if last file in block
                    if ( fileCount == numFilesInBlock-1 ) :
                        # end job using last file, use remaining events in block
                        # close job and touch new file
                        fullString = parString[:-1]
                        if self.useParent==1:
                            fullParentString = pString[:-1]
                            list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
                        else:
                            list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
                        msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
                        jobDestination.append(blockSites[block])
                        msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                        # fill jobs of block dictionary
                        jobsOfBlock[block].append(jobCount+1)
                        # reset counter
                        jobCount = jobCount + 1
                        totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                        eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                        jobSkipEventCount = 0
                        # reset file
                        pString = ""
                        parString = ""
                        filesEventCount = 0
                        newFile = 1
                        fileCount += 1
                    else :
                        # go to next file
                        newFile = 1
                        fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                # close job and touch new file
                fullString = parString[:-1]
                if self.useParent==1:
                    fullParentString = pString[:-1]
                    list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                else:
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                jobDestination.append(blockSites[block])
                msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                jobsOfBlock[block].append(jobCount+1)
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                pString = ""
                parString = ""
                filesEventCount = 0
                newFile = 1
                fileCount += 1

            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                fullString = parString[:-1]
                if self.useParent==1:
                    fullParentString = pString[:-1]
                    list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                else:
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                jobDestination.append(blockSites[block])
                msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                jobsOfBlock[block].append(jobCount+1)
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[lfn])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[lfn]
                pString_tmp=''
                if self.useParent==1:
                    for parentLfn in parent :
                        pString_tmp += parentLfn + ','
                pString = pString_tmp
                parString = lfn + ','

    common.logger.debug(msg)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # skip check on block with no sites DD
    if noBboundary == 0 :
        self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)

    # prepare dict output
    dictOut = {}
    dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    if self.useParent:
        dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=self.total_number_of_jobs

    return dictOut
406    
407     # keep trace of block with no sites to print a warning at the end
408    
def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
    """
    Report which blocks, and so which jobs, have no usable site.

    blocks      -- list of block names
    jobsOfBlock -- dictionary block -> list of job numbers
    blockSites  -- dictionary block -> list of hosting sites

    Logs the list of jobs and destination sites per block, writes the
    names of the blocks that still have at least one site after
    black/white list filtering to self.fileBlocks_FileName, and logs a
    warning for the blocks left without sites.

    Raises CrabException when every block listed in jobsOfBlock is
    left without sites.
    """

    def whiteListHints():
        # Reminder of the configured white lists, appended to both the
        # warning and the fatal message
        hints = ''
        if 'GRID.se_white_list' in self.cfg_params:
            hints += '\tWARNING: SE White List: '+str(self.cfg_params['GRID.se_white_list'])+'\n'
            hints += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            hints += '\tPlease check if the dataset is available at this site!)\n'
        if 'GRID.ce_white_list' in self.cfg_params:
            hints += '\tWARNING: CE White List: '+str(self.cfg_params['GRID.ce_white_list'])+'\n'
            hints += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            hints += '\tPlease check if the dataset is available at this site!)\n'
        return hints

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"
    noSiteBlock = []    # job ranges of the blocks without sites
    bloskNoSite = []    # counters of the blocks without sites
    allBlock = []       # counters of all the blocks examined

    blockCounter = 0
    saveFblocks =''
    for block in blocks:
        if block not in jobsOfBlock:
            continue
        blockCounter += 1
        allBlock.append( blockCounter )
        sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
        screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
            ', '.join(SE2CMS(sites)))
        if len(sites) == 0:
            noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
            bloskNoSite.append( blockCounter )
        else:
            saveFblocks += str(block)+'\n'
    # Written once, after all the blocks have been examined
    writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)

    common.logger.info(screenOutput)
    if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        msg += ','.join([' ' + str(counter) for counter in bloskNoSite])
        msg += '\n\t\tRelated jobs:\n '
        msg += ','.join([str(range_jobs) for range_jobs in noSiteBlock])
        msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
        msg += whiteListHints()

        common.logger.info(msg)

    if bloskNoSite == allBlock:
        msg = 'Requested jobs cannot be Created! \n'
        msg += whiteListHints()
        raise CrabException(msg)

    return
471    
472    
473     ########################################################################
def jobSplittingByRun(self):
    """
    Split the task into jobs using the WMCore 'RunBased' algorithm.

    Builds a Fileset from the files returned by
    self.args['pubdata'].getListFiles(), attaching to each file its
    block, the sites in self.args['blockSites'] and its first run
    number, then creates one job per job group returned by the
    splitter, up to the requested number_of_jobs.

    RETURNS: dictionary with keys 'params', 'args', 'jobDestination'
             and 'njobs'.
    """

    self.checkUserSettings()
    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    if self.selectNumberOfJobs == 0 :
        # no limit requested by the user
        self.theNumberOfJobs = 9999999
    thefiles = Fileset(name='FilesToSplit')
    fileList = pubdata.getListFiles()
    for f in fileList:
        block = f['Block']['Name']
        try:
            f['Block']['StorageElementList'].extend(blockSites[block])
        except (KeyError, AttributeError, TypeError):
            # file whose block has no usable site information: skip it
            continue
        wmbsFile = File(f['LogicalFileName'])
        if not blockSites[block]:
            msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
            msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
            common.logger.debug(msg)
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        # only the first run of the file is registered
        runNum = f['RunsList'][0]['RunNumber']
        wmbsFile.addRun( Run(runNumber=runNum) )
        thefiles.addFile( wmbsFile )

    work = Workflow()
    subs = Subscription(
        fileset = thefiles,
        workflow = work,
        split_algo = 'RunBased',
        type = "Processing")
    splitter = SplitterFactory()
    jobfactory = splitter(subs)

    #loop over all runs
    list_of_lists = []
    jobDestination = []
    list_of_blocks = []
    count = 0
    for jobGroup in jobfactory():
        if count >= self.theNumberOfJobs:
            break
        res = self.getJobInfo(jobGroup)
        fullString = ','.join(res['lfns'])
        list_of_blocks.append(res['block'])
        # NOTE(review): this joins the blocks of ALL the jobs created so
        # far, not just this job's block - confirm this is what the
        # consumers of InputBlocks expect.
        blockString=','.join(list_of_blocks)
        list_of_lists.append([fullString,str(-1),str(0),blockString])
        #need to check single file location
        jobDestination.append(res['locations'])
        count +=1
    # prepare dict output
    dictOut = {}
    dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=count
    self.cacheBlocks(list_of_blocks,jobDestination)

    return dictOut
545    
def getJobInfo( self,jobGroup ):
    """
    Collect the input files, locations and block of a job group.

    jobGroup -- object with a 'jobs' attribute; each job provides
                getFiles() returning items with 'lfn', 'locations'
                and 'block' keys

    RETURNS: dictionary with
             'lfns'      - lfn of every file of every job in the group
             'locations' - locations of the first file only
             'block'     - block of the first file; the key is absent
                           when the group contains no files
    """
    res = {}
    lfns = []
    locations = []
    firstFile = True
    for job in jobGroup.jobs:
        for jobFile in job.getFiles():
            lfns.append(jobFile['lfn'])
            if firstFile:
                # Record the block even when the file has no location,
                # so the caller always finds the 'block' key
                locations.extend(jobFile['locations'])
                res['block'] = jobFile['block']
                firstFile = False
    res['lfns'] = lfns
    res['locations'] = locations
    return res
562    
563 spiga 1.1 ########################################################################
def prepareSplittingNoInput(self):
    """
    Derive the missing splitting quantity for tasks without input data.

    Uses the values and select flags set by checkUserSettings() and
    sets self.total_number_of_jobs plus, depending on what the user
    provided, self.total_number_of_events or self.eventsPerJob.

    Raises CrabException when the total number of events is negative
    or when the quantity used as divisor is zero.
    """
    if (self.selectEventsPerJob):
        common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
    if (self.selectNumberOfJobs):
        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if (self.selectTotalNumberEvents):
        common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            if self.eventsPerJob == 0:
                raise CrabException('events_per_job must be different from zero.')
            # floor division: a possible remainder of events is dropped
            self.total_number_of_jobs = int(self.total_number_of_events // self.eventsPerJob)
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs =self.theNumberOfJobs
            self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)

    elif (self.selectNumberOfJobs) :
        self.total_number_of_jobs = self.theNumberOfJobs
        if self.total_number_of_jobs == 0:
            raise CrabException('number_of_jobs must be different from zero.')
        self.eventsPerJob = int(self.total_number_of_events // self.total_number_of_jobs)
588    
589 spiga 1.23
def jobSplittingNoInput(self):
    """
    Perform job splitting based on number of event per job, for tasks
    that have no input dataset.

    Reads self.args['managedGenerators'], self.args['generator'] and
    CMSSW.first_lumi (default 1).  Sets self.list_of_args.

    RETURNS: dictionary with keys 'params', 'args', 'jobDestination'
             and 'njobs'.
    RAISES:  CrabException unless exactly two of
             total_number_of_events, events_per_job and number_of_jobs
             are configured.
    """
    common.logger.debug('Splitting per events')
    self.checkUserSettings()
    nSelected = self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs
    if nSelected != 2:
        raise CrabException('Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.')

    managedGenerators = self.args['managedGenerators']
    generator = self.args['generator']
    firstLumi = int(self.cfg_params.get('CMSSW.first_lumi', 1))

    self.prepareSplittingNoInput()

    nJobs = self.total_number_of_jobs
    common.logger.debug('N jobs '+str(nJobs))

    # is there any remainder?
    achievable = int(nJobs)*self.eventsPerJob
    check = int(self.total_number_of_events) - achievable

    common.logger.debug('Check '+str(check))

    common.logger.info(str(nJobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(nJobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(achievable))

    isManaged = generator in managedGenerators

    # argument is seed number.$i
    self.list_of_args = []
    jobDestination = []
    for i in range(nJobs):
        jobArgs = []
        if firstLumi: # Pythia first lumi
            jobArgs.append(str(int(firstLumi)+i))
        if isManaged:
            jobArgs.append(generator)
            if generator == 'comphep' and i == 0:
                # COMPHEP is brain-dead and wants event #'s like 1,100,200,300
                firstEvent = '1'
            else:
                firstEvent = str(i*self.eventsPerJob)
            jobArgs.append(firstEvent)
        jobArgs.append(str(self.eventsPerJob))
        self.list_of_args.append(jobArgs)
        ## Since there is no input, any site is good
        jobDestination.append([""]) # must be empty to correctly write the XML

    # parameter names, in the same order as the per-job arguments
    params = []
    if firstLumi:
        params.append('FirstLumi')
    if isManaged:
        params.extend(['Generator', 'FirstEvent'])
    params.append('MaxEvents')

    # prepare dict output
    dictOut = {
        'params': params,
        'args': self.list_of_args,
        'jobDestination': jobDestination,
        'njobs': self.total_number_of_jobs,
    }

    return dictOut
651    
652    
def jobSplittingForScript(self):
    """
    Perform job splitting based on number of job.

    Requires CMSSW.number_of_jobs.  Sets self.list_of_args; each job
    gets the events per job as its only argument, or no argument when
    that value is zero.

    RETURNS: dictionary with keys 'params', 'args', 'jobDestination'
             and 'njobs'.
    RAISES:  CrabException when number_of_jobs is not configured.
    """
    self.checkUserSettings()
    if (self.selectNumberOfJobs == 0):
        msg = 'must specify number_of_jobs.'
        raise CrabException(msg)
    jobDestination = []
    common.logger.debug('Splitting per job')
    common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    # self.total_number_of_jobs = self.theNumberOfJobs

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs '+str(self.total_number_of_jobs))

    common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

    # argument is seed number.$i
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        args=[]
        # no input data: any site is good
        jobDestination.append([""])
        if self.eventsPerJob != 0 :
            args.append(str(self.eventsPerJob))
        self.list_of_args.append(args)

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['MaxEvents']
    dictOut['args'] = self.list_of_args
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=self.total_number_of_jobs
    return dictOut
689    
690 ewv 1.22
def jobSplittingByLumi(self):
    """
    Split task into jobs by Lumi section paying attention to which
    lumis should be run (according to the analysis dataset).
    This uses WMBS job splitting which does not split files over jobs
    so the job will have AT LEAST as many lumis as requested, perhaps
    more.

    Inputs are taken from self.args ('blockSites' and 'pubdata') and
    from the limitNJobs / limitTotalLumis / limitJobLumis flags together
    with theNumberOfJobs, totalNLumis and lumisPerJob on self.

    Side effects: sets self.useParent and self.parentFiles, may
    overwrite self.lumisPerJob, and calls self.cacheBlocks() with the
    blocks and destinations of the jobs that were created.

    Returns a dict with keys 'params', 'args', 'jobDestination' and
    'njobs'.
    """
    self.useParent = int(self.cfg_params.get('CMSSW.use_parent', 0))
    common.logger.debug('Splitting by Lumi')
    self.checkLumiSettings()

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    lumisPerFile = pubdata.getLumis()
    self.parentFiles = pubdata.getParent()

    # Make the list of WMBS files for job splitter
    fileList = pubdata.getListFiles()
    wmFileList = []
    for jobFile in fileList:
        block = jobFile['Block']['Name']
        try:
            jobFile['Block']['StorageElementList'].extend(blockSites[block])
        except Exception:
            # Best effort: a file whose block cannot be matched to site
            # information is left out of the splitting.  Unlike a bare
            # "except:", this no longer swallows KeyboardInterrupt or
            # SystemExit.
            continue
        wmbsFile = File(jobFile['LogicalFileName'])
        if not blockSites[block]:
            msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
            msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
            common.logger.debug(msg)
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        for lumi in lumisPerFile[jobFile['LogicalFileName']]:
            wmbsFile.addRun(Run(lumi[0], lumi[1]))
        wmFileList.append(wmbsFile)

    fileSet = set(wmFileList)
    thefiles = Fileset(name='FilesToSplit', files = fileSet)

    # Create the factory and workflow
    work = Workflow()
    subs = Subscription(fileset = thefiles, workflow = work,
                        split_algo = 'LumiBased', type = "Processing")
    splitter = SplitterFactory()
    jobFactory = splitter(subs)

    list_of_lists = []
    jobDestination = []
    jobCount = 0
    lumisCreated = 0
    list_of_blocks = []
    if not self.limitJobLumis:
        # No explicit lumis-per-job: derive it from the number of jobs.
        if self.totalNLumis > 0:
            self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs, 1)
        else:
            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
        common.logger.info('Each job will process about %s lumis.' %
                           self.lumisPerJob)

    for jobGroup in jobFactory(lumis_per_job = self.lumisPerJob):
        for job in jobGroup.jobs:
            # "break" only leaves the loop over this group's jobs; the
            # same check then ends every following group on its first job.
            if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
                common.logger.info('Requested number of jobs reached.')
                break
            if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
                common.logger.info('Requested number of lumis reached.')
                break

            lumis = []
            lfns = []
            parents = []    # parent LFNs; only filled when useParent == 1
            locations = []
            blocks = []
            firstFile = True

            # Collect information from all the files
            for jobFile in job.getFiles():
                doFile = False
                if firstFile:  # Get locations from first file in the job
                    locations.extend(jobFile['locations'])
                    blocks.append(jobFile['block'])
                    firstFile = False
                # Accumulate Lumis from all files
                for lumiList in jobFile['runs']:
                    theRun = lumiList.run
                    for theLumi in list(lumiList):
                        if (not self.limitTotalLumis) or \
                           (lumisCreated < self.totalNLumis):
                            doFile = True
                            lumisCreated += 1
                            lumis.append((theRun, theLumi))
                # A file is listed only if at least one of its lumis was taken.
                if doFile:
                    lfns.append(jobFile['lfn'])
                    if self.useParent == 1:
                        parents.extend(self.parentFiles[jobFile['lfn']])

            fileString = ','.join(lfns)
            lumiLister = LumiList(lumis = lumis)
            lumiString = lumiLister.getCMSSWString()
            blockString = ','.join(blocks)
            if self.useParent == 1:
                pfileString = ','.join(parents)
                common.logger.debug("Files: "+fileString+" with the following parents: "+pfileString)
                list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString, blockString])
            else:
                list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
            list_of_blocks.append(blocks)
            jobDestination.append(locations)
            jobCount += 1
            common.logger.debug('Job %s will run on %s files and %s lumis '
                                % (jobCount, len(lfns), len(lumis)))

    common.logger.info('%s jobs created to run on %s lumis' %
                       (jobCount, lumisCreated))

    # Prepare dict output matching back to non-WMBS job creation
    dictOut = {}
    if self.useParent == 1:
        dictOut['params'] = ['InputFiles','ParentFiles','MaxEvents','SkipEvents','Lumis','InputBlocks']
    else:
        dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = jobCount
    self.cacheBlocks(list_of_blocks, jobDestination)

    return dictOut
822    
def cacheBlocks(self, blocks, destinations):
    """
    Write the names of the blocks whose job destinations pass the
    black/white list filtering to the file self.fileBlocks_FileName.

    blocks[i] is the list of block names of job i and destinations[i]
    the list of sites of that same job.  One block name is written per
    line.
    """
    lines = []
    for idx, jobBlocks in enumerate(blocks):
        notBlackListed = self.blackWhiteListParser.checkBlackList(destinations[idx])
        allowedSites = self.blackWhiteListParser.checkWhiteList(notBlackListed)
        if len(allowedSites) == 0:
            # no usable site for this job: its blocks are not recorded
            continue
        lines.extend([str(name) + '\n' for name in jobBlocks])
    writeTXTfile(self, self.fileBlocks_FileName, ''.join(lines))
832 ewv 1.26
def Algos(self):
    """
    Return the table mapping each splitting type name to the method
    of this object that implements it.
    """
    dispatch = {}
    dispatch['EventBased'] = self.jobSplittingByEvent
    dispatch['RunBased'] = self.jobSplittingByRun
    dispatch['LumiBased'] = self.jobSplittingByLumi
    dispatch['NoInput'] = self.jobSplittingNoInput
    dispatch['ForScript'] = self.jobSplittingForScript
    return dispatch
845