1 |
from WorkSpace import WorkSpace
|
2 |
from crab_exceptions import *
|
3 |
import common
|
4 |
|
5 |
import os, string
|
6 |
|
7 |
class dbEntry:
|
8 |
def __init__(self):
|
9 |
self.status = 'X' # job status
|
10 |
self.exitStatus = '' # job exit status
|
11 |
self.jid = '' # scheduler job id
|
12 |
self.bossid = '' # BOSS job id
|
13 |
self.collections = [] # EvCollection to be analyzed in this job
|
14 |
self.inputSandbox = [] # InputSandbox
|
15 |
self.outputSandbox = [] # OutputSandbox
|
16 |
self.taskId = '' # Task job belongs to
|
17 |
self.block = '' # Task job belongs to
|
18 |
self.arguments = [] # abstract job_type parameters
|
19 |
self.dest = [] # Destination for this job according to DLS
|
20 |
return
|
21 |
|
22 |
def __str__(self):
|
23 |
txt = 'Status <' + self.status + '>; '
|
24 |
if self.exitStatus!='':
|
25 |
txt += 'exitStatus <' + str(self.exitStatus) + '>\n'
|
26 |
txt += 'Job Id <' + self.jid + '> Block <'+self.block+'>\n'
|
27 |
if self.arguments:
|
28 |
txt += 'Job Type Arguments <' + str(self.arguments) + '>\n'
|
29 |
if self.dest:
|
30 |
txt += 'Destination <' + str(self.dest) + '>\n'
|
31 |
|
32 |
return txt
|
33 |
|
34 |
class JobDB:
|
35 |
def __init__(self):
|
36 |
self._dir = common.work_space.shareDir() + 'db/'
|
37 |
self._db_fname = 'jobs'
|
38 |
self._jobs = [] # list of dbEntry's
|
39 |
return
|
40 |
|
41 |
def __str__(self):
|
42 |
njobs = self.nJobs()
|
43 |
if njobs == 1: plural = ''
|
44 |
else: plural = 's'
|
45 |
txt = 'Total of %d job%s:\n' % (njobs, plural)
|
46 |
for i in range(njobs):
|
47 |
txt += ('Job %03d' % (i+1)) + ': '
|
48 |
txt += str(self._jobs[i])
|
49 |
pass
|
50 |
return txt
|
51 |
|
52 |
def dump(self, jobs):
|
53 |
njobs = len(jobs)
|
54 |
if njobs == 1: plural = ''
|
55 |
else: plural = 's'
|
56 |
print 'Listing %d job%s:\n' % (njobs, plural)
|
57 |
for job in jobs:
|
58 |
print ('Job %03d' % (job)) + ': ' + str(self._jobs[job-1])
|
59 |
pass
|
60 |
|
61 |
def nJobs(self):
|
62 |
return len(self._jobs)
|
63 |
|
64 |
def nSubmittedJobs(self):
|
65 |
n = 0
|
66 |
for i in range(self.nJobs()):
|
67 |
if self.status(i) in ['S', 'R']:
|
68 |
n += 1
|
69 |
return n
|
70 |
|
71 |
def create(self, njobs):
|
72 |
|
73 |
if os.path.exists(self._dir):
|
74 |
msg = 'Cannot create Job DB: already exists.'
|
75 |
raise CrabException(msg)
|
76 |
|
77 |
os.mkdir(self._dir)
|
78 |
|
79 |
for i in range(njobs):
|
80 |
self._jobs.append(dbEntry())
|
81 |
pass
|
82 |
|
83 |
common.logger.debug(5,'Created DB for '+str(njobs)+' jobs')
|
84 |
|
85 |
self.save()
|
86 |
return
|
87 |
|
88 |
def save(self):
|
89 |
db_file = open(self._dir+self._db_fname, 'w')
|
90 |
for i in range(len(self._jobs)):
|
91 |
db_file.write(str(i)+'|')
|
92 |
db_file.write(self._jobs[i].status+'|')
|
93 |
db_file.write(self._jobs[i].exitStatus+'|')
|
94 |
db_file.write(self._jobs[i].jid+'|')
|
95 |
db_file.write(self._jobs[i].bossid+'|')
|
96 |
db_file.write(string.join(self._jobs[i].collections)+'|')
|
97 |
db_file.write(string.join(self._jobs[i].inputSandbox)+'|')
|
98 |
db_file.write(string.join(self._jobs[i].outputSandbox)+'|')
|
99 |
db_file.write(str(self._jobs[i].taskId)+'|')
|
100 |
db_file.write(str(self._jobs[i].block)+'|')
|
101 |
db_file.write(string.join(self._jobs[i].arguments)+'|')
|
102 |
db_file.write(string.join(self._jobs[i].dest)+'|')
|
103 |
db_file.write('\n')
|
104 |
pass
|
105 |
db_file.close()
|
106 |
return
|
107 |
|
108 |
def load(self):
|
109 |
self._jobs = []
|
110 |
try:
|
111 |
db_file = open(self._dir+self._db_fname, 'r')
|
112 |
except IOError:
|
113 |
raise DBException("Something really serious! no JobDB is present!!!")
|
114 |
|
115 |
for line in db_file:
|
116 |
db_entry = dbEntry()
|
117 |
(n, db_entry.status, db_entry.exitStatus, db_entry.jid, db_entry.bossid, collectionsTMP, inputSandboxTMP , outputSandboxTMP , db_entry.taskId, db_entry.block, argumentsTMP, destTMP, rest) = string.split(line, '|')
|
118 |
db_entry.collections = string.split(collectionsTMP)
|
119 |
db_entry.inputSandbox = string.split(inputSandboxTMP)
|
120 |
db_entry.outputSandbox = string.split(outputSandboxTMP)
|
121 |
db_entry.arguments = string.split(argumentsTMP)
|
122 |
db_entry.dest = string.split(destTMP)
|
123 |
self._jobs.append(db_entry)
|
124 |
pass
|
125 |
db_file.close()
|
126 |
return
|
127 |
|
128 |
|
129 |
def setStatus(self, nj, status):
|
130 |
self.check(nj)
|
131 |
self._jobs[int(nj)].status = status
|
132 |
return
|
133 |
|
134 |
def status(self, nj):
|
135 |
return self._jobs[int(nj)].status
|
136 |
|
137 |
def setExitStatus(self, nj, exitStatus):
|
138 |
self.check(nj)
|
139 |
self._jobs[int(nj)].exitStatus = exitStatus
|
140 |
return
|
141 |
|
142 |
def exitStatus(self, nj):
|
143 |
return self._jobs[int(nj)].exitStatus
|
144 |
|
145 |
def setJobId(self, nj, jid):
|
146 |
self.check(nj)
|
147 |
self._jobs[int(nj)].jid = jid
|
148 |
return
|
149 |
|
150 |
def jobId(self, nj):
|
151 |
return self._jobs[int(nj)].jid
|
152 |
|
153 |
def setBossId(self, nj, bossid):
|
154 |
self.check(nj)
|
155 |
self._jobs[int(nj)].bossid = bossid
|
156 |
return
|
157 |
|
158 |
def bossId(self, nj):
|
159 |
return self._jobs[int(nj)].bossid
|
160 |
|
161 |
def setArguments(self, nj, args):
|
162 |
self.check(nj)
|
163 |
self._jobs[int(nj)].arguments = args
|
164 |
return
|
165 |
|
166 |
def arguments(self, nj):
|
167 |
return self._jobs[int(nj)].arguments
|
168 |
|
169 |
def setCollections(self, nj, Collections):
|
170 |
self.check(nj)
|
171 |
self._jobs[int(nj)].Collections = Collections
|
172 |
return
|
173 |
|
174 |
def collections(self, nj):
|
175 |
return self._jobs[int(nj)].collections
|
176 |
|
177 |
def setInputSandbox(self, nj, InputSandbox):
|
178 |
self.check(nj)
|
179 |
self._jobs[int(nj)].inputSandbox = InputSandbox
|
180 |
return
|
181 |
|
182 |
def inputSandbox(self, nj):
|
183 |
return self._jobs[int(nj)].inputSandbox
|
184 |
|
185 |
def setOutputSandbox(self, nj, OutputSandbox):
|
186 |
self.check(nj)
|
187 |
self._jobs[int(nj)].outputSandbox = OutputSandbox
|
188 |
return
|
189 |
|
190 |
def outputSandbox(self, nj):
|
191 |
return self._jobs[int(nj)].outputSandbox
|
192 |
|
193 |
def setTaskId(self, nj, taskId):
|
194 |
self.check(nj)
|
195 |
self._jobs[int(nj)].taskId = taskId
|
196 |
|
197 |
def taskId(self, nj):
|
198 |
return self._jobs[int(nj)].taskId
|
199 |
|
200 |
def setBlock(self, nj, block):
|
201 |
self.check(nj)
|
202 |
self._jobs[int(nj)].block = block
|
203 |
|
204 |
def block(self, nj):
|
205 |
return self._jobs[int(nj)].block
|
206 |
|
207 |
def setDestination(self, nj, args):
|
208 |
self.check(nj)
|
209 |
self._jobs[int(nj)].dest = args
|
210 |
return
|
211 |
|
212 |
def destination(self, nj):
|
213 |
return self._jobs[int(nj)].dest
|
214 |
|
215 |
def check(self, nj):
|
216 |
""" Check if the job nj is already present in DB (first job is 0) and create it if not """
|
217 |
if (int(nj) >= self.nJobs()): self._jobs.append(dbEntry())
|