diff --git a/README.md b/README.md index 7909ee1..13ea1e3 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,9 @@ The daemon's default production scope is the eight core repos: `engineering-loop`, `network-operations`, `hyrule-cloud`, `hyrule-web`, `hyrule-mcp`, `noc-agent`, `hyrule-network-proxy`, and `as215932.net`. It runs low-and-slow by default: at most 2 runs/day, $10/day, and docs-only mutation boundaries unless -a later reviewed PR widens them. +a later reviewed PR widens them. Feature-sized approved issues can opt into a +larger per-run cap with `loop:budget-large` or `loop:budget-xl`; daily caps +still apply. The dedicated `loop` VM sets `HYRULE_MODEL_POLICY_FILE` to `configs/loop/model-policy.production.yml` after the operator completes Pi auth; diff --git a/configs/loop/model-policy.production.yml b/configs/loop/model-policy.production.yml index c16839f..b61e75d 100644 --- a/configs/loop/model-policy.production.yml +++ b/configs/loop/model-policy.production.yml @@ -68,4 +68,6 @@ backends: command: - pi - --print + - --mode + - json - "{prompt}" diff --git a/src/hyrule_engineering_loop/backend.py b/src/hyrule_engineering_loop/backend.py index ba44409..7e5e0aa 100644 --- a/src/hyrule_engineering_loop/backend.py +++ b/src/hyrule_engineering_loop/backend.py @@ -276,7 +276,12 @@ def assemble_backend_prompt(task_spec: TaskSpec, constraints: BackendConstraints "- Touch only paths under the allowed prefixes below; anything else fails policy.", "- No secret material, credentials, or environment-specific tokens in any file.", f"- Budget: {constraints.max_iterations} iterations, " - f"{int(constraints.max_wall_clock_seconds)}s wall clock.", + f"{int(constraints.max_wall_clock_seconds)}s wall clock" + + ( + f", ${constraints.max_cost_usd:.2f} reported harness cost." + if constraints.max_cost_usd is not None + else "." + ), ]) for repo, prefixes in sorted(task_spec.allowed_paths.items()): lines.append(f"- Allowed paths ({repo}): {', '.join(prefixes) or 'none configured'}") @@ -548,6 +553,122 @@ def execute( ) +def _content_text(message: Mapping[str, Any]) -> str: + content = message.get("content") + if isinstance(content, str): + return content + if not isinstance(content, list): + return "" + parts: list[str] = [] + for item in content: + if isinstance(item, dict) and isinstance(item.get("text"), str): + parts.append(str(item["text"])) + return "".join(parts) + + +def _usage_value(raw_usage: Mapping[str, Any], primary: str, fallback: str) -> Any: + value = raw_usage.get(primary) + return raw_usage.get(fallback) if value is None else value + + +def _usage_from_pi_message(message: Mapping[str, Any]) -> dict[str, Any] | None: + raw_usage = message.get("usage") + if not isinstance(raw_usage, dict): + return None + raw_cost = raw_usage.get("cost") + cost = raw_cost if isinstance(raw_cost, dict) else {} + return { + "usage": { + "input_tokens": _usage_value(raw_usage, "input", "input_tokens"), + "output_tokens": _usage_value(raw_usage, "output", "output_tokens"), + }, + "total_cost_usd": cost.get("total") if isinstance(cost, dict) else None, + } + + +def _pi_message_is_error(message: Mapping[str, Any]) -> bool: + stop_reason = str(message.get("stopReason", "")).lower() + return bool( + stop_reason in {"abort", "aborted", "cancel", "cancelled", "canceled", "error"} + or message.get("errorMessage") + or message.get("error") + ) + + +PI_JSON_EVENT_TYPES = frozenset( + { + "session", + "agent_start", + "turn_start", + "message_start", + "message_update", + "message_end", + "turn_end", + "agent_end", + "error", + "agent_error", + } +) + + +def _is_pi_json_event(value: Mapping[str, Any]) -> bool: + return value.get("type") in PI_JSON_EVENT_TYPES + + +def _parse_pi_json_events(stdout: str) -> dict[str, Any]: + """Map pi ``--mode json`` NDJSON events into the generic harness schema.""" + events: list[dict[str, Any]] = [] + for line in stdout.splitlines(): + line = line.strip() + if not line: + continue + try: + decoded = json.loads(line) + except (json.JSONDecodeError, ValueError): + continue + if isinstance(decoded, dict): + events.append(decoded) + if not events: + return {} + + parsed: dict[str, Any] = {"num_turns": 0, "is_error": False} + for event in events: + event_type = event.get("type") + if event_type == "turn_end": + parsed["num_turns"] = int(parsed.get("num_turns", 0)) + 1 + if event_type in {"error", "agent_error"} or event.get("error"): + parsed["is_error"] = True + + message = event.get("message") + if isinstance(message, dict) and message.get("role") == "assistant": + if _pi_message_is_error(message): + parsed["is_error"] = True + usage = _usage_from_pi_message(message) + if usage is not None: + parsed.update(usage) + text = _content_text(message) + if text: + parsed["result"] = text + + messages = event.get("messages") + if isinstance(messages, list): + for candidate in messages: + if not isinstance(candidate, dict) or candidate.get("role") != "assistant": + continue + if _pi_message_is_error(candidate): + parsed["is_error"] = True + usage = _usage_from_pi_message(candidate) + if usage is not None: + parsed.update(usage) + text = _content_text(candidate) + if text: + parsed["result"] = text + + if int(parsed.get("num_turns", 0)) < 1: + parsed["num_turns"] = 1 + return parsed + + class SubprocessBackend: """Shared driver for real coding-agent harnesses run as subprocesses.""" @@ -579,7 +700,9 @@ def _parse_harness_output(self, stdout: str) -> dict[str, Any]: try: decoded = json.loads(stdout) except (json.JSONDecodeError, ValueError): - return {} + return _parse_pi_json_events(stdout) + if isinstance(decoded, dict) and _is_pi_json_event(decoded): + return _parse_pi_json_events(stdout) return decoded if isinstance(decoded, dict) else {} def execute( @@ -702,6 +825,24 @@ def _result( cost=cost, error=f"harness exited with code {returncode}", ) + if ( + constraints.max_cost_usd is not None + and cost.reported + and cost.usd is not None + and cost.usd > constraints.max_cost_usd + ): + return _result( + "budget_exhausted", + diff=diff, + changed=tuple(changed), + transcript=transcript_path, + iterations=iterations, + cost=cost, + notes=( + f"reported harness cost ${cost.usd:.4f} exceeded " + f"run budget ${constraints.max_cost_usd:.4f}; partial work kept for inspection" + ), + ) return _result( "completed", diff=diff, @@ -716,13 +857,14 @@ def _result( class PiBackend(SubprocessBackend): """Non-interactive ``pi`` invocation in the worktree. - The default argv mirrors the ``claude -p`` convention; override the - command per-deployment through the ``backends.definitions`` section of - ``model-policy.yml`` if the local ``pi`` build differs. + ``--mode json`` emits newline-delimited session events; the shared parser + maps those events back into the cost/usage fields the loop ledger expects. + Override the command per-deployment through ``model-policy.yml`` only when + the local ``pi`` build differs. """ name = "pi" - default_command = ("pi", "--print", "{prompt}") + default_command = ("pi", "--print", "--mode", "json", "{prompt}") extra_env_names = PI_PROVIDER_ENV_NAMES @@ -815,13 +957,17 @@ def task_spec_from_state(state: GraphState) -> TaskSpec: if tail: journal_parts.append(tail) + gate_commands: list[list[str]] = list(state.get("gate_commands", [])) + for commands in state.get("gate_commands_by_repo", {}).values(): + gate_commands.extend(commands) + return TaskSpec( change_id=state["change_id"], change_class=str(state["change_class"]), risk_level=str(state["risk_level"]), request=state.get("feature_request", ""), allowed_paths=allowed, - gate_commands=tuple(tuple(command) for command in state.get("gate_commands", [])), + gate_commands=tuple(tuple(command) for command in gate_commands), transcript_dir=state.get("handoff_output_dir") or os.environ.get("HYRULE_HANDOFF_DIR"), intent=str(spec.get("intent", "")), acceptance_criteria=tuple(criteria), diff --git a/src/hyrule_engineering_loop/daemon.py b/src/hyrule_engineering_loop/daemon.py index 46e96c5..aaec321 100644 --- a/src/hyrule_engineering_loop/daemon.py +++ b/src/hyrule_engineering_loop/daemon.py @@ -77,6 +77,22 @@ } HIGH_RISK_LABELS = frozenset({"critical", "security"}) +ISSUE_BUDGET_LABELS: dict[str, dict[str, float | int]] = { + # Explicit human triage signal for feature-class work that is too large for + # the low-and-slow timer default. These raise only the per-run cap; daily + # run/cost caps still apply. + "loop:budget-large": { + "max_iterations": 40, + "max_wall_clock_minutes": 90, + "max_cost_usd": 7.5, + }, + "loop:budget-xl": { + "max_iterations": 60, + "max_wall_clock_minutes": 120, + "max_cost_usd": 10.0, + }, +} + class DaemonError(RuntimeError): """Raised when a daemon cycle cannot run at all.""" @@ -329,6 +345,32 @@ def repo_name_for_issue(item: IntakeItem) -> str: return REPO_CHECKOUT_NAMES.get(short, short) +def backend_budget_for_issue( + item: IntakeItem, + config: DaemonConfig, + *, + remaining_cost_usd: float | None = None, +) -> dict[str, float | int]: + """Resolve per-run backend budget, optionally raised by issue label.""" + budget: dict[str, float | int] = { + "max_iterations": config.max_iterations_per_run, + "max_wall_clock_minutes": config.max_wall_clock_minutes_per_run, + "max_cost_usd": config.max_cost_usd_per_run, + } + normalized_labels = {label.lower() for label in item.labels} + for label, override in ISSUE_BUDGET_LABELS.items(): + if label not in normalized_labels: + continue + budget["max_iterations"] = max(int(budget["max_iterations"]), int(override["max_iterations"])) + budget["max_wall_clock_minutes"] = max( + int(budget["max_wall_clock_minutes"]), int(override["max_wall_clock_minutes"]) + ) + budget["max_cost_usd"] = max(float(budget["max_cost_usd"]), float(override["max_cost_usd"])) + if remaining_cost_usd is not None: + budget["max_cost_usd"] = max(0.0, min(float(budget["max_cost_usd"]), remaining_cost_usd)) + return budget + + def _issue_body(item: IntakeItem, *, client: GhClient) -> str: raw = client.run( ["issue", "view", str(item.number), "--repo", item.repo, "--json", "body"] @@ -416,6 +458,16 @@ def daemon_once( item = queue[0] change_class, risk = classify_issue(item) change_id = _change_id_for(item) + remaining_cost_usd = config.max_cost_usd_per_day - float(ledger.get("cost_usd", 0.0)) + if remaining_cost_usd <= 0: + return _finish( + DaemonReport( + outcome="over_budget", + detail=f"daily cost budget reached (${config.max_cost_usd_per_day:.2f})", + ), + discord_poster, + icinga_poster, + ) body = _issue_body(item, client=client) output_root = config.output_root.expanduser().resolve() / change_id.lower() @@ -432,6 +484,11 @@ def daemon_once( runner = feature_runner or run_feature_intake repo_name = repo_name_for_issue(item) effective_allowed_paths = list(config.allowed_paths_by_repo.get(repo_name, config.allowed_paths)) + effective_backend_budget = backend_budget_for_issue( + item, + config, + remaining_cost_usd=remaining_cost_usd, + ) result = runner( change_id=change_id, change_class=change_class, @@ -442,11 +499,7 @@ def daemon_once( allowed_paths=effective_allowed_paths, source_files=["README.md"], memory_dir=config.memory_dir, - backend_budget={ - "max_iterations": config.max_iterations_per_run, - "max_wall_clock_minutes": config.max_wall_clock_minutes_per_run, - "max_cost_usd": config.max_cost_usd_per_run, - }, + backend_budget=effective_backend_budget, knowledge_context=config.knowledge_context, knowledge_learning_dir=config.knowledge_learning_dir, ) @@ -462,7 +515,11 @@ def daemon_once( journal_path=(final_state.get("reflection_results") or {}).get("journal_path"), ) - if result.get("signoff_status") == "ready_for_review" and final_state.get( + run_cost_budget = float(effective_backend_budget.get("max_cost_usd", 0.0)) + cost_budget_exceeded = run_cost_budget > 0 and cost > run_cost_budget + if cost_budget_exceeded: + report.detail = f"reported run cost ${cost:.4f} exceeded budget ${run_cost_budget:.4f}" + elif result.get("signoff_status") == "ready_for_review" and final_state.get( "promotion_results" ): # The human pre-authorized this work by applying loop:approved; diff --git a/src/hyrule_engineering_loop/gate_runner.py b/src/hyrule_engineering_loop/gate_runner.py index b3c324b..53a9408 100644 --- a/src/hyrule_engineering_loop/gate_runner.py +++ b/src/hyrule_engineering_loop/gate_runner.py @@ -5,10 +5,61 @@ import json import subprocess import sys +import tomllib from pathlib import Path from typing import Any, Iterable, Sequence MAX_OUTPUT_CHARS = 8_000 +PYTHON_GATE_TOOLS = frozenset({"pytest", "ruff", "mypy"}) +UV_DEV_SELECTORS = frozenset({"--group", "--extra", "--only-group", "--only-dev", "--all-groups", "--all-extras"}) +UV_LOCK_GUARDS = frozenset({"--locked", "--frozen"}) +UV_OPTIONS_WITH_VALUE = frozenset( + { + "--allow-insecure-host", + "--cache-dir", + "--color", + "--config-file", + "--config-setting", + "--config-settings-package", + "--default-index", + "--directory", + "--env-file", + "--exclude-newer", + "--exclude-newer-package", + "--extra", + "--extra-index-url", + "--find-links", + "--fork-strategy", + "--group", + "--index", + "--index-strategy", + "--index-url", + "--keyring-provider", + "--link-mode", + "--no-binary-package", + "--no-build-isolation-package", + "--no-build-package", + "--no-editable-package", + "--no-extra", + "--no-group", + "--no-sources-package", + "--only-group", + "--package", + "--prerelease", + "--project", + "--python", + "--python-platform", + "--refresh-package", + "--reinstall-package", + "--resolution", + "--upgrade-group", + "--upgrade-package", + "--with", + "--with-editable", + "--with-requirements", + } +) +UV_SHORT_OPTIONS_WITH_VALUE = frozenset({"-C", "-P", "-f", "-i", "-p", "-w"}) def _clip(text: str) -> str: @@ -25,6 +76,193 @@ def _as_text(value: bytes | str | None) -> str: return value +def _path_name(value: str) -> str: + return Path(value).name + + +def _is_python_executable(value: str) -> bool: + name = _path_name(value) + return name == "python" or name.startswith("python3") + + +def _is_python_gate_payload(argv: Sequence[str]) -> bool: + if not argv: + return False + name = _path_name(argv[0]) + if name in PYTHON_GATE_TOOLS: + return True + return _is_python_executable(name) and len(argv) >= 3 and argv[1] == "-m" and argv[2] in PYTHON_GATE_TOOLS + + +def _uv_run_option_args(argv: Sequence[str]) -> list[str]: + if len(argv) < 2 or _path_name(argv[0]) != "uv" or argv[1] != "run": + return [] + options: list[str] = [] + index = 2 + while index < len(argv) and argv[index].startswith("-"): + option = argv[index] + if option == "--": + break + options.append(option) + if "=" in option: + index += 1 + elif option in UV_OPTIONS_WITH_VALUE or option in UV_SHORT_OPTIONS_WITH_VALUE: + if index + 1 < len(argv): + options.append(argv[index + 1]) + index += 2 + else: + index += 1 + return options + + +def _uv_option_value(options: Sequence[str], name: str) -> list[str]: + values: list[str] = [] + index = 0 + while index < len(options): + option = options[index] + if option == name and index + 1 < len(options): + values.append(options[index + 1]) + index += 2 + continue + prefix = f"{name}=" + if option.startswith(prefix): + values.append(option[len(prefix) :]) + index += 1 + return values + + +def _uv_run_excludes_required_dev_selector(argv: Sequence[str], dev_args: tuple[str, str] | tuple[()]) -> bool: + if not dev_args: + return False + options = _uv_run_option_args(argv) + selector, value = dev_args + if selector == "--group": + return "--no-dev" in options or value in _uv_option_value(options, "--no-group") + if selector == "--extra": + return value in _uv_option_value(options, "--no-extra") + return False + + +def _uv_run_has_required_dev_selector(argv: Sequence[str], dev_args: tuple[str, str] | tuple[()]) -> bool: + if not dev_args: + return True + if _uv_run_excludes_required_dev_selector(argv, dev_args): + return False + options = _uv_run_option_args(argv) + selector, value = dev_args + if selector == "--group": + return ( + "--all-groups" in options + or "--only-dev" in options + or value in _uv_option_value(options, "--group") + or value in _uv_option_value(options, "--only-group") + ) + if selector == "--extra": + return "--all-extras" in options or value in _uv_option_value(options, "--extra") + return False + + +def gate_command_preparation_error(command: Sequence[str], *, cwd: Path | str | None = None) -> str | None: + """Return a fail-closed preparation error before executing a gate.""" + argv = list(command) + if len(argv) < 2 or _path_name(argv[0]) != "uv" or argv[1] != "run": + return None + cwd_path = Path(cwd).expanduser().resolve() if cwd is not None else None + dev_args = _uv_dev_args(cwd_path) + if not dev_args: + return None + if not _is_python_gate_payload(_uv_run_payload(argv[2:])): + return None + if _uv_run_excludes_required_dev_selector(argv, dev_args): + selector, value = dev_args + return f"uv gate excludes required dev dependencies ({selector} {value})" + return None + + +def _uv_run_has_lock_guard(argv: Sequence[str]) -> bool: + return any(arg in UV_LOCK_GUARDS for arg in _uv_run_option_args(argv)) + + +def _with_uv_lock_guard(argv: Sequence[str]) -> list[str]: + rendered = list(argv) + if len(rendered) >= 2 and _path_name(rendered[0]) == "uv" and rendered[1] == "run" and not _uv_run_has_lock_guard(rendered): + return [rendered[0], rendered[1], "--locked", *rendered[2:]] + return rendered + + +def _uv_run_payload(argv_after_run: Sequence[str]) -> list[str]: + index = 0 + argv = list(argv_after_run) + while index < len(argv) and argv[index].startswith("-"): + option = argv[index] + if option == "--": + return argv[index + 1 :] + if option in UV_LOCK_GUARDS: + index += 1 + elif "=" in option: + index += 1 + elif option in UV_OPTIONS_WITH_VALUE or option in UV_SHORT_OPTIONS_WITH_VALUE: + index += 2 + else: + index += 1 + return argv[index:] + + +def _uv_dev_args(cwd: Path | None) -> tuple[str, str] | tuple[()]: + if cwd is None: + return () + pyproject = cwd / "pyproject.toml" + if not pyproject.is_file(): + return () + try: + data = tomllib.loads(pyproject.read_text(encoding="utf-8")) + except (OSError, tomllib.TOMLDecodeError): + return () + + dependency_groups = data.get("dependency-groups") + if isinstance(dependency_groups, dict) and "dev" in dependency_groups: + return ("--group", "dev") + + project = data.get("project") + optional_dependencies = ( + project.get("optional-dependencies") if isinstance(project, dict) else None + ) + if isinstance(optional_dependencies, dict) and "dev" in optional_dependencies: + return ("--extra", "dev") + return () + + +def prepare_gate_command(command: Sequence[str], *, cwd: Path | str | None = None) -> list[str]: + """Return the argv to execute, adding the target repo's dev env when needed. + + Python quality gates in AS215932 repos commonly live in a ``dev`` + dependency group or optional extra. Running ``ruff``/``mypy``/``pytest`` + bare can silently test the loop host instead of the target repo. When the + worktree declares a dev dependency set, execute those gates via ``uv run`` + with the matching selector. + """ + argv = list(command) + cwd_path = Path(cwd).expanduser().resolve() if cwd is not None else None + dev_args = _uv_dev_args(cwd_path) + + name = _path_name(argv[0]) if argv else "" + if name == "uv" and len(argv) >= 2 and argv[1] == "run": + locked = _with_uv_lock_guard(argv) + if not dev_args: + return locked + if _uv_run_has_required_dev_selector(locked, dev_args): + return locked + if _is_python_gate_payload(_uv_run_payload(locked[2:])): + return _with_uv_lock_guard([*argv[:2], *dev_args, *argv[2:]]) + return locked + + if not dev_args: + return argv + if _is_python_gate_payload(argv): + return ["uv", "run", "--locked", *dev_args, *argv] + return argv + + def run_gate_commands( commands: Iterable[Sequence[str]], *, @@ -44,10 +282,35 @@ def run_gate_commands( argv = list(command) if not argv: raise ValueError("gate command cannot be empty") + prepared = prepare_gate_command(argv, cwd=cwd) + preparation_error = gate_command_preparation_error(argv, cwd=cwd) + if preparation_error: + result = { + "command": argv, + "executed_command": prepared, + "returncode": 126, + "status": "failed", + "stdout": "", + "stderr": _clip(preparation_error), + } + results.append(result) + errors.append( + { + "node": "gate_execution", + "domain": "ci", + "message": preparation_error, + "command": argv, + "executed_command": prepared, + "returncode": result["returncode"], + "stdout": "", + "stderr": result["stderr"], + } + ) + continue try: completed = subprocess.run( - argv, + prepared, cwd=cwd, capture_output=True, check=False, @@ -56,39 +319,99 @@ def run_gate_commands( ) result = { "command": argv, + "executed_command": prepared, "returncode": completed.returncode, + "status": "passed" if completed.returncode == 0 else "failed", "stdout": _clip(completed.stdout), "stderr": _clip(completed.stderr), } + except FileNotFoundError as exc: + result = { + "command": argv, + "executed_command": prepared, + "returncode": 127, + "status": "failed", + "stdout": "", + "stderr": _clip(f"command not found: {prepared[0]} ({exc})"), + } + except PermissionError as exc: + result = { + "command": argv, + "executed_command": prepared, + "returncode": 126, + "status": "failed", + "stdout": "", + "stderr": _clip(f"command is not executable: {prepared[0]} ({exc})"), + } + except OSError as exc: + result = { + "command": argv, + "executed_command": prepared, + "returncode": 126, + "status": "failed", + "stdout": "", + "stderr": _clip(f"command could not start: {' '.join(prepared)} ({exc})"), + } except subprocess.TimeoutExpired as exc: result = { "command": argv, + "executed_command": prepared, "returncode": 124, + "status": "failed", "stdout": _clip(_as_text(exc.stdout)), "stderr": _clip(_as_text(exc.stderr) or f"timed out after {timeout_seconds}s"), } results.append(result) if result["returncode"] != 0: + stderr = str(result.get("stderr", "")) + stdout = str(result.get("stdout", "")) errors.append( { "node": "gate_execution", "domain": "ci", - "message": f"command failed: {' '.join(argv)}", + "message": f"command failed: {' '.join(prepared)}", + "command": argv, + "executed_command": prepared, "returncode": result["returncode"], - "stderr": result["stderr"], + "stdout": stdout, + "stderr": stderr, } ) return results, errors -def select_gate_commands_for_mutations(paths: Iterable[str]) -> list[list[str]]: +def _mypy_targets(paths: Sequence[str]) -> list[str]: + top_level = sorted( + { + path.split("/", 1)[0] + for path in paths + if path.endswith(".py") and "/" in path and path.split("/", 1)[0] not in {"tests", "test"} + } + ) + if len(top_level) == 1: + return [top_level[0]] + return ["."] + + +def select_gate_commands_for_mutations( + paths: Iterable[str], + *, + cwd: Path | str | None = None, +) -> list[list[str]]: """Select local, workspace-safe gates from proposed mutation paths.""" normalized = [path.split(":", 1)[1] if ":" in path else path for path in paths] if not normalized: return [] if any(path.endswith(".py") for path in normalized): + cwd_path = Path(cwd).expanduser().resolve() if cwd is not None else None + if _uv_dev_args(cwd_path): + return [ + ["uv", "run", "python", "-m", "pytest", "-q", "-p", "no:cacheprovider"], + ["uv", "run", "ruff", "check", "--no-cache", "."], + ["uv", "run", "mypy", "--no-incremental", *_mypy_targets(normalized)], + ] return [[sys.executable, "-m", "compileall", "-q", "."]] if all(path.startswith("docs/") or path.endswith((".md", ".txt", ".rst")) for path in normalized): paths_literal = repr(json.dumps(normalized)) diff --git a/src/hyrule_engineering_loop/nodes.py b/src/hyrule_engineering_loop/nodes.py index e0388a7..06e8161 100644 --- a/src/hyrule_engineering_loop/nodes.py +++ b/src/hyrule_engineering_loop/nodes.py @@ -35,8 +35,10 @@ select_model_for_role, ) from hyrule_engineering_loop.task_spec import ( + DEFAULT_ACCEPTANCE_CRITERIA, DEFAULT_BUDGET, TaskSpecError, + extract_acceptance_criteria_from_markdown, parse_task_spec_text, render_task_spec, write_task_spec, @@ -320,6 +322,9 @@ def planner_node(state: GraphState) -> StateUpdate: (line.strip() for line in request.splitlines() if line.strip()), "(no request text supplied)", )[:300] + acceptance_criteria = extract_acceptance_criteria_from_markdown(request) or list( + DEFAULT_ACCEPTANCE_CRITERIA + ) spec = { "change_id": state["change_id"], "change_class": str(state["change_class"]), @@ -335,11 +340,7 @@ def planner_node(state: GraphState) -> StateUpdate: "budget": dict(state.get("backend_budget") or DEFAULT_BUDGET), "intake_source": "operator", "intent": intent, - "acceptance_criteria": [ - "The request is implemented within the allowed paths of each target repo.", - "All selected gates pass in the branch-backed worktree.", - "The diff introduces no secret material or denied content patterns.", - ], + "acceptance_criteria": acceptance_criteria, "non_goals": "Anything outside the allowed paths; unrelated refactors.", "rollback_sketch": state.get("rollback_plan") or "Discard the generated worktree and branch; no production state changes.", @@ -534,6 +535,56 @@ def _mutation_operations_from_writer(review: RoleReviewOutput, *, source: str) - ] +def _mutation_paths_for_repo(mutations: dict[str, str], repo: str) -> list[str]: + paths: list[str] = [] + for raw_path in mutations: + if ":" not in raw_path: + continue + mutation_repo, path = raw_path.split(":", 1) + if mutation_repo == repo: + paths.append(path) + return sorted(paths) + + +def _select_gate_commands_by_repo( + state: GraphState, + *, + backend_runs: list[dict[str, Any]], + mutations: dict[str, str], +) -> dict[str, list[list[str]]]: + """Select auto gates independently for each branch-backed worktree.""" + selected: dict[str, list[list[str]]] = {} + for worktree in state.get("worktree_results") or []: + repo = str(worktree.get("repo", "")) + if not repo: + continue + paths: list[str] = [] + for run in backend_runs: + if str(run.get("repo", "")) == repo: + paths.extend(str(path) for path in run.get("changed_paths", [])) + if not paths: + paths = _mutation_paths_for_repo(mutations, repo) + if not paths: + continue + worktree_path = worktree.get("worktree_path") + commands = select_gate_commands_for_mutations( + paths, + cwd=str(worktree_path) if worktree_path else None, + ) + if commands: + selected[repo] = commands + return selected + + +def _tag_gate_entries( + entries: list[dict[str, Any]], + *, + repo: str | None, + cwd: str | None, +) -> list[dict[str, Any]]: + return [{**entry, "repo": repo, "cwd": cwd} for entry in entries] + + def worktree_setup_node(state: GraphState) -> StateUpdate: print("[Node: Worktree Setup] Creating branch-backed worktrees before implementation...") if not state.get("promotion_enabled", False): @@ -789,10 +840,23 @@ def delegate_implementation_node(state: GraphState) -> StateUpdate: update["retry_counters"] = _increment_counter( state["retry_counters"], retry_key or "backend" ) - if not state.get("gate_commands") and (changed_paths or mutations): - update["gate_commands"] = select_gate_commands_for_mutations( - changed_paths or list(mutations) + if ( + not state.get("gate_commands") + and not state.get("gate_commands_by_repo") + and (changed_paths or mutations) + ): + per_repo = _select_gate_commands_by_repo( + state, + backend_runs=backend_runs, + mutations=mutations, ) + if per_repo: + update["gate_commands_by_repo"] = per_repo + else: + update["gate_commands"] = select_gate_commands_for_mutations( + changed_paths or list(mutations), + cwd=state.get("workspace_root"), + ) return with_trace( "delegate_implementation", state, @@ -811,9 +875,10 @@ def gate_execution_node(state: GraphState) -> StateUpdate: print("[Node: Gate Execution] Running deterministic validation gates...") if "FAIL_GATES" not in state["change_id"]: commands = state.get("gate_commands", []) - if not commands: + commands_by_repo = state.get("gate_commands_by_repo", {}) + if not commands and not commands_by_repo: update = cast(StateUpdate, {"gate_status": "passed"}) - return with_trace("gate_execution", state, update, input_keys=["gate_commands", "workspace_root"]) + return with_trace("gate_execution", state, update, input_keys=["gate_commands", "gate_commands_by_repo", "workspace_root"]) violations = validate_gate_commands_for_state(state) if violations: @@ -830,19 +895,38 @@ def gate_execution_node(state: GraphState) -> StateUpdate: ], "retry_counters": _increment_counter(state["retry_counters"], "policy"), }) - return with_trace("gate_execution", state, update, input_keys=["gate_commands", "workspace_root"]) + return with_trace("gate_execution", state, update, input_keys=["gate_commands", "gate_commands_by_repo", "workspace_root"]) - cwds: list[str | None] = [ - str(worktree.get("worktree_path")) - for worktree in state.get("worktree_results") or [] - if worktree.get("worktree_path") - ] or [state.get("workspace_root")] results: list[dict[str, Any]] = [] errors: list[dict[str, Any]] = [] - for cwd in cwds: - cwd_results, cwd_errors = run_gate_commands(commands, cwd=cwd) - results.extend(cwd_results) - errors.extend(cwd_errors) + worktrees = [ + worktree + for worktree in state.get("worktree_results") or [] + if worktree.get("worktree_path") + ] + if worktrees: + for worktree in worktrees: + repo = str(worktree.get("repo", "")) or None + cwd = str(worktree.get("worktree_path")) + if commands: + cwd_results, cwd_errors = run_gate_commands(commands, cwd=cwd) + results.extend(_tag_gate_entries(cwd_results, repo=repo, cwd=cwd)) + errors.extend(_tag_gate_entries(cwd_errors, repo=repo, cwd=cwd)) + repo_commands = commands_by_repo.get(repo or "", []) + if repo_commands: + cwd_results, cwd_errors = run_gate_commands(repo_commands, cwd=cwd) + results.extend(_tag_gate_entries(cwd_results, repo=repo, cwd=cwd)) + errors.extend(_tag_gate_entries(cwd_errors, repo=repo, cwd=cwd)) + else: + workspace_cwd = state.get("workspace_root") + if commands: + cwd_results, cwd_errors = run_gate_commands(commands, cwd=workspace_cwd) + results.extend(_tag_gate_entries(cwd_results, repo=None, cwd=workspace_cwd)) + errors.extend(_tag_gate_entries(cwd_errors, repo=None, cwd=workspace_cwd)) + for repo, repo_commands in commands_by_repo.items(): + cwd_results, cwd_errors = run_gate_commands(repo_commands, cwd=workspace_cwd) + results.extend(_tag_gate_entries(cwd_results, repo=repo, cwd=workspace_cwd)) + errors.extend(_tag_gate_entries(cwd_errors, repo=repo, cwd=workspace_cwd)) if errors: update = cast(StateUpdate, { "gate_results": results, @@ -850,9 +934,9 @@ def gate_execution_node(state: GraphState) -> StateUpdate: "retry_counters": _increment_counter(state["retry_counters"], "ci"), "gate_status": "failed", }) - return with_trace("gate_execution", state, update, input_keys=["gate_commands", "workspace_root"]) + return with_trace("gate_execution", state, update, input_keys=["gate_commands", "gate_commands_by_repo", "workspace_root"]) update = cast(StateUpdate, {"gate_results": results, "gate_status": "passed"}) - return with_trace("gate_execution", state, update, input_keys=["gate_commands", "workspace_root"]) + return with_trace("gate_execution", state, update, input_keys=["gate_commands", "gate_commands_by_repo", "workspace_root"]) domain = "security" error = { diff --git a/src/hyrule_engineering_loop/policy.py b/src/hyrule_engineering_loop/policy.py index 65f6629..15f99f3 100644 --- a/src/hyrule_engineering_loop/policy.py +++ b/src/hyrule_engineering_loop/policy.py @@ -11,6 +11,7 @@ import yaml +from hyrule_engineering_loop.gate_runner import gate_command_preparation_error, prepare_gate_command from hyrule_engineering_loop.state import GraphState from hyrule_engineering_loop.workspace import _safe_relative_path @@ -253,13 +254,53 @@ def _validate_gate_commands(state: GraphState, policy: dict[str, Any]) -> list[s return [] violations: list[str] = [] - for command in state.get("gate_commands", []): + + def check(command: list[str], *, prefix: str = "gate command") -> None: if not command: - violations.append("gate command cannot be empty") - continue + violations.append(f"{prefix} cannot be empty") + return name = Path(command[0]).name if name not in allowed: - violations.append(f"gate command not allowlisted: {name}") + violations.append(f"{prefix} not allowlisted: {name}") + + def check_prepared( + command: list[str], + *, + cwd: str | None, + prefix: str = "prepared gate command", + ) -> None: + if not command: + return + preparation_error = gate_command_preparation_error(command, cwd=cwd) + if preparation_error: + violations.append(preparation_error) + prepared = prepare_gate_command(command, cwd=cwd) + if prepared == command: + return + check(prepared, prefix=prefix) + + worktree_cwds = [ + str(worktree.get("worktree_path")) + for worktree in state.get("worktree_results") or [] + if worktree.get("worktree_path") + ] + workspace_cwd = state.get("workspace_root") + global_cwds: list[str | None] = list(worktree_cwds) if worktree_cwds else [workspace_cwd] + cwd_by_repo = { + str(worktree.get("repo", "")): str(worktree.get("worktree_path")) + for worktree in state.get("worktree_results") or [] + if worktree.get("repo") and worktree.get("worktree_path") + } + + for command in state.get("gate_commands", []): + check(command) + for cwd in global_cwds: + check_prepared(command, cwd=cwd) + for repo, commands in state.get("gate_commands_by_repo", {}).items(): + cwd = cwd_by_repo.get(repo, workspace_cwd) + for command in commands: + check(command, prefix=f"gate command for {repo}") + check_prepared(command, cwd=cwd, prefix=f"prepared gate command for {repo}") return violations diff --git a/src/hyrule_engineering_loop/state.py b/src/hyrule_engineering_loop/state.py index b2236d5..395dfdf 100644 --- a/src/hyrule_engineering_loop/state.py +++ b/src/hyrule_engineering_loop/state.py @@ -103,6 +103,7 @@ class GraphState(TypedDict): requires_human_signoff: bool gate_commands: NotRequired[List[List[str]]] + gate_commands_by_repo: NotRequired[Dict[str, List[List[str]]]] gate_results: NotRequired[Annotated[List[Dict[str, Any]], operator.add]] gate_status: NotRequired[GateStatus] prompt_artifacts: NotRequired[Annotated[Dict[str, str], merge_string_map]] diff --git a/src/hyrule_engineering_loop/task_spec.py b/src/hyrule_engineering_loop/task_spec.py index cf6cf79..a360c53 100644 --- a/src/hyrule_engineering_loop/task_spec.py +++ b/src/hyrule_engineering_loop/task_spec.py @@ -10,6 +10,7 @@ from __future__ import annotations +import re from pathlib import Path from typing import Any @@ -30,6 +31,15 @@ "max_cost_usd": 5.0, } +DEFAULT_ACCEPTANCE_CRITERIA = ( + "The request is implemented within the allowed paths of each target repo.", + "All selected gates pass in the branch-backed worktree.", + "The diff introduces no secret material or denied content patterns.", +) + +HEADING_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*#*\s*$") +LIST_ITEM_RE = re.compile(r"^(?:[-*+]\s+(?:\[[ xX]\]\s*)?|(?:\d+|[A-Za-z])[.)]\s+)(.+)$") + class TaskSpecError(RuntimeError): """Raised when a task spec is structurally invalid.""" @@ -76,14 +86,61 @@ def _numbered_items(section_text: str) -> list[str]: stripped = line.strip() if not stripped: continue - head = stripped.split(".", 1) - if head[0].isdigit() and len(head) == 2: - items.append(head[1].strip()) - elif stripped.startswith(("- ", "* ")): - items.append(stripped[2:].strip()) + match = LIST_ITEM_RE.match(stripped) + if match: + items.append(match.group(1).strip()) + continue + if items and (line.startswith((" ", "\t")) or not HEADING_RE.match(stripped)): + items[-1] = f"{items[-1]} {stripped}".strip() return [item for item in items if item and item != "..."] +def _normalized_heading(title: str) -> str: + return re.sub(r"[^a-z0-9]+", " ", title.lower()).strip() + + +def _is_acceptance_heading(title: str) -> bool: + normalized = _normalized_heading(title) + return normalized == "acceptance" or normalized.startswith("acceptance ") + + +def _acceptance_sections(text: str) -> list[str]: + sections: list[str] = [] + active_level: int | None = None + lines: list[str] = [] + for line in text.splitlines(): + heading = HEADING_RE.match(line.strip()) + if heading: + level = len(heading.group(1)) + title = heading.group(2).strip() + if active_level is not None and level <= active_level: + sections.append("\n".join(lines).strip()) + lines = [] + active_level = None + if active_level is None and _is_acceptance_heading(title): + active_level = level + lines = [] + continue + if active_level is not None: + lines.append(line) + if active_level is not None: + sections.append("\n".join(lines).strip()) + return sections + + +def extract_acceptance_criteria_from_markdown(text: str) -> list[str]: + """Extract deterministic acceptance criteria from issue/request Markdown.""" + criteria: list[str] = [] + seen: set[str] = set() + for section in _acceptance_sections(text): + for item in _numbered_items(section): + normalized = re.sub(r"\s+", " ", item).strip() + if normalized and normalized not in seen: + criteria.append(normalized) + seen.add(normalized) + return criteria + + def _repos_with_allowed_paths(frontmatter: dict[str, Any]) -> dict[str, list[str]]: raw_repos = frontmatter.get("repos") if not isinstance(raw_repos, dict) or not raw_repos: diff --git a/src/hyrule_engineering_loop/trace.py b/src/hyrule_engineering_loop/trace.py index 2f8932b..97a9e9a 100644 --- a/src/hyrule_engineering_loop/trace.py +++ b/src/hyrule_engineering_loop/trace.py @@ -11,6 +11,7 @@ from hyrule_engineering_loop.state import GraphState TRACE_FILENAME = "loop_trace.json" +TRACE_OUTPUT_CHARS = 2_000 def _resolve_trace_dir(state: GraphState) -> Path | None: @@ -22,6 +23,13 @@ def _resolve_trace_dir(state: GraphState) -> Path | None: return path +def _trace_output_excerpt(value: Any) -> str: + text = str(value or "") + if len(text) <= TRACE_OUTPUT_CHARS: + return text + return text[:TRACE_OUTPUT_CHARS] + "\n[trace output truncated]" + + def _summarize_value(key: str, value: Any) -> Any: if key in {"feature_request"}: return {"chars": len(str(value))} @@ -66,9 +74,13 @@ def _summarize_value(key: str, value: Any) -> Any: if key in {"gate_results"} and isinstance(value, list): return [ { + "repo": item.get("repo"), "command": item.get("command"), + "executed_command": item.get("executed_command", item.get("command")), "status": item.get("status"), "returncode": item.get("returncode"), + "stdout": _trace_output_excerpt(item.get("stdout", "")), + "stderr": _trace_output_excerpt(item.get("stderr", "")), } for item in value if isinstance(item, dict) diff --git a/tests/test_gate_runner.py b/tests/test_gate_runner.py index 4384cad..d93dbfa 100644 --- a/tests/test_gate_runner.py +++ b/tests/test_gate_runner.py @@ -1,6 +1,11 @@ from __future__ import annotations +import os + +import pytest + from hyrule_engineering_loop.gate_runner import run_gate_commands, select_gate_commands_for_mutations +from hyrule_engineering_loop.trace import compact_update def test_docs_gate_reads_only_mutated_text_paths(tmp_path) -> None: @@ -23,5 +28,411 @@ def test_docs_gate_reports_non_utf8_mutated_file(tmp_path) -> None: results, errors = run_gate_commands(commands, cwd=tmp_path) assert results[0]["returncode"] == 1 + assert results[0]["status"] == "failed" assert errors assert "UnicodeDecodeError" in errors[0]["stderr"] + + +def test_missing_gate_binary_is_a_structured_failure(tmp_path) -> None: + results, errors = run_gate_commands([["definitely-not-a-real-loop-gate"]], cwd=tmp_path) + + assert results[0]["returncode"] == 127 + assert results[0]["status"] == "failed" + assert errors[0]["domain"] == "ci" + assert "command not found" in errors[0]["stderr"] + + +def test_python_gate_uses_uv_dev_group(tmp_path, monkeypatch: pytest.MonkeyPatch) -> None: + repo = tmp_path / "repo" + repo.mkdir() + (repo / "pyproject.toml").write_text( + "[dependency-groups]\ndev = ['ruff']\n", + encoding="utf-8", + ) + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + log_path = tmp_path / "uv-args.txt" + uv = bin_dir / "uv" + uv.write_text("#!/bin/sh\nprintf '%s\n' \"$@\" > \"$UV_ARG_LOG\"\n", encoding="utf-8") + uv.chmod(0o755) + monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}") + monkeypatch.setenv("UV_ARG_LOG", str(log_path)) + + results, errors = run_gate_commands([["ruff", "check", "."]], cwd=repo) + + assert errors == [] + assert results[0]["executed_command"] == [ + "uv", + "run", + "--locked", + "--group", + "dev", + "ruff", + "check", + ".", + ] + assert log_path.read_text(encoding="utf-8").splitlines() == [ + "run", + "--locked", + "--group", + "dev", + "ruff", + "check", + ".", + ] + + +def test_uv_gate_uses_optional_dev_extra(tmp_path, monkeypatch: pytest.MonkeyPatch) -> None: + repo = tmp_path / "repo" + repo.mkdir() + (repo / "pyproject.toml").write_text( + "[project]\nname = 'demo'\nversion = '0'\n[project.optional-dependencies]\ndev = ['mypy']\n", + encoding="utf-8", + ) + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + log_path = tmp_path / "uv-args.txt" + uv = bin_dir / "uv" + uv.write_text("#!/bin/sh\nprintf '%s\n' \"$@\" > \"$UV_ARG_LOG\"\n", encoding="utf-8") + uv.chmod(0o755) + monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}") + monkeypatch.setenv("UV_ARG_LOG", str(log_path)) + + results, errors = run_gate_commands([["uv", "run", "mypy", "."]], cwd=repo) + + assert errors == [] + assert results[0]["executed_command"] == ["uv", "run", "--locked", "--extra", "dev", "mypy", "."] + + +def test_explicit_uv_gate_without_dev_env_still_gets_lock_guard( + tmp_path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + (tmp_path / "pyproject.toml").write_text("[project]\nname = 'demo'\nversion = '0'\n", encoding="utf-8") + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + log_path = tmp_path / "uv-args.txt" + uv = bin_dir / "uv" + uv.write_text("#!/bin/sh\nprintf '%s\n' \"$@\" > \"$UV_ARG_LOG\"\n", encoding="utf-8") + uv.chmod(0o755) + monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}") + monkeypatch.setenv("UV_ARG_LOG", str(log_path)) + + results, errors = run_gate_commands([["uv", "run", "python", "-c", "pass"]], cwd=tmp_path) + + assert errors == [] + assert results[0]["executed_command"] == ["uv", "run", "--locked", "python", "-c", "pass"] + + +def test_uv_value_option_does_not_hide_python_gate_payload( + tmp_path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + (tmp_path / "pyproject.toml").write_text( + "[dependency-groups]\ndev = ['pytest']\n", + encoding="utf-8", + ) + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + log_path = tmp_path / "uv-args.txt" + uv = bin_dir / "uv" + uv.write_text("#!/bin/sh\nprintf '%s\n' \"$@\" > \"$UV_ARG_LOG\"\n", encoding="utf-8") + uv.chmod(0o755) + monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}") + monkeypatch.setenv("UV_ARG_LOG", str(log_path)) + + results, errors = run_gate_commands( + [["uv", "run", "--package", "api", "pytest", "-q"]], + cwd=tmp_path, + ) + + assert errors == [] + assert results[0]["executed_command"] == [ + "uv", + "run", + "--locked", + "--group", + "dev", + "--package", + "api", + "pytest", + "-q", + ] + + +def test_uv_no_group_dev_exclusion_fails_closed(tmp_path) -> None: + (tmp_path / "pyproject.toml").write_text( + "[dependency-groups]\ndev = ['pytest']\n", + encoding="utf-8", + ) + + results, errors = run_gate_commands( + [["uv", "run", "--all-groups", "--no-group", "dev", "pytest", "-q"]], + cwd=tmp_path, + ) + + assert results[0]["returncode"] == 126 + assert results[0]["status"] == "failed" + assert "excludes required dev dependencies" in results[0]["stderr"] + assert errors[0]["domain"] == "ci" + + +def test_uv_only_group_dev_satisfies_dev_selector( + tmp_path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + (tmp_path / "pyproject.toml").write_text( + "[dependency-groups]\ndev = ['pytest']\n", + encoding="utf-8", + ) + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + log_path = tmp_path / "uv-args.txt" + uv = bin_dir / "uv" + uv.write_text("#!/bin/sh\nprintf '%s\n' \"$@\" > \"$UV_ARG_LOG\"\n", encoding="utf-8") + uv.chmod(0o755) + monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}") + monkeypatch.setenv("UV_ARG_LOG", str(log_path)) + + results, errors = run_gate_commands( + [["uv", "run", "--only-group", "dev", "pytest", "-q"]], + cwd=tmp_path, + ) + + assert errors == [] + assert results[0]["executed_command"] == [ + "uv", + "run", + "--locked", + "--only-group", + "dev", + "pytest", + "-q", + ] + + +def test_uv_only_dev_satisfies_dev_selector( + tmp_path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + (tmp_path / "pyproject.toml").write_text( + "[dependency-groups]\ndev = ['pytest']\n", + encoding="utf-8", + ) + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + log_path = tmp_path / "uv-args.txt" + uv = bin_dir / "uv" + uv.write_text("#!/bin/sh\nprintf '%s\n' \"$@\" > \"$UV_ARG_LOG\"\n", encoding="utf-8") + uv.chmod(0o755) + monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}") + monkeypatch.setenv("UV_ARG_LOG", str(log_path)) + + results, errors = run_gate_commands( + [["uv", "run", "--only-dev", "pytest", "-q"]], + cwd=tmp_path, + ) + + assert errors == [] + assert results[0]["executed_command"] == [ + "uv", + "run", + "--locked", + "--only-dev", + "pytest", + "-q", + ] + + +def test_uv_non_dev_extra_does_not_suppress_dev_selector( + tmp_path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + (tmp_path / "pyproject.toml").write_text( + "[project]\nname = 'demo'\nversion = '0'\n[project.optional-dependencies]\ndev = ['pytest']\ndocs = ['mkdocs']\n", + encoding="utf-8", + ) + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + log_path = tmp_path / "uv-args.txt" + uv = bin_dir / "uv" + uv.write_text("#!/bin/sh\nprintf '%s\n' \"$@\" > \"$UV_ARG_LOG\"\n", encoding="utf-8") + uv.chmod(0o755) + monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}") + monkeypatch.setenv("UV_ARG_LOG", str(log_path)) + + results, errors = run_gate_commands( + [["uv", "run", "--extra", "docs", "pytest", "-q"]], + cwd=tmp_path, + ) + + assert errors == [] + assert results[0]["executed_command"] == [ + "uv", + "run", + "--locked", + "--extra", + "dev", + "--extra", + "docs", + "pytest", + "-q", + ] + + +def test_uv_no_argument_flag_does_not_hide_python_gate_payload( + tmp_path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + (tmp_path / "pyproject.toml").write_text( + "[dependency-groups]\ndev = ['pytest']\n", + encoding="utf-8", + ) + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + log_path = tmp_path / "uv-args.txt" + uv = bin_dir / "uv" + uv.write_text("#!/bin/sh\nprintf '%s\n' \"$@\" > \"$UV_ARG_LOG\"\n", encoding="utf-8") + uv.chmod(0o755) + monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}") + monkeypatch.setenv("UV_ARG_LOG", str(log_path)) + + results, errors = run_gate_commands( + [["uv", "run", "--managed-python", "pytest", "-q"]], + cwd=tmp_path, + ) + + assert errors == [] + assert results[0]["executed_command"] == [ + "uv", + "run", + "--locked", + "--group", + "dev", + "--managed-python", + "pytest", + "-q", + ] + + +def test_uv_payload_lock_like_arg_does_not_count_as_lock_guard( + tmp_path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + log_path = tmp_path / "uv-args.txt" + uv = bin_dir / "uv" + uv.write_text("#!/bin/sh\nprintf '%s\n' \"$@\" > \"$UV_ARG_LOG\"\n", encoding="utf-8") + uv.chmod(0o755) + monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}") + monkeypatch.setenv("UV_ARG_LOG", str(log_path)) + + results, errors = run_gate_commands( + [["uv", "run", "python", "tools/check.py", "--locked"]], + cwd=tmp_path, + ) + + assert errors == [] + assert results[0]["executed_command"] == [ + "uv", + "run", + "--locked", + "python", + "tools/check.py", + "--locked", + ] + + +def test_uv_double_dash_payload_lock_like_arg_does_not_count_as_lock_guard( + tmp_path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + log_path = tmp_path / "uv-args.txt" + uv = bin_dir / "uv" + uv.write_text("#!/bin/sh\nprintf '%s\n' \"$@\" > \"$UV_ARG_LOG\"\n", encoding="utf-8") + uv.chmod(0o755) + monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}") + monkeypatch.setenv("UV_ARG_LOG", str(log_path)) + + results, errors = run_gate_commands( + [["uv", "run", "--", "python", "tools/check.py", "--frozen"]], + cwd=tmp_path, + ) + + assert errors == [] + assert results[0]["executed_command"] == [ + "uv", + "run", + "--locked", + "--", + "python", + "tools/check.py", + "--frozen", + ] + + +def test_uv_gate_preserves_explicit_frozen_guard(tmp_path, monkeypatch: pytest.MonkeyPatch) -> None: + bin_dir = tmp_path / "bin" + bin_dir.mkdir() + log_path = tmp_path / "uv-args.txt" + uv = bin_dir / "uv" + uv.write_text("#!/bin/sh\nprintf '%s\n' \"$@\" > \"$UV_ARG_LOG\"\n", encoding="utf-8") + uv.chmod(0o755) + monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}") + monkeypatch.setenv("UV_ARG_LOG", str(log_path)) + + results, errors = run_gate_commands([["uv", "run", "--frozen", "python", "-c", "pass"]], cwd=tmp_path) + + assert errors == [] + assert results[0]["executed_command"] == ["uv", "run", "--frozen", "python", "-c", "pass"] + + +def test_python_mutations_select_repo_quality_gates_when_dev_env_exists(tmp_path) -> None: + (tmp_path / "pyproject.toml").write_text( + "[dependency-groups]\ndev = ['pytest', 'ruff', 'mypy']\n", + encoding="utf-8", + ) + + commands = select_gate_commands_for_mutations(["hyrule_cloud/api.py"], cwd=tmp_path) + + assert commands == [ + ["uv", "run", "python", "-m", "pytest", "-q", "-p", "no:cacheprovider"], + ["uv", "run", "ruff", "check", "--no-cache", "."], + ["uv", "run", "mypy", "--no-incremental", "hyrule_cloud"], + ] + + +def test_python_mutation_gates_disable_worktree_cache_dirs(tmp_path) -> None: + (tmp_path / "pyproject.toml").write_text( + "[dependency-groups]\ndev = ['pytest', 'ruff', 'mypy']\n", + encoding="utf-8", + ) + + commands = select_gate_commands_for_mutations(["pkg/module.py"], cwd=tmp_path) + + assert ["-p", "no:cacheprovider"] == commands[0][-2:] + assert "--no-cache" in commands[1] + assert "--no-incremental" in commands[2] + + +def test_gate_output_is_visible_in_compact_trace() -> None: + summary = compact_update( + { + "gate_results": [ + { + "command": ["ruff", "check", "."], + "executed_command": ["uv", "run", "--locked", "--group", "dev", "ruff", "check", "."], + "returncode": 1, + "status": "failed", + "stdout": "stdout detail", + "stderr": "stderr detail", + } + ] + } + ) + + assert summary["gate_results"][0]["stdout"] == "stdout detail" + assert summary["gate_results"][0]["stderr"] == "stderr detail" diff --git a/tests/test_phase20_agent_backend.py b/tests/test_phase20_agent_backend.py index e1b442e..f48c3d1 100644 --- a/tests/test_phase20_agent_backend.py +++ b/tests/test_phase20_agent_backend.py @@ -4,6 +4,7 @@ import json import subprocess +import sys from pathlib import Path from typing import Any, cast @@ -22,8 +23,9 @@ from hyrule_engineering_loop.cli import main from hyrule_engineering_loop.feature import build_feature_state from hyrule_engineering_loop.graph import build_graph +from hyrule_engineering_loop.nodes import delegate_implementation_node from hyrule_engineering_loop.model_policy import select_backend_for_state, validate_model_policy -from hyrule_engineering_loop.promotion import rollback_promotions +from hyrule_engineering_loop.promotion import rollback_promotions, setup_worktrees_for_state from hyrule_engineering_loop.state import GraphState @@ -137,6 +139,11 @@ def test_subprocess_backend_command_assembly_and_refusals(tmp_path: Path) -> Non assert command[command.index("--max-turns") + 1] == "7" assert "CMD_ASSEMBLY" in command[command.index("-p") + 1] + pi_command = PiBackend().build_command( + prompt=assemble_backend_prompt(spec, constraints), constraints=constraints + ) + assert pi_command[:4] == ["pi", "--print", "--mode", "json"] + refused = PiBackend().execute(task_spec=spec, worktree=None, constraints=constraints) assert refused.status == "failed" assert "requires a branch-backed worktree" in str(refused.error) @@ -148,6 +155,107 @@ def test_subprocess_backend_command_assembly_and_refusals(tmp_path: Path) -> Non assert "acceptEdits" not in read_only_command +def test_pi_backend_parses_single_json_error_event() -> None: + stdout = json.dumps( + { + "type": "agent_error", + "error": {"message": "provider refused the request"}, + "willRetry": False, + } + ) + + parsed = PiBackend()._parse_harness_output(stdout) + + assert parsed["num_turns"] == 1 + assert parsed["is_error"] is True + + +def test_pi_backend_retry_bookkeeping_does_not_fail_later_success() -> None: + stdout = "\n".join( + json.dumps(event) + for event in [ + {"type": "compaction_end", "willRetry": True}, + { + "type": "message_end", + "message": { + "role": "assistant", + "content": [{"type": "text", "text": "recovered"}], + "stopReason": "stop", + "usage": {"input": 20, "output": 3, "cost": {"total": 0.02}}, + }, + }, + {"type": "turn_end", "message": {"role": "assistant", "content": []}}, + {"type": "agent_end", "willRetry": False}, + ] + ) + + parsed = PiBackend()._parse_harness_output(stdout) + + assert parsed["is_error"] is False + assert parsed["result"] == "recovered" + assert parsed["total_cost_usd"] == 0.02 + + +def test_pi_backend_treats_assistant_error_stop_reason_as_error() -> None: + stdout = "\n".join( + json.dumps(event) + for event in [ + { + "type": "message_end", + "message": { + "role": "assistant", + "content": [{"type": "text", "text": "provider failed"}], + "stopReason": "error", + "errorMessage": "upstream provider failed", + "usage": { + "input": 10, + "output": 2, + "cost": {"total": 0.01}, + }, + }, + }, + {"type": "turn_end", "message": {"role": "assistant", "content": []}}, + ] + ) + + parsed = PiBackend()._parse_harness_output(stdout) + + assert parsed["is_error"] is True + assert parsed["result"] == "provider failed" + assert parsed["total_cost_usd"] == 0.01 + + +def test_pi_backend_parses_json_event_usage_and_cost() -> None: + stdout = "\n".join( + json.dumps(event) + for event in [ + {"type": "agent_start"}, + { + "type": "message_end", + "message": { + "role": "assistant", + "content": [{"type": "text", "text": "draft complete"}], + "usage": { + "input": 1323, + "output": 5, + "cost": {"input": 0.006615, "output": 0.00015, "total": 0.006765}, + }, + }, + }, + {"type": "turn_end", "message": {"role": "assistant", "content": []}}, + {"type": "agent_end", "willRetry": False}, + ] + ) + + parsed = PiBackend()._parse_harness_output(stdout) + + assert parsed["num_turns"] == 1 + assert parsed["usage"] == {"input_tokens": 1323, "output_tokens": 5} + assert parsed["total_cost_usd"] == 0.006765 + assert parsed["result"] == "draft complete" + assert parsed["is_error"] is False + + def test_backend_selection_follows_tier_escalation(tmp_path: Path) -> None: policy_path = tmp_path / "model-policy.yml" policy_path.write_text( @@ -211,6 +319,35 @@ def _state(risk: str) -> GraphState: assert any("unknown default backend" in error for error in result["errors"]) +def test_subprocess_backend_enforces_reported_cost_budget(tmp_path: Path) -> None: + repo = tmp_path / "repo" + _init_repo(repo) + backend = ClaudeCodeBackend( + command=[ + sys.executable, + "-c", + "import json; print(json.dumps({'num_turns': 2, 'total_cost_usd': 2.5, 'usage': {'input_tokens': 10, 'output_tokens': 20}, 'result': 'done'}))", + ] + ) + spec = TaskSpec( + change_id="COST_BUDGET", + change_class="app_feature", + risk_level="low", + request="exercise cost budget", + allowed_paths={"repo": ("docs",)}, + ) + + result = backend.execute( + task_spec=spec, + worktree=repo, + constraints=BackendConstraints(max_cost_usd=1.0), + ) + + assert result.status == "budget_exhausted" + assert result.cost.usd == 2.5 + assert "exceeded run budget" in result.notes + + def test_budget_exhaustion_routes_to_human_signoff(tmp_path: Path) -> None: state = _feature_state(tmp_path, "BUDGET_EXHAUSTED", allow=["docs"]) state["backend_budget"] = {"max_iterations": 0} @@ -328,6 +465,78 @@ def test_policy_guard_enforces_changed_file_cap(tmp_path: Path) -> None: rollback_promotions(final_state["worktree_results"]) +def test_auto_gate_selection_is_per_worktree(tmp_path: Path) -> None: + workspace_root = tmp_path / "workspace" + _init_repo(workspace_root / "docs-repo") + _init_repo(workspace_root / "python-repo") + (workspace_root / "python-repo" / "pyproject.toml").write_text( + "[dependency-groups]\ndev = ['pytest', 'ruff', 'mypy']\n", + encoding="utf-8", + ) + (workspace_root / "python-repo" / "python_repo").mkdir() + (workspace_root / "python-repo" / "python_repo" / "__init__.py").write_text("", encoding="utf-8") + _run(["git", "add", "pyproject.toml", "python_repo/__init__.py"], workspace_root / "python-repo") + _run(["git", "commit", "-m", "add python project"], workspace_root / "python-repo") + + state = cast( + GraphState, + { + "change_id": "MULTI_REPO_GATES", + "change_class": "app_feature", + "risk_level": "low", + "customer_impact": "none", + "source_of_truth_files": [], + "proposed_mutations": {}, + "mcp_schema_breaking": False, + "emulated_lab_verified": "not_applicable", + "validation_errors": [], + "role_approvals": {}, + "retry_counters": {}, + "rollback_plan": "", + "noc_handoff_metadata": {}, + "requires_human_signoff": False, + "promotion_enabled": True, + "promotion_repositories": { + "docs-repo": str(workspace_root / "docs-repo"), + "python-repo": str(workspace_root / "python-repo"), + }, + "promotion_allowed_paths": { + "docs-repo": ["docs"], + "python-repo": ["python_repo"], + }, + "promotion_worktree_root": str(tmp_path / "worktrees"), + "promotion_branch_prefix": "hyrule-feature", + "feature_request": "exercise per-worktree gate selection", + "llm_mock_responses": { + "implementation_writer": { + "approved": True, + "proposed_mutations": [ + { + "path": "python-repo:python_repo/change.py", + "content": "VALUE = 1\n", + "operation": "create", + } + ], + } + }, + }, + ) + worktrees = setup_worktrees_for_state(state) + state["worktree_results"] = worktrees + + update = delegate_implementation_node(state) + + assert "gate_commands" not in update + assert update["gate_commands_by_repo"] == { + "python-repo": [ + ["uv", "run", "python", "-m", "pytest", "-q", "-p", "no:cacheprovider"], + ["uv", "run", "ruff", "check", "--no-cache", "."], + ["uv", "run", "mypy", "--no-incremental", "python_repo"], + ] + } + rollback_promotions(worktrees) + + def test_backend_canary_dry_live_assembles_without_execution( tmp_path: Path, capsys: pytest.CaptureFixture[str], diff --git a/tests/test_phase21_task_specs.py b/tests/test_phase21_task_specs.py index d0d9fe2..60311f9 100644 --- a/tests/test_phase21_task_specs.py +++ b/tests/test_phase21_task_specs.py @@ -23,7 +23,13 @@ from hyrule_engineering_loop.prompts import load_role_prompts from hyrule_engineering_loop.promotion import rollback_promotions from hyrule_engineering_loop.state import GraphState -from hyrule_engineering_loop.task_spec import TaskSpecError, parse_task_spec_text, render_task_spec +from hyrule_engineering_loop.task_spec import ( + DEFAULT_ACCEPTANCE_CRITERIA, + TaskSpecError, + extract_acceptance_criteria_from_markdown, + parse_task_spec_text, + render_task_spec, +) def _run(command: list[str], cwd: Path) -> None: @@ -160,6 +166,67 @@ def test_planner_failure_routes_to_human_signoff(tmp_path: Path) -> None: assert "delegate_implementation" not in nodes +# --- Issue-body acceptance extraction (#11) --------------------------------- + + +def test_extract_acceptance_criteria_from_markdown_handles_common_issue_shapes() -> None: + body = """# Add VPS flow + +Context paragraph. + +## Acceptance + +- [ ] States and models represent VPS provisioning. +- Smoke tests cover the no-op path. +1. No production mutation happens without operator approval. + +## Notes + +This is not an acceptance item. + +### Acceptance criteria + +* Duplicate headings are allowed. +* Smoke tests cover the no-op path. +""" + + assert extract_acceptance_criteria_from_markdown(body) == [ + "States and models represent VPS provisioning.", + "Smoke tests cover the no-op path.", + "No production mutation happens without operator approval.", + "Duplicate headings are allowed.", + ] + + +def test_planner_derives_acceptance_criteria_from_request_body(tmp_path: Path) -> None: + state = _feature_state(tmp_path, "PLANNER_ACCEPTANCE") + state["feature_request"] = """Add the VPS no-op launch path. + +## Acceptance criteria + +- The API has an idempotent no-op launch path. +- `uv run --group dev pytest` passes for the target repo. +""" + + final_state = dict(build_graph().invoke(state)) + + assert final_state["task_spec"]["acceptance_criteria"] == [ + "The API has an idempotent no-op launch path.", + "`uv run --group dev pytest` passes for the target repo.", + ] + rollback_promotions(final_state["promotion_results"]) + + +def test_planner_falls_back_to_generic_acceptance_without_section(tmp_path: Path) -> None: + state = _feature_state(tmp_path, "PLANNER_DEFAULT_ACCEPTANCE") + state["feature_request"] = "Add a task-spec driven tranche without explicit acceptance." + + final_state = dict(build_graph().invoke(state)) + + assert final_state["task_spec"]["acceptance_criteria"] == list(DEFAULT_ACCEPTANCE_CRITERIA) + rollback_promotions(final_state["promotion_results"]) + + # --- AC2: consult + judgment recorded --------------------------------------- diff --git a/tests/test_phase24_daemon.py b/tests/test_phase24_daemon.py index 2411081..b887faa 100644 --- a/tests/test_phase24_daemon.py +++ b/tests/test_phase24_daemon.py @@ -4,6 +4,7 @@ import json import subprocess +from datetime import UTC, datetime from pathlib import Path from typing import Any, cast @@ -14,6 +15,7 @@ DaemonConfig, DaemonReport, acquire_lock, + backend_budget_for_issue, classify_issue, daemon_once, notify_discord, @@ -220,6 +222,40 @@ def test_daemon_defaults_to_core_repos_and_low_and_slow_budget() -> None: assert config.allowed_paths_by_repo == {} +def test_loop_budget_label_raises_only_the_per_issue_run_budget() -> None: + item = IntakeItem( + repo="AS215932/hyrule-cloud", + number=12, + title="Feature-sized work", + url="u", + labels=("loop:approved", "loop:budget-xl"), + updated_at="", + score=0.0, + body_complete=True, + ) + + budget = backend_budget_for_issue(item, DaemonConfig()) + + assert budget == {"max_iterations": 60, "max_wall_clock_minutes": 120, "max_cost_usd": 10.0} + + +def test_loop_budget_label_is_clamped_to_remaining_daily_cost() -> None: + item = IntakeItem( + repo="AS215932/hyrule-cloud", + number=12, + title="Feature-sized work", + url="u", + labels=("loop:approved", "loop:budget-xl"), + updated_at="", + score=0.0, + body_complete=True, + ) + + budget = backend_budget_for_issue(item, DaemonConfig(), remaining_cost_usd=4.25) + + assert budget == {"max_iterations": 60, "max_wall_clock_minutes": 120, "max_cost_usd": 4.25} + + def _capture_allowed_paths(tmp_path: Path, config_kwargs: dict[str, Any], repo: str = "AS215932/hyrule-cloud") -> dict[str, Any]: captured: dict[str, Any] = {} @@ -256,6 +292,98 @@ def test_daemon_allowed_paths_unlisted_repo_falls_back_to_docs(tmp_path: Path) - assert captured["allowed_paths"] == ["docs"] +def test_daemon_passes_issue_budget_override_to_feature_runner(tmp_path: Path) -> None: + captured: dict[str, Any] = {} + + def runner(**kwargs: Any) -> dict[str, Any]: + captured.update(kwargs) + return {"final_state": {}, "state_path": str(tmp_path / "state.json")} + + repo = "AS215932/hyrule-cloud" + config = DaemonConfig(repos=(repo,), state_dir=tmp_path / "state", output_root=tmp_path / "runs") + gh = FakeGh( + { + "issue list": _approved_issue_json(12, repo=repo, labels=["loop:approved", "loop:budget-xl"]), + "issue view": json.dumps({"body": "x"}), + } + ) + + daemon_once(config, client=gh, feature_runner=runner) + + assert captured["backend_budget"] == { + "max_iterations": 60, + "max_wall_clock_minutes": 120, + "max_cost_usd": 10.0, + } + + +def test_daemon_blocks_publication_when_reported_cost_exceeds_run_budget(tmp_path: Path) -> None: + def runner(**kwargs: Any) -> dict[str, Any]: + return { + "state_path": str(kwargs["output_root"] / "state" / f"{kwargs['change_id']}.json"), + "signoff_status": "ready_for_review", + "final_state": { + "promotion_results": [{"repo": "hyrule-cloud", "branch": "b", "worktree_path": "w"}], + "noc_handoff_path": "h", + "backend_results": [{"cost": {"usd": 6.0}}], + "reflection_results": {"written": True}, + }, + } + + published: list[dict[str, Any]] = [] + repo = "AS215932/hyrule-cloud" + config = DaemonConfig(repos=(repo,), state_dir=tmp_path / "state", output_root=tmp_path / "runs") + gh = FakeGh( + { + "issue list": _approved_issue_json(13, repo=repo, labels=["loop:approved"]), + "issue view": json.dumps({"body": "x"}), + } + ) + + report = daemon_once( + config, + client=gh, + feature_runner=runner, + publisher=lambda state, **kwargs: published.append(state) or [], + ) + + assert report.outcome == "needs_triage" + assert "exceeded budget" in report.detail + assert published == [] + + +def test_daemon_clamps_issue_budget_to_remaining_daily_cost(tmp_path: Path) -> None: + captured: dict[str, Any] = {} + + def runner(**kwargs: Any) -> dict[str, Any]: + captured.update(kwargs) + return {"final_state": {}, "state_path": str(tmp_path / "state.json")} + + repo = "AS215932/hyrule-cloud" + state_dir = tmp_path / "state" + state_dir.mkdir() + day = datetime.now(UTC).strftime("%Y-%m-%d") + (state_dir / f"ledger-{day}.json").write_text( + json.dumps({"runs": 1, "cost_usd": 6.0, "wall_clock_seconds": 10.0}), + encoding="utf-8", + ) + config = DaemonConfig(repos=(repo,), state_dir=state_dir, output_root=tmp_path / "runs") + gh = FakeGh( + { + "issue list": _approved_issue_json(12, repo=repo, labels=["loop:approved", "loop:budget-xl"]), + "issue view": json.dumps({"body": "x"}), + } + ) + + daemon_once(config, client=gh, feature_runner=runner) + + assert captured["backend_budget"] == { + "max_iterations": 60, + "max_wall_clock_minutes": 120, + "max_cost_usd": 4.0, + } + + def test_repo_name_for_issue_maps_core_repo_checkout_names() -> None: cases = { "AS215932/engineering-loop": "engineering-loop", diff --git a/tests/test_phase7_policy.py b/tests/test_phase7_policy.py index 1b5a1dc..e65ba11 100644 --- a/tests/test_phase7_policy.py +++ b/tests/test_phase7_policy.py @@ -8,7 +8,12 @@ import yaml from hyrule_engineering_loop.graph import build_graph -from hyrule_engineering_loop.policy import PolicyViolation, validate_graph_state, validate_pr_remote +from hyrule_engineering_loop.policy import ( + PolicyViolation, + validate_gate_commands_for_state, + validate_graph_state, + validate_pr_remote, +) from hyrule_engineering_loop.state import GraphState @@ -126,6 +131,62 @@ def test_policy_node_stops_graph_before_promotion(tmp_path: Path) -> None: assert "promotion_status" not in final_state +def test_policy_rejects_uv_gate_that_excludes_required_dev_deps(tmp_path: Path) -> None: + policy_path = _write_policy( + tmp_path / "policy.yml", + { + "defaults": { + "max_changed_files": 3, + "max_file_bytes": 100, + "denied_path_globs": [], + "denied_content_patterns": [], + "allowed_gate_commands": ["uv"], + "protected_branch_prefixes": [], + "allowed_pr_remotes": ["origin"], + "allowed_handoff_dirs": [str(tmp_path)], + } + }, + ) + repo = tmp_path / "repo" + repo.mkdir() + (repo / "pyproject.toml").write_text("[dependency-groups]\ndev = ['pytest']\n", encoding="utf-8") + state = _base_state(policy_path) + state["gate_commands"] = [["uv", "run", "--all-groups", "--no-group", "dev", "pytest", "-q"]] + state["worktree_results"] = [{"repo": "demo", "worktree_path": str(repo)}] + + violations = validate_gate_commands_for_state(state) + + assert "uv gate excludes required dev dependencies (--group dev)" in violations + + +def test_policy_validates_prepared_gate_command(tmp_path: Path) -> None: + policy_path = _write_policy( + tmp_path / "policy.yml", + { + "defaults": { + "max_changed_files": 3, + "max_file_bytes": 100, + "denied_path_globs": [], + "denied_content_patterns": [], + "allowed_gate_commands": ["ruff"], + "protected_branch_prefixes": [], + "allowed_pr_remotes": ["origin"], + "allowed_handoff_dirs": [str(tmp_path)], + } + }, + ) + repo = tmp_path / "repo" + repo.mkdir() + (repo / "pyproject.toml").write_text("[dependency-groups]\ndev = ['ruff']\n", encoding="utf-8") + state = _base_state(policy_path) + state["gate_commands"] = [["ruff", "check", "."]] + state["worktree_results"] = [{"repo": "demo", "worktree_path": str(repo)}] + + violations = validate_gate_commands_for_state(state) + + assert "prepared gate command not allowlisted: uv" in violations + + def test_policy_blocks_gate_command_before_execution(tmp_path: Path) -> None: policy_path = _write_policy(tmp_path / "policy.yml") state = _base_state(policy_path)