ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/DBinterface.py
Revision: 1.7
Committed: Mon Mar 17 14:00:59 2008 UTC (17 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.6: +28 -13 lines
Log Message:
Many changes to integrate BossLite. Creation step fully implemented and optimized... Submission is now working again. Here the missing things are the support for jobs submission by range, the message sending to ML, and the listmatch_match check. Actually the requirements can be changed on the fly as was in the past. The status is fully working with BossLite. The exit code display is not there since the new boss does not implement the RealTime mon.. here the functionality is under development by Federica: to be integrated.

File Contents

# Content
1 from crab_logger import Logger
2 from crab_exceptions import *
3 from crab_util import *
4 import common
5 import os, time, shutil
6
7 from ProdCommon.BossLite.API.BossLiteAPI import BossLiteAPI
8
9
10 from ProdCommon.BossLite.DbObjects.Job import Job
11 from ProdCommon.BossLite.DbObjects.Task import Task
12 from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
13
14
15 class DBinterface:
16 def __init__(self, cfg_params):
17
18 self.cfg_params = cfg_params
19
20 self.db_type = cfg_params.get("USER.use_db",'SQLite')
21 return
22
23
24 def configureDB(self):
25
26 dbname = common.work_space.shareDir()+'crabDB'
27 dbConfig = {'dbName':dbname
28 }
29
30 common.bossSession = BossLiteAPI( self.db_type, dbConfig)
31 common.bossSession.installDB('$CRABPRODCOMMONPYTHON/ProdCommon/BossLite/DbObjects/setupDatabase-sqlite.sql')
32
33 return
34
35 def loadDB(self):
36
37 dbname = common.work_space.shareDir()+'crabDB'
38 dbConfig = {'dbName':dbname
39 }
40 common.bossSession = BossLiteAPI( self.db_type, dbConfig)
41 self.task = common.bossSession.loadTaskByID(1)
42 return
43
44 def getTask(self, jobsList='all'): #, cfg_params):
45
46 #if jobsList == 'all':
47 # self.task = common.bossSession.loadTaskByID(1)
48 #else:
49 #self.task = common.bossSession.load('1','5')
50 #return self.task[0]
51 self.task = common.bossSession.loadTaskByID(1)
52 return self.task
53
54 def getJob(self, n):
55
56 self.job = common.bossSession.loadJobByID(1,n)
57 return self.job
58
59
60 def createTask_(self, optsToSave):
61 """
62 Task declaration
63 with the first coniguration stuff
64 {'server_name': 'crabas.lnl.infn.it/data1/cms/', '-scheduler': 'glite', '-jobtype': 'cmssw', '-server_mode': '0'}
65
66 """
67 opt={}
68 if optsToSave['server_mode'] == 1: opt['serverName']=optsToSave['server_name']
69 opt[ 'name']=common.work_space.taskName()
70 task = Task( opt )
71
72 common.bossSession.saveTask( task )
73 return
74
75 def updateTask_(self,optsToSave):
76 """
77 Update task fields
78 """
79 #task = common.bossSession.loadTaskByName(common.work_space.taskName() )
80 task = common.bossSession.loadTaskByID(1)
81
82 for key in optsToSave.keys():
83 task[key] = optsToSave[key]
84 common.bossSession.updateDB( task )
85 return
86
87 def createJobs_(self, nj):
88 """
89 Fill crab DB with the jobs filed
90 """
91 #task = common.bossSession.loadTaskByName(common.work_space.taskName())
92 task = common.bossSession.loadTaskByID(1)
93 jobs = []
94 for id in range(nj):
95 parameters = {}
96 parameters['name'] = 'job' + str(id)
97 job = Job(parameters)
98 jobs.append(job)
99 task.addJobs(jobs)
100 common.bossSession.updateDB( task )
101 return
102
103 def updateJob_(self, nj, optsToSave):
104 """
105 Update Job fields
106 """
107 task = common.bossSession.loadTaskByID(1)
108 #task = common.bossSession.loadTaskByName( common.work_space.taskName())
109 for i in range(len(nj)):
110 # jobs = common.bossSession.loadJob(task['id'],i)
111 for key in optsToSave[i].keys():
112 task.jobs[i][key] = optsToSave[i][key]
113 common.bossSession.updateDB( task )
114 return
115
116 def updateRunJob_(self, nj, optsToSave):
117 """
118 Update Running Job fields
119 """
120 task = common.bossSession.loadTaskByID(1)
121 #task = common.bossSession.loadTaskByName( common.work_space.taskName())
122 for i in nj:
123 common.bossSession.getRunningInstance(task.jobs[i])
124 for key in optsToSave.keys():
125 task.jobs[i].runningJob[key] = optsToSave[key]
126 common.bossSession.updateDB( task )
127 return
128
129 def nJobs(self):
130
131 task = common.bossSession.loadTaskByID(1)
132 #task = common.bossSession.loadTaskByName( common.work_space.taskName())
133 return len(task.jobs)
134
135 def dump(self,jobs):
136 """
137 List a complete set of infos for a job/range of jobs
138 """
139 task = common.bossSession.loadTaskByID(1)
140 #task = common.bossSession.loadTaskByName( common.work_space.taskName())
141
142 njobs = len(jobs)
143 lines=[]
144 header=''
145 # ##query the DB asking the right infos for runningJobs TODO DS
146 # for job in jobs:
147 # ## here the query over runngJobs
148 # pass
149
150
151 # ##Define Header to show and Pass the query results,
152 # ## header and format to displayReport() TODO DS
153 # if njobs == 1: plural = ''
154 # else: plural = 's'
155 # header += 'Listing %d job%s:\n' % (njobs, plural)
156 # header += ' :\n' % (---) ## TODO DS
157
158 # displayReport(header, lines):
159 return
160
161 def queryID(self,server_mode=0):
162 '''
163 Return the taskId if serevr_mode =1
164 Return the joblistId if serevr_mode =0
165 '''
166 header=''
167 lines=[]
168 task = common.bossSession.loadTaskByID(1)
169 if server_mode == 1:
170 header= "Task Id = %-40s " %(task['name'])
171 else:
172 # task = common.bossSession.loadTaskByName(common.work_space.taskName() )
173 for i in range(len(task.job)):
174 common.bossSession.getRunningInstance(task.jobs[i])
175 lines.append(task.jobs[i].runningJob['schedulerId'])
176
177 header+= "Job: %-5s Id = %-40s: \n"
178 displayReport(header,lines)
179 return
180
181 def queryTask(self,attr):
182 '''
183 Perform a query over a generic task attribute
184 '''
185 task = common.bossSession.loadTaskByID(1)
186 return task[attr]
187
188 def queryJob(self, attr, njobs):
189 '''
190 Perform a query for a range/all/single job
191 over a generic job attribute
192 '''
193 lines=[]
194 task = common.bossSession.loadTaskByID(1)
195 #task = common.bossSession.loadTaskByName( common.work_space.taskName())
196 for i in njobs:
197 jobs = common.bossSession.loadJob(task['id'],i+1)
198 lines.append(task.jobs[i][attr])
199 return lines
200
201 def queryRunJob(self, attr, jobs):
202 '''
203 Perform a query for a range/all/single job
204 over a generic job attribute
205 '''
206 lines=[]
207 task = common.bossSession.loadTaskByID(1)
208 # task = common.bossSession.loadTaskByName( common.work_space.taskName() )
209 for i in jobs:
210 common.bossSession.getRunningInstance(task.jobs[i-1])
211 lines.append(task.jobs[i-1].runningJob[attr])
212 return lines
213
214 def queryDistJob(self, attr):
215 '''
216 Returns the list of distinct value for a given job attributes
217 '''
218 distAttr=[]
219 task = common.bossSession.loadJobDist( 1, attr )
220 for i in task: distAttr.append(i[attr])
221 return distAttr
222
223 def queryDistJob_Attr(self, attr_1, attr_2, list):
224 '''
225 Returns the list of distinct value for a given job attributes
226 '''
227 distAttr=[]
228 task = common.bossSession.loadJobDistAttr( 1, attr_1, attr_2, list )
229 for i in task: distAttr.append(i[attr_1])
230 return distAttr
231
232 def queryAttrJob(self, attr, field):
233 '''
234 Returns the list of jobs matching the given attribute
235 '''
236 matched=[]
237 task = common.bossSession.loadJobsByAttr(attr )
238 for i in task:
239 matched.append(i[field])
240 return matched
241
242
243 def queryAttrRunJob(self, attr,field):
244 '''
245 Returns the list of jobs matching the given attribute
246 '''
247 matched=[]
248 task = common.bossSession.loadJobsByRunningAttr(attr)
249 for i in task:
250 matched.append(i[field])
251 return matched