|
@@ -161,6 +161,8 @@ class MyCCSession(isc.config.ConfigData):
|
|
|
self._stopped = False
|
|
|
# Used as the return value of get_remote_config_value. Customizable.
|
|
|
self.auth_db_file = READ_ZONE_DB_FILE
|
|
|
+ # Used as the return value of get_remote_config_value. Customizable.
|
|
|
+ self.auth_datasources = None
|
|
|
# faked cc channel, providing group_send/recvmsg itself. The following
|
|
|
# attributes are for inspection/customization in tests.
|
|
|
self._session = self
|
|
@@ -190,6 +192,11 @@ class MyCCSession(isc.config.ConfigData):
|
|
|
def get_remote_config_value(self, module_name, item):
|
|
|
if module_name == "Auth" and item == "database_file":
|
|
|
return self.auth_db_file, False
|
|
|
+ if module_name == "Auth" and item == "datasources":
|
|
|
+ if self.auth_datasources is None:
|
|
|
+ return [], True # default
|
|
|
+ else:
|
|
|
+ return self.auth_datasources, False
|
|
|
|
|
|
def group_sendmsg(self, msg, group):
|
|
|
# remember the passed parameter, and return dummy sequence
|
|
@@ -745,20 +752,42 @@ class TestDDNSSession(unittest.TestCase):
|
|
|
num_rrsets = len(self.__req_message.get_section(SECTION_PREREQUISITE))
|
|
|
self.assertEqual(2, num_rrsets)
|
|
|
|
|
|
- def check_session_msg(self, result, expect_recv=1):
|
|
|
+ def check_session_msg(self, result, expect_recv=1, notify_auth=False):
|
|
|
'''Check post update communication with other modules.'''
|
|
|
# iff the update succeeds, b10-ddns should tell interested other
|
|
|
- # modules the information about the update zone in the form of
|
|
|
+ # modules the information about the update zone. Possible modules
|
|
|
+ # are xfrout and auth: for xfrout, the message format should be:
|
|
|
# {'command': ['notify', {'zone_name': <updated_zone_name>,
|
|
|
# 'zone_class', <updated_zone_class>}]}
|
|
|
+ # for auth, it should be:
|
|
|
+ # {'command': ['loadzone', {'origin': <updated_zone_name>,
|
|
|
+ # 'class', <updated_zone_class>,
|
|
|
+ # 'datasrc', <datasrc type, should be
|
|
|
+ # "memory" in practice>}]}
|
|
|
# and expect an answer by calling group_recvmsg().
|
|
|
#
|
|
|
# expect_recv indicates the expected number of calls to
|
|
|
- # group_recvmsg(), which is normally 1, but can be 0 if send fails.
|
|
|
+ # group_recvmsg(), which is normally 1, but can be 0 if send fails;
|
|
|
+ # if the message is to be sent
|
|
|
if result == UPDATE_SUCCESS:
|
|
|
- self.assertEqual(1, len(self.__cc_session._sent_msg))
|
|
|
+ expected_sentmsg = 2 if notify_auth else 1
|
|
|
+ self.assertEqual(expected_sentmsg,
|
|
|
+ len(self.__cc_session._sent_msg))
|
|
|
self.assertEqual(expect_recv, self.__cc_session._recvmsg_called)
|
|
|
- sent_msg, sent_group = self.__cc_session._sent_msg[0]
|
|
|
+ msg_cnt = 0
|
|
|
+ if notify_auth:
|
|
|
+ sent_msg, sent_group = self.__cc_session._sent_msg[msg_cnt]
|
|
|
+ sent_cmd = sent_msg['command']
|
|
|
+ self.assertEqual('Auth', sent_group)
|
|
|
+ self.assertEqual('loadzone', sent_cmd[0])
|
|
|
+ self.assertEqual(3, len(sent_cmd[1]))
|
|
|
+ self.assertEqual(TEST_ZONE_NAME.to_text(),
|
|
|
+ sent_cmd[1]['origin'])
|
|
|
+ self.assertEqual(TEST_RRCLASS.to_text(),
|
|
|
+ sent_cmd[1]['class'])
|
|
|
+ self.assertEqual('memory', sent_cmd[1]['datasrc'])
|
|
|
+ msg_cnt += 1
|
|
|
+ sent_msg, sent_group = self.__cc_session._sent_msg[msg_cnt]
|
|
|
sent_cmd = sent_msg['command']
|
|
|
self.assertEqual('Xfrout', sent_group)
|
|
|
self.assertEqual('notify', sent_cmd[0])
|
|
@@ -815,8 +844,32 @@ class TestDDNSSession(unittest.TestCase):
|
|
|
self.__cc_session._sendmsg_exception = RuntimeError('unexpected')
|
|
|
self.assertRaises(RuntimeError, self.check_session)
|
|
|
|
|
|
+ def test_session_msg_for_auth(self):
|
|
|
+ '''Test post update communication with other modules including Auth.'''
|
|
|
+ # Let the CC session return in-memory config with sqlite3 backend.
|
|
|
+ # (The default case was covered by other tests.)
|
|
|
+ self.__cc_session.auth_datasources = \
|
|
|
+ [{'type': 'memory', 'class': 'IN', 'zones': [
|
|
|
+ {'origin': TEST_ZONE_NAME_STR, 'filetype': 'sqlite3'}]}]
|
|
|
+ self.check_session()
|
|
|
+ self.check_session_msg(UPDATE_SUCCESS, expect_recv=2, notify_auth=True)
|
|
|
+
|
|
|
+ # Let sendmsg() raise an exception. The first exception shouldn't
|
|
|
+ # stop sending the second message. There's just no recv calls.
|
|
|
+ self.__cc_session.clear_msg()
|
|
|
+ self.__cc_session._sendmsg_exception = SessionError('send error')
|
|
|
+ self.check_session()
|
|
|
+ self.check_session_msg(UPDATE_SUCCESS, expect_recv=0, notify_auth=True)
|
|
|
+
|
|
|
+ # Likewise, in the case recvmsg() raises (and there should be recv
|
|
|
+ # calls in this case)
|
|
|
+ self.__cc_session.clear_msg()
|
|
|
+ self.__cc_session._recvmsg_exception = SessionError('recv error')
|
|
|
+ self.check_session()
|
|
|
+ self.check_session_msg(UPDATE_SUCCESS, expect_recv=2, notify_auth=True)
|
|
|
+
|
|
|
def test_session_with_config(self):
|
|
|
- '''Check a session with more relistic config setups
|
|
|
+ '''Check a session with more realistic config setups.
|
|
|
|
|
|
We don't have to explore various cases in detail in this test.
|
|
|
We're just checking if the expected configured objects are passed
|