Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ API gateway, usage metering, and billing services for the Callora API marketplac
- `GET /api/apis/:id`
- `POST /api/apis` for authenticated developers to register an API with priced endpoints
- Usage route: `GET /api/usage`
- Live usage stream: `GET /api/usage/sse` for authenticated developer dashboards
- Admin usage anomalies: `GET /api/admin/usage/anomalies` returns per-API daily usage anomalies (z-score spikes/drops) for admin review, filterable by `from`/`to`/`apiId`/`threshold`/`limit` (admin auth + IP allowlist)
- JSON body parsing plus gateway API key authentication for upstream proxy routes
- Per-user global REST rate limiting for authenticated `/api/billing`, `/api/usage`, `/api/developers`, `/api/vault`, and `/api/keys` traffic, with IP fallback for unauthenticated requests
Expand Down
29 changes: 29 additions & 0 deletions docs/usage-sse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Usage SSE stream

## Overview

The backend now exposes an authenticated Server-Sent Events endpoint at `/api/usage/sse` for live developer dashboard updates.

## Behavior

- The stream uses `Content-Type: text/event-stream` and keeps the connection open while the client remains connected.
- The server sends an initial `connected` event immediately after the handshake succeeds.
- Each new usage event recorded for the authenticated user is emitted as an SSE `usage` event with the event payload.
- Clients should reconnect on disconnects; the backend will clean up the subscription automatically.

## Authentication

The endpoint accepts the same authentication mechanisms as the rest of the usage API:

- `x-user-id` header, or
- a bearer JWT via the standard `Authorization` header.

## Example

```bash
curl -N -H 'x-user-id: user-123' http://localhost:3000/api/usage/sse
```

## Notes

The SSE endpoint is intended for developer dashboards that need real-time usage feedback without polling the REST usage endpoints.
18 changes: 17 additions & 1 deletion src/routes/gatewayRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { startUpstreamTimer, getUpstreamHealth, type UpstreamOutcome } from '../
import { validate } from '../middleware/validate.js';
import type { GatewayDeps, ApiKey } from '../types/gateway.js';
import { buildHopByHopSet } from '../lib/hopByHop.js';
import { defaultUsageSseBroadcaster } from './usage/sse.js';
import { getDefaultBreakerRegistry, CircuitBreakerState } from '../lib/circuitBreaker.js';
import {
BadGatewayError,
Expand Down Expand Up @@ -298,7 +299,7 @@ export function createGatewayRouter(deps: GatewayDeps): Router {
}
}

await usageStore.record({
const recorded = await usageStore.record({
id: randomUUID(),
requestId,
apiKey: apiKeyHeader,
Expand All @@ -311,6 +312,21 @@ export function createGatewayRouter(deps: GatewayDeps): Router {
timestamp: new Date().toISOString(),
});

if (recorded) {
defaultUsageSseBroadcaster.emitForUser(keyRecord.developerId, {
id: randomUUID(),
requestId,
apiKey: apiKeyHeader,
apiKeyId: keyRecord.key,
apiId: keyRecord.apiId,
endpointId: 'legacy',
userId: keyRecord.developerId,
amountUsdc: CREDIT_COST_PER_CALL,
statusCode: upstreamStatus,
timestamp: new Date().toISOString(),
});
}

res.set('x-request-id', requestId);
// Forward safe upstream response headers (hop-by-hop already stripped above)
for (const [key, value] of Object.entries(upstreamResponseHeaders)) {
Expand Down
3 changes: 3 additions & 0 deletions src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { createBillingPortalRouter } from './billing/portal.js';
import healthRouter from './health.js';
import { createApisRouter, type ApisRouterDeps } from './apis.js';
import { createUsageRouter, type UsageRouterDeps } from './usage.js';
import { createUsageSseRouter } from './usage/sse.js';
import { createLimitsRouter } from './limits.js';
import { InMemoryRestRateLimiter } from '../middleware/restRateLimit.js';
import { createUsageCsvRouter } from './usage/csv.js';
Expand Down Expand Up @@ -38,6 +39,8 @@ export function createApiRouter(deps: ApiRouterDeps = {}): Router {
usageEventsRepository: deps.usageEventsRepository!
}));

router.use('/usage/sse', createUsageSseRouter());

router.use('/usage', createUsageRouter({
usageEventsRepository: deps.usageEventsRepository!
}));
Expand Down
16 changes: 16 additions & 0 deletions src/routes/proxyRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { CircuitBreakerOpenError } from '../lib/errors.js';
import { CircuitBreaker, type CircuitBreakerStore } from '../lib/circuitBreaker.js';
import { env } from '../config/env.js';
import { getOrCreateRequestId } from '../utils/asyncContext.js';
import { defaultUsageSseBroadcaster } from './usage/sse.js';

/**
* Headers that must never be forwarded to the upstream server.
Expand Down Expand Up @@ -316,6 +317,21 @@ export function createProxyRouter(deps: ProxyDeps): Router {
timestamp: new Date().toISOString(),
});

if (recorded) {
defaultUsageSseBroadcaster.emitForUser(keyRecord.userId, {
id: randomUUID(),
requestId,
apiKey: apiKeyHeader,
apiKeyId: keyRecord.id,
apiId: String(apiEntry.id),
endpointId: endpoint.endpointId,
userId: keyRecord.userId,
amountUsdc: endpoint.priceUsdc,
statusCode: upstreamStatus,
timestamp: new Date().toISOString(),
});
}

// Only deduct billing if this requestId hasn't been processed
// before (idempotency guard inside usageStore.record).
if (recorded && endpoint.priceUsdc > 0) {
Expand Down
73 changes: 73 additions & 0 deletions src/routes/usage/sse.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import express from 'express';
import { createUsageSseRouter, UsageSseBroadcaster } from './sse.js';
import { errorHandler } from '../../middleware/errorHandler.js';
import { requestIdMiddleware } from '../../middleware/requestId.js';
import type { AddressInfo } from 'node:net';

const USER_ID = 'user-1';

describe('GET /api/usage/sse', () => {
it('returns 401 when the request is unauthenticated', async () => {
const app = express();
app.use(requestIdMiddleware);
app.use('/api/usage/sse', createUsageSseRouter());
app.use(errorHandler);

const server = app.listen(0);
try {
const address = server.address() as AddressInfo;
const response = await fetch(`http://127.0.0.1:${address.port}/api/usage/sse`);
expect(response.status).toBe(401);
expect(await response.json()).toMatchObject({ code: 'UNAUTHORIZED' });
} finally {
await new Promise<void>((resolve) => server.close(() => resolve()));
}
});

it('streams usage updates to the authenticated user', async () => {
const broadcaster = new UsageSseBroadcaster();
const app = express();
app.use(requestIdMiddleware);
app.use('/api/usage/sse', createUsageSseRouter({ broadcaster }));
app.use(errorHandler);

const server = app.listen(0);
try {
const address = server.address() as AddressInfo;
const response = await fetch(`http://127.0.0.1:${address.port}/api/usage/sse`, {
headers: { 'x-user-id': USER_ID },
});

expect(response.status).toBe(200);
expect(response.headers.get('content-type')).toContain('text/event-stream');
expect(response.headers.get('cache-control')).toBe('no-store');

const reader = response.body?.getReader();
expect(reader).toBeDefined();

const initialChunk = await reader!.read();
const initialText = new TextDecoder().decode(initialChunk.value ?? new Uint8Array());
expect(initialText).toContain('event: connected');

broadcaster.emitForUser(USER_ID, {
id: 'evt-1',
requestId: 'req-1',
apiKey: 'key-1',
apiKeyId: 'key-id-1',
apiId: 'api-1',
endpointId: 'endpoint-1',
userId: USER_ID,
amountUsdc: 1,
statusCode: 200,
timestamp: '2026-06-28T12:00:00.000Z',
});

const nextChunk = await reader!.read();
const nextText = new TextDecoder().decode(nextChunk.value ?? new Uint8Array());
expect(nextText).toContain('event: usage');
expect(nextText).toContain('"apiId":"api-1"');
} finally {
await new Promise<void>((resolve) => server.close(() => resolve()));
}
});
});
99 changes: 99 additions & 0 deletions src/routes/usage/sse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { Router, type Response } from 'express';
import { requireAuth, type AuthenticatedLocals } from '../../middleware/requireAuth.js';
import { UnauthorizedError } from '../../errors/index.js';
import { logger } from '../../logger.js';

export interface UsageSseDeps {
broadcaster?: UsageSseBroadcaster;
}

export interface UsageSseEventPayload {
id: string;
requestId: string;
apiKey: string;
apiKeyId: string;
apiId: string;
endpointId: string;
userId: string;
amountUsdc: number;
statusCode: number;
timestamp: string;
}

export class UsageSseBroadcaster {
private readonly listeners = new Map<string, Set<(event: UsageSseEventPayload) => void>>();

subscribe(userId: string, listener: (event: UsageSseEventPayload) => void): () => void {
const listeners = this.listeners.get(userId) ?? new Set();
listeners.add(listener);
this.listeners.set(userId, listeners);

return () => {
listeners.delete(listener);
if (listeners.size === 0) {
this.listeners.delete(userId);
}
};
}

emitForUser(userId: string, event: UsageSseEventPayload): void {
const listeners = this.listeners.get(userId);
if (!listeners || listeners.size === 0) {
return;
}

for (const listener of [...listeners]) {
try {
listener(event);
} catch (error) {
logger.error('[usage.sse] failed to dispatch event', { userId, error });
}
}
}
}

export const defaultUsageSseBroadcaster = new UsageSseBroadcaster();

export function createUsageSseRouter(deps: UsageSseDeps = {}): Router {
const router = Router();
const broadcaster = deps.broadcaster ?? new UsageSseBroadcaster();

router.get('/', requireAuth, async (req, res: Response<unknown, AuthenticatedLocals>, next) => {
const user = res.locals.authenticatedUser;
if (!user) {
next(new UnauthorizedError());
return;
}

res.setHeader('Content-Type', 'text/event-stream; charset=utf-8');
res.setHeader('Cache-Control', 'no-store');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
res.flushHeaders?.();

const writeSse = (event: string, payload: unknown): void => {
const data = JSON.stringify(payload);
res.write(`event: ${event}\n`);
res.write(`data: ${data}\n\n`);
};

writeSse('connected', { userId: user.id, connectedAt: new Date().toISOString() });

const unsubscribe = broadcaster.subscribe(user.id, (event) => {
writeSse('usage', event);
});

req.on('close', () => {
unsubscribe();
logger.info('[usage.sse] client disconnected', { userId: user.id });
});

req.on('aborted', () => {
unsubscribe();
});
});

return router;
}

export default createUsageSseRouter;