#!/usr/bin/env python3
"""Track plan execution state in a Markdown progress file.

The progress file holds two marker-delimited blocks:

* a *workflow state* block of ``key: value`` lines, and
* a *plan status* block with one checklist line per plan file.

Four sub-commands operate on it: ``claim`` (pick the next plan and mark it
in-progress), ``finish`` (mark a plan done/blocked/skipped), ``record``
(update the workflow state only) and ``status`` (print a summary).
"""
from contextlib import contextmanager
from datetime import datetime, timezone
import getpass
import os
import platform
import re
import socket
import sys
import threading
import time
from pathlib import Path
from typing import Optional

try:
    import fcntl
except ImportError:  # pragma: no cover
    fcntl = None

try:
    import msvcrt
except ImportError:  # pragma: no cover
    msvcrt = None

# NOTE(review): the marker literals were empty strings in the reviewed copy,
# which makes every blank line look like a block delimiter. They are restored
# here as distinct HTML comments; confirm the exact text against existing
# progress files before merging.
PLAN_STATUS_START = "<!-- PLAN_STATUS_START -->"
PLAN_STATUS_END = "<!-- PLAN_STATUS_END -->"
WORKFLOW_STATE_START = "<!-- WORKFLOW_STATE_START -->"
WORKFLOW_STATE_END = "<!-- WORKFLOW_STATE_END -->"

PLAN_FILE_RE = re.compile(r"^(\d{4}-\d{2}-\d{2})-.+\.md$")
# One checklist line: "- [x] `<plan>` <status>[: <note>]".
PLAN_LINE_RE = re.compile(
    r"^- \[(?P<checked>[ xX])\] `(?P<plan>[^`]+)` "
    r"(?P<status>done|blocked|pending|in-progress|skipped)"
    r"(?:: (?P<note>.*))?$"
)
FINISH_STATUSES = {"done", "blocked", "skipped"}
# Notes of the form "env:<name>:<reason>" mark a plan as blocked on one OS.
ENV_BLOCKED_RE = re.compile(r"^env:([^:]+):(.+)$")
PLAN_META_REQUIRED_FIELDS = (
    "Plan Group",
    "Parent Plan",
    "Verification Scope",
    "Verification Gate",
)
WORKFLOW_STATE_KEYS = {
    "phase",
    "spec",
    "plan",
    "executor",
    "constraints",
    "claimed_by",
    "claimed_at",
    "verification",
}
WORKFLOW_PHASES = {
    "brainstorming",
    "planning",
    "executing",
    "done",
    "blocked",
    "skipped",
}
# Per-lock-file in-process locks, created lazily under THREAD_LOCKS_GUARD.
THREAD_LOCKS: dict[str, threading.Lock] = {}
THREAD_LOCKS_GUARD = threading.Lock()

# (plan_key, status, note, line index inside the progress file)
PlanEntry = tuple[str, str, Optional[str], int]


def usage() -> str:
    """Return the command-line help text."""
    return (
        "Usage:\n"
        "  python scripts/main_loop.py claim -plans <dir> -progress <file>\n"
        "  python scripts/main_loop.py finish -plan <path> -status <status> "
        "-progress <file> [-note <text>] [-verified <text>]\n"
        "  python scripts/main_loop.py status -plans <dir> -progress <file>\n"
        "  python scripts/main_loop.py record -progress <file> -phase <phase> "
        "[-spec <path>] [-plan <path>] [-executor <name>] "
        "[-constraints <csv>]\n"
        "  python scripts/main_loop.py -h\n"
        "Options:\n"
        "  -plans DIR\n"
        "  -plan PATH\n"
        "  -status done|blocked|skipped\n"
        "  -progress FILE\n"
        "  -phase brainstorming|planning|executing|done|blocked|skipped\n"
        "  -spec PATH\n"
        "  -executor NAME\n"
        "  -constraints CSV\n"
        "  -note TEXT\n"
        "  -owner NAME\n"
        "  -verified TEXT\n"
        "  -h, -help  Show this help.\n"
    )


def parse_flags(args: list[str]) -> dict[str, str]:
    """Parse ``-flag value`` pairs into a dict keyed by the flag (dash kept).

    Raises:
        ValueError: with message ``"help"`` when ``-h``/``-help`` is seen,
            or with a descriptive message for a stray positional argument
            or a flag that has no value.
    """
    flags: dict[str, str] = {}
    idx = 0
    while idx < len(args):
        arg = args[idx]
        if arg in ("-h", "-help"):
            raise ValueError("help")
        if not arg.startswith("-"):
            raise ValueError(f"unexpected arg: {arg}")
        if idx + 1 >= len(args):
            raise ValueError(f"missing value for {arg}")
        flags[arg] = args[idx + 1]
        idx += 2
    return flags


def normalize_plan_key(plan_value: str) -> str:
    """Reduce a plan path to the key used in the progress file.

    Backslashes become forward slashes, leading ``./`` segments are dropped,
    and anything up to and including ``docs/superpowers/plans/`` is removed.
    Values that do not contain that directory are returned otherwise
    unchanged.
    """
    raw = plan_value.strip().replace("\\", "/")
    # Strip the literal "./" prefix only; str.lstrip("./") would treat the
    # argument as a character set and also eat leading dots of the name.
    while raw.startswith("./"):
        raw = raw[2:]
    prefix = "docs/superpowers/plans/"
    if raw.startswith(prefix):
        return raw[len(prefix):]
    marker = "/" + prefix
    if marker in raw:
        return raw.split(marker, 1)[1]
    return raw


def normalize_note(note: str) -> str:
    """Flatten a note to one line and replace backticks with apostrophes."""
    return note.replace("\n", " ").replace("\r", " ").replace("`", "'").strip()


def now_utc() -> str:
    """Return the current UTC time as ISO-8601 with a ``Z`` suffix (seconds)."""
    stamp = datetime.now(timezone.utc).replace(microsecond=0)
    return stamp.isoformat().replace("+00:00", "Z")


def default_claim_owner() -> str:
    """Return the claim owner: the env override or ``user@host:pid``."""
    owner = os.environ.get("PLAYBOOK_MAIN_LOOP_OWNER")
    if owner:
        return normalize_note(owner)
    host = socket.gethostname() or "unknown-host"
    try:
        user = getpass.getuser()
    except Exception:  # pragma: no cover
        # Best effort: a missing user name must not prevent claiming.
        user = "unknown-user"
    return f"{user}@{host}:{os.getpid()}"


def render_plan_line(plan_key: str, status: str, note: Optional[str]) -> str:
    """Render one checklist line; the box is ticked only for ``done``."""
    checked = "x" if status == "done" else " "
    suffix = f"{status}: {note}" if note else status
    return f"- [{checked}] `{plan_key}` {suffix}"


def list_plan_files(plans_dir: Path) -> list[str]:
    """Return sorted names of regular files in *plans_dir* matching PLAN_FILE_RE."""
    return sorted(
        path.name
        for path in plans_dir.iterdir()
        if path.is_file() and PLAN_FILE_RE.match(path.name)
    )


def find_named_block(
    lines: list[str], start_marker: str, end_marker: str
) -> Optional[tuple[int, int]]:
    """Locate the first ``start_marker`` line and the next ``end_marker`` line.

    Lines are compared after stripping whitespace. Returns
    ``(start_idx, end_idx)`` or ``None`` when either marker is missing.
    """
    start_idx = next(
        (idx for idx, line in enumerate(lines) if line.strip() == start_marker),
        None,
    )
    if start_idx is None:
        return None
    for idx in range(start_idx + 1, len(lines)):
        if lines[idx].strip() == end_marker:
            return start_idx, idx
    return None


def find_block(lines: list[str]) -> Optional[tuple[int, int]]:
    """Locate the plan-status block; see :func:`find_named_block`."""
    return find_named_block(lines, PLAN_STATUS_START, PLAN_STATUS_END)


def parse_entries(
    lines: list[str], start_idx: int, end_idx: int
) -> list[PlanEntry]:
    """Parse checklist lines strictly between *start_idx* and *end_idx*.

    Lines that do not match PLAN_LINE_RE are ignored. Each result is
    ``(normalized plan key, status, note or None, line index)``.
    """
    entries: list[PlanEntry] = []
    for idx in range(start_idx + 1, end_idx):
        match = PLAN_LINE_RE.match(lines[idx].strip())
        if not match:
            continue
        entries.append(
            (
                normalize_plan_key(match.group("plan")),
                match.group("status"),
                match.group("note"),
                idx,
            )
        )
    return entries


def render_progress_lines(plans: list[str]) -> list[str]:
    """Render a fresh progress document listing every plan as ``pending``."""
    lines = [
        "# 当前进展",
        "",
        "## Workflow State",
        "",
        WORKFLOW_STATE_START,
        WORKFLOW_STATE_END,
        "",
        "## Plan Status",
        "",
        PLAN_STATUS_START,
    ]
    lines.extend(render_plan_line(plan_key, "pending", None) for plan_key in plans)
    lines.append(PLAN_STATUS_END)
    return lines


def render_workflow_state_lines(
    phase: Optional[str] = None,
    spec: Optional[str] = None,
    plan: Optional[str] = None,
    executor: Optional[str] = None,
    constraints: Optional[str] = None,
    claimed_by: Optional[str] = None,
    claimed_at: Optional[str] = None,
    verification: Optional[str] = None,
) -> list[str]:
    """Render the workflow-state block, markers included.

    Keys are emitted in a fixed order; keys whose value is empty or ``None``
    are omitted.
    """
    fields = (
        ("phase", phase),
        ("spec", spec),
        ("plan", plan),
        ("executor", executor),
        ("constraints", constraints),
        ("claimed_by", claimed_by),
        ("claimed_at", claimed_at),
        ("verification", verification),
    )
    body = [f"{key}: {value}" for key, value in fields if value]
    return [WORKFLOW_STATE_START, *body, WORKFLOW_STATE_END]


def parse_workflow_state(
    lines: list[str], start_idx: int, end_idx: int
) -> dict[str, str]:
    """Parse ``key: value`` lines between the markers, keeping known keys only."""
    state: dict[str, str] = {}
    for idx in range(start_idx + 1, end_idx):
        line = lines[idx].strip()
        if ": " not in line:
            continue
        key, value = line.split(": ", 1)
        if key in WORKFLOW_STATE_KEYS:
            state[key] = value
    return state


def parse_env_blocked_note(note: Optional[str]) -> Optional[tuple[str, str]]:
    """Split an ``env:<name>:<reason>`` note into ``(name, reason)``, else None."""
    if not note:
        return None
    match = ENV_BLOCKED_RE.match(note)
    if not match:
        return None
    return match.group(1), match.group(2)


def detect_env() -> Optional[str]:
    """Return ``windows``/``linux``/``darwin`` for this host, or None if other."""
    mapping = {"windows": "windows", "linux": "linux", "darwin": "darwin"}
    return mapping.get(platform.system().lower())


def load_progress_lines(progress_path: Path) -> list[str]:
    """Read the progress file as lines; return ``[]`` when it does not exist.

    The parent directory is created as a side effect.
    """
    progress_path.parent.mkdir(parents=True, exist_ok=True)
    if progress_path.exists():
        return progress_path.read_text(encoding="utf-8").splitlines()
    return []


def write_progress_lines(progress_path: Path, lines: list[str]) -> None:
    """Write *lines* as UTF-8 with LF line endings and a trailing newline."""
    # open() accepts newline= on every supported Python version, whereas
    # Path.write_text() only gained that parameter in Python 3.10.
    with progress_path.open("w", encoding="utf-8", newline="\n") as handle:
        handle.write("\n".join(lines) + "\n")


def get_thread_lock(lock_path: Path) -> threading.Lock:
    """Return the process-wide lock associated with the resolved *lock_path*."""
    key = str(lock_path.resolve())
    with THREAD_LOCKS_GUARD:
        lock = THREAD_LOCKS.get(key)
        if lock is None:
            lock = threading.Lock()
            THREAD_LOCKS[key] = lock
        return lock


def _acquire_file_lock(lock_file) -> None:
    """Take an exclusive OS-level lock on *lock_file* where a backend exists."""
    if fcntl is not None:
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
    elif msvcrt is not None:  # pragma: no cover
        # Retry until the one-byte region lock succeeds.
        while True:
            try:
                lock_file.seek(0)
                msvcrt.locking(lock_file.fileno(), msvcrt.LK_LOCK, 1)
                break
            except OSError:
                time.sleep(0.05)


def _release_file_lock(lock_file) -> None:
    """Release the lock taken by :func:`_acquire_file_lock`."""
    if fcntl is not None:
        fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
    elif msvcrt is not None:  # pragma: no cover
        lock_file.seek(0)
        msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)


@contextmanager
def locked_progress(progress_path: Path):
    """Hold the thread lock and the ``<progress>.lock`` file lock for the body.

    If ``PLAYBOOK_MAIN_LOOP_HOLD_LOCK_MS`` is set, sleeps that many
    milliseconds after acquiring the lock and before yielding.
    """
    progress_path.parent.mkdir(parents=True, exist_ok=True)
    lock_path = progress_path.with_name(f"{progress_path.name}.lock")
    with get_thread_lock(lock_path), lock_path.open("a+b") as lock_file:
        _acquire_file_lock(lock_file)
        try:
            hold_ms = os.environ.get("PLAYBOOK_MAIN_LOOP_HOLD_LOCK_MS")
            if hold_ms:
                time.sleep(max(0.0, float(hold_ms) / 1000.0))
            yield
        finally:
            _release_file_lock(lock_file)


def ensure_section(lines: list[str], heading: str) -> list[str]:
    """Append *heading* (blank-line separated) unless a line already equals it.

    Mutates and returns *lines*.
    """
    if any(line.strip() == heading for line in lines):
        return lines
    if lines and lines[-1] != "":
        lines.append("")
    lines.extend([heading, ""])
    return lines


def ensure_block_with_lines(
    lines: list[str],
    start_marker: str,
    end_marker: str,
    default_lines: list[str],
    heading: Optional[str] = None,
) -> tuple[list[str], int, int]:
    """Return the block's marker indices, appending *default_lines* if absent.

    When the block is missing, the document title (for an empty document)
    and the optional *heading* are added first, then *default_lines* are
    appended at the end. Returns ``(lines, start_idx, end_idx)``.
    """
    block = find_named_block(lines, start_marker, end_marker)
    if block:
        return lines, block[0], block[1]
    if not lines:
        lines = ["# 当前进展", ""]
    if heading:
        lines = ensure_section(lines, heading)
    if lines and lines[-1] != "":
        lines.append("")
    insert_at = len(lines)
    lines[insert_at:insert_at] = default_lines
    return lines, insert_at, insert_at + len(default_lines) - 1


def ensure_workflow_state_block(
    lines: list[str],
) -> tuple[list[str], int, int]:
    """Ensure the workflow-state block exists under ``## Workflow State``."""
    return ensure_block_with_lines(
        lines,
        WORKFLOW_STATE_START,
        WORKFLOW_STATE_END,
        [WORKFLOW_STATE_START, WORKFLOW_STATE_END],
        "## Workflow State",
    )


def ensure_plan_block(
    lines: list[str], progress_path: Path, plan_keys: list[str]
) -> tuple[list[str], int, int]:
    """Ensure both blocks exist, write the file, and return the plan block.

    *plan_keys* is accepted for interface compatibility and is not used.
    """
    lines, _, _ = ensure_workflow_state_block(lines)
    lines, start_idx, end_idx = ensure_block_with_lines(
        lines,
        PLAN_STATUS_START,
        PLAN_STATUS_END,
        [PLAN_STATUS_START, PLAN_STATUS_END],
        "## Plan Status",
    )
    write_progress_lines(progress_path, lines)
    return lines, start_idx, end_idx


def update_workflow_state(
    lines: list[str],
    phase: Optional[str] = None,
    spec: Optional[str] = None,
    plan: Optional[str] = None,
    executor: Optional[str] = None,
    constraints: Optional[str] = None,
    claimed_by: Optional[str] = None,
    claimed_at: Optional[str] = None,
    verification: Optional[str] = None,
    clear_keys: tuple[str, ...] = (),
) -> list[str]:
    """Merge the given values into the workflow-state block and re-render it.

    Keys in *clear_keys* are removed first; arguments that are not ``None``
    then overwrite the stored value. Existing values for the remaining keys
    are preserved. Returns the updated line list.
    """
    lines, start_idx, end_idx = ensure_workflow_state_block(lines)
    state = parse_workflow_state(lines, start_idx, end_idx)
    for key in clear_keys:
        state.pop(key, None)
    overrides = {
        "phase": phase,
        "spec": spec,
        "plan": plan,
        "executor": executor,
        "constraints": constraints,
        "claimed_by": claimed_by,
        "claimed_at": claimed_at,
        "verification": verification,
    }
    for key, value in overrides.items():
        if value is not None:
            state[key] = value
    lines[start_idx : end_idx + 1] = render_workflow_state_lines(
        **{key: state.get(key) for key in overrides}
    )
    return lines


def ensure_all_plans_present(
    lines: list[str],
    start_idx: int,
    end_idx: int,
    progress_path: Path,
    plan_keys: list[str],
) -> list[PlanEntry]:
    """Add a ``pending`` line for every plan key missing from the block.

    New lines are inserted just before the end marker and the file is
    rewritten. Returns the (re-)parsed entries of the block.
    """
    entries = parse_entries(lines, start_idx, end_idx)
    existing = {plan_key for plan_key, _, _, _ in entries}
    missing = [plan_key for plan_key in plan_keys if plan_key not in existing]
    if missing:
        insert_lines = [
            render_plan_line(plan_key, "pending", None) for plan_key in missing
        ]
        lines[end_idx:end_idx] = insert_lines
        write_progress_lines(progress_path, lines)
        end_idx += len(insert_lines)
        entries = parse_entries(lines, start_idx, end_idx)
    return entries


def filter_existing_entries(
    entries: list[PlanEntry], plan_keys: list[str]
) -> list[PlanEntry]:
    """Keep only entries whose plan key is in *plan_keys*."""
    available = set(plan_keys)
    return [entry for entry in entries if entry[0] in available]


def validate_plan_meta(plan_path: Path) -> list[str]:
    """Return the names of required Plan Meta items missing from the plan file.

    Checks for a ``## Plan Meta`` heading and for one bullet per field in
    PLAN_META_REQUIRED_FIELDS (optionally bold, case-insensitive).
    """
    text = plan_path.read_text(encoding="utf-8")
    missing: list[str] = []
    if not re.search(r"(?im)^##\s+Plan Meta\s*$", text):
        missing.append("Plan Meta")
    for field in PLAN_META_REQUIRED_FIELDS:
        pattern = rf"(?im)^\s*[-*]\s+(?:\*\*)?{re.escape(field)}(?:\*\*)?\s*:"
        if not re.search(pattern, text):
            missing.append(field)
    return missing


def validate_plan_files(plans_dir: Path, plan_keys: list[str]) -> Optional[str]:
    """Return an error message for the first invalid plan file, else None."""
    for plan_key in plan_keys:
        missing = validate_plan_meta(plans_dir / plan_key)
        if missing:
            fields = ", ".join(missing)
            return f"ERROR: {plan_key} missing required Plan Meta fields: {fields}"
    return None


def choose_claim_entry(
    entries: list[PlanEntry],
    current_env: Optional[str],
    plan_keys: list[str],
) -> Optional[tuple[str, Optional[str], int]]:
    """Pick the entry to claim, following *plan_keys* order.

    Preference: any ``in-progress`` entry first; otherwise the first entry
    that is ``pending`` or is ``blocked`` with an ``env:<name>:...`` note
    whose name equals *current_env*. For duplicate keys the first entry
    wins. Returns ``(plan_key, note, line index)`` or ``None``.
    """
    entry_by_plan: dict[str, PlanEntry] = {}
    for entry in entries:
        entry_by_plan.setdefault(entry[0], entry)
    ordered_entries = [
        entry_by_plan[plan_key] for plan_key in plan_keys if plan_key in entry_by_plan
    ]
    for plan_key, status, note, idx in ordered_entries:
        if status == "in-progress":
            return plan_key, note, idx
    for plan_key, status, note, idx in ordered_entries:
        if status == "pending":
            return plan_key, note, idx
        if status != "blocked" or not current_env:
            continue
        env_info = parse_env_blocked_note(note)
        if env_info and env_info[0] == current_env:
            return plan_key, note, idx
    return None


def claim_plan(
    plans_dir: Path, progress_path: Path, owner: Optional[str] = None
) -> tuple[int, str]:
    """Claim the next plan and mark it ``in-progress`` in the progress file.

    Returns ``(exit_code, message)``. On success the message is
    ``PLAN=<path>`` (plus ``NOTE=<note>`` when the entry had one), or
    ``NOOP: no claimable plans``. Exit code 2 signals a missing plans
    directory, no plan files, or a plan file lacking required meta fields.
    """
    if not plans_dir.is_dir():
        return 2, f"ERROR: plans dir not found: {plans_dir}"
    plan_keys = list_plan_files(plans_dir)
    if not plan_keys:
        return 2, "ERROR: no plan files found"
    plan_error = validate_plan_files(plans_dir, plan_keys)
    if plan_error:
        return 2, plan_error

    with locked_progress(progress_path):
        lines = load_progress_lines(progress_path)
        try:
            lines, start_idx, end_idx = ensure_plan_block(
                lines, progress_path, plan_keys
            )
        except ValueError as exc:
            return 2, f"ERROR: {exc}"
        entries = ensure_all_plans_present(
            lines, start_idx, end_idx, progress_path, plan_keys
        )
        entries = filter_existing_entries(entries, plan_keys)
        chosen = choose_claim_entry(entries, detect_env(), plan_keys)
        if not chosen:
            return 0, "NOOP: no claimable plans"
        plan_key, note, idx = chosen
        plan_ref = (plans_dir / plan_key).as_posix()
        # Update the checklist line before the workflow block is re-rendered,
        # because re-rendering can shift line indices.
        lines[idx] = render_plan_line(plan_key, "in-progress", note)
        lines = update_workflow_state(
            lines,
            phase="executing",
            plan=plan_ref,
            claimed_by=normalize_note(owner) if owner else default_claim_owner(),
            claimed_at=now_utc(),
            clear_keys=("verification",),
        )
        write_progress_lines(progress_path, lines)

    output = [f"PLAN={plan_ref}"]
    if note:
        output.append(f"NOTE={note}")
    return 0, "\n".join(output)


def finish_plan(
    plan: str,
    status: str,
    progress_path: Path,
    note: Optional[str],
    verified: Optional[str] = None,
) -> tuple[int, str]:
    """Mark *plan* as done/blocked/skipped and update the workflow state.

    The first checklist line whose key equals the normalized *plan* is
    replaced; if none exists a new line is inserted before the end marker.
    *verified* is only accepted with ``status == "done"`` and is stored both
    in the line's note and in the workflow ``verification`` key. Returns
    ``(exit_code, message)``; on success the message is the rendered line.

    NOTE(review): the workflow ``plan`` value is always written with the
    ``docs/superpowers/plans/`` prefix, unlike ``claim_plan`` which uses the
    given plans directory — confirm this asymmetry is intended.
    """
    if status not in FINISH_STATUSES:
        return 2, f"ERROR: invalid status: {status}"
    if not plan:
        return 2, "ERROR: plan is required"
    if verified and status != "done":
        return 2, "ERROR: -verified is only valid with -status done"

    plan_key = normalize_plan_key(plan)
    with locked_progress(progress_path):
        lines = load_progress_lines(progress_path)
        try:
            lines, start_idx, end_idx = ensure_plan_block(
                lines, progress_path, [plan_key]
            )
        except ValueError as exc:
            return 2, f"ERROR: {exc}"
        entries = parse_entries(lines, start_idx, end_idx)

        rendered_note = normalize_note(note) if note else None
        rendered_verified = normalize_note(verified) if verified else None
        verification_clear_keys = () if rendered_verified else ("verification",)
        if rendered_verified:
            verified_note = f"verified: {rendered_verified}"
            rendered_note = (
                f"{verified_note}; note: {rendered_note}"
                if rendered_note
                else verified_note
            )
        updated_line = render_plan_line(plan_key, status, rendered_note)

        existing_idx = next(
            (idx for entry_plan, _, _, idx in entries if entry_plan == plan_key),
            None,
        )
        if existing_idx is None:
            lines[end_idx:end_idx] = [updated_line]
        else:
            lines[existing_idx] = updated_line

        lines = update_workflow_state(
            lines,
            phase=status,  # finish statuses are also valid workflow phases
            plan=f"docs/superpowers/plans/{plan_key}",
            verification=rendered_verified,
            clear_keys=verification_clear_keys,
        )
        write_progress_lines(progress_path, lines)
    return 0, updated_line


def status_report(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
    """Build a textual summary of plan statuses and current workflow state.

    Output lines: one ``STATUS`` line with per-status counts, an optional
    ``CURRENT`` line with selected workflow keys, then one ``PLAN`` line per
    plan file. Plans without an entry are reported as ``pending``.
    """
    if not plans_dir.is_dir():
        return 2, f"ERROR: plans dir not found: {plans_dir}"
    plan_keys = list_plan_files(plans_dir)
    lines = load_progress_lines(progress_path)

    entries: list[PlanEntry] = []
    block = find_block(lines)
    if block:
        entries = filter_existing_entries(parse_entries(lines, *block), plan_keys)
    entry_by_plan = {plan_key: (status, note) for plan_key, status, note, _ in entries}

    counts = {
        status: 0 for status in ("pending", "in-progress", "done", "blocked", "skipped")
    }
    rows: list[str] = []
    for plan_key in plan_keys:
        status, note = entry_by_plan.get(plan_key, ("pending", None))
        counts[status] += 1
        suffix = f": {note}" if note else ""
        rows.append(f"PLAN {plan_key} {status}{suffix}")

    state: dict[str, str] = {}
    workflow_block = find_named_block(lines, WORKFLOW_STATE_START, WORKFLOW_STATE_END)
    if workflow_block:
        state = parse_workflow_state(lines, *workflow_block)

    output = [
        "STATUS "
        f"total={len(plan_keys)} "
        f"pending={counts['pending']} "
        f"in-progress={counts['in-progress']} "
        f"done={counts['done']} "
        f"blocked={counts['blocked']} "
        f"skipped={counts['skipped']}"
    ]
    current_parts = [
        f"{key}={state[key]}"
        for key in ("phase", "plan", "claimed_by", "claimed_at", "verification")
        if key in state
    ]
    if current_parts:
        output.append("CURRENT " + " ".join(current_parts))
    output.extend(rows)
    return 0, "\n".join(output)


def record_workflow_state(
    progress_path: Path,
    phase: str,
    spec: Optional[str],
    plan: Optional[str],
    executor: Optional[str],
    constraints: Optional[str],
) -> tuple[int, str]:
    """Store the given workflow fields; returns ``(0, "OK")`` or an error."""
    if phase not in WORKFLOW_PHASES:
        return 2, f"ERROR: invalid phase: {phase}"
    with locked_progress(progress_path):
        lines = load_progress_lines(progress_path)
        lines = update_workflow_state(lines, phase, spec, plan, executor, constraints)
        write_progress_lines(progress_path, lines)
    return 0, "OK"


def _usage_error(message: str) -> int:
    """Print ``ERROR: <message>`` and the usage text to stderr; return 2."""
    print(f"ERROR: {message}", file=sys.stderr)
    print(usage(), file=sys.stderr)
    return 2


def _emit(code: int, message: str) -> int:
    """Print *message* to stdout on success or stderr on failure; return code."""
    if code != 0:
        print(message, file=sys.stderr)
        return code
    print(message)
    return 0


def main(argv: list[str]) -> int:
    """Run the CLI with *argv* (without the program name); return exit code."""
    if not argv:
        print(usage(), file=sys.stderr)
        return 2
    if argv[0] in ("-h", "-help"):
        print(usage())
        return 0

    mode = argv[0]
    if mode not in {"claim", "finish", "record", "status"}:
        return _usage_error(f"unknown mode: {mode}")

    try:
        flags = parse_flags(argv[1:])
    except ValueError as exc:
        if str(exc) == "help":
            print(usage())
            return 0
        return _usage_error(str(exc))

    progress = flags.get("-progress")

    if mode in ("claim", "status"):
        plans = flags.get("-plans")
        if not plans or not progress:
            return _usage_error("-plans and -progress are required")
        if mode == "claim":
            return _emit(
                *claim_plan(Path(plans), Path(progress), flags.get("-owner"))
            )
        return _emit(*status_report(Path(plans), Path(progress)))

    if mode == "record":
        phase = flags.get("-phase")
        if not progress or not phase:
            return _usage_error("-progress and -phase are required")
        return _emit(
            *record_workflow_state(
                Path(progress),
                phase,
                flags.get("-spec"),
                flags.get("-plan"),
                flags.get("-executor"),
                flags.get("-constraints"),
            )
        )

    # mode == "finish"
    plan = flags.get("-plan")
    status = flags.get("-status")
    if not plan or not status or not progress:
        return _usage_error("-plan, -status, and -progress are required")
    return _emit(
        *finish_plan(
            plan, status, Path(progress), flags.get("-note"), flags.get("-verified")
        )
    )


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))