msgq_test.py 38 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957
  1. # Copyright (C) 2010-2013 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import msgq
  16. from msgq import SubscriptionManager, MsgQ
  17. import unittest
  18. import os
  19. import socket
  20. import signal
  21. import sys
  22. import time
  23. import errno
  24. import threading
  25. import isc.cc
  26. import collections
  27. import isc.log
  28. import struct
  29. import json
  30. #
  31. # Currently only the subscription part and some sending is implemented...
  32. # I'd have to mock out a socket, which, while not impossible, is not trivial.
  33. #
  34. class TestSubscriptionManager(unittest.TestCase):
  35. def setUp(self):
  36. self.__cfgmgr_ready_called = 0
  37. self.sm = SubscriptionManager(self.cfgmgr_ready)
  38. def cfgmgr_ready(self):
  39. # Called one more time
  40. self.__cfgmgr_ready_called += 1
  41. def test_subscription_add_delete_manager(self):
  42. self.sm.subscribe("a", "*", 'sock1')
  43. self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
  44. def test_subscription_add_delete_other(self):
  45. self.sm.subscribe("a", "*", 'sock1')
  46. self.sm.unsubscribe("a", "*", 'sock2')
  47. self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
  48. def test_subscription_add_several_sockets(self):
  49. socks = [ 's1', 's2', 's3', 's4', 's5' ]
  50. for s in socks:
  51. self.sm.subscribe("a", "*", s)
  52. self.assertEqual(self.sm.find_sub("a", "*"), socks)
  53. def test_unsubscribe(self):
  54. socks = [ 's1', 's2', 's3', 's4', 's5' ]
  55. for s in socks:
  56. self.sm.subscribe("a", "*", s)
  57. self.sm.unsubscribe("a", "*", 's3')
  58. self.assertEqual(self.sm.find_sub("a", "*"), [ 's1', 's2', 's4', 's5' ])
  59. def test_unsubscribe_all(self):
  60. self.sm.subscribe('g1', 'i1', 's1')
  61. self.sm.subscribe('g1', 'i1', 's2')
  62. self.sm.subscribe('g1', 'i2', 's1')
  63. self.sm.subscribe('g1', 'i2', 's2')
  64. self.sm.subscribe('g2', 'i1', 's1')
  65. self.sm.subscribe('g2', 'i1', 's2')
  66. self.sm.subscribe('g2', 'i2', 's1')
  67. self.sm.subscribe('g2', 'i2', 's2')
  68. self.sm.unsubscribe_all('s1')
  69. self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's2' ])
  70. self.assertEqual(self.sm.find_sub("g1", "i2"), [ 's2' ])
  71. self.assertEqual(self.sm.find_sub("g2", "i1"), [ 's2' ])
  72. self.assertEqual(self.sm.find_sub("g2", "i2"), [ 's2' ])
  73. def test_find(self):
  74. self.sm.subscribe('g1', 'i1', 's1')
  75. self.sm.subscribe('g1', '*', 's2')
  76. self.assertEqual(set(self.sm.find("g1", "i1")), set([ 's1', 's2' ]))
  77. def test_find_sub(self):
  78. self.sm.subscribe('g1', 'i1', 's1')
  79. self.sm.subscribe('g1', '*', 's2')
  80. self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's1' ])
  81. def test_open_socket_parameter(self):
  82. self.assertFalse(os.path.exists("./my_socket_file"))
  83. msgq = MsgQ("./my_socket_file");
  84. msgq.setup()
  85. self.assertTrue(os.path.exists("./my_socket_file"))
  86. msgq.shutdown();
  87. self.assertFalse(os.path.exists("./my_socket_file"))
  88. def test_open_socket_environment_variable(self):
  89. self.assertFalse(os.path.exists("my_socket_file"))
  90. os.environ["BIND10_MSGQ_SOCKET_FILE"] = "./my_socket_file"
  91. msgq = MsgQ();
  92. msgq.setup()
  93. self.assertTrue(os.path.exists("./my_socket_file"))
  94. msgq.shutdown();
  95. self.assertFalse(os.path.exists("./my_socket_file"))
  96. def test_open_socket_default(self):
  97. env_var = None
  98. orig_socket_file = None
  99. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  100. env_var = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  101. del os.environ["BIND10_MSGQ_SOCKET_FILE"]
  102. # temporarily replace the class "default" not to be disrupted by
  103. # any running BIND 10 instance.
  104. if "BIND10_TEST_SOCKET_FILE" in os.environ:
  105. MsgQ.SOCKET_FILE = os.environ["BIND10_TEST_SOCKET_FILE"]
  106. socket_file = MsgQ.SOCKET_FILE
  107. self.assertFalse(os.path.exists(socket_file))
  108. msgq = MsgQ();
  109. try:
  110. msgq.setup()
  111. self.assertTrue(os.path.exists(socket_file))
  112. msgq.shutdown()
  113. self.assertFalse(os.path.exists(socket_file))
  114. except socket.error:
  115. # ok, the install path doesn't exist at all,
  116. # so we can't check any further
  117. pass
  118. if env_var is not None:
  119. os.environ["BIND10_MSGQ_SOCKET_FILE"] = env_var
  120. if orig_socket_file is not None:
  121. MsgQ.SOCKET_FILE = orig_socket_file
  122. def test_open_socket_bad(self):
  123. msgq = MsgQ("/does/not/exist")
  124. self.assertRaises(socket.error, msgq.setup)
  125. # But we can clean up after that.
  126. msgq.shutdown()
  127. def test_subscribe_cfgmgr(self):
  128. """Test special handling of the config manager. Once it subscribes,
  129. the message queue needs to connect and read the config. But not
  130. before and only once.
  131. """
  132. self.assertEqual(0, self.__cfgmgr_ready_called)
  133. # Not called when something else subscribes
  134. self.sm.subscribe('SomethingElse', '*', 's1')
  135. self.assertEqual(0, self.__cfgmgr_ready_called)
  136. # Called whenever the config manager subscribes
  137. self.sm.subscribe('ConfigManager', '*', 's2')
  138. self.assertEqual(1, self.__cfgmgr_ready_called)
  139. # But not called again when it subscribes again (should not
  140. # happen in practice, but we make sure anyway)
  141. self.sm.subscribe('ConfigManager', '*', 's3')
  142. self.assertEqual(1, self.__cfgmgr_ready_called)
  143. class MsgQTest(unittest.TestCase):
  144. """
  145. Tests for the behaviour of MsgQ. This is for the core of MsgQ, other
  146. subsystems are in separate test fixtures.
  147. """
  148. def setUp(self):
  149. self.__msgq = MsgQ()
  150. def parse_msg(self, msg):
  151. """
  152. Parse a binary representation of message to the routing header and the
  153. data payload. It assumes the message is correctly encoded and the
  154. payload is not omitted. It'd probably throw in other cases, but we
  155. don't use it in such situations in this test.
  156. """
  157. (length, header_len) = struct.unpack('>IH', msg[:6])
  158. header = json.loads(msg[6:6 + header_len].decode('utf-8'))
  159. data = json.loads(msg[6 + header_len:].decode('utf-8'))
  160. return (header, data)
  161. def test_unknown_command(self):
  162. """
  163. Test the command handler returns error when the command is unknown.
  164. """
  165. # Fake we are running, to disable test workarounds
  166. self.__msgq.running = True
  167. self.assertEqual({'result': [1, "unknown command: unknown"]},
  168. self.__msgq.command_handler('unknown', {}))
  169. def test_undeliverable_errors(self):
  170. """
  171. Send several packets through the MsgQ and check it generates
  172. undeliverable notifications under the correct circumstances.
  173. The test is not exhaustive as it doesn't test all combination
  174. of existence of the recipient, addressing schemes, want_answer
  175. header and the reply header. It is not needed, these should
  176. be mostly independent. That means, for example, if the message
  177. is a reply and there's no recipient to send it to, the error
  178. would not be generated no matter if we addressed the recipient
  179. by lname or group. If we included everything, the test would
  180. have too many scenarios with little benefit.
  181. """
  182. self.__sent_messages = []
  183. def fake_send_prepared_msg(socket, msg):
  184. self.__sent_messages.append((socket, msg))
  185. return True
  186. self.__msgq.send_prepared_msg = fake_send_prepared_msg
  187. # These would be real sockets in the MsgQ, but we pass them as
  188. # parameters only, so we don't need them to be. We use simple
  189. # integers to tell one from another.
  190. sender = 1
  191. recipient = 2
  192. another_recipiet = 3
  193. # The routing headers and data to test with.
  194. routing = {
  195. 'to': '*',
  196. 'from': 'sender',
  197. 'group': 'group',
  198. 'instance': '*',
  199. 'seq': 42
  200. }
  201. data = {
  202. "data": "Just some data"
  203. }
  204. # Some common checking patterns
  205. def check_error():
  206. self.assertEqual(1, len(self.__sent_messages))
  207. self.assertEqual(1, self.__sent_messages[0][0])
  208. self.assertEqual(({
  209. 'group': 'group',
  210. 'instance': '*',
  211. 'reply': 42,
  212. 'seq': 42,
  213. 'from': 'msgq',
  214. 'to': 'sender',
  215. 'want_answer': True
  216. }, {'result': [-1, "No such recipient"]}),
  217. self.parse_msg(self.__sent_messages[0][1]))
  218. self.__sent_messages = []
  219. def check_no_message():
  220. self.assertEqual([], self.__sent_messages)
  221. def check_delivered(rcpt_socket=recipient):
  222. self.assertEqual(1, len(self.__sent_messages))
  223. self.assertEqual(rcpt_socket, self.__sent_messages[0][0])
  224. self.assertEqual((routing, data),
  225. self.parse_msg(self.__sent_messages[0][1]))
  226. self.__sent_messages = []
  227. # Send the message. No recipient, but errors are not requested,
  228. # so none is generated.
  229. self.__msgq.process_command_send(sender, routing, data)
  230. check_no_message()
  231. # It should act the same if we explicitly say we do not want replies.
  232. routing["want_answer"] = False
  233. self.__msgq.process_command_send(sender, routing, data)
  234. check_no_message()
  235. # Ask for errors if it can't be delivered.
  236. routing["want_answer"] = True
  237. self.__msgq.process_command_send(sender, routing, data)
  238. check_error()
  239. # If the message is a reply itself, we never generate the errors
  240. routing["reply"] = 3
  241. self.__msgq.process_command_send(sender, routing, data)
  242. check_no_message()
  243. # If there are recipients (but no "reply" header), the error should not
  244. # be sent and the message should get delivered.
  245. del routing["reply"]
  246. self.__msgq.subs.find = lambda group, instance: [recipient]
  247. self.__msgq.process_command_send(sender, routing, data)
  248. check_delivered()
  249. # When we send a direct message and the recipient is not there, we get
  250. # the error too
  251. routing["to"] = "lname"
  252. self.__msgq.process_command_send(sender, routing, data)
  253. check_error()
  254. # But when the recipient is there, it is delivered and no error is
  255. # generated.
  256. self.__msgq.lnames["lname"] = recipient
  257. self.__msgq.process_command_send(sender, routing, data)
  258. check_delivered()
  259. # If an attempt to send fails, consider it no recipient.
  260. def fail_send_prepared_msg(socket, msg):
  261. '''
  262. Pretend sending a message failed. After one call, return to the
  263. usual mock, so the errors or other messages can be sent.
  264. '''
  265. self.__msgq.send_prepared_msg = fake_send_prepared_msg
  266. return False
  267. self.__msgq.send_prepared_msg = fail_send_prepared_msg
  268. self.__msgq.process_command_send(sender, routing, data)
  269. check_error()
  270. # But if there are more recipients and only one fails, it should
  271. # be delivered to the other and not considered an error
  272. self.__msgq.send_prepared_msg = fail_send_prepared_msg
  273. routing["to"] = '*'
  274. self.__msgq.subs.find = lambda group, instance: [recipient,
  275. another_recipiet]
  276. self.__msgq.process_command_send(sender, routing, data)
  277. check_delivered(rcpt_socket=another_recipiet)
  278. class DummySocket:
  279. """
  280. Dummy socket class.
  281. This one does nothing at all, but some calls are used.
  282. It is mainly intended to override the listen socket for msgq, which
  283. we do not need in these tests.
  284. """
  285. def fileno():
  286. return -1
  287. def close():
  288. pass
  289. class BadSocket:
  290. """
  291. Special socket wrapper class. Once given a socket in its constructor,
  292. it completely behaves like that socket, except that its send() call
  293. will only actually send one byte per call, and optionally raise a given
  294. exception at a given time.
  295. """
  296. def __init__(self, real_socket, raise_on_send=0, send_exception=None):
  297. """
  298. Parameters:
  299. real_socket: The actual socket to wrap
  300. raise_on_send: integer. If higher than 0, and send_exception is
  301. not None, send_exception will be raised on the
  302. 'raise_on_send'th call to send().
  303. send_exception: if not None, this exception will be raised
  304. (if raise_on_send is not 0)
  305. """
  306. self.socket = real_socket
  307. self.send_count = 0
  308. self.raise_on_send = raise_on_send
  309. self.send_exception = send_exception
  310. # completely wrap all calls and member access
  311. # (except explicitly overridden ones)
  312. def __getattr__(self, name, *args):
  313. attr = getattr(self.socket, name)
  314. if isinstance(attr, collections.Callable):
  315. def callable_attr(*args):
  316. return attr.__call__(*args)
  317. return callable_attr
  318. else:
  319. return attr
  320. def send(self, data):
  321. self.send_count += 1
  322. if self.send_exception is not None and\
  323. self.send_count == self.raise_on_send:
  324. raise self.send_exception
  325. if len(data) > 0:
  326. return self.socket.send(data[:1])
  327. else:
  328. return 0
  329. class MsgQThread(threading.Thread):
  330. """
  331. Very simple thread class that runs msgq.run() when started,
  332. and stores the exception that msgq.run() raises, if any.
  333. """
  334. def __init__(self, msgq):
  335. threading.Thread.__init__(self)
  336. self.msgq_ = msgq
  337. self.caught_exception = None
  338. self.lock = threading.Lock()
  339. def run(self):
  340. try:
  341. self.msgq_.run()
  342. except Exception as exc:
  343. # Store the exception to make the test fail if necessary
  344. self.caught_exception = exc
  345. def stop(self):
  346. self.msgq_.stop()
  347. class SendNonblock(unittest.TestCase):
  348. """
  349. Tests that the whole thing will not get blocked if someone does not read.
  350. """
  351. def terminate_check(self, task, timeout=30):
  352. """
  353. Runs task in separate process (task is a function) and checks
  354. it terminates sooner than timeout.
  355. """
  356. task_pid = os.fork()
  357. if task_pid == 0:
  358. # Kill the forked process after timeout by SIGALRM
  359. signal.alarm(timeout)
  360. # Run the task
  361. # If an exception happens or we run out of time, we terminate
  362. # with non-zero
  363. task()
  364. # If we got here, then everything worked well and in time
  365. # In that case, we terminate successfully
  366. os._exit(0) # needs exit code
  367. else:
  368. (pid, status) = os.waitpid(task_pid, 0)
  369. self.assertEqual(0, status,
  370. "The task did not complete successfully in time")
  371. def get_msgq_with_sockets(self):
  372. '''
  373. Create a message queue and prepare it for use with a socket pair.
  374. The write end is put into the message queue, so we can check it.
  375. It returns (msgq, read_end, write_end). It is expected the sockets
  376. are closed by the caller afterwards.
  377. Also check the sockets are registered correctly (eg. internal data
  378. structures are there for them).
  379. '''
  380. msgq = MsgQ()
  381. # We do only partial setup, so we don't create the listening socket
  382. msgq.setup_poller()
  383. (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  384. msgq.register_socket(write)
  385. self.assertEqual(1, len(msgq.lnames))
  386. self.assertEqual(write, msgq.lnames[msgq.fd_to_lname[write.fileno()]])
  387. return (msgq, read, write)
  388. def infinite_sender(self, sender):
  389. """
  390. Sends data until an exception happens. socket.error is caught,
  391. as it means the socket got closed. Sender is called to actually
  392. send the data.
  393. """
  394. (msgq, read, write) = self.get_msgq_with_sockets()
  395. # Keep sending while it is not closed by the msgq
  396. try:
  397. while True:
  398. sender(msgq, write)
  399. except socket.error:
  400. pass
  401. # Explicitly close temporary socket pair as the Python
  402. # interpreter expects it. It may not be 100% exception safe,
  403. # but since this is only for tests we prefer brevity.
  404. # Actually, the write end is often closed by the sender.
  405. if write.fileno() != -1:
  406. # Some of the senders passed here kill the socket internally.
  407. # So kill it only if not yet done so. If the socket is closed,
  408. # it gets -1 as fileno().
  409. msgq.kill_socket(write.fileno(), write)
  410. self.assertFalse(msgq.lnames)
  411. self.assertFalse(msgq.fd_to_lname)
  412. read.close()
  413. def test_infinite_sendmsg(self):
  414. """
  415. Tries sending messages (and not reading them) until it either times
  416. out (in blocking call, wrong) or closes it (correct).
  417. """
  418. data = "data"
  419. for i in range(1, 10):
  420. data += data
  421. self.terminate_check(lambda: self.infinite_sender(
  422. lambda msgq, socket: msgq.sendmsg(socket, {}, {"message" : data})))
  423. def test_infinite_sendprepared(self):
  424. """
  425. Tries sending data (and not reading them) until it either times
  426. out (in blocking call, wrong) or closes it (correct).
  427. """
  428. data = b"data"
  429. for i in range(1, 10):
  430. data += data
  431. self.terminate_check(lambda: self.infinite_sender(
  432. lambda msgq, socket: msgq.send_prepared_msg(socket, data)))
  433. def test_sendprepared_success(self):
  434. '''
  435. Test the send_prepared_msg returns success when queueing messages.
  436. It does so on the first attempt (when it actually tries to send
  437. something to the socket) and on any attempt that follows and the
  438. buffer is already full.
  439. '''
  440. (msgq, read, write) = self.get_msgq_with_sockets()
  441. # Now keep sending until we fill in something into the internal
  442. # buffer.
  443. while not write.fileno() in msgq.sendbuffs:
  444. self.assertTrue(msgq.send_prepared_msg(write, b'data'))
  445. read.close()
  446. write.close()
  447. def test_sendprepared_epipe(self):
  448. '''
  449. Test the send_prepared_msg returns false when we try to queue a
  450. message and the other side is not there any more. It should be done
  451. with EPIPE, so not a fatal error.
  452. '''
  453. (msgq, read, write) = self.get_msgq_with_sockets()
  454. # Close one end. It should make a EPIPE on the other.
  455. read.close()
  456. # Now it should soft-fail
  457. self.assertFalse(msgq.send_prepared_msg(write, b'data'))
  458. write.close()
  459. def send_many(self, data):
  460. """
  461. Tries that sending a command many times and getting an answer works.
  462. """
  463. msgq = MsgQ()
  464. # msgq.run needs to compare with the listen_socket, so we provide
  465. # a replacement
  466. msgq.listen_socket = DummySocket
  467. (queue, out) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  468. def run():
  469. length = len(data)
  470. queue_pid = os.fork()
  471. if queue_pid == 0:
  472. signal.alarm(120)
  473. msgq.setup_poller()
  474. msgq.setup_signalsock()
  475. msgq.register_socket(queue)
  476. msgq.run()
  477. msgq.cleanup_signalsock()
  478. else:
  479. try:
  480. def killall(signum, frame):
  481. os.kill(queue_pid, signal.SIGTERM)
  482. os._exit(1)
  483. signal.signal(signal.SIGALRM, killall)
  484. msg = msgq.preparemsg({"type" : "ping"}, data)
  485. now = time.clock()
  486. while time.clock() - now < 0.2:
  487. out.sendall(msg)
  488. # Check the answer
  489. (routing, received) = msgq.read_packet(out.fileno(),
  490. out)
  491. self.assertEqual({"type" : "pong"},
  492. isc.cc.message.from_wire(routing))
  493. self.assertEqual(data, received)
  494. finally:
  495. os.kill(queue_pid, signal.SIGTERM)
  496. self.terminate_check(run)
  497. # Explicitly close temporary socket pair as the Python
  498. # interpreter expects it. It may not be 100% exception safe,
  499. # but since this is only for tests we prefer brevity.
  500. queue.close()
  501. out.close()
  502. def test_small_sends(self):
  503. """
  504. Tests sending small data many times.
  505. """
  506. self.send_many(b"data")
  507. def test_large_sends(self):
  508. """
  509. Tests sending large data many times.
  510. """
  511. data = b"data"
  512. for i in range(1, 20):
  513. data = data + data
  514. self.send_many(data)
  515. def do_send(self, write, read, control_write, control_read,
  516. expect_arrive=True, expect_send_exception=None):
  517. """
  518. Makes a msgq object that is talking to itself,
  519. run it in a separate thread so we can use and
  520. test run().
  521. It is given two sets of connected sockets; write/read, and
  522. control_write/control_read. The former may be throwing errors
  523. and mangle data to test msgq. The second is mainly used to
  524. send msgq the stop command.
  525. (Note that the terms 'read' and 'write' are from the msgq
  526. point of view, so the test itself writes to 'control_read')
  527. Parameters:
  528. write: a socket that is used to send the data to
  529. read: a socket that is used to read the data from
  530. control_write: a second socket for communication with msgq
  531. control_read: a second socket for communication with msgq
  532. expect_arrive: if True, the read socket is read from, and the data
  533. that is read is expected to be the same as the data
  534. that has been sent to the write socket.
  535. expect_send_exception: if not None, this is the exception that is
  536. expected to be raised by msgq
  537. """
  538. # Some message and envelope data to send and check
  539. env = b'{"env": "foo"}'
  540. msg = b'{"msg": "bar"}'
  541. msgq = MsgQ()
  542. # Don't need a listen_socket
  543. msgq.listen_socket = DummySocket
  544. msgq.setup_poller()
  545. msgq.setup_signalsock()
  546. msgq.register_socket(write)
  547. msgq.register_socket(control_write)
  548. # Queue the message for sending
  549. msgq.sendmsg(write, env, msg)
  550. # Run it in a thread
  551. msgq_thread = MsgQThread(msgq)
  552. # If we're done, just kill it
  553. msgq_thread.start()
  554. if expect_arrive:
  555. (recv_env, recv_msg) = msgq.read_packet(read.fileno(),
  556. read)
  557. self.assertEqual(env, recv_env)
  558. self.assertEqual(msg, recv_msg)
  559. # Tell msgq to stop
  560. msg = msgq.preparemsg({"type" : "stop"})
  561. control_read.sendall(msg)
  562. # Wait for thread to stop if it hasn't already.
  563. # Put in a (long) timeout; the thread *should* stop, but if it
  564. # does not, we don't want the test to hang forever
  565. msgq_thread.join(60)
  566. # Fail the test if it didn't stop
  567. self.assertFalse(msgq_thread.isAlive(), "Thread did not stop")
  568. # Clean up some internals of msgq (usually called as part of
  569. # shutdown, but we skip that one here)
  570. msgq.cleanup_signalsock()
  571. # Check the exception from the thread, if any
  572. # First, if we didn't expect it; reraise it (to make test fail and
  573. # show the stacktrace for debugging)
  574. if expect_send_exception is None:
  575. if msgq_thread.caught_exception is not None:
  576. raise msgq_thread.caught_exception
  577. else:
  578. # If we *did* expect it, fail it there was none
  579. self.assertIsNotNone(msgq_thread.caught_exception)
  580. def do_send_with_send_error(self, raise_on_send, send_exception,
  581. expect_answer=True,
  582. expect_send_exception=None):
  583. """
  584. Sets up two connected sockets, wraps the sender socket into a BadSocket
  585. class, then performs a do_send() test.
  586. Parameters:
  587. raise_on_send: the byte at which send_exception should be raised
  588. (see BadSocket)
  589. send_exception: the exception to raise (see BadSocket)
  590. expect_answer: whether the send is expected to complete (and hence
  591. the read socket should get the message)
  592. expect_send_exception: the exception msgq is expected to raise when
  593. send_exception is raised by BadSocket.
  594. """
  595. (write, read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  596. (control_write, control_read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  597. badwrite = BadSocket(write, raise_on_send, send_exception)
  598. self.do_send(badwrite, read, control_write, control_read, expect_answer, expect_send_exception)
  599. write.close()
  600. read.close()
  601. control_write.close()
  602. control_read.close()
  603. def test_send_raise_recoverable(self):
  604. """
  605. Test whether msgq survices a recoverable socket errors when sending.
  606. Two tests are done: one where the error is raised on the 3rd octet,
  607. and one on the 23rd.
  608. """
  609. for err in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]:
  610. sockerr = socket.error(err, 'Socket error')
  611. self.do_send_with_send_error(3, sockerr)
  612. self.do_send_with_send_error(23, sockerr)
  613. def test_send_raise_nonrecoverable(self):
  614. """
  615. Test whether msgq survives socket errors that are nonrecoverable
  616. (for said socket that is, i.e. EPIPE etc).
  617. Two tests are done: one where the error is raised on the 3rd octet,
  618. and one on the 23rd.
  619. """
  620. for err in [ errno.EPIPE, errno.ENOBUFS, errno.ECONNRESET ]:
  621. sockerr = socket.error(err, 'Socket error')
  622. self.do_send_with_send_error(3, sockerr, False)
  623. self.do_send_with_send_error(23, sockerr, False)
  624. def otest_send_raise_crash(self):
  625. """
  626. Test whether msgq does NOT survive on a general exception.
  627. Note, perhaps it should; but we'd have to first discuss and decide
  628. how it should recover (i.e. drop the socket and consider the client
  629. dead?
  630. It may be a coding problem in msgq itself, and we certainly don't
  631. want to ignore those.
  632. """
  633. sockerr = Exception("just some general exception")
  634. self.do_send_with_send_error(3, sockerr, False, sockerr)
  635. self.do_send_with_send_error(23, sockerr, False, sockerr)
  636. class ThreadTests(unittest.TestCase):
  637. """Test various things around thread synchronization."""
  638. def setUp(self):
  639. self.__msgq = MsgQ()
  640. self.__abort_wait = False
  641. self.__result = None
  642. self.__notify_thread = threading.Thread(target=self.__notify)
  643. self.__wait_thread = threading.Thread(target=self.__wait)
  644. # Make sure the threads are killed if left behind by the test.
  645. self.__notify_thread.daemon = True
  646. self.__wait_thread.daemon = True
  647. def __notify(self):
  648. """Call the cfgmgr_ready."""
  649. if self.__abort_wait:
  650. self.__msgq.cfgmgr_ready(False)
  651. else:
  652. self.__msgq.cfgmgr_ready()
  653. def __wait(self):
  654. """Wait for config manager and store the result."""
  655. self.__result = self.__msgq.wait_cfgmgr()
  656. def test_wait_cfgmgr(self):
  657. """One thread signals the config manager subscribed, the other
  658. waits for it. We then check it terminated correctly.
  659. """
  660. self.__notify_thread.start()
  661. self.__wait_thread.start()
  662. # Timeout to ensure the test terminates even on failure
  663. self.__wait_thread.join(60)
  664. self.assertTrue(self.__result)
  665. def test_wait_cfgmgr_2(self):
  666. """Same as test_wait_cfgmgr, but starting the threads in reverse order
  667. (the result should be the same).
  668. """
  669. self.__wait_thread.start()
  670. self.__notify_thread.start()
  671. # Timeout to ensure the test terminates even on failure
  672. self.__wait_thread.join(60)
  673. self.assertTrue(self.__result)
  674. def test_wait_abort(self):
  675. """Similar to test_wait_cfgmgr, but the config manager is never
  676. subscribed and it is aborted.
  677. """
  678. self.__abort_wait = True
  679. self.__wait_thread.start()
  680. self.__notify_thread.start()
  681. # Timeout to ensure the test terminates even on failure
  682. self.__wait_thread.join(60)
  683. self.assertIsNotNone(self.__result)
  684. self.assertFalse(self.__result)
  685. def __check_ready_and_abort(self):
  686. """Check that when we first say the config manager is ready and then
  687. try to abort, it uses the first result.
  688. """
  689. self.__msgq.cfgmgr_ready()
  690. self.__msgq.cfgmgr_ready(False)
  691. self.__result = self.__msgq.wait_cfgmgr()
  692. def test_ready_and_abort(self):
  693. """Perform the __check_ready_and_abort test, but in a separate thread,
  694. so in case something goes wrong with the synchronisation and it
  695. deadlocks, the test will terminate anyway.
  696. """
  697. test_thread = threading.Thread(target=self.__check_ready_and_abort)
  698. test_thread.daemon = True
  699. test_thread.start()
  700. test_thread.join(60)
  701. self.assertTrue(self.__result)
  702. class SocketTests(unittest.TestCase):
  703. '''Test cases for micro behaviors related to socket operations.
  704. Some cases are covered as part of other tests, but in this fixture
  705. we check more details of specific method related to socket operation,
  706. with the help of mock classes to avoid expensive overhead.
  707. '''
  708. class MockSocket():
  709. '''A mock socket used instead of standard socket objects.'''
  710. def __init__(self):
  711. self.ex_on_send = None # raised from send() if not None
  712. self.recv_result = b'test' # dummy data or exception
  713. self.blockings = [] # history of setblocking() params
  714. def setblocking(self, on):
  715. self.blockings.append(on)
  716. def send(self, data):
  717. if self.ex_on_send is not None:
  718. raise self.ex_on_send
  719. return 10 # arbitrary choice
  720. def recv(self, len):
  721. if isinstance(self.recv_result, Exception):
  722. raise self.recv_result
  723. ret = self.recv_result
  724. self.recv_result = b'' # if called again, return empty data
  725. return ret
  726. def fileno(self):
  727. return 42 # arbitrary choice
  728. class LoggerWrapper():
  729. '''A simple wrapper of logger to inspect log messages.'''
  730. def __init__(self, logger):
  731. self.error_called = 0
  732. self.warn_called = 0
  733. self.debug_called = 0
  734. self.orig_logger = logger
  735. def error(self, *args):
  736. self.error_called += 1
  737. self.orig_logger.error(*args)
  738. def warn(self, *args):
  739. self.warn_called += 1
  740. self.orig_logger.warn(*args)
  741. def debug(self, *args):
  742. self.debug_called += 1
  743. self.orig_logger.debug(*args)
  744. def mock_kill_socket(self, fileno, sock):
  745. '''A replacement of MsgQ.kill_socket method for inspection.'''
  746. self.__killed_socket = (fileno, sock)
  747. if fileno in self.__msgq.sockets:
  748. del self.__msgq.sockets[fileno]
  749. def setUp(self):
  750. self.__msgq = MsgQ()
  751. self.__msgq.kill_socket = self.mock_kill_socket
  752. self.__sock = self.MockSocket()
  753. self.__data = b'dummy'
  754. self.__msgq.sockets[42] = self.__sock
  755. self.__msgq.sendbuffs[42] = (None, b'testdata')
  756. self.__sock_error = socket.error()
  757. self.__killed_socket = None
  758. self.__logger = self.LoggerWrapper(msgq.logger)
  759. msgq.logger = self.__logger
  760. def tearDown(self):
  761. msgq.logger = self.__logger.orig_logger
  762. def test_send_data(self):
  763. # Successful case: _send_data() returns the hardcoded value, and
  764. # setblocking() is called twice with the expected parameters
  765. self.assertEqual(10, self.__msgq._send_data(self.__sock, self.__data))
  766. self.assertEqual([0, 1], self.__sock.blockings)
  767. self.assertIsNone(self.__killed_socket)
  768. def test_send_data_interrupt(self):
  769. '''send() is interrupted. send_data() returns 0, sock isn't killed.'''
  770. expected_blockings = []
  771. for eno in [errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR]:
  772. self.__sock_error.errno = eno
  773. self.__sock.ex_on_send = self.__sock_error
  774. self.assertEqual(0, self.__msgq._send_data(self.__sock,
  775. self.__data))
  776. expected_blockings.extend([0, 1])
  777. self.assertEqual(expected_blockings, self.__sock.blockings)
  778. self.assertIsNone(self.__killed_socket)
  779. def test_send_data_error(self):
  780. '''Unexpected error happens on send(). The socket is killed.
  781. If the error is EPIPE, it's logged at the warn level; otherwise
  782. an error message is logged.
  783. '''
  784. expected_blockings = []
  785. expected_errors = 0
  786. expected_warns = 0
  787. for eno in [errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS]:
  788. self.__sock_error.errno = eno
  789. self.__sock.ex_on_send = self.__sock_error
  790. self.__killed_socket = None # clear any previuos value
  791. self.assertEqual(None, self.__msgq._send_data(self.__sock,
  792. self.__data))
  793. self.assertEqual((42, self.__sock), self.__killed_socket)
  794. expected_blockings.extend([0, 1])
  795. self.assertEqual(expected_blockings, self.__sock.blockings)
  796. if eno == errno.EPIPE:
  797. expected_warns += 1
  798. else:
  799. expected_errors += 1
  800. self.assertEqual(expected_errors, self.__logger.error_called)
  801. self.assertEqual(expected_warns, self.__logger.warn_called)
  802. def test_process_fd_read_after_bad_write(self):
  803. '''Check the specific case of write fail followed by read attempt.
  804. The write failure results in kill_socket, then read shouldn't tried.
  805. '''
  806. self.__sock_error.errno = errno.EPIPE
  807. self.__sock.ex_on_send = self.__sock_error
  808. self.__msgq.process_socket = None # if called, trigger an exception
  809. self.__msgq._process_fd(42, True, True, False) # shouldn't crash
  810. # check the socket is deleted from the fileno=>sock dictionary
  811. self.assertEqual({}, self.__msgq.sockets)
  812. def test_process_fd_close_after_bad_write(self):
  813. '''Similar to the previous, but for checking dup'ed kill attempt'''
  814. self.__sock_error.errno = errno.EPIPE
  815. self.__sock.ex_on_send = self.__sock_error
  816. self.__msgq._process_fd(42, True, False, True) # shouldn't crash
  817. self.assertEqual({}, self.__msgq.sockets)
  818. def test_process_fd_writer_after_close(self):
  819. '''Emulate a "writable" socket has been already closed and killed.'''
  820. # This just shouldn't crash
  821. self.__msgq._process_fd(4200, True, False, False)
  822. def test_process_packet(self):
  823. '''Check some failure cases in handling an incoming message.'''
  824. expected_errors = 0
  825. expected_debugs = 0
  826. # if socket.recv() fails due to socket.error, it will be logged
  827. # as error and the socket will be killed regardless of errno.
  828. for eno in [errno.ENOBUFS, errno.ECONNRESET]:
  829. self.__sock_error.errno = eno
  830. self.__sock.recv_result = self.__sock_error
  831. self.__killed_socket = None # clear any previuos value
  832. self.__msgq.process_packet(42, self.__sock)
  833. self.assertEqual((42, self.__sock), self.__killed_socket)
  834. expected_errors += 1
  835. self.assertEqual(expected_errors, self.__logger.error_called)
  836. self.assertEqual(expected_debugs, self.__logger.debug_called)
  837. # if socket.recv() returns empty data, the result depends on whether
  838. # there's any preceding data; in the second case below, at least
  839. # 6 bytes of data will be expected, and the second call to our faked
  840. # recv() returns empty data. In that case it will be logged as error.
  841. for recv_data in [b'', b'short']:
  842. self.__sock.recv_result = recv_data
  843. self.__killed_socket = None
  844. self.__msgq.process_packet(42, self.__sock)
  845. self.assertEqual((42, self.__sock), self.__killed_socket)
  846. if len(recv_data) == 0:
  847. expected_debugs += 1
  848. else:
  849. expected_errors += 1
  850. self.assertEqual(expected_errors, self.__logger.error_called)
  851. self.assertEqual(expected_debugs, self.__logger.debug_called)
  852. if __name__ == '__main__':
  853. isc.log.resetUnitTestRootLogger()
  854. unittest.main()