ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_orca.py
Revision: 1.16
Committed: Wed Oct 5 16:23:32 2005 UTC (19 years, 6 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.15: +19 -15 lines
Log Message:
changed crab, orca and edg schema, to grep new exit_code

File Contents

# User Rev Content
1 nsmirnov 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 nsmirnov 1.3 import PubDB
7 nsmirnov 1.1 import orcarcBuilder
8 slacapra 1.9 import orcarcBuilderOld
9     import Scram
10 nsmirnov 1.1
11     import os, string, re
12    
13     class Orca(JobType):
def __init__(self, cfg_params):
    """
    Configure the ORCA job type from the user configuration.

    cfg_params is a mapping indexed by 'SECTION.key' strings; it is also
    handed on to Scram.Scram() and to self.connectPubDB().  Keys read here:

      USER.owner, USER.dataset         mandatory
      USER.executable                  mandatory
      USER.total_number_of_events      mandatory, converted with int()
      USER.data_tier                   optional, comma separated
      USER.orcarc_file                 optional, defaults to ''
      USER.output_file                 optional, comma separated
      USER.additional_input_files      optional, comma separated
      USER.first_event                 optional, defaults to 0
      USER.activate_monalisa           optional, defaults to 0

    Raises CrabException when one of the mandatory keys is missing.
    """
    JobType.__init__(self, 'ORCA')
    common.logger.debug(3,'ORCA::__init__')

    self.analisys_common_info = {}

    log = common.logger

    self.scram = Scram.Scram(cfg_params)

    self.version = self.scram.getSWVersion()
    common.analisys_common_info['sw_version'] = self.version

    ### collect Data cards
    try:
        self.owner = cfg_params['USER.owner']
        log.debug(6, "Orca::Orca(): owner = "+self.owner)
        self.dataset = cfg_params['USER.dataset']
        log.debug(6, "Orca::Orca(): dataset = "+self.dataset)
    except KeyError:
        msg = "Error: owner and/or dataset not defined "
        raise CrabException(msg)

    self.dataTiers = []
    try:
        self.dataTiers = [tier.strip()
                          for tier in cfg_params['USER.data_tier'].split(',')]
    except KeyError:
        pass
    log.debug(6, "Orca::Orca(): dataTiers = "+str(self.dataTiers))

    ## now the application
    # Only a missing key means "not defined"; any other error must surface.
    try:
        self.executable = cfg_params['USER.executable']
    except KeyError:
        msg = "Error: executable not defined "
        raise CrabException(msg)
    log.debug(6, "Orca::Orca(): executable = "+self.executable)

    try:
        self.orcarc_file = cfg_params['USER.orcarc_file']
        log.debug(6, "Orca::Orca(): orcarc file = "+self.orcarc_file)
    except KeyError:
        log.message("Using empty orcarc file")
        self.orcarc_file = ''

    # output files
    self.output_file = []
    try:
        declared = cfg_params['USER.output_file']
    except KeyError:
        declared = ''
    if declared != '':
        rawNames = declared.split(',')
        log.debug(7, 'Orca::Orca(): output files '+str(rawNames))
        self.output_file = [name.strip() for name in rawNames]
    else:
        log.message("No output file defined: only stdout/err will be available")

    ## additional input files
    self.additional_inbox_files = []
    try:
        self.additional_inbox_files = [
            name.strip()
            for name in cfg_params['USER.additional_input_files'].split(',')]
    except KeyError:
        pass

    try:
        self.total_number_of_events = int(cfg_params['USER.total_number_of_events'])
    except KeyError:
        msg = 'Must define total_number_of_events and job_number_of_events'
        raise CrabException(msg)

    try:
        self.first = int(cfg_params['USER.first_event'])
    except KeyError:
        self.first = 0
    log.debug(6, "Orca::Orca(): total number of events = "+repr(self.total_number_of_events))
    log.debug(6, "Orca::Orca(): first event = "+repr(self.first))

    self.maxEvents=0 # max events available in any PubDB
    self.connectPubDB(cfg_params)

    self.tgzNameWithPath = self.scram.getTarBall(self.executable)

    try:
        self.ML = int(cfg_params['USER.activate_monalisa'])
    except KeyError:
        self.ML = 0
    return
126    
def wsSetupEnvironment(self, nj):
    """
    Return the part of the job wrapper script that prepares the
    execution environment for job number 'nj'.

    The text starts with whatever self.wsSetupCMSEnvironment_() returns,
    followed by the ORCA project setup, the parsing of the three script
    arguments (job number, first event, max event) and the assembly of
    the .orcarc file from the job's configuration file.
    """
    # JobType-independent part
    pieces = [self.wsSetupCMSEnvironment_()]

    # JobType-specific part
    scram = self.scram.commandName()
    version = self.version
    pieces.extend([
        '\n\n',
        'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n',
        scram+' project ORCA '+version+'\n',
        'status=$?\n',
        'if [ $status != 0 ] ; then\n',
        ' echo "SET_EXE_ENV 1 ==>ERROR ORCA '+version+' not found on `hostname`" \n',
        ' exit 1 \n',
        'fi \n',
        'echo "ORCA_VERSION = '+version+'"\n',
        'cd '+version+'\n',
        'eval `'+scram+' runtime -sh`\n',
    ])

    # Argument handling of the generated script
    pieces.extend([
        "\n",
        "## ARGUMNETS: $1 Job Number\n",
        "## ARGUMNETS: $2 First Event for this job\n",
        "## ARGUMNETS: $3 Max Event for this job\n",
        "\n",
        "narg=$#\n",
        "if [ $narg -lt 3 ]\n",
        "then\n",
        " echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$narg+ \n",
        " exit 1\n",
        "fi\n",
        "\n",
        "NJob=$1\n",
        "FirstEvent=$2\n",
        "MaxEvent=$3\n",
    ])

    # Job-specific part: the job's own configuration file becomes .orcarc
    orcarc = os.path.basename(common.job_list[nj].configFilename())
    pieces.extend([
        '\n',
        'cp $RUNTIME_AREA/'+orcarc+' .orcarc\n',
        'if [ -e $RUNTIME_AREA/orcarc_$CE ] ; then\n',
        ' cat $RUNTIME_AREA/orcarc_$CE .orcarc >> .orcarc_tmp\n',
        ' mv .orcarc_tmp .orcarc\n',
        ' cp $RUNTIME_AREA/init_$CE.sh init.sh\n',
        'fi\n',
        '\n',
        'chmod +x ./init.sh\n',
        './init.sh\n',
        'exitStatus=$?\n',
        'if [ $exitStatus != 0 ] ; then\n',
        ' echo "SET_EXE_ENV 1 ==> ERROR StageIn init script failed"\n',
        ' exit $exitStatus\n',
        'fi\n',
        "echo 'SET_EXE_ENV 0 ==> job setup ok'\n",
        'echo "### END JOB SETUP ENVIRONMENT ###"\n\n',
        'echo "FirstEvent=$FirstEvent" >> .orcarc\n',
        'echo "MaxEvent=$MaxEvent" >> .orcarc\n',
    ])
    if self.ML:
        pieces.append('echo "MonalisaJobId=$NJob" >> .orcarc\n')

    pieces.extend([
        '\n',
        'echo "***** cat .orcarc *********"\n',
        'cat .orcarc\n',
        'echo "****** end .orcarc ********"\n',
    ])
    return ''.join(pieces)
198    
def wsBuildExe(self, nj):
    """
    Return the script fragment that unpacks the user code tarball.

    When self.tgzNameWithPath is not an existing file there is nothing
    to unpack and an empty string is returned.  'nj' is not used.
    """
    if not os.path.isfile(self.tgzNameWithPath):
        return ""

    tarball = os.path.basename(self.tgzNameWithPath)
    lines = [
        'echo "tar xzvf ../'+tarball+'"\n',
        'tar xzvf ../'+tarball+'\n',
        'untar_status=$? \n',
        'if [ $untar_status -ne 0 ]; then \n',
        ' echo "Untarring .tgz file failed ... exiting" \n',
        ' exit 1 \n',
        'else \n',
        ' echo "Successful untar" \n',
        'fi \n',
        # make sure the architecture-specific lib directory exists
        'mkdir -p lib/${SCRAM_ARCH} \n',
        'eval `'+self.scram.commandName()+' runtime -sh`'+'\n',
    ]
    return ''.join(lines)
224    
def wsRenameOutput(self, nj):
    """
    Return the script fragment that copies every declared output file
    to its job-numbered name and stores the comma separated list of the
    numbered names in the shell variable 'file_list'.

    The job number is left as the shell variable $NJob; 'nj' is not used.
    """
    numbered = [self.numberFile_(plain, '$NJob') for plain in self.output_file]

    script = ['\n']
    for plain, target in zip(self.output_file, numbered):
        script.append('cp '+plain+' '+target+'\n')
    script.append('file_list='+','.join(numbered)+'\n')
    return ''.join(script)
240    
def executableName(self):
    """
    Return the executable name stored in self.executable.
    """
    name = self.executable
    return name
243    
def connectPubDB(self, cfg_params):
    """
    Fill self.allOrcarcs and self.maxEvents with the PubDB information
    for self.owner / self.dataset and publish the list of CEs in
    common.analisys_common_info['sites'].

    If 'PubDBSummaryFile' already exists in the share directory its
    content is reused; otherwise PubDB is queried, the orcarc files are
    built in the job directory and the summary file is written.

    Raises CrabException when PubDB cannot be accessed, when no data are
    published for the requested data tiers, or when no usable entry
    (and therefore no CE) is left.
    """
    fun = "Orca::connectPubDB()"
    summaryPath = common.work_space.shareDir()+'PubDBSummaryFile'

    self.allOrcarcs = []
    # first check if the info from PubDB have been already processed
    if os.path.exists(summaryPath):
        common.logger.debug(6, fun+": info from PubDB has been already processed -- use it")
        summary = open(summaryPath, 'r')
        try:
            for record in summary.readlines():
                fields = record.split(' ')
                # every record ends with ' \n', so the last field is dropped
                self.allOrcarcs.append(orcarcBuilderOld.constructFromFile(fields[0:-1]))
        finally:
            summary.close()
        for o in self.allOrcarcs:
            if o.Nevents >= self.maxEvents:
                self.maxEvents = o.Nevents

    else:  # PubDB never queried
        common.logger.debug(6, fun+": PubDB was never queried -- do it")
        try:
            self.pubdb = PubDB.PubDB(self.owner,
                                     self.dataset,
                                     self.dataTiers,
                                     cfg_params)
        except PubDB.RefDBmapError:
            msg = 'ERROR ***: accessing PubDB'
            raise CrabException(msg)

        ## extract info from pubDB (grouped by PubDB version :
        ## pubDBData contains a list of info for the new-style PubDBs,
        ## and a list of info for the old-style PubDBs )
        self.pubDBData = self.pubdb.getAllPubDBData()

        ## check and exit if no data are published in any PubDB
        nodata = 1
        for PubDBversion in self.pubDBData:
            if len(self.pubDBData[PubDBversion]) > 0:
                nodata = 0
        if nodata:
            msg = 'Owner '+self.owner+' Dataset '+ self.dataset+ ' not published in any PubDB with asked dataTiers '+'-'.join(self.dataTiers)+' ! '
            raise CrabException(msg)

        ## logging PubDB content for debugging
        for PubDBversion in self.pubDBData:
            # '\\/' emits the same two characters as the original "\/"
            common.logger.debug(6, fun+": PubDB "+PubDBversion+" info ("+repr(len(self.pubDBData[PubDBversion]))+"):\\/")
            for aa in self.pubDBData[PubDBversion]:
                common.logger.debug(6, "---------- start of a PubDB")
                for bb in aa:
                    if common.logger.debugLevel() >= 6:
                        common.logger.debug(6, str(bb.dump()))
                common.logger.debug(6, "----------- end of a PubDB")
            common.logger.debug(6, fun+": End of PubDB "+PubDBversion+" info\n")

        ## building orcarc : switch between info from old and new-style PubDB
        tmpOrcarcList = []
        currDir = os.getcwd()
        os.chdir(common.work_space.jobDir())
        try:
            for PubDBversion in self.pubDBData:
                if len(self.pubDBData[PubDBversion]) > 0:
                    if PubDBversion == 'newPubDB':
                        self.builder = orcarcBuilder.orcarcBuilder()
                    else:
                        self.builder = orcarcBuilderOld.orcarcBuilderOld()
                    tmpAllOrcarcs = self.builder.createOrcarcAndInit(self.pubDBData[PubDBversion])
                    tmpOrcarcList.append(tmpAllOrcarcs)
        finally:
            # always go back, even when a builder fails
            os.chdir(currDir)

        # keep the entries able to serve the request and track the
        # maximum number of events available among them
        self.maxEvents = 0
        numEvReq = self.total_number_of_events
        for tmpAllOrcarcs in tmpOrcarcList:
            for o in tmpAllOrcarcs:
                # total_number_of_events is an int; the original compared
                # it with the string '-1', which can never be equal.
                # NOTE(review): o.Nevents is presumably an int too -- confirm
                if numEvReq == -1 or numEvReq <= o.Nevents:
                    self.allOrcarcs.append(o)
                    if o.Nevents >= self.maxEvents:
                        self.maxEvents = o.Nevents

        # save self.allOrcarcs to the summary file
        PubDBSummaryFile = open(summaryPath, 'w')
        try:
            for o in self.allOrcarcs:
                for d in o.content():
                    PubDBSummaryFile.write(d)
                    PubDBSummaryFile.write(' ')
                PubDBSummaryFile.write('\n')
        finally:
            PubDBSummaryFile.close()

    # build a list of sites
    ces = [o.CE for o in self.allOrcarcs]

    if len(ces) == 0:
        msg = 'No PubDBs publish correct catalogs or enough events! '
        msg += repr(self.total_number_of_events)
        raise CrabException(msg)

    common.logger.debug(6, "List of CEs: "+str(ces))
    common.analisys_common_info['sites'] = ces

    return
369    
def nJobs(self):
    """
    Return the number of jobs to be created, i.e. the length of
    common.job_list.
    """
    # TODO: should not be here !
    # JobType should have no internal knowledge about submitted jobs;
    # for the time being the global job list is the source of truth.
    jobs = common.job_list
    return len(jobs)
377 nsmirnov 1.5
def prepareSteeringCards(self):
    """
    Modify the orcarc card provided by the user, writing a new card
    named '<self.name()>.orcarc' into the job directory.

    Cards managed by CRAB (see wordRemove below) are dropped from the
    user file and CRAB's own cards are appended.  If the user file
    cannot be opened, an empty 'empty.orcarc' is created with 'touch'
    and used instead; self.orcarc_file is updated accordingly.
    """
    try:
        infile = open(self.orcarc_file,'r')
    except IOError:
        # no readable user card: fall back to an empty one
        self.orcarc_file = 'empty.orcarc'
        cmd='touch '+self.orcarc_file
        runCommand(cmd)
        infile = open(self.orcarc_file,'r')

    try:
        outfile = open(common.work_space.jobDir()+self.name()+'.orcarc', 'w')
        try:
            ### remove from user card these lines ###
            wordRemove = ('InputFileCatalogURL', 'InputCollections',
                          'FirstEvent', 'MaxEvents', 'TFileAdaptor')
            for line in infile.readlines():
                word = line.split('=')[0].strip()
                if word not in wordRemove:
                    outfile.write(line)

            outfile.write('\n\n##### The following cards have been created by CRAB: DO NOT TOUCH #####\n')
            outfile.write('TFileAdaptor = true\n')

            if self.ML:
                outfile.write('MonalisaAddPid = false\n')
                outfile.write('ExtraPackages=RecApplPlugins\n')
                outfile.write('MonRecAlisaBuilder=true\n')
                ## TaskId is username+crab_0_date_time : that should be unique
                TaskID = os.getlogin()+'_'+common.work_space.topDir().split('/')[-2]
                outfile.write('MonalisaApplName='+TaskID+'\n')
                outfile.write('MonalisaNode=192.91.245.5\n')
                outfile.write('MonalisaPort=58884\n')

            outfile.write('InputCollections=/System/'+self.owner+'/'+self.dataset+'/'+self.dataset+'\n')
        finally:
            outfile.close()
    finally:
        # close the input card even when writing the output fails
        infile.close()
    return
425 nsmirnov 1.1
def modifySteeringCards(self, nj):
    """
    Create the steering cards file for job 'nj' from a template.

    Nothing is done for this job type; the method returns None.
    """
    return None
431    
def cardsBaseName(self):
    """
    Return the file name (directory part removed) of the user orcarc
    card-file stored in self.orcarc_file.
    """
    return os.path.basename(self.orcarc_file)
437    
438     ### content of input_sanbdox ...
def inputSandbox(self, nj):
    """
    Return the list of file names to be put in the JDL input sandbox
    of job 'nj': the code tarball (when it exists), the files of every
    orcarc entry (each one only once), the job configuration file and
    the additional input files declared by the user.
    """
    sandbox = []

    ## code
    if os.path.isfile(self.tgzNameWithPath):
        sandbox.append(self.tgzNameWithPath)

    ## orcarc -- 'listed' remembers names already added, to avoid duplicates
    listed = {}
    for orcarc in self.allOrcarcs:
        for fname in orcarc.fileList():
            if fname in listed:
                continue
            sandbox.append(common.work_space.jobDir()+fname)
            listed[fname] = 1

    ## config
    sandbox.append(common.job_list[nj].configFilename())

    ## additional input files
    return sandbox + self.additional_inbox_files
461    
462 fanzago 1.15 ### and of output_sandbox
def outputSandbox(self, nj):
    """
    Return the list of file names to be put in the JDL output sandbox
    of job 'nj': every user declared output file, numbered with nj+1
    and prefixed with the software version directory.
    """
    # stdout/stderr are looked up but deliberately not added to the list
    job_stdout = common.job_list[nj].stdout()
    job_stderr = common.job_list[nj].stderr()

    ## User Declared output files
    return [self.version+'/'+self.numberFile_(declared, str(nj + 1))
            for declared in self.output_file]
479 slacapra 1.7
def numberFile_(self, file, txt):
    """
    Insert '_' + txt in front of the last extension of 'file'.

    'txt' must already be a string.  A name without any '.' simply gets
    '_' + txt appended.
    """
    lastDot = file.rfind('.')
    if lastDot < 0:
        # no extension at all
        return file + '_' + txt
    # file[lastDot:] still carries the dot of the extension
    return file[:lastDot] + '_' + txt + file[lastDot:]
499    
500    
def stdOut(self):
    """
    Return the value stored in self.stdOut_.
    """
    value = self.stdOut_
    return value
503    
def stdErr(self):
    """
    Return the value stored in self.stdErr_.
    """
    value = self.stdErr_
    return value