#!/usr/bin/env python3
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
PLAN_PREFIX = "[PLAN]"
PLAN_SECTION_HEADER = "## Plan 状态记录"
PLAN_FILE_RE = re.compile(r"^(\d{4}-\d{2}-\d{2})-.+\.md$")
VALID_STATUSES = {"in-progress", "done", "blocked"}
def usage() -> str:
return (
"Usage:\\n"
" python scripts/plan_progress.py select -plans
-progress \\n"
" python scripts/plan_progress.py record -plan -status -progress [-note ]\\n"
" python scripts/plan_progress.py -h\\n"
"Options:\\n"
" -plans DIR\\n"
" -plan PATH\\n"
" -status in-progress|done|blocked\\n"
" -progress FILE\\n"
" -note TEXT\\n"
" -h, -help Show this help.\\n"
)
def parse_flags(args: list[str]) -> dict[str, str]:
flags: dict[str, str] = {}
idx = 0
while idx < len(args):
arg = args[idx]
if arg in ("-h", "-help"):
raise ValueError("help")
if not arg.startswith("-"):
raise ValueError(f"unexpected arg: {arg}")
if idx + 1 >= len(args):
raise ValueError(f"missing value for {arg}")
flags[arg] = args[idx + 1]
idx += 2
return flags
def normalize_plan_key(plan_value: str, cwd: Path) -> str:
try:
return Path(plan_value).resolve().relative_to(cwd.resolve()).as_posix()
except ValueError:
return Path(plan_value).as_posix()
def load_plan_records(progress_path: Path, cwd: Path) -> dict[str, str]:
if not progress_path.exists():
return {}
text = progress_path.read_text(encoding="utf-8")
records: dict[str, str] = {}
for line in text.splitlines():
if not line.startswith(PLAN_PREFIX):
continue
payload = line[len(PLAN_PREFIX) :].strip()
if not payload:
continue
segments = [seg.strip() for seg in payload.split("|")]
if not segments:
continue
plan_path = segments[0]
status = None
for seg in segments[1:]:
if "=" not in seg:
continue
key, value = seg.split("=", 1)
if key.strip() == "status":
status = value.strip()
if not plan_path or status is None:
continue
records[normalize_plan_key(plan_path, cwd)] = status
return records
def list_plan_files(plans_dir: Path, cwd: Path) -> list[tuple[str, Path, str]]:
entries: list[tuple[str, Path, str]] = []
for path in plans_dir.iterdir():
if not path.is_file():
continue
match = PLAN_FILE_RE.match(path.name)
if not match:
continue
date_value = match.group(1)
try:
rel = path.resolve().relative_to(cwd.resolve()).as_posix()
except ValueError:
rel = path.as_posix()
entries.append((date_value, path, rel))
return entries
def select_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
cwd = Path.cwd()
if not plans_dir.is_dir():
return 2, f"ERROR: plans dir not found: {plans_dir}"
plans = list_plan_files(plans_dir, cwd)
if not plans:
return 2, "ERROR: no plan files found"
records = load_plan_records(progress_path, cwd)
in_progress = [item for item in plans if records.get(item[2]) == "in-progress"]
if in_progress:
in_progress.sort(key=lambda item: (item[0], item[2]))
return 0, in_progress[-1][2]
pending = [
item
for item in plans
if records.get(item[2]) not in ("done", "blocked")
]
if not pending:
return 2, "ERROR: no pending plans"
pending.sort(key=lambda item: (item[0], item[2]))
return 0, pending[-1][2]
def ensure_plan_section(text: str) -> str:
if PLAN_SECTION_HEADER in text:
return text
suffix = text
if suffix and not suffix.endswith("\n"):
suffix += "\n"
if suffix:
suffix += "\n"
suffix += PLAN_SECTION_HEADER + "\n"
return suffix
def normalize_note(note: str) -> str:
cleaned = note.replace("\n", " ").replace("|", " ").strip()
return cleaned
def record_status(plan: str, status: str, progress_path: Path, note: Optional[str]) -> tuple[int, str]:
if status not in VALID_STATUSES:
return 2, f"ERROR: invalid status: {status}"
if not plan:
return 2, "ERROR: plan is required"
progress_path.parent.mkdir(parents=True, exist_ok=True)
if progress_path.exists():
text = progress_path.read_text(encoding="utf-8")
else:
text = "# 开发进度追踪\n"
text = ensure_plan_section(text)
if not text.endswith("\n"):
text += "\n"
date_value = datetime.now().strftime("%Y-%m-%d")
plan_path = Path(plan).as_posix()
line = f"{PLAN_PREFIX} {plan_path} | status={status} | date={date_value}"
if note:
cleaned = normalize_note(note)
if cleaned:
line += f" | note={cleaned}"
text += line + "\n"
progress_path.write_text(text, encoding="utf-8")
return 0, line
def main(argv: list[str]) -> int:
if not argv:
print(usage(), file=sys.stderr)
return 2
if argv[0] in ("-h", "-help"):
print(usage())
return 0
mode = argv[0]
if mode not in ("select", "record"):
print(f"ERROR: unknown mode: {mode}", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
try:
flags = parse_flags(argv[1:])
except ValueError as exc:
if str(exc) == "help":
print(usage())
return 0
print(f"ERROR: {exc}", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
if mode == "select":
plans = flags.get("-plans")
progress = flags.get("-progress")
if not plans or not progress:
print("ERROR: -plans and -progress are required", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
code, message = select_plan(Path(plans), Path(progress))
if code != 0:
print(message, file=sys.stderr)
return code
print(message)
return 0
plan = flags.get("-plan")
status = flags.get("-status")
progress = flags.get("-progress")
note = flags.get("-note")
if not plan or not status or not progress:
print("ERROR: -plan, -status, and -progress are required", file=sys.stderr)
print(usage(), file=sys.stderr)
return 2
code, message = record_status(plan, status, Path(progress), note)
if code != 0:
print(message, file=sys.stderr)
return code
print(message)
return 0
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))