123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- # Copyright (C) 2012 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- from bind10_src import ProcessInfo, parse_args, dump_pid, unlink_pid_file, _BASETIME
- import unittest
- import sys
- import os
- import signal
- import socket
- from isc.net.addr import IPAddr
- import time
- import isc
- import fcntl
class TestDhcpv4Daemon(unittest.TestCase):
    """Smoke tests that launch the b10-dhcp4 binary and inspect its output.

    Every test starts ``../b10-dhcp4`` with a given set of command-line
    arguments through runCommand() and then checks the exit code and/or
    the text the process wrote to stdout or stderr.
    """

    def setUp(self):
        # don't redirect stdout/stderr here as we want to print out things
        # during the test
        pass

    def tearDown(self):
        pass

    @staticmethod
    def _redirect_to_pipe(fd):
        """Point descriptor fd at a fresh pipe.

        Returns a tuple (saved_fd, read_end): saved_fd is a duplicate of
        the original descriptor (to be handed to _restore_fd() later) and
        read_end is the reading side of the new pipe.
        """
        saved_fd = os.dup(fd)
        read_end, write_end = os.pipe()
        os.dup2(write_end, fd)
        # fd itself now refers to the write side; the extra descriptor is
        # not needed in this process any more.
        os.close(write_end)
        return saved_fd, read_end

    @staticmethod
    def _restore_fd(saved_fd, fd):
        """Make fd refer to its original target again and release saved_fd."""
        os.dup2(saved_fd, fd)
        # Without this close every runCommand() call would leak the
        # duplicated descriptor.
        os.close(saved_fd)

    @staticmethod
    def _read_available(fd, empty_message):
        """Return up to 4096 bytes that are currently readable from fd.

        The descriptor is switched to non-blocking mode first: the process
        may not print anything on a given stream, and a blocking read
        would then hang. If the read fails with OSError (e.g. nothing is
        available yet), empty_message is printed and b"" is returned.
        """
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        # There's potential problem if b10-dhcp4 prints out more
        # than 4k of text
        try:
            return os.read(fd, 4096)
        except OSError:
            print(empty_message)
            # os.read() yields bytes, so keep the fallback the same type
            return b""

    @staticmethod
    def _stop_process(process):
        """Terminate process if it is still running; return its exit code.

        process is expected to offer poll(), terminate() and wait() the
        way subprocess.Popen does (assumption: that is what
        ProcessInfo.process holds; verify against bind10_src).
        """
        try:
            # poll() returns None only while the process is still running.
            # A plain truthiness test would also match exit code 0 and
            # signal a process that has already finished.
            if process.poll() is None:
                # let's be nice at first...
                process.terminate()
        except OSError:
            print("Ignoring failed kill attempt. Process is dead already.")
        # call this to get returncode, process should be dead by now
        return process.wait()

    def runCommand(self, params, wait=1):
        """
        This method runs dhcp4 and returns a tuple: (returncode, stdout, stderr)

        params is the command line to execute (program path followed by
        its arguments) and wait is the number of seconds the process is
        given to produce output before it is inspected and terminated.
        stdout and stderr are the raw bytes read from the respective
        streams, at most 4096 bytes each.
        """
        ## @todo: Convert this into generic method and reuse it in dhcp6

        print("Running command: %s" % (" ".join(params)))

        # Flush Python-level buffers first so that text printed so far goes
        # to the real stdout/stderr rather than being flushed later while
        # the descriptors point at the capture pipes.
        sys.stdout.flush()
        sys.stderr.flush()
        stdout_fd = sys.stdout.fileno()
        stderr_fd = sys.stderr.fileno()

        # redirect stdout to a pipe so we can check that our
        # process spawning is doing the right thing with stdout
        stdout_old, stdout_pipe = self._redirect_to_pipe(stdout_fd)
        # do the same trick for stderr:
        stderr_old, stderr_pipe = self._redirect_to_pipe(stderr_fd)

        try:
            # note that we restore the original stdout/stderr to the main
            # program ASAP in each test... this prevents hangs reading
            # from the child process (as the pipe is only open in the
            # child), and also ensures nice pretty output. The finally
            # clause guarantees the restore even if spawning fails.
            try:
                pi = ProcessInfo('Test Process', params)
                pi.spawn()
                time.sleep(wait)
            finally:
                self._restore_fd(stdout_old, stdout_fd)
                self._restore_fd(stderr_old, stderr_fd)

            self.assertIsNotNone(pi.process)
            self.assertIsInstance(pi.pid, int)

            output = self._read_available(stdout_pipe,
                                          "No data available from stdout")
            error = self._read_available(stderr_pipe,
                                         "No data available on stderr")

            rc = self._stop_process(pi.process)
        finally:
            # Clean up our stdout/stderr munging.
            os.close(stdout_pipe)
            os.close(stderr_pipe)

        print("Process finished, return code=%d, stdout=%d bytes, stderr=%d bytes"
              % (rc, len(output), len(error)))

        return (rc, output, error)

    def test_alive(self):
        print("Note: Purpose of some of the tests is to check if DHCPv4 server can be started,")
        print(" not that is can bind sockets correctly. Please ignore binding errors.")

        (returncode, output, error) = self.runCommand(["../b10-dhcp4", "-v"])

        # The start-up banner must show up exactly once on stdout
        self.assertEqual(
            str(output).count("[b10-dhcp4] Initiating DHCPv4 server operation."), 1)

    def test_portnumber_0(self):
        print("Check that specifying port number 0 is not allowed.")

        (returncode, output, error) = self.runCommand(['../b10-dhcp4', '-p', '0'])

        # When invalid port number is specified, return code must not be success
        self.assertNotEqual(returncode, 0)

        # Check that there is an error message about invalid port number printed on stderr
        self.assertEqual(str(error).count("Failed to parse port number"), 1)

    def test_portnumber_missing(self):
        print("Check that -p option requires a parameter.")

        (returncode, output, error) = self.runCommand(['../b10-dhcp4', '-p'])

        # When the parameter of -p is missing, return code must not be success
        self.assertNotEqual(returncode, 0)

        # Check that there is an error message about the missing option
        # argument printed on stderr
        self.assertEqual(str(error).count("option requires an argument"), 1)

    def test_portnumber_invalid1(self):
        print("Check that -p option is check against bogus port number (999999).")

        (returncode, output, error) = self.runCommand(['../b10-dhcp4', '-p', '999999'])

        # When invalid port number is specified, return code must not be success
        self.assertNotEqual(returncode, 0)

        # Check that there is an error message about invalid port number printed on stderr
        self.assertEqual(str(error).count("Failed to parse port number"), 1)

    def test_portnumber_invalid2(self):
        print("Check that -p option is check against bogus port number (123garbage).")

        (returncode, output, error) = self.runCommand(['../b10-dhcp4', '-p', '123garbage'])

        # When invalid port number is specified, return code must not be success
        self.assertNotEqual(returncode, 0)

        # Check that there is an error message about invalid port number printed on stderr
        self.assertEqual(str(error).count("Failed to parse port number"), 1)

    def test_portnumber_nonroot(self):
        print("Check that specifying unprivileged port number will work.")

        (returncode, output, error) = self.runCommand(['../b10-dhcp4', '-s', '-p', '10057'])

        # When a valid port number is specified, return code should be success.
        # TODO: Temporarily commented out as socket binding on systems that do not have
        #       interface detection implemented currently fails.
        # self.assertTrue(returncode == 0)

        # Check that the server reports on stdout that it opens sockets on
        # the requested port
        self.assertEqual(str(output).count("opening sockets on port 10057"), 1)

    def test_skip_msgq(self):
        print("Check that connection to BIND10 msgq can be disabled.")

        (returncode, output, error) = self.runCommand(['../b10-dhcp4', '-s', '-p', '10057'])

        # When the msgq connection is skipped, return code should be success.
        # TODO: Temporarily commented out as socket binding on systems that do not have
        #       interface detection implemented currently fails.
        # self.assertTrue(returncode == 0)

        # Check that the server reports on stdout that the msgq connection
        # was skipped
        self.assertEqual(str(output).count("Skipping connection to the BIND10 msgq."), 1)
# Run every test case defined in this module when the file is executed
# directly rather than imported.
if __name__ == "__main__":
    unittest.main()
|