2 |
|
from crab_logger import Logger |
3 |
|
from crab_exceptions import * |
4 |
|
from crab_util import * |
5 |
< |
from BlackWhiteListParser import BlackWhiteListParser |
5 |
> |
from BlackWhiteListParser import SEBlackWhiteListParser |
6 |
|
import common |
7 |
|
import Scram |
8 |
– |
from LFNBaseName import * |
8 |
|
|
9 |
|
import os, string, glob |
10 |
|
|
11 |
|
class Cmssw(JobType): |
12 |
< |
def __init__(self, cfg_params, ncjobs): |
12 |
> |
def __init__(self, cfg_params, ncjobs,skip_blocks, isNew): |
13 |
|
JobType.__init__(self, 'CMSSW') |
14 |
|
common.logger.debug(3,'CMSSW::__init__') |
15 |
+ |
self.skip_blocks = skip_blocks |
16 |
|
|
17 |
|
self.argsList = [] |
18 |
|
|
19 |
|
self._params = {} |
20 |
|
self.cfg_params = cfg_params |
21 |
|
# init BlackWhiteListParser |
22 |
< |
self.blackWhiteListParser = BlackWhiteListParser(cfg_params) |
22 |
> |
self.blackWhiteListParser = SEBlackWhiteListParser(cfg_params) |
23 |
|
|
24 |
< |
self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5)) |
24 |
> |
### Temporary patch to automatically skip the ISB size check: |
25 |
> |
server=self.cfg_params.get('CRAB.server_name',None) |
26 |
> |
size = 9.5 |
27 |
> |
if server: size = 99999 |
28 |
> |
### D.S. |
29 |
> |
self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',size)) |
30 |
|
|
31 |
|
# number of jobs requested to be created, limit obj splitting |
32 |
|
self.ncjobs = ncjobs |
64 |
|
if not cfg_params.has_key('CMSSW.datasetpath'): |
65 |
|
msg = "Error: datasetpath not defined " |
66 |
|
raise CrabException(msg) |
67 |
+ |
|
68 |
+ |
### Temporary: added to remove input file control in the case of PU |
69 |
+ |
self.dataset_pu = cfg_params.get('CMSSW.dataset_pu', None) |
70 |
+ |
|
71 |
|
tmp = cfg_params['CMSSW.datasetpath'] |
72 |
|
log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp) |
73 |
< |
if string.lower(tmp)=='none': |
73 |
> |
|
74 |
> |
if tmp =='': |
75 |
> |
msg = "Error: datasetpath not defined " |
76 |
> |
raise CrabException(msg) |
77 |
> |
elif string.lower(tmp)=='none': |
78 |
|
self.datasetPath = None |
79 |
|
self.selectNoInput = 1 |
80 |
|
else: |
82 |
|
self.selectNoInput = 0 |
83 |
|
|
84 |
|
self.dataTiers = [] |
85 |
< |
|
86 |
< |
self.debug_pset = cfg_params.get('USER.debug_pset',False) |
87 |
< |
|
85 |
> |
self.debugWrap = '' |
86 |
> |
self.debug_wrapper = cfg_params.get('USER.debug_wrapper',False) |
87 |
> |
if self.debug_wrapper: self.debugWrap='--debug' |
88 |
|
## now the application |
89 |
|
self.executable = cfg_params.get('CMSSW.executable','cmsRun') |
90 |
|
log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable) |
107 |
|
self.output_file_sandbox.append(self.fjrFileName) |
108 |
|
|
109 |
|
# other output files to be returned via sandbox or copied to SE |
110 |
+ |
outfileflag = False |
111 |
|
self.output_file = [] |
112 |
|
tmp = cfg_params.get('CMSSW.output_file',None) |
113 |
|
if tmp : |
114 |
< |
tmpOutFiles = string.split(tmp,',') |
115 |
< |
log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles)) |
116 |
< |
for tmp in tmpOutFiles: |
117 |
< |
tmp=string.strip(tmp) |
104 |
< |
self.output_file.append(tmp) |
105 |
< |
pass |
106 |
< |
else: |
107 |
< |
log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n") |
108 |
< |
pass |
114 |
> |
self.output_file = [x.strip() for x in tmp.split(',')] |
115 |
> |
outfileflag = True #output found |
116 |
> |
#else: |
117 |
> |
# log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n") |
118 |
|
|
119 |
|
# script_exe file as additional file in inputSandbox |
120 |
|
self.scriptExe = cfg_params.get('USER.script_exe',None) |
128 |
|
msg ="Error. script_exe not defined" |
129 |
|
raise CrabException(msg) |
130 |
|
|
131 |
+ |
# use parent files... |
132 |
+ |
self.useParent = self.cfg_params.get('CMSSW.use_parent',False) |
133 |
+ |
|
134 |
|
## additional input files |
135 |
|
if cfg_params.has_key('USER.additional_input_files'): |
136 |
|
tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',') |
175 |
|
self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events']) |
176 |
|
self.selectTotalNumberEvents = 1 |
177 |
|
if self.selectNumberOfJobs == 1: |
178 |
< |
if int(self.total_number_of_events) < int(self.theNumberOfJobs): |
178 |
> |
if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs): |
179 |
|
msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs ' |
180 |
|
raise CrabException(msg) |
181 |
|
else: |
205 |
|
tmp.strip() |
206 |
|
self.incrementSeeds.append(tmp) |
207 |
|
|
208 |
< |
## Old method of dealing with seeds |
209 |
< |
## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then |
198 |
< |
## remove |
199 |
< |
self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None) |
200 |
< |
if self.sourceSeed: |
201 |
< |
print "pythia_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds." |
202 |
< |
self.incrementSeeds.append('sourceSeed') |
203 |
< |
self.incrementSeeds.append('theSource') |
204 |
< |
|
208 |
> |
## FUTURE: Can remove in CRAB 2.4.0 |
209 |
> |
self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None) |
210 |
|
self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None) |
211 |
< |
if self.sourceSeedVtx: |
207 |
< |
print "vtx_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds." |
208 |
< |
self.incrementSeeds.append('VtxSmeared') |
209 |
< |
|
210 |
< |
self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None) |
211 |
< |
if self.sourceSeedG4: |
212 |
< |
print "g4_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds." |
213 |
< |
self.incrementSeeds.append('g4SimHits') |
214 |
< |
|
211 |
> |
self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None) |
212 |
|
self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None) |
213 |
< |
if self.sourceSeedMix: |
214 |
< |
print "mix_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds." |
215 |
< |
self.incrementSeeds.append('mix') |
213 |
> |
if self.sourceSeed or self.sourceSeedVtx or self.sourceSeedG4 or self.sourceSeedMix: |
214 |
> |
msg = 'pythia_seed, vtx_seed, g4_seed, and mix_seed are no longer valid settings. You must use increment_seeds or preserve_seeds' |
215 |
> |
raise CrabException(msg) |
216 |
|
|
217 |
|
self.firstRun = cfg_params.get('CMSSW.first_run',None) |
218 |
|
|
222 |
– |
if self.pset != None: #CarlosDaniele |
223 |
– |
import PsetManipulator as pp |
224 |
– |
PsetEdit = pp.PsetManipulator(self.pset) #Daniele Pset |
225 |
– |
|
219 |
|
# Copy/return |
227 |
– |
|
220 |
|
self.copy_data = int(cfg_params.get('USER.copy_data',0)) |
221 |
|
self.return_data = int(cfg_params.get('USER.return_data',0)) |
222 |
|
|
232 |
|
blockSites = self.DataDiscoveryAndLocation(cfg_params) |
233 |
|
#DBSDLS-end |
234 |
|
|
243 |
– |
|
235 |
|
## Select Splitting |
236 |
|
if self.selectNoInput: |
237 |
|
if self.pset == None: |
238 |
|
self.jobSplittingForScript() |
239 |
|
else: |
240 |
|
self.jobSplittingNoInput() |
241 |
+ |
elif (cfg_params.get('CMSSW.noblockboundary',0)): |
242 |
+ |
self.jobSplittingNoBlockBoundary(blockSites) |
243 |
|
else: |
244 |
|
self.jobSplittingByBlocks(blockSites) |
245 |
|
|
246 |
< |
# modify Pset |
247 |
< |
if self.pset != None: |
248 |
< |
try: |
249 |
< |
# Add FrameworkJobReport to parameter-set, set max events. |
250 |
< |
# Reset later for data jobs by writeCFG which does all modifications |
251 |
< |
PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5 |
252 |
< |
PsetEdit.maxEvent(self.eventsPerJob) |
253 |
< |
PsetEdit.psetWriter(self.configFilename()) |
254 |
< |
except: |
255 |
< |
msg='Error while manipulating ParameterSet: exiting...' |
256 |
< |
raise CrabException(msg) |
257 |
< |
self.tgzNameWithPath = self.getTarBall(self.executable) |
246 |
> |
# modify Pset only the first time |
247 |
> |
if isNew: |
248 |
> |
if self.pset != None: |
249 |
> |
import PsetManipulator as pp |
250 |
> |
PsetEdit = pp.PsetManipulator(self.pset) |
251 |
> |
try: |
252 |
> |
# Add FrameworkJobReport to parameter-set, set max events. |
253 |
> |
# Reset later for data jobs by writeCFG which does all modifications |
254 |
> |
PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5 |
255 |
> |
PsetEdit.maxEvent(self.eventsPerJob) |
256 |
> |
PsetEdit.psetWriter(self.configFilename()) |
257 |
> |
## If present, add TFileService to output files |
258 |
> |
if not int(cfg_params.get('CMSSW.skip_TFileService_output',0)): |
259 |
> |
tfsOutput = PsetEdit.getTFileService() |
260 |
> |
if tfsOutput: |
261 |
> |
if tfsOutput in self.output_file: |
262 |
> |
common.logger.debug(5,"Output from TFileService "+tfsOutput+" already in output files") |
263 |
> |
else: |
264 |
> |
outfileflag = True #output found |
265 |
> |
self.output_file.append(tfsOutput) |
266 |
> |
common.logger.message("Adding "+tfsOutput+" to output files (from TFileService)") |
267 |
> |
pass |
268 |
> |
pass |
269 |
> |
## If present and requested, add PoolOutputModule to output files |
270 |
> |
if int(cfg_params.get('CMSSW.get_edm_output',0)): |
271 |
> |
edmOutput = PsetEdit.getPoolOutputModule() |
272 |
> |
if edmOutput: |
273 |
> |
if edmOutput in self.output_file: |
274 |
> |
common.logger.debug(5,"Output from PoolOutputModule "+edmOutput+" already in output files") |
275 |
> |
else: |
276 |
> |
self.output_file.append(edmOutput) |
277 |
> |
common.logger.message("Adding "+edmOutput+" to output files (from PoolOutputModule)") |
278 |
> |
pass |
279 |
> |
pass |
280 |
> |
except CrabException: |
281 |
> |
msg='Error while manipulating ParameterSet: exiting...' |
282 |
> |
raise CrabException(msg) |
283 |
> |
## Prepare inputSandbox TarBall (only the first time) |
284 |
> |
self.tgzNameWithPath = self.getTarBall(self.executable) |
285 |
|
|
286 |
|
def DataDiscoveryAndLocation(self, cfg_params): |
287 |
|
|
294 |
|
## Contact the DBS |
295 |
|
common.logger.message("Contacting Data Discovery Services ...") |
296 |
|
try: |
297 |
< |
self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params) |
297 |
> |
self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params,self.skip_blocks) |
298 |
|
self.pubdata.fetchDBSInfo() |
299 |
|
|
300 |
|
except DataDiscovery.NotExistingDatasetError, ex : |
310 |
|
self.filesbyblock=self.pubdata.getFiles() |
311 |
|
self.eventsbyblock=self.pubdata.getEventsPerBlock() |
312 |
|
self.eventsbyfile=self.pubdata.getEventsPerFile() |
313 |
+ |
self.parentFiles=self.pubdata.getParent() |
314 |
|
|
315 |
|
## get max number of events |
316 |
|
self.maxEvents=self.pubdata.getMaxEvents() |
427 |
|
|
428 |
|
# ---- Iterate over the files in the block until we've met the requested ---- # |
429 |
|
# ---- total # of events or we've gone over all the files in this block ---- # |
430 |
+ |
pString='' |
431 |
|
while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ): |
432 |
|
file = files[fileCount] |
433 |
+ |
if self.useParent: |
434 |
+ |
parent = self.parentFiles[file] |
435 |
+ |
for f in parent : |
436 |
+ |
pString += '\\\"' + f + '\\\"\,' |
437 |
+ |
common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent)) |
438 |
+ |
common.logger.write("File "+str(file)+" has the following parents: "+str(parent)) |
439 |
|
if newFile : |
440 |
|
try: |
441 |
|
numEventsInFile = self.eventsbyfile[file] |
456 |
|
# end job using last file, use remaining events in block |
457 |
|
# close job and touch new file |
458 |
|
fullString = parString[:-2] |
459 |
< |
list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)]) |
459 |
> |
if self.useParent: |
460 |
> |
fullParentString = pString[:-2] |
461 |
> |
list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)]) |
462 |
> |
else: |
463 |
> |
list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)]) |
464 |
|
common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).") |
465 |
|
self.jobDestination.append(blockSites[block]) |
466 |
|
common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount])) |
472 |
|
eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount |
473 |
|
jobSkipEventCount = 0 |
474 |
|
# reset file |
475 |
+ |
pString = "" |
476 |
|
parString = "" |
477 |
|
filesEventCount = 0 |
478 |
|
newFile = 1 |
485 |
|
elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) : |
486 |
|
# close job and touch new file |
487 |
|
fullString = parString[:-2] |
488 |
< |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
488 |
> |
if self.useParent: |
489 |
> |
fullParentString = pString[:-2] |
490 |
> |
list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
491 |
> |
else: |
492 |
> |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
493 |
|
common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.") |
494 |
|
self.jobDestination.append(blockSites[block]) |
495 |
|
common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount])) |
500 |
|
eventsRemaining = eventsRemaining - eventsPerJobRequested |
501 |
|
jobSkipEventCount = 0 |
502 |
|
# reset file |
503 |
+ |
pString = "" |
504 |
|
parString = "" |
505 |
|
filesEventCount = 0 |
506 |
|
newFile = 1 |
510 |
|
else : |
511 |
|
# close job but don't touch new file |
512 |
|
fullString = parString[:-2] |
513 |
< |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
513 |
> |
if self.useParent: |
514 |
> |
fullParentString = pString[:-2] |
515 |
> |
list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
516 |
> |
else: |
517 |
> |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
518 |
|
common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.") |
519 |
|
self.jobDestination.append(blockSites[block]) |
520 |
|
common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount])) |
528 |
|
jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file]) |
529 |
|
# remove all but the last file |
530 |
|
filesEventCount = self.eventsbyfile[file] |
531 |
+ |
if self.useParent: |
532 |
+ |
for f in parent : pString += '\\\"' + f + '\\\"\,' |
533 |
|
parString = '\\\"' + file + '\\\"\,' |
534 |
|
pass # END if |
535 |
|
pass # END while (iterate over files in the block) |
585 |
|
self.list_of_args = list_of_lists |
586 |
|
return |
587 |
|
|
588 |
+ |
def jobSplittingNoBlockBoundary(self,blockSites): |
589 |
+ |
""" |
590 |
+ |
""" |
591 |
+ |
# ---- Handle the possible job splitting configurations ---- # |
592 |
+ |
if (self.selectTotalNumberEvents): |
593 |
+ |
totalEventsRequested = self.total_number_of_events |
594 |
+ |
if (self.selectEventsPerJob): |
595 |
+ |
eventsPerJobRequested = self.eventsPerJob |
596 |
+ |
if (self.selectNumberOfJobs): |
597 |
+ |
totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob |
598 |
+ |
|
599 |
+ |
# If user requested all the events in the dataset |
600 |
+ |
if (totalEventsRequested == -1): |
601 |
+ |
eventsRemaining=self.maxEvents |
602 |
+ |
# If user requested more events than are in the dataset |
603 |
+ |
elif (totalEventsRequested > self.maxEvents): |
604 |
+ |
eventsRemaining = self.maxEvents |
605 |
+ |
common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.") |
606 |
+ |
# If user requested less events than are in the dataset |
607 |
+ |
else: |
608 |
+ |
eventsRemaining = totalEventsRequested |
609 |
+ |
|
610 |
+ |
# If user requested more events per job than are in the dataset |
611 |
+ |
if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents): |
612 |
+ |
eventsPerJobRequested = self.maxEvents |
613 |
+ |
|
614 |
+ |
# For user info at end |
615 |
+ |
totalEventCount = 0 |
616 |
+ |
|
617 |
+ |
if (self.selectTotalNumberEvents and self.selectNumberOfJobs): |
618 |
+ |
eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs) |
619 |
+ |
|
620 |
+ |
if (self.selectNumberOfJobs): |
621 |
+ |
common.logger.message("May not create the exact number_of_jobs requested.") |
622 |
+ |
|
623 |
+ |
if ( self.ncjobs == 'all' ) : |
624 |
+ |
totalNumberOfJobs = 999999999 |
625 |
+ |
else : |
626 |
+ |
totalNumberOfJobs = self.ncjobs |
627 |
+ |
|
628 |
+ |
blocks = blockSites.keys() |
629 |
+ |
blockCount = 0 |
630 |
+ |
# Backup variable in case self.maxEvents counted events in a non-included block |
631 |
+ |
numBlocksInDataset = len(blocks) |
632 |
+ |
|
633 |
+ |
jobCount = 0 |
634 |
+ |
list_of_lists = [] |
635 |
+ |
|
636 |
+ |
#AF |
637 |
+ |
#AF do not reset input files and event count on block boundary |
638 |
+ |
#AF |
639 |
+ |
parString="" |
640 |
+ |
filesEventCount = 0 |
641 |
+ |
#AF |
642 |
+ |
|
643 |
+ |
# list tracking which jobs are in which jobs belong to which block |
644 |
+ |
jobsOfBlock = {} |
645 |
+ |
while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)): |
646 |
+ |
block = blocks[blockCount] |
647 |
+ |
blockCount += 1 |
648 |
+ |
if block not in jobsOfBlock.keys() : |
649 |
+ |
jobsOfBlock[block] = [] |
650 |
+ |
|
651 |
+ |
if self.eventsbyblock.has_key(block) : |
652 |
+ |
numEventsInBlock = self.eventsbyblock[block] |
653 |
+ |
common.logger.debug(5,'Events in Block File '+str(numEventsInBlock)) |
654 |
+ |
files = self.filesbyblock[block] |
655 |
+ |
numFilesInBlock = len(files) |
656 |
+ |
if (numFilesInBlock <= 0): |
657 |
+ |
continue |
658 |
+ |
fileCount = 0 |
659 |
+ |
#AF |
660 |
+ |
#AF do not reset input files and event count of block boundary |
661 |
+ |
#AF |
662 |
+ |
## ---- New block => New job ---- # |
663 |
+ |
#parString = "" |
664 |
+ |
# counter for number of events in files currently worked on |
665 |
+ |
#filesEventCount = 0 |
666 |
+ |
#AF |
667 |
+ |
# flag if next while loop should touch new file |
668 |
+ |
newFile = 1 |
669 |
+ |
# job event counter |
670 |
+ |
jobSkipEventCount = 0 |
671 |
+ |
|
672 |
+ |
# ---- Iterate over the files in the block until we've met the requested ---- # |
673 |
+ |
# ---- total # of events or we've gone over all the files in this block ---- # |
674 |
+ |
pString='' |
675 |
+ |
while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ): |
676 |
+ |
file = files[fileCount] |
677 |
+ |
if self.useParent: |
678 |
+ |
parent = self.parentFiles[file] |
679 |
+ |
for f in parent : |
680 |
+ |
pString += '\\\"' + f + '\\\"\,' |
681 |
+ |
common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent)) |
682 |
+ |
common.logger.write("File "+str(file)+" has the following parents: "+str(parent)) |
683 |
+ |
if newFile : |
684 |
+ |
try: |
685 |
+ |
numEventsInFile = self.eventsbyfile[file] |
686 |
+ |
common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events") |
687 |
+ |
# increase filesEventCount |
688 |
+ |
filesEventCount += numEventsInFile |
689 |
+ |
# Add file to current job |
690 |
+ |
parString += '\\\"' + file + '\\\"\,' |
691 |
+ |
newFile = 0 |
692 |
+ |
except KeyError: |
693 |
+ |
common.logger.message("File "+str(file)+" has unknown number of events: skipping") |
694 |
+ |
eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining) |
695 |
+ |
#common.logger.message("AF filesEventCount %s - jobSkipEventCount %s "%(filesEventCount,jobSkipEventCount)) |
696 |
+ |
# if less events in file remain than eventsPerJobRequested |
697 |
+ |
if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested): |
698 |
+ |
#AF |
699 |
+ |
#AF skip fileboundary part |
700 |
+ |
#AF |
701 |
+ |
# go to next file |
702 |
+ |
newFile = 1 |
703 |
+ |
fileCount += 1 |
704 |
+ |
# if events in file equal to eventsPerJobRequested |
705 |
+ |
elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) : |
706 |
+ |
# close job and touch new file |
707 |
+ |
fullString = parString[:-2] |
708 |
+ |
if self.useParent: |
709 |
+ |
fullParentString = pString[:-2] |
710 |
+ |
list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
711 |
+ |
else: |
712 |
+ |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
713 |
+ |
common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.") |
714 |
+ |
self.jobDestination.append(blockSites[block]) |
715 |
+ |
common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount])) |
716 |
+ |
jobsOfBlock[block].append(jobCount+1) |
717 |
+ |
# reset counter |
718 |
+ |
jobCount = jobCount + 1 |
719 |
+ |
totalEventCount = totalEventCount + eventsPerJobRequested |
720 |
+ |
eventsRemaining = eventsRemaining - eventsPerJobRequested |
721 |
+ |
jobSkipEventCount = 0 |
722 |
+ |
# reset file |
723 |
+ |
pString = "" |
724 |
+ |
parString = "" |
725 |
+ |
filesEventCount = 0 |
726 |
+ |
newFile = 1 |
727 |
+ |
fileCount += 1 |
728 |
+ |
|
729 |
+ |
# if more events in file remain than eventsPerJobRequested |
730 |
+ |
else : |
731 |
+ |
# close job but don't touch new file |
732 |
+ |
fullString = parString[:-2] |
733 |
+ |
if self.useParent: |
734 |
+ |
fullParentString = pString[:-2] |
735 |
+ |
list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
736 |
+ |
else: |
737 |
+ |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
738 |
+ |
common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.") |
739 |
+ |
self.jobDestination.append(blockSites[block]) |
740 |
+ |
common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount])) |
741 |
+ |
jobsOfBlock[block].append(jobCount+1) |
742 |
+ |
# increase counter |
743 |
+ |
jobCount = jobCount + 1 |
744 |
+ |
totalEventCount = totalEventCount + eventsPerJobRequested |
745 |
+ |
eventsRemaining = eventsRemaining - eventsPerJobRequested |
746 |
+ |
# calculate skip events for last file |
747 |
+ |
# use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest |
748 |
+ |
jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file]) |
749 |
+ |
# remove all but the last file |
750 |
+ |
filesEventCount = self.eventsbyfile[file] |
751 |
+ |
if self.useParent: |
752 |
+ |
for f in parent : pString += '\\\"' + f + '\\\"\,' |
753 |
+ |
parString = '\\\"' + file + '\\\"\,' |
754 |
+ |
pass # END if |
755 |
+ |
pass # END while (iterate over files in the block) |
756 |
+ |
pass # END while (iterate over blocks in the dataset) |
757 |
+ |
self.ncjobs = self.total_number_of_jobs = jobCount |
758 |
+ |
if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ): |
759 |
+ |
common.logger.message("eventsRemaining "+str(eventsRemaining)) |
760 |
+ |
common.logger.message("jobCount "+str(jobCount)) |
761 |
+ |
common.logger.message(" totalNumberOfJobs "+str(totalNumberOfJobs)) |
762 |
+ |
common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.") |
763 |
+ |
common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n") |
764 |
+ |
|
765 |
+ |
# screen output |
766 |
+ |
screenOutput = "List of jobs and available destination sites:\n\n" |
767 |
+ |
|
768 |
+ |
#AF |
769 |
+ |
#AF skip check on block with no sites |
770 |
+ |
#AF |
771 |
+ |
self.list_of_args = list_of_lists |
772 |
+ |
|
773 |
+ |
return |
774 |
+ |
|
775 |
+ |
|
776 |
+ |
|
777 |
|
def jobSplittingNoInput(self): |
778 |
|
""" |
779 |
|
Perform job splitting based on number of event per job |
847 |
|
self.list_of_args.append([str(i)]) |
848 |
|
return |
849 |
|
|
850 |
< |
def split(self, jobParams): |
850 |
> |
def split(self, jobParams,firstJobID): |
851 |
|
|
852 |
|
njobs = self.total_number_of_jobs |
853 |
|
arglist = self.list_of_args |
857 |
|
|
858 |
|
listID=[] |
859 |
|
listField=[] |
860 |
< |
for job in range(njobs): |
861 |
< |
jobParams[job] = arglist[job] |
860 |
> |
for id in range(njobs): |
861 |
> |
job = id + int(firstJobID) |
862 |
> |
jobParams[id] = arglist[id] |
863 |
|
listID.append(job+1) |
864 |
|
job_ToSave ={} |
865 |
|
concString = ' ' |
866 |
|
argu='' |
867 |
< |
if len(jobParams[job]): |
868 |
< |
argu += concString.join(jobParams[job] ) |
867 |
> |
if len(jobParams[id]): |
868 |
> |
argu += concString.join(jobParams[id] ) |
869 |
|
job_ToSave['arguments']= str(job+1)+' '+argu |
870 |
< |
job_ToSave['dlsDestination']= self.jobDestination[job] |
870 |
> |
job_ToSave['dlsDestination']= self.jobDestination[id] |
871 |
|
listField.append(job_ToSave) |
872 |
|
msg="Job "+str(job)+" Arguments: "+str(job+1)+" "+argu+"\n" \ |
873 |
< |
+" Destination: "+str(self.jobDestination[job]) |
873 |
> |
+" Destination: "+str(self.jobDestination[id]) |
874 |
|
common.logger.debug(5,msg) |
875 |
|
common._db.updateJob_(listID,listField) |
876 |
|
self.argsList = (len(jobParams[0])+1) |
884 |
|
""" |
885 |
|
Return the TarBall with lib and exe |
886 |
|
""" |
887 |
< |
self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name |
887 |
> |
self.tgzNameWithPath = common.work_space.pathForTgz()+self.tgz_name |
888 |
|
if os.path.exists(self.tgzNameWithPath): |
889 |
|
return self.tgzNameWithPath |
890 |
|
|
943 |
|
tar.add(module,moduleDir) |
944 |
|
|
945 |
|
## Now check if any data dir(s) is present |
712 |
– |
swAreaLen=len(swArea) |
946 |
|
self.dataExist = False |
947 |
< |
for root, dirs, files in os.walk(swArea): |
948 |
< |
if "data" in dirs: |
949 |
< |
self.dataExist=True |
950 |
< |
common.logger.debug(5,"data "+root+"/data"+" to be tarred") |
951 |
< |
tar.add(root+"/data",root[swAreaLen:]+"/data") |
947 |
> |
todo_list = [(i, i) for i in os.listdir(swArea+"/src")] |
948 |
> |
while len(todo_list): |
949 |
> |
entry, name = todo_list.pop() |
950 |
> |
if name.startswith('crab_0_') or name.startswith('.') or name == 'CVS': |
951 |
> |
continue |
952 |
> |
if os.path.isdir(swArea+"/src/"+entry): |
953 |
> |
entryPath = entry + '/' |
954 |
> |
todo_list += [(entryPath + i, i) for i in os.listdir(swArea+"/src/"+entry)] |
955 |
> |
if name == 'data': |
956 |
> |
self.dataExist=True |
957 |
> |
common.logger.debug(5,"data "+entry+" to be tarred") |
958 |
> |
tar.add(swArea+"/src/"+entry,"src/"+entry) |
959 |
> |
pass |
960 |
> |
pass |
961 |
|
|
962 |
|
### CMSSW ParameterSet |
963 |
|
if not self.pset is None: |
967 |
|
|
968 |
|
|
969 |
|
## Add ProdCommon dir to tar |
970 |
< |
prodcommonDir = 'ProdCommon' |
971 |
< |
prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon' |
972 |
< |
if os.path.isdir(prodcommonPath): |
973 |
< |
tar.add(prodcommonPath,prodcommonDir) |
970 |
> |
prodcommonDir = './' |
971 |
> |
prodcommonPath = os.environ['CRABDIR'] + '/' + 'external/' |
972 |
> |
neededStuff = ['ProdCommon/__init__.py','ProdCommon/FwkJobRep', 'ProdCommon/CMSConfigTools', \ |
973 |
> |
'ProdCommon/Core', 'ProdCommon/MCPayloads', 'IMProv', 'ProdCommon/Storage'] |
974 |
> |
for file in neededStuff: |
975 |
> |
tar.add(prodcommonPath+file,prodcommonDir+file) |
976 |
|
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
977 |
|
|
978 |
|
##### ML stuff |
983 |
|
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
984 |
|
|
985 |
|
##### Utils |
986 |
< |
Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py'] |
986 |
> |
Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'fillCrabFjr.py','cmscp.py'] |
987 |
|
for file in Utils_file_list: |
988 |
|
tar.add(path+file,file) |
989 |
|
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
994 |
|
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
995 |
|
|
996 |
|
tar.close() |
997 |
< |
except : |
998 |
< |
raise CrabException('Could not create tar-ball') |
997 |
> |
except IOError, exc: |
998 |
> |
common.logger.write(str(exc)) |
999 |
> |
raise CrabException('Could not create tar-ball '+self.tgzNameWithPath) |
1000 |
> |
except tarfile.TarError, exc: |
1001 |
> |
common.logger.write(str(exc)) |
1002 |
> |
raise CrabException('Could not create tar-ball '+self.tgzNameWithPath) |
1003 |
|
|
1004 |
|
## check for tarball size |
1005 |
|
tarballinfo = os.stat(self.tgzNameWithPath) |
1006 |
|
if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) : |
1007 |
< |
raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.') |
1007 |
> |
msg = 'Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) \ |
1008 |
> |
+'MB input sandbox limit \n' |
1009 |
> |
msg += ' and not supported by the direct GRID submission system.\n' |
1010 |
> |
msg += ' Please use the CRAB server mode by setting server_name=<NAME> in section [CRAB] of your crab.cfg.\n' |
1011 |
> |
msg += ' For further infos please see https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#CRABSERVER_for_Users' |
1012 |
> |
raise CrabException(msg) |
1013 |
|
|
1014 |
|
## create tar-ball with ML stuff |
1015 |
|
|
1079 |
|
# Prepare job-specific part |
1080 |
|
job = common.job_list[nj] |
1081 |
|
if (self.datasetPath): |
1082 |
+ |
self.primaryDataset = self.datasetPath.split("/")[1] |
1083 |
+ |
DataTier = self.datasetPath.split("/")[2] |
1084 |
|
txt += '\n' |
1085 |
|
txt += 'DatasetPath='+self.datasetPath+'\n' |
1086 |
|
|
1087 |
< |
datasetpath_split = self.datasetPath.split("/") |
1088 |
< |
|
834 |
< |
txt += 'PrimaryDataset='+datasetpath_split[1]+'\n' |
835 |
< |
txt += 'DataTier='+datasetpath_split[2]+'\n' |
1087 |
> |
txt += 'PrimaryDataset='+self.primaryDataset +'\n' |
1088 |
> |
txt += 'DataTier='+DataTier+'\n' |
1089 |
|
txt += 'ApplicationFamily=cmsRun\n' |
1090 |
|
|
1091 |
|
else: |
1092 |
+ |
self.primaryDataset = 'null' |
1093 |
|
txt += 'DatasetPath=MCDataTier\n' |
1094 |
|
txt += 'PrimaryDataset=null\n' |
1095 |
|
txt += 'DataTier=null\n' |
1100 |
|
txt += 'cp $RUNTIME_AREA/'+pset+' .\n' |
1101 |
|
if (self.datasetPath): # standard job |
1102 |
|
txt += 'InputFiles=${args[1]}; export InputFiles\n' |
1103 |
< |
txt += 'MaxEvents=${args[2]}; export MaxEvents\n' |
1104 |
< |
txt += 'SkipEvents=${args[3]}; export SkipEvents\n' |
1103 |
> |
if (self.useParent): |
1104 |
> |
txt += 'ParentFiles=${args[2]}; export ParentFiles\n' |
1105 |
> |
txt += 'MaxEvents=${args[3]}; export MaxEvents\n' |
1106 |
> |
txt += 'SkipEvents=${args[4]}; export SkipEvents\n' |
1107 |
> |
else: |
1108 |
> |
txt += 'MaxEvents=${args[2]}; export MaxEvents\n' |
1109 |
> |
txt += 'SkipEvents=${args[3]}; export SkipEvents\n' |
1110 |
|
txt += 'echo "Inputfiles:<$InputFiles>"\n' |
1111 |
+ |
if (self.useParent): txt += 'echo "ParentFiles:<$ParentFiles>"\n' |
1112 |
|
txt += 'echo "MaxEvents:<$MaxEvents>"\n' |
1113 |
|
txt += 'echo "SkipEvents:<$SkipEvents>"\n' |
1114 |
|
else: # pythia like job |
1126 |
|
if self.pset != None: |
1127 |
|
# FUTURE: Can simply for 2_1_x and higher |
1128 |
|
txt += '\n' |
1129 |
< |
if self.debug_pset==True: |
1129 |
> |
if self.debug_wrapper==True: |
1130 |
|
txt += 'echo "***** cat ' + psetName + ' *********"\n' |
1131 |
|
txt += 'cat ' + psetName + '\n' |
1132 |
|
txt += 'echo "****** end ' + psetName + ' ********"\n' |
1133 |
|
txt += '\n' |
1134 |
< |
txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n' |
1134 |
> |
if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3): |
1135 |
> |
txt += 'PSETHASH=`edmConfigHash ' + psetName + '` \n' |
1136 |
> |
else: |
1137 |
> |
txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n' |
1138 |
|
txt += 'echo "PSETHASH = $PSETHASH" \n' |
1139 |
|
txt += '\n' |
1140 |
|
return txt |
1150 |
|
if os.path.isfile(self.tgzNameWithPath): |
1151 |
|
txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n' |
1152 |
|
txt += 'tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n' |
1153 |
< |
txt += 'ls -Al \n' |
1153 |
> |
if self.debug_wrapper: |
1154 |
> |
txt += 'ls -Al \n' |
1155 |
|
txt += 'untar_status=$? \n' |
1156 |
|
txt += 'if [ $untar_status -ne 0 ]; then \n' |
1157 |
|
txt += ' echo "ERROR ==> Untarring .tgz file failed"\n' |
1161 |
|
txt += ' echo "Successful untar" \n' |
1162 |
|
txt += 'fi \n' |
1163 |
|
txt += '\n' |
1164 |
< |
txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n' |
1164 |
> |
txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n' |
1165 |
|
txt += 'if [ -z "$PYTHONPATH" ]; then\n' |
1166 |
< |
txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n' |
1166 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/\n' |
1167 |
|
txt += 'else\n' |
1168 |
< |
txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n' |
1168 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n' |
1169 |
|
txt += 'echo "PYTHONPATH=$PYTHONPATH"\n' |
1170 |
|
txt += 'fi\n' |
1171 |
|
txt += '\n' |
1192 |
|
if len(self.additional_inbox_files)>0: |
1193 |
|
for file in self.additional_inbox_files: |
1194 |
|
txt += 'mv $RUNTIME_AREA/'+os.path.basename(file)+' . \n' |
1195 |
< |
txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n' |
1195 |
> |
# txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n' |
1196 |
> |
# txt += 'mv $RUNTIME_AREA/IMProv/ . \n' |
1197 |
|
|
1198 |
+ |
txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n' |
1199 |
|
txt += 'if [ -z "$PYTHONPATH" ]; then\n' |
1200 |
< |
txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n' |
1200 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/\n' |
1201 |
|
txt += 'else\n' |
1202 |
< |
txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n' |
1202 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n' |
1203 |
|
txt += 'echo "PYTHONPATH=$PYTHONPATH"\n' |
1204 |
|
txt += 'fi\n' |
1205 |
|
txt += '\n' |
1206 |
|
|
1207 |
|
return txt |
1208 |
|
|
943 |
– |
def modifySteeringCards(self, nj): |
944 |
– |
""" |
945 |
– |
modify the card provided by the user, |
946 |
– |
writing a new card into share dir |
947 |
– |
""" |
1209 |
|
|
1210 |
|
def executableName(self): |
1211 |
|
if self.scriptExe: |
1237 |
|
inp_box = [] |
1238 |
|
if os.path.isfile(self.tgzNameWithPath): |
1239 |
|
inp_box.append(self.tgzNameWithPath) |
1240 |
< |
wrapper = os.path.basename(str(common._db.queryTask('scriptName'))) |
980 |
< |
inp_box.append(common.work_space.pathForTgz() +'job/'+ wrapper) |
1240 |
> |
inp_box.append(common.work_space.jobDir() + self.scriptName) |
1241 |
|
return inp_box |
1242 |
|
|
1243 |
|
def outputSandbox(self, nj): |
1249 |
|
## User Declared output files |
1250 |
|
for out in (self.output_file+self.output_file_sandbox): |
1251 |
|
n_out = nj + 1 |
1252 |
< |
out_box.append(self.numberFile_(out,str(n_out))) |
1252 |
> |
out_box.append(numberFile(out,str(n_out))) |
1253 |
|
return out_box |
1254 |
|
|
995 |
– |
def prepareSteeringCards(self): |
996 |
– |
""" |
997 |
– |
Make initial modifications of the user's steering card file. |
998 |
– |
""" |
999 |
– |
return |
1255 |
|
|
1256 |
|
def wsRenameOutput(self, nj): |
1257 |
|
""" |
1261 |
|
txt = '\n#Written by cms_cmssw::wsRenameOutput\n' |
1262 |
|
txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n' |
1263 |
|
txt += 'echo ">>> current directory content:"\n' |
1264 |
< |
txt += 'ls \n' |
1264 |
> |
if self.debug_wrapper: |
1265 |
> |
txt += 'ls -Al\n' |
1266 |
|
txt += '\n' |
1267 |
|
|
1268 |
|
for fileWithSuffix in (self.output_file): |
1269 |
< |
output_file_num = self.numberFile_(fileWithSuffix, '$NJob') |
1269 |
> |
output_file_num = numberFile(fileWithSuffix, '$NJob') |
1270 |
|
txt += '\n' |
1271 |
|
txt += '# check output file\n' |
1272 |
|
txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n' |
1287 |
|
txt += 'fi\n' |
1288 |
|
file_list = [] |
1289 |
|
for fileWithSuffix in (self.output_file): |
1290 |
< |
file_list.append(self.numberFile_(fileWithSuffix, '$NJob')) |
1290 |
> |
file_list.append(numberFile(fileWithSuffix, '$NJob')) |
1291 |
|
|
1292 |
|
txt += 'file_list="'+string.join(file_list,' ')+'"\n' |
1293 |
|
txt += '\n' |
1294 |
|
txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n' |
1295 |
|
txt += 'echo ">>> current directory content:"\n' |
1296 |
< |
txt += 'ls \n' |
1296 |
> |
if self.debug_wrapper: |
1297 |
> |
txt += 'ls -Al\n' |
1298 |
|
txt += '\n' |
1299 |
|
txt += 'cd $RUNTIME_AREA\n' |
1300 |
|
txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n' |
1301 |
|
return txt |
1302 |
|
|
1046 |
– |
def numberFile_(self, file, txt): |
1047 |
– |
""" |
1048 |
– |
append _'txt' before last extension of a file |
1049 |
– |
""" |
1050 |
– |
p = string.split(file,".") |
1051 |
– |
# take away last extension |
1052 |
– |
name = p[0] |
1053 |
– |
for x in p[1:-1]: |
1054 |
– |
name=name+"."+x |
1055 |
– |
# add "_txt" |
1056 |
– |
if len(p)>1: |
1057 |
– |
ext = p[len(p)-1] |
1058 |
– |
result = name + '_' + txt + "." + ext |
1059 |
– |
else: |
1060 |
– |
result = name + '_' + txt |
1061 |
– |
|
1062 |
– |
return result |
1063 |
– |
|
1303 |
|
def getRequirements(self, nj=[]): |
1304 |
|
""" |
1305 |
|
return job requirements to add to jdl files |
1315 |
|
'", other.GlueHostApplicationSoftwareRunTimeEnvironment)' |
1316 |
|
|
1317 |
|
req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)' |
1318 |
< |
if common.scheduler.name() == "glitecoll": |
1318 |
> |
if ( common.scheduler.name() == "glitecoll" ) or ( common.scheduler.name() == "glite"): |
1319 |
|
req += ' && other.GlueCEStateStatus == "Production" ' |
1320 |
|
|
1321 |
|
return req |
1386 |
|
txt += ' echo "==> setup cms environment ok"\n' |
1387 |
|
return txt |
1388 |
|
|
1389 |
< |
def modifyReport(self, nj): |
1389 |
> |
def wsModifyReport(self, nj): |
1390 |
|
""" |
1391 |
|
insert the part of the script that modifies the FrameworkJob Report |
1392 |
|
""" |
1393 |
< |
txt = '\n#Written by cms_cmssw::modifyReport\n' |
1393 |
> |
txt = '\n#Written by cms_cmssw::wsModifyReport\n' |
1394 |
|
publish_data = int(self.cfg_params.get('USER.publish_data',0)) |
1395 |
|
if (publish_data == 1): |
1157 |
– |
processedDataset = self.cfg_params['USER.publish_data_name'] |
1158 |
– |
LFNBaseName = LFNBase(processedDataset) |
1396 |
|
|
1397 |
< |
txt += 'if [ $copy_exit_status -eq 0 ]; then\n' |
1398 |
< |
txt += ' FOR_LFN=%s_${PSETHASH}/\n'%(LFNBaseName) |
1397 |
> |
txt += 'if [ $StageOutExitStatus -eq 0 ]; then\n' |
1398 |
> |
txt += ' FOR_LFN=$LFNBaseName/${PSETHASH}/\n' |
1399 |
|
txt += 'else\n' |
1400 |
|
txt += ' FOR_LFN=/copy_problems/ \n' |
1401 |
|
txt += ' SE=""\n' |
1403 |
|
txt += 'fi\n' |
1404 |
|
|
1405 |
|
txt += 'echo ">>> Modify Job Report:" \n' |
1406 |
< |
txt += 'chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n' |
1407 |
< |
txt += 'ProcessedDataset='+processedDataset+'\n' |
1406 |
> |
txt += 'chmod a+x $RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py\n' |
1407 |
> |
txt += 'ProcessedDataset= $procDataset \n' |
1408 |
|
txt += 'echo "ProcessedDataset = $ProcessedDataset"\n' |
1409 |
|
txt += 'echo "SE = $SE"\n' |
1410 |
|
txt += 'echo "SE_PATH = $SE_PATH"\n' |
1411 |
|
txt += 'echo "FOR_LFN = $FOR_LFN" \n' |
1412 |
|
txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n' |
1413 |
< |
txt += 'echo "$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n' |
1414 |
< |
txt += '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH\n' |
1413 |
> |
args = '$RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier ' \ |
1414 |
> |
'$User -$ProcessedDataset-$PSETHASH $ApplicationFamily '+ \ |
1415 |
> |
' $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n' |
1416 |
> |
txt += 'echo "$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args) |
1417 |
> |
txt += '$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args) |
1418 |
|
txt += 'modifyReport_result=$?\n' |
1419 |
|
txt += 'if [ $modifyReport_result -ne 0 ]; then\n' |
1420 |
|
txt += ' modifyReport_result=70500\n' |
1434 |
|
txt += 'echo ">>> Parse FrameworkJobReport crab_fjr.xml"\n' |
1435 |
|
txt += 'if [ -s $RUNTIME_AREA/crab_fjr_$NJob.xml ]; then\n' |
1436 |
|
txt += ' if [ -s $RUNTIME_AREA/parseCrabFjr.py ]; then\n' |
1437 |
< |
txt += ' cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --MonitorID $MonitorID --MonitorJobID $MonitorJobID`\n' |
1438 |
< |
txt += ' echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n' |
1439 |
< |
txt += ' tmp_executable_exit_status=`echo $cmd_out | awk -F\; \'{print $1}\' | awk -F \' \' \'{print $NF}\'`\n' |
1440 |
< |
txt += ' if [ -n $tmp_executable_exit_status ];then\n' |
1201 |
< |
txt += ' executable_exit_status=$tmp_executable_exit_status\n' |
1202 |
< |
txt += ' fi\n' |
1437 |
> |
txt += ' cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --dashboard $MonitorID,$MonitorJobID '+self.debugWrap+'`\n' |
1438 |
> |
if self.debug_wrapper : |
1439 |
> |
txt += ' echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n' |
1440 |
> |
txt += ' executable_exit_status=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --exitcode`\n' |
1441 |
|
txt += ' if [ $executable_exit_status -eq 50115 ];then\n' |
1442 |
|
txt += ' echo ">>> crab_fjr.xml contents: "\n' |
1443 |
< |
txt += ' cat $RUNTIME_AREA/crab_fjr_NJob.xml\n' |
1443 |
> |
txt += ' cat $RUNTIME_AREA/crab_fjr_$NJob.xml\n' |
1444 |
|
txt += ' echo "Wrong FrameworkJobReport --> does not contain useful info. ExitStatus: $executable_exit_status"\n' |
1445 |
+ |
txt += ' elif [ $executable_exit_status -eq -999 ];then\n' |
1446 |
+ |
txt += ' echo "ExitStatus from FrameworkJobReport not available. not available. Using exit code of executable from command line."\n' |
1447 |
|
txt += ' else\n' |
1448 |
|
txt += ' echo "Extracted ExitStatus from FrameworkJobReport parsing output: $executable_exit_status"\n' |
1449 |
|
txt += ' fi\n' |
1451 |
|
txt += ' echo "CRAB python script to parse CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n' |
1452 |
|
txt += ' fi\n' |
1453 |
|
#### Patch to check input data reading for CMSSW16x Hopefully we-ll remove it asap |
1454 |
< |
|
1455 |
< |
if self.datasetPath: |
1454 |
> |
txt += ' if [ $executable_exit_status -eq 0 ];then\n' |
1455 |
> |
txt += ' echo ">>> Executable succeded $executable_exit_status"\n' |
1456 |
> |
if (self.datasetPath and not (self.dataset_pu or self.useParent)) : |
1457 |
|
# VERIFY PROCESSED DATA |
1217 |
– |
txt += ' if [ $executable_exit_status -eq 0 ];then\n' |
1458 |
|
txt += ' echo ">>> Verify list of processed files:"\n' |
1459 |
< |
txt += ' echo $InputFiles |tr -d "\\\\" |tr "," "\\n"|tr -d "\\"" > input-files.txt\n' |
1460 |
< |
txt += ' grep LFN $RUNTIME_AREA/crab_fjr_$NJob.xml |cut -d">" -f2|cut -d"<" -f1|grep "/" > processed-files.txt\n' |
1459 |
> |
txt += ' echo $InputFiles |tr -d \'\\\\\' |tr \',\' \'\\n\'|tr -d \'"\' > input-files.txt\n' |
1460 |
> |
txt += ' python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --lfn > processed-files.txt\n' |
1461 |
|
txt += ' cat input-files.txt | sort | uniq > tmp.txt\n' |
1462 |
|
txt += ' mv tmp.txt input-files.txt\n' |
1463 |
|
txt += ' echo "cat input-files.txt"\n' |
1478 |
|
txt += ' echo " ==> list of processed files from crab_fjr.xml differs from list in pset.cfg"\n' |
1479 |
|
txt += ' echo " ==> diff input-files.txt processed-files.txt"\n' |
1480 |
|
txt += ' fi\n' |
1481 |
< |
txt += ' fi\n' |
1482 |
< |
txt += '\n' |
1481 |
> |
txt += ' elif [ $executable_exit_status -ne 0 ] || [ $executable_exit_status -ne 50015 ] || [ $executable_exit_status -ne 50017 ];then\n' |
1482 |
> |
txt += ' echo ">>> Executable failed $executable_exit_status"\n' |
1483 |
> |
txt += ' func_exit\n' |
1484 |
> |
txt += ' fi\n' |
1485 |
> |
txt += '\n' |
1486 |
|
txt += 'else\n' |
1487 |
|
txt += ' echo "CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n' |
1488 |
|
txt += 'fi\n' |
1519 |
|
stderr = 'CMSSW_$NJob.stderr' |
1520 |
|
if (self.return_data == 1): |
1521 |
|
for file in (self.output_file+self.output_file_sandbox): |
1522 |
< |
listOutFiles.append(self.numberFile_(file, '$NJob')) |
1522 |
> |
listOutFiles.append(numberFile(file, '$NJob')) |
1523 |
|
listOutFiles.append(stdout) |
1524 |
|
listOutFiles.append(stderr) |
1525 |
|
else: |
1526 |
|
for file in (self.output_file_sandbox): |
1527 |
< |
listOutFiles.append(self.numberFile_(file, '$NJob')) |
1527 |
> |
listOutFiles.append(numberFile(file, '$NJob')) |
1528 |
|
listOutFiles.append(stdout) |
1529 |
|
listOutFiles.append(stderr) |
1530 |
|
txt += 'echo "output files: '+string.join(listOutFiles,' ')+'"\n' |