diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 876ad3154..6d1dd079e 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -646,7 +646,7 @@ def _cleanup_factory(self): _self = weakref.proxy(self) def wrapper(): try: - _self.close(timeout=0, null_logger=True) + _self.close(timeout=1, null_logger=True) except (ReferenceError, AttributeError): pass return wrapper diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 4e32b9854..e362a26d7 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -310,7 +310,7 @@ def _maybe_send_pending_request(self): return False log.debug("%s: Sending transactional request %s", str(self), next_request_handler.request) - while not self._force_close: + while self._running and not self._force_close: target_node = None try: if next_request_handler.needs_coordinator(): @@ -325,7 +325,8 @@ def _maybe_send_pending_request(self): else: target_node = self._client.least_loaded_node() if target_node is None: - self._client.poll(future=self._metadata.request_update()) + self._client.poll(timeout_ms=self.config['retry_backoff_ms'], + future=self._metadata.request_update()) elif not self._client.await_ready(target_node, timeout_ms=self.config['request_timeout_ms']): continue diff --git a/test/producer/test_kip679_config.py b/test/producer/test_kip679_config.py index 9169e7603..cfc11e7db 100644 --- a/test/producer/test_kip679_config.py +++ b/test/producer/test_kip679_config.py @@ -12,66 +12,52 @@ # same process. Observable behavior on the producer instance (enable_idempotence # flag, _transaction_manager presence, config['acks']) is what callers actually # depend on, and is what these tests verify. +# +# These producers point at the default localhost:9092 with no broker running, so +# the `with KafkaProducer(...) as p` form relies on close() tearing down the +# Sender thread even though it never connects (see Sender._maybe_send_pending_request). class TestDefaultsAreIdempotent: """KIP-679: defaults instantiate an idempotent producer with acks=all.""" def test_bare_construct_is_idempotent(self): - p = KafkaProducer(api_version=(0, 11)) - try: + with KafkaProducer(api_version=(0, 11)) as p: assert p.config['enable_idempotence'] is True assert p.config['acks'] == -1 assert p._transaction_manager is not None assert p._transaction_manager.is_transactional() is False - finally: - p.close(timeout=0) def test_opt_out_disables_idempotence_but_keeps_acks_default(self): - p = KafkaProducer(enable_idempotence=False, api_version=(0, 11)) - try: + with KafkaProducer(enable_idempotence=False, api_version=(0, 11)) as p: assert p.config['enable_idempotence'] is False assert p.config['acks'] == -1 assert p._transaction_manager is None - finally: - p.close(timeout=0) class TestDefaultIdempotenceSilentDisable: """User-provided incompatible config + default idempotence -> silent disable.""" def test_acks_1_silently_disables(self): - p = KafkaProducer(acks=1, api_version=(0, 11)) - try: + with KafkaProducer(acks=1, api_version=(0, 11)) as p: assert p.config['enable_idempotence'] is False assert p.config['acks'] == 1 assert p._transaction_manager is None - finally: - p.close(timeout=0) def test_acks_0_silently_disables(self): - p = KafkaProducer(acks=0, api_version=(0, 11)) - try: + with KafkaProducer(acks=0, api_version=(0, 11)) as p: assert p.config['enable_idempotence'] is False assert p._transaction_manager is None - finally: - p.close(timeout=0) def test_retries_zero_silently_disables(self): - p = KafkaProducer(retries=0, api_version=(0, 11)) - try: + with KafkaProducer(retries=0, api_version=(0, 11)) as p: assert p.config['enable_idempotence'] is False assert p._transaction_manager is None - finally: - p.close(timeout=0) def test_max_in_flight_too_high_silently_disables(self): - p = KafkaProducer(max_in_flight_requests_per_connection=10, api_version=(0, 11)) - try: + with KafkaProducer(max_in_flight_requests_per_connection=10, api_version=(0, 11)) as p: assert p.config['enable_idempotence'] is False assert p._transaction_manager is None - finally: - p.close(timeout=0) class TestOldBrokerFallback: @@ -79,14 +65,11 @@ class TestOldBrokerFallback: acks=-1 retained (acks=all is valid on any wire version).""" def test_old_broker_silently_disables(self): - p = KafkaProducer(api_version=(0, 10)) - try: + with KafkaProducer(api_version=(0, 10)) as p: assert p.config['enable_idempotence'] is False # Per KIP-679 + project decision: acks stays at -1 even on old brokers. assert p.config['acks'] == -1 assert p._transaction_manager is None - finally: - p.close(timeout=0) class TestExplicitIdempotenceConflicts: @@ -139,16 +122,13 @@ def test_multiple_conflicts_named_together(self): def test_acks_all_string_accepted(self): """'all' normalizes to -1 before the conflict check fires.""" - p = KafkaProducer( + with KafkaProducer( enable_idempotence=True, acks='all', api_version=(0, 11), - ) - try: + ) as p: assert p.config['acks'] == -1 assert p._transaction_manager is not None - finally: - p.close(timeout=0) def test_old_broker_with_explicit_idempotence_raises(self): with pytest.raises(Errors.KafkaConfigurationError, match="api_version"):