feat(plan_progress): track plan status in progress.md

This commit is contained in:
csh 2026-01-27 16:03:53 +08:00
parent 73d5c261b1
commit 6774a9d4aa
4 changed files with 293 additions and 182 deletions

View File

@ -1,29 +1,31 @@
#!/usr/bin/env python3
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
PLAN_PREFIX = "[PLAN]"
PLAN_SECTION_HEADER = "## Plan 状态记录"
PLAN_STATUS_START = "<!-- plan-status:start -->"
PLAN_STATUS_END = "<!-- plan-status:end -->"
PLAN_FILE_RE = re.compile(r"^(\d{4}-\d{2}-\d{2})-.+\.md$")
VALID_STATUSES = {"in-progress", "done", "blocked"}
PLAN_LINE_RE = re.compile(
r"^- \[(?P<check>[ xX])\] `(?P<plan>[^`]+)` (?P<status>done|blocked|pending)(?:: (?P<note>.*))?$"
)
VALID_STATUSES = {"done", "blocked", "pending"}
def usage() -> str:
return (
"Usage:\\n"
" python scripts/plan_progress.py select -plans <dir> -progress <file>\\n"
" python scripts/plan_progress.py record -plan <path> -status <status> -progress <file> [-note <text>]\\n"
" python scripts/plan_progress.py -h\\n"
"Options:\\n"
" -plans DIR\\n"
" -plan PATH\\n"
" -status in-progress|done|blocked\\n"
" -progress FILE\\n"
" -note TEXT\\n"
" -h, -help Show this help.\\n"
"Usage:\n"
" python scripts/plan_progress.py select -plans <dir> -progress <file>\n"
" python scripts/plan_progress.py record -plan <path> -status <status> -progress <file> [-note <text>]\n"
" python scripts/plan_progress.py -h\n"
"Options:\n"
" -plans DIR\n"
" -plan PATH\n"
" -status done|blocked|pending\n"
" -progress FILE\n"
" -note TEXT\n"
" -h, -help Show this help.\n"
)
@ -43,102 +45,123 @@ def parse_flags(args: list[str]) -> dict[str, str]:
return flags
def normalize_plan_key(plan_value: str, cwd: Path) -> str:
try:
return Path(plan_value).resolve().relative_to(cwd.resolve()).as_posix()
except ValueError:
return Path(plan_value).as_posix()
def normalize_plan_key(plan_value: str) -> str:
raw = plan_value.strip().replace("\\", "/")
raw = raw.lstrip("./")
if raw.startswith("docs/plans/"):
return raw[len("docs/plans/") :]
marker = "/docs/plans/"
if marker in raw:
return raw.split(marker, 1)[1]
return raw
def load_plan_records(progress_path: Path, cwd: Path) -> dict[str, str]:
if not progress_path.exists():
return {}
text = progress_path.read_text(encoding="utf-8")
records: dict[str, str] = {}
for line in text.splitlines():
if not line.startswith(PLAN_PREFIX):
continue
payload = line[len(PLAN_PREFIX) :].strip()
if not payload:
continue
segments = [seg.strip() for seg in payload.split("|")]
if not segments:
continue
plan_path = segments[0]
status = None
for seg in segments[1:]:
if "=" not in seg:
continue
key, value = seg.split("=", 1)
if key.strip() == "status":
status = value.strip()
if not plan_path or status is None:
continue
records[normalize_plan_key(plan_path, cwd)] = status
return records
def list_plan_files(plans_dir: Path, cwd: Path) -> list[tuple[str, Path, str]]:
entries: list[tuple[str, Path, str]] = []
for path in plans_dir.iterdir():
if not path.is_file():
continue
match = PLAN_FILE_RE.match(path.name)
if not match:
continue
date_value = match.group(1)
try:
rel = path.resolve().relative_to(cwd.resolve()).as_posix()
except ValueError:
rel = path.as_posix()
entries.append((date_value, path, rel))
return entries
def select_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
cwd = Path.cwd()
if not plans_dir.is_dir():
return 2, f"ERROR: plans dir not found: {plans_dir}"
plans = list_plan_files(plans_dir, cwd)
if not plans:
return 2, "ERROR: no plan files found"
records = load_plan_records(progress_path, cwd)
in_progress = [item for item in plans if records.get(item[2]) == "in-progress"]
if in_progress:
in_progress.sort(key=lambda item: (item[0], item[2]))
return 0, in_progress[-1][2]
pending = [
item
for item in plans
if records.get(item[2]) not in ("done", "blocked")
]
if not pending:
return 2, "ERROR: no pending plans"
pending.sort(key=lambda item: (item[0], item[2]))
return 0, pending[-1][2]
def ensure_plan_section(text: str) -> str:
if PLAN_SECTION_HEADER in text:
return text
suffix = text
if suffix and not suffix.endswith("\n"):
suffix += "\n"
if suffix:
suffix += "\n"
suffix += PLAN_SECTION_HEADER + "\n"
return suffix
def render_plan_line(plan_key: str, status: str, note: Optional[str]) -> str:
checked = "x" if status == "done" else " "
if status == "blocked":
suffix = "blocked"
if note:
suffix += f": {note}"
elif status == "pending":
suffix = "pending"
else:
suffix = "done"
return f"- [{checked}] `{plan_key}` {suffix}"
def normalize_note(note: str) -> str:
cleaned = note.replace("\n", " ").replace("|", " ").strip()
cleaned = note.replace("\n", " ").replace("\r", " ").replace("`", "'").strip()
return cleaned
def list_plan_files(plans_dir: Path) -> list[str]:
entries: list[str] = []
for path in plans_dir.iterdir():
if not path.is_file():
continue
if not PLAN_FILE_RE.match(path.name):
continue
try:
rel = path.resolve().relative_to(plans_dir.resolve()).as_posix()
except ValueError:
rel = path.name
entries.append(rel)
return sorted(entries)
def find_block(lines: list[str]) -> Optional[tuple[int, int]]:
start_idx = None
for idx, line in enumerate(lines):
if line.strip() == PLAN_STATUS_START:
start_idx = idx
break
if start_idx is None:
return None
for idx in range(start_idx + 1, len(lines)):
if lines[idx].strip() == PLAN_STATUS_END:
return start_idx, idx
return None
def parse_entries(lines: list[str], start_idx: int, end_idx: int) -> list[tuple[str, str, Optional[str], int]]:
entries: list[tuple[str, str, Optional[str], int]] = []
for idx in range(start_idx + 1, end_idx):
line = lines[idx].strip()
match = PLAN_LINE_RE.match(line)
if not match:
continue
plan_key = normalize_plan_key(match.group("plan"))
status = match.group("status")
note = match.group("note")
entries.append((plan_key, status, note, idx))
return entries
def render_progress_lines(plans: list[str]) -> list[str]:
lines = ["# Plan 状态", "", PLAN_STATUS_START]
for plan_key in plans:
lines.append(render_plan_line(plan_key, "pending", None))
lines.append(PLAN_STATUS_END)
return lines
def select_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
if not plans_dir.is_dir():
return 2, f"ERROR: plans dir not found: {plans_dir}"
plan_keys = list_plan_files(plans_dir)
if not plan_keys:
return 2, "ERROR: no plan files found"
progress_path.parent.mkdir(parents=True, exist_ok=True)
if progress_path.exists():
lines = progress_path.read_text(encoding="utf-8").splitlines()
else:
lines = []
block = find_block(lines)
if not block:
lines = render_progress_lines(plan_keys)
progress_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
return 0, (plans_dir / plan_keys[0]).as_posix()
start_idx, end_idx = block
entries = parse_entries(lines, start_idx, end_idx)
existing = {plan for plan, _, _, _ in entries}
missing = [plan for plan in plan_keys if plan not in existing]
if missing:
insert_lines = [render_plan_line(plan, "pending", None) for plan in missing]
lines[end_idx:end_idx] = insert_lines
end_idx += len(insert_lines)
progress_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
entries = parse_entries(lines, start_idx, end_idx)
for plan_key, status, _, _ in entries:
if status == "pending":
return 0, (plans_dir / plan_key).as_posix()
return 2, "ERROR: no pending plans"
def record_status(plan: str, status: str, progress_path: Path, note: Optional[str]) -> tuple[int, str]:
if status not in VALID_STATUSES:
return 2, f"ERROR: invalid status: {status}"
@ -147,24 +170,38 @@ def record_status(plan: str, status: str, progress_path: Path, note: Optional[st
progress_path.parent.mkdir(parents=True, exist_ok=True)
if progress_path.exists():
text = progress_path.read_text(encoding="utf-8")
lines = progress_path.read_text(encoding="utf-8").splitlines()
else:
text = "# 开发进度追踪\n"
lines = []
text = ensure_plan_section(text)
if not text.endswith("\n"):
text += "\n"
plan_key = normalize_plan_key(plan)
block = find_block(lines)
if not block:
lines = render_progress_lines([plan_key])
block = find_block(lines)
if not block:
return 2, "ERROR: failed to create plan status block"
date_value = datetime.now().strftime("%Y-%m-%d")
plan_path = Path(plan).as_posix()
line = f"{PLAN_PREFIX} {plan_path} | status={status} | date={date_value}"
if note:
cleaned = normalize_note(note)
if cleaned:
line += f" | note={cleaned}"
text += line + "\n"
progress_path.write_text(text, encoding="utf-8")
return 0, line
start_idx, end_idx = block
entries = parse_entries(lines, start_idx, end_idx)
rendered_note = None
if status == "blocked" and note:
rendered_note = normalize_note(note)
updated_line = render_plan_line(plan_key, status, rendered_note)
updated = False
for entry_plan, _, _, idx in entries:
if entry_plan == plan_key:
lines[idx] = updated_line
updated = True
break
if not updated:
lines[end_idx:end_idx] = [updated_line]
progress_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
return 0, updated_line
def main(argv: list[str]) -> int:
@ -222,4 +259,4 @@ def main(argv: list[str]) -> int:
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))
raise SystemExit(main(sys.argv[1:]))

View File

@ -1,6 +1,6 @@
# AGENT_RULES
目的:为本仓库提供稳定的执行流程。
目的:为本仓库提供稳定的执行流程与行为规范
## 优先级
@ -14,6 +14,35 @@
- 不得在代码/日志/注释中写入明文密钥、密码、Token
- 修改鉴权/权限逻辑必须说明动机与风险
- 不确定是否敏感时按敏感信息处理
- 执行修改文件系统的命令前,必须解释目的和潜在影响
## 行为准则
### 项目适应
- **模仿项目风格**:优先分析周围代码和配置,遵循现有约定
- **不假设可用性**:不假设库或框架可用,先验证再使用
- **完整完成请求**:不遗漏用户要求的任何部分
### 技术态度
- **准确性优先**:技术准确性优先于迎合用户
- **诚实纠正**:发现用户理解有误时,礼貌纠正
- **先查后答**:不确定时先调查再回答
### 避免过度工程
- **只做要求的**:不主动添加未要求的功能或重构
- **不过度抽象**:不为一次性操作创建工具函数
- **不为未来设计**:不为假设的未来需求设计
## 沟通原则
- **简洁直接**:专业、直接、简洁,避免对话填充词
- **拒绝时提供替代**:无法满足请求时,简洁说明并提供替代方案
- **不给时间估算**:专注任务本身,让用户自己判断时间
- **代码块标注语言**:输出代码时标注语言类型
- **不使用 emoji**:除非用户明确要求
## 上下文加载(每次会话开始)
@ -30,23 +59,47 @@
**目的**:让 AI 快速理解项目全貌,避免重复解释。
## 规划与执行分工
| 阶段 | 工具 | 产出 | 留痕 |
| ------------ | ---------------------- | ----------------- | -------------------- |
| 头脑风暴 | `$brainstorming` skill | 设计思路 | 无 |
| 生成计划 | `$writing-plans` skill | `docs/plans/*.md` | 无 |
| **执行计划** | **主循环** | 代码/配置变更 | **plan_progress.py** |
> **重要**:第三方 skills 不记录操作状态,执行必须通过主循环完成。
## 主循环
**触发词**
| 触发词 | 模式 | 说明 |
| --------------------------------------- | ---------- | ---------------------- |
| `执行主循环`、`继续执行`、`下一个 Plan` | 常规模式 | 遇确认场景可询问用户 |
| `自动执行所有 Plan` | 无交互模式 | 不询问,按规则自动处理 |
0. 选择 Plan
- 运行 `python {{PLAYBOOK_SCRIPTS}}/plan_progress.py select -plans docs/plans -progress memory-bank/progress.md`
- 如无可执行 Plan说明情况并询问用户下一步新增 Plan/切换任务/结束)
1. 标记开始:
- `python {{PLAYBOOK_SCRIPTS}}/plan_progress.py record -plan <plan> -status in-progress -progress memory-bank/progress.md`
2. 阅读 Plan
- 如无可执行 Plan跳到步骤 4
1. 阅读 Plan
- 理解目标、子任务与验证标准
3. 逐步执行:
2. 逐步执行:
- 按顺序执行子任务
- 每步完成后进行必要验证(测试/日志/diff
- 遇到阻塞立即记录并停止
4. 记录结果(写入 `memory-bank/progress.md`
- 遇到歧义/风险/决策点:
- 常规模式:记录到回复中,可询问用户
- 无交互模式:按「需要确认的场景」规则自动处理
- 遇到阻塞:记录原因,跳到步骤 3 标记 blocked
- **安全红线阻塞**(发现明文密钥等):立即停止,不继续后续 Plan
3. 记录结果:
- 完成:`python {{PLAYBOOK_SCRIPTS}}/plan_progress.py record -plan <plan> -status done -progress memory-bank/progress.md`
- 阻塞:`python {{PLAYBOOK_SCRIPTS}}/plan_progress.py record -plan <plan> -status blocked -progress memory-bank/progress.md -note <原因>`
5. 如存在歧义/风险/决策点,在回复中明确提出,并视需要记录到 `memory-bank/decisions.md`
- 回到步骤 0 继续下一个 Plan
4. 汇总报告(所有 Plan 处理完毕后):
- 列出已完成的 Plan
- 列出阻塞的 Plan 及原因
- 列出待确认的歧义/风险/决策点
- 如需记录重要决策,写入 `memory-bank/decisions.md`
## Plan 规则
@ -64,20 +117,37 @@
## 执行约束
### 代码修改约束
### 代码修改
- **必须先读文件再修改**:不读文件就提议修改是禁止的
- **必须运行测试验证**:相关测试必须通过
- **遵循换行规则**:遵循 `.gitattributes` 规则
- **命名一致性**:遵循项目现有的命名风格
- **最小改动原则**:只修改必要的部分,不顺手重构
### 决策记录约束
### 决策记录
- **重要决策**:记录到 `memory-bank/decisions.md`ADR 格式)
- **待确认事项**:在回复中列出并等待确认
- **进度留痕**:通过 `{{PLAYBOOK_SCRIPTS}}/plan_progress.py` 写入 `memory-bank/progress.md`,该文件为 Plan 状态唯一权威
- **进度留痕**:通过 `{{PLAYBOOK_SCRIPTS}}/plan_progress.py` 维护 `memory-bank/progress.md` 的 Plan 状态块(唯一权威)
### Git 操作
- **不使用 --amend**:除非用户明确要求,总是创建新提交
- **不使用 --force**:特别是推送到 main/master如用户要求必须警告风险
- **不跳过 hooks**:不使用 `--no-verify`
## 工具使用
- **并行执行**:独立的工具调用尽可能并行执行
- **遵循 schema**:严格遵循工具参数定义
- **避免循环**:避免重复调用同一工具获取相同信息
- **优先专用工具**:文件操作用 Read/Edit/Write搜索用 Grep/Glob
## 需要确认的场景
**常规模式**(可交互):
- 需求不明确或存在多种可行方案
- 需要行为/兼容性取舍
- 风险或约束冲突
@ -85,6 +155,21 @@
- **性能权衡**:需要在性能和可维护性之间选择
- **兼容性问题**:可能破坏现有用户代码
**无交互模式**(自动处理):
| 场景 | 处理方式 |
| -------------------------- | ---------------------------------- |
| 安全红线 | 立即停止,不继续后续 Plan |
| 架构变更/兼容性/破坏性修改 | 标记 blocked跳到下一个 Plan |
| 多种可行方案 | 选择最保守方案,记录选择理由到报告 |
| 歧义/风险/决策点 | 记录到报告,继续执行 |
**可以不确认**(两种模式通用):
- 明显的 bug 修复
- 符合现有模式的小改动
- 测试用例补充
## 验证清单
每个 Plan 完成后,必须验证:
@ -93,7 +178,7 @@
- [ ] 相关测试通过(如有测试且未被豁免)
- [ ] 换行符正确
- [ ] 无语法错误
- [ ] 已更新 `memory-bank/progress.md`
- [ ] 已通过 `plan_progress.py` 记录 Plan 状态
---

View File

@ -1,31 +1,4 @@
# 开发进度追踪
# Plan 状态
## 已知问题
<!-- 记录已知但暂不解决的问题 -->
#### {{ISSUE_CATEGORY_1}}
- {{ISSUE_1}}
- **临时方案**{{WORKAROUND_1}}
- **长期方案**{{SOLUTION_1}}
## 里程碑
#### M1: {{MILESTONE_1}}(目标:{{TARGET_DATE_1}}
- [ ] {{MILESTONE_1_TASK_1}}
- [ ] {{MILESTONE_1_TASK_2}}
#### M2: {{MILESTONE_2}}(目标:{{TARGET_DATE_2}}
- [ ] {{MILESTONE_2_TASK_1}}
- [ ] {{MILESTONE_2_TASK_2}}
## Plan 状态记录
<!-- 由 plan_progress.py 自动管理,请勿手动编辑此节内容 -->
---
**最后更新**{{DATE}}
<!-- plan-status:start -->
<!-- plan-status:end -->

View File

@ -18,7 +18,7 @@ def run_cli(*args, cwd=None):
class PlanProgressCliTests(unittest.TestCase):
def test_select_prefers_in_progress(self):
def test_select_seeds_progress_when_missing(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
plans_dir = root / "docs" / "plans"
@ -26,13 +26,6 @@ class PlanProgressCliTests(unittest.TestCase):
(plans_dir / "2026-01-01-old.md").write_text("old", encoding="utf-8")
(plans_dir / "2026-01-02-new.md").write_text("new", encoding="utf-8")
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
progress.write_text(
"[PLAN] docs/plans/2026-01-01-old.md | status=in-progress | date=2026-01-03\n",
encoding="utf-8",
)
result = run_cli(
"select",
"-plans",
@ -42,10 +35,17 @@ class PlanProgressCliTests(unittest.TestCase):
cwd=root,
)
self.assertEqual(result.returncode, 0)
self.assertEqual(result.returncode, 0, msg=result.stderr)
self.assertEqual(result.stdout.strip(), "docs/plans/2026-01-01-old.md")
def test_select_skips_done_and_blocked(self):
progress = root / "memory-bank" / "progress.md"
text = progress.read_text(encoding="utf-8")
self.assertIn("<!-- plan-status:start -->", text)
self.assertIn("<!-- plan-status:end -->", text)
self.assertIn("`2026-01-01-old.md` pending", text)
self.assertIn("`2026-01-02-new.md` pending", text)
def test_select_returns_first_pending_in_order(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
plans_dir = root / "docs" / "plans"
@ -58,8 +58,12 @@ class PlanProgressCliTests(unittest.TestCase):
progress.write_text(
"\n".join(
[
"[PLAN] docs/plans/2026-01-02-b.md | status=done | date=2026-01-03",
"[PLAN] docs/plans/2026-01-01-a.md | status=blocked | date=2026-01-03",
"# Plan 状态",
"",
"<!-- plan-status:start -->",
"- [ ] `2026-01-02-b.md` pending",
"- [ ] `2026-01-01-a.md` pending",
"<!-- plan-status:end -->",
"",
]
),
@ -75,13 +79,27 @@ class PlanProgressCliTests(unittest.TestCase):
cwd=root,
)
self.assertNotEqual(result.returncode, 0)
self.assertIn("no pending plans", (result.stdout + result.stderr).lower())
self.assertEqual(result.returncode, 0, msg=result.stderr)
self.assertEqual(result.stdout.strip(), "docs/plans/2026-01-02-b.md")
def test_record_creates_section(self):
def test_record_updates_line(self):
with tempfile.TemporaryDirectory() as tmp_dir:
root = Path(tmp_dir)
progress = root / "memory-bank" / "progress.md"
progress.parent.mkdir(parents=True)
progress.write_text(
"\n".join(
[
"# Plan 状态",
"",
"<!-- plan-status:start -->",
"- [ ] `2026-01-03-demo.md` pending",
"<!-- plan-status:end -->",
"",
]
),
encoding="utf-8",
)
result = run_cli(
"record",
@ -91,15 +109,13 @@ class PlanProgressCliTests(unittest.TestCase):
"done",
"-progress",
"memory-bank/progress.md",
"-note",
"done",
cwd=root,
)
self.assertEqual(result.returncode, 0)
self.assertEqual(result.returncode, 0, msg=result.stderr)
text = progress.read_text(encoding="utf-8")
self.assertIn("## Plan 状态记录", text)
self.assertIn("status=done", text)
self.assertIn("- [x] `2026-01-03-demo.md` done", text)
self.assertEqual(text.count("2026-01-03-demo.md"), 1)
if __name__ == "__main__":