1 |
|
from Actor import * |
2 |
|
from crab_util import * |
3 |
+ |
import EdgLoggingInfo |
4 |
+ |
import CondorGLoggingInfo |
5 |
|
import common |
6 |
< |
from ApmonIf import ApmonIf |
5 |
< |
import Statistic |
6 |
< |
#from random import random |
7 |
< |
import time |
8 |
< |
from ProgressBar import ProgressBar |
9 |
< |
from TerminalController import TerminalController |
6 |
> |
import string, os |
7 |
|
|
8 |
< |
class Submitter(Actor): |
8 |
> |
class PostMortem(Actor): |
9 |
|
def __init__(self, cfg_params, nj_list): |
10 |
|
self.cfg_params = cfg_params |
11 |
|
self.nj_list = nj_list |
12 |
< |
|
12 |
> |
|
13 |
|
if common.scheduler.boss_scheduler_name == 'condor_g': |
14 |
|
# create hash of cfg file |
15 |
|
self.hash = makeCksum(common.work_space.cfgFileName()) |
16 |
|
else: |
17 |
|
self.hash = '' |
18 |
< |
|
18 |
> |
|
19 |
|
return |
20 |
|
|
21 |
|
def run(self): |
22 |
|
""" |
23 |
< |
The main method of the class: submit jobs in range self.nj_list |
23 |
> |
The main method of the class. |
24 |
|
""" |
25 |
< |
common.logger.debug(5, "Submitter::run() called") |
29 |
< |
|
30 |
< |
totalCreatedJobs= 0 |
31 |
< |
start = time.time() |
32 |
< |
for nj in range(common.jobDB.nJobs()): |
33 |
< |
if (common.jobDB.status(nj)=='C') or (common.jobDB.status(nj)=='RC'): totalCreatedJobs +=1 |
34 |
< |
pass |
25 |
> |
common.logger.debug(5, "PostMortem::run() called") |
26 |
|
|
27 |
< |
if (totalCreatedJobs==0): |
28 |
< |
common.logger.message("No jobs to be submitted: first create them") |
29 |
< |
return |
30 |
< |
|
31 |
< |
# submit pre DashBoard information |
32 |
< |
params = {'jobId':'TaskMeta'} |
33 |
< |
|
34 |
< |
fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r') |
35 |
< |
for i in fl.readlines(): |
36 |
< |
val = i.split(':') |
37 |
< |
params[val[0]] = string.strip(val[1]) |
38 |
< |
fl.close() |
39 |
< |
|
40 |
< |
common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params)) |
41 |
< |
|
42 |
< |
self.cfg_params['apmon'].sendToML(params) |
43 |
< |
|
44 |
< |
######### |
45 |
< |
# Loop over jobs |
46 |
< |
njs = 0 |
47 |
< |
try: |
48 |
< |
list=[] |
49 |
< |
list_of_list = [] |
59 |
< |
lastBlock=-1 |
60 |
< |
count = 0 |
61 |
< |
for nj in self.nj_list: |
62 |
< |
same=0 |
63 |
< |
# first check that status of the job is suitable for submission |
64 |
< |
st = common.jobDB.status(nj) |
65 |
< |
if st != 'C' and st != 'K' and st != 'A' and st != 'RC': |
66 |
< |
long_st = crabJobStatusToString(st) |
67 |
< |
msg = "Job # %d not submitted: status %s"%(nj+1, long_st) |
68 |
< |
common.logger.message(msg) |
69 |
< |
continue |
70 |
< |
|
71 |
< |
currBlock = common.jobDB.block(nj) |
72 |
< |
# SL perform listmatch only if block has changed |
73 |
< |
if (currBlock!=lastBlock): |
74 |
< |
if common.scheduler.boss_scheduler_name != "condor_g" : |
75 |
< |
match = common.scheduler.listMatch(nj, currBlock) |
76 |
< |
else : |
77 |
< |
match = "1" |
78 |
< |
lastBlock = currBlock |
79 |
< |
else: |
80 |
< |
common.logger.debug(1,"Sites for job "+str(nj+1)+" the same as previous job") |
81 |
< |
same=1 |
82 |
< |
|
83 |
< |
if match: |
84 |
< |
if not same: |
85 |
< |
common.logger.message("Found "+str(match)+" compatible site(s) for job "+str(nj+1)) |
86 |
< |
else: |
87 |
< |
common.logger.debug(1,"Found "+str(match)+" compatible site(s) for job "+str(nj+1)) |
88 |
< |
list.append(common.jobDB.bossId(nj)) |
89 |
< |
|
90 |
< |
if nj == self.nj_list[-1]: # check that is not the last job in the list |
91 |
< |
list_of_list.append([currBlock,list]) |
92 |
< |
else: # check if next job has same group |
93 |
< |
nextBlock = common.jobDB.block(nj+1) |
94 |
< |
if currBlock != nextBlock : # if not, close this group and reset |
95 |
< |
list_of_list.append([currBlock,list]) |
96 |
< |
list=[] |
97 |
< |
else: |
98 |
< |
common.logger.message("No compatible site found, will not submit job "+str(nj+1)) |
99 |
< |
continue |
100 |
< |
count += 1 |
101 |
< |
### Progress Bar indicator, deactivate for debug |
102 |
< |
if not common.logger.debugLevel() : |
103 |
< |
term = TerminalController() |
104 |
< |
|
105 |
< |
for ii in range(len(list_of_list)): # Add loop DS |
106 |
< |
common.logger.debug(1,'Submitting jobs '+str(list_of_list[ii][1])) |
107 |
< |
# if not common.logger.debugLevel() : |
108 |
< |
# try: pbar = ProgressBar(term, 'Submitting '+str(len(list_of_list[ii][1]))+' jobs') |
109 |
< |
# except: pbar = None |
110 |
< |
|
111 |
< |
jidLista, bjidLista = common.scheduler.submit(list_of_list[ii]) |
112 |
< |
bjidLista = map(int, bjidLista) # cast all bjidLista to int |
113 |
< |
|
114 |
< |
# if not common.logger.debugLevel(): |
115 |
< |
# if pbar : |
116 |
< |
# pbar.update(float(ii+1)/float(len(list_of_list)),'please wait') |
117 |
< |
|
118 |
< |
for jj in bjidLista: # Add loop over SID returned from group submission DS |
119 |
< |
tmpNj = jj - 1 |
120 |
< |
jid=jidLista[bjidLista.index(jj)] |
121 |
< |
common.logger.debug(5,"Submitted job # "+ `(jj)`) |
122 |
< |
common.jobDB.setStatus(tmpNj, 'S') |
123 |
< |
common.jobDB.setJobId(tmpNj, jid) |
124 |
< |
common.jobDB.setTaskId(tmpNj, self.cfg_params['taskId']) |
125 |
< |
njs += 1 |
126 |
< |
|
127 |
< |
##### DashBoard report ##################### |
128 |
< |
try: |
129 |
< |
resFlag = 0 |
130 |
< |
if st == 'RC': resFlag = 2 |
131 |
< |
Statistic.Monitor('submit',resFlag,jid,'-----','dest') |
132 |
< |
except: |
133 |
< |
pass |
134 |
< |
|
135 |
< |
# OLI: JobID treatment, special for Condor-G scheduler |
136 |
< |
jobId = '' |
137 |
< |
if common.scheduler.boss_scheduler_name == 'condor_g': |
138 |
< |
jobId = str(jj) + '_' + self.hash + '_' + jid |
139 |
< |
common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId) |
140 |
< |
else: |
141 |
< |
jobId = str(jj) + '_' + jid |
142 |
< |
common.logger.debug(5,'JobID for ML monitoring is created for EDG scheduler'+jobId) |
143 |
< |
|
144 |
< |
if ( jid.find(":") != -1 ) : |
145 |
< |
rb = jid.split(':')[1] |
146 |
< |
rb = rb.replace('//', '') |
147 |
< |
else : |
148 |
< |
rb = 'OSG' |
149 |
< |
|
150 |
< |
params = {'jobId': jobId, \ |
151 |
< |
'sid': jid, \ |
152 |
< |
'broker': rb, \ |
153 |
< |
'bossId': jj, \ |
154 |
< |
'TargetSE': string.join((common.jobDB.destination(tmpNj)),",")} |
155 |
< |
|
156 |
< |
fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r') |
157 |
< |
for i in fl.readlines(): |
158 |
< |
val = i.split(':') |
159 |
< |
params[val[0]] = string.strip(val[1]) |
160 |
< |
fl.close() |
161 |
< |
|
162 |
< |
common.logger.debug(5,'Submission DashBoard report: '+str(params)) |
163 |
< |
|
164 |
< |
self.cfg_params['apmon'].sendToML(params) |
165 |
< |
pass |
166 |
< |
pass |
27 |
> |
for c, v in self.nj_list.iteritems(): |
28 |
> |
id = int(c) |
29 |
> |
out = common.scheduler.loggingInfo(v) |
30 |
> |
# job = common.job_list[id - 1] |
31 |
> |
jobnum_str = '%06d' % (id) |
32 |
> |
fname = common.work_space.jobDir() + '/' + self.cfg_params['CRAB.jobtype'].upper() + '_' + jobnum_str + '.loggingInfo' |
33 |
> |
if os.path.exists(fname): |
34 |
> |
common.logger.message('Logging info for job ' + str(id) + ' already present in '+fname+'\nRemove it for update') |
35 |
> |
continue |
36 |
> |
jdl = open(fname, 'w') |
37 |
> |
for line in out: jdl.write(line) |
38 |
> |
jdl.close() |
39 |
> |
|
40 |
> |
reason = '' |
41 |
> |
## SL this if-elif is the negation of OO! Mus disappear ASAP |
42 |
> |
if common.scheduler.boss_scheduler_name == "edg" or common.scheduler.boss_scheduler_name == "glite" or common.scheduler.boss_scheduler_name == "glitecoll": |
43 |
> |
loggingInfo = EdgLoggingInfo.EdgLoggingInfo() |
44 |
> |
reason = loggingInfo.decodeReason(out) |
45 |
> |
elif common.scheduler.boss_scheduler_name == "condor_g" : |
46 |
> |
loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo() |
47 |
> |
reason = loggingInfo.decodeReason(out) |
48 |
> |
else : |
49 |
> |
reason = out |
50 |
|
|
51 |
< |
except: |
169 |
< |
exctype, value = sys.exc_info()[:2] |
170 |
< |
print "Type: %s Value: %s"%(exctype, value) |
171 |
< |
common.logger.message("Submitter::run Exception raised: %s %s"%(exctype, value)) |
172 |
< |
common.jobDB.save() |
173 |
< |
|
174 |
< |
stop = time.time() |
175 |
< |
common.logger.debug(1, "Submission Time: "+str(stop - start)) |
176 |
< |
common.logger.write("Submission time :"+str(stop - start)) |
177 |
< |
common.jobDB.save() |
51 |
> |
common.logger.message('Logging info for job '+ str(id) +': '+str(reason)+'\n written to '+str(fname) ) |
52 |
|
|
53 |
< |
msg = '\nTotal of %d jobs submitted'%njs |
54 |
< |
if njs != len(self.nj_list) : |
55 |
< |
msg += ' (from %d requested).'%(len(self.nj_list)) |
56 |
< |
pass |
57 |
< |
else: |
58 |
< |
msg += '.' |
53 |
> |
# ML reporting |
54 |
> |
jobId = '' |
55 |
> |
if common.scheduler.boss_scheduler_name == 'condor_g': |
56 |
> |
jobId = str(id) + '_' + self.hash + '_' + v |
57 |
> |
else: |
58 |
> |
jobId = str(id) + '_' + v |
59 |
> |
|
60 |
> |
params = {'taskId': self.cfg_params['taskId'], 'jobId': jobId, \ |
61 |
> |
'sid': v, |
62 |
> |
'PostMortemCategory': loggingInfo.getCategory(), \ |
63 |
> |
'PostMortemReason': loggingInfo.getReason()} |
64 |
> |
common.apmon.sendToML(params) |
65 |
|
pass |
66 |
< |
common.logger.message(msg) |
66 |
> |
|
67 |
|
return |
68 |
+ |
|