123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- # Copyright (C) 2013 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import unittest
- import os
- import socket
- import select
- import threading
- import isc.log
- from isc.dns import *
- import isc.datasrc
- from isc.memmgr.builder import *
- from isc.server_common.datasrc_clients_mgr import DataSrcClientsMgr
- from isc.memmgr.datasrc_info import *
# Directory holding the test data files, read from the TESTDATA_PATH
# environment variable, with a trailing path separator appended so that
# file names can be concatenated directly.
TESTDATA_PATH = ''.join([os.environ['TESTDATA_PATH'], os.sep])
- # Defined for easier tests with DataSrcClientsMgr.reconfigure(), which
- # only needs get_value() method
class MockConfigData:
    """Dictionary-backed stand-in for a configuration data object.

    Only get_value() is provided, which is all the tests in this file
    need when calling DataSrcClientsMgr.reconfigure().
    """

    def __init__(self, data):
        """Remember the mapping from identifiers to configured values."""
        self.__values = data

    def get_value(self, identifier):
        """Return a (value, False) pair for the given identifier.

        The second element is always False (presumably an "is default"
        flag in the real interface -- confirm against the real class).
        Raises whatever the underlying mapping raises for an unknown
        identifier (KeyError for a plain dict).
        """
        value = self.__values[identifier]
        return (value, False)
class TestMemorySegmentBuilder(unittest.TestCase):
    """Tests for MemorySegmentBuilder, driven through its command queue.

    Every test gets a fresh builder wired to one end of a socket pair, a
    condition variable and a pair of command/response queues.  The
    builder thread is created in setUp() but is only started by the
    tests themselves, which are also responsible for stopping it.
    """

    def _create_builder_thread(self):
        """Create the builder, its communication channels and its thread.

        The thread object is created but not started.
        """
        (self._master_sock, self._builder_sock) = \
            socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)

        self._builder_command_queue = []
        self._builder_response_queue = []

        self._builder_lock = threading.Lock()
        self._builder_cv = threading.Condition(lock=self._builder_lock)

        self._builder = MemorySegmentBuilder(self._builder_sock,
                                             self._builder_cv,
                                             self._builder_command_queue,
                                             self._builder_response_queue)
        self._builder_thread = threading.Thread(target=self._builder.run)

    def _send_commands(self, *commands):
        """Queue the given command tuples and wake up the builder.

        All commands are appended while holding the condition variable,
        so the builder sees them as a single batch.
        """
        with self._builder_cv:
            self._builder_command_queue.extend(commands)
            self._builder_cv.notify_all()

    def _expect_notification(self, timeout):
        """Wait for the builder's notification on the master socket.

        Fails the test if the socket does not become readable within
        'timeout' seconds or if the byte read is not b'x'.
        """
        (reads, _, _) = select.select([self._master_sock], [], [], timeout)
        self.assertIn(self._master_sock, reads)

        # Reading 1 byte should not block us here, especially as the
        # socket is ready to read. It's a hack, but this is just a
        # testcase.
        got = self._master_sock.recv(1)
        self.assertEqual(got, b'x')

    def _join_builder_thread(self):
        """Wait 5 seconds at most for the builder's main loop to exit."""
        self._builder_thread.join(5)
        self.assertFalse(self._builder_thread.is_alive())

    def setUp(self):
        self._create_builder_thread()
        self.__mapped_file_path = None

    def tearDown(self):
        try:
            # It's the tests' responsibility to stop and join the builder
            # thread if they start it.
            self.assertFalse(self._builder_thread.is_alive())
        finally:
            # Release the sockets and the mapped file even when the check
            # above fails, so one failing test does not leak resources
            # into the following ones.
            self._master_sock.close()
            self._builder_sock.close()

            if self.__mapped_file_path is not None:
                if os.path.exists(self.__mapped_file_path):
                    os.unlink(self.__mapped_file_path)

    def test_bad_command(self):
        """Tests what happens when a bad command is passed to the
        MemorySegmentBuilder.
        """

        self._builder_thread.start()

        # Now that the builder thread is running, send it a bad
        # command. The thread should exit its main loop and be joinable.
        self._send_commands(('bad_command',))

        # Wait 5 seconds to receive a notification on the socket from
        # the builder.
        self._expect_notification(5)

        self._join_builder_thread()

        # The command queue must be cleared, and the response queue must
        # contain a response that a bad command was sent. The thread is
        # no longer running, so we can use the queues without a lock.
        self.assertEqual(len(self._builder_command_queue), 0)
        self.assertEqual(len(self._builder_response_queue), 1)

        response = self._builder_response_queue[0]
        self.assertIsInstance(response, tuple)
        self.assertTupleEqual(response, ('bad_command',))
        del self._builder_response_queue[:]

    def test_shutdown(self):
        """Tests that shutdown command exits the MemorySegmentBuilder
        loop.
        """

        self._builder_thread.start()

        # Now that the builder thread is running, send it the "shutdown"
        # command. The thread should exit its main loop and be joinable.
        # Commands after 'shutdown' must be ignored.
        self._send_commands(('shutdown',),
                            ('bad_command_1',),
                            ('bad_command_2',))

        self._join_builder_thread()

        # The command queue must be cleared, and the response queue must
        # be untouched (we don't use it in this test). The thread is no
        # longer running, so we can use the queues without a lock.
        self.assertEqual(len(self._builder_command_queue), 0)
        self.assertEqual(len(self._builder_response_queue), 0)

    # Use .get() so that an unset HAVE_SHARED_MEMORY skips this test
    # instead of raising KeyError while the class is being defined.
    @unittest.skipIf(os.environ.get('HAVE_SHARED_MEMORY') != 'yes',
                     'shared memory is not available')
    def test_load(self):
        """
        Test "load" command.
        """

        mapped_file_dir = os.environ['TESTDATA_WRITE_PATH']
        mgr_config = {'mapped_file_dir': mapped_file_dir}

        cfg_data = MockConfigData(
            {"classes":
                 {"IN": [{"type": "MasterFiles",
                          "params": { "example.com": TESTDATA_PATH + "example.com.zone" },
                          "cache-enable": True,
                          "cache-type": "mapped"}]
                  }
             })
        cmgr = DataSrcClientsMgr(use_cache=True)
        cmgr.reconfigure({}, cfg_data)

        genid, clients_map = cmgr.get_clients_map()
        datasrc_info = DataSrcInfo(genid, clients_map, mgr_config)

        self.assertEqual(1, datasrc_info.gen_id)
        self.assertEqual(clients_map, datasrc_info.clients_map)
        self.assertEqual(1, len(datasrc_info.segment_info_map))
        sgmt_info = datasrc_info.segment_info_map[(RRClass.IN, 'MasterFiles')]
        self.assertIsNone(sgmt_info.get_reset_param(SegmentInfo.READER))
        self.assertIsNotNone(sgmt_info.get_reset_param(SegmentInfo.WRITER))

        # Remember the mapped file so that tearDown() can remove it.
        param = sgmt_info.get_reset_param(SegmentInfo.WRITER)
        self.__mapped_file_path = param['mapped-file']

        self._builder_thread.start()

        # Now that the builder thread is running, send it the "load"
        # command. We should be notified when the load operation is
        # complete.
        self._send_commands(('load',
                             isc.dns.Name("example.com"),
                             datasrc_info, RRClass.IN,
                             'MasterFiles'))

        # Wait 60 seconds to receive a notification on the socket from
        # the builder.
        self._expect_notification(60)

        # The builder thread is still running at this point, so the
        # queues must only be inspected while holding the lock.
        with self._builder_lock:
            # The command queue must be cleared, and the response queue
            # must contain a response that the load was completed.
            self.assertEqual(len(self._builder_command_queue), 0)
            self.assertEqual(len(self._builder_response_queue), 1)

            response = self._builder_response_queue[0]
            self.assertIsInstance(response, tuple)
            self.assertTupleEqual(response, ('load-completed', datasrc_info,
                                             RRClass.IN, 'MasterFiles'))
            del self._builder_response_queue[:]

        # Now try looking for some loaded data
        clist = datasrc_info.clients_map[RRClass.IN]
        dsrc, finder, exact = clist.find(isc.dns.Name("example.com"))
        self.assertIsNotNone(dsrc)
        self.assertIsInstance(dsrc, isc.datasrc.DataSourceClient)
        self.assertIsNotNone(finder)
        self.assertIsInstance(finder, isc.datasrc.ZoneFinder)
        self.assertTrue(exact)

        # Send the builder thread the "shutdown" command. The thread
        # should exit its main loop and be joinable.
        self._send_commands(('shutdown',))

        self._join_builder_thread()

        # The command queue must be cleared, and the response queue must
        # be empty again (the "load-completed" response was removed
        # above). The thread is no longer running, so we can use the
        # queues without a lock.
        self.assertEqual(len(self._builder_command_queue), 0)
        self.assertEqual(len(self._builder_response_queue), 0)
if __name__ == "__main__":
    # Initialise logging under the test logger name and reset the root
    # logger for unit tests before handing control to unittest.
    isc.log.init("bind10-test")
    isc.log.resetUnitTestRootLogger()
    unittest.main()
|