From b47cdab778fa361da1f0b9be9b26d6f70bdc73bd Mon Sep 17 00:00:00 2001 From: James Ross Date: Wed, 20 May 2026 05:37:42 -0700 Subject: [PATCH 1/9] feat(core): stage ticketed runtime ingress --- CHANGELOG.md | 12 ++ crates/warp-core/src/coordinator.rs | 269 ++++++++++++++++++++++++++++ crates/warp-core/src/head_inbox.rs | 10 ++ crates/warp-core/src/lib.rs | 4 +- crates/warp-core/tests/inbox.rs | 254 +++++++++++++++++++++++++- docs/BEARING.md | 15 +- 6 files changed, 551 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fc85e47..4b02d6ef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,18 @@ ### Added +- `warp-core` now exposes a ticketed runtime ingress boundary. + `WorldlineRuntime::submit_intent(...)` records witnessed submission history + without entering a head inbox, ticking, dispatching handlers, or mutating + application state. `WorldlineRuntime::ingest_ticketed_invocation(...)` stages + a witnessed submission into runtime ingress only when the caller holds the + explicit `TicketedRuntimeIngressAuthority` runtime-owner token and supplies an + `OpticAdmissionTicket`, records deterministic ticketed-ingress correlation + material, rejects unknown or mismatched submissions, and treats duplicate + staging of the same ticket/submission pair idempotently. This does not + correlate tick receipts, expose intent outcome observation, dispatch installed + handlers, execute contracts outside scheduler-owned ticks, or introduce + automatic retry. - `warp-core` optic invocation admission now issues an `OpticAdmissionTicket` after BasisResolution, ApertureResolution, BudgetResolution, RuntimeSupport, capability identity coverage, InvocationAdmission, SchedulerAdmission, diff --git a/crates/warp-core/src/coordinator.rs b/crates/warp-core/src/coordinator.rs index 47409b5e..0da05b9d 100644 --- a/crates/warp-core/src/coordinator.rs +++ b/crates/warp-core/src/coordinator.rs @@ -17,6 +17,7 @@ use crate::head::{ }; use crate::head_inbox::{InboxAddress, InboxIngestResult, IngressEnvelope, IngressTarget}; use crate::ident::Hash; +use crate::optic_artifact::OpticAdmissionTicket; use crate::provenance_store::{ HistoryError, ProvenanceCheckpoint, ProvenanceEntry, ProvenanceService, ProvenanceStore, ReplayError, @@ -92,6 +93,15 @@ pub enum RuntimeError { /// the runtime counter can represent. #[error("intent submission generation overflow")] IntentSubmissionGenerationOverflow, + /// Ticketed runtime ingress referenced a submission Echo has not witnessed. + #[error("unknown intent submission: {0:?}")] + UnknownIntentSubmission(Hash), + /// Ticketed runtime ingress envelope did not match the witnessed submission. + #[error("ticketed runtime ingress does not match witnessed submission: {0:?}")] + TicketedIngressSubmissionMismatch(Hash), + /// A different admission ticket already staged this witnessed submission. + #[error("witnessed submission already has ticketed runtime ingress: {0:?}")] + TicketedIngressAlreadyStaged(Hash), } /// Echo-owned intake/correlation generation for witnessed intent submissions. @@ -145,6 +155,58 @@ pub struct IntentSubmissionRecord { pub submission_generation: IngressSubmissionGeneration, } +/// Explicit authority token for staging ticketed runtime ingress. +/// +/// Application-facing code should not hold this token. An admission ticket is +/// evidence, but ticketed runtime ingress is an Echo runtime-owner action: +/// handing this token to application/plugin/browser code would let that code +/// choose which witnessed submissions enter scheduler-visible runtime ingress. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct TicketedRuntimeIngressAuthority { + _private: (), +} + +impl TicketedRuntimeIngressAuthority { + /// Assumes trusted runtime-owner authority for staging ticketed ingress. + /// + /// The caller must prove it is executing inside Echo's trusted runtime + /// owner, test harness, or equivalent host-controlled boundary. + #[cfg(feature = "host_test")] + #[doc(hidden)] + #[must_use] + pub fn assume_runtime_owner() -> Self { + Self { _private: () } + } +} + +/// Result of accepting an intent into witnessed ingress history. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum IntentSubmissionDisposition { + /// Echo recorded a new witnessed submission without entering runtime ingress. + Accepted { + /// Content-addressed ingress id. + ingress_id: Hash, + /// Resolved semantic writer-head target. + head_key: WriterHeadKey, + /// Witnessed submission event id. + submission_id: Hash, + /// Echo-owned intake/correlation generation. + submission_generation: IngressSubmissionGeneration, + }, + /// Echo had already witnessed this semantic submission. + Duplicate { + /// Content-addressed ingress id. + ingress_id: Hash, + /// Resolved semantic writer-head target. + head_key: WriterHeadKey, + /// Existing witnessed submission event id. + submission_id: Hash, + /// Existing submission generation, or zero when only the duplicate + /// identity can be derived from replayed committed state. + submission_generation: IngressSubmissionGeneration, + }, +} + /// Result of ingesting an envelope into the runtime. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum IngressDisposition { @@ -173,6 +235,41 @@ pub enum IngressDisposition { }, } +/// Runtime ingress staged from a witnessed submission and admission ticket. +/// +/// This record is correlation material only. It is not a tick receipt, not +/// handler dispatch, and not execution. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct TicketedRuntimeIngressRecord { + /// Content-addressed ticketed-ingress event id. + pub ticketed_ingress_id: Hash, + /// Witnessed Echo submission being staged. + pub submission_id: Hash, + /// Admission ticket digest that authorizes runtime ingress. + pub ticket_digest: Hash, + /// Content-addressed canonical ingress id. + pub ingress_id: Hash, + /// Resolved semantic writer-head target. + pub head_key: WriterHeadKey, +} + +/// Result of staging a ticketed invocation into runtime ingress. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum TicketedRuntimeIngressDisposition { + /// The ticketed invocation entered the runtime inbox. + Staged { + /// Ticketed ingress correlation record. + record: TicketedRuntimeIngressRecord, + /// Underlying inbox disposition. + ingress: IngressDisposition, + }, + /// The same ticketed invocation had already been staged. + Duplicate { + /// Existing ticketed ingress correlation record. + record: TicketedRuntimeIngressRecord, + }, +} + /// Request to fork a strand from one precise source-lane coordinate. #[derive(Clone, Debug)] pub struct ForkStrandRequest { @@ -230,6 +327,10 @@ pub struct WorldlineRuntime { /// Deterministic lookup from resolved semantic target and ingress id to /// witnessed submission id. submission_by_target: BTreeMap<(WriterHeadKey, Hash), Hash>, + /// Ticketed runtime ingress records keyed by deterministic event id. + ticketed_runtime_ingress: BTreeMap, + /// Deterministic lookup from witnessed submission id to ticketed ingress. + ticketed_runtime_ingress_by_submission: BTreeMap, /// Registry of live speculative strands attached to the runtime. strands: StrandRegistry, } @@ -318,6 +419,28 @@ impl WorldlineRuntime { self.witnessed_submissions.len() } + /// Returns a ticketed runtime ingress record by deterministic event id. + #[must_use] + pub fn ticketed_runtime_ingress( + &self, + ticketed_ingress_id: &Hash, + ) -> Option<&TicketedRuntimeIngressRecord> { + self.ticketed_runtime_ingress.get(ticketed_ingress_id) + } + + /// Iterates ticketed runtime ingress records in deterministic id order. + pub fn ticketed_runtime_ingress_records( + &self, + ) -> impl Iterator { + self.ticketed_runtime_ingress.values() + } + + /// Returns the number of staged ticketed runtime ingress records. + #[must_use] + pub fn ticketed_runtime_ingress_count(&self) -> usize { + self.ticketed_runtime_ingress.len() + } + /// Returns the current correlation tick. #[must_use] pub fn global_tick(&self) -> GlobalTick { @@ -584,6 +707,136 @@ impl WorldlineRuntime { Ok(()) } + /// Records an accepted intent submission without entering runtime ingress. + /// + /// This is witnessed Echo ingress history only. It does not store the + /// envelope in a head inbox, does not advance ticks, and does not dispatch + /// handlers. A later ticketed runtime ingress step must stage the envelope + /// before scheduler-owned execution can consider it. + /// + /// # Errors + /// + /// Returns an error if the routing target does not resolve or if the target + /// head would reject the envelope under its inbox policy. + pub fn submit_intent( + &mut self, + envelope: IngressEnvelope, + ) -> Result { + let ingress_id = envelope.ingress_id(); + let head_key = self.resolve_target(envelope.target())?; + + if self + .worldlines + .get(&head_key.worldline_id) + .is_some_and(|frontier| { + frontier + .state() + .contains_committed_ingress(&head_key, &ingress_id) + }) + { + let record = self.duplicate_submission_record(head_key, ingress_id); + return Ok(IntentSubmissionDisposition::Duplicate { + ingress_id, + head_key, + submission_id: record.submission_id, + submission_generation: record.submission_generation, + }); + } + + let head = self + .heads + .get(&head_key) + .ok_or(RuntimeError::UnknownHead(head_key))?; + if !head.inbox().would_accept(&envelope) { + return Err(RuntimeError::RejectedByPolicy(head_key)); + } + + if let Some(record) = self + .submission_by_target + .get(&(head_key, ingress_id)) + .and_then(|submission_id| self.witnessed_submissions.get(submission_id)) + { + return Ok(IntentSubmissionDisposition::Duplicate { + ingress_id, + head_key, + submission_id: record.submission_id, + submission_generation: record.submission_generation, + }); + } + + let record = self.record_witnessed_submission(head_key, ingress_id)?; + Ok(IntentSubmissionDisposition::Accepted { + ingress_id, + head_key, + submission_id: record.submission_id, + submission_generation: record.submission_generation, + }) + } + + /// Stages a witnessed submission into runtime ingress using an admission ticket. + /// + /// The ticket opens the runtime ingress boundary only. This method does not + /// tick, dispatch handlers, execute contracts, correlate receipts, or + /// observe outcomes. + /// + /// # Errors + /// + /// Returns an error when the submission is unknown, the envelope does not + /// match the witnessed submission, the target rejects the envelope, or a + /// different ticket has already staged the same submission. + pub fn ingest_ticketed_invocation( + &mut self, + _authority: &TicketedRuntimeIngressAuthority, + submission_id: Hash, + ticket: &OpticAdmissionTicket, + envelope: IngressEnvelope, + ) -> Result { + let Some(submission) = self.witnessed_submissions.get(&submission_id) else { + return Err(RuntimeError::UnknownIntentSubmission(submission_id)); + }; + let ingress_id = envelope.ingress_id(); + let head_key = self.resolve_target(envelope.target())?; + if submission.ingress_id != ingress_id || submission.head_key != head_key { + return Err(RuntimeError::TicketedIngressSubmissionMismatch( + submission_id, + )); + } + + let ticketed_ingress_id = derive_ticketed_runtime_ingress_id( + submission_id, + ticket.ticket_digest, + ingress_id, + head_key, + ); + if let Some(existing_id) = self + .ticketed_runtime_ingress_by_submission + .get(&submission_id) + .copied() + { + let Some(record) = self.ticketed_runtime_ingress.get(&existing_id).cloned() else { + return Err(RuntimeError::TicketedIngressAlreadyStaged(submission_id)); + }; + if existing_id == ticketed_ingress_id { + return Ok(TicketedRuntimeIngressDisposition::Duplicate { record }); + } + return Err(RuntimeError::TicketedIngressAlreadyStaged(submission_id)); + } + + let ingress = self.ingest(envelope)?; + let record = TicketedRuntimeIngressRecord { + ticketed_ingress_id, + submission_id, + ticket_digest: ticket.ticket_digest, + ingress_id, + head_key, + }; + self.ticketed_runtime_ingress + .insert(ticketed_ingress_id, record.clone()); + self.ticketed_runtime_ingress_by_submission + .insert(submission_id, ticketed_ingress_id); + Ok(TicketedRuntimeIngressDisposition::Staged { record, ingress }) + } + /// Resolves an ingress envelope to a specific writer head and stores it in that inbox. /// /// # Errors @@ -734,6 +987,22 @@ fn derive_intent_submission_id(head_key: WriterHeadKey, ingress_id: Hash) -> Has hasher.finalize().into() } +fn derive_ticketed_runtime_ingress_id( + submission_id: Hash, + ticket_digest: Hash, + ingress_id: Hash, + head_key: WriterHeadKey, +) -> Hash { + let mut hasher = blake3::Hasher::new(); + hasher.update(b"echo.ticketed-runtime-ingress"); + hasher.update(&submission_id); + hasher.update(&ticket_digest); + hasher.update(head_key.worldline_id.as_bytes()); + hasher.update(head_key.head_id.as_bytes()); + hasher.update(&ingress_id); + hasher.finalize().into() +} + // ============================================================================= // StepRecord // ============================================================================= diff --git a/crates/warp-core/src/head_inbox.rs b/crates/warp-core/src/head_inbox.rs index e2bbf2cb..27a3c1f8 100644 --- a/crates/warp-core/src/head_inbox.rs +++ b/crates/warp-core/src/head_inbox.rs @@ -327,6 +327,16 @@ impl HeadInbox { } } + /// Returns `true` when this inbox policy would accept the envelope. + /// + /// This performs the policy check without storing the envelope. It is used + /// by witnessed submission intake so Echo can record accepted ingress + /// history without entering runtime scheduling. + #[must_use] + pub fn would_accept(&self, envelope: &IngressEnvelope) -> bool { + self.policy_accepts(envelope) + } + /// Returns `true` if the policy would accept this envelope. fn policy_accepts(&self, envelope: &IngressEnvelope) -> bool { match &self.policy { diff --git a/crates/warp-core/src/lib.rs b/crates/warp-core/src/lib.rs index c18960aa..63e6ac64 100644 --- a/crates/warp-core/src/lib.rs +++ b/crates/warp-core/src/lib.rs @@ -326,7 +326,9 @@ pub use worldline::{ /// Prefer this coordinator/runtime API for new stepping and routing code. pub use coordinator::{ ForkStrandReceipt, ForkStrandRequest, IngressDisposition, IngressSubmissionGeneration, - IntentSubmissionRecord, RuntimeError, SchedulerCoordinator, StepRecord, WorldlineRuntime, + IntentSubmissionDisposition, IntentSubmissionRecord, RuntimeError, SchedulerCoordinator, + StepRecord, TicketedRuntimeIngressAuthority, TicketedRuntimeIngressDisposition, + TicketedRuntimeIngressRecord, WorldlineRuntime, }; /// Writer-head registry and routing primitives used by the runtime-owned ingress path. pub use head::{ diff --git a/crates/warp-core/tests/inbox.rs b/crates/warp-core/tests/inbox.rs index 23c506f9..3a9d0217 100644 --- a/crates/warp-core/tests/inbox.rs +++ b/crates/warp-core/tests/inbox.rs @@ -4,11 +4,17 @@ //! Runtime-owned ingress integration tests. use warp_core::{ - make_head_id, make_intent_kind, make_node_id, make_type_id, Engine, EngineBuilder, GraphStore, - InboxAddress, InboxPolicy, IngressDisposition, IngressEnvelope, IngressTarget, NodeId, - NodeRecord, PlaybackMode, ProvenanceEventKind, ProvenanceService, ProvenanceStore, - SchedulerCoordinator, SchedulerKind, WorldlineId, WorldlineRuntime, WorldlineState, - WorldlineTick, WorldlineTickPatchV1, WriterHead, WriterHeadKey, + make_head_id, make_intent_kind, make_node_id, make_type_id, Engine, EngineBuilder, GlobalTick, + GraphStore, InboxAddress, InboxPolicy, IngressDisposition, IngressEnvelope, IngressTarget, + IntentSubmissionDisposition, NodeId, NodeRecord, PlaybackMode, ProvenanceEventKind, + ProvenanceService, ProvenanceStore, SchedulerCoordinator, SchedulerKind, WorldlineId, + WorldlineRuntime, WorldlineState, WorldlineTick, WorldlineTickPatchV1, WriterHead, + WriterHeadKey, +}; +#[cfg(feature = "host_test")] +use warp_core::{ + OpticAdmissionTicket, OpticArtifactHandle, RuntimeError, TicketedRuntimeIngressAuthority, + TicketedRuntimeIngressDisposition, OPTIC_ADMISSION_TICKET_KIND, OPTIC_ARTIFACT_HANDLE_KIND, }; fn wl(n: u8) -> WorldlineId { @@ -19,6 +25,10 @@ fn wt(raw: u64) -> WorldlineTick { WorldlineTick::from_raw(raw) } +fn gt(raw: u64) -> GlobalTick { + GlobalTick::from_raw(raw) +} + fn empty_engine() -> Engine { let mut store = GraphStore::default(); let root = make_node_id("root"); @@ -76,6 +86,31 @@ fn registered_worldlines_provenance(runtime: &WorldlineRuntime) -> ProvenanceSer provenance } +#[cfg(feature = "host_test")] +fn admission_ticket(seed: u8) -> OpticAdmissionTicket { + OpticAdmissionTicket { + kind: OPTIC_ADMISSION_TICKET_KIND.to_owned(), + artifact_handle: OpticArtifactHandle { + kind: OPTIC_ARTIFACT_HANDLE_KIND.to_owned(), + id: format!("ticketed-runtime-ingress-{seed}"), + }, + artifact_hash: format!("artifact-hash-{seed}"), + operation_id: format!("operation-{seed}"), + requirements_digest: format!("requirements-{seed}"), + canonical_variables_digest: vec![seed], + basis_request_digest: [seed; 32], + aperture_request_digest: [seed.wrapping_add(1); 32], + budget_request_digest: [seed.wrapping_add(2); 32], + law_witness_digest: [seed.wrapping_add(3); 32], + ticket_digest: [seed.wrapping_add(4); 32], + } +} + +#[cfg(feature = "host_test")] +fn ticketed_runtime_ingress_authority() -> TicketedRuntimeIngressAuthority { + TicketedRuntimeIngressAuthority::assume_runtime_owner() +} + #[test] fn runtime_ingest_commits_without_legacy_graph_inbox_nodes() { let mut runtime = WorldlineRuntime::new(); @@ -112,6 +147,215 @@ fn runtime_ingest_commits_without_legacy_graph_inbox_nodes() { assert!(store.node(&make_node_id("sim/inbox")).is_none()); } +#[test] +fn witnessed_submission_does_not_enter_runtime_ingress_before_ticket() { + let mut runtime = WorldlineRuntime::new(); + let mut engine = empty_engine(); + let worldline_id = wl(1); + runtime + .register_worldline(worldline_id, WorldlineState::empty()) + .unwrap(); + let head_key = register_head(&mut runtime, worldline_id, "default", None, true); + + let envelope = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { worldline_id }, + make_intent_kind("test/runtime"), + b"witness-only".to_vec(), + ); + let disposition = runtime.submit_intent(envelope.clone()).unwrap(); + + assert!(matches!( + disposition, + IntentSubmissionDisposition::Accepted { + ingress_id, + head_key: routed_head_key, + .. + } if ingress_id == envelope.ingress_id() && routed_head_key == head_key + )); + assert_eq!(runtime.witnessed_submission_count(), 1); + assert_eq!( + runtime + .heads() + .get(&head_key) + .unwrap() + .inbox() + .pending_count(), + 0 + ); + assert_eq!(runtime.global_tick(), gt(0)); + + let mut provenance = registered_worldlines_provenance(&runtime); + let records = + SchedulerCoordinator::super_tick(&mut runtime, &mut provenance, &mut engine).unwrap(); + assert!(records.is_empty()); + let frontier = runtime.worldlines().get(&worldline_id).unwrap(); + assert_eq!(frontier.frontier_tick(), wt(0)); + assert!(frontier.state().tick_history().is_empty()); +} + +#[test] +#[cfg(feature = "host_test")] +fn ticketed_runtime_ingress_rejects_unknown_submission() { + let mut runtime = WorldlineRuntime::new(); + let worldline_id = wl(1); + runtime + .register_worldline(worldline_id, WorldlineState::empty()) + .unwrap(); + let head_key = register_head(&mut runtime, worldline_id, "default", None, true); + let envelope = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { worldline_id }, + make_intent_kind("test/runtime"), + b"unknown-submission".to_vec(), + ); + + let err = runtime + .ingest_ticketed_invocation( + &ticketed_runtime_ingress_authority(), + [9; 32], + &admission_ticket(1), + envelope, + ) + .unwrap_err(); + + assert!(matches!( + err, + RuntimeError::UnknownIntentSubmission(id) if id == [9; 32] + )); + assert_eq!( + runtime + .heads() + .get(&head_key) + .unwrap() + .inbox() + .pending_count(), + 0 + ); + assert_eq!(runtime.ticketed_runtime_ingress_count(), 0); +} + +#[test] +#[cfg(feature = "host_test")] +fn ticketed_invocation_ingests_runtime_envelope_without_ticking() { + let mut runtime = WorldlineRuntime::new(); + let worldline_id = wl(1); + runtime + .register_worldline(worldline_id, WorldlineState::empty()) + .unwrap(); + let head_key = register_head(&mut runtime, worldline_id, "default", None, true); + let envelope = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { worldline_id }, + make_intent_kind("test/runtime"), + b"ticketed-ingress".to_vec(), + ); + let submission = match runtime.submit_intent(envelope.clone()).unwrap() { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + }; + let ticket = admission_ticket(2); + + let disposition = runtime + .ingest_ticketed_invocation( + &ticketed_runtime_ingress_authority(), + submission, + &ticket, + envelope.clone(), + ) + .unwrap(); + + let record = match disposition { + TicketedRuntimeIngressDisposition::Staged { record, ingress } => { + assert!(matches!( + ingress, + IngressDisposition::Accepted { + ingress_id, + head_key: routed_head_key, + submission_id, + .. + } if ingress_id == envelope.ingress_id() + && routed_head_key == head_key + && submission_id == submission + )); + record + } + TicketedRuntimeIngressDisposition::Duplicate { .. } => { + panic!("first ticketed runtime ingress duplicated") + } + }; + + assert_eq!(record.submission_id, submission); + assert_eq!(record.ticket_digest, ticket.ticket_digest); + assert_eq!(record.ingress_id, envelope.ingress_id()); + assert_eq!(record.head_key, head_key); + assert_eq!(runtime.ticketed_runtime_ingress_count(), 1); + assert_eq!( + runtime + .heads() + .get(&head_key) + .unwrap() + .inbox() + .pending_count(), + 1 + ); + assert_eq!(runtime.global_tick(), gt(0)); + let frontier = runtime.worldlines().get(&worldline_id).unwrap(); + assert_eq!(frontier.frontier_tick(), wt(0)); + assert!(frontier.state().tick_history().is_empty()); +} + +#[test] +#[cfg(feature = "host_test")] +fn ticketed_ingress_preserves_submission_and_ticket_identity() { + let mut runtime = WorldlineRuntime::new(); + let worldline_id = wl(1); + runtime + .register_worldline(worldline_id, WorldlineState::empty()) + .unwrap(); + register_head(&mut runtime, worldline_id, "default", None, true); + let envelope = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { worldline_id }, + make_intent_kind("test/runtime"), + b"stable-ticketed-ingress".to_vec(), + ); + let submission = match runtime.submit_intent(envelope.clone()).unwrap() { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + }; + let ticket = admission_ticket(3); + + let first = runtime + .ingest_ticketed_invocation( + &ticketed_runtime_ingress_authority(), + submission, + &ticket, + envelope.clone(), + ) + .unwrap(); + let duplicate = runtime + .ingest_ticketed_invocation( + &ticketed_runtime_ingress_authority(), + submission, + &ticket, + envelope, + ) + .unwrap(); + + let first_record = match first { + TicketedRuntimeIngressDisposition::Staged { record, .. } => record, + TicketedRuntimeIngressDisposition::Duplicate { .. } => { + panic!("first ticketed ingress duplicated") + } + }; + let duplicate_record = match duplicate { + TicketedRuntimeIngressDisposition::Duplicate { record } => record, + TicketedRuntimeIngressDisposition::Staged { .. } => { + panic!("duplicate ticketed ingress staged twice") + } + }; + + assert_eq!(first_record, duplicate_record); + assert_eq!(runtime.ticketed_runtime_ingress_count(), 1); +} + #[test] fn runtime_ingest_is_idempotent_per_resolved_head_after_commit() { let mut runtime = WorldlineRuntime::new(); diff --git a/docs/BEARING.md b/docs/BEARING.md index f4bd05af..c1ddf1e2 100644 --- a/docs/BEARING.md +++ b/docs/BEARING.md @@ -30,12 +30,12 @@ scheduler-owned tick outcome without giving application code tick authority. - Tick receipts exist and witness scheduler-owned candidate outcomes. - Footprint conflicts are explicit receipt rejections, not hidden retries. - The optic admission ladder resolves through AdmissionTicket and currently - stops before ticketed runtime ingress. + can stage ticketed runtime ingress through an explicit runtime-owner authority + token without ticking. ## What Is Not Yet True - Accepted submissions are not yet complete witnessed ingress history. -- Ticketed runtime ingress is not wired. - Tick receipts are not cleanly correlated to intent, submission, and ticket ids. - Clients cannot observe intent outcome by id. - Installed Wesley handler dispatch is not wired into scheduler-owned execution. @@ -112,9 +112,10 @@ AdmissionTicket + witnessed submission -> ticketed runtime ingress ## Immediate Next Slice -TicketedRuntimeIngress should prove that only ticketed invocations enter runtime -ingress while application code still cannot tick the runtime. +ReceiptCorrelation should bind scheduler-owned tick receipts back to the +witnessed submission, admission ticket, and ticketed runtime ingress records +that carried the intent into execution. -This slice must not implement receipt correlation, outcome observation, -installed handler dispatch, QueryView, streaming subscriptions, automatic retry, -execution outside scheduler-owned ticks, or wall-clock cadence semantics. +This slice must not implement outcome observation, installed handler dispatch, +QueryView, streaming subscriptions, automatic retry, execution outside +scheduler-owned ticks, or wall-clock cadence semantics. From 5cfc5da409d090501f41f417cc7780245e2db63a Mon Sep 17 00:00:00 2001 From: James Ross Date: Wed, 20 May 2026 06:04:15 -0700 Subject: [PATCH 2/9] feat(core): correlate ticketed ingress receipts --- CHANGELOG.md | 9 ++ crates/warp-core/src/coordinator.rs | 160 +++++++++++++++++++++++++++- crates/warp-core/src/lib.rs | 6 +- crates/warp-core/tests/inbox.rs | 143 +++++++++++++++++++++++++ docs/BEARING.md | 15 +-- 5 files changed, 322 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b02d6ef..595f1674 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,15 @@ ### Added +- `warp-core` now records scheduler-owned receipt correlations for ticketed + runtime ingress. After `SchedulerCoordinator::super_tick(...)` commits a + ticketed ingress batch, Echo indexes the witnessed submission id, admission + ticket digest, ticketed ingress id, ingress id, writer head, logical tick + coordinates, receipt digest, and commit hash. Correlations are created only + after scheduler-owned ticks and only for ticketed runtime ingress; legacy + direct inbox ingress remains uncorrelated. This does not expose intent + outcome observation, dispatch installed handlers, execute contracts outside + scheduler-owned ticks, or introduce automatic retry. - `warp-core` now exposes a ticketed runtime ingress boundary. `WorldlineRuntime::submit_intent(...)` records witnessed submission history without entering a head inbox, ticking, dispatching handlers, or mutating diff --git a/crates/warp-core/src/coordinator.rs b/crates/warp-core/src/coordinator.rs index 0da05b9d..95f08e93 100644 --- a/crates/warp-core/src/coordinator.rs +++ b/crates/warp-core/src/coordinator.rs @@ -270,6 +270,32 @@ pub enum TicketedRuntimeIngressDisposition { }, } +/// Correlation between a ticketed runtime ingress record and a scheduler tick receipt. +/// +/// This is an observation/correlation index only. It does not interpret the +/// receipt into an application outcome and does not dispatch handlers. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ReceiptCorrelationRecord { + /// Ticketed runtime ingress event that reached scheduler-owned execution. + pub ticketed_ingress_id: Hash, + /// Witnessed Echo submission that produced the runtime ingress. + pub submission_id: Hash, + /// Admission ticket digest bound to the runtime ingress. + pub ticket_digest: Hash, + /// Content-addressed canonical ingress id decided by the tick. + pub ingress_id: Hash, + /// Writer head that committed the ingress batch. + pub head_key: WriterHeadKey, + /// Runtime cycle stamp that produced the receipt. + pub commit_global_tick: GlobalTick, + /// Worldline frontier tick after the scheduler-owned commit. + pub worldline_tick_after: WorldlineTick, + /// Digest of the scheduler-owned tick receipt. + pub tick_receipt_digest: Hash, + /// Commit hash emitted by the scheduler-owned tick. + pub commit_hash: Hash, +} + /// Request to fork a strand from one precise source-lane coordinate. #[derive(Clone, Debug)] pub struct ForkStrandRequest { @@ -331,6 +357,15 @@ pub struct WorldlineRuntime { ticketed_runtime_ingress: BTreeMap, /// Deterministic lookup from witnessed submission id to ticketed ingress. ticketed_runtime_ingress_by_submission: BTreeMap, + /// Deterministic lookup from resolved semantic target and ingress id to + /// ticketed ingress id. + ticketed_runtime_ingress_by_target: BTreeMap<(WriterHeadKey, Hash), Hash>, + /// Receipt correlations keyed by ticketed ingress id. + receipt_correlations_by_ticketed_ingress: BTreeMap, + /// Deterministic lookup from witnessed submission id to receipt correlation. + receipt_correlation_by_submission: BTreeMap, + /// Deterministic lookup from admission ticket digest to receipt correlation. + receipt_correlation_by_ticket: BTreeMap, /// Registry of live speculative strands attached to the runtime. strands: StrandRegistry, } @@ -340,6 +375,9 @@ struct RuntimeCheckpoint { global_tick: GlobalTick, heads: BTreeMap, frontiers: BTreeMap, + receipt_correlations_by_ticketed_ingress: BTreeMap, + receipt_correlation_by_submission: BTreeMap, + receipt_correlation_by_ticket: BTreeMap, } impl WorldlineRuntime { @@ -376,11 +414,20 @@ impl WorldlineRuntime { global_tick: self.global_tick, heads, frontiers, + receipt_correlations_by_ticketed_ingress: self + .receipt_correlations_by_ticketed_ingress + .clone(), + receipt_correlation_by_submission: self.receipt_correlation_by_submission.clone(), + receipt_correlation_by_ticket: self.receipt_correlation_by_ticket.clone(), }) } fn restore(&mut self, checkpoint: RuntimeCheckpoint) { self.global_tick = checkpoint.global_tick; + self.receipt_correlations_by_ticketed_ingress = + checkpoint.receipt_correlations_by_ticketed_ingress; + self.receipt_correlation_by_submission = checkpoint.receipt_correlation_by_submission; + self.receipt_correlation_by_ticket = checkpoint.receipt_correlation_by_ticket; for head in checkpoint.heads.into_values() { self.heads.insert(head); } @@ -441,6 +488,55 @@ impl WorldlineRuntime { self.ticketed_runtime_ingress.len() } + /// Returns a receipt correlation by ticketed runtime ingress id. + #[must_use] + pub fn receipt_correlation_for_ticketed_ingress( + &self, + ticketed_ingress_id: &Hash, + ) -> Option<&ReceiptCorrelationRecord> { + self.receipt_correlations_by_ticketed_ingress + .get(ticketed_ingress_id) + } + + /// Returns a receipt correlation by witnessed submission id. + #[must_use] + pub fn receipt_correlation_for_submission( + &self, + submission_id: &Hash, + ) -> Option<&ReceiptCorrelationRecord> { + self.receipt_correlation_by_submission + .get(submission_id) + .and_then(|ticketed_ingress_id| { + self.receipt_correlations_by_ticketed_ingress + .get(ticketed_ingress_id) + }) + } + + /// Returns a receipt correlation by admission ticket digest. + #[must_use] + pub fn receipt_correlation_for_ticket( + &self, + ticket_digest: &Hash, + ) -> Option<&ReceiptCorrelationRecord> { + self.receipt_correlation_by_ticket + .get(ticket_digest) + .and_then(|ticketed_ingress_id| { + self.receipt_correlations_by_ticketed_ingress + .get(ticketed_ingress_id) + }) + } + + /// Iterates receipt correlations in deterministic ticketed-ingress id order. + pub fn receipt_correlations(&self) -> impl Iterator { + self.receipt_correlations_by_ticketed_ingress.values() + } + + /// Returns the number of scheduler-owned receipt correlations. + #[must_use] + pub fn receipt_correlation_count(&self) -> usize { + self.receipt_correlations_by_ticketed_ingress.len() + } + /// Returns the current correlation tick. #[must_use] pub fn global_tick(&self) -> GlobalTick { @@ -834,6 +930,8 @@ impl WorldlineRuntime { .insert(ticketed_ingress_id, record.clone()); self.ticketed_runtime_ingress_by_submission .insert(submission_id, ticketed_ingress_id); + self.ticketed_runtime_ingress_by_target + .insert((head_key, ingress_id), ticketed_ingress_id); Ok(TicketedRuntimeIngressDisposition::Staged { record, ingress }) } @@ -950,6 +1048,57 @@ impl WorldlineRuntime { } } + fn record_receipt_correlations( + &mut self, + head_key: WriterHeadKey, + admitted: &[IngressEnvelope], + commit_global_tick: GlobalTick, + worldline_tick_after: WorldlineTick, + tick_receipt_digest: Hash, + commit_hash: Hash, + ) { + for envelope in admitted { + let ingress_id = envelope.ingress_id(); + let Some(ticketed_ingress_id) = self + .ticketed_runtime_ingress_by_target + .get(&(head_key, ingress_id)) + .copied() + else { + continue; + }; + if self + .receipt_correlations_by_ticketed_ingress + .contains_key(&ticketed_ingress_id) + { + continue; + } + let Some(ticketed_ingress) = self + .ticketed_runtime_ingress + .get(&ticketed_ingress_id) + .cloned() + else { + continue; + }; + let record = ReceiptCorrelationRecord { + ticketed_ingress_id, + submission_id: ticketed_ingress.submission_id, + ticket_digest: ticketed_ingress.ticket_digest, + ingress_id, + head_key, + commit_global_tick, + worldline_tick_after, + tick_receipt_digest, + commit_hash, + }; + self.receipt_correlations_by_ticketed_ingress + .insert(ticketed_ingress_id, record); + self.receipt_correlation_by_submission + .insert(ticketed_ingress.submission_id, ticketed_ingress_id); + self.receipt_correlation_by_ticket + .insert(ticketed_ingress.ticket_digest, ticketed_ingress_id); + } + } + fn resolve_target(&self, target: &IngressTarget) -> Result { match target { IngressTarget::DefaultWriter { worldline_id } => self @@ -1108,7 +1257,7 @@ impl SchedulerCoordinator { let CommitOutcome { snapshot, patch, - receipt: _, + receipt, } = { let frontier = runtime .worldlines @@ -1118,6 +1267,7 @@ impl SchedulerCoordinator { .commit_with_state(frontier.state_mut(), &admitted) .map_err(RuntimeError::from)? }; + let tick_receipt_digest = receipt.digest(); let (state_root, worldline_tick_after) = { let frontier = runtime @@ -1170,6 +1320,14 @@ impl SchedulerCoordinator { .ok_or(RuntimeError::FrontierTickOverflow(key.worldline_id))?; (snapshot.state_root, worldline_tick_after) }; + runtime.record_receipt_correlations( + *key, + &admitted, + next_global_tick, + worldline_tick_after, + tick_receipt_digest, + snapshot.hash, + ); Ok(StepRecord { head_key: *key, diff --git a/crates/warp-core/src/lib.rs b/crates/warp-core/src/lib.rs index 63e6ac64..a8e9e685 100644 --- a/crates/warp-core/src/lib.rs +++ b/crates/warp-core/src/lib.rs @@ -326,9 +326,9 @@ pub use worldline::{ /// Prefer this coordinator/runtime API for new stepping and routing code. pub use coordinator::{ ForkStrandReceipt, ForkStrandRequest, IngressDisposition, IngressSubmissionGeneration, - IntentSubmissionDisposition, IntentSubmissionRecord, RuntimeError, SchedulerCoordinator, - StepRecord, TicketedRuntimeIngressAuthority, TicketedRuntimeIngressDisposition, - TicketedRuntimeIngressRecord, WorldlineRuntime, + IntentSubmissionDisposition, IntentSubmissionRecord, ReceiptCorrelationRecord, RuntimeError, + SchedulerCoordinator, StepRecord, TicketedRuntimeIngressAuthority, + TicketedRuntimeIngressDisposition, TicketedRuntimeIngressRecord, WorldlineRuntime, }; /// Writer-head registry and routing primitives used by the runtime-owned ingress path. pub use head::{ diff --git a/crates/warp-core/tests/inbox.rs b/crates/warp-core/tests/inbox.rs index 3a9d0217..0d576ddb 100644 --- a/crates/warp-core/tests/inbox.rs +++ b/crates/warp-core/tests/inbox.rs @@ -356,6 +356,149 @@ fn ticketed_ingress_preserves_submission_and_ticket_identity() { assert_eq!(runtime.ticketed_runtime_ingress_count(), 1); } +#[test] +#[cfg(feature = "host_test")] +fn receipt_correlation_does_not_exist_before_scheduler_tick() { + let mut runtime = WorldlineRuntime::new(); + let worldline_id = wl(1); + runtime + .register_worldline(worldline_id, WorldlineState::empty()) + .unwrap(); + register_head(&mut runtime, worldline_id, "default", None, true); + let envelope = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { worldline_id }, + make_intent_kind("test/runtime"), + b"pending-receipt-correlation".to_vec(), + ); + let submission = match runtime.submit_intent(envelope.clone()).unwrap() { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + }; + let ticket = admission_ticket(4); + let staged = runtime + .ingest_ticketed_invocation( + &ticketed_runtime_ingress_authority(), + submission, + &ticket, + envelope, + ) + .unwrap(); + let ticketed_ingress_id = match staged { + TicketedRuntimeIngressDisposition::Staged { record, .. } => record.ticketed_ingress_id, + TicketedRuntimeIngressDisposition::Duplicate { .. } => { + panic!("first ticketed ingress duplicated") + } + }; + + assert!(runtime + .receipt_correlation_for_ticketed_ingress(&ticketed_ingress_id) + .is_none()); + assert!(runtime + .receipt_correlation_for_submission(&submission) + .is_none()); + assert_eq!(runtime.receipt_correlation_count(), 0); +} + +#[test] +#[cfg(feature = "host_test")] +fn ticketed_ingress_correlates_tick_receipt_after_scheduler_owned_tick() { + let mut runtime = WorldlineRuntime::new(); + let mut engine = empty_engine(); + let worldline_id = wl(1); + runtime + .register_worldline(worldline_id, WorldlineState::empty()) + .unwrap(); + let head_key = register_head(&mut runtime, worldline_id, "default", None, true); + let envelope = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { worldline_id }, + make_intent_kind("test/runtime"), + b"correlate-after-tick".to_vec(), + ); + let submission = match runtime.submit_intent(envelope.clone()).unwrap() { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + }; + let ticket = admission_ticket(5); + let staged = runtime + .ingest_ticketed_invocation( + &ticketed_runtime_ingress_authority(), + submission, + &ticket, + envelope.clone(), + ) + .unwrap(); + let ticketed_ingress_id = match staged { + TicketedRuntimeIngressDisposition::Staged { record, .. } => record.ticketed_ingress_id, + TicketedRuntimeIngressDisposition::Duplicate { .. } => { + panic!("first ticketed ingress duplicated") + } + }; + + let mut provenance = registered_worldlines_provenance(&runtime); + let records = + SchedulerCoordinator::super_tick(&mut runtime, &mut provenance, &mut engine).unwrap(); + + assert_eq!(records.len(), 1); + let correlation = runtime + .receipt_correlation_for_ticketed_ingress(&ticketed_ingress_id) + .expect("ticketed ingress should correlate after scheduler tick"); + let frontier = runtime.worldlines().get(&worldline_id).unwrap(); + let (_, receipt, _) = frontier + .state() + .tick_history() + .last() + .expect("scheduler-owned tick should be recorded"); + assert_eq!(correlation.ticketed_ingress_id, ticketed_ingress_id); + assert_eq!(correlation.submission_id, submission); + assert_eq!(correlation.ticket_digest, ticket.ticket_digest); + assert_eq!(correlation.ingress_id, envelope.ingress_id()); + assert_eq!(correlation.head_key, head_key); + assert_eq!( + correlation.commit_global_tick, + records[0].commit_global_tick + ); + assert_eq!( + correlation.worldline_tick_after, + records[0].worldline_tick_after + ); + assert_eq!(correlation.tick_receipt_digest, receipt.digest()); + assert_eq!(correlation.commit_hash, records[0].commit_hash); + assert_eq!( + runtime + .receipt_correlation_for_submission(&submission) + .unwrap(), + correlation + ); +} + +#[test] +#[cfg(feature = "host_test")] +fn legacy_ingress_without_ticket_does_not_create_receipt_correlation() { + let mut runtime = WorldlineRuntime::new(); + let mut engine = empty_engine(); + let worldline_id = wl(1); + runtime + .register_worldline(worldline_id, WorldlineState::empty()) + .unwrap(); + register_head(&mut runtime, worldline_id, "default", None, true); + let envelope = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { worldline_id }, + make_intent_kind("test/runtime"), + b"legacy-uncorrelated".to_vec(), + ); + + assert!(matches!( + runtime.ingest(envelope).unwrap(), + IngressDisposition::Accepted { .. } + )); + let mut provenance = registered_worldlines_provenance(&runtime); + let records = + SchedulerCoordinator::super_tick(&mut runtime, &mut provenance, &mut engine).unwrap(); + + assert_eq!(records.len(), 1); + assert_eq!(runtime.receipt_correlation_count(), 0); +} + #[test] fn runtime_ingest_is_idempotent_per_resolved_head_after_commit() { let mut runtime = WorldlineRuntime::new(); diff --git a/docs/BEARING.md b/docs/BEARING.md index c1ddf1e2..a94df648 100644 --- a/docs/BEARING.md +++ b/docs/BEARING.md @@ -28,6 +28,8 @@ scheduler-owned tick outcome without giving application code tick authority. - Fixed logical timestep doctrine exists. Wall-clock cadence is host/runtime owner policy, not semantic Echo history. - Tick receipts exist and witness scheduler-owned candidate outcomes. +- Scheduler-owned tick receipts can be correlated back to ticketed runtime + ingress records, admission ticket digests, and witnessed submission ids. - Footprint conflicts are explicit receipt rejections, not hidden retries. - The optic admission ladder resolves through AdmissionTicket and currently can stage ticketed runtime ingress through an explicit runtime-owner authority @@ -36,7 +38,6 @@ scheduler-owned tick outcome without giving application code tick authority. ## What Is Not Yet True - Accepted submissions are not yet complete witnessed ingress history. -- Tick receipts are not cleanly correlated to intent, submission, and ticket ids. - Clients cannot observe intent outcome by id. - Installed Wesley handler dispatch is not wired into scheduler-owned execution. - Generic QueryView remains unsupported in core. @@ -112,10 +113,10 @@ AdmissionTicket + witnessed submission -> ticketed runtime ingress ## Immediate Next Slice -ReceiptCorrelation should bind scheduler-owned tick receipts back to the -witnessed submission, admission ticket, and ticketed runtime ingress records -that carried the intent into execution. +IntentOutcomeObservation should expose a small polling surface that reports +whether a witnessed submission is still pending or has reached a +scheduler-owned tick receipt. -This slice must not implement outcome observation, installed handler dispatch, -QueryView, streaming subscriptions, automatic retry, execution outside -scheduler-owned ticks, or wall-clock cadence semantics. +This slice must not implement installed handler dispatch, QueryView, streaming +subscriptions, automatic retry, execution outside scheduler-owned ticks, or +wall-clock cadence semantics. From ed09f7ea533778fff2c8ecf585656c76b09c3613 Mon Sep 17 00:00:00 2001 From: James Ross Date: Wed, 20 May 2026 06:11:42 -0700 Subject: [PATCH 3/9] feat(core): observe intent outcome posture --- CHANGELOG.md | 8 +++ crates/warp-core/src/coordinator.rs | 56 +++++++++++++++++ crates/warp-core/src/lib.rs | 7 ++- crates/warp-core/tests/inbox.rs | 93 ++++++++++++++++++++++++++++- docs/BEARING.md | 16 ++--- 5 files changed, 167 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 595f1674..f897ca0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ ### Added +- `warp-core` now exposes a zero-write `observe_intent_outcome(...)` polling + surface over witnessed submission ids. The observation reports + `UnknownSubmission`, `Pending` with optional ticketed-ingress identity, or + `Decided` with the scheduler-owned receipt correlation once a ticketed + submission reaches a tick receipt. This does not infer per-candidate + applied/rejected application semantics, stream updates, dispatch installed + handlers, execute contracts outside scheduler-owned ticks, or introduce + automatic retry. - `warp-core` now records scheduler-owned receipt correlations for ticketed runtime ingress. After `SchedulerCoordinator::super_tick(...)` commits a ticketed ingress batch, Echo indexes the witnessed submission id, admission diff --git a/crates/warp-core/src/coordinator.rs b/crates/warp-core/src/coordinator.rs index 95f08e93..61d02361 100644 --- a/crates/warp-core/src/coordinator.rs +++ b/crates/warp-core/src/coordinator.rs @@ -296,6 +296,35 @@ pub struct ReceiptCorrelationRecord { pub commit_hash: Hash, } +/// Polling observation for a witnessed intent submission. +/// +/// This is intentionally narrower than a final applied/rejected application +/// outcome. Until receipt entries are bound to intent-level semantics, Echo can +/// report whether the submission is unknown, still pending, or decided by a +/// scheduler-owned tick receipt. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum IntentOutcomeObservation { + /// Echo has no witnessed submission for the supplied id. + UnknownSubmission { + /// Submission id the caller asked about. + submission_id: Hash, + }, + /// Echo has witnessed the submission, but no receipt correlation exists yet. + Pending { + /// Witnessed Echo submission id. + submission_id: Hash, + /// Echo-owned intake/correlation generation. + submission_generation: IngressSubmissionGeneration, + /// Ticketed runtime ingress id, if the submission has reached runtime ingress. + ticketed_ingress_id: Option, + }, + /// Echo has correlated the submission to a scheduler-owned tick receipt. + Decided { + /// Scheduler-owned receipt correlation. + correlation: ReceiptCorrelationRecord, + }, +} + /// Request to fork a strand from one precise source-lane coordinate. #[derive(Clone, Debug)] pub struct ForkStrandRequest { @@ -537,6 +566,33 @@ impl WorldlineRuntime { self.receipt_correlations_by_ticketed_ingress.len() } + /// Observes the current scheduler-owned outcome posture for a submission. + /// + /// This is a zero-write polling surface. It does not tick, dispatch + /// handlers, subscribe to streams, or infer applied/rejected semantics from + /// receipt entries. + #[must_use] + pub fn observe_intent_outcome(&self, submission_id: &Hash) -> IntentOutcomeObservation { + let Some(submission) = self.witnessed_submissions.get(submission_id) else { + return IntentOutcomeObservation::UnknownSubmission { + submission_id: *submission_id, + }; + }; + if let Some(correlation) = self.receipt_correlation_for_submission(submission_id) { + return IntentOutcomeObservation::Decided { + correlation: correlation.clone(), + }; + } + IntentOutcomeObservation::Pending { + submission_id: *submission_id, + submission_generation: submission.submission_generation, + ticketed_ingress_id: self + .ticketed_runtime_ingress_by_submission + .get(submission_id) + .copied(), + } + } + /// Returns the current correlation tick. #[must_use] pub fn global_tick(&self) -> GlobalTick { diff --git a/crates/warp-core/src/lib.rs b/crates/warp-core/src/lib.rs index a8e9e685..bb5a5342 100644 --- a/crates/warp-core/src/lib.rs +++ b/crates/warp-core/src/lib.rs @@ -326,9 +326,10 @@ pub use worldline::{ /// Prefer this coordinator/runtime API for new stepping and routing code. pub use coordinator::{ ForkStrandReceipt, ForkStrandRequest, IngressDisposition, IngressSubmissionGeneration, - IntentSubmissionDisposition, IntentSubmissionRecord, ReceiptCorrelationRecord, RuntimeError, - SchedulerCoordinator, StepRecord, TicketedRuntimeIngressAuthority, - TicketedRuntimeIngressDisposition, TicketedRuntimeIngressRecord, WorldlineRuntime, + IntentOutcomeObservation, IntentSubmissionDisposition, IntentSubmissionRecord, + ReceiptCorrelationRecord, RuntimeError, SchedulerCoordinator, StepRecord, + TicketedRuntimeIngressAuthority, TicketedRuntimeIngressDisposition, + TicketedRuntimeIngressRecord, WorldlineRuntime, }; /// Writer-head registry and routing primitives used by the runtime-owned ingress path. pub use head::{ diff --git a/crates/warp-core/tests/inbox.rs b/crates/warp-core/tests/inbox.rs index 0d576ddb..1edbc592 100644 --- a/crates/warp-core/tests/inbox.rs +++ b/crates/warp-core/tests/inbox.rs @@ -6,9 +6,9 @@ use warp_core::{ make_head_id, make_intent_kind, make_node_id, make_type_id, Engine, EngineBuilder, GlobalTick, GraphStore, InboxAddress, InboxPolicy, IngressDisposition, IngressEnvelope, IngressTarget, - IntentSubmissionDisposition, NodeId, NodeRecord, PlaybackMode, ProvenanceEventKind, - ProvenanceService, ProvenanceStore, SchedulerCoordinator, SchedulerKind, WorldlineId, - WorldlineRuntime, WorldlineState, WorldlineTick, WorldlineTickPatchV1, WriterHead, + IntentOutcomeObservation, IntentSubmissionDisposition, NodeId, NodeRecord, PlaybackMode, + ProvenanceEventKind, ProvenanceService, ProvenanceStore, SchedulerCoordinator, SchedulerKind, + WorldlineId, WorldlineRuntime, WorldlineState, WorldlineTick, WorldlineTickPatchV1, WriterHead, WriterHeadKey, }; #[cfg(feature = "host_test")] @@ -399,6 +399,50 @@ fn receipt_correlation_does_not_exist_before_scheduler_tick() { assert_eq!(runtime.receipt_correlation_count(), 0); } +#[test] +fn unknown_submission_outcome_observation_is_unknown() { + let runtime = WorldlineRuntime::new(); + let unknown_submission = [42; 32]; + + assert!(matches!( + runtime.observe_intent_outcome(&unknown_submission), + IntentOutcomeObservation::UnknownSubmission { submission_id } + if submission_id == unknown_submission + )); +} + +#[test] +fn witnessed_submission_outcome_observation_is_pending_without_tick() { + let mut runtime = WorldlineRuntime::new(); + let worldline_id = wl(1); + runtime + .register_worldline(worldline_id, WorldlineState::empty()) + .unwrap(); + register_head(&mut runtime, worldline_id, "default", None, true); + let envelope = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { worldline_id }, + make_intent_kind("test/runtime"), + b"pending-outcome".to_vec(), + ); + let (submission, generation) = match runtime.submit_intent(envelope).unwrap() { + IntentSubmissionDisposition::Accepted { + submission_id, + submission_generation, + .. + } => (submission_id, submission_generation), + IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + }; + + assert!(matches!( + runtime.observe_intent_outcome(&submission), + IntentOutcomeObservation::Pending { + submission_id, + submission_generation, + ticketed_ingress_id: None, + } if submission_id == submission && submission_generation == generation + )); +} + #[test] #[cfg(feature = "host_test")] fn ticketed_ingress_correlates_tick_receipt_after_scheduler_owned_tick() { @@ -471,6 +515,49 @@ fn ticketed_ingress_correlates_tick_receipt_after_scheduler_owned_tick() { ); } +#[test] +#[cfg(feature = "host_test")] +fn ticketed_submission_outcome_observation_is_decided_after_scheduler_tick() { + let mut runtime = WorldlineRuntime::new(); + let mut engine = empty_engine(); + let worldline_id = wl(1); + runtime + .register_worldline(worldline_id, WorldlineState::empty()) + .unwrap(); + register_head(&mut runtime, worldline_id, "default", None, true); + let envelope = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { worldline_id }, + make_intent_kind("test/runtime"), + b"decided-outcome".to_vec(), + ); + let submission = match runtime.submit_intent(envelope.clone()).unwrap() { + IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, + IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + }; + let ticket = admission_ticket(6); + runtime + .ingest_ticketed_invocation( + &ticketed_runtime_ingress_authority(), + submission, + &ticket, + envelope, + ) + .unwrap(); + + let mut provenance = registered_worldlines_provenance(&runtime); + SchedulerCoordinator::super_tick(&mut runtime, &mut provenance, &mut engine).unwrap(); + + let correlation = runtime + .receipt_correlation_for_submission(&submission) + .expect("submission should have receipt correlation") + .clone(); + assert!(matches!( + runtime.observe_intent_outcome(&submission), + IntentOutcomeObservation::Decided { correlation: observed } + if observed == correlation + )); +} + #[test] #[cfg(feature = "host_test")] fn legacy_ingress_without_ticket_does_not_create_receipt_correlation() { diff --git a/docs/BEARING.md b/docs/BEARING.md index a94df648..5c4b35c8 100644 --- a/docs/BEARING.md +++ b/docs/BEARING.md @@ -30,6 +30,8 @@ scheduler-owned tick outcome without giving application code tick authority. - Tick receipts exist and witness scheduler-owned candidate outcomes. - Scheduler-owned tick receipts can be correlated back to ticketed runtime ingress records, admission ticket digests, and witnessed submission ids. +- Core can observe a witnessed submission as unknown, pending, or decided by a + scheduler-owned tick receipt. - Footprint conflicts are explicit receipt rejections, not hidden retries. - The optic admission ladder resolves through AdmissionTicket and currently can stage ticketed runtime ingress through an explicit runtime-owner authority @@ -38,7 +40,8 @@ scheduler-owned tick outcome without giving application code tick authority. ## What Is Not Yet True - Accepted submissions are not yet complete witnessed ingress history. -- Clients cannot observe intent outcome by id. +- Clients cannot yet observe per-intent applied/rejected application semantics + by id. - Installed Wesley handler dispatch is not wired into scheduler-owned execution. - Generic QueryView remains unsupported in core. @@ -113,10 +116,9 @@ AdmissionTicket + witnessed submission -> ticketed runtime ingress ## Immediate Next Slice -IntentOutcomeObservation should expose a small polling surface that reports -whether a witnessed submission is still pending or has reached a -scheduler-owned tick receipt. +InstalledContractHostDispatch should connect installed Wesley contract handlers +to scheduler-owned runtime execution without letting application dispatch call +handlers synchronously. -This slice must not implement installed handler dispatch, QueryView, streaming -subscriptions, automatic retry, execution outside scheduler-owned ticks, or -wall-clock cadence semantics. +This slice must not implement QueryView, streaming subscriptions, automatic +retry, execution outside scheduler-owned ticks, or wall-clock cadence semantics. From 159cd110dbf0dfdb0ff8e72b1bccf7043a335189 Mon Sep 17 00:00:00 2001 From: James Ross Date: Wed, 20 May 2026 08:15:01 -0700 Subject: [PATCH 4/9] Fix: remove panic from ticketed ingress tests --- crates/warp-core/tests/inbox.rs | 133 ++++++++++++++++++++++---------- 1 file changed, 92 insertions(+), 41 deletions(-) diff --git a/crates/warp-core/tests/inbox.rs b/crates/warp-core/tests/inbox.rs index 1edbc592..433ba6c3 100644 --- a/crates/warp-core/tests/inbox.rs +++ b/crates/warp-core/tests/inbox.rs @@ -247,9 +247,13 @@ fn ticketed_invocation_ingests_runtime_envelope_without_ticking() { make_intent_kind("test/runtime"), b"ticketed-ingress".to_vec(), ); - let submission = match runtime.submit_intent(envelope.clone()).unwrap() { - IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, - IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + let accepted = match runtime.submit_intent(envelope.clone()).unwrap() { + IntentSubmissionDisposition::Accepted { submission_id, .. } => Some(submission_id), + IntentSubmissionDisposition::Duplicate { .. } => None, + }; + assert!(accepted.is_some(), "first submission should be accepted"); + let Some(submission) = accepted else { + return; }; let ticket = admission_ticket(2); @@ -262,7 +266,7 @@ fn ticketed_invocation_ingests_runtime_envelope_without_ticking() { ) .unwrap(); - let record = match disposition { + let staged_record = match disposition { TicketedRuntimeIngressDisposition::Staged { record, ingress } => { assert!(matches!( ingress, @@ -275,11 +279,16 @@ fn ticketed_invocation_ingests_runtime_envelope_without_ticking() { && routed_head_key == head_key && submission_id == submission )); - record - } - TicketedRuntimeIngressDisposition::Duplicate { .. } => { - panic!("first ticketed runtime ingress duplicated") + Some(record) } + TicketedRuntimeIngressDisposition::Duplicate { .. } => None, + }; + assert!( + staged_record.is_some(), + "first ticketed runtime ingress should be staged" + ); + let Some(record) = staged_record else { + return; }; assert_eq!(record.submission_id, submission); @@ -316,9 +325,13 @@ fn ticketed_ingress_preserves_submission_and_ticket_identity() { make_intent_kind("test/runtime"), b"stable-ticketed-ingress".to_vec(), ); - let submission = match runtime.submit_intent(envelope.clone()).unwrap() { - IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, - IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + let accepted = match runtime.submit_intent(envelope.clone()).unwrap() { + IntentSubmissionDisposition::Accepted { submission_id, .. } => Some(submission_id), + IntentSubmissionDisposition::Duplicate { .. } => None, + }; + assert!(accepted.is_some(), "first submission should be accepted"); + let Some(submission) = accepted else { + return; }; let ticket = admission_ticket(3); @@ -340,16 +353,26 @@ fn ticketed_ingress_preserves_submission_and_ticket_identity() { .unwrap(); let first_record = match first { - TicketedRuntimeIngressDisposition::Staged { record, .. } => record, - TicketedRuntimeIngressDisposition::Duplicate { .. } => { - panic!("first ticketed ingress duplicated") - } + TicketedRuntimeIngressDisposition::Staged { record, .. } => Some(record), + TicketedRuntimeIngressDisposition::Duplicate { .. } => None, + }; + assert!( + first_record.is_some(), + "first ticketed ingress should be staged" + ); + let Some(first_record) = first_record else { + return; }; let duplicate_record = match duplicate { - TicketedRuntimeIngressDisposition::Duplicate { record } => record, - TicketedRuntimeIngressDisposition::Staged { .. } => { - panic!("duplicate ticketed ingress staged twice") - } + TicketedRuntimeIngressDisposition::Duplicate { record } => Some(record), + TicketedRuntimeIngressDisposition::Staged { .. } => None, + }; + assert!( + duplicate_record.is_some(), + "duplicate ticketed ingress should not stage twice" + ); + let Some(duplicate_record) = duplicate_record else { + return; }; assert_eq!(first_record, duplicate_record); @@ -370,9 +393,13 @@ fn receipt_correlation_does_not_exist_before_scheduler_tick() { make_intent_kind("test/runtime"), b"pending-receipt-correlation".to_vec(), ); - let submission = match runtime.submit_intent(envelope.clone()).unwrap() { - IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, - IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + let accepted = match runtime.submit_intent(envelope.clone()).unwrap() { + IntentSubmissionDisposition::Accepted { submission_id, .. } => Some(submission_id), + IntentSubmissionDisposition::Duplicate { .. } => None, + }; + assert!(accepted.is_some(), "first submission should be accepted"); + let Some(submission) = accepted else { + return; }; let ticket = admission_ticket(4); let staged = runtime @@ -383,12 +410,18 @@ fn receipt_correlation_does_not_exist_before_scheduler_tick() { envelope, ) .unwrap(); - let ticketed_ingress_id = match staged { - TicketedRuntimeIngressDisposition::Staged { record, .. } => record.ticketed_ingress_id, - TicketedRuntimeIngressDisposition::Duplicate { .. } => { - panic!("first ticketed ingress duplicated") - } + let staged_record = match staged { + TicketedRuntimeIngressDisposition::Staged { record, .. } => Some(record), + TicketedRuntimeIngressDisposition::Duplicate { .. } => None, }; + assert!( + staged_record.is_some(), + "first ticketed ingress should be staged" + ); + let Some(staged_record) = staged_record else { + return; + }; + let ticketed_ingress_id = staged_record.ticketed_ingress_id; assert!(runtime .receipt_correlation_for_ticketed_ingress(&ticketed_ingress_id) @@ -424,13 +457,17 @@ fn witnessed_submission_outcome_observation_is_pending_without_tick() { make_intent_kind("test/runtime"), b"pending-outcome".to_vec(), ); - let (submission, generation) = match runtime.submit_intent(envelope).unwrap() { + let accepted = match runtime.submit_intent(envelope).unwrap() { IntentSubmissionDisposition::Accepted { submission_id, submission_generation, .. - } => (submission_id, submission_generation), - IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + } => Some((submission_id, submission_generation)), + IntentSubmissionDisposition::Duplicate { .. } => None, + }; + assert!(accepted.is_some(), "first submission should be accepted"); + let Some((submission, generation)) = accepted else { + return; }; assert!(matches!( @@ -458,9 +495,13 @@ fn ticketed_ingress_correlates_tick_receipt_after_scheduler_owned_tick() { make_intent_kind("test/runtime"), b"correlate-after-tick".to_vec(), ); - let submission = match runtime.submit_intent(envelope.clone()).unwrap() { - IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, - IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + let accepted = match runtime.submit_intent(envelope.clone()).unwrap() { + IntentSubmissionDisposition::Accepted { submission_id, .. } => Some(submission_id), + IntentSubmissionDisposition::Duplicate { .. } => None, + }; + assert!(accepted.is_some(), "first submission should be accepted"); + let Some(submission) = accepted else { + return; }; let ticket = admission_ticket(5); let staged = runtime @@ -471,12 +512,18 @@ fn ticketed_ingress_correlates_tick_receipt_after_scheduler_owned_tick() { envelope.clone(), ) .unwrap(); - let ticketed_ingress_id = match staged { - TicketedRuntimeIngressDisposition::Staged { record, .. } => record.ticketed_ingress_id, - TicketedRuntimeIngressDisposition::Duplicate { .. } => { - panic!("first ticketed ingress duplicated") - } + let staged_record = match staged { + TicketedRuntimeIngressDisposition::Staged { record, .. } => Some(record), + TicketedRuntimeIngressDisposition::Duplicate { .. } => None, + }; + assert!( + staged_record.is_some(), + "first ticketed ingress should be staged" + ); + let Some(staged_record) = staged_record else { + return; }; + let ticketed_ingress_id = staged_record.ticketed_ingress_id; let mut provenance = registered_worldlines_provenance(&runtime); let records = @@ -530,9 +577,13 @@ fn ticketed_submission_outcome_observation_is_decided_after_scheduler_tick() { make_intent_kind("test/runtime"), b"decided-outcome".to_vec(), ); - let submission = match runtime.submit_intent(envelope.clone()).unwrap() { - IntentSubmissionDisposition::Accepted { submission_id, .. } => submission_id, - IntentSubmissionDisposition::Duplicate { .. } => panic!("first submission duplicated"), + let accepted = match runtime.submit_intent(envelope.clone()).unwrap() { + IntentSubmissionDisposition::Accepted { submission_id, .. } => Some(submission_id), + IntentSubmissionDisposition::Duplicate { .. } => None, + }; + assert!(accepted.is_some(), "first submission should be accepted"); + let Some(submission) = accepted else { + return; }; let ticket = admission_ticket(6); runtime From e3c5cdddc51befc6f7b7a3b3716f87b339ccc7d8 Mon Sep 17 00:00:00 2001 From: James Ross Date: Wed, 20 May 2026 10:20:16 -0700 Subject: [PATCH 5/9] Fix: reject duplicate runtime ingress in ticketed path --- CHANGELOG.md | 3 ++ crates/warp-core/src/coordinator.rs | 20 ++++++++- crates/warp-core/tests/inbox.rs | 65 +++++++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f897ca0f..2000cdb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -344,6 +344,9 @@ ### Fixed +- `warp-core` ticketed runtime ingress now rejects duplicate pending or already + committed runtime ingress before recording ticketed correlation material, so + an admission ticket cannot retroactively claim legacy direct inbox work. - `Determinism Guards` no longer runs `apt-get install ripgrep`; static guard scripts now fall back to Perl regex scanning when `rg` is unavailable, so mirror stalls cannot hang the determinism gate. diff --git a/crates/warp-core/src/coordinator.rs b/crates/warp-core/src/coordinator.rs index 61d02361..b6185691 100644 --- a/crates/warp-core/src/coordinator.rs +++ b/crates/warp-core/src/coordinator.rs @@ -102,6 +102,15 @@ pub enum RuntimeError { /// A different admission ticket already staged this witnessed submission. #[error("witnessed submission already has ticketed runtime ingress: {0:?}")] TicketedIngressAlreadyStaged(Hash), + /// Ticketed runtime ingress attempted to claim an envelope that was already + /// pending or committed through another ingress path. + #[error("ticketed runtime ingress cannot claim duplicate runtime ingress {ingress_id:?} for head {head_key:?}")] + TicketedIngressDuplicateRuntimeIngress { + /// The resolved writer head containing the duplicate ingress. + head_key: WriterHeadKey, + /// The content-addressed ingress id that was already known to runtime. + ingress_id: Hash, + }, } /// Echo-owned intake/correlation generation for witnessed intent submissions. @@ -934,8 +943,9 @@ impl WorldlineRuntime { /// # Errors /// /// Returns an error when the submission is unknown, the envelope does not - /// match the witnessed submission, the target rejects the envelope, or a - /// different ticket has already staged the same submission. + /// match the witnessed submission, the target rejects the envelope, the + /// runtime ingress already exists through another path, or a different + /// ticket has already staged the same submission. pub fn ingest_ticketed_invocation( &mut self, _authority: &TicketedRuntimeIngressAuthority, @@ -975,6 +985,12 @@ impl WorldlineRuntime { } let ingress = self.ingest(envelope)?; + if !matches!(ingress, IngressDisposition::Accepted { .. }) { + return Err(RuntimeError::TicketedIngressDuplicateRuntimeIngress { + head_key, + ingress_id, + }); + } let record = TicketedRuntimeIngressRecord { ticketed_ingress_id, submission_id, diff --git a/crates/warp-core/tests/inbox.rs b/crates/warp-core/tests/inbox.rs index 433ba6c3..885c0490 100644 --- a/crates/warp-core/tests/inbox.rs +++ b/crates/warp-core/tests/inbox.rs @@ -637,6 +637,71 @@ fn legacy_ingress_without_ticket_does_not_create_receipt_correlation() { assert_eq!(runtime.receipt_correlation_count(), 0); } +#[test] +#[cfg(feature = "host_test")] +fn ticketed_ingress_rejects_legacy_pending_duplicate_without_correlation() { + let mut runtime = WorldlineRuntime::new(); + let mut engine = empty_engine(); + let worldline_id = wl(1); + runtime + .register_worldline(worldline_id, WorldlineState::empty()) + .unwrap(); + let head_key = register_head(&mut runtime, worldline_id, "default", None, true); + let envelope = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { worldline_id }, + make_intent_kind("test/runtime"), + b"legacy-pending-duplicate".to_vec(), + ); + let ingress_id = envelope.ingress_id(); + + let accepted = match runtime.ingest(envelope.clone()).unwrap() { + IngressDisposition::Accepted { submission_id, .. } => Some(submission_id), + IngressDisposition::Duplicate { .. } => None, + }; + assert!( + accepted.is_some(), + "legacy ingress should be accepted first" + ); + let Some(submission) = accepted else { + return; + }; + + let err = runtime + .ingest_ticketed_invocation( + &ticketed_runtime_ingress_authority(), + submission, + &admission_ticket(7), + envelope, + ) + .unwrap_err(); + + assert!(matches!( + err, + RuntimeError::TicketedIngressDuplicateRuntimeIngress { + head_key: duplicate_head_key, + ingress_id: duplicate_ingress_id, + } if duplicate_head_key == head_key && duplicate_ingress_id == ingress_id + )); + assert_eq!(runtime.ticketed_runtime_ingress_count(), 0); + assert_eq!(runtime.receipt_correlation_count(), 0); + + let mut provenance = registered_worldlines_provenance(&runtime); + let records = + SchedulerCoordinator::super_tick(&mut runtime, &mut provenance, &mut engine).unwrap(); + + assert_eq!(records.len(), 1); + assert_eq!(runtime.ticketed_runtime_ingress_count(), 0); + assert_eq!(runtime.receipt_correlation_count(), 0); + assert!(matches!( + runtime.observe_intent_outcome(&submission), + IntentOutcomeObservation::Pending { + submission_id, + ticketed_ingress_id: None, + .. + } if submission_id == submission + )); +} + #[test] fn runtime_ingest_is_idempotent_per_resolved_head_after_commit() { let mut runtime = WorldlineRuntime::new(); From 9c46b447fa791e21075c11837c9c27340dea457b Mon Sep 17 00:00:00 2001 From: James Ross Date: Wed, 20 May 2026 10:20:43 -0700 Subject: [PATCH 6/9] Fix: split verification into explicit lanes --- .github/workflows/ci.yml | 94 +++++++++++++++++-- .github/workflows/macos-local.yml | 37 +++++++- CHANGELOG.md | 9 +- .../design.md | 2 +- .../witnessed-suffix-admission-evaluator.md | 2 +- .../witnessed-suffix-admission-shell.md | 2 +- docs/determinism/SPEC_DETERMINISTIC_MATH.md | 4 +- docs/method/HANDOFF.md | 2 +- .../backlog/inbox/PLATFORM_tooling-misc.md | 2 +- docs/workflows.md | 2 +- scripts/hooks/README.md | 12 +-- scripts/verify-local.sh | 87 +++++++++++------ 12 files changed, 202 insertions(+), 53 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 309ef7b4..040f6fcf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -34,8 +34,8 @@ jobs: - name: verify-local hook regression run: bash tests/hooks/test_verify_local.sh - clippy: - name: Clippy + clippy-libs-core: + name: Clippy (core libs) runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -48,8 +48,90 @@ jobs: with: workspaces: | . - - name: cargo clippy - run: cargo clippy --all-targets -- -D warnings -D missing_docs + - name: cargo clippy (core libraries) + run: | + cargo clippy \ + -p warp-core \ + -p warp-geom \ + -p warp-wasm \ + -p echo-wasm-abi \ + -p echo-runtime-schema \ + -p echo-dry-tests \ + -p echo-graph \ + -p echo-app-core \ + -p echo-config-fs \ + -p echo-session-proto \ + --lib -- -D warnings -D missing_docs + + clippy-libs-support: + name: Clippy (support libs) + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: false + - uses: dtolnay/rust-toolchain@1.90.0 + with: + components: clippy + - uses: Swatinem/rust-cache@v2 + with: + workspaces: | + . + - name: cargo clippy (support libraries) + run: | + cargo clippy \ + -p echo-registry-api \ + -p echo-wasm-bindings \ + -p echo-wesley-gen \ + -p echo-cas \ + -p echo-scene-port \ + -p echo-scene-codec \ + -p echo-ttd \ + -p echo-dind-harness \ + -p echo-dind-tests \ + -p ttd-browser \ + -p ttd-protocol-rs \ + -p method \ + --lib -- -D warnings -D missing_docs + + clippy-bins: + name: Clippy (bins) + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: false + - uses: dtolnay/rust-toolchain@1.90.0 + with: + components: clippy + - uses: Swatinem/rust-cache@v2 + with: + workspaces: | + . + - name: cargo clippy (binaries) + run: | + cargo clippy -p warp-core --bin gen_sin_qtr_lut -- -D warnings -D missing_docs + cargo clippy -p warp-cli --bins -- -D warnings -D missing_docs + cargo clippy -p echo-dind-harness --bins -- -D warnings -D missing_docs + cargo clippy -p echo-wesley-gen --bins -- -D warnings -D missing_docs + cargo clippy -p xtask --bins -- -D warnings -D missing_docs + + clippy-warp-core-tests: + name: Clippy (warp-core test lanes) + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: false + - uses: dtolnay/rust-toolchain@1.90.0 + with: + components: clippy + - uses: Swatinem/rust-cache@v2 + with: + workspaces: | + . + - name: cargo clippy (warp-core runtime inbox) + run: cargo clippy -p warp-core --features native_rule_bootstrap,host_test --test inbox -- -D warnings -D missing_docs - name: cargo clippy (warp-core host_test admission fixtures) run: | cargo clippy -p warp-core --features host_test --test causal_fact_publication_tests -- -D warnings -D missing_docs @@ -69,8 +151,8 @@ jobs: with: workspaces: | . - - name: cargo clippy (warp-core, det_fixed) - run: cargo clippy -p warp-core --all-targets --features det_fixed -- -D warnings -D missing_docs + - name: cargo clippy (warp-core lib, det_fixed) + run: cargo clippy -p warp-core --lib --features det_fixed -- -D warnings -D missing_docs test-workspace: name: Tests (workspace sans warp-core) diff --git a/.github/workflows/macos-local.yml b/.github/workflows/macos-local.yml index 24993255..cba9de13 100644 --- a/.github/workflows/macos-local.yml +++ b/.github/workflows/macos-local.yml @@ -20,8 +20,41 @@ jobs: . - name: cargo fmt run: cargo fmt --all -- --check - - name: cargo clippy - run: cargo clippy --all-targets -- -D warnings -D missing_docs + - name: cargo clippy (libraries) + run: | + cargo clippy \ + -p warp-core \ + -p warp-geom \ + -p warp-wasm \ + -p echo-wasm-abi \ + -p echo-runtime-schema \ + -p echo-dry-tests \ + -p echo-graph \ + -p echo-app-core \ + -p echo-config-fs \ + -p echo-session-proto \ + -p echo-registry-api \ + -p echo-wasm-bindings \ + -p echo-wesley-gen \ + -p echo-cas \ + -p echo-scene-port \ + -p echo-scene-codec \ + -p echo-ttd \ + -p echo-dind-harness \ + -p echo-dind-tests \ + -p ttd-browser \ + -p ttd-protocol-rs \ + -p method \ + --lib -- -D warnings -D missing_docs + - name: cargo clippy (binaries) + run: | + cargo clippy -p warp-core --bin gen_sin_qtr_lut -- -D warnings -D missing_docs + cargo clippy -p warp-cli --bins -- -D warnings -D missing_docs + cargo clippy -p echo-dind-harness --bins -- -D warnings -D missing_docs + cargo clippy -p echo-wesley-gen --bins -- -D warnings -D missing_docs + cargo clippy -p xtask --bins -- -D warnings -D missing_docs + - name: cargo clippy (warp-core runtime inbox) + run: cargo clippy -p warp-core --features native_rule_bootstrap,host_test --test inbox -- -D warnings -D missing_docs - name: cargo clippy (warp-core host_test admission fixtures) run: | cargo clippy -p warp-core --features host_test --test causal_fact_publication_tests -- -D warnings -D missing_docs diff --git a/CHANGELOG.md b/CHANGELOG.md index 2000cdb1..06cffbc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -329,6 +329,11 @@ ### Changed +- CI and local verification now split broad clippy coverage into explicit + library, binary, and selected integration-test lanes instead of invoking one + monolithic cargo pass. The `warp-core` runtime inbox test target is now an + explicit lint lane with the same `native_rule_bootstrap,host_test` features + used by the ticketed-ingress regression suite. - Local verification now treats rustdoc warnings as CI-owned by default. `scripts/verify-local.sh` skips local rustdoc lanes unless `VERIFY_LOCAL_RUSTDOC=1` is set, keeping pre-push and full local gates focused @@ -719,8 +724,8 @@ warp-core` and `warp-core` shards, preserving the required `Tests` status - **Fixed** removed `redundant_clone` clippy suppression from `head.rs` and `coordinator.rs` test modules. - **Fixed** ADR exceptions ledger sentinel row no longer mimics an active entry. -- **Fixed** verification matrix in implementation plan now matches hook-enforced - gate (`--workspace --all-targets -D missing_docs`). +- **Fixed** verification matrix in implementation plan now matches the + hook-enforced gate used in that phase. ### fix(warp-core): self-review fixes for Phases 0–3 diff --git a/docs/design/0016-wesley-to-echo-toy-contract-proof/design.md b/docs/design/0016-wesley-to-echo-toy-contract-proof/design.md index fc85bd4d..9441d6f6 100644 --- a/docs/design/0016-wesley-to-echo-toy-contract-proof/design.md +++ b/docs/design/0016-wesley-to-echo-toy-contract-proof/design.md @@ -200,7 +200,7 @@ Broader generator witness: ```sh cargo test -p echo-wesley-gen -cargo clippy -p echo-wesley-gen --all-targets -- -D warnings -D missing_docs +cargo clippy -p echo-wesley-gen --lib --tests --bins -- -D warnings -D missing_docs ``` Result: passed. diff --git a/docs/design/witnessed-suffix-admission-evaluator.md b/docs/design/witnessed-suffix-admission-evaluator.md index 47e1e52e..9360e418 100644 --- a/docs/design/witnessed-suffix-admission-evaluator.md +++ b/docs/design/witnessed-suffix-admission-evaluator.md @@ -466,6 +466,6 @@ Expected implementation gates for the future RED/GREEN cycle: cargo fmt --all -- --check cargo test -p echo-wasm-abi --lib witnessed_suffix cargo test -p warp-core --lib witnessed_suffix -cargo clippy -p warp-core --all-targets -- -D warnings -D missing_docs +cargo clippy -p warp-core --lib -- -D warnings -D missing_docs pnpm docs:build ``` diff --git a/docs/design/witnessed-suffix-admission-shell.md b/docs/design/witnessed-suffix-admission-shell.md index 080fe3df..025e621a 100644 --- a/docs/design/witnessed-suffix-admission-shell.md +++ b/docs/design/witnessed-suffix-admission-shell.md @@ -378,6 +378,6 @@ Expected implementation: cargo fmt --all -- --check cargo test -p echo-wasm-abi --lib witnessed_suffix cargo test -p warp-core --lib witnessed_suffix -cargo clippy -p warp-core --all-targets -- -D warnings -D missing_docs +cargo clippy -p warp-core --lib -- -D warnings -D missing_docs pnpm docs:build ``` diff --git a/docs/determinism/SPEC_DETERMINISTIC_MATH.md b/docs/determinism/SPEC_DETERMINISTIC_MATH.md index 98a7c9d5..9714cfef 100644 --- a/docs/determinism/SPEC_DETERMINISTIC_MATH.md +++ b/docs/determinism/SPEC_DETERMINISTIC_MATH.md @@ -74,7 +74,7 @@ The default `warp-core` build uses the float32-backed lane (`F32Scalar`) and the trig backend (`warp_core::math::trig`). - `cargo test -p warp-core` -- `cargo clippy -p warp-core --all-targets -- -D warnings -D missing_docs` +- `cargo clippy -p warp-core --lib -- -D warnings -D missing_docs` ### Fixed-point lane (`det_fixed`) @@ -82,7 +82,7 @@ trig backend (`warp_core::math::trig`). default runtime surface. - `cargo test -p warp-core --features det_fixed` -- `cargo clippy -p warp-core --all-targets --features det_fixed -- -D warnings -D missing_docs` +- `cargo clippy -p warp-core --lib --features det_fixed -- -D warnings -D missing_docs` ### MUSL (Linux portability lane) diff --git a/docs/method/HANDOFF.md b/docs/method/HANDOFF.md index 6c7bd3f6..63a2d4d0 100644 --- a/docs/method/HANDOFF.md +++ b/docs/method/HANDOFF.md @@ -58,7 +58,7 @@ Verification already run: - `cargo test -p method --test inbox_tests` - `cargo test -p method --test status_tests` - `cargo test -p method --lib` -- `cargo clippy -p method --all-targets -- -D warnings` +- `cargo clippy -p method --lib --tests -- -D warnings` - `cargo xtask method status --json` - `cargo xtask method inbox --help` - `git diff --check` diff --git a/docs/method/backlog/inbox/PLATFORM_tooling-misc.md b/docs/method/backlog/inbox/PLATFORM_tooling-misc.md index 800412fb..549f074f 100644 --- a/docs/method/backlog/inbox/PLATFORM_tooling-misc.md +++ b/docs/method/backlog/inbox/PLATFORM_tooling-misc.md @@ -144,7 +144,7 @@ Housekeeping tasks: documentation, logging, naming consistency, and debugger UX - R1: Add a single local entry point for the current Rustdoc gate commands on the critical crates - R2: Ensure the command runs with `RUSTDOCFLAGS="-D warnings"` so it matches the CI rustdoc gate -- R3: Document when contributors should run it, how it differs from plain `cargo doc`, and which broader compile/doc gates remain separate (`RUSTFLAGS="-Dwarnings"`, `cargo clippy --all-targets -- -D missing_docs`, `cargo test`) +- R3: Document when contributors should run it, how it differs from plain `cargo doc`, and which broader compile/doc gates remain separate (`RUSTFLAGS="-Dwarnings"`, explicit clippy lane commands, `cargo test`) - R4: Keep the crate list aligned with the CI rustdoc gate **Acceptance Criteria:** diff --git a/docs/workflows.md b/docs/workflows.md index 68237124..94963c51 100644 --- a/docs/workflows.md +++ b/docs/workflows.md @@ -67,7 +67,7 @@ Checkpoint checks: ```sh cargo fmt --all cargo test --workspace -cargo clippy --all-targets -- -D warnings -D missing_docs +make verify-pr ``` --- diff --git a/scripts/hooks/README.md b/scripts/hooks/README.md index 679e4c9c..4e4a49bf 100644 --- a/scripts/hooks/README.md +++ b/scripts/hooks/README.md @@ -48,8 +48,8 @@ fallback if you need to debug the lane runner itself. A critical path no longer means “run the same local Rust cargo gauntlet for every kind of full change.” Tooling-only full changes stay tooling-local, while -critical Rust changes run a local smoke lane and leave the exhaustive all-target -proof to CI. +critical Rust changes run a local smoke lane and leave exhaustive target-family +proof to CI's explicit lane matrix. That local smoke path is also file-family aware for `warp-core`: ordinary source edits stay on the library test lane, while runtime/inbox, playback, and PRNG @@ -65,8 +65,8 @@ or other non-Rust crate changes do not wake the Rust smoke lanes. `make verify-ultra-fast` is now the shortest edit-loop lane. It stays compile-first: Rust changes get `cargo check` on changed Rust crates plus the -same targeted critical smoke selection used by the full gate, while clippy, -guard scans, and exhaustive local proof stay on the heavier paths and in CI. +selected witness targets for the touched file family, while clippy, guard +scans, and broader proof stay on the heavier paths and in CI. Rustdoc warnings are CI-owned by default; use `VERIFY_LOCAL_RUSTDOC=1 make verify-full` only when you deliberately want to pay that local cost. Tooling-only changes stay on a syntax/smoke path instead @@ -82,5 +82,5 @@ locally. Local timing data now lands in `$(git rev-parse --git-dir)/verify-local/timing.jsonl`, including run-level and per-lane durations, which keeps timing artifacts out of the tracked repo while still making lane cost visible. The staged and reduced local Rust paths are also intentionally -narrower than CI: heavy all-target clippy coverage stays in CI, while local -hooks bias toward faster iteration on the current work surface. +narrower than CI: broad clippy coverage is split across CI lane families, while +local hooks bias toward faster iteration on the current work surface. diff --git a/scripts/verify-local.sh b/scripts/verify-local.sh index 73bf8554..60830a0a 100755 --- a/scripts/verify-local.sh +++ b/scripts/verify-local.sh @@ -363,6 +363,7 @@ FULL_SCOPE_RUN_WARP_CORE_SMOKE=0 FULL_SCOPE_WARP_WASM_TEST_MODE="none" FULL_SCOPE_ECHO_WASM_ABI_RUN_LIB=0 FULL_SCOPE_ECHO_WASM_ABI_EXTRA_TESTS=() +FULL_SCOPE_WARP_CORE_CLIPPY_TESTS=() FULL_SCOPE_WARP_CORE_EXTRA_TESTS=() FULL_SCOPE_WARP_CORE_RUN_PRNG=0 @@ -1017,18 +1018,13 @@ clippy_target_args_for_scope() { return fi - if [[ "$scope" == "full" ]]; then - printf '%s\n' "--all-targets" - return - fi - if crate_supports_lib_target "$crate"; then printf '%s\n' "--lib" elif crate_supports_bin_target "$crate"; then printf '%s\n' "--bins" else - printf '%s\n' "--all-targets" - return + echo "verify-local: ${crate} has no library or binary clippy lane" >&2 + exit 1 fi if crate_supports_lib_target "$crate" && ! crate_is_fast_clippy_lib_only "$crate"; then @@ -1067,6 +1063,7 @@ filter_package_set_by_selection() { } prepare_warp_core_scope() { + FULL_SCOPE_WARP_CORE_CLIPPY_TESTS=() FULL_SCOPE_WARP_CORE_EXTRA_TESTS=() FULL_SCOPE_WARP_CORE_RUN_PRNG=0 @@ -1075,18 +1072,25 @@ prepare_warp_core_scope() { [[ -z "$file" ]] && continue case "$file" in crates/warp-core/tests/*.rs) - append_unique "$(basename "$file" .rs)" FULL_SCOPE_WARP_CORE_EXTRA_TESTS + local test_name + test_name="$(basename "$file" .rs)" + append_unique "$test_name" FULL_SCOPE_WARP_CORE_EXTRA_TESTS + append_unique "$test_name" FULL_SCOPE_WARP_CORE_CLIPPY_TESTS ;; crates/warp-core/src/optic_artifact.rs) append_unique "optic_artifact_registry_tests" FULL_SCOPE_WARP_CORE_EXTRA_TESTS append_unique "optic_invocation_admission_tests" FULL_SCOPE_WARP_CORE_EXTRA_TESTS append_unique "causal_fact_publication_tests" FULL_SCOPE_WARP_CORE_EXTRA_TESTS append_unique "capability_grant_intent_tests" FULL_SCOPE_WARP_CORE_EXTRA_TESTS + append_unique "optic_invocation_admission_tests" FULL_SCOPE_WARP_CORE_CLIPPY_TESTS + append_unique "causal_fact_publication_tests" FULL_SCOPE_WARP_CORE_CLIPPY_TESTS ;; crates/warp-core/src/causal_facts.rs) append_unique "causal_fact_publication_tests" FULL_SCOPE_WARP_CORE_EXTRA_TESTS append_unique "optic_artifact_registry_tests" FULL_SCOPE_WARP_CORE_EXTRA_TESTS append_unique "optic_invocation_admission_tests" FULL_SCOPE_WARP_CORE_EXTRA_TESTS + append_unique "causal_fact_publication_tests" FULL_SCOPE_WARP_CORE_CLIPPY_TESTS + append_unique "optic_invocation_admission_tests" FULL_SCOPE_WARP_CORE_CLIPPY_TESTS ;; crates/warp-core/src/coordinator.rs|\ crates/warp-core/src/engine_impl.rs|\ @@ -1096,6 +1100,7 @@ prepare_warp_core_scope() { crates/warp-core/src/worldline_registry.rs|\ crates/warp-core/src/runtime*.rs) append_unique "inbox" FULL_SCOPE_WARP_CORE_EXTRA_TESTS + append_unique "inbox" FULL_SCOPE_WARP_CORE_CLIPPY_TESTS ;; crates/warp-core/src/playback.rs) append_unique "playback_cursor_tests" FULL_SCOPE_WARP_CORE_EXTRA_TESTS @@ -1106,6 +1111,38 @@ prepare_warp_core_scope() { ;; esac done <<< "${CHANGED_FILES}" + + append_unique "causal_fact_publication_tests" FULL_SCOPE_WARP_CORE_CLIPPY_TESTS + append_unique "optic_invocation_admission_tests" FULL_SCOPE_WARP_CORE_CLIPPY_TESTS +} + +warp_core_feature_args_for_test() { + local test_target="$1" + + case "$test_target" in + inbox) + printf '%s\n' "--features" "native_rule_bootstrap,host_test" + ;; + causal_fact_publication_tests|optic_invocation_admission_tests) + printf '%s\n' "--features" "host_test" + ;; + esac +} + +run_warp_core_test_target() { + local lane="$1" + local test_target="$2" + local -a feature_args=() + mapfile -t feature_args < <(warp_core_feature_args_for_test "$test_target") + lane_cargo "$lane" test -p warp-core "${feature_args[@]}" --test "$test_target" +} + +run_warp_core_clippy_test_target() { + local lane="$1" + local test_target="$2" + local -a feature_args=() + mapfile -t feature_args < <(warp_core_feature_args_for_test "$test_target") + lane_cargo "$lane" clippy -p warp-core "${feature_args[@]}" --test "$test_target" -- -D warnings -D missing_docs } prepare_warp_wasm_scope() { @@ -1287,8 +1324,10 @@ run_full_lane_clippy_core() { echo "[verify-local][clippy-core] curated clippy on selected core packages" lane_cargo "full-clippy-core" clippy "${args[@]}" --lib -- -D warnings -D missing_docs if array_contains "warp-core" "${FULL_SCOPE_CLIPPY_CORE_PACKAGES[@]}"; then - lane_cargo "full-clippy-core" clippy -p warp-core --features host_test --test causal_fact_publication_tests -- -D warnings -D missing_docs - lane_cargo "full-clippy-core" clippy -p warp-core --features host_test --test optic_invocation_admission_tests -- -D warnings -D missing_docs + local test_target + for test_target in "${FULL_SCOPE_WARP_CORE_CLIPPY_TESTS[@]}"; do + run_warp_core_clippy_test_target "full-clippy-core" "$test_target" + done fi } @@ -1355,14 +1394,7 @@ run_full_lane_tests_warp_core() { lane_cargo "full-tests-warp-core" test -p warp-core --lib local test_target for test_target in "${FULL_SCOPE_WARP_CORE_EXTRA_TESTS[@]}"; do - case "$test_target" in - causal_fact_publication_tests|optic_invocation_admission_tests) - lane_cargo "full-tests-warp-core" test -p warp-core --features host_test --test "$test_target" - ;; - *) - lane_cargo "full-tests-warp-core" test -p warp-core --test "$test_target" - ;; - esac + run_warp_core_test_target "full-tests-warp-core" "$test_target" done if [[ "$FULL_SCOPE_WARP_CORE_RUN_PRNG" == "1" ]]; then lane_cargo "full-tests-warp-core" test -p warp-core --features golden_prng --test prng_golden_regression @@ -1572,18 +1604,15 @@ run_ultra_fast_smoke() { if [[ "$FULL_SCOPE_RUN_WARP_CORE_SMOKE" == "1" ]]; then echo "[verify-local][ultra-fast] warp-core smoke" - cargo +"$PINNED" test -p warp-core --lib local warp_core_test_target for warp_core_test_target in "${FULL_SCOPE_WARP_CORE_EXTRA_TESTS[@]}"; do - case "$warp_core_test_target" in - causal_fact_publication_tests|optic_invocation_admission_tests) - cargo +"$PINNED" test -p warp-core --features host_test --test "$warp_core_test_target" - ;; - *) - cargo +"$PINNED" test -p warp-core --test "$warp_core_test_target" - ;; - esac + local -a feature_args=() + mapfile -t feature_args < <(warp_core_feature_args_for_test "$warp_core_test_target") + cargo +"$PINNED" test -p warp-core "${feature_args[@]}" --test "$warp_core_test_target" done + if [[ ${#FULL_SCOPE_WARP_CORE_EXTRA_TESTS[@]} -eq 0 && "$FULL_SCOPE_WARP_CORE_RUN_PRNG" != "1" ]]; then + echo "[verify-local][ultra-fast] warp-core: cargo check already covered this edit" + fi if [[ "$FULL_SCOPE_WARP_CORE_RUN_PRNG" == "1" ]]; then cargo +"$PINNED" test -p warp-core --features golden_prng --test prng_golden_regression fi @@ -1671,7 +1700,7 @@ run_auto_mode() { run_targeted_checks "${changed_crates[@]}" ;; full) - echo "[verify-local] full verification required by critical/tooling changes" + echo "[verify-local] explicit verification lanes required by critical/tooling changes" run_full_checks ;; *) @@ -1732,7 +1761,7 @@ case "$MODE" in full) if should_skip_via_stamp "full"; then VERIFY_RUN_CACHE_STATE="cached" - echo "[verify-local] reusing cached full verification for tree $(printf '%.12s' "$VERIFY_STAMP_SUBJECT")" + echo "[verify-local] reusing cached explicit-lane verification for tree $(printf '%.12s' "$VERIFY_STAMP_SUBJECT")" exit 0 fi run_full_checks From 8f6627ebeb88965ba7dcee11309b0bd477d44e31 Mon Sep 17 00:00:00 2001 From: James Ross Date: Wed, 20 May 2026 10:25:19 -0700 Subject: [PATCH 7/9] Fix: validate canonical ingress before submit intake --- CHANGELOG.md | 3 +++ crates/warp-core/src/head_inbox.rs | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 06cffbc1..f7677417 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -349,6 +349,9 @@ ### Fixed +- `warp-core` submit-only intake now enforces the same canonical + `IngressEnvelope` content-address invariant as runtime inbox ingestion, so + `would_accept(...)` cannot approve malformed ingress ids. - `warp-core` ticketed runtime ingress now rejects duplicate pending or already committed runtime ingress before recording ticketed correlation material, so an admission ticket cannot retroactively claim legacy direct inbox work. diff --git a/crates/warp-core/src/head_inbox.rs b/crates/warp-core/src/head_inbox.rs index 27a3c1f8..1b93d64a 100644 --- a/crates/warp-core/src/head_inbox.rs +++ b/crates/warp-core/src/head_inbox.rs @@ -334,6 +334,7 @@ impl HeadInbox { /// history without entering runtime scheduling. #[must_use] pub fn would_accept(&self, envelope: &IngressEnvelope) -> bool { + envelope.assert_canonical_ingress_id(); self.policy_accepts(envelope) } @@ -637,4 +638,19 @@ mod tests { envelope.ingress_id = [0xff; 32]; let _ = inbox.ingest(envelope); } + + #[test] + #[should_panic(expected = "ingress_id does not match payload")] + fn invalid_envelope_panics_on_would_accept() { + let inbox = HeadInbox::new( + WriterHeadKey { + worldline_id: wl(1), + head_id: crate::head::make_head_id("default"), + }, + InboxPolicy::AcceptAll, + ); + let mut envelope = make_envelope(test_kind(), b"payload"); + envelope.ingress_id = [0xfe; 32]; + let _ = inbox.would_accept(&envelope); + } } From aed7a95a4979871501088532af9bf7a0d00d8710 Mon Sep 17 00:00:00 2001 From: James Ross Date: Wed, 20 May 2026 10:39:41 -0700 Subject: [PATCH 8/9] Fix: avoid receipt correlation checkpoint clones --- CHANGELOG.md | 4 + crates/warp-core/src/coordinator.rs | 235 ++++++++++++++++++++++++---- 2 files changed, 211 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f7677417..fa94b501 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -349,6 +349,10 @@ ### Fixed +- `SchedulerCoordinator::super_tick(...)` now journals ticket-local receipt + correlation writes instead of checkpointing whole historical correlation + indexes, preserving failure-atomic rollback while keeping rollback + bookkeeping proportional to the attempted tick's writes. - `warp-core` submit-only intake now enforces the same canonical `IngressEnvelope` content-address invariant as runtime inbox ingestion, so `would_accept(...)` cannot approve malformed ingress ids. diff --git a/crates/warp-core/src/coordinator.rs b/crates/warp-core/src/coordinator.rs index b6185691..9c367c95 100644 --- a/crates/warp-core/src/coordinator.rs +++ b/crates/warp-core/src/coordinator.rs @@ -413,9 +413,30 @@ struct RuntimeCheckpoint { global_tick: GlobalTick, heads: BTreeMap, frontiers: BTreeMap, - receipt_correlations_by_ticketed_ingress: BTreeMap, - receipt_correlation_by_submission: BTreeMap, - receipt_correlation_by_ticket: BTreeMap, +} + +#[derive(Clone, Debug)] +struct ReceiptCorrelationRollbackEntry { + ticketed_ingress_id: Hash, + previous_record: Option, + submission_id: Hash, + previous_submission_ticketed_ingress: Option, + ticket_digest: Hash, + previous_ticket_ticketed_ingress: Option, +} + +#[derive(Clone, Debug, Default)] +struct ReceiptCorrelationRollback { + entries: Vec, +} + +#[derive(Clone, Copy, Debug)] +struct ReceiptCorrelationCommitContext { + head_key: WriterHeadKey, + commit_global_tick: GlobalTick, + worldline_tick_after: WorldlineTick, + tick_receipt_digest: Hash, + commit_hash: Hash, } impl WorldlineRuntime { @@ -452,20 +473,11 @@ impl WorldlineRuntime { global_tick: self.global_tick, heads, frontiers, - receipt_correlations_by_ticketed_ingress: self - .receipt_correlations_by_ticketed_ingress - .clone(), - receipt_correlation_by_submission: self.receipt_correlation_by_submission.clone(), - receipt_correlation_by_ticket: self.receipt_correlation_by_ticket.clone(), }) } fn restore(&mut self, checkpoint: RuntimeCheckpoint) { self.global_tick = checkpoint.global_tick; - self.receipt_correlations_by_ticketed_ingress = - checkpoint.receipt_correlations_by_ticketed_ingress; - self.receipt_correlation_by_submission = checkpoint.receipt_correlation_by_submission; - self.receipt_correlation_by_ticket = checkpoint.receipt_correlation_by_ticket; for head in checkpoint.heads.into_values() { self.heads.insert(head); } @@ -475,6 +487,41 @@ impl WorldlineRuntime { self.refresh_runnable(); } + fn rollback_receipt_correlations(&mut self, rollback: &mut ReceiptCorrelationRollback) { + for entry in rollback.entries.drain(..).rev() { + match entry.previous_record { + Some(previous) => { + self.receipt_correlations_by_ticketed_ingress + .insert(entry.ticketed_ingress_id, previous); + } + None => { + self.receipt_correlations_by_ticketed_ingress + .remove(&entry.ticketed_ingress_id); + } + } + match entry.previous_submission_ticketed_ingress { + Some(previous) => { + self.receipt_correlation_by_submission + .insert(entry.submission_id, previous); + } + None => { + self.receipt_correlation_by_submission + .remove(&entry.submission_id); + } + } + match entry.previous_ticket_ticketed_ingress { + Some(previous) => { + self.receipt_correlation_by_ticket + .insert(entry.ticket_digest, previous); + } + None => { + self.receipt_correlation_by_ticket + .remove(&entry.ticket_digest); + } + } + } + } + /// Returns the registered worldline frontiers. #[must_use] pub fn worldlines(&self) -> &WorldlineRegistry { @@ -1122,18 +1169,15 @@ impl WorldlineRuntime { fn record_receipt_correlations( &mut self, - head_key: WriterHeadKey, admitted: &[IngressEnvelope], - commit_global_tick: GlobalTick, - worldline_tick_after: WorldlineTick, - tick_receipt_digest: Hash, - commit_hash: Hash, + context: ReceiptCorrelationCommitContext, + rollback: &mut ReceiptCorrelationRollback, ) { for envelope in admitted { let ingress_id = envelope.ingress_id(); let Some(ticketed_ingress_id) = self .ticketed_runtime_ingress_by_target - .get(&(head_key, ingress_id)) + .get(&(context.head_key, ingress_id)) .copied() else { continue; @@ -1156,12 +1200,29 @@ impl WorldlineRuntime { submission_id: ticketed_ingress.submission_id, ticket_digest: ticketed_ingress.ticket_digest, ingress_id, - head_key, - commit_global_tick, - worldline_tick_after, - tick_receipt_digest, - commit_hash, + head_key: context.head_key, + commit_global_tick: context.commit_global_tick, + worldline_tick_after: context.worldline_tick_after, + tick_receipt_digest: context.tick_receipt_digest, + commit_hash: context.commit_hash, }; + rollback.entries.push(ReceiptCorrelationRollbackEntry { + ticketed_ingress_id, + previous_record: self + .receipt_correlations_by_ticketed_ingress + .get(&ticketed_ingress_id) + .cloned(), + submission_id: ticketed_ingress.submission_id, + previous_submission_ticketed_ingress: self + .receipt_correlation_by_submission + .get(&ticketed_ingress.submission_id) + .copied(), + ticket_digest: ticketed_ingress.ticket_digest, + previous_ticket_ticketed_ingress: self + .receipt_correlation_by_ticket + .get(&ticketed_ingress.ticket_digest) + .copied(), + }); self.receipt_correlations_by_ticketed_ingress .insert(ticketed_ingress_id, record); self.receipt_correlation_by_submission @@ -1304,6 +1365,7 @@ impl SchedulerCoordinator { } let runtime_before = runtime.checkpoint_for(&keys)?; + let mut receipt_correlation_rollback = ReceiptCorrelationRollback::default(); let provenance_before: ProvenanceCheckpoint = provenance.checkpoint_for(keys.iter().map(|key| key.worldline_id))?; @@ -1393,12 +1455,15 @@ impl SchedulerCoordinator { (snapshot.state_root, worldline_tick_after) }; runtime.record_receipt_correlations( - *key, &admitted, - next_global_tick, - worldline_tick_after, - tick_receipt_digest, - snapshot.hash, + ReceiptCorrelationCommitContext { + head_key: *key, + commit_global_tick: next_global_tick, + worldline_tick_after, + tick_receipt_digest, + commit_hash: snapshot.hash, + }, + &mut receipt_correlation_rollback, ); Ok(StepRecord { @@ -1414,11 +1479,13 @@ impl SchedulerCoordinator { let record = match outcome { Ok(Ok(record)) => record, Ok(Err(err)) => { + runtime.rollback_receipt_correlations(&mut receipt_correlation_rollback); runtime.restore(runtime_before); provenance.restore(&provenance_before); return Err(err); } Err(payload) => { + runtime.rollback_receipt_correlations(&mut receipt_correlation_rollback); runtime.restore(runtime_before); provenance.restore(&provenance_before); resume_unwind(payload); @@ -2911,6 +2978,118 @@ mod tests { ); } + #[test] + fn super_tick_failure_rolls_back_ticketed_receipt_correlations() { + let mut runtime = WorldlineRuntime::new(); + let mut engine = empty_engine(); + let worldline_a = wl(1); + let worldline_b = wl(2); + runtime + .register_worldline(worldline_a, WorldlineState::empty()) + .unwrap(); + runtime + .register_worldline(worldline_b, WorldlineState::empty()) + .unwrap(); + let head_a = register_head( + &mut runtime, + worldline_a, + "default-a", + None, + true, + InboxPolicy::AcceptAll, + ); + let head_b = register_head( + &mut runtime, + worldline_b, + "default-b", + None, + true, + InboxPolicy::AcceptAll, + ); + let env_a = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { + worldline_id: worldline_a, + }, + make_intent_kind("test"), + b"commit-a".to_vec(), + ); + let env_b = IngressEnvelope::local_intent( + IngressTarget::DefaultWriter { + worldline_id: worldline_b, + }, + make_intent_kind("test"), + b"commit-b".to_vec(), + ); + let ingress_id = env_a.ingress_id(); + let submission_id = match runtime.ingest(env_a).unwrap() { + IngressDisposition::Accepted { submission_id, .. } => submission_id, + IngressDisposition::Duplicate { .. } => { + unreachable!("first runtime ingress must be accepted") + } + }; + runtime.ingest(env_b).unwrap(); + + let ticket_digest = [7; 32]; + let ticketed_ingress_id = + derive_ticketed_runtime_ingress_id(submission_id, ticket_digest, ingress_id, head_a); + let ticketed_record = TicketedRuntimeIngressRecord { + ticketed_ingress_id, + submission_id, + ticket_digest, + ingress_id, + head_key: head_a, + }; + runtime + .ticketed_runtime_ingress + .insert(ticketed_ingress_id, ticketed_record); + runtime + .ticketed_runtime_ingress_by_submission + .insert(submission_id, ticketed_ingress_id); + runtime + .ticketed_runtime_ingress_by_target + .insert((head_a, ingress_id), ticketed_ingress_id); + + { + let frontier = runtime.worldlines.frontier_mut(&worldline_b).unwrap(); + let broken_root = frontier.state.root.warp_id; + assert!(frontier.state.warp_state.delete_instance(&broken_root)); + } + + let mut provenance = mirrored_provenance(&runtime); + let err = SchedulerCoordinator::super_tick(&mut runtime, &mut provenance, &mut engine) + .unwrap_err(); + assert!(matches!( + err, + RuntimeError::Engine(EngineError::UnknownWarp(warp_id)) + if warp_id == runtime + .worldlines + .get(&worldline_b) + .unwrap() + .state() + .root() + .warp_id + )); + assert_eq!( + runtime.receipt_correlation_count(), + 0, + "failed SuperTick must roll back receipt correlations from earlier heads" + ); + assert!(runtime + .receipt_correlation_for_ticketed_ingress(&ticketed_ingress_id) + .is_none()); + assert!(runtime + .receipt_correlation_for_submission(&submission_id) + .is_none()); + assert!(runtime + .receipt_correlation_for_ticket(&ticket_digest) + .is_none()); + assert_eq!( + runtime.heads.get(&head_b).unwrap().inbox().pending_count(), + 1, + "rollback must preserve the failing head inbox contents" + ); + } + #[test] fn super_tick_restores_runtime_before_resuming_a_later_head_panic() { let mut runtime = WorldlineRuntime::new(); From 2d30271bc27be1b01b8fd8b3fe6cfb49bcc61bec Mon Sep 17 00:00:00 2001 From: James Ross Date: Wed, 20 May 2026 10:43:42 -0700 Subject: [PATCH 9/9] Fix: align hook cache regression expectation --- tests/hooks/test_verify_local.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/hooks/test_verify_local.sh b/tests/hooks/test_verify_local.sh index b95a4b72..9e06bc43 100755 --- a/tests/hooks/test_verify_local.sh +++ b/tests/hooks/test_verify_local.sh @@ -1078,7 +1078,7 @@ else fi fake_full_stamp_output="$(run_fake_full_stamp_sequence)" -if printf '%s\n' "$fake_full_stamp_output" | grep -q 'reusing cached full verification for tree tree-aaaaaaa'; then +if printf '%s\n' "$fake_full_stamp_output" | grep -q 'reusing cached explicit-lane verification for tree tree-aaaaaaa'; then pass "full verification stamp reuse keys off the working tree instead of HEAD" else fail "full verification should reuse the cache for a different commit with the same working tree"