msgq.py.in 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. """This code implements the msgq daemon."""
  18. import subprocess
  19. import signal
  20. import os
  21. import socket
  22. import sys
  23. import struct
  24. import errno
  25. import time
  26. import select
  27. import random
  28. import threading
  29. from optparse import OptionParser, OptionValueError
  30. import isc.util.process
  31. import isc.log
  32. from isc.log_messages.msgq_messages import *
  33. import isc.cc
# Rename this process via isc.util.process before doing anything else
# (presumably so it shows up under its own name in process listings --
# TODO confirm against isc.util.process).
isc.util.process.rename()

# Logger that is used in the actual msgq handling - startup, shutdown and the
# poller thread.
logger = isc.log.Logger("msgq")

# A separate copy for the master/config thread when the poller thread runs.
# We use a separate instance, since the logger itself doesn't have to be
# thread safe.
config_logger = isc.log.Logger("msgq")

# Debug levels used as the first argument of logger.debug() in this file.
TRACE_START = logger.DBGLVL_START_SHUT
TRACE_BASIC = logger.DBGLVL_TRACE_BASIC
TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL

# This is the version that gets displayed to the user.
# The VERSION string consists of the module name, the module version
# number, and the overall BIND 10 version number (set in configure.ac).
VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
  49. class MsgQReceiveError(Exception): pass
  50. class SubscriptionManager:
  51. def __init__(self, cfgmgr_ready):
  52. """
  53. Initialize the subscription manager.
  54. parameters:
  55. * cfgmgr_ready: A callable object run once the config manager
  56. subscribes. This is a hackish solution, but we can't read
  57. the configuration sooner.
  58. """
  59. self.subscriptions = {}
  60. self.__cfgmgr_ready = cfgmgr_ready
  61. self.__cfgmgr_ready_called = False
  62. def subscribe(self, group, instance, socket):
  63. """Add a subscription."""
  64. target = ( group, instance )
  65. if target in self.subscriptions:
  66. logger.debug(TRACE_BASIC, MSGQ_SUBS_APPEND_TARGET, group, instance)
  67. if socket not in self.subscriptions[target]:
  68. self.subscriptions[target].append(socket)
  69. else:
  70. logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance)
  71. self.subscriptions[target] = [ socket ]
  72. if group == "ConfigManager" and not self.__cfgmgr_ready_called:
  73. logger.debug(TRACE_BASIC, MSGQ_CFGMGR_SUBSCRIBED)
  74. self.__cfgmgr_ready_called = True
  75. self.__cfgmgr_ready()
  76. def unsubscribe(self, group, instance, socket):
  77. """Remove the socket from the one specific subscription."""
  78. target = ( group, instance )
  79. if target in self.subscriptions:
  80. if socket in self.subscriptions[target]:
  81. self.subscriptions[target].remove(socket)
  82. def unsubscribe_all(self, socket):
  83. """Remove the socket from all subscriptions."""
  84. for socklist in self.subscriptions.values():
  85. if socket in socklist:
  86. socklist.remove(socket)
  87. def find_sub(self, group, instance):
  88. """Return an array of sockets which want this specific group,
  89. instance."""
  90. target = (group, instance)
  91. if target in self.subscriptions:
  92. return self.subscriptions[target]
  93. else:
  94. return []
  95. def find(self, group, instance):
  96. """Return an array of sockets who should get something sent to
  97. this group, instance pair. This includes wildcard subscriptions."""
  98. target = (group, instance)
  99. partone = self.find_sub(group, instance)
  100. parttwo = self.find_sub(group, "*")
  101. return list(set(partone + parttwo))
  102. class MsgQ:
  103. """Message Queue class."""
  104. # did we find a better way to do this?
  105. SOCKET_FILE = os.path.join("@localstatedir@",
  106. "@PACKAGE_NAME@",
  107. "msgq_socket").replace("${prefix}",
  108. "@prefix@")
  109. def __init__(self, socket_file=None, verbose=False):
  110. """Initialize the MsgQ master.
  111. The socket_file specifies the path to the UNIX domain socket
  112. that the msgq process listens on. If it is None, the
  113. environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
  114. is not set, it will default to
  115. @localstatedir@/@PACKAGE_NAME@/msg_socket.
  116. If verbose is True, then the MsgQ reports
  117. what it is doing.
  118. """
  119. if socket_file is None:
  120. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  121. self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  122. else:
  123. self.socket_file = self.SOCKET_FILE
  124. else:
  125. self.socket_file = socket_file
  126. self.verbose = verbose
  127. self.poller = None
  128. self.kqueue = None
  129. self.runnable = False
  130. self.listen_socket = False
  131. self.sockets = {}
  132. self.connection_counter = random.random()
  133. self.hostname = socket.gethostname()
  134. self.subs = SubscriptionManager(self.cfgmgr_ready)
  135. self.lnames = {}
  136. self.sendbuffs = {}
  137. self.running = False
  138. def cfgmgr_ready(self):
  139. pass
  140. def setup_poller(self):
  141. """Set up the poll thing. Internal function."""
  142. try:
  143. self.kqueue = select.kqueue()
  144. except AttributeError:
  145. self.poller = select.poll()
  146. def add_kqueue_socket(self, socket, write_filter=False):
  147. """Add a kquque filter for a socket. By default the read
  148. filter is used; if write_filter is set to True, the write
  149. filter is used. We use a boolean value instead of a specific
  150. filter constant, because kqueue filter values do not seem to
  151. be defined on some systems. The use of boolean makes the
  152. interface restrictive because there are other filters, but this
  153. method is mostly only for our internal use, so it should be
  154. acceptable at least for now."""
  155. filter_type = select.KQ_FILTER_WRITE if write_filter else \
  156. select.KQ_FILTER_READ
  157. event = select.kevent(socket.fileno(), filter_type,
  158. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  159. self.kqueue.control([event], 0)
  160. def delete_kqueue_socket(self, socket, write_filter=False):
  161. """Delete a kqueue filter for socket. See add_kqueue_socket()
  162. for the semantics and notes about write_filter."""
  163. filter_type = select.KQ_FILTER_WRITE if write_filter else \
  164. select.KQ_FILTER_READ
  165. event = select.kevent(socket.fileno(), filter_type,
  166. select.KQ_EV_DELETE)
  167. self.kqueue.control([event], 0)
  168. def setup_listener(self):
  169. """Set up the listener socket. Internal function."""
  170. logger.debug(TRACE_BASIC, MSGQ_LISTENER_SETUP, self.socket_file)
  171. self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  172. if os.path.exists(self.socket_file):
  173. os.remove(self.socket_file)
  174. try:
  175. self.listen_socket.bind(self.socket_file)
  176. self.listen_socket.listen(1024)
  177. except Exception as e:
  178. # remove the file again if something goes wrong
  179. # (note this is a catch-all, but we reraise it)
  180. if os.path.exists(self.socket_file):
  181. os.remove(self.socket_file)
  182. self.listen_socket.close()
  183. logger.fatal(MSGQ_LISTENER_FAILED, self.socket_file, e)
  184. raise e
  185. if self.poller:
  186. self.poller.register(self.listen_socket, select.POLLIN)
  187. else:
  188. self.add_kqueue_socket(self.listen_socket)
  189. def setup_signalsock(self):
  190. """Create a socket pair used to signal when we want to finish.
  191. Using a socket is easy and thread/signal safe way to signal
  192. the termination.
  193. """
  194. # The __poller_sock will be the end in the poller. When it is
  195. # closed, we should shut down.
  196. (self.__poller_sock, self.__control_sock) = socket.socketpair()
  197. if self.poller:
  198. self.poller.register(self.__poller_sock, select.POLLIN)
  199. else:
  200. self.add_kqueue_socket(self.__poller_sock)
  201. def setup(self):
  202. """Configure listener socket, polling, etc.
  203. Raises a socket.error if the socket_file cannot be
  204. created.
  205. """
  206. self.setup_poller()
  207. self.setup_signalsock()
  208. self.setup_listener()
  209. logger.debug(TRACE_START, MSGQ_LISTENER_STARTED);
  210. self.runnable = True
  211. def process_accept(self):
  212. """Process an accept on the listening socket."""
  213. newsocket, ipaddr = self.listen_socket.accept()
  214. # TODO: When we have logging, we might want
  215. # to add a debug message here that a new connection
  216. # was made
  217. self.register_socket(newsocket)
  218. def register_socket(self, newsocket):
  219. """
  220. Internal function to insert a socket. Used by process_accept and some tests.
  221. """
  222. self.sockets[newsocket.fileno()] = newsocket
  223. lname = self.newlname()
  224. self.lnames[lname] = newsocket
  225. if self.poller:
  226. self.poller.register(newsocket, select.POLLIN)
  227. else:
  228. self.add_kqueue_socket(newsocket)
  229. def process_socket(self, fd):
  230. """Process a read on a socket."""
  231. if not fd in self.sockets:
  232. logger.error(MSGQ_READ_UNKNOWN_FD, fd)
  233. return
  234. sock = self.sockets[fd]
  235. # sys.stderr.write("[b10-msgq] Got read on fd %d\n" %fd)
  236. self.process_packet(fd, sock)
  237. def kill_socket(self, fd, sock):
  238. """Fully close down the socket."""
  239. if self.poller:
  240. self.poller.unregister(sock)
  241. self.subs.unsubscribe_all(sock)
  242. lname = [ k for k, v in self.lnames.items() if v == sock ][0]
  243. del self.lnames[lname]
  244. sock.close()
  245. del self.sockets[fd]
  246. if fd in self.sendbuffs:
  247. del self.sendbuffs[fd]
  248. logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)
  249. def getbytes(self, fd, sock, length):
  250. """Get exactly the requested bytes, or raise an exception if
  251. EOF."""
  252. received = b''
  253. while len(received) < length:
  254. try:
  255. data = sock.recv(length - len(received))
  256. except socket.error:
  257. raise MsgQReceiveError(socket.error)
  258. if len(data) == 0:
  259. raise MsgQReceiveError("EOF")
  260. received += data
  261. return received
  262. def read_packet(self, fd, sock):
  263. """Read a correctly formatted packet. Will raise exceptions if
  264. something fails."""
  265. lengths = self.getbytes(fd, sock, 6)
  266. overall_length, routing_length = struct.unpack(">IH", lengths)
  267. if overall_length < 2:
  268. raise MsgQReceiveError("overall_length < 2")
  269. overall_length -= 2
  270. if routing_length > overall_length:
  271. raise MsgQReceiveError("routing_length > overall_length")
  272. if routing_length == 0:
  273. raise MsgQReceiveError("routing_length == 0")
  274. data_length = overall_length - routing_length
  275. # probably need to sanity check lengths here...
  276. routing = self.getbytes(fd, sock, routing_length)
  277. if data_length > 0:
  278. data = self.getbytes(fd, sock, data_length)
  279. else:
  280. data = None
  281. return (routing, data)
  282. def process_packet(self, fd, sock):
  283. """Process one packet."""
  284. try:
  285. routing, data = self.read_packet(fd, sock)
  286. except MsgQReceiveError as err:
  287. logger.error(MSGQ_RECV_ERR, fd, err)
  288. self.kill_socket(fd, sock)
  289. return
  290. try:
  291. routingmsg = isc.cc.message.from_wire(routing)
  292. except DecodeError as err:
  293. self.kill_socket(fd, sock)
  294. logger.error(MSGQ_HDR_DECODE_ERR, fd, err)
  295. return
  296. self.process_command(fd, sock, routingmsg, data)
  297. def process_command(self, fd, sock, routing, data):
  298. """Process a single command. This will split out into one of the
  299. other functions."""
  300. logger.debug(TRACE_DETAIL, MSGQ_RECV_HDR, routing)
  301. cmd = routing["type"]
  302. if cmd == 'send':
  303. self.process_command_send(sock, routing, data)
  304. elif cmd == 'subscribe':
  305. self.process_command_subscribe(sock, routing, data)
  306. elif cmd == 'unsubscribe':
  307. self.process_command_unsubscribe(sock, routing, data)
  308. elif cmd == 'getlname':
  309. self.process_command_getlname(sock, routing, data)
  310. elif cmd == 'ping':
  311. # Command for testing purposes
  312. self.process_command_ping(sock, routing, data)
  313. elif cmd == 'stop':
  314. self.stop()
  315. else:
  316. logger.error(MSGQ_INVALID_CMD, cmd)
  317. def preparemsg(self, env, msg = None):
  318. if type(env) == dict:
  319. env = isc.cc.message.to_wire(env)
  320. if type(msg) == dict:
  321. msg = isc.cc.message.to_wire(msg)
  322. length = 2 + len(env);
  323. if msg:
  324. length += len(msg)
  325. ret = struct.pack("!IH", length, len(env))
  326. ret += env
  327. if msg:
  328. ret += msg
  329. return ret
  330. def sendmsg(self, sock, env, msg = None):
  331. self.send_prepared_msg(sock, self.preparemsg(env, msg))
  332. def __send_data(self, sock, data):
  333. """
  334. Send a piece of data to the given socket.
  335. Parameters:
  336. sock: The socket to send to
  337. data: The list of bytes to send
  338. Returns:
  339. An integer or None. If an integer (which can be 0), it signals
  340. the number of bytes sent. If None, the socket appears to have
  341. been closed on the other end, and it has been killed on this
  342. side too.
  343. """
  344. try:
  345. # We set the socket nonblocking, MSG_DONTWAIT doesn't exist
  346. # on some OSes
  347. sock.setblocking(0)
  348. return sock.send(data)
  349. except socket.error as e:
  350. if e.errno in [ errno.EAGAIN,
  351. errno.EWOULDBLOCK,
  352. errno.EINTR ]:
  353. return 0
  354. elif e.errno in [ errno.EPIPE,
  355. errno.ECONNRESET,
  356. errno.ENOBUFS ]:
  357. logger.error(MSGQ_SEND_ERR, sock.fileno(),
  358. errno.errorcode[e.errno])
  359. self.kill_socket(sock.fileno(), sock)
  360. return None
  361. else:
  362. raise e
  363. finally:
  364. # And set it back again
  365. sock.setblocking(1)
  366. def send_prepared_msg(self, sock, msg):
  367. # Try to send the data, but only if there's nothing waiting
  368. fileno = sock.fileno()
  369. if fileno in self.sendbuffs:
  370. amount_sent = 0
  371. else:
  372. amount_sent = self.__send_data(sock, msg)
  373. if amount_sent is None:
  374. # Socket has been killed, drop the send
  375. return
  376. # Still something to send, add it to outgoing queue
  377. if amount_sent < len(msg):
  378. now = time.clock()
  379. # Append it to buffer (but check the data go away)
  380. if fileno in self.sendbuffs:
  381. (last_sent, buff) = self.sendbuffs[fileno]
  382. if now - last_sent > 0.1:
  383. self.kill_socket(fileno, sock)
  384. return
  385. buff += msg
  386. else:
  387. buff = msg[amount_sent:]
  388. last_sent = now
  389. if self.poller:
  390. self.poller.register(fileno, select.POLLIN |
  391. select.POLLOUT)
  392. else:
  393. self.add_kqueue_socket(sock, True)
  394. self.sendbuffs[fileno] = (last_sent, buff)
  395. def __process_write(self, fileno):
  396. # Try to send some data from the buffer
  397. (_, msg) = self.sendbuffs[fileno]
  398. sock = self.sockets[fileno]
  399. amount_sent = self.__send_data(sock, msg)
  400. if amount_sent is not None:
  401. # Keep the rest
  402. msg = msg[amount_sent:]
  403. if len(msg) == 0:
  404. # If there's no more, stop requesting for write availability
  405. if self.poller:
  406. self.poller.register(fileno, select.POLLIN)
  407. else:
  408. self.delete_kqueue_socket(sock, True)
  409. del self.sendbuffs[fileno]
  410. else:
  411. self.sendbuffs[fileno] = (time.clock(), msg)
  412. def newlname(self):
  413. """Generate a unique connection identifier for this socket.
  414. This is done by using an increasing counter and the current
  415. time."""
  416. self.connection_counter += 1
  417. return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname)
  418. def process_command_ping(self, sock, routing, data):
  419. self.sendmsg(sock, { "type" : "pong" }, data)
  420. def process_command_getlname(self, sock, routing, data):
  421. lname = [ k for k, v in self.lnames.items() if v == sock ][0]
  422. self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname })
  423. def process_command_send(self, sock, routing, data):
  424. group = routing["group"]
  425. instance = routing["instance"]
  426. to = routing["to"]
  427. if group == None or instance == None:
  428. return # ignore invalid packets entirely
  429. if to == "*":
  430. sockets = self.subs.find(group, instance)
  431. else:
  432. if to in self.lnames:
  433. sockets = [ self.lnames[to] ]
  434. else:
  435. return # recipient doesn't exist
  436. msg = self.preparemsg(routing, data)
  437. if sock in sockets:
  438. sockets.remove(sock)
  439. for socket in sockets:
  440. self.send_prepared_msg(socket, msg)
  441. def process_command_subscribe(self, sock, routing, data):
  442. group = routing["group"]
  443. instance = routing["instance"]
  444. if group == None or instance == None:
  445. return # ignore invalid packets entirely
  446. self.subs.subscribe(group, instance, sock)
  447. def process_command_unsubscribe(self, sock, routing, data):
  448. group = routing["group"]
  449. instance = routing["instance"]
  450. if group == None or instance == None:
  451. return # ignore invalid packets entirely
  452. self.subs.unsubscribe(group, instance, sock)
  453. def run(self):
  454. """Process messages. Forever. Mostly."""
  455. self.running = True
  456. if self.poller:
  457. self.run_poller()
  458. else:
  459. self.run_kqueue()
  460. def run_poller(self):
  461. while self.running:
  462. try:
  463. # Poll with a timeout so that every once in a while,
  464. # the loop checks for self.running.
  465. events = self.poller.poll()
  466. except select.error as err:
  467. if err.args[0] == errno.EINTR:
  468. events = []
  469. else:
  470. logger.fatal(MSGQ_POLL_ERR, err)
  471. break
  472. for (fd, event) in events:
  473. if fd == self.listen_socket.fileno():
  474. self.process_accept()
  475. elif fd == self.__poller_sock.fileno():
  476. # If it's the signal socket, we should terminate now.
  477. self.running = False
  478. break
  479. else:
  480. if event & select.POLLOUT:
  481. self.__process_write(fd)
  482. elif event & select.POLLIN:
  483. self.process_socket(fd)
  484. else:
  485. logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
  486. def run_kqueue(self):
  487. while self.running:
  488. # Check with a timeout so that every once in a while,
  489. # the loop checks for self.running.
  490. events = self.kqueue.control(None, 10)
  491. if not events:
  492. raise RuntimeError('serve: kqueue returned no events')
  493. for event in events:
  494. if event.ident == self.listen_socket.fileno():
  495. self.process_accept()
  496. elif event.ident == self.__poller_sock.fileno():
  497. # If it's the signal socket, we should terminate now.
  498. self.running = False
  499. break;
  500. else:
  501. if event.filter == select.KQ_FILTER_WRITE:
  502. self.__process_write(event.ident)
  503. if event.filter == select.KQ_FILTER_READ and \
  504. event.data > 0:
  505. self.process_socket(event.ident)
  506. elif event.flags & select.KQ_EV_EOF:
  507. self.kill_socket(event.ident,
  508. self.sockets[event.ident])
  509. def stop(self):
  510. # Signal it should terminate.
  511. self.__control_sock.close()
  512. self.__control_sock = None
  513. def cleanup_signalsock(self):
  514. """Close the signal sockets. We could do it directly in shutdown,
  515. but this part is reused in tests.
  516. """
  517. if self.__poller_sock:
  518. self.__poller_sock.close()
  519. self.__poller_sock = None
  520. if self.__control_sock:
  521. self.__control_sock.close()
  522. self.__control_sock = None
  523. def shutdown(self):
  524. """Stop the MsgQ master."""
  525. logger.debug(TRACE_START, MSGQ_SHUTDOWN)
  526. self.listen_socket.close()
  527. self.cleanup_signalsock()
  528. if os.path.exists(self.socket_file):
  529. os.remove(self.socket_file)
  530. # can signal handling and calling a destructor be done without a
  531. # global variable?
  532. msgq = None
  533. def signal_handler(signal, frame):
  534. if msgq:
  535. msgq.stop()
  536. if __name__ == "__main__":
  537. def check_port(option, opt_str, value, parser):
  538. """Function to insure that the port we are passed is actually
  539. a valid port number. Used by OptionParser() on startup."""
  540. intval = int(value)
  541. if (intval < 0) or (intval > 65535):
  542. raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
  543. parser.values.msgq_port = intval
  544. # Parse any command-line options.
  545. parser = OptionParser(version=VERSION)
  546. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  547. help="display more about what is going on")
  548. parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
  549. type="string", default=None,
  550. help="UNIX domain socket file the msgq daemon will use")
  551. (options, args) = parser.parse_args()
  552. # Init logging, according to the parameters.
  553. # FIXME: Do proper logger configuration, this is just a hack
  554. # This is #2582
  555. sev = 'INFO'
  556. if options.verbose:
  557. sev = 'DEBUG'
  558. isc.log.init("b10-msgq", buffer=False, severity=sev, debuglevel=99)
  559. signal.signal(signal.SIGTERM, signal_handler)
  560. # Announce startup.
  561. logger.debug(TRACE_START, MSGQ_START, VERSION)
  562. msgq = MsgQ(options.msgq_socket_file, options.verbose)
  563. try:
  564. msgq.setup()
  565. except Exception as e:
  566. logger.fatal(MSGQ_START_FAIL, e)
  567. sys.exit(1)
  568. # We run the processing in a separate thread. This is because we want to
  569. # connect to the msgq ourself. But the cc library is unfortunately blocking
  570. # in many places and waiting for the processing part to answer, it would
  571. # deadlock.
  572. poller_thread = threading.Thread(target=msgq.run)
  573. poller_thread.daemon = True
  574. try:
  575. poller_thread.start()
  576. poller_thread.join()
  577. except KeyboardInterrupt:
  578. pass
  579. msgq.shutdown()