Browse Source

[1028] prevented more unusual leaks when process_xfrin() raises an exception.
also logged it when such an event happens.

JINMEI Tatuya 13 years ago
parent
commit
738b11db9f
3 changed files with 165 additions and 39 deletions
  1. 66 4
      src/bin/xfrin/tests/xfrin_test.py
  2. 84 35
      src/bin/xfrin/xfrin.py.in
  3. 15 0
      src/bin/xfrin/xfrin_messages.mes

+ 66 - 4
src/bin/xfrin/tests/xfrin_test.py

@@ -303,6 +303,7 @@ class TestXfrinState(unittest.TestCase):
         self.conn = MockXfrinConnection(self.sock_map, TEST_ZONE_NAME,
                                         TEST_RRCLASS, None, threading.Event(),
                                         TEST_MASTER_IPV4_ADDRINFO)
+        self.conn.init_socket()
         self.begin_soa = RRset(TEST_ZONE_NAME, TEST_RRCLASS, RRType.SOA(),
                                RRTTL(3600))
         self.begin_soa.add_rdata(Rdata(RRType.SOA(), TEST_RRCLASS,
@@ -588,6 +589,7 @@ class TestXfrinConnection(unittest.TestCase):
         self.conn = MockXfrinConnection(self.sock_map, TEST_ZONE_NAME,
                                         TEST_RRCLASS, None, threading.Event(),
                                         TEST_MASTER_IPV4_ADDRINFO)
+        self.conn.init_socket()
         self.soa_response_params = {
             'questions': [example_soa_question],
             'bad_qid': False,
@@ -723,12 +725,14 @@ class TestAXFR(TestXfrinConnection):
         # uncovers such a bug.
         c = MockXfrinConnection({}, TEST_ZONE_NAME, TEST_RRCLASS, None,
                                 threading.Event(), TEST_MASTER_IPV6_ADDRINFO)
+        c.init_socket()
         c.bind(('::', 0))
         c.close()
 
     def test_init_chclass(self):
         c = MockXfrinConnection({}, TEST_ZONE_NAME, RRClass.CH(), None,
                                 threading.Event(), TEST_MASTER_IPV4_ADDRINFO)
+        c.init_socket()
         axfrmsg = c._create_query(RRType.AXFR())
         self.assertEqual(axfrmsg.get_question()[0].get_class(),
                          RRClass.CH())
@@ -1683,15 +1687,27 @@ class TestXfrinRecorder(unittest.TestCase):
 class TestXfrinProcess(unittest.TestCase):
     def setUp(self):
         self.unlocked = False
+        self.conn_closed = False
+        self.do_raise_on_close = False
+        self.do_raise_on_connect = False
+        self.do_raise_on_publish = False
+        self.master = (socket.AF_INET, socket.SOCK_STREAM,
+                       (TEST_MASTER_IPV4_ADDRESS, TEST_MASTER_PORT))
 
     def tearDown(self):
+        # Whatever happens, the lock acquired in xfrin_recorder.increment
+        # must always be released.  We check this condition for all test
+        # cases.
         self.assertTrue(self.unlocked)
 
+        # Same for the connection
+        self.assertTrue(self.conn_closed)
+
     def increment(self, zone_name):
         '''Fake method of xfrin_recorder.increment.
 
         '''
-        pass
+        self.unlocked = False
 
     def decrement(self, zone_name):
         '''Fake method of xfrin_recorder.decrement.
@@ -1703,7 +1719,20 @@ class TestXfrinProcess(unittest.TestCase):
         '''Fake method of serve.publish_xfrin_news
 
         '''
-        pass
+        if self.do_raise_on_publish:
+            raise XfrinTestException('Emulated exception in publish')
+
+    def connect_to_master(self, conn):
+        self.sock_fd = conn.fileno()
+        if self.do_raise_on_connect:
+            raise XfrinTestException('Emulated exception in connect')
+        return True
+
+    def conn_close(self, conn):
+        self.conn_closed = True
+        XfrinConnection.close(conn)
+        if self.do_raise_on_close:
+            raise XfrinTestException('Emulated exception in connect')
 
     def create_xfrinconn(self, sock_map, zone_name, rrclass, datasrc_client,
                          shutdown_event, master_addrinfo, tsig_key):
@@ -1718,12 +1747,45 @@ class TestXfrinProcess(unittest.TestCase):
         conn._tsig_ctx_creator = None
         self.assertEqual(orig_ref, sys.getrefcount(conn))
 
+        # Replace some methods for connect with our internal ones for the
+        # convenience of tests
+        conn.connect_to_master = lambda : self.connect_to_master(conn)
+        conn.do_xfrin = lambda x, y : XFRIN_OK
+        conn.close = lambda : self.conn_close(conn)
+
         return conn
 
     def test_process_xfrin_normal(self):
+        # Normal, successful case.  We only check that things are cleaned up
+        # at the tearDown time.
+        process_xfrin(self, self, TEST_ZONE_NAME, TEST_RRCLASS, None, None,
+                      self.master,  False, None, RRType.AXFR(),
+                      self.create_xfrinconn)
+
+    def test_process_xfrin_exception_on_connect(self):
+        # connect_to_master() will raise an exception.  Things must still be
+        # cleaned up.
+        self.do_raise_on_connect = True
+        process_xfrin(self, self, TEST_ZONE_NAME, TEST_RRCLASS, None, None,
+                      self.master,  False, None, RRType.AXFR(),
+                      self.create_xfrinconn)
+
+    def test_process_xfrin_exception_on_close(self):
+        # connect() will result in exception, and even the cleanup close()
+        # will fail with an exception.  This should be quite likely a bug,
+        # but we deal with that case.
+        self.do_raise_on_connect = True
+        self.do_raise_on_close = True
+        process_xfrin(self, self, TEST_ZONE_NAME, TEST_RRCLASS, None, None,
+                      self.master,  False, None, RRType.AXFR(),
+                      self.create_xfrinconn)
+
+    def test_process_xfrin_exception_on_publish(self):
+        # xfr succeeds but notifying the zonemgr fails with exception.
+        # everything must still be cleaned up.
+        self.do_raise_on_publish = True
         process_xfrin(self, self, TEST_ZONE_NAME, TEST_RRCLASS, None, None,
-                      (socket.AF_INET, socket.SOCK_STREAM,
-                       TEST_MASTER_IPV4_ADDRESS), False, None, RRType.AXFR(),
+                      self.master,  False, None, RRType.AXFR(),
                       self.create_xfrinconn)
 
 class TestXfrin(unittest.TestCase):

+ 84 - 35
src/bin/xfrin/xfrin.py.in

@@ -469,19 +469,28 @@ class XfrinConnection(asyncore.dispatcher):
         # Data source handler
         self._datasrc_client = datasrc_client
 
-        self.create_socket(master_addrinfo[0], master_addrinfo[1])
         self._sock_map = sock_map
         self._soa_rr_count = 0
         self._idle_timeout = idle_timeout
-        self.setblocking(1)
         self._shutdown_event = shutdown_event
-        self._master_address = master_addrinfo[2]
+        self._master_addrinfo = master_addrinfo
         self._tsig_key = tsig_key
         self._tsig_ctx = None
         # tsig_ctx_creator is introduced to allow tests to use a mock class for
         # easier tests (in normal case we always use the default)
         self._tsig_ctx_creator = lambda key : TSIGContext(key)
 
+    def init_socket(self):
+        '''Initialize the underlying socket.
+
+        This is essentially a part of __init__() and is expected to be
+        called immediately after the constructor.  It's separated from
+        the constructor because otherwise we might not be able to close
+        it if the constructor raises an exception after opening the socket.
+        '''
+        self.create_socket(self._master_addrinfo[0], self._master_addrinfo[1])
+        self.setblocking(1)
+
     def __set_xfrstate(self, new_state):
         self.__state = new_state
 
@@ -496,10 +505,11 @@ class XfrinConnection(asyncore.dispatcher):
         '''Connect to master in TCP.'''
 
         try:
-            self.connect(self._master_address)
+            self.connect(self._master_addrinfo[2])
             return True
         except socket.error as e:
-            logger.error(XFRIN_CONNECT_MASTER, self._master_address, str(e))
+            logger.error(XFRIN_CONNECT_MASTER, self._master_addrinfo[2],
+                         str(e))
             return False
 
     def _get_zone_soa(self):
@@ -695,7 +705,6 @@ class XfrinConnection(asyncore.dispatcher):
             # (if not yet - possible in case of xfr-level exception) as soon
             # as possible
             self._diff = None
-            self.close()
 
         return ret
 
@@ -796,43 +805,83 @@ class XfrinConnection(asyncore.dispatcher):
         # Overwrite the log function, log nothing
         pass
 
-def process_xfrin(server, xfrin_recorder, zone_name, rrclass, db_file,
-                  shutdown_event, master_addrinfo, check_soa, tsig_key,
-                  request_type, conn_class=XfrinConnection):
-    xfrin_recorder.increment(zone_name)
-
-    # Create a data source client used in this XFR session.  Right now we
-    # still assume an sqlite3-based data source, and use both the old and new
-    # data source APIs.  We also need to use a mock client for tests.
-    # For a temporary workaround to deal with these situations, we skip the
-    # creation when the given file is none (the test case).  Eventually
-    # this code will be much cleaner.
-    datasrc_client = None
-    if db_file is not None:
-        # temporary hardcoded sqlite initialization. Once we decide on
-        # the config specification, we need to update this (TODO)
-        # this may depend on #1207, or any followup ticket created for #1207
-        datasrc_type = "sqlite3"
-        datasrc_config = "{ \"database_file\": \"" + db_file + "\"}"
-        datasrc_client = DataSourceClient(datasrc_type, datasrc_config)
-
-    # Create a TCP connection for the XFR session and perform the operation.
-    sock_map = {}
-    conn = conn_class(sock_map, zone_name, rrclass, datasrc_client,
-                      shutdown_event, master_addrinfo, tsig_key)
-    # XXX: We still need _db_file for temporary workaround in _create_query().
-    # This should be removed when we eliminate the need for the workaround.
-    conn._db_file = db_file
+def __process_xfrin(server, zone_name, rrclass, db_file,
+                    shutdown_event, master_addrinfo, check_soa, tsig_key,
+                    request_type, conn_class=XfrinConnection):
+    conn = None
+    exception = None
     ret = XFRIN_FAIL
-    if conn.connect_to_master():
-        ret = conn.do_xfrin(check_soa, request_type)
+    try:
+        # Create a data source client used in this XFR session.  Right now we
+        # still assume an sqlite3-based data source, and use both the old and
+        # new data source APIs.  We also need to use a mock client for tests.
+        # For a temporary workaround to deal with these situations, we skip the
+        # creation when the given file is none (the test case).  Eventually
+        # this code will be much cleaner.
+        datasrc_client = None
+        if db_file is not None:
+            # temporary hardcoded sqlite initialization. Once we decide on
+            # the config specification, we need to update this (TODO)
+            # this may depend on #1207, or any followup ticket created for #1207
+            datasrc_type = "sqlite3"
+            datasrc_config = "{ \"database_file\": \"" + db_file + "\"}"
+            datasrc_client = DataSourceClient(datasrc_type, datasrc_config)
+
+        # Create a TCP connection for the XFR session and perform the operation
+        sock_map = {}
+        conn = conn_class(sock_map, zone_name, rrclass, datasrc_client,
+                          shutdown_event, master_addrinfo, tsig_key)
+        conn.init_socket()
+        # XXX: We still need _db_file for temporary workaround in _create_query().
+        # This should be removed when we eliminate the need for the workaround.
+        conn._db_file = db_file
+        if conn.connect_to_master():
+            ret = conn.do_xfrin(check_soa, request_type)
+    except Exception as ex:
+        # If an exception happens, just remember it here so that we can
+        # re-raise it after cleaning things up.  We don't log it here because
+        # we want to eliminate even the smallest possibility of having an
+        # exception in the logging itself.
+        exception = ex
+
+    # asyncore.dispatcher requires an explicit close() unless its lifetime,
+    # from creation to destruction, is contained within asyncore.loop, which
+    # is not the case for us.  We always close() here, whether or not
+    # do_xfrin succeeds, and even when we see an unexpected exception.
+    if conn is not None:
+        conn.close()
 
     # Publish the zone transfer result news, so zonemgr can reset the
     # zone timer, and xfrout can notify the zone's slaves if the result
     # is success.
     server.publish_xfrin_news(zone_name, rrclass, ret)
+
+    if exception is not None:
+        raise exception
+
+def process_xfrin(server, xfrin_recorder, zone_name, rrclass, db_file,
+                  shutdown_event, master_addrinfo, check_soa, tsig_key,
+                  request_type, conn_class=XfrinConnection):
+    # Even if it should be rare, the main process of an xfrin session can
+    # raise an exception.  In order to make sure the lock in xfrin_recorder
+    # is released in any case, we delegate the main part to the helper
+    # function in the try block, catch any exceptions, then release the lock.
+    xfrin_recorder.increment(zone_name)
+    exception = None
+    try:
+        __process_xfrin(server, zone_name, rrclass, db_file,
+                        shutdown_event, master_addrinfo, check_soa, tsig_key,
+                        request_type, conn_class)
+    except Exception as ex:
+        # don't log it until we complete decrement().
+        exception = ex
     xfrin_recorder.decrement(zone_name)
 
+    if exception is not None:
+        typestr = "AXFR" if request_type == RRType.AXFR() else "IXFR"
+        logger.error(XFRIN_XFR_PROCESS_FAILURE, typestr, zone_name.to_text(),
+                     str(rrclass), str(exception))
+
 class XfrinRecorder:
     def __init__(self):
         self._lock = threading.Lock()

+ 15 - 0
src/bin/xfrin/xfrin_messages.mes

@@ -29,6 +29,21 @@ this can only happen for AXFR.
 The XFR transfer for the given zone has failed due to a protocol error.
 The error is shown in the log message.
 
+% XFRIN_XFR_PROCESS_FAILURE %1 transfer of zone %2/%3 failed: %4
+An XFR session failed outside the main protocol handling.  This
+includes an error at the data source level at the initialization
+phase, unexpected failure in the network connection setup to the
+master server, or even more unexpected failure due to unlikely events
+such as memory allocation failure.  Details of the error are shown in
+the log message.  In general, these errors are not really expected
+ones, and indicate an installation error or a program bug.  The
+session handler thread tries to clean up all intermediate resources
+even on these errors, but it may be incomplete.  So, if this log
+message continuously appears, system resource consumption should be
+checked, and you may even want to disable the corresponding transfers.
+You may also want to file a bug report if this message appears
+frequently.
+
 % XFRIN_XFR_TRANSFER_STARTED %1 transfer of zone %2 started
 A connection to the master server has been made, the serial value in
 the SOA record has been checked, and a zone transfer has been started.