server.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. #!/usr/bin/env python
  2. import asyncio
  3. import random
  4. import functools
  5. import netaddr
  6. from netaddr import IPAddress
  7. import peerfinder_pb2 as pf
# Address and port of the worker-facing server ("::" is the IPv6 wildcard
# address).
WORKER_BIND_ADDRESS = "::"
WORKER_BIND_PORT = 9999
# Address and port of the command server (IPv4 loopback).
COMMAND_BIND_ADDRESS = "127.0.0.1"
COMMAND_BIND_PORT = 9998
  12. class Peerfinder(object):
  13. def __init__(self, loop):
  14. self.loop = loop
  15. self.clients = []
  16. def send_pong(self, writer):
  17. msg = pf.Message()
  18. msg.type = pf.Message.Pong
  19. writer.write(msg.SerializeToString())
  20. yield from writer.drain()
  21. def send_target(self, writer, target):
  22. msg = pf.Message()
  23. msg.type = pf.Message.Target
  24. #msg.target = target.SerializeToString()
  25. msg.target.target_id = target.target_id
  26. msg.target.target.address = target.target.address
  27. msg.target.target.family = target.target.family
  28. writer.write(msg.SerializeToString())
  29. #yield from writer.drain()
  30. def generate_data(self):
  31. print("Generate_data")
  32. target = pf.Target()
  33. target.target_id = 1
  34. target.target.address = "2001:db8::1"
  35. target.target.family = pf.IPAddress.IPV6
  36. while True:
  37. print("Entering write loop")
  38. for writer in list(self.clients):
  39. self.sent_target(writer, target)
  40. target.target_id += 1
  41. yield from asyncio.sleep(2)
  42. def generate_data_cb(self):
  43. target = pf.Target()
  44. target.target_id = 1
  45. target.target.address = "2001:db8::1"
  46. target.target.family = pf.IPAddress.IPV6
  47. print("Sending data")
  48. for writer in list(self.clients):
  49. # writer.write(target.SerializeToString())
  50. self.send_target(writer, target)
  51. self.loop.call_later(random.randint(1, 10), self.generate_data_cb)
  52. def handle_commands(self, reader, writer):
  53. target = pf.Target()
  54. target.target_id = 1
  55. family = {4: pf.IPAddress.IPV4, 6: pf.IPAddress.IPV6}
  56. while True:
  57. if reader.at_eof():
  58. print("Exiting commands handler")
  59. return
  60. data = yield from reader.read(1024)
  61. try:
  62. data = data.strip().decode()
  63. addr = IPAddress(data)
  64. except netaddr.AddrFormatError:
  65. print("Invalid command, disconnecting client")
  66. writer.close()
  67. return
  68. target.target.address = str(addr)
  69. target.target.family = family[addr.version]
  70. target.target_id += 1
  71. print("Sending data to all workers...")
  72. for writer in list(self.clients):
  73. #writer.write(target.SerializeToString())
  74. self.send_target(writer, target)
  75. yield from writer.drain()
  76. def handle_worker(self, reader, writer):
  77. self.clients.append(writer)
  78. while True:
  79. if reader.at_eof():
  80. print("Exiting worker handler")
  81. self.clients.remove(writer)
  82. return
  83. try:
  84. data = yield from reader.read(1024)
  85. except ConnectionResetError:
  86. print("Exiting worker handler")
  87. self.clients.remove(writer)
  88. return
  89. msg = pf.Message()
  90. msg.ParseFromString(data)
  91. print("Receiving {!r}".format(msg))
  92. print("Currently having {} clients".format(len(self.clients)))
  93. if msg.type == pf.Message.Ping:
  94. yield from self.send_pong(writer)
  95. #@asyncio.coroutine
  96. def hello(loop):
  97. print("hello")
  98. #yield from asyncio.sleep(1)
  99. loop.call_later(2, hello, loop)
  100. if __name__ == '__main__':
  101. loop = asyncio.get_event_loop()
  102. p = Peerfinder(loop)
  103. worker_coro = asyncio.start_server(p.handle_worker, WORKER_BIND_ADDRESS, WORKER_BIND_PORT, loop=loop)
  104. worker_server = loop.run_until_complete(worker_coro)
  105. command_coro = asyncio.start_server(p.handle_commands, COMMAND_BIND_ADDRESS, COMMAND_BIND_PORT, loop=loop)
  106. command_server = loop.run_until_complete(command_coro)
  107. # Generate data (test)
  108. #loop.call_soon(hello, loop)
  109. loop.call_soon(p.generate_data_cb)
  110. #asyncio.async(hello)
  111. #loop.create_task(hello)
  112. # Serve requests until CTRL+c is pressed
  113. print('Serving workers on {}'.format(worker_server.sockets[0].getsockname()))
  114. print('Listen to commands on {}'.format(command_server.sockets[0].getsockname()))
  115. try:
  116. loop.run_forever()
  117. except KeyboardInterrupt:
  118. pass
  119. # Close the server
  120. command_server.close()
  121. loop.run_until_complete(command_server.wait_closed())
  122. worker_server.close()
  123. loop.run_until_complete(worker_server.wait_closed())
  124. loop.close()