Browse Source

Merged trac 353

git-svn-id: svn://bind10.isc.org/svn/bind10/trunk@3240 e5f2f494-b856-4b98-b285-d166d9295462
Michal Vaner 14 years ago
parent
commit
84e585bf67

+ 6 - 0
ChangeLog

@@ -1,3 +1,9 @@
+  110.  [func]      Michal Vaner
+	Added isc.net.check module to check ip addresses and ports for correctness
+	and isc.net.addr to hold IP address. The bind10, xfrin and cmdctl programs
+	are modified to use it.
+	(Trac #353, svn r3240)
+
   109.  [func]		naokikambe
 	Added the initial version of the stats module for the statistics
 	feature of BIND 10, which supports the restricted features and

+ 2 - 0
configure.ac

@@ -497,6 +497,8 @@ AC_CONFIG_FILES([Makefile
                  src/lib/python/isc/config/tests/Makefile
                  src/lib/python/isc/log/Makefile
                  src/lib/python/isc/log/tests/Makefile
+                 src/lib/python/isc/net/Makefile
+                 src/lib/python/isc/net/tests/Makefile
                  src/lib/python/isc/notify/Makefile
                  src/lib/python/isc/notify/tests/Makefile
                  src/lib/config/Makefile

+ 20 - 46
src/bin/bind10/bind10.py.in

@@ -63,6 +63,7 @@ import pwd
 import posix
 
 import isc.cc
+import isc.net.parse
 import isc.utils.process
 
 # Assign this process some longer name
@@ -183,35 +184,10 @@ class ProcessInfo:
     def respawn(self):
         self._spawn()
 
-class IPAddr:
-    """Stores an IPv4 or IPv6 address."""
-    family = None
-    addr = None
-
-    def __init__(self, addr):
-        try:
-            a = socket.inet_pton(socket.AF_INET, addr)
-            self.family = socket.AF_INET
-            self.addr = a
-            return
-        except:
-            pass
-
-        try:
-            a = socket.inet_pton(socket.AF_INET6, addr)
-            self.family = socket.AF_INET6
-            self.addr = a
-            return
-        except Exception as e:
-            raise e
-    
-    def __str__(self):
-        return socket.inet_ntop(self.family, self.addr)
-
 class BoB:
     """Boss of BIND class."""
     
-    def __init__(self, msgq_socket_file=None, auth_port=5300, address='',
+    def __init__(self, msgq_socket_file=None, auth_port=5300, address=None,
                  nocache=False, verbose=False, setuid=None, username=None):
         """Initialize the Boss of BIND. This is a singleton (only one
         can run).
@@ -225,7 +201,7 @@ class BoB:
         self.auth_port = auth_port
         self.address = None
         if address:
-            self.address = IPAddr(address)
+            self.address = address
         self.cc_session = None
         self.ccs = None
         self.processes = {}
@@ -608,7 +584,7 @@ def reaper(signal_number, stack_frame):
     # the Python signal handler has been set up to write
     # down a pipe, waking up our select() bit
     pass
-                   
+
 def get_signame(signal_number):
     """Return the symbolic name for a signal."""
     for sig in dir(signal):
@@ -630,26 +606,24 @@ def fatal_signal(signal_number, stack_frame):
 def check_port(option, opt_str, value, parser):
     """Function to insure that the port we are passed is actually 
     a valid port number. Used by OptionParser() on startup."""
-    if not re.match('^(6553[0-5]|655[0-2]\d|65[0-4]\d\d|6[0-4]\d{3}|[1-5]\d{4}|[1-9]\d{0,3}|0)$', value):
-        raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
-    if (opt_str == '-m' or opt_str == '--msgq-port'):
-        parser.values.msgq_port = value
-    elif (opt_str == '-p' or opt_str == '--port'):
-        parser.values.auth_port = value
-    else:
-        raise OptionValueError("Unknown option " + opt_str)
-  
+    try:
+        if opt_str in ['-p', '--port']:
+            parser.values.auth_port = isc.net.parse.port_parse(value)
+        else:
+            raise OptionValueError("Unknown option " + opt_str)
+    except ValueError as e:
+        raise OptionValueError(str(e))
+
 def check_addr(option, opt_str, value, parser):
     """Function to insure that the address we are passed is actually 
     a valid address. Used by OptionParser() on startup."""
     try:
-        IPAddr(value)
-    except:
+        if opt_str in ['-a', '--address']:
+            parser.values.address = isc.net.parse.addr_parse(value)
+        else:
+            raise OptionValueError("Unknown option " + opt_str)
+    except ValueError:
         raise OptionValueError("%s requires a valid IPv4 or IPv6 address" % opt_str)
-    if (opt_str == '-a' or opt_str == '--address'):
-        parser.values.address = value
-    else:
-        raise OptionValueError("Unknown option " + opt_str)
 
 def process_rename(option, opt_str, value, parser):
     """Function that renames the process if it is requested by a option."""
@@ -672,8 +646,8 @@ def main():
                       help="UNIX domain socket file the b10-msgq daemon will use")
     parser.add_option("-n", "--no-cache", action="store_true", dest="nocache",
                       default=False, help="disable hot-spot cache in b10-auth")
-    parser.add_option("-p", "--port", dest="auth_port", type="string",
-                      action="callback", callback=check_port, default="5300",
+    parser.add_option("-p", "--port", dest="auth_port", type="int",
+                      action="callback", callback=check_port, default=5300,
                       help="port the b10-auth daemon will use (default 5300)")
     parser.add_option("-u", "--user", dest="user",
                       type="string", default=None,
@@ -740,7 +714,7 @@ def main():
     signal.signal(signal.SIGPIPE, signal.SIG_IGN)
 
     # Go bob!
-    boss_of_bind = BoB(options.msgq_socket_file, int(options.auth_port),
+    boss_of_bind = BoB(options.msgq_socket_file, options.auth_port,
                        options.address, options.nocache, options.verbose,
                        setuid, username)
     startup_result = boss_of_bind.startup()

+ 3 - 24
src/bin/bind10/tests/bind10_test.py

@@ -1,4 +1,4 @@
-from bind10 import ProcessInfo, BoB, IPAddr
+from bind10 import ProcessInfo, BoB
 
 # XXX: environment tests are currently disabled, due to the preprocessor
 #      setup that we have now complicating the environment
@@ -8,6 +8,7 @@ import sys
 import os
 import signal
 import socket
+from isc.net.addr import IPAddr
 
 class TestProcessInfo(unittest.TestCase):
     def setUp(self):
@@ -72,28 +73,6 @@ class TestProcessInfo(unittest.TestCase):
         self.assertTrue(type(pi.pid) is int)
         self.assertNotEqual(pi.pid, old_pid)
 
-class TestIPAddr(unittest.TestCase):
-    def test_v6ok(self):
-        addr = IPAddr('2001:4f8::1')
-        self.assertEqual(addr.family, socket.AF_INET6)
-        self.assertEqual(addr.addr, socket.inet_pton(socket.AF_INET6, '2001:4f8::1'))
-
-    def test_v4ok(self):
-        addr = IPAddr('127.127.127.127')
-        self.assertEqual(addr.family, socket.AF_INET)
-        self.assertEqual(addr.addr, socket.inet_aton('127.127.127.127'))
-
-    def test_badaddr(self):
-        self.assertRaises(socket.error, IPAddr, 'foobar')
-        self.assertRaises(socket.error, IPAddr, 'foo::bar')
-        self.assertRaises(socket.error, IPAddr, '123')
-        self.assertRaises(socket.error, IPAddr, '123.456.789.0')
-        self.assertRaises(socket.error, IPAddr, '127/8')
-        self.assertRaises(socket.error, IPAddr, '0/0')
-        self.assertRaises(socket.error, IPAddr, '1.2.3.4/32')
-        self.assertRaises(socket.error, IPAddr, '0')
-        self.assertRaises(socket.error, IPAddr, '')
-
 class TestBoB(unittest.TestCase):
     def test_init(self):
         bob = BoB()
@@ -127,7 +106,7 @@ class TestBoB(unittest.TestCase):
         self.assertEqual(bob.runnable, False)
 
     def test_init_alternate_address(self):
-        bob = BoB(None, 5300, '127.127.127.127')
+        bob = BoB(None, 5300, IPAddr('127.127.127.127'))
         self.assertEqual(bob.verbose, False)
         self.assertEqual(bob.auth_port, 5300)
         self.assertEqual(bob.msgq_socket_file, None)

+ 9 - 13
src/bin/cmdctl/cmdctl.py.in

@@ -43,6 +43,7 @@ import random
 import time
 import signal
 from isc.config import ccsession
+import isc.net.parse
 import isc.utils.process
 from optparse import OptionParser, OptionValueError
 from hashlib import sha1
@@ -565,22 +566,17 @@ def run(addr = 'localhost', port = 8080, idle_timeout = 1200, verbose = False):
     httpd.serve_forever()
 
 def check_port(option, opt_str, value, parser):
-    if (value < 0) or (value > 65535):
-        raise OptionValueError('%s requires a port number (0-65535)' % opt_str)
-    parser.values.port = value
+    try:
+        parser.values.port = isc.net.parse.port_parse(value)
+    except ValueError as e:
+        raise OptionValueError(str(e))
 
 def check_addr(option, opt_str, value, parser):
-    ipstr = value
-    ip_family = socket.AF_INET
-    if (ipstr.find(':') != -1):
-        ip_family = socket.AF_INET6
-
     try:
-        socket.inet_pton(ip_family, ipstr)
-    except:
-        raise OptionValueError("%s invalid ip address" % ipstr)
-
-    parser.values.addr = value
+        isc.net.parse.addr_parse(value)
+        parser.values.addr = value
+    except ValueError as e:
+        raise OptionValueError(str(e))
 
 def set_cmd_options(parser):
     parser.add_option('-p', '--port', dest = 'port', type = 'int',

+ 5 - 5
src/bin/xfrin/tests/xfrin_test.py

@@ -421,21 +421,21 @@ class TestXfrin(unittest.TestCase):
         name, rrclass = self._do_parse_zone_name_class()
         master_addrinfo = self._do_parse_master_port()
         db_file = self.args.get('db_file')
-        self.assertEqual(master_addrinfo[4][1], int(TEST_MASTER_PORT))
+        self.assertEqual(master_addrinfo[2][1], int(TEST_MASTER_PORT))
         self.assertEqual(name, TEST_ZONE_NAME)
         self.assertEqual(rrclass, TEST_RRCLASS)
-        self.assertEqual(master_addrinfo[4][0], TEST_MASTER_IPV4_ADDRESS)
+        self.assertEqual(master_addrinfo[2][0], TEST_MASTER_IPV4_ADDRESS)
         self.assertEqual(db_file, TEST_DB_FILE)
 
     def test_parse_cmd_params_default_port(self):
         del self.args['port']
         master_addrinfo = self._do_parse_master_port()
-        self.assertEqual(master_addrinfo[4][1], 53)
+        self.assertEqual(master_addrinfo[2][1], 53)
 
     def test_parse_cmd_params_ip6master(self):
         self.args['master'] = TEST_MASTER_IPV6_ADDRESS
         master_addrinfo = self._do_parse_master_port()
-        self.assertEqual(master_addrinfo[4][0], TEST_MASTER_IPV6_ADDRESS)
+        self.assertEqual(master_addrinfo[2][0], TEST_MASTER_IPV6_ADDRESS)
 
     def test_parse_cmd_params_chclass(self):
         self.args['zone_class'] = 'CH'
@@ -454,7 +454,7 @@ class TestXfrin(unittest.TestCase):
         # master address is mandatory.
         del self.args['master']
         master_addrinfo = self._do_parse_master_port()
-        self.assertEqual(master_addrinfo[4][0], DEFAULT_MASTER)
+        self.assertEqual(master_addrinfo[2][0], DEFAULT_MASTER)
 
     def test_parse_cmd_params_bad_ip4(self):
         self.args['master'] = '3.3.3.3.3'

+ 17 - 27
src/bin/xfrin/xfrin.py.in

@@ -30,6 +30,7 @@ import random
 from optparse import OptionParser, OptionValueError
 from isc.config.ccsession import *
 from isc.notify import notify_out
+import isc.net.parse
 import isc.utils.process
 try:
     from pydnspp import *
@@ -94,7 +95,7 @@ class XfrinConnection(asyncore.dispatcher):
         self.setblocking(1)
         self._shutdown_event = shutdown_event
         self._verbose = verbose
-        self._master_address = master_addrinfo[4]
+        self._master_address = master_addrinfo[2]
 
     def connect_to_master(self):
         '''Connect to master in TCP.'''
@@ -406,15 +407,15 @@ class Xfrin:
     def config_handler(self, new_config):
         self._max_transfers_in = new_config.get("transfers_in") or self._max_transfers_in
         if ('master_addr' in new_config) or ('master_port' in new_config):
-            # Check if the new master is valid, there should be library for check it.
-            # and user should change the port and address together.
+            # User should change the port and address together.
             try:
                 addr = new_config.get('master_addr') or self._master_addr
                 port = new_config.get('master_port') or self._master_port
-                check_addr_port(addr, port)
+                isc.net.parse.addr_parse(addr)
+                isc.net.parse.port_parse(port)
                 self._master_addr = addr
                 self._master_port = port
-            except:
+            except ValueError:
                 errmsg = "bad format for zone's master: " + str(new_config)
                 log_error(errmsg)
                 return create_answer(1, errmsg)
@@ -443,7 +444,7 @@ class Xfrin:
                 # specify the notifyfrom address and port, according the RFC1996, zone
                 # transfer should starts first from the notifyfrom, but now, let 'TODO' it.
                 (zone_name, rrclass) = self._parse_zone_name_and_class(args)
-                (master_addr) = check_addr_port(self._master_addr, self._master_port)
+                (master_addr) = build_addr_info(self._master_addr, self._master_port)
                 ret = self.xfrin_start(zone_name, 
                                        rrclass, 
                                        self._get_db_file(),
@@ -491,7 +492,7 @@ class Xfrin:
     def _parse_master_and_port(self, args):
         port = args.get('port') or self._master_port
         master = args.get('master') or self._master_addr
-        return check_addr_port(master, port)
+        return build_addr_info(master, port)
  
     def _get_db_file(self):
         #TODO, the db file path should be got in auth server's configuration
@@ -564,31 +565,20 @@ def set_signal_handler():
     signal.signal(signal.SIGTERM, signal_handler)
     signal.signal(signal.SIGINT, signal_handler)
 
-def check_addr_port(addrstr, portstr):
-    # XXX: Linux (glibc)'s getaddrinfo incorrectly accepts numeric port
-    # string larger than 65535.  So we need to explicit validate it separately.
+def build_addr_info(addrstr, portstr):
+    """
+    Return tuple (family, socktype, sockaddr) for given address and port.
+    IPv4 and IPv6 are the only supported addresses now, so sockaddr will be
+    (address, port). The socktype is socket.SOCK_STREAM for now.
+    """
     try:
-        portnum = int(portstr)
-        if portnum < 0 or portnum > 65535:
-            raise ValueError("invalid port number (out of range): " + portstr)
+        port = isc.net.parse.port_parse(portstr)
+        addr = isc.net.parse.addr_parse(addrstr)
+        return (addr.family, socket.SOCK_STREAM, (addrstr, port))
     except ValueError as err:
         raise XfrinException("failed to resolve master address/port=%s/%s: %s" %
                              (addrstr, portstr, str(err)))
 
-    try:
-        addrinfo = socket.getaddrinfo(addrstr, portstr, socket.AF_UNSPEC,
-                                      socket.SOCK_STREAM, socket.IPPROTO_TCP,
-                                      socket.AI_NUMERICHOST|
-                                      socket.AI_NUMERICSERV)
-    except socket.gaierror as err:
-        raise XfrinException("failed to resolve master address/port=%s/%s: %s" %
-                             (addrstr, portstr, str(err)))
-    if len(addrinfo) != 1:
-        # with the parameters above the result must be uniquely determined.
-        errmsg = "unexpected result for address/port resolution for %s:%s"
-        raise XfrinException(errmsg % (addrstr, portstr))
-    return addrinfo[0]
-
 def set_cmd_options(parser):
     parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
             help="display more about what is going on")

+ 1 - 1
src/lib/python/isc/Makefile.am

@@ -1,4 +1,4 @@
-SUBDIRS = datasrc cc config log notify utils # Util
+SUBDIRS = datasrc cc config log net notify utils # Util
 
 python_PYTHON = __init__.py
 

+ 5 - 0
src/lib/python/isc/net/Makefile.am

@@ -0,0 +1,5 @@
+SUBDIRS = tests
+
+python_PYTHON = __init__.py addr.py parse.py
+
+pythondir = $(pyexecdir)/isc/net

+ 3 - 0
src/lib/python/isc/net/__init__.py

@@ -0,0 +1,3 @@
+"""
+Here are function and classes that are related to networking.
+"""

+ 46 - 0
src/lib/python/isc/net/addr.py

@@ -0,0 +1,46 @@
+# Copyright (C) 2010  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""Module where address representations live."""
+import socket
+import re
+
+class InvalidAddress(ValueError):
+    """Exception for invalid addresses."""
+    pass
+
+class IPAddr:
+    """Stores an IPv4 or IPv6 address."""
+    family = None
+    addr = None
+
+    def __init__(self, addr):
+        """
+        Creates the address object from a string representation. It raises
+        an InvalidAddr exception if the provided string isn't valid address.
+        """
+        try:
+            addrinfo = socket.getaddrinfo(addr, None, 0, 0, 0,
+                socket.AI_NUMERICHOST)[0]
+            self.family = addrinfo[0]
+            if not self.family in [socket.AF_INET, socket.AF_INET6]:
+                raise InvalidAddress(
+                    'IPAddr can hold only IPv4 or IPv6 address')
+            self.addr = socket.inet_pton(self.family, addr)
+        except socket.error as e:
+            raise InvalidAddress(str(e))
+
+    def __str__(self):
+        return socket.inet_ntop(self.family, self.addr)

+ 48 - 0
src/lib/python/isc/net/parse.py

@@ -0,0 +1,48 @@
+# Copyright (C) 2010  CZ NIC
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""
+Checking and parsing of ports and IP addresses.
+"""
+
+from isc.net.addr import IPAddr, InvalidAddress
+import socket
+
+def port_parse(port):
+    """
+    Takes a port as an int or string and checks if it is valid. It returns
+    the port as int. If it is not a valid port (the string doesn't contain
+    number or it is not in the valid range), it raises ValueError.
+    """
+    inted = int(port)
+    if inted < 0:
+        raise ValueError("Port value " + str(inted) +
+            " too small, allower range is 0-65535")
+    if inted > 65535:
+        raise ValueError("Port value " + str(inted) +
+            " too large, allowed range is 0-65535")
+    return inted
+
+def addr_parse(addr):
+    """
+    Checks and parses an IP address (either IPv4 or IPv6) and returns
+    the IPAddr object. It raises ValueError if the passed string is not
+    valid IP address.
+    """
+    try:
+        return IPAddr(addr)
+    except InvalidAddress:
+        raise ValueError('Value ' + addr +
+            ' is not valid IPv4 or IPv6 address')

+ 12 - 0
src/lib/python/isc/net/tests/Makefile.am

@@ -0,0 +1,12 @@
+PYTESTS = addr_test.py parse_test.py
+EXTRA_DIST = $(PYTESTS)
+
+# later will have configure option to choose this, like: coverage run --branch
+PYCOVERAGE = $(PYTHON)
+# test using command-line arguments, so use check-local target instead of TESTS
+check-local:
+	for pytest in $(PYTESTS) ; do \
+	echo Running test: $$pytest ; \
+	env PYTHONPATH=$(abs_top_srcdir)/src/lib/python:$(abs_top_builddir)/src/lib/python:$(abs_top_builddir)/src/lib/dns/python/.libs \
+	$(PYCOVERAGE) $(abs_srcdir)/$$pytest || exit ; \
+	done

+ 48 - 0
src/lib/python/isc/net/tests/addr_test.py

@@ -0,0 +1,48 @@
+# Copyright (C) 2010  Internet Systems Consortium.
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""Tests for the isc.net.addr module."""
+import unittest
+import socket
+from isc.net.addr import IPAddr, InvalidAddress
+
+class TestIPAddr(unittest.TestCase):
+    """Test for the IPAddr class."""
+    def test_v6ok(self):
+        """Test if we can parse IPv6."""
+        addr = IPAddr('2001:4f8::1')
+        self.assertEqual(addr.family, socket.AF_INET6)
+        self.assertEqual(addr.addr, socket.inet_pton(socket.AF_INET6, '2001:4f8::1'))
+
+    def test_v4ok(self):
+        """Test if we can parse IPv4."""
+        addr = IPAddr('127.127.127.127')
+        self.assertEqual(addr.family, socket.AF_INET)
+        self.assertEqual(addr.addr, socket.inet_aton('127.127.127.127'))
+
+    def test_badaddr(self):
+        """Test if we raise on wrong address."""
+        self.assertRaises(InvalidAddress, IPAddr, 'foobar')
+        self.assertRaises(InvalidAddress, IPAddr, 'foo::bar')
+        self.assertRaises(InvalidAddress, IPAddr, '123')
+        self.assertRaises(InvalidAddress, IPAddr, '123.456.789.0')
+        self.assertRaises(InvalidAddress, IPAddr, '127/8')
+        self.assertRaises(InvalidAddress, IPAddr, '0/0')
+        self.assertRaises(InvalidAddress, IPAddr, '1.2.3.4/32')
+        self.assertRaises(InvalidAddress, IPAddr, '0')
+        self.assertRaises(InvalidAddress, IPAddr, '')
+
+if __name__ == '__main__':
+    unittest.main()

+ 85 - 0
src/lib/python/isc/net/tests/parse_test.py

@@ -0,0 +1,85 @@
+# Copyright (C) 2010  CZ NIC
+#
+# Permission to use, copy, modify, and distribute this software for any
+# purpose with or without fee is hereby granted, provided that the above
+# copyright notice and this permission notice appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
+# DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
+# INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
+# INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
+# FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
+# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
+# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""Tests for isc.net.parse."""
+import unittest
+import socket
+from isc.net.parse import port_parse, addr_parse
+
+class TestCheckPort(unittest.TestCase):
+    """
+    Testcases for the isc.net.port_parse function
+    """
+    def test_fail(self):
+        """
+        Test if it fails on invalid input in the correct way.
+        """
+        self.assertRaises(ValueError, port_parse, "not a number")
+        self.assertRaises(ValueError, port_parse, -1)
+        self.assertRaises(ValueError, port_parse, 65536)
+
+    def test_success(self):
+        """
+        Test if it succeeds on valid inputs and returns the correct output
+        """
+        self.assertEqual(port_parse(0), 0)
+        self.assertEqual(port_parse("65535"), 65535)
+        self.assertEqual(port_parse(1234), 1234)
+
+class TestCheckIP(unittest.TestCase):
+    """
+    Testcases for the isc.net.ip_par function
+    """
+    def test_fail(self):
+        """
+        Test if it fails on invalid input the correct way.
+        """
+        self.assertRaises(ValueError, addr_parse, "not an address")
+        self.assertRaises(ValueError, addr_parse, "123.456.789.012")
+        self.assertRaises(ValueError, addr_parse, "123.0.0.")
+        # Address range not allowed
+        self.assertRaises(ValueError, addr_parse, "192.0.2.0/24")
+        try:
+            # XXX: MacOS X's inet_pton() doesn't reject this form, so we
+            # check the behavior of the underlying library implementation
+            # before the actual test
+            socket.inet_pton(socket.AF_INET, "0000.0.0.0")
+        except socket.error:
+            self.assertRaises(ValueError, addr_parse, "0000.0.0.0")
+        self.assertRaises(ValueError, addr_parse, "bada:ddr0::")
+        self.assertRaises(ValueError, addr_parse, "2001:db8::/32")
+        # This should be one part too long (eg. 9 segments)
+        self.assertRaises(ValueError, addr_parse, "2001:db8:0:0:0:0:0:0:0")
+        # Only one :: allowed
+        self.assertRaises(ValueError, addr_parse, "2001::db8::c")
+
+    def test_success(self):
+        """
+        Test if it succeeds on valid inputs and returns addresses that look
+        the same.
+        """
+        self.assertEqual("192.0.2.0", str(addr_parse("192.0.2.0")))
+        # The OS could return something else than canonical form, in which
+        # case the test would fail. However, I do not see an easy way to fix
+        # the test, so it is left this way unless someone finds an OS that
+        # does return something else.
+        self.assertEqual("2001:bd8::", str(addr_parse("2001:bd8::")))
+        # It should strip the unnecesarry parts
+        self.assertEqual("2001:bd8::", str(addr_parse("2001:bd8:0:0:0:0:0:0")))
+        self.assertEqual("::", str(addr_parse("::")))
+        self.assertEqual("2001:bd8::", str(addr_parse("2001:bd8::0.0.0.0")))
+
+if __name__ == "__main__":
+    unittest.main()