Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ def _cleanup_factory(self):
_self = weakref.proxy(self)
def wrapper():
try:
_self.close(timeout=0, null_logger=True)
_self.close(timeout=1, null_logger=True)
except (ReferenceError, AttributeError):
pass
return wrapper
Expand Down
5 changes: 3 additions & 2 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def _maybe_send_pending_request(self):
return False

log.debug("%s: Sending transactional request %s", str(self), next_request_handler.request)
while not self._force_close:
while self._running and not self._force_close:
target_node = None
try:
if next_request_handler.needs_coordinator():
Expand All @@ -325,7 +325,8 @@ def _maybe_send_pending_request(self):
else:
target_node = self._client.least_loaded_node()
if target_node is None:
self._client.poll(future=self._metadata.request_update())
self._client.poll(timeout_ms=self.config['retry_backoff_ms'],
future=self._metadata.request_update())
elif not self._client.await_ready(target_node, timeout_ms=self.config['request_timeout_ms']):
continue

Expand Down
46 changes: 13 additions & 33 deletions test/producer/test_kip679_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,81 +12,64 @@
# same process. Observable behavior on the producer instance (enable_idempotence
# flag, _transaction_manager presence, config['acks']) is what callers actually
# depend on, and is what these tests verify.
#
# These producers point at the default localhost:9092 with no broker running, so
# the `with KafkaProducer(...) as p` form relies on close() tearing down the
# Sender thread even though it never connects (see Sender._maybe_send_pending_request).


class TestDefaultsAreIdempotent:
"""KIP-679: defaults instantiate an idempotent producer with acks=all."""

def test_bare_construct_is_idempotent(self):
p = KafkaProducer(api_version=(0, 11))
try:
with KafkaProducer(api_version=(0, 11)) as p:
assert p.config['enable_idempotence'] is True
assert p.config['acks'] == -1
assert p._transaction_manager is not None
assert p._transaction_manager.is_transactional() is False
finally:
p.close(timeout=0)

def test_opt_out_disables_idempotence_but_keeps_acks_default(self):
p = KafkaProducer(enable_idempotence=False, api_version=(0, 11))
try:
with KafkaProducer(enable_idempotence=False, api_version=(0, 11)) as p:
assert p.config['enable_idempotence'] is False
assert p.config['acks'] == -1
assert p._transaction_manager is None
finally:
p.close(timeout=0)


class TestDefaultIdempotenceSilentDisable:
"""User-provided incompatible config + default idempotence -> silent disable."""

def test_acks_1_silently_disables(self):
p = KafkaProducer(acks=1, api_version=(0, 11))
try:
with KafkaProducer(acks=1, api_version=(0, 11)) as p:
assert p.config['enable_idempotence'] is False
assert p.config['acks'] == 1
assert p._transaction_manager is None
finally:
p.close(timeout=0)

def test_acks_0_silently_disables(self):
p = KafkaProducer(acks=0, api_version=(0, 11))
try:
with KafkaProducer(acks=0, api_version=(0, 11)) as p:
assert p.config['enable_idempotence'] is False
assert p._transaction_manager is None
finally:
p.close(timeout=0)

def test_retries_zero_silently_disables(self):
p = KafkaProducer(retries=0, api_version=(0, 11))
try:
with KafkaProducer(retries=0, api_version=(0, 11)) as p:
assert p.config['enable_idempotence'] is False
assert p._transaction_manager is None
finally:
p.close(timeout=0)

def test_max_in_flight_too_high_silently_disables(self):
p = KafkaProducer(max_in_flight_requests_per_connection=10, api_version=(0, 11))
try:
with KafkaProducer(max_in_flight_requests_per_connection=10, api_version=(0, 11)) as p:
assert p.config['enable_idempotence'] is False
assert p._transaction_manager is None
finally:
p.close(timeout=0)


class TestOldBrokerFallback:
"""Defaults against a pre-0.11 broker: idempotence silently disabled,
acks=-1 retained (acks=all is valid on any wire version)."""

def test_old_broker_silently_disables(self):
p = KafkaProducer(api_version=(0, 10))
try:
with KafkaProducer(api_version=(0, 10)) as p:
assert p.config['enable_idempotence'] is False
# Per KIP-679 + project decision: acks stays at -1 even on old brokers.
assert p.config['acks'] == -1
assert p._transaction_manager is None
finally:
p.close(timeout=0)


class TestExplicitIdempotenceConflicts:
Expand Down Expand Up @@ -139,16 +122,13 @@ def test_multiple_conflicts_named_together(self):

def test_acks_all_string_accepted(self):
"""'all' normalizes to -1 before the conflict check fires."""
p = KafkaProducer(
with KafkaProducer(
enable_idempotence=True,
acks='all',
api_version=(0, 11),
)
try:
) as p:
assert p.config['acks'] == -1
assert p._transaction_manager is not None
finally:
p.close(timeout=0)

def test_old_broker_with_explicit_idempotence_raises(self):
with pytest.raises(Errors.KafkaConfigurationError, match="api_version"):
Expand Down