ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.5
Committed: Thu Jun 23 16:53:25 2005 UTC (19 years, 10 months ago) by nsmirnov
Content type: text/x-python
Branch: MAIN
Changes since 1.4: +16 -25 lines
Log Message:
Commands '-list', '-status', '-kill', '-retrieve' implemented

File Contents

# User Rev Content
1 nsmirnov 1.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6    
7     import os, sys, tempfile
8    
class SchedulerEdg(Scheduler):
    """
    Scheduler implementation built on the EDG command-line tools
    (edg-job-submit, edg-job-status, edg-job-get-output, edg-job-cancel).
    """

    def __init__(self):
        Scheduler.__init__(self, "EDG")
        return

    def configure(self, cfg_params):
        """
        Read the EDG.* options from cfg_params, add the EDG_WL_LOCATION
        library directories to sys.path and check the Globus proxy.

        Raises CrabException if EDG.virtual_organization is missing,
        if EDG_WL_LOCATION is not set, or if no valid proxy is found.
        """

        def option(key, default=''):
            # cfg_params is only assumed to support [] and raise KeyError.
            try:
                return cfg_params[key]
            except KeyError:
                return default

        self.edg_ui_cfg = option("EDG.rb_config")
        self.edg_config = option("EDG.config")
        self.edg_config_vo = option("EDG.config_vo")
        self.LCG_version = option("EDG.lcg_version", '2')
        self.EDG_requirements = option('EDG.requirements')
        self.EDG_retry_count = option('EDG.retry_count')

        try:
            self.VO = cfg_params['EDG.virtual_organization']
        except KeyError:
            msg = 'EDG.virtual_organization is mandatory.'
            raise CrabException(msg)

        # Add EDG_WL_LOCATION to the python path

        try:
            path = os.environ['EDG_WL_LOCATION']
        except KeyError:
            msg = "Error: the EDG_WL_LOCATION variable is not set."
            raise CrabException(msg)

        for libPath in (os.path.join(path, "lib"),
                        os.path.join(path, "lib", "python")):
            # Do not grow sys.path if configure() is called more than once.
            if libPath not in sys.path:
                sys.path.append(libPath)
                pass
            pass

        self.checkProxy_()
        return

    def wsSetupEnvironment(self):
        """
        Returns part of a job script which does scheduler-specific work.
        """
        lines = [
            '',
            'CloseCEs=`edg-brokerinfo getCE`',
            'echo "CloseCEs = $CloseCEs"',
            'CE=`echo $CloseCEs | sed -e "s/:.*//"`',
            'echo "CE = $CE"',
            ]
        return '\n'.join(lines) + '\n'

    def submit(self, nj):
        """
        Submit one EDG job.
        Returns the job id read from the file written by edg-job-submit,
        or None if the command gave no output or no id was found.
        """

        jdl = common.job_list[nj].jdlFilename()
        # NOTE(review): tempfile.mktemp() is race-prone. It is kept because
        # the file is created by edg-job-submit itself; confirm that the
        # tool accepts a pre-existing file before switching to mkstemp().
        id_tmp = tempfile.mktemp()
        edg_ui_cfg_opt = ' '
        if self.edg_config:
            edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
            pass
        if self.edg_config_vo:
            edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
            pass
        cmd = 'edg-job-submit -o ' + id_tmp + edg_ui_cfg_opt + jdl
        cmd_out = runCommand(cmd)
        if cmd_out is None:
            return None

        jid = None
        idfile = open(id_tmp)
        try:
            # The id is the first line that is neither blank nor a comment.
            # Iterating stops cleanly at EOF, so a file without any id
            # yields None instead of an IndexError.
            for line in idfile:
                line = line.strip()
                if line and not line.startswith('#'):
                    jid = line
                    break
                pass
        finally:
            idfile.close()
            os.unlink(id_tmp)
        return jid

    def queryStatus(self, id):
        """
        Query a status of the job with id.
        Returns the text following 'Current Status:' in the output of
        edg-job-status, or None if there is no output or no such line.
        """
        cmd0 = 'edg-job-status '
        cmd = cmd0 + id
        cmd_out = runCommand(cmd)
        if cmd_out is None:
            common.logger.message('Error. No output from `'+cmd+'`')
            return None
        # parse output
        status_prefix = 'Current Status:'
        status_index = cmd_out.find(status_prefix)
        if status_index == -1:
            common.logger.message('Error. Bad output of `'+cmd0+'`:\n'+cmd_out)
            return None
        rest = cmd_out[(status_index+len(status_prefix)):]
        # Take everything up to the end of the line; split() also copes
        # with a status line that is not terminated by a newline.
        return rest.split('\n', 1)[0].strip()

    def queryDetailedStatus(self, id):
        """ Query a detailed status of the job with id """
        cmd = 'edg-job-status '+id
        cmd_out = runCommand(cmd)
        return cmd_out

    def getOutput(self, id):
        """
        Get output for a finished job with id.
        Returns the name of directory with results.
        """
        res_dir = common.work_space.resDir()
        cmd = 'edg-job-get-output --dir ' + res_dir + ' ' + id
        runCommand(cmd)

        # Determine the output directory name: <resDir>/<login>_<id basename>
        try:
            login = os.getlogin()
        except OSError:
            # os.getlogin() needs a controlling terminal; fall back to the
            # password database when running detached (batch, cron, ...).
            import pwd
            login = pwd.getpwuid(os.getuid())[0]
        # os.path.join works whether or not resDir() ends with a slash.
        return os.path.join(res_dir, login + '_' + os.path.basename(id))

    def cancel(self, id):
        """ Cancel the EDG job with id """
        cmd = 'edg-job-cancel --noint ' + id
        cmd_out = runCommand(cmd)
        return cmd_out

    def checkProxy_(self):
        """
        Function to check the Globus proxy.
        Raises CrabException unless 'grid-proxy-info -timeleft' reports
        an integer time left of at least 1.
        """
        cmd = 'grid-proxy-info -timeleft'
        cmd_out = runCommand(cmd)
        try:
            timeleft = int(cmd_out)
        except (ValueError, TypeError):
            # No output (None) or non-numeric output: treat as no proxy.
            timeleft = -999

        if timeleft < 1:
            msg = 'No valid proxy found !\n'
            msg += "Please do 'grid-proxy-init'."
            raise CrabException(msg)
        return

    def jdlList_(self, name, items):
        """
        Format a JDL list attribute: name = { "a", "b" };
        """
        quoted = ['"' + item + '"' for item in items]
        return name + ' = { ' + ', '.join(quoted) + ' };\n'

    def createJDL(self, nj):
        """
        Create a JDL-file for EDG.
        The whole text is assembled first and written in one go, so a
        failure while collecting job data leaves no partial file behind.
        """

        job = common.job_list[nj]
        jbt = job.type()
        inp_sandbox = jbt.inputSandbox(nj)
        out_sandbox = jbt.outputSandbox(nj)

        script = job.scriptFilename()
        stdout = job.stdout()
        stderr = job.stderr()

        # The job script always travels in the input sandbox.
        inp_files = [script]
        if inp_sandbox is not None:
            inp_files.extend(inp_sandbox)
            pass

        ### SL check if stdout==stderr: in case put just one in the out_box
        out_files = [stdout]
        if stderr != stdout:
            out_files.append(stderr)
            pass
        out_files.append('.BrokerInfo')
        if out_sandbox is not None:
            out_files.extend(out_sandbox)
            pass

        lines = [
            '# This JDL was generated by ' +
            common.prog_name+' (version '+common.prog_version_str+')\n',
            'Executable = "' + os.path.basename(script) + '";\n',
            self.jdlList_('InputSandbox', inp_files),
            'StdOutput = "' + stdout + '";\n',
            'StdError = "' + stderr + '";\n',
            self.jdlList_('OutputSandbox', out_files),
            ]

        # Requirements are only emitted when both the site list and the
        # software version are known.
        info = common.analisys_common_info
        if info['sites'] and info['sw_version']:
            ### First ORCA version
            req = 'Requirements = Member("VO-cms-' + \
                  info['sw_version'] + \
                  '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
            ## then sites
            site_terms = ['other.GlueCEInfoHostName == "' + site + '"'
                          for site in info['sites']]
            req = req + ' && (' + ' || '.join(site_terms) + ')'
            ## then user requirement
            if self.EDG_requirements:
                # Parenthesised so that a user expression containing '||'
                # cannot bypass the software and site constraints.
                req = req + ' && (' + self.EDG_requirements + ')'
                pass
            lines.append(req + ';\n')
            pass

        lines.append('VirtualOrganisation = "' + self.VO + '";\n')

        if self.EDG_retry_count:
            # str() tolerates a retry count that was configured as a number.
            lines.append('RetryCount = '+str(self.EDG_retry_count)+';\n')
            pass

        jdl = open(job.jdlFilename(), 'w')
        try:
            jdl.write(''.join(lines))
        finally:
            jdl.close()
        return