Guard transport.send() in proc() so IAM/gRPC errors don't crash supervised components #68250
henry3260 left a comment
Could you clarify how the observed IAM/gRPC error propagates from _transport.send()? StackdriverRemoteLogIO uses BackgroundThreadTransport by default, whose send() only enqueues the entry. The network request happens in the background worker, where exceptions from batch.commit() are already caught.
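To make the reviewer's point concrete, here is a minimal sketch of an enqueue-only transport. It is not the google-cloud-logging implementation: `EnqueueOnlyTransport`, `failing_commit`, and the `errors` list are hypothetical names used only to model the behavior described above, where `send()` does no network I/O and the worker thread catches commit failures.

```python
import queue
import threading


class EnqueueOnlyTransport:
    """Hypothetical, simplified model of a background-thread transport."""

    def __init__(self, commit):
        self._commit = commit        # callable that would perform the network request
        self._queue = queue.Queue()
        self.errors = []             # failures recorded by the worker thread
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def send(self, entry):
        # Runs in the caller's thread: only enqueues, so a commit failure
        # cannot be raised from here.
        self._queue.put(entry)

    def _run(self):
        while True:
            entry = self._queue.get()
            try:
                self._commit(entry)
            except Exception as exc:
                # The failure stays inside the worker thread.
                self.errors.append(exc)
            finally:
                self._queue.task_done()

    def flush(self):
        # Block until every queued entry has been processed.
        self._queue.join()


def failing_commit(entry):
    # Stand-in for a request rejected because of a missing IAM binding.
    raise PermissionError("logging.logEntries.create denied")


transport = EnqueueOnlyTransport(failing_commit)
transport.send({"message": "hello"})  # returns normally despite the failing commit
transport.flush()
```

Under this model the caller of `send()` never sees the `PermissionError`; it is only visible in `transport.errors` after the worker has processed the entry.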
@goingforstudying-ctrl please address Henry's concern, thanks!
In Airflow 3 the supervisor process runs REMOTE_TASK_LOG handlers, but record.task_instance is never set in supervisor context (it is a task-subprocess concept). When task_instance is missing, the proc() closure shipped log entries with empty labels, making Cloud Logging entries unsearchable by dag_id / task_id.
Parse dag_id, task_id, and try_number from the structured AF3 log path (dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log) instead. This requires zero DB access and works regardless of whether the handler runs in a task subprocess or the supervisor.
relates to apache#68240
- Rename _labels_from_path to _extract_labels_from_path
- Use path labels as fallback instead of overriding event-derived labels
- Actually store run_id instead of no-op pass branch
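The path parsing and fallback behavior described in these commit messages could be sketched roughly as follows. This is not the merged code: `extract_labels_from_path` and `merge_labels` are hypothetical helpers, and the regex assumes exactly the `dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log` layout quoted above, with no additional path segments.

```python
import re

# Assumed layout: dag_id=<x>/run_id=<x>/task_id=<x>/attempt=<N>.log
_PATH_RE = re.compile(
    r"dag_id=(?P<dag_id>[^/]+)/run_id=(?P<run_id>[^/]+)/"
    r"task_id=(?P<task_id>[^/]+)/attempt=(?P<try_number>\d+)\.log$"
)


def extract_labels_from_path(path: str) -> dict[str, str]:
    """Return labels parsed from the log path, or an empty dict if it does not match."""
    match = _PATH_RE.search(path)
    return match.groupdict() if match else {}


def merge_labels(event_labels: dict[str, str], path: str) -> dict[str, str]:
    """Use path labels only as a fallback: event-derived labels win on conflict."""
    return {**extract_labels_from_path(path), **event_labels}


path = "dag_id=etl/run_id=manual__2024-01-01/task_id=load/attempt=2.log"
labels = extract_labels_from_path(path)
merged = merge_labels({"dag_id": "from_event"}, path)
```

Here `labels` carries all four values as strings, while `merged` keeps the event-derived `dag_id` and takes the remaining keys from the path.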
Closing this one: the label extraction fix was already merged via #68292 by ashb. Henry's point about BackgroundThreadTransport already catching network errors is also correct: send() only enqueues, so the described IAM crash scenario doesn't apply. Thanks for the reviews.
Fixes Bug 3 from #68240.
What
StackdriverRemoteLogIO.processors → proc() calls transport.send() without any error handling. In Airflow 3's supervisor model, REMOTE_TASK_LOG applies to ALL supervised components (scheduler, dag-processor, triggerer, workers). A single IAM misconfiguration or gRPC error would crash the entire process.
Observed: dag-processor pod enters CrashLoopBackOff on every log emit when the Kubernetes Service Account lacks the logging.logEntries.create IAM binding.
Fix
Wrap _transport.send() in try/except Exception and emit a logging.warning instead of propagating. Log delivery is best-effort; a Cloud Logging error should never kill a task-executing process.
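As an illustration of the proposed guard, the sketch below wraps a `send()` call in `try/except Exception` and logs a warning instead of propagating. It is a simplified model under stated assumptions: `make_proc`, `FailingTransport`, and the single-argument `send(event)` signature are hypothetical and do not reproduce the actual StackdriverRemoteLogIO code or the real transport API.

```python
import logging

log = logging.getLogger(__name__)


def make_proc(transport):
    """Build a processor-style closure that never lets send() failures escape."""

    def proc(logger, method_name, event):
        try:
            transport.send(event)
        except Exception:
            # Best-effort delivery: report the failure and keep the process alive.
            log.warning("Failed to send log entry to Cloud Logging", exc_info=True)
        return event

    return proc


class FailingTransport:
    """Hypothetical transport whose send() always raises."""

    def send(self, event):
        raise RuntimeError("simulated gRPC failure")


proc = make_proc(FailingTransport())
result = proc(None, "info", {"event": "hello"})  # does not raise
```

The closure returns the event unchanged so later processors in a chain would still receive it even when delivery fails.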
Changes
- Wrap _transport.send() call in proc() with try/except
- test_processors_survives_transport_send_failure verifies:
relates to #68240