Browse Source

[2292] Send the notifications

Also, make sure we don't block long time on a function holding a mutex.
We use select for that.
Michal 'vorner' Vaner 12 years ago
parent
commit
64dec90474
1 changed files with 17 additions and 2 deletions
  1. 17 2
      src/bin/msgq/msgq.py.in

+ 17 - 2
src/bin/msgq/msgq.py.in

@@ -199,6 +199,7 @@ class MsgQ:
         # not for performance, so we use wide lock scopes to be on the safe
         # side.
         self.__lock = threading.Lock()
+        self._session = None
 
     def members_notify(self, event, params):
         """
@@ -220,7 +221,11 @@ class MsgQ:
         notifications gets a notification about itself, but not in the case
         of unsubscribing).
         """
-        # Empty for now.
+        # Due to the interaction between threads (and fear it might influence
+        # sending stuff), we test this method in msgq_run_test, instead of
+        # mocking the ccs.
+        if self._session: # Don't send before we have started up
+            self._session.notify('cc_members', event, params)
 
     def cfgmgr_ready(self, ready=True):
         """Notify that the config manager is either subscribed, or
@@ -924,13 +929,23 @@ if __name__ == "__main__":
                                                  msgq.command_handler,
                                                  None, True,
                                                  msgq.socket_file)
+            msgq._session = session
             session.start()
             # And we create a thread that'll just wait for commands and
             # handle them. We don't terminate the thread, we set it to
             # daemon. Once the main thread terminates, it'll just die.
             def run_session():
                 while True:
-                    session.check_command(False)
+                    # As the check_command has internal mutex that is shared
+                    # with sending part (which includes notify). So we don't
+                    # want to hold it long-term and block using select.
+                    fileno = session.get_socket().fileno()
+                    try:
+                        (reads, _, _) = select.select([fileno], [], [])
+                    except select.error as se:
+                        if se.args[0] != errno.EINTR:
+                            raise
+                    session.check_command(True)
             background_thread = threading.Thread(target=run_session)
             background_thread.daemon = True
             background_thread.start()