ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Status.py
Revision: 1.82
Committed: Sat Jul 18 01:14:11 2009 UTC (15 years, 9 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: test_1, CRAB_2_6_2, CRAB_2_6_2_pre2, CRAB_2_6_2_pre1, CRAB_2_6_1_pre4, CRAB_2_6_1_pre3, CRAB_2_6_1_pre2, CRAB_2_6_1_pre1, CRAB_2_6_1
Branch point for: CRAB_2_6_X_br
Changes since 1.81: +4 -3 lines
Log Message:
fix Segmentation fault

File Contents

# User Rev Content
1 slacapra 1.1 from Actor import *
2 slacapra 1.21 import common
3 spiga 1.24 import string, os, time
4 ewv 1.42 import sha
5 spiga 1.28 from crab_util import *
6 slacapra 1.1
7 mcinquil 1.53
class Status(Actor):
    """
    Actor behind the status command.

    Queries the scheduler for every job of the current task, marks in the
    local DB the jobs the scheduler reports as finished, prints a per-job
    table plus a summary report, and sends one monitoring record per job
    through common.apmon.
    """

    def __init__(self, *args):
        """
        args[0] -- configuration dictionary (cfg_params).
        """
        self.cfg_params = args[0]
        # CRAB.status set to 'verbose' or 'v' adds the ACTION and ENDED columns.
        self.verbose = (str(self.cfg_params.get("CRAB.status", '')) in ('verbose', 'v'))
        # Handed to displayReport(); presumably the XML report destination.
        self.xml = self.cfg_params.get("USER.xml_report", '')
        self.server_name = ''

    def run(self):
        """
        The main method of the class: compute the status and print a report.
        """
        common.logger.debug("Status::run() called")

        start = time.time()

        self.query()
        self.PrintReport_()
        ## TEMPORARY FIXME Ds
        msg = showWebMon(self.server_name)
        common.logger.info(msg)

        stop = time.time()
        common.logger.debug("Status Time: " + str(stop - start))

    def query(self, display=True):
        """
        Fetch the up-to-date state of the whole task from the scheduler and
        process it with compute().

        display -- when True, compute() prints the per-job table.
        """
        common.logger.info("Checking the status of all jobs: please wait")
        task = common._db.getTask()
        upTask = common.scheduler.queryEverything(task['id'])
        self.compute(upTask, display)

    def compute(self, up_task, display=True):
        """
        Process the task returned by common.scheduler.queryEverything().

        For every job this:
          * records its raw wrapper exit code in self.wrapErrorList (read
            later by PrintReport_());
          * queues a DB update for jobs whose runningJob 'status' is 'SD' or
            'DA' (state -> 'Terminated') or 'A' (state -> 'Aborted'), also
            setting the job's 'closed' flag to 'Y';
          * builds one table row and sends a monitoring record (dataToDash).

        The queued DB updates are applied in a single call each, and the
        table is printed with displayReport() when display is True.
        """
        toPrint = []
        taskId = str(up_task['name'])
        task_unique_name = str(up_task['name'])

        # Shared, read-only field dictionaries for the bulk DB updates.
        run_jobToSaveTerm = {'state': 'Terminated'}
        run_jobToSaveAbort = {'state': 'Aborted'}
        jobToSave = {'closed': 'Y'}
        listId = []
        listRunField = []
        listJobField = []

        # Raw wrapper exit codes as strings ('None' when not set yet).
        self.wrapErrorList = []
        msg = '\n'
        for job in up_task.jobs:
            running = job.runningJob
            job_id = str(running['jobId'])
            jobStatus = str(running['statusScheduler'])
            jobState = str(running['state'])
            dest = str(running['destination']).split(':')[0]
            exe_exit_code = str(running['applicationReturnCode'])
            job_exit_code = str(running['wrapperReturnCode'])
            # Stored before the 'None' -> '' conversion below on purpose:
            # PrintReport_() looks for the literal 'None'.
            self.wrapErrorList.append(job_exit_code)
            ended = str(job['closed'])

            if dest == 'None':
                dest = ''
            if exe_exit_code == 'None':
                exe_exit_code = ''
            if job_exit_code == 'None':
                job_exit_code = ''

            # Map some internal states to the status shown to the user.
            if jobState == 'SubRequested':
                jobStatus = 'Submitting'
            elif jobState == 'Terminated':
                jobStatus = 'Done'

            #This is needed for StandAlone
            if running['status'] in ['SD', 'DA']:
                listId.append(job_id)
                listRunField.append(run_jobToSaveTerm)
                listJobField.append(jobToSave)
            elif running['status'] in ['A']:
                listId.append(job_id)
                listRunField.append(run_jobToSaveAbort)
                listJobField.append(jobToSave)

            if self.verbose:
                printline = "%-6s %-18s %-14s %-36s %-13s %-16s %-4s" % (job_id, jobStatus, jobState, dest, exe_exit_code, job_exit_code, ended)
            else:
                printline = "%-6s %-18s %-36s %-13s %-16s " % (job_id, jobStatus, dest, exe_exit_code, job_exit_code)
            toPrint.append(printline)

            msg += self.dataToDash(job, job_id, taskId, task_unique_name, dest, jobStatus)
        # Level 9: one below the numeric value of logging.DEBUG (10).
        common.logger.log(10 - 1, msg)

        #This is needed for StandAlone
        if listId:
            common._db.updateRunJob_(listId, listRunField)
            common._db.updateJob_(listId, listJobField)

        if self.verbose:
            header = "%-6s %-18s %-14s %-36s %-13s %-16s %-4s\n" % ('ID', 'STATUS', 'ACTION', 'E_HOST', 'EXE_EXIT_CODE', 'JOB_EXIT_STATUS', 'ENDED')
            header += '------------------------------------------------------------------------------------------------------------------\n'
        else:
            header = "%-6s %-18s %-36s %-13s %-16s\n" % ('ID', 'STATUS', 'E_HOST', 'EXE_EXIT_CODE', 'JOB_EXIT_STATUS')
            header += '--------------------------------------------------------------------------------------------\n'

        if display:
            displayReport(self, header, toPrint, self.xml)
        return

    def PrintReport_(self):
        """
        Log the summary report. Must run after compute(), which fills
        self.wrapErrorList.

        For every distinct wrapper exit code other than 'None', reportCodes()
        logs the jobs having it. If at least one job has no wrapper exit code
        yet ('None'), a per-scheduler-status breakdown with suggested next
        actions is added to the message. That message, starting with the
        total job count, is logged once at the end.
        """
        possible_status = [
            'Created',
            'Undefined',
            'Submitting',
            'Submitted',
            'Waiting',
            'Ready',
            'Scheduled',
            'Running',
            'Done',
            'Killing',
            'Killed',
            'Aborted',
            'Unknown',
            'Done (Failed)',
            'Cleared',
            'Retrieved',
            'NotSubmitted',
            'CannotSubmit'
        ]

        jobs = common._db.nJobs('list')
        # Sorted so that the exit-code reports come out in a stable order.
        WrapExitCode = sorted(set(self.wrapErrorList))

        task = common._db.getTask()

        msg = " %i Total Jobs \n" % (len(jobs))
        for c in WrapExitCode:
            if c != 'None':
                self.reportCodes(c)
            else:
                for st in possible_status:
                    list_ID = common._db.queryAttrRunJob({'statusScheduler': st}, 'jobId')
                    if len(list_ID) > 0:
                        msg += self._statusSummary_(st, list_ID, task)
        common.logger.info(msg)
        return

    def _statusSummary_(self, st, list_ID, task):
        """
        Return the report lines for the jobs in list_ID whose scheduler
        status is st, with a hint on what the user can do next.

        task -- the task from common._db.getTask(); for 'Done' jobs its jobs
                list is read to split terminated from not-yet-terminated jobs.
        """
        if st == 'Killed':
            # The comparison used to be against 'killed', which never matched.
            msg = ">>>>>>>>> %i Jobs %s \n" % (len(list_ID), str(st))
            msg += "\tYou can resubmit them specifying JOB numbers: crab -resubmit <List of jobs>\n"
            msg += "\tList of jobs: %s \n" % readableList(self, list_ID)
        elif st == 'Aborted':
            msg = ">>>>>>>>> %i Jobs %s\n " % (len(list_ID), str(st))
            msg += "\tYou can resubmit them specifying JOB numbers: crab -resubmit <List of jobs>\n"
            msg += "\tList of jobs: %s \n" % readableList(self, list_ID)
        elif st == 'Done' or st == 'Done (Failed)':
            msg = ">>>>>>>>> %i Jobs %s\n " % (len(list_ID), str(st))
            terminatedListId = []
            notTerminatedListId = []
            for job_id in list_ID:
                # Assumes jobIds are 1-based positions in task.jobs.
                job = task.jobs[job_id - 1]
                if job.runningJob['state'] == 'Terminated':
                    terminatedListId.append(job_id)
                else:
                    notTerminatedListId.append(job_id)
            if len(notTerminatedListId) > 0:
                msg += "\tJobs still in final phase: cannot retrieve them, yet\n"
                msg += "\tList of jobs: %s \n" % readableList(self, notTerminatedListId)
            if len(terminatedListId) > 0:
                msg += "\tJobs terminated: retrieve them with: crab -getoutput <List of jobs>\n"
                msg += "\tList of jobs: %s \n" % readableList(self, terminatedListId)
        elif st in ['NotSubmitted', 'CannotSubmit']:
            msg = ">>>>>>>>> %i Jobs %s\n " % (len(list_ID), str(st))
            msg += "\tCheck if they match resources with: crab -match <List of jobs>\n"
            msg += "\tIf not, check data location (eg your data is just on T1's)\n"
            msg += "\tand software version installation (eg the version you are using has been deprecated and is being removed)\n"
            msg += "\tList of jobs: %s \n" % readableList(self, list_ID)
        else:
            msg = ">>>>>>>>> %i Jobs %s \n " % (len(list_ID), str(st))
            msg += "\tList of jobs %s: %s \n" % (str(st), readableList(self, list_ID))
        return msg

    def reportCodes(self, code):
        """
        Log the jobs whose wrapper exit code equals code.

        code -- wrapper exit code as a string, as collected by compute().
        Nothing is logged when no job has that code.
        """
        list_ID = common._db.queryAttrRunJob({'wrapperReturnCode': code}, 'jobId')
        if len(list_ID) > 0:
            msg = 'ExitCodes Summary\n'
            msg += ">>>>>>>>> %i Jobs with Wrapper Exit Code : %s \n " % (len(list_ID), str(code))
            msg += "\t List of jobs: %s \n" % readableList(self, list_ID)
            # code is a string, so compare against '0'. The former `code != 0`
            # was always true and added the hint for successful jobs too.
            if str(code) != '0':
                msg += "\tSee https://twiki.cern.ch/twiki/bin/view/CMS/JobExitCodes for Exit Code meaning\n"
            common.logger.info(msg)
        return

    def dataToDash(self, job, id, taskId, task_unique_name, dest, jobStatus):
        """
        Send one job's monitoring record with common.apmon.sendToML() and
        return a debug text describing what was sent.

        The monitoring job id and the 'RBname' field depend on the scheduler
        in use: CONDOR_G/GLIDEIN, LSF/CAF, ARC, or anything else (labelled
        gLite, where RBname comes from the job's 'service' field).
        """
        jid = job.runningJob['schedulerId']
        job_status_reason = str(job.runningJob['statusReason'])
        job_last_time = str(job.runningJob['startTime'])
        sched_name = common.scheduler.name()
        sched_upper = sched_name.upper()
        msg = ''
        if sched_upper in ['CONDOR_G', 'GLIDEIN']:
            WMS = 'OSG'
            taskHash = sha.new(common._db.queryTask('name')).hexdigest()
            jobId = str(id) + '_https://' + sched_name + '/' + taskHash + '/' + str(id)
            msg += ('JobID for ML monitoring is created for CONDOR_G scheduler: %s\n' % jobId)
        elif sched_upper in ['LSF', 'CAF']:
            WMS = sched_name
            jobId = str(id) + "_https://" + sched_name + ":/" + str(jid) + "-" + task_unique_name.replace("_", "-")
            msg += ('JobID for ML monitoring is created for Local scheduler: %s\n' % jobId)
        elif sched_upper in ['ARC']:
            taskHash = sha.new(common._db.queryTask('name')).hexdigest()
            jobId = str(id) + '_https://' + sched_name + '/' + taskHash + '/' + str(id)
            msg += ('JobID for ML monitoring is created for ARC scheduler: %s\n' % jobId)
            WMS = 'ARC'
        else:
            jobId = str(id) + '_' + str(jid)
            WMS = job.runningJob['service']
            # Trailing newline added for consistency with the other branches.
            msg += ('JobID for ML monitoring is created for gLite scheduler: %s\n' % jobId)

        msg += ("sending info to ML\n")
        params = {'taskId': taskId,
                  'jobId': jobId,
                  'sid': str(jid),
                  'StatusValueReason': job_status_reason,
                  'StatusValue': jobStatus,
                  'StatusEnterTime': job_last_time,
                  'StatusDestination': dest}
        # RBname is only sent when a WMS is known (the 'service' field may be None).
        if WMS is not None:
            params['RBname'] = WMS
        msg += ('%s\n' % str(params))
        common.apmon.sendToML(params)

        return msg

    def joinIntArray_(self, array):
        """
        Return the items of array converted with str() and joined by commas,
        e.g. [1, 2, 3] -> '1,2,3'. An empty array gives '' (the previous
        implementation raised IndexError on it).
        """
        return ','.join([str(item) for item in array])