Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions core/src/agent_api/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2388,6 +2388,57 @@ async fn subagent_events_populate_session_tracker() {
assert_eq!(session.subagent_tasks().await.len(), 1);
}

#[tokio::test]
async fn subagent_progress_events_accumulate_in_tracker() {
use super::runtime_events::RuntimeEventSink;
use crate::agent::AgentEvent;

let agent = Agent::from_config(test_config()).await.unwrap();
let session = agent
.session("/tmp/test-ws-subagent-progress", None)
.unwrap();

let run = session
.run_store
.create_run(session.session_id(), "parent prompt")
.await;
let sink = RuntimeEventSink::from_session(&session, &run.id);

let task_id = "task-progress".to_string();
let child_session_id = format!("task-run-{}", task_id);

sink.observe(&AgentEvent::SubagentStart {
task_id: task_id.clone(),
session_id: child_session_id.clone(),
parent_session_id: session.session_id().to_string(),
agent: "explore".to_string(),
description: "demo".to_string(),
})
.await;

sink.observe(&AgentEvent::SubagentProgress {
task_id: task_id.clone(),
session_id: child_session_id.clone(),
status: "tool_completed".to_string(),
metadata: serde_json::json!({ "tool": "bash", "exit_code": 0 }),
})
.await;

sink.observe(&AgentEvent::SubagentProgress {
task_id: task_id.clone(),
session_id: child_session_id.clone(),
status: "turn_completed".to_string(),
metadata: serde_json::json!({ "turn": 1, "total_tokens": 50 }),
})
.await;

let snap = session.subagent_task(&task_id).await.unwrap();
assert_eq!(snap.progress.len(), 2);
assert_eq!(snap.progress[0].status, "tool_completed");
assert_eq!(snap.progress[1].status, "turn_completed");
assert_eq!(snap.progress[1].metadata["total_tokens"], 50);
}

#[tokio::test]
async fn subagent_tasks_scope_to_parent_session() {
use super::runtime_events::RuntimeEventSink;
Expand Down
193 changes: 191 additions & 2 deletions core/src/tools/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,60 @@ fn compact_task_output(output: &str) -> (String, bool) {
)
}

/// Translate selected child-loop events into a `SubagentProgress` milestone
/// for the parent broadcast. Returns `None` for events that aren't worth
/// surfacing as progress (text deltas, tool starts, subagent events from
/// nested delegation, etc.).
///
/// Currently emits progress for:
/// - `ToolEnd` → `status = "tool_completed"`,
/// `metadata = { tool, exit_code, output_bytes, error_kind? }`
/// - `TurnEnd` → `status = "turn_completed"`,
/// `metadata = { turn, total_tokens, prompt_tokens, completion_tokens }`
fn synthesize_subagent_progress(
event: &AgentEvent,
task_id: &str,
session_id: &str,
) -> Option<AgentEvent> {
match event {
AgentEvent::ToolEnd {
name,
output,
exit_code,
error_kind,
..
} => {
let mut metadata = serde_json::json!({
"tool": name,
"exit_code": exit_code,
"output_bytes": output.len(),
});
if let Some(kind) = error_kind {
metadata["error_kind"] =
serde_json::to_value(kind).unwrap_or(serde_json::Value::Null);
}
Some(AgentEvent::SubagentProgress {
task_id: task_id.to_string(),
session_id: session_id.to_string(),
status: "tool_completed".to_string(),
metadata,
})
}
AgentEvent::TurnEnd { turn, usage } => Some(AgentEvent::SubagentProgress {
task_id: task_id.to_string(),
session_id: session_id.to_string(),
status: "turn_completed".to_string(),
metadata: serde_json::json!({
"turn": turn,
"total_tokens": usage.total_tokens,
"prompt_tokens": usage.prompt_tokens,
"completion_tokens": usage.completion_tokens,
}),
}),
_ => None,
}
}

fn task_artifact_id(result: &TaskResult) -> String {
format!("task-output:{}", result.task_id)
}
Expand Down Expand Up @@ -300,14 +354,25 @@ impl TaskExecutor {
child_config,
);

// Create an mpsc channel for the child agent and forward events to broadcast
// Create an mpsc channel for the child agent and forward events to broadcast.
// Selected child events (ToolEnd, TurnEnd) are also surfaced to the parent
// broadcast as synthetic `SubagentProgress` events so dashboards can observe
// mid-task milestones without subscribing to the raw event stream.
let child_event_tx = if let Some(ref broadcast_tx) = event_tx {
let (mpsc_tx, mut mpsc_rx) = tokio::sync::mpsc::channel(100);
let broadcast_tx_clone = broadcast_tx.clone();
let progress_task_id = task_id.clone();
let progress_session_id = session_id.clone();

// Spawn a task to forward events from mpsc to broadcast
tokio::spawn(async move {
while let Some(event) = mpsc_rx.recv().await {
if let Some(progress) = synthesize_subagent_progress(
&event,
&progress_task_id,
&progress_session_id,
) {
let _ = broadcast_tx_clone.send(progress);
}
let _ = broadcast_tx_clone.send(event);
}
});
Expand Down Expand Up @@ -1962,10 +2027,12 @@ mod tests {

let mut starts = Vec::new();
let mut ends = Vec::new();
let mut progress_statuses: Vec<String> = Vec::new();
while let Ok(event) = rx.try_recv() {
match event {
AgentEvent::SubagentStart { description, .. } => starts.push(description),
AgentEvent::SubagentEnd { agent, success, .. } => ends.push((agent, success)),
AgentEvent::SubagentProgress { status, .. } => progress_statuses.push(status),
_ => {}
}
}
Expand All @@ -1976,6 +2043,17 @@ mod tests {
assert!(ends
.iter()
.all(|(agent, success)| agent == "worker" && *success));
// Each child loop emits at least one TurnEnd, so we expect at least
// two synthesized turn_completed progress events across the run.
assert!(
progress_statuses
.iter()
.filter(|s| s == &"turn_completed")
.count()
>= 2,
"expected at least two turn_completed progress events, got {:?}",
progress_statuses
);
}

#[tokio::test]
Expand Down Expand Up @@ -2082,4 +2160,115 @@ mod tests {
);
}
}

#[test]
fn synthesize_progress_emits_tool_completed_for_tool_end() {
let event = AgentEvent::ToolEnd {
id: "call-1".to_string(),
name: "bash".to_string(),
output: "hello".to_string(),
exit_code: 0,
metadata: None,
error_kind: None,
};
let progress =
synthesize_subagent_progress(&event, "task-1", "task-run-task-1").expect("some");
match progress {
AgentEvent::SubagentProgress {
task_id,
session_id,
status,
metadata,
} => {
assert_eq!(task_id, "task-1");
assert_eq!(session_id, "task-run-task-1");
assert_eq!(status, "tool_completed");
assert_eq!(metadata["tool"], "bash");
assert_eq!(metadata["exit_code"], 0);
assert_eq!(metadata["output_bytes"], 5);
assert!(metadata.get("error_kind").is_none());
}
other => panic!("expected SubagentProgress, got {:?}", other),
}
}

#[test]
fn synthesize_progress_includes_error_kind_when_present() {
let event = AgentEvent::ToolEnd {
id: "call-2".to_string(),
name: "edit".to_string(),
output: "boom".to_string(),
exit_code: 1,
metadata: None,
error_kind: Some(crate::tools::ToolErrorKind::NotFound {
path: "missing.txt".to_string(),
}),
};
let progress =
synthesize_subagent_progress(&event, "task-x", "task-run-task-x").expect("some");
if let AgentEvent::SubagentProgress { metadata, .. } = progress {
assert!(
metadata.get("error_kind").is_some(),
"error_kind should propagate into metadata"
);
} else {
panic!("expected SubagentProgress");
}
}

#[test]
fn synthesize_progress_emits_turn_completed_for_turn_end() {
let event = AgentEvent::TurnEnd {
turn: 3,
usage: crate::llm::TokenUsage {
prompt_tokens: 100,
completion_tokens: 25,
total_tokens: 125,
cache_read_tokens: None,
cache_write_tokens: None,
},
};
let progress =
synthesize_subagent_progress(&event, "task-1", "task-run-task-1").expect("some");
if let AgentEvent::SubagentProgress {
status, metadata, ..
} = progress
{
assert_eq!(status, "turn_completed");
assert_eq!(metadata["turn"], 3);
assert_eq!(metadata["total_tokens"], 125);
assert_eq!(metadata["prompt_tokens"], 100);
assert_eq!(metadata["completion_tokens"], 25);
} else {
panic!("expected SubagentProgress");
}
}

#[test]
fn synthesize_progress_ignores_unrelated_events() {
let ignored = [
AgentEvent::TextDelta {
text: "hi".to_string(),
},
AgentEvent::ToolStart {
id: "x".to_string(),
name: "bash".to_string(),
},
AgentEvent::TurnStart { turn: 1 },
AgentEvent::SubagentStart {
task_id: "nested".to_string(),
session_id: "nested-run".to_string(),
parent_session_id: "parent".to_string(),
agent: "explore".to_string(),
description: "nested".to_string(),
},
];
for event in &ignored {
assert!(
synthesize_subagent_progress(event, "task", "session").is_none(),
"{:?} should not emit progress",
event
);
}
}
}
Loading