Browse Source

merge trac215(zone manager) and trac289(notify-out)


git-svn-id: svn://bind10.isc.org/svn/bind10/trunk@2736 e5f2f494-b856-4b98-b285-d166d9295462
Jerry 15 years ago
parent
commit
454a8f667a

+ 11 - 0
ChangeLog

@@ -1,3 +1,14 @@
+  87.   [func]      zhanglikun
+	lib/python/isc/notifyout: Add the feature of notify-out, when 
+	zone axfr/ixfr finishing, the server will notify its slaves.
+	(Trac #289, svn r2735)
+
+  86.   [func]      jerry
+    bin/zonemgr: Added zone manager module. The zone manager is	one 
+	of the co-operating processes of BIND10, which keeps track of 
+	timers and other information necessary for BIND10 to act as a 
+	slave. (Trac #215, svn r2735)
+
   85.	[build]*	jinmei
   85.	[build]*	jinmei
 	Build programs using dynamic link by default.  A new configure
 	Build programs using dynamic link by default.  A new configure
 	option --enable-static-link is provided to force static link for
 	option --enable-static-link is provided to force static link for

+ 11 - 0
configure.ac

@@ -411,6 +411,8 @@ AC_CONFIG_FILES([Makefile
                  src/bin/xfrin/tests/Makefile
                  src/bin/xfrin/tests/Makefile
                  src/bin/xfrout/Makefile
                  src/bin/xfrout/Makefile
                  src/bin/xfrout/tests/Makefile
                  src/bin/xfrout/tests/Makefile
+                 src/bin/zonemgr/Makefile
+                 src/bin/zonemgr/tests/Makefile
                  src/bin/usermgr/Makefile
                  src/bin/usermgr/Makefile
                  src/lib/Makefile
                  src/lib/Makefile
                  src/lib/bench/Makefile
                  src/lib/bench/Makefile
@@ -427,6 +429,8 @@ AC_CONFIG_FILES([Makefile
                  src/lib/python/isc/config/tests/Makefile
                  src/lib/python/isc/config/tests/Makefile
                  src/lib/python/isc/log/Makefile
                  src/lib/python/isc/log/Makefile
                  src/lib/python/isc/log/tests/Makefile
                  src/lib/python/isc/log/tests/Makefile
+                 src/lib/python/isc/notify/Makefile
+                 src/lib/python/isc/notify/tests/Makefile
                  src/lib/config/Makefile
                  src/lib/config/Makefile
                  src/lib/config/tests/Makefile
                  src/lib/config/tests/Makefile
                  src/lib/dns/Makefile
                  src/lib/dns/Makefile
@@ -453,6 +457,10 @@ AC_OUTPUT([src/bin/cfgmgr/b10-cfgmgr.py
            src/bin/xfrout/xfrout.spec.pre
            src/bin/xfrout/xfrout.spec.pre
            src/bin/xfrout/tests/xfrout_test
            src/bin/xfrout/tests/xfrout_test
            src/bin/xfrout/run_b10-xfrout.sh
            src/bin/xfrout/run_b10-xfrout.sh
+           src/bin/zonemgr/zonemgr.py
+           src/bin/zonemgr/zonemgr.spec.pre
+           src/bin/zonemgr/tests/zonemgr_test
+           src/bin/zonemgr/run_b10-zonemgr.sh
            src/bin/bind10/bind10.py
            src/bin/bind10/bind10.py
            src/bin/bind10/tests/bind10_test
            src/bin/bind10/tests/bind10_test
            src/bin/bind10/run_bind10.sh
            src/bin/bind10/run_bind10.sh
@@ -474,6 +482,7 @@ AC_OUTPUT([src/bin/cfgmgr/b10-cfgmgr.py
            src/lib/python/isc/config/tests/config_test
            src/lib/python/isc/config/tests/config_test
            src/lib/python/isc/cc/tests/cc_test
            src/lib/python/isc/cc/tests/cc_test
            src/lib/python/isc/log/tests/log_test
            src/lib/python/isc/log/tests/log_test
+           src/lib/python/isc/notify/tests/notify_out_test
            src/lib/dns/gen-rdatacode.py
            src/lib/dns/gen-rdatacode.py
            src/lib/python/bind10_config.py
            src/lib/python/bind10_config.py
            src/lib/dns/tests/testdata/gen-wiredata.py
            src/lib/dns/tests/testdata/gen-wiredata.py
@@ -482,10 +491,12 @@ AC_OUTPUT([src/bin/cfgmgr/b10-cfgmgr.py
            chmod +x src/bin/cmdctl/run_b10-cmdctl.sh
            chmod +x src/bin/cmdctl/run_b10-cmdctl.sh
            chmod +x src/bin/xfrin/run_b10-xfrin.sh
            chmod +x src/bin/xfrin/run_b10-xfrin.sh
            chmod +x src/bin/xfrout/run_b10-xfrout.sh
            chmod +x src/bin/xfrout/run_b10-xfrout.sh
+           chmod +x src/bin/zonemgr/run_b10-zonemgr.sh
            chmod +x src/bin/bind10/run_bind10.sh
            chmod +x src/bin/bind10/run_bind10.sh
            chmod +x src/bin/cmdctl/tests/cmdctl_test
            chmod +x src/bin/cmdctl/tests/cmdctl_test
            chmod +x src/bin/xfrin/tests/xfrin_test
            chmod +x src/bin/xfrin/tests/xfrin_test
            chmod +x src/bin/xfrout/tests/xfrout_test
            chmod +x src/bin/xfrout/tests/xfrout_test
+           chmod +x src/bin/zonemgr/tests/zonemgr_test
            chmod +x src/bin/bindctl/tests/bindctl_test
            chmod +x src/bin/bindctl/tests/bindctl_test
            chmod +x src/bin/bindctl/run_bindctl.sh
            chmod +x src/bin/bindctl/run_bindctl.sh
            chmod +x src/bin/loadzone/run_loadzone.sh
            chmod +x src/bin/loadzone/run_loadzone.sh

+ 1 - 1
src/bin/Makefile.am

@@ -1 +1 @@
-SUBDIRS = bind10 bindctl cfgmgr loadzone msgq host cmdctl auth xfrin xfrout usermgr
+SUBDIRS = bind10 bindctl cfgmgr loadzone msgq host cmdctl auth xfrin xfrout usermgr zonemgr

+ 4 - 5
src/bin/auth/auth_srv.cc

@@ -327,7 +327,6 @@ AuthSrvImpl::processNormalQuery(const IOMessage& io_message, Message& message,
     return (true);
     return (true);
 }
 }
 
 
-
 bool
 bool
 AuthSrvImpl::processAxfrQuery(const IOMessage& io_message, Message& message,
 AuthSrvImpl::processAxfrQuery(const IOMessage& io_message, Message& message,
                             MessageRenderer& response_renderer)
                             MessageRenderer& response_renderer)
@@ -423,7 +422,7 @@ AuthSrvImpl::processNotify(const IOMessage& io_message, Message& message,
     static const string command_template_start =
     static const string command_template_start =
         "{\"command\": [\"notify\", {\"zone_name\" : \"";
         "{\"command\": [\"notify\", {\"zone_name\" : \"";
     static const string command_template_master = "\", \"master\" : \"";
     static const string command_template_master = "\", \"master\" : \"";
-    static const string command_template_rrclass = "\", \"rrclass\" : \"";
+    static const string command_template_rrclass = "\", \"zone_class\" : \"";
     static const string command_template_end = "\"}]}";
     static const string command_template_end = "\"}]}";
 
 
     try {
     try {
@@ -433,7 +432,7 @@ AuthSrvImpl::processNotify(const IOMessage& io_message, Message& message,
                 command_template_rrclass + question->getClass().toText() +
                 command_template_rrclass + question->getClass().toText() +
                 command_template_end);
                 command_template_end);
         const unsigned int seq =
         const unsigned int seq =
-            xfrin_session_->group_sendmsg(notify_command, "Xfrin",
+            xfrin_session_->group_sendmsg(notify_command, "Zonemgr",
                                           "*", "*");
                                           "*", "*");
         ElementPtr env, answer, parsed_answer;
         ElementPtr env, answer, parsed_answer;
         xfrin_session_->group_recvmsg(env, answer, false, seq);
         xfrin_session_->group_recvmsg(env, answer, false, seq);
@@ -441,14 +440,14 @@ AuthSrvImpl::processNotify(const IOMessage& io_message, Message& message,
         parsed_answer = parseAnswer(rcode, answer);
         parsed_answer = parseAnswer(rcode, answer);
         if (rcode != 0) {
         if (rcode != 0) {
             if (verbose_mode_) {
             if (verbose_mode_) {
-                cerr << "[b10-auth] failed to notify Xfrin: "
+                cerr << "[b10-auth] failed to notify Zonemgr: "
                      << parsed_answer->str() << endl; 
                      << parsed_answer->str() << endl; 
             }
             }
             return (false);
             return (false);
         }
         }
     } catch (const Exception& ex) {
     } catch (const Exception& ex) {
         if (verbose_mode_) {
         if (verbose_mode_) {
-            cerr << "[b10-auth] failed to notify Xfrin: " << ex.what() << endl;
+            cerr << "[b10-auth] failed to notify Zonemgr: " << ex.what() << endl;
         }
         }
         return (false);
         return (false);
     }
     }

+ 22 - 0
src/bin/bind10/bind10.py.in

@@ -398,6 +398,26 @@ class BoB:
             sys.stdout.write("[bind10] Started b10-xfrin (PID %d)\n" % 
             sys.stdout.write("[bind10] Started b10-xfrin (PID %d)\n" % 
                              xfrind.pid)
                              xfrind.pid)
 
 
+        # start b10-zonemgr
+        zonemgr_args = ['b10-zonemgr']
+        if self.verbose:
+            sys.stdout.write("[bind10] Starting b10-zonemgr\n")
+            zonemgr_args += ['-v']
+        try:
+            zonemgr = ProcessInfo("b10-zonemgr", zonemgr_args,
+                                 c_channel_env)
+        except Exception as e:
+            c_channel.process.kill()
+            bind_cfgd.process.kill()
+            xfrout.process.kill()
+            auth.process.kill()
+            xfrind.process.kill()
+            return "Unable to start b10-zonemgr; " + str(e)
+        self.processes[zonemgr.pid] = zonemgr 
+        if self.verbose:
+            sys.stdout.write("[bind10] Started b10-zonemgr(PID %d)\n" % 
+                             zonemgr.pid)
+
         # start the b10-cmdctl
         # start the b10-cmdctl
         # XXX: we hardcode port 8080
         # XXX: we hardcode port 8080
         cmdctl_args = ['b10-cmdctl']
         cmdctl_args = ['b10-cmdctl']
@@ -413,6 +433,7 @@ class BoB:
             xfrout.process.kill()
             xfrout.process.kill()
             auth.process.kill()
             auth.process.kill()
             xfrind.process.kill()
             xfrind.process.kill()
+            zonemgr.process.kill()
             return "Unable to start b10-cmdctl; " + str(e)
             return "Unable to start b10-cmdctl; " + str(e)
         self.processes[cmd_ctrld.pid] = cmd_ctrld
         self.processes[cmd_ctrld.pid] = cmd_ctrld
         if self.verbose:
         if self.verbose:
@@ -431,6 +452,7 @@ class BoB:
         self.cc_session.group_sendmsg(cmd, "Boss", "Auth")
         self.cc_session.group_sendmsg(cmd, "Boss", "Auth")
         self.cc_session.group_sendmsg(cmd, "Boss", "Xfrout")
         self.cc_session.group_sendmsg(cmd, "Boss", "Xfrout")
         self.cc_session.group_sendmsg(cmd, "Boss", "Xfrin")
         self.cc_session.group_sendmsg(cmd, "Boss", "Xfrin")
+        self.cc_session.group_sendmsg(cmd, "Boss", "Zonemgr")
 
 
     def stop_process(self, process):
     def stop_process(self, process):
         """Stop the given process, friendly-like."""
         """Stop the given process, friendly-like."""

+ 1 - 1
src/bin/bind10/run_bind10.sh.in

@@ -20,7 +20,7 @@ export PYTHON_EXEC
 
 
 BIND10_PATH=@abs_top_builddir@/src/bin/bind10
 BIND10_PATH=@abs_top_builddir@/src/bin/bind10
 
 
-PATH=@abs_top_builddir@/src/bin/msgq:@abs_top_builddir@/src/bin/auth:@abs_top_builddir@/src/bin/cfgmgr:@abs_top_builddir@/src/bin/cmdctl:@abs_top_builddir@/src/bin/xfrin:@abs_top_builddir@/src/bin/xfrout:$PATH
+PATH=@abs_top_builddir@/src/bin/msgq:@abs_top_builddir@/src/bin/auth:@abs_top_builddir@/src/bin/cfgmgr:@abs_top_builddir@/src/bin/cmdctl:@abs_top_builddir@/src/bin/xfrin:@abs_top_builddir@/src/bin/xfrout:@abs_top_builddir@/src/bin/zonemgr:$PATH
 export PATH
 export PATH
 
 
 PYTHONPATH=@abs_top_builddir@/src/lib/python:@abs_top_builddir@/src/lib/dns/python/.libs:@abs_top_builddir@/src/lib/xfr/.libs
 PYTHONPATH=@abs_top_builddir@/src/lib/python:@abs_top_builddir@/src/lib/dns/python/.libs:@abs_top_builddir@/src/lib/xfr/.libs

+ 47 - 17
src/bin/xfrin/tests/xfrin_test.py

@@ -63,6 +63,9 @@ class MockXfrin(Xfrin):
 
 
     def _cc_setup(self):
     def _cc_setup(self):
         pass
         pass
+
+    def _get_db_file(self):
+        pass
     
     
     def _cc_check_command(self):
     def _cc_check_command(self):
         self._shutdown_event.set()
         self._shutdown_event.set()
@@ -408,11 +411,16 @@ class TestXfrin(unittest.TestCase):
     def tearDown(self):
     def tearDown(self):
         self.xfr.shutdown()
         self.xfr.shutdown()
 
 
-    def _do_parse(self):
-        return self.xfr._parse_cmd_params(self.args)
+    def _do_parse_zone_name_class(self):
+        return self.xfr._parse_zone_name_and_class(self.args)
+
+    def _do_parse_master_port(self):
+        return self.xfr._parse_master_and_port(self.args)
 
 
     def test_parse_cmd_params(self):
     def test_parse_cmd_params(self):
-        name, rrclass, master_addrinfo, db_file = self._do_parse()
+        name, rrclass = self._do_parse_zone_name_class()
+        master_addrinfo = self._do_parse_master_port()
+        db_file = self.args.get('db_file')
         self.assertEqual(master_addrinfo[4][1], int(TEST_MASTER_PORT))
         self.assertEqual(master_addrinfo[4][1], int(TEST_MASTER_PORT))
         self.assertEqual(name, TEST_ZONE_NAME)
         self.assertEqual(name, TEST_ZONE_NAME)
         self.assertEqual(rrclass, TEST_RRCLASS)
         self.assertEqual(rrclass, TEST_RRCLASS)
@@ -421,49 +429,50 @@ class TestXfrin(unittest.TestCase):
 
 
     def test_parse_cmd_params_default_port(self):
     def test_parse_cmd_params_default_port(self):
         del self.args['port']
         del self.args['port']
-        master_addrinfo = self._do_parse()[2]
+        master_addrinfo = self._do_parse_master_port()
         self.assertEqual(master_addrinfo[4][1], 53)
         self.assertEqual(master_addrinfo[4][1], 53)
 
 
     def test_parse_cmd_params_ip6master(self):
     def test_parse_cmd_params_ip6master(self):
         self.args['master'] = TEST_MASTER_IPV6_ADDRESS
         self.args['master'] = TEST_MASTER_IPV6_ADDRESS
-        master_addrinfo = self._do_parse()[2]
+        master_addrinfo = self._do_parse_master_port()
         self.assertEqual(master_addrinfo[4][0], TEST_MASTER_IPV6_ADDRESS)
         self.assertEqual(master_addrinfo[4][0], TEST_MASTER_IPV6_ADDRESS)
 
 
     def test_parse_cmd_params_chclass(self):
     def test_parse_cmd_params_chclass(self):
-        self.args['rrclass'] = 'CH'
-        self.assertEqual(self._do_parse()[1], RRClass.CH())
+        self.args['zone_class'] = 'CH'
+        self.assertEqual(self._do_parse_zone_name_class()[1], RRClass.CH())
 
 
     def test_parse_cmd_params_bogusclass(self):
     def test_parse_cmd_params_bogusclass(self):
-        self.args['rrclass'] = 'XXX'
-        self.assertRaises(XfrinException, self._do_parse)
+        self.args['zone_class'] = 'XXX'
+        self.assertRaises(XfrinException, self._do_parse_zone_name_class)
 
 
     def test_parse_cmd_params_nozone(self):
     def test_parse_cmd_params_nozone(self):
         # zone name is mandatory.
         # zone name is mandatory.
         del self.args['zone_name']
         del self.args['zone_name']
-        self.assertRaises(XfrinException, self._do_parse)
+        self.assertRaises(XfrinException, self._do_parse_zone_name_class)
 
 
     def test_parse_cmd_params_nomaster(self):
     def test_parse_cmd_params_nomaster(self):
         # master address is mandatory.
         # master address is mandatory.
         del self.args['master']
         del self.args['master']
-        self.assertRaises(XfrinException, self._do_parse)
+        master_addrinfo = self._do_parse_master_port()
+        self.assertEqual(master_addrinfo[4][0], DEFAULT_MASTER)
 
 
     def test_parse_cmd_params_bad_ip4(self):
     def test_parse_cmd_params_bad_ip4(self):
         self.args['master'] = '3.3.3.3.3'
         self.args['master'] = '3.3.3.3.3'
-        self.assertRaises(XfrinException, self._do_parse)
+        self.assertRaises(XfrinException, self._do_parse_master_port)
 
 
     def test_parse_cmd_params_bad_ip6(self):
     def test_parse_cmd_params_bad_ip6(self):
         self.args['master'] = '1::1::1'
         self.args['master'] = '1::1::1'
-        self.assertRaises(XfrinException, self._do_parse)
+        self.assertRaises(XfrinException, self._do_parse_master_port)
 
 
     def test_parse_cmd_params_bad_port(self):
     def test_parse_cmd_params_bad_port(self):
         self.args['port'] = '-1'
         self.args['port'] = '-1'
-        self.assertRaises(XfrinException, self._do_parse)
+        self.assertRaises(XfrinException, self._do_parse_master_port)
 
 
         self.args['port'] = '65536'
         self.args['port'] = '65536'
-        self.assertRaises(XfrinException, self._do_parse)
+        self.assertRaises(XfrinException, self._do_parse_master_port)
 
 
         self.args['port'] = 'http'
         self.args['port'] = 'http'
-        self.assertRaises(XfrinException, self._do_parse)
+        self.assertRaises(XfrinException, self._do_parse_master_port)
 
 
     def test_command_handler_shutdown(self):
     def test_command_handler_shutdown(self):
         self.assertEqual(self.xfr.command_handler("shutdown",
         self.assertEqual(self.xfr.command_handler("shutdown",
@@ -518,11 +527,32 @@ class TestXfrin(unittest.TestCase):
         self.args['master'] = TEST_MASTER_IPV6_ADDRESS
         self.args['master'] = TEST_MASTER_IPV6_ADDRESS
         # ...but right now we disable the feature due to security concerns.
         # ...but right now we disable the feature due to security concerns.
         self.assertEqual(self.xfr.command_handler("notify",
         self.assertEqual(self.xfr.command_handler("notify",
-                                                  self.args)['result'][0], 1)
+                                                  self.args)['result'][0], 0)
 
 
     def test_command_handler_unknown(self):
     def test_command_handler_unknown(self):
         self.assertEqual(self.xfr.command_handler("xxx", None)['result'][0], 1)
         self.assertEqual(self.xfr.command_handler("xxx", None)['result'][0], 1)
 
 
+    def test_command_handler_transfers_in(self):
+        self.assertEqual(self.xfr.config_handler({})['result'][0], 0)
+        self.assertEqual(self.xfr.config_handler({'transfers_in': 3})['result'][0], 0)
+        self.assertEqual(self.xfr._max_transfers_in, 3)
+
+    def test_command_handler_masters(self):
+        master_info = {'master_addr': '1.1.1.1', 'master_port':53}
+        self.assertEqual(self.xfr.config_handler(master_info)['result'][0], 0)
+
+        master_info = {'master_addr': '1111.1.1.1', 'master_port':53 }
+        self.assertEqual(self.xfr.config_handler(master_info)['result'][0], 1)
+
+        master_info = {'master_addr': '2.2.2.2', 'master_port':530000 }
+        self.assertEqual(self.xfr.config_handler(master_info)['result'][0], 1)
+
+        master_info = {'master_addr': '2.2.2.2', 'master_port':53 } 
+        self.xfr.config_handler(master_info)
+        self.assertEqual(self.xfr._master_addr, '2.2.2.2')
+        self.assertEqual(self.xfr._master_port, 53)
+
+
 def raise_interrupt():
 def raise_interrupt():
     raise KeyboardInterrupt()
     raise KeyboardInterrupt()
 
 

+ 113 - 82
src/bin/xfrin/xfrin.py.in

@@ -28,6 +28,7 @@ import socket
 import random
 import random
 from optparse import OptionParser, OptionValueError
 from optparse import OptionParser, OptionValueError
 from isc.config.ccsession import *
 from isc.config.ccsession import *
+from isc.notify import notify_out
 try:
 try:
     from libdns_python import *
     from libdns_python import *
 except ImportError as e:
 except ImportError as e:
@@ -49,13 +50,17 @@ else:
 SPECFILE_LOCATION = SPECFILE_PATH + "/xfrin.spec"
 SPECFILE_LOCATION = SPECFILE_PATH + "/xfrin.spec"
 AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec"
 AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec"
 
 
-
+XFROUT_MODULE_NAME = 'Xfrout'
+ZONE_MANAGER_MODULE_NAME = 'Zonemgr'
+REFRESH_FROM_ZONEMGR = 'refresh_from_zonemgr'
+ZONE_XFRIN_FAILED = 'zone_xfrin_failed'
 __version__ = 'BIND10'
 __version__ = 'BIND10'
 # define xfrin rcode
 # define xfrin rcode
 XFRIN_OK = 0
 XFRIN_OK = 0
 XFRIN_FAIL = 1
 XFRIN_FAIL = 1
 
 
 DEFAULT_MASTER_PORT = '53'
 DEFAULT_MASTER_PORT = '53'
+DEFAULT_MASTER = '127.0.0.1'
 
 
 def log_error(msg):
 def log_error(msg):
     sys.stderr.write("[b10-xfrin] %s\n" % str(msg))
     sys.stderr.write("[b10-xfrin] %s\n" % str(msg))
@@ -316,14 +321,15 @@ class XfrinConnection(asyncore.dispatcher):
             sys.stdout.write('[b10-xfrin] %s\n' % str(msg))
             sys.stdout.write('[b10-xfrin] %s\n' % str(msg))
 
 
 
 
-def process_xfrin(xfrin_recorder, zone_name, rrclass, db_file, 
+def process_xfrin(server, xfrin_recorder, zone_name, rrclass, db_file, 
                   shutdown_event, master_addrinfo, check_soa, verbose):
                   shutdown_event, master_addrinfo, check_soa, verbose):
     xfrin_recorder.increment(zone_name)
     xfrin_recorder.increment(zone_name)
     sock_map = {}
     sock_map = {}
     conn = XfrinConnection(sock_map, zone_name, rrclass, db_file,
     conn = XfrinConnection(sock_map, zone_name, rrclass, db_file,
                            shutdown_event, master_addrinfo, verbose)
                            shutdown_event, master_addrinfo, verbose)
     if conn.connect_to_master():
     if conn.connect_to_master():
-        conn.do_xfrin(check_soa)
+        ret = conn.do_xfrin(check_soa)
+        server.publish_xfrin_news(zone_name, rrclass, ret)
 
 
     xfrin_recorder.decrement(zone_name)
     xfrin_recorder.decrement(zone_name)
 
 
@@ -358,32 +364,54 @@ class XfrinRecorder:
 
 
 class Xfrin:
 class Xfrin:
     def __init__(self, verbose = False):
     def __init__(self, verbose = False):
-        self._cc_setup()
         self._max_transfers_in = 10
         self._max_transfers_in = 10
+        #TODO, this is the temp way to set the zone's master.
+        self._master_addr = DEFAULT_MASTER
+        self._master_port = DEFAULT_MASTER_PORT
+        self._cc_setup()
         self.recorder = XfrinRecorder()
         self.recorder = XfrinRecorder()
         self._shutdown_event = threading.Event()
         self._shutdown_event = threading.Event()
         self._verbose = verbose
         self._verbose = verbose
 
 
     def _cc_setup(self):
     def _cc_setup(self):
-        '''
-This method is used only as part of initialization, but is implemented
-separately for convenience of unit tests; by letting the test code override
-this method we can test most of this class without requiring a command channel.
-'''
-        self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
+        '''This method is used only as part of initialization, but is 
+        implemented separately for convenience of unit tests; by letting 
+        the test code override this method we can test most of this class 
+        without requiring a command channel.'''
+        # Create one session for sending command to other modules, because the 
+        # listening session will block the send operation.
+        self._send_cc_session = isc.cc.Session()
+        self._module_cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
                                               self.config_handler,
                                               self.config_handler,
                                               self.command_handler)
                                               self.command_handler)
-        self._cc.start()
+        self._module_cc.start()
+        config_data = self._module_cc.get_full_config()
+        self._max_transfers_in = config_data.get("transfers_in")
+        self._master_addr = config_data.get('master_addr') or self._master_addr
+        self._master_port = config_data.get('master_port') or self._master_port
 
 
     def _cc_check_command(self):
     def _cc_check_command(self):
-        '''
-This is a straightforward wrapper for cc.check_command, but provided as
-a separate method for the convenience of unit tests.
-'''
-        self._cc.check_command()
+        '''This is a straightforward wrapper for cc.check_command, 
+        but provided as a separate method for the convenience 
+        of unit tests.'''
+        self._module_cc.check_command()
 
 
     def config_handler(self, new_config):
     def config_handler(self, new_config):
-        # TODO, process new config data
+        self._max_transfers_in = new_config.get("transfers_in") or self._max_transfers_in
+        if ('master_addr' in new_config) or ('master_port' in new_config):
+            # Check if the new master is valid, there should be library for check it.
+            # and user should change the port and address together.
+            try:
+                addr = new_config.get('master_addr') or self._master_addr
+                port = new_config.get('master_port') or self._master_port
+                check_addr_port(addr, port)
+                self._master_addr = addr
+                self._master_port = port
+            except:
+                errmsg = "bad format for zone's master: " + str(new_config)
+                log_error(errmsg)
+                return create_answer(1, errmsg)
+
         return create_answer(0)
         return create_answer(0)
 
 
     def shutdown(self):
     def shutdown(self):
@@ -397,95 +425,97 @@ a separate method for the convenience of unit tests.
                 continue
                 continue
             th.join()
             th.join()
 
 
-
     def command_handler(self, command, args):
     def command_handler(self, command, args):
         answer = create_answer(0)
         answer = create_answer(0)
         try:
         try:
             if command == 'shutdown':
             if command == 'shutdown':
                 self._shutdown_event.set()
                 self._shutdown_event.set()
-            elif command == 'retransfer' or command == 'refresh':
-                (zone_name, rrclass,
-                 master_addr, db_file) = self._parse_cmd_params(args)
-                ret = self.xfrin_start(zone_name, rrclass, db_file,
+            elif command == 'notify' or command == REFRESH_FROM_ZONEMGR:
+                # Xfrin receives the refresh/notify command from zone manager. 
+                # notify command maybe has the parameters which 
+                # specify the notifyfrom address and port, according the RFC1996, zone
+                # transfer should starts first from the notifyfrom, but now, let 'TODO' it.
+                (zone_name, rrclass) = self._parse_zone_name_and_class(args)
+                (master_addr) = check_addr_port(self._master_addr, self._master_port)
+                ret = self.xfrin_start(zone_name, 
+                                       rrclass, 
+                                       self._get_db_file(),
                                        master_addr,
                                        master_addr,
-                                   False if command == 'retransfer' else True)
+                                       True)
                 answer = create_answer(ret[0], ret[1])
                 answer = create_answer(ret[0], ret[1])
-            elif command == 'notify':
-                # This is the temporary implementation for notify.
-                # actually the notfiy command should be sent to the
-                # Zone Manager module.  Being temporary, we separate this case
-                # from refresh/retransfer while we could (and should otherwise)
-                # share the code.
-                (zone_name, rrclass,
-                 master_addr, db_file) = self._parse_cmd_params(args)
-
-                # XXX: master_addr is the sender of the notify message.
-                # It's very dangerous to naively trust it as the source of
-                # subsequent zone transfer; any remote node can easily exploit
-                # it to mount zone poisoning or DoS attacks.  We should
-                # locally identify the appropriate set of master servers.
-                # For now, we disable the code below.
-                master_is_valid = False
-
-                if master_is_valid:
-                    ret = self.xfrin_start(zone_name, rrclass, db_file,
-                                           master_addr, True)
-                else:
-                    errmsg = 'Failed to validate the master address ('
-                    errmsg += args['master'] + '), ignoring notify'
-                    ret = [1, errmsg]
+
+            elif command == 'retransfer' or command == 'refresh':
+                # Xfrin receives the retransfer/refresh from cmdctl(sent by bindctl).
+                # If the command has specified master address, do transfer from the 
+                # master address, or else do transfer from the configured masters.                
+                (zone_name, rrclass) = self._parse_zone_name_and_class(args)
+                master_addr = self._parse_master_and_port(args)
+                db_file = args.get('db_file') or self._get_db_file()
+                ret = self.xfrin_start(zone_name, 
+                                       rrclass, 
+                                       db_file, 
+                                       master_addr,
+                                       (False if command == 'retransfer' else True))
                 answer = create_answer(ret[0], ret[1])
                 answer = create_answer(ret[0], ret[1])
+
             else:
             else:
                 answer = create_answer(1, 'unknown command: ' + command)
                 answer = create_answer(1, 'unknown command: ' + command)
-
         except XfrinException as err:
         except XfrinException as err:
+            log_error('error happened for command: %s, %s' % (command, str(err)) )
             answer = create_answer(1, str(err))
             answer = create_answer(1, str(err))
-
         return answer
         return answer
 
 
-    def _parse_cmd_params(self, args):
+    def _parse_zone_name_and_class(self, args):
         zone_name = args.get('zone_name')
         zone_name = args.get('zone_name')
         if not zone_name:
         if not zone_name:
             raise XfrinException('zone name should be provided')
             raise XfrinException('zone name should be provided')
 
 
-        rrclass = args.get('rrclass')
+        rrclass = args.get('zone_class')
         if not rrclass:
         if not rrclass:
-            # The default RR class is IN.  We should fix this so that
-            # the class is always passed in the command arg (where we specify
-            # the default)
             rrclass = RRClass.IN()
             rrclass = RRClass.IN()
         else:
         else:
             try:
             try:
                 rrclass = RRClass(rrclass)
                 rrclass = RRClass(rrclass)
             except InvalidRRClass as e:
             except InvalidRRClass as e:
                 raise XfrinException('invalid RRClass: ' + rrclass)
                 raise XfrinException('invalid RRClass: ' + rrclass)
-
-        master = args.get('master')
-        if not master:
-            raise XfrinException('master address should be provided')
-
-        port_str = args.get('port')
-        if not port_str:
-            port_str = DEFAULT_MASTER_PORT
-        master_addrinfo = check_addr_port(master, port_str)
-
-        db_file = args.get('db_file')
-        if not db_file:
-            #TODO, the db file path should be got in auth server's configuration
-            # if we need access to this configuration more often, we
-            # should add it on start, and not remove it here
-            # (or, if we have writable ds, we might not need this in
-            # the first place)
-            self._cc.add_remote_config(AUTH_SPECFILE_LOCATION)
-            db_file, is_default = self._cc.get_remote_config_value("Auth", "database_file")
-            if is_default and "B10_FROM_BUILD" in os.environ:
-                # this too should be unnecessary, but currently the
-                # 'from build' override isn't stored in the config
-                # (and we don't have writable datasources yet)
-                db_file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
-            self._cc.remove_remote_config(AUTH_SPECFILE_LOCATION)
-
-        return (zone_name, rrclass, master_addrinfo, db_file)
+        
+        return zone_name, rrclass
+
+    def _parse_master_and_port(self, args):
+        port = args.get('port') or self._master_port
+        master = args.get('master') or self._master_addr
+        return check_addr_port(master, port)
+ 
+    def _get_db_file(self):
+        #TODO, the db file path should be got in auth server's configuration
+        # if we need access to this configuration more often, we
+        # should add it on start, and not remove it here
+        # (or, if we have writable ds, we might not need this in
+        # the first place)
+        self._module_cc.add_remote_config(AUTH_SPECFILE_LOCATION)
+        db_file, is_default = self._module_cc.get_remote_config_value("Auth", "database_file")
+        if is_default and "B10_FROM_BUILD" in os.environ:
+            # this too should be unnecessary, but currently the
+            # 'from build' override isn't stored in the config
+            # (and we don't have writable datasources yet)
+            db_file = os.environ["B10_FROM_BUILD"] + os.sep + "bind10_zones.sqlite3"
+        self._module_cc.remove_remote_config(AUTH_SPECFILE_LOCATION)
+        return db_file
+       
+    def publish_xfrin_news(self, zone_name, zone_class,  xfr_result):
+        '''Send command to xfrout/zone manager module.
+        If xfrin has finished successfully for one zone, tell the good 
+        news(command: zone_new_data_ready) to zone manager and xfrout.
+        if xfrin failed, just tell the bad news to zone manager, so that 
+        it can reset the refresh timer for that zone. '''
+        param = {'zone_name': zone_name, 'zone_class': zone_class.to_text()}
+        if xfr_result == XFRIN_OK:
+            msg = create_command(notify_out.ZONE_NEW_DATA_READY_CMD, param)
+            self._send_cc_session.group_sendmsg(msg, XFROUT_MODULE_NAME)
+            self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
+        else:
+            msg = create_command(ZONE_XFRIN_FAILED, param)
+            self._send_cc_session.group_sendmsg(msg, ZONE_MANAGER_MODULE_NAME)
 
 
     def startup(self):
     def startup(self):
         while not self._shutdown_event.is_set():
         while not self._shutdown_event.is_set():
@@ -504,7 +534,8 @@ a separate method for the convenience of unit tests.
             return (1, 'zone xfrin is in progress')
             return (1, 'zone xfrin is in progress')
 
 
         xfrin_thread = threading.Thread(target = process_xfrin,
         xfrin_thread = threading.Thread(target = process_xfrin,
-                                        args = (self.recorder,
+                                        args = (self,
+                                                self.recorder,
                                                 zone_name, rrclass,
                                                 zone_name, rrclass,
                                                 db_file,
                                                 db_file,
                                                 self._shutdown_event,
                                                 self._shutdown_event,

+ 18 - 1
src/bin/xfrin/xfrin.spec.pre.in

@@ -8,6 +8,17 @@
         "item_type": "integer",
         "item_type": "integer",
         "item_optional": false,
         "item_optional": false,
         "item_default": 10
         "item_default": 10
+      },
+      {
+        "item_name": "master_addr",
+        "item_type": "string",
+        "item_optional": false,
+        "item_default": ""
+      },
+      { "item_name": "master_port",
+        "item_type": "integer",
+        "item_optional": false,
+        "item_default": 53
       }
       }
     ],
     ],
     "commands": [
     "commands": [
@@ -21,9 +32,15 @@
             "item_default": ""
             "item_default": ""
           },
           },
           {
           {
+            "item_name": "zone_class",
+            "item_type": "string",
+            "item_optional": true,
+            "item_default": "IN"
+          },
+          {
             "item_name": "master",
             "item_name": "master",
             "item_type": "string",
             "item_type": "string",
-            "item_optional": false,
+            "item_optional": true,
             "item_default": ""
             "item_default": ""
           },
           },
           {
           {

+ 24 - 3
src/bin/xfrout/xfrout.py.in

@@ -28,6 +28,7 @@ import os
 from isc.config.ccsession import *
 from isc.config.ccsession import *
 from isc.log.log import *
 from isc.log.log import *
 from isc.cc import SessionError
 from isc.cc import SessionError
+from isc.notify import notify_out
 import socket
 import socket
 import select
 import select
 import errno
 import errno
@@ -303,7 +304,7 @@ class UnixSockServer(ThreadingUnixStreamServer):
         self._log = log
         self._log = log
         self.update_config_data(config_data)
         self.update_config_data(config_data)
         self._cc = cc
         self._cc = cc
-
+        
     def finish_request(self, request, client_address):
     def finish_request(self, request, client_address):
         '''Finish one request by instantiating RequestHandlerClass.'''
         '''Finish one request by instantiating RequestHandlerClass.'''
         self.RequestHandlerClass(request, client_address, self, self._log)
         self.RequestHandlerClass(request, client_address, self, self._log)
@@ -415,16 +416,25 @@ class XfroutServer:
                                 self._config_data.get('log_severity'), self._config_data.get('log_versions'),
                                 self._config_data.get('log_severity'), self._config_data.get('log_versions'),
                                 self._config_data.get('log_max_bytes'), True)
                                 self._config_data.get('log_max_bytes'), True)
         self._start_xfr_query_listener()
         self._start_xfr_query_listener()
+        self._start_notifier()
 
 
     def _start_xfr_query_listener(self):
     def _start_xfr_query_listener(self):
         '''Start a new thread to accept xfr query. '''
         '''Start a new thread to accept xfr query. '''
-    
         self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession, 
         self._unix_socket_server = UnixSockServer(self._listen_sock_file, XfroutSession, 
                                                   self._shutdown_event, self._config_data,
                                                   self._shutdown_event, self._config_data,
                                                   self._cc, self._log);
                                                   self._cc, self._log);
         listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
         listener = threading.Thread(target = listen_on_xfr_query, args = (self._unix_socket_server,))
         listener.start()
         listener.start()
+        
+    def _start_notifier(self):
+        datasrc = self._unix_socket_server.get_db_file()
+        self._notifier = notify_out.NotifyOut(datasrc, self._log)
+        td = threading.Thread(target = notify_out.dispatcher, args = (self._notifier,))
+        td.daemon = True
+        td.start()
 
 
+    def send_notify(self, zone_name, zone_class):
+        self._notifier.send_notify(zone_name, zone_class)
 
 
     def config_handler(self, new_config):
     def config_handler(self, new_config):
         '''Update config data. TODO. Do error check'''
         '''Update config data. TODO. Do error check'''
@@ -466,11 +476,22 @@ class XfroutServer:
             self._log.log_message("info", "Received shutdown command.")
             self._log.log_message("info", "Received shutdown command.")
             self.shutdown()
             self.shutdown()
             answer = create_answer(0)
             answer = create_answer(0)
+        
+        elif cmd == notify_out.ZONE_NEW_DATA_READY_CMD:
+            zone_name = args.get('zone_name')
+            zone_class = args.get('zone_class')
+            if zone_name and zone_class:
+                self._log.log_message("info", "Receive notify command for zone:'%s/%s'" \
+                                     % (zone_name, zone_class))
+                self.send_notify(zone_name, zone_class)
+                answer = create_answer(0)
+            else:
+                answer = create_answer(1, "Bad command parameter:" + str(args))
+
         else: 
         else: 
             answer = create_answer(1, "Unknown command:" + str(cmd))
             answer = create_answer(1, "Unknown command:" + str(cmd))
 
 
         return answer    
         return answer    
- 
 
 
     def run(self):
     def run(self):
         '''Get and process all commands sent from cfgmgr or other modules. '''
         '''Get and process all commands sent from cfgmgr or other modules. '''

+ 18 - 0
src/bin/zonemgr/Makefile.am

@@ -0,0 +1,18 @@
+SUBDIRS = tests
+
+pkglibexecdir = $(libexecdir)/@PACKAGE@
+
+pkglibexec_SCRIPTS = b10-zonemgr
+
+b10_zonemgrdir = $(DESTDIR)$(pkgdatadir)
+b10_zonemgr_DATA = zonemgr.spec
+
+CLEANFILES = b10-zonemgr zonemgr.pyc zonemgr.spec
+
+zonemgr.spec: zonemgr.spec.pre
+	$(SED) -e "s|@@LOCALSTATEDIR@@|$(localstatedir)|" zonemgr.spec.pre >$@
+
+b10-zonemgr: zonemgr.py
+	$(SED) -e "s|@@PYTHONPATH@@|@pyexecdir@|" \
+	       -e "s|@@LOCALSTATEDIR@@|$(localstatedir)|" zonemgr.py >$@
+	chmod a+x $@

+ 1 - 0
src/bin/zonemgr/TODO

@@ -0,0 +1 @@
+1. Zonemgr should support adding/deleting zones dynamically.

+ 27 - 0
src/bin/zonemgr/run_b10-zonemgr.sh.in

@@ -0,0 +1,27 @@
+#! /bin/sh
+
+# Copyright (C) 2010  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+PYTHON_EXEC=${PYTHON_EXEC:-@PYTHON@}
+export PYTHON_EXEC
+
+MYPATH_PATH=@abs_top_builddir@/src/bin/zonemgr
+PYTHONPATH=@abs_top_srcdir@/src/lib/python:@abs_top_builddir@/src/lib/dns/.libs
+export PYTHONPATH
+
+cd ${MYPATH_PATH}
+${PYTHON_EXEC} b10-zonemgr
+

+ 12 - 0
src/bin/zonemgr/tests/Makefile.am

@@ -0,0 +1,12 @@
+PYTESTS = zonemgr_test.py
+EXTRA_DIST = $(PYTESTS)
+
+# later will have configure option to choose this, like: coverage run --branch
+PYCOVERAGE = $(PYTHON)
+# test using command-line arguments, so use check-local target instead of TESTS
+check-local:
+	for pytest in $(PYTESTS) ; do \
+	echo Running test: $$pytest ; \
+	env PYTHONPATH=$(abs_top_builddir)/src/bin/zonemgr:$(abs_top_srcdir)/src/lib/python:$(abs_top_builddir)/src/lib/python:$(abs_top_builddir)/src/lib/dns/.libs:$(abs_top_builddir)/src/lib/dns/python/.libs:$(abs_top_builddir)/src/lib/xfr/.libs \
+	$(PYCOVERAGE) $(abs_srcdir)/$$pytest ; \
+	done

+ 27 - 0
src/bin/zonemgr/tests/zonemgr_test.in

@@ -0,0 +1,27 @@
+#! /bin/sh
+
+# Copyright (C) 2010  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+PYTHON_EXEC=${PYTHON_EXEC:-@PYTHON@}
+export PYTHON_EXEC
+
+TEST_PATH=@abs_top_srcdir@/src/bin/zonemgr/tests
+PYTHONPATH=@abs_top_srcdir@/src/bin/zonemgr:@abs_top_srcdir@/src/lib/python
+export PYTHONPATH
+
+cd ${TEST_PATH}
+exec ${PYTHON_EXEC} -O zonemgr_test.py $*
+

+ 460 - 0
src/bin/zonemgr/tests/zonemgr_test.py

@@ -0,0 +1,460 @@
+# Copyright (C) 2010  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+'''Tests for the ZonemgrRefresh and Zonemgr classes '''
+
+
+import unittest
+import os
+import tempfile
+from zonemgr import *
+
+ZONE_NAME_CLASS1_IN = ("sd.cn.", "IN")
+ZONE_NAME_CLASS2_CH = ("tw.cn", "CH")
+ZONE_NAME_CLASS3_IN = ("example.com", "IN")
+ZONE_NAME_CLASS1_CH = ("sd.cn.", "CH")
+ZONE_NAME_CLASS2_IN = ("tw.cn", "IN")
+
+class ZonemgrTestException(Exception):
+    pass
+
+class MySession():
+    def __init__(self):
+        pass
+
+    def group_sendmsg(self, msg, module_name):
+        if module_name not in ("Auth", "Xfrin"):
+            raise ZonemgrTestException("module name not exist")
+
+class MyZonemgrRefresh(ZonemgrRefresh):
+    def __init__(self):
+        self._cc = MySession()
+        self._db_file = "initdb.file"
+        self._zonemgr_refresh_info = { 
+         ('sd.cn.', 'IN'): {
+         'last_refresh_time': 1280474398.822142,
+         'next_refresh_time': 1280481598.822153, 
+         'zone_soa_rdata': 'a.dns.cn. root.cnnic.cn. 2009073105 7200 3600 2419200 21600', 
+         'zone_state': 0},
+         ('tw.cn', 'CH'): {
+         'last_refresh_time': 1280474399.116421, 
+         'next_refresh_time': 1280481599.116433, 
+         'zone_soa_rdata': 'a.dns.cn. root.cnnic.cn. 2009073112 7200 3600 2419200 21600', 
+         'zone_state': 0}
+        } 
+
+class TestZonemgrRefresh(unittest.TestCase):
+    def setUp(self):
+        self.stdout_backup = sys.stdout
+        sys.stdout = open(os.devnull, 'w')
+        self.zone_refresh = MyZonemgrRefresh()
+
+    def test_random_jitter(self):
+        max = 100025.120
+        jitter = 0
+        self.assertEqual(max, self.zone_refresh._random_jitter(max, jitter))
+        jitter = max / 4
+        for i in range (0, 150):
+            self.assertTrue((3 * max / 4) <= self.zone_refresh._random_jitter(max, jitter)) 
+            self.assertTrue(self.zone_refresh._random_jitter(max, jitter) <= max) 
+            i += 1;
+
+    def test_get_current_time(self):
+        pass
+
+    def test_set_zone_timer(self):
+        max = 3600
+        jitter = 900
+        time1 = time.time()
+        self.zone_refresh._set_zone_timer(ZONE_NAME_CLASS1_IN, 3600, 900)
+        time2 = time.time()
+        zone_timeout = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["next_refresh_time"]
+        self.assertTrue((3600 - 900) <= (zone_timeout - time1))
+        self.assertTrue((zone_timeout - time2) <= 3600)
+
+    def test_set_zone_refresh_timer(self):
+        time1 = time.time()
+        self.zone_refresh._set_zone_refresh_timer(ZONE_NAME_CLASS1_IN)
+        zone_timeout = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["next_refresh_time"]
+        time2 = time.time()
+        self.assertTrue((time1 + 7200 * 3 / 4) <= zone_timeout)
+        self.assertTrue(zone_timeout <= time2 + 7200)
+        
+    def test_set_zone_retry_timer(self):
+        time1 = time.time()
+        self.zone_refresh._set_zone_retry_timer(ZONE_NAME_CLASS1_IN)
+        zone_timeout = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["next_refresh_time"]
+        time2 = time.time()
+        self.assertTrue((time1 + 3600 * 3 / 4) <= zone_timeout)
+        self.assertTrue(zone_timeout <= time2 + 3600)
+
+    def test_zone_not_exist(self):
+        self.assertFalse(self.zone_refresh._zone_not_exist(ZONE_NAME_CLASS1_IN))
+        self.assertTrue(self.zone_refresh._zone_not_exist(ZONE_NAME_CLASS1_CH))
+        self.assertFalse(self.zone_refresh._zone_not_exist(ZONE_NAME_CLASS2_CH))
+        self.assertTrue(self.zone_refresh._zone_not_exist(ZONE_NAME_CLASS2_IN))
+        self.assertTrue(self.zone_refresh._zone_not_exist(ZONE_NAME_CLASS3_IN))
+
+    def test_set_zone_notify_timer(self):
+        time1 = time.time()
+        self.zone_refresh._set_zone_notify_timer(ZONE_NAME_CLASS1_IN)
+        zone_timeout = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["next_refresh_time"]
+        time2 = time.time()
+        self.assertTrue(time1 <= zone_timeout)
+        self.assertTrue(zone_timeout <= time2)
+
+    def test_zone_is_expired(self):
+        current_time = time.time()
+        zone_expired_time = 2419200
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["last_refresh_time"] = \
+                                                  current_time - zone_expired_time - 1
+        self.assertTrue(self.zone_refresh._zone_is_expired(ZONE_NAME_CLASS1_IN))
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["last_refresh_time"] = \
+                                                  current_time - zone_expired_time + 1
+        self.assertFalse(self.zone_refresh._zone_is_expired(ZONE_NAME_CLASS1_IN))
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"] = ZONE_EXPIRED
+        self.assertTrue(self.zone_refresh._zone_is_expired(ZONE_NAME_CLASS1_IN))
+
+    def test_get_zone_soa_rdata(self):
+        soa_rdata1  = 'a.dns.cn. root.cnnic.cn. 2009073105 7200 3600 2419200 21600' 
+        soa_rdata2  = 'a.dns.cn. root.cnnic.cn. 2009073112 7200 3600 2419200 21600' 
+        self.assertEqual(soa_rdata1, self.zone_refresh._get_zone_soa_rdata(ZONE_NAME_CLASS1_IN))
+        self.assertRaises(KeyError, self.zone_refresh._get_zone_soa_rdata, ZONE_NAME_CLASS1_CH)
+        self.assertEqual(soa_rdata2, self.zone_refresh._get_zone_soa_rdata(ZONE_NAME_CLASS2_CH))
+        self.assertRaises(KeyError, self.zone_refresh._get_zone_soa_rdata, ZONE_NAME_CLASS2_IN)
+         
+    def test_zonemgr_reload_zone(self):
+        soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
+        def get_zone_soa(zone_name, db_file):
+            return (1, 2, 'sd.cn.', 'cn.sd.', 21600, 'SOA', None, 
+                    'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600')
+        sqlite3_ds.get_zone_soa = get_zone_soa
+
+        self.zone_refresh.zonemgr_reload_zone(ZONE_NAME_CLASS1_IN)
+        self.assertEqual(soa_rdata, self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_soa_rdata"])
+
+    def test_get_zone_notifier_master(self):
+        notify_master = "192.168.1.1"
+        self.assertEqual(None, self.zone_refresh._get_zone_notifier_master(ZONE_NAME_CLASS1_IN))
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["notify_master"] = notify_master
+        self.assertEqual(notify_master, self.zone_refresh._get_zone_notifier_master(ZONE_NAME_CLASS1_IN))
+
+    def test_set_zone_notifier_master(self):
+        notify_master = "192.168.1.1"
+        self.zone_refresh._set_zone_notifier_master(ZONE_NAME_CLASS1_IN, notify_master)
+        self.assertEqual(self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]\
+                                                                ["notify_master"], notify_master)
+
+    def test_clear_zone_notifier_master(self):
+        notify_master = "192.168.1.1"
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["notify_master"] = notify_master
+        self.zone_refresh._clear_zone_notifier_master(ZONE_NAME_CLASS1_IN)
+        self.assertFalse("notify_master" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN].keys())
+        self.zone_refresh._clear_zone_notifier_master(ZONE_NAME_CLASS2_CH)
+        self.assertFalse("notify_master" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS2_CH].keys())
+
+    def test_get_zone_state(self):
+        self.assertEqual(ZONE_OK, self.zone_refresh._get_zone_state(ZONE_NAME_CLASS1_IN))
+        self.assertEqual(ZONE_OK, self.zone_refresh._get_zone_state(ZONE_NAME_CLASS2_CH))
+
+    def test_set_zone_state(self):
+        self.zone_refresh._set_zone_state(ZONE_NAME_CLASS1_IN, ZONE_REFRESHING)
+        self.zone_refresh._set_zone_state(ZONE_NAME_CLASS2_CH, ZONE_EXPIRED)
+        self.assertEqual(ZONE_REFRESHING, \
+                self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"])
+        self.assertEqual(ZONE_EXPIRED, \
+                self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS2_CH]["zone_state"])
+
+    def test_get_zone_refresh_timeout(self):
+        current_time = time.time()
+        self.assertFalse("refresh_timeout" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN].keys())
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["refresh_timeout"] = current_time
+        self.assertEqual(current_time, self.zone_refresh._get_zone_refresh_timeout(ZONE_NAME_CLASS1_IN))
+
+    def test_set_zone_refresh_timeout(self):
+        current_time = time.time()
+        self.zone_refresh._set_zone_refresh_timeout(ZONE_NAME_CLASS1_IN, current_time)
+        refresh_time = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["refresh_timeout"]
+        self.assertEqual(current_time, refresh_time) 
+
+    def test_get_zone_next_refresh_time(self):
+        current_time = time.time()
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["next_refresh_time"] = current_time
+        self.assertEqual(current_time, self.zone_refresh._get_zone_next_refresh_time(ZONE_NAME_CLASS1_IN))
+
+    def test_set_zone_next_refresh_time(self):
+        current_time = time.time()
+        self.zone_refresh._set_zone_next_refresh_time(ZONE_NAME_CLASS1_IN, current_time)
+        next_refresh_time = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["next_refresh_time"]
+        self.assertEqual(current_time, next_refresh_time)
+
+    def test_get_zone_last_refresh_time(self):
+        current_time = time.time()
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["last_refresh_time"] = current_time
+        self.assertEqual(current_time, self.zone_refresh._get_zone_last_refresh_time(ZONE_NAME_CLASS1_IN))
+
+    def test_set_zone_last_refresh_time(self):
+        current_time = time.time()
+        self.zone_refresh._set_zone_last_refresh_time(ZONE_NAME_CLASS1_IN, current_time)
+        last_refresh_time = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["last_refresh_time"]
+        self.assertEqual(current_time, last_refresh_time) 
+
+    def test_send_command(self):
+        self.assertRaises(ZonemgrTestException, self.zone_refresh._send_command, "Unknown", "Notify", None)
+
+    def test_zone_mgr_is_empty(self):
+        self.assertFalse(self.zone_refresh._zone_mgr_is_empty())
+        self.zone_refresh._zonemgr_refresh_info = {} 
+        self.assertTrue(self.zone_refresh._zone_mgr_is_empty())
+
+    def test_zonemgr_add_zone(self):
+        soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
+
+        def get_zone_soa(zone_name, db_file):
+            return (1, 2, 'sd.cn.', 'cn.sd.', 21600, 'SOA', None, 
+                    'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600')
+
+        sqlite3_ds.get_zone_soa = get_zone_soa
+
+        self.zone_refresh._zonemgr_refresh_info = {}
+        self.zone_refresh.zonemgr_add_zone(ZONE_NAME_CLASS1_IN)
+        self.assertEqual(1, len(self.zone_refresh._zonemgr_refresh_info))
+        zone_soa_rdata = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_soa_rdata"]
+        self.assertEqual(soa_rdata, zone_soa_rdata) 
+        self.assertEqual(ZONE_OK, self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"])
+        self.assertTrue("last_refresh_time" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN].keys())
+        self.assertTrue("next_refresh_time" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN].keys())
+
+        def get_zone_soa2(zone_name, db_file):
+            return None
+        sqlite3_ds.get_zone_soa = get_zone_soa2
+        self.assertRaises(ZonemgrException, self.zone_refresh.zonemgr_add_zone, \
+                                          ZONE_NAME_CLASS1_IN)
+
+    def test_build_zonemgr_refresh_info(self):
+        soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
+
+        def get_zones_info(db_file):
+            return [("sd.cn.", "IN")] 
+
+        def get_zone_soa(zone_name, db_file):
+            return (1, 2, 'sd.cn.', 'cn.sd.', 21600, 'SOA', None, 
+                    'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600')
+
+        sqlite3_ds.get_zones_info = get_zones_info
+        sqlite3_ds.get_zone_soa = get_zone_soa
+
+        self.zone_refresh._zonemgr_refresh_info = {}
+        self.zone_refresh._build_zonemgr_refresh_info()
+        self.assertEqual(1, len(self.zone_refresh._zonemgr_refresh_info))
+        zone_soa_rdata = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_soa_rdata"]
+        self.assertEqual(soa_rdata, zone_soa_rdata) 
+        self.assertEqual(ZONE_OK, self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"])
+        self.assertTrue("last_refresh_time" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN].keys())
+        self.assertTrue("next_refresh_time" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN].keys())
+
+    def test_zone_handle_notify(self):
+        self.zone_refresh.zone_handle_notify(ZONE_NAME_CLASS1_IN,"127.0.0.1")
+        notify_master = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["notify_master"]
+        self.assertEqual("127.0.0.1", notify_master) 
+        zone_timeout = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["next_refresh_time"]
+        current_time = time.time()
+        self.assertTrue(zone_timeout <= current_time)
+        self.assertRaises(ZonemgrException, self.zone_refresh.zone_handle_notify,\
+                          ("org.cn.", "IN"), "127.0.0.1")
+        self.assertRaises(ZonemgrException, self.zone_refresh.zone_handle_notify,\
+                          ZONE_NAME_CLASS3_IN, "127.0.0.1")
+
+    def test_zone_refresh_success(self):
+        soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600'
+        def get_zone_soa(zone_name, db_file):
+            return (1, 2, 'sd.cn.', 'cn.sd.', 21600, 'SOA', None, 
+                    'a.dns.cn. root.cnnic.cn. 2009073106 1800 900 2419200 21600')
+        sqlite3_ds.get_zone_soa = get_zone_soa
+        time1 = time.time()
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"] = ZONE_REFRESHING
+        self.zone_refresh.zone_refresh_success(ZONE_NAME_CLASS1_IN)
+        time2 = time.time()
+        zone_soa_rdata = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_soa_rdata"]
+        self.assertEqual(soa_rdata, zone_soa_rdata) 
+        next_refresh_time = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["next_refresh_time"]
+        self.assertTrue((time1 + 3 * 1800 / 4) <= next_refresh_time) 
+        self.assertTrue(next_refresh_time <= time2 + 1800)
+        self.assertEqual(ZONE_OK, self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"])
+        last_refresh_time = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["last_refresh_time"]
+        self.assertTrue(time1 <= last_refresh_time)
+        self.assertTrue(last_refresh_time <= time2)
+        self.assertRaises(ZonemgrException, self.zone_refresh.zone_refresh_success, ("org.cn.", "CH"))
+        self.assertRaises(ZonemgrException, self.zone_refresh.zone_refresh_success, ZONE_NAME_CLASS3_IN) 
+
+    def test_zone_refresh_fail(self):
+        soa_rdata = 'a.dns.cn. root.cnnic.cn. 2009073105 7200 3600 2419200 21600' 
+        time1 = time.time()
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"] = ZONE_REFRESHING
+        self.zone_refresh.zone_refresh_fail(ZONE_NAME_CLASS1_IN)
+        time2 = time.time()
+        zone_soa_rdata = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_soa_rdata"]
+        self.assertEqual(soa_rdata, zone_soa_rdata)
+        next_refresh_time = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["next_refresh_time"]
+        self.assertTrue((time1 + 3 * 3600 / 4) <= next_refresh_time)
+        self.assertTrue(next_refresh_time <= time2 + 3600)
+        self.assertEqual(ZONE_OK, self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"])
+        self.assertRaises(ZonemgrException, self.zone_refresh.zone_refresh_fail, ("org.cn.", "CH"))
+        self.assertRaises(ZonemgrException, self.zone_refresh.zone_refresh_fail, ZONE_NAME_CLASS3_IN) 
+
+    def test_find_need_do_refresh_zone(self):
+        time1 = time.time()
+        self.zone_refresh._zonemgr_refresh_info = { 
+                ("sd.cn.","IN"):{
+                    'last_refresh_time': time1,
+                    'next_refresh_time': time1 + 7200, 
+                    'zone_soa_rdata': 'a.dns.cn. root.cnnic.cn. 2009073105 7200 3600 2419200 21600', 
+                    'zone_state': ZONE_OK},
+                ("tw.cn","CH"):{
+                    'last_refresh_time': time1 - 7200, 
+                    'next_refresh_time': time1, 
+                    'refresh_timeout': time1 + MAX_TRANSFER_TIMEOUT, 
+                    'zone_soa_rdata': 'a.dns.cn. root.cnnic.cn. 2009073112 7200 3600 2419200 21600', 
+                    'zone_state': ZONE_REFRESHING}
+                }
+        zone_need_refresh = self.zone_refresh._find_need_do_refresh_zone()
+        self.assertEqual(ZONE_NAME_CLASS1_IN, zone_need_refresh)
+
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["last_refresh_time"] = time1 - 2419200
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"] = ZONE_EXPIRED
+        zone_need_refresh = self.zone_refresh._find_need_do_refresh_zone()
+        self.assertEqual(None, zone_need_refresh)
+
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"] = ZONE_REFRESHING
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["notify_master"] = "192.168.0.1"
+        zone_need_refresh = self.zone_refresh._find_need_do_refresh_zone()
+        self.assertEqual(ZONE_NAME_CLASS1_IN, zone_need_refresh)
+        self.assertEqual(ZONE_EXPIRED, self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"])
+
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS2_CH]["refresh_timeout"] = time1 
+        zone_need_refresh = self.zone_refresh._find_need_do_refresh_zone()
+        self.assertEqual(ZONE_NAME_CLASS2_CH, zone_need_refresh)
+
+    def test_do_refresh(self):
+        time1 = time.time()
+        self.zone_refresh._zonemgr_refresh_info = {
+                ("sd.cn.", "IN"):{
+                    'last_refresh_time': time1 - 7200,
+                    'next_refresh_time': time1 - 1, 
+                    'zone_soa_rdata': 'a.dns.cn. root.cnnic.cn. 2009073105 7200 3600 2419200 21600', 
+                    'zone_state': ZONE_OK}
+                }
+        self.zone_refresh._do_refresh(ZONE_NAME_CLASS1_IN)
+        time2 = time.time()
+        zone_state = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"]
+        self.assertEqual(ZONE_REFRESHING, zone_state)
+        refresh_timeout = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["refresh_timeout"] 
+        self.assertTrue(time1 + MAX_TRANSFER_TIMEOUT <= refresh_timeout)
+        self.assertTrue(time2 + MAX_TRANSFER_TIMEOUT >= refresh_timeout) 
+
+        self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["notify_master"] = "127.0.0.1"
+        self.zone_refresh._do_refresh(ZONE_NAME_CLASS1_IN)
+        time2 = time.time()
+        zone_state = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"]
+        self.assertEqual(ZONE_REFRESHING, zone_state) 
+        refresh_timeout = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["refresh_timeout"] 
+        self.assertTrue(time1 + MAX_TRANSFER_TIMEOUT <= refresh_timeout)
+        self.assertTrue(time2 + MAX_TRANSFER_TIMEOUT >= refresh_timeout)
+        self.assertFalse("notify_master" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN].keys())
+
+    def test_run_timer(self):
+        """This case will run timer in daemon thread. 
+        The zone's next_refresh_time is less than now, so zonemgr will do zone refresh 
+        immediately. The zone's state will become "refreshing". 
+        Then closing the socket ,the timer will stop, and throw a ZonemgrException."""
+        time1 = time.time()
+        self.zone_refresh._zonemgr_refresh_info = {
+                ("sd.cn.", "IN"):{
+                    'last_refresh_time': time1 - 7200,
+                    'next_refresh_time': time1 - 1, 
+                    'zone_soa_rdata': 'a.dns.cn. root.cnnic.cn. 2009073105 7200 3600 2419200 21600', 
+                    'zone_state': ZONE_OK}
+                }
+        master_socket, slave_socket = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+        self.zone_refresh._socket = slave_socket
+        listener = threading.Thread(target = self.zone_refresh.run_timer, args = ())
+        listener.setDaemon(True)
+        listener.start()
+        slave_socket.close()
+        zone_state = self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN]["zone_state"]
+        self.assertTrue("refresh_timeout" in self.zone_refresh._zonemgr_refresh_info[ZONE_NAME_CLASS1_IN].keys())
+        self.assertTrue(zone_state == ZONE_REFRESHING)
+
+        self.assertRaises(ZonemgrException, self.zone_refresh.run_timer)
+
+    def tearDown(self):
+        sys.stdout = self.stdout_backup
+
+
+class MyCCSession():
+    def __init__(self):
+        pass
+                    
+    def get_remote_config_value(self, module_name, identifier):
+        if module_name == "Auth" and identifier == "database_file":
+            return "initdb.file", False
+        else:
+            return "unknown", False
+
+
+class MyZonemgr(Zonemgr):
+
+    def __init__(self):
+        self._db_file = "initdb.file"
+        self._shutdown_event = threading.Event()
+        self._cc = MySession()
+        self._module_cc = MyCCSession()
+        self._config_data = {"zone_name" : "org.cn", "zone_class" : "CH", "master" : "127.0.0.1"}
+
+    def _start_zone_refresh_timer(self):
+        pass
+
+class TestZonemgr(unittest.TestCase):
+
+    def setUp(self):
+        self.zonemgr = MyZonemgr()
+
+    def test_config_handler(self):
+        config_data1 = {"zone_name" : "sd.cn.", "zone_class" : "CH", "master" : "192.168.1.1"}
+        self.zonemgr.config_handler(config_data1)
+        self.assertEqual(config_data1, self.zonemgr._config_data)
+        config_data2 = {"zone_name" : "sd.cn.", "port" : "53", "master" : "192.168.1.1"}
+        self.zonemgr.config_handler(config_data2)
+        self.assertEqual(config_data1, self.zonemgr._config_data)
+
+    def test_get_db_file(self):
+        self.assertEqual("initdb.file", self.zonemgr.get_db_file())
+    
+    def test_parse_cmd_params(self):
+        params1 = {"zone_name" : "org.cn", "zone_class" : "CH", "master" : "127.0.0.1"}
+        answer1 = (("org.cn", "CH"), "127.0.0.1")
+        self.assertEqual(answer1, self.zonemgr._parse_cmd_params(params1, ZONE_NOTIFY_COMMAND))
+        params2 = {"zone_name" : "org.cn", "zone_class" : "CH"}
+        answer2 = ("org.cn", "CH")
+        self.assertEqual(answer2, self.zonemgr._parse_cmd_params(params2, ZONE_XFRIN_SUCCESS_COMMAND))
+        self.assertRaises(ZonemgrException, self.zonemgr._parse_cmd_params, params2, ZONE_NOTIFY_COMMAND)
+        params1 = {"zone_class" : "CH"}
+        self.assertRaises(ZonemgrException, self.zonemgr._parse_cmd_params, params2, ZONE_NOTIFY_COMMAND)
+
+    def tearDown(self):
+        pass
+
+if __name__== "__main__":
+    unittest.main()

+ 524 - 0
src/bin/zonemgr/zonemgr.py.in

@@ -0,0 +1,524 @@
+#!@PYTHON@
+
+# Copyright (C) 2010  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""\
+This file implements the Secondary Manager program.
+
+The secondary manager is one of the co-operating processes
+of BIND10, which keeps track of timers and other information
+necessary for BIND10 to act as a slave.
+"""
+
+import sys; sys.path.append ('@@PYTHONPATH@@')
+import os
+import time
+import signal
+import isc
+import random
+import threading
+import select
+import socket
+from isc.datasrc import sqlite3_ds
+from optparse import OptionParser, OptionValueError
+from isc.config.ccsession import *
+
+# If B10_FROM_BUILD is set in the environment, we use data files
+# from a directory relative to that, otherwise we use the ones
+# installed on the system
+if "B10_FROM_BUILD" in os.environ:
+    SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/zonemgr"
+    AUTH_SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/auth"
+else:
+    PREFIX = "@prefix@"
+    DATAROOTDIR = "@datarootdir@"
+    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
+    AUTH_SPECFILE_PATH = SPECFILE_PATH
+
+SPECFILE_LOCATION = SPECFILE_PATH + "/zonemgr.spec"
+AUTH_SPECFILE_LOCATION = AUTH_SPECFILE_PATH + "/auth.spec"
+
+__version__ = "BIND10"
+
+# define module name
+XFRIN_MODULE_NAME = 'Xfrin'
+AUTH_MODULE_NAME = 'Auth'
+
+# define command name
+ZONE_XFRIN_FAILED_COMMAND = 'zone_xfrin_failed'
+ZONE_XFRIN_SUCCESS_COMMAND = 'zone_new_data_ready'
+ZONE_REFRESH_COMMAND = 'refresh_from_zonemgr'
+ZONE_NOTIFY_COMMAND = 'notify'
+
+# define zone state
+ZONE_OK = 0
+ZONE_REFRESHING = 1
+ZONE_EXPIRED = 2
+
+# smallest refresh timeout
+LOWERBOUND_REFRESH = 10
+# smallest retry timeout
+LOWERBOUND_RETRY = 5
+# max zone transfer timeout
+MAX_TRANSFER_TIMEOUT = 14400
+
+# offsets of fields in the SOA RDATA
+REFRESH_OFFSET = 3
+RETRY_OFFSET = 4
+EXPIRED_OFFSET = 5
+
+# verbose mode
+VERBOSE_MODE = False
+
+def log_msg(msg):
+    if VERBOSE_MODE:
+        sys.stdout.write("[b10-zonemgr] %s\n" % str(msg))
+
+class ZonemgrException(Exception):
+    pass
+
+class ZonemgrRefresh:
+    """This class will maintain and manage zone refresh info.
+    It also provides methods to keep track of zone timers and 
+    do zone refresh.
+    """
+
+    def __init__(self, cc, db_file, slave_socket):
+        self._cc = cc
+        self._socket = slave_socket 
+        self._db_file = db_file
+        self._zonemgr_refresh_info = {} 
+        self._build_zonemgr_refresh_info()
+    
+    def _random_jitter(self, max, jitter):
+        """Imposes some random jitters for refresh and
+        retry timers to avoid many zones need to do refresh
+        at the same time. 
+        The value should be between (max - jitter) and max.
+        """
+        if 0 == jitter:
+            return max
+        return random.uniform(max - jitter, max)
+
+    def _get_current_time(self):
+        return time.time()
+
+    def _set_zone_timer(self, zone_name_class, max, jitter):
+        """Set zone next refresh time."""
+        self._set_zone_next_refresh_time(zone_name_class, self._get_current_time() + \
+                                            self._random_jitter(max, jitter))
+
+    def _set_zone_refresh_timer(self, zone_name_class):
+        """Set zone next refresh time after zone refresh success.
+           now + refresh*3/4 <= next_refresh_time <= now + refresh
+           """
+        zone_refresh_time = float(self._get_zone_soa_rdata(zone_name_class).split(" ")[REFRESH_OFFSET])
+        zone_refresh_time = max(LOWERBOUND_REFRESH, zone_refresh_time)
+        self._set_zone_timer(zone_name_class, zone_refresh_time, (1 * zone_refresh_time) / 4)
+
+    def _set_zone_retry_timer(self, zone_name_class):
+        """Set zone next refresh time after zone refresh fail.
+           now + retry*3/4 <= next_refresh_time <= now + retry
+           """
+        zone_retry_time = float(self._get_zone_soa_rdata(zone_name_class).split(" ")[RETRY_OFFSET])
+        zone_retry_time = max(LOWERBOUND_RETRY, zone_retry_time)
+        self._set_zone_timer(zone_name_class, zone_retry_time, (1 * zone_retry_time) / 4)
+
+    def _set_zone_notify_timer(self, zone_name_class):
+        """Set zone next refresh time after receiving notify
+           next_refresh_time = now 
+        """
+        self._set_zone_timer(zone_name_class, 0, 0)
+
+    def _zone_not_exist(self, zone_name_class):
+        """ Zone doesn't belong to zonemgr"""
+        if zone_name_class in self._zonemgr_refresh_info.keys():
+            return False
+        return True
+
+    def zone_refresh_success(self, zone_name_class):
+        """Update zone info after zone refresh success"""
+        if (self._zone_not_exist(zone_name_class)):
+            raise ZonemgrException("[b10-zonemgr] Zone (%s, %s) doesn't \
+                                    belong to zonemgr" % zone_name_class)
+            return
+        self.zonemgr_reload_zone(zone_name_class)
+        self._set_zone_refresh_timer(zone_name_class)
+        self._set_zone_state(zone_name_class, ZONE_OK)
+        self._set_zone_last_refresh_time(zone_name_class, self._get_current_time())
+
+    def zone_refresh_fail(self, zone_name_class):
+        """Update zone info after zone refresh fail"""
+        if (self._zone_not_exist(zone_name_class)):
+            raise ZonemgrException("[b10-zonemgr] Zone (%s, %s) doesn't \
+                                    belong to zonemgr" % zone_name_class)
+            return
+        self._set_zone_state(zone_name_class, ZONE_OK)
+        self._set_zone_retry_timer(zone_name_class)
+
+    def zone_handle_notify(self, zone_name_class, master):
+        """Handle zone notify"""
+        if (self._zone_not_exist(zone_name_class)):
+            raise ZonemgrException("[b10-zonemgr] Notified zone (%s, %s) doesn't \
+                                    belong to zonemgr" % zone_name_class)
+            return
+        self._set_zone_notifier_master(zone_name_class, master)
+        self._set_zone_notify_timer(zone_name_class)
+
+    def zonemgr_reload_zone(self, zone_name_class):
+        """ Reload a zone."""
+        zone_soa = sqlite3_ds.get_zone_soa(str(zone_name_class[0]), self._db_file)
+        self._zonemgr_refresh_info[zone_name_class]["zone_soa_rdata"] = zone_soa[7]
+
+    def zonemgr_add_zone(self, zone_name_class):
+        """ Add a zone into zone manager."""
+        zone_info = {}
+        zone_soa = sqlite3_ds.get_zone_soa(str(zone_name_class[0]), self._db_file)
+        if not zone_soa:
+            raise ZonemgrException("[b10-zonemgr] zone (%s, %s) doesn't have soa." % zone_name_class)
+        zone_info["zone_soa_rdata"] = zone_soa[7]
+        zone_info["zone_state"] = ZONE_OK
+        zone_info["last_refresh_time"] = self._get_current_time() 
+        zone_info["next_refresh_time"] = self._get_current_time() + \
+                                         float(zone_soa[7].split(" ")[REFRESH_OFFSET])
+        self._zonemgr_refresh_info[zone_name_class] = zone_info
+
+    def _build_zonemgr_refresh_info(self):
+        """ Build zonemgr refresh info map."""
+        log_msg("Start loading zone into zonemgr.")
+        for zone_name, zone_class in sqlite3_ds.get_zones_info(self._db_file):
+            zone_name_class = (zone_name, zone_class)
+            self.zonemgr_add_zone(zone_name_class)
+        log_msg("Finish loading zone into zonemgr.")
+
+    def _zone_is_expired(self, zone_name_class):
+        """Judge whether a zone is expired or not."""
+        zone_expired_time = float(self._get_zone_soa_rdata(zone_name_class).split(" ")[EXPIRED_OFFSET])
+        zone_last_refresh_time = self._get_zone_last_refresh_time(zone_name_class)
+        if (ZONE_EXPIRED == self._get_zone_state(zone_name_class) or
+            zone_last_refresh_time + zone_expired_time <= self._get_current_time()):
+            return True
+
+        return False
+
+    def _get_zone_soa_rdata(self, zone_name_class):
+        return self._zonemgr_refresh_info[zone_name_class]["zone_soa_rdata"]
+
+    def _get_zone_last_refresh_time(self, zone_name_class):
+        return self._zonemgr_refresh_info[zone_name_class]["last_refresh_time"]
+
+    def _set_zone_last_refresh_time(self, zone_name_class, time):
+        self._zonemgr_refresh_info[zone_name_class]["last_refresh_time"] = time
+
+    def _get_zone_notifier_master(self, zone_name_class):
+        if ("notify_master" in self._zonemgr_refresh_info[zone_name_class].keys()):
+            return self._zonemgr_refresh_info[zone_name_class]["notify_master"] 
+
+        return None
+
+    def _set_zone_notifier_master(self, zone_name_class, master_addr):
+        self._zonemgr_refresh_info[zone_name_class]["notify_master"] = master_addr
+
+    def _clear_zone_notifier_master(self, zone_name_class):
+        if ("notify_master" in self._zonemgr_refresh_info[zone_name_class].keys()):
+            del self._zonemgr_refresh_info[zone_name_class]["notify_master"]
+
+    def _get_zone_state(self, zone_name_class):
+        return self._zonemgr_refresh_info[zone_name_class]["zone_state"]
+
+    def _set_zone_state(self, zone_name_class, zone_state):
+        self._zonemgr_refresh_info[zone_name_class]["zone_state"] = zone_state 
+
+    def _get_zone_refresh_timeout(self, zone_name_class):
+        return self._zonemgr_refresh_info[zone_name_class]["refresh_timeout"]
+
+    def _set_zone_refresh_timeout(self, zone_name_class, time):
+        self._zonemgr_refresh_info[zone_name_class]["refresh_timeout"] = time
+
+    def _get_zone_next_refresh_time(self, zone_name_class):
+        return self._zonemgr_refresh_info[zone_name_class]["next_refresh_time"]
+
+    def _set_zone_next_refresh_time(self, zone_name_class, time):
+        self._zonemgr_refresh_info[zone_name_class]["next_refresh_time"] = time
+
+    def _send_command(self, module_name, command_name, params):
+        """Send command between modules."""
+        msg = create_command(command_name, params)
+        try:
+            self._cc.group_sendmsg(msg, module_name)
+        except socket.error:
+            sys.err.write("[b10-zonemgr] Failed to send to module %s, the session has been closed." % module_name) 
+
+    def _find_need_do_refresh_zone(self):
+        """Find the first zone need do refresh, if no zone need
+        do refresh, return the zone with minimum next_refresh_time.
+        """
+        zone_need_refresh = None
+        for zone_name_class in self._zonemgr_refresh_info.keys():
+            # Does the zone expired?
+            if (ZONE_EXPIRED != self._get_zone_state(zone_name_class) and 
+                self._zone_is_expired(zone_name_class)):
+                log_msg("Zone (%s, %s) is expired." % zone_name_class)
+                self._set_zone_state(zone_name_class, ZONE_EXPIRED)
+
+            zone_state = self._get_zone_state(zone_name_class)
+            # If zone is expired and doesn't receive notify, skip the zone
+            if (ZONE_EXPIRED == zone_state and 
+                (not self._get_zone_notifier_master(zone_name_class))):
+                continue
+
+            # If hasn't received refresh response but are within refresh timeout, skip the zone
+            if (ZONE_REFRESHING == zone_state and
+                (self._get_zone_refresh_timeout(zone_name_class) > self._get_current_time())):
+                continue
+                    
+            # Get the zone with minimum next_refresh_time 
+            if ((None == zone_need_refresh) or 
+                (self._get_zone_next_refresh_time(zone_name_class) < 
+                 self._get_zone_next_refresh_time(zone_need_refresh))):
+                zone_need_refresh = zone_name_class
+
+            # Find the zone need do refresh
+            if (self._get_zone_next_refresh_time(zone_need_refresh) < self._get_current_time()):
+                break
+
+        return zone_need_refresh 
+
+    
+    def _do_refresh(self, zone_name_class):
+        """Do zone refresh."""
+        log_msg("Do refresh for zone (%s, %s)." % zone_name_class)
+        self._set_zone_state(zone_name_class, ZONE_REFRESHING)
+        self._set_zone_refresh_timeout(zone_name_class, self._get_current_time() + MAX_TRANSFER_TIMEOUT) 
+        notify_master = self._get_zone_notifier_master(zone_name_class)
+        # If the zone has notify master, send notify command to xfrin module
+        if notify_master:
+            param = {"zone_name" : zone_name_class[0],
+                     "zone_class" : zone_name_class[1],
+                     "master" : notify_master
+                     }
+            self._send_command(XFRIN_MODULE_NAME, ZONE_NOTIFY_COMMAND, param) 
+            self._clear_zone_notifier_master(zone_name_class)
+        # Send refresh command to xfrin module
+        else:
+            param = {"zone_name" : zone_name_class[0],
+                     "zone_class" : zone_name_class[1]
+                    }
+            self._send_command(XFRIN_MODULE_NAME, ZONE_REFRESH_COMMAND, param)
+
+    def _zone_mgr_is_empty(self):
+        """Does zone manager has no zone?"""
+        if not len(self._zonemgr_refresh_info):
+            return True
+
+        return False
+
+    def run_timer(self):
+        """Keep track of zone timers."""
+        while True:
+            # Zonemgr has no zone.
+            if self._zone_mgr_is_empty():
+                time.sleep(LOWERBOUND_RETRY) # A better time?
+                continue
+
+            zone_need_refresh = self._find_need_do_refresh_zone()
+            # If don't get zone with minimum next refresh time, set timer timeout = LOWERBOUND_REFRESH
+            if not zone_need_refresh:
+                timeout = LOWERBOUND_RETRY
+            else:
+                timeout = self._get_zone_next_refresh_time(zone_need_refresh) - self._get_current_time()
+                if (timeout < 0):
+                    self._do_refresh(zone_need_refresh)
+                    continue
+
+            """ Wait for the socket notification for a maximum time of timeout 
+            in seconds (as float)."""
+            try:
+                (rlist, wlist, xlist) = select.select([self._socket], [], [], timeout)
+                if rlist:
+                    self._socket.recv(32)
+            except ValueError as e:
+                raise ZonemgrException("[b10-zonemgr] Socket has been closed\n")
+                break
+            except select.error as e:
+                if e.args[0] == errno.EINTR:
+                    (rlist, wlist, xlist) = ([], [], [])
+                else:
+                    raise ZonemgrException("[b10-zonemgr] Error with select(): %s\n" % err)
+                    break
+
+
+class Zonemgr:
+    """Zone manager class."""
+    def __init__(self):
+        self._setup_session()
+        self._db_file = self.get_db_file()
+        # Create socket pair for communicating between main thread and zonemgr timer thread 
+        self._master_socket, self._slave_socket = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
+        self._zone_refresh= ZonemgrRefresh(self._cc, self._db_file, self._slave_socket)
+        self._start_zone_refresh_timer()
+
+        self._lock = threading.Lock()
+        self._shutdown_event = threading.Event()
+
+    def _start_zone_refresh_timer(self):
+        """Start a new thread to keep track of zone timers"""
+        listener = threading.Thread(target = self._zone_refresh.run_timer, args = ())
+        listener.setDaemon(True)
+        listener.start()
+
+    def _setup_session(self):
+        """Setup two sessions for zonemgr, one(self._module_cc) is used for receiving 
+        commands and config data sent from other modules, another one (self._cc)
+        is used to send commands to proper modules."""
+        self._cc = isc.cc.Session()
+        self._module_cc = isc.config.ModuleCCSession(SPECFILE_LOCATION,
+                                                  self.config_handler,
+                                                  self.command_handler)
+        self._module_cc.add_remote_config(AUTH_SPECFILE_LOCATION)
+        self._config_data = self._module_cc.get_full_config()
+        self._module_cc.start()
+
+    def get_db_file(self):
+        db_file, is_default = self._module_cc.get_remote_config_value(AUTH_MODULE_NAME, "database_file")
+        # this too should be unnecessary, but currently the
+        # 'from build' override isn't stored in the config
+        # (and we don't have indirect python access to datasources yet)
+        if is_default and "B10_FROM_BUILD" in os.environ:
+            db_file = os.environ["B10_FROM_BUILD"] + "/bind10_zones.sqlite3"
+        return db_file
+
+    def shutdown(self):
+        """Shutdown the zonemgr process. the thread which is keeping track of zone
+        timers should be terminated.
+        """ 
+        self._slave_socket.close()
+        self._master_socket.close()
+
+        self._shutdown_event.set()
+        main_thread = threading.currentThread()
+        for th in threading.enumerate():
+            if th is main_thread:
+                continue
+            th.join()
+
+    def config_handler(self, new_config):
+        """Update config data."""
+        answer = create_answer(0)
+        for key in new_config:
+            if key not in self._config_data:
+                answer = create_answer(1, "Unknown config data: " + str(key))
+                continue
+            self._config_data[key] = new_config[key]
+        return answer
+
+    def _parse_cmd_params(self, args, command):
+        zone_name = args.get("zone_name")
+        if not zone_name:
+            raise ZonemgrException("zone name should be provided")
+
+        zone_class = args.get("zone_class")
+        if not zone_class:
+            raise ZonemgrException("zone class should be provided")
+
+        if (command != ZONE_NOTIFY_COMMAND):
+            return (zone_name, zone_class)
+
+        master_str = args.get("master")
+        if not master_str:
+            raise ZonemgrException("master address should be provided")
+
+        return ((zone_name, zone_class), master_str)
+
+
+    def command_handler(self, command, args):
+        """Handle command receivd from command channel.
+        ZONE_NOTIFY_COMMAND is issued by Auth process; ZONE_XFRIN_SUCCESS_COMMAND 
+        and ZONE_XFRIN_FAILED_COMMAND are issued by Xfrin process; shutdown is issued
+        by a user or Boss process. """
+        answer = create_answer(0)
+        if command == ZONE_NOTIFY_COMMAND:
+            """ Handle Auth notify command"""
+            # master is the source sender of the notify message.
+            zone_name_class, master = self._parse_cmd_params(args, command)
+            log_msg("Received notify command for zone (%s, %s)." % zone_name_class)
+            with self._lock:
+                self._zone_refresh.zone_handle_notify(zone_name_class, master)
+            # Send notification to zonemgr timer thread
+            self._master_socket.send(b" ")
+
+        elif command == ZONE_XFRIN_SUCCESS_COMMAND:
+            """ Handle xfrin success command"""
+            zone_name_class = self._parse_cmd_params(args, command)
+            with self._lock:
+                self._zone_refresh.zone_refresh_success(zone_name_class)
+            self._master_socket.send(b" ")
+
+        elif command == ZONE_XFRIN_FAILED_COMMAND:
+            """ Handle xfrin fail command"""
+            zone_name_class = self._parse_cmd_params(args, command)
+            with self._lock:
+                self._zone_refresh.zone_refresh_fail(zone_name_class)
+            self._master_socket.send(b" ")
+
+        elif command == "shutdown":
+            self.shutdown()
+
+        else:
+            answer = create_answer(1, "Unknown command:" + str(command))
+
+        return answer
+
+    def run(self):
+        while not self._shutdown_event.is_set():
+            self._module_cc.check_command()
+
+zonemgrd = None
+
+def signal_handler(signal, frame):
+    if zonemgrd:
+        zonemgrd.shutdown()
+        sys.exit(0)
+
+def set_signal_handler():
+    signal.signal(signal.SIGTERM, signal_handler)
+    signal.signal(signal.SIGINT, signal_handler)
+
+def set_cmd_options(parser):
+    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
+            help="display more about what is going on")
+
+if '__main__' == __name__:
+    try:
+        parser = OptionParser()
+        set_cmd_options(parser)
+        (options, args) = parser.parse_args()
+        VERBOSE_MODE = options.verbose
+
+        set_signal_handler()
+        zonemgrd = Zonemgr()
+        zonemgrd.run()
+    except KeyboardInterrupt:
+        sys.stderr.write("[b10-zonemgr] exit zonemgr process")
+    except isc.cc.session.SessionError as e:
+        sys.stderr.write("[b10-zonemgr] Error creating ,zonemgr" 
+                           "is the command channel daemon running?")
+    except isc.config.ModuleCCSessionError as e:
+        sys.stderr.write("info", "[b10-zonemgr] exit zonemgr process:", e)
+
+    if zonemgrd:
+        zonemgrd.shutdown()
+

+ 15 - 0
src/bin/zonemgr/zonemgr.spec.pre.in

@@ -0,0 +1,15 @@
+{
+  "module_spec": {
+     "module_name": "Zonemgr",
+      "config_data":[
+      ],
+      "commands": [
+        {
+          "command_name": "shutdown",
+          "command_description": "Shut down Zonemgr",
+          "command_args": []
+        }
+      ]
+  }
+}
+     

+ 1 - 1
src/lib/python/isc/Makefile.am

@@ -1,4 +1,4 @@
-SUBDIRS = datasrc cc config log # Util
+SUBDIRS = datasrc cc config log notify # Util
 
 
 python_PYTHON = __init__.py
 python_PYTHON = __init__.py
 
 

+ 41 - 1
src/lib/python/isc/datasrc/sqlite3_ds.py

@@ -18,6 +18,13 @@
 import sqlite3, re, random
 import sqlite3, re, random
 import isc
 import isc
 
 
+
+#define the index of different part of one record
+RR_TYPE_INDEX = 5
+RR_NAME_INDEX = 2
+RR_TTL_INDEX = 4
+RR_RDATA_INDEX = 7
+
 #########################################################################
 #########################################################################
 # define exceptions
 # define exceptions
 #########################################################################
 #########################################################################
@@ -120,6 +127,39 @@ def get_zone_soa(zonename, dbfile):
 
 
     return datas
     return datas
 
 
+
+#########################################################################
+# get_zone_rrset
+#   returns the rrset of the zone with the given zone name, rrset name 
+#   and given rd type. 
+#   If the zone doesn't exist or rd type doesn't exist, return an empty list. 
+#########################################################################
+def get_zone_rrset(zonename, rr_name, rdtype, dbfile):
+    conn, cur = open(dbfile)
+    id = get_zoneid(zonename, cur)
+    cur.execute("SELECT * FROM records WHERE name = ? and zone_id = ? and rdtype = ?", 
+                [rr_name, id, rdtype])
+    datas = cur.fetchall()
+    cur.close()
+    conn.close()
+    return datas
+
+
+#########################################################################
+# get_zones_info:
+#   returns all the zones' information.
+#########################################################################
+def get_zones_info(db_file):
+    conn, cur = open(db_file)
+    cur.execute("SELECT name, rdclass FROM zones")
+    info = cur.fetchone()
+    while info:
+        yield info
+        info = cur.fetchone()
+
+    cur.close()
+    conn.close()
+
 #########################################################################
 #########################################################################
 # get_zoneid:
 # get_zoneid:
 #   returns the zone_id for a given zone name, or an empty
 #   returns the zone_id for a given zone name, or an empty
@@ -132,7 +172,7 @@ def get_zoneid(zone, cur):
         return row[0]
         return row[0]
     else:
     else:
         return ''
         return ''
-
+    
 #########################################################################
 #########################################################################
 # reverse_name:
 # reverse_name:
 #   reverse the labels of a DNS name.  (for example,
 #   reverse the labels of a DNS name.  (for example,

+ 5 - 0
src/lib/python/isc/notify/Makefile.am

@@ -0,0 +1,5 @@
+SUBDIRS = tests
+
+python_PYTHON = __init__.py notify_out.py
+
+pythondir = $(pyexecdir)/isc/notify

+ 2 - 0
src/lib/python/isc/notify/TODO

@@ -0,0 +1,2 @@
+1. the class NotifyOut should provide one interface for adding/deleting zones dynamically.
+

+ 1 - 0
src/lib/python/isc/notify/__init__.py

@@ -0,0 +1 @@
+from isc.notify.notify_out import *

+ 389 - 0
src/lib/python/isc/notify/notify_out.py

@@ -0,0 +1,389 @@
+# Copyright (C) 2010  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+import select
+import random
+import socket
+import threading
+import time
+from isc.datasrc import sqlite3_ds
+import isc
+try: 
+    from libdns_python import * 
+except ImportError as e: 
+    # C++ loadable module may not be installed; 
+    sys.stderr.write('[b10-xfrout] failed to import DNS or XFR module: %s\n' % str(e)) 
+
+ZONE_NEW_DATA_READY_CMD = 'zone_new_data_ready'
+_MAX_NOTIFY_NUM = 30
+_MAX_NOTIFY_TRY_NUM = 5
+_EVENT_NONE = 0
+_EVENT_READ = 1
+_EVENT_TIMEOUT = 2
+_NOTIFY_TIMEOUT = 1
+_IDLE_SLEEP_TIME = 0.5
+
+# define the rcode for parsing notify reply message
+_REPLY_OK = 0
+_BAD_QUERY_ID = 1
+_BAD_QUERY_NAME = 2
+_BAD_OPCODE = 3
+_BAD_QR = 4
+_BAD_REPLY_PACKET = 5
+
+def addr_to_str(addr):
+    return '%s#%s' % (addr[0], addr[1])
+
+def dispatcher(notifier):
+    '''The loop function for handling notify related events.
+    If one zone get the notify reply before timeout, call the
+    handle to process the reply. If one zone can't get the notify
+    before timeout, call the handler to resend notify or notify 
+    next slave.  
+    notifier: one object of class NotifyOut. '''
+    while True:
+        replied_zones, not_replied_zones = notifier._wait_for_notify_reply()
+        if len(replied_zones) == 0 and len(not_replied_zones) == 0:
+            time.sleep(_IDLE_SLEEP_TIME) #TODO set a better time for idle sleep
+            continue
+
+        for name_ in replied_zones:
+            notifier._zone_notify_handler(replied_zones[name_], _EVENT_READ)
+            
+        for name_ in not_replied_zones:
+            if not_replied_zones[name_].notify_timeout <= time.time():
+                notifier._zone_notify_handler(not_replied_zones[name_], _EVENT_TIMEOUT)
+ 
+class ZoneNotifyInfo:
+    '''This class keeps track of notify-out information for one zone.'''
+
+    def __init__(self, zone_name_, class_):
+        '''notify_timeout_: absolute time for next notify reply. when the zone 
+        is preparing for sending notify message, notify_timeout_ is set to now, 
+        that means the first sending is triggered by the 'Timeout' mechanism. 
+        '''
+        self._notify_current = None
+        self._slave_index = 0
+        self._sock = None
+
+        self.notify_slaves = []
+        self.zone_name = zone_name_
+        self.zone_class = class_
+        self.notify_msg_id = 0
+        self.notify_timeout = 0
+        self.notify_try_num = 0  #Notify times sending to one target.
+       
+    def set_next_notify_target(self):
+        if self._slave_index < (len(self.notify_slaves) - 1):
+            self._slave_index += 1
+            self._notify_current = self.notify_slaves[self._slave_index]
+        else:
+            self._notify_current = None
+
+    def prepare_notify_out(self):
+        '''Create the socket and set notify timeout time to now'''
+        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #TODO support IPv6?
+        self.notify_timeout = time.time()
+        self.notify_try_num = 0
+        self._slave_index = 0
+        if len(self.notify_slaves) > 0:
+            self._notify_current = self.notify_slaves[0]
+
+    def finish_notify_out(self):
+        if self._sock:
+            self._sock.close()
+            self._sock = None
+
+    def get_socket(self):
+        return self._sock
+
+    def get_current_notify_target(self):
+        return self._notify_current
+
+class NotifyOut:
+    '''This class is used to handle notify logic for all zones(sending
+    notify message to its slaves).The only interface provided to 
+    the user is send_notify(). the object of this class should be 
+    used together with function dispatcher(). '''
+    def __init__(self, datasrc_file, log=None, verbose=True):
+        self._notify_infos = {} # key is (zone_name, zone_class)
+        self._waiting_zones = []
+        self._notifying_zones = []
+        self._log = log
+        self.notify_num = 0  # the count of in progress notifies
+        self._verbose = verbose
+        self._lock = threading.Lock()
+        self._db_file = datasrc_file
+        self._init_notify_out(datasrc_file)
+
+    def _init_notify_out(self, datasrc_file):
+        '''Get all the zones name and its notify target's address
+        TODO, currently the zones are got by going through the zone 
+        table in database. There should be a better way to get them 
+        and also the setting 'also_notify', and there should be one 
+        mechanism to cover the changed datasrc.'''
+        self._db_file = datasrc_file
+        for zone_name, zone_class in sqlite3_ds.get_zones_info(datasrc_file):
+            zone_id = (zone_name, zone_class)
+            self._notify_infos[zone_id] = ZoneNotifyInfo(zone_name, zone_class)
+            slaves = self._get_notify_slaves_from_ns(zone_name)
+            for item in slaves:
+                self._notify_infos[zone_id].notify_slaves.append((item, 53))
+
+    def send_notify(self, zone_name, zone_class='IN'):
+        '''Send notify to one zone's slaves, this function is 
+        the only interface for class NotifyOut which can be called
+        by other object.
+          Internally, the function only set the zone's notify-reply
+        timeout to now, then notify message will be sent out. '''
+        if zone_name[len(zone_name) - 1] != '.':
+            zone_name += '.'
+
+        zone_id = (zone_name, zone_class)
+        if zone_id not in self._notify_infos:
+            return
+
+        with self._lock:
+            if (self.notify_num >= _MAX_NOTIFY_NUM) or (zone_id in self._notifying_zones):
+                if zone_id not in self._waiting_zones:
+                    self._waiting_zones.append(zone_id)
+            else:
+                self._notify_infos[zone_id].prepare_notify_out()
+                self.notify_num += 1 
+                self._notifying_zones.append(zone_id)
+
+    def _get_rdata_data(self, rr):
+        return rr[7].strip()
+
+    def _get_notify_slaves_from_ns(self, zone_name):
+        '''Get all NS records, then remove the primary master from ns rrset,
+        then use the name in NS record rdata part to get the a/aaaa records
+        in the same zone. the targets listed in a/aaaa record rdata are treated
+        as the notify slaves.
+        Note: this is the simplest way to get the address of slaves, 
+        but not correct, it can't handle the delegation slaves, or the CNAME
+        and DNAME logic.
+        TODO. the function should be provided by one library.'''
+        ns_rrset = sqlite3_ds.get_zone_rrset(zone_name, zone_name, 'NS', self._db_file)
+        soa_rrset = sqlite3_ds.get_zone_rrset(zone_name, zone_name, 'SOA', self._db_file)
+        ns_rr_name = []
+        for ns in ns_rrset:
+            ns_rr_name.append(self._get_rdata_data(ns)) 
+       
+        if len(soa_rrset) > 0:
+            sname = (soa_rrset[0][sqlite3_ds.RR_RDATA_INDEX].split(' '))[0].strip() #TODO, bad hardcode to get rdata part
+            if sname in ns_rr_name:
+                ns_rr_name.remove(sname)
+
+        addr_list = []
+        for rr_name in ns_rr_name:
+            a_rrset = sqlite3_ds.get_zone_rrset(zone_name, rr_name, 'A', self._db_file)
+            aaaa_rrset = sqlite3_ds.get_zone_rrset(zone_name, rr_name, 'AAAA', self._db_file)
+            for rr in a_rrset:
+                addr_list.append(self._get_rdata_data(rr))
+            for rr in aaaa_rrset:
+                addr_list.append(self._get_rdata_data(rr))
+
+        return addr_list
+
+    def _prepare_select_info(self):
+        '''Prepare the information for select(), returned 
+        value is one tuple 
+        (block_timeout, valid_socks, notifying_zones)
+        block_timeout: the timeout for select()
+        valid_socks: sockets list for waiting ready reading.
+        notifying_zones: the zones which have been triggered 
+                        for notify. '''
+        valid_socks = []
+        notifying_zones = {}
+        min_timeout = None 
+        for info in self._notify_infos:
+            sock = self._notify_infos[info].get_socket()
+            if sock:
+                valid_socks.append(sock)
+                notifying_zones[info] = self._notify_infos[info]
+                tmp_timeout = self._notify_infos[info].notify_timeout
+                if min_timeout:
+                    if tmp_timeout < min_timeout:
+                        min_timeout = tmp_timeout
+                else:
+                    min_timeout = tmp_timeout
+       
+        block_timeout = 0
+        if min_timeout:
+            block_timeout = min_timeout - time.time()
+            if block_timeout < 0:
+                block_timeout = 0
+        
+        return (block_timeout, valid_socks, notifying_zones)
+
+    def _wait_for_notify_reply(self):
+        '''receive notify replies in specified time. returned value 
+        is one tuple:(replied_zones, not_replied_zones)
+        replied_zones: the zones which receive notify reply.
+        not_replied_zones: the zones which haven't got notify reply.
+        '''
+        (block_timeout, valid_socks, notifying_zones) = self._prepare_select_info()
+        try:
+            r_fds, w, e = select.select(valid_socks, [], [], block_timeout)
+        except select.error as err:
+            if err.args[0] != EINTR:
+                return [], []
+        
+        not_replied_zones = {}
+        replied_zones = {}
+        for info in notifying_zones:
+            if notifying_zones[info].get_socket() in r_fds:
+                replied_zones[info] = notifying_zones[info]
+            else:
+                not_replied_zones[info] = notifying_zones[info]
+
+        return replied_zones, not_replied_zones
+
+    def _zone_notify_handler(self, zone_notify_info, event_type):
+        '''Notify handler for one zone. The first notify message is 
+        always triggered by the event "_EVENT_TIMEOUT" since when 
+        one zone prepares to notify its slaves, it's notify_timeout 
+        is set to now, which is used to trigger sending notify 
+        message when dispatcher() scanning zones. '''
+        tgt = zone_notify_info.get_current_notify_target()
+        if event_type == _EVENT_READ:
+            reply = self._get_notify_reply(zone_notify_info.get_socket(), tgt)
+            if reply:
+                if self._handle_notify_reply(zone_notify_info, reply):
+                    self._notify_next_target(zone_notify_info)
+
+        elif event_type == _EVENT_TIMEOUT and zone_notify_info.notify_try_num > 0:
+            self._log_msg('info', 'notify retry to %s' % addr_to_str(tgt))
+
+        tgt = zone_notify_info.get_current_notify_target()
+        if tgt:
+            zone_notify_info.notify_try_num += 1
+            if zone_notify_info.notify_try_num > _MAX_NOTIFY_TRY_NUM:
+                self._log_msg('info', 'notify to %s: retried exceeded' % addr_to_str(tgt))
+                self._notify_next_target(zone_notify_info)
+            else:
+                retry_timeout = _NOTIFY_TIMEOUT * pow(2, zone_notify_info.notify_try_num)
+                # set exponential backoff according rfc1996 section 3.6
+                zone_notify_info.notify_timeout = time.time() + retry_timeout
+                self._send_notify_message_udp(zone_notify_info, tgt)
+
+    def _notify_next_target(self, zone_notify_info):
+        '''Notify next address for the same zone. If all the targets 
+        has been notified, notify the first zone in waiting list. '''
+        zone_notify_info.notify_try_num = 0
+        zone_notify_info.set_next_notify_target()
+        tgt = zone_notify_info.get_current_notify_target()
+        if not tgt:
+            zone_notify_info.finish_notify_out()
+            with self._lock:
+                self.notify_num -= 1 
+                self._notifying_zones.remove((zone_notify_info.zone_name, 
+                                              zone_notify_info.zone_class)) 
+                # trigger notify out for waiting zones
+                if len(self._waiting_zones) > 0:
+                    zone_id = self._waiting_zones.pop(0) 
+                    self._notify_infos[zone_id].prepare_notify_out()
+                    self.notify_num += 1 
+
+    def _send_notify_message_udp(self, zone_notify_info, addrinfo):
+        msg, qid = self._create_notify_message(zone_notify_info.zone_name, 
+                                               zone_notify_info.zone_class)
+        render = MessageRenderer()
+        render.set_length_limit(512) 
+        msg.to_wire(render)
+        zone_notify_info.notify_msg_id = qid
+        sock = zone_notify_info.get_socket()
+        try:
+            sock.sendto(render.get_data(), 0, addrinfo)
+            self._log_msg('info', 'sending notify to %s' % addr_to_str(addrinfo))
+        except socket.error as err:
+            self._log_msg('error', 'send notify to %s failed: %s' % (addr_to_str(addrinfo), str(err)))
+            return False
+
+        return True
+
+    def _create_rrset_from_db_record(self, record, zone_class):
+        '''Create one rrset from one record of datasource, if the schema of record is changed, 
+        This function should be updated first. TODO, the function is copied from xfrout, there
+        should be library for creating one rrset. '''
+        rrtype_ = RRType(record[sqlite3_ds.RR_TYPE_INDEX])
+        rdata_ = Rdata(rrtype_, RRClass(zone_class), " ".join(record[sqlite3_ds.RR_RDATA_INDEX:]))
+        rrset_ = RRset(Name(record[sqlite3_ds.RR_NAME_INDEX]), RRClass(zone_class), \
+                       rrtype_, RRTTL( int(record[sqlite3_ds.RR_TTL_INDEX])))
+        rrset_.add_rdata(rdata_)
+        return rrset_
+
+    def _create_notify_message(self, zone_name, zone_class):
+        msg = Message(Message.RENDER)
+        qid = random.randint(0, 0xFFFF)
+        msg.set_qid(qid)
+        msg.set_opcode(Opcode.NOTIFY())
+        msg.set_rcode(Rcode.NOERROR())
+        msg.set_header_flag(MessageFlag.AA())
+        question = Question(Name(zone_name), RRClass(zone_class), RRType('SOA'))
+        msg.add_question(question)
+        # Add soa record to answer section
+        soa_record = sqlite3_ds.get_zone_rrset(zone_name, zone_name, 'SOA', self._db_file) 
+        rrset_soa = self._create_rrset_from_db_record(soa_record[0], zone_class)
+        msg.add_rrset(Section.ANSWER(), rrset_soa)
+        return msg, qid
+
+    def _handle_notify_reply(self, zone_notify_info, msg_data):
+        '''Parse the notify reply message.
+        TODO, the error message should be refined properly.
+        rcode will not checked here, If we get the response
+        from the slave, it means the slaves has got the notify.'''
+        msg = Message(Message.PARSE)
+        try:
+            errstr = 'notify reply error: '
+            msg.from_wire(msg_data)
+            if not msg.get_header_flag(MessageFlag.QR()):
+                self._log_msg('error', errstr + 'bad flags')
+                return _BAD_QR
+
+            if msg.get_qid() != zone_notify_info.notify_msg_id: 
+                self._log_msg('error', errstr + 'bad query ID')
+                return _BAD_QUERY_ID
+            
+            question = msg.get_question()[0]
+            if question.get_name() != Name(zone_notify_info.zone_name):
+                self._log_msg('error', errstr + 'bad query name')
+                return _BAD_QUERY_NAME
+
+            if msg.get_opcode() != Opcode.NOTIFY():
+                self._log_msg('error', errstr + 'bad opcode')
+                return _BAD_OPCODE
+        except Exception as err:
+            # We don't care what exception, just report it? 
+            self._log_msg('error', errstr + str(err))
+            return _BAD_REPLY_PACKET
+
+        return _REPLY_OK
+
+    def _get_notify_reply(self, sock, tgt_addr):
+        try:
+            msg, addr = sock.recvfrom(512)
+        except socket.error:
+            self._log_msg('error', "notify to %s failed: can't read notify reply" % addr_to_str(tgt_addr))
+            return None
+
+        return msg
+
+
+    def _log_msg(self, level, msg):
+        if self._log:
+            self._log.log_message(level, msg)
+

+ 12 - 0
src/lib/python/isc/notify/tests/Makefile.am

@@ -0,0 +1,12 @@
+PYTESTS = notify_out_test.py
+EXTRA_DIST = $(PYTESTS)
+
+# later will have configure option to choose this, like: coverage run --branch
+PYCOVERAGE = $(PYTHON)
+# test using command-line arguments, so use check-local target instead of TESTS
+check-local:
+	for pytest in $(PYTESTS) ; do \
+	echo Running test: $$pytest ; \
+	env PYTHONPATH=$(abs_top_srcdir)/src/lib/python:$(abs_top_builddir)/src/lib/python \
+	$(PYCOVERAGE) $(abs_srcdir)/$$pytest ; \
+	done

+ 28 - 0
src/lib/python/isc/notify/tests/notify_out_test.in

@@ -0,0 +1,28 @@
+#! /bin/sh
+
+# Copyright (C) 2010  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+PYTHON_EXEC=${PYTHON_EXEC:-@PYTHON@}
+export PYTHON_EXEC
+
+NOTIFY_OUT_PATH=@abs_top_srcdir@/src/lib/python/isc/notify/tests
+
+PYTHONPATH=@abs_top_srcdir@/src/lib/python
+export PYTHONPATH
+
+cd ${BIND10_PATH}
+${PYTHON_EXEC} -O ${NOTIFY_OUT_PATH}/notify_out_test.py $*
+

+ 285 - 0
src/lib/python/isc/notify/tests/notify_out_test.py

@@ -0,0 +1,285 @@
+# Copyright (C) 2010  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+import unittest
+import sys
+import os
+import tempfile
+import time
+import socket
+from isc.datasrc import sqlite3_ds
+from isc.notify import notify_out
+
+class TestZoneNotifyInfo(unittest.TestCase):
+    def setUp(self):
+        self.info = notify_out.ZoneNotifyInfo('cn.', 'IN')
+
+    def test_prepare_finish_notify_out(self):
+        self.info.prepare_notify_out()
+        self.assertNotEqual(self.info._sock, None)
+        self.assertIsNone(self.info._notify_current)
+
+        self.info.finish_notify_out()
+        self.assertEqual(self.info._sock, None)
+
+    def test_set_next_notify_target(self):
+        self.info.notify_slaves.append(('127.0.0.1', 53))
+        self.info.notify_slaves.append(('1.1.1.1', 5353))
+        self.info.prepare_notify_out()
+        self.assertEqual(self.info.get_current_notify_target(), ('127.0.0.1', 53))
+
+        self.assertEqual('127.0.0.1#53', notify_out.addr_to_str(('127.0.0.1', 53)))
+        self.info.set_next_notify_target()
+        self.assertEqual(self.info.get_current_notify_target(), ('1.1.1.1', 5353))
+        self.info.set_next_notify_target()
+        self.assertIsNone(self.info.get_current_notify_target())
+
+        temp_info = notify_out.ZoneNotifyInfo('com.', 'IN')
+        temp_info.prepare_notify_out()
+        self.assertIsNone(temp_info.get_current_notify_target())
+
+
+class TestNotifyOut(unittest.TestCase):
+    def setUp(self):
+        self.old_stdout = sys.stdout
+        sys.stdout = open(os.devnull, 'w')
+        self._db_file = tempfile.NamedTemporaryFile(delete=False)
+        sqlite3_ds.load(self._db_file.name, 'cn.', self._cn_data_reader)
+        sqlite3_ds.load(self._db_file.name, 'com.', self._com_data_reader)
+        self._notify = notify_out.NotifyOut(self._db_file.name)
+        self._notify._notify_infos[('com.', 'IN')] = notify_out.ZoneNotifyInfo('com.', 'IN')
+        self._notify._notify_infos[('com.', 'CH')] = notify_out.ZoneNotifyInfo('com.', 'CH')
+        self._notify._notify_infos[('cn.', 'IN')] = notify_out.ZoneNotifyInfo('cn.', 'IN')
+        self._notify._notify_infos[('org.', 'IN')] = notify_out.ZoneNotifyInfo('org.', 'IN')
+        self._notify._notify_infos[('org.', 'CH')] = notify_out.ZoneNotifyInfo('org.', 'CH')
+        
+        info = self._notify._notify_infos[('cn.', 'IN')]
+        info.notify_slaves.append(('127.0.0.1', 53))
+        info.notify_slaves.append(('1.1.1.1', 5353))
+
+    def tearDown(self):
+        sys.stdout = self.old_stdout
+        self._db_file.close()
+        os.unlink(self._db_file.name)
+
+    def test_send_notify(self):
+        self._notify.send_notify('cn')
+        self.assertEqual(self._notify.notify_num, 1)
+        self.assertEqual(self._notify._notifying_zones[0], ('cn.','IN'))
+
+        self._notify.send_notify('com')
+        self.assertEqual(self._notify.notify_num, 2)
+        self.assertEqual(self._notify._notifying_zones[1], ('com.','IN'))
+
+        notify_out._MAX_NOTIFY_NUM = 3
+        self._notify.send_notify('com', 'CH')
+        self.assertEqual(self._notify.notify_num, 3)
+        self.assertEqual(self._notify._notifying_zones[2], ('com.','CH'))
+    
+        self._notify.send_notify('org.')
+        self.assertEqual(self._notify._waiting_zones[0], ('org.', 'IN'))
+        self._notify.send_notify('org.')
+        self.assertEqual(1, len(self._notify._waiting_zones))
+
+        self._notify.send_notify('org.', 'CH')
+        self.assertEqual(2, len(self._notify._waiting_zones))
+        self.assertEqual(self._notify._waiting_zones[1], ('org.', 'CH'))
+
+    def test_wait_for_notify_reply(self):
+        self._notify.send_notify('cn.')
+        self._notify.send_notify('com.')
+    
+        notify_out._MAX_NOTIFY_NUM = 2
+        self._notify.send_notify('org.')
+        replied_zones, timeout_zones = self._notify._wait_for_notify_reply()
+        self.assertEqual(len(replied_zones), 0)
+        self.assertEqual(len(timeout_zones), 2)
+
+        # Now make one socket be readable
+        addr = ('localhost', 12340)
+        self._notify._notify_infos[('cn.', 'IN')]._sock.bind(addr)
+        self._notify._notify_infos[('cn.', 'IN')].notify_timeout = time.time() + 10
+        self._notify._notify_infos[('com.', 'IN')].notify_timeout = time.time() + 10
+        
+        send_fd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        #Send some data to socket 12340, to make the target socket be readable
+        send_fd.sendto(b'data', addr)
+        replied_zones, timeout_zones = self._notify._wait_for_notify_reply()
+        self.assertEqual(len(replied_zones), 1)
+        self.assertEqual(len(timeout_zones), 1)
+        self.assertTrue(('cn.', 'IN') in replied_zones.keys())
+        self.assertTrue(('com.', 'IN') in timeout_zones.keys())
+        self.assertLess(time.time(), self._notify._notify_infos[('com.', 'IN')].notify_timeout)
+    
+    def test_notify_next_target(self):
+        self._notify.send_notify('cn.')
+        self._notify.send_notify('com.')
+        notify_out._MAX_NOTIFY_NUM = 2
+        self._notify.send_notify('org.')
+        self._notify.send_notify('com.', 'CH')
+
+        info = self._notify._notify_infos[('cn.', 'IN')]
+        self._notify._notify_next_target(info)
+        self.assertEqual(0, info.notify_try_num)
+        self.assertEqual(info.get_current_notify_target(), ('1.1.1.1', 5353))
+        self.assertEqual(2, self._notify.notify_num)
+
+        self._notify._notify_next_target(info)
+        self.assertEqual(0, info.notify_try_num)
+        self.assertIsNone(info.get_current_notify_target())
+        self.assertEqual(2, self._notify.notify_num)
+        self.assertEqual(1, len(self._notify._waiting_zones))
+
+        com_info = self._notify._notify_infos[('com.', 'IN')]
+        self._notify._notify_next_target(com_info)
+        self.assertEqual(2, self._notify.notify_num)
+        self.assertEqual(0, len(self._notify._notifying_zones))
+    
+    def test_handle_notify_reply(self):
+        self.assertEqual(notify_out._BAD_REPLY_PACKET, self._notify._handle_notify_reply(None, b'badmsg'))
+        com_info = self._notify._notify_infos[('com.', 'IN')]
+        com_info.notify_msg_id = 0X2f18
+
+        # test with right notify reply message
+        data = b'\x2f\x18\xa0\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03com\x00\x00\x06\x00\x01'
+        self.assertEqual(notify_out._REPLY_OK, self._notify._handle_notify_reply(com_info, data))
+
+        # test with unright query id
+        data = b'\x2e\x18\xa0\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03com\x00\x00\x06\x00\x01'
+        self.assertEqual(notify_out._BAD_QUERY_ID, self._notify._handle_notify_reply(com_info, data))
+
+        # test with unright query name
+        data = b'\x2f\x18\xa0\x00\x00\x01\x00\x00\x00\x00\x00\x00\x02cn\x00\x00\x06\x00\x01'
+        self.assertEqual(notify_out._BAD_QUERY_NAME, self._notify._handle_notify_reply(com_info, data))
+
+        # test with unright opcode
+        data = b'\x2f\x18\x80\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03com\x00\x00\x06\x00\x01'
+        self.assertEqual(notify_out._BAD_OPCODE, self._notify._handle_notify_reply(com_info, data))
+
+        # test with unright qr
+        data = b'\x2f\x18\x10\x10\x00\x01\x00\x00\x00\x00\x00\x00\x03com\x00\x00\x06\x00\x01'
+        self.assertEqual(notify_out._BAD_QR, self._notify._handle_notify_reply(com_info, data))
+
+    def test_send_notify_message_udp(self):
+        com_info = self._notify._notify_infos[('cn.', 'IN')]
+        com_info.prepare_notify_out()
+        ret = self._notify._send_notify_message_udp(com_info, ('1.1.1.1', 53))
+        self.assertTrue(ret)
+
+    def test_zone_notify_handler(self):
+        old_send_msg = self._notify._send_notify_message_udp
+        def _fake_send_notify_message_udp(va1, va2): 
+            pass
+        self._notify._send_notify_message_udp = _fake_send_notify_message_udp
+        self._notify.send_notify('cn.')
+        self._notify.send_notify('com.')
+        notify_out._MAX_NOTIFY_NUM = 2
+        self._notify.send_notify('org.')
+
+        cn_info = self._notify._notify_infos[('cn.', 'IN')]
+        cn_info.prepare_notify_out()
+
+        cn_info.notify_try_num = 2
+        self._notify._zone_notify_handler(cn_info, notify_out._EVENT_TIMEOUT)
+        self.assertEqual(3, cn_info.notify_try_num)
+
+        time1 = cn_info.notify_timeout
+        self._notify._zone_notify_handler(cn_info, notify_out._EVENT_TIMEOUT)
+        self.assertEqual(4, cn_info.notify_try_num)
+        self.assertGreater(cn_info.notify_timeout, time1 + 2) # bigger than 2 seconds
+
+        cur_tgt = cn_info._notify_current
+        cn_info.notify_try_num = notify_out._MAX_NOTIFY_TRY_NUM
+        self._notify._zone_notify_handler(cn_info, notify_out._EVENT_NONE)
+        self.assertNotEqual(cur_tgt, cn_info._notify_current)
+
+    def _cn_data_reader(self):
+        zone_data = [
+        ('cn.',         '1000',  'IN',  'SOA', 'a.dns.cn. mail.cn. 1 1 1 1 1'),
+        ('cn.',         '1000',  'IN',  'NS',  'a.dns.cn.'),
+        ('cn.',         '1000',  'IN',  'NS',  'b.dns.cn.'),
+        ('cn.',         '1000',  'IN',  'NS',  'c.dns.cn.'),
+        ('a.dns.cn.',   '1000',  'IN',  'A',    '1.1.1.1'),
+        ('a.dns.cn.',   '1000',  'IN',  'AAAA', '2:2::2:2'),
+        ('b.dns.cn.',   '1000',  'IN',  'A',    '3.3.3.3'),
+        ('b.dns.cn.',   '1000',  'IN',  'AAAA', '4:4::4:4'),
+        ('b.dns.cn.',   '1000',  'IN',  'AAAA', '5:5::5:5'),
+        ('c.dns.cn.',   '1000',  'IN',  'A',    '6.6.6.6'),
+        ('c.dns.cn.',   '1000',  'IN',  'A',    '7.7.7.7'),
+        ('c.dns.cn.',   '1000',  'IN',  'AAAA', '8:8::8:8')]
+        for item in zone_data:
+            yield item
+
+    def _com_data_reader(self):
+        zone_data = [
+        ('com.',         '1000',  'IN',  'SOA', 'a.dns.com. mail.com. 1 1 1 1 1'),
+        ('com.',         '1000',  'IN',  'NS',  'a.dns.com.'),
+        ('com.',         '1000',  'IN',  'NS',  'b.dns.com.'),
+        ('com.',         '1000',  'IN',  'NS',  'c.dns.com.'),
+        ('a.dns.com.',   '1000',  'IN',  'A',    '1.1.1.1'),
+        ('b.dns.com.',   '1000',  'IN',  'A',    '3.3.3.3'),
+        ('b.dns.com.',   '1000',  'IN',  'AAAA', '4:4::4:4'),
+        ('b.dns.com.',   '1000',  'IN',  'AAAA', '5:5::5:5')]
+        for item in zone_data:
+            yield item
+
+    def test_get_notify_slaves_from_ns(self):
+        records = self._notify._get_notify_slaves_from_ns('cn.')
+        self.assertEqual(6, len(records))
+        self.assertEqual('8:8::8:8', records[5])
+        self.assertEqual('7.7.7.7', records[4])
+        self.assertEqual('6.6.6.6', records[3])
+        self.assertEqual('5:5::5:5', records[2])
+        self.assertEqual('4:4::4:4', records[1])
+        self.assertEqual('3.3.3.3', records[0])
+
+        records = self._notify._get_notify_slaves_from_ns('com.')
+        self.assertEqual(3, len(records))
+        self.assertEqual('5:5::5:5', records[2])
+        self.assertEqual('4:4::4:4', records[1])
+        self.assertEqual('3.3.3.3', records[0])
+    
+    def test_init_notify_out(self):
+        self._notify._init_notify_out(self._db_file.name)
+        self.assertListEqual([('3.3.3.3', 53), ('4:4::4:4', 53), ('5:5::5:5', 53)], 
+                             self._notify._notify_infos[('com.', 'IN')].notify_slaves)
+        
+    def test_prepare_select_info(self):
+        timeout, valid_fds, notifying_zones = self._notify._prepare_select_info()
+        self.assertEqual(0, timeout)
+        self.assertListEqual([], valid_fds)
+
+        self._notify._notify_infos[('cn.', 'IN')]._sock = 1
+        self._notify._notify_infos[('cn.', 'IN')].notify_timeout = time.time() + 5
+        timeout, valid_fds, notifying_zones = self._notify._prepare_select_info()
+        self.assertGreater(timeout, 0)
+        self.assertListEqual([1], valid_fds)
+
+        self._notify._notify_infos[('cn.', 'IN')]._sock = 1
+        self._notify._notify_infos[('cn.', 'IN')].notify_timeout = time.time() - 5
+        timeout, valid_fds, notifying_zones = self._notify._prepare_select_info()
+        self.assertEqual(timeout, 0)
+        self.assertListEqual([1], valid_fds)
+
+        self._notify._notify_infos[('com.', 'IN')]._sock = 2
+        self._notify._notify_infos[('com.', 'IN')].notify_timeout = time.time() + 5
+        timeout, valid_fds, notifying_zones = self._notify._prepare_select_info()
+        self.assertEqual(timeout, 0)
+        self.assertListEqual([2, 1], valid_fds)
+
+if __name__== "__main__":
+    unittest.main()
+
+