ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/PKGTOOLS/scheduler.py
Revision: 1.7
Committed: Mon Jan 30 12:42:21 2012 UTC (13 years, 2 months ago) by eulisse
Content type: text/x-python
Branch: MAIN
Changes since 1.6: +16 -1 lines
Log Message:
Check for self depending tasks added.

File Contents

# User Rev Content
1 eulisse 1.1 from Queue import Queue
2     from threading import Thread
3     from time import sleep
4     import threading
5 eulisse 1.5 import sys
6     import traceback
7     from StringIO import StringIO
8 eulisse 1.1
9     # Helper class to avoid conflict between result
10     # codes and quit state transition.
11     class _SchedulerQuitCommand(object):
12     pass
13    
def transition(what, fromList, toList):
    """Move ``what`` from ``fromList`` to the end of ``toList``.

    Both lists are modified in place. If ``what`` is not in ``fromList`` a
    diagnostic is printed and the ValueError raised by list.remove() is
    propagated, leaving ``toList`` untouched.
    """
    try:
        fromList.remove(what)
    except ValueError:
        # %-formatting also works for non-string ids; plain string
        # concatenation raised a TypeError here and hid the real error.
        print("%s not in source list" % (what,))
        # A bare raise preserves the original traceback (``raise e`` does
        # not in Python 2).
        raise
    toList.append(what)
21    
# Raise this in case a dependency loop is found.
class DependencyLoop(RuntimeError):
    """Error signalling a dependency loop between scheduled tasks.

    Scheduler.parallel() raises it with the task id when a task lists
    itself among its own dependencies; the argument becomes the message.
    """
    def __init__(self, s):
        super(DependencyLoop, self).__init__(s)
26    
class Scheduler(object):
    """A simple job scheduler backed by a pool of worker threads.

    Workers block on ``workersQueue`` waiting for ``(taskId, commandSpec)``
    items to execute. The master thread (the one inside run()) blocks on
    ``resultsQueue`` waiting for ``(thread, commandSpec)`` items and runs
    ``commandSpec[0](*commandSpec[1:])`` for each of them: this is how the
    workers report results and request new scheduling rounds.

    A scheduling round moves to ``brokenJobs`` every pending parallel job
    with a dependency which did not build, and posts to the workers every
    pending parallel job whose dependencies are all done. Serial jobs are
    executed directly on the master thread by doSerial(). Once nothing is
    pending any more, every worker is sent the quit task and run() returns
    when all of them have exited.

    Task ids move between ``pendingJobs``, ``runningJobs``, ``doneJobs`` and
    ``brokenJobs``; ``errors`` maps each broken task id to its error message
    (or formatted traceback). A task is considered failed when it raises or
    returns a true value.
    """

    def __init__(self, parallelThreads, logDelegate=None):
        """Create a scheduler which will use ``parallelThreads`` workers.

        ``logDelegate`` is called with every log message (on the master
        thread); when it is not given, messages are printed to stdout.
        """
        self.workersQueue = Queue()
        self.resultsQueue = Queue()
        self.jobs = {}
        self.pendingJobs = []
        self.runningJobs = []
        self.doneJobs = []
        self.brokenJobs = []
        self.parallelThreads = parallelThreads
        self.logDelegate = logDelegate or self.__doLog
        self.errors = {}

    def run(self):
        """Start the workers and act as master until all of them have quit."""
        for _ in range(self.parallelThreads):
            t = Thread(target=self.__createWorker())
            t.daemon = True
            t.start()

        self.notifyMaster(self.__rescheduleParallel)
        # Wait until all the workers are done.
        while self.parallelThreads:
            try:
                _, item = self.resultsQueue.get()
                item[0](*item[1:])
                sleep(0.1)
            except KeyboardInterrupt:
                print("Ctrl-c received, waiting for workers to finish")
                # Drop the work nobody picked up yet so that the quit
                # commands are the next thing each worker sees. Queue() is
                # unbounded, so checking full() here never drained anything.
                self.__discardPending(self.workersQueue)
                self.shout(self.quit)

        # Process what was still queued for the master when the last worker
        # exited (e.g. log messages). Only the items present right now are
        # handled: a handler may post new ones (doSerial re-posts itself
        # while its dependencies are pending), which could otherwise keep
        # this loop going forever. The master is the only consumer of
        # resultsQueue, so none of these get() calls can block.
        for _ in range(self.resultsQueue.qsize()):
            _, item = self.resultsQueue.get()
            item[0](*item[1:])
        return

    # Remove, without processing them, all the items currently in a queue.
    def __discardPending(self, q):
        try:
            from Queue import Empty  # Python 2
        except ImportError:
            from queue import Empty  # Python 3
        while True:
            try:
                q.get(False)
            except Empty:
                # A worker may grab the last item between checks; stopping
                # on Empty (rather than testing empty() first) avoids that race.
                return

    # Run spec[0](*spec[1:]); return its result, or the formatted traceback
    # when it raises, so that an exception counts as a failure result.
    def __runTask(self, spec):
        try:
            return spec[0](*spec[1:])
        except Exception:
            s = StringIO()
            traceback.print_exc(file=s)
            return s.getvalue()

    # Create a worker.
    def __createWorker(self):
        """Return the function executed by each worker thread.

        The worker keeps fetching ``(taskId, commandSpec)`` items from
        ``workersQueue``. Results are reported to the master together with a
        request for a new scheduling round; a _SchedulerQuitCommand result
        makes the worker unregister itself and exit.
        """
        def worker():
            while True:
                taskId, item = self.workersQueue.get()
                result = self.__runTask(item)
                if isinstance(result, _SchedulerQuitCommand):
                    self.notifyMaster(self.__releaseWorker)
                    return
                self.log(str(item) + " done")
                self.notifyMaster(self.__updateJobStatus, taskId, result)
                self.notifyMaster(self.__rescheduleParallel)
                # Only in 2.5: self.workersQueue.task_done()
        return worker

    # Account for a worker which exited; run() stops when this reaches 0.
    def __releaseWorker(self):
        self.parallelThreads -= 1

    def parallel(self, taskId, deps, *spec):
        """Schedule ``spec[0](*spec[1:])`` to run on a worker thread.

        The task runs once every id in ``deps`` is done; it is marked broken
        if any of them breaks. A task id which is already pending is logged
        and ignored.

        Raises DependencyLoop if ``taskId`` is listed in its own ``deps``.
        """
        if taskId in deps:
            raise DependencyLoop(taskId)
        if taskId in self.pendingJobs:
            # Queueing it again would run the task twice and list it twice
            # in doneJobs / brokenJobs.
            self.log("Duplicate task %s" % taskId)
            return
        self.jobs[taskId] = {"scheduler": "parallel", "deps": deps, "spec": spec}
        self.pendingJobs.append(taskId)

    # Does the rescheduling of tasks. Derived class should call it.
    def __rescheduleParallel(self):
        parallelJobs = [j for j in self.pendingJobs
                        if self.jobs[j]["scheduler"] == "parallel"]
        # First of all clean up the pending parallel jobs from all those
        # which have broken dependencies.
        for taskId in parallelJobs:
            brokenDeps = [dep for dep in self.jobs[taskId]["deps"] if dep in self.brokenJobs]
            if not brokenDeps:
                continue
            transition(taskId, self.pendingJobs, self.brokenJobs)
            self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)

        # If no tasks left, quit. pendingJobs also holds serial jobs, which
        # might queue more parallel payloads, so they keep us going too.
        if not self.pendingJobs:
            self.shout(self.quit)
            self.notifyMaster(self.quit)
            return

        # Otherwise do another round of scheduling of all the tasks. In this
        # case we only queue parallel jobs to the parallel queue.
        for taskId in parallelJobs:
            pendingDeps = [dep for dep in self.jobs[taskId]["deps"] if dep not in self.doneJobs]
            if pendingDeps:
                continue
            # No broken dependencies and no pending ones. We can continue.
            transition(taskId, self.pendingJobs, self.runningJobs)
            self.__scheduleParallel(taskId, self.jobs[taskId]["spec"])

    # Update the job with the result of running: a false result means
    # success, anything else is stored as the job's error.
    def __updateJobStatus(self, taskId, error):
        if not error:
            transition(taskId, self.runningJobs, self.doneJobs)
            return
        transition(taskId, self.runningJobs, self.brokenJobs)
        self.errors[taskId] = error

    # One task at the time.
    def __scheduleParallel(self, taskId, commandSpec):
        self.workersQueue.put((taskId, commandSpec))

    # Helper to enqueue commands for all the threads.
    def shout(self, *commandSpec):
        for x in range(self.parallelThreads):
            self.__scheduleParallel("quit-" + str(x), commandSpec)

    # Helper to enqueue replies to the master thread.
    def notifyMaster(self, *commandSpec):
        self.resultsQueue.put((threading.currentThread(), commandSpec))

    def serial(self, taskId, deps, *commandSpec):
        """Schedule ``commandSpec[0](*commandSpec[1:])`` on the master thread.

        The task is executed by doSerial() once every id in ``deps`` is done.
        A task id which is already pending is logged and ignored.

        Raises DependencyLoop if ``taskId`` is listed in its own ``deps``
        (doSerial() would otherwise re-post it forever).
        """
        if taskId in deps:
            raise DependencyLoop(taskId)
        if taskId in self.pendingJobs:
            self.log("Duplicate task %s" % taskId)
            return
        spec = [self.doSerial, taskId, deps] + list(commandSpec)
        # Register the job before posting it, so that doSerial() always
        # finds taskId in pendingJobs even if the master picks it up at once.
        self.jobs[taskId] = {"scheduler": "serial", "deps": deps, "spec": spec}
        self.pendingJobs.append(taskId)
        self.resultsQueue.put((threading.currentThread(), spec))

    def doSerial(self, taskId, deps, *commandSpec):
        """Try to run serial job ``taskId``; posted to the master by serial().

        If a dependency broke, the job is marked broken. If some dependency
        is not done yet, the job re-posts itself to the master. Otherwise
        the command runs right here and its result is recorded. A new
        scheduling round is requested whenever the job leaves the pending
        state.
        """
        brokenDeps = [dep for dep in deps if dep in self.brokenJobs]
        if brokenDeps:
            transition(taskId, self.pendingJobs, self.brokenJobs)
            self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
            # Remember to do the scheduling again!
            self.notifyMaster(self.__rescheduleParallel)
            return

        # Put back the task on the queue, since it has pending dependencies.
        pendingDeps = [dep for dep in deps if dep not in self.doneJobs]
        if pendingDeps:
            self.notifyMaster(self.doSerial, taskId, deps, *commandSpec)
            return
        # No broken dependencies and no pending ones. Run the job.
        transition(taskId, self.pendingJobs, self.runningJobs)
        result = self.__runTask(commandSpec)
        self.__updateJobStatus(taskId, result)
        # Remember to do the scheduling again!
        self.notifyMaster(self.__rescheduleParallel)

    # Helper method to do logging: the delegate is invoked by the master.
    def log(self, s):
        self.notifyMaster(self.logDelegate, s)

    # Task which forces a worker to quit.
    def quit(self):
        self.log("Requested to quit.")
        return _SchedulerQuitCommand()

    # Helper for printouts.
    def __doLog(self, s):
        print(s)

    # Ask the master thread for a new scheduling round.
    def reschedule(self):
        self.notifyMaster(self.__rescheduleParallel)
211    
def dummyTask(seconds=0.1):
    """Successful no-op task: sleep for ``seconds`` and return None.

    The delay defaults to the original hard-coded 0.1 s; it is a parameter
    so callers can pick a different duration.
    """
    sleep(seconds)
214    
def dummyTaskLong(seconds=1):
    """Successful slow task: sleep for ``seconds`` and return None.

    The delay defaults to the original hard-coded 1 s; it is a parameter so
    callers (and tests) can shorten or lengthen it.
    """
    sleep(seconds)
217    
def errorTask():
    """Task which always fails by returning a (true) error message."""
    message = "This will always have an error"
    return message
220    
def exceptionTask():
    """Task which always fails by raising a plain Exception("foo")."""
    failure = Exception("foo")
    raise failure
223    
# Mimics cmsBuild workflow.
def scheduleMore(scheduler):
    """Queue the tasks needed to produce one package on ``scheduler``.

    "download" and "build" (which waits for "download") are parallel tasks;
    "install" is a serial task waiting for "build".
    """
    for taskId, deps in (("download", []), ("build", ["download"])):
        scheduler.parallel(taskId, deps, dummyTask)
    scheduler.serial("install", ["build"], dummyTask)
229    
if __name__ == "__main__":
    # Self tests for the scheduler; run this file directly to execute them.

    # No jobs at all: the workers must quit straight away.
    scheduler = Scheduler(10)
    scheduler.run()

    scheduler = Scheduler(1)
    scheduler.run()

    # A task which logs through the scheduler itself.
    scheduler = Scheduler(10)
    scheduler.parallel("test", [], scheduler.log, "This is england")
    scheduler.run()

    # Many independent tasks.
    scheduler = Scheduler(10)
    for x in range(50):
        scheduler.parallel("test" + str(x), [], dummyTask)
    scheduler.run()
    assert len(scheduler.brokenJobs) == 0
    assert len(scheduler.jobs) == 50

    # A task returning an error ends up broken.
    scheduler = Scheduler(1)
    scheduler.parallel("test", [], errorTask)
    scheduler.run()
    assert len(scheduler.brokenJobs) == 1
    assert len(scheduler.runningJobs) == 0
    assert len(scheduler.doneJobs) == 0

    # Check dependency actually works.
    scheduler = Scheduler(10)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.parallel("test1", [], dummyTaskLong)
    scheduler.run()
    assert scheduler.doneJobs == ["test1", "test2"]

    # Check a broken dependency breaks everything depending on it.
    scheduler = Scheduler(10)
    scheduler.parallel("test3", ["test2"], dummyTask)
    scheduler.parallel("test2", ["test1"], errorTask)
    scheduler.parallel("test1", [], dummyTaskLong)
    scheduler.run()
    assert scheduler.doneJobs == ["test1"]
    assert scheduler.brokenJobs == ["test2", "test3"]

    # Check ctrl-C will exit properly.
    scheduler = Scheduler(2)
    for x in range(250):
        scheduler.parallel("test" + str(x), [], dummyTask)
    print("Press Control-C to continue")
    scheduler.run()

    # Handle tasks with exceptions.
    scheduler = Scheduler(2)
    scheduler.parallel("test", [], exceptionTask)
    scheduler.run()
    assert scheduler.errors["test"]

    # Handle tasks which depend on tasks with exceptions.
    scheduler = Scheduler(2)
    scheduler.parallel("test0", [], dummyTask)
    scheduler.parallel("test1", [], exceptionTask)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.run()
    assert scheduler.errors["test1"]
    assert scheduler.errors["test2"]

    # Handle serial execution tasks.
    scheduler = Scheduler(2)
    scheduler.serial("test0", [], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0"]

    # Handle serial execution tasks, one depends from
    # the previous one.
    scheduler = Scheduler(2)
    scheduler.serial("test0", [], dummyTask)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0", "test1"]

    # Serial tasks depending on one another.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0", "test1"]

    # Serial and parallel tasks being scheduled at the same time.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.parallel("test2", [], dummyTask)
    scheduler.parallel("test3", [], dummyTask)
    scheduler.run()
    scheduler.doneJobs.sort()
    assert scheduler.doneJobs == ["test0", "test1", "test2", "test3"]

    # Serial and parallel tasks. Parallel depends on serial.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.parallel("test3", ["test2"], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0", "test1", "test2", "test3"]

    # Serial task scheduling two parallel task and another dependent
    # serial task. This is actually what needs to be done for building
    # packages. I.e.
    # The first serial task is responsible for checking if a package is already there,
    # then it queues a parallel download sources task, a subsequent build sources
    # one and finally the install built package one.
    scheduler = Scheduler(3)
    scheduler.serial("check-pkg", [], scheduleMore, scheduler)
    scheduler.run()
    assert scheduler.doneJobs == ["check-pkg", "download", "build", "install"]

    # Make sure we refuse a task which depends on itself.
    scheduler = Scheduler(3)
    try:
        scheduler.parallel("test1", ["test1"], dummyTask)
    except DependencyLoop as e:
        assert str(e) == "test1"
    else:
        raise AssertionError("DependencyLoop not raised")
337     # then it queues a parallel download sources task, a subsequent build sources
338     # one and finally the install built package one.
339     scheduler = Scheduler(3)
340     scheduler.serial("check-pkg", [], scheduleMore, scheduler)
341     scheduler.run()
342     assert(scheduler.doneJobs == ["check-pkg", "download", "build", "install"])
343 eulisse 1.7
344     # Make sure we do not queue the same task.
345     scheduler = Scheduler(3)
346     try:
347     scheduler.parallel("test1", ["test1"], dummyTask)
348     assert(False)
349     except DependencyLoop, e:
350     assert(str(e) == "test1")