35 changes: 35 additions & 0 deletions benchmarks/README.md
@@ -0,0 +1,35 @@
# Benchmarks

Small, self-contained scripts that demonstrate the behaviour of changes in
this repository. They do not require a running Docker daemon.

## `stream_leak.py`

Reproduces the streaming socket/file-descriptor leak from
[#2766](https://github.com/docker/docker-py/issues/2766) and shows that the
`try/finally` cleanup added to the streaming generators fixes it.

It starts a local HTTP server that streams a chunked response indefinitely
(mimicking `container.logs(stream=True, follow=True)`), then repeatedly opens a
stream, reads a single chunk, and abandons the iterator — exactly the pattern
that leaked before this change. After each batch it counts the process's open
TCP connections.

```console
$ python benchmarks/stream_leak.py --iterations 200
opening 200 streams, reading one chunk, then stopping each early

impl       streams  sockets leaked   ESTABLISHED conns
------------------------------------------------------
old            200             200                 200
fixed          200               0                   0

PASS: old leaks all 200 sockets on early stop; fixed closes every one.
```

Requires `psutil` (already a transitive test dependency). The `old` row uses a
generator with no cleanup (the pre-fix behaviour); the `fixed` row wraps the
same loop in `try`/`finally` and calls `stream.close()`, mirroring the
`response.close()` call in `APIClient._stream_raw_result`. `sockets leaked` is
counted client-side (`http.client` responses still open); `ESTABLISHED conns`
is the corroborating count from `psutil`.
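
The mechanism behind the `fixed` row can be seen in isolation with a toy
generator. The following is a minimal sketch, not code from this repository:
`FakeResponse`, `read_one_then_close` and `read_one_then_drop` are hypothetical
names used only for illustration. It relies on the Python rule that closing or
finalizing a suspended generator raises `GeneratorExit` at the `yield`, which
runs the enclosing `finally` block.

```python
class FakeResponse:
    """Hypothetical stand-in for a streaming response."""

    def __init__(self):
        self.closed = False

    def iter_chunks(self):
        while True:  # endless stream, like follow=True
            yield b'log line\n'

    def close(self):
        self.closed = True


def leaky(response):
    # No cleanup: stopping early leaves the response open.
    yield from response.iter_chunks()


def fixed(response):
    # The finally block runs on exhaustion, close(), and finalization.
    try:
        yield from response.iter_chunks()
    finally:
        response.close()


def read_one_then_close(make_stream):
    response = FakeResponse()
    gen = make_stream(response)
    next(gen)    # read a single chunk
    gen.close()  # raises GeneratorExit at the suspended yield
    return response.closed


def read_one_then_drop(make_stream):
    response = FakeResponse()
    gen = make_stream(response)
    next(gen)
    del gen  # CPython finalizes the generator at once via refcounting
    return response.closed


leaky_after_close = read_one_then_close(leaky)
fixed_after_close = read_one_then_close(fixed)
fixed_after_drop = read_one_then_drop(fixed)
print(leaky_after_close, fixed_after_close, fixed_after_drop)
```

On CPython this prints `False True True`. Interpreters without reference
counting may delay the "drop" case until a garbage-collection pass, so calling
`.close()` explicitly is the deterministic option.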
183 changes: 183 additions & 0 deletions benchmarks/stream_leak.py
@@ -0,0 +1,183 @@
#!/usr/bin/env python
"""Demonstrate the streaming socket/fd leak from docker/docker-py#2766.

The streaming helpers in ``docker.api.client`` hand the caller a generator that
reads from a long-lived socket. Before this change, abandoning that generator
early (``break``, an exception, or simply dropping the reference) never closed
the underlying response, so the socket/fd leaked.

This script reproduces the leak without a Docker daemon, at the raw socket
level so the result does not depend on connection pooling or garbage-collection
timing. It serves an endless chunked HTTP response from a local thread, then
repeatedly:

1. opens a streaming connection,
2. reads a single chunk,
3. stops the iterator early (``generator.close()``),

while holding every connection open for the whole run.

``leaky`` is a generator with no cleanup (the pre-fix behaviour). ``fixed``
wraps the same loop in ``try/finally: stream.close()`` -- the analogue of the
``response.close()`` that ``APIClient._stream_raw_result`` now performs. Each
``stream.close()`` here closes exactly one socket, the same way
``requests.Response.close()`` releases the docker daemon socket.

Usage:
    python benchmarks/stream_leak.py [--iterations N]
"""
import argparse
import http.client
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

import psutil


class _StreamingHandler(BaseHTTPRequestHandler):
    protocol_version = 'HTTP/1.1'  # keep-alive: socket stays open until closed

    # Stream chunks forever so the connection stays open until the client
    # closes it -- the follow=True case.
    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-Type', 'application/octet-stream')
        self.send_header('Transfer-Encoding', 'chunked')
        self.end_headers()
        try:
            while True:
                payload = b'log line\n'
                self.wfile.write(
                    f'{len(payload):x}\r\n'.encode() + payload + b'\r\n')
                self.wfile.flush()
        except (BrokenPipeError, ConnectionResetError, OSError):
            pass  # client went away -- the whole point of the benchmark

    def handle(self):
        try:
            super().handle()
        except (ConnectionError, OSError):
            pass  # client closed mid-stream -- expected here

    def log_message(self, *args):
        pass  # keep the benchmark output clean


def _start_server():
    server = ThreadingHTTPServer(('127.0.0.1', 0), _StreamingHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    return server


class Stream:
    """Minimal stand-in for a streaming docker response over a raw socket.

    ``close()`` releases the socket, mirroring ``requests.Response.close()``.
    """

    def __init__(self, host, port):
        self._conn = http.client.HTTPConnection(host, port)
        self._conn.request('GET', '/')
        self._resp = self._conn.getresponse()

    def read(self, n=16):
        return self._resp.read(n)

    def close(self):
        # Closing the response releases the socket -- the same call
        # APIClient now makes in its streaming generators' finally block.
        self._resp.close()
        self._conn.close()

    @property
    def open(self):
        # http.client moves the socket into the response object, so check the
        # response rather than conn.sock. close() sets isclosed() True.
        return not self._resp.isclosed()


def leaky(stream):
    """Pre-fix behaviour: yields chunks, never closes the socket."""
    while True:
        data = stream.read()
        if not data:
            break
        yield data


def fixed(stream):
    """Post-fix behaviour, mirroring APIClient._stream_raw_result."""
    try:
        while True:
            data = stream.read()
            if not data:
                break
            yield data
    finally:
        stream.close()


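def established_to(proc, port):
    # Count this process's ESTABLISHED TCP connections to the server port.
    # Process.net_connections() needs psutil >= 6.0; earlier releases name
    # it connections().
    try:
        return sum(
            1 for c in proc.net_connections(kind='tcp')
            if c.raddr and c.raddr.port == port and c.status == 'ESTABLISHED'
        )
    except (psutil.AccessDenied, NotImplementedError):
        return -1  # count unavailable on this platform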


def run(make_stream, host, port, iterations, proc):
    streams = []
    generators = []
    for _ in range(iterations):
        stream = Stream(host, port)
        gen = make_stream(stream)
        next(gen)    # read a single chunk
        gen.close()  # consumer stops early -> GeneratorExit
        streams.append(stream)
        generators.append(gen)

    leaked = sum(1 for s in streams if s.open)
    established = established_to(proc, port)

    for s in streams:  # tidy up before the next run
        s.close()
    return leaked, established


def main():
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--iterations', type=int, default=200,
                        help='streams opened and abandoned per run')
    args = parser.parse_args()

    server = _start_server()
    host, port = server.server_address
    proc = psutil.Process()

    print(f'opening {args.iterations} streams, reading one chunk, then '
          f'stopping each early\n')
    header = (f'{"impl":<8}{"streams":>10}{"sockets leaked":>16}'
              f'{"ESTABLISHED conns":>20}')
    print(header)
    print('-' * len(header))

    results = {}
    for name, fn in (('old', leaky), ('fixed', fixed)):
        leaked, established = run(fn, host, port, args.iterations, proc)
        results[name] = leaked
        print(f'{name:<8}{args.iterations:>10}{leaked:>16}{established:>20}')

    server.shutdown()
    print()
    if results['old'] == args.iterations and results['fixed'] == 0:
        print(f'PASS: old leaks all {args.iterations} sockets on early stop; '
              f'fixed closes every one.')
    else:
        print('NOTE: compare the "sockets leaked" column for the two impls.')


if __name__ == '__main__':
    main()
49 changes: 34 additions & 15 deletions docker/api/client.py
@@ -398,17 +398,22 @@ def _multiplexed_response_stream_helper(self, response):
         socket = self._get_raw_response_socket(response)
         self._disable_socket_timeout(socket)

-        while True:
-            header = response.raw.read(STREAM_HEADER_SIZE_BYTES)
-            if not header:
-                break
-            _, length = struct.unpack('>BxxxL', header)
-            if not length:
-                continue
-            data = response.raw.read(length)
-            if not data:
-                break
-            yield data
+        # Close the response (and its socket/fd) even if the consumer stops
+        # iterating early, raises, or drops the generator. See #2766.
+        try:
+            while True:
+                header = response.raw.read(STREAM_HEADER_SIZE_BYTES)
+                if not header:
+                    break
+                _, length = struct.unpack('>BxxxL', header)
+                if not length:
+                    continue
+                data = response.raw.read(length)
+                if not data:
+                    break
+                yield data
+        finally:
+            response.close()

     def _stream_raw_result(self, response, chunk_size=1, decode=True):
         ''' Stream result for TTY-enabled container and raw binary data'''
@@ -419,12 +424,18 @@ def _stream_raw_result(self, response, chunk_size=1, decode=True):
         socket = self._get_raw_response_socket(response)
         self._disable_socket_timeout(socket)

-        yield from response.iter_content(chunk_size, decode)
+        # Close the response (and its socket/fd) even if the consumer stops
+        # iterating early, raises, or drops the generator. See #2766.
+        try:
+            yield from response.iter_content(chunk_size, decode)
+        finally:
+            response.close()

     def _read_from_socket(self, response, stream, tty=True, demux=False):
         """Consume all data from the socket, close the response and return the
-        data. If stream=True, then a generator is returned instead and the
-        caller is responsible for closing the response.
+        data. If stream=True, then a generator is returned instead; the
+        response is closed when the generator is exhausted, closed, or garbage
+        collected.
         """
         socket = self._get_raw_response_socket(response)

@@ -438,7 +449,15 @@ def _read_from_socket(self, response, stream, tty=True, demux=False):
             gen = (data for (_, data) in gen)

         if stream:
-            return gen
+            # Wrap the generator so the response (and its socket/fd) is closed
+            # even if the consumer stops iterating early, raises, or drops the
+            # generator. See #2766.
+            def _closing_stream():
+                try:
+                    yield from gen
+                finally:
+                    response.close()
+            return _closing_stream()
         else:
             try:
                 # Wait for all frames, concatenate them, and return the result
10 changes: 10 additions & 0 deletions docs/change-log.md
@@ -1,6 +1,16 @@
Changelog
==========

Unreleased
----------
### Bugfixes
- Fixed a socket/file-descriptor leak in streaming endpoints (`logs(stream=True)`,
`events`, `attach`, `stats`, raw exec streams) when the consumer stopped
iterating early, raised, or dropped the iterator without closing it
([#2766](https://github.com/docker/docker-py/issues/2766)). The underlying
response is now closed on early break, exception, `.close()`, or garbage
collection.

7.1.0
-----
### Upgrade Notes
1 change: 1 addition & 0 deletions docs/index.rst
@@ -91,6 +91,7 @@ That's just a taste of what you can do with the Docker SDK for Python. For more,
swarm
volumes
api
streams
tls
user_guides/index
change-log
34 changes: 34 additions & 0 deletions docs/streams.rst
@@ -0,0 +1,34 @@
Streaming endpoints
===================

Several SDK methods can stream data from the Docker daemon, for example
``container.logs(stream=True)``, ``client.events()``, ``container.stats()``,
``container.attach(stream=True)`` and ``client.api.build()``. These return
iterators backed by an open socket to the daemon.

The SDK closes the underlying socket and file descriptor automatically when:

* the iterator is fully consumed,
* you ``break`` out of the loop early or an exception is raised,
* you call ``.close()`` on the iterator (where supported), or
* the iterator is garbage collected.

You no longer need to drain a stream to completion just to avoid leaking a file
descriptor. Consume what you need and let the iterator go out of scope:

.. code-block:: python

    for line in container.logs(stream=True):
        if done(line):
            break  # the socket is closed for you

The ``benchmarks/stream_leak.py`` script in the repository reproduces the leak
this fixed (`#2766 <https://github.com/docker/docker-py/issues/2766>`_) without
a daemon. Opening 200 streams and stopping each after a single chunk, the
pre-fix generators leak every socket, while the current code closes all of
them::

    impl       streams  sockets leaked   ESTABLISHED conns
    ------------------------------------------------------
    old            200             200                 200
    fixed          200               0                   0