Skip to content
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ The daemon's default production scope is the eight core repos:
`engineering-loop`, `network-operations`, `hyrule-cloud`, `hyrule-web`,
`hyrule-mcp`, `noc-agent`, `hyrule-network-proxy`, and `as215932.net`. It runs low-and-slow by
default: at most 2 runs/day, $10/day, and docs-only mutation boundaries unless
a later reviewed PR widens them.
a later reviewed PR widens them. Feature-sized approved issues can opt into a
larger per-run cap with `loop:budget-large` or `loop:budget-xl`; daily caps
still apply.

The dedicated `loop` VM sets `HYRULE_MODEL_POLICY_FILE` to
`configs/loop/model-policy.production.yml` after the operator completes Pi auth;
Expand Down
2 changes: 2 additions & 0 deletions configs/loop/model-policy.production.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,6 @@ backends:
command:
- pi
- --print
- --mode
- json
- "{prompt}"
160 changes: 153 additions & 7 deletions src/hyrule_engineering_loop/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,12 @@ def assemble_backend_prompt(task_spec: TaskSpec, constraints: BackendConstraints
"- Touch only paths under the allowed prefixes below; anything else fails policy.",
"- No secret material, credentials, or environment-specific tokens in any file.",
f"- Budget: {constraints.max_iterations} iterations, "
f"{int(constraints.max_wall_clock_seconds)}s wall clock.",
f"{int(constraints.max_wall_clock_seconds)}s wall clock"
+ (
f", ${constraints.max_cost_usd:.2f} reported harness cost."
if constraints.max_cost_usd is not None
else "."
),
])
for repo, prefixes in sorted(task_spec.allowed_paths.items()):
lines.append(f"- Allowed paths ({repo}): {', '.join(prefixes) or 'none configured'}")
Expand Down Expand Up @@ -548,6 +553,122 @@ def execute(
)


def _content_text(message: Mapping[str, Any]) -> str:
content = message.get("content")
if isinstance(content, str):
return content
if not isinstance(content, list):
return ""
parts: list[str] = []
for item in content:
if isinstance(item, dict) and isinstance(item.get("text"), str):
parts.append(str(item["text"]))
return "".join(parts)


def _usage_value(raw_usage: Mapping[str, Any], primary: str, fallback: str) -> Any:
value = raw_usage.get(primary)
return raw_usage.get(fallback) if value is None else value


def _usage_from_pi_message(message: Mapping[str, Any]) -> dict[str, Any] | None:
raw_usage = message.get("usage")
if not isinstance(raw_usage, dict):
return None
raw_cost = raw_usage.get("cost")
cost = raw_cost if isinstance(raw_cost, dict) else {}
return {
"usage": {
"input_tokens": _usage_value(raw_usage, "input", "input_tokens"),
"output_tokens": _usage_value(raw_usage, "output", "output_tokens"),
},
"total_cost_usd": cost.get("total") if isinstance(cost, dict) else None,
}


def _pi_message_is_error(message: Mapping[str, Any]) -> bool:
stop_reason = str(message.get("stopReason", "")).lower()
return bool(
stop_reason in {"abort", "aborted", "cancel", "cancelled", "canceled", "error"}
or message.get("errorMessage")
or message.get("error")
)


PI_JSON_EVENT_TYPES = frozenset(
{
"session",
"agent_start",
"turn_start",
"message_start",
"message_update",
"message_end",
"turn_end",
"agent_end",
"error",
"agent_error",
}
)


def _is_pi_json_event(value: Mapping[str, Any]) -> bool:
return value.get("type") in PI_JSON_EVENT_TYPES


def _parse_pi_json_events(stdout: str) -> dict[str, Any]:
"""Map pi ``--mode json`` NDJSON events into the generic harness schema."""
events: list[dict[str, Any]] = []
for line in stdout.splitlines():
line = line.strip()
if not line:
continue
try:
decoded = json.loads(line)
except (json.JSONDecodeError, ValueError):
continue
if isinstance(decoded, dict):
events.append(decoded)
if not events:
return {}

parsed: dict[str, Any] = {"num_turns": 0, "is_error": False}
for event in events:
event_type = event.get("type")
if event_type == "turn_end":
parsed["num_turns"] = int(parsed.get("num_turns", 0)) + 1
if event_type in {"error", "agent_error"} or event.get("error"):
parsed["is_error"] = True

message = event.get("message")
if isinstance(message, dict) and message.get("role") == "assistant":
if _pi_message_is_error(message):
parsed["is_error"] = True
usage = _usage_from_pi_message(message)
if usage is not None:
parsed.update(usage)
text = _content_text(message)
Comment on lines +643 to +649

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Honor Pi message stopReason errors

When Pi encodes a provider/runtime failure on an assistant message, e.g. stopReason: "error" or "aborted" with errorMessage (as its message types document), this branch still treats the message like a normal assistant response and never sets is_error. If that JSON-mode run exits 0, execute() reports a completed backend run instead of a failure, so the loop can proceed past a failed Pi invocation; check assistant stopReason/errorMessage while normalizing these events.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in the PR branch: Pi assistant messages with error-like stopReason values (error, aborted, canceled variants) or errorMessage/error now normalize to is_error=True. Added test_pi_backend_treats_assistant_error_stop_reason_as_error, preserving usage/cost extraction while making zero-exit provider/runtime failures fail the backend run.

if text:
parsed["result"] = text

messages = event.get("messages")
if isinstance(messages, list):
for candidate in messages:
if not isinstance(candidate, dict) or candidate.get("role") != "assistant":
continue
if _pi_message_is_error(candidate):
parsed["is_error"] = True
usage = _usage_from_pi_message(candidate)
if usage is not None:
parsed.update(usage)
text = _content_text(candidate)
if text:
parsed["result"] = text

if int(parsed.get("num_turns", 0)) < 1:
parsed["num_turns"] = 1
return parsed


class SubprocessBackend:
"""Shared driver for real coding-agent harnesses run as subprocesses."""

Expand Down Expand Up @@ -579,7 +700,9 @@ def _parse_harness_output(self, stdout: str) -> dict[str, Any]:
try:
decoded = json.loads(stdout)
except (json.JSONDecodeError, ValueError):
return {}
return _parse_pi_json_events(stdout)
if isinstance(decoded, dict) and _is_pi_json_event(decoded):
return _parse_pi_json_events(stdout)
return decoded if isinstance(decoded, dict) else {}
Comment on lines 700 to 706

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Normalize single-event Pi JSON before returning

When pi --mode json emits only one JSON event, such as an immediate agent_error before any turns, json.loads(stdout) succeeds and the new Pi event mapper is skipped. That single event is returned without the normalized is_error, usage, or total_cost_usd fields, so a zero-exit error event can be treated as a completed backend run and the daemon ledger records no reported cost. Detect Pi event-shaped dicts here and pass them through the same mapper before returning generic harness JSON.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in commit a0ab44f: single Pi JSON events whose type is a Pi event are now routed through the same Pi event normalizer instead of being returned raw. Added test_pi_backend_parses_single_json_error_event.


def execute(
Expand Down Expand Up @@ -702,6 +825,24 @@ def _result(
cost=cost,
error=f"harness exited with code {returncode}",
)
if (
constraints.max_cost_usd is not None
and cost.reported
and cost.usd is not None
and cost.usd > constraints.max_cost_usd
):
return _result(
"budget_exhausted",
diff=diff,
changed=tuple(changed),
transcript=transcript_path,
iterations=iterations,
cost=cost,
notes=(
f"reported harness cost ${cost.usd:.4f} exceeded "
f"run budget ${constraints.max_cost_usd:.4f}; partial work kept for inspection"
),
)
return _result(
"completed",
diff=diff,
Expand All @@ -716,13 +857,14 @@ def _result(
class PiBackend(SubprocessBackend):
"""Non-interactive ``pi`` invocation in the worktree.

The default argv mirrors the ``claude -p`` convention; override the
command per-deployment through the ``backends.definitions`` section of
``model-policy.yml`` if the local ``pi`` build differs.
``--mode json`` emits newline-delimited session events; the shared parser
maps those events back into the cost/usage fields the loop ledger expects.
Override the command per-deployment through ``model-policy.yml`` only when
the local ``pi`` build differs.
"""

name = "pi"
default_command = ("pi", "--print", "{prompt}")
default_command = ("pi", "--print", "--mode", "json", "{prompt}")
extra_env_names = PI_PROVIDER_ENV_NAMES


Expand Down Expand Up @@ -815,13 +957,17 @@ def task_spec_from_state(state: GraphState) -> TaskSpec:
if tail:
journal_parts.append(tail)

gate_commands: list[list[str]] = list(state.get("gate_commands", []))
for commands in state.get("gate_commands_by_repo", {}).values():
gate_commands.extend(commands)

return TaskSpec(
change_id=state["change_id"],
change_class=str(state["change_class"]),
risk_level=str(state["risk_level"]),
request=state.get("feature_request", ""),
allowed_paths=allowed,
gate_commands=tuple(tuple(command) for command in state.get("gate_commands", [])),
gate_commands=tuple(tuple(command) for command in gate_commands),
transcript_dir=state.get("handoff_output_dir") or os.environ.get("HYRULE_HANDOFF_DIR"),
intent=str(spec.get("intent", "")),
acceptance_criteria=tuple(criteria),
Expand Down
69 changes: 63 additions & 6 deletions src/hyrule_engineering_loop/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,22 @@
}
HIGH_RISK_LABELS = frozenset({"critical", "security"})

ISSUE_BUDGET_LABELS: dict[str, dict[str, float | int]] = {
# Explicit human triage signal for feature-class work that is too large for
# the low-and-slow timer default. These raise only the per-run cap; daily
# run/cost caps still apply.
"loop:budget-large": {
"max_iterations": 40,
"max_wall_clock_minutes": 90,
"max_cost_usd": 7.5,
},
"loop:budget-xl": {
"max_iterations": 60,
"max_wall_clock_minutes": 120,
"max_cost_usd": 10.0,
},
}


class DaemonError(RuntimeError):
"""Raised when a daemon cycle cannot run at all."""
Expand Down Expand Up @@ -329,6 +345,32 @@ def repo_name_for_issue(item: IntakeItem) -> str:
return REPO_CHECKOUT_NAMES.get(short, short)


def backend_budget_for_issue(
item: IntakeItem,
config: DaemonConfig,
*,
remaining_cost_usd: float | None = None,
) -> dict[str, float | int]:
"""Resolve per-run backend budget, optionally raised by issue label."""
budget: dict[str, float | int] = {
"max_iterations": config.max_iterations_per_run,
"max_wall_clock_minutes": config.max_wall_clock_minutes_per_run,
"max_cost_usd": config.max_cost_usd_per_run,
}
normalized_labels = {label.lower() for label in item.labels}
for label, override in ISSUE_BUDGET_LABELS.items():
if label not in normalized_labels:
continue
budget["max_iterations"] = max(int(budget["max_iterations"]), int(override["max_iterations"]))
budget["max_wall_clock_minutes"] = max(
int(budget["max_wall_clock_minutes"]), int(override["max_wall_clock_minutes"])
)
budget["max_cost_usd"] = max(float(budget["max_cost_usd"]), float(override["max_cost_usd"]))

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Enforce the raised cost budget before trusting labels

Fresh evidence beyond the earlier clamp thread: when loop:budget-large/xl reaches this assignment, the only downstream use is copying max_cost_usd into BackendConstraints; assemble_backend_prompt does not mention the dollar cap and SubprocessBackend.execute never compares reported Pi cost to it. The new label therefore neither enforces nor meaningfully raises a run cost cap, and the daemon can still add an over-cap cost to the daily ledger after the run.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in the PR branch: reported harness cost is now enforced against BackendConstraints.max_cost_usd; an over-budget completed harness run becomes budget_exhausted. The daemon also blocks draft PR publication if the final reported run cost exceeds the effective per-run budget. Added backend and daemon regression coverage.

if remaining_cost_usd is not None:
budget["max_cost_usd"] = max(0.0, min(float(budget["max_cost_usd"]), remaining_cost_usd))
return budget


def _issue_body(item: IntakeItem, *, client: GhClient) -> str:
raw = client.run(
["issue", "view", str(item.number), "--repo", item.repo, "--json", "body"]
Expand Down Expand Up @@ -416,6 +458,16 @@ def daemon_once(
item = queue[0]
change_class, risk = classify_issue(item)
change_id = _change_id_for(item)
remaining_cost_usd = config.max_cost_usd_per_day - float(ledger.get("cost_usd", 0.0))
if remaining_cost_usd <= 0:
return _finish(
DaemonReport(
outcome="over_budget",
detail=f"daily cost budget reached (${config.max_cost_usd_per_day:.2f})",
),
discord_poster,
icinga_poster,
)
body = _issue_body(item, client=client)

output_root = config.output_root.expanduser().resolve() / change_id.lower()
Expand All @@ -432,6 +484,11 @@ def daemon_once(
runner = feature_runner or run_feature_intake
repo_name = repo_name_for_issue(item)
effective_allowed_paths = list(config.allowed_paths_by_repo.get(repo_name, config.allowed_paths))
effective_backend_budget = backend_budget_for_issue(
item,
config,
remaining_cost_usd=remaining_cost_usd,
)
result = runner(
change_id=change_id,
change_class=change_class,
Expand All @@ -442,11 +499,7 @@ def daemon_once(
allowed_paths=effective_allowed_paths,
source_files=["README.md"],
memory_dir=config.memory_dir,
backend_budget={
"max_iterations": config.max_iterations_per_run,
"max_wall_clock_minutes": config.max_wall_clock_minutes_per_run,
"max_cost_usd": config.max_cost_usd_per_run,
},
backend_budget=effective_backend_budget,
knowledge_context=config.knowledge_context,
knowledge_learning_dir=config.knowledge_learning_dir,
)
Expand All @@ -462,7 +515,11 @@ def daemon_once(
journal_path=(final_state.get("reflection_results") or {}).get("journal_path"),
)

if result.get("signoff_status") == "ready_for_review" and final_state.get(
run_cost_budget = float(effective_backend_budget.get("max_cost_usd", 0.0))
cost_budget_exceeded = run_cost_budget > 0 and cost > run_cost_budget
if cost_budget_exceeded:
report.detail = f"reported run cost ${cost:.4f} exceeded budget ${run_cost_budget:.4f}"
elif result.get("signoff_status") == "ready_for_review" and final_state.get(
"promotion_results"
):
# The human pre-authorized this work by applying loop:approved;
Expand Down
Loading