123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694 |
- # Copyright (C) 2009 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import unittest
- import isc.cc.data
- import os
- import io
- import errno
- import sys
- import socket
- import ssl
- import http.client
- import pwd
- import getpass
- import re
- import json
- from optparse import OptionParser
- from isc.config.config_data import ConfigData, MultiConfigData
- from isc.config.module_spec import ModuleSpec
- from isc.testutils.parse_args import TestOptParser, OptsError
- from bindctl_main import set_bindctl_options
- from bindctl import cmdparse
- from bindctl import bindcmd
- from bindctl.moduleinfo import *
- from bindctl.exception import *
- try:
- from collections import OrderedDict
- except ImportError:
- from mycollections import OrderedDict
class TestCmdLex(unittest.TestCase):
    """Lexing tests for cmdparse.BindCmdParser.

    Each test feeds a raw command line to the parser and checks the
    resulting ``module``, ``command`` and ``params`` attributes, or the
    exception raised for a malformed line.
    """

    def my_assert_raise(self, exception_type, cmd_line):
        """Assert that parsing ``cmd_line`` raises ``exception_type``."""
        self.assertRaises(exception_type, cmdparse.BindCmdParser, cmd_line)

    def testCommandWithoutParameter(self):
        """A bare 'module command' line yields no parameters."""
        cmd_parser = cmdparse.BindCmdParser("zone add")
        # unittest assertions are used instead of bare ``assert`` so the
        # checks survive ``python -O`` and report both values on failure.
        self.assertEqual(cmd_parser.module, "zone")
        self.assertEqual(cmd_parser.command, "add")
        self.assertEqual(len(cmd_parser.params), 0)

    def testCommandWithParameters(self):
        """Differently quoted/separated spellings parse to the same values."""
        # A tuple (rather than a set) keeps the iteration order stable, so
        # a failure always points at the same command line.
        lines = (
            "zone add zone_name = cnnic.cn, file = cnnic.cn.file master=1.1.1.1",
            "zone add zone_name = \"cnnic.cn\", file ='cnnic.cn.file' master=1.1.1.1 ",
            "zone add zone_name = 'cnnic.cn\", file ='cnnic.cn.file' master=1.1.1.1, ",
        )

        for cmd_line in lines:
            cmd_parser = cmdparse.BindCmdParser(cmd_line)
            self.assertEqual(cmd_parser.module, "zone")
            self.assertEqual(cmd_parser.command, "add")
            self.assertEqual(cmd_parser.params["zone_name"], "cnnic.cn")
            self.assertEqual(cmd_parser.params["file"], "cnnic.cn.file")
            self.assertEqual(cmd_parser.params["master"], '1.1.1.1')

    def testCommandWithParamters_2(self):
        '''Test whether the parameters in key=value can be parsed properly.'''
        cmd_parser = cmdparse.BindCmdParser('zone cmd name = 1:34::2')
        self.assertEqual(cmd_parser.params['name'], '1:34::2')

        cmd_parser = cmdparse.BindCmdParser('zone cmd name = 1\"\'34**&2'
                                            ' value=44\"\'\"')
        self.assertEqual(cmd_parser.params['name'], '1\"\'34**&2')
        self.assertEqual(cmd_parser.params['value'], '44\"\'\"')

        cmd_parser = cmdparse.BindCmdParser('zone cmd name = 1\"\'34**&2'
                                            ',value= 44\"\'\"')
        self.assertEqual(cmd_parser.params['name'], '1\"\'34**&2')
        self.assertEqual(cmd_parser.params['value'], '44\"\'\"')

        # No separator between the first value and 'value=': the text is
        # absorbed into 'name', and the later 'value = ...' supplies 'value'.
        cmd_parser = cmdparse.BindCmdParser('zone cmd name = 1\'34**&2'
                                            'value=44\"\'\" value = '
                                            '\"==============\'')
        self.assertEqual(cmd_parser.params['name'], '1\'34**&2value=44\"\'\"')
        self.assertEqual(cmd_parser.params['value'], '==============')

        cmd_parser = cmdparse.BindCmdParser('zone cmd name = \"1234, '
                                            '567890 \" value ==&*/')
        self.assertEqual(cmd_parser.params['name'], '1234, 567890 ')
        self.assertEqual(cmd_parser.params['value'], '=&*/')

    def testCommandWithListParam(self):
        """A quoted value containing a comma stays a single parameter."""
        cmd_parser = cmdparse.BindCmdParser("zone set zone_name='cnnic.cn', "
                                            "master='1.1.1.1, 2.2.2.2'")
        self.assertEqual(cmd_parser.params["master"], '1.1.1.1, 2.2.2.2')

    def testCommandWithHelpParam(self):
        """'help' becomes the only parameter, with or without trailing text."""
        cmd_parser = cmdparse.BindCmdParser("zone add help")
        self.assertEqual(cmd_parser.params["help"], "help")

        cmd_parser = cmdparse.BindCmdParser("zone add help *&)&)*&&$#$^%")
        self.assertEqual(cmd_parser.params["help"], "help")
        self.assertEqual(len(cmd_parser.params), 1)

    def testCmdModuleNameFormatError(self):
        """Malformed or empty module names are rejected."""
        self.my_assert_raise(CmdModuleNameFormatError, "zone=good")
        self.my_assert_raise(CmdModuleNameFormatError, "zo/ne")
        self.my_assert_raise(CmdModuleNameFormatError, "")
        self.my_assert_raise(CmdModuleNameFormatError, "=zone")
        self.my_assert_raise(CmdModuleNameFormatError, "zone,")

    def testCmdMissCommandNameFormatError(self):
        """A module name without a following command is rejected."""
        self.my_assert_raise(CmdMissCommandNameFormatError, "zone")
        self.my_assert_raise(CmdMissCommandNameFormatError, "zone ")
        self.my_assert_raise(CmdMissCommandNameFormatError, "help ")

    def testCmdCommandNameFormatError(self):
        """Malformed command names are rejected."""
        self.my_assert_raise(CmdCommandNameFormatError, "zone =d")
        self.my_assert_raise(CmdCommandNameFormatError, "zone z=d")
        self.my_assert_raise(CmdCommandNameFormatError, "zone z-d ")
        self.my_assert_raise(CmdCommandNameFormatError, "zone zdd/")
        self.my_assert_raise(CmdCommandNameFormatError, "zone zdd/ \"")
class TestCmdSyntax(unittest.TestCase):
    """Validation tests for BindCmdInterpreter._validate_cmd.

    A small 'zone' module with 'load', 'set' and 'reload_all' commands is
    registered on a fresh interpreter for every test.
    """

    def _create_bindcmd(self):
        """Create one bindcmd"""
        tool = bindcmd.BindCmdInterpreter()
        string_spec = {'item_type': 'string',
                       'item_optional': False,
                       'item_default': ''}
        int_spec = {'item_type': 'integer',
                    'item_optional': False,
                    'item_default': 10}

        # 'load' takes two mandatory string parameters.
        zone_file_param = ParamInfo(name="zone_file",
                                    param_spec=string_spec)
        zone_name = ParamInfo(name='zone_name', param_spec=string_spec)
        load_cmd = CommandInfo(name="load")
        load_cmd.add_param(zone_file_param)
        load_cmd.add_param(zone_name)

        # 'set' takes optional 'master', 'port' and 'allow_update' plus the
        # mandatory 'zone_name'.  'master' and 'port' each get their own
        # variable; previously both were bound to ``param_master`` so the
        # 'master' parameter was silently dropped.
        param_master = ParamInfo(name="master", optional=True,
                                 param_spec=string_spec)
        param_port = ParamInfo(name="port", optional=True,
                               param_spec=int_spec)
        param_allow_update = ParamInfo(name="allow_update",
                                       optional=True,
                                       param_spec=string_spec)
        set_cmd = CommandInfo(name="set")
        set_cmd.add_param(param_master)
        set_cmd.add_param(param_port)
        set_cmd.add_param(param_allow_update)
        set_cmd.add_param(zone_name)

        # 'reload_all' takes no parameters of its own.
        reload_all_cmd = CommandInfo(name="reload_all")

        zone_module = ModuleInfo(name="zone")
        zone_module.add_command(load_cmd)
        zone_module.add_command(set_cmd)
        zone_module.add_command(reload_all_cmd)

        tool.add_module_info(zone_module)
        return tool

    def setUp(self):
        self.bindcmd = self._create_bindcmd()

    def no_assert_raise(self, cmd_line):
        """Parse and validate ``cmd_line``; any exception fails the test."""
        cmd_parser = cmdparse.BindCmdParser(cmd_line)
        self.bindcmd._validate_cmd(cmd_parser)

    def my_assert_raise(self, exception_type, cmd_line):
        """Assert that validating ``cmd_line`` raises ``exception_type``.

        Parsing itself happens outside assertRaises, so a parse error is
        reported as a test error rather than satisfying the assertion.
        """
        cmd_parser = cmdparse.BindCmdParser(cmd_line)
        self.assertRaises(exception_type, self.bindcmd._validate_cmd,
                          cmd_parser)

    def testValidateSuccess(self):
        """Well-formed commands validate; a non-integer port does not."""
        self.no_assert_raise("zone load zone_file='cn' zone_name='cn'")
        self.no_assert_raise("zone load zone_file='cn', zone_name='cn', ")
        self.no_assert_raise("zone help ")
        self.no_assert_raise("zone load help ")
        self.no_assert_raise("zone help help='dd' ")
        self.no_assert_raise("zone set allow_update='1.1.1.1' zone_name='cn'")
        self.no_assert_raise("zone set zone_name='cn'")
        # 'port' is declared with an integer spec, so a string is rejected.
        self.my_assert_raise(isc.cc.data.DataTypeError,
                             "zone set zone_name ='cn', port='cn'")
        self.no_assert_raise("zone reload_all")

    def testCmdUnknownModuleSyntaxError(self):
        """Modules that were never registered are rejected."""
        self.my_assert_raise(CmdUnknownModuleSyntaxError, "zoned d")
        self.my_assert_raise(CmdUnknownModuleSyntaxError, "dd dd ")

    def testCmdUnknownCmdSyntaxError(self):
        """Unknown commands of a known module are rejected."""
        self.my_assert_raise(CmdUnknownCmdSyntaxError, "zone dd")

    def testCmdMissParamSyntaxError(self):
        """Omitting a mandatory parameter is rejected."""
        self.my_assert_raise(CmdMissParamSyntaxError,
                             "zone load zone_file='cn'")
        self.my_assert_raise(CmdMissParamSyntaxError,
                             "zone load zone_name='cn'")
        self.my_assert_raise(CmdMissParamSyntaxError,
                             "zone set allow_update='1.1.1.1'")
        self.my_assert_raise(CmdMissParamSyntaxError,
                             "zone set ")

    def testCmdUnknownParamSyntaxError(self):
        """Parameters a command does not declare are rejected."""
        self.my_assert_raise(CmdUnknownParamSyntaxError,
                             "zone load zone_d='cn'")
        self.my_assert_raise(CmdUnknownParamSyntaxError,
                             "zone reload_all zone_name = 'cn'")
        self.my_assert_raise(CmdUnknownParamSyntaxError,
                             "zone help a b c")
class TestModuleInfo(unittest.TestCase):
    """Tests for CommandInfo.get_param_name_by_position."""

    def test_get_param_name_by_position(self):
        """Positions map to parameter names for a given parameter count."""
        command = CommandInfo('command')
        # Three mandatory parameters with one optional ('data') in between.
        name_param = ParamInfo('name')
        command.add_param(name_param)
        age_param = ParamInfo('age')
        command.add_param(age_param)
        data_param = ParamInfo('data', optional=True)
        command.add_param(data_param)
        sex_param = ParamInfo('sex')
        command.add_param(sex_param)

        # (expected name, position, parameter count)
        # NOTE(review): the (2, 4) lookup appears twice, exactly as in the
        # original test; confirm whether another case was intended.
        expectations = (
            ('name', 0, 2),
            ('age', 1, 2),
            ('sex', 2, 3),
            ('data', 2, 4),
            ('data', 2, 4),
        )
        for expected_name, position, param_count in expectations:
            self.assertEqual(
                expected_name,
                command.get_param_name_by_position(position, param_count))

        # A position past the last parameter is reported as a KeyError.
        self.assertRaises(KeyError, command.get_param_name_by_position, 4, 4)
class TestNameSequence(unittest.TestCase):
    """
    Test if the module/command/parameters is saved in the order creation
    """

    def _create_bindcmd(self):
        """Create one bindcmd"""
        self._cmd = CommandInfo(name="load")
        self.module = ModuleInfo(name="zone")
        self.tool = bindcmd.BindCmdInterpreter()
        # Register the same names, in the same order, as parameters,
        # commands and modules.
        for random_str in self.random_names:
            self._cmd.add_param(ParamInfo(name=random_str))
            self.module.add_command(CommandInfo(name=random_str))
            self.tool.add_module_info(ModuleInfo(name=random_str))

    def setUp(self):
        self.random_names = ['1erdfeDDWsd', '3fe', '2009erd',
                             'Fe231', 'tere142', 'rei8WD']
        self._create_bindcmd()

    def testSequence(self):
        """Names come back in insertion order from all three containers."""
        param_names = self._cmd.get_param_names()
        cmd_names = self.module.get_command_names()
        module_names = self.tool.get_module_names()

        # Index 0 of every returned list is skipped -- presumably a
        # built-in entry added by the container itself; confirm against
        # moduleinfo/bindcmd.
        # unittest assertions replace bare ``assert`` so the checks survive
        # ``python -O`` and show the mismatching names on failure.
        for index, expected in enumerate(self.random_names):
            self.assertEqual(expected, param_names[index + 1])
            self.assertEqual(expected, cmd_names[index + 1])
            self.assertEqual(expected, module_names[index + 1])
# tiny class to fake a UIModuleCCSession, but only the config data
# parts for the next set of tests
class FakeCCSession(MultiConfigData):
    """MultiConfigData stand-in preloaded with the spec of a module 'foo'.

    The parent initialiser is not invoked; the three state dictionaries are
    created directly before the 'foo' specification is registered.
    """

    def __init__(self):
        self._specifications = {}
        self._current_config = {}
        self._local_changes = {}
        self.add_foo_spec()

    def add_foo_spec(self):
        """Register a spec for 'foo' with one integer and one string list."""
        an_int_item = {
            "item_name": "an_int",
            "item_type": "integer",
            "item_optional": False,
            "item_default": 1,
        }
        a_list_item = {
            "item_name": "a_list",
            "item_type": "list",
            "item_optional": False,
            "item_default": [],
            "list_item_spec": {
                "item_name": "a_string",
                "item_type": "string",
                "item_optional": False,
                "item_default": "bar",
            },
        }
        foo_spec = {
            "module_name": "foo",
            "config_data": [an_int_item, a_list_item],
        }
        self.set_specification(ModuleSpec(foo_spec))
- # fake socket
class FakeSocket:
    """In-memory socket substitute driven by a single ``run`` flag.

    While ``run`` is true, connect/send/sendall succeed; after close()
    they raise socket.error.
    """

    def __init__(self):
        self.run = True

    def _fail_if_closed(self):
        """Raise socket.error once close() has been called."""
        if self.run:
            return
        raise socket.error

    def connect(self, to):
        """Pretend to connect; only checks that the socket is open."""
        self._fail_if_closed()

    def close(self):
        """Mark the socket closed so later operations fail."""
        self.run = False

    def send(self, data):
        """Return the length of ``data`` without transmitting anything."""
        self._fail_if_closed()
        return len(data)

    def makefile(self, type):
        """Return this object itself as the file-like wrapper."""
        return self

    def sendall(self, data):
        """Return the length of ``data`` without transmitting anything."""
        self._fail_if_closed()
        return len(data)
class TestConfigCommands(unittest.TestCase):
    """Tests for BindCmdInterpreter config commands, login and run().

    Every test gets a fresh interpreter with a 'foo' module, a
    FakeCCSession as its config data, and ``_print`` redirected into
    ``self.printed_messages``.
    """

    def setUp(self):
        self.tool = bindcmd.BindCmdInterpreter()
        self.tool.add_module_info(ModuleInfo(name="foo"))
        self.tool.config_data = FakeCCSession()
        self.stdout_backup = sys.stdout
        self.printed_messages = []
        self.tool._print = self.store_print

    def tearDown(self):
        sys.stdout = self.stdout_backup

    def store_print(self, *args):
        '''Replacement for BindCmdInterpreter._print.

        Joins the string form of all arguments with single spaces and
        appends the result to self.printed_messages instead of printing.'''
        rendered = " ".join(str(arg) for arg in args)
        self.printed_messages.append(rendered)

    def __check_printed_message(self, expected_message, printed_message):
        '''Assert that the regexp expected_message matches printed_message
        from its start.'''
        failure_text = ("Printed message '" + printed_message +
                        "' does not match '" + expected_message + "'")
        matched = re.match(expected_message, printed_message)
        self.assertIsNotNone(matched, failure_text)

    def __check_printed_messages(self, expected_messages):
        '''Check every captured message, in order, against the regexp at
        the same position of expected_messages.'''
        self.assertEqual(len(expected_messages), len(self.printed_messages))
        for expected, printed in zip(expected_messages,
                                     self.printed_messages):
            self.__check_printed_message(expected, printed)

    def test_precmd(self):
        """precmd() refreshes module info for every line except 'EOF'."""
        def update_all_modules_info():
            raise socket.error

        def precmd(line):
            self.tool.precmd(line)

        self.tool._update_all_modules_info = update_all_modules_info

        # 'EOF' must not reach _update_all_modules_info(), so nothing is
        # raised here ...
        precmd('EOF')
        # ... whereas any other line does, and the error propagates.
        self.assertRaises(socket.error, precmd, 'continue')

    def test_try_login(self):
        """_try_login() turns socket/SSL failures into FailToLogin.

        The failure may come from send_POST itself or from read() on the
        response it returned; unrelated exceptions pass through unchanged.
        """
        original_send_post = self.tool.send_POST
        expected_printed_messages = []

        def send_POST_raiseImmediately(self, params):
            raise socket.error("test error")

        def create_send_POST_raiseOnRead(exception):
            '''Build a send_POST() replacement whose returned response
            raises the given exception from read().'''
            def send_POST_raiseOnRead(self, params):
                class MyResponse:
                    def read(self):
                        raise exception
                return MyResponse()
            return send_POST_raiseOnRead

        def expect_login_failure(replacement, message_pattern):
            '''Install replacement as send_POST, expect FailToLogin and one
            additional printed message matching message_pattern.'''
            self.tool.send_POST = replacement
            self.assertRaises(FailToLogin, self.tool._try_login,
                              "foo", "bar")
            expected_printed_messages.append(message_pattern)
            self.__check_printed_messages(expected_printed_messages)

        try:
            # failure while sending
            expect_login_failure(
                send_POST_raiseImmediately,
                'Error while sending login information: test error')

            # basic socket error while reading
            expect_login_failure(
                create_send_POST_raiseOnRead(socket.error("read error")),
                'Error while sending login information: read error')

            # connection reset
            exc = socket.error("connection reset")
            exc.errno = errno.ECONNRESET
            expect_login_failure(
                create_send_POST_raiseOnRead(exc),
                'Error while sending login information: connection reset')

            # 'normal' SSL error
            exc = ssl.SSLError()
            expect_login_failure(
                create_send_POST_raiseOnRead(exc),
                'Error while sending login information: .*')

            # 'EOF' SSL error
            exc = ssl.SSLError()
            exc.errno = ssl.SSL_ERROR_EOF
            expect_login_failure(
                create_send_POST_raiseOnRead(exc),
                'Error while sending login information: .*')

            # any other exception is passed through and prints nothing new
            self.tool.send_POST = \
                create_send_POST_raiseOnRead(ImportError())
            self.assertRaises(ImportError, self.tool._try_login,
                              "foo", "bar")
            self.__check_printed_messages(expected_printed_messages)
        finally:
            self.tool.send_POST = original_send_post

    def test_try_login_calls_cmdctl(self):
        """_try_login() issues the expected POST /login request."""
        class MyData:
            def decode(self):
                return json.dumps(True)

        class MyResponse:
            def __init__(self):
                self.status = http.client.OK

            def read(self):
                return MyData()

        class MyConn:
            '''Connection double that records the last request.'''
            def __init__(self):
                self.method = None
                self.url = None
                self.param = None
                self.headers = None

            def request(self, method, url, param, headers):
                self.method = method
                self.url = url
                self.param = param
                self.headers = headers

            def getresponse(self):
                return MyResponse()

        saved_conn = self.tool.conn
        try:
            self.tool.conn = MyConn()
            self.assertTrue(self.tool._try_login('user32', 'pass64'))
            self.assertEqual(self.tool.conn.method, 'POST')
            self.assertEqual(self.tool.conn.url, '/login')
            self.assertEqual(json.loads(self.tool.conn.param),
                             {"password": "pass64", "username": "user32"})
            self.assertIn('cookie', self.tool.conn.headers)
        finally:
            self.tool.conn = saved_conn

    def test_run(self):
        """run() returns 1 and prints why the request could not be sent."""
        def login_to_cmdctl():
            return True

        def cmd_loop():
            self.tool._send_message("/module_spec", None)

        self.tool.login_to_cmdctl = login_to_cmdctl
        # cmdloop() is replaced so the test never enters interactive mode
        self.tool.cmdloop = cmd_loop

        # hand the interpreter a socket that is already closed
        self.tool.conn.sock = FakeSocket()
        self.tool.conn.sock.close()

        self.assertEqual(1, self.tool.run())
        # The message has to be one of the captured entries, and the only
        # one printed so far.
        self.assertIn("Failed to send request, the connection is closed",
                      self.printed_messages)
        self.assertEqual(1, len(self.printed_messages))

        # A second run reports a busy connection -- presumably the
        # http.client.CannotSendRequest path; confirm against bindcmd.
        self.assertEqual(1, self.tool.run())
        self.assertIn("Can not send request, the connection is busy",
                      self.printed_messages)
        self.assertEqual(2, len(self.printed_messages))

    def test_apply_cfg_command_int(self):
        """'config set'/'config unset' on an integer item."""
        self.tool.location = '/'
        parse = cmdparse.BindCmdParser

        self.assertEqual((1, MultiConfigData.DEFAULT),
                         self.tool.config_data.get_value("/foo/an_int"))

        set_cmd = parse('config set identifier="foo/an_int" value="5"')
        self.tool.apply_config_cmd(set_cmd)
        self.assertEqual((5, MultiConfigData.LOCAL),
                         self.tool.config_data.get_value("/foo/an_int"))

        unset_cmd = parse('config unset identifier="foo/an_int"')
        self.tool.apply_config_cmd(unset_cmd)
        self.assertEqual((1, MultiConfigData.DEFAULT),
                         self.tool.config_data.get_value("/foo/an_int"))

        # unknown identifiers are reported as DataNotFoundError
        missing_set = parse('config set identifier="foo/bar" value="[]"')
        self.assertRaises(isc.cc.data.DataNotFoundError,
                          self.tool.apply_config_cmd, missing_set)

        missing_unset = parse('config unset identifier="foo/bar"')
        self.assertRaises(isc.cc.data.DataNotFoundError,
                          self.tool.apply_config_cmd, missing_unset)

        # a list value for an integer item is reported as DataTypeError
        wrong_type = parse('config set identifier="foo/an_int" value="[]"')
        self.assertRaises(isc.cc.data.DataTypeError,
                          self.tool.apply_config_cmd, wrong_type)

    def clt(self, full_cmd_string, item_value):
        """Apply a 'config set' on /foo/a_list and check the stored value.

        Helper for test_apply_cfg_command_list: after applying
        full_cmd_string the list must hold exactly [item_value] as a
        local change.
        """
        parsed = cmdparse.BindCmdParser(full_cmd_string)
        self.tool.apply_config_cmd(parsed)
        self.assertEqual(([item_value], MultiConfigData.LOCAL),
                         self.tool.config_data.get_value("/foo/a_list"))

    def test_apply_cfg_command_list(self):
        """'config set' on a list item with varied whitespace placement."""
        self.tool.location = '/'

        self.assertEqual(([], MultiConfigData.DEFAULT),
                         self.tool.config_data.get_value("/foo/a_list"))

        # (command line, single element expected in the list afterwards)
        spacing_cases = (
            ('config set identifier="foo/a_list" value=["a"]', "a"),
            ('config set identifier="foo/a_list" value =["b"]', "b"),
            ('config set identifier="foo/a_list" value= ["c"]', "c"),
            ('config set identifier="foo/a_list" value = ["d"]', "d"),
            ('config set identifier ="foo/a_list" value=["e"]', "e"),
            ('config set identifier= "foo/a_list" value=["f"]', "f"),
            ('config set identifier = "foo/a_list" value=["g"]', "g"),
            ('config set identifier = "foo/a_list" value = ["h"]', "h"),
            ('config set identifier = "foo/a_list" value=["i" ]', "i"),
            ('config set identifier = "foo/a_list" value=[ "j"]', "j"),
            ('config set identifier = "foo/a_list" value=[ "k" ]', "k"),
        )
        for command_line, element in spacing_cases:
            self.clt(command_line, element)

        # a plain string, or a list of integers, is a DataTypeError
        not_a_list = cmdparse.BindCmdParser(
            'config set identifier="foo/a_list" value="a"')
        self.assertRaises(isc.cc.data.DataTypeError,
                          self.tool.apply_config_cmd, not_a_list)

        wrong_element = cmdparse.BindCmdParser(
            'config set identifier="foo/a_list" value=[1]')
        self.assertRaises(isc.cc.data.DataTypeError,
                          self.tool.apply_config_cmd, wrong_element)

    def test_cmd_has_identifier_param(self):
        """_cmd_has_identifier_param() is true only for a matching command."""
        module = ModuleInfo(name="test_module")

        with_identifier = CommandInfo(name="command_with_identifier")
        with_identifier.add_param(
            ParamInfo(name=bindcmd.CFGITEM_IDENTIFIER_PARAM))
        module.add_command(with_identifier)

        without_identifier = CommandInfo(name="command_without_identifier")
        without_identifier.add_param(ParamInfo(name="some_argument"))
        module.add_command(without_identifier)

        self.tool.add_module_info(module)

        parsed = cmdparse.BindCmdParser(
            'test_module command_with_identifier')
        self.assertTrue(self.tool._cmd_has_identifier_param(parsed))

        parsed = cmdparse.BindCmdParser(
            'test_module command_without_identifier')
        self.assertFalse(self.tool._cmd_has_identifier_param(parsed))

        # an unregistered module yields False as well
        parsed = cmdparse.BindCmdParser(
            'badmodule command_without_identifier')
        self.assertFalse(self.tool._cmd_has_identifier_param(parsed))

    def test_get_identifier_startswith(self):
        """_get_identifier_startswith() lists identifiers by prefix."""
        prefix_cases = (
            ("/", ['foo/an_int', 'foo/a_list']),
            ("/foo/an", ['foo/an_int']),
            ("/bar", []),
        )
        for prefix, expected_hints in prefix_cases:
            hints = self.tool._get_identifier_startswith(prefix)
            self.assertEqual(expected_hints, hints)
class FakeBindCmdInterpreter(bindcmd.BindCmdInterpreter):
    """BindCmdInterpreter subclass whose constructor does nothing."""

    def __init__(self):
        # Deliberately skip the parent initialiser.
        return None
- class TestBindCmdInterpreter(unittest.TestCase):
- def setUp(self):
- self.old_stdout = sys.stdout
- def tearDown(self):
- sys.stdout = self.old_stdout
- def _create_invalid_csv_file(self, csvfilename):
- import csv
- csvfile = open(csvfilename, 'w')
- writer = csv.writer(csvfile)
- writer.writerow(['name1'])
- writer.writerow(['name2'])
- csvfile.close()
- def test_csv_file_dir(self):
- # Checking default value
- home_dir = pwd.getpwnam(getpass.getuser()).pw_dir
- self.assertEqual(home_dir + os.sep + '.bind10' + os.sep,
- bindcmd.BindCmdInterpreter().csv_file_dir)
- new_csv_dir = '/something/different/'
- custom_cmd = bindcmd.BindCmdInterpreter(csv_file_dir=new_csv_dir)
- self.assertEqual(new_csv_dir, custom_cmd.csv_file_dir)
- def test_get_saved_user_info(self):
- with open(os.devnull, 'w') as f:
- sys.stdout = f
- cmd = bindcmd.BindCmdInterpreter()
- users = cmd._get_saved_user_info('/notexist', 'csv_file.csv')
- self.assertEqual([], users)
- csvfilename = 'csv_file.csv'
- self._create_invalid_csv_file(csvfilename)
- users = cmd._get_saved_user_info('./', csvfilename)
- self.assertEqual([], users)
- os.remove(csvfilename)
class TestCommandLineOptions(unittest.TestCase):
    """Tests for the options registered by set_bindctl_options()."""

    def setUp(self):
        self.parser = TestOptParser()
        set_bindctl_options(self.parser)

    def test_csv_file_dir(self):
        """--csv-file-dir is unset by default and requires an argument."""
        # Without the option the attribute stays "undefined".
        parsed = self.parser.parse_args([])
        opts = parsed[0]
        self.assertEqual(None, opts.csv_file_dir)

        # Valid case: the option followed by a directory name.
        parsed = self.parser.parse_args(['--csv-file-dir', 'some_dir'])
        opts = parsed[0]
        self.assertEqual('some_dir', opts.csv_file_dir)

        # The option without its argument must trigger a parser error.
        self.assertRaises(OptsError, self.parser.parse_args,
                          ['--csv-file-dir'])
# Run every test case in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()
|