Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 47 additions & 25 deletions airflow/providers/openlineage/plugins/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
from airflow.providers.openlineage import __version__ as OPENLINEAGE_PROVIDER_VERSION, conf
from airflow.providers.openlineage.utils.utils import (
OpenLineageRedactor,
get_airflow_dag_run_facet,
get_airflow_debug_facet,
get_airflow_state_run_facet,
)
Expand All @@ -50,9 +49,9 @@
if TYPE_CHECKING:
from datetime import datetime

from airflow.models.dagrun import DagRun
from airflow.providers.openlineage.extractors import OperatorLineage
from airflow.utils.log.secrets_masker import SecretsMasker
from airflow.utils.state import DagRunState

_PRODUCER = f"https://github.com/apache/airflow/tree/providers-openlineage/{OPENLINEAGE_PROVIDER_VERSION}"

Expand Down Expand Up @@ -336,33 +335,36 @@ def fail_task(

def dag_started(
self,
dag_run: DagRun,
msg: str,
dag_id: str,
logical_date: datetime,
start_date: datetime,
nominal_start_time: str,
nominal_end_time: str,
owners: list[str],
run_facets: dict[str, RunFacet],
description: str | None = None,
job_facets: dict[str, JobFacet] | None = None, # Custom job facets
):
try:
owner = [x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None
event = RunEvent(
eventType=RunState.START,
eventTime=dag_run.start_date.isoformat(),
eventTime=start_date.isoformat(),
job=self._build_job(
job_name=dag_run.dag_id,
job_name=dag_id,
job_type=_JOB_TYPE_DAG,
job_description=dag_run.dag.description if dag_run.dag else None,
owners=owner,
job_description=description,
owners=owners,
job_facets=job_facets,
),
run=self._build_run(
run_id=self.build_dag_run_id(
dag_id=dag_run.dag_id,
logical_date=dag_run.logical_date,
dag_id=dag_id,
logical_date=logical_date,
),
job_name=dag_run.dag_id,
job_name=dag_id,
nominal_start_time=nominal_start_time,
nominal_end_time=nominal_end_time,
run_facets={**get_airflow_dag_run_facet(dag_run), **get_airflow_debug_facet()},
run_facets={**run_facets, **get_airflow_debug_facet()},
),
inputs=[],
outputs=[],
Expand All @@ -375,18 +377,29 @@ def dag_started(
# This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing.
self.log.warning("Failed to emit DAG started event: \n %s", traceback.format_exc())

def dag_success(self, dag_run: DagRun, msg: str):
def dag_success(
self,
dag_id: str,
run_id: str,
end_date: datetime,
logical_date: datetime,
dag_run_state: DagRunState,
task_ids: list[str],
):
try:
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=dag_run.end_date.isoformat(),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
eventTime=end_date.isoformat(),
job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_run.dag_id,
logical_date=dag_run.logical_date,
dag_id=dag_id,
logical_date=logical_date,
),
facets={**get_airflow_state_run_facet(dag_run), **get_airflow_debug_facet()},
facets={
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**get_airflow_debug_facet(),
},
),
inputs=[],
outputs=[],
Expand All @@ -399,22 +412,31 @@ def dag_success(self, dag_run: DagRun, msg: str):
# This part cannot be wrapped to deduplicate code, otherwise the method cannot be pickled in multiprocessing.
self.log.warning("Failed to emit DAG success event: \n %s", traceback.format_exc())

def dag_failed(self, dag_run: DagRun, msg: str):
def dag_failed(
self,
dag_id: str,
run_id: str,
end_date: datetime,
logical_date: datetime,
dag_run_state: DagRunState,
task_ids: list[str],
msg: str,
):
try:
event = RunEvent(
eventType=RunState.FAIL,
eventTime=dag_run.end_date.isoformat(),
job=self._build_job(job_name=dag_run.dag_id, job_type=_JOB_TYPE_DAG),
eventTime=end_date.isoformat(),
job=self._build_job(job_name=dag_id, job_type=_JOB_TYPE_DAG),
run=Run(
runId=self.build_dag_run_id(
dag_id=dag_run.dag_id,
logical_date=dag_run.logical_date,
dag_id=dag_id,
logical_date=logical_date,
),
facets={
"errorMessage": error_message_run.ErrorMessageRunFacet(
message=msg, programmingLanguage="python"
),
**get_airflow_state_run_facet(dag_run),
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**get_airflow_debug_facet(),
},
),
Expand Down
149 changes: 103 additions & 46 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@

from airflow import settings
from airflow.listeners import hookimpl
from airflow.models import DagRun
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.extractors import ExtractorManager
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
from airflow.providers.openlineage.utils.utils import (
IS_AIRFLOW_2_10_OR_HIGHER,
get_airflow_dag_run_facet,
get_airflow_debug_facet,
get_airflow_job_facet,
get_airflow_mapped_task_facet,
Expand All @@ -51,7 +53,7 @@
if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.models import DagRun, TaskInstance
from airflow.models import TaskInstance

_openlineage_listener: OpenLineageListener | None = None

Expand Down Expand Up @@ -413,65 +415,120 @@ def before_stopping(self, component) -> None:

@hookimpl
def on_dag_run_running(self, dag_run: DagRun, msg: str) -> None:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
try:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_running`")
return

data_interval_start = (
dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
)
return
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_running`")
return
run_facets = {**get_airflow_dag_run_facet(dag_run)}

data_interval_start = dag_run.data_interval_start.isoformat() if dag_run.data_interval_start else None
data_interval_end = dag_run.data_interval_end.isoformat() if dag_run.data_interval_end else None
self.executor.submit(
self.adapter.dag_started,
dag_run=dag_run,
msg=msg,
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
# AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
# as it causes lack of some TaskGroup attributes and crashes event emission.
job_facets=get_airflow_job_facet(dag_run=dag_run),
)
self.submit_callable(
self.adapter.dag_started,
dag_id=dag_run.dag_id,
run_id=dag_run.run_id,
logical_date=dag_run.logical_date,
start_date=dag_run.start_date,
nominal_start_time=data_interval_start,
nominal_end_time=data_interval_end,
run_facets=run_facets,
owners=[x.strip() for x in dag_run.dag.owner.split(",")] if dag_run.dag else None,
description=dag_run.dag.description if dag_run.dag else None,
# AirflowJobFacet should be created outside ProcessPoolExecutor that pickles objects,
# as it causes lack of some TaskGroup attributes and crashes event emission.
job_facets=get_airflow_job_facet(dag_run=dag_run),
)
except BaseException as e:
self.log.warning("OpenLineage received exception in method on_dag_run_running", exc_info=e)

@hookimpl
def on_dag_run_success(self, dag_run: DagRun, msg: str) -> None:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return
try:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_success`")
return
if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_success`")
return

self.executor.submit(self.adapter.dag_success, dag_run=dag_run, msg=msg)
if IS_AIRFLOW_2_10_OR_HIGHER:
task_ids = DagRun._get_partial_task_ids(dag_run.dag)
else:
task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None
self.submit_callable(
self.adapter.dag_success,
dag_id=dag_run.dag_id,
run_id=dag_run.run_id,
end_date=dag_run.end_date,
logical_date=dag_run.logical_date,
task_ids=task_ids,
dag_run_state=dag_run.get_state(),
)
except BaseException as e:
self.log.warning("OpenLineage received exception in method on_dag_run_success", exc_info=e)

@hookimpl
def on_dag_run_failed(self, dag_run: DagRun, msg: str) -> None:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
try:
if dag_run.dag and not is_selective_lineage_enabled(dag_run.dag):
self.log.debug(
"Skipping OpenLineage event emission for DAG `%s` "
"due to lack of explicit lineage enablement for DAG while "
"[openlineage] selective_enable is on.",
dag_run.dag_id,
)
return

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_failed`")
return

if IS_AIRFLOW_2_10_OR_HIGHER:
task_ids = DagRun._get_partial_task_ids(dag_run.dag)
else:
task_ids = dag_run.dag.task_ids if dag_run.dag and dag_run.dag.partial else None
self.submit_callable(
self.adapter.dag_failed,
dag_id=dag_run.dag_id,
run_id=dag_run.run_id,
end_date=dag_run.end_date,
logical_date=dag_run.logical_date,
dag_run_state=dag_run.get_state(),
task_ids=task_ids,
msg=msg,
)
return
except BaseException as e:
self.log.warning("OpenLineage received exception in method on_dag_run_failed", exc_info=e)

if not self.executor:
self.log.debug("Executor have not started before `on_dag_run_failed`")
return
def submit_callable(self, callable, *args, **kwargs):
fut = self.executor.submit(callable, *args, **kwargs)
fut.add_done_callback(self.log_submit_error)
return fut

self.executor.submit(self.adapter.dag_failed, dag_run=dag_run, msg=msg)
def log_submit_error(self, fut):
if fut.exception():
self.log.warning("Failed to submit method to executor", exc_info=fut.exception())
else:
self.log.debug("Successfully submitted method to executor")


def get_openlineage_listener() -> OpenLineageListener:
Expand Down
16 changes: 9 additions & 7 deletions airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from airflow import __version__ as AIRFLOW_VERSION
from airflow.datasets import Dataset
from airflow.exceptions import AirflowProviderDeprecationWarning # TODO: move this maybe to Airflow's logic?
from airflow.models import DAG, BaseOperator, MappedOperator
from airflow.models import DAG, BaseOperator, DagRun, MappedOperator
from airflow.providers.openlineage import conf
from airflow.providers.openlineage.plugins.facets import (
AirflowDagRunFacet,
Expand All @@ -58,9 +58,8 @@
from openlineage.client.event_v2 import Dataset as OpenLineageDataset
from openlineage.client.facet_v2 import RunFacet

from airflow.models import DagRun, TaskInstance
from airflow.utils.state import TaskInstanceState

from airflow.models import TaskInstance
from airflow.utils.state import DagRunState, TaskInstanceState

log = logging.getLogger(__name__)
_NOMINAL_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
Expand Down Expand Up @@ -439,11 +438,14 @@ def get_airflow_job_facet(dag_run: DagRun) -> dict[str, AirflowJobFacet]:
}


def get_airflow_state_run_facet(dag_run: DagRun) -> dict[str, AirflowStateRunFacet]:
def get_airflow_state_run_facet(
dag_id: str, run_id: str, task_ids: list[str], dag_run_state: DagRunState
) -> dict[str, AirflowStateRunFacet]:
tis = DagRun.fetch_task_instances(dag_id=dag_id, run_id=run_id, task_ids=task_ids)
return {
"airflowState": AirflowStateRunFacet(
dagRunState=dag_run.get_state(),
tasksState={ti.task_id: ti.state for ti in dag_run.get_task_instances()},
dagRunState=dag_run_state,
tasksState={ti.task_id: ti.state for ti in tis},
)
}

Expand Down
Loading