ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/JobDB.py
Revision: 1.20
Committed: Wed Sep 20 17:29:52 2006 UTC (18 years, 7 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_3_0_pre4, CRAB_1_3_0_pre3
Changes since 1.19: +29 -12 lines
Log Message:
BOSS4 + sub-file splitting + taskDB

File Contents

# User Rev Content
1 nsmirnov 1.1 from WorkSpace import WorkSpace
2     from crab_exceptions import *
3     import common
4    
5 nsmirnov 1.2 import os, string
6 nsmirnov 1.1
7     class dbEntry:
8     def __init__(self):
9 slacapra 1.5 self.status = 'X' # job status
10 slacapra 1.8 self.exitStatus = '' # job exit status
11 slacapra 1.5 self.jid = '' # scheduler job id
12 corvo 1.10 self.bossid = '' # BOSS job id
13 slacapra 1.5 self.collections = [] # EvCollection to be analyzed in this job
14     self.inputSandbox = [] # InputSandbox
15     self.outputSandbox = [] # OutputSandbox
16 corvo 1.10 self.taskId = '' # Task job belongs to
17 gutsche 1.13 self.arguments = [] ### Fabio: abstract job_type parameters
18 gutsche 1.18 self.dest = [] # Destination for this job according to DLS
19 nsmirnov 1.1 return
20    
21 nsmirnov 1.2 def __str__(self):
22     txt = 'Status <' + self.status + '>; '
23 slacapra 1.8 if self.exitStatus!='':
24     txt += 'exitStatus <' + str(self.exitStatus) + '>\n'
25 nsmirnov 1.2 txt += 'Job Id <' + self.jid + '>\n'
26 gutsche 1.13 if self.arguments:
27     txt += 'Job Type Arguments <' + str(self.arguments) + '>\n'
28 slacapra 1.19 if self.dest:
29     txt += 'Destination <' + str(self.dest) + '>\n'
30 slacapra 1.4
31 nsmirnov 1.1 return txt
32    
33     class JobDB:
34     def __init__(self):
35     self._dir = common.work_space.shareDir() + 'db/'
36     self._db_fname = 'jobs'
37     self._jobs = [] # list of dbEntry's
38     return
39    
40 nsmirnov 1.2 def __str__(self):
41 nsmirnov 1.1 njobs = self.nJobs()
42     if njobs == 1: plural = ''
43     else: plural = 's'
44     txt = 'Total of %d job%s:\n' % (njobs, plural)
45     for i in range(njobs):
46 nsmirnov 1.3 txt += ('Job %03d' % (i+1)) + ': '
47 nsmirnov 1.1 txt += str(self._jobs[i])
48     pass
49     return txt
50    
51 slacapra 1.4 def dump(self, jobs):
52     njobs = len(jobs)
53     if njobs == 1: plural = ''
54     else: plural = 's'
55     print 'Listing %d job%s:\n' % (njobs, plural)
56     for job in jobs:
57 slacapra 1.17 print ('Job %03d' % (job)) + ': ' + str(self._jobs[job-1])
58 slacapra 1.4 pass
59    
60 nsmirnov 1.1 def nJobs(self):
61     return len(self._jobs)
62    
63     def create(self, njobs):
64    
65     if os.path.exists(self._dir):
66     msg = 'Cannot create Job DB: already exists.'
67     raise CrabException(msg)
68    
69     os.mkdir(self._dir)
70    
71     for i in range(njobs):
72     self._jobs.append(dbEntry())
73     pass
74    
75 slacapra 1.4 common.logger.debug(5,'Created DB for '+str(njobs)+' jobs')
76    
77 nsmirnov 1.1 self.save()
78     return
79    
80     def save(self):
81     db_file = open(self._dir+self._db_fname, 'w')
82     for i in range(len(self._jobs)):
83 slacapra 1.20 db_file.write(`(i+1)`+'|')
84     db_file.write(self._jobs[i].status+'|')
85     db_file.write(self._jobs[i].exitStatus+'|')
86     db_file.write(self._jobs[i].jid+'|')
87     db_file.write(self._jobs[i].bossid+'|')
88     db_file.write(string.join(self._jobs[i].collections)+'|')
89     db_file.write(string.join(self._jobs[i].inputSandbox)+'|')
90     db_file.write(string.join(self._jobs[i].outputSandbox)+'|')
91     db_file.write(str(self._jobs[i].taskId)+'|')
92     db_file.write(string.join(self._jobs[i].arguments)+'|')
93     db_file.write(string.join(self._jobs[i].dest)+'|')
94 nsmirnov 1.1 db_file.write('\n')
95     pass
96     db_file.close()
97     return
98    
99     def load(self):
100     self._jobs = []
101 slacapra 1.6 try:
102     db_file = open(self._dir+self._db_fname, 'r')
103     except IOError:
104 slacapra 1.7 raise DBException("Something really serious! no JobDB is present!!!")
105 slacapra 1.6
106 nsmirnov 1.1 for line in db_file:
107 nsmirnov 1.2 db_entry = dbEntry()
108 slacapra 1.20 (n, db_entry.status, db_entry.exitStatus, db_entry.jid, db_entry.bossid, collectionsTMP, inputSandboxTMP , outputSandboxTMP , db_entry.taskId, argumentsTMP, destTMP, rest) = string.split(line, '|')
109 slacapra 1.16 db_entry.collections = string.split(collectionsTMP)
110     db_entry.inputSandbox = string.split(inputSandboxTMP)
111     db_entry.outputSandbox = string.split(outputSandboxTMP)
112     db_entry.arguments = string.split(argumentsTMP)
113 gutsche 1.18 db_entry.dest = string.split(destTMP)
114 nsmirnov 1.1 self._jobs.append(db_entry)
115     pass
116     db_file.close()
117     return
118    
119 slacapra 1.5
120 nsmirnov 1.1 def setStatus(self, nj, status):
121 slacapra 1.20 self.check(nj)
122 gutsche 1.11 self._jobs[int(nj)].status = status
123 nsmirnov 1.1 return
124    
125     def status(self, nj):
126 gutsche 1.11 return self._jobs[int(nj)].status
127 nsmirnov 1.1
128 slacapra 1.8 def setExitStatus(self, nj, exitStatus):
129 slacapra 1.20 self.check(nj)
130 gutsche 1.11 self._jobs[int(nj)].exitStatus = exitStatus
131 slacapra 1.8 return
132    
133     def exitStatus(self, nj):
134 gutsche 1.11 return self._jobs[int(nj)].exitStatus
135 slacapra 1.8
136 nsmirnov 1.1 def setJobId(self, nj, jid):
137 slacapra 1.20 self.check(nj)
138 gutsche 1.11 self._jobs[int(nj)].jid = jid
139 nsmirnov 1.1 return
140    
141     def jobId(self, nj):
142 gutsche 1.11 return self._jobs[int(nj)].jid
143 slacapra 1.4
144 fanzago 1.9 def setBossId(self, nj, bossid):
145 slacapra 1.20 self.check(nj)
146 gutsche 1.11 self._jobs[int(nj)].bossid = bossid
147 fanzago 1.9 return
148    
149     def bossId(self, nj):
150 gutsche 1.11 return self._jobs[int(nj)].bossid
151 fanzago 1.9
152 gutsche 1.13 def setArguments(self, nj, args):
153 slacapra 1.20 self.check(nj)
154 gutsche 1.13 self._jobs[int(nj)].arguments = args
155 slacapra 1.4 return
156    
157 gutsche 1.13 def arguments(self, nj):
158     return self._jobs[int(nj)].arguments
159 slacapra 1.4
160     def setCollections(self, nj, Collections):
161 slacapra 1.20 self.check(nj)
162 gutsche 1.11 self._jobs[int(nj)].Collections = Collections
163 slacapra 1.4 return
164    
165     def collections(self, nj):
166 gutsche 1.11 return self._jobs[int(nj)].collections
167 slacapra 1.5
168     def setInputSandbox(self, nj, InputSandbox):
169 slacapra 1.20 self.check(nj)
170 gutsche 1.11 self._jobs[int(nj)].inputSandbox = InputSandbox
171 slacapra 1.5 return
172    
173     def inputSandbox(self, nj):
174 gutsche 1.11 return self._jobs[int(nj)].inputSandbox
175 slacapra 1.5
176     def setOutputSandbox(self, nj, OutputSandbox):
177 slacapra 1.20 self.check(nj)
178 gutsche 1.11 self._jobs[int(nj)].outputSandbox = OutputSandbox
179 slacapra 1.5 return
180    
181     def outputSandbox(self, nj):
182 gutsche 1.11 return self._jobs[int(nj)].outputSandbox
183 corvo 1.10
184     def setTaskId(self, nj, taskId):
185 slacapra 1.20 self.check(nj)
186 gutsche 1.11 self._jobs[int(nj)].taskId = taskId
187 gutsche 1.18
188 slacapra 1.20 def taskId(self, nj, taskId):
189     return self._jobs[int(nj)].taskId
190    
191 gutsche 1.18 def setDestination(self, nj, args):
192 slacapra 1.20 self.check(nj)
193 gutsche 1.18 self._jobs[int(nj)].dest = args
194     return
195    
196     def destination(self, nj):
197     return self._jobs[int(nj)].dest
198 slacapra 1.20
199     def check(self, nj):
200     """ Check if the job nj is already present in DB (first job is 0) and create it if not """
201     if (int(nj) >= self.nJobs()): self._jobs.append(dbEntry())