Module npsgd_queue
[hide private]
[frames] | no frames]

Source Code for Module npsgd_queue

  1  #!/usr/bin/python 
  2  # Author: Thomas Dimson [tdimson@gmail.com] 
  3  # Date:   January 2011 
  4  # For distribution details, see LICENSE 
  5   
  6  """Queue server for npsgd modelling tasks. 
  7   
  8  The queue server is the data backend for NPSGD. It listens to both workers and 
  9  the web interface. The web interface populates it with requests while the workers 
 10  poll for requests and pull them off the queue. Additionally, the queue is  
 11  responsible for sending out confirmation code e-mail messages. 
 12  """ 
import os
import sys
import time
import pickle
import anydbm
import shelve
import logging
import threading
from datetime import datetime
from optparse import OptionParser

import tornado.web
import tornado.ioloop
import tornado.escape
import tornado.httpserver

import npsgd.email_manager
from npsgd.email_manager import Email
from npsgd import model_manager
from npsgd.config import config
from npsgd.task_queue import TaskQueue
from npsgd.task_queue import TaskQueueException
from npsgd.confirmation_map import ConfirmationMap
from npsgd.model_manager import modelManager
 34   
# Module-wide QueueGlobals instance read by the request handlers and by the
# expiration thread. It is only a None placeholder at import time; main()
# rebinds it once the queue shelve has been opened.
glb = None
"""Queue globals object - assigned at startup."""
 37   
class QueueGlobals(object):
    """Queue state objects along with disk serialization mechanisms for them.

    Bundles the task queue, the confirmation map, the task id counter and the
    time of the last worker check-in, and knows how to restore the first three
    from (and write them back to) the shelve handed to the constructor.
    """

    def __init__(self, shelve):
        """Build the queue state, restoring whatever ``shelve`` already holds.

        Args:
            shelve: open shelve-like mapping used for persistence. The keys
                read and written here are "idCounter", "taskQueue" and
                "confirmationMap".
        """
        self.shelve = shelve
        self.shelveLock = threading.RLock()
        self.idLock = threading.RLock()
        self.taskQueue = TaskQueue()
        self.confirmationMap = ConfirmationMap()
        # Continue numbering from the persisted counter when there is one.
        if "idCounter" in shelve:
            self.idCounter = shelve["idCounter"]
        else:
            self.idCounter = 0

        self.loadDiskTaskQueue()
        self.loadConfirmationMap()
        # NOTE(review): the thread is only constructed here; nothing in this
        # module calls start() on it - confirm whether that is intended.
        self.expireWorkerTaskThread = ExpireWorkerTaskThread(self.taskQueue)
        # Earliest representable date, i.e. "no worker has checked in yet".
        self.lastWorkerCheckin = datetime(1, 1, 1)

    def loadDiskTaskQueue(self):
        """Load task queue from disk using the shelve reserved for the queue.

        Task dicts that can no longer be turned into a model are dropped and
        their owner is notified with the "lost task" e-mail.
        """
        if "taskQueue" not in self.shelve:
            logging.info("Unable to read task queue from disk db, starting fresh")
            return

        logging.info("Reading task queue from disk")
        readTasks = 0
        failedTasks = 0
        for taskDict in self.shelve["taskQueue"]:
            try:
                task = modelManager.getModelFromTaskDict(taskDict)
            except model_manager.InvalidModelError:
                emailAddress = taskDict["emailAddress"]
                subject = config.lostTaskEmailSubject.generate(
                    full_name=taskDict["modelFullName"],
                    visibleId=taskDict["visibleId"])
                body = config.lostTaskEmailTemplate.generate()
                logging.info("Invalid model-version pair, notifying %s", emailAddress)
                npsgd.email_manager.backgroundEmailSend(Email(emailAddress, subject, body))
                failedTasks += 1
                continue

            readTasks += 1
            self.taskQueue.putTask(task)

        logging.info("Read %s tasks, failed while reading %s tasks", readTasks, failedTasks)

    def loadConfirmationMap(self):
        """Load confirmation map ([code, modelDict] pairs) from shelve reserved for the queue.

        Entries whose task dict can no longer be turned into a model are
        dropped and their owner is notified with the "confirmation failed"
        e-mail.
        """
        if "confirmationMap" not in self.shelve:
            logging.info("Unable to read confirmation map from disk db, starting fresh")
            return

        logging.info("Reading confirmation map from disk")
        confirmationMapEntries = self.shelve["confirmationMap"]

        readCodes = 0
        failedCodes = 0
        for code, taskDict in confirmationMapEntries.items():
            try:
                task = modelManager.getModelFromTaskDict(taskDict)
            except model_manager.InvalidModelError:
                emailAddress = taskDict["emailAddress"]
                subject = config.confirmationFailedEmailSubject.generate(
                    full_name=taskDict["modelFullName"],
                    visibleId=taskDict["visibleId"])
                body = config.confirmationFailedEmailTemplate.generate(code=code)
                logging.info("Invalid model-version pair, notifying %s", emailAddress)
                npsgd.email_manager.backgroundEmailSend(Email(emailAddress, subject, body))
                failedCodes += 1
                continue

            readCodes += 1
            self.confirmationMap.putRequestWithCode(task, code)

        logging.info("Read %s codes, failed while reading %s codes", readCodes, failedCodes)

    def syncShelve(self):
        """Serializes the task queue, confirmation map and id counter to disk using the queue shelve.

        A pickling failure is logged as a warning and otherwise ignored, so a
        request handler calling this never fails because of it.
        """
        try:
            with self.shelveLock:
                self.shelve["taskQueue"] = [
                    e.asDict() for e in self.taskQueue.allRequests()]
                self.shelve["confirmationMap"] = dict(
                    (code, task.asDict())
                    for (code, task) in self.confirmationMap.getRequestsWithCodes())

                # Read the counter under its own lock so a concurrent
                # newTaskId() cannot interleave with the write.
                with self.idLock:
                    self.shelve["idCounter"] = self.idCounter
        except pickle.PicklingError as e:
            logging.warning("Unable sync task queue and confirmation error to disk due to a pickling (serialization error): %s", e)
            return

        logging.info("Synced queue and confirmation map to disk")

    def touchWorkerCheckin(self):
        """Record the current time as the last moment a worker contacted the queue."""
        self.lastWorkerCheckin = datetime.now()

    def newTaskId(self):
        """Increment the id counter under ``idLock`` and return the new value."""
        with self.idLock:
            self.idCounter += 1
            return self.idCounter
143
class ExpireWorkerTaskThread(threading.Thread):
    """Task Expiration Thread

    Moves tasks back into the queue whenever
    We haven't heard from a worker in a while
    """

    def __init__(self, taskQueue):
        """Prepare (but do not start) the daemon thread.

        Args:
            taskQueue: queue whose processing tasks are checked for timeouts
                and into which expired tasks are re-inserted.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.taskQueue = taskQueue
        # Setting this event makes run() leave its loop at the next wake-up.
        self.done = threading.Event()

    def run(self):
        """Wake up every ``config.keepAliveInterval`` and expire stale tasks.

        A processing task not touched within ``config.keepAliveTimeout``
        seconds has its failure count bumped. It is then either given up on
        (failure e-mail) or put back in the queue under a fresh task id.
        """
        logging.info("Expire worker task thread booting up...")

        while True:
            self.done.wait(config.keepAliveInterval)
            if self.done.is_set():
                break

            cutoff = time.time() - config.keepAliveTimeout
            badTasks = self.taskQueue.pullProcessingTasksOlderThan(cutoff)

            if len(badTasks) > 0:
                logging.info("Found %d tasks to expire", len(badTasks))

            for task in badTasks:
                task.failureCount += 1
                logging.warning("Task '%s' failed due to timeout (failure #%d)", task.taskId, task.failureCount)
                # NOTE(review): WorkerFailedTask gives up with ">=" while this
                # uses ">" - confirm which threshold is intended.
                if task.failureCount > config.maxJobFailures:
                    logging.warning("Exceeded max job failures, sending fail email")
                    npsgd.email_manager.backgroundEmailSend(task.failureEmail())
                else:
                    logging.warning("Inserting task back in to queue with new taskId")
                    task.taskId = glb.newTaskId()
                    self.taskQueue.putTask(task)
181
class QueueRequestHandler(tornado.web.RequestHandler):
    """Common base class for every HTTP handler served by the queue."""

    def checkSecret(self):
        """Compare the request's 'secret' argument with ``config.requestSecret``.

        Returns True on a match. Otherwise a ``bad_secret`` JSON error is
        written to the response and False is returned.
        """
        if self.get_argument("secret") != config.requestSecret:
            self.write(tornado.escape.json_encode({"error": "bad_secret"}))
            return False

        return True
191
class ClientModelCreate(QueueRequestHandler):
    """HTTP handler through which clients file a model request (prior to confirmation)."""

    def post(self):
        """Register an unconfirmed model request sent by the web daemon.

        The posted task dict is turned into a model by the model manager and
        stored in the confirmation map under a fresh task id. A confirmation
        e-mail carrying the generated code is sent, the state is synced to
        disk, and the task dict plus code are returned as JSON.
        """
        if not self.checkSecret():
            return

        taskDict = tornado.escape.json_decode(self.get_argument("task_json"))
        task = modelManager.getModelFromTaskDict(taskDict)
        task.taskId = glb.newTaskId()
        code = glb.confirmationMap.putRequest(task)

        recipient = task.emailAddress
        logging.info("Generated a request for %s, confirmation %s required", recipient, code)
        subject = config.confirmEmailSubject.generate(task=task)
        body = config.confirmEmailTemplate.generate(
            code=code, task=task, expireDelta=config.confirmTimeout)
        npsgd.email_manager.backgroundEmailSend(Email(recipient, subject, body))
        glb.syncShelve()

        payload = {"response": {"task": task.asDict(), "code": code}}
        self.write(tornado.escape.json_encode(payload))
225
class ClientQueueHasWorkers(QueueRequestHandler):
    """Lets the web daemon ask whether any worker is currently around.

    The answer is derived from the time of the last worker check-in recorded
    by the queue: workers count as available while that check-in is more
    recent than ``config.keepAliveTimeout`` seconds.
    """

    def get(self):
        """Reply with ``{"response": {"has_workers": <bool>}}``."""
        if not self.checkSecret():
            return

        sinceCheckin = datetime.now() - glb.lastWorkerCheckin
        # Whole seconds only; the microsecond part of the delta is ignored.
        elapsedSeconds = sinceCheckin.days * 24 * 3600 + sinceCheckin.seconds
        payload = {"response": {"has_workers": elapsedSeconds < config.keepAliveTimeout}}
        self.write(tornado.escape.json_encode(payload))
# Codes confirmed since this process started; lets a repeated confirmation be
# answered with "already_confirmed" instead of a 404.
previouslyConfirmed = set()


class ClientConfirm(QueueRequestHandler):
    """HTTP handler invoked when a client confirms a model request.

    A confirmed request leaves the confirmation map and enters the general
    task queue, where workers can pick it up.
    """

    def get(self, code):
        """Confirm the request registered under ``code``.

        Raises tornado.web.HTTPError(404) when the code is unknown and was
        not confirmed earlier during this process' lifetime.
        """
        if not self.checkSecret():
            return

        try:
            # Purge stale confirmations before looking the code up.
            glb.confirmationMap.expireConfirmations()
            confirmedRequest = glb.confirmationMap.getRequest(code)
        except KeyError:
            if code not in previouslyConfirmed:
                raise tornado.web.HTTPError(404)

            self.write(tornado.escape.json_encode({"response": "already_confirmed"}))
            return

        previouslyConfirmed.add(code)
        glb.taskQueue.putTask(confirmedRequest)
        glb.syncShelve()
        self.write(tornado.escape.json_encode({"response": "okay"}))
279 280
class WorkerInfo(QueueRequestHandler):
    """HTTP handler that workers hit to announce they are alive."""

    def get(self):
        """Record the check-in time and answer with an empty JSON object."""
        if self.checkSecret():
            glb.touchWorkerCheckin()
            self.write("{}")
290
class WorkerTaskKeepAlive(QueueRequestHandler):
    """HTTP handler for workers pinging the queue while working on a task.

    Having this request makes sure that we don't time out any jobs that
    are currently being handled by some worker. If a worker goes down,
    we will put the job back into the queue because this request won't have
    been made.
    """

    def get(self, taskIdString):
        """Refresh the processing task whose id is ``taskIdString``.

        Writes ``{}`` on success. When the task queue raises
        TaskQueueException for that id, a ``bad_id`` error object is written
        instead.
        """
        if not self.checkSecret():
            return

        glb.touchWorkerCheckin()
        taskId = int(taskIdString)
        logging.info("Got heartbeat for task id '%s'", taskId)
        try:
            glb.taskQueue.touchProcessingTaskById(taskId)
        except TaskQueueException:
            logging.info("Bad keep alive request: no such task id '%s' exists", taskId)
            self.write(tornado.escape.json_encode({
                "error": {"type": "bad_id"}
            }))
            # Stop here: writing "{}" as well would append a second JSON
            # document to the body and make the response unparseable.
            return

        self.write("{}")
314
class WorkerSucceededTask(QueueRequestHandler):
    """HTTP handler through which a worker reports a successfully processed task.

    Once this request is handled the task is removed from the processing
    list and the queue stops tracking it.
    """

    def get(self, taskIdString):
        """Drop the finished task and persist the new queue state."""
        if not self.checkSecret():
            return

        glb.touchWorkerCheckin()
        finishedId = int(taskIdString)
        try:
            glb.taskQueue.pullProcessingTaskById(finishedId)
        except TaskQueueException:
            logging.info("Bad succeed request: no task id exists")
            errorBody = {"error": {"type": "bad_id"}}
            self.write(tornado.escape.json_encode(errorBody))
            return

        glb.syncShelve()
        self.write(tornado.escape.json_encode({"status": "okay"}))
340
class WorkerHasTask(QueueRequestHandler):
    """HTTP handler letting a worker verify that its job is still tracked.

    This closes a race in NPSGD: before e-mailing results, a worker asks the
    queue whether the task is still in the processing list. If the queue
    already declared the worker timed out, the task is gone from that list
    and sending the e-mail would produce a duplicate.
    """

    def get(self, taskIdString):
        """Answer ``{"response": "yes"}`` or ``{"response": "no"}``."""
        if not self.checkSecret():
            return

        glb.touchWorkerCheckin()
        taskId = int(taskIdString)
        logging.info("Got 'has task' request for task of id '%d'", taskId)

        if glb.taskQueue.hasProcessingTaskById(taskId):
            answer = "yes"
        else:
            answer = "no"
        self.write(tornado.escape.json_encode({"response": answer}))
367
class WorkerFailedTask(QueueRequestHandler):
    """HTTP handler through which a worker reports that a job failed.

    A failed task is either put back in the queue for another attempt or,
    once it has failed too often, abandoned with a failure e-mail to the user.
    """

    def get(self, taskIdString):
        """Count the failure for the given task id and retry or give up."""
        if not self.checkSecret():
            return

        glb.touchWorkerCheckin()
        failedId = int(taskIdString)
        try:
            task = glb.taskQueue.pullProcessingTaskById(failedId)
        except TaskQueueException:
            logging.info("Bad failed request: no such task id exists, ignoring request")
            self.write(tornado.escape.json_encode({"error": {"type": "bad_id"}}))
            return

        task.failureCount += 1
        logging.warning("Worker had a failure while processing task '%s' (failure #%d)",
                        task.taskId, task.failureCount)

        # NOTE(review): ExpireWorkerTaskThread compares with ">" where this
        # uses ">=" - confirm which threshold is intended.
        retry = task.failureCount < config.maxJobFailures
        if retry:
            logging.warning("Returning task to queue for another attempt")
            glb.taskQueue.putTask(task)
        else:
            logging.warning("Max job failures found, sending failure email")
            npsgd.email_manager.backgroundEmailSend(task.failureEmail())

        # NOTE(review): unlike WorkerSucceededTask, no glb.syncShelve() follows
        # the queue change here - confirm whether that is deliberate.
        self.write(tornado.escape.json_encode({"status": "okay"}))
404 405
class WorkerTaskRequest(QueueRequestHandler):
    """HTTP handler for workers grabbings tasks off the queue."""

    def post(self):
        """Hand the next task compatible with the worker's model versions to it.

        The worker posts its supported versions as ``model_versions_json``.
        The JSON reply is one of:
          * ``{"status": "empty_queue"}`` - the queue holds no tasks,
          * ``{"status": "no_version"}`` - no queued task matches the
            versions the worker supports,
          * ``{"task": <task dict>}`` - the task, which has also been moved
            to the processing list.
        """
        if not self.checkSecret():
            return

        modelVersions = tornado.escape.json_decode(self.get_argument("model_versions_json"))

        glb.touchWorkerCheckin()
        logging.info("Received worker task request with models %s", modelVersions)
        if glb.taskQueue.isEmpty():
            self.write(tornado.escape.json_encode({
                "status": "empty_queue"
            }))
            return

        task = glb.taskQueue.pullNextVersioned(modelVersions)
        # Identity test: "== None" would go through any __eq__ the task
        # class may define.
        if task is None:
            logging.info("Found no models in queue matching worker's supported versions")
            self.write(tornado.escape.json_encode({
                "status": "no_version"
            }))
            return

        glb.taskQueue.putProcessingTask(task)
        self.write(tornado.escape.json_encode({
            "task": task.asDict()
        }))
432
def main():
    """Parse the command line, restore queue state and serve requests forever.

    Options: ``-c/--config`` (config file, default "config.cfg"),
    ``-p/--port`` (listening port, default 9000) and ``-l/--log-filename``
    (log file, "-" for stderr). The queue shelve is closed when the IO loop
    exits, whether normally or through an exception.
    """
    global glb
    parser = OptionParser()
    parser.add_option("-c", "--config", dest="config",
                      help="Config file", default="config.cfg")
    # type="int" so a port given on the command line is converted; without it
    # optparse stores a string and the "%d" formatting below raises TypeError.
    parser.add_option("-p", "--port", dest="port", type="int",
                      help="Queue port number", default=9000)
    parser.add_option('-l', '--log-filename', dest='log',
                      help="Log filename (use '-' for stderr)", default="-")

    (options, args) = parser.parse_args()

    config.loadConfig(options.config)
    config.setupLogging(options.log)
    model_manager.setupModels()
    model_manager.startScannerThread()

    try:
        queueShelve = shelve.open(config.queueFile)
    except anydbm.error:
        # An unreadable db file is discarded so the queue can still boot.
        logging.warning("Queue file '%s' is corrupt, removing and starting afresh", config.queueFile)
        os.remove(config.queueFile)
        queueShelve = shelve.open(config.queueFile)

    try:
        glb = QueueGlobals(queueShelve)
        # NOTE(review): glb.expireWorkerTaskThread is never started here or
        # anywhere else in this module - confirm whether that is intended.
        queueHTTP = tornado.httpserver.HTTPServer(tornado.web.Application([
            (r"/worker_info", WorkerInfo),
            (r"/client_model_create", ClientModelCreate),
            (r"/client_queue_has_workers", ClientQueueHasWorkers),
            (r"/client_confirm/(\w+)", ClientConfirm),
            (r"/worker_failed_task/(\d+)", WorkerFailedTask),
            (r"/worker_succeed_task/(\d+)", WorkerSucceededTask),
            (r"/worker_has_task/(\d+)", WorkerHasTask),
            (r"/worker_keep_alive_task/(\d+)", WorkerTaskKeepAlive),
            (r"/worker_work_task", WorkerTaskRequest)
        ]))
        queueHTTP.listen(options.port)
        logging.info("NPSGD Queue Booted up, serving on port %d", options.port)
        sys.stderr.write("NPSGD queue server listening on %d\n" % options.port)
        tornado.ioloop.IOLoop.instance().start()
    finally:
        queueShelve.close()


if __name__ == "__main__":
    main()