ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Creator.py
Revision: 1.64
Committed: Tue Apr 1 14:53:36 2008 UTC (17 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre5, CRAB_2_2_0_pre4, CRAB_2_2_0_pre2
Changes since 1.63: +7 -6 lines
Log Message:
restored output functionality. Some fix on arguments format avoiding wrapper crash. Implemented tarball creation for output sanbox... still remaining to enable the check on output sandbox size...and the related backup solution

File Contents

# User Rev Content
1 nsmirnov 1.1 from Actor import Actor
2     from WorkSpace import WorkSpace
3     from JobList import JobList
4     from ScriptWriter import ScriptWriter
5     from Scheduler import Scheduler
6     from crab_logger import Logger
7     from crab_exceptions import *
8     from crab_util import *
9     import common
10    
11 slacapra 1.8 import os, string, math
12 slacapra 1.45 import time
13    
14 nsmirnov 1.1 class Creator(Actor):
15 corvo 1.30 def __init__(self, job_type_name, cfg_params, ncjobs):
16 nsmirnov 1.1 self.job_type_name = job_type_name
17 corvo 1.30 self.job_type = None
18 nsmirnov 1.1 self.cfg_params = cfg_params
19 nsmirnov 1.2 self.total_njobs = 0
20     self.total_number_of_events = 0
21     self.job_number_of_events = 0
22 slacapra 1.7 self.first_event = 0
23 gutsche 1.38 self.jobParamsList=[]
24 mcinquil 1.52
25 gutsche 1.42 self.createJobTypeObject(ncjobs)
26 nsmirnov 1.3 common.logger.debug(5, __name__+": JobType "+self.job_type.name()+" created")
27 nsmirnov 1.1
28     self.job_type.prepareSteeringCards()
29 nsmirnov 1.3 common.logger.debug(5, __name__+": Steering cards prepared")
30 nsmirnov 1.1
31 gutsche 1.38 self.total_njobs = self.job_type.numberOfJobs();
32 nsmirnov 1.3 common.logger.debug(5, __name__+": total # of jobs = "+`self.total_njobs`)
33 nsmirnov 1.1
34     self.ncjobs = ncjobs
35     if ncjobs == 'all' : self.ncjobs = self.total_njobs
36     if ncjobs > self.total_njobs : self.ncjobs = self.total_njobs
37 spiga 1.11
38 spiga 1.50
39 slacapra 1.35 # This is code for dashboard
40 spiga 1.36 #First checkProxy
41     common.scheduler.checkProxy()
42 corvo 1.33 try:
43 spiga 1.56 gridName = string.strip(common.scheduler.userName())
44 slacapra 1.53 common.logger.debug(5, "GRIDNAME: "+gridName)
45 corvo 1.33 taskType = 'analysis'
46 slacapra 1.54 VO = cfg_params.get('EDG.virtual_organization','cms')
47 corvo 1.33
48 slacapra 1.47 params = {'tool': common.prog_name,\
49     'JSToolVersion': common.prog_version_str, \
50     'tool_ui': os.environ['HOSTNAME'], \
51 slacapra 1.54 'scheduler': common.scheduler.name(), \
52 slacapra 1.53 'GridName': gridName, \
53 slacapra 1.47 'taskType': taskType, \
54 slacapra 1.48 'vo': VO, \
55 slacapra 1.54 'user': os.environ['USER']}
56 corvo 1.33 jtParam = self.job_type.getParams()
57     for i in jtParam.iterkeys():
58     params[i] = string.strip(jtParam[i])
59 slacapra 1.55 fl = open(common.work_space.shareDir() + '/' + common.apmon.fName, 'w')
60 corvo 1.33 for j, k in params.iteritems():
61     fl.write(j + ':' + k + '\n')
62     fl.close()
63     except:
64     exctype, value = sys.exc_info()[:2]
65 slacapra 1.35 common.logger.message("Creator::run Exception raised in collection params for dashboard: %s %s"%(exctype, value))
66 corvo 1.33 pass
67 nsmirnov 1.1
68 spiga 1.60 self.job_type_name = self.job_type.name()
69    
70 nsmirnov 1.3 common.logger.debug(5, "Creator constructor finished")
71 nsmirnov 1.1 return
72    
73 slacapra 1.7 def writeJobsSpecsToDB(self):
74     """
75     Write firstEvent and maxEvents in the DB for future use
76     """
77    
78 gutsche 1.38 self.job_type.split(self.jobParamsList)
79 nsmirnov 1.2 return
80    
81 nsmirnov 1.1 def nJobs(self):
82     return self.total_njobs
83    
84 spiga 1.61
85     def nJobsL(self):
86     jobsL=[]
87     for i in range(self.total_njobs):
88     jobsL.append(i+1)
89     return jobsL
90    
91 gutsche 1.42 def createJobTypeObject(self,ncjobs):
92 nsmirnov 1.1 file_name = 'cms_'+ string.lower(self.job_type_name)
93     klass_name = string.capitalize(self.job_type_name)
94    
95     try:
96     klass = importName(file_name, klass_name)
97     except KeyError:
98     msg = 'No `class '+klass_name+'` found in file `'+file_name+'.py`'
99     raise CrabException(msg)
100     except ImportError, e:
101     msg = 'Cannot create job type '+self.job_type_name
102     msg += ' (file: '+file_name+', class '+klass_name+'):\n'
103     msg += str(e)
104     raise CrabException(msg)
105    
106 gutsche 1.42 self.job_type = klass(self.cfg_params,ncjobs)
107 nsmirnov 1.1 return
108 nsmirnov 1.5
109     def jobType(self):
110     return self.job_type
111 nsmirnov 1.1
112     def run(self):
113 nsmirnov 1.4 """
114     The main method of the class.
115     """
116 nsmirnov 1.1
117 nsmirnov 1.3 common.logger.debug(5, "Creator::run() called")
118 slacapra 1.45 start = time.time()
119 nsmirnov 1.1 # Instantiate ScriptWriter
120 mcinquil 1.51 script_writer = None
121     if self.cfg_params['CRAB.scheduler'].find("glit") != -1: ## checking scheduler: if glite(coll) output_sandbox will be limited
122     script_writer = ScriptWriter('crab_template.sh', 1) ## flag that indicates if limit or not
123     else:
124     script_writer = ScriptWriter('crab_template.sh', 0)
125 nsmirnov 1.1
126     # Loop over jobs
127 slacapra 1.44 argsList = []
128 nsmirnov 1.1 njc = 0
129 spiga 1.60 listID=[]
130     listField=[]
131 spiga 1.63 listRunField=[]
132     run_jobToSave = {'status' :'C'}
133 nsmirnov 1.4 for nj in range(self.total_njobs):
134 spiga 1.64 output=[]
135 nsmirnov 1.1 if njc == self.ncjobs : break
136    
137 slacapra 1.44 common.logger.debug(1,"Creating job # "+`(nj+1)`)
138 spiga 1.63 listRunField.append(run_jobToSave)
139 nsmirnov 1.3
140 nsmirnov 1.1 # Prepare configuration file
141    
142 nsmirnov 1.4 self.job_type.modifySteeringCards(nj)
143 spiga 1.64 # outputSandbox=[]
144     # outputSandbox=self.job_type.outputSandbox(nj)
145     # outputSandbox.append('out_files_'+str(nj+1)+'.tgz')
146     output.append('out_files_'+str(nj+1)+'.tgz')
147     # job_ToSave={'outputFiles': outputSandbox}
148     job_ToSave={'outputFiles': output}
149 spiga 1.60 listField.append(job_ToSave)
150 spiga 1.57
151 spiga 1.62 listID.append(nj+1)
152 nsmirnov 1.1 njc = njc + 1
153     pass
154 spiga 1.60
155     # ## Not clear why here.. DS
156     # self.job_type.setArgsList()
157 spiga 1.63 common._db.updateRunJob_(listID , listRunField ) ## New BL--DS
158 spiga 1.60 common._db.updateJob_(listID, listField ) ## Nes BL--DS
159 spiga 1.57
160     # Create script (sh)
161     script_writer.modifyTemplateScript()
162     os.chmod(common._db.queryTask('scriptName'), 0744) ## Modified BL--DS
163     # common: write input sandbox --- This is now a task attribute... not per job ## BL--DS
164 spiga 1.60
165     concString = ','
166     inSand=''
167     if len(self.job_type.inputSandbox(1)):
168     inSand += concString.join(self.job_type.inputSandbox(1))
169     isb = {'globalSandbox': inSand}
170 spiga 1.57 common._db.updateTask_(isb)
171 spiga 1.60
172    
173 nsmirnov 1.1 ####
174 spiga 1.57 common.scheduler.declare(self.total_njobs )
175 spiga 1.60 common.scheduler.sched_fix_parameter()
176     # common.scheduler.sched_parameter()
177    
178     stop = time.time()
179 slacapra 1.44 common.logger.message('Creating '+str(self.total_njobs)+' jobs, please wait...')
180 fanzago 1.12
181 slacapra 1.45 stop = time.time()
182 slacapra 1.47 common.logger.debug(2, "Creation Time: "+str(stop - start))
183 slacapra 1.45 common.logger.write("Creation Time: "+str(stop - start))
184    
185 nsmirnov 1.1 msg = '\nTotal of %d jobs created'%njc
186     if njc != self.ncjobs: msg = msg + ' from %d requested'%self.ncjobs
187     msg = msg + '.\n'
188     common.logger.message(msg)
189 slacapra 1.45
190 nsmirnov 1.1 return