1 |
import os, common, string
|
2 |
from Actor import *
|
3 |
|
4 |
import xml.dom.minidom
|
5 |
import xml.dom.ext
|
6 |
|
7 |
class KillerServer(Actor):
|
8 |
# Matteo for kill by range
|
9 |
def __init__(self, cfg_params, range, parsedRange=[]):
|
10 |
self.cfg_params = cfg_params
|
11 |
self.range = range
|
12 |
self.parsedRange = parsedRange
|
13 |
return
|
14 |
|
15 |
# def __init__(self, cfg_params):
|
16 |
# self.cfg_params = cfg_params
|
17 |
# return
|
18 |
|
19 |
def run(self):
|
20 |
"""
|
21 |
The main method of the class: kill a complete task
|
22 |
"""
|
23 |
common.logger.debug(5, "Killer::run() called")
|
24 |
|
25 |
common.jobDB.load()
|
26 |
server_name = self.cfg_params['CRAB.server_name'] # gsiftp://pcpg01.cern.ch/data/SEDir/
|
27 |
WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
|
28 |
projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict('TasKUUID')
|
29 |
#common.taskDB.load()
|
30 |
#common.taskDB.setDict('projectName',projectUniqName)
|
31 |
#common.taskDB.save()
|
32 |
|
33 |
### Here start the kill operation
|
34 |
try:
|
35 |
x509=os.environ['X509_USER_PROXY']
|
36 |
except Exception, ex:
|
37 |
import traceback
|
38 |
common.logger.debug( 6, str(ex) )
|
39 |
common.logger.debug( 6, traceback.format_exc() )
|
40 |
x509_cmd = 'ls /tmp/x509up_u`id -u`'
|
41 |
x509=runCommand(x509_cmd).strip()
|
42 |
pSubj = os.popen3('openssl x509 -in '+str(x509)+' -subject -noout')[1].readlines()[0]
|
43 |
|
44 |
try:
|
45 |
self.cfile = xml.dom.minidom.Document()
|
46 |
root = self.cfile.createElement("TaskCommand")
|
47 |
node = self.cfile.createElement("TaskAttributes")
|
48 |
node.setAttribute("Task", projectUniqName)
|
49 |
node.setAttribute("Subject", string.strip(pSubj))
|
50 |
node.setAttribute("Command", "kill")
|
51 |
node.setAttribute("Range", str(self.parsedRange)) # Matteo for kill by range
|
52 |
root.appendChild(node)
|
53 |
self.cfile.appendChild(root)
|
54 |
self.toFile(WorkDirName + '/share/command.xml')
|
55 |
cmd = 'lcg-cp --vo cms file://'+os.getcwd()+'/'+str(WorkDirName)+'/share/command.xml gsiftp://' + str(server_name) + str(projectUniqName)+'.xml'
|
56 |
retcode = os.system(cmd)
|
57 |
if retcode: raise CrabException("Failed to ship kill command to server")
|
58 |
else: common.logger.message("Kill command succesfully shipped to server")
|
59 |
except RuntimeError,e:
|
60 |
msg +="Project "+str(WorkDirName)+" not killed: \n"
|
61 |
raise CrabException(msg + e.__str__())
|
62 |
|
63 |
# synch the range of submitted jobs to server (otherwise You wont be able to submit them again) # Fabio
|
64 |
file = open(common.work_space.shareDir()+'/submit_directive','r')
|
65 |
subms = str(file.readlines()[0]).split('\n')[0]
|
66 |
file.close()
|
67 |
if self.range=='all':
|
68 |
subms = []
|
69 |
elif self.range != None and self.range != "":
|
70 |
if len(self.range)!=0:
|
71 |
subms = eval(subms)
|
72 |
for i in self.parsedRange:
|
73 |
if i in subms:
|
74 |
subms.remove(i)
|
75 |
|
76 |
file = open(common.work_space.shareDir()+'/submit_directive','w')
|
77 |
file.write(str(subms))
|
78 |
file.close()
|
79 |
#
|
80 |
return
|
81 |
|
82 |
def toFile(self, filename):
|
83 |
filename_tmp = filename+".tmp"
|
84 |
file = open(filename_tmp, 'w')
|
85 |
xml.dom.ext.PrettyPrint(self.cfile, file)
|
86 |
file.close()
|
87 |
os.rename(filename_tmp, filename) # this should be an atomic operation thread-safe and multiprocess-safe
|