diff --git a/scripts/main_loop.py b/scripts/main_loop.py
index 084e4dc4..e8719987 100644
--- a/scripts/main_loop.py
+++ b/scripts/main_loop.py
@@ -1,8 +1,11 @@
#!/usr/bin/env python3
from contextlib import contextmanager
+from datetime import datetime, timezone
+import getpass
import os
import platform
import re
+import socket
import sys
import threading
import time
@@ -31,6 +34,22 @@ PLAN_LINE_RE = re.compile(
)
FINISH_STATUSES = {"done", "blocked", "skipped"}
ENV_BLOCKED_RE = re.compile(r"^env:([^:]+):(.+)$")
+PLAN_META_REQUIRED_FIELDS = (
+ "Plan Group",
+ "Parent Plan",
+ "Verification Scope",
+ "Verification Gate",
+)
+WORKFLOW_STATE_KEYS = {
+ "phase",
+ "spec",
+ "plan",
+ "executor",
+ "constraints",
+ "claimed_by",
+ "claimed_at",
+ "verification",
+}
WORKFLOW_PHASES = {
"brainstorming",
"planning",
@@ -48,7 +67,8 @@ def usage() -> str:
"Usage:\n"
" python scripts/main_loop.py claim -plans
-progress \n"
" python scripts/main_loop.py finish -plan -status "
- "-progress [-note ]\n"
+ "-progress [-note ] [-verified ]\n"
+ " python scripts/main_loop.py status -plans -progress \n"
" python scripts/main_loop.py record -progress -phase "
"[-spec ] [-plan ] [-executor ] "
"[-constraints ]\n"
@@ -63,6 +83,8 @@ def usage() -> str:
" -executor NAME\n"
" -constraints CSV\n"
" -note TEXT\n"
+ " -owner NAME\n"
+ " -verified TEXT\n"
" -h, -help Show this help.\n"
)
@@ -98,6 +120,24 @@ def normalize_note(note: str) -> str:
return note.replace("\n", " ").replace("\r", " ").replace("`", "'").strip()
+def now_utc() -> str:
+ return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace(
+ "+00:00", "Z"
+ )
+
+
+def default_claim_owner() -> str:
+ owner = os.environ.get("PLAYBOOK_MAIN_LOOP_OWNER")
+ if owner:
+ return normalize_note(owner)
+ host = socket.gethostname() or "unknown-host"
+ try:
+ user = getpass.getuser()
+ except Exception: # pragma: no cover
+ user = "unknown-user"
+ return f"{user}@{host}:{os.getpid()}"
+
+
def render_plan_line(plan_key: str, status: str, note: Optional[str]) -> str:
checked = "x" if status == "done" else " "
suffix = status
@@ -188,18 +228,23 @@ def render_workflow_state_lines(
plan: Optional[str] = None,
executor: Optional[str] = None,
constraints: Optional[str] = None,
+ claimed_by: Optional[str] = None,
+ claimed_at: Optional[str] = None,
+ verification: Optional[str] = None,
) -> list[str]:
lines = [WORKFLOW_STATE_START]
- if phase:
- lines.append(f"phase: {phase}")
- if spec:
- lines.append(f"spec: {spec}")
- if plan:
- lines.append(f"plan: {plan}")
- if executor:
- lines.append(f"executor: {executor}")
- if constraints:
- lines.append(f"constraints: {constraints}")
+ for key, value in (
+ ("phase", phase),
+ ("spec", spec),
+ ("plan", plan),
+ ("executor", executor),
+ ("constraints", constraints),
+ ("claimed_by", claimed_by),
+ ("claimed_at", claimed_at),
+ ("verification", verification),
+ ):
+ if value:
+ lines.append(f"{key}: {value}")
lines.append(WORKFLOW_STATE_END)
return lines
@@ -213,7 +258,7 @@ def parse_workflow_state(
if ": " not in line:
continue
key, value = line.split(": ", 1)
- if key in {"phase", "spec", "plan", "executor", "constraints"}:
+ if key in WORKFLOW_STATE_KEYS:
state[key] = value
return state
@@ -348,9 +393,15 @@ def update_workflow_state(
plan: Optional[str] = None,
executor: Optional[str] = None,
constraints: Optional[str] = None,
+ claimed_by: Optional[str] = None,
+ claimed_at: Optional[str] = None,
+ verification: Optional[str] = None,
+ clear_keys: tuple[str, ...] = (),
) -> list[str]:
lines, start_idx, end_idx = ensure_workflow_state_block(lines)
state = parse_workflow_state(lines, start_idx, end_idx)
+ for key in clear_keys:
+ state.pop(key, None)
if phase is not None:
state["phase"] = phase
if spec is not None:
@@ -361,12 +412,21 @@ def update_workflow_state(
state["executor"] = executor
if constraints is not None:
state["constraints"] = constraints
+ if claimed_by is not None:
+ state["claimed_by"] = claimed_by
+ if claimed_at is not None:
+ state["claimed_at"] = claimed_at
+ if verification is not None:
+ state["verification"] = verification
lines[start_idx : end_idx + 1] = render_workflow_state_lines(
state.get("phase"),
state.get("spec"),
state.get("plan"),
state.get("executor"),
state.get("constraints"),
+ state.get("claimed_by"),
+ state.get("claimed_at"),
+ state.get("verification"),
)
return lines
@@ -399,6 +459,27 @@ def filter_existing_entries(
return [entry for entry in entries if entry[0] in available]
+def validate_plan_meta(plan_path: Path) -> list[str]:
+ text = plan_path.read_text(encoding="utf-8")
+ missing: list[str] = []
+ if not re.search(r"(?im)^##\s+Plan Meta\s*$", text):
+ missing.append("Plan Meta")
+ for field in PLAN_META_REQUIRED_FIELDS:
+ pattern = rf"(?im)^\s*[-*]\s+(?:\*\*)?{re.escape(field)}(?:\*\*)?\s*:"
+ if not re.search(pattern, text):
+ missing.append(field)
+ return missing
+
+
+def validate_plan_files(plans_dir: Path, plan_keys: list[str]) -> Optional[str]:
+ for plan_key in plan_keys:
+ missing = validate_plan_meta(plans_dir / plan_key)
+ if missing:
+ fields = ", ".join(missing)
+ return f"ERROR: {plan_key} missing required Plan Meta fields: {fields}"
+ return None
+
+
def choose_claim_entry(
entries: list[tuple[str, str, Optional[str], int]],
current_env: Optional[str],
@@ -427,13 +508,18 @@ def choose_claim_entry(
return None
-def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
+def claim_plan(
+ plans_dir: Path, progress_path: Path, owner: Optional[str] = None
+) -> tuple[int, str]:
if not plans_dir.is_dir():
return 2, f"ERROR: plans dir not found: {plans_dir}"
plan_keys = list_plan_files(plans_dir)
if not plan_keys:
return 2, "ERROR: no plan files found"
+ plan_error = validate_plan_files(plans_dir, plan_keys)
+ if plan_error:
+ return 2, plan_error
with locked_progress(progress_path):
lines = load_progress_lines(progress_path)
@@ -458,6 +544,9 @@ def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
lines,
phase="executing",
plan=(plans_dir / plan_key).as_posix(),
+ claimed_by=normalize_note(owner) if owner else default_claim_owner(),
+ claimed_at=now_utc(),
+ clear_keys=("verification",),
)
write_progress_lines(progress_path, lines)
@@ -468,12 +557,18 @@ def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
def finish_plan(
- plan: str, status: str, progress_path: Path, note: Optional[str]
+ plan: str,
+ status: str,
+ progress_path: Path,
+ note: Optional[str],
+ verified: Optional[str] = None,
) -> tuple[int, str]:
if status not in FINISH_STATUSES:
return 2, f"ERROR: invalid status: {status}"
if not plan:
return 2, "ERROR: plan is required"
+ if verified and status != "done":
+ return 2, "ERROR: -verified is only valid with -status done"
plan_key = normalize_plan_key(plan)
with locked_progress(progress_path):
@@ -488,6 +583,15 @@ def finish_plan(
entries = parse_entries(lines, start_idx, end_idx)
rendered_note = normalize_note(note) if note else None
+ rendered_verified = normalize_note(verified) if verified else None
+ verification_clear_keys = () if rendered_verified else ("verification",)
+ if rendered_verified:
+ verified_note = f"verified: {rendered_verified}"
+ rendered_note = (
+ f"{verified_note}; note: {rendered_note}"
+ if rendered_note
+ else verified_note
+ )
updated_line = render_plan_line(plan_key, status, rendered_note)
workflow_phase = {
"done": "done",
@@ -502,6 +606,8 @@ def finish_plan(
lines,
phase=workflow_phase,
plan=f"docs/superpowers/plans/{plan_key}",
+ verification=rendered_verified,
+ clear_keys=verification_clear_keys,
)
write_progress_lines(progress_path, lines)
return 0, updated_line
@@ -511,11 +617,64 @@ def finish_plan(
lines,
phase=workflow_phase,
plan=f"docs/superpowers/plans/{plan_key}",
+ verification=rendered_verified,
+ clear_keys=verification_clear_keys,
)
write_progress_lines(progress_path, lines)
return 0, updated_line
+def status_report(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
+ if not plans_dir.is_dir():
+ return 2, f"ERROR: plans dir not found: {plans_dir}"
+
+ plan_keys = list_plan_files(plans_dir)
+ lines = load_progress_lines(progress_path)
+ block = find_block(lines)
+ entries: list[tuple[str, str, Optional[str], int]] = []
+ if block:
+ entries = filter_existing_entries(parse_entries(lines, *block), plan_keys)
+
+ entry_by_plan = {plan_key: (status, note) for plan_key, status, note, _ in entries}
+ counts = {status: 0 for status in ("pending", "in-progress", "done", "blocked", "skipped")}
+ rows: list[str] = []
+ for plan_key in plan_keys:
+ status, note = entry_by_plan.get(plan_key, ("pending", None))
+ counts[status] += 1
+ suffix = f": {note}" if note else ""
+ rows.append(f"PLAN {plan_key} {status}{suffix}")
+
+ state: dict[str, str] = {}
+ workflow_block = find_named_block(lines, WORKFLOW_STATE_START, WORKFLOW_STATE_END)
+ if workflow_block:
+ state = parse_workflow_state(lines, *workflow_block)
+
+ output = [
+ "STATUS "
+ f"total={len(plan_keys)} "
+ f"pending={counts['pending']} "
+ f"in-progress={counts['in-progress']} "
+ f"done={counts['done']} "
+ f"blocked={counts['blocked']} "
+ f"skipped={counts['skipped']}"
+ ]
+ current_parts = [
+ f"{key}={state[key]}"
+ for key in (
+ "phase",
+ "plan",
+ "claimed_by",
+ "claimed_at",
+ "verification",
+ )
+ if key in state
+ ]
+ if current_parts:
+ output.append("CURRENT " + " ".join(current_parts))
+ output.extend(rows)
+ return 0, "\n".join(output)
+
+
def record_workflow_state(
progress_path: Path,
phase: str,
@@ -543,7 +702,7 @@ def main(argv: list[str]) -> int:
return 0
mode = argv[0]
- if mode not in {"claim", "finish", "record"}:
+ if mode not in {"claim", "finish", "record", "status"}:
print(f"ERROR: unknown mode: {mode}", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
@@ -561,11 +720,26 @@ def main(argv: list[str]) -> int:
if mode == "claim":
plans = flags.get("-plans")
progress = flags.get("-progress")
+ owner = flags.get("-owner")
if not plans or not progress:
print("ERROR: -plans and -progress are required", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
- code, message = claim_plan(Path(plans), Path(progress))
+ code, message = claim_plan(Path(plans), Path(progress), owner)
+ if code != 0:
+ print(message, file=sys.stderr)
+ return code
+ print(message)
+ return 0
+
+ if mode == "status":
+ plans = flags.get("-plans")
+ progress = flags.get("-progress")
+ if not plans or not progress:
+ print("ERROR: -plans and -progress are required", file=sys.stderr)
+ print(usage(), file=sys.stderr)
+ return 2
+ code, message = status_report(Path(plans), Path(progress))
if code != 0:
print(message, file=sys.stderr)
return code
@@ -596,11 +770,12 @@ def main(argv: list[str]) -> int:
status = flags.get("-status")
progress = flags.get("-progress")
note = flags.get("-note")
+ verified = flags.get("-verified")
if not plan or not status or not progress:
print("ERROR: -plan, -status, and -progress are required", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
- code, message = finish_plan(plan, status, Path(progress), note)
+ code, message = finish_plan(plan, status, Path(progress), note, verified)
if code != 0:
print(message, file=sys.stderr)
return code
diff --git a/templates/AGENT_RULES.template.md b/templates/AGENT_RULES.template.md
index de38b9d3..c7d1a1a8 100644
--- a/templates/AGENT_RULES.template.md
+++ b/templates/AGENT_RULES.template.md
@@ -86,6 +86,22 @@
再更新 `progress.md` 上半部分摘要,并按主循环收尾要求归档当前
Plan 变更
+### 条件触发 Skills
+
+以下 skill 是额外能力,只在满足触发条件时使用,不改变主流程顺序,
+也不由 `main_loop.py` 自动调度:
+
+| Skill | 触发条件 | 负责范围 |
+| ------------------ | -------------------------------------- | -------------------------------------- |
+| `codebase-recon` | 架构、跨模块、重构、迁移或风险不明任务 | 代码库侦察、热点分析、影响面判断 |
+| `brooks-audit` | 方案影响架构边界、模块职责或长期维护性 | 架构审查、结构风险识别 |
+| `codebase-migrate` | 大规模迁移、多文件重构、API 替换 | 分批迁移、可审查 refactor、CI 验证节奏 |
+| `brooks-review` | 代码类 Plan 完成后,归档前需要审查 | diff / PR 级代码审查 |
+| `brooks-test` | 测试改动复杂,或需要确认测试质量 | 测试有效性、覆盖边界、断言质量审查 |
+| `gitea-fix-ci` | Gitea Actions 失败 | 拉取 CI 日志、定位失败、形成修复计划 |
+| `style-cleanup` | 代码实现后需要格式或 lint 收尾 | 格式化、lint cleanup,不改变语义 |
+| `commit-message` | 需要提交或归档当前 Plan 改动 | commit message 生成与 staged diff 检查 |
+
### Plan 要求
- `Plan Meta` 必填,位于 Plan 头部 `---` 之后、Task 1 之前
@@ -135,16 +151,20 @@
```bash
python {{PLAYBOOK_SCRIPTS}}/main_loop.py claim \
-plans docs/superpowers/plans \
- -progress memory-bank/progress.md
+ -progress memory-bank/progress.md \
+ -owner "<当前session或agent标识>"
```
该命令会在锁保护下串行完成三件事:
- 自动识别当前环境:`windows`、`linux`、`darwin`
+- 校验 Plan 文件包含必需 `Plan Meta`
- 已有 `in-progress` 优先恢复
- 如无 `in-progress`,按 Plan 文件顺序选择第一个可执行 Plan:
`pending` 或 `blocked: env:<当前环境>:...`
- 将选中的 Plan 写成 `in-progress`
+- 在 `workflow-state` 写入 `claimed_by`、`claimed_at`,并清理上一轮
+ `verification`
这里的锁保护的是 `progress.md` 状态块更新,避免多个 session
同时读写时发生覆盖。
@@ -176,7 +196,8 @@ python {{PLAYBOOK_SCRIPTS}}/playbook.py \
python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish \
-plan \
-status done \
- -progress memory-bank/progress.md
+ -progress memory-bank/progress.md \
+ -verified "<本轮已通过的验证命令或证据>"
```
```bash
@@ -195,6 +216,17 @@ python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish \
-note "<原因>"
```
+只读状态命令:
+
+```bash
+python {{PLAYBOOK_SCRIPTS}}/main_loop.py status \
+ -plans docs/superpowers/plans \
+ -progress memory-bank/progress.md
+```
+
+`status` 只汇总 `pending`、`in-progress`、`done`、`blocked`、
+`skipped` 和当前 `workflow-state`,不得修改 `progress.md`。
+
### Plan 场景下的执行上下文与隔离策略
- 本节仅适用于通过主循环领取并执行 Plan 的场景
@@ -235,11 +267,14 @@ python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish \
- Plan `done` 后必须完成当前 Plan 变更的归档/提交,然后才能继续领取下一个
Plan;归档方式由项目约定决定
- 收尾顺序:
- 1. 完成 Plan 约定验证
- 2. 运行 `main_loop.py finish -status done` 写回状态
- 3. 必要时更新 `progress.md` 上半部分摘要和相关 memory
- 4. 检查当前变更清单与差异
- 5. 只归档/提交当前 Plan 相关改动
+
+1. 完成 Plan 约定验证
+2. 运行 `main_loop.py finish -status done -verified "<证据>"`
+ 写回状态
+3. 必要时更新 `progress.md` 上半部分摘要和相关 memory
+4. 检查当前变更清单与差异
+5. 只归档/提交当前 Plan 相关改动
+
- 当前 Plan 相关改动包括但不限于:
- 本轮代码、配置、测试、模板改动
- 当前 Plan 文件(创建、补充、勾选 Task、记录结果等)
diff --git a/tests/test_main_loop_cli.py b/tests/test_main_loop_cli.py
index 0a27559d..f9bee94f 100644
--- a/tests/test_main_loop_cli.py
+++ b/tests/test_main_loop_cli.py
@@ -27,6 +27,26 @@ def run_cli(*args, cwd=None):
)
+def valid_plan_text(title: str = "Demo Plan") -> str:
+ return "\n".join(
+ [
+ f"# {title}",
+ "",
+ "## Plan Meta",
+ "",
+ "- **Plan Group**: `demo`",
+ "- **Parent Plan**: `none`",
+ "- **Verification Scope**: `unit`",
+ "- **Verification Gate**: `python -m unittest tests.test_main_loop_cli`",
+ "",
+ "## Tasks",
+ "",
+ "- [ ] Task 1: demo",
+ "",
+ ]
+ )
+
+
class MainLoopCliTests(unittest.TestCase):
def _current_env(self) -> str:
system = platform.system().lower()
@@ -40,8 +60,12 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
- (plans_dir / "2026-01-01-old.md").write_text("old", encoding="utf-8")
- (plans_dir / "2026-01-02-new.md").write_text("new", encoding="utf-8")
+ (plans_dir / "2026-01-01-old.md").write_text(
+ valid_plan_text("old"), encoding="utf-8"
+ )
+ (plans_dir / "2026-01-02-new.md").write_text(
+ valid_plan_text("new"), encoding="utf-8"
+ )
result = run_cli(
"claim",
@@ -74,7 +98,9 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
- (plans_dir / "2026-01-01-demo.md").write_text("demo", encoding="utf-8")
+ (plans_dir / "2026-01-01-demo.md").write_text(
+ valid_plan_text(), encoding="utf-8"
+ )
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
@@ -119,8 +145,12 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
- (plans_dir / "2026-01-01-a.md").write_text("a", encoding="utf-8")
- (plans_dir / "2026-01-02-b.md").write_text("b", encoding="utf-8")
+ (plans_dir / "2026-01-01-a.md").write_text(
+ valid_plan_text("a"), encoding="utf-8"
+ )
+ (plans_dir / "2026-01-02-b.md").write_text(
+ valid_plan_text("b"), encoding="utf-8"
+ )
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
@@ -159,7 +189,9 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
- (plans_dir / "2026-01-02-live.md").write_text("live", encoding="utf-8")
+ (plans_dir / "2026-01-02-live.md").write_text(
+ valid_plan_text("live"), encoding="utf-8"
+ )
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
@@ -202,7 +234,9 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
- (plans_dir / "2026-01-05-env.md").write_text("env", encoding="utf-8")
+ (plans_dir / "2026-01-05-env.md").write_text(
+ valid_plan_text("env"), encoding="utf-8"
+ )
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
@@ -250,9 +284,11 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
- (plans_dir / "2026-01-01-env.md").write_text("env", encoding="utf-8")
+ (plans_dir / "2026-01-01-env.md").write_text(
+ valid_plan_text("env"), encoding="utf-8"
+ )
(plans_dir / "2026-01-02-pending.md").write_text(
- "pending", encoding="utf-8"
+ valid_plan_text("pending"), encoding="utf-8"
)
progress = root / "memory-bank" / "progress.md"
@@ -294,6 +330,99 @@ class MainLoopCliTests(unittest.TestCase):
),
)
+ def test_claim_rejects_plan_missing_required_plan_meta(self):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ root = Path(tmp_dir)
+ plans_dir = root / "docs" / "superpowers" / "plans"
+ plans_dir.mkdir(parents=True)
+ (plans_dir / "2026-01-01-invalid.md").write_text(
+ "# Invalid Plan\n\n- [ ] Task 1: missing metadata\n",
+ encoding="utf-8",
+ )
+
+ result = run_cli(
+ "claim",
+ "-plans",
+ "docs/superpowers/plans",
+ "-progress",
+ "memory-bank/progress.md",
+ cwd=root,
+ )
+
+ self.assertEqual(result.returncode, 2)
+ self.assertIn("missing required Plan Meta", result.stderr)
+ self.assertIn("2026-01-01-invalid.md", result.stderr)
+
+ def test_claim_records_claim_metadata_in_workflow_state(self):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ root = Path(tmp_dir)
+ plans_dir = root / "docs" / "superpowers" / "plans"
+ plans_dir.mkdir(parents=True)
+ (plans_dir / "2026-01-01-demo.md").write_text(
+ valid_plan_text(), encoding="utf-8"
+ )
+
+ result = run_cli(
+ "claim",
+ "-plans",
+ "docs/superpowers/plans",
+ "-progress",
+ "memory-bank/progress.md",
+ "-owner",
+ "codex-test",
+ cwd=root,
+ )
+
+ self.assertEqual(result.returncode, 0, msg=result.stderr)
+ text = (root / "memory-bank" / "progress.md").read_text(
+ encoding="utf-8"
+ )
+ self.assertIn("claimed_by: codex-test", text)
+ self.assertRegex(text, r"claimed_at: \d{4}-\d{2}-\d{2}T")
+
+ def test_claim_clears_stale_verification_from_workflow_state(self):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ root = Path(tmp_dir)
+ plans_dir = root / "docs" / "superpowers" / "plans"
+ plans_dir.mkdir(parents=True)
+ (plans_dir / "2026-01-01-demo.md").write_text(
+ valid_plan_text(), encoding="utf-8"
+ )
+
+ progress = root / "memory-bank" / "progress.md"
+ progress.parent.mkdir(parents=True)
+ progress.write_text(
+ "\n".join(
+ [
+ "# 当前进展",
+ "",
+ "",
+ "phase: done",
+ "verification: old evidence",
+ "",
+ "",
+ "",
+ "- [ ] `2026-01-01-demo.md` pending",
+ "",
+ "",
+ ]
+ ),
+ encoding="utf-8",
+ )
+
+ result = run_cli(
+ "claim",
+ "-plans",
+ "docs/superpowers/plans",
+ "-progress",
+ "memory-bank/progress.md",
+ cwd=root,
+ )
+
+ self.assertEqual(result.returncode, 0, msg=result.stderr)
+ text = progress.read_text(encoding="utf-8")
+ self.assertNotIn("verification: old evidence", text)
+
def test_finish_updates_line(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
@@ -332,6 +461,98 @@ class MainLoopCliTests(unittest.TestCase):
1,
)
+ def test_finish_done_records_verification_evidence(self):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ root = Path(tmp_dir)
+ progress = root / "memory-bank" / "progress.md"
+ progress.parent.mkdir(parents=True)
+ progress.write_text(
+ "\n".join(
+ [
+ "# Plan 状态",
+ "",
+ "",
+ "phase: executing",
+ "plan: docs/superpowers/plans/2026-01-03-demo.md",
+ "",
+ "",
+ "",
+ "- [ ] `2026-01-03-demo.md` in-progress",
+ "",
+ "",
+ ]
+ ),
+ encoding="utf-8",
+ )
+
+ result = run_cli(
+ "finish",
+ "-plan",
+ "docs/superpowers/plans/2026-01-03-demo.md",
+ "-status",
+ "done",
+ "-progress",
+ "memory-bank/progress.md",
+ "-verified",
+ "python -m unittest tests.test_main_loop_cli",
+ cwd=root,
+ )
+
+ self.assertEqual(result.returncode, 0, msg=result.stderr)
+ text = progress.read_text(encoding="utf-8")
+ self.assertIn(
+ "- [x] `2026-01-03-demo.md` done: "
+ "verified: python -m unittest tests.test_main_loop_cli",
+ text,
+ )
+ self.assertIn(
+ "verification: python -m unittest tests.test_main_loop_cli",
+ text,
+ )
+
+ def test_finish_without_verified_clears_stale_verification(self):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ root = Path(tmp_dir)
+ progress = root / "memory-bank" / "progress.md"
+ progress.parent.mkdir(parents=True)
+ progress.write_text(
+ "\n".join(
+ [
+ "# Plan 状态",
+ "",
+ "",
+ "phase: executing",
+ "plan: docs/superpowers/plans/2026-01-03-demo.md",
+ "verification: old evidence",
+ "",
+ "",
+ "",
+ "- [ ] `2026-01-03-demo.md` in-progress",
+ "",
+ "",
+ ]
+ ),
+ encoding="utf-8",
+ )
+
+ result = run_cli(
+ "finish",
+ "-plan",
+ "docs/superpowers/plans/2026-01-03-demo.md",
+ "-status",
+ "blocked",
+ "-progress",
+ "memory-bank/progress.md",
+ "-note",
+ "needs confirmation",
+ cwd=root,
+ )
+
+ self.assertEqual(result.returncode, 0, msg=result.stderr)
+ text = progress.read_text(encoding="utf-8")
+ self.assertNotIn("verification: old evidence", text)
+ self.assertIn("phase: blocked", text)
+
def test_finish_updates_workflow_phase_and_preserves_metadata(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
@@ -491,7 +712,9 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
- (plans_dir / "2026-05-18-demo.md").write_text("demo", encoding="utf-8")
+ (plans_dir / "2026-05-18-demo.md").write_text(
+ valid_plan_text(), encoding="utf-8"
+ )
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
@@ -561,6 +784,67 @@ class MainLoopCliTests(unittest.TestCase):
)
self.assertIn("- [x] `2026-05-18-demo.md` done", text)
+ def test_status_reports_counts_and_current_workflow_state(self):
+ with tempfile.TemporaryDirectory() as tmp_dir:
+ root = Path(tmp_dir)
+ plans_dir = root / "docs" / "superpowers" / "plans"
+ plans_dir.mkdir(parents=True)
+ for plan_key in (
+ "2026-01-01-a.md",
+ "2026-01-02-b.md",
+ "2026-01-03-c.md",
+ "2026-01-04-d.md",
+ "2026-01-05-e.md",
+ ):
+ (plans_dir / plan_key).write_text(valid_plan_text(), encoding="utf-8")
+
+ progress = root / "memory-bank" / "progress.md"
+ progress.parent.mkdir(parents=True)
+ progress.write_text(
+ "\n".join(
+ [
+ "# 当前进展",
+ "",
+ "",
+ "phase: executing",
+ "plan: docs/superpowers/plans/2026-01-02-b.md",
+ "claimed_by: codex-test",
+ "",
+ "",
+ "",
+ "- [ ] `2026-01-01-a.md` pending",
+ "- [ ] `2026-01-02-b.md` in-progress",
+ "- [x] `2026-01-03-c.md` done",
+ "- [ ] `2026-01-04-d.md` blocked: env:linux:Task2",
+ "- [ ] `2026-01-05-e.md` skipped: obsolete",
+ "",
+ "",
+ ]
+ ),
+ encoding="utf-8",
+ )
+
+ result = run_cli(
+ "status",
+ "-plans",
+ "docs/superpowers/plans",
+ "-progress",
+ "memory-bank/progress.md",
+ cwd=root,
+ )
+
+ self.assertEqual(result.returncode, 0, msg=result.stderr)
+ self.assertIn(
+ "STATUS total=5 pending=1 in-progress=1 done=1 blocked=1 skipped=1",
+ result.stdout,
+ )
+ self.assertIn(
+ "CURRENT phase=executing "
+ "plan=docs/superpowers/plans/2026-01-02-b.md "
+ "claimed_by=codex-test",
+ result.stdout,
+ )
+
def test_concurrent_record_preserves_spec_and_plan_metadata(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)