builder.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # Copyright (C) 2013 Internet Systems Consortium.
  2. #
  3. # Permission to use, copy, modify, and distribute this software for any
  4. # purpose with or without fee is hereby granted, provided that the above
  5. # copyright notice and this permission notice appear in all copies.
  6. #
  7. # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
  8. # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
  9. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
  10. # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
  11. # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
  12. # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
  13. # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
  14. # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  15. import json
  16. from isc.datasrc import ConfigurableClientList
  17. from isc.memmgr.datasrc_info import SegmentInfo
  18. from isc.log_messages.libmemmgr_messages import *
  19. from isc.memmgr.logger import logger
  20. class MemorySegmentBuilder:
  21. """The builder runs in a different thread in the memory manager. It
  22. waits for commands from the memory manager, and then executes them
  23. in the given order sequentially.
  24. """
  25. def __init__(self, sock, cv, command_queue, response_queue):
  26. """ The constructor takes the following arguments:
  27. sock: A socket using which this builder object notifies the
  28. main thread that it has a response waiting for it.
  29. cv: A condition variable object that is used by the main
  30. thread to tell this builder object that new commands are
  31. available to it. Note that this is also used for
  32. synchronizing access to the queues, so code that uses
  33. MemorySegmentBuilder must use this condition variable's
  34. lock object to synchronize its access to the queues.
  35. command_queue: A list of commands sent by the main thread to
  36. this object. Commands should be executed
  37. sequentially in the given order by this
  38. object.
  39. response_queue: A list of responses sent by this object to
  40. the main thread. The format of this is
  41. currently not strictly defined. Future
  42. tickets will be able to define it based on
  43. how it's used.
  44. """
  45. self._sock = sock
  46. self._cv = cv
  47. self._command_queue = command_queue
  48. self._response_queue = response_queue
  49. self._shutdown = False
  50. def __handle_shutdown(self):
  51. # This method is called when handling the 'shutdown' command. The
  52. # following tuple is passed:
  53. #
  54. # ('shutdown',)
  55. self._shutdown = True
  56. def __handle_bad_command(self, bad_command):
  57. # A bad command was received. Raising an exception is not useful
  58. # in this case as we are likely running in a different thread
  59. # from the main thread which would need to be notified. Instead
  60. # return this in the response queue.
  61. logger.error(LIBMEMMGR_BUILDER_BAD_COMMAND_ERROR, bad_command)
  62. self._response_queue.append(('bad_command',))
  63. self._shutdown = True
  64. def __handle_load(self, zone_name, dsrc_info, rrclass, dsrc_name):
  65. # This method is called when handling the 'load' command. The
  66. # following tuple is passed:
  67. #
  68. # ('load', zone_name, dsrc_info, rrclass, dsrc_name)
  69. #
  70. # where:
  71. #
  72. # * zone_name is None or isc.dns.Name, specifying the zone name
  73. # to load. If it's None, it means all zones to be cached in
  74. # the specified data source (used for initialization).
  75. #
  76. # * dsrc_info is a DataSrcInfo object corresponding to the
  77. # generation ID of the set of data sources for this loading.
  78. #
  79. # * rrclass is an isc.dns.RRClass object, the RR class of the
  80. # data source.
  81. #
  82. # * dsrc_name is a string, specifying a data source name.
  83. clist = dsrc_info.clients_map[rrclass]
  84. sgmt_info = dsrc_info.segment_info_map[(rrclass, dsrc_name)]
  85. params = json.dumps(sgmt_info.get_reset_param(SegmentInfo.WRITER))
  86. clist.reset_memory_segment(dsrc_name,
  87. ConfigurableClientList.READ_WRITE,
  88. params)
  89. if zone_name is not None:
  90. zones = [(None, zone_name)]
  91. else:
  92. zones = clist.get_zone_table_accessor(dsrc_name, True)
  93. for _, zone_name in zones:
  94. catch_load_error = (zone_name is None) # install empty zone initially
  95. result, writer = clist.get_cached_zone_writer(zone_name, catch_load_error,
  96. dsrc_name)
  97. if result != ConfigurableClientList.CACHE_STATUS_ZONE_SUCCESS:
  98. logger.error(LIBMEMMGR_BUILDER_GET_ZONE_WRITER_ERROR, zone_name, dsrc_name)
  99. continue
  100. try:
  101. error = writer.load()
  102. if error is not None:
  103. logger.error(LIBMEMMGR_BUILDER_ZONE_WRITER_LOAD_1_ERROR, zone_name, dsrc_name, error)
  104. continue
  105. except Exception as e:
  106. logger.error(LIBMEMMGR_BUILDER_ZONE_WRITER_LOAD_2_ERROR, zone_name, dsrc_name, str(e))
  107. continue
  108. writer.install()
  109. writer.cleanup()
  110. # need to reset the segment so readers can read it (note: memmgr
  111. # itself doesn't have to keep it open, but there's currently no
  112. # public API to just clear the segment)
  113. clist.reset_memory_segment(dsrc_name,
  114. ConfigurableClientList.READ_ONLY,
  115. params)
  116. self._response_queue.append(('load-completed', dsrc_info, rrclass,
  117. dsrc_name))
  118. def run(self):
  119. """ This is the method invoked when the builder thread is
  120. started. In this thread, be careful when modifying
  121. variables passed-by-reference in the constructor. If they
  122. are reassigned, they will not refer to the main thread's
  123. objects any longer. Any use of command_queue and
  124. response_queue must be synchronized by acquiring the lock in
  125. the condition variable. This method must normally terminate
  126. only when the 'shutdown' command is sent to it.
  127. """
  128. # Acquire the condition variable while running the loop.
  129. with self._cv:
  130. while not self._shutdown:
  131. while not self._command_queue:
  132. self._cv.wait()
  133. # Move the queue content to a local queue. Be careful of
  134. # not making assignments to reference variables.
  135. local_command_queue = self._command_queue[:]
  136. del self._command_queue[:]
  137. # Run commands passed in the command queue sequentially
  138. # in the given order. For now, it only supports the
  139. # "shutdown" command, which just exits the thread.
  140. for command_tuple in local_command_queue:
  141. command = command_tuple[0]
  142. if command == 'load':
  143. # See the comments for __handle_load() for
  144. # details of the tuple passed to the "load"
  145. # command.
  146. _, zone_name, dsrc_info, rrclass, dsrc_name = command_tuple
  147. self.__handle_load(zone_name, dsrc_info, rrclass, dsrc_name)
  148. elif command == 'shutdown':
  149. self.__handle_shutdown()
  150. # When the shutdown command is received, we do
  151. # not process any further commands.
  152. break
  153. else:
  154. self.__handle_bad_command(command)
  155. # When a bad command is received, we do not
  156. # process any further commands.
  157. break
  158. # Notify (any main thread) on the socket about a
  159. # response. Otherwise, the main thread may wait in its
  160. # loop without knowing there was a problem.
  161. if self._response_queue:
  162. while self._sock.send(b'x') != 1:
  163. continue