diff --git a/README.md b/README.md index cb4de9a..4cc3e83 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ API gateway, usage metering, and billing services for the Callora API marketplac - `GET /api/apis/:id` - `POST /api/apis` for authenticated developers to register an API with priced endpoints - Usage route: `GET /api/usage` +- Live usage stream: `GET /api/usage/sse` for authenticated developer dashboards - Admin usage anomalies: `GET /api/admin/usage/anomalies` returns per-API daily usage anomalies (z-score spikes/drops) for admin review, filterable by `from`/`to`/`apiId`/`threshold`/`limit` (admin auth + IP allowlist) - JSON body parsing plus gateway API key authentication for upstream proxy routes - Per-user global REST rate limiting for authenticated `/api/billing`, `/api/usage`, `/api/developers`, `/api/vault`, and `/api/keys` traffic, with IP fallback for unauthenticated requests diff --git a/docs/usage-sse.md b/docs/usage-sse.md new file mode 100644 index 0000000..40e09e1 --- /dev/null +++ b/docs/usage-sse.md @@ -0,0 +1,29 @@ +# Usage SSE stream + +## Overview + +The backend now exposes an authenticated Server-Sent Events endpoint at `/api/usage/sse` for live developer dashboard updates. + +## Behavior + +- The stream uses `Content-Type: text/event-stream` and keeps the connection open while the client remains connected. +- The server sends an initial `connected` event immediately after the handshake succeeds. +- Each new usage event recorded for the authenticated user is emitted as an SSE `usage` event with the event payload. +- Clients should reconnect on disconnects; the backend will clean up the subscription automatically. + +## Authentication + +The endpoint accepts the same authentication mechanisms as the rest of the usage API: + +- `x-user-id` header, or +- a bearer JWT via the standard `Authorization` header. + +## Example + +```bash +curl -N -H 'x-user-id: user-123' http://localhost:3000/api/usage/sse +``` + +## Notes + +The SSE endpoint is intended for developer dashboards that need real-time usage feedback without polling the REST usage endpoints. diff --git a/src/routes/gatewayRoutes.ts b/src/routes/gatewayRoutes.ts index 8590796..2b6c270 100644 --- a/src/routes/gatewayRoutes.ts +++ b/src/routes/gatewayRoutes.ts @@ -5,6 +5,7 @@ import { startUpstreamTimer, getUpstreamHealth, type UpstreamOutcome } from '../ import { validate } from '../middleware/validate.js'; import type { GatewayDeps, ApiKey } from '../types/gateway.js'; import { buildHopByHopSet } from '../lib/hopByHop.js'; +import { defaultUsageSseBroadcaster } from './usage/sse.js'; import { getDefaultBreakerRegistry, CircuitBreakerState } from '../lib/circuitBreaker.js'; import { BadGatewayError, @@ -298,7 +299,7 @@ export function createGatewayRouter(deps: GatewayDeps): Router { } } - await usageStore.record({ + const recorded = await usageStore.record({ id: randomUUID(), requestId, apiKey: apiKeyHeader, @@ -311,6 +312,21 @@ export function createGatewayRouter(deps: GatewayDeps): Router { timestamp: new Date().toISOString(), }); + if (recorded) { + defaultUsageSseBroadcaster.emitForUser(keyRecord.developerId, { + id: randomUUID(), + requestId, + apiKey: apiKeyHeader, + apiKeyId: keyRecord.key, + apiId: keyRecord.apiId, + endpointId: 'legacy', + userId: keyRecord.developerId, + amountUsdc: CREDIT_COST_PER_CALL, + statusCode: upstreamStatus, + timestamp: new Date().toISOString(), + }); + } + res.set('x-request-id', requestId); // Forward safe upstream response headers (hop-by-hop already stripped above) for (const [key, value] of Object.entries(upstreamResponseHeaders)) { diff --git a/src/routes/index.ts b/src/routes/index.ts index 47d07c0..a2f024c 100644 --- a/src/routes/index.ts +++ b/src/routes/index.ts @@ -8,6 +8,7 @@ import { createBillingPortalRouter } from './billing/portal.js'; import healthRouter from './health.js'; import { createApisRouter, type ApisRouterDeps } from './apis.js'; import { createUsageRouter, type UsageRouterDeps } from './usage.js'; +import { createUsageSseRouter } from './usage/sse.js'; import { createLimitsRouter } from './limits.js'; import { InMemoryRestRateLimiter } from '../middleware/restRateLimit.js'; import { createUsageCsvRouter } from './usage/csv.js'; @@ -38,6 +39,8 @@ export function createApiRouter(deps: ApiRouterDeps = {}): Router { usageEventsRepository: deps.usageEventsRepository! })); + router.use('/usage/sse', createUsageSseRouter()); + router.use('/usage', createUsageRouter({ usageEventsRepository: deps.usageEventsRepository! })); diff --git a/src/routes/proxyRoutes.ts b/src/routes/proxyRoutes.ts index c9e9caf..d39b7be 100644 --- a/src/routes/proxyRoutes.ts +++ b/src/routes/proxyRoutes.ts @@ -21,6 +21,7 @@ import { CircuitBreakerOpenError } from '../lib/errors.js'; import { CircuitBreaker, type CircuitBreakerStore } from '../lib/circuitBreaker.js'; import { env } from '../config/env.js'; import { getOrCreateRequestId } from '../utils/asyncContext.js'; +import { defaultUsageSseBroadcaster } from './usage/sse.js'; /** * Headers that must never be forwarded to the upstream server. @@ -316,6 +317,21 @@ export function createProxyRouter(deps: ProxyDeps): Router { timestamp: new Date().toISOString(), }); + if (recorded) { + defaultUsageSseBroadcaster.emitForUser(keyRecord.userId, { + id: randomUUID(), + requestId, + apiKey: apiKeyHeader, + apiKeyId: keyRecord.id, + apiId: String(apiEntry.id), + endpointId: endpoint.endpointId, + userId: keyRecord.userId, + amountUsdc: endpoint.priceUsdc, + statusCode: upstreamStatus, + timestamp: new Date().toISOString(), + }); + } + // Only deduct billing if this requestId hasn't been processed // before (idempotency guard inside usageStore.record). if (recorded && endpoint.priceUsdc > 0) { diff --git a/src/routes/usage/sse.test.ts b/src/routes/usage/sse.test.ts new file mode 100644 index 0000000..af42e56 --- /dev/null +++ b/src/routes/usage/sse.test.ts @@ -0,0 +1,73 @@ +import express from 'express'; +import { createUsageSseRouter, UsageSseBroadcaster } from './sse.js'; +import { errorHandler } from '../../middleware/errorHandler.js'; +import { requestIdMiddleware } from '../../middleware/requestId.js'; +import type { AddressInfo } from 'node:net'; + +const USER_ID = 'user-1'; + +describe('GET /api/usage/sse', () => { + it('returns 401 when the request is unauthenticated', async () => { + const app = express(); + app.use(requestIdMiddleware); + app.use('/api/usage/sse', createUsageSseRouter()); + app.use(errorHandler); + + const server = app.listen(0); + try { + const address = server.address() as AddressInfo; + const response = await fetch(`http://127.0.0.1:${address.port}/api/usage/sse`); + expect(response.status).toBe(401); + expect(await response.json()).toMatchObject({ code: 'UNAUTHORIZED' }); + } finally { + await new Promise((resolve) => server.close(() => resolve())); + } + }); + + it('streams usage updates to the authenticated user', async () => { + const broadcaster = new UsageSseBroadcaster(); + const app = express(); + app.use(requestIdMiddleware); + app.use('/api/usage/sse', createUsageSseRouter({ broadcaster })); + app.use(errorHandler); + + const server = app.listen(0); + try { + const address = server.address() as AddressInfo; + const response = await fetch(`http://127.0.0.1:${address.port}/api/usage/sse`, { + headers: { 'x-user-id': USER_ID }, + }); + + expect(response.status).toBe(200); + expect(response.headers.get('content-type')).toContain('text/event-stream'); + expect(response.headers.get('cache-control')).toBe('no-store'); + + const reader = response.body?.getReader(); + expect(reader).toBeDefined(); + + const initialChunk = await reader!.read(); + const initialText = new TextDecoder().decode(initialChunk.value ?? new Uint8Array()); + expect(initialText).toContain('event: connected'); + + broadcaster.emitForUser(USER_ID, { + id: 'evt-1', + requestId: 'req-1', + apiKey: 'key-1', + apiKeyId: 'key-id-1', + apiId: 'api-1', + endpointId: 'endpoint-1', + userId: USER_ID, + amountUsdc: 1, + statusCode: 200, + timestamp: '2026-06-28T12:00:00.000Z', + }); + + const nextChunk = await reader!.read(); + const nextText = new TextDecoder().decode(nextChunk.value ?? new Uint8Array()); + expect(nextText).toContain('event: usage'); + expect(nextText).toContain('"apiId":"api-1"'); + } finally { + await new Promise((resolve) => server.close(() => resolve())); + } + }); +}); diff --git a/src/routes/usage/sse.ts b/src/routes/usage/sse.ts new file mode 100644 index 0000000..2852442 --- /dev/null +++ b/src/routes/usage/sse.ts @@ -0,0 +1,99 @@ +import { Router, type Response } from 'express'; +import { requireAuth, type AuthenticatedLocals } from '../../middleware/requireAuth.js'; +import { UnauthorizedError } from '../../errors/index.js'; +import { logger } from '../../logger.js'; + +export interface UsageSseDeps { + broadcaster?: UsageSseBroadcaster; +} + +export interface UsageSseEventPayload { + id: string; + requestId: string; + apiKey: string; + apiKeyId: string; + apiId: string; + endpointId: string; + userId: string; + amountUsdc: number; + statusCode: number; + timestamp: string; +} + +export class UsageSseBroadcaster { + private readonly listeners = new Map void>>(); + + subscribe(userId: string, listener: (event: UsageSseEventPayload) => void): () => void { + const listeners = this.listeners.get(userId) ?? new Set(); + listeners.add(listener); + this.listeners.set(userId, listeners); + + return () => { + listeners.delete(listener); + if (listeners.size === 0) { + this.listeners.delete(userId); + } + }; + } + + emitForUser(userId: string, event: UsageSseEventPayload): void { + const listeners = this.listeners.get(userId); + if (!listeners || listeners.size === 0) { + return; + } + + for (const listener of [...listeners]) { + try { + listener(event); + } catch (error) { + logger.error('[usage.sse] failed to dispatch event', { userId, error }); + } + } + } +} + +export const defaultUsageSseBroadcaster = new UsageSseBroadcaster(); + +export function createUsageSseRouter(deps: UsageSseDeps = {}): Router { + const router = Router(); + const broadcaster = deps.broadcaster ?? new UsageSseBroadcaster(); + + router.get('/', requireAuth, async (req, res: Response, next) => { + const user = res.locals.authenticatedUser; + if (!user) { + next(new UnauthorizedError()); + return; + } + + res.setHeader('Content-Type', 'text/event-stream; charset=utf-8'); + res.setHeader('Cache-Control', 'no-store'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('X-Accel-Buffering', 'no'); + res.flushHeaders?.(); + + const writeSse = (event: string, payload: unknown): void => { + const data = JSON.stringify(payload); + res.write(`event: ${event}\n`); + res.write(`data: ${data}\n\n`); + }; + + writeSse('connected', { userId: user.id, connectedAt: new Date().toISOString() }); + + const unsubscribe = broadcaster.subscribe(user.id, (event) => { + writeSse('usage', event); + }); + + req.on('close', () => { + unsubscribe(); + logger.info('[usage.sse] client disconnected', { userId: user.id }); + }); + + req.on('aborted', () => { + unsubscribe(); + }); + }); + + return router; +} + +export default createUsageSseRouter;