Browse Source

[1924] Merge remote-tracking branch 'origin/trac1914' into trac1924

fixed Conflicts:
	src/lib/cc/tests/session_unittests.cc
JINMEI Tatuya 12 years ago
parent
commit
7f8ec08cfd

+ 37 - 17
src/bin/msgq/msgq.py.in

@@ -33,6 +33,7 @@ import threading
 import isc.config.ccsession
 import isc.config.ccsession
 from optparse import OptionParser, OptionValueError
 from optparse import OptionParser, OptionValueError
 import isc.util.process
 import isc.util.process
+from isc.util.common_defs import *
 import isc.log
 import isc.log
 from isc.log_messages.msgq_messages import *
 from isc.log_messages.msgq_messages import *
 
 
@@ -464,6 +465,15 @@ class MsgQ:
             sock.setblocking(1)
             sock.setblocking(1)
 
 
     def send_prepared_msg(self, sock, msg):
     def send_prepared_msg(self, sock, msg):
+        '''
+        Add a message to the queue. If there's nothing waiting, try
+        to send it right away.
+
+        Return if the socket is still alive. It can return false if the
+        socket dies (for example due to EPIPE in the attempt to send).
+        Returning true does not guarantee the message will be delivered,
+        but returning false means it won't.
+        '''
         # Try to send the data, but only if there's nothing waiting
         # Try to send the data, but only if there's nothing waiting
         fileno = sock.fileno()
         fileno = sock.fileno()
         if fileno in self.sendbuffs:
         if fileno in self.sendbuffs:
@@ -472,7 +482,7 @@ class MsgQ:
             amount_sent = self.__send_data(sock, msg)
             amount_sent = self.__send_data(sock, msg)
             if amount_sent is None:
             if amount_sent is None:
                 # Socket has been killed, drop the send
                 # Socket has been killed, drop the send
-                return
+                return False
 
 
         # Still something to send, add it to outgoing queue
         # Still something to send, add it to outgoing queue
         if amount_sent < len(msg):
         if amount_sent < len(msg):
@@ -482,7 +492,7 @@ class MsgQ:
                 (last_sent, buff) = self.sendbuffs[fileno]
                 (last_sent, buff) = self.sendbuffs[fileno]
                 if now - last_sent > 0.1:
                 if now - last_sent > 0.1:
                     self.kill_socket(fileno, sock)
                     self.kill_socket(fileno, sock)
-                    return
+                    return False
                 buff += msg
                 buff += msg
             else:
             else:
                 buff = msg[amount_sent:]
                 buff = msg[amount_sent:]
@@ -493,6 +503,7 @@ class MsgQ:
                 else:
                 else:
                     self.add_kqueue_socket(sock, True)
                     self.add_kqueue_socket(sock, True)
             self.sendbuffs[fileno] = (last_sent, buff)
             self.sendbuffs[fileno] = (last_sent, buff)
+        return True
 
 
     def __process_write(self, fileno):
     def __process_write(self, fileno):
         # Try to send some data from the buffer
         # Try to send some data from the buffer
@@ -527,14 +538,14 @@ class MsgQ:
         self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname })
         self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname })
 
 
     def process_command_send(self, sock, routing, data):
     def process_command_send(self, sock, routing, data):
-        group = routing["group"]
-        instance = routing["instance"]
-        to = routing["to"]
+        group = routing[CC_HEADER_GROUP]
+        instance = routing[CC_HEADER_INSTANCE]
+        to = routing[CC_HEADER_TO]
         if group == None or instance == None:
         if group == None or instance == None:
             # FIXME: Should we log them instead?
             # FIXME: Should we log them instead?
             return  # ignore invalid packets entirely
             return  # ignore invalid packets entirely
 
 
-        if to == "*":
+        if to == CC_TO_WILDCARD:
             sockets = self.subs.find(group, instance)
             sockets = self.subs.find(group, instance)
         else:
         else:
             if to in self.lnames:
             if to in self.lnames:
@@ -548,31 +559,40 @@ class MsgQ:
             # Don't bounce to self
             # Don't bounce to self
             sockets.remove(sock)
             sockets.remove(sock)
 
 
-        if sockets:
-            for socket in sockets:
-                self.send_prepared_msg(socket, msg)
-        elif routing.get("wants_reply") and "reply" not in routing:
+        has_recipient = False
+        for socket in sockets:
+            if self.send_prepared_msg(socket, msg):
+                has_recipient = True
+        if not has_recipient and routing.get(CC_HEADER_WANT_ANSWER) and \
+            CC_HEADER_REPLY not in routing:
             # We have no recipients. But the sender insists on a reply
             # We have no recipients. But the sender insists on a reply
             # (and the message isn't a reply itself). We need to send
             # (and the message isn't a reply itself). We need to send
-            # an error to satisfy the senders hurger for response, since
+            # an error to satisfy the senders hunger for response, since
             # nobody else will do that.
             # nobody else will do that.
+            #
+            # We omit the replies on purpose. The recipient might generate
+            # the response by copying and mangling the header of incoming
+            # message (just like we do below) and would include the want_answer
+            # by accident. And we want to avoid loops of errors. Also, it
+            # is unclear if the knowledge of undeliverable reply would be
+            # of any use to the sender, and it should be much rarer situation.
 
 
             # The real errors would be positive, 1 most probably. We use
             # The real errors would be positive, 1 most probably. We use
             # negative errors for delivery errors to distinguish them a
             # negative errors for delivery errors to distinguish them a
             # little. We probably should have a way to provide more data
             # little. We probably should have a way to provide more data
             # in the error message.
             # in the error message.
-            payload = isc.config.ccsession.create_answer(-1,
+            payload = isc.config.ccsession.create_answer(CC_REPLY_NO_RECPT,
                                                          "No such recipient")
                                                          "No such recipient")
             # We create the header based on the current one. But we don't
             # We create the header based on the current one. But we don't
             # want to mangle it for the caller, so we get a copy. A shallow
             # want to mangle it for the caller, so we get a copy. A shallow
             # one should be enough, we modify the dict only.
             # one should be enough, we modify the dict only.
             header = routing.copy()
             header = routing.copy()
-            header["reply"] = routing["seq"]
-            header["from"] = 'msgq' # Dummy lname not assigned to clients
-            header["to"] = routing["from"]
+            header[CC_HEADER_REPLY] = routing[CC_HEADER_SEQ]
+            header[CC_HEADER_FROM] = "msgq" # Dummy lname not assigned to clients
+            header[CC_HEADER_TO] = routing[CC_HEADER_FROM]
             # We keep the seq as it is. We don't need to track the message
             # We keep the seq as it is. We don't need to track the message
-            # and provided the sender always uses a new one, it won't know
-            # we're cheating, since we won't send it two same either.
+            # and we will not confuse the sender. The sender would use an unique
+            # id for each message, so we won't return one twice to it.
             errmsg = self.preparemsg(header, payload)
             errmsg = self.preparemsg(header, payload)
             # Send it back.
             # Send it back.
             self.send_prepared_msg(sock, errmsg)
             self.send_prepared_msg(sock, errmsg)

+ 92 - 17
src/bin/msgq/tests/msgq_test.py

@@ -166,16 +166,25 @@ class MsgQTest(unittest.TestCase):
         """
         """
         Send several packets through the MsgQ and check it generates
         Send several packets through the MsgQ and check it generates
         undeliverable notifications under the correct circumstances.
         undeliverable notifications under the correct circumstances.
+
+        The test is not exhaustive as it doesn't test all combination
+        of existence of the recipient, addressing schemes, want_answer
+        header and the reply header. It is not needed, these should
+        be mostly independant (eg. if the recipient is missing, it
+        shouldn't matter why to the handling of the reply header). If
+        we included everything, the test would have too many scenarios.
         """
         """
         sent_messages = []
         sent_messages = []
-        def fake_end_prepared_msg(socket, msg):
+        def fake_send_prepared_msg(socket, msg):
             sent_messages.append((socket, msg))
             sent_messages.append((socket, msg))
-        self.__msgq.send_prepared_msg = fake_end_prepared_msg
+            return True
+        self.__msgq.send_prepared_msg = fake_send_prepared_msg
         # These would be real sockets in the MsgQ, but we pass them as
         # These would be real sockets in the MsgQ, but we pass them as
         # parameters only, so we don't need them to be. We use simple
         # parameters only, so we don't need them to be. We use simple
         # integers to tell one from another.
         # integers to tell one from another.
         sender = 1
         sender = 1
         recipient = 2
         recipient = 2
+        another_recipiet = 3
         # The routing headers and data to test with.
         # The routing headers and data to test with.
         routing = {
         routing = {
             'to': '*',
             'to': '*',
@@ -192,11 +201,11 @@ class MsgQTest(unittest.TestCase):
         self.__msgq.process_command_send(sender, routing, data)
         self.__msgq.process_command_send(sender, routing, data)
         self.assertEqual([], sent_messages)
         self.assertEqual([], sent_messages)
         # It should act the same if we explicitly say we do not want replies.
         # It should act the same if we explicitly say we do not want replies.
-        routing["wants_reply"] = False
+        routing["want_answer"] = False
         self.__msgq.process_command_send(sender, routing, data)
         self.__msgq.process_command_send(sender, routing, data)
         self.assertEqual([], sent_messages)
         self.assertEqual([], sent_messages)
         # Ask for errors if it can't be delivered.
         # Ask for errors if it can't be delivered.
-        routing["wants_reply"] = True
+        routing["want_answer"] = True
         self.__msgq.process_command_send(sender, routing, data)
         self.__msgq.process_command_send(sender, routing, data)
         self.assertEqual(1, len(sent_messages))
         self.assertEqual(1, len(sent_messages))
         self.assertEqual(1, sent_messages[0][0])
         self.assertEqual(1, sent_messages[0][0])
@@ -207,17 +216,12 @@ class MsgQTest(unittest.TestCase):
                               'seq': 42,
                               'seq': 42,
                               'from': 'msgq',
                               'from': 'msgq',
                               'to': 'sender',
                               'to': 'sender',
-                              'wants_reply': True
+                              'want_answer': True
                           }, {'result': [-1, "No such recipient"]}),
                           }, {'result': [-1, "No such recipient"]}),
                           self.parse_msg(sent_messages[0][1]))
                           self.parse_msg(sent_messages[0][1]))
         # the reply header too.
         # the reply header too.
         sent_messages = []
         sent_messages = []
-        # If the message is a reply itself, we never generate the errors, even
-        # if they can't be delivered. This is partly because the answer reuses
-        # the old header (which would then inherit the wants_reply flag) and
-        # partly we want to avoid loops of errors that can't be delivered.
-        # If a reply can't be delivered, the sender can't do much anyway even
-        # if notified.
+        # If the message is a reply itself, we never generate the errors
         routing["reply"] = 3
         routing["reply"] = 3
         self.__msgq.process_command_send(sender, routing, data)
         self.__msgq.process_command_send(sender, routing, data)
         self.assertEqual([], sent_messages)
         self.assertEqual([], sent_messages)
@@ -243,7 +247,7 @@ class MsgQTest(unittest.TestCase):
                               'seq': 42,
                               'seq': 42,
                               'from': 'msgq',
                               'from': 'msgq',
                               'to': 'sender',
                               'to': 'sender',
-                              'wants_reply': True
+                              'want_answer': True
                           }, {'result': [-1, "No such recipient"]}),
                           }, {'result': [-1, "No such recipient"]}),
                           self.parse_msg(sent_messages[0][1]))
                           self.parse_msg(sent_messages[0][1]))
         sent_messages = []
         sent_messages = []
@@ -255,6 +259,39 @@ class MsgQTest(unittest.TestCase):
         self.assertEqual(2, sent_messages[0][0]) # The recipient
         self.assertEqual(2, sent_messages[0][0]) # The recipient
         self.assertEqual((routing, data), self.parse_msg(sent_messages[0][1]))
         self.assertEqual((routing, data), self.parse_msg(sent_messages[0][1]))
         sent_messages = []
         sent_messages = []
+        # If an attempt to send fails, consider it no recipient.
+        def fail_send_prepared_msg(socket, msg):
+            '''
+            Pretend sending a message failed. After one call, return to the
+            usual mock, so the errors or other messages can be sent.
+            '''
+            self.__msgq.send_prepared_msg = fake_send_prepared_msg
+        self.__msgq.send_prepared_msg = fail_send_prepared_msg
+        self.__msgq.process_command_send(sender, routing, data)
+        self.assertEqual(1, len(sent_messages))
+        self.assertEqual(1, sent_messages[0][0])
+        self.assertEqual(({
+                              'group': 'group',
+                              'instance': '*',
+                              'reply': 42,
+                              'seq': 42,
+                              'from': 'msgq',
+                              'to': 'sender',
+                              'want_answer': True
+                          }, {'result': [-1, "No such recipient"]}),
+                          self.parse_msg(sent_messages[0][1]))
+        sent_messages = []
+        # But if there are more recipients and only one fails, it should
+        # be delivered to the other and not considered an error
+        self.__msgq.send_prepared_msg = fail_send_prepared_msg
+        routing["to"] = '*'
+        self.__msgq.subs.find = lambda group, instance: [recipient,
+                                                         another_recipiet]
+        self.__msgq.process_command_send(sender, routing, data)
+        self.assertEqual(1, len(sent_messages))
+        self.assertEqual(3, sent_messages[0][0]) # The recipient
+        self.assertEqual((routing, data), self.parse_msg(sent_messages[0][1]))
+        sent_messages = []
 
 
 class DummySocket:
 class DummySocket:
     """
     """
@@ -360,17 +397,27 @@ class SendNonblock(unittest.TestCase):
             self.assertEqual(0, status,
             self.assertEqual(0, status,
                 "The task did not complete successfully in time")
                 "The task did not complete successfully in time")
 
 
+    def get_msgq_with_sockets(self):
+        '''
+        Create a message queue and prepare it for use with a socket pair.
+        The write end is put into the message queue, so we can check it.
+        It returns (msgq, read_end, write_end). It is expected the sockets
+        are closed by the caller afterwards.
+        '''
+        msgq = MsgQ()
+        # We do only partial setup, so we don't create the listening socket
+        msgq.setup_poller()
+        (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+        msgq.register_socket(write)
+        return (msgq, read, write)
+
     def infinite_sender(self, sender):
     def infinite_sender(self, sender):
         """
         """
         Sends data until an exception happens. socket.error is caught,
         Sends data until an exception happens. socket.error is caught,
         as it means the socket got closed. Sender is called to actually
         as it means the socket got closed. Sender is called to actually
         send the data.
         send the data.
         """
         """
-        msgq = MsgQ()
-        # We do only partial setup, so we don't create the listening socket
-        msgq.setup_poller()
-        (read, write) = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
-        msgq.register_socket(write)
+        (msgq, read, write) = self.get_msgq_with_sockets()
         # Keep sending while it is not closed by the msgq
         # Keep sending while it is not closed by the msgq
         try:
         try:
             while True:
             while True:
@@ -406,6 +453,34 @@ class SendNonblock(unittest.TestCase):
         self.terminate_check(lambda: self.infinite_sender(
         self.terminate_check(lambda: self.infinite_sender(
             lambda msgq, socket: msgq.send_prepared_msg(socket, data)))
             lambda msgq, socket: msgq.send_prepared_msg(socket, data)))
 
 
+    def test_sendprepared_success(self):
+        '''
+        Test the send_prepared_msg returns success when queueing messages.
+        It does so on the first attempt (when it actually tries to send
+        something to the socket) and on any attempt that follows and the
+        buffer is already full.
+        '''
+        (msgq, read, write) = self.get_msgq_with_sockets()
+        # Now keep sending until we fill in something into the internal
+        # buffer.
+        while not write.fileno() in msgq.sendbuffs:
+            self.assertTrue(msgq.send_prepared_msg(write, b'data'))
+        read.close()
+        write.close()
+
+    def test_sendprepared_epipe(self):
+        '''
+        Test the send_prepared_msg returns false when we try to queue a
+        message and the other side is not there any more. It should be done
+        with EPIPE, so not a fatal error.
+        '''
+        (msgq, read, write) = self.get_msgq_with_sockets()
+        # Close one end. It should make a EPIPE on the other.
+        read.close()
+        # Now it should soft-fail
+        self.assertFalse(msgq.send_prepared_msg(write, b'data'))
+        write.close()
+
     def send_many(self, data):
     def send_many(self, data):
         """
         """
         Tries that sending a command many times and getting an answer works.
         Tries that sending a command many times and getting an answer works.

+ 11 - 10
src/lib/cc/session.cc

@@ -30,6 +30,9 @@
 #include <asio/deadline_timer.hpp>
 #include <asio/deadline_timer.hpp>
 #include <asio/system_error.hpp>
 #include <asio/system_error.hpp>
 
 
+#include <cc/data.h>
+#include <cc/session.h>
+
 #include <cstdio>
 #include <cstdio>
 #include <vector>
 #include <vector>
 #include <iostream>
 #include <iostream>
@@ -44,9 +47,6 @@
 
 
 #include <exceptions/exceptions.h>
 #include <exceptions/exceptions.h>
 
 
-#include <cc/data.h>
-#include <cc/session.h>
-
 using namespace std;
 using namespace std;
 using namespace isc::cc;
 using namespace isc::cc;
 using namespace isc::data;
 using namespace isc::data;
@@ -480,13 +480,14 @@ Session::group_sendmsg(ConstElementPtr msg, std::string group,
     ElementPtr env = Element::createMap();
     ElementPtr env = Element::createMap();
     long int nseq = ++impl_->sequence_;
     long int nseq = ++impl_->sequence_;
 
 
-    env->set("type", Element::create("send"));
-    env->set("from", Element::create(impl_->lname_));
-    env->set("to", Element::create(to));
-    env->set("group", Element::create(group));
-    env->set("instance", Element::create(instance));
-    env->set("seq", Element::create(nseq));
-    env->set("want_answer", Element::create(want_answer));
+    env->set(isc::util::CC_HEADER_TYPE,
+             Element::create(isc::util::CC_COMMAND_SEND));
+    env->set(isc::util::CC_HEADER_FROM, Element::create(impl_->lname_));
+    env->set(isc::util::CC_HEADER_TO, Element::create(to));
+    env->set(isc::util::CC_HEADER_GROUP, Element::create(group));
+    env->set(isc::util::CC_HEADER_INSTANCE, Element::create(instance));
+    env->set(isc::util::CC_HEADER_SEQ, Element::create(nseq));
+    env->set(isc::util::CC_HEADER_WANT_ANSWER, Element::create(want_answer));
 
 
     sendmsg(env, msg);
     sendmsg(env, msg);
     return (nseq);
     return (nseq);

+ 18 - 9
src/lib/cc/session.h

@@ -15,14 +15,16 @@
 #ifndef ISC_SESSION_H
 #ifndef ISC_SESSION_H
 #define ISC_SESSION_H 1
 #define ISC_SESSION_H 1
 
 
-#include <string>
+#include <cc/data.h>
+#include <cc/session_config.h>
 
 
-#include <boost/function.hpp>
+#include <util/common_defs.h>
 
 
 #include <exceptions/exceptions.h>
 #include <exceptions/exceptions.h>
 
 
-#include <cc/data.h>
-#include <cc/session_config.h>
+#include <string>
+
+#include <boost/function.hpp>
 
 
 namespace asio {
 namespace asio {
 class io_service;
 class io_service;
@@ -81,8 +83,10 @@ namespace isc {
             virtual void disconnect() = 0;
             virtual void disconnect() = 0;
             virtual int group_sendmsg(isc::data::ConstElementPtr msg,
             virtual int group_sendmsg(isc::data::ConstElementPtr msg,
                                       std::string group,
                                       std::string group,
-                                      std::string instance = "*",
-                                      std::string to = "*",
+                                      std::string instance =
+                                          isc::util::CC_INSTANCE_WILDCARD,
+                                      std::string to =
+                                          isc::util::CC_TO_WILDCARD,
                                       bool want_answer = false) = 0;
                                       bool want_answer = false) = 0;
             virtual bool group_recvmsg(isc::data::ConstElementPtr& envelope,
             virtual bool group_recvmsg(isc::data::ConstElementPtr& envelope,
                                        isc::data::ConstElementPtr& msg,
                                        isc::data::ConstElementPtr& msg,
@@ -164,9 +168,14 @@ namespace isc {
             /// @param returns socket descriptor used for session connection
             /// @param returns socket descriptor used for session connection
             virtual int getSocketDesc() const;
             virtual int getSocketDesc() const;
     private:
     private:
-            void sendmsg(isc::data::ConstElementPtr msg);
-            void sendmsg(isc::data::ConstElementPtr env,
-                         isc::data::ConstElementPtr msg);
+            // The following two methods are virtual to allow tests steal and
+            // replace them. It is not expected to be specialized by a derived
+            // class. Actually, it is not expected to inherit from this class
+            // to begin with.
+            virtual void sendmsg(isc::data::ConstElementPtr msg);
+            virtual void sendmsg(isc::data::ConstElementPtr env,
+                                 isc::data::ConstElementPtr msg);
+
             bool recvmsg(isc::data::ConstElementPtr& msg,
             bool recvmsg(isc::data::ConstElementPtr& msg,
                          bool nonblock = true,
                          bool nonblock = true,
                          int seq = -1);
                          int seq = -1);

+ 43 - 82
src/lib/cc/tests/session_unittests.cc

@@ -29,13 +29,13 @@
 #include <exceptions/exceptions.h>
 #include <exceptions/exceptions.h>
 
 
 #include <utility>
 #include <utility>
-#include <vector>
+#include <list>
 #include <string>
 #include <string>
 #include <iostream>
 #include <iostream>
 
 
 using namespace isc::cc;
 using namespace isc::cc;
 using std::pair;
 using std::pair;
-using std::vector;
+using std::list;
 using std::string;
 using std::string;
 using isc::data::ConstElementPtr;
 using isc::data::ConstElementPtr;
 using isc::data::Element;
 using isc::data::Element;
@@ -64,9 +64,6 @@ TEST(AsioSession, establish) {
     );
     );
 }
 }
 
 
-/// \brief Pair holding header and data of a message sent over the wire.
-typedef pair<ConstElementPtr, ConstElementPtr> SentMessage;
-
 // This class sets up a domain socket for the session to connect to
 // This class sets up a domain socket for the session to connect to
 // it will impersonate the msgq a tiny bit (if setSendLname() has
 // it will impersonate the msgq a tiny bit (if setSendLname() has
 // been called, it will send an 'answer' to the lname query that is
 // been called, it will send an 'answer' to the lname query that is
@@ -109,53 +106,6 @@ public:
         socket_.send(asio::buffer(body_wire.data(), body_wire.length()));
         socket_.send(asio::buffer(body_wire.data(), body_wire.length()));
     }
     }
 
 
-    /// \brief Read a message from the socket
-    ///
-    /// Read a message from the socket and parse it. Block until it is
-    /// read or error happens. If error happens, it asio::system_error.
-    ///
-    /// This method would block for ever if the sender is not sending.
-    /// But the whole test has a timeout of 10 seconds (see the
-    /// SessionTest::SetUp and SessionTest::TearDown).
-    ///
-    /// \note The method assumes the wire data are correct and does not check
-    ///    it. Strange things might happen if it is not the case, but the
-    ///    test would likely fail as a result, so we prefer simplicity here.
-    ///
-    /// \return Pair containing the header and body elements (in this order).
-    SentMessage readmsg() {
-        // The format is:
-        // <uint32_t in net order = total length>
-        // <uint16_t in net order = header length>
-        // <char * header length = the header>
-        // <char * the rest of the total length = the data>
-
-        // Read and convert the lengths first.
-        uint32_t total_len_data;
-        uint16_t header_len_data;
-        vector<asio::mutable_buffer> len_buffers;
-        len_buffers.push_back(asio::buffer(&total_len_data,
-                                           sizeof total_len_data));
-        len_buffers.push_back(asio::buffer(&header_len_data,
-                                           sizeof header_len_data));
-        asio::read(socket_, len_buffers);
-        const uint32_t total_len = ntohl(total_len_data);
-        const uint16_t header_len = ntohs(header_len_data);
-        string header, msg;
-        header.resize(header_len);
-        msg.resize(total_len - header_len - sizeof header_len_data);
-        vector<asio::mutable_buffer> data_buffers;
-        data_buffers.push_back(asio::buffer(&header[0], header.size()));
-        data_buffers.push_back(asio::buffer(&msg[0], msg.size()));
-        asio::read(socket_, data_buffers);
-        if (msg == "") { // The case of no msg present, for control messages
-            msg = "null";
-        }
-        // Extract the right data into each string and convert.
-        return (SentMessage(Element::fromWire(header),
-                            Element::fromWire(msg)));
-    }
-
     void sendLname() {
     void sendLname() {
         isc::data::ElementPtr lname_answer1 =
         isc::data::ElementPtr lname_answer1 =
             isc::data::Element::fromJSON("{ \"type\": \"lname\" }");
             isc::data::Element::fromJSON("{ \"type\": \"lname\" }");
@@ -178,6 +128,38 @@ private:
     char data_buf[1024];
     char data_buf[1024];
 };
 };
 
 
+/// \brief Pair holding header and data of a message sent over the connection.
+typedef pair<ConstElementPtr, ConstElementPtr> SentMessage;
+
+// We specialize the tested class a little. We replace some low-level
+// methods so we can examine the rest without relying on real network IO
+class TestSession : public Session {
+public:
+    TestSession(asio::io_service& ioservice) :
+        Session(ioservice)
+    {}
+    // Get first message previously sent by sendmsg and remove it from the
+    // buffer. Expects there's at leas one message in the buffer.
+    SentMessage getSentMessage() {
+        assert(!sent_messages_.empty());
+        SentMessage result(sent_messages_.front());
+        sent_messages_.pop_front();
+        return (result);
+    }
+private:
+    // Override the sendmsg. They are not sent over the real connection, but
+    // stored locally and can be extracted by getSentMessage()
+    virtual void sendmsg(ConstElementPtr msg) {
+        sendmsg(msg, ConstElementPtr(new isc::data::NullElement));
+    }
+    virtual void sendmsg(ConstElementPtr env, ConstElementPtr msg) {
+        sent_messages_.push_back(SentMessage(env, msg));
+    }
+
+    // The sendmsg stores data here.
+    list<SentMessage> sent_messages_;
+};
+
 class SessionTest : public ::testing::Test {
 class SessionTest : public ::testing::Test {
 protected:
 protected:
     SessionTest() : sess(my_io_service), work(my_io_service) {
     SessionTest() : sess(my_io_service), work(my_io_service) {
@@ -191,36 +173,13 @@ protected:
         delete tds;
         delete tds;
     }
     }
 
 
-    void SetUp() {
-        // There are blocking reads in some tests. We want to have a safety
-        // catch in case the sender didn't write enough. We set a timeout of
-        // 10 seconds per one test (which should really be enough even on
-        // slow machines). If the timeout happens, it kills the test and
-        // the whole test fails.
-        //alarm(10);
-    }
-
-    void TearDown() {
-        // Cancel the timeout scheduled in SetUp. We don't want to kill any
-        // of the other tests by it by accident.
-        alarm(0);
-    }
-
     // Check two elements are equal
     // Check two elements are equal
-    void elementsEqual(const ConstElementPtr& expected,
-                       const ConstElementPtr& actual) const
-    {
-        EXPECT_TRUE(expected->equals(*actual)) <<
-            "Elements differ, expected: " << expected->toWire() <<
-            ", got: " << actual->toWire();
-    }
-
-    // The same, but with one specified as string
     void elementsEqual(const string& expected,
     void elementsEqual(const string& expected,
                        const ConstElementPtr& actual) const
                        const ConstElementPtr& actual) const
     {
     {
-        const ConstElementPtr expected_el(Element::fromJSON(expected));
-        elementsEqual(expected_el, actual);
+        EXPECT_TRUE(Element::fromJSON(expected)->equals(*actual)) <<
+            "Elements differ, expected: " << expected <<
+            ", got: " << actual->toWire();
     }
     }
 
 
     // Check the session sent a message with the given header. The
     // Check the session sent a message with the given header. The
@@ -229,7 +188,7 @@ protected:
                           const char* description) const
                           const char* description) const
     {
     {
         SCOPED_TRACE(description);
         SCOPED_TRACE(description);
-        const SentMessage msg(tds->readmsg());
+        const SentMessage &msg(sess.getSentMessage());
         elementsEqual(expected_hdr, msg.first);
         elementsEqual(expected_hdr, msg.first);
         elementsEqual("{\"test\": 42}", msg.second);
         elementsEqual("{\"test\": 42}", msg.second);
     }
     }
@@ -256,7 +215,7 @@ public:
 protected:
 protected:
     asio::io_service my_io_service;
     asio::io_service my_io_service;
     TestDomainSocket* tds;
     TestDomainSocket* tds;
-    Session sess;
+    TestSession sess;
     // Keep run() from stopping right away by informing it it has work to do
     // Keep run() from stopping right away by informing it it has work to do
     asio::io_service::work work;
     asio::io_service::work work;
 };
 };
@@ -352,10 +311,12 @@ TEST_F(SessionTest, get_socket_descr) {
 
 
 // Test the group_sendmsg sends the correct data.
 // Test the group_sendmsg sends the correct data.
 TEST_F(SessionTest, group_sendmsg) {
 TEST_F(SessionTest, group_sendmsg) {
-    // Connect
+    // Connect (to set the lname, so we can see it sets the from)
     tds->setSendLname();
     tds->setSendLname();
     sess.establish(BIND10_TEST_SOCKET_FILE);
     sess.establish(BIND10_TEST_SOCKET_FILE);
-    elementsEqual("{\"type\": \"getlname\"}", tds->readmsg().first);
+    // Eat the "get_lname" message, so it doesn't confuse the
+    // test below.
+    sess.getSentMessage();
 
 
     const ConstElementPtr msg(Element::fromJSON("{\"test\": 42}"));
     const ConstElementPtr msg(Element::fromJSON("{\"test\": 42}"));
     sess.group_sendmsg(msg, "group");
     sess.group_sendmsg(msg, "group");

+ 1 - 1
src/lib/python/isc/Makefile.am

@@ -1,4 +1,4 @@
-SUBDIRS = datasrc cc config dns log net notify util testutils acl bind10
+SUBDIRS = datasrc util cc config dns log net notify testutils acl bind10
 SUBDIRS += xfrin log_messages server_common ddns sysinfo statistics
 SUBDIRS += xfrin log_messages server_common ddns sysinfo statistics
 
 
 python_PYTHON = __init__.py
 python_PYTHON = __init__.py

+ 28 - 9
src/lib/python/isc/cc/session.py

@@ -22,6 +22,7 @@ import threading
 import bind10_config
 import bind10_config
 
 
 import isc.cc.message
 import isc.cc.message
+from isc.util.common_defs import *
 
 
 class ProtocolError(Exception): pass
 class ProtocolError(Exception): pass
 class NetworkError(Exception): pass
 class NetworkError(Exception): pass
@@ -256,17 +257,35 @@ class Session:
             "instance": instance,
             "instance": instance,
         })
         })
 
 
-    def group_sendmsg(self, msg, group, instance = "*", to = "*",
-                      want_answer=False):
+    def group_sendmsg(self, msg, group, instance=CC_INSTANCE_WILDCARD,
+                      to=CC_TO_WILDCARD, want_answer=False):
+        '''
+        Send a message over the CC session.
+
+        Parameters:
+        - msg The message to send, encoded as python structures (dicts,
+          lists, etc).
+        - group The recipient group to send to.
+        - instance Instance in the group.
+        - to Direct recipient (overrides the above, should contain the
+          lname of the recipient).
+        - want_answer If an answer is requested. If there's no recipient,
+          the message queue would send an error message instead of the
+          answer.
+
+        Return:
+          A sequence number that can be used to wait for an answer
+          (see group_recvmsg).
+        '''
         seq = self._next_sequence()
         seq = self._next_sequence()
         self.sendmsg({
         self.sendmsg({
-            "type": "send",
-            "from": self._lname,
-            "to": to,
-            "group": group,
-            "instance": instance,
-            "seq": seq,
-            "want_answer": want_answer
+            CC_HEADER_TYPE: CC_COMMAND_SEND,
+            CC_HEADER_FROM: self._lname,
+            CC_HEADER_TO: to,
+            CC_HEADER_GROUP: group,
+            CC_HEADER_INSTANCE: instance,
+            CC_HEADER_SEQ: seq,
+            CC_HEADER_WANT_ANSWER: want_answer
         }, isc.cc.message.to_wire(msg))
         }, isc.cc.message.to_wire(msg))
         return seq
         return seq
 
 

+ 30 - 38
src/lib/python/isc/cc/tests/session_test.py

@@ -377,46 +377,38 @@ class testSession(unittest.TestCase):
         sess = MySession()
         sess = MySession()
         self.assertEqual(sess._sequence, 1)
         self.assertEqual(sess._sequence, 1)
 
 
-        sess.group_sendmsg({ 'hello': 'a' }, "my_group")
-        sent = sess._socket.readsentmsg_parsed()
-        self.assertEqual(sent, ({"from": "test_name", "seq": 2, "to": "*",
-                                 "instance": "*", "group": "my_group",
-                                 "type": "send", "want_answer": False},
-                                {"hello": "a"}))
-        self.assertEqual(sess._sequence, 2)
+        msg = { "hello": "a" }
+
+        def check_sent(additional_headers, sequence):
+            sent = sess._socket.readsentmsg_parsed()
+            headers = dict({"from": "test_name",
+                            "seq": sequence,
+                            "to": "*",
+                            "type": "send"},
+                           **additional_headers)
+            self.assertEqual(sent, (headers, msg))
+            self.assertEqual(sess._sequence, sequence)
+
+        sess.group_sendmsg(msg, "my_group")
+        check_sent({"instance": "*", "group": "my_group",
+                    "want_answer": False}, 2)
+
+        sess.group_sendmsg(msg, "my_group", "my_instance")
+        check_sent({"instance": "my_instance", "group": "my_group",
+                    "want_answer": False}, 3)
+
+        sess.group_sendmsg(msg, "your_group", "your_instance")
+        check_sent({"instance": "your_instance", "group": "your_group",
+                    "want_answer": False}, 4)
 
 
-        sess.group_sendmsg({ 'hello': 'a' }, "my_group", "my_instance")
-        sent = sess._socket.readsentmsg_parsed()
-        self.assertEqual(sent, ({"from": "test_name", "seq": 3, "to": "*",
-                                 "instance": "my_instance",
-                                 "group": "my_group", "type": "send",
-                                 "want_answer": False},
-                                {"hello": "a"}))
-        self.assertEqual(sess._sequence, 3)
-
-        sess.group_sendmsg({ 'hello': 'a' }, "your_group", "your_instance")
-        sent = sess._socket.readsentmsg_parsed()
-        self.assertEqual(sent, ({"from": "test_name", "seq": 4, "to": "*",
-                                 "instance": "your_instance",
-                                 "group": "your_group", "type": "send",
-                                 "want_answer": False},
-                                {"hello": "a"}))
-        self.assertEqual(sess._sequence, 4)
         # Test the optional want_answer parameter
         # Test the optional want_answer parameter
-        sess.group_sendmsg({'hello': 'a'}, "group", want_answer=True)
-        sent = sess._socket.readsentmsg_parsed()
-        self.assertEqual(sent, ({"from": "test_name", "seq": 5, "to": "*",
-                                 "instance": "*", "group": "group", "type":
-                                 "send", "want_answer": True},
-                                {"hello": "a"}))
-        self.assertEqual(sess._sequence, 5)
-        sess.group_sendmsg({'hello': 'a'}, "group", want_answer=False)
-        sent = sess._socket.readsentmsg_parsed()
-        self.assertEqual(sent, ({"from": "test_name", "seq": 6, "to": "*",
-                                 "instance": "*", "group": "group", "type":
-                                 "send", "want_answer": False},
-                                {"hello": "a"}))
-        self.assertEqual(sess._sequence, 6)
+        sess.group_sendmsg(msg, "group", want_answer=True)
+        check_sent({"instance": "*", "group": "group", "want_answer": True}, 5)
+
+
+        sess.group_sendmsg(msg, "group", want_answer=False)
+        check_sent({"instance": "*", "group": "group", "want_answer": False},
+                   6)
 
 
     def test_group_recvmsg(self):
     def test_group_recvmsg(self):
         # must this one do anything except not return messages with
         # must this one do anything except not return messages with

+ 8 - 1
src/lib/python/isc/util/Makefile.am

@@ -1,6 +1,13 @@
 SUBDIRS = . cio tests
 SUBDIRS = . cio tests
 
 
-python_PYTHON = __init__.py process.py socketserver_mixin.py file.py
+python_PYTHON = __init__.py process.py socketserver_mixin.py file.py \
+		common_defs.py
+BUILT_SOURCES = common_defs.py
+CLEANFILES = $(BUILT_SOURCES)
+EXTRA_DIST = pythonize_constants.py
+
+common_defs.py: $(top_srcdir)/src/lib/util/common_defs.cc pythonize_constants.py
+	$(PYTHON) $(srcdir)/pythonize_constants.py $(top_srcdir)/src/lib/util/common_defs.cc $@
 
 
 pythondir = $(pyexecdir)/isc/util
 pythondir = $(pyexecdir)/isc/util
 
 

+ 49 - 0
src/lib/python/isc/util/pythonize_constants.py

@@ -0,0 +1,49 @@
+# Copyright (C) 2013  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+import sys
+import re
+
+def die(message):
+    sys.stderr.write(message + "\n")
+    sys.exit(1)
+
+if len(sys.argv) != 3:
+    die("Usage: python3 ./pythonize_constants.py input.cpp output.py")
+
+[filename_in, filename_out] = sys.argv[1:3]
+
+# Ignore preprocessor, namespaces and the ends of namespaces.
+ignore = re.compile('^(#|namespace|})')
+comment = re.compile('^//(.*)')
+constant = re.compile('^[a-zA-Z].*?([a-zA-Z_0-9]+\\s*=.*);')
+
+with open(filename_in) as file_in, open(filename_out, "w") as file_out:
+    file_out.write("# This file is generated from " + filename_in + "\n" +
+                   "# by the pythonize_constants.py script.\n" +
+                   "# Do not edit, all changes will be lost.\n\n")
+    for line in file_in:
+        if ignore.match(line):
+            continue
+        # Mangle comments to be python-like
+        line = comment.sub('#\\1', line)
+        # Extract the constant.
+
+        # TODO: We may want to do something with the true vs. True and
+        # NULL vs. None and such. Left out for now, since none are in
+        # the input file currently.
+        line = constant.sub('\\1', line)
+
+        file_out.write(line)

+ 7 - 2
src/lib/util/Makefile.am

@@ -19,6 +19,7 @@ libb10_util_la_SOURCES += interprocess_sync_null.h interprocess_sync_null.cc
 libb10_util_la_SOURCES += memory_segment.h
 libb10_util_la_SOURCES += memory_segment.h
 libb10_util_la_SOURCES += memory_segment_local.h memory_segment_local.cc
 libb10_util_la_SOURCES += memory_segment_local.h memory_segment_local.cc
 libb10_util_la_SOURCES += range_utilities.h
 libb10_util_la_SOURCES += range_utilities.h
+libb10_util_la_SOURCES += common_defs.h common_defs.cc
 libb10_util_la_SOURCES += hash/sha1.h hash/sha1.cc
 libb10_util_la_SOURCES += hash/sha1.h hash/sha1.cc
 libb10_util_la_SOURCES += encode/base16_from_binary.h
 libb10_util_la_SOURCES += encode/base16_from_binary.h
 libb10_util_la_SOURCES += encode/base32hex.h encode/base64.h
 libb10_util_la_SOURCES += encode/base32hex.h encode/base64.h
@@ -29,10 +30,14 @@ libb10_util_la_SOURCES += encode/binary_from_base16.h
 libb10_util_la_SOURCES += random/qid_gen.h random/qid_gen.cc
 libb10_util_la_SOURCES += random/qid_gen.h random/qid_gen.cc
 libb10_util_la_SOURCES += random/random_number_generator.h
 libb10_util_la_SOURCES += random/random_number_generator.h
 
 
-EXTRA_DIST = python/pycppwrapper_util.h
+EXTRA_DIST = python/pycppwrapper_util.h const2hdr.py
+BUILT_SOURCES = common_defs.h
+
+common_defs.h: const2hdr.py common_defs.cc
+	$(PYTHON) $(srcdir)/const2hdr.py $(srcdir)/common_defs.cc $@
 
 
 libb10_util_la_LIBADD = $(top_builddir)/src/lib/exceptions/libb10-exceptions.la
 libb10_util_la_LIBADD = $(top_builddir)/src/lib/exceptions/libb10-exceptions.la
-CLEANFILES = *.gcno *.gcda
+CLEANFILES = *.gcno *.gcda common_defs.h
 
 
 libb10_util_includedir = $(includedir)/$(PACKAGE_NAME)/util
 libb10_util_includedir = $(includedir)/$(PACKAGE_NAME)/util
 libb10_util_include_HEADERS = buffer.h
 libb10_util_include_HEADERS = buffer.h

+ 44 - 0
src/lib/util/common_defs.cc

@@ -0,0 +1,44 @@
+// Copyright (C) 2013  Internet Systems Consortium, Inc. ("ISC")
+//
+// Permission to use, copy, modify, and/or distribute this software for any
+// purpose with or without fee is hereby granted, provided that the above
+// copyright notice and this permission notice appear in all copies.
+//
+// THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
+// REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+// AND FITNESS.  IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
+// INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
+// LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
+// OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+// PERFORMANCE OF THIS SOFTWARE.
+
+#include <util/common_defs.h>
+
+namespace isc {
+namespace util {
+
+// Aside from defining the values for the C++ util library, this file is also
+// used as direct input of the generator of the python counterpart. Please,
+// keep the syntax here simple and check the generated file
+// (lib/python/isc/util/common_defs.py) is correct and sane.
+
+// The constants used in the CC protocol
+// First the header names
+const char* CC_HEADER_TYPE = "type";
+const char* CC_HEADER_FROM = "from";
+const char* CC_HEADER_TO = "to";
+const char* CC_HEADER_GROUP = "group";
+const char* CC_HEADER_INSTANCE = "instance";
+const char* CC_HEADER_SEQ = "seq";
+const char* CC_HEADER_WANT_ANSWER = "want_answer";
+const char* CC_HEADER_REPLY = "reply";
+// The commands in the "type" header
+const char* CC_COMMAND_SEND = "send";
+// The wildcards of some headers
+const char* CC_TO_WILDCARD = "*";
+const char* CC_INSTANCE_WILDCARD = "*";
+// Reply codes
+const int CC_REPLY_NO_RECPT = -1;
+
+}
+}

+ 56 - 0
src/lib/util/const2hdr.py

@@ -0,0 +1,56 @@
+# Copyright (C) 2013  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+import sys
+import re
+
+def die(message):
+    sys.stderr.write(message + "\n")
+    sys.exit(1)
+
+if len(sys.argv) != 3:
+    die("Usage: python3 ./const2hdr.py input.cpp output.h")
+
+[filename_in, filename_out] = sys.argv[1:3]
+
+preproc = re.compile('^#')
+constant = re.compile('^([a-zA-Z].*?[a-zA-Z_0-9]+)\\s*=.*;')
+
+with open(filename_in) as file_in, open(filename_out, "w") as file_out:
+    file_out.write("// This file is generated from " + filename_in + "\n" +
+                   "// by the const2hdr.py script.\n" +
+                   "// Do not edit, all changes will be lost.\n\n")
+    for line in file_in:
+        if preproc.match(line):
+            # There's only one preprocessor line in the .cc file. We abuse
+            # that to position the top part of the header.
+            file_out.write("#ifndef BIND10_COMMON_DEFS_H\n" +
+                           "#define BIND10_COMMON_DEFS_H\n" +
+                           "\n" +
+                           "// \\file " + filename_out + "\n" +
+'''// \\brief Common shared constants\n
+// This file contains common definitions of constasts used across the sources.
+// It includes, but is not limited to the definitions of messages sent from
+// one process to another. Since the names should be self-explanatory and
+// the variables here are used mostly to synchronize the same values across
+// multiple programs, separate documentation for each variable is not provided.
+''')
+            continue
+        # Extract the constant. Remove the values and add "extern"
+        line = constant.sub('extern \\1;', line)
+
+        file_out.write(line)
+
+    file_out.write("#endif\n")