From 053ebaeb6dbc4237b94129414a89e7ff494f6b16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EB=8F=99=EC=9B=90?= Date: Mon, 11 Sep 2023 14:35:08 +0900 Subject: [PATCH 1/4] Remove duplicated logs by reusing PodLogsConsumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * reuse the kubernetes connection not to re-consume the logs * If a failure occurs while consuming logs through `PodLogsConsumer`, a new `PodLogsConsumer` is created. * But, at this time there are duplicated logs even though they have already been consumed. * To fix the duplicated logs, `PodLogsConsumer` instance is created initially at once and is to reused when a failure occurs to prevent duplicate logs from occurring. Signed-off-by: 김동원 --- .../cncf/kubernetes/utils/pod_manager.py | 26 +++++++------------ 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 142528ddad639..8d119e89bbfca 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -390,9 +390,7 @@ def fetch_container_logs( wait=tenacity.wait_fixed(1), before=before_log(self.log, logging.INFO), ) - def consume_logs( - *, since_time: DateTime | None = None, follow: bool = True, termination_timeout: int = 120 - ) -> DateTime | None: + def consume_logs(*, logs: PodLogsConsumer) -> DateTime | None: """ Tries to follow container logs until container completes. @@ -403,16 +401,6 @@ def consume_logs( """ last_captured_timestamp = None try: - logs = self.read_pod_logs( - pod=pod, - container_name=container_name, - timestamps=True, - since_seconds=( - math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None - ), - follow=follow, - post_termination_timeout=termination_timeout, - ) for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace") line_timestamp, message = self.parse_log_line(line) @@ -438,11 +426,17 @@ def consume_logs( # note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to # loop as we do here. But in a long-running process we might temporarily lose connectivity. # So the looping logic is there to let us resume following the logs. + logs = self.read_pod_logs( + pod=pod, + container_name=container_name, + timestamps=True, + since_seconds=(math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None), + follow=follow, + post_termination_timeout=post_termination_timeout, + ) last_log_time = since_time while True: - last_log_time = consume_logs( - since_time=last_log_time, follow=follow, termination_timeout=post_termination_timeout - ) + last_log_time = consume_logs(logs=logs) if not self.container_is_running(pod, container_name=container_name): return PodLoggingStatus(running=False, last_log_time=last_log_time) if not follow: From 1b7d854027602137c98d220abab63483e92f2e89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EB=8F=99=EC=9B=90?= Date: Mon, 11 Sep 2023 16:34:18 +0900 Subject: [PATCH 2/4] Move the try inside the connection creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 김동원 --- .../cncf/kubernetes/utils/pod_manager.py | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 8d119e89bbfca..a3a4beec351ce 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -390,7 +390,13 @@ def fetch_container_logs( wait=tenacity.wait_fixed(1), before=before_log(self.log, logging.INFO), ) - def consume_logs(*, logs: PodLogsConsumer) -> DateTime | None: + def consume_logs( + *, + since_time: DateTime | None = None, + follow: bool = True, + termination_timeout: int = 120, + logs: PodLogsConsumer | None, + ) -> tuple[DateTime | None, PodLogsConsumer | None]: """ Tries to follow container logs until container completes. @@ -401,6 +407,17 @@ def consume_logs(*, logs: PodLogsConsumer) -> DateTime | None: """ last_captured_timestamp = None try: + if not logs: + logs = self.read_pod_logs( + pod=pod, + container_name=container_name, + timestamps=True, + since_seconds=( + math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None + ), + follow=follow, + post_termination_timeout=post_termination_timeout, + ) for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace") line_timestamp, message = self.parse_log_line(line) @@ -421,22 +438,20 @@ def consume_logs(*, logs: PodLogsConsumer) -> DateTime | None: pod.metadata.name, exc_info=True, ) - return last_captured_timestamp or since_time + return last_captured_timestamp or since_time, logs # note: `read_pod_logs` follows the logs, so we shouldn't necessarily *need* to # loop as we do here. But in a long-running process we might temporarily lose connectivity. # So the looping logic is there to let us resume following the logs. - logs = self.read_pod_logs( - pod=pod, - container_name=container_name, - timestamps=True, - since_seconds=(math.ceil((pendulum.now() - since_time).total_seconds()) if since_time else None), - follow=follow, - post_termination_timeout=post_termination_timeout, - ) + logs = None last_log_time = since_time while True: - last_log_time = consume_logs(logs=logs) + last_log_time, logs = consume_logs( + since_time=last_log_time, + follow=follow, + termination_timeout=post_termination_timeout, + logs=logs, + ) if not self.container_is_running(pod, container_name=container_name): return PodLoggingStatus(running=False, last_log_time=last_log_time) if not follow: From 663ea698639491effa24e20a9769e49f7fa46445 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EB=8F=99=EC=9B=90?= Date: Mon, 18 Sep 2023 23:11:43 +0900 Subject: [PATCH 3/4] Validate reusing PodLogsConsumer instance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 김동원 --- .../cncf/kubernetes/utils/test_pod_manager.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index dfe06d9a74648..88ee73269abbc 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -284,6 +284,30 @@ def test_fetch_container_logs_invoke_progress_callback( self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) self.mock_progress_callback.assert_has_calls([mock.call(message), mock.call(no_ts_message)]) + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") + def test_fetch_container_logs_failures(self, mock_container_is_running): + last_timestamp_string = "2020-10-08T14:18:17.793417674Z" + messages = [ + bytes("2020-10-08T14:16:17.793417674Z message", "utf-8"), + bytes("2020-10-08T14:17:17.793417674Z message", "utf-8"), + None, + bytes(f"{last_timestamp_string} message", "utf-8"), + ] + PodLogsConsumer.messages = messages + + def consumer_iter(): + while messages: + message = messages.pop(0) + if message is None: + raise BaseHTTPError("Boom") + yield message + + with mock.patch.object(PodLogsConsumer, "__iter__") as mock_consumer_iter: + mock_consumer_iter.side_effect = consumer_iter + mock_container_is_running.side_effect = [True, True, False] + status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) + assert status.last_log_time == cast(DateTime, pendulum.parse(last_timestamp_string)) + def test_parse_invalid_log_line(self, caplog): with caplog.at_level(logging.INFO): self.pod_manager.parse_log_line("2020-10-08T14:16:17.793417674ZInvalidmessage\n") From d71f2f4ad4b0d059cbe5fe3d46a29def8344a114 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EB=8F=99=EC=9B=90?= Date: Mon, 18 Sep 2023 23:28:26 +0900 Subject: [PATCH 4/4] Check there are duplicated logs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 김동원 --- tests/providers/cncf/kubernetes/utils/test_pod_manager.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py index 88ee73269abbc..42e2a5c6104da 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -293,7 +293,7 @@ def test_fetch_container_logs_failures(self, mock_container_is_running): None, bytes(f"{last_timestamp_string} message", "utf-8"), ] - PodLogsConsumer.messages = messages + expected_call_count = len([message for message in messages if message is not None]) def consumer_iter(): while messages: @@ -306,7 +306,8 @@ def consumer_iter(): mock_consumer_iter.side_effect = consumer_iter mock_container_is_running.side_effect = [True, True, False] status = self.pod_manager.fetch_container_logs(mock.MagicMock(), mock.MagicMock(), follow=True) - assert status.last_log_time == cast(DateTime, pendulum.parse(last_timestamp_string)) + assert status.last_log_time == cast(DateTime, pendulum.parse(last_timestamp_string)) + assert self.mock_progress_callback.call_count == expected_call_count def test_parse_invalid_log_line(self, caplog): with caplog.at_level(logging.INFO):