|
@@ -1,4 +1,4 @@
|
|
|
-# Copyright (C) 2011 Internet Systems Consortium.
|
|
|
+# Copyright (C) 2011,2012 Internet Systems Consortium.
|
|
|
#
|
|
|
# Permission to use, copy, modify, and distribute this software for any
|
|
|
# purpose with or without fee is hereby granted, provided that the above
|
|
@@ -23,55 +23,150 @@ import socket
|
|
|
from isc.net.addr import IPAddr
|
|
|
import time
|
|
|
import isc
|
|
|
+import fcntl
|
|
|
|
|
|
class TestDhcpv6Daemon(unittest.TestCase):
|
|
|
def setUp(self):
|
|
|
+ # don't redirect stdout/stderr here as we want to print out things
|
|
|
+ # during the test
|
|
|
+ pass
|
|
|
+
|
|
|
+ def tearDown(self):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def runCommand(self, params, wait=1):
|
|
|
+ """
|
|
|
+ This method runs a command and returns a touple: (returncode, stdout, stderr)
|
|
|
+ """
|
|
|
+ ## @todo: Convert this into generic method and reuse it in dhcp4 and dhcp6
|
|
|
+
|
|
|
+ print("Running command: %s" % (" ".join(params)))
|
|
|
+
|
|
|
+ # redirect stdout to a pipe so we can check that our
|
|
|
+ # process spawning is doing the right thing with stdout
|
|
|
+ self.stdout_old = os.dup(sys.stdout.fileno())
|
|
|
+ self.stdout_pipes = os.pipe()
|
|
|
+ os.dup2(self.stdout_pipes[1], sys.stdout.fileno())
|
|
|
+ os.close(self.stdout_pipes[1])
|
|
|
+
|
|
|
+ # do the same trick for stderr:
|
|
|
+ self.stderr_old = os.dup(sys.stderr.fileno())
|
|
|
+ self.stderr_pipes = os.pipe()
|
|
|
+ os.dup2(self.stderr_pipes[1], sys.stderr.fileno())
|
|
|
+ os.close(self.stderr_pipes[1])
|
|
|
|
|
|
- # Let's print this out before we redirect out stdout.
|
|
|
- print("Please ignore any socket errors. Purpose of this test is to")
|
|
|
- print("verify that DHCPv6 process could be started, not that socket")
|
|
|
- print("could be bound. Binding fails when run as non-root user.")
|
|
|
-
|
|
|
- # Redirect stdout to a pipe so we can check that our
|
|
|
- # process spawning is doing the right thing with stdout.
|
|
|
- self.old_stdout = os.dup(sys.stdout.fileno())
|
|
|
- self.pipes = os.pipe()
|
|
|
- os.dup2(self.pipes[1], sys.stdout.fileno())
|
|
|
- os.close(self.pipes[1])
|
|
|
# note that we use dup2() to restore the original stdout
|
|
|
# to the main program ASAP in each test... this prevents
|
|
|
# hangs reading from the child process (as the pipe is only
|
|
|
# open in the child), and also insures nice pretty output
|
|
|
|
|
|
- def tearDown(self):
|
|
|
- # clean up our stdout munging
|
|
|
- os.dup2(self.old_stdout, sys.stdout.fileno())
|
|
|
- os.close(self.pipes[0])
|
|
|
+ pi = ProcessInfo('Test Process', params)
|
|
|
+ pi.spawn()
|
|
|
+ time.sleep(wait)
|
|
|
+ os.dup2(self.stdout_old, sys.stdout.fileno())
|
|
|
+ os.dup2(self.stderr_old, sys.stderr.fileno())
|
|
|
+ self.assertNotEqual(pi.process, None)
|
|
|
+ self.assertTrue(type(pi.pid) is int)
|
|
|
+
|
|
|
+ # Set non-blocking read on pipes. Process may not print anything
|
|
|
+ # on specific output and the we would hang without this.
|
|
|
+ fd = self.stdout_pipes[0]
|
|
|
+ fl = fcntl.fcntl(fd, fcntl.F_GETFL)
|
|
|
+ fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
|
|
|
+
|
|
|
+ fd = self.stderr_pipes[0]
|
|
|
+ fl = fcntl.fcntl(fd, fcntl.F_GETFL)
|
|
|
+ fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
|
|
|
+
|
|
|
+ # There's potential problem if b10-dhcp4 prints out more
|
|
|
+ # than 4k of text
|
|
|
+ try:
|
|
|
+ output = os.read(self.stdout_pipes[0], 4096)
|
|
|
+ except OSError:
|
|
|
+ print("No data available from stdout")
|
|
|
+ output = ""
|
|
|
+
|
|
|
+ # read can return None. Make sure we have a string
|
|
|
+ if (output is None):
|
|
|
+ output = ""
|
|
|
+
|
|
|
+ try:
|
|
|
+ error = os.read(self.stderr_pipes[0], 4096)
|
|
|
+ except OSError:
|
|
|
+ print("No data available on stderr")
|
|
|
+ error = ""
|
|
|
+
|
|
|
+ # read can return None. Make sure we have a string
|
|
|
+ if (error is None):
|
|
|
+ error = ""
|
|
|
+
|
|
|
+ try:
|
|
|
+ if (not pi.process.poll()):
|
|
|
+ # let's be nice at first...
|
|
|
+ pi.process.terminate()
|
|
|
+ except OSError:
|
|
|
+ print("Ignoring failed kill attempt. Process is dead already.")
|
|
|
+
|
|
|
+ # call this to get returncode, process should be dead by now
|
|
|
+ rc = pi.process.wait()
|
|
|
+
|
|
|
+ # Clean up our stdout/stderr munging.
|
|
|
+ os.dup2(self.stdout_old, sys.stdout.fileno())
|
|
|
+ os.close(self.stdout_pipes[0])
|
|
|
+
|
|
|
+ os.dup2(self.stderr_old, sys.stderr.fileno())
|
|
|
+ os.close(self.stderr_pipes[0])
|
|
|
+
|
|
|
+ print ("Process finished, return code=%d, stdout=%d bytes, stderr=%d bytes"
|
|
|
+ % (rc, len(output), len(error)) )
|
|
|
+
|
|
|
+ return (rc, output, error)
|
|
|
|
|
|
def test_alive(self):
|
|
|
"""
|
|
|
Simple test. Checks that b10-dhcp6 can be started and prints out info
|
|
|
about starting DHCPv6 operation.
|
|
|
"""
|
|
|
- pi = ProcessInfo('Test Process', [ '../b10-dhcp6' , '-v' ])
|
|
|
- pi.spawn()
|
|
|
- time.sleep(1)
|
|
|
- os.dup2(self.old_stdout, sys.stdout.fileno())
|
|
|
- self.assertNotEqual(pi.process, None)
|
|
|
- self.assertTrue(type(pi.pid) is int)
|
|
|
- output = os.read(self.pipes[0], 4096)
|
|
|
+ print("Note: Purpose of some of the tests is to check if DHCPv6 server can be started,")
|
|
|
+ print(" not that is can bind sockets correctly. Please ignore binding errors.")
|
|
|
+ (returncode, output, error) = self.runCommand(["../b10-dhcp6", "-v"])
|
|
|
+
|
|
|
self.assertEqual( str(output).count("[b10-dhcp6] Initiating DHCPv6 operation."), 1)
|
|
|
|
|
|
- # kill this process
|
|
|
- # XXX: b10-dhcp6 is too dumb to understand 'shutdown' command for now,
|
|
|
- # so let's just kill the bastard
|
|
|
+ def test_portnumber_0(self):
|
|
|
+ print("Check that specifying port number 0 is not allowed.")
|
|
|
|
|
|
- # TODO: Ignore errors for now. This test will be more thorough once ticket #1503
|
|
|
- # (passing port number to b10-dhcp6 daemon) is implemented.
|
|
|
- try:
|
|
|
- os.kill(pi.pid, signal.SIGTERM)
|
|
|
- except OSError:
|
|
|
- print("Ignoring failed kill attempt. Process is dead already.")
|
|
|
+ (returncode, output, error) = self.runCommand(['../b10-dhcp6', '-p', '0'])
|
|
|
+
|
|
|
+ # When invalid port number is specified, return code must not be success
|
|
|
+ self.assertTrue(returncode != 0)
|
|
|
+
|
|
|
+ # Check that there is an error message about invalid port number printed on stderr
|
|
|
+ self.assertEqual( str(error).count("Failed to parse port number"), 1)
|
|
|
+
|
|
|
+ def test_portnumber_missing(self):
|
|
|
+ print("Check that -p option requires a parameter.")
|
|
|
+
|
|
|
+ (returncode, output, error) = self.runCommand(['../b10-dhcp6', '-p'])
|
|
|
+
|
|
|
+ # When invalid port number is specified, return code must not be success
|
|
|
+ self.assertTrue(returncode != 0)
|
|
|
+
|
|
|
+ # Check that there is an error message about invalid port number printed on stderr
|
|
|
+ self.assertEqual( str(error).count("option requires an argument"), 1)
|
|
|
+
|
|
|
+ def test_portnumber_nonroot(self):
|
|
|
+ print("Check that specifying unprivileged port number will work.")
|
|
|
+
|
|
|
+ (returncode, output, error) = self.runCommand(['../b10-dhcp6', '-p', '10057'])
|
|
|
+
|
|
|
+ # When invalid port number is specified, return code must not be success
|
|
|
+ # TODO: Temporarily commented out as socket binding on systems that do not have
|
|
|
+ # interface detection implemented currently fails.
|
|
|
+ # self.assertTrue(returncode == 0)
|
|
|
+
|
|
|
+ # Check that there is a message on stdout about opening proper port
|
|
|
+ self.assertEqual( str(output).count("opening sockets on port 10057"), 1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
unittest.main()
|