Browse Source

Merge #2676

Replacing pair of group_sendmsg & group_recvmsg by compound method rpc_call in
many cases. It's not possible to convert all cases, due to various hacks in the
code.
Michal 'vorner' Vaner 12 years ago
parent
commit
77d49bfce9

+ 6 - 1
src/bin/cmdctl/cmdctl.py.in

@@ -429,8 +429,13 @@ class CommandControl():
             # Process the command sent to cmdctl directly.
             answer = self.command_handler(command_name, params)
         else:
+            # FIXME: Due to the fact that we use a separate session
+            # from the module one (due to threads and blocking), and
+            # because the plain cc session does not have the high-level
+            # rpc-call method, we use the low-level way and create the
+            # command ourselves.
             msg = ccsession.create_command(command_name, params)
-            seq = self._cc.group_sendmsg(msg, module_name)
+            seq = self._cc.group_sendmsg(msg, module_name, want_answer=True)
             logger.debug(DBG_CMDCTL_MESSAGING, CMDCTL_COMMAND_SENT,
                          command_name, module_name)
             #TODO, it may be blocked, msqg need to add a new interface waiting in timeout.

+ 24 - 37
src/bin/ddns/ddns.py.in

@@ -32,7 +32,8 @@ import isc.util.cio.socketsession
 import isc.server_common.tsig_keyring
 from isc.server_common.dns_tcp import DNSTCPContext
 from isc.datasrc import DataSourceClient
-from isc.server_common.auth_command import auth_loadzone_command
+from isc.server_common.auth_command import AUTH_LOADZONE_COMMAND, \
+    auth_loadzone_params
 import select
 import time
 import errno
@@ -544,42 +545,38 @@ class DDNSServer:
     def __notify_start_forwarder(self):
         '''Notify auth that DDNS Update messages can now be forwarded'''
         try:
-            seq = self._cc._session.group_sendmsg(create_command(
-                    "start_ddns_forwarder"), AUTH_MODULE_NAME)
-            answer, _ = self._cc._session.group_recvmsg(False, seq)
-            rcode, error_msg = parse_answer(answer)
-            if rcode != 0:
-                logger.error(DDNS_START_FORWARDER_ERROR, error_msg)
-        except (SessionTimeout, SessionError, ProtocolError) as ex:
+            self._cc.rpc_call("start_ddns_forwarder", AUTH_MODULE_NAME)
+        except (SessionTimeout, SessionError, ProtocolError,
+                RPCRecipientMissing) as ex:
             logger.error(DDNS_START_FORWARDER_FAIL, ex)
+        except RPCError as e:
+            logger.error(DDNS_START_FORWARDER_ERROR, e)
 
     def __notify_stop_forwarder(self):
         '''Notify auth that DDNS Update messages should no longer be forwarded.
 
         '''
         try:
-            seq = self._cc._session.group_sendmsg(create_command(
-                    "stop_ddns_forwarder"), AUTH_MODULE_NAME)
-            answer, _ = self._cc._session.group_recvmsg(False, seq)
-            rcode, error_msg = parse_answer(answer)
-            if rcode != 0:
-                logger.error(DDNS_STOP_FORWARDER_ERROR, error_msg)
-        except (SessionTimeout, SessionError, ProtocolError) as ex:
+            self._cc.rpc_call("stop_ddns_forwarder", AUTH_MODULE_NAME)
+        except (SessionTimeout, SessionError, ProtocolError,
+                RPCRecipientMissing) as ex:
             logger.error(DDNS_STOP_FORWARDER_FAIL, ex)
+        except RPCError as e:
+            logger.error(DDNS_STOP_FORWARDER_ERROR, e)
 
     def __notify_auth(self, zname, zclass):
         '''Notify auth of the update, if necessary.'''
-        msg = auth_loadzone_command(self._cc, zname, zclass)
-        if msg is not None:
-            self.__notify_update(AUTH_MODULE_NAME, msg, zname, zclass)
+        self.__notify_update(AUTH_MODULE_NAME, AUTH_LOADZONE_COMMAND,
+                             auth_loadzone_params(zname, zclass), zname,
+                             zclass)
 
     def __notify_xfrout(self, zname, zclass):
         '''Notify xfrout of the update.'''
         param = {'zone_name': zname.to_text(), 'zone_class': zclass.to_text()}
-        msg = create_command('notify', param)
-        self.__notify_update(XFROUT_MODULE_NAME, msg, zname, zclass)
+        self.__notify_update(XFROUT_MODULE_NAME, 'notify', param, zname,
+                             zclass)
 
-    def __notify_update(self, modname, msg, zname, zclass):
+    def __notify_update(self, modname, command, params, zname, zclass):
         '''Notify other module of the update.
 
         Note that we use blocking communication here.  While the internal
@@ -590,27 +587,17 @@ class DDNSServer:
         For a longer term we'll need to switch to asynchronous communication,
         but for now we rely on the blocking operation.
 
-        Note also that we directly refer to the "protected" member of
-        ccsession (_cc._session) rather than creating a separate channel.
-        It's probably not the best practice, but hopefully we can introduce
-        a cleaner way when we support asynchronous communication.
-        At the moment we prefer the brevity with the use of internal channel
-        of the cc session.
-
         '''
         try:
-            seq = self._cc._session.group_sendmsg(msg, modname)
-            answer, _ = self._cc._session.group_recvmsg(False, seq)
-            rcode, error_msg = parse_answer(answer)
-        except (SessionTimeout, SessionError, ProtocolError) as ex:
-            rcode = 1
-            error_msg = str(ex)
-        if rcode == 0:
+            # FIXME? Is really rpc_call the correct one? What if there are more
+            # than one recipient of the given kind? What if none? We need to
+            # think of some kind of notification/broadcast mechanism.
+            self._cc.rpc_call(command, modname, params=params)
             logger.debug(TRACE_BASIC, DDNS_UPDATE_NOTIFY, modname,
                          ZoneFormatter(zname, zclass))
-        else:
+        except (SessionTimeout, SessionError, ProtocolError, RPCError) as ex:
             logger.error(DDNS_UPDATE_NOTIFY_FAIL, modname,
-                         ZoneFormatter(zname, zclass), error_msg)
+                         ZoneFormatter(zname, zclass), ex)
 
     def handle_session(self, fileno):
         """Handle incoming session on the socket with given fileno.

+ 17 - 27
src/bin/ddns/ddns_messages.mes

@@ -69,7 +69,7 @@ it's just a timing issue.  The number of total failed attempts is also
 logged.  If it reaches an internal threshold b10-ddns considers it a
 fatal error and terminates.  Even in that case, if b10-ddns is
 configured as a "dispensable" component (which is the default), the
-parent bind10 process will restart it, and there will be another
+parent ("init") process will restart it, and there will be another
 chance of getting the remote configuration successfully.  These are
 not the optimal behavior, but it's believed to be sufficient in
 practice (there would normally be no failure in the first place).  If
@@ -253,29 +253,19 @@ notify messages to secondary servers.
 b10-ddns has made updates to a zone based on an update request and
 tried to notify an external component of the updates, but the
 notification fails.  One possible cause of this is that the external
-component is not really running and it times out in waiting for the
-response, although it will be less likely to happen in practice
-because these components will normally be configured to run when the
-server provides the authoritative DNS service; ddns is rather optional
-among them.  If this happens, however, it will suspend b10-ddns for a
-few seconds during which it cannot handle new requests (some may be
-delayed, some may be dropped, depending on the volume of the incoming
-requests).  This is obviously bad, and if this error happens due to
-this reason, the administrator should make sure the component in
-question should be configured to run.  For a longer term, b10-ddns
-should be more robust about this case such as by making this
-notification asynchronously and/or detecting the existence of the
-external components to avoid hopeless notification in the first place.
-Severity of this error for the receiving components depends on the
-type of the component.  If it's b10-xfrout, this means DNS notify
-messages won't be sent to secondary servers of the zone.  It's
-suboptimal, but not necessarily critical as the secondary servers will
-try to check the zone's status periodically.  If it's b10-auth and the
-notification was needed to have it reload the corresponding zone, it's
-more serious because b10-auth won't be able to serve the new version
-of the zone unless some explicit recovery action is taken.  So the
-administrator needs to examine this message and takes an appropriate
-action.  In either case, this notification is generally expected to
-succeed; so the fact it fails itself means there's something wrong in
-the BIND 10 system, and it would be advisable to check other log
-messages.
+component is not really running, although it will be less likely to
+happen in practice because these components will normally be
+configured to run when the server provides the authoritative DNS
+service; ddns is rather optional among them. Severity of this error
+for the receiving components depends on the type of the component.  If
+it's b10-xfrout, this means DNS notify messages won't be sent to
+secondary servers of the zone.  It's suboptimal, but not necessarily
+critical as the secondary servers will try to check the zone's status
+periodically.  If it's b10-auth and the notification was needed to
+have it reload the corresponding zone, it's more serious because
+b10-auth won't be able to serve the new version of the zone unless
+some explicit recovery action is taken.  So the administrator needs to
+examine this message and takes an appropriate action.  In either case,
+this notification is generally expected to succeed; so the fact it
+fails itself means there's something wrong in the BIND 10 system, and
+it would be advisable to check other log messages.

+ 3 - 2
src/bin/ddns/tests/ddns_test.py

@@ -191,7 +191,7 @@ class FakeKeyringModule:
         '''Simply return the predefined TSIG keyring unconditionally.'''
         return TEST_TSIG_KEYRING
 
-class MyCCSession(isc.config.ConfigData):
+class MyCCSession(isc.config.ModuleCCSession):
     '''Fake session with minimal interface compliance.'''
 
     # faked CC sequence used in group_send/recvmsg
@@ -276,7 +276,8 @@ class MyCCSession(isc.config.ConfigData):
                     'secondary_zones')
                 return seczone_default, True
 
-    def group_sendmsg(self, msg, group):
+    def group_sendmsg(self, msg, group, instance='*', to='*',
+                      want_answer=False):
         # remember the passed parameter, and return dummy sequence
         self._sent_msg.append((msg, group))
         if self._sendmsg_exception is not None:

+ 55 - 50
src/bin/stats/stats.py.in

@@ -251,41 +251,45 @@ class Stats:
         # It counts the number of instances of same module by
         # examining the third value from the array result of
         # 'show_processes' of Init
-        seq = self.cc_session.group_sendmsg(
-            isc.config.ccsession.create_command("show_processes"),
-            'Init')
-        (answer, env) = self.cc_session.group_recvmsg(False, seq)
+        try:
+            value = self.mccs.rpc_call('show_processes', 'Init')
+        except isc.config.RPCRecipientMissing:
+            # This has been SessionTimeout before, so we keep the original
+            # behavior.
+            raise
+        except isc.config.RPCError:
+            # TODO: Is it OK to just pass? As part of refactoring, preserving
+            # the original behaviour.
+            value = None
         modules = []
-        if answer:
-            (rcode, value) = isc.config.ccsession.parse_answer(answer)
-            if rcode == 0 and type(value) is list:
-                # NOTE: For example, the "show_processes" command
-                # of Init is assumed to return the response in this
-                # format:
-                #  [
-                #  ...
-                #    [
-                #      20061,
-                #      "b10-auth",
-                #      "Auth"
-                #    ],
-                #    [
-                #      20103,
-                #      "b10-auth-2",
-                #      "Auth"
-                #    ]
-                #  ...
-                #  ]
-                # If multiple instances of the same module are
-                # running, the address names of them, which are at the
-                # third element, must be also same. Thus, the value of
-                # the third element of each outer element is read here
-                # for counting multiple instances.  This is a
-                # workaround for counting the instances. This should
-                # be fixed in another proper way in the future
-                # release.
-                modules = [ v[2] if type(v) is list and len(v) > 2 \
-                                else None for v in value ]
+        if type(value) is list:
+            # NOTE: For example, the "show_processes" command
+            # of Init is assumed to return the response in this
+            # format:
+            #  [
+            #  ...
+            #    [
+            #      20061,
+            #      "b10-auth",
+            #      "Auth"
+            #    ],
+            #    [
+            #      20103,
+            #      "b10-auth-2",
+            #      "Auth"
+            #    ]
+            #  ...
+            #  ]
+            # If multiple instances of the same module are
+            # running, the address names of them, which are at the
+            # third element, must be also same. Thus, the value of
+            # the third element of each outer element is read here
+            # for counting multiple instances.  This is a
+            # workaround for counting the instances. This should
+            # be fixed in another proper way in the future
+            # release.
+            modules = [ v[2] if type(v) is list and len(v) > 2 \
+                            else None for v in value ]
         # start requesting each module to collect statistics data
         sequences = []
         for (module_name, data) in self.get_statistics_data().items():
@@ -296,7 +300,12 @@ class Stats:
                          module_name)
             cmd = isc.config.ccsession.create_command(
                 "getstats", None) # no argument
-            seq = self.cc_session.group_sendmsg(cmd, module_name)
+            # Not using rpc_call here. We first send a bunch of commands, then
+            # collect all the answers. This eliminates some of the round-trip
+            # times. Unfortunately, rpc_call is not flexible enough to allow
+            # this, though the future rpc_call_async could.
+            seq = self.cc_session.group_sendmsg(cmd, module_name,
+                                                want_answer=True)
             sequences.append((module_name, seq))
             cnt = modules.count(module_name)
             if cnt > 1:
@@ -421,21 +430,17 @@ class Stats:
         raises StatsError.
         """
         modules = {}
-        seq = self.cc_session.group_sendmsg(
-            isc.config.ccsession.create_command(
-                isc.config.ccsession.COMMAND_GET_STATISTICS_SPEC),
-            'ConfigManager')
-        (answer, env) = self.cc_session.group_recvmsg(False, seq)
-        if answer:
-            (rcode, value) = isc.config.ccsession.parse_answer(answer)
-            if rcode == 0:
-                for mod in value:
-                    spec = { "module_name" : mod }
-                    if value[mod] and type(value[mod]) is list:
-                        spec["statistics"] = value[mod]
-                    modules[mod] = isc.config.module_spec.ModuleSpec(spec)
-            else:
-                raise StatsError("Updating module spec fails: " + str(value))
+        try:
+            value = self.mccs.rpc_call(isc.config.ccsession. \
+                                       COMMAND_GET_STATISTICS_SPEC,
+                                       'ConfigManager')
+        except isc.config.RPCError as e:
+            raise StatsError("Updating module spec fails: " + str(e))
+        for mod in value:
+            spec = { "module_name" : mod }
+            if value[mod] and type(value[mod]) is list:
+                spec["statistics"] = value[mod]
+            modules[mod] = isc.config.module_spec.ModuleSpec(spec)
         modules[self.module_name] = self.mccs.get_module_spec()
         self.modules = modules
 

+ 8 - 20
src/bin/stats/stats_httpd.py.in

@@ -459,20 +459,14 @@ class StatsHttpd:
         if name is not None:
             param['name'] = name
         try:
-            seq = self.cc_session.group_sendmsg(
-                isc.config.ccsession.create_command('show', param), 'Stats')
-            (answer, env) = self.cc_session.group_recvmsg(False, seq)
-            if answer:
-                (rcode, value) = isc.config.ccsession.parse_answer(answer)
+            return self.mccs.rpc_call('show', 'Stats', params=param)
         except (isc.cc.session.SessionTimeout,
-                isc.cc.session.SessionError) as err:
+                isc.cc.session.SessionError,
+                isc.config.RPCRecipientMissing) as err:
             raise StatsHttpdError("%s: %s" %
                                   (err.__class__.__name__, err))
-        else:
-            if rcode == 0:
-                return value
-            else:
-                raise StatsHttpdDataError("Stats module: %s" % str(value))
+        except isc.config.RPCError as e:
+            raise StatsHttpdDataError("Stats module: %s" % str(e))
 
     def get_stats_spec(self, owner=None, name=None):
         """Requests statistics data to the Stats daemon and returns
@@ -493,15 +487,9 @@ class StatsHttpd:
         if name is not None:
             param['name'] = name
         try:
-            seq = self.cc_session.group_sendmsg(
-                isc.config.ccsession.create_command('showschema', param), 'Stats')
-            (answer, env) = self.cc_session.group_recvmsg(False, seq)
-            if answer:
-                (rcode, value) = isc.config.ccsession.parse_answer(answer)
-                if rcode == 0:
-                    return value
-                else:
-                    raise StatsHttpdDataError("Stats module: %s" % str(value))
+            return self.mccs.rpc_call('showschema', 'Stats', params=param)
+        except isc.config.RPCError as e:
+            raise StatsHttpdDataError("Stats module: %s" % str(e))
         except (isc.cc.session.SessionTimeout,
                 isc.cc.session.SessionError) as err:
             raise StatsHttpdError("%s: %s" %

+ 1 - 1
src/bin/xfrin/tests/xfrin_test.py

@@ -2909,7 +2909,7 @@ class TestXfrinProcessMockCCSession:
         self.recv_called = False
         self.recv_called_correctly = False
 
-    def group_sendmsg(self, msg, module):
+    def group_sendmsg(self, msg, module, want_answer=False):
         self.send_called = True
         if module == 'Auth' and msg['command'][0] == 'loadzone':
             self.send_called_correctly = True

+ 17 - 5
src/bin/xfrin/xfrin.py.in

@@ -982,7 +982,6 @@ class XfrinConnection(asyncore.dispatcher):
                          format_addrinfo(self._master_addrinfo))
             ret = XFRIN_FAIL
         except XfrinProtocolError as e:
-            # FIXME: Why is this .info? Even the messageID contains "ERROR".
             logger.info(XFRIN_XFR_TRANSFER_PROTOCOL_VIOLATION, req_str,
                         self.zone_str(),
                         format_addrinfo(self._master_addrinfo), str(e))
@@ -1324,7 +1323,8 @@ def _do_auth_loadzone(server, zone_name, zone_class):
         param = msg['command'][1]
         logger.debug(DBG_XFRIN_TRACE, XFRIN_AUTH_LOADZONE, param["origin"],
                      param["class"])
-        seq = server._send_cc_session.group_sendmsg(msg, AUTH_MODULE_NAME)
+        seq = server._send_cc_session.group_sendmsg(msg, AUTH_MODULE_NAME,
+                                                    want_answer=True)
         answer, env = server._send_cc_session.group_recvmsg(False, seq)
 
 class Xfrin:
@@ -1630,18 +1630,29 @@ class Xfrin:
         param = {'zone_name': zone_name.to_text(),
                  'zone_class': zone_class.to_text()}
         if xfr_result == XFRIN_OK:
+            # FIXME: Due to the hack with two different CC sessions
+            # (see the _cc_setup comment) and the fact the rpc_call
+            # is a high-level call present only at ModuleCCSession,
+            # we are forced to use the primitive way of manually
+            # calling group_sendmsg and the group_recvmsg. Also, why
+            # do we do group_recvmsg when we don't need the answer?
+            # And why is this direct RPC call if a notification would
+            # be more appropriate?
             _do_auth_loadzone(self, zone_name, zone_class)
             msg = create_command(notify_out.ZONE_NEW_DATA_READY_CMD, param)
             # catch the exception, in case msgq has been killed.
             try:
                 seq = self._send_cc_session.group_sendmsg(msg,
-                                                          XFROUT_MODULE_NAME)
+                                                          XFROUT_MODULE_NAME,
+                                                          want_answer=True)
                 try:
                     answer, env = self._send_cc_session.group_recvmsg(False,
                                                                       seq)
                 except isc.cc.session.SessionTimeout:
                     pass        # for now we just ignore the failure
-                seq = self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
+                seq = self._send_cc_session.group_sendmsg(msg,
+                                                          ZONE_MANAGER_MODULE_NAME,
+                                                          want_answer=True)
                 try:
                     answer, env = self._send_cc_session.group_recvmsg(False,
                                                                       seq)
@@ -1654,7 +1665,8 @@ class Xfrin:
             msg = create_command(notify_out.ZONE_XFRIN_FAILED, param)
             # catch the exception, in case msgq has been killed.
             try:
-                seq = self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
+                seq = self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME,
+                                                          want_answer=True)
                 try:
                     answer, env = self._send_cc_session.group_recvmsg(False,
                                                                       seq)

+ 8 - 16
src/bin/zonemgr/tests/zonemgr_test.py

@@ -41,23 +41,16 @@ TEST_SQLITE3_DBFILE = os.getenv("TESTDATAOBJDIR") + '/initdb.file'
 class ZonemgrTestException(Exception):
     pass
 
-class MySession():
-    def __init__(self):
-        pass
-
-    def group_sendmsg(self, msg, module_name):
-        if module_name not in ("Auth", "Xfrin"):
-            raise ZonemgrTestException("module name not exist")
-
-    def group_recvmsg(self, nonblock, seq):
-        return None, None
-
 class FakeCCSession(isc.config.ConfigData, MockModuleCCSession):
     def __init__(self):
         module_spec = isc.config.module_spec_from_file(SPECFILE_LOCATION)
         ConfigData.__init__(self, module_spec)
         MockModuleCCSession.__init__(self)
 
+    def rpc_call(self, command, module, instance="*", to="*", params=None):
+        if module not in ("Auth", "Xfrin"):
+            raise ZonemgrTestException("module name not exist")
+
     def get_remote_config_value(self, module_name, identifier):
         if module_name == "Auth" and identifier == "database_file":
             return TEST_SQLITE3_DBFILE, False
@@ -84,8 +77,8 @@ class MyZonemgrRefresh(ZonemgrRefresh):
                 return None
         sqlite3_ds.get_zone_soa = get_zone_soa
 
-        ZonemgrRefresh.__init__(self, MySession(), TEST_SQLITE3_DBFILE,
-                                self._slave_socket, FakeCCSession())
+        ZonemgrRefresh.__init__(self, TEST_SQLITE3_DBFILE, self._slave_socket,
+                                FakeCCSession())
         current_time = time.time()
         self._zonemgr_refresh_info = {
          ('example.net.', 'IN'): {
@@ -619,7 +612,6 @@ class MyZonemgr(Zonemgr):
         self._db_file = TEST_SQLITE3_DBFILE
         self._zone_refresh = None
         self._shutdown_event = threading.Event()
-        self._cc = MySession()
         self._module_cc = FakeCCSession()
         self._config_data = {
                     "lowerbound_refresh" : 10,
@@ -664,8 +656,8 @@ class TestZonemgr(unittest.TestCase):
         self.zonemgr.config_handler(config_data3)
         self.assertEqual(0.5, self.zonemgr._config_data.get("refresh_jitter"))
         # The zone doesn't exist in database, simply skip loading soa for it and log an warning
-        self.zonemgr._zone_refresh = ZonemgrRefresh(None, TEST_SQLITE3_DBFILE,
-                                                    None, FakeCCSession())
+        self.zonemgr._zone_refresh = ZonemgrRefresh(TEST_SQLITE3_DBFILE, None,
+                                                    FakeCCSession())
         config_data1["secondary_zones"] = [{"name": "nonexistent.example",
                                             "class": "IN"}]
         self.assertEqual(self.zonemgr.config_handler(config_data1),

+ 9 - 10
src/bin/zonemgr/zonemgr.py.in

@@ -103,8 +103,8 @@ class ZonemgrRefresh:
     can be stopped by calling shutdown() in another thread.
     """
 
-    def __init__(self, cc, db_file, slave_socket, module_cc_session):
-        self._cc = cc
+    def __init__(self, db_file, slave_socket, module_cc_session):
+        self._mccs = module_cc_session
         self._check_sock = slave_socket
         self._db_file = db_file
         self._zonemgr_refresh_info = {}
@@ -277,15 +277,15 @@ class ZonemgrRefresh:
 
     def _send_command(self, module_name, command_name, params):
         """Send command between modules."""
-        msg = create_command(command_name, params)
         try:
-            seq = self._cc.group_sendmsg(msg, module_name)
-            try:
-                answer, env = self._cc.group_recvmsg(False, seq)
-            except isc.cc.session.SessionTimeout:
-                pass        # for now we just ignore the failure
+            self._mccs.rpc_call(command_name, module_name, params=params)
         except socket.error:
+            # FIXME: WTF? Where does socket.error come from? And how do we ever
+            # dare ignore such serious error? It can only be broken link to
+            # msgq, we need to terminate then.
             logger.error(ZONEMGR_SEND_FAIL, module_name)
+        except (isc.cc.session.SessionTimeout, isc.config.RPCError):
+            pass        # for now we just ignore the failure
 
     def _find_need_do_refresh_zone(self):
         """Find the first zone need do refresh, if no zone need
@@ -525,7 +525,7 @@ class Zonemgr:
         self._db_file = self.get_db_file()
         # Create socket pair for communicating between main thread and zonemgr timer thread
         self._master_socket, self._slave_socket = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
-        self._zone_refresh = ZonemgrRefresh(self._cc, self._db_file, self._slave_socket, self._module_cc)
+        self._zone_refresh = ZonemgrRefresh(self._db_file, self._slave_socket, self._module_cc)
         self._zone_refresh.run_timer()
 
         self._lock = threading.Lock()
@@ -536,7 +536,6 @@ class Zonemgr:
         """Setup two sessions for zonemgr, one(self._module_cc) is used for receiving
         commands and config data sent from other modules, another one (self._cc)
         is used to send commands to proper modules."""
-        self._cc = isc.cc.Session()
         self._module_cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
                                                   self.config_handler,
                                                   self.command_handler)

+ 1 - 0
src/lib/cc/proto_defs.cc

@@ -38,6 +38,7 @@ const char* const CC_COMMAND_SEND = "send";
 const char* const CC_TO_WILDCARD = "*";
 const char* const CC_INSTANCE_WILDCARD = "*";
 // Reply codes
+const int CC_REPLY_SUCCESS = 0;
 const int CC_REPLY_NO_RECPT = -1;
 
 }

+ 66 - 3
src/lib/python/isc/config/ccsession.py

@@ -37,6 +37,7 @@
 """
 
 from isc.cc import Session
+from isc.cc.proto_defs import *
 from isc.config.config_data import ConfigData, MultiConfigData, BIND10_CONFIG_DATA_VERSION
 import isc.config.module_spec
 import isc
@@ -50,6 +51,31 @@ logger = isc.log.Logger("config")
 
 class ModuleCCSessionError(Exception): pass
 
+class RPCError(ModuleCCSessionError):
+    """
+    An exception raised by rpc_call in case the remote side reports
+    an error. It can be used to distinguish remote errors from protocol errors.
+    Also, it holds the code as well as the error message.
+    """
+    def __init__(self, code, message):
+        ModuleCCSessionError.__init__(self, message)
+        self.__code = code
+
+    def code(self):
+        """
+        The code as sent over the CC.
+        """
+        return self.__code
+
+class RPCRecipientMissing(RPCError):
+    """
+    Special version of the RPCError, for cases the recipient of the call
+    isn't connected to the bus. The code is always
+    isc.cc.proto_defs.CC_REPLY_NO_RECPT.
+    """
+    def __init__(self, message):
+        RPCError.__init__(self, CC_REPLY_NO_RECPT, message)
+
 def parse_answer(msg):
     """Returns a tuple (rcode, value), where value depends on the
        command that was called. If rcode != 0, value is a string
@@ -66,7 +92,8 @@ def parse_answer(msg):
         raise ModuleCCSessionError("wrong rcode type in answer message")
     else:
         if len(msg['result']) > 1:
-            if (msg['result'][0] != 0 and type(msg['result'][1]) != str):
+            if (msg['result'][0] != CC_REPLY_SUCCESS and
+                type(msg['result'][1]) != str):
                 raise ModuleCCSessionError("rcode in answer message is non-zero, value is not a string")
             return msg['result'][0], msg['result'][1]
         else:
@@ -79,7 +106,7 @@ def create_answer(rcode, arg = None):
        a string containing an error message"""
     if type(rcode) != int:
         raise ModuleCCSessionError("rcode in create_answer() must be an integer")
-    if rcode != 0 and type(arg) != str:
+    if rcode != CC_REPLY_SUCCESS and type(arg) != str:
         raise ModuleCCSessionError("arg in create_answer for rcode != 0 must be a string describing the error")
     if arg != None:
         return { 'result': [ rcode, arg ] }
@@ -299,7 +326,7 @@ class ModuleCCSession(ConfigData):
                         isc.cc.data.remove_identical(new_config, self.get_local_config())
                         answer = self._config_handler(new_config)
                         rcode, val = parse_answer(answer)
-                        if rcode == 0:
+                        if rcode == CC_REPLY_SUCCESS:
                             newc = self.get_local_config()
                             isc.cc.data.merge(newc, new_config)
                             self.set_local_config(newc)
@@ -474,6 +501,42 @@ class ModuleCCSession(ConfigData):
         except isc.cc.SessionTimeout:
             raise ModuleCCSessionError("CC Session timeout waiting for configuration manager")
 
+    def rpc_call(self, command, group, instance=CC_INSTANCE_WILDCARD,
+                 to=CC_TO_WILDCARD, params=None):
+        """
+        Create a command with the given name and parameters. Send it to a
+        recipient, wait for the answer and parse it.
+
+        This is a wrapper around the group_sendmsg and group_recvmsg on the CC
+        session. It exists mostly for convenience.
+
+        Params:
+        - command: Name of the command to call on the remote side.
+        - group, instance, to: Address specification of the recipient.
+        - params: Parameters to pass to the command (as keyword arguments).
+
+        Return: The return value of the remote call (just the value, no status
+          code or anything). May be None.
+
+        Raise:
+        - RPCRecipientMissing if the given recipient doesn't exist.
+        - RPCError if the other side sent an error response. The error string
+          is in the exception.
+        - ModuleCCSessionError in case of protocol errors, like malformed
+          answer.
+        """
+        cmd = create_command(command, params)
+        seq = self._session.group_sendmsg(cmd, group, instance=instance,
+                                          to=to, want_answer=True)
+        # For non-blocking, we'll have rpc_call_async (once the nonblock
+        # actualy works)
+        reply, rheaders = self._session.group_recvmsg(nonblock=False, seq=seq)
+        code, value = parse_answer(reply)
+        if code == CC_REPLY_NO_RECPT:
+            raise RPCRecipientMissing(value)
+        elif code != CC_REPLY_SUCCESS:
+            raise RPCError(code, value)
+        return value
 
 class UIModuleCCSession(MultiConfigData):
     """This class is used in a configuration user interface. It contains

+ 61 - 0
src/lib/python/isc/config/tests/ccsession_test.py

@@ -289,6 +289,67 @@ class TestModuleCCSession(unittest.TestCase):
         fake_session.close()
         mccs.__del__() # with closed fake_session
 
+    def rpc_check(self, reply):
+        fake_session = FakeModuleCCSession()
+        mccs = self.create_session("spec1.spec", None, None, fake_session)
+        fake_session.message_queue = [
+            ["Spec1", None, reply, False]
+        ]
+        exception = None
+        try:
+            result = mccs.rpc_call("test", "Spec2", params={
+                                       "param1": "Param 1",
+                                       "param2": "Param 2"
+                                   })
+        except Exception as e:
+            # We first want to check the value sent, raise the exception
+            # afterwards. So store it for a short while.
+            exception = e
+        self.assertEqual([
+                ["Spec2", "*", {"command": ["test", {
+                    "param1": "Param 1",
+                    "param2": "Param 2"
+                }]}, True]
+            ], fake_session.message_queue)
+        if exception is not None:
+            raise exception
+        return result
+
+    def test_rpc_call_success(self):
+        """
+        Test we can send an RPC (command) and get an answer. The answer is
+        success in this case.
+        """
+        result = self.rpc_check({"result": [0, {"Hello": "a"}]})
+        self.assertEqual({"Hello": "a"}, result)
+
+    def test_rpc_call_success_none(self):
+        """
+        Test the success case of RPC command, but the answer is empty
+        (eg. a "void" function on the remote side).
+        """
+        self.assertIsNone(self.rpc_check({"result": [0]}))
+
+    def test_rpc_call_malformed_answer(self):
+        """
+        Test it successfully raises ModuleCCSessionError when a malformed
+        reply is sent.
+        """
+        self.assertRaises(ModuleCCSessionError, self.rpc_check, ["Nonsense"])
+
+    def test_rpc_call_error(self):
+        """
+        Test it raises an exception when the remote side reports an error.
+        """
+        self.assertRaises(RPCError, self.rpc_check, {"result": [1, "Error"]})
+
+    def test_rpc_call_no_recpt(self):
+        """
+        Test RPC raises an error when the recipient is not there.
+        """
+        self.assertRaises(RPCRecipientMissing, self.rpc_check,
+                          {"result": [-1, "Error"]})
+
     def my_config_handler_ok(self, new_config):
         return isc.config.ccsession.create_answer(0)
 

+ 6 - 4
src/lib/python/isc/config/tests/unittest_fakesession.py

@@ -28,7 +28,7 @@ class WouldBlockForever(Exception):
 class FakeModuleCCSession:
     def __init__(self):
         self.subscriptions = {}
-        # each entry is of the form [ channel, instance, message ]
+        # each entry is of the form [ channel, instance, message, want_answer ]
         self.message_queue = []
         self._socket = "ok we just need something not-None here atm"
         # if self.timeout is set to anything other than 0, and
@@ -68,12 +68,14 @@ class FakeModuleCCSession:
         else:
             return False
 
-    def group_sendmsg(self, msg, channel, target = None):
-        self.message_queue.append([ channel, target, msg ])
+    def group_sendmsg(self, msg, group, instance=None, to=None,
+                      want_answer=False):
+        self.message_queue.append([ group, instance, msg, want_answer ])
+        return 42
 
     def group_reply(self, env, msg):
         if 'group' in env:
-            self.message_queue.append([ env['group'], None, msg])
+            self.message_queue.append([ env['group'], None, msg, False])
 
     def group_recvmsg(self, nonblock=True, seq = None):
         for qm in self.message_queue:

+ 9 - 5
src/lib/python/isc/server_common/auth_command.py

@@ -22,6 +22,13 @@ from isc.log_messages.server_common_messages import *
 from isc.server_common.logger import logger
 
 AUTH_MODULE_NAME = 'Auth'
+AUTH_LOADZONE_COMMAND = 'loadzone'
+
+def auth_loadzone_params(zone_name, zone_class):
+    return {
+        "origin": zone_name.to_text(),
+        "class": zone_class.to_text()
+    }
 
 def auth_loadzone_command(module_cc, zone_name, zone_class):
     '''Create a 'loadzone' command with a given zone for Auth server.
@@ -50,8 +57,5 @@ def auth_loadzone_command(module_cc, zone_name, zone_class):
     # to notification-driven approach, at which point the function would
     # be changed a lot.
 
-    param = {
-        "origin": zone_name.to_text(),
-        "class": zone_class.to_text()
-    }
-    return create_command("loadzone", param)
+    return create_command(AUTH_LOADZONE_COMMAND,
+                          auth_loadzone_params(zone_name, zone_class))