123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- # Copyright (C) 2013 Internet Systems Consortium.
- #
- # Permission to use, copy, modify, and distribute this software for any
- # purpose with or without fee is hereby granted, provided that the above
- # copyright notice and this permission notice appear in all copies.
- #
- # THE SOFTWARE IS PROVIDED "AS IS" AND INTERNET SYSTEMS CONSORTIUM
- # DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL
- # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
- # INTERNET SYSTEMS CONSORTIUM BE LIABLE FOR ANY SPECIAL, DIRECT,
- # INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING
- # FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
- # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
- # WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- import json
- from isc.datasrc import ConfigurableClientList
- from isc.memmgr.datasrc_info import SegmentInfo
- from isc.log_messages.libmemmgr_messages import *
- from isc.memmgr.logger import logger
class MemorySegmentBuilder:
    """The builder runs in a different thread in the memory manager. It
    waits for commands from the memory manager, and then executes them
    in the given order sequentially.

    Supported commands are tuples whose first element is the command
    name: ('load', zone_name, dsrc_info, rrclass, dsrc_name) and
    ('shutdown',). Anything else is reported as a bad command and
    stops the builder.
    """

    def __init__(self, sock, cv, command_queue, response_queue):
        """ The constructor takes the following arguments:

            sock: A socket using which this builder object notifies the
                  main thread that it has a response waiting for it.

            cv: A condition variable object that is used by the main
                thread to tell this builder object that new commands are
                available to it. Note that this is also used for
                synchronizing access to the queues, so code that uses
                MemorySegmentBuilder must use this condition variable's
                lock object to synchronize its access to the queues.

            command_queue: A list of commands sent by the main thread to
                           this object. Commands should be executed
                           sequentially in the given order by this
                           object.

            response_queue: A list of responses sent by this object to
                            the main thread. The format of this is
                            currently not strictly defined. Future
                            tickets will be able to define it based on
                            how it's used.
        """

        self._sock = sock
        self._cv = cv
        self._command_queue = command_queue
        self._response_queue = response_queue
        # Set to True by the 'shutdown' command or by a bad command;
        # run() checks it to leave its main loop.
        self._shutdown = False

    def __handle_shutdown(self):
        # This method is called when handling the 'shutdown' command. The
        # following tuple is passed:
        #
        # ('shutdown',)
        self._shutdown = True

    def __handle_bad_command(self, bad_command):
        # A bad command was received. Raising an exception is not useful
        # in this case as we are likely running in a different thread
        # from the main thread which would need to be notified. Instead
        # return this in the response queue.
        logger.error(LIBMEMMGR_BUILDER_BAD_COMMAND_ERROR, bad_command)
        self._response_queue.append(('bad_command',))
        # A bad command also stops the builder.
        self._shutdown = True

    def __handle_load(self, zone_name, dsrc_info, rrclass, dsrc_name):
        # This method is called when handling the 'load' command. The
        # following tuple is passed:
        #
        # ('load', zone_name, dsrc_info, rrclass, dsrc_name)
        #
        # where:
        #
        #  * zone_name is None or isc.dns.Name, specifying the zone name
        #    to load. If it's None, it means all zones to be cached in
        #    the specified data source (used for initialization).
        #
        #  * dsrc_info is a DataSrcInfo object corresponding to the
        #    generation ID of the set of data sources for this loading.
        #
        #  * rrclass is an isc.dns.RRClass object, the RR class of the
        #    data source.
        #
        #  * dsrc_name is a string, specifying a data source name.
        #
        # On completion a ('load-completed', dsrc_info, rrclass,
        # dsrc_name) tuple is appended to the response queue. Failures
        # for individual zones are logged and that zone is skipped; they
        # do not abort the whole command.

        clist = dsrc_info.clients_map[rrclass]
        sgmt_info = dsrc_info.segment_info_map[(rrclass, dsrc_name)]
        params = json.dumps(sgmt_info.get_reset_param(SegmentInfo.WRITER))

        # Open the segment for writing while the zones are (re)built.
        clist.reset_memory_segment(dsrc_name,
                                   ConfigurableClientList.READ_WRITE,
                                   params)

        # Install an empty zone on load errors only for the initial
        # "load everything" case, i.e. when the command itself carried
        # no zone name. This must be decided from the command argument
        # before iterating: the loop below yields a per-zone name, and
        # testing that name instead would make the check depend on the
        # iterated value rather than on how the command was issued.
        catch_load_error = (zone_name is None)

        if zone_name is not None:
            zones = [(None, zone_name)]
        else:
            zones = clist.get_zone_table_accessor(dsrc_name, True)

        for _, zname in zones:
            result, writer = clist.get_cached_zone_writer(
                zname, catch_load_error, dsrc_name)
            if result != ConfigurableClientList.CACHE_STATUS_ZONE_SUCCESS:
                logger.error(LIBMEMMGR_BUILDER_GET_ZONE_WRITER_ERROR,
                             zname, dsrc_name)
                continue
            try:
                error = writer.load()
                if error is not None:
                    logger.error(LIBMEMMGR_BUILDER_ZONE_WRITER_LOAD_1_ERROR,
                                 zname, dsrc_name, error)
                    continue
            except Exception as e:
                logger.error(LIBMEMMGR_BUILDER_ZONE_WRITER_LOAD_2_ERROR,
                             zname, dsrc_name, str(e))
                continue
            writer.install()
            writer.cleanup()

        # need to reset the segment so readers can read it (note: memmgr
        # itself doesn't have to keep it open, but there's currently no
        # public API to just clear the segment)
        clist.reset_memory_segment(dsrc_name,
                                   ConfigurableClientList.READ_ONLY,
                                   params)

        self._response_queue.append(('load-completed', dsrc_info, rrclass,
                                     dsrc_name))

    def run(self):
        """ This is the method invoked when the builder thread is
            started.  In this thread, be careful when modifying
            variables passed-by-reference in the constructor. If they
            are reassigned, they will not refer to the main thread's
            objects any longer. Any use of command_queue and
            response_queue must be synchronized by acquiring the lock in
            the condition variable. This method must normally terminate
            only when the 'shutdown' command is sent to it.
        """

        # Acquire the condition variable while running the loop.
        with self._cv:
            while not self._shutdown:
                while not self._command_queue:
                    self._cv.wait()
                # Move the queue content to a local queue. Be careful of
                # not making assignments to reference variables.
                local_command_queue = self._command_queue[:]
                del self._command_queue[:]

                # Run commands passed in the command queue sequentially
                # in the given order. The "load" and "shutdown" commands
                # are supported; anything else is treated as a bad
                # command.
                for command_tuple in local_command_queue:
                    command = command_tuple[0]
                    if command == 'load':
                        # See the comments for __handle_load() for
                        # details of the tuple passed to the "load"
                        # command.
                        _, zone_name, dsrc_info, rrclass, dsrc_name = \
                            command_tuple
                        self.__handle_load(zone_name, dsrc_info, rrclass,
                                           dsrc_name)
                    elif command == 'shutdown':
                        self.__handle_shutdown()
                        # When the shutdown command is received, we do
                        # not process any further commands.
                        break
                    else:
                        self.__handle_bad_command(command)
                        # When a bad command is received, we do not
                        # process any further commands.
                        break

                # Notify (any main thread) on the socket about a
                # response. Otherwise, the main thread may wait in its
                # loop without knowing there was a problem.
                if self._response_queue:
                    # Retry until exactly one byte has been sent.
                    while self._sock.send(b'x') != 1:
                        continue
|