ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Status.py
Revision: 1.41
Committed: Mon Jul 7 13:30:59 2008 UTC (16 years, 9 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_3_2_pre1, CRAB_2_4_0_test
Changes since 1.40: +4 -2 lines
Log Message:
fixes for Status reporting while using the server. Plus some print adjustment

File Contents

# User Rev Content
1 slacapra 1.1 from Actor import *
2 slacapra 1.21 import common
3 spiga 1.24 import string, os, time
4 spiga 1.28 from crab_util import *
5 slacapra 1.1
class Status(Actor):
    """
    Actor that queries the scheduler for the status of every job of the
    current task, prints a per-job table and a summary, and forwards each
    job status to the monitoring service through common.apmon.
    """

    def __init__(self, *args):
        """
        args[0] is the configuration dictionary; only the optional
        "USER.xml_report" key is read here.
        """
        self.cfg_params = args[0]

        self.xml = self.cfg_params.get("USER.xml_report", '')

        # Filled by compute(); initialised here so that PrintReport_() does
        # not fail with AttributeError if it is called before compute().
        self.wrapErrorList = []

        return

    def run(self):
        """
        The main method of the class: compute the status and print a report
        """
        common.logger.debug(5, "Status::run() called")

        start = time.time()
        self.query()
        self.PrintReport_()
        stop = time.time()
        elapsed = "Status Time: " + str(stop - start)
        common.logger.debug(1, elapsed)
        common.logger.write(elapsed)

    def query(self):
        """
        compute the status
        """
        common.logger.message("Checking the status of all jobs: please wait")
        task = common._db.getTask()
        upTask = common.scheduler.queryEverything(task['id'])
        self.compute(upTask)

    def compute(self, up_task):
        """
        Build one report line per job of up_task, send each job status to
        the dashboard via dataToDash(), and hand the table to displayReport().

        Side effect: self.wrapErrorList is reset and filled with the wrapper
        return code of every job (as a string, 'None' included).
        """
        row_format = "%-8s %-18s %-40s %-13s %-15s"
        toPrint = []
        # The task identifier is the task name without its last
        # '_'-separated field.
        taskId = str("_".join(str(up_task['name']).split('_')[:-1]))
        self.wrapErrorList = []
        for job in up_task.jobs:
            job_id = str(job.runningJob['jobId'])
            jobStatus = str(job.runningJob['statusScheduler'])
            dest = str(job.runningJob['destination']).split(':')[0]
            exe_exit_code = str(job.runningJob['applicationReturnCode'])
            job_exit_code = str(job.runningJob['wrapperReturnCode'])
            # Record the raw value *before* blanking it below:
            # PrintReport_() relies on the literal 'None' marker.
            self.wrapErrorList.append(job_exit_code)
            # Missing values are shown as empty columns in the table.
            if dest == 'None':
                dest = ''
            if exe_exit_code == 'None':
                exe_exit_code = ''
            if job_exit_code == 'None':
                job_exit_code = ''
            toPrint.append(row_format % (job_id, jobStatus, dest,
                                         exe_exit_code, job_exit_code))

            # jobStatus is always a str at this point, so every job is
            # reported (a former "is not None" guard could never be false).
            # NOTE(review): confirm whether jobs whose status is the string
            # 'None' should be skipped instead.
            self.dataToDash(job, job_id, taskId, dest, jobStatus)

        header = row_format % ('ID', 'STATUS', 'E_HOST',
                               'EXE_EXIT_CODE', 'JOB_EXIT_STATUS')

        displayReport(self, header, toPrint, self.xml)

        return

    def PrintReport_(self):
        """
        Print the summary: total number of jobs, then the jobs grouped by
        wrapper exit code; for jobs without a wrapper exit code, the jobs
        grouped by scheduler status, with a hint on the next command to run
        for Killed, Aborted and Done jobs.
        """
        possible_status = [
            'Created',
            'Undefined',
            'Submitting',
            'Submitted',
            'Waiting',
            'Ready',
            'Scheduled',
            'Running',
            'Done',
            'Killing',
            'Killed',
            'Aborted',
            'Unknown',
            'done(failed)',
            'cleared',
            'retrieved'
            ]

        resubmit_hint = " You can resubmit them specifying JOB numbers: crab -resubmit JOB_number <List of jobs>"
        # Statuses that get an extra hint line plus the list of job numbers.
        # 'Killed' must be spelled exactly as in possible_status, otherwise
        # the hint is never printed for killed jobs.
        hints = {
            'Killed': resubmit_hint,
            'Aborted': resubmit_hint,
            'Done': " Retrieve them with: crab -getoutput <List of jobs>",
            }

        jobs = common._db.nJobs('list')

        WrapExitCode = list(set(self.wrapErrorList))
        # Single-argument print(...) produces the same output whether print
        # is a statement or a function.
        print('')
        print(">>>>>>>>> %i Total Jobs " % (len(jobs)))
        print('')
        list_ID = []
        for c in WrapExitCode:
            if c != 'None':
                list_ID = common._db.queryAttrRunJob({'wrapperReturnCode': c}, 'jobId')
                if len(list_ID) > 0:
                    print(">>>>>>>>> %i Jobs with Wrapper Exit Code : %s " % (len(list_ID), str(c)))
                    print(" List of jobs: %s" % self.readableList(list_ID))
                    print(" ")
            else:
                # At least one job has no wrapper exit code yet: break the
                # jobs down by scheduler status instead.
                for st in possible_status:
                    list_ID = common._db.queryAttrRunJob({'statusScheduler': st}, 'jobId')
                    if len(list_ID) > 0:
                        if st in hints:
                            print(">>>>>>>>> %i Jobs %s " % (len(list_ID), str(st)))
                            print(hints[st])
                            print(" List of jobs: %s \n" % self.readableList(list_ID))
                        else:
                            print(">>>>>>>>> %i Jobs %s \n " % (len(list_ID), str(st)))

    def readableList(self, rawList):
        """
        Turn a list of job numbers into a compact string in which runs of
        consecutive values are collapsed, e.g. [1,2,3,5,7,8] -> '1-3,5,7-8'.
        Consecutive means rawList[i] == rawList[i-1] + 1, so the list is
        expected to hold numbers in increasing order.
        An empty list gives an empty string.
        """
        if not rawList:
            return ''

        listString = str(rawList[0])
        endRange = ''
        for i in range(1, len(rawList)):
            if rawList[i] == rawList[i-1] + 1:
                # Still inside a run: remember its current last element.
                endRange = str(rawList[i])
            else:
                if endRange:
                    # Close the pending run before starting a new entry.
                    listString += '-' + endRange
                    endRange = ''
                listString += ',' + str(rawList[i])
        if endRange:
            listString += '-' + endRange

        return listString

    def dataToDash(self, job, id, taskId, dest, jobStatus):
        """
        Send the status of one job to the monitoring service
        (common.apmon.sendToML).

        job       -- job object; its runningJob mapping is read
        id        -- job number inside the task
        taskId    -- task identifier
        dest      -- execution host
        jobStatus -- scheduler status string

        The monitoring job id and the 'RBname' value depend on the
        scheduler in use (CONDOR_G, lsf/caf, or anything else).
        """
        jid = job.runningJob['schedulerId']
        job_status_reason = str(job.runningJob['statusReason'])
        job_last_time = str(job.runningJob['startTime'])
        scheduler_name = common.scheduler.name()
        if scheduler_name.upper() == 'CONDOR_G':
            WMS = 'OSG'
            self.hash = makeCksum(common.work_space.cfgFileName())
            jobId = str(id) + '_' + self.hash + '_' + str(jid)
            common.logger.debug(5, 'JobID for ML monitoring is created for CONDOR_G scheduler:' + jobId)
        elif scheduler_name in ['lsf', 'caf']:
            WMS = scheduler_name
            # str.replace replaces the deprecated string.replace() helper;
            # the result is identical.
            jobId = str(id) + "_https://" + scheduler_name + ":/" + str(jid) + "-" + taskId.replace("_", "-")
            common.logger.debug(5, 'JobID for ML monitoring is created for Local scheduler:' + jobId)
        else:
            jobId = str(id) + '_' + str(jid)
            WMS = job.runningJob['service']
            common.logger.debug(5, 'JobID for ML monitoring is created for gLite scheduler:' + jobId)

        common.logger.debug(5, "sending info to ML")
        params = {'taskId': taskId,
                  'jobId': jobId,
                  'sid': str(jid),
                  'StatusValueReason': job_status_reason,
                  'StatusValue': jobStatus,
                  'StatusEnterTime': job_last_time,
                  'StatusDestination': dest}
        # 'RBname' is only reported when the service name is known.
        if WMS is not None:
            params['RBname'] = WMS
        common.logger.debug(5, str(params))
        common.apmon.sendToML(params)

        return

    def joinIntArray_(self, array):
        """
        Return the items of array converted to strings and joined with
        commas; an empty array gives an empty string.
        """
        return ','.join([str(item) for item in array])
196 slacapra 1.1