4 |
|
from crab_util import * |
5 |
|
import common |
6 |
|
|
7 |
< |
import os, sys, tempfile |
7 |
> |
import os, sys, time |
8 |
|
|
9 |
|
class SchedulerEdg(Scheduler): |
10 |
|
def __init__(self): |
90 |
|
libPath=os.path.join(path, "lib", "python") |
91 |
|
sys.path.append(libPath) |
92 |
|
|
93 |
< |
self.checkProxy_() |
93 |
> |
self.proxyValid=0 |
94 |
|
return |
95 |
|
|
96 |
|
|
147 |
|
txt += '#\n' |
148 |
|
txt += '# Copy output to SE = $SE\n' |
149 |
|
txt += '#\n' |
150 |
< |
txt += 'if [ $executable_exit_status -eq 0 ]; then\n' |
150 |
> |
#### per orca l'exit_status non e' affidabile..... |
151 |
> |
#txt += 'if [ $executable_exit_status -eq 0 ]; then\n' |
152 |
> |
txt += 'if [ $exe_result -eq 0 ]; then\n' |
153 |
|
txt += ' for out_file in $file_list ; do\n' |
154 |
|
txt += ' echo "Trying to copy output file to $SE "\n' |
155 |
|
txt += ' echo "'+copy+'"\n' |
176 |
|
txt += '#\n' |
177 |
|
txt += '# Register output to RLS\n' |
178 |
|
txt += '#\n' |
179 |
< |
txt += 'if [[ $executable_exit_status -eq 0 && $copy_exit_status -eq 0 ]]; then\n' |
179 |
> |
### analogo |
180 |
> |
#txt += 'if [[ $executable_exit_status -eq 0 && $copy_exit_status -eq 0 ]]; then\n' |
181 |
> |
txt += 'if [[ $exe_result -eq 0 && $copy_exit_status -eq 0 ]]; then\n' |
182 |
|
txt += ' for out_file in $file_list ; do\n' |
183 |
|
txt += ' echo "Trying to register the output file into RLS"\n' |
184 |
|
txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file"\n' |
201 |
|
txt += ' echo "output registered to RLS"\n' |
202 |
|
txt += ' fi \n' |
203 |
|
txt += ' done\n' |
204 |
< |
txt += 'elif [[ $executable_exit_status -eq 0 && $copy_exit_status -ne 0 ]]; then \n' |
204 |
> |
txt += 'elif [[ $exe_result -eq 0 && $copy_exit_status -ne 0 ]]; then \n' |
205 |
|
txt += ' echo "Trying to copy output file to CloseSE"\n' |
206 |
|
txt += ' CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n' |
207 |
|
txt += ' for out_file in $file_list ; do\n' |
224 |
|
return txt |
225 |
|
##################### |
226 |
|
|
227 |
< |
def loggingInfo(self, nj): |
227 |
> |
def loggingInfo(self, id): |
228 |
|
""" |
229 |
|
retrieve the logging info from logging and bookkeeping and return it |
230 |
|
""" |
231 |
< |
id = common.jobDB.jobId(nj) |
232 |
< |
edg_ui_cfg_opt = '' |
233 |
< |
if self.edg_config: |
234 |
< |
edg_ui_cfg_opt = ' -c ' + self.edg_config + ' ' |
231 |
< |
cmd = 'edg-job-get-logging-info -v 2 ' + edg_ui_cfg_opt + id |
232 |
< |
print cmd |
233 |
< |
myCmd = os.popen(cmd) |
234 |
< |
cmd_out = myCmd.readlines() |
235 |
< |
myCmd.close() |
231 |
> |
self.checkProxy() |
232 |
> |
# id = common.jobDB.jobId(nj) |
233 |
> |
cmd = 'edg-job-get-logging-info -v 2 ' + self.configOpt_() + id |
234 |
> |
cmd_out = runCommand(cmd) |
235 |
|
return cmd_out |
236 |
|
|
237 |
|
def listMatch(self, nj): |
238 |
|
""" |
239 |
|
Check the compatibility of available resources |
240 |
|
""" |
241 |
+ |
self.checkProxy() |
242 |
|
jdl = common.job_list[nj].jdlFilename() |
243 |
< |
edg_ui_cfg_opt = '' |
244 |
< |
if self.edg_config: |
245 |
< |
edg_ui_cfg_opt = ' -c ' + self.edg_config + ' ' |
246 |
< |
if self.edg_config_vo: |
247 |
< |
edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' ' |
248 |
< |
cmd = 'edg-job-list-match ' + edg_ui_cfg_opt + jdl |
249 |
< |
myCmd = os.popen(cmd) |
250 |
< |
cmd_out = myCmd.readlines() |
251 |
< |
myCmd.close() |
243 |
> |
cmd = 'edg-job-list-match ' + self.configOpt_() + jdl |
244 |
> |
cmd_out = runCommand(cmd) |
245 |
|
return self.parseListMatch_(cmd_out, jdl) |
246 |
|
|
247 |
|
def parseListMatch_(self, out, jdl): |
248 |
|
reComment = re.compile( r'^\**$' ) |
249 |
|
reEmptyLine = re.compile( r'^$' ) |
250 |
|
reVO = re.compile( r'Selected Virtual Organisation name.*' ) |
251 |
< |
reCE = re.compile( r'CEId' ) |
251 |
> |
reCE = re.compile( r'CEId.*\n((.*:.*)\n)*' ) |
252 |
|
reNO = re.compile( r'No Computing Element matching' ) |
253 |
|
reRB = re.compile( r'Connecting to host' ) |
254 |
|
next = 0 |
255 |
|
CEs=[] |
256 |
|
Match=0 |
257 |
< |
for line in out: |
258 |
< |
line = line.strip() |
259 |
< |
if reComment.match( line ): |
260 |
< |
next = 0 |
261 |
< |
continue |
262 |
< |
if reEmptyLine.match(line): |
263 |
< |
continue |
264 |
< |
if reVO.match( line ): |
265 |
< |
VO =line.split()[-1] |
266 |
< |
common.logger.debug(5, 'VO :'+VO) |
267 |
< |
pass |
268 |
< |
if reRB.match( line ): |
269 |
< |
RB =line.split()[3] |
270 |
< |
common.logger.debug(5, 'Using RB :'+RB) |
271 |
< |
pass |
272 |
< |
if reCE.search( line ): |
273 |
< |
next = 1 |
274 |
< |
continue |
275 |
< |
if next: |
276 |
< |
CE=line.split(':')[0] |
277 |
< |
CEs.append(CE) |
278 |
< |
common.logger.debug(5, 'Matched CE :'+CE) |
257 |
> |
|
258 |
> |
if reNO.match( out ): |
259 |
> |
common.logger.debug(5,out) |
260 |
> |
self.noMatchFound_(jdl) |
261 |
> |
Match=0 |
262 |
> |
pass |
263 |
> |
if reVO.match( out ): |
264 |
> |
VO =reVO.match( out ).group() |
265 |
> |
common.logger.debug(5, 'VO :'+VO) |
266 |
> |
pass |
267 |
> |
|
268 |
> |
if reRB.match( out ): |
269 |
> |
RB =reRB.match(out).group() |
270 |
> |
common.logger.debug(5, 'Using RB :'+RB) |
271 |
> |
pass |
272 |
> |
|
273 |
> |
if reCE.search( out ): |
274 |
> |
groups=reCE.search(out).groups() |
275 |
> |
for CE in groups: |
276 |
> |
tmp = string.strip(CE) |
277 |
> |
CEs.append(tmp) |
278 |
> |
common.logger.debug(5, 'Matched CE :'+tmp) |
279 |
|
Match=Match+1 |
280 |
< |
pass |
281 |
< |
if reNO.match( line ): |
289 |
< |
common.logger.debug(5,line) |
290 |
< |
self.noMatchFound_(jdl) |
291 |
< |
Match=0 |
292 |
< |
pass |
280 |
> |
pass |
281 |
> |
|
282 |
|
return Match |
283 |
|
|
284 |
|
def noMatchFound_(self, jdl): |
305 |
|
Submit one EDG job. |
306 |
|
""" |
307 |
|
|
308 |
+ |
self.checkProxy() |
309 |
|
jid = None |
310 |
|
jdl = common.job_list[nj].jdlFilename() |
311 |
< |
id_tmp = tempfile.mktemp() |
312 |
< |
edg_ui_cfg_opt = ' ' |
323 |
< |
if self.edg_config: |
324 |
< |
edg_ui_cfg_opt = ' -c ' + self.edg_config + ' ' |
325 |
< |
if self.edg_config_vo: |
326 |
< |
edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' ' |
327 |
< |
cmd = 'edg-job-submit -o ' + id_tmp + edg_ui_cfg_opt + jdl |
311 |
> |
|
312 |
> |
cmd = 'edg-job-submit ' + self.configOpt_() + jdl |
313 |
|
cmd_out = runCommand(cmd) |
314 |
|
if cmd_out != None: |
315 |
< |
idfile = open(id_tmp) |
316 |
< |
jid_line = idfile.readline() |
332 |
< |
while jid_line[0] == '#': |
333 |
< |
jid_line = idfile.readline() |
334 |
< |
pass |
335 |
< |
jid = string.strip(jid_line) |
336 |
< |
os.unlink(id_tmp) |
315 |
> |
reSid = re.compile( r'https.+' ) |
316 |
> |
jid = reSid.search(cmd_out).group() |
317 |
|
pass |
318 |
|
return jid |
319 |
|
|
330 |
|
def getStatusAttribute_(self, id, attr): |
331 |
|
""" Query a status of the job with id """ |
332 |
|
|
333 |
+ |
self.checkProxy() |
334 |
|
hstates = {} |
335 |
|
Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status') |
336 |
|
# Bypass edg-job-status interfacing directly to C++ API |
364 |
|
Returns the name of directory with results. |
365 |
|
""" |
366 |
|
|
367 |
+ |
self.checkProxy() |
368 |
|
cmd = 'edg-job-get-output --dir ' + common.work_space.resDir() + ' ' + id |
369 |
|
cmd_out = runCommand(cmd) |
370 |
|
|
376 |
|
|
377 |
|
def cancel(self, id): |
378 |
|
""" Cancel the EDG job with id """ |
379 |
+ |
self.checkProxy() |
380 |
|
cmd = 'edg-job-cancel --noint ' + id |
381 |
|
cmd_out = runCommand(cmd) |
382 |
|
return cmd_out |
383 |
|
|
401 |
– |
def checkProxy_(self): |
402 |
– |
""" |
403 |
– |
Function to check the Globus proxy. |
404 |
– |
""" |
405 |
– |
cmd = 'grid-proxy-info -timeleft' |
406 |
– |
cmd_out = runCommand(cmd) |
407 |
– |
ok = 1 |
408 |
– |
timeleft = -999 |
409 |
– |
try: timeleft = int(cmd_out) |
410 |
– |
except ValueError: ok=0 |
411 |
– |
except TypeError: ok=0 |
412 |
– |
if timeleft < 1: ok=0 |
413 |
– |
|
414 |
– |
if ok==0: |
415 |
– |
print "No valid proxy found !\n" |
416 |
– |
print "Creating a user proxy with default length of 100h\n" |
417 |
– |
msg = "Unable to create a valid proxy!\n" |
418 |
– |
if os.system("grid-proxy-init -valid 100:00"): |
419 |
– |
raise CrabException(msg) |
420 |
– |
return |
421 |
– |
|
384 |
|
def createSchScript(self, nj): |
385 |
|
""" |
386 |
|
Create a JDL-file for EDG. |
493 |
|
|
494 |
|
jdl.close() |
495 |
|
return |
496 |
+ |
|
497 |
+ |
def checkProxy(self): |
498 |
+ |
""" |
499 |
+ |
Function to check the Globus proxy. |
500 |
+ |
""" |
501 |
+ |
if (self.proxyValid): return |
502 |
+ |
timeleft = -999 |
503 |
+ |
minTimeLeft=10 # in hours |
504 |
+ |
cmd = 'grid-proxy-info -e -v '+str(minTimeLeft)+':00' |
505 |
+ |
try: cmd_out = runCommand(cmd,0) |
506 |
+ |
except: print cmd_out |
507 |
+ |
if (cmd_out == None or cmd_out=='1'): |
508 |
+ |
common.logger.message( "No valid proxy found or timeleft too short!\n Creating a user proxy with default length of 100h\n") |
509 |
+ |
cmd = 'grid-proxy-init -valid 100:00' |
510 |
+ |
try: |
511 |
+ |
out = os.system(cmd) |
512 |
+ |
if (out>0): raise CrabException("Unable to create a valid proxy!\n") |
513 |
+ |
except: |
514 |
+ |
msg = "Unable to create a valid proxy!\n" |
515 |
+ |
raise CrabException(msg) |
516 |
+ |
cmd = 'grid-proxy-info -timeleft' |
517 |
+ |
cmd_out = runCommand(cmd,0) |
518 |
+ |
#print cmd_out, time.time() |
519 |
+ |
#time.time(cms_out) |
520 |
+ |
pass |
521 |
+ |
self.proxyValid=1 |
522 |
+ |
return |
523 |
+ |
|
524 |
+ |
def configOpt_(self): |
525 |
+ |
edg_ui_cfg_opt = ' ' |
526 |
+ |
if self.edg_config: |
527 |
+ |
edg_ui_cfg_opt = ' -c ' + self.edg_config + ' ' |
528 |
+ |
if self.edg_config_vo: |
529 |
+ |
edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' ' |
530 |
+ |
return edg_ui_cfg_opt |