From 456a2b6ca67eee95590e81809f2bd3722880b6dc Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 12 Jul 2024 16:28:44 -0700 Subject: [PATCH 1/3] Add task instance event log table --- .../endpoints/event_log_endpoint.py | 38 +++++++++++ .../api_connexion/schemas/event_log_schema.py | 2 + .../schemas/task_event_log_schema.py | 40 ++++++++++++ .../auth/managers/models/resource_details.py | 1 + airflow/executors/base_executor.py | 33 +++++----- airflow/jobs/scheduler_job_runner.py | 27 +++++++- ...0147_2_10_0_add_task_instance_event_log.py | 64 +++++++++++++++++++ airflow/models/taskinstance.py | 62 ++++++++++++------ .../amazon/aws/executors/ecs/ecs_executor.py | 30 ++++----- .../cncf/kubernetes/utils/pod_manager.py | 2 +- .../fab/auth_manager/fab_auth_manager.py | 2 + .../auth_manager/security_manager/override.py | 1 + airflow/security/permissions.py | 1 + airflow/www/security_manager.py | 2 + 14 files changed, 253 insertions(+), 52 deletions(-) create mode 100644 airflow/api_connexion/schemas/task_event_log_schema.py create mode 100644 airflow/migrations/versions/0147_2_10_0_add_task_instance_event_log.py diff --git a/airflow/api_connexion/endpoints/event_log_endpoint.py b/airflow/api_connexion/endpoints/event_log_endpoint.py index 3b3dbe6efd490..3cd6f7c9f1d32 100644 --- a/airflow/api_connexion/endpoints/event_log_endpoint.py +++ b/airflow/api_connexion/endpoints/event_log_endpoint.py @@ -30,6 +30,7 @@ ) from airflow.auth.managers.models.resource_details import DagAccessEntity from airflow.models import Log +from airflow.models.taskinstance import TaskEventLog from airflow.utils import timezone from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session @@ -112,3 +113,40 @@ def get_event_logs( return event_log_collection_schema.dump( EventLogCollection(event_logs=event_logs, total_entries=total_entries) ) + + +@security.requires_access_dag("GET", DagAccessEntity.TASK_EVENT_LOG) +@format_parameters({"limit": check_limit}) +@provide_session +def get_task_event_logs( + *, + dag_id: str | None = None, + task_id: str | None = None, + run_id: str | None = None, + map_index: int | None = None, + try_number: int | None = None, + limit: int, + offset: int | None = None, + session: Session = NEW_SESSION, +) -> APIResponse: + """Get all log entries from event log.""" + query = select(TaskEventLog) + + if dag_id: + query = query.where(TaskEventLog.dag_id == dag_id) + if task_id: + query = query.where(TaskEventLog.task_id == task_id) + if run_id: + query = query.where(TaskEventLog.run_id == run_id) + if map_index: + query = query.where(TaskEventLog.map_index == map_index) + if try_number: + query = query.where(TaskEventLog.try_number == try_number) + + total_entries = get_query_count(query, session=session) + + query = query.order_by(TaskEventLog.id) + event_logs = session.scalars(query.offset(offset).limit(limit)).all() + return event_log_collection_schema.dump( + EventLogCollection(event_logs=event_logs, total_entries=total_entries) + ) diff --git a/airflow/api_connexion/schemas/event_log_schema.py b/airflow/api_connexion/schemas/event_log_schema.py index da1fc73cc1697..e14591906c563 100644 --- a/airflow/api_connexion/schemas/event_log_schema.py +++ b/airflow/api_connexion/schemas/event_log_schema.py @@ -21,6 +21,7 @@ from marshmallow import Schema, fields from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field +from airflow.api_connexion.schemas.task_event_log_schema import TaskEventLogSchema from airflow.models.log import Log @@ -54,6 +55,7 @@ class EventLogCollectionSchema(Schema): """EventLog Collection Schema.""" event_logs = fields.List(fields.Nested(EventLogSchema)) + task_event_logs = fields.List(fields.Nested(TaskEventLogSchema)) total_entries = fields.Int() diff --git a/airflow/api_connexion/schemas/task_event_log_schema.py b/airflow/api_connexion/schemas/task_event_log_schema.py new file mode 100644 index 0000000000000..43c44d7ddfb89 --- /dev/null +++ b/airflow/api_connexion/schemas/task_event_log_schema.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +from typing import NamedTuple + +from marshmallow import Schema, fields +from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field + +from airflow.models.taskinstance import TaskEventLog + + +class TaskEventLogSchema(SQLAlchemySchema): + """Event log schema.""" + + class Meta: + """Meta.""" + + model = TaskEventLog + + id = auto_field(dump_only=True) + dag_id = auto_field(dump_only=True) + task_id = auto_field(dump_only=True) + run_id = auto_field(dump_only=True) + map_index = auto_field(dump_only=True) + try_number = auto_field(dump_only=True) + message = auto_field(dump_only=True) + + +class TaskEventLogCollection(NamedTuple): + """List of import errors with metadata.""" + + event_logs: list[TaskEventLog] + total_entries: int + + +class TaskEventLogCollectionSchema(Schema): + """EventLog Collection Schema.""" + + event_logs = fields.List(fields.Nested(TaskEventLogSchema)) + task_event_logs = fields.List(fields.Nested(TaskEventLogSchema)) + total_entries = fields.Int() diff --git a/airflow/auth/managers/models/resource_details.py b/airflow/auth/managers/models/resource_details.py index fcbee5a2ad299..456702d8858a8 100644 --- a/airflow/auth/managers/models/resource_details.py +++ b/airflow/auth/managers/models/resource_details.py @@ -80,6 +80,7 @@ class DagAccessEntity(Enum): """Enum of DAG entities the user tries to access.""" AUDIT_LOG = "AUDIT_LOG" + TASK_EVENT_LOG = "TASK_EVENT_LOG" CODE = "CODE" DEPENDENCIES = "DEPENDENCIES" RUN = "RUN" diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 153f0d3360e93..22e8bacf7f69b 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -21,9 +21,8 @@ import logging import sys import warnings -from collections import defaultdict +from collections import defaultdict, deque from dataclasses import dataclass, field -from functools import cached_property from typing import TYPE_CHECKING, Any, List, Optional, Sequence, Tuple import pendulum @@ -33,7 +32,6 @@ from airflow.exceptions import RemovedInAirflow3Warning from airflow.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.log.task_context_logger import TaskContextLogger from airflow.utils.state import TaskInstanceState PARALLELISM: int = conf.getint("core", "PARALLELISM") @@ -131,6 +129,16 @@ def __init__(self, parallelism: int = PARALLELISM): self.queued_tasks: dict[TaskInstanceKey, QueuedTaskInstanceType] = {} self.running: set[TaskInstanceKey] = set() self.event_buffer: dict[TaskInstanceKey, EventBufferValueType] = {} + self._task_event_logs: deque[tuple[TaskInstanceKey, str]] = deque() + """ + Deque for storing task event log messages. + + This attribute is only internally public and should not be manipulated + directly by subclasses. + + :meta private: + """ + self.attempts: dict[TaskInstanceKey, RunningRetryAttemptType] = defaultdict(RunningRetryAttemptType) def __repr__(self): @@ -139,6 +147,10 @@ def __repr__(self): def start(self): # pragma: no cover """Executors may need to get things started.""" + def log_task_event(self, *, ti_key: TaskInstanceKey, description:str): + """Log an event to the task instance event log.""" + self._task_event_logs.append((ti_key, description)) + def queue_command( self, task_instance: TaskInstance, @@ -286,12 +298,11 @@ def trigger_tasks(self, open_slots: int) -> None: self.log.info("queued but still running; attempt=%s task=%s", attempt.total_tries, key) continue # Otherwise, we give up and remove the task from the queue. - self.send_message_to_task_logs( - logging.ERROR, + + self.log.error( "could not queue task %s (still running after %d attempts).", key, attempt.total_tries, - ti=ti, ) del self.attempts[key] del self.queued_tasks[key] @@ -523,16 +534,6 @@ def send_callback(self, request: CallbackRequest) -> None: raise ValueError("Callback sink is not ready.") self.callback_sink.send(request) - @cached_property - def _task_context_logger(self) -> TaskContextLogger: - return TaskContextLogger( - component_name="Executor", - call_site_logger=self.log, - ) - - def send_message_to_task_logs(self, level: int, msg: str, *args, ti: TaskInstance | TaskInstanceKey): - self._task_context_logger._log(level, msg, *args, ti=ti) - @staticmethod def get_cli_commands() -> list[GroupCommand]: """ diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 1d49f412b0d7f..60e30d365aa96 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -24,7 +24,7 @@ import sys import time import warnings -from collections import Counter, defaultdict +from collections import Counter, defaultdict, deque from dataclasses import dataclass from datetime import timedelta from functools import lru_cache, partial @@ -55,7 +55,7 @@ TaskOutletDatasetReference, ) from airflow.models.serialized_dag import SerializedDagModel -from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance +from airflow.models.taskinstance import SimpleTaskInstance, TaskEventLog, TaskInstance from airflow.stats import Stats from airflow.ti_deps.dependencies_states import EXECUTION_STATES from airflow.timetables.simple import DatasetTriggeredTimetable @@ -740,6 +740,21 @@ def _critical_section_enqueue_task_instances(self, session: Session) -> int: return len(queued_tis) + @staticmethod + def _process_task_event_logs(log_records: deque[tuple[TaskInstanceKey, str]], session: Session): + while log_records: + ti_key, description = log_records.popleft() + session.add( + TaskEventLog( + task_id=ti_key.task_id, + dag_id=ti_key.dag_id, + run_id=ti_key.run_id, + map_index=ti_key.map_index, + try_number=ti_key.try_number, + description=description, + ) + ) + def _process_executor_events(self, executor: BaseExecutor, session: Session) -> int: """Respond to executor events.""" if not self._standalone_dag_processor and not self.processor_agent: @@ -1066,6 +1081,14 @@ def _run_scheduler_loop(self) -> None: num_finished_events += self._process_executor_events( executor=executor, session=session ) + + for executor in self.job.executors: + try: + with create_session() as session: + self._process_task_event_logs(executor._task_event_logs, session) + except Exception: + self.log.exception("Something went wrong when trying to save task event logs.") + if self.processor_agent: self.processor_agent.heartbeat() diff --git a/airflow/migrations/versions/0147_2_10_0_add_task_instance_event_log.py b/airflow/migrations/versions/0147_2_10_0_add_task_instance_event_log.py new file mode 100644 index 0000000000000..0dc5153b398d6 --- /dev/null +++ b/airflow/migrations/versions/0147_2_10_0_add_task_instance_event_log.py @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add task instance event log. + +Revision ID: 8eef27533e75 +Revises: d482b7261ff9 +Create Date: 2024-07-13 07:33:43.406606 + +""" + +from __future__ import annotations + +import sqlalchemy as sa +from alembic import op + +from airflow.utils.sqlalchemy import UtcDateTime + +# revision identifiers, used by Alembic. +revision = "8eef27533e75" +down_revision = "d482b7261ff9" +branch_labels = None +depends_on = None +airflow_version = "2.10.0" + + +def upgrade(): + """Apply add task instance event log.""" + # ### commands auto generated by Alembic - please adjust! ### + op.create_table( + "task_event_log", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("task_id", sa.String(length=250), nullable=False), + sa.Column("dag_id", sa.String(length=250), nullable=False), + sa.Column("run_id", sa.String(length=250), nullable=False), + sa.Column("map_index", sa.Integer(), nullable=False), + sa.Column("try_number", sa.Integer(), nullable=False), + sa.Column( + "description", sa.String(length=100).with_variant(sa.Text(length=100), "mysql"), nullable=True + ), + sa.Column("created_at", UtcDateTime(timezone=True), nullable=False), + sa.PrimaryKeyConstraint("id", name="task_event_log_pkey"), + ) + + +def downgrade(): + """Unapply add task instance event log.""" + op.drop_table("task_event_log") diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 80b3eedbc8433..b111de23eafb2 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -92,7 +92,6 @@ from airflow.models.base import Base, StringID, TaskInstanceDependencies, _sentinel from airflow.models.dagbag import DagBag from airflow.models.dataset import DatasetModel -from airflow.models.log import Log from airflow.models.mappedoperator import MappedOperator from airflow.models.param import process_params from airflow.models.renderedtifields import get_serialized_template_fields @@ -193,22 +192,18 @@ def _merge_ti(ti, session: Session = NEW_SESSION): @internal_api_call @provide_session def _add_log( - event, - task_instance=None, - owner=None, - owner_display_name=None, - extra=None, + task_instance, + message, session: Session = NEW_SESSION, - **kwargs, ): session.add( - Log( - event, - task_instance, - owner, - owner_display_name, - extra, - **kwargs, + TaskEventLog( + task_id=task_instance.task_id, + dag_id=task_instance.dag_id, + run_id=task_instance.run_id, + map_index=task_instance.map_index, + try_number=task_instance.try_number, + description=message, ) ) @@ -358,7 +353,7 @@ def _run_raw_task( _run_finished_callback(callbacks=ti.task.on_success_callback, context=context) if not test_mode: - _add_log(event=ti.state, task_instance=ti, session=session) + _add_log(task_instance=ti, message=ti.state, session=session) if ti.state == TaskInstanceState.SUCCESS: ti._register_dataset_changes(events=context["outlet_events"], session=session) @@ -1639,7 +1634,7 @@ def _defer_task( else: ti.trigger_timeout = ti.start_date + execution_timeout if ti.test_mode: - _add_log(event=ti.state, task_instance=ti, session=session) + _add_log(task_instance=ti, message=ti.state, session=session) if exception is not None: session.merge(ti) @@ -2762,7 +2757,7 @@ def _check_and_change_state_before_execution( cls.logger().info("Starting attempt %s of %s", ti.try_number, ti.max_tries + 1) if not test_mode: - session.add(Log(TaskInstanceState.RUNNING.value, ti)) + _add_log(task_instance=ti, message=TaskInstanceState.RUNNING.value, session=session) ti.state = TaskInstanceState.RUNNING ti.emit_state_change_metric(TaskInstanceState.RUNNING) @@ -3218,7 +3213,7 @@ def fetch_handle_failure_context( Stats.incr("ti_failures", tags=ti.stats_tags) if not test_mode: - session.add(Log(TaskInstanceState.FAILED.value, ti)) + _add_log(task_instance=ti, message=TaskInstanceState.FAILED.value, session=session) # Log failure duration session.add(TaskFail(ti=ti)) @@ -4094,6 +4089,37 @@ def __repr__(self): return prefix + ">" +class TaskEventLog(Base): + """For logging events around the scheduling and execution of task instances.""" + + __tablename__ = "task_event_log" + + id = Column(Integer, primary_key=True) + dag_id = Column(StringID(), nullable=False) + task_id = Column(StringID(), nullable=False) + run_id = Column(StringID(), nullable=False) + map_index = Column(Integer, nullable=False) + try_number = Column(Integer, nullable=False) + description = Column(String(100).with_variant(Text(100), "mysql")) + created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) + + __table_args__ = ( + PrimaryKeyConstraint("id", name="task_event_log_pkey"), + # intentionally not adding foreign key to task_instance to avoid locking + ) + + def __init__(self, task_id, dag_id, run_id, map_index, try_number, description): + self.task_id = task_id + self.dag_id = dag_id + self.run_id = run_id + self.map_index = map_index + self.try_number = try_number + self.description = description + + def __repr__(self): + return f"<{self.__class__.__name__}: dag_id={self.dag_id} task_id={self.task_id} run_id={self.run_id} map_index={self.map_index}>" + + STATICA_HACK = True globals()["kcah_acitats"[::-1].upper()] = False if STATICA_HACK: # pragma: no cover diff --git a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py index 48c286c6fbe90..569d99db05945 100644 --- a/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py +++ b/airflow/providers/amazon/aws/executors/ecs/ecs_executor.py @@ -23,9 +23,9 @@ from __future__ import annotations -import logging import time from collections import defaultdict, deque +from contextlib import suppress from copy import deepcopy from typing import TYPE_CHECKING, Sequence @@ -385,19 +385,22 @@ def attempt_task_runs(self): ) self.pending_tasks.append(ecs_task) else: - self.send_message_to_task_logs( - logging.ERROR, - "ECS task %s has failed a maximum of %s times. Marking as failed. Reasons: %s", + reasons_str = ", ".join(failure_reasons) + self.log.error( + "ECS task %s has failed %s times. Marking as failed. Reasons: %s", task_key, attempt_number, - ", ".join(failure_reasons), - ti=task_key, + reasons_str, + ) + self.log_task_event( + ti_key=task_key, + description=f"Task could not be queued after {attempt_number} attempts. " + f"Marking as failed. Reasons: {reasons_str}", ) self.fail(task_key) elif not run_task_response["tasks"]: - self.send_message_to_task_logs( - logging.ERROR, "ECS RunTask Response: %s", run_task_response, ti=task_key - ) + self.log.error("ECS RunTask Response: %s", run_task_response) + self.log_task_event(ti_key=task_key, description=f"ECS RunTask Response: {run_task_response}") raise EcsExecutorException( "No failures and no ECS tasks provided in response. This should never happen." ) @@ -543,10 +546,7 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task not_adopted_tis = [ti for ti in tis if ti not in adopted_tis] return not_adopted_tis - def send_message_to_task_logs(self, level: int, msg: str, *args, ti: TaskInstance | TaskInstanceKey): + def log_task_event(self, *, ti_key: TaskInstanceKey, description: str): # TODO: remove this method when min_airflow_version is set to higher than 2.10.0 - try: - super().send_message_to_task_logs(level, msg, *args, ti=ti) - except AttributeError: - # ``send_message_to_task_logs`` is added in 2.10.0 - self.log.error(msg, *args) + with suppress(AttributeError): + super().log_task_event(ti_key=ti_key, description=description) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 7c283eaccc989..823fac7bba3b3 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -187,7 +187,7 @@ def get_container_termination_message(pod: V1Pod, container_name: str): with suppress(AttributeError, TypeError): container_statuses = pod.status.container_statuses container_status = next((x for x in container_statuses if x.name == container_name), None) - return container_status.state.terminated.message if container_status else None + return container_status.state.terminated.description if container_status else None def check_exception_is_kubernetes_api_unauthorized(exc: BaseException): diff --git a/airflow/providers/fab/auth_manager/fab_auth_manager.py b/airflow/providers/fab/auth_manager/fab_auth_manager.py index ffd5e5cab5d3e..2ad78b83fe56f 100644 --- a/airflow/providers/fab/auth_manager/fab_auth_manager.py +++ b/airflow/providers/fab/auth_manager/fab_auth_manager.py @@ -72,6 +72,7 @@ RESOURCE_POOL, RESOURCE_PROVIDER, RESOURCE_SLA_MISS, + RESOURCE_TASK_EVENT_LOG, RESOURCE_TASK_INSTANCE, RESOURCE_TASK_LOG, RESOURCE_TASK_RESCHEDULE, @@ -93,6 +94,7 @@ from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE: dict[DagAccessEntity, tuple[str, ...]] = { + DagAccessEntity.TASK_EVENT_LOG: (RESOURCE_TASK_EVENT_LOG,), DagAccessEntity.AUDIT_LOG: (RESOURCE_AUDIT_LOG,), DagAccessEntity.CODE: (RESOURCE_DAG_CODE,), DagAccessEntity.DEPENDENCIES: (RESOURCE_DAG_DEPENDENCIES,), diff --git a/airflow/providers/fab/auth_manager/security_manager/override.py b/airflow/providers/fab/auth_manager/security_manager/override.py index bc9a03966bb05..84de8d52527c6 100644 --- a/airflow/providers/fab/auth_manager/security_manager/override.py +++ b/airflow/providers/fab/auth_manager/security_manager/override.py @@ -309,6 +309,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): # [START security_admin_perms] ADMIN_PERMISSIONS = [ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_EVENT_LOG), (permissions.ACTION_CAN_READ, permissions.RESOURCE_AUDIT_LOG), (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_AUDIT_LOG), (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_RESCHEDULE), diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py index e333876184714..19c33b6df0ff7 100644 --- a/airflow/security/permissions.py +++ b/airflow/security/permissions.py @@ -20,6 +20,7 @@ RESOURCE_ACTION = "Permissions" RESOURCE_ADMIN_MENU = "Admin" RESOURCE_AUDIT_LOG = "Audit Logs" +RESOURCE_TASK_EVENT_LOG = "Task Event Logs" RESOURCE_BROWSE_MENU = "Browse" RESOURCE_CONFIG = "Configurations" RESOURCE_CONNECTION = "Connections" diff --git a/airflow/www/security_manager.py b/airflow/www/security_manager.py index 926148f7eba86..bb08e37838879 100644 --- a/airflow/www/security_manager.py +++ b/airflow/www/security_manager.py @@ -57,6 +57,7 @@ RESOURCE_POOL, RESOURCE_PROVIDER, RESOURCE_SLA_MISS, + RESOURCE_TASK_EVENT_LOG, RESOURCE_TASK_INSTANCE, RESOURCE_TASK_RESCHEDULE, RESOURCE_TRIGGER, @@ -271,6 +272,7 @@ def _is_authorized_dag(entity_=None, details_func_=None): for resource, entity, details_func in [ (RESOURCE_DAG, None, None), (RESOURCE_AUDIT_LOG, DagAccessEntity.AUDIT_LOG, None), + (RESOURCE_TASK_EVENT_LOG, DagAccessEntity.TASK_EVENT_LOG, None), (RESOURCE_DAG_CODE, DagAccessEntity.CODE, None), (RESOURCE_DAG_DEPENDENCIES, DagAccessEntity.DEPENDENCIES, None), (RESOURCE_SLA_MISS, DagAccessEntity.SLA_MISS, None), From 84484bdc28f984e6db8f32ed07587b553419ffbb Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 16 Jul 2024 11:18:39 -0700 Subject: [PATCH 2/3] pre-commit fixes --- .../schemas/task_event_log_schema.py | 17 + airflow/executors/base_executor.py | 2 +- ...149_2_10_0_add_task_instance_event_log.py} | 4 +- airflow/utils/db.py | 2 +- docs/apache-airflow/img/airflow_erd.sha256 | 2 +- docs/apache-airflow/img/airflow_erd.svg | 353 ++++++++++-------- docs/apache-airflow/migrations-ref.rst | 4 +- 7 files changed, 224 insertions(+), 160 deletions(-) rename airflow/migrations/versions/{0147_2_10_0_add_task_instance_event_log.py => 0149_2_10_0_add_task_instance_event_log.py} (97%) diff --git a/airflow/api_connexion/schemas/task_event_log_schema.py b/airflow/api_connexion/schemas/task_event_log_schema.py index 43c44d7ddfb89..05973dfc6cbfc 100644 --- a/airflow/api_connexion/schemas/task_event_log_schema.py +++ b/airflow/api_connexion/schemas/task_event_log_schema.py @@ -1,3 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + from __future__ import annotations from typing import NamedTuple diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 22e8bacf7f69b..c627d520fe88d 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -147,7 +147,7 @@ def __repr__(self): def start(self): # pragma: no cover """Executors may need to get things started.""" - def log_task_event(self, *, ti_key: TaskInstanceKey, description:str): + def log_task_event(self, *, ti_key: TaskInstanceKey, description: str): """Log an event to the task instance event log.""" self._task_event_logs.append((ti_key, description)) diff --git a/airflow/migrations/versions/0147_2_10_0_add_task_instance_event_log.py b/airflow/migrations/versions/0149_2_10_0_add_task_instance_event_log.py similarity index 97% rename from airflow/migrations/versions/0147_2_10_0_add_task_instance_event_log.py rename to airflow/migrations/versions/0149_2_10_0_add_task_instance_event_log.py index 0dc5153b398d6..8aaeab49b7e54 100644 --- a/airflow/migrations/versions/0147_2_10_0_add_task_instance_event_log.py +++ b/airflow/migrations/versions/0149_2_10_0_add_task_instance_event_log.py @@ -20,7 +20,7 @@ Add task instance event log. Revision ID: 8eef27533e75 -Revises: d482b7261ff9 +Revises: ec3471c1e067 Create Date: 2024-07-13 07:33:43.406606 """ @@ -34,7 +34,7 @@ # revision identifiers, used by Alembic. revision = "8eef27533e75" -down_revision = "d482b7261ff9" +down_revision = "ec3471c1e067" branch_labels = None depends_on = None airflow_version = "2.10.0" diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 463b6894f5226..5c21cae1b4f59 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -118,7 +118,7 @@ class MappedClassProtocol(Protocol): "2.8.1": "88344c1d9134", "2.9.0": "1949afb29106", "2.9.2": "686269002441", - "2.10.0": "ec3471c1e067", + "2.10.0": "8eef27533e75", } diff --git a/docs/apache-airflow/img/airflow_erd.sha256 b/docs/apache-airflow/img/airflow_erd.sha256 index a3624746f5ec8..0bca608ed9746 100644 --- a/docs/apache-airflow/img/airflow_erd.sha256 +++ b/docs/apache-airflow/img/airflow_erd.sha256 @@ -1 +1 @@ -88c9ce0742c54f376a3c600600d06ac9a6f80a3a2cb85dfcf2472d1c77ed75db \ No newline at end of file +e9e0e8156e73709abafc177c4953303787944f73a47ea18ca1267ba6b25be91b \ No newline at end of file diff --git a/docs/apache-airflow/img/airflow_erd.svg b/docs/apache-airflow/img/airflow_erd.svg index 0328672c20a87..8fbb5caccaeb1 100644 --- a/docs/apache-airflow/img/airflow_erd.svg +++ b/docs/apache-airflow/img/airflow_erd.svg @@ -4,104 +4,149 @@ - - + + %3 - + job - -job - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - -end_date - - [TIMESTAMP] - -executor_class - - [VARCHAR(500)] - -hostname - - [VARCHAR(500)] - -job_type - - [VARCHAR(30)] - -latest_heartbeat - - [TIMESTAMP] - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -unixname - - [VARCHAR(1000)] + +job + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + +end_date + + [TIMESTAMP] + +executor_class + + [VARCHAR(500)] + +hostname + + [VARCHAR(500)] + +job_type + + [VARCHAR(30)] + +latest_heartbeat + + [TIMESTAMP] + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +unixname + + [VARCHAR(1000)] slot_pool - -slot_pool - -id - - [INTEGER] - NOT NULL - -description - - [TEXT] - -include_deferred - - [BOOLEAN] - NOT NULL - -pool - - [VARCHAR(256)] - -slots - - [INTEGER] + +slot_pool + +id + + [INTEGER] + NOT NULL + +description + + [TEXT] + +include_deferred + + [BOOLEAN] + NOT NULL + +pool + + [VARCHAR(256)] + +slots + + [INTEGER] dag_priority_parsing_request - -dag_priority_parsing_request - -id - - [VARCHAR(32)] - NOT NULL - -fileloc - - [VARCHAR(2000)] - NOT NULL + +dag_priority_parsing_request + +id + + [VARCHAR(32)] + NOT NULL + +fileloc + + [VARCHAR(2000)] + NOT NULL + + + +task_event_log + +task_event_log + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +description + + [VARCHAR(100)] + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +try_number + + [INTEGER] + NOT NULL - + log log @@ -152,7 +197,7 @@ [VARCHAR(250)] - + dag_code dag_code @@ -178,7 +223,7 @@ NOT NULL - + dag_pickle dag_pickle @@ -201,7 +246,7 @@ [BIGINT] - + ab_user ab_user @@ -282,7 +327,7 @@ {0,1} - + ab_user_role ab_user_role @@ -308,7 +353,7 @@ {0,1} - + dag_run_note dag_run_note @@ -344,7 +389,7 @@ {0,1} - + task_instance_note task_instance_note @@ -395,7 +440,7 @@ {0,1} - + ab_register_user ab_register_user @@ -438,7 +483,7 @@ NOT NULL - + connection connection @@ -495,7 +540,7 @@ [VARCHAR(500)] - + callback_request callback_request @@ -530,7 +575,7 @@ [VARCHAR(2000)] - + sla_miss sla_miss @@ -567,7 +612,7 @@ [TIMESTAMP] - + variable variable @@ -594,7 +639,7 @@ [TEXT] - + import_error import_error @@ -621,7 +666,7 @@ [TIMESTAMP] - + serialized_dag serialized_dag @@ -664,7 +709,7 @@ [VARCHAR(2000)] - + dataset dataset @@ -700,7 +745,7 @@ NOT NULL - + dag_schedule_dataset_reference dag_schedule_dataset_reference @@ -733,7 +778,7 @@ 1 - + task_outlet_dataset_reference task_outlet_dataset_reference @@ -771,7 +816,7 @@ 1 - + dataset_dag_run_queue dataset_dag_run_queue @@ -799,7 +844,7 @@ 1 - + dag dag @@ -942,7 +987,7 @@ 1 - + dag_tag dag_tag @@ -965,7 +1010,7 @@ 1 - + dag_owner_attributes dag_owner_attributes @@ -993,7 +1038,7 @@ 1 - + dag_warning dag_warning @@ -1026,7 +1071,7 @@ 1 - + dataset_alias dataset_alias @@ -1042,7 +1087,7 @@ NOT NULL - + dataset_alias_dataset_event dataset_alias_dataset_event @@ -1072,7 +1117,7 @@ 1 - + dataset_event dataset_event @@ -1128,7 +1173,7 @@ 1 - + dagrun_dataset_event dagrun_dataset_event @@ -1151,7 +1196,7 @@ 1 - + log_template log_template @@ -1177,7 +1222,7 @@ NOT NULL - + dag_run dag_run @@ -1286,7 +1331,7 @@ 1 - + task_instance task_instance @@ -1440,7 +1485,7 @@ 1 - + task_reschedule task_reschedule @@ -1566,7 +1611,7 @@ 1 - + rendered_task_instance_fields rendered_task_instance_fields @@ -1629,7 +1674,7 @@ 1 - + task_fail task_fail @@ -1700,7 +1745,7 @@ 1 - + task_map task_map @@ -1763,7 +1808,7 @@ 1 - + xcom xcom @@ -1836,7 +1881,7 @@ 1 - + task_instance_history task_instance_history @@ -2010,7 +2055,7 @@ 1 - + ab_permission ab_permission @@ -2026,7 +2071,7 @@ NOT NULL - + ab_permission_view ab_permission_view @@ -2052,7 +2097,7 @@ {0,1} - + ab_permission_view_role ab_permission_view_role @@ -2078,7 +2123,7 @@ {0,1} - + ab_view_menu ab_view_menu @@ -2101,7 +2146,7 @@ {0,1} - + ab_role ab_role @@ -2131,7 +2176,7 @@ {0,1} - + trigger trigger @@ -2167,39 +2212,39 @@ 0..N {0,1} - - -session - -session - -id - - [INTEGER] - NOT NULL - -data - - [BYTEA] - -expiry - - [TIMESTAMP] - -session_id - - [VARCHAR(255)] - alembic_version - -alembic_version - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + + [VARCHAR(32)] + NOT NULL + + + +session + +session + +id + + [INTEGER] + NOT NULL + +data + + [BYTEA] + +expiry + + [TIMESTAMP] + +session_id + + [VARCHAR(255)] diff --git a/docs/apache-airflow/migrations-ref.rst b/docs/apache-airflow/migrations-ref.rst index 95fe3dc591403..0280c94d4a567 100644 --- a/docs/apache-airflow/migrations-ref.rst +++ b/docs/apache-airflow/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=================================+===================+===================+==============================================================+ -| ``ec3471c1e067`` (head) | ``05e19f3176be`` | ``2.10.0`` | Add dataset_alias_dataset_event. | +| ``8eef27533e75`` (head) | ``ec3471c1e067`` | ``2.10.0`` | Add task instance event log. | ++---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ +| ``ec3471c1e067`` | ``05e19f3176be`` | ``2.10.0`` | Add dataset_alias_dataset_event. | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ | ``05e19f3176be`` | ``d482b7261ff9`` | ``2.10.0`` | Add dataset_alias. | +---------------------------------+-------------------+-------------------+--------------------------------------------------------------+ From e1048d5f56916303fc3329860fcccc07994e2723 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 16 Jul 2024 13:20:19 -0700 Subject: [PATCH 3/3] get API working --- .../endpoints/event_log_endpoint.py | 38 -------- .../endpoints/task_event_log_endpoint.py | 75 +++++++++++++++ airflow/api_connexion/openapi/v1.yaml | 96 +++++++++++++++++++ .../schemas/task_event_log_schema.py | 11 ++- airflow/www/static/js/types/api-generated.ts | 91 ++++++++++++++++++ 5 files changed, 269 insertions(+), 42 deletions(-) create mode 100644 airflow/api_connexion/endpoints/task_event_log_endpoint.py diff --git a/airflow/api_connexion/endpoints/event_log_endpoint.py b/airflow/api_connexion/endpoints/event_log_endpoint.py index 3cd6f7c9f1d32..3b3dbe6efd490 100644 --- a/airflow/api_connexion/endpoints/event_log_endpoint.py +++ b/airflow/api_connexion/endpoints/event_log_endpoint.py @@ -30,7 +30,6 @@ ) from airflow.auth.managers.models.resource_details import DagAccessEntity from airflow.models import Log -from airflow.models.taskinstance import TaskEventLog from airflow.utils import timezone from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session @@ -113,40 +112,3 @@ def get_event_logs( return event_log_collection_schema.dump( EventLogCollection(event_logs=event_logs, total_entries=total_entries) ) - - -@security.requires_access_dag("GET", DagAccessEntity.TASK_EVENT_LOG) -@format_parameters({"limit": check_limit}) -@provide_session -def get_task_event_logs( - *, - dag_id: str | None = None, - task_id: str | None = None, - run_id: str | None = None, - map_index: int | None = None, - try_number: int | None = None, - limit: int, - offset: int | None = None, - session: Session = NEW_SESSION, -) -> APIResponse: - """Get all log entries from event log.""" - query = select(TaskEventLog) - - if dag_id: - query = query.where(TaskEventLog.dag_id == dag_id) - if task_id: - query = query.where(TaskEventLog.task_id == task_id) - if run_id: - query = query.where(TaskEventLog.run_id == run_id) - if map_index: - query = query.where(TaskEventLog.map_index == map_index) - if try_number: - query = query.where(TaskEventLog.try_number == try_number) - - total_entries = get_query_count(query, session=session) - - query = query.order_by(TaskEventLog.id) - event_logs = session.scalars(query.offset(offset).limit(limit)).all() - return event_log_collection_schema.dump( - EventLogCollection(event_logs=event_logs, total_entries=total_entries) - ) diff --git a/airflow/api_connexion/endpoints/task_event_log_endpoint.py b/airflow/api_connexion/endpoints/task_event_log_endpoint.py new file mode 100644 index 0000000000000..85d928019aab1 --- /dev/null +++ b/airflow/api_connexion/endpoints/task_event_log_endpoint.py @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from sqlalchemy import select + +from airflow.api_connexion import security +from airflow.api_connexion.parameters import check_limit, format_parameters +from airflow.api_connexion.schemas.task_event_log_schema import ( + TaskEventLogCollection, + task_event_log_collection_schema, +) +from airflow.auth.managers.models.resource_details import DagAccessEntity +from airflow.models.taskinstance import TaskEventLog +from airflow.utils.db import get_query_count +from airflow.utils.session import NEW_SESSION, provide_session + +if TYPE_CHECKING: + from sqlalchemy.orm import Session + + from airflow.api_connexion.types import APIResponse + + +@security.requires_access_dag("GET", DagAccessEntity.TASK_EVENT_LOG) +@format_parameters({"limit": check_limit}) +@provide_session +def get_task_event_logs( + *, + dag_id: str | None = None, + task_id: str | None = None, + run_id: str | None = None, + map_index: int | None = None, + try_number: int | None = None, + limit: int, + offset: int | None = None, + session: Session = NEW_SESSION, +) -> APIResponse: + """Get all log entries from event log.""" + query = select(TaskEventLog) + + if dag_id: + query = query.where(TaskEventLog.dag_id == dag_id) + if task_id: + query = query.where(TaskEventLog.task_id == task_id) + if run_id: + query = query.where(TaskEventLog.run_id == run_id) + if map_index: + query = query.where(TaskEventLog.map_index == map_index) + if try_number: + query = query.where(TaskEventLog.try_number == try_number) + + total_entries = get_query_count(query, session=session) + + query = query.order_by(TaskEventLog.id) + logs = session.scalars(query.offset(offset).limit(limit)).all() + return task_event_log_collection_schema.dump( + TaskEventLogCollection(data=logs, total_entries=total_entries) + ) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index 273d69ab705db..a399383c325e1 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -1229,6 +1229,37 @@ paths: "404": $ref: "#/components/responses/NotFound" + /taskEventLogs: + get: + summary: List log entries (experimental) + description: | + List log entries from task event log (experimental). + + *New in version 2.10.0* + x-openapi-router-controller: airflow.api_connexion.endpoints.task_event_log_endpoint + operationId: get_task_event_logs + tags: [TaskEventLog] + parameters: + - $ref: "#/components/parameters/PageLimit" + - $ref: "#/components/parameters/PageOffset" + - $ref: "#/components/parameters/OrderBy" + - $ref: "#/components/parameters/FilterDAGID" + - $ref: "#/components/parameters/FilterTaskID" + - $ref: "#/components/parameters/FilterRunID" + - $ref: "#/components/parameters/FilterMapIndex" + - $ref: "#/components/parameters/FilterTryNumber" + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/TaskEventLogCollection" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + /eventLogs: get: summary: List log entries @@ -3378,6 +3409,49 @@ components: readOnly: true nullable: true + TaskEventLog: + type: object + description: Log of task events. + properties: + id: + description: The task event log ID + type: integer + readOnly: true + created_at: + description: The time when the event was recorded + format: date-time + type: string + readOnly: true + dag_id: + description: The DAG ID + type: string + readOnly: true + nullable: true + task_id: + description: The Task ID + type: string + readOnly: true + nullable: true + run_id: + description: The DAG Run ID + type: string + readOnly: true + nullable: true + map_index: + description: The DAG Run ID + type: integer + readOnly: true + nullable: true + try_number: + description: The DAG Run ID + type: integer + readOnly: true + nullable: true + description: + description: A description of the event. + type: string + readOnly: true + EventLogCollection: type: object description: | @@ -3393,6 +3467,21 @@ components: $ref: "#/components/schemas/EventLog" - $ref: "#/components/schemas/CollectionInfo" + TaskEventLogCollection: + type: object + description: | + Collection of task event logs. (Experimental) + + *Added in version 2.10.0* + allOf: + - type: object + properties: + data: + type: array + items: + $ref: "#/components/schemas/TaskEventLog" + - $ref: "#/components/schemas/CollectionInfo" + ImportError: type: object properties: @@ -5587,6 +5676,13 @@ components: type: integer description: Filter on map index for mapped task. + FilterTryNumber: + in: query + name: try_number + schema: + type: integer + description: Filter on try number for task instance. + OrderBy: in: query name: order_by diff --git a/airflow/api_connexion/schemas/task_event_log_schema.py b/airflow/api_connexion/schemas/task_event_log_schema.py index 05973dfc6cbfc..14e1fddaa3318 100644 --- a/airflow/api_connexion/schemas/task_event_log_schema.py +++ b/airflow/api_connexion/schemas/task_event_log_schema.py @@ -39,19 +39,22 @@ class Meta: run_id = auto_field(dump_only=True) map_index = auto_field(dump_only=True) try_number = auto_field(dump_only=True) - message = auto_field(dump_only=True) + description = auto_field(dump_only=True) class TaskEventLogCollection(NamedTuple): """List of import errors with metadata.""" - event_logs: list[TaskEventLog] + data: list[TaskEventLog] total_entries: int class TaskEventLogCollectionSchema(Schema): """EventLog Collection Schema.""" - event_logs = fields.List(fields.Nested(TaskEventLogSchema)) - task_event_logs = fields.List(fields.Nested(TaskEventLogSchema)) + data = fields.List(fields.Nested(TaskEventLogSchema)) total_entries = fields.Int() + + +task_event_log_schema = TaskEventLogSchema() +task_event_log_collection_schema = TaskEventLogCollectionSchema() diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 1b82d07835ab2..6a4130137ba8d 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ b/airflow/www/static/js/types/api-generated.ts @@ -345,6 +345,14 @@ export interface paths { }; }; }; + "/taskEventLogs": { + /** + * List log entries from task event log (experimental). + * + * *New in version 2.10.0* + */ + get: operations["get_task_event_logs"]; + }; "/eventLogs": { /** List log entries from event log. */ get: operations["get_event_logs"]; @@ -1280,6 +1288,28 @@ export interface components { /** @description Other information that was not included in the other fields, e.g. the complete CLI command. */ extra?: string | null; }; + /** @description Log of task events. */ + TaskEventLog: { + /** @description The task event log ID */ + id?: number; + /** + * Format: date-time + * @description The time when the event was recorded + */ + created_at?: string; + /** @description The DAG ID */ + dag_id?: string | null; + /** @description The Task ID */ + task_id?: string | null; + /** @description The DAG Run ID */ + run_id?: string | null; + /** @description The DAG Run ID */ + map_index?: number | null; + /** @description The DAG Run ID */ + try_number?: number | null; + /** @description A description of the event. */ + description?: string; + }; /** * @description Collection of event logs. * @@ -1288,6 +1318,14 @@ export interface components { EventLogCollection: { event_logs?: components["schemas"]["EventLog"][]; } & components["schemas"]["CollectionInfo"]; + /** + * @description Collection of task event logs. (Experimental) + * + * *Added in version 2.10.0* + */ + TaskEventLogCollection: { + data?: components["schemas"]["TaskEventLog"][]; + } & components["schemas"]["CollectionInfo"]; ImportError: { /** @description The import error ID. */ import_error_id?: number; @@ -2598,6 +2636,8 @@ export interface components { FilterSourceMapIndex: number; /** @description Filter on map index for mapped task. */ FilterMapIndex: number; + /** @description Filter on try number for task instance. */ + FilterTryNumber: number; /** * @description The name of the field to order the results by. * Prefix a field name with `-` to reverse the sort order. @@ -3655,6 +3695,48 @@ export interface operations { 404: components["responses"]["NotFound"]; }; }; + /** + * List log entries from task event log (experimental). + * + * *New in version 2.10.0* + */ + get_task_event_logs: { + parameters: { + query: { + /** The numbers of items to return. */ + limit?: components["parameters"]["PageLimit"]; + /** The number of items to skip before starting to collect the result set. */ + offset?: components["parameters"]["PageOffset"]; + /** + * The name of the field to order the results by. + * Prefix a field name with `-` to reverse the sort order. + * + * *New in version 2.1.0* + */ + order_by?: components["parameters"]["OrderBy"]; + /** Returns objects matched by the DAG ID. */ + dag_id?: components["parameters"]["FilterDAGID"]; + /** Returns objects matched by the Task ID. */ + task_id?: components["parameters"]["FilterTaskID"]; + /** Returns objects matched by the Run ID. */ + run_id?: components["parameters"]["FilterRunID"]; + /** Filter on map index for mapped task. */ + map_index?: components["parameters"]["FilterMapIndex"]; + /** Filter on try number for task instance. */ + try_number?: components["parameters"]["FilterTryNumber"]; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["TaskEventLogCollection"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + }; + }; /** List log entries from event log. */ get_event_logs: { parameters: { @@ -5242,9 +5324,15 @@ export type SetDagRunNote = CamelCasedPropertiesDeep< export type EventLog = CamelCasedPropertiesDeep< components["schemas"]["EventLog"] >; +export type TaskEventLog = CamelCasedPropertiesDeep< + components["schemas"]["TaskEventLog"] +>; export type EventLogCollection = CamelCasedPropertiesDeep< components["schemas"]["EventLogCollection"] >; +export type TaskEventLogCollection = CamelCasedPropertiesDeep< + components["schemas"]["TaskEventLogCollection"] +>; export type ImportError = CamelCasedPropertiesDeep< components["schemas"]["ImportError"] >; @@ -5580,6 +5668,9 @@ export type DeleteDatasetQueuedEventsVariables = CamelCasedPropertiesDeep< operations["delete_dataset_queued_events"]["parameters"]["path"] & operations["delete_dataset_queued_events"]["parameters"]["query"] >; +export type GetTaskEventLogsVariables = CamelCasedPropertiesDeep< + operations["get_task_event_logs"]["parameters"]["query"] +>; export type GetEventLogsVariables = CamelCasedPropertiesDeep< operations["get_event_logs"]["parameters"]["query"] >;