297 lines
9.3 KiB
Python
297 lines
9.3 KiB
Python
#!/usr/bin/env python3
|
|
import platform
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
PLAN_STATUS_START = "<!-- plan-status:start -->"
|
|
PLAN_STATUS_END = "<!-- plan-status:end -->"
|
|
PLAN_FILE_RE = re.compile(r"^(\d{4}-\d{2}-\d{2})-.+\.md$")
|
|
PLAN_LINE_RE = re.compile(
|
|
r"^- \[(?P<check>[ xX])\] `(?P<plan>[^`]+)` (?P<status>done|blocked|pending|in-progress|skipped)(?:: (?P<note>.*))?$"
|
|
)
|
|
VALID_STATUSES = {"done", "blocked", "pending", "in-progress", "skipped"}
|
|
|
|
|
|
def usage() -> str:
|
|
return (
|
|
"Usage:\n"
|
|
" python scripts/plan_progress.py select -plans <dir> -progress <file>\n"
|
|
" python scripts/plan_progress.py record -plan <path> -status <status> -progress <file> [-note <text>]\n"
|
|
" python scripts/plan_progress.py -h\n"
|
|
"Options:\n"
|
|
" -plans DIR\n"
|
|
" -plan PATH\n"
|
|
" -status done|blocked|pending|in-progress|skipped\n"
|
|
" -progress FILE\n"
|
|
" -note TEXT\n"
|
|
" -h, -help Show this help.\n"
|
|
)
|
|
|
|
|
|
def parse_flags(args: list[str]) -> dict[str, str]:
|
|
flags: dict[str, str] = {}
|
|
idx = 0
|
|
while idx < len(args):
|
|
arg = args[idx]
|
|
if arg in ("-h", "-help"):
|
|
raise ValueError("help")
|
|
if not arg.startswith("-"):
|
|
raise ValueError(f"unexpected arg: {arg}")
|
|
if idx + 1 >= len(args):
|
|
raise ValueError(f"missing value for {arg}")
|
|
flags[arg] = args[idx + 1]
|
|
idx += 2
|
|
return flags
|
|
|
|
|
|
def normalize_plan_key(plan_value: str) -> str:
|
|
raw = plan_value.strip().replace("\\", "/")
|
|
raw = raw.lstrip("./")
|
|
if raw.startswith("docs/plans/"):
|
|
return raw[len("docs/plans/") :]
|
|
marker = "/docs/plans/"
|
|
if marker in raw:
|
|
return raw.split(marker, 1)[1]
|
|
return raw
|
|
|
|
|
|
def render_plan_line(plan_key: str, status: str, note: Optional[str]) -> str:
|
|
checked = "x" if status == "done" else " "
|
|
if status == "blocked":
|
|
suffix = "blocked"
|
|
if note:
|
|
suffix += f": {note}"
|
|
elif status == "pending":
|
|
suffix = "pending"
|
|
elif status == "in-progress":
|
|
suffix = "in-progress"
|
|
elif status == "skipped":
|
|
suffix = "skipped"
|
|
if note:
|
|
suffix += f": {note}"
|
|
else:
|
|
suffix = "done"
|
|
return f"- [{checked}] `{plan_key}` {suffix}"
|
|
|
|
|
|
def normalize_note(note: str) -> str:
|
|
cleaned = note.replace("\n", " ").replace("\r", " ").replace("`", "'").strip()
|
|
return cleaned
|
|
|
|
|
|
def list_plan_files(plans_dir: Path) -> list[str]:
|
|
entries: list[str] = []
|
|
for path in plans_dir.iterdir():
|
|
if not path.is_file():
|
|
continue
|
|
if not PLAN_FILE_RE.match(path.name):
|
|
continue
|
|
try:
|
|
rel = path.resolve().relative_to(plans_dir.resolve()).as_posix()
|
|
except ValueError:
|
|
rel = path.name
|
|
entries.append(rel)
|
|
return sorted(entries)
|
|
|
|
|
|
def find_block(lines: list[str]) -> Optional[tuple[int, int]]:
|
|
start_idx = None
|
|
for idx, line in enumerate(lines):
|
|
if line.strip() == PLAN_STATUS_START:
|
|
start_idx = idx
|
|
break
|
|
if start_idx is None:
|
|
return None
|
|
for idx in range(start_idx + 1, len(lines)):
|
|
if lines[idx].strip() == PLAN_STATUS_END:
|
|
return start_idx, idx
|
|
return None
|
|
|
|
|
|
def parse_entries(lines: list[str], start_idx: int, end_idx: int) -> list[tuple[str, str, Optional[str], int]]:
|
|
entries: list[tuple[str, str, Optional[str], int]] = []
|
|
for idx in range(start_idx + 1, end_idx):
|
|
line = lines[idx].strip()
|
|
match = PLAN_LINE_RE.match(line)
|
|
if not match:
|
|
continue
|
|
plan_key = normalize_plan_key(match.group("plan"))
|
|
status = match.group("status")
|
|
note = match.group("note")
|
|
entries.append((plan_key, status, note, idx))
|
|
return entries
|
|
|
|
|
|
def render_progress_lines(plans: list[str]) -> list[str]:
|
|
lines = ["# Plan 状态", "", PLAN_STATUS_START]
|
|
for plan_key in plans:
|
|
lines.append(render_plan_line(plan_key, "pending", None))
|
|
lines.append(PLAN_STATUS_END)
|
|
return lines
|
|
|
|
|
|
ENV_BLOCKED_RE = re.compile(r"^env:([^:]+):(.+)$")
|
|
|
|
|
|
def parse_env_blocked_note(note: Optional[str]) -> Optional[tuple[str, str]]:
|
|
"""Parse 'env:windows:Task2,Task4' format. Returns (env, tasks) or None."""
|
|
if not note:
|
|
return None
|
|
match = ENV_BLOCKED_RE.match(note)
|
|
if match:
|
|
return match.group(1), match.group(2)
|
|
return None
|
|
|
|
|
|
def detect_env() -> Optional[str]:
|
|
mapping = {"windows": "windows", "linux": "linux", "darwin": "darwin"}
|
|
return mapping.get(platform.system().lower())
|
|
|
|
|
|
def select_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
|
|
if not plans_dir.is_dir():
|
|
return 2, f"ERROR: plans dir not found: {plans_dir}"
|
|
plan_keys = list_plan_files(plans_dir)
|
|
if not plan_keys:
|
|
return 2, "ERROR: no plan files found"
|
|
|
|
progress_path.parent.mkdir(parents=True, exist_ok=True)
|
|
if progress_path.exists():
|
|
lines = progress_path.read_text(encoding="utf-8").splitlines()
|
|
else:
|
|
lines = []
|
|
|
|
block = find_block(lines)
|
|
if not block:
|
|
lines = render_progress_lines(plan_keys)
|
|
progress_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
return 0, (plans_dir / plan_keys[0]).as_posix()
|
|
|
|
start_idx, end_idx = block
|
|
entries = parse_entries(lines, start_idx, end_idx)
|
|
existing = {plan for plan, _, _, _ in entries}
|
|
missing = [plan for plan in plan_keys if plan not in existing]
|
|
if missing:
|
|
insert_lines = [render_plan_line(plan, "pending", None) for plan in missing]
|
|
lines[end_idx:end_idx] = insert_lines
|
|
end_idx += len(insert_lines)
|
|
progress_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
entries = parse_entries(lines, start_idx, end_idx)
|
|
|
|
for plan_key, status, note, _ in entries:
|
|
if status in ("pending", "in-progress"):
|
|
return 0, (plans_dir / plan_key).as_posix()
|
|
|
|
# Check for env-blocked Plans if current environment is detected
|
|
current_env = detect_env()
|
|
if current_env:
|
|
for plan_key, status, note, _ in entries:
|
|
if status == "blocked":
|
|
env_info = parse_env_blocked_note(note)
|
|
if env_info and env_info[0] == current_env:
|
|
return 0, (plans_dir / plan_key).as_posix()
|
|
|
|
return 2, "ERROR: no pending plans"
|
|
|
|
|
|
def record_status(plan: str, status: str, progress_path: Path, note: Optional[str]) -> tuple[int, str]:
|
|
if status not in VALID_STATUSES:
|
|
return 2, f"ERROR: invalid status: {status}"
|
|
if not plan:
|
|
return 2, "ERROR: plan is required"
|
|
|
|
progress_path.parent.mkdir(parents=True, exist_ok=True)
|
|
if progress_path.exists():
|
|
lines = progress_path.read_text(encoding="utf-8").splitlines()
|
|
else:
|
|
lines = []
|
|
|
|
plan_key = normalize_plan_key(plan)
|
|
block = find_block(lines)
|
|
if not block:
|
|
lines = render_progress_lines([plan_key])
|
|
block = find_block(lines)
|
|
if not block:
|
|
return 2, "ERROR: failed to create plan status block"
|
|
|
|
start_idx, end_idx = block
|
|
entries = parse_entries(lines, start_idx, end_idx)
|
|
|
|
rendered_note = None
|
|
if status in ("blocked", "skipped") and note:
|
|
rendered_note = normalize_note(note)
|
|
|
|
updated_line = render_plan_line(plan_key, status, rendered_note)
|
|
updated = False
|
|
for entry_plan, _, _, idx in entries:
|
|
if entry_plan == plan_key:
|
|
lines[idx] = updated_line
|
|
updated = True
|
|
break
|
|
|
|
if not updated:
|
|
lines[end_idx:end_idx] = [updated_line]
|
|
|
|
progress_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
return 0, updated_line
|
|
|
|
|
|
def main(argv: list[str]) -> int:
|
|
if not argv:
|
|
print(usage(), file=sys.stderr)
|
|
return 2
|
|
if argv[0] in ("-h", "-help"):
|
|
print(usage())
|
|
return 0
|
|
|
|
mode = argv[0]
|
|
if mode not in ("select", "record"):
|
|
print(f"ERROR: unknown mode: {mode}", file=sys.stderr)
|
|
print(usage(), file=sys.stderr)
|
|
return 2
|
|
|
|
try:
|
|
flags = parse_flags(argv[1:])
|
|
except ValueError as exc:
|
|
if str(exc) == "help":
|
|
print(usage())
|
|
return 0
|
|
print(f"ERROR: {exc}", file=sys.stderr)
|
|
print(usage(), file=sys.stderr)
|
|
return 2
|
|
|
|
if mode == "select":
|
|
plans = flags.get("-plans")
|
|
progress = flags.get("-progress")
|
|
if not plans or not progress:
|
|
print("ERROR: -plans and -progress are required", file=sys.stderr)
|
|
print(usage(), file=sys.stderr)
|
|
return 2
|
|
code, message = select_plan(Path(plans), Path(progress))
|
|
if code != 0:
|
|
print(message, file=sys.stderr)
|
|
return code
|
|
print(message)
|
|
return 0
|
|
|
|
plan = flags.get("-plan")
|
|
status = flags.get("-status")
|
|
progress = flags.get("-progress")
|
|
note = flags.get("-note")
|
|
if not plan or not status or not progress:
|
|
print("ERROR: -plan, -status, and -progress are required", file=sys.stderr)
|
|
print(usage(), file=sys.stderr)
|
|
return 2
|
|
code, message = record_status(plan, status, Path(progress), note)
|
|
if code != 0:
|
|
print(message, file=sys.stderr)
|
|
return code
|
|
print(message)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main(sys.argv[1:]))
|