123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449 |
- import unittest
- import os
- import json
- from isc.cc.session import *
- class MySocket():
- def __init__(self, family, type):
- self.family = family
- self.type = type
- self.recvqueue = bytearray()
- self.sendqueue = bytearray()
- self._blocking = True
- self.send_limit = None
- def connect(self, to):
- pass
- def close(self):
- pass
- def setblocking(self, val):
- self._blocking = val
- def send(self, data):
-
-
- if self.send_limit is not None and len(data) > self.send_limit:
- self.sendqueue.extend(data[0:self.send_limit])
- return self.send_limit
- else:
- self.sendqueue.extend(data)
- return len(data)
- def readsent(self, length):
- if length > len(self.sendqueue):
- raise Exception("readsent(" + str(length) + ") called, but only " + str(len(self.sendqueue)) + " in queue")
- result = self.sendqueue[:length]
- del self.sendqueue[:length]
- return result
- def readsentmsg(self):
- """return bytearray of the full message include length specifiers"""
- result = bytearray()
- length_buf = self.readsent(4)
- result.extend(length_buf)
- length = struct.unpack('>I', length_buf)[0]
- header_length_buf = self.readsent(2)
- header_length = struct.unpack('>H', header_length_buf)[0]
- result.extend(header_length_buf)
- data_length = length - 2 - header_length
- result.extend(self.readsent(header_length))
- result.extend(self.readsent(data_length))
- return result
- def readsentmsg_parsed(self):
- length_buf = self.readsent(4)
- length = struct.unpack('>I', length_buf)[0]
- header_length_buf = self.readsent(2)
- header_length = struct.unpack('>H', header_length_buf)[0]
- data_length = length - 2 - header_length
- env = json.loads(self.readsent(header_length).decode('utf-8'), strict=False)
- if (data_length > 0):
- msg = json.loads(self.readsent(data_length).decode('utf-8'), strict=False)
- else:
- msg = {}
- return (env, msg)
- def recv(self, length):
- if len(self.recvqueue) == 0:
- if self._blocking:
- return bytes()
- else:
- raise socket.error(errno.EAGAIN, "Resource temporarily unavailable")
- if length > len(self.recvqueue):
- raise Exception("Buffer underrun in test, does the test provide the right data?")
- result = self.recvqueue[:length]
- del self.recvqueue[:length]
-
-
- return result
- def addrecv(self, env, msg = None):
- if type(env) == dict:
- env = isc.cc.message.to_wire(env)
- if type(msg) == dict:
- msg = isc.cc.message.to_wire(msg)
- length = 2 + len(env);
- if msg:
- length += len(msg)
- self.recvqueue.extend(struct.pack("!I", length))
- self.recvqueue.extend(struct.pack("!H", len(env)))
- self.recvqueue.extend(env)
- if msg:
- self.recvqueue.extend(msg)
- def settimeout(self, val):
- pass
- def gettimeout(self):
- return 0
- def set_send_limit(self, limit):
- '''Specify the upper limit of the transmittable data at once.
- By default, the send() method of this class "sends" all given data.
- If this method is called and the its parameter is not None,
- subsequent calls to send() will only transmit the specified amount
- of data. This can be used to emulate the situation where send()
- on a real socket object results in partial write.
- '''
- self.send_limit = limit
- class MySession(Session):
- def __init__(self, port=9912, s=None):
- self._socket = None
- self._socket_timeout = 1
- self._lname = None
- self._recvbuffer = bytearray()
- self._recv_len_size = 0
- self._recv_size = 0
- self._sequence = 1
- self._closed = False
- self._queue = []
- self._lock = threading.RLock()
- if s is not None:
- self._socket = s
- else:
- try:
- self._socket = MySocket(socket.AF_INET, socket.SOCK_STREAM)
- self._socket.connect(tuple(['127.0.0.1', port]))
- self._lname = "test_name"
-
- except socket.error as se:
- raise SessionError(se)
- class testSession(unittest.TestCase):
- def test_session_close(self):
- sess = MySession()
- self.assertEqual("test_name", sess.lname)
- sess.close()
- self.assertRaises(SessionError, sess.sendmsg, {}, {"hello": "a"})
- def test_env_too_large(self):
- sess = MySession()
- largeenv = { "a": "b"*65535 }
- self.assertRaises(ProtocolError, sess.sendmsg, largeenv, {"hello": "a"})
- def test_session_sendmsg(self):
- sess = MySession()
- sess.sendmsg({}, {"hello": "a"})
- sent = sess._socket.readsentmsg();
- self.assertEqual(sent, b'\x00\x00\x00\x12\x00\x02{}{"hello": "a"}')
- sess.close()
- self.assertRaises(SessionError, sess.sendmsg, {}, {"hello": "a"})
- def test_session_sendmsg2(self):
- sess = MySession()
- sess.sendmsg({'to': 'someone', 'reply': 1}, {"hello": "a"})
- sent = sess._socket.readsentmsg();
-
-
- def test_session_sendmsg_shortwrite(self):
- sess = MySession()
-
-
-
- sess._socket.set_send_limit(10)
- sess.sendmsg({'to': 'someone', 'reply': 1}, {"hello": "a"})
-
- sent = sess._socket.readsentmsg();
- def recv_and_compare(self, session, bytes, env, msg):
- """Adds bytes to the recvqueue (which will be read by the
- session object, and compare the resultinv env and msg to
- the ones given."""
- session._socket.addrecv(bytes)
- s_env, s_msg = session.recvmsg(False)
- self.assertEqual(env, s_env)
- self.assertEqual(msg, s_msg)
-
-
-
- session._socket.recvqueue = bytearray()
- def test_session_recvmsg(self):
- sess = MySession()
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- sess._socket.recvqueue = bytearray()
-
-
-
-
- self.assertFalse(sess.has_queued_msgs())
- sess._socket.addrecv({'to': 'someone', 'reply': 1}, {"hello": "a"})
- env, msg = sess.recvmsg(True)
- self.assertEqual(None, env)
- self.assertTrue(sess.has_queued_msgs())
- env, msg = sess.recvmsg(True, 1)
- self.assertEqual({'to': 'someone', 'reply': 1}, env)
- self.assertEqual({"hello": "a"}, msg)
- self.assertFalse(sess.has_queued_msgs())
-
-
- self.assertFalse(sess.has_queued_msgs())
- sess._socket.addrecv({'to': 'someone', 'reply': 1}, {"hello": "a"})
- env, msg = sess.recvmsg(True, 2)
- self.assertEqual(None, env)
- self.assertEqual(None, msg)
- self.assertTrue(sess.has_queued_msgs())
- env, msg = sess.recvmsg(True, 1)
- self.assertEqual({'to': 'someone', 'reply': 1}, env)
- self.assertEqual({"hello": "a"}, msg)
- self.assertFalse(sess.has_queued_msgs())
-
-
- self.assertFalse(sess.has_queued_msgs())
- sess._socket.addrecv({'to': 'someone', 'reply': 1}, {"hello": "a"})
- env, msg = sess.recvmsg(True, 2)
- self.assertEqual(None, env)
- self.assertEqual(None, msg)
- self.assertTrue(sess.has_queued_msgs())
- env, msg = sess.recvmsg(True, 1)
- self.assertEqual({'to': 'someone', 'reply': 1}, env)
- self.assertEqual({"hello": "a"}, msg)
- self.assertFalse(sess.has_queued_msgs())
-
-
-
-
- self.assertFalse(sess.has_queued_msgs())
- sess._socket.addrecv({'to': 'someone', 'reply': 1}, {'hello': 'a'})
- env, msg = sess.recvmsg(True, 2)
- self.assertEqual(None, env)
- self.assertEqual(None, msg)
- self.assertTrue(sess.has_queued_msgs())
- sess._socket.addrecv({'to': 'someone' }, {'hello': 'b'})
- env, msg = sess.recvmsg(True, 1)
- self.assertEqual({'to': 'someone', 'reply': 1 }, env)
- self.assertEqual({"hello": "a"}, msg)
- self.assertFalse(sess.has_queued_msgs())
- env, msg = sess.recvmsg(True)
- self.assertEqual({'to': 'someone'}, env)
- self.assertEqual({"hello": "b"}, msg)
- self.assertFalse(sess.has_queued_msgs())
-
-
-
- self.assertFalse(sess.has_queued_msgs())
- sess._socket.addrecv({'to': 'someone' }, {'hello': 'b'})
- sess._socket.addrecv({'to': 'someone', 'reply': 1}, {'hello': 'a'})
- env, msg = sess.recvmsg(True, 1)
- self.assertEqual({'to': 'someone', 'reply': 1}, env)
- self.assertEqual({"hello": "a"}, msg)
- self.assertTrue(sess.has_queued_msgs())
- env, msg = sess.recvmsg(True)
- self.assertEqual({'to': 'someone'}, env)
- self.assertEqual({"hello": "b"}, msg)
- self.assertFalse(sess.has_queued_msgs())
- def test_recv_bad_msg(self):
- sess = MySession()
- self.assertFalse(sess.has_queued_msgs())
- sess._socket.addrecv({'to': 'someone' }, {'hello': 'b'})
- sess._socket.addrecv({'to': 'someone', 'reply': 1}, {'hello': 'a'})
-
- sess._socket.recvqueue[5] = sess._socket.recvqueue[5] - 2
- sess._socket.recvqueue = sess._socket.recvqueue[:-2]
- self.assertRaises(SessionError, sess.recvmsg, True, 1)
- def test_next_sequence(self):
- sess = MySession()
- self.assertEqual(sess._sequence, 1)
- self.assertEqual(sess._next_sequence(), 2)
- self.assertEqual(sess._sequence, 2)
- sess._sequence = 47805
- self.assertEqual(sess._sequence, 47805)
- self.assertEqual(sess._next_sequence(), 47806)
- self.assertEqual(sess._sequence, 47806)
- def test_group_subscribe(self):
- sess = MySession()
- sess.group_subscribe("mygroup")
- sent = sess._socket.readsentmsg_parsed()
- self.assertEqual(sent, ({"group": "mygroup", "type": "subscribe",
- "instance": "*"}, {}))
- sess.group_subscribe("mygroup")
- sent = sess._socket.readsentmsg_parsed()
- self.assertEqual(sent, ({"group": "mygroup", "type": "subscribe",
- "instance": "*"}, {}))
- sess.group_subscribe("mygroup", "my_instance")
- sent = sess._socket.readsentmsg_parsed()
- self.assertEqual(sent, ({"group": "mygroup", "type": "subscribe",
- "instance": "my_instance"}, {}))
- def test_group_unsubscribe(self):
- sess = MySession()
- sess.group_unsubscribe("mygroup")
- sent = sess._socket.readsentmsg_parsed()
- self.assertEqual(sent, ({"group": "mygroup", "type": "unsubscribe",
- "instance": "*"}, {}))
- sess.group_unsubscribe("mygroup")
- sent = sess._socket.readsentmsg_parsed()
- self.assertEqual(sent, ({"group": "mygroup", "type": "unsubscribe",
- "instance": "*"}, {}))
- sess.group_unsubscribe("mygroup", "my_instance")
- sent = sess._socket.readsentmsg_parsed()
- self.assertEqual(sent, ({"group": "mygroup", "type": "unsubscribe",
- "instance": "my_instance"}, {}))
- def test_group_sendmsg(self):
- sess = MySession()
- self.assertEqual(sess._sequence, 1)
- sess.group_sendmsg({ 'hello': 'a' }, "my_group")
- sent = sess._socket.readsentmsg_parsed()
- self.assertEqual(sent, ({"from": "test_name", "seq": 2, "to": "*",
- "instance": "*", "group": "my_group",
- "type": "send"}, {"hello": "a"}))
- self.assertEqual(sess._sequence, 2)
- sess.group_sendmsg({ 'hello': 'a' }, "my_group", "my_instance")
- sent = sess._socket.readsentmsg_parsed()
- self.assertEqual(sent, ({"from": "test_name", "seq": 3, "to": "*", "instance": "my_instance", "group": "my_group", "type": "send"}, {"hello": "a"}))
- self.assertEqual(sess._sequence, 3)
- sess.group_sendmsg({ 'hello': 'a' }, "your_group", "your_instance")
- sent = sess._socket.readsentmsg_parsed()
- self.assertEqual(sent, ({"from": "test_name", "seq": 4, "to": "*", "instance": "your_instance", "group": "your_group", "type": "send"}, {"hello": "a"}))
- self.assertEqual(sess._sequence, 4)
- def test_group_recvmsg(self):
-
-
- pass
- def test_group_reply(self):
- sess = MySession()
- sess.group_reply({ 'from': 'me', 'group': 'our_group',
- 'instance': 'other_instance', 'seq': 4},
- {"hello": "a"})
- sent = sess._socket.readsentmsg_parsed();
- self.assertEqual(sent, ({"from": "test_name", "seq": 2,
- "to": "me", "instance": "other_instance",
- "reply": 4, "group": "our_group",
- "type": "send"},
- {"hello": "a"}))
- sess.group_reply({ 'from': 'me', 'group': 'our_group',
- 'instance': 'other_instance', 'seq': 9},
- {"hello": "a"})
- sent = sess._socket.readsentmsg_parsed();
- self.assertEqual(sent, ({"from": "test_name", "seq": 3,
- "to": "me", "instance": "other_instance",
- "reply": 9, "group": "our_group",
- "type": "send"},
- {"hello": "a"}))
- def test_timeout(self):
- if "BIND10_TEST_SOCKET_FILE" not in os.environ:
- self.assertEqual("", "This test can only run if the value BIND10_TEST_SOCKET_FILE is set in the environment")
- TEST_SOCKET_FILE = os.environ["BIND10_TEST_SOCKET_FILE"]
-
- s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- if os.path.exists(TEST_SOCKET_FILE):
- os.remove(TEST_SOCKET_FILE)
- s1.bind(TEST_SOCKET_FILE)
- try:
- s1.listen(1)
- s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s2.connect(TEST_SOCKET_FILE)
- sess = MySession(1, s2)
-
- sess.set_timeout(100)
- self.assertRaises(SessionTimeout, sess.group_recvmsg, False)
- finally:
- os.remove(TEST_SOCKET_FILE)
- if __name__ == "__main__":
- unittest.main()
|