Package npsgd :: Module task_queue
[hide private]
[frames] | no frames]

Source Code for Module npsgd.task_queue

  1  # Author: Thomas Dimson [tdimson@gmail.com] 
  2  # Date:   January 2011 
  3  # For distribution details, see LICENSE 
  4  """Module containing a 'task queue' for the queue daemon.""" 
  5  import os 
  6  import sys 
  7  import time 
  8  import logging 
  9  import threading 
 10   
11 -class TaskQueueException(RuntimeError): pass
12
13 -class TaskQueue(object):
14 """Main queue object (thread safe). 15 16 Contains two internal queues: one for actually holding requests, and one 17 for holding the tasks that are currently being processed. This way, if 18 a task fails to process we can cycle it back into the requests queue a 19 few times to see if the error was transient. 20 """
21 - def __init__(self):
22 self.requests = [] 23 self.processingTasks = [] 24 self.lock = threading.RLock()
25
26 - def allRequests(self):
27 """Returns all requests in processing or requests queue. 28 29 This is really only useful for serializing the queue to disk.""" 30 with self.lock: 31 return self.requests + self.processingTasks
32 33
34 - def putTask(self, request):
35 """Puts a model into the queue for worker processing.""" 36 with self.lock: 37 self.requests.append(request) 38 39 logging.info("Added task to model queue as number %d", len(self.requests))
40
41 - def putTaskHead(self, request):
42 """Puts a model into queue at the head (useful for peeking).""" 43 with self.lock: 44 self.requests.insert(0,request)
45 46
47 - def putProcessingTask(self, task):
48 """Puts a model into the queue for worker processing.""" 49 now = time.time() 50 with self.lock: 51 self.processingTasks.append((task, now))
52
53 - def pullNextVersioned(self, modelVersions):
54 """Pulls the next model from the worker queue that matches versions.""" 55 with self.lock: 56 for i,task in enumerate(self.requests): 57 if [task.__class__.short_name, task.__class__.version] in modelVersions: 58 del self.requests[i] 59 return task 60 61 return None
62 63
64 - def pullNextTask(self):
65 """Pulls a model from the worker queue.""" 66 with self.lock: 67 return self.requests.pop(0)
68
69 - def touchProcessingTaskById(self, taskId):
70 """Update timestamp on a task that is currently processing.""" 71 72 now = time.time() 73 with self.lock: 74 for i, (task, taskTime) in enumerate(self.processingTasks): 75 if task.taskId == taskId: 76 self.processingTasks[i] = (task, now) 77 break 78 else: 79 raise TaskQueueException("Invalid id '%s'" % taskId)
80
81 - def hasProcessingTaskById(self, taskId):
82 with self.lock: 83 return any(e.taskId == taskId for (e,t) in self.processingTasks)
84
85 - def pullProcessingTasksOlderThan(self, oldTime):
86 """Pulls tasks out of the processing queue that are stale.""" 87 88 with self.lock: 89 expireTasks = [e for (e,t) in self.processingTasks if t <= oldTime] 90 self.processingTasks = [(e,t) for (e,t) in self.processingTasks if t > oldTime] 91 92 return expireTasks
93
94 - def pullProcessingTaskById(self, taskId):
95 with self.lock: 96 possibleTasks = [e for (e,t) in self.processingTasks if e.taskId == taskId] 97 self.processingTasks = [(e,t) for (e,t) in self.processingTasks if e.taskId != taskId] 98 99 if len(possibleTasks) == 0: 100 raise TaskQueueException("Invalid id '%s'" % taskId) 101 102 return possibleTasks[0]
103
104 - def isEmpty(self):
105 with self.lock: 106 return len(self.requests) == 0
107