123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471 |
from isc.sysinfo import *
import builtins
import os
import unittest
import platform
import subprocess
import time
# Fake processor counts handed out by the per-platform os.sysconf()
# stand-ins below.  Each platform uses a different number.
NPROCESSORS_LINUX = 42
NPROCESSORS_OPENBSD = 43
NPROCESSORS_FREEBSD = 44
NPROCESSORS_OSX = 45
def _my_testcase_platform_system():
    '''Fake platform.system() that reports the name 'BIND10Testcase'.'''
    system_name = 'BIND10Testcase'
    return system_name
def _my_linux_platform_system():
    '''Fake platform.system() that reports 'Linux'.'''
    system_name = 'Linux'
    return system_name
def _my_linux_os_sysconf(key):
    '''Fake os.sysconf() used by the Linux test.

    Returns NPROCESSORS_LINUX for 'SC_NPROCESSORS_CONF'.  Any other key
    raises AssertionError('Unhandled key').
    '''
    if key == 'SC_NPROCESSORS_CONF':
        return NPROCESSORS_LINUX
    # Raise explicitly rather than 'assert False': assert statements are
    # removed under 'python -O', which would silently return None here.
    raise AssertionError('Unhandled key')
class MyLinuxFile:
    '''Fake file object returning canned content for a few /proc files.

    read() knows /proc/sys/kernel/hostname, /proc/version, /proc/uptime and
    /proc/loadavg; readlines() knows /proc/meminfo.  Asking for any other
    file raises AssertionError('Unhandled filename').  The object can be
    used as a context manager; entering returns the object itself and
    close()/exit do nothing.
    '''

    # Canned results of read(), keyed by file name.
    _READ_CONTENT = {
        '/proc/sys/kernel/hostname': 'myhostname',
        '/proc/version': 'An SMP version string',
        '/proc/uptime': '172800.75 139993.71',
        '/proc/loadavg': '0.1 0.2 0.3 0.4',
    }

    # Canned result of readlines() for /proc/meminfo.
    _MEMINFO_LINES = (
        'MemTotal: 3083872 kB',
        'MemFree: 870492 kB',
        'Buffers: 27412 kB',
        'Cached: 1303860 kB',
        'SwapTotal: 4194300 kB',
        'SwapFree: 3999464 kB',
    )

    def __init__(self, filename):
        self._filename = filename

    def read(self):
        '''Return the canned content of the file as one string.'''
        if self._filename not in self._READ_CONTENT:
            # Explicit raise: an 'assert False' would vanish under -O and
            # make this method silently return None.
            raise AssertionError('Unhandled filename')
        return self._READ_CONTENT[self._filename]

    def readlines(self):
        '''Return the canned content of the file as a new list of lines.'''
        if self._filename != '/proc/meminfo':
            raise AssertionError('Unhandled filename')
        return list(self._MEMINFO_LINES)

    def close(self):
        return

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Returns None, so exceptions raised inside the 'with' body
        # propagate.
        return
def _my_linux_open(filename):
    '''Fake open() that wraps the file name in a MyLinuxFile object.'''
    fake_file = MyLinuxFile(filename)
    return fake_file
def _my_linux_subprocess_check_output(command):
    '''Fake subprocess.check_output() used by the Linux test.

    'command' must be a list (argument vector).  Returns canned bytes for
    the commands listed below; a non-list argument or an unknown command
    raises AssertionError.
    '''
    # isinstance() also accepts list subclasses, and an explicit raise is
    # not stripped under 'python -O' the way an assert statement is.
    if not isinstance(command, list):
        raise AssertionError('command argument is not a list')
    canned_output = {
        ('lsb_release', '-a'):
            b'Description: My Distribution\n',
        ('ip', 'addr'):
            b'qB2osV6vUOjqm3P/+tQ4d92xoYz8/U8P9v3KWRpNwlI=\n',
        ('ip', 'route'):
            b'VGWAS92AlS14Pl2xqENJs5P2Ihe6Nv9g181Mu6Zz+aQ=\n',
        ('ip', '-f', 'inet6', 'route'):
            b'XfizswwNA9NkXz6K36ZExpjV08Y5IXkHI8jjDSV+5Nc=\n',
        ('netstat', '-s'):
            b'osuxbrcc1g9VgaF4yf3FrtfodrfATrbSnjhqhuQSAs8=\n',
        ('netstat', '-apn'):
            b'Z+w0lwa02/T+5+EIio84rrst/Dtizoz/aL9Im7J7ESA=\n',
    }
    output = canned_output.get(tuple(command))
    if output is None:
        raise AssertionError('Unhandled command')
    return output
def _my_openbsd_platform_system():
    '''Fake platform.system() that reports 'OpenBSD'.'''
    system_name = 'OpenBSD'
    return system_name
def _my_openbsd_os_sysconf(key):
    '''Fake os.sysconf() used by the OpenBSD test.

    Returns NPROCESSORS_OPENBSD for 'SC_NPROCESSORS_CONF'.  Any other key
    raises AssertionError('Unhandled key').
    '''
    if key == 'SC_NPROCESSORS_CONF':
        return NPROCESSORS_OPENBSD
    # Raise explicitly rather than 'assert False': assert statements are
    # removed under 'python -O', which would silently return None here.
    raise AssertionError('Unhandled key')
def _my_openbsd_platform_uname():
    '''Fake os.uname() result for the OpenBSD test (a 5-item tuple).'''
    uname_fields = ('OpenBSD', 'test.example.com', '5.0', '', 'amd64')
    return uname_fields
def _my_bsd_subprocess_check_output(command):
    '''subprocess output for all bsd types

    'command' must be a list (argument vector); anything else raises
    AssertionError.  Returns canned bytes for the commands listed below and
    None for any other command, so the platform-specific wrappers can decide
    how to treat an unknown command.
    '''
    # isinstance() also accepts list subclasses, and an explicit raise is
    # not stripped under 'python -O' the way an assert statement is.
    if not isinstance(command, list):
        raise AssertionError('command argument is not a list')
    canned_output = {
        ('hostname',):
            b'test.example.com\n',
        ('sysctl', '-n', 'hw.physmem'):
            b'543214321\n',
        ('ifconfig',):
            b'qB2osV6vUOjqm3P/+tQ4d92xoYz8/U8P9v3KWRpNwlI=\n',
        ('netstat', '-s'):
            b'osuxbrcc1g9VgaF4yf3FrtfodrfATrbSnjhqhuQSAs8=\n',
        ('netstat', '-an'):
            b'Z+w0lwa02/T+5+EIio84rrst/Dtizoz/aL9Im7J7ESA=\n',
        ('netstat', '-nr'):
            b'XfizswwNA9NkXz6K36ZExpjV08Y5IXkHI8jjDSV+5Nc=\n',
    }
    return canned_output.get(tuple(command))
def _my_openbsd_subprocess_check_output(command):
    '''Fake subprocess.check_output() used by the OpenBSD test.

    Handles the OpenBSD-specific commands itself and hands everything else
    to _my_bsd_subprocess_check_output().  A non-list argument, or a command
    neither of them knows, raises AssertionError.
    '''
    # isinstance() also accepts list subclasses, and an explicit raise is
    # not stripped under 'python -O' the way an assert statement is.
    if not isinstance(command, list):
        raise AssertionError('command argument is not a list')
    if command == ['sysctl', '-n', 'kern.boottime']:
        # Boot time is reported as an epoch timestamp 76632 seconds in the
        # past, computed at call time.
        return bytes(str(int(time.time() - 76632)), 'utf-8')
    if command == ['sysctl', '-n', 'vm.loadavg']:
        return b'0.7 0.9 0.8\n'
    if command == ['vmstat']:
        return (b' procs memory page disks traps cpu\n'
                b' r b w avm fre flt re pi po fr sr wd0 cd0 int sys cs'
                b' us sy id\n'
                b' 0 0 0 121212 123456 47 0 0 0 0 0 2 0 2 80 14 0 1 99\n')
    if command == ['swapctl', '-s', '-k']:
        return b'total: 553507 1K-blocks allocated, 2 used, 553505 available'

    # Not OpenBSD specific: try the output shared by all BSD flavours.
    bsd_output = _my_bsd_subprocess_check_output(command)
    if bsd_output is None:
        raise AssertionError('Unhandled command')
    return bsd_output
def _my_freebsd_platform_system():
    '''Fake platform.system() that reports 'FreeBSD'.'''
    system_name = 'FreeBSD'
    return system_name
def _my_freebsd_os_sysconf(key):
    '''Fake os.sysconf() used by the FreeBSD test.

    Returns NPROCESSORS_FREEBSD for 'SC_NPROCESSORS_CONF'.  Any other key
    raises AssertionError('Unhandled key').
    '''
    if key == 'SC_NPROCESSORS_CONF':
        return NPROCESSORS_FREEBSD
    # Raise explicitly rather than 'assert False': assert statements are
    # removed under 'python -O', which would silently return None here.
    raise AssertionError('Unhandled key')
def _my_freebsd_platform_uname():
    '''Fake os.uname() result for the FreeBSD test (a 5-item tuple).'''
    uname_fields = ('FreeBSD', 'freebsd', '8.2-RELEASE', '', 'i386')
    return uname_fields
def _my_freebsd_osx_subprocess_check_output(command, faked_output=None):
    '''subprocess output shared for freebsd and osx

    'command' must be a list (argument vector); anything else raises
    AssertionError.  'faked_output' is an optional dict; when it has a
    'boottime-sysctl' entry, that value is returned for the kern.boottime
    sysctl instead of the default (a boot time 76632 seconds in the past).
    Commands not handled here are passed to
    _my_bsd_subprocess_check_output(), whose result (possibly None) is
    returned as is.
    '''
    # isinstance() also accepts list subclasses, and an explicit raise is
    # not stripped under 'python -O' the way an assert statement is.
    if not isinstance(command, list):
        raise AssertionError('command argument is not a list')
    # None replaces the former mutable '{}' default, which would have been
    # one dict object shared between all calls.
    if faked_output is None:
        faked_output = {}
    if command == ['sysctl', '-n', 'kern.boottime']:
        if 'boottime-sysctl' in faked_output:
            return faked_output['boottime-sysctl']
        return bytes('{ sec = ' + str(int(time.time() - 76632)) +
                     ', usec = 0 }\n', 'utf-8')
    if command == ['sysctl', '-n', 'vm.loadavg']:
        return b'{ 0.2 0.4 0.6 }\n'
    return _my_bsd_subprocess_check_output(command)
def _my_freebsd_subprocess_check_output(command, faked_output):
    '''Fake subprocess.check_output() used by the FreeBSD test.

    'faked_output' is a dict the test can change between calls.  Its
    'smp-sysctl' entry is the answer to the kern.smp.forward_signal_enabled
    sysctl: an Exception instance is raised, anything else is returned.
    Other commands fall back to _my_freebsd_osx_subprocess_check_output().
    A non-list argument, or a command nobody knows, raises AssertionError.
    '''
    # isinstance() also accepts list subclasses, and an explicit raise is
    # not stripped under 'python -O' the way an assert statement is.
    if not isinstance(command, list):
        raise AssertionError('command argument is not a list')
    if command == ['sysctl', '-n', 'kern.smp.forward_signal_enabled']:
        output = faked_output['smp-sysctl']
        if isinstance(output, Exception):
            # Lets the test simulate a failing sysctl invocation.
            raise output
        return output
    if command == ['vmstat', '-H']:
        return (b' procs memory page disks traps cpu\n'
                b' r b w avm fre flt re pi po fr sr wd0 cd0 int sys cs'
                b' us sy id\n'
                b' 0 0 0 343434 123456 47 0 0 0 0 0 2 0 2 80 14 0 1 99\n')
    if command == ['swapctl', '-s', '-k']:
        return b'Total: 1013216 0\n'

    # Not FreeBSD specific: try the output shared with OS X and other BSDs.
    freebsd_osx_output = \
        _my_freebsd_osx_subprocess_check_output(command, faked_output)
    if freebsd_osx_output is None:
        raise AssertionError('Unhandled command')
    return freebsd_osx_output
def _my_osx_platform_system():
    '''Fake platform.system() that reports 'Darwin'.'''
    system_name = 'Darwin'
    return system_name
def _my_osx_platform_uname():
    '''Fake os.uname() result for the OS X test (a 5-item tuple).'''
    uname_fields = ('Darwin', 'test.example.com', '10.6.0', '', '')
    return uname_fields
def _my_osx_os_sysconf(key):
    '''Fake os.sysconf() used by the OS X test.

    Returns NPROCESSORS_OSX for 'SC_NPROCESSORS_CONF'.  Any other key
    raises AssertionError('Unhandled key').
    '''
    if key == 'SC_NPROCESSORS_CONF':
        return NPROCESSORS_OSX
    # Raise explicitly rather than 'assert False': assert statements are
    # removed under 'python -O', which would silently return None here.
    raise AssertionError('Unhandled key')
def _my_osx_subprocess_check_output(command):
    '''Fake subprocess.check_output() used by the OS X test.

    Handles the OS X-specific commands itself and hands everything else to
    _my_freebsd_osx_subprocess_check_output().  A non-list argument, or a
    command nobody knows, raises AssertionError.
    '''
    # isinstance() also accepts list subclasses, and an explicit raise is
    # not stripped under 'python -O' the way an assert statement is.
    if not isinstance(command, list):
        raise AssertionError('command argument is not a list')
    if command == ['sysctl', '-n', 'hw.memsize']:
        return b'123456789\n'
    if command == ['vm_stat']:
        return (b'Mach Virtual Memory Statistics: '
                b'(page size of 4096 bytes)\n'
                b'Pages free: 12345.\n'
                b'Pages speculative: 11111.\n')
    if command == ['sysctl', '-n', 'vm.swapusage']:
        return b'total = 18432.00M used = 17381.23M free = 1050.77M\n'

    # Not OS X specific: try the output shared with FreeBSD and other BSDs.
    freebsd_osx_output = _my_freebsd_osx_subprocess_check_output(command)
    if freebsd_osx_output is None:
        raise AssertionError('Unhandled command')
    return freebsd_osx_output
- class SysInfoTest(unittest.TestCase):
- def setUp(self):
-
-
- self.old_platform_system = platform.system
- self.old_os_sysconf = os.sysconf
- self.old_open = __builtins__.open
- self.old_subprocess_check_output = subprocess.check_output
- def tearDown(self):
-
- platform.system = self.old_platform_system
- os.sysconf = self.old_os_sysconf
- __builtins__.open = self.old_open
- subprocess.check_output = self.old_subprocess_check_output
- def test_sysinfo(self):
- """Test that the various methods on SysInfo exist and return data."""
- s = SysInfo()
- self.assertEqual(None, s.get_num_processors())
- self.assertEqual('Unknown', s.get_endianness())
- self.assertEqual('', s.get_platform_hostname())
- self.assertEqual('Unknown', s.get_platform_name())
- self.assertEqual('Unknown', s.get_platform_version())
- self.assertEqual('Unknown', s.get_platform_machine())
- self.assertFalse(s.get_platform_is_smp())
- self.assertEqual(None, s.get_uptime())
- self.assertEqual(None, s.get_uptime_desc())
- self.assertEqual(None, s.get_loadavg())
- self.assertEqual(None, s.get_mem_total())
- self.assertEqual(None, s.get_mem_free())
- self.assertEqual(None, s.get_mem_cached())
- self.assertEqual(None, s.get_mem_buffers())
- self.assertEqual(None, s.get_mem_swap_total())
- self.assertEqual(None, s.get_mem_swap_free())
- self.assertEqual(None, s.get_platform_distro())
- self.assertEqual('Unknown\n', s.get_net_interfaces())
- self.assertEqual('Unknown\n', s.get_net_routing_table())
- self.assertEqual('Unknown\n', s.get_net_stats())
- self.assertEqual('Unknown\n', s.get_net_connections())
- def test_sysinfo_factory(self):
- """Test that SysInfoFromFactory returns a valid system-specific
- SysInfo implementation.
- See sysinfo.SysInfoTestcase() for some ofthe parameters.
- """
- old_platform_system = platform.system
- platform.system = _my_testcase_platform_system
- s = SysInfoFromFactory()
- self.assertEqual(None, s.get_num_processors())
- self.assertEqual('bigrastafarian', s.get_endianness())
- self.assertEqual('', s.get_platform_hostname())
- self.assertEqual('b10test', s.get_platform_name())
- self.assertEqual('Unknown', s.get_platform_version())
- self.assertEqual('Unknown', s.get_platform_machine())
- self.assertFalse(s.get_platform_is_smp())
- self.assertEqual(131072, s.get_uptime())
-
- self.assertEqual('1 day, 12:24', s.get_uptime_desc())
- self.assertEqual(None, s.get_loadavg())
- self.assertEqual(None, s.get_mem_total())
- self.assertEqual(None, s.get_mem_free())
- self.assertEqual(None, s.get_mem_cached())
- self.assertEqual(None, s.get_mem_buffers())
- self.assertEqual(None, s.get_mem_swap_total())
- self.assertEqual(None, s.get_mem_swap_free())
- self.assertEqual(None, s.get_platform_distro())
- self.assertEqual('Unknown\n', s.get_net_interfaces())
- self.assertEqual('Unknown\n', s.get_net_routing_table())
- self.assertEqual('Unknown\n', s.get_net_stats())
- self.assertEqual('Unknown\n', s.get_net_connections())
- platform.system = old_platform_system
- def test_sysinfo_linux(self):
- """Tests the Linux implementation of SysInfo. Note that this
- tests deep into the implementation, and not just the
- interfaces."""
-
-
- platform.system = _my_linux_platform_system
- os.sysconf = _my_linux_os_sysconf
- __builtins__.open = _my_linux_open
- subprocess.check_output = _my_linux_subprocess_check_output
- s = SysInfoFromFactory()
- self.assertEqual(NPROCESSORS_LINUX, s.get_num_processors())
- self.assertEqual('myhostname', s.get_platform_hostname())
- self.assertTrue(s.get_platform_is_smp())
- self.assertEqual(172801, s.get_uptime())
-
-
- self.assertEqual('2 days, 0:00', s.get_uptime_desc())
- self.assertEqual((0.1, 0.2, 0.3), s.get_loadavg())
- self.assertEqual(3157884928, s.get_mem_total())
- self.assertEqual(891383808, s.get_mem_free())
- self.assertEqual(1335152640, s.get_mem_cached())
- self.assertEqual(28069888, s.get_mem_buffers())
- self.assertEqual(4294963200, s.get_mem_swap_total())
- self.assertEqual(4095451136, s.get_mem_swap_free())
- self.assertEqual('My Distribution', s.get_platform_distro())
-
-
-
- self.assertEqual('qB2osV6vUOjqm3P/+tQ4d92xoYz8/U8P9v3KWRpNwlI=\n', s.get_net_interfaces())
- self.assertEqual('VGWAS92AlS14Pl2xqENJs5P2Ihe6Nv9g181Mu6Zz+aQ=\n\nXfizswwNA9NkXz6K36ZExpjV08Y5IXkHI8jjDSV+5Nc=\n', s.get_net_routing_table())
- self.assertEqual('osuxbrcc1g9VgaF4yf3FrtfodrfATrbSnjhqhuQSAs8=\n', s.get_net_stats())
- self.assertEqual('Z+w0lwa02/T+5+EIio84rrst/Dtizoz/aL9Im7J7ESA=\n', s.get_net_connections())
- def check_bsd_values(self, s):
-
- self.assertEqual('test.example.com', s.get_platform_hostname())
- self.assertLess(abs(76632 - s.get_uptime()), 4)
- self.assertEqual(None, s.get_mem_cached())
- self.assertEqual(None, s.get_mem_buffers())
- self.assertEqual(None, s.get_platform_distro())
-
-
-
- self.assertEqual('qB2osV6vUOjqm3P/+tQ4d92xoYz8/U8P9v3KWRpNwlI=\n', s.get_net_interfaces())
- self.assertEqual('XfizswwNA9NkXz6K36ZExpjV08Y5IXkHI8jjDSV+5Nc=\n', s.get_net_routing_table())
- self.assertEqual('osuxbrcc1g9VgaF4yf3FrtfodrfATrbSnjhqhuQSAs8=\n', s.get_net_stats())
- self.assertEqual('Z+w0lwa02/T+5+EIio84rrst/Dtizoz/aL9Im7J7ESA=\n', s.get_net_connections())
- def test_sysinfo_openbsd(self):
- """Tests the OpenBSD implementation of SysInfo. Note that this
- tests deep into the implementation, and not just the
- interfaces."""
-
-
- platform.system = _my_openbsd_platform_system
- os.sysconf = _my_openbsd_os_sysconf
- subprocess.check_output = _my_openbsd_subprocess_check_output
- os.uname = _my_openbsd_platform_uname
- s = SysInfoFromFactory()
- self.assertEqual(NPROCESSORS_OPENBSD, s.get_num_processors())
- self.check_bsd_values(s)
- self.assertEqual((0.7, 0.9, 0.8), s.get_loadavg())
- self.assertFalse(s.get_platform_is_smp())
- self.assertEqual(543214321, s.get_mem_total())
- self.assertEqual(123456 * 1024, s.get_mem_free())
- self.assertEqual(566791168, s.get_mem_swap_total())
- self.assertEqual(566789120, s.get_mem_swap_free())
- def test_sysinfo_freebsd(self):
- """Tests the FreeBSD implementation of SysInfo. Note that this
- tests deep into the implementation, and not just the
- interfaces."""
-
-
- platform.system = _my_freebsd_platform_system
- os.sysconf = _my_freebsd_os_sysconf
-
-
- faked_process_output = { 'smp-sysctl': b'1\n' }
- subprocess.check_output = lambda command : \
- _my_freebsd_subprocess_check_output(command, faked_process_output)
- os.uname = _my_freebsd_platform_uname
- s = SysInfoFromFactory()
- self.assertEqual(NPROCESSORS_FREEBSD, s.get_num_processors())
- self.assertTrue(s.get_platform_is_smp())
-
-
- faked_process_output['smp-sysctl'] = b'0\n'
- s = SysInfoFromFactory()
- self.assertTrue(s.get_platform_is_smp())
-
-
- faked_process_output['smp-sysctl'] = \
- subprocess.CalledProcessError(1, 'sysctl')
- s = SysInfoFromFactory()
- self.assertFalse(s.get_platform_is_smp())
-
- faked_process_output['smp-sysctl'] = OSError()
- s = SysInfoFromFactory()
- self.assertIsNone(s.get_platform_is_smp())
- self.check_bsd_values(s)
- self.assertEqual((0.2, 0.4, 0.6), s.get_loadavg())
- self.assertEqual(543214321, s.get_mem_total())
- self.assertEqual(123456 * 1024, s.get_mem_free())
- self.assertEqual(1037533184, s.get_mem_swap_total())
- self.assertEqual(1037533184, s.get_mem_swap_free())
-
-
- faked_process_output['boottime-sysctl'] = bytes(
- '{ sec = ' + str(int(time.time() - 4200)) +
- ', usec = 0 }\n', 'utf-8')
- s = SysInfoFromFactory()
- self.assertEqual('1:10', s.get_uptime_desc())
- def test_sysinfo_osx(self):
- """Tests the OS X implementation of SysInfo. Note that this
- tests deep into the implementation, and not just the
- interfaces."""
-
-
- platform.system = _my_osx_platform_system
- os.sysconf = _my_osx_os_sysconf
- subprocess.check_output = _my_osx_subprocess_check_output
- os.uname = _my_osx_platform_uname
- s = SysInfoFromFactory()
- self.assertEqual(NPROCESSORS_OSX, s.get_num_processors())
- self.assertFalse(s.get_platform_is_smp())
- self.check_bsd_values(s)
- self.assertEqual((0.2, 0.4, 0.6), s.get_loadavg())
- self.assertEqual(123456789, s.get_mem_total())
- self.assertEqual((23456 * 4096), s.get_mem_free())
- self.assertEqual(18874368.0, s.get_mem_swap_total())
- self.assertEqual(1075988.48, s.get_mem_swap_free())
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|