ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Splitter.py
Revision: 1.61
Committed: Thu Sep 5 14:42:35 2013 UTC (11 years, 7 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, HEAD
Changes since 1.60: +17 -4 lines
Log Message:
make sure T1's are blacklisted as needed https://savannah.cern.ch/bugs/index.php?102400

File Contents

# User Rev Content
1 ewv 1.27
2 belforte 1.61 __revision__ = "$Id: Splitter.py,v 1.60 2013/05/22 15:57:41 belforte Exp $"
3     __version__ = "$Revision: 1.60 $"
4 ewv 1.27
5 spiga 1.1 import common
6     from crab_exceptions import *
7     from crab_util import *
8 ewv 1.26
9     from WMCore.DataStructs.File import File
10     from WMCore.DataStructs.Fileset import Fileset
11     from WMCore.DataStructs.Run import Run
12     from WMCore.DataStructs.Subscription import Subscription
13     from WMCore.DataStructs.Workflow import Workflow
14     from WMCore.JobSplitting.SplitterFactory import SplitterFactory
15 spiga 1.1 from WMCore.SiteScreening.BlackWhiteListParser import SEBlackWhiteListParser
16 ewv 1.43 try: # Can remove when CMSSW 3.7 and earlier are dropped
17     from FWCore.PythonUtilities.LumiList import LumiList
18     except ImportError:
19     from LumiList import LumiList
20 spiga 1.1
21     class JobSplitter:
def __init__( self, cfg_params, args ):
    """
    Store the configuration, build the SE black/white list parser and
    filter the data locations found in args['blockSites'] through it.

    cfg_params -- dictionary-like configuration, read through .get()
    args       -- dictionary; args['blockSites'] maps each block to the
                  list of SEs hosting it and is updated with the
                  filtered lists
    """
    self.cfg_params = cfg_params
    self.args = args

    self.lumisPerJob = -1
    self.totalNLumis = 0
    self.theNumberOfJobs = 0
    self.limitNJobs = False
    self.limitTotalLumis = False
    self.limitJobLumis = False

    # init BlackWhiteListParser: comma separated strings become lists
    self.seWhiteList = cfg_params.get('GRID.se_white_list',[])
    if isinstance(self.seWhiteList, str):
        self.seWhiteList = self.seWhiteList.split(',')
    seBlackList = cfg_params.get('GRID.se_black_list',[])
    if isinstance(seBlackList, str):
        seBlackList = seBlackList.split(',')

    if common.scheduler.name().upper() == 'REMOTEGLIDEIN' :
        # use central black list unless the user asked to remove it
        removeBList = cfg_params.get("GRID.remove_default_blacklist", 0 )
        if int(removeBList) == 0:
            blackAnaOps = None
            blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
            result = blacklist.config("site_black_list.conf")
            # the None test has to come before any string method is called
            if result is not None:
                blackAnaOps = result.strip().split(',')
            common.logger.debug("Enforced black list: %s "%blackAnaOps)
            if blackAnaOps:
                # build a new list: += would also modify a list object
                # that is still referenced from cfg_params
                seBlackList = list(seBlackList) + blackAnaOps
        else:
            common.logger.info("WARNING: Skipping default black list!")

    self.blackWhiteListParser = SEBlackWhiteListParser(self.seWhiteList, seBlackList, common.logger())

    if seBlackList != []:
        common.logger.info("SE black list applied to data location: %s" %\
                           seBlackList)
    if self.seWhiteList != []:
        common.logger.info("SE white list applied to data location: %s" %\
                           self.seWhiteList)

    # apply BW list
    blockSites = args['blockSites']
    common.logger.debug("List of blocks and used locations (SE):")
    # iterate over a snapshot because values are replaced inside the loop
    for block, dlsDest in list(blockSites.items()):
        noBsites = self.blackWhiteListParser.checkBlackList(dlsDest)
        sites = self.blackWhiteListParser.checkWhiteList(noBsites)
        # a block whose filtered list is empty keeps its original locations
        if sites:
            blockSites[block] = sites
        common.logger.debug("%s : %s" % (block,sites))
    args['blockSites'] = blockSites

    ## check if has been asked for a non default file to store/read analyzed fileBlocks
    defaultName = common.work_space.shareDir()+'AnalyzedBlocks.txt'
    self.fileBlocks_FileName = os.path.abspath(self.cfg_params.get('CMSSW.fileblocks_file',defaultName))
78    
79 spiga 1.1
def checkUserSettings(self):
    """
    Read the event based splitting parameters from self.cfg_params.

    Sets self.eventsPerJob, self.theNumberOfJobs and
    self.total_number_of_events (defaults -1, 0 and 0 when the key is
    absent) together with the 0/1 flags self.selectEventsPerJob,
    self.selectNumberOfJobs and self.selectTotalNumberEvents.

    Raises CrabException when both total_number_of_events and
    number_of_jobs are given and there is less than one event per job.
    """
    cfg = self.cfg_params

    ## Events per job
    if 'CMSSW.events_per_job' in cfg:
        self.eventsPerJob = int(cfg['CMSSW.events_per_job'])
        self.selectEventsPerJob = 1
    else:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    ## number of jobs
    if 'CMSSW.number_of_jobs' in cfg:
        self.theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])
        self.selectNumberOfJobs = 1
    else:
        self.theNumberOfJobs = 0
        self.selectNumberOfJobs = 0

    ## total number of events (-1 means "all", so it is exempt from the check)
    if 'CMSSW.total_number_of_events' in cfg:
        self.total_number_of_events = int(cfg['CMSSW.total_number_of_events'])
        self.selectTotalNumberEvents = 1
        if self.selectNumberOfJobs == 1:
            if (self.total_number_of_events != -1) and self.total_number_of_events < self.theNumberOfJobs:
                msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
                raise CrabException(msg)
    else:
        self.total_number_of_events = 0
        self.selectTotalNumberEvents = 0

    return
109 spiga 1.1
def checkLumiSettings(self):
    """
    Check to make sure the user has specified enough information to
    perform splitting by Lumis to run the job.

    Exactly two of lumis_per_job, number_of_jobs and
    total_number_of_lumis must be present in self.cfg_params, otherwise
    CrabException is raised.  Sets self.lumisPerJob,
    self.theNumberOfJobs, self.totalNLumis and the matching
    self.limitJobLumis / self.limitNJobs / self.limitTotalLumis flags.
    """
    cfg = self.cfg_params
    settings = 0
    if 'CMSSW.lumis_per_job' in cfg:
        self.lumisPerJob = int(cfg['CMSSW.lumis_per_job'])
        self.limitJobLumis = True
        settings += 1

    if 'CMSSW.number_of_jobs' in cfg:
        self.theNumberOfJobs = int(cfg['CMSSW.number_of_jobs'])
        self.limitNJobs = True
        settings += 1

    if 'CMSSW.total_number_of_lumis' in cfg:
        self.totalNLumis = int(cfg['CMSSW.total_number_of_lumis'])
        # -1 means no limit on the total
        self.limitTotalLumis = (self.totalNLumis != -1)
        settings += 1

    if settings != 2:
        msg = 'When splitting by lumi section you must specify two and only two of:\n'
        msg += ' number_of_jobs, lumis_per_job, total_number_of_lumis'
        raise CrabException(msg)
    if self.limitNJobs and self.limitJobLumis:
        # the total follows from the other two settings
        self.limitTotalLumis = True
        self.totalNLumis = self.lumisPerJob * self.theNumberOfJobs

    # Has the user specified runselection?
    if 'CMSSW.runselection' in cfg:
        common.logger.info('You have specified runselection and split by lumi.')
        common.logger.info('Good lumi list will be the intersection of runselection and lumimask or ADS (if any).')
    return
144 ewv 1.27
def ComputeSubBlockSites( self, blockSites ):
    """
    Return the subset of blockSites whose site list survives the SE
    white list.  The kept entries carry their original site list.

    Raises CrabException when no block is left.
    """
    kept = {}
    for blockName, hostSites in blockSites.iteritems():
        if not self.blackWhiteListParser.checkWhiteList(hostSites):
            continue
        kept[blockName] = hostSites
    if not kept:
        msg = 'WARNING: the sites %s is not hosting any part of data.'%self.seWhiteList
        raise CrabException(msg)
    return kept
157    
158 spiga 1.1 ########################################################################
def jobSplittingByEvent( self ):
    """
    Perform job splitting. Jobs run over an integer number of files
    and no more than one block (unless CMSSW.no_block_boundary=1).

    READS:  self.args['blockSites'] - dictionary with blocks as keys and
                                      list of host sites as values
            self.args['pubdata']    - provider of files, events and parents
            the settings filled in by self.checkUserSettings()
    SETS:   self.eventsbyblock, self.eventsbyfile, self.parentFiles,
            self.maxEvents, self.useParent,
            self.ncjobs / self.total_number_of_jobs - Total # of jobs
    RETURNS: dictionary with keys
            'params'         - names of the per job arguments
            'args'           - one list of argument values per job
            'jobDestination' - Site destination(s) for each job (a list of lists)
            'njobs'          - number of jobs created
    RAISES: CrabException on inconsistent user settings
    """
    jobDestination=[]
    self.checkUserSettings()
    if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']
    filesbyblock=pubdata.getFiles()

    self.eventsbyblock=pubdata.getEventsPerBlock()
    self.eventsbyfile=pubdata.getEventsPerFile()
    self.parentFiles=pubdata.getParent()

    ## get max number of events
    self.maxEvents=pubdata.getMaxEvents()

    self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
    noBboundary = int(self.cfg_params.get('CMSSW.no_block_boundary',0))

    if noBboundary == 1:
        if self.total_number_of_events== -1:
            msg = 'You are selecting no_block_boundary=1 which does not allow to set total_number_of_events=-1\n'
            msg +='\tYou shoud get the number of event from DBS web interface and use it for your configuration.'
            raise CrabException(msg)
        if len(self.seWhiteList) != 1:
            msg = 'You are selecting no_block_boundary=1 which requires to choose one and only one site.\n'
            msg += "\tPlease set se_white_list with the site's storage element name."
            raise CrabException(msg)
        blockSites = self.ComputeSubBlockSites(blockSites)

    # ---- Handle the possible job splitting configurations ---- #
    if (self.selectTotalNumberEvents):
        totalEventsRequested = self.total_number_of_events
    if (self.selectEventsPerJob):
        eventsPerJobRequested = self.eventsPerJob
        if (self.selectNumberOfJobs):
            totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob

    # If user requested all the events in the dataset
    if (totalEventsRequested == -1):
        eventsRemaining=self.maxEvents
    # If user requested more events than are in the dataset
    elif (totalEventsRequested > self.maxEvents):
        eventsRemaining = self.maxEvents
        # report the value that was compared, which is not
        # self.total_number_of_events when number_of_jobs*events_per_job is used
        common.logger.info("Requested "+str(totalEventsRequested)+ " events, but only "+str(self.maxEvents)+" events are available.")
    # If user requested less events than are in the dataset
    else:
        eventsRemaining = totalEventsRequested

    # If user requested more events per job than are in the dataset
    if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
        eventsPerJobRequested = self.maxEvents

    # For user info at end
    totalEventCount = 0

    if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
        if self.theNumberOfJobs == 0:
            # would otherwise fail below with a bare ZeroDivisionError
            msg = 'number_of_jobs must be different from 0 when used with total_number_of_events.'
            raise CrabException(msg)
        eventsPerJobRequested = int(eventsRemaining//self.theNumberOfJobs)

    if (self.selectNumberOfJobs):
        common.logger.info("May not create the exact number_of_jobs requested.")

    if (self.theNumberOfJobs < 0):
        common.logger.info("ERROR: Negative number_of_jobs requested. Will result in no jobs.")

    # old... to remove Daniele
    totalNumberOfJobs = 999999999

    blocks = list(blockSites.keys())
    blockCount = 0
    # Backup variable in case self.maxEvents counted events in a non-included block
    numBlocksInDataset = len(blocks)

    jobCount = 0
    list_of_lists = []

    # dictionary tracking which jobs belong to which block
    jobsOfBlock = {}

    parString = ""
    pString = ""
    filesEventCount = 0
    msg=''

    # ---- Iterate over the blocks in the dataset until ---- #
    # ---- we've met the requested total # of events    ---- #
    while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
        block = blocks[blockCount]
        blockCount += 1
        if block not in jobsOfBlock:
            jobsOfBlock[block] = []

        # blocks without event information are skipped
        if block not in self.eventsbyblock:
            continue

        numEventsInBlock = self.eventsbyblock[block]
        common.logger.debug('Events in Block File '+str(numEventsInBlock))

        files = filesbyblock[block]
        numFilesInBlock = len(files)
        if (numFilesInBlock <= 0):
            continue
        fileCount = 0
        if noBboundary == 0: # DD
            # ---- New block => New job ---- #
            parString = ""
            pString=""
            # counter for number of events in files currently worked on
            filesEventCount = 0
        # flag if next while loop should touch new file
        newFile = 1
        # job event counter
        jobSkipEventCount = 0

        # ---- Iterate over the files in the block until we've met the requested ---- #
        # ---- total # of events or we've gone over all the files in this block  ---- #
        msg='\n'
        while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
            fileName = files[fileCount]
            if self.useParent==1:
                parent = self.parentFiles[fileName]
                common.logger.log(10-1, "File "+str(fileName)+" has the following parents: "+str(parent))
            if newFile :
                try:
                    numEventsInFile = self.eventsbyfile[fileName]
                    common.logger.log(10-1, "File "+str(fileName)+" has "+str(numEventsInFile)+" events")
                    # increase filesEventCount
                    filesEventCount += numEventsInFile
                    # Add file to current job
                    parString += fileName + ','
                    if self.useParent==1:
                        for f in parent :
                            pString += f + ','
                    newFile = 0
                except KeyError:
                    common.logger.info("File "+str(fileName)+" has unknown number of events: skipping")

            eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
            # if less events in file remain than eventsPerJobRequested
            if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
                # with block boundaries, the last file of the block ends the job
                if noBboundary != 1 and fileCount == numFilesInBlock-1:
                    # end job using last file, use remaining events in block
                    fullString = parString[:-1]
                    if self.useParent==1:
                        fullParentString = pString[:-1]
                        list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount),block])
                    else:
                        list_of_lists.append([fullString,str(-1),str(jobSkipEventCount),block])
                    msg += "Job %s can run over %s events (last file in block).\n"%(str(jobCount+1), str(filesEventCount - jobSkipEventCount))
                    jobDestination.append(blockSites[block])
                    msg += "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                    # fill jobs of block dictionary
                    jobsOfBlock[block].append(jobCount+1)
                    # reset counter
                    jobCount = jobCount + 1
                    totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
                    eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
                    jobSkipEventCount = 0
                    # reset file
                    pString = ""
                    parString = ""
                    filesEventCount = 0
                # in every case: go to next file
                newFile = 1
                fileCount += 1
            # if events in file equal to eventsPerJobRequested
            elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
                # close job and touch new file
                fullString = parString[:-1]
                if self.useParent==1:
                    fullParentString = pString[:-1]
                    list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                else:
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                jobDestination.append(blockSites[block])
                msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                jobsOfBlock[block].append(jobCount+1)
                # reset counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                jobSkipEventCount = 0
                # reset file
                pString = ""
                parString = ""
                filesEventCount = 0
                newFile = 1
                fileCount += 1

            # if more events in file remain than eventsPerJobRequested
            else :
                # close job but don't touch new file
                fullString = parString[:-1]
                if self.useParent==1:
                    fullParentString = pString[:-1]
                    list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                else:
                    list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount),block])
                msg += "Job %s can run over %s events.\n"%(str(jobCount+1),str(eventsPerJobRequested))
                jobDestination.append(blockSites[block])
                msg+= "Job %s Destination: %s\n"%(str(jobCount+1),str(SE2CMS(jobDestination[jobCount])))
                jobsOfBlock[block].append(jobCount+1)
                # increase counter
                jobCount = jobCount + 1
                totalEventCount = totalEventCount + eventsPerJobRequested
                eventsRemaining = eventsRemaining - eventsPerJobRequested
                # calculate skip events for last file
                # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
                jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[fileName])
                # remove all but the last file
                filesEventCount = self.eventsbyfile[fileName]
                pString_tmp=''
                if self.useParent==1:
                    for f in parent : pString_tmp += f + ','
                pString = pString_tmp
                parString = fileName + ','
            # END if
        # END while (iterate over files in the block)
    # END while (iterate over blocks in the dataset)
    common.logger.debug(msg)
    self.ncjobs = self.total_number_of_jobs = jobCount
    if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
        common.logger.info("Could not run on all requested events because some blocks not hosted at allowed sites.")
    common.logger.info(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")

    # skip check on block with no sites DD
    if noBboundary == 0 : self.checkBlockNoSite(blocks,jobsOfBlock,blockSites)

    # prepare dict output
    dictOut = {}
    dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    if self.useParent: dictOut['params']= ['InputFiles','ParentFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=self.total_number_of_jobs

    return dictOut
419    
420     # keep trace of block with no sites to print a warning at the end
421    
def checkBlockNoSite(self,blocks,jobsOfBlock,blockSites):
    """
    Log, for every block that received jobs, the jobs and the sites
    left after black and white list filtering; save the names of the
    blocks that still have sites to self.fileBlocks_FileName; warn
    about blocks without any site.

    blocks      -- sequence of block names
    jobsOfBlock -- dictionary block name -> list of job numbers
    blockSites  -- dictionary block name -> list of hosting SEs

    Raises CrabException when none of the blocks with jobs has a site.
    """
    def whiteListHints():
        """Return the SE/CE white list reminder shared by both messages."""
        hints = ''
        if 'GRID.se_white_list' in self.cfg_params:
            hints += '\tWARNING: SE White List: '+self.cfg_params['GRID.se_white_list']+'\n'
            hints += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            hints += '\tPlease check if the dataset is available at this site!)'
        if 'GRID.ce_white_list' in self.cfg_params:
            hints += '\tWARNING: CE White List: '+self.cfg_params['GRID.ce_white_list']+'\n'
            hints += '\t(Hint: By whitelisting you force the job to run at this particular site(s).\n'
            hints += '\tPlease check if the dataset is available at this site!)\n'
        return hints

    # screen output
    screenOutput = "List of jobs and available destination sites:\n\n"
    noSiteBlock = []   # job ranges of the blocks without sites
    bloskNoSite = []   # counters of the blocks without sites
    allBlock = []      # counters of all the blocks with jobs

    blockCounter = 0
    saveFblocks =''
    for block in blocks:
        if block not in jobsOfBlock:
            continue
        blockCounter += 1
        allBlock.append( blockCounter )
        sites=self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],[block]),[block])
        screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
                                                                ', '.join(SE2CMS(sites)))
        if len(sites) == 0:
            noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
            bloskNoSite.append( blockCounter )
        else:
            saveFblocks += str(block)+'\n'
    writeTXTfile(self, self.fileBlocks_FileName , saveFblocks)

    common.logger.info(screenOutput)
    if noSiteBlock and bloskNoSite:
        msg = 'WARNING: No sites are hosting any part of data for block:\n '
        # join puts the separator only between entries (no trailing comma)
        msg += ','.join([' ' + str(counter) for counter in bloskNoSite])
        msg += '\n\t\tRelated jobs:\n '
        msg += ','.join([str(range_jobs) for range_jobs in noSiteBlock])
        msg += '\n\t\twill not be submitted and this block of data can not be analyzed!\n'
        msg += whiteListHints()

        common.logger.info(msg)

    if bloskNoSite == allBlock:
        msg = 'Requested jobs cannot be Created! \n'
        msg += whiteListHints()
        raise CrabException(msg)

    return
484    
485    
486     ########################################################################
def jobSplittingByRun(self):
    """
    Split the task with the WMCore 'RunBased' algorithm.

    Reads self.args['blockSites'] and self.args['pubdata']; creates at
    most self.theNumberOfJobs jobs (9999999 when number_of_jobs was not
    given).

    Returns a dictionary with keys 'params', 'args', 'jobDestination'
    and 'njobs'.
    """
    self.checkUserSettings()
    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    if self.selectNumberOfJobs == 0 :
        self.theNumberOfJobs = 9999999
    thefiles = Fileset(name='FilesToSplit')
    fileList = pubdata.getListFiles()
    for f in fileList:
        block = f['Block']['Name']
        try:
            f['Block']['StorageElementList'].extend(blockSites[block])
        except (KeyError, TypeError, AttributeError):
            # files whose block or location data is unusable are skipped;
            # KeyboardInterrupt and SystemExit are no longer swallowed here
            continue
        wmbsFile = File(f['LogicalFileName'])
        if not blockSites[block]:
            msg = 'WARNING: No sites are hosting any part of data for block: %s\n' %block
            msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
            common.logger.debug(msg)
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        # only the first run of the file is registered
        runNum = f['RunsList'][0]['RunNumber']
        myRun = Run(runNumber=runNum)
        wmbsFile.addRun( myRun )
        thefiles.addFile(
            wmbsFile
            )

    work = Workflow()
    subs = Subscription(
        fileset = thefiles,
        workflow = work,
        split_algo = 'RunBased',
        type = "Processing")
    splitter = SplitterFactory()
    jobfactory = splitter(subs)

    #loop over all runs
    list_of_lists = []
    jobDestination = []
    list_of_blocks = []
    count = 0
    for jobGroup in jobfactory():
        if count >= self.theNumberOfJobs:
            break
        res = self.getJobInfo(jobGroup)
        fullString = ','.join(res['lfns'])
        list_of_blocks.append(res['block'])
        # NOTE(review): InputBlocks receives every block collected so far,
        # not only the block of this job -- confirm this is intended
        blockString=','.join(list_of_blocks)
        list_of_lists.append([fullString,str(-1),str(0),blockString])
        #need to check single file location
        jobDestination.append(res['locations'])
        count +=1
    # prepare dict output
    dictOut = {}
    dictOut['params']= ['InputFiles','MaxEvents','SkipEvents','InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=count
    self.cacheBlocks(list_of_blocks,jobDestination)

    return dictOut
558    
def getJobInfo( self,jobGroup ):
    """
    Collect the input data description of a job group.

    Returns a dictionary with
      'lfns'      -- the 'lfn' of every file of every job in the group
      'locations' -- the locations of the first file only
      'block'     -- the block of the first file; it is now set even
                     when that file has no location (it used to be left
                     out, making callers fail on res['block'])
    'block' is absent only when the group contains no file at all.
    """
    res = {}
    lfns = []
    locations = []
    firstFile = True
    for job in jobGroup.jobs:
        for jobFile in job.getFiles():
            lfns.append(jobFile['lfn'])
            if firstFile:
                locations.extend(jobFile['locations'])
                res['block'] = jobFile['block']
                firstFile = False
    res['lfns'] = lfns
    res['locations'] = locations
    return res
575    
576 spiga 1.1 ########################################################################
def prepareSplittingNoInput(self):
    """
    Derive the missing splitting quantity for jobs without input data.

    Expects the attributes filled in by checkUserSettings() and sets
    self.total_number_of_jobs plus, depending on which settings were
    given, self.total_number_of_events or self.eventsPerJob.

    Raises CrabException when the total number of events is negative or
    when the divisor of the computation would be zero.
    """
    if (self.selectEventsPerJob):
        common.logger.info('Required '+str(self.eventsPerJob)+' events per job ')
    if (self.selectNumberOfJobs):
        common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')
    if (self.selectTotalNumberEvents):
        common.logger.info('Required '+str(self.total_number_of_events)+' events in total ')

    if (self.total_number_of_events < 0):
        msg='Cannot split jobs per Events with "-1" as total number of events'
        raise CrabException(msg)

    if (self.selectEventsPerJob):
        if (self.selectTotalNumberEvents):
            if self.eventsPerJob == 0:
                raise CrabException('events_per_job must be different from 0')
            # explicit floor division: same result on every interpreter
            self.total_number_of_jobs = int(self.total_number_of_events//self.eventsPerJob)
        elif(self.selectNumberOfJobs) :
            self.total_number_of_jobs =self.theNumberOfJobs
            self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)

    elif (self.selectNumberOfJobs) :
        if self.theNumberOfJobs == 0:
            raise CrabException('number_of_jobs must be different from 0')
        self.total_number_of_jobs = self.theNumberOfJobs
        self.eventsPerJob = int(self.total_number_of_events//self.total_number_of_jobs)
601    
602 spiga 1.23
def jobSplittingNoInput(self):
    """
    Split a task without input data into jobs of a fixed number of
    events.

    Returns a dictionary with keys 'params', 'args', 'jobDestination'
    and 'njobs'; raises CrabException unless exactly two of
    total_number_of_events, events_per_job and number_of_jobs are set.
    """
    common.logger.debug('Splitting per events')
    self.checkUserSettings()
    jobDestination=[]
    nSelected = self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs
    if nSelected != 2:
        msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
        raise CrabException(msg)

    managedGenerators =self.args['managedGenerators']
    generator = self.args['generator']
    firstLumi = int(self.cfg_params.get('CMSSW.first_lumi', 1))

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs '+str(self.total_number_of_jobs))

    # events left over once every job got its share
    check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)

    common.logger.debug('Check '+str(check))

    common.logger.info(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
    if check > 0:
        common.logger.info('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))

    # one argument list per job
    self.list_of_args = []
    isManaged = generator in managedGenerators
    for jobIndex in range(self.total_number_of_jobs):
        ## Since there is no input, any site is good
        jobDestination.append([""]) # must be empty to correctly write the XML
        jobArgs = []
        if firstLumi: # Pythia first lumi
            jobArgs.append(str(int(firstLumi)+jobIndex))
        if isManaged:
            jobArgs.append(generator)
            if generator == 'comphep' and jobIndex == 0:
                # COMPHEP wants event #'s like 1,100,200,300
                firstEvent = '1'
            else:
                firstEvent = str(jobIndex*self.eventsPerJob)
            jobArgs.append(firstEvent)
        jobArgs.append(str(self.eventsPerJob))
        self.list_of_args.append(jobArgs)

    # names of the arguments, in the same order as the values above
    paramNames = []
    if firstLumi:
        paramNames.append('FirstLumi')
    if isManaged:
        paramNames.extend(['Generator', 'FirstEvent'])
    paramNames.append('MaxEvents')

    # prepare dict output
    dictOut = {}
    dictOut['params'] = paramNames
    dictOut['args'] = self.list_of_args
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=self.total_number_of_jobs

    return dictOut
664    
665    
def jobSplittingForScript(self):
    """
    Perform job splitting based on number of job.

    Returns a dictionary with keys 'params', 'args', 'jobDestination'
    and 'njobs'; raises CrabException when number_of_jobs is not set.
    """
    self.checkUserSettings()
    if (self.selectNumberOfJobs == 0):
        msg = 'must specify number_of_jobs.'
        # the exception class is CrabException; the lower case name
        # used before does not exist and produced a NameError
        raise CrabException(msg)
    jobDestination = []
    common.logger.debug('Splitting per job')
    common.logger.info('Required '+str(self.theNumberOfJobs)+' jobs in total ')

    self.prepareSplittingNoInput()

    common.logger.debug('N jobs '+str(self.total_number_of_jobs))

    common.logger.info(str(self.total_number_of_jobs)+' jobs can be created')

    # one argument list per job; it stays empty when eventsPerJob is 0
    self.list_of_args = []
    for i in range(self.total_number_of_jobs):
        args=[]
        jobDestination.append([""])
        if self.eventsPerJob != 0 :
            args.append(str(self.eventsPerJob))
        self.list_of_args.append(args)

    # prepare dict output
    dictOut = {}
    dictOut['params'] = ['MaxEvents']
    dictOut['args'] = self.list_of_args
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs']=self.total_number_of_jobs
    return dictOut
702    
703 ewv 1.22
def jobSplittingByLumi(self):
    """
    Split task into jobs by Lumi section paying attention to which
    lumis should be run (according to the analysis dataset).
    This uses WMBS job splitting which does not split files over jobs
    so the job will have AT LEAST as many lumis as requested, perhaps
    more.

    Inputs are taken from self.args['blockSites'] (block name -> sites)
    and self.args['pubdata'] (file list, lumis per file, parents).
    Files whose block cannot be matched against blockSites are skipped.

    Side effects: sets self.useParent and self.parentFiles, may set
    self.lumisPerJob, and calls self.cacheBlocks() with the blocks and
    destinations of the created jobs.

    Returns a dict with the keys 'params', 'args', 'jobDestination'
    and 'njobs', matching the output of the non-WMBS splitters.
    """
    self.useParent = int(self.cfg_params.get('CMSSW.use_parent', 0))
    common.logger.debug('Splitting by Lumi')
    self.checkLumiSettings()

    blockSites = self.args['blockSites']
    pubdata = self.args['pubdata']

    lumisPerFile = pubdata.getLumis()
    self.parentFiles = pubdata.getParent()

    # Make the list of WMBS files for job splitter
    wmFileList = []
    for jobFile in pubdata.getListFiles():
        block = jobFile['Block']['Name']
        try:
            jobFile['Block']['StorageElementList'].extend(blockSites[block])
        except Exception:
            # Best effort: a file whose block has no usable site entry is
            # left out.  'Exception' (not a bare except) so that Ctrl-C
            # and SystemExit are no longer swallowed here.
            continue
        wmbsFile = File(jobFile['LogicalFileName'])
        if not blockSites[block]:
            msg = 'WARNING: No sites are hosting any part of data for block: %s\n' % block
            msg += 'Related jobs will not be submitted and this block of data can not be analyzed'
            common.logger.debug(msg)
        for site in blockSites[block]:
            wmbsFile['locations'].add(site)
        wmbsFile['block'] = block
        for lumi in lumisPerFile[jobFile['LogicalFileName']]:
            wmbsFile.addRun(Run(lumi[0], lumi[1]))
        wmFileList.append(wmbsFile)

    thefiles = Fileset(name='FilesToSplit', files=set(wmFileList))

    # Create the factory and workflow
    work = Workflow()
    subs = Subscription(fileset=thefiles, workflow=work,
                        split_algo='LumiBased', type="Processing")
    splitter = SplitterFactory()
    jobFactory = splitter(subs)

    list_of_lists = []
    jobDestination = []
    jobCount = 0
    lumisCreated = 0
    list_of_blocks = []
    if not self.limitJobLumis:
        # NOTE(review): both divisions assume theNumberOfJobs > 0 whenever
        # limitJobLumis is false -- presumably guaranteed by
        # checkLumiSettings(); TODO confirm.
        if self.totalNLumis > 0:
            self.lumisPerJob = max(self.totalNLumis // self.theNumberOfJobs, 1)
        else:
            self.lumisPerJob = pubdata.getMaxLumis() // self.theNumberOfJobs + 1
        common.logger.info('Each job will process about %s lumis.' %
                           self.lumisPerJob)

    useParent = (self.useParent == 1)
    limitReached = False
    for jobGroup in jobFactory(lumis_per_job=self.lumisPerJob):
        for job in jobGroup.jobs:
            if (self.limitNJobs and jobCount >= self.theNumberOfJobs):
                common.logger.info('Requested number of jobs reached.')
                limitReached = True
                break
            if (self.limitTotalLumis and lumisCreated >= self.totalNLumis):
                common.logger.info('Requested number of lumis reached.')
                limitReached = True
                break

            lumis = []
            lfns = []
            parentLfns = []
            locations = []
            blocks = []
            firstFile = True
            # Collect information from all the files
            for jobFile in job.getFiles():
                doFile = False
                if firstFile:  # Get locations from first file in the job
                    locations.extend(jobFile['locations'])
                    blocks.append(jobFile['block'])
                    firstFile = False
                # Accumulate Lumis from all files
                for lumiList in jobFile['runs']:
                    theRun = lumiList.run
                    for theLumi in list(lumiList):
                        if (not self.limitTotalLumis) or \
                           (lumisCreated < self.totalNLumis):
                            doFile = True
                            lumisCreated += 1
                            lumis.append((theRun, theLumi))
                # A file only counts for the job if it contributed a lumi
                if doFile:
                    lfns.append(jobFile['lfn'])
                    if useParent:
                        parentLfns.extend(self.parentFiles[jobFile['lfn']])

            fileString = ','.join(lfns)
            lumiLister = LumiList(lumis=lumis)
            lumiString = lumiLister.getCMSSWString()
            blockString = ','.join(blocks)
            if useParent:
                pfileString = ','.join(parentLfns)
                common.logger.debug("Files: "+fileString+" with the following parents: "+pfileString)
                list_of_lists.append([fileString, pfileString, str(-1), str(0), lumiString, blockString])
            else:
                list_of_lists.append([fileString, str(-1), str(0), lumiString, blockString])
            list_of_blocks.append(blocks)
            jobDestination.append(locations)
            jobCount += 1
            common.logger.debug('Job %s will run on %s files and %s lumis '
                                % (jobCount, len(lfns), len(lumis)))
        if limitReached:
            # The inner break alone would only skip to the next job group,
            # re-checking (and re-logging) the limit for each one.
            break

    common.logger.info('%s jobs created to run on %s lumis' %
                       (jobCount, lumisCreated))

    # Prepare dict output matching back to non-WMBS job creation
    dictOut = {}
    dictOut['params'] = ['InputFiles', 'MaxEvents', 'SkipEvents', 'Lumis', 'InputBlocks']
    if useParent:
        dictOut['params'] = ['InputFiles', 'ParentFiles', 'MaxEvents', 'SkipEvents', 'Lumis', 'InputBlocks']
    dictOut['args'] = list_of_lists
    dictOut['jobDestination'] = jobDestination
    dictOut['njobs'] = jobCount
    self.cacheBlocks(list_of_blocks, jobDestination)

    return dictOut
835    
def cacheBlocks(self, blocks, destinations):
    """
    Write to self.fileBlocks_FileName the names of the blocks of every
    job that still has at least one destination left after the black
    list and white list checks of self.blackWhiteListParser.

    blocks       -- one list of block names per job
    destinations -- one list of sites per job, indexed like blocks

    The file content is one block name per line, written through
    writeTXTfile(); nothing is returned.
    """
    lines = []
    for jobIndex, jobBlocks in enumerate(blocks):
        parser = self.blackWhiteListParser
        allowedSites = parser.checkWhiteList(
            parser.checkBlackList(destinations[jobIndex]))
        if len(allowedSites) == 0:
            # no usable site for this job: its blocks are not cached
            continue
        lines.extend(str(block) + '\n' for block in jobBlocks)
    writeTXTfile(self, self.fileBlocks_FileName, ''.join(lines))
845 ewv 1.26
def Algos(self):
    """
    Return the lookup table mapping each splitting type name to the
    method of this object that implements it.
    """
    dispatch = (
        ('EventBased', self.jobSplittingByEvent),
        ('RunBased', self.jobSplittingByRun),
        ('LumiBased', self.jobSplittingByLumi),
        ('NoInput', self.jobSplittingNoInput),
        ('ForScript', self.jobSplittingForScript),
    )
    return dict(dispatch)
858