12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121 |
- # Copyright (C) 2010-2013 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import msgq
- from msgq import SubscriptionManager, MsgQ
- import unittest
- import os
- import socket
- import select # needed only for #3014. can be removed once it's solved
- import signal
- import sys
- import time
- import errno
- import threading
- import isc.cc
- import collections
- import isc.log
- import struct
- import json
- #
- # Currently only the subscription part and some sending is implemented...
- # I'd have to mock out a socket, which, while not impossible, is not trivial.
- #
- class TestSubscriptionManager(unittest.TestCase):
- def setUp(self):
- self.__cfgmgr_ready_called = 0
- self.sm = SubscriptionManager(self.cfgmgr_ready)
- def cfgmgr_ready(self):
- # Called one more time
- self.__cfgmgr_ready_called += 1
- def test_subscription_add_delete_manager(self):
- self.sm.subscribe("a", "*", 'sock1')
- self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
- def test_subscription_add_delete_other(self):
- self.sm.subscribe("a", "*", 'sock1')
- self.sm.unsubscribe("a", "*", 'sock2')
- self.assertEqual(self.sm.find_sub("a", "*"), [ 'sock1' ])
- def test_subscription_add_several_sockets(self):
- socks = [ 's1', 's2', 's3', 's4', 's5' ]
- for s in socks:
- self.sm.subscribe("a", "*", s)
- self.assertEqual(self.sm.find_sub("a", "*"), socks)
- def test_unsubscribe(self):
- socks = [ 's1', 's2', 's3', 's4', 's5' ]
- for s in socks:
- self.sm.subscribe("a", "*", s)
- self.assertTrue(self.sm.unsubscribe("a", "*", 's3'))
- # Unsubscribe from group it is not in
- self.assertFalse(self.sm.unsubscribe("a", "*", 's42'))
- self.assertEqual(self.sm.find_sub("a", "*"),
- [ 's1', 's2', 's4', 's5' ])
- def test_unsubscribe_all(self):
- self.sm.subscribe('g1', 'i1', 's1')
- self.sm.subscribe('g1', 'i1', 's2')
- self.sm.subscribe('g1', 'i2', 's1')
- self.sm.subscribe('g1', 'i2', 's2')
- self.sm.subscribe('g2', 'i1', 's1')
- self.sm.subscribe('g2', 'i1', 's2')
- self.sm.subscribe('g2', 'i2', 's1')
- self.sm.subscribe('g2', 'i2', 's2')
- self.assertEqual(set([('g1', 'i1'), ('g1', 'i2'), ('g2', 'i1'),
- ('g2', 'i2')]),
- set(self.sm.unsubscribe_all('s1')))
- self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's2' ])
- self.assertEqual(self.sm.find_sub("g1", "i2"), [ 's2' ])
- self.assertEqual(self.sm.find_sub("g2", "i1"), [ 's2' ])
- self.assertEqual(self.sm.find_sub("g2", "i2"), [ 's2' ])
- def test_find(self):
- self.sm.subscribe('g1', 'i1', 's1')
- self.sm.subscribe('g1', '*', 's2')
- self.assertEqual(set(self.sm.find("g1", "i1")), set([ 's1', 's2' ]))
- def test_find_sub(self):
- self.sm.subscribe('g1', 'i1', 's1')
- self.sm.subscribe('g1', '*', 's2')
- self.assertEqual(self.sm.find_sub("g1", "i1"), [ 's1' ])
- def test_open_socket_parameter(self):
- self.assertFalse(os.path.exists("./my_socket_file"))
- msgq = MsgQ("./my_socket_file");
- msgq.setup()
- self.assertTrue(os.path.exists("./my_socket_file"))
- msgq.shutdown();
- self.assertFalse(os.path.exists("./my_socket_file"))
- def test_open_socket_environment_variable(self):
- self.assertFalse(os.path.exists("my_socket_file"))
- os.environ["BIND10_MSGQ_SOCKET_FILE"] = "./my_socket_file"
- msgq = MsgQ();
- msgq.setup()
- self.assertTrue(os.path.exists("./my_socket_file"))
- msgq.shutdown();
- self.assertFalse(os.path.exists("./my_socket_file"))
- def test_open_socket_default(self):
- env_var = None
- orig_socket_file = None
- if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
- env_var = os.environ["BIND10_MSGQ_SOCKET_FILE"]
- del os.environ["BIND10_MSGQ_SOCKET_FILE"]
- # temporarily replace the class "default" not to be disrupted by
- # any running BIND 10 instance.
- if "BIND10_TEST_SOCKET_FILE" in os.environ:
- MsgQ.SOCKET_FILE = os.environ["BIND10_TEST_SOCKET_FILE"]
- socket_file = MsgQ.SOCKET_FILE
- self.assertFalse(os.path.exists(socket_file))
- msgq = MsgQ();
- try:
- msgq.setup()
- self.assertTrue(os.path.exists(socket_file))
- msgq.shutdown()
- self.assertFalse(os.path.exists(socket_file))
- except socket.error:
- # ok, the install path doesn't exist at all,
- # so we can't check any further
- pass
- if env_var is not None:
- os.environ["BIND10_MSGQ_SOCKET_FILE"] = env_var
- if orig_socket_file is not None:
- MsgQ.SOCKET_FILE = orig_socket_file
- def test_open_socket_bad(self):
- msgq = MsgQ("/does/not/exist")
- self.assertRaises(socket.error, msgq.setup)
- # But we can clean up after that.
- msgq.shutdown()
- def test_subscribe_cfgmgr(self):
- """Test special handling of the config manager. Once it subscribes,
- the message queue needs to connect and read the config. But not
- before and only once.
- """
- self.assertEqual(0, self.__cfgmgr_ready_called)
- # Not called when something else subscribes
- self.sm.subscribe('SomethingElse', '*', 's1')
- self.assertEqual(0, self.__cfgmgr_ready_called)
- # Called whenever the config manager subscribes
- self.sm.subscribe('ConfigManager', '*', 's2')
- self.assertEqual(1, self.__cfgmgr_ready_called)
- # But not called again when it subscribes again (should not
- # happen in practice, but we make sure anyway)
- self.sm.subscribe('ConfigManager', '*', 's3')
- self.assertEqual(1, self.__cfgmgr_ready_called)
- class MsgQTest(unittest.TestCase):
- """
- Tests for the behaviour of MsgQ. This is for the core of MsgQ, other
- subsystems are in separate test fixtures.
- """
- def setUp(self):
- self.__msgq = MsgQ()
- def parse_msg(self, msg):
- """
- Parse a binary representation of message to the routing header and the
- data payload. It assumes the message is correctly encoded and the
- payload is not omitted. It'd probably throw in other cases, but we
- don't use it in such situations in this test.
- """
- (length, header_len) = struct.unpack('>IH', msg[:6])
- header = json.loads(msg[6:6 + header_len].decode('utf-8'))
- data = json.loads(msg[6 + header_len:].decode('utf-8'))
- return (header, data)
- def test_unknown_command(self):
- """
- Test the command handler returns error when the command is unknown.
- """
- # Fake we are running, to disable test workarounds
- self.__msgq.running = True
- self.assertEqual({'result': [1, "unknown command: unknown"]},
- self.__msgq.command_handler('unknown', {}))
- def test_get_members(self):
- """
- Test getting members of a group or of all connected clients.
- """
- # Push two dummy "clients" into msgq (the ugly way, by directly
- # tweaking relevant data structures).
- class Sock:
- def __init__(self, fileno):
- self.fileno = lambda: fileno
- self.__msgq.lnames['first'] = Sock(1)
- self.__msgq.lnames['second'] = Sock(2)
- self.__msgq.fd_to_lname[1] = 'first'
- self.__msgq.fd_to_lname[2] = 'second'
- # Subscribe them to some groups
- self.__msgq.process_command_subscribe(self.__msgq.lnames['first'],
- {'group': 'G1', 'instance': '*'},
- None)
- self.__msgq.process_command_subscribe(self.__msgq.lnames['second'],
- {'group': 'G1', 'instance': '*'},
- None)
- self.__msgq.process_command_subscribe(self.__msgq.lnames['second'],
- {'group': 'G2', 'instance': '*'},
- None)
- # Now query content of some groups through the command handler.
- self.__msgq.running = True # Enable the command handler
- def check_both(result):
- """
- Check the result is successful one and it contains both lnames (in
- any order).
- """
- array = result['result'][1]
- self.assertEqual(set(['first', 'second']), set(array))
- self.assertEqual({'result': [0, array]}, result)
- # Make sure the result can be encoded as JSON
- # (there seems to be types that look like a list but JSON choks
- # on them)
- json.dumps(result)
- # Members of the G1 and G2
- self.assertEqual({'result': [0, ['second']]},
- self.__msgq.command_handler('members',
- {'group': 'G2'}))
- check_both(self.__msgq.command_handler('members', {'group': 'G1'}))
- # We pretend that all the possible groups exist, just that most
- # of them are empty. So requesting for Empty is request for an empty
- # group and should not fail.
- self.assertEqual({'result': [0, []]},
- self.__msgq.command_handler('members',
- {'group': 'Empty'}))
- # Without the name of the group, we just get all the clients.
- check_both(self.__msgq.command_handler('members', {}))
- # Omitting the parameters completely in such case is OK
- check_both(self.__msgq.command_handler('members', None))
- def notifications_setup(self):
- """
- Common setup of some notifications tests. Mock several things.
- """
- # Mock the method to send notifications (we don't really want
- # to send them now, just see they'd be sent).
- # Mock the poller, as we don't need it at all (and we don't have
- # real socket to give it now).
- notifications = []
- def send_notification(event, params):
- notifications.append((event, params))
- class FakePoller:
- def register(self, socket, mode):
- pass
- def unregister(self, sock):
- pass
- self.__msgq.members_notify = send_notification
- self.__msgq.poller = FakePoller()
- # Create a socket
- class Sock:
- def __init__(self, fileno):
- self.fileno = lambda: fileno
- def close(self):
- pass
- sock = Sock(1)
- return notifications, sock
- def test_notifies(self):
- """
- Test the message queue sends notifications about connecting,
- disconnecting and subscription changes.
- """
- notifications, sock = self.notifications_setup()
- # We should notify about new cliend when we register it
- self.__msgq.register_socket(sock)
- lname = self.__msgq.fd_to_lname[1] # Steal the lname
- self.assertEqual([('connected', {'client': lname})], notifications)
- del notifications[:]
- # A notification should happen for a subscription to a group
- self.__msgq.process_command_subscribe(sock, {'group': 'G',
- 'instance': '*'},
- None)
- self.assertEqual([('subscribed', {'client': lname, 'group': 'G'})],
- notifications)
- del notifications[:]
- # As well for unsubscription
- self.__msgq.process_command_unsubscribe(sock, {'group': 'G',
- 'instance': '*'},
- None)
- self.assertEqual([('unsubscribed', {'client': lname, 'group': 'G'})],
- notifications)
- del notifications[:]
- # Unsubscription from a group it isn't subscribed to
- self.__msgq.process_command_unsubscribe(sock, {'group': 'H',
- 'instance': '*'},
- None)
- self.assertEqual([], notifications)
- # And, finally, for removal of client
- self.__msgq.kill_socket(sock.fileno(), sock)
- self.assertEqual([('disconnected', {'client': lname})], notifications)
- def test_notifies_implicit_kill(self):
- """
- Test that the unsubscription notifications are sent before the socket
- is dropped, even in case it does not unsubscribe explicitly.
- """
- notifications, sock = self.notifications_setup()
- # Register and subscribe. Notifications for these are in above test.
- self.__msgq.register_socket(sock)
- lname = self.__msgq.fd_to_lname[1] # Steal the lname
- self.__msgq.process_command_subscribe(sock, {'group': 'G',
- 'instance': '*'},
- None)
- del notifications[:]
- self.__msgq.kill_socket(sock.fileno(), sock)
- # Now, the notification for unsubscribe should be first, second for
- # the disconnection.
- self.assertEqual([('unsubscribed', {'client': lname, 'group': 'G'}),
- ('disconnected', {'client': lname})
- ], notifications)
- def test_undeliverable_errors(self):
- """
- Send several packets through the MsgQ and check it generates
- undeliverable notifications under the correct circumstances.
- The test is not exhaustive as it doesn't test all combination
- of existence of the recipient, addressing schemes, want_answer
- header and the reply header. It is not needed, these should
- be mostly independent. That means, for example, if the message
- is a reply and there's no recipient to send it to, the error
- would not be generated no matter if we addressed the recipient
- by lname or group. If we included everything, the test would
- have too many scenarios with little benefit.
- """
- self.__sent_messages = []
- def fake_send_prepared_msg(socket, msg):
- self.__sent_messages.append((socket, msg))
- return True
- self.__msgq.send_prepared_msg = fake_send_prepared_msg
- # These would be real sockets in the MsgQ, but we pass them as
- # parameters only, so we don't need them to be. We use simple
- # integers to tell one from another.
- sender = 1
- recipient = 2
- another_recipiet = 3
- # The routing headers and data to test with.
- routing = {
- 'to': '*',
- 'from': 'sender',
- 'group': 'group',
- 'instance': '*',
- 'seq': 42
- }
- data = {
- "data": "Just some data"
- }
- # Some common checking patterns
- def check_error():
- self.assertEqual(1, len(self.__sent_messages))
- self.assertEqual(1, self.__sent_messages[0][0])
- self.assertEqual(({
- 'group': 'group',
- 'instance': '*',
- 'reply': 42,
- 'seq': 42,
- 'from': 'msgq',
- 'to': 'sender',
- 'want_answer': True
- }, {'result': [-1, "No such recipient"]}),
- self.parse_msg(self.__sent_messages[0][1]))
- self.__sent_messages = []
- def check_no_message():
- self.assertEqual([], self.__sent_messages)
- def check_delivered(rcpt_socket=recipient):
- self.assertEqual(1, len(self.__sent_messages))
- self.assertEqual(rcpt_socket, self.__sent_messages[0][0])
- self.assertEqual((routing, data),
- self.parse_msg(self.__sent_messages[0][1]))
- self.__sent_messages = []
- # Send the message. No recipient, but errors are not requested,
- # so none is generated.
- self.__msgq.process_command_send(sender, routing, data)
- check_no_message()
- # It should act the same if we explicitly say we do not want replies.
- routing["want_answer"] = False
- self.__msgq.process_command_send(sender, routing, data)
- check_no_message()
- # Ask for errors if it can't be delivered.
- routing["want_answer"] = True
- self.__msgq.process_command_send(sender, routing, data)
- check_error()
- # If the message is a reply itself, we never generate the errors
- routing["reply"] = 3
- self.__msgq.process_command_send(sender, routing, data)
- check_no_message()
- # If there are recipients (but no "reply" header), the error should not
- # be sent and the message should get delivered.
- del routing["reply"]
- self.__msgq.subs.find = lambda group, instance: [recipient]
- self.__msgq.process_command_send(sender, routing, data)
- check_delivered()
- # When we send a direct message and the recipient is not there, we get
- # the error too
- routing["to"] = "lname"
- self.__msgq.process_command_send(sender, routing, data)
- check_error()
- # But when the recipient is there, it is delivered and no error is
- # generated.
- self.__msgq.lnames["lname"] = recipient
- self.__msgq.process_command_send(sender, routing, data)
- check_delivered()
- # If an attempt to send fails, consider it no recipient.
- def fail_send_prepared_msg(socket, msg):
- '''
- Pretend sending a message failed. After one call, return to the
- usual mock, so the errors or other messages can be sent.
- '''
- self.__msgq.send_prepared_msg = fake_send_prepared_msg
- return False
- self.__msgq.send_prepared_msg = fail_send_prepared_msg
- self.__msgq.process_command_send(sender, routing, data)
- check_error()
- # But if there are more recipients and only one fails, it should
- # be delivered to the other and not considered an error
- self.__msgq.send_prepared_msg = fail_send_prepared_msg
- routing["to"] = '*'
- self.__msgq.subs.find = lambda group, instance: [recipient,
- another_recipiet]
- self.__msgq.process_command_send(sender, routing, data)
- check_delivered(rcpt_socket=another_recipiet)
- class DummySocket:
- """
- Dummy socket class.
- This one does nothing at all, but some calls are used.
- It is mainly intended to override the listen socket for msgq, which
- we do not need in these tests.
- """
- def fileno():
- return -1
- def close():
- pass
- class BadSocket:
- """
- Special socket wrapper class. Once given a socket in its constructor,
- it completely behaves like that socket, except that its send() call
- will only actually send one byte per call, and optionally raise a given
- exception at a given time.
- """
- def __init__(self, real_socket, raise_on_send=0, send_exception=None):
- """
- Parameters:
- real_socket: The actual socket to wrap
- raise_on_send: integer. If higher than 0, and send_exception is
- not None, send_exception will be raised on the
- 'raise_on_send'th call to send().
- send_exception: if not None, this exception will be raised
- (if raise_on_send is not 0)
- """
- self.socket = real_socket
- self.send_count = 0
- self.raise_on_send = raise_on_send
- self.send_exception = send_exception
- # completely wrap all calls and member access
- # (except explicitly overridden ones)
- def __getattr__(self, name, *args):
- attr = getattr(self.socket, name)
- if isinstance(attr, collections.Callable):
- def callable_attr(*args):
- return attr.__call__(*args)
- return callable_attr
- else:
- return attr
- def send(self, data):
- self.send_count += 1
- if self.send_exception is not None and\
- self.send_count == self.raise_on_send:
- raise self.send_exception
- if len(data) > 0:
- return self.socket.send(data[:1])
- else:
- return 0
- class MsgQThread(threading.Thread):
- """
- Very simple thread class that runs msgq.run() when started,
- and stores the exception that msgq.run() raises, if any.
- """
- def __init__(self, msgq):
- threading.Thread.__init__(self)
- self.msgq_ = msgq
- self.caught_exception = None
- self.lock = threading.Lock()
- def run(self):
- try:
- self.msgq_.run()
- except Exception as exc:
- # Store the exception to make the test fail if necessary
- self.caught_exception = exc
- def stop(self):
- self.msgq_.stop()
- class SendNonblock(unittest.TestCase):
- """
- Tests that the whole thing will not get blocked if someone does not read.
- """
- def terminate_check(self, task, timeout=30):
- """
- Runs task in separate process (task is a function) and checks
- it terminates sooner than timeout.
- """
- task_pid = os.fork()
- if task_pid == 0:
- # Kill the forked process after timeout by SIGALRM
- signal.alarm(timeout)
- # Run the task
- # If an exception happens or we run out of time, we terminate
- # with non-zero
- task()
- # If we got here, then everything worked well and in time
- # In that case, we terminate successfully
- os._exit(0) # needs exit code
- else:
- (pid, status) = os.waitpid(task_pid, 0)
- self.assertEqual(0, status,
- "The task did not complete successfully in time")
- def get_msgq_with_sockets(self):
- '''
- Create a message queue and prepare it for use with a socket pair.
- The write end is put into the message queue, so we can check it.
- It returns (msgq, read_end, write_end). It is expected the sockets
- are closed by the caller afterwards.
- Also check the sockets are registered correctly (eg. internal data
- structures are there for them).
- '''
- msgq = MsgQ()
- # We do only partial setup, so we don't create the listening socket
- (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
- msgq.register_socket(write)
- self.assertEqual(1, len(msgq.lnames))
- self.assertEqual(write, msgq.lnames[msgq.fd_to_lname[write.fileno()]])
- return (msgq, read, write)
- def infinite_sender(self, sender):
- """
- Sends data until an exception happens. socket.error is caught,
- as it means the socket got closed. Sender is called to actually
- send the data.
- """
- (msgq, read, write) = self.get_msgq_with_sockets()
- # Keep sending while it is not closed by the msgq
- try:
- while True:
- sender(msgq, write)
- except socket.error:
- pass
- # Explicitly close temporary socket pair as the Python
- # interpreter expects it. It may not be 100% exception safe,
- # but since this is only for tests we prefer brevity.
- # Actually, the write end is often closed by the sender.
- if write.fileno() != -1:
- # Some of the senders passed here kill the socket internally.
- # So kill it only if not yet done so. If the socket is closed,
- # it gets -1 as fileno().
- msgq.kill_socket(write.fileno(), write)
- self.assertFalse(msgq.lnames)
- self.assertFalse(msgq.fd_to_lname)
- read.close()
- def test_infinite_sendmsg(self):
- """
- Tries sending messages (and not reading them) until it either times
- out (in blocking call, wrong) or closes it (correct).
- """
- data = "data"
- for i in range(1, 10):
- data += data
- self.terminate_check(lambda: self.infinite_sender(
- lambda msgq, socket: msgq.sendmsg(socket, {}, {"message" : data})))
- def test_infinite_sendprepared(self):
- """
- Tries sending data (and not reading them) until it either times
- out (in blocking call, wrong) or closes it (correct).
- """
- data = b"data"
- for i in range(1, 10):
- data += data
- self.terminate_check(lambda: self.infinite_sender(
- lambda msgq, socket: msgq.send_prepared_msg(socket, data)))
- def test_sendprepared_success(self):
- '''
- Test the send_prepared_msg returns success when queueing messages.
- It does so on the first attempt (when it actually tries to send
- something to the socket) and on any attempt that follows and the
- buffer is already full.
- '''
- (msgq, read, write) = self.get_msgq_with_sockets()
- # Now keep sending until we fill in something into the internal
- # buffer.
- while not write.fileno() in msgq.sendbuffs:
- self.assertTrue(msgq.send_prepared_msg(write, b'data'))
- read.close()
- write.close()
- def test_sendprepared_epipe(self):
- '''
- Test the send_prepared_msg returns false when we try to queue a
- message and the other side is not there any more. It should be done
- with EPIPE, so not a fatal error.
- '''
- (msgq, read, write) = self.get_msgq_with_sockets()
- # Close one end. It should make a EPIPE on the other.
- read.close()
- # Now it should soft-fail
- self.assertFalse(msgq.send_prepared_msg(write, b'data'))
- write.close()
- def send_many(self, data):
- """
- Tries that sending a command many times and getting an answer works.
- """
- msgq = MsgQ()
- # msgq.run needs to compare with the listen_socket, so we provide
- # a replacement
- msgq.listen_socket = DummySocket
- (queue, out) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
- def run():
- length = len(data)
- queue_pid = os.fork()
- if queue_pid == 0:
- signal.alarm(120)
- msgq.setup_signalsock()
- msgq.register_socket(queue)
- msgq.run()
- msgq.cleanup_signalsock()
- else:
- try:
- def killall(signum, frame):
- os.kill(queue_pid, signal.SIGTERM)
- os._exit(1)
- signal.signal(signal.SIGALRM, killall)
- msg = msgq.preparemsg({"type" : "ping"}, data)
- now = time.clock()
- while time.clock() - now < 0.2:
- out.sendall(msg)
- # Check the answer
- (routing, received) = msgq.read_packet(out.fileno(),
- out)
- self.assertEqual({"type" : "pong"},
- isc.cc.message.from_wire(routing))
- self.assertEqual(data, received)
- finally:
- os.kill(queue_pid, signal.SIGTERM)
- self.terminate_check(run)
- # Explicitly close temporary socket pair as the Python
- # interpreter expects it. It may not be 100% exception safe,
- # but since this is only for tests we prefer brevity.
- queue.close()
- out.close()
- def test_small_sends(self):
- """
- Tests sending small data many times.
- """
- self.send_many(b"data")
- def test_large_sends(self):
- """
- Tests sending large data many times.
- """
- data = b"data"
- for i in range(1, 20):
- data = data + data
- self.send_many(data)
- def do_send(self, write, read, control_write, control_read,
- expect_arrive=True, expect_send_exception=None):
- """
- Makes a msgq object that is talking to itself,
- run it in a separate thread so we can use and
- test run().
- It is given two sets of connected sockets; write/read, and
- control_write/control_read. The former may be throwing errors
- and mangle data to test msgq. The second is mainly used to
- send msgq the stop command.
- (Note that the terms 'read' and 'write' are from the msgq
- point of view, so the test itself writes to 'control_read')
- Parameters:
- write: a socket that is used to send the data to
- read: a socket that is used to read the data from
- control_write: a second socket for communication with msgq
- control_read: a second socket for communication with msgq
- expect_arrive: if True, the read socket is read from, and the data
- that is read is expected to be the same as the data
- that has been sent to the write socket.
- expect_send_exception: if not None, this is the exception that is
- expected to be raised by msgq
- """
- # Some message and envelope data to send and check
- env = b'{"env": "foo"}'
- msg = b'{"msg": "bar"}'
- msgq = MsgQ()
- # Don't need a listen_socket
- msgq.listen_socket = DummySocket
- msgq.setup_signalsock()
- msgq.register_socket(write)
- msgq.register_socket(control_write)
- # Queue the message for sending
- msgq.sendmsg(write, env, msg)
- # Run it in a thread
- msgq_thread = MsgQThread(msgq)
- # If we're done, just kill it
- msgq_thread.start()
- if expect_arrive:
- (recv_env, recv_msg) = msgq.read_packet(read.fileno(),
- read)
- self.assertEqual(env, recv_env)
- self.assertEqual(msg, recv_msg)
- # Tell msgq to stop
- msg = msgq.preparemsg({"type" : "stop"})
- control_read.sendall(msg)
- # Wait for thread to stop if it hasn't already.
- # Put in a (long) timeout; the thread *should* stop, but if it
- # does not, we don't want the test to hang forever
- msgq_thread.join(60)
- # Fail the test if it didn't stop
- self.assertFalse(msgq_thread.isAlive(), "Thread did not stop")
- # Clean up some internals of msgq (usually called as part of
- # shutdown, but we skip that one here)
- msgq.cleanup_signalsock()
- # Check the exception from the thread, if any
- # First, if we didn't expect it; reraise it (to make test fail and
- # show the stacktrace for debugging)
- if expect_send_exception is None:
- if msgq_thread.caught_exception is not None:
- raise msgq_thread.caught_exception
- else:
- # If we *did* expect it, fail it there was none
- self.assertIsNotNone(msgq_thread.caught_exception)
- def do_send_with_send_error(self, raise_on_send, send_exception,
- expect_answer=True,
- expect_send_exception=None):
- """
- Sets up two connected sockets, wraps the sender socket into a BadSocket
- class, then performs a do_send() test.
- Parameters:
- raise_on_send: the byte at which send_exception should be raised
- (see BadSocket)
- send_exception: the exception to raise (see BadSocket)
- expect_answer: whether the send is expected to complete (and hence
- the read socket should get the message)
- expect_send_exception: the exception msgq is expected to raise when
- send_exception is raised by BadSocket.
- """
- (write, read) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
- (control_write, control_read) = socket.socketpair(socket.AF_UNIX,
- socket.SOCK_STREAM)
- badwrite = BadSocket(write, raise_on_send, send_exception)
- self.do_send(badwrite, read, control_write, control_read,
- expect_answer, expect_send_exception)
- write.close()
- read.close()
- control_write.close()
- control_read.close()
- def test_send_raise_recoverable(self):
- """
- Test whether msgq survices a recoverable socket errors when sending.
- Two tests are done: one where the error is raised on the 3rd octet,
- and one on the 23rd.
- """
- for err in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]:
- sockerr = socket.error(err, 'Socket error')
- self.do_send_with_send_error(3, sockerr)
- self.do_send_with_send_error(23, sockerr)
- def test_send_raise_nonrecoverable(self):
- """
- Test whether msgq survives socket errors that are nonrecoverable
- (for said socket that is, i.e. EPIPE etc).
- Two tests are done: one where the error is raised on the 3rd octet,
- and one on the 23rd.
- """
- for err in [ errno.EPIPE, errno.ENOBUFS, errno.ECONNRESET ]:
- sockerr = socket.error(err, 'Socket error')
- self.do_send_with_send_error(3, sockerr, False)
- self.do_send_with_send_error(23, sockerr, False)
- def otest_send_raise_crash(self):
- """
- Test whether msgq does NOT survive on a general exception.
- Note, perhaps it should; but we'd have to first discuss and decide
- how it should recover (i.e. drop the socket and consider the client
- dead?
- It may be a coding problem in msgq itself, and we certainly don't
- want to ignore those.
- """
- sockerr = Exception("just some general exception")
- self.do_send_with_send_error(3, sockerr, False, sockerr)
- self.do_send_with_send_error(23, sockerr, False, sockerr)
- class ThreadTests(unittest.TestCase):
- """Test various things around thread synchronization."""
- def setUp(self):
- self.__msgq = MsgQ()
- self.__abort_wait = False
- self.__result = None
- self.__notify_thread = threading.Thread(target=self.__notify)
- self.__wait_thread = threading.Thread(target=self.__wait)
- # Make sure the threads are killed if left behind by the test.
- self.__notify_thread.daemon = True
- self.__wait_thread.daemon = True
- def __notify(self):
- """Call the cfgmgr_ready."""
- if self.__abort_wait:
- self.__msgq.cfgmgr_ready(False)
- else:
- self.__msgq.cfgmgr_ready()
- def __wait(self):
- """Wait for config manager and store the result."""
- self.__result = self.__msgq.wait_cfgmgr()
- def test_wait_cfgmgr(self):
- """One thread signals the config manager subscribed, the other
- waits for it. We then check it terminated correctly.
- """
- self.__notify_thread.start()
- self.__wait_thread.start()
- # Timeout to ensure the test terminates even on failure
- self.__wait_thread.join(60)
- self.assertTrue(self.__result)
- def test_wait_cfgmgr_2(self):
- """Same as test_wait_cfgmgr, but starting the threads in reverse order
- (the result should be the same).
- """
- self.__wait_thread.start()
- self.__notify_thread.start()
- # Timeout to ensure the test terminates even on failure
- self.__wait_thread.join(60)
- self.assertTrue(self.__result)
- def test_wait_abort(self):
- """Similar to test_wait_cfgmgr, but the config manager is never
- subscribed and it is aborted.
- """
- self.__abort_wait = True
- self.__wait_thread.start()
- self.__notify_thread.start()
- # Timeout to ensure the test terminates even on failure
- self.__wait_thread.join(60)
- self.assertIsNotNone(self.__result)
- self.assertFalse(self.__result)
- def __check_ready_and_abort(self):
- """Check that when we first say the config manager is ready and then
- try to abort, it uses the first result.
- """
- self.__msgq.cfgmgr_ready()
- self.__msgq.cfgmgr_ready(False)
- self.__result = self.__msgq.wait_cfgmgr()
- def test_ready_and_abort(self):
- """Perform the __check_ready_and_abort test, but in a separate thread,
- so in case something goes wrong with the synchronisation and it
- deadlocks, the test will terminate anyway.
- """
- test_thread = threading.Thread(target=self.__check_ready_and_abort)
- test_thread.daemon = True
- test_thread.start()
- test_thread.join(60)
- self.assertTrue(self.__result)
- class SocketTests(unittest.TestCase):
- '''Test cases for micro behaviors related to socket operations.
- Some cases are covered as part of other tests, but in this fixture
- we check more details of specific method related to socket operation,
- with the help of mock classes to avoid expensive overhead.
- '''
- class MockSocket():
- '''A mock socket used instead of standard socket objects.'''
- def __init__(self):
- self.ex_on_send = None # raised from send() if not None
- self.recv_result = b'test' # dummy data or exception
- self.blockings = [] # history of setblocking() params
- def setblocking(self, on):
- self.blockings.append(on)
- def send(self, data):
- if self.ex_on_send is not None:
- raise self.ex_on_send
- return 10 # arbitrary choice
- def recv(self, len):
- if isinstance(self.recv_result, Exception):
- raise self.recv_result
- ret = self.recv_result
- self.recv_result = b'' # if called again, return empty data
- return ret
- def fileno(self):
- return 42 # arbitrary choice
- class LoggerWrapper():
- '''A simple wrapper of logger to inspect log messages.'''
- def __init__(self, logger):
- self.error_called = 0
- self.warn_called = 0
- self.debug_called = 0
- self.orig_logger = logger
- def error(self, *args):
- self.error_called += 1
- self.orig_logger.error(*args)
- def warn(self, *args):
- self.warn_called += 1
- self.orig_logger.warn(*args)
- def debug(self, *args):
- self.debug_called += 1
- self.orig_logger.debug(*args)
- def mock_kill_socket(self, fileno, sock):
- '''A replacement of MsgQ.kill_socket method for inspection.'''
- self.__killed_socket = (fileno, sock)
- if fileno in self.__msgq.sockets:
- del self.__msgq.sockets[fileno]
- def setUp(self):
- self.__msgq = MsgQ()
- self.__msgq.kill_socket = self.mock_kill_socket
- self.__sock = self.MockSocket()
- self.__data = b'dummy'
- self.__msgq.sockets[42] = self.__sock
- self.__msgq.sendbuffs[42] = (None, b'testdata')
- self.__sock_error = socket.error()
- self.__killed_socket = None
- self.__logger = self.LoggerWrapper(msgq.logger)
- msgq.logger = self.__logger
- self.__orig_select = msgq.select.select
- def tearDown(self):
- msgq.logger = self.__logger.orig_logger
- msgq.select.select = self.__orig_select
- def test_send_data(self):
- # Successful case: _send_data() returns the hardcoded value, and
- # setblocking() is called twice with the expected parameters
- self.assertEqual(10, self.__msgq._send_data(self.__sock, self.__data))
- self.assertEqual([0, 1], self.__sock.blockings)
- self.assertIsNone(self.__killed_socket)
- def test_send_data_interrupt(self):
- '''send() is interrupted. send_data() returns 0, sock isn't killed.'''
- expected_blockings = []
- for eno in [errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR]:
- self.__sock_error.errno = eno
- self.__sock.ex_on_send = self.__sock_error
- self.assertEqual(0, self.__msgq._send_data(self.__sock,
- self.__data))
- expected_blockings.extend([0, 1])
- self.assertEqual(expected_blockings, self.__sock.blockings)
- self.assertIsNone(self.__killed_socket)
- def test_send_data_error(self):
- '''Unexpected error happens on send(). The socket is killed.
- If the error is EPIPE, it's logged at the warn level; otherwise
- an error message is logged.
- '''
- expected_blockings = []
- expected_errors = 0
- expected_warns = 0
- for eno in [errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS]:
- self.__sock_error.errno = eno
- self.__sock.ex_on_send = self.__sock_error
- self.__killed_socket = None # clear any previuos value
- self.assertEqual(None, self.__msgq._send_data(self.__sock,
- self.__data))
- self.assertEqual((42, self.__sock), self.__killed_socket)
- expected_blockings.extend([0, 1])
- self.assertEqual(expected_blockings, self.__sock.blockings)
- if eno == errno.EPIPE:
- expected_warns += 1
- else:
- expected_errors += 1
- self.assertEqual(expected_errors, self.__logger.error_called)
- self.assertEqual(expected_warns, self.__logger.warn_called)
- def test_process_packet(self):
- '''Check some failure cases in handling an incoming message.'''
- expected_errors = 0
- expected_debugs = 0
- # if socket.recv() fails due to socket.error, it will be logged
- # as error and the socket will be killed regardless of errno.
- for eno in [errno.ENOBUFS, errno.ECONNRESET]:
- self.__sock_error.errno = eno
- self.__sock.recv_result = self.__sock_error
- self.__killed_socket = None # clear any previuos value
- self.__msgq.process_packet(42, self.__sock)
- self.assertEqual((42, self.__sock), self.__killed_socket)
- expected_errors += 1
- self.assertEqual(expected_errors, self.__logger.error_called)
- self.assertEqual(expected_debugs, self.__logger.debug_called)
- # if socket.recv() returns empty data, the result depends on whether
- # there's any preceding data; in the second case below, at least
- # 6 bytes of data will be expected, and the second call to our faked
- # recv() returns empty data. In that case it will be logged as error.
- for recv_data in [b'', b'short']:
- self.__sock.recv_result = recv_data
- self.__killed_socket = None
- self.__msgq.process_packet(42, self.__sock)
- self.assertEqual((42, self.__sock), self.__killed_socket)
- if len(recv_data) == 0:
- expected_debugs += 1
- else:
- expected_errors += 1
- self.assertEqual(expected_errors, self.__logger.error_called)
- self.assertEqual(expected_debugs, self.__logger.debug_called)
- def test_do_select(self):
- """
- Check the behaviour of the run_select method.
- In particular, check that we skip writing to the sockets we read,
- because a read may have side effects (like closing the socket) and
- we want to prevent strange behavior.
- """
- self.__read_called = []
- self.__write_called = []
- self.__reads = None
- self.__writes = None
- def do_read(fd, socket):
- self.__read_called.append(fd)
- self.__msgq.running = False
- def do_write(fd):
- self.__write_called.append(fd)
- self.__msgq.running = False
- self.__msgq.process_packet = do_read
- self.__msgq._process_write = do_write
- self.__msgq.fd_to_lname = {42: 'lname', 44: 'other', 45: 'unused'}
- # The do_select does index it, but just passes the value. So reuse
- # the dict to safe typing in the test.
- self.__msgq.sockets = self.__msgq.fd_to_lname
- self.__msgq.sendbuffs = {42: 'data', 43: 'data'}
- def my_select(reads, writes, errors):
- self.__reads = reads
- self.__writes = writes
- self.assertEqual([], errors)
- return ([42, 44], [42, 43], [])
- msgq.select.select = my_select
- self.__msgq.listen_socket = DummySocket
- self.__msgq.running = True
- self.__msgq.run_select()
- self.assertEqual([42, 44], self.__read_called)
- self.assertEqual([43], self.__write_called)
- self.assertEqual({42, 44, 45}, set(self.__reads))
- self.assertEqual({42, 43}, set(self.__writes))
- if __name__ == '__main__':
- isc.log.resetUnitTestRootLogger()
- unittest.main()
|