ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/JobDB.py
Revision: 1.24
Committed: Thu Oct 19 16:48:52 2006 UTC (18 years, 6 months ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_0_0_pre7, CRAB_2_0_0_pre6, CRAB_1_5_3, CRAB_1_5_3_pre5, CRAB_1_5_3_pre4, CRAB_2_0_0_pre5, CRAB_1_5_3_pre3, configure, CRAB_2_0_0_pre4, CRAB_1_5_3_pre2, CRAB_1_5_3_pre1, CRAB_2_0_0_pre3, CRAB_1_5_2, CRAB_2_0_0_pre2, CRAB_2_0_0_pre1, CRAB_1_5_1, CRAB_1_5_1_pre4, CRAB_1_5_1_pre3, CRAB_1_5_1_pre2, CRAB_1_5_1_pre1, CRAB_1_5_0, CRAB_1_5_0_pre9, CRAB_1_5_0_pre8, CRAB_1_5_0_pre7, CRAB_1_5_0_pre6, CRAB_1_5_0_pre5, CRAB_1_5_0_pre4, CRAB_1_5_0_pre3, CRAB_1_5_0_pre2, CRAB_1_4_2, CRAB_1_5_0_pre1, CRAB_1_4_1, CRAB_1_4_1_pre2, CRAB_1_4_1_pre1, CRAB_1_4_0
Branch point for: CRAB_1_5_4_SLC3_start, branch_1_4_1
Changes since 1.23: +1 -1 lines
Log Message:
small fix in the nSubmittedJob methid for the kill all option

File Contents

# Content
1 from WorkSpace import WorkSpace
2 from crab_exceptions import *
3 import common
4
5 import os, string
6
7 class dbEntry:
8 def __init__(self):
9 self.status = 'X' # job status
10 self.exitStatus = '' # job exit status
11 self.jid = '' # scheduler job id
12 self.bossid = '' # BOSS job id
13 self.collections = [] # EvCollection to be analyzed in this job
14 self.inputSandbox = [] # InputSandbox
15 self.outputSandbox = [] # OutputSandbox
16 self.taskId = '' # Task job belongs to
17 self.block = '' # Task job belongs to
18 self.arguments = [] # abstract job_type parameters
19 self.dest = [] # Destination for this job according to DLS
20 return
21
22 def __str__(self):
23 txt = 'Status <' + self.status + '>; '
24 if self.exitStatus!='':
25 txt += 'exitStatus <' + str(self.exitStatus) + '>\n'
26 txt += 'Job Id <' + self.jid + '> Block <'+self.block+'>\n'
27 if self.arguments:
28 txt += 'Job Type Arguments <' + str(self.arguments) + '>\n'
29 if self.dest:
30 txt += 'Destination <' + str(self.dest) + '>\n'
31
32 return txt
33
34 class JobDB:
35 def __init__(self):
36 self._dir = common.work_space.shareDir() + 'db/'
37 self._db_fname = 'jobs'
38 self._jobs = [] # list of dbEntry's
39 return
40
41 def __str__(self):
42 njobs = self.nJobs()
43 if njobs == 1: plural = ''
44 else: plural = 's'
45 txt = 'Total of %d job%s:\n' % (njobs, plural)
46 for i in range(njobs):
47 txt += ('Job %03d' % (i+1)) + ': '
48 txt += str(self._jobs[i])
49 pass
50 return txt
51
52 def dump(self, jobs):
53 njobs = len(jobs)
54 if njobs == 1: plural = ''
55 else: plural = 's'
56 print 'Listing %d job%s:\n' % (njobs, plural)
57 for job in jobs:
58 print ('Job %03d' % (job)) + ': ' + str(self._jobs[job-1])
59 pass
60
61 def nJobs(self):
62 return len(self._jobs)
63
64 def nSubmittedJobs(self):
65 n = 0
66 for i in range(self.nJobs()):
67 if self.status(i) in ['S', 'R']:
68 n += 1
69 return n
70
71 def create(self, njobs):
72
73 if os.path.exists(self._dir):
74 msg = 'Cannot create Job DB: already exists.'
75 raise CrabException(msg)
76
77 os.mkdir(self._dir)
78
79 for i in range(njobs):
80 self._jobs.append(dbEntry())
81 pass
82
83 common.logger.debug(5,'Created DB for '+str(njobs)+' jobs')
84
85 self.save()
86 return
87
88 def save(self):
89 db_file = open(self._dir+self._db_fname, 'w')
90 for i in range(len(self._jobs)):
91 db_file.write(str(i)+'|')
92 db_file.write(self._jobs[i].status+'|')
93 db_file.write(self._jobs[i].exitStatus+'|')
94 db_file.write(self._jobs[i].jid+'|')
95 db_file.write(self._jobs[i].bossid+'|')
96 db_file.write(string.join(self._jobs[i].collections)+'|')
97 db_file.write(string.join(self._jobs[i].inputSandbox)+'|')
98 db_file.write(string.join(self._jobs[i].outputSandbox)+'|')
99 db_file.write(str(self._jobs[i].taskId)+'|')
100 db_file.write(str(self._jobs[i].block)+'|')
101 db_file.write(string.join(self._jobs[i].arguments)+'|')
102 db_file.write(string.join(self._jobs[i].dest)+'|')
103 db_file.write('\n')
104 pass
105 db_file.close()
106 return
107
108 def load(self):
109 self._jobs = []
110 try:
111 db_file = open(self._dir+self._db_fname, 'r')
112 except IOError:
113 raise DBException("Something really serious! no JobDB is present!!!")
114
115 for line in db_file:
116 db_entry = dbEntry()
117 (n, db_entry.status, db_entry.exitStatus, db_entry.jid, db_entry.bossid, collectionsTMP, inputSandboxTMP , outputSandboxTMP , db_entry.taskId, db_entry.block, argumentsTMP, destTMP, rest) = string.split(line, '|')
118 db_entry.collections = string.split(collectionsTMP)
119 db_entry.inputSandbox = string.split(inputSandboxTMP)
120 db_entry.outputSandbox = string.split(outputSandboxTMP)
121 db_entry.arguments = string.split(argumentsTMP)
122 db_entry.dest = string.split(destTMP)
123 self._jobs.append(db_entry)
124 pass
125 db_file.close()
126 return
127
128
129 def setStatus(self, nj, status):
130 self.check(nj)
131 self._jobs[int(nj)].status = status
132 return
133
134 def status(self, nj):
135 return self._jobs[int(nj)].status
136
137 def setExitStatus(self, nj, exitStatus):
138 self.check(nj)
139 self._jobs[int(nj)].exitStatus = exitStatus
140 return
141
142 def exitStatus(self, nj):
143 return self._jobs[int(nj)].exitStatus
144
145 def setJobId(self, nj, jid):
146 self.check(nj)
147 self._jobs[int(nj)].jid = jid
148 return
149
150 def jobId(self, nj):
151 return self._jobs[int(nj)].jid
152
153 def setBossId(self, nj, bossid):
154 self.check(nj)
155 self._jobs[int(nj)].bossid = bossid
156 return
157
158 def bossId(self, nj):
159 return self._jobs[int(nj)].bossid
160
161 def setArguments(self, nj, args):
162 self.check(nj)
163 self._jobs[int(nj)].arguments = args
164 return
165
166 def arguments(self, nj):
167 return self._jobs[int(nj)].arguments
168
169 def setCollections(self, nj, Collections):
170 self.check(nj)
171 self._jobs[int(nj)].Collections = Collections
172 return
173
174 def collections(self, nj):
175 return self._jobs[int(nj)].collections
176
177 def setInputSandbox(self, nj, InputSandbox):
178 self.check(nj)
179 self._jobs[int(nj)].inputSandbox = InputSandbox
180 return
181
182 def inputSandbox(self, nj):
183 return self._jobs[int(nj)].inputSandbox
184
185 def setOutputSandbox(self, nj, OutputSandbox):
186 self.check(nj)
187 self._jobs[int(nj)].outputSandbox = OutputSandbox
188 return
189
190 def outputSandbox(self, nj):
191 return self._jobs[int(nj)].outputSandbox
192
193 def setTaskId(self, nj, taskId):
194 self.check(nj)
195 self._jobs[int(nj)].taskId = taskId
196
197 def taskId(self, nj):
198 return self._jobs[int(nj)].taskId
199
200 def setBlock(self, nj, block):
201 self.check(nj)
202 self._jobs[int(nj)].block = block
203
204 def block(self, nj):
205 return self._jobs[int(nj)].block
206
207 def setDestination(self, nj, args):
208 self.check(nj)
209 self._jobs[int(nj)].dest = args
210 return
211
212 def destination(self, nj):
213 return self._jobs[int(nj)].dest
214
215 def check(self, nj):
216 """ Check if the job nj is already present in DB (first job is 0) and create it if not """
217 if (int(nj) >= self.nJobs()): self._jobs.append(dbEntry())