diff --git a/.env.example b/.env.example index bde0ab3..a9a0a84 100644 --- a/.env.example +++ b/.env.example @@ -185,3 +185,21 @@ SLOW_QUERY_POLL_INTERVAL_MS=300000 # Deduplication window per query fingerprint (seconds). A query that was # already alerted on will not fire again within this window. Default: 3600 (1h). SLOW_QUERY_DEDUP_WINDOW_SECONDS=3600 + +# ----------------------------------------------------------------------------- +# Usage Anomaly Detector — 5-minute rolling baseline per developer +# Compares the latest 5-minute window to the mean of the trailing 12 windows. +# When traffic exceeds baseline * multiplier, emits usage.anomaly.detected. +# ----------------------------------------------------------------------------- +# Set to false to disable the background worker. +USAGE_ANOMALY_DETECTOR_ENABLED=true +# Traffic multiplier threshold (default 5x baseline). +USAGE_ANOMALY_MULTIPLIER=5 +# Poll interval in milliseconds (default 300000 = 5 min). +USAGE_ANOMALY_POLL_INTERVAL_MS=300000 +# Window size in milliseconds (default 300000 = 5 min). +USAGE_ANOMALY_WINDOW_MS=300000 +# Number of trailing windows used for the baseline mean (default 12). +USAGE_ANOMALY_BASELINE_WINDOWS=12 +# Optional dedup window per developer/window (defaults to USAGE_ANOMALY_WINDOW_MS). +# USAGE_ANOMALY_DEDUP_WINDOW_MS=300000 diff --git a/README.md b/README.md index cb4de9a..426166e 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ API gateway, usage metering, and billing services for the Callora API marketplac - `POST /api/apis` for authenticated developers to register an API with priced endpoints - Usage route: `GET /api/usage` - Admin usage anomalies: `GET /api/admin/usage/anomalies` returns per-API daily usage anomalies (z-score spikes/drops) for admin review, filterable by `from`/`to`/`apiId`/`threshold`/`limit` (admin auth + IP allowlist) +- Usage anomaly detector: background worker emits `usage.anomaly.detected` when per-developer 5-minute traffic exceeds a rolling 12-window baseline by a configurable multiplier (see `docs/usage-anomaly-detector.md`) - JSON body parsing plus gateway API key authentication for upstream proxy routes - Per-user global REST rate limiting for authenticated `/api/billing`, `/api/usage`, `/api/developers`, `/api/vault`, and `/api/keys` traffic, with IP fallback for unauthenticated requests - In-memory `VaultRepository` with: diff --git a/docs/usage-anomaly-detector.md b/docs/usage-anomaly-detector.md new file mode 100644 index 0000000..14a1812 --- /dev/null +++ b/docs/usage-anomaly-detector.md @@ -0,0 +1,75 @@ +# Usage Anomaly Detector + +Background worker that compares each developer's latest 5-minute API call +volume against a rolling baseline and emits `usage.anomaly.detected` when +traffic exceeds a configurable multiplier (default **5×**). + +## How it works + +1. Every `USAGE_ANOMALY_POLL_INTERVAL_MS` (default 5 min) the worker scans + developers with recent `usage_events` activity. +2. For each developer, call counts are bucketed into fixed 5-minute windows. +3. **Baseline** = arithmetic mean of the trailing **12** completed windows (configurable). +4. The **most recently completed** 5-minute window is compared to `baseline × multiplier`. +5. When the threshold is exceeded, the worker emits `usage.anomaly.detected` + through the typed event emitter, which fans out to matching developer + webhook subscriptions. + +Missing windows in the series are treated as **zero calls** so quiet periods +do not inflate the baseline. + +## Configuration + +| Variable | Default | Description | +|----------|---------|-------------| +| `USAGE_ANOMALY_DETECTOR_ENABLED` | `true` | Set to `false` to disable the worker | +| `USAGE_ANOMALY_MULTIPLIER` | `5` | Traffic must exceed `baseline × multiplier` | +| `USAGE_ANOMALY_POLL_INTERVAL_MS` | `300000` | Scan interval in ms (5 min) | +| `USAGE_ANOMALY_WINDOW_MS` | `300000` | Window size in ms (5 min) | +| `USAGE_ANOMALY_BASELINE_WINDOWS` | `12` | Trailing windows used for the baseline mean | +| `USAGE_ANOMALY_DEDUP_WINDOW_MS` | `USAGE_ANOMALY_WINDOW_MS` | Suppress duplicate alerts per developer/window | + +## Event payload + +```json +{ + "event": "usage.anomaly.detected", + "timestamp": "2026-06-01T12:05:00.000Z", + "developerId": "dev_123", + "data": { + "windowStart": "2026-06-01T12:00:00.000Z", + "windowEnd": "2026-06-01T12:05:00.000Z", + "currentCalls": 100, + "baselineMean": 10, + "multiplier": 5, + "ratio": 10, + "windowMs": 300000 + } +} +``` + +Developers subscribe by registering a webhook for the `usage.anomaly.detected` +event type. + +## Metrics + +| Metric | Type | Description | +|--------|------|-------------| +| `usage_anomaly_detector_runs_total` | Counter | Total scan cycles completed | +| `usage_anomaly_detector_anomalies_total` | Counter | Total anomaly events emitted | + +## Testing + +```bash +npx jest src/services/anomalyService.test.ts src/workers/anomalyDetector.test.ts +``` + +## Related code + +- `src/services/anomalyService.ts` — detection logic and DB aggregation +- `src/workers/anomalyDetector.ts` — interval job wrapper +- `src/events/event.emitter.ts` — webhook fan-out for `usage.anomaly.detected` + +The admin `GET /api/admin/usage/anomalies` endpoint uses a separate daily +z-score detector (`usageAnomalyDetector.ts`) for retrospective review; this +worker provides real-time per-developer 5-minute spike detection. diff --git a/src/config/env.ts b/src/config/env.ts index 20ab6ca..32c41fe 100644 --- a/src/config/env.ts +++ b/src/config/env.ts @@ -200,6 +200,17 @@ export const envSchema = z SLOW_QUERY_P95_THRESHOLD_MS: z.coerce.number().positive().default(500), SLOW_QUERY_POLL_INTERVAL_MS: z.coerce.number().int().positive().default(300_000), SLOW_QUERY_DEDUP_WINDOW_SECONDS: z.coerce.number().int().positive().default(3600), + + // Usage anomaly detector (5-minute rolling baseline) + USAGE_ANOMALY_DETECTOR_ENABLED: z + .enum(['true', 'false']) + .default('true') + .transform((v) => v === 'true'), + USAGE_ANOMALY_MULTIPLIER: z.coerce.number().positive().default(5), + USAGE_ANOMALY_POLL_INTERVAL_MS: z.coerce.number().int().positive().default(300_000), + USAGE_ANOMALY_WINDOW_MS: z.coerce.number().int().positive().default(300_000), + USAGE_ANOMALY_BASELINE_WINDOWS: z.coerce.number().int().positive().default(12), + USAGE_ANOMALY_DEDUP_WINDOW_MS: z.coerce.number().int().positive().optional(), }) .superRefine((values, ctx) => { if (values.SOROBAN_RPC_ENABLED && !values.SOROBAN_RPC_URL) { diff --git a/src/config/index.ts b/src/config/index.ts index 8fcdf5f..9993980 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -221,4 +221,20 @@ export const config = { enabled: env.MEMORY_ACCOUNTING_ENABLED, thresholdMb: env.MEMORY_ACCOUNTING_THRESHOLD_MB, }, + + slowQueryAlerter: { + webhookUrl: env.SLOW_QUERY_ALERT_WEBHOOK_URL, + p95ThresholdMs: env.SLOW_QUERY_P95_THRESHOLD_MS, + pollIntervalMs: env.SLOW_QUERY_POLL_INTERVAL_MS, + dedupWindowMs: env.SLOW_QUERY_DEDUP_WINDOW_SECONDS * 1000, + }, + + usageAnomalyDetector: { + enabled: env.USAGE_ANOMALY_DETECTOR_ENABLED, + multiplier: env.USAGE_ANOMALY_MULTIPLIER, + pollIntervalMs: env.USAGE_ANOMALY_POLL_INTERVAL_MS, + windowMs: env.USAGE_ANOMALY_WINDOW_MS, + baselineWindows: env.USAGE_ANOMALY_BASELINE_WINDOWS, + dedupWindowMs: env.USAGE_ANOMALY_DEDUP_WINDOW_MS ?? env.USAGE_ANOMALY_WINDOW_MS, + }, } as const; diff --git a/src/events/README.md b/src/events/README.md index 71f7376..83e4726 100644 --- a/src/events/README.md +++ b/src/events/README.md @@ -54,6 +54,24 @@ Used when a developer or consumer balance falls below the configured threshold. } ``` +### `usage.anomaly.detected` + +Emitted by the usage anomaly background worker when a developer's latest +5-minute call volume exceeds the rolling baseline multiplied by the configured +threshold (default 5×). + +```ts +{ + windowStart: string; + windowEnd: string; + currentCalls: number; + baselineMean: number; + multiplier: number; + ratio: number; + windowMs: number; +} +``` + ## Typing Guarantees ```ts @@ -105,5 +123,7 @@ The module registers one built-in listener per documented event: - `new_api_call` - `settlement_completed` - `low_balance_alert` +- `invoice_created` +- `usage.anomaly.detected` Each built-in listener resolves matching webhook subscriptions from `WebhookStore` and forwards the typed payload through `dispatchToAll(...)`. diff --git a/src/events/event.emitter.test.ts b/src/events/event.emitter.test.ts index a4a8336..e1b2c7a 100644 --- a/src/events/event.emitter.test.ts +++ b/src/events/event.emitter.test.ts @@ -31,6 +31,8 @@ describe('calloraEvents', () => { expect(calloraEvents.listenerCount('new_api_call')).toBe(1); expect(calloraEvents.listenerCount('settlement_completed')).toBe(1); expect(calloraEvents.listenerCount('low_balance_alert')).toBe(1); + expect(calloraEvents.listenerCount('invoice_created')).toBe(1); + expect(calloraEvents.listenerCount('usage.anomaly.detected')).toBe(1); }); it('dispatches registered webhook configs with the correct typed payload', async () => { diff --git a/src/events/event.emitter.ts b/src/events/event.emitter.ts index 45ae9b7..b09e336 100644 --- a/src/events/event.emitter.ts +++ b/src/events/event.emitter.ts @@ -6,6 +6,7 @@ import type { NewApiCallData, SettlementCompletedData, InvoiceCreatedData, + UsageAnomalyDetectedData, WebhookPayload, } from '../webhooks/webhook.types.js'; @@ -14,6 +15,7 @@ export interface CalloraEventPayloadMap { settlement_completed: SettlementCompletedData; low_balance_alert: LowBalanceAlertData; invoice_created: InvoiceCreatedData; + 'usage.anomaly.detected': UsageAnomalyDetectedData; } export type CalloraEventName = keyof CalloraEventPayloadMap; @@ -33,6 +35,7 @@ const createListenerSetMap = (): ListenerSetMap => ({ settlement_completed: new Set>(), low_balance_alert: new Set>(), invoice_created: new Set>(), + 'usage.anomaly.detected': new Set>(), }); async function handleEvent( @@ -116,4 +119,7 @@ calloraEvents.on('low_balance_alert', (developerId, data) => { }); calloraEvents.on('invoice_created', (developerId, data) => { return handleEvent('invoice_created', developerId, data); +}); +calloraEvents.on('usage.anomaly.detected', (developerId, data) => { + return handleEvent('usage.anomaly.detected', developerId, data); }); \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 0848c2f..c29f60e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -41,6 +41,7 @@ import { createApiRegistry } from './data/apiRegistry.js'; import { ApiKey } from './types/gateway.js'; import { listingsCache } from './lib/listingsCache.js'; import { createSlowQueryAlerterJob } from './workers/slowQueryAlerter.js'; +import { createAnomalyDetectorJob } from './workers/anomalyDetector.js'; // Helper for Jest/CommonJS compat const isDirectExecution = process.argv[1] && (process.argv[1].endsWith('index.ts') || process.argv[1].endsWith('index.js')); @@ -143,6 +144,18 @@ if (isDirectExecution) { }) : null; + const anomalyDetectorJob = config.usageAnomalyDetector.enabled + ? createAnomalyDetectorJob(pool, { + intervalMs: config.usageAnomalyDetector.pollIntervalMs, + dedupWindowMs: config.usageAnomalyDetector.dedupWindowMs, + config: { + multiplier: config.usageAnomalyDetector.multiplier, + baselineWindows: config.usageAnomalyDetector.baselineWindows, + windowMs: config.usageAnomalyDetector.windowMs, + }, + }) + : null; + const apiKeys = new Map([ ['test-key-1', { key: 'test-key-1', developerId: 'dev_001', apiId: 'api_001' }], ['test-key-2', { key: 'test-key-2', developerId: 'dev_002', apiId: 'api_002' }], @@ -214,6 +227,14 @@ if (isDirectExecution) { awaitIdle: () => slowQueryAlerterJob.awaitIdle(), }); } + + if (anomalyDetectorJob) { + shutdownSubsystems.push({ + name: 'usage-anomaly-detector', + beginShutdown: () => anomalyDetectorJob.beginShutdown(), + awaitIdle: () => anomalyDetectorJob.awaitIdle(), + }); + } app.use('/v1/call', legacyV1DeprecationMiddleware, proxyDrainTracker.middleware); app.use('/v1/call', proxyRouter); @@ -231,6 +252,7 @@ if (isDirectExecution) { settlementReconJob.stop(); idempotencySweeperJob.stop(); slowQueryAlerterJob?.stop(); + anomalyDetectorJob?.stop(); await closeDb(); await Promise.allSettled([ closePgPool(), @@ -264,6 +286,7 @@ if (isDirectExecution) { settlementReconJob.start(); idempotencySweeperJob.start(); slowQueryAlerterJob?.start(); + anomalyDetectorJob?.start(); const server = app.listen(PORT, () => { console.log(`Callora backend listening on http://localhost:${PORT}`); diff --git a/src/metrics.ts b/src/metrics.ts index 9209aa2..c21caec 100644 --- a/src/metrics.ts +++ b/src/metrics.ts @@ -498,6 +498,7 @@ export function resetAllMetrics(): void { idempotencyStoreRows.reset(); gatewayUpstreamBreakerState.reset(); resetSlowQueryAlerterMetrics(); + resetUsageAnomalyDetectorMetrics(); resetReplicaMetrics(); } @@ -623,6 +624,34 @@ export function resetSlowQueryAlerterMetrics(): void { slowQueryAlerterQueriesAboveThreshold.reset(); } +// ── Usage anomaly detector metrics ──────────────────────────────────────────── + +const usageAnomalyDetectorRunsTotal = new client.Counter({ + name: 'usage_anomaly_detector_runs_total', + help: 'Total number of usage anomaly detector scan cycles', +}); + +const usageAnomalyDetectorAnomaliesTotal = new client.Counter({ + name: 'usage_anomaly_detector_anomalies_total', + help: 'Total number of usage anomalies emitted', +}); + +register.registerMetric(usageAnomalyDetectorRunsTotal); +register.registerMetric(usageAnomalyDetectorAnomaliesTotal); + +export function recordUsageAnomalyDetectorRun(): void { + usageAnomalyDetectorRunsTotal.inc(); +} + +export function recordUsageAnomalyDetectorAnomaly(): void { + usageAnomalyDetectorAnomaliesTotal.inc(); +} + +export function resetUsageAnomalyDetectorMetrics(): void { + usageAnomalyDetectorRunsTotal.reset(); + usageAnomalyDetectorAnomaliesTotal.reset(); +} + /** Reset all replica routing metrics. Used in tests to isolate metric state. */ export function resetReplicaMetrics(): void { dbReplicaQueriesTotal.reset(); diff --git a/src/repositories/usageEventsRepository.pg.ts b/src/repositories/usageEventsRepository.pg.ts index 1918f11..83cac94 100644 --- a/src/repositories/usageEventsRepository.pg.ts +++ b/src/repositories/usageEventsRepository.pg.ts @@ -7,10 +7,8 @@ import { type UsageBucket, type GroupBy, } from './usageEventsRepository.js'; - feature/usage-cursor-pagination import { generateCursor, getNextCursor, decodeCursor } from '../lib/pagination.js'; import { readQuery, writeQuery } from '../db.js'; - main export interface CreateUsageEventInput { userId: string; diff --git a/src/services/anomalyService.test.ts b/src/services/anomalyService.test.ts new file mode 100644 index 0000000..4a3f48f --- /dev/null +++ b/src/services/anomalyService.test.ts @@ -0,0 +1,259 @@ +import { + anomalyDedupKey, + buildExpectedWindowStarts, + computeBaselineMean, + createAnomalyDedupStore, + detectDeveloperAnomaly, + floorToWindowStart, + isAnomalousTraffic, + mergeWindowCounts, + runAnomalyScan, + toAnomalyEventData, + validateDetectionConfig, + type WindowCount, +} from './anomalyService.js'; + +const WINDOW_MS = 5 * 60 * 1000; +const BASELINE_WINDOWS = 12; +const MULTIPLIER = 5; + +const config = { + multiplier: MULTIPLIER, + baselineWindows: BASELINE_WINDOWS, + windowMs: WINDOW_MS, +}; + +const windowAt = (index: number, calls: number, anchor: Date): WindowCount => ({ + windowStart: new Date(anchor.getTime() + index * WINDOW_MS), + calls, +}); + +const buildSeries = ( + baselineCalls: number, + currentCalls: number, + anchor = new Date('2026-06-01T11:00:00.000Z'), +): WindowCount[] => { + const now = new Date(anchor.getTime() + (BASELINE_WINDOWS + 1) * WINDOW_MS); + const starts = buildExpectedWindowStarts(now, config); + return starts.map((windowStart, index) => ({ + windowStart, + calls: index < BASELINE_WINDOWS ? baselineCalls : currentCalls, + })); +}; + +describe('anomalyService pure helpers', () => { + it('computes baseline mean over trailing windows', () => { + expect(computeBaselineMean([10, 20, 30])).toBe(20); + expect(computeBaselineMean([])).toBe(0); + }); + + it('floors timestamps to fixed window boundaries', () => { + const date = new Date('2026-06-01T12:07:30.000Z'); + expect(floorToWindowStart(date, WINDOW_MS).toISOString()).toBe( + '2026-06-01T12:05:00.000Z', + ); + }); + + it('flags traffic above multiplier * baseline', () => { + expect(isAnomalousTraffic(10, 51, 5)).toBe(true); + expect(isAnomalousTraffic(10, 50, 5)).toBe(false); + expect(isAnomalousTraffic(0, 1, 5)).toBe(true); + expect(isAnomalousTraffic(0, 0, 5)).toBe(false); + }); + + it('rejects invalid detection config at the boundary', () => { + expect(() => validateDetectionConfig({ ...config, multiplier: 0 })).toThrow( + 'multiplier must be a positive finite number', + ); + expect(() => validateDetectionConfig({ ...config, baselineWindows: 0 })).toThrow( + 'baselineWindows must be a positive integer', + ); + expect(() => isAnomalousTraffic(1, -1, 5)).toThrow('currentCalls must be non-negative'); + }); + + it('fills missing windows with zero counts', () => { + const anchor = new Date('2026-06-01T12:00:00.000Z'); + const expected = [ + anchor, + new Date(anchor.getTime() + WINDOW_MS), + new Date(anchor.getTime() + 2 * WINDOW_MS), + ]; + const merged = mergeWindowCounts(expected, [windowAt(1, 7, anchor)]); + expect(merged).toEqual([ + { windowStart: expected[0], calls: 0 }, + { windowStart: expected[1], calls: 7 }, + { windowStart: expected[2], calls: 0 }, + ]); + }); + + it('detects a 5x baseline spike for one developer', () => { + const finding = detectDeveloperAnomaly('dev_1', buildSeries(10, 51), config); + expect(finding).toMatchObject({ + developerId: 'dev_1', + currentCalls: 51, + baselineMean: 10, + multiplier: 5, + }); + expect(finding?.ratio).toBeCloseTo(5.1, 4); + }); + + it('returns null when traffic stays within the multiplier', () => { + expect(detectDeveloperAnomaly('dev_1', buildSeries(10, 50), config)).toBeNull(); + }); + + it('returns null when there is insufficient history', () => { + const anchor = new Date('2026-06-01T12:00:00.000Z'); + const shortSeries = [windowAt(0, 1, anchor), windowAt(1, 100, anchor)]; + expect(detectDeveloperAnomaly('dev_1', shortSeries, config)).toBeNull(); + }); + + it('builds stable dedup keys and event payloads', () => { + const finding = detectDeveloperAnomaly('dev_1', buildSeries(2, 20), config)!; + const key = anomalyDedupKey('dev_1', finding.windowStart); + expect(key).toContain('dev_1:'); + + const data = toAnomalyEventData(finding, WINDOW_MS); + expect(data.windowEnd).toBe( + new Date(finding.windowStart.getTime() + WINDOW_MS).toISOString(), + ); + expect(data.windowMs).toBe(WINDOW_MS); + }); +}); + +describe('createAnomalyDedupStore', () => { + beforeEach(() => { + jest.useFakeTimers(); + jest.setSystemTime(1_000_000); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it('deduplicates keys within the configured window', () => { + const dedup = createAnomalyDedupStore(1_000); + expect(dedup.has('dev:window')).toBe(false); + dedup.set('dev:window'); + expect(dedup.has('dev:window')).toBe(true); + + jest.setSystemTime(1_001_500); + expect(dedup.has('dev:window')).toBe(false); + }); +}); + +describe('runAnomalyScan', () => { + const anchor = new Date('2026-06-01T11:00:00.000Z'); + const now = new Date(anchor.getTime() + (BASELINE_WINDOWS + 1) * WINDOW_MS); + + const makePool = (options: { + developers?: string[]; + countsByDeveloper?: Record; + }) => ({ + query: jest.fn(async (sql: string, params: unknown[]) => { + if (sql.includes('DISTINCT developer_id')) { + return { rows: (options.developers ?? []).map((developer_id) => ({ developer_id })) }; + } + + const developerId = params[0] as string; + const rows = (options.countsByDeveloper?.[developerId] ?? []).map((window) => ({ + window_start: window.windowStart, + calls: window.calls, + })); + return { rows }; + }), + }); + + it('emits usage.anomaly.detected once per developer window', async () => { + const emit = jest.fn(); + const dedup = createAnomalyDedupStore(WINDOW_MS); + const series = buildSeries(10, 100, anchor); + + const result = await runAnomalyScan({ + pool: makePool({ + developers: ['dev_1'], + countsByDeveloper: { dev_1: series }, + }) as never, + config, + dedup, + now: () => now, + emit: emit as never, + log: { info: jest.fn(), error: jest.fn() }, + }); + + expect(result).toEqual({ + developersScanned: 1, + anomaliesDetected: 1, + anomaliesEmitted: 1, + }); + expect(emit).toHaveBeenCalledWith( + 'usage.anomaly.detected', + 'dev_1', + expect.objectContaining({ + currentCalls: 100, + baselineMean: 10, + multiplier: MULTIPLIER, + }), + ); + + emit.mockClear(); + const secondPass = await runAnomalyScan({ + pool: makePool({ + developers: ['dev_1'], + countsByDeveloper: { dev_1: series }, + }) as never, + config, + dedup, + now: () => now, + emit: emit as never, + log: { info: jest.fn(), error: jest.fn() }, + }); + + expect(secondPass.anomaliesDetected).toBe(1); + expect(secondPass.anomaliesEmitted).toBe(0); + expect(emit).not.toHaveBeenCalled(); + }); + + it('scopes detection per developer independently', async () => { + const emit = jest.fn(); + const dedup = createAnomalyDedupStore(WINDOW_MS); + + await runAnomalyScan({ + pool: makePool({ + developers: ['dev_spike', 'dev_normal'], + countsByDeveloper: { + dev_spike: buildSeries(10, 100, anchor), + dev_normal: buildSeries(10, 12, anchor), + }, + }) as never, + config, + dedup, + now: () => now, + emit: emit as never, + log: { info: jest.fn(), error: jest.fn() }, + }); + + expect(emit).toHaveBeenCalledTimes(1); + expect(emit).toHaveBeenCalledWith('usage.anomaly.detected', 'dev_spike', expect.any(Object)); + }); + + it('returns zero counts when developer lookup fails', async () => { + const pool = { + query: jest.fn().mockRejectedValue(new Error('db down')), + }; + + const result = await runAnomalyScan({ + pool: pool as never, + config, + dedup: createAnomalyDedupStore(WINDOW_MS), + now: () => now, + emit: jest.fn() as never, + log: { info: jest.fn(), error: jest.fn() }, + }); + + expect(result).toEqual({ + developersScanned: 0, + anomaliesDetected: 0, + anomaliesEmitted: 0, + }); + }); +}); diff --git a/src/services/anomalyService.ts b/src/services/anomalyService.ts new file mode 100644 index 0000000..0b12512 --- /dev/null +++ b/src/services/anomalyService.ts @@ -0,0 +1,366 @@ +/** + * Usage anomaly detection for per-developer 5-minute traffic windows. + * + * Baseline = arithmetic mean of the trailing N windows (default 12). The most + * recent completed window is compared against `baseline * multiplier`; when + * traffic exceeds that threshold an anomaly is returned for event emission. + */ + +import type { Pool } from 'pg'; +import { randomUUID } from 'node:crypto'; +import { calloraEvents } from '../events/event.emitter.js'; +import { getOrCreateRequestId } from '../utils/asyncContext.js'; +import { logger } from '../logger.js'; +import type { UsageAnomalyDetectedData } from '../webhooks/webhook.types.js'; +import { + recordUsageAnomalyDetectorAnomaly, + recordUsageAnomalyDetectorRun, +} from '../metrics.js'; + +export const DEFAULT_BASELINE_WINDOWS = 12; +export const DEFAULT_WINDOW_MS = 5 * 60 * 1000; +export const DEFAULT_MULTIPLIER = 5; + +export interface WindowCount { + windowStart: Date; + calls: number; +} + +export interface AnomalyDetectionConfig { + multiplier: number; + baselineWindows: number; + windowMs: number; +} + +export interface UsageAnomalyFinding { + developerId: string; + windowStart: Date; + currentCalls: number; + baselineMean: number; + multiplier: number; + ratio: number; +} + +export interface AnomalyScanResult { + developersScanned: number; + anomaliesDetected: number; + anomaliesEmitted: number; +} + +export interface AnomalyDedupStore { + has(key: string): boolean; + set(key: string): void; +} + +export interface AnomalyServiceDeps { + pool: Pool; + config: AnomalyDetectionConfig; + dedup: AnomalyDedupStore; + now?: () => Date; + emit?: typeof calloraEvents.emit; + log?: Pick; +} + +const round4 = (value: number): number => Math.round(value * 10_000) / 10_000; + +/** Aligns a timestamp to the start of its fixed-size window (UTC epoch ms). */ +export function floorToWindowStart(date: Date, windowMs: number): Date { + if (!Number.isInteger(windowMs) || windowMs <= 0) { + throw new Error('windowMs must be a positive integer'); + } + const ms = Math.floor(date.getTime() / windowMs) * windowMs; + return new Date(ms); +} + +/** Mean of the supplied call counts; returns 0 for an empty array. */ +export function computeBaselineMean(counts: number[]): number { + if (counts.length === 0) { + return 0; + } + return counts.reduce((sum, count) => sum + count, 0) / counts.length; +} + +/** + * Returns true when `currentCalls` exceeds `baselineMean * multiplier`. + * A zero baseline flags any positive traffic as anomalous. + */ +export function isAnomalousTraffic( + baselineMean: number, + currentCalls: number, + multiplier: number, +): boolean { + validateDetectionConfig({ multiplier, baselineWindows: 1, windowMs: 1 }); + if (currentCalls < 0) { + throw new Error('currentCalls must be non-negative'); + } + if (baselineMean === 0) { + return currentCalls > 0; + } + return currentCalls > baselineMean * multiplier; +} + +export function validateDetectionConfig(config: AnomalyDetectionConfig): void { + if (!Number.isFinite(config.multiplier) || config.multiplier <= 0) { + throw new Error('multiplier must be a positive finite number'); + } + if (!Number.isInteger(config.baselineWindows) || config.baselineWindows <= 0) { + throw new Error('baselineWindows must be a positive integer'); + } + if (!Number.isInteger(config.windowMs) || config.windowMs <= 0) { + throw new Error('windowMs must be a positive integer'); + } +} + +/** + * Builds window starts for baseline + the most recently completed window. + * Oldest window first; the last entry is the window under test. + */ +export function buildExpectedWindowStarts( + now: Date, + config: Pick, +): Date[] { + validateDetectionConfig({ + multiplier: 1, + baselineWindows: config.baselineWindows, + windowMs: config.windowMs, + }); + const currentStart = floorToWindowStart(now, config.windowMs); + const lastCompletedStart = new Date(currentStart.getTime() - config.windowMs); + const starts: Date[] = []; + for (let i = config.baselineWindows; i >= 0; i -= 1) { + starts.push(new Date(lastCompletedStart.getTime() - i * config.windowMs)); + } + return starts; +} + +/** Fills gaps in the series with zero-call windows so the baseline stays correct. */ +export function mergeWindowCounts( + expectedStarts: Date[], + actual: WindowCount[], +): WindowCount[] { + const byStart = new Map(actual.map((w) => [w.windowStart.getTime(), w.calls])); + return expectedStarts.map((windowStart) => ({ + windowStart, + calls: byStart.get(windowStart.getTime()) ?? 0, + })); +} + +/** + * Scores one developer's window series. Requires `baselineWindows + 1` points + * (trailing baseline windows plus the window under test). + */ +export function detectDeveloperAnomaly( + developerId: string, + windows: WindowCount[], + config: AnomalyDetectionConfig, +): UsageAnomalyFinding | null { + validateDetectionConfig(config); + + if (windows.length < config.baselineWindows + 1) { + return null; + } + + const sorted = [...windows].sort( + (a, b) => a.windowStart.getTime() - b.windowStart.getTime(), + ); + const historical = sorted.slice(0, config.baselineWindows); + const current = sorted[sorted.length - 1]; + const baselineMean = computeBaselineMean(historical.map((w) => w.calls)); + + if (!isAnomalousTraffic(baselineMean, current.calls, config.multiplier)) { + return null; + } + + const ratio = + baselineMean === 0 ? Number.POSITIVE_INFINITY : current.calls / baselineMean; + + return { + developerId, + windowStart: current.windowStart, + currentCalls: current.calls, + baselineMean: round4(baselineMean), + multiplier: config.multiplier, + ratio: Number.isFinite(ratio) ? round4(ratio) : ratio, + }; +} + +export function anomalyDedupKey(developerId: string, windowStart: Date): string { + return `${developerId}:${windowStart.toISOString()}`; +} + +export function toAnomalyEventData( + finding: UsageAnomalyFinding, + windowMs: number, +): UsageAnomalyDetectedData { + const windowEnd = new Date(finding.windowStart.getTime() + windowMs); + return { + windowStart: finding.windowStart.toISOString(), + windowEnd: windowEnd.toISOString(), + currentCalls: finding.currentCalls, + baselineMean: finding.baselineMean, + multiplier: finding.multiplier, + ratio: finding.ratio, + windowMs, + }; +} + +export async function fetchActiveDeveloperIds(pool: Pool, since: Date): Promise { + const result = await pool.query<{ developer_id: string }>( + `SELECT DISTINCT developer_id + FROM usage_events + WHERE created_at >= $1 + AND developer_id IS NOT NULL + AND developer_id <> ''`, + [since], + ); + return result.rows.map((row) => row.developer_id); +} + +export async function fetchDeveloperWindowCounts( + pool: Pool, + developerId: string, + from: Date, + to: Date, + windowMs: number, +): Promise { + validateDetectionConfig({ + multiplier: 1, + baselineWindows: 1, + windowMs, + }); + + const windowSeconds = windowMs / 1000; + const result = await pool.query<{ window_start: Date | string; calls: number }>( + `SELECT + to_timestamp(floor(extract(epoch from created_at) / $4) * $4) AS window_start, + COUNT(*)::int AS calls + FROM usage_events + WHERE developer_id = $1 + AND created_at >= $2 + AND created_at < $3 + GROUP BY 1 + ORDER BY 1`, + [developerId, from, to, windowSeconds], + ); + + return result.rows.map((row) => ({ + windowStart: new Date(row.window_start), + calls: Number(row.calls), + })); +} + +/** + * Runs one anomaly-detection pass across all developers with recent usage. + * Emits `usage.anomaly.detected` per developer/window at most once per dedup key. + */ +export async function runAnomalyScan(deps: AnomalyServiceDeps): Promise { + const log = deps.log ?? logger; + const emit = deps.emit ?? calloraEvents.emit.bind(calloraEvents); + const now = deps.now ?? (() => new Date()); + const { pool, config, dedup } = deps; + + validateDetectionConfig(config); + recordUsageAnomalyDetectorRun(); + + const correlationId = getOrCreateRequestId(randomUUID); + const currentTime = now(); + const expectedStarts = buildExpectedWindowStarts(currentTime, config); + const from = expectedStarts[0]; + const to = new Date( + expectedStarts[expectedStarts.length - 1].getTime() + config.windowMs, + ); + + let developerIds: string[]; + try { + developerIds = await fetchActiveDeveloperIds(pool, from); + } catch (error) { + log.error('[anomalyService] Failed to list active developers', { + correlationId, + error, + }); + return { developersScanned: 0, anomaliesDetected: 0, anomaliesEmitted: 0 }; + } + + let anomaliesDetected = 0; + let anomaliesEmitted = 0; + + for (const developerId of developerIds) { + let actualWindows: WindowCount[]; + try { + actualWindows = await fetchDeveloperWindowCounts( + pool, + developerId, + from, + to, + config.windowMs, + ); + } catch (error) { + log.error('[anomalyService] Failed to fetch window counts', { + correlationId, + developerId, + error, + }); + continue; + } + + const windows = mergeWindowCounts(expectedStarts, actualWindows); + const finding = detectDeveloperAnomaly(developerId, windows, config); + if (!finding) { + continue; + } + + anomaliesDetected += 1; + const dedupKey = anomalyDedupKey(developerId, finding.windowStart); + if (dedup.has(dedupKey)) { + continue; + } + dedup.set(dedupKey); + + const data = toAnomalyEventData(finding, config.windowMs); + emit('usage.anomaly.detected', developerId, data); + recordUsageAnomalyDetectorAnomaly(); + + log.info('[anomalyService] Emitted usage.anomaly.detected', { + correlationId, + developerId, + windowStart: data.windowStart, + currentCalls: data.currentCalls, + baselineMean: data.baselineMean, + multiplier: data.multiplier, + ratio: data.ratio, + }); + anomaliesEmitted += 1; + } + + return { + developersScanned: developerIds.length, + anomaliesDetected, + anomaliesEmitted, + }; +} + +export function createAnomalyDedupStore(windowMs: number): AnomalyDedupStore { + if (!Number.isInteger(windowMs) || windowMs <= 0) { + throw new Error('windowMs must be a positive integer'); + } + + const store = new Map(); + + return { + has(key: string): boolean { + const expiry = store.get(key); + if (expiry === undefined) { + return false; + } + if (Date.now() > expiry) { + store.delete(key); + return false; + } + return true; + }, + + set(key: string): void { + store.set(key, Date.now() + windowMs); + }, + }; +} diff --git a/src/webhooks/webhook.types.ts b/src/webhooks/webhook.types.ts index ea912cb..a65a5a5 100644 --- a/src/webhooks/webhook.types.ts +++ b/src/webhooks/webhook.types.ts @@ -2,8 +2,9 @@ export type WebhookEventType = | 'new_api_call' | 'settlement_completed' | 'low_balance_alert' - | 'quota.threshold.reached'; + | 'quota.threshold.reached' | 'invoice_created' + | 'usage.anomaly.detected' export interface WebhookConfig { developerId: string; @@ -68,6 +69,24 @@ export interface LowBalanceAlertData { asset: string; } +/** Fired when a developer's 5-minute traffic exceeds baseline * multiplier. */ +export interface UsageAnomalyDetectedData { + /** ISO 8601 start of the anomalous window (UTC). */ + windowStart: string; + /** ISO 8601 end of the anomalous window (UTC). */ + windowEnd: string; + /** Call count in the anomalous window. */ + currentCalls: number; + /** Mean call count across the trailing baseline windows. */ + baselineMean: number; + /** Configured multiplier threshold that was exceeded. */ + multiplier: number; + /** currentCalls / baselineMean (Infinity when baselineMean is 0). */ + ratio: number; + /** Window size in milliseconds. */ + windowMs: number; +} + /** Fired when a developer crosses 80%, 95%, or 100% of their monthly call quota. */ export interface QuotaThresholdReachedData { /** Billing period in YYYY-MM format, e.g. "2026-06". */ diff --git a/src/workers/anomalyDetector.test.ts b/src/workers/anomalyDetector.test.ts new file mode 100644 index 0000000..eafc107 --- /dev/null +++ b/src/workers/anomalyDetector.test.ts @@ -0,0 +1,151 @@ +import { resetAllMetrics } from '../metrics.js'; +import { createAnomalyDetectorJob } from './anomalyDetector.js'; + +jest.mock('../services/anomalyService.js', () => ({ + ...jest.requireActual('../services/anomalyService.js'), + runAnomalyScan: jest.fn(async () => ({ + developersScanned: 0, + anomaliesDetected: 0, + anomaliesEmitted: 0, + })), +})); + +const { runAnomalyScan } = jest.requireMock('../services/anomalyService.js') as { + runAnomalyScan: jest.Mock; +}; + +const baseConfig = { + multiplier: 5, + baselineWindows: 12, + windowMs: 300_000, +}; + +describe('anomalyDetector worker', () => { + const pool = { query: jest.fn() } as never; + + beforeAll(() => { + jest.useFakeTimers(); + }); + + beforeEach(() => { + runAnomalyScan.mockClear(); + jest.spyOn(console, 'error').mockImplementation(() => {}); + }); + + afterEach(() => { + jest.clearAllTimers(); + jest.restoreAllMocks(); + resetAllMetrics(); + }); + + afterAll(() => { + jest.useRealTimers(); + }); + + it('rejects invalid intervalMs at construction', () => { + expect(() => + createAnomalyDetectorJob(pool, { + intervalMs: 0, + config: baseConfig, + }), + ).toThrow('intervalMs must be a positive integer'); + }); + + it('runs an initial scan on start and on each interval tick', async () => { + const job = createAnomalyDetectorJob(pool, { + intervalMs: 60_000, + config: baseConfig, + }); + + job.start(); + await Promise.resolve(); + expect(runAnomalyScan).toHaveBeenCalledTimes(1); + + jest.advanceTimersByTime(60_000); + await Promise.resolve(); + expect(runAnomalyScan).toHaveBeenCalledTimes(2); + + job.stop(); + }); + + it('skips overlapping ticks while a scan is in flight', async () => { + let resolveScan: (() => void) | undefined; + runAnomalyScan.mockImplementation( + () => + new Promise((resolve) => { + resolveScan = resolve as () => void; + }), + ); + + const job = createAnomalyDetectorJob(pool, { + intervalMs: 1_000, + config: baseConfig, + }); + + job.start(); + await Promise.resolve(); + expect(runAnomalyScan).toHaveBeenCalledTimes(1); + + jest.advanceTimersByTime(1_000); + await Promise.resolve(); + expect(runAnomalyScan).toHaveBeenCalledTimes(1); + + resolveScan?.(); + await Promise.resolve(); + + jest.advanceTimersByTime(1_000); + await Promise.resolve(); + expect(runAnomalyScan).toHaveBeenCalledTimes(2); + + job.stop(); + }); + + it('supports graceful shutdown hooks', async () => { + let resolveScan: (() => void) | undefined; + runAnomalyScan.mockImplementation( + () => + new Promise((resolve) => { + resolveScan = resolve as () => void; + }), + ); + + const job = createAnomalyDetectorJob(pool, { + intervalMs: 1_000, + config: baseConfig, + }); + + job.start(); + await Promise.resolve(); + + job.beginShutdown(); + jest.advanceTimersByTime(5_000); + await Promise.resolve(); + expect(runAnomalyScan).toHaveBeenCalledTimes(1); + + resolveScan?.(); + await job.awaitIdle(); + + job.stop(); + }); + + it('logs scan failures without crashing the worker', async () => { + const log = { info: jest.fn(), error: jest.fn() }; + runAnomalyScan.mockRejectedValueOnce(new Error('scan failed')); + + const job = createAnomalyDetectorJob(pool, { + intervalMs: 1_000, + config: baseConfig, + logger: log, + }); + + job.start(); + await Promise.resolve(); + + expect(log.error).toHaveBeenCalledWith( + '[anomalyDetector] Job failed', + expect.objectContaining({ error: expect.any(Error) }), + ); + + job.stop(); + }); +}); diff --git a/src/workers/anomalyDetector.ts b/src/workers/anomalyDetector.ts new file mode 100644 index 0000000..4d166e2 --- /dev/null +++ b/src/workers/anomalyDetector.ts @@ -0,0 +1,97 @@ +import type { Pool } from 'pg'; +import { logger } from '../logger.js'; +import { + createAnomalyDedupStore, + runAnomalyScan, + type AnomalyDetectionConfig, + type AnomalyDedupStore, +} from '../services/anomalyService.js'; + +export interface AnomalyDetectorOptions { + intervalMs: number; + config: AnomalyDetectionConfig; + dedupWindowMs?: number; + logger?: Pick; + dedup?: AnomalyDedupStore; +} + +export interface AnomalyDetectorJob { + start(): void; + stop(): void; + beginShutdown(): void; + awaitIdle(): Promise; +} + +export function createAnomalyDetectorJob( + pool: Pool, + options: AnomalyDetectorOptions, +): AnomalyDetectorJob { + const log = options.logger ?? logger; + + if (!Number.isInteger(options.intervalMs) || options.intervalMs <= 0) { + throw new Error('intervalMs must be a positive integer'); + } + + const dedup = + options.dedup ?? + createAnomalyDedupStore(options.dedupWindowMs ?? options.config.windowMs); + + let timer: NodeJS.Timeout | null = null; + let accepting = true; + let running: Promise | null = null; + + const tick = async (): Promise => { + if (!accepting || running) { + return; + } + + running = (async () => { + try { + await runAnomalyScan({ + pool, + config: options.config, + dedup, + log, + }); + } catch (error) { + log.error('[anomalyDetector] Job failed', { error }); + } finally { + running = null; + } + })(); + + await running; + }; + + return { + start() { + if (timer || !accepting) { + return; + } + void tick(); + timer = setInterval(() => { + void tick(); + }, options.intervalMs); + }, + + stop() { + if (!timer) { + return; + } + clearInterval(timer); + timer = null; + }, + + beginShutdown() { + accepting = false; + if (timer) { + clearInterval(timer); + timer = null; + } + }, + + async awaitIdle() { + await (running ?? Promise.resolve()); + }, + }; +}