[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Copy "stem/socket.py" to "stem/async_socket.py"
commit bba3029f8d710b26cc0299ce086d31b28104c897
Author: Illia Volochii <illia.volochii@xxxxxxxxx>
Date: Sat Mar 14 21:57:20 2020 +0200
Copy "stem/socket.py" to "stem/async_socket.py"
---
stem/async_socket.py | 768 +++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 768 insertions(+)
diff --git a/stem/async_socket.py b/stem/async_socket.py
new file mode 100644
index 00000000..2ef42dd5
--- /dev/null
+++ b/stem/async_socket.py
@@ -0,0 +1,768 @@
+# Copyright 2011-2020, Damian Johnson and The Tor Project
+# See LICENSE for licensing information
+
+"""
+Supports communication with sockets speaking Tor protocols. This
+allows us to send messages as basic strings, and receive responses as
+:class:`~stem.response.ControlMessage` instances.
+
+**This module only consists of low level components, and is not intended for
+users.** See our `tutorials <../tutorials.html>`_ and `Control Module
+<control.html>`_ if you're new to Stem and looking to get started.
+
+With that aside, these can still be used for raw socket communication with
+Tor...
+
+::
+
+ import stem
+ import stem.connection
+ import stem.socket
+
+ if __name__ == '__main__':
+ try:
+ control_socket = stem.socket.ControlPort(port = 9051)
+ stem.connection.authenticate(control_socket)
+ except stem.SocketError as exc:
+ print 'Unable to connect to tor on port 9051: %s' % exc
+ sys.exit(1)
+ except stem.connection.AuthenticationFailure as exc:
+ print 'Unable to authenticate: %s' % exc
+ sys.exit(1)
+
+ print "Issuing 'GETINFO version' query...\\n"
+ control_socket.send('GETINFO version')
+ print control_socket.recv()
+
+::
+
+ % python example.py
+ Issuing 'GETINFO version' query...
+
+ version=0.2.4.10-alpha-dev (git-8be6058d8f31e578)
+ OK
+
+**Module Overview:**
+
+::
+
+ BaseSocket - Thread safe socket.
+ |- RelaySocket - Socket for a relay's ORPort.
+ | |- send - sends a message to the socket
+ | +- recv - receives a response from the socket
+ |
+ |- ControlSocket - Socket wrapper that speaks the tor control protocol.
+ | |- ControlPort - Control connection via a port.
+ | |- ControlSocketFile - Control connection via a local file socket.
+ | |
+ | |- send - sends a message to the socket
+ | +- recv - receives a ControlMessage from the socket
+ |
+ |- is_alive - reports if the socket is known to be closed
+ |- is_localhost - returns if the socket is for the local system or not
+ |- connection_time - timestamp when socket last connected or disconnected
+ |- connect - connects a new socket
+ |- close - shuts down the socket
+ +- __enter__ / __exit__ - manages socket connection
+
+ send_message - Writes a message to a control socket.
+ recv_message - Reads a ControlMessage from a control socket.
+ send_formatting - Performs the formatting expected from sent messages.
+"""
+
+from __future__ import absolute_import
+
+import re
+import socket
+import ssl
+import threading
+import time
+
+import stem.response
+import stem.util.str_tools
+
+from stem.util import log
+
+MESSAGE_PREFIX = re.compile(b'^[a-zA-Z0-9]{3}[-+ ]')
+ERROR_MSG = 'Error while receiving a control message (%s): %s'
+
+# lines to limit our trace logging to, you can disable this by setting it to None
+
+TRUNCATE_LOGS = 10
+
+
+class BaseSocket(object):
+ """
+ Thread safe socket, providing common socket functionality.
+ """
+
+ def __init__(self):
+ self._socket, self._socket_file = None, None
+ self._is_alive = False
+ self._connection_time = 0.0 # time when we last connected or disconnected
+
+ # Tracks sending and receiving separately. This should be safe, and doing
+ # so prevents deadlock where we block writes because we're waiting to read
+ # a message that isn't coming.
+
+ self._send_lock = threading.RLock()
+ self._recv_lock = threading.RLock()
+
+ def is_alive(self):
+ """
+ Checks if the socket is known to be closed. We won't be aware if it is
+ until we either use it or have explicitily shut it down.
+
+ In practice a socket derived from a port knows about its disconnection
+ after failing to receive data, whereas socket file derived connections
+ know after either sending or receiving data.
+
+ This means that to have reliable detection for when we're disconnected
+ you need to continually pull from the socket (which is part of what the
+ :class:`~stem.control.BaseController` does).
+
+ :returns: **bool** that's **True** if our socket is connected and **False**
+ otherwise
+ """
+
+ return self._is_alive
+
+ def is_localhost(self):
+ """
+ Returns if the connection is for the local system or not.
+
+ :returns: **bool** that's **True** if the connection is for the local host
+ and **False** otherwise
+ """
+
+ return False
+
+ def connection_time(self):
+ """
+ Provides the unix timestamp for when our socket was either connected or
+ disconnected. That is to say, the time we connected if we're currently
+ connected and the time we disconnected if we're not connected.
+
+ .. versionadded:: 1.3.0
+
+ :returns: **float** for when we last connected or disconnected, zero if
+ we've never connected
+ """
+
+ return self._connection_time
+
+ def connect(self):
+ """
+ Connects to a new socket, closing our previous one if we're already
+ attached.
+
+ :raises: :class:`stem.SocketError` if unable to make a socket
+ """
+
+ with self._send_lock:
+ # Closes the socket if we're currently attached to one. Once we're no
+ # longer alive it'll be safe to acquire the recv lock because recv()
+ # calls no longer block (raising SocketClosed instead).
+
+ if self.is_alive():
+ self.close()
+
+ with self._recv_lock:
+ self._socket = self._make_socket()
+ self._socket_file = self._socket.makefile(mode = 'rwb')
+ self._is_alive = True
+ self._connection_time = time.time()
+
+ # It's possible for this to have a transient failure...
+ # SocketError: [Errno 4] Interrupted system call
+ #
+ # It's safe to retry, so give it another try if it fails.
+
+ try:
+ self._connect()
+ except stem.SocketError:
+ self._connect() # single retry
+
+ def close(self):
+ """
+ Shuts down the socket. If it's already closed then this is a no-op.
+ """
+
+ with self._send_lock:
+ # Function is idempotent with one exception: we notify _close() if this
+ # is causing our is_alive() state to change.
+
+ is_change = self.is_alive()
+
+ if self._socket:
+ # if we haven't yet established a connection then this raises an error
+ # socket.error: [Errno 107] Transport endpoint is not connected
+
+ try:
+ self._socket.shutdown(socket.SHUT_RDWR)
+ except socket.error:
+ pass
+
+ self._socket.close()
+
+ if self._socket_file:
+ try:
+ self._socket_file.close()
+ except BrokenPipeError:
+ pass
+
+ self._socket = None
+ self._socket_file = None
+ self._is_alive = False
+ self._connection_time = time.time()
+
+ if is_change:
+ self._close()
+
+ def _send(self, message, handler):
+ """
+ Send message in a thread safe manner. Handler is expected to be of the form...
+
+ ::
+
+ my_handler(socket, socket_file, message)
+ """
+
+ with self._send_lock:
+ try:
+ if not self.is_alive():
+ raise stem.SocketClosed()
+
+ handler(self._socket, self._socket_file, message)
+ except stem.SocketClosed:
+ # if send_message raises a SocketClosed then we should properly shut
+ # everything down
+
+ if self.is_alive():
+ self.close()
+
+ raise
+
+ def _recv(self, handler):
+ """
+ Receives a message in a thread safe manner. Handler is expected to be of the form...
+
+ ::
+
+ my_handler(socket, socket_file)
+ """
+
+ with self._recv_lock:
+ try:
+ # makes a temporary reference to the _socket_file because connect()
+ # and close() may set or unset it
+
+ my_socket, my_socket_file = self._socket, self._socket_file
+
+ if not my_socket or not my_socket_file:
+ raise stem.SocketClosed()
+
+ return handler(my_socket, my_socket_file)
+ except stem.SocketClosed:
+ # If recv_message raises a SocketClosed then we should properly shut
+ # everything down. However, there's a couple cases where this will
+ # cause deadlock...
+ #
+ # * This SocketClosed was *caused by* a close() call, which is joining
+ # on our thread.
+ #
+ # * A send() call that's currently in flight is about to call close(),
+ # also attempting to join on us.
+ #
+ # To resolve this we make a non-blocking call to acquire the send lock.
+ # If we get it then great, we can close safely. If not then one of the
+ # above are in progress and we leave the close to them.
+
+ if self.is_alive():
+ if self._send_lock.acquire(False):
+ self.close()
+ self._send_lock.release()
+
+ raise
+
+ def _get_send_lock(self):
+ """
+ The send lock is useful to classes that interact with us at a deep level
+ because it's used to lock :func:`stem.socket.ControlSocket.connect` /
+ :func:`stem.socket.BaseSocket.close`, and by extension our
+ :func:`stem.socket.BaseSocket.is_alive` state changes.
+
+ :returns: **threading.RLock** that governs sending messages to our socket
+ and state changes
+ """
+
+ return self._send_lock
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exit_type, value, traceback):
+ self.close()
+
+ def _connect(self):
+ """
+ Connection callback that can be overwritten by subclasses and wrappers.
+ """
+
+ pass
+
+ def _close(self):
+ """
+ Disconnection callback that can be overwritten by subclasses and wrappers.
+ """
+
+ pass
+
+ def _make_socket(self):
+ """
+ Constructs and connects new socket. This is implemented by subclasses.
+
+ :returns: **socket.socket** for our configuration
+
+ :raises:
+ * :class:`stem.SocketError` if unable to make a socket
+ * **NotImplementedError** if not implemented by a subclass
+ """
+
+ raise NotImplementedError('Unsupported Operation: this should be implemented by the BaseSocket subclass')
+
+
+class RelaySocket(BaseSocket):
+ """
+ `Link-level connection
+ <https://gitweb.torproject.org/torspec.git/tree/tor-spec.txt>`_ to a Tor
+ relay.
+
+ .. versionadded:: 1.7.0
+
+ :var str address: address our socket connects to
+ :var int port: ORPort our socket connects to
+ """
+
+ def __init__(self, address = '127.0.0.1', port = 9050, connect = True):
+ """
+ RelaySocket constructor.
+
+ :param str address: ip address of the relay
+ :param int port: orport of the relay
+ :param bool connect: connects to the socket if True, leaves it unconnected otherwise
+
+ :raises: :class:`stem.SocketError` if connect is **True** and we're
+ unable to establish a connection
+ """
+
+ super(RelaySocket, self).__init__()
+ self.address = address
+ self.port = port
+
+ if connect:
+ self.connect()
+
+ def send(self, message):
+ """
+ Sends a message to the relay's ORPort.
+
+ :param str message: message to be formatted and sent to the socket
+
+ :raises:
+ * :class:`stem.SocketError` if a problem arises in using the socket
+ * :class:`stem.SocketClosed` if the socket is known to be shut down
+ """
+
+ self._send(message, lambda s, sf, msg: _write_to_socket(sf, msg))
+
+ def recv(self, timeout = None):
+ """
+ Receives a message from the relay.
+
+ :param float timeout: maxiumum number of seconds to await a response, this
+ blocks indefinitely if **None**
+
+ :returns: bytes for the message received
+
+ :raises:
+ * :class:`stem.ProtocolError` the content from the socket is malformed
+ * :class:`stem.SocketClosed` if the socket closes before we receive a complete message
+ """
+
+ def wrapped_recv(s, sf):
+ if timeout is None:
+ return s.recv()
+ else:
+ s.setblocking(0)
+ s.settimeout(timeout)
+
+ try:
+ return s.recv()
+ except (socket.timeout, ssl.SSLError, ssl.SSLWantReadError):
+ return None
+ finally:
+ s.setblocking(1)
+
+ return self._recv(wrapped_recv)
+
+ def is_localhost(self):
+ return self.address == '127.0.0.1'
+
+ def _make_socket(self):
+ try:
+ relay_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ relay_socket.connect((self.address, self.port))
+ return ssl.wrap_socket(relay_socket)
+ except socket.error as exc:
+ raise stem.SocketError(exc)
+
+
+class ControlSocket(BaseSocket):
+ """
+ Wrapper for a socket connection that speaks the Tor control protocol. To the
+ better part this transparently handles the formatting for sending and
+ receiving complete messages.
+
+ Callers should not instantiate this class directly, but rather use subclasses
+ which are expected to implement the **_make_socket()** method.
+ """
+
+ def __init__(self):
+ super(ControlSocket, self).__init__()
+
+ def send(self, message):
+ """
+ Formats and sends a message to the control socket. For more information see
+ the :func:`~stem.socket.send_message` function.
+
+ :param str message: message to be formatted and sent to the socket
+
+ :raises:
+ * :class:`stem.SocketError` if a problem arises in using the socket
+ * :class:`stem.SocketClosed` if the socket is known to be shut down
+ """
+
+ self._send(message, lambda s, sf, msg: send_message(sf, msg))
+
+ def recv(self):
+ """
+ Receives a message from the control socket, blocking until we've received
+ one. For more information see the :func:`~stem.socket.recv_message` function.
+
+ :returns: :class:`~stem.response.ControlMessage` for the message received
+
+ :raises:
+ * :class:`stem.ProtocolError` the content from the socket is malformed
+ * :class:`stem.SocketClosed` if the socket closes before we receive a complete message
+ """
+
+ return self._recv(lambda s, sf: recv_message(sf))
+
+
+class ControlPort(ControlSocket):
+ """
+ Control connection to tor. For more information see tor's ControlPort torrc
+ option.
+
+ :var str address: address our socket connects to
+ :var int port: ControlPort our socket connects to
+ """
+
+ def __init__(self, address = '127.0.0.1', port = 9051, connect = True):
+ """
+ ControlPort constructor.
+
+ :param str address: ip address of the controller
+ :param int port: port number of the controller
+ :param bool connect: connects to the socket if True, leaves it unconnected otherwise
+
+ :raises: :class:`stem.SocketError` if connect is **True** and we're
+ unable to establish a connection
+ """
+
+ super(ControlPort, self).__init__()
+ self.address = address
+ self.port = port
+
+ if connect:
+ self.connect()
+
+ def is_localhost(self):
+ return self.address == '127.0.0.1'
+
+ def _make_socket(self):
+ try:
+ control_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ control_socket.connect((self.address, self.port))
+ return control_socket
+ except socket.error as exc:
+ raise stem.SocketError(exc)
+
+
+class ControlSocketFile(ControlSocket):
+ """
+ Control connection to tor. For more information see tor's ControlSocket torrc
+ option.
+
+ :var str path: filesystem path of the socket we connect to
+ """
+
+ def __init__(self, path = '/var/run/tor/control', connect = True):
+ """
+ ControlSocketFile constructor.
+
+ :param str socket_path: path where the control socket is located
+ :param bool connect: connects to the socket if True, leaves it unconnected otherwise
+
+ :raises: :class:`stem.SocketError` if connect is **True** and we're
+ unable to establish a connection
+ """
+
+ super(ControlSocketFile, self).__init__()
+ self.path = path
+
+ if connect:
+ self.connect()
+
+ def is_localhost(self):
+ return True
+
+ def _make_socket(self):
+ try:
+ control_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ control_socket.connect(self.path)
+ return control_socket
+ except socket.error as exc:
+ raise stem.SocketError(exc)
+
+
+def send_message(control_file, message, raw = False):
+ """
+ Sends a message to the control socket, adding the expected formatting for
+ single verses multi-line messages. Neither message type should contain an
+ ending newline (if so it'll be treated as a multi-line message with a blank
+ line at the end). If the message doesn't contain a newline then it's sent
+ as...
+
+ ::
+
+ <message>\\r\\n
+
+ and if it does contain newlines then it's split on ``\\n`` and sent as...
+
+ ::
+
+ +<line 1>\\r\\n
+ <line 2>\\r\\n
+ <line 3>\\r\\n
+ .\\r\\n
+
+ :param file control_file: file derived from the control socket (see the
+ socket's makefile() method for more information)
+ :param str message: message to be sent on the control socket
+ :param bool raw: leaves the message formatting untouched, passing it to the
+ socket as-is
+
+ :raises:
+ * :class:`stem.SocketError` if a problem arises in using the socket
+ * :class:`stem.SocketClosed` if the socket is known to be shut down
+ """
+
+ if not raw:
+ message = send_formatting(message)
+
+ _write_to_socket(control_file, message)
+
+ if log.is_tracing():
+ log_message = message.replace('\r\n', '\n').rstrip()
+ msg_div = '\n' if '\n' in log_message else ' '
+ log.trace('Sent to tor:%s%s' % (msg_div, log_message))
+
+
+def _write_to_socket(socket_file, message):
+ try:
+ socket_file.write(stem.util.str_tools._to_bytes(message))
+ socket_file.flush()
+ except socket.error as exc:
+ log.info('Failed to send: %s' % exc)
+
+ # When sending there doesn't seem to be a reliable method for
+ # distinguishing between failures from a disconnect verses other things.
+ # Just accounting for known disconnection responses.
+
+ if str(exc) == '[Errno 32] Broken pipe':
+ raise stem.SocketClosed(exc)
+ else:
+ raise stem.SocketError(exc)
+ except AttributeError:
+ # if the control_file has been closed then flush will receive:
+ # AttributeError: 'NoneType' object has no attribute 'sendall'
+
+ log.info('Failed to send: file has been closed')
+ raise stem.SocketClosed('file has been closed')
+
+
+def recv_message(control_file, arrived_at = None):
+ """
+ Pulls from a control socket until we either have a complete message or
+ encounter a problem.
+
+ :param file control_file: file derived from the control socket (see the
+ socket's makefile() method for more information)
+
+ :returns: :class:`~stem.response.ControlMessage` read from the socket
+
+ :raises:
+ * :class:`stem.ProtocolError` the content from the socket is malformed
+ * :class:`stem.SocketClosed` if the socket closes before we receive
+ a complete message
+ """
+
+ parsed_content, raw_content, first_line = None, None, True
+
+ while True:
+ try:
+ line = control_file.readline()
+ except AttributeError:
+ # if the control_file has been closed then we will receive:
+ # AttributeError: 'NoneType' object has no attribute 'recv'
+
+ log.info(ERROR_MSG % ('SocketClosed', 'socket file has been closed'))
+ raise stem.SocketClosed('socket file has been closed')
+ except (OSError, ValueError) as exc:
+ # when disconnected this errors with...
+ #
+ # * ValueError: I/O operation on closed file
+ # * OSError: [Errno 107] Transport endpoint is not connected
+ # * OSError: [Errno 9] Bad file descriptor
+
+ log.info(ERROR_MSG % ('SocketClosed', 'received exception "%s"' % exc))
+ raise stem.SocketClosed(exc)
+
+ # Parses the tor control lines. These are of the form...
+ # <status code><divider><content>\r\n
+
+ if not line:
+ # if the socket is disconnected then the readline() method will provide
+ # empty content
+
+ log.info(ERROR_MSG % ('SocketClosed', 'empty socket content'))
+ raise stem.SocketClosed('Received empty socket content.')
+ elif not MESSAGE_PREFIX.match(line):
+ log.info(ERROR_MSG % ('ProtocolError', 'malformed status code/divider, "%s"' % log.escape(line)))
+ raise stem.ProtocolError('Badly formatted reply line: beginning is malformed')
+ elif not line.endswith(b'\r\n'):
+ log.info(ERROR_MSG % ('ProtocolError', 'no CRLF linebreak, "%s"' % log.escape(line)))
+ raise stem.ProtocolError('All lines should end with CRLF')
+
+ status_code, divider, content = line[:3], line[3:4], line[4:-2] # strip CRLF off content
+
+ status_code = stem.util.str_tools._to_unicode(status_code)
+ divider = stem.util.str_tools._to_unicode(divider)
+
+ # Most controller responses are single lines, in which case we don't need
+ # so much overhead.
+
+ if first_line:
+ if divider == ' ':
+ _log_trace(line)
+ return stem.response.ControlMessage([(status_code, divider, content)], line, arrived_at = arrived_at)
+ else:
+ parsed_content, raw_content, first_line = [], bytearray(), False
+
+ raw_content += line
+
+ if divider == '-':
+ # mid-reply line, keep pulling for more content
+ parsed_content.append((status_code, divider, content))
+ elif divider == ' ':
+ # end of the message, return the message
+ parsed_content.append((status_code, divider, content))
+ _log_trace(bytes(raw_content))
+ return stem.response.ControlMessage(parsed_content, bytes(raw_content), arrived_at = arrived_at)
+ elif divider == '+':
+ # data entry, all of the following lines belong to the content until we
+ # get a line with just a period
+
+ content_block = bytearray(content)
+
+ while True:
+ try:
+ line = control_file.readline()
+ raw_content += line
+ except socket.error as exc:
+ log.info(ERROR_MSG % ('SocketClosed', 'received an exception while mid-way through a data reply (exception: "%s", read content: "%s")' % (exc, log.escape(bytes(raw_content)))))
+ raise stem.SocketClosed(exc)
+
+ if not line.endswith(b'\r\n'):
+ log.info(ERROR_MSG % ('ProtocolError', 'CRLF linebreaks missing from a data reply, "%s"' % log.escape(bytes(raw_content))))
+ raise stem.ProtocolError('All lines should end with CRLF')
+ elif line == b'.\r\n':
+ break # data block termination
+
+ line = line[:-2] # strips off the CRLF
+
+ # lines starting with a period are escaped by a second period (as per
+ # section 2.4 of the control-spec)
+
+ if line.startswith(b'..'):
+ line = line[1:]
+
+ content_block += b'\n' + line
+
+ # joins the content using a newline rather than CRLF separator (more
+ # conventional for multi-line string content outside the windows world)
+
+ parsed_content.append((status_code, divider, bytes(content_block)))
+ else:
+ # this should never be reached due to the prefix regex, but might as well
+ # be safe...
+
+ log.warn(ERROR_MSG % ('ProtocolError', "\"%s\" isn't a recognized divider type" % divider))
+ raise stem.ProtocolError("Unrecognized divider type '%s': %s" % (divider, stem.util.str_tools._to_unicode(line)))
+
+
+def send_formatting(message):
+ """
+ Performs the formatting expected from sent control messages. For more
+ information see the :func:`~stem.socket.send_message` function.
+
+ :param str message: message to be formatted
+
+ :returns: **str** of the message wrapped by the formatting expected from
+ controllers
+ """
+
+ # From control-spec section 2.2...
+ # Command = Keyword OptArguments CRLF / "+" Keyword OptArguments CRLF CmdData
+ # Keyword = 1*ALPHA
+ # OptArguments = [ SP *(SP / VCHAR) ]
+ #
+ # A command is either a single line containing a Keyword and arguments, or a
+ # multiline command whose initial keyword begins with +, and whose data
+ # section ends with a single "." on a line of its own.
+
+ # if we already have \r\n entries then standardize on \n to start with
+ message = message.replace('\r\n', '\n')
+
+ if '\n' in message:
+ return '+%s\r\n.\r\n' % message.replace('\n', '\r\n')
+ else:
+ return message + '\r\n'
+
+
+def _log_trace(response):
+ if not log.is_tracing():
+ return
+
+ log_message = stem.util.str_tools._to_unicode(response.replace(b'\r\n', b'\n').rstrip())
+ log_message_lines = log_message.split('\n')
+
+ if TRUNCATE_LOGS and len(log_message_lines) > TRUNCATE_LOGS:
+ log_message = '\n'.join(log_message_lines[:TRUNCATE_LOGS] + ['... %i more lines...' % (len(log_message_lines) - TRUNCATE_LOGS)])
+
+ if len(log_message_lines) > 2:
+ log.trace('Received from tor:\n%s' % log_message)
+ else:
+ log.trace('Received from tor: %s' % log_message.replace('\n', '\\n'))
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits