1 |
corvo |
1.1 |
import os, common, string
|
2 |
|
|
from Actor import *
|
3 |
|
|
|
4 |
|
|
import xml.dom.minidom
|
5 |
|
|
import xml.dom.ext
|
6 |
|
|
|
7 |
|
|
class KillerServer(Actor):
|
8 |
farinafa |
1.2 |
# Matteo for kill by range
|
9 |
farinafa |
1.4 |
def __init__(self, cfg_params, range, parsedRange=[]):
|
10 |
corvo |
1.1 |
self.cfg_params = cfg_params
|
11 |
farinafa |
1.2 |
self.range = range
|
12 |
farinafa |
1.4 |
self.parsedRange = parsedRange
|
13 |
corvo |
1.1 |
return
|
14 |
|
|
|
15 |
farinafa |
1.2 |
# def __init__(self, cfg_params):
|
16 |
|
|
# self.cfg_params = cfg_params
|
17 |
|
|
# return
|
18 |
|
|
|
19 |
corvo |
1.1 |
def run(self):
|
20 |
|
|
"""
|
21 |
|
|
The main method of the class: kill a complete task
|
22 |
|
|
"""
|
23 |
|
|
common.logger.debug(5, "Killer::run() called")
|
24 |
|
|
|
25 |
|
|
common.jobDB.load()
|
26 |
|
|
server_name = self.cfg_params['CRAB.server_name'] # gsiftp://pcpg01.cern.ch/data/SEDir/
|
27 |
|
|
WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
|
28 |
|
|
projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict('TasKUUID')
|
29 |
|
|
#common.taskDB.load()
|
30 |
|
|
#common.taskDB.setDict('projectName',projectUniqName)
|
31 |
|
|
#common.taskDB.save()
|
32 |
|
|
|
33 |
|
|
### Here start the kill operation
|
34 |
spiga |
1.3 |
pSubj = os.popen3('openssl x509 -in /tmp/x509up_u`id -u` -subject -noout')[1].readlines()[0]
|
35 |
corvo |
1.1 |
|
36 |
|
|
try:
|
37 |
|
|
self.cfile = xml.dom.minidom.Document()
|
38 |
|
|
root = self.cfile.createElement("TaskCommand")
|
39 |
|
|
node = self.cfile.createElement("TaskAttributes")
|
40 |
|
|
node.setAttribute("Task", projectUniqName)
|
41 |
|
|
node.setAttribute("Subject", string.strip(pSubj))
|
42 |
|
|
node.setAttribute("Command", "kill")
|
43 |
farinafa |
1.4 |
node.setAttribute("Range", str(self.parsedRange)) # Matteo for kill by range
|
44 |
corvo |
1.1 |
root.appendChild(node)
|
45 |
|
|
self.cfile.appendChild(root)
|
46 |
|
|
self.toFile(WorkDirName + '/share/command.xml')
|
47 |
|
|
cmd = 'lcg-cp --vo cms file://'+os.getcwd()+'/'+str(WorkDirName)+'/share/command.xml gsiftp://' + str(server_name) + str(projectUniqName)+'.xml'
|
48 |
|
|
retcode = os.system(cmd)
|
49 |
|
|
if retcode: raise CrabException("Failed to ship kill command to server")
|
50 |
|
|
else: common.logger.message("Kill command succesfully shipped to server")
|
51 |
|
|
except RuntimeError,e:
|
52 |
|
|
msg +="Project "+str(WorkDirName)+" not killed: \n"
|
53 |
|
|
raise CrabException(msg + e.__str__())
|
54 |
|
|
|
55 |
farinafa |
1.4 |
# synch the range of submitted jobs to server (otherwise You wont be able to submit them again) # Fabio
|
56 |
|
|
file = open(common.work_space.shareDir()+'/submit_directive','r')
|
57 |
|
|
subms = str(file.readlines()[0]).split('\n')[0]
|
58 |
|
|
file.close()
|
59 |
|
|
if self.range=='all':
|
60 |
|
|
subms = []
|
61 |
mcinquil |
1.5 |
elif self.range != None and self.range != "":
|
62 |
|
|
if len(self.range)!=0:
|
63 |
|
|
subms = eval(subms)
|
64 |
|
|
for i in self.parsedRange:
|
65 |
|
|
if i in subms:
|
66 |
|
|
subms.remove(i)
|
67 |
|
|
|
68 |
farinafa |
1.4 |
file = open(common.work_space.shareDir()+'/submit_directive','w')
|
69 |
|
|
file.write(str(subms))
|
70 |
|
|
file.close()
|
71 |
|
|
#
|
72 |
corvo |
1.1 |
return
|
73 |
|
|
|
74 |
|
|
def toFile(self, filename):
|
75 |
|
|
filename_tmp = filename+".tmp"
|
76 |
|
|
file = open(filename_tmp, 'w')
|
77 |
|
|
xml.dom.ext.PrettyPrint(self.cfile, file)
|
78 |
|
|
file.close()
|
79 |
|
|
os.rename(filename_tmp, filename) # this should be an atomic operation thread-safe and multiprocess-safe
|