msgq_test.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. from msgq import SubscriptionManager, MsgQ
  2. import unittest
  3. import os
  4. import socket
  5. import signal
  6. import sys
  7. import time
  8. import isc.cc
  9. #
  10. # Currently only the subscription part and some sending is implemented...
  11. # I'd have to mock out a socket, which, while not impossible, is not trivial.
  12. #
  13. class TestSubscriptionManager(unittest.TestCase):
  14. def setUp(self):
  15. self.sm = SubscriptionManager()
  16. def test_subscription_add_delete_manager(self):
  17. self.sm.subscribe("a", "*", 'sock1')
  18. self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
  19. def test_subscription_add_delete_other(self):
  20. self.sm.subscribe("a", "*", 'sock1')
  21. self.sm.unsubscribe("a", "*", 'sock2')
  22. self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
  23. def test_subscription_add_several_sockets(self):
  24. socks = [ 's1', 's2', 's3', 's4', 's5' ]
  25. for s in socks:
  26. self.sm.subscribe("a", "*", s)
  27. self.assertEqual(self.sm.find_sub("a", "*"), socks)
  28. def test_unsubscribe(self):
  29. socks = [ 's1', 's2', 's3', 's4', 's5' ]
  30. for s in socks:
  31. self.sm.subscribe("a", "*", s)
  32. self.sm.unsubscribe("a", "*", 's3')
  33. self.assertEqual(self.sm.find_sub("a", "*"), [ 's1', 's2', 's4', 's5' ])
  34. def test_unsubscribe_all(self):
  35. self.sm.subscribe('g1', 'i1', 's1')
  36. self.sm.subscribe('g1', 'i1', 's2')
  37. self.sm.subscribe('g1', 'i2', 's1')
  38. self.sm.subscribe('g1', 'i2', 's2')
  39. self.sm.subscribe('g2', 'i1', 's1')
  40. self.sm.subscribe('g2', 'i1', 's2')
  41. self.sm.subscribe('g2', 'i2', 's1')
  42. self.sm.subscribe('g2', 'i2', 's2')
  43. self.sm.unsubscribe_all('s1')
  44. self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's2' ])
  45. self.assertEqual(self.sm.find_sub("g1", "i2"), [ 's2' ])
  46. self.assertEqual(self.sm.find_sub("g2", "i1"), [ 's2' ])
  47. self.assertEqual(self.sm.find_sub("g2", "i2"), [ 's2' ])
  48. def test_find(self):
  49. self.sm.subscribe('g1', 'i1', 's1')
  50. self.sm.subscribe('g1', '*', 's2')
  51. self.assertEqual(set(self.sm.find("g1", "i1")), set([ 's1', 's2' ]))
  52. def test_find_sub(self):
  53. self.sm.subscribe('g1', 'i1', 's1')
  54. self.sm.subscribe('g1', '*', 's2')
  55. self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's1' ])
  56. def test_open_socket_parameter(self):
  57. self.assertFalse(os.path.exists("./my_socket_file"))
  58. msgq = MsgQ("./my_socket_file");
  59. msgq.setup()
  60. self.assertTrue(os.path.exists("./my_socket_file"))
  61. msgq.shutdown();
  62. self.assertFalse(os.path.exists("./my_socket_file"))
  63. def test_open_socket_environment_variable(self):
  64. self.assertFalse(os.path.exists("my_socket_file"))
  65. os.environ["BIND10_MSGQ_SOCKET_FILE"] = "./my_socket_file"
  66. msgq = MsgQ();
  67. msgq.setup()
  68. self.assertTrue(os.path.exists("./my_socket_file"))
  69. msgq.shutdown();
  70. self.assertFalse(os.path.exists("./my_socket_file"))
  71. def test_open_socket_default(self):
  72. env_var = None
  73. orig_socket_file = None
  74. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  75. env_var = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  76. del os.environ["BIND10_MSGQ_SOCKET_FILE"]
  77. # temporarily replace the class "default" not to be disrupted by
  78. # any running BIND 10 instance.
  79. if "BIND10_TEST_SOCKET_FILE" in os.environ:
  80. MsgQ.SOCKET_FILE = os.environ["BIND10_TEST_SOCKET_FILE"]
  81. socket_file = MsgQ.SOCKET_FILE
  82. self.assertFalse(os.path.exists(socket_file))
  83. msgq = MsgQ();
  84. try:
  85. msgq.setup()
  86. self.assertTrue(os.path.exists(socket_file))
  87. msgq.shutdown();
  88. self.assertFalse(os.path.exists(socket_file))
  89. except socket.error:
  90. # ok, the install path doesn't exist at all,
  91. # so we can't check any further
  92. pass
  93. if env_var is not None:
  94. os.environ["BIND10_MSGQ_SOCKET_FILE"] = env_var
  95. if orig_socket_file is not None:
  96. MsgQ.SOCKET_FILE = orig_socket_file
  97. def test_open_socket_bad(self):
  98. msgq = MsgQ("/does/not/exist")
  99. self.assertRaises(socket.error, msgq.setup)
  100. class SendNonblock(unittest.TestCase):
  101. """
  102. Tests that the whole thing will not get blocked if someone does not read.
  103. """
  104. def terminate_check(self, task, timeout = 10):
  105. """
  106. Runs task in separate process (task is a function) and checks
  107. it terminates sooner than timeout.
  108. """
  109. task_pid = os.fork()
  110. if task_pid == 0:
  111. # Kill the forked process after timeout by SIGALRM
  112. signal.alarm(timeout)
  113. # Run the task
  114. # If an exception happens or we run out of time, we terminate
  115. # with non-zero
  116. task()
  117. # If we got here, then everything worked well and in time
  118. # In that case, we terminate successfully
  119. sys.exit()
  120. else:
  121. (pid, status) = os.waitpid(task_pid, 0)
  122. self.assertEqual(0, status,
  123. "The task did not complete successfully in time")
  124. def infinite_sender(self, sender):
  125. """
  126. Sends data until an exception happens. socket.error is caught,
  127. as it means the socket got closed. Sender is called to actually
  128. send the data.
  129. """
  130. msgq = MsgQ()
  131. # We do only partial setup, so we don't create the listening socket
  132. msgq.setup_poller()
  133. (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  134. msgq.register_socket(write)
  135. # Keep sending while it is not closed by the msgq
  136. try:
  137. while True:
  138. sender(msgq, write)
  139. except socket.error:
  140. pass
  141. def test_infinite_sendmsg(self):
  142. """
  143. Tries sending messages (and not reading them) until it either times
  144. out (in blocking call, wrong) or closes it (correct).
  145. """
  146. data = "data"
  147. for i in range(1, 10):
  148. data += data
  149. self.terminate_check(lambda: self.infinite_sender(
  150. lambda msgq, socket: msgq.sendmsg(socket, {}, {"message" : data})))
  151. def test_infinite_sendprepared(self):
  152. """
  153. Tries sending data (and not reading them) until it either times
  154. out (in blocking call, wrong) or closes it (correct).
  155. """
  156. data = b"data"
  157. for i in range(1, 10):
  158. data += data
  159. self.terminate_check(lambda: self.infinite_sender(
  160. lambda msgq, socket: msgq.send_prepared_msg(socket, data)))
  161. def send_many(self, data):
  162. """
  163. Tries that sending a command many times and getting an answer works.
  164. """
  165. msgq = MsgQ()
  166. # msgq.run needs to compare with the listen_socket, so we provide
  167. # a replacement
  168. class DummySocket:
  169. def fileno():
  170. return -1
  171. msgq.listen_socket = DummySocket
  172. (queue, out) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  173. def run():
  174. length = len(data)
  175. queue_pid = os.fork()
  176. if queue_pid == 0:
  177. signal.alarm(30)
  178. msgq.setup_poller()
  179. msgq.register_socket(queue)
  180. msgq.run()
  181. else:
  182. try:
  183. def killall(signum, frame):
  184. os.kill(queue_pid, signal.SIGTERM)
  185. sys.exit(1)
  186. signal.signal(signal.SIGALRM, killall)
  187. msg = msgq.preparemsg({"type" : "ping"}, data)
  188. now = time.clock()
  189. while time.clock() - now < 0.2:
  190. out.sendall(msg)
  191. # Check the answer
  192. (routing, received) = msgq.read_packet(out.fileno(),
  193. out)
  194. self.assertEqual({"type" : "pong"},
  195. isc.cc.message.from_wire(routing))
  196. self.assertEqual(data, received)
  197. finally:
  198. os.kill(queue_pid, signal.SIGTERM)
  199. self.terminate_check(run)
  200. def test_small_sends(self):
  201. """
  202. Tests sending small data many times.
  203. """
  204. self.send_many(b"data")
  205. def test_large_sends(self):
  206. """
  207. Tests sending large data many times.
  208. """
  209. data = b"data"
  210. for i in range(1, 20):
  211. data = data + data
  212. self.send_many(data)
  213. if __name__ == '__main__':
  214. unittest.main()