ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.13
Committed: Mon Oct 3 12:17:43 2005 UTC (19 years, 6 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_0_0_pre1_boss_2
Changes since 1.12: +1 -17 lines
Log Message:
fixed bugs

File Contents

# Content
from Scheduler import Scheduler
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
import common

import os, re, sys, tempfile
8
class SchedulerEdg(Scheduler):
    """
    CRAB scheduler for the EDG/LCG workload management system.

    Jobs are described by JDL files (createSchScript) and handled through
    the edg-job-* command line tools; job status attributes are read
    through the LB python wrapper instead of running edg-job-status.

    Kept compatible with the Python 2 interpreters this module targets:
    no 'with' statement, and print() is only called with one argument so
    it behaves like the Python 2 print statement.
    """

    def __init__(self):
        Scheduler.__init__(self, "EDG")
        # Names of the job attributes returned by the LB Status wrapper.
        # getStatusAttribute_() turns an attribute name into an index into
        # Status.loadStatus() output via this list, so the order matters.
        self.states = ["Acl", "cancelReason", "cancelling", "ce_node", "children",
                       "children_hist", "children_num", "children_states", "condorId", "condor_jdl",
                       "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
                       "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
                       "lastUpdateTime", "localId", "location", "matched_jdl", "network_server",
                       "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
                       "stateEnterTime", "stateEnterTimes", "subjob_failed",
                       "user tags", "status", "status_code", "hierarchy"]

    def cfgValue_(self, cfg_params, key, default):
        """Return cfg_params[key], or default when the key is missing."""
        try:
            return cfg_params[key]
        except KeyError:
            return default

    def configure(self, cfg_params):
        """
        Read the EDG.* options from cfg_params, add the EDG libraries to
        sys.path and make sure a valid grid proxy exists.

        Raises CrabException if EDG_WL_LOCATION is not set, or if no proxy
        is available and a new one cannot be created.
        """
        self.edg_config = self.cfgValue_(cfg_params, "EDG.config", '')
        self.edg_config_vo = self.cfgValue_(cfg_params, "EDG.config_vo", '')
        self.LCG_version = self.cfgValue_(cfg_params, "EDG.lcg_version", '2')
        self.EDG_requirements = self.cfgValue_(cfg_params, 'EDG.requirements', '')
        self.EDG_retry_count = self.cfgValue_(cfg_params, 'EDG.retry_count', '')
        self.VO = self.cfgValue_(cfg_params, 'EDG.virtual_organization', 'cms')

        # Add $EDG_WL_LOCATION/lib and lib/python to the python path
        # (presumably needed for the LB wrapper module imported by name in
        # getStatusAttribute_()).
        try:
            path = os.environ['EDG_WL_LOCATION']
        except KeyError:
            msg = "Error: the EDG_WL_LOCATION variable is not set."
            raise CrabException(msg)

        for libPath in (os.path.join(path, "lib"),
                        os.path.join(path, "lib", "python")):
            # Avoid piling up duplicate entries if configure() runs again.
            if libPath not in sys.path:
                sys.path.append(libPath)

        self.checkProxy_()
        return

    def sched_parameter(self):
        """
        Write the scheduler-specific parameter file, if one is needed.

        When both EDG.config and EDG.config_vo are set, writes
        <shareDir>/sched_param.clad with the RBconfig and RBconfigVO
        entries, stores the file name in self.param and returns 1.
        Otherwise writes nothing and returns 0.
        """
        if not (self.edg_config and self.edg_config_vo != ''):
            return 0

        self.param = 'sched_param.clad'
        param_file = open(common.work_space.shareDir() + '/' + self.param, 'w')
        try:
            param_file.write('RBconfig = "' + self.edg_config + '";\n')
            param_file.write('RBconfigVO = "' + self.edg_config_vo + '";')
        finally:
            param_file.close()
        return 1

    def wsSetupEnvironment(self):
        """
        Return the part of the job wrapper script that does EDG-specific
        setup: it sets CloseCEs from edg-brokerinfo and CE to the host part
        (before the first ':') of that value, echoing both.
        """
        txt = '\n'
        txt += 'CloseCEs=`edg-brokerinfo getCE`\n'
        txt += 'echo "CloseCEs = $CloseCEs"\n'
        txt += 'CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
        txt += 'echo "CE = $CE"\n'
        return txt

    def uiConfigOptions_(self, with_vo=1):
        """
        Return the UI configuration options for the edg-job-* commands:
        ' -c <EDG.config> ' and, when with_vo is true,
        ' --config-vo <EDG.config_vo> ', each only if set ('' if none).
        """
        opts = ''
        if self.edg_config:
            opts = ' -c ' + self.edg_config + ' '
        if with_vo and self.edg_config_vo:
            opts += ' --config-vo ' + self.edg_config_vo + ' '
        return opts

    def readCommandOutput_(self, cmd):
        """Run cmd through os.popen and return its output as a list of lines."""
        pipe = os.popen(cmd)
        try:
            return pipe.readlines()
        finally:
            pipe.close()

    def loggingInfo(self, nj):
        """
        Run edg-job-get-logging-info (verbosity 2) for job nj and return its
        output lines. Only EDG.config (-c) is passed to the command.
        """
        jobId = common.jobDB.jobId(nj)
        cmd = 'edg-job-get-logging-info -v 2 ' + self.uiConfigOptions_(with_vo=0) + jobId
        print(cmd)
        return self.readCommandOutput_(cmd)

    def listMatch(self, nj):
        """
        Check which resources are compatible with the JDL of job nj.

        Returns the number of matched CEs (see parseListMatch_); raises
        CrabException when edg-job-list-match reports no match.
        """
        jdl = common.job_list[nj].jdlFilename()
        cmd = 'edg-job-list-match ' + self.uiConfigOptions_() + jdl
        return self.parseListMatch_(self.readCommandOutput_(cmd), jdl)

    def parseListMatch_(self, out, jdl):
        """
        Parse the output lines of edg-job-list-match.

        CE names are taken (up to the first ':') from the lines that follow
        a line containing 'CEId', until a line made only of '*' characters.
        Returns the number of CE lines found. On a 'No Computing Element
        matching' line, noMatchFound_() is called, which raises
        CrabException.
        """
        # '+' (not '*'): an empty line must not count as a separator line,
        # otherwise the empty-line check below could never be reached.
        reComment = re.compile(r'^\*+$')
        reVO = re.compile(r'Selected Virtual Organisation name.*')
        reCE = re.compile(r'CEId')
        reNO = re.compile(r'No Computing Element matching')
        reRB = re.compile(r'Connecting to host')

        inCEList = 0
        Match = 0
        for line in out:
            line = line.strip()
            if reComment.match(line):
                # A line of '*' closes the CE list.
                inCEList = 0
                continue
            if not line:
                continue
            if reVO.match(line):
                VO = line.split()[-1]
                common.logger.debug(5, 'VO :' + VO)
            if reRB.match(line):
                # Expected form: "Connecting to host <rb>, port <n>"
                fields = line.split()
                if len(fields) > 3:
                    common.logger.debug(5, 'Using RB :' + fields[3])
            if reCE.search(line):
                inCEList = 1
                continue
            if inCEList:
                CE = line.split(':')[0]
                common.logger.debug(5, 'Matched CE :' + CE)
                Match = Match + 1
            if reNO.match(line):
                common.logger.debug(5, line)
                self.noMatchFound_(jdl)
                Match = 0
        return Match

    def noMatchFound_(self, jdl):
        """
        Report the quoted values found on the first 'Requirements' line of
        the JDL file (classified as software, numeric 'other' or CE
        requirements), then raise CrabException.
        """
        reReq = re.compile(r'Requirements')
        reString = re.compile(r'"\S*"')
        f = open(jdl, 'r')
        try:
            lines = f.readlines()
        finally:
            f.close()

        for line in lines:
            line = line.strip()
            if not reReq.match(line):
                continue
            for req in reString.findall(line):
                if re.search("VO", req):
                    common.logger.message("SW required: " + req)
                elif re.search(r'"\d+', req):
                    common.logger.message("Other req : " + req)
                else:
                    common.logger.message("CE required: " + req)
            # Only the first Requirements line is reported.
            break
        raise CrabException("No compatible resources found!")

    def readJobId_(self, fileName):
        """
        Return the first line not starting with '#' (stripped) of the job-id
        file written by 'edg-job-submit -o', or None if there is no such
        line. The file is deleted afterwards.
        """
        idfile = open(fileName)
        try:
            jid = None
            for line in idfile:
                if not line.startswith('#'):
                    jid = line.strip()
                    break
        finally:
            idfile.close()
            os.unlink(fileName)
        return jid

    def submit(self, nj):
        """
        Submit job nj with edg-job-submit.

        Returns the job id read back from the -o output file, or None if
        the submission command returned None.
        """
        jdl = common.job_list[nj].jdlFilename()
        # NOTE(review): tempfile.mktemp() only returns a name and is
        # race-prone; kept because edg-job-submit -o creates the file itself.
        id_tmp = tempfile.mktemp()
        # At least one space is needed between the file name and the JDL.
        opts = self.uiConfigOptions_() or ' '
        cmd = 'edg-job-submit -o ' + id_tmp + opts + jdl
        if runCommand(cmd) is None:
            return None
        return self.readJobId_(id_tmp)

    def getExitStatus(self, id):
        """Return the 'exit_code' status attribute of job id."""
        return self.getStatusAttribute_(id, 'exit_code')

    def queryStatus(self, id):
        """Return the 'status' attribute of job id."""
        return self.getStatusAttribute_(id, 'status')

    def queryDest(self, id):
        """Return the 'destination' attribute of job id."""
        return self.getStatusAttribute_(id, 'destination')

    def getStatusAttribute_(self, id, attr):
        """
        Return attribute attr (one of self.states) of the job with this id.

        The LB C++ API is queried directly through its python wrapper
        instead of running edg-job-status. Returns None if the API reports
        an error; raises ValueError if attr is not in self.states.
        """
        Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
        level = 0
        jobStat = Status()
        jobStat.getStatus(id, level)
        err, apiMsg = jobStat.get_error()
        if err:
            print('Error caught %s' % (apiMsg,))
            common.logger.message(apiMsg)
            return None
        # loadStatus(0) is assumed to return the values in self.states order;
        # fetch it once instead of once per attribute.
        return jobStat.loadStatus(0)[self.states.index(attr)]

    def queryDetailedStatus(self, id):
        """Return the output of 'edg-job-status <id>'."""
        cmd = 'edg-job-status ' + id
        cmd_out = runCommand(cmd)
        return cmd_out

    def userName_(self):
        """
        Return the login name that prefixes the output directory created by
        edg-job-get-output. os.getlogin() fails without a controlling
        terminal, so fall back to the password database entry of the uid.
        """
        try:
            return os.getlogin()
        except OSError:
            import pwd
            return pwd.getpwuid(os.getuid())[0]

    def getOutput(self, id):
        """
        Retrieve the output of the finished job id into the results dir.
        Returns the name of the directory holding the results:
        <resDir><user>_<basename of id>.
        """
        resDir = common.work_space.resDir()
        cmd = 'edg-job-get-output --dir ' + resDir + ' ' + id
        runCommand(cmd)
        return resDir + self.userName_() + '_' + os.path.basename(id)

    def cancel(self, id):
        """Cancel the EDG job id; returns the command output."""
        cmd = 'edg-job-cancel --noint ' + id
        cmd_out = runCommand(cmd)
        return cmd_out

    def checkProxy_(self):
        """
        Check the Globus proxy with 'grid-proxy-info -timeleft'.

        If the remaining time cannot be parsed or is below 1, try to create
        a new 100h proxy; raises CrabException if grid-proxy-init fails.
        """
        cmd_out = runCommand('grid-proxy-info -timeleft')
        try:
            timeleft = int(cmd_out)
        except (ValueError, TypeError):
            timeleft = -999

        if timeleft < 1:
            print("No valid proxy found !\n")
            print("Creating a user proxy with default length of 100h\n")
            msg = "Unable to create a valid proxy!\n"
            if os.system("grid-proxy-init -valid 100:00"):
                raise CrabException(msg)
        return

    def createSchScript(self, nj):
        """
        Create the JDL file (job.jdlFilename()) for job nj.

        Written in order: header comment, Executable, Arguments (job
        number starting at 1, first event, max events), InputSandbox (job
        script, job-type input files, additional inbox files), StdOutput,
        StdError, OutputSandbox, Requirements (only when both sites and
        sw_version are set), VirtualOrganisation and, if configured,
        RetryCount.
        """
        job = common.job_list[nj]
        jbt = job.type()
        inp_sandbox = jbt.inputSandbox(nj)
        out_sandbox = jbt.outputSandbox(nj)

        title = '# This JDL was generated by ' + \
                common.prog_name + ' (version ' + common.prog_version_str + ')\n'

        jdl = open(job.jdlFilename(), 'w')
        try:
            jdl.write(title)

            script = job.scriptFilename()
            jdl.write('Executable = "' + os.path.basename(script) + '";\n')

            firstEvent = common.jobDB.firstEvent(nj)
            maxEvents = common.jobDB.maxEvents(nj)
            jdl.write('Arguments = "' + str(nj + 1) + ' ' + str(firstEvent) +
                      ' ' + str(maxEvents) + '";\n')

            # The job script is always the first input file.
            inp_entries = ['"' + script + '"']
            if inp_sandbox is not None:
                for fl in inp_sandbox:
                    inp_entries.append(' "' + fl + '"')
            for addFile in jbt.additional_inbox_files:
                inp_entries.append(' "' + os.path.abspath(addFile) + '"')
            jdl.write('InputSandbox = { ' + ','.join(inp_entries) + ' };\n')

            jdl.write('StdOutput = "' + job.stdout() + '";\n')
            jdl.write('StdError = "' + job.stderr() + '";\n')

            out_entries = []
            if out_sandbox is not None:
                for fl in out_sandbox:
                    out_entries.append(' "' + fl + '"')
            jdl.write('OutputSandbox = { ' + ','.join(out_entries) + ' };\n')

            # NOTE(review): EDG.requirements is only honoured when both sites
            # and sw_version are set; confirm this is intended.
            sites = common.analisys_common_info['sites']
            if sites and common.analisys_common_info['sw_version']:
                # First the required software version...
                req = 'Requirements = ' + 'Member("VO-cms-' + \
                      common.analisys_common_info['sw_version'] + \
                      '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
                # ...then any of the allowed CE hosts...
                hostReqs = ['other.GlueCEInfoHostName == "' + site + '"'
                            for site in sites]
                req = req + ' && (' + ' || '.join(hostReqs) + ')'
                # ...then the user requirements.
                if self.EDG_requirements:
                    req = req + ' && ' + self.EDG_requirements
                jdl.write(req + ';\n')

            jdl.write('VirtualOrganisation = "' + self.VO + '";\n')

            if self.EDG_retry_count:
                jdl.write('RetryCount = ' + str(self.EDG_retry_count) + ';\n')
        finally:
            jdl.close()
        return