Browse Source

Merge #2857: memory manager initial exchanges

This initializes the basic data structures of the memory manager and
starts loading data into the segments.
Michal 'vorner' Vaner 11 years ago
parent
commit
9b97fd8686
2 changed files with 160 additions and 11 deletions
  1. 54 11
      src/bin/memmgr/memmgr.py.in
  2. 106 0
      src/bin/memmgr/tests/memmgr_test.py

+ 54 - 11
src/bin/memmgr/memmgr.py.in

@@ -29,7 +29,7 @@ from isc.log_messages.memmgr_messages import *
 from isc.server_common.bind10_server import BIND10Server, BIND10ServerFatal
 from isc.server_common.datasrc_clients_mgr \
     import DataSrcClientsMgr, ConfigError
-from isc.memmgr.datasrc_info import DataSrcInfo
+from isc.memmgr.datasrc_info import DataSrcInfo, SegmentInfo
 from isc.memmgr.builder import MemorySegmentBuilder
 import isc.util.process
 
@@ -124,18 +124,51 @@ class Memmgr(BIND10Server):
         # All copy, switch to the new configuration.
         self._config_params = new_config_params
 
-    def __notify_from_builder(self):
-        # Nothing is implemented here for now. This method should have
-        # code to handle responses from the builder in
-        # self._builder_response_queue[]. Access must be synchronized
-        # using self._builder_lock.
-        pass
+    def _cmd_to_builder(self, cmd):
+        """
+        Send a command to the builder, with proper synchronization.
+        """
+        assert isinstance(cmd, tuple)
+        with self._builder_cv:
+            self._builder_command_queue.append(cmd)
+            self._builder_cv.notify_all()
+
+    def _notify_from_builder(self):
+        """
+        Read the notifications from the builder thread.
+        """
+        self._master_sock.recv(1) # Clear the wake-up data
+        notifications = None
+        with self._builder_lock:
+            # Copy the notifications out and clear them from the
+            # original list. We may not assign [] to
+            # self._builder_response_queue to clear it, because there's
+            # another reference to it from the other thread and it would
+            # not keep the original list.
+            notifications = self._builder_response_queue[:]
+            del self._builder_response_queue[:]
+        for notification in notifications:
+            notif_name = notification[0]
+            if notif_name == 'load-completed':
+                (_, dsrc_info, rrclass, dsrc_name) = notification
+                sgmt_info = dsrc_info.segment_info_map[(rrclass, dsrc_name)]
+                cmd = sgmt_info.complete_update()
+                # It may return another load command on the same data source.
+                # If it is so, we execute it too, before we start
+                # synchronizing with the readers.
+                if cmd is not None:
+                    self._cmd_to_builder(cmd)
+                else:
+                    pass
+                    # TODO: Send to the readers, #2858
+            else:
+                raise ValueError('Unknown notification name: ' + notif_name)
 
     def __create_builder_thread(self):
         # We get responses from the builder thread on this socket pair.
         (self._master_sock, self._builder_sock) = \
             socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
-        self.watch_fileno(self._master_sock, rcallback=self.__notify_from_builder)
+        self.watch_fileno(self._master_sock, rcallback=self._notify_from_builder)
 
         # See the documentation for MemorySegmentBuilder on how the
         # following are used.
@@ -161,9 +194,7 @@ class Memmgr(BIND10Server):
 
         # This makes the MemorySegmentBuilder exit its main loop. It
         # should make the builder thread joinable.
-        with self._builder_cv:
-            self._builder_command_queue.append(('shutdown',))
-            self._builder_cv.notify_all()
+        self._cmd_to_builder(('shutdown',))
 
         self._builder_thread.join()
 
@@ -202,13 +233,25 @@ class Memmgr(BIND10Server):
             genid, clients_map = self._datasrc_clients_mgr.get_clients_map()
             datasrc_info = DataSrcInfo(genid, clients_map, self._config_params)
             self._datasrc_info_list.append(datasrc_info)
+            self._init_segments(datasrc_info)
 
             # Full datasrc reconfig will be rare, so would be worth logging
             # at the info level.
             logger.info(MEMMGR_DATASRC_RECONFIGURED, genid)
+
         except isc.server_common.datasrc_clients_mgr.ConfigError as ex:
             logger.error(MEMMGR_DATASRC_CONFIG_ERROR, ex)
 
+    def _init_segments(self, datasrc_info):
+        for key, sgmt_info in datasrc_info.segment_info_map.items():
+            rrclass, dsrc_name = key
+            cmd = ('load', None, datasrc_info, rrclass, dsrc_name)
+            sgmt_info.add_event(cmd)
+            send_cmd = sgmt_info.start_update()
+            assert cmd == send_cmd and sgmt_info.get_state() == \
+                SegmentInfo.UPDATING
+            self._cmd_to_builder(cmd)
+
 if '__main__' == __name__:
     mgr = Memmgr()
     sys.exit(mgr.run(MODULE_NAME))

+ 106 - 0
src/bin/memmgr/tests/memmgr_test.py

@@ -16,12 +16,14 @@
 import unittest
 import os
 import re
+import threading
 
 import isc.log
 from isc.dns import RRClass
 import isc.config
 from isc.config import parse_answer
 import memmgr
+from isc.memmgr.datasrc_info import SegmentInfo
 from isc.testutils.ccsession_mock import MockModuleCCSession
 
 class MyCCSession(MockModuleCCSession, isc.config.ConfigData):
@@ -190,9 +192,14 @@ class TestMemmgr(unittest.TestCase):
         cfg_data = MockConfigData(
             {"classes": {"IN": [{"type": "MasterFiles",
                                  "cache-enable": True, "params": {}}]}})
+        self.__init_called = None
+        def mock_init_segments(param):
+            self.__init_called = param
+        self.__mgr._init_segments = mock_init_segments
         self.__mgr._datasrc_config_handler({}, cfg_data)
         self.assertEqual(1, len(self.__mgr._datasrc_info_list))
         self.assertEqual(1, self.__mgr._datasrc_info_list[0].gen_id)
+        self.assertEqual(self.__init_called, self.__mgr._datasrc_info_list[0])
 
         # Below we're using a mock DataSrcClientMgr for easier tests
         class MockDataSrcClientMgr:
@@ -220,6 +227,7 @@ class TestMemmgr(unittest.TestCase):
             MockDataSrcClientMgr([('sqlite3', 'mapped', None)])
         self.__mgr._datasrc_config_handler(None, None) # params don't matter
         self.assertEqual(2, len(self.__mgr._datasrc_info_list))
+        self.assertEqual(self.__init_called, self.__mgr._datasrc_info_list[1])
         self.assertIsNotNone(
             self.__mgr._datasrc_info_list[1].segment_info_map[
                 (RRClass.IN, 'sqlite3')])
@@ -230,6 +238,104 @@ class TestMemmgr(unittest.TestCase):
         self.__mgr._datasrc_config_handler(None, None)
         self.assertEqual(2, len(self.__mgr._datasrc_info_list))
 
+    def test_init_segments(self):
+        """
+        Test the initialization of segments ‒ just load everything found in there.
+        """
+        # Fake a lot of things. These are objects hard to set up, so this is
+        # easier.
+        class SgmtInfo:
+            def __init__(self):
+                self.events = []
+                self.__state = None
+
+            def add_event(self, cmd):
+                self.events.append(cmd)
+                self.__state = SegmentInfo.UPDATING
+
+            def start_update(self):
+                return self.events[0]
+
+            def get_state(self):
+                return self.__state
+
+        sgmt_info = SgmtInfo()
+        class DataSrcInfo:
+            def __init__(self):
+                self.segment_info_map = \
+                    {(isc.dns.RRClass.IN, "name"): sgmt_info}
+        dsrc_info = DataSrcInfo()
+
+        # Pretend to have the builder thread
+        self.__mgr._builder_cv = threading.Condition()
+
+        # Run the initialization
+        self.__mgr._init_segments(dsrc_info)
+
+        # The event was pushed into the segment info
+        command = ('load', None, dsrc_info, isc.dns.RRClass.IN, 'name')
+        self.assertEqual([command], sgmt_info.events)
+        self.assertEqual([command], self.__mgr._builder_command_queue)
+        del self.__mgr._builder_command_queue[:]
+
+    def test_notify_from_builder(self):
+        """
+        Check the notify from builder thing eats the notifications and
+        handles them.
+        """
+        # Some mocks
+        class SgmtInfo:
+            def complete_update():
+                return 'command'
+        sgmt_info = SgmtInfo
+        class DataSrcInfo:
+            def __init__(self):
+                self.segment_info_map = \
+                    {(isc.dns.RRClass.IN, "name"): sgmt_info}
+        dsrc_info = DataSrcInfo()
+        class Sock:
+            def recv(self, size):
+                pass
+        self.__mgr._master_sock = Sock()
+        commands = []
+        def mock_cmd_to_builder(cmd):
+            commands.append(cmd)
+        self.__mgr._cmd_to_builder = mock_cmd_to_builder
+
+        self.__mgr._builder_lock = threading.Lock()
+        # Extract the reference for the queue. We get a copy of the reference
+        # to check it is cleared, not a new empty one installed
+        notif_ref = self.__mgr._builder_response_queue
+        notif_ref.append(('load-completed', dsrc_info, isc.dns.RRClass.IN,
+                          'name'))
+        # Wake up the main thread and let it process the notifications
+        self.__mgr._notify_from_builder()
+        # All notifications are now eaten
+        self.assertEqual([], notif_ref)
+        self.assertEqual(['command'], commands)
+        del commands[:]
+        # The new command is sent
+        # Once again the same, but with the last command - nothing new pushed
+        sgmt_info.complete_update = lambda: None
+        notif_ref.append(('load-completed', dsrc_info, isc.dns.RRClass.IN,
+                          'name'))
+        self.__mgr._notify_from_builder()
+        self.assertEqual([], notif_ref)
+        self.assertEqual([], commands)
+        # This is invalid (unhandled) notification name
+        notif_ref.append(('unhandled',))
+        self.assertRaises(ValueError, self.__mgr._notify_from_builder)
+        self.assertEqual([], notif_ref)
+
+    def test_send_to_builder(self):
+        """
+        Send command to the builder test.
+        """
+        self.__mgr._builder_cv = threading.Condition()
+        self.__mgr._cmd_to_builder(('test',))
+        self.assertEqual([('test',)], self.__mgr._builder_command_queue)
+        del self.__mgr._builder_command_queue[:]
+
 if __name__== "__main__":
     isc.log.resetUnitTestRootLogger()
     unittest.main()