msgq_test.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752
  1. # Copyright (C) 2010-2013 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import msgq
  16. from msgq import SubscriptionManager, MsgQ
  17. import unittest
  18. import os
  19. import socket
  20. import signal
  21. import sys
  22. import time
  23. import errno
  24. import threading
  25. import isc.cc
  26. import collections
  27. import isc.log
  28. #
  29. # Currently only the subscription part and some sending is implemented...
  30. # I'd have to mock out a socket, which, while not impossible, is not trivial.
  31. #
  32. class TestSubscriptionManager(unittest.TestCase):
  33. def setUp(self):
  34. self.__cfgmgr_ready_called = 0
  35. self.sm = SubscriptionManager(self.cfgmgr_ready)
  36. def cfgmgr_ready(self):
  37. # Called one more time
  38. self.__cfgmgr_ready_called += 1
  39. def test_subscription_add_delete_manager(self):
  40. self.sm.subscribe("a", "*", 'sock1')
  41. self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
  42. def test_subscription_add_delete_other(self):
  43. self.sm.subscribe("a", "*", 'sock1')
  44. self.sm.unsubscribe("a", "*", 'sock2')
  45. self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
  46. def test_subscription_add_several_sockets(self):
  47. socks = [ 's1', 's2', 's3', 's4', 's5' ]
  48. for s in socks:
  49. self.sm.subscribe("a", "*", s)
  50. self.assertEqual(self.sm.find_sub("a", "*"), socks)
  51. def test_unsubscribe(self):
  52. socks = [ 's1', 's2', 's3', 's4', 's5' ]
  53. for s in socks:
  54. self.sm.subscribe("a", "*", s)
  55. self.sm.unsubscribe("a", "*", 's3')
  56. self.assertEqual(self.sm.find_sub("a", "*"), [ 's1', 's2', 's4', 's5' ])
  57. def test_unsubscribe_all(self):
  58. self.sm.subscribe('g1', 'i1', 's1')
  59. self.sm.subscribe('g1', 'i1', 's2')
  60. self.sm.subscribe('g1', 'i2', 's1')
  61. self.sm.subscribe('g1', 'i2', 's2')
  62. self.sm.subscribe('g2', 'i1', 's1')
  63. self.sm.subscribe('g2', 'i1', 's2')
  64. self.sm.subscribe('g2', 'i2', 's1')
  65. self.sm.subscribe('g2', 'i2', 's2')
  66. self.sm.unsubscribe_all('s1')
  67. self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's2' ])
  68. self.assertEqual(self.sm.find_sub("g1", "i2"), [ 's2' ])
  69. self.assertEqual(self.sm.find_sub("g2", "i1"), [ 's2' ])
  70. self.assertEqual(self.sm.find_sub("g2", "i2"), [ 's2' ])
  71. def test_find(self):
  72. self.sm.subscribe('g1', 'i1', 's1')
  73. self.sm.subscribe('g1', '*', 's2')
  74. self.assertEqual(set(self.sm.find("g1", "i1")), set([ 's1', 's2' ]))
  75. def test_find_sub(self):
  76. self.sm.subscribe('g1', 'i1', 's1')
  77. self.sm.subscribe('g1', '*', 's2')
  78. self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's1' ])
  79. def test_open_socket_parameter(self):
  80. self.assertFalse(os.path.exists("./my_socket_file"))
  81. msgq = MsgQ("./my_socket_file");
  82. msgq.setup()
  83. self.assertTrue(os.path.exists("./my_socket_file"))
  84. msgq.shutdown();
  85. self.assertFalse(os.path.exists("./my_socket_file"))
  86. def test_open_socket_environment_variable(self):
  87. self.assertFalse(os.path.exists("my_socket_file"))
  88. os.environ["BIND10_MSGQ_SOCKET_FILE"] = "./my_socket_file"
  89. msgq = MsgQ();
  90. msgq.setup()
  91. self.assertTrue(os.path.exists("./my_socket_file"))
  92. msgq.shutdown();
  93. self.assertFalse(os.path.exists("./my_socket_file"))
  94. def test_open_socket_default(self):
  95. env_var = None
  96. orig_socket_file = None
  97. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  98. env_var = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  99. del os.environ["BIND10_MSGQ_SOCKET_FILE"]
  100. # temporarily replace the class "default" not to be disrupted by
  101. # any running BIND 10 instance.
  102. if "BIND10_TEST_SOCKET_FILE" in os.environ:
  103. MsgQ.SOCKET_FILE = os.environ["BIND10_TEST_SOCKET_FILE"]
  104. socket_file = MsgQ.SOCKET_FILE
  105. self.assertFalse(os.path.exists(socket_file))
  106. msgq = MsgQ();
  107. try:
  108. msgq.setup()
  109. self.assertTrue(os.path.exists(socket_file))
  110. msgq.shutdown()
  111. self.assertFalse(os.path.exists(socket_file))
  112. except socket.error:
  113. # ok, the install path doesn't exist at all,
  114. # so we can't check any further
  115. pass
  116. if env_var is not None:
  117. os.environ["BIND10_MSGQ_SOCKET_FILE"] = env_var
  118. if orig_socket_file is not None:
  119. MsgQ.SOCKET_FILE = orig_socket_file
  120. def test_open_socket_bad(self):
  121. msgq = MsgQ("/does/not/exist")
  122. self.assertRaises(socket.error, msgq.setup)
  123. # But we can clean up after that.
  124. msgq.shutdown()
  125. def test_subscribe_cfgmgr(self):
  126. """Test special handling of the config manager. Once it subscribes,
  127. the message queue needs to connect and read the config. But not
  128. before and only once.
  129. """
  130. self.assertEqual(0, self.__cfgmgr_ready_called)
  131. # Not called when something else subscribes
  132. self.sm.subscribe('SomethingElse', '*', 's1')
  133. self.assertEqual(0, self.__cfgmgr_ready_called)
  134. # Called whenever the config manager subscribes
  135. self.sm.subscribe('ConfigManager', '*', 's2')
  136. self.assertEqual(1, self.__cfgmgr_ready_called)
  137. # But not called again when it subscribes again (should not
  138. # happen in practice, but we make sure anyway)
  139. self.sm.subscribe('ConfigManager', '*', 's3')
  140. self.assertEqual(1, self.__cfgmgr_ready_called)
  141. class DummySocket:
  142. """
  143. Dummy socket class.
  144. This one does nothing at all, but some calls are used.
  145. It is mainly intended to override the listen socket for msgq, which
  146. we do not need in these tests.
  147. """
  148. def fileno():
  149. return -1
  150. def close():
  151. pass
  152. class BadSocket:
  153. """
  154. Special socket wrapper class. Once given a socket in its constructor,
  155. it completely behaves like that socket, except that its send() call
  156. will only actually send one byte per call, and optionally raise a given
  157. exception at a given time.
  158. """
  159. def __init__(self, real_socket, raise_on_send=0, send_exception=None):
  160. """
  161. Parameters:
  162. real_socket: The actual socket to wrap
  163. raise_on_send: integer. If higher than 0, and send_exception is
  164. not None, send_exception will be raised on the
  165. 'raise_on_send'th call to send().
  166. send_exception: if not None, this exception will be raised
  167. (if raise_on_send is not 0)
  168. """
  169. self.socket = real_socket
  170. self.send_count = 0
  171. self.raise_on_send = raise_on_send
  172. self.send_exception = send_exception
  173. # completely wrap all calls and member access
  174. # (except explicitely overridden ones)
  175. def __getattr__(self, name, *args):
  176. attr = getattr(self.socket, name)
  177. if isinstance(attr, collections.Callable):
  178. def callable_attr(*args):
  179. return attr.__call__(*args)
  180. return callable_attr
  181. else:
  182. return attr
  183. def send(self, data):
  184. self.send_count += 1
  185. if self.send_exception is not None and\
  186. self.send_count == self.raise_on_send:
  187. raise self.send_exception
  188. if len(data) > 0:
  189. return self.socket.send(data[:1])
  190. else:
  191. return 0
  192. class MsgQThread(threading.Thread):
  193. """
  194. Very simple thread class that runs msgq.run() when started,
  195. and stores the exception that msgq.run() raises, if any.
  196. """
  197. def __init__(self, msgq):
  198. threading.Thread.__init__(self)
  199. self.msgq_ = msgq
  200. self.caught_exception = None
  201. self.lock = threading.Lock()
  202. def run(self):
  203. try:
  204. self.msgq_.run()
  205. except Exception as exc:
  206. # Store the exception to make the test fail if necessary
  207. self.caught_exception = exc
  208. def stop(self):
  209. self.msgq_.stop()
  210. class SendNonblock(unittest.TestCase):
  211. """
  212. Tests that the whole thing will not get blocked if someone does not read.
  213. """
  214. def terminate_check(self, task, timeout=30):
  215. """
  216. Runs task in separate process (task is a function) and checks
  217. it terminates sooner than timeout.
  218. """
  219. task_pid = os.fork()
  220. if task_pid == 0:
  221. # Kill the forked process after timeout by SIGALRM
  222. signal.alarm(timeout)
  223. # Run the task
  224. # If an exception happens or we run out of time, we terminate
  225. # with non-zero
  226. task()
  227. # If we got here, then everything worked well and in time
  228. # In that case, we terminate successfully
  229. os._exit(0) # needs exit code
  230. else:
  231. (pid, status) = os.waitpid(task_pid, 0)
  232. self.assertEqual(0, status,
  233. "The task did not complete successfully in time")
  234. def infinite_sender(self, sender):
  235. """
  236. Sends data until an exception happens. socket.error is caught,
  237. as it means the socket got closed. Sender is called to actually
  238. send the data.
  239. """
  240. msgq = MsgQ()
  241. # We do only partial setup, so we don't create the listening socket
  242. msgq.setup_poller()
  243. (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  244. msgq.register_socket(write)
  245. # Keep sending while it is not closed by the msgq
  246. try:
  247. while True:
  248. sender(msgq, write)
  249. except socket.error:
  250. pass
  251. # Explicitly close temporary socket pair as the Python
  252. # interpreter expects it. It may not be 100% exception safe,
  253. # but since this is only for tests we prefer brevity.
  254. read.close()
  255. write.close()
  256. def test_infinite_sendmsg(self):
  257. """
  258. Tries sending messages (and not reading them) until it either times
  259. out (in blocking call, wrong) or closes it (correct).
  260. """
  261. data = "data"
  262. for i in range(1, 10):
  263. data += data
  264. self.terminate_check(lambda: self.infinite_sender(
  265. lambda msgq, socket: msgq.sendmsg(socket, {}, {"message" : data})))
  266. def test_infinite_sendprepared(self):
  267. """
  268. Tries sending data (and not reading them) until it either times
  269. out (in blocking call, wrong) or closes it (correct).
  270. """
  271. data = b"data"
  272. for i in range(1, 10):
  273. data += data
  274. self.terminate_check(lambda: self.infinite_sender(
  275. lambda msgq, socket: msgq.send_prepared_msg(socket, data)))
  276. def send_many(self, data):
  277. """
  278. Tries that sending a command many times and getting an answer works.
  279. """
  280. msgq = MsgQ()
  281. # msgq.run needs to compare with the listen_socket, so we provide
  282. # a replacement
  283. msgq.listen_socket = DummySocket
  284. (queue, out) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  285. def run():
  286. length = len(data)
  287. queue_pid = os.fork()
  288. if queue_pid == 0:
  289. signal.alarm(120)
  290. msgq.setup_poller()
  291. msgq.setup_signalsock()
  292. msgq.register_socket(queue)
  293. msgq.run()
  294. msgq.cleanup_signalsock()
  295. else:
  296. try:
  297. def killall(signum, frame):
  298. os.kill(queue_pid, signal.SIGTERM)
  299. os._exit(1)
  300. signal.signal(signal.SIGALRM, killall)
  301. msg = msgq.preparemsg({"type" : "ping"}, data)
  302. now = time.clock()
  303. while time.clock() - now < 0.2:
  304. out.sendall(msg)
  305. # Check the answer
  306. (routing, received) = msgq.read_packet(out.fileno(),
  307. out)
  308. self.assertEqual({"type" : "pong"},
  309. isc.cc.message.from_wire(routing))
  310. self.assertEqual(data, received)
  311. finally:
  312. os.kill(queue_pid, signal.SIGTERM)
  313. self.terminate_check(run)
  314. # Explicitly close temporary socket pair as the Python
  315. # interpreter expects it. It may not be 100% exception safe,
  316. # but since this is only for tests we prefer brevity.
  317. queue.close()
  318. out.close()
  319. def test_small_sends(self):
  320. """
  321. Tests sending small data many times.
  322. """
  323. self.send_many(b"data")
  324. def test_large_sends(self):
  325. """
  326. Tests sending large data many times.
  327. """
  328. data = b"data"
  329. for i in range(1, 20):
  330. data = data + data
  331. self.send_many(data)
  332. def do_send(self, write, read, control_write, control_read,
  333. expect_arrive=True, expect_send_exception=None):
  334. """
  335. Makes a msgq object that is talking to itself,
  336. run it in a separate thread so we can use and
  337. test run().
  338. It is given two sets of connected sockets; write/read, and
  339. control_write/control_read. The former may be throwing errors
  340. and mangle data to test msgq. The second is mainly used to
  341. send msgq the stop command.
  342. (Note that the terms 'read' and 'write' are from the msgq
  343. point of view, so the test itself writes to 'control_read')
  344. Parameters:
  345. write: a socket that is used to send the data to
  346. read: a socket that is used to read the data from
  347. control_write: a second socket for communication with msgq
  348. control_read: a second socket for communication with msgq
  349. expect_arrive: if True, the read socket is read from, and the data
  350. that is read is expected to be the same as the data
  351. that has been sent to the write socket.
  352. expect_send_exception: if not None, this is the exception that is
  353. expected to be raised by msgq
  354. """
  355. # Some message and envelope data to send and check
  356. env = b'{"env": "foo"}'
  357. msg = b'{"msg": "bar"}'
  358. msgq = MsgQ()
  359. # Don't need a listen_socket
  360. msgq.listen_socket = DummySocket
  361. msgq.setup_poller()
  362. msgq.setup_signalsock()
  363. msgq.register_socket(write)
  364. msgq.register_socket(control_write)
  365. # Queue the message for sending
  366. msgq.sendmsg(write, env, msg)
  367. # Run it in a thread
  368. msgq_thread = MsgQThread(msgq)
  369. # If we're done, just kill it
  370. msgq_thread.start()
  371. if expect_arrive:
  372. (recv_env, recv_msg) = msgq.read_packet(read.fileno(),
  373. read)
  374. self.assertEqual(env, recv_env)
  375. self.assertEqual(msg, recv_msg)
  376. # Tell msgq to stop
  377. msg = msgq.preparemsg({"type" : "stop"})
  378. control_read.sendall(msg)
  379. # Wait for thread to stop if it hasn't already.
  380. # Put in a (long) timeout; the thread *should* stop, but if it
  381. # does not, we don't want the test to hang forever
  382. msgq_thread.join(60)
  383. # Fail the test if it didn't stop
  384. self.assertFalse(msgq_thread.isAlive(), "Thread did not stop")
  385. # Clean up some internals of msgq (usually called as part of
  386. # shutdown, but we skip that one here)
  387. msgq.cleanup_signalsock()
  388. # Check the exception from the thread, if any
  389. # First, if we didn't expect it; reraise it (to make test fail and
  390. # show the stacktrace for debugging)
  391. if expect_send_exception is None:
  392. if msgq_thread.caught_exception is not None:
  393. raise msgq_thread.caught_exception
  394. else:
  395. # If we *did* expect it, fail it there was none
  396. self.assertIsNotNone(msgq_thread.caught_exception)
  397. def do_send_with_send_error(self, raise_on_send, send_exception,
  398. expect_answer=True,
  399. expect_send_exception=None):
  400. """
  401. Sets up two connected sockets, wraps the sender socket into a BadSocket
  402. class, then performs a do_send() test.
  403. Parameters:
  404. raise_on_send: the byte at which send_exception should be raised
  405. (see BadSocket)
  406. send_exception: the exception to raise (see BadSocket)
  407. expect_answer: whether the send is expected to complete (and hence
  408. the read socket should get the message)
  409. expect_send_exception: the exception msgq is expected to raise when
  410. send_exception is raised by BadSocket.
  411. """
  412. (write, read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  413. (control_write, control_read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  414. badwrite = BadSocket(write, raise_on_send, send_exception)
  415. self.do_send(badwrite, read, control_write, control_read, expect_answer, expect_send_exception)
  416. write.close()
  417. read.close()
  418. control_write.close()
  419. control_read.close()
  420. def test_send_raise_recoverable(self):
  421. """
  422. Test whether msgq survices a recoverable socket errors when sending.
  423. Two tests are done: one where the error is raised on the 3rd octet,
  424. and one on the 23rd.
  425. """
  426. for err in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]:
  427. sockerr = socket.error(err, 'Socket error')
  428. self.do_send_with_send_error(3, sockerr)
  429. self.do_send_with_send_error(23, sockerr)
  430. def test_send_raise_nonrecoverable(self):
  431. """
  432. Test whether msgq survives socket errors that are nonrecoverable
  433. (for said socket that is, i.e. EPIPE etc).
  434. Two tests are done: one where the error is raised on the 3rd octet,
  435. and one on the 23rd.
  436. """
  437. for err in [ errno.EPIPE, errno.ENOBUFS, errno.ECONNRESET ]:
  438. sockerr = socket.error(err, 'Socket error')
  439. self.do_send_with_send_error(3, sockerr, False)
  440. self.do_send_with_send_error(23, sockerr, False)
  441. def otest_send_raise_crash(self):
  442. """
  443. Test whether msgq does NOT survive on a general exception.
  444. Note, perhaps it should; but we'd have to first discuss and decide
  445. how it should recover (i.e. drop the socket and consider the client
  446. dead?
  447. It may be a coding problem in msgq itself, and we certainly don't
  448. want to ignore those.
  449. """
  450. sockerr = Exception("just some general exception")
  451. self.do_send_with_send_error(3, sockerr, False, sockerr)
  452. self.do_send_with_send_error(23, sockerr, False, sockerr)
  453. class ThreadTests(unittest.TestCase):
  454. """Test various things around thread synchronization."""
  455. def setUp(self):
  456. self.__msgq = MsgQ()
  457. self.__abort_wait = False
  458. self.__result = None
  459. self.__notify_thread = threading.Thread(target=self.__notify)
  460. self.__wait_thread = threading.Thread(target=self.__wait)
  461. # Make sure the threads are killed if left behind by the test.
  462. self.__notify_thread.daemon = True
  463. self.__wait_thread.daemon = True
  464. def __notify(self):
  465. """Call the cfgmgr_ready."""
  466. if self.__abort_wait:
  467. self.__msgq.cfgmgr_ready(False)
  468. else:
  469. self.__msgq.cfgmgr_ready()
  470. def __wait(self):
  471. """Wait for config manager and store the result."""
  472. self.__result = self.__msgq.wait_cfgmgr()
  473. def test_wait_cfgmgr(self):
  474. """One thread signals the config manager subscribed, the other
  475. waits for it. We then check it terminated correctly.
  476. """
  477. self.__notify_thread.start()
  478. self.__wait_thread.start()
  479. # Timeout to ensure the test terminates even on failure
  480. self.__wait_thread.join(60)
  481. self.assertTrue(self.__result)
  482. def test_wait_cfgmgr_2(self):
  483. """Same as test_wait_cfgmgr, but starting the threads in reverse order
  484. (the result should be the same).
  485. """
  486. self.__wait_thread.start()
  487. self.__notify_thread.start()
  488. # Timeout to ensure the test terminates even on failure
  489. self.__wait_thread.join(60)
  490. self.assertTrue(self.__result)
  491. def test_wait_abort(self):
  492. """Similar to test_wait_cfgmgr, but the config manager is never
  493. subscribed and it is aborted.
  494. """
  495. self.__abort_wait = True
  496. self.__wait_thread.start()
  497. self.__notify_thread.start()
  498. # Timeout to ensure the test terminates even on failure
  499. self.__wait_thread.join(60)
  500. self.assertIsNotNone(self.__result)
  501. self.assertFalse(self.__result)
  502. def __check_ready_and_abort(self):
  503. """Check that when we first say the config manager is ready and then
  504. try to abort, it uses the first result.
  505. """
  506. self.__msgq.cfgmgr_ready()
  507. self.__msgq.cfgmgr_ready(False)
  508. self.__result = self.__msgq.wait_cfgmgr()
  509. def test_ready_and_abort(self):
  510. """Perform the __check_ready_and_abort test, but in a separate thread,
  511. so in case something goes wrong with the synchronisation and it
  512. deadlocks, the test will terminate anyway.
  513. """
  514. test_thread = threading.Thread(target=self.__check_ready_and_abort)
  515. test_thread.daemon = True
  516. test_thread.start()
  517. test_thread.join(60)
  518. self.assertTrue(self.__result)
  519. class SocketTests(unittest.TestCase):
  520. '''Test cases for micro behaviors related to socket operations.
  521. Some cases are covered as part of other tests, but in this fixture
  522. we check more details of specific method related to socket operation,
  523. with the help of mock classes to avoid expensive overhead.
  524. '''
  525. class MockSocket():
  526. '''A mock socket used instead of standard socket objects.'''
  527. def __init__(self):
  528. self.ex_on_send = None # raised from send() if not None
  529. self.recv_result = b'test' # dummy data or exception
  530. self.blockings = [] # history of setblocking() params
  531. def setblocking(self, on):
  532. self.blockings.append(on)
  533. def send(self, data):
  534. if self.ex_on_send is not None:
  535. raise self.ex_on_send
  536. return 10 # arbitrary choice
  537. def recv(self, len):
  538. if isinstance(self.recv_result, Exception):
  539. raise self.recv_result
  540. ret = self.recv_result
  541. self.recv_result = b'' # if called again, return empty data
  542. return ret
  543. def fileno(self):
  544. return 42 # arbitrary choice
  545. class LoggerWrapper():
  546. '''A simple wrapper of logger to inspect log messages.'''
  547. def __init__(self, logger):
  548. self.error_called = 0
  549. self.warn_called = 0
  550. self.debug_called = 0
  551. self.orig_logger = logger
  552. def error(self, *args):
  553. self.error_called += 1
  554. self.orig_logger.error(*args)
  555. def warn(self, *args):
  556. self.warn_called += 1
  557. self.orig_logger.warn(*args)
  558. def debug(self, *args):
  559. self.debug_called += 1
  560. self.orig_logger.debug(*args)
  561. def mock_kill_socket(self, fileno, sock):
  562. '''A replacement of MsgQ.kill_socket method for inspection.'''
  563. self.__killed_socket = (fileno, sock)
  564. if fileno in self.__msgq.sockets:
  565. del self.__msgq.sockets[fileno]
  566. def setUp(self):
  567. self.__msgq = MsgQ()
  568. self.__msgq.kill_socket = self.mock_kill_socket
  569. self.__sock = self.MockSocket()
  570. self.__data = b'dummy'
  571. self.__msgq.sockets[42] = self.__sock
  572. self.__msgq.sendbuffs[42] = (None, b'testdata')
  573. self.__sock_error = socket.error()
  574. self.__killed_socket = None
  575. self.__logger = self.LoggerWrapper(msgq.logger)
  576. msgq.logger = self.__logger
  577. def tearDown(self):
  578. msgq.logger = self.__logger.orig_logger
  579. def test_send_data(self):
  580. # Successful case: _send_data() returns the hardcoded value, and
  581. # setblocking() is called twice with the expected parameters
  582. self.assertEqual(10, self.__msgq._send_data(self.__sock, self.__data))
  583. self.assertEqual([0, 1], self.__sock.blockings)
  584. self.assertIsNone(self.__killed_socket)
  585. def test_send_data_interrupt(self):
  586. '''send() is interruptted. send_data() returns 0, sock isn't killed.'''
  587. expected_blockings = []
  588. for eno in [errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR]:
  589. self.__sock_error.errno = eno
  590. self.__sock.ex_on_send = self.__sock_error
  591. self.assertEqual(0, self.__msgq._send_data(self.__sock,
  592. self.__data))
  593. expected_blockings.extend([0, 1])
  594. self.assertEqual(expected_blockings, self.__sock.blockings)
  595. self.assertIsNone(self.__killed_socket)
  596. def test_send_data_error(self):
  597. '''Unexpected error happens on send(). The socket is killed.
  598. If the error is EPIPE, it's logged at the warn level; otherwise
  599. an error message is logged.
  600. '''
  601. expected_blockings = []
  602. expected_errors = 0
  603. expected_warns = 0
  604. for eno in [errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS]:
  605. self.__sock_error.errno = eno
  606. self.__sock.ex_on_send = self.__sock_error
  607. self.__killed_socket = None # clear any previuos value
  608. self.assertEqual(None, self.__msgq._send_data(self.__sock,
  609. self.__data))
  610. self.assertEqual((42, self.__sock), self.__killed_socket)
  611. expected_blockings.extend([0, 1])
  612. self.assertEqual(expected_blockings, self.__sock.blockings)
  613. if eno == errno.EPIPE:
  614. expected_warns += 1
  615. else:
  616. expected_errors += 1
  617. self.assertEqual(expected_errors, self.__logger.error_called)
  618. self.assertEqual(expected_warns, self.__logger.warn_called)
  619. def test_process_fd_read_after_bad_write(self):
  620. '''Check the specific case of write fail followed by read attempt.
  621. The write failure results in kill_socket, then read shouldn't tried.
  622. '''
  623. self.__sock_error.errno = errno.EPIPE
  624. self.__sock.ex_on_send = self.__sock_error
  625. self.__msgq.process_socket = None # if called, trigger an exception
  626. self.__msgq._process_fd(42, True, True, False) # shouldn't crash
  627. # check the socket is deleted from the fileno=>sock dictionary
  628. self.assertEqual({}, self.__msgq.sockets)
  629. def test_process_fd_close_after_bad_write(self):
  630. '''Similar to the previous, but for checking dup'ed kill attempt'''
  631. self.__sock_error.errno = errno.EPIPE
  632. self.__sock.ex_on_send = self.__sock_error
  633. self.__msgq._process_fd(42, True, False, True) # shouldn't crash
  634. self.assertEqual({}, self.__msgq.sockets)
  635. def test_process_fd_writer_after_close(self):
  636. '''Emulate a "writable" socket has been already closed and killed.'''
  637. # This just shouldn't crash
  638. self.__msgq._process_fd(4200, True, False, False)
  639. def test_process_packet(self):
  640. '''Check some failure cases in handling an incoming message.'''
  641. expected_errors = 0
  642. expected_debugs = 0
  643. # if socket.recv() fails due to socket.error, it will be logged
  644. # as error and the socket will be killed regardless of errno.
  645. for eno in [errno.ENOBUFS, errno.ECONNRESET]:
  646. self.__sock_error.errno = eno
  647. self.__sock.recv_result = self.__sock_error
  648. self.__killed_socket = None # clear any previuos value
  649. self.__msgq.process_packet(42, self.__sock)
  650. self.assertEqual((42, self.__sock), self.__killed_socket)
  651. expected_errors += 1
  652. self.assertEqual(expected_errors, self.__logger.error_called)
  653. self.assertEqual(expected_debugs, self.__logger.debug_called)
  654. # if socket.recv() returns empty data, the result depends on whether
  655. # there's any preceding data; in the second case below, at least
  656. # 6 bytes of data will be expected, and the second call to our faked
  657. # recv() returns empty data. In that case it will be logged as error.
  658. for recv_data in [b'', b'short']:
  659. self.__sock.recv_result = recv_data
  660. self.__killed_socket = None
  661. self.__msgq.process_packet(42, self.__sock)
  662. self.assertEqual((42, self.__sock), self.__killed_socket)
  663. if len(recv_data) == 0:
  664. expected_debugs += 1
  665. else:
  666. expected_errors += 1
  667. self.assertEqual(expected_errors, self.__logger.error_called)
  668. self.assertEqual(expected_debugs, self.__logger.debug_called)
  669. if __name__ == '__main__':
  670. isc.log.resetUnitTestRootLogger()
  671. unittest.main()