ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DBinterface.py
Revision: 1.66.2.2
Committed: Tue Feb 23 16:53:16 2010 UTC (15 years, 2 months ago) by farinafa
Content type: text/x-python
Branch: CRAB_2_7_1_branch
CVS Tags: CRAB_2_7_2_p1, CRAB_2_7_1_branch_firstMERGE, CRAB_2_7_2, CRAB_2_7_2_pre4, CRAB_2_7_2_pre3, CRAB_2_7_2_pre2, CRAB_2_7_2_pre1, CRAB_2_7_1, CRAB_2_7_1_pre12, CRAB_2_7_1_pre11, CRAB_2_7_1_pre10, CRAB_2_7_1_pre9
Changes since 1.66.2.1: +3 -2 lines
Log Message:
Rollback for runningJob submission alignment. Will be investigated in next releases

File Contents

# User Rev Content
1 spiga 1.1 from crab_exceptions import *
2     from crab_util import *
3     import common
4     import os, time, shutil
5 spiga 1.13 import traceback
6 spiga 1.1
7     from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
8 spiga 1.13 from ProdCommon.BossLite.Common.Exceptions import DbError
9     from ProdCommon.BossLite.Common.Exceptions import TaskError
10 spiga 1.1
11     from ProdCommon.BossLite.DbObjects.Job import Job
12     from ProdCommon.BossLite.DbObjects.Task import Task
13     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
14    
15    
16     class DBinterface:
17 spiga 1.2 def __init__(self, cfg_params):
18 spiga 1.1
19 spiga 1.2 self.cfg_params = cfg_params
20    
21     self.db_type = cfg_params.get("USER.use_db",'SQLite')
22 spiga 1.1 return
23    
24    
25 spiga 1.2 def configureDB(self):
26 spiga 1.1
27     dbname = common.work_space.shareDir()+'crabDB'
28     dbConfig = {'dbName':dbname
29     }
30 spiga 1.13 try:
31     common.bossSession = BossLiteAPI( self.db_type, dbConfig)
32     except Exception, e :
33     raise CrabException('Istantiate DB Session : '+str(e))
34    
35     try:
36 spiga 1.30 common.bossSession.bossLiteDB.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')
37 spiga 1.13 except Exception, e :
38     raise CrabException('DB Installation error : '+str(e))
39     return
40 spiga 1.1
41 spiga 1.2 def loadDB(self):
42    
43     dbname = common.work_space.shareDir()+'crabDB'
44     dbConfig = {'dbName':dbname
45     }
46 spiga 1.13 try:
47     common.bossSession = BossLiteAPI( self.db_type, dbConfig)
48     except Exception, e :
49     raise CrabException('Istantiate DB Session : '+str(e))
50    
51 spiga 1.2 return
52 spiga 1.4
53 spiga 1.13 def getTask(self, jobsList='all'):
54 spiga 1.9 """
55     Return task with all/list of jobs
56     """
57 spiga 1.13 try:
58 spiga 1.45 task = common.bossSession.load(1,jobsList)
59 spiga 1.13 except Exception, e :
60 spiga 1.59 common.logger.debug( "Error while getting task : " +str(traceback.format_exc()))
61 spiga 1.13 raise CrabException('Error while getting task '+str(e))
62 spiga 1.9 return task
63 spiga 1.2
64 slacapra 1.6 def getJob(self, n):
65 spiga 1.9 """
66     Return a task with a single job
67     """
68 spiga 1.13 try:
69 spiga 1.45 task = common.bossSession.load(1,str(n))
70 spiga 1.13 except Exception, e :
71 spiga 1.59 common.logger.debug( "Error while getting job : " +str(traceback.format_exc()))
72 spiga 1.13 raise CrabException('Error while getting job '+str(e))
73 spiga 1.9 return task
74 spiga 1.2
75 spiga 1.1
76     def createTask_(self, optsToSave):
77     """
78     Task declaration
79     with the first coniguration stuff
80     """
81     opt={}
82 slacapra 1.14 if optsToSave.get('server_mode',0) == 1: opt['serverName']=optsToSave['server_name']
83 spiga 1.55 opt['name']= getUserName()+ '_' + string.split(common.work_space.topDir(),'/')[-2]+'_'+common.work_space.task_uuid()
84 spiga 1.1 task = Task( opt )
85 spiga 1.13 try:
86     common.bossSession.saveTask( task )
87     except Exception, e :
88     raise CrabException('Error creating task '+str(traceback.format_exc()))
89    
90 spiga 1.1 return
91    
92     def updateTask_(self,optsToSave):
93     """
94     Update task fields
95     """
96 spiga 1.13 task = self.getTask()
97    
98 spiga 1.1 for key in optsToSave.keys():
99     task[key] = optsToSave[key]
100 spiga 1.13 try:
101     common.bossSession.updateDB( task )
102     except Exception, e :
103     raise CrabException('Error updating task '+str(traceback.format_exc()))
104    
105 spiga 1.1 return
106    
107 spiga 1.34 def createJobs_(self, jobsL, isNew=True):
108 spiga 1.1 """
109     Fill crab DB with the jobs filed
110     """
111 spiga 1.13 task = self.getTask()
112    
113 spiga 1.1 jobs = []
114 spiga 1.9 for id in jobsL:
115 spiga 1.1 parameters = {}
116 spiga 1.34 parameters['jobId'] = int(id)
117     parameters['taskId'] = 1
118 mcinquil 1.15 parameters['name'] = task['name'] + '_' + 'job' + str(id)
119 spiga 1.1 job = Job(parameters)
120 spiga 1.20 jobs.append(job)
121     common.bossSession.getRunningInstance(job)
122     job.runningJob['status'] = 'C'
123 spiga 1.34 ## added to support second step creation
124     ## maybe it is not needed. TO CLARIFY
125     if isNew:
126     task.addJobs(jobs)
127     else:
128     task.appendJobs(jobs)
129 spiga 1.13 try:
130     common.bossSession.updateDB( task )
131     except Exception, e :
132     raise CrabException('Error updating task '+str(traceback.format_exc()))
133    
134 spiga 1.1 return
135    
136 spiga 1.9 def updateJob_(self, jobsL, optsToSave):
137 spiga 1.1 """
138     Update Job fields
139     """
140 spiga 1.13 task = self.getTask(jobsL)
141 spiga 1.10 id =0
142     for job in task.jobs:
143 spiga 1.9 for key in optsToSave[id].keys():
144 spiga 1.10 job[key] = optsToSave[id][key]
145     id+=1
146 spiga 1.13 try:
147     common.bossSession.updateDB( task )
148     except Exception, e :
149     raise CrabException('Error updating task '+str(traceback.format_exc()))
150 spiga 1.1 return
151    
152 spiga 1.9 def updateRunJob_(self, jobsL, optsToSave):
153 spiga 1.1 """
154     Update Running Job fields
155     """
156 spiga 1.13 task = self.getTask(jobsL)
157    
158 spiga 1.12 id=0
159 spiga 1.10 for job in task.jobs:
160     common.bossSession.getRunningInstance(job)
161 spiga 1.12 for key in optsToSave[id].keys():
162     job.runningJob[key] = optsToSave[id][key]
163     id+=1
164 spiga 1.1 common.bossSession.updateDB( task )
165     return
166    
167 spiga 1.9 def nJobs(self,list=''):
168 spiga 1.2
169 spiga 1.13 task = self.getTask()
170 spiga 1.9 listId=[]
171     if list == 'list':
172     for job in task.jobs:listId.append(int(job['jobId']))
173     return listId
174     else:
175     return len(task.jobs)
176 spiga 1.1
177     def dump(self,jobs):
178     """
179     List a complete set of infos for a job/range of jobs
180     """
181 slacapra 1.17 task = self.getTask(jobs)
182 spiga 1.1
183 slacapra 1.17 print "--------------------------"
184 spiga 1.32 for Job in task.jobs:
185 spiga 1.24 print "Id: ",Job['jobId']
186 slacapra 1.17 print "Dest: ", Job['dlsDestination']
187     print "Output: ", Job['outputFiles']
188     print "Args: ",Job['arguments']
189 spiga 1.32 print "Service: ",Job.runningJob['service']
190 slacapra 1.17 print "--------------------------"
191 spiga 1.1 return
192 farinafa 1.8
193     def serializeTask(self, tmp_task = None):
194     if tmp_task is None:
195 spiga 1.13 tmp_task = self.getTask()
196 farinafa 1.8 return common.bossSession.serialize(tmp_task)
197 spiga 1.1
198 spiga 1.43 def queryID(self,server_mode=0, jid=False):
199 spiga 1.1 '''
200     Return the taskId if serevr_mode =1
201     Return the joblistId if serevr_mode =0
202     '''
203     header=''
204     lines=[]
205 spiga 1.13 task = self.getTask()
206 spiga 1.1 if server_mode == 1:
207 spiga 1.51 # init client server params...
208     CliServerParams(self)
209 spiga 1.62 headerTask = "Task Id = %-40s\n" %(task['name'])
210     headerTask+= '--------------------------------------------------------------------------------------------\n'
211 spiga 1.43 displayReport(self,headerTask,lines)
212 spiga 1.59 common.logger.info(showWebMon(self.server_name))
213 spiga 1.43 if (jid ) or (server_mode == 0):
214 spiga 1.11 for job in task.jobs:
215     toPrint=''
216     common.bossSession.getRunningInstance(job)
217 spiga 1.24 toPrint = "%-5s %-50s " % (job['jobId'],job.runningJob['schedulerId'])
218 spiga 1.11 lines.append(toPrint)
219 spiga 1.62 header+= "%-5s %-50s\n " % ('Job:','ID' )
220     header+= '--------------------------------------------------------------------------------------------\n'
221 spiga 1.43 displayReport(self,header,lines)
222 spiga 1.1 return
223    
224     def queryTask(self,attr):
225     '''
226     Perform a query over a generic task attribute
227     '''
228 spiga 1.13 task = self.getTask()
229 spiga 1.1 return task[attr]
230    
231 spiga 1.12 def queryJob(self, attr, jobsL):
232 spiga 1.1 '''
233     Perform a query for a range/all/single job
234     over a generic job attribute
235     '''
236     lines=[]
237 spiga 1.13 task = self.getTask(jobsL)
238 spiga 1.9 for job in task.jobs:
239 spiga 1.19 lines.append(job[attr])
240 spiga 1.1 return lines
241    
242 spiga 1.12 def queryRunJob(self, attr, jobsL):
243 spiga 1.1 '''
244     Perform a query for a range/all/single job
245     over a generic job attribute
246     '''
247     lines=[]
248 spiga 1.13 task = self.getTask(jobsL)
249 spiga 1.9 for job in task.jobs:
250     common.bossSession.getRunningInstance(job)
251     lines.append(job.runningJob[attr])
252 spiga 1.1 return lines
253 spiga 1.3
254     def queryDistJob(self, attr):
255     '''
256     Returns the list of distinct value for a given job attributes
257     '''
258     distAttr=[]
259 spiga 1.13 try:
260     task = common.bossSession.loadJobDist( 1, attr )
261     except Exception, e :
262 spiga 1.59 common.logger.debug( "Error loading Jobs By distinct Attr : " +str(traceback.format_exc()))
263 spiga 1.13 raise CrabException('Error loading Jobs By distinct Attr '+str(e))
264    
265 spiga 1.19 for i in task: distAttr.append(i[attr])
266 spiga 1.3 return distAttr
267    
268 spiga 1.5 def queryDistJob_Attr(self, attr_1, attr_2, list):
269 spiga 1.4 '''
270 spiga 1.9 Returns the list of distinct value for a given job attribute
271 spiga 1.4 '''
272     distAttr=[]
273 spiga 1.13 try:
274     task = common.bossSession.loadJobDistAttr( 1, attr_1, attr_2, list )
275     except Exception, e :
276 spiga 1.59 common.logger.debug( "Error loading Jobs By distinct Attr : " +str(traceback.format_exc()))
277 spiga 1.13 raise CrabException('Error loading Jobs By distinct Attr '+str(e))
278    
279 spiga 1.19 for i in task: distAttr.append(i[attr_1])
280 spiga 1.4 return distAttr
281 slacapra 1.6
282 spiga 1.3 def queryAttrJob(self, attr, field):
283     '''
284     Returns the list of jobs matching the given attribute
285     '''
286     matched=[]
287 spiga 1.13 try:
288     task = common.bossSession.loadJobsByAttr(attr )
289     except Exception, e :
290 spiga 1.59 common.logger.debug( "Error loading Jobs By Attr : " +str(traceback.format_exc()))
291 spiga 1.13 raise CrabException('Error loading Jobs By Attr '+str(e))
292 spiga 1.3 for i in task:
293     matched.append(i[field])
294     return matched
295    
296 spiga 1.7
297     def queryAttrRunJob(self, attr,field):
298     '''
299     Returns the list of jobs matching the given attribute
300     '''
301     matched=[]
302 spiga 1.13 try:
303     task = common.bossSession.loadJobsByRunningAttr(attr)
304     except Exception, e :
305 spiga 1.59 common.logger.debug( "Error loading Jobs By Running Attr : " +str(traceback.format_exc()))
306 spiga 1.13 raise CrabException('Error loading Jobs By Running Attr '+str(e))
307 spiga 1.7 for i in task:
308 spiga 1.18 matched.append(i.runningJob[field])
309 spiga 1.7 return matched
310 spiga 1.16
311 spiga 1.22 def newRunJobs(self,nj='all'):
312     """
313     Get new running instances
314     """
315     task = self.getTask(nj)
316    
317     for job in task.jobs:
318 spiga 1.24 common.bossSession.getNewRunningInstance(job)
319     job.runningJob['status'] = 'C'
320     job.runningJob['statusScheduler'] = 'Created'
321 spiga 1.60 job.runningJob['state'] = 'Created'
322 spiga 1.24 common.bossSession.updateDB(task)
323 spiga 1.22 return
324    
325 spiga 1.16 def deserXmlStatus(self, reportList):
326    
327     task = self.getTask()
328 spiga 1.64 if int(self.cfg_params.get('WMBS.automation',0)) == 1:
329     if len(reportList) ==0:
330     msg = 'You are using CRAB with WMBS the server is still creating your jobs.\n'
331     msg += '\tPlease wait...'
332     raise CrabException(msg)
333     newJobs = len(reportList) - len(task.jobs)
334     if newJobs != 0:
335     isNew=True
336 riahi 1.65 if len(task.jobs):isNew=False
337 spiga 1.64 jobL=[]
338     for i in range(1,newJobs+1):
339     jobL.append(len(task.jobs)+i)
340     self.createJobs_(jobL,isNew)
341    
342 spiga 1.16 for job in task.jobs:
343     if not job.runningJob:
344 spiga 1.25 raise CrabException( "Missing running object for job %s"%str(job['jobId']) )
345 farinafa 1.37
346 spiga 1.25 id = str(job.runningJob['jobId'])
347 spiga 1.16 rForJ = None
348 spiga 1.33 nj_list= []
349 spiga 1.16 for r in reportList:
350     if r.getAttribute('id') in [ id, 'all']:
351     rForJ = r
352     break
353 mcinquil 1.50 ## Check the submission number and create new running jobs on the client side
354 mcinquil 1.58 if rForJ.getAttribute('resubmit') != 'None' and (rForJ.getAttribute('status') not in ['Cleared','Killed','Done','Done (Failed)','Not Submitted', 'Cancelled by user']) :
355 spiga 1.43 if int(job.runningJob['submission']) < int(rForJ.getAttribute('resubmit')) + 1:
356     nj_list.append(id)
357 spiga 1.48 if len(nj_list) > 0: self.newRunJobs(nj_list)
358 spiga 1.33
359     task_new = self.getTask()
360 spiga 1.16
361 spiga 1.33 for job in task_new.jobs:
362     id = str(job.runningJob['jobId'])
363     # TODO linear search, probably it can be optized with binary search
364     rForJ = None
365     for r in reportList:
366     if r.getAttribute('id') in [ id, 'all']:
367     rForJ = r
368 mcinquil 1.50 break
369 spiga 1.43
370 spiga 1.16 # Data alignment
371 mcinquil 1.63 if rForJ.getAttribute('status') not in ['Unknown']: # ['Created', 'Unknown']:
372 spiga 1.38 # update the status
373 spiga 1.59 common.logger.debug("Updating DB status for job: " + str(id) + " @: " \
374 spiga 1.38 + str(rForJ.getAttribute('status')) )
375     job.runningJob['statusScheduler'] = str( rForJ.getAttribute('status') )
376 mcinquil 1.42 if (rForJ.getAttribute('status') == 'Done' or rForJ.getAttribute('status') == 'Done (Failed)')\
377     and rForJ.getAttribute('sched_status') == 'E' :
378 spiga 1.41 job.runningJob['status'] = 'SD'
379     else:
380     job.runningJob['status'] = str( rForJ.getAttribute('sched_status') )
381 spiga 1.35
382 spiga 1.43 job.runningJob['schedulerId'] = str( rForJ.getAttribute('sched_id') )
383    
384 spiga 1.39 job.runningJob['destination'] = str( rForJ.getAttribute('site') )
385     dest = str(job.runningJob['destination']).split(':')[0]
386    
387     job.runningJob['applicationReturnCode'] = str( rForJ.getAttribute('exe_exit') )
388     exe_exit_code = str(job.runningJob['applicationReturnCode'])
389    
390     job.runningJob['wrapperReturnCode'] = str( rForJ.getAttribute('job_exit') )
391     job_exit_code = str(job.runningJob['wrapperReturnCode'])
392 mcinquil 1.50
393 spiga 1.62 job['closed'] = str( rForJ.getAttribute('ended') )
394 mcinquil 1.56
395 mcinquil 1.58 job.runningJob['state'] = str( rForJ.getAttribute('action') )
396 spiga 1.35
397 farinafa 1.66.2.2 # Needed for unique naming of the output.
398     # GIVES PROBLEMS. FIX in >=2_7_2
399     #job.runningJob['submission'] = int(rForJ.getAttribute('submission'))
400 spiga 1.35
401 spiga 1.33 common.bossSession.updateDB( task_new )
402 farinafa 1.27 return
403 spiga 1.30
404     # FIXME temporary method to verify what kind of submission to perform towards the server
405     def checkIfNeverSubmittedBefore(self):
406     for j in self.getTask().jobs:
407 slacapra 1.61 if j.runningJob['submission'] > 1 or j.runningJob['state'] != 'Created':
408 spiga 1.30 return False
409     return True
410    
411 farinafa 1.66 # Method to update arguments w.r.t. resubmission number in order to grant unique output
412     def updateResubAttribs(self, jobsL):
413     task = self.getTask(jobsL)
414     for j in task.jobs:
415     common.bossSession.getRunningInstance(j)
416     newArgs = "%d %d"%(j.runningJob['jobId'], j.runningJob['submission'])
417     j['arguments'] = newArgs
418    
419     common.bossSession.updateDB(task)
420     return
421    
422 spiga 1.30