From e26679df8315aad804b1a31fe3d86aa411a9afe8 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 14 Dec 2023 10:19:31 -0800 Subject: [PATCH] KPO write_logs does not need to use complicated pod logs reader --- .../cncf/kubernetes/operators/pod.py | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 8a701b675a19d..0ccbc26159ced 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -625,7 +625,7 @@ def execute_complete(self, context: Context, event: dict, **kwargs): if event["status"] in ("error", "failed", "timeout"): # fetch some logs when pod is failed if self.get_logs: - self.write_logs(pod) + self._write_logs(pod) if "stack_trace" in event: message = f"{event['message']}\n{event['stack_trace']}" else: @@ -634,7 +634,7 @@ def execute_complete(self, context: Context, event: dict, **kwargs): elif event["status"] == "success": # fetch some logs when pod is executed successfully if self.get_logs: - self.write_logs(pod) + self._write_logs(pod) if self.do_xcom_push: xcom_sidecar_output = self.extract_xcom(pod=pod) @@ -651,22 +651,21 @@ def execute_complete(self, context: Context, event: dict, **kwargs): remote_pod=pod, ) - def write_logs(self, pod: k8s.V1Pod): + def _write_logs(self, pod: k8s.V1Pod): try: - logs = self.pod_manager.read_pod_logs( - pod=pod, + logs = self.client.read_namespaced_pod_log( + name=pod.metadata.name, + namespace=pod.metadata.namespace, container_name=self.base_container_name, + timestamps=False, follow=False, + _preload_content=False, ) for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace").rstrip("\n") - self.log.info("Container logs: %s", line) + self.log.info("[%s] %s", self.base_container_name, line) except HTTPError as e: - self.log.warning( - "Reading of logs interrupted with error %r; will retry. " - "Set log level to DEBUG for traceback.", - e, - ) + self.log.warning("Reading of logs interrupted with error %r;", e) def post_complete_action(self, *, pod, remote_pod, **kwargs): """Actions that must be done after operator finishes logic of the deferrable_execution."""