Browse Source

Merge branch 'trac2855_2'

Mukund Sivaraman 11 years ago
parent
commit
fa4e16aaa9

+ 60 - 0
src/bin/memmgr/memmgr.py.in

@@ -19,6 +19,8 @@ import copy
 import os
 import sys
 import signal
+import socket
+import threading
 
 sys.path.append('@@PYTHONPATH@@')
 import isc.log
@@ -28,6 +30,7 @@ from isc.server_common.bind10_server import BIND10Server, BIND10ServerFatal
 from isc.server_common.datasrc_clients_mgr \
     import DataSrcClientsMgr, ConfigError
 from isc.memmgr.datasrc_info import DataSrcInfo
+from isc.memmgr.builder import MemorySegmentBuilder
 import isc.util.process
 
 MODULE_NAME = 'memmgr'
@@ -43,6 +46,7 @@ class ConfigError(Exception):
 
 class Memmgr(BIND10Server):
     def __init__(self):
+        BIND10Server.__init__(self)
         # Running configurable parameters: on initial configuration this will
         # be a dict: str=>config_value.
         # This is defined as "protected" so tests can inspect it; others
@@ -57,6 +61,10 @@ class Memmgr(BIND10Server):
         # active configuration generations.  Allow tests to inspec it.
         self._datasrc_info_list = []
 
+        self._builder_setup = False
+        self._builder_command_queue = []
+        self._builder_response_queue = []
+
     def _config_handler(self, new_config):
         """Configuration handler, called via BIND10Server.
 
@@ -116,6 +124,52 @@ class Memmgr(BIND10Server):
         # All copy, switch to the new configuration.
         self._config_params = new_config_params
 
+    def __notify_from_builder(self):
+        # Nothing is implemented here for now. This method should have
+        # code to handle responses from the builder in
+        # self._builder_response_queue[]. Access must be synchronized
+        # using self._builder_lock.
+        pass
+
+    def __create_builder_thread(self):
+        # We get responses from the builder thread on this socket pair.
+        (self._master_sock, self._builder_sock) = \
+            socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+        self.watch_fileno(self._master_sock, rcallback=self.__notify_from_builder)
+
+        # See the documentation for MemorySegmentBuilder on how the
+        # following are used.
+        self._builder_lock = threading.Lock()
+        self._builder_cv = threading.Condition(lock=self._builder_lock)
+
+        self._builder = MemorySegmentBuilder(self._builder_sock,
+                                             self._builder_cv,
+                                             self._builder_command_queue,
+                                             self._builder_response_queue)
+        self._builder_thread = threading.Thread(target=self._builder.run)
+        self._builder_thread.start()
+
+        self._builder_setup = True
+
+    def __shutdown_builder_thread(self):
+        # Some unittests do not create the builder thread, so we check
+        # that.
+        if not self._builder_setup:
+            return
+
+        self._builder_setup = False
+
+        # This makes the MemorySegmentBuilder exit its main loop. It
+        # should make the builder thread joinable.
+        with self._builder_cv:
+            self._builder_command_queue.append('shutdown')
+            self._builder_cv.notify_all()
+
+        self._builder_thread.join()
+
+        self._master_sock.close()
+        self._builder_sock.close()
+
     def _setup_module(self):
         """Module specific initialization for BIND10Server."""
         try:
@@ -129,6 +183,12 @@ class Memmgr(BIND10Server):
             logger.error(MEMMGR_NO_DATASRC_CONF, ex)
             raise BIND10ServerFatal('failed to setup memmgr module')
 
+        self.__create_builder_thread()
+
+    def _shutdown_module(self):
+        """Module specific finalization."""
+        self.__shutdown_builder_thread()
+
     def _datasrc_config_handler(self, new_config, config_data):
         """Callback of data_sources configuration update.
 

+ 9 - 0
src/bin/memmgr/tests/memmgr_test.py

@@ -74,6 +74,15 @@ class TestMemmgr(unittest.TestCase):
         self.__orig_isdir = os.path.isdir
 
     def tearDown(self):
+        # Not all unittests cause this method to be called, so we call
+        # it explicitly as it may be necessary in some cases where the
+        # builder thread has been created.
+        self.__mgr._shutdown_module()
+
+        # Assert that all commands sent to the builder thread were
+        # handled.
+        self.assertEqual(len(self.__mgr._builder_command_queue), 0)
+
         # Restore faked values
         os.access = self.__orig_os_access
         os.path.isdir = self.__orig_isdir

+ 1 - 1
src/lib/python/isc/memmgr/Makefile.am

@@ -1,6 +1,6 @@
 SUBDIRS = . tests
 
-python_PYTHON = __init__.py datasrc_info.py
+python_PYTHON = __init__.py builder.py datasrc_info.py
 
 pythondir = $(pyexecdir)/isc/memmgr
 

+ 99 - 0
src/lib/python/isc/memmgr/builder.py

@@ -0,0 +1,99 @@
+# Copyright (C) 2013  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+class MemorySegmentBuilder:
+    """The builder runs in a different thread in the memory manager. It
+    waits for commands from the memory manager, and then executes them
+    in the given order sequentially.
+    """
+
+    def __init__(self, sock, cv, command_queue, response_queue):
+        """ The constructor takes the following arguments:
+
+            sock: A socket using which this builder object notifies the
+                  main thread that it has a response waiting for it.
+
+            cv: A condition variable object that is used by the main
+                thread to tell this builder object that new commands are
+                available to it. Note that this is also used for
+                synchronizing access to the queues, so code that uses
+                MemorySegmentBuilder must use this condition variable's
+                lock object to synchronize its access to the queues.
+
+            command_queue: A list of commands sent by the main thread to
+                           this object. Commands should be executed
+                           sequentially in the given order by this
+                           object.
+
+            response_queue: A list of responses sent by this object to
+                            the main thread. The format of this is
+                            currently not strictly defined. Future
+                            tickets will be able to define it based on
+                            how it's used.
+        """
+
+        self._sock = sock
+        self._cv = cv
+        self._command_queue = command_queue
+        self._response_queue = response_queue
+        self._shutdown = False
+
+    def run(self):
+        """ This is the method invoked when the builder thread is
+            started.  In this thread, be careful when modifying
+            variables passed-by-reference in the constructor. If they
+            are reassigned, they will not refer to the main thread's
+            objects any longer. Any use of command_queue and
+            response_queue must be synchronized by acquiring the lock in
+            the condition variable. This method must normally terminate
+            only when the 'shutdown' command is sent to it.
+        """
+
+        # Acquire the condition variable while running the loop.
+        with self._cv:
+            while not self._shutdown:
+                while len(self._command_queue) == 0:
+                    self._cv.wait()
+                # Move the queue content to a local queue. Be careful of
+                # not making assignments to reference variables.
+                local_command_queue = self._command_queue.copy()
+                del self._command_queue[:]
+
+                # Run commands passed in the command queue sequentially
+                # in the given order.  For now, it only supports the
+                # "shutdown" command, which just exits the thread.
+                for command in local_command_queue:
+                    if command == 'shutdown':
+                        self._shutdown = True
+                        # When the shutdown command is received, we do
+                        # not process any further commands.
+                        break
+                    else:
+                        # A bad command was received. Raising an
+                        # exception is not useful in this case as we are
+                        # likely running in a different thread from the
+                        # main thread which would need to be
+                        # notified. Instead return this in the response
+                        # queue.
+                        self._response_queue.append(('bad_command',))
+                        self._shutdown = True
+                        break
+
+                # Notify (any main thread) on the socket about a
+                # response. Otherwise, the main thread may wait in its
+                # loop without knowing there was a problem.
+                if len(self._response_queue) > 0:
+                    while self._sock.send(b'x') != 1:
+                        continue

+ 1 - 1
src/lib/python/isc/memmgr/tests/Makefile.am

@@ -1,5 +1,5 @@
 PYCOVERAGE_RUN = @PYCOVERAGE_RUN@
-PYTESTS = datasrc_info_tests.py
+PYTESTS = builder_tests.py datasrc_info_tests.py
 EXTRA_DIST = $(PYTESTS)
 
 # If necessary (rare cases), explicitly specify paths to dynamic libraries

+ 121 - 0
src/lib/python/isc/memmgr/tests/builder_tests.py

@@ -0,0 +1,121 @@
+# Copyright (C) 2013  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+import unittest
+import socket
+import select
+import threading
+
+import isc.log
+from isc.memmgr.builder import *
+
+class TestMemorySegmentBuilder(unittest.TestCase):
+    def _create_builder_thread(self):
+        (self._master_sock, self._builder_sock) = \
+            socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+
+        self._builder_command_queue = []
+        self._builder_response_queue = []
+
+        self._builder_cv = threading.Condition()
+
+        self._builder = MemorySegmentBuilder(self._builder_sock,
+                                             self._builder_cv,
+                                             self._builder_command_queue,
+                                             self._builder_response_queue)
+        self._builder_thread = threading.Thread(target=self._builder.run)
+
+    def setUp(self):
+        self._create_builder_thread()
+
+    def tearDown(self):
+        # It's the tests' responsibility to stop and join the builder
+        # thread if they start it.
+        self.assertFalse(self._builder_thread.isAlive())
+
+        self._master_sock.close()
+        self._builder_sock.close()
+
+    def test_bad_command(self):
+        """Tests what happens when a bad command is passed to the
+        MemorySegmentBuilder.
+        """
+
+        self._builder_thread.start()
+
+        # Now that the builder thread is running, send it a bad
+        # command. The thread should exit its main loop and be joinable.
+        with self._builder_cv:
+            self._builder_command_queue.append('bad_command')
+            self._builder_cv.notify_all()
+
+        # Wait 5 seconds to receive a notification on the socket from
+        # the builder.
+        (reads, _, _) = select.select([self._master_sock], [], [], 5)
+        self.assertTrue(self._master_sock in reads)
+
+        # Reading 1 byte should not block us here, especially as the
+        # socket is ready to read. It's a hack, but this is just a
+        # testcase.
+        got = self._master_sock.recv(1)
+        self.assertEqual(got, b'x')
+
+        # Wait 5 seconds at most for the main loop of the builder to
+        # exit.
+        self._builder_thread.join(5)
+        self.assertFalse(self._builder_thread.isAlive())
+
+        # The command queue must be cleared, and the response queue must
+        # contain a response that a bad command was sent. The thread is
+        # no longer running, so we can use the queues without a lock.
+        self.assertEqual(len(self._builder_command_queue), 0)
+        self.assertEqual(len(self._builder_response_queue), 1)
+
+        response = self._builder_response_queue[0]
+        self.assertTrue(isinstance(response, tuple))
+        self.assertTupleEqual(response, ('bad_command',))
+        del self._builder_response_queue[:]
+
+    def test_shutdown(self):
+        """Tests that shutdown command exits the MemorySegmentBuilder
+        loop.
+        """
+
+        self._builder_thread.start()
+
+        # Now that the builder thread is running, send it the shutdown
+        # command. The thread should exit its main loop and be joinable.
+        with self._builder_cv:
+            self._builder_command_queue.append('shutdown')
+            # Commands after 'shutdown' must be ignored.
+            self._builder_command_queue.append('bad_command_1')
+            self._builder_command_queue.append('bad_command_2')
+            self._builder_cv.notify_all()
+
+        # Wait 5 seconds at most for the main loop of the builder to
+        # exit.
+        self._builder_thread.join(5)
+        self.assertFalse(self._builder_thread.isAlive())
+
+        # The command queue must be cleared, and the response queue must
+        # be untouched (we don't use it in this test). The thread is no
+        # longer running, so we can use the queues without a lock.
+        self.assertEqual(len(self._builder_command_queue), 0)
+        self.assertEqual(len(self._builder_response_queue), 0)
+
+if __name__ == "__main__":
+    isc.log.init("bind10-test")
+    isc.log.resetUnitTestRootLogger()
+    unittest.main()

+ 70 - 8
src/lib/python/isc/server_common/bind10_server.py.in

@@ -52,8 +52,14 @@ class BIND10Server:
                      initialization.  This is called after the module CC
                      session has started, and can be used for registering
                      interest on remote modules, etc.  If it raises an
-                     exception, the server will be immediatelly stopped.
+                     exception, the server will be immediately stopped.
                      Parameter: None, Return: None
+      _shutdown_module: can be optionally defined for module-specific
+                        finalization. This is called right before the
+                        module CC session is stopped. If it raises an
+                        exception, the server will be immediately
+                        stopped.
+                        Parameter: None, Return: None
 
     """
     # Will be set to True when the server should stop and shut down.
@@ -72,6 +78,11 @@ class BIND10Server:
     # Basically constant, but allow tests to override it.
     _select_fn = select.select
 
+    def __init__(self):
+        self._read_callbacks = {}
+        self._write_callbacks = {}
+        self._error_callbacks = {}
+
     @property
     def shutdown(self):
         return self.__shutdown
@@ -141,7 +152,13 @@ class BIND10Server:
         cc_fileno = self._mod_cc.get_socket().fileno()
         while not self.__shutdown:
             try:
-                (reads, _, _) = self._select_fn([cc_fileno], [], [])
+                read_fds = list(self._read_callbacks.keys())
+                read_fds.append(cc_fileno)
+                write_fds = list(self._write_callbacks.keys())
+                error_fds = list(self._error_callbacks.keys())
+
+                (reads, writes, errors) = \
+                    self._select_fn(read_fds, write_fds, error_fds)
             except select.error as ex:
                 # ignore intterruption by signal; regard other select errors
                 # fatal.
@@ -149,12 +166,28 @@ class BIND10Server:
                     continue
                 else:
                     raise
-            for fileno in reads:
-                if fileno == cc_fileno:
-                    # this shouldn't raise an exception (if it does, we'll
-                    # propagate it)
-                    self._mod_cc.check_command(True)
 
+            for fileno in reads:
+                if fileno in self._read_callbacks:
+                    for callback in self._read_callbacks[fileno]:
+                        callback()
+
+            for fileno in writes:
+                if fileno in self._write_callbacks:
+                    for callback in self._write_callbacks[fileno]:
+                        callback()
+
+            for fileno in errors:
+                if fileno in self._error_callbacks:
+                    for callback in self._error_callbacks[fileno]:
+                        callback()
+
+            if cc_fileno in reads:
+                # this shouldn't raise an exception (if it does, we'll
+                # propagate it)
+                self._mod_cc.check_command(True)
+
+        self._shutdown_module()
         self._mod_cc.send_stopping()
 
     def _command_handler(self, cmd, args):
@@ -173,9 +206,38 @@ class BIND10Server:
         return isc.config.create_answer(1, "Unknown command: " + str(cmd))
 
     def _setup_module(self):
-        """The default implementation of the module specific initilization"""
+        """The default implementation of the module specific initialization"""
         pass
 
+    def _shutdown_module(self):
+        """The default implementation of the module specific finalization"""
+        pass
+
+    def watch_fileno(self, fileno, rcallback=None, wcallback=None, \
+                         xcallback=None):
+        """Register the fileno for the internal select() call.
+
+        *callback's are callable objects which would be called when
+        read, write, error events occur on the specified fileno.
+        """
+        if rcallback is not None:
+            if fileno in self._read_callbacks:
+                self._read_callbacks[fileno].append(rcallback)
+            else:
+                self._read_callbacks[fileno] = [rcallback]
+
+        if wcallback is not None:
+            if fileno in self._write_callbacks:
+                self._write_callbacks[fileno].append(wcallback)
+            else:
+                self._write_callbacks[fileno] = [wcallback]
+
+        if xcallback is not None:
+            if fileno in self._error_callbacks:
+                self._error_callbacks[fileno].append(xcallback)
+            else:
+                self._error_callbacks[fileno] = [xcallback]
+
     def run(self, module_name):
         """Start the server and let it run until it's told to stop.
 

+ 42 - 0
src/lib/python/isc/server_common/tests/bind10_server_test.py

@@ -62,6 +62,7 @@ class MyCCSession(MockModuleCCSession, isc.config.ConfigData):
 
 class MockServer(BIND10Server):
     def __init__(self):
+        BIND10Server.__init__(self)
         self._select_fn = self.select_wrapper
 
     def _setup_ccsession(self):
@@ -90,6 +91,9 @@ class MockServer(BIND10Server):
 class TestBIND10Server(unittest.TestCase):
     def setUp(self):
         self.__server = MockServer()
+        self.__reads = 0
+        self.__writes = 0
+        self.__errors = 0
 
     def test_init(self):
         """Check initial conditions"""
@@ -162,6 +166,16 @@ class TestBIND10Server(unittest.TestCase):
         self.assertTrue(self.__server.shutdown)
         self.assertEqual((0, None), isc.config.parse_answer(answer))
 
+    def test_run_with_shutdown_module(self):
+        """Check run() with module specific shutdown method."""
+        self.shutdown_called = False
+        def check_called():
+            self.shutdown_called = True
+        self.__server.__shutdown = True
+        self.__server._shutdown_module = check_called
+        self.assertEqual(0, self.__server.run('test'))
+        self.assertTrue(self.shutdown_called)
+
     def test_other_command(self):
         self.__server._mod_command_handler = self.__server.mod_command_handler
         answer = self.__server._command_handler('other command', None)
@@ -246,6 +260,34 @@ class TestBIND10Server(unittest.TestCase):
         # others will notice it due to connection reset.
         self.assertFalse(self.__server.mod_ccsession.stopped)
 
+    def my_read_callback(self):
+        self.__reads += 1
+
+    def my_write_callback(self):
+        self.__writes += 1
+
+    def my_error_callback(self):
+        self.__errors += 1
+
+    def test_watch_fileno(self):
+        """Test watching for fileno."""
+        self.select_params = []
+        self.__server._select_fn = \
+            lambda r, w, e: self.select_wrapper(r, w, e,
+                                                ret=([10, 20, 42, TEST_FILENO], [], [30]))
+        self.__server._setup_ccsession()
+
+        self.__server.watch_fileno(10, rcallback=self.my_read_callback)
+        self.__server.watch_fileno(20, rcallback=self.my_read_callback, \
+                                       wcallback=self.my_write_callback)
+        self.__server.watch_fileno(30, xcallback=self.my_error_callback)
+
+        self.__server._run_internal()
+        self.assertEqual([([10, 20, TEST_FILENO], [20], [30])], self.select_params)
+        self.assertEqual(2, self.__reads)
+        self.assertEqual(0, self.__writes)
+        self.assertEqual(1, self.__errors)
+
 if __name__== "__main__":
     isc.log.init("bind10_server_test")
     isc.log.resetUnitTestRootLogger()