ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.10
Committed: Wed Aug 31 16:06:40 2005 UTC (19 years, 8 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.9: +21 -16 lines
Log Message:
new function sched_param, changed createJdl in createSchScript

File Contents

# User Rev Content
1 nsmirnov 1.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6    
7     import os, sys, tempfile
8    
9     class SchedulerEdg(Scheduler):
10     def __init__(self):
11     Scheduler.__init__(self,"EDG")
12 slacapra 1.8 self.states = [ "Acl", "cancelReason", "cancelling","ce_node","children", \
13     "children_hist","children_num","children_states","condorId","condor_jdl", \
14     "cpuTime","destination", "done_code","exit_code","expectFrom", \
15     "expectUpdate","globusId","jdl","jobId","jobtype", \
16     "lastUpdateTime","localId","location", "matched_jdl","network_server", \
17     "owner","parent_job", "reason","resubmitted","rsl","seed",\
18     "stateEnterTime","stateEnterTimes","subjob_failed", \
19     "user tags" , "status" , "status_code","hierarchy"]
20 nsmirnov 1.1 return
21    
22     def configure(self, cfg_params):
23    
24 fanzago 1.10 try: self.edg_config = cfg_params["EDG.config"]
25 nsmirnov 1.1 except KeyError: self.edg_config = ''
26    
27 fanzago 1.10 try: self.edg_config_vo = cfg_params["EDG.config_vo"]
28 nsmirnov 1.3 except KeyError: self.edg_config_vo = ''
29 nsmirnov 1.1
30     try: self.LCG_version = cfg_params["EDG.lcg_version"]
31     except KeyError: self.LCG_version = '2'
32    
33     try: self.EDG_requirements = cfg_params['EDG.requirements']
34     except KeyError: self.EDG_requirements = ''
35    
36     try: self.EDG_retry_count = cfg_params['EDG.retry_count']
37     except KeyError: self.EDG_retry_count = ''
38    
39     try:
40     self.VO = cfg_params['EDG.virtual_organization']
41     except KeyError:
42 slacapra 1.8 self.VO = 'cms'
43 nsmirnov 1.1
44     # Add EDG_WL_LOCATION to the python path
45    
46     try:
47     path = os.environ['EDG_WL_LOCATION']
48     except:
49     msg = "Error: the EDG_WL_LOCATION variable is not set."
50     raise CrabException(msg)
51    
52     libPath=os.path.join(path, "lib")
53     sys.path.append(libPath)
54     libPath=os.path.join(path, "lib", "python")
55     sys.path.append(libPath)
56    
57 nsmirnov 1.4 self.checkProxy_()
58 nsmirnov 1.1 return
59    
60 fanzago 1.10
61     def sched_parameter(self):
62     """
63     Returns file with scheduler-specific parameters
64     """
65    
66     if (self.edg_config and self.edg_config_vo != ''):
67     self.param='sched_param.clad'
68     param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
69     param_file.write('RBconfig = "'+self.edg_config+'";\n')
70     param_file.write('RBconfigVO = "'+self.edg_config_vo+'";')
71     param_file.close()
72     return 1
73     else:
74     return 0
75 nsmirnov 1.2 def wsSetupEnvironment(self):
76     """
77     Returns part of a job script which does scheduler-specific work.
78     """
79     txt = '\n'
80     txt += 'CloseCEs=`edg-brokerinfo getCE`\n'
81     txt += 'echo "CloseCEs = $CloseCEs"\n'
82     txt += 'CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
83     txt += 'echo "CE = $CE"\n'
84     return txt
85 nsmirnov 1.1
86 slacapra 1.7 def loggingInfo(self, nj):
87     """
88     retrieve the logging info from logging and bookkeeping and return it
89     """
90     id = common.jobDB.jobId(nj)
91     edg_ui_cfg_opt = ''
92     if self.edg_config:
93     edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
94     cmd = 'edg-job-get-logging-info -v 2 ' + edg_ui_cfg_opt + id
95     print cmd
96     myCmd = os.popen(cmd)
97     cmd_out = myCmd.readlines()
98     myCmd.close()
99     return cmd_out
100    
101 slacapra 1.6 def listMatch(self, nj):
102     """
103     Check the compatibility of available resources
104     """
105     jdl = common.job_list[nj].jdlFilename()
106     edg_ui_cfg_opt = ''
107     if self.edg_config:
108     edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
109     if self.edg_config_vo:
110     edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
111     cmd = 'edg-job-list-match ' + edg_ui_cfg_opt + jdl
112     myCmd = os.popen(cmd)
113     cmd_out = myCmd.readlines()
114     myCmd.close()
115     return self.parseListMatch_(cmd_out, jdl)
116    
117     def parseListMatch_(self, out, jdl):
118     reComment = re.compile( r'^\**$' )
119     reEmptyLine = re.compile( r'^$' )
120     reVO = re.compile( r'Selected Virtual Organisation name.*' )
121     reCE = re.compile( r'CEId' )
122     reNO = re.compile( r'No Computing Element matching' )
123     reRB = re.compile( r'Connecting to host' )
124     next = 0
125     CEs=[]
126     Match=0
127     for line in out:
128     line = line.strip()
129     if reComment.match( line ):
130     next = 0
131     continue
132     if reEmptyLine.match(line):
133     continue
134     if reVO.match( line ):
135 slacapra 1.7 VO =line.split()[-1]
136 slacapra 1.6 common.logger.debug(5, 'VO :'+VO)
137     pass
138     if reRB.match( line ):
139     RB =line.split()[3]
140     common.logger.debug(5, 'Using RB :'+RB)
141     pass
142     if reCE.search( line ):
143     next = 1
144     continue
145     if next:
146     CE=line.split(':')[0]
147     CEs.append(CE)
148     common.logger.debug(5, 'Matched CE :'+CE)
149     Match=Match+1
150     pass
151     if reNO.match( line ):
152     common.logger.debug(5,line)
153     self.noMatchFound_(jdl)
154     Match=0
155     pass
156     return Match
157    
158     def noMatchFound_(self, jdl):
159     reReq = re.compile( r'Requirements' )
160     reString = re.compile( r'"\S*"' )
161     f = file(jdl,'r')
162     for line in f.readlines():
163     line= line.strip()
164     if reReq.match(line):
165     for req in reString.findall(line):
166     if re.search("VO",req):
167     common.logger.message( "SW required: "+req)
168     continue
169     if re.search('"\d+',req):
170     common.logger.message("Other req : "+req)
171     continue
172     common.logger.message( "CE required: "+req)
173     break
174     pass
175     raise CrabException("No compatible resources found!")
176    
177 nsmirnov 1.1 def submit(self, nj):
178 nsmirnov 1.3 """
179     Submit one EDG job.
180     """
181 nsmirnov 1.1
182     jid = None
183     jdl = common.job_list[nj].jdlFilename()
184     id_tmp = tempfile.mktemp()
185     edg_ui_cfg_opt = ' '
186     if self.edg_config:
187     edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
188     if self.edg_config_vo:
189 nsmirnov 1.3 edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
190 nsmirnov 1.1 cmd = 'edg-job-submit -o ' + id_tmp + edg_ui_cfg_opt + jdl
191     cmd_out = runCommand(cmd)
192     if cmd_out != None:
193 nsmirnov 1.3 idfile = open(id_tmp)
194 nsmirnov 1.1 jid_line = idfile.readline()
195 nsmirnov 1.3 while jid_line[0] == '#':
196     jid_line = idfile.readline()
197     pass
198     jid = string.strip(jid_line)
199     os.unlink(id_tmp)
200     pass
201 nsmirnov 1.1 return jid
202    
203 slacapra 1.8 def getExitStatus(self, id):
204     return self.getStatusAttribute_(id, 'exit_code')
205 fanzago 1.10
206 nsmirnov 1.1 def queryStatus(self, id):
207 slacapra 1.8 return self.getStatusAttribute_(id, 'status')
208 fanzago 1.10
209 spiga 1.9 def queryDest(self, id):
210     return self.getStatusAttribute_(id, 'destination')
211    
212 slacapra 1.8
213     def getStatusAttribute_(self, id, attr):
214 nsmirnov 1.1 """ Query a status of the job with id """
215 slacapra 1.8
216     hstates = {}
217     Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
218     # Bypass edg-job-status interfacing directly to C++ API
219     # Job attribute vector to retrieve status without edg-job-status
220     level = 0
221     # Instance of the Status class provided by LB API
222     jobStat = Status()
223     st = 0
224     jobStat.getStatus(id, level)
225     err, apiMsg = jobStat.get_error()
226     if err:
227     print 'Error caught', apiMsg
228     common.log.message(apiMsg)
229 nsmirnov 1.1 return None
230 slacapra 1.8 else:
231     for i in range(len(self.states)):
232     #print "states = ", states
233     # Fill an hash table with all information retrieved from LB API
234     hstates[ self.states[i] ] = jobStat.loadStatus(st)[i]
235     result = jobStat.loadStatus(st)[ self.states.index(attr) ]
236     return result
237 nsmirnov 1.1
238     def queryDetailedStatus(self, id):
239     """ Query a detailed status of the job with id """
240     cmd = 'edg-job-status '+id
241     cmd_out = runCommand(cmd)
242     return cmd_out
243    
244     def getOutput(self, id):
245 nsmirnov 1.5 """
246     Get output for a finished job with id.
247     Returns the name of directory with results.
248     """
249 slacapra 1.6
250     cmd = 'edg-job-get-output --dir ' + common.work_space.resDir() + ' ' + id
251 nsmirnov 1.1 cmd_out = runCommand(cmd)
252 nsmirnov 1.5
253     # Determine the output directory name
254     dir = common.work_space.resDir()
255     dir += os.getlogin()
256     dir += '_' + os.path.basename(id)
257     return dir
258 nsmirnov 1.1
259     def cancel(self, id):
260     """ Cancel the EDG job with id """
261     cmd = 'edg-job-cancel --noint ' + id
262     cmd_out = runCommand(cmd)
263     return cmd_out
264    
265 nsmirnov 1.4 def checkProxy_(self):
266 nsmirnov 1.1 """
267     Function to check the Globus proxy.
268     """
269     cmd = 'grid-proxy-info -timeleft'
270 nsmirnov 1.4 cmd_out = runCommand(cmd)
271 nsmirnov 1.1 ok = 1
272     timeleft = -999
273     try: timeleft = int(cmd_out)
274     except ValueError: ok=0
275     except TypeError: ok=0
276     if timeleft < 1: ok=0
277    
278     if ok==0:
279     msg = 'No valid proxy found !\n'
280 nsmirnov 1.4 msg += "Please do 'grid-proxy-init'."
281 nsmirnov 1.1 raise CrabException(msg)
282     return
283    
284 fanzago 1.10 def createSchScript(self, nj):
285 nsmirnov 1.1 """
286     Create a JDL-file for EDG.
287     """
288    
289     job = common.job_list[nj]
290     jbt = job.type()
291     inp_sandbox = jbt.inputSandbox(nj)
292     out_sandbox = jbt.outputSandbox(nj)
293 fanzago 1.10 inp_storage_subdir = ''
294 nsmirnov 1.1
295     title = '# This JDL was generated by '+\
296     common.prog_name+' (version '+common.prog_version_str+')\n'
297     jt_string = ''
298    
299     SPL = inp_storage_subdir
300     if ( SPL and SPL[-1] != '/' ) : SPL = SPL + '/'
301    
302     jdl_fname = job.jdlFilename()
303     jdl = open(jdl_fname, 'w')
304     jdl.write(title)
305    
306     script = job.scriptFilename()
307     jdl.write('Executable = "' + os.path.basename(script) +'";\n')
308     jdl.write(jt_string)
309    
310 slacapra 1.8 firstEvent = common.jobDB.firstEvent(nj)
311     maxEvents = common.jobDB.maxEvents(nj)
312     jdl.write('Arguments = "' + str(nj+1)+' '+str(firstEvent)+' '+str(maxEvents)+'";\n')
313    
314 nsmirnov 1.1 inp_box = 'InputSandbox = { '
315     inp_box = inp_box + '"' + script + '",'
316    
317     if inp_sandbox != None:
318     for fl in inp_sandbox:
319     inp_box = inp_box + ' "' + fl + '",'
320     pass
321     pass
322    
323     #if common.use_jam:
324     # inp_box = inp_box+' "'+common.bin_dir+'/'+common.run_jam+'",'
325    
326 slacapra 1.7 for addFile in jbt.additional_inbox_files:
327     addFile = os.path.abspath(addFile)
328     inp_box = inp_box+' "'+addFile+'",'
329     pass
330 nsmirnov 1.1
331     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
332     inp_box = inp_box + ' };\n'
333     jdl.write(inp_box)
334    
335     jdl.write('StdOutput = "' + job.stdout() + '";\n')
336     jdl.write('StdError = "' + job.stderr() + '";\n')
337    
338     #if common.flag_return_data :
339     # for fl in job.outputDataFiles():
340     # out_box = out_box + ' "' + fl + '",'
341     # pass
342     # pass
343    
344 slacapra 1.7 out_box = 'OutputSandbox = { '
345 nsmirnov 1.1 if out_sandbox != None:
346     for fl in out_sandbox:
347     out_box = out_box + ' "' + fl + '",'
348     pass
349     pass
350    
351     if out_box[-1] == ',' : out_box = out_box[:-1]
352     out_box = out_box + ' };'
353     jdl.write(out_box+'\n')
354    
355     # If CloseCE is used ...
356     #if common.flag_usecloseCE and job.inputDataFiles():
357     # indata = 'InputData = { '
358     # for fl in job.inputDataFiles():
359     # indata = indata + ' "lfn:' + SPL + fl + '",'
360     # if indata[-1] == ',' : indata = indata[:-1]
361     # indata = indata + ' };'
362     # jdl.write(indata+'\n')
363     # jdl.write('DataAccessProtocol = { "gsiftp" };\n')
364    
365     if common.analisys_common_info['sites']:
366     if common.analisys_common_info['sw_version']:
367    
368     req='Requirements = '
369     ### First ORCA version
370     req=req + 'Member("VO-cms-' + \
371     common.analisys_common_info['sw_version'] + \
372     '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
373     ## then sites
374     if len(common.analisys_common_info['sites'])>0:
375     req = req + ' && ('
376     for i in range(len(common.analisys_common_info['sites'])):
377     req = req + 'other.GlueCEInfoHostName == "' \
378     + common.analisys_common_info['sites'][i] + '"'
379     if ( i < (int(len(common.analisys_common_info['sites']) - 1)) ):
380     req = req + ' || '
381     req = req + ')'
382     ## then user requirement
383     if self.EDG_requirements:
384     req = req + ' && ' + self.EDG_requirements
385     req = req + ';\n'
386     jdl.write(req)
387    
388     jdl.write('VirtualOrganisation = "' + self.VO + '";\n')
389    
390     if ( self.EDG_retry_count ):
391     jdl.write('RetryCount = '+self.EDG_retry_count+';\n')
392     pass
393    
394     jdl.close()
395     return