ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_orca.py
Revision: 1.6
Committed: Wed Jul 20 10:03:22 2005 UTC (19 years, 9 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.5: +26 -123 lines
Log Message:
too many changes to list them here...

File Contents

# User Rev Content
1 nsmirnov 1.1 from JobType import JobType
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6 nsmirnov 1.3 import PubDB
7 nsmirnov 1.1 import orcarcBuilder
8 slacapra 1.6 import codePreparator
9 nsmirnov 1.1
10     import os, string, re
11    
12     class Orca(JobType):
def __init__(self, cfg_params):
    """
    Set up an ORCA job type from the user configuration.

    cfg_params -- mapping from 'SECTION.key' names to string values.
                  Mandatory keys: USER.owner, USER.dataset,
                  USER.executable, USER.orcarc_file, USER.output_file
                  and USER.total_number_of_events.
                  Optional keys: USER.data_tier,
                  USER.additional_input_files and USER.first_event
                  (the first event defaults to 0).

    Raises CrabException when LOCALRT is missing from the environment,
    when a mandatory key is missing, or when an event count is not an
    integer.  Errors from connectPubDB() and checkNevJobs() propagate.
    """
    JobType.__init__(self, 'ORCA')
    common.logger.debug(3,'ORCA::__init__')

    self.analisys_common_info = {}

    log = common.logger
    code = codePreparator.codePreparator(cfg_params)

    try:
        scramArea = os.environ["LOCALRT"]
    except KeyError:
        msg = self.name()+' job type cannot be created:\n'
        msg += ' LOCALRT env variable not set\n'
        msg += ' Did you do eval `scram runtime ...` from your ORCA area ?\n'
        raise CrabException(msg)
    log.debug(6, "Orca::Orca(): SCRAM area is "+scramArea)

    # Both lists are created up front so that they exist even when
    # USER.output_file is an empty string.
    self.output_file = []
    self.output_file_num = []

    try:
        self.version = code.findSwVersion_(scramArea)
        log.debug(6, "Orca::Orca(): version = "+self.version)
        self.owner = cfg_params['USER.owner']
        log.debug(6, "Orca::Orca(): owner = "+self.owner)
        self.dataset = cfg_params['USER.dataset']
        log.debug(6, "Orca::Orca(): dataset = "+self.dataset)
        self.executable = cfg_params['USER.executable']
        log.debug(6, "Orca::Orca(): executable = "+self.executable)
        self.orcarc_file = cfg_params['USER.orcarc_file']
        log.debug(6, "Orca::Orca(): orcarc file = "+self.orcarc_file)

        # Several output files may be given as a comma-separated list.
        outputSpec = cfg_params['USER.output_file']
        if outputSpec != '':
            rawNames = outputSpec.split(',')
            log.debug(7, 'Orca::Orca(): output files '+str(rawNames))
            self.output_file = [name.strip() for name in rawNames]
            # Per-job output names; modifySteeringCards() overwrites
            # these entries with numbered names.
            self.output_file_num = list(self.output_file)
    except KeyError:
        msg = self.name()+' job type cannot be created:\n'
        msg = msg + ' not all mandatory fields present in the'\
              ' [USER] section.\n List of mandatory fields:\n'
        msg = msg + \
              'USER.owner\n'+\
              'USER.dataset\n'+\
              'USER.executable\n'+\
              'USER.orcarc_file\n'+\
              'USER.output_file\n'
        raise CrabException(msg)

    # Optional comma-separated list of data tiers.
    self.dataTiers = []
    try:
        tierSpec = cfg_params['USER.data_tier']
    except KeyError:
        pass
    else:
        self.dataTiers = [tier.strip() for tier in tierSpec.split(',')]
    log.debug(6, "Orca::Orca(): data tiers = "+str(self.dataTiers))

    # Optional comma-separated list of extra input files.
    try:
        extraSpec = cfg_params['USER.additional_input_files']
    except KeyError:
        pass
    else:
        for name in extraSpec.split(','):
            common.additional_inbox_files.append(name.strip())

    try:
        self.total_number_of_events = int(cfg_params['USER.total_number_of_events'])
    except KeyError:
        msg = 'Must define total_number_of_events and job_number_of_events'
        raise CrabException(msg)
    except ValueError:
        msg = 'USER.total_number_of_events must be an integer'
        raise CrabException(msg)

    try:
        self.first = int(cfg_params['USER.first_event'])
    except KeyError:
        self.first = 0
    except ValueError:
        msg = 'USER.first_event must be an integer'
        raise CrabException(msg)
    log.debug(6, "Orca::Orca(): total number of events = "+repr(self.total_number_of_events))
    log.debug(6, "Orca::Orca(): first event = "+repr(self.first))

    self.maxEvents=0 # max events available in any PubDB
    self.connectPubDB()

    self.checkNevJobs()

    self.tgzNameWithPath = code.prepareTgz_(self.executable)

    return
125    
def wsSetupEnvironment(self, nj):
    """
    Build the job-script fragment that prepares the execution
    environment of job number 'nj' and return it as a string.

    The fragment consists of the output of wsSetupCMSEnvironment_(),
    the creation of the ORCA SCRAM project for self.version, and the
    installation of the job's .orcarc file followed by a call to
    init.sh.
    """
    version = self.version

    # JobType-independent part followed by the ORCA-specific part.
    parts = [
        self.wsSetupCMSEnvironment_(),
        '\n',
        'scram project ORCA '+version+'\n',
        'status=$?\n',
        'if [ $status != 0 ] ; then\n',
        ' echo "Warning: ORCA '+version+' not found on `hostname`" \n',
        ' exit 1 \n',
        'fi \n',
        'cd '+version+'\n',
        'eval `scram runtime -sh`\n',
    ]

    # Job-specific part: the steering card of this particular job.
    orcarc = os.path.basename(common.job_list[nj].configFilename())
    parts.extend([
        '\n',
        'cp $RUNTIME_AREA/'+orcarc+' .orcarc\n',
        'if [ -e $RUNTIME_AREA/orcarc_$CE ] ; then\n',
        ' cat $RUNTIME_AREA/orcarc_$CE .orcarc >> .orcarc_tmp\n',
        ' mv .orcarc_tmp .orcarc\n',
        ' cp $RUNTIME_AREA/init_$CE.sh init.sh\n',
        'fi\n',
        'echo "***** cat .orcarc *********"\n',
        'cat .orcarc\n',
        'echo "****** end .orcarc ********"\n',
        '\n',
        'chmod +x ./init.sh\n',
        './init.sh\n',
        'exitStatus=$?\n',
        'if [ $exitStatus != 0 ] ; then\n',
        ' echo "StageIn init script failed!"\n',
        ' exit $exitStatus\n',
        'fi\n',
    ])
    return ''.join(parts)
168    
def wsBuildExe(self, nj):
    """
    Return the job-script fragment that unpacks the user tarball.

    nj -- job number (not used by this job type).

    Returns an empty string when the tarball does not exist on disk;
    otherwise returns shell commands that untar it from the parent
    directory, abort the job if tar fails, create lib/Linux__2.4 and
    re-evaluate the SCRAM runtime.
    """
    try:
        tgz = self.tgz
    except AttributeError:
        # self.tgz is only assigned by inputSandbox(); when that has not
        # run yet, fall back to the path stored in tgzNameWithPath.
        tgz = self.tgzNameWithPath

    if not os.path.isfile(tgz):
        return ""

    tgzName = os.path.basename(tgz)
    txt = 'echo "tar xzvf ../'+tgzName+'"\n'
    txt += 'tar xzvf ../'+tgzName+'\n'
    txt += 'untar_status=$? \n'
    txt += 'if [ $untar_status -ne 0 ]; then \n'
    txt += ' echo "Untarring .tgz file failed ... exiting" \n'
    txt += ' exit 1 \n'
    txt += 'else \n'
    txt += ' echo "Successful untar" \n'
    txt += 'fi \n'
    # TODO: what does this code do here ?
    # SL check that lib/Linux__... is present
    txt += 'mkdir -p lib/Linux__2.4 \n'
    txt += 'eval `scram runtime -sh`'+'\n'
    return txt
194    
def wsRenameOutput(self, nj):
    """
    Return the job-script fragment that copies every produced file in
    self.output_file to the matching name in self.output_file_num.
    """
    sources = self.output_file
    targets = self.output_file_num
    commands = ['cp '+sources[k]+' '+targets[k]+'\n'
                for k in range(len(sources))]
    return '\n' + ''.join(commands)
204    
def executableName(self):
    """Return the executable name stored in self.executable."""
    exe = self.executable
    return exe
207    
def checkNevJobs(self):
    """
    Resolve the total number of events to process.

    A request of -1 is replaced by int(self.maxEvents).  A resulting
    total of 0 raises CrabException.  No per-job event count is
    checked here.
    """
    wantsEverything = (self.total_number_of_events == -1)
    if wantsEverything:
        self.total_number_of_events = int(self.maxEvents)

    if self.total_number_of_events != 0:
        return

    raise CrabException('Max events available is '+str(self.total_number_of_events))
228    
def connectPubDB(self):
    """
    Collect the orcarc descriptions for the requested owner/dataset.

    If the file 'PubDBSummaryFile' already exists in the share
    directory, self.allOrcarcs is rebuilt from it with
    orcarcBuilder.constructFromFile().  Otherwise PubDB is queried, the
    orcarc/init files are created from inside the job directory, only
    the entries publishing at least the requested number of events are
    kept, and the result is written to 'PubDBSummaryFile'.

    Side effects: sets self.allOrcarcs and self.maxEvents, and stores
    the list of CEs in common.analisys_common_info['sites'].

    Raises CrabException when PubDB cannot be accessed, when nothing is
    published for the owner/dataset/data tiers, or when no entry
    publishes enough events.
    """
    fun = "Orca::connectPubDB()"
    summaryName = common.work_space.shareDir()+'PubDBSummaryFile'

    self.allOrcarcs = []
    # first check if the info from PubDB have been already processed
    if os.path.exists(summaryName):
        common.logger.debug(6, fun+": info from PubDB has been already processed -- use it")
        summary = open(summaryName, 'r')
        try:
            for line in summary.readlines():
                fields = line.split(' ')
                # The last field is the line terminator left over from
                # the trailing blank written after every item.
                self.allOrcarcs.append(orcarcBuilder.constructFromFile(fields[0:-1]))
        finally:
            summary.close()
        for o in self.allOrcarcs:
            # NOTE(review): Nevents rebuilt from the summary file may be
            # a string rather than an int -- confirm, since that would
            # make this comparison non-numeric.
            if o.Nevents >= self.maxEvents:
                self.maxEvents = o.Nevents

    else:  # PubDB never queried
        common.logger.debug(6, fun+": PubDB was never queried -- do it")
        # New PubDB class by SL
        try:
            self.pubdb = PubDB.PubDB(self.owner,
                                     self.dataset,
                                     self.dataTiers)
        except PubDB.RefDBError:
            msg = 'ERROR ***: accessing PubDB'
            raise CrabException(msg)

        self.pubDBResults = self.pubdb.getAllPubDBsInfo()
        if len(self.pubDBResults)==0:
            # dataTiers is a list, so it must be converted before it
            # can be concatenated into the message.
            msg = 'Owner Dataset not published with asked dataTiers! '+\
                  self.owner+' '+ self.dataset+' '+str(self.dataTiers)
            raise CrabException(msg)

        common.logger.debug(6, fun+": PubDB info ("+repr(len(self.pubDBResults))+"):\n")
        for aa in self.pubDBResults:
            for bb in aa:
                common.logger.debug(6, str(bb))
        common.logger.debug(6, fun+": End of PubDB info\n")

        self.builder = orcarcBuilder.orcarcBuilder()

        # createOrcarcAndInit() runs inside the job directory; always
        # return to the original directory, even when it fails.
        currDir = os.getcwd()
        os.chdir(common.work_space.jobDir())
        try:
            tmpAllOrcarcs = self.builder.createOrcarcAndInit(self.pubDBResults)
        finally:
            os.chdir(currDir)

        # Keep the entries that publish enough events and record the
        # largest number of events available among them.
        self.maxEvents=0
        numEvReq = self.total_number_of_events
        for o in tmpAllOrcarcs:
            # -1 means "all available events": every entry qualifies.
            if numEvReq == -1 or numEvReq <= o.Nevents:
                self.allOrcarcs.append(o)
                if o.Nevents >= self.maxEvents:
                    self.maxEvents = o.Nevents

        # Save self.allOrcarcs so that PubDB is not queried again.
        PubDBSummaryFile = open(summaryName,'w')
        try:
            for o in self.allOrcarcs:
                for d in o.content():
                    PubDBSummaryFile.write(d)
                    PubDBSummaryFile.write(' ')
                PubDBSummaryFile.write('\n')
        finally:
            PubDBSummaryFile.close()

    # build a list of sites
    ces = [o.CE for o in self.allOrcarcs]

    if len(ces)==0:
        msg = 'No PubDBs publish enough events! '
        msg += repr(self.total_number_of_events)
        raise CrabException(msg)

    common.logger.debug(6, "List of CEs: "+str(ces))
    common.analisys_common_info['sites']=ces

    return
326    
def nJobs(self):
    """Return the number of jobs to be created (length of common.job_list)."""
    # TODO: should not be here !
    # JobType should have no internal knowledge about submitted jobs.
    jobs = common.job_list
    return len(jobs)
334 nsmirnov 1.5
def prepareSteeringCards(self):
    """
    Copy the user's orcarc card into the share directory, dropping the
    lines that mention InputFileCatalogURL, InputCollections,
    FirstEvent or MaxEvents.

    When the user card cannot be opened, an empty card named
    'empty.orcarc' is created in the current directory and used
    instead; self.orcarc_file is updated accordingly.
    """
    # Any line containing one of these strings is removed from the card.
    managedKeys = ('InputFileCatalogURL', 'InputCollections',
                   'FirstEvent', 'MaxEvents')

    try:
        infile = open(self.orcarc_file,'r')
    except IOError:
        # Only a failure to open the file triggers the fallback; any
        # other error is a real problem and must not be hidden.
        self.orcarc_file = 'empty.orcarc'
        cmd='touch '+self.orcarc_file
        runCommand(cmd,0)
        infile = open(self.orcarc_file,'r')

    try:
        outfile = open(common.work_space.shareDir()+self.cardsBaseName(), 'w')
        try:
            for line in infile.readlines():
                hits = [key for key in managedKeys if line.find(key) != -1]
                if not hits:
                    outfile.write(line)
        finally:
            outfile.close()
    finally:
        infile.close()
    return
364    
def setSteeringCardsNames(self):
    """
    Register '<dataset>.orcarc' as the steering-card name with
    common.job_list.setCfgNames().

    Example from the original author: 'mumu_000002.orcarc' for dataset
    'mumu', job 2 -- the job number is presumably inserted by
    setCfgNames(); verify there.
    """
    cardName = self.dataset+'.orcarc'
    common.job_list.setCfgNames(cardName)
    return
373 nsmirnov 1.1
def modifySteeringCards(self, nj):
    """
    Write the steering card of job 'nj'.

    The card starts with the InputCollections, FirstEvent and MaxEvents
    lines of this job, followed by the content of the card stored in
    the share directory.  The first event and the number of events are
    taken from common.jobDB.

    Side effect: every entry of self.output_file_num is replaced by the
    matching name of self.output_file with '_<nj+1>' inserted before
    the last '.' (or appended when the name contains no '.').
    """
    # Read the template completely and close it straight away.
    infile = open(common.work_space.shareDir()+self.cardsBaseName(), 'r')
    try:
        template = infile.read()
    finally:
        infile.close()

    ### job splitting
    # Looked up before the output file is opened, so that a failure
    # here does not leave a truncated card behind.
    firstEvent = common.jobDB.firstEvent(nj)
    maxEvents = common.jobDB.maxEvents(nj)

    # Output file names carrying the (1-based) job number.
    suffix = "_"+str(nj + 1)
    for i in range(len(self.output_file)):
        name = self.output_file[i]
        dot = name.rfind(".")
        if dot == -1:
            self.output_file_num[i] = name + suffix
        else:
            self.output_file_num[i] = name[:dot] + suffix + name[dot:]

    outfile = open(common.job_list[nj].configFilename(),'w')
    try:
        outfile.write('InputCollections=/System/'+self.owner+'/'+self.dataset+'/'+self.dataset+'\n')
        outfile.write('FirstEvent = '+ str(firstEvent) +'\n')
        outfile.write('MaxEvents = '+str(maxEvents)+'\n')
        outfile.write(template)
    finally:
        outfile.close()
    return
411    
def cardsBaseName(self):
    """Return the file-name part (no directory) of self.orcarc_file."""
    return os.path.basename(self.orcarc_file)
417    
418     ### content of input_sandbox ...
def inputSandbox(self, nj):
    """
    Return the list of file names for the JDL input sandbox of job
    'nj': the code tarball (only if it exists on disk), every file
    listed by the known orcarcs (prefixed with the job directory) and
    the job's own configuration file.

    Side effect: stores the tarball path in self.tgz.
    """
    tarball = self.tgzNameWithPath
    self.tgz = tarball

    sandbox = []
    if os.path.isfile(tarball):
        sandbox = [tarball]

    for orcarc in self.allOrcarcs:
        sandbox.extend([common.work_space.jobDir()+name
                        for name in orcarc.fileList()])

    sandbox.append(common.job_list[nj].configFilename())
    return sandbox
434    
435     ### and of output_sandbox
def outputSandbox(self, nj):
    """
    Return the list of file names for the JDL output sandbox: every
    name in self.output_file_num prefixed with '<self.version>/'.
    """
    return [self.version+'/'+name for name in self.output_file_num]