diff --git a/src/features/workers/resync-failed-files/helper/__tests__/resync-failed-files.helper.test.ts b/src/features/workers/resync-failed-files/helper/__tests__/resync-failed-files.helper.test.ts new file mode 100644 index 0000000..44177f8 --- /dev/null +++ b/src/features/workers/resync-failed-files/helper/__tests__/resync-failed-files.helper.test.ts @@ -0,0 +1,192 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' + +// Service method spies — shared across mocked class instances. +const markAttempt = vi.fn(async () => undefined) +const markFailure = vi.fn(async () => undefined) +const markUpdated = vi.fn(async () => undefined) +const markDeleted = vi.fn(async () => undefined) +const updateFileMap = vi.fn(async () => undefined) +const updateChannelMapSyncedFilesCount = vi.fn(async () => undefined) + +const retrieveFile = vi.fn() +const deleteFile = vi.fn(async () => undefined) +const completePendingAssemblyCreate = vi.fn(async () => undefined) + +const filesGetMetadata = vi.fn() +const channelSyncFindFirst = vi.fn() +const dropboxConnectionsFindFirst = vi.fn() + +vi.mock('@/config/server.env', () => ({ default: { COPILOT_API_KEY: 'test-key' } })) + +vi.mock('@/db', () => ({ + default: { + query: { + dropboxConnections: { findFirst: () => dropboxConnectionsFindFirst() }, + channelSync: { findFirst: () => channelSyncFindFirst() }, + }, + }, +})) + +vi.mock('@/lib/copilot/generateToken', () => ({ generateToken: () => 'token' })) +vi.mock('@/lib/copilot/models/User.model', () => ({ + default: { authenticate: async () => ({ portalId: 'p1' }) }, +})) +vi.mock('@/lib/copilot/CopilotAPI', () => ({ + CopilotAPI: class { + retrieveFile = retrieveFile + deleteFile = deleteFile + }, + // Mirror the production guard exactly (instanceof Error + the four SDK fields) + // rather than importing it — the real module pulls in the Copilot SDK, which + // vitest's ESM resolver can't load. + isCopilotApiError: (e: unknown) => { + if (!(e instanceof Error)) return false + const x = e as { url?: unknown; status?: unknown; statusText?: unknown; body?: unknown } + return ( + typeof x.url === 'string' && + typeof x.status === 'number' && + typeof x.statusText === 'string' && + typeof x.body === 'object' && + x.body !== null + ) + }, +})) +vi.mock('@/lib/dropbox/DropboxClient', () => ({ + DropboxClient: class { + getDropboxClient = () => ({ filesGetMetadata }) + }, +})) +vi.mock('@/features/sync/lib/Sync.service', () => ({ + SyncService: class { + completePendingAssemblyCreate = completePendingAssemblyCreate + }, +})) +vi.mock('@/features/sync/lib/MapFiles.service', () => ({ + MapFilesService: class { + markAttempt = markAttempt + markFailure = markFailure + markUpdated = markUpdated + markDeleted = markDeleted + updateFileMap = updateFileMap + updateChannelMapSyncedFilesCount = updateChannelMapSyncedFilesCount + }, +})) + +import { PendingAction, PendingActionTarget } from '@/db/constants' +import { retryFailedSyncsForPortal } from '../resync-failed-files.helper' + +const DAY_MS = 24 * 60 * 60 * 1000 + +const makeRow = (overrides: Record = {}) => + ({ + id: 'row-1', + channelSyncId: 'cs-1', + dbxFileId: 'id:dbx-1', + itemPath: '/folder/file.txt', + assemblyFileId: 'asm-1', + object: 'file', + createdAt: new Date(), + pendingAction: PendingAction.CREATE, + pendingActionTarget: PendingActionTarget.ASSEMBLY, + ...overrides, + // biome-ignore lint/suspicious/noExplicitAny: test fixture, not the full select shape + }) as any + +const dbxFileEntry = { + '.tag': 'file', + name: 'file.txt', + path_display: '/folder/file.txt', + id: 'id:dbx-1', + content_hash: 'hash-abc', +} + +beforeEach(() => { + vi.clearAllMocks() + dropboxConnectionsFindFirst.mockResolvedValue({ + refreshToken: 'rt', + accountId: 'acc', + rootNamespaceId: 'ns', + initiatedBy: 'user-1', + }) + channelSyncFindFirst.mockResolvedValue({ + id: 'cs-1', + assemblyChannelId: 'ach-1', + dbxRootPath: '/root', + }) + filesGetMetadata.mockResolvedValue({ result: dbxFileEntry }) +}) + +describe('retryCreateInAssembly :: reconcile branches', () => { + it('reconciles when the existing Assembly file is already completed', async () => { + retrieveFile.mockResolvedValue({ status: 'completed' }) + + await retryFailedSyncsForPortal('p1', [makeRow()]) + + expect(markUpdated).toHaveBeenCalledWith('row-1', { + assemblyFileId: 'asm-1', + contentHash: 'hash-abc', + }) + expect(updateChannelMapSyncedFilesCount).toHaveBeenCalledWith('cs-1') + expect(completePendingAssemblyCreate).not.toHaveBeenCalled() + expect(deleteFile).not.toHaveBeenCalled() + }) + + it('waits (marks failure) when the file is still pending and the row is young', async () => { + retrieveFile.mockResolvedValue({ status: 'pending' }) + + await retryFailedSyncsForPortal('p1', [makeRow({ createdAt: new Date() })]) + + expect(markFailure).toHaveBeenCalledWith('row-1', expect.stringContaining('still pending')) + expect(deleteFile).not.toHaveBeenCalled() + expect(completePendingAssemblyCreate).not.toHaveBeenCalled() + }) + + it('reclaims (nulls id, deletes, re-creates) when pending past the cutoff', async () => { + retrieveFile.mockResolvedValue({ status: 'pending' }) + const stale = new Date(Date.now() - 2 * DAY_MS) + + await retryFailedSyncsForPortal('p1', [makeRow({ createdAt: stale })]) + + expect(updateFileMap).toHaveBeenCalledWith({ assemblyFileId: null }, expect.anything()) + expect(deleteFile).toHaveBeenCalledWith('asm-1') + expect(completePendingAssemblyCreate).toHaveBeenCalledTimes(1) + // id is nulled before the delete fires + expect(updateFileMap.mock.invocationCallOrder[0]).toBeLessThan( + deleteFile.mock.invocationCallOrder[0], + ) + }) + + it('proceeds to create when the existing file is already gone (404)', async () => { + retrieveFile.mockRejectedValue( + Object.assign(new Error('Not found'), { + url: 'https://api.copilot/files/asm-1', + status: 404, + statusText: 'Not Found', + body: { message: 'Not found' }, + }), + ) + + await retryFailedSyncsForPortal('p1', [makeRow()]) + + expect(completePendingAssemblyCreate).toHaveBeenCalledTimes(1) + expect(markFailure).not.toHaveBeenCalled() + expect(deleteFile).not.toHaveBeenCalled() + }) + + it('skips the reconcile check and creates directly when the row has no assemblyFileId', async () => { + await retryFailedSyncsForPortal('p1', [makeRow({ assemblyFileId: null })]) + + expect(retrieveFile).not.toHaveBeenCalled() + expect(completePendingAssemblyCreate).toHaveBeenCalledTimes(1) + }) + + it('soft-deletes the row when the Dropbox source no longer exists', async () => { + filesGetMetadata.mockResolvedValue({ result: { '.tag': 'deleted', name: 'file.txt' } }) + + await retryFailedSyncsForPortal('p1', [makeRow()]) + + expect(markDeleted).toHaveBeenCalledWith('row-1') + expect(retrieveFile).not.toHaveBeenCalled() + expect(completePendingAssemblyCreate).not.toHaveBeenCalled() + }) +}) diff --git a/src/features/workers/resync-failed-files/helper/resync-failed-files.helper.ts b/src/features/workers/resync-failed-files/helper/resync-failed-files.helper.ts index 69a2e30..7cc6939 100644 --- a/src/features/workers/resync-failed-files/helper/resync-failed-files.helper.ts +++ b/src/features/workers/resync-failed-files/helper/resync-failed-files.helper.ts @@ -1,9 +1,9 @@ -import { and } from 'drizzle-orm' +import { and, eq } from 'drizzle-orm' import { DropboxResponseError } from 'dropbox' import env from '@/config/server.env' import db from '@/db' import { PendingAction, PendingActionTarget } from '@/db/constants' -import type { FileSyncSelectType } from '@/db/schema/fileFolderSync.schema' +import { type FileSyncSelectType, fileFolderSync } from '@/db/schema/fileFolderSync.schema' import APIError from '@/errors/APIError' import { MapFilesService } from '@/features/sync/lib/MapFiles.service' import { SyncService } from '@/features/sync/lib/Sync.service' @@ -24,6 +24,9 @@ type PortalDeps = { mapFilesService: MapFilesService } +// Pending uploads older than this are treated as abandoned and reclaimed. +const STUCK_PENDING_THRESHOLD_MS = 24 * 60 * 60 * 1000 + /** Entry point for the scheduled sweep — per-row errors are recorded, never rethrown. */ export const retryFailedSyncsForPortal = async (portalId: string, rows: FileSyncSelectType[]) => { const connection = await getDropboxConnection(portalId) @@ -119,9 +122,7 @@ const retryDeleteInDropbox = async (failedSync: FileSyncSelectType, deps: Portal return } - const channelSync = await db.query.channelSync.findFirst({ - where: (channelSync, { eq }) => eq(channelSync.id, failedSync.channelSyncId), - }) + const channelSync = await getChannelSync(failedSync.channelSyncId) if (!channelSync) { logger.warn('retryDeleteInDropbox :: channelSync missing, soft-deleting row', { rowId: failedSync.id, @@ -170,9 +171,7 @@ const retryCreateInDropbox = async (failedSync: FileSyncSelectType, deps: Portal return } - const channelSync = await db.query.channelSync.findFirst({ - where: (channelSync, { eq }) => eq(channelSync.id, failedSync.channelSyncId), - }) + const channelSync = await getChannelSync(failedSync.channelSyncId) if (!channelSync) { await mapFilesService.markFailure(failedSync.id, 'retryCreateInDropbox: channelSync missing') return @@ -228,9 +227,7 @@ const retryCreateInAssembly = async (failedSync: FileSyncSelectType, deps: Porta return } - const channelSync = await db.query.channelSync.findFirst({ - where: (channelSync, { eq }) => eq(channelSync.id, failedSync.channelSyncId), - }) + const channelSync = await getChannelSync(failedSync.channelSyncId) if (!channelSync) { await mapFilesService.markFailure(failedSync.id, 'retryCreateInAssembly: channelSync missing') return @@ -241,9 +238,17 @@ const retryCreateInAssembly = async (failedSync: FileSyncSelectType, deps: Porta await mapFilesService.markDeleted(failedSync.id) return } - - // Bypass syncDropboxFilesToAssembly: it would no-op because insertCreatePending sees the existing row. const entry = DropboxFileListFolderSingleEntrySchema.parse(dbxMeta) + + const shouldRecreate = await reconcileExistingAssemblyFile( + failedSync, + channelSync.id, + entry, + deps, + ) + if (!shouldRecreate) return + + // Bypass syncDropboxFilesToAssembly — it would no-op against the existing row. await syncService.completePendingAssemblyCreate({ pendingRowId: failedSync.id, itemPath: failedSync.itemPath, @@ -253,6 +258,66 @@ const retryCreateInAssembly = async (failedSync: FileSyncSelectType, deps: Porta }) } +// A row's early-stamped assemblyFileId may already point at a live file. +// Returns true if the caller should (re-)create, false if already handled. +const reconcileExistingAssemblyFile = async ( + failedSync: FileSyncSelectType, + channelSyncId: string, + entry: ReturnType, + deps: PortalDeps, +): Promise => { + const { copilotApi, mapFilesService } = deps + + if (!failedSync.assemblyFileId) return true + + // 404 → file is already gone; proceed to the create path. Other errors + // propagate so the task can retry instead of being silently swallowed. + const existing = await copilotApi.retrieveFile(failedSync.assemblyFileId).catch((err) => { + if (isCopilotApiError(err) && err.status === 404) return null + throw err + }) + + if (!existing) return true + + if (existing.status === 'completed') { + // Last attempt finished but never reached markUpdated — reconcile. + await mapFilesService.markUpdated(failedSync.id, { + assemblyFileId: failedSync.assemblyFileId, + contentHash: entry.content_hash ?? null, + }) + await mapFilesService.updateChannelMapSyncedFilesCount(channelSyncId) + logger.info('retryCreateInAssembly :: reconciled existing complete Assembly file', { + rowId: failedSync.id, + assemblyFileId: failedSync.assemblyFileId, + }) + return false + } + + // Still pending and young — likely a legit in-flight upload, retry later. + const ageMs = Date.now() - failedSync.createdAt.getTime() + if (ageMs < STUCK_PENDING_THRESHOLD_MS) { + await mapFilesService.markFailure( + failedSync.id, + 'retryCreateInAssembly: Assembly file still pending upload, will retry next sweep', + ) + return false + } + + // Abandoned. Null the id before deleting so the file.deleted webhook can't + // match this row and race the re-create. + await mapFilesService.updateFileMap( + { assemblyFileId: null }, + eq(fileFolderSync.id, failedSync.id), + ) + await copilotApi.deleteFile(failedSync.assemblyFileId) + logger.info('retryCreateInAssembly :: deleted stale pending Assembly file before re-create', { + rowId: failedSync.id, + assemblyFileId: failedSync.assemblyFileId, + ageMs, + }) + return true +} + const getDropboxConnection = async (portalId: string) => { const connection = await db.query.dropboxConnections.findFirst({ where: (dropboxConnections, { eq }) => @@ -289,6 +354,11 @@ const initializeSyncDependencies = async ( return { user, copilotApi, dbxClient, connectionToken, syncService, mapFilesService } } +const getChannelSync = (channelSyncId: string) => + db.query.channelSync.findFirst({ + where: (channelSync, { eq }) => eq(channelSync.id, channelSyncId), + }) + const getFileFromDropbox = async (dbxClient: DropboxClient, dropboxFileId: string) => { if (!dropboxFileId) return null try {