feat(main_loop): harden plan execution state

Validate required Plan Meta before claims, record claim ownership, and persist verification evidence on completion.

Document conditional skill triggers without adding them to automatic main-loop dispatch.
This commit is contained in:
csh 2026-05-29 15:42:03 +08:00
parent 3023aef8a0
commit eaa061fd2b
3 changed files with 528 additions and 34 deletions

View File

@ -1,8 +1,11 @@
#!/usr/bin/env python3
from contextlib import contextmanager
from datetime import datetime, timezone
import getpass
import os
import platform
import re
import socket
import sys
import threading
import time
@ -31,6 +34,22 @@ PLAN_LINE_RE = re.compile(
)
FINISH_STATUSES = {"done", "blocked", "skipped"}
ENV_BLOCKED_RE = re.compile(r"^env:([^:]+):(.+)$")
PLAN_META_REQUIRED_FIELDS = (
"Plan Group",
"Parent Plan",
"Verification Scope",
"Verification Gate",
)
WORKFLOW_STATE_KEYS = {
"phase",
"spec",
"plan",
"executor",
"constraints",
"claimed_by",
"claimed_at",
"verification",
}
WORKFLOW_PHASES = {
"brainstorming",
"planning",
@ -48,7 +67,8 @@ def usage() -> str:
"Usage:\n"
" python scripts/main_loop.py claim -plans <dir> -progress <file>\n"
" python scripts/main_loop.py finish -plan <path> -status <status> "
"-progress <file> [-note <text>]\n"
"-progress <file> [-note <text>] [-verified <text>]\n"
" python scripts/main_loop.py status -plans <dir> -progress <file>\n"
" python scripts/main_loop.py record -progress <file> -phase <phase> "
"[-spec <path>] [-plan <path>] [-executor <name>] "
"[-constraints <csv>]\n"
@ -63,6 +83,8 @@ def usage() -> str:
" -executor NAME\n"
" -constraints CSV\n"
" -note TEXT\n"
" -owner NAME\n"
" -verified TEXT\n"
" -h, -help Show this help.\n"
)
@ -98,6 +120,24 @@ def normalize_note(note: str) -> str:
return note.replace("\n", " ").replace("\r", " ").replace("`", "'").strip()
def now_utc() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace(
"+00:00", "Z"
)
def default_claim_owner() -> str:
owner = os.environ.get("PLAYBOOK_MAIN_LOOP_OWNER")
if owner:
return normalize_note(owner)
host = socket.gethostname() or "unknown-host"
try:
user = getpass.getuser()
except Exception: # pragma: no cover
user = "unknown-user"
return f"{user}@{host}:{os.getpid()}"
def render_plan_line(plan_key: str, status: str, note: Optional[str]) -> str:
checked = "x" if status == "done" else " "
suffix = status
@ -188,18 +228,23 @@ def render_workflow_state_lines(
plan: Optional[str] = None,
executor: Optional[str] = None,
constraints: Optional[str] = None,
claimed_by: Optional[str] = None,
claimed_at: Optional[str] = None,
verification: Optional[str] = None,
) -> list[str]:
lines = [WORKFLOW_STATE_START]
if phase:
lines.append(f"phase: {phase}")
if spec:
lines.append(f"spec: {spec}")
if plan:
lines.append(f"plan: {plan}")
if executor:
lines.append(f"executor: {executor}")
if constraints:
lines.append(f"constraints: {constraints}")
for key, value in (
("phase", phase),
("spec", spec),
("plan", plan),
("executor", executor),
("constraints", constraints),
("claimed_by", claimed_by),
("claimed_at", claimed_at),
("verification", verification),
):
if value:
lines.append(f"{key}: {value}")
lines.append(WORKFLOW_STATE_END)
return lines
@ -213,7 +258,7 @@ def parse_workflow_state(
if ": " not in line:
continue
key, value = line.split(": ", 1)
if key in {"phase", "spec", "plan", "executor", "constraints"}:
if key in WORKFLOW_STATE_KEYS:
state[key] = value
return state
@ -348,9 +393,15 @@ def update_workflow_state(
plan: Optional[str] = None,
executor: Optional[str] = None,
constraints: Optional[str] = None,
claimed_by: Optional[str] = None,
claimed_at: Optional[str] = None,
verification: Optional[str] = None,
clear_keys: tuple[str, ...] = (),
) -> list[str]:
lines, start_idx, end_idx = ensure_workflow_state_block(lines)
state = parse_workflow_state(lines, start_idx, end_idx)
for key in clear_keys:
state.pop(key, None)
if phase is not None:
state["phase"] = phase
if spec is not None:
@ -361,12 +412,21 @@ def update_workflow_state(
state["executor"] = executor
if constraints is not None:
state["constraints"] = constraints
if claimed_by is not None:
state["claimed_by"] = claimed_by
if claimed_at is not None:
state["claimed_at"] = claimed_at
if verification is not None:
state["verification"] = verification
lines[start_idx : end_idx + 1] = render_workflow_state_lines(
state.get("phase"),
state.get("spec"),
state.get("plan"),
state.get("executor"),
state.get("constraints"),
state.get("claimed_by"),
state.get("claimed_at"),
state.get("verification"),
)
return lines
@ -399,6 +459,27 @@ def filter_existing_entries(
return [entry for entry in entries if entry[0] in available]
def validate_plan_meta(plan_path: Path) -> list[str]:
text = plan_path.read_text(encoding="utf-8")
missing: list[str] = []
if not re.search(r"(?im)^##\s+Plan Meta\s*$", text):
missing.append("Plan Meta")
for field in PLAN_META_REQUIRED_FIELDS:
pattern = rf"(?im)^\s*[-*]\s+(?:\*\*)?{re.escape(field)}(?:\*\*)?\s*:"
if not re.search(pattern, text):
missing.append(field)
return missing
def validate_plan_files(plans_dir: Path, plan_keys: list[str]) -> Optional[str]:
for plan_key in plan_keys:
missing = validate_plan_meta(plans_dir / plan_key)
if missing:
fields = ", ".join(missing)
return f"ERROR: {plan_key} missing required Plan Meta fields: {fields}"
return None
def choose_claim_entry(
entries: list[tuple[str, str, Optional[str], int]],
current_env: Optional[str],
@ -427,13 +508,18 @@ def choose_claim_entry(
return None
def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
def claim_plan(
plans_dir: Path, progress_path: Path, owner: Optional[str] = None
) -> tuple[int, str]:
if not plans_dir.is_dir():
return 2, f"ERROR: plans dir not found: {plans_dir}"
plan_keys = list_plan_files(plans_dir)
if not plan_keys:
return 2, "ERROR: no plan files found"
plan_error = validate_plan_files(plans_dir, plan_keys)
if plan_error:
return 2, plan_error
with locked_progress(progress_path):
lines = load_progress_lines(progress_path)
@ -458,6 +544,9 @@ def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
lines,
phase="executing",
plan=(plans_dir / plan_key).as_posix(),
claimed_by=normalize_note(owner) if owner else default_claim_owner(),
claimed_at=now_utc(),
clear_keys=("verification",),
)
write_progress_lines(progress_path, lines)
@ -468,12 +557,18 @@ def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
def finish_plan(
plan: str, status: str, progress_path: Path, note: Optional[str]
plan: str,
status: str,
progress_path: Path,
note: Optional[str],
verified: Optional[str] = None,
) -> tuple[int, str]:
if status not in FINISH_STATUSES:
return 2, f"ERROR: invalid status: {status}"
if not plan:
return 2, "ERROR: plan is required"
if verified and status != "done":
return 2, "ERROR: -verified is only valid with -status done"
plan_key = normalize_plan_key(plan)
with locked_progress(progress_path):
@ -488,6 +583,15 @@ def finish_plan(
entries = parse_entries(lines, start_idx, end_idx)
rendered_note = normalize_note(note) if note else None
rendered_verified = normalize_note(verified) if verified else None
verification_clear_keys = () if rendered_verified else ("verification",)
if rendered_verified:
verified_note = f"verified: {rendered_verified}"
rendered_note = (
f"{verified_note}; note: {rendered_note}"
if rendered_note
else verified_note
)
updated_line = render_plan_line(plan_key, status, rendered_note)
workflow_phase = {
"done": "done",
@ -502,6 +606,8 @@ def finish_plan(
lines,
phase=workflow_phase,
plan=f"docs/superpowers/plans/{plan_key}",
verification=rendered_verified,
clear_keys=verification_clear_keys,
)
write_progress_lines(progress_path, lines)
return 0, updated_line
@ -511,11 +617,64 @@ def finish_plan(
lines,
phase=workflow_phase,
plan=f"docs/superpowers/plans/{plan_key}",
verification=rendered_verified,
clear_keys=verification_clear_keys,
)
write_progress_lines(progress_path, lines)
return 0, updated_line
def status_report(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
if not plans_dir.is_dir():
return 2, f"ERROR: plans dir not found: {plans_dir}"
plan_keys = list_plan_files(plans_dir)
lines = load_progress_lines(progress_path)
block = find_block(lines)
entries: list[tuple[str, str, Optional[str], int]] = []
if block:
entries = filter_existing_entries(parse_entries(lines, *block), plan_keys)
entry_by_plan = {plan_key: (status, note) for plan_key, status, note, _ in entries}
counts = {status: 0 for status in ("pending", "in-progress", "done", "blocked", "skipped")}
rows: list[str] = []
for plan_key in plan_keys:
status, note = entry_by_plan.get(plan_key, ("pending", None))
counts[status] += 1
suffix = f": {note}" if note else ""
rows.append(f"PLAN {plan_key} {status}{suffix}")
state: dict[str, str] = {}
workflow_block = find_named_block(lines, WORKFLOW_STATE_START, WORKFLOW_STATE_END)
if workflow_block:
state = parse_workflow_state(lines, *workflow_block)
output = [
"STATUS "
f"total={len(plan_keys)} "
f"pending={counts['pending']} "
f"in-progress={counts['in-progress']} "
f"done={counts['done']} "
f"blocked={counts['blocked']} "
f"skipped={counts['skipped']}"
]
current_parts = [
f"{key}={state[key]}"
for key in (
"phase",
"plan",
"claimed_by",
"claimed_at",
"verification",
)
if key in state
]
if current_parts:
output.append("CURRENT " + " ".join(current_parts))
output.extend(rows)
return 0, "\n".join(output)
def record_workflow_state(
progress_path: Path,
phase: str,
@ -543,7 +702,7 @@ def main(argv: list[str]) -> int:
return 0
mode = argv[0]
if mode not in {"claim", "finish", "record"}:
if mode not in {"claim", "finish", "record", "status"}:
print(f"ERROR: unknown mode: {mode}", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
@ -561,11 +720,26 @@ def main(argv: list[str]) -> int:
if mode == "claim":
plans = flags.get("-plans")
progress = flags.get("-progress")
owner = flags.get("-owner")
if not plans or not progress:
print("ERROR: -plans and -progress are required", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
code, message = claim_plan(Path(plans), Path(progress))
code, message = claim_plan(Path(plans), Path(progress), owner)
if code != 0:
print(message, file=sys.stderr)
return code
print(message)
return 0
if mode == "status":
plans = flags.get("-plans")
progress = flags.get("-progress")
if not plans or not progress:
print("ERROR: -plans and -progress are required", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
code, message = status_report(Path(plans), Path(progress))
if code != 0:
print(message, file=sys.stderr)
return code
@ -596,11 +770,12 @@ def main(argv: list[str]) -> int:
status = flags.get("-status")
progress = flags.get("-progress")
note = flags.get("-note")
verified = flags.get("-verified")
if not plan or not status or not progress:
print("ERROR: -plan, -status, and -progress are required", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
code, message = finish_plan(plan, status, Path(progress), note)
code, message = finish_plan(plan, status, Path(progress), note, verified)
if code != 0:
print(message, file=sys.stderr)
return code

View File

@ -86,6 +86,22 @@
再更新 `progress.md` 上半部分摘要,并按主循环收尾要求归档当前
Plan 变更
### 条件触发 Skills
以下 skill 是额外能力,只在满足触发条件时使用,不改变主流程顺序,
也不由 `main_loop.py` 自动调度:
| Skill | 触发条件 | 负责范围 |
| ------------------ | -------------------------------------- | -------------------------------------- |
| `codebase-recon` | 架构、跨模块、重构、迁移或风险不明任务 | 代码库侦察、热点分析、影响面判断 |
| `brooks-audit` | 方案影响架构边界、模块职责或长期维护性 | 架构审查、结构风险识别 |
| `codebase-migrate` | 大规模迁移、多文件重构、API 替换 | 分批迁移、可审查 refactor、CI 验证节奏 |
| `brooks-review` | 代码类 Plan 完成后,归档前需要审查 | diff / PR 级代码审查 |
| `brooks-test` | 测试改动复杂,或需要确认测试质量 | 测试有效性、覆盖边界、断言质量审查 |
| `gitea-fix-ci` | Gitea Actions 失败 | 拉取 CI 日志、定位失败、形成修复计划 |
| `style-cleanup` | 代码实现后需要格式或 lint 收尾 | 格式化、lint cleanup不改变语义 |
| `commit-message` | 需要提交或归档当前 Plan 改动 | commit message 生成与 staged diff 检查 |
### Plan 要求
- `Plan Meta` 必填,位于 Plan 头部 `---` 之后、Task 1 之前
@ -135,16 +151,20 @@
```bash
python {{PLAYBOOK_SCRIPTS}}/main_loop.py claim \
-plans docs/superpowers/plans \
-progress memory-bank/progress.md
-progress memory-bank/progress.md \
-owner "<当前session或agent标识>"
```
该命令会在锁保护下串行完成三件事:
- 自动识别当前环境:`windows`、`linux`、`darwin`
- 校验 Plan 文件包含必需 `Plan Meta`
- 已有 `in-progress` 优先恢复
- 如无 `in-progress`,按 Plan 文件顺序选择第一个可执行 Plan
`pending``blocked: env:<当前环境>:...`
- 将选中的 Plan 写成 `in-progress`
- 在 `workflow-state` 写入 `claimed_by`、`claimed_at`,并清理上一轮
`verification`
这里的锁保护的是 `progress.md` 状态块更新,避免多个 session
同时读写时发生覆盖。
@ -176,7 +196,8 @@ python {{PLAYBOOK_SCRIPTS}}/playbook.py \
python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish \
-plan <plan> \
-status done \
-progress memory-bank/progress.md
-progress memory-bank/progress.md \
-verified "<本轮已通过的验证命令或证据>"
```
```bash
@ -195,6 +216,17 @@ python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish \
-note "<原因>"
```
只读状态命令:
```bash
python {{PLAYBOOK_SCRIPTS}}/main_loop.py status \
-plans docs/superpowers/plans \
-progress memory-bank/progress.md
```
`status` 只汇总 `pending`、`in-progress`、`done`、`blocked`、
`skipped` 和当前 `workflow-state`,不得修改 `progress.md`
### Plan 场景下的执行上下文与隔离策略
- 本节仅适用于通过主循环领取并执行 Plan 的场景
@ -235,11 +267,14 @@ python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish \
- Plan `done` 后必须完成当前 Plan 变更的归档/提交,然后才能继续领取下一个
Plan归档方式由项目约定决定
- 收尾顺序:
1. 完成 Plan 约定验证
2. 运行 `main_loop.py finish -status done` 写回状态
3. 必要时更新 `progress.md` 上半部分摘要和相关 memory
4. 检查当前变更清单与差异
5. 只归档/提交当前 Plan 相关改动
1. 完成 Plan 约定验证
2. 运行 `main_loop.py finish -status done -verified "<证据>"`
写回状态
3. 必要时更新 `progress.md` 上半部分摘要和相关 memory
4. 检查当前变更清单与差异
5. 只归档/提交当前 Plan 相关改动
- 当前 Plan 相关改动包括但不限于:
- 本轮代码、配置、测试、模板改动
- 当前 Plan 文件(创建、补充、勾选 Task、记录结果等

View File

@ -27,6 +27,26 @@ def run_cli(*args, cwd=None):
)
def valid_plan_text(title: str = "Demo Plan") -> str:
return "\n".join(
[
f"# {title}",
"",
"## Plan Meta",
"",
"- **Plan Group**: `demo`",
"- **Parent Plan**: `none`",
"- **Verification Scope**: `unit`",
"- **Verification Gate**: `python -m unittest tests.test_main_loop_cli`",
"",
"## Tasks",
"",
"- [ ] Task 1: demo",
"",
]
)
class MainLoopCliTests(unittest.TestCase):
def _current_env(self) -> str:
system = platform.system().lower()
@ -40,8 +60,12 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
(plans_dir / "2026-01-01-old.md").write_text("old", encoding="utf-8")
(plans_dir / "2026-01-02-new.md").write_text("new", encoding="utf-8")
(plans_dir / "2026-01-01-old.md").write_text(
valid_plan_text("old"), encoding="utf-8"
)
(plans_dir / "2026-01-02-new.md").write_text(
valid_plan_text("new"), encoding="utf-8"
)
result = run_cli(
"claim",
@ -74,7 +98,9 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
(plans_dir / "2026-01-01-demo.md").write_text("demo", encoding="utf-8")
(plans_dir / "2026-01-01-demo.md").write_text(
valid_plan_text(), encoding="utf-8"
)
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
@ -119,8 +145,12 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
(plans_dir / "2026-01-01-a.md").write_text("a", encoding="utf-8")
(plans_dir / "2026-01-02-b.md").write_text("b", encoding="utf-8")
(plans_dir / "2026-01-01-a.md").write_text(
valid_plan_text("a"), encoding="utf-8"
)
(plans_dir / "2026-01-02-b.md").write_text(
valid_plan_text("b"), encoding="utf-8"
)
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
@ -159,7 +189,9 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
(plans_dir / "2026-01-02-live.md").write_text("live", encoding="utf-8")
(plans_dir / "2026-01-02-live.md").write_text(
valid_plan_text("live"), encoding="utf-8"
)
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
@ -202,7 +234,9 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
(plans_dir / "2026-01-05-env.md").write_text("env", encoding="utf-8")
(plans_dir / "2026-01-05-env.md").write_text(
valid_plan_text("env"), encoding="utf-8"
)
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
@ -250,9 +284,11 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
(plans_dir / "2026-01-01-env.md").write_text("env", encoding="utf-8")
(plans_dir / "2026-01-01-env.md").write_text(
valid_plan_text("env"), encoding="utf-8"
)
(plans_dir / "2026-01-02-pending.md").write_text(
"pending", encoding="utf-8"
valid_plan_text("pending"), encoding="utf-8"
)
progress = root / "memory-bank" / "progress.md"
@ -294,6 +330,99 @@ class MainLoopCliTests(unittest.TestCase):
),
)
def test_claim_rejects_plan_missing_required_plan_meta(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
(plans_dir / "2026-01-01-invalid.md").write_text(
"# Invalid Plan\n\n- [ ] Task 1: missing metadata\n",
encoding="utf-8",
)
result = run_cli(
"claim",
"-plans",
"docs/superpowers/plans",
"-progress",
"memory-bank/progress.md",
cwd=root,
)
self.assertEqual(result.returncode, 2)
self.assertIn("missing required Plan Meta", result.stderr)
self.assertIn("2026-01-01-invalid.md", result.stderr)
def test_claim_records_claim_metadata_in_workflow_state(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
(plans_dir / "2026-01-01-demo.md").write_text(
valid_plan_text(), encoding="utf-8"
)
result = run_cli(
"claim",
"-plans",
"docs/superpowers/plans",
"-progress",
"memory-bank/progress.md",
"-owner",
"codex-test",
cwd=root,
)
self.assertEqual(result.returncode, 0, msg=result.stderr)
text = (root / "memory-bank" / "progress.md").read_text(
encoding="utf-8"
)
self.assertIn("claimed_by: codex-test", text)
self.assertRegex(text, r"claimed_at: \d{4}-\d{2}-\d{2}T")
def test_claim_clears_stale_verification_from_workflow_state(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
(plans_dir / "2026-01-01-demo.md").write_text(
valid_plan_text(), encoding="utf-8"
)
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
progress.write_text(
"\n".join(
[
"# 当前进展",
"",
"<!-- workflow-state:start -->",
"phase: done",
"verification: old evidence",
"<!-- workflow-state:end -->",
"",
"<!-- plan-status:start -->",
"- [ ] `2026-01-01-demo.md` pending",
"<!-- plan-status:end -->",
"",
]
),
encoding="utf-8",
)
result = run_cli(
"claim",
"-plans",
"docs/superpowers/plans",
"-progress",
"memory-bank/progress.md",
cwd=root,
)
self.assertEqual(result.returncode, 0, msg=result.stderr)
text = progress.read_text(encoding="utf-8")
self.assertNotIn("verification: old evidence", text)
def test_finish_updates_line(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
@ -332,6 +461,98 @@ class MainLoopCliTests(unittest.TestCase):
1,
)
def test_finish_done_records_verification_evidence(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
progress.write_text(
"\n".join(
[
"# Plan 状态",
"",
"<!-- workflow-state:start -->",
"phase: executing",
"plan: docs/superpowers/plans/2026-01-03-demo.md",
"<!-- workflow-state:end -->",
"",
"<!-- plan-status:start -->",
"- [ ] `2026-01-03-demo.md` in-progress",
"<!-- plan-status:end -->",
"",
]
),
encoding="utf-8",
)
result = run_cli(
"finish",
"-plan",
"docs/superpowers/plans/2026-01-03-demo.md",
"-status",
"done",
"-progress",
"memory-bank/progress.md",
"-verified",
"python -m unittest tests.test_main_loop_cli",
cwd=root,
)
self.assertEqual(result.returncode, 0, msg=result.stderr)
text = progress.read_text(encoding="utf-8")
self.assertIn(
"- [x] `2026-01-03-demo.md` done: "
"verified: python -m unittest tests.test_main_loop_cli",
text,
)
self.assertIn(
"verification: python -m unittest tests.test_main_loop_cli",
text,
)
def test_finish_without_verified_clears_stale_verification(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
progress.write_text(
"\n".join(
[
"# Plan 状态",
"",
"<!-- workflow-state:start -->",
"phase: executing",
"plan: docs/superpowers/plans/2026-01-03-demo.md",
"verification: old evidence",
"<!-- workflow-state:end -->",
"",
"<!-- plan-status:start -->",
"- [ ] `2026-01-03-demo.md` in-progress",
"<!-- plan-status:end -->",
"",
]
),
encoding="utf-8",
)
result = run_cli(
"finish",
"-plan",
"docs/superpowers/plans/2026-01-03-demo.md",
"-status",
"blocked",
"-progress",
"memory-bank/progress.md",
"-note",
"needs confirmation",
cwd=root,
)
self.assertEqual(result.returncode, 0, msg=result.stderr)
text = progress.read_text(encoding="utf-8")
self.assertNotIn("verification: old evidence", text)
self.assertIn("phase: blocked", text)
def test_finish_updates_workflow_phase_and_preserves_metadata(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
@ -491,7 +712,9 @@ class MainLoopCliTests(unittest.TestCase):
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
(plans_dir / "2026-05-18-demo.md").write_text("demo", encoding="utf-8")
(plans_dir / "2026-05-18-demo.md").write_text(
valid_plan_text(), encoding="utf-8"
)
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
@ -561,6 +784,67 @@ class MainLoopCliTests(unittest.TestCase):
)
self.assertIn("- [x] `2026-05-18-demo.md` done", text)
def test_status_reports_counts_and_current_workflow_state(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
plans_dir = root / "docs" / "superpowers" / "plans"
plans_dir.mkdir(parents=True)
for plan_key in (
"2026-01-01-a.md",
"2026-01-02-b.md",
"2026-01-03-c.md",
"2026-01-04-d.md",
"2026-01-05-e.md",
):
(plans_dir / plan_key).write_text(valid_plan_text(), encoding="utf-8")
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
progress.write_text(
"\n".join(
[
"# 当前进展",
"",
"<!-- workflow-state:start -->",
"phase: executing",
"plan: docs/superpowers/plans/2026-01-02-b.md",
"claimed_by: codex-test",
"<!-- workflow-state:end -->",
"",
"<!-- plan-status:start -->",
"- [ ] `2026-01-01-a.md` pending",
"- [ ] `2026-01-02-b.md` in-progress",
"- [x] `2026-01-03-c.md` done",
"- [ ] `2026-01-04-d.md` blocked: env:linux:Task2",
"- [ ] `2026-01-05-e.md` skipped: obsolete",
"<!-- plan-status:end -->",
"",
]
),
encoding="utf-8",
)
result = run_cli(
"status",
"-plans",
"docs/superpowers/plans",
"-progress",
"memory-bank/progress.md",
cwd=root,
)
self.assertEqual(result.returncode, 0, msg=result.stderr)
self.assertIn(
"STATUS total=5 pending=1 in-progress=1 done=1 blocked=1 skipped=1",
result.stdout,
)
self.assertIn(
"CURRENT phase=executing "
"plan=docs/superpowers/plans/2026-01-02-b.md "
"claimed_by=codex-test",
result.stdout,
)
def test_concurrent_record_preserves_spec_and_plan_metadata(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)