ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/KillerServer.py
(Generate patch)

Comparing COMP/CRAB/python/KillerServer.py (file contents):
Revision 1.9 by farinafa, Mon Nov 19 18:01:19 2007 UTC vs.
Revision 1.32 by spiga, Mon Jun 15 09:35:06 2009 UTC

# Line 1 | Line 1
1 import os, common, string
1   from Actor import *
2   from crab_util import *
3 + import common
4 + from ApmonIf import ApmonIf
5 + import time
6 +
7 + import traceback
8 + from xml.dom import minidom
9 + from ServerCommunicator import ServerCommunicator
10  
5 import xml.dom.minidom
11  
12   class KillerServer(Actor):
13 <    # Matteo for kill by range
9 <    def __init__(self, cfg_params, range, parsedRange=[]):
13 >    def __init__(self, cfg_params, range):
14          self.cfg_params = cfg_params
15          self.range = range
16 <        self.parsedRange = parsedRange
16 >
17 >        # init client server params...
18 >        CliServerParams(self)      
19 >
20          return
21  
22      def run(self):
23          """
24          The main method of the class: kill a complete task
25          """
26 <        common.logger.debug(5, "Killer::run() called")
20 <        
21 <        common.jobDB.load()
22 <        server_name = self.cfg_params['CRAB.server_name'] # gsiftp://pcpg01.cern.ch/data/SEDir/
23 <        WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
24 <        projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict('TasKUUID')
26 >        common.logger.debug( "Killer::run() called")
27  
28 <        ### Here start the kill operation  
28 >        # get updated status from server #inherited from StatusServer
29          try:
30 <            x509=os.environ['X509_USER_PROXY']
31 <        except Exception, ex:
32 <            import traceback
33 <            common.logger.debug( 6, str(ex) )
34 <            x509_cmd = 'ls /tmp/x509up_u`id -u`'
35 <            x509=runCommand(x509_cmd).strip()
36 <        pSubj = os.popen3('openssl x509 -in '+str(x509)+' -subject -noout')[1].readlines()[0]
37 <      
38 <        try:
39 <            self.cfile = xml.dom.minidom.Document()
40 <            root = self.cfile.createElement("TaskCommand")
41 <            node = self.cfile.createElement("TaskAttributes")
42 <            node.setAttribute("Task", projectUniqName)
43 <            node.setAttribute("Subject", string.strip(pSubj))
44 <            node.setAttribute("Command", "kill")
45 <            node.setAttribute("Range", str(self.parsedRange)) # Matteo for kill by range
46 <            root.appendChild(node)
47 <            self.cfile.appendChild(root)
48 <            self.toFile(WorkDirName + '/share/command.xml')
49 <            cmd = 'lcg-cp --vo cms file://'+os.getcwd()+'/'+str(WorkDirName)+'/share/command.xml gsiftp://' + str(server_name) + str(projectUniqName)+'.xml'
50 <            retcode = os.system(cmd)
51 <            if retcode: raise CrabException("Failed to ship kill command to server")
52 <            else: common.logger.message("Kill command succesfully shipped to server")
53 <        except RuntimeError,e:
54 <            msg +="Project "+str(WorkDirName)+" not killed: \n"      
55 <            raise CrabException(msg + e.__str__())
56 <
57 <        # synch the range of submitted jobs to server (otherwise You wont be able to submit them again) # Fabio
58 <        file = open(common.work_space.shareDir()+'/submit_directive','r')
59 <        subms = str(file.readlines()[0]).split('\n')[0]
60 <        file.close()
61 <        if self.range=='all':
62 <            subms = []
61 <        elif self.range != None and self.range != "":
62 <            if len(self.range)!=0:
63 <                subms = eval(subms)
64 <                for i in self.parsedRange:
65 <                    if i in subms:
66 <                        subms.remove(i)
67 <        
68 <        file = open(common.work_space.shareDir()+'/submit_directive','w')
69 <        file.write(str(subms))
70 <        file.close()
71 <        #  
72 <        return
73 <                
74 <    def toFile(self, filename):
75 <        filename_tmp = filename+".tmp"
76 <        file = open(filename_tmp, 'w')
77 <        # Fix for minidom # Fabio
78 <            # xml.dom.ext.PrettyPrint(self.cfile, file)
79 <        file.write(self.cfile.toprettyxml())
80 <        #
81 <        file.close()
82 <        os.rename(filename_tmp, filename) # this should be an atomic operation thread-safe and multiprocess-safe
30 >            from StatusServer import StatusServer
31 >            stat = StatusServer(self.cfg_params)
32 >            stat.resynchClientSide()
33 >        except:
34 >            pass    
35 >
36 >        task = common._db.getTask(self.range)
37 >        toBeKilled = []
38 >        for job  in task.jobs:
39 >           #if job.runningJob['status'] not in ['C','E','KK','K','SU','SA','NS']: # commented for fast-kill at registration ,'SSE']:
40 >           if job.runningJob['state'] in ['SubSuccess','SubRequested']:
41 >               toBeKilled.append(job['jobId'])
42 >           else:
43 >               common.logger.info("Not possible to kill Job #"+str(job['jobId'])+\
44 >                       " : Last action was: "+str(job.runningJob['state'])+\
45 >                       " Status is "+str(job.runningJob['statusScheduler']))
46 >           pass
47 >
48 >        if len(toBeKilled)>0:
49 >            ## register proxy ##
50 >            csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
51 >
52 >            taskuuid = str(common._db.queryTask('name'))
53 >            ret = csCommunicator.killJobs( taskuuid, toBeKilled)
54 >            del csCommunicator
55 >
56 >            if ret != 0:
57 >                msg = "ClientServer ERROR: %d raised during the communication.\n"%ret
58 >                raise CrabException(msg)
59 >
60 >            # printout the command result
61 >            common.scheduler.cancel(toBeKilled)
62 >            common._db.updateRunJob_(toBeKilled, [{'state':'KillRequested'}]*len(toBeKilled))
63  
64 +            common.logger.info("Kill request for %d jobs succesfully sent to the server\n"%len(toBeKilled) )
65  
66 +        return
67 +                

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines