ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/JobDB.py
Revision: 1.24
Committed: Thu Oct 19 16:48:52 2006 UTC (18 years, 6 months ago) by corvo
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_0_0_pre7, CRAB_2_0_0_pre6, CRAB_1_5_3, CRAB_1_5_3_pre5, CRAB_1_5_3_pre4, CRAB_2_0_0_pre5, CRAB_1_5_3_pre3, configure, CRAB_2_0_0_pre4, CRAB_1_5_3_pre2, CRAB_1_5_3_pre1, CRAB_2_0_0_pre3, CRAB_1_5_2, CRAB_2_0_0_pre2, CRAB_2_0_0_pre1, CRAB_1_5_1, CRAB_1_5_1_pre4, CRAB_1_5_1_pre3, CRAB_1_5_1_pre2, CRAB_1_5_1_pre1, CRAB_1_5_0, CRAB_1_5_0_pre9, CRAB_1_5_0_pre8, CRAB_1_5_0_pre7, CRAB_1_5_0_pre6, CRAB_1_5_0_pre5, CRAB_1_5_0_pre4, CRAB_1_5_0_pre3, CRAB_1_5_0_pre2, CRAB_1_4_2, CRAB_1_5_0_pre1, CRAB_1_4_1, CRAB_1_4_1_pre2, CRAB_1_4_1_pre1, CRAB_1_4_0
Branch point for: CRAB_1_5_4_SLC3_start, branch_1_4_1
Changes since 1.23: +1 -1 lines
Log Message:
Small fix in the nSubmittedJobs method for the kill-all option.

File Contents

# User Rev Content
1 nsmirnov 1.1 from WorkSpace import WorkSpace
2     from crab_exceptions import *
3     import common
4    
5 nsmirnov 1.2 import os, string
6 nsmirnov 1.1
class dbEntry:
    """Record holding the bookkeeping data of a single job.

    Every attribute starts out empty; 'status' starts as 'X'.
    """

    def __init__(self):
        # Scalar fields.
        self.status = 'X'        # job status
        self.exitStatus = ''     # job exit status
        self.jid = ''            # scheduler job id
        self.bossid = ''         # BOSS job id
        self.taskId = ''         # task the job belongs to
        self.block = ''          # block of the job (presumably a data block -- confirm)
        # List fields.
        self.collections = []    # EvCollection to be analyzed in this job
        self.inputSandbox = []   # InputSandbox
        self.outputSandbox = []  # OutputSandbox
        self.arguments = []      # abstract job_type parameters
        self.dest = []           # destination for this job according to DLS
        return

    def __str__(self):
        """Return a multi-line, human-readable summary of the entry.

        The exit status, the job-type arguments and the destination are only
        listed when they are set (non-empty).
        """
        pieces = ['Status <' + self.status + '>; ']
        if self.exitStatus != '':
            pieces.append('exitStatus <' + str(self.exitStatus) + '>\n')
        pieces.append('Job Id <' + self.jid + '> Block <' + self.block + '>\n')
        if self.arguments:
            pieces.append('Job Type Arguments <' + str(self.arguments) + '>\n')
        if self.dest:
            pieces.append('Destination <' + str(self.dest) + '>\n')
        return ''.join(pieces)
33    
class JobDB:
    """List of dbEntry records backed by a '|'-separated text file.

    Job numbers passed to the accessor methods are 0-based indices into the
    internal list (dump() is the exception: it takes 1-based job numbers).
    Setters create missing entries on demand through check(); getters do not
    and raise IndexError for a job that does not exist.
    """

    def __init__(self):
        self._dir = common.work_space.shareDir() + 'db/'
        self._db_fname = 'jobs'
        self._jobs = []  # list of dbEntry's
        return

    def __str__(self):
        """Return a listing of all jobs, numbered from 1."""
        njobs = self.nJobs()
        if njobs == 1:
            plural = ''
        else:
            plural = 's'
        parts = ['Total of %d job%s:\n' % (njobs, plural)]
        for i, job in enumerate(self._jobs):
            parts.append(('Job %03d' % (i + 1)) + ': ' + str(job))
        return ''.join(parts)

    def dump(self, jobs):
        """Print the entries of the given jobs.

        jobs -- sequence of 1-based job numbers (job N is stored at index N-1).
        """
        njobs = len(jobs)
        if njobs == 1:
            plural = ''
        else:
            plural = 's'
        # A single parenthesised argument prints identically whether 'print'
        # is a statement (Python 2) or a function (Python 3).
        print('Listing %d job%s:\n' % (njobs, plural))
        for job in jobs:
            print(('Job %03d' % (job)) + ': ' + str(self._jobs[job - 1]))

    def nJobs(self):
        """Return the number of jobs currently held in the DB."""
        return len(self._jobs)

    def nSubmittedJobs(self):
        """Return the number of jobs whose status is 'S' or 'R'."""
        n = 0
        for job in self._jobs:
            if job.status in ('S', 'R'):
                n += 1
        return n

    def create(self, njobs):
        """Create the DB directory, add njobs empty entries and save them.

        Raises CrabException if the DB directory already exists.
        """
        if os.path.exists(self._dir):
            msg = 'Cannot create Job DB: already exists.'
            raise CrabException(msg)

        os.mkdir(self._dir)

        for _ in range(njobs):
            self._jobs.append(dbEntry())

        common.logger.debug(5, 'Created DB for ' + str(njobs) + ' jobs')

        self.save()
        return

    def save(self):
        """Write all entries to the DB file, one '|'-terminated record per line.

        List-valued fields are stored space-separated.  Scalar fields are
        passed through str() so that non-string values (e.g. a numeric exit
        status) do not abort the save with a TypeError.
        """
        db_file = open(self._dir + self._db_fname, 'w')
        try:
            for i, job in enumerate(self._jobs):
                fields = [
                    str(i),
                    str(job.status),
                    str(job.exitStatus),
                    str(job.jid),
                    str(job.bossid),
                    ' '.join(job.collections),
                    ' '.join(job.inputSandbox),
                    ' '.join(job.outputSandbox),
                    str(job.taskId),
                    str(job.block),
                    ' '.join(job.arguments),
                    ' '.join(job.dest),
                ]
                # Every field, including the last one, is followed by '|'.
                db_file.write('|'.join(fields) + '|\n')
        finally:
            # Close the file even if a write or a conversion fails.
            db_file.close()
        return

    def load(self):
        """Replace the in-memory entries with the content of the DB file.

        Raises DBException if the DB file cannot be opened, and ValueError
        if a line does not contain the expected number of '|'-separated
        fields.  All scalar fields are loaded as strings.
        """
        self._jobs = []
        try:
            db_file = open(self._dir + self._db_fname, 'r')
        except IOError:
            raise DBException("Something really serious! no JobDB is present!!!")

        try:
            for line in db_file:
                # The leading job index and whatever follows the final '|'
                # (the newline) are not stored in the entry.
                (_index, status, exitStatus, jid, bossid,
                 collectionsTMP, inputSandboxTMP, outputSandboxTMP,
                 taskId, block, argumentsTMP, destTMP, _rest) = line.split('|')
                db_entry = dbEntry()
                db_entry.status = status
                db_entry.exitStatus = exitStatus
                db_entry.jid = jid
                db_entry.bossid = bossid
                db_entry.collections = collectionsTMP.split()
                db_entry.inputSandbox = inputSandboxTMP.split()
                db_entry.outputSandbox = outputSandboxTMP.split()
                db_entry.taskId = taskId
                db_entry.block = block
                db_entry.arguments = argumentsTMP.split()
                db_entry.dest = destTMP.split()
                self._jobs.append(db_entry)
        finally:
            # Close the file even if a malformed line aborts the load.
            db_file.close()
        return

    def _entry(self, nj):
        """Return the entry of job nj (0-based; nj may be an int or a numeric string)."""
        return self._jobs[int(nj)]

    def setStatus(self, nj, status):
        """Set the status of job nj, creating the entry if needed."""
        self.check(nj)
        self._entry(nj).status = status
        return

    def status(self, nj):
        """Return the status of job nj."""
        return self._entry(nj).status

    def setExitStatus(self, nj, exitStatus):
        """Set the exit status of job nj, creating the entry if needed."""
        self.check(nj)
        self._entry(nj).exitStatus = exitStatus
        return

    def exitStatus(self, nj):
        """Return the exit status of job nj."""
        return self._entry(nj).exitStatus

    def setJobId(self, nj, jid):
        """Set the scheduler job id of job nj, creating the entry if needed."""
        self.check(nj)
        self._entry(nj).jid = jid
        return

    def jobId(self, nj):
        """Return the scheduler job id of job nj."""
        return self._entry(nj).jid

    def setBossId(self, nj, bossid):
        """Set the BOSS job id of job nj, creating the entry if needed."""
        self.check(nj)
        self._entry(nj).bossid = bossid
        return

    def bossId(self, nj):
        """Return the BOSS job id of job nj."""
        return self._entry(nj).bossid

    def setArguments(self, nj, args):
        """Set the job-type arguments of job nj, creating the entry if needed."""
        self.check(nj)
        self._entry(nj).arguments = args
        return

    def arguments(self, nj):
        """Return the job-type arguments of job nj."""
        return self._entry(nj).arguments

    def setCollections(self, nj, Collections):
        """Set the collections of job nj, creating the entry if needed."""
        self.check(nj)
        # Store under the lower-case attribute that collections() and save()
        # read; a capitalised attribute name would never be seen by them.
        self._entry(nj).collections = Collections
        return

    def collections(self, nj):
        """Return the collections of job nj."""
        return self._entry(nj).collections

    def setInputSandbox(self, nj, InputSandbox):
        """Set the input sandbox of job nj, creating the entry if needed."""
        self.check(nj)
        self._entry(nj).inputSandbox = InputSandbox
        return

    def inputSandbox(self, nj):
        """Return the input sandbox of job nj."""
        return self._entry(nj).inputSandbox

    def setOutputSandbox(self, nj, OutputSandbox):
        """Set the output sandbox of job nj, creating the entry if needed."""
        self.check(nj)
        self._entry(nj).outputSandbox = OutputSandbox
        return

    def outputSandbox(self, nj):
        """Return the output sandbox of job nj."""
        return self._entry(nj).outputSandbox

    def setTaskId(self, nj, taskId):
        """Set the task id of job nj, creating the entry if needed."""
        self.check(nj)
        self._entry(nj).taskId = taskId

    def taskId(self, nj):
        """Return the task id of job nj."""
        return self._entry(nj).taskId

    def setBlock(self, nj, block):
        """Set the block of job nj, creating the entry if needed."""
        self.check(nj)
        self._entry(nj).block = block

    def block(self, nj):
        """Return the block of job nj."""
        return self._entry(nj).block

    def setDestination(self, nj, args):
        """Set the destination list of job nj, creating the entry if needed."""
        self.check(nj)
        self._entry(nj).dest = args
        return

    def destination(self, nj):
        """Return the destination list of job nj."""
        return self._entry(nj).dest

    def check(self, nj):
        """ Check if the job nj is already present in DB (first job is 0) and create it if not """
        index = int(nj)
        # Keep appending until index nj exists: a single append is not enough
        # when nj is more than one past the current end of the list.
        while index >= self.nJobs():
            self._jobs.append(dbEntry())