1 |
slacapra |
1.1 |
from Actor import *
|
2 |
slacapra |
1.21 |
import common
|
3 |
spiga |
1.24 |
import string, os, time
|
4 |
ewv |
1.42 |
import sha
|
5 |
spiga |
1.28 |
from crab_util import *
|
6 |
slacapra |
1.1 |
|
7 |
mcinquil |
1.53 |
|
8 |
slacapra |
1.1 |
class Status(Actor):
    """
    Actor that queries the scheduler for the status of every job of the
    current task, prints a status report and forwards per-job status
    information to the ML monitoring (common.apmon.sendToML).
    """

    def __init__(self, *args):
        """
        args[0] is the configuration mapping (cfg_params); only its
        "USER.xml_report" entry is read here (default: '').
        """
        self.cfg_params = args[0]
        # Passed on to displayReport() by compute().
        self.xml = self.cfg_params.get("USER.xml_report", '')
        # Handed to showWebMon() by run(); not changed anywhere in this class.
        self.server_name = ''
        # Wrapper exit codes filled by compute() and summarised by
        # PrintReport_(); initialised here so PrintReport_() cannot hit an
        # AttributeError if it is called before compute().
        self.wrapErrorList = []

    def run(self):
        """
        The main method of the class: compute the status and print a report.

        Queries all jobs (query), prints the summary (PrintReport_), logs
        the message returned by showWebMon() and logs the elapsed
        wall-clock time.
        """
        common.logger.debug(5, "Status::run() called")

        start = time.time()

        self.query()
        self.PrintReport_()
        ## TEMPORARY FIXME Ds
        msg = showWebMon(self.server_name)
        common.logger.message(msg)

        elapsed = time.time() - start
        common.logger.debug(1, "Status Time: " + str(elapsed))
        common.logger.write("Status Time: " + str(elapsed))

    def query(self, display=True):
        """
        Compute the status.

        Fetches the current task from common._db, asks the scheduler for
        the up-to-date state of all its jobs and hands the result to
        compute(). display is forwarded unchanged to compute().
        """
        common.logger.message("Checking the status of all jobs: please wait")
        task = common._db.getTask()
        upTask = common.scheduler.queryEverything(task['id'])
        self.compute(upTask, display)

    def compute(self, up_task, display=True):
        """
        Build the per-job status table for up_task and optionally display it.

        up_task -- task object as returned by
                   common.scheduler.queryEverything(); its 'name' entry and
                   its .jobs (each carrying a .runningJob mapping) are read.
        display -- if true, the table is passed to displayReport() together
                   with self.xml.

        Side effects:
          * resets self.wrapErrorList to the wrapper exit codes of all jobs,
            as strings ('None' when unset), for later use by PrintReport_();
          * sets state 'Terminated' in common._db for jobs whose
            runningJob['status'] is 'SD' or 'DA';
          * sends every job's status to the ML monitoring via dataToDash().
        """
        toPrint = []
        taskId = str(up_task['name'])
        task_unique_name = taskId
        ended = None

        listId = []
        listRunField = []

        self.wrapErrorList = []
        for job in up_task.jobs:
            running = job.runningJob
            job_id = str(running['jobId'])
            jobStatus = str(running['statusScheduler'])
            dest = str(running['destination']).split(':')[0]
            exe_exit_code = str(running['applicationReturnCode'])
            job_exit_code = str(running['wrapperReturnCode'])
            # Record the raw code before it is blanked for display below:
            # PrintReport_() relies on seeing the literal string 'None'.
            self.wrapErrorList.append(job_exit_code)
            ended = str(job['standardInput'])

            # Unset values are rendered as empty cells.
            if dest == 'None':
                dest = ''
            if exe_exit_code == 'None':
                exe_exit_code = ''
            if job_exit_code == 'None':
                job_exit_code = ''
            if running['state'] == 'Terminated':
                jobStatus = 'Done'
            if running['state'] == 'SubRequested':
                jobStatus = 'Submitting'
            if running['status'] in ('SD', 'DA'):
                listId.append(job_id)
                # One dict per job instead of a single shared object, so a
                # mutation of one entry cannot leak into the others.
                listRunField.append({'state': 'Terminated'})

            toPrint.append("%-6s %-18s %-36s %-13s %-16s %-4s" %
                           (job_id, jobStatus, dest, exe_exit_code,
                            job_exit_code, ended))

            # jobStatus is always a str here (possibly 'None'), so the
            # former `jobStatus is not None` guard never skipped anything.
            self.dataToDash(job, job_id, taskId, task_unique_name, dest, jobStatus)

        if listId:
            common._db.updateRunJob_(listId, listRunField)

        # The ENDED column is shown only when the last job processed had a
        # non-empty standardInput (ended is overwritten on every iteration).
        if ended:
            header = "%-6s %-18s %-36s %-13s %-16s %-4s" % (
                'ID', 'STATUS', 'E_HOST', 'EXE_EXIT_CODE', 'JOB_EXIT_STATUS', 'ENDED')
        else:
            header = "%-6s %-18s %-36s %-13s %-16s" % (
                'ID', 'STATUS', 'E_HOST', 'EXE_EXIT_CODE', 'JOB_EXIT_STATUS')

        if display:
            displayReport(self, header, toPrint, self.xml)

    def PrintReport_(self):
        """
        Print a summary of the task's jobs to stdout.

        For every distinct wrapper exit code collected by compute(), a
        per-code summary is printed by reportCodes(). If at least one job had
        no wrapper exit code ('None'), jobs are also listed per scheduler
        status, with a hint on how to resubmit them (Killed, Aborted) or
        retrieve them (Done, Done (Failed)). Note that this per-status query
        (common._db.queryAttrRunJob({'statusScheduler': st})) is not
        restricted to the jobs lacking an exit code.
        """
        possible_status = [
            'Created',
            'Undefined',
            'Submitting',
            'Submitted',
            'Waiting',
            'Ready',
            'Scheduled',
            'Running',
            'Done',
            'Killing',
            'Killed',
            'Aborted',
            'Unknown',
            'Done (Failed)',
            'Cleared',
            'retrieved'
        ]
        # Statuses that get a "resubmit" or "getoutput" hint. These must match
        # the spelling in possible_status exactly; the old code tested
        # 'killed', which never matched 'Killed'.
        resubmit_status = ('Killed', 'Aborted')
        retrieve_status = ('Done', 'Done (Failed)')

        jobs = common._db.nJobs('list')
        print('')
        print(">>>>>>>>> %i Total Jobs " % (len(jobs)))
        print('')
        for code in set(self.wrapErrorList):
            if code != 'None':
                self.reportCodes(code)
                continue
            for st in possible_status:
                list_ID = common._db.queryAttrRunJob({'statusScheduler': st}, 'jobId')
                if not list_ID:
                    continue
                if st in resubmit_status:
                    print(">>>>>>>>> %i Jobs %s " % (len(list_ID), str(st)))
                    print(" You can resubmit them specifying JOB numbers: crab -resubmit <List of jobs>")
                    print(" List of jobs: %s \n" % readableList(self, list_ID))
                elif st in retrieve_status:
                    print(">>>>>>>>> %i Jobs %s " % (len(list_ID), str(st)))
                    print(" Retrieve them with: crab -getoutput <List of jobs>")
                    print(" List of jobs: %s \n" % readableList(self, list_ID))
                else:
                    print(">>>>>>>>> %i Jobs %s \n " % (len(list_ID), str(st)))
                    print(" List of jobs %s: %s \n" % (str(st), readableList(self, list_ID)))

    def reportCodes(self, code):
        """
        Print the jobs whose wrapper exit code equals code.

        code -- value matched against the 'wrapperReturnCode' attribute in
                common._db. compute() collects these codes as strings
                (str() of runningJob['wrapperReturnCode']).
        Nothing is printed when no job matches.
        """
        list_ID = common._db.queryAttrRunJob({'wrapperReturnCode': code}, 'jobId')
        if list_ID:
            print(">>>>>>>>> %i Jobs with Wrapper Exit Code : %s " % (len(list_ID), str(code)))
            print(" List of jobs: %s" % readableList(self, list_ID))
            # Point to the exit-code table only for non-zero codes. The code
            # arrives as a string, so compare its string form; the old
            # `code != 0` was always true, even for '0'.
            if str(code) != '0':
                print(" See https://twiki.cern.ch/twiki/bin/view/CMS/JobExitCodes for Exit Code meaning")
            print(" ")

    def dataToDash(self, job, id, taskId, task_unique_name, dest, jobStatus):
        """
        Send one job's status to the ML monitoring via common.apmon.sendToML.

        The monitoring job id and the reported WMS name depend on the
        scheduler type:
          * CONDOR_G / GLIDEIN: WMS 'OSG', id built from a sha hash of the
            task name;
          * LSF / CAF: WMS is the scheduler name, id built from the
            scheduler id and task_unique_name (with '_' replaced by '-');
          * anything else: WMS is runningJob['service'], id is
            '<id>_<schedulerId>'.
        The 'RBname' key is sent only when the WMS value is not None.
        """
        running = job.runningJob
        jid = running['schedulerId']
        scheduler_name = common.scheduler.name()
        scheduler_kind = scheduler_name.upper()

        if scheduler_kind in ('CONDOR_G', 'GLIDEIN'):
            WMS = 'OSG'
            taskHash = sha.new(common._db.queryTask('name')).hexdigest()
            jobId = str(id) + '_https://' + scheduler_name + '/' + taskHash + '/' + str(id)
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
        elif scheduler_kind in ('LSF', 'CAF'):
            WMS = scheduler_name
            jobId = (str(id) + "_https://" + scheduler_name + ":/" + str(jid) + "-"
                     + task_unique_name.replace("_", "-"))
            common.logger.debug(5, 'JobID for ML monitoring is created for Local scheduler:' + jobId)
        else:
            jobId = str(id) + '_' + str(jid)
            WMS = running['service']
            common.logger.debug(5, 'JobID for ML monitoring is created for gLite scheduler:' + jobId)

        common.logger.debug(5, "sending info to ML")
        params = {'taskId': taskId,
                  'jobId': jobId,
                  'sid': str(jid),
                  'StatusValueReason': str(running['statusReason']),
                  'StatusValue': jobStatus,
                  'StatusEnterTime': str(running['startTime']),
                  'StatusDestination': dest}
        if WMS is not None:
            params['RBname'] = WMS
        common.logger.debug(5, str(params))
        common.apmon.sendToML(params)

    def joinIntArray_(self, array):
        """
        Return the items of array joined by commas, e.g. [1, 2, 3] -> '1,2,3'.

        An empty array yields '' (the previous hand-rolled version raised
        IndexError on it).
        """
        return ','.join([str(item) for item in array])
|