msgq_run_test.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. # Copyright (C) 2013 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. """
  In this test file, we actually start msgq as a process and test it
  as a whole. It may be considered a system test rather than a unit test,
  but apart from the terminology, we don't care much. We need to test
  that the message queue works as expected, together with the libraries.
  In each test, we first start a timeout (because we wait for some
  messages, and if they never arrived, the test could block indefinitely).
  The timeout is long, because it only matters when the test fails.
  We then start msgq and wait until it accepts connections on its socket.
  Then the actual test starts. After the test, we kill msgq and remove the
  socket file.
  We also register signal handlers for many signals. Even if the test is
  interrupted or crashes, we should ensure the message queue itself is
  terminated.
  29. """
  30. import unittest
  31. import os
  32. import signal
  33. import sys
  34. import subprocess
  35. import time
  36. import isc.log
  37. import isc.cc.session
  38. from isc.cc.proto_defs import *
  # The build bots have trouble with too long socket paths, so the socket
  # is placed into the top-level build directory. That is ugly, but works.
  SOCKET_PATH = os.path.abspath(
      os.environ['B10_FROM_BUILD'] + '/msgq.sock')
  # Script used to start msgq from the build tree.
  MSGQ_PATH = (os.environ['B10_FROM_BUILD'] +
               '/src/bin/msgq/run_msgq.sh')
  # Some long time (in seconds) for a single test. It only matters when a
  # test fails and would otherwise block.
  TIMEOUT = 15
  class MsgqRunTest(unittest.TestCase):
      """
      System tests that run a real msgq daemon process and talk to it
      through isc.cc.session connections.
      """
      def setUp(self):
          """
          Check that the socket file does not exist yet, then install the
          signal handlers and the timeout. Finally, launch msgq and wait
          until it accepts connections.

          If anything fails once the handlers are installed, the cleanup
          is done right here, because unittest does not call tearDown()
          when setUp() raises.
          """
          self.__msgq = None
          self.__opened_connections = []
          # A precondition check
          self.assertFalse(os.path.exists(SOCKET_PATH))
          self.__orig_signals = {}
          # Register handlers for many signals. Most of them probably
          # can't happen in python, but we register them anyway just to be
          # safe. This happens before the alarm is armed, so SIGALRM never
          # arrives while its default action (terminating the process) is
          # still in place.
          for sig in [signal.SIGHUP, signal.SIGINT, signal.SIGQUIT,
                      signal.SIGILL, signal.SIGTRAP, signal.SIGABRT,
                      signal.SIGBUS, signal.SIGFPE, signal.SIGALRM,
                      signal.SIGTERM]:
              self.__orig_signals[sig] = signal.signal(sig, self.__signal)
          signal.alarm(TIMEOUT)
          # Some testing data
          self.__no_recpt = {"result": [-1, "No such recipient"]}
          try:
              # Start msgq
              self.__msgq = subprocess.Popen([MSGQ_PATH, '-s', SOCKET_PATH],
                                             close_fds=True)
              self.__wait_for_msgq()
          except BaseException:
              # Do not leave msgq running or our handlers installed.
              self.__cleanup()
              raise

      def __wait_for_msgq(self):
          """
          Wait until msgq accepts connections. The number of attempts is not
          limited here; the alarm set in setUp() bounds the total time.
          Fails right away if the msgq process exits before it gets ready.
          """
          while True:
              try:
                  # If the msgq is ready, this'll succeed. If not, it'll
                  # throw session error.
                  connection = isc.cc.session.Session(SOCKET_PATH)
              except isc.cc.session.SessionError:
                  if self.__msgq.poll() is not None:
                      self.fail("msgq exited with code " +
                                str(self.__msgq.returncode) +
                                " before accepting connections")
                  time.sleep(0.1) # Retry after a short time
              else:
                  # We have a connection, so msgq works. Close it; each
                  # test opens the connections it needs by itself.
                  connection.close()
                  return

      def __message(self, data):
          """
          Provide some testing message. The data will be included in it, so
          several different messages can be created.
          """
          return {"Message": "Text", "Data": data}

      def tearDown(self):
          """
          Perform cleanup after the test.
          """
          self.__cleanup()

      def __signal(self, signum, frame):
          """
          Signal handler. Perform the cleanup, report the signal on stderr
          and terminate with an error.
          """
          self.__cleanup()
          sys.stderr.write("Test terminating from signal " + str(signum) +
                           " in " + str(frame) + "\n")
          sys.exit(1)

      def __cleanup(self):
          """
          Close the connections opened by the test, kill msgq (if running),
          remove the socket file, cancel the timeout and restore the original
          signal handlers. It may run more than once (tearDown, a signal
          handler, a failed setUp); repeated calls are harmless.
          """
          connections = self.__opened_connections
          self.__opened_connections = []
          try:
              for conn in connections:
                  conn.close()
          finally:
              # Whatever happened to the connections, msgq must not be left
              # running.
              if self.__msgq is not None:
                  if self.__msgq.poll() is None:
                      self.__msgq.kill()
                  # Reap the child so it does not linger as a zombie.
                  self.__msgq.wait()
                  self.__msgq = None
              # Remove the socket (as we kill, msgq might not clean up)
              if os.path.exists(SOCKET_PATH):
                  os.unlink(SOCKET_PATH)
              # Cancel the timeout (so someone else is not hit by it) before
              # the default SIGALRM handling is put back.
              signal.alarm(0)
              for sig, handler in self.__orig_signals.items():
                  signal.signal(sig, handler)

      def __get_connection(self):
          """
          Create a connection to the daemon and make sure it is properly closed
          at the end of the test.
          """
          connection = isc.cc.session.Session(SOCKET_PATH)
          self.__opened_connections.append(connection)
          return connection

      def test_send_direct(self):
          """
          Connect twice to msgq, send a message from one to another using direct
          l-name and see it comes.
          """
          # Create the connections
          conn1 = self.__get_connection()
          conn2 = self.__get_connection()
          # Send the message
          lname1 = conn1.lname
          conn2.group_sendmsg(self.__message(1), "*", to=lname1)
          # Receive the message and see it contains correct data
          (msg, env) = conn1.group_recvmsg(nonblock=False)
          self.assertEqual(self.__message(1), msg)
          # We don't check there are no extra headers, just that none are missing
          # or wrong.
          self.assertEqual(lname1, env[CC_HEADER_TO])
          self.assertEqual(conn2.lname, env[CC_HEADER_FROM])
          self.assertEqual("*", env[CC_HEADER_GROUP])
          self.assertEqual(CC_INSTANCE_WILDCARD, env[CC_HEADER_INSTANCE])
          self.assertEqual(CC_COMMAND_SEND, env[CC_HEADER_TYPE])
          self.assertFalse(env[CC_HEADER_WANT_ANSWER])

      def __barrier(self, connections):
          """
          Make sure all previous commands on all supplied connections are
          processed, by sending a ping and waiting for an answer.
          """
          for c in connections:
              c.sendmsg({"type": "ping"})
          for c in connections:
              pong = c.recvmsg(nonblock=False)
              self.assertEqual(({"type": "pong"}, None), pong)

      def test_send_group(self):
          """
          Create several connections. First, try to send a message to an
          (empty) group and see an error is bounced back. Then subscribe the
          others to the group and send it again. Send to a different group and
          see it bounced back. Unsubscribe and see it is bounced again.
          Then the other connections answer (after unsubscribing, strange, but
          legal). See all the answers come.
          Then, check there are no more waiting messages.
          """
          conn_a = self.__get_connection()
          conn_b = []
          for i in range(0, 10):
              conn_b.append(self.__get_connection())
          # Send a message to empty group and get an error answer
          seq = conn_a.group_sendmsg(self.__message(1), "group",
                                     want_answer=True)
          (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq)
          self.assertEqual(self.__no_recpt, msg)
          self.assertEqual(conn_a.lname, env[CC_HEADER_TO])
          # Subscribe the other connections
          for c in conn_b:
              c.group_subscribe("group")
          # The subscribe doesn't wait for answer, so make sure it is
          # all processed before continuing.
          self.__barrier(conn_b)
          # Send a message to the group (this time not empty)
          seq = conn_a.group_sendmsg(self.__message(2), "group",
                                     want_answer=True)
          envs = []
          for c in conn_b:
              (msg, env) = c.group_recvmsg(nonblock=False)
              self.assertEqual(self.__message(2), msg)
              self.assertEqual(conn_a.lname, env[CC_HEADER_FROM])
              # The daemon does not mangle the headers. Is it OK?
              self.assertEqual(CC_TO_WILDCARD, env[CC_HEADER_TO])
              self.assertEqual("group", env[CC_HEADER_GROUP])
              self.assertEqual(CC_INSTANCE_WILDCARD, env[CC_HEADER_INSTANCE])
              self.assertEqual(CC_COMMAND_SEND, env[CC_HEADER_TYPE])
              self.assertTrue(env[CC_HEADER_WANT_ANSWER])
              envs.append(env)
          # Send to non-existing group
          seq_ne = conn_a.group_sendmsg(self.__message(3), "no-group",
                                        want_answer=True)
          (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq_ne)
          self.assertEqual(self.__no_recpt, msg)
          self.assertEqual(conn_a.lname, env[CC_HEADER_TO])
          # Unsubscribe the connections
          for c in conn_b:
              c.group_unsubscribe("group")
          # Synchronize the unsubscriptions
          self.__barrier(conn_b)
          seq_ne = conn_a.group_sendmsg(self.__message(4), "group",
                                        want_answer=True)
          (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq_ne)
          self.assertEqual(self.__no_recpt, msg)
          self.assertEqual(conn_a.lname, env[CC_HEADER_TO])
          # Send answers for the original message that was delivered
          lnames = set()
          for (c, env) in zip(conn_b, envs):
              c.group_reply(env, self.__message("Reply"))
              lnames.add(c.lname)
          # Check all the answers come
          while lnames:
              # While there are still connections we didn't get the answer from
              # (the order is not guaranteed, therefore the juggling with set)
              (msg, env) = conn_a.group_recvmsg(nonblock=False, seq=seq)
              self.assertEqual(self.__message("Reply"), msg)
              lname = env[CC_HEADER_FROM]
              self.assertIn(lname, lnames)
              lnames.remove(lname)
          # The barrier makes the msgq process everything we sent. As the
          # processing is single-threaded in it, any stray message would have
          # arrived before the barrier ends.
          self.__barrier(conn_b)
          self.__barrier([conn_a])
          for c in conn_b:
              self.assertEqual((None, None), c.group_recvmsg())
          self.assertEqual((None, None), conn_a.group_recvmsg())

      def test_conn_disconn(self):
          """
          Keep connecting and disconnecting, checking we can still send
          and receive messages.
          """
          conn = self.__get_connection()
          conn.group_subscribe("group")
          for i in range(0, 50):
              new = self.__get_connection()
              new.group_subscribe("group")
              self.__barrier([conn, new])
              new.group_sendmsg(self.__message(i), "group")
              (msg, env) = conn.group_recvmsg(nonblock=False)
              self.assertEqual(self.__message(i), msg)
              conn.close()
              conn = new

      def test_notifications(self):
          """
          Check that the MsgQ is actually sending notifications about events.
          We create a socket, subscribe the socket itself and see it receives
          its own notification.
          Testing all the places where notifications happen is a task for the
          common unit tests in msgq_test.py.
          The test is here, because there might be some trouble with multiple
          threads in msgq (see the note about locking on the module CC session
          when sending a message from one thread and listening for commands in
          the other) which would be hard to test using pure unit tests.
          Running the whole msgq tests that implicitly.
          """
          conn = self.__get_connection()
          # Activate the session, pretend to be the config manager.
          conn.group_subscribe('ConfigManager')
          # Answer request for logging config
          (msg, env) = conn.group_recvmsg(nonblock=False)
          self.assertEqual({'command': ['get_config',
                                        {'module_name': 'Logging'}]},
                           msg)
          conn.group_reply(env, {'result': [0, {}]})
          # It sends its spec.
          (msg, env) = conn.group_recvmsg(nonblock=False)
          self.assertEqual('module_spec', msg['command'][0])
          conn.group_reply(env, {'result': [0]})
          # It asks for its own config
          (msg, env) = conn.group_recvmsg(nonblock=False)
          self.assertEqual({'command': ['get_config',
                                        {'module_name': 'Msgq'}]},
                           msg)
          conn.group_reply(env, {'result': [0, {}]})
          # Synchronization - make sure the session is running before
          # we continue, so we get the notification. Similar synchronisation
          # as in b10-init, but we don't have full ccsession here, so we
          # do so manually.
          synchronised = False
          attempts = 100
          while not synchronised and attempts > 0:
              time.sleep(0.1)
              seq = conn.group_sendmsg({'command': ['Are you running?']},
                                       'Msgq', want_answer=True)
              (msg, env) = conn.group_recvmsg(nonblock=False, seq=seq)
              # Until msgq's own session is subscribed to the 'Msgq' group,
              # the daemon bounces the message back as undeliverable. Any
              # other answer means the session is up. (The old check
              # compared the reply dict itself with -1 and was always true.)
              synchronised = msg != self.__no_recpt
              attempts -= 1
          self.assertTrue(synchronised)
          # The actual test
          conn.group_subscribe('notifications/cc_members')
          (msg, env) = conn.group_recvmsg(nonblock=False)
          self.assertEqual({'notification': ['subscribed', {
                               'client': conn.lname,
                               'group': 'notifications/cc_members'
                           }]}, msg)

      def test_multiple_invocations(self):
          """
          Check to make sure that an attempt to start a second copy of the MsgQ
          daemon fails.
          """
          self.assertTrue(os.path.exists(SOCKET_PATH))
          retcode = subprocess.call([MSGQ_PATH, '-s', SOCKET_PATH])
          self.assertNotEqual(retcode, 0)
          # Verify that the socket still exists and works. We re-call
          # test_send_direct as a means of testing that the existing
          # daemon is still behaving correctly.
          self.assertTrue(os.path.exists(SOCKET_PATH))
          self.test_send_direct()
  def _run_tests():
      """
      Initialise logging for the tests, then hand control to unittest.
      """
      isc.log.init("msgq-tests")
      isc.log.resetUnitTestRootLogger()
      unittest.main()

  if __name__ == '__main__':
      _run_tests()