Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions backend/shared/query/__tests__/slowQueryMonitor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import {
SlowQueryMonitor,
fingerprintSql,
SlowQueryEvent,
QueryClient,
} from '../slowQueryMonitor';

/**
* Fake client backed by a controllable clock. `t` is the monitor's "now"; each
* query advances it by the next value in `durations`, so the measured duration
* is fully deterministic.
*/
function makeHarness(durations: number[], opts?: { rows?: number; throwOn?: number }) {
let t = 0;
let call = 0;
const now = () => t;
const client: QueryClient = {
async query<T>(): Promise<{ rows: T[] }> {
const idx = call;
call += 1;
t += durations[idx] ?? 0;
if (opts?.throwOn === idx) {
throw new Error('boom');
}
const rows = Array.from({ length: opts?.rows ?? 0 }, () => ({})) as T[];
return { rows };
},
};
return { client, now };
}

describe('fingerprintSql', () => {
it('groups queries that differ only in whitespace/comments', () => {
const a = fingerprintSql('SELECT * FROM usage_alerts\n WHERE subscription_id = $1');
const b = fingerprintSql('-- hot path\nSELECT * FROM usage_alerts WHERE subscription_id = $1');
expect(a).toBe(b);
});
});

describe('SlowQueryMonitor', () => {
it('passes rows through unchanged', async () => {
const { client, now } = makeHarness([5], { rows: 3 });
const monitor = new SlowQueryMonitor(client, { now });
const result = await monitor.query('SELECT 1');
expect(result.rows).toHaveLength(3);
});

it('computes p50/p95/p99 per fingerprint', async () => {
const durations = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100];
const { client, now } = makeHarness(durations);
const monitor = new SlowQueryMonitor(client, { now, slowThresholdMs: 1000 });

for (let i = 0; i < durations.length; i += 1) {
await monitor.query('SELECT * FROM subscriptions WHERE user_id = $1', ['u']);
}

const stats = monitor.getStats();
expect(stats).toHaveLength(1);
expect(stats[0].count).toBe(10);
expect(stats[0].maxMs).toBe(100);
expect(stats[0].p50Ms).toBe(50);
expect(stats[0].p95Ms).toBe(100);
expect(stats[0].p99Ms).toBe(100);
expect(stats[0].slowCount).toBe(0);
});

it('fires onSlowQuery only at/above the threshold', async () => {
const events: SlowQueryEvent[] = [];
const { client, now } = makeHarness([50, 100, 150]);
const monitor = new SlowQueryMonitor(client, {
now,
slowThresholdMs: 100,
onSlowQuery: (e) => events.push(e),
});

await monitor.query('SELECT 1'); // 50ms — fast
await monitor.query('SELECT 2'); // 100ms — slow (>= threshold)
await monitor.query('SELECT 3'); // 150ms — slow

expect(events).toHaveLength(2);
expect(events[0].durationMs).toBe(100);
expect(events[1].durationMs).toBe(150);
expect(events.every((e) => !e.failed)).toBe(true);
});

it('ranks the slowest patterns first via getTopSlow', async () => {
const { client, now } = makeHarness([10, 500]);
const monitor = new SlowQueryMonitor(client, { now, slowThresholdMs: 1000 });

await monitor.query('SELECT * FROM plans');
await monitor.query('SELECT * FROM transactions WHERE user_id = $1', ['u']);

const top = monitor.getTopSlow(1);
expect(top).toHaveLength(1);
expect(top[0].sample).toContain('transactions');
expect(top[0].p95Ms).toBe(500);
});

it('records timing and rethrows on query failure', async () => {
const events: SlowQueryEvent[] = [];
const { client, now } = makeHarness([200], { throwOn: 0 });
const monitor = new SlowQueryMonitor(client, {
now,
slowThresholdMs: 100,
onSlowQuery: (e) => events.push(e),
});

await expect(monitor.query('SELECT * FROM broken')).rejects.toThrow('boom');

const stats = monitor.getStats();
expect(stats[0].count).toBe(1);
expect(stats[0].maxMs).toBe(200);
expect(events).toHaveLength(1);
expect(events[0].failed).toBe(true);
});

it('reset clears collected stats', async () => {
const { client, now } = makeHarness([10]);
const monitor = new SlowQueryMonitor(client, { now });
await monitor.query('SELECT 1');
expect(monitor.getStats()).toHaveLength(1);
monitor.reset();
expect(monitor.getStats()).toHaveLength(0);
});
});
178 changes: 178 additions & 0 deletions backend/shared/query/slowQueryMonitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/**
* SlowQueryMonitor
*
* Transparent wrapper around a pg-style query client that times every query,
* groups timings by a normalized SQL fingerprint, exposes per-pattern latency
* percentiles (p50/p95/p99), and fires an alert callback when a query exceeds a
* configurable slow threshold.
*
* It is the runtime counterpart to the offline pg_stat_statements profiling
* documented in db/QUERY_OPTIMIZATION.md: use `getTopSlow(20)` to surface the
* 20 slowest query patterns and `onSlowQuery` to wire alerting.
*
* Usage:
* const monitor = new SlowQueryMonitor(pool, {
* slowThresholdMs: 100,
* onSlowQuery: (e) => logger.warn('slow query', e),
* });
* await monitor.query('SELECT ...', [userId]); // drop-in for pool.query
* const worst = monitor.getTopSlow(20);
*/

import { normalizeSql } from '../db/queryClassifier';

export interface QueryClient {
query<T = unknown>(sql: string, params?: unknown[]): Promise<{ rows: T[] }>;
}

export interface SlowQueryEvent {
fingerprint: string;
sql: string;
durationMs: number;
rowCount: number;
failed: boolean;
timestamp: number;
}

export interface QueryStat {
fingerprint: string;
sample: string;
count: number;
slowCount: number;
totalMs: number;
maxMs: number;
p50Ms: number;
p95Ms: number;
p99Ms: number;
}

export interface SlowQueryMonitorOptions {
/** Queries at or above this duration fire `onSlowQuery`. Default 100ms. */
slowThresholdMs?: number;
/** Ring-buffer size of retained durations per fingerprint. Default 1000. */
maxSamplesPerQuery?: number;
/** Alerting hook invoked for every slow query. */
onSlowQuery?: (event: SlowQueryEvent) => void;
/** Injectable monotonic clock (ms). Defaults to Date.now for production. */
now?: () => number;
}

interface Bucket {
sample: string;
count: number;
slowCount: number;
totalMs: number;
maxMs: number;
durations: number[];
}

const DEFAULT_SLOW_THRESHOLD_MS = 100;
const DEFAULT_MAX_SAMPLES = 1000;

/**
* Collapse a SQL string into a stable grouping key: strip comments (via the
* shared normalizer), collapse all runs of whitespace, and trim. Queries here
* are parameterized ($1, $2 …) so the residual text is stable per call site.
*/
export function fingerprintSql(sql: string): string {
return normalizeSql(sql).replace(/\s+/g, ' ').trim();
}

function percentile(sortedAsc: number[], p: number): number {
if (sortedAsc.length === 0) return 0;
const rank = Math.ceil((p / 100) * sortedAsc.length) - 1;
const index = Math.min(Math.max(rank, 0), sortedAsc.length - 1);
return sortedAsc[index];
}

export class SlowQueryMonitor implements QueryClient {
private readonly client: QueryClient;
private readonly slowThresholdMs: number;
private readonly maxSamplesPerQuery: number;
private readonly onSlowQuery?: (event: SlowQueryEvent) => void;
private readonly now: () => number;
private readonly buckets = new Map<string, Bucket>();

constructor(client: QueryClient, options: SlowQueryMonitorOptions = {}) {
this.client = client;
this.slowThresholdMs = options.slowThresholdMs ?? DEFAULT_SLOW_THRESHOLD_MS;
this.maxSamplesPerQuery = options.maxSamplesPerQuery ?? DEFAULT_MAX_SAMPLES;
this.onSlowQuery = options.onSlowQuery;
this.now = options.now ?? Date.now;
}

async query<T = unknown>(sql: string, params?: unknown[]): Promise<{ rows: T[] }> {
const start = this.now();
let rowCount = 0;
let failed = false;
try {
const result = await this.client.query<T>(sql, params);
rowCount = result.rows.length;
return result;
} catch (error) {
failed = true;
throw error;
} finally {
const durationMs = this.now() - start;
this.record(sql, durationMs, rowCount, failed);
}
}

private record(sql: string, durationMs: number, rowCount: number, failed: boolean): void {
const fingerprint = fingerprintSql(sql);
let bucket = this.buckets.get(fingerprint);
if (!bucket) {
bucket = { sample: sql, count: 0, slowCount: 0, totalMs: 0, maxMs: 0, durations: [] };
this.buckets.set(fingerprint, bucket);
}

bucket.count += 1;
bucket.totalMs += durationMs;
if (durationMs > bucket.maxMs) bucket.maxMs = durationMs;
bucket.durations.push(durationMs);
if (bucket.durations.length > this.maxSamplesPerQuery) {
bucket.durations.shift();
}

if (durationMs >= this.slowThresholdMs) {
bucket.slowCount += 1;
this.onSlowQuery?.({
fingerprint,
sql,
durationMs,
rowCount,
failed,
timestamp: this.now(),
});
}
}

/** Per-pattern latency stats, sorted by p95 descending. */
getStats(): QueryStat[] {
const stats: QueryStat[] = [];
for (const [fingerprint, bucket] of this.buckets) {
const sorted = [...bucket.durations].sort((a, b) => a - b);
stats.push({
fingerprint,
sample: bucket.sample,
count: bucket.count,
slowCount: bucket.slowCount,
totalMs: bucket.totalMs,
maxMs: bucket.maxMs,
p50Ms: percentile(sorted, 50),
p95Ms: percentile(sorted, 95),
p99Ms: percentile(sorted, 99),
});
}
return stats.sort((a, b) => b.p95Ms - a.p95Ms);
}

/** The `limit` slowest query patterns by p95 (default 20). */
getTopSlow(limit = 20): QueryStat[] {
return this.getStats().slice(0, limit);
}

reset(): void {
this.buckets.clear();
}
}
Loading