# Copyright (C) 2010 Internet Systems Consortium. # # Permission to use, copy, modify, and distribute this software for any # purpose with or without fee is hereby granted, provided that the above # copyright notice and this permission notice appear in all copies. # # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT, # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. import unittest import sys import os import tempfile import time import socket from isc.notify import notify_out, SOCK_DATA import isc.log from isc.dns import * TESTDATA_SRCDIR = os.getenv("TESTDATASRCDIR") # our fake socket, where we can read and insert messages class MockSocket(): def __init__(self): self._local_sock, self._remote_sock = socket.socketpair() def connect(self, to): pass def fileno(self): return self._local_sock.fileno() def close(self): self._local_sock.close() self._remote_sock.close() def sendto(self, data, flag, dst): return self._local_sock.send(data) def recvfrom(self, length): data = self._local_sock.recv(length) return (data, None) # provide a remote end which can write data to MockSocket for testing. def remote_end(self): return self._remote_sock # We subclass the ZoneNotifyInfo class we're testing here, only # to override the create_socket() method. class MockZoneNotifyInfo(notify_out.ZoneNotifyInfo): def create_socket(self, addrinfo): super().create_socket(addrinfo) # before replacing the underlying socket, remember the address family # of the original socket so that tests can check that. self.sock_family = self._sock.family self._sock.close() self._sock = MockSocket() self._sock.family = self.sock_family return self._sock class TestZoneNotifyInfo(unittest.TestCase): def setUp(self): self.info = notify_out.ZoneNotifyInfo('example.net.', 'IN') def test_prepare_finish_notify_out(self): self.info.prepare_notify_out() self.assertNotEqual(self.info.notify_timeout, None) self.assertIsNone(self.info._notify_current) self.info.finish_notify_out() self.assertEqual(self.info._sock, None) self.assertEqual(self.info.notify_timeout, None) def test_set_next_notify_target(self): self.info.notify_slaves.append(('127.0.0.1', 53)) self.info.notify_slaves.append(('1.1.1.1', 5353)) self.info.prepare_notify_out() self.assertEqual(self.info.get_current_notify_target(), ('127.0.0.1', 53)) self.info.set_next_notify_target() self.assertEqual(self.info.get_current_notify_target(), ('1.1.1.1', 5353)) self.info.set_next_notify_target() self.assertIsNone(self.info.get_current_notify_target()) temp_info = notify_out.ZoneNotifyInfo('example.com.', 'IN') temp_info.prepare_notify_out() self.assertIsNone(temp_info.get_current_notify_target()) class TestNotifyOut(unittest.TestCase): def setUp(self): self._db_file = TESTDATA_SRCDIR + '/test.sqlite3' self._notifiedv4_zone_name = None def _dummy_counter_notifyoutv4(z): self._notifiedv4_zone_name = z self._notifiedv6_zone_name = None def _dummy_counter_notifyoutv6(z): self._notifiedv6_zone_name = z self._notify = notify_out.NotifyOut(self._db_file, counter_notifyoutv4=_dummy_counter_notifyoutv4, counter_notifyoutv6=_dummy_counter_notifyoutv6) self._notify._notify_infos[('example.com.', 'IN')] = MockZoneNotifyInfo('example.com.', 'IN') self._notify._notify_infos[('example.com.', 'CH')] = MockZoneNotifyInfo('example.com.', 'CH') self._notify._notify_infos[('example.net.', 'IN')] = MockZoneNotifyInfo('example.net.', 'IN') self._notify._notify_infos[('example.org.', 'IN')] = MockZoneNotifyInfo('example.org.', 'IN') self._notify._notify_infos[('example.org.', 'CH')] = MockZoneNotifyInfo('example.org.', 'CH') net_info = self._notify._notify_infos[('example.net.', 'IN')] net_info.notify_slaves.append(('127.0.0.1', 53)) net_info.notify_slaves.append(('1.1.1.1', 5353)) com_info = self._notify._notify_infos[('example.com.', 'IN')] com_info.notify_slaves.append(('1.1.1.1', 5353)) com_ch_info = self._notify._notify_infos[('example.com.', 'CH')] com_ch_info.notify_slaves.append(('1.1.1.1', 5353)) def test_send_notify(self): notify_out._MAX_NOTIFY_NUM = 2 self._notify._nonblock_event.clear() self.assertTrue(self._notify.send_notify('example.net')) self.assertTrue(self._notify._nonblock_event.isSet()) self.assertEqual(self._notify.notify_num, 1) self.assertEqual(self._notify._notifying_zones[0], ('example.net.', 'IN')) self.assertTrue(self._notify.send_notify('example.com')) self.assertEqual(self._notify.notify_num, 2) self.assertEqual(self._notify._notifying_zones[1], ('example.com.', 'IN')) # notify_num is equal to MAX_NOTIFY_NUM, append it to waiting_zones list. self._notify._nonblock_event.clear() self.assertTrue(self._notify.send_notify('example.com', 'CH')) # add waiting zones won't set nonblock_event. self.assertFalse(self._notify._nonblock_event.isSet()) self.assertEqual(self._notify.notify_num, 2) self.assertEqual(1, len(self._notify._waiting_zones)) # zone_id is already in notifying_zones list, append it to waiting_zones list. self.assertTrue(self._notify.send_notify('example.net')) self.assertEqual(2, len(self._notify._waiting_zones)) self.assertEqual(self._notify._waiting_zones[1], ('example.net.', 'IN')) # zone_id is already in waiting_zones list, skip it. self.assertTrue(self._notify.send_notify('example.net')) self.assertEqual(2, len(self._notify._waiting_zones)) # has no slave masters, skip it. self.assertTrue(self._notify.send_notify('example.org.', 'CH')) self.assertEqual(self._notify.notify_num, 2) self.assertEqual(2, len(self._notify._waiting_zones)) self.assertTrue(self._notify.send_notify('example.org.')) self.assertEqual(self._notify.notify_num, 2) self.assertEqual(2, len(self._notify._waiting_zones)) # zone does not exist, should return False, and no change in other # values self.assertFalse(self._notify.send_notify('does.not.exist.')) self.assertEqual(self._notify.notify_num, 2) self.assertEqual(2, len(self._notify._waiting_zones)) self.assertFalse(self._notify.send_notify('example.net.', 'CH')) self.assertEqual(self._notify.notify_num, 2) self.assertEqual(2, len(self._notify._waiting_zones)) def test_wait_for_notify_reply(self): self._notify.send_notify('example.net.') self._notify.send_notify('example.com.') notify_out._MAX_NOTIFY_NUM = 2 self._notify.send_notify('example.org.') replied_zones, timeout_zones = self._notify._wait_for_notify_reply() self.assertEqual(len(replied_zones), 0) self.assertEqual(len(timeout_zones), 2) # Trigger timeout events to "send" notifies via a mock socket for zone in timeout_zones: self._notify._zone_notify_handler(timeout_zones[zone], notify_out._EVENT_TIMEOUT) # Now make one socket be readable self._notify._notify_infos[('example.net.', 'IN')].notify_timeout = time.time() + 10 self._notify._notify_infos[('example.com.', 'IN')].notify_timeout = time.time() + 10 #Send some data to socket 12340, to make the target socket be readable self._notify._notify_infos[('example.net.', 'IN')]._sock.remote_end().send(b'data') replied_zones, timeout_zones = self._notify._wait_for_notify_reply() self.assertEqual(len(replied_zones), 1) self.assertEqual(len(timeout_zones), 1) self.assertTrue(('example.net.', 'IN') in replied_zones.keys()) self.assertTrue(('example.com.', 'IN') in timeout_zones.keys()) self.assertLess(time.time(), self._notify._notify_infos[('example.com.', 'IN')].notify_timeout) def test_wait_for_notify_reply_2(self): # Test the returned value when the read_side socket is readable. self._notify.send_notify('example.net.') self._notify.send_notify('example.com.') # Now make one socket be readable self._notify._notify_infos[('example.net.', 'IN')].notify_timeout = time.time() + 10 self._notify._notify_infos[('example.com.', 'IN')].notify_timeout = time.time() + 10 if self._notify._read_sock is not None: self._notify._read_sock.close() if self._notify._write_sock is not None: self._notify._write_sock.close() self._notify._read_sock, self._notify._write_sock = socket.socketpair() self._notify._write_sock.send(SOCK_DATA) replied_zones, timeout_zones = self._notify._wait_for_notify_reply() self.assertEqual(0, len(replied_zones)) self.assertEqual(0, len(timeout_zones)) def test_notify_next_target(self): self._notify.send_notify('example.net.') self._notify.send_notify('example.com.') notify_out._MAX_NOTIFY_NUM = 2 # zone example.org. has no slave servers. self._notify.send_notify('example.org.') self._notify.send_notify('example.com.', 'CH') info = self._notify._notify_infos[('example.net.', 'IN')] self._notify._notify_next_target(info) self.assertEqual(0, info.notify_try_num) self.assertEqual(info.get_current_notify_target(), ('1.1.1.1', 5353)) self.assertEqual(2, self._notify.notify_num) self.assertEqual(1, len(self._notify._waiting_zones)) self._notify._notify_next_target(info) self.assertEqual(0, info.notify_try_num) self.assertIsNone(info.get_current_notify_target()) self.assertEqual(2, self._notify.notify_num) self.assertEqual(0, len(self._notify._waiting_zones)) example_com_info = self._notify._notify_infos[('example.com.', 'IN')] self._notify._notify_next_target(example_com_info) self.assertEqual(1, self._notify.notify_num) self.assertEqual(1, len(self._notify._notifying_zones)) self.assertEqual(0, len(self._notify._waiting_zones)) def test_handle_notify_reply(self): fake_address = ('192.0.2.1', 53) self.assertEqual(notify_out._BAD_REPLY_PACKET, self._notify._handle_notify_reply(None, b'badmsg', fake_address)) example_com_info = self._notify._notify_infos[('example.com.', 'IN')] example_com_info.notify_msg_id = 0X2f18 # test with right notify reply message data = b'\x2f\x18\xa0\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\03com\x00\x00\x06\x00\x01' self.assertEqual(notify_out._REPLY_OK, self._notify._handle_notify_reply(example_com_info, data, fake_address)) # test with unright query id data = b'\x2e\x18\xa0\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\03com\x00\x00\x06\x00\x01' self.assertEqual(notify_out._BAD_QUERY_ID, self._notify._handle_notify_reply(example_com_info, data, fake_address)) # test with unright query name data = b'\x2f\x18\xa0\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\03net\x00\x00\x06\x00\x01' self.assertEqual(notify_out._BAD_QUERY_NAME, self._notify._handle_notify_reply(example_com_info, data, fake_address)) # test with unright opcode data = b'\x2f\x18\x80\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\03com\x00\x00\x06\x00\x01' self.assertEqual(notify_out._BAD_OPCODE, self._notify._handle_notify_reply(example_com_info, data, fake_address)) # test with unright qr data = b'\x2f\x18\x10\x10\x00\x01\x00\x00\x00\x00\x00\x00\x07example\03com\x00\x00\x06\x00\x01' self.assertEqual(notify_out._BAD_QR, self._notify._handle_notify_reply(example_com_info, data, fake_address)) def test_send_notify_message_udp_ipv4(self): example_com_info = self._notify._notify_infos[('example.net.', 'IN')] example_com_info.prepare_notify_out() self.assertIsNone(self._notifiedv4_zone_name) self.assertIsNone(self._notifiedv6_zone_name) ret = self._notify._send_notify_message_udp(example_com_info, ('192.0.2.1', 53)) self.assertTrue(ret) self.assertEqual(socket.AF_INET, example_com_info.sock_family) self.assertEqual(self._notifiedv4_zone_name, 'example.net.') self.assertIsNone(self._notifiedv6_zone_name) def test_send_notify_message_udp_ipv6(self): example_com_info = self._notify._notify_infos[('example.net.', 'IN')] self.assertIsNone(self._notifiedv4_zone_name) self.assertIsNone(self._notifiedv6_zone_name) ret = self._notify._send_notify_message_udp(example_com_info, ('2001:db8::53', 53)) self.assertTrue(ret) self.assertEqual(socket.AF_INET6, example_com_info.sock_family) self.assertIsNone(self._notifiedv4_zone_name) self.assertEqual(self._notifiedv6_zone_name, 'example.net.') def test_send_notify_message_udp_ipv4_with_nonetype_notifyoutv4(self): example_com_info = self._notify._notify_infos[('example.net.', 'IN')] example_com_info.prepare_notify_out() self.assertIsNone(self._notifiedv4_zone_name) self.assertIsNone(self._notifiedv6_zone_name) self._notify._counter_notifyoutv4 = None self._notify._send_notify_message_udp(example_com_info, ('192.0.2.1', 53)) self.assertIsNone(self._notifiedv4_zone_name) self.assertIsNone(self._notifiedv6_zone_name) def test_send_notify_message_udp_ipv4_with_notcallable_notifyoutv4(self): example_com_info = self._notify._notify_infos[('example.net.', 'IN')] example_com_info.prepare_notify_out() self._notify._counter_notifyoutv4 = 'NOT CALLABLE' self.assertRaises(TypeError, self._notify._send_notify_message_udp, example_com_info, ('192.0.2.1', 53)) def test_send_notify_message_udp_ipv6_with_nonetype_notifyoutv6(self): example_com_info = self._notify._notify_infos[('example.net.', 'IN')] self.assertIsNone(self._notifiedv4_zone_name) self.assertIsNone(self._notifiedv6_zone_name) self._notify._counter_notifyoutv6 = None self._notify._send_notify_message_udp(example_com_info, ('2001:db8::53', 53)) self.assertIsNone(self._notifiedv4_zone_name) self.assertIsNone(self._notifiedv6_zone_name) def test_send_notify_message_udp_ipv6_with_notcallable_notifyoutv6(self): example_com_info = self._notify._notify_infos[('example.net.', 'IN')] self._notify._counter_notifyoutv6 = 'NOT CALLABLE' self.assertRaises(TypeError, self._notify._send_notify_message_udp, example_com_info, ('2001:db8::53', 53)) def test_send_notify_message_with_bogus_address(self): example_com_info = self._notify._notify_infos[('example.net.', 'IN')] # As long as the underlying data source validates RDATA this shouldn't # happen, but right now it's not actually the case. Even if the # data source does its job, it's prudent to confirm the behavior for # an unexpected case. self.assertIsNone(self._notifiedv4_zone_name) self.assertIsNone(self._notifiedv6_zone_name) ret = self._notify._send_notify_message_udp(example_com_info, ('invalid', 53)) self.assertFalse(ret) self.assertIsNone(self._notifiedv4_zone_name) self.assertIsNone(self._notifiedv6_zone_name) def test_zone_notify_handler(self): old_send_msg = self._notify._send_notify_message_udp def _fake_send_notify_message_udp(va1, va2): pass self._notify._send_notify_message_udp = _fake_send_notify_message_udp self._notify.send_notify('example.net.') self._notify.send_notify('example.com.') notify_out._MAX_NOTIFY_NUM = 2 self._notify.send_notify('example.org.') example_net_info = self._notify._notify_infos[('example.net.', 'IN')] example_net_info.prepare_notify_out() example_net_info.notify_try_num = 2 self._notify._zone_notify_handler(example_net_info, notify_out._EVENT_TIMEOUT) self.assertEqual(3, example_net_info.notify_try_num) time1 = example_net_info.notify_timeout self._notify._zone_notify_handler(example_net_info, notify_out._EVENT_TIMEOUT) self.assertEqual(4, example_net_info.notify_try_num) self.assertGreater(example_net_info.notify_timeout, time1 + 2) # bigger than 2 seconds cur_tgt = example_net_info._notify_current example_net_info.notify_try_num = notify_out._MAX_NOTIFY_TRY_NUM self._notify._zone_notify_handler(example_net_info, notify_out._EVENT_NONE) self.assertNotEqual(cur_tgt, example_net_info._notify_current) cur_tgt = example_net_info._notify_current example_net_info.create_socket('127.0.0.1') # dns message, will result in bad_qid, but what we are testing # here is whether handle_notify_reply is called correctly example_net_info._sock.remote_end().send(b'\x2f\x18\xa0\x00\x00\x01\x00\x00\x00\x00\x00\x00\x07example\03com\x00\x00\x06\x00\x01') self._notify._zone_notify_handler(example_net_info, notify_out._EVENT_READ) self.assertNotEqual(cur_tgt, example_net_info._notify_current) def test_get_notify_slaves_from_ns(self): records = self._notify._get_notify_slaves_from_ns(Name('example.net.'), RRClass.IN) self.assertEqual(6, len(records)) self.assertEqual('8:8::8:8', records[5]) self.assertEqual('7.7.7.7', records[4]) self.assertEqual('6.6.6.6', records[3]) self.assertEqual('5:5::5:5', records[2]) self.assertEqual('4:4::4:4', records[1]) self.assertEqual('3.3.3.3', records[0]) records = self._notify._get_notify_slaves_from_ns(Name('example.com.'), RRClass.IN) self.assertEqual(3, len(records)) self.assertEqual('5:5::5:5', records[2]) self.assertEqual('4:4::4:4', records[1]) self.assertEqual('3.3.3.3', records[0]) def test_get_notify_slaves_from_ns_unusual(self): self._notify._db_file = TESTDATA_SRCDIR + '/brokentest.sqlite3' self.assertEqual([], self._notify._get_notify_slaves_from_ns( Name('nons.example'), RRClass.IN)) self.assertEqual([], self._notify._get_notify_slaves_from_ns( Name('nosoa.example'), RRClass.IN)) self.assertEqual([], self._notify._get_notify_slaves_from_ns( Name('multisoa.example'), RRClass.IN)) self.assertEqual([], self._notify._get_notify_slaves_from_ns( Name('nosuchzone.example'), RRClass.IN)) # This will cause failure in getting access to the data source. self._notify._db_file = TESTDATA_SRCDIR + '/nodir/error.sqlite3' self.assertEqual([], self._notify._get_notify_slaves_from_ns( Name('example.com'), RRClass.IN)) def test_init_notify_out(self): self._notify._init_notify_out(self._db_file) self.assertListEqual([('3.3.3.3', 53), ('4:4::4:4', 53), ('5:5::5:5', 53)], self._notify._notify_infos[('example.com.', 'IN')].notify_slaves) def test_prepare_select_info(self): timeout, valid_fds, notifying_zones = self._notify._prepare_select_info() self.assertEqual(None, timeout) self.assertListEqual([], valid_fds) self._notify._notify_infos[('example.net.', 'IN')]._sock = 1 self._notify._notify_infos[('example.net.', 'IN')].notify_timeout = time.time() + 5 timeout, valid_fds, notifying_zones = self._notify._prepare_select_info() self.assertGreater(timeout, 0) self.assertListEqual([1], valid_fds) self._notify._notify_infos[('example.net.', 'IN')]._sock = 1 self._notify._notify_infos[('example.net.', 'IN')].notify_timeout = time.time() - 5 timeout, valid_fds, notifying_zones = self._notify._prepare_select_info() self.assertEqual(timeout, 0) self.assertListEqual([1], valid_fds) self._notify._notify_infos[('example.com.', 'IN')]._sock = 2 self._notify._notify_infos[('example.com.', 'IN')].notify_timeout = time.time() + 5 timeout, valid_fds, notifying_zones = self._notify._prepare_select_info() self.assertEqual(timeout, 0) self.assertEqual(len(valid_fds), 2) self.assertIn(1, valid_fds) self.assertIn(2, valid_fds) def test_shutdown(self): thread = self._notify.dispatcher() self.assertTrue(thread.is_alive()) # nonblock_event won't be setted since there are no notifying zones. self.assertFalse(self._notify._nonblock_event.isSet()) # set nonblock_event manually self._notify._nonblock_event.set() # nonblock_event will be cleared soon since there are no notifying zones. while (self._notify._nonblock_event.isSet()): pass # send notify example_net_info = self._notify._notify_infos[('example.net.', 'IN')] example_net_info.notify_slaves = [('127.0.0.1', 53)] example_net_info.create_socket('127.0.0.1') self._notify.send_notify('example.net') self.assertTrue(self._notify._nonblock_event.isSet()) # set notify_try_num to _MAX_NOTIFY_TRY_NUM, zone 'example.net' will be removed # from notifying zones soon and nonblock_event will be cleared since there is no # notifying zone left. example_net_info.notify_try_num = notify_out._MAX_NOTIFY_TRY_NUM while (self._notify._nonblock_event.isSet()): pass self.assertFalse(self._notify._nonblock_event.isSet()) self._notify.shutdown() # nonblock_event should have been setted to stop waiting. self.assertTrue(self._notify._nonblock_event.isSet()) self.assertFalse(thread.is_alive()) if __name__== "__main__": isc.log.init("bind10") isc.log.resetUnitTestRootLogger() unittest.main()