Browse Source

[2158] updated for counting xfr requests

- added the new class XfroutCounter
  It is intended for producing incrementers of the counters based on the
  statistics spec and for outputting statistics data format to passed to the
  b10-stats daemon.
- added counter handlers passed to the NotifyOut object and the XfroutSession
  object
- added handling of the getstats command
- added counting xfr requests in the XfroutSession class
- updated the UnixSockServer class for taking over the counter handlers
Naoki Kambe 12 years ago
parent
commit
9acbaae908
1 changed files with 148 additions and 9 deletions
  1. 148 9
      src/bin/xfrout/xfrout.py.in

+ 148 - 9
src/bin/xfrout/xfrout.py.in

@@ -153,7 +153,8 @@ def get_soa_serial(soa_rdata):
 
 class XfroutSession():
     def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote,
-                 default_acl, zone_config, client_class=DataSourceClient):
+                 default_acl, zone_config, client_class=DataSourceClient,
+                 counter_xfrrej=None, counter_xfrreqdone=None):
         self._sock_fd = sock_fd
         self._request_data = request_data
         self._server = server
@@ -168,6 +169,14 @@ class XfroutSession():
         self.ClientClass = client_class # parameterize this for testing
         self._soa = None # will be set in _xfrout_setup or in tests
         self._jnl_reader = None # will be set to a reader for IXFR
+        # Set counter handlers for counting Xfr requests. An argument
+        # is required for zone name.
+        self._counter_xfrrej = lambda x: None
+        if hasattr(counter_xfrrej, '__call__'):
+            self._counter_xfrrej = counter_xfrrej
+        self._counter_xfrreqdone = lambda x: None
+        if hasattr(counter_xfrreqdone, '__call__'):
+            self._counter_xfrreqdone = counter_xfrreqdone
         self._handle()
 
     def create_tsig_ctx(self, tsig_record, tsig_key_ring):
@@ -270,6 +279,8 @@ class XfroutSession():
                          format_zone_str(zone_name, zone_class))
             return None, None
         elif acl_result == REJECT:
+            # count rejected Xfr request by each zone name
+            self._counter_xfrrej(zone_name.to_text())
             logger.debug(DBG_XFROUT_TRACE, XFROUT_QUERY_REJECTED,
                          self._request_type, format_addrinfo(self._remote),
                          format_zone_str(zone_name, zone_class))
@@ -525,6 +536,8 @@ class XfroutSession():
         except Exception as err:
             logger.error(XFROUT_XFR_TRANSFER_ERROR, self._request_typestr,
                     format_addrinfo(self._remote), zone_str, err)
+        # count done Xfr requests by each zone name
+        self._counter_xfrreqdone(zone_name.to_text())
         logger.info(XFROUT_XFR_TRANSFER_DONE, self._request_typestr,
                     format_addrinfo(self._remote), zone_str)
 
@@ -634,7 +647,7 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn,
     '''The unix domain socket server which accept xfr query sent from auth server.'''
 
     def __init__(self, sock_file, handle_class, shutdown_event, config_data,
-                 cc):
+                 cc, **counters):
         self._remove_unused_sock_file(sock_file)
         self._sock_file = sock_file
         socketserver_mixin.NoPollMixIn.__init__(self)
@@ -644,6 +657,8 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn,
         self._common_init()
         self._cc = cc
         self.update_config_data(config_data)
+        # handlers for statistics use
+        self._counters = counters
 
     def _common_init(self):
         '''Initialization shared with the mock server class used for tests'''
@@ -798,7 +813,8 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn,
         self._lock.release()
         self.RequestHandlerClass(sock_fd, request_data, self,
                                  isc.server_common.tsig_keyring.get_keyring(),
-                                 self._guess_remote(sock_fd), acl, zone_config)
+                                 self._guess_remote(sock_fd), acl, zone_config,
+                                 **self._counters)
 
     def _remove_unused_sock_file(self, sock_file):
         '''Try to remove the socket file. If the file is being used
@@ -926,6 +942,109 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn,
         self._transfers_counter -= 1
         self._lock.release()
 
+class XfroutCounter:
+    """A class for handling all statistics counters of Xfrout.  In
+    this class, the structure of per-zone counters is assumed to be
+    like this:
+        zones/example.com./notifyoutv4
+        zones/example.com./notifyoutv6
+        zones/example.com./xfrrej
+        zones/example.com./xfrreqdone
+    """
+    # '_SERVER_' is a special zone name representing an entire
+    # count. It doesn't mean a specific zone, but it means an
+    # entire count in the server.
+    entire_server = '_SERVER_'
+    # zone names are contained under this dirname in the spec file.
+    perzone_prefix = 'zones'
+    def __init__(self, statistics_spec):
+        self._statistics_spec = statistics_spec
+        # holding statistics data for Xfrout module
+        self._statistics_data = {}
+        self._lock = threading.RLock()
+        self._create_perzone_incrementers()
+
+    def get_statistics(self):
+        """Calculates an entire server counts, and returns statistics
+        data format to send out the stats module including each
+        counter. If there is no counts, then it returns an empty
+        dictionary. Locks the thread because it is considered to be
+        invoked by a multi-threading caller."""
+        # If self._statistics_data contains nothing of zone name, it
+        # returns an empty dict.
+        if len(self._statistics_data) == 0: return {}
+        zones = {}
+        with self._lock:
+            zones = self._statistics_data[self.perzone_prefix].copy()
+        # Start calculation for '_SERVER_' counts
+        attrs = self._get_default_statistics_data()[self.perzone_prefix][self.entire_server]
+        statistics_data = {self.perzone_prefix: {}}
+        for attr in attrs:
+            sum_ = 0
+            for name in zones:
+                if name == self.entire_server: continue
+                if attr in zones[name]:
+                    if  name not in statistics_data[self.perzone_prefix]:
+                        statistics_data[self.perzone_prefix][name] = {}
+                    statistics_data[self.perzone_prefix][name].update(
+                        {attr: zones[name][attr]}
+                        )
+                    sum_ += zones[name][attr]
+            if  sum_ > 0:
+                if self.entire_server not in statistics_data[self.perzone_prefix]:
+                    statistics_data[self.perzone_prefix][self.entire_server] = {}
+                statistics_data[self.perzone_prefix][self.entire_server].update({attr: sum_})
+        return statistics_data
+
+    def _get_default_statistics_data(self):
+        """Returns default statistics data from the spec file"""
+        statistics_data = {}
+        for id_ in isc.config.spec_name_list(self._statistics_spec):
+            spec = isc.config.find_spec_part(self._statistics_spec, id_)
+            statistics_data.update({id_: spec['item_default']})
+        return statistics_data
+
+    def _create_perzone_incrementers(self):
+        """Creates increment method of each per-zone counter based on
+        the spec file. Incrementer can be accessed by name
+        "inc_${item_name}".Incrementers are passed to the
+        XfroutSession and NotifyOut class as counter handlers."""
+        # add a new element under the named_set item for the zone
+        zones_spec = isc.config.find_spec_part(
+            self._statistics_spec, self.perzone_prefix)
+        item_list =  isc.config.spec_name_list(\
+            zones_spec['named_set_item_spec']['map_item_spec'])
+        # can be accessed by the name 'inc_xxx'
+        for item in item_list:
+            def __perzone_incrementer(zone_name, counter_name=item, step=1):
+                """A per-zone incrementer for counter_name. Locks the thread
+                because it is considered to be invoked by a multi-threading
+                caller."""
+                with self._lock:
+                    self._add_perzone_counter(zone_name)
+                    self._statistics_data[self.perzone_prefix][zone_name][counter_name] += step
+            #def __perzone_incrementer(zone_name, counter_name=item):
+            #    self._perzone_incrementer(zone_name, counter_name)
+            setattr(self, 'inc_%s' % item, __perzone_incrementer)
+
+
+    def _add_perzone_counter(self, zone):
+        """Adds named_set-type counter for each zone name"""
+        try:
+            self._statistics_data[self.perzone_prefix][zone]
+        except KeyError:
+            # add a new element under the named_set item for the zone
+            map_spec = isc.config.find_spec_part(
+                self._statistics_spec, '%s/%s' % \
+                    (self.perzone_prefix, zone))['map_item_spec']
+            id_list =  isc.config.spec_name_list(map_spec)
+            for id_ in id_list:
+                spec = isc.config.find_spec_part(map_spec, id_)
+                isc.cc.data.set(self._statistics_data,
+                                '%s/%s/%s' % \
+                                    (self.perzone_prefix, zone, id_),
+                                spec['item_default'])
+
 class XfroutServer:
     def __init__(self):
         self._unix_socket_server = None
@@ -933,6 +1052,8 @@ class XfroutServer:
         self._shutdown_event = threading.Event()
         self._cc = isc.config.ModuleCCSession(SPECFILE_LOCATION, self.config_handler, self.command_handler)
         self._config_data = self._cc.get_full_config()
+        self._counter = XfroutCounter(
+            self._cc.get_module_spec().get_statistics_spec())
         self._cc.start()
         self._cc.add_remote_config(AUTH_SPECFILE_LOCATION)
         isc.server_common.tsig_keyring.init_keyring(self._cc)
@@ -941,17 +1062,25 @@ class XfroutServer:
 
     def _start_xfr_query_listener(self):
         '''Start a new thread to accept xfr query. '''
-        self._unix_socket_server = UnixSockServer(self._listen_sock_file,
-                                                  XfroutSession,
-                                                  self._shutdown_event,
-                                                  self._config_data,
-                                                  self._cc)
+        self._unix_socket_server = UnixSockServer(
+            self._listen_sock_file,
+            XfroutSession,
+            self._shutdown_event,
+            self._config_data,
+            self._cc,
+            counter_xfrrej=self._counter.inc_xfrrej,
+            counter_xfrreqdone=self._counter.inc_xfrreqdone
+            )
         listener = threading.Thread(target=self._unix_socket_server.serve_forever)
         listener.start()
 
     def _start_notifier(self):
         datasrc = self._unix_socket_server.get_db_file()
-        self._notifier = notify_out.NotifyOut(datasrc)
+        self._notifier = notify_out.NotifyOut(
+            datasrc,
+            counter_notifyoutv4=self._counter.inc_notifyoutv4,
+            counter_notifyoutv6=self._counter.inc_notifyoutv6
+            )
         if 'also_notify' in self._config_data:
             for slave in self._config_data['also_notify']:
                 self._notifier.add_slave(slave['address'], slave['port'])
@@ -1027,6 +1156,16 @@ class XfroutServer:
             else:
                 answer = create_answer(1, "Bad command parameter:" + str(args))
 
+        # return statistics data to the stats daemon
+        elif cmd == "getstats":
+            # The log level is here set to debug in order to avoid
+            # that a log becomes too verbose. Because the b10-stats
+            # daemon is periodically asking to the b10-xfrout daemon.
+            logger.debug(DBG_XFROUT_TRACE, \
+                             XFROUT_RECEIVED_GETSTATS_COMMAND)
+            answer = isc.config.ccsession.create_answer(
+                0, self._counter.get_statistics())
+
         else:
             answer = create_answer(1, "Unknown command:" + str(cmd))