ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.8
Committed: Wed Jun 14 20:52:32 2006 UTC (18 years, 10 months ago) by gutsche
Content type: text/x-python
Branch: MAIN
Changes since 1.7: +8 -1 lines
Log Message:
added taskId to be reported at Submission time

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.1
8 gutsche 1.3 import DBSInfo_EDM
9     import DataDiscovery_EDM
10     import DataLocation_EDM
11 slacapra 1.1 import Scram
12    
13     import os, string, re
14    
15     class Cmssw(JobType):
def __init__(self, cfg_params):
    """
    Configure a CMSSW job type from the user's crab configuration.

    cfg_params is the dictionary of configuration values.  Keys read here:
    'CMSSW.datasetpath' (mandatory), 'taskId', 'CMSSW.executable',
    'CMSSW.pset' (mandatory), 'CMSSW.output_file', 'USER.script_exe',
    'CMSSW.additional_input_files', 'CMSSW.files_per_jobs',
    'CMSSW.event_per_job', 'CMSSW.total_number_of_events' (mandatory),
    'EDG.ce_black_list' and 'EDG.ce_white_list'.

    After reading the configuration the constructor runs data discovery
    and location, prepares the tar-ball, splits the jobs and writes the
    edited PSet.

    Raises CrabException when a mandatory key is missing or when the PSet
    file, the script_exe file or an additional input file does not exist.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.analisys_common_info = {}
    # Marco.
    self._params = {}
    self.cfg_params = cfg_params
    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.tgz_name = 'default.tgz'

    self.version = self.scram.getSWVersion()
    self.setParam_('application', self.version)
    common.analisys_common_info['sw_version'] = self.version
    ### FEDE
    common.analisys_common_info['copy_input_data'] = 0
    common.analisys_common_info['events_management'] = 1

    ### collect Data cards
    try:
        self.datasetPath = cfg_params['CMSSW.datasetpath']
        log.debug(6, "CMSSW::CMSSW(): datasetPath = "+self.datasetPath)
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    datasetpath_split = self.datasetPath.split("/")
    self.setParam_('dataset', datasetpath_split[1])
    self.setParam_('owner', datasetpath_split[-1])
    self.setTaskid_()
    self.setParam_('taskId', self.cfg_params['taskId'])

    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
        log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
        if (not os.path.exists(self.pset)):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")

    # output files
    self.output_file = []
    try:
        tmp = cfg_params['CMSSW.output_file']
        if tmp != '':
            tmpOutFiles = tmp.split(',')
            log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
            for tmp in tmpOutFiles:
                self.output_file.append(tmp.strip())
        else:
            log.message("No output file defined: only stdout/err will be available")
    except KeyError:
        log.message("No output file defined: only stdout/err will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
        self.additional_inbox_files.append(self.scriptExe)
    except KeyError:
        pass
    if self.scriptExe != '' and not os.path.isfile(self.scriptExe):
        # Report the problem like every other configuration error in this
        # constructor instead of leaving through a bare sys.exit(), which
        # ended the program with a success status.
        log.message("WARNING. file "+self.scriptExe+" not found")
        raise CrabException("User script_exe file "+self.scriptExe+" not found")

    ## additional input files
    try:
        tmpAddFiles = cfg_params['CMSSW.additional_input_files'].split(',')
    except KeyError:
        tmpAddFiles = []
    for tmp in tmpAddFiles:
        # Strip BEFORE testing for existence: "a, b" must look for "b",
        # not for " b".
        tmp = tmp.strip()
        if tmp == '':
            # empty entry (e.g. trailing comma): nothing to ship
            continue
        if not os.path.exists(tmp):
            raise CrabException("Additional input file not found: "+tmp)
        self.additional_inbox_files.append(tmp)

    try:
        self.filesPerJob = int(cfg_params['CMSSW.files_per_jobs']) #Daniele
    except KeyError:
        self.filesPerJob = 1

    ## Max event will be total_number_of_events ??? Daniele
    try:
        self.maxEv = cfg_params['CMSSW.event_per_job']
    except KeyError:
        self.maxEv = "-1"
    ##
    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
    except KeyError:
        msg = 'Must define total_number_of_events'
        raise CrabException(msg)

    # CE black list: comma separated regular expressions
    CEBlackList = []
    try:
        for tmp in cfg_params['EDG.ce_black_list'].split(','):
            CEBlackList.append(tmp.strip())
    except KeyError:
        pass
    self.reCEBlackList = [re.compile(bad) for bad in CEBlackList]
    common.logger.debug(5,'CEBlackList: '+str(CEBlackList))

    # CE white list: comma separated regular expressions
    CEWhiteList = []
    try:
        for tmp in cfg_params['EDG.ce_white_list'].split(','):
            CEWhiteList.append(tmp.strip())
    except KeyError:
        pass
    self.reCEWhiteList = [re.compile(good) for good in CEWhiteList]
    common.logger.debug(5,'CEWhiteList: '+str(CEWhiteList))

    self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
    ## Perform the data location and discovery (based on DBS/DLS)
    self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    self.jobSplitting()  #Daniele job Splitting
    self.PsetEdit.maxEvent(self.maxEv) #Daniele
    self.PsetEdit.inputModule("INPUT") #Daniele
    self.PsetEdit.psetWriter(self.configFilename())
208 gutsche 1.5
209    
210 gutsche 1.3
211 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
212    
213 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
214    
215     #datasetPath = "/"+self.owner+"/"+self.dataTiers[0]+"/"+self.dataset
216    
217     datasetPath=self.datasetPath
218    
219     ## TODO
220     dataTiersList = ""
221     dataTiers = dataTiersList.split(',')
222 slacapra 1.1
223     ## Contact the DBS
224     try:
225 afanfani 1.4 self.pubdata=DataDiscovery_EDM.DataDiscovery_EDM(datasetPath, dataTiers, cfg_params)
226 slacapra 1.1 self.pubdata.fetchDBSInfo()
227    
228 gutsche 1.3 except DataDiscovery_EDM.NotExistingDatasetError, ex :
229 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
230     raise CrabException(msg)
231    
232 gutsche 1.3 except DataDiscovery_EDM.NoDataTierinProvenanceError, ex :
233 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
234     raise CrabException(msg)
235 gutsche 1.3 except DataDiscovery_EDM.DataDiscoveryError, ex:
236 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS %s'%ex.getErrorMessage()
237     raise CrabException(msg)
238    
239     ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
240 gutsche 1.3 ## self.DBSPaths=self.pubdata.getDBSPaths()
241     common.logger.message("Required data are :"+self.datasetPath)
242    
243     filesbyblock=self.pubdata.getFiles()
244     self.AllInputFiles=filesbyblock.values()
245     self.files = self.AllInputFiles
246    
247     ## TEMP
248     # self.filesTmp = filesbyblock.values()
249     # self.files = []
250     # locPath='rfio:cmsbose2.bo.infn.it:/flatfiles/SE00/cms/fanfani/ProdTest/'
251     # locPath=''
252     # tmp = []
253     # for file in self.filesTmp[0]:
254     # tmp.append(locPath+file)
255     # self.files.append(tmp)
256     ## END TEMP
257 slacapra 1.1
258     ## get max number of events
259 gutsche 1.3 #common.logger.debug(10,"number of events for primary fileblocks %i"%self.pubdata.getMaxEvents())
260 slacapra 1.1 self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
261     common.logger.message("\nThe number of available events is %s"%self.maxEvents)
262    
263     ## Contact the DLS and build a list of sites hosting the fileblocks
264     try:
265 gutsche 1.6 dataloc=DataLocation_EDM.DataLocation_EDM(filesbyblock.keys(),cfg_params)
266     dataloc.fetchDLSInfo()
267 gutsche 1.3 except DataLocation_EDM.DataLocationError , ex:
268 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
269     raise CrabException(msg)
270    
271     allsites=dataloc.getSites()
272     common.logger.debug(5,"sites are %s"%allsites)
273     sites=self.checkBlackList(allsites)
274     common.logger.debug(5,"sites are (after black list) %s"%sites)
275     sites=self.checkWhiteList(sites)
276     common.logger.debug(5,"sites are (after white list) %s"%sites)
277    
278     if len(sites)==0:
279     msg = 'No sites hosting all the needed data! Exiting... '
280     raise CrabException(msg)
281 gutsche 1.3
282 slacapra 1.1 common.logger.message("List of Sites hosting the data : "+str(sites))
283     common.logger.debug(6, "List of Sites: "+str(sites))
284     common.analisys_common_info['sites']=sites ## used in SchedulerEdg.py in createSchScript
285 gutsche 1.5 self.setParam_('TargetCE', ','.join(sites))
286 slacapra 1.1 return
287 gutsche 1.3
def jobSplitting(self):
    """
    Split the input files of the first file block into per-job chunks.

    Reads self.files, self.maxEvents, self.filesPerJob and
    self.total_number_of_events (-1 means "all available events") and
    sets self.total_number_of_files, self.total_number_of_jobs and
    self.list_of_files (one list of files per job).

    The number of events per file is estimated as maxEvents / number of
    files, which is only right if all files have the same size.

    Raises CrabException if there is no input file or if filesPerJob is
    smaller than 1.
    """
    common.logger.message('Required '+str(self.filesPerJob)+' files per job ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if self.filesPerJob < 1:
        raise CrabException('files_per_jobs must be at least 1')

    ## TODO: SL need to have (from DBS) a detailed list of how many events per each file
    # As before, only the first entry of self.files is split.
    if len(self.files) > 0:
        available = self.files[0]
    else:
        available = []
    n_tot_files = len(available)
    if n_tot_files == 0:
        raise CrabException('No input files found: cannot split jobs')

    ## SL: this is wrong if the files have different number of events
    evPerFile = int(self.maxEvents) // n_tot_files
    common.logger.debug(5,'Events per File '+str(evPerFile))

    process_all = (self.total_number_of_events == -1)
    if process_all:
        ## if asked to process all events, do it
        self.total_number_of_events = self.maxEvents
        self.total_number_of_files = n_tot_files
    elif evPerFile > 0:
        self.total_number_of_files = int(self.total_number_of_events // evPerFile)
    else:
        # fewer events than files (or unknown): no estimate is possible,
        # take every file rather than dividing by zero
        self.total_number_of_files = n_tot_files

    ## SL: if ask for less event than what is computed to be available on a
    ##     file, process the first file anyhow.
    if self.total_number_of_files < 1:
        self.total_number_of_files = 1
    # never ask for more files than exist, otherwise there would be more
    # jobs than file chunks
    if self.total_number_of_files > n_tot_files:
        self.total_number_of_files = n_tot_files

    common.logger.debug(5,'N files  '+str(self.total_number_of_files))

    ## Compute the number of jobs
    self.total_number_of_jobs = self.total_number_of_files // self.filesPerJob
    common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))

    ## is there any remainder?
    check = self.total_number_of_files - self.total_number_of_jobs*self.filesPerJob
    common.logger.debug(5,'Check  '+str(check))

    if check > 0:
        # the remainder files get a (shorter) job of their own
        self.total_number_of_jobs = self.total_number_of_jobs + 1
        common.logger.message('Warning: last job will be created with '+str(check)+' files')

    if process_all:
        common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for all available events '+str(self.total_number_of_events)+' events')
    else:
        common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for a total of '+str(self.total_number_of_files*evPerFile)+' events')

    # Chunk only the files that are really going to be processed, so that
    # len(self.list_of_files) == self.total_number_of_jobs.
    selected = available[:self.total_number_of_files]
    self.list_of_files = [selected[i: i+self.filesPerJob]
                          for i in range(0, len(selected), self.filesPerJob)]

    return
346    
def split(self, jobParams):
    """
    Fill jobParams with the file list of every job and record the same
    list as the job arguments in common.jobDB.

    One empty entry per job is appended to jobParams first; entries
    0..njobs-1 are then overwritten with the per-job file lists.
    """
    common.jobDB.load()
    #### Fabio
    n_jobs = self.total_number_of_jobs
    chunks = self.list_of_files

    # create the empty structure
    for _ in range(n_jobs):
        jobParams.append("")

    for idx in range(n_jobs):
        args = chunks[idx]
        jobParams[idx] = args
        common.jobDB.setArguments(idx, args)

    common.jobDB.save()
    return
363    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments of job nj as one shell-escaped string.

    The arguments stored in common.jobDB are rendered as
    \\{\\"arg1\\"\\,\\"arg2\\"\\} (braces, quotes and commas each preceded
    by a backslash).  An empty argument list gives \\{\\} instead of
    raising IndexError.  sched is not used.
    """
    params = common.jobDB.arguments(nj)
    quoted = ['\\"' + par + '\\"' for par in params]
    return '\\{' + '\\,'.join(quoted) + '\\}'
374    
def numberOfJobs(self):
    """ return the number of jobs computed by the job splitting """
    # Fabio
    n_jobs = self.total_number_of_jobs
    return n_jobs
379    
380    
381    
def checkBlackList(self, allSites):
    """
    Return the sites of allSites that match none of the compiled
    expressions in self.reCEBlackList.

    With an empty black list allSites itself is returned.
    """
    if not len(self.reCEBlackList):
        return allSites

    kept = []
    for site in allSites:
        common.logger.debug(10,'Site '+site)
        banned = 0
        for pattern in self.reCEBlackList:
            if pattern.search(site):
                common.logger.message('CE in black list, skipping site '+site)
                banned = 1
        if not banned:
            kept.append(site)

    if not kept:
        common.logger.debug(3,"No sites found after BlackList")
    return kept
397    
def checkWhiteList(self, allSites):
    """
    Return the sites of allSites that match at least one of the compiled
    expressions in self.reCEWhiteList.

    With an empty white list allSites itself is returned.
    """
    if not len(self.reCEWhiteList):
        return allSites

    kept = []
    for site in allSites:
        accepted = 0
        for pattern in self.reCEWhiteList:
            if pattern.search(site):
                common.logger.debug(5,'CE in white list, adding site '+site)
                accepted = 1
        if accepted:
            kept.append(site)

    if kept:
        common.logger.debug(5,"Selected sites via WhiteList are "+str(kept)+"\n")
    else:
        common.logger.message("No sites found after WhiteList\n")
    return kept
415    
def getTarBall(self, exe):
    """
    Return the path of the tar-ball with the user's lib and exe.

    The path is the share directory plus self.tgz_name; the tar-ball is
    built through buildTar_ only when no file exists at that path yet.
    """
    tarball = common.work_space.shareDir()+self.tgz_name
    self.tgzNameWithPath = tarball

    if not os.path.exists(tarball):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # it already exists: just return it
    return tarball
430    
def buildTar_(self, executable):
    """
    Create the tar-ball self.tgzNameWithPath with the user's private code.

    Nothing is done when the working area is the release top (or no
    release top is known).  Otherwise the tar-ball collects, relative to
    the user's scram area: the executable if it does not come from the
    release, and the 'lib', 'module' and 'src/Data/' directories when
    present.

    Raises CrabException if the executable cannot be found or the tar
    command produces no output.
    """
    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    filesToBeTarred = []
    ## First find the executable
    if (self.executable != ''):
        exeWithPath = self.scram.findFile_(executable)
        if ( not exeWithPath ):
            raise CrabException('User executable '+executable+' not found')

        ## then check if it's private or not
        if exeWithPath.find(swReleaseTop) == -1:
            # the exe is private, so we must ship
            common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
            exe = exeWithPath.replace(swArea+'/', '')
            filesToBeTarred.append(exe)
        # otherwise the exe is from release, we'll find it on WN

    ## Now get the libraries: only those in local working area
    libDir = 'lib'
    lib = swArea+'/' +libDir
    common.logger.debug(5,"lib "+lib+" to be tarred")
    if os.path.exists(lib):
        filesToBeTarred.append(libDir)

    ## Now check if module dir is present
    moduleDir = 'module'
    if os.path.isdir(swArea+'/'+moduleDir):
        filesToBeTarred.append(moduleDir)

    ## Now check if the Data dir is present
    dataDir = 'src/Data/'
    if os.path.isdir(swArea+'/'+dataDir):
        filesToBeTarred.append(dataDir)

    ## Create the tar-ball
    if len(filesToBeTarred)>0:
        tarcmd = 'tar zcvf ' + self.tgzNameWithPath + ' '
        for line in filesToBeTarred:
            tarcmd = tarcmd + line + ' '
        cwd = os.getcwd()
        os.chdir(swArea)
        try:
            cout = runCommand(tarcmd)
        finally:
            # always go back, also when the command raises
            os.chdir(cwd)
        if not cout:
            raise CrabException('Could not create tar-ball')
    else:
        common.logger.debug(5,"No files to be to be tarred")

    return
497    
def wsSetupEnvironment(self, nj):
    """
    Returns part of a job script which prepares
    the execution environment for the job 'nj'.

    The generated shell text sets up the middleware specific CMS
    environment (LCG or OSG), creates the CMSSW project area for
    self.version, checks the number of arguments, substitutes the input
    files (second argument) into the job's PSet and copies the additional
    input files.
    """
    # Prepare JobType-independent part
    txt = ''

    ## OLI_Daniele at this level  middleware already known

    txt += 'if [ $middleware == LCG ]; then \n'
    txt += self.wsSetupCMSLCGEnvironment_()
    txt += 'elif [ $middleware == OSG ]; then\n'
    txt += ' time=`date -u +"%s"`\n'
    txt += ' WORKING_DIR=$OSG_WN_TMP/cms_$time\n'
    txt += ' echo "Creating working directory: $WORKING_DIR"\n'
    txt += ' /bin/mkdir -p $WORKING_DIR\n'
    txt += ' if [ ! -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10016"\n'
    txt += ' echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "Change to working directory: $WORKING_DIR"\n'
    txt += ' cd $WORKING_DIR\n'
    txt += self.wsSetupCMSOSGEnvironment_()
    txt += 'fi\n'

    # Prepare JobType-specific part
    scram = self.scram.commandName()
    txt += '\n\n'
    txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
    txt += scram+' project CMSSW '+self.version+'\n'
    txt += 'status=$?\n'
    txt += 'if [ $status != 0 ] ; then\n'
    txt += ' echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
    txt += ' echo "JOB_EXIT_STATUS = 10034"\n'
    txt += ' echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    # report the version that was actually requested, not a fixed one
    txt += ' echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW '+self.version+' not found on `hostname`"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10018"\n'
    txt += ' echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += ' exit 1 \n'
    txt += 'fi \n'
    txt += 'echo "CMSSW_VERSION = '+self.version+'"\n'
    txt += 'cd '+self.version+'\n'
    ### needed grep for bug in scramv1 ###
    txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'

    # Handle the arguments:
    txt += "\n"
    txt += "## number of arguments (first argument always jobnumber)\n"
    txt += "\n"
    txt += "narg=$#\n"
    txt += "if [ $narg -lt 2 ]\n"
    txt += "then\n"
    txt += " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$narg+ \n"
    txt += ' echo "JOB_EXIT_STATUS = 50113"\n'
    txt += ' echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    ## OLI_Daniele
    txt += ' if [ $middleware == OSG ]; then \n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
    txt += ' echo "JOB_EXIT_STATUS = 50114"\n'
    txt += ' echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += ' fi \n'
    txt += " exit 1\n"
    txt += "fi\n"
    txt += "\n"

    # Prepare job-specific part
    job = common.job_list[nj]
    pset = os.path.basename(job.configFilename())
    txt += '\n'
    txt += 'InputFiles=$2\n'
    txt += 'echo "<$InputFiles>"\n'
    # replace the {'INPUT'} placeholder of the PSet with the input files
    txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset.cfg\n'

    for inbox_file in self.additional_inbox_files:
        txt += 'if [ -e $RUNTIME_AREA/'+inbox_file+' ] ; then\n'
        txt += ' cp $RUNTIME_AREA/'+inbox_file+' .\n'
        txt += ' chmod +x '+inbox_file+'\n'
        txt += 'fi\n'

    txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'

    txt += '\n'
    txt += 'echo "***** cat pset.cfg *********"\n'
    txt += 'cat pset.cfg\n'
    txt += 'echo "****** end pset.cfg ********"\n'
    txt += '\n'
    return txt
614    
def wsBuildExe(self, nj):
    """
    Return the part of the job script which unpacks the user tar-ball.

    The text is empty when no file exists at self.tgzNameWithPath.
    nj is not used.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    pieces = [
        'echo "tar xzvf $RUNTIME_AREA/'+tarball+'"\n',
        'tar xzvf $RUNTIME_AREA/'+tarball+'\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n',
        # on OSG the working directory is removed before leaving
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = 50999"\n',
        ' echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
    ]
    return "".join(pieces)
650    
def modifySteeringCards(self, nj):
    """
    Placeholder for modifying the card provided by the user and
    writing a new card into share dir: nothing is done here.
    """
    return None
656    
def executableName(self):
    """ return the name of the executable to run """
    exe = self.executable
    return exe
659    
def executableArgs(self):
    """ return the command line arguments passed to the executable """
    args = " -p pset.cfg"
    return args
662 slacapra 1.1
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    The list holds the code tar-ball (only if the file exists) followed
    by the configuration file of job nj.  The additional input files are
    not added here.
    """
    ## code
    if os.path.isfile(self.tgzNameWithPath):
        sandbox = [self.tgzNameWithPath]
    else:
        sandbox = []
    ## config
    sandbox.append(common.job_list[nj].configFilename())
    return sandbox
679    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Every user declared output file is listed with the job number
    (nj + 1) inserted before its last extension, see numberFile_.
    """
    n_out = str(nj + 1)
    return [self.numberFile_(out, n_out) for out in self.output_file]
695    
def prepareSteeringCards(self):
    """
    Placeholder for the initial modifications of the user's steering
    card file: nothing is done here.
    """
    return None
701    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    Each user declared output file is copied to $RUNTIME_AREA under its
    numbered name (suffix $NJob, see numberFile_); the numbered names are
    exported to the script as the space separated 'file_list'.  On OSG
    the script finally removes the working directory.  nj is not used.
    """
    pieces = ['\n',
              '# directory content\n',
              'ls \n']
    numbered_names = []

    for produced in self.output_file:
        numbered = self.numberFile_(produced, '$NJob')
        numbered_names.append(numbered)
        pieces.extend([
            '\n',
            '# check output file\n',
            'ls '+produced+'\n',
            'exe_result=$?\n',
            'if [ $exe_result -ne 0 ] ; then\n',
            ' echo "ERROR: No output file to manage"\n',
            ' echo "JOB_EXIT_STATUS = $exe_result"\n',
            ' echo "JobExitCode=60302" | tee -a $RUNTIME_AREA/$repo\n',
            ' dumpStatus $RUNTIME_AREA/$repo\n',
        ])
        ### OLI_DANIELE
        if common.scheduler.boss_scheduler_name == 'condor_g':
            pieces.extend([
                ' if [ $middleware == OSG ]; then \n',
                ' echo "prepare dummy output file"\n',
                ' echo "Processing of job output failed" > $RUNTIME_AREA/'+numbered+'\n',
                ' fi \n',
            ])
        pieces.extend([
            'else\n',
            ' cp '+produced+' $RUNTIME_AREA/'+numbered+'\n',
            'fi\n',
        ])

    pieces.append('cd $RUNTIME_AREA\n')
    pieces.append('file_list="'+' '.join(numbered_names)+'"\n')
    ### OLI_DANIELE
    pieces.extend([
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n',
        ' echo "JOB_EXIT_STATUS = 60999"\n',
        ' echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])
    return ''.join(pieces)
750    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt in front of the last extension of file
    (or append it when file has no extension).
    """
    dot = file.rfind(".")
    if dot < 0:
        # no extension at all
        return file + '_' + txt
    # keep everything before the last dot, then "_txt", then ".ext"
    return file[:dot] + '_' + txt + file[dot:]
770    
def getRequirements(self):
    """
    return job requirements to add to jdl files

    The requirement is empty when no site is known.  Otherwise it asks
    for the software version tag (if a version is set) and for one of the
    sites stored in common.analisys_common_info['sites'].
    """
    info = common.analisys_common_info
    req = ''
    if not info['sites']:
        return req

    if info['sw_version']:
        req = 'Member("VO-cms-' + info['sw_version'] + \
              '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'

    if len(info['sites'])>0:
        hosts = ['other.GlueCEInfoHostName == "' + site + '"'
                 for site in info['sites']]
        req = req + ' && (' + ' || '.join(hosts) + ')'
    return req
791 gutsche 1.3
def configFilename(self):
    """ return the config filename: the job type name plus '.cfg' """
    base = self.name()
    return base + '.cfg'
795    
796     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Returns part of a job script which prepares
    the execution environment and which is common for all CMS jobs
    running on OSG.

    The script sources cmsset_default.sh for self.version from
    $GRID3_APP_DIR or, failing that, from $OSG_APP.  If neither file
    exists it reports error 10020, removes the working directory
    (reporting 10017 if that fails) and only then exits.
    """
    txt = '\n'
    txt += ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n'
    txt += ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' elif [ -f $OSG_APP/cmssoft/cmsset_default.sh ] ;then\n'
    txt += ' # Use $OSG_APP/cmssoft/cmsset_default.sh to setup cms software\n'
    txt += ' source $OSG_APP/cmssoft/cmsset_default.sh '+self.version+'\n'
    txt += ' else\n'
    txt += ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10020"\n'
    txt += ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    # no 'exit 1' here: it would make the clean up below unreachable
    txt += '\n'
    txt += ' echo "Remove working directory: $WORKING_DIR"\n'
    txt += ' cd $RUNTIME_AREA\n'
    txt += ' /bin/rm -rf $WORKING_DIR\n'
    txt += ' if [ -d $WORKING_DIR ] ;then\n'
    txt += ' echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cmsset_default.sh file not found"\n'
    txt += ' echo "JOB_EXIT_STATUS = 10017"\n'
    txt += ' echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
    txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' exit 1\n'
    txt += ' fi\n'
    txt += '\n'
    txt += ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
    txt += ' echo " END SETUP CMS OSG ENVIRONMENT "\n'

    return txt
834    
835     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of a job script which prepares the execution
    environment on LCG and which is common for all CMS jobs.

    The generated shell fragment:
      - fails with code 10031 if $VO_CMS_SW_DIR is not set
      - fails with code 10020 if $VO_CMS_SW_DIR/cmsset_default.sh
        is missing or empty
      - sources that file and fails with code 10032 on a non-zero result
      - inspects /etc/redhat-release: exports
        SCRAM_ARCH=slc3_ia32_gcc323 for releases matching *Enterprise*
        or *cientific*, leaves SCRAM_ARCH untouched for *alhalla*, and
        fails with code 10033 otherwise
    Each failure branch ends with 'exit 1'.
    """
    script_lines = [
        ' \n',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "\n',
        # the CMS software directory has to be defined
        ' if [ ! $VO_CMS_SW_DIR ] ;then\n',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 10031" \n',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' else\n',
        ' echo "Sourcing environment... "\n',
        # the setup script has to exist and be non-empty
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' fi\n',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh\n',
        ' result=$?\n',
        ' if [ $result -ne 0 ]; then\n',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n',
        ' echo "JOB_EXIT_STATUS = 10032"\n',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' fi\n',
        ' fi\n',
        ' \n',
        # pick SCRAM_ARCH according to the OS release string
        ' string=`cat /etc/redhat-release`\n',
        ' echo $string\n',
        ' if [[ $string = *alhalla* ]]; then\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n',
        ' export SCRAM_ARCH=slc3_ia32_gcc323\n',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"\n',
        ' else\n',
        ' echo "SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized"\n',
        ' echo "JOB_EXIT_STATUS = 10033"\n',
        ' echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' exit 1\n',
        ' fi\n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"\n',
    ]
    return ''.join(script_lines)
887 gutsche 1.5
def setParam_(self, param, value):
    """
    Store value under the key param in the self._params dictionary,
    replacing any value previously stored under that key.
    """
    params = self._params
    params[param] = value
890    
def getParams(self):
    """
    Return the self._params dictionary itself (not a copy).
    """
    params = self._params
    return params
893 gutsche 1.8
def setTaskid_(self):
    """
    Copy the 'taskId' entry of self.cfg_params into self._taskId.
    A missing 'taskId' key raises KeyError.
    """
    task_id = self.cfg_params['taskId']
    self._taskId = task_id
896    
def getTaskid(self):
    """
    Return the value held in self._taskId (assigned by setTaskid_).
    """
    task_id = self._taskId
    return task_id