# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: t -*- # vi: set ft=python sts=4 ts=4 sw=4 noet : # This file is part of Fail2Ban. # # Fail2Ban is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # Fail2Ban is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Fail2Ban; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # Author: Cyril Jaquier # __author__ = "Cyril Jaquier" __copyright__ = "Copyright (c) 2004 Cyril Jaquier" __license__ = "GPL" from pickle import dumps, loads, HIGHEST_PROTOCOL import asynchat import asyncore import errno import fcntl import os import socket import sys import threading import traceback from .utils import Utils from ..protocol import CSPROTO from ..helpers import logging, getLogger, formatExceptionInfo # Gets the instance of the logger. logSys = getLogger(__name__) ## # Request handler class. # # This class extends asynchat in order to provide a request handler for # incoming query. class RequestHandler(asynchat.async_chat): def __init__(self, conn, transmitter): asynchat.async_chat.__init__(self, conn) self.__conn = conn self.__transmitter = transmitter self.__buffer = [] # Sets the terminator. self.set_terminator(CSPROTO.END) def __close(self): if self.__conn: conn = self.__conn self.__conn = None try: conn.shutdown(socket.SHUT_RDWR) conn.close() except socket.error: # pragma: no cover - normally unreachable pass def handle_close(self): self.__close() asynchat.async_chat.handle_close(self) def collect_incoming_data(self, data): #logSys.debug("Received raw data: " + str(data)) self.__buffer.append(data) # exception identifies deserialization errors (exception by load in pickle): class LoadError(Exception): pass ## # Handles a new request. # # This method is called once we have a complete request. def found_terminator(self): try: # Pop whole buffer message = self.__buffer self.__buffer = [] # Joins the buffer items. message = CSPROTO.EMPTY.join(message) # Closes the channel if close was received if message == CSPROTO.CLOSE: self.close_when_done() return # Deserialize try: message = loads(message) except Exception as e: logSys.error('PROTO-error: load message failed: %s', e, exc_info=logSys.getEffectiveLevel() 100: # pragma: no cover - normally unreachable if ( e.args[0] == errno.EMFILE # [Errno 24] Too many open files or sum(err_count.itervalues()) > 1000 ): logSys.critical("Too many errors - critical count reached %r", err_count) break ## # Asynchronous server class. # # This class extends asyncore and dispatches connection requests to # RequestHandler. class AsyncServer(asyncore.dispatcher): def __init__(self, transmitter): asyncore.dispatcher.__init__(self) self.__transmitter = transmitter self.__sock = "/var/run/fail2ban/fail2ban.sock" self.__init = False self.__active = False self.__errCount = {'accept': 0, 'listen': 0} self.onstart = None ## # Returns False as we only read the socket first. def writable(self): return False def handle_accept(self): try: conn, addr = self.accept() except Exception as e: # pragma: no cover self.__errCount['accept'] += 1 if self.__errCount['accept'] < 20: logSys.warning("Accept socket error: %s", e, exc_info=(self.__errCount['accept'] <= 1)) elif self.__errCount['accept'] == 20: logSys.error("Too many acceptor errors - stop logging errors") elif self.__errCount['accept'] > 100: if ( (isinstance(e, socket.error) and e.args[0] == errno.EMFILE) # [Errno 24] Too many open files or sum(self.__errCount.itervalues()) > 1000 ): logSys.critical("Too many errors - critical count reached %r", self.__errCount) self.stop() return if self.__errCount['accept']: self.__errCount['accept'] -= 1; AsyncServer.__markCloseOnExec(conn) # Creates an instance of the handler class to handle the # request/response on the incoming connection. RequestHandler(conn, self.__transmitter) ## # Starts the communication server. # # @param sock: socket file. # @param force: remove the socket file if exists. def start(self, sock, force, timeout=None, use_poll=False): self.__worker = threading.current_thread() self.__sock = sock # Remove socket if os.path.exists(sock): logSys.error("Fail2ban seems to be already running") if force: logSys.warning("Forcing execution of the server") self._remove_sock() else: raise AsyncServerException("Server already running") # Creates the socket. self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM) self.set_reuse_addr() try: self.bind(sock) except Exception: # pragma: no cover raise AsyncServerException("Unable to bind socket %s" % self.__sock) AsyncServer.__markCloseOnExec(self.socket) self.listen(1) # Sets the init flag. self.__init = self.__loop = self.__active = True # Execute on start event (server ready): if self.onstart: self.onstart() # Event loop as long as active: loop(lambda: self.__loop, timeout=timeout, use_poll=use_poll, err_count=self.__errCount) self.__active = False # Cleanup all self.stop() def close(self): stopflg = False if self.__active: self.__loop = False # shutdown socket here: if self.socket: try: self.socket.shutdown(socket.SHUT_RDWR) except socket.error: # pragma: no cover - normally unreachable pass # close connection: asyncore.dispatcher.close(self) # If not the loop thread (stops self in handler), wait (a little bit) # for the server leaves loop, before remove socket if threading.current_thread() != self.__worker: Utils.wait_for(lambda: not self.__active, 1) stopflg = True # Remove socket (file) only if it was created: if self.__init and os.path.exists(self.__sock): self._remove_sock() logSys.debug("Removed socket file " + self.__sock) if stopflg: logSys.debug("Socket shutdown") self.__active = False ## # Stops the communication server. def stop_communication(self): if self.__transmitter: logSys.debug("Stop communication, shutdown") self.__transmitter = None ## # Stops the server. def stop(self): self.stop_communication() self.close() # better remains a method (not a property) since used as a callable for wait_for def isActive(self): return self.__active ## # Safe remove (in multithreaded mode): def _remove_sock(self): try: os.remove(self.__sock) except OSError as e: # pragma: no cover if e.errno != errno.ENOENT: raise ## # Marks socket as close-on-exec to avoid leaking file descriptors when # running actions involving command execution. # @param sock: socket file. @staticmethod def __markCloseOnExec(sock): fd = sock.fileno() flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, flags|fcntl.FD_CLOEXEC) ## # AsyncServerException is used to wrap communication exceptions. class AsyncServerException(Exception): pass