1
2
3
4
5
6 """Queue server for npsgd modelling tasks.
7
8 The queue server is the data backend for NPSGD. It listens to both workers and
9 the web interface. The web interface populates it with requests while the workers
10 poll for requests and pull them off the queue. Additionally, the queue is
11 responsible for sending out confirmation code e-mail messages.
12 """
import os
import sys
import time
import pickle
import anydbm
import shelve
import logging
import threading
from datetime import datetime
from optparse import OptionParser

import tornado.web
import tornado.ioloop
import tornado.escape
import tornado.httpserver

import npsgd.email_manager
from npsgd.email_manager import Email
from npsgd import model_manager
from npsgd.config import config
from npsgd.task_queue import TaskQueue
from npsgd.task_queue import TaskQueueException
from npsgd.confirmation_map import ConfirmationMap
from npsgd.model_manager import modelManager
34
# Queue globals object (a QueueGlobals instance) - assigned at startup.
glb = None
37
class QueueGlobals(object):
    """Queue state objects along with disk serialization mechanisms for them.

    Holds the pending task queue, the confirmation map and the task-id counter,
    restores them from the shelve given to the constructor and writes them back
    to it in syncShelve().
    """

    def __init__(self, shelve):
        """Build the queue state, restoring anything previously synced to ``shelve``.

        shelve -- an open, dict-like shelve used for persistence (the parameter
                  name shadows the ``shelve`` module inside this method).
        """
        self.shelve = shelve
        self.shelveLock = threading.RLock()
        self.idLock = threading.RLock()
        self.taskQueue = TaskQueue()
        self.confirmationMap = ConfirmationMap()
        # Continue numbering from the last persisted id (0 on a fresh shelve).
        self.idCounter = shelve.get("idCounter", 0)

        self.loadDiskTaskQueue()
        self.loadConfirmationMap()
        # The expiry thread is only created here; this constructor does not start it.
        self.expireWorkerTaskThread = ExpireWorkerTaskThread(self.taskQueue)
        # Far-past sentinel meaning "no worker has checked in yet".
        self.lastWorkerCheckin = datetime(1, 1, 1)

    def loadDiskTaskQueue(self):
        """Load the task queue from the shelve's "taskQueue" entry.

        Each stored task dict is rebuilt through modelManager. Tasks whose
        model-version pair raises InvalidModelError are dropped and their owner
        is e-mailed using the lostTaskEmail* templates.
        """
        if "taskQueue" not in self.shelve:
            logging.info("Unable to read task queue from disk db, starting fresh")
            return

        logging.info("Reading task queue from disk")
        readTasks = 0
        failedTasks = 0
        for taskDict in self.shelve["taskQueue"]:
            try:
                task = modelManager.getModelFromTaskDict(taskDict)
            except model_manager.InvalidModelError:
                emailAddress = taskDict["emailAddress"]
                subject = config.lostTaskEmailSubject.generate(
                    full_name=taskDict["modelFullName"],
                    visibleId=taskDict["visibleId"])
                body = config.lostTaskEmailTemplate.generate()
                logging.info("Invalid model-version pair, notifying %s", emailAddress)
                npsgd.email_manager.backgroundEmailSend(Email(emailAddress, subject, body))
                failedTasks += 1
                continue

            readTasks += 1
            self.taskQueue.putTask(task)

        logging.info("Read %s tasks, failed while reading %s tasks", readTasks, failedTasks)

    def loadConfirmationMap(self):
        """Load the confirmation map (code -> task dict) from the "confirmationMap" entry.

        Entries whose model-version pair raises InvalidModelError are dropped and
        their owner is e-mailed using the confirmationFailedEmail* templates (the
        body template is given the confirmation code).
        """
        if "confirmationMap" not in self.shelve:
            logging.info("Unable to read confirmation map from disk db, starting fresh")
            return

        logging.info("Reading confirmation map from disk")
        readCodes = 0
        failedCodes = 0
        for code, taskDict in self.shelve["confirmationMap"].items():
            try:
                task = modelManager.getModelFromTaskDict(taskDict)
            except model_manager.InvalidModelError:
                emailAddress = taskDict["emailAddress"]
                subject = config.confirmationFailedEmailSubject.generate(
                    full_name=taskDict["modelFullName"],
                    visibleId=taskDict["visibleId"])
                body = config.confirmationFailedEmailTemplate.generate(code=code)
                logging.info("Invalid model-version pair, notifying %s", emailAddress)
                npsgd.email_manager.backgroundEmailSend(Email(emailAddress, subject, body))
                failedCodes += 1
                continue

            readCodes += 1
            self.confirmationMap.putRequestWithCode(task, code)

        logging.info("Read %s codes, failed while reading %s codes", readCodes, failedCodes)

    def syncShelve(self):
        """Serialize the task queue, confirmation map and id counter to the queue shelve.

        Serialization failures are logged as a warning and otherwise ignored.
        """
        try:
            with self.shelveLock:
                self.shelve["taskQueue"] = [task.asDict()
                                            for task in self.taskQueue.allRequests()]
                self.shelve["confirmationMap"] = dict(
                    (code, task.asDict())
                    for (code, task) in self.confirmationMap.getRequestsWithCodes())

                with self.idLock:
                    self.shelve["idCounter"] = self.idCounter

                # Assignments alone may stay buffered in the dbm; flush them to disk.
                self.shelve.sync()
        except (pickle.PicklingError, TypeError) as e:
            # Unpicklable objects commonly raise TypeError rather than PicklingError.
            logging.warning("Unable to sync task queue and confirmation map to disk due to a pickling (serialization) error: %s", e)
            return

        logging.info("Synced queue and confirmation map to disk")

    def touchWorkerCheckin(self):
        """Record that a worker contacted the queue just now."""
        self.lastWorkerCheckin = datetime.now()

    def newTaskId(self):
        """Return the next task id (thread-safe increment of the counter)."""
        with self.idLock:
            self.idCounter += 1
            return self.idCounter
143
145 """Task Expiration Thread
146
147 Moves tasks back into the queue whenever
148 We haven't heard from a worker in a while
149 """
150
152 threading.Thread.__init__(self)
153 self.daemon = True
154 self.taskQueue = taskQueue
155 self.done = threading.Event()
156
158 logging.info("Expire worker task thread booting up...")
159
160 while True:
161 self.done.wait(config.keepAliveInterval)
162 if self.done.isSet():
163 break
164
165 badTasks = self.taskQueue.pullProcessingTasksOlderThan(
166 time.time() - config.keepAliveTimeout)
167
168 if len(badTasks) > 0:
169 logging.info("Found %d tasks to expire", len(badTasks))
170
171 for task in badTasks:
172 task.failureCount += 1
173 logging.warning("Task '%s' failed due to timeout (failure #%d)", task.taskId, task.failureCount)
174 if task.failureCount > config.maxJobFailures:
175 logging.warning("Exceeded max job failures, sending fail email")
176 npsgd.email_manager.backgroundEmailSend(task.failureEmail())
177 else:
178 logging.warning("Inserting task back in to queue with new taskId")
179 task.taskId = glb.newTaskId()
180 self.taskQueue.putTask(task)
181
class QueueRequestHandler(tornado.web.RequestHandler):
    """Superclass to all queue request methods.

    Supplies the shared-secret check that the visible handlers call before
    doing any work.
    """
    # NOTE(review): this class's header line is missing from the reviewed
    # listing; the name QueueRequestHandler is a reconstruction - confirm it
    # matches the base class named by the handler classes.

    def checkSecret(self):
        """Return whether the request's 'secret' argument equals config.requestSecret.

        On a mismatch, writes {"error": "bad_secret"} as the response body before
        returning False, so callers only need to return.
        """
        secretMatches = self.get_argument("secret") == config.requestSecret
        if not secretMatches:
            self.write(tornado.escape.json_encode({"error": "bad_secret"}))
        return secretMatches
191
class ClientModelCreate(QueueRequestHandler):
    """HTTP handler for clients creating a model request (before confirmation)."""

    def post(self):
        """Accept a model request from the web daemon and ask its owner to confirm it.

        The "task_json" argument is turned into a model via modelManager (which
        performs parameter verification), given a new task id and parked in the
        confirmation map under a fresh code. A confirmation e-mail is sent to the
        task's address, the shelve is synced, and the response echoes the task
        dict and the code.
        """
        if not self.checkSecret():
            return

        requestDict = tornado.escape.json_decode(self.get_argument("task_json"))
        task = modelManager.getModelFromTaskDict(requestDict)
        task.taskId = glb.newTaskId()
        code = glb.confirmationMap.putRequest(task)

        recipient = task.emailAddress
        logging.info("Generated a request for %s, confirmation %s required", recipient, code)
        confirmationEmail = Email(
            recipient,
            config.confirmEmailSubject.generate(task=task),
            config.confirmEmailTemplate.generate(
                code=code, task=task, expireDelta=config.confirmTimeout))
        npsgd.email_manager.backgroundEmailSend(confirmationEmail)
        glb.syncShelve()

        payload = {"task": task.asDict(), "code": code}
        self.write(tornado.escape.json_encode({"response": payload}))
225
227 """Request handler for the web daemon to check if workers are available.
228
229 We keep track of the last time workers checked into the queue in order
230 to ensure that all requests can be processed.
231 """
232
234 if not self.checkSecret():
235 return
236
237 td = datetime.now() - glb.lastWorkerCheckin
238 hasWorkers = (td.seconds + td.days * 24 * 3600) < config.keepAliveTimeout
239
240 self.write(tornado.escape.json_encode({
241 "response": {
242 "has_workers" : hasWorkers
243 }
244 }))
245
246
# NOTE(review): no visible code in this listing reads or writes this set;
# presumably it records confirmation codes that were already used - confirm
# against the (missing) ClientConfirm.get body.
previouslyConfirmed = set()
249 """HTTP handler for clients confirming a model request.
250
251 This handler moves requests from the confirmation map to the general
252 request queue for processing.
253 """
254 - def get(self, code):
279
280
282 """HTTP handler for workers checking into the queue."""
283
290
class WorkerTaskKeepAlive(QueueRequestHandler):
    """HTTP handler for workers pinging the queue while working on a task.

    Having this request makes sure that we don't time out any jobs that
    are currently being handled by some worker. If a worker goes down,
    we will put the job back into the queue because this request won't have
    been made.
    """

    def get(self, taskIdString):
        """Report to the task queue that task ``taskIdString`` is still being worked on.

        Responds with {} on success, or {"error": {"type": "bad_id"}} when the task
        queue raises TaskQueueException for that id.
        """
        if not self.checkSecret():
            return
        glb.touchWorkerCheckin()
        taskId = int(taskIdString)
        logging.info("Got heartbeat for task id '%s'", taskId)
        try:
            glb.taskQueue.touchProcessingTaskById(taskId)
        except TaskQueueException:
            logging.info("Bad keep alive request: no such task id '%s' exists", taskId)
            self.write(tornado.escape.json_encode({
                "error": {"type": "bad_id"}
            }))
            # Without this return the "{}" below was appended, yielding "{...}{}".
            return

        self.write("{}")
314
class WorkerSucceededTask(QueueRequestHandler):
    """HTTP handler for workers telling the queue that they have succeeded processing.

    Once this request is handled the job is complete: it is removed from the
    processing list and the queue no longer tracks it.
    """

    def get(self, taskIdString):
        """Pull task ``taskIdString`` out of processing and sync the shelve.

        Responds {"status": "okay"} on success, or {"error": {"type": "bad_id"}}
        when the task queue raises TaskQueueException for that id.
        """
        if not self.checkSecret():
            return
        glb.touchWorkerCheckin()

        taskId = int(taskIdString)
        try:
            glb.taskQueue.pullProcessingTaskById(taskId)
        except TaskQueueException:
            logging.info("Bad succeed request: no task id exists")
            self.write(tornado.escape.json_encode({"error": {"type": "bad_id"}}))
        else:
            glb.syncShelve()
            self.write(tornado.escape.json_encode({"status": "okay"}))
340
class WorkerHasTask(QueueRequestHandler):
    """HTTP handler for workers ensuring that a job still exists.

    This handler helps eliminate certain race conditions in NPSGD. Before a
    worker sends an e-mail with job results, it checks back with the queue to
    make sure that the job hasn't already been handled by another worker
    (this could happen if the queue declares that the first worker had timed out).
    If there is no task with that id still in the processing list then
    an e-mail being sent out would be a duplicate.
    """

    def get(self, taskIdString):
        """Respond {"response": "yes"} if task ``taskIdString`` is still processing, else "no"."""
        if not self.checkSecret():
            return

        glb.touchWorkerCheckin()
        taskId = int(taskIdString)
        logging.info("Got 'has task' request for task of id '%d'", taskId)
        stillProcessing = glb.taskQueue.hasProcessingTaskById(taskId)
        self.write(tornado.escape.json_encode({
            "response": "yes" if stillProcessing else "no"
        }))
367
class WorkerFailedTask(QueueRequestHandler):
    """HTTP handler for workers reporting failure to complete a job.

    Upon failure, we will either recycle the request into the queue or we will
    report a failure (with an e-mail message to the user).
    """

    def get(self, taskIdString):
        """Record a worker-reported failure of task ``taskIdString``.

        The task is pulled out of processing and its failure count bumped. Once
        the count reaches config.maxJobFailures, its failure e-mail is sent;
        otherwise it is put back on the queue. The shelve is then synced.

        Responds with {"status": "okay"}, or {"error": {"type": "bad_id"}} when
        the task queue raises TaskQueueException for that id.
        """
        if not self.checkSecret():
            return

        glb.touchWorkerCheckin()
        taskId = int(taskIdString)
        try:
            task = glb.taskQueue.pullProcessingTaskById(taskId)
        except TaskQueueException:
            logging.info("Bad failed request: no such task id exists, ignoring request")
            self.write(tornado.escape.json_encode({
                "error": {"type": "bad_id"}
            }))
            return

        task.failureCount += 1
        logging.warning("Worker had a failure while processing task '%s' (failure #%d)",
                        task.taskId, task.failureCount)

        if task.failureCount >= config.maxJobFailures:
            logging.warning("Max job failures found, sending failure email")
            npsgd.email_manager.backgroundEmailSend(task.failureEmail())
        else:
            logging.warning("Returning task to queue for another attempt")
            glb.taskQueue.putTask(task)

        # Persist the change (task dropped or requeued, failure count bumped),
        # as WorkerSucceededTask does after its change.
        glb.syncShelve()
        self.write(tornado.escape.json_encode({
            "status": "okay"
        }))
404
405
407 """HTTP handler for workers grabbings tasks off the queue."""
409 if not self.checkSecret():
410 return
411
412 modelVersions = tornado.escape.json_decode(self.get_argument("model_versions_json"))
413
414 glb.touchWorkerCheckin()
415 logging.info("Received worker task request with models %s", modelVersions)
416 if glb.taskQueue.isEmpty():
417 self.write(tornado.escape.json_encode({
418 "status": "empty_queue"
419 }))
420 else:
421 task = glb.taskQueue.pullNextVersioned(modelVersions)
422 if task == None:
423 logging.info("Found no models in queue matching worker's supported versions")
424 self.write(tornado.escape.json_encode({
425 "status": "no_version"
426 }))
427 else:
428 glb.taskQueue.putProcessingTask(task)
429 self.write(tornado.escape.json_encode({
430 "task": task.asDict()
431 }))
432
def main():
    """Run the queue server until its IOLoop stops.

    Startup sequence:
    - parse command-line options;
    - load config, logging and models;
    - open (or recreate) the queue shelve and build the global queue state;
    - start the task-expiry thread.

    The HTTP API is then served on the configured port. The shelve is closed
    on the way out.
    """
    global glb
    parser = OptionParser()
    parser.add_option("-c", "--config", dest="config",
            help="Config file", default="config.cfg")
    # type="int": optparse otherwise returns a string for "-p N", which breaks
    # the "%d" formatting below (TypeError) and is not the default's type.
    parser.add_option("-p", "--port", dest="port", type="int",
            help="Queue port number", default=9000)
    parser.add_option('-l', '--log-filename', dest='log',
            help="Log filename (use '-' for stderr)", default="-")

    options, _ = parser.parse_args()

    config.loadConfig(options.config)
    config.setupLogging(options.log)
    model_manager.setupModels()
    model_manager.startScannerThread()

    try:
        queueShelve = shelve.open(config.queueFile)
    except anydbm.error:
        # An unreadable db cannot be opened; discard it and start from an empty queue.
        logging.warning("Queue file '%s' is corrupt, removing and starting afresh", config.queueFile)
        os.remove(config.queueFile)
        queueShelve = shelve.open(config.queueFile)

    try:
        glb = QueueGlobals(queueShelve)
        # The expiry thread uses glb, so it is started only once glb is assigned.
        # Previously it was created but never started, so stalled tasks never expired.
        glb.expireWorkerTaskThread.start()
        queueHTTP = tornado.httpserver.HTTPServer(tornado.web.Application([
            (r"/worker_info", WorkerInfo),
            (r"/client_model_create", ClientModelCreate),
            (r"/client_queue_has_workers", ClientQueueHasWorkers),
            (r"/client_confirm/(\w+)", ClientConfirm),
            (r"/worker_failed_task/(\d+)", WorkerFailedTask),
            (r"/worker_succeed_task/(\d+)", WorkerSucceededTask),
            (r"/worker_has_task/(\d+)", WorkerHasTask),
            (r"/worker_keep_alive_task/(\d+)", WorkerTaskKeepAlive),
            (r"/worker_work_task", WorkerTaskRequest)
        ]))
        queueHTTP.listen(options.port)
        logging.info("NPSGD Queue Booted up, serving on port %d", options.port)
        sys.stderr.write("NPSGD queue server listening on %d\n" % options.port)
        tornado.ioloop.IOLoop.instance().start()
    finally:
        queueShelve.close()
476
477
if __name__ == "__main__":
    # Start the queue server only when this module is executed as a script.
    main()
480