From 465abd12b9c5c5939e05d1dbd9349bc0848bc26a Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 22:29:46 +0800 Subject: [PATCH] feat(core): emit SubagentProgress for child tool/turn milestones MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `SubagentProgress` event variant exists but until now was never sent — the subagent task tracker introduced in the previous commit observed Start and End but had no mid-task signal to update from. Adds a `synthesize_subagent_progress()` helper called by the mpsc → broadcast forwarder inside `TaskExecutor::execute_with_task_id`. It translates two child-loop events into compact progress milestones on the parent broadcast: - `ToolEnd` → `status = "tool_completed"`, metadata `{ tool, exit_code, output_bytes, error_kind? }` - `TurnEnd` → `status = "turn_completed"`, metadata `{ turn, total_tokens, prompt_tokens, completion_tokens }` Noisy events (TextDelta / ToolStart / ToolOutputDelta / nested Subagent*) are intentionally not translated — consumers needing token-level detail should subscribe to the raw event stream directly. The original child events are still forwarded unchanged; progress is additive, so downstream consumers see both the synthesized milestone and the underlying event. --- core/src/agent_api/tests.rs | 51 ++++++++++ core/src/tools/task.rs | 193 +++++++++++++++++++++++++++++++++++- 2 files changed, 242 insertions(+), 2 deletions(-) diff --git a/core/src/agent_api/tests.rs b/core/src/agent_api/tests.rs index 7a9b835..34b737c 100644 --- a/core/src/agent_api/tests.rs +++ b/core/src/agent_api/tests.rs @@ -2388,6 +2388,57 @@ async fn subagent_events_populate_session_tracker() { assert_eq!(session.subagent_tasks().await.len(), 1); } +#[tokio::test] +async fn subagent_progress_events_accumulate_in_tracker() { + use super::runtime_events::RuntimeEventSink; + use crate::agent::AgentEvent; + + let agent = Agent::from_config(test_config()).await.unwrap(); + let session = agent + .session("/tmp/test-ws-subagent-progress", None) + .unwrap(); + + let run = session + .run_store + .create_run(session.session_id(), "parent prompt") + .await; + let sink = RuntimeEventSink::from_session(&session, &run.id); + + let task_id = "task-progress".to_string(); + let child_session_id = format!("task-run-{}", task_id); + + sink.observe(&AgentEvent::SubagentStart { + task_id: task_id.clone(), + session_id: child_session_id.clone(), + parent_session_id: session.session_id().to_string(), + agent: "explore".to_string(), + description: "demo".to_string(), + }) + .await; + + sink.observe(&AgentEvent::SubagentProgress { + task_id: task_id.clone(), + session_id: child_session_id.clone(), + status: "tool_completed".to_string(), + metadata: serde_json::json!({ "tool": "bash", "exit_code": 0 }), + }) + .await; + + sink.observe(&AgentEvent::SubagentProgress { + task_id: task_id.clone(), + session_id: child_session_id.clone(), + status: "turn_completed".to_string(), + metadata: serde_json::json!({ "turn": 1, "total_tokens": 50 }), + }) + .await; + + let snap = session.subagent_task(&task_id).await.unwrap(); + assert_eq!(snap.progress.len(), 2); + assert_eq!(snap.progress[0].status, "tool_completed"); + assert_eq!(snap.progress[1].status, "turn_completed"); + assert_eq!(snap.progress[1].metadata["total_tokens"], 50); +} + #[tokio::test] async fn subagent_tasks_scope_to_parent_session() { use super::runtime_events::RuntimeEventSink; diff --git a/core/src/tools/task.rs b/core/src/tools/task.rs index 1ef612a..5789784 100644 --- a/core/src/tools/task.rs +++ b/core/src/tools/task.rs @@ -92,6 +92,60 @@ fn compact_task_output(output: &str) -> (String, bool) { ) } +/// Translate selected child-loop events into a `SubagentProgress` milestone +/// for the parent broadcast. Returns `None` for events that aren't worth +/// surfacing as progress (text deltas, tool starts, subagent events from +/// nested delegation, etc.). +/// +/// Currently emits progress for: +/// - `ToolEnd` → `status = "tool_completed"`, +/// `metadata = { tool, exit_code, output_bytes, error_kind? }` +/// - `TurnEnd` → `status = "turn_completed"`, +/// `metadata = { turn, total_tokens, prompt_tokens, completion_tokens }` +fn synthesize_subagent_progress( + event: &AgentEvent, + task_id: &str, + session_id: &str, +) -> Option { + match event { + AgentEvent::ToolEnd { + name, + output, + exit_code, + error_kind, + .. + } => { + let mut metadata = serde_json::json!({ + "tool": name, + "exit_code": exit_code, + "output_bytes": output.len(), + }); + if let Some(kind) = error_kind { + metadata["error_kind"] = + serde_json::to_value(kind).unwrap_or(serde_json::Value::Null); + } + Some(AgentEvent::SubagentProgress { + task_id: task_id.to_string(), + session_id: session_id.to_string(), + status: "tool_completed".to_string(), + metadata, + }) + } + AgentEvent::TurnEnd { turn, usage } => Some(AgentEvent::SubagentProgress { + task_id: task_id.to_string(), + session_id: session_id.to_string(), + status: "turn_completed".to_string(), + metadata: serde_json::json!({ + "turn": turn, + "total_tokens": usage.total_tokens, + "prompt_tokens": usage.prompt_tokens, + "completion_tokens": usage.completion_tokens, + }), + }), + _ => None, + } +} + fn task_artifact_id(result: &TaskResult) -> String { format!("task-output:{}", result.task_id) } @@ -300,14 +354,25 @@ impl TaskExecutor { child_config, ); - // Create an mpsc channel for the child agent and forward events to broadcast + // Create an mpsc channel for the child agent and forward events to broadcast. + // Selected child events (ToolEnd, TurnEnd) are also surfaced to the parent + // broadcast as synthetic `SubagentProgress` events so dashboards can observe + // mid-task milestones without subscribing to the raw event stream. let child_event_tx = if let Some(ref broadcast_tx) = event_tx { let (mpsc_tx, mut mpsc_rx) = tokio::sync::mpsc::channel(100); let broadcast_tx_clone = broadcast_tx.clone(); + let progress_task_id = task_id.clone(); + let progress_session_id = session_id.clone(); - // Spawn a task to forward events from mpsc to broadcast tokio::spawn(async move { while let Some(event) = mpsc_rx.recv().await { + if let Some(progress) = synthesize_subagent_progress( + &event, + &progress_task_id, + &progress_session_id, + ) { + let _ = broadcast_tx_clone.send(progress); + } let _ = broadcast_tx_clone.send(event); } }); @@ -1962,10 +2027,12 @@ mod tests { let mut starts = Vec::new(); let mut ends = Vec::new(); + let mut progress_statuses: Vec = Vec::new(); while let Ok(event) = rx.try_recv() { match event { AgentEvent::SubagentStart { description, .. } => starts.push(description), AgentEvent::SubagentEnd { agent, success, .. } => ends.push((agent, success)), + AgentEvent::SubagentProgress { status, .. } => progress_statuses.push(status), _ => {} } } @@ -1976,6 +2043,17 @@ mod tests { assert!(ends .iter() .all(|(agent, success)| agent == "worker" && *success)); + // Each child loop emits at least one TurnEnd, so we expect at least + // two synthesized turn_completed progress events across the run. + assert!( + progress_statuses + .iter() + .filter(|s| s == &"turn_completed") + .count() + >= 2, + "expected at least two turn_completed progress events, got {:?}", + progress_statuses + ); } #[tokio::test] @@ -2082,4 +2160,115 @@ mod tests { ); } } + + #[test] + fn synthesize_progress_emits_tool_completed_for_tool_end() { + let event = AgentEvent::ToolEnd { + id: "call-1".to_string(), + name: "bash".to_string(), + output: "hello".to_string(), + exit_code: 0, + metadata: None, + error_kind: None, + }; + let progress = + synthesize_subagent_progress(&event, "task-1", "task-run-task-1").expect("some"); + match progress { + AgentEvent::SubagentProgress { + task_id, + session_id, + status, + metadata, + } => { + assert_eq!(task_id, "task-1"); + assert_eq!(session_id, "task-run-task-1"); + assert_eq!(status, "tool_completed"); + assert_eq!(metadata["tool"], "bash"); + assert_eq!(metadata["exit_code"], 0); + assert_eq!(metadata["output_bytes"], 5); + assert!(metadata.get("error_kind").is_none()); + } + other => panic!("expected SubagentProgress, got {:?}", other), + } + } + + #[test] + fn synthesize_progress_includes_error_kind_when_present() { + let event = AgentEvent::ToolEnd { + id: "call-2".to_string(), + name: "edit".to_string(), + output: "boom".to_string(), + exit_code: 1, + metadata: None, + error_kind: Some(crate::tools::ToolErrorKind::NotFound { + path: "missing.txt".to_string(), + }), + }; + let progress = + synthesize_subagent_progress(&event, "task-x", "task-run-task-x").expect("some"); + if let AgentEvent::SubagentProgress { metadata, .. } = progress { + assert!( + metadata.get("error_kind").is_some(), + "error_kind should propagate into metadata" + ); + } else { + panic!("expected SubagentProgress"); + } + } + + #[test] + fn synthesize_progress_emits_turn_completed_for_turn_end() { + let event = AgentEvent::TurnEnd { + turn: 3, + usage: crate::llm::TokenUsage { + prompt_tokens: 100, + completion_tokens: 25, + total_tokens: 125, + cache_read_tokens: None, + cache_write_tokens: None, + }, + }; + let progress = + synthesize_subagent_progress(&event, "task-1", "task-run-task-1").expect("some"); + if let AgentEvent::SubagentProgress { + status, metadata, .. + } = progress + { + assert_eq!(status, "turn_completed"); + assert_eq!(metadata["turn"], 3); + assert_eq!(metadata["total_tokens"], 125); + assert_eq!(metadata["prompt_tokens"], 100); + assert_eq!(metadata["completion_tokens"], 25); + } else { + panic!("expected SubagentProgress"); + } + } + + #[test] + fn synthesize_progress_ignores_unrelated_events() { + let ignored = [ + AgentEvent::TextDelta { + text: "hi".to_string(), + }, + AgentEvent::ToolStart { + id: "x".to_string(), + name: "bash".to_string(), + }, + AgentEvent::TurnStart { turn: 1 }, + AgentEvent::SubagentStart { + task_id: "nested".to_string(), + session_id: "nested-run".to_string(), + parent_session_id: "parent".to_string(), + agent: "explore".to_string(), + description: "nested".to_string(), + }, + ]; + for event in &ignored { + assert!( + synthesize_subagent_progress(event, "task", "session").is_none(), + "{:?} should not emit progress", + event + ); + } + } }