ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.5
Committed: Thu Jun 23 16:53:25 2005 UTC (19 years, 10 months ago) by nsmirnov
Content type: text/x-python
Branch: MAIN
Changes since 1.4: +16 -25 lines
Log Message:
Commands '-list', '-status', '-kill', '-retrieve' implemented

File Contents

# Content
1 from Scheduler import Scheduler
2 from crab_logger import Logger
3 from crab_exceptions import *
4 from crab_util import *
5 import common
6
7 import os, sys, tempfile
8
class SchedulerEdg(Scheduler):
    """
    Scheduler registered under the name "EDG".

    It drives the EDG command-line tools (edg-job-submit, edg-job-status,
    edg-job-get-output, edg-job-cancel) through runCommand() and writes
    the JDL file describing each job.
    """

    def __init__(self):
        Scheduler.__init__(self, "EDG")
        return

    def optionalParam_(self, cfg_params, key, default=''):
        """
        Return cfg_params[key], or 'default' when the key is absent.

        Only KeyError is treated as "absent", so any mapping that supports
        item access can be used for cfg_params.
        """
        try:
            return cfg_params[key]
        except KeyError:
            return default

    def configure(self, cfg_params):
        """
        Read the EDG.* settings from cfg_params, add the EDG python
        libraries to sys.path and check the Grid proxy.

        Raises CrabException when 'EDG.virtual_organization' is missing,
        when the EDG_WL_LOCATION environment variable is not set, or when
        checkProxy_() finds no valid proxy.
        """
        self.edg_ui_cfg = self.optionalParam_(cfg_params, "EDG.rb_config")
        self.edg_config = self.optionalParam_(cfg_params, "EDG.config")
        self.edg_config_vo = self.optionalParam_(cfg_params, "EDG.config_vo")
        self.LCG_version = self.optionalParam_(
            cfg_params, "EDG.lcg_version", '2')
        self.EDG_requirements = self.optionalParam_(
            cfg_params, 'EDG.requirements')
        self.EDG_retry_count = self.optionalParam_(
            cfg_params, 'EDG.retry_count')

        try:
            self.VO = cfg_params['EDG.virtual_organization']
        except KeyError:
            msg = 'EDG.virtual_organization is mandatory.'
            raise CrabException(msg)

        # Add EDG_WL_LOCATION to the python path.  Only a missing variable
        # is reported as a configuration error; anything else propagates.
        try:
            path = os.environ['EDG_WL_LOCATION']
        except KeyError:
            msg = "Error: the EDG_WL_LOCATION variable is not set."
            raise CrabException(msg)

        for libPath in (os.path.join(path, "lib"),
                        os.path.join(path, "lib", "python")):
            # Avoid piling up duplicates when configure() is called twice.
            if libPath not in sys.path:
                sys.path.append(libPath)

        self.checkProxy_()
        return

    def wsSetupEnvironment(self):
        """
        Returns part of a job script which does scheduler-specific work.
        """
        lines = (
            '\n',
            'CloseCEs=`edg-brokerinfo getCE`\n',
            'echo "CloseCEs = $CloseCEs"\n',
            'CE=`echo $CloseCEs | sed -e "s/:.*//"`\n',
            'echo "CE = $CE"\n',
        )
        return ''.join(lines)

    def readJobId_(self, path):
        """
        Return the first non-empty line of 'path' that does not start
        with '#', stripped of surrounding whitespace.

        Returns None when the file holds no such line.  An IOError from
        opening the file is not caught.
        """
        idfile = open(path)
        try:
            for line in idfile:
                line = line.strip()
                if line and not line.startswith('#'):
                    return line
        finally:
            idfile.close()
        return None

    def submit(self, nj):
        """
        Submit one EDG job.

        Runs edg-job-submit on the JDL file of job number 'nj' and returns
        the job id read back from the file the command writes, or None
        when runCommand() returns None or no id line is found.
        """
        import shutil

        jdl = common.job_list[nj].jdlFilename()

        edg_ui_cfg_opt = ' '
        if self.edg_config:
            edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
        if self.edg_config_vo:
            edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '

        # The id file lives in a freshly created private directory, so the
        # name handed to the command cannot be pre-created by someone else
        # (tempfile.mktemp() only returns a name and is open to that race).
        tmp_dir = tempfile.mkdtemp()
        try:
            id_tmp = os.path.join(tmp_dir, 'jobid')
            cmd = 'edg-job-submit -o ' + id_tmp + edg_ui_cfg_opt + jdl
            if runCommand(cmd) is None:
                return None
            return self.readJobId_(id_tmp)
        finally:
            # Best-effort cleanup on every exit path.
            shutil.rmtree(tmp_dir, True)

    def queryStatus(self, id):
        """
        Query a status of the job with id.

        Returns the text following 'Current Status:' on its line in the
        edg-job-status output, or None (after logging a message) when the
        command gives no output or the marker is missing.
        """
        cmd = 'edg-job-status ' + id
        cmd_out = runCommand(cmd)
        if cmd_out is None:
            common.logger.message('Error. No output from `'+cmd+'`')
            return None

        # parse output
        status_prefix = 'Current Status:'
        status_index = cmd_out.find(status_prefix)
        if status_index == -1:
            common.logger.message('Error. Bad output of `'+cmd+'`:\n'+cmd_out)
            return None

        rest = cmd_out[status_index + len(status_prefix):]
        # Take everything up to the first newline; when there is none the
        # whole remainder is the status (nothing gets truncated).
        return rest.split('\n', 1)[0].strip()

    def queryDetailedStatus(self, id):
        """
        Query a detailed status of the job with id.

        Returns whatever runCommand() returns for edg-job-status.
        """
        return runCommand('edg-job-status ' + id)

    def loginName_(self):
        """
        Return the login name of the current user.

        os.getlogin() raises OSError when the process has no controlling
        terminal; in that case the name is looked up in the password
        database from the real user id.
        """
        try:
            return os.getlogin()
        except OSError:
            import pwd
            return pwd.getpwuid(os.getuid())[0]

    def getOutput(self, id):
        """
        Get output for a finished job with id.
        Returns the name of directory with results.
        """
        res_dir = common.work_space.resDir()
        runCommand('edg-job-get-output --dir ' + res_dir + ' ' + id)

        # Determine the output directory name.
        # NOTE(review): plain concatenation is kept from the original, so
        # this presumably relies on resDir() ending with a path separator
        # -- confirm against common.work_space.
        return res_dir + self.loginName_() + '_' + os.path.basename(id)

    def cancel(self, id):
        """
        Cancel the EDG job with id.

        Returns whatever runCommand() returns for edg-job-cancel.
        """
        return runCommand('edg-job-cancel --noint ' + id)

    def checkProxy_(self):
        """
        Function to check the Globus proxy.

        Raises CrabException unless 'grid-proxy-info -timeleft' prints an
        integer that is at least 1.
        """
        cmd_out = runCommand('grid-proxy-info -timeleft')
        try:
            timeleft = int(cmd_out)
        except (ValueError, TypeError):
            # Non-numeric output, or no output at all (None).
            timeleft = -999

        if timeleft < 1:
            msg = 'No valid proxy found !\n'
            msg += "Please do 'grid-proxy-init'."
            raise CrabException(msg)
        return

    def sandboxLine_(self, name, files):
        """
        Format one JDL list attribute: name = { "f1", "f2" };
        """
        quoted = ['"' + fl + '"' for fl in files]
        return name + ' = { ' + ', '.join(quoted) + ' };\n'

    def requirementsLine_(self):
        """
        Build the JDL 'Requirements' line, or return '' when either
        common.analisys_common_info['sites'] or ['sw_version'] is empty.

        NOTE(review): as in the original, self.EDG_requirements is only
        emitted together with the software/site requirements -- confirm
        that dropping it otherwise is intended.
        """
        info = common.analisys_common_info
        if not info['sites']:
            return ''
        if not info['sw_version']:
            return ''

        # First the software version ...
        req = 'Requirements = Member("VO-cms-' + info['sw_version'] + \
              '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
        # ... then the sites, OR-ed together ...
        site_terms = ['other.GlueCEInfoHostName == "' + site + '"'
                      for site in info['sites']]
        req += ' && (' + ' || '.join(site_terms) + ')'
        # ... then the user requirement.
        if self.EDG_requirements:
            req += ' && ' + self.EDG_requirements
        return req + ';\n'

    def createJDL(self, nj):
        """
        Create a JDL-file for EDG.

        The file name comes from the job's jdlFilename(); the content is
        assembled first and written in one go, and the file is closed
        even when writing fails.
        """
        job = common.job_list[nj]
        jbt = job.type()
        inp_sandbox = jbt.inputSandbox(nj)
        out_sandbox = jbt.outputSandbox(nj)
        script = job.scriptFilename()
        std_out = job.stdout()
        std_err = job.stderr()

        # The job script always travels in the input sandbox.
        inp_files = [script]
        if inp_sandbox is not None:
            inp_files.extend(inp_sandbox)

        # List stdout only once when stdout and stderr are the same file.
        out_files = [std_out]
        if std_err != std_out:
            out_files.append(std_err)
        out_files.append('.BrokerInfo')
        if out_sandbox is not None:
            out_files.extend(out_sandbox)

        content = [
            '# This JDL was generated by ' +
            common.prog_name + ' (version ' + common.prog_version_str + ')\n',
            'Executable = "' + os.path.basename(script) + '";\n',
            self.sandboxLine_('InputSandbox', inp_files),
            'StdOutput = "' + std_out + '";\n',
            'StdError = "' + std_err + '";\n',
            self.sandboxLine_('OutputSandbox', out_files),
            self.requirementsLine_(),
            'VirtualOrganisation = "' + self.VO + '";\n',
        ]
        if self.EDG_retry_count:
            # str() so a numeric retry count does not break concatenation.
            content.append(
                'RetryCount = ' + str(self.EDG_retry_count) + ';\n')

        jdl = open(job.jdlFilename(), 'w')
        try:
            jdl.write(''.join(content))
        finally:
            jdl.close()
        return