msgq.py.in 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529
  1. #!@PYTHON@
  2. # Copyright (C) 2010 Internet Systems Consortium.
  3. #
  4. # Permission to use, copy, modify, and distribute this software for any
  5. # purpose with or without fee is hereby granted, provided that the above
  6. # copyright notice and this permission notice appear in all copies.
  7. #
  8. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  9. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  10. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  11. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  12. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  13. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  14. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  15. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  16. import sys; sys.path.append ('@@PYTHONPATH@@')
  17. """This code implements the msgq daemon."""
  18. import subprocess
  19. import signal
  20. import os
  21. import socket
  22. import sys
  23. import struct
  24. import errno
  25. import time
  26. import select
  27. import pprint
  28. import random
  29. from optparse import OptionParser, OptionValueError
  30. import isc.util.process
  31. import isc.cc
  32. isc.util.process.rename()
  33. # This is the version that gets displayed to the user.
  34. # The VERSION string consists of the module name, the module version
  35. # number, and the overall BIND 10 version number (set in configure.ac).
  36. VERSION = "b10-msgq 20100818 (BIND 10 @PACKAGE_VERSION@)"
  37. class MsgQReceiveError(Exception): pass
  38. class SubscriptionManager:
  39. def __init__(self):
  40. self.subscriptions = {}
  41. def subscribe(self, group, instance, socket):
  42. """Add a subscription."""
  43. target = ( group, instance )
  44. if target in self.subscriptions:
  45. print("[b10-msgq] Appending to existing target")
  46. if socket not in self.subscriptions[target]:
  47. self.subscriptions[target].append(socket)
  48. else:
  49. print("[b10-msgq] Creating new target")
  50. self.subscriptions[target] = [ socket ]
  51. def unsubscribe(self, group, instance, socket):
  52. """Remove the socket from the one specific subscription."""
  53. target = ( group, instance )
  54. if target in self.subscriptions:
  55. if socket in self.subscriptions[target]:
  56. self.subscriptions[target].remove(socket)
  57. def unsubscribe_all(self, socket):
  58. """Remove the socket from all subscriptions."""
  59. for socklist in self.subscriptions.values():
  60. if socket in socklist:
  61. socklist.remove(socket)
  62. def find_sub(self, group, instance):
  63. """Return an array of sockets which want this specific group,
  64. instance."""
  65. target = (group, instance)
  66. if target in self.subscriptions:
  67. return self.subscriptions[target]
  68. else:
  69. return []
  70. def find(self, group, instance):
  71. """Return an array of sockets who should get something sent to
  72. this group, instance pair. This includes wildcard subscriptions."""
  73. target = (group, instance)
  74. partone = self.find_sub(group, instance)
  75. parttwo = self.find_sub(group, "*")
  76. return list(set(partone + parttwo))
  77. class MsgQ:
  78. """Message Queue class."""
  79. # did we find a better way to do this?
  80. SOCKET_FILE = os.path.join("@localstatedir@",
  81. "@PACKAGE_NAME@",
  82. "msgq_socket").replace("${prefix}",
  83. "@prefix@")
  84. def __init__(self, socket_file=None, verbose=False):
  85. """Initialize the MsgQ master.
  86. The socket_file specifies the path to the UNIX domain socket
  87. that the msgq process listens on. If it is None, the
  88. environment variable BIND10_MSGQ_SOCKET_FILE is used. If that
  89. is not set, it will default to
  90. @localstatedir@/@PACKAGE_NAME@/msg_socket.
  91. If verbose is True, then the MsgQ reports
  92. what it is doing.
  93. """
  94. if socket_file is None:
  95. if "BIND10_MSGQ_SOCKET_FILE" in os.environ:
  96. self.socket_file = os.environ["BIND10_MSGQ_SOCKET_FILE"]
  97. else:
  98. self.socket_file = self.SOCKET_FILE
  99. else:
  100. self.socket_file = socket_file
  101. self.verbose = verbose
  102. self.poller = None
  103. self.kqueue = None
  104. self.runnable = False
  105. self.listen_socket = False
  106. self.sockets = {}
  107. self.connection_counter = random.random()
  108. self.hostname = socket.gethostname()
  109. self.subs = SubscriptionManager()
  110. self.lnames = {}
  111. self.sendbuffs = {}
  112. def setup_poller(self):
  113. """Set up the poll thing. Internal function."""
  114. try:
  115. self.poller = select.poll()
  116. except AttributeError:
  117. self.kqueue = select.kqueue()
  118. def add_kqueue_socket(self, socket, add_filter = 0):
  119. event = select.kevent(socket.fileno(),
  120. select.KQ_FILTER_READ | add_filter,
  121. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  122. self.kqueue.control([event], 0)
  123. def setup_listener(self):
  124. """Set up the listener socket. Internal function."""
  125. if self.verbose:
  126. sys.stdout.write("[b10-msgq] Setting up socket at %s\n" %
  127. self.socket_file)
  128. self.listen_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
  129. if os.path.exists(self.socket_file):
  130. os.remove(self.socket_file)
  131. try:
  132. self.listen_socket.bind(self.socket_file)
  133. self.listen_socket.listen(1024)
  134. except Exception as e:
  135. # remove the file again if something goes wrong
  136. # (note this is a catch-all, but we reraise it)
  137. if os.path.exists(self.socket_file):
  138. os.remove(self.socket_file)
  139. raise e
  140. if self.poller:
  141. self.poller.register(self.listen_socket, select.POLLIN)
  142. else:
  143. self.add_kqueue_socket(self.listen_socket)
  144. def setup(self):
  145. """Configure listener socket, polling, etc.
  146. Raises a socket.error if the socket_file cannot be
  147. created.
  148. """
  149. self.setup_poller()
  150. self.setup_listener()
  151. if self.verbose:
  152. sys.stdout.write("[b10-msgq] Listening\n")
  153. self.runnable = True
  154. def process_accept(self):
  155. """Process an accept on the listening socket."""
  156. newsocket, ipaddr = self.listen_socket.accept()
  157. # TODO: When we have logging, we might want
  158. # to add a debug message here that a new connection
  159. # was made
  160. self.register_socket(self, newsocket)
  161. def register_socket(self, newsocket):
  162. """
  163. Internal function to insert a socket. Used by process_accept and some tests.
  164. """
  165. self.sockets[newsocket.fileno()] = newsocket
  166. lname = self.newlname()
  167. self.lnames[lname] = newsocket
  168. if self.poller:
  169. self.poller.register(newsocket, select.POLLIN)
  170. else:
  171. self.add_kqueue_socket(newsocket)
  172. def process_socket(self, fd):
  173. """Process a read on a socket."""
  174. if not fd in self.sockets:
  175. sys.stderr.write("[b10-msgq] Got read on Strange Socket fd %d\n" % fd)
  176. return
  177. sock = self.sockets[fd]
  178. # sys.stderr.write("[b10-msgq] Got read on fd %d\n" %fd)
  179. self.process_packet(fd, sock)
  180. def kill_socket(self, fd, sock):
  181. """Fully close down the socket."""
  182. if self.poller:
  183. self.poller.unregister(sock)
  184. self.subs.unsubscribe_all(sock)
  185. lname = [ k for k, v in self.lnames.items() if v == sock ][0]
  186. del self.lnames[lname]
  187. sock.close()
  188. del self.sockets[fd]
  189. if fd in self.sendbuffs:
  190. del self.sendbuffs[fd]
  191. sys.stderr.write("[b10-msgq] Closing socket fd %d\n" % fd)
  192. def getbytes(self, fd, sock, length):
  193. """Get exactly the requested bytes, or raise an exception if
  194. EOF."""
  195. received = b''
  196. while len(received) < length:
  197. try:
  198. data = sock.recv(length - len(received))
  199. except socket.error:
  200. raise MsgQReceiveError(socket.error)
  201. if len(data) == 0:
  202. raise MsgQReceiveError("EOF")
  203. received += data
  204. return received
  205. def read_packet(self, fd, sock):
  206. """Read a correctly formatted packet. Will raise exceptions if
  207. something fails."""
  208. lengths = self.getbytes(fd, sock, 6)
  209. overall_length, routing_length = struct.unpack(">IH", lengths)
  210. if overall_length < 2:
  211. raise MsgQReceiveError("overall_length < 2")
  212. overall_length -= 2
  213. if routing_length > overall_length:
  214. raise MsgQReceiveError("routing_length > overall_length")
  215. if routing_length == 0:
  216. raise MsgQReceiveError("routing_length == 0")
  217. data_length = overall_length - routing_length
  218. # probably need to sanity check lengths here...
  219. routing = self.getbytes(fd, sock, routing_length)
  220. if data_length > 0:
  221. data = self.getbytes(fd, sock, data_length)
  222. else:
  223. data = None
  224. return (routing, data)
  225. def process_packet(self, fd, sock):
  226. """Process one packet."""
  227. try:
  228. routing, data = self.read_packet(fd, sock)
  229. except MsgQReceiveError as err:
  230. self.kill_socket(fd, sock)
  231. sys.stderr.write("[b10-msgq] Receive error: %s\n" % err)
  232. return
  233. try:
  234. routingmsg = isc.cc.message.from_wire(routing)
  235. except DecodeError as err:
  236. self.kill_socket(fd, sock)
  237. sys.stderr.write("[b10-msgq] Routing decode error: %s\n" % err)
  238. return
  239. # sys.stdout.write("\t" + pprint.pformat(routingmsg) + "\n")
  240. # sys.stdout.write("\t" + pprint.pformat(data) + "\n")
  241. self.process_command(fd, sock, routingmsg, data)
  242. def process_command(self, fd, sock, routing, data):
  243. """Process a single command. This will split out into one of the
  244. other functions."""
  245. # TODO: A print statement got removed here (one that prints the
  246. # routing envelope). When we have logging with multiple levels,
  247. # we might want to re-add that on a high debug verbosity.
  248. cmd = routing["type"]
  249. if cmd == 'send':
  250. self.process_command_send(sock, routing, data)
  251. elif cmd == 'subscribe':
  252. self.process_command_subscribe(sock, routing, data)
  253. elif cmd == 'unsubscribe':
  254. self.process_command_unsubscribe(sock, routing, data)
  255. elif cmd == 'getlname':
  256. self.process_command_getlname(sock, routing, data)
  257. elif cmd == 'ping':
  258. # Command for testing purposes
  259. self.process_command_ping(sock, routing, data)
  260. else:
  261. sys.stderr.write("[b10-msgq] Invalid command: %s\n" % cmd)
  262. def preparemsg(self, env, msg = None):
  263. if type(env) == dict:
  264. env = isc.cc.message.to_wire(env)
  265. if type(msg) == dict:
  266. msg = isc.cc.message.to_wire(msg)
  267. length = 2 + len(env);
  268. if msg:
  269. length += len(msg)
  270. ret = struct.pack("!IH", length, len(env))
  271. ret += env
  272. if msg:
  273. ret += msg
  274. return ret
  275. def sendmsg(self, sock, env, msg = None):
  276. self.send_prepared_msg(sock, self.preparemsg(env, msg))
  277. def send_prepared_msg(self, sock, msg):
  278. # Try to send the data, but only if there's nothing waiting
  279. fileno = sock.fileno()
  280. if fileno in self.sendbuffs:
  281. amount_sent = 0
  282. else:
  283. try:
  284. amount_sent = sock.send(msg, socket.MSG_DONTWAIT)
  285. except socket.error as e:
  286. if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK:
  287. amount_sent = 0
  288. else:
  289. raise e
  290. # Still something to send
  291. if amount_sent < len(msg):
  292. now = time.clock()
  293. # Append it to buffer (but check the data go away)
  294. if fileno in self.sendbuffs:
  295. (last_sent, buff) = self.sendbuffs[fileno]
  296. if now - last_sent > 0.1:
  297. self.kill_socket(fileno, sock)
  298. return
  299. buff += msg[amount_sent:]
  300. else:
  301. buff = msg
  302. last_sent = now
  303. if self.poller:
  304. try:
  305. self.poller.register(fileno, select.POLLIN |
  306. select.POLLOUT)
  307. except Exception as e:
  308. raise e
  309. else:
  310. self.add_kqueue_socket(fileno, select.KQ_FILTER_WRITE)
  311. self.sendbuffs[fileno] = (last_sent, buff)
  312. def process_write(self, fileno):
  313. # Try to send some data from the buffer
  314. (_, msg) = self.sendbuffs[fileno]
  315. sock = self.sockets[fileno]
  316. try:
  317. amount_sent = sock.send(msg, socket.MSG_DONTWAIT)
  318. except socket.error as e:
  319. if e.errno == errno.EAGAIN or e.errno == errno.EWOULDBLOCK:
  320. amount_sent = 0
  321. else:
  322. raise e
  323. # Keep the rest
  324. msg = msg[amount_sent:]
  325. if len(msg) == 0:
  326. # If there's no more, stop requesting for write availability
  327. if self.poller:
  328. self.poller.register(fileno, select.POLLIN)
  329. else:
  330. self.add_kqueue_socket(fileno)
  331. del self.sendbuffs[fileno]
  332. else:
  333. self.sendbuffs[fileno] = (time.clock(), msg)
  334. def newlname(self):
  335. """Generate a unique connection identifier for this socket.
  336. This is done by using an increasing counter and the current
  337. time."""
  338. self.connection_counter += 1
  339. return "%x_%x@%s" % (time.time(), self.connection_counter, self.hostname)
  340. def process_command_ping(self, sock, routing, data):
  341. self.sendmsg(sock, { "type" : "pong" }, data)
  342. def process_command_getlname(self, sock, routing, data):
  343. lname = [ k for k, v in self.lnames.items() if v == sock ][0]
  344. self.sendmsg(sock, { "type" : "getlname" }, { "lname" : lname })
  345. def process_command_send(self, sock, routing, data):
  346. group = routing["group"]
  347. instance = routing["instance"]
  348. to = routing["to"]
  349. if group == None or instance == None:
  350. return # ignore invalid packets entirely
  351. if to == "*":
  352. sockets = self.subs.find(group, instance)
  353. else:
  354. if to in self.lnames:
  355. sockets = [ self.lnames[to] ]
  356. else:
  357. return # recipient doesn't exist
  358. msg = self.preparemsg(routing, data)
  359. if sock in sockets:
  360. sockets.remove(sock)
  361. for socket in sockets:
  362. self.send_prepared_msg(socket, msg)
  363. def process_command_subscribe(self, sock, routing, data):
  364. group = routing["group"]
  365. instance = routing["instance"]
  366. if group == None or instance == None:
  367. return # ignore invalid packets entirely
  368. self.subs.subscribe(group, instance, sock)
  369. def process_command_unsubscribe(self, sock, routing, data):
  370. group = routing["group"]
  371. instance = routing["instance"]
  372. if group == None or instance == None:
  373. return # ignore invalid packets entirely
  374. self.subs.unsubscribe(group, instance, sock)
  375. def run(self):
  376. """Process messages. Forever. Mostly."""
  377. if self.poller:
  378. self.run_poller()
  379. else:
  380. self.run_kqueue()
  381. def run_poller(self):
  382. while True:
  383. try:
  384. events = self.poller.poll()
  385. except select.error as err:
  386. if err.args[0] == errno.EINTR:
  387. events = []
  388. else:
  389. sys.stderr.write("[b10-msgq] Error with poll(): %s\n" % err)
  390. break
  391. for (fd, event) in events:
  392. if fd == self.listen_socket.fileno():
  393. self.process_accept()
  394. else:
  395. if event & select.POLLOUT:
  396. self.process_write(fd)
  397. if event & select.POLLIN:
  398. self.process_socket(fd)
  399. def run_kqueue(self):
  400. while True:
  401. events = self.kqueue.control(None, 10)
  402. if not events:
  403. raise RuntimeError('serve: kqueue returned no events')
  404. for event in events:
  405. if event.ident == self.listen_socket.fileno():
  406. self.process_accept()
  407. else:
  408. if event.flags & select.KQ_FILTER_WRITE:
  409. self.process_socket(event.ident)
  410. if event.flags & select.KQ_FILTER_READ and event.data > 0:
  411. self.process_socket(event.ident)
  412. elif event.flags & select.KQ_EV_EOF:
  413. self.kill_socket(event.ident, self.sockets[event.ident])
  414. def shutdown(self):
  415. """Stop the MsgQ master."""
  416. if self.verbose:
  417. sys.stdout.write("[b10-msgq] Stopping the server.\n")
  418. self.listen_socket.close()
  419. if os.path.exists(self.socket_file):
  420. os.remove(self.socket_file)
  421. # can signal handling and calling a destructor be done without a
  422. # global variable?
  423. msgq = None
  424. def signal_handler(signal, frame):
  425. if msgq:
  426. msgq.shutdown()
  427. sys.exit(0)
  428. if __name__ == "__main__":
  429. def check_port(option, opt_str, value, parser):
  430. """Function to insure that the port we are passed is actually
  431. a valid port number. Used by OptionParser() on startup."""
  432. intval = int(value)
  433. if (intval < 0) or (intval > 65535):
  434. raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
  435. parser.values.msgq_port = intval
  436. # Parse any command-line options.
  437. parser = OptionParser(version=VERSION)
  438. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  439. help="display more about what is going on")
  440. parser.add_option("-s", "--socket-file", dest="msgq_socket_file",
  441. type="string", default=None,
  442. help="UNIX domain socket file the msgq daemon will use")
  443. (options, args) = parser.parse_args()
  444. signal.signal(signal.SIGTERM, signal_handler)
  445. # Announce startup.
  446. if options.verbose:
  447. sys.stdout.write("[b10-msgq] %s\n" % VERSION)
  448. msgq = MsgQ(options.msgq_socket_file, options.verbose)
  449. setup_result = msgq.setup()
  450. if setup_result:
  451. sys.stderr.write("[b10-msgq] Error on startup: %s\n" % setup_result)
  452. sys.exit(1)
  453. try:
  454. msgq.run()
  455. except KeyboardInterrupt:
  456. pass
  457. msgq.shutdown()