ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.9
Committed: Tue Jun 20 15:39:17 2006 UTC (18 years, 10 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.8: +87 -56 lines
Log Message:
support for "none" datasetpath and split by events

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.1
8 gutsche 1.3 import DBSInfo_EDM
9     import DataDiscovery_EDM
10     import DataLocation_EDM
11 slacapra 1.1 import Scram
12    
13     import os, string, re
14    
15     class Cmssw(JobType):
def __init__(self, cfg_params):
    """
    Configure the CMSSW job type from the user configuration.

    cfg_params maps 'SECTION.key' names (e.g. 'CMSSW.datasetpath') to the
    (string) values of the CRAB configuration. The constructor validates
    the CMSSW/USER/EDG cards, runs data discovery and location unless
    datasetpath is 'none', builds the user tarball, computes the job
    splitting (per files or per events) and writes the edited PSet.

    Raises CrabException for missing or inconsistent configuration.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    def _splitList(value):
        # Comma-separated card -> stripped, non-empty entries. Empty entries
        # are dropped: an empty regex would match (and so black-list) every
        # CE, and an empty file name can never be found.
        return [item.strip() for item in value.split(',') if item.strip()]

    self.analisys_common_info = {}
    # Marco.
    self._params = {}
    self.cfg_params = cfg_params
    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.tgz_name = 'default.tgz'

    self.version = self.scram.getSWVersion()
    self.setParam_('application', self.version)
    common.analisys_common_info['sw_version'] = self.version
    ### FEDE
    common.analisys_common_info['copy_input_data'] = 0
    common.analisys_common_info['events_management'] = 1

    ### collect Data cards
    try:
        tmp = cfg_params['CMSSW.datasetpath']
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)
    log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
    # 'none' (any case) means: no input dataset (e.g. pythia generation)
    if tmp.lower() == 'none':
        self.datasetPath = None
    else:
        self.datasetPath = tmp

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    if not self.datasetPath:
        self.setParam_('dataset', 'None')
        self.setParam_('owner', 'None')
    else:
        datasetpath_split = self.datasetPath.split("/")
        self.setParam_('dataset', datasetpath_split[1])
        self.setParam_('owner', datasetpath_split[-1])

    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if not os.path.exists(self.pset):
        raise CrabException("User defined PSet file "+self.pset+" does not exist")

    # output files
    self.output_file = []
    try:
        tmpOutFiles = _splitList(cfg_params['CMSSW.output_file'])
    except KeyError:
        tmpOutFiles = []
    if tmpOutFiles:
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        self.output_file = tmpOutFiles
    else:
        log.message("No output file defined: only stdout/err will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
    except KeyError:
        pass
    if self.scriptExe != '':
        # fail like the other file checks of this constructor (the former
        # sys.exit() also depended on 'sys' being imported elsewhere)
        if not os.path.isfile(self.scriptExe):
            raise CrabException("User script_exe file "+self.scriptExe+" not found")
        self.additional_inbox_files.append(self.scriptExe)

    ## additional input files
    try:
        tmpAddFiles = _splitList(cfg_params['CMSSW.additional_input_files'])
    except KeyError:
        tmpAddFiles = []
    for tmp in tmpAddFiles:
        # names are already stripped, so "a.txt, b.txt" finds "b.txt"
        if not os.path.exists(tmp):
            raise CrabException("Additional input file not found: "+tmp)
        self.additional_inbox_files.append(tmp)

    # files per job
    try:
        self.filesPerJob = int(cfg_params['CMSSW.files_per_jobs']) #Daniele
        self.selectFilesPerJob = 1
    except KeyError:
        self.filesPerJob = 0
        self.selectFilesPerJob = 0

    ## Events per job
    try:
        self.eventsPerJob = int(cfg_params['CMSSW.event_per_job'])
        self.selectEventsPerJob = 1
    except KeyError:
        self.eventsPerJob = -1
        self.selectEventsPerJob = 0

    # exactly one of the two splitting modes must be configured
    if self.selectFilesPerJob == self.selectEventsPerJob:
        msg = 'Must define either files_per_jobs or event_per_job'
        raise CrabException(msg)

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
    except KeyError:
        msg = 'Must define total_number_of_events'
        raise CrabException(msg)

    CEBlackList = []
    try:
        CEBlackList = _splitList(cfg_params['EDG.ce_black_list'])
    except KeyError:
        pass
    self.reCEBlackList = [re.compile(bad) for bad in CEBlackList]
    common.logger.debug(5,'CEBlackList: '+str(CEBlackList))

    CEWhiteList = []
    try:
        CEWhiteList = _splitList(cfg_params['EDG.ce_white_list'])
    except KeyError:
        pass
    self.reCEWhiteList = [re.compile(good) for good in CEWhiteList]
    common.logger.debug(5,'CEWhiteList: '+str(CEWhiteList))

    self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents = 0 # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths = {} # all dbs paths requested ( --> input to the site local discovery script)
    ## Perform the data location and discovery (based on DBS/DLS)
    ## SL: Don't if NONE is specified as input (pythia use case)
    common.analisys_common_info['sites'] = None
    if self.datasetPath:
        self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    ## Select Splitting
    if self.selectFilesPerJob:
        self.jobSplittingPerFiles()
    elif self.selectEventsPerJob:
        self.jobSplittingPerEvents()
    else:
        msg = 'Don\'t know how to split...'
        raise CrabException(msg)

    self.PsetEdit.maxEvent(self.eventsPerJob) #Daniele
    self.PsetEdit.inputModule("INPUT") #Daniele
    self.PsetEdit.psetWriter(self.configFilename())
218 gutsche 1.5
219 gutsche 1.3
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Look up the input files of self.datasetPath in DBS and their sites in DLS.

    Sets self.pubdata, self.AllInputFiles / self.files (the values of the
    per-fileblock mapping returned by DBS), self.maxEvents and
    common.analisys_common_info['sites'], and publishes the selected sites
    as the 'TargetCE' parameter.
    Raises CrabException when DBS or DLS fail, or when no site survives
    the CE black/white lists.
    """
    common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")

    datasetPath = self.datasetPath
    ## TODO: data tiers are not configurable yet
    dataTiers = "".split(',')

    ## Contact the DBS
    try:
        self.pubdata = DataDiscovery_EDM.DataDiscovery_EDM(datasetPath, dataTiers, cfg_params)
        self.pubdata.fetchDBSInfo()
    except (DataDiscovery_EDM.NotExistingDatasetError,
            DataDiscovery_EDM.NoDataTierinProvenanceError) as ex:
        raise CrabException('ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage())
    except DataDiscovery_EDM.DataDiscoveryError as ex:
        raise CrabException('ERROR ***: failed Data Discovery in DBS %s'%ex.getErrorMessage())

    common.logger.message("Required data are :"+self.datasetPath)

    filesbyblock = self.pubdata.getFiles()
    # presumably one list of file names per fileblock
    self.AllInputFiles = list(filesbyblock.values())
    self.files = self.AllInputFiles

    ## get max number of events; self.maxEvents used in Creator.py
    self.maxEvents = self.pubdata.getMaxEvents()
    common.logger.message("\nThe number of available events is %s"%self.maxEvents)

    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc = DataLocation_EDM.DataLocation_EDM(list(filesbyblock.keys()), cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation_EDM.DataLocationError as ex:
        raise CrabException('ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage())

    allsites = dataloc.getSites()
    common.logger.debug(5,"sites are %s"%allsites)
    sites = self.checkBlackList(allsites)
    common.logger.debug(5,"sites are (after black list) %s"%sites)
    sites = self.checkWhiteList(sites)
    common.logger.debug(5,"sites are (after white list) %s"%sites)

    if len(sites) == 0:
        raise CrabException('No sites hosting all the needed data! Exiting... ')

    common.logger.message("List of Sites hosting the data : "+str(sites))
    common.logger.debug(6, "List of Sites: "+str(sites))
    ## used in SchedulerEdg.py in createSchScript
    common.analisys_common_info['sites'] = sites
    self.setParam_('TargetCE', ','.join(sites))
    return
294 gutsche 1.3
def jobSplittingPerFiles(self):
    """
    Perform job splitting based on number of files to be accessed per job

    Uses self.files (file lists per fileblock, set by data discovery),
    self.maxEvents, self.filesPerJob and self.total_number_of_events
    (-1 means all available events). Sets self.total_number_of_files,
    self.total_number_of_jobs and self.list_of_args. Each entry of
    list_of_args is one job's argument: its files, quoted and
    backslash-escaped, inside escaped braces, ready for the sed
    substitution done by the job wrapper. The last job takes the
    remaining files.

    Raises CrabException when no input files are known, files_per_jobs is
    not positive, or the number of events per file cannot be estimated.
    """
    common.logger.debug(5,'Splitting per input files')
    common.logger.message('Required '+str(self.filesPerJob)+' files per job ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    filesPerJob = int(self.filesPerJob)
    if filesPerJob <= 0:
        raise CrabException('files_per_jobs must be a positive integer, got '+str(self.filesPerJob))

    # maxEvents counts the events of all fileblocks, so use the files of all
    # fileblocks too (not only the first one). self.files does not exist
    # when datasetpath = none.
    allFiles = []
    for block in getattr(self, 'files', None) or []:
        allFiles.extend(block)
    n_tot_files = len(allFiles)
    if n_tot_files == 0:
        raise CrabException('No input files found: cannot split by files_per_jobs')

    ## TODO: SL need to have (from DBS) a detailed list of how many events per each file
    ## SL: this is wrong if the files have different number of events
    evPerFile = int(self.maxEvents) // n_tot_files
    common.logger.debug(5,'Events per File '+str(evPerFile))

    processAll = (self.total_number_of_events == -1)
    if processAll:
        ## if asked to process all events, do it
        self.total_number_of_events = self.maxEvents
        self.total_number_of_files = n_tot_files
    else:
        if int(self.total_number_of_events) < 0:
            raise CrabException('total_number_of_events must be -1 (all) or a positive number')
        if evPerFile <= 0:
            raise CrabException('Cannot estimate the number of events per file: '
                                +str(self.maxEvents)+' events in '+str(n_tot_files)+' files')
        nFiles = int(self.total_number_of_events) // evPerFile
        ## SL: if ask for less event than what is computed to be available on a
        ## file, process the first file anyhow.
        if nFiles == 0:
            nFiles = 1
        # never create jobs for files that do not exist
        if nFiles > n_tot_files:
            common.logger.message('Warning: only '+str(n_tot_files)+' files available, all of them will be used')
            nFiles = n_tot_files
        self.total_number_of_files = nFiles
    common.logger.debug(5,'N files '+str(self.total_number_of_files))

    ## one argument per job: filesPerJob files each, the last one gets the rest
    selected = allFiles[:self.total_number_of_files]
    list_of_lists = []
    for first in range(0, len(selected), filesPerJob):
        quoted = ['\\"' + name + '\\"' for name in selected[first:first + filesPerJob]]
        list_of_lists.append('\\{' + '\\,'.join(quoted) + '\\}')
    self.list_of_args = list_of_lists
    # one job per argument, so split() can never index past the list
    self.total_number_of_jobs = len(list_of_lists)
    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

    ## is there any remainder?
    check = self.total_number_of_files % filesPerJob
    common.logger.debug(5,'Check '+str(check))
    if check > 0:
        common.logger.message('Warning: last job will be created with '+str(check)+' files')

    if processAll:
        common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for all available events '+str(self.total_number_of_events)+' events')
    else:
        common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for a total of '+str(self.total_number_of_files*evPerFile)+' events')
    common.logger.debug(5,'Job arguments: '+str(self.list_of_args))
    return
359    
def jobSplittingPerEvents(self):
    """
    Perform job splitting based on number of event per job

    Creates total_number_of_events // eventsPerJob jobs of
    self.eventsPerJob events each. A remainder that does not fill a whole
    job is dropped with a warning. Sets self.total_number_of_jobs and
    self.list_of_args (the job indices 0 .. N-1).

    Raises CrabException if event_per_job is not positive or if the
    requested number of events does not allow any job to be created.
    """
    common.logger.debug(5,'Splitting per events')
    common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.eventsPerJob <= 0:
        raise CrabException('event_per_job must be a positive integer, got '+str(self.eventsPerJob))

    totalEvents = int(self.total_number_of_events)
    # integer (floor) division: only full jobs are created
    self.total_number_of_jobs = totalEvents // self.eventsPerJob
    common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))
    if self.total_number_of_jobs < 1:
        raise CrabException('Cannot create any job: total_number_of_events = '
                            +str(self.total_number_of_events)+', event_per_job = '+str(self.eventsPerJob))

    # is there any remainder?
    doneEvents = self.total_number_of_jobs*self.eventsPerJob
    check = totalEvents - doneEvents
    common.logger.debug(5,'Check '+str(check))
    if check > 0:
        # both counts are ints: convert before concatenating
        common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but will do only '+str(doneEvents))

    common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for a total of '+str(doneEvents)+' events')

    self.list_of_args = list(range(self.total_number_of_jobs))
    common.logger.debug(5,'Job arguments: '+str(self.list_of_args))
    return
388    
def split(self, jobParams):
    """
    Fill jobParams and the job database with the per-job arguments.

    Appends one "" placeholder per job to jobParams, then sets entry i
    (i < self.total_number_of_jobs) to str(self.list_of_args[i]) and
    records it with common.jobDB.setArguments(i, ...). The job DB is
    loaded before and saved afterwards.
    """
    common.jobDB.load()
    #### Fabio
    nJobs = self.total_number_of_jobs
    args = self.list_of_args
    # create the empty structure first
    for _ in range(nJobs):
        jobParams.append("")
    for idx in range(nJobs):
        value = str(args[idx])
        jobParams[idx] = value
        common.jobDB.setArguments(idx, value)
    common.jobDB.save()
    return
405    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments stored in common.jobDB for job nj.

    sched is part of the JobType interface and is not used here.
    """
    jobArgs = common.jobDB.arguments(nj)
    return jobArgs
408 gutsche 1.3
def numberOfJobs(self):
    """Return the number of jobs computed by the splitting step."""
    # Fabio
    nJobs = self.total_number_of_jobs
    return nJobs
412    
def checkBlackList(self, allSites):
    """
    Drop the sites matched by any CE black-list pattern.

    Returns allSites itself when no black list is configured, otherwise a
    new list with the sites that no pattern in self.reCEBlackList matches,
    in their original order.
    """
    if not self.reCEBlackList:
        return allSites
    accepted = []
    for site in allSites:
        common.logger.debug(10,'Site '+site)
        matches = [pattern for pattern in self.reCEBlackList if pattern.search(site)]
        # one message per matching pattern
        for _ in matches:
            common.logger.message('CE in black list, skipping site '+site)
        if not matches:
            accepted.append(site)
    if not accepted:
        common.logger.debug(3,"No sites found after BlackList")
    return accepted
428    
def checkWhiteList(self, allSites):
    """
    Keep only the sites matched by at least one CE white-list pattern.

    Returns allSites itself when no white list is configured, otherwise a
    new list with the sites matched by a pattern in self.reCEWhiteList,
    in their original order.
    """
    if not self.reCEWhiteList:
        return allSites
    selected = []
    for site in allSites:
        hits = [pattern for pattern in self.reCEWhiteList if pattern.search(site)]
        # one debug line per matching pattern
        for _ in hits:
            common.logger.debug(5,'CE in white list, adding site '+site)
        if hits:
            selected.append(site)
    if selected:
        common.logger.debug(5,"Selected sites via WhiteList are "+str(selected)+"\n")
    else:
        common.logger.message("No sites found after WhiteList\n")
    return selected
446    
def getTarBall(self, exe):
    """
    Return the TarBall with lib and exe

    The tarball path is the share directory plus self.tgz_name and is also
    stored in self.tgzNameWithPath. An existing tarball is reused as is;
    otherwise buildTar_(exe) is called and the stripped path is returned.
    """
    self.tgzNameWithPath = common.work_space.shareDir()+self.tgz_name
    if os.path.exists(self.tgzNameWithPath):
        # if it exist, just return it
        return self.tgzNameWithPath
    # Prepare a tar gzipped file with user binaries.
    self.buildTar_(exe)
    return self.tgzNameWithPath.strip()
461    
def buildTar_(self, executable):
    """
    Build the user tarball self.tgzNameWithPath from the local Scram area.

    Nothing is done when the working area is the release area itself.
    Otherwise the tarball holds, when present: the executable (only if
    its path does not contain the release top), lib/, module/ and
    src/Data/, with paths relative to the Scram area. The current
    directory is restored even when the tar command fails.

    Raises CrabException if the executable cannot be found or if
    runCommand returns a false value for the tar command.
    """
    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    filesToBeTarred = []
    ## First find the executable
    if self.executable != '':
        exeWithPath = self.scram.findFile_(executable)
        if not exeWithPath:
            raise CrabException('User executable '+executable+' not found')

        ## then check if it's private or not
        if exeWithPath.find(swReleaseTop) == -1:
            # the exe is private, so we must ship it (path relative to swArea)
            common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
            filesToBeTarred.append(exeWithPath.replace(swArea+'/', ''))
        # else: the exe is from release, we'll find it on WN

    ## Now get the libraries: only those in local working area
    libDir = 'lib'
    lib = swArea+'/'+libDir
    common.logger.debug(5,"lib "+lib+" to be tarred")
    if os.path.exists(lib):
        filesToBeTarred.append(libDir)

    ## Now check if module dir is present
    moduleDir = 'module'
    if os.path.isdir(swArea+'/'+moduleDir):
        filesToBeTarred.append(moduleDir)

    ## Now check if the Data dir is present
    dataDir = 'src/Data/'
    if os.path.isdir(swArea+'/'+dataDir):
        filesToBeTarred.append(dataDir)

    if not filesToBeTarred:
        common.logger.debug(5,"No files to be to be tarred")
        return

    ## Create the tar-ball from inside swArea so the stored paths are relative
    tarcmd = 'tar zcvf ' + self.tgzNameWithPath + ' ' + ' '.join(filesToBeTarred) + ' '
    cwd = os.getcwd()
    os.chdir(swArea)
    try:
        cout = runCommand(tarcmd)
    finally:
        # always go back to the caller's directory, even if tar fails
        os.chdir(cwd)
    if not cout:
        raise CrabException('Could not create tar-ball')
    return
528    
def wsSetupEnvironment(self, nj):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The generated shell code:
    - sets up the CMS environment for LCG or OSG, creating and entering
      $WORKING_DIR on OSG;
    - creates the CMSSW project area for self.version;
    - checks that the wrapper got at least two arguments;
    - replaces {'INPUT'} in the job's PSet with the second wrapper
      argument ($2), writing pset.cfg;
    - copies the additional input-sandbox files into the working area.
    Each error path records a JobExitCode, dumps the status and exits.
    """
    def _osgCleanup(errTag, errCode, reason):
        # On OSG, remove $WORKING_DIR before exiting on an error; if the
        # removal fails, report errCode instead.
        s = ' if [ $middleware == OSG ]; then \n'
        s += ' echo "Remove working directory: $WORKING_DIR"\n'
        s += ' cd $RUNTIME_AREA\n'
        s += ' /bin/rm -rf $WORKING_DIR\n'
        s += ' if [ -d $WORKING_DIR ] ;then\n'
        s += ' echo "'+errTag+' '+errCode+' ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after '+reason+'"\n'
        s += ' echo "JOB_EXIT_STATUS = '+errCode+'"\n'
        s += ' echo "JobExitCode='+errCode+'" | tee -a $RUNTIME_AREA/$repo\n'
        s += ' dumpStatus $RUNTIME_AREA/$repo\n'
        s += ' fi\n'
        s += ' fi \n'
        return s

    # Prepare JobType-independent part
    txt = ''

    ## OLI_Daniele at this level middleware already known
    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' time=`date -u +"%s"`\n'
    txt += ' WORKING_DIR=$OSG_WN_TMP/cms_$time\n'
    txt += ' echo "Creating working directory: $WORKING_DIR"\n'
    txt += ' /bin/mkdir -p $WORKING_DIR\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele; report the version actually requested (was hard-coded)
    txt += _osgCleanup('SET_CMS_ENV', '10018', 'CMSSW '+self.version+' not found on `hostname`')
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "narg=$#\n"
    txt += "if [ $narg -lt 2 ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$narg+ \n"
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += _osgCleanup('SET_EXE_ENV', '50114', 'Too few arguments for CRAB job wrapper')
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part: substitute the input files into the PSet
    job = common.job_list[nj]
    pset = os.path.basename(job.configFilename())
    txt += '\n'
    txt += 'InputFiles=$2\n'
    txt += 'echo "<$InputFiles>"\n'
    txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset.cfg\n'

    for fname in self.additional_inbox_files:
        txt += 'if [ -e $RUNTIME_AREA/'+fname+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+fname+' .\n'
        txt += ' chmod +x '+fname+'\n'
        txt += 'fi\n'

    txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

    txt += '\n'
    txt += 'echo "***** cat pset.cfg *********"\n'
    txt += 'cat pset.cfg\n'
    txt += 'echo "****** end pset.cfg ********"\n'
    txt += '\n'
    return txt
645    
def wsBuildExe(self, nj):
    """
    Put in the script the commands to build an executable
    or a library.

    If the user tarball self.tgzNameWithPath exists, the generated shell
    code unpacks it from $RUNTIME_AREA. On failure it records the tar exit
    status as JobExitCode, dumps the job status and exits, removing
    $WORKING_DIR first on OSG. Returns '' when there is no tarball.
    """
    txt = ""
    if not os.path.isfile(self.tgzNameWithPath):
        return txt

    tarball = os.path.basename(self.tgzNameWithPath)
    txt += 'echo "tar xzvf $RUNTIME_AREA/'+tarball+'"\n'
    txt += 'tar xzvf $RUNTIME_AREA/'+tarball+'\n'
    txt += 'untar_status=$? \n'
    txt += 'if [ $untar_status -ne 0 ]; then \n'
    txt += ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n'
    txt += ' echo "JOB_EXIT_STATUS = $untar_status" \n'
    txt += ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n'
    # dump the status like every other error path of the wrapper does
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50999"\n'
    txt += ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' \n'
    txt += ' exit 1 \n'
    txt += 'else \n'
    txt += ' echo "Successful untar" \n'
    txt += 'fi \n'
    return txt
681    
def modifySteeringCards(self, nj):
    """
    modify the card provided by the user,
    writing a new card into share dir

    Currently a no-op for CMSSW: the PSet is edited once in the
    constructor, so nothing is done per job here.
    """
    return None
687    
def executableName(self):
    """Return the executable run by the job (cmsRun unless overridden)."""
    exeName = self.executable
    return exeName
690    
def executableArgs(self):
    """Return the executable's command-line arguments: the edited pset.cfg."""
    exeArgs = " -p pset.cfg"
    return exeArgs
693 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    The list holds the user tarball (only if it exists) followed by the
    config file of job nj.
    NOTE(review): self.additional_inbox_files are not added here, although
    the job wrapper copies them from $RUNTIME_AREA when present.
    """
    tarball = self.tgzNameWithPath
    ## code
    box = [tarball] if os.path.isfile(tarball) else []
    ## config
    box.append(common.job_list[nj].configFilename())
    return box
710    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Each user-declared output file in self.output_file is renamed with
    the 1-based job number through self.numberFile_ (job nj has number
    nj + 1).
    """
    jobNumber = str(nj + 1)
    ## User Declared output files
    return [self.numberFile_(out, jobNumber) for out in self.output_file]
726    
def prepareSteeringCards(self):
    """
    Make initial modifications of the user's steering card file.

    Nothing to do for CMSSW at this stage.
    """
    pass
732    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every file in self.output_file the generated shell code checks
    that it exists and copies it to $RUNTIME_AREA under the name given by
    numberFile_(file, '$NJob'). A missing file records JobExitCode=60302;
    with the condor_g scheduler on OSG a dummy file is written instead.
    The script then sets file_list to the space-separated renamed files
    and, on OSG, removes $WORKING_DIR.
    """
    pieces = ['\n', '# directory content\n', 'ls \n']
    renamed = []
    for produced in self.output_file:
        target = self.numberFile_(produced, '$NJob')
        renamed.append(target)
        pieces.append('\n')
        pieces.append('# check output file\n')
        pieces.append('ls '+produced+'\n')
        pieces.append('exe_result=$?\n')
        pieces.append('if [ $exe_result -ne 0 ] ; then\n')
        pieces.append(' echo "ERROR: No output file to manage"\n')
        pieces.append(' echo "JOB_EXIT_STATUS = $exe_result"\n')
        pieces.append(' echo "JobExitCode=60302" | tee -a $RUNTIME_AREA/$repo\n')
        pieces.append(' dumpStatus $RUNTIME_AREA/$repo\n')
        ### OLI_DANIELE
        if common.scheduler.boss_scheduler_name == 'condor_g':
            pieces.append(' if [ $middleware == OSG ]; then \n')
            pieces.append(' echo "prepare dummy output file"\n')
            pieces.append(' echo "Processing of job output failed" > $RUNTIME_AREA/'+target+'\n')
            pieces.append(' fi \n')
        pieces.append('else\n')
        pieces.append(' cp '+produced+' $RUNTIME_AREA/'+target+'\n')
        pieces.append('fi\n')

    pieces.append('cd $RUNTIME_AREA\n')
    pieces.append('file_list="'+' '.join(renamed)+'"\n')
    ### OLI_DANIELE
    pieces.append('if [ $middleware == OSG ]; then\n')
    pieces.append(' cd $RUNTIME_AREA\n')
    pieces.append(' echo "Remove working directory: $WORKING_DIR"\n')
    pieces.append(' /bin/rm -rf $WORKING_DIR\n')
    pieces.append(' if [ -d $WORKING_DIR ] ;then\n')
    pieces.append(' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n')
    pieces.append(' echo "JOB_EXIT_STATUS = 60999"\n')
    pieces.append(' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n')
    pieces.append(' dumpStatus $RUNTIME_AREA/$repo\n')
    pieces.append(' fi\n')
    pieces.append('fi\n')
    pieces.append('\n')
    return ''.join(pieces)
781    
def numberFile_(self, file, txt):
    """
    append _'txt' before last extension of a file

    e.g. ('out.root', '3') -> 'out_3.root', ('a.b.c', 'x') -> 'a.b_x.c';
    a name without any '.' just gets '_txt' appended. txt must be a str.
    """
    stem, dot, ext = file.rpartition('.')
    if not dot:
        # no extension at all
        return file + '_' + txt
    return stem + '_' + txt + '.' + ext
801    
def getRequirements(self):
    """
    return job requirements to add to jdl files

    Built from common.analisys_common_info. When 'sites' is empty or None
    the result is ''. Otherwise it is the CMS software tag requirement
    (only if 'sw_version' is set) AND-ed with an OR of
    other.GlueCEInfoHostName == "<site>" over all sites. '&&' is only
    inserted between two actual clauses, so an empty sw_version no longer
    yields an expression starting with '&&'.
    """
    sites = common.analisys_common_info['sites']
    if not sites:
        return ''
    clauses = []
    swVersion = common.analisys_common_info['sw_version']
    if swVersion:
        clauses.append('Member("VO-cms-' + swVersion +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')
    ceMatch = ' || '.join(['other.GlueCEInfoHostName == "' + site + '"' for site in sites])
    clauses.append('(' + ceMatch + ')')
    return ' && '.join(clauses)
822 gutsche 1.3
def configFilename(self):
    """Build the config file name: this job type's name() plus '.cfg'."""
    baseName = self.name()
    return baseName + '.cfg'
826    
827     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of a job script that prepares the CMS software
    environment on OSG worker nodes.

    The generated shell fragment:
      - sources $GRID3_APP_DIR/cmssoft/cmsset_default.sh with self.version;
      - otherwise falls back to $OSG_APP/cmssoft/cmsset_default.sh;
      - if neither file exists, reports exit code 10020, removes
        $WORKING_DIR (reporting 10017 if removal fails) and exits with
        status 1.
    """
    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    # Do not exit here: an 'exit 1' at this point made the working-directory
    # cleanup below unreachable and left $WORKING_DIR on the worker node.
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
865    
866     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Build the job-script fragment that sets up the CMS environment on LCG.

    The fragment checks $VO_CMS_SW_DIR (code 10031).  It then checks for and
    sources $VO_CMS_SW_DIR/cmsset_default.sh (codes 10020 / 10032).  Finally
    it picks SCRAM_ARCH from /etc/redhat-release (code 10033 if the OS is not
    recognised).  Every failure path records its code and exits with status 1.
    """
    # One entry per generated shell line; concatenated at the end.
    scriptLines = [
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        # CMS software area must be advertised on the worker node
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        # choose SCRAM_ARCH from the installed OS release string
        ' string=`cat /etc/redhat-release`\n',
        ' echo $string\n',
        ' if [[ $string = *alhalla* ]]; then\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n',
        ' export SCRAM_ARCH=slc3_ia32_gcc323\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' else\n',
        ' echo "SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized"\n',
        ' echo "JOB_EXIT_STATUS = 10033"\n',
        ' echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' fi\n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    ]
    return ''.join(scriptLines)
918 gutsche 1.5
def setParam_(self, param, value):
    """Store value under key param in self._params, overwriting any previous entry."""
    self._params.update({param: value})
921    
def getParams(self):
    """Return the parameter dict filled by setParam_ (the object itself, not a copy)."""
    params = self._params
    return params
924 gutsche 1.8
def setTaskid_(self):
    """Copy cfg_params['taskId'] into self._taskId (raises KeyError if absent)."""
    taskId = self.cfg_params['taskId']
    self._taskId = taskId
927    
def getTaskid(self):
    """Return the task id stored by setTaskid_."""
    taskId = self._taskId
    return taskId