ccsession.cc 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. // Copyright (C) 2009 Internet Systems Consortium, Inc. ("ISC")
  2. //
  3. // Permission to use, copy, modify, and/or distribute this software for any
  4. // purpose with or without fee is hereby granted, provided that the above
  5. // copyright notice and this permission notice appear in all copies.
  6. //
  7. // THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH
  8. // REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
  9. // AND FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT,
  10. // INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
  11. // LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
  12. // OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
  13. // PERFORMANCE OF THIS SOFTWARE.
  14. // $Id$
  15. //
  16. // todo: generalize this and make it into a specific API for all modules
  17. // to use (i.e. connect to cc, send config and commands, get config,
  18. // react on config change announcements)
  19. //
  20. #include "config.h"
  21. #include <stdexcept>
  22. #include <stdlib.h>
  23. #include <string.h>
  24. #include <sys/time.h>
  25. #include <iostream>
  26. #include <fstream>
  27. #include <sstream>
  28. #include <cerrno>
  29. #include <boost/bind.hpp>
  30. #include <boost/foreach.hpp>
  31. #include <cc/data.h>
  32. #include <module_spec.h>
  33. #include <cc/session.h>
  34. #include <exceptions/exceptions.h>
  35. #include "ccsession.h"
  36. #include "config.h"
  37. using namespace std;
  38. using isc::data::Element;
  39. using isc::data::ElementPtr;
  40. using isc::data::ParseError;
  41. namespace isc {
  42. namespace config {
  43. /// Creates a standard config/command protocol answer message
  44. ElementPtr
  45. createAnswer()
  46. {
  47. ElementPtr answer = Element::createFromString("{\"result\": [] }");
  48. ElementPtr answer_content = answer->get("result");
  49. answer_content->add(Element::create(0));
  50. return answer;
  51. }
  52. ElementPtr
  53. createAnswer(const int rcode, const ElementPtr arg)
  54. {
  55. if (rcode != 0 && (!arg || arg->getType() != Element::string)) {
  56. isc_throw(CCSessionError, "Bad or no argument for rcode != 0");
  57. }
  58. ElementPtr answer = Element::createFromString("{\"result\": [] }");
  59. ElementPtr answer_content = answer->get("result");
  60. answer_content->add(Element::create(rcode));
  61. answer_content->add(arg);
  62. return answer;
  63. }
  64. ElementPtr
  65. createAnswer(const int rcode, const std::string& arg)
  66. {
  67. ElementPtr answer = Element::createFromString("{\"result\": [] }");
  68. ElementPtr answer_content = answer->get("result");
  69. answer_content->add(Element::create(rcode));
  70. answer_content->add(Element::create(arg));
  71. return answer;
  72. }
  73. ElementPtr
  74. parseAnswer(int &rcode, const ElementPtr msg)
  75. {
  76. if (msg &&
  77. msg->getType() == Element::map &&
  78. msg->contains("result")) {
  79. ElementPtr result = msg->get("result");
  80. if (result->getType() != Element::list) {
  81. isc_throw(CCSessionError, "Result element in answer message is not a list");
  82. } else if (result->get(0)->getType() != Element::integer) {
  83. isc_throw(CCSessionError, "First element of result is not an rcode in answer message");
  84. }
  85. rcode = result->get(0)->intValue();
  86. if (result->size() > 1) {
  87. if (rcode == 0 || result->get(1)->getType() == Element::string) {
  88. return result->get(1);
  89. } else {
  90. isc_throw(CCSessionError, "Error description in result with rcode != 0 is not a string");
  91. }
  92. } else {
  93. if (rcode == 0) {
  94. return ElementPtr();
  95. } else {
  96. isc_throw(CCSessionError, "Result with rcode != 0 does not have an error description");
  97. }
  98. }
  99. } else {
  100. isc_throw(CCSessionError, "No result part in answer message");
  101. }
  102. }
  103. ElementPtr
  104. createCommand(const std::string& command)
  105. {
  106. return createCommand(command, ElementPtr());
  107. }
  108. ElementPtr
  109. createCommand(const std::string& command, ElementPtr arg)
  110. {
  111. ElementPtr cmd = Element::createMap();
  112. ElementPtr cmd_parts = Element::createList();
  113. cmd_parts->add(Element::create(command));
  114. if (arg) {
  115. cmd_parts->add(arg);
  116. }
  117. cmd->set("command", cmd_parts);
  118. return cmd;
  119. }
  120. /// Returns "" and empty ElementPtr() if this does not
  121. /// look like a command
  122. const std::string
  123. parseCommand(ElementPtr& arg, const ElementPtr command)
  124. {
  125. if (command &&
  126. command->getType() == Element::map &&
  127. command->contains("command")) {
  128. ElementPtr cmd = command->get("command");
  129. if (cmd->getType() == Element::list &&
  130. cmd->size() > 0 &&
  131. cmd->get(0)->getType() == Element::string) {
  132. if (cmd->size() > 1) {
  133. arg = cmd->get(1);
  134. } else {
  135. arg = ElementPtr();
  136. }
  137. return cmd->get(0)->stringValue();
  138. } else {
  139. isc_throw(CCSessionError, "Command part in command message missing, empty, or not a list");
  140. }
  141. } else {
  142. isc_throw(CCSessionError, "Command Element empty or not a map with \"command\"");
  143. }
  144. }
  145. ModuleSpec
  146. ModuleCCSession::readModuleSpecification(const std::string& filename) {
  147. std::ifstream file;
  148. ModuleSpec module_spec;
  149. // this file should be declared in a @something@ directive
  150. file.open(filename.c_str());
  151. if (!file) {
  152. cout << "error opening " << filename << ": " << strerror(errno) << endl;
  153. exit(1);
  154. }
  155. try {
  156. module_spec = moduleSpecFromFile(file, true);
  157. } catch (ParseError pe) {
  158. cout << "Error parsing module specification file: " << pe.what() << endl;
  159. exit(1);
  160. } catch (ModuleSpecError dde) {
  161. cout << "Error reading module specification file: " << dde.what() << endl;
  162. exit(1);
  163. }
  164. file.close();
  165. return module_spec;
  166. }
  167. void
  168. ModuleCCSession::startCheck() {
  169. // data available on the command channel. process it in the synchronous
  170. // mode.
  171. checkCommand();
  172. // start asynchronous read again.
  173. session_.startRead(boost::bind(&ModuleCCSession::startCheck, this));
  174. }
  175. ModuleCCSession::ModuleCCSession(
  176. std::string spec_file_name,
  177. asio::io_service& io_service,
  178. isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
  179. isc::data::ElementPtr(*command_handler)(
  180. const std::string& command, const isc::data::ElementPtr args)
  181. ) throw (isc::cc::SessionError) :
  182. session_(io_service)
  183. {
  184. init(spec_file_name, config_handler, command_handler);
  185. // register callback for asynchronous read
  186. session_.startRead(boost::bind(&ModuleCCSession::startCheck, this));
  187. }
  188. ModuleCCSession::ModuleCCSession(
  189. std::string spec_file_name,
  190. isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
  191. isc::data::ElementPtr(*command_handler)(
  192. const std::string& command, const isc::data::ElementPtr args)
  193. ) throw (isc::cc::SessionError)
  194. {
  195. init(spec_file_name, config_handler, command_handler);
  196. }
  197. void
  198. ModuleCCSession::init(
  199. std::string spec_file_name,
  200. isc::data::ElementPtr(*config_handler)(isc::data::ElementPtr new_config),
  201. isc::data::ElementPtr(*command_handler)(
  202. const std::string& command, const isc::data::ElementPtr args)
  203. ) throw (isc::cc::SessionError)
  204. {
  205. module_specification_ = readModuleSpecification(spec_file_name);
  206. setModuleSpec(module_specification_);
  207. module_name_ = module_specification_.getFullSpec()->get("module_name")->stringValue();
  208. config_handler_ = config_handler;
  209. command_handler_ = command_handler;
  210. ElementPtr answer, env;
  211. session_.establish();
  212. session_.subscribe(module_name_, "*");
  213. //session_.subscribe("Boss", "*");
  214. //session_.subscribe("statistics", "*");
  215. // send the data specification
  216. ElementPtr spec_msg = createCommand("module_spec", module_specification_.getFullSpec());
  217. unsigned int seq = session_.group_sendmsg(spec_msg, "ConfigManager");
  218. session_.group_recvmsg(env, answer, false, seq);
  219. int rcode;
  220. ElementPtr err = parseAnswer(rcode, answer);
  221. if (rcode != 0) {
  222. std::cerr << "[" << module_name_ << "] Error in specification: " << answer << std::endl;
  223. }
  224. setLocalConfig(Element::createFromString("{}"));
  225. // get any stored configuration from the manager
  226. if (config_handler_) {
  227. ElementPtr cmd = Element::createFromString("{ \"command\": [\"get_config\", {\"module_name\":\"" + module_name_ + "\"} ] }");
  228. seq = session_.group_sendmsg(cmd, "ConfigManager");
  229. session_.group_recvmsg(env, answer, false, seq);
  230. ElementPtr new_config = parseAnswer(rcode, answer);
  231. if (rcode == 0) {
  232. handleConfigUpdate(new_config);
  233. } else {
  234. std::cerr << "[" << module_name_ << "] Error getting config: " << new_config << std::endl;
  235. }
  236. }
  237. }
  238. /// Validates the new config values, if they are correct,
  239. /// call the config handler with the values that have changed
  240. /// If that results in success, store the new config
  241. ElementPtr
  242. ModuleCCSession::handleConfigUpdate(ElementPtr new_config)
  243. {
  244. ElementPtr answer;
  245. ElementPtr errors = Element::createList();
  246. if (!config_handler_) {
  247. answer = createAnswer(1, module_name_ + " does not have a config handler");
  248. } else if (!module_specification_.validate_config(new_config, false, errors)) {
  249. std::stringstream ss;
  250. ss << "Error in config validation: ";
  251. BOOST_FOREACH(ElementPtr error, errors->listValue()) {
  252. ss << error->stringValue();
  253. }
  254. answer = createAnswer(2, ss.str());
  255. } else {
  256. // remove the values that have not changed
  257. isc::data::removeIdentical(new_config, getLocalConfig());
  258. // handle config update
  259. answer = config_handler_(new_config);
  260. int rcode;
  261. parseAnswer(rcode, answer);
  262. if (rcode == 0) {
  263. ElementPtr local_config = getLocalConfig();
  264. isc::data::merge(local_config, new_config);
  265. setLocalConfig(local_config);
  266. }
  267. }
  268. return answer;
  269. }
  270. int
  271. ModuleCCSession::getSocket()
  272. {
  273. return (session_.getSocket());
  274. }
  275. bool
  276. ModuleCCSession::hasQueuedMsgs()
  277. {
  278. return (session_.hasQueuedMsgs());
  279. }
  280. int
  281. ModuleCCSession::checkCommand()
  282. {
  283. ElementPtr cmd, routing, data;
  284. if (session_.group_recvmsg(routing, data, true)) {
  285. /* ignore result messages (in case we're out of sync, to prevent
  286. * pingpongs */
  287. if (data->getType() != Element::map || data->contains("result")) {
  288. return 0;
  289. }
  290. ElementPtr arg;
  291. ElementPtr answer;
  292. try {
  293. std::string cmd_str = parseCommand(arg, data);
  294. if (cmd_str == "config_update") {
  295. std::string target_module = routing->get("group")->stringValue();
  296. if (target_module == module_name_) {
  297. answer = handleConfigUpdate(arg);
  298. } else {
  299. // ok this update is not for us, if we have this module
  300. // in our remote config list, update that
  301. updateRemoteConfig(target_module, arg);
  302. // we're not supposed to answer to this, so return
  303. return 0;
  304. }
  305. } else {
  306. if (command_handler_) {
  307. answer = command_handler_(cmd_str, arg);
  308. } else {
  309. answer = createAnswer(1, "Command given but no command handler for module");
  310. }
  311. }
  312. } catch (CCSessionError re) {
  313. answer = createAnswer(1, re.what());
  314. }
  315. session_.reply(routing, answer);
  316. }
  317. return 0;
  318. }
  319. std::string
  320. ModuleCCSession::addRemoteConfig(const std::string& spec_file_name)
  321. {
  322. ModuleSpec rmod_spec = readModuleSpecification(spec_file_name);
  323. std::string module_name = rmod_spec.getFullSpec()->get("module_name")->stringValue();
  324. ConfigData rmod_config = ConfigData(rmod_spec);
  325. session_.subscribe(module_name);
  326. // Get the current configuration values for that module
  327. ElementPtr cmd = Element::createFromString("{ \"command\": [\"get_config\", {\"module_name\":\"" + module_name + "\"} ] }");
  328. ElementPtr env, answer;
  329. int rcode;
  330. unsigned int seq = session_.group_sendmsg(cmd, "ConfigManager");
  331. session_.group_recvmsg(env, answer, false, seq);
  332. ElementPtr new_config = parseAnswer(rcode, answer);
  333. if (rcode == 0) {
  334. rmod_config.setLocalConfig(new_config);
  335. } else {
  336. isc_throw(CCSessionError, "Error getting config for " + module_name + ": " + answer->str());
  337. }
  338. // all ok, add it
  339. remote_module_configs_[module_name] = rmod_config;
  340. return module_name;
  341. }
  342. void
  343. ModuleCCSession::removeRemoteConfig(const std::string& module_name)
  344. {
  345. std::map<std::string, ConfigData>::iterator it;
  346. it = remote_module_configs_.find(module_name);
  347. if (it != remote_module_configs_.end()) {
  348. remote_module_configs_.erase(it);
  349. session_.unsubscribe(module_name);
  350. }
  351. }
  352. ElementPtr
  353. ModuleCCSession::getRemoteConfigValue(const std::string& module_name, const std::string& identifier)
  354. {
  355. std::map<std::string, ConfigData>::iterator it;
  356. it = remote_module_configs_.find(module_name);
  357. if (it != remote_module_configs_.end()) {
  358. return remote_module_configs_[module_name].getValue(identifier);
  359. } else {
  360. isc_throw(CCSessionError, "Remote module " + module_name + " not found.");
  361. }
  362. }
  363. void
  364. ModuleCCSession::updateRemoteConfig(const std::string& module_name, ElementPtr new_config)
  365. {
  366. std::map<std::string, ConfigData>::iterator it;
  367. it = remote_module_configs_.find(module_name);
  368. if (it != remote_module_configs_.end()) {
  369. ElementPtr rconf = (*it).second.getLocalConfig();
  370. isc::data::merge(rconf, new_config);
  371. }
  372. }
  373. }
  374. }