-from Actor import *
+from PostMortem import PostMortem
+
 from crab_util import *
 import common
-from ApmonIf import ApmonIf
-import Statistic
-import time
-from ProgressBar import ProgressBar
-from TerminalController import TerminalController
-
-import xml.dom.minidom
-import xml.dom.ext
-
-class PostMortemServer(Actor):
-
-    def __init__(self, cfg_params,):
-        self.cfg_params = cfg_params
-        try:
-            self.server_name = self.cfg_params['CRAB.server_name'] # gsiftp://pcpg01.cern.ch/data/SEDir/
-        except KeyError:
-            msg = 'No server selected ...'
-            msg = msg + 'Please specify a server in the crab cfg file'
-            raise CrabException(msg)
-        return
-
-    def run(self):
-        """
-        The main method of the class: retrieve the post mortem output from server
-        """
-        common.logger.debug(5, "PostMortem server::run() called")
-
-        start = time.time()
+import string, os
 
-        common.scheduler.checkProxy()
+from ProdCommon.Storage.SEAPI.SElement import SElement
+from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
 
-        common.taskDB.load()
-        WorkDirName =os.path.basename(os.path.split(common.work_space.topDir())[0])
-        projectUniqName = 'crab_'+str(WorkDirName)+'_'+common.taskDB.dict("TasKUUID")
-        #Need to add a check on the treashold level
-        # and on the task readness TODO
+class PostMortemServer(PostMortem):
+    def __init__(self, cfg_params, nj_list):
+
+        PostMortem.__init__(self, cfg_params, nj_list)
+
+        # init client server params...
+        CliServerParams(self)
+
+        if self.storage_path[0]!='/':
+            self.storage_path = '/'+self.storage_path
+
+        return
+
+    def collectLogging(self):
+        # get updated status from server
         try:
-            ### retrieving poject from the server
-            common.logger.message("Retrieving the poject from the server...\n")
-
-            copyHere = common.work_space.jobDir() # MATT
-
-            cmd = 'lcg-cp --vo cms --verbose gsiftp://' + str(self.server_name) + str(projectUniqName)+'/res/failed.tgz file://'+copyHere+'failed.tgz'# MATT
-            common.logger.debug(5, cmd)
-            copyOut = os.system(cmd +' >& /dev/null')
+            from StatusServer import StatusServer
+            stat = StatusServer(self.cfg_params)
+            stat.resynchClientSide()
         except:
-            msg = ("postMortem output not yet available")
+            pass
+
+        # create the storage interaction objects once
+        seEl = None
+        loc = None
+        try:
+            seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
+        except Exception, ex:
+            common.logger.debug( str(ex))
+            msg = "ERROR: Unable to create SE source interface \n"
             raise CrabException(msg)
+        try:
+            loc = SElement("localhost", "local")
+        except Exception, ex:
+            common.logger.debug( str(ex))
+            msg = "ERROR: Unable to create destination interface \n"
+            raise CrabException(msg)
+
+        ## couple the SE interfaces (remote source, local destination)
+        sbi = SBinterface( seEl, loc )
 
-        zipOut = "failed.tgz"
-        if os.path.exists( copyHere + zipOut ): # MATT
-            cwd = os.getcwd()
-            os.chdir( copyHere )# MATT
-            common.logger.debug( 5, 'tar -zxvf ' + zipOut )
-            cmd = 'tar -zxvf ' + zipOut
-            cmd += '; mv .tmpFailed/* .; rm -drf .tmpDone/'
-            cmd_out = runCommand(cmd)
-            os.chdir(cwd)
-            common.logger.debug( 5, 'rm -f '+copyHere+zipOut )# MATT
-            cmd = 'rm -f '+copyHere+zipOut# MATT
-            cmd_out = runCommand(cmd)
-
-            msg='Logging info for project '+str(WorkDirName)+': \n'
-            msg+='written to '+copyHere+' \n' # MATT
-            common.logger.message(msg)
-        else:
-            common.logger.message("Logging info is not yet ready....\n")
+        ## get the list of jobs whose logging info can be fetched, skimmed by failed status
+        logginable = self.skimDeadList()
 
+        ## iterate over each requested job; warn if it is not in the skimmed list
+        for id in self.nj_list:
+            if id not in self.all_jobs:
+                common.logger.info('Warning: job # ' + str(id) + ' does not exist! Not possible to ask for postMortem ')
+                continue
+            elif id in logginable:
+                fname = self.fname_base + str(id) + '.LoggingInfo'
+                if os.path.exists(fname):
+                    common.logger.info('Logging info for job ' + str(id) + ' already present in '+fname+'\nRemove it for update')
+                    continue
+                ## retrieving & processing logging info
+                if self.retrieveFile( sbi, id, fname):
+                    ## decode logging info
+                    fl = open(fname, 'r')
+                    out = "".join(fl.readlines())
+                    fl.close()
+                    reason = self.decodeLogging(out)
+                    common.logger.info('Logging info for job '+ str(id) +': '+str(reason)+'\n written to '+str(fname)+' \n' )
+                else:
+                    common.logger.info('Logging info for job '+ str(id) +' not retrieved. Trying to get loggingInfo manually')
+                    PostMortem.collectOneLogging(self,id)
+            else:
+                common.logger.info('Warning: job # ' + str(id) + ' not killed or aborted! Will get loggingInfo manually ')
+                PostMortem.collectOneLogging(self,id)
         return
 
+
+    def skimDeadList(self):
+        """
+        __skimDeadList__
+        return the list of jobs that really failed: status 'K' (killed) or 'A' (aborted)
+        """
+        skimmedlist = []
+        self.up_task = common._db.getTask( self.nj_list )
+        for job in self.up_task.jobs:
+            if job.runningJob['status'] in ['K','A']:
+                skimmedlist.append(job['jobId'])
+        return skimmedlist
+
+    def retrieveFile(self, sbi, jobid, destlog):
+        """
+        __retrieveFile__
+
+        retrieves the logging.info file from the server storage area
+        """
+        self.taskuuid = str(common._db.queryTask('name'))
+        common.logger.debug( "Task name: " + self.taskuuid)
+
+        # full remote dir
+        remotedir = os.path.join(self.storage_path, self.taskuuid)
+        remotelog = remotedir + '/loggingInfo_'+str(jobid)+'.log'
+
+        common.logger.info("Starting retrieving logging-info from server " \
+                           + str(self.storage_name) + " for job " \
+                           + str(jobid) + "...")
+
+        # retrieve logging info from storage
+        common.logger.debug( "retrieving "+ str(remotelog) +" to "+ str(destlog) )
+        try:
+            sbi.copy( remotelog, destlog)
+        except Exception, ex:
+            msg = "WARNING: Unable to retrieve logging-info file %s \n"%remotelog
+            msg += str(ex)
+            common.logger.debug(msg)
+            return False
+        # clean up the remote logging-info file once it has been copied locally
+        try:
+            common.logger.debug( "Cleaning remote file [%s] " %( str(remotelog) ) )
+            sbi.delete(remotelog)
+        except Exception, ex:
+            msg = "WARNING: Unable to clean remote logging-info file %s \n"%remotelog
+            msg += str(ex)
+            common.logger.debug(msg)
+        return True
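
For reference, the new class is driven like the other CRAB client actions: it is constructed from the parsed configuration plus the list of job ids requested on the command line, and collectLogging() does the rest. The sketch below is illustrative only, not part of the patch: it assumes a fully initialised CRAB client environment (common.logger, common._db, the work space, and the storage_* attributes filled in by CliServerParams), and the cfg_params keys and job ids shown are example values.

# Hypothetical driver code (assumes the CRAB client environment is already bootstrapped)
from PostMortemServer import PostMortemServer

cfg_params = {'CRAB.server_name': 'myserver.example.org'}   # example value only
nj_list = [1, 2, 5]                                         # jobs requested, e.g. via -postMortem

pm = PostMortemServer(cfg_params, nj_list)
pm.collectLogging()   # writes one <fname_base><jobId>.LoggingInfo file per failed job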