|
@@ -28,6 +28,7 @@
|
|
#include <unistd.h> // for some IPC/network system calls
|
|
#include <unistd.h> // for some IPC/network system calls
|
|
#include <asio.hpp>
|
|
#include <asio.hpp>
|
|
#include <asio/error_code.hpp>
|
|
#include <asio/error_code.hpp>
|
|
|
|
+#include <asio/deadline_timer.hpp>
|
|
#include <asio/system_error.hpp>
|
|
#include <asio/system_error.hpp>
|
|
|
|
|
|
#include <cstdio>
|
|
#include <cstdio>
|
|
@@ -38,7 +39,9 @@
|
|
#include <sys/un.h>
|
|
#include <sys/un.h>
|
|
|
|
|
|
#include <boost/bind.hpp>
|
|
#include <boost/bind.hpp>
|
|
|
|
+#include <boost/optional.hpp>
|
|
#include <boost/function.hpp>
|
|
#include <boost/function.hpp>
|
|
|
|
+#include <boost/date_time/posix_time/posix_time_types.hpp>
|
|
|
|
|
|
#include <exceptions/exceptions.h>
|
|
#include <exceptions/exceptions.h>
|
|
|
|
|
|
@@ -53,20 +56,39 @@ using namespace isc::data;
|
|
// (e.g. write(2)) so we don't import the entire asio namespace.
|
|
// (e.g. write(2)) so we don't import the entire asio namespace.
|
|
using asio::io_service;
|
|
using asio::io_service;
|
|
|
|
|
|
|
|
+namespace {
|
|
|
|
+/// \brief Sets the given Optional 'result' to the given error code
|
|
|
|
+/// Used as a callback for emulating sync reads with async calls
|
|
|
|
+/// \param result Pointer to the optional to set
|
|
|
|
+/// \param err The error code to set it to
|
|
|
|
+void
|
|
|
|
+setResult(boost::optional<asio::error_code>* result,
|
|
|
|
+ const asio::error_code& err)
|
|
|
|
+{
|
|
|
|
+ result->reset(err);
|
|
|
|
+}
|
|
|
|
+}
|
|
|
|
+
|
|
namespace isc {
|
|
namespace isc {
|
|
namespace cc {
|
|
namespace cc {
|
|
|
|
+
|
|
class SessionImpl {
|
|
class SessionImpl {
|
|
public:
|
|
public:
|
|
SessionImpl(io_service& io_service) :
|
|
SessionImpl(io_service& io_service) :
|
|
sequence_(-1), queue_(Element::createList()),
|
|
sequence_(-1), queue_(Element::createList()),
|
|
- io_service_(io_service), socket_(io_service_), data_length_(0)
|
|
|
|
|
|
+ io_service_(io_service), socket_(io_service_), data_length_(0),
|
|
|
|
+ timeout_(MSGQ_DEFAULT_TIMEOUT)
|
|
{}
|
|
{}
|
|
void establish(const char& socket_file);
|
|
void establish(const char& socket_file);
|
|
void disconnect();
|
|
void disconnect();
|
|
void writeData(const void* data, size_t datalen);
|
|
void writeData(const void* data, size_t datalen);
|
|
size_t readDataLength();
|
|
size_t readDataLength();
|
|
|
|
+ // Blocking read. Will throw a SessionTimeout if the timeout value
|
|
|
|
+ // (in seconds) is thrown. If timeout is 0 it will block forever
|
|
void readData(void* data, size_t datalen);
|
|
void readData(void* data, size_t datalen);
|
|
void startRead(boost::function<void()> user_handler);
|
|
void startRead(boost::function<void()> user_handler);
|
|
|
|
+ void setTimeout(size_t seconds) { timeout_ = seconds; };
|
|
|
|
+ size_t getTimeout() const { return timeout_; };
|
|
|
|
|
|
long int sequence_; // the next sequence number to use
|
|
long int sequence_; // the next sequence number to use
|
|
std::string lname_;
|
|
std::string lname_;
|
|
@@ -82,6 +104,17 @@ private:
|
|
uint32_t data_length_;
|
|
uint32_t data_length_;
|
|
boost::function<void()> user_handler_;
|
|
boost::function<void()> user_handler_;
|
|
asio::error_code error_;
|
|
asio::error_code error_;
|
|
|
|
+ size_t timeout_;
|
|
|
|
+
|
|
|
|
+ // By default, unless changed or disabled, blocking reads on
|
|
|
|
+ // the msgq channel will time out after 4 seconds in this
|
|
|
|
+ // implementation.
|
|
|
|
+ // This number is chosen to be low enough so that whatever
|
|
|
|
+ // component is blocking does not seem to be hanging, but
|
|
|
|
+ // still gives enough time for other modules to respond if they
|
|
|
|
+ // are busy. If this choice turns out to be a bad one, we can
|
|
|
|
+ // change it later.
|
|
|
|
+ static const size_t MSGQ_DEFAULT_TIMEOUT = 4000;
|
|
};
|
|
};
|
|
|
|
|
|
void
|
|
void
|
|
@@ -131,8 +164,51 @@ SessionImpl::readDataLength() {
|
|
|
|
|
|
void
|
|
void
|
|
SessionImpl::readData(void* data, size_t datalen) {
|
|
SessionImpl::readData(void* data, size_t datalen) {
|
|
|
|
+ boost::optional<asio::error_code> read_result;
|
|
|
|
+ boost::optional<asio::error_code> timer_result;
|
|
|
|
+
|
|
try {
|
|
try {
|
|
- asio::read(socket_, asio::buffer(data, datalen));
|
|
|
|
|
|
+ asio::async_read(socket_, asio::buffer(data, datalen),
|
|
|
|
+ boost::bind(&setResult, &read_result, _1));
|
|
|
|
+ asio::deadline_timer timer(socket_.io_service());
|
|
|
|
+
|
|
|
|
+ if (getTimeout() != 0) {
|
|
|
|
+ timer.expires_from_now(boost::posix_time::milliseconds(getTimeout()));
|
|
|
|
+ timer.async_wait(boost::bind(&setResult, &timer_result, _1));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // wait until either we have read the data we want, the
|
|
|
|
+ // timer expires, or one of the two is triggered with an error.
|
|
|
|
+ // When one of them has a result, cancel the other, and wait
|
|
|
|
+ // until the cancel is processed before we continue
|
|
|
|
+ while (!read_result && !timer_result) {
|
|
|
|
+ socket_.io_service().run_one();
|
|
|
|
+
|
|
|
|
+ // Don't cancel the timer if we haven't set it
|
|
|
|
+ if (read_result && getTimeout() != 0) {
|
|
|
|
+ timer.cancel();
|
|
|
|
+ while (!timer_result) {
|
|
|
|
+ socket_.io_service().run_one();
|
|
|
|
+ }
|
|
|
|
+ } else if (timer_result) {
|
|
|
|
+ socket_.cancel();
|
|
|
|
+ while (!read_result) {
|
|
|
|
+ socket_.io_service().run_one();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // asio::error_code evaluates to false if there was no error
|
|
|
|
+ if (*read_result) {
|
|
|
|
+ if (*read_result == asio::error::operation_aborted) {
|
|
|
|
+ isc_throw(SessionTimeout,
|
|
|
|
+ "Timeout while reading data from cc session");
|
|
|
|
+ } else {
|
|
|
|
+ isc_throw(SessionError,
|
|
|
|
+ "Error while reading data from cc session: " <<
|
|
|
|
+ read_result->message());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
} catch (const asio::system_error& asio_ex) {
|
|
} catch (const asio::system_error& asio_ex) {
|
|
// to hide ASIO specific exceptions, we catch them explicitly
|
|
// to hide ASIO specific exceptions, we catch them explicitly
|
|
// and convert it to SessionError.
|
|
// and convert it to SessionError.
|
|
@@ -144,11 +220,11 @@ void
|
|
SessionImpl::startRead(boost::function<void()> user_handler) {
|
|
SessionImpl::startRead(boost::function<void()> user_handler) {
|
|
data_length_ = 0;
|
|
data_length_ = 0;
|
|
user_handler_ = user_handler;
|
|
user_handler_ = user_handler;
|
|
- async_read(socket_, asio::buffer(&data_length_,
|
|
|
|
- sizeof(data_length_)),
|
|
|
|
- boost::bind(&SessionImpl::internalRead, this,
|
|
|
|
- asio::placeholders::error,
|
|
|
|
- asio::placeholders::bytes_transferred));
|
|
|
|
|
|
+ asio::async_read(socket_, asio::buffer(&data_length_,
|
|
|
|
+ sizeof(data_length_)),
|
|
|
|
+ boost::bind(&SessionImpl::internalRead, this,
|
|
|
|
+ asio::placeholders::error,
|
|
|
|
+ asio::placeholders::bytes_transferred));
|
|
}
|
|
}
|
|
|
|
|
|
void
|
|
void
|
|
@@ -410,5 +486,14 @@ Session::hasQueuedMsgs() {
|
|
return (impl_->queue_->size() > 0);
|
|
return (impl_->queue_->size() > 0);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+void
|
|
|
|
+Session::setTimeout(size_t milliseconds) {
|
|
|
|
+ impl_->setTimeout(milliseconds);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+size_t
|
|
|
|
+Session::getTimeout() const {
|
|
|
|
+ return (impl_->getTimeout());
|
|
|
|
+}
|
|
}
|
|
}
|
|
}
|
|
}
|