ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DBinterface.py
Revision: 1.29
Committed: Tue May 20 20:48:08 2008 UTC (16 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_1_pre2
Changes since 1.28: +1 -10 lines
Log Message:
temporary rollback

File Contents

# User Rev Content
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
import common
import os
import time
import shutil
import sys
import traceback

from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
from ProdCommon.BossLite.Common.Exceptions import DbError
from ProdCommon.BossLite.Common.Exceptions import TaskError

from ProdCommon.BossLite.DbObjects.Job import Job
from ProdCommon.BossLite.DbObjects.Task import Task
from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
15    
16    
class DBinterface:
    """
    Access layer between CRAB and the BossLite database.

    Every method works on the session object published as
    common.bossSession and on the task loaded with id 1 from that
    session.  Failures of the underlying session calls are reported as
    CrabException.

    Exceptions are fetched with sys.exc_info() rather than bound in the
    except clause, so that the module parses both on old interpreters
    (no "except ... as" syntax) and on newer ones.
    """

    def __init__(self, cfg_params):
        """
        Keep the configuration and select the DB backend.

        cfg_params -- mapping of configuration options; the backend is
                      read from 'USER.use_db' and defaults to 'SQLite'.
        """
        self.cfg_params = cfg_params
        self.db_type = cfg_params.get("USER.use_db", 'SQLite')
        return

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _openSession(self):
        """
        Create the BossLiteAPI session for <shareDir>crabDB and store it
        in common.bossSession.

        Raises CrabException if the session cannot be instantiated.
        """
        dbConfig = {'dbName': common.work_space.shareDir() + 'crabDB'}
        try:
            common.bossSession = BossLiteAPI(self.db_type, dbConfig)
        except Exception:
            raise CrabException('Istantiate DB Session : ' + str(sys.exc_info()[1]))
        return

    def _saveTask(self, task):
        """
        Write the task back through common.bossSession.updateDB.

        Raises CrabException (carrying the formatted traceback) on failure.
        """
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))
        return

    def _findReport(self, reportList, jobId):
        """
        Return the first element of reportList whose 'id' attribute is
        either jobId or the wildcard 'all'; None when nothing matches.
        """
        # TODO linear search, could be optimized if reportList gets large
        for report in reportList:
            if report.getAttribute('id') in [jobId, 'all']:
                return report
        return None

    # ------------------------------------------------------------------
    # session handling
    # ------------------------------------------------------------------

    def configureDB(self):
        """
        Open the DB session and install the schema.

        Raises CrabException if either the session creation or the
        schema installation fails.
        """
        self._openSession()
        try:
            common.bossSession.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')
        except Exception:
            raise CrabException('DB Installation error : ' + str(sys.exc_info()[1]))
        return

    def loadDB(self):
        """
        Open the DB session without installing the schema.

        Raises CrabException if the session cannot be instantiated.
        """
        self._openSession()
        return

    # ------------------------------------------------------------------
    # task / job loading
    # ------------------------------------------------------------------

    def getTask(self, jobsList='all'):
        """
        Return task with all/list of jobs

        Raises CrabException if the load fails; the full traceback is
        sent to the debug log.
        """
        try:
            task = common.bossSession.load(1, jobsList)[0]
        except Exception:
            common.logger.debug(3, "Error while getting task : " + str(traceback.format_exc()))
            raise CrabException('Error while getting task ' + str(sys.exc_info()[1]))
        return task

    def getJob(self, n):
        """
        Return a task with a single job (job n)

        Raises CrabException if the load fails; the full traceback is
        sent to the debug log.
        """
        try:
            task = common.bossSession.load(1, str(n))[0]
        except Exception:
            common.logger.debug(3, "Error while getting job : " + str(traceback.format_exc()))
            raise CrabException('Error while getting job ' + str(sys.exc_info()[1]))
        return task

    # ------------------------------------------------------------------
    # creation / update
    # ------------------------------------------------------------------

    def createTask_(self, optsToSave):
        """
        Task declaration
        with the first configuration stuff

        optsToSave -- mapping; when 'server_mode' is 1 its 'server_name'
                      entry is stored as the task 'serverName'.
        Raises CrabException if the task cannot be saved.
        """
        opt = {}
        if optsToSave.get('server_mode', 0) == 1:
            opt['serverName'] = optsToSave['server_name']
        opt['name'] = common.work_space.taskName()
        task = Task(opt)
        try:
            common.bossSession.saveTask(task)
        except Exception:
            raise CrabException('Error creating task ' + str(traceback.format_exc()))
        return

    def updateTask_(self, optsToSave):
        """
        Update task fields

        optsToSave -- mapping of task field name -> new value.
        Raises CrabException if the update fails.
        """
        task = self.getTask()
        for key, value in optsToSave.items():
            task[key] = value
        self._saveTask(task)
        return

    def createJobs_(self, jobsL):
        """
        Fill crab DB with the jobs

        jobsL -- iterable of job ids; one Job named
                 <task name>_job<id> is created for each, with a running
                 instance in status 'C'.
        Raises CrabException if the update fails.
        """
        task = self.getTask()

        jobs = []
        for jobId in jobsL:
            parameters = {}
            parameters['jobId'] = str(jobId)
            parameters['name'] = task['name'] + '_' + 'job' + str(jobId)
            job = Job(parameters)
            jobs.append(job)
            common.bossSession.getRunningInstance(job)
            job.runningJob['status'] = 'C'
        task.addJobs(jobs)
        self._saveTask(task)
        return

    def updateJob_(self, jobsL, optsToSave):
        """
        Update Job fields

        jobsL      -- job selection passed to getTask.
        optsToSave -- sequence of mappings, one per loaded job and in the
                      same order as task.jobs.
        Raises CrabException if the update fails.
        """
        task = self.getTask(jobsL)
        for index, job in enumerate(task.jobs):
            for key, value in optsToSave[index].items():
                job[key] = value
        self._saveTask(task)
        return

    def updateRunJob_(self, jobsL, optsToSave):
        """
        Update Running Job fields

        jobsL      -- job selection passed to getTask.
        optsToSave -- sequence of mappings, one per loaded job and in the
                      same order as task.jobs.
        Raises CrabException if the update fails.
        """
        task = self.getTask(jobsL)
        for index, job in enumerate(task.jobs):
            common.bossSession.getRunningInstance(job)
            for key, value in optsToSave[index].items():
                job.runningJob[key] = value
        self._saveTask(task)
        return

    # ------------------------------------------------------------------
    # queries
    # ------------------------------------------------------------------

    def nJobs(self, list=''):
        """
        Return the number of jobs of the task or, when called with
        list='list', the list of their job ids as integers.
        """
        task = self.getTask()
        if list == 'list':
            return [int(job['jobId']) for job in task.jobs]
        return len(task.jobs)

    def dump(self, jobs):
        """
        List a complete set of infos for a job/range of jobs
        """
        task = self.getTask(jobs)

        separator = "--------------------------"
        # Single-argument print(...) produces the same text as the former
        # "print label, value" statements (label, one blank, value).
        print(separator)
        for job in task.getJobs():
            print("Id:  %s" % (job['jobId'],))
            print("Dest:  %s" % (job['dlsDestination'],))
            print("Output:  %s" % (job['outputFiles'],))
            print("Args:  %s" % (job['arguments'],))
            print(separator)
        return

    def serializeTask(self, tmp_task=None):
        """
        Return the serialization of tmp_task produced by the session;
        the current task is used when tmp_task is None.
        """
        if tmp_task is None:
            tmp_task = self.getTask()
        return common.bossSession.serialize(tmp_task)

    def queryID(self, server_mode=0):
        '''
        Display the task Id if server_mode = 1
        Display the list of job Ids if server_mode = 0
        '''
        header = ''
        lines = []
        task = self.getTask()
        if server_mode == 1:
            header = "Task Id = %-40s " % (task['name'],)
        else:
            for job in task.jobs:
                common.bossSession.getRunningInstance(job)
                lines.append("%-5s %-50s " % (job['jobId'], job.runningJob['schedulerId']))
            header += "%-5s %-50s " % ('Job:', 'ID')
        displayReport(self, header, lines)
        return

    def queryTask(self, attr):
        '''
        Perform a query over a generic task attribute
        '''
        task = self.getTask()
        return task[attr]

    def queryJob(self, attr, jobsL):
        '''
        Perform a query for a range/all/single job
        over a generic job attribute
        '''
        task = self.getTask(jobsL)
        return [job[attr] for job in task.jobs]

    def queryRunJob(self, attr, jobsL):
        '''
        Perform a query for a range/all/single job
        over a generic running job attribute
        '''
        lines = []
        task = self.getTask(jobsL)
        for job in task.jobs:
            common.bossSession.getRunningInstance(job)
            lines.append(job.runningJob[attr])
        return lines

    def queryDistJob(self, attr):
        '''
        Returns the list of distinct values for a given job attribute

        Raises CrabException if the load fails.
        '''
        try:
            rows = common.bossSession.loadJobDist(1, attr)
        except Exception:
            common.logger.debug(3, "Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(sys.exc_info()[1]))

        return [row[attr] for row in rows]

    def queryDistJob_Attr(self, attr_1, attr_2, list):
        '''
        Returns the list of distinct values of attr_1 as loaded by
        loadJobDistAttr(1, attr_1, attr_2, list)

        Raises CrabException if the load fails.
        '''
        try:
            rows = common.bossSession.loadJobDistAttr(1, attr_1, attr_2, list)
        except Exception:
            common.logger.debug(3, "Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(sys.exc_info()[1]))

        return [row[attr_1] for row in rows]

    def queryAttrJob(self, attr, field):
        '''
        Returns the value of field for the jobs matching the given attribute

        Raises CrabException if the load fails.
        '''
        try:
            jobs = common.bossSession.loadJobsByAttr(attr)
        except Exception:
            common.logger.debug(3, "Error loading Jobs By Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Attr ' + str(sys.exc_info()[1]))
        return [job[field] for job in jobs]

    def queryAttrRunJob(self, attr, field):
        '''
        Returns the value of the running-job field for the jobs matching
        the given running attribute

        Raises CrabException if the load fails.
        '''
        try:
            jobs = common.bossSession.loadJobsByRunningAttr(attr)
        except Exception:
            common.logger.debug(3, "Error loading Jobs By Running Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Running Attr ' + str(sys.exc_info()[1]))
        return [job.runningJob[field] for job in jobs]

    # ------------------------------------------------------------------
    # running instances / status alignment
    # ------------------------------------------------------------------

    def newRunJobs(self, nj='all'):
        """
        Get new running instances

        Each selected job gets a new running instance with status 'C'
        and statusScheduler 'Created'.
        Raises CrabException if the update fails.
        """
        task = self.getTask(nj)

        for job in task.jobs:
            common.bossSession.getNewRunningInstance(job)
            job.runningJob['status'] = 'C'
            job.runningJob['statusScheduler'] = 'Created'
        self._saveTask(task)
        return

    def deserXmlStatus(self, reportList):
        """
        Align the running-job records with a list of status reports.

        reportList -- sequence of objects exposing getAttribute(name);
                      the attributes read are 'id', 'status',
                      'sched_status', 'site', 'exe_exit' and 'job_exit'.
                      A report whose 'id' is 'all' matches every job.

        A job for which no report is found is left untouched (a debug
        message is logged).
        Raises CrabException if a job has no running object or if the
        final update fails.
        """
        task = self.getTask()

        for job in task.jobs:
            if not job.runningJob:
                raise CrabException("Missing running object for job %s" % str(job['jobId']))

            jobId = str(job.runningJob['jobId'])
            report = self._findReport(reportList, jobId)
            if report is None:
                # Nothing to align for this job: skip it instead of
                # failing on a None report.
                common.logger.debug(3, "No status report found for job %s" % jobId)
                continue

            # Data alignment
            # NOTE(review): the indentation of this section was
            # reconstructed; both status fields are assumed to be updated
            # only when the condition holds -- confirm against the
            # repository copy.
            reported = report.getAttribute('status')
            if reported not in ['Created', 'Submitting', 'Unknown'] and \
                   job.runningJob['statusScheduler'] != 'Cleared':
                job.runningJob['statusScheduler'] = str(reported)
                job.runningJob['status'] = str(report.getAttribute('sched_status'))

            job.runningJob['destination'] = str(report.getAttribute('site'))
            job.runningJob['applicationReturnCode'] = str(report.getAttribute('exe_exit'))
            job.runningJob['wrapperReturnCode'] = str(report.getAttribute('job_exit'))

            # TODO the 'resubmit' attribute is currently not propagated
            # TODO cleared='0' field, how should it be handled/mapped in BL? #Fabio

        self._saveTask(task)

        return