123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- #!/usr/bin/env python
- import socket
- import select
- import random
- import peerfinder_pb2 as pf
# (host, port) pair that test_socket() connects to: the IPv6 loopback
# address "::1" on TCP port 9999.
SERVER = ("::1", 9999)
def test_socket():
    """Connect to SERVER, print every message received, and ping when idle.

    Opens an IPv6 TCP connection whose timeout is picked at random between
    2 and 8 seconds (inclusive).  Every chunk received is printed raw, then
    parsed as a ``pf.Message``: a Pong is reported as "Got pong", any other
    message has its ``target`` field printed.  Whenever a receive times out,
    a Ping message is sent to the server as a heartbeat.  The loop ends when
    the peer closes the connection (``recv`` returns an empty result).

    The socket is always closed before returning, including when an
    exception (failed connect, undecodable message, send timeout, ...)
    propagates to the caller.
    """
    s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        s.settimeout(random.randint(2, 8))
        s.connect(SERVER)
        while True:
            # Only recv() is expected to time out, so keep the try minimal.
            try:
                data = s.recv(1024)
            except socket.timeout:
                print("Sending heartbeat")
                msg = pf.Message()
                msg.type = pf.Message.Ping
                # sendall() retries until the whole buffer is written;
                # plain send() may transmit only part of it.
                s.sendall(msg.SerializeToString())
                continue

            print(data)
            if not data:
                # An empty read means the peer closed the connection.
                break

            # NOTE(review): assumes each recv() yields exactly one complete
            # serialized message (no framing is applied) -- confirm against
            # the server's wire format.
            msg = pf.Message()
            msg.ParseFromString(data)
            if msg.type == pf.Message.Pong:
                print("Got pong")
            else:
                target = msg.target
                print(target)
    finally:
        # Release the file descriptor on every exit path.
        s.close()
def test_protobuf():
    """Build a sample ``pf.IPAddress`` message and print its serialized form.

    The message is populated with the address "2001:db8::1" and the
    ``pf.IPAddress.IPV6`` family before being serialized.
    """
    ip_msg = pf.IPAddress()
    ip_msg.address = "2001:db8::1"
    ip_msg.family = pf.IPAddress.IPV6
    encoded = ip_msg.SerializeToString()
    print(encoded)
# Script entry point: only the socket test runs by default.
# test_protobuf() is available for a manual serialization check.
if __name__ == "__main__":
    test_socket()
|