Browse Source

[trac554] Update function names and prepare for TCP looping

Updated function names to match convention.  Also added receiveComplete()
to prepare for the fact that a TCP receive may require multiple reads
before the complete message is read.
Stephen Morris 14 years ago
parent
commit
ad418dc785

+ 8 - 7
src/lib/asiolink/io_fetch.cc

@@ -86,6 +86,7 @@ IOFetch::IOFetchData::IOFetchData(IOService& io_service,
     msgbuf(new OutputBuffer(512)),         // TODO: Why this number?
     msgbuf(new OutputBuffer(512)),         // TODO: Why this number?
     data(new char[IOFetch::MAX_LENGTH]),
     data(new char[IOFetch::MAX_LENGTH]),
     callback(cb),
     callback(cb),
+    rcv_amount(0),
     stopped(false),
     stopped(false),
     timer(io_service.get_io_service()),
     timer(io_service.get_io_service()),
     timeout(wait)
     timeout(wait)
@@ -127,6 +128,10 @@ IOFetch::operator()(error_code ec, size_t length) {
             msg.addQuestion(data_->question);
             msg.addQuestion(data_->question);
             MessageRenderer renderer(*data_->msgbuf);
             MessageRenderer renderer(*data_->msgbuf);
             msg.toWire(renderer);
             msg.toWire(renderer);
+
+            // As this is a new fetch, clear the amount of data received
+            data_->rcv_amount = 0;
+
             dlog("Sending " + msg.toText() + " to " +
             dlog("Sending " + msg.toText() + " to " +
                 data_->remote->getAddress().toText());
                 data_->remote->getAddress().toText());
         }
         }
@@ -142,9 +147,9 @@ IOFetch::operator()(error_code ec, size_t length) {
         }
         }
 
 
         // Open a connection to the target system.  For speed, if the operation
         // Open a connection to the target system.  For speed, if the operation
-        // was a no-op (i.e. UDP operation) we bypass the yield.
-        bool do_yield = data_->socket->open(data->remote.get(), *this);
-        if (do_yield) {
+        // was completed synchronously (i.e. UDP operation) we bypass the yield.
+        bool asynch = data_->socket->open(data->remote.get(), *this);
+        if (asynch) {
             CORO_YIELD;
             CORO_YIELD;
         }
         }
 
 
@@ -153,10 +158,6 @@ IOFetch::operator()(error_code ec, size_t length) {
         CORO_YIELD data_->socket->async_send(data_->msgbuf->getData(),
         CORO_YIELD data_->socket->async_send(data_->msgbuf->getData(),
             data_->msgbuf->getLength(), data_->remote.get(), *this);
             data_->msgbuf->getLength(), data_->remote.get(), *this);
 
 
-        /// Allocate space for the response.  (XXX: This should be
-        /// optimized by maintaining a free list of pre-allocated blocks)
-        data_->data.reset(new char[MAX_LENGTH]);
-
         /// Begin an asynchronous receive, and yield.  When the receive
         /// Begin an asynchronous receive, and yield.  When the receive
         /// completes, we will resume immediately after this point.
         /// completes, we will resume immediately after this point.
         CORO_YIELD data_->socket->async_receive(data_->data.get(),
         CORO_YIELD data_->socket->async_receive(data_->data.get(),

+ 2 - 3
src/lib/asiolink/io_fetch.h

@@ -44,9 +44,7 @@ namespace asiolink {
 /// \brief Upstream Fetch Processing
 /// \brief Upstream Fetch Processing
 ///
 ///
 /// IOFetch is the class used to send upstream fetches and to handle responses.
 /// IOFetch is the class used to send upstream fetches and to handle responses.
-/// It is a base class containing most of the logic, although the ASIO will
-/// actually instantiate one of the derived classes TCPFetch or UDPFetch.
-/// (These differ in the type of socket and endpoint.)
+/// It is more or less transport-agnostic, although the
 class IOFetch : public IOCompletionCallback  {
 class IOFetch : public IOCompletionCallback  {
 public:
 public:
 
 
@@ -114,6 +112,7 @@ public:
         isc::dns::OutputBufferPtr   msgbuf;     ///< ... and here
         isc::dns::OutputBufferPtr   msgbuf;     ///< ... and here
         boost::shared_array<char>   data;       ///< Temporary array for the data
         boost::shared_array<char>   data;       ///< Temporary array for the data
         Callback*                   callback;   ///< Called on I/O Completion
         Callback*                   callback;   ///< Called on I/O Completion
+        size_t                      rcv_amount; ///< Received amount
         bool                        stopped;    ///< Have we stopped running?
         bool                        stopped;    ///< Have we stopped running?
         asio::deadline_timer        timer;      ///< Timer to measure timeouts
         asio::deadline_timer        timer;      ///< Timer to measure timeouts
         int                         timeout;    ///< Timeout in ms
         int                         timeout;    ///< Timeout in ms

+ 14 - 2
src/lib/asiolink/io_socket.cc

@@ -74,7 +74,7 @@ public:
     /// \param length Unused
     /// \param length Unused
     /// \param endpoint Unused
     /// \param endpoint Unused
     /// \param callback Unused
     /// \param callback Unused
-    virtual void async_send(const void*, size_t, const IOEndpoint*,
+    virtual void asyncSend(const void*, size_t, const IOEndpoint*,
         IOCompletionCallback&) {
         IOCompletionCallback&) {
     }
     }
 
 
@@ -84,12 +84,24 @@ public:
     ///
     ///
     /// \param data Unused
     /// \param data Unused
     /// \param length Unused
     /// \param length Unused
+    /// \param cumulative Unused
     /// \param endpoint Unused
     /// \param endpoint Unused
     /// \param callback Unused
     /// \param callback Unused
-    virtual void async_receive(void* data, size_t, IOEndpoint*,
+    virtual void asyncReceive(void* data, size_t, size_t, IOEndpoint*,
         IOCompletionCallback&) {
         IOCompletionCallback&) {
     }
     }
 
 
+    /// \brief Checks if the data received is complete.
+    ///
+    /// \param data Unused
+    /// \param length Unused
+    /// \param cumulative Unused
+    ///
+    /// \return Always true
+    virtual bool receiveComplete(void*, size_t, size_t&) {
+        return (true);
+    }
+
     /// \brief Cancel I/O On Socket
     /// \brief Cancel I/O On Socket
     ///
     ///
     /// Must be supplied as it is abstract in the base class.
     /// Must be supplied as it is abstract in the base class.

+ 25 - 3
src/lib/asiolink/io_socket.h

@@ -133,7 +133,7 @@ public:
     /// \param length Length of data to send
     /// \param length Length of data to send
     /// \param endpoint Target of the send
     /// \param endpoint Target of the send
     /// \param callback Callback object.
     /// \param callback Callback object.
-    virtual void async_send(const void* data, size_t length,
+    virtual void asyncSend(const void* data, size_t length,
         const IOEndpoint* endpoint, IOCompletionCallback& callback) = 0;
         const IOEndpoint* endpoint, IOCompletionCallback& callback) = 0;
 
 
     /// \brief Receive Asynchronously
     /// \brief Receive Asynchronously
@@ -145,10 +145,32 @@ public:
     ///
     ///
     /// \param data Buffer to receive incoming message
     /// \param data Buffer to receive incoming message
     /// \param length Length of the data buffer
     /// \param length Length of the data buffer
+    /// \param cumulative Amount of data that should already be in the buffer.
     /// \param endpoint Source of the communication
     /// \param endpoint Source of the communication
     /// \param callback Callback object
     /// \param callback Callback object
-    virtual void async_receive(void* data, size_t length, IOEndpoint* endpoint,
-        IOCompletionCallback& callback) = 0;
+    virtual void asyncReceive(void* data, size_t length, size_t cumulative,
+        IOEndpoint* endpoint, IOCompletionCallback& callback) = 0;
+
+    /// \brief Checks if the data received is complete.
+    ///
+    /// This applies to TCP receives, where the data is a byte stream and a
+    /// receive is not guaranteed to receive the entire message.  DNS messages
+    /// over TCP are prefixed by a two-byte count field.  This method takes the
+    /// amount received so far and the amount received in this I/O and checks
+    /// if the message is complete, returning the appropriate indication.  As
+    /// a side-effect, it also updates the amount received.
+    ///
+    /// For a UDP receive, all the data is received in one I/O, so this is
+    /// effectively a no-op (although it does update the amount received).
+    ///
+    /// \param data Data buffer containing data to date
+    /// \param length Amount of data received in last asynchronous I/O
+    /// \param cumulative On input, amount of data received before the last
+    /// I/O.  On output, the total amount of data received to date.
+    ///
+    /// \return true if the receive is complete, false if another receive is
+    /// needed.
+    virtual bool receiveComplete(void* data, size_t length, size_t& cumulative) = 0;
 
 
     /// \brief Cancel I/O On Socket
     /// \brief Cancel I/O On Socket
     virtual void cancel() = 0;
     virtual void cancel() = 0;

+ 20 - 2
src/lib/asiolink/tcp_socket.h

@@ -75,7 +75,7 @@ public:
     /// \param length Length of data to send
     /// \param length Length of data to send
     /// \param endpoint Target of the send
     /// \param endpoint Target of the send
     /// \param callback Callback object.
     /// \param callback Callback object.
-    virtual void async_send(const void*, size_t,
+    virtual void asyncSend(const void*, size_t,
         const IOEndpoint*, IOCompletionCallback&) {
         const IOEndpoint*, IOCompletionCallback&) {
     }
     }
 
 
@@ -88,12 +88,30 @@ public:
     ///
     ///
     /// \param data Buffer to receive incoming message
     /// \param data Buffer to receive incoming message
     /// \param length Length of the data buffer
     /// \param length Length of the data buffer
+    /// \param cumulative Amount of data that should already be in the buffer.
     /// \param endpoint Source of the communication
     /// \param endpoint Source of the communication
     /// \param callback Callback object
     /// \param callback Callback object
-    virtual void async_receive(void* data, size_t, IOEndpoint*,
+    virtual void asyncReceive(void* data, size_t, size_t, IOEndpoint*,
         IOCompletionCallback&) {
         IOCompletionCallback&) {
     }
     }
 
 
+    /// \brief Checks if the data received is complete.
+    ///
+    /// Checks that the total data received is the amount expected by the
+    /// two-byte header to the message.
+    ///
+    /// \param data Data buffer containing data to date
+    /// \param length Amount of data received in last asynchronous I/O
+    /// \param cumulative On input, amount of data received before the last
+    /// I/O.  On output, the total amount of data received to date.
+    ///
+    /// \return true if the receive is complete, false if another receive is
+    /// needed.
+    virtual bool receiveComplete(void*, size_t length, size_t& cumulative) {
+        cumulative = length;
+        return (true);
+    }
+
     /// \brief Cancel I/O On Socket
     /// \brief Cancel I/O On Socket
     virtual void cancel() {
     virtual void cancel() {
     }
     }

+ 12 - 5
src/lib/asiolink/tests/udp_socket_unittest.cc

@@ -64,10 +64,10 @@ using namespace std;
 
 
 namespace {
 namespace {
 
 
-const char* SERVER_ADDRESS = "127.0.0.1";
+const char SERVER_ADDRESS[] = "127.0.0.1";
 const unsigned short SERVER_PORT = 5301;
 const unsigned short SERVER_PORT = 5301;
 
 
-// FIXME Shouldn't we send something that is real message?
+// TODO: Shouldn't we send something that is real message?
 const char OUTBOUND_DATA[] = "Data sent from client to server";
 const char OUTBOUND_DATA[] = "Data sent from client to server";
 const char INBOUND_DATA[] = "Returned data from server to client";
 const char INBOUND_DATA[] = "Returned data from server to client";
 }
 }
@@ -179,6 +179,7 @@ private:
     boost::shared_ptr<PrivateData>  ptr_;   ///< Pointer to private data
     boost::shared_ptr<PrivateData>  ptr_;   ///< Pointer to private data
 };
 };
 
 
+// TODO: Need to add a test to check the cancel() method
 
 
 // Tests the operation of a UDPSocket by opening it, sending an asynchronous
 // Tests the operation of a UDPSocket by opening it, sending an asynchronous
 // message to a server, receiving an asynchronous message from the server and
 // message to a server, receiving an asynchronous message from the server and
@@ -193,6 +194,7 @@ TEST(UDPSocket, SequenceTest) {
     // The client - the UDPSocket being tested
     // The client - the UDPSocket being tested
     UDPSocket   client(service);            // Socket under test
     UDPSocket   client(service);            // Socket under test
     UDPCallback client_cb("Client");        // Async I/O callback function
     UDPCallback client_cb("Client");        // Async I/O callback function
+    size_t      client_cumulative = 0;      // Cumulative data received
 
 
     // The server - with which the client communicates.  For convenience, we
     // The server - with which the client communicates.  For convenience, we
     // use the same io_service, and use the endpoint object created for
     // use the same io_service, and use the endpoint object created for
@@ -220,7 +222,7 @@ TEST(UDPSocket, SequenceTest) {
     // be called until we call the io_service.run() method.
     // be called until we call the io_service.run() method.
     client_cb.setCalled(false);
     client_cb.setCalled(false);
     client_cb.setCode(7);  // Arbitrary number
     client_cb.setCode(7);  // Arbitrary number
-    client.async_send(OUTBOUND_DATA, sizeof(OUTBOUND_DATA), &endpoint, client_cb);
+    client.asyncSend(OUTBOUND_DATA, sizeof(OUTBOUND_DATA), &endpoint, client_cb);
     EXPECT_FALSE(client_cb.getCalled());
     EXPECT_FALSE(client_cb.getCalled());
 
 
     // Execute the two callbacks.
     // Execute the two callbacks.
@@ -243,7 +245,8 @@ TEST(UDPSocket, SequenceTest) {
     client_cb.setCalled(false);
     client_cb.setCalled(false);
     client_cb.setCode(32);                  // Arbitrary number
     client_cb.setCode(32);                  // Arbitrary number
     UDPEndpoint client_remote_endpoint;     // To receive address of remote system
     UDPEndpoint client_remote_endpoint;     // To receive address of remote system
-    client.async_receive(data, sizeof(data), &client_remote_endpoint, client_cb);
+    client.asyncReceive(data, sizeof(data), client_cumulative,
+        &client_remote_endpoint, client_cb);
 
 
     // Issue the write on the server side to the source of the data it received.
     // Issue the write on the server side to the source of the data it received.
     server_cb.setLength(22345);             // Arbitrary number
     server_cb.setLength(22345);             // Arbitrary number
@@ -252,7 +255,6 @@ TEST(UDPSocket, SequenceTest) {
     server.async_send_to(buffer(INBOUND_DATA, sizeof(INBOUND_DATA)),
     server.async_send_to(buffer(INBOUND_DATA, sizeof(INBOUND_DATA)),
         server_remote_endpoint.getASIOEndpoint(), server_cb);
         server_remote_endpoint.getASIOEndpoint(), server_cb);
 
 
-
     // Expect two callbacks to run
     // Expect two callbacks to run
     service.run_one();
     service.run_one();
     service.run_one();
     service.run_one();
@@ -272,6 +274,11 @@ TEST(UDPSocket, SequenceTest) {
     EXPECT_TRUE(server_address == client_remote_endpoint.getAddress());
     EXPECT_TRUE(server_address == client_remote_endpoint.getAddress());
     EXPECT_EQ(SERVER_PORT, client_remote_endpoint.getPort());
     EXPECT_EQ(SERVER_PORT, client_remote_endpoint.getPort());
 
 
+    // Finally, check that the receive received a complete buffer's worth of data.
+    EXPECT_TRUE(client.receiveComplete(&data[0], client_cb.getLength(),
+        client_cumulative));
+    EXPECT_EQ(client_cb.getLength(), client_cumulative);
+
     // Close client and server.
     // Close client and server.
     EXPECT_NO_THROW(client.close());
     EXPECT_NO_THROW(client.close());
     EXPECT_NO_THROW(server.close());
     EXPECT_NO_THROW(server.close());

+ 5 - 3
src/lib/asiolink/udp_socket.cc

@@ -84,7 +84,7 @@ UDPSocket::open(const IOEndpoint* endpoint, IOCompletionCallback&) {
 // Send a message.
 // Send a message.
 
 
 void
 void
-UDPSocket::async_send(const void* data, size_t length,
+UDPSocket::asyncSend(const void* data, size_t length,
     const IOEndpoint* endpoint, IOCompletionCallback& callback)
     const IOEndpoint* endpoint, IOCompletionCallback& callback)
 {
 {
     // Upconverting.  Not nice, but we have the problem that in the abstract
     // Upconverting.  Not nice, but we have the problem that in the abstract
@@ -99,10 +99,12 @@ UDPSocket::async_send(const void* data, size_t length,
         callback);
         callback);
 }
 }
 
 
-// UDPSocket::receive_from
+// Receive a message. Note that the "cumulative" argument is ignored - every UDP
+// receive is put into the buffer beginning at the start - there is no concept
+// receiving a subsequent part of a message.
 
 
 void
 void
-UDPSocket::async_receive(void* data, size_t length, IOEndpoint* endpoint,
+UDPSocket::asyncReceive(void* data, size_t length, size_t, IOEndpoint* endpoint,
     IOCompletionCallback& callback)
     IOCompletionCallback& callback)
 {
 {
     // Upconvert the endpoint again.
     // Upconvert the endpoint again.

+ 23 - 3
src/lib/asiolink/udp_socket.h

@@ -82,7 +82,7 @@ public:
     /// \param length Length of data to send
     /// \param length Length of data to send
     /// \param endpoint Target of the send
     /// \param endpoint Target of the send
     /// \param callback Callback object.
     /// \param callback Callback object.
-    virtual void async_send(const void* data, size_t length,
+    virtual void asyncSend(const void* data, size_t length,
         const IOEndpoint* endpoint, IOCompletionCallback& callback);
         const IOEndpoint* endpoint, IOCompletionCallback& callback);
 
 
     /// \brief Receive Asynchronously
     /// \brief Receive Asynchronously
@@ -94,10 +94,30 @@ public:
     ///
     ///
     /// \param data Buffer to receive incoming message
     /// \param data Buffer to receive incoming message
     /// \param length Length of the data buffer
     /// \param length Length of the data buffer
+    /// \param cumulative Amount of data that should already be in the buffer.
+    /// (This is ignored - every UPD receive fills the buffer from the start.)
     /// \param endpoint Source of the communication
     /// \param endpoint Source of the communication
     /// \param callback Callback object
     /// \param callback Callback object
-    virtual void async_receive(void* data, size_t length, IOEndpoint* endpoint,
-        IOCompletionCallback& callback);
+    virtual void asyncReceive(void* data, size_t length, size_t cumulative,
+        IOEndpoint* endpoint, IOCompletionCallback& callback);
+
+    /// \brief Checks if the data received is complete.
+    ///
+    /// As all the data is received in one I/O, so this is, this is effectively
+    /// a no-op (although it does update the amount of data received).
+    ///
+    /// \param data Data buffer containing data to date.  (This is ignored
+    /// for UDP receives.)
+    /// \param length Amount of data received in last asynchronous I/O
+    /// \param cumulative On input, amount of data received before the last
+    /// I/O.  On output, the total amount of data received to date.
+    ///
+    /// \return true if the receive is complete, false if another receive is
+    /// needed.
+    virtual bool receiveComplete(void*, size_t length, size_t& cumulative) {
+        cumulative = length;
+        return (true);
+    }
 
 
     /// \brief Cancel I/O On Socket
     /// \brief Cancel I/O On Socket
     virtual void cancel();
     virtual void cancel();