root/cvsroot/COMP/PKGTOOLS/scheduler.py
Revision: 1.10
Committed: Tue Dec 4 14:18:09 2012 UTC (12 years, 4 months ago) by eulisse
Content type: text/x-python
Branch: MAIN
CVS Tags: V00-22-02, V00-21-09, V00-22-01, V00-22-00, V00-21-08, V00-21-07, V00-21-06, V00-21-05, ge20130117, V00-21-04, V00-21-03, HEAD
Changes since 1.9: +26 -10 lines
Log Message:
Add dummy final job.

* This job depends on every other task being scheduled and will
  therefore always be scheduled last. This is useful to avoid deadlock
  situations, because it forces all threads to remain active until all
  the tasks have terminated (see the sketch below).
* Unit tests updated accordingly.
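
For illustration, a minimal usage sketch (hypothetical task names, using the
dummyTask helper defined in the file below) of the behaviour the final job
guarantees: it only completes once every other scheduled task has finished.

    scheduler = Scheduler(4)
    scheduler.parallel("download", [], dummyTask)
    scheduler.serial("install", ["download"], dummyTask)
    scheduler.run()
    # The dummy final job is always the last one to complete.
    assert scheduler.doneJobs[-1] == "final-job"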

File Contents

from Queue import Queue
from threading import Thread
from time import sleep
import threading
import sys
import traceback
from StringIO import StringIO

# Helper class to avoid conflict between result
# codes and quit state transition.
class _SchedulerQuitCommand(object):
    pass

def transition(what, fromList, toList):
    try:
        fromList.remove(what)
    except ValueError, e:
        print what + " not in source list"
        raise e
    toList.append(what)

class Scheduler(object):
    # A simple job scheduler.
    # The workers queue is used to tell the worker threads what to do. The
    # results queue is watched by the master thread, which waits for the
    # results of the workers' computations.
    # All worker threads begin by trying to fetch from the command queue (and
    # are therefore blocked).
    # The master thread does the scheduling and then sits waiting for results.
    # Scheduling means iterating over the list of jobs and creating an entry
    # in the parallel queue for every job whose dependencies are all done.
    # If a job has dependencies which did not build, move it to the failed queue.
    # Post an appropriate build command for all the new jobs which got added.
    # If there are jobs still to be done, post a reschedule job on the command
    # queue; if there are no jobs left, post the "kill worker" task.
    # There is one special "final-job" which depends on all the scheduled jobs,
    # either parallel or serial. This job is guaranteed to be executed last and
    # avoids deadlocks due to all the queues having been disposed.
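    #
    # A minimal usage sketch (hypothetical task names; dummyTask is the helper
    # defined at module level further down in this file):
    #
    #   scheduler = Scheduler(4)
    #   scheduler.parallel("download", [], dummyTask)
    #   scheduler.parallel("build", ["download"], dummyTask)
    #   scheduler.serial("install", ["build"], dummyTask)
    #   scheduler.run()
    #   # doneJobs ends up as ["download", "build", "install", "final-job"]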
    def __init__(self, parallelThreads, logDelegate=None):
        self.workersQueue = Queue()
        self.resultsQueue = Queue()
        self.jobs = {}
        self.pendingJobs = []
        self.runningJobs = []
        self.doneJobs = []
        self.brokenJobs = []
        self.parallelThreads = parallelThreads
        self.logDelegate = logDelegate
        self.errors = {}
        if not logDelegate:
            self.logDelegate = self.__doLog
        # Add a final job, which will depend on any spawned task so that we do not
        # terminate until we are completely done.
        self.finalJobDeps = []
        self.finalJobSpec = [self.doSerial, "final-job", self.finalJobDeps] + [self.__doLog, "Nothing else to be done, exiting."]
        self.resultsQueue.put((threading.currentThread(), self.finalJobSpec))
        self.jobs["final-job"] = {"scheduler": "serial", "deps": self.finalJobDeps, "spec": self.finalJobSpec}
        self.pendingJobs.append("final-job")

    def run(self):
        for i in xrange(self.parallelThreads):
            t = Thread(target=self.__createWorker())
            t.daemon = True
            t.start()

        self.notifyMaster(self.__rescheduleParallel)
        # Wait until all the workers are done.
        while self.parallelThreads:
            try:
                who, item = self.resultsQueue.get()
                #print who, item
                item[0](*item[1:])
                sleep(0.1)
            except KeyboardInterrupt:
                print "Ctrl-c received, waiting for workers to finish"
                # Drain the pending work so that workers only see the quit command.
                while not self.workersQueue.empty():
                    self.workersQueue.get(False)
                self.shout(self.quit)

        # Prune whatever is left in the results queue.
        while not self.resultsQueue.empty():
            self.resultsQueue.get()
        return

    # Create a worker.
    def __createWorker(self):
        def worker():
            while True:
                taskId, item = self.workersQueue.get()
                try:
                    result = item[0](*item[1:])
                except Exception, e:
                    s = StringIO()
                    traceback.print_exc(file=s)
                    result = s.getvalue()

                if type(result) == _SchedulerQuitCommand:
                    self.notifyMaster(self.__releaseWorker)
                    return
                self.log(str(item) + " done")
                self.notifyMaster(self.__updateJobStatus, taskId, result)
                self.notifyMaster(self.__rescheduleParallel)
                # Only in 2.5: self.workersQueue.task_done()
        return worker

    def __releaseWorker(self):
        self.parallelThreads -= 1

    def parallel(self, taskId, deps, *spec):
        if taskId in self.pendingJobs:
            self.log("Double task %s" % taskId)
        self.jobs[taskId] = {"scheduler": "parallel", "deps": deps, "spec": spec}
        self.pendingJobs.append(taskId)
        self.finalJobDeps.append(taskId)

    # Does the rescheduling of tasks. Derived classes should call it.
    def __rescheduleParallel(self):
        parallelJobs = [j for j in self.pendingJobs if self.jobs[j]["scheduler"] == "parallel"]
        # First of all clean up the pending parallel jobs, moving aside all those
        # which have broken dependencies.
        for taskId in parallelJobs:
            brokenDeps = [dep for dep in self.jobs[taskId]["deps"] if dep in self.brokenJobs]
            if not brokenDeps:
                continue
            transition(taskId, self.pendingJobs, self.brokenJobs)
            self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)

        # If no tasks are left, quit. Notice we also need to check for serial jobs
        # since they might queue more parallel payloads.
        if not self.pendingJobs:
            self.shout(self.quit)
            self.notifyMaster(self.quit)
            return

        # Otherwise do another round of scheduling of all the tasks. In this
        # case we only queue parallel jobs to the parallel queue.
        for taskId in parallelJobs:
            pendingDeps = [dep for dep in self.jobs[taskId]["deps"] if not dep in self.doneJobs]
            if pendingDeps:
                continue
            # No broken dependencies and no pending ones. We can continue.
            transition(taskId, self.pendingJobs, self.runningJobs)
            self.__scheduleParallel(taskId, self.jobs[taskId]["spec"])

    # Update the job with the result of running.
    def __updateJobStatus(self, taskId, error):
        if not error:
            transition(taskId, self.runningJobs, self.doneJobs)
            return
        transition(taskId, self.runningJobs, self.brokenJobs)
        self.errors[taskId] = error

    # One task at a time.
    def __scheduleParallel(self, taskId, commandSpec):
        self.workersQueue.put((taskId, commandSpec))

    # Helper to enqueue commands for all the threads.
    def shout(self, *commandSpec):
        for x in xrange(self.parallelThreads):
            self.__scheduleParallel("quit-" + str(x), commandSpec)

    # Helper to enqueue replies to the master thread.
    def notifyMaster(self, *commandSpec):
        self.resultsQueue.put((threading.currentThread(), commandSpec))

    def serial(self, taskId, deps, *commandSpec):
        spec = [self.doSerial, taskId, deps] + list(commandSpec)
        self.resultsQueue.put((threading.currentThread(), spec))
        self.jobs[taskId] = {"scheduler": "serial", "deps": deps, "spec": spec}
        self.pendingJobs.append(taskId)
        self.finalJobDeps.append(taskId)

    def doSerial(self, taskId, deps, *commandSpec):
        brokenDeps = [dep for dep in deps if dep in self.brokenJobs]
        if brokenDeps:
            transition(taskId, self.pendingJobs, self.brokenJobs)
            self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
            # Remember to do the scheduling again!
            self.notifyMaster(self.__rescheduleParallel)
            return

        # Put back the task on the queue, since it has pending dependencies.
        pendingDeps = [dep for dep in deps if not dep in self.doneJobs]
        if pendingDeps:
            self.resultsQueue.put((threading.currentThread(), [self.doSerial, taskId, deps] + list(commandSpec)))
            return
        # No broken dependencies and no pending ones. Run the job.
        transition(taskId, self.pendingJobs, self.runningJobs)
        try:
            result = commandSpec[0](*commandSpec[1:])
        except Exception, e:
            s = StringIO()
            traceback.print_exc(file=s)
            result = s.getvalue()
        self.__updateJobStatus(taskId, result)
        # Remember to do the scheduling again!
        self.notifyMaster(self.__rescheduleParallel)

    # Helper method to do logging.
    def log(self, s):
        self.notifyMaster(self.logDelegate, s)

    # Task which forces a worker to quit.
    def quit(self):
        self.log("Requested to quit.")
        return _SchedulerQuitCommand()

    # Helper for printouts.
    def __doLog(self, s):
        print s

    def reschedule(self):
        self.notifyMaster(self.__rescheduleParallel)

def dummyTask():
    sleep(0.1)

def dummyTaskLong():
    sleep(1)

def errorTask():
    return "This will always have an error"

def exceptionTask():
    raise Exception("foo")

# Mimics cmsBuild workflow.
def scheduleMore(scheduler):
    scheduler.parallel("download", [], dummyTask)
    scheduler.parallel("build", ["download"], dummyTask)
    scheduler.serial("install", ["build"], dummyTask)

if __name__ == "__main__":
    scheduler = Scheduler(10)
    scheduler.run()

    scheduler = Scheduler(1)
    scheduler.run()

    scheduler = Scheduler(10)
    scheduler.parallel("test", [], scheduler.log, "This is england")
    scheduler.run()

    scheduler = Scheduler(10)
    for x in xrange(50):
        scheduler.parallel("test" + str(x), [], dummyTask)
    scheduler.run()
    # Notice we have 51 jobs because there is always a toplevel one
    # which depends on all the others.
    assert(len(scheduler.brokenJobs) == 0)
    assert(len(scheduler.jobs) == 51)

    scheduler = Scheduler(1)
    scheduler.parallel("test", [], errorTask)
    scheduler.run()
    # Again, since the toplevel job always depends on all the others,
    # it is broken whenever anything else is broken.
    assert(len(scheduler.brokenJobs) == 2)
    assert(len(scheduler.runningJobs) == 0)
    assert(len(scheduler.doneJobs) == 0)

    # Check that dependencies actually work.
    scheduler = Scheduler(10)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.parallel("test1", [], dummyTaskLong)
    scheduler.run()
    assert(scheduler.doneJobs == ["test1", "test2", "final-job"])

    # Check that broken dependencies propagate to dependent tasks.
    scheduler = Scheduler(10)
    scheduler.parallel("test3", ["test2"], dummyTask)
    scheduler.parallel("test2", ["test1"], errorTask)
    scheduler.parallel("test1", [], dummyTaskLong)
    scheduler.run()
    assert(scheduler.doneJobs == ["test1"])
    assert(scheduler.brokenJobs == ["test2", "test3", "final-job"])

    # Check ctrl-C will exit properly.
    scheduler = Scheduler(2)
    for x in xrange(250):
        scheduler.parallel("test" + str(x), [], dummyTask)
    print "Press Control-C to continue"
    scheduler.run()

    # Handle tasks with exceptions.
    scheduler = Scheduler(2)
    scheduler.parallel("test", [], exceptionTask)
    scheduler.run()
    assert(scheduler.errors["test"])

    # Handle tasks which depend on tasks with exceptions.
    scheduler = Scheduler(2)
    scheduler.parallel("test0", [], dummyTask)
    scheduler.parallel("test1", [], exceptionTask)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.run()
    assert(scheduler.errors["test1"])
    assert(scheduler.errors["test2"])

    # Handle serial execution tasks.
    scheduler = Scheduler(2)
    scheduler.serial("test0", [], dummyTask)
    scheduler.run()
    assert(scheduler.doneJobs == ["test0", "final-job"])

    # Handle serial execution tasks, one depending on
    # the previous one.
    scheduler = Scheduler(2)
    scheduler.serial("test0", [], dummyTask)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.run()
    assert(scheduler.doneJobs == ["test0", "test1", "final-job"])

    # Serial tasks depending on one another, scheduled out of order.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.run()
    assert(scheduler.doneJobs == ["test0", "test1", "final-job"])

    # Serial and parallel tasks being scheduled at the same time.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.parallel("test2", [], dummyTask)
    scheduler.parallel("test3", [], dummyTask)
    scheduler.run()
    scheduler.doneJobs.sort()
    assert(scheduler.doneJobs == ["final-job", "test0", "test1", "test2", "test3"])

    # Serial and parallel tasks. Parallel depends on serial.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.parallel("test3", ["test2"], dummyTask)
    scheduler.run()
    assert(scheduler.doneJobs == ["test0", "test1", "test2", "test3", "final-job"])

    # A serial task which schedules two parallel tasks and another dependent
    # serial task. This is actually what needs to be done for building
    # packages, i.e. the first serial task is responsible for checking whether a
    # package is already there, then it queues a parallel download-sources task,
    # a subsequent build-sources one and finally the install-built-package one.
    scheduler = Scheduler(3)
    scheduler.serial("check-pkg", [], scheduleMore, scheduler)
    scheduler.run()
    assert(scheduler.doneJobs == ["check-pkg", "download", "build", "install", "final-job"])