b10-stats_test.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. # Copyright (C) 2010, 2011 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. #
  16. # Tests for the stats module
  17. #
  18. import os
  19. import sys
  20. import time
  21. import unittest
  22. import imp
  23. from isc.cc.session import Session, SessionError
  24. from isc.config.ccsession import ModuleCCSession, ModuleCCSessionError
  25. from fake_time import time, strftime, gmtime
  26. import stats
  27. stats.time = time
  28. stats.strftime = strftime
  29. stats.gmtime = gmtime
  30. from stats import SessionSubject, CCSessionListener, get_timestamp, get_datetime
  31. from fake_time import _TEST_TIME_SECS, _TEST_TIME_STRF
  32. if "B10_FROM_SOURCE" in os.environ:
  33. TEST_SPECFILE_LOCATION = os.environ["B10_FROM_SOURCE"] +\
  34. "/src/bin/stats/tests/testdata/stats_test.spec"
  35. else:
  36. TEST_SPECFILE_LOCATION = "./testdata/stats_test.spec"
  37. class TestStats(unittest.TestCase):
  38. def setUp(self):
  39. self.session = Session()
  40. self.subject = SessionSubject(session=self.session)
  41. self.listener = CCSessionListener(self.subject)
  42. self.stats_spec = self.listener.cc_session.get_module_spec().get_config_spec()
  43. self.module_name = self.listener.cc_session.get_module_spec().get_module_name()
  44. self.stats_data = {
  45. 'report_time' : get_datetime(),
  46. 'bind10.boot_time' : "1970-01-01T00:00:00Z",
  47. 'stats.timestamp' : get_timestamp(),
  48. 'stats.lname' : self.session.lname,
  49. 'auth.queries.tcp': 0,
  50. 'auth.queries.udp': 0,
  51. "stats.boot_time": get_datetime(),
  52. "stats.start_time": get_datetime(),
  53. "stats.last_update_time": get_datetime()
  54. }
  55. # check starting
  56. self.assertFalse(self.subject.running)
  57. self.subject.start()
  58. self.assertEqual(len(self.session.old_message_queue), 1)
  59. self.assertTrue(self.subject.running)
  60. self.assertEqual(len(self.session.message_queue), 0)
  61. self.assertEqual(self.module_name, 'Stats')
  62. def tearDown(self):
  63. # check closing
  64. self.subject.stop()
  65. self.assertFalse(self.subject.running)
  66. self.subject.detach(self.listener)
  67. self.listener.stop()
  68. self.session.close()
  69. def test_local_func(self):
  70. """
  71. Test for local function
  72. """
  73. # test for result_ok
  74. self.assertEqual(type(result_ok()), dict)
  75. self.assertEqual(result_ok(), {'result': [0]})
  76. self.assertEqual(result_ok(1), {'result': [1]})
  77. self.assertEqual(result_ok(0,'OK'), {'result': [0, 'OK']})
  78. self.assertEqual(result_ok(1,'Not good'), {'result': [1, 'Not good']})
  79. self.assertEqual(result_ok(None,"It's None"), {'result': [None, "It's None"]})
  80. self.assertNotEqual(result_ok(), {'RESULT': [0]})
  81. # test for get_timestamp
  82. self.assertEqual(get_timestamp(), _TEST_TIME_SECS)
  83. # test for get_datetime
  84. self.assertEqual(get_datetime(), _TEST_TIME_STRF)
  85. def test_show_command(self):
  86. """
  87. Test for show command
  88. """
  89. # test show command without arg
  90. self.session.group_sendmsg({"command": [ "show", None ]}, "Stats")
  91. self.assertEqual(len(self.session.message_queue), 1)
  92. self.subject.check()
  93. result_data = self.session.get_message("Stats", None)
  94. # ignore under 0.9 seconds
  95. self.assertEqual(result_ok(0, self.stats_data), result_data)
  96. self.assertEqual(len(self.session.message_queue), 0)
  97. # test show command with arg
  98. self.session.group_sendmsg({"command": [ "show", {"stats_item_name": "stats.lname"}]}, "Stats")
  99. self.assertEqual(len(self.subject.session.message_queue), 1)
  100. self.subject.check()
  101. result_data = self.subject.session.get_message("Stats", None)
  102. self.assertEqual(result_ok(0, {'stats.lname': self.stats_data['stats.lname']}),
  103. result_data)
  104. self.assertEqual(len(self.subject.session.message_queue), 0)
  105. # test show command with arg which has wrong name
  106. self.session.group_sendmsg({"command": [ "show", {"stats_item_name": "stats.dummy"}]}, "Stats")
  107. self.assertEqual(len(self.session.message_queue), 1)
  108. self.subject.check()
  109. result_data = self.session.get_message("Stats", None)
  110. # ignore under 0.9 seconds
  111. self.assertEqual(result_ok(0, self.stats_data), result_data)
  112. self.assertEqual(len(self.session.message_queue), 0)
  113. def test_set_command(self):
  114. """
  115. Test for set command
  116. """
  117. # test set command
  118. self.stats_data['auth.queries.udp'] = 54321
  119. self.assertEqual(self.stats_data['auth.queries.udp'], 54321)
  120. self.assertEqual(self.stats_data['auth.queries.tcp'], 0)
  121. self.session.group_sendmsg({ "command": [
  122. "set", {
  123. 'stats_data': {'auth.queries.udp': 54321 }
  124. } ] },
  125. "Stats")
  126. self.assertEqual(len(self.session.message_queue), 1)
  127. self.subject.check()
  128. self.assertEqual(result_ok(),
  129. self.session.get_message("Stats", None))
  130. self.assertEqual(len(self.session.message_queue), 0)
  131. # test show command
  132. self.session.group_sendmsg({"command": [ "show", None ]}, "Stats")
  133. self.assertEqual(len(self.session.message_queue), 1)
  134. self.subject.check()
  135. result_data = self.session.get_message("Stats", None)
  136. self.assertEqual(result_ok(0, self.stats_data), result_data)
  137. self.assertEqual(len(self.session.message_queue), 0)
  138. # test set command 2
  139. self.stats_data['auth.queries.udp'] = 0
  140. self.assertEqual(self.stats_data['auth.queries.udp'], 0)
  141. self.assertEqual(self.stats_data['auth.queries.tcp'], 0)
  142. self.session.group_sendmsg({ "command": [ "set", {'stats_data': {'auth.queries.udp': 0}} ]},
  143. "Stats")
  144. self.assertEqual(len(self.session.message_queue), 1)
  145. self.subject.check()
  146. self.assertEqual(result_ok(),
  147. self.session.get_message("Stats", None))
  148. self.assertEqual(len(self.session.message_queue), 0)
  149. # test show command 2
  150. self.session.group_sendmsg({"command": [ "show", None ]}, "Stats")
  151. self.assertEqual(len(self.session.message_queue), 1)
  152. self.subject.check()
  153. result_data = self.session.get_message("Stats", None)
  154. self.assertEqual(result_ok(0, self.stats_data), result_data)
  155. self.assertEqual(len(self.session.message_queue), 0)
  156. # test set command 3
  157. self.stats_data['auth.queries.tcp'] = 54322
  158. self.assertEqual(self.stats_data['auth.queries.udp'], 0)
  159. self.assertEqual(self.stats_data['auth.queries.tcp'], 54322)
  160. self.session.group_sendmsg({ "command": [
  161. "set", {
  162. 'stats_data': {'auth.queries.tcp': 54322 }
  163. } ] },
  164. "Stats")
  165. self.assertEqual(len(self.session.message_queue), 1)
  166. self.subject.check()
  167. self.assertEqual(result_ok(),
  168. self.session.get_message("Stats", None))
  169. self.assertEqual(len(self.session.message_queue), 0)
  170. # test show command 3
  171. self.session.group_sendmsg({"command": [ "show", None ]}, "Stats")
  172. self.assertEqual(len(self.session.message_queue), 1)
  173. self.subject.check()
  174. result_data = self.session.get_message("Stats", None)
  175. self.assertEqual(result_ok(0, self.stats_data), result_data)
  176. self.assertEqual(len(self.session.message_queue), 0)
  177. def test_remove_command(self):
  178. """
  179. Test for remove command
  180. """
  181. self.session.group_sendmsg({"command":
  182. [ "remove", {"stats_item_name": 'bind10.boot_time' }]},
  183. "Stats")
  184. self.assertEqual(len(self.session.message_queue), 1)
  185. self.subject.check()
  186. self.assertEqual(result_ok(),
  187. self.session.get_message("Stats", None))
  188. self.assertEqual(len(self.session.message_queue), 0)
  189. self.assertEqual(self.stats_data.pop('bind10.boot_time'), "1970-01-01T00:00:00Z")
  190. self.assertFalse('bind10.boot_time' in self.stats_data)
  191. # test show command with arg
  192. self.session.group_sendmsg({"command":
  193. [ "show", {"stats_item_name": 'bind10.boot_time'}]},
  194. "Stats")
  195. self.assertEqual(len(self.session.message_queue), 1)
  196. self.subject.check()
  197. result_data = self.session.get_message("Stats", None)
  198. self.assertFalse('bind10.boot_time' in result_data['result'][1])
  199. self.assertEqual(result_ok(0, self.stats_data), result_data)
  200. self.assertEqual(len(self.session.message_queue), 0)
  201. def test_reset_command(self):
  202. """
  203. Test for reset command
  204. """
  205. self.session.group_sendmsg({"command": [ "reset" ] }, "Stats")
  206. self.assertEqual(len(self.session.message_queue), 1)
  207. self.subject.check()
  208. self.assertEqual(result_ok(),
  209. self.session.get_message("Stats", None))
  210. self.assertEqual(len(self.session.message_queue), 0)
  211. # test show command
  212. self.session.group_sendmsg({"command": [ "show" ]}, "Stats")
  213. self.assertEqual(len(self.session.message_queue), 1)
  214. self.subject.check()
  215. result_data = self.session.get_message("Stats", None)
  216. self.assertEqual(result_ok(0, self.stats_data), result_data)
  217. self.assertEqual(len(self.session.message_queue), 0)
  218. def test_status_command(self):
  219. """
  220. Test for status command
  221. """
  222. self.session.group_sendmsg({"command": [ "status" ] }, "Stats")
  223. self.assertEqual(len(self.session.message_queue), 1)
  224. self.subject.check()
  225. self.assertEqual(result_ok(0, "I'm alive."),
  226. self.session.get_message("Stats", None))
  227. self.assertEqual(len(self.session.message_queue), 0)
  228. def test_unknown_command(self):
  229. """
  230. Test for unknown command
  231. """
  232. self.session.group_sendmsg({"command": [ "hoge", None ]}, "Stats")
  233. self.assertEqual(len(self.session.message_queue), 1)
  234. self.subject.check()
  235. self.assertEqual(result_ok(1, "Unknown command: 'hoge'"),
  236. self.session.get_message("Stats", None))
  237. self.assertEqual(len(self.session.message_queue), 0)
  238. def test_shutdown_command(self):
  239. """
  240. Test for shutdown command
  241. """
  242. self.session.group_sendmsg({"command": [ "shutdown", None ]}, "Stats")
  243. self.assertEqual(len(self.session.message_queue), 1)
  244. self.assertTrue(self.subject.running)
  245. self.subject.check()
  246. self.assertFalse(self.subject.running)
  247. self.assertEqual(result_ok(),
  248. self.session.get_message("Stats", None))
  249. self.assertEqual(len(self.session.message_queue), 0)
  250. def test_some_commands(self):
  251. """
  252. Test for some commands in a row
  253. """
  254. # test set command
  255. self.stats_data['bind10.boot_time'] = '2010-08-02T14:47:56Z'
  256. self.assertEqual(self.stats_data['bind10.boot_time'], '2010-08-02T14:47:56Z')
  257. self.session.group_sendmsg({ "command": [
  258. "set", {
  259. 'stats_data': {'bind10.boot_time': '2010-08-02T14:47:56Z' }
  260. }]},
  261. "Stats")
  262. self.assertEqual(len(self.session.message_queue), 1)
  263. self.subject.check()
  264. self.assertEqual(result_ok(),
  265. self.session.get_message("Stats", None))
  266. self.assertEqual(len(self.session.message_queue), 0)
  267. # check its value
  268. self.session.group_sendmsg({ "command": [
  269. "show", { 'stats_item_name': 'bind10.boot_time' }
  270. ] }, "Stats")
  271. self.assertEqual(len(self.session.message_queue), 1)
  272. self.subject.check()
  273. result_data = self.session.get_message("Stats", None)
  274. self.assertEqual(result_ok(0, {'bind10.boot_time': '2010-08-02T14:47:56Z'}),
  275. result_data)
  276. self.assertEqual(result_ok(0, {'bind10.boot_time': self.stats_data['bind10.boot_time']}),
  277. result_data)
  278. self.assertEqual(len(self.session.message_queue), 0)
  279. # test set command 2nd
  280. self.stats_data['auth.queries.udp'] = 98765
  281. self.assertEqual(self.stats_data['auth.queries.udp'], 98765)
  282. self.session.group_sendmsg({ "command": [
  283. "set", { 'stats_data': {
  284. 'auth.queries.udp':
  285. self.stats_data['auth.queries.udp']
  286. } }
  287. ] }, "Stats")
  288. self.assertEqual(len(self.session.message_queue), 1)
  289. self.subject.check()
  290. self.assertEqual(result_ok(),
  291. self.session.get_message("Stats", None))
  292. self.assertEqual(len(self.session.message_queue), 0)
  293. # check its value
  294. self.session.group_sendmsg({"command": [
  295. "show", {'stats_item_name': 'auth.queries.udp'}
  296. ] }, "Stats")
  297. self.assertEqual(len(self.session.message_queue), 1)
  298. self.subject.check()
  299. result_data = self.session.get_message("Stats", None)
  300. self.assertEqual(result_ok(0, {'auth.queries.udp': 98765}),
  301. result_data)
  302. self.assertEqual(result_ok(0, {'auth.queries.udp': self.stats_data['auth.queries.udp']}),
  303. result_data)
  304. self.assertEqual(len(self.session.message_queue), 0)
  305. # test set command 3
  306. self.stats_data['auth.queries.tcp'] = 4321
  307. self.session.group_sendmsg({"command": [
  308. "set",
  309. {'stats_data': {'auth.queries.tcp': 4321 }} ]},
  310. "Stats")
  311. self.assertEqual(len(self.session.message_queue), 1)
  312. self.subject.check()
  313. self.assertEqual(result_ok(),
  314. self.session.get_message("Stats", None))
  315. self.assertEqual(len(self.session.message_queue), 0)
  316. # check value
  317. self.session.group_sendmsg({"command": [ "show", {'stats_item_name': 'auth.queries.tcp'} ]},
  318. "Stats")
  319. self.assertEqual(len(self.session.message_queue), 1)
  320. self.subject.check()
  321. result_data = self.session.get_message("Stats", None)
  322. self.assertEqual(result_ok(0, {'auth.queries.tcp': 4321}),
  323. result_data)
  324. self.assertEqual(result_ok(0, {'auth.queries.tcp': self.stats_data['auth.queries.tcp']}),
  325. result_data)
  326. self.assertEqual(len(self.session.message_queue), 0)
  327. self.session.group_sendmsg({"command": [ "show", {'stats_item_name': 'auth.queries.udp'} ]},
  328. "Stats")
  329. self.assertEqual(len(self.session.message_queue), 1)
  330. self.subject.check()
  331. result_data = self.session.get_message("Stats", None)
  332. self.assertEqual(result_ok(0, {'auth.queries.udp': 98765}),
  333. result_data)
  334. self.assertEqual(result_ok(0, {'auth.queries.udp': self.stats_data['auth.queries.udp']}),
  335. result_data)
  336. self.assertEqual(len(self.session.message_queue), 0)
  337. # test set command 4
  338. self.stats_data['auth.queries.tcp'] = 67890
  339. self.session.group_sendmsg({"command": [
  340. "set", {'stats_data': {'auth.queries.tcp': 67890 }} ]},
  341. "Stats")
  342. self.assertEqual(len(self.session.message_queue), 1)
  343. self.subject.check()
  344. self.assertEqual(result_ok(),
  345. self.session.get_message("Stats", None))
  346. self.assertEqual(len(self.session.message_queue), 0)
  347. # test show command for all values
  348. self.session.group_sendmsg({"command": [ "show", None ]}, "Stats")
  349. self.assertEqual(len(self.session.message_queue), 1)
  350. self.subject.check()
  351. result_data = self.session.get_message("Stats", None)
  352. self.assertEqual(result_ok(0, self.stats_data), result_data)
  353. self.assertEqual(len(self.session.message_queue), 0)
  354. def test_some_commands2(self):
  355. """
  356. Test for some commands in a row using list-type value
  357. """
  358. self.stats_data['listtype'] = [1, 2, 3]
  359. self.assertEqual(self.stats_data['listtype'], [1, 2, 3])
  360. self.session.group_sendmsg({ "command": [
  361. "set", {'stats_data': {'listtype': [1, 2, 3] }}
  362. ]}, "Stats")
  363. self.assertEqual(len(self.session.message_queue), 1)
  364. self.subject.check()
  365. self.assertEqual(result_ok(),
  366. self.session.get_message("Stats", None))
  367. self.assertEqual(len(self.session.message_queue), 0)
  368. # check its value
  369. self.session.group_sendmsg({ "command": [
  370. "show", { 'stats_item_name': 'listtype'}
  371. ]}, "Stats")
  372. self.assertEqual(len(self.session.message_queue), 1)
  373. self.subject.check()
  374. result_data = self.session.get_message("Stats", None)
  375. self.assertEqual(result_ok(0, {'listtype': [1, 2, 3]}),
  376. result_data)
  377. self.assertEqual(result_ok(0, {'listtype': self.stats_data['listtype']}),
  378. result_data)
  379. self.assertEqual(len(self.session.message_queue), 0)
  380. # test set list-type value
  381. self.assertEqual(self.stats_data['listtype'], [1, 2, 3])
  382. self.session.group_sendmsg({"command": [
  383. "set", {'stats_data': {'listtype': [3, 2, 1, 0] }}
  384. ]}, "Stats")
  385. self.assertEqual(len(self.session.message_queue), 1)
  386. self.subject.check()
  387. self.assertEqual(result_ok(),
  388. self.session.get_message("Stats", None))
  389. self.assertEqual(len(self.session.message_queue), 0)
  390. # check its value
  391. self.session.group_sendmsg({ "command": [
  392. "show", { 'stats_item_name': 'listtype' }
  393. ] }, "Stats")
  394. self.assertEqual(len(self.session.message_queue), 1)
  395. self.subject.check()
  396. result_data = self.session.get_message("Stats", None)
  397. self.assertEqual(result_ok(0, {'listtype': [3, 2, 1, 0]}),
  398. result_data)
  399. self.assertEqual(len(self.session.message_queue), 0)
  400. def test_some_commands3(self):
  401. """
  402. Test for some commands in a row using dictionary-type value
  403. """
  404. self.stats_data['dicttype'] = {"a": 1, "b": 2, "c": 3}
  405. self.assertEqual(self.stats_data['dicttype'], {"a": 1, "b": 2, "c": 3})
  406. self.session.group_sendmsg({ "command": [
  407. "set", {
  408. 'stats_data': {'dicttype': {"a": 1, "b": 2, "c": 3} }
  409. }]},
  410. "Stats")
  411. self.assertEqual(len(self.session.message_queue), 1)
  412. self.subject.check()
  413. self.assertEqual(result_ok(),
  414. self.session.get_message("Stats", None))
  415. self.assertEqual(len(self.session.message_queue), 0)
  416. # check its value
  417. self.session.group_sendmsg({ "command": [ "show", { 'stats_item_name': 'dicttype' } ]}, "Stats")
  418. self.assertEqual(len(self.session.message_queue), 1)
  419. self.subject.check()
  420. result_data = self.session.get_message("Stats", None)
  421. self.assertEqual(result_ok(0, {'dicttype': {"a": 1, "b": 2, "c": 3}}),
  422. result_data)
  423. self.assertEqual(result_ok(0, {'dicttype': self.stats_data['dicttype']}),
  424. result_data)
  425. self.assertEqual(len(self.session.message_queue), 0)
  426. # test set list-type value
  427. self.assertEqual(self.stats_data['dicttype'], {"a": 1, "b": 2, "c": 3})
  428. self.session.group_sendmsg({"command": [
  429. "set", {'stats_data': {'dicttype': {"a": 3, "b": 2, "c": 1, "d": 0} }} ]},
  430. "Stats")
  431. self.assertEqual(len(self.session.message_queue), 1)
  432. self.subject.check()
  433. self.assertEqual(result_ok(),
  434. self.session.get_message("Stats", None))
  435. self.assertEqual(len(self.session.message_queue), 0)
  436. # check its value
  437. self.session.group_sendmsg({ "command": [ "show", { 'stats_item_name': 'dicttype' }]}, "Stats")
  438. self.assertEqual(len(self.session.message_queue), 1)
  439. self.subject.check()
  440. result_data = self.session.get_message("Stats", None)
  441. self.assertEqual(result_ok(0, {'dicttype': {"a": 3, "b": 2, "c": 1, "d": 0} }),
  442. result_data)
  443. self.assertEqual(len(self.session.message_queue), 0)
  444. def test_config_update(self):
  445. """
  446. Test for config update
  447. """
  448. # test show command without arg
  449. self.session.group_sendmsg({"command": [ "config_update", {"x-version":999} ]}, "Stats")
  450. self.assertEqual(len(self.session.message_queue), 1)
  451. self.subject.check()
  452. self.assertEqual(result_ok(),
  453. self.session.get_message("Stats", None))
  454. def test_for_boss(self):
  455. last_queue = self.session.old_message_queue.pop()
  456. self.assertEqual(
  457. last_queue.msg, {'command': ['getstats']})
  458. self.assertEqual(
  459. last_queue.env['group'], 'Boss')
  460. class TestStats2(unittest.TestCase):
  461. def setUp(self):
  462. self.session = Session()
  463. self.subject = SessionSubject(session=self.session)
  464. self.listener = CCSessionListener(self.subject)
  465. self.module_name = self.listener.cc_session.get_module_spec().get_module_name()
  466. # check starting
  467. self.assertFalse(self.subject.running)
  468. self.subject.start()
  469. self.assertTrue(self.subject.running)
  470. self.assertEqual(len(self.session.message_queue), 0)
  471. self.assertEqual(self.module_name, 'Stats')
  472. def tearDown(self):
  473. # check closing
  474. self.subject.stop()
  475. self.assertFalse(self.subject.running)
  476. self.subject.detach(self.listener)
  477. self.listener.stop()
  478. def test_specfile(self):
  479. """
  480. Test for specfile
  481. """
  482. if "B10_FROM_SOURCE" in os.environ:
  483. self.assertEqual(stats.SPECFILE_LOCATION,
  484. os.environ["B10_FROM_SOURCE"] + os.sep + \
  485. "src" + os.sep + "bin" + os.sep + "stats" + \
  486. os.sep + "stats.spec")
  487. self.assertEqual(stats.SCHEMA_SPECFILE_LOCATION,
  488. os.environ["B10_FROM_SOURCE"] + os.sep + \
  489. "src" + os.sep + "bin" + os.sep + "stats" + \
  490. os.sep + "stats-schema.spec")
  491. imp.reload(stats)
  492. # change path of SPECFILE_LOCATION
  493. stats.SPECFILE_LOCATION = TEST_SPECFILE_LOCATION
  494. stats.SCHEMA_SPECFILE_LOCATION = TEST_SPECFILE_LOCATION
  495. self.assertEqual(stats.SPECFILE_LOCATION, TEST_SPECFILE_LOCATION)
  496. self.subject = stats.SessionSubject(session=self.session)
  497. self.session = self.subject.session
  498. self.listener = stats.CCSessionListener(self.subject)
  499. self.assertEqual(self.listener.stats_spec, [])
  500. self.assertEqual(self.listener.stats_data, {})
  501. self.assertEqual(self.listener.commands_spec, [
  502. {
  503. "command_name": "status",
  504. "command_description": "identify whether stats module is alive or not",
  505. "command_args": []
  506. },
  507. {
  508. "command_name": "the_dummy",
  509. "command_description": "this is for testing",
  510. "command_args": []
  511. }])
  512. def test_func_initialize_data(self):
  513. """
  514. Test for initialize_data function
  515. """
  516. # prepare for sample data set
  517. stats_spec = [
  518. {
  519. "item_name": "none_sample",
  520. "item_type": "null",
  521. "item_default": "None"
  522. },
  523. {
  524. "item_name": "boolean_sample",
  525. "item_type": "boolean",
  526. "item_default": True
  527. },
  528. {
  529. "item_name": "string_sample",
  530. "item_type": "string",
  531. "item_default": "A something"
  532. },
  533. {
  534. "item_name": "int_sample",
  535. "item_type": "integer",
  536. "item_default": 9999999
  537. },
  538. {
  539. "item_name": "real_sample",
  540. "item_type": "real",
  541. "item_default": 0.0009
  542. },
  543. {
  544. "item_name": "list_sample",
  545. "item_type": "list",
  546. "item_default": [0, 1, 2, 3, 4],
  547. "list_item_spec": []
  548. },
  549. {
  550. "item_name": "map_sample",
  551. "item_type": "map",
  552. "item_default": {'name':'value'},
  553. "map_item_spec": []
  554. },
  555. {
  556. "item_name": "other_sample",
  557. "item_type": "__unknown__",
  558. "item_default": "__unknown__"
  559. }
  560. ]
  561. # data for comparison
  562. stats_data = {
  563. 'none_sample': None,
  564. 'boolean_sample': True,
  565. 'string_sample': 'A something',
  566. 'int_sample': 9999999,
  567. 'real_sample': 0.0009,
  568. 'list_sample': [0, 1, 2, 3, 4],
  569. 'map_sample': {'name':'value'},
  570. 'other_sample': '__unknown__'
  571. }
  572. self.assertEqual(self.listener.initialize_data(stats_spec), stats_data)
  573. def test_func_main(self):
  574. # explicitly make failed
  575. self.session.close()
  576. stats.main(session=self.session)
  577. def test_osenv(self):
  578. """
  579. test for not having environ "B10_FROM_SOURCE"
  580. """
  581. if "B10_FROM_SOURCE" in os.environ:
  582. path = os.environ["B10_FROM_SOURCE"]
  583. os.environ.pop("B10_FROM_SOURCE")
  584. imp.reload(stats)
  585. os.environ["B10_FROM_SOURCE"] = path
  586. imp.reload(stats)
  587. def result_ok(*args):
  588. if args:
  589. return { 'result': list(args) }
  590. else:
  591. return { 'result': [ 0 ] }
  592. if __name__ == "__main__":
  593. unittest.main()