4 |
|
from crab_util import * |
5 |
|
import common |
6 |
|
|
7 |
< |
import os, sys, tempfile |
7 |
> |
import os, sys, time |
8 |
|
|
9 |
|
class SchedulerEdg(Scheduler): |
10 |
|
def __init__(self): |
77 |
|
try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time'] |
78 |
|
except KeyError: self.EDG_cpu_time = '' |
79 |
|
|
80 |
< |
# # Add EDG_WL_LOCATION to the python path |
81 |
< |
# |
82 |
< |
# try: |
83 |
< |
# path = os.environ['EDG_WL_LOCATION'] |
84 |
< |
# except: |
85 |
< |
# msg = "Error: the EDG_WL_LOCATION variable is not set." |
86 |
< |
# raise CrabException(msg) |
87 |
< |
# |
88 |
< |
# libPath=os.path.join(path, "lib") |
89 |
< |
# sys.path.append(libPath) |
90 |
< |
# libPath=os.path.join(path, "lib", "python") |
91 |
< |
# sys.path.append(libPath) |
80 |
> |
# Add EDG_WL_LOCATION to the python path |
81 |
|
|
82 |
< |
self.checkProxy_() |
82 |
> |
try: |
83 |
> |
path = os.environ['EDG_WL_LOCATION'] |
84 |
> |
except: |
85 |
> |
msg = "Error: the EDG_WL_LOCATION variable is not set." |
86 |
> |
raise CrabException(msg) |
87 |
> |
|
88 |
> |
libPath=os.path.join(path, "lib") |
89 |
> |
sys.path.append(libPath) |
90 |
> |
libPath=os.path.join(path, "lib", "python") |
91 |
> |
sys.path.append(libPath) |
92 |
> |
|
93 |
> |
self.proxyValid=0 |
94 |
|
return |
95 |
|
|
96 |
|
|
147 |
|
txt += '#\n' |
148 |
|
txt += '# Copy output to SE = $SE\n' |
149 |
|
txt += '#\n' |
150 |
< |
txt += 'if [ $executable_exit_status -eq 0 ]; then\n' |
150 |
> |
#### per orca l'exit_status non e' affidabile..... |
151 |
> |
#txt += 'if [ $executable_exit_status -eq 0 ]; then\n' |
152 |
> |
txt += 'if [ $exe_result -eq 0 ]; then\n' |
153 |
|
txt += ' for out_file in $file_list ; do\n' |
154 |
|
txt += ' echo "Trying to copy output file to $SE "\n' |
155 |
|
txt += ' echo "'+copy+'"\n' |
176 |
|
txt += '#\n' |
177 |
|
txt += '# Register output to RLS\n' |
178 |
|
txt += '#\n' |
179 |
< |
txt += 'if [[ $executable_exit_status -eq 0 && $copy_exit_status -eq 0 ]]; then\n' |
179 |
> |
### analogo |
180 |
> |
#txt += 'if [[ $executable_exit_status -eq 0 && $copy_exit_status -eq 0 ]]; then\n' |
181 |
> |
txt += 'if [[ $exe_result -eq 0 && $copy_exit_status -eq 0 ]]; then\n' |
182 |
|
txt += ' for out_file in $file_list ; do\n' |
183 |
|
txt += ' echo "Trying to register the output file into RLS"\n' |
184 |
|
txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file"\n' |
201 |
|
txt += ' echo "output registered to RLS"\n' |
202 |
|
txt += ' fi \n' |
203 |
|
txt += ' done\n' |
204 |
< |
txt += 'elif [[ $executable_exit_status -eq 0 && $copy_exit_status -ne 0 ]]; then \n' |
204 |
> |
txt += 'elif [[ $exe_result -eq 0 && $copy_exit_status -ne 0 ]]; then \n' |
205 |
|
txt += ' echo "Trying to copy output file to CloseSE"\n' |
206 |
|
txt += ' CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n' |
207 |
|
txt += ' for out_file in $file_list ; do\n' |
228 |
|
""" |
229 |
|
retrieve the logging info from logging and bookkeeping and return it |
230 |
|
""" |
231 |
+ |
self.checkProxy() |
232 |
|
id = common.jobDB.jobId(nj) |
233 |
< |
edg_ui_cfg_opt = '' |
229 |
< |
if self.edg_config: |
230 |
< |
edg_ui_cfg_opt = ' -c ' + self.edg_config + ' ' |
231 |
< |
cmd = 'edg-job-get-logging-info -v 2 ' + edg_ui_cfg_opt + id |
232 |
< |
print cmd |
233 |
> |
cmd = 'edg-job-get-logging-info -v 2 ' + self.configOpt_() + id |
234 |
|
myCmd = os.popen(cmd) |
235 |
|
cmd_out = myCmd.readlines() |
236 |
|
myCmd.close() |
240 |
|
""" |
241 |
|
Check the compatibility of available resources |
242 |
|
""" |
243 |
+ |
self.checkProxy() |
244 |
|
jdl = common.job_list[nj].jdlFilename() |
245 |
< |
edg_ui_cfg_opt = '' |
244 |
< |
if self.edg_config: |
245 |
< |
edg_ui_cfg_opt = ' -c ' + self.edg_config + ' ' |
246 |
< |
if self.edg_config_vo: |
247 |
< |
edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' ' |
248 |
< |
cmd = 'edg-job-list-match ' + edg_ui_cfg_opt + jdl |
245 |
> |
cmd = 'edg-job-list-match ' + self.configOpt_() + jdl |
246 |
|
myCmd = os.popen(cmd) |
247 |
|
cmd_out = myCmd.readlines() |
248 |
|
myCmd.close() |
313 |
|
Submit one EDG job. |
314 |
|
""" |
315 |
|
|
316 |
+ |
self.checkProxy() |
317 |
|
jid = None |
318 |
|
jdl = common.job_list[nj].jdlFilename() |
319 |
< |
id_tmp = tempfile.mktemp() |
320 |
< |
edg_ui_cfg_opt = ' ' |
323 |
< |
if self.edg_config: |
324 |
< |
edg_ui_cfg_opt = ' -c ' + self.edg_config + ' ' |
325 |
< |
if self.edg_config_vo: |
326 |
< |
edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' ' |
327 |
< |
cmd = 'edg-job-submit -o ' + id_tmp + edg_ui_cfg_opt + jdl |
319 |
> |
|
320 |
> |
cmd = 'edg-job-submit ' + self.configOpt_() + jdl |
321 |
|
cmd_out = runCommand(cmd) |
322 |
|
if cmd_out != None: |
323 |
< |
idfile = open(id_tmp) |
324 |
< |
jid_line = idfile.readline() |
332 |
< |
while jid_line[0] == '#': |
333 |
< |
jid_line = idfile.readline() |
334 |
< |
pass |
335 |
< |
jid = string.strip(jid_line) |
336 |
< |
os.unlink(id_tmp) |
323 |
> |
reSid = re.compile( r'https.+' ) |
324 |
> |
jid = reSid.search(cmd).group() |
325 |
|
pass |
326 |
|
return jid |
327 |
|
|
338 |
|
def getStatusAttribute_(self, id, attr): |
339 |
|
""" Query a status of the job with id """ |
340 |
|
|
341 |
+ |
self.checkProxy() |
342 |
|
hstates = {} |
343 |
|
Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status') |
344 |
|
# Bypass edg-job-status interfacing directly to C++ API |
372 |
|
Returns the name of directory with results. |
373 |
|
""" |
374 |
|
|
375 |
+ |
self.checkProxy() |
376 |
|
cmd = 'edg-job-get-output --dir ' + common.work_space.resDir() + ' ' + id |
377 |
|
cmd_out = runCommand(cmd) |
378 |
|
|
384 |
|
|
385 |
|
def cancel(self, id): |
386 |
|
""" Cancel the EDG job with id """ |
387 |
+ |
self.checkProxy() |
388 |
|
cmd = 'edg-job-cancel --noint ' + id |
389 |
|
cmd_out = runCommand(cmd) |
390 |
|
return cmd_out |
391 |
|
|
401 |
– |
def checkProxy_(self): |
402 |
– |
""" |
403 |
– |
Function to check the Globus proxy. |
404 |
– |
""" |
405 |
– |
cmd = 'grid-proxy-info -timeleft' |
406 |
– |
cmd_out = runCommand(cmd) |
407 |
– |
ok = 1 |
408 |
– |
timeleft = -999 |
409 |
– |
try: timeleft = int(cmd_out) |
410 |
– |
except ValueError: ok=0 |
411 |
– |
except TypeError: ok=0 |
412 |
– |
if timeleft < 1: ok=0 |
413 |
– |
|
414 |
– |
if ok==0: |
415 |
– |
print "No valid proxy found !\n" |
416 |
– |
print "Creating a user proxy with default length of 100h\n" |
417 |
– |
msg = "Unable to create a valid proxy!\n" |
418 |
– |
if os.system("grid-proxy-init -valid 100:00"): |
419 |
– |
raise CrabException(msg) |
420 |
– |
return |
421 |
– |
|
392 |
|
def createSchScript(self, nj): |
393 |
|
""" |
394 |
|
Create a JDL-file for EDG. |
501 |
|
|
502 |
|
jdl.close() |
503 |
|
return |
504 |
+ |
|
505 |
+ |
def checkProxy(self): |
506 |
+ |
""" |
507 |
+ |
Function to check the Globus proxy. |
508 |
+ |
""" |
509 |
+ |
if (self.proxyValid): return |
510 |
+ |
timeleft = -999 |
511 |
+ |
minTimeLeft=10 # in hours |
512 |
+ |
cmd = 'grid-proxy-info -e -v '+str(minTimeLeft)+':00' |
513 |
+ |
try: cmd_out = runCommand(cmd,0) |
514 |
+ |
except: print cmd_out |
515 |
+ |
if (cmd_out == None or cmd_out=='1'): |
516 |
+ |
common.logger.message( "No valid proxy found or timeleft too short!\n Creating a user proxy with default length of 100h\n") |
517 |
+ |
cmd = 'grid-proxy-init -valid 100:00' |
518 |
+ |
try: |
519 |
+ |
out = os.system(cmd) |
520 |
+ |
if (out>0): raise CrabException("Unable to create a valid proxy!\n") |
521 |
+ |
except: |
522 |
+ |
msg = "Unable to create a valid proxy!\n" |
523 |
+ |
raise CrabException(msg) |
524 |
+ |
cmd = 'grid-proxy-info -timeleft' |
525 |
+ |
cmd_out = runCommand(cmd,0) |
526 |
+ |
print cmd_out, time.time() |
527 |
+ |
#time.time(cms_out) |
528 |
+ |
pass |
529 |
+ |
self.proxyValid=1 |
530 |
+ |
return |
531 |
+ |
|
532 |
+ |
def configOpt_(self): |
533 |
+ |
edg_ui_cfg_opt = ' ' |
534 |
+ |
if self.edg_config: |
535 |
+ |
edg_ui_cfg_opt = ' -c ' + self.edg_config + ' ' |
536 |
+ |
if self.edg_config_vo: |
537 |
+ |
edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' ' |
538 |
+ |
return edg_ui_cfg_opt |