|
@@ -35,6 +35,7 @@ class Peerfinder(object):
|
|
|
# avoid overloading the targets of the jobs). For now, we have a
|
|
|
# global semaphore for all measurements, but we might change it to
|
|
|
# a semaphore for each measurement.
|
|
|
+ self.parallel_workers = parallel_workers
|
|
|
self.workers_semaphore = asyncio.Semaphore(parallel_workers)
|
|
|
|
|
|
def send_pong(self, writer):
|
|
@@ -51,6 +52,16 @@ class Peerfinder(object):
|
|
|
msg.target.target.family = target.target.family
|
|
|
writer.write(msg.SerializeToString())
|
|
|
|
|
|
+ def status(self):
|
|
|
+ busy_workers = len([None for w in self.workers if not w.queue.empty()])
|
|
|
+ print("Currently having {} workers, {} have queued work.".format(
|
|
|
+ len(self.workers),
|
|
|
+ busy_workers))
|
|
|
+ print("Queue lengths: {}".format([w.queue.qsize() for w in self.workers]))
|
|
|
+ print("Semaphore status: waiting for {}/{} workers.".format(
|
|
|
+ self.parallel_workers - self.workers_semaphore._value,
|
|
|
+ self.parallel_workers))
|
|
|
+
|
|
|
@asyncio.coroutine
|
|
|
def generate_measurements(self):
|
|
|
"""Generate dummy measurement at regular intervals, for debug"""
|
|
@@ -59,10 +70,11 @@ class Peerfinder(object):
|
|
|
target.target.address = "2001:db8::1"
|
|
|
target.target.family = pf.IPAddress.IPV6
|
|
|
while True:
|
|
|
+ self.status()
|
|
|
print("Sending measurement")
|
|
|
for worker in list(self.workers):
|
|
|
worker.queue.put_nowait(target)
|
|
|
- yield from asyncio.sleep(5 * random.random())
|
|
|
+ yield from asyncio.sleep(10 * random.random())
|
|
|
|
|
|
@asyncio.coroutine
|
|
|
def handle_commands(self, reader, writer):
|
|
@@ -92,6 +104,7 @@ class Peerfinder(object):
|
|
|
target.target_id = target_id
|
|
|
target.target.address = str(addr)
|
|
|
target.target.family = family[addr.version]
|
|
|
+ self.status()
|
|
|
print("Queueing measurement for all workers...")
|
|
|
for worker in list(self.workers):
|
|
|
worker.queue.put_nowait(target)
|