ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Creator.py
Revision: 1.55
Committed: Mon Feb 4 15:06:28 2008 UTC (17 years, 3 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_1_0_pre4
Changes since 1.54: +1 -1 lines
Log Message:
improve slightly the reading of ml dictionary from file

File Contents

# User Rev Content
from Actor import Actor
from WorkSpace import WorkSpace
from JobList import JobList
from JobDB import JobDB
from ScriptWriter import ScriptWriter
from Scheduler import Scheduler
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
import common

# Standard-library imports are kept AFTER the star imports on purpose, so
# that these module names always win over anything the star imports export.
import math
import os
import string
import sys
import time
14    
class Creator(Actor):
    """
    Actor that creates the jobs of a task.

    The constructor builds the job-type object, asks it for the total
    number of jobs, works out how many jobs have to be created and writes
    the dashboard parameter file.  run() then walks over the jobs and
    records each created one in common.jobDB.
    """

    # Permission bits given to the generated job script: octal 744
    # (rwxr--r--).  Built with int() because a bare octal literal is spelled
    # differently across Python versions.
    _SCRIPT_MODE = int('744', 8)

    def __init__(self, job_type_name, cfg_params, ncjobs):
        """
        job_type_name -- name used to locate the job-type module and class
                         (see createJobTypeObject)
        cfg_params    -- mapping of configuration parameters
        ncjobs        -- number of jobs to create, or the string 'all'
        """
        self.job_type_name = job_type_name
        self.job_type = None
        self.cfg_params = cfg_params
        self.total_njobs = 0
        self.total_number_of_events = 0
        self.job_number_of_events = 0
        self.first_event = 0
        self.jobParamsList = []

        self.createJobTypeObject(ncjobs)
        common.logger.debug(5, __name__+": JobType "+self.job_type.name()+" created")

        self.job_type.prepareSteeringCards()
        common.logger.debug(5, __name__+": Steering cards prepared")

        self.total_njobs = self.job_type.numberOfJobs()
        common.logger.debug(5, __name__+": total # of jobs = "+repr(self.total_njobs))

        # Number of jobs to be created, capped at the total.  The 'all'
        # test comes first and short-circuits, so the string is never
        # ordered against an integer.
        if ncjobs == 'all' or ncjobs > self.total_njobs:
            self.ncjobs = self.total_njobs
        else:
            self.ncjobs = ncjobs

        # Server mode is optional in the configuration; default is 0.
        self.UseServer = 0
        try:
            self.UseServer = int(self.cfg_params['CRAB.server_mode'])
        except KeyError:
            pass

        # Dashboard: the proxy is checked first, outside the best-effort
        # block, so a checkProxy() failure is not swallowed.
        common.scheduler.checkProxy()
        self._writeDashboardParams()

        # Job type name as stored in the task DB
        self.job_type_name = common.taskDB.dict("jobtype")

        common.logger.debug(5, "Creator constructor finished")

    def _writeDashboardParams(self):
        """
        Collect the dashboard parameters and write them, one 'key:value'
        per line, to <shareDir>/<common.apmon.fName>.

        Best effort: any ordinary error is reported through
        common.logger.message and otherwise ignored.
        """
        try:
            gridName = common.scheduler.userName()
            common.logger.debug(5, "GRIDNAME: "+gridName)
            taskType = 'analysis'
            VO = self.cfg_params.get('EDG.virtual_organization', 'cms')

            params = {
                'tool': common.prog_name,
                'JSToolVersion': common.prog_version_str,
                'tool_ui': os.environ['HOSTNAME'],
                'scheduler': common.scheduler.name(),
                'GridName': gridName,
                'taskType': taskType,
                'vo': VO,
                'user': os.environ['USER'],
            }
            # Job-type specific parameters are added on top, stripped of
            # surrounding whitespace.
            jtParam = self.job_type.getParams()
            for key in jtParam.keys():
                params[key] = jtParam[key].strip()

            fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'w')
            try:
                for key, value in params.items():
                    fl.write(key + ':' + value + '\n')
            finally:
                # Close the file even when a write fails.
                fl.close()
        except Exception:
            # Deliberately not a bare except: KeyboardInterrupt and
            # SystemExit must not be swallowed here.
            exctype, value = sys.exc_info()[:2]
            # NOTE(review): the message says "Creator::run" although it is
            # emitted while constructing the object; text kept unchanged.
            common.logger.message("Creator::run Exception raised in collection params for dashboard: %s %s"%(exctype, value))

    def writeJobsSpecsToDB(self):
        """
        Ask the job type to split the work, passing it self.jobParamsList.

        NOTE(review): historically described as "write firstEvent and
        maxEvents in the DB for future use"; what is actually stored is
        decided by job_type.split() -- confirm there.
        """
        self.job_type.split(self.jobParamsList)

    def nJobs(self):
        """Return the total number of jobs reported by the job type."""
        return self.total_njobs

    def createJobTypeObject(self, ncjobs):
        """
        Import class <Jobtype> from module cms_<jobtype> and store an
        instance built from (cfg_params, ncjobs) in self.job_type.

        Raises CrabException when the module cannot be imported
        (ImportError) or importName() raises KeyError.
        """
        file_name = 'cms_' + self.job_type_name.lower()
        klass_name = self.job_type_name.capitalize()

        try:
            klass = importName(file_name, klass_name)
        except KeyError:
            msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
            raise CrabException(msg)
        except ImportError:
            # The exception object is fetched through sys.exc_info() so the
            # clause does not depend on version-specific "except X, e" /
            # "except X as e" syntax.
            err = sys.exc_info()[1]
            msg = 'Cannot create job type '+self.job_type_name
            msg += ' (file: '+file_name+', class '+klass_name+'):\n'
            msg += str(err)
            raise CrabException(msg)

        self.job_type = klass(self.cfg_params, ncjobs)

    def jobType(self):
        """Return the job-type object built by createJobTypeObject()."""
        return self.job_type

    def run(self):
        """
        The main method of the class.

        Walks over job indices 0 .. total_njobs-1 and, for every job whose
        jobDB status is 'X', prepares it and marks it 'C', stopping once
        self.ncjobs jobs have been created.  Afterwards the scheduler
        script is generated, the task is declared (unless server mode is
        on), both DBs are saved and a summary is logged.
        """
        common.logger.debug(5, "Creator::run() called")
        start = time.time()

        # Looked up once: the scheduler name is used both to configure the
        # ScriptWriter and for every job's argument string.
        scheduler_name = self.cfg_params['CRAB.scheduler']

        # Instantiate ScriptWriter.  The second argument flags whether the
        # output sandbox is limited, which is the case for glite(coll).
        if "glit" in scheduler_name:
            limit_flag = 1
        else:
            limit_flag = 0
        script_writer = ScriptWriter('crab_template.sh', limit_flag)

        # Loop over jobs
        argsList = []
        njc = 0          # number of jobs created so far
        block = -1       # first block is 0
        lastDest = ''
        for nj in range(self.total_njobs):
            if njc == self.ncjobs:
                break
            if common.jobDB.status(nj) != 'X':
                continue

            common.logger.debug(1, "Creating job # "+repr(nj+1))

            # Prepare configuration file
            self.job_type.modifySteeringCards(nj)

            # Create XML script and declare related jobs
            argsList.append(str(nj+1)+' '+self.job_type.getJobTypeArguments(nj, scheduler_name))
            self.job_type.setArgsList(argsList)

            # Create script (sh)
            script_writer.modifyTemplateScript()
            os.chmod(common.taskDB.dict("ScriptName"), self._SCRIPT_MODE)

            common.jobDB.setStatus(nj, 'C')
            # common: write input and output sandbox
            common.jobDB.setInputSandbox(nj, self.job_type.inputSandbox(nj))
            # NOTE(review): an old comment here read "check if out!=err",
            # but no such check is performed.
            common.jobDB.setOutputSandbox(nj, self.job_type.outputSandbox(nj))
            common.jobDB.setTaskId(nj, common.taskDB.dict('taskId'))

            # A new block starts whenever the destination differs from the
            # one of the previously created job.
            currDest = common.jobDB.destination(nj)
            if currDest != lastDest:
                block += 1
                lastDest = currDest
            common.jobDB.setBlock(nj, block)
            njc += 1

        common.scheduler.sched_parameter()
        # NOTE(review): both calls below use total_njobs, not the number of
        # jobs actually created (njc) -- confirm this is intended.
        common.scheduler.createXMLSchScript(self.total_njobs, argsList)
        common.logger.message('Creating '+str(self.total_njobs)+' jobs, please wait...')

        # Server mode: the client skips the BOSS declare step, which is
        # then performed at server level just before submission.
        ## SL TODO: I would do this inside scheduler create...
        if self.UseServer == 0:
            common.scheduler.declare()  # Add for BOSS4

        elapsed = time.time() - start
        common.logger.debug(2, "Creation Time: "+str(elapsed))
        common.logger.write("Creation Time: "+str(elapsed))

        common.jobDB.save()
        common.taskDB.save()

        msg = '\nTotal of %d jobs created'%njc
        if njc != self.ncjobs:
            msg = msg + ' from %d requested'%self.ncjobs
        msg = msg + '.\n'
        common.logger.message(msg)
180 spiga 1.50 if (self.UseServer== 0):
181 slacapra 1.54 common.scheduler.declare() #Add for BOSS4
182 fanzago 1.12
183 slacapra 1.45 stop = time.time()
184 slacapra 1.47 common.logger.debug(2, "Creation Time: "+str(stop - start))
185 slacapra 1.45 common.logger.write("Creation Time: "+str(stop - start))
186    
187 nsmirnov 1.1 common.jobDB.save()
188 slacapra 1.43 common.taskDB.save()
189 nsmirnov 1.1 msg = '\nTotal of %d jobs created'%njc
190     if njc != self.ncjobs: msg = msg + ' from %d requested'%self.ncjobs
191     msg = msg + '.\n'
192     common.logger.message(msg)
193 slacapra 1.45
194 nsmirnov 1.1 return