Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions providers/dbt/cloud/docs/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ via the ``additional_run_config`` dictionary.
:start-after: [START howto_operator_dbt_cloud_run_job_async]
:end-before: [END howto_operator_dbt_cloud_run_job_async]

You can also trigger a dbt Cloud job without providing the ``job_id``. Instead, you can identify the job
by providing the ``project_name``, ``environment_name``, and ``job_name``.
Note that this only works if these three parameters together uniquely identify a job in your account
(i.e., you cannot have two jobs with the same name in the same project and environment); otherwise the lookup fails with a ``DbtCloudResourceLookupError``.

.. exampleinclude:: /../../providers/dbt/cloud/tests/system/dbt/cloud/example_dbt_cloud.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dbt_cloud_run_job_without_job_id]
:end-before: [END howto_operator_dbt_cloud_run_job_without_job_id]

.. _howto/operator:DbtCloudJobRunSensor:

Poll for status of a dbt Cloud Job run
Expand Down
133 changes: 129 additions & 4 deletions providers/dbt/cloud/src/airflow/providers/dbt/cloud/hooks/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ class DbtCloudJobRunException(AirflowException):
"""An exception that indicates a job run failed to complete."""


class DbtCloudResourceLookupError(AirflowException):
    """
    Signal that a name-based lookup did not resolve to exactly one dbt Cloud resource.

    Raised when a lookup by name finds either no matching resource or more than one,
    so the resource cannot be uniquely identified.
    """


T = TypeVar("T", bound=Any)


Expand Down Expand Up @@ -356,14 +360,23 @@ def get_account(self, account_id: int | None = None) -> Response:
return self._run_and_get_response(endpoint=f"{account_id}/")

@fallback_to_default_account
def list_projects(self, account_id: int | None = None) -> list[Response]:
def list_projects(
self, account_id: int | None = None, name_contains: str | None = None
) -> list[Response]:
"""
Retrieve metadata for all projects tied to a specified dbt Cloud account.

:param account_id: Optional. The ID of a dbt Cloud account.
:param name_contains: Optional. The case-insensitive substring of a dbt Cloud project name to filter by.
:return: List of request responses.
"""
return self._run_and_get_response(endpoint=f"{account_id}/projects/", paginate=True, api_version="v3")
payload = {"name__icontains": name_contains} if name_contains else None
return self._run_and_get_response(
endpoint=f"{account_id}/projects/",
payload=payload,
paginate=True,
api_version="v3",
)

@fallback_to_default_account
def get_project(self, project_id: int, account_id: int | None = None) -> Response:
Expand All @@ -376,27 +389,73 @@ def get_project(self, project_id: int, account_id: int | None = None) -> Respons
"""
return self._run_and_get_response(endpoint=f"{account_id}/projects/{project_id}/", api_version="v3")

@fallback_to_default_account
def list_environments(
    self, project_id: int, *, name_contains: str | None = None, account_id: int | None = None
) -> list[Response]:
    """
    List the environments that belong to a given dbt Cloud project.

    The request is paginated and sent to the v3 API.

    :param project_id: The ID of a dbt Cloud project.
    :param name_contains: Optional. Case-insensitive substring that environment names must contain.
        When empty or None, no name filter is sent.
    :param account_id: Optional. The ID of a dbt Cloud account.
    :return: List of request responses.
    """
    # Only send a query payload when a name filter was actually requested.
    query = None
    if name_contains:
        query = {"name__icontains": name_contains}

    environments_endpoint = f"{account_id}/projects/{project_id}/environments/"
    return self._run_and_get_response(
        endpoint=environments_endpoint,
        payload=query,
        paginate=True,
        api_version="v3",
    )

@fallback_to_default_account
def get_environment(
    self, project_id: int, environment_id: int, *, account_id: int | None = None
) -> Response:
    """
    Fetch the metadata of a single environment within a dbt Cloud project.

    The request is sent to the v3 API.

    :param project_id: The ID of a dbt Cloud project.
    :param environment_id: The ID of a dbt Cloud environment.
    :param account_id: Optional. The ID of a dbt Cloud account.
    :return: The request response.
    """
    environment_endpoint = f"{account_id}/projects/{project_id}/environments/{environment_id}/"
    return self._run_and_get_response(endpoint=environment_endpoint, api_version="v3")

@fallback_to_default_account
def list_jobs(
self,
account_id: int | None = None,
order_by: str | None = None,
project_id: int | None = None,
environment_id: int | None = None,
name_contains: str | None = None,
) -> list[Response]:
"""
Retrieve metadata for all jobs tied to a specified dbt Cloud account.

If a ``project_id`` is supplied, only jobs pertaining to this project will be retrieved.
If an ``environment_id`` is supplied, only jobs pertaining to this environment will be retrieved.

:param account_id: Optional. The ID of a dbt Cloud account.
:param order_by: Optional. Field to order the result by. Use '-' to indicate reverse order.
For example, to use reverse order by the run ID use ``order_by=-id``.
:param project_id: The ID of a dbt Cloud project.
:param project_id: Optional. The ID of a dbt Cloud project.
:param environment_id: Optional. The ID of a dbt Cloud environment.
:param name_contains: Optional. The case-insensitive substring of a dbt Cloud job name to filter by.
:return: List of request responses.
"""
payload = {"order_by": order_by, "project_id": project_id}
if environment_id:
payload["environment_id"] = environment_id
if name_contains:
payload["name__icontains"] = name_contains
return self._run_and_get_response(
endpoint=f"{account_id}/jobs/",
payload={"order_by": order_by, "project_id": project_id},
payload=payload,
paginate=True,
)

Expand All @@ -411,6 +470,72 @@ def get_job(self, job_id: int, account_id: int | None = None) -> Response:
"""
return self._run_and_get_response(endpoint=f"{account_id}/jobs/{job_id}")

@fallback_to_default_account
def get_job_by_name(
    self, *, project_name: str, environment_name: str, job_name: str, account_id: int | None = None
) -> dict:
    """
    Resolve a dbt Cloud job from its project name, environment name, and job name.

    The lookup runs in three steps: the project is resolved by name, then the environment
    within that project, then the job within that project and environment. Each step must
    yield exactly one exact-name match.

    :param project_name: The name of a dbt Cloud project.
    :param environment_name: The name of a dbt Cloud environment.
    :param job_name: The name of a dbt Cloud job.
    :param account_id: Optional. The ID of a dbt Cloud account.
    :return: The details of a job.
    :raises DbtCloudResourceLookupError: If the project, environment, or job is not found
        or is not uniquely identified by the provided names.
    """

    def _exact_name_matches(responses, wanted_name):
        # ``name_contains`` only narrows the listing by substring, so the paginated
        # responses are flattened here and reduced to entries whose name matches exactly.
        matches = []
        for page in responses:
            for item in page.json()["data"]:
                if item["name"] == wanted_name:
                    matches.append(item)
        return matches

    # Step 1: project name -> project ID.
    matching_projects = _exact_name_matches(
        self.list_projects(name_contains=project_name, account_id=account_id),
        project_name,
    )
    if len(matching_projects) != 1:
        raise DbtCloudResourceLookupError(
            f"Found {len(matching_projects)} projects with name `{project_name}`."
        )
    project_id = matching_projects[0]["id"]

    # Step 2: environment name (within the project) -> environment ID.
    matching_environments = _exact_name_matches(
        self.list_environments(
            project_id=project_id, name_contains=environment_name, account_id=account_id
        ),
        environment_name,
    )
    if len(matching_environments) != 1:
        raise DbtCloudResourceLookupError(
            f"Found {len(matching_environments)} environments with name `{environment_name}` in project `{project_name}`."
        )
    environment_id = matching_environments[0]["id"]

    # Step 3: job name (within the project and environment) -> job details.
    matching_jobs = _exact_name_matches(
        self.list_jobs(
            project_id=project_id,
            environment_id=environment_id,
            name_contains=job_name,
            account_id=account_id,
        ),
        job_name,
    )
    if len(matching_jobs) != 1:
        raise DbtCloudResourceLookupError(
            f"Found {len(matching_jobs)} jobs with name `{job_name}` in environment `{environment_name}` in project `{project_name}`."
        )

    return matching_jobs[0]

@fallback_to_default_account
def trigger_job_run(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ class DbtCloudRunJobOperator(BaseOperator):
:ref:`howto/operator:DbtCloudRunJobOperator`

:param dbt_cloud_conn_id: The connection ID for connecting to dbt Cloud.
:param job_id: The ID of a dbt Cloud job.
:param job_id: The ID of a dbt Cloud job. Required if project_name, environment_name, and job_name are not provided.
:param project_name: Optional. The name of a dbt Cloud project. Used only if ``job_id`` is None.
:param environment_name: Optional. The name of a dbt Cloud environment. Used only if ``job_id`` is None.
:param job_name: Optional. The name of a dbt Cloud job. Used only if ``job_id`` is None.
:param account_id: Optional. The ID of a dbt Cloud account.
:param trigger_reason: Optional. Description of the reason to trigger the job.
Defaults to "Triggered via Apache Airflow by task <task_id> in the <dag_id> DAG."
Expand Down Expand Up @@ -86,6 +89,9 @@ class DbtCloudRunJobOperator(BaseOperator):
template_fields = (
"dbt_cloud_conn_id",
"job_id",
"project_name",
"environment_name",
"job_name",
"account_id",
"trigger_reason",
"steps_override",
Expand All @@ -99,7 +105,10 @@ def __init__(
self,
*,
dbt_cloud_conn_id: str = DbtCloudHook.default_conn_name,
job_id: int,
job_id: int | None = None,
project_name: str | None = None,
environment_name: str | None = None,
job_name: str | None = None,
account_id: int | None = None,
trigger_reason: str | None = None,
steps_override: list[str] | None = None,
Expand All @@ -117,6 +126,9 @@ def __init__(
self.dbt_cloud_conn_id = dbt_cloud_conn_id
self.account_id = account_id
self.job_id = job_id
self.project_name = project_name
self.environment_name = environment_name
self.job_name = job_name
self.trigger_reason = trigger_reason
self.steps_override = steps_override
self.schema_override = schema_override
Expand All @@ -135,6 +147,18 @@ def execute(self, context: Context):
f"Triggered via Apache Airflow by task {self.task_id!r} in the {self.dag.dag_id} DAG."
)

if self.job_id is None:
if not all([self.project_name, self.environment_name, self.job_name]):
raise ValueError(
"Either job_id or project_name, environment_name, and job_name must be provided."
)
self.job_id = self.hook.get_job_by_name(
account_id=self.account_id,
project_name=self.project_name,
environment_name=self.environment_name,
job_name=self.job_name,
)["id"]

non_terminal_runs = None
if self.reuse_existing_run:
non_terminal_runs = self.hook.get_job_runs(
Expand Down
Loading