diff --git a/docs/architecture/core-decomposition.md b/docs/architecture/core-decomposition.md index 95cb3e5f9..f4d9197d7 100644 --- a/docs/architecture/core-decomposition.md +++ b/docs/architecture/core-decomposition.md @@ -298,8 +298,11 @@ owner 边界,否则不要把一个 feature group 继续拆成更小的 crate concrete scheduler/session restore/terminal adapter、workspace-root source、 persistence/workspace service reads、remote-SSH runtime、agent registry/scheduler 等仍必须 另起 port/provider 设计和等价评审;HR3 进一步把这些仍 core-owned 的 - service/agent runtime 绑定入口集中到 `src/crates/core/src/service_agent_runtime.rs`, - 不改变 remote-connect、remote-SSH 或 scheduler 执行路径。 + service/agent runtime 绑定入口集中到 `src/crates/core/src/service_agent_runtime.rs`: + remote dialog/cancel/file/tracker host adapter、remote model catalog/session-model + selection adapter、remote chat history persistence/message conversion adapter、 + remote image-context conversion 和 coordinator runtime-port binding 均由该入口承载,不改变 remote-connect、 + remote-SSH 或 scheduler 执行路径。 最新 main 的 `/goal` 模式、goal verification events、request-context section policy、 prompt compression/cache、workspace related-path prompt facts、subagent cancellation 与 running-turn/continuation 语义必须纳入 HR-C 保护;这些不是普通 DTO 移动项。 diff --git a/docs/plans/core-decomposition-plan.md b/docs/plans/core-decomposition-plan.md index f2c475b8c..1f8b83c47 100644 --- a/docs/plans/core-decomposition-plan.md +++ b/docs/plans/core-decomposition-plan.md @@ -1859,10 +1859,14 @@ source、persistence/workspace service reads 与 `ImageContextData` concrete imp **HR3 closure update(2026-05-21):** HR3 now also centralizes the still core-owned service/agent runtime bindings in -`src/crates/core/src/service_agent_runtime.rs`: remote dialog host factory, -remote image-context conversion, and `ConversationCoordinator` runtime-port -adapter binding. This does not move remote-SSH runtime, workspace-root source, -persistence/workspace service reads, concrete scheduler/session restore/terminal adapters, +`src/crates/core/src/service_agent_runtime.rs`: remote dialog/cancel/file/tracker +host adapters, remote model catalog/session-model selection config adapters, +remote chat history persistence/message conversion adapters, remote image-context +conversion, core-owned workspace-root lookup helper, and `ConversationCoordinator` +runtime-port adapter binding. The model/history adapter moves only centralize existing +core reads and persisted-turn-to-mobile-message conversion for remote catalog/session +selection and chat history; they do not move remote-SSH runtime, workspace-root source +ownership, broad persistence/workspace service ownership, concrete scheduler/session restore/terminal adapters, `ImageContextData` concrete ownership, or agent registry/scheduler behavior out of core; those remain high-risk owner topics requiring a separate port/provider design and equivalence tests before any deeper migration. @@ -1892,7 +1896,7 @@ owner-by-owner 迁移队列,不得把它们当作 H4 漏项补进当前 PR。 |---|---|---|---|---| | HR1:tool runtime deep owner migration | 条件性高风险 PR | 当前已完成 core 内部 `product_runtime.rs` 单一 owner 收口;后续若继续深迁移,只允许在单独评审后移动 `ToolUseContext`、runtime manifest execution owner、`GetToolSpecTool` Tool impl、collapsed unlock state、snapshot wrapper implementation 或 concrete tools,也可以明确继续 core-owned | MiniApp/function-agent runtime、remote-connect / remote-SSH runtime、default feature 或构建收益声明 | 先补 port/provider 设计;证明 builtin/readonly/dynamic manifest、expanded/collapsed exposure、unlock state、dynamic provider metadata、snapshot wrapping、runtime restrictions、cancellation 和 Deep Review tool flow 等价 | | HR2:product-domain runtime deep owner migration | 条件性高风险 PR | 当前已完成 core 内部 `product_domain_runtime.rs` 单一 owner 收口;MiniApp filesystem IO、worker process execution、host dispatch、built-in asset include / seed / marker IO / recompile,以及 function-agent Git/AI service adapter / AI client call 继续显式 core-owned;后续若继续深迁移,只允许在单独评审后移动一个 owner 主题 | tool runtime、remote-connect / remote-SSH runtime、CLI/Desktop/Remote/ACP surface 行为变更 | `product-domains` 不依赖 core;PathManager、process execution、permission policy、Git/AI error/transport mapping 和 focused MiniApp/function-agent regression 已作为后续迁移门禁 | -| HR3:service / agent runtime deep owner migration | 条件性高风险 PR | 当前已完成 core 内部 `service_agent_runtime.rs` 单一 owner 收口;remote-SSH manager / remote FS / terminal、remote-connect workspace-root source / persistence/workspace service reads / `ImageContextData` concrete impl / concrete scheduler-session-restore-terminal adapter、agent registry / scheduler 继续显式 core-owned;后续若继续深迁移,只允许在单独评审后移动一个 owner 主题 | tool runtime、product-domain runtime、feature matrix 或产品逻辑变更 | 有 port/provider 设计、旧路径兼容、mode-scoped subagent visibility / background delivery / remote-connect dialog order / remote workspace guard / DeepResearch post-turn hook 等行为等价测试 | +| HR3:service / agent runtime deep owner migration | 条件性高风险 PR | 当前已完成 core 内部 `service_agent_runtime.rs` 单一 owner 收口;remote dialog/cancel/file/tracker adapter、remote model/session selection adapter 与 remote chat history persistence/message conversion adapter 已集中。remote-SSH manager / remote FS / terminal、remote-connect workspace-root source / broad persistence/workspace service ownership / `ImageContextData` concrete impl / concrete scheduler-session-restore-terminal adapter、agent registry / scheduler 继续显式 core-owned;后续若继续深迁移,只允许在单独评审后移动一个 owner 主题 | tool runtime、product-domain runtime、feature matrix 或产品逻辑变更 | 有 port/provider 设计、旧路径兼容、mode-scoped subagent visibility / background delivery / remote-connect dialog order / remote chat history shape / remote workspace guard / DeepResearch post-turn hook 等行为等价测试 | | H5:feature/build-benefit evaluation | 可选评估 PR | `bitfun-core default = []`、per-product explicit feature set、依赖版本收敛、构建收益数据记录 | 任何 runtime owner 迁移、产品逻辑变更或构建脚本改造 | 有 feature graph baseline、`cargo check -p bitfun-core`、workspace check、目标 crate check 和必要 product check 的前后数据;若收益不清晰则不执行 | **H5 start(2026-05-21):** 本轮先完成 feature graph baseline 的第一道编译门禁: @@ -2017,7 +2021,8 @@ HR2:product-domain runtime deep owner migration 的主要风险和控制点: HR3:service / agent runtime deep owner migration 的主要风险和控制点: - 当前 HR3 结论:本轮只完成 core 内部 owner closure。`CoreServiceAgentRuntime` - 集中创建 remote dialog host、remote image context adapter 和 + 集中创建 remote dialog/cancel/file/tracker host、remote model catalog/session-model + selection adapter、remote image context adapter 和 `ConversationCoordinator` 的 runtime-port binding;这让后续 service/agent runtime 深迁移有唯一审查入口,但不改变 remote-connect / remote-SSH / scheduler / registry 的实际执行路径。 @@ -2194,7 +2199,7 @@ git diff -- package.json scripts/dev.cjs scripts/desktop-tauri-build.mjs scripts 14. 已完成:remote-connect tracker / wire / pure policy owner slice:产品表面 DTO 已以 contract-only 方式进入 `bitfun-core-types`;`bitfun-services-integrations` 的 `remote-connect` feature 拥有 remote command/response wire DTO、remote model catalog DTO、poll response assembly / model catalog poll delta、remote chat/image/tool/session wire DTO、relay/bot session/submission request builder、remote image attachment/request DTO、tracker state / registry lifecycle、tracker event reduction、legacy image context fallback / preference、restore target decision、cancel decision 与 remote file transfer size/chunk/name policy;relay/bot 创建 session 通过 `AgentSubmissionPort`,取消、远程状态读取和事件事实已有 `runtime-ports` 契约。H3 进一步把远程消息提交编排、cancel-task orchestration、terminal pre-warm decision、remote workspace file IO/path helper、remote file command / response assembly、dialog/cancel/interaction response helper、workspace/session response assembly helper 与 image-context adapter contract 迁入 `bitfun-services-integrations` port/provider;concrete terminal adapter、workspace/session restore 执行、workspace-root source、persistence/workspace service reads 与 `ImageContextData` concrete impl 仍保留在 `bitfun-core` product runtime assembly。 15. 已完成:agent tools + `tool-packs` owner 化低风险闭环;tool contract / DTO、runtime restriction、path resolution、host path normalization / runtime artifact URI / remote POSIX path pure contract、allowed-list / collapsed-tool execution gate policy、portable context facts/provider、generic registry / static provider installation / dynamic provider container 已归属 `bitfun-agent-tools`,`tool-packs` 提供 feature-group scaffold 和 product provider group plan,core 保留 concrete tool materialization、product snapshot wrapper adapter、`ToolUseContext` 和 concrete tool implementation,后续外移需单独 service port/provider 设计。 16. 已完成:关键语义回归 baseline,不移动 runtime owner。覆盖 MCP config failure / catalog invalidation / 既有 list-changed helper / dynamic manifest、tool manifest / `GetToolSpec`、product-domains adapter equivalence、remote workspace search fallback 的 focused tests 或 snapshots。 -17. 已完成:remote-connect runtime 当前批次收口与 HR3 core owner 收口。已基于当前 port baseline 记录 remote command/response、remote model catalog、poll response、model catalog delta、session restore、active turn、cancel、image context、tracker event、queue/event fanout、workspace/session response shape 与 dialog orchestration 顺序的输入输出和验证命令;tracker state / registry lifecycle、legacy image context fallback / preference、restore target decision、cancel decision、cancel-task orchestration、remote file transfer size/chunk/name policy、remote workspace file IO/path helper、remote file command / response assembly、dialog/cancel/interaction response helper、workspace/session response assembly helper、image-context adapter contract 与 RemoteRelay/Bot dialog submission orchestration 已迁入 `bitfun-services-integrations`。HR3 进一步用 `CoreServiceAgentRuntime` 集中 dispatcher compatibility wrapper 所需的 remote dialog/cancel/file host、remote image context conversion 和 `ConversationCoordinator` runtime-port binding;product execution、workspace-root source、persistence/workspace service reads、`ImageContextData` concrete impl、concrete terminal pre-warm adapter、workspace/session restore 执行、remote-SSH runtime 与 agent registry/scheduler 显式保留在 core-owned runtime;后续只有在另起 port/provider 设计且 focused regression 继续通过时才允许继续移动这些 runtime owner,不能把 generic attachment guard 当作已接入多模态行为。 +17. 已完成:remote-connect runtime 当前批次收口与 HR3 core owner 收口。已基于当前 port baseline 记录 remote command/response、remote model catalog、poll response、model catalog delta、session restore、active turn、cancel、image context、tracker event、queue/event fanout、workspace/session response shape 与 dialog orchestration 顺序的输入输出和验证命令;tracker state / registry lifecycle、legacy image context fallback / preference、restore target decision、cancel decision、cancel-task orchestration、remote file transfer size/chunk/name policy、remote workspace file IO/path helper、remote file command / response assembly、dialog/cancel/interaction response helper、workspace/session response assembly helper、image-context adapter contract 与 RemoteRelay/Bot dialog submission orchestration 已迁入 `bitfun-services-integrations`。HR3 进一步用 `CoreServiceAgentRuntime` 集中 dispatcher compatibility wrapper 所需的 remote dialog/cancel/file/tracker host、remote model catalog/session-model selection config adapter、remote chat history persistence/message conversion adapter、remote image context conversion 和 `ConversationCoordinator` runtime-port binding;product execution、workspace-root source、broad persistence/workspace service ownership、`ImageContextData` concrete impl、concrete terminal pre-warm adapter、workspace/session restore 执行、remote-SSH runtime 与 agent registry/scheduler 显式保留在 core-owned runtime;后续只有在另起 port/provider 设计且 focused regression 继续通过时才允许继续移动这些 runtime owner,不能把 generic attachment guard 当作已接入多模态行为。 18. 已完成:`product-domains` runtime port/facade closure 与 HR2/HR-B core owner 收口。已迁入 MiniApp storage-backed runtime-state facade、MiniApp create/update/draft/apply/import pure state transitions、imported meta identity/timestamp helper、built-in seed plan / marker wire helper / seed meta timestamp policy 与 function-agent Git/AI port-backed runtime facade,并补充 focused contract tests;core 只对 MiniApp deps/restart/recompile/sync/rollback/import 的状态持久化委托 facade,仍保留 `PathManager` 注入、filesystem/source IO、worker process execution、host dispatch 执行、built-in asset seeding/source-hash lookup。Git commit-message 与 Startchat work-state 产品路径已通过 core-owned Git/AI adapter 接入 function-agent facade;H2 已将 function-agent prompt template、AI response JSON extraction、JSON repair、domain error mapping 与 JSON-to-domain parsing policy 迁入 product-domain,HR2 进一步用 `CoreProductDomainRuntime` 集中 core-owned MiniApp/function-agent runtime 绑定,core 继续保留 Git/AI service adapter、AI client 调用、provider acquisition 与 AI transport error mapping。Startchat 接线已用 no-HEAD diff fallback、非 Git 目录空状态和 `analyze_git=false` time-info 保护旧行为,`analyzed_at` 仍由 core 在 AI 分析完成后赋值。 19. 已合入:tool runtime owner 迁移前置基线。纯 helper 已从 runtime owner 中剥离到 `bitfun-agent-tools`:`StaticToolProviderGroup`、registry snapshot 到 manifest policy input、generic collapsed exposure 查询、`GetToolSpec` collapsed-load 纯收集规则、prompt-visible manifest definition 组装规则和 `GetToolSpec` catalog/detail helper;完整 collapsed 工具清单、runtime context 传递和 portable facts 边界回归已作为迁移前基线。 20. 已合入:core-owned tool runtime assembly closure。`ProductToolRuntime` 作为 core 内部单一 owner 收敛 static provider 安装和 product snapshot wrapper adapter,并保持 legacy `create_tool_registry()`、global registry、dynamic MCP tools、manifest resolver、`GetToolSpecTool` 执行、`ToolUseContext` 和 concrete tools 的行为边界不变。 diff --git a/scripts/check-core-boundaries.mjs b/scripts/check-core-boundaries.mjs index dc4aac192..0d311770f 100644 --- a/scripts/check-core-boundaries.mjs +++ b/scripts/check-core-boundaries.mjs @@ -1183,6 +1183,61 @@ const forbiddenContentRules = [ { path: 'src/crates/core/src/service/remote_connect/remote_server.rs', patterns: [ + { + regex: /\bpub\(crate\) struct CoreRemoteDialogRuntimeHost\b/, + message: + 'remote_server must not own concrete remote dialog runtime host; keep it in service_agent_runtime', + }, + { + regex: /\bpub\(crate\) struct CoreRemoteCancelRuntimeHost\b/, + message: + 'remote_server must not own concrete remote cancel runtime host; keep it in service_agent_runtime', + }, + { + regex: /\bpub\(crate\) struct CoreRemoteWorkspaceFileRuntimeHost\b/, + message: + 'remote_server must not own concrete remote workspace file runtime host; keep it in service_agent_runtime', + }, + { + regex: /\bstruct CoreRemoteSessionTrackerHost\b/, + message: + 'remote_server must not own concrete remote tracker host; keep it in service_agent_runtime', + }, + { + regex: /\basync fn resolve_session_model_id\b/, + message: + 'remote_server must not own remote session model resolution; keep it in service_agent_runtime', + }, + { + regex: /\basync fn load_remote_model_catalog\b/, + message: + 'remote_server must not own remote model catalog loading; keep it in service_agent_runtime', + }, + { + regex: /\bget_global_config_service\b/, + message: + 'remote_server must not own remote model config access; route it through service_agent_runtime', + }, + { + regex: /\bfn compress_data_url_for_mobile\b/, + message: + 'remote_server must not own remote chat thumbnail compression; keep it in service_agent_runtime', + }, + { + regex: /\bfn turns_to_chat_messages\b/, + message: + 'remote_server must not own persisted turn to remote chat conversion; keep it in service_agent_runtime', + }, + { + regex: /\basync fn load_chat_messages_from_conversation_persistence\b/, + message: + 'remote_server must not own remote chat history persistence loading; keep it in service_agent_runtime', + }, + { + regex: /\bfn strip_user_input_tags\b/, + message: + 'remote_server must not own remote user input display cleanup; keep it in service_agent_runtime', + }, { regex: /\bpub struct ImageAttachment\b/, message: 'core remote-connect server must not redefine image attachment wire DTOs; use the integrations contract', @@ -2237,6 +2292,38 @@ const requiredContentRules = [ regex: /\bfn remote_image_context\b/, message: 'missing remote image context owner adapter', }, + { + regex: /\bfn load_remote_model_catalog\b/, + message: 'missing remote model catalog owner adapter', + }, + { + regex: /\bfn update_remote_session_model\b/, + message: 'missing remote session model update owner adapter', + }, + { + regex: /\bfn normalize_remote_session_model_id\b/, + message: 'missing remote session model id normalization regression hook', + }, + { + regex: /\bfn normalize_remote_model_selection\b/, + message: 'missing remote model selection normalization regression hook', + }, + { + regex: /\bfn remote_chat_messages_from_turns\b/, + message: 'missing remote chat history conversion owner adapter', + }, + { + regex: /\bfn strip_remote_user_input_tags\b/, + message: 'missing remote user input display cleanup owner adapter', + }, + { + regex: /\bfn compress_remote_chat_data_url_for_mobile\b/, + message: 'missing remote chat thumbnail compression owner adapter', + }, + { + regex: /\bfn load_remote_chat_messages\b/, + message: 'missing remote chat history persistence owner adapter', + }, { regex: /\bfn agent_submission_port\b/, message: 'missing agent submission port owner binding', @@ -2257,10 +2344,34 @@ const requiredContentRules = [ regex: /\bCoreRemoteCancelRuntimeHost\b/, message: 'missing core remote cancel host binding', }, + { + regex: /\bCoreRemoteWorkspaceFileRuntimeHost\b/, + message: 'missing core remote workspace file host binding', + }, + { + regex: /\bCoreRemoteSessionTrackerHost\b/, + message: 'missing core remote session tracker host binding', + }, { regex: /\bRemoteExecutionDispatcher\b/, message: 'missing remote execution dispatcher binding', }, + { + regex: /\bimpl RemoteDialogRuntimeHost for CoreRemoteDialogRuntimeHost\b/, + message: 'missing remote dialog host adapter implementation in runtime owner', + }, + { + regex: /\bimpl RemoteCancelRuntimeHost for CoreRemoteCancelRuntimeHost\b/, + message: 'missing remote cancel host adapter implementation in runtime owner', + }, + { + regex: /\bimpl RemoteWorkspaceFileRuntimeHost for CoreRemoteWorkspaceFileRuntimeHost\b/, + message: 'missing remote workspace file host adapter implementation in runtime owner', + }, + { + regex: /\bimpl RemoteSessionTrackerHost for CoreRemoteSessionTrackerHost\b/, + message: 'missing remote tracker host adapter implementation in runtime owner', + }, { regex: /\bImageContextData\b/, message: 'missing core image context binding', @@ -2289,6 +2400,22 @@ const requiredContentRules = [ regex: /\bcore_service_agent_runtime_owner_keeps_coordinator_port_contracts\b/, message: 'missing coordinator runtime port contract regression', }, + { + regex: /\bcore_service_agent_runtime_owner_normalizes_remote_session_model_ids\b/, + message: 'missing remote session model id normalization regression', + }, + { + regex: /\bcore_service_agent_runtime_owner_normalizes_remote_model_selection_aliases\b/, + message: 'missing remote model selection alias regression', + }, + { + regex: /\bcore_service_agent_runtime_owner_preserves_remote_chat_history_shape\b/, + message: 'missing remote chat history conversion regression', + }, + { + regex: /\bcore_service_agent_runtime_owner_skips_in_progress_remote_assistant_history\b/, + message: 'missing in-progress remote assistant history regression', + }, ], }, { @@ -2653,30 +2780,6 @@ const requiredContentRules = [ regex: /\bCoreServiceAgentRuntime\b/, message: 'missing core service/agent runtime owner routing', }, - { - regex: /\bstruct CoreRemoteDialogRuntimeHost\b/, - message: 'missing core remote dialog runtime adapter', - }, - { - regex: /\bstruct CoreRemoteCancelRuntimeHost\b/, - message: 'missing core remote cancel runtime adapter', - }, - { - regex: /\bstruct CoreRemoteWorkspaceFileRuntimeHost\b/, - message: 'missing core remote file runtime adapter', - }, - { - regex: /\bimpl RemoteDialogRuntimeHost for CoreRemoteDialogRuntimeHost\b/, - message: 'missing integrations dialog host adapter implementation', - }, - { - regex: /\bimpl RemoteCancelRuntimeHost for CoreRemoteCancelRuntimeHost\b/, - message: 'missing integrations cancel host adapter implementation', - }, - { - regex: /\bimpl RemoteWorkspaceFileRuntimeHost for CoreRemoteWorkspaceFileRuntimeHost\b/, - message: 'missing integrations file host adapter implementation', - }, { regex: /\bsubmit_remote_dialog\b/, message: 'missing remote dialog owner orchestration delegation', @@ -5570,11 +5673,21 @@ function runManifestParserSelfTest() { 'remote_dialog_host', 'remote_cancel_host', 'remote_image_context', + 'load_remote_model_catalog', + 'update_remote_session_model', + 'normalize_remote_session_model_id', + 'normalize_remote_model_selection', + 'remote_chat_messages_from_turns', + 'strip_remote_user_input_tags', + 'compress_remote_chat_data_url_for_mobile', + 'load_remote_chat_messages', 'agent_submission_port', 'agent_turn_cancellation_port', 'remote_control_state_port', 'CoreRemoteDialogRuntimeHost', 'CoreRemoteCancelRuntimeHost', + 'CoreRemoteWorkspaceFileRuntimeHost', + 'CoreRemoteSessionTrackerHost', 'RemoteExecutionDispatcher', 'ImageContextData', 'RemoteImageContextAdapter', @@ -5583,6 +5696,10 @@ function runManifestParserSelfTest() { 'RemoteControlStatePort', 'SessionTranscriptReader', 'core_service_agent_runtime_owner_keeps_coordinator_port_contracts', + 'core_service_agent_runtime_owner_normalizes_remote_session_model_ids', + 'core_service_agent_runtime_owner_normalizes_remote_model_selection_aliases', + 'core_service_agent_runtime_owner_preserves_remote_chat_history_shape', + 'core_service_agent_runtime_owner_skips_in_progress_remote_assistant_history', ], }, { diff --git a/src/crates/core/AGENTS.md b/src/crates/core/AGENTS.md index c3b04edb6..317d2da57 100644 --- a/src/crates/core/AGENTS.md +++ b/src/crates/core/AGENTS.md @@ -92,7 +92,9 @@ SessionManager → Session → DialogTurn → ModelRound workspace-root source selection, persistence/workspace service reads, concrete scheduler/session restore, terminal pre-warm adapters, and product execution core-owned until a reviewed migration proves equivalence. Core - service/agent runtime bindings are centralized in + remote dialog/cancel/file/tracker adapters, remote model catalog/session-model + selection adapters, remote chat history persistence/message conversion + adapters, and service/agent runtime bindings are centralized in `src/crates/core/src/service_agent_runtime.rs`. - Keep concrete remote SSH runtime code behind `ssh-remote`. No-default builds may keep workspace identity helpers and explicit unsupported stubs, but must diff --git a/src/crates/core/src/service/remote_connect/remote_server.rs b/src/crates/core/src/service/remote_connect/remote_server.rs index 688b7dfbe..52ac239f6 100644 --- a/src/crates/core/src/service/remote_connect/remote_server.rs +++ b/src/crates/core/src/service/remote_connect/remote_server.rs @@ -8,27 +8,14 @@ //! for state changes using the `PollSession` command, receiving only //! incremental updates (new messages + current active turn snapshot). -use crate::service_agent_runtime::CoreServiceAgentRuntime; -use anyhow::{Result, anyhow}; +use crate::service_agent_runtime::{CoreRemoteSessionTrackerHost, CoreServiceAgentRuntime}; +use anyhow::{anyhow, Result}; use log::{debug, error, info}; use serde_json::Value; use std::sync::{Arc, OnceLock}; use super::encryption; -pub use bitfun_services_integrations::remote_connect::{ - ActiveTurnSnapshot, AssistantEntry, ChatImageAttachment, ChatMessage, ChatMessageItem, - ImageAttachment, RecentWorkspaceEntry, RemoteCommand, RemoteDefaultModelsConfig, - RemoteModelCatalog, RemoteModelConfig, RemoteResponse, RemoteSessionStateTracker, - RemoteToolStatus, SessionInfo, TrackerEvent, -}; use bitfun_services_integrations::remote_connect::{ - RemoteAssistantWorkspaceFacts, RemoteCancelRuntimeHost, RemoteCancelTaskRequest, - RemoteConnectSubmissionSource, RemoteDialogQueuePriority, RemoteDialogResolvedSubmission, - RemoteDialogRuntimeHost, RemoteDialogSubmissionPolicy, RemoteDialogSubmissionRequest, - RemoteDialogSubmitOutcome, RemoteImageContext, RemoteRecentWorkspaceFacts, - RemoteSessionMetadata, RemoteSessionTrackerHost, RemoteSessionTrackerRegistry, - RemoteTerminalPrewarmRequest, RemoteWorkspaceFacts, RemoteWorkspaceFileRuntimeHost, - RemoteWorkspaceKind as RemoteConnectWorkspaceKind, RemoteWorkspaceUpdate, build_remote_image_contexts, cancel_remote_task, handle_remote_workspace_file_command, remote_answer_question_response, remote_assistant_list_response, remote_assistant_updated_response, remote_dialog_submit_response, remote_initial_sync_response, @@ -38,13 +25,18 @@ use bitfun_services_integrations::remote_connect::{ remote_session_created_response, remote_session_deleted_response, remote_session_list_response, remote_session_model_updated_response, remote_snapshot_poll_response, remote_task_cancel_response, remote_workspace_info_response, remote_workspace_updated_response, - resolve_remote_execution_image_contexts, submit_remote_dialog, + resolve_remote_execution_image_contexts, submit_remote_dialog, RemoteAssistantWorkspaceFacts, + RemoteCancelTaskRequest, RemoteConnectSubmissionSource, RemoteDialogSubmissionPolicy, + RemoteDialogSubmissionRequest, RemoteDialogSubmitOutcome, RemoteImageContext, + RemoteRecentWorkspaceFacts, RemoteSessionMetadata, RemoteSessionTrackerRegistry, + RemoteWorkspaceFacts, RemoteWorkspaceKind as RemoteConnectWorkspaceKind, RemoteWorkspaceUpdate, +}; +pub use bitfun_services_integrations::remote_connect::{ + ActiveTurnSnapshot, AssistantEntry, ChatImageAttachment, ChatMessage, ChatMessageItem, + ImageAttachment, RecentWorkspaceEntry, RemoteCommand, RemoteDefaultModelsConfig, + RemoteModelCatalog, RemoteModelConfig, RemoteResponse, RemoteSessionStateTracker, + RemoteToolStatus, SessionInfo, TrackerEvent, }; - -fn current_workspace_path() -> Option { - crate::service::workspace::get_global_workspace_service() - .and_then(|service| service.try_get_current_workspace_path()) -} fn remote_workspace_kind( kind: crate::service::workspace::WorkspaceKind, @@ -66,452 +58,8 @@ fn git_branch_for_workspace_path(path: &std::path::Path) -> Option { }) } -async fn resolve_session_workspace_path(session_id: &str) -> Option { - use crate::agentic::coordination::get_global_coordinator; - - if let Some(coordinator) = get_global_coordinator() { - return coordinator.resolve_session_workspace_path(session_id).await; - } - - None -} - -async fn resolve_file_workspace_root(session_id: Option<&str>) -> Option { - if let Some(session_id) = session_id { - if let Some(workspace_path) = resolve_session_workspace_path(session_id).await { - return Some(workspace_path); - } - } - - current_workspace_path() -} - -async fn resolve_session_model_id(session_id: &str) -> Option { - use crate::agentic::coordination::get_global_coordinator; - - let coordinator = get_global_coordinator()?; - let session_manager = coordinator.get_session_manager(); - - let normalize = |model_id: Option| match model_id { - Some(value) => { - let trimmed = value.trim(); - if trimmed.is_empty() || trimmed == "default" { - Some("auto".to_string()) - } else { - Some(trimmed.to_string()) - } - } - None => Some("auto".to_string()), - }; - - if let Some(session) = session_manager.get_session(session_id) { - return normalize(session.config.model_id.clone()); - } - - let workspace_path = resolve_session_workspace_path(session_id).await?; - coordinator - .restore_session(&workspace_path, session_id) - .await - .ok() - .and_then(|session| normalize(session.config.model_id.clone())) -} - -async fn load_remote_model_catalog( - session_id: Option<&str>, -) -> std::result::Result { - use crate::service::config::{ - get_global_config_service, - types::{AIConfig, GlobalConfig}, - }; - - let config_service = get_global_config_service() - .await - .map_err(|e| format!("Config service not available: {e}"))?; - let global_config: GlobalConfig = config_service - .get_config(None) - .await - .map_err(|e| format!("Failed to load global config: {e}"))?; - let ai_config: AIConfig = global_config.ai; - - let models: Vec = - ai_config - .models - .into_iter() - .map(|model| { - let reasoning_mode = model.effective_reasoning_mode(); - - RemoteModelConfig { - id: model.id, - name: model.name, - provider: model.provider, - base_url: model.base_url, - model_name: model.model_name, - context_window: model.context_window, - enabled: model.enabled, - capabilities: model - .capabilities - .into_iter() - .map(|capability| { - match capability { - crate::service::config::types::ModelCapability::TextChat => "text_chat", - crate::service::config::types::ModelCapability::ImageUnderstanding => { - "image_understanding" - } - crate::service::config::types::ModelCapability::ImageGeneration => { - "image_generation" - } - crate::service::config::types::ModelCapability::Embedding => "embedding", - crate::service::config::types::ModelCapability::Search => "search", - crate::service::config::types::ModelCapability::CodeSpecialized => { - "code_specialized" - } - crate::service::config::types::ModelCapability::FunctionCalling => { - "function_calling" - } - crate::service::config::types::ModelCapability::SpeechRecognition => { - "speech_recognition" - } - } - .to_string() - }) - .collect(), - enable_thinking_process: model.enable_thinking_process, - reasoning_mode: Some( - match reasoning_mode { - crate::service::config::types::ReasoningMode::Default => "default", - crate::service::config::types::ReasoningMode::Enabled => "enabled", - crate::service::config::types::ReasoningMode::Disabled => "disabled", - crate::service::config::types::ReasoningMode::Adaptive => "adaptive", - } - .to_string(), - ), - reasoning_effort: model.reasoning_effort, - thinking_budget_tokens: model.thinking_budget_tokens, - } - }) - .collect(); - - let session_model_id = if let Some(session_id) = session_id { - resolve_session_model_id(session_id).await - } else { - None - }; - Ok(RemoteModelCatalog { - version: global_config.last_modified.timestamp_millis().max(0) as u64, - models, - default_models: RemoteDefaultModelsConfig { - primary: ai_config.default_models.primary, - fast: ai_config.default_models.fast, - search: ai_config.default_models.search, - image_understanding: ai_config.default_models.image_understanding, - image_generation: ai_config.default_models.image_generation, - speech_recognition: ai_config.default_models.speech_recognition, - }, - session_model_id, - }) -} - pub type EncryptedPayload = (String, String); -/// Compress a base64 data-URL image to a small thumbnail for mobile display. -/// Falls back to the original if decoding/compression fails or the image is -/// already within `max_bytes`. -fn compress_data_url_for_mobile(data_url: &str, max_bytes: usize) -> String { - use base64::Engine; - use base64::engine::general_purpose::STANDARD as BASE64; - use image::imageops::FilterType; - - const MAX_THUMBNAIL_DIM: u32 = 400; - - let Some(comma_pos) = data_url.find(',') else { - return data_url.to_string(); - }; - let b64_data = &data_url[comma_pos + 1..]; - - if b64_data.len() * 3 / 4 <= max_bytes { - return data_url.to_string(); - } - - let Ok(raw_bytes) = BASE64.decode(b64_data) else { - return data_url.to_string(); - }; - - let Ok(img) = image::load_from_memory(&raw_bytes) else { - return data_url.to_string(); - }; - - let resized = if img.width() > MAX_THUMBNAIL_DIM || img.height() > MAX_THUMBNAIL_DIM { - img.resize(MAX_THUMBNAIL_DIM, MAX_THUMBNAIL_DIM, FilterType::Triangle) - } else { - img - }; - - fn encode_jpeg(img: &image::DynamicImage, quality: u8) -> Option> { - let mut buf = Vec::new(); - let encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf, quality); - img.write_with_encoder(encoder).ok()?; - Some(buf) - } - - for quality in [75u8, 60, 45, 30] { - if let Some(buf) = encode_jpeg(&resized, quality) { - if buf.len() <= max_bytes || quality == 30 { - let b64 = BASE64.encode(&buf); - return format!("data:image/jpeg;base64,{b64}"); - } - } - } - - data_url.to_string() -} - -/// Max thumbnail size per image sent to mobile (100 KB). -const MOBILE_IMAGE_MAX_BYTES: usize = 100 * 1024; - -/// Convert persisted turns into mobile ChatMessages. -/// This is the same data source the desktop frontend uses. -fn turns_to_chat_messages(turns: &[crate::service::session::DialogTurnData]) -> Vec { - use crate::service::session::TurnStatus; - - let mut result = Vec::new(); - - for turn in turns { - if !turn.kind.is_model_visible() { - continue; - } - - let images = turn - .user_message - .metadata - .as_ref() - .and_then(|m| m.get("images")) - .and_then(|v| v.as_array()) - .map(|arr| { - arr.iter() - .filter_map(|v| { - let name = v.get("name")?.as_str()?.to_string(); - let raw_url = v.get("data_url")?.as_str()?; - let data_url = - compress_data_url_for_mobile(raw_url, MOBILE_IMAGE_MAX_BYTES); - Some(ChatImageAttachment { name, data_url }) - }) - .collect::>() - }) - .filter(|v| !v.is_empty()); - - // Prefer original_text from metadata (pre-enhancement) for display - let display_content = turn - .user_message - .metadata - .as_ref() - .and_then(|m| m.get("original_text")) - .and_then(|v| v.as_str()) - .map(|s| s.to_string()) - .unwrap_or_else(|| strip_user_input_tags(&turn.user_message.content)); - - result.push(ChatMessage { - id: turn.user_message.id.clone(), - role: "user".to_string(), - content: display_content, - timestamp: (turn.user_message.timestamp / 1000).to_string(), - metadata: None, - tools: None, - thinking: None, - items: None, - images, - }); - - // Skip assistant message for in-progress turns. The active turn's - // content is delivered via the real-time overlay, not the historical - // list. Including an empty / partial assistant message here would - // "consume" a slot in the count-based skip cursor and prevent the - // final version from ever being delivered. - if turn.status == TurnStatus::InProgress { - continue; - } - - // Collect ordered items across all rounds, preserving interleaved order - struct OrderedEntry { - order_index: Option, - sequence: usize, - round_idx: usize, - item: ChatMessageItem, - } - let mut ordered: Vec = Vec::new(); - let mut tools_flat = Vec::new(); - let mut thinking_parts = Vec::new(); - let mut text_parts = Vec::new(); - let mut sequence = 0usize; - - for (round_idx, round) in turn.model_rounds.iter().enumerate() { - // Iterate in streaming order: thinking → text → tools. - // The model first thinks, then outputs text (which may reference - // tool calls), and finally the tools are detected and executed. - // This matches the real-time display order on the tracker. - for t in &round.thinking_items { - if t.is_subagent_item.unwrap_or(false) { - continue; - } - if !t.content.is_empty() { - thinking_parts.push(t.content.clone()); - ordered.push(OrderedEntry { - order_index: t.order_index, - sequence, - round_idx, - item: ChatMessageItem { - item_type: "thinking".to_string(), - content: Some(t.content.clone()), - tool: None, - is_subagent: None, - }, - }); - sequence += 1; - } - } - for t in &round.text_items { - if t.is_subagent_item.unwrap_or(false) { - continue; - } - if !t.content.is_empty() { - text_parts.push(t.content.clone()); - ordered.push(OrderedEntry { - order_index: t.order_index, - sequence, - round_idx, - item: ChatMessageItem { - item_type: "text".to_string(), - content: Some(t.content.clone()), - tool: None, - is_subagent: None, - }, - }); - sequence += 1; - } - } - for t in &round.tool_items { - if t.is_subagent_item.unwrap_or(false) { - continue; - } - let status_str = t.status.as_deref().unwrap_or(if t.tool_result.is_some() { - "completed" - } else { - "running" - }); - let tool_status = RemoteToolStatus { - id: t.id.clone(), - name: t.tool_name.clone(), - status: status_str.to_string(), - duration_ms: t.duration_ms, - start_ms: Some(t.start_time), - input_preview: - bitfun_services_integrations::remote_connect::make_slim_tool_params( - &t.tool_call.input, - ), - tool_input: if t.tool_name == "AskUserQuestion" - || t.tool_name == "Task" - || t.tool_name == "TodoWrite" - { - Some(t.tool_call.input.clone()) - } else { - None - }, - }; - tools_flat.push(tool_status.clone()); - ordered.push(OrderedEntry { - order_index: t.order_index, - sequence, - round_idx, - item: ChatMessageItem { - item_type: "tool".to_string(), - content: None, - tool: Some(tool_status), - is_subagent: None, - }, - }); - sequence += 1; - } - } - - // Sort by round first (rounds are strictly sequential), then by - // order_index within each round. order_index is per-round (resets - // to 0 each round), so it must NOT be compared across rounds. - ordered.sort_by(|a, b| { - let round_cmp = a.round_idx.cmp(&b.round_idx); - if round_cmp != std::cmp::Ordering::Equal { - return round_cmp; - } - match (a.order_index, b.order_index) { - (Some(a_idx), Some(b_idx)) => a_idx.cmp(&b_idx), - (Some(_), None) => std::cmp::Ordering::Less, - (None, Some(_)) => std::cmp::Ordering::Greater, - (None, None) => a.sequence.cmp(&b.sequence), - } - }); - let items: Vec = ordered.into_iter().map(|e| e.item).collect(); - - let ts = turn - .model_rounds - .last() - .map(|r| r.end_time.unwrap_or(r.start_time)) - .unwrap_or(turn.start_time); - - result.push(ChatMessage { - id: format!("{}_assistant", turn.turn_id), - role: "assistant".to_string(), - content: text_parts.join("\n\n"), - timestamp: (ts / 1000).to_string(), - metadata: None, - tools: if tools_flat.is_empty() { - None - } else { - Some(tools_flat) - }, - thinking: if thinking_parts.is_empty() { - None - } else { - Some(thinking_parts.join("\n\n")) - }, - items: if items.is_empty() { None } else { Some(items) }, - images: None, - }); - } - - result -} - -/// Load historical chat messages from the unified project session store. -/// Uses the same data source as the desktop frontend. -async fn load_chat_messages_from_conversation_persistence( - workspace_path: &std::path::Path, - session_id: &str, -) -> (Vec, bool) { - use crate::agentic::persistence::PersistenceManager; - use crate::infrastructure::PathManager; - - let Ok(pm) = PathManager::new() else { - return (vec![], false); - }; - let pm = std::sync::Arc::new(pm); - let Ok(store) = PersistenceManager::new(pm) else { - return (vec![], false); - }; - let Ok(turns) = store.load_session_turns(workspace_path, session_id).await else { - return (vec![], false); - }; - (turns_to_chat_messages(&turns), false) -} - -fn strip_user_input_tags(content: &str) -> String { - let s = crate::agentic::core::strip_prompt_markup(content); - // Extract original question from enhancer-wrapped content - if s.starts_with("User uploaded") { - if let Some(pos) = s.find("User's question:\n") { - return s[pos + "User's question:\n".len()..].trim().to_string(); - } - } - s -} - fn resolve_agent_type(mobile_type: Option<&str>) -> &'static str { bitfun_services_integrations::remote_connect::resolve_remote_agent_type(mobile_type) } @@ -538,312 +86,6 @@ fn remote_image_context_to_core( CoreServiceAgentRuntime::remote_image_context(context) } -// ── RemoteSessionStateTracker subscriber adapter ───────────────── - -#[async_trait::async_trait] -impl crate::agentic::events::EventSubscriber for Arc { - async fn on_event( - &self, - event: &crate::agentic::events::AgenticEvent, - ) -> crate::util::errors::BitFunResult<()> { - self.handle_agentic_event(event); - Ok(()) - } -} - -struct CoreRemoteSessionTrackerHost; - -impl RemoteSessionTrackerHost for CoreRemoteSessionTrackerHost { - fn subscribe_tracker(&self, session_id: &str, tracker: Arc) { - if let Some(coordinator) = crate::agentic::coordination::get_global_coordinator() { - let sub_id = format!("remote_tracker_{}", session_id); - coordinator.subscribe_internal(sub_id, tracker); - info!("Registered state tracker for session {session_id}"); - } - } - - fn unsubscribe_tracker(&self, session_id: &str) { - if let Some(coordinator) = crate::agentic::coordination::get_global_coordinator() { - let sub_id = format!("remote_tracker_{}", session_id); - coordinator.unsubscribe_internal(&sub_id); - } - } - - fn active_turn_id(&self, session_id: &str) -> Option { - let coordinator = crate::agentic::coordination::get_global_coordinator()?; - let session_mgr = coordinator.get_session_manager(); - let session = session_mgr.get_session(session_id)?; - match &session.state { - crate::agentic::core::SessionState::Processing { - current_turn_id, .. - } => { - info!( - "Seeded tracker with existing active turn {} for session {}", - current_turn_id, session_id - ); - Some(current_turn_id.clone()) - } - _ => None, - } - } -} - -pub(crate) struct CoreRemoteDialogRuntimeHost<'a> { - dispatcher: &'a RemoteExecutionDispatcher, - coordinator: Arc, - scheduler: Arc, -} - -impl<'a> CoreRemoteDialogRuntimeHost<'a> { - pub(crate) fn new( - dispatcher: &'a RemoteExecutionDispatcher, - ) -> std::result::Result { - use crate::agentic::coordination::{get_global_coordinator, get_global_scheduler}; - - let coordinator = get_global_coordinator() - .ok_or_else(|| "Desktop session system not ready".to_string())?; - let scheduler = get_global_scheduler() - .ok_or_else(|| "Dialog scheduler is not initialized".to_string())?; - - Ok(Self { - dispatcher, - coordinator, - scheduler, - }) - } -} - -pub(crate) struct CoreRemoteCancelRuntimeHost { - coordinator: Arc, -} - -pub(crate) struct CoreRemoteWorkspaceFileRuntimeHost; - -impl CoreRemoteCancelRuntimeHost { - pub(crate) fn new() -> std::result::Result { - let coordinator = crate::agentic::coordination::get_global_coordinator() - .ok_or_else(|| "Desktop session system not ready".to_string())?; - Ok(Self { coordinator }) - } -} - -impl CoreRemoteWorkspaceFileRuntimeHost { - pub(crate) fn new() -> Self { - Self - } -} - -fn core_dialog_submission_policy( - policy: RemoteDialogSubmissionPolicy, -) -> crate::agentic::coordination::DialogSubmissionPolicy { - use crate::agentic::coordination::{ - DialogQueuePriority, DialogSubmissionPolicy, DialogTriggerSource, - }; - - let trigger_source = match policy.source { - RemoteConnectSubmissionSource::Relay => DialogTriggerSource::RemoteRelay, - RemoteConnectSubmissionSource::Bot => DialogTriggerSource::Bot, - }; - let queue_priority = match policy.queue_priority { - RemoteDialogQueuePriority::Low => DialogQueuePriority::Low, - RemoteDialogQueuePriority::Normal => DialogQueuePriority::Normal, - RemoteDialogQueuePriority::High => DialogQueuePriority::High, - }; - - DialogSubmissionPolicy::new( - trigger_source, - queue_priority, - policy.skip_tool_confirmation, - ) -} - -#[async_trait::async_trait] -impl RemoteDialogRuntimeHost for CoreRemoteDialogRuntimeHost<'_> { - type ImageContext = crate::agentic::image_analysis::ImageContextData; - - fn ensure_tracker(&self, session_id: &str) { - self.dispatcher.ensure_tracker(session_id); - } - - async fn resolve_binding_workspace(&self, session_id: &str) -> Option { - self.coordinator - .resolve_session_workspace_path(session_id) - .await - .map(|path| path.to_string_lossy().into_owned()) - } - - async fn remote_session_exists(&self, session_id: &str) -> std::result::Result { - Ok(self - .coordinator - .get_session_manager() - .get_session(session_id) - .is_some()) - } - - async fn restore_remote_session( - &self, - session_id: &str, - workspace_path: &str, - ) -> std::result::Result<(), String> { - self.coordinator - .restore_session(std::path::Path::new(workspace_path), session_id) - .await - .map(|_| ()) - .map_err(|e| e.to_string()) - } - - fn prewarm_remote_terminal(&self, request: RemoteTerminalPrewarmRequest) { - use terminal_core::session::SessionSource; - use terminal_core::{TerminalApi, TerminalBindingOptions}; - - let sid = request.session_id; - let binding_workspace_for_terminal = request.binding_workspace; - tokio::spawn(async move { - let Ok(api) = TerminalApi::from_singleton() else { - return; - }; - let binding = api.session_manager().binding(); - if binding.get(&sid).is_some() { - return; - } - let workspace = binding_workspace_for_terminal; - let name = format!("Chat-{}", &sid[..8.min(sid.len())]); - match binding - .get_or_create( - &sid, - TerminalBindingOptions { - working_directory: workspace, - session_id: Some(sid.clone()), - session_name: Some(name), - env: Some( - crate::agentic::tools::implementations::bash_tool::BashTool::noninteractive_env(), - ), - source: Some(SessionSource::Agent), - ..Default::default() - }, - ) - .await - { - Ok(_) => info!("Terminal pre-warmed for remote session {sid}"), - Err(e) => debug!("Terminal pre-warm skipped for {sid}: {e}"), - } - }); - } - - fn generate_turn_id(&self) -> String { - format!("turn_{}", chrono::Utc::now().timestamp_millis()) - } - - async fn submit_dialog( - &self, - submission: RemoteDialogResolvedSubmission, - ) -> std::result::Result { - let image_payload = if submission.image_contexts.is_empty() { - None - } else { - Some(submission.image_contexts) - }; - let policy = core_dialog_submission_policy(submission.policy); - - self.scheduler - .submit( - submission.session_id, - submission.content, - None, - Some(submission.turn_id), - submission.resolved_agent_type, - submission.binding_workspace, - policy, - None, - None, - image_payload, - ) - .await - .map(|outcome| match outcome { - crate::agentic::coordination::DialogSubmitOutcome::Started { - session_id, - turn_id, - } => RemoteDialogSubmitOutcome::Started { - session_id, - turn_id, - }, - crate::agentic::coordination::DialogSubmitOutcome::Queued { - session_id, - turn_id, - } => RemoteDialogSubmitOutcome::Queued { - session_id, - turn_id, - }, - }) - } -} - -#[async_trait::async_trait] -impl RemoteWorkspaceFileRuntimeHost for CoreRemoteWorkspaceFileRuntimeHost { - async fn resolve_remote_file_workspace_root( - &self, - session_id: Option<&str>, - ) -> Option { - resolve_file_workspace_root(session_id).await - } -} - -#[async_trait::async_trait] -impl RemoteCancelRuntimeHost for CoreRemoteCancelRuntimeHost { - async fn resolve_restore_workspace(&self, session_id: &str) -> Option { - self.coordinator - .resolve_session_workspace_path(session_id) - .await - .map(|path| path.to_string_lossy().into_owned()) - } - - async fn remote_control_state( - &self, - session_id: &str, - ) -> std::result::Result, String> { - let state_port = - CoreServiceAgentRuntime::remote_control_state_port(self.coordinator.as_ref()); - state_port - .read_remote_control_state(bitfun_runtime_ports::RemoteControlStateRequest { - session_id: session_id.to_string(), - }) - .await - .map_err(|error| error.message) - } - - async fn restore_remote_session( - &self, - session_id: &str, - workspace_path: &str, - ) -> std::result::Result<(), String> { - self.coordinator - .restore_session(std::path::Path::new(workspace_path), session_id) - .await - .map(|_| ()) - .map_err(|error| error.to_string()) - } - - async fn cancel_remote_turn( - &self, - session_id: &str, - turn_id: &str, - ) -> std::result::Result<(), String> { - let cancellation_port = - CoreServiceAgentRuntime::agent_turn_cancellation_port(self.coordinator.as_ref()); - cancellation_port - .cancel_turn(bitfun_runtime_ports::AgentTurnCancellationRequest { - session_id: session_id.to_string(), - turn_id: Some(turn_id.to_string()), - source: Some(bitfun_runtime_ports::AgentSubmissionSource::RemoteRelay), - reason: None, - wait_timeout_ms: None, - }) - .await - .map(|_| ()) - .map_err(|error| error.message) - } -} - // ── RemoteExecutionDispatcher (global singleton) ──────────────────── /// Shared dispatch layer that owns the session state trackers. @@ -1124,7 +366,10 @@ impl RemoteServer { let tracker = self.ensure_tracker(session_id); let current_version = tracker.version(); - let current_model_catalog = load_remote_model_catalog(Some(session_id)).await.ok(); + let current_model_catalog = + CoreServiceAgentRuntime::load_remote_model_catalog(Some(session_id)) + .await + .ok(); let model_catalog_delta = remote_model_catalog_poll_delta(current_model_catalog, *known_model_catalog_version); @@ -1145,13 +390,15 @@ impl RemoteServer { ); } - let Some(workspace_path) = resolve_session_workspace_path(session_id).await else { + let Some(workspace_path) = + CoreServiceAgentRuntime::resolve_session_workspace_path(session_id).await + else { return RemoteResponse::Error { message: format!("Workspace path not available for session: {}", session_id), }; }; let (all_chat_msgs, _) = - load_chat_messages_from_conversation_persistence(&workspace_path, session_id).await; + CoreServiceAgentRuntime::load_remote_chat_messages(&workspace_path, session_id).await; let total_msg_count = all_chat_msgs.len(); let skip = *known_msg_count; let new_messages: Vec = all_chat_msgs.into_iter().skip(skip).collect(); @@ -1292,7 +539,7 @@ impl RemoteServer { async fn handle_session_command(&self, cmd: &RemoteCommand) -> RemoteResponse { use crate::agentic::coordination::get_global_coordinator; use bitfun_services_integrations::remote_connect::{ - RemoteConnectSubmissionSource, build_remote_session_create_request, + build_remote_session_create_request, RemoteConnectSubmissionSource, }; let coordinator = match get_global_coordinator() { @@ -1458,7 +705,9 @@ impl RemoteServer { } } RemoteCommand::GetModelCatalog { session_id } => { - match load_remote_model_catalog(session_id.as_deref()).await { + match CoreServiceAgentRuntime::load_remote_model_catalog(session_id.as_deref()) + .await + { Ok(catalog) => RemoteResponse::ModelCatalog { catalog }, Err(message) => RemoteResponse::Error { message }, } @@ -1467,85 +716,18 @@ impl RemoteServer { session_id, model_id, } => { - use crate::service::config::{get_global_config_service, types::AIConfig}; - - let requested_model_id = model_id.trim(); - if requested_model_id.is_empty() { - return RemoteResponse::Error { - message: "model_id is required".to_string(), - }; - } - - let normalized_model_id = - if matches!(requested_model_id, "auto" | "default" | "primary" | "fast") { - if requested_model_id == "default" { - "auto".to_string() - } else { - requested_model_id.to_string() - } - } else { - let Ok(config_service) = get_global_config_service().await else { - return RemoteResponse::Error { - message: "Config service not available".to_string(), - }; - }; - let ai_config: AIConfig = match config_service.get_config(Some("ai")).await - { - Ok(config) => config, - Err(e) => { - return RemoteResponse::Error { - message: format!("Failed to load AI config: {e}"), - }; - } - }; - match ai_config.resolve_model_reference(requested_model_id) { - Some(resolved) => resolved, - None => { - return RemoteResponse::Error { - message: format!( - "Unknown model selection: {requested_model_id}" - ), - }; - } - } - }; - - if coordinator - .get_session_manager() - .get_session(session_id) - .is_none() - { - let Some(workspace_path) = resolve_session_workspace_path(session_id).await - else { - return RemoteResponse::Error { - message: format!( - "Workspace path not available for session: {}", - session_id - ), - }; - }; - if let Err(e) = coordinator - .restore_session(&workspace_path, session_id) - .await - { - return RemoteResponse::Error { - message: format!("Failed to restore session: {e}"), - }; - } - } - - match coordinator - .get_session_manager() - .update_session_model_id(session_id, &normalized_model_id) - .await + match CoreServiceAgentRuntime::update_remote_session_model( + coordinator.as_ref(), + session_id, + model_id, + ) + .await { - Ok(()) => remote_session_model_updated_response( + Ok(normalized_model_id) => remote_session_model_updated_response( session_id.clone(), normalized_model_id, ), - Err(e) => RemoteResponse::Error { - message: e.to_string(), - }, + Err(message) => RemoteResponse::Error { message }, } } RemoteCommand::GetSessionMessages { @@ -1553,7 +735,9 @@ impl RemoteServer { limit: _, before_message_id: _, } => { - let Some(workspace_path) = resolve_session_workspace_path(session_id).await else { + let Some(workspace_path) = + CoreServiceAgentRuntime::resolve_session_workspace_path(session_id).await + else { return RemoteResponse::Error { message: format!( "Workspace path not available for session: {}", @@ -1562,12 +746,14 @@ impl RemoteServer { }; }; let (chat_msgs, has_more) = - load_chat_messages_from_conversation_persistence(&workspace_path, session_id) + CoreServiceAgentRuntime::load_remote_chat_messages(&workspace_path, session_id) .await; remote_messages_response(session_id.clone(), chat_msgs, has_more) } RemoteCommand::DeleteSession { session_id } => { - let Some(workspace_path) = resolve_session_workspace_path(session_id).await else { + let Some(workspace_path) = + CoreServiceAgentRuntime::resolve_session_workspace_path(session_id).await + else { return RemoteResponse::Error { message: format!( "Workspace path not available for session: {}", @@ -1730,7 +916,7 @@ mod tests { use super::*; use crate::service::remote_connect::encryption::KeyPair; use bitfun_services_integrations::remote_connect::{ - RemoteCancelDecision, remote_session_restore_target, resolve_remote_cancel_decision, + remote_session_restore_target, resolve_remote_cancel_decision, RemoteCancelDecision, }; #[test] diff --git a/src/crates/core/src/service_agent_runtime.rs b/src/crates/core/src/service_agent_runtime.rs index 3da478e28..323f3b702 100644 --- a/src/crates/core/src/service_agent_runtime.rs +++ b/src/crates/core/src/service_agent_runtime.rs @@ -6,16 +6,391 @@ //! implementations until a reviewed port/provider migration proves equivalence. use bitfun_runtime_ports::{ - AgentSubmissionPort, AgentTurnCancellationPort, RemoteControlStatePort, + AgentSubmissionPort, AgentSubmissionSource, AgentTurnCancellationPort, + AgentTurnCancellationRequest, RemoteControlStatePort, RemoteControlStateRequest, + RemoteControlStateSnapshot, }; -use bitfun_services_integrations::remote_connect::{RemoteImageContext, RemoteImageContextAdapter}; +use bitfun_services_integrations::remote_connect::{ + ChatImageAttachment, ChatMessage, ChatMessageItem, RemoteCancelRuntimeHost, + RemoteConnectSubmissionSource, RemoteDefaultModelsConfig, RemoteDialogQueuePriority, + RemoteDialogResolvedSubmission, RemoteDialogRuntimeHost, RemoteDialogSubmissionPolicy, + RemoteDialogSubmitOutcome, RemoteImageContext, RemoteImageContextAdapter, RemoteModelCatalog, + RemoteModelConfig, RemoteSessionStateTracker, RemoteSessionTrackerHost, + RemoteTerminalPrewarmRequest, RemoteToolStatus, RemoteWorkspaceFileRuntimeHost, +}; +use log::{debug, info}; +use std::sync::Arc; -use crate::agentic::coordination::ConversationCoordinator; -use crate::agentic::image_analysis::ImageContextData; -use crate::service::remote_connect::remote_server::{ - CoreRemoteCancelRuntimeHost, CoreRemoteDialogRuntimeHost, CoreRemoteWorkspaceFileRuntimeHost, - RemoteExecutionDispatcher, +use crate::agentic::coordination::{ + get_global_coordinator, get_global_scheduler, ConversationCoordinator, DialogQueuePriority, + DialogScheduler, DialogSubmissionPolicy, DialogSubmitOutcome, DialogTriggerSource, }; +use crate::agentic::image_analysis::ImageContextData; +use crate::service::remote_connect::remote_server::RemoteExecutionDispatcher; + +use crate::service::config::types::{AIConfig, GlobalConfig, ModelCapability, ReasoningMode}; +use crate::service::session::{DialogTurnData, TurnStatus}; + +/// Max thumbnail size per remote chat image sent to mobile (100 KB). +const MOBILE_IMAGE_MAX_BYTES: usize = 100 * 1024; + +fn current_workspace_path() -> Option { + crate::service::workspace::get_global_workspace_service() + .and_then(|service| service.try_get_current_workspace_path()) +} + +fn normalize_remote_session_model_id(model_id: Option) -> Option { + match model_id { + Some(value) => { + let trimmed = value.trim(); + if trimmed.is_empty() || trimmed == "default" { + Some("auto".to_string()) + } else { + Some(trimmed.to_string()) + } + } + None => Some("auto".to_string()), + } +} + +fn normalize_remote_model_selection( + requested_model_id: &str, + ai_config: Option<&AIConfig>, +) -> Result { + let requested_model_id = requested_model_id.trim(); + if requested_model_id.is_empty() { + return Err("model_id is required".to_string()); + } + + if matches!(requested_model_id, "auto" | "default" | "primary" | "fast") { + return Ok(if requested_model_id == "default" { + "auto".to_string() + } else { + requested_model_id.to_string() + }); + } + + let Some(ai_config) = ai_config else { + return Err("Config service not available".to_string()); + }; + ai_config + .resolve_model_reference(requested_model_id) + .ok_or_else(|| format!("Unknown model selection: {requested_model_id}")) +} + +fn remote_model_selection_needs_config(requested_model_id: &str) -> bool { + let requested_model_id = requested_model_id.trim(); + !requested_model_id.is_empty() + && !matches!(requested_model_id, "auto" | "default" | "primary" | "fast") +} + +/// Compress a base64 data-URL image to a small thumbnail for mobile display. +/// Falls back to the original if decoding/compression fails or the image is +/// already within `max_bytes`. +fn compress_remote_chat_data_url_for_mobile(data_url: &str, max_bytes: usize) -> String { + use base64::engine::general_purpose::STANDARD as BASE64; + use base64::Engine; + use image::imageops::FilterType; + + const MAX_THUMBNAIL_DIM: u32 = 400; + + let Some(comma_pos) = data_url.find(',') else { + return data_url.to_string(); + }; + let b64_data = &data_url[comma_pos + 1..]; + + if b64_data.len() * 3 / 4 <= max_bytes { + return data_url.to_string(); + } + + let Ok(raw_bytes) = BASE64.decode(b64_data) else { + return data_url.to_string(); + }; + + let Ok(img) = image::load_from_memory(&raw_bytes) else { + return data_url.to_string(); + }; + + let resized = if img.width() > MAX_THUMBNAIL_DIM || img.height() > MAX_THUMBNAIL_DIM { + img.resize(MAX_THUMBNAIL_DIM, MAX_THUMBNAIL_DIM, FilterType::Triangle) + } else { + img + }; + + fn encode_jpeg(img: &image::DynamicImage, quality: u8) -> Option> { + let mut buf = Vec::new(); + let encoder = image::codecs::jpeg::JpegEncoder::new_with_quality(&mut buf, quality); + img.write_with_encoder(encoder).ok()?; + Some(buf) + } + + for quality in [75u8, 60, 45, 30] { + if let Some(buf) = encode_jpeg(&resized, quality) { + if buf.len() <= max_bytes || quality == 30 { + let b64 = BASE64.encode(&buf); + return format!("data:image/jpeg;base64,{b64}"); + } + } + } + + data_url.to_string() +} + +/// Convert persisted turns into mobile ChatMessages. +/// This is the same data source the desktop frontend uses. +fn remote_chat_messages_from_turns(turns: &[DialogTurnData]) -> Vec { + let mut result = Vec::new(); + + for turn in turns { + if !turn.kind.is_model_visible() { + continue; + } + + let images = turn + .user_message + .metadata + .as_ref() + .and_then(|m| m.get("images")) + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| { + let name = v.get("name")?.as_str()?.to_string(); + let raw_url = v.get("data_url")?.as_str()?; + let data_url = compress_remote_chat_data_url_for_mobile( + raw_url, + MOBILE_IMAGE_MAX_BYTES, + ); + Some(ChatImageAttachment { name, data_url }) + }) + .collect::>() + }) + .filter(|v| !v.is_empty()); + + // Prefer original_text from metadata (pre-enhancement) for display. + let display_content = turn + .user_message + .metadata + .as_ref() + .and_then(|m| m.get("original_text")) + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or_else(|| strip_remote_user_input_tags(&turn.user_message.content)); + + result.push(ChatMessage { + id: turn.user_message.id.clone(), + role: "user".to_string(), + content: display_content, + timestamp: (turn.user_message.timestamp / 1000).to_string(), + metadata: None, + tools: None, + thinking: None, + items: None, + images, + }); + + // Skip assistant message for in-progress turns. The active turn's + // content is delivered via the real-time overlay, not the historical + // list. Including an empty or partial assistant message here would + // consume a slot in the count-based skip cursor and prevent the final + // version from ever being delivered. + if turn.status == TurnStatus::InProgress { + continue; + } + + // Collect ordered items across all rounds, preserving interleaved order. + struct OrderedEntry { + order_index: Option, + sequence: usize, + round_idx: usize, + item: ChatMessageItem, + } + + let mut ordered: Vec = Vec::new(); + let mut tools_flat = Vec::new(); + let mut thinking_parts = Vec::new(); + let mut text_parts = Vec::new(); + let mut sequence = 0usize; + + for (round_idx, round) in turn.model_rounds.iter().enumerate() { + // Iterate in streaming order: thinking, text, tools. + // The model first thinks, then outputs text, and finally the tools + // are detected/executed. This matches the real-time tracker order. + for t in &round.thinking_items { + if t.is_subagent_item.unwrap_or(false) { + continue; + } + if !t.content.is_empty() { + thinking_parts.push(t.content.clone()); + ordered.push(OrderedEntry { + order_index: t.order_index, + sequence, + round_idx, + item: ChatMessageItem { + item_type: "thinking".to_string(), + content: Some(t.content.clone()), + tool: None, + is_subagent: None, + }, + }); + sequence += 1; + } + } + for t in &round.text_items { + if t.is_subagent_item.unwrap_or(false) { + continue; + } + if !t.content.is_empty() { + text_parts.push(t.content.clone()); + ordered.push(OrderedEntry { + order_index: t.order_index, + sequence, + round_idx, + item: ChatMessageItem { + item_type: "text".to_string(), + content: Some(t.content.clone()), + tool: None, + is_subagent: None, + }, + }); + sequence += 1; + } + } + for t in &round.tool_items { + if t.is_subagent_item.unwrap_or(false) { + continue; + } + let status_str = t.status.as_deref().unwrap_or(if t.tool_result.is_some() { + "completed" + } else { + "running" + }); + let tool_status = RemoteToolStatus { + id: t.id.clone(), + name: t.tool_name.clone(), + status: status_str.to_string(), + duration_ms: t.duration_ms, + start_ms: Some(t.start_time), + input_preview: + bitfun_services_integrations::remote_connect::make_slim_tool_params( + &t.tool_call.input, + ), + tool_input: if t.tool_name == "AskUserQuestion" + || t.tool_name == "Task" + || t.tool_name == "TodoWrite" + { + Some(t.tool_call.input.clone()) + } else { + None + }, + }; + tools_flat.push(tool_status.clone()); + ordered.push(OrderedEntry { + order_index: t.order_index, + sequence, + round_idx, + item: ChatMessageItem { + item_type: "tool".to_string(), + content: None, + tool: Some(tool_status), + is_subagent: None, + }, + }); + sequence += 1; + } + } + + // Sort by round first (rounds are strictly sequential), then by + // order_index within each round. order_index is per-round, so it must + // not be compared across rounds. + ordered.sort_by(|a, b| { + let round_cmp = a.round_idx.cmp(&b.round_idx); + if round_cmp != std::cmp::Ordering::Equal { + return round_cmp; + } + match (a.order_index, b.order_index) { + (Some(a_idx), Some(b_idx)) => a_idx.cmp(&b_idx), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + (None, None) => a.sequence.cmp(&b.sequence), + } + }); + let items: Vec = ordered.into_iter().map(|e| e.item).collect(); + + let ts = turn + .model_rounds + .last() + .map(|r| r.end_time.unwrap_or(r.start_time)) + .unwrap_or(turn.start_time); + + result.push(ChatMessage { + id: format!("{}_assistant", turn.turn_id), + role: "assistant".to_string(), + content: text_parts.join("\n\n"), + timestamp: (ts / 1000).to_string(), + metadata: None, + tools: if tools_flat.is_empty() { + None + } else { + Some(tools_flat) + }, + thinking: if thinking_parts.is_empty() { + None + } else { + Some(thinking_parts.join("\n\n")) + }, + items: if items.is_empty() { None } else { Some(items) }, + images: None, + }); + } + + result +} + +fn strip_remote_user_input_tags(content: &str) -> String { + let s = crate::agentic::core::strip_prompt_markup(content); + if s.starts_with("User uploaded") { + if let Some(pos) = s.find("User's question:\n") { + return s[pos + "User's question:\n".len()..].trim().to_string(); + } + } + s +} + +async fn resolve_session_model_id(session_id: &str) -> Option { + let coordinator = get_global_coordinator()?; + let session_manager = coordinator.get_session_manager(); + + if let Some(session) = session_manager.get_session(session_id) { + return normalize_remote_session_model_id(session.config.model_id.clone()); + } + + let workspace_path = + CoreServiceAgentRuntime::resolve_session_workspace_path(session_id).await?; + coordinator + .restore_session(&workspace_path, session_id) + .await + .ok() + .and_then(|session| normalize_remote_session_model_id(session.config.model_id.clone())) +} + +fn core_dialog_submission_policy(policy: RemoteDialogSubmissionPolicy) -> DialogSubmissionPolicy { + let trigger_source = match policy.source { + RemoteConnectSubmissionSource::Relay => DialogTriggerSource::RemoteRelay, + RemoteConnectSubmissionSource::Bot => DialogTriggerSource::Bot, + }; + let queue_priority = match policy.queue_priority { + RemoteDialogQueuePriority::Low => DialogQueuePriority::Low, + RemoteDialogQueuePriority::Normal => DialogQueuePriority::Normal, + RemoteDialogQueuePriority::High => DialogQueuePriority::High, + }; + + DialogSubmissionPolicy::new( + trigger_source, + queue_priority, + policy.skip_tool_confirmation, + ) +} impl RemoteImageContextAdapter for ImageContextData { fn from_remote_image_context(context: RemoteImageContext) -> Self { @@ -32,6 +407,25 @@ impl RemoteImageContextAdapter for ImageContextData { pub(crate) struct CoreServiceAgentRuntime; impl CoreServiceAgentRuntime { + pub(crate) async fn resolve_session_workspace_path( + session_id: &str, + ) -> Option { + let coordinator = get_global_coordinator()?; + coordinator.resolve_session_workspace_path(session_id).await + } + + pub(crate) async fn resolve_remote_file_workspace_root( + session_id: Option<&str>, + ) -> Option { + if let Some(session_id) = session_id { + if let Some(workspace_path) = Self::resolve_session_workspace_path(session_id).await { + return Some(workspace_path); + } + } + + current_workspace_path() + } + pub(crate) fn remote_dialog_host( dispatcher: &RemoteExecutionDispatcher, ) -> Result, String> { @@ -50,6 +444,147 @@ impl CoreServiceAgentRuntime { ImageContextData::from_remote_image_context(context) } + pub(crate) async fn load_remote_chat_messages( + workspace_path: &std::path::Path, + session_id: &str, + ) -> (Vec, bool) { + let Ok(pm) = crate::infrastructure::PathManager::new() else { + return (vec![], false); + }; + let pm = std::sync::Arc::new(pm); + let Ok(store) = crate::agentic::persistence::PersistenceManager::new(pm) else { + return (vec![], false); + }; + let Ok(turns) = store.load_session_turns(workspace_path, session_id).await else { + return (vec![], false); + }; + (remote_chat_messages_from_turns(&turns), false) + } + + pub(crate) async fn load_remote_model_catalog( + session_id: Option<&str>, + ) -> Result { + let config_service = crate::service::config::get_global_config_service() + .await + .map_err(|e| format!("Config service not available: {e}"))?; + let global_config: GlobalConfig = config_service + .get_config(None) + .await + .map_err(|e| format!("Failed to load global config: {e}"))?; + let ai_config: AIConfig = global_config.ai; + + let models: Vec = ai_config + .models + .into_iter() + .map(|model| { + let reasoning_mode = model.effective_reasoning_mode(); + + RemoteModelConfig { + id: model.id, + name: model.name, + provider: model.provider, + base_url: model.base_url, + model_name: model.model_name, + context_window: model.context_window, + enabled: model.enabled, + capabilities: model + .capabilities + .into_iter() + .map(|capability| { + match capability { + ModelCapability::TextChat => "text_chat", + ModelCapability::ImageUnderstanding => "image_understanding", + ModelCapability::ImageGeneration => "image_generation", + ModelCapability::Embedding => "embedding", + ModelCapability::Search => "search", + ModelCapability::CodeSpecialized => "code_specialized", + ModelCapability::FunctionCalling => "function_calling", + ModelCapability::SpeechRecognition => "speech_recognition", + } + .to_string() + }) + .collect(), + enable_thinking_process: model.enable_thinking_process, + reasoning_mode: Some( + match reasoning_mode { + ReasoningMode::Default => "default", + ReasoningMode::Enabled => "enabled", + ReasoningMode::Disabled => "disabled", + ReasoningMode::Adaptive => "adaptive", + } + .to_string(), + ), + reasoning_effort: model.reasoning_effort, + thinking_budget_tokens: model.thinking_budget_tokens, + } + }) + .collect(); + + let session_model_id = if let Some(session_id) = session_id { + resolve_session_model_id(session_id).await + } else { + None + }; + Ok(RemoteModelCatalog { + version: global_config.last_modified.timestamp_millis().max(0) as u64, + models, + default_models: RemoteDefaultModelsConfig { + primary: ai_config.default_models.primary, + fast: ai_config.default_models.fast, + search: ai_config.default_models.search, + image_understanding: ai_config.default_models.image_understanding, + image_generation: ai_config.default_models.image_generation, + speech_recognition: ai_config.default_models.speech_recognition, + }, + session_model_id, + }) + } + + pub(crate) async fn update_remote_session_model( + coordinator: &ConversationCoordinator, + session_id: &str, + model_id: &str, + ) -> Result { + let ai_config = if remote_model_selection_needs_config(model_id) { + let config_service = crate::service::config::get_global_config_service() + .await + .map_err(|_| "Config service not available".to_string())?; + Some( + config_service + .get_config::(Some("ai")) + .await + .map_err(|e| format!("Failed to load AI config: {e}"))?, + ) + } else { + None + }; + let normalized_model_id = normalize_remote_model_selection(model_id, ai_config.as_ref())?; + + if coordinator + .get_session_manager() + .get_session(session_id) + .is_none() + { + let Some(workspace_path) = Self::resolve_session_workspace_path(session_id).await + else { + return Err(format!( + "Workspace path not available for session: {session_id}" + )); + }; + coordinator + .restore_session(&workspace_path, session_id) + .await + .map_err(|e| format!("Failed to restore session: {e}"))?; + } + + coordinator + .get_session_manager() + .update_session_model_id(session_id, &normalized_model_id) + .await + .map_err(|e| e.to_string())?; + Ok(normalized_model_id) + } + pub(crate) fn agent_submission_port( coordinator: &ConversationCoordinator, ) -> &(dyn AgentSubmissionPort + '_) { @@ -69,13 +604,287 @@ impl CoreServiceAgentRuntime { } } +pub(crate) struct CoreRemoteSessionTrackerHost; + +#[async_trait::async_trait] +impl crate::agentic::events::EventSubscriber for Arc { + async fn on_event( + &self, + event: &crate::agentic::events::AgenticEvent, + ) -> crate::util::errors::BitFunResult<()> { + self.handle_agentic_event(event); + Ok(()) + } +} + +impl RemoteSessionTrackerHost for CoreRemoteSessionTrackerHost { + fn subscribe_tracker(&self, session_id: &str, tracker: Arc) { + if let Some(coordinator) = get_global_coordinator() { + let sub_id = format!("remote_tracker_{}", session_id); + coordinator.subscribe_internal(sub_id, tracker); + info!("Registered state tracker for session {session_id}"); + } + } + + fn unsubscribe_tracker(&self, session_id: &str) { + if let Some(coordinator) = get_global_coordinator() { + let sub_id = format!("remote_tracker_{}", session_id); + coordinator.unsubscribe_internal(&sub_id); + } + } + + fn active_turn_id(&self, session_id: &str) -> Option { + let coordinator = get_global_coordinator()?; + let session_mgr = coordinator.get_session_manager(); + let session = session_mgr.get_session(session_id)?; + match &session.state { + crate::agentic::core::SessionState::Processing { + current_turn_id, .. + } => { + info!( + "Seeded tracker with existing active turn {} for session {}", + current_turn_id, session_id + ); + Some(current_turn_id.clone()) + } + _ => None, + } + } +} + +pub(crate) struct CoreRemoteDialogRuntimeHost<'a> { + dispatcher: &'a RemoteExecutionDispatcher, + coordinator: Arc, + scheduler: Arc, +} + +impl<'a> CoreRemoteDialogRuntimeHost<'a> { + pub(crate) fn new(dispatcher: &'a RemoteExecutionDispatcher) -> Result { + let coordinator = get_global_coordinator() + .ok_or_else(|| "Desktop session system not ready".to_string())?; + let scheduler = get_global_scheduler() + .ok_or_else(|| "Dialog scheduler is not initialized".to_string())?; + + Ok(Self { + dispatcher, + coordinator, + scheduler, + }) + } +} + +pub(crate) struct CoreRemoteCancelRuntimeHost { + coordinator: Arc, +} + +impl CoreRemoteCancelRuntimeHost { + pub(crate) fn new() -> Result { + let coordinator = get_global_coordinator() + .ok_or_else(|| "Desktop session system not ready".to_string())?; + Ok(Self { coordinator }) + } +} + +pub(crate) struct CoreRemoteWorkspaceFileRuntimeHost; + +impl CoreRemoteWorkspaceFileRuntimeHost { + pub(crate) fn new() -> Self { + Self + } +} + +#[async_trait::async_trait] +impl RemoteDialogRuntimeHost for CoreRemoteDialogRuntimeHost<'_> { + type ImageContext = ImageContextData; + + fn ensure_tracker(&self, session_id: &str) { + self.dispatcher.ensure_tracker(session_id); + } + + async fn resolve_binding_workspace(&self, session_id: &str) -> Option { + self.coordinator + .resolve_session_workspace_path(session_id) + .await + .map(|path| path.to_string_lossy().into_owned()) + } + + async fn remote_session_exists(&self, session_id: &str) -> Result { + Ok(self + .coordinator + .get_session_manager() + .get_session(session_id) + .is_some()) + } + + async fn restore_remote_session( + &self, + session_id: &str, + workspace_path: &str, + ) -> Result<(), String> { + self.coordinator + .restore_session(std::path::Path::new(workspace_path), session_id) + .await + .map(|_| ()) + .map_err(|e| e.to_string()) + } + + fn prewarm_remote_terminal(&self, request: RemoteTerminalPrewarmRequest) { + use terminal_core::session::SessionSource; + use terminal_core::{TerminalApi, TerminalBindingOptions}; + + let sid = request.session_id; + let binding_workspace_for_terminal = request.binding_workspace; + tokio::spawn(async move { + let Ok(api) = TerminalApi::from_singleton() else { + return; + }; + let binding = api.session_manager().binding(); + if binding.get(&sid).is_some() { + return; + } + let workspace = binding_workspace_for_terminal; + let name = format!("Chat-{}", &sid[..8.min(sid.len())]); + match binding + .get_or_create( + &sid, + TerminalBindingOptions { + working_directory: workspace, + session_id: Some(sid.clone()), + session_name: Some(name), + env: Some( + crate::agentic::tools::implementations::bash_tool::BashTool::noninteractive_env(), + ), + source: Some(SessionSource::Agent), + ..Default::default() + }, + ) + .await + { + Ok(_) => info!("Terminal pre-warmed for remote session {sid}"), + Err(e) => debug!("Terminal pre-warm skipped for {sid}: {e}"), + } + }); + } + + fn generate_turn_id(&self) -> String { + format!("turn_{}", chrono::Utc::now().timestamp_millis()) + } + + async fn submit_dialog( + &self, + submission: RemoteDialogResolvedSubmission, + ) -> Result { + let image_payload = if submission.image_contexts.is_empty() { + None + } else { + Some(submission.image_contexts) + }; + let policy = core_dialog_submission_policy(submission.policy); + + self.scheduler + .submit( + submission.session_id, + submission.content, + None, + Some(submission.turn_id), + submission.resolved_agent_type, + submission.binding_workspace, + policy, + None, + None, + image_payload, + ) + .await + .map(|outcome| match outcome { + DialogSubmitOutcome::Started { + session_id, + turn_id, + } => RemoteDialogSubmitOutcome::Started { + session_id, + turn_id, + }, + DialogSubmitOutcome::Queued { + session_id, + turn_id, + } => RemoteDialogSubmitOutcome::Queued { + session_id, + turn_id, + }, + }) + } +} + +#[async_trait::async_trait] +impl RemoteWorkspaceFileRuntimeHost for CoreRemoteWorkspaceFileRuntimeHost { + async fn resolve_remote_file_workspace_root( + &self, + session_id: Option<&str>, + ) -> Option { + CoreServiceAgentRuntime::resolve_remote_file_workspace_root(session_id).await + } +} + +#[async_trait::async_trait] +impl RemoteCancelRuntimeHost for CoreRemoteCancelRuntimeHost { + async fn resolve_restore_workspace(&self, session_id: &str) -> Option { + self.coordinator + .resolve_session_workspace_path(session_id) + .await + .map(|path| path.to_string_lossy().into_owned()) + } + + async fn remote_control_state( + &self, + session_id: &str, + ) -> Result, String> { + let state_port = + CoreServiceAgentRuntime::remote_control_state_port(self.coordinator.as_ref()); + state_port + .read_remote_control_state(RemoteControlStateRequest { + session_id: session_id.to_string(), + }) + .await + .map_err(|error| error.message) + } + + async fn restore_remote_session( + &self, + session_id: &str, + workspace_path: &str, + ) -> Result<(), String> { + self.coordinator + .restore_session(std::path::Path::new(workspace_path), session_id) + .await + .map(|_| ()) + .map_err(|error| error.to_string()) + } + + async fn cancel_remote_turn(&self, session_id: &str, turn_id: &str) -> Result<(), String> { + let cancellation_port = + CoreServiceAgentRuntime::agent_turn_cancellation_port(self.coordinator.as_ref()); + cancellation_port + .cancel_turn(AgentTurnCancellationRequest { + session_id: session_id.to_string(), + turn_id: Some(turn_id.to_string()), + source: Some(AgentSubmissionSource::RemoteRelay), + reason: None, + wait_timeout_ms: None, + }) + .await + .map(|_| ()) + .map_err(|error| error.message) + } +} + #[cfg(test)] mod tests { - use bitfun_runtime_ports::{ - AgentTurnCancellationPort, RemoteControlStatePort, SessionTranscriptReader, - }; + use bitfun_runtime_ports::SessionTranscriptReader; use super::*; + use crate::service::session::{ + DialogTurnData, DialogTurnKind, ModelRoundData, TextItemData, ThinkingItemData, + ToolCallData, ToolItemData, TurnStatus, UserMessageData, + }; #[test] fn core_service_agent_runtime_owner_keeps_coordinator_port_contracts() { @@ -107,4 +916,231 @@ mod tests { let _ = assert_port_accessors; } + + #[test] + fn core_service_agent_runtime_owner_maps_remote_dialog_policy() { + let relay = core_dialog_submission_policy(RemoteDialogSubmissionPolicy { + source: RemoteConnectSubmissionSource::Relay, + queue_priority: RemoteDialogQueuePriority::High, + skip_tool_confirmation: true, + }); + assert_eq!(relay.trigger_source, DialogTriggerSource::RemoteRelay); + assert_eq!(relay.queue_priority, DialogQueuePriority::High); + assert!(relay.skip_tool_confirmation); + + let bot = core_dialog_submission_policy(RemoteDialogSubmissionPolicy { + source: RemoteConnectSubmissionSource::Bot, + queue_priority: RemoteDialogQueuePriority::Low, + skip_tool_confirmation: false, + }); + assert_eq!(bot.trigger_source, DialogTriggerSource::Bot); + assert_eq!(bot.queue_priority, DialogQueuePriority::Low); + assert!(!bot.skip_tool_confirmation); + } + + #[test] + fn core_service_agent_runtime_owner_normalizes_remote_session_model_ids() { + assert_eq!( + normalize_remote_session_model_id(None), + Some("auto".to_string()) + ); + assert_eq!( + normalize_remote_session_model_id(Some("".to_string())), + Some("auto".to_string()) + ); + assert_eq!( + normalize_remote_session_model_id(Some(" default ".to_string())), + Some("auto".to_string()) + ); + assert_eq!( + normalize_remote_session_model_id(Some(" model-1 ".to_string())), + Some("model-1".to_string()) + ); + } + + #[test] + fn core_service_agent_runtime_owner_normalizes_remote_model_selection_aliases() { + assert_eq!( + normalize_remote_model_selection("auto", None).unwrap(), + "auto" + ); + assert_eq!( + normalize_remote_model_selection("default", None).unwrap(), + "auto" + ); + assert_eq!( + normalize_remote_model_selection("primary", None).unwrap(), + "primary" + ); + assert_eq!( + normalize_remote_model_selection("fast", None).unwrap(), + "fast" + ); + assert_eq!( + normalize_remote_model_selection(" ", None).unwrap_err(), + "model_id is required" + ); + } + + #[test] + fn core_service_agent_runtime_owner_preserves_remote_chat_history_shape() { + let turn = remote_history_test_turn( + TurnStatus::Completed, + Some(serde_json::json!({ + "original_text": "original question", + "images": [ + { + "name": "screenshot.png", + "data_url": "data:image/png;base64,abcd" + } + ] + })), + ); + + let messages = remote_chat_messages_from_turns(&[turn]); + + assert_eq!(messages.len(), 2); + assert_eq!(messages[0].role, "user"); + assert_eq!(messages[0].content, "original question"); + assert_eq!( + messages[0].images.as_ref().unwrap()[0].name, + "screenshot.png" + ); + + assert_eq!(messages[1].role, "assistant"); + assert_eq!(messages[1].content, "visible text"); + assert_eq!(messages[1].thinking.as_deref(), Some("visible thought")); + let items = messages[1].items.as_ref().expect("assistant items"); + assert_eq!(items.len(), 3); + assert_eq!(items[0].item_type, "thinking"); + assert_eq!(items[1].item_type, "text"); + assert_eq!(items[2].item_type, "tool"); + assert_eq!( + messages[1].tools.as_ref().unwrap()[0].name, + "AskUserQuestion" + ); + } + + #[test] + fn core_service_agent_runtime_owner_skips_in_progress_remote_assistant_history() { + let turn = remote_history_test_turn(TurnStatus::InProgress, None); + + let messages = remote_chat_messages_from_turns(&[turn]); + + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].role, "user"); + } + + #[test] + fn core_service_agent_runtime_owner_strips_enhanced_remote_user_input() { + let content = "User uploaded a file.\nUser's question:\n explain this "; + + assert_eq!(strip_remote_user_input_tags(content), "explain this"); + } + + fn remote_history_test_turn( + status: TurnStatus, + metadata: Option, + ) -> DialogTurnData { + DialogTurnData { + turn_id: "turn-1".to_string(), + turn_index: 0, + session_id: "session-1".to_string(), + timestamp: 1_000, + kind: DialogTurnKind::UserDialog, + agent_type: None, + user_message: UserMessageData { + id: "user-1".to_string(), + content: "fallback text".to_string(), + timestamp: 1_000, + metadata, + }, + model_rounds: vec![ModelRoundData { + id: "round-1".to_string(), + turn_id: "turn-1".to_string(), + round_index: 0, + timestamp: 1_100, + text_items: vec![ + TextItemData { + id: "text-hidden".to_string(), + content: "hidden text".to_string(), + is_streaming: false, + timestamp: 1_111, + is_markdown: true, + order_index: Some(1), + is_subagent_item: Some(true), + parent_task_tool_id: None, + subagent_session_id: None, + status: None, + }, + TextItemData { + id: "text-1".to_string(), + content: "visible text".to_string(), + is_streaming: false, + timestamp: 1_112, + is_markdown: true, + order_index: Some(1), + is_subagent_item: None, + parent_task_tool_id: None, + subagent_session_id: None, + status: None, + }, + ], + tool_items: vec![ToolItemData { + id: "tool-1".to_string(), + tool_name: "AskUserQuestion".to_string(), + tool_call: ToolCallData { + input: serde_json::json!({ "question": "confirm?" }), + id: "call-1".to_string(), + }, + tool_result: None, + ai_intent: None, + start_time: 1_130, + end_time: None, + duration_ms: Some(25), + queue_wait_ms: None, + preflight_ms: None, + confirmation_wait_ms: None, + execution_ms: None, + order_index: Some(2), + is_subagent_item: None, + parent_task_tool_id: None, + subagent_session_id: None, + subagent_model_id: None, + subagent_model_alias: None, + status: Some("running".to_string()), + interruption_reason: None, + }], + thinking_items: vec![ThinkingItemData { + id: "thinking-1".to_string(), + content: "visible thought".to_string(), + is_streaming: false, + is_collapsed: false, + timestamp: 1_105, + order_index: Some(0), + status: None, + is_subagent_item: None, + parent_task_tool_id: None, + subagent_session_id: None, + }], + start_time: 1_100, + end_time: Some(1_200), + duration_ms: Some(100), + provider_id: None, + model_id: None, + model_alias: None, + first_chunk_ms: None, + first_visible_output_ms: None, + stream_duration_ms: None, + attempt_count: None, + failure_category: None, + token_details: None, + status: "completed".to_string(), + }], + start_time: 1_000, + end_time: Some(1_250), + duration_ms: Some(250), + status, + } + } } diff --git a/src/crates/services-integrations/AGENTS.md b/src/crates/services-integrations/AGENTS.md index 43033c413..afb8cdf7d 100644 --- a/src/crates/services-integrations/AGENTS.md +++ b/src/crates/services-integrations/AGENTS.md @@ -24,7 +24,9 @@ slices that are outside pure product logic but still platform-neutral. persistence/workspace service reads, concrete scheduler submission, concrete session restore / terminal pre-warm adapters, and product execution remain core-owned unless a later reviewed port/provider moves them with equivalence - tests. Core bindings for these runtime adapters are centralized in + tests. Core remote dialog/cancel/file/tracker, remote model + catalog/session-model selection, and remote chat history persistence/message + conversion adapter bindings are centralized in `src/crates/core/src/service_agent_runtime.rs`. - Remote-SSH path/session identity helpers may live here; SSH channels, SFTP, remote FS, remote terminal, and manager assembly remain core-owned unless a