diff --git a/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts b/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts index c8fdaa232..5704535f8 100644 --- a/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts +++ b/web/src/app/api/v1/chat/completions/__tests__/completions.test.ts @@ -1779,6 +1779,7 @@ describe('/api/v1/chat/completions POST endpoint', () => { expect(response.headers.get('Content-Type')).toBe('text/event-stream') expect(response.headers.get('Cache-Control')).toBe('no-cache') expect(response.headers.get('Connection')).toBe('keep-alive') + expect(await response.text()).toContain(' stream') }, FETCH_PATH_TEST_TIMEOUT_MS, ) diff --git a/web/src/app/api/v1/chat/completions/__tests__/request-metrics.test.ts b/web/src/app/api/v1/chat/completions/__tests__/request-metrics.test.ts new file mode 100644 index 000000000..ce6f6544f --- /dev/null +++ b/web/src/app/api/v1/chat/completions/__tests__/request-metrics.test.ts @@ -0,0 +1,113 @@ +import { describe, expect, it, mock } from 'bun:test' + +import { + beginChatCompletionRequestMetrics, + getActiveChatCompletionRequestCount, +} from '../request-metrics' + +import type { Logger } from '@codebuff/common/types/contracts/logger' + +const createLogger = (): Logger => ({ + debug: mock(() => {}), + error: mock(() => {}), + info: mock(() => {}), + warn: mock(() => {}), +}) + +const baseParams = (logger: Logger) => ({ + logger, + userId: 'user-1', + agentId: 'agent-1', + runId: 'run-1', + model: 'provider/model', + streaming: true, + costMode: 'normal', + logSampleRate: 1, +}) + +const drainStream = async (stream: ReadableStream) => { + const reader = stream.getReader() + while (true) { + const { done } = await reader.read() + if (done) return + } +} + +describe('chat completion request metrics', () => { + it('increments and decrements when manually ended', () => { + const logger = createLogger() + const metrics = beginChatCompletionRequestMetrics(baseParams(logger)) + + expect(getActiveChatCompletionRequestCount()).toBe(1) + + metrics.end('completed') + metrics.end('completed') + + expect(getActiveChatCompletionRequestCount()).toBe(0) + expect(logger.info).toHaveBeenCalledTimes(2) + }) + + it('tracks requests without logging when sampling skips the request', () => { + const logger = createLogger() + const metrics = beginChatCompletionRequestMetrics({ + ...baseParams(logger), + logSampleRate: 0, + }) + + expect(getActiveChatCompletionRequestCount()).toBe(1) + + metrics.end('completed') + + expect(getActiveChatCompletionRequestCount()).toBe(0) + expect(logger.info).toHaveBeenCalledTimes(0) + }) + + it('decrements when a wrapped stream completes', async () => { + const logger = createLogger() + const metrics = beginChatCompletionRequestMetrics(baseParams(logger)) + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('data: test\n\n')) + controller.close() + }, + }) + + await drainStream(metrics.wrapStream(stream)) + + expect(getActiveChatCompletionRequestCount()).toBe(0) + expect(logger.info).toHaveBeenCalledTimes(2) + }) + + it('decrements when a wrapped stream is cancelled', async () => { + const logger = createLogger() + const metrics = beginChatCompletionRequestMetrics(baseParams(logger)) + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('data: test\n\n')) + }, + }) + + const reader = metrics.wrapStream(stream).getReader() + await reader.cancel('client disconnected') + + expect(getActiveChatCompletionRequestCount()).toBe(0) + expect(logger.info).toHaveBeenCalledTimes(2) + }) + + it('decrements when a wrapped stream errors', async () => { + const logger = createLogger() + const metrics = beginChatCompletionRequestMetrics(baseParams(logger)) + const stream = new ReadableStream({ + pull() { + throw new Error('provider stream failed') + }, + }) + + await expect(drainStream(metrics.wrapStream(stream))).rejects.toThrow( + 'provider stream failed', + ) + + expect(getActiveChatCompletionRequestCount()).toBe(0) + expect(logger.info).toHaveBeenCalledTimes(2) + }) +}) diff --git a/web/src/app/api/v1/chat/completions/_post.ts b/web/src/app/api/v1/chat/completions/_post.ts index b23e5fe1b..76aa89248 100644 --- a/web/src/app/api/v1/chat/completions/_post.ts +++ b/web/src/app/api/v1/chat/completions/_post.ts @@ -116,6 +116,7 @@ import type { import { extractApiKeyFromHeader } from '@/util/auth' import { withDefaultProperties } from '@codebuff/common/analytics' import { checkFreeModeRateLimit as defaultCheckFreeModeRateLimit } from './free-mode-rate-limiter' +import { beginChatCompletionRequestMetrics } from './request-metrics' export const formatQuotaResetCountdown = ( nextQuotaReset: string | null | undefined, @@ -794,6 +795,16 @@ export async function postChatCompletions(params: { insertChatCompletionTraceBigquery, }) + const requestMetrics = beginChatCompletionRequestMetrics({ + logger, + userId, + agentId, + runId: runIdFromBody, + model: typedBody.model, + streaming: bodyStream, + costMode, + }) + // Handle streaming vs non-streaming try { if (bodyStream) { @@ -859,7 +870,7 @@ export async function postChatCompletions(params: { logger, }) - return new NextResponse(stream, { + return new NextResponse(requestMetrics.wrapStream(stream), { headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', @@ -934,9 +945,11 @@ export async function postChatCompletions(params: { logger, }) + requestMetrics.end('completed') return NextResponse.json(result) } } catch (error) { + requestMetrics.end('error', { error: getErrorObject(error) }) let openrouterError: OpenRouterError | undefined if (error instanceof OpenRouterError) { openrouterError = error diff --git a/web/src/app/api/v1/chat/completions/request-metrics.ts b/web/src/app/api/v1/chat/completions/request-metrics.ts new file mode 100644 index 000000000..54af0063c --- /dev/null +++ b/web/src/app/api/v1/chat/completions/request-metrics.ts @@ -0,0 +1,125 @@ +import os from 'os' + +import { getErrorObject } from '@codebuff/common/util/error' + +import type { Logger } from '@codebuff/common/types/contracts/logger' + +const HOSTNAME = os.hostname() +const DEFAULT_LOG_SAMPLE_RATE = 0.05 + +let activeChatCompletionRequests = 0 +let nextRequestSequence = 0 + +type RequestMetricsParams = { + logger: Logger + userId: string + agentId: string + runId: string + model: string + streaming: boolean + costMode: string | undefined + logSampleRate?: number +} + +type EndReason = 'completed' | 'cancelled' | 'error' + +export function beginChatCompletionRequestMetrics({ + logger, + userId, + agentId, + runId, + model, + streaming, + costMode, + logSampleRate = DEFAULT_LOG_SAMPLE_RATE, +}: RequestMetricsParams) { + const requestSequence = ++nextRequestSequence + const startedAt = Date.now() + activeChatCompletionRequests += 1 + const activeRequestsAtStart = activeChatCompletionRequests + const normalizedLogSampleRate = Math.max(0, Math.min(1, logSampleRate)) + const shouldLog = Math.random() < normalizedLogSampleRate + + const baseFields = { + metric: 'chat_completion_concurrency', + host: HOSTNAME, + pid: process.pid, + requestSequence, + userId, + agentId, + runId, + model, + streaming, + costMode, + logSampleRate: normalizedLogSampleRate, + } + + if (shouldLog) { + logger.info( + { + ...baseFields, + event: 'start', + activeChatCompletionRequests: activeRequestsAtStart, + }, + 'Chat completion request started', + ) + } + + let ended = false + + const end = (reason: EndReason, extra?: Record) => { + if (ended) return + ended = true + activeChatCompletionRequests = Math.max(0, activeChatCompletionRequests - 1) + + if (!shouldLog) return + + logger.info( + { + ...baseFields, + ...extra, + event: 'finish', + endReason: reason, + durationMs: Date.now() - startedAt, + activeRequestsAtStart, + activeChatCompletionRequests, + }, + 'Chat completion request finished', + ) + } + + return { + end, + wrapStream(stream: ReadableStream) { + const reader = stream.getReader() + + return new ReadableStream({ + async pull(controller) { + try { + const { done, value } = await reader.read() + if (done) { + end('completed') + controller.close() + return + } + controller.enqueue(value) + } catch (error) { + end('error', { error: getErrorObject(error) }) + controller.error(error) + } + }, + async cancel(reason) { + end('cancelled', { + cancelReason: + typeof reason === 'string' ? reason : getErrorObject(reason), + }) + await reader.cancel(reason) + }, + }) + }, + } +} + +export function getActiveChatCompletionRequestCount() { + return activeChatCompletionRequests +}