msgq.py.in 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998
  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. """This code implements the msgq daemon."""
  18. import subprocess
  19. import signal
  20. import os
  21. import socket
  22. import sys
  23. import struct
  24. import errno
  25. import time
  26. import select
  27. import random
  28. import threading
  29. import isc.config.ccsession
  30. from optparse import OptionParser, OptionValueError
  31. import isc.util.process
  32. from isc.cc.proto_defs import *
  33. import isc.log
  34. from isc.log_messages.msgq_messages import *
  35. import isc.cc
# NOTE(review): presumably adjusts the process name shown by the OS;
# confirm against isc.util.process.
isc.util.process.rename()

# Initialise logging for this program before any Logger is created.
isc.log.init("b10-msgq", buffer=True)
# Logger that is used in the actual msgq handling - startup, shutdown and the
# poller thread.
logger = isc.log.Logger("msgq")
# A separate copy for the master/config thread when the poller thread runs.
# We use a separate instance, since the logger itself doesn't have to be
# thread safe.
config_logger = isc.log.Logger("msgq")
# Short aliases for the logger's debug levels; they are passed as the first
# argument of logger.debug() throughout this file.
TRACE_START = logger.DBGLVL_START_SHUT
TRACE_BASIC = logger.DBGLVL_TRACE_BASIC
TRACE_DETAIL = logger.DBGLVL_TRACE_DETAIL

# This is the version that gets displayed to the user.
# The VERSION string consists of the module name, the module version
# number, and the overall BIND 10 version number (set in configure.ac).
VERSION = "b10-msgq 20110127 (BIND 10 @PACKAGE_VERSION@)"
# If B10_FROM_SOURCE is set in the environment, we use data files
# from a directory relative to that, otherwise we use the ones
# installed on the system.
if "B10_FROM_SOURCE" in os.environ:
    SPECFILE_PATH = os.environ["B10_FROM_SOURCE"] + "/src/bin/msgq"
else:
    # The @...@ placeholders are substituted when this template is
    # processed; any "${datarootdir}" / "${prefix}" references left in the
    # substituted data directory are expanded here by hand.
    PREFIX = "@prefix@"
    DATAROOTDIR = "@datarootdir@"
    SPECFILE_PATH = "@datadir@/@PACKAGE@".replace("${datarootdir}",
                                                  DATAROOTDIR). \
                                          replace("${prefix}", PREFIX)
# Full path of the msgq module specification file.
SPECFILE_LOCATION = SPECFILE_PATH + "/msgq.spec"
  64. class MsgQReceiveError(Exception): pass
  65. class MsgQCloseOnReceive(Exception):
  66. """Exception raised when reading data from a socket results in 'shutdown'.
  67. This happens when msgq received 0-length data. This class holds whether
  68. it happens in the middle of reading (i.e. after reading some) via
  69. partial_read parameter, which is set to True if and only if so.
  70. This will be used by an upper layer catching the exception to distinguish
  71. the severity of the event.
  72. """
  73. def __init__(self, reason, partial_read):
  74. self.partial_read = partial_read
  75. self.__reason = reason
  76. def __str__(self):
  77. return self.__reason
  78. class SubscriptionManager:
  79. def __init__(self, cfgmgr_ready):
  80. """
  81. Initialize the subscription manager.
  82. parameters:
  83. * cfgmgr_ready: A callable object run once the config manager
  84. subscribes. This is a hackish solution, but we can't read
  85. the configuration sooner.
  86. """
  87. self.subscriptions = {}
  88. self.__cfgmgr_ready = cfgmgr_ready
  89. self.__cfgmgr_ready_called = False
  90. def subscribe(self, group, instance, socket):
  91. """Add a subscription."""
  92. target = ( group, instance )
  93. if target in self.subscriptions:
  94. logger.debug(TRACE_BASIC, MSGQ_SUBS_APPEND_TARGET, group, instance)
  95. if socket not in self.subscriptions[target]:
  96. self.subscriptions[target].append(socket)
  97. else:
  98. logger.debug(TRACE_BASIC, MSGQ_SUBS_NEW_TARGET, group, instance)
  99. self.subscriptions[target] = [ socket ]
  100. if group == "ConfigManager" and not self.__cfgmgr_ready_called:
  101. logger.debug(TRACE_BASIC, MSGQ_CFGMGR_SUBSCRIBED)
  102. self.__cfgmgr_ready_called = True
  103. self.__cfgmgr_ready()
  104. def unsubscribe(self, group, instance, socket):
  105. """Remove the socket from the one specific subscription."""
  106. target = ( group, instance )
  107. if target in self.subscriptions:
  108. if socket in self.subscriptions[target]:
  109. self.subscriptions[target].remove(socket)
  110. return True
  111. return False
  112. def unsubscribe_all(self, socket):
  113. """Remove the socket from all subscriptions."""
  114. removed_from = []
  115. for subs, socklist in self.subscriptions.items():
  116. if socket in socklist:
  117. socklist.remove(socket)
  118. removed_from.append(subs)
  119. return removed_from
  120. def find_sub(self, group, instance):
  121. """Return an array of sockets which want this specific group,
  122. instance."""
  123. target = (group, instance)
  124. if target in self.subscriptions:
  125. return self.subscriptions[target]
  126. else:
  127. return []
  128. def find(self, group, instance):
  129. """Return an array of sockets who should get something sent to
  130. this group, instance pair. This includes wildcard subscriptions."""
  131. target = (group, instance)
  132. partone = self.find_sub(group, instance)
  133. parttwo = self.find_sub(group, CC_INSTANCE_WILDCARD)
  134. return list(set(partone + parttwo))
  135. class MsgQ:
  136. """Message Queue class."""
  137. # did we find a better way to do this?
  138. SOCKET_FILE = os.path.join("@localstatedir@",
  139. "@PACKAGE_NAME@",
  140. "msgq_socket").replace("${prefix}",
  141. "@prefix@")
  142. def __init__(self, socket_file=None, verbose=False):
  143. """Initialize the MsgQ master.
  144. The socket_file specifies the path to the UNIX domain socket
  145. that the msgq process listens on. If it is None, the
  146. environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
  147. is not set, it will default to
  148. @localstatedir@/@PACKAGE_NAME@/msg_socket.
  149. If verbose is True, then the MsgQ reports
  150. what it is doing.
  151. """
  152. if socket_file is None:
  153. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  154. self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  155. else:
  156. self.socket_file = self.SOCKET_FILE
  157. else:
  158. self.socket_file = socket_file
  159. self.verbose = verbose
  160. self.poller = None
  161. self.kqueue = None
  162. self.runnable = False
  163. self.listen_socket = False
  164. self.sockets = {}
  165. self.connection_counter = random.random()
  166. self.hostname = socket.gethostname()
  167. self.subs = SubscriptionManager(self.cfgmgr_ready)
  168. self.lnames = {}
  169. self.fd_to_lname = {}
  170. self.sendbuffs = {}
  171. self.running = False
  172. self.__cfgmgr_ready = None
  173. self.__cfgmgr_ready_cond = threading.Condition()
  174. # A lock used when the message queue does anything more complicated.
  175. # It is mostly a safety measure, the threads doing so should be mostly
  176. # independent, and the one with config session should be read only,
  177. # but with threads, one never knows. We use threads for concurrency,
  178. # not for performance, so we use wide lock scopes to be on the safe
  179. # side.
  180. self.__lock = threading.Lock()
  181. self._session = None
  182. def members_notify(self, event, params):
  183. """
  184. Thin wrapper around ccs's notify. Send a notification about change
  185. of some list that can be requested by the members command.
  186. The event is one of:
  187. - connected (client connected to MsgQ)
  188. - disconected (client disconnected from MsgQ)
  189. - subscribed (client subscribed to a group)
  190. - unsubscribed (client unsubscribed from a group)
  191. The params is dict containing:
  192. - client: The lname of the client in question.
  193. - group (for 'subscribed' and 'unsubscribed' events):
  194. The group the client subscribed or unsubscribed from.
  195. The notification occurs after the event, so client a subscribing for
  196. notifications will get a notification about its own subscription, but
  197. will not get a notification when it unsubscribes.
  198. """
  199. # Due to the interaction between threads (and fear it might influence
  200. # sending stuff), we test this method in msgq_run_test, instead of
  201. # mocking the ccs.
  202. if self._session: # Don't send before we have started up
  203. self._session.notify('cc_members', event, params)
  204. def cfgmgr_ready(self, ready=True):
  205. """Notify that the config manager is either subscribed, or
  206. that the msgq is shutting down and it won't connect, but
  207. anybody waiting for it should stop anyway.
  208. The ready parameter signifies if the config manager is subscribed.
  209. This method can be called multiple times, but second and any
  210. following call is simply ignored. This means the "abort" version
  211. of the call can be used on any stop unconditionally, even when
  212. the config manager already connected.
  213. """
  214. with self.__cfgmgr_ready_cond:
  215. if self.__cfgmgr_ready is not None:
  216. # This is a second call to this method. In that case it does
  217. # nothing.
  218. return
  219. self.__cfgmgr_ready = ready
  220. self.__cfgmgr_ready_cond.notify_all()
  221. def wait_cfgmgr(self):
  222. """Wait for msgq to subscribe.
  223. When this returns, the config manager is either subscribed, or
  224. msgq gave up waiting for it. Success is signified by the return
  225. value.
  226. """
  227. with self.__cfgmgr_ready_cond:
  228. # Wait until it either aborts or subscribes
  229. while self.__cfgmgr_ready is None:
  230. self.__cfgmgr_ready_cond.wait()
  231. return self.__cfgmgr_ready
  232. def setup_poller(self):
  233. """Set up the poll thing. Internal function."""
  234. try:
  235. self.kqueue = select.kqueue()
  236. except AttributeError:
  237. self.poller = select.poll()
  238. def add_kqueue_socket(self, socket, write_filter=False):
  239. """Add a kqueue filter for a socket. By default the read
  240. filter is used; if write_filter is set to True, the write
  241. filter is used. We use a boolean value instead of a specific
  242. filter constant, because kqueue filter values do not seem to
  243. be defined on some systems. The use of boolean makes the
  244. interface restrictive because there are other filters, but this
  245. method is mostly only for our internal use, so it should be
  246. acceptable at least for now."""
  247. filter_type = select.KQ_FILTER_WRITE if write_filter else \
  248. select.KQ_FILTER_READ
  249. event = select.kevent(socket.fileno(), filter_type,
  250. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  251. self.kqueue.control([event], 0)
  252. def delete_kqueue_socket(self, socket, write_filter=False):
  253. """Delete a kqueue filter for socket. See add_kqueue_socket()
  254. for the semantics and notes about write_filter."""
  255. filter_type = select.KQ_FILTER_WRITE if write_filter else \
  256. select.KQ_FILTER_READ
  257. event = select.kevent(socket.fileno(), filter_type,
  258. select.KQ_EV_DELETE)
  259. self.kqueue.control([event], 0)
  260. def setup_listener(self):
  261. """Set up the listener socket. Internal function."""
  262. logger.debug(TRACE_BASIC, MSGQ_LISTENER_SETUP, self.socket_file)
  263. self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  264. if os.path.exists(self.socket_file):
  265. os.remove(self.socket_file)
  266. try:
  267. self.listen_socket.bind(self.socket_file)
  268. self.listen_socket.listen(1024)
  269. except Exception as e:
  270. # remove the file again if something goes wrong
  271. # (note this is a catch-all, but we reraise it)
  272. if os.path.exists(self.socket_file):
  273. os.remove(self.socket_file)
  274. self.listen_socket.close()
  275. logger.fatal(MSGQ_LISTENER_FAILED, self.socket_file, e)
  276. raise e
  277. if self.poller:
  278. self.poller.register(self.listen_socket, select.POLLIN)
  279. else:
  280. self.add_kqueue_socket(self.listen_socket)
  281. def setup_signalsock(self):
  282. """Create a socket pair used to signal when we want to finish.
  283. Using a socket is easy and thread/signal safe way to signal
  284. the termination.
  285. """
  286. # The __poller_sock will be the end in the poller. When it is
  287. # closed, we should shut down.
  288. (self.__poller_sock, self.__control_sock) = socket.socketpair()
  289. if self.poller:
  290. self.poller.register(self.__poller_sock, select.POLLIN)
  291. else:
  292. self.add_kqueue_socket(self.__poller_sock)
  293. def setup(self):
  294. """Configure listener socket, polling, etc.
  295. Raises a socket.error if the socket_file cannot be
  296. created.
  297. """
  298. self.setup_poller()
  299. self.setup_signalsock()
  300. self.setup_listener()
  301. logger.debug(TRACE_START, MSGQ_LISTENER_STARTED);
  302. self.runnable = True
  303. def process_accept(self):
  304. """Process an accept on the listening socket."""
  305. newsocket, ipaddr = self.listen_socket.accept()
  306. # TODO: When we have logging, we might want
  307. # to add a debug message here that a new connection
  308. # was made
  309. self.register_socket(newsocket)
  310. def register_socket(self, newsocket):
  311. """
  312. Internal function to insert a socket. Used by process_accept and some
  313. tests.
  314. """
  315. self.sockets[newsocket.fileno()] = newsocket
  316. lname = self.newlname()
  317. self.lnames[lname] = newsocket
  318. self.fd_to_lname[newsocket.fileno()] = lname
  319. logger.debug(TRACE_BASIC, MSGQ_SOCKET_REGISTERED, newsocket.fileno(),
  320. lname)
  321. if self.poller:
  322. self.poller.register(newsocket, select.POLLIN)
  323. else:
  324. self.add_kqueue_socket(newsocket)
  325. self.members_notify('connected', {'client': lname})
  326. def kill_socket(self, fd, sock):
  327. """Fully close down the socket."""
  328. # Unregister events on the socket. Note that we don't have to do
  329. # this for kqueue because the registered events are automatically
  330. # deleted when the corresponding socket is closed.
  331. if self.poller:
  332. self.poller.unregister(sock)
  333. unsubscribed_from = self.subs.unsubscribe_all(sock)
  334. lname = self.fd_to_lname[fd]
  335. del self.fd_to_lname[fd]
  336. del self.lnames[lname]
  337. sock.close()
  338. del self.sockets[fd]
  339. if fd in self.sendbuffs:
  340. del self.sendbuffs[fd]
  341. logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)
  342. # Filter out just the groups.
  343. unsubscribed_from_groups = set(map(lambda x: x[0], unsubscribed_from))
  344. for group in unsubscribed_from_groups:
  345. self.members_notify('unsubscribed', {
  346. 'client': lname,
  347. 'group': group
  348. })
  349. self.members_notify('disconnected', {'client': lname})
  350. def __getbytes(self, fd, sock, length, continued):
  351. """Get exactly the requested bytes, or raise an exception if
  352. EOF.
  353. continued is set to True if this method is called to complete
  354. already read data.
  355. """
  356. received = b''
  357. while len(received) < length:
  358. try:
  359. data = sock.recv(length - len(received))
  360. except socket.error as err:
  361. # This case includes ECONNRESET, which seems to happen when
  362. # the remote client has closed its socket at some subtle
  363. # timing (it should normally result in receiving empty data).
  364. # Since we didn't figure out how exactly that could happen,
  365. # we treat it just like other really-unexpected socket errors.
  366. raise MsgQReceiveError(str(err))
  367. if len(data) == 0:
  368. raise MsgQCloseOnReceive("EOF", continued)
  369. received += data
  370. continued = True
  371. return received
  372. def read_packet(self, fd, sock):
  373. """Read a correctly formatted packet. Will raise exceptions if
  374. something fails."""
  375. lengths = self.__getbytes(fd, sock, 6, False)
  376. overall_length, routing_length = struct.unpack(">IH", lengths)
  377. if overall_length < 2:
  378. raise MsgQReceiveError("overall_length < 2")
  379. overall_length -= 2
  380. if routing_length > overall_length:
  381. raise MsgQReceiveError("routing_length > overall_length")
  382. if routing_length == 0:
  383. raise MsgQReceiveError("routing_length == 0")
  384. data_length = overall_length - routing_length
  385. # probably need to sanity check lengths here...
  386. routing = self.__getbytes(fd, sock, routing_length, True)
  387. if data_length > 0:
  388. data = self.__getbytes(fd, sock, data_length, True)
  389. else:
  390. data = None
  391. return (routing, data)
  392. def process_packet(self, fd, sock):
  393. """Process one packet."""
  394. try:
  395. routing, data = self.read_packet(fd, sock)
  396. except (MsgQReceiveError, MsgQCloseOnReceive) as err:
  397. # If it's MsgQCloseOnReceive and that happens without reading
  398. # any data, it basically means the remote client has closed the
  399. # socket, so we log it as debug information. Otherwise, it's
  400. # a somewhat unexpected event, so we consider it an "error".
  401. if isinstance(err, MsgQCloseOnReceive) and not err.partial_read:
  402. logger.debug(TRACE_BASIC, MSGQ_CLOSE_ON_RECV, fd)
  403. else:
  404. logger.error(MSGQ_RECV_ERROR, fd, err)
  405. self.kill_socket(fd, sock)
  406. return
  407. try:
  408. routingmsg = isc.cc.message.from_wire(routing)
  409. except DecodeError as err:
  410. self.kill_socket(fd, sock)
  411. logger.error(MSGQ_HDR_DECODE_ERROR, fd, err)
  412. return
  413. self.process_command(fd, sock, routingmsg, data)
  414. def process_command(self, fd, sock, routing, data):
  415. """Process a single command. This will split out into one of the
  416. other functions."""
  417. logger.debug(TRACE_DETAIL, MSGQ_RECV_HDR, routing)
  418. cmd = routing[CC_HEADER_TYPE]
  419. if cmd == CC_COMMAND_SEND:
  420. self.process_command_send(sock, routing, data)
  421. elif cmd == CC_COMMAND_SUBSCRIBE:
  422. self.process_command_subscribe(sock, routing, data)
  423. elif cmd == CC_COMMAND_UNSUBSCRIBE:
  424. self.process_command_unsubscribe(sock, routing, data)
  425. elif cmd == CC_COMMAND_GET_LNAME:
  426. self.process_command_getlname(sock, routing, data)
  427. elif cmd == CC_COMMAND_PING:
  428. # Command for testing purposes
  429. self.process_command_ping(sock, routing, data)
  430. elif cmd == CC_COMMAND_STOP:
  431. self.stop()
  432. else:
  433. logger.error(MSGQ_INVALID_CMD, cmd)
  434. def preparemsg(self, env, msg = None):
  435. if type(env) == dict:
  436. env = isc.cc.message.to_wire(env)
  437. if type(msg) == dict:
  438. msg = isc.cc.message.to_wire(msg)
  439. length = 2 + len(env);
  440. if msg:
  441. length += len(msg)
  442. ret = struct.pack("!IH", length, len(env))
  443. ret += env
  444. if msg:
  445. ret += msg
  446. return ret
  447. def sendmsg(self, sock, env, msg = None):
  448. self.send_prepared_msg(sock, self.preparemsg(env, msg))
  449. def _send_data(self, sock, data):
  450. """
  451. Send a piece of data to the given socket. This method is
  452. essentially "private" to MsgQ, but defined as if it were "protected"
  453. for easier access from tests.
  454. Parameters:
  455. sock: The socket to send to
  456. data: The list of bytes to send
  457. Returns:
  458. An integer or None. If an integer (which can be 0), it signals
  459. the number of bytes sent. If None, the socket appears to have
  460. been closed on the other end, and it has been killed on this
  461. side too.
  462. """
  463. try:
  464. # We set the socket nonblocking, MSG_DONTWAIT doesn't exist
  465. # on some OSes
  466. sock.setblocking(0)
  467. return sock.send(data)
  468. except socket.error as e:
  469. if e.errno in [ errno.EAGAIN, errno.EWOULDBLOCK, errno.EINTR ]:
  470. return 0
  471. elif e.errno in [ errno.EPIPE, errno.ECONNRESET, errno.ENOBUFS ]:
  472. # EPIPE happens if the remote module has terminated by the time
  473. # of this send; its severity can vary, but in many cases it
  474. # shouldn't be critical, so we log it separately as a warning.
  475. if e.errno == errno.EPIPE:
  476. logger.warn(MSGQ_CLOSE_ON_SEND, sock.fileno())
  477. else:
  478. logger.error(MSGQ_SEND_ERROR, sock.fileno(),
  479. errno.errorcode[e.errno])
  480. self.kill_socket(sock.fileno(), sock)
  481. return None
  482. else:
  483. raise e
  484. finally:
  485. # And set it back again
  486. sock.setblocking(1)
  487. def send_prepared_msg(self, sock, msg):
  488. '''
  489. Add a message to the queue. If there's nothing waiting, try
  490. to send it right away.
  491. Return if the socket is still alive. It can return false if the
  492. socket dies (for example due to EPIPE in the attempt to send).
  493. Returning true does not guarantee the message will be delivered,
  494. but returning false means it won't.
  495. '''
  496. # Try to send the data, but only if there's nothing waiting
  497. fileno = sock.fileno()
  498. if fileno in self.sendbuffs:
  499. amount_sent = 0
  500. else:
  501. amount_sent = self._send_data(sock, msg)
  502. if amount_sent is None:
  503. # Socket has been killed, drop the send
  504. return False
  505. # Still something to send, add it to outgoing queue
  506. if amount_sent < len(msg):
  507. now = time.clock()
  508. # Append it to buffer (but check the data go away)
  509. if fileno in self.sendbuffs:
  510. (last_sent, buff) = self.sendbuffs[fileno]
  511. if now - last_sent > 0.1:
  512. self.kill_socket(fileno, sock)
  513. return False
  514. buff += msg
  515. else:
  516. buff = msg[amount_sent:]
  517. last_sent = now
  518. if self.poller:
  519. self.poller.register(fileno, select.POLLIN |
  520. select.POLLOUT)
  521. else:
  522. self.add_kqueue_socket(sock, True)
  523. self.sendbuffs[fileno] = (last_sent, buff)
  524. return True
  525. def __process_write(self, fileno):
  526. # Try to send some data from the buffer
  527. (_, msg) = self.sendbuffs[fileno]
  528. sock = self.sockets[fileno]
  529. amount_sent = self._send_data(sock, msg)
  530. if amount_sent is not None:
  531. # Keep the rest
  532. msg = msg[amount_sent:]
  533. if len(msg) == 0:
  534. # If there's no more, stop requesting for write availability
  535. if self.poller:
  536. self.poller.register(fileno, select.POLLIN)
  537. else:
  538. self.delete_kqueue_socket(sock, True)
  539. del self.sendbuffs[fileno]
  540. else:
  541. self.sendbuffs[fileno] = (time.clock(), msg)
  542. def newlname(self):
  543. """Generate a unique connection identifier for this socket.
  544. This is done by using an increasing counter and the current
  545. time."""
  546. self.connection_counter += 1
  547. return "%x_%x@%s" % (time.time(), self.connection_counter,
  548. self.hostname)
  549. def process_command_ping(self, sock, routing, data):
  550. self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_PONG }, data)
  551. def process_command_getlname(self, sock, routing, data):
  552. lname = [ k for k, v in self.lnames.items() if v == sock ][0]
  553. self.sendmsg(sock, { CC_HEADER_TYPE : CC_COMMAND_GET_LNAME },
  554. { CC_PAYLOAD_LNAME : lname })
  555. def process_command_send(self, sock, routing, data):
  556. group = routing[CC_HEADER_GROUP]
  557. instance = routing[CC_HEADER_INSTANCE]
  558. to = routing[CC_HEADER_TO]
  559. if group == None or instance == None:
  560. # FIXME: Should we log them instead?
  561. return # ignore invalid packets entirely
  562. if to == CC_TO_WILDCARD:
  563. sockets = self.subs.find(group, instance)
  564. else:
  565. if to in self.lnames:
  566. sockets = [ self.lnames[to] ]
  567. else:
  568. sockets = []
  569. msg = self.preparemsg(routing, data)
  570. if sock in sockets:
  571. # Don't bounce to self
  572. sockets.remove(sock)
  573. has_recipient = False
  574. for socket in sockets:
  575. if self.send_prepared_msg(socket, msg):
  576. has_recipient = True
  577. if not has_recipient and routing.get(CC_HEADER_WANT_ANSWER) and \
  578. CC_HEADER_REPLY not in routing:
  579. # We have no recipients. But the sender insists on a reply
  580. # (and the message isn't a reply itself). We need to send
  581. # an error to satisfy the request, since there's nobody
  582. # else who can.
  583. #
  584. # We omit the replies on purpose. The recipient might generate
  585. # the response by copying and mangling the header of incoming
  586. # message (just like we do below) and would include the want_answer
  587. # by accident. And we want to avoid loops of errors. Also, it
  588. # is unclear if the knowledge of undeliverable reply would be
  589. # of any use to the sender, and it should be much rarer situation.
  590. # The real errors would be positive, 1 most probably. We use
  591. # negative errors for delivery errors to distinguish them a
  592. # little. We probably should have a way to provide more data
  593. # in the error message.
  594. payload = isc.config.ccsession.create_answer(CC_REPLY_NO_RECPT,
  595. "No such recipient")
  596. # We create the header based on the current one. But we don't
  597. # want to mangle it for the caller, so we get a copy. A shallow
  598. # one should be enough, we modify the dict only.
  599. header = routing.copy()
  600. header[CC_HEADER_REPLY] = routing[CC_HEADER_SEQ]
  601. # Dummy lname not assigned to clients
  602. header[CC_HEADER_FROM] = "msgq"
  603. header[CC_HEADER_TO] = routing[CC_HEADER_FROM]
  604. # We keep the seq as it is. We don't need to track the message
  605. # and we will not confuse the sender. The sender would use an
  606. # unique id for each message, so we won't return one twice to it.
  607. errmsg = self.preparemsg(header, payload)
  608. # Send it back.
  609. self.send_prepared_msg(sock, errmsg)
  610. def process_command_subscribe(self, sock, routing, data):
  611. group = routing[CC_HEADER_GROUP]
  612. instance = routing[CC_HEADER_INSTANCE]
  613. if group == None or instance == None:
  614. return # ignore invalid packets entirely
  615. self.subs.subscribe(group, instance, sock)
  616. lname = self.fd_to_lname[sock.fileno()]
  617. self.members_notify('subscribed',
  618. {
  619. 'client': lname,
  620. 'group': group
  621. })
  622. def process_command_unsubscribe(self, sock, routing, data):
  623. group = routing[CC_HEADER_GROUP]
  624. instance = routing[CC_HEADER_INSTANCE]
  625. if group == None or instance == None:
  626. return # ignore invalid packets entirely
  627. if self.subs.unsubscribe(group, instance, sock):
  628. lname = self.fd_to_lname[sock.fileno()]
  629. self.members_notify('unsubscribed',
  630. {
  631. 'client': lname,
  632. 'group': group
  633. })
  634. def run(self):
  635. """Process messages. Forever. Mostly."""
  636. self.running = True
  637. self.run_select()
    def run_select(self):
        """Event loop based on select.select().

        Runs as long as self.running is set.  In every iteration it waits
        for the client sockets, the listening socket and the signal socket
        to become readable, and for the sockets with queued outgoing data
        to become writable, and dispatches the ready descriptors.
        """
        while self.running:
            # Watch all the client sockets for reading, together with the
            # two special sockets.
            reads = list(self.fd_to_lname.keys())
            if self.listen_socket.fileno() != -1: # Skip in tests
                reads.append(self.listen_socket.fileno())
            if self.__poller_sock.fileno() != -1:
                reads.append(self.__poller_sock.fileno())
            # Watch for writing only where there is something to write.
            writes = list(self.sendbuffs.keys())
            (read_ready, write_ready) = ([], [])
            try:
                (read_ready, write_ready, _) = select.select(reads, writes,
                                                             []);
            except select.error as err:
                if err.args[0] == errno.EINTR:
                    continue # Just try it again if interrupted.
                else:
                    logger.fatal(MSGQ_SELECT_ERROR, err)
                    break
            with self.__lock:
                write_ready = set(write_ready)
                for fd in read_ready:
                    # Do only one operation per loop iteration on the given fd.
                    # It could be possible to perform both, but it may have
                    # undesired side effects in special situations (like, if the
                    # read closes the socket).
                    if fd in write_ready:
                        write_ready.remove(fd)
                    if fd == self.listen_socket.fileno():
                        self.process_accept()
                    elif fd == self.__poller_sock.fileno():
                        # The signal socket. We should terminate now.
                        # Note the break leaves only the loop over the
                        # readable descriptors; the writable ones below are
                        # still processed before the outer loop ends.
                        self.running = False
                        break
                    else:
                        self._process_fd(fd, False, True, False)
                for fd in write_ready:
                    self._process_fd(fd, True, False, False)
    def run_poller(self):
        """Event loop based on the poll object in self.poller.

        Runs as long as self.running is set; it stops when the signal
        socket reports an event or when poll fails with an error other
        than EINTR.
        """
        while self.running:
            try:
                # NOTE(review): poll() is called without a timeout here,
                # so it blocks until there is an event; self.running is
                # re-checked only after poll() returns.
                events = self.poller.poll()
            except select.error as err:
                if err.args[0] == errno.EINTR:
                    # Interrupted; go for another round with no events.
                    events = []
                else:
                    logger.fatal(MSGQ_POLL_ERROR, err)
                    break
            with self.__lock:
                for (fd, event) in events:
                    if fd == self.listen_socket.fileno():
                        self.process_accept()
                    elif fd == self.__poller_sock.fileno():
                        # If it's the signal socket, we should terminate now.
                        self.running = False
                        break
                    else:
                        writable = event & select.POLLOUT
                        # Note: it may be okay to read data if available
                        # immediately after write some, but due to unexpected
                        # regression (see comments on the kqueue version below)
                        # we restrict one operation per iteration for now.
                        # In future we may clarify the point and enable the
                        # "read/write" mode.
                        readable = not writable and (event & select.POLLIN)
                        if not writable and not readable:
                            logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
                        # The descriptor is passed on even after an unknown
                        # event was logged; both flags are false then.
                        self._process_fd(fd, writable, readable, False)
  707. def run_kqueue(self):
  708. while self.running:
  709. # Check with a timeout so that every once in a while,
  710. # the loop checks for self.running.
  711. events = self.kqueue.control(None, 10)
  712. if not events:
  713. raise RuntimeError('serve: kqueue returned no events')
  714. with self.__lock:
  715. for event in events:
  716. if event.ident == self.listen_socket.fileno():
  717. self.process_accept()
  718. elif event.ident == self.__poller_sock.fileno():
  719. # If it's the signal socket, we should terminate now.
  720. self.running = False
  721. break;
  722. else:
  723. fd = event.ident
  724. writable = event.filter == select.KQ_FILTER_WRITE
  725. readable = (event.filter == select.KQ_FILTER_READ and
  726. event.data > 0)
  727. # It seems to break some of our test cases if we
  728. # immediately close the socket on EOF after reading
  729. # some data. It may be possible to avoid by tweaking
  730. # the test, but unless we can be sure we'll hold off.
  731. closed = (not readable and
  732. (event.flags & select.KQ_EV_EOF))
  733. self._process_fd(fd, writable, readable, closed)
  734. def _process_fd(self, fd, writable, readable, closed):
  735. '''Process a single FD: unified subroutine of run_kqueue/poller.
  736. closed can be True only in the case of kqueue. This is essentially
  737. private but is defined as if it were "protected" so it's callable
  738. from tests.
  739. '''
  740. # We need to check if FD is still in the sockets dict, because
  741. # it's possible that the socket has been "killed" while processing
  742. # other FDs; it's even possible it's killed within this method.
  743. if writable and fd in self.sockets:
  744. self.__process_write(fd)
  745. if readable and fd in self.sockets:
  746. self.process_packet(fd, self.sockets[fd])
  747. if closed and fd in self.sockets:
  748. self.kill_socket(fd, self.sockets[fd])
  749. def stop(self):
  750. # Signal it should terminate.
  751. self.__control_sock.close()
  752. self.__control_sock = None
  753. # Abort anything waiting on the condition, just to make sure it's not
  754. # blocked forever
  755. self.cfgmgr_ready(False)
  756. def cleanup_signalsock(self):
  757. """Close the signal sockets. We could do it directly in shutdown,
  758. but this part is reused in tests.
  759. """
  760. if self.__poller_sock:
  761. self.__poller_sock.close()
  762. self.__poller_sock = None
  763. if self.__control_sock:
  764. self.__control_sock.close()
  765. self.__control_sock = None
  766. def shutdown(self):
  767. """Stop the MsgQ master."""
  768. logger.debug(TRACE_START, MSGQ_SHUTDOWN)
  769. self.listen_socket.close()
  770. self.cleanup_signalsock()
  771. # Close all the sockets too. In real life, there should be none now,
  772. # as Msgq should be the last one. But some tests don't adhere to this
  773. # and create a new Msgq for each test, which led to huge socket leaks.
  774. # Some other threads put some other things in instead of sockets, so
  775. # we catch whatever exceptions there we can. This should be safe,
  776. # because in real operation, we will terminate now anyway, implicitly
  777. # closing anything anyway.
  778. for sock in self.sockets.values():
  779. try:
  780. sock.close()
  781. except Exception:
  782. pass
  783. if os.path.exists(self.socket_file):
  784. os.remove(self.socket_file)
  785. def config_handler(self, new_config):
  786. """The configuration handler (run in a separate thread).
  787. Not tested, currently effectively empty.
  788. """
  789. config_logger.debug(TRACE_DETAIL, MSGQ_CONFIG_DATA, new_config)
  790. with self.__lock:
  791. if not self.running:
  792. return
  793. # TODO: Any config handling goes here.
  794. return isc.config.create_answer(0)
  795. def command_handler(self, command, args):
  796. """The command handler (run in a separate thread)."""
  797. config_logger.debug(TRACE_DETAIL, MSGQ_COMMAND, command, args)
  798. with self.__lock:
  799. if not self.running:
  800. return
  801. # TODO: Who does validation? The ModuleCCSession or must we?
  802. if command == 'members':
  803. # List all members of MsgQ or of a group.
  804. if args is None:
  805. args = {}
  806. group = args.get('group')
  807. if group:
  808. return isc.config.create_answer(0,
  809. list(map(lambda sock: self.fd_to_lname[sock.fileno()],
  810. self.subs.find(group, ''))))
  811. else:
  812. return isc.config.create_answer(0,
  813. list(self.lnames.keys()))
  814. config_logger.error(MSGQ_COMMAND_UNKNOWN, command)
  815. return isc.config.create_answer(1, 'unknown command: ' + command)
  816. def signal_handler(msgq, signal, frame):
  817. if msgq:
  818. msgq.stop()
  819. if __name__ == "__main__":
  820. def check_port(option, opt_str, value, parser):
  821. """Function to insure that the port we are passed is actually
  822. a valid port number. Used by OptionParser() on startup."""
  823. intval = int(value)
  824. if (intval < 0) or (intval > 65535):
  825. raise OptionValueError("%s requires a port number (0-65535)" %
  826. opt_str)
  827. parser.values.msgq_port = intval
  828. # Parse any command-line options.
  829. parser = OptionParser(version=VERSION)
  830. # TODO: Should we remove the option?
  831. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  832. help="display more about what is going on")
  833. parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
  834. type="string", default=None,
  835. help="UNIX domain socket file the msgq daemon will use")
  836. (options, args) = parser.parse_args()
  837. # Announce startup.
  838. logger.debug(TRACE_START, MSGQ_START, VERSION)
  839. msgq = MsgQ(options.msgq_socket_file, options.verbose)
  840. signal.signal(signal.SIGTERM,
  841. lambda signal, frame: signal_handler(msgq, signal, frame))
  842. try:
  843. msgq.setup()
  844. except Exception as e:
  845. logger.fatal(MSGQ_START_FAIL, e)
  846. sys.exit(1)
  847. # We run the processing in a separate thread. This is because we want to
  848. # connect to the msgq ourself. But the cc library is unfortunately blocking
  849. # in many places and waiting for the processing part to answer, it would
  850. # deadlock.
  851. poller_thread = threading.Thread(target=msgq.run)
  852. poller_thread.daemon = True
  853. try:
  854. poller_thread.start()
  855. if msgq.wait_cfgmgr():
  856. # Once we get the config manager, we can read our own config.
  857. session = isc.config.ModuleCCSession(SPECFILE_LOCATION,
  858. msgq.config_handler,
  859. msgq.command_handler,
  860. None, True,
  861. msgq.socket_file)
  862. msgq._session = session
  863. session.start()
  864. # And we create a thread that'll just wait for commands and
  865. # handle them. We don't terminate the thread, we set it to
  866. # daemon. Once the main thread terminates, it'll just die.
  867. def run_session():
  868. while True:
  869. # As the check_command has internal mutex that is shared
  870. # with sending part (which includes notify). So we don't
  871. # want to hold it long-term and block using select.
  872. fileno = session.get_socket().fileno()
  873. try:
  874. (reads, _, _) = select.select([fileno], [], [])
  875. except select.error as se:
  876. if se.args[0] != errno.EINTR:
  877. raise
  878. session.check_command(True)
  879. background_thread = threading.Thread(target=run_session)
  880. background_thread.daemon = True
  881. background_thread.start()
  882. poller_thread.join()
  883. except KeyboardInterrupt:
  884. pass
  885. msgq.shutdown()
  886. logger.info(MSGQ_EXITING)