Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1779,6 +1779,7 @@ describe('/api/v1/chat/completions POST endpoint', () => {
expect(response.headers.get('Content-Type')).toBe('text/event-stream')
expect(response.headers.get('Cache-Control')).toBe('no-cache')
expect(response.headers.get('Connection')).toBe('keep-alive')
expect(await response.text()).toContain(' stream')
},
FETCH_PATH_TEST_TIMEOUT_MS,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { describe, expect, it, mock } from 'bun:test'

import {
beginChatCompletionRequestMetrics,
getActiveChatCompletionRequestCount,
} from '../request-metrics'

import type { Logger } from '@codebuff/common/types/contracts/logger'

const createLogger = (): Logger => ({
debug: mock(() => {}),
error: mock(() => {}),
info: mock(() => {}),
warn: mock(() => {}),
})

const baseParams = (logger: Logger) => ({
logger,
userId: 'user-1',
agentId: 'agent-1',
runId: 'run-1',
model: 'provider/model',
streaming: true,
costMode: 'normal',
logSampleRate: 1,
})

const drainStream = async (stream: ReadableStream<Uint8Array>) => {
const reader = stream.getReader()
while (true) {
const { done } = await reader.read()
if (done) return
}
}

describe('chat completion request metrics', () => {
it('increments and decrements when manually ended', () => {
const logger = createLogger()
const metrics = beginChatCompletionRequestMetrics(baseParams(logger))

expect(getActiveChatCompletionRequestCount()).toBe(1)

metrics.end('completed')
metrics.end('completed')

expect(getActiveChatCompletionRequestCount()).toBe(0)
expect(logger.info).toHaveBeenCalledTimes(2)
})

it('tracks requests without logging when sampling skips the request', () => {
const logger = createLogger()
const metrics = beginChatCompletionRequestMetrics({
...baseParams(logger),
logSampleRate: 0,
})

expect(getActiveChatCompletionRequestCount()).toBe(1)

metrics.end('completed')

expect(getActiveChatCompletionRequestCount()).toBe(0)
expect(logger.info).toHaveBeenCalledTimes(0)
})

it('decrements when a wrapped stream completes', async () => {
const logger = createLogger()
const metrics = beginChatCompletionRequestMetrics(baseParams(logger))
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new TextEncoder().encode('data: test\n\n'))
controller.close()
},
})

await drainStream(metrics.wrapStream(stream))

expect(getActiveChatCompletionRequestCount()).toBe(0)
expect(logger.info).toHaveBeenCalledTimes(2)
})

it('decrements when a wrapped stream is cancelled', async () => {
const logger = createLogger()
const metrics = beginChatCompletionRequestMetrics(baseParams(logger))
const stream = new ReadableStream<Uint8Array>({
start(controller) {
controller.enqueue(new TextEncoder().encode('data: test\n\n'))
},
})

const reader = metrics.wrapStream(stream).getReader()
await reader.cancel('client disconnected')

expect(getActiveChatCompletionRequestCount()).toBe(0)
expect(logger.info).toHaveBeenCalledTimes(2)
})

it('decrements when a wrapped stream errors', async () => {
const logger = createLogger()
const metrics = beginChatCompletionRequestMetrics(baseParams(logger))
const stream = new ReadableStream<Uint8Array>({
pull() {
throw new Error('provider stream failed')
},
})

await expect(drainStream(metrics.wrapStream(stream))).rejects.toThrow(
'provider stream failed',
)

expect(getActiveChatCompletionRequestCount()).toBe(0)
expect(logger.info).toHaveBeenCalledTimes(2)
})
})
15 changes: 14 additions & 1 deletion web/src/app/api/v1/chat/completions/_post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ import type {
import { extractApiKeyFromHeader } from '@/util/auth'
import { withDefaultProperties } from '@codebuff/common/analytics'
import { checkFreeModeRateLimit as defaultCheckFreeModeRateLimit } from './free-mode-rate-limiter'
import { beginChatCompletionRequestMetrics } from './request-metrics'

export const formatQuotaResetCountdown = (
nextQuotaReset: string | null | undefined,
Expand Down Expand Up @@ -794,6 +795,16 @@ export async function postChatCompletions(params: {
insertChatCompletionTraceBigquery,
})

const requestMetrics = beginChatCompletionRequestMetrics({
logger,
userId,
agentId,
runId: runIdFromBody,
model: typedBody.model,
streaming: bodyStream,
costMode,
})

// Handle streaming vs non-streaming
try {
if (bodyStream) {
Expand Down Expand Up @@ -859,7 +870,7 @@ export async function postChatCompletions(params: {
logger,
})

return new NextResponse(stream, {
return new NextResponse(requestMetrics.wrapStream(stream), {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Expand Down Expand Up @@ -934,9 +945,11 @@ export async function postChatCompletions(params: {
logger,
})

requestMetrics.end('completed')
return NextResponse.json(result)
}
} catch (error) {
requestMetrics.end('error', { error: getErrorObject(error) })
let openrouterError: OpenRouterError | undefined
if (error instanceof OpenRouterError) {
openrouterError = error
Expand Down
125 changes: 125 additions & 0 deletions web/src/app/api/v1/chat/completions/request-metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import os from 'os'

import { getErrorObject } from '@codebuff/common/util/error'

import type { Logger } from '@codebuff/common/types/contracts/logger'

const HOSTNAME = os.hostname()
const DEFAULT_LOG_SAMPLE_RATE = 0.05

let activeChatCompletionRequests = 0
let nextRequestSequence = 0

type RequestMetricsParams = {
logger: Logger
userId: string
agentId: string
runId: string
model: string
streaming: boolean
costMode: string | undefined
logSampleRate?: number
}

type EndReason = 'completed' | 'cancelled' | 'error'

export function beginChatCompletionRequestMetrics({
logger,
userId,
agentId,
runId,
model,
streaming,
costMode,
logSampleRate = DEFAULT_LOG_SAMPLE_RATE,
}: RequestMetricsParams) {
const requestSequence = ++nextRequestSequence
const startedAt = Date.now()
activeChatCompletionRequests += 1
const activeRequestsAtStart = activeChatCompletionRequests
const normalizedLogSampleRate = Math.max(0, Math.min(1, logSampleRate))
const shouldLog = Math.random() < normalizedLogSampleRate

const baseFields = {
metric: 'chat_completion_concurrency',
host: HOSTNAME,
pid: process.pid,
requestSequence,
userId,
agentId,
runId,
model,
streaming,
costMode,
logSampleRate: normalizedLogSampleRate,
}

if (shouldLog) {
logger.info(
{
...baseFields,
event: 'start',
activeChatCompletionRequests: activeRequestsAtStart,
},
'Chat completion request started',
)
}

let ended = false

const end = (reason: EndReason, extra?: Record<string, unknown>) => {
if (ended) return
ended = true
activeChatCompletionRequests = Math.max(0, activeChatCompletionRequests - 1)

if (!shouldLog) return

logger.info(
{
...baseFields,
...extra,
event: 'finish',
endReason: reason,
durationMs: Date.now() - startedAt,
activeRequestsAtStart,
activeChatCompletionRequests,
},
'Chat completion request finished',
)
}

return {
end,
wrapStream(stream: ReadableStream<Uint8Array>) {
const reader = stream.getReader()

return new ReadableStream<Uint8Array>({
async pull(controller) {
try {
const { done, value } = await reader.read()
if (done) {
end('completed')
controller.close()
return
}
controller.enqueue(value)
} catch (error) {
end('error', { error: getErrorObject(error) })
controller.error(error)
}
},
async cancel(reason) {
end('cancelled', {
cancelReason:
typeof reason === 'string' ? reason : getErrorObject(reason),
})
await reader.cancel(reason)
},
})
},
}
}

export function getActiveChatCompletionRequestCount() {
return activeChatCompletionRequests
}
Loading