ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Creator.py
Revision: 1.54
Committed: Fri Jan 18 18:41:03 2008 UTC (17 years, 3 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_1_0_pre3
Changes since 1.53: +6 -6 lines
Log Message:
Redesign the inheritance tree of the Scheduler classes, and remove the SchedulerBoss one.
Introduce SchedulerGrid and SchedulerLocal as base classes for the Grid and local schedulers, respectively.
All interactions with BOSS are done via the Boss class, whose (unique) instance is owned by Scheduler.
These changes are done to reduce code duplication.

Plus other minor modifications and cosmetics, as usual.

File Contents

# User Rev Content
from Actor import Actor
from WorkSpace import WorkSpace
from JobList import JobList
from JobDB import JobDB
from ScriptWriter import ScriptWriter
from Scheduler import Scheduler
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
import common

# Standard-library imports deliberately stay AFTER the star imports above so
# that these module names always win over anything the star imports export.
import math
import os
import stat
import string
import sys
import time
14    
class Creator(Actor):
    """Actor that creates the jobs of a task.

    The constructor instantiates the job-type object selected by
    ``job_type_name``, asks it to prepare its steering cards, determines
    how many jobs have to be created and writes the parameter file used by
    the dashboard.  ``run()`` then walks the job DB and creates every job
    that is still in status ``'X'``, up to the requested number.
    """

    # Permission bits given to the generated job script (rwxr--r--, i.e.
    # octal 744).  Spelled with stat constants because the bare ``0744``
    # literal is Python-2-only syntax.
    _SCRIPT_MODE = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH

    def __init__(self, job_type_name, cfg_params, ncjobs):
        """Build the job-type object and work out how many jobs to create.

        job_type_name -- name used to locate the job-type module and class
                         (see createJobTypeObject)
        cfg_params    -- configuration mapping; 'CRAB.server_mode' and
                         'EDG.virtual_organization' are read here
        ncjobs        -- number of jobs to create, or the string 'all'
        """
        self.job_type_name = job_type_name
        self.job_type = None
        self.cfg_params = cfg_params
        self.total_njobs = 0
        self.total_number_of_events = 0
        self.job_number_of_events = 0
        self.first_event = 0
        self.jobParamsList = []

        self.createJobTypeObject(ncjobs)
        common.logger.debug(5, __name__+": JobType "+self.job_type.name()+" created")

        self.job_type.prepareSteeringCards()
        common.logger.debug(5, __name__+": Steering cards prepared")

        self.total_njobs = self.job_type.numberOfJobs()
        common.logger.debug(5, __name__+": total # of jobs = "+repr(self.total_njobs))

        # Set number of jobs to be created
        # ---------------------------------> Boss4: changed, "all" by default
        self.ncjobs = ncjobs
        if ncjobs == 'all':
            self.ncjobs = self.total_njobs
        elif ncjobs > self.total_njobs:
            # 'elif' keeps the string 'all' out of the numeric comparison;
            # the request is capped at the number of jobs that exist.
            self.ncjobs = self.total_njobs

        # Server mode is optional in the configuration; default is 0 (off).
        self.UseServer = 0
        try:
            self.UseServer = int(self.cfg_params['CRAB.server_mode'])
        except KeyError:
            pass

        # This is code for dashboard
        # First checkProxy
        common.scheduler.checkProxy()
        self._writeDashboardParams(cfg_params)

        # Set/Save Job Type name
        self.job_type_name = common.taskDB.dict("jobtype")

        common.logger.debug(5, "Creator constructor finished")
        return

    def _writeDashboardParams(self, cfg_params):
        """Write the 'key:value' parameter file read by the dashboard.

        This is best effort: any failure is reported through the logger
        and does not abort the creation of the task.
        """
        try:
            fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'w')
            try:
                gridName = common.scheduler.userName()
                common.logger.debug(5, "GRIDNAME: "+gridName)
                taskType = 'analysis'
                VO = cfg_params.get('EDG.virtual_organization', 'cms')

                # NOTE: a missing HOSTNAME or USER environment variable
                # raises KeyError here, which ends up in the handler below.
                params = {'tool': common.prog_name,
                          'JSToolVersion': common.prog_version_str,
                          'tool_ui': os.environ['HOSTNAME'],
                          'scheduler': common.scheduler.name(),
                          'GridName': gridName,
                          'taskType': taskType,
                          'vo': VO,
                          'user': os.environ['USER']}
                jtParam = self.job_type.getParams()
                for key in jtParam:
                    params[key] = jtParam[key].strip()
                for key, value in params.items():
                    fl.write(key + ':' + value + '\n')
            finally:
                # Close the file even when collecting the parameters fails,
                # so the handle is never leaked.
                fl.close()
        except (KeyboardInterrupt, SystemExit):
            # Never swallow a user interrupt or an explicit exit request.
            raise
        except:
            # Deliberately broad: dashboard reporting must not stop the task.
            exctype, value = sys.exc_info()[:2]
            common.logger.message("Creator::run Exception raised in collection params for dashboard: %s %s"%(exctype, value))

    def writeJobsSpecsToDB(self):
        """
        Write firstEvent and maxEvents in the DB for future use

        The work is delegated to the job type's split() method, which
        receives self.jobParamsList.
        """
        self.job_type.split(self.jobParamsList)
        return

    def nJobs(self):
        """Return the total number of jobs reported by the job type."""
        return self.total_njobs

    def createJobTypeObject(self, ncjobs):
        """Import the job-type class and store an instance in self.job_type.

        The module name is 'cms_' + lower-cased job type name and the class
        name is the capitalized job type name.  Raises CrabException when
        the module cannot be imported or the class lookup fails.
        """
        file_name = 'cms_' + self.job_type_name.lower()
        klass_name = self.job_type_name.capitalize()

        try:
            klass = importName(file_name, klass_name)
        except KeyError:
            msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
            raise CrabException(msg)
        except ImportError:
            # sys.exc_info() instead of "except ImportError, e": the comma
            # form is Python-2-only syntax.
            e = sys.exc_info()[1]
            msg = 'Cannot create job type '+self.job_type_name
            msg += ' (file: '+file_name+', class '+klass_name+'):\n'
            msg += str(e)
            raise CrabException(msg)

        self.job_type = klass(self.cfg_params, ncjobs)
        return

    def jobType(self):
        """Return the job-type object built by createJobTypeObject()."""
        return self.job_type

    def run(self):
        """
        The main method of the class.

        Creates up to self.ncjobs jobs among those whose job DB status is
        'X': for each one the steering cards are modified, the job script
        is generated, and status, sandboxes, task id and block number are
        recorded in the job DB.  Afterwards the scheduler script is
        created, the jobs are declared (unless running in server mode) and
        both DBs are saved.
        """
        common.logger.debug(5, "Creator::run() called")
        start = time.time()

        scheduler_name = self.cfg_params['CRAB.scheduler']

        # Instantiate ScriptWriter
        ## checking scheduler: if glite(coll) output_sandbox will be limited
        if "glit" in scheduler_name:
            script_writer = ScriptWriter('crab_template.sh', 1)  ## flag that indicates if limit or not
        else:
            script_writer = ScriptWriter('crab_template.sh', 0)

        # Loop over jobs
        argsList = []
        njc = 0         # number of jobs created so far
        block = -1      # first block is 0
        lastDest = ''
        for nj in range(self.total_njobs):
            if njc == self.ncjobs:
                break
            if common.jobDB.status(nj) != 'X':
                continue

            common.logger.debug(1, "Creating job # "+repr(nj+1))

            # Prepare configuration file
            self.job_type.modifySteeringCards(nj)

            # Create XML script and declare related jobs
            argsList.append(str(nj+1)+' '+self.job_type.getJobTypeArguments(nj, scheduler_name))
            self.job_type.setArgsList(argsList)

            # Create script (sh)
            script_writer.modifyTemplateScript()
            os.chmod(common.taskDB.dict("ScriptName"), self._SCRIPT_MODE)

            common.jobDB.setStatus(nj, 'C')
            # common: write input and output sandbox
            common.jobDB.setInputSandbox(nj, self.job_type.inputSandbox(nj))

            outputSandbox = self.job_type.outputSandbox(nj)
            # check if out!=err
            common.jobDB.setOutputSandbox(nj, outputSandbox)
            common.jobDB.setTaskId(nj, common.taskDB.dict('taskId'))

            # A new block starts every time the destination differs from
            # the one of the previously created job.
            currDest = common.jobDB.destination(nj)
            if currDest != lastDest:
                block += 1
                lastDest = currDest
            common.jobDB.setBlock(nj, block)
            njc = njc + 1

        ####
        common.scheduler.sched_parameter()
        common.scheduler.createXMLSchScript(self.total_njobs, argsList)
        common.logger.message('Creating '+str(self.total_njobs)+' jobs, please wait...')
        # modified to support server mode - DS
        # if crab is used in server mode the client must skip the
        # boss declare step. In this case it is performed at server level
        # just before the submission step
        ## SL TODO: I would do this inside scheduler create...
        if self.UseServer == 0:
            common.scheduler.declare()  # Add for BOSS4

        stop = time.time()
        common.logger.debug(2, "Creation Time: "+str(stop - start))
        common.logger.write("Creation Time: "+str(stop - start))

        common.jobDB.save()
        common.taskDB.save()
        msg = '\nTotal of %d jobs created'%njc
        if njc != self.ncjobs:
            msg = msg + ' from %d requested'%self.ncjobs
        msg = msg + '.\n'
        common.logger.message(msg)

        return
175 spiga 1.50 # modified to support server mode - DS
176     # if crab is used in server mode the client must skyp
177     # boss declare step. In this case it is performed at server level
178     # just before the submission step
179 slacapra 1.54 ## SL TODO: I would do this inside scheduler create...
180 spiga 1.50 if (self.UseServer== 0):
181 slacapra 1.54 common.scheduler.declare() #Add for BOSS4
182 fanzago 1.12
183 slacapra 1.45 stop = time.time()
184 slacapra 1.47 common.logger.debug(2, "Creation Time: "+str(stop - start))
185 slacapra 1.45 common.logger.write("Creation Time: "+str(stop - start))
186    
187 nsmirnov 1.1 common.jobDB.save()
188 slacapra 1.43 common.taskDB.save()
189 nsmirnov 1.1 msg = '\nTotal of %d jobs created'%njc
190     if njc != self.ncjobs: msg = msg + ' from %d requested'%self.ncjobs
191     msg = msg + '.\n'
192     common.logger.message(msg)
193 slacapra 1.45
194 nsmirnov 1.1 return