# msgq_test.py
  1. # Copyright (C) 2010-2013 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import msgq
  16. from msgq import SubscriptionManager, MsgQ
  17. import unittest
  18. import os
  19. import socket
  20. import select # needed only for #3014. can be removed once it's solved
  21. import signal
  22. import sys
  23. import time
  24. import errno
  25. import threading
  26. import isc.cc
  27. import collections
  28. import isc.log
  29. import struct
  30. import json
  31. #
  32. # Currently only the subscription part and some sending is implemented...
  33. # I'd have to mock out a socket, which, while not impossible, is not trivial.
  34. #
  35. class TestSubscriptionManager(unittest.TestCase):
  36. def setUp(self):
  37. self.__cfgmgr_ready_called = 0
  38. self.sm = SubscriptionManager(self.cfgmgr_ready)
  39. def cfgmgr_ready(self):
  40. # Called one more time
  41. self.__cfgmgr_ready_called += 1
  42. def test_subscription_add_delete_manager(self):
  43. self.sm.subscribe("a", "*", 'sock1')
  44. self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
  45. def test_subscription_add_delete_other(self):
  46. self.sm.subscribe("a", "*", 'sock1')
  47. self.sm.unsubscribe("a", "*", 'sock2')
  48. self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
  49. def test_subscription_add_several_sockets(self):
  50. socks = [ 's1', 's2', 's3', 's4', 's5' ]
  51. for s in socks:
  52. self.sm.subscribe("a", "*", s)
  53. self.assertEqual(self.sm.find_sub("a", "*"), socks)
  54. def test_unsubscribe(self):
  55. socks = [ 's1', 's2', 's3', 's4', 's5' ]
  56. for s in socks:
  57. self.sm.subscribe("a", "*", s)
  58. self.assertTrue(self.sm.unsubscribe("a", "*", 's3'))
  59. # Unsubscribe from group it is not in
  60. self.assertFalse(self.sm.unsubscribe("a", "*", 's42'))
  61. self.assertEqual(self.sm.find_sub("a", "*"),
  62. [ 's1', 's2', 's4', 's5' ])
  63. def test_unsubscribe_all(self):
  64. self.sm.subscribe('g1', 'i1', 's1')
  65. self.sm.subscribe('g1', 'i1', 's2')
  66. self.sm.subscribe('g1', 'i2', 's1')
  67. self.sm.subscribe('g1', 'i2', 's2')
  68. self.sm.subscribe('g2', 'i1', 's1')
  69. self.sm.subscribe('g2', 'i1', 's2')
  70. self.sm.subscribe('g2', 'i2', 's1')
  71. self.sm.subscribe('g2', 'i2', 's2')
  72. self.assertEqual(set([('g1', 'i1'), ('g1', 'i2'), ('g2', 'i1'),
  73. ('g2', 'i2')]),
  74. set(self.sm.unsubscribe_all('s1')))
  75. self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's2' ])
  76. self.assertEqual(self.sm.find_sub("g1", "i2"), [ 's2' ])
  77. self.assertEqual(self.sm.find_sub("g2", "i1"), [ 's2' ])
  78. self.assertEqual(self.sm.find_sub("g2", "i2"), [ 's2' ])
  79. def test_find(self):
  80. self.sm.subscribe('g1', 'i1', 's1')
  81. self.sm.subscribe('g1', '*', 's2')
  82. self.assertEqual(set(self.sm.find("g1", "i1")), set([ 's1', 's2' ]))
  83. def test_find_sub(self):
  84. self.sm.subscribe('g1', 'i1', 's1')
  85. self.sm.subscribe('g1', '*', 's2')
  86. self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's1' ])
  87. def test_open_socket_parameter(self):
  88. self.assertFalse(os.path.exists("./my_socket_file"))
  89. msgq = MsgQ("./my_socket_file");
  90. msgq.setup()
  91. self.assertTrue(os.path.exists("./my_socket_file"))
  92. msgq.shutdown();
  93. self.assertFalse(os.path.exists("./my_socket_file"))
  94. def test_open_socket_environment_variable(self):
  95. self.assertFalse(os.path.exists("my_socket_file"))
  96. os.environ["BIND10_MSGQ_SOCKET_FILE"] = "./my_socket_file"
  97. msgq = MsgQ();
  98. msgq.setup()
  99. self.assertTrue(os.path.exists("./my_socket_file"))
  100. msgq.shutdown();
  101. self.assertFalse(os.path.exists("./my_socket_file"))
  102. def test_open_socket_default(self):
  103. env_var = None
  104. orig_socket_file = None
  105. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  106. env_var = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  107. del os.environ["BIND10_MSGQ_SOCKET_FILE"]
  108. # temporarily replace the class "default" not to be disrupted by
  109. # any running BIND 10 instance.
  110. if "BIND10_TEST_SOCKET_FILE" in os.environ:
  111. MsgQ.SOCKET_FILE = os.environ["BIND10_TEST_SOCKET_FILE"]
  112. socket_file = MsgQ.SOCKET_FILE
  113. self.assertFalse(os.path.exists(socket_file))
  114. msgq = MsgQ();
  115. try:
  116. msgq.setup()
  117. self.assertTrue(os.path.exists(socket_file))
  118. msgq.shutdown()
  119. self.assertFalse(os.path.exists(socket_file))
  120. except socket.error:
  121. # ok, the install path doesn't exist at all,
  122. # so we can't check any further
  123. pass
  124. if env_var is not None:
  125. os.environ["BIND10_MSGQ_SOCKET_FILE"] = env_var
  126. if orig_socket_file is not None:
  127. MsgQ.SOCKET_FILE = orig_socket_file
  128. def test_open_socket_bad(self):
  129. msgq = MsgQ("/does/not/exist")
  130. self.assertRaises(socket.error, msgq.setup)
  131. # But we can clean up after that.
  132. msgq.shutdown()
  133. def test_subscribe_cfgmgr(self):
  134. """Test special handling of the config manager. Once it subscribes,
  135. the message queue needs to connect and read the config. But not
  136. before and only once.
  137. """
  138. self.assertEqual(0, self.__cfgmgr_ready_called)
  139. # Not called when something else subscribes
  140. self.sm.subscribe('SomethingElse', '*', 's1')
  141. self.assertEqual(0, self.__cfgmgr_ready_called)
  142. # Called whenever the config manager subscribes
  143. self.sm.subscribe('ConfigManager', '*', 's2')
  144. self.assertEqual(1, self.__cfgmgr_ready_called)
  145. # But not called again when it subscribes again (should not
  146. # happen in practice, but we make sure anyway)
  147. self.sm.subscribe('ConfigManager', '*', 's3')
  148. self.assertEqual(1, self.__cfgmgr_ready_called)
  149. class MsgQTest(unittest.TestCase):
  150. """
  151. Tests for the behaviour of MsgQ. This is for the core of MsgQ, other
  152. subsystems are in separate test fixtures.
  153. """
  154. def setUp(self):
  155. self.__msgq = MsgQ()
  156. def parse_msg(self, msg):
  157. """
  158. Parse a binary representation of message to the routing header and the
  159. data payload. It assumes the message is correctly encoded and the
  160. payload is not omitted. It'd probably throw in other cases, but we
  161. don't use it in such situations in this test.
  162. """
  163. (length, header_len) = struct.unpack('>IH', msg[:6])
  164. header = json.loads(msg[6:6 + header_len].decode('utf-8'))
  165. data = json.loads(msg[6 + header_len:].decode('utf-8'))
  166. return (header, data)
  167. def test_unknown_command(self):
  168. """
  169. Test the command handler returns error when the command is unknown.
  170. """
  171. # Fake we are running, to disable test workarounds
  172. self.__msgq.running = True
  173. self.assertEqual({'result': [1, "unknown command: unknown"]},
  174. self.__msgq.command_handler('unknown', {}))
  175. def test_get_members(self):
  176. """
  177. Test getting members of a group or of all connected clients.
  178. """
  179. # Push two dummy "clients" into msgq (the ugly way, by directly
  180. # tweaking relevant data structures).
  181. class Sock:
  182. def __init__(self, fileno):
  183. self.fileno = lambda: fileno
  184. self.__msgq.lnames['first'] = Sock(1)
  185. self.__msgq.lnames['second'] = Sock(2)
  186. self.__msgq.fd_to_lname[1] = 'first'
  187. self.__msgq.fd_to_lname[2] = 'second'
  188. # Subscribe them to some groups
  189. self.__msgq.process_command_subscribe(self.__msgq.lnames['first'],
  190. {'group': 'G1', 'instance': '*'},
  191. None)
  192. self.__msgq.process_command_subscribe(self.__msgq.lnames['second'],
  193. {'group': 'G1', 'instance': '*'},
  194. None)
  195. self.__msgq.process_command_subscribe(self.__msgq.lnames['second'],
  196. {'group': 'G2', 'instance': '*'},
  197. None)
  198. # Now query content of some groups through the command handler.
  199. self.__msgq.running = True # Enable the command handler
  200. def check_both(result):
  201. """
  202. Check the result is successful one and it contains both lnames (in
  203. any order).
  204. """
  205. array = result['result'][1]
  206. self.assertEqual(set(['first', 'second']), set(array))
  207. self.assertEqual({'result': [0, array]}, result)
  208. # Make sure the result can be encoded as JSON
  209. # (there seems to be types that look like a list but JSON choks
  210. # on them)
  211. json.dumps(result)
  212. # Members of the G1 and G2
  213. self.assertEqual({'result': [0, ['second']]},
  214. self.__msgq.command_handler('members',
  215. {'group': 'G2'}))
  216. check_both(self.__msgq.command_handler('members', {'group': 'G1'}))
  217. # We pretend that all the possible groups exist, just that most
  218. # of them are empty. So requesting for Empty is request for an empty
  219. # group and should not fail.
  220. self.assertEqual({'result': [0, []]},
  221. self.__msgq.command_handler('members',
  222. {'group': 'Empty'}))
  223. # Without the name of the group, we just get all the clients.
  224. check_both(self.__msgq.command_handler('members', {}))
  225. # Omitting the parameters completely in such case is OK
  226. check_both(self.__msgq.command_handler('members', None))
  227. def notifications_setup(self):
  228. """
  229. Common setup of some notifications tests. Mock several things.
  230. """
  231. # Mock the method to send notifications (we don't really want
  232. # to send them now, just see they'd be sent).
  233. # Mock the poller, as we don't need it at all (and we don't have
  234. # real socket to give it now).
  235. notifications = []
  236. def send_notification(event, params):
  237. notifications.append((event, params))
  238. class FakePoller:
  239. def register(self, socket, mode):
  240. pass
  241. def unregister(self, sock):
  242. pass
  243. self.__msgq.members_notify = send_notification
  244. self.__msgq.poller = FakePoller()
  245. # Create a socket
  246. class Sock:
  247. def __init__(self, fileno):
  248. self.fileno = lambda: fileno
  249. def close(self):
  250. pass
  251. sock = Sock(1)
  252. return notifications, sock
  253. def test_notifies(self):
  254. """
  255. Test the message queue sends notifications about connecting,
  256. disconnecting and subscription changes.
  257. """
  258. notifications, sock = self.notifications_setup()
  259. # We should notify about new cliend when we register it
  260. self.__msgq.register_socket(sock)
  261. lname = self.__msgq.fd_to_lname[1] # Steal the lname
  262. self.assertEqual([('connected', {'client': lname})], notifications)
  263. del notifications[:]
  264. # A notification should happen for a subscription to a group
  265. self.__msgq.process_command_subscribe(sock, {'group': 'G',
  266. 'instance': '*'},
  267. None)
  268. self.assertEqual([('subscribed', {'client': lname, 'group': 'G'})],
  269. notifications)
  270. del notifications[:]
  271. # As well for unsubscription
  272. self.__msgq.process_command_unsubscribe(sock, {'group': 'G',
  273. 'instance': '*'},
  274. None)
  275. self.assertEqual([('unsubscribed', {'client': lname, 'group': 'G'})],
  276. notifications)
  277. del notifications[:]
  278. # Unsubscription from a group it isn't subscribed to
  279. self.__msgq.process_command_unsubscribe(sock, {'group': 'H',
  280. 'instance': '*'},
  281. None)
  282. self.assertEqual([], notifications)
  283. # And, finally, for removal of client
  284. self.__msgq.kill_socket(sock.fileno(), sock)
  285. self.assertEqual([('disconnected', {'client': lname})], notifications)
  286. def test_notifies_implicit_kill(self):
  287. """
  288. Test that the unsubscription notifications are sent before the socket
  289. is dropped, even in case it does not unsubscribe explicitly.
  290. """
  291. notifications, sock = self.notifications_setup()
  292. # Register and subscribe. Notifications for these are in above test.
  293. self.__msgq.register_socket(sock)
  294. lname = self.__msgq.fd_to_lname[1] # Steal the lname
  295. self.__msgq.process_command_subscribe(sock, {'group': 'G',
  296. 'instance': '*'},
  297. None)
  298. del notifications[:]
  299. self.__msgq.kill_socket(sock.fileno(), sock)
  300. # Now, the notification for unsubscribe should be first, second for
  301. # the disconnection.
  302. self.assertEqual([('unsubscribed', {'client': lname, 'group': 'G'}),
  303. ('disconnected', {'client': lname})
  304. ], notifications)
  305. def test_undeliverable_errors(self):
  306. """
  307. Send several packets through the MsgQ and check it generates
  308. undeliverable notifications under the correct circumstances.
  309. The test is not exhaustive as it doesn't test all combination
  310. of existence of the recipient, addressing schemes, want_answer
  311. header and the reply header. It is not needed, these should
  312. be mostly independent. That means, for example, if the message
  313. is a reply and there's no recipient to send it to, the error
  314. would not be generated no matter if we addressed the recipient
  315. by lname or group. If we included everything, the test would
  316. have too many scenarios with little benefit.
  317. """
  318. self.__sent_messages = []
  319. def fake_send_prepared_msg(socket, msg):
  320. self.__sent_messages.append((socket, msg))
  321. return True
  322. self.__msgq.send_prepared_msg = fake_send_prepared_msg
  323. # These would be real sockets in the MsgQ, but we pass them as
  324. # parameters only, so we don't need them to be. We use simple
  325. # integers to tell one from another.
  326. sender = 1
  327. recipient = 2
  328. another_recipiet = 3
  329. # The routing headers and data to test with.
  330. routing = {
  331. 'to': '*',
  332. 'from': 'sender',
  333. 'group': 'group',
  334. 'instance': '*',
  335. 'seq': 42
  336. }
  337. data = {
  338. "data": "Just some data"
  339. }
  340. # Some common checking patterns
  341. def check_error():
  342. self.assertEqual(1, len(self.__sent_messages))
  343. self.assertEqual(1, self.__sent_messages[0][0])
  344. self.assertEqual(({
  345. 'group': 'group',
  346. 'instance': '*',
  347. 'reply': 42,
  348. 'seq': 42,
  349. 'from': 'msgq',
  350. 'to': 'sender',
  351. 'want_answer': True
  352. }, {'result': [-1, "No such recipient"]}),
  353. self.parse_msg(self.__sent_messages[0][1]))
  354. self.__sent_messages = []
  355. def check_no_message():
  356. self.assertEqual([], self.__sent_messages)
  357. def check_delivered(rcpt_socket=recipient):
  358. self.assertEqual(1, len(self.__sent_messages))
  359. self.assertEqual(rcpt_socket, self.__sent_messages[0][0])
  360. self.assertEqual((routing, data),
  361. self.parse_msg(self.__sent_messages[0][1]))
  362. self.__sent_messages = []
  363. # Send the message. No recipient, but errors are not requested,
  364. # so none is generated.
  365. self.__msgq.process_command_send(sender, routing, data)
  366. check_no_message()
  367. # It should act the same if we explicitly say we do not want replies.
  368. routing["want_answer"] = False
  369. self.__msgq.process_command_send(sender, routing, data)
  370. check_no_message()
  371. # Ask for errors if it can't be delivered.
  372. routing["want_answer"] = True
  373. self.__msgq.process_command_send(sender, routing, data)
  374. check_error()
  375. # If the message is a reply itself, we never generate the errors
  376. routing["reply"] = 3
  377. self.__msgq.process_command_send(sender, routing, data)
  378. check_no_message()
  379. # If there are recipients (but no "reply" header), the error should not
  380. # be sent and the message should get delivered.
  381. del routing["reply"]
  382. self.__msgq.subs.find = lambda group, instance: [recipient]
  383. self.__msgq.process_command_send(sender, routing, data)
  384. check_delivered()
  385. # When we send a direct message and the recipient is not there, we get
  386. # the error too
  387. routing["to"] = "lname"
  388. self.__msgq.process_command_send(sender, routing, data)
  389. check_error()
  390. # But when the recipient is there, it is delivered and no error is
  391. # generated.
  392. self.__msgq.lnames["lname"] = recipient
  393. self.__msgq.process_command_send(sender, routing, data)
  394. check_delivered()
  395. # If an attempt to send fails, consider it no recipient.
  396. def fail_send_prepared_msg(socket, msg):
  397. '''
  398. Pretend sending a message failed. After one call, return to the
  399. usual mock, so the errors or other messages can be sent.
  400. '''
  401. self.__msgq.send_prepared_msg = fake_send_prepared_msg
  402. return False
  403. self.__msgq.send_prepared_msg = fail_send_prepared_msg
  404. self.__msgq.process_command_send(sender, routing, data)
  405. check_error()
  406. # But if there are more recipients and only one fails, it should
  407. # be delivered to the other and not considered an error
  408. self.__msgq.send_prepared_msg = fail_send_prepared_msg
  409. routing["to"] = '*'
  410. self.__msgq.subs.find = lambda group, instance: [recipient,
  411. another_recipiet]
  412. self.__msgq.process_command_send(sender, routing, data)
  413. check_delivered(rcpt_socket=another_recipiet)
  414. class DummySocket:
  415. """
  416. Dummy socket class.
  417. This one does nothing at all, but some calls are used.
  418. It is mainly intended to override the listen socket for msgq, which
  419. we do not need in these tests.
  420. """
  421. def fileno():
  422. return -1
  423. def close():
  424. pass
  425. class BadSocket:
  426. """
  427. Special socket wrapper class. Once given a socket in its constructor,
  428. it completely behaves like that socket, except that its send() call
  429. will only actually send one byte per call, and optionally raise a given
  430. exception at a given time.
  431. """
  432. def __init__(self, real_socket, raise_on_send=0, send_exception=None):
  433. """
  434. Parameters:
  435. real_socket: The actual socket to wrap
  436. raise_on_send: integer. If higher than 0, and send_exception is
  437. not None, send_exception will be raised on the
  438. 'raise_on_send'th call to send().
  439. send_exception: if not None, this exception will be raised
  440. (if raise_on_send is not 0)
  441. """
  442. self.socket = real_socket
  443. self.send_count = 0
  444. self.raise_on_send = raise_on_send
  445. self.send_exception = send_exception
  446. # completely wrap all calls and member access
  447. # (except explicitly overridden ones)
  448. def __getattr__(self, name, *args):
  449. attr = getattr(self.socket, name)
  450. if isinstance(attr, collections.Callable):
  451. def callable_attr(*args):
  452. return attr.__call__(*args)
  453. return callable_attr
  454. else:
  455. return attr
  456. def send(self, data):
  457. self.send_count += 1
  458. if self.send_exception is not None and\
  459. self.send_count == self.raise_on_send:
  460. raise self.send_exception
  461. if len(data) > 0:
  462. return self.socket.send(data[:1])
  463. else:
  464. return 0
  465. class MsgQThread(threading.Thread):
  466. """
  467. Very simple thread class that runs msgq.run() when started,
  468. and stores the exception that msgq.run() raises, if any.
  469. """
  470. def __init__(self, msgq):
  471. threading.Thread.__init__(self)
  472. self.msgq_ = msgq
  473. self.caught_exception = None
  474. self.lock = threading.Lock()
  475. def run(self):
  476. try:
  477. self.msgq_.run()
  478. except Exception as exc:
  479. # Store the exception to make the test fail if necessary
  480. self.caught_exception = exc
  481. def stop(self):
  482. self.msgq_.stop()
  483. class SendNonblock(unittest.TestCase):
  484. """
  485. Tests that the whole thing will not get blocked if someone does not read.
  486. """
  487. def terminate_check(self, task, timeout=30):
  488. """
  489. Runs task in separate process (task is a function) and checks
  490. it terminates sooner than timeout.
  491. """
  492. task_pid = os.fork()
  493. if task_pid == 0:
  494. # Kill the forked process after timeout by SIGALRM
  495. signal.alarm(timeout)
  496. # Run the task
  497. # If an exception happens or we run out of time, we terminate
  498. # with non-zero
  499. task()
  500. # If we got here, then everything worked well and in time
  501. # In that case, we terminate successfully
  502. os._exit(0) # needs exit code
  503. else:
  504. (pid, status) = os.waitpid(task_pid, 0)
  505. self.assertEqual(0, status,
  506. "The task did not complete successfully in time")
  507. def get_msgq_with_sockets(self):
  508. '''
  509. Create a message queue and prepare it for use with a socket pair.
  510. The write end is put into the message queue, so we can check it.
  511. It returns (msgq, read_end, write_end). It is expected the sockets
  512. are closed by the caller afterwards.
  513. Also check the sockets are registered correctly (eg. internal data
  514. structures are there for them).
  515. '''
  516. msgq = MsgQ()
  517. # We do only partial setup, so we don't create the listening socket
  518. (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  519. msgq.register_socket(write)
  520. self.assertEqual(1, len(msgq.lnames))
  521. self.assertEqual(write, msgq.lnames[msgq.fd_to_lname[write.fileno()]])
  522. return (msgq, read, write)
  523. def infinite_sender(self, sender):
  524. """
  525. Sends data until an exception happens. socket.error is caught,
  526. as it means the socket got closed. Sender is called to actually
  527. send the data.
  528. """
  529. (msgq, read, write) = self.get_msgq_with_sockets()
  530. # Keep sending while it is not closed by the msgq
  531. try:
  532. while True:
  533. sender(msgq, write)
  534. except socket.error:
  535. pass
  536. # Explicitly close temporary socket pair as the Python
  537. # interpreter expects it. It may not be 100% exception safe,
  538. # but since this is only for tests we prefer brevity.
  539. # Actually, the write end is often closed by the sender.
  540. if write.fileno() != -1:
  541. # Some of the senders passed here kill the socket internally.
  542. # So kill it only if not yet done so. If the socket is closed,
  543. # it gets -1 as fileno().
  544. msgq.kill_socket(write.fileno(), write)
  545. self.assertFalse(msgq.lnames)
  546. self.assertFalse(msgq.fd_to_lname)
  547. read.close()
  548. def test_infinite_sendmsg(self):
  549. """
  550. Tries sending messages (and not reading them) until it either times
  551. out (in blocking call, wrong) or closes it (correct).
  552. """
  553. data = "data"
  554. for i in range(1, 10):
  555. data += data
  556. self.terminate_check(lambda: self.infinite_sender(
  557. lambda msgq, socket: msgq.sendmsg(socket, {}, {"message" : data})))
  558. def test_infinite_sendprepared(self):
  559. """
  560. Tries sending data (and not reading them) until it either times
  561. out (in blocking call, wrong) or closes it (correct).
  562. """
  563. data = b"data"
  564. for i in range(1, 10):
  565. data += data
  566. self.terminate_check(lambda: self.infinite_sender(
  567. lambda msgq, socket: msgq.send_prepared_msg(socket, data)))
  568. def test_sendprepared_success(self):
  569. '''
  570. Test the send_prepared_msg returns success when queueing messages.
  571. It does so on the first attempt (when it actually tries to send
  572. something to the socket) and on any attempt that follows and the
  573. buffer is already full.
  574. '''
  575. (msgq, read, write) = self.get_msgq_with_sockets()
  576. # Now keep sending until we fill in something into the internal
  577. # buffer.
  578. while not write.fileno() in msgq.sendbuffs:
  579. self.assertTrue(msgq.send_prepared_msg(write, b'data'))
  580. read.close()
  581. write.close()
  582. def test_sendprepared_epipe(self):
  583. '''
  584. Test the send_prepared_msg returns false when we try to queue a
  585. message and the other side is not there any more. It should be done
  586. with EPIPE, so not a fatal error.
  587. '''
  588. (msgq, read, write) = self.get_msgq_with_sockets()
  589. # Close one end. It should make a EPIPE on the other.
  590. read.close()
  591. # Now it should soft-fail
  592. self.assertFalse(msgq.send_prepared_msg(write, b'data'))
  593. write.close()
  594. def send_many(self, data):
  595. """
  596. Tries that sending a command many times and getting an answer works.
  597. """
  598. msgq = MsgQ()
  599. # msgq.run needs to compare with the listen_socket, so we provide
  600. # a replacement
  601. msgq.listen_socket = DummySocket
  602. (queue, out) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  603. def run():
  604. length = len(data)
  605. queue_pid = os.fork()
  606. if queue_pid == 0:
  607. signal.alarm(120)
  608. msgq.setup_signalsock()
  609. msgq.register_socket(queue)
  610. msgq.run()
  611. msgq.cleanup_signalsock()
  612. else:
  613. try:
  614. def killall(signum, frame):
  615. os.kill(queue_pid, signal.SIGTERM)
  616. os._exit(1)
  617. signal.signal(signal.SIGALRM, killall)
  618. msg = msgq.preparemsg({"type" : "ping"}, data)
  619. now = time.clock()
  620. while time.clock() - now < 0.2:
  621. out.sendall(msg)
  622. # Check the answer
  623. (routing, received) = msgq.read_packet(out.fileno(),
  624. out)
  625. self.assertEqual({"type" : "pong"},
  626. isc.cc.message.from_wire(routing))
  627. self.assertEqual(data, received)
  628. finally:
  629. os.kill(queue_pid, signal.SIGTERM)
  630. self.terminate_check(run)
  631. # Explicitly close temporary socket pair as the Python
  632. # interpreter expects it. It may not be 100% exception safe,
  633. # but since this is only for tests we prefer brevity.
  634. queue.close()
  635. out.close()
  636. def test_small_sends(self):
  637. """
  638. Tests sending small data many times.
  639. """
  640. self.send_many(b"data")
  641. def test_large_sends(self):
  642. """
  643. Tests sending large data many times.
  644. """
  645. data = b"data"
  646. for i in range(1, 20):
  647. data = data + data
  648. self.send_many(data)
  649. def do_send(self, write, read, control_write, control_read,
  650. expect_arrive=True, expect_send_exception=None):
  651. """
  652. Makes a msgq object that is talking to itself,
  653. run it in a separate thread so we can use and
  654. test run().
  655. It is given two sets of connected sockets; write/read, and
  656. control_write/control_read. The former may be throwing errors
  657. and mangle data to test msgq. The second is mainly used to
  658. send msgq the stop command.
  659. (Note that the terms 'read' and 'write' are from the msgq
  660. point of view, so the test itself writes to 'control_read')
  661. Parameters:
  662. write: a socket that is used to send the data to
  663. read: a socket that is used to read the data from
  664. control_write: a second socket for communication with msgq
  665. control_read: a second socket for communication with msgq
  666. expect_arrive: if True, the read socket is read from, and the data
  667. that is read is expected to be the same as the data
  668. that has been sent to the write socket.
  669. expect_send_exception: if not None, this is the exception that is
  670. expected to be raised by msgq
  671. """
  672. # Some message and envelope data to send and check
  673. env = b'{"env": "foo"}'
  674. msg = b'{"msg": "bar"}'
  675. msgq = MsgQ()
  676. # Don't need a listen_socket
  677. msgq.listen_socket = DummySocket
  678. msgq.setup_signalsock()
  679. msgq.register_socket(write)
  680. msgq.register_socket(control_write)
  681. # Queue the message for sending
  682. msgq.sendmsg(write, env, msg)
  683. # Run it in a thread
  684. msgq_thread = MsgQThread(msgq)
  685. # If we're done, just kill it
  686. msgq_thread.start()
  687. if expect_arrive:
  688. (recv_env, recv_msg) = msgq.read_packet(read.fileno(),
  689. read)
  690. self.assertEqual(env, recv_env)
  691. self.assertEqual(msg, recv_msg)
  692. # Tell msgq to stop
  693. msg = msgq.preparemsg({"type" : "stop"})
  694. control_read.sendall(msg)
  695. # Wait for thread to stop if it hasn't already.
  696. # Put in a (long) timeout; the thread *should* stop, but if it
  697. # does not, we don't want the test to hang forever
  698. msgq_thread.join(60)
  699. # Fail the test if it didn't stop
  700. self.assertFalse(msgq_thread.isAlive(), "Thread did not stop")
  701. # Clean up some internals of msgq (usually called as part of
  702. # shutdown, but we skip that one here)
  703. msgq.cleanup_signalsock()
  704. # Check the exception from the thread, if any
  705. # First, if we didn't expect it; reraise it (to make test fail and
  706. # show the stacktrace for debugging)
  707. if expect_send_exception is None:
  708. if msgq_thread.caught_exception is not None:
  709. raise msgq_thread.caught_exception
  710. else:
  711. # If we *did* expect it, fail it there was none
  712. self.assertIsNotNone(msgq_thread.caught_exception)
  713. def do_send_with_send_error(self, raise_on_send, send_exception,
  714. expect_answer=True,
  715. expect_send_exception=None):
  716. """
  717. Sets up two connected sockets, wraps the sender socket into a BadSocket
  718. class, then performs a do_send() test.
  719. Parameters:
  720. raise_on_send: the byte at which send_exception should be raised
  721. (see BadSocket)
  722. send_exception: the exception to raise (see BadSocket)
  723. expect_answer: whether the send is expected to complete (and hence
  724. the read socket should get the message)
  725. expect_send_exception: the exception msgq is expected to raise when
  726. send_exception is raised by BadSocket.
  727. """
  728. (write, read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
  729. (control_write, control_read) = socket.socketpair(socket.AF_UNIX,
  730. socket.SOCK_STREAM)
  731. badwrite = BadSocket(write, raise_on_send, send_exception)
  732. self.do_send(badwrite, read, control_write, control_read,
  733. expect_answer, expect_send_exception)
  734. write.close()
  735. read.close()
  736. control_write.close()
  737. control_read.close()
  738. def test_send_raise_recoverable(self):
  739. """
  740. Test whether msgq survices a recoverable socket errors when sending.
  741. Two tests are done: one where the error is raised on the 3rd octet,
  742. and one on the 23rd.
  743. """
  744. for err in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]:
  745. sockerr = socket.error(err, 'Socket error')
  746. self.do_send_with_send_error(3, sockerr)
  747. self.do_send_with_send_error(23, sockerr)
  748. def test_send_raise_nonrecoverable(self):
  749. """
  750. Test whether msgq survives socket errors that are nonrecoverable
  751. (for said socket that is, i.e. EPIPE etc).
  752. Two tests are done: one where the error is raised on the 3rd octet,
  753. and one on the 23rd.
  754. """
  755. for err in [ errno.EPIPE, errno.ENOBUFS, errno.ECONNRESET ]:
  756. sockerr = socket.error(err, 'Socket error')
  757. self.do_send_with_send_error(3, sockerr, False)
  758. self.do_send_with_send_error(23, sockerr, False)
  759. def otest_send_raise_crash(self):
  760. """
  761. Test whether msgq does NOT survive on a general exception.
  762. Note, perhaps it should; but we'd have to first discuss and decide
  763. how it should recover (i.e. drop the socket and consider the client
  764. dead?
  765. It may be a coding problem in msgq itself, and we certainly don't
  766. want to ignore those.
  767. """
  768. sockerr = Exception("just some general exception")
  769. self.do_send_with_send_error(3, sockerr, False, sockerr)
  770. self.do_send_with_send_error(23, sockerr, False, sockerr)
  771. class ThreadTests(unittest.TestCase):
  772. """Test various things around thread synchronization."""
  773. def setUp(self):
  774. self.__msgq = MsgQ()
  775. self.__abort_wait = False
  776. self.__result = None
  777. self.__notify_thread = threading.Thread(target=self.__notify)
  778. self.__wait_thread = threading.Thread(target=self.__wait)
  779. # Make sure the threads are killed if left behind by the test.
  780. self.__notify_thread.daemon = True
  781. self.__wait_thread.daemon = True
  782. def __notify(self):
  783. """Call the cfgmgr_ready."""
  784. if self.__abort_wait:
  785. self.__msgq.cfgmgr_ready(False)
  786. else:
  787. self.__msgq.cfgmgr_ready()
  788. def __wait(self):
  789. """Wait for config manager and store the result."""
  790. self.__result = self.__msgq.wait_cfgmgr()
  791. def test_wait_cfgmgr(self):
  792. """One thread signals the config manager subscribed, the other
  793. waits for it. We then check it terminated correctly.
  794. """
  795. self.__notify_thread.start()
  796. self.__wait_thread.start()
  797. # Timeout to ensure the test terminates even on failure
  798. self.__wait_thread.join(60)
  799. self.assertTrue(self.__result)
  800. def test_wait_cfgmgr_2(self):
  801. """Same as test_wait_cfgmgr, but starting the threads in reverse order
  802. (the result should be the same).
  803. """
  804. self.__wait_thread.start()
  805. self.__notify_thread.start()
  806. # Timeout to ensure the test terminates even on failure
  807. self.__wait_thread.join(60)
  808. self.assertTrue(self.__result)
  809. def test_wait_abort(self):
  810. """Similar to test_wait_cfgmgr, but the config manager is never
  811. subscribed and it is aborted.
  812. """
  813. self.__abort_wait = True
  814. self.__wait_thread.start()
  815. self.__notify_thread.start()
  816. # Timeout to ensure the test terminates even on failure
  817. self.__wait_thread.join(60)
  818. self.assertIsNotNone(self.__result)
  819. self.assertFalse(self.__result)
  820. def __check_ready_and_abort(self):
  821. """Check that when we first say the config manager is ready and then
  822. try to abort, it uses the first result.
  823. """
  824. self.__msgq.cfgmgr_ready()
  825. self.__msgq.cfgmgr_ready(False)
  826. self.__result = self.__msgq.wait_cfgmgr()
  827. def test_ready_and_abort(self):
  828. """Perform the __check_ready_and_abort test, but in a separate thread,
  829. so in case something goes wrong with the synchronisation and it
  830. deadlocks, the test will terminate anyway.
  831. """
  832. test_thread = threading.Thread(target=self.__check_ready_and_abort)
  833. test_thread.daemon = True
  834. test_thread.start()
  835. test_thread.join(60)
  836. self.assertTrue(self.__result)
  837. class SocketTests(unittest.TestCase):
  838. '''Test cases for micro behaviors related to socket operations.
  839. Some cases are covered as part of other tests, but in this fixture
  840. we check more details of specific method related to socket operation,
  841. with the help of mock classes to avoid expensive overhead.
  842. '''
  843. class MockSocket():
  844. '''A mock socket used instead of standard socket objects.'''
  845. def __init__(self):
  846. self.ex_on_send = None # raised from send() if not None
  847. self.recv_result = b'test' # dummy data or exception
  848. self.blockings = [] # history of setblocking() params
  849. def setblocking(self, on):
  850. self.blockings.append(on)
  851. def send(self, data):
  852. if self.ex_on_send is not None:
  853. raise self.ex_on_send
  854. return 10 # arbitrary choice
  855. def recv(self, len):
  856. if isinstance(self.recv_result, Exception):
  857. raise self.recv_result
  858. ret = self.recv_result
  859. self.recv_result = b'' # if called again, return empty data
  860. return ret
  861. def fileno(self):
  862. return 42 # arbitrary choice
  863. class LoggerWrapper():
  864. '''A simple wrapper of logger to inspect log messages.'''
  865. def __init__(self, logger):
  866. self.error_called = 0
  867. self.warn_called = 0
  868. self.debug_called = 0
  869. self.orig_logger = logger
  870. def error(self, *args):
  871. self.error_called += 1
  872. self.orig_logger.error(*args)
  873. def warn(self, *args):
  874. self.warn_called += 1
  875. self.orig_logger.warn(*args)
  876. def debug(self, *args):
  877. self.debug_called += 1
  878. self.orig_logger.debug(*args)
  879. def mock_kill_socket(self, fileno, sock):
  880. '''A replacement of MsgQ.kill_socket method for inspection.'''
  881. self.__killed_socket = (fileno, sock)
  882. if fileno in self.__msgq.sockets:
  883. del self.__msgq.sockets[fileno]
  884. def setUp(self):
  885. self.__msgq = MsgQ()
  886. self.__msgq.kill_socket = self.mock_kill_socket
  887. self.__sock = self.MockSocket()
  888. self.__data = b'dummy'
  889. self.__msgq.sockets[42] = self.__sock
  890. self.__msgq.sendbuffs[42] = (None, b'testdata')
  891. self.__sock_error = socket.error()
  892. self.__killed_socket = None
  893. self.__logger = self.LoggerWrapper(msgq.logger)
  894. msgq.logger = self.__logger
  895. self.__orig_select = msgq.select.select
  896. def tearDown(self):
  897. msgq.logger = self.__logger.orig_logger
  898. msgq.select.select = self.__orig_select
  899. def test_send_data(self):
  900. # Successful case: _send_data() returns the hardcoded value, and
  901. # setblocking() is called twice with the expected parameters
  902. self.assertEqual(10, self.__msgq._send_data(self.__sock, self.__data))
  903. self.assertEqual([0, 1], self.__sock.blockings)
  904. self.assertIsNone(self.__killed_socket)
  905. def test_send_data_interrupt(self):
  906. '''send() is interrupted. send_data() returns 0, sock isn't killed.'''
  907. expected_blockings = []
  908. for eno in [errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR]:
  909. self.__sock_error.errno = eno
  910. self.__sock.ex_on_send = self.__sock_error
  911. self.assertEqual(0, self.__msgq._send_data(self.__sock,
  912. self.__data))
  913. expected_blockings.extend([0, 1])
  914. self.assertEqual(expected_blockings, self.__sock.blockings)
  915. self.assertIsNone(self.__killed_socket)
  916. def test_send_data_error(self):
  917. '''Unexpected error happens on send(). The socket is killed.
  918. If the error is EPIPE, it's logged at the warn level; otherwise
  919. an error message is logged.
  920. '''
  921. expected_blockings = []
  922. expected_errors = 0
  923. expected_warns = 0
  924. for eno in [errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS]:
  925. self.__sock_error.errno = eno
  926. self.__sock.ex_on_send = self.__sock_error
  927. self.__killed_socket = None # clear any previuos value
  928. self.assertEqual(None, self.__msgq._send_data(self.__sock,
  929. self.__data))
  930. self.assertEqual((42, self.__sock), self.__killed_socket)
  931. expected_blockings.extend([0, 1])
  932. self.assertEqual(expected_blockings, self.__sock.blockings)
  933. if eno == errno.EPIPE:
  934. expected_warns += 1
  935. else:
  936. expected_errors += 1
  937. self.assertEqual(expected_errors, self.__logger.error_called)
  938. self.assertEqual(expected_warns, self.__logger.warn_called)
  939. def test_process_packet(self):
  940. '''Check some failure cases in handling an incoming message.'''
  941. expected_errors = 0
  942. expected_debugs = 0
  943. # if socket.recv() fails due to socket.error, it will be logged
  944. # as error and the socket will be killed regardless of errno.
  945. for eno in [errno.ENOBUFS, errno.ECONNRESET]:
  946. self.__sock_error.errno = eno
  947. self.__sock.recv_result = self.__sock_error
  948. self.__killed_socket = None # clear any previuos value
  949. self.__msgq.process_packet(42, self.__sock)
  950. self.assertEqual((42, self.__sock), self.__killed_socket)
  951. expected_errors += 1
  952. self.assertEqual(expected_errors, self.__logger.error_called)
  953. self.assertEqual(expected_debugs, self.__logger.debug_called)
  954. # if socket.recv() returns empty data, the result depends on whether
  955. # there's any preceding data; in the second case below, at least
  956. # 6 bytes of data will be expected, and the second call to our faked
  957. # recv() returns empty data. In that case it will be logged as error.
  958. for recv_data in [b'', b'short']:
  959. self.__sock.recv_result = recv_data
  960. self.__killed_socket = None
  961. self.__msgq.process_packet(42, self.__sock)
  962. self.assertEqual((42, self.__sock), self.__killed_socket)
  963. if len(recv_data) == 0:
  964. expected_debugs += 1
  965. else:
  966. expected_errors += 1
  967. self.assertEqual(expected_errors, self.__logger.error_called)
  968. self.assertEqual(expected_debugs, self.__logger.debug_called)
  969. def test_do_select(self):
  970. """
  971. Check the behaviour of the run_select method.
  972. In particular, check that we skip writing to the sockets we read,
  973. because a read may have side effects (like closing the socket) and
  974. we want to prevent strange behavior.
  975. """
  976. self.__read_called = []
  977. self.__write_called = []
  978. self.__reads = None
  979. self.__writes = None
  980. def do_read(fd, socket):
  981. self.__read_called.append(fd)
  982. self.__msgq.running = False
  983. def do_write(fd):
  984. self.__write_called.append(fd)
  985. self.__msgq.running = False
  986. self.__msgq.process_packet = do_read
  987. self.__msgq._process_write = do_write
  988. self.__msgq.fd_to_lname = {42: 'lname', 44: 'other', 45: 'unused'}
  989. # The do_select does index it, but just passes the value. So reuse
  990. # the dict to safe typing in the test.
  991. self.__msgq.sockets = self.__msgq.fd_to_lname
  992. self.__msgq.sendbuffs = {42: 'data', 43: 'data'}
  993. def my_select(reads, writes, errors):
  994. self.__reads = reads
  995. self.__writes = writes
  996. self.assertEqual([], errors)
  997. return ([42, 44], [42, 43], [])
  998. msgq.select.select = my_select
  999. self.__msgq.listen_socket = DummySocket
  1000. self.__msgq.running = True
  1001. self.__msgq.run_select()
  1002. self.assertEqual([42, 44], self.__read_called)
  1003. self.assertEqual([43], self.__write_called)
  1004. self.assertEqual({42, 44, 45}, set(self.__reads))
  1005. self.assertEqual({42, 43}, set(self.__writes))
  1006. if __name__ == '__main__':
  1007. isc.log.resetUnitTestRootLogger()
  1008. unittest.main()