ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DBinterface.py
Revision: 1.62
Committed: Fri Jun 12 09:38:54 2009 UTC (15 years, 10 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_1_pre4, CRAB_2_6_1_pre3, CRAB_2_6_1_pre2, CRAB_2_6_1_pre1, CRAB_2_6_0, CRAB_2_6_0_pre14, CRAB_2_6_0_pre13, CRAB_2_6_0_pre12, CRAB_2_6_0_pre11, CRAB_2_6_0_pre10, CRAB_2_6_0_pre9
Changes since 1.61: +5 -4 lines
Log Message:
Use the proper field (job.closed, not job.standardInput) to handle closed jobs.

File Contents

# Content
1 from crab_exceptions import *
2 from crab_util import *
3 import common
4 import os, time, shutil
5 import traceback
6
7 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
8 from ProdCommon.BossLite.Common.Exceptions import DbError
9 from ProdCommon.BossLite.Common.Exceptions import TaskError
10
11 from ProdCommon.BossLite.DbObjects.Job import Job
12 from ProdCommon.BossLite.DbObjects.Task import Task
13 from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
14
15
class DBinterface:
    """
    Client-side access layer to the task database.

    All methods go through the BossLite session stored in
    ``common.bossSession``; tasks are loaded with the fixed task id 1.

    NOTE(review): this class was re-indented from a listing that had lost
    its leading whitespace.  Statement nesting follows the reading that is
    consistent with the control flow; confirm against the repository copy.
    """

    def __init__(self, cfg_params):
        """
        Keep the configuration and pick the DB backend.

        cfg_params -- mapping of configuration values; the key
                      'USER.use_db' selects the backend ('SQLite' when
                      the key is absent).
        """
        self.cfg_params = cfg_params
        self.db_type = cfg_params.get("USER.use_db", 'SQLite')

    def configureDB(self):
        """
        Open the DB session and install the database schema.

        Raises CrabException if the session cannot be created or the
        installation fails.
        """
        # Session creation is identical to loadDB(), so reuse it.
        self.loadDB()
        try:
            common.bossSession.bossLiteDB.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')
        except Exception as e:
            raise CrabException('DB Installation error : ' + str(e))

    def loadDB(self):
        """
        Open the DB session and store it in ``common.bossSession``.

        The database name is the work-space share directory followed by
        'crabDB'.  Raises CrabException if the session cannot be created.
        """
        dbConfig = {'dbName': common.work_space.shareDir() + 'crabDB'}
        try:
            common.bossSession = BossLiteAPI(self.db_type, dbConfig)
        except Exception as e:
            raise CrabException('Istantiate DB Session : ' + str(e))

    def getTask(self, jobsList='all'):
        """
        Return task with all/list of jobs

        jobsList -- job selection handed unchanged to the session loader.
        Raises CrabException if loading fails (the traceback goes to the
        debug log).
        """
        try:
            return common.bossSession.load(1, jobsList)
        except Exception as e:
            common.logger.debug("Error while getting task : " + str(traceback.format_exc()))
            raise CrabException('Error while getting task ' + str(e))

    def getJob(self, n):
        """
        Return a task with a single job

        n -- job number; it is converted to a string before loading.
        Raises CrabException if loading fails.
        """
        try:
            return common.bossSession.load(1, str(n))
        except Exception as e:
            common.logger.debug("Error while getting job : " + str(traceback.format_exc()))
            raise CrabException('Error while getting job ' + str(e))

    def createTask_(self, optsToSave):
        """
        Task declaration
        with the first configuration stuff

        optsToSave -- mapping; when 'server_mode' equals 1 its
                      'server_name' entry is stored as 'serverName'.
        The task name is built from the user name, the second-to-last
        '/'-separated component of the work-space top directory and the
        task uuid.  Raises CrabException if the task cannot be saved.
        """
        opt = {}
        if optsToSave.get('server_mode', 0) == 1:
            opt['serverName'] = optsToSave['server_name']
        # str.split avoids relying on the 'string' module leaking in
        # through a star import.
        dirName = common.work_space.topDir().split('/')[-2]
        opt['name'] = getUserName() + '_' + dirName + '_' + common.work_space.task_uuid()
        task = Task(opt)
        try:
            common.bossSession.saveTask(task)
        except Exception:
            raise CrabException('Error creating task ' + str(traceback.format_exc()))

    def updateTask_(self, optsToSave):
        """
        Update task fields

        optsToSave -- mapping of task field name to new value.
        Raises CrabException if the DB update fails.
        """
        task = self.getTask()
        for key, value in optsToSave.items():
            task[key] = value
        # NOTE(review): assumed to run once after all fields are set.
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))

    def createJobs_(self, jobsL, isNew=True):
        """
        Fill crab DB with the jobs filed

        jobsL -- iterable of job ids (each converted with int()).
        isNew -- True: the jobs are passed to task.addJobs;
                 False: they are passed to task.appendJobs.
        Each new job gets a running instance with status 'C'.
        Raises CrabException if the DB update fails.
        """
        task = self.getTask()

        jobs = []
        for jobId in jobsL:
            parameters = {
                'jobId': int(jobId),
                'taskId': 1,
                'name': task['name'] + '_' + 'job' + str(jobId),
            }
            job = Job(parameters)
            jobs.append(job)
            common.bossSession.getRunningInstance(job)
            job.runningJob['status'] = 'C'
        ## added to support second step creation
        ## maybe it is not needed. TO CLARIFY
        if isNew:
            task.addJobs(jobs)
        else:
            task.appendJobs(jobs)
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))

    def updateJob_(self, jobsL, optsToSave):
        """
        Update Job fields

        jobsL      -- job selection passed to getTask.
        optsToSave -- sequence of mappings; the i-th mapping is applied
                      to the i-th loaded job.
        Raises CrabException if the DB update fails.
        """
        task = self.getTask(jobsL)
        for idx, job in enumerate(task.jobs):
            for key, value in optsToSave[idx].items():
                job[key] = value
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))

    def updateRunJob_(self, jobsL, optsToSave):
        """
        Update Running Job fields

        jobsL      -- job selection passed to getTask.
        optsToSave -- sequence of mappings; the i-th mapping is applied
                      to the running instance of the i-th loaded job.
        """
        task = self.getTask(jobsL)
        for idx, job in enumerate(task.jobs):
            common.bossSession.getRunningInstance(job)
            for key, value in optsToSave[idx].items():
                job.runningJob[key] = value
        common.bossSession.updateDB(task)

    def nJobs(self, list=''):
        """
        Return the number of jobs of the task, or, when called with
        list == 'list', the list of their ids as integers.
        """
        task = self.getTask()
        if list == 'list':
            return [int(job['jobId']) for job in task.jobs]
        return len(task.jobs)

    def dump(self, jobs):
        """
        List a complete set of infos for a job/range of jobs
        """
        task = self.getTask(jobs)

        # Single pre-formatted strings keep the output identical under the
        # Python 2 print statement and the Python 3 print function.
        print("--------------------------")
        for job in task.jobs:
            print("Id:  %s" % (job['jobId'],))
            print("Dest:  %s" % (job['dlsDestination'],))
            print("Output:  %s" % (job['outputFiles'],))
            print("Args:  %s" % (job['arguments'],))
            print("Service:  %s" % (job.runningJob['service'],))
            print("--------------------------")

    def serializeTask(self, tmp_task=None):
        """
        Return the session's serialization of tmp_task; when tmp_task is
        None the full task is loaded first.
        """
        if tmp_task is None:
            tmp_task = self.getTask()
        return common.bossSession.serialize(tmp_task)

    def queryID(self, server_mode=0, jid=False):
        """
        Display the task name if server_mode == 1.
        Display the job id / scheduler id list if server_mode == 0 or
        jid is true.

        Nothing is returned; the text is handed to displayReport.
        """
        header = ''
        lines = []
        task = self.getTask()
        if server_mode == 1:
            # init client server params...
            # (presumably this is what sets self.server_name -- confirm)
            CliServerParams(self)
            headerTask = "Task Id = %-40s\n" % (task['name'],)
            headerTask += '--------------------------------------------------------------------------------------------\n'
            displayReport(self, headerTask, lines)
            common.logger.info(showWebMon(self.server_name))
        if jid or server_mode == 0:
            for job in task.jobs:
                common.bossSession.getRunningInstance(job)
                lines.append("%-5s %-50s " % (job['jobId'], job.runningJob['schedulerId']))
            header += "%-5s %-50s\n " % ('Job:', 'ID')
            header += '--------------------------------------------------------------------------------------------\n'
            displayReport(self, header, lines)

    def queryTask(self, attr):
        """
        Perform a query over a generic task attribute
        """
        return self.getTask()[attr]

    def queryJob(self, attr, jobsL):
        """
        Perform a query for a range/all/single job
        over a generic job attribute
        """
        return [job[attr] for job in self.getTask(jobsL).jobs]

    def queryRunJob(self, attr, jobsL):
        """
        Perform a query for a range/all/single job
        over a generic running-job attribute
        """
        values = []
        for job in self.getTask(jobsL).jobs:
            common.bossSession.getRunningInstance(job)
            values.append(job.runningJob[attr])
        return values

    def queryDistJob(self, attr):
        """
        Returns the list of distinct value for a given job attributes

        Raises CrabException if the query fails.
        """
        try:
            found = common.bossSession.loadJobDist(1, attr)
        except Exception as e:
            common.logger.debug("Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(e))
        return [item[attr] for item in found]

    def queryDistJob_Attr(self, attr_1, attr_2, list):
        """
        Returns the list of distinct value for a given job attribute

        attr_1, attr_2 and list are forwarded unchanged to the session;
        the attr_1 entry of every result is returned.
        Raises CrabException if the query fails.
        """
        try:
            found = common.bossSession.loadJobDistAttr(1, attr_1, attr_2, list)
        except Exception as e:
            common.logger.debug("Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(e))
        return [item[attr_1] for item in found]

    def queryAttrJob(self, attr, field):
        """
        Returns the list of jobs matching the given attribute

        attr  -- selection passed to the session's loadJobsByAttr.
        field -- job field collected from every match.
        Raises CrabException if the query fails.
        """
        try:
            found = common.bossSession.loadJobsByAttr(attr)
        except Exception as e:
            common.logger.debug("Error loading Jobs By Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Attr ' + str(e))
        return [item[field] for item in found]

    def queryAttrRunJob(self, attr, field):
        """
        Returns the list of jobs matching the given attribute

        attr  -- selection passed to the session's loadJobsByRunningAttr.
        field -- running-job field collected from every match.
        Raises CrabException if the query fails.
        """
        try:
            found = common.bossSession.loadJobsByRunningAttr(attr)
        except Exception as e:
            common.logger.debug("Error loading Jobs By Running Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Running Attr ' + str(e))
        return [item.runningJob[field] for item in found]

    def newRunJobs(self, nj='all'):
        """
        Get new running instances

        Every selected job gets a new running instance whose status is
        'C' and whose statusScheduler and state are 'Created'.
        """
        task = self.getTask(nj)

        for job in task.jobs:
            common.bossSession.getNewRunningInstance(job)
            job.runningJob['status'] = 'C'
            job.runningJob['statusScheduler'] = 'Created'
            job.runningJob['state'] = 'Created'
        # NOTE(review): assumed to run once after the loop.
        common.bossSession.updateDB(task)

    def _findReport(self, reportList, jobId):
        """
        Return the first report whose 'id' attribute is jobId or 'all'.

        Raises CrabException when no report matches, instead of letting
        the caller fail on None.
        """
        for report in reportList:
            if report.getAttribute('id') in (jobId, 'all'):
                return report
        raise CrabException("Missing status report for job %s" % jobId)

    def deserXmlStatus(self, reportList):
        """
        Align the local DB with a list of status reports.

        reportList -- sequence of objects exposing getAttribute(name)
                      (presumably XML elements -- confirm with caller).

        First pass: jobs whose report carries a resubmission count not
        yet reached locally, and whose status is not one of the listed
        end states, get a new running instance.  Second pass: status,
        scheduler id, destination, exit codes, 'closed' and state are
        copied from the reports, except for reports whose status is
        'Created' or 'Unknown'.

        Raises CrabException if a job has no running object or no
        matching report.
        """
        endStates = ('Cleared', 'Killed', 'Done', 'Done (Failed)',
                     'Not Submitted', 'Cancelled by user')

        task = self.getTask()
        # Collected across ALL jobs; resetting this per job would drop
        # every candidate but the last one.
        toRenew = []
        for job in task.jobs:
            if not job.runningJob:
                raise CrabException("Missing running object for job %s" % str(job['jobId']))

            jobId = str(job.runningJob['jobId'])
            report = self._findReport(reportList, jobId)
            ## Check the submission number and create new running jobs on the client side
            if report.getAttribute('resubmit') != 'None' and report.getAttribute('status') not in endStates:
                if int(job.runningJob['submission']) < int(report.getAttribute('resubmit')) + 1:
                    toRenew.append(jobId)
        if toRenew:
            self.newRunJobs(toRenew)

        # Reload so that freshly created running instances are seen.
        task_new = self.getTask()

        for job in task_new.jobs:
            jobId = str(job.runningJob['jobId'])
            # TODO linear search, probably it can be optimized with binary search
            report = self._findReport(reportList, jobId)

            # Data alignment
            # NOTE(review): all field updates below are assumed to be
            # guarded by this status check -- confirm.
            status = report.getAttribute('status')
            if status in ('Created', 'Unknown'):
                continue

            # update the status
            common.logger.debug("Updating DB status for job: " + str(jobId) + " @: " + str(status))
            runJob = job.runningJob
            runJob['statusScheduler'] = str(status)
            if status in ('Done', 'Done (Failed)') and report.getAttribute('sched_status') == 'E':
                runJob['status'] = 'SD'
            else:
                runJob['status'] = str(report.getAttribute('sched_status'))

            runJob['schedulerId'] = str(report.getAttribute('sched_id'))
            runJob['destination'] = str(report.getAttribute('site'))
            runJob['applicationReturnCode'] = str(report.getAttribute('exe_exit'))
            runJob['wrapperReturnCode'] = str(report.getAttribute('job_exit'))
            job['closed'] = str(report.getAttribute('ended'))
            runJob['state'] = str(report.getAttribute('action'))

            # TODO cleared='0' field, how should it be handled/mapped in BL? #Fabio

        common.bossSession.updateDB(task_new)

    # FIXME temporary method to verify what kind of submission to perform towards the server
    def checkIfNeverSubmittedBefore(self):
        """
        Return True when every job has submission <= 1 and state
        'Created'; False as soon as one job does not.
        """
        for job in self.getTask().jobs:
            if job.runningJob['submission'] > 1 or job.runningJob['state'] != 'Created':
                return False
        return True
398
399