2 |
|
from crab_logger import Logger |
3 |
|
from crab_exceptions import * |
4 |
|
from crab_util import * |
5 |
+ |
from BlackWhiteListParser import SEBlackWhiteListParser |
6 |
|
import common |
6 |
– |
|
7 |
– |
import DataDiscovery |
8 |
– |
import DataLocation |
7 |
|
import Scram |
8 |
+ |
from LFNBaseName import * |
9 |
|
|
10 |
< |
import os, string, re |
10 |
> |
import os, string, glob |
11 |
|
|
12 |
|
class Cmssw(JobType): |
13 |
< |
def __init__(self, cfg_params): |
13 |
> |
def __init__(self, cfg_params, ncjobs,skip_blocks, isNew): |
14 |
|
JobType.__init__(self, 'CMSSW') |
15 |
|
common.logger.debug(3,'CMSSW::__init__') |
16 |
+ |
self.skip_blocks = skip_blocks |
17 |
+ |
|
18 |
+ |
self.argsList = [] |
19 |
+ |
|
20 |
+ |
self._params = {} |
21 |
+ |
self.cfg_params = cfg_params |
22 |
+ |
# init BlackWhiteListParser |
23 |
+ |
self.blackWhiteListParser = SEBlackWhiteListParser(cfg_params) |
24 |
+ |
|
25 |
+ |
self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5)) |
26 |
|
|
27 |
< |
self.analisys_common_info = {} |
27 |
> |
# number of jobs requested to be created, limit obj splitting |
28 |
> |
self.ncjobs = ncjobs |
29 |
|
|
30 |
|
log = common.logger |
31 |
< |
|
31 |
> |
|
32 |
|
self.scram = Scram.Scram(cfg_params) |
23 |
– |
scramArea = '' |
33 |
|
self.additional_inbox_files = [] |
34 |
|
self.scriptExe = '' |
35 |
|
self.executable = '' |
36 |
+ |
self.executable_arch = self.scram.getArch() |
37 |
|
self.tgz_name = 'default.tgz' |
38 |
+ |
self.scriptName = 'CMSSW.sh' |
39 |
+ |
self.pset = '' |
40 |
+ |
self.datasetPath = '' |
41 |
+ |
|
42 |
+ |
# set FJR file name |
43 |
+ |
self.fjrFileName = 'crab_fjr.xml' |
44 |
|
|
45 |
|
self.version = self.scram.getSWVersion() |
46 |
< |
common.analisys_common_info['sw_version'] = self.version |
46 |
> |
version_array = self.version.split('_') |
47 |
> |
self.CMSSW_major = 0 |
48 |
> |
self.CMSSW_minor = 0 |
49 |
> |
self.CMSSW_patch = 0 |
50 |
> |
try: |
51 |
> |
self.CMSSW_major = int(version_array[1]) |
52 |
> |
self.CMSSW_minor = int(version_array[2]) |
53 |
> |
self.CMSSW_patch = int(version_array[3]) |
54 |
> |
except: |
55 |
> |
msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!" |
56 |
> |
raise CrabException(msg) |
57 |
|
|
58 |
|
### collect Data cards |
59 |
< |
try: |
60 |
< |
self.owner = cfg_params['CMSSW.owner'] |
61 |
< |
log.debug(6, "CMSSW::CMSSW(): owner = "+self.owner) |
36 |
< |
self.dataset = cfg_params['CMSSW.dataset'] |
37 |
< |
log.debug(6, "CMSSW::CMSSW(): dataset = "+self.dataset) |
38 |
< |
except KeyError: |
39 |
< |
msg = "Error: owner and/or dataset not defined " |
59 |
> |
|
60 |
> |
if not cfg_params.has_key('CMSSW.datasetpath'): |
61 |
> |
msg = "Error: datasetpath not defined " |
62 |
|
raise CrabException(msg) |
63 |
|
|
64 |
< |
self.dataTiers = [] |
65 |
< |
try: |
66 |
< |
tmpDataTiers = string.split(cfg_params['CMSSW.data_tier'],',') |
67 |
< |
for tmp in tmpDataTiers: |
68 |
< |
tmp=string.strip(tmp) |
69 |
< |
self.dataTiers.append(tmp) |
70 |
< |
pass |
71 |
< |
pass |
72 |
< |
except KeyError: |
73 |
< |
pass |
74 |
< |
log.debug(6, "Cmssw::Cmssw(): dataTiers = "+str(self.dataTiers)) |
64 |
> |
### Temporary: added to remove input file control in the case of PU |
65 |
> |
self.dataset_pu = cfg_params.get('CMSSW.dataset_pu', None) |
66 |
> |
|
67 |
> |
tmp = cfg_params['CMSSW.datasetpath'] |
68 |
> |
log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp) |
69 |
> |
if string.lower(tmp)=='none': |
70 |
> |
self.datasetPath = None |
71 |
> |
self.selectNoInput = 1 |
72 |
> |
else: |
73 |
> |
self.datasetPath = tmp |
74 |
> |
self.selectNoInput = 0 |
75 |
|
|
76 |
+ |
self.dataTiers = [] |
77 |
+ |
self.debugWrap = '' |
78 |
+ |
self.debug_wrapper = cfg_params.get('USER.debug_wrapper',False) |
79 |
+ |
if self.debug_wrapper: self.debugWrap='--debug' |
80 |
|
## now the application |
81 |
< |
try: |
82 |
< |
self.executable = cfg_params['CMSSW.executable'] |
57 |
< |
log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable) |
58 |
< |
msg = "Default executable cmsRun overridden. Switch to " + self.executable |
59 |
< |
log.debug(3,msg) |
60 |
< |
except KeyError: |
61 |
< |
self.executable = 'cmsRun' |
62 |
< |
msg = "User executable not defined. Use cmsRun" |
63 |
< |
log.debug(3,msg) |
64 |
< |
pass |
81 |
> |
self.executable = cfg_params.get('CMSSW.executable','cmsRun') |
82 |
> |
log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable) |
83 |
|
|
84 |
< |
try: |
85 |
< |
self.pset = cfg_params['CMSSW.pset'] |
86 |
< |
log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset) |
84 |
> |
if not cfg_params.has_key('CMSSW.pset'): |
85 |
> |
raise CrabException("PSet file missing. Cannot run cmsRun ") |
86 |
> |
self.pset = cfg_params['CMSSW.pset'] |
87 |
> |
log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset) |
88 |
> |
if self.pset.lower() != 'none' : |
89 |
|
if (not os.path.exists(self.pset)): |
90 |
|
raise CrabException("User defined PSet file "+self.pset+" does not exist") |
91 |
< |
except KeyError: |
92 |
< |
raise CrabException("PSet file missing. Cannot run cmsRun ") |
91 |
> |
else: |
92 |
> |
self.pset = None |
93 |
|
|
94 |
|
# output files |
95 |
< |
try: |
96 |
< |
self.output_file = [] |
95 |
> |
## stuff which must be returned always via sandbox |
96 |
> |
self.output_file_sandbox = [] |
97 |
|
|
98 |
< |
tmp = cfg_params['CMSSW.output_file'] |
99 |
< |
if tmp != '': |
80 |
< |
tmpOutFiles = string.split(cfg_params['CMSSW.output_file'],',') |
81 |
< |
log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles)) |
82 |
< |
for tmp in tmpOutFiles: |
83 |
< |
tmp=string.strip(tmp) |
84 |
< |
self.output_file.append(tmp) |
85 |
< |
pass |
98 |
> |
# add fjr report by default via sandbox |
99 |
> |
self.output_file_sandbox.append(self.fjrFileName) |
100 |
|
|
101 |
< |
else: |
102 |
< |
log.message("No output file defined: only stdout/err will be available") |
103 |
< |
pass |
104 |
< |
pass |
105 |
< |
except KeyError: |
106 |
< |
log.message("No output file defined: only stdout/err will be available") |
107 |
< |
pass |
101 |
> |
# other output files to be returned via sandbox or copied to SE |
102 |
> |
outfileflag = False |
103 |
> |
self.output_file = [] |
104 |
> |
tmp = cfg_params.get('CMSSW.output_file',None) |
105 |
> |
if tmp : |
106 |
> |
self.output_file = [x.strip() for x in tmp.split(',')] |
107 |
> |
outfileflag = True #output found |
108 |
> |
#else: |
109 |
> |
# log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n") |
110 |
|
|
111 |
|
# script_exe file as additional file in inputSandbox |
112 |
< |
try: |
113 |
< |
self.scriptExe = cfg_params['CMSSW.script_exe'] |
114 |
< |
self.additional_inbox_files.append(self.scriptExe) |
115 |
< |
except KeyError: |
116 |
< |
pass |
117 |
< |
if self.scriptExe != '': |
118 |
< |
if os.path.isfile(self.scriptExe): |
119 |
< |
pass |
120 |
< |
else: |
121 |
< |
log.message("WARNING. file "+self.scriptExe+" not found") |
122 |
< |
sys.exit() |
123 |
< |
|
112 |
> |
self.scriptExe = cfg_params.get('USER.script_exe',None) |
113 |
> |
if self.scriptExe : |
114 |
> |
if not os.path.isfile(self.scriptExe): |
115 |
> |
msg ="ERROR. file "+self.scriptExe+" not found" |
116 |
> |
raise CrabException(msg) |
117 |
> |
self.additional_inbox_files.append(string.strip(self.scriptExe)) |
118 |
> |
|
119 |
> |
if self.datasetPath == None and self.pset == None and self.scriptExe == '' : |
120 |
> |
msg ="Error. script_exe not defined" |
121 |
> |
raise CrabException(msg) |
122 |
> |
|
123 |
> |
# use parent files... |
124 |
> |
self.useParent = self.cfg_params.get('CMSSW.use_parent',False) |
125 |
> |
|
126 |
|
## additional input files |
127 |
< |
try: |
128 |
< |
tmpAddFiles = string.split(cfg_params['CMSSW.additional_input_files'],',') |
127 |
> |
if cfg_params.has_key('USER.additional_input_files'): |
128 |
> |
tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',') |
129 |
|
for tmp in tmpAddFiles: |
130 |
< |
tmp=string.strip(tmp) |
131 |
< |
self.additional_inbox_files.append(tmp) |
130 |
> |
tmp = string.strip(tmp) |
131 |
> |
dirname = '' |
132 |
> |
if not tmp[0]=="/": dirname = "." |
133 |
> |
files = [] |
134 |
> |
if string.find(tmp,"*")>-1: |
135 |
> |
files = glob.glob(os.path.join(dirname, tmp)) |
136 |
> |
if len(files)==0: |
137 |
> |
raise CrabException("No additional input file found with this pattern: "+tmp) |
138 |
> |
else: |
139 |
> |
files.append(tmp) |
140 |
> |
for file in files: |
141 |
> |
if not os.path.exists(file): |
142 |
> |
raise CrabException("Additional input file not found: "+file) |
143 |
> |
pass |
144 |
> |
self.additional_inbox_files.append(string.strip(file)) |
145 |
|
pass |
146 |
|
pass |
147 |
< |
except KeyError: |
148 |
< |
pass |
147 |
> |
common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files)) |
148 |
> |
pass |
149 |
|
|
150 |
< |
try: |
151 |
< |
self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events']) |
152 |
< |
except KeyError: |
153 |
< |
msg = 'Must define total_number_of_events and job_number_of_events' |
154 |
< |
raise CrabException(msg) |
155 |
< |
|
156 |
< |
#Marco: FirstEvent is nolonger used inside PSet |
126 |
< |
# try: |
127 |
< |
# self.first = int(cfg_params['CMSSW.first_event']) |
128 |
< |
# except KeyError: |
129 |
< |
# self.first = 0 |
130 |
< |
# pass |
131 |
< |
# log.debug(6, "Orca::Orca(): total number of events = "+`self.total_number_of_events`) |
132 |
< |
#log.debug(6, "Orca::Orca(): events per job = "+`self.job_number_of_events`) |
133 |
< |
# log.debug(6, "Orca::Orca(): first event = "+`self.first`) |
134 |
< |
|
135 |
< |
CEBlackList = [] |
136 |
< |
try: |
137 |
< |
tmpBad = string.split(cfg_params['EDG.ce_black_list'],',') |
138 |
< |
for tmp in tmpBad: |
139 |
< |
tmp=string.strip(tmp) |
140 |
< |
CEBlackList.append(tmp) |
141 |
< |
except KeyError: |
142 |
< |
pass |
150 |
> |
## Events per job |
151 |
> |
if cfg_params.has_key('CMSSW.events_per_job'): |
152 |
> |
self.eventsPerJob =int( cfg_params['CMSSW.events_per_job']) |
153 |
> |
self.selectEventsPerJob = 1 |
154 |
> |
else: |
155 |
> |
self.eventsPerJob = -1 |
156 |
> |
self.selectEventsPerJob = 0 |
157 |
|
|
158 |
< |
self.reCEBlackList=[] |
159 |
< |
for bad in CEBlackList: |
160 |
< |
self.reCEBlackList.append(re.compile( bad )) |
158 |
> |
## number of jobs |
159 |
> |
if cfg_params.has_key('CMSSW.number_of_jobs'): |
160 |
> |
self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs']) |
161 |
> |
self.selectNumberOfJobs = 1 |
162 |
> |
else: |
163 |
> |
self.theNumberOfJobs = 0 |
164 |
> |
self.selectNumberOfJobs = 0 |
165 |
|
|
166 |
< |
common.logger.debug(5,'CEBlackList: '+str(CEBlackList)) |
166 |
> |
if cfg_params.has_key('CMSSW.total_number_of_events'): |
167 |
> |
self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events']) |
168 |
> |
self.selectTotalNumberEvents = 1 |
169 |
> |
if self.selectNumberOfJobs == 1: |
170 |
> |
if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs): |
171 |
> |
msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs ' |
172 |
> |
raise CrabException(msg) |
173 |
> |
else: |
174 |
> |
self.total_number_of_events = 0 |
175 |
> |
self.selectTotalNumberEvents = 0 |
176 |
|
|
177 |
< |
CEWhiteList = [] |
178 |
< |
try: |
179 |
< |
tmpGood = string.split(cfg_params['EDG.ce_white_list'],',') |
180 |
< |
#tmpGood = ['cern'] |
181 |
< |
for tmp in tmpGood: |
182 |
< |
tmp=string.strip(tmp) |
183 |
< |
#if (tmp == 'cnaf'): tmp = 'webserver' ########## warning: temp. patch |
184 |
< |
CEWhiteList.append(tmp) |
185 |
< |
except KeyError: |
186 |
< |
pass |
177 |
> |
if self.pset != None: |
178 |
> |
if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ): |
179 |
> |
msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.' |
180 |
> |
raise CrabException(msg) |
181 |
> |
else: |
182 |
> |
if (self.selectNumberOfJobs == 0): |
183 |
> |
msg = 'Must specify number_of_jobs.' |
184 |
> |
raise CrabException(msg) |
185 |
> |
|
186 |
> |
## New method of dealing with seeds |
187 |
> |
self.incrementSeeds = [] |
188 |
> |
self.preserveSeeds = [] |
189 |
> |
if cfg_params.has_key('CMSSW.preserve_seeds'): |
190 |
> |
tmpList = cfg_params['CMSSW.preserve_seeds'].split(',') |
191 |
> |
for tmp in tmpList: |
192 |
> |
tmp.strip() |
193 |
> |
self.preserveSeeds.append(tmp) |
194 |
> |
if cfg_params.has_key('CMSSW.increment_seeds'): |
195 |
> |
tmpList = cfg_params['CMSSW.increment_seeds'].split(',') |
196 |
> |
for tmp in tmpList: |
197 |
> |
tmp.strip() |
198 |
> |
self.incrementSeeds.append(tmp) |
199 |
> |
|
200 |
> |
## FUTURE: Can remove in CRAB 2.4.0 |
201 |
> |
self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None) |
202 |
> |
self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None) |
203 |
> |
self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None) |
204 |
> |
self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None) |
205 |
> |
if self.sourceSeed or self.sourceSeedVtx or self.sourceSeedG4 or self.sourceSeedMix: |
206 |
> |
msg = 'pythia_seed, vtx_seed, g4_seed, and mix_seed are no longer valid settings. You must use increment_seeds or preserve_seeds' |
207 |
> |
raise CrabException(msg) |
208 |
|
|
209 |
< |
#print 'CEWhiteList: ',CEWhiteList |
162 |
< |
self.reCEWhiteList=[] |
163 |
< |
for Good in CEWhiteList: |
164 |
< |
self.reCEWhiteList.append(re.compile( Good )) |
209 |
> |
self.firstRun = cfg_params.get('CMSSW.first_run',None) |
210 |
|
|
211 |
< |
common.logger.debug(5,'CEWhiteList: '+str(CEWhiteList)) |
211 |
> |
# Copy/return |
212 |
> |
self.copy_data = int(cfg_params.get('USER.copy_data',0)) |
213 |
> |
self.return_data = int(cfg_params.get('USER.return_data',0)) |
214 |
|
|
215 |
|
#DBSDLS-start |
216 |
< |
## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code |
216 |
> |
## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code |
217 |
|
self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py) |
218 |
|
self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script) |
219 |
+ |
self.jobDestination=[] # Site destination(s) for each job (list of lists) |
220 |
|
## Perform the data location and discovery (based on DBS/DLS) |
221 |
< |
self.DataDiscoveryAndLocation(cfg_params) |
222 |
< |
#DBSDLS-end |
221 |
> |
## SL: Don't if NONE is specified as input (pythia use case) |
222 |
> |
blockSites = {} |
223 |
> |
if self.datasetPath: |
224 |
> |
blockSites = self.DataDiscoveryAndLocation(cfg_params) |
225 |
> |
#DBSDLS-end |
226 |
> |
|
227 |
> |
## Select Splitting |
228 |
> |
if self.selectNoInput: |
229 |
> |
if self.pset == None: |
230 |
> |
self.jobSplittingForScript() |
231 |
> |
else: |
232 |
> |
self.jobSplittingNoInput() |
233 |
> |
else: |
234 |
> |
self.jobSplittingByBlocks(blockSites) |
235 |
|
|
236 |
< |
self.tgzNameWithPath = self.getTarBall(self.executable) |
236 |
> |
# modify Pset only the first time |
237 |
> |
if isNew: |
238 |
> |
if self.pset != None: |
239 |
> |
import PsetManipulator as pp |
240 |
> |
PsetEdit = pp.PsetManipulator(self.pset) |
241 |
> |
try: |
242 |
> |
# Add FrameworkJobReport to parameter-set, set max events. |
243 |
> |
# Reset later for data jobs by writeCFG which does all modifications |
244 |
> |
PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5 |
245 |
> |
PsetEdit.maxEvent(self.eventsPerJob) |
246 |
> |
PsetEdit.psetWriter(self.configFilename()) |
247 |
> |
## If present, add TFileService to output files |
248 |
> |
if not int(cfg_params.get('CMSSW.skip_TFileService_output',0)): |
249 |
> |
tfsOutput = PsetEdit.getTFileService() |
250 |
> |
if tfsOutput: |
251 |
> |
if tfsOutput in self.output_file: |
252 |
> |
common.logger.debug(5,"Output from TFileService "+tfsOutput+" already in output files") |
253 |
> |
else: |
254 |
> |
outfileflag = True #output found |
255 |
> |
self.output_file.append(tfsOutput) |
256 |
> |
common.logger.message("Adding "+tfsOutput+" to output files (from TFileService)") |
257 |
> |
pass |
258 |
> |
pass |
259 |
> |
## If present and requested, add PoolOutputModule to output files |
260 |
> |
if int(cfg_params.get('CMSSW.get_edm_output',0)): |
261 |
> |
edmOutput = PsetEdit.getPoolOutputModule() |
262 |
> |
if edmOutput: |
263 |
> |
if edmOutput in self.output_file: |
264 |
> |
common.logger.debug(5,"Output from PoolOutputModule "+edmOutput+" already in output files") |
265 |
> |
else: |
266 |
> |
self.output_file.append(edmOutput) |
267 |
> |
common.logger.message("Adding "+edmOutput+" to output files (from PoolOutputModule)") |
268 |
> |
pass |
269 |
> |
pass |
270 |
> |
except CrabException: |
271 |
> |
msg='Error while manipulating ParameterSet: exiting...' |
272 |
> |
raise CrabException(msg) |
273 |
> |
## Prepare inputSandbox TarBall (only the first time) |
274 |
> |
self.tgzNameWithPath = self.getTarBall(self.executable) |
275 |
|
|
276 |
|
def DataDiscoveryAndLocation(self, cfg_params): |
277 |
|
|
278 |
< |
fun = "CMSSW::DataDiscoveryAndLocation()" |
278 |
> |
import DataDiscovery |
279 |
> |
import DataLocation |
280 |
> |
common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()") |
281 |
> |
|
282 |
> |
datasetPath=self.datasetPath |
283 |
|
|
284 |
|
## Contact the DBS |
285 |
+ |
common.logger.message("Contacting Data Discovery Services ...") |
286 |
|
try: |
287 |
< |
self.pubdata=DataDiscovery.DataDiscovery(self.owner, |
185 |
< |
self.dataset, |
186 |
< |
self.dataTiers, |
187 |
< |
cfg_params) |
287 |
> |
self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params,self.skip_blocks) |
288 |
|
self.pubdata.fetchDBSInfo() |
289 |
|
|
290 |
|
except DataDiscovery.NotExistingDatasetError, ex : |
291 |
|
msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage() |
292 |
|
raise CrabException(msg) |
193 |
– |
|
293 |
|
except DataDiscovery.NoDataTierinProvenanceError, ex : |
294 |
|
msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage() |
295 |
|
raise CrabException(msg) |
296 |
|
except DataDiscovery.DataDiscoveryError, ex: |
297 |
< |
msg = 'ERROR ***: failed Data Discovery in DBS %s'%ex.getErrorMessage() |
297 |
> |
msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage() |
298 |
|
raise CrabException(msg) |
299 |
|
|
300 |
< |
## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner) |
301 |
< |
self.DBSPaths=self.pubdata.getDBSPaths() |
302 |
< |
common.logger.message("Required data are : ") |
303 |
< |
for path in self.DBSPaths: |
205 |
< |
common.logger.message(" --> "+path ) |
300 |
> |
self.filesbyblock=self.pubdata.getFiles() |
301 |
> |
self.eventsbyblock=self.pubdata.getEventsPerBlock() |
302 |
> |
self.eventsbyfile=self.pubdata.getEventsPerFile() |
303 |
> |
self.parentFiles=self.pubdata.getParent() |
304 |
|
|
305 |
|
## get max number of events |
306 |
< |
common.logger.debug(10,"number of events for primary fileblocks %i"%self.pubdata.getMaxEvents()) |
209 |
< |
self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py |
210 |
< |
common.logger.message("\nThe number of available events is %s"%self.maxEvents) |
211 |
< |
|
212 |
< |
## get fileblocks corresponding to the required data |
213 |
< |
fb=self.pubdata.getFileBlocks() |
214 |
< |
common.logger.debug(5,"fileblocks are %s"%fb) |
306 |
> |
self.maxEvents=self.pubdata.getMaxEvents() |
307 |
|
|
308 |
|
## Contact the DLS and build a list of sites hosting the fileblocks |
309 |
|
try: |
310 |
< |
dataloc=DataLocation.DataLocation(self.pubdata.getFileBlocks(),cfg_params) |
310 |
> |
dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params) |
311 |
|
dataloc.fetchDLSInfo() |
312 |
|
except DataLocation.DataLocationError , ex: |
313 |
|
msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage() |
314 |
|
raise CrabException(msg) |
223 |
– |
|
224 |
– |
allsites=dataloc.getSites() |
225 |
– |
common.logger.debug(5,"sites are %s"%allsites) |
226 |
– |
sites=self.checkBlackList(allsites) |
227 |
– |
common.logger.debug(5,"sites are (after black list) %s"%sites) |
228 |
– |
sites=self.checkWhiteList(sites) |
229 |
– |
common.logger.debug(5,"sites are (after white list) %s"%sites) |
315 |
|
|
316 |
< |
if len(sites)==0: |
317 |
< |
msg = 'No sites hosting all the needed data! Exiting... ' |
318 |
< |
raise CrabException(msg) |
319 |
< |
common.logger.message("List of Sites hosting the data : "+str(sites)) |
320 |
< |
common.logger.debug(6, "List of Sites: "+str(sites)) |
321 |
< |
common.analisys_common_info['sites']=sites ## used in SchedulerEdg.py in createSchScript |
322 |
< |
return |
323 |
< |
|
324 |
< |
def checkBlackList(self, allSites): |
325 |
< |
if len(self.reCEBlackList)==0: return allSites |
326 |
< |
sites = [] |
327 |
< |
for site in allSites: |
243 |
< |
common.logger.debug(10,'Site '+site) |
244 |
< |
good=1 |
245 |
< |
for re in self.reCEBlackList: |
246 |
< |
if re.search(site): |
247 |
< |
common.logger.message('CE in black list, skipping site '+site) |
248 |
< |
good=0 |
249 |
< |
pass |
250 |
< |
if good: sites.append(site) |
251 |
< |
if len(sites) == 0: |
252 |
< |
common.logger.debug(3,"No sites found after BlackList") |
316 |
> |
|
317 |
> |
sites = dataloc.getSites() |
318 |
> |
allSites = [] |
319 |
> |
listSites = sites.values() |
320 |
> |
for listSite in listSites: |
321 |
> |
for oneSite in listSite: |
322 |
> |
allSites.append(oneSite) |
323 |
> |
allSites = self.uniquelist(allSites) |
324 |
> |
|
325 |
> |
# screen output |
326 |
> |
common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n") |
327 |
> |
|
328 |
|
return sites |
329 |
|
|
330 |
< |
def checkWhiteList(self, allsites): |
330 |
> |
def jobSplittingByBlocks(self, blockSites): |
331 |
> |
""" |
332 |
> |
Perform job splitting. Jobs run over an integer number of files |
333 |
> |
and no more than one block. |
334 |
> |
ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values |
335 |
> |
REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs, |
336 |
> |
self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs, |
337 |
> |
self.maxEvents, self.filesbyblock |
338 |
> |
SETS: self.jobDestination - Site destination(s) for each job (a list of lists) |
339 |
> |
self.total_number_of_jobs - Total # of jobs |
340 |
> |
self.list_of_args - File(s) job will run on (a list of lists) |
341 |
> |
""" |
342 |
|
|
343 |
< |
if len(self.reCEWhiteList)==0: return pubDBUrls |
344 |
< |
sites = [] |
345 |
< |
for site in allsites: |
346 |
< |
#print 'connecting to the URL ',url |
347 |
< |
good=0 |
348 |
< |
for re in self.reCEWhiteList: |
349 |
< |
if re.search(site): |
350 |
< |
common.logger.debug(5,'CE in white list, adding site '+site) |
351 |
< |
good=1 |
352 |
< |
if not good: continue |
353 |
< |
sites.append(site) |
354 |
< |
if len(sites) == 0: |
355 |
< |
common.logger.message("No sites found after WhiteList\n") |
343 |
> |
# ---- Handle the possible job splitting configurations ---- # |
344 |
> |
if (self.selectTotalNumberEvents): |
345 |
> |
totalEventsRequested = self.total_number_of_events |
346 |
> |
if (self.selectEventsPerJob): |
347 |
> |
eventsPerJobRequested = self.eventsPerJob |
348 |
> |
if (self.selectNumberOfJobs): |
349 |
> |
totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob |
350 |
> |
|
351 |
> |
# If user requested all the events in the dataset |
352 |
> |
if (totalEventsRequested == -1): |
353 |
> |
eventsRemaining=self.maxEvents |
354 |
> |
# If user requested more events than are in the dataset |
355 |
> |
elif (totalEventsRequested > self.maxEvents): |
356 |
> |
eventsRemaining = self.maxEvents |
357 |
> |
common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.") |
358 |
> |
# If user requested less events than are in the dataset |
359 |
|
else: |
360 |
< |
common.logger.debug(5,"Selected sites via WhiteList are "+str(sites)+"\n") |
361 |
< |
return sites |
360 |
> |
eventsRemaining = totalEventsRequested |
361 |
> |
|
362 |
> |
# If user requested more events per job than are in the dataset |
363 |
> |
if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents): |
364 |
> |
eventsPerJobRequested = self.maxEvents |
365 |
> |
|
366 |
> |
# For user info at end |
367 |
> |
totalEventCount = 0 |
368 |
> |
|
369 |
> |
if (self.selectTotalNumberEvents and self.selectNumberOfJobs): |
370 |
> |
eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs) |
371 |
> |
|
372 |
> |
if (self.selectNumberOfJobs): |
373 |
> |
common.logger.message("May not create the exact number_of_jobs requested.") |
374 |
> |
|
375 |
> |
if ( self.ncjobs == 'all' ) : |
376 |
> |
totalNumberOfJobs = 999999999 |
377 |
> |
else : |
378 |
> |
totalNumberOfJobs = self.ncjobs |
379 |
> |
|
380 |
> |
blocks = blockSites.keys() |
381 |
> |
blockCount = 0 |
382 |
> |
# Backup variable in case self.maxEvents counted events in a non-included block |
383 |
> |
numBlocksInDataset = len(blocks) |
384 |
> |
|
385 |
> |
jobCount = 0 |
386 |
> |
list_of_lists = [] |
387 |
> |
|
388 |
> |
# list tracking which jobs are in which jobs belong to which block |
389 |
> |
jobsOfBlock = {} |
390 |
> |
|
391 |
> |
# ---- Iterate over the blocks in the dataset until ---- # |
392 |
> |
# ---- we've met the requested total # of events ---- # |
393 |
> |
while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)): |
394 |
> |
block = blocks[blockCount] |
395 |
> |
blockCount += 1 |
396 |
> |
if block not in jobsOfBlock.keys() : |
397 |
> |
jobsOfBlock[block] = [] |
398 |
> |
|
399 |
> |
if self.eventsbyblock.has_key(block) : |
400 |
> |
numEventsInBlock = self.eventsbyblock[block] |
401 |
> |
common.logger.debug(5,'Events in Block File '+str(numEventsInBlock)) |
402 |
> |
|
403 |
> |
files = self.filesbyblock[block] |
404 |
> |
numFilesInBlock = len(files) |
405 |
> |
if (numFilesInBlock <= 0): |
406 |
> |
continue |
407 |
> |
fileCount = 0 |
408 |
> |
|
409 |
> |
# ---- New block => New job ---- # |
410 |
> |
parString = "" |
411 |
> |
# counter for number of events in files currently worked on |
412 |
> |
filesEventCount = 0 |
413 |
> |
# flag if next while loop should touch new file |
414 |
> |
newFile = 1 |
415 |
> |
# job event counter |
416 |
> |
jobSkipEventCount = 0 |
417 |
> |
|
418 |
> |
# ---- Iterate over the files in the block until we've met the requested ---- # |
419 |
> |
# ---- total # of events or we've gone over all the files in this block ---- # |
420 |
> |
pString='' |
421 |
> |
while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ): |
422 |
> |
file = files[fileCount] |
423 |
> |
if self.useParent: |
424 |
> |
parent = self.parentFiles[file] |
425 |
> |
for f in parent : |
426 |
> |
pString += '\\\"' + f + '\\\"\,' |
427 |
> |
common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent)) |
428 |
> |
common.logger.write("File "+str(file)+" has the following parents: "+str(parent)) |
429 |
> |
if newFile : |
430 |
> |
try: |
431 |
> |
numEventsInFile = self.eventsbyfile[file] |
432 |
> |
common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events") |
433 |
> |
# increase filesEventCount |
434 |
> |
filesEventCount += numEventsInFile |
435 |
> |
# Add file to current job |
436 |
> |
parString += '\\\"' + file + '\\\"\,' |
437 |
> |
newFile = 0 |
438 |
> |
except KeyError: |
439 |
> |
common.logger.message("File "+str(file)+" has unknown number of events: skipping") |
440 |
> |
|
441 |
> |
eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining) |
442 |
> |
# if less events in file remain than eventsPerJobRequested |
443 |
> |
if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested): |
444 |
> |
# if last file in block |
445 |
> |
if ( fileCount == numFilesInBlock-1 ) : |
446 |
> |
# end job using last file, use remaining events in block |
447 |
> |
# close job and touch new file |
448 |
> |
fullString = parString[:-2] |
449 |
> |
if self.useParent: |
450 |
> |
fullParentString = pString[:-2] |
451 |
> |
list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)]) |
452 |
> |
else: |
453 |
> |
list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)]) |
454 |
> |
common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).") |
455 |
> |
self.jobDestination.append(blockSites[block]) |
456 |
> |
common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount])) |
457 |
> |
# fill jobs of block dictionary |
458 |
> |
jobsOfBlock[block].append(jobCount+1) |
459 |
> |
# reset counter |
460 |
> |
jobCount = jobCount + 1 |
461 |
> |
totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount |
462 |
> |
eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount |
463 |
> |
jobSkipEventCount = 0 |
464 |
> |
# reset file |
465 |
> |
pString = "" |
466 |
> |
parString = "" |
467 |
> |
filesEventCount = 0 |
468 |
> |
newFile = 1 |
469 |
> |
fileCount += 1 |
470 |
> |
else : |
471 |
> |
# go to next file |
472 |
> |
newFile = 1 |
473 |
> |
fileCount += 1 |
474 |
> |
# if events in file equal to eventsPerJobRequested |
475 |
> |
elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) : |
476 |
> |
# close job and touch new file |
477 |
> |
fullString = parString[:-2] |
478 |
> |
if self.useParent: |
479 |
> |
fullParentString = pString[:-2] |
480 |
> |
list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
481 |
> |
else: |
482 |
> |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
483 |
> |
common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.") |
484 |
> |
self.jobDestination.append(blockSites[block]) |
485 |
> |
common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount])) |
486 |
> |
jobsOfBlock[block].append(jobCount+1) |
487 |
> |
# reset counter |
488 |
> |
jobCount = jobCount + 1 |
489 |
> |
totalEventCount = totalEventCount + eventsPerJobRequested |
490 |
> |
eventsRemaining = eventsRemaining - eventsPerJobRequested |
491 |
> |
jobSkipEventCount = 0 |
492 |
> |
# reset file |
493 |
> |
pString = "" |
494 |
> |
parString = "" |
495 |
> |
filesEventCount = 0 |
496 |
> |
newFile = 1 |
497 |
> |
fileCount += 1 |
498 |
> |
|
499 |
> |
# if more events in file remain than eventsPerJobRequested |
500 |
> |
else : |
501 |
> |
# close job but don't touch new file |
502 |
> |
fullString = parString[:-2] |
503 |
> |
if self.useParent: |
504 |
> |
fullParentString = pString[:-2] |
505 |
> |
list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
506 |
> |
else: |
507 |
> |
list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)]) |
508 |
> |
common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.") |
509 |
> |
self.jobDestination.append(blockSites[block]) |
510 |
> |
common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount])) |
511 |
> |
jobsOfBlock[block].append(jobCount+1) |
512 |
> |
# increase counter |
513 |
> |
jobCount = jobCount + 1 |
514 |
> |
totalEventCount = totalEventCount + eventsPerJobRequested |
515 |
> |
eventsRemaining = eventsRemaining - eventsPerJobRequested |
516 |
> |
# calculate skip events for last file |
517 |
> |
# use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest |
518 |
> |
jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file]) |
519 |
> |
# remove all but the last file |
520 |
> |
filesEventCount = self.eventsbyfile[file] |
521 |
> |
if self.useParent: |
522 |
> |
for f in parent : pString += '\\\"' + f + '\\\"\,' |
523 |
> |
parString = '\\\"' + file + '\\\"\,' |
524 |
> |
pass # END if |
525 |
> |
pass # END while (iterate over files in the block) |
526 |
> |
pass # END while (iterate over blocks in the dataset) |
527 |
> |
self.ncjobs = self.total_number_of_jobs = jobCount |
528 |
> |
if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ): |
529 |
> |
common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.") |
530 |
> |
common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n") |
531 |
> |
|
532 |
> |
# screen output |
533 |
> |
screenOutput = "List of jobs and available destination sites:\n\n" |
534 |
> |
|
535 |
> |
# keep trace of block with no sites to print a warning at the end |
536 |
> |
noSiteBlock = [] |
537 |
> |
bloskNoSite = [] |
538 |
> |
|
539 |
> |
blockCounter = 0 |
540 |
> |
for block in blocks: |
541 |
> |
if block in jobsOfBlock.keys() : |
542 |
> |
blockCounter += 1 |
543 |
> |
screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]), |
544 |
> |
','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block))) |
545 |
> |
if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)) == 0: |
546 |
> |
noSiteBlock.append( spanRanges(jobsOfBlock[block]) ) |
547 |
> |
bloskNoSite.append( blockCounter ) |
548 |
> |
|
549 |
> |
common.logger.message(screenOutput) |
550 |
> |
if len(noSiteBlock) > 0 and len(bloskNoSite) > 0: |
551 |
> |
msg = 'WARNING: No sites are hosting any part of data for block:\n ' |
552 |
> |
virgola = "" |
553 |
> |
if len(bloskNoSite) > 1: |
554 |
> |
virgola = "," |
555 |
> |
for block in bloskNoSite: |
556 |
> |
msg += ' ' + str(block) + virgola |
557 |
> |
msg += '\n Related jobs:\n ' |
558 |
> |
virgola = "" |
559 |
> |
if len(noSiteBlock) > 1: |
560 |
> |
virgola = "," |
561 |
> |
for range_jobs in noSiteBlock: |
562 |
> |
msg += str(range_jobs) + virgola |
563 |
> |
msg += '\n will not be submitted and this block of data can not be analyzed!\n' |
564 |
> |
if self.cfg_params.has_key('EDG.se_white_list'): |
565 |
> |
msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n' |
566 |
> |
msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n' |
567 |
> |
msg += 'Please check if the dataset is available at this site!)\n' |
568 |
> |
if self.cfg_params.has_key('EDG.ce_white_list'): |
569 |
> |
msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n' |
570 |
> |
msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n' |
571 |
> |
msg += 'Please check if the dataset is available at this site!)\n' |
572 |
> |
|
573 |
> |
common.logger.message(msg) |
574 |
> |
|
575 |
> |
self.list_of_args = list_of_lists |
576 |
> |
return |
577 |
> |
|
578 |
> |
def jobSplittingNoInput(self): |
579 |
> |
""" |
580 |
> |
Perform job splitting based on number of event per job |
581 |
> |
""" |
582 |
> |
common.logger.debug(5,'Splitting per events') |
583 |
> |
|
584 |
> |
if (self.selectEventsPerJob): |
585 |
> |
common.logger.message('Required '+str(self.eventsPerJob)+' events per job ') |
586 |
> |
if (self.selectNumberOfJobs): |
587 |
> |
common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ') |
588 |
> |
if (self.selectTotalNumberEvents): |
589 |
> |
common.logger.message('Required '+str(self.total_number_of_events)+' events in total ') |
590 |
> |
|
591 |
> |
if (self.total_number_of_events < 0): |
592 |
> |
msg='Cannot split jobs per Events with "-1" as total number of events' |
593 |
> |
raise CrabException(msg) |
594 |
> |
|
595 |
> |
if (self.selectEventsPerJob): |
596 |
> |
if (self.selectTotalNumberEvents): |
597 |
> |
self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob) |
598 |
> |
elif(self.selectNumberOfJobs) : |
599 |
> |
self.total_number_of_jobs =self.theNumberOfJobs |
600 |
> |
self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob) |
601 |
> |
|
602 |
> |
elif (self.selectNumberOfJobs) : |
603 |
> |
self.total_number_of_jobs = self.theNumberOfJobs |
604 |
> |
self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs) |
605 |
> |
|
606 |
> |
common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs)) |
607 |
> |
|
608 |
> |
# is there any remainder? |
609 |
> |
check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob) |
610 |
> |
|
611 |
> |
common.logger.debug(5,'Check '+str(check)) |
612 |
> |
|
613 |
> |
common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events') |
614 |
> |
if check > 0: |
615 |
> |
common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob)) |
616 |
> |
|
617 |
> |
# argument is seed number.$i |
618 |
> |
self.list_of_args = [] |
619 |
> |
for i in range(self.total_number_of_jobs): |
620 |
> |
## Since there is no input, any site is good |
621 |
> |
self.jobDestination.append([""]) #must be empty to write correctly the xml |
622 |
> |
args=[] |
623 |
> |
if (self.firstRun): |
624 |
> |
## pythia first run |
625 |
> |
args.append(str(self.firstRun)+str(i)) |
626 |
> |
self.list_of_args.append(args) |
627 |
> |
|
628 |
> |
return |
629 |
> |
|
630 |
> |
|
631 |
> |
def jobSplittingForScript(self): |
632 |
> |
""" |
633 |
> |
Perform job splitting based on number of job |
634 |
> |
""" |
635 |
> |
common.logger.debug(5,'Splitting per job') |
636 |
> |
common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ') |
637 |
> |
|
638 |
> |
self.total_number_of_jobs = self.theNumberOfJobs |
639 |
> |
|
640 |
> |
common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs)) |
641 |
> |
|
642 |
> |
common.logger.message(str(self.total_number_of_jobs)+' jobs can be created') |
643 |
> |
|
644 |
> |
# argument is seed number.$i |
645 |
> |
self.list_of_args = [] |
646 |
> |
for i in range(self.total_number_of_jobs): |
647 |
> |
self.jobDestination.append([""]) |
648 |
> |
self.list_of_args.append([str(i)]) |
649 |
> |
return |
650 |
> |
|
651 |
> |
def split(self, jobParams,firstJobID): |
652 |
> |
|
653 |
> |
njobs = self.total_number_of_jobs |
654 |
> |
arglist = self.list_of_args |
655 |
> |
# create the empty structure |
656 |
> |
for i in range(njobs): |
657 |
> |
jobParams.append("") |
658 |
> |
|
659 |
> |
listID=[] |
660 |
> |
listField=[] |
661 |
> |
for id in range(njobs): |
662 |
> |
job = id + int(firstJobID) |
663 |
> |
jobParams[id] = arglist[id] |
664 |
> |
listID.append(job+1) |
665 |
> |
job_ToSave ={} |
666 |
> |
concString = ' ' |
667 |
> |
argu='' |
668 |
> |
if len(jobParams[id]): |
669 |
> |
argu += concString.join(jobParams[id] ) |
670 |
> |
job_ToSave['arguments']= str(job+1)+' '+argu |
671 |
> |
job_ToSave['dlsDestination']= self.jobDestination[id] |
672 |
> |
listField.append(job_ToSave) |
673 |
> |
msg="Job "+str(job)+" Arguments: "+str(job+1)+" "+argu+"\n" \ |
674 |
> |
+" Destination: "+str(self.jobDestination[id]) |
675 |
> |
common.logger.debug(5,msg) |
676 |
> |
common._db.updateJob_(listID,listField) |
677 |
> |
self.argsList = (len(jobParams[0])+1) |
678 |
> |
|
679 |
> |
return |
680 |
> |
|
681 |
> |
def numberOfJobs(self): |
682 |
> |
return self.total_number_of_jobs |
683 |
|
|
684 |
|
def getTarBall(self, exe): |
685 |
|
""" |
686 |
|
Return the TarBall with lib and exe |
687 |
|
""" |
688 |
< |
|
279 |
< |
# if it exist, just return it |
280 |
< |
self.tgzNameWithPath = common.work_space.shareDir()+self.tgz_name |
688 |
> |
self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name |
689 |
|
if os.path.exists(self.tgzNameWithPath): |
690 |
|
return self.tgzNameWithPath |
691 |
|
|
698 |
|
|
699 |
|
# First of all declare the user Scram area |
700 |
|
swArea = self.scram.getSWArea_() |
293 |
– |
#print "swArea = ", swArea |
294 |
– |
swVersion = self.scram.getSWVersion() |
295 |
– |
#print "swVersion = ", swVersion |
701 |
|
swReleaseTop = self.scram.getReleaseTop_() |
702 |
< |
#print "swReleaseTop = ", swReleaseTop |
298 |
< |
|
702 |
> |
|
703 |
|
## check if working area is release top |
704 |
|
if swReleaseTop == '' or swArea == swReleaseTop: |
705 |
+ |
common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop) |
706 |
|
return |
707 |
|
|
708 |
< |
filesToBeTarred = [] |
709 |
< |
## First find the executable |
710 |
< |
if (self.executable != ''): |
711 |
< |
exeWithPath = self.scram.findFile_(executable) |
712 |
< |
# print exeWithPath |
713 |
< |
if ( not exeWithPath ): |
714 |
< |
raise CrabException('User executable '+executable+' not found') |
715 |
< |
|
716 |
< |
## then check if it's private or not |
717 |
< |
if exeWithPath.find(swReleaseTop) == -1: |
718 |
< |
# the exe is private, so we must ship |
719 |
< |
common.logger.debug(5,"Exe "+exeWithPath+" to be tarred") |
720 |
< |
path = swArea+'/' |
721 |
< |
exe = string.replace(exeWithPath, path,'') |
722 |
< |
filesToBeTarred.append(exe) |
723 |
< |
pass |
724 |
< |
else: |
725 |
< |
# the exe is from release, we'll find it on WN |
708 |
> |
import tarfile |
709 |
> |
try: # create tar ball |
710 |
> |
tar = tarfile.open(self.tgzNameWithPath, "w:gz") |
711 |
> |
## First find the executable |
712 |
> |
if (self.executable != ''): |
713 |
> |
exeWithPath = self.scram.findFile_(executable) |
714 |
> |
if ( not exeWithPath ): |
715 |
> |
raise CrabException('User executable '+executable+' not found') |
716 |
> |
|
717 |
> |
## then check if it's private or not |
718 |
> |
if exeWithPath.find(swReleaseTop) == -1: |
719 |
> |
# the exe is private, so we must ship |
720 |
> |
common.logger.debug(5,"Exe "+exeWithPath+" to be tarred") |
721 |
> |
path = swArea+'/' |
722 |
> |
# distinguish case when script is in user project area or given by full path somewhere else |
723 |
> |
if exeWithPath.find(path) >= 0 : |
724 |
> |
exe = string.replace(exeWithPath, path,'') |
725 |
> |
tar.add(path+exe,exe) |
726 |
> |
else : |
727 |
> |
tar.add(exeWithPath,os.path.basename(executable)) |
728 |
> |
pass |
729 |
> |
else: |
730 |
> |
# the exe is from release, we'll find it on WN |
731 |
> |
pass |
732 |
> |
|
733 |
> |
## Now get the libraries: only those in local working area |
734 |
> |
libDir = 'lib' |
735 |
> |
lib = swArea+'/' +libDir |
736 |
> |
common.logger.debug(5,"lib "+lib+" to be tarred") |
737 |
> |
if os.path.exists(lib): |
738 |
> |
tar.add(lib,libDir) |
739 |
> |
|
740 |
> |
## Now check if module dir is present |
741 |
> |
moduleDir = 'module' |
742 |
> |
module = swArea + '/' + moduleDir |
743 |
> |
if os.path.isdir(module): |
744 |
> |
tar.add(module,moduleDir) |
745 |
> |
|
746 |
> |
## Now check if any data dir(s) is present |
747 |
> |
self.dataExist = False |
748 |
> |
todo_list = [(i, i) for i in os.listdir(swArea+"/src")] |
749 |
> |
while len(todo_list): |
750 |
> |
entry, name = todo_list.pop() |
751 |
> |
if name.startswith('crab_0_') or name.startswith('.') or name == 'CVS': |
752 |
> |
continue |
753 |
> |
if os.path.isdir(swArea+"/src/"+entry): |
754 |
> |
entryPath = entry + '/' |
755 |
> |
todo_list += [(entryPath + i, i) for i in os.listdir(swArea+"/src/"+entry)] |
756 |
> |
if name == 'data': |
757 |
> |
self.dataExist=True |
758 |
> |
common.logger.debug(5,"data "+entry+" to be tarred") |
759 |
> |
tar.add(swArea+"/src/"+entry,"src/"+entry) |
760 |
> |
pass |
761 |
|
pass |
762 |
< |
|
763 |
< |
## Now get the libraries: only those in local working area |
764 |
< |
libDir = 'lib' |
765 |
< |
lib = swArea+'/' +libDir |
766 |
< |
common.logger.debug(5,"lib "+lib+" to be tarred") |
767 |
< |
if os.path.exists(lib): |
768 |
< |
filesToBeTarred.append(libDir) |
769 |
< |
|
770 |
< |
## Now check if the Data dir is present |
771 |
< |
dataDir = 'src/Data/' |
772 |
< |
if os.path.isdir(swArea+'/'+dataDir): |
773 |
< |
filesToBeTarred.append(dataDir) |
774 |
< |
|
775 |
< |
## Create the tar-ball |
776 |
< |
if len(filesToBeTarred)>0: |
777 |
< |
cwd = os.getcwd() |
778 |
< |
os.chdir(swArea) |
779 |
< |
tarcmd = 'tar zcvf ' + self.tgzNameWithPath + ' ' |
780 |
< |
for line in filesToBeTarred: |
781 |
< |
tarcmd = tarcmd + line + ' ' |
782 |
< |
cout = runCommand(tarcmd) |
783 |
< |
if not cout: |
784 |
< |
raise CrabException('Could not create tar-ball') |
785 |
< |
os.chdir(cwd) |
786 |
< |
else: |
787 |
< |
common.logger.debug(5,"No files to be to be tarred") |
788 |
< |
|
789 |
< |
return |
790 |
< |
|
791 |
< |
def wsSetupEnvironment(self, nj): |
762 |
> |
|
763 |
> |
### CMSSW ParameterSet |
764 |
> |
if not self.pset is None: |
765 |
> |
cfg_file = common.work_space.jobDir()+self.configFilename() |
766 |
> |
tar.add(cfg_file,self.configFilename()) |
767 |
> |
common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
768 |
> |
|
769 |
> |
|
770 |
> |
## Add ProdCommon dir to tar |
771 |
> |
prodcommonDir = './' |
772 |
> |
prodcommonPath = os.environ['CRABDIR'] + '/' + 'external/' |
773 |
> |
neededStuff = ['ProdCommon/__init__.py','ProdCommon/FwkJobRep', 'ProdCommon/CMSConfigTools','ProdCommon/Core','ProdCommon/MCPayloads', 'IMProv'] |
774 |
> |
for file in neededStuff: |
775 |
> |
tar.add(prodcommonPath+file,prodcommonDir+file) |
776 |
> |
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
777 |
> |
|
778 |
> |
##### ML stuff |
779 |
> |
ML_file_list=['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py'] |
780 |
> |
path=os.environ['CRABDIR'] + '/python/' |
781 |
> |
for file in ML_file_list: |
782 |
> |
tar.add(path+file,file) |
783 |
> |
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
784 |
> |
|
785 |
> |
##### Utils |
786 |
> |
Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'fillCrabFjr.py'] |
787 |
> |
for file in Utils_file_list: |
788 |
> |
tar.add(path+file,file) |
789 |
> |
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
790 |
> |
|
791 |
> |
##### AdditionalFiles |
792 |
> |
for file in self.additional_inbox_files: |
793 |
> |
tar.add(file,string.split(file,'/')[-1]) |
794 |
> |
common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames())) |
795 |
> |
|
796 |
> |
tar.close() |
797 |
> |
except IOError: |
798 |
> |
raise CrabException('Could not create tar-ball '+self.tgzNameWithPath) |
799 |
> |
except tarfile.TarError: |
800 |
> |
raise CrabException('Could not create tar-ball '+self.tgzNameWithPath) |
801 |
> |
|
802 |
> |
## check for tarball size |
803 |
> |
tarballinfo = os.stat(self.tgzNameWithPath) |
804 |
> |
if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) : |
805 |
> |
raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.') |
806 |
> |
|
807 |
> |
## create tar-ball with ML stuff |
808 |
> |
|
809 |
> |
def wsSetupEnvironment(self, nj=0): |
810 |
|
""" |
811 |
|
Returns part of a job script which prepares |
812 |
|
the execution environment for the job 'nj'. |
813 |
|
""" |
814 |
+ |
if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3): |
815 |
+ |
psetName = 'pset.py' |
816 |
+ |
else: |
817 |
+ |
psetName = 'pset.cfg' |
818 |
|
# Prepare JobType-independent part |
819 |
< |
txt = self.wsSetupCMSEnvironment_() |
819 |
> |
txt = '\n#Written by cms_cmssw::wsSetupEnvironment\n' |
820 |
> |
txt += 'echo ">>> setup environment"\n' |
821 |
> |
txt += 'if [ $middleware == LCG ]; then \n' |
822 |
> |
txt += self.wsSetupCMSLCGEnvironment_() |
823 |
> |
txt += 'elif [ $middleware == OSG ]; then\n' |
824 |
> |
txt += ' WORKING_DIR=`/bin/mktemp -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n' |
825 |
> |
txt += ' if [ ! $? == 0 ] ;then\n' |
826 |
> |
txt += ' echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n' |
827 |
> |
txt += ' job_exit_code=10016\n' |
828 |
> |
txt += ' func_exit\n' |
829 |
> |
txt += ' fi\n' |
830 |
> |
txt += ' echo ">>> Created working directory: $WORKING_DIR"\n' |
831 |
> |
txt += '\n' |
832 |
> |
txt += ' echo "Change to working directory: $WORKING_DIR"\n' |
833 |
> |
txt += ' cd $WORKING_DIR\n' |
834 |
> |
txt += ' echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n' |
835 |
> |
txt += self.wsSetupCMSOSGEnvironment_() |
836 |
> |
txt += 'fi\n' |
837 |
|
|
838 |
|
# Prepare JobType-specific part |
839 |
|
scram = self.scram.commandName() |
840 |
|
txt += '\n\n' |
841 |
< |
txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n' |
841 |
> |
txt += 'echo ">>> specific cmssw setup environment:"\n' |
842 |
> |
txt += 'echo "CMSSW_VERSION = '+self.version+'"\n' |
843 |
|
txt += scram+' project CMSSW '+self.version+'\n' |
844 |
|
txt += 'status=$?\n' |
845 |
|
txt += 'if [ $status != 0 ] ; then\n' |
846 |
< |
txt += ' echo "SET_EXE_ENV 1 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n' |
847 |
< |
txt += ' echo "JOB_EXIT_STATUS = 5"\n' |
848 |
< |
txt += ' echo "SanityCheckCode = 5" | tee -a $RUNTIME_AREA/$repo\n' |
369 |
< |
txt += ' dumpStatus $RUNTIME_AREA/$repo\n' |
370 |
< |
txt += ' exit 5 \n' |
846 |
> |
txt += ' echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n' |
847 |
> |
txt += ' job_exit_code=10034\n' |
848 |
> |
txt += ' func_exit\n' |
849 |
|
txt += 'fi \n' |
372 |
– |
txt += 'echo "CMSSW_VERSION = '+self.version+'"\n' |
850 |
|
txt += 'cd '+self.version+'\n' |
851 |
< |
### needed grep for bug in scramv1 ### |
851 |
> |
txt += 'SOFTWARE_DIR=`pwd`\n' |
852 |
> |
txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n' |
853 |
|
txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n' |
854 |
< |
|
854 |
> |
txt += 'if [ $? != 0 ] ; then\n' |
855 |
> |
txt += ' echo "ERROR ==> Problem with the command: "\n' |
856 |
> |
txt += ' echo "eval \`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \` at `hostname`"\n' |
857 |
> |
txt += ' job_exit_code=10034\n' |
858 |
> |
txt += ' func_exit\n' |
859 |
> |
txt += 'fi \n' |
860 |
|
# Handle the arguments: |
861 |
|
txt += "\n" |
862 |
< |
txt += "## ARGUMNETS: $1 Job Number\n" |
380 |
< |
# txt += "## ARGUMNETS: $2 First Event for this job\n" |
381 |
< |
# txt += "## ARGUMNETS: $3 Max Event for this job\n" |
862 |
> |
txt += "## number of arguments (first argument always jobnumber)\n" |
863 |
|
txt += "\n" |
864 |
< |
txt += "narg=$#\n" |
384 |
< |
txt += "if [ $narg -lt 1 ]\n" |
864 |
> |
txt += "if [ $nargs -lt "+str(self.argsList)+" ]\n" |
865 |
|
txt += "then\n" |
866 |
< |
txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$narg+ \n" |
867 |
< |
txt += ' echo "JOB_EXIT_STATUS = 1"\n' |
868 |
< |
txt += ' echo "SanityCheckCode = 1" | tee -a $RUNTIME_AREA/$repo\n' |
389 |
< |
txt += ' dumpStatus $RUNTIME_AREA/$repo\n' |
390 |
< |
txt += " exit 1\n" |
866 |
> |
txt += " echo 'ERROR ==> Too few arguments' +$nargs+ \n" |
867 |
> |
txt += ' job_exit_code=50113\n' |
868 |
> |
txt += " func_exit\n" |
869 |
|
txt += "fi\n" |
870 |
|
txt += "\n" |
393 |
– |
txt += "NJob=$1\n" |
394 |
– |
# txt += "FirstEvent=$2\n" |
395 |
– |
# txt += "MaxEvents=$3\n" |
871 |
|
|
872 |
|
# Prepare job-specific part |
873 |
|
job = common.job_list[nj] |
874 |
< |
pset = os.path.basename(job.configFilename()) |
875 |
< |
txt += '\n' |
876 |
< |
txt += 'cp $RUNTIME_AREA/'+pset+' pset.cfg\n' |
402 |
< |
# txt += 'if [ -e $RUNTIME_AREA/orcarc_$CE ] ; then\n' |
403 |
< |
# txt += ' cat $RUNTIME_AREA/orcarc_$CE .orcarc >> .orcarc_tmp\n' |
404 |
< |
# txt += ' mv .orcarc_tmp .orcarc\n' |
405 |
< |
# txt += 'fi\n' |
406 |
< |
# txt += 'if [ -e $RUNTIME_AREA/init_$CE.sh ] ; then\n' |
407 |
< |
# txt += ' cp $RUNTIME_AREA/init_$CE.sh init.sh\n' |
408 |
< |
# txt += 'fi\n' |
874 |
> |
if (self.datasetPath): |
875 |
> |
txt += '\n' |
876 |
> |
txt += 'DatasetPath='+self.datasetPath+'\n' |
877 |
|
|
878 |
< |
if len(self.additional_inbox_files) > 0: |
879 |
< |
for file in self.additional_inbox_files: |
880 |
< |
txt += 'if [ -e $RUNTIME_AREA/'+file+' ] ; then\n' |
881 |
< |
txt += ' cp $RUNTIME_AREA/'+file+' .\n' |
882 |
< |
txt += ' chmod +x '+file+'\n' |
883 |
< |
txt += 'fi\n' |
884 |
< |
pass |
417 |
< |
|
418 |
< |
# txt += '\n' |
419 |
< |
# txt += 'chmod +x ./init.sh\n' |
420 |
< |
# txt += './init.sh\n' |
421 |
< |
# txt += 'exitStatus=$?\n' |
422 |
< |
# txt += 'if [ $exitStatus != 0 ] ; then\n' |
423 |
< |
# txt += ' echo "SET_EXE_ENV 1 ==> ERROR StageIn init script failed"\n' |
424 |
< |
# txt += ' echo "JOB_EXIT_STATUS = $exitStatus" \n' |
425 |
< |
# txt += ' echo "SanityCheckCode = $exitStatus" | tee -a $RUNTIME_AREA/$repo\n' |
426 |
< |
# txt += ' dumpStatus $RUNTIME_AREA/$repo\n' |
427 |
< |
# txt += ' exit $exitStatus\n' |
428 |
< |
# txt += 'fi\n' |
429 |
< |
# txt += "echo 'SET_EXE_ENV 0 ==> job setup ok'\n" |
430 |
< |
txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n' |
431 |
< |
|
432 |
< |
# txt += 'echo "FirstEvent=$FirstEvent" >> .orcarc\n' |
433 |
< |
# txt += 'echo "MaxEvents=$MaxEvents" >> .orcarc\n' |
434 |
< |
# if self.ML: |
435 |
< |
# txt += 'echo "MonalisaJobId=$NJob" >> .orcarc\n' |
878 |
> |
datasetpath_split = self.datasetPath.split("/") |
879 |
> |
### FEDE FOR NEW LFN ### |
880 |
> |
self.primaryDataset = datasetpath_split[1] |
881 |
> |
######################## |
882 |
> |
txt += 'PrimaryDataset='+datasetpath_split[1]+'\n' |
883 |
> |
txt += 'DataTier='+datasetpath_split[2]+'\n' |
884 |
> |
txt += 'ApplicationFamily=cmsRun\n' |
885 |
|
|
886 |
< |
txt += '\n' |
887 |
< |
txt += 'echo "***** cat pset.cfg *********"\n' |
888 |
< |
txt += 'cat pset.cfg\n' |
889 |
< |
txt += 'echo "****** end pset.cfg ********"\n' |
886 |
> |
else: |
887 |
> |
txt += 'DatasetPath=MCDataTier\n' |
888 |
> |
### FEDE FOR NEW LFN ### |
889 |
> |
self.primaryDataset = 'null' |
890 |
> |
######################## |
891 |
> |
txt += 'PrimaryDataset=null\n' |
892 |
> |
txt += 'DataTier=null\n' |
893 |
> |
txt += 'ApplicationFamily=MCDataTier\n' |
894 |
> |
if self.pset != None: |
895 |
> |
pset = os.path.basename(job.configFilename()) |
896 |
> |
txt += '\n' |
897 |
> |
txt += 'cp $RUNTIME_AREA/'+pset+' .\n' |
898 |
> |
if (self.datasetPath): # standard job |
899 |
> |
txt += 'InputFiles=${args[1]}; export InputFiles\n' |
900 |
> |
if (self.useParent): |
901 |
> |
txt += 'ParentFiles=${args[2]}; export ParentFiles\n' |
902 |
> |
txt += 'MaxEvents=${args[3]}; export MaxEvents\n' |
903 |
> |
txt += 'SkipEvents=${args[4]}; export SkipEvents\n' |
904 |
> |
else: |
905 |
> |
txt += 'MaxEvents=${args[2]}; export MaxEvents\n' |
906 |
> |
txt += 'SkipEvents=${args[3]}; export SkipEvents\n' |
907 |
> |
txt += 'echo "Inputfiles:<$InputFiles>"\n' |
908 |
> |
if (self.useParent): txt += 'echo "ParentFiles:<$ParentFiles>"\n' |
909 |
> |
txt += 'echo "MaxEvents:<$MaxEvents>"\n' |
910 |
> |
txt += 'echo "SkipEvents:<$SkipEvents>"\n' |
911 |
> |
else: # pythia like job |
912 |
> |
txt += 'PreserveSeeds=' + ','.join(self.preserveSeeds) + '; export PreserveSeeds\n' |
913 |
> |
txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n' |
914 |
> |
txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n' |
915 |
> |
txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n' |
916 |
> |
if (self.firstRun): |
917 |
> |
txt += 'FirstRun=${args[1]}; export FirstRun\n' |
918 |
> |
txt += 'echo "FirstRun: <$FirstRun>"\n' |
919 |
> |
|
920 |
> |
txt += 'mv -f ' + pset + ' ' + psetName + '\n' |
921 |
> |
|
922 |
> |
|
923 |
> |
if self.pset != None: |
924 |
> |
# FUTURE: Can simply for 2_1_x and higher |
925 |
> |
txt += '\n' |
926 |
> |
if self.debug_wrapper==True: |
927 |
> |
txt += 'echo "***** cat ' + psetName + ' *********"\n' |
928 |
> |
txt += 'cat ' + psetName + '\n' |
929 |
> |
txt += 'echo "****** end ' + psetName + ' ********"\n' |
930 |
> |
txt += '\n' |
931 |
> |
if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3): |
932 |
> |
txt += 'PSETHASH=`edmConfigHash ' + psetName + '` \n' |
933 |
> |
else: |
934 |
> |
txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n' |
935 |
> |
txt += 'echo "PSETHASH = $PSETHASH" \n' |
936 |
> |
txt += '\n' |
937 |
|
return txt |
938 |
|
|
939 |
< |
def modifySteeringCards(self, nj): |
939 |
> |
def wsUntarSoftware(self, nj=0): |
940 |
|
""" |
941 |
< |
modify the card provided by the user, |
942 |
< |
writing a new card into share dir |
941 |
> |
Put in the script the commands to build an executable |
942 |
> |
or a library. |
943 |
|
""" |
944 |
< |
|
944 |
> |
|
945 |
> |
txt = '\n#Written by cms_cmssw::wsUntarSoftware\n' |
946 |
> |
|
947 |
> |
if os.path.isfile(self.tgzNameWithPath): |
948 |
> |
txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n' |
949 |
> |
txt += 'tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n' |
950 |
> |
if self.debug_wrapper: |
951 |
> |
txt += 'ls -Al \n' |
952 |
> |
txt += 'untar_status=$? \n' |
953 |
> |
txt += 'if [ $untar_status -ne 0 ]; then \n' |
954 |
> |
txt += ' echo "ERROR ==> Untarring .tgz file failed"\n' |
955 |
> |
txt += ' job_exit_code=$untar_status\n' |
956 |
> |
txt += ' func_exit\n' |
957 |
> |
txt += 'else \n' |
958 |
> |
txt += ' echo "Successful untar" \n' |
959 |
> |
txt += 'fi \n' |
960 |
> |
txt += '\n' |
961 |
> |
txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n' |
962 |
> |
txt += 'if [ -z "$PYTHONPATH" ]; then\n' |
963 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/\n' |
964 |
> |
txt += 'else\n' |
965 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n' |
966 |
> |
txt += 'echo "PYTHONPATH=$PYTHONPATH"\n' |
967 |
> |
txt += 'fi\n' |
968 |
> |
txt += '\n' |
969 |
> |
|
970 |
> |
pass |
971 |
> |
|
972 |
> |
return txt |
973 |
> |
|
974 |
> |
def wsBuildExe(self, nj=0): |
975 |
> |
""" |
976 |
> |
Put in the script the commands to build an executable |
977 |
> |
or a library. |
978 |
> |
""" |
979 |
> |
|
980 |
> |
txt = '\n#Written by cms_cmssw::wsBuildExe\n' |
981 |
> |
txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n' |
982 |
> |
|
983 |
> |
txt += 'rm -r lib/ module/ \n' |
984 |
> |
txt += 'mv $RUNTIME_AREA/lib/ . \n' |
985 |
> |
txt += 'mv $RUNTIME_AREA/module/ . \n' |
986 |
> |
if self.dataExist == True: |
987 |
> |
txt += 'rm -r src/ \n' |
988 |
> |
txt += 'mv $RUNTIME_AREA/src/ . \n' |
989 |
> |
if len(self.additional_inbox_files)>0: |
990 |
> |
for file in self.additional_inbox_files: |
991 |
> |
txt += 'mv $RUNTIME_AREA/'+os.path.basename(file)+' . \n' |
992 |
> |
# txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n' |
993 |
> |
# txt += 'mv $RUNTIME_AREA/IMProv/ . \n' |
994 |
> |
|
995 |
> |
txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n' |
996 |
> |
txt += 'if [ -z "$PYTHONPATH" ]; then\n' |
997 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/\n' |
998 |
> |
txt += 'else\n' |
999 |
> |
txt += ' export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n' |
1000 |
> |
txt += 'echo "PYTHONPATH=$PYTHONPATH"\n' |
1001 |
> |
txt += 'fi\n' |
1002 |
> |
txt += '\n' |
1003 |
> |
|
1004 |
> |
return txt |
1005 |
> |
|
1006 |
> |
|
1007 |
|
def executableName(self): |
1008 |
< |
return self.executable |
1008 |
> |
if self.scriptExe: |
1009 |
> |
return "sh " |
1010 |
> |
else: |
1011 |
> |
return self.executable |
1012 |
|
|
1013 |
|
def executableArgs(self): |
1014 |
< |
return "-p pset.cfg" |
1014 |
> |
# FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions |
1015 |
> |
if self.scriptExe:#CarlosDaniele |
1016 |
> |
return self.scriptExe + " $NJob" |
1017 |
> |
else: |
1018 |
> |
ex_args = "" |
1019 |
> |
# FUTURE: This tests the CMSSW version. Can remove code as versions deprecated |
1020 |
> |
# Framework job report |
1021 |
> |
if (self.CMSSW_major >= 1 and self.CMSSW_minor >= 5) or (self.CMSSW_major >= 2): |
1022 |
> |
ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml" |
1023 |
> |
# Type of config file |
1024 |
> |
if self.CMSSW_major >= 2 : |
1025 |
> |
ex_args += " -p pset.py" |
1026 |
> |
else: |
1027 |
> |
ex_args += " -p pset.cfg" |
1028 |
> |
return ex_args |
1029 |
|
|
1030 |
|
def inputSandbox(self, nj): |
1031 |
|
""" |
1032 |
|
Returns a list of filenames to be put in JDL input sandbox. |
1033 |
|
""" |
1034 |
|
inp_box = [] |
460 |
– |
# dict added to delete duplicate from input sandbox file list |
461 |
– |
seen = {} |
462 |
– |
## code |
1035 |
|
if os.path.isfile(self.tgzNameWithPath): |
1036 |
|
inp_box.append(self.tgzNameWithPath) |
1037 |
< |
## config |
1038 |
< |
inp_box.append(common.job_list[nj].configFilename()) |
467 |
< |
## additional input files |
468 |
< |
for file in self.additional_inbox_files: |
469 |
< |
inp_box.append(common.work_space.cwdDir()+file) |
470 |
< |
#print "sono inputSandbox, inp_box = ", inp_box |
1037 |
> |
wrapper = os.path.basename(str(common._db.queryTask('scriptName'))) |
1038 |
> |
inp_box.append(common.work_space.pathForTgz() +'job/'+ wrapper) |
1039 |
|
return inp_box |
1040 |
|
|
1041 |
|
def outputSandbox(self, nj): |
1044 |
|
""" |
1045 |
|
out_box = [] |
1046 |
|
|
479 |
– |
stdout=common.job_list[nj].stdout() |
480 |
– |
stderr=common.job_list[nj].stderr() |
481 |
– |
|
1047 |
|
## User Declared output files |
1048 |
< |
for out in self.output_file: |
1049 |
< |
n_out = nj + 1 |
1050 |
< |
out_box.append(self.numberFile_(out,str(n_out))) |
1048 |
> |
for out in (self.output_file+self.output_file_sandbox): |
1049 |
> |
n_out = nj + 1 |
1050 |
> |
out_box.append(numberFile(out,str(n_out))) |
1051 |
|
return out_box |
487 |
– |
return [] |
1052 |
|
|
1053 |
< |
def prepareSteeringCards(self): |
1053 |
> |
|
1054 |
> |
def wsRenameOutput(self, nj): |
1055 |
|
""" |
1056 |
< |
Make initial modifications of the user's steering card file. |
1056 |
> |
Returns part of a job script which renames the produced files. |
1057 |
|
""" |
493 |
– |
infile = open(self.pset,'r') |
494 |
– |
|
495 |
– |
outfile = open(common.work_space.jobDir()+self.name()+'.cfg', 'w') |
496 |
– |
|
497 |
– |
outfile.write('\n\n##### The following cards have been created by CRAB: DO NOT TOUCH #####\n') |
1058 |
|
|
1059 |
< |
outfile.write('InputCollections=/System/'+self.owner+'/'+self.dataset+'/'+self.dataset+'\n') |
1059 |
> |
txt = '\n#Written by cms_cmssw::wsRenameOutput\n' |
1060 |
> |
txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n' |
1061 |
> |
txt += 'echo ">>> current directory content:"\n' |
1062 |
> |
if self.debug_wrapper: |
1063 |
> |
txt += 'ls -Al\n' |
1064 |
> |
txt += '\n' |
1065 |
|
|
1066 |
< |
infile.close() |
1067 |
< |
outfile.close() |
1068 |
< |
return |
1066 |
> |
for fileWithSuffix in (self.output_file): |
1067 |
> |
output_file_num = numberFile(fileWithSuffix, '$NJob') |
1068 |
> |
txt += '\n' |
1069 |
> |
txt += '# check output file\n' |
1070 |
> |
txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n' |
1071 |
> |
if (self.copy_data == 1): # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA |
1072 |
> |
txt += ' mv '+fileWithSuffix+' '+output_file_num+'\n' |
1073 |
> |
txt += ' ln -s `pwd`/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n' |
1074 |
> |
else: |
1075 |
> |
txt += ' mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n' |
1076 |
> |
txt += ' ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n' |
1077 |
> |
txt += 'else\n' |
1078 |
> |
txt += ' job_exit_code=60302\n' |
1079 |
> |
txt += ' echo "WARNING: Output file '+fileWithSuffix+' not found"\n' |
1080 |
> |
if common.scheduler.name().upper() == 'CONDOR_G': |
1081 |
> |
txt += ' if [ $middleware == OSG ]; then \n' |
1082 |
> |
txt += ' echo "prepare dummy output file"\n' |
1083 |
> |
txt += ' echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n' |
1084 |
> |
txt += ' fi \n' |
1085 |
> |
txt += 'fi\n' |
1086 |
> |
file_list = [] |
1087 |
> |
for fileWithSuffix in (self.output_file): |
1088 |
> |
file_list.append(numberFile(fileWithSuffix, '$NJob')) |
1089 |
|
|
1090 |
< |
def wsRenameOutput(self, nj): |
1090 |
> |
txt += 'file_list="'+string.join(file_list,' ')+'"\n' |
1091 |
> |
txt += '\n' |
1092 |
> |
txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n' |
1093 |
> |
txt += 'echo ">>> current directory content:"\n' |
1094 |
> |
if self.debug_wrapper: |
1095 |
> |
txt += 'ls -Al\n' |
1096 |
> |
txt += '\n' |
1097 |
> |
txt += 'cd $RUNTIME_AREA\n' |
1098 |
> |
txt += 'echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n' |
1099 |
> |
return txt |
1100 |
> |
|
1101 |
> |
def getRequirements(self, nj=[]): |
1102 |
|
""" |
1103 |
< |
Returns part of a job script which renames the produced files. |
1103 |
> |
return job requirements to add to jdl files |
1104 |
|
""" |
1105 |
+ |
req = '' |
1106 |
+ |
if self.version: |
1107 |
+ |
req='Member("VO-cms-' + \ |
1108 |
+ |
self.version + \ |
1109 |
+ |
'", other.GlueHostApplicationSoftwareRunTimeEnvironment)' |
1110 |
+ |
if self.executable_arch: |
1111 |
+ |
req+=' && Member("VO-cms-' + \ |
1112 |
+ |
self.executable_arch + \ |
1113 |
+ |
'", other.GlueHostApplicationSoftwareRunTimeEnvironment)' |
1114 |
+ |
|
1115 |
+ |
req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)' |
1116 |
+ |
if ( common.scheduler.name() == "glitecoll" ) or ( common.scheduler.name() == "glite"): |
1117 |
+ |
req += ' && other.GlueCEStateStatus == "Production" ' |
1118 |
|
|
1119 |
< |
txt = '\n' |
1120 |
< |
file_list = '' |
1121 |
< |
for fileWithSuffix in self.output_file: |
1122 |
< |
output_file_num = self.numberFile_(fileWithSuffix, '$NJob') |
1123 |
< |
file_list=file_list+output_file_num+' ' |
1124 |
< |
txt += '\n' |
1125 |
< |
txt += 'ls \n' |
1126 |
< |
txt += '\n' |
1127 |
< |
txt += 'ls '+fileWithSuffix+'\n' |
1128 |
< |
txt += 'exe_result=$?\n' |
1129 |
< |
txt += 'if [ $exe_result -ne 0 ] ; then\n' |
1130 |
< |
txt += ' echo "ERROR: No output file to manage"\n' |
1131 |
< |
txt += ' echo "JOB_EXIT_STATUS = $exe_result"\n' |
1132 |
< |
txt += ' echo "SanityCheckCode = $exe_result" | tee -a $RUNTIME_AREA/$repo\n' |
1133 |
< |
txt += ' dumpStatus $RUNTIME_AREA/$repo\n' |
1134 |
< |
txt += ' exit $exe_result \n' |
1119 |
> |
return req |
1120 |
> |
|
1121 |
> |
def configFilename(self): |
1122 |
> |
""" return the config filename """ |
1123 |
> |
# FUTURE: Can remove cfg mode for CMSSW >= 2_1_x |
1124 |
> |
if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3): |
1125 |
> |
return self.name()+'.py' |
1126 |
> |
else: |
1127 |
> |
return self.name()+'.cfg' |
1128 |
> |
|
1129 |
> |
def wsSetupCMSOSGEnvironment_(self): |
1130 |
> |
""" |
1131 |
> |
Returns part of a job script which is prepares |
1132 |
> |
the execution environment and which is common for all CMS jobs. |
1133 |
> |
""" |
1134 |
> |
txt = '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n' |
1135 |
> |
txt += ' echo ">>> setup CMS OSG environment:"\n' |
1136 |
> |
txt += ' echo "set SCRAM ARCH to ' + self.executable_arch + '"\n' |
1137 |
> |
txt += ' export SCRAM_ARCH='+self.executable_arch+'\n' |
1138 |
> |
txt += ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n' |
1139 |
> |
txt += ' if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n' |
1140 |
> |
txt += ' # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n' |
1141 |
> |
txt += ' source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n' |
1142 |
> |
txt += ' else\n' |
1143 |
> |
txt += ' echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n' |
1144 |
> |
txt += ' job_exit_code=10020\n' |
1145 |
> |
txt += ' func_exit\n' |
1146 |
> |
txt += ' fi\n' |
1147 |
> |
txt += '\n' |
1148 |
> |
txt += ' echo "==> setup cms environment ok"\n' |
1149 |
> |
txt += ' echo "SCRAM_ARCH = $SCRAM_ARCH"\n' |
1150 |
> |
|
1151 |
> |
return txt |
1152 |
> |
|
1153 |
> |
def wsSetupCMSLCGEnvironment_(self): |
1154 |
> |
""" |
1155 |
> |
Returns part of a job script which is prepares |
1156 |
> |
the execution environment and which is common for all CMS jobs. |
1157 |
> |
""" |
1158 |
> |
txt = '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n' |
1159 |
> |
txt += ' echo ">>> setup CMS LCG environment:"\n' |
1160 |
> |
txt += ' echo "set SCRAM ARCH and BUILD_ARCH to ' + self.executable_arch + ' ###"\n' |
1161 |
> |
txt += ' export SCRAM_ARCH='+self.executable_arch+'\n' |
1162 |
> |
txt += ' export BUILD_ARCH='+self.executable_arch+'\n' |
1163 |
> |
txt += ' if [ ! $VO_CMS_SW_DIR ] ;then\n' |
1164 |
> |
txt += ' echo "ERROR ==> CMS software dir not found on WN `hostname`"\n' |
1165 |
> |
txt += ' job_exit_code=10031\n' |
1166 |
> |
txt += ' func_exit\n' |
1167 |
> |
txt += ' else\n' |
1168 |
> |
txt += ' echo "Sourcing environment... "\n' |
1169 |
> |
txt += ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n' |
1170 |
> |
txt += ' echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n' |
1171 |
> |
txt += ' job_exit_code=10020\n' |
1172 |
> |
txt += ' func_exit\n' |
1173 |
> |
txt += ' fi\n' |
1174 |
> |
txt += ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n' |
1175 |
> |
txt += ' source $VO_CMS_SW_DIR/cmsset_default.sh\n' |
1176 |
> |
txt += ' result=$?\n' |
1177 |
> |
txt += ' if [ $result -ne 0 ]; then\n' |
1178 |
> |
txt += ' echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n' |
1179 |
> |
txt += ' job_exit_code=10032\n' |
1180 |
> |
txt += ' func_exit\n' |
1181 |
> |
txt += ' fi\n' |
1182 |
> |
txt += ' fi\n' |
1183 |
> |
txt += ' \n' |
1184 |
> |
txt += ' echo "==> setup cms environment ok"\n' |
1185 |
> |
return txt |
1186 |
> |
|
1187 |
> |
def modifyReport(self, nj): |
1188 |
> |
""" |
1189 |
> |
insert the part of the script that modifies the FrameworkJob Report |
1190 |
> |
""" |
1191 |
> |
txt = '\n#Written by cms_cmssw::modifyReport\n' |
1192 |
> |
publish_data = int(self.cfg_params.get('USER.publish_data',0)) |
1193 |
> |
if (publish_data == 1): |
1194 |
> |
processedDataset = self.cfg_params['USER.publish_data_name'] |
1195 |
> |
if (self.primaryDataset == 'null'): |
1196 |
> |
self.primaryDataset = processedDataset |
1197 |
> |
if (common.scheduler.name().upper() == "CAF" or common.scheduler.name().upper() == "LSF"): |
1198 |
> |
### FEDE FOR NEW LFN ### |
1199 |
> |
LFNBaseName = LFNBase(self.primaryDataset, processedDataset, LocalUser=True) |
1200 |
> |
self.user = getUserName(LocalUser=True) |
1201 |
> |
######################## |
1202 |
> |
else : |
1203 |
> |
### FEDE FOR NEW LFN ### |
1204 |
> |
LFNBaseName = LFNBase(self.primaryDataset, processedDataset) |
1205 |
> |
self.user = getUserName() |
1206 |
> |
######################## |
1207 |
> |
|
1208 |
> |
txt += 'if [ $copy_exit_status -eq 0 ]; then\n' |
1209 |
> |
### FEDE FOR NEW LFN ### |
1210 |
> |
#txt += ' FOR_LFN=%s_${PSETHASH}/\n'%(LFNBaseName) |
1211 |
> |
txt += ' FOR_LFN=%s/${PSETHASH}/\n'%(LFNBaseName) |
1212 |
> |
######################## |
1213 |
|
txt += 'else\n' |
1214 |
< |
txt += ' cp '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n' |
1214 |
> |
txt += ' FOR_LFN=/copy_problems/ \n' |
1215 |
> |
txt += ' SE=""\n' |
1216 |
> |
txt += ' SE_PATH=""\n' |
1217 |
> |
txt += 'fi\n' |
1218 |
> |
|
1219 |
> |
txt += 'echo ">>> Modify Job Report:" \n' |
1220 |
> |
txt += 'chmod a+x $RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py\n' |
1221 |
> |
txt += 'ProcessedDataset='+processedDataset+'\n' |
1222 |
> |
txt += 'echo "ProcessedDataset = $ProcessedDataset"\n' |
1223 |
> |
txt += 'echo "SE = $SE"\n' |
1224 |
> |
txt += 'echo "SE_PATH = $SE_PATH"\n' |
1225 |
> |
txt += 'echo "FOR_LFN = $FOR_LFN" \n' |
1226 |
> |
txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n' |
1227 |
> |
### FEDE FOR NEW LFN ### |
1228 |
> |
txt += 'echo "$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier ' + self.user + '-$ProcessedDataset-$PSETHASH $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n' |
1229 |
> |
txt += '$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier ' + self.user + '-$ProcessedDataset-$PSETHASH $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH\n' |
1230 |
> |
######################## |
1231 |
> |
txt += 'modifyReport_result=$?\n' |
1232 |
> |
txt += 'if [ $modifyReport_result -ne 0 ]; then\n' |
1233 |
> |
txt += ' modifyReport_result=70500\n' |
1234 |
> |
txt += ' job_exit_code=$modifyReport_result\n' |
1235 |
> |
txt += ' echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n' |
1236 |
> |
txt += ' echo "WARNING: Problem with ModifyJobReport"\n' |
1237 |
> |
txt += 'else\n' |
1238 |
> |
txt += ' mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n' |
1239 |
|
txt += 'fi\n' |
529 |
– |
txt += 'cd $RUNTIME_AREA\n' |
530 |
– |
|
531 |
– |
pass |
532 |
– |
|
533 |
– |
file_list=file_list[:-1] |
534 |
– |
txt += 'file_list="'+file_list+'"\n' |
1240 |
|
return txt |
1241 |
|
|
1242 |
< |
def numberFile_(self, file, txt): |
1242 |
> |
def wsParseFJR(self): |
1243 |
|
""" |
1244 |
< |
append _'txt' before last extension of a file |
1244 |
> |
Parse the FrameworkJobReport to obtain useful infos |
1245 |
|
""" |
1246 |
< |
p = string.split(file,".") |
1247 |
< |
# take away last extension |
1248 |
< |
name = p[0] |
1249 |
< |
for x in p[1:-1]: |
1250 |
< |
name=name+"."+x |
1251 |
< |
# add "_txt" |
1252 |
< |
if len(p)>1: |
1253 |
< |
ext = p[len(p)-1] |
1254 |
< |
#result = name + '_' + str(txt) + "." + ext |
1255 |
< |
result = name + '_' + txt + "." + ext |
1256 |
< |
else: |
1257 |
< |
#result = name + '_' + str(txt) |
1258 |
< |
result = name + '_' + txt |
1259 |
< |
|
1260 |
< |
return result |
1246 |
> |
txt = '\n#Written by cms_cmssw::wsParseFJR\n' |
1247 |
> |
txt += 'echo ">>> Parse FrameworkJobReport crab_fjr.xml"\n' |
1248 |
> |
txt += 'if [ -s $RUNTIME_AREA/crab_fjr_$NJob.xml ]; then\n' |
1249 |
> |
txt += ' if [ -s $RUNTIME_AREA/parseCrabFjr.py ]; then\n' |
1250 |
> |
txt += ' cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --dashboard $MonitorID,$MonitorJobID '+self.debugWrap+'`\n' |
1251 |
> |
if self.debug_wrapper : |
1252 |
> |
txt += ' echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n' |
1253 |
> |
txt += ' executable_exit_status=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --exitcode`\n' |
1254 |
> |
txt += ' if [ $executable_exit_status -eq 50115 ];then\n' |
1255 |
> |
txt += ' echo ">>> crab_fjr.xml contents: "\n' |
1256 |
> |
txt += ' cat $RUNTIME_AREA/crab_fjr_$NJob.xml\n' |
1257 |
> |
txt += ' echo "Wrong FrameworkJobReport --> does not contain useful info. ExitStatus: $executable_exit_status"\n' |
1258 |
> |
txt += ' elif [ $executable_exit_status -eq -999 ];then\n' |
1259 |
> |
txt += ' echo "ExitStatus from FrameworkJobReport not available. not available. Using exit code of executable from command line."\n' |
1260 |
> |
txt += ' else\n' |
1261 |
> |
txt += ' echo "Extracted ExitStatus from FrameworkJobReport parsing output: $executable_exit_status"\n' |
1262 |
> |
txt += ' fi\n' |
1263 |
> |
txt += ' else\n' |
1264 |
> |
txt += ' echo "CRAB python script to parse CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n' |
1265 |
> |
txt += ' fi\n' |
1266 |
> |
#### Patch to check input data reading for CMSSW16x Hopefully we-ll remove it asap |
1267 |
> |
|
1268 |
> |
if (self.datasetPath and not (self.dataset_pu or self.useParent) : |
1269 |
> |
# VERIFY PROCESSED DATA |
1270 |
> |
txt += ' if [ $executable_exit_status -eq 0 ];then\n' |
1271 |
> |
txt += ' echo ">>> Verify list of processed files:"\n' |
1272 |
> |
txt += ' echo $InputFiles |tr -d \'\\\\\' |tr \',\' \'\\n\'|tr -d \'"\' > input-files.txt\n' |
1273 |
> |
txt += ' python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --lfn > processed-files.txt\n' |
1274 |
> |
txt += ' cat input-files.txt | sort | uniq > tmp.txt\n' |
1275 |
> |
txt += ' mv tmp.txt input-files.txt\n' |
1276 |
> |
txt += ' echo "cat input-files.txt"\n' |
1277 |
> |
txt += ' echo "----------------------"\n' |
1278 |
> |
txt += ' cat input-files.txt\n' |
1279 |
> |
txt += ' cat processed-files.txt | sort | uniq > tmp.txt\n' |
1280 |
> |
txt += ' mv tmp.txt processed-files.txt\n' |
1281 |
> |
txt += ' echo "----------------------"\n' |
1282 |
> |
txt += ' echo "cat processed-files.txt"\n' |
1283 |
> |
txt += ' echo "----------------------"\n' |
1284 |
> |
txt += ' cat processed-files.txt\n' |
1285 |
> |
txt += ' echo "----------------------"\n' |
1286 |
> |
txt += ' diff -q input-files.txt processed-files.txt\n' |
1287 |
> |
txt += ' fileverify_status=$?\n' |
1288 |
> |
txt += ' if [ $fileverify_status -ne 0 ]; then\n' |
1289 |
> |
txt += ' executable_exit_status=30001\n' |
1290 |
> |
txt += ' echo "ERROR ==> not all input files processed"\n' |
1291 |
> |
txt += ' echo " ==> list of processed files from crab_fjr.xml differs from list in pset.cfg"\n' |
1292 |
> |
txt += ' echo " ==> diff input-files.txt processed-files.txt"\n' |
1293 |
> |
txt += ' fi\n' |
1294 |
> |
txt += ' fi\n' |
1295 |
> |
txt += '\n' |
1296 |
> |
txt += 'else\n' |
1297 |
> |
txt += ' echo "CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n' |
1298 |
> |
txt += 'fi\n' |
1299 |
> |
txt += '\n' |
1300 |
> |
txt += 'echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n' |
1301 |
> |
txt += 'echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n' |
1302 |
> |
txt += 'job_exit_code=$executable_exit_status\n' |
1303 |
> |
|
1304 |
> |
return txt |
1305 |
|
|
1306 |
< |
def getRequirements(self): |
1306 |
> |
def setParam_(self, param, value): |
1307 |
> |
self._params[param] = value |
1308 |
> |
|
1309 |
> |
def getParams(self): |
1310 |
> |
return self._params |
1311 |
> |
|
1312 |
> |
def uniquelist(self, old): |
1313 |
|
""" |
1314 |
< |
return job requirements to add to jdl files |
1314 |
> |
remove duplicates from a list |
1315 |
|
""" |
1316 |
< |
req = '' |
1317 |
< |
if common.analisys_common_info['sites']: |
1318 |
< |
if common.analisys_common_info['sw_version']: |
1319 |
< |
req='Member("VO-cms-' + \ |
1320 |
< |
common.analisys_common_info['sw_version'] + \ |
1321 |
< |
'", other.GlueHostApplicationSoftwareRunTimeEnvironment)' |
1322 |
< |
if len(common.analisys_common_info['sites'])>0: |
1323 |
< |
req = req + ' && (' |
1324 |
< |
for i in range(len(common.analisys_common_info['sites'])): |
1325 |
< |
req = req + 'other.GlueCEInfoHostName == "' \ |
1326 |
< |
+ common.analisys_common_info['sites'][i] + '"' |
1327 |
< |
if ( i < (int(len(common.analisys_common_info['sites']) - 1)) ): |
1328 |
< |
req = req + ' || ' |
1329 |
< |
req = req + ')' |
1330 |
< |
#print "req = ", req |
1331 |
< |
return req |
1316 |
> |
nd={} |
1317 |
> |
for e in old: |
1318 |
> |
nd[e]=0 |
1319 |
> |
return nd.keys() |
1320 |
> |
|
1321 |
> |
def outList(self): |
1322 |
> |
""" |
1323 |
> |
check the dimension of the output files |
1324 |
> |
""" |
1325 |
> |
txt = '' |
1326 |
> |
txt += 'echo ">>> list of expected files on output sandbox"\n' |
1327 |
> |
listOutFiles = [] |
1328 |
> |
stdout = 'CMSSW_$NJob.stdout' |
1329 |
> |
stderr = 'CMSSW_$NJob.stderr' |
1330 |
> |
if (self.return_data == 1): |
1331 |
> |
for file in (self.output_file+self.output_file_sandbox): |
1332 |
> |
listOutFiles.append(numberFile(file, '$NJob')) |
1333 |
> |
listOutFiles.append(stdout) |
1334 |
> |
listOutFiles.append(stderr) |
1335 |
> |
else: |
1336 |
> |
for file in (self.output_file_sandbox): |
1337 |
> |
listOutFiles.append(numberFile(file, '$NJob')) |
1338 |
> |
listOutFiles.append(stdout) |
1339 |
> |
listOutFiles.append(stderr) |
1340 |
> |
txt += 'echo "output files: '+string.join(listOutFiles,' ')+'"\n' |
1341 |
> |
txt += 'filesToCheck="'+string.join(listOutFiles,' ')+'"\n' |
1342 |
> |
txt += 'export filesToCheck\n' |
1343 |
> |
return txt |