ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/PostMortemServer.py
(Generate patch)

Comparing COMP/CRAB/python/PostMortemServer.py (file contents):
Revision 1.1 by spiga, Sun Jun 3 17:52:05 2007 UTC vs.
Revision 1.11 by mcinquil, Wed Sep 23 20:26:49 2009 UTC

# Line 1 | Line 1
1 < from Actor import *
1 > from PostMortem import PostMortem
2 >
3   from crab_util import *
4   import common
5 < from ApmonIf import ApmonIf
5 < import Statistic
6 < import time
7 < from ProgressBar import ProgressBar
8 < from TerminalController import TerminalController
9 <
10 < import xml.dom.minidom
11 < import xml.dom.ext
12 <
13 < class PostMortemServer(Actor):
14 <
15 <    def __init__(self, cfg_params,):
16 <        self.cfg_params = cfg_params
17 <        try:  
18 <            self.server_name = self.cfg_params['CRAB.server_name'] # gsiftp://pcpg01.cern.ch/data/SEDir/
19 <        except KeyError:
20 <            msg = 'No server selected ...'
21 <            msg = msg + 'Please specify a server in the crab cfg file'
22 <            raise CrabException(msg)
23 <        return
24 <    
25 <    def run(self):
26 <        """
27 <        The main method of the class: retrieve the post mortem output from server
28 <        """
29 <        common.logger.debug(5, "PostMortem server::run() called")
30 <
31 <        start = time.time()
5 > import string, os
6  
7 <        common.scheduler.checkProxy()
7 > from ProdCommon.Storage.SEAPI.SElement import SElement
8 > from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
9  
10 <        common.taskDB.load()
11 <        WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
12 <        projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict("TasKUUID")    
13 <        #Need to add a check on the treashold level
14 <        # and on the task readness  TODO  
10 > class PostMortemServer(PostMortem):
11 >    def __init__(self, cfg_params, nj_list):
12 >            
13 >        PostMortem.__init__(self, cfg_params, nj_list)
14 >
15 >        # init client server params...
16 >        CliServerParams(self)        
17 >
18 >        if self.storage_path[0]!='/':
19 >            self.storage_path = '/'+self.storage_path
20 >        
21 >        return
22 >    
23 >    def collectLogging(self):
24 >        # get updated status from server
25          try:
26 <            ### retrieving poject from the server
27 <            common.logger.message("Retrieving the poject from the server...\n")
28 <
44 <            copyHere = common.work_space.jobDir() # MATT
45 <            
46 <            cmd = 'lcg-cp --vo cms --verbose gsiftp://' + str(self.server_name) + str(projectUniqName)+'/res/failed.tgz file://'+copyHere+'failed.tgz'# MATT
47 <            common.logger.debug(5, cmd)
48 <            copyOut = os.system(cmd +' >& /dev/null')
26 >            from StatusServer import StatusServer
27 >            stat = StatusServer(self.cfg_params)
28 >            stat.resynchClientSide()
29          except:
30 <            msg = ("postMortem output not yet available")
30 >            pass
31 >
32 >        #create once storage interaction object
33 >        seEl = None
34 >        loc = None
35 >        try:  
36 >            seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
37 >        except Exception, ex:
38 >            common.logger.debug( str(ex))
39 >            msg = "ERROR: Unable to create SE source interface \n"
40              raise CrabException(msg)
41 +        try:
42 +            loc = SElement("localhost", "local")
43 +        except Exception, ex:
44 +            common.logger.debug( str(ex))
45 +            msg = "ERROR: Unable to create destination interface \n"
46 +            raise CrabException(msg)
47 +
48 +        ## coupling se interfaces
49 +        sbi = SBinterface( seEl, loc )
50  
51 <        zipOut = "failed.tgz"
52 <        if os.path.exists( copyHere + zipOut ): # MATT
55 <            cwd = os.getcwd()
56 <            os.chdir( copyHere )# MATT
57 <            common.logger.debug( 5, 'tar -zxvf ' + zipOut )
58 <            cmd = 'tar -zxvf ' + zipOut
59 <            cmd += '; mv .tmpFailed/* .; rm -drf .tmpDone/'
60 <            cmd_out = runCommand(cmd)
61 <            os.chdir(cwd)
62 <            common.logger.debug( 5, 'rm -f '+copyHere+zipOut )# MATT
63 <            cmd = 'rm -f '+copyHere+zipOut# MATT
64 <            cmd_out = runCommand(cmd)
65 <
66 <            msg='Logging info for project '+str(WorkDirName)+': \n'      
67 <            msg+='written to '+copyHere+' \n'      # MATT
68 <            common.logger.message(msg)
69 <        else:
70 <            common.logger.message("Logging info is not yet ready....\n")
51 >        ## get the list of jobs to get logging.info skimmed by failed status
52 >        logginable = self.skimDeadList()
53  
54 +        ## iter over each asked job and print warning if not in skimmed list
55 +        for id in self.nj_list:
56 +            if id not in self.all_jobs:
57 +                common.logger.info('Warning: job # ' + str(id) + ' does not exist! Not possible to ask for postMortem ')
58 +                continue
59 +            elif id in logginable:  
60 +                fname = self.fname_base + str(id) + '.LoggingInfo'
61 +                if os.path.exists(fname):
62 +                    common.logger.info('Logging info for job ' + str(id) + ' already present in '+fname+'\nRemove it for update')
63 +                    continue
64 +                ## retrieving & processing logging info
65 +                if self.retrieveFile( sbi, id, fname):
66 +                    ## decode logging info
67 +                    fl = open(fname, 'r')
68 +                    out = "".join(fl.readlines())  
69 +                    fl.close()
70 +                    reason = self.decodeLogging(out)
71 +                    common.logger.info('Logging info for job '+ str(id) +': '+str(reason)+'\n      written to '+str(fname)+' \n' )
72 +                else:
73 +                    common.logger.info('Logging info for job '+ str(id) +' not retrieved. Tring to get loggingInfo manually')
74 +                    PostMortem.collectOneLogging(self,id)
75 +            else:
76 +                common.logger.info('Warning: job # ' + str(id) + ' not killed or aborted! Will get loggingInfo manually ')
77 +                PostMortem.collectOneLogging(self,id)
78          return
79  
80 +
81 +    def skimDeadList(self):
82 +        """
83 +        __skimDeadList__
84 +        return the list of jobs really failed: K, A
85 +        """
86 +        skimmedlist = []
87 +        self.up_task = common._db.getTask( self.nj_list )
88 +        for job in self.up_task.jobs:
89 +            if job.runningJob['status'] in ['K','A']:
90 +                skimmedlist.append(job['jobId'])
91 +        return skimmedlist
92 +        
93 +    def retrieveFile(self, sbi, jobid, destlog):
94 +        """
95 +        __retrieveFile__
96 +
97 +        retrieves logging.info file from the server storage area
98 +        """
99 +        self.taskuuid = str(common._db.queryTask('name'))
100 +        common.logger.debug( "Task name: " + self.taskuuid)
101 +
102 +        # full remote dir
103 +        remotedir = os.path.join(self.storage_path, self.taskuuid)
104 +        remotelog = remotedir + '/loggingInfo_'+str(jobid)+'.log'
105 +
106 +        common.logger.info("Starting retrieving logging-info from server " \
107 +                               + str(self.storage_name) + " for job " \
108 +                               + str(jobid) + "...")
109 +
110 +        # retrieve logging info from storage
111 +        common.logger.debug( "retrieving "+ str(remotelog) +" to "+ str(destlog) )
112 +        try:
113 +            sbi.copy( remotelog, destlog)
114 +        except Exception, ex:
115 +            msg = "WARNING: Unable to retrieve logging-info file %s \n"%remotelog
116 +            msg += str(ex)
117 +            common.logger.debug(msg)
118 +            return False
119 +        # cleaning remote logging info file
120 +        try:
121 +            common.logger.debug( "Cleaning remote file [%s] " %( str(remotelog) ) )
122 +            sbi.delete(remotelog)
123 +        except Exception, ex:
124 +            msg = "WARNING: Unable to clean remote logging-info file %s \n"%remotelog
125 +            msg += str(ex)
126 +            common.logger.debug(msg)
127 +        return True

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines