ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Creator.py
Revision: 1.42
Committed: Sun Aug 27 21:10:58 2006 UTC (18 years, 8 months ago) by gutsche
Content type: text/x-python
Branch: MAIN
CVS Tags: HEAD_20092006
Changes since 1.41: +3 -3 lines
Log Message:
hand over ncjobs to jobtypes, used for stopping data discovery after requested number of jobs (-create 100)

File Contents

# User Rev Content
import math
import os
import stat
import string
import sys

from Actor import Actor
from WorkSpace import WorkSpace
from JobList import JobList
from JobDB import JobDB
from ScriptWriter import ScriptWriter
from Scheduler import Scheduler
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
import common
13 corvo 1.26 # marco
class Creator(Actor):
    """
    Actor that creates the jobs of a task.

    The constructor instantiates the job type object named by
    job_type_name, asks it for the total number of jobs and records
    monitoring information in the share directory of the work space.
    run() then produces, for every job still in status 'X', the
    configuration, the wrapper script and the scheduler script, and
    registers the job in common.jobDB.
    """

    # Permissions given to generated job scripts: rwxr--r-- (octal 744).
    _SCRIPT_MODE = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH

    def __init__(self, job_type_name, cfg_params, ncjobs):
        """
        Build the job type object and prepare the task for job creation.

        job_type_name -- name used to locate the job type module/class
                         (see createJobTypeObject).
        cfg_params    -- configuration mapping; it is read here and the
                         key 'GridName' is written into it.
        ncjobs        -- number of jobs to create, or the string 'all'.

        Raises CrabException if the job type cannot be created or if the
        job type stored in the share directory differs from the one
        requested.
        """
        self.job_type_name = job_type_name
        self.job_type = None
        self.cfg_params = cfg_params
        self.total_njobs = 0
        self.total_number_of_events = 0
        self.job_number_of_events = 0
        self.first_event = 0
        self.jobParamsList = []

        self.createJobTypeObject(ncjobs)
        common.logger.debug(5, __name__+": JobType "+self.job_type.name()+" created")

        self.job_type.prepareSteeringCards()
        common.logger.debug(5, __name__+": Steering cards prepared")

        self.total_njobs = self.job_type.numberOfJobs()
        common.logger.debug(5, __name__+": total # of jobs = "+repr(self.total_njobs))

        self._setNumberOfJobsToCreate(ncjobs)

        # This is code for proto-monitoring
        self._writeMonitoringCode(cfg_params)

        # This is code for dashboard
        # First checkProxy
        common.scheduler.checkProxy()
        self._writeDashboardParams()

        #TODO: deprecated code, not needed,
        # will be eliminated when WorkSpace.saveConfiguration()
        # will be improved.
        self._checkJobTypeFile()
        #end of deprecated code

        common.logger.debug(5, "Creator constructor finished")
        return

    def _setNumberOfJobsToCreate(self, ncjobs):
        """Store in self.ncjobs the requested job count, capped at total_njobs."""
        self.ncjobs = ncjobs
        if ncjobs == 'all':
            self.ncjobs = self.total_njobs
        elif ncjobs > self.total_njobs:
            # 'elif' keeps the string 'all' out of the numeric comparison.
            self.ncjobs = self.total_njobs
        return

    def _writeMonitoringCode(self, cfg_params):
        """Append the job-type specific monitoring record to the '.code' file."""
        jt_name = self.job_type.name()
        code_file = open(common.work_space.shareDir()+"/.code", "a")
        try:
            if jt_name == 'ORCA' or jt_name == 'ORCA_PUBDB':
                self.owner = cfg_params['ORCA.owner']
                self.dataset = cfg_params['ORCA.dataset']
                code_file.write('::'+str(jt_name)+'::'+str(self.ncjobs)+'::'+str(self.dataset)+'::'+str(self.owner))
            elif jt_name == 'FAMOS':
                self.inputFile = cfg_params['FAMOS.input_lfn']
                self.executable = cfg_params['FAMOS.executable']
                code_file.write('::'+str(jt_name)+'::'+str(self.ncjobs)+'::'+str(self.inputFile)+'::'+str(self.executable))
            elif jt_name == 'CMSSW':
                # The dataset path is split on '/'; a missing key or a path
                # with too few components falls back to a placeholder.
                try:
                    self.primaryDataset = cfg_params['CMSSW.datasetpath'].split("/")[1]
                except (KeyError, IndexError, AttributeError):
                    self.primaryDataset = 'None'
                try:
                    self.ProcessedDataset = cfg_params['CMSSW.datasetpath'].split("/")[3]
                except (KeyError, IndexError, AttributeError):
                    self.ProcessedDataset = ' '
                code_file.write('::'+str(jt_name)+'::'+str(self.ncjobs)+'::'+str(self.primaryDataset)+'::'+str(self.ProcessedDataset))
        finally:
            # Close the file even when a configuration lookup fails.
            code_file.close()
        return

    def _writeDashboardParams(self):
        """
        Write 'key:value' lines describing the task for the dashboard.

        Best effort: any failure is reported through common.logger and
        does not abort the construction of the Creator.
        """
        try:
            fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'w')
            try:
                self.cfg_params['GridName'] = runCommand("voms-proxy-info -identity")
                common.logger.debug(5, "GRIDNAME: "+self.cfg_params['GridName'])
                taskType = 'analysis'

                params = {'tool': common.prog_name,
                          'tool_ui': os.environ['HOSTNAME'],
                          'scheduler': self.cfg_params['CRAB.scheduler'],
                          'GridName': self.cfg_params['GridName'].strip(),
                          'taskType': taskType,
                          'vo': self.cfg_params['EDG.virtual_organization'],
                          'user': self.cfg_params['user']}
                # Job type parameters are added last and may override
                # the generic entries above.
                jtParam = self.job_type.getParams()
                for key in jtParam:
                    params[key] = jtParam[key].strip()
                for key, value in params.items():
                    fl.write(key + ':' + value + '\n')
            finally:
                fl.close()
        except Exception:
            # Deliberately broad (the step is optional), but no longer
            # swallows interpreter-exit requests like a bare except would.
            exctype, value = sys.exc_info()[:2]
            common.logger.message("Creator::run Exception raised in collection params for dashboard: %s %s"%(exctype, value))
        return

    def _checkJobTypeFile(self):
        """
        Set/Save Job Type name.

        If the 'jobtype' file exists, its content must match the
        requested job type name (CrabException otherwise); when no name
        was requested the stored one is adopted. If the file does not
        exist, the current job type name is saved into it.
        """
        # NOTE(review): unlike the other files written by this class, no
        # '/' is inserted here, so shareDir() presumably ends with a
        # path separator -- confirm against WorkSpace.
        jt_fname = common.work_space.shareDir() + 'jobtype'

        if not os.path.exists(jt_fname):
            # Save job type name
            jt_file = open(jt_fname, 'w')
            try:
                jt_file.write(self.job_type_name+'\n')
            finally:
                jt_file.close()
            return

        # Read stored job type name; the file is closed before any
        # exception about a mismatch is raised.
        jt_file = open(jt_fname, 'r')
        try:
            jt = jt_file.read()
        finally:
            jt_file.close()

        if self.job_type_name:
            if jt != self.job_type_name+'\n':
                msg = 'Job Type mismatch: requested <' + self.job_type_name
                msg += '>, found <' + jt[:-1] + '>.'
                raise CrabException(msg)
        else:
            self.job_type_name = jt[:-1]
        return

    def writeJobsSpecsToDB(self):
        """
        Write firstEvent and maxEvents in the DB for future use
        """
        self.job_type.split(self.jobParamsList)
        return

    def nJobs(self):
        """Return the total number of jobs reported by the job type."""
        return self.total_njobs

    def createJobTypeObject(self, ncjobs):
        """
        Import and instantiate the job type class.

        The class named job_type_name.capitalize() is looked up in the
        module 'cms_' + job_type_name.lower() and instantiated with
        (cfg_params, ncjobs); the instance is stored in self.job_type.

        Raises CrabException if the module or the class cannot be found.
        """
        file_name = 'cms_' + self.job_type_name.lower()
        klass_name = self.job_type_name.capitalize()

        try:
            klass = importName(file_name, klass_name)
        except KeyError:
            msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
            raise CrabException(msg)
        except ImportError:
            # The exception is fetched through sys.exc_info() so that the
            # clause is valid on both old and current interpreters.
            import_error = sys.exc_info()[1]
            msg = 'Cannot create job type '+self.job_type_name
            msg += ' (file: '+file_name+', class '+klass_name+'):\n'
            msg += str(import_error)
            raise CrabException(msg)

        self.job_type = klass(self.cfg_params, ncjobs)
        return

    def jobType(self):
        """Return the job type object created by createJobTypeObject()."""
        return self.job_type

    def run(self):
        """
        The main method of the class.

        Loops over all jobs and, for each one whose jobDB status is 'X',
        creates its files and records it with status 'C', until
        self.ncjobs jobs have been created. The job DB is saved at the
        end and a summary is logged.
        """
        common.logger.debug(5, "Creator::run() called")

        # Instantiate ScriptWriter
        script_writer = ScriptWriter('crab_template.sh')

        # Loop over jobs
        njc = 0
        for nj in range(self.total_njobs):
            if njc == self.ncjobs:
                break
            if common.jobDB.status(nj) != 'X':
                continue

            common.logger.message("Creating job # "+repr(nj+1))

            # Prepare configuration file
            self.job_type.modifySteeringCards(nj)

            # Create script (sh)
            script_writer.modifyTemplateScript(nj)
            os.chmod(common.job_list[nj].scriptFilename(), self._SCRIPT_MODE)

            # Create scheduler scripts (jdl)
            common.scheduler.createSchScript(nj)

            common.jobDB.setStatus(nj, 'C')

            # common: write input and output sandbox
            common.jobDB.setInputSandbox(nj, self.job_type.inputSandbox(nj))

            outputSandbox = self.job_type.outputSandbox(nj)
            stdout = common.job_list[nj].stdout()
            stderr = common.job_list[nj].stderr()
            outputSandbox.append(stdout)
            # stderr is added only when it is not the same file as stdout
            if stdout != stderr:
                outputSandbox.append(stderr)
            common.jobDB.setOutputSandbox(nj, outputSandbox)
            common.jobDB.setTaskId(nj, self.cfg_params['taskId'])

            njc = njc + 1

        common.jobDB.save()

        msg = '\nTotal of %d jobs created'%njc
        if njc != self.ncjobs:
            msg = msg + ' from %d requested'%self.ncjobs
        msg = msg + '.\n'
        common.logger.message(msg)
        return