ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/PKGTOOLS/scheduler.py
Revision: 1.1
Committed: Wed Jan 4 14:07:53 2012 UTC (13 years, 4 months ago) by eulisse
Content type: text/x-python
Branch: MAIN
Log Message:
New scheduler to be used for fetch / build / install tasks.

* Lockless.
* One queue for parallel tasks, one queue for serial tasks.
* Handles dependencies.
* In case of errors, completes all the completable tasks rather than quitting
  immediately.

Still needs some cleanups, but it's good enough to attempt integrating it in
cmsBuild. Unit tests are run by simply executing the file.

File Contents

# User Rev Content
1 eulisse 1.1 from Queue import Queue
2     from threading import Thread
3     from time import sleep
4     import threading
5    
class _SchedulerQuitCommand(object):
    """Marker object meaning "this worker must stop".

    A dedicated type is used so that the quit request can never be confused
    with the ordinary result code returned by a task.
    """
10    
def transition(what, fromList, toList):
    """Move ``what`` from ``fromList`` to the end of ``toList``.

    Both lists are modified in place. Only the first occurrence of ``what``
    is removed from ``fromList``.

    Raises:
        ValueError: if ``what`` is not in ``fromList``. A diagnostic line is
            printed first and ``toList`` is left untouched.
    """
    try:
        fromList.remove(what)
    except ValueError:
        # %-formatting so that non-string identifiers do not turn the
        # diagnostic itself into a TypeError hiding the real problem.
        print("%s not in source list" % (what,))
        # Bare raise keeps the original traceback.
        raise
    toList.append(what)
18    
class Scheduler(object):
    """A simple job scheduler with dependency handling.

    The workers queue tells the worker threads what to do; the results queue
    is watched by the master thread (the one calling ``run``), which executes
    every message posted on it. All bookkeeping lists are modified by the
    master thread only, which is what lets the scheduler work without locks.

    All worker threads start by trying to fetch from the workers queue (and
    are therefore blocked). The master does the scheduling and then sits
    waiting for results. Scheduling means iterating over the pending jobs and
    queueing every job whose dependencies are all done. A job with a broken
    dependency is moved to the broken list. When no pending job is left, a
    quit command is posted for every worker.

    A task is any callable plus its positional arguments. A false return
    value (for instance ``None``) means success; a true value is treated as
    an error and marks the job as broken.
    """

    def __init__(self, parallelThreads, logDelegate=None):
        """Create an idle scheduler.

        Args:
            parallelThreads: number of worker threads started by ``run``.
            logDelegate: callable taking one string, used to emit log
                messages; when not given, messages are printed.
        """
        self.workersQueue = Queue()   # (taskId, commandSpec) for the workers
        self.resultsQueue = Queue()   # (thread, commandSpec) for the master
        self.jobs = {}                # taskId -> {"deps": ..., "spec": ...}
        self.pendingJobs = []         # registered, not yet handed to a worker
        self.runningJobs = []         # handed to a worker, result not yet seen
        self.doneJobs = []            # finished without error
        self.brokenJobs = []          # failed, or depending on a failed job
        self.parallelThreads = parallelThreads
        self.logDelegate = logDelegate
        if not logDelegate:
            self.logDelegate = self.__doLog

    def run(self):
        """Start the workers and process results until every worker has quit.

        ``parallelThreads`` is decremented each time a worker quits, so it
        is zero when this method returns.
        """
        for _ in range(self.parallelThreads):
            t = Thread(target=self.createWorker())
            t.daemon = True
            t.start()

        self.notifyMaster(self.reschedule)
        # Wait until all the workers are done.
        while self.parallelThreads:
            try:
                who, item = self.resultsQueue.get()
                item[0](*item[1:])
            except KeyboardInterrupt:
                print("Ctrl-c received, waiting for workers to finish")
                # The workers queue is FIFO, so the quit commands are only
                # reached after the tasks which are already queued.
                self.shout(self.quit)

        # Prune the queue. No worker is left at this point, so only this
        # thread touches the queue and empty() can be trusted.
        while not self.resultsQueue.empty():
            who, item = self.resultsQueue.get()
            item[0](*item[1:])
        return

    # Create a worker.
    def createWorker(self):
        """Return the function executed by each worker thread."""
        def worker():
            while True:
                taskId, item = self.workersQueue.get()
                try:
                    result = item[0](*item[1:])
                except Exception as e:
                    # A task which raises must be recorded as failed; if the
                    # thread died here it would never release itself and the
                    # master would wait forever.
                    result = "%s: %s" % (type(e).__name__, e)
                    self.log("%s failed: %s" % (taskId, result))
                if isinstance(result, _SchedulerQuitCommand):
                    self.notifyMaster(self.releaseWorker)
                    return
                self.log(str(item) + " done")
                self.notifyMaster(self.__updateJobStatus, taskId, result)
                self.notifyMaster(self.reschedule)
                # Only in 2.5: self.workersQueue.task_done()
        return worker

    def releaseWorker(self):
        """Record that one worker thread has quit."""
        self.parallelThreads -= 1

    def put(self, taskId, deps, *spec):
        """Register a job.

        Args:
            taskId: identifier of the job, also used by other jobs to refer
                to it as a dependency.
            deps: identifiers of the jobs which must be done before this one.
            *spec: the callable to execute, followed by its arguments.

        A ``taskId`` which is already pending is reported through the log
        and otherwise ignored: the first registration is kept. Queueing the
        same id twice would make the job run twice.
        """
        if taskId in self.pendingJobs:
            self.log("Double task %s" % taskId)
            return
        self.jobs[taskId] = {"deps": deps, "spec": spec}
        self.pendingJobs.append(taskId)

    # Does the rescheduling of tasks. Derived class should call it.
    def reschedule(self):
        """Queue every job which became runnable; quit when none is left.

        Must be executed by the master thread (post it with
        ``notifyMaster``), since it modifies the job lists.
        """
        # First of all clean up the pending jobs from all those which have
        # broken dependencies. Repeat until nothing changes, so that a whole
        # chain of dependents is marked regardless of registration order.
        # Always iterate over a copy: transition() modifies pendingJobs.
        changed = True
        while changed:
            changed = False
            for taskId in list(self.pendingJobs):
                deps = self.jobs[taskId]["deps"]
                if any(dep in self.brokenJobs for dep in deps):
                    transition(taskId, self.pendingJobs, self.brokenJobs)
                    changed = True

        # If no tasks left, quit.
        if not self.pendingJobs:
            self.__requestQuit()
            return

        # Otherwise do another round of scheduling of all the tasks.
        for taskId in list(self.pendingJobs):
            deps = self.jobs[taskId]["deps"]
            if all(dep in self.doneJobs for dep in deps):
                # No broken dependencies and no pending ones. We can continue.
                transition(taskId, self.pendingJobs, self.runningJobs)
                self.__schedule(taskId, self.jobs[taskId]["spec"])

        # Nothing is running, hence no result will ever arrive to unblock
        # what is still pending (unknown or circular dependencies). Give up
        # on those jobs rather than waiting forever.
        if self.pendingJobs and not self.runningJobs:
            for taskId in list(self.pendingJobs):
                self.log("Task %s has dependencies which can never complete"
                         % (taskId,))
                transition(taskId, self.pendingJobs, self.brokenJobs)
            self.__requestQuit()

    # Ask every worker to quit.
    def __requestQuit(self):
        self.shout(self.quit)
        # Executed by the master as well; there it only logs the request,
        # the returned command object is ignored by run().
        self.notifyMaster(self.quit)

    # Update the job with the result of running.
    def __updateJobStatus(self, taskId, error):
        if not error:
            transition(taskId, self.runningJobs, self.doneJobs)
            return
        transition(taskId, self.runningJobs, self.brokenJobs)

    # One task at the time.
    def __schedule(self, taskId, commandSpec):
        self.workersQueue.put((taskId, commandSpec))

    # Helper to enqueue commands for all the threads.
    def shout(self, *commandSpec):
        """Queue one copy of the command for each worker still alive."""
        for x in range(self.parallelThreads):
            self.__schedule("quit-" + str(x), commandSpec)

    # Helper to enqueue replies to the master thread.
    def notifyMaster(self, *commandSpec):
        """Post a callable (plus arguments) to be executed by the master."""
        self.resultsQueue.put((threading.current_thread(), commandSpec))

    # Helper method to do logging:
    def log(self, s):
        """Have the master thread pass ``s`` to the log delegate."""
        self.notifyMaster(self.logDelegate, s)

    # Task which forces a worker to quit.
    def quit(self):
        """Log the request and return the command which stops a worker."""
        self.log("Requested to quit.")
        return _SchedulerQuitCommand()

    # Helper for printouts.
    def __doLog(self, s):
        print(s)
151    
def dummyTask():
    """Short placeholder job: pause the calling thread for 0.1 seconds."""
    delay = 0.1
    sleep(delay)
154    
def dummyTaskLong():
    """Long placeholder job: pause the calling thread for one second."""
    delay = 1
    sleep(delay)
157    
def errorTask():
    """Placeholder job which always fails.

    Returns a non-empty message; any true return value is what the
    scheduler treats as an error.
    """
    message = "This will always have an error"
    return message
160    
if __name__ == "__main__":
    # Self-tests, run by simply executing this file.

    # Without any job the workers must be told to quit right away.
    scheduler = Scheduler(10)
    scheduler.run()

    scheduler = Scheduler(1)
    scheduler.run()

    # A single job, which itself uses the scheduler log.
    scheduler = Scheduler(10)
    scheduler.put("test", [], scheduler.log, "This is england")
    scheduler.run()

    # More jobs than workers.
    scheduler = Scheduler(10)
    for x in range(50):
        scheduler.put("test" + str(x), [], dummyTask)
    scheduler.run()
    assert(len(scheduler.brokenJobs) == 0)
    assert(len(scheduler.jobs) == 50)

    # A failing job ends up in the broken list only.
    scheduler = Scheduler(1)
    scheduler.put("test", [], errorTask)
    scheduler.run()
    assert(len(scheduler.brokenJobs) == 1)
    assert(len(scheduler.runningJobs) == 0)
    assert(len(scheduler.doneJobs) == 0)

    # Check dependency actually works: test2 is registered first but must
    # complete after test1.
    scheduler = Scheduler(10)
    scheduler.put("test2", ["test1"], dummyTask)
    scheduler.put("test1", [], dummyTaskLong)
    scheduler.run()
    assert(scheduler.doneJobs == ["test1", "test2"])

    # Check a failure is propagated to the jobs depending on it.
    scheduler = Scheduler(10)
    scheduler.put("test3", ["test2"], dummyTask)
    scheduler.put("test2", ["test1"], errorTask)
    scheduler.put("test1", [], dummyTaskLong)
    scheduler.run()
    assert(scheduler.doneJobs == ["test1"])
    assert(scheduler.brokenJobs == ["test2", "test3"])

    # Check ctrl-C will exit properly (needs somebody at the keyboard).
    scheduler = Scheduler(2)
    for x in range(250):
        scheduler.put("test" + str(x), [], dummyTask)
    print("Press Control-C to continue")
    scheduler.run()