Parcourir la source

rebase to trunk

git-svn-id: svn://bind10.isc.org/svn/bind10/branches/trac341@2950 e5f2f494-b856-4b98-b285-d166d9295462
Jeremy C. Reed il y a 14 ans
Parent
commit
eb344309b9

+ 4 - 0
ChangeLog

@@ -1,3 +1,7 @@
+  95.	[bug]		jreed
+	bin/xfrout and bin/zonemgr: Fixed some stderr output.
+	(Trac #342, svn r2949)
+
   94.	[bug]		jelte
   94.	[bug]		jelte
   	bin/xfrout:  Fixed a problem in xfrout where only 2 or 3 RRs
   	bin/xfrout:  Fixed a problem in xfrout where only 2 or 3 RRs
 	were used per DNS message in the xfrout stream.
 	were used per DNS message in the xfrout stream.

+ 11 - 4
src/bin/bind10/bind10.py.in

@@ -65,7 +65,9 @@ import posix
 import isc.cc
 import isc.cc
 
 
 # This is the version that gets displayed to the user.
 # This is the version that gets displayed to the user.
-__version__ = "v20100531"
+# The VERSION string consists of the module name, the module version
+# number, and the overall BIND 10 version number (set in configure.ac).
+VERSION = "bind10 20100916 (BIND 10 @PACKAGE_VERSION@)"
 
 
 class RestartSchedule:
 class RestartSchedule:
     """
     """
@@ -627,7 +629,7 @@ def main():
 
 
 
 
     # Parse any command-line options.
     # Parse any command-line options.
-    parser = OptionParser(version=__version__)
+    parser = OptionParser(version=VERSION)
     parser.add_option("-a", "--address", dest="address", type="string",
     parser.add_option("-a", "--address", dest="address", type="string",
                       action="callback", callback=check_addr, default='',
                       action="callback", callback=check_addr, default='',
                       help="address the b10-auth daemon will use (default: listen on all addresses)")
                       help="address the b10-auth daemon will use (default: listen on all addresses)")
@@ -680,7 +682,7 @@ def main():
 
 
     # Announce startup.
     # Announce startup.
     if options.verbose:
     if options.verbose:
-        sys.stdout.write("BIND 10 %s\n" % __version__)
+        sys.stdout.write("%s\n" % VERSION)
 
 
     # TODO: set process name, perhaps by:
     # TODO: set process name, perhaps by:
     #       http://code.google.com/p/procname/
     #       http://code.google.com/p/procname/
@@ -735,7 +737,12 @@ def main():
 
 
         for fd in rlist + xlist:
         for fd in rlist + xlist:
             if fd == ccs_fd:
             if fd == ccs_fd:
-                boss_of_bind.ccs.check_command()
+                try:
+                    boss_of_bind.ccs.check_command()
+                except isc.cc.session.ProtocolError:
+                    if options.verbose:
+                        sys.stderr.write("[bind10] msgq channel disappeared.\n")
+                    break
             elif fd == wakeup_fd:
             elif fd == wakeup_fd:
                 os.read(wakeup_fd, 32)
                 os.read(wakeup_fd, 32)
 
 

+ 8 - 2
src/bin/cmdctl/cmdctl.py.in

@@ -380,6 +380,7 @@ class CommandControl():
     def send_command(self, module_name, command_name, params = None):
     def send_command(self, module_name, command_name, params = None):
         '''Send the command from bindctl to proper module. '''
         '''Send the command from bindctl to proper module. '''
         errstr = 'unknown error'
         errstr = 'unknown error'
+        answer = None
         if self._verbose:
         if self._verbose:
             self.log_info("Begin send command '%s' to module '%s'" %(command_name, module_name))
             self.log_info("Begin send command '%s' to module '%s'" %(command_name, module_name))
 
 
@@ -390,7 +391,10 @@ class CommandControl():
             msg = ccsession.create_command(command_name, params)
             msg = ccsession.create_command(command_name, params)
             seq = self._cc.group_sendmsg(msg, module_name)
             seq = self._cc.group_sendmsg(msg, module_name)
             #TODO, it may be blocked, msqg need to add a new interface waiting in timeout.
             #TODO, it may be blocked, msqg need to add a new interface waiting in timeout.
-            answer, env = self._cc.group_recvmsg(False, seq)
+            try:
+                answer, env = self._cc.group_recvmsg(False, seq)
+            except isc.cc.session.SessionTimeout:
+                errstr = "Module '%s' not responding" % module_name
 
 
         if self._verbose:
         if self._verbose:
             self.log_info("Finish send command '%s' to module '%s'" % (command_name, module_name))
             self.log_info("Finish send command '%s' to module '%s'" % (command_name, module_name))
@@ -410,7 +414,6 @@ class CommandControl():
             except ccsession.ModuleCCSessionError as mcse:
             except ccsession.ModuleCCSessionError as mcse:
                 errstr = str("Error in ccsession answer:") + str(mcse)
                 errstr = str("Error in ccsession answer:") + str(mcse)
                 self.log_info(errstr)
                 self.log_info(errstr)
-        
         return 1, {'error': errstr}
         return 1, {'error': errstr}
     
     
     def log_info(self, msg):
     def log_info(self, msg):
@@ -602,6 +605,9 @@ if __name__ == '__main__':
     except isc.cc.SessionError as err:
     except isc.cc.SessionError as err:
         sys.stderr.write("[b10-cmdctl] Error creating b10-cmdctl, "
         sys.stderr.write("[b10-cmdctl] Error creating b10-cmdctl, "
                          "is the command channel daemon running?\n")        
                          "is the command channel daemon running?\n")        
+    except isc.cc.SessionTimeout:
+        sys.stderr.write("[b10-cmdctl] Error creating b10-cmdctl, "
+                         "is the configuration manager running?\n")        
     except KeyboardInterrupt:
     except KeyboardInterrupt:
         sys.stderr.write("[b10-cmdctl] exit from Cmdctl\n")
         sys.stderr.write("[b10-cmdctl] exit from Cmdctl\n")
     except CmdctlException as err:
     except CmdctlException as err:

+ 12 - 9
src/bin/xfrout/xfrout.py.in

@@ -27,7 +27,7 @@ from socketserver import *
 import os
 import os
 from isc.config.ccsession import *
 from isc.config.ccsession import *
 from isc.log.log import *
 from isc.log.log import *
-from isc.cc import SessionError
+from isc.cc import SessionError, SessionTimeout
 from isc.notify import notify_out
 from isc.notify import notify_out
 import socket
 import socket
 import select
 import select
@@ -310,8 +310,8 @@ class UnixSockServer(ThreadingUnixStreamServer):
         If it's not a socket file or nobody is listening
         If it's not a socket file or nobody is listening
         , it will be removed. If it can't be removed, exit from python. '''
         , it will be removed. If it can't be removed, exit from python. '''
         if self._sock_file_in_use(sock_file):
         if self._sock_file_in_use(sock_file):
-            print("[b10-xfrout] Fail to start xfrout process, unix socket" 
-                  " file '%s' is being used by another xfrout process" % sock_file)
+            sys.stderr.write("[b10-xfrout] Fail to start xfrout process, unix socket" 
+                  " file '%s' is being used by another xfrout process\n" % sock_file)
             sys.exit(0)
             sys.exit(0)
         else:
         else:
             if not os.path.exists(sock_file):
             if not os.path.exists(sock_file):
@@ -320,7 +320,7 @@ class UnixSockServer(ThreadingUnixStreamServer):
             try:
             try:
                 os.unlink(sock_file)
                 os.unlink(sock_file)
             except OSError as err:
             except OSError as err:
-                print('[b10-xfrout] Fail to remove file ' + sock_file, err)
+                sys.stderr.write('[b10-xfrout] Fail to remove file %s: %s\n' % (sock_file, err))
                 sys.exit(0)
                 sys.exit(0)
    
    
     def _sock_file_in_use(self, sock_file):
     def _sock_file_in_use(self, sock_file):
@@ -404,9 +404,9 @@ class XfroutServer:
         self._listen_sock_file = UNIX_SOCKET_FILE 
         self._listen_sock_file = UNIX_SOCKET_FILE 
         self._shutdown_event = threading.Event()
         self._shutdown_event = threading.Event()
         self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
         self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
-        self._cc.add_remote_config(AUTH_SPECFILE_LOCATION);
         self._config_data = self._cc.get_full_config()
         self._config_data = self._cc.get_full_config()
         self._cc.start()
         self._cc.start()
+        self._cc.add_remote_config(AUTH_SPECFILE_LOCATION);
         self._log = isc.log.NSLogger(self._config_data.get('log_name'), self._config_data.get('log_file'),
         self._log = isc.log.NSLogger(self._config_data.get('log_name'), self._config_data.get('log_file'),
                                 self._config_data.get('log_severity'), self._config_data.get('log_versions'),
                                 self._config_data.get('log_severity'), self._config_data.get('log_versions'),
                                 self._config_data.get('log_max_bytes'), True)
                                 self._config_data.get('log_max_bytes'), True)
@@ -520,12 +520,15 @@ if '__main__' == __name__:
         xfrout_server = XfroutServer()
         xfrout_server = XfroutServer()
         xfrout_server.run()
         xfrout_server.run()
     except KeyboardInterrupt:
     except KeyboardInterrupt:
-        sys.stderr.write("[b10-xfrout] exit xfrout process")
+        sys.stderr.write("[b10-xfrout] exit xfrout process\n")
     except SessionError as e:
     except SessionError as e:
-        sys.stderr.write("[b10-xfrout] Error creating xfrout," 
-                           "is the command channel daemon running?")
+        sys.stderr.write("[b10-xfrout] Error creating xfrout, "
+                           "is the command channel daemon running?\n")
+    except SessionTimeout as e:
+        sys.stderr.write("[b10-xfrout] Error creating xfrout, " 
+                           "is the configuration manager running?\n")
     except ModuleCCSessionError as e:
     except ModuleCCSessionError as e:
-        sys.stderr.write("info", '[b10-xfrout] exit xfrout process:', e)
+        sys.stderr.write("[b10-xfrout] exit xfrout process:%s\n" % str(e))
 
 
     if xfrout_server:
     if xfrout_server:
         xfrout_server.shutdown()
         xfrout_server.shutdown()

+ 6 - 3
src/bin/zonemgr/zonemgr.py.in

@@ -513,12 +513,15 @@ if '__main__' == __name__:
         zonemgrd = Zonemgr()
         zonemgrd = Zonemgr()
         zonemgrd.run()
         zonemgrd.run()
     except KeyboardInterrupt:
     except KeyboardInterrupt:
-        sys.stderr.write("[b10-zonemgr] exit zonemgr process")
+        sys.stderr.write("[b10-zonemgr] exit zonemgr process\n")
     except isc.cc.session.SessionError as e:
     except isc.cc.session.SessionError as e:
         sys.stderr.write("[b10-zonemgr] Error creating zonemgr, " 
         sys.stderr.write("[b10-zonemgr] Error creating zonemgr, " 
-                           "is the command channel daemon running?")
+                           "is the command channel daemon running?\n")
+    except isc.cc.session.SessionTimeout as e:
+        sys.stderr.write("[b10-zonemgr] Error creating zonemgr, " 
+                           "is the configuration manager running?\n")
     except isc.config.ModuleCCSessionError as e:
     except isc.config.ModuleCCSessionError as e:
-        sys.stderr.write("info", "[b10-zonemgr] exit zonemgr process:", e)
+        sys.stderr.write("[b10-zonemgr] exit zonemgr process: %s\n" % str(e))
 
 
     if zonemgrd:
     if zonemgrd:
         zonemgrd.shutdown()
         zonemgrd.shutdown()

+ 83 - 33
src/lib/python/isc/cc/session.py

@@ -16,6 +16,7 @@
 import sys
 import sys
 import socket
 import socket
 import struct
 import struct
+import errno
 import os
 import os
 import threading
 import threading
 import bind10_config
 import bind10_config
@@ -25,21 +26,21 @@ import isc.cc.message
 class ProtocolError(Exception): pass
 class ProtocolError(Exception): pass
 class NetworkError(Exception): pass
 class NetworkError(Exception): pass
 class SessionError(Exception): pass
 class SessionError(Exception): pass
+class SessionTimeout(Exception): pass
 
 
 class Session:
 class Session:
+    MSGQ_DEFAULT_TIMEOUT = 4000
+    
     def __init__(self, socket_file=None):
     def __init__(self, socket_file=None):
         self._socket = None
         self._socket = None
-        # store the current timeout value in seconds (the way
-        # settimeout() wants them, our API takes milliseconds
-        # so that it is consistent with the C++ version)
-        self._socket_timeout = 4;
         self._lname = None
         self._lname = None
-        self._recvbuffer = bytearray()
-        self._recvlength = 0
         self._sequence = 1
         self._sequence = 1
         self._closed = False
         self._closed = False
         self._queue = []
         self._queue = []
         self._lock = threading.RLock()
         self._lock = threading.RLock()
+        self.set_timeout(self.MSGQ_DEFAULT_TIMEOUT);
+        self._recv_len_size = 0
+        self._recv_size = 0
 
 
         if socket_file is None:
         if socket_file is None:
             if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
             if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
@@ -121,6 +122,43 @@ class Session:
                     return isc.cc.message.from_wire(data[2:header_length+2]), None
                     return isc.cc.message.from_wire(data[2:header_length+2]), None
             return None, None
             return None, None
 
 
+    def _receive_bytes(self, size):
+        """Try to get size bytes of data from the socket.
+           Raises a ProtocolError if the size is 0.
+           Raises any error from recv().
+           Returns whatever data was available (if >0 bytes).
+           """
+        data = self._socket.recv(size)
+        if len(data) == 0: # server closed connection
+            raise ProtocolError("Read of 0 bytes: connection closed")
+        return data
+        
+    def _receive_len_data(self):
+        """Reads self._recv_len_size bytes of data from the socket into
+           self._recv_len_data
+           This is done through class variables so in the case of
+           an EAGAIN we can continue on a subsequent call.
+           Raises a ProtocolError, a socket.error (which may be
+           timeout or eagain), or reads until we have all data we need.
+           """
+        while self._recv_len_size > 0:
+            new_data = self._receive_bytes(self._recv_len_size)
+            self._recv_len_data += new_data
+            self._recv_len_size -= len(new_data)
+
+    def _receive_data(self):
+        """Reads self._recv_size bytes of data from the socket into
+           self._recv_data.
+           This is done through class variables so in the case of
+           an EAGAIN we can continue on a subsequent call.
+           Raises a ProtocolError, a socket.error (which may be
+           timeout or eagain), or reads until we have all data we need.
+        """
+        while self._recv_size > 0:
+            new_data = self._receive_bytes(self._recv_size)
+            self._recv_data += new_data
+            self._recv_size -= len(new_data)
+
     def _receive_full_buffer(self, nonblock):
     def _receive_full_buffer(self, nonblock):
         if nonblock:
         if nonblock:
             self._socket.setblocking(0)
             self._socket.setblocking(0)
@@ -131,35 +169,47 @@ class Session:
             else:
             else:
                 self._socket.settimeout(self._socket_timeout)
                 self._socket.settimeout(self._socket_timeout)
 
 
-        if self._recvlength == 0:
-            length = 4
-            length -= len(self._recvbuffer)
-            try:
-                data = self._socket.recv(length)
-            except:
-                return None
-            if data == "": # server closed connection
-                raise ProtocolError("Read of 0 bytes: connection closed")
-            self._recvbuffer += data
-            if len(self._recvbuffer) < 4:
+        try:
+            # we might be in a call following an EAGAIN, in which case
+            # we simply continue. In the first case, either
+            # recv_size or recv_len size are not zero
+            # they may never both be non-zero (we are either starting
+            # a full read, or continuing one of the reads
+            assert self._recv_size == 0 or self._recv_len_size == 0
+            
+            if self._recv_size == 0:
+                if self._recv_len_size == 0:
+                    # both zero, start a new full read
+                    self._recv_len_size = 4
+                    self._recv_len_data = bytearray()
+                self._receive_len_data()
+
+                self._recv_size = struct.unpack('>I', self._recv_len_data)[0]
+                self._recv_data = bytearray()
+            self._receive_data()
+
+            # no EAGAIN, so copy data and reset internal counters
+            data = self._recv_data
+
+            self._recv_len_size = 0
+            self._recv_size = 0
+
+            return (data)
+
+        except socket.timeout:
+            raise SessionTimeout("recv() on cc session timed out")
+        except socket.error as se:
+            # Only keep data in case of EAGAIN
+            if se.errno == errno.EAGAIN:
                 return None
                 return None
-            self._recvlength = struct.unpack('>I', self._recvbuffer)[0]
-            self._recvbuffer = bytearray()
-
-        length = self._recvlength - len(self._recvbuffer)
-        while (length > 0):
-            try:
-                data = self._socket.recv(length)
-            except:
+            # unknown state otherwise, best to drop data
+            self._recv_len_size = 0
+            self._recv_size = 0
+            # ctrl-c can result in EINTR, return None to prevent
+            # stacktrace output
+            if se.errno == errno.EINTR:
                 return None
                 return None
-            if data == "": # server closed connection
-                raise ProtocolError("Read of 0 bytes: connection closed")
-            self._recvbuffer += data
-            length -= len(data)
-        data = self._recvbuffer
-        self._recvbuffer = bytearray()
-        self._recvlength = 0
-        return (data)
+            raise se
 
 
     def _next_sequence(self):
     def _next_sequence(self):
         self._sequence += 1
         self._sequence += 1

+ 21 - 16
src/lib/python/isc/cc/tests/session_test.py

@@ -28,6 +28,7 @@ class MySocket():
         self.type = type
         self.type = type
         self.recvqueue = bytearray()
         self.recvqueue = bytearray()
         self.sendqueue = bytearray()
         self.sendqueue = bytearray()
+        self._blocking = True
 
 
     def connect(self, to):
     def connect(self, to):
         pass
         pass
@@ -36,7 +37,7 @@ class MySocket():
         pass
         pass
 
 
     def setblocking(self, val):
     def setblocking(self, val):
-        pass
+        self._blocking = val
 
 
     def send(self, data):
     def send(self, data):
         self.sendqueue.extend(data);
         self.sendqueue.extend(data);
@@ -67,6 +68,11 @@ class MySocket():
         return result
         return result
 
 
     def recv(self, length):
     def recv(self, length):
+        if len(self.recvqueue) == 0:
+            if self._blocking:
+                return bytes()
+            else:
+                raise socket.error(errno.EAGAIN, "Resource temporarily unavailable")
         if length > len(self.recvqueue):
         if length > len(self.recvqueue):
             raise Exception("Buffer underrun in test, does the test provide the right data?")
             raise Exception("Buffer underrun in test, does the test provide the right data?")
         result = self.recvqueue[:length]
         result = self.recvqueue[:length]
@@ -105,7 +111,8 @@ class MySession(Session):
         self._socket_timeout = 1
         self._socket_timeout = 1
         self._lname = None
         self._lname = None
         self._recvbuffer = bytearray()
         self._recvbuffer = bytearray()
-        self._recvlength = 0
+        self._recv_len_size = 0
+        self._recv_size = 0
         self._sequence = 1
         self._sequence = 1
         self._closed = False
         self._closed = False
         self._queue = []
         self._queue = []
@@ -192,10 +199,10 @@ class testSession(unittest.TestCase):
         # get no message without asking for a specific sequence number reply
         # get no message without asking for a specific sequence number reply
         self.assertFalse(sess.has_queued_msgs())
         self.assertFalse(sess.has_queued_msgs())
         sess._socket.addrecv({'to': 'someone', 'reply': 1}, {"hello": "a"})
         sess._socket.addrecv({'to': 'someone', 'reply': 1}, {"hello": "a"})
-        env, msg = sess.recvmsg(False)
+        env, msg = sess.recvmsg(True)
         self.assertEqual(None, env)
         self.assertEqual(None, env)
         self.assertTrue(sess.has_queued_msgs())
         self.assertTrue(sess.has_queued_msgs())
-        env, msg = sess.recvmsg(False, 1)
+        env, msg = sess.recvmsg(True, 1)
         self.assertEqual({'to': 'someone', 'reply': 1}, env)
         self.assertEqual({'to': 'someone', 'reply': 1}, env)
         self.assertEqual({"hello": "a"}, msg)
         self.assertEqual({"hello": "a"}, msg)
         self.assertFalse(sess.has_queued_msgs())
         self.assertFalse(sess.has_queued_msgs())
@@ -204,11 +211,11 @@ class testSession(unittest.TestCase):
         # then ask for the one that is there
         # then ask for the one that is there
         self.assertFalse(sess.has_queued_msgs())
         self.assertFalse(sess.has_queued_msgs())
         sess._socket.addrecv({'to': 'someone', 'reply': 1}, {"hello": "a"})
         sess._socket.addrecv({'to': 'someone', 'reply': 1}, {"hello": "a"})
-        env, msg = sess.recvmsg(False, 2)
+        env, msg = sess.recvmsg(True, 2)
         self.assertEqual(None, env)
         self.assertEqual(None, env)
         self.assertEqual(None, msg)
         self.assertEqual(None, msg)
         self.assertTrue(sess.has_queued_msgs())
         self.assertTrue(sess.has_queued_msgs())
-        env, msg = sess.recvmsg(False, 1)
+        env, msg = sess.recvmsg(True, 1)
         self.assertEqual({'to': 'someone', 'reply': 1}, env)
         self.assertEqual({'to': 'someone', 'reply': 1}, env)
         self.assertEqual({"hello": "a"}, msg)
         self.assertEqual({"hello": "a"}, msg)
         self.assertFalse(sess.has_queued_msgs())
         self.assertFalse(sess.has_queued_msgs())
@@ -217,11 +224,11 @@ class testSession(unittest.TestCase):
         # then ask for any message
         # then ask for any message
         self.assertFalse(sess.has_queued_msgs())
         self.assertFalse(sess.has_queued_msgs())
         sess._socket.addrecv({'to': 'someone', 'reply': 1}, {"hello": "a"})
         sess._socket.addrecv({'to': 'someone', 'reply': 1}, {"hello": "a"})
-        env, msg = sess.recvmsg(False, 2)
+        env, msg = sess.recvmsg(True, 2)
         self.assertEqual(None, env)
         self.assertEqual(None, env)
         self.assertEqual(None, msg)
         self.assertEqual(None, msg)
         self.assertTrue(sess.has_queued_msgs())
         self.assertTrue(sess.has_queued_msgs())
-        env, msg = sess.recvmsg(False, 1)
+        env, msg = sess.recvmsg(True, 1)
         self.assertEqual({'to': 'someone', 'reply': 1}, env)
         self.assertEqual({'to': 'someone', 'reply': 1}, env)
         self.assertEqual({"hello": "a"}, msg)
         self.assertEqual({"hello": "a"}, msg)
         self.assertFalse(sess.has_queued_msgs())
         self.assertFalse(sess.has_queued_msgs())
@@ -233,16 +240,16 @@ class testSession(unittest.TestCase):
         # then ask for any message (get the second)
         # then ask for any message (get the second)
         self.assertFalse(sess.has_queued_msgs())
         self.assertFalse(sess.has_queued_msgs())
         sess._socket.addrecv({'to': 'someone', 'reply': 1}, {'hello': 'a'})
         sess._socket.addrecv({'to': 'someone', 'reply': 1}, {'hello': 'a'})
-        env, msg = sess.recvmsg(False, 2)
+        env, msg = sess.recvmsg(True, 2)
         self.assertEqual(None, env)
         self.assertEqual(None, env)
         self.assertEqual(None, msg)
         self.assertEqual(None, msg)
         self.assertTrue(sess.has_queued_msgs())
         self.assertTrue(sess.has_queued_msgs())
         sess._socket.addrecv({'to': 'someone' }, {'hello': 'b'})
         sess._socket.addrecv({'to': 'someone' }, {'hello': 'b'})
-        env, msg = sess.recvmsg(False, 1)
+        env, msg = sess.recvmsg(True, 1)
         self.assertEqual({'to': 'someone', 'reply': 1 }, env)
         self.assertEqual({'to': 'someone', 'reply': 1 }, env)
         self.assertEqual({"hello": "a"}, msg)
         self.assertEqual({"hello": "a"}, msg)
         self.assertFalse(sess.has_queued_msgs())
         self.assertFalse(sess.has_queued_msgs())
-        env, msg = sess.recvmsg(False)
+        env, msg = sess.recvmsg(True)
         self.assertEqual({'to': 'someone'}, env)
         self.assertEqual({'to': 'someone'}, env)
         self.assertEqual({"hello": "b"}, msg)
         self.assertEqual({"hello": "b"}, msg)
         self.assertFalse(sess.has_queued_msgs())
         self.assertFalse(sess.has_queued_msgs())
@@ -253,11 +260,11 @@ class testSession(unittest.TestCase):
         self.assertFalse(sess.has_queued_msgs())
         self.assertFalse(sess.has_queued_msgs())
         sess._socket.addrecv({'to': 'someone' }, {'hello': 'b'})
         sess._socket.addrecv({'to': 'someone' }, {'hello': 'b'})
         sess._socket.addrecv({'to': 'someone', 'reply': 1}, {'hello': 'a'})
         sess._socket.addrecv({'to': 'someone', 'reply': 1}, {'hello': 'a'})
-        env, msg = sess.recvmsg(False, 1)
+        env, msg = sess.recvmsg(True, 1)
         self.assertEqual({'to': 'someone', 'reply': 1}, env)
         self.assertEqual({'to': 'someone', 'reply': 1}, env)
         self.assertEqual({"hello": "a"}, msg)
         self.assertEqual({"hello": "a"}, msg)
         self.assertTrue(sess.has_queued_msgs())
         self.assertTrue(sess.has_queued_msgs())
-        env, msg = sess.recvmsg(False)
+        env, msg = sess.recvmsg(True)
         self.assertEqual({'to': 'someone'}, env)
         self.assertEqual({'to': 'someone'}, env)
         self.assertEqual({"hello": "b"}, msg)
         self.assertEqual({"hello": "b"}, msg)
         self.assertFalse(sess.has_queued_msgs())
         self.assertFalse(sess.has_queued_msgs())
@@ -353,9 +360,7 @@ class testSession(unittest.TestCase):
             sess = MySession(1, s2)
             sess = MySession(1, s2)
             # set timeout to 100 msec, so test does not take too long
             # set timeout to 100 msec, so test does not take too long
             sess.set_timeout(100)
             sess.set_timeout(100)
-            env, msg = sess.group_recvmsg(False)
-            self.assertEqual(None, env)
-            self.assertEqual(None, msg)
+            self.assertRaises(SessionTimeout, sess.group_recvmsg, False)
         finally:
         finally:
             os.remove(TEST_SOCKET_FILE)
             os.remove(TEST_SOCKET_FILE)
 
 

+ 36 - 17
src/lib/python/isc/config/ccsession.py

@@ -177,10 +177,14 @@ class ModuleCCSession(ConfigData):
     def check_command(self):
     def check_command(self):
         """Check whether there is a command or configuration update
         """Check whether there is a command or configuration update
            on the channel. Call the corresponding callback function if
            on the channel. Call the corresponding callback function if
-           there is."""
-        msg, env = self._session.group_recvmsg(False)
+           there is. This function does a non-blocking read on the
+           cc session, and returns nothing. It will respond to any
+           command by either an error or the answer message returned
+           by the callback, unless the latter is None."""
+        msg, env = self._session.group_recvmsg(True)
+        
         # should we default to an answer? success-by-default? unhandled error?
         # should we default to an answer? success-by-default? unhandled error?
-        if msg and not 'result' in msg:
+        if msg is not None and not 'result' in msg:
             answer = None
             answer = None
             try:
             try:
                 module_name = env['group']
                 module_name = env['group']
@@ -244,6 +248,8 @@ class ModuleCCSession(ConfigData):
            also subscribes to the channel of the remote module name
            also subscribes to the channel of the remote module name
            to receive the relevant updates. It is not possible to
            to receive the relevant updates. It is not possible to
            specify your own handler for this right now.
            specify your own handler for this right now.
+           start() must have been called on this CCSession
+           prior to the call to this method.
            Returns the name of the module."""
            Returns the name of the module."""
         module_spec = isc.config.module_spec_from_file(spec_file_name)
         module_spec = isc.config.module_spec_from_file(spec_file_name)
         module_cfg = ConfigData(module_spec)
         module_cfg = ConfigData(module_spec)
@@ -252,7 +258,13 @@ class ModuleCCSession(ConfigData):
 
 
         # Get the current config for that module now
         # Get the current config for that module now
         seq = self._session.group_sendmsg(create_command(COMMAND_GET_CONFIG, { "module_name": module_name }), "ConfigManager")
         seq = self._session.group_sendmsg(create_command(COMMAND_GET_CONFIG, { "module_name": module_name }), "ConfigManager")
-        answer, env = self._session.group_recvmsg(False, seq)
+
+        try:
+            answer, env = self._session.group_recvmsg(False, seq)
+        except isc.cc.SessionTimeout:
+            raise ModuleCCSessionError("No answer from ConfigManager when "
+                                       "asking about Remote module " +
+                                       module_name)
         if answer:
         if answer:
             rcode, value = parse_answer(answer)
             rcode, value = parse_answer(answer)
             if rcode == 0:
             if rcode == 0:
@@ -283,25 +295,32 @@ class ModuleCCSession(ConfigData):
         """Sends the data specification to the configuration manager"""
         """Sends the data specification to the configuration manager"""
         msg = create_command(COMMAND_MODULE_SPEC, self.get_module_spec().get_full_spec())
         msg = create_command(COMMAND_MODULE_SPEC, self.get_module_spec().get_full_spec())
         seq = self._session.group_sendmsg(msg, "ConfigManager")
         seq = self._session.group_sendmsg(msg, "ConfigManager")
-        answer, env = self._session.group_recvmsg(False, seq)
+        try:
+            answer, env = self._session.group_recvmsg(False, seq)
+        except isc.cc.SessionTimeout:
+            # TODO: log an error?
+            pass
         
         
     def __request_config(self):
     def __request_config(self):
         """Asks the configuration manager for the current configuration, and call the config handler if set.
         """Asks the configuration manager for the current configuration, and call the config handler if set.
            Raises a ModuleCCSessionError if there is no answer from the configuration manager"""
            Raises a ModuleCCSessionError if there is no answer from the configuration manager"""
         seq = self._session.group_sendmsg(create_command(COMMAND_GET_CONFIG, { "module_name": self._module_name }), "ConfigManager")
         seq = self._session.group_sendmsg(create_command(COMMAND_GET_CONFIG, { "module_name": self._module_name }), "ConfigManager")
-        answer, env = self._session.group_recvmsg(False, seq)
-        if answer:
-            rcode, value = parse_answer(answer)
-            if rcode == 0:
-                if value != None and self.get_module_spec().validate_config(False, value):
-                    self.set_local_config(value);
-                    if self._config_handler:
-                        self._config_handler(value)
+        try:
+            answer, env = self._session.group_recvmsg(False, seq)
+            if answer:
+                rcode, value = parse_answer(answer)
+                if rcode == 0:
+                    if value != None and self.get_module_spec().validate_config(False, value):
+                        self.set_local_config(value);
+                        if self._config_handler:
+                            self._config_handler(value)
+                else:
+                    # log error
+                    print("[" + self._module_name + "] Error requesting configuration: " + value)
             else:
             else:
-                # log error
-                print("[" + self._module_name + "] Error requesting configuration: " + value)
-        else:
-            raise ModuleCCSessionError("No answer from configuration manager")
+                raise ModuleCCSessionError("No answer from configuration manager")
+        except isc.cc.SessionTimeout:
+            raise ModuleCCSessionError("CC Session timeout waiting for configuration manager")
 
 
 
 
 class UIModuleCCSession(MultiConfigData):
 class UIModuleCCSession(MultiConfigData):

+ 26 - 11
src/lib/python/isc/config/cfgmgr.py

@@ -283,7 +283,10 @@ class ConfigManager:
             update_cmd = ccsession.create_command(ccsession.COMMAND_CONFIG_UPDATE,
             update_cmd = ccsession.create_command(ccsession.COMMAND_CONFIG_UPDATE,
                                                   conf_part)
                                                   conf_part)
             seq = self.cc.group_sendmsg(update_cmd, module_name)
             seq = self.cc.group_sendmsg(update_cmd, module_name)
-            answer, env = self.cc.group_recvmsg(False, seq)
+            try:
+                answer, env = self.cc.group_recvmsg(False, seq)
+            except isc.cc.SessionTimeout:
+                answer = ccsession.create_answer(1, "Timeout waiting for answer from " + module_name)
         else:
         else:
             conf_part = data.set(self.config.data, module_name, {})
             conf_part = data.set(self.config.data, module_name, {})
             data.merge(conf_part[module_name], cmd[1])
             data.merge(conf_part[module_name], cmd[1])
@@ -292,7 +295,10 @@ class ConfigManager:
                                                   conf_part[module_name])
                                                   conf_part[module_name])
             seq = self.cc.group_sendmsg(update_cmd, module_name)
             seq = self.cc.group_sendmsg(update_cmd, module_name)
             # replace 'our' answer with that of the module
             # replace 'our' answer with that of the module
-            answer, env = self.cc.group_recvmsg(False, seq)
+            try:
+                answer, env = self.cc.group_recvmsg(False, seq)
+            except isc.cc.SessionTimeout:
+                answer = ccsession.create_answer(1, "Timeout waiting for answer from " + module_name)
         if answer:
         if answer:
             rcode, val = ccsession.parse_answer(answer)
             rcode, val = ccsession.parse_answer(answer)
             if rcode == 0:
             if rcode == 0:
@@ -313,15 +319,19 @@ class ConfigManager:
                 update_cmd = ccsession.create_command(ccsession.COMMAND_CONFIG_UPDATE,
                 update_cmd = ccsession.create_command(ccsession.COMMAND_CONFIG_UPDATE,
                                                       self.config.data[module])
                                                       self.config.data[module])
                 seq = self.cc.group_sendmsg(update_cmd, module)
                 seq = self.cc.group_sendmsg(update_cmd, module)
-                answer, env = self.cc.group_recvmsg(False, seq)
-                if answer == None:
-                    got_error = True
-                    err_list.append("No answer message from " + module)
-                else:
-                    rcode, val = ccsession.parse_answer(answer)
-                    if rcode != 0:
+                try:
+                    answer, env = self.cc.group_recvmsg(False, seq)
+                    if answer == None:
                         got_error = True
                         got_error = True
-                        err_list.append(val)
+                        err_list.append("No answer message from " + module)
+                    else:
+                        rcode, val = ccsession.parse_answer(answer)
+                        if rcode != 0:
+                            got_error = True
+                            err_list.append(val)
+                except isc.cc.SessionTimeout:
+                    got_error = True
+                    err_list.append("CC Timeout waiting on answer message from " + module)
         if not got_error:
         if not got_error:
             self.write_config()
             self.write_config()
             return ccsession.create_answer(0)
             return ccsession.create_answer(0)
@@ -394,8 +404,13 @@ class ConfigManager:
         """Runs the configuration manager."""
         """Runs the configuration manager."""
         self.running = True
         self.running = True
         while (self.running):
         while (self.running):
+            # we just wait eternally for any command here, so disable
+            # timeouts for this specific recv
+            self.cc.set_timeout(0)
             msg, env = self.cc.group_recvmsg(False)
             msg, env = self.cc.group_recvmsg(False)
-            # ignore 'None' value (current result of timeout)
+            # and set it back to whatever we default to
+            self.cc.set_timeout(isc.cc.Session.MSGQ_DEFAULT_TIMEOUT)
+            # ignore 'None' value (even though they should not occur)
             # and messages that are answers to questions we did
             # and messages that are answers to questions we did
             # not ask
             # not ask
             if msg is not None and not 'result' in msg:
             if msg is not None and not 'result' in msg:

+ 14 - 2
src/lib/python/isc/config/tests/unittest_fakesession.py

@@ -15,6 +15,8 @@
 
 
 # $Id$
 # $Id$
 
 
+import isc
+
 #
 #
 # We can probably use a more general version of this
 # We can probably use a more general version of this
 #
 #
@@ -24,6 +26,10 @@ class FakeModuleCCSession:
         # each entry is of the form [ channel, instance, message ]
         # each entry is of the form [ channel, instance, message ]
         self.message_queue = []
         self.message_queue = []
         self._socket = "ok we just need something not-None here atm"
         self._socket = "ok we just need something not-None here atm"
+        # if self.timeout is set to anything other than 0, and
+        # the message_queue is empty when receive is called, throw
+        # a SessionTimeout
+        self._timeout = 0
 
 
     def group_subscribe(self, group_name, instance_name = None):
     def group_subscribe(self, group_name, instance_name = None):
         if not group_name in self.subscriptions:
         if not group_name in self.subscriptions:
@@ -63,7 +69,11 @@ class FakeModuleCCSession:
             if qm[0] in self.subscriptions and (qm[1] == None or qm[1] in self.subscriptions[qm[0]]):
             if qm[0] in self.subscriptions and (qm[1] == None or qm[1] in self.subscriptions[qm[0]]):
                 self.message_queue.remove(qm)
                 self.message_queue.remove(qm)
                 return qm[2], {'group': qm[0], 'from': qm[1]}
                 return qm[2], {'group': qm[0], 'from': qm[1]}
-        return None, None
+        if self._timeout == 0:
+            return None, None
+        else:
+            raise isc.cc.SessionTimeout("Timeout set but no data to "
+                                 "return to group_recvmsg()")
 
 
     def get_message(self, channel, target = None):
     def get_message(self, channel, target = None):
         for qm in self.message_queue:
         for qm in self.message_queue:
@@ -75,4 +85,6 @@ class FakeModuleCCSession:
     def close(self):
     def close(self):
         # need to pass along somehow that this function has been called,
         # need to pass along somehow that this function has been called,
         self._socket = "closed"
         self._socket = "closed"
-        pass
+
+    def set_timeout(self, timeout):
+        self._timeout = timeout