1 |
|
from Actor import * |
2 |
|
from crab_util import * |
3 |
+ |
#import EdgLoggingInfo |
4 |
+ |
#import CondorGLoggingInfo |
5 |
|
import common |
6 |
< |
from ApmonIf import ApmonIf |
5 |
< |
import Statistic |
6 |
< |
#from random import random |
7 |
< |
import time |
8 |
< |
from ProgressBar import ProgressBar |
9 |
< |
from TerminalController import TerminalController |
6 |
> |
import string, os |
7 |
|
|
8 |
< |
class Submitter(Actor): |
8 |
> |
class PostMortem(Actor):
    """
    Actor that retrieves the scheduler logging info of a list of jobs,
    decodes it and reports the outcome through the common logger.
    """

    def __init__(self, cfg_params, nj_list):
        """
        cfg_params -- configuration mapping; the 'CRAB.jobtype' entry is read
        nj_list    -- identifiers of the jobs whose logging info is requested
        """
        self.cfg_params = cfg_params
        self.nj_list = nj_list
        # Jobs known to the task database; only used for membership tests
        # (presumably a list of job ids -- confirm against common._db.nJobs).
        self.all_jobs = common._db.nJobs('list')

        # Common prefix of the per-job files: jobDir() + upper-cased job type + '_'
        self.fname_base = common.work_space.jobDir() + self.cfg_params['CRAB.jobtype'].upper() + '_'

    def run(self):
        """
        The main method of the class: collect the logging info of every
        job in self.nj_list.
        """
        common.logger.debug(5, "PostMortem::run() called")

        self.collectLogging()

    def collectLogging(self):
        """
        Ask the scheduler for the logging info of each job in self.nj_list,
        read the resulting '<fname_base><job>.LoggingInfo' file and log the
        decoded reason.

        Jobs that are not in self.all_jobs, or whose logging-info file already
        exists on disk, are reported through the logger and skipped.
        """
        for job in self.nj_list:
            if job not in self.all_jobs:
                common.logger.message('Warning: job # ' + str(job) + ' does not exist! Not possible to ask for postMortem ')
                continue

            job_base = self.fname_base + str(job)
            fname = job_base + '.LoggingInfo'
            if os.path.exists(fname):
                # Never overwrite an existing report; the user has to remove it.
                common.logger.message('Logging info for job ' + str(job) + ' already present in '+fname+'\nRemove it for update')
                continue

            # The scheduler gets the name without suffix; fname is read right
            # after the call. NOTE(review): presumably loggingInfo() appends
            # '.LoggingInfo' itself -- confirm against the scheduler code.
            common.scheduler.loggingInfo(job, job_base)

            fl = open(fname, 'r')
            try:
                out = fl.read()
            finally:
                # Release the handle even if reading fails.
                fl.close()

            reason = self.decodeLogging(out)
            common.logger.message('Logging info for job '+ str(job) +': '+str(reason)+'\n written to '+str(fname)+' \n' )
        return

    def decodeLogging(self, out):
        """
        Return the scheduler's decoding of the raw logging-info text 'out'
        (delegates to common.scheduler.decodeLogInfo).
        """
        return common.scheduler.decodeLogInfo(out)
49 |
+ |
|