1 |
|
import os, common, string |
2 |
|
from Actor import * |
3 |
+ |
from crab_util import * |
4 |
|
|
5 |
|
import xml.dom.minidom |
5 |
– |
import xml.dom.ext |
6 |
|
|
7 |
|
class KillerServer(Actor): |
8 |
|
# Matteo for kill by range |
9 |
< |
def __init__(self, cfg_params, range): |
9 |
> |
def __init__(self, cfg_params, range, parsedRange=[]): |
10 |
|
self.cfg_params = cfg_params |
11 |
|
self.range = range |
12 |
+ |
self.parsedRange = parsedRange |
13 |
|
return |
14 |
|
|
14 |
– |
# def __init__(self, cfg_params): |
15 |
– |
# self.cfg_params = cfg_params |
16 |
– |
# return |
17 |
– |
|
15 |
|
def run(self): |
16 |
|
""" |
17 |
|
The main method of the class: kill a complete task |
22 |
|
server_name = self.cfg_params['CRAB.server_name'] # gsiftp://pcpg01.cern.ch/data/SEDir/ |
23 |
|
WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0]) |
24 |
|
projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict('TasKUUID') |
28 |
– |
#common.taskDB.load() |
29 |
– |
#common.taskDB.setDict('projectName',projectUniqName) |
30 |
– |
#common.taskDB.save() |
25 |
|
|
26 |
|
### Here start the kill operation |
27 |
< |
pSubj = os.popen3('openssl x509 -in /tmp/x509up_u`id -u` -subject -noout')[1].readlines()[0] |
27 |
> |
try: |
28 |
> |
x509=os.environ['X509_USER_PROXY'] |
29 |
> |
except Exception, ex: |
30 |
> |
import traceback |
31 |
> |
common.logger.debug( 6, str(ex) ) |
32 |
> |
x509_cmd = 'ls /tmp/x509up_u`id -u`' |
33 |
> |
x509=runCommand(x509_cmd).strip() |
34 |
> |
pSubj = os.popen3('openssl x509 -in '+str(x509)+' -subject -noout')[1].readlines()[0] |
35 |
|
|
36 |
|
try: |
37 |
|
self.cfile = xml.dom.minidom.Document() |
40 |
|
node.setAttribute("Task", projectUniqName) |
41 |
|
node.setAttribute("Subject", string.strip(pSubj)) |
42 |
|
node.setAttribute("Command", "kill") |
43 |
< |
node.setAttribute("Range", str(self.range)) # Matteo for kill by range |
43 |
> |
node.setAttribute("Range", str(self.parsedRange)) # Matteo for kill by range |
44 |
|
root.appendChild(node) |
45 |
|
self.cfile.appendChild(root) |
46 |
|
self.toFile(WorkDirName + '/share/command.xml') |
52 |
|
msg +="Project "+str(WorkDirName)+" not killed: \n" |
53 |
|
raise CrabException(msg + e.__str__()) |
54 |
|
|
55 |
+ |
# synch the range of submitted jobs to server (otherwise You wont be able to submit them again) # Fabio |
56 |
+ |
file = open(common.work_space.shareDir()+'/submit_directive','r') |
57 |
+ |
subms = str(file.readlines()[0]).split('\n')[0] |
58 |
+ |
file.close() |
59 |
+ |
if self.range=='all': |
60 |
+ |
subms = [] |
61 |
+ |
elif self.range != None and self.range != "": |
62 |
+ |
if len(self.range)!=0: |
63 |
+ |
subms = eval(subms) |
64 |
+ |
for i in self.parsedRange: |
65 |
+ |
if i in subms: |
66 |
+ |
subms.remove(i) |
67 |
+ |
|
68 |
+ |
file = open(common.work_space.shareDir()+'/submit_directive','w') |
69 |
+ |
file.write(str(subms)) |
70 |
+ |
file.close() |
71 |
+ |
# |
72 |
|
return |
73 |
|
|
74 |
|
def toFile(self, filename): |
75 |
|
filename_tmp = filename+".tmp" |
76 |
|
file = open(filename_tmp, 'w') |
77 |
< |
xml.dom.ext.PrettyPrint(self.cfile, file) |
77 |
> |
# Fix for minidom # Fabio |
78 |
> |
# xml.dom.ext.PrettyPrint(self.cfile, file) |
79 |
> |
file.write(self.cfile.toprettyxml()) |
80 |
> |
# |
81 |
|
file.close() |
82 |
|
os.rename(filename_tmp, filename) # this should be an atomic operation thread-safe and multiprocess-safe |
83 |
+ |
|
84 |
+ |
|