123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519 |
- '''This module returns system information.'''
- import os
- import sys
- import re
- import subprocess
- import os.path
- import platform
- import time
- from datetime import timedelta
class SysInfo:
    """Platform-independent container for system information.

    The constructor only installs placeholder values (None, 'Unknown', an
    empty string or a newline-terminated 'Unknown').  The platform specific
    subclasses defined in this module overwrite the values they are able to
    gather; anything they cannot determine keeps its placeholder.
    """

    def __init__(self):
        # Processor and platform identification.
        self._num_processors = None
        self._endianness = 'Unknown'
        self._hostname = ''
        self._platform_name = 'Unknown'
        self._platform_version = 'Unknown'
        self._platform_machine = 'Unknown'
        self._platform_is_smp = None
        self._platform_distro = None

        # Uptime (seconds) and load average.
        self._uptime = None
        self._loadavg = None

        # Memory figures, in bytes.
        self._mem_total = None
        self._mem_free = None
        self._mem_cached = None
        self._mem_buffers = None
        self._mem_swap_total = None
        self._mem_swap_free = None

        # Free-form, multi-line network reports.
        self._net_interfaces = 'Unknown\n'
        self._net_routing_table = 'Unknown\n'
        self._net_stats = 'Unknown\n'
        self._net_connections = 'Unknown\n'

    def get_num_processors(self):
        """Returns the number of processors. This is the number of
        hyperthreads when hyper-threading is enabled.
        """
        return self._num_processors

    def get_endianness(self):
        """Returns 'big' or 'little' ('Unknown' if it was not determined)."""
        return self._endianness

    def get_platform_hostname(self):
        """Returns the hostname of the system."""
        return self._hostname

    def get_platform_name(self):
        """Returns the platform name (uname -s)."""
        return self._platform_name

    def get_platform_version(self):
        """Returns the platform version string.

        On POSIX systems this is filled from field 2 of os.uname(), i.e.
        the kernel release.
        """
        return self._platform_version

    def get_platform_machine(self):
        """Returns the platform machine architecture."""
        return self._platform_machine

    def get_platform_is_smp(self):
        """Returns True if an SMP kernel is being used, False otherwise
        (None if it could not be determined).
        """
        return self._platform_is_smp

    def get_platform_distro(self):
        """Returns the name of the OS distribution in use.

        Note: the concept of 'distribution' is Linux specific. This shouldn't
        be at this level.
        """
        return self._platform_distro

    def get_uptime(self):
        """Returns the uptime in seconds."""
        return self._uptime

    def get_uptime_desc(self):
        """Returns the uptime in human readable form.

        The format is '[D day[s], ]h:mm': the day part is only present for
        uptimes of one day or more, hours are not zero padded, minutes are
        padded to two digits and left-over seconds are dropped.
        It returns None if the uptime is unknown.
        """
        if self._uptime is None:
            return None

        delta = timedelta(seconds=self._uptime)
        days = delta.days
        day_part = ''
        if days > 0:
            day_part = '%d day%s, ' % (days, 's' if days > 1 else '')

        # delta.seconds is the part of the uptime below one day; integer
        # arithmetic avoids the float round trip of int(x / 3600).
        hours, below_hour = divmod(delta.seconds, 3600)
        minutes = below_hour // 60
        return day_part + '%d:%02d' % (hours, minutes)

    def get_loadavg(self):
        """Returns the load average as a tuple of 3 floating point values."""
        return self._loadavg

    def get_mem_total(self):
        """Returns the total amount of memory in bytes."""
        return self._mem_total

    def get_mem_free(self):
        """Returns the amount of free memory in bytes."""
        return self._mem_free

    def get_mem_cached(self):
        """Returns the amount of cached memory in bytes."""
        return self._mem_cached

    def get_mem_buffers(self):
        """Returns the amount of buffer in bytes."""
        return self._mem_buffers

    def get_mem_swap_total(self):
        """Returns the total amount of swap in bytes."""
        return self._mem_swap_total

    def get_mem_swap_free(self):
        """Returns the amount of free swap in bytes."""
        return self._mem_swap_free

    def get_net_interfaces(self):
        """Returns information about network interfaces (as a multi-line string)."""
        return self._net_interfaces

    def get_net_routing_table(self):
        """Returns information about network routing table (as a multi-line string)."""
        return self._net_routing_table

    def get_net_stats(self):
        """Returns network statistics (as a multi-line string)."""
        return self._net_stats

    def get_net_connections(self):
        """Returns network connection information (as a multi-line string)."""
        return self._net_connections
class SysInfoPOSIX(SysInfo):
    """Common POSIX implementation of the SysInfo class.

    Fills in the values that can be obtained through os.sysconf(),
    sys.byteorder and os.uname().
    See the SysInfo class documentation for more information.
    """

    def __init__(self):
        super().__init__()

        self._num_processors = os.sysconf('SC_NPROCESSORS_CONF')
        self._endianness = sys.byteorder

        # os.uname() yields (sysname, nodename, release, version, machine);
        # the name, the release and the machine are the fields kept.
        uname_fields = os.uname()
        name, release, machine = (uname_fields[0], uname_fields[2],
                                  uname_fields[4])
        self._platform_name = name
        self._platform_version = release
        self._platform_machine = machine
class SysInfoLinux(SysInfoPOSIX):
    """Linux implementation of the SysInfo class.

    Values are gathered from /proc, from 'lsb_release' (falling back to
    well-known release files) and from the 'ip' and 'netstat' commands.
    See the SysInfo class documentation for more information.
    """

    # /proc/meminfo field name -> attribute receiving the value (in bytes).
    _MEMINFO_ATTRIBUTES = {
        'MemTotal': '_mem_total',
        'MemFree': '_mem_free',
        'Cached': '_mem_cached',
        'Buffers': '_mem_buffers',
        'SwapTotal': '_mem_swap_total',
        'SwapFree': '_mem_swap_free',
    }

    # Matches '<Field>:   <number> kB' lines of /proc/meminfo.
    _MEMINFO_LINE_RE = re.compile(r'^(\w+):\s+(\d+)\s*kB')

    # Release files consulted, in order, when lsb_release gives no answer.
    _DISTRO_FILES = (
        '/etc/debian_release',
        '/etc/debian_version',
        '/etc/SuSE-release',
        '/etc/UnitedLinux-release',
        '/etc/mandrake-release',
        '/etc/gentoo-release',
        '/etc/fedora-release',
        '/etc/redhat-release',
        '/etc/redhat_version',
        '/etc/slackware-release',
        '/etc/slackware-version',
        '/etc/arch-release',
        '/etc/lsb-release',
        '/etc/mageia-release',
    )

    def __init__(self):
        super().__init__()
        self._read_proc_basics()
        self._read_meminfo()
        self._platform_distro = self._detect_distro()
        self._read_network_info()

    def _read_proc_basics(self):
        """Read hostname, SMP flag, uptime and load average from /proc."""
        with open('/proc/sys/kernel/hostname') as f:
            self._hostname = f.read().strip()

        with open('/proc/version') as f:
            self._platform_is_smp = ' SMP ' in f.read().strip()

        with open('/proc/uptime') as f:
            uptime_fields = f.read().strip().split(' ')
            # Only the first of the (at least two) fields is used.
            if len(uptime_fields) > 1:
                self._uptime = int(round(float(uptime_fields[0])))

        with open('/proc/loadavg') as f:
            load_fields = f.read().strip().split(' ')
            if len(load_fields) >= 3:
                self._loadavg = (float(load_fields[0]),
                                 float(load_fields[1]),
                                 float(load_fields[2]))

    def _read_meminfo(self):
        """Read the memory and swap figures from /proc/meminfo.

        The file reports kB; the values are stored in bytes.
        """
        with open('/proc/meminfo') as f:
            meminfo_lines = f.readlines()

        for line in meminfo_lines:
            matched = self._MEMINFO_LINE_RE.match(line)
            if not matched:
                continue
            attribute = self._MEMINFO_ATTRIBUTES.get(matched.group(1))
            if attribute is not None:
                setattr(self, attribute, int(matched.group(2)) * 1024)

    def _detect_distro(self):
        """Return a description of the distribution, or 'Unknown'.

        The 'Description:' line of 'lsb_release -a' is preferred; when the
        command fails or prints no such line, the content of the first
        existing file of _DISTRO_FILES is used.
        """
        try:
            output = subprocess.check_output(['lsb_release', '-a'])
            for line in output.decode('utf-8').split('\n'):
                matched = re.match(r'^Description:(.*)', line)
                if matched:
                    return matched.group(1).strip()
        except (subprocess.CalledProcessError, OSError):
            # lsb_release is missing or failed: fall back to release files.
            pass

        for filename in self._DISTRO_FILES:
            if os.path.exists(filename):
                with open(filename) as f:
                    return f.read().strip()

        return 'Unknown'

    def _read_network_info(self):
        """Capture the output of the network reporting commands.

        When a command fails, the corresponding value is replaced by a
        warning message.
        """
        try:
            output = subprocess.check_output(['ip', 'addr'])
            self._net_interfaces = output.decode('utf-8')
        except (subprocess.CalledProcessError, OSError):
            self._net_interfaces = 'Warning: "ip addr" command failed.\n'

        try:
            # IPv4 routes, an empty line, then IPv6 routes.  A failure of
            # either command replaces the whole table by the warning.
            output = subprocess.check_output(['ip', 'route'])
            self._net_routing_table = output.decode('utf-8')
            self._net_routing_table += '\n'
            output = subprocess.check_output(['ip', '-f', 'inet6', 'route'])
            self._net_routing_table += output.decode('utf-8')
        except (subprocess.CalledProcessError, OSError):
            self._net_routing_table = 'Warning: "ip route" or "ip -f inet6 route" command failed.\n'

        try:
            output = subprocess.check_output(['netstat', '-s'])
            self._net_stats = output.decode('utf-8')
        except (subprocess.CalledProcessError, OSError):
            self._net_stats = 'Warning: "netstat -s" command failed.\n'

        try:
            output = subprocess.check_output(['netstat', '-apn'])
            self._net_connections = output.decode('utf-8')
        except (subprocess.CalledProcessError, OSError):
            self._net_connections = 'Warning: "netstat -apn" command failed.\n'
class SysInfoBSD(SysInfoPOSIX):
    """Common BSD implementation of the SysInfo class.

    Gathers the hostname, the total memory (sysctl hw.physmem) and the
    network reports (ifconfig, netstat).  Every command is best effort: on
    failure the hostname and memory keep their placeholder and the network
    reports are replaced by a warning message.
    See the SysInfo class documentation for more information.
    """

    def __init__(self):
        super().__init__()

        try:
            s = subprocess.check_output(['hostname'])
            self._hostname = s.decode('utf-8').strip()
        except (subprocess.CalledProcessError, OSError):
            pass

        try:
            s = subprocess.check_output(['sysctl', '-n', 'hw.physmem'])
            self._mem_total = int(s.decode('utf-8').strip())
        except (subprocess.CalledProcessError, OSError, ValueError):
            # ValueError: the output was not a plain integer.
            pass

        try:
            s = subprocess.check_output(['ifconfig'])
            self._net_interfaces = s.decode('utf-8')
        except (subprocess.CalledProcessError, OSError):
            self._net_interfaces = 'Warning: "ifconfig" command failed.\n'

        try:
            s = subprocess.check_output(['netstat', '-s'])
            self._net_stats = s.decode('utf-8')
        except (subprocess.CalledProcessError, OSError):
            self._net_stats = 'Warning: "netstat -s" command failed.\n'

        try:
            s = subprocess.check_output(['netstat', '-an'])
            self._net_connections = s.decode('utf-8')
        except (subprocess.CalledProcessError, OSError):
            self._net_connections = 'Warning: "netstat -an" command failed.\n'

        try:
            s = subprocess.check_output(['netstat', '-nr'])
            self._net_routing_table = s.decode('utf-8')
        except (subprocess.CalledProcessError, OSError):
            # The warning belongs to the routing table; it must not clobber
            # the connection list gathered just above.
            self._net_routing_table = 'Warning: "netstat -nr" command failed.\n'
class SysInfoOpenBSD(SysInfoBSD):
    """OpenBSD implementation of the SysInfo class.

    Adds uptime, load average, free memory and swap figures, obtained from
    sysctl, vmstat and swapctl.  Each value is best effort: when a command
    fails or prints something unexpected the value keeps its placeholder.
    See the SysInfo class documentation for more information.
    """

    def __init__(self):
        super().__init__()

        try:
            s = subprocess.check_output(['sysctl', '-n', 'kern.boottime'])
            # The output is parsed as a plain integer timestamp and
            # subtracted from the current time.
            boot_time = int(s.decode('utf-8').strip())
            self._uptime = int(round(time.time() - boot_time))
        except (subprocess.CalledProcessError, OSError, ValueError):
            pass

        try:
            s = subprocess.check_output(['sysctl', '-n', 'vm.loadavg'])
            fields = s.decode('utf-8').strip().split(' ')
            if len(fields) >= 3:
                self._loadavg = (float(fields[0]), float(fields[1]),
                                 float(fields[2]))
        except (subprocess.CalledProcessError, OSError, ValueError):
            pass

        try:
            s = subprocess.check_output(['vmstat'])
            lines = s.decode('utf-8').split('\n')
            # Third output line, element 4 of the whitespace split, is taken
            # as the used memory in kB.
            # NOTE(review): which vmstat column this is depends on whether
            # the line starts with whitespace - confirm against real output.
            columns = re.split(r'\s+', lines[2])
            used = int(columns[4]) * 1024
            # Without a known total there is nothing to subtract from.
            if self._mem_total is not None:
                self._mem_free = self._mem_total - used
        except (subprocess.CalledProcessError, OSError, ValueError,
                IndexError):
            pass

        try:
            s = subprocess.check_output(['swapctl', '-s', '-k'])
            summary = s.decode('utf-8').strip()
            r = re.match(r'^total: (\d+) 1K-blocks allocated, (\d+) used, (\d+) available', summary)
            if r:
                # 1K blocks -> bytes.
                self._mem_swap_total = int(r.group(1).strip()) * 1024
                self._mem_swap_free = int(r.group(3).strip()) * 1024
        except (subprocess.CalledProcessError, OSError):
            pass
class SysInfoFreeBSDOSX(SysInfoBSD):
    """Shared code for the FreeBSD and OS X implementations of the SysInfo
    class. See the SysInfo class documentation for more information.

    Adds the uptime (from sysctl kern.boottime) and the load average (from
    sysctl vm.loadavg).  Both are best effort: on failure the value keeps
    its placeholder.
    """

    def __init__(self):
        super().__init__()

        try:
            s = subprocess.check_output(['sysctl', '-n', 'kern.boottime'])
            boottime = s.decode('utf-8').strip()
            # Expected shape: '{ sec = <seconds>, ...'; only the seconds
            # are used.
            r = re.match(r'^\{\s+sec\s+=\s+(\d+),.*', boottime)
            if r:
                sec = time.time() - int(r.group(1))
                self._uptime = int(round(sec))
        except (subprocess.CalledProcessError, OSError):
            pass

        try:
            s = subprocess.check_output(['sysctl', '-n', 'vm.loadavg'])
            loadavg = s.decode('utf-8').strip()
            # The three values may or may not be wrapped in braces.
            r = re.match(r'^\{(.*)\}$', loadavg)
            if r:
                fields = r.group(1).strip().split(' ')
            else:
                fields = loadavg.split(' ')
            if len(fields) >= 3:
                self._loadavg = (float(fields[0]), float(fields[1]),
                                 float(fields[2]))
        except (subprocess.CalledProcessError, OSError, ValueError):
            # ValueError: the fields were not floating point numbers.
            pass
class SysInfoFreeBSD(SysInfoFreeBSDOSX):
    """FreeBSD implementation of the SysInfo class.

    Adds the SMP flag, the free memory and the swap figures.  Each value is
    best effort: when a command fails or prints something unexpected the
    value keeps its placeholder.
    See the SysInfo class documentation for more information.
    """

    def __init__(self):
        super().__init__()

        try:
            # Only the exit status matters: success marks the kernel as
            # SMP, a non-zero status as non-SMP.
            # NOTE(review): presumably this sysctl variable only exists on
            # SMP kernels - confirm.
            subprocess.check_output(['sysctl', '-n',
                                     'kern.smp.forward_signal_enabled'])
            self._platform_is_smp = True
        except subprocess.CalledProcessError:
            self._platform_is_smp = False
        except OSError:
            # sysctl could not be run at all: leave the flag undetermined.
            pass

        try:
            s = subprocess.check_output(['vmstat', '-H'])
            lines = s.decode('utf-8').split('\n')
            # Third output line, element 4 of the whitespace split, is taken
            # as the used memory in kB.
            # NOTE(review): which vmstat column this is depends on whether
            # the line starts with whitespace - confirm against real output.
            columns = re.split(r'\s+', lines[2])
            used = int(columns[4]) * 1024
            # Without a known total there is nothing to subtract from.
            if self._mem_total is not None:
                self._mem_free = self._mem_total - used
        except (subprocess.CalledProcessError, OSError, ValueError,
                IndexError):
            pass

        try:
            s = subprocess.check_output(['swapctl', '-s', '-k'])
            summary = s.decode('utf-8').strip()
            # 'Total: <total> <used>' in 1K blocks.
            r = re.match(r'^Total:\s+(\d+)\s+(\d+)', summary)
            if r:
                swap_total = int(r.group(1).strip()) * 1024
                swap_used = int(r.group(2).strip()) * 1024
                self._mem_swap_total = swap_total
                self._mem_swap_free = swap_total - swap_used
        except (subprocess.CalledProcessError, OSError):
            pass
class SysInfoOSX(SysInfoFreeBSDOSX):
    """OS X (Darwin) implementation of the SysInfo class.

    Adds the total memory (sysctl hw.memsize), the free memory (vm_stat)
    and the swap figures (sysctl vm.swapusage).  Each value is best effort:
    when a command fails or prints something unexpected the value keeps
    its placeholder.
    See the SysInfo class documentation for more information.
    """

    def __init__(self):
        super().__init__()

        # The total obtained by the BSD base class from hw.physmem is
        # discarded; hw.memsize is used instead.
        # NOTE(review): presumably hw.physmem is unreliable on OS X -
        # confirm.
        self._mem_total = None
        try:
            s = subprocess.check_output(['sysctl', '-n', 'hw.memsize'])
            self._mem_total = int(s.decode('utf-8').strip())
        except (subprocess.CalledProcessError, OSError, ValueError):
            pass

        try:
            s = subprocess.check_output(['vm_stat'])
            lines = s.decode('utf-8').split('\n')

            # One line announces the page size; the others are parsed as
            # '<name>: <count>.' pairs.
            counters = {}
            page_size = None
            page_size_re = re.compile(r'.*page size of ([0-9]+) bytes')
            for line in lines:
                page_size_m = page_size_re.match(line)
                if page_size_m:
                    page_size = int(page_size_m.group(1))
                else:
                    key, _, value = line.partition(':')
                    # Drop the full stop that terminates each count.
                    counters[key] = value.strip().rstrip('.')

            if page_size is not None:
                free_pages = (int(counters['Pages free']) +
                              int(counters['Pages speculative']))
                self._mem_free = free_pages * page_size
        except (subprocess.CalledProcessError, OSError, KeyError,
                ValueError):
            # KeyError/ValueError: an expected counter is missing or is not
            # a number.
            pass

        try:
            s = subprocess.check_output(['sysctl', '-n', 'vm.swapusage'])
            usage = s.decode('utf-8').strip()
            r = re.match(r'^total = (\d+\.\d+)M\s+used = (\d+\.\d+)M\s+free = (\d+\.\d+)M', usage)
            if r:
                # The figures carry an 'M' suffix; convert them to bytes,
                # the unit used for the swap values on the other platforms.
                bytes_per_mb = 1024 * 1024
                self._mem_swap_total = float(r.group(1).strip()) * bytes_per_mb
                self._mem_swap_free = float(r.group(3).strip()) * bytes_per_mb
        except (subprocess.CalledProcessError, OSError):
            pass
class SysInfoTestcase(SysInfo):
    """SysInfo variant carrying fixed, recognizable values.

    It is the implementation the factory returns for the
    'BIND10Testcase' platform name.
    """

    def __init__(self):
        super().__init__()
        # Fixed values; everything else keeps the SysInfo placeholders.
        self._uptime = 131072
        self._platform_name = 'b10test'
        self._endianness = 'bigrastafarian'
def SysInfoFromFactory():
    """Create the SysInfo implementation matching the running platform.

    The platform is identified with platform.system(); names without a
    dedicated implementation get a plain SysInfo instance, which only
    carries placeholder values.
    """
    implementations = {
        'Linux': SysInfoLinux,
        'OpenBSD': SysInfoOpenBSD,
        'FreeBSD': SysInfoFreeBSD,
        'Darwin': SysInfoOSX,
        'BIND10Testcase': SysInfoTestcase,
    }
    implementation = implementations.get(platform.system(), SysInfo)
    return implementation()
|