Merged
18 changes: 18 additions & 0 deletions .env.example
@@ -185,3 +185,21 @@ SLOW_QUERY_POLL_INTERVAL_MS=300000
# Deduplication window per query fingerprint (seconds). A query that was
# already alerted on will not fire again within this window. Default: 3600 (1h).
SLOW_QUERY_DEDUP_WINDOW_SECONDS=3600

# -----------------------------------------------------------------------------
# Usage Anomaly Detector — 5-minute rolling baseline per developer
# Compares the latest 5-minute window to the mean of the trailing 12 windows.
# When traffic exceeds baseline * multiplier, emits usage.anomaly.detected.
# -----------------------------------------------------------------------------
# Set to false to disable the background worker.
USAGE_ANOMALY_DETECTOR_ENABLED=true
# Traffic multiplier threshold (default 5x baseline).
USAGE_ANOMALY_MULTIPLIER=5
# Poll interval in milliseconds (default 300000 = 5 min).
USAGE_ANOMALY_POLL_INTERVAL_MS=300000
# Window size in milliseconds (default 300000 = 5 min).
USAGE_ANOMALY_WINDOW_MS=300000
# Number of trailing windows used for the baseline mean (default 12).
USAGE_ANOMALY_BASELINE_WINDOWS=12
# Optional dedup window per developer/window (defaults to USAGE_ANOMALY_WINDOW_MS).
# USAGE_ANOMALY_DEDUP_WINDOW_MS=300000
1 change: 1 addition & 0 deletions README.md
@@ -25,6 +25,7 @@ API gateway, usage metering, and billing services for the Callora API marketplac
- `POST /api/apis` for authenticated developers to register an API with priced endpoints
- Usage route: `GET /api/usage`
- Admin usage anomalies: `GET /api/admin/usage/anomalies` returns per-API daily usage anomalies (z-score spikes/drops) for admin review, filterable by `from`/`to`/`apiId`/`threshold`/`limit` (admin auth + IP allowlist)
- Usage anomaly detector: background worker emits `usage.anomaly.detected` when per-developer 5-minute traffic exceeds a rolling 12-window baseline by a configurable multiplier (see `docs/usage-anomaly-detector.md`)
- JSON body parsing plus gateway API key authentication for upstream proxy routes
- Per-user global REST rate limiting for authenticated `/api/billing`, `/api/usage`, `/api/developers`, `/api/vault`, and `/api/keys` traffic, with IP fallback for unauthenticated requests
- In-memory `VaultRepository` with:
75 changes: 75 additions & 0 deletions docs/usage-anomaly-detector.md
@@ -0,0 +1,75 @@
# Usage Anomaly Detector

Background worker that compares each developer's latest 5-minute API call
volume against a rolling baseline and emits `usage.anomaly.detected` when
traffic exceeds a configurable multiplier (default **5×**).

## How it works

1. Every `USAGE_ANOMALY_POLL_INTERVAL_MS` (default 5 min) the worker scans
developers with recent `usage_events` activity.
2. For each developer, call counts are bucketed into fixed 5-minute windows.
3. **Baseline** = arithmetic mean of the trailing **12** completed windows (configurable).
4. The **most recently completed** 5-minute window is compared to `baseline × multiplier`.
5. When the threshold is exceeded, the worker emits `usage.anomaly.detected`
through the typed event emitter, which fans out to matching developer
webhook subscriptions.

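Windows with no recorded calls count as **zero calls** rather than being
skipped, so the baseline mean is always taken over the full set of trailing
windows and is not inflated by averaging only the busy ones.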
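
The steps above can be sketched as follows. This is a minimal illustration, not the code in `src/services/anomalyService.ts`: the names `checkLatestWindow`, `AnomalyCheck`, and `windowStartOf` are hypothetical, it works on an in-memory array of timestamps instead of `usage_events` rows, and it assumes that the baseline covers the windows immediately before the latest completed one and that a zero baseline never raises an alert.

```ts
// Hypothetical sketch; names and edge-case handling are assumptions, not the worker's code.
interface AnomalyCheck {
  currentCalls: number;
  baselineMean: number;
  ratio: number;
  isAnomaly: boolean;
}

/** Floor a timestamp (ms since epoch) to the start of its fixed window. */
function windowStartOf(timestampMs: number, windowMs: number): number {
  return Math.floor(timestampMs / windowMs) * windowMs;
}

function checkLatestWindow(
  eventTimestampsMs: number[],
  nowMs: number,
  windowMs = 300_000,
  baselineWindows = 12,
  multiplier = 5,
): AnomalyCheck {
  // Bucket calls into fixed windows keyed by window start.
  const counts = new Map<number, number>();
  for (const ts of eventTimestampsMs) {
    const start = windowStartOf(ts, windowMs);
    counts.set(start, (counts.get(start) ?? 0) + 1);
  }

  // The most recently completed window is the one before the in-progress window.
  const latestStart = windowStartOf(nowMs, windowMs) - windowMs;
  const currentCalls = counts.get(latestStart) ?? 0;

  // Assumption: the baseline excludes the window being tested.
  let total = 0;
  for (let i = 1; i <= baselineWindows; i++) {
    total += counts.get(latestStart - i * windowMs) ?? 0; // missing window counts as zero
  }
  const baselineMean = total / baselineWindows;

  // Assumption: with no baseline traffic, report a ratio of 0 and do not alert.
  const ratio = baselineMean > 0 ? currentCalls / baselineMean : 0;
  const isAnomaly = baselineMean > 0 && currentCalls > baselineMean * multiplier;

  return { currentCalls, baselineMean, ratio, isAnomaly };
}
```

For example, 60 calls spread over six of the twelve baseline windows give a mean of 5 (the six empty windows still count), so 30 calls in the latest window yield a ratio of 6 and exceed the default 5× threshold, while exactly 25 calls would not.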

## Configuration

| Variable | Default | Description |
|----------|---------|-------------|
| `USAGE_ANOMALY_DETECTOR_ENABLED` | `true` | Set to `false` to disable the worker |
| `USAGE_ANOMALY_MULTIPLIER` | `5` | Traffic must exceed `baseline × multiplier` |
| `USAGE_ANOMALY_POLL_INTERVAL_MS` | `300000` | Scan interval in ms (5 min) |
| `USAGE_ANOMALY_WINDOW_MS` | `300000` | Window size in ms (5 min) |
| `USAGE_ANOMALY_BASELINE_WINDOWS` | `12` | Trailing windows used for the baseline mean |
| `USAGE_ANOMALY_DEDUP_WINDOW_MS` | `USAGE_ANOMALY_WINDOW_MS` | Suppress duplicate alerts per developer/window |
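
As a rough illustration of the dedup setting, the sketch below resolves the fallback to `USAGE_ANOMALY_WINDOW_MS` and suppresses repeat alerts for a developer inside the dedup window. `resolveDedupWindowMs` and `AlertDeduper` are hypothetical names, and keying on the developer ID with an in-memory map is an assumption; this PR does not show how the worker stores or keys its dedup state.

```ts
// Hypothetical sketch; the worker's real dedup storage and key format are not shown in this PR.

/** Fall back to the window size when no dedup window is configured. */
function resolveDedupWindowMs(dedupWindowMs: number | undefined, windowMs: number): number {
  return dedupWindowMs ?? windowMs;
}

class AlertDeduper {
  // Assumption: one entry per developer, holding the time of the last emitted alert.
  private readonly lastAlertAt = new Map<string, number>();
  private readonly dedupWindowMs: number;

  constructor(dedupWindowMs: number) {
    this.dedupWindowMs = dedupWindowMs;
  }

  /** Returns true (and records the alert) unless one was emitted within the dedup window. */
  shouldEmit(developerId: string, nowMs: number): boolean {
    const last = this.lastAlertAt.get(developerId);
    if (last !== undefined && nowMs - last < this.dedupWindowMs) {
      return false;
    }
    this.lastAlertAt.set(developerId, nowMs);
    return true;
  }
}
```

With the defaults, the dedup window equals the 5-minute scan window, so under this reading a developer would receive at most one alert per window.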

## Event payload

```json
{
  "event": "usage.anomaly.detected",
  "timestamp": "2026-06-01T12:05:00.000Z",
  "developerId": "dev_123",
  "data": {
    "windowStart": "2026-06-01T12:00:00.000Z",
    "windowEnd": "2026-06-01T12:05:00.000Z",
    "currentCalls": 100,
    "baselineMean": 10,
    "multiplier": 5,
    "ratio": 10,
    "windowMs": 300000
  }
}
```

Developers subscribe by registering a webhook for the `usage.anomaly.detected`
event type.
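
A webhook consumer may want to validate the body before acting on it. The type guard below is a minimal sketch based only on the payload shape shown above; `UsageAnomalyEvent`, `UsageAnomalyData`, and `isUsageAnomalyEvent` are illustrative names on the receiving side, not exports of this repository.

```ts
// Hypothetical consumer-side types mirroring the documented payload.
interface UsageAnomalyData {
  windowStart: string;
  windowEnd: string;
  currentCalls: number;
  baselineMean: number;
  multiplier: number;
  ratio: number;
  windowMs: number;
}

interface UsageAnomalyEvent {
  event: 'usage.anomaly.detected';
  timestamp: string;
  developerId: string;
  data: UsageAnomalyData;
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

/** Narrow an untrusted, already-parsed JSON body to the documented event shape. */
function isUsageAnomalyEvent(body: unknown): body is UsageAnomalyEvent {
  if (!isRecord(body) || body['event'] !== 'usage.anomaly.detected') return false;
  if (typeof body['timestamp'] !== 'string' || typeof body['developerId'] !== 'string') {
    return false;
  }
  const data = body['data'];
  if (!isRecord(data)) return false;

  const stringFields = ['windowStart', 'windowEnd'];
  const numberFields = ['currentCalls', 'baselineMean', 'multiplier', 'ratio', 'windowMs'];
  return (
    stringFields.every((key) => typeof data[key] === 'string') &&
    // Number.isFinite is false for non-numbers, NaN, and Infinity.
    numberFields.every((key) => Number.isFinite(data[key]))
  );
}
```

After the guard passes, fields such as `body.data.ratio` are typed and can be compared against the consumer's own alerting rules.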

## Metrics

| Metric | Type | Description |
|--------|------|-------------|
| `usage_anomaly_detector_runs_total` | Counter | Total scan cycles completed |
| `usage_anomaly_detector_anomalies_total` | Counter | Total anomaly events emitted |

## Testing

```bash
npx jest src/services/anomalyService.test.ts src/workers/anomalyDetector.test.ts
```

## Related code

- `src/services/anomalyService.ts` — detection logic and DB aggregation
- `src/workers/anomalyDetector.ts` — interval job wrapper
- `src/events/event.emitter.ts` — webhook fan-out for `usage.anomaly.detected`

The admin `GET /api/admin/usage/anomalies` endpoint uses a separate daily
z-score detector (`usageAnomalyDetector.ts`) for retrospective review; this
worker provides real-time per-developer 5-minute spike detection.
11 changes: 11 additions & 0 deletions src/config/env.ts
@@ -200,6 +200,17 @@ export const envSchema = z
SLOW_QUERY_P95_THRESHOLD_MS: z.coerce.number().positive().default(500),
SLOW_QUERY_POLL_INTERVAL_MS: z.coerce.number().int().positive().default(300_000),
SLOW_QUERY_DEDUP_WINDOW_SECONDS: z.coerce.number().int().positive().default(3600),

// Usage anomaly detector (5-minute rolling baseline)
USAGE_ANOMALY_DETECTOR_ENABLED: z
.enum(['true', 'false'])
.default('true')
.transform((v) => v === 'true'),
USAGE_ANOMALY_MULTIPLIER: z.coerce.number().positive().default(5),
USAGE_ANOMALY_POLL_INTERVAL_MS: z.coerce.number().int().positive().default(300_000),
USAGE_ANOMALY_WINDOW_MS: z.coerce.number().int().positive().default(300_000),
USAGE_ANOMALY_BASELINE_WINDOWS: z.coerce.number().int().positive().default(12),
USAGE_ANOMALY_DEDUP_WINDOW_MS: z.coerce.number().int().positive().optional(),
})
.superRefine((values, ctx) => {
if (values.SOROBAN_RPC_ENABLED && !values.SOROBAN_RPC_URL) {
16 changes: 16 additions & 0 deletions src/config/index.ts
@@ -221,4 +221,20 @@ export const config = {
enabled: env.MEMORY_ACCOUNTING_ENABLED,
thresholdMb: env.MEMORY_ACCOUNTING_THRESHOLD_MB,
},

slowQueryAlerter: {
webhookUrl: env.SLOW_QUERY_ALERT_WEBHOOK_URL,
p95ThresholdMs: env.SLOW_QUERY_P95_THRESHOLD_MS,
pollIntervalMs: env.SLOW_QUERY_POLL_INTERVAL_MS,
dedupWindowMs: env.SLOW_QUERY_DEDUP_WINDOW_SECONDS * 1000,
},

usageAnomalyDetector: {
enabled: env.USAGE_ANOMALY_DETECTOR_ENABLED,
multiplier: env.USAGE_ANOMALY_MULTIPLIER,
pollIntervalMs: env.USAGE_ANOMALY_POLL_INTERVAL_MS,
windowMs: env.USAGE_ANOMALY_WINDOW_MS,
baselineWindows: env.USAGE_ANOMALY_BASELINE_WINDOWS,
dedupWindowMs: env.USAGE_ANOMALY_DEDUP_WINDOW_MS ?? env.USAGE_ANOMALY_WINDOW_MS,
},
} as const;
20 changes: 20 additions & 0 deletions src/events/README.md
@@ -54,6 +54,24 @@ Used when a developer or consumer balance falls below the configured threshold.
}
```

### `usage.anomaly.detected`

Emitted by the usage anomaly background worker when a developer's latest
5-minute call volume exceeds the rolling baseline multiplied by the configured
threshold (default 5×).

```ts
{
  windowStart: string;
  windowEnd: string;
  currentCalls: number;
  baselineMean: number;
  multiplier: number;
  ratio: number;
  windowMs: number;
}
```

## Typing Guarantees

```ts
@@ -105,5 +123,7 @@ The module registers one built-in listener per documented event:
- `new_api_call`
- `settlement_completed`
- `low_balance_alert`
- `invoice_created`
- `usage.anomaly.detected`

Each built-in listener resolves matching webhook subscriptions from `WebhookStore` and forwards the typed payload through `dispatchToAll(...)`.
2 changes: 2 additions & 0 deletions src/events/event.emitter.test.ts
@@ -31,6 +31,8 @@ describe('calloraEvents', () => {
expect(calloraEvents.listenerCount('new_api_call')).toBe(1);
expect(calloraEvents.listenerCount('settlement_completed')).toBe(1);
expect(calloraEvents.listenerCount('low_balance_alert')).toBe(1);
expect(calloraEvents.listenerCount('invoice_created')).toBe(1);
expect(calloraEvents.listenerCount('usage.anomaly.detected')).toBe(1);
});

it('dispatches registered webhook configs with the correct typed payload', async () => {
6 changes: 6 additions & 0 deletions src/events/event.emitter.ts
@@ -6,6 +6,7 @@ import type {
NewApiCallData,
SettlementCompletedData,
InvoiceCreatedData,
UsageAnomalyDetectedData,
WebhookPayload,
} from '../webhooks/webhook.types.js';

@@ -14,6 +15,7 @@ export interface CalloraEventPayloadMap {
settlement_completed: SettlementCompletedData;
low_balance_alert: LowBalanceAlertData;
invoice_created: InvoiceCreatedData;
'usage.anomaly.detected': UsageAnomalyDetectedData;
}
export type CalloraEventName = keyof CalloraEventPayloadMap;

@@ -33,6 +35,7 @@ const createListenerSetMap = (): ListenerSetMap => ({
settlement_completed: new Set<CalloraEventListener<'settlement_completed'>>(),
low_balance_alert: new Set<CalloraEventListener<'low_balance_alert'>>(),
invoice_created: new Set<CalloraEventListener<'invoice_created'>>(),
'usage.anomaly.detected': new Set<CalloraEventListener<'usage.anomaly.detected'>>(),
});

async function handleEvent<K extends CalloraEventName>(
@@ -116,4 +119,7 @@ calloraEvents.on('low_balance_alert', (developerId, data) => {
});
calloraEvents.on('invoice_created', (developerId, data) => {
return handleEvent('invoice_created', developerId, data);
});
calloraEvents.on('usage.anomaly.detected', (developerId, data) => {
return handleEvent('usage.anomaly.detected', developerId, data);
});
23 changes: 23 additions & 0 deletions src/index.ts
@@ -41,6 +41,7 @@ import { createApiRegistry } from './data/apiRegistry.js';
import { ApiKey } from './types/gateway.js';
import { listingsCache } from './lib/listingsCache.js';
import { createSlowQueryAlerterJob } from './workers/slowQueryAlerter.js';
import { createAnomalyDetectorJob } from './workers/anomalyDetector.js';

// Helper for Jest/CommonJS compat
const isDirectExecution = process.argv[1] && (process.argv[1].endsWith('index.ts') || process.argv[1].endsWith('index.js'));
@@ -143,6 +144,18 @@ if (isDirectExecution) {
})
: null;

const anomalyDetectorJob = config.usageAnomalyDetector.enabled
? createAnomalyDetectorJob(pool, {
intervalMs: config.usageAnomalyDetector.pollIntervalMs,
dedupWindowMs: config.usageAnomalyDetector.dedupWindowMs,
config: {
multiplier: config.usageAnomalyDetector.multiplier,
baselineWindows: config.usageAnomalyDetector.baselineWindows,
windowMs: config.usageAnomalyDetector.windowMs,
},
})
: null;

const apiKeys = new Map<string, ApiKey>([
['test-key-1', { key: 'test-key-1', developerId: 'dev_001', apiId: 'api_001' }],
['test-key-2', { key: 'test-key-2', developerId: 'dev_002', apiId: 'api_002' }],
@@ -214,6 +227,14 @@ if (isDirectExecution) {
awaitIdle: () => slowQueryAlerterJob.awaitIdle(),
});
}

if (anomalyDetectorJob) {
shutdownSubsystems.push({
name: 'usage-anomaly-detector',
beginShutdown: () => anomalyDetectorJob.beginShutdown(),
awaitIdle: () => anomalyDetectorJob.awaitIdle(),
});
}
app.use('/v1/call', legacyV1DeprecationMiddleware, proxyDrainTracker.middleware);
app.use('/v1/call', proxyRouter);

@@ -231,6 +252,7 @@ if (isDirectExecution) {
settlementReconJob.stop();
idempotencySweeperJob.stop();
slowQueryAlerterJob?.stop();
anomalyDetectorJob?.stop();
await closeDb();
await Promise.allSettled([
closePgPool(),
@@ -264,6 +286,7 @@ if (isDirectExecution) {
settlementReconJob.start();
idempotencySweeperJob.start();
slowQueryAlerterJob?.start();
anomalyDetectorJob?.start();

const server = app.listen(PORT, () => {
console.log(`Callora backend listening on http://localhost:${PORT}`);
29 changes: 29 additions & 0 deletions src/metrics.ts
@@ -498,6 +498,7 @@ export function resetAllMetrics(): void {
idempotencyStoreRows.reset();
gatewayUpstreamBreakerState.reset();
resetSlowQueryAlerterMetrics();
resetUsageAnomalyDetectorMetrics();
resetReplicaMetrics();
}

@@ -623,6 +624,34 @@ export function resetSlowQueryAlerterMetrics(): void {
slowQueryAlerterQueriesAboveThreshold.reset();
}

// ── Usage anomaly detector metrics ────────────────────────────────────────────

const usageAnomalyDetectorRunsTotal = new client.Counter({
  name: 'usage_anomaly_detector_runs_total',
  help: 'Total number of usage anomaly detector scan cycles',
});

const usageAnomalyDetectorAnomaliesTotal = new client.Counter({
  name: 'usage_anomaly_detector_anomalies_total',
  help: 'Total number of usage anomalies emitted',
});

register.registerMetric(usageAnomalyDetectorRunsTotal);
register.registerMetric(usageAnomalyDetectorAnomaliesTotal);

export function recordUsageAnomalyDetectorRun(): void {
  usageAnomalyDetectorRunsTotal.inc();
}

export function recordUsageAnomalyDetectorAnomaly(): void {
  usageAnomalyDetectorAnomaliesTotal.inc();
}

export function resetUsageAnomalyDetectorMetrics(): void {
  usageAnomalyDetectorRunsTotal.reset();
  usageAnomalyDetectorAnomaliesTotal.reset();
}

/** Reset all replica routing metrics. Used in tests to isolate metric state. */
export function resetReplicaMetrics(): void {
dbReplicaQueriesTotal.reset();
2 changes: 0 additions & 2 deletions src/repositories/usageEventsRepository.pg.ts
@@ -7,10 +7,8 @@ import {
type UsageBucket,
type GroupBy,
} from './usageEventsRepository.js';
feature/usage-cursor-pagination
import { generateCursor, getNextCursor, decodeCursor } from '../lib/pagination.js';
import { readQuery, writeQuery } from '../db.js';
main

export interface CreateUsageEventInput {
userId: string;