ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DBinterface.py
Revision: 1.7
Committed: Mon Mar 17 14:00:59 2008 UTC (17 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.6: +28 -13 lines
Log Message:
Many changes to integrate BossLite. The creation step is fully implemented and optimized... Submission is now working again. The things still missing here are the support for job submission by range, the message sending to ML, and the listmatch_match check. The requirements can still be changed on the fly, as in the past. The status step is fully working with BossLite. The exit code display is not there yet, since the new BOSS does not implement the real-time monitoring; that functionality is under development by Federica and is still to be integrated.

File Contents

# User Rev Content
1 spiga 1.1 from crab_logger import Logger
2     from crab_exceptions import *
3     from crab_util import *
4     import common
5     import os, time, shutil
6    
7     from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
8    
9    
10     from ProdCommon.BossLite.DbObjects.Job import Job
11     from ProdCommon.BossLite.DbObjects.Task import Task
12     from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
13    
14    
class DBinterface:
    """
    Thin wrapper around the BossLite session kept in common.bossSession.

    Every query/update addresses the task whose id is _TASK_ID (1);
    presumably each crab DB holds a single task -- TODO confirm.
    """

    # Id of the only task this interface ever loads.
    _TASK_ID = 1

    def __init__(self, cfg_params):
        """
        Store the configuration and pick the DB backend.

        cfg_params -- mapping of configuration values; the backend is read
                      from the 'USER.use_db' key and defaults to 'SQLite'.
        """
        self.cfg_params = cfg_params
        self.db_type = cfg_params.get("USER.use_db", 'SQLite')

    def _dbConfig(self):
        """
        Build the BossLite configuration dictionary: the DB name is the
        work space share directory followed by 'crabDB'.
        """
        return {'dbName': common.work_space.shareDir() + 'crabDB'}

    def _loadTask(self):
        """
        Load and return the task with id _TASK_ID from the BossLite session.
        """
        return common.bossSession.loadTaskByID(self._TASK_ID)

    def configureDB(self):
        """
        Open a new BossLite session (stored in common.bossSession) and
        install the DB schema from the SQLite setup script.
        """
        common.bossSession = BossLiteAPI(self.db_type, self._dbConfig())
        common.bossSession.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')

    def loadDB(self):
        """
        Open a BossLite session (stored in common.bossSession) on the
        existing DB and cache the task in self.task.
        """
        common.bossSession = BossLiteAPI(self.db_type, self._dbConfig())
        self.task = self._loadTask()

    def getTask(self, jobsList='all'):
        """
        Reload the task, cache it in self.task and return it.

        jobsList -- accepted for interface compatibility but currently
                    ignored: the whole task is always loaded.
        """
        self.task = self._loadTask()
        return self.task

    def getJob(self, n):
        """
        Load job n of the task, cache it in self.job and return it.
        """
        self.job = common.bossSession.loadJobByID(self._TASK_ID, n)
        return self.job

    def createTask_(self, optsToSave):
        """
        Task declaration with the first configuration stuff.

        optsToSave -- options dictionary; 'server_mode' is always read and
                      'server_name' is read when 'server_mode' equals 1.
        Example recorded with the original code:
        {'server_name': 'crabas.lnl.infn.it/data1/cms/', '-scheduler': 'glite', '-jobtype': 'cmssw', '-server_mode': '0'}
        """
        opt = {}
        # NOTE(review): the example above uses the key '-server_mode' with a
        # string value, while this test reads 'server_mode' and compares it
        # with the integer 1 -- confirm against the caller.
        if optsToSave['server_mode'] == 1:
            opt['serverName'] = optsToSave['server_name']
        opt['name'] = common.work_space.taskName()
        common.bossSession.saveTask(Task(opt))

    def updateTask_(self, optsToSave):
        """
        Update task fields: every key/value of optsToSave is written on the
        task, which is then saved to the DB.
        """
        task = self._loadTask()
        for key, value in optsToSave.items():
            task[key] = value
        common.bossSession.updateDB(task)

    def createJobs_(self, nj):
        """
        Fill the crab DB with nj jobs named 'job0' ... 'job<nj-1>'.
        """
        task = self._loadTask()
        jobs = [Job({'name': 'job' + str(number)}) for number in range(nj)]
        task.addJobs(jobs)
        common.bossSession.updateDB(task)

    def updateJob_(self, nj, optsToSave):
        """
        Update Job fields.

        nj         -- sequence of jobs; only its length is used.
        optsToSave -- sequence of dictionaries; optsToSave[i] is written on
                      task.jobs[i].
        """
        task = self._loadTask()
        # NOTE(review): jobs are addressed by position (0 .. len(nj)-1), not
        # by the values stored in nj -- confirm this is intended.
        for i in range(len(nj)):
            job = task.jobs[i]
            for key, value in optsToSave[i].items():
                job[key] = value
        common.bossSession.updateDB(task)

    def updateRunJob_(self, nj, optsToSave):
        """
        Update Running Job fields.

        nj         -- iterable of indexes into task.jobs (used as they are).
        optsToSave -- dictionary written on the running instance of each
                      selected job.
        """
        task = self._loadTask()
        for i in nj:
            job = task.jobs[i]
            # The running instance must be attached before runningJob is used.
            common.bossSession.getRunningInstance(job)
            for key, value in optsToSave.items():
                job.runningJob[key] = value
        common.bossSession.updateDB(task)

    def nJobs(self):
        """
        Return the number of jobs of the task.
        """
        return len(self._loadTask().jobs)

    def dump(self, jobs):
        """
        List a complete set of infos for a job/range of jobs.

        Not implemented yet: nothing is queried or displayed and None is
        returned.
        """
        # TODO DS: query the DB for the runningJobs infos of the requested
        # jobs, build the header ('Listing <n> job(s):') and pass header and
        # lines to displayReport().
        return

    def queryID(self, server_mode=0):
        """
        Report the task name if server_mode is 1, otherwise the scheduler
        id of every job of the task; the report goes to displayReport().
        """
        header = ''
        lines = []
        task = self._loadTask()
        if server_mode == 1:
            header = "Task Id = %-40s " % (task['name'])
        else:
            # Iterate task.jobs (the attribute used everywhere else in this
            # class); the previous 'task.job' spelling could not work.
            for job in task.jobs:
                common.bossSession.getRunningInstance(job)
                lines.append(job.runningJob['schedulerId'])
            # NOTE(review): the placeholders of this header are never
            # filled in -- confirm how displayReport() uses it.
            header += "Job: %-5s Id = %-40s: \n"
        displayReport(header, lines)

    def queryTask(self, attr):
        """
        Perform a query over a generic task attribute.
        """
        return self._loadTask()[attr]

    def queryJob(self, attr, njobs):
        """
        Perform a query for a range/all/single job over a generic job
        attribute.

        njobs -- iterable of indexes into task.jobs (used as they are).
        Returns the list of the attribute values, in the order of njobs.
        """
        task = self._loadTask()
        # The per-job loadJob() call whose result was discarded is gone: the
        # values are read from the already loaded task.
        return [task.jobs[i][attr] for i in njobs]

    def queryRunJob(self, attr, jobs):
        """
        Perform a query for a range/all/single job over a generic running
        job attribute.

        jobs -- iterable of job numbers; job i is read from task.jobs[i-1].
        Returns the list of the attribute values, in the order of jobs.
        """
        lines = []
        task = self._loadTask()
        for i in jobs:
            job = task.jobs[i - 1]
            # The running instance must be attached before runningJob is used.
            common.bossSession.getRunningInstance(job)
            lines.append(job.runningJob[attr])
        return lines

    def queryDistJob(self, attr):
        """
        Returns the list of distinct values for a given job attribute.
        """
        found = common.bossSession.loadJobDist(self._TASK_ID, attr)
        return [entry[attr] for entry in found]

    def queryDistJob_Attr(self, attr_1, attr_2, list):
        """
        Returns the attr_1 value of every entry given back by
        loadJobDistAttr() for attr_1, attr_2 and list.

        NOTE(review): presumably the distinct attr_1 values of the jobs
        whose attr_2 is in list -- confirm against BossLiteAPI.
        """
        found = common.bossSession.loadJobDistAttr(self._TASK_ID, attr_1, attr_2, list)
        return [entry[attr_1] for entry in found]

    def queryAttrJob(self, attr, field):
        """
        Returns the given field of every job matching the given attribute.
        """
        found = common.bossSession.loadJobsByAttr(attr)
        return [entry[field] for entry in found]

    def queryAttrRunJob(self, attr, field):
        """
        Returns the given field of every job matching the given running
        job attribute.
        """
        found = common.bossSession.loadJobsByRunningAttr(attr)
        return [entry[field] for entry in found]