123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- """
- In this test file, we actually start msgq as a process and test it
- as a whole. It may be considered a system test instead of unit test,
- but apart from the terminology, we don't care much. We need to test
- the message queue works as expected, together with the libraries.
- In each test, we first start a timeout (because we do some waits
- for messages and if they wouldn't come, the test could block indefinitely).
- The timeout is long, because it is for the case the test fails.
- We then start the msgq and wait for the socket file to appear
- (that should indicate it is ready to receive connections). Then the
- actual test starts. After the test, we kill it and remove the test file.
- We also register signal handlers for many signals. Even in the case
- the test is interrupted or crashes, we should ensure the message queue
- itself is terminated.
- """
- import unittest
- import os
- import signal
- import sys
- import subprocess
- import time
- import isc.log
- import isc.cc.session
- from isc.cc.proto_defs import *
- SOCKET_PATH = os.path.abspath(os.environ['B10_FROM_BUILD'] + '/msgq.sock')
- MSGQ_PATH = os.environ['B10_FROM_BUILD'] + '/src/bin/msgq/run_msgq.sh'
- TIMEOUT = 15
- class MsgqRunTest(unittest.TestCase):
- def setUp(self):
- """
- As described above - check the socket file does not exist.
- Then register signals and timeouts. Finally, launch msgq
- and wait for it to start.
- """
- self.__msgq = None
- self.__opened_connections = []
-
- self.assertFalse(os.path.exists(SOCKET_PATH))
- signal.alarm(TIMEOUT)
- self.__orig_signals = {}
-
-
-
- for sig in [signal.SIGHUP, signal.SIGINT, signal.SIGQUIT,
- signal.SIGILL, signal.SIGTRAP, signal.SIGABRT, signal.SIGBUS,
- signal.SIGFPE, signal.SIGALRM, signal.SIGTERM]:
- self.__orig_signals[sig] = signal.signal(sig, self.__signal)
-
- self.__msgq = subprocess.Popen([MSGQ_PATH, '-s', SOCKET_PATH],
- close_fds=True)
-
- self.__no_recpt = {"result": [-1, "No such recipient"]}
-
- connection = None
- while not connection:
- try:
-
-
- connection = isc.cc.session.Session(SOCKET_PATH)
- except isc.cc.session.SessionError:
- time.sleep(0.1)
-
-
-
- connection.close()
- def __message(self, data):
- """
- Provide some testing message. The data will be included in it, so
- several different messages can be created.
- """
- return {"Message": "Text", "Data": data}
- def tearDown(self):
- """
- Perform cleanup after the test.
- """
- self.__cleanup()
- def __signal(self, signal, frame):
- """
- Called from a signal handler. We perform some cleanup, output
- a complain and terminate with error.
- """
- self.__cleanup()
- sys.stderr.write("Test terminating from signal " + str(signal) +
- " in " + str(frame) + "\n")
- sys.exit(1)
- def __cleanup(self):
- """
- Kill msgq (if running) and restore original signal handlers.
- """
-
- for conn in self.__opened_connections:
- conn.close()
- self.__opened_connections = []
- if self.__msgq:
- self.__msgq.kill()
- self.__msgq = None
- if os.path.exists(SOCKET_PATH):
- os.unlink(SOCKET_PATH)
- for sig in self.__orig_signals:
- signal.signal(sig, self.__orig_signals[sig])
-
- signal.alarm(0)
- def __get_connection(self):
- """
- Create a connection to the daemon and make sure it is properly closed
- at the end of the test.
- """
- connection = isc.cc.session.Session(SOCKET_PATH)
- self.__opened_connections.append(connection)
- return connection
- def test_send_direct(self):
- """
- Connect twice to msgq, send a message from one to another using direct
- l-name and see it comes.
- """
-
- conn1 = self.__get_connection()
- conn2 = self.__get_connection()
-
- lname1 = conn1.lname
- conn2.group_sendmsg(self.__message(1), "*", to=lname1)
-
- (msg, env) = conn1.group_recvmsg(nonblock=False)
- self.assertEqual(self.__message(1), msg)
-
-
- self.assertEqual(lname1, env[CC_HEADER_TO])
- self.assertEqual(conn2.lname, env[CC_HEADER_FROM])
- self.assertEqual("*", env[CC_HEADER_GROUP])
- self.assertEqual(CC_INSTANCE_WILDCARD, env[CC_HEADER_INSTANCE])
- self.assertEqual(CC_COMMAND_SEND, env[CC_HEADER_TYPE])
- self.assertFalse(env[CC_HEADER_WANT_ANSWER])
- def __barrier(self, connections):
- """
- Make sure all previous commands on all supplied connections are
- processed, by sending a ping and waiting for an answer.
- """
- for c in connections:
- c.sendmsg({"type": "ping"})
- for c in connections:
- pong = c.recvmsg(nonblock=False)
- self.assertEqual(({"type": "pong"}, None), pong)
- def test_send_group(self):
- """
- Create several connections. First, try to send a message to a (empty)
- group and see an error is bounced back. Then subscribe the others
- to the group and send it again. Send to a different group and see it
- bounced back. Unsubscribe and see it is bounced again.
- Then the other connections answer (after unsubscribing, strange, but
- legal). See both answers come.
- Then, look there are no more waiting messages.
- """
- conn_a = self.__get_connection()
- conn_b = []
- for i in range(0, 10):
- conn_b.append(self.__get_connection())
-
- seq = conn_a.group_sendmsg(self.__message(1), "group",
- want_answer=True)
- (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq)
- self.assertEqual(self.__no_recpt, msg)
- self.assertEqual(conn_a.lname, env[CC_HEADER_TO])
-
- for c in conn_b:
- c.group_subscribe("group")
-
-
- self.__barrier(conn_b)
-
- seq = conn_a.group_sendmsg(self.__message(2), "group",
- want_answer=True)
- envs = []
- for c in conn_b:
- (msg, env) = c.group_recvmsg(nonblock=False)
- self.assertEqual(self.__message(2), msg)
- self.assertEqual(conn_a.lname, env[CC_HEADER_FROM])
-
- self.assertEqual(CC_TO_WILDCARD, env[CC_HEADER_TO])
- self.assertEqual("group", env[CC_HEADER_GROUP])
- self.assertEqual(CC_INSTANCE_WILDCARD, env[CC_HEADER_INSTANCE])
- self.assertEqual(CC_COMMAND_SEND, env[CC_HEADER_TYPE])
- self.assertTrue(env[CC_HEADER_WANT_ANSWER])
- envs.append(env)
-
- seq_ne = conn_a.group_sendmsg(self.__message(3), "no-group",
- want_answer=True)
- (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq_ne)
- self.assertEqual(self.__no_recpt, msg)
- self.assertEqual(conn_a.lname, env[CC_HEADER_TO])
-
- for c in conn_b:
- c.group_unsubscribe("group")
-
- self.__barrier(conn_b)
- seq_ne = conn_a.group_sendmsg(self.__message(4), "group",
- want_answer=True)
- (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq_ne)
- self.assertEqual(self.__no_recpt, msg)
- self.assertEqual(conn_a.lname, env[CC_HEADER_TO])
-
- lnames = set()
- for (c, env) in zip(conn_b, envs):
- c.group_reply(env, self.__message("Reply"))
- lnames.add(c.lname)
-
- while lnames:
-
-
- (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq)
- self.assertEqual(self.__message("Reply"), msg)
- lname = env[CC_HEADER_FROM]
- self.assertTrue(lname in lnames)
- lnames.remove(lname)
-
-
-
- self.__barrier(conn_b)
- self.__barrier([conn_a])
- for c in conn_b:
- self.assertEqual((None, None), c.group_recvmsg())
- self.assertEqual((None, None), conn_a.group_recvmsg())
- def test_conn_disconn(self):
- """
- Keep connecting and disconnecting, checking we can still send
- and receive messages.
- """
- conn = self.__get_connection()
- conn.group_subscribe("group")
- for i in range(0, 50):
- new = self.__get_connection()
- new.group_subscribe("group")
- self.__barrier([conn, new])
- new.group_sendmsg(self.__message(i), "group")
- (msg, env) = conn.group_recvmsg(nonblock=False)
- self.assertEqual(self.__message(i), msg)
- conn.close()
- conn = new
- if __name__ == '__main__':
- isc.log.init("msgq-tests")
- isc.log.resetUnitTestRootLogger()
- unittest.main()
|