2 |
|
from crab_logger import Logger |
3 |
|
from crab_exceptions import * |
4 |
|
from crab_util import * |
5 |
< |
from BlackWhiteListParser import BlackWhiteListParser |
5 |
> |
from BlackWhiteListParser import SEBlackWhiteListParser |
6 |
|
import common |
7 |
|
import Scram |
8 |
|
from LFNBaseName import * |
10 |
|
import os, string, glob |
11 |
|
|
12 |
|
class Cmssw(JobType): |
13 |
< |
def __init__(self, cfg_params, ncjobs): |
13 |
> |
def __init__(self, cfg_params, ncjobs,skip_blocks, isNew): |
14 |
|
JobType.__init__(self, 'CMSSW') |
15 |
|
common.logger.debug(3,'CMSSW::__init__') |
16 |
+ |
self.skip_blocks = skip_blocks |
17 |
|
|
18 |
|
self.argsList = [] |
19 |
|
|
20 |
|
self._params = {} |
21 |
|
self.cfg_params = cfg_params |
22 |
|
# init BlackWhiteListParser |
23 |
< |
self.blackWhiteListParser = BlackWhiteListParser(cfg_params) |
23 |
> |
self.blackWhiteListParser = SEBlackWhiteListParser(cfg_params) |
24 |
|
|
25 |
|
self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5)) |
26 |
|
|
36 |
|
self.executable_arch = self.scram.getArch() |
37 |
|
self.tgz_name = 'default.tgz' |
38 |
|
self.scriptName = 'CMSSW.sh' |
39 |
< |
self.pset = '' |
39 |
> |
self.pset = '' |
40 |
|
self.datasetPath = '' |
41 |
|
|
42 |
|
# set FJR file name |
60 |
|
if not cfg_params.has_key('CMSSW.datasetpath'): |
61 |
|
msg = "Error: datasetpath not defined " |
62 |
|
raise CrabException(msg) |
63 |
+ |
|
64 |
+ |
### Temporary: added to remove input file control in the case of PU |
65 |
+ |
self.dataset_pu = cfg_params.get('CMSSW.dataset_pu', None) |
66 |
+ |
|
67 |
|
tmp = cfg_params['CMSSW.datasetpath'] |
68 |
|
log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp) |
69 |
|
if string.lower(tmp)=='none': |
74 |
|
self.selectNoInput = 0 |
75 |
|
|
76 |
|
self.dataTiers = [] |
77 |
< |
|
78 |
< |
self.debug_pset = cfg_params.get('USER.debug_pset',False) |
79 |
< |
|
77 |
> |
self.debugWrap = '' |
78 |
> |
self.debug_wrapper = cfg_params.get('USER.debug_wrapper',False) |
79 |
> |
if self.debug_wrapper: self.debugWrap='--debug' |
80 |
|
## now the application |
81 |
|
self.executable = cfg_params.get('CMSSW.executable','cmsRun') |
82 |
|
log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable) |
99 |
|
self.output_file_sandbox.append(self.fjrFileName) |
100 |
|
|
101 |
|
# other output files to be returned via sandbox or copied to SE |
102 |
+ |
outfileflag = False |
103 |
|
self.output_file = [] |
104 |
|
tmp = cfg_params.get('CMSSW.output_file',None) |
105 |
|
if tmp : |
106 |
< |
tmpOutFiles = string.split(tmp,',') |
107 |
< |
log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles)) |
108 |
< |
for tmp in tmpOutFiles: |
109 |
< |
tmp=string.strip(tmp) |
104 |
< |
self.output_file.append(tmp) |
105 |
< |
pass |
106 |
< |
else: |
107 |
< |
log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n") |
108 |
< |
pass |
106 |
> |
self.output_file = [x.strip() for x in tmp.split(',')] |
107 |
> |
outfileflag = True #output found |
108 |
> |
#else: |
109 |
> |
# log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n") |
110 |
|
|
111 |
|
# script_exe file as additional file in inputSandbox |
112 |
|
self.scriptExe = cfg_params.get('USER.script_exe',None) |
120 |
|
msg ="Error. script_exe not defined" |
121 |
|
raise CrabException(msg) |
122 |
|
|
123 |
+ |
# use parent files... |
124 |
+ |
self.useParent = self.cfg_params.get('CMSSW.use_parent',False) |
125 |
+ |
|
126 |
|
## additional input files |
127 |
|
if cfg_params.has_key('USER.additional_input_files'): |
128 |
|
tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',') |
166 |
|
if cfg_params.has_key('CMSSW.total_number_of_events'): |
167 |
|
self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events']) |
168 |
|
self.selectTotalNumberEvents = 1 |
169 |
+ |
if self.selectNumberOfJobs == 1: |
170 |
+ |
if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs): |
171 |
+ |
msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs ' |
172 |
+ |
raise CrabException(msg) |
173 |
|
else: |
174 |
|
self.total_number_of_events = 0 |
175 |
|
self.selectTotalNumberEvents = 0 |
197 |
|
tmp.strip() |
198 |
|
self.incrementSeeds.append(tmp) |
199 |
|
|
200 |
< |
## Old method of dealing with seeds |
201 |
< |
## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then |
194 |
< |
## remove |
195 |
< |
self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None) |
196 |
< |
if self.sourceSeed: |
197 |
< |
print "pythia_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds." |
198 |
< |
self.incrementSeeds.append('sourceSeed') |
199 |
< |
self.incrementSeeds.append('theSource') |
200 |
< |
|
200 |
> |
## FUTURE: Can remove in CRAB 2.4.0 |
201 |
> |
self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None) |
202 |
|
self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None) |
203 |
< |
if self.sourceSeedVtx: |
203 |
< |
print "vtx_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds." |
204 |
< |
self.incrementSeeds.append('VtxSmeared') |
205 |
< |
|
206 |
< |
self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None) |
207 |
< |
if self.sourceSeedG4: |
208 |
< |
print "g4_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds." |
209 |
< |
self.incrementSeeds.append('g4SimHits') |
210 |
< |
|
203 |
> |
self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None) |
204 |
|
self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None) |
205 |
< |
if self.sourceSeedMix: |
206 |
< |
print "mix_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds." |
207 |
< |
self.incrementSeeds.append('mix') |
205 |
> |
if self.sourceSeed or self.sourceSeedVtx or self.sourceSeedG4 or self.sourceSeedMix: |
206 |
> |
msg = 'pythia_seed, vtx_seed, g4_seed, and mix_seed are no longer valid settings. You must use increment_seeds or preserve_seeds' |
207 |
> |
raise CrabException(msg) |
208 |
|
|
209 |
|
self.firstRun = cfg_params.get('CMSSW.first_run',None) |
210 |
|
|
218 |
– |
if self.pset != None: #CarlosDaniele |
219 |
– |
import PsetManipulator as pp |
220 |
– |
PsetEdit = pp.PsetManipulator(self.pset) #Daniele Pset |
221 |
– |
|
211 |
|
# Copy/return |
223 |
– |
|
212 |
|
self.copy_data = int(cfg_params.get('USER.copy_data',0)) |
213 |
|
self.return_data = int(cfg_params.get('USER.return_data',0)) |
214 |
|
|
224 |
|
blockSites = self.DataDiscoveryAndLocation(cfg_params) |
225 |
|
#DBSDLS-end |
226 |
|
|
239 |
– |
|
227 |
|
## Select Splitting |
228 |
|
if self.selectNoInput: |
229 |
|
if self.pset == None: |
233 |
|
else: |
234 |
|
self.jobSplittingByBlocks(blockSites) |
235 |
|
|
236 |
< |
# modify Pset |
237 |
< |
if self.pset != None: |
238 |
< |
try: |
239 |
< |
# Add FrameworkJobReport to parameter-set, set max events. |
240 |
< |
# Reset later for data jobs by writeCFG which does all modifications |
241 |
< |
PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5 |
242 |
< |
PsetEdit.maxEvent(self.eventsPerJob) |
243 |
< |
PsetEdit.psetWriter(self.configFilename()) |
244 |
< |
except: |
245 |
< |
msg='Error while manipulating ParameterSet: exiting...' |
246 |
< |
raise CrabException(msg) |
247 |
< |
self.tgzNameWithPath = self.getTarBall(self.executable) |
236 |
> |
# modify Pset only the first time |
237 |
> |
if isNew: |
238 |
> |
if self.pset != None: |
239 |
> |
import PsetManipulator as pp |
240 |
> |
PsetEdit = pp.PsetManipulator(self.pset) |
241 |
> |
try: |
242 |
> |
# Add FrameworkJobReport to parameter-set, set max events. |
243 |
> |
# Reset later for data jobs by writeCFG which does all modifications |
244 |
> |
PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5 |
245 |
> |
PsetEdit.maxEvent(self.eventsPerJob) |
246 |
> |
PsetEdit.psetWriter(self.configFilename()) |
247 |
> |
## If present, add TFileService to output files |
248 |
> |
if not int(cfg_params.get('CMSSW.skip_TFileService_output',0)): |
249 |
> |
tfsOutput = PsetEdit.getTFileService() |
250 |
> |
if tfsOutput: |
251 |
> |
if tfsOutput in self.output_file: |
252 |
> |
common.logger.debug(5,"Output from TFileService "+tfsOutput+" already in output files") |
253 |
> |
else: |
254 |
> |
outfileflag = True #output found |
255 |
> |
self.output_file.append(tfsOutput) |
256 |
> |
common.logger.message("Adding "+tfsOutput+" to output files (from TFileService)") |
257 |
> |
pass |
258 |
> |
pass |
259 |
> |
## If present and requested, add PoolOutputModule to output files |
260 |
> |
if int(cfg_params.get('CMSSW.get_edm_output',0)): |
261 |
> |
edmOutput = PsetEdit.getPoolOutputModule() |
262 |
> |
if edmOutput: |
263 |
> |
if edmOutput in self.output_file: |
264 |
> |
common.logger.debug(5,"Output from PoolOutputModule "+edmOutput+" already in output files") |
265 |
> |
else: |
266 |
> |
self.output_file.append(edmOutput) |
267 |
> |
common.logger.message("Adding "+edmOutput+" to output files (from PoolOutputModule)") |
268 |
> |
pass |
269 |
> |
pass |
270 |
> |
except CrabException: |
271 |
> |
msg='Error while manipulating ParameterSet: exiting...' |
272 |
> |
raise CrabException(msg) |
273 |
> |
## Prepare inputSandbox TarBall (only the first time) |
274 |
> |
self.tgzNameWithPath = self.getTarBall(self.executable) |
275 |
|
|
276 |
|
def DataDiscoveryAndLocation(self, cfg_params): |
277 |
|
|
284 |
|
## Contact the DBS |
285 |
|
common.logger.message("Contacting Data Discovery Services ...") |
286 |
|
try: |
287 |
< |
self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params) |
287 |
> |
self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params,self.skip_blocks) |
288 |
|
self.pubdata.fetchDBSInfo() |
289 |
|
|
290 |
|
except DataDiscovery.NotExistingDatasetError, ex : |
300 |
|
self.filesbyblock=self.pubdata.getFiles() |
301 |
|
self.eventsbyblock=self.pubdata.getEventsPerBlock() |
302 |
|
self.eventsbyfile=self.pubdata.getEventsPerFile() |
303 |
+ |
self.parentFiles=self.pubdata.getParent() |
304 |
|
|
305 |
|
## get max number of events |
306 |
< |
self.maxEvents=self.pubdata.getMaxEvents() |
306 |
> |
self.maxEvents=self.pubdata.getMaxEvents() |
307 |
|
|
308 |
|
## Contact the DLS and build a list of sites hosting the fileblocks |
309 |
|
try: |
417 |
|
|
418 |
|
# ---- Iterate over the files in the block until we've met the requested ---- # |
419 |
|
# ---- total # of events or we've gone over all the files in this block ---- # |
420 |
+ |
pString='' |
421 |
|
while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ): |
422 |
|
file = files[fileCount] |
423 |
+ |
if self.useParent: |
424 |
+ |
parent = self.parentFiles[file] |
425 |
+ |
for f in parent : |
426 |
+ |
pString += '\\\"' + f + '\\\"\,' |
427 |
+ |
common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent)) |
428 |
+ |
common.logger.write("File "+str(file)+" has the following parents: "+str(parent)) |
429 |
|
if newFile : |
430 |
|
try: |
431 |
|
numEventsInFile = self.eventsbyfile[file] |
446 |
|
# end job using last file, use remaining events in block |
447 |
|
# close job and touch new file |
448 |
|
fullString = parString[:-2] |
449 |
< |
list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)]) |
449 |
> |
if self.useParent: |
450 |
> |
fullParentString = pString[:-2] |
451 |
> |
list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)]) |
452 |
> |
else: |
453 |
> |
list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)]) |
454 |
|
common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).") |
455 |
|
self.jobDestination.append(blockSites[block]) |
456 |
|
common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount])) |
462 |
|
eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount |
463 |
|
jobSkipEventCount = 0 |
464 |
|
# reset file |
465 |
+ |
pString = "" |
466 |
|
parString = "" |
467 |
|
filesEventCount = 0 |
468 |
|
newFile = 1 |
475 |
|
elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) : |
476 |
|
# close job and touch new file |
477 |
|
fullString = parString[:-2] |
478 |
< |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
478 |
> |
if self.useParent: |
479 |
> |
fullParentString = pString[:-2] |
480 |
> |
list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
481 |
> |
else: |
482 |
> |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
483 |
|
common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.") |
484 |
|
self.jobDestination.append(blockSites[block]) |
485 |
|
common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount])) |
490 |
|
eventsRemaining = eventsRemaining - eventsPerJobRequested |
491 |
|
jobSkipEventCount = 0 |
492 |
|
# reset file |
493 |
+ |
pString = "" |
494 |
|
parString = "" |
495 |
|
filesEventCount = 0 |
496 |
|
newFile = 1 |
500 |
|
else : |
501 |
|
# close job but don't touch new file |
502 |
|
fullString = parString[:-2] |
503 |
< |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
503 |
> |
if self.useParent: |
504 |
> |
fullParentString = pString[:-2] |
505 |
> |
list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
506 |
> |
else: |
507 |
> |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
508 |
|
common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.") |
509 |
|
self.jobDestination.append(blockSites[block]) |
510 |
|
common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount])) |
518 |
|
jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file]) |
519 |
|
# remove all but the last file |
520 |
|
filesEventCount = self.eventsbyfile[file] |
521 |
+ |
if self.useParent: |
522 |
+ |
for f in parent : pString += '\\\"' + f + '\\\"\,' |
523 |
|
parString = '\\\"' + file + '\\\"\,' |
524 |
|
pass # END if |
525 |
|
pass # END while (iterate over files in the block) |
648 |
|
self.list_of_args.append([str(i)]) |
649 |
|
return |
650 |
|
|
651 |
< |
def split(self, jobParams): |
651 |
> |
def split(self, jobParams,firstJobID): |
652 |
|
|
653 |
|
njobs = self.total_number_of_jobs |
654 |
|
arglist = self.list_of_args |
658 |
|
|
659 |
|
listID=[] |
660 |
|
listField=[] |
661 |
< |
for job in range(njobs): |
662 |
< |
jobParams[job] = arglist[job] |
661 |
> |
for id in range(njobs): |
662 |
> |
job = id + int(firstJobID) |
663 |
> |
jobParams[id] = arglist[id] |
664 |
|
listID.append(job+1) |
665 |
|
job_ToSave ={} |
666 |
|
concString = ' ' |
667 |
|
argu='' |
668 |
< |
if len(jobParams[job]): |
669 |
< |
argu += concString.join(jobParams[job] ) |
668 |
> |
if len(jobParams[id]): |
669 |
> |
argu += concString.join(jobParams[id] ) |
670 |
|
job_ToSave['arguments']= str(job+1)+' '+argu |
671 |
< |
job_ToSave['dlsDestination']= self.jobDestination[job] |
671 |
> |
job_ToSave['dlsDestination']= self.jobDestination[id] |
672 |
|
listField.append(job_ToSave) |
673 |
|
msg="Job "+str(job)+" Arguments: "+str(job+1)+" "+argu+"\n" \ |
674 |
< |
+" Destination: "+str(self.jobDestination[job]) |
674 |
> |
+" Destination: "+str(self.jobDestination[id]) |
675 |
|
common.logger.debug(5,msg) |
676 |
|
common._db.updateJob_(listID,listField) |
677 |
|
self.argsList = (len(jobParams[0])+1) |
744 |
|
tar.add(module,moduleDir) |
745 |
|
|
746 |
|
## Now check if any data dir(s) is present |
708 |
– |
swAreaLen=len(swArea) |
747 |
|
self.dataExist = False |
748 |
< |
for root, dirs, files in os.walk(swArea): |
749 |
< |
if "data" in dirs: |
750 |
< |
self.dataExist=True |
751 |
< |
common.logger.debug(5,"data "+root+"/data"+" to be tarred") |
752 |
< |
tar.add(root+"/data",root[swAreaLen:]+"/data") |
748 |
> |
todo_list = [(i, i) for i in os.listdir(swArea+"/src")] |
749 |
> |
while len(todo_list): |
750 |
> |
entry, name = todo_list.pop() |
751 |
> |
if name.startswith('crab_0_') or name.startswith('.') or name == 'CVS': |
752 |
> |
continue |
753 |
> |
if os.path.isdir(swArea+"/src/"+entry): |
754 |
> |
entryPath = entry + '/' |
755 |
> |
todo_list += [(entryPath + i, i) for i in os.listdir(swArea+"/src/"+entry)] |
756 |
> |
if name == 'data': |
757 |
> |
self.dataExist=True |
758 |
> |
common.logger.debug(5,"data "+entry+" to be tarred") |
759 |
> |
tar.add(swArea+"/src/"+entry,"src/"+entry) |
760 |
> |
pass |
761 |
> |
pass |
762 |
|
|
763 |
|
### CMSSW ParameterSet |
764 |
|
if not self.pset is None: |
768 |
|
|
769 |
|
|
770 |
|
## Add ProdCommon dir to tar |
771 |
< |
prodcommonDir = 'ProdCommon' |
772 |
< |
prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon' |
773 |
< |
if os.path.isdir(prodcommonPath): |
774 |
< |
tar.add(prodcommonPath,prodcommonDir) |
771 |
> |
prodcommonDir = './' |
772 |
> |
prodcommonPath = os.environ['CRABDIR'] + '/' + 'external/' |
773 |
> |
neededStuff = ['ProdCommon/__init__.py','ProdCommon/FwkJobRep', 'ProdCommon/CMSConfigTools','ProdCommon/Core','ProdCommon/MCPayloads', 'IMProv'] |
774 |
> |
for file in neededStuff: |
775 |
> |
tar.add(prodcommonPath+file,prodcommonDir+file) |
776 |
|
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
777 |
|
|
778 |
|
##### ML stuff |
783 |
|
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
784 |
|
|
785 |
|
##### Utils |
786 |
< |
Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py'] |
786 |
> |
Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'fillCrabFjr.py'] |
787 |
|
for file in Utils_file_list: |
788 |
|
tar.add(path+file,file) |
789 |
|
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
794 |
|
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
795 |
|
|
796 |
|
tar.close() |
797 |
< |
except : |
798 |
< |
raise CrabException('Could not create tar-ball') |
797 |
> |
except IOError: |
798 |
> |
raise CrabException('Could not create tar-ball '+self.tgzNameWithPath) |
799 |
> |
except tarfile.TarError: |
800 |
> |
raise CrabException('Could not create tar-ball '+self.tgzNameWithPath) |
801 |
|
|
802 |
|
## check for tarball size |
803 |
|
tarballinfo = os.stat(self.tgzNameWithPath) |
876 |
|
txt += 'DatasetPath='+self.datasetPath+'\n' |
877 |
|
|
878 |
|
datasetpath_split = self.datasetPath.split("/") |
879 |
< |
|
879 |
> |
### FEDE FOR NEW LFN ### |
880 |
> |
self.primaryDataset = datasetpath_split[1] |
881 |
> |
######################## |
882 |
|
txt += 'PrimaryDataset='+datasetpath_split[1]+'\n' |
883 |
|
txt += 'DataTier='+datasetpath_split[2]+'\n' |
884 |
|
txt += 'ApplicationFamily=cmsRun\n' |
885 |
|
|
886 |
|
else: |
887 |
|
txt += 'DatasetPath=MCDataTier\n' |
888 |
+ |
### FEDE FOR NEW LFN ### |
889 |
+ |
self.primaryDataset = 'null' |
890 |
+ |
######################## |
891 |
|
txt += 'PrimaryDataset=null\n' |
892 |
|
txt += 'DataTier=null\n' |
893 |
|
txt += 'ApplicationFamily=MCDataTier\n' |
897 |
|
txt += 'cp $RUNTIME_AREA/'+pset+' .\n' |
898 |
|
if (self.datasetPath): # standard job |
899 |
|
txt += 'InputFiles=${args[1]}; export InputFiles\n' |
900 |
< |
txt += 'MaxEvents=${args[2]}; export MaxEvents\n' |
901 |
< |
txt += 'SkipEvents=${args[3]}; export SkipEvents\n' |
900 |
> |
if (self.useParent): |
901 |
> |
txt += 'ParentFiles=${args[2]}; export ParentFiles\n' |
902 |
> |
txt += 'MaxEvents=${args[3]}; export MaxEvents\n' |
903 |
> |
txt += 'SkipEvents=${args[4]}; export SkipEvents\n' |
904 |
> |
else: |
905 |
> |
txt += 'MaxEvents=${args[2]}; export MaxEvents\n' |
906 |
> |
txt += 'SkipEvents=${args[3]}; export SkipEvents\n' |
907 |
|
txt += 'echo "Inputfiles:<$InputFiles>"\n' |
908 |
+ |
if (self.useParent): txt += 'echo "ParentFiles:<$ParentFiles>"\n' |
909 |
|
txt += 'echo "MaxEvents:<$MaxEvents>"\n' |
910 |
|
txt += 'echo "SkipEvents:<$SkipEvents>"\n' |
911 |
|
else: # pythia like job |
923 |
|
if self.pset != None: |
924 |
|
# FUTURE: Can simply for 2_1_x and higher |
925 |
|
txt += '\n' |
926 |
< |
if self.debug_pset==True: |
926 |
> |
if self.debug_wrapper==True: |
927 |
|
txt += 'echo "***** cat ' + psetName + ' *********"\n' |
928 |
|
txt += 'cat ' + psetName + '\n' |
929 |
|
txt += 'echo "****** end ' + psetName + ' ********"\n' |
930 |
|
txt += '\n' |
931 |
< |
txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n' |
931 |
> |
if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3): |
932 |
> |
txt += 'PSETHASH=`edmConfigHash ' + psetName + '` \n' |
933 |
> |
else: |
934 |
> |
txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n' |
935 |
|
txt += 'echo "PSETHASH = $PSETHASH" \n' |
936 |
|
txt += '\n' |
937 |
|
return txt |
947 |
|
if os.path.isfile(self.tgzNameWithPath): |
948 |
|
txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n' |
949 |
|
txt += 'tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n' |
950 |
< |
txt += 'ls -Al \n' |
950 |
> |
if self.debug_wrapper: |
951 |
> |
txt += 'ls -Al \n' |
952 |
|
txt += 'untar_status=$? \n' |
953 |
|
txt += 'if [ $untar_status -ne 0 ]; then \n' |
954 |
|
txt += ' echo "ERROR ==> Untarring .tgz file failed"\n' |
958 |
|
txt += ' echo "Successful untar" \n' |
959 |
|
txt += 'fi \n' |
960 |
|
txt += '\n' |
961 |
< |
txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n' |
961 |
> |
txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n' |
962 |
|
txt += 'if [ -z "$PYTHONPATH" ]; then\n' |
963 |
< |
txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n' |
963 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/\n' |
964 |
|
txt += 'else\n' |
965 |
< |
txt += ' export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n' |
965 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n' |
966 |
|
txt += 'echo "PYTHONPATH=$PYTHONPATH"\n' |
967 |
|
txt += 'fi\n' |
968 |
|
txt += '\n' |
989 |
|
if len(self.additional_inbox_files)>0: |
990 |
|
for file in self.additional_inbox_files: |
991 |
|
txt += 'mv $RUNTIME_AREA/'+os.path.basename(file)+' . \n' |
992 |
< |
txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n' |
992 |
> |
# txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n' |
993 |
> |
# txt += 'mv $RUNTIME_AREA/IMProv/ . \n' |
994 |
|
|
995 |
+ |
txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n' |
996 |
|
txt += 'if [ -z "$PYTHONPATH" ]; then\n' |
997 |
< |
txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n' |
997 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/\n' |
998 |
|
txt += 'else\n' |
999 |
< |
txt += ' export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n' |
999 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n' |
1000 |
|
txt += 'echo "PYTHONPATH=$PYTHONPATH"\n' |
1001 |
|
txt += 'fi\n' |
1002 |
|
txt += '\n' |
1003 |
|
|
1004 |
|
return txt |
1005 |
|
|
939 |
– |
def modifySteeringCards(self, nj): |
940 |
– |
""" |
941 |
– |
modify the card provided by the user, |
942 |
– |
writing a new card into share dir |
943 |
– |
""" |
1006 |
|
|
1007 |
|
def executableName(self): |
1008 |
< |
if self.scriptExe: |
1008 |
> |
if self.scriptExe: |
1009 |
|
return "sh " |
1010 |
|
else: |
1011 |
|
return self.executable |
1047 |
|
## User Declared output files |
1048 |
|
for out in (self.output_file+self.output_file_sandbox): |
1049 |
|
n_out = nj + 1 |
1050 |
< |
out_box.append(self.numberFile_(out,str(n_out))) |
1050 |
> |
out_box.append(numberFile(out,str(n_out))) |
1051 |
|
return out_box |
1052 |
|
|
991 |
– |
def prepareSteeringCards(self): |
992 |
– |
""" |
993 |
– |
Make initial modifications of the user's steering card file. |
994 |
– |
""" |
995 |
– |
return |
1053 |
|
|
1054 |
|
def wsRenameOutput(self, nj): |
1055 |
|
""" |
1059 |
|
txt = '\n#Written by cms_cmssw::wsRenameOutput\n' |
1060 |
|
txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n' |
1061 |
|
txt += 'echo ">>> current directory content:"\n' |
1062 |
< |
txt += 'ls \n' |
1062 |
> |
if self.debug_wrapper: |
1063 |
> |
txt += 'ls -Al\n' |
1064 |
|
txt += '\n' |
1065 |
|
|
1066 |
|
for fileWithSuffix in (self.output_file): |
1067 |
< |
output_file_num = self.numberFile_(fileWithSuffix, '$NJob') |
1067 |
> |
output_file_num = numberFile(fileWithSuffix, '$NJob') |
1068 |
|
txt += '\n' |
1069 |
|
txt += '# check output file\n' |
1070 |
|
txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n' |
1085 |
|
txt += 'fi\n' |
1086 |
|
file_list = [] |
1087 |
|
for fileWithSuffix in (self.output_file): |
1088 |
< |
file_list.append(self.numberFile_(fileWithSuffix, '$NJob')) |
1088 |
> |
file_list.append(numberFile(fileWithSuffix, '$NJob')) |
1089 |
|
|
1090 |
|
txt += 'file_list="'+string.join(file_list,' ')+'"\n' |
1091 |
|
txt += '\n' |
1092 |
|
txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n' |
1093 |
|
txt += 'echo ">>> current directory content:"\n' |
1094 |
< |
txt += 'ls \n' |
1094 |
> |
if self.debug_wrapper: |
1095 |
> |
txt += 'ls -Al\n' |
1096 |
|
txt += '\n' |
1097 |
|
txt += 'cd $RUNTIME_AREA\n' |
1098 |
|
txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n' |
1099 |
|
return txt |
1100 |
|
|
1042 |
– |
def numberFile_(self, file, txt): |
1043 |
– |
""" |
1044 |
– |
append _'txt' before last extension of a file |
1045 |
– |
""" |
1046 |
– |
p = string.split(file,".") |
1047 |
– |
# take away last extension |
1048 |
– |
name = p[0] |
1049 |
– |
for x in p[1:-1]: |
1050 |
– |
name=name+"."+x |
1051 |
– |
# add "_txt" |
1052 |
– |
if len(p)>1: |
1053 |
– |
ext = p[len(p)-1] |
1054 |
– |
result = name + '_' + txt + "." + ext |
1055 |
– |
else: |
1056 |
– |
result = name + '_' + txt |
1057 |
– |
|
1058 |
– |
return result |
1059 |
– |
|
1101 |
|
def getRequirements(self, nj=[]): |
1102 |
|
""" |
1103 |
|
return job requirements to add to jdl files |
1107 |
|
req='Member("VO-cms-' + \ |
1108 |
|
self.version + \ |
1109 |
|
'", other.GlueHostApplicationSoftwareRunTimeEnvironment)' |
1110 |
< |
if self.executable_arch: |
1110 |
> |
if self.executable_arch: |
1111 |
|
req+=' && Member("VO-cms-' + \ |
1112 |
|
self.executable_arch + \ |
1113 |
|
'", other.GlueHostApplicationSoftwareRunTimeEnvironment)' |
1114 |
|
|
1115 |
|
req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)' |
1116 |
< |
if common.scheduler.name() == "glitecoll": |
1116 |
> |
if ( common.scheduler.name() == "glitecoll" ) or ( common.scheduler.name() == "glite"): |
1117 |
|
req += ' && other.GlueCEStateStatus == "Production" ' |
1118 |
|
|
1119 |
|
return req |
1192 |
|
publish_data = int(self.cfg_params.get('USER.publish_data',0)) |
1193 |
|
if (publish_data == 1): |
1194 |
|
processedDataset = self.cfg_params['USER.publish_data_name'] |
1195 |
< |
LFNBaseName = LFNBase(processedDataset) |
1195 |
> |
if (self.primaryDataset == 'null'): |
1196 |
> |
self.primaryDataset = processedDataset |
1197 |
> |
if (common.scheduler.name().upper() == "CAF" or common.scheduler.name().upper() == "LSF"): |
1198 |
> |
### FEDE FOR NEW LFN ### |
1199 |
> |
LFNBaseName = LFNBase(self.primaryDataset, processedDataset, LocalUser=True) |
1200 |
> |
self.user = getUserName(LocalUser=True) |
1201 |
> |
######################## |
1202 |
> |
else : |
1203 |
> |
### FEDE FOR NEW LFN ### |
1204 |
> |
LFNBaseName = LFNBase(self.primaryDataset, processedDataset) |
1205 |
> |
self.user = getUserName() |
1206 |
> |
######################## |
1207 |
|
|
1208 |
|
txt += 'if [ $copy_exit_status -eq 0 ]; then\n' |
1209 |
< |
txt += ' FOR_LFN=%s_${PSETHASH}/\n'%(LFNBaseName) |
1209 |
> |
### FEDE FOR NEW LFN ### |
1210 |
> |
#txt += ' FOR_LFN=%s_${PSETHASH}/\n'%(LFNBaseName) |
1211 |
> |
txt += ' FOR_LFN=%s/${PSETHASH}/\n'%(LFNBaseName) |
1212 |
> |
######################## |
1213 |
|
txt += 'else\n' |
1214 |
|
txt += ' FOR_LFN=/copy_problems/ \n' |
1215 |
|
txt += ' SE=""\n' |
1217 |
|
txt += 'fi\n' |
1218 |
|
|
1219 |
|
txt += 'echo ">>> Modify Job Report:" \n' |
1220 |
< |
txt += 'chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n' |
1220 |
> |
txt += 'chmod a+x $RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py\n' |
1221 |
|
txt += 'ProcessedDataset='+processedDataset+'\n' |
1222 |
|
txt += 'echo "ProcessedDataset = $ProcessedDataset"\n' |
1223 |
|
txt += 'echo "SE = $SE"\n' |
1224 |
|
txt += 'echo "SE_PATH = $SE_PATH"\n' |
1225 |
|
txt += 'echo "FOR_LFN = $FOR_LFN" \n' |
1226 |
|
txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n' |
1227 |
< |
txt += 'echo "$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n' |
1228 |
< |
txt += '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH\n' |
1227 |
> |
### FEDE FOR NEW LFN ### |
1228 |
> |
txt += 'echo "$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier ' + self.user + '-$ProcessedDataset-$PSETHASH $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n' |
1229 |
> |
txt += '$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier ' + self.user + '-$ProcessedDataset-$PSETHASH $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH\n' |
1230 |
> |
######################## |
1231 |
|
txt += 'modifyReport_result=$?\n' |
1232 |
|
txt += 'if [ $modifyReport_result -ne 0 ]; then\n' |
1233 |
|
txt += ' modifyReport_result=70500\n' |
1239 |
|
txt += 'fi\n' |
1240 |
|
return txt |
1241 |
|
|
1242 |
< |
def wsParseFJR(self): |
1242 |
> |
def wsParseFJR(self): |
1243 |
|
""" |
1244 |
< |
Parse the FrameworkJobReport to obtain useful infos |
1244 |
> |
Parse the FrameworkJobReport to obtain useful infos |
1245 |
|
""" |
1246 |
|
txt = '\n#Written by cms_cmssw::wsParseFJR\n' |
1247 |
|
txt += 'echo ">>> Parse FrameworkJobReport crab_fjr.xml"\n' |
1248 |
|
txt += 'if [ -s $RUNTIME_AREA/crab_fjr_$NJob.xml ]; then\n' |
1249 |
|
txt += ' if [ -s $RUNTIME_AREA/parseCrabFjr.py ]; then\n' |
1250 |
< |
txt += ' cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --MonitorID $MonitorID --MonitorJobID $MonitorJobID`\n' |
1251 |
< |
txt += ' echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n' |
1252 |
< |
txt += ' executable_exit_status=`echo $cmd_out | awk -F\; "{print $1}" | awk -F ' ' "{print $NF}"`\n' |
1250 |
> |
txt += ' cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --dashboard $MonitorID,$MonitorJobID '+self.debugWrap+'`\n' |
1251 |
> |
if self.debug_wrapper : |
1252 |
> |
txt += ' echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n' |
1253 |
> |
txt += ' executable_exit_status=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --exitcode`\n' |
1254 |
|
txt += ' if [ $executable_exit_status -eq 50115 ];then\n' |
1255 |
|
txt += ' echo ">>> crab_fjr.xml contents: "\n' |
1256 |
< |
txt += ' cat $RUNTIME_AREA/crab_fjr_NJob.xml\n' |
1256 |
> |
txt += ' cat $RUNTIME_AREA/crab_fjr_$NJob.xml\n' |
1257 |
|
txt += ' echo "Wrong FrameworkJobReport --> does not contain useful info. ExitStatus: $executable_exit_status"\n' |
1258 |
+ |
txt += ' elif [ $executable_exit_status -eq -999 ];then\n' |
1259 |
+ |
txt += ' echo "ExitStatus from FrameworkJobReport not available. not available. Using exit code of executable from command line."\n' |
1260 |
|
txt += ' else\n' |
1261 |
|
txt += ' echo "Extracted ExitStatus from FrameworkJobReport parsing output: $executable_exit_status"\n' |
1262 |
|
txt += ' fi\n' |
1265 |
|
txt += ' fi\n' |
1266 |
|
#### Patch to check input data reading for CMSSW16x Hopefully we-ll remove it asap |
1267 |
|
|
1268 |
< |
if self.datasetPath: |
1268 |
> |
if (self.datasetPath and not (self.dataset_pu or self.useParent) : |
1269 |
|
# VERIFY PROCESSED DATA |
1270 |
|
txt += ' if [ $executable_exit_status -eq 0 ];then\n' |
1271 |
|
txt += ' echo ">>> Verify list of processed files:"\n' |
1272 |
< |
txt += ' echo $InputFiles |tr -d "\\" |tr "," \n"|tr -d "\"" > input-files.txt\n' |
1273 |
< |
txt += ' grep LFN $RUNTIME_AREA/crab_fjr_$NJob.xml |cut -d">" -f2|cut -d"<" -f1|grep "/" > processed-files.txt\n' |
1272 |
> |
txt += ' echo $InputFiles |tr -d \'\\\\\' |tr \',\' \'\\n\'|tr -d \'"\' > input-files.txt\n' |
1273 |
> |
txt += ' python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --lfn > processed-files.txt\n' |
1274 |
|
txt += ' cat input-files.txt | sort | uniq > tmp.txt\n' |
1275 |
|
txt += ' mv tmp.txt input-files.txt\n' |
1276 |
|
txt += ' echo "cat input-files.txt"\n' |
1329 |
|
stderr = 'CMSSW_$NJob.stderr' |
1330 |
|
if (self.return_data == 1): |
1331 |
|
for file in (self.output_file+self.output_file_sandbox): |
1332 |
< |
listOutFiles.append(self.numberFile_(file, '$NJob')) |
1332 |
> |
listOutFiles.append(numberFile(file, '$NJob')) |
1333 |
|
listOutFiles.append(stdout) |
1334 |
|
listOutFiles.append(stderr) |
1335 |
|
else: |
1336 |
|
for file in (self.output_file_sandbox): |
1337 |
< |
listOutFiles.append(self.numberFile_(file, '$NJob')) |
1337 |
> |
listOutFiles.append(numberFile(file, '$NJob')) |
1338 |
|
listOutFiles.append(stdout) |
1339 |
|
listOutFiles.append(stderr) |
1340 |
|
txt += 'echo "output files: '+string.join(listOutFiles,' ')+'"\n' |