1 |
slacapra |
1.1 |
from Actor import *
|
2 |
slacapra |
1.21 |
import common
|
3 |
spiga |
1.24 |
import string, os, time
|
4 |
|
|
from crab_util import makeCksum
|
5 |
slacapra |
1.1 |
|
6 |
|
|
class Status(Actor):
|
7 |
spiga |
1.24 |
def __init__(self, *args):
    """
    Store the configuration parameters and prepare the cfg-file hash.

    args[0] is kept as self.cfg_params.  self.hash is the checksum of the
    work-space cfg file when the scheduler name is CONDOR_G (compared
    case-insensitively) and an empty string for every other scheduler.
    """
    self.cfg_params = args[0]

    scheduler_name = common.scheduler.name().upper()
    if scheduler_name != 'CONDOR_G':
        # only CONDOR_G needs the hash
        self.hash = ''
        return

    # create hash of cfg file
    cfg_file = common.work_space.cfgFileName()
    self.hash = makeCksum(cfg_file)
|
17 |
|
|
|
18 |
|
|
def run(self):
    """
    Entry point of the actor: compute the status of the jobs, print the
    summary report and log how long the two steps took.
    """
    common.logger.debug(5, "Status::run() called")

    started_at = time.time()
    self.compute()
    self.PrintReport_()
    finished_at = time.time()

    # the same elapsed-time message goes to the debug channel and to the log
    timing_message = "Status Time: " + str(finished_at - started_at)
    common.logger.debug(1, timing_message)
    common.logger.write(timing_message)
|
31 |
|
|
|
32 |
|
|
def compute(self):
    """
    Collect the status of every job of the task and print the detailed table.

    For each job of the task one table row is built from the runningJob
    fields 'id', 'statusScheduler', 'destination' (only the part before the
    first ':'), 'applicationReturnCode' and 'wrapperReturnCode'.  The rows
    are then handed to self.detailedReport().
    """
    common.logger.message("Checking the status of all jobs: please wait")

    task = common._db.getTask()
    # The return value is not used here.
    # NOTE(review): the task is re-read right after this call, so the query
    # presumably updates the job records in the DB -- confirm.
    common.scheduler.queryEverything(task['id'])
    task = common._db.getTask()

    row_format = "%-8s %-18s %-40s %-13s %-15s"
    toPrint = []
    for job in task.jobs:
        running = job.runningJob
        job_id = str(running['id'])
        jobStatus = str(running['statusScheduler'])
        dest = str(running['destination']).split(':')[0]
        exe_exit_code = str(running['applicationReturnCode'])
        job_exit_code = str(running['wrapperReturnCode'])

        # str(None) is 'None': show unset fields as empty cells instead
        if dest == 'None':
            dest = ''
        if exe_exit_code == 'None':
            exe_exit_code = ''
        if job_exit_code == 'None':
            job_exit_code = ''

        toPrint.append(row_format % (job_id, jobStatus, dest,
                                     exe_exit_code, job_exit_code))

    # TODO: ML reporting is still to be implemented here, possibly in a
    # dedicated function.  The previous (disabled) code built a per-job
    # jobId from the job id, self.hash (CONDOR_G only) and the scheduler
    # SCHED_ID taken from the queryEverything() result, and sent taskId,
    # jobId, sid, status, status reason, status time and destination through
    # common.apmon.sendToML().  A summary update (self.update_) was disabled
    # as well.

    self.detailedReport(toPrint)
    return
|
109 |
slacapra |
1.1 |
|
110 |
spiga |
1.24 |
def detailedReport(self, lines):
    """
    Print the per-job status table on standard output.

    lines -- the already formatted table rows, printed in the given order.

    The column header and a dashed rule are printed first.  A further rule
    is printed before rows 10, 20, 30, ... (0-based), which splits the table
    in groups of ten rows.
    """
    rule = '---------------------------------------------------------------------------------------------------'
    header = "%-8s %-18s %-40s %-13s %-15s" % ('ID', 'STATUS', 'E_HOST', 'EXE_EXIT_CODE', 'JOB_EXIT_STATUS')

    # single-argument print(...) behaves the same as the print statement
    print(header)
    print(rule)

    for index, line in enumerate(lines):
        if index != 0 and index % 10 == 0:
            print(rule)
        print(line)
|
123 |
|
|
|
124 |
|
|
def PrintReport_(self):
    """
    Print the summary report: the total number of jobs and, for every
    scheduler status that has at least one job, the number of jobs, a hint
    on the next crab command and the list of job ids.
    """
    # Statuses are hard-coded.
    # TODO: obtain them with a query on the distinct statusScheduler values.
    possible_status = [
        'Undefined',
        'Submitted',
        'Waiting',
        'Ready',
        'Scheduled',
        'Running',
        'Done',
        'Cancelled',
        'Aborted',
        'Unknown',
        # a comma was missing here: the two entries below used to be
        # silently concatenated into the single string 'Done(failed)Cleared'
        'Done(failed)',
        'Cleared',
    ]

    print('')
    print(">>>>>>>>> %i Total Jobs " % (common._db.nJobs()))
    print('')

    for st in possible_status:
        list_ID = common._db.queryAttrRunJob({'statusScheduler': st}, 'jobId')
        if len(list_ID) == 0:
            continue

        print(">>>>>>>>> %i Jobs %s " % (len(list_ID), str(st)))
        # 'killed' is not in possible_status, so only 'Aborted' matches today
        if st == 'killed' or st == 'Aborted':
            print(" You can resubmit them specifying JOB numbers: crab -resubmit JOB_number <Jobs list>")
        if st == 'Done':
            print(" Retrieve them with: crab -getoutput <Jobs list>")
        if st == 'Cleared':
            # The format string has two fields but only the count used to be
            # supplied, which raises TypeError.
            # NOTE(review): the count is still the number of entries returned
            # by queryDistJob('wrapperReturnCode'), and the entries themselves
            # fill the second field -- confirm this is the intended report.
            exit_codes = common._db.queryDistJob('wrapperReturnCode')
            print(" %i Jobs with EXE_EXIT_CODE: %s" % (len(exit_codes), str(exit_codes)))
        # reuse the ids fetched above instead of querying the DB a second time
        print(" List of jobs: %s" % str(list_ID))
        print(" ")
|
156 |
|
|
|
157 |
|
|
# if (len(self.countCorrupt) != 0):
|
158 |
|
|
# self.countCorrupt.sort()
|
159 |
|
|
# print ''
|
160 |
|
|
# print ">>>>>>>>> %i Jobs cleared with corrupt output" % len(self.countCorrupt)
|
161 |
|
|
# print " List of jobs: %s" % self.joinIntArray_(self.countCorrupt)
|
162 |
|
|
# print " You can resubmit them specifying JOB numbers: crab -resubmit JOB_number <Jobs list>"
|
163 |
|
|
# if (len(self.countCleared.keys()) != 0):
|
164 |
|
|
# total_size = 0
|
165 |
|
|
# for key in self.countCleared.keys() :
|
166 |
|
|
# total_size += len(self.countCleared[key])
|
167 |
|
|
# print ''
|
168 |
slacapra |
1.1 |
|
169 |
|
|
|
170 |
|
|
|
171 |
spiga |
1.24 |
def joinIntArray_(self, array):
    """
    Return the items of array converted with str() and joined by commas.

    array -- iterable of items (typically job numbers).

    An empty array yields an empty string; the previous implementation
    raised IndexError in that case because it indexed the empty result.
    """
    return ','.join([str(item) for item in array])
|
178 |
slacapra |
1.1 |
|