|
@@ -197,6 +197,28 @@ class MsgQ:
|
|
|
# side.
|
|
|
self.__lock = threading.Lock()
|
|
|
|
|
|
+ def members_notify(self, event, params):
|
|
|
+ """
|
|
|
+ Thin wrapper around ccs's notify. Send a notification about change
|
|
|
+ of some list that can be requested by the members command.
|
|
|
+
|
|
|
+ The event is either one of:
|
|
|
+ - connected (client connected to MsgQ)
|
|
|
+ - disconected (client disconnected from MsgQ)
|
|
|
+ - subscribed (client subscribed to a group)
|
|
|
+ - unsubscribed (client unsubscribed from a group)
|
|
|
+
|
|
|
+ The params is dict containing:
|
|
|
+ - client: The lname of the client in question.
|
|
|
+ - group (only the 3rd and 4th): The group the client subscribed
|
|
|
+ or unsubscribed from.
|
|
|
+
|
|
|
+ It is expected to happen after the event (so client subscribing for these
|
|
|
+ notifications gets a notification about itself, but not in the case
|
|
|
+ of unsubscribing).
|
|
|
+ """
|
|
|
+ # Empty for now.
|
|
|
+
|
|
|
def cfgmgr_ready(self, ready=True):
|
|
|
"""Notify that the config manager is either subscribed, or
|
|
|
that the msgq is shutting down and it won't connect, but
|
|
@@ -339,6 +361,8 @@ class MsgQ:
|
|
|
else:
|
|
|
self.add_kqueue_socket(newsocket)
|
|
|
|
|
|
+ self.members_notify('connected', {'client': lname})
|
|
|
+
|
|
|
def kill_socket(self, fd, sock):
|
|
|
"""Fully close down the socket."""
|
|
|
# Unregister events on the socket. Note that we don't have to do
|
|
@@ -356,6 +380,7 @@ class MsgQ:
|
|
|
if fd in self.sendbuffs:
|
|
|
del self.sendbuffs[fd]
|
|
|
logger.debug(TRACE_BASIC, MSGQ_SOCK_CLOSE, fd)
|
|
|
+ self.members_notify('disconnected', {'client': lname})
|
|
|
|
|
|
def __getbytes(self, fd, sock, length, continued):
|
|
|
"""Get exactly the requested bytes, or raise an exception if
|
|
@@ -647,6 +672,12 @@ class MsgQ:
|
|
|
if group == None or instance == None:
|
|
|
return # ignore invalid packets entirely
|
|
|
self.subs.subscribe(group, instance, sock)
|
|
|
+ lname = self.fd_to_lname[sock.fileno()]
|
|
|
+ self.members_notify('subscribed',
|
|
|
+ {
|
|
|
+ 'client': lname,
|
|
|
+ 'group': group
|
|
|
+ })
|
|
|
|
|
|
def process_command_unsubscribe(self, sock, routing, data):
|
|
|
group = routing[CC_HEADER_GROUP]
|
|
@@ -654,6 +685,12 @@ class MsgQ:
|
|
|
if group == None or instance == None:
|
|
|
return # ignore invalid packets entirely
|
|
|
self.subs.unsubscribe(group, instance, sock)
|
|
|
+ lname = self.fd_to_lname[sock.fileno()]
|
|
|
+ self.members_notify('unsubscribed',
|
|
|
+ {
|
|
|
+ 'client': lname,
|
|
|
+ 'group': group
|
|
|
+ })
|
|
|
|
|
|
def run(self):
|
|
|
"""Process messages. Forever. Mostly."""
|