ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/KillerServer.py
(Generate patch)

Comparing COMP/CRAB/python/KillerServer.py (file contents):
Revision 1.9 by farinafa, Mon Nov 19 18:01:19 2007 UTC vs.
Revision 1.21 by spiga, Sun Aug 31 22:26:13 2008 UTC

# Line 1 | Line 1
1 import os, common, string
1   from Actor import *
2   from crab_util import *
3 + import common
4 + from ApmonIf import ApmonIf
5 + import time
6 +
7 + import traceback
8 + from xml.dom import minidom
9 + from ServerCommunicator import ServerCommunicator
10  
11 < import xml.dom.minidom
11 > from StatusServer import StatusServer
12  
13 < class KillerServer(Actor):
14 <    # Matteo for kill by range
9 <    def __init__(self, cfg_params, range, parsedRange=[]):
13 > class KillerServer(Actor, StatusServer):
14 >    def __init__(self, cfg_params, range):
15          self.cfg_params = cfg_params
16          self.range = range
17 <        self.parsedRange = parsedRange
17 >
18 >        # init client server params...
19 >        CliServerParams(self)      
20 >
21          return
22  
23      def run(self):
# Line 17 | Line 25 | class KillerServer(Actor):
25          The main method of the class: kill a complete task
26          """
27          common.logger.debug(5, "Killer::run() called")
20        
21        common.jobDB.load()
22        server_name = self.cfg_params['CRAB.server_name'] # gsiftp://pcpg01.cern.ch/data/SEDir/
23        WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
24        projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict('TasKUUID')
25
26        ### Here start the kill operation  
27        try:
28            x509=os.environ['X509_USER_PROXY']
29        except Exception, ex:
30            import traceback
31            common.logger.debug( 6, str(ex) )
32            x509_cmd = 'ls /tmp/x509up_u`id -u`'
33            x509=runCommand(x509_cmd).strip()
34        pSubj = os.popen3('openssl x509 -in '+str(x509)+' -subject -noout')[1].readlines()[0]
35      
36        try:
37            self.cfile = xml.dom.minidom.Document()
38            root = self.cfile.createElement("TaskCommand")
39            node = self.cfile.createElement("TaskAttributes")
40            node.setAttribute("Task", projectUniqName)
41            node.setAttribute("Subject", string.strip(pSubj))
42            node.setAttribute("Command", "kill")
43            node.setAttribute("Range", str(self.parsedRange)) # Matteo for kill by range
44            root.appendChild(node)
45            self.cfile.appendChild(root)
46            self.toFile(WorkDirName + '/share/command.xml')
47            cmd = 'lcg-cp --vo cms file://'+os.getcwd()+'/'+str(WorkDirName)+'/share/command.xml gsiftp://' + str(server_name) + str(projectUniqName)+'.xml'
48            retcode = os.system(cmd)
49            if retcode: raise CrabException("Failed to ship kill command to server")
50            else: common.logger.message("Kill command succesfully shipped to server")
51        except RuntimeError,e:
52            msg +="Project "+str(WorkDirName)+" not killed: \n"      
53            raise CrabException(msg + e.__str__())
54
55        # synch the range of submitted jobs to server (otherwise You wont be able to submit them again) # Fabio
56        file = open(common.work_space.shareDir()+'/submit_directive','r')
57        subms = str(file.readlines()[0]).split('\n')[0]
58        file.close()
59        if self.range=='all':
60            subms = []
61        elif self.range != None and self.range != "":
62            if len(self.range)!=0:
63                subms = eval(subms)
64                for i in self.parsedRange:
65                    if i in subms:
66                        subms.remove(i)
67        
68        file = open(common.work_space.shareDir()+'/submit_directive','w')
69        file.write(str(subms))
70        file.close()
71        #  
72        return
73                
74    def toFile(self, filename):
75        filename_tmp = filename+".tmp"
76        file = open(filename_tmp, 'w')
77        # Fix for minidom # Fabio
78            # xml.dom.ext.PrettyPrint(self.cfile, file)
79        file.write(self.cfile.toprettyxml())
80        #
81        file.close()
82        os.rename(filename_tmp, filename) # this should be an atomic operation thread-safe and multiprocess-safe
28  
29 +        # get updated status from server #inherited from StatusServer
30 +        self.resynchClientSide()
31  
32 +        task = common._db.getTask(self.range)
33 +        toBeKilled = []
34 +        toSetKilling = []
35 +        for job  in task.jobs:
36 +           if job.runningJob['status'] not in ['C','E','KK','SK','SU','SA','SSE']:
37 +               toBeKilled.append(job['jobId'])
38 +               if job.runningJob['status'] != 'SD': toSetKilling.append(job['jobId'])
39 +           else:
40 +               common.logger.message("Not possible to kill Job #"+str(job['jobId'])+" : Status is "+str(job.runningJob['statusScheduler']))
41 +           pass
42 +
43 +        if len(toBeKilled)>0:
44 +            ## register proxy ##
45 +            csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
46 +
47 +            taskuuid = str(common._db.queryTask('name'))
48 +            ret = csCommunicator.killJobs( taskuuid, toBeKilled)
49 +            del csCommunicator
50 +
51 +            if ret != 0:
52 +                msg = "ClientServer ERROR: %d raised during the communication.\n"%ret
53 +                raise CrabException(msg)
54 +
55 +            # update runningjobs status
56 +            updList = [{'statusScheduler':'Killing', 'status':'KK'}] * len(toSetKilling)
57 +            common._db.updateRunJob_(toSetKilling, updList)
58 +
59 +            # printout the command result
60 +            common.logger.message("Kill request for %d jobs succesfully sent to the server\n"%len(toBeKilled) )
61 +
62 +        return
63 +                

Diff Legend

(no marker) Removed lines
+ Added lines
< Changed lines (text as it was in the old revision)
> Changed lines (text as it is in the new revision)