ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DBinterface.py
Revision: 1.18
Committed: Sun Apr 20 17:04:52 2008 UTC (17 years ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.17: +1 -1 lines
Log Message:
bug fix

File Contents

# User Rev Content
1 spiga 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6 spiga 1.13 import traceback
7 spiga 1.1
8     from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
9 spiga 1.13 from ProdCommon.BossLite.Common.Exceptions import DbError
10     from ProdCommon.BossLite.Common.Exceptions import TaskError
11 spiga 1.1
12     from ProdCommon.BossLite.DbObjects.Job import Job
13     from ProdCommon.BossLite.DbObjects.Task import Task
14     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
15    
16    
17     class DBinterface:
18 spiga 1.2 def __init__(self, cfg_params):
19 spiga 1.1
20 spiga 1.2 self.cfg_params = cfg_params
21    
22     self.db_type = cfg_params.get("USER.use_db",'SQLite')
23 spiga 1.1 return
24    
25    
26 spiga 1.2 def configureDB(self):
27 spiga 1.1
28     dbname = common.work_space.shareDir()+'crabDB'
29     dbConfig = {'dbName':dbname
30     }
31 spiga 1.13 try:
32     common.bossSession = BossLiteAPI( self.db_type, dbConfig)
33     except Exception, e :
34     raise CrabException('Istantiate DB Session : '+str(e))
35    
36     try:
37     common.bossSession.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')
38     except Exception, e :
39     raise CrabException('DB Installation error : '+str(e))
40     return
41 spiga 1.1
42 spiga 1.2 def loadDB(self):
43    
44     dbname = common.work_space.shareDir()+'crabDB'
45     dbConfig = {'dbName':dbname
46     }
47 spiga 1.13 try:
48     common.bossSession = BossLiteAPI( self.db_type, dbConfig)
49     except Exception, e :
50     raise CrabException('Istantiate DB Session : '+str(e))
51    
52 spiga 1.2 return
53 spiga 1.4
54 spiga 1.13 def getTask(self, jobsList='all'):
55 spiga 1.9 """
56     Return task with all/list of jobs
57     """
58 spiga 1.13 try:
59     task = common.bossSession.load(1,jobsList)[0]
60     except Exception, e :
61     common.logger.debug(3, "Error while getting task : " +str(traceback.format_exc()))
62     raise CrabException('Error while getting task '+str(e))
63 spiga 1.9 return task
64 spiga 1.2
65 slacapra 1.6 def getJob(self, n):
66 spiga 1.9 """
67     Return a task with a single job
68     """
69 spiga 1.13 try:
70     task = common.bossSession.load(1,str(n))[0]
71     except Exception, e :
72     common.logger.debug(3, "Error while getting job : " +str(traceback.format_exc()))
73     raise CrabException('Error while getting job '+str(e))
74 spiga 1.9 return task
75 spiga 1.2
76 spiga 1.1
77     def createTask_(self, optsToSave):
78     """
79     Task declaration
80     with the first coniguration stuff
81     """
82     opt={}
83 slacapra 1.14 if optsToSave.get('server_mode',0) == 1: opt['serverName']=optsToSave['server_name']
84     opt['name']=common.work_space.taskName()
85 spiga 1.1 task = Task( opt )
86 spiga 1.13 try:
87     common.bossSession.saveTask( task )
88     except Exception, e :
89     # common.logger.debug(3, "Error creating task : " +str(traceback.format_exc()))
90     # raise CrabException('Error creating task '+str(e))
91     raise CrabException('Error creating task '+str(traceback.format_exc()))
92    
93 spiga 1.1 return
94    
95     def updateTask_(self,optsToSave):
96     """
97     Update task fields
98     """
99 spiga 1.13 task = self.getTask()
100    
101 spiga 1.1 for key in optsToSave.keys():
102     task[key] = optsToSave[key]
103 spiga 1.13 try:
104     common.bossSession.updateDB( task )
105     except Exception, e :
106     raise CrabException('Error updating task '+str(traceback.format_exc()))
107    
108 spiga 1.1 return
109    
110 spiga 1.9 def createJobs_(self, jobsL):
111 spiga 1.1 """
112     Fill crab DB with the jobs filed
113     """
114 spiga 1.13 task = self.getTask()
115    
116 spiga 1.1 jobs = []
117 spiga 1.9 for id in jobsL:
118 spiga 1.1 parameters = {}
119 spiga 1.9 parameters['jobId'] = str(id)
120 mcinquil 1.15 parameters['name'] = task['name'] + '_' + 'job' + str(id)
121 spiga 1.1 job = Job(parameters)
122 spiga 1.9 jobs.append(job)
123 spiga 1.1 task.addJobs(jobs)
124 spiga 1.13 try:
125     common.bossSession.updateDB( task )
126     except Exception, e :
127     raise CrabException('Error updating task '+str(traceback.format_exc()))
128    
129 spiga 1.1 return
130    
131 spiga 1.9 def updateJob_(self, jobsL, optsToSave):
132 spiga 1.1 """
133     Update Job fields
134     """
135 spiga 1.13 task = self.getTask(jobsL)
136 spiga 1.10 id =0
137     for job in task.jobs:
138 spiga 1.9 for key in optsToSave[id].keys():
139 spiga 1.10 job[key] = optsToSave[id][key]
140     id+=1
141 spiga 1.13 try:
142     common.bossSession.updateDB( task )
143     except Exception, e :
144     raise CrabException('Error updating task '+str(traceback.format_exc()))
145 spiga 1.1 return
146    
147 spiga 1.9 def updateRunJob_(self, jobsL, optsToSave):
148 spiga 1.1 """
149     Update Running Job fields
150     """
151 spiga 1.13 task = self.getTask(jobsL)
152    
153 spiga 1.12 id=0
154 spiga 1.10 for job in task.jobs:
155     common.bossSession.getRunningInstance(job)
156 spiga 1.12 for key in optsToSave[id].keys():
157     job.runningJob[key] = optsToSave[id][key]
158     id+=1
159 spiga 1.1 common.bossSession.updateDB( task )
160     return
161    
162 spiga 1.9 def nJobs(self,list=''):
163 spiga 1.2
164 spiga 1.13 task = self.getTask()
165 spiga 1.9 listId=[]
166     if list == 'list':
167     for job in task.jobs:listId.append(int(job['jobId']))
168     return listId
169     else:
170     return len(task.jobs)
171 spiga 1.1
172     def dump(self,jobs):
173     """
174     List a complete set of infos for a job/range of jobs
175     """
176 slacapra 1.17 task = self.getTask(jobs)
177 spiga 1.1
178 slacapra 1.17 Jobs = task.getJobs()
179     print "--------------------------"
180     for Job in Jobs:
181     print "Id: ",Job['id']
182     print "Dest: ", Job['dlsDestination']
183     print "Output: ", Job['outputFiles']
184     print "Args: ",Job['arguments']
185     print "--------------------------"
186 spiga 1.1 return
187 farinafa 1.8
188     def serializeTask(self, tmp_task = None):
189     if tmp_task is None:
190 spiga 1.13 tmp_task = self.getTask()
191 farinafa 1.8 return common.bossSession.serialize(tmp_task)
192 spiga 1.1
193     def queryID(self,server_mode=0):
194     '''
195     Return the taskId if serevr_mode =1
196     Return the joblistId if serevr_mode =0
197     '''
198     header=''
199     lines=[]
200 spiga 1.13 task = self.getTask()
201 spiga 1.1 if server_mode == 1:
202 spiga 1.2 header= "Task Id = %-40s " %(task['name'])
203 spiga 1.1 else:
204 spiga 1.11 for job in task.jobs:
205     toPrint=''
206     common.bossSession.getRunningInstance(job)
207     toPrint = "%-5s %-50s " % (job['id'],job.runningJob['schedulerId'])
208     lines.append(toPrint)
209     header+= "%-5s %-50s " % ('Job:','ID' )
210     displayReport(self,header,lines)
211 spiga 1.1 return
212    
213     def queryTask(self,attr):
214     '''
215     Perform a query over a generic task attribute
216     '''
217 spiga 1.13 task = self.getTask()
218 spiga 1.1 return task[attr]
219    
220 spiga 1.12 def queryJob(self, attr, jobsL):
221 spiga 1.1 '''
222     Perform a query for a range/all/single job
223     over a generic job attribute
224     '''
225     lines=[]
226 spiga 1.13 task = self.getTask(jobsL)
227 spiga 1.9 for job in task.jobs:
228     lines.append(eval(job[attr]))
229 spiga 1.1 return lines
230    
231 spiga 1.12 def queryRunJob(self, attr, jobsL):
232 spiga 1.1 '''
233     Perform a query for a range/all/single job
234     over a generic job attribute
235     '''
236     lines=[]
237 spiga 1.13 task = self.getTask(jobsL)
238 spiga 1.9 for job in task.jobs:
239     common.bossSession.getRunningInstance(job)
240     lines.append(job.runningJob[attr])
241 spiga 1.1 return lines
242 spiga 1.3
243     def queryDistJob(self, attr):
244     '''
245     Returns the list of distinct value for a given job attributes
246     '''
247     distAttr=[]
248 spiga 1.13 try:
249     task = common.bossSession.loadJobDist( 1, attr )
250     except Exception, e :
251     common.logger.debug(3, "Error loading Jobs By distinct Attr : " +str(traceback.format_exc()))
252     raise CrabException('Error loading Jobs By distinct Attr '+str(e))
253    
254 spiga 1.9 for i in task: distAttr.append(eval(i[attr]))
255 spiga 1.3 return distAttr
256    
257 spiga 1.5 def queryDistJob_Attr(self, attr_1, attr_2, list):
258 spiga 1.4 '''
259 spiga 1.9 Returns the list of distinct value for a given job attribute
260 spiga 1.4 '''
261     distAttr=[]
262 spiga 1.13 try:
263     task = common.bossSession.loadJobDistAttr( 1, attr_1, attr_2, list )
264     except Exception, e :
265     common.logger.debug(3, "Error loading Jobs By distinct Attr : " +str(traceback.format_exc()))
266     raise CrabException('Error loading Jobs By distinct Attr '+str(e))
267    
268 spiga 1.9 for i in task: distAttr.append(eval(i[attr_1]))
269 spiga 1.4 return distAttr
270 slacapra 1.6
271 spiga 1.3 def queryAttrJob(self, attr, field):
272     '''
273     Returns the list of jobs matching the given attribute
274     '''
275     matched=[]
276 spiga 1.13 try:
277     task = common.bossSession.loadJobsByAttr(attr )
278     except Exception, e :
279     common.logger.debug(3, "Error loading Jobs By Attr : " +str(traceback.format_exc()))
280     raise CrabException('Error loading Jobs By Attr '+str(e))
281 spiga 1.3 for i in task:
282     matched.append(i[field])
283     return matched
284    
285 spiga 1.7
286     def queryAttrRunJob(self, attr,field):
287     '''
288     Returns the list of jobs matching the given attribute
289     '''
290     matched=[]
291 spiga 1.13 try:
292     task = common.bossSession.loadJobsByRunningAttr(attr)
293     except Exception, e :
294     common.logger.debug(3, "Error loading Jobs By Running Attr : " +str(traceback.format_exc()))
295     raise CrabException('Error loading Jobs By Running Attr '+str(e))
296 spiga 1.7 for i in task:
297 spiga 1.18 matched.append(i.runningJob[field])
298 spiga 1.7 return matched
299 spiga 1.16
300     def deserXmlStatus(self, reportList):
301    
302     task = self.getTask()
303    
304     for job in task.jobs:
305     if not job.runningJob:
306     raise CrabException( "Missing running object for job %s"%str(job['id']) )
307    
308     id = str(job.runningJob['id'])
309     # TODO linear search, probably it can be optized with binary search
310     rForJ = None
311     for r in reportList:
312     if r.getAttribute('id') in [ id, 'all']:
313     rForJ = r
314     break
315    
316     # Data alignment
317     jobStatus = str(job.runningJob['statusScheduler'])
318     if rForJ.getAttribute('status') not in ['Created', 'Submitting']:
319     job.runningJob['statusScheduler'] = str( rForJ.getAttribute('status') )
320     jobStatus = str(job.runningJob['statusScheduler'])
321     job.runningJob['status'] = str( rForJ.getAttribute('sched_status') )
322    
323     job.runningJob['destination'] = str( rForJ.getAttribute('site') )
324     dest = str(job.runningJob['destination']).split(':')[0]
325    
326     job.runningJob['applicationReturnCode'] = str( rForJ.getAttribute('exe_exit') )
327     exe_exit_code = str(job.runningJob['applicationReturnCode'])
328    
329     job.runningJob['wrapperReturnCode'] = str( rForJ.getAttribute('job_exit') )
330     job_exit_code = str(job.runningJob['wrapperReturnCode'])
331    
332     if str( rForJ.getAttribute('resubmit') ).isdigit():
333     job['submissionNumber'] = int(rForJ.getAttribute('resubmit'))
334     # TODO cleared='0' field, how should it be handled/mapped in BL? #Fabio
335    
336     common.bossSession.updateDB( task )
337    
338     return