Browse Source

First prototype of asyncio + protobuf for the peerfinder v2

Baptiste Jonglez 10 years ago
parent
commit
a31af63ffa
4 changed files with 515 additions and 0 deletions
  1. 45 0
      peerfinderv2/client.py
  2. 34 0
      peerfinderv2/peerfinder.proto
  3. 291 0
      peerfinderv2/peerfinder_pb2.py
  4. 145 0
      peerfinderv2/server.py

+ 45 - 0
peerfinderv2/client.py

@@ -0,0 +1,45 @@
+#!/usr/bin/env python
+
+import socket
+import select
+import random
+
+import peerfinder_pb2 as pf
+
+SERVER = ("::1", 9999)
+
+
+def test_socket():
+    s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+    s.settimeout(random.randint(2, 8))
+    s.connect(SERVER)
+    while True:
+        try:
+            data = s.recv(1024)
+            print(data)
+            if len(data) == 0:
+                break
+            msg = pf.Message()
+            msg.ParseFromString(data)
+            if msg.type == pf.Message.Pong:
+                print("Got pong")
+            else:
+                target = msg.target
+                print(target)
+        except socket.timeout:
+            print("Sending heartbeat")
+            msg = pf.Message()
+            msg.type = pf.Message.Ping
+            s.send(msg.SerializeToString())
+
+
+def test_protobuf():
+    a = pf.IPAddress()
+    a.address = "2001:db8::1"
+    a.family = pf.IPAddress.IPV6
+    print(a.SerializeToString())
+
+
+if __name__ == '__main__':
+    #test_protobuf()
+    test_socket()

+ 34 - 0
peerfinderv2/peerfinder.proto

@@ -0,0 +1,34 @@
+package peerfinder;
+
+message IPAddress {
+  enum AddressFamily {
+    IPV4 = 1;
+    IPV6 = 2;
+  }
+  required string address = 1;
+  required AddressFamily family = 2;
+}
+
+message Target {
+  required int32 target_id = 1;
+  required IPAddress target = 2;
+}
+
+message Measurement {
+  required int32 target_id = 1;
+  required int32 probes_sent = 2;
+  required int32 probes_received = 3;
+  repeated int32 latency_us = 4;
+}
+
+message Message {
+  enum Type {
+    Ping = 1;
+    Pong = 2;
+    Target = 3;
+    Measurement = 4;
+  }
+  required Type type = 1;
+  optional Target target = 2;
+  optional Measurement measurement = 3;
+}

File diff suppressed because it is too large
+ 291 - 0
peerfinderv2/peerfinder_pb2.py


+ 145 - 0
peerfinderv2/server.py

@@ -0,0 +1,145 @@
+#!/usr/bin/env python
+
+import asyncio
+import random
+import functools
+
+import netaddr
+from netaddr import IPAddress
+
+import peerfinder_pb2 as pf
+
+
+WORKER_BIND_ADDRESS = "::"
+WORKER_BIND_PORT = 9999
+
+COMMAND_BIND_ADDRESS = "127.0.0.1"
+COMMAND_BIND_PORT = 9998
+
+
+class Peerfinder(object):
+
+    def __init__(self, loop):
+        self.loop = loop
+        self.clients = []
+
+    def send_pong(self, writer):
+        msg = pf.Message()
+        msg.type = pf.Message.Pong
+        writer.write(msg.SerializeToString())
+        yield from writer.drain()
+
+    def send_target(self, writer, target):
+        msg = pf.Message()
+        msg.type = pf.Message.Target
+        #msg.target = target.SerializeToString()
+        msg.target.target_id = target.target_id
+        msg.target.target.address = target.target.address
+        msg.target.target.family = target.target.family
+        writer.write(msg.SerializeToString())
+        #yield from writer.drain()
+
+    def generate_data(self):
+        print("Generate_data")
+        target = pf.Target()
+        target.target_id = 1
+        target.target.address = "2001:db8::1"
+        target.target.family = pf.IPAddress.IPV6
+        while True:
+            print("Entering write loop")
+            for writer in list(self.clients):
+                self.sent_target(writer, target)
+            target.target_id += 1
+            yield from asyncio.sleep(2)
+
+    def generate_data_cb(self):
+        target = pf.Target()
+        target.target_id = 1
+        target.target.address = "2001:db8::1"
+        target.target.family = pf.IPAddress.IPV6
+        print("Sending data")
+        for writer in list(self.clients):
+            # writer.write(target.SerializeToString())
+            self.send_target(writer, target)
+        self.loop.call_later(random.randint(1, 10), self.generate_data_cb)
+        
+    def handle_commands(self, reader, writer):
+        target = pf.Target()
+        target.target_id = 1
+        family = {4: pf.IPAddress.IPV4, 6: pf.IPAddress.IPV6}
+        while True:
+            if reader.at_eof():
+                print("Exiting commands handler")
+                return
+            data = yield from reader.read(1024)
+            try:
+                data = data.strip().decode()
+                addr = IPAddress(data)
+            except netaddr.AddrFormatError:
+                print("Invalid command, disconnecting client")
+                writer.close()
+                return
+            target.target.address = str(addr)
+            target.target.family = family[addr.version]
+            target.target_id += 1
+            print("Sending data to all workers...")
+            for writer in list(self.clients):
+                #writer.write(target.SerializeToString())
+                self.send_target(writer, target)
+                yield from writer.drain()
+
+    def handle_worker(self, reader, writer):
+        self.clients.append(writer)
+        while True:
+            if reader.at_eof():
+                print("Exiting worker handler")
+                self.clients.remove(writer)
+                return
+            try:
+                data = yield from reader.read(1024)
+            except ConnectionResetError:
+                print("Exiting worker handler")
+                self.clients.remove(writer)
+                return
+            msg = pf.Message()
+            msg.ParseFromString(data)
+            print("Receiving {!r}".format(msg))
+            print("Currently having {} clients".format(len(self.clients)))
+            if msg.type == pf.Message.Ping:
+                yield from self.send_pong(writer)
+
+#@asyncio.coroutine
+def hello(loop):
+    print("hello")
+    #yield from asyncio.sleep(1)
+    loop.call_later(2, hello, loop)
+
+
+if __name__ == '__main__':
+    loop = asyncio.get_event_loop()
+    p = Peerfinder(loop)
+    worker_coro = asyncio.start_server(p.handle_worker, WORKER_BIND_ADDRESS, WORKER_BIND_PORT, loop=loop)
+    worker_server = loop.run_until_complete(worker_coro)
+    command_coro = asyncio.start_server(p.handle_commands, COMMAND_BIND_ADDRESS, COMMAND_BIND_PORT, loop=loop)
+    command_server = loop.run_until_complete(command_coro)
+
+    # Generate data (test)
+    #loop.call_soon(hello, loop)
+    loop.call_soon(p.generate_data_cb)
+    #asyncio.async(hello)
+    #loop.create_task(hello)
+
+    # Serve requests until CTRL+c is pressed
+    print('Serving workers on {}'.format(worker_server.sockets[0].getsockname()))
+    print('Listen to commands on {}'.format(command_server.sockets[0].getsockname()))
+    try:
+        loop.run_forever()
+    except KeyboardInterrupt:
+        pass
+
+    # Close the server
+    command_server.close()
+    loop.run_until_complete(command_server.wait_closed())
+    worker_server.close()
+    loop.run_until_complete(worker_server.wait_closed())
+    loop.close()