# Source code for h11._connection

# This contains the main Connection class. Everything in h11 revolves around
# this.
from typing import Any, Callable, cast, Dict, List, Optional, Tuple, Type, Union

from ._events import (
from ._headers import get_comma_header, has_expect_100_continue, set_comma_header
from ._readers import READERS, ReadersType
from ._receivebuffer import ReceiveBuffer
from ._state import (
from ._util import (  # Import the internal things we need
from ._writers import WRITERS, WritersType

# Everything in __all__ gets re-exported as part of the h11 public API.
__all__ = ["Connection", "NEED_DATA", "PAUSED"]

class NEED_DATA(Sentinel, metaclass=Sentinel):
    # Special constant returned by Connection.next_event when the receive
    # buffer does not yet hold a complete event: the caller must read more
    # data from the peer and hand it to Connection.receive_data first.
    pass


class PAUSED(Sentinel, metaclass=Sentinel):
    # Special constant returned by Connection.next_event when the connection
    # is not in a state where incoming data can be processed (e.g. the peer
    # has finished its part of the current cycle, or a protocol switch is
    # pending or has happened).
    pass


# Upper bound on how many bytes may sit in the receive buffer without forming
# a complete, parseable event; once exceeded, we error out. The only time we
# really buffer is while reading the request/response line together with the
# headers, so in effect this caps the combined size of those.
#
# Some precedents for defaults:
# - node.js: 80 * 1024
# - tomcat: 8 * 1024
# - IIS: 16 * 1024
# - Apache: <8 KiB per line>
DEFAULT_MAX_INCOMPLETE_EVENT_SIZE = 16 * 1024


# RFC 7230's rules for connection lifecycles:
# - If either side says they want to close the connection, then the connection
#   must close.
# - HTTP/1.1 defaults to keep-alive unless someone says Connection: close
# - HTTP/1.0 defaults to close unless both sides say Connection: keep-alive
#   (and even this is a mess -- e.g. if you're implementing a proxy then
#   sending Connection: keep-alive is forbidden).
#
# We simplify life by simply not supporting keep-alive with HTTP/1.0 peers. So
# our rule is:
# - If someone says Connection: close, we will close
# - If someone uses HTTP/1.0, we will close.
def _keep_alive(event: Union[Request, Response]) -> bool:
    """Return True if *event* leaves the connection eligible for reuse.

    The answer is False when the event carries ``Connection: close`` or when
    its ``http_version`` sorts below ``b"1.1"``.
    """
    connection_tokens = get_comma_header(event.headers, b"connection")
    # An event without an http_version attribute is treated as HTTP/1.1.
    return b"close" not in connection_tokens and not (
        getattr(event, "http_version", b"1.1") < b"1.1"
    )


def _body_framing(
    request_method: bytes, event: Union[Request, Response]
) -> Tuple[str, Union[Tuple[()], Tuple[int]]]:
    """Work out how the body that follows *event* is framed.

    Called when we enter SEND_BODY. The result is a ``(lookup key, args)``
    pair used to construct the body reader/writer object, and is one of::

        ("content-length", (count,))
        ("chunked", ())
        ("http/1.0", ())

    Reference: RFC 7230 section 3.3.3.

    Args:
        request_method: Method of the request this body belongs to.
        event: The Request or Response that opened the body.
    """
    # These are the only two events that can trigger a SEND_BODY state:
    assert type(event) in (Request, Response)

    empty_body: Tuple[str, Tuple[int]] = ("content-length", (0,))

    # Step 1: some responses always have an empty body, regardless of what the
    # headers say.
    if type(event) is Response:
        status = event.status_code
        successful_connect = request_method == b"CONNECT" and 200 <= status < 300
        if status in (204, 304) or request_method == b"HEAD" or successful_connect:
            return empty_body
        # Section 3.3.3 also lists another case -- responses with status_code
        # < 200. For us these are InformationalResponses, not Responses, so
        # they can't get into this function in the first place.
        assert status >= 200

    # Step 2: check for Transfer-Encoding (T-E beats C-L):
    te_values = get_comma_header(event.headers, b"transfer-encoding")
    if te_values:
        assert te_values == [b"chunked"]
        return ("chunked", ())

    # Step 3: check for Content-Length
    cl_values = get_comma_header(event.headers, b"content-length")
    if cl_values:
        return ("content-length", (int(cl_values[0]),))

    # Step 4: no applicable headers; fallback/default depends on type
    return empty_body if type(event) is Request else ("http/1.0", ())


################################################################
#
# The main Connection class
#
################################################################


class Connection:
    """An object encapsulating the state of an HTTP connection.

    Args:
        our_role: If you're implementing a client, pass :data:`h11.CLIENT`. If
            you're implementing a server, pass :data:`h11.SERVER`.

        max_incomplete_event_size (int):
            The maximum number of bytes we're willing to buffer of an
            incomplete event. In practice this mostly sets a limit on the
            maximum size of the request/response line + headers. If this is
            exceeded, then :meth:`next_event` will raise
            :exc:`RemoteProtocolError`.

    Raises:
        ValueError: If *our_role* is neither CLIENT nor SERVER.

    """

    def __init__(
        self,
        our_role: Type[Sentinel],
        max_incomplete_event_size: int = DEFAULT_MAX_INCOMPLETE_EVENT_SIZE,
    ) -> None:
        self._max_incomplete_event_size = max_incomplete_event_size
        # State and role tracking
        if our_role not in (CLIENT, SERVER):
            raise ValueError("expected CLIENT or SERVER, not {!r}".format(our_role))
        self.our_role = our_role
        # The peer always plays the opposite role.
        self.their_role: Type[Sentinel] = SERVER if our_role is CLIENT else CLIENT
        self._cstate = ConnectionState()

        # Callables for converting data->events or vice-versa given the
        # current state
        self._writer = self._get_io_object(self.our_role, None, WRITERS)
        self._reader = self._get_io_object(self.their_role, None, READERS)

        # Holds any unprocessed received data
        self._receive_buffer = ReceiveBuffer()
        # If this is true, then it indicates that the incoming connection was
        # closed *after* the end of whatever's in self._receive_buffer:
        self._receive_buffer_closed = False

        # Extra bits of state that don't fit into the state machine.
        #
        # These two are only used to interpret framing headers for figuring
        # out how to read/write response bodies. their_http_version is also
        # made available as a convenient public API.
        self.their_http_version: Optional[bytes] = None
        self._request_method: Optional[bytes] = None
        # This is pure flow-control and doesn't at all affect the set of legal
        # transitions, so no need to bother ConnectionState with it:
        self.client_is_waiting_for_100_continue = False

    @property
    def states(self) -> Dict[Type[Sentinel], Type[Sentinel]]:
        """A dictionary like::

           {CLIENT: <client state>, SERVER: <server state>}

        See :ref:`state-machine` for details.

        """
        # Hand out a copy so callers cannot mutate the state machine's dict.
        return dict(self._cstate.states)

    @property
    def our_state(self) -> Type[Sentinel]:
        """The current state of whichever role we are playing. See
        :ref:`state-machine` for details.
        """
        return self._cstate.states[self.our_role]

    @property
    def their_state(self) -> Type[Sentinel]:
        """The current state of whichever role we are NOT playing. See
        :ref:`state-machine` for details.
        """
        return self._cstate.states[self.their_role]

    @property
    def they_are_waiting_for_100_continue(self) -> bool:
        """True if the peer is the client and is currently waiting for a
        100-continue, i.e. it sent a request with ``Expect: 100-continue``
        and no response or request body event has been processed since.
        """
        return self.their_role is CLIENT and self.client_is_waiting_for_100_continue

    def start_next_cycle(self) -> None:
        """Attempt to reset our connection state for a new request/response
        cycle.

        If both client and server are in :data:`DONE` state, then resets them
        both to :data:`IDLE` state in preparation for a new request/response
        cycle on this same connection. Otherwise, raises a
        :exc:`LocalProtocolError`.

        See :ref:`keepalive-and-pipelining`.

        """
        old_states = dict(self._cstate.states)
        self._cstate.start_next_cycle()
        self._request_method = None
        # self.their_http_version gets left alone, since it presumably lasts
        # beyond a single request/response cycle
        assert not self.client_is_waiting_for_100_continue
        self._respond_to_state_changes(old_states)

    def _process_error(self, role: Type[Sentinel]) -> None:
        # Tell the state machine that *role* hit an error, then refresh the
        # reader/writer for any state that changed as a result.
        old_states = dict(self._cstate.states)
        self._cstate.process_error(role)
        self._respond_to_state_changes(old_states)

    def _server_switch_event(self, event: Event) -> Optional[Type[Sentinel]]:
        # Return the protocol-switch marker that this server event accepts
        # (101 -> Upgrade; 2xx Response while a CONNECT proposal is pending
        # -> CONNECT), or None if it accepts nothing.
        if type(event) is InformationalResponse and event.status_code == 101:
            return _SWITCH_UPGRADE
        if type(event) is Response:
            if (
                _SWITCH_CONNECT in self._cstate.pending_switch_proposals
                and 200 <= event.status_code < 300
            ):
                return _SWITCH_CONNECT
        return None

    # All events go through here
    def _process_event(self, role: Type[Sentinel], event: Event) -> None:
        # First, pass the event through the state machine to make sure it
        # succeeds.
        old_states = dict(self._cstate.states)
        if role is CLIENT and type(event) is Request:
            if event.method == b"CONNECT":
                self._cstate.process_client_switch_proposal(_SWITCH_CONNECT)
            if get_comma_header(event.headers, b"upgrade"):
                self._cstate.process_client_switch_proposal(_SWITCH_UPGRADE)
        server_switch_event = None
        if role is SERVER:
            server_switch_event = self._server_switch_event(event)
        self._cstate.process_event(role, type(event), server_switch_event)

        # Then perform the updates triggered by it.

        if type(event) is Request:
            self._request_method = event.method

        if role is self.their_role and type(event) in (
            Request,
            Response,
            InformationalResponse,
        ):
            event = cast(Union[Request, Response, InformationalResponse], event)
            self.their_http_version = event.http_version

        # Keep alive handling
        #
        # RFC 7230 doesn't really say what one should do if Connection: close
        # shows up on a 1xx InformationalResponse. I think the idea is that
        # this is not supposed to happen. In any case, if it does happen, we
        # ignore it.
        if type(event) in (Request, Response) and not _keep_alive(
            cast(Union[Request, Response], event)
        ):
            self._cstate.process_keep_alive_disabled()

        # 100-continue
        if type(event) is Request and has_expect_100_continue(event):
            self.client_is_waiting_for_100_continue = True
        if type(event) in (InformationalResponse, Response):
            self.client_is_waiting_for_100_continue = False
        if role is CLIENT and type(event) in (Data, EndOfMessage):
            self.client_is_waiting_for_100_continue = False

        self._respond_to_state_changes(old_states, event)

    def _get_io_object(
        self,
        role: Type[Sentinel],
        event: Optional[Event],
        io_dict: Union[ReadersType, WritersType],
    ) -> Optional[Callable[..., Any]]:
        # Look up the reader/writer callable matching *role*'s current state.
        # event may be None; it's only used when entering SEND_BODY
        state = self._cstate.states[role]
        if state is SEND_BODY:
            # Special case: the io_dict has a dict of reader/writer factories
            # that depend on the request/response framing.
            framing_type, args = _body_framing(
                cast(bytes, self._request_method), cast(Union[Request, Response], event)
            )
            return io_dict[SEND_BODY][framing_type](*args)  # type: ignore[index]
        else:
            # General case: the io_dict just has the appropriate reader/writer
            # for this state
            return io_dict.get((role, state))  # type: ignore[return-value]

    # This must be called after any action that might have caused
    # self._cstate.states to change.
    def _respond_to_state_changes(
        self,
        old_states: Dict[Type[Sentinel], Type[Sentinel]],
        event: Optional[Event] = None,
    ) -> None:
        # Update reader/writer
        if self.our_state != old_states[self.our_role]:
            self._writer = self._get_io_object(self.our_role, event, WRITERS)
        if self.their_state != old_states[self.their_role]:
            self._reader = self._get_io_object(self.their_role, event, READERS)

    @property
    def trailing_data(self) -> Tuple[bytes, bool]:
        """Data that has been received, but not yet processed, represented as
        a tuple with two elements, where the first is a byte-string containing
        the unprocessed data itself, and the second is a bool that is True if
        the receive connection was closed.

        See :ref:`switching-protocols` for discussion of why you'd want this.
        """
        return (bytes(self._receive_buffer), self._receive_buffer_closed)

    def receive_data(self, data: bytes) -> None:
        """Add data to our internal receive buffer.

        This does not actually do any processing on the data, just stores
        it. To trigger processing, you have to call :meth:`next_event`.

        Args:
            data (:term:`bytes-like object`):
                The new data that was just received.

                Special case: If *data* is an empty byte-string like ``b""``,
                then this indicates that the remote side has closed the
                connection (end of file). Normally this is convenient, because
                standard Python APIs like a file object's ``read`` or
                :meth:`socket.recv` use ``b""`` to indicate end-of-file, while
                other failures to read are indicated using other mechanisms
                like raising :exc:`TimeoutError`. When using such an API you
                can just blindly pass through whatever you get from ``read``
                to :meth:`receive_data`, and everything will work.

                But, if you have an API where reading an empty string is a
                valid non-EOF condition, then you need to be aware of this and
                make sure to check for such strings and avoid passing them to
                :meth:`receive_data`.

        Returns:
            Nothing, but after calling this you should call :meth:`next_event`
            to parse the newly received data.

        Raises:
            RuntimeError:
                Raised if you pass an empty *data*, indicating EOF, and then
                pass a non-empty *data*, indicating more data that somehow
                arrived after the EOF.

                (Calling ``receive_data(b"")`` multiple times is fine,
                and equivalent to calling it once.)

        """
        if data:
            if self._receive_buffer_closed:
                raise RuntimeError("received close, then received more data?")
            self._receive_buffer += data
        else:
            # Empty input is the EOF marker; repeating it is harmless.
            self._receive_buffer_closed = True

    def _extract_next_receive_event(
        self,
    ) -> Union[Event, Type[NEED_DATA], Type[PAUSED]]:
        # Pull the next event (or NEED_DATA / PAUSED) out of the receive
        # buffer without feeding it to the state machine.
        state = self.their_state
        # We don't pause immediately when they enter DONE, because even in
        # DONE state we can still process a ConnectionClosed() event. But
        # if we have data in our buffer, then we definitely aren't getting
        # a ConnectionClosed() immediately and we need to pause.
        if state is DONE and self._receive_buffer:
            return PAUSED
        if state is MIGHT_SWITCH_PROTOCOL or state is SWITCHED_PROTOCOL:
            return PAUSED
        assert self._reader is not None
        event = self._reader(self._receive_buffer)
        if event is None:
            if not self._receive_buffer and self._receive_buffer_closed:
                # In some unusual cases (basically just HTTP/1.0 bodies), EOF
                # triggers an actual protocol event; in that case, we want to
                # return that event, and then the state will change and we'll
                # get called again to generate the actual ConnectionClosed().
                if hasattr(self._reader, "read_eof"):
                    event = self._reader.read_eof()  # type: ignore[attr-defined]
                else:
                    event = ConnectionClosed()
        if event is None:
            event = NEED_DATA
        return event  # type: ignore[no-any-return]

    def next_event(self) -> Union[Event, Type[NEED_DATA], Type[PAUSED]]:
        """Parse the next event out of our receive buffer, update our internal
        state, and return it.

        This is a mutating operation -- think of it like calling :func:`next`
        on an iterator.

        Returns:
            One of three things:

            1) An event object -- see :ref:`events`.

            2) The special constant :data:`NEED_DATA`, which indicates that
               you need to read more data from your socket and pass it to
               :meth:`receive_data` before this method will be able to return
               any more events.

            3) The special constant :data:`PAUSED`, which indicates that we
               are not in a state where we can process incoming data (usually
               because the peer has finished their part of the current
               request/response cycle, and you have not yet called
               :meth:`start_next_cycle`). See :ref:`flow-control` for details.

        Raises:
            RemoteProtocolError:
                The peer has misbehaved. You should close the connection
                (possibly after sending some kind of 4xx response).

        Once this method returns :class:`ConnectionClosed` once, then all
        subsequent calls will also return :class:`ConnectionClosed`.

        If this method raises any exception besides :exc:`RemoteProtocolError`
        then that's a bug -- if it happens please file a bug report!

        If this method raises any exception then it also sets
        :attr:`Connection.their_state` to :data:`ERROR` -- see
        :ref:`error-handling` for discussion.

        """

        if self.their_state is ERROR:
            raise RemoteProtocolError("Can't receive data when peer state is ERROR")
        try:
            event = self._extract_next_receive_event()
            if event not in [NEED_DATA, PAUSED]:
                self._process_event(self.their_role, cast(Event, event))
            if event is NEED_DATA:
                if len(self._receive_buffer) > self._max_incomplete_event_size:
                    # 431 is "Request header fields too large" which is pretty
                    # much the only situation where we can get here
                    raise RemoteProtocolError(
                        "Receive buffer too long", error_status_hint=431
                    )
                if self._receive_buffer_closed:
                    # We're still trying to complete some event, but that's
                    # never going to happen because no more data is coming
                    raise RemoteProtocolError("peer unexpectedly closed connection")
            return event
        except BaseException as exc:
            # Any failure on the receive path puts the peer's side in ERROR.
            self._process_error(self.their_role)
            if isinstance(exc, LocalProtocolError):
                # The fault originated in data the peer sent, so surface it
                # as a remote protocol error.
                exc._reraise_as_remote_protocol_error()
            else:
                raise

    def send(self, event: Event) -> Optional[bytes]:
        """Convert a high-level event into bytes that can be sent to the peer,
        while updating our internal state machine.

        Args:
            event: The :ref:`event <events>` to send.

        Returns:
            If ``type(event) is ConnectionClosed``, then returns
            ``None``. Otherwise, returns a :term:`bytes-like object`.

        Raises:
            LocalProtocolError:
                Sending this event at this time would violate our
                understanding of the HTTP/1.1 protocol.

        If this method raises any exception then it also sets
        :attr:`Connection.our_state` to :data:`ERROR` -- see
        :ref:`error-handling` for discussion.

        """
        data_list = self.send_with_data_passthrough(event)
        if data_list is None:
            return None
        else:
            return b"".join(data_list)

    def send_with_data_passthrough(self, event: Event) -> Optional[List[bytes]]:
        """Identical to :meth:`send`, except that in situations where
        :meth:`send` returns a single :term:`bytes-like object`, this instead
        returns a list of them -- and when sending a :class:`Data` event, this
        list is guaranteed to contain the exact object you passed in as the
        :class:`Data` event's payload. See :ref:`sendfile` for discussion.

        """
        if self.our_state is ERROR:
            raise LocalProtocolError("Can't send data when our state is ERROR")
        try:
            if type(event) is Response:
                event = self._clean_up_response_headers_for_sending(event)
            # We want to call _process_event before calling the writer,
            # because if someone tries to do something invalid then this will
            # give a sensible error message, while our writers all just assume
            # they will only receive valid events. But, _process_event might
            # change self._writer. So we have to do a little dance:
            writer = self._writer
            self._process_event(self.our_role, event)
            if type(event) is ConnectionClosed:
                return None
            else:
                # In any situation where writer is None, process_event should
                # have raised ProtocolError
                assert writer is not None
                data_list: List[bytes] = []
                writer(event, data_list.append)
                return data_list
        except BaseException:
            # Explicitly catch everything (same reach as a bare ``except:``,
            # and consistent with next_event): mark our side as ERROR, then
            # let the original exception propagate unchanged.
            self._process_error(self.our_role)
            raise

    def send_failed(self) -> None:
        """Notify the state machine that we failed to send the data it gave
        us.

        This causes :attr:`Connection.our_state` to immediately become
        :data:`ERROR` -- see :ref:`error-handling` for discussion.

        """
        self._process_error(self.our_role)

    # When sending a Response, we take responsibility for a few things:
    #
    # - Sometimes you MUST set Connection: close. We take care of those
    #   times. (You can also set it yourself if you want, and if you do then
    #   we'll respect that and close the connection at the right time. But you
    #   don't have to worry about that unless you want to.)
    #
    # - The user has to set Content-Length if they want it. Otherwise, for
    #   responses that have bodies (e.g. not HEAD), then we will automatically
    #   select the right mechanism for streaming a body of unknown length,
    #   which depends on the peer's HTTP version.
    #
    # This function's *only* responsibility is making sure headers are set up
    # right -- everything downstream just looks at the headers. There are no
    # side channels.
    def _clean_up_response_headers_for_sending(self, response: Response) -> Response:
        assert type(response) is Response

        headers = response.headers
        need_close = False

        # HEAD requests need some special handling: they always act like they
        # have Content-Length: 0, and that's how _body_framing treats
        # them. But their headers are supposed to match what we would send if
        # the request was a GET. (Technically there is one deviation allowed:
        # we're allowed to leave out the framing headers -- see RFC 7231
        # section 4.3.2. But it's just as easy to get them right.)
        method_for_choosing_headers = cast(bytes, self._request_method)
        if method_for_choosing_headers == b"HEAD":
            method_for_choosing_headers = b"GET"
        framing_type, _ = _body_framing(method_for_choosing_headers, response)
        if framing_type in ("chunked", "http/1.0"):
            # This response has a body of unknown length.
            # If our peer is HTTP/1.1, we use Transfer-Encoding: chunked
            # If our peer is HTTP/1.0, we use no framing headers, and close the
            # connection afterwards.
            #
            # Make sure to clear Content-Length (in principle user could have
            # set both and then we ignored Content-Length b/c
            # Transfer-Encoding overwrote it -- this would be naughty of them,
            # but the HTTP spec says that if our peer does this then we have
            # to fix it instead of erroring out, so we'll accord the user the
            # same respect).
            headers = set_comma_header(headers, b"content-length", [])
            if self.their_http_version is None or self.their_http_version < b"1.1":
                # Either we never got a valid request and are sending back an
                # error (their_http_version is None), so we assume the worst;
                # or else we did get a valid HTTP/1.0 request, so we know that
                # they don't understand chunked encoding.
                headers = set_comma_header(headers, b"transfer-encoding", [])
                # This is actually redundant ATM, since currently we
                # unconditionally disable keep-alive when talking to HTTP/1.0
                # peers. But let's be defensive just in case we add
                # Connection: keep-alive support later:
                if self._request_method != b"HEAD":
                    need_close = True
            else:
                headers = set_comma_header(headers, b"transfer-encoding", [b"chunked"])

        if not self._cstate.keep_alive or need_close:
            # Make sure Connection: close is set
            connection = set(get_comma_header(headers, b"connection"))
            connection.discard(b"keep-alive")
            connection.add(b"close")
            headers = set_comma_header(headers, b"connection", sorted(connection))

        # Build a fresh Response rather than mutating the caller's object.
        return Response(
            headers=headers,
            status_code=response.status_code,
            http_version=response.http_version,
            reason=response.reason,
        )