# ViewVC annotated listing (page header kept as comments so the file parses).
# Path: root/cvsroot/COMP/CRAB/python/DBinterface.py
# Revision: 1.47
# Committed: Tue Sep 30 22:16:46 2008 UTC (16 years, 6 months ago) by mcinquil
# Content type: text/x-python
# Branch: MAIN
# CVS Tags: CRAB_2_4_2_pre3, CRAB_2_4_2_pre2, CRAB_2_4_2_pre1, CRAB_2_4_1, CRAB_2_4_1_pre4, CRAB_2_4_1_pre3, CRAB_2_4_1_pre2, CRAB_2_4_1_pre1, CRAB_2_4_0_Tutorial, CRAB_2_4_0_Tutorial_pre1, CRAB_2_4_0, CRAB_2_4_0_pre9, CRAB_2_4_0_pre8, CRAB_2_4_0_pre7, CRAB_2_4_0_pre6, CRAB_2_4_0_pre5
# Changes since 1.46: +1 -2 lines
# Log message: Removing Submitting status and special check
#
# File contents follow. In the annotated view each line is prefixed with
# its line number, the committing user and the revision that last touched it.

# NOTE: the original import order is kept on purpose: the star imports below
# may export arbitrary names, so the explicit imports that follow them must
# stay after them to win any name clash.
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
import common
import os
import time
import shutil
import sys
import traceback

from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
from ProdCommon.BossLite.Common.Exceptions import DbError
from ProdCommon.BossLite.Common.Exceptions import TaskError

from ProdCommon.BossLite.DbObjects.Job import Job
from ProdCommon.BossLite.DbObjects.Task import Task
from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob


class DBinterface:
    """
    Facade over the BossLite session kept in ``common.bossSession``.

    Every method works on the single task with id 1 of the local crab DB.
    Failures of the underlying session are re-raised as CrabException where
    the original code did so.

    Exceptions are read back with ``sys.exc_info()`` instead of the
    ``except Exception, e`` form so that the module parses on every
    Python 2.x release as well as on Python 3.
    """

    def __init__(self, cfg_params):
        """
        Store the configuration and select the DB backend.

        cfg_params -- mapping with the user configuration; the optional key
                      'USER.use_db' names the backend (default 'SQLite').
        """
        self.cfg_params = cfg_params
        self.db_type = cfg_params.get("USER.use_db", 'SQLite')

    def _openSession(self):
        """
        Create the BossLite session and publish it as common.bossSession.

        Raises CrabException when the session cannot be instantiated.
        """
        dbConfig = {'dbName': common.work_space.shareDir() + 'crabDB'}
        try:
            common.bossSession = BossLiteAPI(self.db_type, dbConfig)
        except Exception:
            err = sys.exc_info()[1]
            raise CrabException('Istantiate DB Session : ' + str(err))

    def configureDB(self):
        """
        Open the DB session and install the DB schema.

        Raises CrabException if the session cannot be opened or the
        installation fails.
        """
        self._openSession()
        try:
            common.bossSession.bossLiteDB.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')
        except Exception:
            err = sys.exc_info()[1]
            raise CrabException('DB Installation error : ' + str(err))

    def loadDB(self):
        """
        Open the DB session on an already configured DB.

        Raises CrabException if the session cannot be opened.
        """
        self._openSession()

    def getTask(self, jobsList='all'):
        """
        Return task with all/list of jobs

        jobsList -- job selection forwarded unchanged to the session's
                    load(); 'all' by default.
        Raises CrabException if loading fails (the traceback goes to the
        debug log).
        """
        try:
            task = common.bossSession.load(1, jobsList)
        except Exception:
            # grab the exception before calling anything else
            err = sys.exc_info()[1]
            common.logger.debug(3, "Error while getting task : " + str(traceback.format_exc()))
            raise CrabException('Error while getting task ' + str(err))
        return task

    def getJob(self, n):
        """
        Return a task with a single job

        n -- job identifier; it is passed to the session as a string.
        Raises CrabException if loading fails.
        """
        try:
            task = common.bossSession.load(1, str(n))
        except Exception:
            err = sys.exc_info()[1]
            common.logger.debug(3, "Error while getting job : " + str(traceback.format_exc()))
            raise CrabException('Error while getting job ' + str(err))
        return task

    def createTask_(self, optsToSave):
        """
        Task declaration
        with the first configuration stuff

        optsToSave -- mapping; when 'server_mode' is 1 its 'server_name'
                      value is stored as the task 'serverName'.
        Raises CrabException (carrying the traceback) if saving fails.
        """
        opt = {}
        if optsToSave.get('server_mode', 0) == 1:
            opt['serverName'] = optsToSave['server_name']
        opt['name'] = common.work_space.taskName()
        task = Task(opt)
        try:
            common.bossSession.saveTask(task)
        except Exception:
            raise CrabException('Error creating task ' + str(traceback.format_exc()))

    def updateTask_(self, optsToSave):
        """
        Update task fields

        optsToSave -- mapping of task field name -> new value.
        Raises CrabException if the DB update fails.
        """
        task = self.getTask()
        for key, value in optsToSave.items():
            task[key] = value
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))

    def createJobs_(self, jobsL, isNew=True):
        """
        Fill crab DB with the jobs filed

        jobsL -- iterable of job ids (anything int() accepts).
        isNew -- True: the jobs are set with task.addJobs();
                 False: they are added with task.appendJobs()
                 (second step creation).
        Raises CrabException if the DB update fails.
        """
        task = self.getTask()

        jobs = []
        for job_id in jobsL:
            parameters = {
                'jobId': int(job_id),
                'taskId': 1,
                'name': task['name'] + '_' + 'job' + str(job_id),
            }
            job = Job(parameters)
            jobs.append(job)
            common.bossSession.getRunningInstance(job)
            job.runningJob['status'] = 'C'

        ## added to support second step creation
        ## maybe it is not needed. TO CLARIFY
        if isNew:
            task.addJobs(jobs)
        else:
            task.appendJobs(jobs)
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))

    def updateJob_(self, jobsL, optsToSave):
        """
        Update Job fields

        jobsL      -- job selection passed to getTask().
        optsToSave -- sequence of mappings; the i-th mapping is applied to
                      the i-th loaded job (IndexError if it is too short).
        Raises CrabException if the DB update fails.
        """
        task = self.getTask(jobsL)
        for index, job in enumerate(task.jobs):
            for key, value in optsToSave[index].items():
                job[key] = value
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))

    def updateRunJob_(self, jobsL, optsToSave):
        """
        Update Running Job fields

        jobsL      -- job selection passed to getTask().
        optsToSave -- sequence of mappings; the i-th mapping is applied to
                      the running instance of the i-th loaded job.
        Errors of the final DB update are propagated unchanged.
        """
        task = self.getTask(jobsL)
        for index, job in enumerate(task.jobs):
            common.bossSession.getRunningInstance(job)
            for key, value in optsToSave[index].items():
                job.runningJob[key] = value
        common.bossSession.updateDB(task)

    def nJobs(self, list=''):
        """
        Return the number of jobs of the task or, when list == 'list',
        the list of their job ids as integers.
        """
        task = self.getTask()
        if list == 'list':
            return [int(job['jobId']) for job in task.jobs]
        return len(task.jobs)

    def dump(self, jobs):
        """
        List a complete set of infos for a job/range of jobs

        jobs -- job selection passed to getTask(). Output goes to stdout.
        """
        task = self.getTask(jobs)

        separator = "--------------------------"
        # Single-argument print(...) behaves the same as a statement
        # (Python 2) and as a function (Python 3); the two blanks after each
        # label reproduce the output of the former `print "Id: ", value`.
        print(separator)
        for job in task.jobs:
            print("Id:  %s" % (job['jobId'],))
            print("Dest:  %s" % (job['dlsDestination'],))
            print("Output:  %s" % (job['outputFiles'],))
            print("Args:  %s" % (job['arguments'],))
            print("Service:  %s" % (job.runningJob['service'],))
            # NOTE(review): separator assumed to follow every job; the
            # original nesting was not recoverable - confirm.
            print(separator)

    def serializeTask(self, tmp_task=None):
        """
        Return the serialization of tmp_task produced by the DB session;
        the whole task is loaded when tmp_task is None.
        """
        if tmp_task is None:
            tmp_task = self.getTask()
        return common.bossSession.serialize(tmp_task)

    def queryID(self, server_mode=0, jid=False):
        '''
        Report the taskId if server_mode = 1
        Report the joblistId if server_mode = 0 (or if jid is set)

        The report is emitted through displayReport(); nothing is returned.
        '''
        lines = []
        task = self.getTask()
        if server_mode == 1:
            headerTask = "Task Id = %-40s " % (task['name'])
            displayReport(self, headerTask, lines)
        if jid or server_mode == 0:
            for job in task.jobs:
                common.bossSession.getRunningInstance(job)
                lines.append("%-5s %-50s " % (job['jobId'], job.runningJob['schedulerId']))
            header = "%-5s %-50s " % ('Job:', 'ID')
            displayReport(self, header, lines)

    def queryTask(self, attr):
        '''
        Perform a query over a generic task attribute
        '''
        return self.getTask()[attr]

    def queryJob(self, attr, jobsL):
        '''
        Perform a query for a range/all/single job
        over a generic job attribute

        Returns the list of values of attr, one per loaded job.
        '''
        task = self.getTask(jobsL)
        return [job[attr] for job in task.jobs]

    def queryRunJob(self, attr, jobsL):
        '''
        Perform a query for a range/all/single job
        over a generic running job attribute

        Returns the list of values of attr, one per loaded job.
        '''
        values = []
        task = self.getTask(jobsL)
        for job in task.jobs:
            common.bossSession.getRunningInstance(job)
            values.append(job.runningJob[attr])
        return values

    def queryDistJob(self, attr):
        '''
        Returns the list of distinct value for a given job attributes

        Raises CrabException if the query fails.
        '''
        try:
            found = common.bossSession.loadJobDist(1, attr)
        except Exception:
            err = sys.exc_info()[1]
            common.logger.debug(3, "Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(err))

        return [item[attr] for item in found]

    def queryDistJob_Attr(self, attr_1, attr_2, list):
        '''
        Returns the list of distinct value for a given job attribute

        attr_1, attr_2 and list are forwarded to the session's
        loadJobDistAttr(); the values of attr_1 are returned.
        Raises CrabException if the query fails.
        '''
        try:
            found = common.bossSession.loadJobDistAttr(1, attr_1, attr_2, list)
        except Exception:
            err = sys.exc_info()[1]
            common.logger.debug(3, "Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(err))

        return [item[attr_1] for item in found]

    def queryAttrJob(self, attr, field):
        '''
        Returns the list of jobs matching the given attribute

        attr  -- selection forwarded to the session's loadJobsByAttr().
        field -- job field whose value is collected for each match.
        Raises CrabException if the query fails.
        '''
        try:
            found = common.bossSession.loadJobsByAttr(attr)
        except Exception:
            err = sys.exc_info()[1]
            common.logger.debug(3, "Error loading Jobs By Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Attr ' + str(err))
        return [item[field] for item in found]

    def queryAttrRunJob(self, attr, field):
        '''
        Returns the list of jobs matching the given running attribute

        attr  -- selection forwarded to the session's
                 loadJobsByRunningAttr().
        field -- running job field whose value is collected for each match.
        Raises CrabException if the query fails.
        '''
        try:
            found = common.bossSession.loadJobsByRunningAttr(attr)
        except Exception:
            err = sys.exc_info()[1]
            common.logger.debug(3, "Error loading Jobs By Running Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Running Attr ' + str(err))
        return [item.runningJob[field] for item in found]

    def newRunJobs(self, nj='all'):
        """
        Get new running instances

        nj -- job selection passed to getTask(); every selected job gets a
              new running instance in status 'C' / 'Created'.
        """
        task = self.getTask(nj)

        for job in task.jobs:
            common.bossSession.getNewRunningInstance(job)
            job.runningJob['status'] = 'C'
            job.runningJob['statusScheduler'] = 'Created'
        common.bossSession.updateDB(task)

    def _findReport(self, reportList, jobId):
        """
        Return the first report whose 'id' attribute is jobId or 'all',
        or None when no report matches.
        """
        # TODO linear search, probably it can be optimized with a dictionary
        for report in reportList:
            if report.getAttribute('id') in [jobId, 'all']:
                return report
        return None

    def deserXmlStatus(self, reportList):
        """
        Align the local DB with the status reports received from the server.

        reportList -- sequence of report elements exposing
                      getAttribute(name).
                      NOTE(review): presumably xml.dom elements - confirm.

        First pass: a job whose report carries a resubmission count higher
        than the local 'submission' gets a new running instance.
        Second pass: the running job fields are overwritten with the
        reported values. Jobs without a matching report are skipped
        (the original code failed on them with an AttributeError).

        Raises CrabException if a job has no running object.
        """
        task = self.getTask()
        for job in task.jobs:
            if not job.runningJob:
                raise CrabException("Missing running object for job %s" % str(job['jobId']))

            job_id = str(job.runningJob['jobId'])
            report = self._findReport(reportList, job_id)
            if report is None:
                common.logger.debug(3, "Missing XML element for job %s, skip update status" % job_id)
                continue

            ## Check the submission number and create new running jobs on the client side
            if report.getAttribute('resubmit') != 'None' and \
               report.getAttribute('status') not in ['Killed', 'Done']:
                if int(job.runningJob['submission']) < int(report.getAttribute('resubmit')) + 1:
                    # only ever called with a real job id, never with an
                    # empty selection
                    self.newRunJobs([job_id])

        # reload: the first pass may have created new running instances
        task_new = self.getTask()

        for job in task_new.jobs:
            job_id = str(job.runningJob['jobId'])
            report = self._findReport(reportList, job_id)
            if report is None:
                common.logger.debug(3, "Missing XML element for job %s, skip update status" % job_id)
                continue

            # Data alignment
            status = report.getAttribute('status')
            if status not in ['Created', 'Unknown']:
                # update the status
                common.logger.debug(3, "Updating DB status for job: " + str(job_id) + " @: "
                                    + str(status))
                job.runningJob['statusScheduler'] = str(status)
                if status in ('Done', 'Done (Failed)') and report.getAttribute('sched_status') == 'E':
                    job.runningJob['status'] = 'SD'
                else:
                    job.runningJob['status'] = str(report.getAttribute('sched_status'))

                # NOTE(review): the fields below are assumed to be updated
                # only together with the status (original nesting was not
                # recoverable) - confirm.
                job.runningJob['schedulerId'] = str(report.getAttribute('sched_id'))
                job.runningJob['destination'] = str(report.getAttribute('site'))
                job.runningJob['applicationReturnCode'] = str(report.getAttribute('exe_exit'))
                job.runningJob['wrapperReturnCode'] = str(report.getAttribute('job_exit'))

                # TODO cleared='0' field, how should it be handled/mapped in BL? #Fabio

        common.bossSession.updateDB(task_new)

    # FIXME temporary method to verify what kind of submission to perform towards the server
    def checkIfNeverSubmittedBefore(self):
        """
        Return True when every job is still at its first submission with
        status 'C' and scheduler status 'Created', False otherwise.
        """
        for j in self.getTask().jobs:
            if j.runningJob['submission'] > 1 or j.runningJob['status'] != 'C' or \
               j.runningJob['statusScheduler'] != 'Created':
                return False
        return True

393