|
@@ -678,30 +678,40 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn,
|
|
|
except socket.error:
|
|
|
logger.error(XFROUT_FETCH_REQUEST_ERROR)
|
|
|
return
|
|
|
+ self._select_loop(request)
|
|
|
+
|
|
|
+ def _select_loop(self, request_sock):
|
|
|
+ '''Main loop for a single session between xfrout and auth.
|
|
|
+
|
|
|
+ This is a dedicated subroutine of handle_request(), but is defined
|
|
|
+ as a separate "protected" method for the convenience of tests.
|
|
|
+ '''
|
|
|
|
|
|
# Check self._shutdown_event to ensure the real shutdown comes.
|
|
|
# Linux could trigger a spurious readable event on the _read_sock
|
|
|
# due to a bug, so we need perform a double check.
|
|
|
while not self._shutdown_event.is_set(): # Check if xfrout is shutdown
|
|
|
try:
|
|
|
- (rlist, wlist, xlist) = select.select([self._read_sock, request], [], [])
|
|
|
+ (rlist, wlist, xlist) = select.select([self._read_sock,
|
|
|
+ request_sock], [], [])
|
|
|
except select.error as e:
|
|
|
if e.args[0] == errno.EINTR:
|
|
|
(rlist, wlist, xlist) = ([], [], [])
|
|
|
continue
|
|
|
else:
|
|
|
- logger.error(XFROUT_SOCKET_SELECT_ERROR, str(e))
|
|
|
+ logger.error(XFROUT_SOCKET_SELECT_ERROR, e)
|
|
|
break
|
|
|
|
|
|
- # self.server._shutdown_event will be set by now, if it is not a false
|
|
|
- # alarm
|
|
|
+ # self.server._shutdown_event will be set by now, if it is not a
|
|
|
+ # false alarm
|
|
|
if self._read_sock in rlist:
|
|
|
continue
|
|
|
|
|
|
try:
|
|
|
- self.process_request(request)
|
|
|
+ if not self.process_request(request_sock):
|
|
|
+ break
|
|
|
except Exception as pre:
|
|
|
- logger.error(XFROUT_PROCESS_REQUEST_ERROR, str(pre))
|
|
|
+ logger.error(XFROUT_PROCESS_REQUEST_ERROR, pre)
|
|
|
break
|
|
|
|
|
|
def _handle_request_noblock(self):
|
|
@@ -713,26 +723,33 @@ class UnixSockServer(socketserver_mixin.NoPollMixIn,
|
|
|
|
|
|
def process_request(self, request):
|
|
|
"""Receive socket fd and query message from auth, then
|
|
|
- start a new thread to process the request."""
|
|
|
+ start a new thread to process the request.
|
|
|
+
|
|
|
+ Return: True if everything is okay; otherwise False, in which case
|
|
|
+ the calling thread will terminate.
|
|
|
+
|
|
|
+ """
|
|
|
sock_fd = recv_fd(request.fileno())
|
|
|
if sock_fd < 0:
|
|
|
- # This may happen when one xfrout process try to connect to
|
|
|
- # xfrout unix socket server, to check whether there is another
|
|
|
- # xfrout running.
|
|
|
- if sock_fd == FD_SYSTEM_ERROR:
|
|
|
- logger.error(XFROUT_RECEIVE_FILE_DESCRIPTOR_ERROR)
|
|
|
- return
|
|
|
+ logger.warn(XFROUT_RECEIVE_FILE_DESCRIPTOR_ERROR)
|
|
|
+ return False
|
|
|
|
|
|
- # receive request msg
|
|
|
+ # receive request msg. If it fails we simply terminate the thread;
|
|
|
+ # it might be possible to recover from this state, but it's more likely
|
|
|
+ # that auth and xfrout are in inconsistent states. So it will make
|
|
|
+ # more sense to restart in a new session.
|
|
|
request_data = self._receive_query_message(request)
|
|
|
- if not request_data:
|
|
|
- return
|
|
|
+ if request_data is None:
|
|
|
+ # The specific exception type doesn't matter so we use session
|
|
|
+ # error.
|
|
|
+ raise XfroutSessionError('Failed to get complete xfr request')
|
|
|
|
|
|
t = threading.Thread(target=self.finish_request,
|
|
|
- args = (sock_fd, request_data))
|
|
|
+ args=(sock_fd, request_data))
|
|
|
if self.daemon_threads:
|
|
|
t.daemon = True
|
|
|
t.start()
|
|
|
+ return True
|
|
|
|
|
|
def _guess_remote(self, sock_fd):
|
|
|
"""Guess remote address and port of the socket.
|