Browse Source

[2013] make sure we use the common TSIG keyring module.

JINMEI Tatuya 13 years ago
parent
commit
ebbfe90a30
2 changed files with 28 additions and 7 deletions
  1. 2 3
      src/bin/ddns/ddns.py.in
  2. 26 4
      src/bin/ddns/tests/ddns_test.py

+ 2 - 3
src/bin/ddns/ddns.py.in

@@ -114,6 +114,7 @@ class DDNSServer:
 
         self._config_data = self._cc.get_full_config()
         self._cc.start()
+        isc.server_common.tsig_keyring.init_keyring(self._cc)
         self._shutdown = False
         # List of the session receivers where we get the requests
         self._socksession_receivers = {}
@@ -128,8 +129,6 @@ class DDNSServer:
         #
         # DDNS Protocol handling class.
         self._UpdateSessionClass = isc.ddns.session.UpdateSession
-        # A callabale that returns effective TSIG key ring.
-        self._get_tsig_keyring = isc.server_common.tsig_keyring.get_keyring
 
     class SessionError(Exception):
         '''Exception for internal errors in an update session.
@@ -216,7 +215,7 @@ class DDNSServer:
             return None
         tsig_ctx = TSIGContext(tsig_record.get_name(),
                                tsig_record.get_rdata().get_algorithm(),
-                               self._get_tsig_keyring())
+                               isc.server_common.tsig_keyring.get_keyring())
         tsig_error = tsig_ctx.verify(tsig_record, req_data)
         if tsig_error != TSIGError.NOERROR:
             raise SessionError("Failed to verify request's TSIG: " +

+ 26 - 4
src/bin/ddns/tests/ddns_test.py

@@ -114,6 +114,23 @@ class FakeUpdateSession:
             self.__msg.set_rcode(Rcode.REFUSED())
         return self.__msg
 
+class FakeKeyringModule:
+    '''Fake the entire isc.server_common.tsig_keyring module.'''
+
+    def init_keyring(self, cc):
+        '''Set the instrumental attribute to True when called.
+
+        It can be used for a test that confirms TSIG key initialization is
+        surely performed.  This class doesn't use any CC session, so the
+        cc parameter will be ignored.
+
+        '''
+        self.initialized = True
+
+    def get_keyring(self):
+        '''Simply return the predefined TSIG keyring unconditionally.'''
+        return TEST_TSIG_KEYRING
+
 class MyCCSession(isc.config.ConfigData):
     '''Fake session with minimal interface compliance'''
     def __init__(self):
@@ -167,6 +184,8 @@ class TestDDNSServer(unittest.TestCase):
     def setUp(self):
         cc_session = MyCCSession()
         self.assertFalse(cc_session._started)
+        self.orig_tsig_keyring = isc.server_common.tsig_keyring
+        isc.server_common.tsig_keyring = FakeKeyringModule()
         self.ddns_server = ddns.DDNSServer(cc_session)
         self.__cc_session = cc_session
         self.assertTrue(cc_session._started)
@@ -181,6 +200,7 @@ class TestDDNSServer(unittest.TestCase):
         ddns.select.select = select.select
         ddns.isc.util.cio.socketsession.SocketSessionReceiver = \
             isc.util.cio.socketsession.SocketSessionReceiver
+        isc.server_common.tsig_keyring = self.orig_tsig_keyring
 
     def test_listen(self):
         '''
@@ -449,19 +469,21 @@ class TestDDNSession(unittest.TestCase):
     def setUp(self):
         cc_session = MyCCSession()
         self.assertFalse(cc_session._started)
+        self.orig_tsig_keyring = isc.server_common.tsig_keyring
+        isc.server_common.tsig_keyring = FakeKeyringModule()
         self.server = ddns.DDNSServer(cc_session)
         self.server._UpdateSessionClass = self.__fake_session_creator
-        self.server._get_tsig_keyring = self.__fake_keyring_getter
         self.__faked_result = UPDATE_SUCCESS # will be returned by fake session
         self.__sock = FakeSocket(-1)
 
+    def tearDown(self):
+        self.assertTrue(isc.server_common.tsig_keyring.initialized)
+        isc.server_common.tsig_keyring = self.orig_tsig_keyring
+
     def __fake_session_creator(self, req_message, client_addr, zone_config):
         return FakeUpdateSession(req_message, client_addr, zone_config,
                                  self.__faked_result)
 
-    def __fake_keyring_getter(self):
-        return TEST_TSIG_KEYRING
-
     def check_update_response(self, resp_wire, expected_rcode=Rcode.NOERROR(),
                               tsig_ctx=None):
         '''Check if given wire data are valid form of update response.