msgq.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. """\
  2. This code implements a Python version of the msgq daemon.
  3. This replaces the C version, which relied upon the BIND 9 "libisc"
  4. libraries.
  5. """
  6. import subprocess
  7. import signal
  8. import os
  9. import socket
  10. import sys
  11. import struct
  12. import errno
  13. import time
  14. import select
  15. import pprint
  16. from optparse import OptionParser, OptionValueError
  17. import ISC.CC
  18. class MsgQReceiveError(Exception): pass
  19. # This is the version that gets displayed to the user.
  20. __version__ = "v20091030 (Paving the DNS Parking Lot)"
  21. class MsgQ:
  22. """Message Queue class."""
  23. def __init__(self, c_channel_port=9912, verbose=False):
  24. """Initialize the MsgQ master.
  25. The c_channel_port specifies the TCP/IP port that the msgq
  26. process listens on. If verbose is True, then the MsgQ reports
  27. what it is doing.
  28. """
  29. self.verbose = True
  30. self.c_channel_port = c_channel_port
  31. self.poller = None
  32. self.runnable = False
  33. self.listen_socket = False
  34. self.sockets = {}
  35. def setup_poller(self):
  36. """Set up the poll thing. Internal function."""
  37. self.poller = select.poll()
  38. def setup_listener(self):
  39. """Set up the listener socket. Internal function."""
  40. self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  41. self.listen_socket.bind(("127.0.0.1", self.c_channel_port))
  42. self.listen_socket.listen(1024)
  43. self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  44. self.poller.register(self.listen_socket, select.POLLIN)
  45. def setup(self):
  46. """Configure listener socket, polling, etc."""
  47. self.setup_poller()
  48. self.setup_listener()
  49. if self.verbose:
  50. sys.stdout.write("Listening\n")
  51. self.runnable = True
  52. def process_accept(self):
  53. """Process an accept on the listening socket."""
  54. newsocket, ipaddr = self.listen_socket.accept()
  55. sys.stderr.write("Connection\n")
  56. self.sockets[newsocket.fileno()] = newsocket
  57. self.poller.register(newsocket, select.POLLIN)
  58. def process_socket(self, fd):
  59. """Process a read on a socket."""
  60. sock = self.sockets[fd]
  61. if sock == None:
  62. sys.stderr.write("Got read on Strange Socket fd %d\n" % fd)
  63. return
  64. sys.stderr.write("Got read on fd %d\n" %fd)
  65. self.process_packet(fd, sock)
  66. def kill_socket(self, fd, sock):
  67. """Fully close down the socket."""
  68. self.poller.unregister(sock)
  69. sock.close()
  70. self.sockets[fd] = None
  71. sys.stderr.write("Closing socket fd %d\n" % fd)
  72. def getbytes(self, fd, sock, length):
  73. """Get exactly the requested bytes, or raise an exception if
  74. EOF."""
  75. received = b''
  76. while len(received) < length:
  77. data = sock.recv(length - len(received))
  78. if len(data) == 0:
  79. raise MsgQReceiveError("EOF")
  80. received += data
  81. return received
  82. def read_packet(self, fd, sock):
  83. """Read a correctly formatted packet. Will raise exceptions if
  84. something fails."""
  85. lengths = self.getbytes(fd, sock, 6)
  86. overall_length, routing_length = struct.unpack(">IH", lengths)
  87. if overall_length < 2:
  88. raise MsgQReceiveError("overall_length < 2")
  89. overall_length -= 2
  90. sys.stderr.write("overall length: %d, routing_length %d\n"
  91. % (overall_length, routing_length))
  92. if routing_length > overall_length:
  93. raise MsgQReceiveError("routing_length > overall_length")
  94. if routing_length == 0:
  95. raise MsgQReceiveError("routing_length == 0")
  96. data_length = overall_length - routing_length
  97. # probably need to sanity check lengths here...
  98. routing = self.getbytes(fd, sock, routing_length)
  99. if data_length > 0:
  100. data = self.getbytes(fd, sock, data_length)
  101. else:
  102. data = None
  103. return (routing, data)
  104. def process_packet(self, fd, sock):
  105. """Process one packet."""
  106. try:
  107. routing, data = self.read_packet(fd, sock)
  108. except MsgQReceiveError as err:
  109. self.kill_socket(fd, sock)
  110. sys.stderr.write("Receive error: %s\n" % err)
  111. return
  112. try:
  113. routingmsg = ISC.CC.Message.from_wire(routing)
  114. except DecodeError as err:
  115. self.kill_socket(fd, sock)
  116. sys.stderr.write("Routing decode error: %s\n" % err)
  117. return
  118. sys.stdout.write("\t" + pprint.pformat(routingmsg) + "\n")
  119. sys.stdout.write("\t" + pprint.pformat(data) + "\n")
  120. self.process_command(fd, sock, routingmsg, data)
  121. def process_command(self, fd, sock, routing, data):
  122. """Process a single command. This will split out into one of the
  123. other functions, above."""
  124. cmd = routing["type"]
  125. if cmd == 'getlname':
  126. self.process_command_getlname(sock, routing, data)
  127. elif cmd == 'send':
  128. self.process_command_send(sock, routing, data)
  129. else:
  130. sys.stderr.write("Invalid command: %s\n" % cmd)
  131. def sendmsg(self, sock, env, msg = None):
  132. if type(env) == dict:
  133. env = ISC.CC.Message.to_wire(env)
  134. if type(msg) == dict:
  135. msg = ISC.CC.Message.to_wire(msg)
  136. sock.setblocking(1)
  137. length = 2 + len(env);
  138. if msg:
  139. length += len(msg)
  140. sock.send(struct.pack("!IH", length, len(env)))
  141. sock.send(env)
  142. if msg:
  143. sock.send(msg)
  144. def process_command_getlname(self, sock, routing, data):
  145. self.sendmsg(sock, { "type" : "getlname" }, { "lname" : "staticlname" })
  146. def run(self):
  147. """Process messages. Forever. Mostly."""
  148. while True:
  149. try:
  150. events = self.poller.poll()
  151. except select.error as err:
  152. if err.args[0] == errno.EINTR:
  153. events = []
  154. else:
  155. sys.stderr.write("Error with poll(): %s\n" % err)
  156. break
  157. for (fd, event) in events:
  158. if fd == self.listen_socket.fileno():
  159. self.process_accept()
  160. else:
  161. self.process_socket(fd)
  162. def shutdown(self):
  163. """Stop the MsgQ master."""
  164. if self.verbose:
  165. sys.stdout.write("Stopping the server.\n")
  166. self.listen_socket.close()
  167. if __name__ == "__main__":
  168. def check_port(option, opt_str, value, parser):
  169. """Function to insure that the port we are passed is actually
  170. a valid port number. Used by OptionParser() on startup."""
  171. intval = int(value)
  172. if (intval < 0) or (intval > 65535):
  173. raise OptionValueError("%s requires a port number (0-65535)" % opt_str)
  174. parser.values.msgq_port = intval
  175. # Parse any command-line options.
  176. parser = OptionParser(version=__version__)
  177. parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
  178. help="display more about what is going on")
  179. parser.add_option("-m", "--msgq-port", dest="msgq_port", type="string",
  180. action="callback", callback=check_port, default="9912",
  181. help="port the msgq daemon will use")
  182. (options, args) = parser.parse_args()
  183. # Announce startup.
  184. if options.verbose:
  185. sys.stdout.write("MsgQ %s\n" % __version__)
  186. msgq = MsgQ(int(options.msgq_port), options.verbose)
  187. setup_result = msgq.setup()
  188. if setup_result:
  189. sys.stderr.write("Error on startup: %s\n" % setup_result)
  190. sys.exit(1)
  191. msgq.run()
  192. msgq.shutdown()