ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/PKGTOOLS/scheduler.py
Revision: 1.3
Committed: Fri Jan 6 16:05:53 2012 UTC (13 years, 3 months ago) by eulisse
Content type: text/x-python
Branch: MAIN
Changes since 1.2: +104 -32 lines
Log Message:
Support for serial tasks.

* Serial tasks get executed one at a time (doh).
* Serial tasks can depend on each other (no checks for dependency loops).
* Serial and parallel tasks can be executed in parallel (doh) and can depend
  on each other.
* Scheduler.put() method renamed to Scheduler.parallel().

File Contents

# User Rev Content
1 eulisse 1.1 from Queue import Queue
2     from threading import Thread
3     from time import sleep
4     import threading
5    
6     # Helper class to avoid conflict between result
7     # codes and quit state transition.
8     class _SchedulerQuitCommand(object):
9     pass
10    
def transition(what, fromList, toList):
    """Move `what` from `fromList` to the end of `toList`.

    Both lists are modified in place. If `what` is not in `fromList`, a
    diagnostic line is printed, the ValueError raised by `list.remove` is
    propagated to the caller and `toList` is left untouched.
    """
    try:
        fromList.remove(what)
    except ValueError:
        # Use %-formatting so that a non-string `what` does not raise a
        # TypeError that would hide the real problem.
        print("%s not in source list" % (what,))
        # Bare raise keeps the original traceback.
        raise
    toList.append(what)
18    
class Scheduler(object):
    """A simple job scheduler built on two queues.

    * `workersQueue` carries `(taskId, commandSpec)` entries to the worker
      threads, which all block fetching from it.
    * `resultsQueue` carries `(thread, commandSpec)` entries to the master
      thread (the one calling `run()`), which executes them one by one.
      All the bookkeeping lists are only modified by commands executed
      there.

    A `commandSpec` is a sequence whose first element is a callable and
    whose remaining elements are its positional arguments.

    The value returned by a task is interpreted as its error: a false
    value (e.g. None) means success, anything else marks the task as
    broken and is stored in `errors[taskId]`. An exception raised by a task
    is converted to its textual description and handled the same way.

    Scheduling walks the pending jobs: a job which depends on a broken job
    is moved to `brokenJobs`, a job whose dependencies are all in
    `doneJobs` is moved to `runningJobs` and posted on the workers queue.
    When no pending job is left, a quit task is posted for every worker.
    """

    def __init__(self, parallelThreads, logDelegate=None):
        """Create a scheduler.

        parallelThreads -- number of worker threads `run()` will start.
        logDelegate     -- callable taking the message to log; defaults to
                           printing the message.
        """
        self.workersQueue = Queue()
        self.resultsQueue = Queue()
        # taskId -> {"scheduler": "parallel" | "serial", "deps": ..., "spec": ...}
        self.jobs = {}
        self.pendingJobs = []
        self.runningJobs = []
        self.doneJobs = []
        self.brokenJobs = []
        self.parallelThreads = parallelThreads
        self.logDelegate = logDelegate
        # taskId -> error description, for every broken job.
        self.errors = {}
        if not logDelegate:
            self.logDelegate = self.__doLog

    def run(self):
        """Start the workers and process results until all of them quit."""
        for _ in range(self.parallelThreads):
            t = Thread(target=self.__createWorker())
            t.daemon = True
            t.start()

        self.notifyMaster(self.__rescheduleParallel)
        # Wait until all the workers are done. Each worker decrements
        # parallelThreads (via __releaseWorker) when it quits.
        while self.parallelThreads:
            try:
                who, item = self.resultsQueue.get()
                print(who)
                item[0](*item[1:])
            except KeyboardInterrupt:
                print("Ctrl-c received, waiting for workers to finish")
                self.shout(self.quit)

        # Prune the queue: run the commands which were still waiting when
        # the last worker quit. Only the entries present at this point are
        # processed, since a command may post further commands (a serial
        # task with pending dependencies re-queues itself).
        for _ in range(self.resultsQueue.qsize()):
            who, item = self.resultsQueue.get()
            item[0](*item[1:])
        return

    @staticmethod
    def _errorText(error):
        """Return a non-empty description of the exception `error`.

        `str()` of an exception created without a message is the empty
        string, which would be mistaken for a successful result, so fall
        back to `repr()` in that case.
        """
        return str(error) or repr(error)

    # Create a worker.
    def __createWorker(self):
        """Return the function executed by each worker thread."""
        def worker():
            while True:
                taskId, item = self.workersQueue.get()
                try:
                    result = item[0](*item[1:])
                except Exception as e:
                    result = self._errorText(e)

                if isinstance(result, _SchedulerQuitCommand):
                    self.notifyMaster(self.__releaseWorker)
                    return
                self.log(str(item) + " done")
                self.notifyMaster(self.__updateJobStatus, taskId, result)
                self.notifyMaster(self.__rescheduleParallel)
                # Only in 2.5: self.workersQueue.task_done()
        return worker

    def __releaseWorker(self):
        """Account for a worker thread which has quit."""
        self.parallelThreads -= 1

    def parallel(self, taskId, deps, *spec):
        """Register a task to be executed by the worker threads.

        taskId -- identifier of the task.
        deps   -- identifiers of the tasks which must be done first.
        spec   -- callable followed by its arguments.

        A task id which is already pending is reported through the log
        but registered again nevertheless.
        """
        if taskId in self.pendingJobs:
            self.log("Double task %s" % taskId)
        self.jobs[taskId] = {"scheduler": "parallel", "deps": deps, "spec": spec}
        self.pendingJobs.append(taskId)

    # Does the rescheduling of tasks. Derived class should call it.
    def __rescheduleParallel(self):
        """Fail, start or leave pending each of the pending parallel jobs."""
        # First of all clean up the pending parallel jobs from all those
        # which have broken dependencies. This is repeated until nothing
        # changes: a job marked broken in one pass can break a job which was
        # examined earlier in the same pass, and no further rescheduling
        # is triggered if nothing is running anymore.
        while True:
            newlyBroken = False
            pendingParallel = [j for j in self.pendingJobs
                               if self.jobs[j]["scheduler"] == "parallel"]
            for taskId in pendingParallel:
                brokenDeps = [dep for dep in self.jobs[taskId]["deps"]
                              if dep in self.brokenJobs]
                if not brokenDeps:
                    continue
                transition(taskId, self.pendingJobs, self.brokenJobs)
                self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
                newlyBroken = True
            if not newlyBroken:
                break

        # If no tasks left, quit. Notice we need to check also for serial jobs
        # since they might queue more parallel payloads.
        if not self.pendingJobs:
            self.shout(self.quit)
            self.notifyMaster(self.quit)
            return

        # Otherwise do another round of scheduling of all the tasks. In this
        # case we only queue parallel jobs to the parallel queue.
        runnable = [j for j in self.pendingJobs
                    if self.jobs[j]["scheduler"] == "parallel"]
        for taskId in runnable:
            pendingDeps = [dep for dep in self.jobs[taskId]["deps"]
                           if dep not in self.doneJobs]
            if pendingDeps:
                continue
            # No broken dependencies and no pending ones. we can continue.
            transition(taskId, self.pendingJobs, self.runningJobs)
            self.__scheduleParallel(taskId, self.jobs[taskId]["spec"])

    # Update the job with the result of running.
    def __updateJobStatus(self, taskId, error):
        """Move a running job to done (false `error`) or to broken."""
        if not error:
            transition(taskId, self.runningJobs, self.doneJobs)
            return
        transition(taskId, self.runningJobs, self.brokenJobs)
        self.errors[taskId] = error

    # One task at the time.
    def __scheduleParallel(self, taskId, commandSpec):
        """Post a command on the workers queue."""
        self.workersQueue.put((taskId, commandSpec))

    # Helper to enqueue commands for all the threads.
    def shout(self, *commandSpec):
        """Post one copy of the command per worker still accounted for."""
        for x in range(self.parallelThreads):
            self.__scheduleParallel("quit-" + str(x), commandSpec)

    # Helper to enqueue replies to the master thread.
    def notifyMaster(self, *commandSpec):
        """Post a command to be executed by the master thread."""
        self.resultsQueue.put((threading.current_thread(), commandSpec))

    def serial(self, taskId, deps, *commandSpec):
        """Register a task to be executed by the master thread itself.

        taskId      -- identifier of the task.
        deps        -- identifiers of the tasks which must be done first.
        commandSpec -- callable followed by its arguments.
        """
        spec = [self.doSerial, taskId, deps] + list(commandSpec)
        self.resultsQueue.put((threading.current_thread(), spec))
        self.jobs[taskId] = {"scheduler": "serial", "deps": deps, "spec": spec}
        self.pendingJobs.append(taskId)

    def doSerial(self, taskId, deps, *commandSpec):
        """Run a serial task, fail it, or postpone it depending on `deps`."""
        brokenDeps = [dep for dep in deps if dep in self.brokenJobs]
        if brokenDeps:
            transition(taskId, self.pendingJobs, self.brokenJobs)
            self.errors[taskId] = "The following dependencies could not complete:\n%s" % "\n".join(brokenDeps)
            # Remember to do the scheduling again!
            self.notifyMaster(self.__rescheduleParallel)
            return

        # Put back the task on the queue, since it has pending dependencies.
        # It is re-queued right away, so the master keeps polling it until
        # its dependencies are either done or broken.
        pendingDeps = [dep for dep in deps if dep not in self.doneJobs]
        if pendingDeps:
            self.resultsQueue.put((threading.current_thread(), [self.doSerial, taskId, deps] + list(commandSpec)))
            return
        # No broken dependencies and no pending ones. Run the job.
        transition(taskId, self.pendingJobs, self.runningJobs)
        try:
            result = commandSpec[0](*commandSpec[1:])
        except Exception as e:
            result = self._errorText(e)
        self.__updateJobStatus(taskId, result)
        # Remember to do the scheduling again!
        self.notifyMaster(self.__rescheduleParallel)

    # Helper method to do logging:
    def log(self, s):
        """Ask the master thread to pass `s` to the log delegate."""
        self.notifyMaster(self.logDelegate, s)

    # Task which forces a worker to quit.
    def quit(self):
        """Log the request and return the quit sentinel."""
        self.log("Requested to quit.")
        return _SchedulerQuitCommand()

    # Helper for printouts.
    def __doLog(self, s):
        """Default log delegate: print the message."""
        print(s)
191    
def dummyTask():
    """Test payload: wait a tenth of a second and report no error."""
    pause = 0.1
    sleep(pause)
194    
def dummyTaskLong():
    """Test payload: wait a full second and report no error."""
    pause = 1
    sleep(pause)
197    
def errorTask():
    """Test payload: always report an error through the return value."""
    message = "This will always have an error"
    return message
200    
def exceptionTask():
    """Test payload: always fail by raising an exception."""
    failure = Exception("foo")
    raise failure
203    
# Self-test, executed only when the module is run as a script. Note that
# one of the checks is interactive and expects the user to press Ctrl-C.
if __name__ == "__main__":
    # Schedulers without any job must start and stop cleanly.
    scheduler = Scheduler(10)
    scheduler.run()

    scheduler = Scheduler(1)
    scheduler.run()

    # A task may itself use the scheduler log.
    scheduler = Scheduler(10)
    scheduler.parallel("test", [], scheduler.log, "This is england")
    scheduler.run()

    # More tasks than workers.
    scheduler = Scheduler(10)
    for x in range(50):
        scheduler.parallel("test" + str(x), [], dummyTask)
    scheduler.run()
    assert len(scheduler.brokenJobs) == 0
    assert len(scheduler.jobs) == 50

    # A task returning an error ends up in the broken list only.
    scheduler = Scheduler(1)
    scheduler.parallel("test", [], errorTask)
    scheduler.run()
    assert len(scheduler.brokenJobs) == 1
    assert len(scheduler.runningJobs) == 0
    assert len(scheduler.doneJobs) == 0

    # Check dependency actually works.
    scheduler = Scheduler(10)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.parallel("test1", [], dummyTaskLong)
    scheduler.run()
    assert scheduler.doneJobs == ["test1", "test2"]

    # Check that a failing task also breaks the task depending on it.
    scheduler = Scheduler(10)
    scheduler.parallel("test3", ["test2"], dummyTask)
    scheduler.parallel("test2", ["test1"], errorTask)
    scheduler.parallel("test1", [], dummyTaskLong)
    scheduler.run()
    assert scheduler.doneJobs == ["test1"]
    assert scheduler.brokenJobs == ["test2", "test3"]

    # Check ctrl-C will exit properly.
    scheduler = Scheduler(2)
    for x in range(250):
        scheduler.parallel("test" + str(x), [], dummyTask)
    print("Print Control-C to continue")
    scheduler.run()

    # Handle tasks with exceptions.
    scheduler = Scheduler(2)
    scheduler.parallel("test", [], exceptionTask)
    scheduler.run()
    assert scheduler.errors["test"] == "foo"

    # Handle tasks which depend on tasks with exceptions.
    scheduler = Scheduler(2)
    scheduler.parallel("test0", [], dummyTask)
    scheduler.parallel("test1", [], exceptionTask)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.run()
    assert scheduler.errors["test1"]
    assert scheduler.errors["test2"]

    # Handle serial execution tasks.
    scheduler = Scheduler(2)
    scheduler.serial("test0", [], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0"]

    # Handle serial execution tasks, one depends from
    # the previous one.
    scheduler = Scheduler(2)
    scheduler.serial("test0", [], dummyTask)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0", "test1"]

    # Serial tasks depending on one another, registered in reverse order.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0", "test1"]

    # Serial and parallel tasks being scheduled at the same time.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.parallel("test2", [], dummyTask)
    scheduler.parallel("test3", [], dummyTask)
    scheduler.run()
    # Completion order of independent tasks is not fixed: compare sorted.
    scheduler.doneJobs.sort()
    assert scheduler.doneJobs == ["test0", "test1", "test2", "test3"]

    # Serial and parallel tasks. Parallel depends on serial.
    scheduler = Scheduler(2)
    scheduler.serial("test1", ["test0"], dummyTask)
    scheduler.serial("test0", [], dummyTask)
    scheduler.parallel("test2", ["test1"], dummyTask)
    scheduler.parallel("test3", ["test2"], dummyTask)
    scheduler.run()
    assert scheduler.doneJobs == ["test0", "test1", "test2", "test3"]