From f53bb57847371a3533effb5947e010e10f2c740d Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 22:29:46 +0800 Subject: [PATCH 1/2] feat(core): emit SubagentProgress for child tool/turn milestones MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `SubagentProgress` event variant exists but until now was never sent — the subagent task tracker introduced in the previous commit observed Start and End but had no mid-task signal to update from. Adds a `synthesize_subagent_progress()` helper called by the mpsc → broadcast forwarder inside `TaskExecutor::execute_with_task_id`. It translates two child-loop events into compact progress milestones on the parent broadcast: - `ToolEnd` → `status = "tool_completed"`, metadata `{ tool, exit_code, output_bytes, error_kind? }` - `TurnEnd` → `status = "turn_completed"`, metadata `{ turn, total_tokens, prompt_tokens, completion_tokens }` Noisy events (TextDelta / ToolStart / ToolOutputDelta / nested Subagent*) are intentionally not translated — consumers needing token-level detail should subscribe to the raw event stream directly. The original child events are still forwarded unchanged; progress is additive, so downstream consumers see both the synthesized milestone and the underlying event. --- core/src/agent_api/tests.rs | 51 ++++++++++ core/src/tools/task.rs | 193 +++++++++++++++++++++++++++++++++++- 2 files changed, 242 insertions(+), 2 deletions(-) diff --git a/core/src/agent_api/tests.rs b/core/src/agent_api/tests.rs index 7a9b835..34b737c 100644 --- a/core/src/agent_api/tests.rs +++ b/core/src/agent_api/tests.rs @@ -2388,6 +2388,57 @@ async fn subagent_events_populate_session_tracker() { assert_eq!(session.subagent_tasks().await.len(), 1); } +#[tokio::test] +async fn subagent_progress_events_accumulate_in_tracker() { + use super::runtime_events::RuntimeEventSink; + use crate::agent::AgentEvent; + + let agent = Agent::from_config(test_config()).await.unwrap(); + let session = agent + .session("/tmp/test-ws-subagent-progress", None) + .unwrap(); + + let run = session + .run_store + .create_run(session.session_id(), "parent prompt") + .await; + let sink = RuntimeEventSink::from_session(&session, &run.id); + + let task_id = "task-progress".to_string(); + let child_session_id = format!("task-run-{}", task_id); + + sink.observe(&AgentEvent::SubagentStart { + task_id: task_id.clone(), + session_id: child_session_id.clone(), + parent_session_id: session.session_id().to_string(), + agent: "explore".to_string(), + description: "demo".to_string(), + }) + .await; + + sink.observe(&AgentEvent::SubagentProgress { + task_id: task_id.clone(), + session_id: child_session_id.clone(), + status: "tool_completed".to_string(), + metadata: serde_json::json!({ "tool": "bash", "exit_code": 0 }), + }) + .await; + + sink.observe(&AgentEvent::SubagentProgress { + task_id: task_id.clone(), + session_id: child_session_id.clone(), + status: "turn_completed".to_string(), + metadata: serde_json::json!({ "turn": 1, "total_tokens": 50 }), + }) + .await; + + let snap = session.subagent_task(&task_id).await.unwrap(); + assert_eq!(snap.progress.len(), 2); + assert_eq!(snap.progress[0].status, "tool_completed"); + assert_eq!(snap.progress[1].status, "turn_completed"); + assert_eq!(snap.progress[1].metadata["total_tokens"], 50); +} + #[tokio::test] async fn subagent_tasks_scope_to_parent_session() { use super::runtime_events::RuntimeEventSink; diff --git a/core/src/tools/task.rs b/core/src/tools/task.rs index 1ef612a..5789784 100644 --- a/core/src/tools/task.rs +++ b/core/src/tools/task.rs @@ -92,6 +92,60 @@ fn compact_task_output(output: &str) -> (String, bool) { ) } +/// Translate selected child-loop events into a `SubagentProgress` milestone +/// for the parent broadcast. Returns `None` for events that aren't worth +/// surfacing as progress (text deltas, tool starts, subagent events from +/// nested delegation, etc.). +/// +/// Currently emits progress for: +/// - `ToolEnd` → `status = "tool_completed"`, +/// `metadata = { tool, exit_code, output_bytes, error_kind? }` +/// - `TurnEnd` → `status = "turn_completed"`, +/// `metadata = { turn, total_tokens, prompt_tokens, completion_tokens }` +fn synthesize_subagent_progress( + event: &AgentEvent, + task_id: &str, + session_id: &str, +) -> Option { + match event { + AgentEvent::ToolEnd { + name, + output, + exit_code, + error_kind, + .. + } => { + let mut metadata = serde_json::json!({ + "tool": name, + "exit_code": exit_code, + "output_bytes": output.len(), + }); + if let Some(kind) = error_kind { + metadata["error_kind"] = + serde_json::to_value(kind).unwrap_or(serde_json::Value::Null); + } + Some(AgentEvent::SubagentProgress { + task_id: task_id.to_string(), + session_id: session_id.to_string(), + status: "tool_completed".to_string(), + metadata, + }) + } + AgentEvent::TurnEnd { turn, usage } => Some(AgentEvent::SubagentProgress { + task_id: task_id.to_string(), + session_id: session_id.to_string(), + status: "turn_completed".to_string(), + metadata: serde_json::json!({ + "turn": turn, + "total_tokens": usage.total_tokens, + "prompt_tokens": usage.prompt_tokens, + "completion_tokens": usage.completion_tokens, + }), + }), + _ => None, + } +} + fn task_artifact_id(result: &TaskResult) -> String { format!("task-output:{}", result.task_id) } @@ -300,14 +354,25 @@ impl TaskExecutor { child_config, ); - // Create an mpsc channel for the child agent and forward events to broadcast + // Create an mpsc channel for the child agent and forward events to broadcast. + // Selected child events (ToolEnd, TurnEnd) are also surfaced to the parent + // broadcast as synthetic `SubagentProgress` events so dashboards can observe + // mid-task milestones without subscribing to the raw event stream. let child_event_tx = if let Some(ref broadcast_tx) = event_tx { let (mpsc_tx, mut mpsc_rx) = tokio::sync::mpsc::channel(100); let broadcast_tx_clone = broadcast_tx.clone(); + let progress_task_id = task_id.clone(); + let progress_session_id = session_id.clone(); - // Spawn a task to forward events from mpsc to broadcast tokio::spawn(async move { while let Some(event) = mpsc_rx.recv().await { + if let Some(progress) = synthesize_subagent_progress( + &event, + &progress_task_id, + &progress_session_id, + ) { + let _ = broadcast_tx_clone.send(progress); + } let _ = broadcast_tx_clone.send(event); } }); @@ -1962,10 +2027,12 @@ mod tests { let mut starts = Vec::new(); let mut ends = Vec::new(); + let mut progress_statuses: Vec = Vec::new(); while let Ok(event) = rx.try_recv() { match event { AgentEvent::SubagentStart { description, .. } => starts.push(description), AgentEvent::SubagentEnd { agent, success, .. } => ends.push((agent, success)), + AgentEvent::SubagentProgress { status, .. } => progress_statuses.push(status), _ => {} } } @@ -1976,6 +2043,17 @@ mod tests { assert!(ends .iter() .all(|(agent, success)| agent == "worker" && *success)); + // Each child loop emits at least one TurnEnd, so we expect at least + // two synthesized turn_completed progress events across the run. + assert!( + progress_statuses + .iter() + .filter(|s| s == &"turn_completed") + .count() + >= 2, + "expected at least two turn_completed progress events, got {:?}", + progress_statuses + ); } #[tokio::test] @@ -2082,4 +2160,115 @@ mod tests { ); } } + + #[test] + fn synthesize_progress_emits_tool_completed_for_tool_end() { + let event = AgentEvent::ToolEnd { + id: "call-1".to_string(), + name: "bash".to_string(), + output: "hello".to_string(), + exit_code: 0, + metadata: None, + error_kind: None, + }; + let progress = + synthesize_subagent_progress(&event, "task-1", "task-run-task-1").expect("some"); + match progress { + AgentEvent::SubagentProgress { + task_id, + session_id, + status, + metadata, + } => { + assert_eq!(task_id, "task-1"); + assert_eq!(session_id, "task-run-task-1"); + assert_eq!(status, "tool_completed"); + assert_eq!(metadata["tool"], "bash"); + assert_eq!(metadata["exit_code"], 0); + assert_eq!(metadata["output_bytes"], 5); + assert!(metadata.get("error_kind").is_none()); + } + other => panic!("expected SubagentProgress, got {:?}", other), + } + } + + #[test] + fn synthesize_progress_includes_error_kind_when_present() { + let event = AgentEvent::ToolEnd { + id: "call-2".to_string(), + name: "edit".to_string(), + output: "boom".to_string(), + exit_code: 1, + metadata: None, + error_kind: Some(crate::tools::ToolErrorKind::NotFound { + path: "missing.txt".to_string(), + }), + }; + let progress = + synthesize_subagent_progress(&event, "task-x", "task-run-task-x").expect("some"); + if let AgentEvent::SubagentProgress { metadata, .. } = progress { + assert!( + metadata.get("error_kind").is_some(), + "error_kind should propagate into metadata" + ); + } else { + panic!("expected SubagentProgress"); + } + } + + #[test] + fn synthesize_progress_emits_turn_completed_for_turn_end() { + let event = AgentEvent::TurnEnd { + turn: 3, + usage: crate::llm::TokenUsage { + prompt_tokens: 100, + completion_tokens: 25, + total_tokens: 125, + cache_read_tokens: None, + cache_write_tokens: None, + }, + }; + let progress = + synthesize_subagent_progress(&event, "task-1", "task-run-task-1").expect("some"); + if let AgentEvent::SubagentProgress { + status, metadata, .. + } = progress + { + assert_eq!(status, "turn_completed"); + assert_eq!(metadata["turn"], 3); + assert_eq!(metadata["total_tokens"], 125); + assert_eq!(metadata["prompt_tokens"], 100); + assert_eq!(metadata["completion_tokens"], 25); + } else { + panic!("expected SubagentProgress"); + } + } + + #[test] + fn synthesize_progress_ignores_unrelated_events() { + let ignored = [ + AgentEvent::TextDelta { + text: "hi".to_string(), + }, + AgentEvent::ToolStart { + id: "x".to_string(), + name: "bash".to_string(), + }, + AgentEvent::TurnStart { turn: 1 }, + AgentEvent::SubagentStart { + task_id: "nested".to_string(), + session_id: "nested-run".to_string(), + parent_session_id: "parent".to_string(), + agent: "explore".to_string(), + description: "nested".to_string(), + }, + ]; + for event in &ignored { + assert!( + synthesize_subagent_progress(event, "task", "session").is_none(), + "{:?} should not emit progress", + event + ); + } + } } From 8f0fd3a1a9925ae327d445480a7e4dbd8cf41c2d Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 22:40:56 +0800 Subject: [PATCH 2/2] feat(node-sdk): expose subagent task query API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the new Session APIs from the core crate so JS/TS callers can introspect delegated subagent tasks without parsing run_events. - `Session.subagentTask(taskId)` → snapshot or null - `Session.subagentTasks()` → snapshots from this session - `Session.pendingSubagentTasks()` → only `running` entries Regenerated index.d.ts also picks up doc-comment drift and napi-rs ordering churn that had accumulated since the last build. The hand-added type aliases (ToolErrorKind union, VerificationStatus / VerificationCheck / VerificationReport / ToolArtifact) are preserved — they aren't generatable from Rust and have to be reinserted around each rebuild. test.mjs gains a small smoke block confirming the three methods exist and return the expected empty-state shapes (array, array, null). --- sdk/node/index.d.ts | 81 +++++++++++++++++++++++++++------------------ sdk/node/src/lib.rs | 38 +++++++++++++++++++++ sdk/node/test.mjs | 14 ++++++++ 3 files changed, 101 insertions(+), 32 deletions(-) diff --git a/sdk/node/index.d.ts b/sdk/node/index.d.ts index 87208eb..d8dd9ed 100644 --- a/sdk/node/index.d.ts +++ b/sdk/node/index.d.ts @@ -73,10 +73,9 @@ export interface AgentEvent { data?: string /** * Structured discriminant for tool failures on `tool_end` events - * (JSON-encoded with a `type` field on the top level, e.g. - * `{"type":"version_conflict","path":"doc.md","expected":"etag-1","actual":"etag-2"}`). - * Undefined on success or untyped failure. Streaming consumers parse - * this to branch on the failure kind without scanning `toolOutput`. + * (JSON-encoded with a `type` field). `None` on success or untyped + * failure. Lets streaming consumers branch on the failure kind + * without scanning `tool_output`. */ errorKindJson?: string } @@ -126,7 +125,7 @@ export interface ToolResult { * Structured discriminant for tool failures, JSON-encoded with a * `type` field on the top level — e.g. * `{"type":"version_conflict","path":"doc.md","expected":"etag-1","actual":"etag-2"}`. - * Undefined on success or untyped failure. SDK callers parse it to + * `None` on success or untyped failure. SDK callers parse it to * branch on the failure kind without scanning the `output` string. */ errorKindJson?: string @@ -225,16 +224,6 @@ export interface InlineSkill { /** Markdown content for the skill. */ content: string } -export interface AutoDelegationOptions { - /** Enable runtime-driven automatic child-agent delegation. */ - enabled?: boolean - /** Allow automatic delegation to launch multiple child agents in parallel. */ - autoParallel?: boolean - /** Minimum local confidence required to auto-delegate a child task. */ - minConfidence?: number - /** Maximum number of automatic child tasks per user request. */ - maxTasks?: number -} export interface JsMemoryStore { backend: string dir?: string @@ -305,20 +294,21 @@ export interface JsS3BackendConfig { */ maxGrepBytesPerObject?: number /** - * Concurrent object downloads during `grep`. Defaults to 8 on the Rust - * side. Set lower when the gitserver / S3 endpoint rate-limits + * Concurrent object downloads during `grep`. Defaults to 8 on the + * Rust side. Set lower when the gitserver / S3 endpoint rate-limits * aggressively; set higher when latency dominates. Ignored when * `searchEnabled` is `false`. */ searchConcurrency?: number } /** - * Configuration for a `RemoteGitBackend` — an HTTP/JSON client that + * Configuration for a [`RemoteGitBackend`] — an HTTP/JSON client that * brings the `git` tool to non-local workspaces (S3, future container / * DFS). * * Pass alongside `workspaceBackend` on a session to attach remote git - * on top of any filesystem backend. + * on top of any filesystem backend. The protocol is specified in the + * repository RFC `apps/docs/content/docs/en/code/rfcs/workspace-remote-git.mdx`. */ export interface JsRemoteGitBackendConfig { /** @@ -333,14 +323,15 @@ export interface JsRemoteGitBackendConfig { repoId: string /** * Bearer token sent as `Authorization: Bearer `. Required in - * production; omitting it emits a server-side warning and is only safe + * production; omitting it emits a `tracing::warn!` and is only safe * on a trusted localhost gitserver. */ bearerToken?: string /** - * mTLS client certificate path (PEM). When set together with `clientKeyPem`, - * the backend reads both files at construction and configures mTLS on the - * HTTP client. Setting only one of the pair errors at construction. + * mTLS client certificate path (PEM). When set together with + * `clientKeyPem`, the backend reads both files at construction and + * configures mTLS on the HTTP client. Setting only one of the pair + * errors at construction. */ clientCertPem?: string /** @@ -444,12 +435,19 @@ export interface PendingConfirmation { /** Milliseconds remaining before the confirmation times out. */ remainingMs: number } -/** Retention limits for large tool/program artifacts. */ -export interface ArtifactStoreLimits { - /** Maximum number of artifacts retained by a session. */ - maxArtifacts?: number - /** Maximum total artifact content bytes retained by a session. */ - maxBytes?: number +export interface AutoDelegationOptions { + /** Enable runtime-driven automatic child-agent delegation. */ + enabled?: boolean + /** + * Allow automatic delegation to launch multiple child agents in parallel. + * + * Manual `parallel_task` calls remain available when this is false. + */ + autoParallel?: boolean + /** Minimum local confidence required to auto-delegate a child task. */ + minConfidence?: number + /** Maximum number of automatic child tasks per user request. */ + maxTasks?: number } export interface SessionOptions { /** Override the default model. Format: "provider/model" (e.g., "openai/gpt-4o"). */ @@ -668,6 +666,13 @@ export interface SessionOptions { */ maxExecutionTimeMs?: number } +/** Retention limits for large tool/program artifacts. */ +export interface ArtifactStoreLimits { + /** Maximum number of artifacts retained by a session. */ + maxArtifacts?: number + /** Maximum total artifact content bytes retained by a session. */ + maxBytes?: number +} /** A single message in conversation history. */ export interface MessageObject { role: string @@ -1174,6 +1179,18 @@ export declare class Session { currentRun(): Promise /** Return active tool calls observed for the currently running operation. */ activeTools(): Promise + /** + * Look up a delegated subagent task by id. Resolves to `null` when no + * such task has been observed in this session. + */ + subagentTask(taskId: string): Promise + /** + * Return snapshots of every delegated subagent task observed in this + * session (including completed and failed ones), oldest first. + */ + subagentTasks(): Promise + /** Return snapshots of subagent tasks still in `running` state. */ + pendingSubagentTasks(): Promise /** Cancel a specific run only if it is still the active run. */ cancelRun(runId: string): Promise /** Execute a tool by name, bypassing the LLM. */ @@ -1261,9 +1278,9 @@ export declare class Session { /** Return compact execution trace events recorded for this session. */ traceEvents(): any /** Return structured verification reports recorded for this session. */ - verificationReports(): Array + verificationReports(): any /** Add externally produced verification reports to this session. */ - recordVerificationReports(reports: Array | VerificationReport): void + recordVerificationReports(reports: any): void /** Return a structured verification summary for this session. */ verificationSummary(): any /** Return a concise human-readable verification summary for this session. */ @@ -1369,7 +1386,7 @@ export declare class Session { /** Return full model-visible tool definitions currently registered on this session. */ toolDefinitions(): any /** Return a stored tool artifact by URI, or null if it is not retained. */ - getArtifact(artifactUri: string): ToolArtifact | null + getArtifact(artifactUri: string): any /** * Register a hook for lifecycle event interception. * diff --git a/sdk/node/src/lib.rs b/sdk/node/src/lib.rs index 01a5e6f..b77f7d4 100644 --- a/sdk/node/src/lib.rs +++ b/sdk/node/src/lib.rs @@ -3098,6 +3098,44 @@ impl Session { .map_err(|e| napi::Error::from_reason(format!("Serialization error: {e}"))) } + /// Look up a delegated subagent task by id. Resolves to `null` when no + /// such task has been observed in this session. + #[napi(js_name = "subagentTask")] + pub async fn subagent_task(&self, task_id: String) -> napi::Result { + let session = self.inner.clone(); + let snapshot = get_runtime() + .spawn(async move { session.subagent_task(&task_id).await }) + .await + .map_err(|e| napi::Error::from_reason(format!("Task join error: {e}")))?; + serde_json::to_value(snapshot) + .map_err(|e| napi::Error::from_reason(format!("Serialization error: {e}"))) + } + + /// Return snapshots of every delegated subagent task observed in this + /// session (including completed and failed ones), oldest first. + #[napi(js_name = "subagentTasks")] + pub async fn subagent_tasks(&self) -> napi::Result { + let session = self.inner.clone(); + let tasks = get_runtime() + .spawn(async move { session.subagent_tasks().await }) + .await + .map_err(|e| napi::Error::from_reason(format!("Task join error: {e}")))?; + serde_json::to_value(tasks) + .map_err(|e| napi::Error::from_reason(format!("Serialization error: {e}"))) + } + + /// Return snapshots of subagent tasks still in `running` state. + #[napi(js_name = "pendingSubagentTasks")] + pub async fn pending_subagent_tasks(&self) -> napi::Result { + let session = self.inner.clone(); + let tasks = get_runtime() + .spawn(async move { session.pending_subagent_tasks().await }) + .await + .map_err(|e| napi::Error::from_reason(format!("Task join error: {e}")))?; + serde_json::to_value(tasks) + .map_err(|e| napi::Error::from_reason(format!("Serialization error: {e}"))) + } + /// Cancel a specific run only if it is still the active run. #[napi(js_name = "cancelRun")] pub async fn cancel_run(&self, run_id: String) -> napi::Result { diff --git a/sdk/node/test.mjs b/sdk/node/test.mjs index e7acb09..081390d 100644 --- a/sdk/node/test.mjs +++ b/sdk/node/test.mjs @@ -100,6 +100,20 @@ assert.equal( ) assert.match(result.text, /tools=\d+$/, 'custom slash command should receive toolNames in context') +// --- Subagent task query API (PR #3): three new Session methods --- +{ + const list = await session.subagentTasks() + assert.ok(Array.isArray(list), 'subagentTasks() should resolve to an array') + assert.equal(list.length, 0, 'fresh session should have no subagent tasks') + + const pending = await session.pendingSubagentTasks() + assert.ok(Array.isArray(pending), 'pendingSubagentTasks() should resolve to an array') + assert.equal(pending.length, 0, 'fresh session should have no pending subagent tasks') + + const missing = await session.subagentTask('task-does-not-exist') + assert.equal(missing, null, 'unknown subagent task id should resolve to null') +} + session.close() console.log('node sdk integration ok')