ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
Revision: 1.2
Committed: Wed Apr 26 15:31:06 2006 UTC (19 years ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: pre_cmssw_integration_20060527, CRAB_1_1_0
Changes since 1.1: +2 -2 lines
Log Message:
Bug fix in copying multiple files to the SE. BTW, the same code is duplicated in 4 different classes: BAD!

File Contents

# User Rev Content
1 slacapra 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6    
7     import DataDiscovery
8     import DataLocation
9     import Scram
10    
11     import os, string, re
12    
13     class Cmssw(JobType):
def __init__(self, cfg_params):
    """
    Configure a CMSSW job type from the user configuration.

    cfg_params is a mapping of 'SECTION.key' option names to string
    values. The options read here are:
      CMSSW.owner, CMSSW.dataset         (mandatory)
      CMSSW.pset                         (mandatory, must exist on disk)
      CMSSW.total_number_of_events       (mandatory, integer)
      CMSSW.data_tier                    (optional, comma separated)
      CMSSW.executable                   (optional, default 'cmsRun')
      CMSSW.output_file                  (optional, comma separated)
      CMSSW.script_exe                   (optional, must exist on disk)
      CMSSW.additional_input_files       (optional, comma separated)
      EDG.ce_black_list / ce_white_list  (optional, comma separated regexps)

    Raises CrabException when a mandatory option is missing or invalid.
    The constructor also runs the data discovery/location step and
    prepares the user tar-ball.
    """
    JobType.__init__(self, 'CMSSW')
    common.logger.debug(3,'CMSSW::__init__')

    def commaList(key):
        # Stripped, non-empty items of a comma separated option.
        # KeyError propagates when the option is not defined at all.
        # Empty items are dropped on purpose: an empty file name would end
        # up in the input sandbox and an empty regexp matches every site.
        items = [item.strip() for item in cfg_params[key].split(',')]
        return [item for item in items if item]

    self.analisys_common_info = {}

    log = common.logger

    self.scram = Scram.Scram(cfg_params)
    self.additional_inbox_files = []
    self.scriptExe = ''
    self.executable = ''
    self.tgz_name = 'default.tgz'

    self.version = self.scram.getSWVersion()
    common.analisys_common_info['sw_version'] = self.version

    ### collect Data cards
    try:
        self.owner = cfg_params['CMSSW.owner']
        log.debug(6, "CMSSW::CMSSW(): owner = "+self.owner)
        self.dataset = cfg_params['CMSSW.dataset']
        log.debug(6, "CMSSW::CMSSW(): dataset = "+self.dataset)
    except KeyError:
        msg = "Error: owner and/or dataset not defined "
        raise CrabException(msg)

    try:
        self.dataTiers = commaList('CMSSW.data_tier')
    except KeyError:
        self.dataTiers = []
    log.debug(6, "Cmssw::Cmssw(): dataTiers = "+str(self.dataTiers))

    ## now the application
    try:
        self.executable = cfg_params['CMSSW.executable']
    except KeyError:
        self.executable = 'cmsRun'
        log.debug(3, "User executable not defined. Use cmsRun")
    else:
        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
        log.debug(3, "Default executable cmsRun overridden. Switch to " + self.executable)

    try:
        self.pset = cfg_params['CMSSW.pset']
    except KeyError:
        raise CrabException("PSet file missing. Cannot run cmsRun ")
    log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
    if not os.path.exists(self.pset):
        raise CrabException("User defined PSet file "+self.pset+" does not exist")

    # output files
    try:
        self.output_file = commaList('CMSSW.output_file')
    except KeyError:
        self.output_file = []
    if self.output_file:
        log.debug(7, 'cmssw::cmssw(): output files '+str(self.output_file))
    else:
        log.message("No output file defined: only stdout/err will be available")

    # script_exe file as additional file in inputSandbox
    try:
        self.scriptExe = cfg_params['CMSSW.script_exe']
    except KeyError:
        pass
    if self.scriptExe != '':
        if not os.path.isfile(self.scriptExe):
            # Reported like every other configuration error instead of
            # calling sys.exit(): 'sys' is not imported by this module and
            # a bare exit would report success to the calling shell.
            raise CrabException("User defined script_exe file "+self.scriptExe+" does not exist")
        self.additional_inbox_files.append(self.scriptExe)

    ## additional input files
    try:
        self.additional_inbox_files.extend(commaList('CMSSW.additional_input_files'))
    except KeyError:
        pass

    try:
        self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
    except KeyError:
        msg = 'Must define total_number_of_events and job_number_of_events'
        raise CrabException(msg)
    except ValueError:
        msg = 'total_number_of_events must be an integer'
        raise CrabException(msg)

    # NOTE: CMSSW.first_event is no longer used inside the PSet, so it is
    # deliberately not read here.

    ## CE black list: sites matching any of these regexps are discarded
    try:
        CEBlackList = commaList('EDG.ce_black_list')
    except KeyError:
        CEBlackList = []
    self.reCEBlackList = [re.compile(bad) for bad in CEBlackList]
    common.logger.debug(5,'CEBlackList: '+str(CEBlackList))

    ## CE white list: when not empty, only matching sites are kept
    try:
        CEWhiteList = commaList('EDG.ce_white_list')
    except KeyError:
        CEWhiteList = []
    self.reCEWhiteList = [re.compile(good) for good in CEWhiteList]
    common.logger.debug(5,'CEWhiteList: '+str(CEWhiteList))

    #DBSDLS-start
    ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
    self.maxEvents=0 # max events available ( --> check the requested nb. of evts in Creator.py)
    self.DBSPaths={} # all dbs paths requested ( --> input to the site local discovery script)
    ## Perform the data location and discovery (based on DBS/DLS)
    self.DataDiscoveryAndLocation(cfg_params)
    #DBSDLS-end

    self.tgzNameWithPath = self.getTarBall(self.executable)
177    
def DataDiscoveryAndLocation(self, cfg_params):
    """
    Query DBS for the requested data and DLS for the sites hosting them.

    On success sets self.pubdata, self.DBSPaths and self.maxEvents, and
    stores the list of usable sites (after black/white list filtering)
    in common.analisys_common_info['sites'].

    Raises CrabException when the DBS or DLS query fails or when no site
    survives the filtering.
    """
    ## Contact the DBS
    try:
        self.pubdata=DataDiscovery.DataDiscovery(self.owner,
                                                 self.dataset,
                                                 self.dataTiers,
                                                 cfg_params)
        self.pubdata.fetchDBSInfo()

    except (DataDiscovery.NotExistingDatasetError,
            DataDiscovery.NoDataTierinProvenanceError) as ex:
        msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
        raise CrabException(msg)
    except DataDiscovery.DataDiscoveryError as ex:
        msg = 'ERROR ***: failed Data Discovery in DBS %s'%ex.getErrorMessage()
        raise CrabException(msg)

    ## get list of all required data in the form of dbs paths (dbs path = /dataset/datatier/owner)
    self.DBSPaths=self.pubdata.getDBSPaths()
    common.logger.message("Required data are : ")
    for path in self.DBSPaths:
        common.logger.message(" --> "+path )

    ## get max number of events
    self.maxEvents=self.pubdata.getMaxEvents() ## self.maxEvents used in Creator.py
    common.logger.debug(10,"number of events for primary fileblocks %i"%self.maxEvents)
    common.logger.message("\nThe number of available events is %s"%self.maxEvents)

    ## get fileblocks corresponding to the required data
    # NOTE(review): the result is fetched once and reused for the DLS query,
    # assuming getFileBlocks() is a plain accessor - confirm in DataDiscovery.
    fb=self.pubdata.getFileBlocks()
    common.logger.debug(5,"fileblocks are %s"%fb)

    ## Contact the DLS and build a list of sites hosting the fileblocks
    try:
        dataloc=DataLocation.DataLocation(fb,cfg_params)
        dataloc.fetchDLSInfo()
    except DataLocation.DataLocationError as ex:
        msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
        raise CrabException(msg)

    allsites=dataloc.getSites()
    common.logger.debug(5,"sites are %s"%allsites)
    sites=self.checkBlackList(allsites)
    common.logger.debug(5,"sites are (after black list) %s"%sites)
    sites=self.checkWhiteList(sites)
    common.logger.debug(5,"sites are (after white list) %s"%sites)

    if len(sites)==0:
        msg = 'No sites hosting all the needed data! Exiting... '
        raise CrabException(msg)
    common.logger.message("List of Sites hosting the data : "+str(sites))
    common.logger.debug(6, "List of Sites: "+str(sites))
    common.analisys_common_info['sites']=sites ## used in SchedulerEdg.py in createSchScript
    return
238    
def checkBlackList(self, allSites):
    """
    Return the sites of allSites that match none of the compiled
    patterns in self.reCEBlackList.

    With an empty black list allSites itself is returned untouched.
    """
    if len(self.reCEBlackList) == 0:
        return allSites

    kept = []
    for site in allSites:
        common.logger.debug(10,'Site '+site)
        banned = 0
        # every matching pattern is reported, so no early exit here
        for pattern in self.reCEBlackList:
            if pattern.search(site):
                common.logger.message('CE in black list, skipping site '+site)
                banned = 1
        if not banned:
            kept.append(site)

    if len(kept) == 0:
        common.logger.debug(3,"No sites found after BlackList")
    return kept
254    
def checkWhiteList(self, allsites):
    """
    Return the sites of allsites that match at least one of the compiled
    patterns in self.reCEWhiteList.

    With an empty white list no filtering is requested and allsites
    itself is returned untouched.
    """
    # An empty white list means "accept everything". (The previous code
    # returned an undefined name here and died with a NameError.)
    if len(self.reCEWhiteList) == 0:
        return allsites

    sites = []
    for site in allsites:
        good = 0
        # every matching pattern is reported, so no early exit here
        for pattern in self.reCEWhiteList:
            if pattern.search(site):
                common.logger.debug(5,'CE in white list, adding site '+site)
                good = 1
        if good:
            sites.append(site)

    if len(sites) == 0:
        common.logger.message("No sites found after WhiteList\n")
    else:
        common.logger.debug(5,"Selected sites via WhiteList are "+str(sites)+"\n")
    return sites
273    
def getTarBall(self, exe):
    """
    Return the path of the tar-ball holding the user libraries and
    executable, building it first when it is not yet in the share dir.
    """
    tarball = common.work_space.shareDir()+self.tgz_name
    self.tgzNameWithPath = tarball

    if not os.path.exists(tarball):
        # Nothing cached yet: prepare a tar gzipped file with user binaries.
        self.buildTar_(exe)
        tarball = self.tgzNameWithPath.strip()

    return tarball
288    
def buildTar_(self, executable):
    """
    Create self.tgzNameWithPath with the private parts of the user area.

    The tar-ball may contain the user executable (only when it does not
    live under the release top), the 'lib' directory and the 'src/Data/'
    directory of the user Scram area. Nothing is created when the working
    area is the release top or when there is nothing to ship.

    Raises CrabException when the executable cannot be found or when the
    tar command produces no output.
    """
    # First of all declare the user Scram area
    swArea = self.scram.getSWArea_()
    swReleaseTop = self.scram.getReleaseTop_()

    ## check if working area is release top
    if swReleaseTop == '' or swArea == swReleaseTop:
        return

    filesToBeTarred = []
    ## First find the executable
    if self.executable != '':
        exeWithPath = self.scram.findFile_(executable)
        if not exeWithPath:
            raise CrabException('User executable '+executable+' not found')

        ## then check if it's private or not
        if exeWithPath.find(swReleaseTop) == -1:
            # the exe is private, so we must ship it (path relative to swArea);
            # an exe from the release will be found on the WN instead
            common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
            filesToBeTarred.append(exeWithPath.replace(swArea+'/', ''))

    ## Now get the libraries: only those in local working area
    libDir = 'lib'
    lib = swArea+'/' +libDir
    common.logger.debug(5,"lib "+lib+" to be tarred")
    if os.path.exists(lib):
        filesToBeTarred.append(libDir)

    ## Now check if the Data dir is present
    dataDir = 'src/Data/'
    if os.path.isdir(swArea+'/'+dataDir):
        filesToBeTarred.append(dataDir)

    if len(filesToBeTarred) == 0:
        common.logger.debug(5,"No files to be to be tarred")
        return

    ## Create the tar-ball (paths are relative, so tar runs from swArea)
    tarcmd = 'tar zcvf ' + self.tgzNameWithPath + ' ' + ' '.join(filesToBeTarred) + ' '
    cwd = os.getcwd()
    os.chdir(swArea)
    try:
        cout = runCommand(tarcmd)
    finally:
        # always go back, even if the command helper raises
        os.chdir(cwd)
    if not cout:
        raise CrabException('Could not create tar-ball')

    return
350    
def wsSetupEnvironment(self, nj):
    """
    Build the fragment of the job wrapper script that sets up the
    execution environment of job 'nj' and return it as a string.

    The fragment creates the CMSSW project area, loads the scram runtime,
    checks that the job number argument is present, copies the job PSet
    to pset.cfg and copies the additional input files.
    """
    # JobType-independent part comes first
    parts = [self.wsSetupCMSEnvironment_()]

    # JobType-specific part
    scram = self.scram.commandName()
    parts.extend([
        '\n\n',
        'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n',
        scram+' project CMSSW '+self.version+'\n',
        'status=$?\n',
        'if [ $status != 0 ] ; then\n',
        ' echo "SET_EXE_ENV 1 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n',
        ' echo "JOB_EXIT_STATUS = 5"\n',
        ' echo "SanityCheckCode = 5" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' exit 5 \n',
        'fi \n',
        'echo "CMSSW_VERSION = '+self.version+'"\n',
        'cd '+self.version+'\n',
        ### needed grep for bug in scramv1 ###
        'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n',
    ])

    # Argument handling: only the job number is expected
    parts.extend([
        "\n",
        "## ARGUMNETS: $1 Job Number\n",
        "\n",
        "narg=$#\n",
        "if [ $narg -lt 1 ]\n",
        "then\n",
        " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$narg+ \n",
        ' echo "JOB_EXIT_STATUS = 1"\n',
        ' echo "SanityCheckCode = 1" | tee -a $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        " exit 1\n",
        "fi\n",
        "\n",
        "NJob=$1\n",
    ])

    # Job-specific part: the PSet written for this job
    job = common.job_list[nj]
    pset = os.path.basename(job.configFilename())
    parts.append('\n')
    parts.append('cp $RUNTIME_AREA/'+pset+' pset.cfg\n')

    # Additional input files are copied (and made executable) when present
    for extra in self.additional_inbox_files:
        parts.append('if [ -e $RUNTIME_AREA/'+extra+' ] ; then\n')
        parts.append(' cp $RUNTIME_AREA/'+extra+' .\n')
        parts.append(' chmod +x '+extra+'\n')
        parts.append('fi\n')

    parts.append('echo "### END JOB SETUP ENVIRONMENT ###"\n\n')

    # Dump the PSet in the job output for debugging
    parts.extend([
        '\n',
        'echo "***** cat pset.cfg *********"\n',
        'cat pset.cfg\n',
        'echo "****** end pset.cfg ********"\n',
    ])
    return ''.join(parts)
442    
def modifySteeringCards(self, nj):
    """
    Modify the card provided by the user,
    writing a new card into share dir.

    NOTE: this method has no body besides this docstring, so calling it
    currently does nothing and returns None. 'nj' is not used.
    """
448    
def executableName(self):
    """
    Return the name of the executable to run (self.executable).
    """
    exe = self.executable
    return exe
451    
def executableArgs(self):
    """
    Return the command line arguments passed to the executable:
    the PSet file copied into place by the job wrapper.
    """
    args = "-p pset.cfg"
    return args
454    
def inputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL input sandbox.

    The list holds, in order: the user tar-ball (only when the file
    exists), the configuration file of job 'nj' and every additional
    input file prefixed with the current working directory.
    Duplicates are NOT removed.
    """
    inp_box = []
    ## code
    if os.path.isfile(self.tgzNameWithPath):
        inp_box.append(self.tgzNameWithPath)
    ## config
    inp_box.append(common.job_list[nj].configFilename())
    ## additional input files
    for name in self.additional_inbox_files:
        inp_box.append(common.work_space.cwdDir()+name)
    return inp_box
472    
def outputSandbox(self, nj):
    """
    Returns a list of filenames to be put in JDL output sandbox.

    Each user declared output file is tagged with the 1-based job
    number (nj + 1) through self.numberFile_().
    """
    # job numbers in file names are 1-based, nj is a 0-based index
    suffix = str(nj + 1)

    ## User Declared output files
    out_box = []
    for out in self.output_file:
        out_box.append(self.numberFile_(out, suffix))
    return out_box
488    
def prepareSteeringCards(self):
    """
    Make initial modifications of the user's steering card file.

    Writes <jobDir><name>.cfg containing the CRAB banner and the
    InputCollections card built from self.owner and self.dataset.
    The user PSet (self.pset) is opened, so a missing file raises
    IOError, but its content is not copied.
    NOTE(review): not copying the PSet content looks unfinished - confirm.
    """
    infile = open(self.pset,'r')
    try:
        outfile = open(common.work_space.jobDir()+self.name()+'.cfg', 'w')
        try:
            outfile.write('\n\n##### The following cards have been created by CRAB: DO NOT TOUCH #####\n')
            outfile.write('InputCollections=/System/'+self.owner+'/'+self.dataset+'/'+self.dataset+'\n')
        finally:
            # close even if a write fails, so the handle is not leaked
            outfile.close()
    finally:
        infile.close()
    return
504    
def wsRenameOutput(self, nj):
    """
    Returns part of a job script which renames the produced files.

    For every user declared output file the script checks that the file
    exists (exiting with the 'ls' status otherwise) and copies it to
    $RUNTIME_AREA under its job-numbered name. The script then moves to
    $RUNTIME_AREA and defines the shell variable file_list with the
    space separated numbered names. 'nj' is not used: the job number is
    resolved by the shell through $NJob.
    """
    txt = '\n'
    numbered = []
    for fileWithSuffix in self.output_file:
        output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
        numbered.append(output_file_num)
        txt += '\n'
        txt += 'ls \n'
        txt += '\n'
        txt += 'ls '+fileWithSuffix+'\n'
        txt += 'exe_result=$?\n'
        txt += 'if [ $exe_result -ne 0 ] ; then\n'
        txt += ' echo "ERROR: No output file to manage"\n'
        txt += ' echo "JOB_EXIT_STATUS = $exe_result"\n'
        txt += ' echo "SanityCheckCode = $exe_result" | tee -a $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'
        txt += ' exit $exe_result \n'
        txt += 'else\n'
        txt += ' cp '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
        txt += 'fi\n'

    # Leave the working area only once every file has been handled:
    # changing directory after the first file would make the 'ls'/'cp'
    # of the following ones look in the wrong place.
    if numbered:
        txt += 'cd $RUNTIME_AREA\n'

    txt += 'file_list="'+' '.join(numbered)+'"\n'
    return txt
536    
def numberFile_(self, file, txt):
    """
    Insert '_' + txt before the last extension of a file name.

    'out.root' -> 'out_<txt>.root', 'a.b.root' -> 'a.b_<txt>.root';
    a name without extension simply gets '_<txt>' appended.
    Only a dot located after the last path separator counts as the start
    of an extension, so 'dir.v1/out' becomes 'dir.v1/out_<txt>'.
    Both arguments must be strings.
    """
    dot = file.rfind('.')
    # a dot inside a directory component is not an extension separator
    if dot > file.rfind(os.sep):
        result = file[:dot] + '_' + txt + file[dot:]
    else:
        result = file + '_' + txt

    return result
556    
def getRequirements(self):
    """
    return job requirements to add to jdl files

    The expression is built from common.analisys_common_info: a Member()
    clause on the software version tag (when 'sw_version' is set) and a
    parenthesised OR of the CE host names in 'sites', joined by ' && '.
    An empty string is returned when there are no sites.
    """
    sites = common.analisys_common_info['sites']
    if not sites:
        return ''

    clauses = []
    sw_version = common.analisys_common_info['sw_version']
    if sw_version:
        clauses.append('Member("VO-cms-' + sw_version +
                       '", other.GlueHostApplicationSoftwareRunTimeEnvironment)')

    hosts = ['other.GlueCEInfoHostName == "' + site + '"' for site in sites]
    clauses.append('(' + ' || '.join(hosts) + ')')

    # joining the clauses avoids a dangling ' && ' when no version is set
    return ' && '.join(clauses)