Skip to content

feat(workflow): add integration_processing_records action for external data sources#98

Closed
larryro wants to merge 2 commits into
mainfrom
feat/84-integration-processing-records
Closed

feat(workflow): add integration_processing_records action for external data sources#98
larryro wants to merge 2 commits into
mainfrom
feat/84-integration-processing-records

Conversation

@larryro

@larryro larryro commented Jan 7, 2026

Copy link
Copy Markdown
Collaborator

Summary

  • Adds a new integration_processing_records workflow action that enables processing records from external data sources (SQL databases, REST APIs)
  • Supports incremental processing with multiple strategies: timestamp-based, cursor-based, ID-based, and full scan
  • Implements processing record tracking via workflowProcessingRecords table with extended table names (integration:<integrationName>:<sourceIdentifier>)
  • Includes TTFT (Time to First Token) metrics for agent responses (from feat/chat branch)

Closes #84

Changes

  • New action: integration_processing_records with helpers for:
    • find_unprocessed: Find and claim unprocessed records from external sources
    • record_processed: Mark records as completed
    • build_fetch_params: Handle resume point injection for incremental processing
    • extract_record_id: Extract unique identifiers from fetched records
  • New Convex mutations/queries in integration_processing_records.ts for tracking external record processing
  • Added timeToFirstTokenMs field to message metadata schema and UI display

Test plan

  • Verify integration action can fetch records from external SQL/REST sources
  • Verify incremental processing correctly resumes from last processed record
  • Verify processing records are tracked correctly in workflowProcessingRecords
  • Verify TTFT metrics display correctly in message info dialog

🤖 Generated with Claude Code

@coderabbitai

coderabbitai Bot commented Jan 7, 2026

Copy link
Copy Markdown
Contributor
📝 Walkthrough

Walkthrough

This PR introduces a comprehensive integration processing records feature for Convex workflows. It adds a new base module (integration_processing_records.ts) that exposes five mutations/queries for tracking and querying the state of integration data processing. The PR registers a new integrationProcessingRecordsAction in the workflow actions registry and provides supporting helper modules that implement core logic: resume point handling for incremental fetches, record ID extraction and validation, unprocessed record discovery with claim logic, and processed record persistence. The system supports multiple incremental strategies (timestamp-based, cursor-based, ID-based, full-scan) and includes comprehensive type definitions for configuration and result structures.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related issues

Possibly related PRs

  • refactor(workflow-processing): add atomic claim/lock mechanism and flatten helpers structure #23 — Introduces integration-specific processing-records mutations and helpers (claim, recordProcessed, status, resume points) that mirror the atomic operations and status handling in this PR's core module.
  • optimize rag action #4 — Updates the record-oriented workflow processing API with functions like isRecordProcessed and recordProcessed that overlap with the same record-processing names and semantics introduced here.
  • tale-project/poc2#271 — Extends the workflow and integration processing-records surface by adding workflows and integrations wiring that depend on the integration processing records module added in this PR.

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🤖 Fix all issues with AI agents
In
@services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts:
- Around line 156-180: The code uses timestampField as the field holding IDs in
the case 'id_based' branch which is confusing; update the logic in the switch
branch for 'id_based' inside build_fetch_params (the case 'id_based' block that
iterates records and calls getNestedValue) to first prefer a dedicated idField
if provided on the IncrementalConfig, falling back to timestampField if not, and
add a concise clarifying comment above the block explaining that this branch
treats the chosen field as the resume ID (and reference IncrementalConfig in
types.ts); alternatively, if you don't want a new config property now, at
minimum add that clarifying comment and a TODO to rename timestampField to
resumeField or add idField later.

In
@services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts:
- Around line 12-27: Extract the duplicated getNestedValue implementation into a
single exported helper function in a new shared utility module (e.g.,
helpers/getNestedValue.ts), then replace the inline copies in both
extract_record_id and build_fetch_params to import that function; update imports
to use the shared getNestedValue, keep the signature getNestedValue(obj:
Record<string, unknown>, path: string): unknown, and ensure TypeScript types
compile where it is referenced.

In
@services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts:
- Around line 46-54: The calculateCutoffTimestamp function currently returns a
far-future date when backoffHours === BACKOFF_NEVER_REPROCESS, which inverts
logic and makes records appear unprocessed; change the branch for
BACKOFF_NEVER_REPROCESS to return the epoch (new Date(0).toISOString()) so
processedAt >= cutoff will mark records as processed, or better yet import and
reuse the canonical calculateCutoffTimestamp from
model/workflow_processing_records/calculate_cutoff_timestamp.ts to ensure
consistent behavior across codepaths.

In
@services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts:
- Around line 103-116: Add a brief comment above the
IntegrationProcessingRecordsActionParams type explaining the type/validator
duality: state that the TypeScript discriminated union
(IntegrationProcessingRecordsActionParams) provides compile-time safety and
IDE/autocomplete benefits, while a separate runtime validator (declared
elsewhere in this file) enforces the same shape at runtime for incoming action
payloads; place the comment immediately above the type declaration and mention
the validator symbol name if present later in the file so maintainers understand
why both exist.

In
@services/platform/convex/workflow/actions/integration_processing_records/types.ts:
- Around line 41-46: createIntegrationTableName currently allows empty
integrationName or sourceIdentifier which yields malformed table names; add
validation in the createIntegrationTableName function to ensure both
integrationName and sourceIdentifier are non-empty (after trimming) and throw a
clear error (or return Result) if either is empty, so callers cannot produce
strings like "integration::source" or "integration:name:"; reference the
createIntegrationTableName function and validate its integrationName and
sourceIdentifier parameters before constructing the return value.
- Around line 51-55: The type guard isIntegrationTableName currently only checks
the 'integration:' prefix and allows invalid strings like "integration:foo";
update it to enforce the required two-colon pattern for IntegrationTableName
(e.g., use a regex that ensures "integration:<non-empty>:<non-empty>" or
explicitly split on ':' and validate there are exactly three non-empty
segments), and return true only when that stricter check passes so the guard
accurately narrows to IntegrationTableName.
📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro (Legacy)

📥 Commits

Reviewing files that changed from the base of the PR and between 972bab3 and cf7faad.

📒 Files selected for processing (8)
  • services/platform/convex/integration_processing_records.ts
  • services/platform/convex/workflow/actions/action_registry.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{ts,tsx}

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.{ts,tsx}: USE implicit typing whenever possible in TypeScript
DO NOT use type casting. Avoid any, and unknown whenever possible in TypeScript
ALWAYS put imports at the top and exports at the bottom. Keep them sorted correctly in TypeScript
PREFER named exports. AVOID default exports (only if needed) in TypeScript
PREFER export const/let, export function, export class etc. instead of export { ... } in TypeScript
PREFER export * from instead of export { ... } from in TypeScript
DO NOT export if not needed outside the module in TypeScript

Files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
  • services/platform/convex/integration_processing_records.ts
  • services/platform/convex/workflow/actions/action_registry.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
🧠 Learnings (25)
📓 Common learnings
Learnt from: larryro
Repo: tale-project/tale PR: 18
File: services/platform/convex/workflow/actions/conversation/helpers/update_conversations.ts:7-10
Timestamp: 2025-12-15T14:01:55.275Z
Learning: In Convex action helpers (services/platform/convex/workflow/actions/**/helpers/*.ts), using Record<string, unknown> for update parameters is acceptable when field validation is handled at the mutation level in Convex. This provides flexibility for dynamic field updates while keeping validation centralized at the mutation layer.
📚 Learning: 2026-01-07T05:05:11.898Z
Learnt from: larryro
Repo: tale-project/tale PR: 96
File: services/platform/convex/workflow/actions/rag/helpers/get_document_info.ts:33-35
Timestamp: 2026-01-07T05:05:11.898Z
Learning: In workflow action helpers (e.g., services/platform/convex/workflow/actions/rag/helpers/get_document_info.ts), when accepting ID parameters like `recordId` from workflow parameters (which are untyped at runtime), prefer keeping the parameter type as `string` at the boundary and using a localized cast (e.g., `params.recordId as Id<'documents'>`) at the call site rather than tightening the type to `Id<'documents'>` in the parameter interface. This keeps the boundary explicit and avoids moving the cast to construction sites where workflow parameters are assembled.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2025-12-15T14:01:50.330Z
Learnt from: larryro
Repo: tale-project/tale PR: 18
File: services/platform/convex/workflow/actions/conversation/helpers/update_conversations.ts:7-10
Timestamp: 2025-12-15T14:01:50.330Z
Learning: In Convex action helper files under services/platform/convex/workflow/actions/**/helpers/*.ts, prefer using Record<string, unknown> for update parameter types since field validation is performed at the mutation layer. This allows dynamic update shapes while keeping validation centralized at mutation runtime.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
📚 Learning: 2025-10-03T11:34:20.628Z
Learnt from: CR
Repo: talecorp/poc2 PR: 0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-10-03T11:34:20.628Z
Learning: Applies to convex/**/*.ts : Ensure TypeScript Record key/value types match validators (e.g., Record<Id<'users'>, string>)

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
📚 Learning: 2025-10-03T11:34:20.628Z
Learnt from: CR
Repo: talecorp/poc2 PR: 0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-10-03T11:34:20.628Z
Learning: Applies to convex/**/*.ts : Use Id helper type from ./_generated/dataModel to type document ids (e.g., Id<'users'>)

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
📚 Learning: 2026-01-07T05:04:32.489Z
Learnt from: larryro
Repo: tale-project/tale PR: 96
File: services/platform/convex/model/integrations/save_related_workflows.ts:45-76
Timestamp: 2026-01-07T05:04:32.489Z
Learning: In Convex workflow integration code (e.g., services/platform/convex/model/integrations/save_related_workflows.ts), prefer using targeted inline narrowing casts like (baseConfig as { variables?: Record<string, unknown> }).variables to access nested optional fields from dynamic config (Record<string, unknown>). This approach is preferable to blanket as any assertions and does not require extracting into a helper function. Apply this guidance to TypeScript files under services/platform/convex/**/*.ts for consistent, type-safe access to optional nested fields.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
  • services/platform/convex/integration_processing_records.ts
  • services/platform/convex/workflow/actions/action_registry.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2025-12-15T14:44:04.593Z
Learnt from: larryro
Repo: tale-project/tale PR: 18
File: services/platform/convex/workflow/actions/conversation/conversation_action.ts:47-98
Timestamp: 2025-12-15T14:44:04.593Z
Learning: In Convex action files under services/platform/convex/workflow/actions/**, prefer maintaining a separate TypeScript type for the action parameters alongside the runtime validators (parametersValidator). The TypeScript type provides IDE support and compile-time checking, while the validator handles runtime validation. Document this design with a clear comment next to the type/validator pair explaining the rationale (e.g., separate types for static typing vs runtime checks, and how they relate). This pattern applies across all actions in this directory, not just a single file.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
  • services/platform/convex/workflow/actions/action_registry.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2025-07-19T15:30:00.886Z
Learnt from: CR
Repo: talecorp/poc PR: 0
File: .cursor/rules/core-rules.mdc:0-0
Timestamp: 2025-07-19T15:30:00.886Z
Learning: Applies to **/*.{ts,tsx,js,jsx} : Extract reusable functions, add comprehensive error handling, improve naming and documentation, and remove temporary debug code during optimization

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
📚 Learning: 2025-11-30T03:53:00.316Z
Learnt from: CR
Repo: tale-project/poc2 PR: 0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-11-30T03:53:00.316Z
Learning: Applies to convex/**/*.ts : Use `v.id(tableName)` validator for document IDs, and use strict TypeScript types with `Id<'tableName'>` instead of generic string types for function arguments and returns

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2025-10-03T11:34:20.628Z
Learnt from: CR
Repo: talecorp/poc2 PR: 0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-10-03T11:34:20.628Z
Learning: Applies to convex/**/*.ts : Be strict with id types; accept Id<'table'> rather than string

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
📚 Learning: 2025-11-30T03:53:00.316Z
Learnt from: CR
Repo: tale-project/poc2 PR: 0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-11-30T03:53:00.316Z
Learning: Applies to convex/**/*.ts : Be strict with types, particularly around IDs of documents; use `Id<'tableName'>` rather than `string` for function arguments and returns

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2025-12-26T03:04:07.995Z
Learnt from: larryro
Repo: tale-project/tale PR: 35
File: services/platform/convex/approvals.ts:51-62
Timestamp: 2025-12-26T03:04:07.995Z
Learning: In Convex approvals API (services/platform/convex/approvals.ts), continue the pattern of maintaining separate internalQuery functions for internal vs public API access (e.g., getApprovalInternal vs getApprovalById) even if implementations are identical. This separation preserves the ability to diverge access control patterns in the future without breaking call sites. Apply this guideline broadly to the Convex-related API files under services/platform/convex/, using the pattern services/platform/convex/**/*.ts to cover similar modules. Ensure new or refactored internal/public wrappers follow this convention and document intent where access rules may evolve.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
  • services/platform/convex/integration_processing_records.ts
  • services/platform/convex/workflow/actions/action_registry.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2025-12-30T03:24:33.770Z
Learnt from: larryro
Repo: tale-project/tale PR: 36
File: services/platform/convex/wf_step_defs.ts:33-39
Timestamp: 2025-12-30T03:24:33.770Z
Learning: In Convex API files under services/platform/convex (e.g., wf_step_defs.ts and peers) refrain from delegating trivial single-line database calls like ctx.db.get(id) to model helpers. Use direct calls for simple operations with no extra business logic. Reserve model helpers for complex tasks (ordering, filtering, validation, transformation). This guideline applies to all .ts files in this Convex API area.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
  • services/platform/convex/integration_processing_records.ts
  • services/platform/convex/workflow/actions/action_registry.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2026-01-05T01:44:20.855Z
Learnt from: larryro
Repo: tale-project/tale PR: 76
File: services/platform/convex/agent_tools/sub_agents/helpers/format_integrations.ts:73-111
Timestamp: 2026-01-05T01:44:20.855Z
Learning: Guideline: In Convex integration-related TypeScript files (e.g., services/platform/convex/agent_tools/sub_agents/helpers/format_integrations.ts and services/platform/convex/agent_tools/integrations/execute_batch_integration_internal.ts), avoid relying on generated types for optional fields. If you must access optional fields like sqlOperations or connector on Doc<'integrations'> and the generated type doesn’t capture them, use an explicit as any cast to access those fields, and document this rationale (as referenced in issue #79). During reviews, flag such casts, verify they’re necessary due to known type limitations, and ensure alternative type-safe approaches aren’t feasible before accepting the cast.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
  • services/platform/convex/integration_processing_records.ts
  • services/platform/convex/workflow/actions/action_registry.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2025-12-30T06:21:13.183Z
Learnt from: larryro
Repo: tale-project/tale PR: 37
File: services/platform/convex/model/documents/validators.ts:89-102
Timestamp: 2025-12-30T06:21:13.183Z
Learning: Do not flag a missing trailing newline for TypeScript files in code reviews. POSIX text files should end with a trailing newline and Prettier (or your formatter) will enforce this. Treat the trailing newline as a non-issue in reviews for all TS files.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
  • services/platform/convex/integration_processing_records.ts
  • services/platform/convex/workflow/actions/action_registry.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2026-01-07T04:59:31.831Z
Learnt from: larryro
Repo: tale-project/tale PR: 96
File: services/platform/convex/model/customers/update_customers.ts:98-101
Timestamp: 2026-01-07T04:59:31.831Z
Learning: When accessing v.any() schema fields in Convex code (e.g., customer.metadata, conversation.metadata), after a runtime guard that ensures the value is an object (typeof val === 'object' && val !== null && !Array.isArray(val)), prefer applying an explicit TypeScript cast to Record<string, unknown> at the usage site. This maintains type safety and explicitness at each usage point without introducing dedicated type guard functions, while aligning with how dynamically-typed schema fields are handled in the codebase.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts
  • services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
  • services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
  • services/platform/convex/integration_processing_records.ts
  • services/platform/convex/workflow/actions/action_registry.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2025-12-15T14:01:55.275Z
Learnt from: larryro
Repo: tale-project/tale PR: 18
File: services/platform/convex/workflow/actions/conversation/helpers/update_conversations.ts:7-10
Timestamp: 2025-12-15T14:01:55.275Z
Learning: In Convex action helpers (services/platform/convex/workflow/actions/**/helpers/*.ts), using Record<string, unknown> for update parameters is acceptable when field validation is handled at the mutation level in Convex. This provides flexibility for dynamic field updates while keeping validation centralized at the mutation layer.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
  • services/platform/convex/integration_processing_records.ts
  • services/platform/convex/workflow/actions/action_registry.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2025-12-30T03:52:31.412Z
Learnt from: larryro
Repo: tale-project/tale PR: 36
File: services/platform/convex/model/workflow_processing_records/query_building/types.ts:32-34
Timestamp: 2025-12-30T03:52:31.412Z
Learning: In Convex workflow processing query building (services/platform/convex/model/workflow_processing_records/query_building/types.ts), `FindUnprocessedResult` intentionally uses `unknown | null` for the document field rather than generics because: (1) the returned document shape varies by table at runtime (conversations, customers, products, etc.), (2) consumers must perform type narrowing regardless since the table is determined dynamically, and (3) adding generics would introduce complexity without practical type safety benefits since callers already know which table they queried.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
  • services/platform/convex/integration_processing_records.ts
  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2025-10-03T11:34:20.628Z
Learnt from: CR
Repo: talecorp/poc2 PR: 0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-10-03T11:34:20.628Z
Learning: Applies to convex/**/*.{ts,js} : Register internal functions with internalQuery, internalMutation, and internalAction (imported from ./_generated/server)

Applied to files:

  • services/platform/convex/integration_processing_records.ts
📚 Learning: 2025-11-30T03:53:00.316Z
Learnt from: CR
Repo: tale-project/poc2 PR: 0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-11-30T03:53:00.316Z
Learning: Applies to convex/**/*.ts : Use the `internal` object from `convex/_generated/api.ts` to call internal functions registered with `internalQuery`, `internalMutation`, or `internalAction`

Applied to files:

  • services/platform/convex/integration_processing_records.ts
📚 Learning: 2025-11-30T03:53:00.316Z
Learnt from: CR
Repo: tale-project/poc2 PR: 0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-11-30T03:53:00.316Z
Learning: Applies to convex/**/*.ts : Always use the new Convex function syntax with `query`, `mutation`, `internalQuery`, `internalMutation`, `action`, or `internalAction` with explicit `args`, `returns`, and `handler` properties

Applied to files:

  • services/platform/convex/integration_processing_records.ts
📚 Learning: 2025-07-03T08:43:49.346Z
Learnt from: CR
Repo: talecorp/poc PR: 0
File: .cursor/rules/next-best-practice.mdc:0-0
Timestamp: 2025-07-03T08:43:49.346Z
Learning: Applies to {actions/**/*.ts,actions/**/*.tsx,app/**/*.tsx} : Prioritize server actions for mutations and side effects (use `'use server'` and server action functions)

Applied to files:

  • services/platform/convex/workflow/actions/action_registry.ts
📚 Learning: 2025-08-21T15:03:10.828Z
Learnt from: CR
Repo: talecorp/lanserhof PR: 0
File: .cursor/rules/supabase.mdc:0-0
Timestamp: 2025-08-21T15:03:10.828Z
Learning: Applies to supabase/types.ts : Do not edit `types.ts`; it is generated by the script

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2026-01-05T01:37:40.813Z
Learnt from: larryro
Repo: tale-project/tale PR: 76
File: services/platform/convex/lib/create_web_agent.ts:22-22
Timestamp: 2026-01-05T01:37:40.813Z
Learning: In agent factory files (services/platform/convex/lib/create_*_agent.ts), explicit `ToolName[]` type annotations for convexToolNames arrays are preferred over implicit typing for consistency across agent factories and clear type documentation.

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
📚 Learning: 2025-11-30T03:53:00.316Z
Learnt from: CR
Repo: tale-project/poc2 PR: 0
File: .cursor/rules/convex_rules.mdc:0-0
Timestamp: 2025-11-30T03:53:00.316Z
Learning: Applies to convex/**/*.ts : Use the helper TypeScript type `Id<'tableName'>` from `./_generated/dataModel` to get the type of the id for a given table

Applied to files:

  • services/platform/convex/workflow/actions/integration_processing_records/types.ts
🧬 Code graph analysis (4)
services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts (1)
services/platform/convex/workflow/actions/integration_processing_records/types.ts (3)
  • IncrementalStrategy (85-89)
  • TimestampFormat (94-98)
  • IncrementalConfig (103-122)
services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts (5)
services/platform/convex/model/common/validators.ts (1)
  • dataSourceValidator (37-71)
services/platform/convex/workflow/helpers/nodes/action/types.ts (1)
  • ActionDefinition (5-19)
services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts (1)
  • findUnprocessed (174-328)
services/platform/convex/integration_processing_records.ts (1)
  • recordProcessed (107-159)
services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts (1)
  • recordProcessed (35-77)
services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts (9)
services/platform/convex/workflow/actions/integration_processing_records/types.ts (1)
  • IntegrationDataSourceConfig (127-165)
services/platform/convex/model/workflow_processing_records/calculate_cutoff_timestamp.ts (1)
  • calculateCutoffTimestamp (12-37)
services/platform/convex/model/workflow_processing_records/constants.ts (1)
  • BACKOFF_NEVER_REPROCESS (20-20)
services/platform/convex/_generated/server.d.ts (1)
  • ActionCtx (123-123)
services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts (1)
  • ResumePoint (17-22)
services/platform/convex/integration_processing_records.ts (1)
  • isRecordProcessed (15-44)
services/platform/convex/lib/variables/jexl_instance.ts (1)
  • jexlInstance (24-24)
services/platform/convex/workflow/actions/integration/integration_action.ts (1)
  • integrationAction (26-186)
services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts (1)
  • extractRecordId (43-71)
services/platform/convex/workflow/actions/action_registry.ts (1)
services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts (1)
  • integrationProcessingRecordsAction (118-208)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Build Docker Image
🔇 Additional comments (18)
services/platform/convex/workflow/actions/integration_processing_records/helpers/record_processed.ts (1)

35-77: LGTM! Clean implementation of the record processed helper.

The function correctly:

  • Builds the integration table name using the shared utility
  • Persists the processing record via mutation
  • Retrieves and validates the created record
  • Uses a localized cast on line 76, which is acceptable per learnings since the query returns v.any() and null has already been checked.
services/platform/convex/workflow/actions/action_registry.ts (1)

19-19: LGTM! Action registration follows existing patterns.

The new integrationProcessingRecordsAction is correctly imported and added to the ACTIONS array, enabling discovery via ACTIONS_MAP and getAction.

Also applies to: 47-47

services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts (1)

165-207: LGTM! Execute function properly validates context and delegates to helpers.

The implementation correctly:

  • Validates required workflow context variables with clear error messages
  • Delegates to appropriate helper functions based on operation
  • Handles unsupported operations with a descriptive error
services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts (2)

43-71: LGTM! Robust record ID extraction with good type handling.

The function correctly handles strings, numbers, and objects with custom toString() implementations (like MongoDB ObjectId). Note that plain objects without a custom toString() would produce "[object Object]", but this is unlikely for ID fields in practice.


80-126: LGTM! Clean batch extraction and validation utilities.

Both extractRecordIds and validateRecordIds correctly leverage extractRecordId and provide useful abstractions for batch processing.

services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts (3)

59-85: LGTM! Resume point retrieval is correctly implemented.

The function properly queries the latest processed record and extracts the resume point from metadata. The casts are acceptable per learnings for accessing v.any() schema fields.


107-137: LGTM! Claim logic correctly handles concurrent execution.

The function returns false on failure (allowing iteration to continue to the next record), which is appropriate for handling concurrent claim attempts.


174-328: LGTM! Core find-and-claim logic is well-structured.

The function correctly implements:

  • Resume point retrieval and fetch parameter building
  • Multiple result shape normalization (data, records, items patterns)
  • Record iteration with ID extraction, local filtering, and processing status checks
  • Atomic claim pattern with proper handling of concurrent execution failures
services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts (2)

27-108: LGTM! Timestamp formatting and fetch param building are correctly implemented.

The functions properly handle all incremental strategies and timestamp formats, with appropriate fallbacks for missing configuration.


118-190: LGTM! Resume point extraction handles all strategies correctly.

The implementation properly computes max values for timestamp and ID-based strategies, passes through cursors, and returns null for full_scan.

services/platform/convex/integration_processing_records.ts (3)

15-44: LGTM! Query correctly uses index for efficient lookup.

The isRecordProcessed query properly uses the by_record index for the composite key lookup and correctly compares timestamps.


49-102: LGTM! Upsert pattern correctly handles claim logic.

The mutation properly checks for existing records and either patches or inserts as needed, returning the record ID in both cases.


107-159: LGTM! Record processed mutation follows same upsert pattern.

Consistent implementation with recordClaimed, properly updating status to completed.

services/platform/convex/workflow/actions/integration_processing_records/types.ts (5)

1-36: LGTM!

The module documentation, type-only import, and foundational type definitions are well-structured. The template literal type IntegrationTableName correctly enforces the integration:${string}:${string} pattern.


61-80: Good handling of colons in sourceIdentifier.

The destructuring pattern correctly preserves colons within the sourceIdentifier (e.g., integration:shopify:orders:v2sourceIdentifier = "orders:v2").

One edge case: if tableName is "integration::source", this returns integrationName: "". Consider whether empty segments should return null for stricter validation.


82-122: LGTM!

The incremental processing strategy types are well-documented with clear examples. The IncrementalConfig interface appropriately uses optional fields since not all strategies require all configuration options.


124-165: LGTM!

The IntegrationDataSourceConfig interface is comprehensive and well-documented. The JEXL filter expression examples are helpful for consumers. Based on learnings, using Record<string, unknown> for fetchParams is acceptable for dynamic field handling when validation is handled at the mutation layer.


167-203: LGTM!

The result types are well-defined. Using Id<'workflowProcessingRecords'> for the _id field follows Convex best practices. The use of unknown for metadata is appropriate here since the document shape varies at runtime and consumers perform type narrowing. Based on learnings, this pattern is established for workflow processing results.

Comment on lines +178 to +204
export const getLatestProcessedForIntegration = internalQuery({
args: {
tableName: v.string(), // integration:* pattern
wfDefinitionId: v.string(),
},
returns: v.union(v.any(), v.null()),
handler: async (ctx, args) => {
const { tableName, wfDefinitionId } = args;

// Get the most recently processed record for this integration source
// We need to find records with status 'completed' to get reliable resume points
const records = await ctx.db
.query('workflowProcessingRecords')
.withIndex('by_org_table_wfDefinition_processedAt')
.filter((q) =>
q.and(
q.eq(q.field('tableName'), tableName),
q.eq(q.field('wfDefinitionId'), wfDefinitionId),
q.eq(q.field('status'), 'completed'),
),
)
.order('desc')
.first();

return records;
},
});

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find the schema file and check the index definition
fd -t f "schema\\.ts" services/platform/convex/ && \
rg -nA 10 'by_org_table_wfDefinition_processedAt' services/platform/convex/ --type=ts

Repository: tale-project/tale

Length of output: 1950


🏁 Script executed:

#!/bin/bash
# Verify the table definition for workflowProcessingRecords to see all fields and confirm status field exists
rg -nB 5 'workflowProcessingRecords:' services/platform/convex/schema.ts | head -40

Repository: tale-project/tale

Length of output: 444


🏁 Script executed:

#!/bin/bash
# Get the full workflowProcessingRecords table definition
rg -nA 30 'workflowProcessingRecords: defineTable' services/platform/convex/schema.ts

Repository: tale-project/tale

Length of output: 1352


Do not use .filter() for indexed fields; use equality conditions with .withIndex() instead, and add missing organizationId filter.

The query uses .withIndex('by_org_table_wfDefinition_processedAt') but then applies .filter() on indexed fields. Per Convex best practices, indexed fields must use equality conditions in the same order as the index definition: organizationIdtableNamewfDefinitionIdprocessedAt.

Current issues:

  • Missing equality condition on organizationId (first field in the index)
  • Using .filter() on tableName and wfDefinitionId instead of equality conditions
  • .filter() should only apply to non-indexed fields like status

The .order('desc') correctly uses the index's sort field (processedAt), and .first() will return the most recent record.

Comment on lines +156 to +180
case 'id_based': {
if (!timestampField) {
return null;
}

// Find the maximum ID in the fetched records
let maxId: number | string | null = null;

for (const record of records) {
const value = getNestedValue(record, timestampField);

if (typeof value === 'number') {
if (maxId === null || value > (maxId as number)) {
maxId = value;
}
} else if (typeof value === 'string') {
// For string IDs, use lexicographic comparison
if (maxId === null || value > (maxId as string)) {
maxId = value;
}
}
}

return maxId;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider renaming timestampField or adding clarity for id_based strategy.

The timestampField property is used for the ID field in the id_based strategy (line 157), which is semantically confusing. The IncrementalConfig type in types.ts documents this usage, but consider either:

  1. Renaming to a more generic term like resumeField
  2. Adding a separate idField property for the id_based strategy
  3. Adding a clarifying comment in this function
🤖 Prompt for AI Agents
In
@services/platform/convex/workflow/actions/integration_processing_records/helpers/build_fetch_params.ts
around lines 156 - 180, The code uses timestampField as the field holding IDs in
the case 'id_based' branch which is confusing; update the logic in the switch
branch for 'id_based' inside build_fetch_params (the case 'id_based' block that
iterates records and calls getNestedValue) to first prefer a dedicated idField
if provided on the IncrementalConfig, falling back to timestampField if not, and
add a concise clarifying comment above the block explaining that this branch
treats the chosen field as the resume ID (and reference IncrementalConfig in
types.ts); alternatively, if you don't want a new config property now, at
minimum add that clarifying comment and a TODO to rename timestampField to
resumeField or add idField later.

Comment on lines +12 to +27
function getNestedValue(obj: Record<string, unknown>, path: string): unknown {
const parts = path.split('.');
let current: unknown = obj;

for (const part of parts) {
if (current === null || current === undefined) {
return undefined;
}
if (typeof current !== 'object') {
return undefined;
}
current = (current as Record<string, unknown>)[part];
}

return current;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for all getNestedValue implementations
rg -nA 16 'function getNestedValue' --type=ts

Repository: tale-project/tale

Length of output: 4377


🏁 Script executed:

#!/bin/bash
# Also check if there's already a shared utility module for this
fd -e ts -e tsx -path '*/utils/*' | head -30

Repository: tale-project/tale

Length of output: 229


🏁 Script executed:

#!/bin/bash
# Look for the build_fetch_params.ts file location
fd 'build_fetch_params.ts' --type=f

Repository: tale-project/tale

Length of output: 163


🏁 Script executed:

#!/bin/bash
# Check if getNestedValue is exported or used elsewhere
rg 'getNestedValue' --type=ts -B2 -A2

Repository: tale-project/tale

Length of output: 4047


🏁 Script executed:

#!/bin/bash
# Check the file structure of the helpers directory
ls -la services/platform/convex/workflow/actions/integration_processing_records/helpers/

Repository: tale-project/tale

Length of output: 451


🏁 Script executed:

#!/bin/bash
# Check if there's a common utilities location for this module
fd . services/platform/convex/workflow/actions/integration_processing_records/ --max-depth 2 --type d

Repository: tale-project/tale

Length of output: 142


Extract getNestedValue to a shared utility module to eliminate duplication.

This helper function is duplicated identically in both build_fetch_params.ts (lines 223-238) and extract_record_id.ts (lines 12-27). Create a shared utility file in the helpers directory and import it from both modules to maintain a single source of truth.

🤖 Prompt for AI Agents
In
@services/platform/convex/workflow/actions/integration_processing_records/helpers/extract_record_id.ts
around lines 12 - 27, Extract the duplicated getNestedValue implementation into
a single exported helper function in a new shared utility module (e.g.,
helpers/getNestedValue.ts), then replace the inline copies in both
extract_record_id and build_fetch_params to import that function; update imports
to use the shared getNestedValue, keep the signature getNestedValue(obj:
Record<string, unknown>, path: string): unknown, and ensure TypeScript types
compile where it is referenced.

Comment on lines +46 to +54
function calculateCutoffTimestamp(backoffHours: number): string {
if (backoffHours === BACKOFF_NEVER_REPROCESS) {
// Use a far future date to never consider records for reprocessing
return new Date(Date.now() + 100 * 365 * 24 * 60 * 60 * 1000).toISOString();
}

const cutoffMs = Date.now() - backoffHours * 60 * 60 * 1000;
return new Date(cutoffMs).toISOString();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical: calculateCutoffTimestamp logic is inverted for BACKOFF_NEVER_REPROCESS.

When BACKOFF_NEVER_REPROCESS is used, the intent is that records should never become eligible for reprocessing (i.e., they are always considered "processed"). However, using a far future date means processedAt >= cutoffMs will always be false, making records appear unprocessed.

The model helper at model/workflow_processing_records/calculate_cutoff_timestamp.ts correctly uses epoch (new Date(0)) so that any record with processedAt >= 0 is considered processed.

🐛 Proposed fix
 function calculateCutoffTimestamp(backoffHours: number): string {
   if (backoffHours === BACKOFF_NEVER_REPROCESS) {
-    // Use a far future date to never consider records for reprocessing
-    return new Date(Date.now() + 100 * 365 * 24 * 60 * 60 * 1000).toISOString();
+    // Use epoch - any record that was ever processed will have processedAt >= 0
+    return new Date(0).toISOString();
   }
 
   const cutoffMs = Date.now() - backoffHours * 60 * 60 * 1000;
   return new Date(cutoffMs).toISOString();
 }

Alternatively, consider reusing the existing calculateCutoffTimestamp from model/workflow_processing_records/calculate_cutoff_timestamp.ts to avoid duplication and ensure consistent behavior.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
function calculateCutoffTimestamp(backoffHours: number): string {
if (backoffHours === BACKOFF_NEVER_REPROCESS) {
// Use a far future date to never consider records for reprocessing
return new Date(Date.now() + 100 * 365 * 24 * 60 * 60 * 1000).toISOString();
}
const cutoffMs = Date.now() - backoffHours * 60 * 60 * 1000;
return new Date(cutoffMs).toISOString();
}
function calculateCutoffTimestamp(backoffHours: number): string {
if (backoffHours === BACKOFF_NEVER_REPROCESS) {
// Use epoch - any record that was ever processed will have processedAt >= 0
return new Date(0).toISOString();
}
const cutoffMs = Date.now() - backoffHours * 60 * 60 * 1000;
return new Date(cutoffMs).toISOString();
}
🤖 Prompt for AI Agents
In
@services/platform/convex/workflow/actions/integration_processing_records/helpers/find_unprocessed.ts
around lines 46 - 54, The calculateCutoffTimestamp function currently returns a
far-future date when backoffHours === BACKOFF_NEVER_REPROCESS, which inverts
logic and makes records appear unprocessed; change the branch for
BACKOFF_NEVER_REPROCESS to return the epoch (new Date(0).toISOString()) so
processedAt >= cutoff will mark records as processed, or better yet import and
reuse the canonical calculateCutoffTimestamp from
model/workflow_processing_records/calculate_cutoff_timestamp.ts to ensure
consistent behavior across codepaths.

Comment on lines +103 to +116
// Type for action params (discriminated union)
type IntegrationProcessingRecordsActionParams =
| {
operation: 'find_unprocessed';
dataSource: IntegrationDataSourceConfig;
backoffHours: number;
}
| {
operation: 'record_processed';
integrationName: string;
sourceIdentifier: string;
recordId: string;
metadata?: Record<string, unknown>;
};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider adding a comment explaining the type/validator duality.

Per learnings, action files should document the rationale for maintaining both a TypeScript type and a runtime validator. A brief comment would help future maintainers understand this design choice.

📝 Suggested comment
+// TypeScript type for static type checking and IDE support.
+// The parametersValidator (below) handles runtime validation.
+// We maintain both because TypeScript types are erased at runtime,
+// while Convex validators provide runtime schema enforcement.
 type IntegrationProcessingRecordsActionParams =
   | {
       operation: 'find_unprocessed';
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Type for action params (discriminated union)
type IntegrationProcessingRecordsActionParams =
| {
operation: 'find_unprocessed';
dataSource: IntegrationDataSourceConfig;
backoffHours: number;
}
| {
operation: 'record_processed';
integrationName: string;
sourceIdentifier: string;
recordId: string;
metadata?: Record<string, unknown>;
};
// TypeScript type for static type checking and IDE support.
// The parametersValidator (below) handles runtime validation.
// We maintain both because TypeScript types are erased at runtime,
// while Convex validators provide runtime schema enforcement.
type IntegrationProcessingRecordsActionParams =
| {
operation: 'find_unprocessed';
dataSource: IntegrationDataSourceConfig;
backoffHours: number;
}
| {
operation: 'record_processed';
integrationName: string;
sourceIdentifier: string;
recordId: string;
metadata?: Record<string, unknown>;
};
🤖 Prompt for AI Agents
In
@services/platform/convex/workflow/actions/integration_processing_records/integration_processing_records_action.ts
around lines 103 - 116, Add a brief comment above the
IntegrationProcessingRecordsActionParams type explaining the type/validator
duality: state that the TypeScript discriminated union
(IntegrationProcessingRecordsActionParams) provides compile-time safety and
IDE/autocomplete benefits, while a separate runtime validator (declared
elsewhere in this file) enforces the same shape at runtime for incoming action
payloads; place the comment immediately above the type declaration and mention
the validator symbol name if present later in the file so maintainers understand
why both exist.

Comment on lines +41 to +46
export function createIntegrationTableName(
integrationName: string,
sourceIdentifier: string,
): IntegrationTableName {
return `integration:${integrationName}:${sourceIdentifier}`;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider validating non-empty inputs.

The function accepts empty strings, which would produce malformed table names like integration::source or integration:name:. Adding guards would prevent invalid table names from propagating.

♻️ Suggested validation
 export function createIntegrationTableName(
   integrationName: string,
   sourceIdentifier: string,
 ): IntegrationTableName {
+  if (!integrationName || !sourceIdentifier) {
+    throw new Error('integrationName and sourceIdentifier must be non-empty');
+  }
   return `integration:${integrationName}:${sourceIdentifier}`;
 }
🤖 Prompt for AI Agents
In
@services/platform/convex/workflow/actions/integration_processing_records/types.ts
around lines 41 - 46, createIntegrationTableName currently allows empty
integrationName or sourceIdentifier which yields malformed table names; add
validation in the createIntegrationTableName function to ensure both
integrationName and sourceIdentifier are non-empty (after trimming) and throw a
clear error (or return Result) if either is empty, so callers cannot produce
strings like "integration::source" or "integration:name:"; reference the
createIntegrationTableName function and validate its integrationName and
sourceIdentifier parameters before constructing the return value.

Comment on lines +51 to +55
export function isIntegrationTableName(
tableName: string,
): tableName is IntegrationTableName {
return tableName.startsWith('integration:');
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Type guard is imprecise for IntegrationTableName.

The type IntegrationTableName is defined as integration:${string}:${string} (requiring two colons), but this guard only checks for the prefix. A string like "integration:foo" would pass but doesn't match the type pattern, causing a type-safety gap.

🔧 Suggested fix to align with the type
 export function isIntegrationTableName(
   tableName: string,
 ): tableName is IntegrationTableName {
-  return tableName.startsWith('integration:');
+  const parts = tableName.split(':');
+  return parts[0] === 'integration' && parts.length >= 3;
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
export function isIntegrationTableName(
tableName: string,
): tableName is IntegrationTableName {
return tableName.startsWith('integration:');
}
export function isIntegrationTableName(
tableName: string,
): tableName is IntegrationTableName {
const parts = tableName.split(':');
return parts[0] === 'integration' && parts.length >= 3;
}
🤖 Prompt for AI Agents
In
@services/platform/convex/workflow/actions/integration_processing_records/types.ts
around lines 51 - 55, The type guard isIntegrationTableName currently only
checks the 'integration:' prefix and allows invalid strings like
"integration:foo"; update it to enforce the required two-colon pattern for
IntegrationTableName (e.g., use a regex that ensures
"integration:<non-empty>:<non-empty>" or explicitly split on ':' and validate
there are exactly three non-empty segments), and return true only when that
stricter check passes so the guard accurately narrows to IntegrationTableName.

@larryro larryro force-pushed the feat/84-integration-processing-records branch 8 times, most recently from 0b77122 to 3672724 Compare January 14, 2026 12:56
@larryro larryro force-pushed the feat/84-integration-processing-records branch from 3672724 to f49c485 Compare January 24, 2026 07:17
…l data sources

- Add integration_processing_records action for processing external data sources
- Add batch processing with limit parameter
Move action from workflow/actions to workflow_engine/actions where
all other actions reside, and fix import path for BACKOFF_NEVER_REPROCESS
constant.
@larryro larryro force-pushed the feat/84-integration-processing-records branch from edf2949 to fa03097 Compare February 16, 2026 10:57
@yannickmonney yannickmonney deleted the feat/84-integration-processing-records branch March 30, 2026 00:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat(workflow): support external data source incremental processing (integration_processing_records)

2 participants