1
2
3
4 """Module containing a 'task queue' for the queue daemon."""
5 import os
6 import sys
7 import time
8 import logging
9 import threading
10
12
14 """Main queue object (thread safe).
15
16 Contains two internal queues: one for actually holding requests, and one
17 for holding the tasks that are currently being processed. This way, if
18 a task fails to process we can cycle it back into the requests queue a
19 few times to see if the error was transient.
20 """
# Pending requests. Elsewhere in this section items are appended to the tail,
# inserted at index 0, and popped from index 0.
22 self.requests = []
# Work currently being processed; entries are stored as (task, timestamp)
# tuples (see the append of `(task, now)` further down).
23 self.processingTasks = []
# Re-entrant lock, acquired via `with self.lock:` around the list operations
# in the rest of this section.
24 self.lock = threading.RLock()
25
27 """Returns all requests in processing or requests queue.
28
29 This is really only useful for serializing the queue to disk."""
# Build the combined list while holding the lock. `+` creates a new list, so
# the caller does not receive a reference to either internal list.
# NOTE(review): the two halves have different shapes — self.requests holds
# bare request objects, while self.processingTasks holds (task, timestamp)
# tuples — so the result mixes both. Confirm the serializer expects that.
30 with self.lock:
31 return self.requests + self.processingTasks
32
33
35 """Puts a model into the queue for worker processing."""
# Append `request` to the tail of the pending list under the lock.
36 with self.lock:
37 self.requests.append(request)
38
# The logged number is the length of self.requests at the time of the call.
# NOTE(review): indentation is not visible in this listing, so it is unclear
# whether this len() runs inside the lock; if it does not, the reported
# number can be stale when other threads modify the list — confirm.
39 logging.info("Added task to model queue as number %d", len(self.requests))
40
42 """Puts a model into queue at the head (useful for peeking)."""
# Insert `request` at index 0 of the pending list, i.e. the position that
# `self.requests.pop(0)` removes from. list.insert(0, ...) shifts every
# existing element, so this is O(n) in the number of pending requests.
43 with self.lock:
44 self.requests.insert(0,request)
45
46
48 """Puts a model into the queue for worker processing."""
# NOTE(review): the docstring directly above reads like a copy of the
# request-enqueue docstring. This code does not touch self.requests; it
# records `task` in self.processingTasks paired with the current time.
# The timestamp is taken before the lock is acquired.
49 now = time.time()
50 with self.lock:
51 self.processingTasks.append((task, now))
52
54 """Pulls the next model from the worker queue that matches versions."""
# Scan pending requests in list order; remove and return the first one whose
# class-level (short_name, version) pair is found in `modelVersions`.
# NOTE(review): the membership test builds a *list* [short_name, version], so
# `modelVersions` presumably contains two-element lists; a tuple
# (short_name, version) would not compare equal to it — confirm with callers.
55 with self.lock:
56 for i,task in enumerate(self.requests):
57 if [task.__class__.short_name, task.__class__.version] in modelVersions:
# Deleting from the list being iterated is safe here only because the
# function returns immediately afterwards.
58 del self.requests[i]
59 return task
60
# No pending request matched any entry of `modelVersions`.
61 return None
62
63
65 """Pulls a model from the worker queue."""
# Remove and return the request at the head (index 0) of the pending list.
# list.pop(0) raises IndexError when self.requests is empty; nothing in this
# body guards against that, so callers must handle it or check first.
66 with self.lock:
67 return self.requests.pop(0)
68
70 """Update timestamp on a task that is currently processing."""
71
# Find the processing entry whose task has the given `taskId` and replace its
# tuple with (task, now). Only the first match is updated because of `break`.
# The timestamp is taken before the lock is acquired.
72 now = time.time()
73 with self.lock:
74 for i, (task, taskTime) in enumerate(self.processingTasks):
75 if task.taskId == taskId:
76 self.processingTasks[i] = (task, now)
77 break
# NOTE(review): indentation is lost in this listing; this is presumably the
# loop's `else` (runs only when no entry matched and `break` was not hit),
# not the `if`'s — confirm against the real file.
# TaskQueueException is not defined in the visible part of the file.
78 else:
79 raise TaskQueueException("Invalid id '%s'" % taskId)
80
# True when at least one entry in self.processingTasks holds a task whose
# taskId equals `taskId`; the timestamp half of each tuple is ignored.
82 with self.lock:
83 return any(e.taskId == taskId for (e,t) in self.processingTasks)
84
86 """Pulls tasks out of the processing queue that are stale."""
87
# Partition the processing list by timestamp under the lock:
#   - entries with t <= oldTime are collected (task only, timestamp dropped);
#   - entries with t > oldTime are kept, by rebinding self.processingTasks
#     to a new list.
88 with self.lock:
89 expireTasks = [e for (e,t) in self.processingTasks if t <= oldTime]
90 self.processingTasks = [(e,t) for (e,t) in self.processingTasks if t > oldTime]
91
# The removed tasks; an empty list when no entry was at or before `oldTime`.
92 return expireTasks
93
# Remove every processing entry whose task has the given `taskId` and return
# the first matching task (timestamps are discarded).
95 with self.lock:
96 possibleTasks = [e for (e,t) in self.processingTasks if e.taskId == taskId]
97 self.processingTasks = [(e,t) for (e,t) in self.processingTasks if e.taskId != taskId]
98
# No entry carried this id. TaskQueueException is not defined in the visible
# part of the file.
99 if len(possibleTasks) == 0:
100 raise TaskQueueException("Invalid id '%s'" % taskId)
101
# If several entries shared the id, all of them were removed above but only
# the first one is handed back to the caller.
102 return possibleTasks[0]
103
# True when there are no pending requests. Only self.requests is checked;
# self.processingTasks is not consulted.
105 with self.lock:
106 return len(self.requests) == 0
107