-
Notifications
You must be signed in to change notification settings - Fork 0
Fix production loop readiness gaps #22
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d948972
986b750
aa56212
a0ab44f
16b6e1b
c84e9ca
bc08975
ccae6f7
8ad7a8a
5a7157d
c7c416f
f4cec95
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -68,4 +68,6 @@ backends: | |
| command: | ||
| - pi | ||
| - --mode | ||
| - json | ||
| - "{prompt}" | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -276,7 +276,12 @@ def assemble_backend_prompt(task_spec: TaskSpec, constraints: BackendConstraints | |
| "- Touch only paths under the allowed prefixes below; anything else fails policy.", | ||
| "- No secret material, credentials, or environment-specific tokens in any file.", | ||
| f"- Budget: {constraints.max_iterations} iterations, " | ||
| f"{int(constraints.max_wall_clock_seconds)}s wall clock.", | ||
| f"{int(constraints.max_wall_clock_seconds)}s wall clock" | ||
| + ( | ||
| f", ${constraints.max_cost_usd:.2f} reported harness cost." | ||
| if constraints.max_cost_usd is not None | ||
| else "." | ||
| ), | ||
| ]) | ||
| for repo, prefixes in sorted(task_spec.allowed_paths.items()): | ||
| lines.append(f"- Allowed paths ({repo}): {', '.join(prefixes) or 'none configured'}") | ||
|
|
@@ -548,6 +553,122 @@ def execute( | |
| ) | ||
|
|
||
|
|
||
| def _content_text(message: Mapping[str, Any]) -> str: | ||
| content = message.get("content") | ||
| if isinstance(content, str): | ||
| return content | ||
| if not isinstance(content, list): | ||
| return "" | ||
| parts: list[str] = [] | ||
| for item in content: | ||
| if isinstance(item, dict) and isinstance(item.get("text"), str): | ||
| parts.append(str(item["text"])) | ||
| return "".join(parts) | ||
|
|
||
|
|
||
| def _usage_value(raw_usage: Mapping[str, Any], primary: str, fallback: str) -> Any: | ||
| value = raw_usage.get(primary) | ||
| return raw_usage.get(fallback) if value is None else value | ||
|
|
||
|
|
||
| def _usage_from_pi_message(message: Mapping[str, Any]) -> dict[str, Any] | None: | ||
| raw_usage = message.get("usage") | ||
| if not isinstance(raw_usage, dict): | ||
| return None | ||
| raw_cost = raw_usage.get("cost") | ||
| cost = raw_cost if isinstance(raw_cost, dict) else {} | ||
| return { | ||
| "usage": { | ||
| "input_tokens": _usage_value(raw_usage, "input", "input_tokens"), | ||
| "output_tokens": _usage_value(raw_usage, "output", "output_tokens"), | ||
| }, | ||
| "total_cost_usd": cost.get("total") if isinstance(cost, dict) else None, | ||
| } | ||
|
|
||
|
|
||
| def _pi_message_is_error(message: Mapping[str, Any]) -> bool: | ||
| stop_reason = str(message.get("stopReason", "")).lower() | ||
| return bool( | ||
| stop_reason in {"abort", "aborted", "cancel", "cancelled", "canceled", "error"} | ||
| or message.get("errorMessage") | ||
| or message.get("error") | ||
| ) | ||
|
|
||
|
|
||
| PI_JSON_EVENT_TYPES = frozenset( | ||
| { | ||
| "session", | ||
| "agent_start", | ||
| "turn_start", | ||
| "message_start", | ||
| "message_update", | ||
| "message_end", | ||
| "turn_end", | ||
| "agent_end", | ||
| "error", | ||
| "agent_error", | ||
| } | ||
| ) | ||
|
|
||
|
|
||
| def _is_pi_json_event(value: Mapping[str, Any]) -> bool: | ||
| return value.get("type") in PI_JSON_EVENT_TYPES | ||
|
|
||
|
|
||
| def _parse_pi_json_events(stdout: str) -> dict[str, Any]: | ||
| """Map pi ``--mode json`` NDJSON events into the generic harness schema.""" | ||
| events: list[dict[str, Any]] = [] | ||
| for line in stdout.splitlines(): | ||
| line = line.strip() | ||
| if not line: | ||
| continue | ||
| try: | ||
| decoded = json.loads(line) | ||
| except (json.JSONDecodeError, ValueError): | ||
| continue | ||
| if isinstance(decoded, dict): | ||
| events.append(decoded) | ||
| if not events: | ||
| return {} | ||
|
|
||
| parsed: dict[str, Any] = {"num_turns": 0, "is_error": False} | ||
| for event in events: | ||
| event_type = event.get("type") | ||
| if event_type == "turn_end": | ||
| parsed["num_turns"] = int(parsed.get("num_turns", 0)) + 1 | ||
| if event_type in {"error", "agent_error"} or event.get("error"): | ||
| parsed["is_error"] = True | ||
|
|
||
| message = event.get("message") | ||
| if isinstance(message, dict) and message.get("role") == "assistant": | ||
| if _pi_message_is_error(message): | ||
| parsed["is_error"] = True | ||
| usage = _usage_from_pi_message(message) | ||
| if usage is not None: | ||
| parsed.update(usage) | ||
| text = _content_text(message) | ||
| if text: | ||
| parsed["result"] = text | ||
|
|
||
| messages = event.get("messages") | ||
| if isinstance(messages, list): | ||
| for candidate in messages: | ||
| if not isinstance(candidate, dict) or candidate.get("role") != "assistant": | ||
| continue | ||
| if _pi_message_is_error(candidate): | ||
| parsed["is_error"] = True | ||
| usage = _usage_from_pi_message(candidate) | ||
| if usage is not None: | ||
| parsed.update(usage) | ||
| text = _content_text(candidate) | ||
| if text: | ||
| parsed["result"] = text | ||
|
|
||
| if int(parsed.get("num_turns", 0)) < 1: | ||
| parsed["num_turns"] = 1 | ||
| return parsed | ||
|
|
||
|
|
||
| class SubprocessBackend: | ||
| """Shared driver for real coding-agent harnesses run as subprocesses.""" | ||
|
|
||
|
|
@@ -579,7 +700,9 @@ def _parse_harness_output(self, stdout: str) -> dict[str, Any]: | |
| try: | ||
| decoded = json.loads(stdout) | ||
| except (json.JSONDecodeError, ValueError): | ||
| return {} | ||
| return _parse_pi_json_events(stdout) | ||
| if isinstance(decoded, dict) and _is_pi_json_event(decoded): | ||
| return _parse_pi_json_events(stdout) | ||
| return decoded if isinstance(decoded, dict) else {} | ||
|
Comment on lines
700
to
706
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
When Useful? React with 👍 / 👎.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in commit |
||
|
|
||
| def execute( | ||
|
|
@@ -702,6 +825,24 @@ def _result( | |
| cost=cost, | ||
| error=f"harness exited with code {returncode}", | ||
| ) | ||
| if ( | ||
| constraints.max_cost_usd is not None | ||
| and cost.reported | ||
| and cost.usd is not None | ||
| and cost.usd > constraints.max_cost_usd | ||
| ): | ||
| return _result( | ||
| "budget_exhausted", | ||
| diff=diff, | ||
| changed=tuple(changed), | ||
| transcript=transcript_path, | ||
| iterations=iterations, | ||
| cost=cost, | ||
| notes=( | ||
| f"reported harness cost ${cost.usd:.4f} exceeded " | ||
| f"run budget ${constraints.max_cost_usd:.4f}; partial work kept for inspection" | ||
| ), | ||
| ) | ||
| return _result( | ||
| "completed", | ||
| diff=diff, | ||
|
|
@@ -716,13 +857,14 @@ def _result( | |
| class PiBackend(SubprocessBackend): | ||
| """Non-interactive ``pi`` invocation in the worktree. | ||
|
|
||
| The default argv mirrors the ``claude -p`` convention; override the | ||
| command per-deployment through the ``backends.definitions`` section of | ||
| ``model-policy.yml`` if the local ``pi`` build differs. | ||
| ``--mode json`` emits newline-delimited session events; the shared parser | ||
| maps those events back into the cost/usage fields the loop ledger expects. | ||
| Override the command per-deployment through ``model-policy.yml`` only when | ||
| the local ``pi`` build differs. | ||
| """ | ||
|
|
||
| name = "pi" | ||
| default_command = ("pi", "--print", "{prompt}") | ||
| default_command = ("pi", "--print", "--mode", "json", "{prompt}") | ||
| extra_env_names = PI_PROVIDER_ENV_NAMES | ||
|
|
||
|
|
||
|
|
@@ -815,13 +957,17 @@ def task_spec_from_state(state: GraphState) -> TaskSpec: | |
| if tail: | ||
| journal_parts.append(tail) | ||
|
|
||
| gate_commands: list[list[str]] = list(state.get("gate_commands", [])) | ||
| for commands in state.get("gate_commands_by_repo", {}).values(): | ||
| gate_commands.extend(commands) | ||
|
|
||
| return TaskSpec( | ||
| change_id=state["change_id"], | ||
| change_class=str(state["change_class"]), | ||
| risk_level=str(state["risk_level"]), | ||
| request=state.get("feature_request", ""), | ||
| allowed_paths=allowed, | ||
| gate_commands=tuple(tuple(command) for command in state.get("gate_commands", [])), | ||
| gate_commands=tuple(tuple(command) for command in gate_commands), | ||
| transcript_dir=state.get("handoff_output_dir") or os.environ.get("HYRULE_HANDOFF_DIR"), | ||
| intent=str(spec.get("intent", "")), | ||
| acceptance_criteria=tuple(criteria), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,6 +77,22 @@ | |
| } | ||
| HIGH_RISK_LABELS = frozenset({"critical", "security"}) | ||
|
|
||
# Issue labels a human applies during triage to mark feature-class work as too
# large for the low-and-slow timer default.  Each entry raises only the per-run
# cap; the daily run/cost caps still apply.
ISSUE_BUDGET_LABELS: dict[str, dict[str, float | int]] = {
    "loop:budget-large": {"max_iterations": 40, "max_wall_clock_minutes": 90, "max_cost_usd": 7.5},
    "loop:budget-xl": {"max_iterations": 60, "max_wall_clock_minutes": 120, "max_cost_usd": 10.0},
}
|
|
||
|
|
||
| class DaemonError(RuntimeError): | ||
| """Raised when a daemon cycle cannot run at all.""" | ||
|
|
@@ -329,6 +345,32 @@ def repo_name_for_issue(item: IntakeItem) -> str: | |
| return REPO_CHECKOUT_NAMES.get(short, short) | ||
|
|
||
|
|
||
| def backend_budget_for_issue( | ||
| item: IntakeItem, | ||
| config: DaemonConfig, | ||
| *, | ||
| remaining_cost_usd: float | None = None, | ||
| ) -> dict[str, float | int]: | ||
| """Resolve per-run backend budget, optionally raised by issue label.""" | ||
| budget: dict[str, float | int] = { | ||
| "max_iterations": config.max_iterations_per_run, | ||
| "max_wall_clock_minutes": config.max_wall_clock_minutes_per_run, | ||
| "max_cost_usd": config.max_cost_usd_per_run, | ||
| } | ||
| normalized_labels = {label.lower() for label in item.labels} | ||
| for label, override in ISSUE_BUDGET_LABELS.items(): | ||
| if label not in normalized_labels: | ||
| continue | ||
| budget["max_iterations"] = max(int(budget["max_iterations"]), int(override["max_iterations"])) | ||
| budget["max_wall_clock_minutes"] = max( | ||
| int(budget["max_wall_clock_minutes"]), int(override["max_wall_clock_minutes"]) | ||
| ) | ||
| budget["max_cost_usd"] = max(float(budget["max_cost_usd"]), float(override["max_cost_usd"])) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Fresh evidence beyond the earlier clamp thread: when Useful? React with 👍 / 👎.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Addressed in the PR branch: reported harness cost is now enforced against |
||
| if remaining_cost_usd is not None: | ||
| budget["max_cost_usd"] = max(0.0, min(float(budget["max_cost_usd"]), remaining_cost_usd)) | ||
| return budget | ||
|
|
||
|
|
||
| def _issue_body(item: IntakeItem, *, client: GhClient) -> str: | ||
| raw = client.run( | ||
| ["issue", "view", str(item.number), "--repo", item.repo, "--json", "body"] | ||
|
|
@@ -416,6 +458,16 @@ def daemon_once( | |
| item = queue[0] | ||
| change_class, risk = classify_issue(item) | ||
| change_id = _change_id_for(item) | ||
| remaining_cost_usd = config.max_cost_usd_per_day - float(ledger.get("cost_usd", 0.0)) | ||
| if remaining_cost_usd <= 0: | ||
| return _finish( | ||
| DaemonReport( | ||
| outcome="over_budget", | ||
| detail=f"daily cost budget reached (${config.max_cost_usd_per_day:.2f})", | ||
| ), | ||
| discord_poster, | ||
| icinga_poster, | ||
| ) | ||
| body = _issue_body(item, client=client) | ||
|
|
||
| output_root = config.output_root.expanduser().resolve() / change_id.lower() | ||
|
|
@@ -432,6 +484,11 @@ def daemon_once( | |
| runner = feature_runner or run_feature_intake | ||
| repo_name = repo_name_for_issue(item) | ||
| effective_allowed_paths = list(config.allowed_paths_by_repo.get(repo_name, config.allowed_paths)) | ||
| effective_backend_budget = backend_budget_for_issue( | ||
| item, | ||
| config, | ||
| remaining_cost_usd=remaining_cost_usd, | ||
| ) | ||
| result = runner( | ||
| change_id=change_id, | ||
| change_class=change_class, | ||
|
|
@@ -442,11 +499,7 @@ def daemon_once( | |
| allowed_paths=effective_allowed_paths, | ||
| source_files=["README.md"], | ||
| memory_dir=config.memory_dir, | ||
| backend_budget={ | ||
| "max_iterations": config.max_iterations_per_run, | ||
| "max_wall_clock_minutes": config.max_wall_clock_minutes_per_run, | ||
| "max_cost_usd": config.max_cost_usd_per_run, | ||
| }, | ||
| backend_budget=effective_backend_budget, | ||
| knowledge_context=config.knowledge_context, | ||
| knowledge_learning_dir=config.knowledge_learning_dir, | ||
| ) | ||
|
|
@@ -462,7 +515,11 @@ def daemon_once( | |
| journal_path=(final_state.get("reflection_results") or {}).get("journal_path"), | ||
| ) | ||
|
|
||
| if result.get("signoff_status") == "ready_for_review" and final_state.get( | ||
| run_cost_budget = float(effective_backend_budget.get("max_cost_usd", 0.0)) | ||
| cost_budget_exceeded = run_cost_budget > 0 and cost > run_cost_budget | ||
| if cost_budget_exceeded: | ||
| report.detail = f"reported run cost ${cost:.4f} exceeded budget ${run_cost_budget:.4f}" | ||
| elif result.get("signoff_status") == "ready_for_review" and final_state.get( | ||
| "promotion_results" | ||
| ): | ||
| # The human pre-authorized this work by applying loop:approved; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When Pi encodes a provider/runtime failure on an assistant message, e.g. `stopReason: "error"` or `"aborted"` with `errorMessage` (as its message types document), this branch still treats the message like a normal assistant response and never sets `is_error`. If that JSON-mode run exits 0, `execute()` reports a completed backend run instead of a failure, so the loop can proceed past a failed Pi invocation; check the assistant `stopReason`/`errorMessage` while normalizing these events.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in the PR branch: Pi assistant messages with error-like `stopReason` values (`error`, `aborted`, canceled variants) or `errorMessage`/`error` now normalize to `is_error=True`. Added `test_pi_backend_treats_assistant_error_stop_reason_as_error`, preserving usage/cost extraction while making zero-exit provider/runtime failures fail the backend run.