ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DBinterface.py
Revision: 1.67
Committed: Fri Feb 19 15:13:37 2010 UTC (15 years, 2 months ago) by farinafa
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_LumiMask, CRAB_2_7_lumi, from_LimiMask
Changes since 1.66: +2 -5 lines
Log Message:
runningJob submission field backpropagation to client for mixed cli/ser resubmission with unique names convention

File Contents

# Content
1 from crab_exceptions import *
2 from crab_util import *
3 import common
4 import os, time, shutil
5 import traceback
6
7 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
8 from ProdCommon.BossLite.Common.Exceptions import DbError
9 from ProdCommon.BossLite.Common.Exceptions import TaskError
10
11 from ProdCommon.BossLite.DbObjects.Job import Job
12 from ProdCommon.BossLite.DbObjects.Task import Task
13 from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
14
15
class DBinterface:
    """
    Client-side access layer to the CRAB task database.

    All methods operate through the BossLite session published as
    ``common.bossSession`` and always address the task with id 1.

    NOTE(review): this class was re-indented from a listing whose
    indentation had been flattened; block nesting follows the data flow of
    the statements and should be confirmed against the repository copy.
    """

    # Server-side statuses for which deserXmlStatus never creates a new
    # running instance, even if the server reports a resubmission.
    _NO_NEW_RUN_STATUSES = ('Cleared', 'Killed', 'Done', 'Done (Failed)',
                            'Not Submitted', 'Cancelled by user')

    def __init__(self, cfg_params):
        """
        Keep the configuration and select the DB backend.

        cfg_params -- mapping of configuration keys; 'USER.use_db' selects
                      the backend type and defaults to 'SQLite'.
        """
        self.cfg_params = cfg_params
        self.db_type = cfg_params.get("USER.use_db", 'SQLite')

    def _openSession(self):
        """
        Create the BossLiteAPI session on the work-space DB file and publish
        it as common.bossSession.

        Raises CrabException if the session cannot be instantiated.
        """
        dbConfig = {'dbName': common.work_space.shareDir() + 'crabDB'}
        try:
            common.bossSession = BossLiteAPI(self.db_type, dbConfig)
        except Exception as e:
            raise CrabException('Istantiate DB Session : ' + str(e))

    def configureDB(self):
        """
        Open the DB session and install the schema from the SQLite setup
        script.

        Raises CrabException if either step fails.
        """
        self._openSession()
        try:
            common.bossSession.bossLiteDB.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')
        except Exception as e:
            raise CrabException('DB Installation error : ' + str(e))

    def loadDB(self):
        """
        Open the DB session without installing the schema.

        Raises CrabException if the session cannot be instantiated.
        """
        self._openSession()

    def getTask(self, jobsList='all'):
        """
        Return task with all/list of jobs

        jobsList -- job selection handed unchanged to bossSession.load.
        Raises CrabException on any load failure (traceback goes to the
        debug log).
        """
        try:
            return common.bossSession.load(1, jobsList)
        except Exception as e:
            common.logger.debug("Error while getting task : " + str(traceback.format_exc()))
            raise CrabException('Error while getting task ' + str(e))

    def getJob(self, n):
        """
        Return a task with a single job

        n -- job id; it is converted to a string before being passed to
             bossSession.load.
        Raises CrabException on any load failure.
        """
        try:
            return common.bossSession.load(1, str(n))
        except Exception as e:
            common.logger.debug("Error while getting job : " + str(traceback.format_exc()))
            raise CrabException('Error while getting job ' + str(e))

    def createTask_(self, optsToSave):
        """
        Task declaration
        with the first configuration stuff

        The task name is built as <user>_<second-to-last component of the
        work-space top dir>_<task uuid>. When optsToSave['server_mode'] is 1
        the server name is stored as well.
        Raises CrabException if the task cannot be saved.
        """
        opt = {}
        if optsToSave.get('server_mode', 0) == 1:
            opt['serverName'] = optsToSave['server_name']
        # str.split avoids depending on a `string` module that is only
        # reachable through a star import.
        workDirName = common.work_space.topDir().split('/')[-2]
        opt['name'] = getUserName() + '_' + workDirName + '_' + common.work_space.task_uuid()
        task = Task(opt)
        try:
            common.bossSession.saveTask(task)
        except Exception:
            raise CrabException('Error creating task ' + str(traceback.format_exc()))

    def updateTask_(self, optsToSave):
        """
        Update task fields

        optsToSave -- mapping field name -> new value, copied onto the task.
        Raises CrabException if the DB update fails.
        """
        task = self.getTask()
        for key, value in optsToSave.items():
            task[key] = value
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))

    def createJobs_(self, jobsL, isNew=True):
        """
        Fill crab DB with the jobs filed

        jobsL -- iterable of job ids to create.
        isNew -- True: the jobs are set on the task with addJobs;
                 False: they are added with appendJobs.
        Each job gets a running instance with status 'C'.
        Raises CrabException if the DB update fails.
        """
        task = self.getTask()

        jobs = []
        for jobId in jobsL:
            job = Job({'jobId': int(jobId),
                       'taskId': 1,
                       'name': task['name'] + '_' + 'job' + str(jobId)})
            jobs.append(job)
            common.bossSession.getRunningInstance(job)
            job.runningJob['status'] = 'C'

        ## added to support second step creation
        ## maybe it is not needed. TO CLARIFY
        if isNew:
            task.addJobs(jobs)
        else:
            task.appendJobs(jobs)
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))

    def updateJob_(self, jobsL, optsToSave):
        """
        Update Job fields

        jobsL      -- job selection passed to getTask.
        optsToSave -- sequence of mappings; the i-th mapping is applied to
                      the i-th loaded job.
        Raises CrabException if the DB update fails.
        """
        task = self.getTask(jobsL)
        for position, job in enumerate(task.jobs):
            for key, value in optsToSave[position].items():
                job[key] = value
        try:
            common.bossSession.updateDB(task)
        except Exception:
            raise CrabException('Error updating task ' + str(traceback.format_exc()))

    def updateRunJob_(self, jobsL, optsToSave):
        """
        Update Running Job fields

        jobsL      -- job selection passed to getTask.
        optsToSave -- sequence of mappings; the i-th mapping is applied to
                      the running instance of the i-th loaded job.
        Errors from the DB update are not wrapped here.
        """
        task = self.getTask(jobsL)
        for position, job in enumerate(task.jobs):
            common.bossSession.getRunningInstance(job)
            for key, value in optsToSave[position].items():
                job.runningJob[key] = value
        common.bossSession.updateDB(task)

    def nJobs(self, list=''):
        """
        Return the number of jobs of the task, or, when list == 'list',
        the list of their integer job ids.
        """
        task = self.getTask()
        if list == 'list':
            return [int(job['jobId']) for job in task.jobs]
        return len(task.jobs)

    def dump(self, jobs):
        """
        List a complete set of infos for a job/range of jobs

        Prints id, destination, output files, arguments and service of each
        selected job to standard output.
        """
        task = self.getTask(jobs)

        separator = "--------------------------"
        print(separator)
        # `job` (not `Job`) so the imported Job class is not shadowed.
        # The two blanks after each label reproduce the output of the
        # original comma-separated print statements.
        for job in task.jobs:
            print("Id:  %s" % (job['jobId'],))
            print("Dest:  %s" % (job['dlsDestination'],))
            print("Output:  %s" % (job['outputFiles'],))
            print("Args:  %s" % (job['arguments'],))
            print("Service:  %s" % (job.runningJob['service'],))
            print(separator)

    def serializeTask(self, tmp_task=None):
        """
        Return the bossSession serialization of tmp_task, or of the whole
        stored task when tmp_task is None.
        """
        if tmp_task is None:
            tmp_task = self.getTask()
        return common.bossSession.serialize(tmp_task)

    def queryID(self, server_mode=0, jid=False):
        '''
        Return the taskId if server_mode =1
        Return the joblistId if server_mode =0

        Nothing is returned to the caller: the task name (server mode) and
        the per-job scheduler ids (jid true or server_mode 0) are reported
        through displayReport.
        '''
        ruler = '--------------------------------------------------------------------------------------------\n'
        lines = []
        task = self.getTask()

        if server_mode == 1:
            # init client server params...
            CliServerParams(self)
            headerTask = "Task Id = %-40s\n" % (task['name'])
            headerTask += ruler
            displayReport(self, headerTask, lines)
            common.logger.info(showWebMon(self.server_name))

        if jid or server_mode == 0:
            for job in task.jobs:
                common.bossSession.getRunningInstance(job)
                lines.append("%-5s %-50s " % (job['jobId'], job.runningJob['schedulerId']))
            header = "%-5s %-50s\n " % ('Job:', 'ID')
            header += ruler
            displayReport(self, header, lines)

    def queryTask(self, attr):
        '''
        Perform a query over a generic task attribute
        '''
        return self.getTask()[attr]

    def queryJob(self, attr, jobsL):
        '''
        Perform a query for a range/all/single job
        over a generic job attribute

        Returns the list of job[attr] for the selected jobs.
        '''
        return [job[attr] for job in self.getTask(jobsL).jobs]

    def queryRunJob(self, attr, jobsL):
        '''
        Perform a query for a range/all/single job
        over a generic running-job attribute

        Returns the list of job.runningJob[attr] for the selected jobs.
        '''
        values = []
        for job in self.getTask(jobsL).jobs:
            common.bossSession.getRunningInstance(job)
            values.append(job.runningJob[attr])
        return values

    def queryDistJob(self, attr):
        '''
        Returns the list of distinct value for a given job attributes

        Raises CrabException if the underlying load fails.
        '''
        try:
            rows = common.bossSession.loadJobDist(1, attr)
        except Exception as e:
            common.logger.debug("Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(e))
        return [row[attr] for row in rows]

    def queryDistJob_Attr(self, attr_1, attr_2, list):
        '''
        Returns the list of distinct value for a given job attribute

        attr_1, attr_2 and list are forwarded to bossSession.loadJobDistAttr;
        the attr_1 value of every returned row is collected.
        Raises CrabException if the underlying load fails.
        '''
        try:
            rows = common.bossSession.loadJobDistAttr(1, attr_1, attr_2, list)
        except Exception as e:
            common.logger.debug("Error loading Jobs By distinct Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By distinct Attr ' + str(e))
        return [row[attr_1] for row in rows]

    def queryAttrJob(self, attr, field):
        '''
        Returns the list of jobs matching the given attribute

        attr  -- selection handed to bossSession.loadJobsByAttr.
        field -- job field collected from every matching job.
        Raises CrabException if the underlying load fails.
        '''
        try:
            found = common.bossSession.loadJobsByAttr(attr)
        except Exception as e:
            common.logger.debug("Error loading Jobs By Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Attr ' + str(e))
        return [job[field] for job in found]

    def queryAttrRunJob(self, attr, field):
        '''
        Returns the list of jobs matching the given running-job attribute

        attr  -- selection handed to bossSession.loadJobsByRunningAttr.
        field -- running-job field collected from every matching job.
        Raises CrabException if the underlying load fails.
        '''
        try:
            found = common.bossSession.loadJobsByRunningAttr(attr)
        except Exception as e:
            common.logger.debug("Error loading Jobs By Running Attr : " + str(traceback.format_exc()))
            raise CrabException('Error loading Jobs By Running Attr ' + str(e))
        return [job.runningJob[field] for job in found]

    def newRunJobs(self, nj='all'):
        """
        Get new running instances

        Every selected job receives a fresh running instance initialised to
        status 'C' / statusScheduler 'Created' / state 'Created'; the task
        is then written back to the DB.
        """
        task = self.getTask(nj)

        for job in task.jobs:
            common.bossSession.getNewRunningInstance(job)
            job.runningJob['status'] = 'C'
            job.runningJob['statusScheduler'] = 'Created'
            job.runningJob['state'] = 'Created'
        common.bossSession.updateDB(task)

    def _findReport(self, reportList, jobId):
        """
        Return the first entry of reportList whose 'id' attribute equals
        jobId or the literal 'all'; None when nothing matches.
        """
        for report in reportList:
            if report.getAttribute('id') in (jobId, 'all'):
                return report
        return None

    def deserXmlStatus(self, reportList):
        """
        Align the local DB with the per-job status reports of the server.

        reportList -- sequence of objects exposing getAttribute(name); the
                      attributes read are id, resubmit, status,
                      sched_status, sched_id, site, exe_exit, job_exit,
                      ended, action and submission.

        Steps:
          1. with 'WMBS.automation' set to 1, create locally the jobs the
             server reports beyond those already known;
          2. create new running instances for jobs the server resubmitted;
          3. copy the reported fields onto every job and update the DB.

        Raises CrabException if WMBS is on and reportList is empty, or if a
        job has no running object.
        """
        task = self.getTask()

        if int(self.cfg_params.get('WMBS.automation', 0)) == 1:
            if len(reportList) == 0:
                msg = 'You are using CRAB with WMBS the server is still creating your jobs.\n'
                msg += '\tPlease wait...'
                raise CrabException(msg)
            newJobs = len(reportList) - len(task.jobs)
            if newJobs != 0:
                known = len(task.jobs)
                jobL = [known + i for i in range(1, newJobs + 1)]
                self.createJobs_(jobL, known == 0)

        ## Check the submission number and create new running jobs on the client side
        # The id list is built once for the whole task: resetting it for
        # every job would drop ids gathered for earlier jobs.
        nj_list = []
        for job in task.jobs:
            if not job.runningJob:
                raise CrabException("Missing running object for job %s" % str(job['jobId']))

            jobId = str(job.runningJob['jobId'])
            report = self._findReport(reportList, jobId)
            if report is None:
                # No information from the server for this job.
                continue
            if report.getAttribute('resubmit') != 'None' \
                    and report.getAttribute('status') not in self._NO_NEW_RUN_STATUSES:
                if int(job.runningJob['submission']) < int(report.getAttribute('resubmit')) + 1:
                    nj_list.append(jobId)
        if nj_list:
            self.newRunJobs(nj_list)

        # Reload so that running instances created above are visible.
        task_new = self.getTask()

        for job in task_new.jobs:
            jobId = str(job.runningJob['jobId'])
            # TODO linear search, probably it can be optimized with binary search
            report = self._findReport(reportList, jobId)
            if report is None:
                common.logger.debug("No status report for job: " + jobId)
                continue

            # Data alignment
            status = report.getAttribute('status')
            if status in ['Unknown']:  # ['Created', 'Unknown']:
                continue

            # NOTE(review): all assignments below are taken to belong to the
            # "status is not Unknown" branch - confirm against the repository.
            common.logger.debug("Updating DB status for job: " + str(jobId) + " @: "
                                + str(status))
            job.runningJob['statusScheduler'] = str(status)
            if status in ('Done', 'Done (Failed)') and report.getAttribute('sched_status') == 'E':
                job.runningJob['status'] = 'SD'
            else:
                job.runningJob['status'] = str(report.getAttribute('sched_status'))

            job.runningJob['schedulerId'] = str(report.getAttribute('sched_id'))
            job.runningJob['destination'] = str(report.getAttribute('site'))
            job.runningJob['applicationReturnCode'] = str(report.getAttribute('exe_exit'))
            job.runningJob['wrapperReturnCode'] = str(report.getAttribute('job_exit'))
            job['closed'] = str(report.getAttribute('ended'))
            job.runningJob['state'] = str(report.getAttribute('action'))

            # Needed for unique naming of the output
            job.runningJob['submission'] = int(report.getAttribute('submission'))

        common.bossSession.updateDB(task_new)

    # FIXME temporary method to verify what kind of submission to perform towards the server
    def checkIfNeverSubmittedBefore(self):
        """
        Return True only if every job has a running instance with
        submission not greater than 1 and state 'Created'.
        """
        for job in self.getTask().jobs:
            if job.runningJob['submission'] > 1 or job.runningJob['state'] != 'Created':
                return False
        return True

    # Method to update arguments w.r.t. resubmission number in order to grant unique output
    def updateResubAttribs(self, jobsL):
        """
        Set the 'arguments' field of each selected job to
        "<jobId> <submission>" taken from its running instance, then update
        the DB.
        """
        task = self.getTask(jobsL)
        for job in task.jobs:
            common.bossSession.getRunningInstance(job)
            job['arguments'] = "%d %d" % (job.runningJob['jobId'], job.runningJob['submission'])

        common.bossSession.updateDB(task)
421