diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index a669d4d768e2b..49e18181c99b6 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -c8047070343c2f31da70d1cf2fc286ec74aff921b964c5a662c9ac5eb8d07eac \ No newline at end of file +1a9b0180a1d8fecd9236fc1d897636e3bbe55fd0d0d24b9c530673fa78b4dc08 \ No newline at end of file diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg index ad60590bf137a..24951ff72fbca 100644 --- a/airflow-core/docs/img/airflow_erd.svg +++ b/airflow-core/docs/img/airflow_erd.svg @@ -4,878 +4,878 @@ - - + + %3 - + log - -log - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - -dttm - - [TIMESTAMP] - -event - - [VARCHAR(60)] - -extra - - [TEXT] - -logical_date - - [TIMESTAMP] - -map_index - - [INTEGER] - -owner - - [VARCHAR(500)] - -owner_display_name - - [VARCHAR(500)] - -run_id - - [VARCHAR(250)] - -task_id - - [VARCHAR(250)] - -try_number - - [INTEGER] + +log + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + +dttm + + [TIMESTAMP] + +event + + [VARCHAR(60)] + +extra + + [TEXT] + +logical_date + + [TIMESTAMP] + +map_index + + [INTEGER] + +owner + + [VARCHAR(500)] + +owner_display_name + + [VARCHAR(500)] + +run_id + + [VARCHAR(250)] + +task_id + + [VARCHAR(250)] + +try_number + + [INTEGER] dag_priority_parsing_request - -dag_priority_parsing_request - -id - - [VARCHAR(32)] - NOT NULL - -bundle_name - - [VARCHAR(250)] - NOT NULL - -relative_fileloc - - [VARCHAR(2000)] - NOT NULL + +dag_priority_parsing_request + +id + + [VARCHAR(32)] + NOT NULL + +bundle_name + + [VARCHAR(250)] + NOT NULL + +relative_fileloc + + [VARCHAR(2000)] + NOT NULL job - -job - -id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - -end_date - - [TIMESTAMP] - -executor_class - - [VARCHAR(500)] - -hostname - - [VARCHAR(500)] - -job_type - - [VARCHAR(30)] - -latest_heartbeat - - [TIMESTAMP] - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -unixname - - [VARCHAR(1000)] + +job + +id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + +end_date + + [TIMESTAMP] + +executor_class + + [VARCHAR(500)] + +hostname + + [VARCHAR(500)] + +job_type + + [VARCHAR(30)] + +latest_heartbeat + + [TIMESTAMP] + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +unixname + + [VARCHAR(1000)] callback_request - -callback_request - -id - - [INTEGER] - NOT NULL - -callback_data - - [JSONB] - NOT NULL - -callback_type - - [VARCHAR(20)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -priority_weight - - [INTEGER] - NOT NULL + +callback_request + +id + + [INTEGER] + NOT NULL + +callback_data + + [JSONB] + NOT NULL + +callback_type + + [VARCHAR(20)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +priority_weight + + [INTEGER] + NOT NULL import_error - -import_error - -id - - [INTEGER] - NOT NULL - -bundle_name - - [VARCHAR(250)] - -filename - - [VARCHAR(1024)] - -stacktrace - - [TEXT] - -timestamp - - [TIMESTAMP] + +import_error + +id + + [INTEGER] + NOT NULL + +bundle_name + + [VARCHAR(250)] + +filename + + [VARCHAR(1024)] + +stacktrace + + [TEXT] + +timestamp + + [TIMESTAMP] dag_bundle - -dag_bundle - -name - - [VARCHAR(250)] - NOT NULL - -active - - [BOOLEAN] - -last_refreshed - - [TIMESTAMP] - -signed_url_template - - [VARCHAR(200)] - -template_params - - [JSON] - -version - - [VARCHAR(200)] + +dag_bundle + +name + + [VARCHAR(250)] + NOT NULL + +active + + [BOOLEAN] + +last_refreshed + + [TIMESTAMP] + +signed_url_template + + [VARCHAR(200)] + +template_params + + [JSON] + +version + + [VARCHAR(200)] dag_bundle_team - -dag_bundle_team - -dag_bundle_name - - [VARCHAR(250)] - NOT NULL - -team_id - - [UUID] - NOT NULL + +dag_bundle_team + +dag_bundle_name + + [VARCHAR(250)] + NOT NULL + +team_id + + [UUID] + NOT NULL dag_bundle:name--dag_bundle_team:dag_bundle_name - -0..N -1 + +0..N +1 dag - -dag - -dag_id - - [VARCHAR(250)] - NOT NULL - -asset_expression - - [JSON] - -bundle_name - - [VARCHAR(250)] - NOT NULL - -bundle_version - - [VARCHAR(200)] - -dag_display_name - - [VARCHAR(2000)] - -deadline - - [JSON] - -description - - [TEXT] - -fileloc - - [VARCHAR(2000)] - -has_import_errors - - [BOOLEAN] - -has_task_concurrency_limits - - [BOOLEAN] - NOT NULL - -is_paused - - [BOOLEAN] - -is_stale - - [BOOLEAN] - -last_expired - - [TIMESTAMP] - -last_parse_duration - - [DOUBLE_PRECISION] - -last_parsed_time - - [TIMESTAMP] - -max_active_runs - - [INTEGER] - -max_active_tasks - - [INTEGER] - NOT NULL - -max_consecutive_failed_dag_runs - - [INTEGER] - NOT NULL - -next_dagrun - - [TIMESTAMP] - -next_dagrun_create_after - - [TIMESTAMP] - -next_dagrun_data_interval_end - - [TIMESTAMP] - -next_dagrun_data_interval_start - - [TIMESTAMP] - -owners - - [VARCHAR(2000)] - -relative_fileloc - - [VARCHAR(2000)] - -timetable_description - - [VARCHAR(1000)] - -timetable_summary - - [TEXT] + +dag + +dag_id + + [VARCHAR(250)] + NOT NULL + +asset_expression + + [JSON] + +bundle_name + + [VARCHAR(250)] + NOT NULL + +bundle_version + + [VARCHAR(200)] + +dag_display_name + + [VARCHAR(2000)] + +deadline + + [JSON] + +description + + [TEXT] + +fileloc + + [VARCHAR(2000)] + +has_import_errors + + [BOOLEAN] + +has_task_concurrency_limits + + [BOOLEAN] + NOT NULL + +is_paused + + [BOOLEAN] + +is_stale + + [BOOLEAN] + +last_expired + + [TIMESTAMP] + +last_parse_duration + + [DOUBLE_PRECISION] + +last_parsed_time + + [TIMESTAMP] + +max_active_runs + + [INTEGER] + +max_active_tasks + + [INTEGER] + NOT NULL + +max_consecutive_failed_dag_runs + + [INTEGER] + NOT NULL + +next_dagrun + + [TIMESTAMP] + +next_dagrun_create_after + + [TIMESTAMP] + +next_dagrun_data_interval_end + + [TIMESTAMP] + +next_dagrun_data_interval_start + + [TIMESTAMP] + +owners + + [VARCHAR(2000)] + +relative_fileloc + + [VARCHAR(2000)] + +timetable_description + + [VARCHAR(1000)] + +timetable_summary + + [TEXT] dag_bundle:name--dag:bundle_name - -0..N -1 + +0..N +1 team - -team - -id - - [UUID] - NOT NULL - -name - - [VARCHAR(50)] - NOT NULL + +team + +id + + [UUID] + NOT NULL + +name + + [VARCHAR(50)] + NOT NULL team:id--dag_bundle_team:team_id - -0..N -1 + +0..N +1 connection - -connection - -id - - [INTEGER] - NOT NULL - -conn_id - - [VARCHAR(250)] - NOT NULL - -conn_type - - [VARCHAR(500)] - NOT NULL - -description - - [TEXT] - -extra - - [TEXT] - -host - - [VARCHAR(500)] - -is_encrypted - - [BOOLEAN] - -is_extra_encrypted - - [BOOLEAN] - -login - - [TEXT] - -password - - [TEXT] - -port - - [INTEGER] - -schema - - [VARCHAR(500)] - -team_id - - [UUID] + +connection + +id + + [INTEGER] + NOT NULL + +conn_id + + [VARCHAR(250)] + NOT NULL + +conn_type + + [VARCHAR(500)] + NOT NULL + +description + + [TEXT] + +extra + + [TEXT] + +host + + [VARCHAR(500)] + +is_encrypted + + [BOOLEAN] + +is_extra_encrypted + + [BOOLEAN] + +login + + [TEXT] + +password + + [TEXT] + +port + + [INTEGER] + +schema + + [VARCHAR(500)] + +team_id + + [UUID] team:id--connection:team_id - -0..N -{0,1} + +0..N +{0,1} variable - -variable - -id - - [INTEGER] - NOT NULL - -description - - [TEXT] - -is_encrypted - - [BOOLEAN] - -key - - [VARCHAR(250)] - -team_id - - [UUID] - -val - - [TEXT] + +variable + +id + + [INTEGER] + NOT NULL + +description + + [TEXT] + +is_encrypted + + [BOOLEAN] + +key + + [VARCHAR(250)] + +team_id + + [UUID] + +val + + [TEXT] team:id--variable:team_id - -0..N -{0,1} + +0..N +{0,1} slot_pool - -slot_pool - -id - - [INTEGER] - NOT NULL - -description - - [TEXT] - -include_deferred - - [BOOLEAN] - NOT NULL - -pool - - [VARCHAR(256)] - -slots - - [INTEGER] - -team_id - - [UUID] + +slot_pool + +id + + [INTEGER] + NOT NULL + +description + + [TEXT] + +include_deferred + + [BOOLEAN] + NOT NULL + +pool + + [VARCHAR(256)] + +slots + + [INTEGER] + +team_id + + [UUID] team:id--slot_pool:team_id - -0..N -{0,1} + +0..N +{0,1} asset_alias - -asset_alias - -id - - [INTEGER] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL + +asset_alias + +id + + [INTEGER] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL asset_alias_asset - -asset_alias_asset - -alias_id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL + +asset_alias_asset + +alias_id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL asset_alias:id--asset_alias_asset:alias_id - -0..N -1 + +0..N +1 asset_alias_asset_event - -asset_alias_asset_event - -alias_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +asset_alias_asset_event + +alias_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL - + asset_alias:id--asset_alias_asset_event:alias_id - -0..N -1 + +0..N +1 dag_schedule_asset_alias_reference - -dag_schedule_asset_alias_reference - -alias_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_alias_reference + +alias_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL - + asset_alias:id--dag_schedule_asset_alias_reference:alias_id - -0..N -1 + +0..N +1 asset - -asset - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -extra - - [JSON] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +extra + + [JSON] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL asset:id--asset_alias_asset:asset_id - -0..N -1 + +0..N +1 asset_watcher - -asset_watcher - -asset_id - - [INTEGER] - NOT NULL - -trigger_id - - [INTEGER] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL + +asset_watcher + +asset_id + + [INTEGER] + NOT NULL + +trigger_id + + [INTEGER] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL - + asset:id--asset_watcher:asset_id - -0..N -1 + +0..N +1 asset_active - -asset_active - -name - - [VARCHAR(1500)] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset_active + +name + + [VARCHAR(1500)] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL -asset:name--asset_active:name - -1 -1 +asset:uri--asset_active:uri + +1 +1 -asset:uri--asset_active:uri - -1 -1 +asset:name--asset_active:name + +1 +1 dag_schedule_asset_reference - -dag_schedule_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL - + asset:id--dag_schedule_asset_reference:asset_id - -0..N -1 + +0..N +1 task_outlet_asset_reference - -task_outlet_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_outlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL - + asset:id--task_outlet_asset_reference:asset_id - -0..N -1 + +0..N +1 task_inlet_asset_reference - -task_inlet_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_inlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset:id--task_inlet_asset_reference:asset_id - -0..N -1 + +0..N +1 asset_dag_run_queue - -asset_dag_run_queue - -asset_id - - [INTEGER] - NOT NULL - -target_dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL + +asset_dag_run_queue + +asset_id + + [INTEGER] + NOT NULL + +target_dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL - + asset:id--asset_dag_run_queue:asset_id - -0..N -1 + +0..N +1 @@ -920,1521 +920,1592 @@ NOT NULL - + asset_event:id--asset_alias_asset_event:event_id - -0..N + +0..N 1 dagrun_asset_event - -dagrun_asset_event - -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_event:id--dagrun_asset_event:event_id - -0..N + +0..N 1 trigger - -trigger - -id - - [INTEGER] - NOT NULL - -classpath - - [VARCHAR(1000)] - NOT NULL - -created_date - - [TIMESTAMP] - NOT NULL - -kwargs - - [TEXT] - NOT NULL - -triggerer_id - - [INTEGER] + +trigger + +id + + [INTEGER] + NOT NULL + +classpath + + [VARCHAR(1000)] + NOT NULL + +created_date + + [TIMESTAMP] + NOT NULL + +kwargs + + [TEXT] + NOT NULL + +triggerer_id + + [INTEGER] - + trigger:id--asset_watcher:trigger_id - -0..N -1 + +0..N +1 task_instance - -task_instance - -id - - [UUID] - NOT NULL - -context_carrier - - [JSONB] - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -last_heartbeat_at - - [TIMESTAMP] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSONB] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -scheduled_dttm - - [TIMESTAMP] - -span_status - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance + +id + + [UUID] + NOT NULL + +context_carrier + + [JSONB] + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +last_heartbeat_at + + [TIMESTAMP] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSONB] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +scheduled_dttm + + [TIMESTAMP] + +span_status + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] - + trigger:id--task_instance:trigger_id - -0..N -{0,1} + +0..N +{0,1} deadline - -deadline - -id - - [UUID] - NOT NULL - -callback - - [JSON] - NOT NULL - -callback_state - - [VARCHAR(20)] - -dagrun_id - - [INTEGER] - -deadline_time - - [TIMESTAMP] - NOT NULL - -trigger_id - - [INTEGER] + +deadline + +id + + [UUID] + NOT NULL + +callback + + [JSON] + NOT NULL + +callback_state + + [VARCHAR(20)] + +dagrun_id + + [INTEGER] + +deadline_time + + [TIMESTAMP] + NOT NULL + +trigger_id + + [INTEGER] trigger:id--deadline:trigger_id - -0..N -{0,1} + +0..N +{0,1} dag_schedule_asset_name_reference - -dag_schedule_asset_name_reference - -dag_id - - [VARCHAR(250)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_name_reference + +dag_id + + [VARCHAR(250)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL dag:dag_id--dag_schedule_asset_name_reference:dag_id - -0..N -1 + +0..N +1 dag_schedule_asset_uri_reference - -dag_schedule_asset_uri_reference - -dag_id - - [VARCHAR(250)] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_uri_reference + +dag_id + + [VARCHAR(250)] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL dag:dag_id--dag_schedule_asset_uri_reference:dag_id - -0..N -1 + +0..N +1 - + dag:dag_id--dag_schedule_asset_alias_reference:dag_id - -0..N -1 + +0..N +1 - + dag:dag_id--dag_schedule_asset_reference:dag_id - -0..N -1 + +0..N +1 - + dag:dag_id--task_outlet_asset_reference:dag_id - -0..N -1 + +0..N +1 dag:dag_id--task_inlet_asset_reference:dag_id - -0..N -1 + +0..N +1 - + dag:dag_id--asset_dag_run_queue:target_dag_id - -0..N -1 + +0..N +1 dag_version - -dag_version - -id - - [UUID] - NOT NULL - -bundle_name - - [VARCHAR(250)] - -bundle_version - - [VARCHAR(250)] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -version_number - - [INTEGER] - NOT NULL + +dag_version + +id + + [UUID] + NOT NULL + +bundle_name + + [VARCHAR(250)] + +bundle_version + + [VARCHAR(250)] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +version_number + + [INTEGER] + NOT NULL dag:dag_id--dag_version:dag_id - -0..N -1 + +0..N +1 dag_tag - -dag_tag - -dag_id - - [VARCHAR(250)] - NOT NULL - -name - - [VARCHAR(100)] - NOT NULL + +dag_tag + +dag_id + + [VARCHAR(250)] + NOT NULL + +name + + [VARCHAR(100)] + NOT NULL dag:dag_id--dag_tag:dag_id - -0..N -1 + +0..N +1 dag_owner_attributes - -dag_owner_attributes - -dag_id - - [VARCHAR(250)] - NOT NULL - -owner - - [VARCHAR(500)] - NOT NULL - -link - - [VARCHAR(500)] - NOT NULL + +dag_owner_attributes + +dag_id + + [VARCHAR(250)] + NOT NULL + +owner + + [VARCHAR(500)] + NOT NULL + +link + + [VARCHAR(500)] + NOT NULL dag:dag_id--dag_owner_attributes:dag_id - -0..N -1 + +0..N +1 dag_warning - -dag_warning - -dag_id - - [VARCHAR(250)] - NOT NULL - -warning_type - - [VARCHAR(50)] - NOT NULL - -message - - [TEXT] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL + +dag_warning + +dag_id + + [VARCHAR(250)] + NOT NULL + +warning_type + + [VARCHAR(50)] + NOT NULL + +message + + [TEXT] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL dag:dag_id--dag_warning:dag_id - -0..N -1 + +0..N +1 dag_favorite - -dag_favorite - -dag_id - - [VARCHAR(250)] - NOT NULL - -user_id - - [VARCHAR(250)] - NOT NULL + +dag_favorite + +dag_id + + [VARCHAR(250)] + NOT NULL + +user_id + + [VARCHAR(250)] + NOT NULL dag:dag_id--dag_favorite:dag_id - -0..N -1 + +0..N +1 dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - -bundle_version - - [VARCHAR(250)] - -clear_number - - [INTEGER] - NOT NULL - -conf - - [JSONB] - -context_carrier - - [JSONB] - -created_dag_version_id - - [UUID] - -creating_job_id - - [INTEGER] - -dag_id - - [VARCHAR(250)] - NOT NULL - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -last_scheduling_decision - - [TIMESTAMP] - -log_template_id - - [INTEGER] - -logical_date - - [TIMESTAMP] - -queued_at - - [TIMESTAMP] - -run_after - - [TIMESTAMP] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -scheduled_by_job_id - - [INTEGER] - -span_status - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - -triggered_by - - [VARCHAR(50)] - -triggering_user_name - - [VARCHAR(512)] - -updated_at - - [TIMESTAMP] + +dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + +bundle_version + + [VARCHAR(250)] + +clear_number + + [INTEGER] + NOT NULL + +conf + + [JSONB] + +context_carrier + + [JSONB] + +created_dag_version_id + + [UUID] + +creating_job_id + + [INTEGER] + +dag_id + + [VARCHAR(250)] + NOT NULL + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +last_scheduling_decision + + [TIMESTAMP] + +log_template_id + + [INTEGER] + +logical_date + + [TIMESTAMP] + +queued_at + + [TIMESTAMP] + +run_after + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +scheduled_by_job_id + + [INTEGER] + +span_status + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + +triggered_by + + [VARCHAR(50)] + +triggering_user_name + + [VARCHAR(512)] + +updated_at + + [TIMESTAMP] dag_version:id--dag_run:created_dag_version_id - -0..N -{0,1} + +0..N +{0,1} dag_code - -dag_code - -id - - [UUID] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -fileloc - - [VARCHAR(2000)] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -source_code - - [TEXT] - NOT NULL - -source_code_hash - - [VARCHAR(32)] - NOT NULL + +dag_code + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +fileloc + + [VARCHAR(2000)] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +source_code + + [TEXT] + NOT NULL + +source_code_hash + + [VARCHAR(32)] + NOT NULL dag_version:id--dag_code:dag_version_id - -0..N -1 + +0..N +1 serialized_dag - -serialized_dag - -id - - [UUID] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -dag_hash - - [VARCHAR(32)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - NOT NULL - -data - - [JSONB] - -data_compressed - - [BYTEA] - -last_updated - - [TIMESTAMP] - NOT NULL + +serialized_dag + +id + + [UUID] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +dag_hash + + [VARCHAR(32)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + NOT NULL + +data + + [JSONB] + +data_compressed + + [BYTEA] + +last_updated + + [TIMESTAMP] + NOT NULL dag_version:id--serialized_dag:dag_version_id - -0..N -1 + +0..N +1 - + dag_version:id--task_instance:dag_version_id - -0..N -{0,1} + +0..N +{0,1} log_template - -log_template - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -elasticsearch_id - - [TEXT] - NOT NULL - -filename - - [TEXT] - NOT NULL + +log_template + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +elasticsearch_id + + [TEXT] + NOT NULL + +filename + + [TEXT] + NOT NULL - + log_template:id--dag_run:log_template_id - -0..N -{0,1} + +0..N +{0,1} dag_run:id--dagrun_asset_event:dag_run_id - -0..N -1 + +0..N +1 - -dag_run:run_id--task_instance:run_id - -0..N -1 + +dag_run:dag_id--task_instance:dag_id + +0..N +1 - -dag_run:dag_id--task_instance:dag_id - -0..N -1 + +dag_run:run_id--task_instance:run_id + +0..N +1 dag_run:id--deadline:dagrun_id - -0..N -{0,1} + +0..N +{0,1} backfill_dag_run - -backfill_dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - NOT NULL - -dag_run_id - - [INTEGER] - -exception_reason - - [VARCHAR(250)] - -logical_date - - [TIMESTAMP] - NOT NULL - -sort_ordinal - - [INTEGER] - NOT NULL + +backfill_dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + NOT NULL + +dag_run_id + + [INTEGER] + +exception_reason + + [VARCHAR(250)] + +logical_date + + [TIMESTAMP] + NOT NULL + +sort_ordinal + + [INTEGER] + NOT NULL - + dag_run:id--backfill_dag_run:dag_run_id - -0..N -{0,1} + +0..N +{0,1} dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] dag_run:id--dag_run_note:dag_run_id - -1 -1 + +1 +1 backfill - -backfill - -id - - [INTEGER] - NOT NULL - -completed_at - - [TIMESTAMP] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_run_conf - - [JSON] - NOT NULL - -from_date - - [TIMESTAMP] - NOT NULL - -is_paused - - [BOOLEAN] - -max_active_runs - - [INTEGER] - NOT NULL - -reprocess_behavior - - [VARCHAR(250)] - NOT NULL - -to_date - - [TIMESTAMP] - NOT NULL - -triggering_user_name - - [VARCHAR(512)] - -updated_at - - [TIMESTAMP] - NOT NULL + +backfill + +id + + [INTEGER] + NOT NULL + +completed_at + + [TIMESTAMP] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_run_conf + + [JSON] + NOT NULL + +from_date + + [TIMESTAMP] + NOT NULL + +is_paused + + [BOOLEAN] + +max_active_runs + + [INTEGER] + NOT NULL + +reprocess_behavior + + [VARCHAR(250)] + NOT NULL + +to_date + + [TIMESTAMP] + NOT NULL + +triggering_user_name + + [VARCHAR(512)] + +updated_at + + [TIMESTAMP] + NOT NULL - + backfill:id--dag_run:backfill_id - -0..N -{0,1} + +0..N +{0,1} - + backfill:id--backfill_dag_run:backfill_id - -0..N -1 + +0..N +1 hitl_detail - -hitl_detail - -ti_id - - [UUID] - NOT NULL - -assignees - - [JSON] - -body - - [TEXT] - -chosen_options - - [JSON] - -created_at - - [TIMESTAMP] - NOT NULL - -defaults - - [JSON] - -multiple - - [BOOLEAN] - -options - - [JSON] - NOT NULL - -params - - [JSON] - NOT NULL - -params_input - - [JSON] - NOT NULL - -responded_at - - [TIMESTAMP] - -responded_by - - [JSON] - -subject - - [TEXT] - NOT NULL + +hitl_detail + +ti_id + + [UUID] + NOT NULL + +assignees + + [JSON] + +body + + [TEXT] + +chosen_options + + [JSON] + +created_at + + [TIMESTAMP] + NOT NULL + +defaults + + [JSON] + +multiple + + [BOOLEAN] + +options + + [JSON] + NOT NULL + +params + + [JSON] + NOT NULL + +params_input + + [JSON] + NOT NULL + +responded_at + + [TIMESTAMP] + +responded_by + + [JSON] + +subject + + [TEXT] + NOT NULL task_instance:id--hitl_detail:ti_id - -1 -1 + +1 +1 task_map - -task_map - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -keys - - [JSONB] - -length - - [INTEGER] - NOT NULL + +task_map + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +keys + + [JSONB] + +length + + [INTEGER] + NOT NULL -task_instance:run_id--task_map:run_id - -0..N -1 +task_instance:dag_id--task_map:dag_id + +0..N +1 -task_instance:map_index--task_map:map_index - -0..N -1 +task_instance:run_id--task_map:run_id + +0..N +1 -task_instance:dag_id--task_map:dag_id - -0..N -1 +task_instance:task_id--task_map:task_id + +0..N +1 -task_instance:task_id--task_map:task_id - -0..N -1 +task_instance:map_index--task_map:map_index + +0..N +1 task_reschedule - -task_reschedule - -id - - [INTEGER] - NOT NULL - -duration - - [INTEGER] - NOT NULL - -end_date - - [TIMESTAMP] - NOT NULL - -reschedule_date - - [TIMESTAMP] - NOT NULL - -start_date - - [TIMESTAMP] - NOT NULL - -ti_id - - [UUID] - NOT NULL + +task_reschedule + +id + + [INTEGER] + NOT NULL + +duration + + [INTEGER] + NOT NULL + +end_date + + [TIMESTAMP] + NOT NULL + +reschedule_date + + [TIMESTAMP] + NOT NULL + +start_date + + [TIMESTAMP] + NOT NULL + +ti_id + + [UUID] + NOT NULL task_instance:id--task_reschedule:ti_id - -0..N -1 + +0..N +1 xcom - -xcom - -dag_run_id - - [INTEGER] - NOT NULL - -key - - [VARCHAR(512)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -timestamp - - [TIMESTAMP] - NOT NULL - -value - - [JSONB] + +xcom + +dag_run_id + + [INTEGER] + NOT NULL + +key + + [VARCHAR(512)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +timestamp + + [TIMESTAMP] + NOT NULL + +value + + [JSONB] task_instance:dag_id--xcom:dag_id - -0..N -1 + +0..N +1 -task_instance:run_id--xcom:run_id - -0..N -1 +task_instance:map_index--xcom:map_index + +0..N +1 -task_instance:map_index--xcom:map_index - -0..N -1 +task_instance:run_id--xcom:run_id + +0..N +1 task_instance:task_id--xcom:task_id - -0..N -1 + +0..N +1 task_instance_note - -task_instance_note - -ti_id - - [UUID] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +task_instance_note + +ti_id + + [UUID] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] task_instance:id--task_instance_note:ti_id - -1 -1 + +1 +1 task_instance_history - -task_instance_history - -task_instance_id - - [UUID] - NOT NULL - -context_carrier - - [JSONB] - -custom_operator_name - - [VARCHAR(1000)] - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE_PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - -external_executor_id - - [VARCHAR(250)] - -hostname - - [VARCHAR(1000)] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - -next_kwargs - - [JSONB] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - -queue - - [VARCHAR(256)] - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -scheduled_dttm - - [TIMESTAMP] - -span_status - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - NOT NULL - -unixname - - [VARCHAR(1000)] - -updated_at - - [TIMESTAMP] + +task_instance_history + +task_instance_id + + [UUID] + NOT NULL + +context_carrier + + [JSONB] + +custom_operator_name + + [VARCHAR(1000)] + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE_PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + +external_executor_id + + [VARCHAR(250)] + +hostname + + [VARCHAR(1000)] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + +next_kwargs + + [JSONB] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + +queue + + [VARCHAR(256)] + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +scheduled_dttm + + [TIMESTAMP] + +span_status + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + NOT NULL + +unixname + + [VARCHAR(1000)] + +updated_at + + [TIMESTAMP] -task_instance:task_id--task_instance_history:task_id - -0..N -1 +task_instance:dag_id--task_instance_history:dag_id + +0..N +1 task_instance:map_index--task_instance_history:map_index - -0..N -1 + +0..N +1 -task_instance:run_id--task_instance_history:run_id - -0..N -1 +task_instance:task_id--task_instance_history:task_id + +0..N +1 -task_instance:dag_id--task_instance_history:dag_id - -0..N -1 +task_instance:run_id--task_instance_history:run_id + +0..N +1 rendered_task_instance_fields - -rendered_task_instance_fields - -dag_id - - [VARCHAR(250)] - NOT NULL - -map_index - - [INTEGER] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -k8s_pod_yaml - - [JSON] - -rendered_fields - - [JSON] - NOT NULL + +rendered_task_instance_fields + +dag_id + + [VARCHAR(250)] + NOT NULL + +map_index + + [INTEGER] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +k8s_pod_yaml + + [JSON] + +rendered_fields + + [JSON] + NOT NULL -task_instance:run_id--rendered_task_instance_fields:run_id - -0..N -1 +task_instance:map_index--rendered_task_instance_fields:map_index + +0..N +1 -task_instance:task_id--rendered_task_instance_fields:task_id - -0..N -1 +task_instance:run_id--rendered_task_instance_fields:run_id + +0..N +1 -task_instance:map_index--rendered_task_instance_fields:map_index - -0..N -1 +task_instance:task_id--rendered_task_instance_fields:task_id + +0..N +1 task_instance:dag_id--rendered_task_instance_fields:dag_id - -0..N -1 + +0..N +1 - + +hitl_detail_history + +hitl_detail_history + +ti_history_id + + [UUID] + NOT NULL + +assignees + + [JSON] + +body + + [TEXT] + +chosen_options + + [JSON] + +created_at + + [TIMESTAMP] + NOT NULL + +defaults + + [JSON] + +multiple + + [BOOLEAN] + +options + + [JSON] + NOT NULL + +params + + [JSON] + NOT NULL + +params_input + + [JSON] + NOT NULL + +responded_at + + [TIMESTAMP] + +responded_by + + [JSON] + +subject + + [TEXT] + NOT NULL + + + +task_instance_history:task_instance_id--hitl_detail_history:ti_history_id + +1 +1 + + + alembic_version - -alembic_version - -version_num - - [VARCHAR(32)] - NOT NULL + +alembic_version + +version_num + + [VARCHAR(32)] + NOT NULL diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index 16a2a460d872d..90500ea9b05fa 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | Revision ID | Revises ID | Airflow Version | Description | +=========================+==================+===================+==============================================================+ -| ``1b2c3d4e5f6g`` (head) | ``ab6dc0c82d0e`` | ``3.2.0`` | Add length to dag_bundle_team.dag_bundle_name. | +| ``5cc8117e9285`` (head) | ``1b2c3d4e5f6g`` | ``3.2.0`` | Add Human In the Loop Detail History table. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``1b2c3d4e5f6g`` | ``ab6dc0c82d0e`` | ``3.2.0`` | Add length to dag_bundle_team.dag_bundle_name. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``ab6dc0c82d0e`` | ``15d84ca19038`` | ``3.2.0`` | Change ``serialized_dag`` data column to JSONB for | | | | | PostgreSQL. | diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py index aa4f44f212bd9..a1be3479d4c48 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py @@ -24,7 +24,6 @@ from airflow.api_fastapi.core_api.base import BaseModel from airflow.api_fastapi.core_api.datamodels.task_instances import TaskInstanceResponse -from airflow.sdk import Param class UpdateHITLDetailPayload(BaseModel): @@ -50,10 +49,8 @@ class HITLUser(BaseModel): name: str -class HITLDetail(BaseModel): - """Schema for Human-in-the-loop detail.""" - - task_instance: TaskInstanceResponse +class BaseHITLDetail(BaseModel): + """The common part within HITLDetail and HITLDetailHisotry.""" # User Request Detail options: list[str] = Field(min_length=1) @@ -77,7 +74,13 @@ class HITLDetail(BaseModel): @classmethod def get_params(cls, params: dict[str, Any]) -> dict[str, Any]: """Convert params attribute to dict representation.""" - return {k: v.dump() if isinstance(v, Param) else v for k, v in params.items()} + return {k: v.dump() if getattr(v, "dump", None) else v for k, v in params.items()} + + +class HITLDetail(BaseHITLDetail): + """Schema for Human-in-the-loop detail.""" + + task_instance: TaskInstanceResponse class HITLDetailCollection(BaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instance_history.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instance_history.py new file mode 100644 index 0000000000000..21f16d2777323 --- /dev/null +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instance_history.py @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime +from typing import Annotated + +from pydantic import ( + AliasPath, + BeforeValidator, + Field, +) + +from airflow.api_fastapi.core_api.base import BaseModel +from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse +from airflow.api_fastapi.core_api.datamodels.hitl import BaseHITLDetail +from airflow.utils.state import TaskInstanceState + + +class HITLDetailHisotry(BaseHITLDetail): + """Schema for Human-in-the-loop detail history.""" + + +class TaskInstanceHistoryResponse(BaseModel): + """TaskInstanceHistory serializer for responses.""" + + task_id: str + dag_id: str + + # todo: this should not be aliased; it's ambiguous with dag run's "id" - airflow 3.0 + run_id: str = Field(alias="dag_run_id") + + map_index: int + start_date: datetime | None + end_date: datetime | None + duration: float | None + state: TaskInstanceState | None + try_number: int + max_tries: int + task_display_name: str + dag_display_name: str = Field(validation_alias=AliasPath("dag_run", "dag_model", "dag_display_name")) + hostname: str | None + unixname: str | None + pool: str + pool_slots: int + queue: str | None + priority_weight: int | None + operator: str | None + custom_operator_name: str | None = Field(alias="operator_name") + queued_dttm: datetime | None = Field(alias="queued_when") + scheduled_dttm: datetime | None = Field(alias="scheduled_when") + pid: int | None + executor: str | None + executor_config: Annotated[str, BeforeValidator(str)] + dag_version: DagVersionResponse | None + hitl_detail: HITLDetailHisotry | None + + +class TaskInstanceHistoryCollectionResponse(BaseModel): + """TaskInstanceHistory Collection serializer for responses.""" + + task_instances: list[TaskInstanceHistoryResponse] + total_entries: int diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py index 0b6edcf98e25b..45fa65762c688 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py @@ -141,47 +141,6 @@ class TaskInstancesBatchBody(StrictBaseModel): order_by: str | None = None -class TaskInstanceHistoryResponse(BaseModel): - """TaskInstanceHistory serializer for responses.""" - - task_id: str - dag_id: str - - # todo: this should not be aliased; it's ambiguous with dag run's "id" - airflow 3.0 - run_id: str = Field(alias="dag_run_id") - - map_index: int - start_date: datetime | None - end_date: datetime | None - duration: float | None - state: TaskInstanceState | None - try_number: int - max_tries: int - task_display_name: str - dag_display_name: str = Field(validation_alias=AliasPath("dag_run", "dag_model", "dag_display_name")) - hostname: str | None - unixname: str | None - pool: str - pool_slots: int - queue: str | None - priority_weight: int | None - operator: str | None - custom_operator_name: str | None = Field(alias="operator_name") - queued_dttm: datetime | None = Field(alias="queued_when") - scheduled_dttm: datetime | None = Field(alias="scheduled_when") - pid: int | None - executor: str | None - executor_config: Annotated[str, BeforeValidator(str)] - dag_version: DagVersionResponse | None - - -class TaskInstanceHistoryCollectionResponse(BaseModel): - """TaskInstanceHistory Collection serializer for responses.""" - - task_instances: list[TaskInstanceHistoryResponse] - total_entries: int - - class ClearTaskInstancesBody(StrictBaseModel): """Request body for Clear Task Instances endpoint.""" diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index a3858d6fc2a23..a600d09a3d9b8 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -2005,8 +2005,6 @@ components: description: DAG Run model for the Grid UI. HITLDetail: properties: - task_instance: - $ref: '#/components/schemas/TaskInstanceResponse' options: items: type: string @@ -2070,12 +2068,14 @@ components: type: boolean title: Response Received default: false + task_instance: + $ref: '#/components/schemas/TaskInstanceResponse' type: object required: - - task_instance - options - subject - created_at + - task_instance title: HITLDetail description: Schema for Human-in-the-loop detail. HITLUser: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 88d5a1ff4312d..4d3ff42e9205b 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -11170,8 +11170,6 @@ components: description: Serializer for Plugin FastAPI root middleware responses. HITLDetail: properties: - task_instance: - $ref: '#/components/schemas/TaskInstanceResponse' options: items: type: string @@ -11235,12 +11233,14 @@ components: type: boolean title: Response Received default: false + task_instance: + $ref: '#/components/schemas/TaskInstanceResponse' type: object required: - - task_instance - options - subject - created_at + - task_instance title: HITLDetail description: Schema for Human-in-the-loop detail. HITLDetailCollection: @@ -11259,6 +11259,78 @@ components: - total_entries title: HITLDetailCollection description: Schema for a collection of Human-in-the-loop details. + HITLDetailHisotry: + properties: + options: + items: + type: string + type: array + minItems: 1 + title: Options + subject: + type: string + title: Subject + body: + anyOf: + - type: string + - type: 'null' + title: Body + defaults: + anyOf: + - items: + type: string + type: array + - type: 'null' + title: Defaults + multiple: + type: boolean + title: Multiple + default: false + params: + additionalProperties: true + type: object + title: Params + assigned_users: + items: + $ref: '#/components/schemas/HITLUser' + type: array + title: Assigned Users + created_at: + type: string + format: date-time + title: Created At + responded_by_user: + anyOf: + - $ref: '#/components/schemas/HITLUser' + - type: 'null' + responded_at: + anyOf: + - type: string + format: date-time + - type: 'null' + title: Responded At + chosen_options: + anyOf: + - items: + type: string + type: array + - type: 'null' + title: Chosen Options + params_input: + additionalProperties: true + type: object + title: Params Input + response_received: + type: boolean + title: Response Received + default: false + type: object + required: + - options + - subject + - created_at + title: HITLDetailHisotry + description: Schema for Human-in-the-loop detail history. HITLDetailResponse: properties: responded_by: @@ -12146,6 +12218,10 @@ components: anyOf: - $ref: '#/components/schemas/DagVersionResponse' - type: 'null' + hitl_detail: + anyOf: + - $ref: '#/components/schemas/HITLDetailHisotry' + - type: 'null' type: object required: - task_id @@ -12174,6 +12250,7 @@ components: - executor - executor_config - dag_version + - hitl_detail title: TaskInstanceHistoryResponse description: TaskInstanceHistory serializer for responses. TaskInstanceResponse: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index a4eb092885fa9..f834d3b51808c 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -60,14 +60,16 @@ ) from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.common import BulkBody, BulkResponse +from airflow.api_fastapi.core_api.datamodels.task_instance_history import ( + TaskInstanceHistoryCollectionResponse, + TaskInstanceHistoryResponse, +) from airflow.api_fastapi.core_api.datamodels.task_instances import ( BulkTaskInstanceBody, ClearTaskInstancesBody, PatchTaskInstanceBody, TaskDependencyCollectionResponse, TaskInstanceCollectionResponse, - TaskInstanceHistoryCollectionResponse, - TaskInstanceHistoryResponse, TaskInstanceResponse, TaskInstancesBatchBody, ) @@ -321,6 +323,7 @@ def _query(orm_object: Base) -> Select: ) .options(joinedload(orm_object.dag_version)) .options(joinedload(orm_object.dag_run).options(joinedload(DagRun.dag_model))) + .options(joinedload(orm_object.hitl_detail)) ) return query @@ -644,12 +647,16 @@ def get_task_instance_try_details( """Get task instance details by try number.""" def _query(orm_object: Base) -> TI | TIH | None: - query = select(orm_object).where( - orm_object.dag_id == dag_id, - orm_object.run_id == dag_run_id, - orm_object.task_id == task_id, - orm_object.try_number == task_try_number, - orm_object.map_index == map_index, + query = ( + select(orm_object) + .where( + orm_object.dag_id == dag_id, + orm_object.run_id == dag_run_id, + orm_object.task_id == task_id, + orm_object.try_number == task_try_number, + orm_object.map_index == map_index, + ) + .options(joinedload(orm_object.hitl_detail)) ) task_instance = session.scalar(query) diff --git a/airflow-core/src/airflow/migrations/versions/0089_3_2_0_add_human_in_the_loop_detail_history.py b/airflow-core/src/airflow/migrations/versions/0089_3_2_0_add_human_in_the_loop_detail_history.py new file mode 100644 index 0000000000000..cd02d7a5a1a3f --- /dev/null +++ b/airflow-core/src/airflow/migrations/versions/0089_3_2_0_add_human_in_the_loop_detail_history.py @@ -0,0 +1,81 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Add Human In the Loop Detail History table. + +Revision ID: 5cc8117e9285 +Revises: 1b2c3d4e5f6g +Create Date: 2025-09-22 13:21:21.957132 + +""" + +from __future__ import annotations + +import sqlalchemy_jsonfield +from alembic import op +from sqlalchemy import Boolean, Column, ForeignKeyConstraint, String, Text +from sqlalchemy.dialects import postgresql + +from airflow._shared.timezones import timezone +from airflow.settings import json +from airflow.utils.sqlalchemy import UtcDateTime + +# revision identifiers, used by Alembic. +revision = "5cc8117e9285" +down_revision = "1b2c3d4e5f6g" +branch_labels = None +depends_on = None +airflow_version = "3.2.0" + + +def upgrade(): + """Add Human In the Loop Detail History table.""" + op.create_table( + "hitl_detail_history", + Column( + "ti_history_id", + String(length=36).with_variant(postgresql.UUID(), "postgresql"), + primary_key=True, + nullable=False, + ), + Column("options", sqlalchemy_jsonfield.JSONField(json=json), nullable=False), + Column("subject", Text, nullable=False), + Column("body", Text, nullable=True), + Column("defaults", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), + Column("multiple", Boolean, unique=False, default=False), + Column("params", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}), + Column("assignees", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), + Column("created_at", UtcDateTime(timezone=True), nullable=False, default=timezone.utcnow), + Column("responded_at", UtcDateTime, nullable=True), + Column("responded_by", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), + Column("chosen_options", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), + Column("params_input", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}), + ForeignKeyConstraint( + ["ti_history_id"], + ["task_instance_history.task_instance_id"], + name="hitl_detail_history_tih_fkey", + ondelete="CASCADE", + onupdate="CASCADE", + ), + ) + + +def downgrade(): + """Response Human In the Loop Detail Hisotry table.""" + op.drop_table("hitl_detail_history") diff --git a/airflow-core/src/airflow/models/hitl.py b/airflow-core/src/airflow/models/hitl.py index b6bbb2bc402b0..c1bf9b8fb5c96 100644 --- a/airflow-core/src/airflow/models/hitl.py +++ b/airflow-core/src/airflow/models/hitl.py @@ -80,50 +80,12 @@ class HITLUser(TypedDict): name: str -class HITLDetail(Base): - """Human-in-the-loop request and corresponding response.""" - - __tablename__ = "hitl_detail" - ti_id = Column( - String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"), - primary_key=True, - nullable=False, - ) - - # User Request Detail - options = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False) - subject = Column(Text, nullable=False) - body = Column(Text, nullable=True) - defaults = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) - multiple = Column(Boolean, unique=False, default=False) - params = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}) - assignees = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) - created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) - - # Response Content Detail - responded_at = Column(UtcDateTime, nullable=True) - responded_by = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) - chosen_options = Column( - sqlalchemy_jsonfield.JSONField(json=json), - nullable=True, - default=None, - ) - params_input = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}) - task_instance = relationship( - "TaskInstance", - lazy="joined", - back_populates="hitl_detail", - ) +class HITLDetailPropertyMixin: + """The property part of HITLDetail and HITLDetailHistory.""" - __table_args__ = ( - ForeignKeyConstraint( - (ti_id,), - ["task_instance.id"], - name="hitl_detail_ti_fkey", - ondelete="CASCADE", - onupdate="CASCADE", - ), - ) + responded_at: UtcDateTime + responded_by: dict[str, Any] + assignees: list[dict[str, str]] @hybrid_property def response_received(self) -> bool: @@ -169,3 +131,49 @@ def responded_by_user(self) -> HITLUser | None: id=self.responded_by["id"], name=self.responded_by["name"], ) + + +class HITLDetail(Base, HITLDetailPropertyMixin): + """Human-in-the-loop request and corresponding response.""" + + __tablename__ = "hitl_detail" + ti_id = Column( + String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"), + primary_key=True, + nullable=False, + ) + + # User Request Detail + options = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False) + subject = Column(Text, nullable=False) + body = Column(Text, nullable=True) + defaults = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) + multiple = Column(Boolean, unique=False, default=False) + params = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}) + assignees = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) + created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) + + # Response Content Detail + responded_at = Column(UtcDateTime, nullable=True) + responded_by = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) + chosen_options = Column( + sqlalchemy_jsonfield.JSONField(json=json), + nullable=True, + default=None, + ) + params_input = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}) + task_instance = relationship( + "TaskInstance", + lazy="joined", + back_populates="hitl_detail", + ) + + __table_args__ = ( + ForeignKeyConstraint( + (ti_id,), + ["task_instance.id"], + name="hitl_detail_ti_fkey", + ondelete="CASCADE", + onupdate="CASCADE", + ), + ) diff --git a/airflow-core/src/airflow/models/hitl_history.py b/airflow-core/src/airflow/models/hitl_history.py new file mode 100644 index 0000000000000..94be216460956 --- /dev/null +++ b/airflow-core/src/airflow/models/hitl_history.py @@ -0,0 +1,91 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +import sqlalchemy_jsonfield +from sqlalchemy import Boolean, Column, ForeignKeyConstraint, String, Text +from sqlalchemy.dialects import postgresql +from sqlalchemy.orm import relationship + +from airflow._shared.timezones import timezone +from airflow.models.base import Base +from airflow.models.hitl import HITLDetailPropertyMixin +from airflow.settings import json +from airflow.utils.sqlalchemy import UtcDateTime + +if TYPE_CHECKING: + from airflow.models.hitl import HITLDetail + + +class HITLDetailHistory(Base, HITLDetailPropertyMixin): + """ + Store HITLDetail for old tries of TaskInstances. + + :meta private: + """ + + __tablename__ = "hitl_detail_history" + ti_history_id = Column( + String(36).with_variant(postgresql.UUID(as_uuid=False), "postgresql"), + primary_key=True, + nullable=False, + ) + + # User Request Detail + options = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False) + subject = Column(Text, nullable=False) + body = Column(Text, nullable=True) + defaults = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) + multiple = Column(Boolean, unique=False, default=False) + params = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}) + assignees = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) + created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) + + # Response Content Detail + responded_at = Column(UtcDateTime, nullable=True) + responded_by = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) + chosen_options = Column( + sqlalchemy_jsonfield.JSONField(json=json), + nullable=True, + default=None, + ) + params_input = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}) + task_instance = relationship( + "TaskInstanceHistory", + lazy="joined", + back_populates="hitl_detail", + ) + + def __init__(self, hitl_detail: HITLDetail): + super().__init__() + for column in self.__table__.columns: + if column.name == "ti_history_id": + setattr(self, column.name, hitl_detail.ti_id) + continue + setattr(self, column.name, getattr(hitl_detail, column.name)) + + __table_args__ = ( + ForeignKeyConstraint( + (ti_history_id,), + ["task_instance_history.task_instance_id"], + name="hitl_detail_history_tih_fkey", + ondelete="CASCADE", + onupdate="CASCADE", + ), + ) diff --git a/airflow-core/src/airflow/models/taskinstancehistory.py b/airflow-core/src/airflow/models/taskinstancehistory.py index b5932b28c58a3..4ca4caecc4ce3 100644 --- a/airflow-core/src/airflow/models/taskinstancehistory.py +++ b/airflow-core/src/airflow/models/taskinstancehistory.py @@ -40,6 +40,8 @@ from airflow._shared.timezones import timezone from airflow.models.base import Base, StringID +from airflow.models.hitl import HITLDetail +from airflow.models.hitl_history import HITLDetailHistory from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.span_status import SpanStatus from airflow.utils.sqlalchemy import ( @@ -121,6 +123,8 @@ class TaskInstanceHistory(Base): foreign_keys=[run_id, dag_id], ) + hitl_detail = relationship("HITLDetailHistory", lazy="noload", uselist=False) + def __init__( self, ti: TaskInstance, @@ -190,6 +194,10 @@ def record_ti(ti: TaskInstance, session: Session = NEW_SESSION) -> None: ti_history = TaskInstanceHistory(ti, state=ti_history_state) session.add(ti_history) + ti_hitl_detail = session.scalar(select(HITLDetail).where(HITLDetail.ti_id == ti.id)) + if ti_hitl_detail is not None: + session.add(HITLDetailHistory(ti_hitl_detail)) + @provide_session def get_dagrun(self, session: Session = NEW_SESSION) -> DagRun: """Return the DagRun for this TaskInstanceHistory, matching TaskInstance.""" diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 23b53707db2fc..159478097f6a4 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3681,9 +3681,6 @@ export const $FastAPIRootMiddlewareResponse = { export const $HITLDetail = { properties: { - task_instance: { - '$ref': '#/components/schemas/TaskInstanceResponse' - }, options: { items: { type: 'string' @@ -3788,10 +3785,13 @@ export const $HITLDetail = { type: 'boolean', title: 'Response Received', default: false + }, + task_instance: { + '$ref': '#/components/schemas/TaskInstanceResponse' } }, type: 'object', - required: ['task_instance', 'options', 'subject', 'created_at'], + required: ['options', 'subject', 'created_at', 'task_instance'], title: 'HITLDetail', description: 'Schema for Human-in-the-loop detail.' } as const; @@ -3816,6 +3816,120 @@ export const $HITLDetailCollection = { description: 'Schema for a collection of Human-in-the-loop details.' } as const; +export const $HITLDetailHisotry = { + properties: { + options: { + items: { + type: 'string' + }, + type: 'array', + minItems: 1, + title: 'Options' + }, + subject: { + type: 'string', + title: 'Subject' + }, + body: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Body' + }, + defaults: { + anyOf: [ + { + items: { + type: 'string' + }, + type: 'array' + }, + { + type: 'null' + } + ], + title: 'Defaults' + }, + multiple: { + type: 'boolean', + title: 'Multiple', + default: false + }, + params: { + additionalProperties: true, + type: 'object', + title: 'Params' + }, + assigned_users: { + items: { + '$ref': '#/components/schemas/HITLUser' + }, + type: 'array', + title: 'Assigned Users' + }, + created_at: { + type: 'string', + format: 'date-time', + title: 'Created At' + }, + responded_by_user: { + anyOf: [ + { + '$ref': '#/components/schemas/HITLUser' + }, + { + type: 'null' + } + ] + }, + responded_at: { + anyOf: [ + { + type: 'string', + format: 'date-time' + }, + { + type: 'null' + } + ], + title: 'Responded At' + }, + chosen_options: { + anyOf: [ + { + items: { + type: 'string' + }, + type: 'array' + }, + { + type: 'null' + } + ], + title: 'Chosen Options' + }, + params_input: { + additionalProperties: true, + type: 'object', + title: 'Params Input' + }, + response_received: { + type: 'boolean', + title: 'Response Received', + default: false + } + }, + type: 'object', + required: ['options', 'subject', 'created_at'], + title: 'HITLDetailHisotry', + description: 'Schema for Human-in-the-loop detail history.' +} as const; + export const $HITLDetailResponse = { properties: { responded_by: { @@ -5085,10 +5199,20 @@ export const $TaskInstanceHistoryResponse = { type: 'null' } ] + }, + hitl_detail: { + anyOf: [ + { + '$ref': '#/components/schemas/HITLDetailHisotry' + }, + { + type: 'null' + } + ] } }, type: 'object', - required: ['task_id', 'dag_id', 'dag_run_id', 'map_index', 'start_date', 'end_date', 'duration', 'state', 'try_number', 'max_tries', 'task_display_name', 'dag_display_name', 'hostname', 'unixname', 'pool', 'pool_slots', 'queue', 'priority_weight', 'operator', 'operator_name', 'queued_when', 'scheduled_when', 'pid', 'executor', 'executor_config', 'dag_version'], + required: ['task_id', 'dag_id', 'dag_run_id', 'map_index', 'start_date', 'end_date', 'duration', 'state', 'try_number', 'max_tries', 'task_display_name', 'dag_display_name', 'hostname', 'unixname', 'pool', 'pool_slots', 'queue', 'priority_weight', 'operator', 'operator_name', 'queued_when', 'scheduled_when', 'pid', 'executor', 'executor_config', 'dag_version', 'hitl_detail'], title: 'TaskInstanceHistoryResponse', description: 'TaskInstanceHistory serializer for responses.' } as const; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 63e325717c4d4..d1c58397edde8 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -964,7 +964,6 @@ export type FastAPIRootMiddlewareResponse = { * Schema for Human-in-the-loop detail. */ export type HITLDetail = { - task_instance: TaskInstanceResponse; options: Array<(string)>; subject: string; body?: string | null; @@ -982,6 +981,7 @@ export type HITLDetail = { [key: string]: unknown; }; response_received?: boolean; + task_instance: TaskInstanceResponse; }; /** @@ -992,6 +992,29 @@ export type HITLDetailCollection = { total_entries: number; }; +/** + * Schema for Human-in-the-loop detail history. + */ +export type HITLDetailHisotry = { + options: Array<(string)>; + subject: string; + body?: string | null; + defaults?: Array<(string)> | null; + multiple?: boolean; + params?: { + [key: string]: unknown; + }; + assigned_users?: Array; + created_at: string; + responded_by_user?: HITLUser | null; + responded_at?: string | null; + chosen_options?: Array<(string)> | null; + params_input?: { + [key: string]: unknown; + }; + response_received?: boolean; +}; + /** * Response of updating a Human-in-the-loop detail. */ @@ -1349,6 +1372,7 @@ export type TaskInstanceHistoryResponse = { executor: string | null; executor_config: string; dag_version: DagVersionResponse | null; + hitl_detail: HITLDetailHisotry | null; }; /** diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index 6e430d3dc4ea1..d283f04acf6cb 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -111,7 +111,7 @@ class MappedClassProtocol(Protocol): "3.0.0": "29ce7909c52b", "3.0.3": "fe199e1abd77", "3.1.0": "cc92b33c6709", - "3.2.0": "ab6dc0c82d0e", + "3.2.0": "5cc8117e9285", } diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 0c06e83392cf7..6aaa54b8479b8 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -36,6 +36,7 @@ from airflow.listeners.listener import get_listener_manager from airflow.models import DagRun, Log, TaskInstance from airflow.models.dag_version import DagVersion +from airflow.models.hitl import HITLDetail from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.models.taskmap import TaskMap @@ -53,6 +54,9 @@ from tests_common.test_utils.logs import check_last_log from tests_common.test_utils.mock_operators import MockOperator +if TYPE_CHECKING: + from tests_common.pytest_plugin import CreateTaskInstance + pytestmark = pytest.mark.db_test DEFAULT = datetime(2020, 1, 1) @@ -1904,6 +1908,7 @@ def test_should_respond_200(self, test_client, session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": mock.ANY, + "hitl_detail": None, } @pytest.mark.parametrize("try_number", [1, 2]) @@ -1941,6 +1946,7 @@ def test_should_respond_200_with_different_try_numbers(self, test_client, try_nu "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": mock.ANY, + "hitl_detail": None, } @pytest.mark.parametrize("try_number", [1, 2]) @@ -2009,6 +2015,7 @@ def test_should_respond_200_with_mapped_task_at_different_try_numbers( "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": mock.ANY, + "hitl_detail": None, } def test_should_respond_200_with_task_state_in_deferred(self, test_client, session): @@ -2073,6 +2080,7 @@ def test_should_respond_200_with_task_state_in_deferred(self, test_client, sessi "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": mock.ANY, + "hitl_detail": None, } def test_should_respond_200_with_task_state_in_removed(self, test_client, session): @@ -2111,6 +2119,77 @@ def test_should_respond_200_with_task_state_in_removed(self, test_client, sessio "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": mock.ANY, + "hitl_detail": None, + } + + def test_should_respond_200_with_hitl( + self, test_client, create_task_instance: CreateTaskInstance, session + ): + ti = create_task_instance(dag_id="test_hitl_dag", task_id="sample_task_hitl") + ti.try_number = 1 + session.add(ti) + hitl_detail = HITLDetail( + ti_id=ti.id, + options=["Approve", "Reject"], + subject="This is subject", + body="this is body", + defaults=["Approve"], + multiple=False, + params={"input_1": 1}, + assignees=None, + ) + session.add(hitl_detail) + session.commit() + # Record the TaskInstanceHistory + TaskInstanceHistory.record_ti(ti, session=session) + session.flush() + + response = test_client.get( + f"/dags/{ti.dag_id}/dagRuns/{ti.run_id}/taskInstances/{ti.task_id}/tries/1", + ) + assert response.status_code == 200 + assert response.json() == { + "dag_id": "test_hitl_dag", + "dag_display_name": "test_hitl_dag", + "duration": None, + "end_date": mock.ANY, + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "operator": "EmptyOperator", + "operator_name": "EmptyOperator", + "pid": None, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 1, + "queue": "default", + "queued_when": None, + "scheduled_when": None, + "start_date": None, + "state": None, + "task_id": "sample_task_hitl", + "task_display_name": "sample_task_hitl", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "test", + "dag_version": mock.ANY, + "hitl_detail": { + "assigned_users": [], + "body": "this is body", + "chosen_options": None, + "created_at": mock.ANY, + "defaults": ["Approve"], + "multiple": False, + "options": ["Approve", "Reject"], + "params": {"input_1": 1}, + "params_input": {}, + "responded_at": None, + "responded_by_user": None, + "response_received": False, + "subject": "This is subject", + }, } def test_should_respond_401(self, unauthenticated_test_client): @@ -2190,6 +2269,7 @@ def test_should_respond_200_with_versions( "created_at": mock.ANY, "dag_display_name": "dag_with_multiple_versions", }, + "hitl_detail": None, } @pytest.mark.parametrize( @@ -2244,6 +2324,7 @@ def test_should_respond_200_with_versions_using_url_template( "created_at": mock.ANY, "dag_display_name": "dag_with_multiple_versions", }, + "hitl_detail": None, } def test_should_not_return_duplicate_runs(self, test_client, session): @@ -3175,6 +3256,7 @@ def test_should_respond_200(self, test_client, session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": mock.ANY, + "hitl_detail": None, }, { "dag_id": "example_python_operator", @@ -3203,11 +3285,87 @@ def test_should_respond_200(self, test_client, session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": mock.ANY, + "hitl_detail": None, }, ], "total_entries": 2, } + def test_should_respond_200_with_hitl( + self, test_client, create_task_instance: CreateTaskInstance, session + ): + ti = create_task_instance(dag_id="test_hitl_dag", task_id="sample_task_hitl") + ti.try_number = 1 + session.add(ti) + hitl_detail = HITLDetail( + ti_id=ti.id, + options=["Approve", "Reject"], + subject="This is subject", + body="this is body", + defaults=["Approve"], + multiple=False, + params={"input_1": 1}, + assignees=None, + ) + session.add(hitl_detail) + session.commit() + # Record the TaskInstanceHistory + TaskInstanceHistory.record_ti(ti, session=session) + session.flush() + + response = test_client.get( + f"/dags/{ti.dag_id}/dagRuns/{ti.run_id}/taskInstances/{ti.task_id}/tries", + ) + assert response.status_code == 200 + assert response.json() == { + "task_instances": [ + { + "dag_id": "test_hitl_dag", + "dag_display_name": "test_hitl_dag", + "duration": None, + "end_date": mock.ANY, + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "operator": "EmptyOperator", + "operator_name": "EmptyOperator", + "pid": None, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 1, + "queue": "default", + "queued_when": None, + "scheduled_when": None, + "start_date": None, + "state": None, + "task_id": "sample_task_hitl", + "task_display_name": "sample_task_hitl", + "try_number": 1, + "unixname": getuser(), + "dag_run_id": "test", + "dag_version": mock.ANY, + "hitl_detail": { + "assigned_users": [], + "body": "this is body", + "chosen_options": None, + "created_at": mock.ANY, + "defaults": ["Approve"], + "multiple": False, + "options": ["Approve", "Reject"], + "params": {"input_1": 1}, + "params_input": {}, + "responded_at": None, + "responded_by_user": None, + "response_received": False, + "subject": "This is subject", + }, + }, + ], + "total_entries": 1, + } + def test_should_respond_401(self, unauthenticated_test_client): response = unauthenticated_test_client.get( "/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries" @@ -3264,6 +3422,7 @@ def test_ti_in_retry_state_not_returned(self, test_client, session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": mock.ANY, + "hitl_detail": None, }, ], "total_entries": 1, @@ -3336,6 +3495,7 @@ def test_mapped_task_should_respond_200(self, test_client, session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": mock.ANY, + "hitl_detail": None, }, { "dag_id": "example_python_operator", @@ -3364,6 +3524,7 @@ def test_mapped_task_should_respond_200(self, test_client, session): "unixname": getuser(), "dag_run_id": "TEST_DAG_RUN_ID", "dag_version": mock.ANY, + "hitl_detail": None, }, ], "total_entries": 2, @@ -3435,6 +3596,7 @@ def test_should_respond_200_with_versions( "created_at": mock.ANY, "dag_display_name": "dag_with_multiple_versions", }, + "hitl_detail": None, } @pytest.mark.parametrize( @@ -3490,6 +3652,7 @@ def test_should_respond_200_with_versions_using_url_template( "created_at": mock.ANY, "dag_display_name": "dag_with_multiple_versions", }, + "hitl_detail": None, } diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 74260a05a11ce..fccac898d315b 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -22,7 +22,7 @@ import operator import os import pathlib -from typing import cast +from typing import TYPE_CHECKING, cast from unittest import mock from unittest.mock import patch @@ -44,6 +44,7 @@ from airflow.models.connection import Connection from airflow.models.dag_version import DagVersion from airflow.models.dagrun import DagRun +from airflow.models.hitl_history import HITLDetailHistory from airflow.models.pool import Pool from airflow.models.renderedtifields import RenderedTaskInstanceFields from airflow.models.serialized_dag import SerializedDagModel @@ -59,6 +60,9 @@ from airflow.models.xcom import XComModel from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.empty import EmptyOperator +from airflow.providers.standard.operators.hitl import ( + HITLBranchOperator, +) from airflow.providers.standard.operators.python import PythonOperator from airflow.providers.standard.sensors.python import PythonSensor from airflow.sdk import DAG, BaseOperator, BaseSensorOperator, Metadata, task, task_group @@ -88,6 +92,11 @@ from tests_common.test_utils.mock_operators import MockOperator from unit.models import DEFAULT_DATE +if TYPE_CHECKING: + from sqlalchemy.orm import Session + + from tests_common.pytest_plugin import DagMaker + pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag, pytest.mark.want_activate_assets] @@ -2687,6 +2696,40 @@ def test_task_instance_history_is_created_when_ti_goes_for_retry(self, dag_maker # the new try_id should be different from what's recorded in tih assert str(tih[0].task_instance_id) == try_id + def test_task_instance_history_with_hitl_history_is_created_when_ti_goes_for_retry( + self, dag_maker: DagMaker, session: Session + ): + with dag_maker(serialized=True): + task = HITLBranchOperator( + task_id="hitl_test", + subject="This is subject", + body="This is body", + options=["1", "2", "3", "4", "5"], + params={"input": 1}, + retries=1, + retry_delay=datetime.timedelta(seconds=2), + notifiers=[None], + ) + + dr = dag_maker.create_dagrun() + ti = dr.task_instances[0] + ti.task = task + try_id = ti.id + with pytest.raises(TypeError): + ti.run() + ti = session.query(TaskInstance).one() + # the ti.id should be different from the previous one + assert ti.id != try_id + assert ti.state == State.UP_FOR_RETRY + assert session.query(TaskInstance).count() == 1 + tih = session.query(TaskInstanceHistory).all() + assert len(tih) == 1 + # the new try_id should be different from what's recorded in tih + assert str(tih[0].task_instance_id) == try_id + hitl_histories = session.query(HITLDetailHistory).all() + assert len(hitl_histories) == 1 + assert str(hitl_histories[0].task_instance.id) == try_id + @pytest.mark.parametrize("pool_override", [None, "test_pool2"]) @pytest.mark.parametrize("queue_by_policy", [None, "forced_queue"]) diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 61a85d4e3b192..15d090f7d10d0 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -1477,6 +1477,26 @@ class EventLogCollectionResponse(BaseModel): total_entries: Annotated[int, Field(title="Total Entries")] +class HITLDetailHisotry(BaseModel): + """ + Schema for Human-in-the-loop detail history. + """ + + options: Annotated[list[str], Field(min_length=1, title="Options")] + subject: Annotated[str, Field(title="Subject")] + body: Annotated[str | None, Field(title="Body")] = None + defaults: Annotated[list[str] | None, Field(title="Defaults")] = None + multiple: Annotated[bool | None, Field(title="Multiple")] = False + params: Annotated[dict[str, Any] | None, Field(title="Params")] = None + assigned_users: Annotated[list[HITLUser] | None, Field(title="Assigned Users")] = None + created_at: Annotated[datetime, Field(title="Created At")] + responded_by_user: HITLUser | None = None + responded_at: Annotated[datetime | None, Field(title="Responded At")] = None + chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None + params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None + response_received: Annotated[bool | None, Field(title="Response Received")] = False + + class HITLDetailResponse(BaseModel): """ Response of updating a Human-in-the-loop detail. @@ -1641,6 +1661,7 @@ class TaskInstanceHistoryResponse(BaseModel): executor: Annotated[str | None, Field(title="Executor")] = None executor_config: Annotated[str, Field(title="Executor Config")] dag_version: DagVersionResponse | None = None + hitl_detail: HITLDetailHisotry | None = None class TaskInstanceResponse(BaseModel): @@ -1875,7 +1896,6 @@ class HITLDetail(BaseModel): Schema for Human-in-the-loop detail. """ - task_instance: TaskInstanceResponse options: Annotated[list[str], Field(min_length=1, title="Options")] subject: Annotated[str, Field(title="Subject")] body: Annotated[str | None, Field(title="Body")] = None @@ -1889,6 +1909,7 @@ class HITLDetail(BaseModel): chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None response_received: Annotated[bool | None, Field(title="Response Received")] = False + task_instance: TaskInstanceResponse class HITLDetailCollection(BaseModel):