ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.1
Committed: Thu Mar 10 16:20:30 2005 UTC (20 years, 1 month ago) by nsmirnov
Content type: text/x-python
Branch: MAIN
Log Message:
initial set of files

File Contents

# Content
1 from Scheduler import Scheduler
2 from crab_logger import Logger
3 from crab_exceptions import *
4 from crab_util import *
5 import common
6
7 import os, sys, tempfile
8
9 class SchedulerEdg(Scheduler):
def __init__(self):
    """Initialise the base Scheduler, registering this scheduler as "EDG"."""
    Scheduler.__init__(self, "EDG")
13
def configure(self, cfg_params):
    """
    Read the EDG settings from cfg_params and extend the Python path.

    Optional keys (attribute <- key, default when the key is absent):
        edg_ui_cfg       <- 'EDG.rb_config'    ('')
        edg_config       <- 'EDG.config'       ('')
        edg_config_vo    <- 'EDG.config_vo'    ('cms')
        LCG_version      <- 'EDG.lcg_version'  ('2')
        EDG_requirements <- 'EDG.requirements' ('')
        EDG_retry_count  <- 'EDG.retry_count'  ('')

    Mandatory key: 'EDG.virtual_organization', stored as self.VO.

    The directories $EDG_WL_LOCATION/lib and $EDG_WL_LOCATION/lib/python
    are appended to sys.path.

    Raises CrabException when 'EDG.virtual_organization' is missing from
    cfg_params or when the EDG_WL_LOCATION environment variable is unset.
    """
    optional_settings = (
        ('edg_ui_cfg', 'EDG.rb_config', ''),
        ('edg_config', 'EDG.config', ''),
        ('edg_config_vo', 'EDG.config_vo', 'cms'),
        ('LCG_version', 'EDG.lcg_version', '2'),
        ('EDG_requirements', 'EDG.requirements', ''),
        ('EDG_retry_count', 'EDG.retry_count', ''),
    )
    for attr_name, key, default in optional_settings:
        # Index + KeyError (rather than .get) so any mapping-like
        # cfg_params that only implements __getitem__ keeps working.
        try:
            value = cfg_params[key]
        except KeyError:
            value = default
        setattr(self, attr_name, value)

    try:
        self.VO = cfg_params['EDG.virtual_organization']
    except KeyError:
        msg = 'EDG.virtual_organization is mandatory.'
        raise CrabException(msg)

    # Add EDG_WL_LOCATION to the python path.
    # Only a missing variable is an expected failure here; anything else
    # (e.g. KeyboardInterrupt) must not be turned into a CrabException.
    try:
        path = os.environ['EDG_WL_LOCATION']
    except KeyError:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)

    lib_dirs = (
        os.path.join(path, "lib"),
        os.path.join(path, "lib", "python"),
    )
    for lib_dir in lib_dirs:
        # configure() may be called more than once; do not pile up
        # duplicate entries on sys.path.
        if lib_dir not in sys.path:
            sys.path.append(lib_dir)

    return
59
60
def submit(self, nj):
    """
    Submit one EDG job.

    Runs `edg-job-submit` on the JDL file of job number nj, asking the
    tool to write the job id into a scratch file, and returns the first
    line of that file that is neither blank nor a '#' comment, stripped
    of surrounding whitespace.

    Returns None when runCommand() yields None or when the scratch file
    contains no job id line.
    """
    jdl = common.job_list[nj].jdlFilename()

    edg_ui_cfg_opt = ' '
    if self.edg_config:
        edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
    if self.edg_config_vo:
        edg_ui_cfg_opt = edg_ui_cfg_opt + ' --config-vo ' + self.edg_config_vo + ' '

    # tempfile.mktemp() only returns a name and is open to races.  A
    # private directory from mkdtemp() is safe and still gives the
    # submit tool a not-yet-existing output file, as before.
    tmp_dir = tempfile.mkdtemp()
    id_tmp = os.path.join(tmp_dir, 'edg_job_id')
    jid = None
    try:
        cmd = 'edg-job-submit -o ' + id_tmp + edg_ui_cfg_opt + jdl
        cmd_out = runCommand(cmd)
        if cmd_out is not None:
            idfile = open(id_tmp)
            try:
                for jid_line in idfile:
                    candidate = jid_line.strip()
                    # Skip header/comment lines and blank lines; the old
                    # jid_line[0] test raised IndexError on empty input.
                    if candidate and not candidate.startswith('#'):
                        jid = candidate
                        break
            finally:
                idfile.close()
    finally:
        # Best-effort cleanup of the scratch area on every path.
        try:
            if os.path.exists(id_tmp):
                os.unlink(id_tmp)
            os.rmdir(tmp_dir)
        except OSError:
            pass
    return jid
83
def queryStatus(self, id):
    """
    Query a status of the job with id.

    Runs `edg-job-status <id>`, takes the text that follows the first
    'Status = ' marker up to the end of that line, and returns it
    translated by EDG2CMSprodStatus().

    Returns None (after logging a message) when runCommand() yields None
    or when the marker is not found in the output.
    """
    log = Logger.getInstance()
    cmd0 = 'edg-job-status '
    cmd = cmd0 + id
    cmd_out = runCommand(cmd)
    if cmd_out is None:
        log.message('Error. No output from `'+cmd+'`')
        return None

    # parse output
    status_prefix = 'Status = '
    status_index = cmd_out.find(status_prefix)
    if status_index == -1:
        log.message('Error. Bad output of `'+cmd0+'`:\n'+cmd_out)
        return None

    status = cmd_out[status_index + len(status_prefix):]
    # split() keeps the whole remainder when there is no trailing
    # newline; slicing with find()'s -1 used to drop the last character.
    first_line = status.split('\n', 1)[0]
    return self.EDG2CMSprodStatus(first_line)
102
def EDG2CMSprodStatus(self, edg_status):
    """
    Translate an EDG status string into the status name used by CRAB.

    The input is stripped of surrounding whitespace and compared
    case-insensitively.  A status with no entry in the table is returned
    in its normalised (stripped, lower-case) form.
    """
    # str methods instead of string.strip/string.lower: this file never
    # imports the string module and those functions are gone in Python 3.
    edg_st = edg_status.strip().lower()
    status_map = {
        'submitted': 'Pending',
        'waiting': 'Pending',
        'ready': 'Pending',
        'scheduled': 'Pending',
        'running': 'Running',
        # NOTE(review): 'done' is reported as 'Running', presumably
        # because the output is not yet retrievable -- confirm.
        'done': 'Running',
        'chkpt': 'Running',
        'done (cancelled)': 'Canceled',
        'aborted': 'Aborted',
        'outputready': 'OutputReady',
        'cleared': 'Finished',
    }
    return status_map.get(edg_st, edg_st)
116
def queryDetailedStatus(self, id):
    """Run `edg-job-status` for the job with id and return runCommand()'s result unparsed."""
    return runCommand('edg-job-status '+id)
122
def getOutput(self, id):
    """Run `edg-job-get-output` for the job with id, targeting common.res_dir; return runCommand()'s result."""
    return runCommand('edg-job-get-output --dir ' + common.res_dir + ' '+id)
128
def cancel(self, id):
    """Run `edg-job-cancel --noint` for the EDG job with id and return runCommand()'s result."""
    return runCommand('edg-job-cancel --noint ' + id)
134
def checkProxy(self):
    """
    Check the Globus proxy.

    Runs `grid-proxy-info -timeleft` and interprets its output as an
    integer.  Raises CrabException when the output is not an integer
    (including when it is None) or when the value is below 1.
    """
    remaining = -999
    try:
        remaining = int(runCommand('grid-proxy-info -timeleft',0))
    except (ValueError, TypeError):
        # Unparsable output leaves the sentinel in place, which fails
        # the check below.
        pass

    if remaining < 1:
        msg = 'No valid proxy found !\n'
        msg += "Please do 'grid-proxy-init'"
        raise CrabException(msg)
    return
153
def isInputReady(self, nj):
    """Report the input of job nj as ready; this scheduler always answers 1."""
    return 1
156
def createJDL(self, nj):
    """
    Create a JDL-file for EDG.

    Writes, to the file named by the job's jdlFilename(), in this order:
      - a title comment with the program name and version,
      - Executable (base name of the job script),
      - InputSandbox (the job script followed by the job type's
        inputSandbox(nj) entries, if any),
      - StdOutput and StdError,
      - OutputSandbox (stdout, stderr unless identical to stdout,
        ".BrokerInfo", then the job type's outputSandbox(nj) entries),
      - Requirements (only when both common.analisys_common_info['sites']
        and ['sw_version'] are non-empty),
      - VirtualOrganisation,
      - RetryCount (only when self.EDG_retry_count is non-empty).

    The whole text is assembled in memory first, so a failure while
    collecting the job information does not leave a truncated JDL file.
    """
    def quote_all(names):
        # JDL list items are written as double-quoted strings.
        return ['"' + name + '"' for name in names]

    job = common.job_list[nj]
    jbt = job.type()
    inp_sandbox = jbt.inputSandbox(nj)
    out_sandbox = jbt.outputSandbox(nj)
    script = job.scriptFilename()

    lines = []
    lines.append('# This JDL was generated by ' +
                 common.prog_name + ' (version ' + common.prog_version_str + ')\n')
    lines.append('Executable = "' + os.path.basename(script) + '";\n')

    # Input sandbox: the job script always goes first.
    # (Shipping extra user files / the JAM wrapper is disabled in this
    # revision of the code.)
    inp_files = [script]
    if inp_sandbox is not None:
        inp_files.extend(inp_sandbox)
    lines.append('InputSandbox = { ' + ', '.join(quote_all(inp_files)) + ' };\n')

    std_out = job.stdout()
    std_err = job.stderr()
    lines.append('StdOutput = "' + std_out + '";\n')
    lines.append('StdError = "' + std_err + '";\n')

    # Output sandbox: when stdout and stderr share one file, list it once.
    out_files = [std_out]
    if std_err != std_out:
        out_files.append(std_err)
    out_files.append('.BrokerInfo')
    if out_sandbox is not None:
        out_files.extend(out_sandbox)
    lines.append('OutputSandbox = { ' + ', '.join(quote_all(out_files)) + ' };\n')

    # (InputData / DataAccessProtocol for close-CE matching is disabled
    # in this revision of the code.)

    # NOTE(review): self.EDG_requirements is only emitted together with
    # the software/site requirement; with no sites or no sw_version it is
    # silently dropped -- confirm this is intended.
    info = common.analisys_common_info
    if info['sites'] and info['sw_version']:
        # First the software version tag, then the list of allowed
        # sites, then the user requirement.
        site_terms = ['other.GlueCEInfoHostName == "' + site + '"'
                      for site in info['sites']]
        req = ('Requirements = Member("VO-cms-' + info['sw_version'] +
               '", other.GlueHostApplicationSoftwareRunTimeEnvironment)' +
               ' && (' + ' || '.join(site_terms) + ')')
        if self.EDG_requirements:
            req = req + ' && ' + self.EDG_requirements
        lines.append(req + ';\n')

    lines.append('VirtualOrganisation = "' + self.VO + '";\n')

    if self.EDG_retry_count:
        # str() tolerates a numeric retry count coming from the config.
        lines.append('RetryCount = ' + str(self.EDG_retry_count) + ';\n')

    jdl = open(job.jdlFilename(), 'w')
    try:
        jdl.write(''.join(lines))
    finally:
        # Always release the handle, even if the write fails.
        jdl.close()
    return