ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/PKGTOOLS/scheduler.py
Revision: 1.5
Committed: Sun Jan 8 22:27:25 2012 UTC (13 years, 3 months ago) by eulisse
Content type: text/x-python
Branch: MAIN
Changes since 1.4: +17 -4 lines
Log Message:
Better errors when an exception is raised.

File Contents

# User Rev Content
import sys
import threading
import traceback
from Queue import Empty, Queue
from StringIO import StringIO
from threading import Thread
from time import sleep
8 eulisse 1.1
# Dedicated sentinel type, so that "worker must quit" can never be confused
# with an ordinary job result (job results are treated as error codes).
class _SchedulerQuitCommand(object):
    """Sentinel returned by ``Scheduler.quit``; carries no data."""
13    
def transition(what, fromList, toList):
    """Move ``what`` from ``fromList`` to the end of ``toList``.

    Only the first occurrence in ``fromList`` is removed (``list.remove``
    semantics).

    Raises ValueError (after printing a diagnostic) when ``what`` is not in
    ``fromList``; ``toList`` is left untouched in that case.
    """
    try:
        fromList.remove(what)
    except ValueError:
        # "%s" also works for non-string ids, where string concatenation would
        # raise TypeError and hide the real error; a bare raise keeps the
        # original traceback.
        print("%s not in source list" % (what,))
        raise
    toList.append(what)
21    
class Scheduler(object):
    """A simple dependency-aware job scheduler.

    Jobs are registered with ``parallel`` (run on one of ``parallelThreads``
    worker threads) or ``serial`` (run by the master thread, i.e. the thread
    calling ``run``). Every job has an id and a list of ids it depends on.

    Two queues are used:

    * ``workersQueue`` tells the worker threads what to do; all workers start
      blocked on it.
    * ``resultsQueue`` is watched by the master thread; each entry is
      ``(postingThread, (callable, arg, ...))`` and the master executes the
      callable, so all the job bookkeeping happens on one thread.

    A scheduling round moves pending parallel jobs with a broken dependency
    to ``brokenJobs``, and those whose dependencies are all done to
    ``runningJobs`` (posting them to the workers). When no job is pending any
    more, every worker is sent a quit command.

    A job succeeds when its callable returns a falsy value. Any other return
    value, or the formatted traceback of an exception it raised, is stored in
    ``errors`` under the job id and the job is marked broken.
    """

    def __init__(self, parallelThreads, logDelegate=None):
        """Create a scheduler.

        parallelThreads -- number of worker threads started by ``run``.
        logDelegate -- callable taking one string, invoked on the master
                       thread for each log message; defaults to printing it.
        """
        self.workersQueue = Queue()
        self.resultsQueue = Queue()
        self.jobs = {}         # taskId -> {"scheduler", "deps", "spec"}
        self.pendingJobs = []
        self.runningJobs = []
        self.doneJobs = []
        self.brokenJobs = []
        self.parallelThreads = parallelThreads
        self.logDelegate = logDelegate
        self.errors = {}       # taskId -> failing result / traceback text
        if not logDelegate:
            self.logDelegate = self.__doLog

    def run(self):
        """Start the workers and process master notifications until every
        worker has quit.

        On Ctrl-C, work not yet picked up by a worker is discarded and every
        worker is asked to quit once its current task is over.
        """
        for _ in range(self.parallelThreads):
            t = Thread(target=self.__createWorker())
            t.daemon = True
            t.start()

        self.notifyMaster(self.__rescheduleParallel)
        # Wait until all the workers are done.
        while self.parallelThreads:
            try:
                _, item = self.resultsQueue.get()
                item[0](*item[1:])
                # NOTE(review): throttles the master loop; presumably this keeps
                # serial jobs that re-post themselves (see doSerial) from
                # busy-spinning. Confirm before removing.
                sleep(0.1)
            except KeyboardInterrupt:
                print("Ctrl-c received, waiting for workers to finish")
                # Drop the work nobody picked up yet so that the quit commands
                # below come next. The queue is unbounded (full() is never
                # true) and workers consume it concurrently, hence
                # get_nowait() until Empty rather than an empty() check.
                # NOTE: discarded jobs remain listed in runningJobs.
                while True:
                    try:
                        self.workersQueue.get_nowait()
                    except Empty:
                        break
                self.shout(self.quit)

        # Flush what is left (e.g. log messages posted after the last worker
        # quit). Only the entries queued right now are processed: handlers
        # such as doSerial may re-post themselves and must not make this loop
        # spin forever. No worker is left, so qsize() is exact here.
        for _ in range(self.resultsQueue.qsize()):
            _, item = self.resultsQueue.get()
            item[0](*item[1:])

    def __createWorker(self):
        """Return the body of a worker thread.

        A worker repeatedly takes ``(taskId, (callable, arg, ...))`` from
        ``workersQueue`` and runs it. A quit command makes it notify the
        master and return; any other result is reported to the master,
        followed by a request for another scheduling round.
        """
        def worker():
            while True:
                taskId, item = self.workersQueue.get()
                try:
                    result = item[0](*item[1:])
                except Exception:
                    # The traceback becomes the job's error message.
                    result = traceback.format_exc()

                if isinstance(result, _SchedulerQuitCommand):
                    self.notifyMaster(self.__releaseWorker)
                    return
                self.log(str(item) + " done")
                self.notifyMaster(self.__updateJobStatus, taskId, result)
                self.notifyMaster(self.__rescheduleParallel)
        return worker

    def __releaseWorker(self):
        """Account for a worker that quit (runs on the master thread)."""
        self.parallelThreads -= 1

    def parallel(self, taskId, deps, *spec):
        """Register a job to be run on a worker thread.

        taskId -- job identifier.
        deps -- ids of the jobs which must complete successfully first.
        spec -- the callable followed by its positional arguments.
        """
        # A duplicate id is only reported: the job is still queued again.
        if taskId in self.pendingJobs:
            self.log("Double task %s" % taskId)
        self.jobs[taskId] = {"scheduler": "parallel", "deps": deps, "spec": spec}
        self.pendingJobs.append(taskId)

    def __rescheduleParallel(self):
        """Run one scheduling round for the pending parallel jobs.

        Jobs with a broken dependency are marked broken; jobs whose
        dependencies are all done are handed to the workers. When nothing is
        pending any more -- serial jobs included, since they may queue more
        parallel jobs -- all workers are asked to quit.
        """
        parallelJobs = [j for j in self.pendingJobs
                        if self.jobs[j]["scheduler"] == "parallel"]
        # First of all clean up the pending parallel jobs from all those
        # which have broken dependencies.
        for taskId in parallelJobs:
            brokenDeps = [dep for dep in self.jobs[taskId]["deps"]
                          if dep in self.brokenJobs]
            if not brokenDeps:
                continue
            transition(taskId, self.pendingJobs, self.brokenJobs)
            self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)

        if not self.pendingJobs:
            self.shout(self.quit)
            self.notifyMaster(self.quit)
            return

        # Otherwise schedule every parallel job whose dependencies are done.
        # Jobs just marked broken are skipped here too: a broken dependency is
        # never in doneJobs.
        for taskId in parallelJobs:
            pendingDeps = [dep for dep in self.jobs[taskId]["deps"]
                           if dep not in self.doneJobs]
            if pendingDeps:
                continue
            transition(taskId, self.pendingJobs, self.runningJobs)
            self.__scheduleParallel(taskId, self.jobs[taskId]["spec"])

    def __updateJobStatus(self, taskId, error):
        """Move a running job to doneJobs (falsy ``error``) or brokenJobs."""
        if not error:
            transition(taskId, self.runningJobs, self.doneJobs)
            return
        transition(taskId, self.runningJobs, self.brokenJobs)
        self.errors[taskId] = error

    def __scheduleParallel(self, taskId, commandSpec):
        """Hand one ``(taskId, commandSpec)`` entry to the workers."""
        self.workersQueue.put((taskId, commandSpec))

    def shout(self, *commandSpec):
        """Enqueue ``commandSpec`` once per live worker thread."""
        for x in range(self.parallelThreads):
            self.__scheduleParallel("quit-" + str(x), commandSpec)

    def notifyMaster(self, *commandSpec):
        """Enqueue ``commandSpec`` (callable, args...) for the master thread."""
        self.resultsQueue.put((threading.current_thread(), commandSpec))

    def serial(self, taskId, deps, *commandSpec):
        """Register a job to be run on the master thread.

        Same arguments as ``parallel``: commandSpec is the callable followed
        by its positional arguments.
        """
        spec = [self.doSerial, taskId, deps] + list(commandSpec)
        # Register the job before posting it: doSerial expects taskId to be
        # in pendingJobs when the master picks the entry up.
        self.jobs[taskId] = {"scheduler": "serial", "deps": deps, "spec": spec}
        self.pendingJobs.append(taskId)
        self.resultsQueue.put((threading.current_thread(), spec))

    def doSerial(self, taskId, deps, *commandSpec):
        """Run a serial job on the master thread once its deps are done.

        While dependencies are pending the entry is re-posted to the master
        queue. NOTE: a dependency id that never completes (or never exists)
        keeps it re-posting indefinitely.
        """
        brokenDeps = [dep for dep in deps if dep in self.brokenJobs]
        if brokenDeps:
            transition(taskId, self.pendingJobs, self.brokenJobs)
            self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
            # Remember to do the scheduling again!
            self.notifyMaster(self.__rescheduleParallel)
            return

        # Put back the task on the queue, since it has pending dependencies.
        if any(dep not in self.doneJobs for dep in deps):
            self.notifyMaster(self.doSerial, taskId, deps, *commandSpec)
            return

        # No broken dependencies and no pending ones. Run the job.
        transition(taskId, self.pendingJobs, self.runningJobs)
        try:
            result = commandSpec[0](*commandSpec[1:])
        except Exception:
            result = traceback.format_exc()
        self.__updateJobStatus(taskId, result)
        # Remember to do the scheduling again!
        self.notifyMaster(self.__rescheduleParallel)

    def log(self, s):
        """Have the master thread pass ``s`` to the log delegate."""
        self.notifyMaster(self.logDelegate, s)

    def quit(self):
        """Task which makes a worker quit.

        Returns the sentinel recognised by the worker loop; when run on the
        master thread (see __rescheduleParallel) the result is ignored and
        only the log message is emitted.
        """
        self.log("Requested to quit.")
        return _SchedulerQuitCommand()

    def __doLog(self, s):
        """Default log delegate: print to stdout."""
        print(s)

    def reschedule(self):
        """Queue another scheduling round on the master thread."""
        self.notifyMaster(self.__rescheduleParallel)
204    
def dummyTask():
    """Test helper: a job that succeeds (returns None) after ~0.1 s."""
    delay = 0.1
    sleep(delay)
207    
def dummyTaskLong():
    """Test helper: a job that succeeds (returns None) after ~1 s."""
    delay = 1
    sleep(delay)
210    
def errorTask():
    """Test helper: a job that always fails.

    The scheduler treats any truthy job result as an error, so this message
    is what gets recorded for the job.
    """
    failure = "This will always have an error"
    return failure
213    
def exceptionTask():
    """Test helper: a job that always raises ``Exception("foo")``."""
    error = Exception("foo")
    raise error
216    
def scheduleMore(scheduler):
    """Queue a cmsBuild-like chain: download -> build (parallel) -> install (serial)."""
    for stage, requires in (("download", []), ("build", ["download"])):
        scheduler.parallel(stage, requires, dummyTask)
    scheduler.serial("install", ["build"], dummyTask)
222    
if __name__ == "__main__":
    # Self-test for the Scheduler. NOTE: the Ctrl-C check below waits for the
    # user to press Control-C (or for its 250 jobs to finish).
    scheduler = Scheduler(10)
    scheduler.run()

    scheduler = Scheduler(1)
    scheduler.run()

    scheduler = Scheduler(10)
    scheduler.parallel("test", [], scheduler.log, "This is england")
    scheduler.run()

    scheduler = Scheduler(10)
    for x in range(50):
        scheduler.parallel("test" + str(x), [], dummyTask)
    scheduler.run()
    assert len(scheduler.brokenJobs) == 0
    assert len(scheduler.jobs) == 50

    scheduler = Scheduler(1)
    scheduler.parallel("test", [], errorTask)
    scheduler.run()
    assert len(scheduler.brokenJobs) == 1
    assert len(scheduler.runningJobs) == 0
    assert len(scheduler.doneJobs) == 0

    # Check dependency actually works.
    scheduler = Scheduler(10)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.parallel("test1", [], dummyTaskLong)
    scheduler.run()
    assert scheduler.doneJobs == ["test1", "test2"]

    # Check a broken dependency propagates down the chain.
    scheduler = Scheduler(10)
    scheduler.parallel("test3", ["test2"], dummyTask)
    scheduler.parallel("test2", ["test1"], errorTask)
    scheduler.parallel("test1", [], dummyTaskLong)
    scheduler.run()
    assert scheduler.doneJobs == ["test1"]
    assert scheduler.brokenJobs == ["test2", "test3"]

    # Check ctrl-C will exit properly.
    scheduler = Scheduler(2)
    for x in range(250):
        scheduler.parallel("test" + str(x), [], dummyTask)
    print("Press Control-C to continue")
    scheduler.run()

    # Handle tasks with exceptions.
    scheduler = Scheduler(2)
    scheduler.parallel("test", [], exceptionTask)
    scheduler.run()
    assert scheduler.errors["test"]

    # Handle tasks which depend on tasks with exceptions.
    scheduler = Scheduler(2)
    scheduler.parallel("test0", [], dummyTask)
    scheduler.parallel("test1", [], exceptionTask)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.run()
    assert scheduler.errors["test1"]
    assert scheduler.errors["test2"]

    # Handle serial execution tasks.
    scheduler = Scheduler(2)
    scheduler.serial("test0", [], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0"]

    # Handle serial execution tasks, one depends on the previous one.
    scheduler = Scheduler(2)
    scheduler.serial("test0", [], dummyTask)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0", "test1"]

    # Serial tasks depending on one another, registered out of order.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0", "test1"]

    # Serial and parallel tasks being scheduled at the same time.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.parallel("test2", [], dummyTask)
    scheduler.parallel("test3", [], dummyTask)
    scheduler.run()
    scheduler.doneJobs.sort()
    assert scheduler.doneJobs == ["test0", "test1", "test2", "test3"]

    # Serial and parallel tasks. Parallel depends on serial.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.parallel("test3", ["test2"], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0", "test1", "test2", "test3"]

    # Serial task scheduling two parallel tasks and another dependent serial
    # task. This is what building packages needs: the first serial task
    # checks whether a package is already there, then queues a parallel
    # download-sources task, a subsequent build-sources one and finally the
    # install-built-package one.
    scheduler = Scheduler(3)
    scheduler.serial("check-pkg", [], scheduleMore, scheduler)
    scheduler.run()
    assert scheduler.doneJobs == ["check-pkg", "download", "build", "install"]