root/cvsroot/COMP/CRAB/python/PostMortemServer.py

Comparing COMP/CRAB/python/PostMortemServer.py (file contents):
Revision 1.2 by farinafa, Mon Nov 19 18:01:32 2007 UTC vs.
Revision 1.15 by ewv, Tue Jul 27 15:56:13 2010 UTC

# Line 1 | Line 1
1 < from Actor import *
1 > from PostMortem import PostMortem
2 >
3   from crab_util import *
4   import common
5 < from ApmonIf import ApmonIf
5 < import Statistic
6 < import time
7 < from ProgressBar import ProgressBar
8 < from TerminalController import TerminalController
9 <
10 < class PostMortemServer(Actor):
11 <
12 <    def __init__(self, cfg_params,):
13 <        self.cfg_params = cfg_params
14 <        try:  
15 <            self.server_name = self.cfg_params['CRAB.server_name'] # gsiftp://pcpg01.cern.ch/data/SEDir/
16 <        except KeyError:
17 <            msg = 'No server selected ...'
18 <            msg = msg + 'Please specify a server in the crab cfg file'
19 <            raise CrabException(msg)
20 <        return
21 <    
22 <    def run(self):
23 <        """
24 <        The main method of the class: retrieve the post mortem output from server
25 <        """
26 <        common.logger.debug(5, "PostMortem server::run() called")
5 > import string, os
6  
7 <        start = time.time()
7 > from ProdCommon.Storage.SEAPI.SElement import SElement
8 > from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
9  
10 <        common.scheduler.checkProxy()
10 > class PostMortemServer(PostMortem):
11 >    def __init__(self, cfg_params, nj_list):
12  
13 <        common.taskDB.load()
14 <        WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
15 <        projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict("TasKUUID")    
16 <    #Need to add a check on the threshold level
17 <    # and on the task readiness  TODO
18 <        try:
19 <            ### retrieving project from the server
20 <            common.logger.message("Retrieving the project from the server...\n")
13 >        PostMortem.__init__(self, cfg_params, nj_list)
14 >
15 >        # init client server params...
16 >        CliServerParams(self)
17 >
18 >        self.copyTout= setLcgTimeout()
19 >        if common.scheduler.name().upper() in ['LSF', 'CAF']:
20 >            self.copyTout= ' '
21 >
22 >        if self.storage_path[0]!='/':
23 >            self.storage_path = '/'+self.storage_path
24  
25 <            copyHere = common.work_space.jobDir() # MATT
26 <            
27 <            cmd = 'lcg-cp --vo cms --verbose gsiftp://' + str(self.server_name) + str(projectUniqName)+'/res/failed.tgz file://'+copyHere+'failed.tgz'# MATT
28 <            common.logger.debug(5, cmd)
29 <            copyOut = os.system(cmd +' >& /dev/null')
25 >        return
26 >
27 >    def collectLogging(self):
28 >        # get updated status from server
29 >        try:
30 >            from StatusServer import StatusServer
31 >            stat = StatusServer(self.cfg_params)
32 >            warning_msg = stat.resynchClientSide()
33 >            if warning_msg is not None:
34 >                common.logger.info(warning_msg)
35          except:
36 <            msg = ("postMortem output not yet available")
36 >            pass
37 >
38 >        #create once storage interaction object
39 >        seEl = None
40 >        loc = None
41 >        try:
42 >            seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
43 >        except Exception, ex:
44 >            common.logger.debug( str(ex))
45 >            msg = "ERROR: Unable to create SE source interface \n"
46 >            raise CrabException(msg)
47 >        try:
48 >            loc = SElement("localhost", "local")
49 >        except Exception, ex:
50 >            common.logger.debug( str(ex))
51 >            msg = "ERROR: Unable to create destination interface \n"
52              raise CrabException(msg)
53  
54 <        zipOut = "failed.tgz"
55 <        if os.path.exists( copyHere + zipOut ): # MATT
56 <            cwd = os.getcwd()
57 <            os.chdir( copyHere )# MATT
58 <            common.logger.debug( 5, 'tar -zxvf ' + zipOut )
55 <            cmd = 'tar -zxvf ' + zipOut
56 <            cmd += '; mv .tmpFailed/* .; rm -drf .tmpDone/'
57 <            cmd_out = runCommand(cmd)
58 <            os.chdir(cwd)
59 <            common.logger.debug( 5, 'rm -f '+copyHere+zipOut )# MATT
60 <            cmd = 'rm -f '+copyHere+zipOut# MATT
61 <            cmd_out = runCommand(cmd)
62 <
63 <            msg='Logging info for project '+str(WorkDirName)+': \n'      
64 <            msg+='written to '+copyHere+' \n'      # MATT
65 <            common.logger.message(msg)
66 <        else:
67 <            common.logger.message("Logging info is not yet ready....\n")
54 >        ## coupling se interfaces
55 >        sbi = SBinterface(seEl, loc, logger = common.logger.logger)
56 >
57 >        ## get the list of jobs whose logging.info should be retrieved, skimmed by failed status
58 >        logginable = self.skimDeadList()
59  
60 +        if self.storage_proto in ['globus']:
61 +            for id in self.nj_list:
62 +                if id not in self.all_jobs:
63 +                    common.logger.info('Warning: job # ' + str(id) + ' does not exist! Not possible to ask for postMortem ')
64 +                elif id not in logginable:
65 +                    common.logger.info('Warning: job # ' + str(id) + ' not killed or aborted! Will get loggingInfo manually ')
66 +                    PostMortem.collectOneLogging(self,id)
67 +            # construct a list of absolute paths of input files
68 +            # and the destinations to copy them to
69 +            sourcesList = []
70 +            destsList = []
71 +            self.taskuuid = str(common._db.queryTask('name'))
72 +            common.logger.debug( "Starting globus retrieval for task name: " + self.taskuuid)
73 +            remotedir = os.path.join(self.storage_path, self.taskuuid)
74 +            for i in logginable:
75 +                remotelog = remotedir + '/loggingInfo_'+str(i)+'.log'
76 +                sourcesList.append(remotelog)
77 +                fname = self.fname_base + str(i) + '.LoggingInfo'
78 +                destsList.append(fname)
79 +
80 +            # try to do the copy
81 +            copy_res = None
82 +            try:
83 +                copy_res = sbi.copy( sourcesList, destsList, opt=self.copyTout)
84 +            except Exception, ex:
85 +                msg = "WARNING: Unable to retrieve logging-info files \n"
86 +                msg += str(ex)
87 +                common.logger.debug(msg)
88 +                import traceback
89 +                common.logger.debug( str(traceback.format_exc()) )
90 +            if copy_res is not None:
91 +                ## evaluating copy results
92 +                copy_err_list = []
93 +                count = 0
94 +                for ll in map(None, copy_res, sourcesList):
95 +                    exitcode = int(ll[0][0])
96 +                    if exitcode == 0:
97 +                        ## decode logging info
98 +                        fl = open(destsList[count], 'r')
99 +                        out = "".join(fl.readlines())
100 +                        fl.close()
101 +                        reason = self.decodeLogging(out)
102 +                        common.logger.info('Logging info for job '+ str(logginable[count]) +': '+str(reason)+'\n      written to '+str(destsList[count])+' \n' )
103 +                    else:
104 +                        common.logger.info('Logging info for job '+ str(logginable[count]) +' not retrieved. Trying to get loggingInfo manually')
105 +                        PostMortem.collectOneLogging(self,logginable[count])
106 +                    count += 1
107 +        else:
108 +            ## iterate over each requested job and print a warning if it is not in the skimmed list
109 +            for id in self.nj_list:
110 +                if id not in self.all_jobs:
111 +                    common.logger.info('Warning: job # ' + str(id) + ' does not exist! Not possible to ask for postMortem ')
112 +                    continue
113 +                elif id in logginable:
114 +                    fname = self.fname_base + str(id) + '.LoggingInfo'
115 +                    if os.path.exists(fname):
116 +                        common.logger.info('Logging info for job ' + str(id) + ' already present in '+fname+'\nRemove it for update')
117 +                        continue
118 +                    ## retrieving & processing logging info
119 +                    if self.retrieveFile( sbi, id, fname):
120 +                        ## decode logging info
121 +                        fl = open(fname, 'r')
122 +                        out = "".join(fl.readlines())
123 +                        fl.close()
124 +                        reason = self.decodeLogging(out)
125 +                        common.logger.info('Logging info for job '+ str(id) +': '+str(reason)+'\n      written to '+str(fname)+' \n' )
126 +                    else:
127 +                        common.logger.info('Logging info for job '+ str(id) +' not retrieved. Trying to get loggingInfo manually')
128 +                        PostMortem.collectOneLogging(self,id)
129 +                else:
130 +                    common.logger.info('Warning: job # ' + str(id) + ' not killed or aborted! Will get loggingInfo manually ')
131 +                    PostMortem.collectOneLogging(self,id)
132          return
133  
134 +
135 +    def skimDeadList(self):
136 +        """
137 +        __skimDeadList__
138 +        return the list of jobs that really failed: K (killed), A (aborted)
139 +        """
140 +        skimmedlist = []
141 +        self.up_task = common._db.getTask( self.nj_list )
142 +        for job in self.up_task.jobs:
143 +            if job.runningJob['status'] in ['K','A']:
144 +                skimmedlist.append(job['jobId'])
145 +        return skimmedlist
146 +
147 +    def retrieveFile(self, sbi, jobid, destlog):
148 +        """
149 +        __retrieveFile__
150 +
151 +        retrieves logging.info file from the server storage area
152 +        """
153 +        self.taskuuid = str(common._db.queryTask('name'))
154 +        common.logger.debug( "Task name: " + self.taskuuid)
155 +
156 +        # full remote dir
157 +        remotedir = os.path.join(self.storage_path, self.taskuuid)
158 +        remotelog = remotedir + '/loggingInfo_'+str(jobid)+'.log'
159 +
160 +        common.logger.info("Starting retrieving logging-info from server " \
161 +                               + str(self.storage_name) + " for job " \
162 +                               + str(jobid) + "...")
163 +
164 +        # retrieve logging info from storage
165 +        common.logger.debug( "retrieving "+ str(remotelog) +" to "+ str(destlog) )
166 +        try:
167 +            sbi.copy( remotelog, destlog)
168 +        except Exception, ex:
169 +            msg = "WARNING: Unable to retrieve logging-info file %s \n"%remotelog
170 +            msg += str(ex)
171 +            common.logger.debug(msg)
172 +            return False
173 +        # cleaning remote logging info file
174 +        try:
175 +            common.logger.debug( "Cleaning remote file [%s] " %( str(remotelog) ) )
176 +            sbi.delete(remotelog)
177 +        except Exception, ex:
178 +            msg = "WARNING: Unable to clean remote logging-info file %s \n"%remotelog
179 +            msg += str(ex)
180 +            common.logger.debug(msg)
181 +        return True

Diff Legend

Removed lines
+ Added lines
< Changed lines (revision 1.2)
> Changed lines (revision 1.15)