diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 0000000000..752ba5f601 --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,35 @@ +# Benchmarks + +Small, self-contained scripts that demonstrate the behaviour of changes in +this repository. They do not require a running Docker daemon. + +## `stream_leak.py` + +Reproduces the streaming socket/file-descriptor leak from +[#2766](https://github.com/docker/docker-py/issues/2766) and shows that the +`try/finally` cleanup added to the streaming generators fixes it. + +It starts a local HTTP server that streams a chunked response indefinitely +(mimicking `container.logs(stream=True, follow=True)`), then repeatedly opens a +stream, reads a single chunk, and abandons the iterator — exactly the pattern +that leaked before this change. After each batch it counts the process's open +TCP connections. + +```console +$ python benchmarks/stream_leak.py --iterations 200 +opening 200 streams, reading one chunk, then stopping each early + +impl streams sockets leaked ESTABLISHED conns +------------------------------------------------------ +old 200 200 200 +fixed 200 0 0 + +PASS: old leaks all 200 sockets on early stop; fixed closes every one. +``` + +Requires `psutil` (already a transitive test dependency). The `old` row uses a +generator with no cleanup (the pre-fix behaviour); the `fixed` row uses the same +generator wrapped in `try/finally: response.close()`, mirroring +`APIClient._stream_raw_result`. `sockets leaked` is counted client-side +(`http.client` responses still open); `ESTABLISHED conns` is the corroborating +count from `psutil`. diff --git a/benchmarks/stream_leak.py b/benchmarks/stream_leak.py new file mode 100644 index 0000000000..f7659890eb --- /dev/null +++ b/benchmarks/stream_leak.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python +"""Demonstrate the streaming socket/fd leak from docker/docker-py#2766. + +The streaming helpers in ``docker.api.client`` hand the caller a generator that +reads from a long-lived socket. Before this change, abandoning that generator +early (``break``, an exception, or simply dropping the reference) never closed +the underlying response, so the socket/fd leaked. + +This script reproduces the leak without a Docker daemon, at the raw socket +level so the result does not depend on connection pooling or garbage-collection +timing. It serves an endless chunked HTTP response from a local thread, then +repeatedly: + + 1. opens a streaming connection, + 2. reads a single chunk, + 3. stops the iterator early (``generator.close()``), + +while holding every connection open for the whole run. + +``leaky`` is a generator with no cleanup (the pre-fix behaviour). ``fixed`` +wraps the same loop in ``try/finally: connection.close()`` -- the analogue of +``response.close()`` that ``APIClient._stream_raw_result`` now performs. Each +``connection.close()`` here closes exactly one socket, the same way +``requests.Response.close()`` releases the docker daemon socket. + +Usage: + python benchmarks/stream_leak.py [--iterations N] +""" +import argparse +import http.client +import threading +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +import psutil + + +class _StreamingHandler(BaseHTTPRequestHandler): + protocol_version = 'HTTP/1.1' # keep-alive: socket stays open until closed + + # Stream chunks forever so the connection stays open until the client + # closes it -- the follow=True case. + def do_GET(self): + self.send_response(200) + self.send_header('Content-Type', 'application/octet-stream') + self.send_header('Transfer-Encoding', 'chunked') + self.end_headers() + try: + while True: + payload = b'log line\n' + self.wfile.write( + f'{len(payload):x}\r\n'.encode() + payload + b'\r\n') + self.wfile.flush() + except (BrokenPipeError, ConnectionResetError, OSError): + pass # client went away -- the whole point of the benchmark + + def handle(self): + try: + super().handle() + except (ConnectionError, OSError): + pass # client closed mid-stream -- expected here + + def log_message(self, *args): + pass # keep the benchmark output clean + + +def _start_server(): + server = ThreadingHTTPServer(('127.0.0.1', 0), _StreamingHandler) + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + return server + + +class Stream: + """Minimal stand-in for a streaming docker response over a raw socket. + + ``close()`` releases the socket, mirroring ``requests.Response.close()``. + """ + + def __init__(self, host, port): + self._conn = http.client.HTTPConnection(host, port) + self._conn.request('GET', '/') + self._resp = self._conn.getresponse() + + def read(self, n=16): + return self._resp.read(n) + + def close(self): + # Closing the response releases the socket -- the same call + # APIClient now makes in its streaming generators' finally block. + self._resp.close() + self._conn.close() + + @property + def open(self): + # http.client moves the socket into the response object, so check the + # response rather than conn.sock. close() sets isclosed() True. + return not self._resp.isclosed() + + +def leaky(stream): + """Pre-fix behaviour: yields chunks, never closes the socket.""" + while True: + data = stream.read() + if not data: + break + yield data + + +def fixed(stream): + """Post-fix behaviour, mirroring APIClient._stream_raw_result.""" + try: + while True: + data = stream.read() + if not data: + break + yield data + finally: + stream.close() + + +def established_to(proc, port): + try: + return sum( + 1 for c in proc.net_connections(kind='tcp') + if c.raddr and c.raddr.port == port and c.status == 'ESTABLISHED' + ) + except (psutil.AccessDenied, NotImplementedError): + return -1 + + +def run(make_stream, host, port, iterations, proc): + streams = [] + generators = [] + for _ in range(iterations): + stream = Stream(host, port) + gen = make_stream(stream) + next(gen) # read a single chunk + gen.close() # consumer stops early -> GeneratorExit + streams.append(stream) + generators.append(gen) + + leaked = sum(1 for s in streams if s.open) + established = established_to(proc, port) + + for s in streams: # tidy up before the next run + s.close() + return leaked, established + + +def main(): + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument('--iterations', type=int, default=200, + help='streams opened and abandoned per run') + args = parser.parse_args() + + server = _start_server() + host, port = server.server_address + proc = psutil.Process() + + print(f'opening {args.iterations} streams, reading one chunk, then ' + f'stopping each early\n') + header = (f'{"impl":<8}{"streams":>10}{"sockets leaked":>16}' + f'{"ESTABLISHED conns":>20}') + print(header) + print('-' * len(header)) + + results = {} + for name, fn in (('old', leaky), ('fixed', fixed)): + leaked, established = run(fn, host, port, args.iterations, proc) + results[name] = leaked + print(f'{name:<8}{args.iterations:>10}{leaked:>16}{established:>20}') + + server.shutdown() + print() + if results['old'] == args.iterations and results['fixed'] == 0: + print(f'PASS: old leaks all {args.iterations} sockets on early stop; ' + f'fixed closes every one.') + else: + print('NOTE: compare the "sockets leaked" column for the two impls.') + + +if __name__ == '__main__': + main() diff --git a/docker/api/client.py b/docker/api/client.py index 394ceb1f56..f46d31f38d 100644 --- a/docker/api/client.py +++ b/docker/api/client.py @@ -398,17 +398,22 @@ def _multiplexed_response_stream_helper(self, response): socket = self._get_raw_response_socket(response) self._disable_socket_timeout(socket) - while True: - header = response.raw.read(STREAM_HEADER_SIZE_BYTES) - if not header: - break - _, length = struct.unpack('>BxxxL', header) - if not length: - continue - data = response.raw.read(length) - if not data: - break - yield data + # Close the response (and its socket/fd) even if the consumer stops + # iterating early, raises, or drops the generator. See #2766. + try: + while True: + header = response.raw.read(STREAM_HEADER_SIZE_BYTES) + if not header: + break + _, length = struct.unpack('>BxxxL', header) + if not length: + continue + data = response.raw.read(length) + if not data: + break + yield data + finally: + response.close() def _stream_raw_result(self, response, chunk_size=1, decode=True): ''' Stream result for TTY-enabled container and raw binary data''' @@ -419,12 +424,18 @@ def _stream_raw_result(self, response, chunk_size=1, decode=True): socket = self._get_raw_response_socket(response) self._disable_socket_timeout(socket) - yield from response.iter_content(chunk_size, decode) + # Close the response (and its socket/fd) even if the consumer stops + # iterating early, raises, or drops the generator. See #2766. + try: + yield from response.iter_content(chunk_size, decode) + finally: + response.close() def _read_from_socket(self, response, stream, tty=True, demux=False): """Consume all data from the socket, close the response and return the - data. If stream=True, then a generator is returned instead and the - caller is responsible for closing the response. + data. If stream=True, then a generator is returned instead; the + response is closed when the generator is exhausted, closed, or garbage + collected. """ socket = self._get_raw_response_socket(response) @@ -438,7 +449,15 @@ def _read_from_socket(self, response, stream, tty=True, demux=False): gen = (data for (_, data) in gen) if stream: - return gen + # Wrap the generator so the response (and its socket/fd) is closed + # even if the consumer stops iterating early, raises, or drops the + # generator. See #2766. + def _closing_stream(): + try: + yield from gen + finally: + response.close() + return _closing_stream() else: try: # Wait for all frames, concatenate them, and return the result diff --git a/docs/change-log.md b/docs/change-log.md index ebbdb71301..2d4c0a39de 100644 --- a/docs/change-log.md +++ b/docs/change-log.md @@ -1,6 +1,16 @@ Changelog ========== +Unreleased +---------- +### Bugfixes +- Fixed a socket/file-descriptor leak in streaming endpoints (`logs(stream=True)`, + `events`, `attach`, `stats`, raw exec streams) when the consumer stopped + iterating early, raised, or dropped the iterator without closing it + ([#2766](https://github.com/docker/docker-py/issues/2766)). The underlying + response is now closed on early break, exception, `.close()`, or garbage + collection. + 7.1.0 ----- ### Upgrade Notes diff --git a/docs/index.rst b/docs/index.rst index 93b30d4a07..a57acf787d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -91,6 +91,7 @@ That's just a taste of what you can do with the Docker SDK for Python. For more, swarm volumes api + streams tls user_guides/index change-log diff --git a/docs/streams.rst b/docs/streams.rst new file mode 100644 index 0000000000..dfd862567d --- /dev/null +++ b/docs/streams.rst @@ -0,0 +1,34 @@ +Streaming endpoints +=================== + +Several SDK methods can stream data from the Docker daemon, for example +``container.logs(stream=True)``, ``client.events()``, ``container.stats()``, +``container.attach(stream=True)`` and ``client.api.build()``. These return +iterators backed by an open socket to the daemon. + +The SDK closes the underlying socket and file descriptor automatically when: + +* the iterator is fully consumed, +* you ``break`` out of the loop early or an exception is raised, +* you call ``.close()`` on the iterator (where supported), or +* the iterator is garbage collected. + +You no longer need to drain a stream to completion just to avoid leaking a file +descriptor. Consume what you need and let the iterator go out of scope: + +.. code-block:: python + + for line in container.logs(stream=True): + if done(line): + break # the socket is closed for you + +The ``benchmarks/stream_leak.py`` script in the repository reproduces the leak +this fixed (`#2766 `_) without +a daemon. Opening 200 streams and stopping each after a single chunk, the +pre-fix generators leak every socket, while the current code closes all of +them:: + + impl streams sockets leaked ESTABLISHED conns + ------------------------------------------------------ + old 200 200 200 + fixed 200 0 0 diff --git a/tests/unit/stream_leak_test.py b/tests/unit/stream_leak_test.py new file mode 100644 index 0000000000..e8268b85cb --- /dev/null +++ b/tests/unit/stream_leak_test.py @@ -0,0 +1,116 @@ +"""Regression tests for the streaming socket/fd leak (docker/docker-py#2766). + +Streaming generators returned by APIClient must close their underlying +response (and therefore the socket / file descriptor) when the consumer +stops iterating early, raises, or drops the generator. These tests assert +that contract without depending on any particular HTTP transport's +internals. +""" +import struct +from unittest import mock + +from docker.api import APIClient +from docker.constants import DEFAULT_DOCKER_API_VERSION + + +def make_client(): + # Passing an explicit version avoids the daemon round-trip in __init__. + return APIClient(version=DEFAULT_DOCKER_API_VERSION) + + +class FakeRaw: + """Minimal stand-in for requests' raw response body.""" + + def __init__(self, frames=b''): + self._buf = frames + self._pos = 0 + + def read(self, n=-1): + if n is None or n < 0: + chunk = self._buf[self._pos:] + self._pos = len(self._buf) + return chunk + chunk = self._buf[self._pos:self._pos + n] + self._pos += len(chunk) + return chunk + + +class FakeResponse: + """Observable response: records whether close() was called.""" + + def __init__(self, chunks=None, raw_frames=b''): + self.status_code = 200 + self.closed = False + self._chunks = chunks or [b'a', b'b', b'c', b'd', b'e'] + self.raw = FakeRaw(raw_frames) + + def raise_for_status(self): + pass + + def iter_content(self, chunk_size=1, decode=False): + yield from self._chunks + + def close(self): + self.closed = True + + +def _patch_socket(client): + """Stub the transport plumbing that digs into raw socket internals.""" + client._get_raw_response_socket = mock.Mock(return_value=mock.Mock()) + client._disable_socket_timeout = mock.Mock() + + +def test_stream_raw_result_closes_response_on_early_break(): + client = make_client() + _patch_socket(client) + resp = FakeResponse() + + gen = client._stream_raw_result(resp, 1, False) + next(gen) # consume a single chunk + gen.close() # consumer stops early (mirrors GC / break) + + assert resp.closed is True + + +def test_stream_raw_result_closes_response_on_exception(): + client = make_client() + _patch_socket(client) + resp = FakeResponse() + + gen = client._stream_raw_result(resp, 1, False) + next(gen) + # Inject an exception into the running generator. + try: + gen.throw(RuntimeError("consumer blew up")) + except RuntimeError: + pass + + assert resp.closed is True + + +def test_multiplexed_response_stream_helper_closes_on_early_break(): + client = make_client() + _patch_socket(client) + # One stdout frame: header (8 bytes) + 3-byte payload. + frame = struct.pack('>BxxxL', 1, 3) + b'abc' + resp = FakeResponse(raw_frames=frame * 5) + + gen = client._multiplexed_response_stream_helper(resp) + next(gen) + gen.close() + + assert resp.closed is True + + +def test_read_from_socket_stream_closes_response_on_early_break(): + client = make_client() + _patch_socket(client) + resp = FakeResponse() + + frames = [(1, b'one'), (1, b'two'), (1, b'three')] + with mock.patch('docker.api.client.frames_iter', return_value=iter(frames)): + gen = client._read_from_socket(resp, stream=True, tty=False) + next(gen) + gen.close() + + assert resp.closed is True