Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 70 additions & 72 deletions services/platform/convex/_generated/api.d.ts

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -429,13 +429,13 @@ export const ACTION_REFERENCE: ActionReference[] = [
'operation',
'organizationId',
'tableName',
'documentId',
'workflowId',
'documentCreationTime',
'recordId',
'wfDefinitionId',
'recordCreationTime',
],
optionalParams: ['metadata'],
example:
'{ action: "workflow_processing_records", parameters: { operation: "record_processed", organizationId: "{{organizationId}}", tableName: "customers", documentId: "{{customerId}}", workflowId: "{{workflowId}}", documentCreationTime: {{customer._creationTime}} } }',
'{ action: "workflow_processing_records", parameters: { operation: "record_processed", organizationId: "{{organizationId}}", tableName: "customers", recordId: "{{customerId}}", wfDefinitionId: "{{wfDefinitionId}}", recordCreationTime: {{customer._creationTime}} } }',
},
],
},
Expand Down
6 changes: 3 additions & 3 deletions services/platform/convex/lib/create_workflow_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ REMINDER: All of these are action steps with stepType: "action". The "type" fiel
operation: 'record_processed',
organizationId: '{{organizationId}}',
tableName: 'customers',
workflowId: '{{workflowId}}',
documentId: '{{entityId}}',
documentCreationTime: '{{entity._creationTime}}',
wfDefinitionId: '{{wfDefinitionId}}',
recordId: '{{entityId}}',
recordCreationTime: '{{entity._creationTime}}',
metadata: { processedAt: '{{now}}' }
}
}
Expand Down
8 changes: 7 additions & 1 deletion services/platform/convex/lib/variables/jexl_instance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ jexlInstance.addTransform('join', (arr: unknown[], separator = ',') => {
});

// Add custom transforms for type conversion
jexlInstance.addTransform('string', (val: unknown) => toString(val));
jexlInstance.addTransform('string', (val: unknown) => {
// For objects and arrays, use JSON.stringify to get proper string representation
if (val !== null && typeof val === 'object') {
return JSON.stringify(val);
}
return toString(val);
});
jexlInstance.addTransform('number', (val: unknown) => toNumber(val));
jexlInstance.addTransform('boolean', (val: unknown) => Boolean(val));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ interface SaveDefaultWorkflowsArgs {
* Save default workflows for a new organization
*
* This includes:
* - Document RAG Sync (scheduled every hour)
* - Document RAG Sync (scheduled every 20 minutes)
* - OneDrive Sync (scheduled every hour)
* - Product RAG Sync (scheduled every hour)
* - Website Pages RAG Sync (scheduled every hour)
* - Customer RAG Sync (scheduled every hour)
* - Product RAG Sync (scheduled every 20 minutes)
* - Website Pages RAG Sync (scheduled every 20 minutes)
* - Customer RAG Sync (scheduled every 20 minutes)
*/
export async function saveDefaultWorkflows(
ctx: ActionCtx,
Expand All @@ -46,7 +46,7 @@ export async function saveDefaultWorkflows(
const workflowsToSave = [
{
workflow: documentRagSync,
schedule: '0 */1 * * *', // Every hour
schedule: '*/20 * * * *', // Every 20 minutes
timezone: 'UTC',
},
{
Expand All @@ -56,17 +56,17 @@ export async function saveDefaultWorkflows(
},
{
workflow: productRagSync,
schedule: '0 */1 * * *', // Every hour
schedule: '*/20 * * * *', // Every 20 minutes
timezone: 'UTC',
},
{
workflow: websitePagesRagSync,
schedule: '0 */1 * * *', // Every hour
schedule: '*/20 * * * *', // Every 20 minutes
timezone: 'UTC',
},
{
workflow: customerRagSync,
schedule: '0 */1 * * *', // Every hour
schedule: '*/20 * * * *', // Every 20 minutes
timezone: 'UTC',
},
{
Expand Down
63 changes: 0 additions & 63 deletions services/platform/convex/model/wf_executions/create_execution.ts

This file was deleted.

1 change: 0 additions & 1 deletion services/platform/convex/model/wf_executions/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
export * from './types';

// Export business logic functions
export { createExecution } from './create_execution';
export { getExecution } from './get_execution';
export { getRawExecution } from './get_raw_execution';
export { listExecutions } from './list_executions';
Expand Down
22 changes: 0 additions & 22 deletions services/platform/convex/model/wf_executions/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,6 @@ export interface DeserializedWorkflowExecution
variables: ExecutionVariables;
}

export interface CreateExecutionArgs {
organizationId: string;
wfDefinitionId: Doc<'wfDefinitions'>['_id'];
input?: unknown;
triggeredBy: string;
triggerData?: unknown;
workflowConfig?: unknown;
stepsConfig?: Record<string, unknown>;
workflowName?: string;
}

export interface UpdateExecutionStatusArgs {
executionId: Doc<'wfExecutions'>['_id'];
status: string;
Expand Down Expand Up @@ -102,17 +91,6 @@ export interface ListExecutionsArgs {
// Convex Validators
// =============================================================================

export const createExecutionArgsValidator = {
organizationId: v.string(),
wfDefinitionId: v.id('wfDefinitions'),
input: v.optional(v.any()),
triggeredBy: v.string(),
triggerData: v.optional(v.any()),
workflowConfig: v.optional(v.any()),
stepsConfig: v.optional(v.record(v.string(), v.any())),
workflowName: v.optional(v.string()),
};

export const updateExecutionStatusArgsValidator = {
executionId: v.id('wfExecutions'),
status: v.string(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { findUnprocessedWithCustomQuery } from './helpers/find_unprocessed_with_

export interface FindProductRecommendationByStatusArgs {
organizationId: string;
workflowId: string;
wfDefinitionId: string;
backoffHours: number; // Number of hours to look back for processing records
status: 'pending' | 'approved' | 'rejected';
}
Expand All @@ -35,7 +35,7 @@ export async function findProductRecommendationByStatus(
ctx: QueryCtx,
args: FindProductRecommendationByStatusArgs,
): Promise<FindProductRecommendationByStatusResult> {
const { organizationId, workflowId, backoffHours, status } = args;
const { organizationId, wfDefinitionId, backoffHours, status } = args;

// Calculate cutoff timestamp from backoffHours
const cutoffDate = new Date();
Expand All @@ -45,7 +45,7 @@ export async function findProductRecommendationByStatus(
const result = await findUnprocessedWithCustomQuery<Doc<'approvals'>>(ctx, {
organizationId,
tableName: 'approvals',
workflowId,
wfDefinitionId,
cutoffTimestamp,

// Build query with the specified status
Expand Down Expand Up @@ -78,4 +78,3 @@ export async function findProductRecommendationByStatus(
count: result.count,
};
}

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import { findUnprocessedWithCustomQuery } from './helpers/find_unprocessed_with_
export interface FindUnprocessedArgs {
organizationId: string;
tableName: TableName;
workflowId: string;
wfDefinitionId: string;
backoffHours: number; // Number of hours to look back for processing records
limit?: number;
}
Expand All @@ -56,7 +56,8 @@ export async function findUnprocessed(
ctx: QueryCtx,
args: FindUnprocessedArgs,
): Promise<FindUnprocessedResult> {
const { organizationId, tableName, workflowId, backoffHours, limit } = args;
const { organizationId, tableName, wfDefinitionId, backoffHours, limit } =
args;

// Calculate cutoff timestamp from backoffHours
const cutoffDate = new Date();
Expand All @@ -67,7 +68,7 @@ export async function findUnprocessed(
const result = await findUnprocessedWithCustomQuery(ctx, {
organizationId,
tableName,
workflowId,
wfDefinitionId,
cutoffTimestamp,
limit,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { getLatestConversationMessage } from './helpers/get_latest_conversation_

export interface FindUnprocessedOpenConversationArgs {
organizationId: string;
workflowId: string;
wfDefinitionId: string;
backoffHours: number; // Number of hours to look back for processing records
}

Expand All @@ -38,7 +38,7 @@ export async function findUnprocessedOpenConversation(
ctx: QueryCtx,
args: FindUnprocessedOpenConversationArgs,
): Promise<FindUnprocessedOpenConversationResult> {
const { organizationId, workflowId, backoffHours } = args;
const { organizationId, wfDefinitionId, backoffHours } = args;

// Calculate cutoff timestamp from backoffHours
const cutoffDate = new Date();
Expand All @@ -50,7 +50,7 @@ export async function findUnprocessedOpenConversation(
{
organizationId,
tableName: 'conversations',
workflowId,
wfDefinitionId,
cutoffTimestamp,

// Hook 1: Build your custom query with the right index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* const result = await findUnprocessedWithCustomQuery(ctx, {
* organizationId,
* tableName: 'conversations',
* workflowId,
* wfDefinitionId,
* cutoffTimestamp,
* limit: 10,
* buildQuery: (resumeFrom) => {
Expand Down Expand Up @@ -65,7 +65,7 @@ export async function findUnprocessedWithCustomQuery<T = unknown>(
ctx: QueryCtx,
args: FindUnprocessedWithCustomQueryArgs<T>,
): Promise<FindUnprocessedWithCustomQueryResult<T>> {
const { organizationId, tableName, workflowId } = args;
const { organizationId, tableName, wfDefinitionId } = args;
const limitVal = args.limit ?? 1;

if (limitVal < 1 || limitVal > 10) {
Expand All @@ -78,7 +78,7 @@ export async function findUnprocessedWithCustomQuery<T = unknown>(
const resumeFrom = await getLatestProcessedCreationTime(ctx, {
organizationId,
tableName,
workflowId,
wfDefinitionId,
});

// First, try starting from the latest processed creation time (resumeFrom)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Get the _creationTime of the latest processed document for a workflow.
* Get the _creationTime of the latest processed record for a workflow.
*
* This is useful for resume optimization - you can start your query from this point
* instead of scanning from the beginning.
Expand All @@ -10,7 +10,7 @@
* const resumeFrom = await getLatestProcessedCreationTime(ctx, {
* organizationId,
* tableName: 'conversations',
* workflowId
* wfDefinitionId
* });
*
* // Build query with resume point
Expand All @@ -35,26 +35,26 @@ import { TableName } from '../types';
export interface GetLatestProcessedCreationTimeArgs {
organizationId: string;
tableName: TableName;
workflowId: string;
wfDefinitionId: string;
}

export async function getLatestProcessedCreationTime(
ctx: QueryCtx,
args: GetLatestProcessedCreationTimeArgs,
): Promise<number | null> {
const { organizationId, tableName, workflowId } = args;
const { organizationId, tableName, wfDefinitionId } = args;

// Get the latest processed document for this workflow+table
// Get the latest processed record for this workflow+table
const latestProcessed = await ctx.db
.query('workflowProcessingRecords')
.withIndex('by_org_table_workflow_creationTime', (q) =>
.withIndex('by_org_table_wfDefinition_creationTime', (q) =>
q
.eq('organizationId', organizationId)
.eq('tableName', tableName)
.eq('workflowId', workflowId),
.eq('wfDefinitionId', wfDefinitionId),
)
.order('desc')
.first();

return latestProcessed?.documentCreationTime ?? null;
return latestProcessed?.recordCreationTime ?? null;
}
Loading