#!/usr/bin/env python
"""Peerfinder server.

Listens on two TCP ports:

* a worker port, where workers connect, may send ``Ping`` messages (answered
  with ``Pong``) and receive ``Target`` messages;
* a command port, where each received chunk is parsed as an IP address and
  broadcast as a ``Target`` message to every connected worker.

A test callback additionally broadcasts a fixed target to all workers at
random intervals of 1 to 10 seconds.
"""

import asyncio
import random
import functools

import netaddr
from netaddr import IPAddress

import peerfinder_pb2 as pf

WORKER_BIND_ADDRESS = "::"
WORKER_BIND_PORT = 9999
COMMAND_BIND_ADDRESS = "127.0.0.1"
COMMAND_BIND_PORT = 9998


class Peerfinder(object):
    """Keep track of connected workers and push targets to them."""

    def __init__(self, loop):
        """Store the event loop and start with no connected workers.

        Args:
            loop: event loop used to schedule the periodic test broadcast.
        """
        self.loop = loop
        # Stream writers of the currently connected workers.
        self.clients = []

    async def send_pong(self, writer):
        """Write a ``Pong`` message to *writer* and wait for it to flush."""
        msg = pf.Message()
        msg.type = pf.Message.Pong
        writer.write(msg.SerializeToString())
        await writer.drain()

    def send_target(self, writer, target):
        """Write *target* wrapped in a ``Target`` message to *writer*.

        The data is only queued on the stream; callers that want to wait
        for the write buffer to flush must drain the writer themselves.
        """
        msg = pf.Message()
        msg.type = pf.Message.Target
        msg.target.target_id = target.target_id
        msg.target.target.address = target.target.address
        msg.target.target.family = target.target.family
        writer.write(msg.SerializeToString())

    async def generate_data(self):
        """Broadcast a test target every 2 seconds, bumping its id each time.

        Coroutine variant of :meth:`generate_data_cb`; it never returns.
        """
        print("Generate_data")
        target = pf.Target()
        target.target_id = 1
        target.target.address = "2001:db8::1"
        target.target.family = pf.IPAddress.IPV6
        while True:
            print("Entering write loop")
            for writer in list(self.clients):
                # The original called the non-existent ``sent_target``.
                self.send_target(writer, target)
            target.target_id += 1
            await asyncio.sleep(2)

    def generate_data_cb(self):
        """Broadcast a fixed test target, then reschedule in 1-10 seconds."""
        target = pf.Target()
        target.target_id = 1
        target.target.address = "2001:db8::1"
        target.target.family = pf.IPAddress.IPV6
        print("Sending data")
        for writer in list(self.clients):
            self.send_target(writer, target)
        self.loop.call_later(random.randint(1, 10), self.generate_data_cb)

    async def handle_commands(self, reader, writer):
        """Serve one command connection.

        Each chunk read from the client is stripped, decoded and parsed as
        an IP address, which is then broadcast to all connected workers
        with an incremented target id.  The connection is closed on EOF or
        on the first chunk that is not a valid address.

        Args:
            reader: stream reader of the command client.
            writer: stream writer of the command client.
        """
        target = pf.Target()
        target.target_id = 1
        family = {4: pf.IPAddress.IPV4, 6: pf.IPAddress.IPV6}
        try:
            while True:
                data = await reader.read(1024)
                if not data:
                    # An empty read means the client closed the connection.
                    print("Exiting commands handler")
                    return
                try:
                    addr = IPAddress(data.strip().decode())
                except (netaddr.AddrFormatError, ValueError):
                    # ValueError covers undecodable bytes
                    # (UnicodeDecodeError is a subclass).
                    # NOTE(review): netaddr presumably also raises
                    # ValueError for input carrying a prefix length.
                    print("Invalid command, disconnecting client")
                    return
                target.target.address = str(addr)
                target.target.family = family[addr.version]
                target.target_id += 1
                print("Sending data to all workers...")
                # Iterate over a copy: the list may change while we await.
                # The loop variable must not be called ``writer``, otherwise
                # it would shadow the command client's writer.
                for worker in list(self.clients):
                    self.send_target(worker, target)
                    try:
                        await worker.drain()
                    except ConnectionError:
                        # One dead worker must not abort the broadcast; its
                        # own handler removes it from ``self.clients``.
                        print("Lost connection to a worker while sending")
        finally:
            writer.close()

    async def handle_worker(self, reader, writer):
        """Serve one worker connection.

        Registers the worker so it receives broadcasts, answers every
        ``Ping`` with a ``Pong``, and unregisters the worker and closes its
        connection when the handler exits for any reason.

        Args:
            reader: stream reader of the worker.
            writer: stream writer of the worker.
        """
        self.clients.append(writer)
        try:
            while True:
                data = await reader.read(1024)
                if not data:
                    # An empty read means the worker closed the connection.
                    break
                # NOTE(review): no framing is implemented here; this assumes
                # every read yields exactly one serialized Message.
                msg = pf.Message()
                msg.ParseFromString(data)
                print("Receiving {!r}".format(msg))
                print("Currently having {} clients".format(len(self.clients)))
                if msg.type == pf.Message.Ping:
                    await self.send_pong(writer)
        except ConnectionResetError:
            # The worker vanished; fall through to the cleanup below.
            pass
        finally:
            print("Exiting worker handler")
            self.clients.remove(writer)
            writer.close()


def hello(loop):
    """Debug helper: print ``hello`` and reschedule itself in 2 seconds."""
    print("hello")
    loop.call_later(2, hello, loop)


def main():
    """Start both servers and run until interrupted with CTRL+c."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    p = Peerfinder(loop)

    # No ``loop=`` argument: it was removed from asyncio.start_server().
    worker_server = loop.run_until_complete(
        asyncio.start_server(p.handle_worker,
                             WORKER_BIND_ADDRESS, WORKER_BIND_PORT))
    command_server = loop.run_until_complete(
        asyncio.start_server(p.handle_commands,
                             COMMAND_BIND_ADDRESS, COMMAND_BIND_PORT))

    # Generate data (test)
    loop.call_soon(p.generate_data_cb)

    # Serve requests until CTRL+c is pressed
    print('Serving workers on {}'.format(
        worker_server.sockets[0].getsockname()))
    print('Listen to commands on {}'.format(
        command_server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Stop accepting new connections.
    command_server.close()
    worker_server.close()

    # Cancel the connection handlers so that they close their streams;
    # otherwise wait_closed() may block on still-open connections.
    pending = asyncio.all_tasks(loop)
    for task in pending:
        task.cancel()
    if pending:
        loop.run_until_complete(
            asyncio.gather(*pending, return_exceptions=True))

    loop.run_until_complete(command_server.wait_closed())
    loop.run_until_complete(worker_server.wait_closed())
    loop.close()


if __name__ == '__main__':
    main()