ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/KillerServer.py
(Generate patch)

Comparing COMP/CRAB/python/KillerServer.py (file contents):
Revision 1.9 by farinafa, Mon Nov 19 18:01:19 2007 UTC vs.
Revision 1.14.4.1 by spiga, Mon Jul 7 13:24:31 2008 UTC

# Line 1 | Line 1
1 import os, common, string
1   from Actor import *
2   from crab_util import *
3 + import common
4 + from ApmonIf import ApmonIf
5 + import time
6 +
7 + import traceback
8 + from xml.dom import minidom
9 + from ServerCommunicator import ServerCommunicator
10  
5 import xml.dom.minidom
11  
12   class KillerServer(Actor):
13 <    # Matteo for kill by range
9 <    def __init__(self, cfg_params, range, parsedRange=[]):
13 >    def __init__(self, cfg_params, range):
14          self.cfg_params = cfg_params
15          self.range = range
16 <        self.parsedRange = parsedRange
16 >
17 >        # init client server params...
18 >        CliServerParams(self)      
19 >
20          return
21  
22      def run(self):
# Line 17 | Line 24 | class KillerServer(Actor):
24          The main method of the class: kill a complete task
25          """
26          common.logger.debug(5, "Killer::run() called")
20        
21        common.jobDB.load()
22        server_name = self.cfg_params['CRAB.server_name'] # gsiftp://pcpg01.cern.ch/data/SEDir/
23        WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
24        projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict('TasKUUID')
25
26        ### Here start the kill operation  
27        try:
28            x509=os.environ['X509_USER_PROXY']
29        except Exception, ex:
30            import traceback
31            common.logger.debug( 6, str(ex) )
32            x509_cmd = 'ls /tmp/x509up_u`id -u`'
33            x509=runCommand(x509_cmd).strip()
34        pSubj = os.popen3('openssl x509 -in '+str(x509)+' -subject -noout')[1].readlines()[0]
35      
36        try:
37            self.cfile = xml.dom.minidom.Document()
38            root = self.cfile.createElement("TaskCommand")
39            node = self.cfile.createElement("TaskAttributes")
40            node.setAttribute("Task", projectUniqName)
41            node.setAttribute("Subject", string.strip(pSubj))
42            node.setAttribute("Command", "kill")
43            node.setAttribute("Range", str(self.parsedRange)) # Matteo for kill by range
44            root.appendChild(node)
45            self.cfile.appendChild(root)
46            self.toFile(WorkDirName + '/share/command.xml')
47            cmd = 'lcg-cp --vo cms file://'+os.getcwd()+'/'+str(WorkDirName)+'/share/command.xml gsiftp://' + str(server_name) + str(projectUniqName)+'.xml'
48            retcode = os.system(cmd)
49            if retcode: raise CrabException("Failed to ship kill command to server")
50            else: common.logger.message("Kill command succesfully shipped to server")
51        except RuntimeError,e:
52            msg +="Project "+str(WorkDirName)+" not killed: \n"      
53            raise CrabException(msg + e.__str__())
54
55        # synch the range of submitted jobs to server (otherwise You wont be able to submit them again) # Fabio
56        file = open(common.work_space.shareDir()+'/submit_directive','r')
57        subms = str(file.readlines()[0]).split('\n')[0]
58        file.close()
59        if self.range=='all':
60            subms = []
61        elif self.range != None and self.range != "":
62            if len(self.range)!=0:
63                subms = eval(subms)
64                for i in self.parsedRange:
65                    if i in subms:
66                        subms.remove(i)
67        
68        file = open(common.work_space.shareDir()+'/submit_directive','w')
69        file.write(str(subms))
70        file.close()
71        #  
72        return
73                
74    def toFile(self, filename):
75        filename_tmp = filename+".tmp"
76        file = open(filename_tmp, 'w')
77        # Fix for minidom # Fabio
78            # xml.dom.ext.PrettyPrint(self.cfile, file)
79        file.write(self.cfile.toprettyxml())
80        #
81        file.close()
82        os.rename(filename_tmp, filename) # this should be an atomic operation thread-safe and multiprocess-safe
27  
28 +        task = common._db.getTask(self.range)
29 +        toBeKilled = []
30 +        for job  in task.jobs:
31 +           if job.runningJob['status'] in ['SS','R','SR','SW', 'SU']:
32 +               toBeKilled.append(job['jobId'])
33 +           else:
34 +               common.logger.message("Not possible to kill Job #"+str(job['jobId'])+" : Status is "+str(job.runningJob['statusScheduler']))
35 +           pass
36 +
37 +        if len(toBeKilled)>0:
38 +            ## register proxy ##
39 +            csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
40 +
41 +            taskuuid = str(common._db.queryTask('name'))
42 +            ret = csCommunicator.killJobs( taskuuid, toBeKilled)
43 +            del csCommunicator
44 +
45 +            if ret != 0:
46 +                msg = "ClientServer ERROR: %d raised during the communication.\n"%ret
47 +                raise CrabException(msg)
48 +
49 +            # update runningjobs status
50 +            updList = [{'statusScheduler':'Killing', 'status':'KK'}] * len(toBeKilled)
51 +            common._db.updateRunJob_(toBeKilled, updList)
52 +
53 +            # printout the command result
54 +            common.logger.message("Kill request for %d jobs succesfully sent to the server\n"%len(toBeKilled) )
55  
56 +        return
57 +                

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines