ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/Status.py
Revision: 1.52
Committed: Tue Nov 11 13:53:32 2008 UTC (16 years, 5 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_3_pre2, CRAB_2_4_3_pre1, CRAB_2_4_2
Changes since 1.51: +4 -4 lines
Log Message:
added support to avoid print on screen while querying task status

File Contents

# User Rev Content
1 slacapra 1.1 from Actor import *
2 slacapra 1.21 import common
3 spiga 1.24 import string, os, time
4 ewv 1.42 import sha
5 spiga 1.28 from crab_util import *
6 slacapra 1.1
class Status(Actor):
    """
    Actor that asks the scheduler for the status of every job of the
    current task, forwards per-job information through
    common.apmon.sendToML() and prints a summary on screen.
    """

    def __init__(self, *args):
        """
        Store the configuration.

        args[0] -- configuration mapping; its optional "USER.xml_report"
                   entry is kept in self.xml (empty string when missing).
        """
        self.cfg_params = args[0]
        self.xml = self.cfg_params.get("USER.xml_report",'')
        # compute() refills this list; it is created here so that
        # PrintReport_() does not fail when called before compute().
        self.wrapErrorList = []
        return

    def run(self):
        """
        The main method of the class: compute the status and print a report
        """
        common.logger.debug(5, "Status::run() called")

        start = time.time()
        self.query()
        self.PrintReport_()
        stop = time.time()
        common.logger.debug(1, "Status Time: "+str(stop - start))
        common.logger.write("Status Time: "+str(stop - start))

    def query(self,display=True):
        """
        compute the status

        display -- forwarded to compute(); when false the per-job table
                   is not handed to displayReport().
        """
        common.logger.message("Checking the status of all jobs: please wait")
        task = common._db.getTask()
        upTask = common.scheduler.queryEverything(task['id'])
        self.compute(upTask,display)

    def compute(self, up_task, display=True ):
        """
        Build one table row per job of up_task, record every wrapper
        return code in self.wrapErrorList and send each job to
        dataToDash().

        up_task -- task object providing ['name'] and an iterable .jobs,
                   each job exposing a runningJob mapping.
        display -- when true, header and rows are passed to
                   displayReport() together with self.xml.
        """
        row_format = "%-6s %-18s %-36s %-13s %-16s %-4s"
        toPrint=[]
        taskId = uniqueTaskName(up_task['name'])
        task_unique_name = up_task['name']

        self.wrapErrorList = []
        for job in up_task.jobs :
            job_id = str(job.runningJob['jobId'])
            jobStatus = str(job.runningJob['statusScheduler'])
            dest = str(job.runningJob['destination']).split(':')[0]
            exe_exit_code = str(job.runningJob['applicationReturnCode'])
            job_exit_code = str(job.runningJob['wrapperReturnCode'])
            # Stored before the 'None' -> '' blanking below: PrintReport_()
            # relies on the literal 'None' to detect jobs without a code.
            self.wrapErrorList.append(job_exit_code)
            res_ID = str(job.runningJob['submission'])

            # str(None) gives 'None'; show an empty cell instead.
            if dest == 'None' : dest = ''
            if exe_exit_code == 'None' : exe_exit_code = ''
            if job_exit_code == 'None' : job_exit_code = ''
            toPrint.append(row_format % (job_id,jobStatus,dest,exe_exit_code,job_exit_code,res_ID))

            # NOTE(review): jobStatus is a str at this point, so this test
            # is always true; the original may have meant != 'None'.
            # Behaviour is left unchanged -- confirm before tightening.
            if jobStatus is not None:
                self.dataToDash(job,job_id,taskId,task_unique_name,dest,jobStatus)

        header = row_format % ('ID','STATUS','E_HOST','EXE_EXIT_CODE','JOB_EXIT_STATUS','#SUB')

        if display: displayReport(self,header,toPrint,self.xml)

        return

    def PrintReport_(self):
        """
        Print the total number of jobs, then one summary per wrapper exit
        code collected by compute(). For the 'None' code (no wrapper exit
        code) the jobs are summarised per scheduler status instead.
        """
        possible_status = [
            'Created',
            'Undefined',
            'Submitting',
            'Submitted',
            'Waiting',
            'Ready',
            'Scheduled',
            'Running',
            'Done',
            'Killing',
            'Killed',
            'Aborted',
            'Unknown',
            'Done (Failed)',
            'Cleared',
            'retrieved'
            ]
        # The status names must match possible_status exactly (the former
        # test against 'killed' could never match 'Killed').
        resubmittable = ('Killed', 'Aborted')
        retrievable = ('Done', 'Done (Failed)')

        jobs = common._db.nJobs('list')
        WrapExitCode = list(set(self.wrapErrorList))
        print('')
        print(">>>>>>>>> %i Total Jobs " % (len(jobs)))
        print('')
        for c in WrapExitCode:
            if c != 'None':
                self.reportCodes(c)
            else:
                for st in possible_status:
                    list_ID = common._db.queryAttrRunJob({'statusScheduler':st},'jobId')
                    if len(list_ID)>0:
                        if st in resubmittable:
                            print(">>>>>>>>> %i Jobs %s " % (len(list_ID), str(st)))
                            print(" You can resubmit them specifying JOB numbers: crab -resubmit <List of jobs>")
                            print(" List of jobs: %s \n" % readableList(self,list_ID))
                        elif st in retrievable:
                            print(">>>>>>>>> %i Jobs %s " % (len(list_ID), str(st)))
                            print(" Retrieve them with: crab -getoutput <List of jobs>")
                            print(" List of jobs: %s \n" % readableList(self,list_ID))
                        else :
                            print(">>>>>>>>> %i Jobs %s \n " % (len(list_ID), str(st)))

    def reportCodes(self,code):
        """
        Print how many jobs have the given wrapper return code and list
        them; print nothing when no job matches.
        """
        list_ID = common._db.queryAttrRunJob({'wrapperReturnCode':code},'jobId')
        if len(list_ID)>0:
            print(">>>>>>>>> %i Jobs with Wrapper Exit Code : %s " % (len(list_ID), str(code)))
            print(" List of jobs: %s" % readableList(self,list_ID))
            print(" ")

        return

    def dataToDash(self,job,id,taskId,task_unique_name,dest,jobStatus):
        """
        Build the monitoring record of one job and send it with
        common.apmon.sendToML().

        The jobId string and the WMS value depend on the scheduler name:
        CONDOR_G/GLIDEIN, LSF/CAF, or anything else (logged as gLite).
        'RBname' is added to the record only when WMS is not None.
        """
        jid = job.runningJob['schedulerId']
        job_status_reason = str(job.runningJob['statusReason'])
        job_last_time = str(job.runningJob['startTime'])
        sched_name = common.scheduler.name()
        if sched_name.upper() in ['CONDOR_G','GLIDEIN']:
            WMS = 'OSG'
            taskHash = sha.new(common._db.queryTask('name')).hexdigest()
            jobId = str(id) + '_https://' + sched_name + '/' + taskHash + '/' + str(id)
            common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
        elif sched_name.upper() in ['LSF','CAF']:
            WMS = sched_name
            jobId=str(id)+"_https://"+sched_name+":/"+str(jid)+"-"+task_unique_name.replace("_","-")
            common.logger.debug(5,'JobID for ML monitoring is created for Local scheduler:'+jobId)
        else:
            jobId = str(id) + '_' + str(jid)
            WMS = job.runningJob['service']
            common.logger.debug(5,'JobID for ML monitoring is created for gLite scheduler:'+jobId)

        common.logger.debug(5,"sending info to ML")
        params = {'taskId': taskId,
                  'jobId': jobId,
                  'sid': str(jid),
                  'StatusValueReason': job_status_reason,
                  'StatusValue': jobStatus,
                  'StatusEnterTime': job_last_time,
                  'StatusDestination': dest}
        # Only the last branch above can leave WMS as None (when the
        # runningJob 'service' entry is None).
        if WMS is not None:
            params['RBname'] = WMS
        common.logger.debug(5,str(params))
        common.apmon.sendToML(params)

        return

    def joinIntArray_(self,array) :
        """
        Return the items of array converted with str() and joined by
        commas; an empty array gives an empty string.
        """
        return ','.join([str(item) for item in array])
180 slacapra 1.1