Add durable execution for AgentOperator & @task.agent #64199

Merged
kaxil merged 6 commits into apache:main from astronomer:worktree-iridescent-mapping-cupcake
Mar 25, 2026

@kaxil kaxil commented Mar 25, 2026

Member

(Part of AIP-99 Common Data Access Pattern + AI)

Adds durable=True parameter to AgentOperator and @task.agent that caches each LLM response and tool result to ObjectStorage as the agent runs. On retry, cached steps replay instantly -- no repeated LLM calls, no repeated tool execution, no repeated cost.

How it works

  1. First run: each model response and tool result is saved to a JSON file as the agent progresses.
  2. If the task fails and Airflow retries it, completed steps are loaded from the cache. Only steps not yet cached run against the live model and tools.
  3. After successful completion, the cache file is deleted.
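The three steps above can be sketched in plain Python. This is a minimal illustration, not the provider's implementation: `run_with_step_cache`, the `fail_at` parameter, and the local temp file are hypothetical stand-ins for the agent loop, a transient failure, and the ObjectStorage cache file.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path


def run_with_step_cache(steps, cache_file: Path, fail_at: int | None = None):
    """Run steps in order, replaying cached results and executing the rest.

    `fail_at` (hypothetical) simulates a transient failure at that step index.
    """
    cache = json.loads(cache_file.read_text()) if cache_file.exists() else {}
    results, replayed, executed = [], 0, 0
    for index, step in enumerate(steps):
        key = str(index)  # JSON object keys are strings
        if key in cache:
            # Completed on an earlier attempt: replay without calling the step.
            results.append(cache[key])
            replayed += 1
            continue
        if fail_at == index:
            raise RuntimeError(f"simulated transient failure at step {index}")
        value = step()
        cache[key] = value
        # Persist after every step so a later failure keeps earlier work.
        cache_file.write_text(json.dumps(cache))
        results.append(value)
        executed += 1
    # Successful completion: the cache is no longer needed.
    cache_file.unlink(missing_ok=True)
    return results, replayed, executed


calls = []  # records which steps actually executed


def make_step(name):
    def step():
        calls.append(name)
        return f"{name}-result"

    return step


steps = [make_step("list_tables"), make_step("get_schema"), make_step("query")]
cache_file = Path(tempfile.mkdtemp()) / "durable_demo.json"

# Attempt 1: two steps are cached, then the third fails.
try:
    run_with_step_cache(steps, cache_file, fail_at=2)
except RuntimeError:
    pass

# Attempt 2: the first two steps replay, only the third executes.
results, replayed, executed = run_with_step_cache(steps, cache_file)
print(f"replayed {replayed} cached steps, executed {executed} new steps")
```

Each step function runs exactly once across both attempts, which is the property that avoids repeated LLM calls and tool executions on retry.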

Architecture:

  • CachingModel wraps the pydantic-ai model to intercept request() calls
  • CachingToolset wraps toolsets to intercept call_tool() calls
  • DurableStepCounter provides a shared counter for deterministic step ordering (handles concurrent tool calls)
  • DurableStorage persists all steps in a single JSON file on ObjectStorage
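A rough sketch of how a shared counter and caching wrappers could fit together is shown below. The class and function names (`StepCounter`, `CachingWrapper`, `fake_model`, `fake_tool`) are hypothetical and do not use the pydantic-ai base classes; the in-memory dict stands in for DurableStorage. The sketch assumes concurrent calls are started in the same order on every attempt.

```python
import asyncio


class StepCounter:
    """Hands out step indices in call order (stand-in for DurableStepCounter)."""

    def __init__(self):
        self._next = 0

    def next_index(self) -> int:
        index = self._next
        self._next += 1
        return index


class CachingWrapper:
    """Wraps an async callable and replays from the cache when the step is known."""

    def __init__(self, func, counter, cache):
        self._func = func
        self._counter = counter
        self._cache = cache

    async def __call__(self, *args):
        # Claim the index before awaiting, so concurrent calls are numbered
        # in the order they were started, not the order they finish.
        index = self._counter.next_index()
        if index in self._cache:
            return self._cache[index]
        result = await self._func(*args)
        self._cache[index] = result
        return result


live_calls = []  # records calls that reached the "real" model or tool


async def fake_model(prompt):
    live_calls.append(("model", prompt))
    return f"response to {prompt}"


async def fake_tool(name, delay):
    await asyncio.sleep(delay)
    live_calls.append(("tool", name))
    return f"{name} ok"


async def agent_run(cache):
    counter = StepCounter()  # one counter shared by the model and the tools
    model = CachingWrapper(fake_model, counter, cache)
    tool = CachingWrapper(fake_tool, counter, cache)
    first = await model("plan")
    # Two tool calls issued concurrently; the slower one is started first.
    tools = await asyncio.gather(tool("list_tables", 0.02), tool("get_schema", 0.0))
    return first, tools


cache = {}
first_run = asyncio.run(agent_run(cache))
calls_after_first = len(live_calls)
second_run = asyncio.run(agent_run(cache))  # every step replays from the cache
```

Because the index is claimed at call start, the slower `list_tables` call keeps step 1 even though `get_schema` finishes first, so a replay maps each call to the right cached value.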

Why ObjectStorage instead of XCom?

Airflow 3.x clears XCom for a task on retry. ObjectStorage (file://, s3://, gs://, etc.) survives retry clearing. The cache file is deleted on successful completion.

Demo

Attempt 1 -- agent runs normally, caching each step. A transient failure occurs after 3 tool calls:

[Screenshot: Attempt 1: agent runs with tool calls, then fails]

Attempt 2 -- cached steps replay instantly (no LLM calls, no tool execution), then the agent continues from where it left off:

[Screenshot: Attempt 2: cached steps replayed, agent continues]

Summary line at INFO level shows how many steps were replayed vs executed fresh:

[Screenshot: Durable summary: replayed N cached steps]

Per-step detail is available at DEBUG level:

[Screenshot: DEBUG level: individual cached/replayed step logs]

Try it yourself

1. Create a demo database and connection

# Inside breeze shell
python -c "
import sqlite3
conn = sqlite3.connect('/tmp/demo_analytics.db')
conn.executescript('''
CREATE TABLE IF NOT EXISTS customers (
    id INTEGER PRIMARY KEY, name TEXT, email TEXT, plan TEXT, signed_up DATE
);
CREATE TABLE IF NOT EXISTS orders (
    id INTEGER PRIMARY KEY, customer_id INTEGER, product TEXT, amount REAL, status TEXT, created_at DATE
);
CREATE TABLE IF NOT EXISTS support_tickets (
    id INTEGER PRIMARY KEY, customer_id INTEGER, subject TEXT, priority TEXT, resolved BOOLEAN, created_at DATE
);
DELETE FROM customers; DELETE FROM orders; DELETE FROM support_tickets;
INSERT INTO customers VALUES
    (1,\"Acme Corp\",\"admin@acme.com\",\"enterprise\",\"2024-01-15\"),
    (2,\"Globex Inc\",\"ops@globex.com\",\"pro\",\"2024-03-22\"),
    (3,\"Initech\",\"peter@initech.com\",\"starter\",\"2024-06-01\"),
    (4,\"Umbrella Ltd\",\"info@umbrella.com\",\"enterprise\",\"2024-02-10\");
INSERT INTO orders VALUES
    (1,1,\"Platform License\",12000.00,\"completed\",\"2024-02-01\"),
    (2,1,\"Support Add-on\",3000.00,\"completed\",\"2024-03-15\"),
    (3,2,\"Platform License\",5000.00,\"completed\",\"2024-04-01\"),
    (4,3,\"Starter Plan\",500.00,\"completed\",\"2024-06-15\"),
    (5,4,\"Platform License\",12000.00,\"completed\",\"2024-03-01\");
INSERT INTO support_tickets VALUES
    (1,3,\"Cannot login\",\"high\",1,\"2024-06-20\"),
    (2,1,\"Billing question\",\"low\",1,\"2024-04-10\"),
    (3,4,\"API rate limiting\",\"medium\",0,\"2024-07-15\");
''')
conn.commit()
conn.close()
print('Done')
"

# Create the connection
airflow connections add sqlite_default --conn-type sqlite --conn-host /tmp/demo_analytics.db

2. Configure durable cache path

Add to airflow.cfg:

[common.ai]
durable_cache_path = file:///tmp/airflow_durable_cache

3. Create the demo DAG

Save to your dags folder (uses a FlakyToolset that fails once on query when a flag file exists):

from __future__ import annotations

import os
from datetime import timedelta

from airflow.providers.common.ai.operators.agent import AgentOperator
from airflow.providers.common.ai.toolsets.sql import SQLToolset
from airflow.providers.common.compat.sdk import dag, task

FAIL_FLAG = "/tmp/durable_demo_fail"


class FlakyToolset(SQLToolset):
    """SQLToolset that fails once on the 'query' tool when a flag file exists."""

    async def call_tool(self, name, tool_args, ctx, tool):
        result = await super().call_tool(name, tool_args, ctx, tool)
        if name == "query" and os.path.exists(FAIL_FLAG):
            os.remove(FAIL_FLAG)
            raise RuntimeError("Simulated transient failure (retry will succeed)")
        return result


@dag(
    schedule=None,
    default_args={"retries": 3, "retry_delay": timedelta(seconds=5)},
    tags=["demo", "durable"],
)
def demo_durable_execution():
    AgentOperator(
        task_id="durable_sql_agent",
        prompt="Which customers have the highest total order value? Show the top 3.",
        llm_conn_id="my_llm",        # your pydanticai connection
        durable=True,
        toolsets=[FlakyToolset(db_conn_id="sqlite_default", max_rows=20)],
    )


demo_durable_execution()

4. Run the demo

# Create the flag file so the first attempt fails
touch /tmp/durable_demo_fail

# Trigger the DAG
airflow dags trigger demo_durable_execution

Attempt 1 will run list_tables, get_schema, etc. (all cached), then fail on query. The flag file is removed on failure. Attempt 2 retries automatically -- cached steps replay instantly, then query succeeds.

Usage

AgentOperator(
    task_id="analyst",
    prompt="Analyze the data",
    llm_conn_id="my_llm",
    durable=True,
    retries=3,
    toolsets=[SQLToolset(db_conn_id="postgres_default")],
)

Requires [common.ai] durable_cache_path in airflow.cfg:

[common.ai]
durable_cache_path = file:///tmp/airflow_durable_cache
# or: s3://my-bucket/airflow/durable-cache

Side effects

Durable execution caches return values, not side effects. Read-only tools (SQLToolset, HookToolset) replay safely. Tools with side effects (file writes, API calls) won't re-execute on replay -- the cached return value is used instead. See the docs for details on idempotency considerations.
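The distinction between return values and side effects can be illustrated with a small sketch. Everything here is hypothetical (`durable_call`, `send_notification`, and the in-memory `outbox` standing in for an external system); it only shows that a replayed step returns the cached value without re-running the tool body.

```python
outbox = []  # stands in for an external system the tool writes to
cache = {}   # stands in for the durable step cache


def send_notification(recipient):
    """A tool with a side effect: it appends to the outbox."""
    outbox.append(recipient)
    return {"status": "sent", "to": recipient}


def durable_call(step, func, *args):
    """Return the cached value for `step` if present, else run and cache it."""
    if step in cache:
        return cache[step]
    result = func(*args)
    cache[step] = result
    return result


# First attempt: the tool runs and the side effect happens once.
first = durable_call(0, send_notification, "ops@example.com")

# Simulate the external state being lost between attempts.
outbox.clear()

# Retry: the cached return value is replayed, the side effect is NOT redone.
replayed = durable_call(0, send_notification, "ops@example.com")
```

After the retry the agent still sees `{"status": "sent", ...}` while the outbox is empty, which is why tools with side effects need idempotency or external verification when used with durable=True.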

Dependencies

Bumps pydantic-ai-slim to >=1.34.0 for WrapperModel/WrapperToolset base classes.

kaxil added 3 commits March 25, 2026 12:58
When `durable=True`, model responses and tool results are cached
step-by-step via ObjectStorage. On retry, cached steps replay
instead of re-executing LLM calls and tool operations.

Architecture:
- CachingModel(WrapperModel) intercepts model.request() calls
- CachingToolset(WrapperToolset) intercepts tool.call_tool() calls
- DurableStepCounter provides shared monotonic step indexing
- DurableStorage persists all steps in a single JSON file on
  ObjectStorage (configured via [common.ai] durable_cache_path)
- Cache file is deleted on successful task completion

Bumps pydantic-ai-slim to >=1.34.0 for WrapperModel/WrapperToolset.

@kaxil kaxil force-pushed the worktree-iridescent-mapping-cupcake branch from bd9a25e to 9f22f34 Compare March 25, 2026 13:06
@kaxil kaxil changed the title from "Add durable execution for AgentOperator" to "Add durable execution for AgentOperator & @task.agent" Mar 25, 2026

- Rewrite docs section: clarify config errors, production storage,
  retries requirement, side effects/idempotency, cache cleanup
- Move per-step durable logs to DEBUG, add single INFO summary line
  showing replayed vs new steps after agent run completes
- Remove noisy storage-level log lines (save/hit/cleanup)
- Swap toolset wrapping order so CachingToolset sits inside
  LoggingToolset (cache logs appear within tool call log groups)
- Track replay/cache stats on DurableStepCounter
@kaxil kaxil marked this pull request as ready for review March 25, 2026 14:14
@kaxil kaxil requested a review from gopidesupavan as a code owner March 25, 2026 14:14
- Catch json.JSONDecodeError in _load_cache so a truncated cache file
  (from a crash mid-write) is treated as empty instead of crashing
  all subsequent retries
- Wrap save_tool_result json.dumps with clear TypeError message when
  a custom tool returns non-JSON-serializable values
- Fix docs: per-step logs are DEBUG, only the summary is INFO
… handling

- Raise ValueError if both durable=True and enable_hitl_review=True
  are set (HITL regeneration bypasses durable model wrapping)
- Skip caching with a warning for non-serializable tool results
  (e.g. BinaryContent from MCP tools) instead of failing the task
- Document the BinaryContent limitation in agent.rst

kaxil commented Mar 25, 2026

Member Author

Addressed both review comments in 35d5833:

HITL + durable — Added ValueError in __init__ if both durable=True and enable_hitl_review=True. HITL regeneration builds a fresh agent without the CachingModel wrapper, so model calls during review would bypass caching. Making them mutually exclusive for now.

BinaryContent from MCP tools — Changed save_tool_result to catch TypeError and log a warning instead of failing the task. The tool call succeeds, but that step won't be cached (it'll re-execute on retry). Added a note in the docs about the JSON-serializable requirement.
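The two defensive behaviors described in this PR (skipping the cache for non-serializable tool results, and treating a truncated cache file as empty, as noted in the earlier commit message) might look roughly like the sketch below. The function signatures, the logger name, and the local file path are assumptions for illustration; the real `_load_cache` and `save_tool_result` work against ObjectStorage and may differ.

```python
import json
import logging
import tempfile
from pathlib import Path

log = logging.getLogger("durable_sketch")  # hypothetical logger name


def load_cache(path: Path) -> dict:
    """Load cached steps; a missing or truncated file counts as an empty cache."""
    if not path.exists():
        return {}
    try:
        return json.loads(path.read_text())
    except json.JSONDecodeError:
        # A crash mid-write can leave invalid JSON; start over instead of
        # failing every subsequent retry.
        return {}


def save_tool_result(path: Path, cache: dict, step: int, result) -> bool:
    """Cache a tool result; return False (and warn) if it is not JSON-serializable."""
    try:
        json.dumps(result)
    except TypeError:
        log.warning("Step %s result is not JSON-serializable; not cached", step)
        return False
    cache[str(step)] = result
    path.write_text(json.dumps(cache))
    return True


workdir = Path(tempfile.mkdtemp())
path = workdir / "steps.json"
cache = load_cache(path)

saved_rows = save_tool_result(path, cache, 0, {"rows": [1, 2]})
# bytes stand in for binary tool output that json.dumps cannot encode.
saved_binary = save_tool_result(path, cache, 1, b"\x89PNG")

# A file cut off mid-write is treated as an empty cache.
truncated = workdir / "truncated.json"
truncated.write_text('{"0": {"rows": [1, ')
recovered = load_cache(truncated)
```

In this sketch the skipped step is simply absent from the cache, so it would re-execute on retry, matching the behavior described above.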

@kaxil kaxil merged commit 10715fb into apache:main Mar 25, 2026
139 of 144 checks passed
@kaxil kaxil deleted the worktree-iridescent-mapping-cupcake branch March 25, 2026 19:12
nailo2c pushed a commit to nailo2c/airflow that referenced this pull request Mar 30, 2026
…#64199)
Suraj-kumar00 pushed a commit to Suraj-kumar00/airflow that referenced this pull request Apr 7, 2026
…#64199)
abhijeets25012-tech pushed a commit to abhijeets25012-tech/airflow that referenced this pull request Apr 9, 2026
…#64199)