Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- Fixed repos not being re-indexed when their zoekt index shards are missing from disk (e.g., when the index directory is stored on ephemeral storage) while the database still marks them as indexed. [#1304](https://github.com/sourcebot-dev/sourcebot/pull/1304)

## [5.0.2] - 2026-06-11

### Changed
Expand Down
131 changes: 131 additions & 0 deletions packages/backend/src/repoIndexManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ vi.mock('@sourcebot/shared', () => ({
vi.mock('./constants.js', () => ({
WORKER_STOP_GRACEFUL_TIMEOUT_MS: 5000,
INDEX_CACHE_DIR: 'test-data/index',
REPOS_CACHE_DIR: 'test-data/repos',
}));

vi.mock('./git.js', () => ({
Expand Down Expand Up @@ -65,6 +66,13 @@ vi.mock('./posthog.js', () => ({
vi.mock('./utils.js', () => ({
getAuthCredentialsForRepo: vi.fn().mockResolvedValue(null),
getShardPrefix: vi.fn((orgId: number, repoId: number) => `${orgId}_${repoId}`),
getRepoIdFromShardFileName: vi.fn((fileName: string) => {
const match = fileName.match(/^(\d+)_(\d+)_/);
if (!match) {
return undefined;
}
return parseInt(match[2], 10);
}),
measure: vi.fn(async (cb: () => Promise<unknown>) => {
const data = await cb();
return { data, durationMs: 100 };
Expand Down Expand Up @@ -148,6 +156,7 @@ const createMockPrisma = () => {
repo: {
findMany: vi.fn().mockResolvedValue([]),
update: vi.fn(),
updateMany: vi.fn(),
delete: vi.fn(),
},
repoIndexingJob: {
Expand Down Expand Up @@ -783,6 +792,128 @@ describe('RepoIndexManager', () => {
});
});

describe('Missing Shard Reconciliation', () => {
const indexedRepo = (id: number, name: string) => createMockRepo({
id,
name,
indexedAt: new Date(),
indexedCommitHash: 'abc123',
});

test('clears indexedAt for indexed repos whose shard files are missing on startup', async () => {
(existsSync as Mock).mockImplementation((path: string) => path === 'test-data/index');
// Repo 1 has a shard on disk; repo 2 does not.
(readdir as Mock).mockResolvedValue(['1_1_v16.00000.zoekt']);
(mockPrisma.repo.findMany as Mock).mockResolvedValue([
indexedRepo(1, 'repo-with-shard'),
indexedRepo(2, 'repo-missing-shard'),
]);

manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
await manager.startScheduler();

expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith({
where: { id: { in: [2] } },
data: { indexedAt: null },
});
});

test('does not touch repos when all shards are present', async () => {
(existsSync as Mock).mockImplementation((path: string) => path === 'test-data/index');
(readdir as Mock).mockResolvedValue(['1_1_v16.00000.zoekt', '1_2_v16.00000.zoekt']);
(mockPrisma.repo.findMany as Mock).mockResolvedValue([
indexedRepo(1, 'repo-1'),
indexedRepo(2, 'repo-2'),
]);

manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
await manager.startScheduler();

expect(mockPrisma.repo.updateMany).not.toHaveBeenCalled();
});

test('marks all indexed repos as stale when the index directory is missing', async () => {
(existsSync as Mock).mockReturnValue(false);
(mockPrisma.repo.findMany as Mock).mockResolvedValue([
indexedRepo(1, 'repo-1'),
indexedRepo(2, 'repo-2'),
]);

manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
await manager.startScheduler();

expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith({
where: { id: { in: [1, 2] } },
data: { indexedAt: null },
});
});

test('does not count temporary shard files as valid shards', async () => {
(existsSync as Mock).mockImplementation((path: string) => path === 'test-data/index');
(readdir as Mock).mockResolvedValue(['1_2_v16.00000.zoekt123.tmp']);
(mockPrisma.repo.findMany as Mock).mockResolvedValue([
indexedRepo(2, 'repo-with-only-tmp-shard'),
]);

manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
await manager.startScheduler();

expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith({
where: { id: { in: [2] } },
data: { indexedAt: null },
});
});

test('only considers repos that are indexed, non-empty, and connected', async () => {
(existsSync as Mock).mockImplementation((path: string) => path === 'test-data/index');
(readdir as Mock).mockResolvedValue([]);
(mockPrisma.repo.findMany as Mock).mockResolvedValue([]);

manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
await manager.startScheduler();

// The reconciliation query must exclude unindexed repos (nothing to mark),
// empty repos (indexing completes without producing a shard), and
// unconnected repos (clearing indexedAt would bypass the GC grace period).
expect(mockPrisma.repo.findMany).toHaveBeenCalledWith(
expect.objectContaining({
where: expect.objectContaining({
indexedAt: { not: null },
indexedCommitHash: { not: null },
connections: { some: {} },
}),
})
);

expect(mockPrisma.repo.updateMany).not.toHaveBeenCalled();
});

test('reconciles on every scheduler poll, not just startup', async () => {
(existsSync as Mock).mockImplementation((path: string) => path === 'test-data/index');
(readdir as Mock).mockResolvedValue(['1_1_v16.00000.zoekt']);
(mockPrisma.repo.findMany as Mock).mockResolvedValue([]);

manager = new RepoIndexManager(mockPrisma, mockSettings, mockRedis, mockPromClient as any);
await manager.startScheduler();

// Simulate the index directory being wiped while the worker is running,
// with repo 1 still marked as indexed in the DB.
(readdir as Mock).mockResolvedValue([]);
(mockPrisma.repo.findMany as Mock).mockResolvedValue([
indexedRepo(1, 'repo-1'),
]);

const { setIntervalAsync } = await import('./utils.js');
const tick = (setIntervalAsync as Mock).mock.calls[0][0];
await tick();

expect(mockPrisma.repo.updateMany).toHaveBeenCalledWith({
where: { id: { in: [1] } },
data: { indexedAt: null },
});
});
});

describe('latestIndexingJobStatus Updates', () => {
test('sets latestIndexingJobStatus to IN_PROGRESS when job starts', async () => {
const repo = createMockRepoWithConnections();
Expand Down
62 changes: 62 additions & 0 deletions packages/backend/src/repoIndexManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ export class RepoIndexManager {
logger.debug('Starting scheduler');
// Cleanup any orphaned disk resources on startup
await this.cleanupOrphanedDiskResources();
await this.markReposWithMissingShardsAsStale();
this.interval = setIntervalAsync(async () => {
await this.markReposWithMissingShardsAsStale();
await this.scheduleIndexJobs();
await this.scheduleCleanupJobs();
}, this.settings.reindexRepoPollingIntervalMs);
Expand Down Expand Up @@ -682,6 +684,66 @@ export class RepoIndexManager {
}
}

// Detects repos that are marked as indexed in the database but have no
// index shards on disk (e.g., because the index directory lives on
// ephemeral storage and was lost), and clears their `indexedAt` so the
// scheduler re-indexes them. This is the inverse of
// `cleanupOrphanedDiskResources`.
private async markReposWithMissingShardsAsStale() {
// @note: the DB is queried *before* the disk is scanned so that a repo
// whose first index job completes between the two reads is not falsely
// marked as stale (its shard is guaranteed to be visible by the time it
// appears in the query result).
//
// Empty repositories are excluded (via `indexedCommitHash`) since they
// complete indexing without producing a shard. Unconnected repos are
// excluded since clearing `indexedAt` would bypass the garbage
// collection grace period in `scheduleCleanupJobs`.
const indexedRepos = await this.db.repo.findMany({
where: {
indexedAt: { not: null },
indexedCommitHash: { not: null },
connections: { some: {} },
},
select: {
id: true,
name: true,
},
});

if (indexedRepos.length === 0) {
return;
}

const repoIdsWithShards = new Set<number>();
if (existsSync(INDEX_CACHE_DIR)) {
const entries = await readdir(INDEX_CACHE_DIR);
Comment on lines +718 to +720

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Handle ENOENT from the directory scan directly.

If INDEX_CACHE_DIR disappears between Line 719 and Line 720, readdir() throws and this poll aborts instead of recovering by marking repos stale. That is the exact failure mode this reconciliation is meant to survive. Prefer calling readdir() directly and treating ENOENT as “no shards present”.

Suggested fix
-        const repoIdsWithShards = new Set<number>();
-        if (existsSync(INDEX_CACHE_DIR)) {
-            const entries = await readdir(INDEX_CACHE_DIR);
-            for (const entry of entries) {
-                // Ignore temporary files (e.g., `.tmp` files from in-flight or
-                // failed indexing operations) - only completed shards count.
-                if (!entry.endsWith('.zoekt')) {
-                    continue;
-                }
-                const repoId = getRepoIdFromShardFileName(entry);
-                if (repoId !== undefined) {
-                    repoIdsWithShards.add(repoId);
-                }
-            }
-        }
+        const repoIdsWithShards = new Set<number>();
+        let entries: string[] = [];
+        try {
+            entries = await readdir(INDEX_CACHE_DIR);
+        } catch (error) {
+            if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
+                throw error;
+            }
+        }
+
+        for (const entry of entries) {
+            // Ignore temporary files (e.g., `.tmp` files from in-flight or
+            // failed indexing operations) - only completed shards count.
+            if (!entry.endsWith('.zoekt')) {
+                continue;
+            }
+            const repoId = getRepoIdFromShardFileName(entry);
+            if (repoId !== undefined) {
+                repoIdsWithShards.add(repoId);
+            }
+        }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const repoIdsWithShards = new Set<number>();
if (existsSync(INDEX_CACHE_DIR)) {
const entries = await readdir(INDEX_CACHE_DIR);
const repoIdsWithShards = new Set<number>();
let entries: string[] = [];
try {
entries = await readdir(INDEX_CACHE_DIR);
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
throw error;
}
}
for (const entry of entries) {
// Ignore temporary files (e.g., `.tmp` files from in-flight or
// failed indexing operations) - only completed shards count.
if (!entry.endsWith('.zoekt')) {
continue;
}
const repoId = getRepoIdFromShardFileName(entry);
if (repoId !== undefined) {
repoIdsWithShards.add(repoId);
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/backend/src/repoIndexManager.ts` around lines 718 - 720, Replace the
two-step existsSync + readdir pattern so the directory scan tolerates a race:
call readdir(INDEX_CACHE_DIR) directly inside a try/catch around the call used
to populate entries (the block that constructs repoIdsWithShards), and if the
catch error.code === 'ENOENT' treat it as an empty directory (i.e. leave
repoIdsWithShards empty) instead of aborting the poll; for other errors rethrow
or log as before. Ensure you update the logic that uses entries to handle the
empty-case correctly.

for (const entry of entries) {
// Ignore temporary files (e.g., `.tmp` files from in-flight or
// failed indexing operations) - only completed shards count.
if (!entry.endsWith('.zoekt')) {
continue;
}
const repoId = getRepoIdFromShardFileName(entry);
if (repoId !== undefined) {
repoIdsWithShards.add(repoId);
}
}
}

const staleRepos = indexedRepos.filter(repo => !repoIdsWithShards.has(repo.id));
if (staleRepos.length === 0) {
return;
}

logger.warn(`Found ${staleRepos.length} repo(s) marked as indexed but with no index shards on disk. Marking as stale for re-indexing: ${staleRepos.map(repo => repo.name).join(', ')}`);

await this.db.repo.updateMany({
where: { id: { in: staleRepos.map(repo => repo.id) } },
data: { indexedAt: null },
});
Comment on lines +702 to +744

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Re-apply the stale-marking preconditions at write time.

staleRepos is derived from a snapshot, but Line 741 later clears indexedAt by id only. If a repo loses its last connection or finishes a reindex after the findMany() but before updateMany(), this write can either bypass the GC grace period or clobber a freshly-written indexedAt, which then triggers an unnecessary extra reindex. The update should re-check the repo is still connected and still has the same indexedAt value that was observed during selection.

Suggested fix
         const indexedRepos = await this.db.repo.findMany({
             where: {
                 indexedAt: { not: null },
                 indexedCommitHash: { not: null },
                 connections: { some: {} },
             },
             select: {
                 id: true,
                 name: true,
+                indexedAt: true,
             },
         });
@@
-        await this.db.repo.updateMany({
-            where: { id: { in: staleRepos.map(repo => repo.id) } },
-            data: { indexedAt: null },
-        });
+        await this.db.$transaction(
+            staleRepos.map((repo) =>
+                this.db.repo.updateMany({
+                    where: {
+                        id: repo.id,
+                        indexedAt: repo.indexedAt,
+                        indexedCommitHash: { not: null },
+                        connections: { some: {} },
+                    },
+                    data: { indexedAt: null },
+                })
+            )
+        );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const indexedRepos = await this.db.repo.findMany({
where: {
indexedAt: { not: null },
indexedCommitHash: { not: null },
connections: { some: {} },
},
select: {
id: true,
name: true,
},
});
if (indexedRepos.length === 0) {
return;
}
const repoIdsWithShards = new Set<number>();
if (existsSync(INDEX_CACHE_DIR)) {
const entries = await readdir(INDEX_CACHE_DIR);
for (const entry of entries) {
// Ignore temporary files (e.g., `.tmp` files from in-flight or
// failed indexing operations) - only completed shards count.
if (!entry.endsWith('.zoekt')) {
continue;
}
const repoId = getRepoIdFromShardFileName(entry);
if (repoId !== undefined) {
repoIdsWithShards.add(repoId);
}
}
}
const staleRepos = indexedRepos.filter(repo => !repoIdsWithShards.has(repo.id));
if (staleRepos.length === 0) {
return;
}
logger.warn(`Found ${staleRepos.length} repo(s) marked as indexed but with no index shards on disk. Marking as stale for re-indexing: ${staleRepos.map(repo => repo.name).join(', ')}`);
await this.db.repo.updateMany({
where: { id: { in: staleRepos.map(repo => repo.id) } },
data: { indexedAt: null },
});
const indexedRepos = await this.db.repo.findMany({
where: {
indexedAt: { not: null },
indexedCommitHash: { not: null },
connections: { some: {} },
},
select: {
id: true,
name: true,
indexedAt: true,
},
});
if (indexedRepos.length === 0) {
return;
}
const repoIdsWithShards = new Set<number>();
if (existsSync(INDEX_CACHE_DIR)) {
const entries = await readdir(INDEX_CACHE_DIR);
for (const entry of entries) {
// Ignore temporary files (e.g., `.tmp` files from in-flight or
// failed indexing operations) - only completed shards count.
if (!entry.endsWith('.zoekt')) {
continue;
}
const repoId = getRepoIdFromShardFileName(entry);
if (repoId !== undefined) {
repoIdsWithShards.add(repoId);
}
}
}
const staleRepos = indexedRepos.filter(repo => !repoIdsWithShards.has(repo.id));
if (staleRepos.length === 0) {
return;
}
logger.warn(`Found ${staleRepos.length} repo(s) marked as indexed but with no index shards on disk. Marking as stale for re-indexing: ${staleRepos.map(repo => repo.name).join(', ')}`);
await this.db.$transaction(
staleRepos.map((repo) =>
this.db.repo.updateMany({
where: {
id: repo.id,
indexedAt: repo.indexedAt,
indexedCommitHash: { not: null },
connections: { some: {} },
},
data: { indexedAt: null },
})
)
);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/backend/src/repoIndexManager.ts` around lines 702 - 744, staleRepos
were selected from a snapshot but the subsequent updateMany only filters by id,
which can clear indexedAt incorrectly if a repo lost its connections or its
indexedAt changed; instead, re-apply the preconditions at write time by updating
only rows that still match both the original indexedAt value and still have
connections: for example iterate staleRepos and for each call
this.db.repo.updateMany (or a single updateMany with a where: { OR: [...] })
where each clause is { id: repo.id, indexedAt: repo.indexedAt, connections: {
some: {} } } so you only set indexedAt: null when the repo still has the same
indexedAt and at least one connection.

}

// Scans the repos and index directories on disk and removes any entries
// that have no corresponding Repo record in the database. This handles
// edge cases where the DB and disk resources are out of sync.
Expand Down