4 |
|
from crab_util import * |
5 |
|
import common |
6 |
|
|
7 |
< |
import os, sys, tempfile |
7 |
> |
import os, sys, time |
8 |
|
|
9 |
|
class SchedulerEdg(Scheduler): |
10 |
|
def __init__(self): |
77 |
|
try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time'] |
78 |
|
except KeyError: self.EDG_cpu_time = '' |
79 |
|
|
80 |
< |
# # Add EDG_WL_LOCATION to the python path |
81 |
< |
# |
82 |
< |
# try: |
83 |
< |
# path = os.environ['EDG_WL_LOCATION'] |
84 |
< |
# except: |
85 |
< |
# msg = "Error: the EDG_WL_LOCATION variable is not set." |
86 |
< |
# raise CrabException(msg) |
87 |
< |
# |
88 |
< |
# libPath=os.path.join(path, "lib") |
89 |
< |
# sys.path.append(libPath) |
90 |
< |
# libPath=os.path.join(path, "lib", "python") |
91 |
< |
# sys.path.append(libPath) |
80 |
> |
# Add EDG_WL_LOCATION to the python path |
81 |
|
|
82 |
< |
self.checkProxy_() |
82 |
> |
try: |
83 |
> |
path = os.environ['EDG_WL_LOCATION'] |
84 |
> |
except: |
85 |
> |
msg = "Error: the EDG_WL_LOCATION variable is not set." |
86 |
> |
raise CrabException(msg) |
87 |
> |
|
88 |
> |
libPath=os.path.join(path, "lib") |
89 |
> |
sys.path.append(libPath) |
90 |
> |
libPath=os.path.join(path, "lib", "python") |
91 |
> |
sys.path.append(libPath) |
92 |
> |
|
93 |
> |
self.proxyValid=0 |
94 |
|
return |
95 |
|
|
96 |
|
|
115 |
|
""" |
116 |
|
|
117 |
|
txt = '' |
118 |
– |
### FEDE #### |
118 |
|
if self.copy_data: |
119 |
|
if self.SE: |
120 |
|
txt += 'export SE='+self.SE+'\n' |
121 |
+ |
txt += 'echo "SE = $SE"\n' |
122 |
|
if self.SE_PATH: |
123 |
|
if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/' |
124 |
|
txt += 'export SE_PATH='+self.SE_PATH+'\n' |
125 |
+ |
txt += 'echo "SE_PATH = $SE_PATH"\n' |
126 |
|
|
127 |
|
if self.register_data: |
128 |
|
if self.VO: |
130 |
|
if self.LFN: |
131 |
|
txt += 'export LFN='+self.LFN+'\n' |
132 |
|
txt += '\n' |
132 |
– |
######## |
133 |
|
txt += 'CloseCEs=`edg-brokerinfo getCE`\n' |
134 |
|
txt += 'echo "CloseCEs = $CloseCEs"\n' |
135 |
|
txt += 'CE=`echo $CloseCEs | sed -e "s/:.*//"`\n' |
136 |
|
txt += 'echo "CE = $CE"\n' |
137 |
|
return txt |
138 |
< |
#### FEDE |
138 |
> |
|
139 |
|
def wsCopyOutput(self): |
140 |
|
""" |
141 |
|
Write a CopyResults part of a job script, e.g. |
145 |
|
if self.copy_data: |
146 |
|
copy = 'globus-url-copy file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file' |
147 |
|
txt += '#\n' |
148 |
< |
txt += '# Copy output into user SE = $SE\n' |
148 |
> |
txt += '# Copy output to SE = $SE\n' |
149 |
|
txt += '#\n' |
150 |
< |
txt += 'copy_exit_status=1\n' |
151 |
< |
txt += 'if [ $executable_exit_status -eq 0 ]; then\n' |
150 |
> |
txt += 'if [ $exe_result -eq 0 ]; then\n' |
151 |
|
txt += ' for out_file in $file_list ; do\n' |
152 |
|
txt += ' echo "Trying to copy output file to $SE "\n' |
153 |
|
txt += ' echo "'+copy+'"\n' |
154 |
|
txt += ' '+copy+' 2>&1\n' |
155 |
|
txt += ' copy_exit_status=$?\n' |
156 |
< |
txt += ' echo "copy_exit_status= $copy_exit_status"\n' |
156 |
> |
txt += ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n' |
157 |
> |
txt += ' echo "STAGE_OUT = $copy_exit_status"\n' |
158 |
|
txt += ' if [ $copy_exit_status -ne 0 ]; then \n' |
159 |
|
txt += ' echo "Problems with SE= $SE" \n' |
160 |
– |
txt += ' echo "output lost!"\n' |
160 |
|
txt += ' else \n' |
161 |
|
txt += ' echo "output copied into $SE/$SE_PATH directory"\n' |
162 |
|
txt += ' fi \n' |
163 |
|
txt += ' done\n' |
164 |
|
txt += 'fi \n' |
166 |
– |
txt += 'echo "COPY_EXIT_STATUS=$copy_exit_status"\n' |
165 |
|
return txt |
166 |
|
|
167 |
|
def wsRegisterOutput(self): |
172 |
|
txt = '' |
173 |
|
if self.register_data: |
174 |
|
txt += '#\n' |
175 |
< |
txt += '# Register output into RLS\n' |
175 |
> |
txt += '# Register output to RLS\n' |
176 |
|
txt += '#\n' |
177 |
< |
txt += 'register_exit_status=1\n' |
180 |
< |
txt += 'if [[ $executable_exit_status -eq 0 && $copy_exit_status -eq 0 ]]; then\n' |
177 |
> |
txt += 'if [[ $exe_result -eq 0 && $copy_exit_status -eq 0 ]]; then\n' |
178 |
|
txt += ' for out_file in $file_list ; do\n' |
179 |
|
txt += ' echo "Trying to register the output file into RLS"\n' |
180 |
|
txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file"\n' |
181 |
|
txt += ' lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file 2>&1 \n' |
182 |
|
txt += ' register_exit_status=$?\n' |
183 |
< |
txt += ' echo "register_exit_status= $register_exit_status"\n' |
183 |
> |
txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n' |
184 |
> |
txt += ' echo "STAGE_OUT = $register_exit_status"\n' |
185 |
|
txt += ' if [ $register_exit_status -ne 0 ]; then \n' |
186 |
< |
txt += ' echo "Problems with the registration into RLS" \n' |
186 |
> |
txt += ' echo "Problems with the registration to RLS" \n' |
187 |
|
txt += ' echo "Try with srm protocol" \n' |
188 |
|
txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file"\n' |
189 |
|
txt += ' lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file 2>&1 \n' |
190 |
|
txt += ' register_exit_status=$?\n' |
191 |
< |
txt += ' echo "register_exit_status= $register_exit_status"\n' |
191 |
> |
txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n' |
192 |
> |
txt += ' echo "STAGE_OUT = $register_exit_status"\n' |
193 |
|
txt += ' if [ $register_exit_status -ne 0 ]; then \n' |
194 |
|
txt += ' echo "Problems with the registration into RLS" \n' |
195 |
|
txt += ' fi \n' |
196 |
|
txt += ' else \n' |
197 |
< |
txt += ' echo "output registered into RLS"\n' |
197 |
> |
txt += ' echo "output registered to RLS"\n' |
198 |
|
txt += ' fi \n' |
199 |
|
txt += ' done\n' |
200 |
< |
txt += 'elif [[ $executable_exit_status -eq 0 && $copy_exit_status -ne 0 ]]; then \n' |
200 |
> |
txt += 'elif [[ $exe_result -eq 0 && $copy_exit_status -ne 0 ]]; then \n' |
201 |
|
txt += ' echo "Trying to copy output file to CloseSE"\n' |
202 |
|
txt += ' CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n' |
203 |
|
txt += ' for out_file in $file_list ; do\n' |
204 |
< |
txt += ' echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n' |
205 |
< |
txt += ' lcg-cr -v -l lfn:${LFN}/$out_file -d $SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n' |
204 |
> |
txt += ' echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n' |
205 |
> |
txt += ' lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n' |
206 |
|
txt += ' register_exit_status=$?\n' |
207 |
< |
txt += ' echo "register_exit_status= $register_exit_status"\n' |
207 |
> |
txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n' |
208 |
> |
txt += ' echo "STAGE_OUT = $register_exit_status"\n' |
209 |
|
txt += ' if [ $register_exit_status -ne 0 ]; then \n' |
210 |
|
txt += ' echo "Problems with CloseSE" \n' |
211 |
|
txt += ' else \n' |
212 |
|
txt += ' echo "The program was successfully executed"\n' |
213 |
< |
txt += ' echo "Output storage element used: $CLOSE_SE"\n' |
214 |
< |
txt += ' echo "the LFN for the file is LFN=${LFN}/$out_file"\n' |
213 |
> |
txt += ' echo "SE = $CLOSE_SE"\n' |
214 |
> |
txt += ' echo "LFN for the file is LFN=${LFN}/$out_file"\n' |
215 |
|
txt += ' fi \n' |
216 |
|
txt += ' done\n' |
217 |
|
txt += 'else\n' |
218 |
< |
txt += ' echo "Problem with the executable"\n' |
218 |
> |
txt += ' echo "Problem with the executable"\n' |
219 |
|
txt += 'fi \n' |
220 |
– |
txt += 'echo "REGISTER_EXIT_STATUS=$register_exit_status"\n' |
220 |
|
return txt |
222 |
– |
##################### |
221 |
|
|
222 |
< |
def loggingInfo(self, nj): |
222 |
> |
def loggingInfo(self, id):
    """
    Ask the logging and bookkeeping service about the job ``id``.

    The proxy is checked first, then 'edg-job-get-logging-info' is
    started through os.popen.  The object returned by os.popen is handed
    back as-is: it is neither read nor closed here, so that is left to
    the caller.
    """
    self.checkProxy()
    command = ''.join(('edg-job-get-logging-info -v 2 ', id))
    return os.popen(command)
232 |
|
|
233 |
|
def listMatch(self, nj):
    """
    Check the compatibility of available resources

    Runs 'edg-job-list-match' on the JDL file of job number ``nj``
    (looked up in common.job_list) and returns whatever
    self.parseListMatch_ makes of the command output lines.
    """
    self.checkProxy()
    jdl = common.job_list[nj].jdlFilename()
    cmd = 'edg-job-list-match ' + self.configOpt_() + jdl
    myCmd = os.popen(cmd)
    try:
        cmd_out = myCmd.readlines()
    finally:
        # always release the pipe, even if reading the output fails
        myCmd.close()
    return self.parseListMatch_(cmd_out, jdl)
244 |
|
|
245 |
|
def parseListMatch_(self, out, jdl): |
246 |
+ |
|
247 |
|
reComment = re.compile( r'^\**$' ) |
248 |
|
reEmptyLine = re.compile( r'^$' ) |
249 |
|
reVO = re.compile( r'Selected Virtual Organisation name.*' ) |
253 |
|
next = 0 |
254 |
|
CEs=[] |
255 |
|
Match=0 |
256 |
+ |
|
257 |
|
for line in out: |
258 |
|
line = line.strip() |
259 |
|
if reComment.match( line ): |
274 |
|
continue |
275 |
|
if next: |
276 |
|
CE=line.split(':')[0] |
277 |
< |
CEs.append(CE) |
277 |
> |
if (CEs.count(CE) > 0): |
278 |
> |
pass |
279 |
> |
else: |
280 |
> |
CEs.append(CE) |
281 |
> |
Match=Match+1 |
282 |
|
common.logger.debug(5, 'Matched CE :'+CE) |
287 |
– |
Match=Match+1 |
283 |
|
pass |
284 |
|
if reNO.match( line ): |
285 |
|
common.logger.debug(5,line) |
312 |
|
Submit one EDG job. |
313 |
|
""" |
314 |
|
|
315 |
+ |
self.checkProxy() |
316 |
|
jid = None |
317 |
|
jdl = common.job_list[nj].jdlFilename() |
318 |
< |
id_tmp = tempfile.mktemp() |
319 |
< |
edg_ui_cfg_opt = ' ' |
324 |
< |
if self.edg_config: |
325 |
< |
edg_ui_cfg_opt = ' -c ' + self.edg_config + ' ' |
326 |
< |
if self.edg_config_vo: |
327 |
< |
edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' ' |
328 |
< |
cmd = 'edg-job-submit -o ' + id_tmp + edg_ui_cfg_opt + jdl |
318 |
> |
|
319 |
> |
cmd = 'edg-job-submit ' + self.configOpt_() + jdl |
320 |
|
cmd_out = runCommand(cmd) |
321 |
|
if cmd_out != None: |
322 |
< |
idfile = open(id_tmp) |
323 |
< |
jid_line = idfile.readline() |
333 |
< |
while jid_line[0] == '#': |
334 |
< |
jid_line = idfile.readline() |
335 |
< |
pass |
336 |
< |
jid = string.strip(jid_line) |
337 |
< |
os.unlink(id_tmp) |
322 |
> |
reSid = re.compile( r'https.+' ) |
323 |
> |
jid = reSid.search(cmd_out).group() |
324 |
|
pass |
325 |
|
return jid |
326 |
|
|
337 |
|
def getStatusAttribute_(self, id, attr): |
338 |
|
""" Query a status of the job with id """ |
339 |
|
|
340 |
+ |
self.checkProxy() |
341 |
|
hstates = {} |
342 |
|
Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status') |
343 |
|
# Bypass edg-job-status interfacing directly to C++ API |
349 |
|
jobStat.getStatus(id, level) |
350 |
|
err, apiMsg = jobStat.get_error() |
351 |
|
if err: |
352 |
< |
print 'Error caught', apiMsg |
353 |
< |
common.log.message(apiMsg) |
352 |
> |
#print 'Error caught', apiMsg |
353 |
> |
#common.log.message(apiMsg) |
354 |
> |
common.logger.debug(5,'Error caught' + apiMsg) |
355 |
|
return None |
356 |
|
else: |
357 |
|
for i in range(len(self.states)): |
372 |
|
Returns the name of directory with results. |
373 |
|
""" |
374 |
|
|
375 |
+ |
self.checkProxy() |
376 |
|
cmd = 'edg-job-get-output --dir ' + common.work_space.resDir() + ' ' + id |
377 |
|
cmd_out = runCommand(cmd) |
378 |
|
|
384 |
|
|
385 |
|
def cancel(self, id):
    """
    Cancel the EDG job with id.

    Checks the proxy, runs 'edg-job-cancel --noint' on the job and
    returns the value produced by runCommand unchanged.
    """
    self.checkProxy()
    return runCommand('edg-job-cancel --noint ' + id)
391 |
|
|
402 |
– |
def checkProxy_(self): |
403 |
– |
""" |
404 |
– |
Function to check the Globus proxy. |
405 |
– |
""" |
406 |
– |
cmd = 'grid-proxy-info -timeleft' |
407 |
– |
cmd_out = runCommand(cmd) |
408 |
– |
ok = 1 |
409 |
– |
timeleft = -999 |
410 |
– |
try: timeleft = int(cmd_out) |
411 |
– |
except ValueError: ok=0 |
412 |
– |
except TypeError: ok=0 |
413 |
– |
if timeleft < 1: ok=0 |
414 |
– |
|
415 |
– |
if ok==0: |
416 |
– |
print "No valid proxy found !\n" |
417 |
– |
print "Creating a user proxy with default length of 100h\n" |
418 |
– |
msg = "Unable to create a valid proxy!\n" |
419 |
– |
if os.system("grid-proxy-init -valid 100:00"): |
420 |
– |
raise CrabException(msg) |
421 |
– |
return |
422 |
– |
|
392 |
|
def createSchScript(self, nj): |
393 |
|
""" |
394 |
|
Create a JDL-file for EDG. |
501 |
|
|
502 |
|
jdl.close() |
503 |
|
return |
504 |
+ |
|
505 |
+ |
def checkProxy(self):
    """
    Function to check the Globus proxy.

    The check is done at most once per instance: after it succeeds
    self.proxyValid is set to 1 and later calls return immediately.

    'grid-proxy-info' is asked whether a proxy valid for at least
    minTimeLeft hours exists.  If the command gives no result (None),
    answers '1', or fails with an exception, a new proxy of 100 hours
    is requested with 'grid-proxy-init'.

    Raises CrabException when 'grid-proxy-init' cannot be run or ends
    with a non-zero status.
    """
    if self.proxyValid:
        return

    minTimeLeft = 10  # in hours
    cmd = 'grid-proxy-info -e -v ' + str(minTimeLeft) + ':00'

    # Start from "no answer" so that a failing runCommand is treated like
    # a missing proxy instead of leaving cmd_out undefined.
    cmd_out = None
    try:
        cmd_out = runCommand(cmd, 0)
    except Exception:
        common.logger.debug(5, 'Proxy check command failed: ' + cmd)

    if cmd_out is None or cmd_out == '1':
        common.logger.message( "No valid proxy found or timeleft too short!\n Creating a user proxy with default length of 100h\n")
        cmd = 'grid-proxy-init -valid 100:00'
        msg = "Unable to create a valid proxy!\n"
        try:
            status = os.system(cmd)
        except Exception:
            raise CrabException(msg)
        # any non-zero status (including a negative one) means failure
        if status != 0:
            raise CrabException(msg)
        # The time left is queried as before; the result is not used yet.
        cmd = 'grid-proxy-info -timeleft'
        cmd_out = runCommand(cmd, 0)

    self.proxyValid = 1
    return
531 |
+ |
|
532 |
+ |
def configOpt_(self):
    """
    Build the configuration options passed to the edg-job-* commands.

    Returns a single blank when self.edg_config is empty, otherwise
    ' -c <edg_config> '; when self.edg_config_vo is set,
    ' --config-vo <edg_config_vo> ' is appended to that.
    """
    pieces = [' ']
    if self.edg_config:
        pieces = [' -c ', self.edg_config, ' ']
    if self.edg_config_vo:
        pieces.extend([' --config-vo ', self.edg_config_vo, ' '])
    return ''.join(pieces)