1
2
3
4
"""Worker process for NPSGD.

This script periodically polls the queue server specified in the config
file to see whether it has any jobs available. When it finds one, it takes
the job off the queue and processes it, then reports success to the user
(via e-mail) or failure back to the queue.
"""
import json
import logging
import os
import socket
import sys
import time
import urllib
import urllib2
from optparse import OptionParser
from threading import Thread, Event

from npsgd import model_manager
from npsgd.config import config
from npsgd.model_task import ModelTask
from npsgd.model_manager import modelManager
import npsgd.email_manager
26
class TaskKeepAliveThread(Thread):
    """Model keep alive thread.

    Periodically sends a keepalive (heartbeat) to the server while we are working
    on a model task so that it doesn't expire our task id. The owner stops the
    thread by setting ``done``.
    """

    def __init__(self, keepAliveRequest, taskId):
        """Create a (not yet started) daemon heartbeat thread.

        Args:
            keepAliveRequest: Base keep-alive URL; each heartbeat requests
                ``<keepAliveRequest>/<taskId>?secret=<config.requestSecret>``.
            taskId: Id of the task whose lease is being kept alive.
        """
        Thread.__init__(self)
        self.done = Event()  # set by the owner to stop the heartbeat loop
        self.keepAliveRequest = keepAliveRequest
        self.taskId = taskId
        # Daemon thread: it never keeps the worker process alive on its own.
        self.daemon = True

    def run(self):
        """Send a heartbeat every ``config.keepAliveInterval`` seconds until ``done`` is set.

        Failures are logged together with the number of consecutive failures,
        but they never stop the loop, so one bad request does not end the
        heartbeats for the rest of the task.
        """
        fails = 0  # consecutive failed heartbeats; reset after a success
        while True:
            # wait() returns early as soon as ``done`` is set.
            self.done.wait(config.keepAliveInterval)
            if self.done.is_set():
                break

            url = "%s/%s?%s" % (self.keepAliveRequest, self.taskId,
                                urllib.urlencode({"secret": config.requestSecret}))
            logging.info("Making heartbeat request '%s'", self.keepAliveRequest)
            try:
                # Close the response immediately so each heartbeat releases its socket.
                urllib2.urlopen(url).close()
            except urllib2.URLError as e:
                fails += 1
                logging.error("Heartbeat failed to make connection to %s (%s), %d consecutive failure(s)",
                              self.keepAliveRequest, e, fails)
            except Exception:
                # Last-resort boundary for this thread: an unexpected error
                # (presumably e.g. a malformed HTTP reply) must not kill the heartbeat.
                fails += 1
                logging.exception("Unexpected error during heartbeat to %s, %d consecutive failure(s)",
                                  self.keepAliveRequest, fails)
            else:
                fails = 0
class NPSGDWorker(object):
    """Worker class for executing models and sending out result emails.

    This enters a polling loop where the worker will poll the queue for tasks
    at a fixed interval. When it finds a task, it will decode it into a model,
    then process it using the model's "run" method.
    """

    def __init__(self, serverAddress, serverPort):
        """Build the queue-server endpoint URLs for ``serverAddress:serverPort``."""
        self.baseRequest = "http://%s:%s" % (serverAddress, serverPort)
        self.infoRequest = "%s/worker_info" % self.baseRequest
        self.taskRequest = "%s/worker_work_task" % self.baseRequest
        self.failedTaskRequest = "%s/worker_failed_task" % self.baseRequest
        self.hasTaskRequest = "%s/worker_has_task" % self.baseRequest
        self.succeedTaskRequest = "%s/worker_succeed_task" % self.baseRequest
        self.taskKeepAliveRequest = "%s/worker_keep_alive_task" % self.baseRequest
        self.requestTimeout = 100     # seconds; passed to every urlopen call in this class
        self.supportedModels = ["test"]
        self.requestErrors = 0        # consecutive failed polls; reset after a good response
        self.maxErrors = 3            # NOTE(review): not consulted anywhere in this class
        self.errorSleepTime = 10      # seconds to back off after a failed poll
        self.requestSleepTime = 10    # seconds to wait between successful polls

    def _openTaskUrl(self, requestBase, taskId):
        """Open ``<requestBase>/<taskId>?secret=...`` using the worker's request timeout.

        The secret is URL-encoded so characters such as '&' or spaces cannot
        corrupt the query string. The caller must close the returned response.
        """
        url = "%s/%s?%s" % (requestBase, taskId,
                            urllib.urlencode({"secret": config.requestSecret}))
        return urllib2.urlopen(url, timeout=self.requestTimeout)

    def getServerInfo(self):
        """Make an initial request to the server's info endpoint; failures are only logged."""
        url = "%s?%s" % (self.infoRequest, urllib.urlencode({"secret": config.requestSecret}))
        try:
            response = urllib2.urlopen(url, timeout=self.requestTimeout)
        except (urllib2.URLError, socket.timeout):
            logging.error("Failed to make initial connection to %s", self.baseRequest)
            return

        response.close()
        logging.info("Got initial response from server")

    def loop(self):
        """Main IO loop."""
        logging.info("Entering event loop")
        while True:
            try:
                self.handleEvents()
            except Exception:
                logging.exception("Unhandled exception in event loop!")
                # Back off so an error that recurs immediately cannot busy-spin and flood the log.
                time.sleep(self.errorSleepTime)

    def _backOffAfterError(self, message):
        """Count a failed poll, log ``message`` (one %d for the attempt number), then sleep."""
        self.requestErrors += 1
        logging.error(message, self.requestErrors)
        time.sleep(self.errorSleepTime)

    def handleEvents(self):
        """Workhorse method of actually making requests to the queue for tasks.

        Polls once. Transport or decoding failures increment ``requestErrors``
        and back off ``errorSleepTime`` seconds. Otherwise the response is
        handed to ``processResponse`` and the worker sleeps ``requestSleepTime``.
        """
        logging.info("Polling %s for tasks", self.taskRequest)
        postData = urllib.urlencode({
            "secret": config.requestSecret,
            "model_versions_json": json.dumps(modelManager.modelVersions())
        })

        try:
            response = urllib2.urlopen(self.taskRequest, data=postData,
                                       timeout=self.requestTimeout)
        except (urllib2.URLError, socket.timeout):
            self._backOffAfterError("Error making worker request to server, attempt #%d")
            return

        try:
            decodedResponse = json.load(response)
        except ValueError:
            self._backOffAfterError("Error decoding server response, attempt #%d")
            return
        finally:
            response.close()

        self.requestErrors = 0
        self.processResponse(decodedResponse)
        time.sleep(self.requestSleepTime)

    def processResponse(self, response):
        """Dispatch a decoded poll reply: log status replies, run ``task`` replies."""
        if "status" in response:
            status = response["status"]
            if status == "empty_queue":
                logging.info("No tasks available on server")
            elif status == "no_version":
                logging.info("Queue lacks any tasks with our model versions")
            else:
                logging.warning("Unknown status '%s' from server", status)
        elif "task" in response:
            self.processTask(response["task"])
        else:
            logging.warning("Unrecognized response from server: %s", response)

    def notifyFailedTask(self, taskId):
        """Tell the server that ``taskId`` failed; communication errors are only logged."""
        logging.info("Notifying server of failed task with id %s", taskId)
        try:
            self._openTaskUrl(self.failedTaskRequest, taskId).close()
        except (urllib2.URLError, socket.timeout):
            logging.error("Failed to communicate failed task to server %s", self.baseRequest)

    def notifySucceedTask(self, taskId):
        """Tell the server that ``taskId`` succeeded; communication errors are only logged."""
        logging.info("Notifying server of succeeded task with id %s", taskId)
        try:
            self._openTaskUrl(self.succeedTaskRequest, taskId).close()
        except (urllib2.URLError, socket.timeout):
            logging.error("Failed to communicate succeeded task to server %s", self.baseRequest)

    def serverHasTask(self, taskId):
        """Method for ensuring that the queue still recognizes our task id.

        If the queue has expired the task for some reason (i.e. a timeout)
        this method will return false. Otherwise, it means we can proceed.

        Returns:
            True if the server replies {"response": "yes"}, False for "no".

        Raises:
            RuntimeError: on connection failure or timeout, undecodable JSON,
                or any other reply shape.
        """
        logging.info("Making has task request for %s", taskId)
        try:
            response = self._openTaskUrl(self.hasTaskRequest, taskId)
        except (urllib2.URLError, socket.timeout) as e:
            logging.error("Failed to make has task request to server %s", self.baseRequest)
            raise RuntimeError(e)

        try:
            decodedResponse = json.load(response)
        except ValueError as e:
            logging.error("Bad response from server for has task")
            raise RuntimeError(e)
        finally:
            response.close()

        # Guard against non-object JSON (a list or string) before indexing by key.
        answer = decodedResponse.get("response") if isinstance(decodedResponse, dict) else None
        if answer in ("yes", "no"):
            return answer == "yes"

        logging.error("Malformed response from server")
        raise RuntimeError("Malformed response from server for 'has task'")

    def processTask(self, taskDict):
        """Handle creation and running of a model and setup heartbeat thread.

        This is the heart of a worker. When we find a model on the queue, this
        method takes the request and decodes it into something that can be processed.
        It will then spawn a heartbeat thread that continues to check into the server
        while we actually enter the models "run" method. From there, it is all up
        to the model to handle.

        Any exception other than RuntimeError (including KeyboardInterrupt) is
        reported to the server as a failure and then re-raised.
        """
        taskId = taskDict.get("taskId")

        try:
            try:
                model = modelManager.getModel(taskDict["modelName"], taskDict["modelVersion"])
                logging.info("Creating a model task for '%s'", taskDict["modelName"])
                taskObject = model.fromDict(taskDict)
            except KeyError as e:
                logging.warning("Was unable to deserialize model task (%s), model task: %s", e, taskDict)
                if taskId is not None:
                    self.notifyFailedTask(taskId)
                return

            keepAliveThread = TaskKeepAliveThread(self.taskKeepAliveRequest, taskObject.taskId)
            keepAliveThread.start()
            try:
                resultsEmail = taskObject.run()
                logging.info("Model finished running, sending email")
                # Only deliver results if the queue has not expired our task meanwhile.
                if self.serverHasTask(taskObject.taskId):
                    npsgd.email_manager.blockingEmailSend(resultsEmail)
                    logging.info("Email sent, model is 100% complete!")
                    self.notifySucceedTask(taskObject.taskId)
                else:
                    logging.warning("Skipping task completion since the server forgot about our task")
            except RuntimeError:
                logging.exception("Some kind of error during processing model task, notifying server of failure")
                self.notifyFailedTask(taskObject.taskId)
            finally:
                keepAliveThread.done.set()

        except BaseException:
            # Deliberately broad (matches the original bare except): even an
            # interrupt reports the task as failed before propagating.
            if taskId is not None:
                self.notifyFailedTask(taskId)
            raise
def main():
    """Command-line entry point: load config, logging and models, then poll the queue forever."""
    optionSpecs = [
        (('-c', '--config'),
         dict(dest="config", help="Configuration file path", type="string", default="config.cfg")),
        (('-l', '--log-filename'),
         dict(dest='log', help="Log filename (use '-' for stderr)", default="-")),
    ]
    parser = OptionParser()
    for flags, settings in optionSpecs:
        parser.add_option(*flags, **settings)
    options = parser.parse_args()[0]

    # Configuration and logging are set up before any models are loaded.
    config.loadConfig(options.config)
    config.setupLogging(options.log)
    model_manager.setupModels()
    model_manager.startScannerThread()

    worker = NPSGDWorker(config.queueServerAddress, config.queueServerPort)
    logging.info("NPSGD Worker booted up, going into event loop")
    worker.getServerInfo()
    worker.loop()


if __name__ == "__main__":
    main()