123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- #!/usr/bin/env python
- import asyncio
- import random
- import functools
- import netaddr
- from netaddr import IPAddress
- import peerfinder_pb2 as pf
- WORKER_BIND_ADDRESS = "::"
- WORKER_BIND_PORT = 9999
- COMMAND_BIND_ADDRESS = "127.0.0.1"
- COMMAND_BIND_PORT = 9998
- class Peerfinder(object):
- def __init__(self, loop):
- self.loop = loop
- self.clients = []
- def send_pong(self, writer):
- msg = pf.Message()
- msg.type = pf.Message.Pong
- writer.write(msg.SerializeToString())
- yield from writer.drain()
- def send_target(self, writer, target):
- msg = pf.Message()
- msg.type = pf.Message.Target
- #msg.target = target.SerializeToString()
- msg.target.target_id = target.target_id
- msg.target.target.address = target.target.address
- msg.target.target.family = target.target.family
- writer.write(msg.SerializeToString())
- #yield from writer.drain()
- def generate_data(self):
- print("Generate_data")
- target = pf.Target()
- target.target_id = 1
- target.target.address = "2001:db8::1"
- target.target.family = pf.IPAddress.IPV6
- while True:
- print("Entering write loop")
- for writer in list(self.clients):
- self.sent_target(writer, target)
- target.target_id += 1
- yield from asyncio.sleep(2)
- def generate_data_cb(self):
- target = pf.Target()
- target.target_id = 1
- target.target.address = "2001:db8::1"
- target.target.family = pf.IPAddress.IPV6
- print("Sending data")
- for writer in list(self.clients):
- # writer.write(target.SerializeToString())
- self.send_target(writer, target)
- self.loop.call_later(random.randint(1, 10), self.generate_data_cb)
-
- def handle_commands(self, reader, writer):
- target = pf.Target()
- target.target_id = 1
- family = {4: pf.IPAddress.IPV4, 6: pf.IPAddress.IPV6}
- while True:
- if reader.at_eof():
- print("Exiting commands handler")
- return
- data = yield from reader.read(1024)
- try:
- data = data.strip().decode()
- addr = IPAddress(data)
- except netaddr.AddrFormatError:
- print("Invalid command, disconnecting client")
- writer.close()
- return
- target.target.address = str(addr)
- target.target.family = family[addr.version]
- target.target_id += 1
- print("Sending data to all workers...")
- for writer in list(self.clients):
- #writer.write(target.SerializeToString())
- self.send_target(writer, target)
- yield from writer.drain()
- def handle_worker(self, reader, writer):
- self.clients.append(writer)
- while True:
- if reader.at_eof():
- print("Exiting worker handler")
- self.clients.remove(writer)
- return
- try:
- data = yield from reader.read(1024)
- except ConnectionResetError:
- print("Exiting worker handler")
- self.clients.remove(writer)
- return
- msg = pf.Message()
- msg.ParseFromString(data)
- print("Receiving {!r}".format(msg))
- print("Currently having {} clients".format(len(self.clients)))
- if msg.type == pf.Message.Ping:
- yield from self.send_pong(writer)
- #@asyncio.coroutine
- def hello(loop):
- print("hello")
- #yield from asyncio.sleep(1)
- loop.call_later(2, hello, loop)
- if __name__ == '__main__':
- loop = asyncio.get_event_loop()
- p = Peerfinder(loop)
- worker_coro = asyncio.start_server(p.handle_worker, WORKER_BIND_ADDRESS, WORKER_BIND_PORT, loop=loop)
- worker_server = loop.run_until_complete(worker_coro)
- command_coro = asyncio.start_server(p.handle_commands, COMMAND_BIND_ADDRESS, COMMAND_BIND_PORT, loop=loop)
- command_server = loop.run_until_complete(command_coro)
- # Generate data (test)
- #loop.call_soon(hello, loop)
- loop.call_soon(p.generate_data_cb)
- #asyncio.async(hello)
- #loop.create_task(hello)
- # Serve requests until CTRL+c is pressed
- print('Serving workers on {}'.format(worker_server.sockets[0].getsockname()))
- print('Listen to commands on {}'.format(command_server.sockets[0].getsockname()))
- try:
- loop.run_forever()
- except KeyboardInterrupt:
- pass
- # Close the server
- command_server.close()
- loop.run_until_complete(command_server.wait_closed())
- worker_server.close()
- loop.run_until_complete(worker_server.wait_closed())
- loop.close()
|