msgq_test.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
  1. from msgq import SubscriptionManager, MsgQ
  2. import unittest
  3. import os
  4. import socket
  5. import signal
  6. import sys
  7. import time
  8. import errno
  9. import threading
  10. import isc.cc
  11. #
  12. # Currently only the subscription part and some sending is implemented...
  13. # I'd have to mock out a socket, which, while not impossible, is not trivial.
  14. #
  15. class TestSubscriptionManager(unittest.TestCase):
  16. def setUp(self):
  17. self.sm = SubscriptionManager()
  18. def test_subscription_add_delete_manager(self):
  19. self.sm.subscribe("a", "*", 'sock1')
  20. self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
  21. def test_subscription_add_delete_other(self):
  22. self.sm.subscribe("a", "*", 'sock1')
  23. self.sm.unsubscribe("a", "*", 'sock2')
  24. self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
  25. def test_subscription_add_several_sockets(self):
  26. socks = [ 's1', 's2', 's3', 's4', 's5' ]
  27. for s in socks:
  28. self.sm.subscribe("a", "*", s)
  29. self.assertEqual(self.sm.find_sub("a", "*"), socks)
  30. def test_unsubscribe(self):
  31. socks = [ 's1', 's2', 's3', 's4', 's5' ]
  32. for s in socks:
  33. self.sm.subscribe("a", "*", s)
  34. self.sm.unsubscribe("a", "*", 's3')
  35. self.assertEqual(self.sm.find_sub("a", "*"), [ 's1', 's2', 's4', 's5' ])
  36. def test_unsubscribe_all(self):
  37. self.sm.subscribe('g1', 'i1', 's1')
  38. self.sm.subscribe('g1', 'i1', 's2')
  39. self.sm.subscribe('g1', 'i2', 's1')
  40. self.sm.subscribe('g1', 'i2', 's2')
  41. self.sm.subscribe('g2', 'i1', 's1')
  42. self.sm.subscribe('g2', 'i1', 's2')
  43. self.sm.subscribe('g2', 'i2', 's1')
  44. self.sm.subscribe('g2', 'i2', 's2')
  45. self.sm.unsubscribe_all('s1')
  46. self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's2' ])
  47. self.assertEqual(self.sm.find_sub("g1", "i2"), [ 's2' ])
  48. self.assertEqual(self.sm.find_sub("g2", "i1"), [ 's2' ])
  49. self.assertEqual(self.sm.find_sub("g2", "i2"), [ 's2' ])
  50. def test_find(self):
  51. self.sm.subscribe('g1', 'i1', 's1')
  52. self.sm.subscribe('g1', '*', 's2')
  53. self.assertEqual(set(self.sm.find("g1", "i1")), set([ 's1', 's2' ]))
  54. def test_find_sub(self):
  55. self.sm.subscribe('g1', 'i1', 's1')
  56. self.sm.subscribe('g1', '*', 's2')
  57. self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's1' ])
  58. def test_open_socket_parameter(self):
  59. self.assertFalse(os.path.exists("./my_socket_file"))
  60. msgq = MsgQ("./my_socket_file");
  61. msgq.setup()
  62. self.assertTrue(os.path.exists("./my_socket_file"))
  63. msgq.shutdown();
  64. self.assertFalse(os.path.exists("./my_socket_file"))
  65. def test_open_socket_environment_variable(self):
  66. self.assertFalse(os.path.exists("my_socket_file"))
  67. os.environ["BIND10_MSGQ_SOCKET_FILE"] = "./my_socket_file"
  68. msgq = MsgQ();
  69. msgq.setup()
  70. self.assertTrue(os.path.exists("./my_socket_file"))
  71. msgq.shutdown();
  72. self.assertFalse(os.path.exists("./my_socket_file"))
  73. def test_open_socket_default(self):
  74. env_var = None
  75. orig_socket_file = None
  76. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  77. env_var = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  78. del os.environ["BIND10_MSGQ_SOCKET_FILE"]
  79. # temporarily replace the class "default" not to be disrupted by
  80. # any running BIND 10 instance.
  81. if "BIND10_TEST_SOCKET_FILE" in os.environ:
  82. MsgQ.SOCKET_FILE = os.environ["BIND10_TEST_SOCKET_FILE"]
  83. socket_file = MsgQ.SOCKET_FILE
  84. self.assertFalse(os.path.exists(socket_file))
  85. msgq = MsgQ();
  86. try:
  87. msgq.setup()
  88. self.assertTrue(os.path.exists(socket_file))
  89. msgq.shutdown();
  90. self.assertFalse(os.path.exists(socket_file))
  91. except socket.error:
  92. # ok, the install path doesn't exist at all,
  93. # so we can't check any further
  94. pass
  95. if env_var is not None:
  96. os.environ["BIND10_MSGQ_SOCKET_FILE"] = env_var
  97. if orig_socket_file is not None:
  98. MsgQ.SOCKET_FILE = orig_socket_file
  99. def test_open_socket_bad(self):
  100. msgq = MsgQ("/does/not/exist")
  101. self.assertRaises(socket.error, msgq.setup)
  102. class DummySocket:
  103. """
  104. Dummy socket class.
  105. This one does nothing at all, but some calls are used.
  106. It is mainly intended to override the listen socket for msgq, which
  107. we do not need in these tests.
  108. """
  109. def fileno():
  110. return -1
  111. def close():
  112. pass
  113. class BadSocket:
  114. """
  115. Special socket wrapper class. Once given a socket in its constructor,
  116. it completely behaves like that socket, except that its send() call
  117. will only actually send one byte per call, and optionally raise a given
  118. exception at a given time.
  119. """
  120. def __init__(self, real_socket, raise_on_send=0, send_exception=None):
  121. """
  122. Parameters:
  123. real_socket: The actual socket to wrap
  124. raise_on_send: integer. If send_exception is not None, it will be
  125. raised on this byte (i.e. 1 = on the first
  126. call to send(), 1 = on the 4th call to send)
  127. Note: if 0, send_exception will not be raised.
  128. send_exception: if not None, this exception will be raised
  129. (if raise_on_send is not 0)
  130. """
  131. self.socket = real_socket
  132. self.send_count = 0
  133. self.raise_on_send = raise_on_send
  134. self.send_exception = send_exception
  135. # completely wrap all calls and member access
  136. # (except explicitely overridden ones)
  137. def __getattr__(self, name, *args):
  138. attr = getattr(self.socket, name)
  139. if callable(attr):
  140. def callable_attr(*args):
  141. return attr.__call__(*args)
  142. return callable_attr
  143. else:
  144. return attr
  145. def send(self, data):
  146. self.send_count += 1
  147. if self.send_exception is not None and\
  148. self.send_count == self.raise_on_send:
  149. raise self.send_exception
  150. if len(data) > 0:
  151. return self.socket.send(data[:1])
  152. else:
  153. return 0
  154. class MsgQThread(threading.Thread):
  155. """
  156. Very simple thread class that runs msgq.run() when started,
  157. and stores the exception that msgq.run() raises, if any.
  158. """
  159. def __init__(self, msgq):
  160. threading.Thread.__init__(self)
  161. self.msgq_ = msgq
  162. self.stop = False
  163. self.caught_exception = None
  164. def run(self):
  165. try:
  166. while not self.stop:
  167. self.msgq_.run()
  168. except Exception as exc:
  169. self.caught_exception = exc
  170. class SendNonblock(unittest.TestCase):
  171. """
  172. Tests that the whole thing will not get blocked if someone does not read.
  173. """
  174. def terminate_check(self, task, timeout=30):
  175. """
  176. Runs task in separate process (task is a function) and checks
  177. it terminates sooner than timeout.
  178. """
  179. task_pid = os.fork()
  180. if task_pid == 0:
  181. # Kill the forked process after timeout by SIGALRM
  182. signal.alarm(timeout)
  183. # Run the task
  184. # If an exception happens or we run out of time, we terminate
  185. # with non-zero
  186. task()
  187. # If we got here, then everything worked well and in time
  188. # In that case, we terminate successfully
  189. os._exit(0) # needs exit code
  190. else:
  191. (pid, status) = os.waitpid(task_pid, 0)
  192. self.assertEqual(0, status,
  193. "The task did not complete successfully in time")
  194. def infinite_sender(self, sender):
  195. """
  196. Sends data until an exception happens. socket.error is caught,
  197. as it means the socket got closed. Sender is called to actually
  198. send the data.
  199. """
  200. msgq = MsgQ()
  201. # We do only partial setup, so we don't create the listening socket
  202. msgq.setup_poller()
  203. (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  204. msgq.register_socket(write)
  205. # Keep sending while it is not closed by the msgq
  206. try:
  207. while True:
  208. sender(msgq, write)
  209. except socket.error:
  210. pass
  211. # Explicitly close temporary socket pair as the Python
  212. # interpreter expects it. It may not be 100% exception safe,
  213. # but since this is only for tests we prefer brevity.
  214. read.close()
  215. write.close()
  216. def test_infinite_sendmsg(self):
  217. """
  218. Tries sending messages (and not reading them) until it either times
  219. out (in blocking call, wrong) or closes it (correct).
  220. """
  221. data = "data"
  222. for i in range(1, 10):
  223. data += data
  224. self.terminate_check(lambda: self.infinite_sender(
  225. lambda msgq, socket: msgq.sendmsg(socket, {}, {"message" : data})))
  226. def test_infinite_sendprepared(self):
  227. """
  228. Tries sending data (and not reading them) until it either times
  229. out (in blocking call, wrong) or closes it (correct).
  230. """
  231. data = b"data"
  232. for i in range(1, 10):
  233. data += data
  234. self.terminate_check(lambda: self.infinite_sender(
  235. lambda msgq, socket: msgq.send_prepared_msg(socket, data)))
  236. def send_many(self, data):
  237. """
  238. Tries that sending a command many times and getting an answer works.
  239. """
  240. msgq = MsgQ()
  241. # msgq.run needs to compare with the listen_socket, so we provide
  242. # a replacement
  243. msgq.listen_socket = DummySocket
  244. (queue, out) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  245. def run():
  246. length = len(data)
  247. queue_pid = os.fork()
  248. if queue_pid == 0:
  249. signal.alarm(120)
  250. msgq.setup_poller()
  251. msgq.register_socket(queue)
  252. msgq.run()
  253. else:
  254. try:
  255. def killall(signum, frame):
  256. os.kill(queue_pid, signal.SIGTERM)
  257. os._exit(1)
  258. signal.signal(signal.SIGALRM, killall)
  259. msg = msgq.preparemsg({"type" : "ping"}, data)
  260. now = time.clock()
  261. while time.clock() - now < 0.2:
  262. out.sendall(msg)
  263. # Check the answer
  264. (routing, received) = msgq.read_packet(out.fileno(),
  265. out)
  266. self.assertEqual({"type" : "pong"},
  267. isc.cc.message.from_wire(routing))
  268. self.assertEqual(data, received)
  269. finally:
  270. os.kill(queue_pid, signal.SIGTERM)
  271. self.terminate_check(run)
  272. # Explicitly close temporary socket pair as the Python
  273. # interpreter expects it. It may not be 100% exception safe,
  274. # but since this is only for tests we prefer brevity.
  275. queue.close()
  276. out.close()
  277. def test_small_sends(self):
  278. """
  279. Tests sending small data many times.
  280. """
  281. self.send_many(b"data")
  282. def test_large_sends(self):
  283. """
  284. Tests sending large data many times.
  285. """
  286. data = b"data"
  287. for i in range(1, 20):
  288. data = data + data
  289. self.send_many(data)
  290. def do_send(self, write, read, expect_arrive=True,
  291. expect_send_exception=None):
  292. """
  293. Makes a msgq object that is talking to itself,
  294. run it in a separate thread so we can use and
  295. test run().
  296. Parameters:
  297. write: a socket that is used to send the data to
  298. read: a socket that is used to read the data from
  299. expect_arrive: if True, the read socket is read from, and the data
  300. that is read is expected to be the same as the data
  301. that has been sent to the write socket.
  302. expect_send_exception: if not None, this is the exception that is
  303. expected to be raised by msgq
  304. """
  305. # Some message and envelope data to send and check
  306. env = b'{"env": "foo"}'
  307. msg = b'{"msg": "bar"}'
  308. msgq = MsgQ()
  309. # Don't need a listen_socket
  310. msgq.listen_socket = DummySocket
  311. msgq.setup_poller()
  312. msgq.register_socket(write)
  313. # Queue the message for sending
  314. msgq.sendmsg(write, env, msg)
  315. # Run it in a thread
  316. msgq_thread = MsgQThread(msgq)
  317. # If we're done, just kill it
  318. msgq_thread.daemon = True
  319. msgq_thread.start()
  320. if expect_arrive:
  321. (recv_env, recv_msg) = msgq.read_packet(read.fileno(),
  322. read)
  323. self.assertEqual(env, recv_env)
  324. self.assertEqual(msg, recv_msg)
  325. # Give it a chance to stop, if it doesn't, no problem, it'll
  326. # die when the program does
  327. msgq_thread.join(0.2)
  328. # Check the exception from the thread, if any
  329. self.assertEqual(expect_send_exception, msgq_thread.caught_exception)
  330. def do_send_with_send_error(self, raise_on_send, send_exception,
  331. expect_answer=True,
  332. expect_send_exception=None):
  333. """
  334. Sets up two connected sockets, wraps the sender socket into a BadSocket
  335. class, then performs a do_send() test.
  336. Parameters:
  337. raise_on_send: the byte at which send_exception should be raised
  338. (see BadSocket)
  339. send_exception: the exception to raise (see BadSocket)
  340. expect_answer: whether the send is expected to complete (and hence
  341. the read socket should get the message)
  342. expect_send_exception: the exception msgq is expected to raise when
  343. send_exception is raised by BadSocket.
  344. """
  345. (write, read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  346. # prevent the test from hanging if something goes wrong
  347. read.settimeout(0.2)
  348. write.settimeout(0.2)
  349. badwrite = BadSocket(write, raise_on_send, send_exception)
  350. self.do_send(badwrite, read, expect_answer, expect_send_exception)
  351. write.close()
  352. read.close()
  353. def test_send_raise_eagain(self):
  354. """
  355. Test whether msgq survices an EAGAIN socket error when sending.
  356. Two tests are done: one where EAGAIN is raised on the 3rd octet,
  357. and one on the 23rd.
  358. """
  359. sockerr = socket.error
  360. sockerr.errno = errno.EAGAIN
  361. self.do_send_with_send_error(3, sockerr)
  362. self.do_send_with_send_error(23, sockerr)
  363. def test_send_raise_ewouldblock(self):
  364. """
  365. Test whether msgq survices an EWOULDBLOCK socket error when sending.
  366. Two tests are done: one where EWOULDBLOCK is raised on the 3rd octet,
  367. and one on the 23rd.
  368. """
  369. sockerr = socket.error
  370. sockerr.errno = errno.EWOULDBLOCK
  371. self.do_send_with_send_error(3, sockerr)
  372. self.do_send_with_send_error(23, sockerr)
  373. def test_send_raise_pipe(self):
  374. """
  375. Test whether msgq survices an EPIPE socket error when sending.
  376. Two tests are done: one where EPIPE is raised on the 3rd octet,
  377. and one on the 23rd.
  378. """
  379. sockerr = socket.error
  380. sockerr.errno = errno.EPIPE
  381. self.do_send_with_send_error(3, sockerr, False)
  382. self.do_send_with_send_error(23, sockerr, False)
  383. def test_send_raise_exception(self):
  384. """
  385. Test whether msgq does NOT survive on a general exception.
  386. Note, perhaps it should; but we'd have to first discuss and decide
  387. how it should recover (i.e. drop the socket and consider the client
  388. dead?
  389. It may be a coding problem in msgq itself, and we certainly don't
  390. want to ignore those.
  391. """
  392. sockerr = Exception("just some general exception")
  393. self.do_send_with_send_error(3, sockerr, False, sockerr)
  394. self.do_send_with_send_error(23, sockerr, False, sockerr)
  395. if __name__ == '__main__':
  396. unittest.main()