ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.6
Committed: Sun Jun 4 16:53:36 2006 UTC (18 years, 10 months ago) by gutsche
Content type: text/x-python
Branch: MAIN
Changes since 1.5: +31 -29 lines
Log Message:
Moved the SyncGridId creation in the wrapper script to the front so that error exit codes raised during job setup are reported correctly to the DashBoard.

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 gutsche 1.3 import PsetManipulator
7 slacapra 1.1
8 gutsche 1.3 import DBSInfo_EDM
9     import DataDiscovery_EDM
10     import DataLocation_EDM
11 slacapra 1.1 import Scram
12    
13     import os, string, re
14    
15     class Cmssw(JobType):
def __init__(self, cfg_params):
    """
    Configure a CMSSW job type from the user configuration.

    cfg_params is indexed with 'SECTION.key' strings and is expected to
    raise KeyError for options that are not set; optional settings are
    detected that way below.

    The constructor reads the dataset path, executable, PSet file,
    output files, script_exe, additional input files, splitting
    parameters and CE black/white lists, then runs data discovery and
    location, prepares the user tarball, computes the job splitting
    and writes the edited PSet.

    Raises CrabException when datasetpath, pset or
    total_number_of_events is missing, when the dataset path has no
    '/' separator, or when the PSet file or an additional input file
    does not exist.  Terminates through sys.exit() when script_exe is
    set but is not a file.
    """
    # Imported locally: sys.exit() is needed below and the module header
    # does not import sys explicitly.
    import sys

    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    self.analisys_common_info = {}
    # Marco.
    self._params = {}
    self.cfg_params = cfg_params

    log = common.logger

    def stripped_items(key):
        # Comma separated option -> list of whitespace-stripped entries.
        # KeyError propagates to the caller when the option is absent.
        return [item.strip() for item in cfg_params[key].split(',')]

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.tgz_name = 'default.tgz'

    self.version = self.scram.getSWVersion()
    self.setParam_('application', self.version)
    common.analisys_common_info['sw_version'] = self.version
    ### FEDE
    common.analisys_common_info['copy_input_data'] = 0
    common.analisys_common_info['events_management'] = 1

    ### collect Data cards
    try:
        self.datasetPath = cfg_params['CMSSW.datasetpath']
        log.debug(6, "CMSSW::CMSSW(): datasetPath = "+self.datasetPath)
    except KeyError:
        msg = "Error: datasetpath not defined "
        raise CrabException(msg)

    # ML monitoring
    # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
    datasetpath_split = self.datasetPath.split("/")
    if len(datasetpath_split) < 2:
        # Without a '/' there is no element [1]; report it as a
        # configuration error instead of a bare IndexError.
        raise CrabException("Error: malformed datasetpath "+self.datasetPath)
    self.setParam_('dataset', datasetpath_split[1])
    self.setParam_('owner', datasetpath_split[-1])

    # Data tiers are currently not read from the configuration.
    self.dataTiers = []

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
        self.setParam_('exe', self.executable)
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        msg = "Default executable cmsRun overridden. Switch to " + self.executable
        log.debug(3,msg)
    except KeyError:
        self.executable = 'cmsRun'
        self.setParam_('exe', self.executable)
        msg = "User executable not defined. Use cmsRun"
        log.debug(3,msg)

    try:
        self.pset = cfg_params['CMSSW.pset']
        log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
        if (not os.path.exists(self.pset)):
            raise CrabException("User defined PSet file "+self.pset+" does not exist")
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")

    # output files
    self.output_file = []
    try:
        outFiles = cfg_params['CMSSW.output_file']
    except KeyError:
        outFiles = ''
    if outFiles != '':
        tmpOutFiles = outFiles.split(',')
        log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
        for name in tmpOutFiles:
            self.output_file.append(name.strip())
    else:
        log.message("No output file defined: only stdout/err will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['USER.script_exe']
        self.additional_inbox_files.append(self.scriptExe)
    except KeyError:
        pass
    if self.scriptExe != '' and not os.path.isfile(self.scriptExe):
        log.message("WARNING. file "+self.scriptExe+" not found")
        sys.exit()

    ## additional input files
    try:
        addFiles = stripped_items('CMSSW.additional_input_files')
    except KeyError:
        addFiles = []
    for name in addFiles:
        # The name is stripped before the existence check, so entries
        # written as "a, b" are looked up as "b" and not as " b".
        if not os.path.exists(name):
            raise CrabException("Additional input file not found: "+name)
        self.additional_inbox_files.append(name)

    try:
        self.filesPerJob = int(cfg_params['CMSSW.files_per_jobs']) #Daniele
    except KeyError:
        self.filesPerJob = 1

    ## Max event will be total_number_of_events ??? Daniele
    try:
        self.maxEv = cfg_params['CMSSW.event_per_job']
    except KeyError:
        self.maxEv = "-1"

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
    except KeyError:
        msg = 'Must define total_number_of_events'
        raise CrabException(msg)

    # CE black list: each entry is compiled as a regular expression
    try:
        CEBlackList = stripped_items('EDG.ce_black_list')
    except KeyError:
        CEBlackList = []
    self.reCEBlackList = [re.compile(bad) for bad in CEBlackList]
    common.logger.debug(5,'CEBlackList: '+str(CEBlackList))

    # CE white list: each entry is compiled as a regular expression
    try:
        CEWhiteList = stripped_items('EDG.ce_white_list')
    except KeyError:
        CEWhiteList = []
    self.reCEWhiteList = [re.compile(good) for good in CEWhiteList]
    common.logger.debug(5,'CEWhiteList: '+str(CEWhiteList))

    self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
    ## Perform the data location and discovery (based on DBS/DLS)
    self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)

    self.jobSplitting() #Daniele job Splitting
    self.PsetEdit.maxEvent(self.maxEv) #Daniele
    self.PsetEdit.inputModule("INPUT") #Daniele
    self.PsetEdit.psetWriter(self.configFilename())
207 gutsche 1.5
208    
209 gutsche 1.3
210 slacapra 1.1 def DataDiscoveryAndLocation(self, cfg_params):
211    
212 gutsche 1.3 common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
213    
214     #datasetPath = "/"+self.owner+"/"+self.dataTiers[0]+"/"+self.dataset
215    
216     datasetPath=self.datasetPath
217    
218     ## TODO
219     dataTiersList = ""
220     dataTiers = dataTiersList.split(',')
221 slacapra 1.1
222     ## Contact the DBS
223     try:
224 afanfani 1.4 self.pubdata=DataDiscovery_EDM.DataDiscovery_EDM(datasetPath, dataTiers, cfg_params)
225 slacapra 1.1 self.pubdata.fetchDBSInfo()
226    
227 gutsche 1.3 except DataDiscovery_EDM.NotExistingDatasetError, ex :
228 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
229     raise CrabException(msg)
230    
231 gutsche 1.3 except DataDiscovery_EDM.NoDataTierinProvenanceError, ex :
232 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
233     raise CrabException(msg)
234 gutsche 1.3 except DataDiscovery_EDM.DataDiscoveryError, ex:
235 slacapra 1.1 msg = 'ERROR ***: failed Data Discovery in DBS %s'%ex.getErrorMessage()
236     raise CrabException(msg)
237    
238     ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
239 gutsche 1.3 ## self.DBSPaths=self.pubdata.getDBSPaths()
240     common.logger.message("Required data are :"+self.datasetPath)
241    
242     filesbyblock=self.pubdata.getFiles()
243     self.AllInputFiles=filesbyblock.values()
244     self.files = self.AllInputFiles
245    
246     ## TEMP
247     # self.filesTmp = filesbyblock.values()
248     # self.files = []
249     # locPath='rfio:cmsbose2.bo.infn.it:/flatfiles/SE00/cms/fanfani/ProdTest/'
250     # locPath=''
251     # tmp = []
252     # for file in self.filesTmp[0]:
253     # tmp.append(locPath+file)
254     # self.files.append(tmp)
255     ## END TEMP
256 slacapra 1.1
257     ## get max number of events
258 gutsche 1.3 #common.logger.debug(10,"number of events for primary fileblocks %i"%self.pubdata.getMaxEvents())
259 slacapra 1.1 self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
260     common.logger.message("\nThe number of available events is %s"%self.maxEvents)
261    
262     ## Contact the DLS and build a list of sites hosting the fileblocks
263     try:
264 gutsche 1.6 dataloc=DataLocation_EDM.DataLocation_EDM(filesbyblock.keys(),cfg_params)
265     dataloc.fetchDLSInfo()
266 gutsche 1.3 except DataLocation_EDM.DataLocationError , ex:
267 slacapra 1.1 msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
268     raise CrabException(msg)
269    
270     allsites=dataloc.getSites()
271     common.logger.debug(5,"sites are %s"%allsites)
272     sites=self.checkBlackList(allsites)
273     common.logger.debug(5,"sites are (after black list) %s"%sites)
274     sites=self.checkWhiteList(sites)
275     common.logger.debug(5,"sites are (after white list) %s"%sites)
276    
277     if len(sites)==0:
278     msg = 'No sites hosting all the needed data! Exiting... '
279     raise CrabException(msg)
280 gutsche 1.3
281 slacapra 1.1 common.logger.message("List of Sites hosting the data : "+str(sites))
282     common.logger.debug(6, "List of Sites: "+str(sites))
283     common.analisys_common_info['sites']=sites ## used in SchedulerEdg.py in createSchScript
284 gutsche 1.5 self.setParam_('TargetCE', ','.join(sites))
285 slacapra 1.1 return
286 gutsche 1.3
def jobSplitting(self):
    """
    Split the input files of the first file block into per-job chunks.

    Reads self.files, self.maxEvents, self.filesPerJob and
    self.total_number_of_events; sets self.total_number_of_files,
    self.total_number_of_jobs and self.list_of_files (one list of
    files per job).  The number of jobs always equals the number of
    entries in self.list_of_files.

    The number of events per file is estimated as maxEvents divided by
    the number of files, which is only exact when all files hold the
    same number of events.

    Raises CrabException when files_per_jobs is smaller than 1, when
    there are no input files, or when a specific number of events is
    requested but less than one event per file is available.
    """
    files_per_job = int(self.filesPerJob)
    common.logger.message('Required '+str(self.filesPerJob)+' files per job ')
    common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')

    if files_per_job < 1:
        raise CrabException('files_per_jobs must be at least 1, got '+str(self.filesPerJob))
    if not self.files or not self.files[0]:
        raise CrabException('No input files available for job splitting')

    ## TODO: SL need to have (from DBS) a detailed list of how many events per each file
    input_files = self.files[0]
    n_tot_files = len(input_files)
    ## SL: this is wrong if the files have different number of events
    evPerFile = int(self.maxEvents) // n_tot_files

    common.logger.debug(5,'Events per File '+str(evPerFile))

    if self.total_number_of_events == -1:
        ## asked to process all events: use every file, and round the job
        ## count up so that a trailing partial chunk still gets a job
        self.total_number_of_events = self.maxEvents
        n_files = n_tot_files
        self.total_number_of_files = n_files
        self.total_number_of_jobs = (n_files + files_per_job - 1) // files_per_job
        common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for all available events '+str(self.total_number_of_events)+' events')
    else:
        if evPerFile < 1:
            raise CrabException('Cannot split by events: less than one event per file available')

        n_files = int(self.total_number_of_events // evPerFile)
        ## SL: if ask for less event than what is computed to be available on a
        ## file, process the first file anyhow.
        if n_files == 0:
            n_files = 1
        ## never plan for more files than actually exist
        if n_files > n_tot_files:
            common.logger.message('Warning: only '+str(n_tot_files)+' files are available')
            n_files = n_tot_files
        self.total_number_of_files = n_files

        common.logger.debug(5,'N files '+str(self.total_number_of_files))

        ## Compute the number of jobs
        self.total_number_of_jobs = n_files // files_per_job
        common.logger.debug(5,'N jobs '+str(self.total_number_of_jobs))

        ## is there any remainder?
        check = n_files - self.total_number_of_jobs * files_per_job

        common.logger.debug(5,'Check '+str(check))

        if check > 0:
            self.total_number_of_jobs = self.total_number_of_jobs + 1
            common.logger.message('Warning: last job will be created with '+str(check)+' files')

        common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for a total of '+str(n_files*evPerFile)+' events')

    ## one chunk of at most files_per_job files per job, restricted to the
    ## files that were actually requested
    self.list_of_files = [input_files[start:start+files_per_job]
                          for start in range(0, n_files, files_per_job)]

    return
345    
def split(self, jobParams):
    """
    Store the per-job file lists as job arguments.

    Appends one placeholder per job to jobParams, then writes the file
    list of job k into jobParams[k] and into the job database through
    common.jobDB.setArguments().  The database is loaded before and
    saved after the update.
    """
    common.jobDB.load()
    #### Fabio
    count = self.total_number_of_jobs
    chunks = self.list_of_files

    # create the empty structure
    for _ in range(count):
        jobParams.append("")

    for index in range(count):
        files = chunks[index]
        jobParams[index] = files
        common.jobDB.setArguments(index, files)

    common.jobDB.save()
    return
362    
def getJobTypeArguments(self, nj, sched):
    """
    Return the arguments of job 'nj' as one shell-escaped string.

    The arguments stored in the job database are rendered as
    \\{\\"a\\"\\,\\"b\\"\\}: every element is wrapped in escaped double
    quotes, elements are separated by an escaped comma and the whole
    list is enclosed in escaped braces.  An empty argument list yields
    \\{\\}.  The 'sched' parameter is not used.
    """
    params = common.jobDB.arguments(nj)
    # every backslash is written explicitly ('\\,' instead of relying on
    # the unknown escape '\,' being kept verbatim)
    quoted = ['\\"' + item + '\\"' for item in params]
    return '\\{' + '\\,'.join(quoted) + '\\}'
373    
def numberOfJobs(self):
    """Return the value of self.total_number_of_jobs."""
    # Fabio
    njobs = self.total_number_of_jobs
    return njobs
378    
379    
380    
def checkBlackList(self, allSites):
    """
    Return the sites of allSites that match no black list pattern.

    With an empty black list allSites itself is returned.  A message
    is logged for every pattern that matches a site.
    """
    patterns = self.reCEBlackList
    if not len(patterns):
        return allSites

    kept = []
    for site in allSites:
        common.logger.debug(10,'Site '+site)
        banned = False
        for pattern in patterns:
            if pattern.search(site):
                common.logger.message('CE in black list, skipping site '+site)
                banned = True
        if not banned:
            kept.append(site)

    if not len(kept):
        common.logger.debug(3,"No sites found after BlackList")
    return kept
396    
def checkWhiteList(self, allSites):
    """
    Return the sites of allSites that match at least one white list
    pattern.

    With an empty white list allSites itself is returned.  A debug
    line is logged for every pattern that matches a site.
    """
    patterns = self.reCEWhiteList
    if not len(patterns):
        return allSites

    accepted = []
    for site in allSites:
        matched = False
        for pattern in patterns:
            if pattern.search(site):
                common.logger.debug(5,'CE in white list, adding site '+site)
                matched = True
        if matched:
            accepted.append(site)

    if len(accepted):
        common.logger.debug(5,"Selected sites via WhiteList are "+str(accepted)+"\n")
    else:
        common.logger.message("No sites found after WhiteList\n")
    return accepted
414    
def getTarBall(self, exe):
    """
    Return the path of the tarball holding the user libraries and
    executable, building it first when it does not exist yet.

    The path is the share directory of the work space followed by
    self.tgz_name and is also stored in self.tgzNameWithPath.
    """
    tarball = common.work_space.shareDir()+self.tgz_name
    self.tgzNameWithPath = tarball

    if not os.path.exists(tarball):
        # Prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        return self.tgzNameWithPath.strip()

    # it exists already, just return it
    return self.tgzNameWithPath
429    
def buildTar_(self, executable):
    """
    Create self.tgzNameWithPath with the private parts of the user's
    scram area: the executable (when it is not found under the release
    top), and the 'lib', 'module' and 'src/Data/' directories when they
    exist.

    Nothing is done when the release top is empty or equal to the
    working area, or when there is nothing to pack.  The tar command
    is run from inside the scram area; the previous working directory
    is restored afterwards, also when the command fails.

    Raises CrabException when the executable cannot be found or when
    the tar command returns nothing.
    """
    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    filesToBeTarred = []
    ## First find the executable
    if self.executable != '':
        exeWithPath = self.scram.findFile_(executable)
        if not exeWithPath:
            raise CrabException('User executable '+executable+' not found')

        ## then check if it's private or not
        if exeWithPath.find(swReleaseTop) == -1:
            # the exe is private, so we must ship it, relative to swArea
            common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
            filesToBeTarred.append(exeWithPath.replace(swArea+'/', ''))
        # otherwise the exe is from the release, we'll find it on the WN

    ## Now get the libraries: only those in local working area
    libDir = 'lib'
    lib = swArea+'/' +libDir
    common.logger.debug(5,"lib "+lib+" to be tarred")
    if os.path.exists(lib):
        filesToBeTarred.append(libDir)

    ## Now check if module dir and the Data dir are present
    for subDir in ('module', 'src/Data/'):
        if os.path.isdir(swArea+'/'+subDir):
            filesToBeTarred.append(subDir)

    if not filesToBeTarred:
        common.logger.debug(5,"No files to be to be tarred")
        return

    ## Create the tar-ball
    tarcmd = 'tar zcvf ' + self.tgzNameWithPath + ' ' \
             + ''.join([entry + ' ' for entry in filesToBeTarred])
    cwd = os.getcwd()
    os.chdir(swArea)
    try:
        cout = runCommand(tarcmd)
    finally:
        # always go back, so a failure does not leave the process
        # sitting in the scram area
        os.chdir(cwd)
    if not cout:
        raise CrabException('Could not create tar-ball')

    return
496    
def wsSetupEnvironment(self, nj):
    """
    Return the part of the job wrapper script that prepares the
    execution environment for job 'nj'.

    The script text covers, in this order: the monitoring header
    (job and grid identifiers), the middleware specific CMS setup, the
    creation of the CMSSW project area, the argument check, the PSet
    customisation and the copy of the additional input files.
    """
    # shell fragments that occur more than once in the script
    to_repo = ' | tee -a $RUNTIME_AREA/$repo\n'
    edg_monitor_id = ' echo "MonitorJobID=`echo ${NJob}_$EDG_WL_JOBID`"' + to_repo
    edg_grid_id = ' echo "SyncGridJobId=`echo $EDG_WL_JOBID`"' + to_repo
    osg_cleanup = [
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "OSG WORKING DIR ==> $WORKING_DIR could not be deleted on on WN `hostname`"\n',
        ' fi\n',
        ' fi \n',
    ]

    script = []

    ### OLI: header first, so that it is reported before any error code is submitted
    script.extend([
        'NJob=$1\n',
        'InputFiles=$2\n',
        'echo "<$InputFiles>"\n',
    ])

    ### OLI_DANIELE
    script.extend([
        'if [ $middleware == LCG ]; then \n',
        edg_monitor_id,
        edg_grid_id,
        ' echo "SyncCE=`edg-brokerinfo getCE`"' + to_repo,
        'elif [ $middleware == OSG ]; then\n',
    ])

    # OLI: added monitoring for dashbord, use hash of crab.cfg
    if common.scheduler.boss_scheduler_name == 'condor_g':
        # create hash of cfg file
        cfg_hash = makeCksum(common.work_space.cfgFileName())
        script.extend([
            ' echo "MonitorJobID=`echo ${NJob}_'+cfg_hash+'_$GLOBUS_GRAM_JOB_CONTACT`"' + to_repo,
            ' echo "SyncGridJobId=`echo $GLOBUS_GRAM_JOB_CONTACT`"' + to_repo,
            ' echo "SyncCE=`echo $hostname`"' + to_repo,
        ])
    else:
        script.extend([
            edg_monitor_id,
            edg_grid_id,
            ' echo "SyncCE=`$EDG_WL_LOG_DESTINATION`"' + to_repo,
        ])

    script.extend([
        'fi\n',
        'dumpStatus $RUNTIME_AREA/$repo\n',
        '\n',
    ])

    ## OLI_Daniele at this level middleware already known
    script.append('if [ $middleware == LCG ]; then \n')
    script.append(self.wsSetupCMSLCGEnvironment_())
    script.extend([
        'elif [ $middleware == OSG ]; then\n',
        ' time=`date -u +"%s"`\n',
        ' WORKING_DIR=$OSG_WN_TMP/cms_$time\n',
        ' echo "Creating working directory: $WORKING_DIR"\n',
        ' /bin/mkdir -p $WORKING_DIR\n',
        ' if [ ! -d $WORKING_DIR ] ;then\n',
        ' echo "OSG WORKING DIR ==> $WORKING_DIR could not be created on on WN `hostname`"\n',
        ' echo "JOB_EXIT_STATUS = 1"\n',
        ' exit 1\n',
        ' fi\n',
        '\n',
        ' echo "Change to working directory: $WORKING_DIR"\n',
        ' cd $WORKING_DIR\n',
    ])
    script.append(self.wsSetupCMSOSGEnvironment_())
    script.append('fi\n')

    # Prepare JobType-specific part
    scram_cmd = self.scram.commandName()
    script.extend([
        '\n\n',
        'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n',
        scram_cmd+' project CMSSW '+self.version+'\n',
        'status=$?\n',
        'if [ $status != 0 ] ; then\n',
        ' echo "SET_EXE_ENV 1 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n',
        ' echo "JOB_EXIT_STATUS = 10034"\n',
        ' echo "SanityCheckCode = 10034"' + to_repo,
        ' dumpStatus $RUNTIME_AREA/$repo\n',
    ])
    ## OLI_Daniele
    script.extend(osg_cleanup)
    script.extend([
        ' exit 1 \n',
        'fi \n',
        'echo "CMSSW_VERSION = '+self.version+'"\n',
        'cd '+self.version+'\n',
        ### needed grep for bug in scramv1 ###
        'eval `'+scram_cmd+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n',
    ])

    # Handle the arguments:
    script.extend([
        "\n",
        "## ARGUMNETS: $1 Job Number\n",
        "\n",
        "narg=$#\n",
        "if [ $narg -lt 2 ]\n",
        "then\n",
        " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$narg+ \n",
        ' echo "JOB_EXIT_STATUS = 50113"\n',
        ' echo "SanityCheckCode = 50113"' + to_repo,
        ' dumpStatus $RUNTIME_AREA/$repo\n',
    ])
    ## OLI_Daniele
    script.extend(osg_cleanup)
    script.extend([
        " exit 1\n",
        "fi\n",
        "\n",
    ])

    # Prepare job-specific part: put the input files into the job's PSet
    job_cfg = os.path.basename(common.job_list[nj].configFilename())
    script.append('\n')
    script.append('sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+job_cfg+' > pset.cfg\n')

    # copy the additional input files next to the job and make them executable
    for extra in self.additional_inbox_files:
        script.extend([
            'if [ -e $RUNTIME_AREA/'+extra+' ] ; then\n',
            ' cp $RUNTIME_AREA/'+extra+' .\n',
            ' chmod +x '+extra+'\n',
            'fi\n',
        ])

    script.extend([
        'echo "### END JOB SETUP ENVIRONMENT ###"\n\n',
        '\n',
        'echo "***** cat pset.cfg *********"\n',
        'cat pset.cfg\n',
        'echo "****** end pset.cfg ********"\n',
        '\n',
    ])
    return ''.join(script)
635    
def wsBuildExe(self, nj):
    """
    Return the script commands that unpack the user tarball on the
    worker node.

    An empty string is returned when self.tgzNameWithPath is not an
    existing file.  The 'nj' parameter is not used.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    commands = [
        'echo "tar xzvf $RUNTIME_AREA/'+tarball+'"\n',
        'tar xzvf $RUNTIME_AREA/'+tarball+'\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n',
        ' echo "JOB_EXIT_STATUS = $untar_status" \n',
        ' echo "SanityCheckCode = $untar_status" | tee -a $repo\n',
        ' if [ $middleware == OSG ]; then \n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "OSG WORKING DIR ==> $WORKING_DIR could not be deleted on on WN `hostname`"\n',
        ' fi\n',
        ' fi \n',
        ' \n',
        ' exit $untar_status \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
    ]
    return "".join(commands)
668    
def modifySteeringCards(self, nj):
    """
    Hook for modifying the card provided by the user and writing a
    new card into the share dir.  Nothing is done for this job type.
    """
    return None
674    
def executableName(self):
    """Return the value of self.executable."""
    exe = self.executable
    return exe
677    
def executableArgs(self):
    """Return the command line arguments passed to the executable."""
    # pset.cfg is the file name written by the script text that
    # wsSetupEnvironment generates
    return " -p " + "pset.cfg"
680 slacapra 1.1
def inputSandbox(self, nj):
    """
    Return the list of filenames to be put in the JDL input sandbox
    of job 'nj': the user tarball (when it exists as a file) followed
    by the job's configuration file.
    """
    sandbox = []
    ## code
    tarball = self.tgzNameWithPath
    if os.path.isfile(tarball):
        sandbox.append(tarball)
    ## config
    sandbox.append(common.job_list[nj].configFilename())
    ## additional input files are currently not added here
    return sandbox
697    
def outputSandbox(self, nj):
    """
    Return the list of filenames to be put in the JDL output sandbox
    of job 'nj'.

    Every user declared output file is listed with the job number
    (counted from 1, i.e. nj + 1) inserted before its last extension.
    """
    job_number = str(nj + 1)
    ## User Declared output files
    return [self.numberFile_(out, job_number) for out in self.output_file]
713    
def prepareSteeringCards(self):
    """
    Hook for making initial modifications of the user's steering card
    file.  Nothing is done for this job type.
    """
    return None
719    
def wsRenameOutput(self, nj):
    """
    Return the part of the job script that copies the produced output
    files to $RUNTIME_AREA under their job-numbered names.

    For every entry of self.output_file the script checks that the
    file exists and copies it to the name given by
    numberFile_(file, '$NJob').  The numbered names are exported,
    separated by single spaces, in the shell variable file_list.  On
    OSG the working directory is removed at the end.  The 'nj'
    parameter is not used; the job number is resolved by the shell.
    """
    script = ['\n']
    numbered_names = []
    last = len(self.output_file)

    for position, fileWithSuffix in enumerate(self.output_file):
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        numbered_names.append(output_file_num)
        copy_cmd = ' cp '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
        script.extend([
            '\n',
            'ls \n',
            '\n',
            'ls '+fileWithSuffix+'\n',
            'exe_result=$?\n',
            'if [ $exe_result -ne 0 ] ; then\n',
            ' echo "ERROR: No output file to manage"\n',
            ### OLI_DANIELE
            # NOTE(review): this branch runs when the file is missing, so
            # the cp below presumably fails as well -- confirm the intent.
            ' if [ $middleware == OSG ]; then \n',
            ' echo "prepare dummy output file"\n',
            copy_cmd,
            ' fi \n',
            'else\n',
            copy_cmd,
            'fi\n',
        ])
        if position + 1 == last:
            # after the last file go back to the runtime area
            script.append('cd $RUNTIME_AREA\n')

    # Space separated list of the numbered names.  Joining avoids the
    # trailing-separator trimming that used to cut the last character
    # off the final file name.
    script.append('file_list="'+' '.join(numbered_names)+'"\n')

    ### OLI_DANIELE
    script.extend([
        'if [ $middleware == OSG ]; then\n',
        ' cd $RUNTIME_AREA\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "OSG WORKING DIR ==> $WORKING_DIR could not be deleted on on WN `hostname`"\n',
        ' fi\n',
        'fi\n',
        '\n',
    ])
    return ''.join(script)
766    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of a file name.

    'out.root' with txt '3' gives 'out_3.root'; a name without any
    '.' simply gets '_' + txt appended.  Both arguments must be
    strings.
    """
    if '.' not in file:
        return file + '_' + txt
    # split off the last extension only
    name, ext = file.rsplit('.', 1)
    return name + '_' + txt + '.' + ext
786    
def getRequirements(self):
    """
    Return the job requirements to add to the jdl files.

    The expression is built from common.analisys_common_info: a
    software tag clause for 'sw_version' (when set) and a clause
    accepting any of the host names in 'sites', joined with ' && '.
    An empty string is returned when there are no sites.
    """
    info = common.analisys_common_info
    sites = info['sites']
    if not sites:
        return ''

    clauses = []
    if info['sw_version']:
        clauses.append('Member("VO-cms-' + info['sw_version'] +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')

    hosts = ['other.GlueCEInfoHostName == "' + site + '"' for site in sites]
    clauses.append('(' + ' || '.join(hosts) + ')')

    # joining the clauses keeps the expression well formed when the
    # software version clause is absent (no leading ' && ')
    return ' && '.join(clauses)
807 gutsche 1.3
def configFilename(self):
    """Return the config filename: the job type name followed by '.cfg'."""
    base = self.name()
    return base + '.cfg'
811    
812     ### OLI_DANIELE
def wsSetupCMSOSGEnvironment_(self):
    """
    Return the part of the job script that sets up the CMS software
    environment on OSG.

    The script sources cmsset_default.sh for self.version from
    $GRID3_APP_DIR or, failing that, from $OSG_APP.  When neither file
    exists it reports exit code 10020, removes the working directory
    and exits with status 1.
    """
    script = [
        '\n',
        ' echo "### SETUP CMS OSG ENVIRONMENT ###"\n',
        ' if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n',
        ' # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n',
        ' source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n',
        ' elif [ -f $OSG_APP/cmssoft/cmsset_default.sh ] ;then\n',
        ' # Use $OSG_APP/cmssoft/cmsset_default.sh to setup cms software\n',
        ' source $OSG_APP/cmssoft/cmsset_default.sh '+self.version+'\n',
        ' else\n',
        ' echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cmsset_default.sh file not found"\n',
        ' echo "JOB_EXIT_STATUS = 10020"\n',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        # No exit here: the working directory has to be removed first.
        '\n',
        ' echo "Remove working directory: $WORKING_DIR"\n',
        ' cd $RUNTIME_AREA\n',
        ' /bin/rm -rf $WORKING_DIR\n',
        ' if [ -d $WORKING_DIR ] ;then\n',
        ' echo "OSG WORKING DIR ==> $WORKING_DIR could not be deleted on on WN `hostname`"\n',
        ' fi\n',
        '\n',
        # Explicit non-zero status, like the other setup failures in the
        # wrapper script; a bare exit would return the status of the
        # last command.
        ' exit 1\n',
        ' fi\n',
        '\n',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n',
        ' echo " END SETUP CMS OSG ENVIRONMENT "\n',
    ]
    return ''.join(script)
847    
848     ### OLI_DANIELE
def wsSetupCMSLCGEnvironment_(self):
    """
    Return the part of the job wrapper script which sets up the CMS
    software environment on an LCG worker node.

    The generated shell fragment:
      - fails with code 10031 when $VO_CMS_SW_DIR is not set;
      - fails with code 10020 when $VO_CMS_SW_DIR/cmsset_default.sh is
        missing or empty, and with 10032 when sourcing it fails;
      - inspects /etc/redhat-release and exports
        SCRAM_ARCH=slc3_ia32_gcc323 on *Enterprise* / *cientific*
        releases, keeps SCRAM_ARCH as is on *alhalla* releases, and fails
        with code 10033 otherwise.
    Every failure echoes the code, appends it to $RUNTIME_AREA/$repo,
    calls dumpStatus and exits.

    Returns the fragment as a single string; every line ends with a
    newline.
    """
    script_lines = (
        ' ',
        ' echo " ### SETUP CMS LCG ENVIRONMENT ### "',
        ' echo "JOB_EXIT_STATUS = 0"',
        # --- locate and source the CMS software set-up script ---
        ' if [ ! $VO_CMS_SW_DIR ] ;then',
        ' echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"',
        ' echo "JOB_EXIT_STATUS = 10031" ',
        ' echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo',
        ' dumpStatus $RUNTIME_AREA/$repo',
        ' exit',
        ' else',
        ' echo "Sourcing environment... "',
        ' if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then',
        ' echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"',
        ' echo "JOB_EXIT_STATUS = 10020"',
        ' echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo',
        ' dumpStatus $RUNTIME_AREA/$repo',
        ' exit',
        ' fi',
        ' echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"',
        ' source $VO_CMS_SW_DIR/cmsset_default.sh',
        ' result=$?',
        ' if [ $result -ne 0 ]; then',
        ' echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"',
        ' echo "JOB_EXIT_STATUS = 10032"',
        ' echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo',
        ' dumpStatus $RUNTIME_AREA/$repo',
        ' exit',
        ' fi',
        ' fi',
        ' ',
        # --- pick SCRAM_ARCH from the operating system release string ---
        ' string=`cat /etc/redhat-release`',
        ' echo $string',
        ' if [[ $string = *alhalla* ]]; then',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"',
        ' elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then',
        ' export SCRAM_ARCH=slc3_ia32_gcc323',
        ' echo "SCRAM_ARCH= $SCRAM_ARCH"',
        ' else',
        # NOTE: this message reports "1" while the exit code written
        # below is 10033.
        ' echo "SET_CMS_ENV 1 ==> ERROR OS unknown, LCG environment not initialized"',
        ' echo "JOB_EXIT_STATUS = 10033"',
        ' echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo',
        ' dumpStatus $RUNTIME_AREA/$repo',
        ' exit',
        ' fi',
        ' echo "SET_CMS_ENV 0 ==> setup cms environment ok"',
        ' echo "### END SETUP CMS LCG ENVIRONMENT ###"',
    )
    return '\n'.join(script_lines) + '\n'
901 gutsche 1.5
def setParam_(self, param, value):
    """
    Store value under the key param in self._params, replacing any
    entry already present for that key.
    """
    self._params.update({param: value})
904    
def getParams(self):
    """
    Return the parameter dictionary filled through setParam_.
    The dictionary itself is handed back, not a copy.
    """
    collected = self._params
    return collected