Parcourir la source

[master] Merge branch 'trac2440'

fixed conflicts:
	doc/differences.txt
JINMEI Tatuya il y a 12 ans
Parent
commit
2323070601

+ 22 - 0
doc/differences.txt

@@ -6,6 +6,7 @@ BIND 9
 
 TODO: There are definitely more differences than just this.
 
+DNS zone transfer:
 * When an incoming zone transfer fails, for example because the
   received zone doesn't contain a NS record, bind 9 stops serving the
   zone and returns SERVFAIL to queries for that zone. Bind 10 still
@@ -18,3 +19,24 @@ RDATA implementations:
   should actually be NOT accepted per RFC 1035, but BIND 9 accepts them
   probably because of compatibility reasons.  Until our strict
   (and more correct) behavior causes operations issues, we'll keep it.
+
+DNS data sources:
+* In-memory data source does not sort RDATA of each RRset (in the
+  DNSSEC order) while BIND 9 normally sorts them internally.  The main
+  purpose of the BIND 9's behavior is to make the ordering
+  predictable, but if the RDATA are rotated in DNS responses (which
+  BIND 9 also does by default) the predictability wouldn't be that
+  useful for the clients.  So we skip the sorting in the BIND 10
+  implementation to simplify the implementation (and possibly make it
+  a bit more efficient).
+
+* If different RRs of the same RRset and their RRSIGs have different
+  TTL when loaded to the in-memory data source, the lowest TTL among
+  all RRs (whether it's the covered RRset or RRSIGs) will be used.
+  BIND 9 shows some inconsistent policy on this point for unknown
+  reason (sometimes the TTL of the first RR is used, sometimes the
+  latest one is used).  We differ here firstly for consistency, and
+  because it seems to be more compliant to the sense of RFC2181.
+  In any case, the administrator should make the TTLs same, especially
+  if the zone is signed, as described in RFC4034 (and, that will be
+  normally ensured by zone signing tools).

+ 202 - 19
src/lib/datasrc/memory/rdata_serialization.cc

@@ -25,13 +25,20 @@
 #include <dns/rrclass.h>
 #include <dns/rrtype.h>
 
+#include <boost/static_assert.hpp>
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+#include <boost/optional.hpp>
+
 #include <cassert>
 #include <cstring>
+#include <set>
 #include <vector>
-#include <boost/static_assert.hpp>
 
 using namespace isc::dns;
+using namespace isc::dns::rdata;
 using std::vector;
+using std::set;
 
 namespace isc {
 namespace datasrc {
@@ -222,7 +229,6 @@ getRdataEncodeSpec(const RRClass& rrclass, const RRType& rrtype) {
 }
 
 namespace {
-
 // This class is a helper for RdataEncoder to divide the content of RDATA
 // fields for encoding by "abusing" the  message rendering logic.
 // The idea is to identify domain name fields in the writeName() method,
@@ -368,16 +374,77 @@ private:
 
 } // end of unnamed namespace
 
+namespace {
+// A trivial comparison function used for std::set<ConstRdataPtr> below.
+bool
+RdataLess(const ConstRdataPtr& rdata1, const ConstRdataPtr& rdata2) {
+    return (rdata1->compare(*rdata2) < 0);
+}
+}
+
 struct RdataEncoder::RdataEncoderImpl {
     RdataEncoderImpl() : encode_spec_(NULL), rrsig_buffer_(0),
-                         rdata_count_(0)
+                         old_varlen_count_(0), old_sig_count_(0),
+                         old_data_len_(0), old_sig_len_(0),
+                         old_length_fields_(NULL), old_data_(NULL),
+                         old_sig_data_(NULL), olddata_buffer_(0),
+                         rdatas_(RdataLess), rrsigs_(RdataLess)
     {}
 
+    // Common initialization for RdataEncoder::start().
+    void start(RRClass rrclass, RRType rrtype) {
+        if (rrtype == RRType::RRSIG()) {
+            isc_throw(BadValue, "RRSIG cannot be encoded as main RDATA type");
+        }
+
+        encode_spec_ = &getRdataEncodeSpec(rrclass, rrtype);
+        current_class_ = rrclass;
+        current_type_ = rrtype;
+        field_composer_.clearLocal(encode_spec_);
+        rrsig_buffer_.clear();
+        rrsig_lengths_.clear();
+        old_varlen_count_ = 0;
+        old_sig_count_ = 0;
+        old_data_len_ = 0;
+        old_sig_len_ = 0;
+        old_length_fields_ = NULL;
+        old_data_ = NULL;
+        old_sig_data_ = NULL;
+        olddata_buffer_.clear();
+
+        rdatas_.clear();
+        rrsigs_.clear();
+    }
+
     const RdataEncodeSpec* encode_spec_; // encode spec of current RDATA set
     RdataFieldComposer field_composer_;
     util::OutputBuffer rrsig_buffer_;
-    size_t rdata_count_;
     vector<uint16_t> rrsig_lengths_;
+
+    // Placeholder for the RR class and type of the current session;
+    // initially null, and will be (re)set at the beginning of each session.
+    boost::optional<RRClass> current_class_;
+    boost::optional<RRType> current_type_;
+
+    // Parameters corresponding to the previously encoded data in the
+    // merge mode.
+    size_t old_varlen_count_;
+    size_t old_sig_count_;
+    size_t old_data_len_;
+    size_t old_sig_len_;
+    const void* old_length_fields_;
+    const void* old_data_;
+    const void* old_sig_data_;
+    util::OutputBuffer olddata_buffer_;
+
+    // Temporary storage of Rdata and RRSIGs to be encoded.  They are used
+    // to detect and ignore duplicate data.
+    typedef boost::function<bool(const ConstRdataPtr&, const ConstRdataPtr&)>
+    RdataCmp;
+    // added unique Rdatas
+    set<ConstRdataPtr, RdataCmp> rdatas_;
+    // added unique RRSIG Rdatas
+    set<ConstRdataPtr, RdataCmp> rrsigs_;
 };
 
 RdataEncoder::RdataEncoder() :
@@ -390,36 +457,127 @@ RdataEncoder::~RdataEncoder() {
 
 void
 RdataEncoder::start(RRClass rrclass, RRType rrtype) {
-    if (rrtype == RRType::RRSIG()) {
-        isc_throw(BadValue, "RRSIG cannot be encoded as main RDATA type");
-    }
+    impl_->start(rrclass, rrtype);
+}
 
-    impl_->encode_spec_ = &getRdataEncodeSpec(rrclass, rrtype);
-    impl_->field_composer_.clearLocal(impl_->encode_spec_);
-    impl_->rrsig_buffer_.clear();
-    impl_->rdata_count_ = 0;
-    impl_->rrsig_lengths_.clear();
+namespace {
+// Helper callbacks used in the merge mode of start().  These re-construct
+// each RDATA and RRSIG in the wire-format, counting the total length of the
+// encoded data fields.
+void
+decodeName(const LabelSequence& name_labels, RdataNameAttributes,
+           util::OutputBuffer* buffer, size_t* total_len)
+{
+    size_t name_dlen;
+    const uint8_t* name_data = name_labels.getData(&name_dlen);
+    buffer->writeData(name_data, name_dlen);
+    *total_len += name_labels.getSerializedLength();
 }
 
 void
-RdataEncoder::addRdata(const rdata::Rdata& rdata) {
+decodeData(const void* data, size_t data_len, util::OutputBuffer* buffer,
+           size_t* total_len)
+{
+    buffer->writeData(data, data_len);
+    *total_len += data_len;
+}
+}
+
+void
+RdataEncoder::start(RRClass rrclass, RRType rrtype, const void* old_data,
+                    size_t old_rdata_count, size_t old_sig_count)
+{
+    impl_->start(rrclass, rrtype);
+
+    // Identify start points of various fields of the encoded data and
+    // remember it in class variables.
+    const uint8_t* cp = static_cast<const uint8_t*>(old_data);
+    impl_->old_varlen_count_ =
+        impl_->encode_spec_->varlen_count * old_rdata_count;
+    if (impl_->old_varlen_count_ > 0 || old_sig_count > 0) {
+        impl_->old_length_fields_ = cp;
+        cp += (impl_->old_varlen_count_ + old_sig_count) * sizeof(uint16_t);
+    }
+    impl_->old_data_ = cp;
+    impl_->old_sig_count_ = old_sig_count;
+
+    // Re-construct RDATAs and RRSIGs in the form of Rdata objects, and
+    // keep them in rdatas_ and rrsigs_ so we can detect and ignore duplicate
+    // data with the existing one later.  We'll also figure out the lengths
+    // of the RDATA and RRSIG part of the data by iterating over the data
+    // fields.  Note that the given old_data shouldn't contain duplicate
+    // Rdata or RRSIG as they should have been generated by this own class,
+    // which ensures that condition; if this assumption doesn't hold, we throw.
+    size_t total_len = 0;
+    RdataReader reader(rrclass, rrtype, old_data, old_rdata_count,
+                       old_sig_count,
+                       boost::bind(decodeName, _1, _2, &impl_->olddata_buffer_,
+                                   &total_len),
+                       boost::bind(decodeData, _1, _2, &impl_->olddata_buffer_,
+                                   &total_len));
+    while (reader.iterateRdata()) {
+        util::InputBuffer ibuffer(impl_->olddata_buffer_.getData(),
+                                  impl_->olddata_buffer_.getLength());
+        if (!impl_->rdatas_.insert(
+                createRdata(rrtype, rrclass, ibuffer,
+                            impl_->olddata_buffer_.getLength())).second) {
+            isc_throw(Unexpected, "duplicate RDATA found in merging RdataSet");
+        }
+        impl_->olddata_buffer_.clear();
+    }
+    impl_->old_data_len_ = total_len;
+
+    total_len = 0;
+    while (reader.iterateSingleSig()) {
+        util::InputBuffer ibuffer(impl_->olddata_buffer_.getData(),
+                                  impl_->olddata_buffer_.getLength());
+        if (!impl_->rrsigs_.insert(
+                createRdata(RRType::RRSIG(), rrclass, ibuffer,
+                            impl_->olddata_buffer_.getLength())).second) {
+            isc_throw(Unexpected, "duplicate RRSIG found in merging RdataSet");
+        }
+        impl_->olddata_buffer_.clear();
+    }
+    impl_->old_sig_len_ = total_len;
+}
+
+bool
+RdataEncoder::addRdata(const Rdata& rdata) {
     if (impl_->encode_spec_ == NULL) {
         isc_throw(InvalidOperation,
                   "RdataEncoder::addRdata performed before start");
     }
 
+    // Simply ignore duplicate RDATA.  Creating RdataPtr also checks the
+    // given Rdata is of the correct RR type.
+    ConstRdataPtr rdatap = createRdata(*impl_->current_type_,
+                                       *impl_->current_class_, rdata);
+    if (impl_->rdatas_.find(rdatap) != impl_->rdatas_.end()) {
+        return (false);
+    }
+
     impl_->field_composer_.startRdata();
     rdata.toWire(impl_->field_composer_);
     impl_->field_composer_.endRdata();
-    ++impl_->rdata_count_;
+    impl_->rdatas_.insert(rdatap);
+
+    return (true);
 }
 
-void
-RdataEncoder::addSIGRdata(const rdata::Rdata& sig_rdata) {
+bool
+RdataEncoder::addSIGRdata(const Rdata& sig_rdata) {
     if (impl_->encode_spec_ == NULL) {
         isc_throw(InvalidOperation,
                   "RdataEncoder::addSIGRdata performed before start");
     }
+
+    // Ignore duplicate RRSIGs
+    ConstRdataPtr rdatap = createRdata(RRType::RRSIG(), *impl_->current_class_,
+                                       sig_rdata);
+    if (impl_->rrsigs_.find(rdatap) != impl_->rrsigs_.end()) {
+        return (false);
+    }
+
     const size_t cur_pos = impl_->rrsig_buffer_.getLength();
     sig_rdata.toWire(impl_->rrsig_buffer_);
     const size_t rrsig_datalen = impl_->rrsig_buffer_.getLength() - cur_pos;
@@ -427,7 +585,10 @@ RdataEncoder::addSIGRdata(const rdata::Rdata& sig_rdata) {
         isc_throw(RdataEncodingError, "RRSIG is too large: "
                   << rrsig_datalen << " bytes");
     }
+    impl_->rrsigs_.insert(rdatap);
     impl_->rrsig_lengths_.push_back(rrsig_datalen);
+
+    return (true);
 }
 
 size_t
@@ -437,8 +598,11 @@ RdataEncoder::getStorageLength() const {
                   "RdataEncoder::getStorageLength performed before start");
     }
 
-    return (sizeof(uint16_t) * impl_->field_composer_.data_lengths_.size() +
-            sizeof(uint16_t) * impl_->rrsig_lengths_.size() +
+    return (sizeof(uint16_t) * (impl_->old_varlen_count_ +
+                                impl_->old_sig_count_ +
+                                impl_->field_composer_.data_lengths_.size() +
+                                impl_->rrsig_lengths_.size()) +
+            impl_->old_data_len_ + impl_->old_sig_len_ +
             impl_->rrsig_buffer_.getLength() +
             impl_->field_composer_.getLength());
 }
@@ -461,6 +625,12 @@ RdataEncoder::encode(void* buf, size_t buf_len) const {
     uint8_t* dp = dp_beg;
     uint16_t* lenp = reinterpret_cast<uint16_t*>(buf);
 
+    // Encode list of lengths for variable length fields for old data (if any)
+    const size_t old_varlen_fields_len =
+        impl_->old_varlen_count_ * sizeof(uint16_t);
+    std::memcpy(lenp, impl_->old_length_fields_, old_varlen_fields_len);
+    lenp += impl_->old_varlen_count_;
+    dp += old_varlen_fields_len;
     // Encode list of lengths for variable length fields (if any)
     if (!impl_->field_composer_.data_lengths_.empty()) {
         const size_t varlen_fields_len =
@@ -470,6 +640,12 @@ RdataEncoder::encode(void* buf, size_t buf_len) const {
         lenp += impl_->field_composer_.data_lengths_.size();
         dp += varlen_fields_len;
     }
+    // Encode list of lengths for old RRSIGs (if any)
+    const size_t old_rrsigs_len = impl_->old_sig_count_ * sizeof(uint16_t);
+    std::memcpy(lenp, static_cast<const uint8_t*>(impl_->old_length_fields_) +
+                old_varlen_fields_len, old_rrsigs_len);
+    lenp += impl_->old_sig_count_;
+    dp += old_rrsigs_len;
     // Encode list of lengths for RRSIGs (if any)
     if (!impl_->rrsig_lengths_.empty()) {
         const size_t rrsigs_len =
@@ -477,10 +653,17 @@ RdataEncoder::encode(void* buf, size_t buf_len) const {
         std::memcpy(lenp, &impl_->rrsig_lengths_[0], rrsigs_len);
         dp += rrsigs_len;
     }
+    // Encode main old RDATA, if any
+    std::memcpy(dp, impl_->old_data_, impl_->old_data_len_);
+    dp += impl_->old_data_len_;
     // Encode main RDATA
     std::memcpy(dp, impl_->field_composer_.getData(),
                 impl_->field_composer_.getLength());
     dp += impl_->field_composer_.getLength();
+    // Encode old RRSIGs, if any
+    std::memcpy(dp, static_cast<const uint8_t*>(impl_->old_data_) +
+                impl_->old_data_len_, impl_->old_sig_len_);
+    dp += impl_->old_sig_len_;
     // Encode RRSIGs, if any
     std::memcpy(dp, impl_->rrsig_buffer_.getData(),
                 impl_->rrsig_buffer_.getLength());
@@ -501,7 +684,7 @@ RdataReader::RdataReader(const RRClass& rrclass, const RRType& rrtype,
     var_count_total_(spec_.varlen_count * rdata_count),
     sig_count_(sig_count),
     spec_count_(spec_.field_count * rdata_count),
-    // The lenghts are stored first
+    // The lengths are stored first
     lengths_(reinterpret_cast<const uint16_t*>(data)),
     // And the data just after all the lengths
     data_(reinterpret_cast<const uint8_t*>(data) +

+ 63 - 5
src/lib/datasrc/memory/rdata_serialization.h

@@ -25,8 +25,6 @@
 #include <boost/function.hpp>
 #include <boost/noncopyable.hpp>
 
-#include <vector>
-
 /// \file rdata_serialization.h
 ///
 /// This file defines a set of interfaces (classes, types, constants) to
@@ -157,6 +155,47 @@ public:
     /// \param rrtype The RR type of RDATA to be encoded in the session.
     void start(dns::RRClass rrclass, dns::RRType rrtype);
 
+    /// \brief Start the encoding session in the merge mode.
+    ///
+    /// This method is similar to the other version, but begins with a copy
+    /// of previously encoded data and merges Rdata and RRSIGs into it
+    /// that will be given via subsequent calls to \c addRdata() and
+    /// \c addSIGRdata().  \c old_data, \c old_rdata_count, and
+    /// \c old_sig_count correspond to parameters given to the
+    /// \c RdataReader constructor, and must have valid values for encoded
+    /// data by this class for the same \c rrclass and \c rrtype.
+    /// It's the caller's responsibility to ensure this condition; if it's
+    /// not met, the behavior will be undefined.
+    ///
+    /// The caller must also ensure that previously encoded data (pointed
+    /// to by \c old_data) will be valid and intact throughout the encoding
+    /// session started by this method.  The resulting encoded data (by
+    /// \c encode()) won't refer to the previous data, so once encoding the
+    /// merged data is completed (and unless this encoding session continues
+    /// for another attempt of encoding, which is unlikely), the caller can
+    /// modify or destroy the old data.
+    ///
+    /// The caller must also ensure that \c old_data don't contain any
+    /// duplicate Rdata or RRSIG.  Normally the caller doesn't have to do
+    /// anything special to meet this requirement, though, as the data
+    /// should have been generated by an \c RdataEncoder object before,
+    /// which guarantees that condition.  But this method checks the
+    /// assumption in case it was crafted or otherwise broken data, and
+    /// throws an exception if that is the case.
+    ///
+    /// \throw Unexpected Given encoded data contain duplicate Rdata or RRSIG
+    /// (normally shouldn't happen, see the description).
+    ///
+    /// \param rrclass The RR class of RDATA to be encoded in the session.
+    /// \param rrtype The RR type of RDATA to be encoded in the session.
+    /// \param old_data Point to previously encoded data for the same RR
+    /// class and type.
+    /// \param old_rdata_count The number of RDATAs stored in \c old_data.
+    /// \param old_sig_count The number of RRSIGs stored in \c old_data.
+    void start(dns::RRClass rrclass, dns::RRType rrtype,
+               const void* old_data, size_t old_rdata_count,
+               size_t old_sig_count);
+
     /// \brief Add an RDATA for encoding.
     ///
     /// This method updates internal state of the \c RdataEncoder() with the
@@ -168,6 +207,14 @@ public:
     /// to some extent, but the check is not complete; this is generally
     /// the responsibility of the caller.
     ///
+    /// This method checks if the given RDATA is a duplicate of already
+    /// added one (including ones encoded in the old data if the session
+    /// began with the merge mode).  If it's a duplicate this method ignores
+    /// the given RDATA and returns false; otherwise it returns true.
+    /// The check is based on the comparison in the "canonical form" as
+    /// described in RFC4034 Section 6.2.  In particular, domain name fields
+    /// of the RDATA are generally compared in case-insensitive manner.
+    ///
     /// The caller can destroy \c rdata after this call is completed.
     ///
     /// \note This implementation does not support RDATA (or any subfield of
@@ -183,12 +230,14 @@ public:
     /// new session from \c start() should this method throws an exception.
     ///
     /// \throw InvalidOperation called before start().
-    /// \throw BadValue inconsistent data found.
+    /// \throw std::bad_cast The given Rdata is of different RR type.
     /// \throw RdataEncodingError A very unusual case, such as over 64KB RDATA.
     /// \throw std::bad_alloc Internal memory allocation failure.
     ///
     /// \param rdata An RDATA to be encoded in the session.
-    void addRdata(const dns::rdata::Rdata& rdata);
+    /// \return true if the given RDATA was added to encode; false if
+    /// it's a duplicate and ignored.
+    bool addRdata(const dns::rdata::Rdata& rdata);
 
     /// \brief Add an RRSIG RDATA for encoding.
     ///
@@ -204,6 +253,13 @@ public:
     /// it could even accept any type of RDATA as opaque data.  It's caller's
     /// responsibility to ensure the assumption.
     ///
+    /// This method checks if the given RRSIG RDATA is a duplicate of already
+    /// added one (including ones encoded in the old data if the session
+    /// began with the merge mode).  If it's a duplicate this method ignores
+    /// the given RRSIG and returns false; otherwise it returns true.
+    /// The check is based on the comparison in the "canonical form" as
+    /// described in RFC4034 Section 6.2.
+    ///
     /// The caller can destroy \c rdata after this call is completed.
     ///
     /// \note Like addRdata(), this implementation does not support
@@ -218,7 +274,9 @@ public:
     ///
     /// \param sig_rdata An RDATA to be encoded in the session.  Supposed to
     /// be of type RRSIG.
-    void addSIGRdata(const dns::rdata::Rdata& sig_rdata);
+    /// \return true if the given RRSIG RDATA was added to encode; false if
+    /// it's a duplicate and ignored.
+    bool addSIGRdata(const dns::rdata::Rdata& sig_rdata);
 
     /// \brief Return the length of space for encoding for the session.
     ///

+ 64 - 21
src/lib/datasrc/memory/rdataset.cc

@@ -26,6 +26,7 @@
 #include <boost/static_assert.hpp>
 
 #include <stdint.h>
+#include <algorithm>
 #include <cstring>
 #include <typeinfo>             // for bad_cast
 #include <new>                  // for the placement new
@@ -48,11 +49,39 @@ getCoveredType(const Rdata& rdata) {
         isc_throw(BadValue, "Non RRSIG is given where it's expected");
     }
 }
+
+// A helper for lowestTTL: restore RRTTL object from wire-format 32-bit data.
+RRTTL
+restoreTTL(const void* ttl_data) {
+    isc::util::InputBuffer b(ttl_data, sizeof(uint32_t));
+    return (RRTTL(b));
+}
+
+// A helper function for create(): return the TTL that has lowest value
+// amount the given those of given rdataset (if non NULL), rrset, sig_rrset.
+RRTTL
+lowestTTL(const RdataSet* rdataset, ConstRRsetPtr& rrset,
+          ConstRRsetPtr& sig_rrset)
+{
+    if (rrset && sig_rrset) {
+        const RRTTL tmp(std::min(rrset->getTTL(), sig_rrset->getTTL()));
+        return (rdataset ?
+                std::min(restoreTTL(rdataset->getTTLData()), tmp) : tmp);
+    } else if (rrset) {
+        return (rdataset ? std::min(restoreTTL(rdataset->getTTLData()),
+                                    rrset->getTTL()) : rrset->getTTL());
+    } else {
+        return (rdataset ? std::min(restoreTTL(rdataset->getTTLData()),
+                                    sig_rrset->getTTL()) :
+                sig_rrset->getTTL());
+    }
+}
 }
 
 RdataSet*
 RdataSet::create(util::MemorySegment& mem_sgmt, RdataEncoder& encoder,
-                 ConstRRsetPtr rrset, ConstRRsetPtr sig_rrset)
+                 ConstRRsetPtr rrset, ConstRRsetPtr sig_rrset,
+                 const RdataSet* old_rdataset)
 {
     // Check basic validity
     if (!rrset && !sig_rrset) {
@@ -68,31 +97,40 @@ RdataSet::create(util::MemorySegment& mem_sgmt, RdataEncoder& encoder,
         isc_throw(BadValue, "RR class doesn't match between RRset and RRSIG");
     }
 
-    // Check assumptions on the number of RDATAs
-    if (rrset && rrset->getRdataCount() > MAX_RDATA_COUNT) {
-        isc_throw(RdataSetError, "Too many RDATAs for RdataSet: "
-                  << rrset->getRdataCount() << ", must be <= "
-                  << MAX_RDATA_COUNT);
-    }
-    if (sig_rrset && sig_rrset->getRdataCount() > MAX_RRSIG_COUNT) {
-        isc_throw(RdataSetError, "Too many RRSIGs for RdataSet: "
-                  << sig_rrset->getRdataCount() << ", must be <= "
-                  << MAX_RRSIG_COUNT);
-    }
-
     const RRClass rrclass = rrset ? rrset->getClass() : sig_rrset->getClass();
     const RRType rrtype = rrset ? rrset->getType() :
         getCoveredType(sig_rrset->getRdataIterator()->getCurrent());
-    const RRTTL rrttl = rrset ? rrset->getTTL() : sig_rrset->getTTL();
+    if (old_rdataset && old_rdataset->type != rrtype) {
+        isc_throw(BadValue, "RR type doesn't match for merging RdataSet");
+    }
+    const RRTTL rrttl = lowestTTL(old_rdataset, rrset, sig_rrset);
+    if (old_rdataset) {
+        encoder.start(rrclass, rrtype, old_rdataset->getDataBuf(),
+                      old_rdataset->getRdataCount(),
+                      old_rdataset->getSigRdataCount());
+    } else {
+        encoder.start(rrclass, rrtype);
+    }
 
-    encoder.start(rrclass, rrtype);
+    // Store RDATAs to be added and check assumptions on the number of them
+    size_t rdata_count = old_rdataset ? old_rdataset->getRdataCount() : 0;
     if (rrset) {
         for (RdataIteratorPtr it = rrset->getRdataIterator();
              !it->isLast();
              it->next()) {
-            encoder.addRdata(it->getCurrent());
+            if (encoder.addRdata(it->getCurrent())) {
+                ++rdata_count;
+            }
         }
     }
+    if (rdata_count > MAX_RDATA_COUNT) {
+        isc_throw(RdataSetError, "Too many RDATAs for RdataSet: "
+                  << rrset->getRdataCount() << ", must be <= "
+                  << MAX_RDATA_COUNT);
+    }
+
+    // Same for RRSIG
+    size_t rrsig_count = old_rdataset ? old_rdataset->getSigRdataCount() : 0;
     if (sig_rrset) {
         for (RdataIteratorPtr it = sig_rrset->getRdataIterator();
              !it->isLast();
@@ -101,19 +139,24 @@ RdataSet::create(util::MemorySegment& mem_sgmt, RdataEncoder& encoder,
             if (getCoveredType(it->getCurrent()) != rrtype) {
                 isc_throw(BadValue, "Type covered doesn't match");
             }
-            encoder.addSIGRdata(it->getCurrent());
+            if (encoder.addSIGRdata(it->getCurrent())) {
+                ++rrsig_count;
+            }
         }
     }
+    if (rrsig_count > MAX_RRSIG_COUNT) {
+        isc_throw(RdataSetError, "Too many RRSIGs for RdataSet: "
+                  << sig_rrset->getRdataCount() << ", must be <= "
+                  << MAX_RRSIG_COUNT);
+    }
 
-    const size_t rrsig_count = sig_rrset ? sig_rrset->getRdataCount() : 0;
     const size_t ext_rrsig_count_len =
         rrsig_count >= MANY_RRSIG_COUNT ? sizeof(uint16_t) : 0;
     const size_t data_len = encoder.getStorageLength();
     void* p = mem_sgmt.allocate(sizeof(RdataSet) + ext_rrsig_count_len +
                                 data_len);
-    RdataSet* rdataset = new(p) RdataSet(rrtype,
-                                         rrset ? rrset->getRdataCount() : 0,
-                                         rrsig_count, rrttl);
+    RdataSet* rdataset = new(p) RdataSet(rrtype, rdata_count, rrsig_count,
+                                         rrttl);
     if (rrsig_count >= MANY_RRSIG_COUNT) {
         *rdataset->getExtSIGCountBuf() = rrsig_count;
     }

+ 28 - 4
src/lib/datasrc/memory/rdataset.h

@@ -120,6 +120,27 @@ public:
     /// RRSIG from the given memory segment, constructs the object, and
     /// returns a pointer to it.
     ///
+    /// If the optional \c old_rdataset parameter is set to non NULL,
+    /// The given \c RdataSet, RRset, RRSIG will be merged in the new
+    /// \c Rdataset object: the new object contain the union set of all
+    /// RDATA and RRSIGs contained in these.  Obviously \c old_rdataset
+    /// was previously generated for the same RRClass and RRType as those
+    /// for the given RRsets; it's the caller's responsibility to ensure
+    /// this condition.  If it's not met the result will be undefined.
+    ///
+    /// In both cases, this method ensures the stored RDATA and RRSIG are
+    /// unique.  Any duplicate data (in the sense of the comparison in the
+    /// form of canonical form of RRs as described in RFC4034) within RRset or
+    /// RRSIG, or between data in \c old_rdataset and RRset/RRSIG will be
+    /// unified.
+    ///
+    /// In general, the TTLs of the given data are expected to be the same.
+    /// This is especially the case if the zone is signed (and RRSIG is given).
+    /// However, if different TTLs are found among the given data, this
+    /// method chooses the lowest one for the TTL of the resulting
+    /// \c RdataSet.  This is an implementation choice, but should be most
+    /// compliant to the sense of Section 5.2 of RFC2181.
+    ///
     /// Normally the (non RRSIG) RRset is given (\c rrset is not NULL) while
     /// its RRSIG (\c sig_rrset) may or may not be provided.  But it's also
     /// expected that in some rare (mostly broken) cases there can be an RRSIG
@@ -148,9 +169,9 @@ public:
     /// happens.
     ///
     /// Due to implementation limitations, this class cannot contain more than
-    /// 8191 RDATAs for the non RRISG RRset; also, it cannot contain more than
-    /// 65535 RRSIGs.  If the given RRset(s) fail to meet this condition,
-    /// an \c RdataSetError exception will be thrown.
+    /// 8191 RDATAs (after unifying duplicates) for the non RRISG RRset; also,
+    /// it cannot contain more than 65535 RRSIGs.  If the given RRset(s) fail
+    /// to meet this condition, an \c RdataSetError exception will be thrown.
     ///
     /// \throw isc::BadValue Given RRset(s) are invalid (see the description)
     /// \throw RdataSetError Number of RDATAs exceed the limits
@@ -164,12 +185,15 @@ public:
     /// created.  Can be NULL if sig_rrset is not.
     /// \param sig_rrset An RRSIG RRset from which the \c RdataSet is to be
     /// created.  Can be NULL if rrset is not.
+    /// \param old_rdataset If non NULL, create RdataSet merging old_rdataset
+    /// into given rrset and sig_rrset.
     ///
     /// \return A pointer to the created \c RdataSet.
     static RdataSet* create(util::MemorySegment& mem_sgmt,
                             RdataEncoder& encoder,
                             dns::ConstRRsetPtr rrset,
-                            dns::ConstRRsetPtr sig_rrset);
+                            dns::ConstRRsetPtr sig_rrset,
+                            const RdataSet* old_rdataset = NULL);
 
     /// \brief Destruct and deallocate \c RdataSet
     ///

+ 278 - 39
src/lib/datasrc/tests/memory/rdata_serialization_unittest.cc

@@ -34,7 +34,9 @@
 #include <boost/foreach.hpp>
 
 #include <cstring>
+#include <algorithm>
 #include <set>
+#include <stdexcept>
 #include <string>
 #include <vector>
 
@@ -131,7 +133,7 @@ protected:
                                           "20120715220826 12345 com. FAKE"))
     {}
 
-    // A wraper for RdataEncoder::encode() with buffer overrun check.
+    // A wrapper for RdataEncoder::encode() with buffer overrun check.
     void encodeWrapper(size_t data_len);
 
     // Some commonly used RDATA
@@ -159,14 +161,28 @@ public:
     // in the wire format.  It then compares the wire data with the one
     // generated by the normal libdns++ interface to see the encoding/decoding
     // works as intended.
+    // By default it encodes the given RDATAs from the scratch; if old_data
+    // is non NULL, the test case assumes it points to previously encoded data
+    // and the given RDATAs are to be merged with it.  old_rdata/rrsig_count
+    // will be set to the number of RDATAs and RRSIGs encoded in old_data.
+    // These "count" variables must not be set to non 0 unless old_data is
+    // non NULL, but it's not checked in this methods; it's the caller's
+    // responsibility to ensure that.  rdata_list and rrsig_list should contain
+    // all RDATAs and RRSIGs included those stored in old_data.
     void checkEncode(RRClass rrclass, RRType rrtype,
                      const vector<ConstRdataPtr>& rdata_list,
                      size_t expected_varlen_fields,
                      const vector<ConstRdataPtr>& rrsig_list =
-                     vector<ConstRdataPtr>());
+                     vector<ConstRdataPtr>(),
+                     const void* old_data = NULL,
+                     size_t old_rdata_count = 0,
+                     size_t old_rrsig_count = 0);
 
     void addRdataCommon(const vector<ConstRdataPtr>& rrsigs);
-    void addRdataMultiCommon(const vector<ConstRdataPtr>& rrsigs);
+    void addRdataMultiCommon(const vector<ConstRdataPtr>& rrsigs,
+                             bool duplicate = false);
+    void mergeRdataCommon(const vector<ConstRdataPtr>& old_rrsigs,
+                          const vector<ConstRdataPtr>& rrsigs);
 };
 
 // Used across more classes and scopes. But it's just uninteresting
@@ -272,11 +288,12 @@ public:
                        size_t rdata_count,
                        size_t rrsig_count,
                        size_t expected_varlen_fields,
-                       // Warning: this test actualy might change the
-                       // encoded_data !
-                       vector<uint8_t>& encoded_data, size_t,
+                       const vector<uint8_t>& encoded_data_orig, size_t,
                        MessageRenderer& renderer)
     {
+        // Make a manual copy, which we're going to modify.
+        vector<uint8_t> encoded_data = encoded_data_orig;
+
         // If this type of RDATA is expected to contain variable-length fields,
         // we brute force the encoded data, exploiting our knowledge of actual
         // encoding, then adjust the encoded data excluding the list of length
@@ -532,13 +549,20 @@ RdataSerializationTest::encodeWrapper(size_t data_len) {
     encoded_data_.resize(data_len);
 }
 
+bool
+rdataMatch(ConstRdataPtr rdata1, ConstRdataPtr rdata2) {
+    return (rdata1->compare(*rdata2) == 0);
+}
+
 template<class DecoderStyle>
 void
 RdataEncodeDecodeTest<DecoderStyle>::
 checkEncode(RRClass rrclass, RRType rrtype,
             const vector<ConstRdataPtr>& rdata_list,
             size_t expected_varlen_fields,
-            const vector<ConstRdataPtr>& rrsig_list)
+            const vector<ConstRdataPtr>& rrsig_list,
+            const void* old_data, size_t old_rdata_count,
+            size_t old_rrsig_count)
 {
     // These two names will be rendered before and after the test RDATA,
     // to check in case the RDATA contain a domain name whether it's
@@ -552,34 +576,70 @@ checkEncode(RRClass rrclass, RRType rrtype,
     actual_renderer_.clear();
     encoded_data_.clear();
 
-    // Build expected wire-format data
+    // Build expected wire-format data, skipping duplicate Rdata.
     expected_renderer_.writeName(dummy_name);
+    vector<ConstRdataPtr> rdata_uniq_list;
     BOOST_FOREACH(const ConstRdataPtr& rdata, rdata_list) {
-        rdata->toWire(expected_renderer_);
+        if (std::find_if(rdata_uniq_list.begin(), rdata_uniq_list.end(),
+                         boost::bind(rdataMatch, rdata, _1)) ==
+            rdata_uniq_list.end()) {
+            rdata_uniq_list.push_back(rdata);
+            rdata->toWire(expected_renderer_);
+        }
     }
     expected_renderer_.writeName(dummyName2());
+    vector<ConstRdataPtr> rrsig_uniq_list;
     BOOST_FOREACH(const ConstRdataPtr& rdata, rrsig_list) {
-        rdata->toWire(expected_renderer_);
+        if (std::find_if(rrsig_uniq_list.begin(), rrsig_uniq_list.end(),
+                         boost::bind(rdataMatch, rdata, _1)) ==
+            rrsig_uniq_list.end()) {
+            rrsig_uniq_list.push_back(rdata);
+            rdata->toWire(expected_renderer_);
+        }
     }
 
     // Then build wire format data using the encoded data.
     // 1st dummy name
     actual_renderer_.writeName(dummy_name);
 
-    // Create encoded data
-    encoder_.start(rrclass, rrtype);
+    // Create encoded data.  If old_xxx_count > 0, that part should be in
+    // old_data, so should be excluded from addRdata/addSIGRdata.
+    if (old_data) {
+        encoder_.start(rrclass, rrtype, old_data, old_rdata_count,
+                       old_rrsig_count);
+    } else {
+        encoder_.start(rrclass, rrtype);
+    }
+    size_t count = 0;
+    std::vector<ConstRdataPtr> encoded; // for duplicate check include old
     BOOST_FOREACH(const ConstRdataPtr& rdata, rdata_list) {
-        encoder_.addRdata(*rdata);
+        if (++count > old_rdata_count) {
+            const bool uniq =
+                (std::find_if(encoded.begin(), encoded.end(),
+                              boost::bind(rdataMatch, rdata, _1)) ==
+                 encoded.end());
+            EXPECT_EQ(uniq, encoder_.addRdata(*rdata));
+        }
+        encoded.push_back(rdata); // we need to remember old rdata too
     }
+    count = 0;
+    encoded.clear();
     BOOST_FOREACH(const ConstRdataPtr& rdata, rrsig_list) {
-        encoder_.addSIGRdata(*rdata);
+        if (++count > old_rrsig_count) {
+            const bool uniq =
+                (std::find_if(encoded.begin(), encoded.end(),
+                              boost::bind(rdataMatch, rdata, _1)) ==
+                 encoded.end());
+            EXPECT_EQ(uniq, encoder_.addSIGRdata(*rdata));
+        }
+        encoded.push_back(rdata);
     }
     const size_t storage_len = encoder_.getStorageLength();
     encodeWrapper(storage_len);
 
-    DecoderStyle::decode(rrclass, rrtype, rdata_list.size(), rrsig_list.size(),
-                         expected_varlen_fields, encoded_data_, storage_len,
-                         actual_renderer_);
+    DecoderStyle::decode(rrclass, rrtype, rdata_uniq_list.size(),
+                         rrsig_uniq_list.size(), expected_varlen_fields,
+                         encoded_data_, storage_len, actual_renderer_);
 
     // Two sets of wire-format data should be identical.
     matchWireData(expected_renderer_.getData(), expected_renderer_.getLength(),
@@ -619,7 +679,7 @@ TYPED_TEST(RdataEncodeDecodeTest, addRdata) {
 template<class DecoderStyle>
 void
 RdataEncodeDecodeTest<DecoderStyle>::
-addRdataMultiCommon(const vector<ConstRdataPtr>& rrsigs) {
+addRdataMultiCommon(const vector<ConstRdataPtr>& rrsigs, bool duplicate) {
     // Similar to addRdata(), but test with multiple RDATAs.
     // Four different cases are tested: a single fixed-len RDATA (A),
     // fixed-len data + domain name (MX), variable-len data only (TXT),
@@ -629,12 +689,19 @@ addRdataMultiCommon(const vector<ConstRdataPtr>& rrsigs) {
     rdata_list_.clear();
     rdata_list_.push_back(a_rdata_);
     rdata_list_.push_back(a_rdata2);
+    if (duplicate) {      // if duplicate is true, add duplicate Rdata
+        rdata_list_.push_back(a_rdata_);
+    }
     checkEncode(RRClass::IN(), RRType::A(), rdata_list_, 0, rrsigs);
 
     ConstRdataPtr mx_rdata1 = createRdata(RRType::MX(), RRClass::IN(),
                                           "5 mx1.example.com.");
     ConstRdataPtr mx_rdata2 = createRdata(RRType::MX(), RRClass::IN(),
                                           "10 mx2.example.com.");
+    if (duplicate) { // check duplicate detection is case insensitive for names
+        rdata_list_.push_back(createRdata(RRType::MX(), RRClass::IN(),
+                                          "5 MX1.example.COM."));
+    }
     rdata_list_.clear();
     rdata_list_.push_back(mx_rdata1);
     rdata_list_.push_back(mx_rdata2);
@@ -644,6 +711,9 @@ addRdataMultiCommon(const vector<ConstRdataPtr>& rrsigs) {
                                            "foo bar baz");
     ConstRdataPtr txt_rdata2 = createRdata(RRType::TXT(), RRClass::IN(),
                                           "another text data");
+    if (duplicate) {
+        rdata_list_.push_back(txt_rdata1);
+    }
     rdata_list_.clear();
     rdata_list_.push_back(txt_rdata1);
     rdata_list_.push_back(txt_rdata2);
@@ -655,6 +725,9 @@ addRdataMultiCommon(const vector<ConstRdataPtr>& rrsigs) {
     ConstRdataPtr naptr_rdata2 =
         createRdata(RRType::NAPTR(), RRClass::IN(),
                     "200 100 \"s\" \"http\" \"\" _http._tcp.example.com");
+    if (duplicate) {
+        rdata_list_.push_back(naptr_rdata1);
+    }
     rdata_list_.clear();
     rdata_list_.push_back(naptr_rdata1);
     rdata_list_.push_back(naptr_rdata2);
@@ -705,12 +778,18 @@ TYPED_TEST(RdataEncodeDecodeTest, addRdataMulti) {
     vector<ConstRdataPtr> rrsigs;
     this->addRdataMultiCommon(rrsigs); // test without RRSIGs (empty vector)
 
+    this->addRdataMultiCommon(rrsigs, true); // ditto, but with duplicated data
+
     // Tests with two RRSIGs
     rrsigs.push_back(this->rrsig_rdata_);
     rrsigs.push_back(createRdata(RRType::RRSIG(), RRClass::IN(),
                                  "A 5 2 3600 20120814220826 "
                                  "20120715220826 54321 com. FAKE"));
     this->addRdataMultiCommon(rrsigs);
+
+    // Similar to the previous, but with duplicate RRSIG.
+    rrsigs.push_back(this->rrsig_rdata_);
+    this->addRdataMultiCommon(rrsigs, true);
 }
 
 TEST_F(RdataSerializationTest, badAddRdata) {
@@ -732,50 +811,44 @@ TEST_F(RdataSerializationTest, badAddRdata) {
     EXPECT_THROW(encoder_.encode(&encoded_data_[0], buf_len - 1),
                  isc::BadValue);
 
-    // Type of RDATA and the specified RR type don't match.  addRdata() should
-    // detect this inconsistency.
+    // Some of the following checks confirm that adding an Rdata of the
+    // wrong RR type will be rejected.  Several different cases are checked,
+    // but there shouldn't be any essential difference among these cases in
+    // the tested code; these cases were considered because in an older version
+    // of implementation rejected them for possibly different reasons, and
+    // we simply keep these cases as they are not so many (and may help detect
+    // future possible regression).
     encoder_.start(RRClass::IN(), RRType::AAAA());
-    EXPECT_THROW(encoder_.addRdata(*a_rdata_), isc::BadValue);
+    EXPECT_THROW(encoder_.addRdata(*a_rdata_), std::bad_cast);
 
-    // Likewise.
     encoder_.start(RRClass::IN(), RRType::A());
-    EXPECT_THROW(encoder_.addRdata(*aaaa_rdata_), isc::BadValue);
+    EXPECT_THROW(encoder_.addRdata(*aaaa_rdata_), std::bad_cast);
 
-    // Likewise.  The encoder expects the first name completes the data, and
-    // throws on the second due as an unexpected name field.
     const ConstRdataPtr rp_rdata =
         createRdata(RRType::RP(), RRClass::IN(), "a.example. b.example");
     encoder_.start(RRClass::IN(), RRType::NS());
-    EXPECT_THROW(encoder_.addRdata(*rp_rdata), isc::BadValue);
+    EXPECT_THROW(encoder_.addRdata(*rp_rdata), std::bad_cast);
 
-    // Likewise.  The encoder considers the name data a variable length data
-    // field, and throws on the first name.
     encoder_.start(RRClass::IN(), RRType::DHCID());
-    EXPECT_THROW(encoder_.addRdata(*rp_rdata), isc::BadValue);
+    EXPECT_THROW(encoder_.addRdata(*rp_rdata), std::bad_cast);
 
-    // Likewise.  The text RDATA (2 bytes) will be treated as MX preference,
-    // and the encoder will still expect to see a domain name.
     const ConstRdataPtr txt_rdata = createRdata(RRType::TXT(), RRClass::IN(),
                                                 "a");
     encoder_.start(RRClass::IN(), RRType::MX());
-    EXPECT_THROW(encoder_.addRdata(*txt_rdata), isc::BadValue);
+    EXPECT_THROW(encoder_.addRdata(*txt_rdata), std::bad_cast);
 
-    // Similar to the previous one, but in this case there's no data field
-    // in the spec.
     encoder_.start(RRClass::IN(), RRType::NS());
-    EXPECT_THROW(encoder_.addRdata(*txt_rdata), isc::BadValue);
+    EXPECT_THROW(encoder_.addRdata(*txt_rdata), std::bad_cast);
 
-    // Likewise.  Inconsistent name compression policy.
     const ConstRdataPtr ns_rdata =
         createRdata(RRType::NS(), RRClass::IN(), "ns.example.");
     encoder_.start(RRClass::IN(), RRType::DNAME());
-    EXPECT_THROW(encoder_.addRdata(*ns_rdata), isc::BadValue);
+    EXPECT_THROW(encoder_.addRdata(*ns_rdata), std::bad_cast);
 
-    // Same as the previous one, opposite inconsistency.
     const ConstRdataPtr dname_rdata =
         createRdata(RRType::DNAME(), RRClass::IN(), "dname.example.");
     encoder_.start(RRClass::IN(), RRType::NS());
-    EXPECT_THROW(encoder_.addRdata(*dname_rdata), isc::BadValue);
+    EXPECT_THROW(encoder_.addRdata(*dname_rdata), std::bad_cast);
 
     // RDATA len exceeds the 16-bit range.  Technically not invalid, but
     // we don't support that (and it's practically useless anyway).
@@ -791,6 +864,172 @@ TEST_F(RdataSerializationTest, badAddRdata) {
                  isc::BadValue);
 }
 
+struct MergeTestData {
+    const char* const type_txt; // "AAAA", "NS", etc
+    const char* const rdata_txt1; // RDATA possibly used for old data
+    const char* const rdata_txt2; // RDATA possibly used for new data
+    const char* const rdata_txt3; // optional data to check with multi-RDATAs
+    const size_t varlen_fields; // number of variable-len fields in RDATA
+} merge_test_data[] = {
+    // For test with fixed-length RDATA
+    {"A", "192.0.2.53", "192.0.2.54", "192.0.2.55", 0},
+    // For test with variable-length RDATA
+    {"TXT", "foo bar baz", "another text data", "yet another", 1},
+    // For test with RDATA containing domain name
+    {"MX", "5 mx1.example.com.", "10 mx2.example.com.", "20 mx.example.", 0},
+    {NULL, NULL, NULL, NULL, 0}
+};
+
+// Identifier for slightly difference modes of "merge data" test below.
+// We test various combinations on # of old (before merge) and new (being
+// merged) RDATAs.
+enum MergeTestMode {
+    ONE_OLD_ONE_NEW = 0,
+    MULTI_OLD_NO_NEW,
+    ONE_OLD_MULTI_NEW,
+    DUPLICATE_NEW,      // The new RDATA is a duplicate of the old one
+    NO_OLD_ONE_NEW,     // no old RDATA; this can also cover the case where
+                        // the resulting RdataSet is RRSIG-only.
+    ONE_OLD_NO_NEW
+};
+
+// A helper to build vectors of Rata's for the given test mode.
+void
+createMergeData(int mode, const MergeTestData& data,
+                const RRClass& rrclass, const RRType& rrtype,
+                vector<ConstRdataPtr>& old_list,
+                vector<ConstRdataPtr>& new_list)
+{
+    old_list.clear();
+    new_list.clear();
+    old_list.push_back(createRdata(rrtype, rrclass, data.rdata_txt1));
+    new_list.push_back(createRdata(rrtype, rrclass, data.rdata_txt2));
+    switch (static_cast<MergeTestMode>(mode)) {
+    case ONE_OLD_ONE_NEW:
+        break;
+    case MULTI_OLD_NO_NEW:
+        old_list.push_back(createRdata(rrtype, rrclass, data.rdata_txt3));
+        break;
+    case ONE_OLD_MULTI_NEW:
+        new_list.push_back(createRdata(rrtype, rrclass, data.rdata_txt3));
+        break;
+    case DUPLICATE_NEW:
+        new_list.push_back(createRdata(rrtype, rrclass, data.rdata_txt1));
+        break;
+    case NO_OLD_ONE_NEW:
+        old_list.clear();
+        break;
+    case ONE_OLD_NO_NEW:
+        new_list.clear();
+        break;
+    }
+}
+
+template<class DecoderStyle>
+void
+RdataEncodeDecodeTest<DecoderStyle>::
+mergeRdataCommon(const vector<ConstRdataPtr>& old_rrsigs,
+                 const vector<ConstRdataPtr>& rrsigs)
+{
+    const RRClass rrclass(RRClass::IN()); // class is fixed in the test
+    vector<uint8_t> old_encoded_data;
+    vector<ConstRdataPtr> rrsigs_all;
+    vector<ConstRdataPtr> old_list;
+    vector<ConstRdataPtr> new_list;
+
+    // For each type of test Rdata, we check all modes of test scenarios.
+    for (const MergeTestData* data = merge_test_data;
+         data->type_txt;
+         ++data) {
+        const RRType rrtype(data->type_txt);
+
+        for (int mode = 0; mode <= ONE_OLD_NO_NEW; ++mode) {
+            createMergeData(mode, *data, rrclass, rrtype, old_list, new_list);
+
+            // Encode the old data
+            rdata_list_ = old_list;
+            checkEncode(rrclass, RRType(data->type_txt), rdata_list_,
+                        data->varlen_fields, old_rrsigs);
+            old_encoded_data = encoded_data_; // make a copy of the data
+
+            // Prepare new data.  rrsigs_all is set to "old_rrsigs + rrsigs".
+            // Then check the behavior in the "merge" mode.
+            const size_t old_rdata_count = rdata_list_.size();
+            rdata_list_.insert(rdata_list_.end(), new_list.begin(),
+                               new_list.end());
+            rrsigs_all = old_rrsigs;
+            rrsigs_all.insert(rrsigs_all.end(), rrsigs.begin(), rrsigs.end());
+            checkEncode(rrclass, rrtype, rdata_list_, data->varlen_fields,
+                        rrsigs_all, &old_encoded_data[0], old_rdata_count,
+                        old_rrsigs.size());
+        }
+    }
+}
+
+TYPED_TEST(RdataEncodeDecodeTest, mergeRdata) {
+    vector<ConstRdataPtr> old_rrsigs;
+    vector<ConstRdataPtr> rrsigs;
+
+    // Test without RRSIGs, either for old or new.
+    this->mergeRdataCommon(old_rrsigs, rrsigs);
+
+    // Test without RRSIG for old and with RRSIG for new.
+    rrsigs.push_back(this->rrsig_rdata_);
+    this->mergeRdataCommon(old_rrsigs, rrsigs);
+
+    // Test with RRSIG for old and without RRSIG for new.
+    rrsigs.clear();
+    old_rrsigs.push_back(this->rrsig_rdata_);
+    this->mergeRdataCommon(old_rrsigs, rrsigs);
+
+    // Tests with RRSIGs for both old and new.
+    old_rrsigs.clear();
+    rrsigs.push_back(createRdata(RRType::RRSIG(), RRClass::IN(),
+                                 "A 5 2 3600 20120814220826 "
+                                 "20120715220826 54321 com. FAKE"));
+    this->mergeRdataCommon(old_rrsigs, rrsigs);
+
+    // Tests with multiple old RRSIGs.
+    rrsigs.clear();
+    old_rrsigs.clear();
+    old_rrsigs.push_back(this->rrsig_rdata_);
+    old_rrsigs.push_back(createRdata(RRType::RRSIG(), RRClass::IN(),
+                                     "A 5 2 3600 20120814220826 "
+                                     "20120715220826 54321 com. FAKE"));
+    this->mergeRdataCommon(old_rrsigs, rrsigs);
+
+    // Tests with duplicate RRSIG in new one (keeping the old_rrsigs)
+    rrsigs.push_back(this->rrsig_rdata_);
+    this->mergeRdataCommon(old_rrsigs, rrsigs);
+}
+
+TEST_F(RdataSerializationTest, mergeRdataFromDuplicate) {
+    // create encoded data containing duplicate Rdata and try to start a
+    // new encoding session in the merge mode with that data.  It breaks the
+    // assumption and should result in an exception.
+    const uint8_t data[] = { 192, 0, 2, 1, 192, 0, 2, 1 };
+    EXPECT_THROW(encoder_.start(RRClass::IN(), RRType::A(), data, 2, 0),
+                 isc::Unexpected);
+
+    // Same for duplicate RRSIG
+    isc::util::OutputBuffer buffer(0);
+    vector<uint8_t> sigdata;
+    rrsig_rdata_->toWire(buffer);
+    const uint16_t sig_len = buffer.getLength();
+    // Encode lengths of RRSIGs, 2 bytes each, in host byte order
+    const uint8_t* const lp = static_cast<const uint8_t*>(
+        static_cast<const void*>(&sig_len));
+    sigdata.insert(sigdata.end(), lp, lp + sizeof(sig_len));
+    sigdata.insert(sigdata.end(), lp, lp + sizeof(sig_len));
+    // then the RRSIG data
+    const uint8_t* const dp = static_cast<const uint8_t*>(buffer.getData());
+    sigdata.insert(sigdata.end(), dp, dp + sig_len);
+    sigdata.insert(sigdata.end(), dp, dp + sig_len);
+
+    EXPECT_THROW(encoder_.start(RRClass::IN(), RRType::A(), &sigdata[0], 0, 2),
+                 isc::Unexpected);
+}
+
 void
 checkSigData(const ConstRdataPtr& decoded, bool* called, const void* encoded,
              size_t length)

+ 340 - 66
src/lib/datasrc/tests/memory/rdataset_unittest.cc

@@ -32,14 +32,18 @@
 
 #include <gtest/gtest.h>
 
+#include <boost/bind.hpp>
 #include <boost/lexical_cast.hpp>
 
+#include <vector>
 #include <string>
 
 using namespace isc::dns;
 using namespace isc::dns::rdata;
 using namespace isc::datasrc::memory;
 using namespace isc::testutils;
+using std::string;
+using std::vector;
 using isc::datasrc::memory::detail::SegmentObjectHolder;
 using boost::lexical_cast;
 
@@ -48,20 +52,37 @@ namespace {
 class RdataSetTest : public ::testing::Test {
 protected:
     RdataSetTest() :
+        rrclass(RRClass::IN()),
         // 1076895760 = 0x40302010.  Use this so we fill in all 8-bit "field"
         // of the 32-bit TTL
         a_rrset_(textToRRset("www.example.com. 1076895760 IN A 192.0.2.1")),
         rrsig_rrset_(textToRRset("www.example.com. 1076895760 IN RRSIG "
                                  "A 5 2 3600 20120814220826 20120715220826 "
                                  "1234 example.com. FAKE"))
-    {}
+    {
+        def_rdata_txt_.push_back("192.0.2.1");
+        def_rrsig_txt_.push_back("A 5 2 3600 20120814220826 20120715220826 "
+                                 "1234 example.com. FAKE");
+    }
     void TearDown() {
         EXPECT_TRUE(mem_sgmt_.allMemoryDeallocated());
     }
 
+    // Helper for checking common cases against both versions of create()
+    typedef boost::function<RdataSet*(isc::util::MemorySegment&, RdataEncoder&,
+                                      ConstRRsetPtr, ConstRRsetPtr)> CreateFn;
+    void checkCreateManyRRs(CreateFn create_fn, size_t n_old_rdata);
+    void checkCreateManyRRSIGs(CreateFn create_fn, size_t n_old_sig);
+    void checkBadCreate(CreateFn create_fn);
+
+    const RRClass rrclass;
     ConstRRsetPtr a_rrset_, rrsig_rrset_;
     isc::util::MemorySegmentLocal mem_sgmt_;
     RdataEncoder encoder_;
+
+    // These are placeholder for default expected values used in checkRdataSet.
+    vector<string> def_rdata_txt_;
+    vector<string> def_rrsig_txt_;
 };
 
 // Convert the given 32-bit integer (network byte order) to the corresponding
@@ -73,38 +94,60 @@ restoreTTL(const void* ttl_data) {
 }
 
 // A helper callback for checkRdataSet.  This confirms the given data
-// is the expected in::A RDATA (the value is taken from the RdataSetTest
-// constructor).
+// is the expected RDATA of the specified type.
 void
-checkData(const void* data, size_t size) {
+checkData(const void* data, size_t size, const RRType* rrtype,
+          vector<string>::const_iterator* it,
+          vector<string>::const_iterator it_end)
+{
+    ASSERT_TRUE(*it != it_end); // shouldn't reach the end yet
+
     isc::util::InputBuffer b(data, size);
-    EXPECT_EQ(0, in::A(b, size).compare(in::A("192.0.2.1")));
+    EXPECT_EQ(0, createRdata(*rrtype, RRClass::IN(), b, size)->compare(
+                  *createRdata(*rrtype, RRClass::IN(), **it)));
+    ++(*it);                    // move to the next expected data
 }
 
 // This is a set of checks for an RdataSet created with some simple
-// conditions.  with_rrset/with_rrsig is true iff the RdataSet is supposed to
-// contain normal/RRSIG RDATA.
+// conditions.  expected_data/sigs contain the RDATAs and RRSIGs that are
+// supposed to be contained in rdataset.  They can be empty if rdataset misses
+// RDATA or RRSIG (but not both).
 void
-checkRdataSet(const RdataSet& rdataset, bool with_rrset, bool with_rrsig) {
+checkRdataSet(const RdataSet& rdataset,
+              vector<string> expected_data, // we use a local copy
+              const vector<string>& expected_sigs)
+{
     EXPECT_FALSE(rdataset.next); // by default the next pointer should be NULL
     EXPECT_EQ(RRType::A(), rdataset.type);
     // See the RdataSetTest constructor for the magic number.
     EXPECT_EQ(RRTTL(1076895760), restoreTTL(rdataset.getTTLData()));
-    EXPECT_EQ(with_rrset ? 1 : 0, rdataset.getRdataCount());
-    EXPECT_EQ(with_rrsig ? 1 : 0, rdataset.getSigRdataCount());
+    EXPECT_EQ(expected_data.size(), rdataset.getRdataCount());
+    EXPECT_EQ(expected_sigs.size(), rdataset.getSigRdataCount());
+
+    // extend expected_data with sigs for the convenience of RdataReader
+    expected_data.insert(expected_data.end(), expected_sigs.begin(),
+                         expected_sigs.end());
 
-    // A simple test for the data content.  Details tests for the encoder/
+    // A simple test for the data content.  Detailed tests for the encoder/
     // reader should be basically sufficient for various cases of the data,
     // and the fact that this test doesn't detect memory leak should be
     // reasonably sufficient that the implementation handles the data region
-    // correctly.  Here we check one simple case for a simple form of RDATA,
-    // mainly for checking the behavior of getDataBuf().
+    // correctly.  Here we check one simple case for a simple form of RDATA
+    // and RRSIG, mainly for checking the behavior of getDataBuf().
+    vector<string>::const_iterator it = expected_data.begin();
+    RRType rrtype = RRType::A();
     RdataReader reader(RRClass::IN(), RRType::A(),
                        reinterpret_cast<const uint8_t*>(
                            rdataset.getDataBuf()),
                        rdataset.getRdataCount(), rdataset.getSigRdataCount(),
-                       &RdataReader::emptyNameAction, checkData);
+                       &RdataReader::emptyNameAction,
+                       boost::bind(checkData, _1, _2, &rrtype, &it,
+                                   expected_data.end()));
     reader.iterate();
+    rrtype = RRType::RRSIG();
+    reader.iterateAllSigs();
+
+    EXPECT_TRUE(it == expected_data.end());
 }
 
 TEST_F(RdataSetTest, create) {
@@ -113,10 +156,104 @@ TEST_F(RdataSetTest, create) {
     // would detect any memory leak)
     RdataSet* rdataset = RdataSet::create(mem_sgmt_, encoder_, a_rrset_,
                                           ConstRRsetPtr());
-    checkRdataSet(*rdataset, true, false);
+    checkRdataSet(*rdataset, def_rdata_txt_, vector<string>());
     RdataSet::destroy(mem_sgmt_, rdataset, RRClass::IN());
 }
 
+// This is similar to the simple create test, but we check all combinations
+// of old and new data.
+TEST_F(RdataSetTest, mergeCreate) {
+    // Prepare test data
+    const char* const a_rdatas[] = { "192.0.2.1", "192.0.2.2" };
+    const char* const sig_rdatas[] = {
+        "A 5 2 3600 20120814220826 20120715220826 1234 example.com. FAKE",
+        "A 5 2 3600 20120814220826 20120715220826 4321 example.com. FAKE" };
+    vector<ConstRRsetPtr> a_rrsets;
+    a_rrsets.push_back(textToRRset("www.example.com. 1076895760 IN A "
+                                   + string(a_rdatas[0])));
+    a_rrsets.push_back(textToRRset("www.example.com. 1076895760 IN A "
+                                   + string(a_rdatas[1])));
+    vector<ConstRRsetPtr> rrsig_rrsets;
+    rrsig_rrsets.push_back(textToRRset("www.example.com. 1076895760 IN RRSIG "
+                                       + string(sig_rdatas[0])));
+    rrsig_rrsets.push_back(textToRRset("www.example.com. 1076895760 IN RRSIG "
+                                       + string(sig_rdatas[1])));
+    ConstRRsetPtr null_rrset;   // convenience shortcut
+
+    // We are going to check all combinations of:
+    // with/without old/new RDATA/RRSIGs.
+    // counter variables i, j control the old and new data, respectively, and
+    // the meaning of the value is: bit 1: with RDATA, bit 2: with RRSIG.
+    // Note that at least one RDATA or RRSIG should be contained, so there's
+    // no case for value 0.
+    for (int i = 1; i < 4; ++i) {
+        for (int j = 1; j < 4; ++j) {
+            SCOPED_TRACE("creating merge case " + lexical_cast<string>(i) +
+                         ", " + lexical_cast<string>(j));
+            // Create old rdataset
+            SegmentObjectHolder<RdataSet, RRClass> holder1(
+                mem_sgmt_,
+                RdataSet::create(mem_sgmt_, encoder_,
+                                 (i & 1) != 0 ? a_rrsets[0] : null_rrset,
+                                 (i & 2) != 0 ? rrsig_rrsets[0] : null_rrset),
+                rrclass);
+            // Create merged rdataset, based on the old one and RRsets
+            SegmentObjectHolder<RdataSet, RRClass> holder2(
+                mem_sgmt_,
+                RdataSet::create(mem_sgmt_, encoder_,
+                                 (j & 1) != 0 ? a_rrsets[1] : null_rrset,
+                                 (j & 2) != 0 ? rrsig_rrsets[1] : null_rrset,
+                                 holder1.get()),
+                rrclass);
+
+            // Set up the expected data for the case.
+            vector<string> expected_rdata;
+            if ((i & 1) != 0) {
+                expected_rdata.push_back(a_rdatas[0]);
+            }
+            if ((j & 1) != 0) {
+                expected_rdata.push_back(a_rdatas[1]);
+            }
+            vector<string> expected_sigs;
+            if ((i & 2) != 0) {
+                expected_sigs.push_back(sig_rdatas[0]);
+            }
+            if ((j & 2) != 0) {
+                expected_sigs.push_back(sig_rdatas[1]);
+            }
+
+            // Then perform the check
+            checkRdataSet(*holder2.get(), expected_rdata, expected_sigs);
+        }
+    }
+}
+
+TEST_F(RdataSetTest, duplicate) {
+    // Create RRset and RRSIG containing duplicate RDATA.
+    ConstRRsetPtr dup_rrset =
+        textToRRset("www.example.com. 1076895760 IN A 192.0.2.1\n"
+                    "www.example.com. 1076895760 IN A 192.0.2.1\n");
+    ConstRRsetPtr dup_rrsig =
+        textToRRset("www.example.com. 1076895760 IN RRSIG " +
+                    def_rrsig_txt_[0] +
+                    "\nwww.example.com. 1076895760 IN RRSIG " +
+                    def_rrsig_txt_[0]);
+
+    // After suppressing duplicates, it should be the same as the default
+    // RdataSet.  Check that.
+    SegmentObjectHolder<RdataSet, RRClass> holder1(
+        mem_sgmt_,
+        RdataSet::create(mem_sgmt_, encoder_, dup_rrset, dup_rrsig), rrclass);
+    checkRdataSet(*holder1.get(), def_rdata_txt_, def_rrsig_txt_);
+
+    // Confirm the same thing for the merge mode.
+    SegmentObjectHolder<RdataSet, RRClass> holder2(
+        mem_sgmt_,
+        RdataSet::create(mem_sgmt_, encoder_, a_rrset_, rrsig_rrset_,
+                         holder1.get()), rrclass);
+    checkRdataSet(*holder2.get(), def_rdata_txt_, def_rrsig_txt_);
+}
+
 TEST_F(RdataSetTest, getNext) {
     RdataSet* rdataset = RdataSet::create(mem_sgmt_, encoder_, a_rrset_,
                                           ConstRRsetPtr());
@@ -192,8 +329,8 @@ TEST_F(RdataSetTest, find) {
 }
 
 // A helper function to create an RRset containing the given number of
-// unique RDATAs.
-ConstRRsetPtr
+// unique RDATAs.  We return non const pointer so that we can extend it.
+RRsetPtr
 getRRsetWithRdataCount(size_t rdata_count) {
     RRsetPtr rrset(new RRset(Name("example.com"), RRClass::IN(), RRType::TXT(),
                              RRTTL(3600)));
@@ -204,48 +341,60 @@ getRRsetWithRdataCount(size_t rdata_count) {
     return (rrset);
 }
 
-TEST_F(RdataSetTest, createManyRRs) {
-    // RRset with possible maximum number of RDATAs
-    RdataSet* rdataset = RdataSet::create(mem_sgmt_, encoder_,
-                                          getRRsetWithRdataCount(8191),
-                                          ConstRRsetPtr());
+void
+RdataSetTest::checkCreateManyRRs(CreateFn create_fn, size_t n_old_rdata) {
+    // RRset with possible maximum number of RDATAs, taking into account
+    // "pre-existing" RDATAs
+    RRsetPtr large_rrset = getRRsetWithRdataCount(8191 - n_old_rdata);
+    RdataSet* rdataset = create_fn(mem_sgmt_, encoder_, large_rrset,
+                                   ConstRRsetPtr());
     EXPECT_EQ(8191, rdataset->getRdataCount());
     EXPECT_EQ(0, rdataset->getSigRdataCount());
-    RdataSet::destroy(mem_sgmt_, rdataset, RRClass::IN());
+    RdataSet::destroy(mem_sgmt_, rdataset, rrclass);
+
+    // Duplicate RDATA will be ignored in this check.
+    large_rrset->addRdata(createRdata(RRType::TXT(), rrclass, "0"));
+    rdataset = create_fn(mem_sgmt_, encoder_, large_rrset, ConstRRsetPtr());
+    EXPECT_EQ(8191, rdataset->getRdataCount());
+    RdataSet::destroy(mem_sgmt_, rdataset, rrclass);
 
     // Exceeding that will result in an exception.
-    EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_,
-                                  getRRsetWithRdataCount(8192),
-                                  ConstRRsetPtr()),
+    large_rrset->addRdata(createRdata(RRType::TXT(), rrclass, "8192"));
+    EXPECT_THROW(create_fn(mem_sgmt_, encoder_, large_rrset, ConstRRsetPtr()),
                  RdataSetError);
     // To be very sure even try larger number than the threshold
-    EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_,
-                                  getRRsetWithRdataCount(65535),
-                                  ConstRRsetPtr()),
+    EXPECT_THROW(create_fn(mem_sgmt_, encoder_,
+                           getRRsetWithRdataCount(65535 - n_old_rdata),
+                           ConstRRsetPtr()),
                  RdataSetError);
 }
 
+TEST_F(RdataSetTest, createManyRRs) {
+    checkCreateManyRRs(boost::bind(&RdataSet::create, _1, _2, _3, _4,
+                                   static_cast<const RdataSet*>(NULL)), 0);
+}
+
+TEST_F(RdataSetTest, mergeCreateManyRRs) {
+    ConstRRsetPtr rrset = textToRRset("example.com. 3600 IN TXT some-text");
+    SegmentObjectHolder<RdataSet, RRClass> holder(
+        mem_sgmt_,
+        RdataSet::create(mem_sgmt_, encoder_, rrset, ConstRRsetPtr()),
+        RRClass::IN());
+
+    checkCreateManyRRs(boost::bind(&RdataSet::create, _1, _2, _3, _4,
+                                   holder.get()), rrset->getRdataCount());
+}
+
 TEST_F(RdataSetTest, createWithRRSIG) {
-    // Normal case.
     RdataSet* rdataset = RdataSet::create(mem_sgmt_, encoder_, a_rrset_,
                                           rrsig_rrset_);
-    checkRdataSet(*rdataset, true, true);
-    RdataSet::destroy(mem_sgmt_, rdataset, RRClass::IN());
-
-    // Unusual case: TTL doesn't match.  This implementation accepts that,
-    // using the TTL of the covered RRset.
-    ConstRRsetPtr rrsig_badttl(textToRRset(
-                                   "www.example.com. 3600 IN RRSIG "
-                                   "A 5 2 3600 20120814220826 "
-                                   "20120715220826 1234 example.com. FAKE"));
-    rdataset = RdataSet::create(mem_sgmt_, encoder_, a_rrset_, rrsig_badttl);
-    checkRdataSet(*rdataset, true, true);
+    checkRdataSet(*rdataset, def_rdata_txt_, def_rrsig_txt_);
     RdataSet::destroy(mem_sgmt_, rdataset, RRClass::IN());
 }
 
 // A helper function to create an RRSIG RRset containing the given number of
 // unique RDATAs.
-ConstRRsetPtr
+RRsetPtr
 getRRSIGWithRdataCount(size_t sig_count) {
     RRsetPtr rrset(new RRset(Name("example.com"), RRClass::IN(),
                              RRType::RRSIG(), RRTTL(3600)));
@@ -269,77 +418,111 @@ getRRSIGWithRdataCount(size_t sig_count) {
     return (rrset);
 }
 
-TEST_F(RdataSetTest, createManyRRSIGs) {
+void
+RdataSetTest::checkCreateManyRRSIGs(CreateFn create_fn, size_t n_old_sig) {
     // 7 has a special meaning in the implementation: if the number of the
     // RRSIGs reaches this value, an extra 'sig count' field will be created.
-    RdataSet* rdataset = RdataSet::create(mem_sgmt_, encoder_, a_rrset_,
-                                          getRRSIGWithRdataCount(7));
+    RdataSet* rdataset = create_fn(mem_sgmt_, encoder_, a_rrset_,
+                                   getRRSIGWithRdataCount(7 - n_old_sig));
     EXPECT_EQ(7, rdataset->getSigRdataCount());
     RdataSet::destroy(mem_sgmt_, rdataset, RRClass::IN());
 
     // 8 would cause overflow in the normal 3-bit field if there were no extra
     // count field.
-    rdataset = RdataSet::create(mem_sgmt_, encoder_, a_rrset_,
-                                getRRSIGWithRdataCount(8));
+    rdataset = create_fn(mem_sgmt_, encoder_, a_rrset_,
+                         getRRSIGWithRdataCount(8 - n_old_sig));
     EXPECT_EQ(8, rdataset->getSigRdataCount());
     RdataSet::destroy(mem_sgmt_, rdataset, RRClass::IN());
 
     // Up to 2^16-1 RRSIGs are allowed (although that would be useless
     // in practice)
-    rdataset = RdataSet::create(mem_sgmt_, encoder_, a_rrset_,
-                                getRRSIGWithRdataCount(65535));
+    RRsetPtr large_rrsig = getRRSIGWithRdataCount(65535 - n_old_sig);
+    rdataset = create_fn(mem_sgmt_, encoder_, a_rrset_, large_rrsig);
+    EXPECT_EQ(65535, rdataset->getSigRdataCount());
+    RdataSet::destroy(mem_sgmt_, rdataset, RRClass::IN());
+
+    // Duplicate shouldn't be counted
+    large_rrsig->addRdata(
+        createRdata(RRType::RRSIG(), rrclass,
+                    "A 5 2 0 20120814220826 20120715220826 1234 "
+                    "example.com. FAKE"));
+    rdataset = create_fn(mem_sgmt_, encoder_, a_rrset_, large_rrsig);
     EXPECT_EQ(65535, rdataset->getSigRdataCount());
     RdataSet::destroy(mem_sgmt_, rdataset, RRClass::IN());
 
     // Exceeding this limit will result in an exception.
-    EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_, a_rrset_,
-                                  getRRSIGWithRdataCount(65536)),
+    large_rrsig->addRdata(
+        createRdata(RRType::RRSIG(), rrclass,
+                    "A 5 2 65536 20120814220826 20120715220826 1234 "
+                    "example.com. FAKE"));
+    EXPECT_THROW(create_fn(mem_sgmt_, encoder_, a_rrset_, large_rrsig),
                  RdataSetError);
     // To be very sure even try larger number than the threshold
-    EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_, a_rrset_,
-                                  getRRSIGWithRdataCount(70000)),
+    EXPECT_THROW(create_fn(mem_sgmt_, encoder_, a_rrset_,
+                           getRRSIGWithRdataCount(70000 - n_old_sig)),
                  RdataSetError);
 }
 
+TEST_F(RdataSetTest, createManyRRSIGs) {
+    checkCreateManyRRSIGs(boost::bind(&RdataSet::create, _1, _2, _3, _4,
+                                      static_cast<const RdataSet*>(NULL)), 0);
+}
+
+TEST_F(RdataSetTest, mergeCreateManyRRSIGs) {
+    // Create "old" RRSIG that shouldn't be a duplicate of ones created in
+    // checkCreateManyRRSIGs (signature is different).
+    ConstRRsetPtr rrsig = textToRRset(
+        "example.com. 3600 IN RRSIG A 5 2 3600 20120814220826 20120715220826 "
+        "1234 example.com. FAKEFAKE");
+    SegmentObjectHolder<RdataSet, RRClass> holder(
+        mem_sgmt_,
+        RdataSet::create(mem_sgmt_, encoder_, ConstRRsetPtr(), rrsig),
+        rrclass);
+
+    checkCreateManyRRSIGs(boost::bind(&RdataSet::create, _1, _2, _3, _4,
+                                      holder.get()), rrsig->getRdataCount());
+}
+
 TEST_F(RdataSetTest, createWithRRSIGOnly) {
     // A rare, but allowed, case: RdataSet without the main RRset but with
     // RRSIG.
     RdataSet* rdataset = RdataSet::create(mem_sgmt_, encoder_, ConstRRsetPtr(),
                                           rrsig_rrset_);
-    checkRdataSet(*rdataset, false, true);
+    checkRdataSet(*rdataset, vector<string>(), def_rrsig_txt_);
     RdataSet::destroy(mem_sgmt_, rdataset, RRClass::IN());
 }
 
-TEST_F(RdataSetTest, badCeate) {
+// Checking initial validation for both versions of create().
+void
+RdataSetTest::checkBadCreate(CreateFn create_fn) {
     // Neither the RRset nor RRSIG RRset is given
-    EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_, ConstRRsetPtr(),
-                                  ConstRRsetPtr()), isc::BadValue);
+    EXPECT_THROW(create_fn(mem_sgmt_, encoder_, ConstRRsetPtr(),
+                           ConstRRsetPtr()), isc::BadValue);
 
     // Empty RRset (An RRset without RDATA)
     ConstRRsetPtr empty_rrset(new RRset(Name("example.com"), RRClass::IN(),
                                         RRType::A(), RRTTL(3600)));
-    EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_, empty_rrset,
-                                  ConstRRsetPtr()), isc::BadValue);
+    EXPECT_THROW(create_fn(mem_sgmt_, encoder_, empty_rrset,
+                           ConstRRsetPtr()), isc::BadValue);
     ConstRRsetPtr empty_rrsig(new RRset(Name("example.com"), RRClass::IN(),
                                         RRType::RRSIG(), RRTTL(3600)));
-    EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_, ConstRRsetPtr(),
-                                  empty_rrsig), isc::BadValue);
+    EXPECT_THROW(create_fn(mem_sgmt_, encoder_, ConstRRsetPtr(),
+                           empty_rrsig), isc::BadValue);
 
     // The RRset type and RRSIG's type covered don't match
     ConstRRsetPtr bad_rrsig(textToRRset(
                                 "www.example.com. 1076895760 IN RRSIG "
                                 "NS 5 2 3600 20120814220826 20120715220826 "
                                 "1234 example.com. FAKE"));
-    EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_, a_rrset_, bad_rrsig),
+    EXPECT_THROW(create_fn(mem_sgmt_, encoder_, a_rrset_, bad_rrsig),
                  isc::BadValue);
 
     // Pass non RRSIG for the sig parameter
-    EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_, a_rrset_, a_rrset_),
+    EXPECT_THROW(create_fn(mem_sgmt_, encoder_, a_rrset_, a_rrset_),
                  isc::BadValue);
 
     // Pass RRSIG for normal RRset (the RdataEncoder will catch this and throw)
-    EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_, rrsig_rrset_,
-                                  rrsig_rrset_),
+    EXPECT_THROW(create_fn(mem_sgmt_, encoder_, rrsig_rrset_, rrsig_rrset_),
                  isc::BadValue);
 
     // RR class doesn't match between RRset and RRSIG
@@ -348,8 +531,99 @@ TEST_F(RdataSetTest, badCeate) {
                                      "A 5 2 3600 20120814220826 "
                                      "20120715220826 1234 example.com. FAKE",
                                      RRClass::CH()));
+    EXPECT_THROW(create_fn(mem_sgmt_, encoder_, a_rrset_, badclass_rrsig),
+                 isc::BadValue);
+}
+
+TEST_F(RdataSetTest, badCreate) {
+    checkBadCreate(boost::bind(&RdataSet::create, _1, _2, _3, _4,
+                               static_cast<const RdataSet*>(NULL)));
+}
+
+TEST_F(RdataSetTest, badMergeCreate) {
+    // The 'old RdataSet' for merge.  Its content doesn't matter much; the test
+    // should trigger exception before examining it except for the last checks.
+    SegmentObjectHolder<RdataSet, RRClass> holder(
+        mem_sgmt_,
+        RdataSet::create(mem_sgmt_, encoder_,
+                         textToRRset("www.example.com. 0 IN AAAA 2001:db8::1"),
+                         ConstRRsetPtr()),
+        RRClass::IN());
+
+    checkBadCreate(boost::bind(&RdataSet::create, _1, _2, _3, _4,
+                               holder.get()));
+
+    // Type mismatch: this case is specific to the merge create.
     EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_, a_rrset_,
-                                  badclass_rrsig),
+                                  ConstRRsetPtr(), holder.get()),
+                 isc::BadValue);
+    EXPECT_THROW(RdataSet::create(mem_sgmt_, encoder_, ConstRRsetPtr(),
+                                  rrsig_rrset_, holder.get()),
                  isc::BadValue);
 }
+
+TEST_F(RdataSetTest, varyingTTL) {
+    // Creating RdataSets with different TTLs.  The lowest one should win.
+
+    ConstRRsetPtr aaaa_smaller = textToRRset("example. 5 IN AAAA 2001:db8::");
+    ConstRRsetPtr aaaa_small = textToRRset("example. 10 IN AAAA 2001:db8::1");
+    ConstRRsetPtr aaaa_large = textToRRset("example. 20 IN AAAA 2001:db8::2");
+    ConstRRsetPtr sig_smaller =
+        textToRRset("www.example.com. 5 IN RRSIG AAAA 5 2 3600 "
+                    "20120814220826 20120715220826 1111 example.com. FAKE");
+    ConstRRsetPtr sig_small =
+        textToRRset("www.example.com. 10 IN RRSIG AAAA 5 2 3600 "
+                    "20120814220826 20120715220826 1234 example.com. FAKE");
+    ConstRRsetPtr sig_large =
+        textToRRset("www.example.com. 20 IN RRSIG AAAA 5 2 3600 "
+                    "20120814220826 20120715220826 4321 example.com. FAKE");
+
+    // RRSIG's TTL is larger
+    RdataSet* rdataset = RdataSet::create(mem_sgmt_, encoder_, aaaa_small,
+                                          sig_large);
+    EXPECT_EQ(RRTTL(10), restoreTTL(rdataset->getTTLData()));
+    RdataSet::destroy(mem_sgmt_, rdataset, rrclass);
+
+    // RRSIG's TTL is smaller
+    SegmentObjectHolder<RdataSet, RRClass> holder1(
+        mem_sgmt_,
+        RdataSet::create(mem_sgmt_, encoder_, aaaa_large, sig_small), rrclass);
+    EXPECT_EQ(RRTTL(10), restoreTTL(holder1.get()->getTTLData()));
+
+    // Merging another RRset (w/o sig) that has larger TTL
+    rdataset = RdataSet::create(mem_sgmt_, encoder_, aaaa_large,
+                                ConstRRsetPtr(), holder1.get());
+    EXPECT_EQ(RRTTL(10), restoreTTL(rdataset->getTTLData()));
+    RdataSet::destroy(mem_sgmt_, rdataset, rrclass);
+
+    // Merging another RRset (w/o sig) that has smaller TTL
+    rdataset = RdataSet::create(mem_sgmt_, encoder_, aaaa_smaller,
+                                ConstRRsetPtr(), holder1.get());
+    EXPECT_EQ(RRTTL(5), restoreTTL(rdataset->getTTLData()));
+    RdataSet::destroy(mem_sgmt_, rdataset, rrclass);
+
+    // Merging another RRSIG (w/o RRset) that has larger TTL
+    rdataset = RdataSet::create(mem_sgmt_, encoder_, ConstRRsetPtr(),
+                                sig_large, holder1.get());
+    EXPECT_EQ(RRTTL(10), restoreTTL(rdataset->getTTLData()));
+    RdataSet::destroy(mem_sgmt_, rdataset, rrclass);
+
+    // Merging another RRSIG (w/o RRset) that has smaller TTL
+    rdataset = RdataSet::create(mem_sgmt_, encoder_, ConstRRsetPtr(),
+                                sig_smaller, holder1.get());
+    EXPECT_EQ(RRTTL(5), restoreTTL(rdataset->getTTLData()));
+    RdataSet::destroy(mem_sgmt_, rdataset, rrclass);
+
+    // Merging another RRset and RRSIG that have larger TTL
+    rdataset = RdataSet::create(mem_sgmt_, encoder_, aaaa_large, sig_large,
+                                holder1.get());
+    EXPECT_EQ(RRTTL(10), restoreTTL(rdataset->getTTLData()));
+    RdataSet::destroy(mem_sgmt_, rdataset, rrclass);
+
+    // Merging another RRset and RRSIG that have smaller TTL
+    rdataset = RdataSet::create(mem_sgmt_, encoder_, aaaa_smaller, sig_smaller,
+                                holder1.get());
+    EXPECT_EQ(RRTTL(5), restoreTTL(rdataset->getTTLData()));
+    RdataSet::destroy(mem_sgmt_, rdataset, rrclass);
+}
 }