|
@@ -16,6 +16,9 @@ WORKER_BIND_PORT = 9999
|
|
|
COMMAND_BIND_ADDRESS = "127.0.0.1"
|
|
|
COMMAND_BIND_PORT = 9998
|
|
|
|
|
|
+# How long (in seconds) to wait for a worker before disconnecting it
|
|
|
+WORKER_TIMEOUT = 15
|
|
|
+
|
|
|
|
|
|
class Worker(object):
|
|
|
|
|
@@ -118,7 +121,7 @@ class Peerfinder(object):
|
|
|
# Wait for a new target
|
|
|
target = yield from worker.queue.get()
|
|
|
if reader.at_eof():
|
|
|
- print("Exiting worker handler")
|
|
|
+ print("Worker disconnected, exiting")
|
|
|
self.workers.remove(worker)
|
|
|
return
|
|
|
# Use a semaphore to avoid overloading targets
|
|
@@ -126,11 +129,16 @@ class Peerfinder(object):
|
|
|
self.send_target(writer, target)
|
|
|
try:
|
|
|
yield from writer.drain()
|
|
|
- # TODO: timeout
|
|
|
- data = yield from reader.read(1024)
|
|
|
+ data = yield from asyncio.wait_for(reader.read(1024),
|
|
|
+ WORKER_TIMEOUT)
|
|
|
except ConnectionResetError:
|
|
|
- print("Exiting worker handler")
|
|
|
+ print("Worker disconnected, exiting")
|
|
|
+ self.workers.remove(worker)
|
|
|
+ return
|
|
|
+ except asyncio.TimeoutError:
|
|
|
+ print("Worker timeout, exiting")
|
|
|
self.workers.remove(worker)
|
|
|
+ writer.close()
|
|
|
return
|
|
|
msg = pf.Message()
|
|
|
answer = pf.Message()
|