Add Vertex AI Agent Engine operators (Create, Get, Query, Update, Delete)#68479
Add Vertex AI Agent Engine operators (Create, Get, Query, Update, Delete)#68479AlejandroMorgante wants to merge 23 commits into
Conversation
7efe781 to
36971f7
Compare
SameerMesiah97
left a comment
There was a problem hiding this comment.
This is a large PR so I could only get through part of the diff in one sitting. I have left some comment for you to address.
MaksYermak
left a comment
There was a problem hiding this comment.
@AlejandroMorgante could you please provider a screenshot from Airflow UI that the new example_vertex_ai_agent_engine.py system test pass successfully?
Sure. I validated the same operator lifecycle end-to-end in a real Airflow environment using this DAG: AlejandroMorgante/agentic-airflow-demo@bedf76b It exercises:
All tasks completed successfully:
The DAG uses my real Agent Engine container, so the query payload is adapted to that runtime, but the operator lifecycle is the same as in |
34b8962 to
4747cfc
Compare
|
Quickest fix: git fetch upstream main && git rebase upstream/main
rm uv.lock && uv lock
git add uv.lock && git rebase --continue
git push --force-with-leaseAutomated nudge — ignore if you're not ready to rebase. This comment is updated in place on future |
962d3b2 to
d13b01f
Compare
8473b09 to
3917f29
Compare
SameerMesiah97
left a comment
There was a problem hiding this comment.
I have left some more comments.
e62e21c to
ad80e73
Compare
…ete) Adds hook, operators, deferrable trigger, unit tests, and a system test for the Vertex AI Agent Engine (Reasoning Engine) API. The Query operator uses the REST endpoint directly because the public SDK only exposes run_query_job for async/GCS-backed queries, while the private _query helper is affected by a response-parsing bug in google-genai==2.8.0.
- Fix error message: JSONDecodeError now raises "must be valid JSON"
- Replace hardcoded REST URL with sdk_client._api_client.request(), letting
the SDK handle URL construction and auth (removes google.auth.transport dep)
- Add request_timeout param to query_agent_engine and QueryAgentEngineOperator
- Add poll_interval=30 default to wait_for_agent_engine_deleted
- Fix docstrings: project_id marked Optional and moved to end of param list
- Clarify force=None docstring: distinguishes "not specified" from False
- Remove validate_execute_complete_event helper (inlined in execute_complete)
- Remove redundant xcom_push("agent_engine_name") from CreateAgentEngineOperator
- Trigger: use "timeout" status (not "error") and >= for boundary check
- execute_complete: explicit branches for success / timeout / error
Introduces CheckQueryAgentEngineOperator (with deferrable support via AgentEngineQueryJobTrigger) to poll the status of a batch query job started by QueryAgentEngineOperator. Switches QueryAgentEngineOperator from a custom REST call to the SDK's public run_query_job() method, and removes the now-unused internal polling helpers.
… Engine - Move _serialize_value from operators and triggers into the hook module so both import from a single source of truth - Warn on unknown query job status values in wait_for_query_agent_engine_job to surface unexpected API responses instead of silently looping - Add round-trip serialize test for AgentEngineQueryJobTrigger with a Pydantic model config to verify _serialize_value is applied on serialize()
ad80e73 to
70ede63
Compare

Add a hook, operators, deferrable triggers, unit tests, docs, and a system test for the Vertex AI Agent Engine (Reasoning Engine) API.
Operators
CreateAgentEngineOperator- creates a new Agent Engine resource.GetAgentEngineOperator- retrieves an existing Agent Engine.QueryAgentEngineOperator- starts a query job using the publicrun_query_jobSDK method and returns the submitted job metadata.CheckQueryAgentEngineOperator- waits for a query job using the publiccheck_query_jobSDK method, optionally retrieves the output from Cloud Storage, and supports deferrable mode.UpdateAgentEngineOperator- updates an Agent Engine.DeleteAgentEngineOperator- deletes an Agent Engine, with optional wait / deferrable mode.Review-driven changes
This PR was updated based on review feedback to avoid relying on private SDK helpers for query execution and to keep the Airflow task semantics explicit:
run_query_jobAPI.CheckQueryAgentEngineOperatorso submitting a query job and waiting for its output are represented as separate Airflow tasks.AgentEngineQueryJobTriggerfor deferrable query-job checks.QueryAgentEngineOperator >> CheckQueryAgentEngineOperatorflow.Changes
providers/google/cloud/hooks/vertex_ai/agent_engine.py- new Agent Engine hook methods.providers/google/cloud/operators/vertex_ai/agent_engine.py- new Agent Engine operators.providers/google/cloud/triggers/vertex_ai.py- Agent Engine delete and query-job triggers.providers/google/provider.yaml- hook, operator, and system test registration.providers/google/get_provider_info.py- matching provider metadata registration.providers/google/docs/operators/cloud/vertex_ai.rst- Agent Engine operator docs.example_vertex_ai_agent_engine.py.Validation
Validated end-to-end against a real GCP environment, including create, get, query-job submission, query-job check/output retrieval, update, and delete flows. The latest query flow was also validated in a production environment successfully.
The public demo repository used for validation is available at https://github.com/AlejandroMorgante/agentic-airflow-demo.
Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Sonnet 4.6) and Codex (GPT-5) following the guidelines