Parcourir la source

[5189] Eliminated race conditions and mem leaks in CA unit tests.

Marcin Siodelski il y a 8 ans
Parent
commit
060f324fad

+ 1 - 1
src/bin/agent/ca_command_mgr.cc

@@ -207,7 +207,7 @@ CtrlAgentCommandMgr::forwardCommand(const std::string& service,
                    received_ec = ec;
                    received_feed = feed;
                    // Stop the IO service so as we can continue.
-                   io_service->stop();
+                   io_service->stopWork();
                }, ClientConnection::Timeout(CONNECTION_TIMEOUT));
     io_service->run();
 

+ 21 - 7
src/bin/agent/tests/ca_command_mgr_unittests.cc

@@ -171,12 +171,14 @@ public:
     ///
     /// @param response Stub response to be sent from the server socket to the
     /// client.
-    void bindServerSocket(const std::string& response) {
+    /// @param use_thread Indicates if the IO service will be ran in thread.
+    void bindServerSocket(const std::string& response,
+                          const bool use_thread = false) {
         server_socket_.reset(new test::TestServerUnixSocket(*getIOService(),
                                                             unixSocketFilePath(),
                                                             response));
         server_socket_->startTimer(TEST_TIMEOUT);
-        server_socket_->bindServerSocket();
+        server_socket_->bindServerSocket(use_thread);
     }
 
     /// @brief Creates command with no arguments.
@@ -224,22 +226,30 @@ public:
         // Configure client side socket.
         configureControlSocket(server_type);
         // Create server side socket.
-        bindServerSocket(server_response);
+        bindServerSocket(server_response, true);
 
         // The client side communication is synchronous. To be able to respond
         // to this we need to run the server side socket at the same time.
         // Running IO service in a thread guarantees that the server responds
         // as soon as it receives the control command.
-        isc::util::thread::Thread th(boost::bind(&IOService::run, getIOService().get()));
+        isc::util::thread::Thread th(boost::bind(&IOService::run,
+                                                 getIOService().get()));
+
+
+        // Wait for the IO service in thread to actually run.
+        server_socket_->waitForRunning();
 
         ConstElementPtr command = createCommand("foo", service);
         ConstElementPtr answer = mgr_.handleCommand("foo", ConstElementPtr(),
                                                     command);
 
-        getIOService()->stop();
+        server_socket_->stopServer();
+        getIOService()->stopWork();
 
         th.wait();
 
+        EXPECT_EQ(expected_responses, server_socket_->getResponseNum());
+
         checkAnswer(answer, expected_result0, expected_result1, expected_result2);
     }
 
@@ -342,7 +352,7 @@ TEST_F(CtrlAgentCommandMgrTest, forwardListCommands) {
     // Configure client side socket.
     configureControlSocket(CtrlAgentCfgContext::TYPE_DHCP4);
     // Create server side socket.
-    bindServerSocket("{ \"result\" : 3 }");
+    bindServerSocket("{ \"result\" : 3 }", true);
 
     // The client side communication is synchronous. To be able to respond
     // to this we need to run the server side socket at the same time.
@@ -350,11 +360,15 @@ TEST_F(CtrlAgentCommandMgrTest, forwardListCommands) {
     // as soon as it receives the control command.
     isc::util::thread::Thread th(boost::bind(&IOService::run, getIOService().get()));
 
+    // Wait for the IO service in thread to actually run.
+    server_socket_->waitForRunning();
+
     ConstElementPtr command = createCommand("list-commands", "dhcp4");
     ConstElementPtr answer = mgr_.handleCommand("list-commands", ConstElementPtr(),
                                                 command);
 
-    getIOService()->stop();
+    server_socket_->stopServer();
+    getIOService()->stopWork();
 
     th.wait();
 

+ 15 - 3
src/lib/asiolink/io_service.cc

@@ -1,4 +1,4 @@
-// Copyright (C) 2011-2016 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2011-2017 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -10,6 +10,7 @@
 
 #include <unistd.h>             // for some IPC/network system calls
 #include <netinet/in.h>
+#include <boost/shared_ptr.hpp>
 #include <sys/socket.h>
 
 namespace isc {
@@ -40,7 +41,7 @@ public:
     /// \brief The constructor
     IOServiceImpl() :
         io_service_(),
-        work_(io_service_)
+        work_(new boost::asio::io_service::work(io_service_))
     {};
     /// \brief The destructor.
     ~IOServiceImpl() {};
@@ -76,6 +77,12 @@ public:
     /// This will return the control to the caller of the \c run() method.
     void stop() { io_service_.stop();} ;
 
+    /// \brief Removes IO service work object to let it finish running
+    /// when all handlers have been invoked.
+    void stopWork() {
+        work_.reset();
+    }
+
     /// \brief Return the native \c io_service object used in this wrapper.
     ///
     /// This is a short term work around to support other Kea modules
@@ -89,7 +96,7 @@ public:
     }
 private:
     boost::asio::io_service io_service_;
-    boost::asio::io_service::work work_;
+    boost::shared_ptr<boost::asio::io_service::work> work_;
 };
 
 IOService::IOService() {
@@ -120,6 +127,11 @@ IOService::stop() {
     io_impl_->stop();
 }
 
+void
+IOService::stopWork() {
+    io_impl_->stopWork();
+}
+
 boost::asio::io_service&
 IOService::get_io_service() {
     return (io_impl_->get_io_service());

+ 5 - 1
src/lib/asiolink/io_service.h

@@ -1,4 +1,4 @@
-// Copyright (C) 2011-2015 Internet Systems Consortium, Inc. ("ISC")
+// Copyright (C) 2011-2017 Internet Systems Consortium, Inc. ("ISC")
 //
 // This Source Code Form is subject to the terms of the Mozilla Public
 // License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -64,6 +64,10 @@ public:
     /// This will return the control to the caller of the \c run() method.
     void stop();
 
+    /// \brief Removes IO service work object to let it finish running
+    /// when all handlers have been invoked.
+    void stopWork();
+
     /// \brief Return the native \c io_service object used in this wrapper.
     ///
     /// This is a short term work around to support other Kea modules

+ 62 - 3
src/lib/asiolink/testutils/test_server_unix_socket.cc

@@ -50,6 +50,7 @@ public:
           sent_response_callback_(sent_response_callback) {
     }
 
+    /// @brief Starts asynchronous read from the socket.
     void start() {
        socket_->async_read_some(boost::asio::buffer(&raw_buf_[0], raw_buf_.size()),
            boost::bind(&Connection::readHandler, shared_from_this(),
@@ -57,6 +58,11 @@ public:
                        boost::asio::placeholders::bytes_transferred));
     }
 
+    /// @brief Closes the socket.
+    void stop() {
+        socket_->close();
+    }
+
     /// @brief Handler invoked when data have been received over the socket.
     ///
     /// This is the handler invoked when the data have been received over the
@@ -123,12 +129,13 @@ public:
     ///
     /// @param io_service Reference to the IO service.
     ConnectionPool(IOService& io_service)
-        : io_service_(io_service), next_socket_(),
+        : io_service_(io_service), connections_(), next_socket_(),
           response_num_(0) {
     }
 
     /// @brief Destructor.
     ~ConnectionPool() {
+        stopAll();
     }
 
     /// @brief Creates new unix domain socket and returns it.
@@ -158,9 +165,27 @@ public:
         }));
         conn->start();
 
+        connections_.insert(conn);
         next_socket_.reset();
     }
 
+    /// @brief Stops the given connection.
+    ///
+    /// @param conn Pointer to the connection to be stopped.
+    void stop(const ConnectionPtr& conn) {
+        conn->stop();
+        connections_.erase(conn);
+    }
+
+    /// @brief Stops all connections.
+    void stopAll() {
+        for (auto conn = connections_.begin(); conn != connections_.end();
+             ++conn) {
+            (*conn)->stop();
+        }
+        connections_.clear();
+    }
+
     /// @brief Returns number of responses sent so far.
     size_t getResponseNum() const {
         return (response_num_);
@@ -171,6 +196,9 @@ private:
     /// @brief Reference to the IO service.
     IOService& io_service_;
 
+    /// @brief Container holding established connections.
+    std::set<ConnectionPtr> connections_;
+
     /// @brief Holds pointer to the generated socket.
     ///
     /// This socket will be used by the next connection.
@@ -190,10 +218,12 @@ TestServerUnixSocket::TestServerUnixSocket(IOService& io_service,
       test_timer_(io_service_),
       custom_response_(custom_response),
       connection_pool_(new ConnectionPool(io_service)),
-      stopped_(false) {
+      stopped_(false),
+      running_(false) {
 }
 
 TestServerUnixSocket::~TestServerUnixSocket() {
+    server_acceptor_.close();
 }
 
 void
@@ -214,11 +244,23 @@ TestServerUnixSocket::startTimer(const long test_timeout) {
 }
 
 void
-TestServerUnixSocket::bindServerSocket() {
+TestServerUnixSocket::stopServer() {
+    test_timer_.cancel();
+    server_acceptor_.cancel();
+    connection_pool_->stopAll();
+}
+
+void
+TestServerUnixSocket::bindServerSocket(const bool use_thread) {
     server_acceptor_.open();
     server_acceptor_.bind(server_endpoint_);
     server_acceptor_.listen();
     accept();
+
+    if (use_thread) {
+        io_service_.post(boost::bind(&TestServerUnixSocket::signalRunning,
+                                     this));
+    }
 }
 
 void
@@ -238,6 +280,23 @@ TestServerUnixSocket::accept() {
 }
 
 void
+TestServerUnixSocket::signalRunning() {
+    {
+        isc::util::thread::Mutex::Locker lock(mutex_);
+        running_ = true;
+    }
+    condvar_.signal();
+}
+
+void
+TestServerUnixSocket::waitForRunning() {
+    isc::util::thread::Mutex::Locker lock(mutex_);
+    while (!running_) {
+        condvar_.wait(mutex_);
+    }
+}
+
+void
 TestServerUnixSocket::timeoutHandler() {
     ADD_FAILURE() << "Timeout occurred while running the test!";
     io_service_.stop();

+ 36 - 4
src/lib/asiolink/testutils/test_server_unix_socket.h

@@ -10,6 +10,8 @@
 #include <config.h>
 #include <asiolink/interval_timer.h>
 #include <asiolink/io_service.h>
+#include <util/threads/thread.h>
+#include <util/threads/sync.h>
 #include <boost/shared_ptr.hpp>
 #include <gtest/gtest.h>
 #include <list>
@@ -65,13 +67,19 @@ public:
     /// @param test_timeout Test timeout in milliseconds.
     void startTimer(const long test_timeout);
 
+    /// @brief Cancels all asynchronous operations.
+    void stopServer();
+
     /// @brief Generates response of a given length.
     ///
     /// @param response_size Desired response size.
     void generateCustomResponse(const uint64_t response_size);
 
     /// @brief Creates and binds server socket.
-    void bindServerSocket();
+    ///
+    /// @param use_thread Boolean value indicating if the IO service
+    /// is running in thread.
+    void bindServerSocket(const bool use_thread = false);
 
     /// @brief Server acceptor handler.
     ///
@@ -86,17 +94,32 @@ public:
     /// @brief Return number of responses sent so far to the clients.
     size_t getResponseNum() const;
 
-    /// @brief Checks if IO service has been stopped as a result of the
-    /// timeout.
-    bool isStopped() const {
+    /// @brief Indicates if the server has been stopped.
+    bool isStopped() {
         return (stopped_);
     }
 
+    /// @brief Waits for the server signal that it is running.
+    ///
+    /// When the caller starts the service he indicates whether
+    /// IO service will be running in thread or not. If threads
+    /// are used the caller has to wait for the IO service to
+    /// actually run. In such case this function should be invoked
+    /// which waits for a posted callback to be executed. When this
+    /// happens it means that IO service is running and the main
+    /// thread can move forward.
+    void waitForRunning();
+
 private:
 
     /// @brief Asynchronously accept new connections.
     void accept();
 
+    /// @brief Handler invoked to signal that server is running.
+    ///
+    /// This is used only when thread is used to run IO service.
+    void signalRunning();
+
     /// @brief IO service used by the tests.
     IOService& io_service_;
 
@@ -117,6 +140,15 @@ private:
     /// @brief Indicates if IO service has been stopped as a result of
     /// a timeout.
     bool stopped_;
+
+    /// @brief Indicates if the server in a thread is running.
+    bool running_;
+
+    /// @brief Mutex used by the server.
+    isc::util::thread::Mutex mutex_;
+
+    /// @brief Conditional variable used by the server.
+    isc::util::thread::CondVar condvar_;
 };
 
 /// @brief Pointer to the @ref TestServerUnixSocket.

+ 2 - 1
src/lib/config/client_connection.cc

@@ -113,7 +113,7 @@ ClientConnectionImpl::start(const ClientConnection::SocketPath& socket_path,
                             const ClientConnection::Timeout& timeout) {
     // Start the timer protecting against timeouts.
     timer_.setup(boost::bind(&ClientConnectionImpl::timeoutCallback,
-                             shared_from_this(), handler),
+                             this, handler),
                  timeout.timeout_, IntervalTimer::ONE_SHOT);
 
     // Store the command in the class member to make sure it is valid
@@ -220,6 +220,7 @@ void
 ClientConnectionImpl::terminate(const boost::system::error_code& ec,
                                 ClientConnection::Handler handler) {
     try {
+        timer_.cancel();
         current_command_.clear();
         handler(ec, feed_);