# Source: root/cvsroot/COMP/CRAB/python/DBinterface.py (ViewVC annotated view)
# Revision: 1.39
# Committed: Mon Jul 7 13:30:59 2008 UTC (16 years, 9 months ago) by spiga
# Content type: text/x-python
# Branch: MAIN
# Changes since 1.38: +10 -10 lines
# Log message:
#   Fixes for status reporting while using the server, plus some print
#   adjustments.
#
# File contents:

# User Rev Content
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
import common
import os, time, shutil
import traceback
import sys

from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
from ProdCommon.BossLite.Common.Exceptions import DbError
from ProdCommon.BossLite.Common.Exceptions import TaskError

from ProdCommon.BossLite.DbObjects.Job import Job
from ProdCommon.BossLite.DbObjects.Task import Task
from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
15    
16    
class DBinterface:
    """
    Facade over the BossLite session kept in ``common.bossSession``.

    Every load issued by this class uses task id 1.  Failures of the
    guarded BossLite calls are re-raised as CrabException.

    Exception values are read through ``sys.exc_info()`` instead of the
    ``except X, e`` form, and output uses ``print(...)`` with a single
    string, so that the module parses under both Python 2 and Python 3.
    """

    def __init__(self, cfg_params):
        """
        Store the configuration and select the DB backend.

        cfg_params -- mapping of options; 'USER.use_db' names the backend
                      and defaults to 'SQLite' when absent.
        """
        self.cfg_params = cfg_params
        self.db_type = cfg_params.get("USER.use_db", 'SQLite')
        return

    def configureDB(self):
        """
        Open the BossLite session and install the DB schema.

        Raises CrabException if either step fails.
        """
        # Same session set-up as loadDB (same dbName, same error message).
        self.loadDB()
        try:
            common.bossSession.bossLiteDB.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')
        except Exception:
            raise CrabException('DB Installation error : ' + str(sys.exc_info()[1]))
        return

    def loadDB(self):
        """
        Open the BossLite session on '<shareDir>crabDB' and publish it as
        ``common.bossSession``.

        Raises CrabException if the session cannot be created.
        """
        dbConfig = {'dbName': common.work_space.shareDir() + 'crabDB'}
        try:
            common.bossSession = BossLiteAPI(self.db_type, dbConfig)
        except Exception:
            raise CrabException('Istantiate DB Session : ' + str(sys.exc_info()[1]))
        return

    def getTask(self, jobsList='all'):
        """
        Return task with all/list of jobs

        Raises CrabException if the load fails.
        """
        try:
            task = common.bossSession.load(1, jobsList)[0]
        except Exception:
            error = sys.exc_info()[1]
            common.logger.debug(3, "Error while getting task : " + str(traceback.format_exc()))
            raise CrabException('Error while getting task ' + str(error))
        return task

    def getJob(self, n):
        """
        Return a task with a single job (the one whose id is n)

        Raises CrabException if the load fails.
        """
        try:
            task = common.bossSession.load(1, str(n))[0]
        except Exception:
            error = sys.exc_info()[1]
            common.logger.debug(3, "Error while getting job : " + str(traceback.format_exc()))
            raise CrabException('Error while getting job ' + str(error))
        return task

    def createTask_(self, optsToSave):
        """
        Task declaration
        with the first configuration stuff

        optsToSave -- mapping; when 'server_mode' equals 1 its
                      'server_name' entry is stored as 'serverName'.
        Raises CrabException (carrying the traceback) if saving fails.
        """
        opt = {}
        if optsToSave.get('server_mode', 0) == 1:
            opt['serverName'] = optsToSave['server_name']
        opt['name'] = common.work_space.taskName()
        task = Task(opt)
        try:
            common.bossSession.saveTask(task)
        except Exception:
            raise CrabException('Error creating task ' + str(traceback.format_exc()))
        return

    def updateTask_(self, optsToSave):
        """
        Update task fields

        optsToSave -- mapping of task field name -> new value.
        Raises CrabException (carrying the traceback) if the update fails.
        """
        task = self.getTask()
        for key, value in optsToSave.items():
            task[key] = value
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))
        return

    def createJobs_(self, jobsL, isNew=True):
        """
        Fill crab DB with the jobs filed

        jobsL -- iterable of job ids (anything int() accepts).
        isNew -- True: task.addJobs is used; False: task.appendJobs.
        Raises CrabException (carrying the traceback) if the update fails.
        """
        task = self.getTask()

        jobs = []
        for jobId in jobsL:
            parameters = {
                'jobId': int(jobId),
                'taskId': 1,
                'name': task['name'] + '_' + 'job' + str(jobId),
            }
            job = Job(parameters)
            jobs.append(job)
            common.bossSession.getRunningInstance(job)
            job.runningJob['status'] = 'C'

        ## added to support second step creation
        ## maybe it is not needed. TO CLARIFY
        if isNew:
            task.addJobs(jobs)
        else:
            task.appendJobs(jobs)
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))
        return

    def updateJob_(self, jobsL, optsToSave):
        """
        Update Job fields

        jobsL      -- job selection handed to getTask.
        optsToSave -- sequence of mappings; the i-th mapping is applied to
                      the i-th job of the loaded task.
        Raises CrabException (carrying the traceback) if the update fails.
        """
        task = self.getTask(jobsL)
        for index, job in enumerate(task.jobs):
            for key, value in optsToSave[index].items():
                job[key] = value
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))
        return

    def updateRunJob_(self, jobsL, optsToSave):
        """
        Update Running Job fields

        jobsL      -- job selection handed to getTask.
        optsToSave -- sequence of mappings; the i-th mapping is applied to
                      the running instance of the i-th loaded job.
        """
        task = self.getTask(jobsL)
        for index, job in enumerate(task.jobs):
            common.bossSession.getRunningInstance(job)
            for key, value in optsToSave[index].items():
                job.runningJob[key] = value
        common.bossSession.updateDB(task)
        return

    def nJobs(self, list=''):
        """
        Return the number of jobs of the task or, when called with
        list='list', the list of their integer job ids.
        """
        task = self.getTask()
        if list == 'list':
            listId = []
            for job in task.jobs:
                listId.append(int(job['jobId']))
            return listId
        return len(task.jobs)

    def dump(self, jobs):
        """
        List a complete set of infos for a job/range of jobs
        """
        task = self.getTask(jobs)

        separator = "--------------------------"
        print(separator)
        for job in task.jobs:
            # Two spaces after the label reproduce the output of the
            # former "print label, value" statements.
            print("Id:  %s" % (job['jobId'],))
            print("Dest:  %s" % (job['dlsDestination'],))
            print("Output:  %s" % (job['outputFiles'],))
            print("Args:  %s" % (job['arguments'],))
            print("Service:  %s" % (job.runningJob['service'],))
            # NOTE(review): separator assumed to be printed after every job;
            # the nesting was not recoverable from the annotated listing.
            print(separator)
        return

    def serializeTask(self, tmp_task=None):
        """
        Return the BossLite serialization of tmp_task, or of the full task
        when tmp_task is None.
        """
        if tmp_task is None:
            tmp_task = self.getTask()
        return common.bossSession.serialize(tmp_task)

    def queryID(self, server_mode=0):
        '''
        Display the task name if server_mode == 1,
        display the job id / scheduler id list otherwise
        '''
        header = ''
        lines = []
        task = self.getTask()
        if server_mode == 1:
            header = "Task Id = %-40s " % (task['name'])
        else:
            for job in task.jobs:
                common.bossSession.getRunningInstance(job)
                lines.append("%-5s %-50s " % (job['jobId'], job.runningJob['schedulerId']))
            header += "%-5s %-50s " % ('Job:', 'ID')
        displayReport(self, header, lines)
        return

    def queryTask(self, attr):
        '''
        Perform a query over a generic task attribute
        '''
        return self.getTask()[attr]

    def queryJob(self, attr, jobsL):
        '''
        Perform a query for a range/all/single job
        over a generic job attribute
        '''
        task = self.getTask(jobsL)
        return [job[attr] for job in task.jobs]

    def queryRunJob(self, attr, jobsL):
        '''
        Perform a query for a range/all/single job
        over a generic running-job attribute
        '''
        lines = []
        task = self.getTask(jobsL)
        for job in task.jobs:
            common.bossSession.getRunningInstance(job)
            lines.append(job.runningJob[attr])
        return lines

    def queryDistJob(self, attr):
        '''
        Returns the list of distinct value for a given job attributes

        Raises CrabException if the load fails.
        '''
        try:
            found = common.bossSession.loadJobDist(1, attr)
        except Exception:
            error = sys.exc_info()[1]
            common.logger.debug(3, "Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(error))
        return [entry[attr] for entry in found]

    def queryDistJob_Attr(self, attr_1, attr_2, list):
        '''
        Returns the list of distinct value for a given job attribute
        (attr_1), as loaded by loadJobDistAttr(1, attr_1, attr_2, list)

        Raises CrabException if the load fails.
        '''
        try:
            found = common.bossSession.loadJobDistAttr(1, attr_1, attr_2, list)
        except Exception:
            error = sys.exc_info()[1]
            common.logger.debug(3, "Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(error))
        return [entry[attr_1] for entry in found]

    def queryAttrJob(self, attr, field):
        '''
        Returns, for the jobs matching the given attribute, the list of
        their values of field

        Raises CrabException if the load fails.
        '''
        try:
            found = common.bossSession.loadJobsByAttr(attr)
        except Exception:
            error = sys.exc_info()[1]
            common.logger.debug(3, "Error loading Jobs By Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Attr ' + str(error))
        return [entry[field] for entry in found]

    def queryAttrRunJob(self, attr, field):
        '''
        Returns, for the jobs matching the given running attribute, the
        list of their running-job values of field

        Raises CrabException if the load fails.
        '''
        try:
            found = common.bossSession.loadJobsByRunningAttr(attr)
        except Exception:
            error = sys.exc_info()[1]
            common.logger.debug(3, "Error loading Jobs By Running Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Running Attr ' + str(error))
        return [entry.runningJob[field] for entry in found]

    def newRunJobs(self, nj='all'):
        """
        Get new running instances

        nj -- job selection handed to getTask; every selected job gets a
              new running instance in status 'C' / 'Created'.
        """
        task = self.getTask(nj)

        for job in task.jobs:
            common.bossSession.getNewRunningInstance(job)
            job.runningJob['status'] = 'C'
            job.runningJob['statusScheduler'] = 'Created'
        common.bossSession.updateDB(task)
        return

    @staticmethod
    def _reportFor(reportList, jobId):
        """
        Return the first report whose 'id' attribute is jobId or 'all',
        or None when no report matches.
        """
        # TODO linear search, probably it can be optimized with binary search
        for report in reportList:
            if report.getAttribute('id') in (jobId, 'all'):
                return report
        return None

    @staticmethod
    def _statusNeedsUpdate(localStatus, serverStatus):
        """
        Tell whether a reported status must overwrite the local one.

        No update when the report says 'Created'/'Unknown', when a local
        'Killing' is not yet reported as 'Killed', or when a local
        'Submitting' is reported as 'Killed'/'Aborted'/'Cleared'.
        """
        if serverStatus in ('Created', 'Unknown'):
            return False
        if localStatus == 'Killing' and serverStatus != 'Killed':
            return False
        # Equality on purpose: the former "in 'Submitting'" was a substring
        # test, true for '' and failing with TypeError for None.
        if localStatus == 'Submitting' and serverStatus in ('Killed', 'Aborted', 'Cleared'):
            return False
        return True

    def deserXmlStatus(self, reportList):
        """
        Align the local DB with the job reports in reportList.

        reportList -- sequence of objects exposing getAttribute(name); the
                      attributes read are 'id', 'resubmit', 'status',
                      'sched_status', 'site', 'exe_exit' and 'job_exit'.

        Jobs with no matching report are left untouched.
        Raises CrabException if a job has no running object.
        """
        task = self.getTask()

        # First pass: find the jobs that were resubmitted and therefore
        # need a new running instance on the client side.
        resubmitted = []
        for job in task.jobs:
            if not job.runningJob:
                raise CrabException("Missing running object for job %s" % str(job['jobId']))

            jobId = str(job.runningJob['jobId'])
            report = self._reportFor(reportList, jobId)
            if report is None:
                common.logger.debug(3, "Missing XML element for job %s, skip update status" % jobId)
                continue

            ## Check the submission number and create new running jobs on the client side
            resubmit = str(report.getAttribute('resubmit'))
            # A missing or non-numeric 'resubmit' is treated as "not resubmitted".
            if resubmit.isdigit() and int(job.runningJob['submission']) < int(resubmit) + 1:
                resubmitted.append(jobId)

        # Ids are collected over all jobs and handed over once; nothing is
        # called when no job needs a new running instance.
        if resubmitted:
            self.newRunJobs(resubmitted)

        # Second pass on a fresh load, so new running instances are seen.
        task_new = self.getTask()

        for job in task_new.jobs:
            jobId = str(job.runningJob['jobId'])
            report = self._reportFor(reportList, jobId)
            if report is None:
                continue

            # Data alignment
            reported = report.getAttribute('status')
            if not self._statusNeedsUpdate(job.runningJob['statusScheduler'], reported):
                continue

            # update the status
            common.logger.debug(3, "Updating DB status for job: " + jobId + " @: " + str(reported))
            job.runningJob['statusScheduler'] = str(reported)
            job.runningJob['status'] = str(report.getAttribute('sched_status'))

            # NOTE(review): destination and return codes are assumed to be
            # part of the status-update branch; the nesting was not
            # recoverable from the annotated listing -- confirm.
            job.runningJob['destination'] = str(report.getAttribute('site'))
            job.runningJob['applicationReturnCode'] = str(report.getAttribute('exe_exit'))
            job.runningJob['wrapperReturnCode'] = str(report.getAttribute('job_exit'))

            # TODO cleared='0' field, how should it be handled/mapped in BL? #Fabio

        common.bossSession.updateDB(task_new)
        return

    # FIXME temporary method to verify what kind of submission to perform towards the server
    def checkIfNeverSubmittedBefore(self):
        """
        Return True when every job is still at its first submission with
        status 'C' and scheduler status 'Created', False otherwise.
        """
        for job in self.getTask().jobs:
            running = job.runningJob
            if running['submission'] > 1 or running['status'] != 'C' or \
               running['statusScheduler'] != 'Created':
                return False
        return True
386    
387