|
@@ -12,6 +12,8 @@ import threading
|
|
|
import isc.cc
|
|
|
import collections
|
|
|
import isc.log
|
|
|
+import struct
|
|
|
+import json
|
|
|
|
|
|
#
|
|
|
# Currently only the subscription part and some sending is implemented...
|
|
@@ -141,6 +143,150 @@ class TestSubscriptionManager(unittest.TestCase):
|
|
|
self.sm.subscribe('ConfigManager', '*', 's3')
|
|
|
self.assertEqual(1, self.__cfgmgr_ready_called)
|
|
|
|
|
|
+class MsgQTest(unittest.TestCase):
|
|
|
+ """
|
|
|
+ Tests for the behaviour of MsgQ. This is for the core of MsgQ, other
|
|
|
+ subsystems are in separate test fixtures.
|
|
|
+ """
|
|
|
+ def setUp(self):
|
|
|
+ self.__msgq = MsgQ()
|
|
|
+
|
|
|
+ def parse_msg(self, msg):
|
|
|
+ """
|
|
|
+ Parse a binary representation of message to the routing header and the
|
|
|
+ data payload. It assumes the message is correctly encoded and the
|
|
|
+ payload is not omitted. It'd probably throw in other cases, but we
|
|
|
+ don't use it in such situations in this test.
|
|
|
+ """
|
|
|
+ (length, header_len) = struct.unpack('>IH', msg[:6])
|
|
|
+ header = json.loads(msg[6:6 + header_len].decode('utf-8'))
|
|
|
+ data = json.loads(msg[6 + header_len:].decode('utf-8'))
|
|
|
+ return (header, data)
|
|
|
+
|
|
|
+ def test_undeliverable_errors(self):
|
|
|
+ """
|
|
|
+ Send several packets through the MsgQ and check it generates
|
|
|
+ undeliverable notifications under the correct circumstances.
|
|
|
+
|
|
|
+ The test is not exhaustive as it doesn't test all combination
|
|
|
+ of existence of the recipient, addressing schemes, want_answer
|
|
|
+ header and the reply header. It is not needed, these should
|
|
|
+ be mostly independant. That means, for example, if the message
|
|
|
+ is a reply and there's no recipient to send it to, the error
|
|
|
+ would not be generated no matter if we addressed the recipient
|
|
|
+ by lname or group. If we included everything, the test would
|
|
|
+ have too many scenarios with little benefit.
|
|
|
+ """
|
|
|
+ self.__sent_messages = []
|
|
|
+ def fake_send_prepared_msg(socket, msg):
|
|
|
+ self.__sent_messages.append((socket, msg))
|
|
|
+ return True
|
|
|
+ self.__msgq.send_prepared_msg = fake_send_prepared_msg
|
|
|
+ # These would be real sockets in the MsgQ, but we pass them as
|
|
|
+ # parameters only, so we don't need them to be. We use simple
|
|
|
+ # integers to tell one from another.
|
|
|
+ sender = 1
|
|
|
+ recipient = 2
|
|
|
+ another_recipiet = 3
|
|
|
+ # The routing headers and data to test with.
|
|
|
+ routing = {
|
|
|
+ 'to': '*',
|
|
|
+ 'from': 'sender',
|
|
|
+ 'group': 'group',
|
|
|
+ 'instance': '*',
|
|
|
+ 'seq': 42
|
|
|
+ }
|
|
|
+ data = {
|
|
|
+ "data": "Just some data"
|
|
|
+ }
|
|
|
+
|
|
|
+ # Some common checking patterns
|
|
|
+ def check_error():
|
|
|
+ self.assertEqual(1, len(self.__sent_messages))
|
|
|
+ self.assertEqual(1, self.__sent_messages[0][0])
|
|
|
+ self.assertEqual(({
|
|
|
+ 'group': 'group',
|
|
|
+ 'instance': '*',
|
|
|
+ 'reply': 42,
|
|
|
+ 'seq': 42,
|
|
|
+ 'from': 'msgq',
|
|
|
+ 'to': 'sender',
|
|
|
+ 'want_answer': True
|
|
|
+ }, {'result': [-1, "No such recipient"]}),
|
|
|
+ self.parse_msg(self.__sent_messages[0][1]))
|
|
|
+ self.__sent_messages = []
|
|
|
+
|
|
|
+ def check_no_message():
|
|
|
+ self.assertEqual([], self.__sent_messages)
|
|
|
+
|
|
|
+ def check_delivered(rcpt_socket=recipient):
|
|
|
+ self.assertEqual(1, len(self.__sent_messages))
|
|
|
+ self.assertEqual(rcpt_socket, self.__sent_messages[0][0])
|
|
|
+ self.assertEqual((routing, data),
|
|
|
+ self.parse_msg(self.__sent_messages[0][1]))
|
|
|
+ self.__sent_messages = []
|
|
|
+
|
|
|
+ # Send the message. No recipient, but errors are not requested,
|
|
|
+ # so none is generated.
|
|
|
+ self.__msgq.process_command_send(sender, routing, data)
|
|
|
+ check_no_message()
|
|
|
+
|
|
|
+ # It should act the same if we explicitly say we do not want replies.
|
|
|
+ routing["want_answer"] = False
|
|
|
+ self.__msgq.process_command_send(sender, routing, data)
|
|
|
+ check_no_message()
|
|
|
+
|
|
|
+ # Ask for errors if it can't be delivered.
|
|
|
+ routing["want_answer"] = True
|
|
|
+ self.__msgq.process_command_send(sender, routing, data)
|
|
|
+ check_error()
|
|
|
+
|
|
|
+ # If the message is a reply itself, we never generate the errors
|
|
|
+ routing["reply"] = 3
|
|
|
+ self.__msgq.process_command_send(sender, routing, data)
|
|
|
+ check_no_message()
|
|
|
+
|
|
|
+ # If there are recipients (but no "reply" header), the error should not
|
|
|
+ # be sent and the message should get delivered.
|
|
|
+ del routing["reply"]
|
|
|
+ self.__msgq.subs.find = lambda group, instance: [recipient]
|
|
|
+ self.__msgq.process_command_send(sender, routing, data)
|
|
|
+ check_delivered()
|
|
|
+
|
|
|
+ # When we send a direct message and the recipient is not there, we get
|
|
|
+ # the error too
|
|
|
+ routing["to"] = "lname"
|
|
|
+ self.__msgq.process_command_send(sender, routing, data)
|
|
|
+ check_error()
|
|
|
+
|
|
|
+ # But when the recipient is there, it is delivered and no error is
|
|
|
+ # generated.
|
|
|
+ self.__msgq.lnames["lname"] = recipient
|
|
|
+ self.__msgq.process_command_send(sender, routing, data)
|
|
|
+ check_delivered()
|
|
|
+
|
|
|
+ # If an attempt to send fails, consider it no recipient.
|
|
|
+ def fail_send_prepared_msg(socket, msg):
|
|
|
+ '''
|
|
|
+ Pretend sending a message failed. After one call, return to the
|
|
|
+ usual mock, so the errors or other messages can be sent.
|
|
|
+ '''
|
|
|
+ self.__msgq.send_prepared_msg = fake_send_prepared_msg
|
|
|
+ return False
|
|
|
+
|
|
|
+ self.__msgq.send_prepared_msg = fail_send_prepared_msg
|
|
|
+ self.__msgq.process_command_send(sender, routing, data)
|
|
|
+ check_error()
|
|
|
+
|
|
|
+ # But if there are more recipients and only one fails, it should
|
|
|
+ # be delivered to the other and not considered an error
|
|
|
+ self.__msgq.send_prepared_msg = fail_send_prepared_msg
|
|
|
+ routing["to"] = '*'
|
|
|
+ self.__msgq.subs.find = lambda group, instance: [recipient,
|
|
|
+ another_recipiet]
|
|
|
+ self.__msgq.process_command_send(sender, routing, data)
|
|
|
+ check_delivered(rcpt_socket=another_recipiet)
|
|
|
+
|
|
|
class DummySocket:
|
|
|
"""
|
|
|
Dummy socket class.
|
|
@@ -245,17 +391,27 @@ class SendNonblock(unittest.TestCase):
|
|
|
self.assertEqual(0, status,
|
|
|
"The task did not complete successfully in time")
|
|
|
|
|
|
+ def get_msgq_with_sockets(self):
|
|
|
+ '''
|
|
|
+ Create a message queue and prepare it for use with a socket pair.
|
|
|
+ The write end is put into the message queue, so we can check it.
|
|
|
+ It returns (msgq, read_end, write_end). It is expected the sockets
|
|
|
+ are closed by the caller afterwards.
|
|
|
+ '''
|
|
|
+ msgq = MsgQ()
|
|
|
+ # We do only partial setup, so we don't create the listening socket
|
|
|
+ msgq.setup_poller()
|
|
|
+ (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
+ msgq.register_socket(write)
|
|
|
+ return (msgq, read, write)
|
|
|
+
|
|
|
def infinite_sender(self, sender):
|
|
|
"""
|
|
|
Sends data until an exception happens. socket.error is caught,
|
|
|
as it means the socket got closed. Sender is called to actually
|
|
|
send the data.
|
|
|
"""
|
|
|
- msgq = MsgQ()
|
|
|
- # We do only partial setup, so we don't create the listening socket
|
|
|
- msgq.setup_poller()
|
|
|
- (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
|
- msgq.register_socket(write)
|
|
|
+ (msgq, read, write) = self.get_msgq_with_sockets()
|
|
|
# Keep sending while it is not closed by the msgq
|
|
|
try:
|
|
|
while True:
|
|
@@ -291,6 +447,34 @@ class SendNonblock(unittest.TestCase):
|
|
|
self.terminate_check(lambda: self.infinite_sender(
|
|
|
lambda msgq, socket: msgq.send_prepared_msg(socket, data)))
|
|
|
|
|
|
+ def test_sendprepared_success(self):
|
|
|
+ '''
|
|
|
+ Test the send_prepared_msg returns success when queueing messages.
|
|
|
+ It does so on the first attempt (when it actually tries to send
|
|
|
+ something to the socket) and on any attempt that follows and the
|
|
|
+ buffer is already full.
|
|
|
+ '''
|
|
|
+ (msgq, read, write) = self.get_msgq_with_sockets()
|
|
|
+ # Now keep sending until we fill in something into the internal
|
|
|
+ # buffer.
|
|
|
+ while not write.fileno() in msgq.sendbuffs:
|
|
|
+ self.assertTrue(msgq.send_prepared_msg(write, b'data'))
|
|
|
+ read.close()
|
|
|
+ write.close()
|
|
|
+
|
|
|
+ def test_sendprepared_epipe(self):
|
|
|
+ '''
|
|
|
+ Test the send_prepared_msg returns false when we try to queue a
|
|
|
+ message and the other side is not there any more. It should be done
|
|
|
+ with EPIPE, so not a fatal error.
|
|
|
+ '''
|
|
|
+ (msgq, read, write) = self.get_msgq_with_sockets()
|
|
|
+ # Close one end. It should make a EPIPE on the other.
|
|
|
+ read.close()
|
|
|
+ # Now it should soft-fail
|
|
|
+ self.assertFalse(msgq.send_prepared_msg(write, b'data'))
|
|
|
+ write.close()
|
|
|
+
|
|
|
def send_many(self, data):
|
|
|
"""
|
|
|
Tries that sending a command many times and getting an answer works.
|