ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Creator.py
Revision: 1.65
Committed: Fri Apr 11 14:54:22 2008 UTC (17 years ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: bp_osg_bdii, CRAB_2_2_0_pre9, CRAB_2_2_0_pre8, CRAB_2_2_0_pre7
Branch point for: osg_bdii
Changes since 1.64: +7 -2 lines
Log Message:
Many changes to have LSF working with BossLite
Introduce Killer class to handle -kill which works again
Work_space::res() returns the correct output directory even when the user has set a non-default one; likewise for logDir()
USER.outputdir is not to be used anywhere outside workspace class
Some cleanup in submit logic, to reduce call of Scheduler specific classes from Submitter.py
crab -clean works as well (well, almost, still need to remove twice the directory)
Fill startDirectory and outputDirectory to Task
GetOutput checks status and not schedulerStatus (not standard)
Some cleanup in the use of BlackWhiteListParser
No explicit check of scheduler concrete type in Submitter at listMatch level: move different behaviour in SchedulerXYZ implementation
Plus other things I'm forgetting...

Stefano

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import Actor
2     from WorkSpace import WorkSpace
3     from JobList import JobList
4     from ScriptWriter import ScriptWriter
5     from Scheduler import Scheduler
6     from crab_logger import Logger
7     from crab_exceptions import *
8     from crab_util import *
9     import common
10    
11 slacapra 1.8 import os, string, math
12 slacapra 1.45 import time
13    
class Creator(Actor):
    """Actor that instantiates the configured job type and creates its jobs.

    The constructor builds the job-type object, prepares its steering
    cards, asks it for the total number of jobs and writes the dashboard
    parameters to a file in the work-space share directory.  ``run()``
    then stores the per-job fields through ``common._db``, has
    ``ScriptWriter`` process the template script and declares the jobs
    to the scheduler.
    """

    def __init__(self, job_type_name, cfg_params, ncjobs):
        """Build the job type and work out how many jobs to create.

        job_type_name -- name used to locate the job-type module/class
                         (see ``createJobTypeObject``).
        cfg_params    -- configuration mapping; forwarded to the job type
                         and read for 'EDG.virtual_organization' and
                         'CRAB.scheduler'.
        ncjobs        -- number of jobs to create, or the string 'all'.
        """
        self.job_type_name = job_type_name
        self.job_type = None
        self.cfg_params = cfg_params
        self.total_njobs = 0
        self.total_number_of_events = 0
        self.job_number_of_events = 0
        self.first_event = 0
        self.jobParamsList = []

        self.createJobTypeObject(ncjobs)
        common.logger.debug(5, __name__+": JobType "+self.job_type.name()+" created")

        self.job_type.prepareSteeringCards()
        common.logger.debug(5, __name__+": Steering cards prepared")

        self.total_njobs = self.job_type.numberOfJobs()
        common.logger.debug(5, __name__+": total # of jobs = "+repr(self.total_njobs))

        # 'all' is tested first so the numeric comparison is never applied
        # to that string; requests above the total are clamped to it.
        if ncjobs == 'all' or ncjobs > self.total_njobs:
            self.ncjobs = self.total_njobs
        else:
            self.ncjobs = ncjobs

        # The proxy is checked before the dashboard parameters are collected.
        common.scheduler.checkProxy()
        self._writeDashboardParams()

        # Replace the user-supplied name with the one the job type reports.
        self.job_type_name = self.job_type.name()

        common.logger.debug(5, "Creator constructor finished")

    def _writeDashboardParams(self):
        """Write the dashboard parameters as 'key:value' lines, best effort.

        The file is ``common.apmon.fName`` inside the work-space share
        directory.  Ordinary errors are logged and ignored so that a
        dashboard problem does not abort job creation; interpreter-exit
        requests are propagated.
        """
        # Imported here because the module does not import sys explicitly.
        import sys

        try:
            gridName = common.scheduler.userName().strip()
            common.logger.debug(5, "GRIDNAME: "+gridName)
            taskType = 'analysis'
            VO = self.cfg_params.get('EDG.virtual_organization', 'cms')

            params = {
                'tool': common.prog_name,
                'JSToolVersion': common.prog_version_str,
                'tool_ui': os.environ['HOSTNAME'],
                'scheduler': common.scheduler.name(),
                'GridName': gridName,
                'taskType': taskType,
                'vo': VO,
                'user': os.environ['USER'],
            }
            # Job-type specific parameters override/extend the generic ones.
            jtParam = self.job_type.getParams()
            for key in jtParam.keys():
                params[key] = jtParam[key].strip()

            fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'w')
            try:
                for name, value in params.items():
                    fl.write(name + ':' + value + '\n')
            finally:
                # Close the file even when a write fails.
                fl.close()
        except (KeyboardInterrupt, SystemExit):
            # Never swallow a user interrupt or an explicit exit.
            raise
        except Exception:
            exctype, value = sys.exc_info()[:2]
            # NOTE(review): the text says "Creator::run" although this runs
            # from the constructor; kept unchanged for log compatibility.
            common.logger.message("Creator::run Exception raised in collection params for dashboard: %s %s"%(exctype, value))

    def writeJobsSpecsToDB(self):
        """
        Ask the job type to split, passing it ``self.jobParamsList``.

        NOTE(review): the original docstring described this as writing
        firstEvent and maxEvents to the DB for future use; that happens,
        if at all, inside ``job_type.split`` -- confirm there.
        """
        self.job_type.split(self.jobParamsList)

    def nJobs(self):
        """Return the total number of jobs reported by the job type."""
        return self.total_njobs

    def nJobsL(self):
        """Return the list of job ids ``[1, ..., total_njobs]``."""
        return list(range(1, self.total_njobs + 1))

    def createJobTypeObject(self, ncjobs):
        """Import the job-type class and store an instance in ``self.job_type``.

        The class named ``<Job_type_name capitalized>`` is looked up in
        module ``cms_<job_type_name lowercased>`` through ``importName``
        and instantiated with ``(self.cfg_params, ncjobs)``.

        Raises CrabException when the module cannot be imported
        (ImportError) or the class lookup fails with KeyError.
        """
        # Imported here because the module does not import sys explicitly.
        import sys

        file_name = 'cms_' + self.job_type_name.lower()
        klass_name = self.job_type_name.capitalize()

        try:
            klass = importName(file_name, klass_name)
        except KeyError:
            msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
            raise CrabException(msg)
        except ImportError:
            # sys.exc_info() keeps this handler free of the version-specific
            # "except X, e" / "except X as e" spellings.
            err = sys.exc_info()[1]
            msg = 'Cannot create job type '+self.job_type_name
            msg += ' (file: '+file_name+', class '+klass_name+'):\n'
            msg += str(err)
            raise CrabException(msg)

        self.job_type = klass(self.cfg_params, ncjobs)

    def jobType(self):
        """Return the job-type object created by the constructor."""
        return self.job_type

    def run(self):
        """
        The main method of the class.

        Creates up to ``self.ncjobs`` jobs: per-job fields are collected
        and stored through ``common._db``, the template script is
        processed and made executable, the task-level sandbox and
        directories are stored, and the jobs are declared to the
        scheduler.
        """
        # Local import: symbolic permission bits replace the bare octal
        # literal, whose spelling differs between Python versions.
        import stat

        common.logger.debug(5, "Creator::run() called")
        start = time.time()

        # Instantiate ScriptWriter.  For glite(coll)-like schedulers the
        # flag is set so that the output sandbox will be limited.
        if "glit" in self.cfg_params['CRAB.scheduler']:
            limit_flag = 1
        else:
            limit_flag = 0
        script_writer = ScriptWriter('crab_template.sh', limit_flag)

        # Loop over jobs
        njc = 0
        listID = []
        listField = []
        listRunField = []
        for nj in range(self.total_njobs):
            if njc == self.ncjobs:
                break
            job_id = nj + 1

            common.logger.debug(1, "Creating job # "+repr(job_id))
            # A fresh dict per job, so entries are not aliases of one object.
            listRunField.append({'status': 'C'})

            # Prepare configuration file
            self.job_type.modifySteeringCards(nj)

            listField.append({'outputFiles': ['out_files_'+str(job_id)+'.tgz']})
            listID.append(job_id)
            njc = njc + 1

        common._db.updateRunJob_(listID, listRunField)  ## New BL--DS
        common._db.updateJob_(listID, listField)  ## New BL--DS

        # Create script (sh) and make it rwxr--r-- (octal 744).
        script_writer.modifyTemplateScript()
        script_mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH
        os.chmod(common._db.queryTask('scriptName'), script_mode)  ## Modified BL--DS

        # The input sandbox is a task attribute, not a per-job one.
        # SL This should be a list, rather than a string!
        # Joining an empty sandbox yields '', so no emptiness test is needed.
        inSand = ','.join(self.job_type.inputSandbox(1))

        # Sandbox, Start Dir, outputDir
        param = {
            'globalSandbox': inSand,
            'startDirectory': common.work_space.cwdDir(),
            'outputDirectory': common.work_space.resDir(),
        }
        common._db.updateTask_(param)

        common.scheduler.declare(self.total_njobs)
        common.scheduler.sched_fix_parameter()

        # NOTE(review): this "please wait" message is emitted after the jobs
        # have been created and declared; position kept as in the original.
        common.logger.message('Creating '+str(self.total_njobs)+' jobs, please wait...')

        stop = time.time()
        common.logger.debug(2, "Creation Time: "+str(stop - start))
        common.logger.write("Creation Time: "+str(stop - start))

        msg = '\nTotal of %d jobs created'%njc
        if njc != self.ncjobs:
            msg = msg + ' from %d requested'%self.ncjobs
        msg = msg + '.\n'
        common.logger.message(msg)