Browse Source

[2222] updated classes: XfroutCounter, XfroutServer and XfroutSession

 - added incrementer/decrementer for the number of IXFR/AXFR running into
   the
   XfroutCounter class

 - added getter methods of counter handlers for the XfroutSession/NotifyOut
   class into the XfroutCounter class

 - inserted incrementer/decrementer handler for the number of AXFR/IXFR
   running into XfroutSession

 - removed the unnecessary thread lock from the get_statistics()

 - removed a commented-out code

 - updated copyright
Naoki Kambe 12 years ago
parent
commit
1c9fa8a0b9
1 changed files with 94 additions and 19 deletions
  1. 94 19
      src/bin/xfrout/xfrout.py.in

+ 94 - 19
src/bin/xfrout/xfrout.py.in

@@ -1,6 +1,6 @@
 #!@PYTHON@
 #!@PYTHON@
 
 
-# Copyright (C) 2010  Internet Systems Consortium.
+# Copyright (C) 2010-2012  Internet Systems Consortium.
 #
 #
 # Permission to use, copy, modify, and distribute this software for any
 # Permission to use, copy, modify, and distribute this software for any
 # purpose with or without fee is hereby granted, provided that the above
 # purpose with or without fee is hereby granted, provided that the above
@@ -154,7 +154,7 @@ def get_soa_serial(soa_rdata):
 class XfroutSession():
 class XfroutSession():
     def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote,
     def __init__(self, sock_fd, request_data, server, tsig_key_ring, remote,
                  default_acl, zone_config, client_class=DataSourceClient,
                  default_acl, zone_config, client_class=DataSourceClient,
-                 counter_xfrrej=None, counter_xfrreqdone=None):
+                 **counters):
         self._sock_fd = sock_fd
         self._sock_fd = sock_fd
         self._request_data = request_data
         self._request_data = request_data
         self._server = server
         self._server = server
@@ -169,10 +169,12 @@ class XfroutSession():
         self.ClientClass = client_class # parameterize this for testing
         self.ClientClass = client_class # parameterize this for testing
         self._soa = None # will be set in _xfrout_setup or in tests
         self._soa = None # will be set in _xfrout_setup or in tests
         self._jnl_reader = None # will be set to a reader for IXFR
         self._jnl_reader = None # will be set to a reader for IXFR
-        # Set counter handlers for counting Xfr requests. An argument
-        # is required for zone name.
-        self._counter_xfrrej = counter_xfrrej
-        self._counter_xfrreqdone = counter_xfrreqdone
+        # Set counter handlers for counting Xfr requests or
+        # incrementing or decrementing Xfr running. An argument
+        # is required for zone name in counting Xfr requests.
+        for (k, v) in counters.items():
+            if 'counter_' in k or 'inc_' in k or 'dec_' in k:
+                setattr(self, "_%s" % k, v)
         self._handle()
         self._handle()
 
 
     def create_tsig_ctx(self, tsig_record, tsig_key_ring):
     def create_tsig_ctx(self, tsig_record, tsig_key_ring):
@@ -527,15 +529,25 @@ class XfroutSession():
             return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
             return self._reply_query_with_error_rcode(msg, sock_fd, rcode_)
 
 
         try:
         try:
+            # increment Xfr starts by RRType
+            if self._request_type == RRType.AXFR():
+                self._inc_axfr_running()
+            else:
+                self._inc_ixfr_running()
             logger.info(XFROUT_XFR_TRANSFER_STARTED, self._request_typestr,
             logger.info(XFROUT_XFR_TRANSFER_STARTED, self._request_typestr,
                         format_addrinfo(self._remote), zone_str)
                         format_addrinfo(self._remote), zone_str)
             self._reply_xfrout_query(msg, sock_fd)
             self._reply_xfrout_query(msg, sock_fd)
         except Exception as err:
         except Exception as err:
             logger.error(XFROUT_XFR_TRANSFER_ERROR, self._request_typestr,
             logger.error(XFROUT_XFR_TRANSFER_ERROR, self._request_typestr,
                     format_addrinfo(self._remote), zone_str, err)
                     format_addrinfo(self._remote), zone_str, err)
-        if self._counter_xfrreqdone is not None:
-            # count done Xfr requests by each zone name
-            self._counter_xfrreqdone(zone_name.to_text())
+        finally:
+            # decrement Xfr starts by RRType
+            if self._request_type == RRType.AXFR():
+                self._dec_axfr_running()
+            else:
+                self._dec_ixfr_running()
+        # count done Xfr requests by each zone name
+        self._counter_xfrreqdone(zone_name.to_text())
         logger.info(XFROUT_XFR_TRANSFER_DONE, self._request_typestr,
         logger.info(XFROUT_XFR_TRANSFER_DONE, self._request_typestr,
                     format_addrinfo(self._remote), zone_str)
                     format_addrinfo(self._remote), zone_str)
 
 
@@ -948,6 +960,8 @@ class XfroutCounter:
         zones/example.com./notifyoutv6
         zones/example.com./notifyoutv6
         zones/example.com./xfrrej
         zones/example.com./xfrrej
         zones/example.com./xfrreqdone
         zones/example.com./xfrreqdone
+        ixfr_running
+        axfr_running
     """
     """
     # '_SERVER_' is a special zone name representing an entire
     # '_SERVER_' is a special zone name representing an entire
     # count. It doesn't mean a specific zone, but it means an
     # count. It doesn't mean a specific zone, but it means an
@@ -959,8 +973,15 @@ class XfroutCounter:
         self._statistics_spec = statistics_spec
         self._statistics_spec = statistics_spec
         # holding statistics data for Xfrout module
         # holding statistics data for Xfrout module
         self._statistics_data = {}
         self._statistics_data = {}
+        self._counters_for_xfroutsession = {}
+        self._counters_for_notifyout = {}
+        self._xfrrunning_names = [ \
+            n for n in \
+                isc.config.spec_name_list(self._statistics_spec) \
+                if 'xfr_running' in n ]
         self._lock = threading.RLock()
         self._lock = threading.RLock()
         self._create_perzone_incrementers()
         self._create_perzone_incrementers()
+        self._create_xfrrunning_xxcrementers()
 
 
     def get_statistics(self):
     def get_statistics(self):
         """Calculates an entire server counts, and returns statistics
         """Calculates an entire server counts, and returns statistics
@@ -971,9 +992,9 @@ class XfroutCounter:
         # If self._statistics_data contains nothing of zone name, it
         # If self._statistics_data contains nothing of zone name, it
         # returns an empty dict.
         # returns an empty dict.
         if len(self._statistics_data) == 0: return {}
         if len(self._statistics_data) == 0: return {}
+        # for per-zone counter
         zones = {}
         zones = {}
-        with self._lock:
-            zones = self._statistics_data[self.perzone_prefix].copy()
+        zones = self._statistics_data[self.perzone_prefix]
         # Start calculation for '_SERVER_' counts
         # Start calculation for '_SERVER_' counts
         attrs = self._get_default_statistics_data()[self.perzone_prefix][self.entire_server]
         attrs = self._get_default_statistics_data()[self.perzone_prefix][self.entire_server]
         statistics_data = {self.perzone_prefix: {}}
         statistics_data = {self.perzone_prefix: {}}
@@ -991,7 +1012,12 @@ class XfroutCounter:
             if  sum_ > 0:
             if  sum_ > 0:
                 if self.entire_server not in statistics_data[self.perzone_prefix]:
                 if self.entire_server not in statistics_data[self.perzone_prefix]:
                     statistics_data[self.perzone_prefix][self.entire_server] = {}
                     statistics_data[self.perzone_prefix][self.entire_server] = {}
-                statistics_data[self.perzone_prefix][self.entire_server].update({attr: sum_})
+                statistics_data[self.perzone_prefix][self.entire_server]\
+                    .update({attr:sum_})
+        # for xfrrunning incrementer/decrementer
+        for name in self._xfrrunning_names:
+            if name in self._statistics_data:
+                statistics_data[name] = self._statistics_data[name]
         return statistics_data
         return statistics_data
 
 
     def _get_default_statistics_data(self):
     def _get_default_statistics_data(self):
@@ -1021,11 +1047,51 @@ class XfroutCounter:
                 with self._lock:
                 with self._lock:
                     self._add_perzone_counter(zone_name)
                     self._add_perzone_counter(zone_name)
                     self._statistics_data[self.perzone_prefix][zone_name][counter_name] += step
                     self._statistics_data[self.perzone_prefix][zone_name][counter_name] += step
-            setattr(self, 'inc_%s' % item, __perzone_incrementer)
-
+            if 'notifyout' in item:
+                self._counters_for_notifyout['counter_%s' % item] \
+                    = __perzone_incrementer
+            else:
+                self._counters_for_xfroutsession['counter_%s' % item] \
+                    = __perzone_incrementer
+
+    def _create_xfrrunning_xxcrementers(self):
+        """Creates increment/decrement method of (a|i)xfr_running
+        based on the spec file. Incrementer can be accessed by name
+        "inc_${item_name}". Decrementer can be accessed by name
+        "dec_${item_name}". Both of them are passed to the
+        XfroutSession as counter handlers."""
+        # can be accessed by the name 'inc_xxx' or 'dec_xxx'
+        for item in self._xfrrunning_names:
+            def __xfrrunning_incrementer(counter_name=item, step=1):
+                """A incrementer for axfr or ixfr running. Locks the thread
+                because it is considered to be invoked by a multi-threading
+                caller."""
+                with self._lock:
+                    self._add_xfrrunning_counter(counter_name)
+                    self._statistics_data[counter_name] += step
+            def __xfrrunning_decrementer(counter_name=item, step=-1):
+                """A decrementer for axfr or ixfr running. Locks the thread
+                because it is considered to be invoked by a multi-threading
+                caller."""
+                with self._lock:
+                    self._statistics_data[counter_name] += step
+            self._counters_for_xfroutsession['inc_%s' % item] \
+                = __xfrrunning_incrementer
+            self._counters_for_xfroutsession['dec_%s' % item] \
+                = __xfrrunning_decrementer
+
+    def get_counters_for_xfroutsession(self):
+        """Returns counters, incrementers, and decrementers to be
+        passed to XfroutSession/UnixSockServer class."""
+        return self._counters_for_xfroutsession
+
+    def get_counters_for_notifyout(self):
+        """Returns counters handlers to be passed to NotifyOut
+        class."""
+        return self._counters_for_notifyout
 
 
     def _add_perzone_counter(self, zone):
     def _add_perzone_counter(self, zone):
-        """Adds named_set-type counter for each zone name"""
+        """Adds a named_set-type counter for each zone name."""
         try:
         try:
             self._statistics_data[self.perzone_prefix][zone]
             self._statistics_data[self.perzone_prefix][zone]
         except KeyError:
         except KeyError:
@@ -1041,6 +1107,17 @@ class XfroutCounter:
                                     (self.perzone_prefix, zone, id_),
                                     (self.perzone_prefix, zone, id_),
                                 spec['item_default'])
                                 spec['item_default'])
 
 
+    def _add_xfrrunning_counter(self, counter_name):
+        """Adds a counter for counting (a|i)xfr_running"""
+        try:
+            self._statistics_data[counter_name]
+        except KeyError:
+            # examines the names of xfer running
+            for n in self._xfrrunning_names:
+                spec = isc.config.find_spec_part(self._statistics_spec, n)
+                isc.cc.data.set(self._statistics_data, n, \
+                                    spec['item_default'])
+
 class XfroutServer:
 class XfroutServer:
     def __init__(self):
     def __init__(self):
         self._unix_socket_server = None
         self._unix_socket_server = None
@@ -1064,8 +1141,7 @@ class XfroutServer:
             self._shutdown_event,
             self._shutdown_event,
             self._config_data,
             self._config_data,
             self._cc,
             self._cc,
-            counter_xfrrej=self._counter.inc_xfrrej,
-            counter_xfrreqdone=self._counter.inc_xfrreqdone
+            **self._counter.get_counters_for_xfroutsession()
             )
             )
         listener = threading.Thread(target=self._unix_socket_server.serve_forever)
         listener = threading.Thread(target=self._unix_socket_server.serve_forever)
         listener.start()
         listener.start()
@@ -1074,8 +1150,7 @@ class XfroutServer:
         datasrc = self._unix_socket_server.get_db_file()
         datasrc = self._unix_socket_server.get_db_file()
         self._notifier = notify_out.NotifyOut(
         self._notifier = notify_out.NotifyOut(
             datasrc,
             datasrc,
-            counter_notifyoutv4=self._counter.inc_notifyoutv4,
-            counter_notifyoutv6=self._counter.inc_notifyoutv6
+            **self._counter.get_counters_for_notifyout()
             )
             )
         if 'also_notify' in self._config_data:
         if 'also_notify' in self._config_data:
             for slave in self._config_data['also_notify']:
             for slave in self._config_data['also_notify']: