Module npsgd_worker

Source Code for Module npsgd_worker

#!/usr/bin/python
# Author: Thomas Dimson [tdimson@gmail.com]
# Date:   January 2011
# For distribution details, see LICENSE
"""Worker process for NPSGD.

This script will periodically poll the queue server specified in the config
file to see if it has any jobs available. When it finds one, it will
take it off the queue and begin processing, then communicate success back
to the user (via e-mail) or failure back to the queue.
"""
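# Typical invocation, using the OptionParser defaults from main() below
# (adjust the config path and log filename for your deployment):
#
#   python npsgd_worker.py --config config.cfg --log-filename worker.log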
import os
import sys
import time
import json
import logging
import urllib2, urllib
from threading import Thread, Event
from optparse import OptionParser

from npsgd import model_manager
from npsgd.config import config
from npsgd.model_task import ModelTask
from npsgd.model_manager import modelManager
import npsgd.email_manager

class TaskKeepAliveThread(Thread):
    """Model keep alive thread.

    Periodically sends a keepalive (heartbeat) to the server while we are working
    on a model task so that it doesn't expire our task id.
    """

    def __init__(self, keepAliveRequest, taskId):
        Thread.__init__(self)
        self.done = Event()
        self.keepAliveRequest = keepAliveRequest
        self.taskId = taskId
        self.daemon = True

    def run(self):
        fails = 0
        while True:
            self.done.wait(config.keepAliveInterval)
            if self.done.isSet():
                break

            try:
                logging.info("Making heartbeat request '%s'", self.keepAliveRequest)
                response = urllib2.urlopen("%s/%s?secret=%s" % (self.keepAliveRequest, self.taskId, config.requestSecret))
            except urllib2.URLError, e:
                logging.error("Heartbeat failed to make connection to %s", self.keepAliveRequest)
                fails += 1

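# A usage sketch for TaskKeepAliveThread (this mirrors NPSGDWorker.processTask
# below): start the heartbeat before entering a model's potentially long run()
# method, and always set the `done` event afterwards so the thread exits.
#
#   keepAlive = TaskKeepAliveThread(worker.taskKeepAliveRequest, taskObject.taskId)
#   keepAlive.start()
#   try:
#       resultsEmail = taskObject.run()
#   finally:
#       keepAlive.done.set()
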
class NPSGDWorker(object):
    """Worker class for executing models and sending out result emails.

    This enters a polling loop where the worker will poll the queue for tasks
    at a fixed interval. When it finds a task, it will decode it into a model,
    then process it using the model's "run" method.
    """

    def __init__(self, serverAddress, serverPort):
        self.baseRequest          = "http://%s:%s" % (serverAddress, serverPort)
        self.infoRequest          = "%s/worker_info" % self.baseRequest
        self.taskRequest          = "%s/worker_work_task" % self.baseRequest
        self.failedTaskRequest    = "%s/worker_failed_task" % self.baseRequest
        self.hasTaskRequest       = "%s/worker_has_task" % self.baseRequest
        self.succeedTaskRequest   = "%s/worker_succeed_task" % self.baseRequest
        self.taskKeepAliveRequest = "%s/worker_keep_alive_task" % self.baseRequest
        self.requestTimeout       = 100
        self.supportedModels      = ["test"]
        self.requestErrors        = 0
        self.maxErrors            = 3
        self.errorSleepTime       = 10
        self.requestSleepTime     = 10

    def getServerInfo(self):
        try:
            response = urllib2.urlopen("%s?secret=%s" % (self.infoRequest, config.requestSecret))
        except urllib2.URLError, e:
            logging.error("Failed to make initial connection to %s", self.baseRequest)
            return

        logging.info("Got initial response from server")

    def loop(self):
        """Main IO loop."""
        logging.info("Entering event loop")
        while True:
            try:
                self.handleEvents()
            except Exception:
                logging.exception("Unhandled exception in event loop!")

    def handleEvents(self):
        """Workhorse method of actually making requests to the queue for tasks."""
        try:
            logging.info("Polling %s for tasks" % self.taskRequest)
            #response = urllib2.urlopen("%s?secret=%s" % (self.taskRequest, config.requestSecret))
            response = urllib2.urlopen(self.taskRequest, data=urllib.urlencode({
                "secret": config.requestSecret,
                "model_versions_json": json.dumps(modelManager.modelVersions())
            }))
        except urllib2.URLError, e:
            self.requestErrors += 1
            logging.error("Error making worker request to server, attempt #%d", self.requestErrors)
            time.sleep(self.errorSleepTime)
            return

        try:
            decodedResponse = json.load(response)
        except ValueError, e:
            self.requestErrors += 1
            logging.error("Error decoding server response, attempt #%d", self.requestErrors)
            time.sleep(self.errorSleepTime)
            return

        self.requestErrors = 0
        self.processResponse(decodedResponse)
        time.sleep(self.requestSleepTime)

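    # The exchange in handleEvents/processResponse looks roughly like this
    # (the exact payloads are defined by the queue server; the shape of
    # model_versions_json below is an assumption):
    #
    #   POST fields:  secret=<config.requestSecret>
    #                 model_versions_json=<JSON of modelManager.modelVersions()>
    #
    #   Responses:    {"status": "empty_queue"}
    #                 {"status": "no_version"}
    #                 {"task": {"taskId": ..., "modelName": ..., "modelVersion": ..., ...}}
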
    def processResponse(self, response):
        if "status" in response:
            if response["status"] == "empty_queue":
                logging.info("No tasks available on server")
            elif response["status"] == "no_version":
                logging.info("Queue lacks any tasks with our model versions")
        elif "task" in response:
            self.processTask(response["task"])

    def notifyFailedTask(self, taskId):
        try:
            logging.info("Notifying server of failed task with id %s", taskId)
            response = urllib2.urlopen("%s/%s?secret=%s" % (self.failedTaskRequest, taskId, config.requestSecret))
        except urllib2.URLError, e:
            logging.error("Failed to communicate failed task to server %s", self.baseRequest)

    def notifySucceedTask(self, taskId):
        try:
            logging.info("Notifying server of succeeded task with id %s", taskId)
            response = urllib2.urlopen("%s/%s?secret=%s" % (self.succeedTaskRequest, taskId, config.requestSecret))
        except urllib2.URLError, e:
            logging.error("Failed to communicate succeeded task to server %s", self.baseRequest)

    def serverHasTask(self, taskId):
        """Method for ensuring that the queue still recognizes our task id.

        If the queue has expired the task for some reason (e.g. a timeout)
        this method will return False. Otherwise, it means we can proceed.
        """
        try:
            logging.info("Making has task request for %s", taskId)
            response = urllib2.urlopen("%s/%s?secret=%s" % (self.hasTaskRequest, taskId, config.requestSecret))
        except urllib2.URLError, e:
            logging.error("Failed to make has task request to server %s", self.baseRequest)
            raise RuntimeError(e)

        try:
            decodedResponse = json.load(response)
        except ValueError, e:
            logging.error("Bad response from server for has task")
            raise RuntimeError(e)

        if "response" in decodedResponse and decodedResponse["response"] in ["yes", "no"]:
            return decodedResponse["response"] == "yes"
        else:
            logging.error("Malformed response from server")
            raise RuntimeError("Malformed response from server for 'has task'")

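    # serverHasTask expects a response of the form {"response": "yes"} or
    # {"response": "no"}; anything else is treated as malformed and raises.
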
    def processTask(self, taskDict):
        """Handle creation and running of a model and set up a heartbeat thread.

        This is the heart of a worker. When we find a model on the queue, this
        method takes the request and decodes it into something that can be processed.
        It will then spawn a heartbeat thread that continues to check into the server
        while we actually enter the model's "run" method. From there, it is all up
        to the model to handle.
        """
        taskId = None
        if "taskId" in taskDict:
            taskId = taskDict["taskId"]

        try:
            try:
                model = modelManager.getModel(taskDict["modelName"], taskDict["modelVersion"])
                logging.info("Creating a model task for '%s'", taskDict["modelName"])
                taskObject = model.fromDict(taskDict)
            except KeyError, e:
                logging.warning("Was unable to deserialize model task (%s), model task: %s", e, taskDict)
                if taskId:
                    self.notifyFailedTask(taskId)
                return

            keepAliveThread = TaskKeepAliveThread(self.taskKeepAliveRequest, taskObject.taskId)
            keepAliveThread.start()
            try:
                resultsEmail = taskObject.run()
                logging.info("Model finished running, sending email")
                if self.serverHasTask(taskObject.taskId):
                    npsgd.email_manager.blockingEmailSend(resultsEmail)
                    logging.info("Email sent, model is 100% complete!")
                    self.notifySucceedTask(taskObject.taskId)
                else:
                    logging.warning("Skipping task completion since the server forgot about our task")

            except RuntimeError, e:
                logging.error("Some kind of error during processing model task, notifying server of failure")
                logging.exception(e)
                self.notifyFailedTask(taskObject.taskId)

            finally:
                keepAliveThread.done.set()

        except: #If all else fails, notify the server that we are going down
            if taskId:
                self.notifyFailedTask(taskId)
            raise

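    # A minimal task dict accepted by processTask looks roughly like the sketch
    # below. "taskId", "modelName" and "modelVersion" are read directly above;
    # everything else is model-specific and consumed by model.fromDict(), so the
    # extra field here is purely hypothetical:
    #
    #   {
    #       "taskId": 42,
    #       "modelName": "example_model",
    #       "modelVersion": "1.0",
    #       "someModelParameter": "..."
    #   }
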
def main():
    parser = OptionParser()
    parser.add_option('-c', '--config', dest="config",
                        help="Configuration file path", type="string", default="config.cfg")

    parser.add_option('-l', '--log-filename', dest='log',
                        help="Log filename (use '-' for stderr)", default="-")

    (options, args) = parser.parse_args()

    config.loadConfig(options.config)
    config.setupLogging(options.log)
    model_manager.setupModels()
    model_manager.startScannerThread()

    worker = NPSGDWorker(config.queueServerAddress, config.queueServerPort)
    logging.info("NPSGD Worker booted up, going into event loop")
    worker.getServerInfo()
    worker.loop()

if __name__ == "__main__":
    main()