Browse Source

added first version of stats-collector

git-svn-id: svn://bind10.isc.org/svn/bind10/branches/parkinglot@621 e5f2f494-b856-4b98-b285-d166d9295462
Kazunori Fujiwara 15 years ago
parent
commit
f249a0caa3

+ 0 - 76
src/bin/statistics-test/collector.py

@@ -1,76 +0,0 @@
-#!/usr/bin/python
-#
-# This program collects "counter" from "statistics" channel.
-# It accepts one command: "Boss" group "shutdown"
-
-import ISC
-import time
-import select
-import sys
-import os
-
-step_time = 1
-statgroup = "statistics"
-
-cc = ISC.CC.Session()
-# print (cc.lname)
-cc.group_subscribe(statgroup)
-cc.group_subscribe("Boss")
-
-f = open('statistics.out', 'a')
-sys.stdout = f
-
-server_by_name = {}
-server_by_id = []
-counter = []
-timestamp = []
-servers = 0
-delta_sum = 0
-received = 0
-output_time = -1
-output_delta = -1
-
-sent = -1
-last_sent = -1
-loop = 0
-while 1:
-    wait = sent + step_time - time.time()
-    if wait <= 0:
-        sent = time.time();
-        command = { "command": "getstat", "sent":time.time() }
-        #print ("loop=", loop, "    SEND: ", command)
-        cc.group_sendmsg(command, "statistics")
-        wait = last_sent + step_time - time.time()
-        if wait < 0:
-            wait = step_time
-        loop += 1
-        delta_sum = -1
-        received = 0
-    r,w,e = select.select([cc._socket],[],[], wait)
-    for sock in r:
-        if sock == cc._socket:
-            data,envelope = cc.group_recvmsg(False)
-            if (envelope["group"] == "Boss"):
-                if ("shutdown" in data):
-                    exit()
-            if (envelope["group"] == statgroup):
-                if (envelope["from"] in server_by_name):
-                    id = server_by_name[envelope["from"]]
-                    delta_t = float(data["sent"]) - float(timestamp[id])
-                    delta_c = float(data["counter"]) - float(counter[id])
-                    print ("server",id,"  time=",data["sent"], end=" "),
-                    print ("counter=", data["counter"], end=" ")
-                    print ("dT=", delta_t, "  dC=", delta_c)
-                    sys.stdout.flush()
-                    timestamp[id] = data["sent"]
-                    counter[id] = data["counter"]
-#
-                    if (timestamp[id] == sent):
-                        received += 1
-                else:
-                    print ("server ", servers, " name ", envelope["from"])
-                    server_by_id.append(envelope["from"])
-                    server_by_name[envelope["from"]] = len(server_by_id) - 1
-                    counter.append(data["counter"])
-                    timestamp.append(data["sent"])
-                    servers += 1

+ 0 - 42
src/bin/statistics-test/respondent.py

@@ -1,42 +0,0 @@
-#!/usr/bin/python
-
-# This program acts statistics respondent.
-# It has pseudo "counter" which is incremented each 0.3 second and C-channel query.
-# Two command is available
-#   "statistics" group: "getstat"
-#   "Boss"       group: "shutdown"
-
-import ISC
-import select
-import random
-
-statgroup = "statistics"
-
-cc = ISC.CC.Session()
-print (cc.lname)
-cc.group_subscribe(statgroup)
-cc.group_subscribe("Boss")
-
-counter = 0
-
-while 1:
-    r,w,e = select.select([cc._socket],[],[],1)
-    for sock in r:
-        if sock == cc._socket:
-            data,envelope = cc.group_recvmsg(False);
-            if (envelope["group"] == statgroup):
-                if (data["command"] == "getstat"):
-                    cc.group_reply(envelope,
-                                   {
-                                    "response":data["command"],
-                                    "sent": data["sent"],
-                                    "counter":counter
-                                   })
-                # Do another statistics command
-            elif (envelope["group"] == "Boss"):
-                if ("shutdown" in data):
-                    exit()
-            # Do another group
-         # Do another socket
-    # Do main work
-    counter += random.randrange(1,100)

+ 179 - 0
src/bin/stats-collector/b10-stats-collector.py

@@ -0,0 +1,179 @@
+#!/usr/bin/python
+#
+# This program collects "counters" from "statistics" channel.
+# It accepts one command: "Boss" group "shutdown"
+
+import ISC.CC
+import time
+import select
+import os
+
+bossgroup = "Boss"
+
+def total(s):
+    def totalsub(d,s):
+        for k in s.keys():
+            if (k == 'component' or k == 'version' 
+                or k == 'timestamp' or k == 'from'):
+                continue
+            if (k in d):
+                if (isinstance(s[k], dict)):
+                    totalsub(d[k], s[k])
+                else:
+                    d[k] = s[k] + d[k]
+            else:
+                d[k] = s[k]
+
+    if (len(s) == 0):
+        return {}
+    if (len(s) == 1):
+        for k in s.keys():
+            out = s[k]
+        out['components'] = 1
+        out['timestamp2'] = out['timestamp']
+        del out['from']
+        return out
+    _time1 = 0
+    _time2 = 0
+    out = {}
+    for i in s.values():
+        if (_time1 == 0 or _time1 < i['timestamp']):
+            _time1 = i['timestamp']
+        if (_time2 == 0 or _time2 > i['timestamp']):
+            _time2 = i['timestamp']
+        totalsub(out, i)
+    out['components'] = len(s)
+    out['timestamp'] = _time1;
+    out['timestamp2'] = _time2;
+    return out
+
+def dicttoxml(stats):
+    def dicttoxmlsub(s, level):
+        output = ''
+        spaces = '  ' * level
+        for k in s.keys():
+            if (isinstance(s[k], dict)):
+                output += spaces + ('<%s>\n' %k) + dicttoxmlsub(s[k], level+1) + spaces + '</%s>\n' %k
+            else:
+                output += spaces + '<%s>%s</%s>\n' %(k, s[k], k)
+        return output
+
+    for k in stats.keys():
+        output = '<component="%s">\n' % k
+        s = stats[k]
+        if ('component' in s or 'components' in s):
+            output = output + dicttoxmlsub(s, 1)
+        else:
+            for l in s.keys():
+                output = output + '  <from="%s">\n' %l + dicttoxmlsub(s[l], 2) + "  </from>\n"
+        output = output + "</component>\n"
+        return output
+
+def dump_stats(statpath, statcount, stat):
+    newfile = open(statpath + '.new', 'w')
+    newfile.write(dicttoxml(stat))
+    newfile.close()
+    loop = statcount
+    while(loop > 0):
+        old = statpath + '.%d'%loop
+        loop -= 1
+        new = statpath + '.%d'%loop
+        if (os.access(new, os.F_OK)):
+            os.rename(new, old)
+    if (os.access(statpath, os.F_OK)):
+        os.rename(statpath, new)
+    os.rename(statpath + '.new', statpath)
+
+def collector(statgroup,step,statpath,statcount):
+    cc = ISC.CC.Session()
+    print (cc.lname)
+    cc.group_subscribe(statgroup)
+    cc.group_subscribe(bossgroup)
+    wrote_time = -1
+    last_wrote_time = -1
+    last_recvd_time = -1
+    stats = {}
+    statstotal = {}
+    while 1:
+        wait = wrote_time + step - time.time()
+        if wait <= 0 and last_recvd_time > wrote_time:
+            dump_stats(statpath, statcount, statstotal)
+            last_wrote_time = wrote_time;
+            wrote_time = time.time();
+            wait = last_wrote_time + step - time.time()
+            if wait < 0:
+                wait = step
+        r,w,e = select.select([cc._socket],[],[], wait)
+        for sock in r:
+            if sock == cc._socket:
+                data,envelope = cc.group_recvmsg(False)
+                if (envelope["group"] == "Boss"):
+                    if ("shutdown" in data):
+                        exit()
+                if (envelope["group"] == statgroup):
+                    # Check received data
+                    if (not('component' in data and 'version' in data
+                        and 'stats' in data)):
+                        continue
+                    component = data['component']
+                    _from = envelope['from']
+                    data['from'] = _from
+                    if (not (component in stats)):
+                        stats[component] = {}
+                    (stats[component])[_from] = data;
+                    statstotal[component] = total(stats[component])
+                    last_recvd_time = time.time()
+                    #print (stats)
+                    #print (statstotal)
+                    #print (dicttoxml(statstotal))
+
+def test_total():
+    stats = {
+              'auth': {
+                     'from1': {
+                               'component':'auth',
+                               'version':1,
+                               'from':'from1',
+                               'timestamp':20100125,
+                               'stats': {
+                                   'AUTH': {
+                                       'counterid': 1,
+                                       'requestv4': 2,
+                                       'requestv6': 4,
+                                   },
+                                   'SYS': {
+                                       'sockets': 8,
+                                       'memory': 16,
+                                   },
+                                },
+                     },
+                     'from2': {
+                               'component':'auth',
+                               'version':1,
+                               'from':'from1',
+                               'timestamp':20100126,
+                               'stats': {
+                                   'AUTH': {
+                                       'counterid': 256,
+                                       'requestv4': 512,
+                                       'requestv6': 1024,
+                                   },
+                                   'SYS': {
+                                       'sockets': 2048,
+                                       'memory': 4096,
+                                   },
+                                },
+                     },
+              },
+            };
+    t = {}
+    for key in stats:
+        t[key] = total(stats[key])
+    print (stats)
+    print (dicttoxml(stats))
+    print (t)
+    print (dicttoxml(t))
+
+if __name__ == "__main__":
+    collector('statistics', 10, '/tmp/stats', 100)
+    #test_total()

+ 1 - 1
src/bin/statistics-test/boss-shutdown.py

@@ -3,4 +3,4 @@
 import ISC
 import ISC
 cc = ISC.CC.Session()
 cc = ISC.CC.Session()
 cc.group_subscribe("Boss")
 cc.group_subscribe("Boss")
-cc.group_sendmsg({ "shutdown":None},"Boss")
+cc.group_sendmsg({ "command":"shutdown"},"Boss")

+ 57 - 0
src/bin/stats-collector/unittest/test-agent.py

@@ -0,0 +1,57 @@
+#!/usr/bin/python
+
+# This program acts statistics agent.
+# It has pseudo counters which is incremented each 10 second and
+# sends data to "statistics" channel periodically.
+# One command is available
+#   "Boss"       group: "shutdown"
+
+import ISC
+import time
+import select
+import random
+
+step_time = 10
+statgroup = "statistics"
+
+cc = ISC.CC.Session()
+print (cc.lname)
+#cc.group_subscribe(statgroup)
+cc.group_subscribe("Boss")
+
+# counters
+
+AUTH={}
+AUTH['counterid'] = 0
+AUTH['requestv4'] = 2
+AUTH['requestv6'] = 1
+SYS={}
+SYS['sockets'] = 0
+SYS['memory'] = 0
+
+sent = -1
+last_sent = -1
+loop = 0
+
+while 1:
+    wait = sent + step_time - time.time()
+    if wait <= 0:
+        last_sent = sent;
+        sent = time.time();
+        msg = {'component':'auth', 'version':1, 'timestamp':time.time(),'stats':{'AUTH':AUTH,'SYS':SYS}}
+        print (msg)
+        print (cc.group_sendmsg(msg, statgroup))
+        wait = last_sent + step_time - time.time()
+        if wait < 0:
+            wait = step_time
+        loop += 1
+    r,w,e = select.select([cc._socket],[],[], wait)
+    for sock in r:
+        if sock == cc._socket:
+            data,envelope = cc.group_recvmsg(False)
+            print (data)
+            if (envelope["group"] == "Boss"):
+                if ("shutdown" in data):
+                    exit()
+            else:
+                print ("Unknown data: ", envelope,data)