ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DBinterface.py
Revision: 1.72
Committed: Mon Jan 21 11:22:46 2013 UTC (12 years, 3 months ago) by lolass
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4
Changes since 1.71: +3 -1 lines
Log Message:
adding check for new siteDB

File Contents

# User Rev Content
1 spiga 1.1 from crab_exceptions import *
2     from crab_util import *
3     import common
4     import os, time, shutil
5 spiga 1.13 import traceback
6 spiga 1.1
7     from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
8 spiga 1.13 from ProdCommon.BossLite.Common.Exceptions import DbError
9     from ProdCommon.BossLite.Common.Exceptions import TaskError
10 spiga 1.1
11     from ProdCommon.BossLite.DbObjects.Job import Job
12     from ProdCommon.BossLite.DbObjects.Task import Task
13     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
14    
15    
class DBinterface:
    """
    Facade over the BossLite session kept in ``common.bossSession``.

    All task-level calls in this class address task id 1.  Failures of the
    underlying session are reported as CrabException where the original
    code did so; the remaining calls let the session's own exceptions
    propagate unchanged.

    NOTE: the ``except ... as`` form used below needs Python 2.6 or newer.
    """

    def __init__(self, cfg_params):
        """
        Store the configuration and select the DB backend.

        cfg_params -- mapping of configuration keys; 'USER.use_db' names the
                      backend and defaults to 'SQLite'.
        """
        self.cfg_params = cfg_params
        self.db_type = cfg_params.get("USER.use_db", 'SQLite')
        return

    def _openSession(self):
        """
        Create the BossLite session for '<shareDir>crabDB' and publish it as
        ``common.bossSession``.

        Raises CrabException if the session cannot be instantiated.
        """
        dbConfig = {'dbName': common.work_space.shareDir() + 'crabDB'}
        try:
            common.bossSession = BossLiteAPI(self.db_type, dbConfig)
        except Exception as e:
            raise CrabException('Istantiate DB Session : ' + str(e))

    def configureDB(self):
        """
        Open the DB session and install the schema from the SQLite setup
        script.

        Raises CrabException if the session or the installation fails.
        """
        self._openSession()
        try:
            common.bossSession.bossLiteDB.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')
        except Exception as e:
            raise CrabException('DB Installation error : ' + str(e))
        return

    def loadDB(self):
        """
        Open the DB session without installing the schema.

        Raises CrabException if the session cannot be instantiated.
        """
        self._openSession()
        return

    def getTask(self, jobsList='all'):
        """
        Return task with all/list of jobs

        Raises CrabException if the load fails; the full traceback is sent
        to the debug log.
        """
        try:
            task = common.bossSession.load(1, jobsList)
        except Exception as e:
            common.logger.debug("Error while getting task : " + str(traceback.format_exc()))
            raise CrabException('Error while getting task ' + str(e))
        return task

    def getJob(self, n):
        """
        Return a task with a single job

        n -- job identifier; it is converted with str() before the load.
        Raises CrabException if the load fails.
        """
        try:
            task = common.bossSession.load(1, str(n))
        except Exception as e:
            common.logger.debug("Error while getting job : " + str(traceback.format_exc()))
            raise CrabException('Error while getting job ' + str(e))
        return task

    def createTask_(self, optsToSave):
        """
        Task declaration
        with the first configuration stuff

        optsToSave -- mapping; when 'server_mode' is 1 its 'server_name' is
                      stored as the task 'serverName'.
        Raises CrabException if the task cannot be saved.
        """
        opt = {}
        if optsToSave.get('server_mode', 0) == 1:
            opt['serverName'] = optsToSave['server_name']
        # checkNewSiteDB() is skipped for the schedulers listed here.
        if common.scheduler.name().upper() not in ['LSF', 'CAF', 'SGE', 'PBS']:
            checkNewSiteDB()
        # Name = <user>_<second-to-last '/'-separated piece of topDir>_<uuid>
        workDirName = common.work_space.topDir().split('/')[-2]
        opt['name'] = getUserName() + '_' + workDirName + '_' + common.work_space.task_uuid()
        task = Task(opt)
        try:
            common.bossSession.saveTask(task)
        except Exception:
            raise CrabException('Error creating task ' + str(traceback.format_exc()))

        return

    def updateTask_(self, optsToSave):
        """
        Update task fields

        optsToSave -- mapping of task field name to new value.
        Raises CrabException if the DB update fails.
        """
        task = self.getTask()

        for key, value in optsToSave.items():
            task[key] = value
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))

        return

    def createJobs_(self, jobsL, isNew=True):
        """
        Fill crab DB with the jobs filed

        jobsL -- iterable of job ids to create.
        isNew -- True: task.addJobs() is used; False: task.appendJobs().
        Raises CrabException if the DB update fails.
        """
        task = self.getTask()

        jobs = []
        for jobId in jobsL:
            parameters = {
                'jobId': int(jobId),
                'taskId': 1,
                'name': task['name'] + '_' + 'job' + str(jobId),
            }
            job = Job(parameters)
            jobs.append(job)
            common.bossSession.getRunningInstance(job)
            job.runningJob['status'] = 'C'
        ## added to support second step creation
        ## maybe it is not needed. TO CLARIFY
        if isNew:
            task.addJobs(jobs)
        else:
            task.appendJobs(jobs)
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))

        return

    def updateJob_(self, jobsL, optsToSave):
        """
        Update Job fields

        jobsL      -- job selection passed to getTask().
        optsToSave -- indexable by position: optsToSave[i] is the mapping of
                      fields for the i-th job of the loaded task.
        Raises CrabException if the DB update fails.
        """
        task = self.getTask(jobsL)
        for index, job in enumerate(task.jobs):
            # Indexed (not zipped) so a too-short optsToSave still raises.
            fields = optsToSave[index]
            for key in fields.keys():
                job[key] = fields[key]
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))
        return

    def updateRunJob_(self, jobsL, optsToSave):
        """
        Update Running Job fields

        jobsL      -- job selection passed to getTask().
        optsToSave -- indexable by position: optsToSave[i] is the mapping of
                      running-job fields for the i-th job of the loaded task.
        """
        task = self.getTask(jobsL)

        for index, job in enumerate(task.jobs):
            common.bossSession.getRunningInstance(job)
            fields = optsToSave[index]
            for key in fields.keys():
                job.runningJob[key] = fields[key]
        common.bossSession.updateDB(task)
        return

    def nJobs(self, list=''):
        """
        Return the number of jobs of the task, or, when list == 'list',
        the list of their job ids as integers.
        """
        task = self.getTask()
        if list == 'list':
            return [int(job['jobId']) for job in task.jobs]
        return len(task.jobs)

    def dump(self, jobs):
        """
        List a complete set of infos for a job/range of jobs
        """
        task = self.getTask(jobs)

        separator = "--------------------------"
        print(separator)
        for job in task.jobs:
            # Two blanks after the colon reproduce the former
            # 'print "Label: ", value' output.
            print("Id:  %s" % (job['jobId'],))
            print("Dest:  %s" % (job['dlsDestination'],))
            print("Output:  %s" % (job['outputFiles'],))
            print("Args:  %s" % (job['arguments'],))
            print("Service:  %s" % (job.runningJob['service'],))
            print(separator)
        return

    def serializeTask(self, tmp_task=None):
        """
        Return the session's serialization of tmp_task; when tmp_task is
        None the full task is loaded first.
        """
        if tmp_task is None:
            tmp_task = self.getTask()
        return common.bossSession.serialize(tmp_task)

    def queryID(self, server_mode=0, jid=False):
        '''
        Return the taskId if server_mode =1
        Return the joblistId if server_mode =0

        Nothing is returned to the caller: the information is handed to
        displayReport().  With server_mode == 1 the job list is shown too
        when jid is true.
        '''
        header = ''
        lines = []
        task = self.getTask()
        if server_mode == 1:
            # init client server params...
            # (presumably this is what provides self.server_name - confirm)
            CliServerParams(self)
            headerTask = "Task Id = %-40s\n" % (task['name'])
            headerTask += '--------------------------------------------------------------------------------------------\n'
            displayReport(self, headerTask, lines)
            common.logger.info(showWebMon(self.server_name))
        if (jid) or (server_mode == 0):
            for job in task.jobs:
                common.bossSession.getRunningInstance(job)
                lines.append("%-5s %-50s " % (job['jobId'], job.runningJob['schedulerId']))
            header += "%-5s %-50s\n " % ('Job:', 'ID')
            header += '--------------------------------------------------------------------------------------------\n'
            displayReport(self, header, lines)
        return

    def queryTask(self, attr):
        '''
        Perform a query over a generic task attribute
        '''
        task = self.getTask()
        return task[attr]

    def queryJob(self, attr, jobsL):
        '''
        Perform a query for a range/all/single job
        over a generic job attribute
        '''
        task = self.getTask(jobsL)
        return [job[attr] for job in task.jobs]

    def queryRunJob(self, attr, jobsL):
        '''
        Perform a query for a range/all/single job
        over a generic running job attribute
        '''
        lines = []
        task = self.getTask(jobsL)
        for job in task.jobs:
            common.bossSession.getRunningInstance(job)
            lines.append(job.runningJob[attr])
        return lines

    def queryDistJob(self, attr):
        '''
        Returns the list of distinct value for a given job attributes

        Raises CrabException if the load fails.
        '''
        try:
            task = common.bossSession.loadJobDist(1, attr)
        except Exception as e:
            common.logger.debug("Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(e))

        return [entry[attr] for entry in task]

    def queryDistJob_Attr(self, attr_1, attr_2, list):
        '''
        Returns the list of distinct value for a given job attribute

        The values returned are those of attr_1; attr_2 and list are
        forwarded as-is to the session's loadJobDistAttr().
        Raises CrabException if the load fails.
        '''
        try:
            task = common.bossSession.loadJobDistAttr(1, attr_1, attr_2, list)
        except Exception as e:
            common.logger.debug("Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(e))

        return [entry[attr_1] for entry in task]

    def queryAttrJob(self, attr, field):
        '''
        Returns the list of jobs matching the given attribute

        attr  -- selection forwarded to the session's loadJobsByAttr().
        field -- job field collected from every match.
        Raises CrabException if the load fails.
        '''
        try:
            task = common.bossSession.loadJobsByAttr(attr)
        except Exception as e:
            common.logger.debug("Error loading Jobs By Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Attr ' + str(e))
        return [entry[field] for entry in task]

    def queryAttrRunJob(self, attr, field):
        '''
        Returns the list of jobs matching the given attribute

        attr  -- selection forwarded to the session's
                 loadJobsByRunningAttr().
        field -- running-job field collected from every match.
        Raises CrabException if the load fails.
        '''
        try:
            task = common.bossSession.loadJobsByRunningAttr(attr)
        except Exception as e:
            common.logger.debug("Error loading Jobs By Running Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Running Attr ' + str(e))
        return [entry.runningJob[field] for entry in task]

    def newRunJobs(self, nj='all'):
        """
        Get new running instances

        Every selected job gets a new running instance whose status,
        statusScheduler and state are set to the 'Created' values, then
        the task is written back.
        """
        task = self.getTask(nj)

        for job in task.jobs:
            common.bossSession.getNewRunningInstance(job)
            job.runningJob['status'] = 'C'
            job.runningJob['statusScheduler'] = 'Created'
            job.runningJob['state'] = 'Created'
        common.bossSession.updateDB(task)
        return

    def _findReport(self, reportList, jobId):
        """
        Return the first element of reportList whose 'id' attribute equals
        jobId or 'all'; None when there is no such element.
        """
        # TODO linear search, probably it can be optimized with binary search
        for report in reportList:
            if report.getAttribute('id') in [jobId, 'all']:
                return report
        return None

    def deserXmlStatus(self, reportList):
        """
        Align the local DB with the status reports in reportList.

        reportList -- sequence of objects exposing getAttribute(name)
                      (presumably XML DOM elements - confirm with caller).

        Steps:
          1. with 'WMBS.automation' set to 1, create locally the jobs the
             report list has in excess of the task;
          2. create new running instances for jobs the reports show as
             resubmitted and not in a final status;
          3. copy status, ids, exit codes and state from each report into
             the matching job and write the task back.

        Jobs with no matching report are skipped with a debug message.
        Raises CrabException if, under WMBS, the report list is empty, or
        if a job has no running object.
        """
        task = self.getTask()
        if int(self.cfg_params.get('WMBS.automation', 0)) == 1:
            if len(reportList) == 0:
                msg = 'You are using CRAB with WMBS the server is still creating your jobs.\n'
                msg += '\tPlease wait...'
                raise CrabException(msg)
            newJobs = len(reportList) - len(task.jobs)
            if newJobs != 0:
                known = len(task.jobs)
                jobL = [known + i for i in range(1, newJobs + 1)]
                self.createJobs_(jobL, known == 0)

        # Statuses for which no new running instance is created.
        finalStatuses = ['Cleared', 'Killed', 'Done', 'Done (Failed)',
                         'Not Submitted', 'Cancelled by user']

        # Collected over ALL jobs and submitted in one call, so that a
        # single load/update serves every resubmitted job.
        nj_list = []
        for job in task.jobs:
            if not job.runningJob:
                raise CrabException("Missing running object for job %s" % str(job['jobId']))

            jobId = str(job.runningJob['jobId'])
            rForJ = self._findReport(reportList, jobId)

            # check if rForJ is None
            if rForJ is None:
                common.logger.debug("Missing XML element for job %s, skip update status" % str(jobId))
                continue

            ## Check the submission number and create new running jobs on the client side
            if rForJ.getAttribute('resubmit') != 'None' and (rForJ.getAttribute('status') not in finalStatuses):
                if int(job.runningJob['submission']) < int(rForJ.getAttribute('resubmit')) + 1:
                    nj_list.append(jobId)
        if len(nj_list) > 0:
            self.newRunJobs(nj_list)

        # Reload so the running instances created above are visible.
        task_new = self.getTask()

        for job in task_new.jobs:
            jobId = str(job.runningJob['jobId'])
            rForJ = self._findReport(reportList, jobId)

            # Same guard as in the first pass: without it a job lacking a
            # report would fail on rForJ.getAttribute below.
            if rForJ is None:
                common.logger.debug("Missing XML element for job %s, skip update status" % str(jobId))
                continue

            # Data alignment
            status = rForJ.getAttribute('status')
            if status not in ['Unknown']:  # ['Created', 'Unknown']:
                # update the status
                common.logger.debug("Updating DB status for job: " + str(jobId) + " @: " + str(status))
                job.runningJob['statusScheduler'] = str(status)
                if (status == 'Done' or status == 'Done (Failed)') \
                        and rForJ.getAttribute('sched_status') == 'E':
                    job.runningJob['status'] = 'SD'
                else:
                    job.runningJob['status'] = str(rForJ.getAttribute('sched_status'))

                job.runningJob['schedulerId'] = str(rForJ.getAttribute('sched_id'))
                job.runningJob['destination'] = str(rForJ.getAttribute('site'))
                job.runningJob['applicationReturnCode'] = str(rForJ.getAttribute('exe_exit'))
                job.runningJob['wrapperReturnCode'] = str(rForJ.getAttribute('job_exit'))

                job['closed'] = str(rForJ.getAttribute('ended'))

                job.runningJob['state'] = str(rForJ.getAttribute('action'))

                # Needed for unique naming of the output.
                job['arguments'] = "%d %s" % (job.runningJob['jobId'], str(rForJ.getAttribute('submission')).strip())

        common.bossSession.updateDB(task_new)
        return

    # FIXME temporary method to verify what kind of submission to perform towards the server
    def checkIfNeverSubmittedBefore(self):
        """
        Return True when every job of the task has 'submission' not above 1
        and state 'Created'; False as soon as one job differs.
        """
        for j in self.getTask().jobs:
            if j.runningJob['submission'] > 1 or j.runningJob['state'] != 'Created':
                return False
        return True

    # Method to update arguments w.r.t. resubmission number in order to grant unique output
    def updateResubAttribs(self, jobsL):
        """
        Rewrite the 'arguments' of the selected jobs as
        '<jobId> <resubmission number>'.

        The number is the second blank-separated token of the current
        arguments plus one; when that token cannot be read as an integer
        the running job's 'submission' value is used instead.
        """
        task = self.getTask(jobsL)
        for j in task.jobs:
            common.bossSession.getRunningInstance(j)
            try:
                resubNum = int(str(j['arguments']).split(' ')[1]) + 1
            except Exception:
                # Deliberate best effort: any failure falls back to the
                # submission counter.
                resubNum = j.runningJob['submission']
            j['arguments'] = "%d %d" % (j.runningJob['jobId'], resubNum)

        common.bossSession.updateDB(task)
        return
432