From eaa061fd2b69c2997cc5247d80710605f1820d05 Mon Sep 17 00:00:00 2001 From: csh Date: Fri, 29 May 2026 15:42:03 +0800 Subject: [PATCH] :sparkles: feat(main_loop): harden plan execution state Validate required Plan Meta before claims, record claim ownership, and persist verification evidence on completion. Document conditional skill triggers without adding them to automatic main-loop dispatch. --- scripts/main_loop.py | 209 ++++++++++++++++++-- templates/AGENT_RULES.template.md | 49 ++++- tests/test_main_loop_cli.py | 304 +++++++++++++++++++++++++++++- 3 files changed, 528 insertions(+), 34 deletions(-) diff --git a/scripts/main_loop.py b/scripts/main_loop.py index 084e4dc4..e8719987 100644 --- a/scripts/main_loop.py +++ b/scripts/main_loop.py @@ -1,8 +1,11 @@ #!/usr/bin/env python3 from contextlib import contextmanager +from datetime import datetime, timezone +import getpass import os import platform import re +import socket import sys import threading import time @@ -31,6 +34,22 @@ PLAN_LINE_RE = re.compile( ) FINISH_STATUSES = {"done", "blocked", "skipped"} ENV_BLOCKED_RE = re.compile(r"^env:([^:]+):(.+)$") +PLAN_META_REQUIRED_FIELDS = ( + "Plan Group", + "Parent Plan", + "Verification Scope", + "Verification Gate", +) +WORKFLOW_STATE_KEYS = { + "phase", + "spec", + "plan", + "executor", + "constraints", + "claimed_by", + "claimed_at", + "verification", +} WORKFLOW_PHASES = { "brainstorming", "planning", @@ -48,7 +67,8 @@ def usage() -> str: "Usage:\n" " python scripts/main_loop.py claim -plans -progress \n" " python scripts/main_loop.py finish -plan -status " - "-progress [-note ]\n" + "-progress [-note ] [-verified ]\n" + " python scripts/main_loop.py status -plans -progress \n" " python scripts/main_loop.py record -progress -phase " "[-spec ] [-plan ] [-executor ] " "[-constraints ]\n" @@ -63,6 +83,8 @@ def usage() -> str: " -executor NAME\n" " -constraints CSV\n" " -note TEXT\n" + " -owner NAME\n" + " -verified TEXT\n" " -h, -help Show this help.\n" ) @@ -98,6 +120,24 @@ def normalize_note(note: str) -> str: return note.replace("\n", " ").replace("\r", " ").replace("`", "'").strip() +def now_utc() -> str: + return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace( + "+00:00", "Z" + ) + + +def default_claim_owner() -> str: + owner = os.environ.get("PLAYBOOK_MAIN_LOOP_OWNER") + if owner: + return normalize_note(owner) + host = socket.gethostname() or "unknown-host" + try: + user = getpass.getuser() + except Exception: # pragma: no cover + user = "unknown-user" + return f"{user}@{host}:{os.getpid()}" + + def render_plan_line(plan_key: str, status: str, note: Optional[str]) -> str: checked = "x" if status == "done" else " " suffix = status @@ -188,18 +228,23 @@ def render_workflow_state_lines( plan: Optional[str] = None, executor: Optional[str] = None, constraints: Optional[str] = None, + claimed_by: Optional[str] = None, + claimed_at: Optional[str] = None, + verification: Optional[str] = None, ) -> list[str]: lines = [WORKFLOW_STATE_START] - if phase: - lines.append(f"phase: {phase}") - if spec: - lines.append(f"spec: {spec}") - if plan: - lines.append(f"plan: {plan}") - if executor: - lines.append(f"executor: {executor}") - if constraints: - lines.append(f"constraints: {constraints}") + for key, value in ( + ("phase", phase), + ("spec", spec), + ("plan", plan), + ("executor", executor), + ("constraints", constraints), + ("claimed_by", claimed_by), + ("claimed_at", claimed_at), + ("verification", verification), + ): + if value: + lines.append(f"{key}: {value}") lines.append(WORKFLOW_STATE_END) return lines @@ -213,7 +258,7 @@ def parse_workflow_state( if ": " not in line: continue key, value = line.split(": ", 1) - if key in {"phase", "spec", "plan", "executor", "constraints"}: + if key in WORKFLOW_STATE_KEYS: state[key] = value return state @@ -348,9 +393,15 @@ def update_workflow_state( plan: Optional[str] = None, executor: Optional[str] = None, constraints: Optional[str] = None, + claimed_by: Optional[str] = None, + claimed_at: Optional[str] = None, + verification: Optional[str] = None, + clear_keys: tuple[str, ...] = (), ) -> list[str]: lines, start_idx, end_idx = ensure_workflow_state_block(lines) state = parse_workflow_state(lines, start_idx, end_idx) + for key in clear_keys: + state.pop(key, None) if phase is not None: state["phase"] = phase if spec is not None: @@ -361,12 +412,21 @@ def update_workflow_state( state["executor"] = executor if constraints is not None: state["constraints"] = constraints + if claimed_by is not None: + state["claimed_by"] = claimed_by + if claimed_at is not None: + state["claimed_at"] = claimed_at + if verification is not None: + state["verification"] = verification lines[start_idx : end_idx + 1] = render_workflow_state_lines( state.get("phase"), state.get("spec"), state.get("plan"), state.get("executor"), state.get("constraints"), + state.get("claimed_by"), + state.get("claimed_at"), + state.get("verification"), ) return lines @@ -399,6 +459,27 @@ def filter_existing_entries( return [entry for entry in entries if entry[0] in available] +def validate_plan_meta(plan_path: Path) -> list[str]: + text = plan_path.read_text(encoding="utf-8") + missing: list[str] = [] + if not re.search(r"(?im)^##\s+Plan Meta\s*$", text): + missing.append("Plan Meta") + for field in PLAN_META_REQUIRED_FIELDS: + pattern = rf"(?im)^\s*[-*]\s+(?:\*\*)?{re.escape(field)}(?:\*\*)?\s*:" + if not re.search(pattern, text): + missing.append(field) + return missing + + +def validate_plan_files(plans_dir: Path, plan_keys: list[str]) -> Optional[str]: + for plan_key in plan_keys: + missing = validate_plan_meta(plans_dir / plan_key) + if missing: + fields = ", ".join(missing) + return f"ERROR: {plan_key} missing required Plan Meta fields: {fields}" + return None + + def choose_claim_entry( entries: list[tuple[str, str, Optional[str], int]], current_env: Optional[str], @@ -427,13 +508,18 @@ def choose_claim_entry( return None -def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]: +def claim_plan( + plans_dir: Path, progress_path: Path, owner: Optional[str] = None +) -> tuple[int, str]: if not plans_dir.is_dir(): return 2, f"ERROR: plans dir not found: {plans_dir}" plan_keys = list_plan_files(plans_dir) if not plan_keys: return 2, "ERROR: no plan files found" + plan_error = validate_plan_files(plans_dir, plan_keys) + if plan_error: + return 2, plan_error with locked_progress(progress_path): lines = load_progress_lines(progress_path) @@ -458,6 +544,9 @@ def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]: lines, phase="executing", plan=(plans_dir / plan_key).as_posix(), + claimed_by=normalize_note(owner) if owner else default_claim_owner(), + claimed_at=now_utc(), + clear_keys=("verification",), ) write_progress_lines(progress_path, lines) @@ -468,12 +557,18 @@ def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]: def finish_plan( - plan: str, status: str, progress_path: Path, note: Optional[str] + plan: str, + status: str, + progress_path: Path, + note: Optional[str], + verified: Optional[str] = None, ) -> tuple[int, str]: if status not in FINISH_STATUSES: return 2, f"ERROR: invalid status: {status}" if not plan: return 2, "ERROR: plan is required" + if verified and status != "done": + return 2, "ERROR: -verified is only valid with -status done" plan_key = normalize_plan_key(plan) with locked_progress(progress_path): @@ -488,6 +583,15 @@ def finish_plan( entries = parse_entries(lines, start_idx, end_idx) rendered_note = normalize_note(note) if note else None + rendered_verified = normalize_note(verified) if verified else None + verification_clear_keys = () if rendered_verified else ("verification",) + if rendered_verified: + verified_note = f"verified: {rendered_verified}" + rendered_note = ( + f"{verified_note}; note: {rendered_note}" + if rendered_note + else verified_note + ) updated_line = render_plan_line(plan_key, status, rendered_note) workflow_phase = { "done": "done", @@ -502,6 +606,8 @@ def finish_plan( lines, phase=workflow_phase, plan=f"docs/superpowers/plans/{plan_key}", + verification=rendered_verified, + clear_keys=verification_clear_keys, ) write_progress_lines(progress_path, lines) return 0, updated_line @@ -511,11 +617,64 @@ def finish_plan( lines, phase=workflow_phase, plan=f"docs/superpowers/plans/{plan_key}", + verification=rendered_verified, + clear_keys=verification_clear_keys, ) write_progress_lines(progress_path, lines) return 0, updated_line +def status_report(plans_dir: Path, progress_path: Path) -> tuple[int, str]: + if not plans_dir.is_dir(): + return 2, f"ERROR: plans dir not found: {plans_dir}" + + plan_keys = list_plan_files(plans_dir) + lines = load_progress_lines(progress_path) + block = find_block(lines) + entries: list[tuple[str, str, Optional[str], int]] = [] + if block: + entries = filter_existing_entries(parse_entries(lines, *block), plan_keys) + + entry_by_plan = {plan_key: (status, note) for plan_key, status, note, _ in entries} + counts = {status: 0 for status in ("pending", "in-progress", "done", "blocked", "skipped")} + rows: list[str] = [] + for plan_key in plan_keys: + status, note = entry_by_plan.get(plan_key, ("pending", None)) + counts[status] += 1 + suffix = f": {note}" if note else "" + rows.append(f"PLAN {plan_key} {status}{suffix}") + + state: dict[str, str] = {} + workflow_block = find_named_block(lines, WORKFLOW_STATE_START, WORKFLOW_STATE_END) + if workflow_block: + state = parse_workflow_state(lines, *workflow_block) + + output = [ + "STATUS " + f"total={len(plan_keys)} " + f"pending={counts['pending']} " + f"in-progress={counts['in-progress']} " + f"done={counts['done']} " + f"blocked={counts['blocked']} " + f"skipped={counts['skipped']}" + ] + current_parts = [ + f"{key}={state[key]}" + for key in ( + "phase", + "plan", + "claimed_by", + "claimed_at", + "verification", + ) + if key in state + ] + if current_parts: + output.append("CURRENT " + " ".join(current_parts)) + output.extend(rows) + return 0, "\n".join(output) + + def record_workflow_state( progress_path: Path, phase: str, @@ -543,7 +702,7 @@ def main(argv: list[str]) -> int: return 0 mode = argv[0] - if mode not in {"claim", "finish", "record"}: + if mode not in {"claim", "finish", "record", "status"}: print(f"ERROR: unknown mode: {mode}", file=sys.stderr) print(usage(), file=sys.stderr) return 2 @@ -561,11 +720,26 @@ def main(argv: list[str]) -> int: if mode == "claim": plans = flags.get("-plans") progress = flags.get("-progress") + owner = flags.get("-owner") if not plans or not progress: print("ERROR: -plans and -progress are required", file=sys.stderr) print(usage(), file=sys.stderr) return 2 - code, message = claim_plan(Path(plans), Path(progress)) + code, message = claim_plan(Path(plans), Path(progress), owner) + if code != 0: + print(message, file=sys.stderr) + return code + print(message) + return 0 + + if mode == "status": + plans = flags.get("-plans") + progress = flags.get("-progress") + if not plans or not progress: + print("ERROR: -plans and -progress are required", file=sys.stderr) + print(usage(), file=sys.stderr) + return 2 + code, message = status_report(Path(plans), Path(progress)) if code != 0: print(message, file=sys.stderr) return code @@ -596,11 +770,12 @@ def main(argv: list[str]) -> int: status = flags.get("-status") progress = flags.get("-progress") note = flags.get("-note") + verified = flags.get("-verified") if not plan or not status or not progress: print("ERROR: -plan, -status, and -progress are required", file=sys.stderr) print(usage(), file=sys.stderr) return 2 - code, message = finish_plan(plan, status, Path(progress), note) + code, message = finish_plan(plan, status, Path(progress), note, verified) if code != 0: print(message, file=sys.stderr) return code diff --git a/templates/AGENT_RULES.template.md b/templates/AGENT_RULES.template.md index de38b9d3..c7d1a1a8 100644 --- a/templates/AGENT_RULES.template.md +++ b/templates/AGENT_RULES.template.md @@ -86,6 +86,22 @@ 再更新 `progress.md` 上半部分摘要,并按主循环收尾要求归档当前 Plan 变更 +### 条件触发 Skills + +以下 skill 是额外能力,只在满足触发条件时使用,不改变主流程顺序, +也不由 `main_loop.py` 自动调度: + +| Skill | 触发条件 | 负责范围 | +| ------------------ | -------------------------------------- | -------------------------------------- | +| `codebase-recon` | 架构、跨模块、重构、迁移或风险不明任务 | 代码库侦察、热点分析、影响面判断 | +| `brooks-audit` | 方案影响架构边界、模块职责或长期维护性 | 架构审查、结构风险识别 | +| `codebase-migrate` | 大规模迁移、多文件重构、API 替换 | 分批迁移、可审查 refactor、CI 验证节奏 | +| `brooks-review` | 代码类 Plan 完成后,归档前需要审查 | diff / PR 级代码审查 | +| `brooks-test` | 测试改动复杂,或需要确认测试质量 | 测试有效性、覆盖边界、断言质量审查 | +| `gitea-fix-ci` | Gitea Actions 失败 | 拉取 CI 日志、定位失败、形成修复计划 | +| `style-cleanup` | 代码实现后需要格式或 lint 收尾 | 格式化、lint cleanup,不改变语义 | +| `commit-message` | 需要提交或归档当前 Plan 改动 | commit message 生成与 staged diff 检查 | + ### Plan 要求 - `Plan Meta` 必填,位于 Plan 头部 `---` 之后、Task 1 之前 @@ -135,16 +151,20 @@ ```bash python {{PLAYBOOK_SCRIPTS}}/main_loop.py claim \ -plans docs/superpowers/plans \ - -progress memory-bank/progress.md + -progress memory-bank/progress.md \ + -owner "<当前session或agent标识>" ``` 该命令会在锁保护下串行完成三件事: - 自动识别当前环境:`windows`、`linux`、`darwin` +- 校验 Plan 文件包含必需 `Plan Meta` - 已有 `in-progress` 优先恢复 - 如无 `in-progress`,按 Plan 文件顺序选择第一个可执行 Plan: `pending` 或 `blocked: env:<当前环境>:...` - 将选中的 Plan 写成 `in-progress` +- 在 `workflow-state` 写入 `claimed_by`、`claimed_at`,并清理上一轮 + `verification` 这里的锁保护的是 `progress.md` 状态块更新,避免多个 session 同时读写时发生覆盖。 @@ -176,7 +196,8 @@ python {{PLAYBOOK_SCRIPTS}}/playbook.py \ python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish \ -plan \ -status done \ - -progress memory-bank/progress.md + -progress memory-bank/progress.md \ + -verified "<本轮已通过的验证命令或证据>" ``` ```bash @@ -195,6 +216,17 @@ python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish \ -note "<原因>" ``` +只读状态命令: + +```bash +python {{PLAYBOOK_SCRIPTS}}/main_loop.py status \ + -plans docs/superpowers/plans \ + -progress memory-bank/progress.md +``` + +`status` 只汇总 `pending`、`in-progress`、`done`、`blocked`、 +`skipped` 和当前 `workflow-state`,不得修改 `progress.md`。 + ### Plan 场景下的执行上下文与隔离策略 - 本节仅适用于通过主循环领取并执行 Plan 的场景 @@ -235,11 +267,14 @@ python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish \ - Plan `done` 后必须完成当前 Plan 变更的归档/提交,然后才能继续领取下一个 Plan;归档方式由项目约定决定 - 收尾顺序: - 1. 完成 Plan 约定验证 - 2. 运行 `main_loop.py finish -status done` 写回状态 - 3. 必要时更新 `progress.md` 上半部分摘要和相关 memory - 4. 检查当前变更清单与差异 - 5. 只归档/提交当前 Plan 相关改动 + +1. 完成 Plan 约定验证 +2. 运行 `main_loop.py finish -status done -verified "<证据>"` + 写回状态 +3. 必要时更新 `progress.md` 上半部分摘要和相关 memory +4. 检查当前变更清单与差异 +5. 只归档/提交当前 Plan 相关改动 + - 当前 Plan 相关改动包括但不限于: - 本轮代码、配置、测试、模板改动 - 当前 Plan 文件(创建、补充、勾选 Task、记录结果等) diff --git a/tests/test_main_loop_cli.py b/tests/test_main_loop_cli.py index 0a27559d..f9bee94f 100644 --- a/tests/test_main_loop_cli.py +++ b/tests/test_main_loop_cli.py @@ -27,6 +27,26 @@ def run_cli(*args, cwd=None): ) +def valid_plan_text(title: str = "Demo Plan") -> str: + return "\n".join( + [ + f"# {title}", + "", + "## Plan Meta", + "", + "- **Plan Group**: `demo`", + "- **Parent Plan**: `none`", + "- **Verification Scope**: `unit`", + "- **Verification Gate**: `python -m unittest tests.test_main_loop_cli`", + "", + "## Tasks", + "", + "- [ ] Task 1: demo", + "", + ] + ) + + class MainLoopCliTests(unittest.TestCase): def _current_env(self) -> str: system = platform.system().lower() @@ -40,8 +60,12 @@ class MainLoopCliTests(unittest.TestCase): root = Path(tmp_dir) plans_dir = root / "docs" / "superpowers" / "plans" plans_dir.mkdir(parents=True) - (plans_dir / "2026-01-01-old.md").write_text("old", encoding="utf-8") - (plans_dir / "2026-01-02-new.md").write_text("new", encoding="utf-8") + (plans_dir / "2026-01-01-old.md").write_text( + valid_plan_text("old"), encoding="utf-8" + ) + (plans_dir / "2026-01-02-new.md").write_text( + valid_plan_text("new"), encoding="utf-8" + ) result = run_cli( "claim", @@ -74,7 +98,9 @@ class MainLoopCliTests(unittest.TestCase): root = Path(tmp_dir) plans_dir = root / "docs" / "superpowers" / "plans" plans_dir.mkdir(parents=True) - (plans_dir / "2026-01-01-demo.md").write_text("demo", encoding="utf-8") + (plans_dir / "2026-01-01-demo.md").write_text( + valid_plan_text(), encoding="utf-8" + ) progress = root / "memory-bank" / "progress.md" progress.parent.mkdir(parents=True) @@ -119,8 +145,12 @@ class MainLoopCliTests(unittest.TestCase): root = Path(tmp_dir) plans_dir = root / "docs" / "superpowers" / "plans" plans_dir.mkdir(parents=True) - (plans_dir / "2026-01-01-a.md").write_text("a", encoding="utf-8") - (plans_dir / "2026-01-02-b.md").write_text("b", encoding="utf-8") + (plans_dir / "2026-01-01-a.md").write_text( + valid_plan_text("a"), encoding="utf-8" + ) + (plans_dir / "2026-01-02-b.md").write_text( + valid_plan_text("b"), encoding="utf-8" + ) progress = root / "memory-bank" / "progress.md" progress.parent.mkdir(parents=True) @@ -159,7 +189,9 @@ class MainLoopCliTests(unittest.TestCase): root = Path(tmp_dir) plans_dir = root / "docs" / "superpowers" / "plans" plans_dir.mkdir(parents=True) - (plans_dir / "2026-01-02-live.md").write_text("live", encoding="utf-8") + (plans_dir / "2026-01-02-live.md").write_text( + valid_plan_text("live"), encoding="utf-8" + ) progress = root / "memory-bank" / "progress.md" progress.parent.mkdir(parents=True) @@ -202,7 +234,9 @@ class MainLoopCliTests(unittest.TestCase): root = Path(tmp_dir) plans_dir = root / "docs" / "superpowers" / "plans" plans_dir.mkdir(parents=True) - (plans_dir / "2026-01-05-env.md").write_text("env", encoding="utf-8") + (plans_dir / "2026-01-05-env.md").write_text( + valid_plan_text("env"), encoding="utf-8" + ) progress = root / "memory-bank" / "progress.md" progress.parent.mkdir(parents=True) @@ -250,9 +284,11 @@ class MainLoopCliTests(unittest.TestCase): root = Path(tmp_dir) plans_dir = root / "docs" / "superpowers" / "plans" plans_dir.mkdir(parents=True) - (plans_dir / "2026-01-01-env.md").write_text("env", encoding="utf-8") + (plans_dir / "2026-01-01-env.md").write_text( + valid_plan_text("env"), encoding="utf-8" + ) (plans_dir / "2026-01-02-pending.md").write_text( - "pending", encoding="utf-8" + valid_plan_text("pending"), encoding="utf-8" ) progress = root / "memory-bank" / "progress.md" @@ -294,6 +330,99 @@ class MainLoopCliTests(unittest.TestCase): ), ) + def test_claim_rejects_plan_missing_required_plan_meta(self): + with tempfile.TemporaryDirectory() as tmp_dir: + root = Path(tmp_dir) + plans_dir = root / "docs" / "superpowers" / "plans" + plans_dir.mkdir(parents=True) + (plans_dir / "2026-01-01-invalid.md").write_text( + "# Invalid Plan\n\n- [ ] Task 1: missing metadata\n", + encoding="utf-8", + ) + + result = run_cli( + "claim", + "-plans", + "docs/superpowers/plans", + "-progress", + "memory-bank/progress.md", + cwd=root, + ) + + self.assertEqual(result.returncode, 2) + self.assertIn("missing required Plan Meta", result.stderr) + self.assertIn("2026-01-01-invalid.md", result.stderr) + + def test_claim_records_claim_metadata_in_workflow_state(self): + with tempfile.TemporaryDirectory() as tmp_dir: + root = Path(tmp_dir) + plans_dir = root / "docs" / "superpowers" / "plans" + plans_dir.mkdir(parents=True) + (plans_dir / "2026-01-01-demo.md").write_text( + valid_plan_text(), encoding="utf-8" + ) + + result = run_cli( + "claim", + "-plans", + "docs/superpowers/plans", + "-progress", + "memory-bank/progress.md", + "-owner", + "codex-test", + cwd=root, + ) + + self.assertEqual(result.returncode, 0, msg=result.stderr) + text = (root / "memory-bank" / "progress.md").read_text( + encoding="utf-8" + ) + self.assertIn("claimed_by: codex-test", text) + self.assertRegex(text, r"claimed_at: \d{4}-\d{2}-\d{2}T") + + def test_claim_clears_stale_verification_from_workflow_state(self): + with tempfile.TemporaryDirectory() as tmp_dir: + root = Path(tmp_dir) + plans_dir = root / "docs" / "superpowers" / "plans" + plans_dir.mkdir(parents=True) + (plans_dir / "2026-01-01-demo.md").write_text( + valid_plan_text(), encoding="utf-8" + ) + + progress = root / "memory-bank" / "progress.md" + progress.parent.mkdir(parents=True) + progress.write_text( + "\n".join( + [ + "# 当前进展", + "", + "", + "phase: done", + "verification: old evidence", + "", + "", + "", + "- [ ] `2026-01-01-demo.md` pending", + "", + "", + ] + ), + encoding="utf-8", + ) + + result = run_cli( + "claim", + "-plans", + "docs/superpowers/plans", + "-progress", + "memory-bank/progress.md", + cwd=root, + ) + + self.assertEqual(result.returncode, 0, msg=result.stderr) + text = progress.read_text(encoding="utf-8") + self.assertNotIn("verification: old evidence", text) + def test_finish_updates_line(self): with tempfile.TemporaryDirectory() as tmp_dir: root = Path(tmp_dir) @@ -332,6 +461,98 @@ class MainLoopCliTests(unittest.TestCase): 1, ) + def test_finish_done_records_verification_evidence(self): + with tempfile.TemporaryDirectory() as tmp_dir: + root = Path(tmp_dir) + progress = root / "memory-bank" / "progress.md" + progress.parent.mkdir(parents=True) + progress.write_text( + "\n".join( + [ + "# Plan 状态", + "", + "", + "phase: executing", + "plan: docs/superpowers/plans/2026-01-03-demo.md", + "", + "", + "", + "- [ ] `2026-01-03-demo.md` in-progress", + "", + "", + ] + ), + encoding="utf-8", + ) + + result = run_cli( + "finish", + "-plan", + "docs/superpowers/plans/2026-01-03-demo.md", + "-status", + "done", + "-progress", + "memory-bank/progress.md", + "-verified", + "python -m unittest tests.test_main_loop_cli", + cwd=root, + ) + + self.assertEqual(result.returncode, 0, msg=result.stderr) + text = progress.read_text(encoding="utf-8") + self.assertIn( + "- [x] `2026-01-03-demo.md` done: " + "verified: python -m unittest tests.test_main_loop_cli", + text, + ) + self.assertIn( + "verification: python -m unittest tests.test_main_loop_cli", + text, + ) + + def test_finish_without_verified_clears_stale_verification(self): + with tempfile.TemporaryDirectory() as tmp_dir: + root = Path(tmp_dir) + progress = root / "memory-bank" / "progress.md" + progress.parent.mkdir(parents=True) + progress.write_text( + "\n".join( + [ + "# Plan 状态", + "", + "", + "phase: executing", + "plan: docs/superpowers/plans/2026-01-03-demo.md", + "verification: old evidence", + "", + "", + "", + "- [ ] `2026-01-03-demo.md` in-progress", + "", + "", + ] + ), + encoding="utf-8", + ) + + result = run_cli( + "finish", + "-plan", + "docs/superpowers/plans/2026-01-03-demo.md", + "-status", + "blocked", + "-progress", + "memory-bank/progress.md", + "-note", + "needs confirmation", + cwd=root, + ) + + self.assertEqual(result.returncode, 0, msg=result.stderr) + text = progress.read_text(encoding="utf-8") + self.assertNotIn("verification: old evidence", text) + self.assertIn("phase: blocked", text) + def test_finish_updates_workflow_phase_and_preserves_metadata(self): with tempfile.TemporaryDirectory() as tmp_dir: root = Path(tmp_dir) @@ -491,7 +712,9 @@ class MainLoopCliTests(unittest.TestCase): root = Path(tmp_dir) plans_dir = root / "docs" / "superpowers" / "plans" plans_dir.mkdir(parents=True) - (plans_dir / "2026-05-18-demo.md").write_text("demo", encoding="utf-8") + (plans_dir / "2026-05-18-demo.md").write_text( + valid_plan_text(), encoding="utf-8" + ) progress = root / "memory-bank" / "progress.md" progress.parent.mkdir(parents=True) @@ -561,6 +784,67 @@ class MainLoopCliTests(unittest.TestCase): ) self.assertIn("- [x] `2026-05-18-demo.md` done", text) + def test_status_reports_counts_and_current_workflow_state(self): + with tempfile.TemporaryDirectory() as tmp_dir: + root = Path(tmp_dir) + plans_dir = root / "docs" / "superpowers" / "plans" + plans_dir.mkdir(parents=True) + for plan_key in ( + "2026-01-01-a.md", + "2026-01-02-b.md", + "2026-01-03-c.md", + "2026-01-04-d.md", + "2026-01-05-e.md", + ): + (plans_dir / plan_key).write_text(valid_plan_text(), encoding="utf-8") + + progress = root / "memory-bank" / "progress.md" + progress.parent.mkdir(parents=True) + progress.write_text( + "\n".join( + [ + "# 当前进展", + "", + "", + "phase: executing", + "plan: docs/superpowers/plans/2026-01-02-b.md", + "claimed_by: codex-test", + "", + "", + "", + "- [ ] `2026-01-01-a.md` pending", + "- [ ] `2026-01-02-b.md` in-progress", + "- [x] `2026-01-03-c.md` done", + "- [ ] `2026-01-04-d.md` blocked: env:linux:Task2", + "- [ ] `2026-01-05-e.md` skipped: obsolete", + "", + "", + ] + ), + encoding="utf-8", + ) + + result = run_cli( + "status", + "-plans", + "docs/superpowers/plans", + "-progress", + "memory-bank/progress.md", + cwd=root, + ) + + self.assertEqual(result.returncode, 0, msg=result.stderr) + self.assertIn( + "STATUS total=5 pending=1 in-progress=1 done=1 blocked=1 skipped=1", + result.stdout, + ) + self.assertIn( + "CURRENT phase=executing " + "plan=docs/superpowers/plans/2026-01-02-b.md " + "claimed_by=codex-test", + result.stdout, + ) + def test_concurrent_record_preserves_spec_and_plan_metadata(self): with tempfile.TemporaryDirectory() as tmp_dir: root = Path(tmp_dir)