Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions src/features/sync/lib/MapFiles.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,29 @@ export class MapFilesService extends AuthenticatedDropboxService {
return results
}

async insertFileMap(payload: FileSyncCreateType): Promise<FileSyncSelectType> {
/**
* Insert a file/folder map row; returns null on conflict. Parallel siblings
* race to create their shared parent folder and get the same assemblyFileId,
* so onConflictDoNothing dedupes the second insert (OUT-3800). WHERE must
* mirror the partial unique index predicate in fileFolderSync.schema.ts.
*/
async insertFileMap(payload: FileSyncCreateType): Promise<FileSyncSelectType | null> {
logger.info('MapFilesService#insertFileMap :: Inserting file map', payload)

const [mappedFile] = await db.insert(fileFolderSync).values(payload).returning()
const [mappedFile] = await db
.insert(fileFolderSync)
.values(payload)
.onConflictDoNothing({
target: [
fileFolderSync.portalId,
fileFolderSync.channelSyncId,
fileFolderSync.assemblyFileId,
],
where: sql`${fileFolderSync.deletedAt} IS NULL AND ${fileFolderSync.assemblyFileId} IS NOT NULL`,
})
.returning()
logger.info('MapFilesService#insertFileMap :: Inserted file map', mappedFile)
return mappedFile
return mappedFile ?? null
}

/** Pre-insert a create-pending tombstone row. Returns null if the conflict target already has a live row. */
Expand Down Expand Up @@ -293,13 +310,24 @@ export class MapFilesService extends AuthenticatedDropboxService {
channelSyncId,
)

const [mappedFile] = await this.getAllFileMaps(
const mappedFiles = await this.getAllFileMaps(
and(
eq(fileFolderSync.channelSyncId, channelSyncId),
eq(fileFolderSync.itemPath, dbxPath),
isNotNull(fileFolderSync.assemblyFileId),
) as WhereClause,
)
const [mappedFile] = mappedFiles

// A path maps to one live row. >1 means a duplicate slipped through — e.g.
// if Assembly stopped being path-idempotent on folder create.
if (mappedFiles.length > 1) {
logger.warn('MapFilesService#getDbxMappedFileFromPath :: duplicate rows for path', {
channelSyncId,
dbxPath,
assemblyFileIds: mappedFiles.map((f) => f.assemblyFileId),
})
}
logger.info(
'MapFilesService#getDbxMappedFileFromPath :: Got dbx mapped file from path',
mappedFile,
Expand Down
142 changes: 107 additions & 35 deletions src/features/sync/lib/Sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,46 +149,108 @@ export class SyncService extends AuthenticatedDropboxService {
'SyncService#createAndUploadFileToAssembly :: Creating and uploading file to Assembly for channel',
assemblyChannelId,
)
const copilotApi = new CopilotAPI(this.user.token)
const tempFileType = lastItem ? fileObjectType : ObjectType.FOLDER

const isLeafFile = lastItem && fileObjectType === ObjectType.FILE

if (isLeafFile) {
const pending = await this.mapFilesService.insertCreatePending({
await this.createLeafFileInAssembly({ assemblyChannelId, itemPath, channelSyncId, entry })
return
}

await this.createFolderInAssembly({
assemblyChannelId,
itemPath,
lastItem,
tempFileType: lastItem ? fileObjectType : ObjectType.FOLDER,
channelSyncId,
entry,
basePath,
})
}

/**
* Leaf-file create: pre-insert a create-pending row (which dedupes concurrent
* siblings via insertCreatePending's onConflict), then drive the Assembly
* create against that row, marking failure if it throws.
*/
private async createLeafFileInAssembly(params: {
assemblyChannelId: string
itemPath: string
channelSyncId: string
entry: DropboxFileListFolderSingleEntry
}): Promise<void> {
const { assemblyChannelId, itemPath, channelSyncId, entry } = params

const pending = await this.mapFilesService.insertCreatePending({
channelSyncId,
itemPath,
object: ObjectType.FILE,
target: PendingActionTarget.ASSEMBLY,
assemblyFileId: null,
dbxFileId: entry.id,
})

if (!pending) {
logger.info('SyncService#createLeafFileInAssembly :: race lost, skipping', {
channelSyncId,
itemPath,
object: ObjectType.FILE,
target: PendingActionTarget.ASSEMBLY,
assemblyFileId: null,
dbxFileId: entry.id,
itemPath,
})
return
}

if (!pending) {
logger.info('createAndUploadFileToAssembly :: race lost, skipping', {
channelSyncId,
dbxFileId: entry.id,
itemPath,
})
return
}
try {
await this.completePendingAssemblyCreate({
pendingRowId: pending.id,
itemPath,
assemblyChannelId,
channelSyncId,
entry,
})
} catch (error) {
await this.mapFilesService.markFailure(pending.id, normalizeError(error))
throw error
}
}

try {
await this.completePendingAssemblyCreate({
pendingRowId: pending.id,
itemPath,
assemblyChannelId,
/** Folder create: pre-check skips redundant creates, insertFileMap's onConflict is the race net (OUT-3800). */
private async createFolderInAssembly(params: {
assemblyChannelId: string
itemPath: string
lastItem: boolean
tempFileType: ObjectTypeValue
channelSyncId: string
entry: DropboxFileListFolderSingleEntry
basePath: string
}): Promise<void> {
const { assemblyChannelId, itemPath, lastItem, tempFileType, channelSyncId, entry, basePath } =
params

try {
// A sibling may have already created this folder; skip the redundant create.
const existingFolder = await this.mapFilesService.getDbxMappedFileFromPath(
itemPath,
channelSyncId,
)
if (existingFolder) {
logger.info(
'SyncService#createFolderInAssembly :: folder already mapped, skipping create',
{
channelSyncId,
itemPath,
},
)
// If this entry is the folder itself, make sure its dbxFileId is stamped.
await this.handleFolderCreatedCase(
lastItem,
tempFileType,
channelSyncId,
entry,
})
} catch (error) {
await this.mapFilesService.markFailure(pending.id, normalizeError(error))
throw error
basePath,
entry.id,
)
return
}
return
}

try {
const copilotApi = new CopilotAPI(this.user.token)
const fileCreateResponse = await copilotApi.createFile(
itemPath,
assemblyChannelId,
Expand All @@ -200,14 +262,24 @@ export class SyncService extends AuthenticatedDropboxService {
object: tempFileType,
assemblyFileId: fileCreateResponse.id,
portalId: this.user.portalId,
dbxFileId: lastItem ? entry.id : null,
}

await this.mapFilesService.insertFileMap({
...filePayload,
dbxFileId: lastItem ? entry.id : null,
})
const inserted = await this.mapFilesService.insertFileMap(filePayload)
Comment thread
SandipBajracharya marked this conversation as resolved.

await this.mapFilesService.updateChannelMapSyncedFilesCount(channelSyncId)
if (inserted) {
await this.mapFilesService.updateChannelMapSyncedFilesCount(channelSyncId)
} else {
// Insert lost the race. If this is the folder entry itself, stamp dbxFileId:
// needed if the winner wrote null (intermediate segment), else a no-op.
await this.handleFolderCreatedCase(
lastItem,
tempFileType,
channelSyncId,
basePath,
entry.id,
)
}
} catch (error: unknown) {
Comment thread
SandipBajracharya marked this conversation as resolved.
if (
isCopilotApiError(error) &&
Expand All @@ -225,7 +297,7 @@ export class SyncService extends AuthenticatedDropboxService {
return
}
console.error(
`SyncService#createAndUploadFileToAssembly. Upload failed. Channel ID: ${assemblyChannelId}. Path: ${itemPath}`,
`SyncService#createFolderInAssembly. Upload failed. Channel ID: ${assemblyChannelId}. Path: ${itemPath}`,
)
throw error
}
Expand Down
49 changes: 48 additions & 1 deletion src/features/sync/lib/__tests__/MapFiles.tombstone.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ const returningSpy = vi.fn()
// For .insert(...).values(...).onConflictDoNothing(...).returning()
const valuesSpy = vi.fn()
const onConflictSpy = vi.fn()
// Controls what the insert chain's .returning() resolves to, so tests can
// simulate both a successful insert and an onConflictDoNothing no-op (empty).
let insertReturning: unknown[] = [{ id: 'row-1' }]

vi.mock('@/db', () => {
const builder = {
Expand All @@ -34,7 +37,7 @@ vi.mock('@/db', () => {
onConflictSpy(opts)
return insertBuilder
},
returning: () => Promise.resolve([{ id: 'row-1' }]),
returning: () => Promise.resolve(insertReturning),
}
return {
default: {
Expand Down Expand Up @@ -76,6 +79,7 @@ beforeEach(() => {
returningSpy.mockClear()
valuesSpy.mockClear()
onConflictSpy.mockClear()
insertReturning = [{ id: 'row-1' }]
service = new MapFilesService(user, connectionToken)
})

Expand Down Expand Up @@ -189,6 +193,49 @@ describe('markUpdated', () => {
})
})

describe('insertFileMap', () => {
const folderPayload = {
portalId,
channelSyncId: 'cs-1',
itemPath: '/abc',
object: ObjectType.FOLDER,
assemblyFileId: '00000000-0000-0000-0000-000000000001',
dbxFileId: null,
}

it('dedupes on (portal, channel, assemblyFileId) with the partial-index predicate', async () => {
await service.insertFileMap(folderPayload)

expect(onConflictSpy).toHaveBeenCalledTimes(1)
const opts = onConflictSpy.mock.calls[0][0] as { target: unknown; where: unknown }

// Conflict target must be the existing assembly unique index columns.
const targetCols = (opts.target as unknown[]).map((c) => (c as { name: string }).name).join(' ')
expect(targetCols).toMatch(/portal[_]?id/i)
expect(targetCols).toMatch(/channel[_]?sync[_]?id/i)
expect(targetCols).toMatch(/assembly[_]?file[_]?id/i)

// WHERE must mirror the partial index predicate exactly.
const where = sqlText(opts.where)
expect(where).toMatch(/deleted[_]?at/i)
expect(where).toMatch(/IS NULL/i)
expect(where).toMatch(/assembly[_]?file[_]?id/i)
expect(where).toMatch(/IS NOT NULL/i)
})

it('returns the inserted row when no conflict', async () => {
insertReturning = [{ id: 'row-1' }]
const result = await service.insertFileMap(folderPayload)
expect(result).toEqual({ id: 'row-1' })
})

it('returns null when the row already exists (onConflictDoNothing no-op)', async () => {
insertReturning = []
const result = await service.insertFileMap(folderPayload)
expect(result).toBeNull()
})
})

describe('insertCreatePending', () => {
it('stamps pendingActionLastAttemptAt = NOW() so the sweeper backoff guards in-flight rows', async () => {
await service.insertCreatePending({
Expand Down
Loading