ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DBinterface.py
Revision: 1.55
Committed: Sat Mar 7 16:53:38 2009 UTC (16 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_5_0, CRAB_2_5_0_pre7, CRAB_2_5_0_pre6
Changes since 1.54: +1 -1 lines
Log Message:
minor fixes

File Contents

# User Rev Content
1 spiga 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6 spiga 1.13 import traceback
7 spiga 1.1
8     from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
9 spiga 1.13 from ProdCommon.BossLite.Common.Exceptions import DbError
10     from ProdCommon.BossLite.Common.Exceptions import TaskError
11 spiga 1.1
12     from ProdCommon.BossLite.DbObjects.Job import Job
13     from ProdCommon.BossLite.DbObjects.Task import Task
14     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
15    
16    
17     class DBinterface:
18 spiga 1.2 def __init__(self, cfg_params):
19 spiga 1.1
20 spiga 1.2 self.cfg_params = cfg_params
21    
22     self.db_type = cfg_params.get("USER.use_db",'SQLite')
23 spiga 1.1 return
24    
25    
26 spiga 1.2 def configureDB(self):
27 spiga 1.1
28     dbname = common.work_space.shareDir()+'crabDB'
29     dbConfig = {'dbName':dbname
30     }
31 spiga 1.13 try:
32     common.bossSession = BossLiteAPI( self.db_type, dbConfig)
33     except Exception, e :
34     raise CrabException('Istantiate DB Session : '+str(e))
35    
36     try:
37 spiga 1.30 common.bossSession.bossLiteDB.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')
38 spiga 1.13 except Exception, e :
39     raise CrabException('DB Installation error : '+str(e))
40     return
41 spiga 1.1
42 spiga 1.2 def loadDB(self):
43    
44     dbname = common.work_space.shareDir()+'crabDB'
45     dbConfig = {'dbName':dbname
46     }
47 spiga 1.13 try:
48     common.bossSession = BossLiteAPI( self.db_type, dbConfig)
49     except Exception, e :
50     raise CrabException('Istantiate DB Session : '+str(e))
51    
52 spiga 1.2 return
53 spiga 1.4
54 spiga 1.13 def getTask(self, jobsList='all'):
55 spiga 1.9 """
56     Return task with all/list of jobs
57     """
58 spiga 1.13 try:
59 spiga 1.45 task = common.bossSession.load(1,jobsList)
60 spiga 1.13 except Exception, e :
61     common.logger.debug(3, "Error while getting task : " +str(traceback.format_exc()))
62     raise CrabException('Error while getting task '+str(e))
63 spiga 1.9 return task
64 spiga 1.2
65 slacapra 1.6 def getJob(self, n):
66 spiga 1.9 """
67     Return a task with a single job
68     """
69 spiga 1.13 try:
70 spiga 1.45 task = common.bossSession.load(1,str(n))
71 spiga 1.13 except Exception, e :
72     common.logger.debug(3, "Error while getting job : " +str(traceback.format_exc()))
73     raise CrabException('Error while getting job '+str(e))
74 spiga 1.9 return task
75 spiga 1.2
76 spiga 1.1
77     def createTask_(self, optsToSave):
78     """
79     Task declaration
80     with the first coniguration stuff
81     """
82     opt={}
83 slacapra 1.14 if optsToSave.get('server_mode',0) == 1: opt['serverName']=optsToSave['server_name']
84 spiga 1.55 opt['name']= getUserName()+ '_' + string.split(common.work_space.topDir(),'/')[-2]+'_'+common.work_space.task_uuid()
85 spiga 1.1 task = Task( opt )
86 spiga 1.13 try:
87     common.bossSession.saveTask( task )
88     except Exception, e :
89     # common.logger.debug(3, "Error creating task : " +str(traceback.format_exc()))
90     # raise CrabException('Error creating task '+str(e))
91     raise CrabException('Error creating task '+str(traceback.format_exc()))
92    
93 spiga 1.1 return
94    
95     def updateTask_(self,optsToSave):
96     """
97     Update task fields
98     """
99 spiga 1.13 task = self.getTask()
100    
101 spiga 1.1 for key in optsToSave.keys():
102     task[key] = optsToSave[key]
103 spiga 1.13 try:
104     common.bossSession.updateDB( task )
105     except Exception, e :
106     raise CrabException('Error updating task '+str(traceback.format_exc()))
107    
108 spiga 1.1 return
109    
110 spiga 1.34 def createJobs_(self, jobsL, isNew=True):
111 spiga 1.1 """
112     Fill crab DB with the jobs filed
113     """
114 spiga 1.13 task = self.getTask()
115    
116 spiga 1.1 jobs = []
117 spiga 1.9 for id in jobsL:
118 spiga 1.1 parameters = {}
119 spiga 1.34 parameters['jobId'] = int(id)
120     parameters['taskId'] = 1
121 mcinquil 1.15 parameters['name'] = task['name'] + '_' + 'job' + str(id)
122 spiga 1.1 job = Job(parameters)
123 spiga 1.20 jobs.append(job)
124     common.bossSession.getRunningInstance(job)
125     job.runningJob['status'] = 'C'
126 spiga 1.34 ## added to support second step creation
127     ## maybe it is not needed. TO CLARIFY
128     if isNew:
129     task.addJobs(jobs)
130     else:
131     task.appendJobs(jobs)
132 spiga 1.13 try:
133     common.bossSession.updateDB( task )
134     except Exception, e :
135     raise CrabException('Error updating task '+str(traceback.format_exc()))
136    
137 spiga 1.1 return
138    
139 spiga 1.9 def updateJob_(self, jobsL, optsToSave):
140 spiga 1.1 """
141     Update Job fields
142     """
143 spiga 1.13 task = self.getTask(jobsL)
144 spiga 1.10 id =0
145     for job in task.jobs:
146 spiga 1.9 for key in optsToSave[id].keys():
147 spiga 1.10 job[key] = optsToSave[id][key]
148     id+=1
149 spiga 1.13 try:
150     common.bossSession.updateDB( task )
151     except Exception, e :
152     raise CrabException('Error updating task '+str(traceback.format_exc()))
153 spiga 1.1 return
154    
155 spiga 1.9 def updateRunJob_(self, jobsL, optsToSave):
156 spiga 1.1 """
157     Update Running Job fields
158     """
159 spiga 1.13 task = self.getTask(jobsL)
160    
161 spiga 1.12 id=0
162 spiga 1.10 for job in task.jobs:
163     common.bossSession.getRunningInstance(job)
164 spiga 1.12 for key in optsToSave[id].keys():
165     job.runningJob[key] = optsToSave[id][key]
166     id+=1
167 spiga 1.1 common.bossSession.updateDB( task )
168     return
169    
170 spiga 1.9 def nJobs(self,list=''):
171 spiga 1.2
172 spiga 1.13 task = self.getTask()
173 spiga 1.9 listId=[]
174     if list == 'list':
175     for job in task.jobs:listId.append(int(job['jobId']))
176     return listId
177     else:
178     return len(task.jobs)
179 spiga 1.1
180     def dump(self,jobs):
181     """
182     List a complete set of infos for a job/range of jobs
183     """
184 slacapra 1.17 task = self.getTask(jobs)
185 spiga 1.1
186 slacapra 1.17 print "--------------------------"
187 spiga 1.32 for Job in task.jobs:
188 spiga 1.24 print "Id: ",Job['jobId']
189 slacapra 1.17 print "Dest: ", Job['dlsDestination']
190     print "Output: ", Job['outputFiles']
191     print "Args: ",Job['arguments']
192 spiga 1.32 print "Service: ",Job.runningJob['service']
193 slacapra 1.17 print "--------------------------"
194 spiga 1.1 return
195 farinafa 1.8
196     def serializeTask(self, tmp_task = None):
197     if tmp_task is None:
198 spiga 1.13 tmp_task = self.getTask()
199 farinafa 1.8 return common.bossSession.serialize(tmp_task)
200 spiga 1.1
201 spiga 1.43 def queryID(self,server_mode=0, jid=False):
202 spiga 1.1 '''
203     Return the taskId if serevr_mode =1
204     Return the joblistId if serevr_mode =0
205     '''
206     header=''
207     lines=[]
208 spiga 1.13 task = self.getTask()
209 spiga 1.1 if server_mode == 1:
210 spiga 1.51 # init client server params...
211     CliServerParams(self)
212 spiga 1.43 headerTask= "Task Id = %-40s " %(task['name'])
213     displayReport(self,headerTask,lines)
214 spiga 1.52 common.logger.message(showWebMon(self.server_name))
215 spiga 1.43 if (jid ) or (server_mode == 0):
216 spiga 1.11 for job in task.jobs:
217     toPrint=''
218     common.bossSession.getRunningInstance(job)
219 spiga 1.24 toPrint = "%-5s %-50s " % (job['jobId'],job.runningJob['schedulerId'])
220 spiga 1.11 lines.append(toPrint)
221     header+= "%-5s %-50s " % ('Job:','ID' )
222 spiga 1.43 displayReport(self,header,lines)
223 spiga 1.1 return
224    
225     def queryTask(self,attr):
226     '''
227     Perform a query over a generic task attribute
228     '''
229 spiga 1.13 task = self.getTask()
230 spiga 1.1 return task[attr]
231    
232 spiga 1.12 def queryJob(self, attr, jobsL):
233 spiga 1.1 '''
234     Perform a query for a range/all/single job
235     over a generic job attribute
236     '''
237     lines=[]
238 spiga 1.13 task = self.getTask(jobsL)
239 spiga 1.9 for job in task.jobs:
240 spiga 1.19 lines.append(job[attr])
241 spiga 1.1 return lines
242    
243 spiga 1.12 def queryRunJob(self, attr, jobsL):
244 spiga 1.1 '''
245     Perform a query for a range/all/single job
246     over a generic job attribute
247     '''
248     lines=[]
249 spiga 1.13 task = self.getTask(jobsL)
250 spiga 1.9 for job in task.jobs:
251     common.bossSession.getRunningInstance(job)
252     lines.append(job.runningJob[attr])
253 spiga 1.1 return lines
254 spiga 1.3
255     def queryDistJob(self, attr):
256     '''
257     Returns the list of distinct value for a given job attributes
258     '''
259     distAttr=[]
260 spiga 1.13 try:
261     task = common.bossSession.loadJobDist( 1, attr )
262     except Exception, e :
263     common.logger.debug(3, "Error loading Jobs By distinct Attr : " +str(traceback.format_exc()))
264     raise CrabException('Error loading Jobs By distinct Attr '+str(e))
265    
266 spiga 1.19 for i in task: distAttr.append(i[attr])
267 spiga 1.3 return distAttr
268    
269 spiga 1.5 def queryDistJob_Attr(self, attr_1, attr_2, list):
270 spiga 1.4 '''
271 spiga 1.9 Returns the list of distinct value for a given job attribute
272 spiga 1.4 '''
273     distAttr=[]
274 spiga 1.13 try:
275     task = common.bossSession.loadJobDistAttr( 1, attr_1, attr_2, list )
276     except Exception, e :
277     common.logger.debug(3, "Error loading Jobs By distinct Attr : " +str(traceback.format_exc()))
278     raise CrabException('Error loading Jobs By distinct Attr '+str(e))
279    
280 spiga 1.19 for i in task: distAttr.append(i[attr_1])
281 spiga 1.4 return distAttr
282 slacapra 1.6
283 spiga 1.3 def queryAttrJob(self, attr, field):
284     '''
285     Returns the list of jobs matching the given attribute
286     '''
287     matched=[]
288 spiga 1.13 try:
289     task = common.bossSession.loadJobsByAttr(attr )
290     except Exception, e :
291     common.logger.debug(3, "Error loading Jobs By Attr : " +str(traceback.format_exc()))
292     raise CrabException('Error loading Jobs By Attr '+str(e))
293 spiga 1.3 for i in task:
294     matched.append(i[field])
295     return matched
296    
297 spiga 1.7
298     def queryAttrRunJob(self, attr,field):
299     '''
300     Returns the list of jobs matching the given attribute
301     '''
302     matched=[]
303 spiga 1.13 try:
304     task = common.bossSession.loadJobsByRunningAttr(attr)
305     except Exception, e :
306     common.logger.debug(3, "Error loading Jobs By Running Attr : " +str(traceback.format_exc()))
307     raise CrabException('Error loading Jobs By Running Attr '+str(e))
308 spiga 1.7 for i in task:
309 spiga 1.18 matched.append(i.runningJob[field])
310 spiga 1.7 return matched
311 spiga 1.16
312 spiga 1.22 def newRunJobs(self,nj='all'):
313     """
314     Get new running instances
315     """
316     task = self.getTask(nj)
317    
318     for job in task.jobs:
319 spiga 1.24 common.bossSession.getNewRunningInstance(job)
320     job.runningJob['status'] = 'C'
321     job.runningJob['statusScheduler'] = 'Created'
322     common.bossSession.updateDB(task)
323 spiga 1.22 return
324    
325 spiga 1.16 def deserXmlStatus(self, reportList):
326    
327     task = self.getTask()
328     for job in task.jobs:
329     if not job.runningJob:
330 spiga 1.25 raise CrabException( "Missing running object for job %s"%str(job['jobId']) )
331 farinafa 1.37
332 spiga 1.25 id = str(job.runningJob['jobId'])
333 spiga 1.16 rForJ = None
334 spiga 1.33 nj_list= []
335 spiga 1.16 for r in reportList:
336     if r.getAttribute('id') in [ id, 'all']:
337     rForJ = r
338     break
339 mcinquil 1.50 ## Check the submission number and create new running jobs on the client side
340 mcinquil 1.53 if rForJ.getAttribute('resubmit') != 'None' and (rForJ.getAttribute('status') not in ['Cleared','Killed','Done','Done (Failed)','Not Submitted']) :
341 spiga 1.43 if int(job.runningJob['submission']) < int(rForJ.getAttribute('resubmit')) + 1:
342     nj_list.append(id)
343 spiga 1.48 if len(nj_list) > 0: self.newRunJobs(nj_list)
344 spiga 1.33
345     task_new = self.getTask()
346 spiga 1.16
347 spiga 1.33 for job in task_new.jobs:
348     id = str(job.runningJob['jobId'])
349     # TODO linear search, probably it can be optized with binary search
350     rForJ = None
351     for r in reportList:
352     if r.getAttribute('id') in [ id, 'all']:
353     rForJ = r
354 mcinquil 1.50 break
355 spiga 1.43
356 spiga 1.16 # Data alignment
357 mcinquil 1.47 if rForJ.getAttribute('status') not in ['Created', 'Unknown']:
358 spiga 1.38 # update the status
359     common.logger.debug(3,"Updating DB status for job: " + str(id) + " @: " \
360     + str(rForJ.getAttribute('status')) )
361     job.runningJob['statusScheduler'] = str( rForJ.getAttribute('status') )
362 mcinquil 1.42 if (rForJ.getAttribute('status') == 'Done' or rForJ.getAttribute('status') == 'Done (Failed)')\
363     and rForJ.getAttribute('sched_status') == 'E' :
364 spiga 1.41 job.runningJob['status'] = 'SD'
365     else:
366     job.runningJob['status'] = str( rForJ.getAttribute('sched_status') )
367 spiga 1.35
368 spiga 1.43 job.runningJob['schedulerId'] = str( rForJ.getAttribute('sched_id') )
369    
370 spiga 1.39 job.runningJob['destination'] = str( rForJ.getAttribute('site') )
371     dest = str(job.runningJob['destination']).split(':')[0]
372    
373     job.runningJob['applicationReturnCode'] = str( rForJ.getAttribute('exe_exit') )
374     exe_exit_code = str(job.runningJob['applicationReturnCode'])
375    
376     job.runningJob['wrapperReturnCode'] = str( rForJ.getAttribute('job_exit') )
377     job_exit_code = str(job.runningJob['wrapperReturnCode'])
378 mcinquil 1.50
379     ## unsing 'standardInput' field for 'ended' tag ['Y','N']
380     job['standardInput'] = str( rForJ.getAttribute('ended') )
381 spiga 1.35
382 spiga 1.38 #if str( rForJ.getAttribute('resubmit') ).isdigit():
383     # job['submissionNumber'] = int(rForJ.getAttribute('resubmit'))
384     # job.runningJob['submission'] = int(rForJ.getAttribute('resubmit'))
385 spiga 1.35
386 spiga 1.38 # TODO cleared='0' field, how should it be handled/mapped in BL? #Fabio
387 spiga 1.16
388 spiga 1.33 common.bossSession.updateDB( task_new )
389 farinafa 1.27 return
390 spiga 1.30
391     # FIXME temporary method to verify what kind of submission to perform towards the server
392     def checkIfNeverSubmittedBefore(self):
393     for j in self.getTask().jobs:
394 farinafa 1.31 if j.runningJob['submission'] > 1 or j.runningJob['status'] != 'C' or \
395     j.runningJob['statusScheduler'] != 'Created':
396 spiga 1.30 return False
397     return True
398    
399