diff --git a/web/src/app/api/v1/chat/completions/_post.ts b/web/src/app/api/v1/chat/completions/_post.ts index 76aa89248..f70c34b34 100644 --- a/web/src/app/api/v1/chat/completions/_post.ts +++ b/web/src/app/api/v1/chat/completions/_post.ts @@ -67,6 +67,10 @@ import { handleDeepSeekStream, isDeepSeekModel, } from '@/llm-api/deepseek' +import { + isLikelyDeepSeekOutage, + shouldBypassDeepSeek, +} from '@/llm-api/deepseek-health' import { handleMoonshotNonStream, handleMoonshotStream, @@ -118,6 +122,23 @@ import { withDefaultProperties } from '@codebuff/common/analytics' import { checkFreeModeRateLimit as defaultCheckFreeModeRateLimit } from './free-mode-rate-limiter' import { beginChatCompletionRequestMetrics } from './request-metrics' +/** + * Decide whether a failed DeepSeek request should transparently fail over to + * Fireworks. Pre-stream errors (network/timeout/5xx/408/429) on a model that has a + * known Fireworks fallback are eligible. The circuit-breaker failure was + * already recorded inside the DeepSeek handler. 
+ */ +function canFailoverDeepSeekToFireworks( + error: unknown, + model: string, +): boolean { + if (!isFireworksModel(model)) return false + if (error instanceof DeepSeekError) { + return isLikelyDeepSeekOutage(undefined, error.statusCode) + } + return isLikelyDeepSeekOutage(error) +} + export const formatQuotaResetCountdown = ( nextQuotaReset: string | null | undefined, ): string => { @@ -814,10 +835,18 @@ export async function postChatCompletions(params: { const useMoonshot = !useOpenCodeZen && isMoonshotModel(typedBody.model) const useCanopyWave = !useMoonshot && !useOpenCodeZen && isCanopyWaveModel(typedBody.model) + const deepseekBypassed = shouldBypassDeepSeek(typedBody.model) + if (deepseekBypassed) { + providerLogger.info( + { model: typedBody.model }, + 'DeepSeek circuit open — routing to Fireworks fallback', + ) + } const useDeepSeek = !useMoonshot && !useOpenCodeZen && !useCanopyWave && + !deepseekBypassed && isDeepSeekModel(typedBody.model) const useFireworks = !useMoonshot && @@ -841,6 +870,23 @@ export async function postChatCompletions(params: { logger: providerLogger, insertMessageBigquery, } + const callDeepSeekStream = async () => { + try { + return await handleDeepSeekStream(baseArgs) + } catch (error) { + if (canFailoverDeepSeekToFireworks(error, typedBody.model)) { + providerLogger.warn( + { + model: typedBody.model, + error: getErrorObject(error), + }, + 'DeepSeek failed pre-stream — falling back to Fireworks', + ) + return await handleFireworksStream(baseArgs) + } + throw error + } + } const stream = useSiliconFlow ? await handleSiliconFlowStream(baseArgs) : useMoonshot @@ -850,7 +896,7 @@ export async function postChatCompletions(params: { : useCanopyWave ? await handleCanopyWaveStream(baseArgs) : useDeepSeek - ? await handleDeepSeekStream(baseArgs) + ? await callDeepSeekStream() : useFireworks ? 
await handleFireworksStream(baseArgs) : useOpenAIDirect @@ -886,10 +932,18 @@ export async function postChatCompletions(params: { const useMoonshot = !useOpenCodeZen && isMoonshotModel(model) const useCanopyWave = !useMoonshot && !useOpenCodeZen && isCanopyWaveModel(model) + const deepseekBypassed = shouldBypassDeepSeek(model) + if (deepseekBypassed) { + providerLogger.info( + { model }, + 'DeepSeek circuit open — routing to Fireworks fallback', + ) + } const useDeepSeek = !useMoonshot && !useOpenCodeZen && !useCanopyWave && + !deepseekBypassed && isDeepSeekModel(model) const useFireworks = !useMoonshot && @@ -914,6 +968,20 @@ export async function postChatCompletions(params: { logger: providerLogger, insertMessageBigquery, } + const callDeepSeekNonStream = async () => { + try { + return await handleDeepSeekNonStream(baseArgs) + } catch (error) { + if (canFailoverDeepSeekToFireworks(error, model)) { + providerLogger.warn( + { model, error: getErrorObject(error) }, + 'DeepSeek failed — falling back to Fireworks', + ) + return await handleFireworksNonStream(baseArgs) + } + throw error + } + } const nonStreamRequest = useSiliconFlow ? handleSiliconFlowNonStream(baseArgs) : useMoonshot @@ -923,7 +991,7 @@ export async function postChatCompletions(params: { : useCanopyWave ? handleCanopyWaveNonStream(baseArgs) : useDeepSeek - ? handleDeepSeekNonStream(baseArgs) + ? callDeepSeekNonStream() : useFireworks ? 
handleFireworksNonStream(baseArgs) : shouldUseOpenAIEndpoint diff --git a/web/src/llm-api/__tests__/deepseek-health.test.ts b/web/src/llm-api/__tests__/deepseek-health.test.ts new file mode 100644 index 000000000..bad485475 --- /dev/null +++ b/web/src/llm-api/__tests__/deepseek-health.test.ts @@ -0,0 +1,113 @@ +import { afterEach, beforeEach, describe, expect, it } from 'bun:test' + +import { deepseekModels } from '@codebuff/common/constants/model-config' + +import { + DEEPSEEK_CIRCUIT_CONFIG, + __resetDeepSeekCircuitForTests, + isDeepSeekCircuitOpen, + isLikelyDeepSeekOutage, + recordDeepSeekFailure, + recordDeepSeekSuccess, + shouldBypassDeepSeek, +} from '../deepseek-health' + +describe('DeepSeek circuit breaker', () => { + beforeEach(() => { + __resetDeepSeekCircuitForTests() + }) + afterEach(() => { + __resetDeepSeekCircuitForTests() + }) + + it('starts closed', () => { + expect(isDeepSeekCircuitOpen()).toBe(false) + expect(shouldBypassDeepSeek(deepseekModels.deepseekV4Flash)).toBe(false) + }) + + it('stays closed after fewer failures than threshold', () => { + for (let i = 0; i < DEEPSEEK_CIRCUIT_CONFIG.FAILURE_THRESHOLD - 1; i++) { + recordDeepSeekFailure() + } + expect(isDeepSeekCircuitOpen()).toBe(false) + }) + + it('opens after threshold failures in the window', () => { + for (let i = 0; i < DEEPSEEK_CIRCUIT_CONFIG.FAILURE_THRESHOLD; i++) { + recordDeepSeekFailure() + } + expect(isDeepSeekCircuitOpen()).toBe(true) + }) + + it('only bypasses v4-flash variants, not v4-pro', () => { + for (let i = 0; i < DEEPSEEK_CIRCUIT_CONFIG.FAILURE_THRESHOLD; i++) { + recordDeepSeekFailure() + } + expect(shouldBypassDeepSeek(deepseekModels.deepseekV4Flash)).toBe(true) + expect(shouldBypassDeepSeek(deepseekModels.deepseekV4FlashDirect)).toBe( + true, + ) + expect(shouldBypassDeepSeek(deepseekModels.deepseekV4Pro)).toBe(false) + expect(shouldBypassDeepSeek(deepseekModels.deepseekV4ProDirect)).toBe(false) + 
expect(shouldBypassDeepSeek('anthropic/claude-sonnet-4.5')).toBe(false) + }) + + it('resets on success', () => { + for (let i = 0; i < DEEPSEEK_CIRCUIT_CONFIG.FAILURE_THRESHOLD; i++) { + recordDeepSeekFailure() + } + expect(isDeepSeekCircuitOpen()).toBe(true) + recordDeepSeekSuccess() + expect(isDeepSeekCircuitOpen()).toBe(false) + expect(shouldBypassDeepSeek(deepseekModels.deepseekV4Flash)).toBe(false) + }) +}) + +describe('isLikelyDeepSeekOutage', () => { + it('treats 5xx, 408, 429 as outages', () => { + expect(isLikelyDeepSeekOutage(undefined, 500)).toBe(true) + expect(isLikelyDeepSeekOutage(undefined, 502)).toBe(true) + expect(isLikelyDeepSeekOutage(undefined, 503)).toBe(true) + expect(isLikelyDeepSeekOutage(undefined, 504)).toBe(true) + expect(isLikelyDeepSeekOutage(undefined, 408)).toBe(true) + expect(isLikelyDeepSeekOutage(undefined, 429)).toBe(true) + }) + + it('does not treat 4xx (other than 408/429) as outages', () => { + expect(isLikelyDeepSeekOutage(undefined, 400)).toBe(false) + expect(isLikelyDeepSeekOutage(undefined, 401)).toBe(false) + expect(isLikelyDeepSeekOutage(undefined, 403)).toBe(false) + expect(isLikelyDeepSeekOutage(undefined, 404)).toBe(false) + }) + + it('classifies undici header-timeout errors as outages', () => { + const error = Object.assign(new Error('Headers Timeout Error'), { + code: 'UND_ERR_HEADERS_TIMEOUT', + }) + expect(isLikelyDeepSeekOutage(error)).toBe(true) + }) + + it('classifies common network errors as outages', () => { + for (const code of [ + 'ECONNRESET', + 'ECONNREFUSED', + 'ENOTFOUND', + 'ETIMEDOUT', + ]) { + const error = Object.assign(new Error('boom'), { code }) + expect(isLikelyDeepSeekOutage(error)).toBe(true) + } + }) + + it('classifies AbortError as outage', () => { + const error = new Error('aborted') + error.name = 'AbortError' + expect(isLikelyDeepSeekOutage(error)).toBe(true) + }) + + it('treats generic non-network errors as non-outage', () => { + expect(isLikelyDeepSeekOutage(new Error('bad 
json'))).toBe(false) + expect(isLikelyDeepSeekOutage(undefined)).toBe(false) + expect(isLikelyDeepSeekOutage('string')).toBe(false) + }) +}) diff --git a/web/src/llm-api/deepseek-health.ts b/web/src/llm-api/deepseek-health.ts new file mode 100644 index 000000000..ce55fb45d --- /dev/null +++ b/web/src/llm-api/deepseek-health.ts @@ -0,0 +1,150 @@ +import { deepseekModels } from '@codebuff/common/constants/model-config' + +import { logger } from '@/util/logger' + +/** + * Passive circuit breaker for the official DeepSeek API. + * + * Tracks transient failures observed from real user requests in a rolling + * window. When the threshold is reached, the circuit opens for a cooldown, + * and supported models (currently `deepseek-v4-flash`) are routed to Fireworks + * instead. No background polling — every request acts as the probe. After the + * cooldown elapses, the next request retries DeepSeek directly; if it + * succeeds the circuit resets, otherwise the failure counts toward re-opening. + * + * State lives in-process. Each server instance maintains its own view, which + * is fine: failures are correlated across pods, so all instances converge to + * the same state within a few seconds. 
+ */ + +const FAILURE_THRESHOLD = 3 +const FAILURE_WINDOW_MS = 60_000 +const OPEN_DURATION_MS = 5 * 60_000 + +let recentFailures: number[] = [] +let openUntil = 0 + +function isDeepSeekV4FlashModel(model: string): boolean { + return ( + model === deepseekModels.deepseekV4Flash || + model === deepseekModels.deepseekV4FlashDirect + ) +} + +export function recordDeepSeekFailure(context?: { + model?: string + reason?: string + statusCode?: number +}): void { + const now = Date.now() + recentFailures = recentFailures.filter((ts) => now - ts < FAILURE_WINDOW_MS) + recentFailures.push(now) + const wasOpen = now < openUntil + if (recentFailures.length >= FAILURE_THRESHOLD) { + openUntil = now + OPEN_DURATION_MS + if (!wasOpen) { + logger.warn( + { + failureCount: recentFailures.length, + openUntilIso: new Date(openUntil).toISOString(), + ...context, + }, + 'DeepSeek circuit opened — routing deepseek-v4-flash to Fireworks', + ) + } + } +} + +export function recordDeepSeekSuccess(): void { + if (openUntil !== 0 || recentFailures.length > 0) { + logger.info( + { previousFailureCount: recentFailures.length }, + 'DeepSeek circuit reset after successful request', + ) + } + recentFailures = [] + openUntil = 0 +} + +export function isDeepSeekCircuitOpen(): boolean { + return Date.now() < openUntil +} + +/** Returns true if this request should bypass DeepSeek and use the Fireworks + * fallback. Only `deepseek-v4-flash` has a Fireworks alternative today. */ +export function shouldBypassDeepSeek(model: string): boolean { + if (!isDeepSeekV4FlashModel(model)) return false + return isDeepSeekCircuitOpen() +} + +/** Classify whether an error/status reflects a likely DeepSeek-side outage + * (network/timeout/5xx/408/429) vs. a request-specific 4xx. We only count outages + * toward circuit-opening. 
*/ +export function isLikelyDeepSeekOutage( + error: unknown, + statusCode?: number, +): boolean { + if (typeof statusCode === 'number') { + return statusCode >= 500 || statusCode === 408 || statusCode === 429 + } + if (error instanceof Error) { + const code = (error as { code?: string }).code + if ( + code === 'UND_ERR_HEADERS_TIMEOUT' || + code === 'UND_ERR_BODY_TIMEOUT' || + code === 'UND_ERR_CONNECT_TIMEOUT' || + code === 'UND_ERR_SOCKET' || + code === 'ECONNRESET' || + code === 'ECONNREFUSED' || + code === 'ENOTFOUND' || + code === 'ETIMEDOUT' || + code === 'EAI_AGAIN' + ) { + return true + } + if ( + error.name === 'AbortError' || + error.name === 'HeadersTimeoutError' || + error.name === 'TimeoutError' + ) { + return true + } + const msg = error.message?.toLowerCase() ?? '' + return ( + msg.includes('headers timeout') || + msg.includes('fetch failed') || + msg.includes('socket hang up') || + msg.includes('connect timeout') || + msg.includes('network') || + msg.includes('econnreset') || + msg.includes('aborted') + ) + } + return false +} + +export function getDeepSeekCircuitState(): { + circuitOpen: boolean + openUntil: number + recentFailureCount: number +} { + const now = Date.now() + return { + circuitOpen: now < openUntil, + openUntil, + recentFailureCount: recentFailures.filter( + (ts) => now - ts < FAILURE_WINDOW_MS, + ).length, + } +} + +export function __resetDeepSeekCircuitForTests(): void { + recentFailures = [] + openUntil = 0 +} + +export const DEEPSEEK_CIRCUIT_CONFIG = { + FAILURE_THRESHOLD, + FAILURE_WINDOW_MS, + OPEN_DURATION_MS, +} as const diff --git a/web/src/llm-api/deepseek.ts b/web/src/llm-api/deepseek.ts index 359bf9738..a58ebccf0 100644 --- a/web/src/llm-api/deepseek.ts +++ b/web/src/llm-api/deepseek.ts @@ -15,6 +15,11 @@ import { buildDeepSeekRequestBody, DEEPSEEK_MODEL_IDS, } from './deepseek-request-body' +import { + isLikelyDeepSeekOutage, + recordDeepSeekFailure, + recordDeepSeekSuccess, +} from './deepseek-health' import type { 
UsageData } from './helpers' import type { InsertMessageBigqueryFn } from '@codebuff/common/types/contracts/bigquery' @@ -27,11 +32,26 @@ const DEEPSEEK_BASE_URL = 'https://api.deepseek.com' // a long time to start streaming. const DEEPSEEK_HEADERS_TIMEOUT_MS = 30 * 60 * 1000 +// Tighter TTFB timeout for the non-reasoning Flash model so that when DeepSeek +// is unreachable we surface a failure within a minute instead of holding the +// connection open for half an hour. This feeds the circuit breaker which then +// fails over to Fireworks for subsequent requests. +const DEEPSEEK_FLASH_HEADERS_TIMEOUT_MS = 60 * 1000 + const deepseekAgent = new Agent({ headersTimeout: DEEPSEEK_HEADERS_TIMEOUT_MS, bodyTimeout: 0, }) +const deepseekFlashAgent = new Agent({ + headersTimeout: DEEPSEEK_FLASH_HEADERS_TIMEOUT_MS, + bodyTimeout: 0, +}) + +function getDeepSeekDispatcher(model: string): Agent { + return isDeepSeekV4FlashModel(model) ? deepseekFlashAgent : deepseekAgent +} + // DeepSeek per-token pricing (dollars per token) interface DeepSeekPricing { inputCostPerToken: number @@ -82,6 +102,38 @@ function isDeepSeekV4FlashModel(model: string): boolean { ) } +/** Wraps createDeepSeekRequest so that transient outages (network, timeout, + * 5xx, 408, 429) feed the circuit breaker and a healthy response resets it. Other 4xx + * request errors (bad input, auth) are NOT counted as outages. */ +async function createDeepSeekRequestTracked(params: { + body: ChatCompletionRequestBody + originalModel: string + fetch: typeof globalThis.fetch +}): Promise<Response> { + let response: Response + try { + response = await createDeepSeekRequest(params) + } catch (error) { + if (isLikelyDeepSeekOutage(error)) { + recordDeepSeekFailure({ + model: params.originalModel, + reason: error instanceof Error ? 
error.message : String(error), + }) + } + throw error + } + if (isLikelyDeepSeekOutage(undefined, response.status)) { + recordDeepSeekFailure({ + model: params.originalModel, + reason: `HTTP ${response.status}`, + statusCode: response.status, + }) + } else if (response.ok) { + recordDeepSeekSuccess() + } + return response +} + function getDeepSeekPricing(model: string): DeepSeekPricing { const entry = DEEPSEEK_MODELS[model] if (!entry) { @@ -131,7 +183,7 @@ export function createDeepSeekRequest(params: { }, body: JSON.stringify(deepseekBody), // @ts-expect-error - dispatcher is a valid undici option not in fetch types - dispatcher: deepseekAgent, + dispatcher: getDeepSeekDispatcher(originalModel), }) } @@ -206,7 +258,11 @@ export async function handleDeepSeekNonStream({ }) const auditRequest = createRequestAuditRecord(body) - const response = await createDeepSeekRequest({ body, originalModel, fetch }) + const response = await createDeepSeekRequestTracked({ + body, + originalModel, + fetch, + }) if (!response.ok) { throw await parseDeepSeekError(response) @@ -291,7 +347,11 @@ export async function handleDeepSeekStream({ const auditRequest = createRequestAuditRecord(body) const skipDisconnectedBilling = isDeepSeekV4FlashModel(body.model) - const response = await createDeepSeekRequest({ body, originalModel, fetch }) + const response = await createDeepSeekRequestTracked({ + body, + originalModel, + fetch, + }) if (!response.ok) { throw await parseDeepSeekError(response) diff --git a/web/src/llm-api/fireworks.ts b/web/src/llm-api/fireworks.ts index 2bd9cbe79..7ae2ec262 100644 --- a/web/src/llm-api/fireworks.ts +++ b/web/src/llm-api/fireworks.ts @@ -38,6 +38,10 @@ const FIREWORKS_MODEL_MAP: Record = { 'minimax/minimax-m2.7': 'accounts/fireworks/models/minimax-m2p7', 'moonshotai/kimi-k2.6': 'accounts/fireworks/models/kimi-k2p6', 'z-ai/glm-5.1': 'accounts/fireworks/models/glm-5p1', + // Fallback target when the official DeepSeek API is unhealthy. 
Routing is + // gated by the DeepSeek circuit breaker in deepseek-health.ts — when the + // circuit is closed, requests still go to the official API. + 'deepseek/deepseek-v4-flash': 'accounts/fireworks/models/deepseek-v4-flash', } /** Models that stay limited to freebuff deployment hours even on serverless. */ @@ -191,6 +195,11 @@ const FIREWORKS_PRICING_MAP: Record = { cachedInputCostPerToken: 0.26 / 1_000_000, outputCostPerToken: 4.4 / 1_000_000, }, + 'deepseek/deepseek-v4-flash': { + inputCostPerToken: 0.14 / 1_000_000, + cachedInputCostPerToken: 0.03 / 1_000_000, + outputCostPerToken: 0.28 / 1_000_000, + }, } function getFireworksPricing(model: string): FireworksPricing {