🐛 fix(templates): enforce main loop progress tracking
Update Third-party Superpowers / Update thirdparty/skill snapshot (push) Successful in 1m20s Details

replace the old select/record progress flow with a single main_loop claim/finish CLI.

route template execution through main_loop only, remove the legacy plan_progress entry points, and update tests to enforce the new behavior.
This commit is contained in:
csh 2026-03-12 17:47:33 +08:00
parent 51373d7469
commit 7b84daf3bd
7 changed files with 198 additions and 159 deletions

View File

@ -9,21 +9,25 @@ PLAN_STATUS_START = "<!-- plan-status:start -->"
PLAN_STATUS_END = "<!-- plan-status:end -->"
PLAN_FILE_RE = re.compile(r"^(\d{4}-\d{2}-\d{2})-.+\.md$")
PLAN_LINE_RE = re.compile(
r"^- \[(?P<check>[ xX])\] `(?P<plan>[^`]+)` (?P<status>done|blocked|pending|in-progress|skipped)(?:: (?P<note>.*))?$"
r"^- \[(?P<check>[ xX])\] `(?P<plan>[^`]+)` "
r"(?P<status>done|blocked|pending|in-progress|skipped)"
r"(?:: (?P<note>.*))?$"
)
VALID_STATUSES = {"done", "blocked", "pending", "in-progress", "skipped"}
FINISH_STATUSES = {"done", "blocked", "skipped"}
ENV_BLOCKED_RE = re.compile(r"^env:([^:]+):(.+)$")
def usage() -> str:
return (
"Usage:\n"
" python scripts/plan_progress.py select -plans <dir> -progress <file>\n"
" python scripts/plan_progress.py record -plan <path> -status <status> -progress <file> [-note <text>]\n"
" python scripts/plan_progress.py -h\n"
" python scripts/main_loop.py claim -plans <dir> -progress <file>\n"
" python scripts/main_loop.py finish -plan <path> -status <status> "
"-progress <file> [-note <text>]\n"
" python scripts/main_loop.py -h\n"
"Options:\n"
" -plans DIR\n"
" -plan PATH\n"
" -status done|blocked|pending|in-progress|skipped\n"
" -status done|blocked|skipped\n"
" -progress FILE\n"
" -note TEXT\n"
" -h, -help Show this help.\n"
@ -57,30 +61,18 @@ def normalize_plan_key(plan_value: str) -> str:
return raw
def normalize_note(note: str) -> str:
return note.replace("\n", " ").replace("\r", " ").replace("`", "'").strip()
def render_plan_line(plan_key: str, status: str, note: Optional[str]) -> str:
checked = "x" if status == "done" else " "
if status == "blocked":
suffix = "blocked"
if note:
suffix += f": {note}"
elif status == "pending":
suffix = "pending"
elif status == "in-progress":
suffix = "in-progress"
elif status == "skipped":
suffix = "skipped"
if note:
suffix += f": {note}"
else:
suffix = "done"
suffix = status
if note:
suffix += f": {note}"
return f"- [{checked}] `{plan_key}` {suffix}"
def normalize_note(note: str) -> str:
cleaned = note.replace("\n", " ").replace("\r", " ").replace("`", "'").strip()
return cleaned
def list_plan_files(plans_dir: Path) -> list[str]:
entries: list[str] = []
for path in plans_dir.iterdir():
@ -88,11 +80,7 @@ def list_plan_files(plans_dir: Path) -> list[str]:
continue
if not PLAN_FILE_RE.match(path.name):
continue
try:
rel = path.resolve().relative_to(plans_dir.resolve()).as_posix()
except ValueError:
rel = path.name
entries.append(rel)
entries.append(path.name)
return sorted(entries)
@ -110,7 +98,9 @@ def find_block(lines: list[str]) -> Optional[tuple[int, int]]:
return None
def parse_entries(lines: list[str], start_idx: int, end_idx: int) -> list[tuple[str, str, Optional[str], int]]:
def parse_entries(
lines: list[str], start_idx: int, end_idx: int
) -> list[tuple[str, str, Optional[str], int]]:
entries: list[tuple[str, str, Optional[str], int]] = []
for idx in range(start_idx + 1, end_idx):
line = lines[idx].strip()
@ -132,17 +122,13 @@ def render_progress_lines(plans: list[str]) -> list[str]:
return lines
ENV_BLOCKED_RE = re.compile(r"^env:([^:]+):(.+)$")
def parse_env_blocked_note(note: Optional[str]) -> Optional[tuple[str, str]]:
"""Parse 'env:windows:Task2,Task4' format. Returns (env, tasks) or None."""
if not note:
return None
match = ENV_BLOCKED_RE.match(note)
if match:
return match.group(1), match.group(2)
return None
if not match:
return None
return match.group(1), match.group(2)
def detect_env() -> Optional[str]:
@ -150,91 +136,124 @@ def detect_env() -> Optional[str]:
return mapping.get(platform.system().lower())
def select_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
def load_progress_lines(progress_path: Path) -> list[str]:
progress_path.parent.mkdir(parents=True, exist_ok=True)
if progress_path.exists():
return progress_path.read_text(encoding="utf-8").splitlines()
return []
def write_progress_lines(progress_path: Path, lines: list[str]) -> None:
progress_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def ensure_plan_block(
lines: list[str], progress_path: Path, plan_keys: list[str]
) -> tuple[list[str], int, int]:
block = find_block(lines)
if not block:
lines = render_progress_lines(plan_keys)
write_progress_lines(progress_path, lines)
block = find_block(lines)
if not block:
raise ValueError("failed to create plan status block")
return lines, block[0], block[1]
def ensure_all_plans_present(
lines: list[str], start_idx: int, end_idx: int, progress_path: Path, plan_keys: list[str]
) -> list[tuple[str, str, Optional[str], int]]:
entries = parse_entries(lines, start_idx, end_idx)
existing = {plan_key for plan_key, _, _, _ in entries}
missing = [plan_key for plan_key in plan_keys if plan_key not in existing]
if missing:
insert_lines = [render_plan_line(plan_key, "pending", None) for plan_key in missing]
lines[end_idx:end_idx] = insert_lines
write_progress_lines(progress_path, lines)
end_idx += len(insert_lines)
entries = parse_entries(lines, start_idx, end_idx)
return entries
def choose_claim_entry(
entries: list[tuple[str, str, Optional[str], int]], current_env: Optional[str]
) -> Optional[tuple[str, Optional[str], int]]:
for plan_key, status, note, idx in entries:
if status == "in-progress":
return plan_key, note, idx
for plan_key, status, note, idx in entries:
if status == "pending":
return plan_key, note, idx
if current_env:
for plan_key, status, note, idx in entries:
if status != "blocked":
continue
env_info = parse_env_blocked_note(note)
if env_info and env_info[0] == current_env:
return plan_key, note, idx
return None
def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
if not plans_dir.is_dir():
return 2, f"ERROR: plans dir not found: {plans_dir}"
plan_keys = list_plan_files(plans_dir)
if not plan_keys:
return 2, "ERROR: no plan files found"
progress_path.parent.mkdir(parents=True, exist_ok=True)
if progress_path.exists():
lines = progress_path.read_text(encoding="utf-8").splitlines()
else:
lines = []
lines = load_progress_lines(progress_path)
try:
lines, start_idx, end_idx = ensure_plan_block(lines, progress_path, plan_keys)
except ValueError as exc:
return 2, f"ERROR: {exc}"
block = find_block(lines)
if not block:
lines = render_progress_lines(plan_keys)
progress_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
return 0, (plans_dir / plan_keys[0]).as_posix()
entries = ensure_all_plans_present(lines, start_idx, end_idx, progress_path, plan_keys)
chosen = choose_claim_entry(entries, detect_env())
if not chosen:
return 2, "ERROR: no claimable plans"
start_idx, end_idx = block
entries = parse_entries(lines, start_idx, end_idx)
existing = {plan for plan, _, _, _ in entries}
missing = [plan for plan in plan_keys if plan not in existing]
if missing:
insert_lines = [render_plan_line(plan, "pending", None) for plan in missing]
lines[end_idx:end_idx] = insert_lines
end_idx += len(insert_lines)
progress_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
entries = parse_entries(lines, start_idx, end_idx)
plan_key, note, idx = chosen
lines[idx] = render_plan_line(plan_key, "in-progress", note)
write_progress_lines(progress_path, lines)
for plan_key, status, note, _ in entries:
if status in ("pending", "in-progress"):
return 0, (plans_dir / plan_key).as_posix()
# Check for env-blocked Plans if current environment is detected
current_env = detect_env()
if current_env:
for plan_key, status, note, _ in entries:
if status == "blocked":
env_info = parse_env_blocked_note(note)
if env_info and env_info[0] == current_env:
return 0, (plans_dir / plan_key).as_posix()
return 2, "ERROR: no pending plans"
output = [f"PLAN={(plans_dir / plan_key).as_posix()}"]
if note:
output.append(f"NOTE={note}")
return 0, "\n".join(output)
def record_status(plan: str, status: str, progress_path: Path, note: Optional[str]) -> tuple[int, str]:
if status not in VALID_STATUSES:
def finish_plan(
plan: str, status: str, progress_path: Path, note: Optional[str]
) -> tuple[int, str]:
if status not in FINISH_STATUSES:
return 2, f"ERROR: invalid status: {status}"
if not plan:
return 2, "ERROR: plan is required"
progress_path.parent.mkdir(parents=True, exist_ok=True)
if progress_path.exists():
lines = progress_path.read_text(encoding="utf-8").splitlines()
else:
lines = []
lines = load_progress_lines(progress_path)
plan_key = normalize_plan_key(plan)
block = find_block(lines)
if not block:
lines = render_progress_lines([plan_key])
block = find_block(lines)
if not block:
return 2, "ERROR: failed to create plan status block"
start_idx, end_idx = block
try:
lines, start_idx, end_idx = ensure_plan_block(lines, progress_path, [plan_key])
except ValueError as exc:
return 2, f"ERROR: {exc}"
entries = parse_entries(lines, start_idx, end_idx)
rendered_note = None
if status in ("blocked", "skipped") and note:
rendered_note = normalize_note(note)
rendered_note = normalize_note(note) if note else None
updated_line = render_plan_line(plan_key, status, rendered_note)
updated = False
for entry_plan, _, _, idx in entries:
if entry_plan == plan_key:
lines[idx] = updated_line
updated = True
break
write_progress_lines(progress_path, lines)
return 0, updated_line
if not updated:
lines[end_idx:end_idx] = [updated_line]
progress_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
lines[end_idx:end_idx] = [updated_line]
write_progress_lines(progress_path, lines)
return 0, updated_line
@ -247,7 +266,7 @@ def main(argv: list[str]) -> int:
return 0
mode = argv[0]
if mode not in ("select", "record"):
if mode not in {"claim", "finish"}:
print(f"ERROR: unknown mode: {mode}", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
@ -262,14 +281,14 @@ def main(argv: list[str]) -> int:
print(usage(), file=sys.stderr)
return 2
if mode == "select":
if mode == "claim":
plans = flags.get("-plans")
progress = flags.get("-progress")
if not plans or not progress:
print("ERROR: -plans and -progress are required", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
code, message = select_plan(Path(plans), Path(progress))
code, message = claim_plan(Path(plans), Path(progress))
if code != 0:
print(message, file=sys.stderr)
return code
@ -284,7 +303,7 @@ def main(argv: list[str]) -> int:
print("ERROR: -plan, -status, and -progress are required", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
code, message = record_status(plan, status, Path(progress), note)
code, message = finish_plan(plan, status, Path(progress), note)
if code != 0:
print(message, file=sys.stderr)
return code

View File

@ -62,13 +62,13 @@
## 规划与执行分工
| 阶段 | 工具 | 产出 | 留痕 |
| ------------ | ---------------------- | ----------------- | -------------------- |
| 头脑风暴 | `$brainstorming` skill | 设计思路 | 无 |
| 生成计划 | `$writing-plans` skill | `docs/plans/*.md` | 无 |
| **执行计划** | **主循环** | 代码/配置变更 | **plan_progress.py** |
| 阶段 | 工具 | 产出 | 留痕 |
| ------------ | ---------------------- | ----------------- | -------------------------- |
| 头脑风暴 | `$brainstorming` skill | 设计思路 | 无 |
| 生成计划 | `$writing-plans` skill | `docs/plans/*.md` | 无 |
| **执行计划** | **`main_loop.py` 主循环** | 代码/配置变更 | **`memory-bank/progress.md`** |
> **重要**:第三方 skills 不记录操作状态,执行必须通过主循环完成
> **重要**:第三方 skills 只用于规划,不负责执行留痕。收到执行触发词后,不得直接使用 `$executing-plans`,也不得直接使用 `$subagent-driven-development`;必须先运行 `main_loop.py claim` 领取 Plan再通过 `main_loop.py finish` 写回结果
## 主循环
@ -99,21 +99,21 @@
**流程**
1. 检测环境
- `plan_progress.py` 自动识别当前环境(`windows` / `linux` / `darwin`
2. 选择 Plan
- 运行 `python {{PLAYBOOK_SCRIPTS}}/plan_progress.py select -plans docs/plans -progress memory-bank/progress.md`
- 返回第一个可执行的 Plan
- `pending``in-progress` 的 Plan
- `blocked: env:<当前环境>:...` 的 Plan环境匹配时恢复执行
- 如无可执行 Plan跳到步骤 7
- **注意**:每次 select 会重新扫描 `docs/plans/` 目录,支持动态添加 Plan
3. 标记开始:
- 运行 `python {{PLAYBOOK_SCRIPTS}}/plan_progress.py record -plan <plan> -status in-progress -progress memory-bank/progress.md`
4. 阅读 Plan
1. 领取 Plan
- 运行 `python {{PLAYBOOK_SCRIPTS}}/main_loop.py claim -plans docs/plans -progress memory-bank/progress.md`
- 该命令会**原子化**完成三件事
- 自动识别当前环境(`windows` / `linux` / `darwin`
- 选择第一个可执行的 Plan优先恢复 `in-progress`,其次 `pending`,最后 `blocked: env:<当前环境>:...`
- 将选中的 Plan 写成 `in-progress`
- stdout 必须包含 `PLAN=<path>`;如果是从环境阻塞恢复,还会附带 `NOTE=env:<环境>:<Task列表>`
- 如无可执行 Plan跳到步骤 6
2. 阅读领取结果:
- 记录 `PLAN=` 返回的路径
- 如果 stdout 含 `NOTE=env:...`,本轮只执行列出的 Task
3. 阅读 Plan
- 理解目标、子任务与验证标准
- 如果是从 `blocked: env:...` 恢复,只执行列出的 Task
5. 逐步执行:
- **注意**Plan 文档中的 execution handoff / REQUIRED SUB-SKILL 仅作参考;如与本文件冲突,一律以主循环为准
4. 逐步执行:
- 按顺序执行 Task
- 每个 Task 完成后进行必要验证(测试/日志/diff
- **Task 失败处理**
@ -123,19 +123,23 @@
- 遇到歧义/风险/决策点:
- 常规模式:记录到回复中,可询问用户
- 无交互模式:按「需要确认的场景」规则自动处理
6. 记录结果:
- 全部完成:`... -status done ...`
- 有 Task 因环境跳过:`... -status blocked ... -note "env:<所需环境>:<Task列表>"`
- 其他阻塞:`... -status blocked ... -note "<原因>"`
- 跳过整个 Plan`... -status skipped ... -note "<原因>"`
- 回到步骤 2 继续下一个 Plan
7. 汇总报告(所有 Plan 处理完毕后):
5. 写回结果:
- 全部完成:
- `python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish -plan <plan> -status done -progress memory-bank/progress.md`
- 有 Task 因环境跳过:
- `python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish -plan <plan> -status blocked -progress memory-bank/progress.md -note "env:<所需环境>:<Task列表>"`
- 其他阻塞:
- `python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish -plan <plan> -status blocked -progress memory-bank/progress.md -note "<原因>"`
- 跳过整个 Plan
- `python {{PLAYBOOK_SCRIPTS}}/main_loop.py finish -plan <plan> -status skipped -progress memory-bank/progress.md -note "<原因>"`
- 写回后回到步骤 1 继续下一个 Plan
6. 汇总报告(所有 Plan 处理完毕后):
- 已完成的 Plan
- 阻塞/跳过的 Plan 及原因
- 需要在其他环境执行的 Plan`blocked: env:...`
- 待确认的歧义/风险/决策点
- 如需记录重要决策,写入 `memory-bank/decisions.md`
8. **结束**:主循环终止
7. **结束**:主循环终止
## Plan 规则
@ -146,6 +150,7 @@
- `Verification Gate`must-pass
- **不允许中断任务**Plan 中不应包含必然失败或依赖未确认的信息;未确认项必须在 `$brainstorming` 阶段解决后再产出 Plan
- **验证必须可通过**Plan 内验证应为当前阶段可通过的局部验证;需要集成验证的内容放入上层/集成 Plan
- **执行入口唯一**Plan 生成完成后,后续执行只能由主循环驱动;不得按 Plan 头部说明直接切换到 `$executing-plans``$subagent-driven-development`
- 不因等待确认而中断可执行步骤;待确认事项在回复中列出
- 每轮只处理一个 Plan
- **小步快跑**:每个 Plan 应该可快速完成
@ -174,7 +179,7 @@
- **重要决策**:记录到 `memory-bank/decisions.md`ADR 格式)
- **待确认事项**:在回复中列出并等待确认
- **进度留痕**:通过 `{{PLAYBOOK_SCRIPTS}}/plan_progress.py` 维护 `memory-bank/progress.md` 的 Plan 状态块(唯一权威)
- **进度留痕**:通过 `{{PLAYBOOK_SCRIPTS}}/main_loop.py` 维护 `memory-bank/progress.md` 的 Plan 状态块(唯一权威)
### Git 操作
@ -233,7 +238,7 @@
- [ ] 相关测试通过(如有测试且未被豁免)
- [ ] 换行符正确
- [ ] 无语法错误
- [ ] 已通过 `plan_progress.py` 记录 Plan 状态
- [ ] 已通过 `main_loop.py finish` 写回 Plan 状态
---

View File

@ -224,7 +224,7 @@ project/
如需项目私有规则,建议创建 `AGENT_RULES.local.md`,其优先级高于 `AGENT_RULES.md`
且不会被 `playbook.py` 覆盖。
主循环会根据 `memory-bank/progress.md` 的 Plan 状态清单,
自动选择第一个 pending 的 Plan并要求通过 `scripts/plan_progress.py` 写入状态。
通过 `scripts/main_loop.py claim/finish` 原子化领取 Plan 并写回状态。
### 示例:不跑测试的计划提示词
@ -246,7 +246,7 @@ project/
1) 先完成 brainstorming并输出设计文档 `docs/plans/YYYY-MM-DD-<topic>-design.md`
2) 询问我“是否进入 `docs/plans/` 实施计划编写阶段”,确认后使用 writing-plans 生成实现计划。
3) 实现计划内明确标注每步要改的文件与命令;验证步骤只包含可通过的局部验证,不包含测试。
4) 执行计划并更新 `memory-bank/progress.md`
4) 执行计划时只走主循环,通过 `docs/standards/playbook/scripts/main_loop.py claim/finish` 更新 `memory-bank/progress.md`
```
### AGENTS.template.md
@ -304,7 +304,7 @@ playbook/
├── templates/ # 本目录:项目架构模板 → 部署到 memory-bank/ 等
└── scripts/
├── playbook.py # 统一入口vendor/sync_rules/sync_memory_bank/sync_prompts/sync_standards/...
└── plan_progress.py # Plan 选择与进度记录
└── main_loop.py # 主循环领取与状态写回
```
## 完整部署流程

View File

@ -36,7 +36,7 @@ prompts/
生成计划 → $writing-plans skill → docs/plans/*.md
执行计划 → AGENT_RULES 主循环(留痕)
执行计划 → AGENT_RULES 主循环(`main_loop.py claim/finish` 留痕)
代码评审(有 MR/PR 时)→ code-review.md
@ -45,7 +45,7 @@ prompts/
沉淀提示词 → prompt-generator.md可选
```
> **核心规则在 `AGENT_RULES.md`**,第三方 skills 负责规划,主循环负责执行和留痕
> **核心规则在 `AGENT_RULES.md`**,第三方 skills 只负责规划;执行与留痕必须走 `main_loop.py claim/finish`
---

View File

@ -14,7 +14,7 @@ tests/
├── test_no_backup_flags.py # no_backup 行为测试
├── test_sync_directory_actions.py # sync_memory_bank/sync_prompts 行为测试
├── test_vendor_snapshot_templates.py # vendor 快照模板完整性测试
├── test_plan_progress_cli.py # plan_progress CLI 测试
├── test_main_loop_cli.py # main_loop CLI 测试
├── test_superpowers_list_sync.py # superpowers 列表一致性测试
├── test_superpowers_workflows.py # superpowers 工作流配置校验
├── test_sync_templates_placeholders.py # 占位符替换测试sync_rules/sync_standards

View File

@ -1,12 +1,12 @@
import platform
import subprocess
import sys
import tempfile
import unittest
import platform
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SCRIPT = ROOT / "scripts" / "plan_progress.py"
SCRIPT = ROOT / "scripts" / "main_loop.py"
def run_cli(*args, cwd=None):
@ -18,7 +18,7 @@ def run_cli(*args, cwd=None):
)
class PlanProgressCliTests(unittest.TestCase):
class MainLoopCliTests(unittest.TestCase):
def _current_env(self) -> str:
system = platform.system().lower()
mapping = {"windows": "windows", "linux": "linux", "darwin": "darwin"}
@ -26,7 +26,7 @@ class PlanProgressCliTests(unittest.TestCase):
self.skipTest(f"Unsupported environment: {system}")
return mapping[system]
def test_select_seeds_progress_when_missing(self):
def test_claim_seeds_progress_and_marks_first_plan_in_progress(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
plans_dir = root / "docs" / "plans"
@ -35,7 +35,7 @@ class PlanProgressCliTests(unittest.TestCase):
(plans_dir / "2026-01-02-new.md").write_text("new", encoding="utf-8")
result = run_cli(
"select",
"claim",
"-plans",
"docs/plans",
"-progress",
@ -44,16 +44,16 @@ class PlanProgressCliTests(unittest.TestCase):
)
self.assertEqual(result.returncode, 0, msg=result.stderr)
self.assertEqual(result.stdout.strip(), "docs/plans/2026-01-01-old.md")
self.assertEqual(result.stdout.strip(), "PLAN=docs/plans/2026-01-01-old.md")
progress = root / "memory-bank" / "progress.md"
text = progress.read_text(encoding="utf-8")
self.assertIn("<!-- plan-status:start -->", text)
self.assertIn("<!-- plan-status:end -->", text)
self.assertIn("`2026-01-01-old.md` pending", text)
self.assertIn("`2026-01-01-old.md` in-progress", text)
self.assertIn("`2026-01-02-new.md` pending", text)
def test_select_returns_first_pending_in_order(self):
def test_claim_returns_existing_in_progress_before_pending(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
plans_dir = root / "docs" / "plans"
@ -70,7 +70,7 @@ class PlanProgressCliTests(unittest.TestCase):
"",
"<!-- plan-status:start -->",
"- [ ] `2026-01-02-b.md` pending",
"- [ ] `2026-01-01-a.md` pending",
"- [ ] `2026-01-01-a.md` in-progress",
"<!-- plan-status:end -->",
"",
]
@ -79,7 +79,7 @@ class PlanProgressCliTests(unittest.TestCase):
)
result = run_cli(
"select",
"claim",
"-plans",
"docs/plans",
"-progress",
@ -88,9 +88,9 @@ class PlanProgressCliTests(unittest.TestCase):
)
self.assertEqual(result.returncode, 0, msg=result.stderr)
self.assertEqual(result.stdout.strip(), "docs/plans/2026-01-02-b.md")
self.assertEqual(result.stdout.strip(), "PLAN=docs/plans/2026-01-01-a.md")
def test_select_returns_env_blocked_plan_without_flag(self):
def test_claim_resumes_env_blocked_plan_and_preserves_note(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
plans_dir = root / "docs" / "plans"
@ -100,13 +100,14 @@ class PlanProgressCliTests(unittest.TestCase):
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
env = self._current_env()
note = f"env:{env}:Task1,Task3"
progress.write_text(
"\n".join(
[
"# Plan 状态",
"",
"<!-- plan-status:start -->",
f"- [ ] `2026-01-05-env.md` blocked: env:{env}:Task1",
f"- [ ] `2026-01-05-env.md` blocked: {note}",
"<!-- plan-status:end -->",
"",
]
@ -115,7 +116,7 @@ class PlanProgressCliTests(unittest.TestCase):
)
result = run_cli(
"select",
"claim",
"-plans",
"docs/plans",
"-progress",
@ -124,9 +125,20 @@ class PlanProgressCliTests(unittest.TestCase):
)
self.assertEqual(result.returncode, 0, msg=result.stderr)
self.assertEqual(result.stdout.strip(), "docs/plans/2026-01-05-env.md")
self.assertEqual(
result.stdout.strip(),
"\n".join(
[
"PLAN=docs/plans/2026-01-05-env.md",
f"NOTE={note}",
]
),
)
def test_record_updates_line(self):
text = progress.read_text(encoding="utf-8")
self.assertIn(f"`2026-01-05-env.md` in-progress: {note}", text)
def test_finish_updates_line(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
progress = root / "memory-bank" / "progress.md"
@ -137,7 +149,7 @@ class PlanProgressCliTests(unittest.TestCase):
"# Plan 状态",
"",
"<!-- plan-status:start -->",
"- [ ] `2026-01-03-demo.md` pending",
"- [ ] `2026-01-03-demo.md` in-progress",
"<!-- plan-status:end -->",
"",
]
@ -146,7 +158,7 @@ class PlanProgressCliTests(unittest.TestCase):
)
result = run_cli(
"record",
"finish",
"-plan",
"docs/plans/2026-01-03-demo.md",
"-status",

View File

@ -50,7 +50,10 @@ langs = [\"cpp\", \"tsl\"]
rules_md = Path(tmp_dir) / "AGENT_RULES.md"
rules_text = rules_md.read_text(encoding="utf-8")
self.assertIn("docs/standards/playbook/scripts/plan_progress.py", rules_text)
self.assertIn("docs/standards/playbook/scripts/main_loop.py claim", rules_text)
self.assertNotIn("plan_progress.py", rules_text)
self.assertIn("不得直接使用 `$executing-plans`", rules_text)
self.assertIn("不得直接使用 `$subagent-driven-development`", rules_text)
self.assertNotIn("{{PLAYBOOK_SCRIPTS}}", rules_text)
def test_sync_standards_rewrites_typescript_docs_prefix_for_vendored_playbook(self):