notify_out.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. import select
  2. import random
  3. import socket
  4. import threading
  5. import time
  6. from isc.datasrc import sqlite3_ds
  7. import isc
  8. try:
  9. from libdns_python import *
  10. except ImportError as e:
  11. # C++ loadable module may not be installed;
  12. sys.stderr.write('[b10-xfrout] failed to import DNS or XFR module: %s\n' % str(e))
  13. ZONE_NEW_DATA_READY_CMD = 'zone_new_data_ready'
  14. _MAX_NOTIFY_NUM = 30
  15. _MAX_NOTIFY_TRY_NUM = 5
  16. _EVENT_NONE = 0
  17. _EVENT_READ = 1
  18. _EVENT_TIMEOUT = 2
  19. _NOTIFY_TIMEOUT = 2
  20. def addr_to_str(addr):
  21. return '%s#%s' % (addr[0], addr[1])
  22. def dispatcher(notifier):
  23. while True:
  24. replied_zones, not_replied_zones = notifier._wait_for_notify_reply()
  25. if len(replied_zones) == 0 and len(not_replied_zones) == 0:
  26. time.sleep(0.5) # A better time?
  27. continue
  28. for name_ in replied_zones:
  29. notifier._zone_notify_handler(replied_zones[name_], _EVENT_READ)
  30. for name_ in not_replied_zones:
  31. if not_replied_zones[name_].notify_timeout < time.time():
  32. notifier._zone_notify_handler(not_replied_zones[name_], _EVENT_TIMEOUT)
  33. class ZoneNotifyInfo:
  34. '''This class keeps track of notify-out information for one zone.
  35. timeout_: absolute time for next notify reply.
  36. '''
  37. def __init__(self, zone_name_, klass):
  38. self._notify_slaves = []
  39. self._notify_current = None
  40. self._slave_index = 0
  41. self._sock = None
  42. self.zone_name = zone_name_
  43. self.zone_class = klass
  44. self.notify_msg_id = 0
  45. self.notify_timeout = 0
  46. # Notify times sending to one target.
  47. self.notify_try_num = 0
  48. def set_next_notify_target(self):
  49. if self._slave_index < (len(self._notify_slaves) - 1):
  50. self._slave_index += 1
  51. self._notify_current = self._notify_slaves[self._slave_index]
  52. else:
  53. self._notify_current = None
  54. def prepare_notify_out(self):
  55. '''Create the socket and set notify timeout time to now'''
  56. self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #TODO support IPv6?
  57. self.notify_timeout = time.time()
  58. self.notify_try_num = 0
  59. self._slave_index = 0
  60. if len(self._notify_slaves) > 0:
  61. self._notify_current = self._notify_slaves[0]
  62. def finish_notify_out(self):
  63. if self._sock:
  64. self._sock.close()
  65. self._sock = None
  66. def get_socket(self):
  67. return self._sock
  68. def get_current_notify_target(self):
  69. return self._notify_current
  70. class NotifyOut:
  71. def __init__(self, datasrc_file, log=None, verbose=True):
  72. self._notify_infos = {}
  73. self._waiting_zones = []
  74. self._notifying_zones = []
  75. self._log = log
  76. self.notify_num = 0 # the count of in progress notifies
  77. self._verbose = verbose
  78. self._lock = threading.Lock()
  79. self._db_file = datasrc_file
  80. self._init_notify_out(datasrc_file)
  81. def _init_notify_out(self, datasrc_file):
  82. '''Get all the zones name and its notify target's address
  83. TODO, currently the zones are got by going through the zone
  84. table in database. There should be a better way to get them
  85. and also the setting 'also_notify', and there should be one
  86. mechanism to cover the changed datasrc.'''
  87. self._db_file = datasrc_file
  88. for zone_name, zone_class in sqlite3_ds.get_zones_info(datasrc_file):
  89. self._notify_infos[zone_name] = ZoneNotifyInfo(zone_name, zone_class)
  90. slaves = self._get_notify_slaves_from_ns(zone_name)
  91. for item in slaves:
  92. self._notify_infos[zone_name]._notify_slaves.append((item, 53))
  93. def _get_rdata_data(self, rr):
  94. return rr[7].strip()
  95. def _get_notify_slaves_from_ns(self, zone_name):
  96. '''The simplest way to get the address of slaves, but now correct.
  97. TODO. the function should be provided by one library.'''
  98. ns_rrset = sqlite3_ds.get_zone_rrset(zone_name, zone_name, 'NS', self._db_file)
  99. soa_rrset = sqlite3_ds.get_zone_rrset(zone_name, zone_name, 'SOA', self._db_file)
  100. ns_rr_name = []
  101. for ns in ns_rrset:
  102. ns_rr_name.append(self._get_rdata_data(ns))
  103. sname = (soa_rrset[0][7].split(' '))[0].strip() #TODO, bad hardcode to get rdata part
  104. if sname in ns_rr_name:
  105. ns_rr_name.remove(sname)
  106. addr_list = []
  107. for rr_name in ns_rr_name:
  108. a_rrset = sqlite3_ds.get_zone_rrset(zone_name, rr_name, 'A', self._db_file)
  109. aaaa_rrset = sqlite3_ds.get_zone_rrset(zone_name, rr_name, 'AAAA', self._db_file)
  110. for rr in a_rrset:
  111. addr_list.append(self._get_rdata_data(rr))
  112. for rr in aaaa_rrset:
  113. addr_list.append(self._get_rdata_data(rr))
  114. return addr_list
  115. def send_notify(self, zone_name):
  116. if zone_name[len(zone_name) - 1] != '.':
  117. zone_name += '.'
  118. if zone_name not in self._notify_infos:
  119. return
  120. with self._lock:
  121. if (self.notify_num >= _MAX_NOTIFY_NUM) or (zone_name in self._notifying_zones):
  122. if zone_name not in self._waiting_zones:
  123. self._waiting_zones.append(zone_name)
  124. else:
  125. self._notify_infos[zone_name].prepare_notify_out()
  126. self.notify_num += 1
  127. self._notifying_zones.append(zone_name)
  128. def _wait_for_notify_reply(self):
  129. '''receive notify replies in specified time. returned value
  130. is one tuple:(replied_zones, not_replied_zones)
  131. replied_zones: the zones which receive notify reply.
  132. not_replied_zones: the zones which haven't got notify reply.
  133. '''
  134. valid_socks = []
  135. notifying_zones = {}
  136. min_timeout = time.time()
  137. for info in self._notify_infos:
  138. sock = self._notify_infos[info].get_socket()
  139. if sock:
  140. valid_socks.append(sock)
  141. notifying_zones[info] = self._notify_infos[info]
  142. tmp_timeout = self._notify_infos[info].notify_timeout
  143. if min_timeout > tmp_timeout:
  144. min_timeout = tmp_timeout
  145. block_timeout = min_timeout - time.time()
  146. if block_timeout < 0:
  147. block_timeout = 0
  148. try:
  149. r_fds, w, e = select.select(valid_socks, [], [], block_timeout)
  150. except select.error as err:
  151. if err.args[0] != EINTR:
  152. return [], []
  153. not_replied_zones = {}
  154. replied_zones = {}
  155. for info in notifying_zones:
  156. if notifying_zones[info].get_socket() in r_fds:
  157. replied_zones[info] = notifying_zones[info]
  158. else:
  159. not_replied_zones[info] = notifying_zones[info]
  160. return replied_zones, not_replied_zones
  161. def _zone_notify_handler(self, zone_notify_info, event_type):
  162. tgt = zone_notify_info.get_current_notify_target()
  163. if event_type == _EVENT_READ:
  164. reply = self._get_notify_reply(zone_notify_info.get_socket(), tgt)
  165. if reply:
  166. if self._handle_notify_reply(zone_notify_info, reply):
  167. self._notify_next_target(zone_notify_info)
  168. elif event_type == _EVENT_TIMEOUT and zone_notify_info.notify_try_num > 0:
  169. self._log_msg('info', 'notify retry to %s' % addr_to_str(tgt))
  170. tgt = zone_notify_info.get_current_notify_target()
  171. if tgt:
  172. zone_notify_info.notify_try_num += 1
  173. if zone_notify_info.notify_try_num > _MAX_NOTIFY_TRY_NUM:
  174. self._log_msg('info', 'notify to %s: retried exceeded' % addr_to_str(tgt))
  175. self._notify_next_target(zone_notify_info)
  176. else:
  177. retry_timeout = _NOTIFY_TIMEOUT * pow(2, zone_notify_info.notify_try_num)
  178. # set exponential backoff according rfc1996 section 3.6
  179. zone_notify_info.notify_timeout = time.time() + retry_timeout
  180. self._send_notify_message_udp(zone_notify_info, tgt)
  181. def _notify_next_target(self, zone_notify_info):
  182. '''Notify next address for the same zone. If all the targets
  183. has been notified, notify the first zone in waiting list. '''
  184. zone_notify_info.notify_try_num = 0
  185. zone_notify_info.set_next_notify_target()
  186. tgt = zone_notify_info.get_current_notify_target()
  187. if not tgt:
  188. zone_notify_info.finish_notify_out()
  189. with self._lock:
  190. self.notify_num -= 1
  191. self._notifying_zones.remove(zone_notify_info.zone_name)
  192. # trigger notify out for waiting zones
  193. if len(self._waiting_zones) > 0:
  194. zone_name = self._waiting_zones.pop(0)
  195. self._notify_infos[zone_name].prepare_notify_out()
  196. self.notify_num += 1
  197. def _send_notify_message_udp(self, zone_notify_info, addrinfo):
  198. msg, qid = self._create_notify_message(zone_notify_info.zone_name,
  199. zone_notify_info.zone_class)
  200. render = MessageRenderer()
  201. render.set_length_limit(512)
  202. msg.to_wire(render)
  203. zone_notify_info.notify_msg_id = qid
  204. sock = zone_notify_info.get_socket()
  205. try:
  206. sock.sendto(render.get_data(), 0, addrinfo)
  207. self._log_msg('info', 'sending notify to %s' % addr_to_str(addrinfo))
  208. except socket.error as err:
  209. self._log_msg('error', 'send notify to %s failed: %s' % (addr_to_str(addrinfo), str(err)))
  210. return False
  211. return True
  212. def _create_rrset_from_db_record(self, record):
  213. '''Create one rrset from one record of datasource, if the schema of record is changed,
  214. This function should be updated first. TODO, the function is copied from xfrout, there
  215. should be library for creating one rrset. '''
  216. rrtype_ = RRType(record[5])
  217. rdata_ = Rdata(rrtype_, RRClass("IN"), " ".join(record[7:]))
  218. rrset_ = RRset(Name(record[2]), RRClass("IN"), rrtype_, RRTTL( int(record[4])))
  219. rrset_.add_rdata(rdata_)
  220. return rrset_
  221. def _create_notify_message(self, zone_name, zone_class):
  222. msg = Message(Message.RENDER)
  223. qid = random.randint(0, 0xFFFF)
  224. msg.set_qid(qid)
  225. msg.set_opcode(Opcode.NOTIFY())
  226. msg.set_rcode(Rcode.NOERROR())
  227. msg.set_header_flag(MessageFlag.AA())
  228. question = Question(Name(zone_name), RRClass(zone_class), RRType('SOA'))
  229. msg.add_question(question)
  230. # Add soa record to answer section
  231. soa_record = sqlite3_ds.get_zone_rrset(zone_name, zone_name, 'SOA', self._db_file)
  232. rrset_soa = self._create_rrset_from_db_record(soa_record[0])
  233. msg.add_rrset(Section.ANSWER(), rrset_soa)
  234. return msg, qid
  235. def _handle_notify_reply(self, zone_notify_info, msg_data):
  236. '''Parse the notify reply message.
  237. TODO, the error message should be refined properly.'''
  238. msg = Message(Message.PARSE)
  239. try:
  240. errstr = 'notify reply error: '
  241. msg.from_wire(msg_data)
  242. if (msg.get_rcode() != Rcode.NOERROR()):
  243. self._log_msg('error', errstr + 'bad rcode')
  244. return False
  245. if not msg.get_header_flag(MessageFlag.QR()):
  246. self._log_msg('error', errstr + 'bad flags')
  247. return False
  248. if msg.get_qid() != zone_notify_info.notify_msg_id:
  249. self._log_msg('error', errstr + 'bad query ID')
  250. return False
  251. if msg.get_opcode != Opcode.NOTIFY():
  252. self._log_msg('error', errstr + 'bad opcode')
  253. return False
  254. except Exception as err:
  255. # We don't care what exception, just report it?
  256. self._log_msg('error', errstr + str(err))
  257. return False
  258. return True
  259. def _get_notify_reply(self, sock, tgt_addr):
  260. try:
  261. msg, addr = sock.recvfrom(512)
  262. except socket.error:
  263. self._log_msg('error', "notify to %s failed: can't read notify reply" % addr_to_str(tgt_addr))
  264. return None
  265. return msg
  266. def _log_msg(self, level, msg):
  267. if self._log:
  268. self._log.log_message(level, msg)