Parcourir la source

[2582] Perform some locking of msgq

Not that it would be obvious what should be locked, mostly to be on the
safe side.
Michal 'vorner' Vaner il y a 12 ans
Parent
commit
a7872f60cb
1 fichiers modifiés avec 50 ajouts et 31 suppressions
  1. 50 31
      src/bin/msgq/msgq.py.in

+ 50 - 31
src/bin/msgq/msgq.py.in

@@ -170,6 +170,13 @@ class MsgQ:
         self.running = False
         self.__cfgmgr_ready = None
         self.__cfgmgr_ready_cond = threading.Condition()
+        # A lock used when the message queue does anything more complicated.
+        # It is mostly a safety measure, the threads doing so should be mostly
+        # independent, and the one with config session should be read only,
+        # but with threads, one never knows. We use threads for concurrency,
+        # not for performance, so we use wide lock scopes to be on the safe
+        # side.
+        self.__lock = threading.Lock()
 
     def cfgmgr_ready(self, ready=True):
         """Notify that the config manager is either subscribed, or
@@ -577,20 +584,21 @@ class MsgQ:
                 else:
                     logger.fatal(MSGQ_POLL_ERR, err)
                     break
-            for (fd, event) in events:
-                if fd == self.listen_socket.fileno():
-                    self.process_accept()
-                elif fd == self.__poller_sock.fileno():
-                    # If it's the signal socket, we should terminate now.
-                    self.running = False
-                    break
-                else:
-                    if event & select.POLLOUT:
-                        self.__process_write(fd)
-                    elif event & select.POLLIN:
-                        self.process_socket(fd)
+            with self.__lock:
+                for (fd, event) in events:
+                    if fd == self.listen_socket.fileno():
+                        self.process_accept()
+                    elif fd == self.__poller_sock.fileno():
+                        # If it's the signal socket, we should terminate now.
+                        self.running = False
+                        break
                     else:
-                        logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
+                        if event & select.POLLOUT:
+                            self.__process_write(fd)
+                        elif event & select.POLLIN:
+                            self.process_socket(fd)
+                        else:
+                            logger.error(MSGQ_POLL_UNKNOWN_EVENT, fd, event)
 
     def run_kqueue(self):
         while self.running:
@@ -600,22 +608,23 @@ class MsgQ:
             if not events:
                 raise RuntimeError('serve: kqueue returned no events')
 
-            for event in events:
-                if event.ident == self.listen_socket.fileno():
-                    self.process_accept()
-                elif event.ident == self.__poller_sock.fileno():
-                    # If it's the signal socket, we should terminate now.
-                    self.running = False
-                    break;
-                else:
-                    if event.filter == select.KQ_FILTER_WRITE:
-                        self.__process_write(event.ident)
-                    if event.filter == select.KQ_FILTER_READ and \
-                            event.data > 0:
-                        self.process_socket(event.ident)
-                    elif event.flags & select.KQ_EV_EOF:
-                        self.kill_socket(event.ident,
-                                         self.sockets[event.ident])
+            with self.__lock:
+                for event in events:
+                    if event.ident == self.listen_socket.fileno():
+                        self.process_accept()
+                    elif event.ident == self.__poller_sock.fileno():
+                        # If it's the signal socket, we should terminate now.
+                        self.running = False
+                        break;
+                    else:
+                        if event.filter == select.KQ_FILTER_WRITE:
+                            self.__process_write(event.ident)
+                        if event.filter == select.KQ_FILTER_READ and \
+                                event.data > 0:
+                            self.process_socket(event.ident)
+                        elif event.flags & select.KQ_EV_EOF:
+                            self.kill_socket(event.ident,
+                                             self.sockets[event.ident])
 
     def stop(self):
         # Signal it should terminate.
@@ -648,13 +657,23 @@ class MsgQ:
         """The configuration handler (run in a separate thread).
            Not tested, currently effectively empty.
         """
-        return isc.config.create_answer(0)
+        with self.__lock:
+            if not self.running:
+                return
+
+            # TODO: Any config handlig goes here.
+            return isc.config.create_answer(0)
 
     def command_handler(self, command, args):
         """The command handler (run in a separate thread).
            Not tested, currently effectively empty.
         """
-        return isc.config.create_answer(1, 'unknown command: ' + command)
+        with self.__lock:
+            if not self.running:
+                return
+
+            # TODO: Any commands go here
+            return isc.config.create_answer(1, 'unknown command: ' + command)
 
 # can signal handling and calling a destructor be done without a
 # global variable?