1 |
from WorkSpace import WorkSpace
|
2 |
from crab_exceptions import *
|
3 |
import common
|
4 |
|
5 |
import os, string
|
6 |
|
7 |
class dbEntry:
|
8 |
def __init__(self):
|
9 |
self.status = 'X' # job status
|
10 |
self.exitStatus = '' # job exit status
|
11 |
self.jid = '' # scheduler job id
|
12 |
self.bossid = '' # BOSS job id
|
13 |
self.collections = [] # EvCollection to be analyzed in this job
|
14 |
self.inputSandbox = [] # InputSandbox
|
15 |
self.outputSandbox = [] # OutputSandbox
|
16 |
self.taskId = '' # Task job belongs to
|
17 |
self.arguments = [] ### Fabio: abstract job_type parameters
|
18 |
self.dest = [] # Destination for this job according to DLS
|
19 |
return
|
20 |
|
21 |
def __str__(self):
|
22 |
txt = 'Status <' + self.status + '>; '
|
23 |
if self.exitStatus!='':
|
24 |
txt += 'exitStatus <' + str(self.exitStatus) + '>\n'
|
25 |
txt += 'Job Id <' + self.jid + '>\n'
|
26 |
if self.arguments:
|
27 |
txt += 'Job Type Arguments <' + str(self.arguments) + '>\n'
|
28 |
if self.dest:
|
29 |
txt += 'Destination <' + str(self.dest) + '>\n'
|
30 |
|
31 |
return txt
|
32 |
|
33 |
class JobDB:
|
34 |
def __init__(self):
|
35 |
self._dir = common.work_space.shareDir() + 'db/'
|
36 |
self._db_fname = 'jobs'
|
37 |
self._jobs = [] # list of dbEntry's
|
38 |
return
|
39 |
|
40 |
def __str__(self):
|
41 |
njobs = self.nJobs()
|
42 |
if njobs == 1: plural = ''
|
43 |
else: plural = 's'
|
44 |
txt = 'Total of %d job%s:\n' % (njobs, plural)
|
45 |
for i in range(njobs):
|
46 |
txt += ('Job %03d' % (i+1)) + ': '
|
47 |
txt += str(self._jobs[i])
|
48 |
pass
|
49 |
return txt
|
50 |
|
51 |
def dump(self, jobs):
|
52 |
njobs = len(jobs)
|
53 |
if njobs == 1: plural = ''
|
54 |
else: plural = 's'
|
55 |
print 'Listing %d job%s:\n' % (njobs, plural)
|
56 |
for job in jobs:
|
57 |
print ('Job %03d' % (job)) + ': ' + str(self._jobs[job-1])
|
58 |
pass
|
59 |
|
60 |
def nJobs(self):
|
61 |
return len(self._jobs)
|
62 |
|
63 |
def create(self, njobs):
|
64 |
|
65 |
if os.path.exists(self._dir):
|
66 |
msg = 'Cannot create Job DB: already exists.'
|
67 |
raise CrabException(msg)
|
68 |
|
69 |
os.mkdir(self._dir)
|
70 |
|
71 |
for i in range(njobs):
|
72 |
self._jobs.append(dbEntry())
|
73 |
pass
|
74 |
|
75 |
common.logger.debug(5,'Created DB for '+str(njobs)+' jobs')
|
76 |
|
77 |
self.save()
|
78 |
return
|
79 |
|
80 |
def save(self):
|
81 |
db_file = open(self._dir+self._db_fname, 'w')
|
82 |
for i in range(len(self._jobs)):
|
83 |
db_file.write(`(i+1)`+';')
|
84 |
db_file.write(self._jobs[i].status+';')
|
85 |
db_file.write(self._jobs[i].exitStatus+';')
|
86 |
db_file.write(self._jobs[i].jid+';')
|
87 |
db_file.write(self._jobs[i].bossid+';')
|
88 |
db_file.write(string.join(self._jobs[i].collections)+';')
|
89 |
db_file.write(string.join(self._jobs[i].inputSandbox)+';')
|
90 |
db_file.write(string.join(self._jobs[i].outputSandbox)+';')
|
91 |
db_file.write(str(self._jobs[i].taskId)+';')
|
92 |
db_file.write(string.join(self._jobs[i].arguments)+';')
|
93 |
db_file.write(string.join(self._jobs[i].dest)+';')
|
94 |
db_file.write('\n')
|
95 |
pass
|
96 |
db_file.close()
|
97 |
return
|
98 |
|
99 |
def load(self):
|
100 |
self._jobs = []
|
101 |
try:
|
102 |
db_file = open(self._dir+self._db_fname, 'r')
|
103 |
except IOError:
|
104 |
raise DBException("Something really serious! no JobDB is present!!!")
|
105 |
|
106 |
for line in db_file:
|
107 |
db_entry = dbEntry()
|
108 |
(n, db_entry.status, db_entry.exitStatus, db_entry.jid, db_entry.bossid, collectionsTMP, inputSandboxTMP , outputSandboxTMP , db_entry.taskId, argumentsTMP, destTMP, rest) = string.split(line, ';')
|
109 |
db_entry.collections = string.split(collectionsTMP)
|
110 |
db_entry.inputSandbox = string.split(inputSandboxTMP)
|
111 |
db_entry.outputSandbox = string.split(outputSandboxTMP)
|
112 |
db_entry.arguments = string.split(argumentsTMP)
|
113 |
db_entry.dest = string.split(destTMP)
|
114 |
self._jobs.append(db_entry)
|
115 |
pass
|
116 |
db_file.close()
|
117 |
return
|
118 |
|
119 |
|
120 |
def setStatus(self, nj, status):
|
121 |
self._jobs[int(nj)].status = status
|
122 |
return
|
123 |
|
124 |
def status(self, nj):
|
125 |
return self._jobs[int(nj)].status
|
126 |
|
127 |
def setExitStatus(self, nj, exitStatus):
|
128 |
self._jobs[int(nj)].exitStatus = exitStatus
|
129 |
return
|
130 |
|
131 |
def exitStatus(self, nj):
|
132 |
return self._jobs[int(nj)].exitStatus
|
133 |
|
134 |
def setJobId(self, nj, jid):
|
135 |
self._jobs[int(nj)].jid = jid
|
136 |
return
|
137 |
|
138 |
def jobId(self, nj):
|
139 |
return self._jobs[int(nj)].jid
|
140 |
|
141 |
def setBossId(self, nj, bossid):
|
142 |
self._jobs[int(nj)].bossid = bossid
|
143 |
return
|
144 |
|
145 |
def bossId(self, nj):
|
146 |
return self._jobs[int(nj)].bossid
|
147 |
|
148 |
def setArguments(self, nj, args):
|
149 |
self._jobs[int(nj)].arguments = args
|
150 |
return
|
151 |
|
152 |
def arguments(self, nj):
|
153 |
return self._jobs[int(nj)].arguments
|
154 |
|
155 |
def setCollections(self, nj, Collections):
|
156 |
self._jobs[int(nj)].Collections = Collections
|
157 |
return
|
158 |
|
159 |
def collections(self, nj):
|
160 |
return self._jobs[int(nj)].collections
|
161 |
|
162 |
def setInputSandbox(self, nj, InputSandbox):
|
163 |
self._jobs[int(nj)].inputSandbox = InputSandbox
|
164 |
return
|
165 |
|
166 |
def inputSandbox(self, nj):
|
167 |
return self._jobs[int(nj)].inputSandbox
|
168 |
|
169 |
def setOutputSandbox(self, nj, OutputSandbox):
|
170 |
self._jobs[int(nj)].outputSandbox = OutputSandbox
|
171 |
return
|
172 |
|
173 |
def outputSandbox(self, nj):
|
174 |
return self._jobs[int(nj)].outputSandbox
|
175 |
|
176 |
def setTaskId(self, nj, taskId):
|
177 |
self._jobs[int(nj)].taskId = taskId
|
178 |
|
179 |
def setDestination(self, nj, args):
|
180 |
self._jobs[int(nj)].dest = args
|
181 |
return
|
182 |
|
183 |
def destination(self, nj):
|
184 |
return self._jobs[int(nj)].dest
|