Add message_history to AgentOperator for multi-turn agent sessions#68648
Merged
Conversation
b2482e3 to
ec83203
Compare
AgentOperator and @task.agent ran a fresh single-turn conversation every time. Add an opt-in message_history parameter that seeds the run with prior turns and pushes the post-run transcript to XCom (key 'message_history') so the next run can resume. Default None keeps single-turn behavior unchanged. Storing the transcript under a session key stays the DAG's responsibility.
ec83203 to
8ab8b52
Compare
75 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
AgentOperator(and the@task.agentdecorator) ran a fresh, single-turnconversation on every run. There was no supported way to seed a run with prior
turns or to persist the resulting transcript for the next run, so conversational
or iterative agents that resume a conversation across DAG runs could not be
expressed with the operator.
This adds an opt-in
message_historyparameter. When set, the operator seeds therun with the prior turns and pushes the full post-run transcript to XCom (key
message_history) so the next run can resume.message_history=None(thedefault) keeps the existing single-turn behavior unchanged, so there is no impact
on current users.
How it works
message_historyaccepts a list of pydantic-aiModelMessageobjects or theirJSON form (
str/bytes), and is a templated field. It is deserialized viaModelMessagesTypeAdapterand passed torun_sync(message_history=...)on boththe durable and non-durable branches.
result.all_messages()) is serialized andpushed to XCom under the key
message_history.[]/""starts a fresh session, so a templated{{ ti.xcom_pull(task_ids='ask', key='message_history', default='[]') }}workson the first run (no XCom yet) instead of failing to parse the string
"None".Usage
A multi-turn session brackets the agent with a load and a save task. Where the
transcript is stored (keyed by a session id, e.g. in object storage) is left to
the DAG:
https://github.com/astronomer/airflow/blob/3378a9c6c3c6e6fe4002a6ad8ccc5b3a08c2bd52/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_agent.py#L224-L268
Design notes
(history in, transcript out); it does not own where a session is stored.
Session keying is deployment-specific and belongs in the DAG, and keeping the
surface this thin avoids committing the provider to a message-persistence
protocol while the upstream ones are still settling.
message_historycannot be combined withenable_hitl_review(the operatorraises at construction, mirroring the existing
durable+ HITL guard). Thepost-review transcript is not recoverable today (
run_hitl_reviewreturns onlythe final string), so emitting the pre-review transcript would silently drop the
human-approved turns. This can be lifted once HITL surfaces the final history.
Gotchas
configure an object-storage XCom backend or trim older turns before feeding the
history back, rather than passing the whole history unbounded.