From 200bb116c3fa836d33b303cf4fbd8d600a447b7b Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 23 May 2026 23:42:41 +0800 Subject: [PATCH] feat: cancel in-flight subagent tasks by id Closes the last gap in the "subagent task as task dashboard" story: a parent session can now interrupt a delegated child run without canceling the whole parent run. Core - `SubagentStatus` gains a `Cancelled` variant. Late `SubagentEnd` events from the cancelled child do not downgrade it back to `Failed`. - `InMemorySubagentTaskTracker` now also stores `CancellationToken`s per task. `register_canceller` / `clear_canceller` / `cancel(id)` bracket the in-flight token lifetime; `cancel` fires the token and flips the snapshot status atomically. - `TaskExecutor` gains `with_subagent_tracker(...)`. When set, each task registers its token, then runs the child loop through `AgentLoop::execute_with_session(... Some(&token))` so the cancellation propagates into LLM streaming and tool execution. - `register_task_with_mcp` grows an optional tracker parameter so the session bootstrap path can share a single Arc with the executor and the live `AgentSession`. - `AgentSession::cancel_subagent_task(task_id)` exposes the operation to callers. SDKs - Node: `Session.cancelSubagentTask(taskId): Promise` via the same get_runtime().spawn pattern used by other run-control methods. - Python: `Session.cancel_subagent_task(task_id) -> bool` via the py.allow_threads / tokio block_on pattern. Tests - Tracker-level unit tests for the four interesting cases: cancel fires the token + flips status, cancel returns False on unknown ids, late SubagentEnd doesn't downgrade Cancelled, and clear_canceller disarms future cancel calls. - Integration test in agent_api/tests.rs drives a synthetic subagent lifecycle through `RuntimeEventSink`, registers a canceller, and asserts the public `cancel_subagent_task` API + the Cancelled terminal state survive a late SubagentEnd. - Node + Python smoke tests assert cancelling an unknown task id resolves to false / False. --- core/src/agent_api.rs | 9 ++ core/src/agent_api/capabilities.rs | 6 + core/src/agent_api/session_builder.rs | 3 +- core/src/agent_api/tests.rs | 56 ++++++++++ core/src/subagent_task_tracker.rs | 116 +++++++++++++++++++- core/src/tools/builtin/mod.rs | 16 ++- core/src/tools/task.rs | 40 ++++++- sdk/node/generated.d.ts | 6 + sdk/node/src/lib.rs | 12 ++ sdk/node/test.mjs | 3 + sdk/python/src/lib.rs | 8 ++ sdk/python/tests/test_subagent_query_api.py | 5 + 12 files changed, 272 insertions(+), 8 deletions(-) diff --git a/core/src/agent_api.rs b/core/src/agent_api.rs index 0d5c09c..ee5d6b0 100644 --- a/core/src/agent_api.rs +++ b/core/src/agent_api.rs @@ -594,6 +594,15 @@ impl AgentSession { .collect() } + /// Cancel an in-flight delegated subagent task by id. Returns `true` + /// when a cancellation token was found and fired, `false` when the + /// task id is unknown or the task has already finished. The eventual + /// `SubagentEnd` from the cancelled child loop won't downgrade the + /// terminal status — it stays `Cancelled`. + pub async fn cancel_subagent_task(&self, task_id: &str) -> bool { + self.subagent_tasks.cancel(task_id).await + } + /// Return a snapshot of the session's conversation history. pub fn history(&self) -> Vec { SessionView::from_session(self).history() diff --git a/core/src/agent_api/capabilities.rs b/core/src/agent_api/capabilities.rs index 6777481..69e668c 100644 --- a/core/src/agent_api/capabilities.rs +++ b/core/src/agent_api/capabilities.rs @@ -35,6 +35,7 @@ pub(super) struct SessionCapabilities { pub(super) context_providers: Vec>, pub(super) skill_registry: Arc, pub(super) agent_registry: Arc, + pub(super) subagent_tasks: Arc, } pub(super) fn build_session_capabilities(input: SessionCapabilityInput<'_>) -> SessionCapabilities { @@ -60,12 +61,14 @@ pub(super) fn build_session_capabilities(input: SessionCapabilityInput<'_>) -> S .set_search_config(search_config.clone()); } + let subagent_tasks = Arc::new(crate::subagent_task_tracker::InMemorySubagentTaskTracker::new()); let agent_registry = register_task_capability( input.code_config, input.opts, input.workspace, Arc::clone(&input.llm_client), &tool_executor, + Arc::clone(&subagent_tasks), ); // Register generate_object tool (structured JSON output) @@ -90,6 +93,7 @@ pub(super) fn build_session_capabilities(input: SessionCapabilityInput<'_>) -> S context_providers, skill_registry, agent_registry, + subagent_tasks, } } @@ -136,6 +140,7 @@ fn register_task_capability( workspace: &Path, llm_client: Arc, tool_executor: &Arc, + subagent_tasks: Arc, ) -> Arc { use crate::child_run::ChildRunContext; use crate::subagent::load_agents_from_dir; @@ -177,6 +182,7 @@ fn register_task_capability( workspace.display().to_string(), opts.mcp_manager.clone(), Some(parent_context), + Some(subagent_tasks), ); registry } diff --git a/core/src/agent_api/session_builder.rs b/core/src/agent_api/session_builder.rs index 5d4be97..f7c0564 100644 --- a/core/src/agent_api/session_builder.rs +++ b/core/src/agent_api/session_builder.rs @@ -103,6 +103,7 @@ pub(super) fn build_agent_session( let tool_defs = capabilities.tool_defs; let context_providers = capabilities.context_providers; let effective_registry = capabilities.skill_registry; + let subagent_tasks = capabilities.subagent_tasks; let prompt_slots = opts .prompt_slots @@ -219,7 +220,7 @@ pub(super) fn build_agent_session( cancel_token: Arc::new(tokio::sync::Mutex::new(None)), current_run_id: Arc::new(tokio::sync::Mutex::new(None)), run_store: Arc::new(crate::run::InMemoryRunStore::new()), - subagent_tasks: Arc::new(crate::subagent_task_tracker::InMemorySubagentTaskTracker::new()), + subagent_tasks, active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())), trace_sink, verification_reports: Arc::new(RwLock::new(Vec::new())), diff --git a/core/src/agent_api/tests.rs b/core/src/agent_api/tests.rs index 34b737c..7de3448 100644 --- a/core/src/agent_api/tests.rs +++ b/core/src/agent_api/tests.rs @@ -2468,3 +2468,59 @@ async fn subagent_tasks_scope_to_parent_session() { assert!(session_b.subagent_tasks().await.is_empty()); assert!(session_b.subagent_task("task-from-a").await.is_none()); } + +#[tokio::test] +async fn cancel_subagent_task_marks_snapshot_cancelled() { + use super::runtime_events::RuntimeEventSink; + use crate::agent::AgentEvent; + use crate::subagent_task_tracker::SubagentStatus; + use tokio_util::sync::CancellationToken; + + let agent = Agent::from_config(test_config()).await.unwrap(); + let session = agent.session("/tmp/test-ws-subagent-cancel", None).unwrap(); + let run = session + .run_store + .create_run(session.session_id(), "parent") + .await; + let sink = RuntimeEventSink::from_session(&session, &run.id); + + let task_id = "task-to-cancel".to_string(); + sink.observe(&AgentEvent::SubagentStart { + task_id: task_id.clone(), + session_id: format!("task-run-{}", task_id), + parent_session_id: session.session_id().to_string(), + agent: "explore".to_string(), + description: "long task".to_string(), + }) + .await; + + // Simulate what TaskExecutor would do: register a cancellation token + // for this in-flight task so the public API has something to fire. + let token = CancellationToken::new(); + session + .subagent_tasks + .register_canceller(&task_id, token.clone()) + .await; + + assert!(session.cancel_subagent_task(&task_id).await); + assert!(token.is_cancelled()); + + let snap = session.subagent_task(&task_id).await.unwrap(); + assert_eq!(snap.status, SubagentStatus::Cancelled); + + // A late SubagentEnd from the cancelled child must not downgrade. + sink.observe(&AgentEvent::SubagentEnd { + task_id: task_id.clone(), + session_id: format!("task-run-{}", task_id), + agent: "explore".to_string(), + output: "Task cancelled by caller".to_string(), + success: false, + }) + .await; + let snap = session.subagent_task(&task_id).await.unwrap(); + assert_eq!(snap.status, SubagentStatus::Cancelled); + + // Cancelling again or against an unknown id is a no-op. + assert!(!session.cancel_subagent_task(&task_id).await); + assert!(!session.cancel_subagent_task("task-unknown").await); +} diff --git a/core/src/subagent_task_tracker.rs b/core/src/subagent_task_tracker.rs index 5f174f6..5226916 100644 --- a/core/src/subagent_task_tracker.rs +++ b/core/src/subagent_task_tracker.rs @@ -9,6 +9,7 @@ use crate::agent::AgentEvent; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -16,6 +17,7 @@ pub enum SubagentStatus { Running, Completed, Failed, + Cancelled, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -47,6 +49,7 @@ pub struct SubagentTaskSnapshot { #[derive(Debug, Default)] pub struct InMemorySubagentTaskTracker { tasks: RwLock>, + cancellers: RwLock>, } impl InMemorySubagentTaskTracker { @@ -54,6 +57,44 @@ impl InMemorySubagentTaskTracker { Self::default() } + /// Register a `CancellationToken` for a running task so callers can + /// trigger cancellation through `cancel(task_id)`. The task executor + /// is expected to remove the entry on exit via `clear_canceller`. + pub async fn register_canceller(&self, task_id: &str, token: CancellationToken) { + self.cancellers + .write() + .await + .insert(task_id.to_string(), token); + } + + pub async fn clear_canceller(&self, task_id: &str) { + self.cancellers.write().await.remove(task_id); + } + + /// Fire the registered token and mark the snapshot as `Cancelled`. + /// Returns `true` if a token was found (caller can interpret as + /// "cancellation initiated"), `false` if the task id was unknown or + /// the task already finished. The eventual `SubagentEnd` event won't + /// overwrite the Cancelled status — see `record_event`. + pub async fn cancel(&self, task_id: &str) -> bool { + let token = self.cancellers.write().await.remove(task_id); + match token { + Some(token) => { + token.cancel(); + let now = now_ms(); + let mut tasks = self.tasks.write().await; + if let Some(entry) = tasks.get_mut(task_id) { + if entry.status == SubagentStatus::Running { + entry.status = SubagentStatus::Cancelled; + entry.updated_ms = now; + } + } + true + } + None => false, + } + } + /// Apply a single agent event to the tracker. Non-subagent events are ignored. pub async fn record_event(&self, event: &AgentEvent) { match event { @@ -148,11 +189,16 @@ impl InMemorySubagentTaskTracker { success: None, progress: Vec::new(), }); - entry.status = if *success { - SubagentStatus::Completed - } else { - SubagentStatus::Failed - }; + // Preserve a pre-set Cancelled status (set by `cancel()`) + // — a late SubagentEnd from the cancelled child loop is + // expected and must not downgrade the terminal state. + if entry.status != SubagentStatus::Cancelled { + entry.status = if *success { + SubagentStatus::Completed + } else { + SubagentStatus::Failed + }; + } entry.updated_ms = now; entry.finished_ms = Some(now); entry.output = Some(output.clone()); @@ -333,4 +379,64 @@ mod tests { .await; assert!(tracker.list().await.is_empty()); } + + #[tokio::test] + async fn cancel_fires_token_and_marks_snapshot_cancelled() { + let tracker = InMemorySubagentTaskTracker::new(); + tracker + .record_event(&start_event("task-c", "parent", "child")) + .await; + + let token = CancellationToken::new(); + tracker.register_canceller("task-c", token.clone()).await; + assert!(!token.is_cancelled()); + + let fired = tracker.cancel("task-c").await; + assert!(fired, "cancel should report success"); + assert!(token.is_cancelled(), "registered token should be triggered"); + + let snap = tracker.get("task-c").await.unwrap(); + assert_eq!(snap.status, SubagentStatus::Cancelled); + } + + #[tokio::test] + async fn cancel_returns_false_for_unknown_task() { + let tracker = InMemorySubagentTaskTracker::new(); + assert!(!tracker.cancel("task-does-not-exist").await); + } + + #[tokio::test] + async fn late_subagent_end_does_not_downgrade_cancelled_status() { + let tracker = InMemorySubagentTaskTracker::new(); + tracker + .record_event(&start_event("task-d", "parent", "child")) + .await; + let token = CancellationToken::new(); + tracker.register_canceller("task-d", token).await; + assert!(tracker.cancel("task-d").await); + + // The cancelled child loop will still emit a (likely failed) + // SubagentEnd. The terminal status should remain Cancelled. + tracker + .record_event(&end_event("task-d", "child", false)) + .await; + let snap = tracker.get("task-d").await.unwrap(); + assert_eq!(snap.status, SubagentStatus::Cancelled); + assert!(snap.finished_ms.is_some()); + assert_eq!(snap.success, Some(false)); + } + + #[tokio::test] + async fn clear_canceller_disarms_future_cancel_calls() { + let tracker = InMemorySubagentTaskTracker::new(); + tracker + .record_event(&start_event("task-e", "parent", "child")) + .await; + let token = CancellationToken::new(); + tracker.register_canceller("task-e", token.clone()).await; + tracker.clear_canceller("task-e").await; + + assert!(!tracker.cancel("task-e").await); + assert!(!token.is_cancelled()); + } } diff --git a/core/src/tools/builtin/mod.rs b/core/src/tools/builtin/mod.rs index 1bde654..46977c7 100644 --- a/core/src/tools/builtin/mod.rs +++ b/core/src/tools/builtin/mod.rs @@ -93,7 +93,15 @@ pub fn register_task( agent_registry: Arc, workspace: String, ) { - register_task_with_mcp(registry, llm_client, agent_registry, workspace, None, None); + register_task_with_mcp( + registry, + llm_client, + agent_registry, + workspace, + None, + None, + None, + ); } /// Register the task delegation tools with optional MCP manager and parent context. @@ -101,6 +109,8 @@ pub fn register_task( /// When `mcp_manager` is provided, delegated child sessions will have access /// to all MCP tools from connected servers. /// When `parent_context` is provided, child runs inherit parent capabilities. +/// When `subagent_tracker` is provided, each task registers a +/// `CancellationToken` against it so callers can cancel by `task_id`. pub fn register_task_with_mcp( registry: &Arc, llm_client: Arc, @@ -108,6 +118,7 @@ pub fn register_task_with_mcp( workspace: String, mcp_manager: Option>, parent_context: Option, + subagent_tracker: Option>, ) { use crate::tools::task::{ParallelTaskTool, TaskExecutor, TaskTool}; let mut executor = match mcp_manager { @@ -117,6 +128,9 @@ pub fn register_task_with_mcp( if let Some(ctx) = parent_context { executor = executor.with_parent_context(ctx); } + if let Some(tracker) = subagent_tracker { + executor = executor.with_subagent_tracker(tracker); + } let executor = Arc::new(executor); registry.register_builtin(Arc::new(TaskTool::new(Arc::clone(&executor)))); registry.register_builtin(Arc::new(ParallelTaskTool::new(Arc::clone(&executor)))); diff --git a/core/src/tools/task.rs b/core/src/tools/task.rs index 5789784..bd82e3b 100644 --- a/core/src/tools/task.rs +++ b/core/src/tools/task.rs @@ -194,6 +194,9 @@ pub struct TaskExecutor { /// Parent capabilities to inherit into child runs. parent_context: Option, max_parallel_tasks: usize, + /// Optional shared tracker — when present each task registers a + /// `CancellationToken` so callers can cancel by `task_id`. + subagent_tracker: Option>, } impl TaskExecutor { @@ -210,6 +213,7 @@ impl TaskExecutor { mcp_manager: None, parent_context: None, max_parallel_tasks: crate::agent::DEFAULT_MAX_PARALLEL_TASKS, + subagent_tracker: None, } } @@ -227,6 +231,7 @@ impl TaskExecutor { mcp_manager: Some(mcp_manager), parent_context: None, max_parallel_tasks: crate::agent::DEFAULT_MAX_PARALLEL_TASKS, + subagent_tracker: None, } } @@ -244,6 +249,17 @@ impl TaskExecutor { self } + /// Share a tracker with this executor. When set, each task registers + /// a `CancellationToken` against the tracker so the parent session + /// can cancel by `task_id`. + pub fn with_subagent_tracker( + mut self, + tracker: Arc, + ) -> Self { + self.subagent_tracker = Some(tracker); + self + } + /// Execute a task by spawning an isolated child AgentLoop. /// /// `parent_session_id` flows into the emitted `SubagentStart`/`SubagentEnd` @@ -382,14 +398,36 @@ impl TaskExecutor { None }; + // Register a CancellationToken with the tracker (if shared) so the + // parent session's `cancel_subagent_task` can interrupt this run. + let cancel_token = tokio_util::sync::CancellationToken::new(); + if let Some(ref tracker) = self.subagent_tracker { + tracker + .register_canceller(&task_id, cancel_token.clone()) + .await; + } + let (output, success) = match agent_loop - .execute(&[], ¶ms.prompt, child_event_tx) + .execute_with_session( + &[], + ¶ms.prompt, + Some(&session_id), + child_event_tx, + Some(&cancel_token), + ) .await { Ok(result) => (result.text, true), + Err(e) if cancel_token.is_cancelled() => { + (format!("Task cancelled by caller: {}", e), false) + } Err(e) => (format!("Task failed: {}", e), false), }; + if let Some(ref tracker) = self.subagent_tracker { + tracker.clear_canceller(&task_id).await; + } + if let Some(ref tx) = event_tx { let _ = tx.send(AgentEvent::SubagentEnd { task_id: task_id.clone(), diff --git a/sdk/node/generated.d.ts b/sdk/node/generated.d.ts index 81b4682..9eeddc4 100644 --- a/sdk/node/generated.d.ts +++ b/sdk/node/generated.d.ts @@ -1148,6 +1148,12 @@ export declare class Session { subagentTasks(): Promise /** Return snapshots of subagent tasks still in `running` state. */ pendingSubagentTasks(): Promise + /** + * Cancel an in-flight subagent task by id. Resolves to `true` when a + * cancellation token was found and fired, `false` when the task id + * is unknown or the task already finished. + */ + cancelSubagentTask(taskId: string): Promise /** Cancel a specific run only if it is still the active run. */ cancelRun(runId: string): Promise /** Execute a tool by name, bypassing the LLM. */ diff --git a/sdk/node/src/lib.rs b/sdk/node/src/lib.rs index b77f7d4..ee5bc4a 100644 --- a/sdk/node/src/lib.rs +++ b/sdk/node/src/lib.rs @@ -3136,6 +3136,18 @@ impl Session { .map_err(|e| napi::Error::from_reason(format!("Serialization error: {e}"))) } + /// Cancel an in-flight subagent task by id. Resolves to `true` when a + /// cancellation token was found and fired, `false` when the task id + /// is unknown or the task already finished. + #[napi(js_name = "cancelSubagentTask")] + pub async fn cancel_subagent_task(&self, task_id: String) -> napi::Result { + let session = self.inner.clone(); + get_runtime() + .spawn(async move { session.cancel_subagent_task(&task_id).await }) + .await + .map_err(|e| napi::Error::from_reason(format!("Task join error: {e}"))) + } + /// Cancel a specific run only if it is still the active run. #[napi(js_name = "cancelRun")] pub async fn cancel_run(&self, run_id: String) -> napi::Result { diff --git a/sdk/node/test.mjs b/sdk/node/test.mjs index 081390d..ee9081d 100644 --- a/sdk/node/test.mjs +++ b/sdk/node/test.mjs @@ -112,6 +112,9 @@ assert.match(result.text, /tools=\d+$/, 'custom slash command should receive too const missing = await session.subagentTask('task-does-not-exist') assert.equal(missing, null, 'unknown subagent task id should resolve to null') + + const cancelled = await session.cancelSubagentTask('task-does-not-exist') + assert.equal(cancelled, false, 'cancelling an unknown subagent task id should resolve to false') } session.close() diff --git a/sdk/python/src/lib.rs b/sdk/python/src/lib.rs index 716d0eb..068569d 100644 --- a/sdk/python/src/lib.rs +++ b/sdk/python/src/lib.rs @@ -1569,6 +1569,14 @@ impl PySession { json_string_to_py(py, &json) } + /// Cancel an in-flight subagent task by id. Returns True when a + /// cancellation token was found and fired, False when the task id is + /// unknown or the task already finished. + fn cancel_subagent_task(&self, py: Python<'_>, task_id: String) -> bool { + let session = self.inner.clone(); + py.allow_threads(move || get_runtime().block_on(session.cancel_subagent_task(&task_id))) + } + /// Cancel a specific run only if it is still the active run. fn cancel_run(&self, py: Python<'_>, run_id: String) -> bool { let session = self.inner.clone(); diff --git a/sdk/python/tests/test_subagent_query_api.py b/sdk/python/tests/test_subagent_query_api.py index d088b5f..3262b93 100644 --- a/sdk/python/tests/test_subagent_query_api.py +++ b/sdk/python/tests/test_subagent_query_api.py @@ -51,6 +51,11 @@ def main() -> None: missing = session.subagent_task("task-does-not-exist") assert missing is None, f"unknown subagent task id should return None, got {missing!r}" + cancelled = session.cancel_subagent_task("task-does-not-exist") + assert cancelled is False, ( + f"cancelling unknown subagent task id should return False, got {cancelled!r}" + ) + session.close() print("python sdk subagent query api ok")