Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,438 changes: 470 additions & 1,968 deletions services/platform/convex/_generated/api.d.ts

Large diffs are not rendered by default.

277 changes: 277 additions & 0 deletions services/platform/convex/integration_processing_records.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
/**
* Integration Processing Records API
*
* Internal mutations/queries for tracking processing status of external data sources.
* These work with the existing workflowProcessingRecords table but support
* extended table names in the format `integration:<integrationName>:<sourceIdentifier>`.
*/

import { internalQuery, internalMutation } from './_generated/server';
import { v } from 'convex/values';

/**
* Check if an integration record has been processed since the cutoff timestamp
*/
export const isRecordProcessed = internalQuery({
args: {
tableName: v.string(), // Accepts integration:* pattern
recordId: v.string(),
wfDefinitionId: v.string(),
cutoffTimestamp: v.string(), // ISO date string
},
returns: v.boolean(),
handler: async (ctx, args) => {
const { tableName, recordId, wfDefinitionId, cutoffTimestamp } = args;

const cutoffMs = new Date(cutoffTimestamp).getTime();

const processedRecord = await ctx.db
.query('workflowProcessingRecords')
.withIndex('by_record', (q) =>
q
.eq('tableName', tableName)
.eq('recordId', recordId)
.eq('wfDefinitionId', wfDefinitionId),
)
.first();

if (!processedRecord) {
return false;
}

return processedRecord.processedAt >= cutoffMs;
},
});

/**
* Atomically check if a record can be claimed and claim it if available.
*
* This combines the "check if processed" and "claim" operations into a single
* atomic mutation to prevent race conditions where multiple workflow executions
* might claim the same record.
*
* @returns The processing record ID if claimed successfully, null if already processed
*/
export const checkAndClaimRecord = internalMutation({
args: {
organizationId: v.string(),
tableName: v.string(),
recordId: v.string(),
wfDefinitionId: v.string(),
recordCreationTime: v.number(),
cutoffTimestamp: v.string(),
metadata: v.optional(v.any()),
},
returns: v.union(v.id('workflowProcessingRecords'), v.null()),
handler: async (ctx, args) => {
const {
organizationId,
tableName,
recordId,
wfDefinitionId,
recordCreationTime,
cutoffTimestamp,
metadata,
} = args;

const now = Date.now();
const cutoffMs = new Date(cutoffTimestamp).getTime();

// Check if this record already has a processing entry
const existing = await ctx.db
.query('workflowProcessingRecords')
.withIndex('by_record', (q) =>
q
.eq('tableName', tableName)
.eq('recordId', recordId)
.eq('wfDefinitionId', wfDefinitionId),
)
.first();

if (existing) {
// Record exists - check if it's still within the backoff period
if (existing.processedAt >= cutoffMs) {
// Already processed recently, cannot claim
return null;
}

// Outside backoff period, can reclaim
await ctx.db.patch(existing._id, {
processedAt: now,
status: 'in_progress',
metadata,
});
return existing._id;
}

// No existing record, create new claim
return await ctx.db.insert('workflowProcessingRecords', {
organizationId,
tableName,
recordId,
wfDefinitionId,
recordCreationTime,
processedAt: now,
status: 'in_progress',
metadata,
});
},
});

/**
* Claim an integration record for processing (mark as in_progress)
*
* @deprecated Use checkAndClaimRecord for atomic check-and-claim operations
*/
export const recordClaimed = internalMutation({
args: {
organizationId: v.string(),
tableName: v.string(), // Accepts integration:* pattern
recordId: v.string(),
wfDefinitionId: v.string(),
recordCreationTime: v.number(),
metadata: v.optional(v.any()),
},
returns: v.id('workflowProcessingRecords'),
handler: async (ctx, args) => {
const {
organizationId,
tableName,
recordId,
wfDefinitionId,
recordCreationTime,
metadata,
} = args;

const now = Date.now();

// Check if this record already has a processing entry
const existing = await ctx.db
.query('workflowProcessingRecords')
.withIndex('by_record', (q) =>
q
.eq('tableName', tableName)
.eq('recordId', recordId)
.eq('wfDefinitionId', wfDefinitionId),
)
.first();

if (existing) {
await ctx.db.patch(existing._id, {
processedAt: now,
status: 'in_progress',
metadata,
});
return existing._id;
}

return await ctx.db.insert('workflowProcessingRecords', {
organizationId,
tableName,
recordId,
wfDefinitionId,
recordCreationTime,
processedAt: now,
status: 'in_progress',
metadata,
});
},
});

/**
* Mark an integration record as processed (completed)
*/
export const recordProcessed = internalMutation({
args: {
organizationId: v.string(),
tableName: v.string(), // Accepts integration:* pattern
recordId: v.string(),
wfDefinitionId: v.string(),
recordCreationTime: v.number(),
metadata: v.optional(v.any()),
},
returns: v.id('workflowProcessingRecords'),
handler: async (ctx, args) => {
const {
organizationId,
tableName,
recordId,
wfDefinitionId,
recordCreationTime,
metadata,
} = args;

const now = Date.now();

const existing = await ctx.db
.query('workflowProcessingRecords')
.withIndex('by_record', (q) =>
q
.eq('tableName', tableName)
.eq('recordId', recordId)
.eq('wfDefinitionId', wfDefinitionId),
)
.first();

if (existing) {
await ctx.db.patch(existing._id, {
processedAt: now,
status: 'completed',
metadata,
});
return existing._id;
}

return await ctx.db.insert('workflowProcessingRecords', {
organizationId,
tableName,
recordId,
wfDefinitionId,
recordCreationTime,
processedAt: now,
status: 'completed',
metadata,
});
},
});

/**
* Get a processing record by ID
*/
export const getProcessingRecordById = internalQuery({
args: {
processingRecordId: v.id('workflowProcessingRecords'),
},
returns: v.union(v.any(), v.null()),
handler: async (ctx, args) => ctx.db.get(args.processingRecordId),
});

/**
* Get the latest processed record for an integration source
* Used to retrieve the resume point for incremental processing
*/
export const getLatestProcessedForIntegration = internalQuery({
args: {
tableName: v.string(), // integration:* pattern
wfDefinitionId: v.string(),
},
returns: v.union(v.any(), v.null()),
handler: async (ctx, args) => {
const { tableName, wfDefinitionId } = args;

// Get the most recently processed record for this integration source
// Use completed status for reliable resume points
return ctx.db
.query('workflowProcessingRecords')
.withIndex('by_org_table_wfDefinition_processedAt')
.filter((q) =>
q.and(
q.eq(q.field('tableName'), tableName),
q.eq(q.field('wfDefinitionId'), wfDefinitionId),
q.eq(q.field('status'), 'completed'),
),
)
.order('desc')
.first();
},
});
Comment on lines +253 to +277

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find the schema file and check the index definition
fd -t f "schema\\.ts" services/platform/convex/ && \
rg -nA 10 'by_org_table_wfDefinition_processedAt' services/platform/convex/ --type=ts

Repository: tale-project/tale

Length of output: 1950


🏁 Script executed:

#!/bin/bash
# Verify the table definition for workflowProcessingRecords to see all fields and confirm status field exists
rg -nB 5 'workflowProcessingRecords:' services/platform/convex/schema.ts | head -40

Repository: tale-project/tale

Length of output: 444


🏁 Script executed:

#!/bin/bash
# Get the full workflowProcessingRecords table definition
rg -nA 30 'workflowProcessingRecords: defineTable' services/platform/convex/schema.ts

Repository: tale-project/tale

Length of output: 1352


Do not use .filter() for indexed fields; use equality conditions with .withIndex() instead, and add missing organizationId filter.

The query uses .withIndex('by_org_table_wfDefinition_processedAt') but then applies .filter() on indexed fields. Per Convex best practices, indexed fields must use equality conditions in the same order as the index definition: organizationIdtableNamewfDefinitionIdprocessedAt.

Current issues:

  • Missing equality condition on organizationId (first field in the index)
  • Using .filter() on tableName and wfDefinitionId instead of equality conditions
  • .filter() should only apply to non-indexed fields like status

The .order('desc') correctly uses the index's sort field (processedAt), and .first() will return the most recent record.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const circulySyncCustomersWorkflow = {
timeout: 300000, // 5 minutes for full sync
retryPolicy: { maxRetries: 3, backoffMs: 2000 },
variables: {
organizationId: 'org_demo',
pageSize: 50, // Fetch 50 customers per page (Circuly max is 100)
maxPages: 20, // Safety limit to prevent infinite loops
currentPage: 1, // Track current page number (Circuly uses 1-based pagination)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ const circulySyncProductsWorkflow = {
timeout: 300000, // 5 minutes for full sync
retryPolicy: { maxRetries: 3, backoffMs: 2000 },
variables: {
organizationId: 'org_demo',
pageSize: 50, // Fetch 50 products per page (Circuly max is 100)
maxPages: 20, // Safety limit to prevent infinite loops
currentPage: 1, // Track current page number (Circuly uses 1-based pagination)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ const circulySyncSubscriptionsWorkflow = {
timeout: 600000, // 10 minutes for full sync
retryPolicy: { maxRetries: 3, backoffMs: 2000 },
variables: {
organizationId: 'org_demo',
pageSize: 50, // Fetch 50 customers per page
currentCursor: null, // Track current pagination cursor
totalProcessed: 0, // Track total customers processed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const conversationAutoArchiveWorkflow: PredefinedWorkflowDefinition = {
timeout: 60000, // 1 minute timeout for single conversation
retryPolicy: { maxRetries: 2, backoffMs: 3000 },
variables: {
organizationId: 'org_demo',
workflowId: 'conversationAutoArchive',
backoffHours: 168, // Only process conversations not processed in last 168 hours (7 days)
staleDays: 30, // Archive conversations closed for over 30 days
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const customerRagSyncWorkflow = {
timeout: 120000, // 2 minutes for single customer upload
retryPolicy: { maxRetries: 3, backoffMs: 2000 },
variables: {
organizationId: 'org_demo',
backoffHours: 72, // Only process customers not processed in last 72 hours (3 days)
workflowId: 'customer_rag_sync',
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const documentRagSyncWorkflow: PredefinedWorkflowDefinition = {
timeout: 120000, // 2 minutes for single document upload
retryPolicy: { maxRetries: 3, backoffMs: 2000 },
variables: {
organizationId: 'org_demo',
backoffHours: 72, // Only process documents not processed in last 72 hours (3 days)
includeMetadata: true, // Include document metadata in upload
workflowId: 'document_rag_sync',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const generalCustomerStatusAssessmentWorkflow: PredefinedWorkflowDefinition = {
timeout: 120000,
retryPolicy: { maxRetries: 2, backoffMs: 1000 },
variables: {
organizationId: 'org_demo',
workflowId: 'general-customer-status-assessment',
backoffHours: 72,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ const generalProductRecommendationWorkflow: PredefinedWorkflowDefinition = {
timeout: 300000,
retryPolicy: { maxRetries: 2, backoffMs: 3000 },
variables: {
organizationId: 'org_demo',
workflowId: 'general-product-recommendation',
backoffHours: 168,
},
Expand Down
2 changes: 2 additions & 0 deletions services/platform/convex/predefined_workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import onedriveSync from './onedrive_sync';
import productRagSync from './product_rag_sync';
import productRecommendationEmail from './product_recommendation_email';
import productRelationshipAnalysis from './product_relationship_analysis';
import protelGuestWelcomeEmail from './protel_guest_welcome_email';
import shopifySyncCustomers from './shopify_sync_customers';
import shopifySyncProducts from './shopify_sync_products';
import websitePagesRagSync from './website_pages_rag_sync';
Expand Down Expand Up @@ -45,4 +46,5 @@ export const workflows = {
conversationAutoArchive,
loopiCustomerStatusAssessment,
productRelationshipAnalysis,
protelGuestWelcomeEmail,
} as const;
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ const loopiCustomerStatusAssessmentWorkflow = {
timeout: 120000, // 2 minutes for single customer analysis
retryPolicy: { maxRetries: 2, backoffMs: 1000 },
variables: {
organizationId: 'org_demo',
workflowId: 'assess-customer-status',
backoffHours: 72, // Only process customers not processed in last 72 hours (3 days)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ const loopiProductRecommendationWorkflow = {
timeout: 300000, // 5 minutes total timeout
retryPolicy: { maxRetries: 2, backoffMs: 3000 },
variables: {
organizationId: 'org_demo',
workflowId: 'product-recommendation',
backoffHours: 168, // Process each customer once per week (7 days * 24 hours)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ const onedriveSyncWorkflow: PredefinedWorkflowDefinition = {
timeout: 120000, // 2 minutes for single file sync
retryPolicy: { maxRetries: 3, backoffMs: 2000 },
variables: {
organizationId: 'org_demo',
backoffHours: 1, // Only process configs not synced in last 1 hour
workflowId: 'onedrive_sync',
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const productRagSyncWorkflow = {
timeout: 120000, // 2 minutes for single product upload
retryPolicy: { maxRetries: 3, backoffMs: 2000 },
variables: {
organizationId: 'org_demo',
backoffHours: 72, // Only process products not processed in last 72 hours (3 days)
workflowId: 'product_rag_sync',
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ const productRecommendationEmailWorkflow: PredefinedWorkflowDefinition = {
timeout: 300000, // 5 minutes total timeout
retryPolicy: { maxRetries: 2, backoffMs: 3000 },
variables: {
organizationId: 'org_demo',
workflowId: 'product-recommendation-email',
backoffHours: 24, // Process each approval once per day
},
Expand Down
Loading
Loading