msgq.py.in 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791
  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. """This code implements the msgq daemon."""
  18. import subprocess
  19. import signal
  20. import os
  21. import socket
  22. import sys
  23. import struct
  24. import errno
  25. import time
  26. import select
  27. import random
  28. import threading
  29. import isc.config.ccsession
  30. from optparse import OptionParser, OptionValueError
  31. import isc.util.process
  32. import isc.log
  33. from isc.log_messages.msgq_messages import *
  34. import isc.cc
  35. isc.util.process.rename()
  36. isc.log.init("b10-msgq", buffer=True)
  37. # Logger that is used in the actual msgq handling - startup, shutdown and the
  38. # poller thread.
  39. logger = isc.log.Logger("msgq")
  40. # A separate copy for the master/config thread when the poller thread runs.
  41. # We use a separate instance, since the logger itself doesn't have to be
  42. # thread safe.
  43. config_logger = isc.log.Logger("msgq")
  44. TRACE_START = logger.DBGLVL_START_SHUT
  45. TRACE_BASIC = logger.DBGLVL_TRACE_BASIC
  46. TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL
  47. # This is the version that gets displayed to the user.
  48. # The VERSION string consists of the module name, the module version
  49. # number, and the overall BIND 10 version number (set in configure.ac).
  50. VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
  51. # If B10_FROM_BUILD is set in the environment, we use data files
  52. # from a directory relative to that, otherwise we use the ones
  53. # installed on the system
  54. if "B10_FROM_BUILD" in os.environ:
  55. SPECFILE_PATH = os.environ["B10_FROM_BUILD"] + "/src/bin/msgq"
  56. else:
  57. PREFIX = "@prefix@"
  58. DATAROOTDIR = "@datarootdir@"
  59. SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}", DATAROOTDIR).replace("${prefix}", PREFIX)
  60. SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
  61. class MsgQReceiveError(Exception): pass
  62. class SubscriptionManager:
  63. def __init__(self, cfgmgr_ready):
  64. """
  65. Initialize the subscription manager.
  66. parameters:
  67. * cfgmgr_ready: A callable object run once the config manager
  68. subscribes. This is a hackish solution, but we can't read
  69. the configuration sooner.
  70. """
  71. self.subscriptions = {}
  72. self.__cfgmgr_ready = cfgmgr_ready
  73. self.__cfgmgr_ready_called = False
  74. def subscribe(self, group, instance, socket):
  75. """Add a subscription."""
  76. target = ( group, instance )
  77. if target in self.subscriptions:
  78. logger.debug(TRACE_BASIC, MSGQ_SUBS_APPEND_TARGET, group, instance)
  79. if socket not in self.subscriptions[target]:
  80. self.subscriptions[target].append(socket)
  81. else:
  82. logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance)
  83. self.subscriptions[target] = [ socket ]
  84. if group == "ConfigManager" and not self.__cfgmgr_ready_called:
  85. logger.debug(TRACE_BASIC, MSGQ_CFGMGR_SUBSCRIBED)
  86. self.__cfgmgr_ready_called = True
  87. self.__cfgmgr_ready()
  88. def unsubscribe(self, group, instance, socket):
  89. """Remove the socket from the one specific subscription."""
  90. target = ( group, instance )
  91. if target in self.subscriptions:
  92. if socket in self.subscriptions[target]:
  93. self.subscriptions[target].remove(socket)
  94. def unsubscribe_all(self, socket):
  95. """Remove the socket from all subscriptions."""
  96. for socklist in self.subscriptions.values():
  97. if socket in socklist:
  98. socklist.remove(socket)
  99. def find_sub(self, group, instance):
  100. """Return an array of sockets which want this specific group,
  101. instance."""
  102. target = (group, instance)
  103. if target in self.subscriptions:
  104. return self.subscriptions[target]
  105. else:
  106. return []
  107. def find(self, group, instance):
  108. """Return an array of sockets who should get something sent to
  109. this group, instance pair. This includes wildcard subscriptions."""
  110. target = (group, instance)
  111. partone = self.find_sub(group, instance)
  112. parttwo = self.find_sub(group, "*")
  113. return list(set(partone + parttwo))
  114. class MsgQ:
  115. """Message Queue class."""
  116. # did we find a better way to do this?
  117. SOCKET_FILE = os.path.join("@localstatedir@",
  118. "@PACKAGE_NAME@",
  119. "msgq_socket").replace("${prefix}",
  120. "@prefix@")
  121. def __init__(self, socket_file=None, verbose=False):
  122. """Initialize the MsgQ master.
  123. The socket_file specifies the path to the UNIX domain socket
  124. that the msgq process listens on. If it is None, the
  125. environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
  126. is not set, it will default to
  127. @localstatedir@/@PACKAGE_NAME@/msg_socket.
  128. If verbose is True, then the MsgQ reports
  129. what it is doing.
  130. """
  131. if socket_file is None:
  132. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  133. self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  134. else:
  135. self.socket_file = self.SOCKET_FILE
  136. else:
  137. self.socket_file = socket_file
  138. self.verbose = verbose
  139. self.poller = None
  140. self.kqueue = None
  141. self.runnable = False
  142. self.listen_socket = False
  143. self.sockets = {}
  144. self.connection_counter = random.random()
  145. self.hostname = socket.gethostname()
  146. self.subs = SubscriptionManager(self.cfgmgr_ready)
  147. self.lnames = {}
  148. self.sendbuffs = {}
  149. self.running = False
  150. self.__cfgmgr_ready = None
  151. self.__cfgmgr_ready_cond = threading.Condition()
  152. # A lock used when the message queue does anything more complicated.
  153. # It is mostly a safety measure, the threads doing so should be mostly
  154. # independent, and the one with config session should be read only,
  155. # but with threads, one never knows. We use threads for concurrency,
  156. # not for performance, so we use wide lock scopes to be on the safe
  157. # side.
  158. self.__lock = threading.Lock()
  159. def cfgmgr_ready(self, ready=True):
  160. """Notify that the config manager is either subscribed, or
  161. that the msgq is shutting down and it won't connect, but
  162. anybody waiting for it should stop anyway.
  163. The ready parameter signifies if the config manager is subscribed.
  164. This method can be called multiple times, but second and any
  165. following call is simply ignored. This means the "abort" version
  166. of the call can be used on any stop unconditionally, even when
  167. the config manager already connected.
  168. """
  169. with self.__cfgmgr_ready_cond:
  170. if self.__cfgmgr_ready is not None:
  171. # This is a second call to this method. In that case it does
  172. # nothing.
  173. return
  174. self.__cfgmgr_ready = ready
  175. self.__cfgmgr_ready_cond.notify_all()
  176. def wait_cfgmgr(self):
  177. """Wait for msgq to subscribe.
  178. When this returns, the config manager is either subscribed, or
  179. msgq gave up waiting for it. Success is signified by the return
  180. value.
  181. """
  182. with self.__cfgmgr_ready_cond:
  183. # Wait until it either aborts or subscribes
  184. while self.__cfgmgr_ready is None:
  185. self.__cfgmgr_ready_cond.wait()
  186. return self.__cfgmgr_ready
  187. def setup_poller(self):
  188. """Set up the poll thing. Internal function."""
  189. try:
  190. self.kqueue = select.kqueue()
  191. except AttributeError:
  192. self.poller = select.poll()
  193. def add_kqueue_socket(self, socket, write_filter=False):
  194. """Add a kqueue filter for a socket. By default the read
  195. filter is used; if write_filter is set to True, the write
  196. filter is used. We use a boolean value instead of a specific
  197. filter constant, because kqueue filter values do not seem to
  198. be defined on some systems. The use of boolean makes the
  199. interface restrictive because there are other filters, but this
  200. method is mostly only for our internal use, so it should be
  201. acceptable at least for now."""
  202. filter_type = select.KQ_FILTER_WRITE if write_filter else \
  203. select.KQ_FILTER_READ
  204. event = select.kevent(socket.fileno(), filter_type,
  205. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  206. self.kqueue.control([event], 0)
  207. def delete_kqueue_socket(self, socket, write_filter=False):
  208. """Delete a kqueue filter for socket. See add_kqueue_socket()
  209. for the semantics and notes about write_filter."""
  210. filter_type = select.KQ_FILTER_WRITE if write_filter else \
  211. select.KQ_FILTER_READ
  212. event = select.kevent(socket.fileno(), filter_type,
  213. select.KQ_EV_DELETE)
  214. self.kqueue.control([event], 0)
  215. def setup_listener(self):
  216. """Set up the listener socket. Internal function."""
  217. logger.debug(TRACE_BASIC, MSGQ_LISTENER_SETUP, self.socket_file)
  218. self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  219. if os.path.exists(self.socket_file):
  220. os.remove(self.socket_file)
  221. try:
  222. self.listen_socket.bind(self.socket_file)
  223. self.listen_socket.listen(1024)
  224. except Exception as e:
  225. # remove the file again if something goes wrong
  226. # (note this is a catch-all, but we reraise it)
  227. if os.path.exists(self.socket_file):
  228. os.remove(self.socket_file)
  229. self.listen_socket.close()
  230. logger.fatal(MSGQ_LISTENER_FAILED, self.socket_file, e)
  231. raise e
  232. if self.poller:
  233. self.poller.register(self.listen_socket, select.POLLIN)
  234. else:
  235. self.add_kqueue_socket(self.listen_socket)
  236. def setup_signalsock(self):
  237. """Create a socket pair used to signal when we want to finish.
  238. Using a socket is easy and thread/signal safe way to signal
  239. the termination.
  240. """
  241. # The __poller_sock will be the end in the poller. When it is
  242. # closed, we should shut down.
  243. (self.__poller_sock, self.__control_sock) = socket.socketpair()
  244. if self.poller:
  245. self.poller.register(self.__poller_sock, select.POLLIN)
  246. else:
  247. self.add_kqueue_socket(self.__poller_sock)
  248. def setup(self):
  249. """Configure listener socket, polling, etc.
  250. Raises a socket.error if the socket_file cannot be
  251. created.
  252. """
  253. self.setup_poller()
  254. self.setup_signalsock()
  255. self.setup_listener()
  256. logger.debug(TRACE_START, MSGQ_LISTENER_STARTED);
  257. self.runnable = True
  258. def process_accept(self):
  259. """Process an accept on the listening socket."""
  260. newsocket, ipaddr = self.listen_socket.accept()
  261. # TODO: When we have logging, we might want
  262. # to add a debug message here that a new connection
  263. # was made
  264. self.register_socket(newsocket)
  265. def register_socket(self, newsocket):
  266. """
  267. Internal function to insert a socket. Used by process_accept and some tests.
  268. """
  269. self.sockets[newsocket.fileno()] = newsocket
  270. lname = self.newlname()
  271. self.lnames[lname] = newsocket
  272. if self.poller:
  273. self.poller.register(newsocket, select.POLLIN)
  274. else:
  275. self.add_kqueue_socket(newsocket)
  276. def process_socket(self, fd):
  277. """Process a read on a socket."""
  278. if not fd in self.sockets:
  279. logger.error(MSGQ_READ_UNKNOWN_FD, fd)
  280. return
  281. sock = self.sockets[fd]
  282. self.process_packet(fd, sock)
  283. def kill_socket(self, fd, sock):
  284. """Fully close down the socket."""
  285. if self.poller:
  286. self.poller.unregister(sock)
  287. self.subs.unsubscribe_all(sock)
  288. lname = [ k for k, v in self.lnames.items() if v == sock ][0]
  289. del self.lnames[lname]
  290. sock.close()
  291. del self.sockets[fd]
  292. if fd in self.sendbuffs:
  293. del self.sendbuffs[fd]
  294. logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)
  295. def getbytes(self, fd, sock, length):
  296. """Get exactly the requested bytes, or raise an exception if
  297. EOF."""
  298. received = b''
  299. while len(received) < length:
  300. try:
  301. data = sock.recv(length - len(received))
  302. except socket.error:
  303. raise MsgQReceiveError(socket.error)
  304. if len(data) == 0:
  305. raise MsgQReceiveError("EOF")
  306. received += data
  307. return received
  308. def read_packet(self, fd, sock):
  309. """Read a correctly formatted packet. Will raise exceptions if
  310. something fails."""
  311. lengths = self.getbytes(fd, sock, 6)
  312. overall_length, routing_length = struct.unpack(">IH", lengths)
  313. if overall_length < 2:
  314. raise MsgQReceiveError("overall_length < 2")
  315. overall_length -= 2
  316. if routing_length > overall_length:
  317. raise MsgQReceiveError("routing_length > overall_length")
  318. if routing_length == 0:
  319. raise MsgQReceiveError("routing_length == 0")
  320. data_length = overall_length - routing_length
  321. # probably need to sanity check lengths here...
  322. routing = self.getbytes(fd, sock, routing_length)
  323. if data_length > 0:
  324. data = self.getbytes(fd, sock, data_length)
  325. else:
  326. data = None
  327. return (routing, data)
  328. def process_packet(self, fd, sock):
  329. """Process one packet."""
  330. try:
  331. routing, data = self.read_packet(fd, sock)
  332. except MsgQReceiveError as err:
  333. logger.error(MSGQ_RECV_ERR, fd, err)
  334. self.kill_socket(fd, sock)
  335. return
  336. try:
  337. routingmsg = isc.cc.message.from_wire(routing)
  338. except DecodeError as err:
  339. self.kill_socket(fd, sock)
  340. logger.error(MSGQ_HDR_DECODE_ERR, fd, err)
  341. return
  342. self.process_command(fd, sock, routingmsg, data)
  343. def process_command(self, fd, sock, routing, data):
  344. """Process a single command. This will split out into one of the
  345. other functions."""
  346. logger.debug(TRACE_DETAIL, MSGQ_RECV_HDR, routing)
  347. cmd = routing["type"]
  348. if cmd == 'send':
  349. self.process_command_send(sock, routing, data)
  350. elif cmd == 'subscribe':
  351. self.process_command_subscribe(sock, routing, data)
  352. elif cmd == 'unsubscribe':
  353. self.process_command_unsubscribe(sock, routing, data)
  354. elif cmd == 'getlname':
  355. self.process_command_getlname(sock, routing, data)
  356. elif cmd == 'ping':
  357. # Command for testing purposes
  358. self.process_command_ping(sock, routing, data)
  359. elif cmd == 'stop':
  360. self.stop()
  361. else:
  362. logger.error(MSGQ_INVALID_CMD, cmd)
  363. def preparemsg(self, env, msg = None):
  364. if type(env) == dict:
  365. env = isc.cc.message.to_wire(env)
  366. if type(msg) == dict:
  367. msg = isc.cc.message.to_wire(msg)
  368. length = 2 + len(env);
  369. if msg:
  370. length += len(msg)
  371. ret = struct.pack("!IH", length, len(env))
  372. ret += env
  373. if msg:
  374. ret += msg
  375. return ret
  376. def sendmsg(self, sock, env, msg = None):
  377. self.send_prepared_msg(sock, self.preparemsg(env, msg))
  378. def __send_data(self, sock, data):
  379. """
  380. Send a piece of data to the given socket.
  381. Parameters:
  382. sock: The socket to send to
  383. data: The list of bytes to send
  384. Returns:
  385. An integer or None. If an integer (which can be 0), it signals
  386. the number of bytes sent. If None, the socket appears to have
  387. been closed on the other end, and it has been killed on this
  388. side too.
  389. """
  390. try:
  391. # We set the socket nonblocking, MSG_DONTWAIT doesn't exist
  392. # on some OSes
  393. sock.setblocking(0)
  394. return sock.send(data)
  395. except socket.error as e:
  396. if e.errno in [ errno.EAGAIN,
  397. errno.EWOULDBLOCK,
  398. errno.EINTR ]:
  399. return 0
  400. elif e.errno in [ errno.EPIPE,
  401. errno.ECONNRESET,
  402. errno.ENOBUFS ]:
  403. logger.error(MSGQ_SEND_ERR, sock.fileno(),
  404. errno.errorcode[e.errno])
  405. self.kill_socket(sock.fileno(), sock)
  406. return None
  407. else:
  408. raise e
  409. finally:
  410. # And set it back again
  411. sock.setblocking(1)
  412. def send_prepared_msg(self, sock, msg):
  413. # Try to send the data, but only if there's nothing waiting
  414. fileno = sock.fileno()
  415. if fileno in self.sendbuffs:
  416. amount_sent = 0
  417. else:
  418. amount_sent = self.__send_data(sock, msg)
  419. if amount_sent is None:
  420. # Socket has been killed, drop the send
  421. return
  422. # Still something to send, add it to outgoing queue
  423. if amount_sent < len(msg):
  424. now = time.clock()
  425. # Append it to buffer (but check the data go away)
  426. if fileno in self.sendbuffs:
  427. (last_sent, buff) = self.sendbuffs[fileno]
  428. if now - last_sent > 0.1:
  429. self.kill_socket(fileno, sock)
  430. return
  431. buff += msg
  432. else:
  433. buff = msg[amount_sent:]
  434. last_sent = now
  435. if self.poller:
  436. self.poller.register(fileno, select.POLLIN |
  437. select.POLLOUT)
  438. else:
  439. self.add_kqueue_socket(sock, True)
  440. self.sendbuffs[fileno] = (last_sent, buff)
  441. def __process_write(self, fileno):
  442. # Try to send some data from the buffer
  443. (_, msg) = self.sendbuffs[fileno]
  444. sock = self.sockets[fileno]
  445. amount_sent = self.__send_data(sock, msg)
  446. if amount_sent is not None:
  447. # Keep the rest
  448. msg = msg[amount_sent:]
  449. if len(msg) == 0:
  450. # If there's no more, stop requesting for write availability
  451. if self.poller:
  452. self.poller.register(fileno, select.POLLIN)
  453. else:
  454. self.delete_kqueue_socket(sock, True)
  455. del self.sendbuffs[fileno]
  456. else:
  457. self.sendbuffs[fileno] = (time.clock(), msg)
  458. def newlname(self):
  459. """Generate a unique connection identifier for this socket.
  460. This is done by using an increasing counter and the current
  461. time."""
  462. self.connection_counter += 1
  463. return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname)
  464. def process_command_ping(self, sock, routing, data):
  465. self.sendmsg(sock, { "type" : "pong" }, data)
  466. def process_command_getlname(self, sock, routing, data):
  467. lname = [ k for k, v in self.lnames.items() if v == sock ][0]
  468. self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname })
  469. def process_command_send(self, sock, routing, data):
  470. group = routing["group"]
  471. instance = routing["instance"]
  472. to = routing["to"]
  473. if group == None or instance == None:
  474. # FIXME: Should we log them instead?
  475. return # ignore invalid packets entirely
  476. if to == "*":
  477. sockets = self.subs.find(group, instance)
  478. else:
  479. if to in self.lnames:
  480. sockets = [ self.lnames[to] ]
  481. else:
  482. sockets = []
  483. msg = self.preparemsg(routing, data)
  484. if sock in sockets:
  485. # Don't bounce to self
  486. sockets.remove(sock)
  487. if sockets:
  488. for socket in sockets:
  489. self.send_prepared_msg(socket, msg)
  490. elif routing.get("wants_reply") and "reply" not in routing:
  491. # We have no recipients. But the sender insists on a reply
  492. # (and the message isn't a reply itself). We need to send
  493. # an error to satisfy the senders hurger for response, since
  494. # nobody else will do that.
  495. # The real errors would be positive, 1 most probably. We use
  496. # negative errors for delivery errors to distinguish them a
  497. # little. We probably should have a way to provide more data
  498. # in the error message.
  499. payload = isc.config.ccsession.create_answer(-1,
  500. "No such recipient")
  501. # We create the header based on the current one. But we don't
  502. # want to mangle it for the caller, so we get a copy. A shallow
  503. # one should be enough, we modify the dict only.
  504. header = routing.copy()
  505. header["reply"] = routing["seq"]
  506. header["from"] = 'msgq' # Dummy lname not assigned to clients
  507. header["to"] = routing["from"]
  508. # We keep the seq as it is. We don't need to track the message
  509. # and provided the sender always uses a new one, it won't know
  510. # we're cheating, since we won't send it two same either.
  511. errmsg = self.preparemsg(header, payload)
  512. # Send it back.
  513. self.send_prepared_msg(sock, errmsg)
  514. def process_command_subscribe(self, sock, routing, data):
  515. group = routing["group"]
  516. instance = routing["instance"]
  517. if group == None or instance == None:
  518. return # ignore invalid packets entirely
  519. self.subs.subscribe(group, instance, sock)
  520. def process_command_unsubscribe(self, sock, routing, data):
  521. group = routing["group"]
  522. instance = routing["instance"]
  523. if group == None or instance == None:
  524. return # ignore invalid packets entirely
  525. self.subs.unsubscribe(group, instance, sock)
  526. def run(self):
  527. """Process messages. Forever. Mostly."""
  528. self.running = True
  529. if self.poller:
  530. self.run_poller()
  531. else:
  532. self.run_kqueue()
  533. def run_poller(self):
  534. while self.running:
  535. try:
  536. # Poll with a timeout so that every once in a while,
  537. # the loop checks for self.running.
  538. events = self.poller.poll()
  539. except select.error as err:
  540. if err.args[0] == errno.EINTR:
  541. events = []
  542. else:
  543. logger.fatal(MSGQ_POLL_ERR, err)
  544. break
  545. with self.__lock:
  546. for (fd, event) in events:
  547. if fd == self.listen_socket.fileno():
  548. self.process_accept()
  549. elif fd == self.__poller_sock.fileno():
  550. # If it's the signal socket, we should terminate now.
  551. self.running = False
  552. break
  553. else:
  554. if event & select.POLLOUT:
  555. self.__process_write(fd)
  556. elif event & select.POLLIN:
  557. self.process_socket(fd)
  558. else:
  559. logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
  560. def run_kqueue(self):
  561. while self.running:
  562. # Check with a timeout so that every once in a while,
  563. # the loop checks for self.running.
  564. events = self.kqueue.control(None, 10)
  565. if not events:
  566. raise RuntimeError('serve: kqueue returned no events')
  567. with self.__lock:
  568. for event in events:
  569. if event.ident == self.listen_socket.fileno():
  570. self.process_accept()
  571. elif event.ident == self.__poller_sock.fileno():
  572. # If it's the signal socket, we should terminate now.
  573. self.running = False
  574. break;
  575. else:
  576. if event.filter == select.KQ_FILTER_WRITE:
  577. self.__process_write(event.ident)
  578. if event.filter == select.KQ_FILTER_READ and \
  579. event.data > 0:
  580. self.process_socket(event.ident)
  581. elif event.flags & select.KQ_EV_EOF:
  582. self.kill_socket(event.ident,
  583. self.sockets[event.ident])
  584. def stop(self):
  585. # Signal it should terminate.
  586. self.__control_sock.close()
  587. self.__control_sock = None
  588. # Abort anything waiting on the condition, just to make sure it's not
  589. # blocked forever
  590. self.cfgmgr_ready(False)
  591. def cleanup_signalsock(self):
  592. """Close the signal sockets. We could do it directly in shutdown,
  593. but this part is reused in tests.
  594. """
  595. if self.__poller_sock:
  596. self.__poller_sock.close()
  597. self.__poller_sock = None
  598. if self.__control_sock:
  599. self.__control_sock.close()
  600. self.__control_sock = None
  601. def shutdown(self):
  602. """Stop the MsgQ master."""
  603. logger.debug(TRACE_START, MSGQ_SHUTDOWN)
  604. self.listen_socket.close()
  605. self.cleanup_signalsock()
  606. # Close all the sockets too. In real life, there should be none now,
  607. # as Msgq should be the last one. But some tests don't adhere to this
  608. # and create a new Msgq for each test, which led to huge socket leaks.
  609. # Some other threads put some other things in instead of sockets, so
  610. # we catch whatever exceptions there we can. This should be safe,
  611. # because in real operation, we will terminate now anyway, implicitly
  612. # closing anything anyway.
  613. for sock in self.sockets.values():
  614. try:
  615. sock.close()
  616. except Exception:
  617. pass
  618. if os.path.exists(self.socket_file):
  619. os.remove(self.socket_file)
  620. def config_handler(self, new_config):
  621. """The configuration handler (run in a separate thread).
  622. Not tested, currently effectively empty.
  623. """
  624. config_logger.debug(TRACE_DETAIL, MSGQ_CONFIG_DATA, new_config)
  625. with self.__lock:
  626. if not self.running:
  627. return
  628. # TODO: Any config handlig goes here.
  629. return isc.config.create_answer(0)
  630. def command_handler(self, command, args):
  631. """The command handler (run in a separate thread).
  632. Not tested, currently effectively empty.
  633. """
  634. config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args)
  635. with self.__lock:
  636. if not self.running:
  637. return
  638. # TODO: Any commands go here
  639. config_logger.error(MSGQ_COMMAND_UNKNOWN, command)
  640. return isc.config.create_answer(1, 'unknown command: ' + command)
  641. def signal_handler(msgq, signal, frame):
  642. if msgq:
  643. msgq.stop()
  644. if __name__ == "__main__":
  645. def check_port(option, opt_str, value, parser):
  646. """Function to insure that the port we are passed is actually
  647. a valid port number. Used by OptionParser() on startup."""
  648. intval = int(value)
  649. if (intval < 0) or (intval > 65535):
  650. raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
  651. parser.values.msgq_port = intval
  652. # Parse any command-line options.
  653. parser = OptionParser(version=VERSION)
  654. # TODO: Should we remove the option?
  655. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  656. help="display more about what is going on")
  657. parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
  658. type="string", default=None,
  659. help="UNIX domain socket file the msgq daemon will use")
  660. (options, args) = parser.parse_args()
  661. # Announce startup.
  662. logger.debug(TRACE_START, MSGQ_START, VERSION)
  663. msgq = MsgQ(options.msgq_socket_file, options.verbose)
  664. signal.signal(signal.SIGTERM,
  665. lambda signal, frame: signal_handler(msgq, signal, frame))
  666. try:
  667. msgq.setup()
  668. except Exception as e:
  669. logger.fatal(MSGQ_START_FAIL, e)
  670. sys.exit(1)
  671. # We run the processing in a separate thread. This is because we want to
  672. # connect to the msgq ourself. But the cc library is unfortunately blocking
  673. # in many places and waiting for the processing part to answer, it would
  674. # deadlock.
  675. poller_thread = threading.Thread(target=msgq.run)
  676. poller_thread.daemon = True
  677. try:
  678. poller_thread.start()
  679. if msgq.wait_cfgmgr():
  680. # Once we get the config manager, we can read our own config.
  681. session = isc.config.ModuleCCSession(SPECFILE_LOCATION,
  682. msgq.config_handler,
  683. msgq.command_handler,
  684. None, True,
  685. msgq.socket_file)
  686. session.start()
  687. # And we create a thread that'll just wait for commands and
  688. # handle them. We don't terminate the thread, we set it to
  689. # daemon. Once the main thread terminates, it'll just die.
  690. def run_session():
  691. while True:
  692. session.check_command(False)
  693. background_thread = threading.Thread(target=run_session)
  694. background_thread.daemon = True
  695. background_thread.start()
  696. poller_thread.join()
  697. except KeyboardInterrupt:
  698. pass
  699. msgq.shutdown()