ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DBinterface.py
Revision: 1.59
Committed: Tue May 26 10:23:00 2009 UTC (15 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre5, CRAB_2_6_0_pre4, CRAB_2_6_0_pre3
Changes since 1.58: +8 -11 lines
Log Message:
adapting code to logging usage. (crab_logger removed)

File Contents

# Content
1 from crab_exceptions import *
2 from crab_util import *
3 import common
4 import os, time, shutil
5 import traceback
6
7 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
8 from ProdCommon.BossLite.Common.Exceptions import DbError
9 from ProdCommon.BossLite.Common.Exceptions import TaskError
10
11 from ProdCommon.BossLite.DbObjects.Job import Job
12 from ProdCommon.BossLite.DbObjects.Task import Task
13 from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
14
15
16 class DBinterface:
def __init__(self, cfg_params):
    """
    Store the configuration and pick the DB backend name.

    cfg_params -- mapping of configuration options. The optional
                  "USER.use_db" entry selects the backend; 'SQLite'
                  is used when the entry is missing.
    """
    self.cfg_params = cfg_params
    # db_type is the first argument later given to BossLiteAPI.
    self.db_type = self.cfg_params.get("USER.use_db", 'SQLite')
23
24
25 def configureDB(self):
26
27 dbname = common.work_space.shareDir()+'crabDB'
28 dbConfig = {'dbName':dbname
29 }
30 try:
31 common.bossSession = BossLiteAPI( self.db_type, dbConfig)
32 except Exception, e :
33 raise CrabException('Istantiate DB Session : '+str(e))
34
35 try:
36 common.bossSession.bossLiteDB.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')
37 except Exception, e :
38 raise CrabException('DB Installation error : '+str(e))
39 return
40
41 def loadDB(self):
42
43 dbname = common.work_space.shareDir()+'crabDB'
44 dbConfig = {'dbName':dbname
45 }
46 try:
47 common.bossSession = BossLiteAPI( self.db_type, dbConfig)
48 except Exception, e :
49 raise CrabException('Istantiate DB Session : '+str(e))
50
51 return
52
53 def getTask(self, jobsList='all'):
54 """
55 Return task with all/list of jobs
56 """
57 try:
58 task = common.bossSession.load(1,jobsList)
59 except Exception, e :
60 common.logger.debug( "Error while getting task : " +str(traceback.format_exc()))
61 raise CrabException('Error while getting task '+str(e))
62 return task
63
64 def getJob(self, n):
65 """
66 Return a task with a single job
67 """
68 try:
69 task = common.bossSession.load(1,str(n))
70 except Exception, e :
71 common.logger.debug( "Error while getting job : " +str(traceback.format_exc()))
72 raise CrabException('Error while getting job '+str(e))
73 return task
74
75
def createTask_(self, optsToSave):
    """
    Declare the task in the DB with its first configuration values.

    optsToSave -- mapping of options. When its 'server_mode' entry
                  equals 1, the 'server_name' entry is stored as the
                  task 'serverName'.
    The task name is "<user>_<dir>_<uuid>", where <user> comes from
    getUserName(), <dir> is the second-to-last '/'-separated component
    of common.work_space.topDir() and <uuid> comes from
    common.work_space.task_uuid().
    Raises CrabException (carrying the traceback text) when the task
    cannot be saved.
    """
    opt = {}
    if optsToSave.get('server_mode', 0) == 1:
        opt['serverName'] = optsToSave['server_name']

    user = getUserName()
    # Use the str method instead of the deprecated string.split() helper:
    # the 'string' module is not imported explicitly by this file.
    # presumably topDir() ends with '/', so [-2] is the directory name
    # -- TODO confirm.
    work_dir = common.work_space.topDir().split('/')[-2]
    opt['name'] = user + '_' + work_dir + '_' + common.work_space.task_uuid()

    task = Task(opt)
    try:
        common.bossSession.saveTask(task)
    except Exception:
        raise CrabException('Error creating task ' + str(traceback.format_exc()))
91
def updateTask_(self, optsToSave):
    """
    Overwrite task fields and store the task.

    optsToSave -- mapping of task field name to new value.
    Raises CrabException (carrying the traceback text) when the DB
    update fails.
    """
    task = self.getTask()
    for field, value in optsToSave.items():
        task[field] = value

    try:
        common.bossSession.updateDB(task)
    except Exception:
        raise CrabException('Error updating task ' + str(traceback.format_exc()))
106
def createJobs_(self, jobsL, isNew=True):
    """
    Fill the crab DB with newly created jobs.

    jobsL -- iterable of job ids; each one is converted with int() for
             the 'jobId' field and with str() for the job name.
    isNew -- when true the jobs are attached with task.addJobs,
             otherwise with task.appendJobs.
    Raises CrabException (carrying the traceback text) when the DB
    update fails.
    """
    task = self.getTask()

    created = []
    for job_id in jobsL:
        job = Job({
            'jobId': int(job_id),
            'taskId': 1,
            'name': task['name'] + '_' + 'job' + str(job_id),
        })
        created.append(job)
        common.bossSession.getRunningInstance(job)
        job.runningJob['status'] = 'C'

    ## the appendJobs branch was added to support second step creation;
    ## maybe it is not needed. TO CLARIFY
    if not isNew:
        task.appendJobs(created)
    else:
        task.addJobs(created)

    try:
        common.bossSession.updateDB(task)
    except Exception:
        raise CrabException('Error updating task ' + str(traceback.format_exc()))
135
def updateJob_(self, jobsL, optsToSave):
    """
    Overwrite job fields and store the task.

    jobsL      -- job selection forwarded to getTask.
    optsToSave -- sequence of mappings (field name -> value); the
                  mapping at position i is applied to the i-th job of
                  the loaded task.
    Raises CrabException (carrying the traceback text) when the DB
    update fails.
    """
    task = self.getTask(jobsL)
    for position, job in enumerate(task.jobs):
        changes = optsToSave[position]
        for field, value in changes.items():
            job[field] = value

    try:
        common.bossSession.updateDB(task)
    except Exception:
        raise CrabException('Error updating task ' + str(traceback.format_exc()))
151
def updateRunJob_(self, jobsL, optsToSave):
    """
    Overwrite running-job fields and store the task.

    jobsL      -- job selection forwarded to getTask.
    optsToSave -- sequence of mappings (field name -> value); the
                  mapping at position i is applied to the runningJob
                  of the i-th job of the loaded task.
    Errors from the DB update are not wrapped: they propagate as is.
    """
    task = self.getTask(jobsL)

    for position, job in enumerate(task.jobs):
        common.bossSession.getRunningInstance(job)
        changes = optsToSave[position]
        for field, value in changes.items():
            job.runningJob[field] = value

    common.bossSession.updateDB(task)
166
def nJobs(self, list=''):
    """
    Count the jobs of the task, or list their ids.

    list -- when equal to the string 'list' the job ids are returned
            as a list of int; for any other value the number of jobs
            is returned.
    """
    task = self.getTask()
    if list != 'list':
        return len(task.jobs)
    return [int(job['jobId']) for job in task.jobs]
176
177 def dump(self,jobs):
178 """
179 List a complete set of infos for a job/range of jobs
180 """
181 task = self.getTask(jobs)
182
183 print "--------------------------"
184 for Job in task.jobs:
185 print "Id: ",Job['jobId']
186 print "Dest: ", Job['dlsDestination']
187 print "Output: ", Job['outputFiles']
188 print "Args: ",Job['arguments']
189 print "Service: ",Job.runningJob['service']
190 print "--------------------------"
191 return
192
def serializeTask(self, tmp_task = None):
    """
    Serialize a task through common.bossSession.serialize.

    tmp_task -- task to serialize; when None the task is loaded with
                getTask() first.
    Returns the value produced by common.bossSession.serialize.
    """
    task = tmp_task
    if task is None:
        task = self.getTask()
    return common.bossSession.serialize(task)
197
def queryID(self, server_mode=0, jid=False):
    '''
    Display the task id and/or the list of job ids.

    server_mode -- when 1 the task name is displayed and the web
                   monitoring information is logged.
    jid         -- when true, or when server_mode is 0, one line per
                   job (job id and scheduler id) is displayed as well.
    Nothing is returned: the output goes through displayReport.
    '''
    task = self.getTask()
    lines = []

    if server_mode == 1:
        # init client server params...
        # presumably this is what sets self.server_name -- TODO confirm
        CliServerParams(self)
        headerTask = "Task Id = %-40s " % (task['name'])
        displayReport(self, headerTask, lines)
        common.logger.info(showWebMon(self.server_name))

    if not jid and server_mode != 0:
        return

    for job in task.jobs:
        common.bossSession.getRunningInstance(job)
        lines.append("%-5s %-50s " % (job['jobId'], job.runningJob['schedulerId']))
    header = "%-5s %-50s " % ('Job:', 'ID')
    displayReport(self, header, lines)
221
def queryTask(self, attr):
    '''
    Return the value of a generic task attribute.

    attr -- key looked up on the task returned by getTask().
    '''
    return self.getTask()[attr]
228
def queryJob(self, attr, jobsL):
    '''
    Return a generic job attribute for a range/all/single job.

    attr  -- key looked up on every job.
    jobsL -- job selection forwarded to getTask.
    Returns a list with one value per job, in task.jobs order.
    '''
    return [job[attr] for job in self.getTask(jobsL).jobs]
239
def queryRunJob(self, attr, jobsL):
    '''
    Return a generic running-job attribute for a range/all/single job.

    attr  -- key looked up on the runningJob of every job.
    jobsL -- job selection forwarded to getTask.
    Returns a list with one value per job, in task.jobs order.
    '''
    values = []
    for job in self.getTask(jobsL).jobs:
        # the running instance is requested before runningJob is read
        common.bossSession.getRunningInstance(job)
        values.append(job.runningJob[attr])
    return values
251
252 def queryDistJob(self, attr):
253 '''
254 Returns the list of distinct value for a given job attributes
255 '''
256 distAttr=[]
257 try:
258 task = common.bossSession.loadJobDist( 1, attr )
259 except Exception, e :
260 common.logger.debug( "Error loading Jobs By distinct Attr : " +str(traceback.format_exc()))
261 raise CrabException('Error loading Jobs By distinct Attr '+str(e))
262
263 for i in task: distAttr.append(i[attr])
264 return distAttr
265
266 def queryDistJob_Attr(self, attr_1, attr_2, list):
267 '''
268 Returns the list of distinct value for a given job attribute
269 '''
270 distAttr=[]
271 try:
272 task = common.bossSession.loadJobDistAttr( 1, attr_1, attr_2, list )
273 except Exception, e :
274 common.logger.debug( "Error loading Jobs By distinct Attr : " +str(traceback.format_exc()))
275 raise CrabException('Error loading Jobs By distinct Attr '+str(e))
276
277 for i in task: distAttr.append(i[attr_1])
278 return distAttr
279
280 def queryAttrJob(self, attr, field):
281 '''
282 Returns the list of jobs matching the given attribute
283 '''
284 matched=[]
285 try:
286 task = common.bossSession.loadJobsByAttr(attr )
287 except Exception, e :
288 common.logger.debug( "Error loading Jobs By Attr : " +str(traceback.format_exc()))
289 raise CrabException('Error loading Jobs By Attr '+str(e))
290 for i in task:
291 matched.append(i[field])
292 return matched
293
294
295 def queryAttrRunJob(self, attr,field):
296 '''
297 Returns the list of jobs matching the given attribute
298 '''
299 matched=[]
300 try:
301 task = common.bossSession.loadJobsByRunningAttr(attr)
302 except Exception, e :
303 common.logger.debug( "Error loading Jobs By Running Attr : " +str(traceback.format_exc()))
304 raise CrabException('Error loading Jobs By Running Attr '+str(e))
305 for i in task:
306 matched.append(i.runningJob[field])
307 return matched
308
def newRunJobs(self, nj='all'):
    """
    Get new running instances for the selected jobs.

    nj -- job selection forwarded to getTask (default 'all').
    Every new running instance gets status 'C' and statusScheduler
    'Created'; the task is then stored with a single updateDB call.
    """
    task = self.getTask(nj)

    for job in task.jobs:
        common.bossSession.getNewRunningInstance(job)
        running = job.runningJob
        running['status'] = 'C'
        running['statusScheduler'] = 'Created'

    common.bossSession.updateDB(task)
321
def deserXmlStatus(self, reportList):
    """
    Align the local DB with the job reports contained in reportList.

    reportList -- sequence of report objects read through
                  getAttribute(); the attributes used here are 'id',
                  'resubmit', 'status', 'sched_status', 'sched_id',
                  'site', 'exe_exit', 'job_exit', 'ended' and 'action'.

    First pass: for every job, a new running instance is requested
    (newRunJobs) when the report says it was resubmitted more times
    than recorded locally. Second pass: the task is reloaded and the
    runningJob fields are overwritten with the reported values, then
    the task is stored.

    Raises CrabException when a job has no running object.

    NOTE(review): the nesting below was reconstructed from a listing
    without indentation -- confirm against the repository copy.
    """

    task = self.getTask()
    for job in task.jobs:
        if not job.runningJob:
            raise CrabException( "Missing running object for job %s"%str(job['jobId']) )

        id = str(job.runningJob['jobId'])
        rForJ = None
        nj_list= []
        # pick the first report for this job id, or one flagged 'all'
        for r in reportList:
            if r.getAttribute('id') in [ id, 'all']:
                rForJ = r
                break
        # NOTE(review): rForJ is still None when no report matches, and the
        # getAttribute call below would then fail -- confirm a match is guaranteed.
        ## Check the submission number and create new running jobs on the client side
        if rForJ.getAttribute('resubmit') != 'None' and (rForJ.getAttribute('status') not in ['Cleared','Killed','Done','Done (Failed)','Not Submitted', 'Cancelled by user']) :
            if int(job.runningJob['submission']) < int(rForJ.getAttribute('resubmit')) + 1:
                nj_list.append(id)
        # nj_list is reset for every job, so it holds at most this job id
        if len(nj_list) > 0: self.newRunJobs(nj_list)

    # reload the task so that running instances created above are visible
    task_new = self.getTask()

    for job in task_new.jobs:
        id = str(job.runningJob['jobId'])
        # TODO linear search, probably it can be optimized with binary search
        rForJ = None
        for r in reportList:
            if r.getAttribute('id') in [ id, 'all']:
                rForJ = r
                break

        # NOTE(review): same unguarded use of rForJ as in the first pass.
        # Data alignment
        if rForJ.getAttribute('status') not in ['Created', 'Unknown']:
            # update the status
            common.logger.debug("Updating DB status for job: " + str(id) + " @: " \
                + str(rForJ.getAttribute('status')) )
            job.runningJob['statusScheduler'] = str( rForJ.getAttribute('status') )
            # a Done / Done (Failed) job whose sched_status is 'E' is stored as 'SD'
            if (rForJ.getAttribute('status') == 'Done' or rForJ.getAttribute('status') == 'Done (Failed)')\
                and rForJ.getAttribute('sched_status') == 'E' :
                job.runningJob['status'] = 'SD'
            else:
                job.runningJob['status'] = str( rForJ.getAttribute('sched_status') )

            job.runningJob['schedulerId'] = str( rForJ.getAttribute('sched_id') )

            job.runningJob['destination'] = str( rForJ.getAttribute('site') )
            # dest is computed but not used afterwards in this method
            dest = str(job.runningJob['destination']).split(':')[0]

            job.runningJob['applicationReturnCode'] = str( rForJ.getAttribute('exe_exit') )
            # exe_exit_code is computed but not used afterwards in this method
            exe_exit_code = str(job.runningJob['applicationReturnCode'])

            job.runningJob['wrapperReturnCode'] = str( rForJ.getAttribute('job_exit') )
            # job_exit_code is computed but not used afterwards in this method
            job_exit_code = str(job.runningJob['wrapperReturnCode'])

            ## using 'standardInput' field for 'ended' tag ['Y','N']
            job['standardInput'] = str( rForJ.getAttribute('ended') )

            job.runningJob['state'] = str( rForJ.getAttribute('action') )

            #if str( rForJ.getAttribute('resubmit') ).isdigit():
            # job['submissionNumber'] = int(rForJ.getAttribute('resubmit'))
            # job.runningJob['submission'] = int(rForJ.getAttribute('resubmit'))

            # TODO cleared='0' field, how should it be handled/mapped in BL? #Fabio

    common.bossSession.updateDB( task_new )
    return
389
# FIXME temporary method to verify what kind of submission to perform towards the server
def checkIfNeverSubmittedBefore(self):
    """
    Tell whether every job of the task is still in its initial state.

    Returns False as soon as one job has a runningJob with
    'submission' greater than 1, 'status' other than 'C' or
    'statusScheduler' other than 'Created'; True otherwise (also when
    the task has no jobs).
    """
    for job in self.getTask().jobs:
        running = job.runningJob
        if running['submission'] > 1:
            return False
        if running['status'] != 'C':
            return False
        if running['statusScheduler'] != 'Created':
            return False
    return True
397
398