#!/usr/bin/env python3
"""Claim, finish and record plan progress in a shared Markdown progress file.

The progress file contains two marker-delimited blocks:

* a *workflow state* block holding ``key: value`` lines
  (``phase``, ``spec``, ``plan``, ``executor``, ``constraints``);
* a *plan status* block holding one checkbox line per plan, e.g.
  ``- [ ] `2024-01-01-foo.md` pending``.

Three sub-commands operate on that file while holding an exclusive lock:

* ``claim``  -- pick the next claimable plan and mark it ``in-progress``;
* ``finish`` -- set a plan to ``done``, ``blocked`` or ``skipped``;
* ``record`` -- update the workflow state block.
"""

from contextlib import contextmanager
import os
import platform
import re
import sys
import threading
import time
from pathlib import Path
from typing import Iterator, Optional

try:
    import fcntl
except ImportError:  # pragma: no cover
    fcntl = None

try:
    import msvcrt
except ImportError:  # pragma: no cover
    msvcrt = None

# NOTE(review): the four markers must be non-empty and pairwise distinct,
# otherwise any blank line is mistaken for a marker and the two blocks cannot
# be told apart. The exact marker text below is a reconstruction -- confirm it
# against existing progress files before merging.
PLAN_STATUS_START = "<!-- plan-status:start -->"
PLAN_STATUS_END = "<!-- plan-status:end -->"
WORKFLOW_STATE_START = "<!-- workflow-state:start -->"
WORKFLOW_STATE_END = "<!-- workflow-state:end -->"

# Plan files are named "<YYYY-MM-DD>-<anything>.md".
PLAN_FILE_RE = re.compile(r"^(\d{4}-\d{2}-\d{2})-.+\.md$")
# One plan status line: "- [x] `<plan>` <status>[: <note>]".
PLAN_LINE_RE = re.compile(
    r"^- \[(?P<checked>[ xX])\] `(?P<plan>[^`]+)` "
    r"(?P<status>done|blocked|pending|in-progress|skipped)"
    r"(?:: (?P<note>.*))?$"
)
FINISH_STATUSES = {"done", "blocked", "skipped"}
# Note format marking a plan as blocked on one environment: "env:<name>:<detail>".
ENV_BLOCKED_RE = re.compile(r"^env:([^:]+):(.+)$")
WORKFLOW_PHASES = {"brainstorming", "planning", "executing", "done", "blocked"}
# Keys recognised inside the workflow state block, in rendering order.
WORKFLOW_STATE_KEYS = ("phase", "spec", "plan", "executor", "constraints")

# One in-process lock per lock-file path, created lazily under the guard.
THREAD_LOCKS: dict[str, threading.Lock] = {}
THREAD_LOCKS_GUARD = threading.Lock()

# (plan_key, status, note, line_index) for one parsed plan status line.
PlanEntry = tuple[str, str, Optional[str], int]


def usage() -> str:
    """Return the command-line help text."""
    return (
        "Usage:\n"
        " python scripts/main_loop.py claim -plans <dir> -progress <file>\n"
        " python scripts/main_loop.py finish -plan <path> -status <status> "
        "-progress <file> [-note <text>]\n"
        " python scripts/main_loop.py record -progress <file> -phase <phase> "
        "[-spec <path>] [-plan <path>] [-executor <name>] "
        "[-constraints <csv>]\n"
        " python scripts/main_loop.py -h\n"
        "Options:\n"
        " -plans DIR\n"
        " -plan PATH\n"
        " -status done|blocked|skipped\n"
        " -progress FILE\n"
        " -phase brainstorming|planning|executing|done|blocked\n"
        " -spec PATH\n"
        " -executor NAME\n"
        " -constraints CSV\n"
        " -note TEXT\n"
        " -h, -help Show this help.\n"
    )


def parse_flags(args: list[str]) -> dict[str, str]:
    """Parse ``-flag value`` pairs into a dict keyed by the flag (dash included).

    Raises:
        ValueError: with message ``"help"`` when ``-h``/``-help`` appears in a
            flag position; otherwise for a token that does not start with
            ``-`` or for a flag that has no following value.
    """
    flags: dict[str, str] = {}
    idx = 0
    while idx < len(args):
        arg = args[idx]
        if arg in ("-h", "-help"):
            raise ValueError("help")
        if not arg.startswith("-"):
            raise ValueError(f"unexpected arg: {arg}")
        if idx + 1 >= len(args):
            raise ValueError(f"missing value for {arg}")
        # The value is consumed verbatim, so it may itself start with "-".
        flags[arg] = args[idx + 1]
        idx += 2
    return flags


def normalize_plan_key(plan_value: str) -> str:
    """Reduce a plan path to its key relative to ``docs/superpowers/plans/``.

    Backslashes are turned into forward slashes first. When the path does not
    contain the plans directory, the cleaned-up path is returned unchanged.
    """
    raw = plan_value.strip().replace("\\", "/")
    # lstrip() takes a character set: this drops every leading "." and "/"
    # (so "./x", "../x" and "/x" all become "x"), not just a literal "./".
    raw = raw.lstrip("./")
    prefix = "docs/superpowers/plans/"
    if raw.startswith(prefix):
        return raw[len(prefix):]
    marker = "/" + prefix
    if marker in raw:
        return raw.split(marker, 1)[1]
    return raw


def normalize_note(note: str) -> str:
    """Flatten a note to one line and replace backticks with apostrophes."""
    return note.replace("\n", " ").replace("\r", " ").replace("`", "'").strip()


def render_plan_line(plan_key: str, status: str, note: Optional[str]) -> str:
    """Render one plan status line; the box is checked only for ``done``."""
    checked = "x" if status == "done" else " "
    suffix = status
    if note:
        suffix += f": {note}"
    return f"- [{checked}] `{plan_key}` {suffix}"


def list_plan_files(plans_dir: Path) -> list[str]:
    """Return the sorted names of regular files in *plans_dir* matching PLAN_FILE_RE."""
    return sorted(
        path.name
        for path in plans_dir.iterdir()
        if path.is_file() and PLAN_FILE_RE.match(path.name)
    )


def find_named_block(
    lines: list[str], start_marker: str, end_marker: str
) -> Optional[tuple[int, int]]:
    """Locate the first ``start_marker`` and the first ``end_marker`` after it.

    Lines are compared after stripping surrounding whitespace.

    Returns:
        ``(start_index, end_index)`` of the two marker lines, or ``None`` when
        either marker is missing.
    """
    start_idx = None
    for idx, line in enumerate(lines):
        if line.strip() == start_marker:
            start_idx = idx
            break
    if start_idx is None:
        return None
    for idx in range(start_idx + 1, len(lines)):
        if lines[idx].strip() == end_marker:
            return start_idx, idx
    return None


def find_block(lines: list[str]) -> Optional[tuple[int, int]]:
    """Locate the plan status block; see :func:`find_named_block`."""
    return find_named_block(lines, PLAN_STATUS_START, PLAN_STATUS_END)


def parse_entries(
    lines: list[str], start_idx: int, end_idx: int
) -> list[PlanEntry]:
    """Parse the plan lines strictly between the two marker indices.

    Lines that do not match PLAN_LINE_RE are skipped.

    Returns:
        A list of ``(plan_key, status, note, line_index)`` tuples in file
        order; ``note`` is ``None`` when the line carries no note.
    """
    entries: list[PlanEntry] = []
    for idx in range(start_idx + 1, end_idx):
        match = PLAN_LINE_RE.match(lines[idx].strip())
        if not match:
            continue
        entries.append(
            (
                normalize_plan_key(match.group("plan")),
                match.group("status"),
                match.group("note"),
                idx,
            )
        )
    return entries


def render_progress_lines(plans: list[str]) -> list[str]:
    """Render a complete fresh progress document listing *plans* as pending."""
    lines = [
        "# 当前进展",
        "",
        "## Workflow State",
        "",
        WORKFLOW_STATE_START,
        WORKFLOW_STATE_END,
        "",
        "## Plan Status",
        "",
        PLAN_STATUS_START,
    ]
    lines.extend(render_plan_line(plan_key, "pending", None) for plan_key in plans)
    lines.append(PLAN_STATUS_END)
    return lines


def render_workflow_state_lines(
    phase: Optional[str] = None,
    spec: Optional[str] = None,
    plan: Optional[str] = None,
    executor: Optional[str] = None,
    constraints: Optional[str] = None,
) -> list[str]:
    """Render the workflow state block, markers included.

    Falsy values (``None`` or an empty string) are left out of the block.
    """
    values = (phase, spec, plan, executor, constraints)
    lines = [WORKFLOW_STATE_START]
    lines.extend(
        f"{key}: {value}" for key, value in zip(WORKFLOW_STATE_KEYS, values) if value
    )
    lines.append(WORKFLOW_STATE_END)
    return lines


def parse_workflow_state(
    lines: list[str], start_idx: int, end_idx: int
) -> dict[str, str]:
    """Read the recognised ``key: value`` lines between the two marker indices."""
    state: dict[str, str] = {}
    for idx in range(start_idx + 1, end_idx):
        line = lines[idx].strip()
        if ": " not in line:
            continue
        key, value = line.split(": ", 1)
        if key in WORKFLOW_STATE_KEYS:
            state[key] = value
    return state


def parse_env_blocked_note(note: Optional[str]) -> Optional[tuple[str, str]]:
    """Split an ``env:<name>:<detail>`` note into ``(name, detail)``.

    Returns ``None`` for an empty note or one that does not match the format.
    """
    if not note:
        return None
    match = ENV_BLOCKED_RE.match(note)
    if not match:
        return None
    return match.group(1), match.group(2)


def detect_env() -> Optional[str]:
    """Return ``"windows"``, ``"linux"`` or ``"darwin"``; ``None`` for any other system."""
    mapping = {"windows": "windows", "linux": "linux", "darwin": "darwin"}
    return mapping.get(platform.system().lower())


def load_progress_lines(progress_path: Path) -> list[str]:
    """Read the progress file as a list of lines (empty when it does not exist).

    The parent directory is created as a side effect.
    """
    progress_path.parent.mkdir(parents=True, exist_ok=True)
    if progress_path.exists():
        return progress_path.read_text(encoding="utf-8").splitlines()
    return []


def write_progress_lines(progress_path: Path, lines: list[str]) -> None:
    """Write *lines* to the progress file as UTF-8 with a trailing newline."""
    progress_path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def get_thread_lock(lock_path: Path) -> threading.Lock:
    """Return the process-wide lock associated with the resolved *lock_path*."""
    key = str(lock_path.resolve())
    with THREAD_LOCKS_GUARD:
        lock = THREAD_LOCKS.get(key)
        if lock is None:
            lock = threading.Lock()
            THREAD_LOCKS[key] = lock
        return lock


@contextmanager
def locked_progress(progress_path: Path) -> Iterator[None]:
    """Hold an exclusive lock on ``<progress file>.lock`` for the ``with`` body.

    A per-path ``threading.Lock`` is taken first, then a file lock via
    ``fcntl.flock`` or, when ``fcntl`` is unavailable, ``msvcrt.locking``.
    If neither module is available only the in-process lock is held.

    When the ``PLAYBOOK_MAIN_LOOP_HOLD_LOCK_MS`` environment variable is set,
    the lock is held for that many milliseconds before the body runs.
    """
    progress_path.parent.mkdir(parents=True, exist_ok=True)
    lock_path = progress_path.with_name(f"{progress_path.name}.lock")
    thread_lock = get_thread_lock(lock_path)
    with thread_lock:
        with lock_path.open("a+b") as lock_file:
            if fcntl is not None:
                fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
            elif msvcrt is not None:  # pragma: no cover
                # msvcrt.locking raises OSError when it gives up; keep retrying.
                while True:
                    try:
                        lock_file.seek(0)
                        msvcrt.locking(lock_file.fileno(), msvcrt.LK_LOCK, 1)
                        break
                    except OSError:
                        time.sleep(0.05)
            try:
                hold_ms = os.environ.get("PLAYBOOK_MAIN_LOOP_HOLD_LOCK_MS")
                if hold_ms:
                    time.sleep(max(0.0, float(hold_ms) / 1000.0))
                yield
            finally:
                # Always release the file lock, even when the body raised.
                if fcntl is not None:
                    fcntl.flock(lock_file.fileno(), fcntl.LOCK_UN)
                elif msvcrt is not None:  # pragma: no cover
                    lock_file.seek(0)
                    msvcrt.locking(lock_file.fileno(), msvcrt.LK_UNLCK, 1)


def ensure_section(lines: list[str], heading: str) -> list[str]:
    """Append *heading* (plus a blank line) unless a line already equals it.

    *lines* is modified in place and also returned.
    """
    if any(line.strip() == heading for line in lines):
        return lines
    if lines and lines[-1] != "":
        lines.append("")
    lines.extend([heading, ""])
    return lines


def ensure_block_with_lines(
    lines: list[str],
    start_marker: str,
    end_marker: str,
    default_lines: list[str],
    heading: Optional[str] = None,
) -> tuple[list[str], int, int]:
    """Make sure a marker-delimited block exists, appending it when missing.

    Args:
        lines: Document lines; may be modified in place.
        start_marker: Line opening the block.
        end_marker: Line closing the block.
        default_lines: Lines appended when the block is missing. The first is
            expected to be the start marker and the last the end marker, since
            the returned indices point at them.
        heading: Optional section heading ensured before the block is added.

    Returns:
        ``(lines, start_index, end_index)`` where the indices are those of the
        block's marker lines.
    """
    block = find_named_block(lines, start_marker, end_marker)
    if block:
        return lines, block[0], block[1]
    if not lines:
        lines = ["# 当前进展", ""]
    if heading:
        lines = ensure_section(lines, heading)
    if lines and lines[-1] != "":
        lines.append("")
    insert_at = len(lines)
    lines[insert_at:insert_at] = default_lines
    return lines, insert_at, insert_at + len(default_lines) - 1


def ensure_workflow_state_block(
    lines: list[str],
) -> tuple[list[str], int, int]:
    """Make sure the workflow state block exists; see :func:`ensure_block_with_lines`."""
    return ensure_block_with_lines(
        lines,
        WORKFLOW_STATE_START,
        WORKFLOW_STATE_END,
        [WORKFLOW_STATE_START, WORKFLOW_STATE_END],
        "## Workflow State",
    )


def ensure_plan_block(
    lines: list[str], progress_path: Path, plan_keys: list[str]
) -> tuple[list[str], int, int]:
    """Make sure both blocks exist and write the document back to disk.

    *plan_keys* is accepted for interface compatibility and is not used here.

    Returns:
        ``(lines, start_index, end_index)`` for the plan status block.
    """
    # The workflow block is ensured first so it precedes the plan block in a
    # freshly created document.
    lines, _, _ = ensure_workflow_state_block(lines)
    lines, start_idx, end_idx = ensure_block_with_lines(
        lines,
        PLAN_STATUS_START,
        PLAN_STATUS_END,
        [PLAN_STATUS_START, PLAN_STATUS_END],
        "## Plan Status",
    )
    write_progress_lines(progress_path, lines)
    return lines, start_idx, end_idx


def update_workflow_state(
    lines: list[str],
    phase: Optional[str] = None,
    spec: Optional[str] = None,
    plan: Optional[str] = None,
    executor: Optional[str] = None,
    constraints: Optional[str] = None,
) -> list[str]:
    """Merge the given values into the workflow state block and re-render it.

    Arguments left as ``None`` keep whatever value the block already holds.
    """
    lines, start_idx, end_idx = ensure_workflow_state_block(lines)
    state = parse_workflow_state(lines, start_idx, end_idx)
    overrides = (phase, spec, plan, executor, constraints)
    for key, value in zip(WORKFLOW_STATE_KEYS, overrides):
        if value is not None:
            state[key] = value
    lines[start_idx : end_idx + 1] = render_workflow_state_lines(
        *(state.get(key) for key in WORKFLOW_STATE_KEYS)
    )
    return lines


def ensure_all_plans_present(
    lines: list[str],
    start_idx: int,
    end_idx: int,
    progress_path: Path,
    plan_keys: list[str],
) -> list[PlanEntry]:
    """Add a ``pending`` line for every plan key missing from the plan block.

    New lines are inserted just before the end marker and the file is written
    back only when something was added.

    Returns:
        The parsed entries of the (possibly extended) plan block.
    """
    entries = parse_entries(lines, start_idx, end_idx)
    existing = {plan_key for plan_key, _, _, _ in entries}
    missing = [plan_key for plan_key in plan_keys if plan_key not in existing]
    if missing:
        insert_lines = [
            render_plan_line(plan_key, "pending", None) for plan_key in missing
        ]
        lines[end_idx:end_idx] = insert_lines
        write_progress_lines(progress_path, lines)
        end_idx += len(insert_lines)
        entries = parse_entries(lines, start_idx, end_idx)
    return entries


def filter_existing_entries(
    entries: list[PlanEntry], plan_keys: list[str]
) -> list[PlanEntry]:
    """Keep only the entries whose plan key is listed in *plan_keys*."""
    available = set(plan_keys)
    return [entry for entry in entries if entry[0] in available]


def choose_claim_entry(
    entries: list[PlanEntry], current_env: Optional[str]
) -> Optional[tuple[str, Optional[str], int]]:
    """Pick the entry to claim.

    Priority: the first ``in-progress`` entry, then the first ``pending``
    entry, then -- only when *current_env* is known -- the first ``blocked``
    entry whose ``env:<name>:...`` note names *current_env*.

    Returns:
        ``(plan_key, note, line_index)`` or ``None`` when nothing is claimable.
    """
    for wanted in ("in-progress", "pending"):
        for plan_key, status, note, idx in entries:
            if status == wanted:
                return plan_key, note, idx
    if current_env:
        for plan_key, status, note, idx in entries:
            if status != "blocked":
                continue
            env_info = parse_env_blocked_note(note)
            if env_info and env_info[0] == current_env:
                return plan_key, note, idx
    return None


def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
    """Claim the next plan and mark it ``in-progress`` in the progress file.

    Returns:
        ``(exit_code, message)``. On success the message is ``PLAN=<path>``,
        followed by ``NOTE=<note>`` on a second line when the entry has a
        note, or ``NOOP: no claimable plans`` when nothing can be claimed.
        Exit code 2 with an ``ERROR: ...`` message is returned when the plans
        directory is missing or holds no plan files.
    """
    if not plans_dir.is_dir():
        return 2, f"ERROR: plans dir not found: {plans_dir}"
    plan_keys = list_plan_files(plans_dir)
    if not plan_keys:
        return 2, "ERROR: no plan files found"
    with locked_progress(progress_path):
        lines = load_progress_lines(progress_path)
        try:
            lines, start_idx, end_idx = ensure_plan_block(
                lines, progress_path, plan_keys
            )
        except ValueError as exc:
            return 2, f"ERROR: {exc}"
        entries = ensure_all_plans_present(
            lines, start_idx, end_idx, progress_path, plan_keys
        )
        # Entries whose plan file no longer exists are not claimable.
        entries = filter_existing_entries(entries, plan_keys)
        chosen = choose_claim_entry(entries, detect_env())
        if not chosen:
            return 0, "NOOP: no claimable plans"
        plan_key, note, idx = chosen
        plan_path = (plans_dir / plan_key).as_posix()
        # Update the plan line before the workflow block: re-rendering that
        # block can change its length and shift later line indices.
        lines[idx] = render_plan_line(plan_key, "in-progress", note)
        lines = update_workflow_state(lines, phase="executing", plan=plan_path)
        write_progress_lines(progress_path, lines)
        output = [f"PLAN={plan_path}"]
        if note:
            output.append(f"NOTE={note}")
        return 0, "\n".join(output)


def finish_plan(
    plan: str, status: str, progress_path: Path, note: Optional[str]
) -> tuple[int, str]:
    """Set *plan* to a final *status* and update the workflow state.

    The first existing entry for the plan is rewritten; when there is none, a
    new line is added at the end of the plan block. The workflow phase becomes
    ``done`` for status ``done`` and ``blocked`` for any other status.

    Returns:
        ``(0, <rendered plan line>)`` on success, or ``(2, "ERROR: ...")`` for
        an invalid status or an empty plan.
    """
    if status not in FINISH_STATUSES:
        return 2, f"ERROR: invalid status: {status}"
    if not plan:
        return 2, "ERROR: plan is required"
    plan_key = normalize_plan_key(plan)
    with locked_progress(progress_path):
        lines = load_progress_lines(progress_path)
        try:
            lines, start_idx, end_idx = ensure_plan_block(
                lines, progress_path, [plan_key]
            )
        except ValueError as exc:
            return 2, f"ERROR: {exc}"
        entries = parse_entries(lines, start_idx, end_idx)
        rendered_note = normalize_note(note) if note else None
        updated_line = render_plan_line(plan_key, status, rendered_note)
        target_idx = next(
            (idx for entry_plan, _, _, idx in entries if entry_plan == plan_key),
            None,
        )
        if target_idx is None:
            lines[end_idx:end_idx] = [updated_line]
        else:
            lines[target_idx] = updated_line
        workflow_phase = "done" if status == "done" else "blocked"
        lines = update_workflow_state(
            lines,
            phase=workflow_phase,
            plan=f"docs/superpowers/plans/{plan_key}",
        )
        write_progress_lines(progress_path, lines)
        return 0, updated_line


def record_workflow_state(
    progress_path: Path,
    phase: str,
    spec: Optional[str],
    plan: Optional[str],
    executor: Optional[str],
    constraints: Optional[str],
) -> tuple[int, str]:
    """Write the given workflow state values into the progress file.

    Returns:
        ``(0, "OK")`` on success, or ``(2, "ERROR: invalid phase: ...")`` when
        *phase* is not one of WORKFLOW_PHASES.
    """
    if phase not in WORKFLOW_PHASES:
        return 2, f"ERROR: invalid phase: {phase}"
    with locked_progress(progress_path):
        lines = load_progress_lines(progress_path)
        lines = update_workflow_state(lines, phase, spec, plan, executor, constraints)
        write_progress_lines(progress_path, lines)
    return 0, "OK"


def _usage_error(message: str) -> int:
    """Print ``ERROR: <message>`` and the usage text to stderr; return 2."""
    print(f"ERROR: {message}", file=sys.stderr)
    print(usage(), file=sys.stderr)
    return 2


def _report(code: int, message: str) -> int:
    """Print a command result (stderr on failure, stdout on success); return *code*."""
    if code != 0:
        print(message, file=sys.stderr)
        return code
    print(message)
    return 0


def main(argv: list[str]) -> int:
    """Run the command line given in *argv* (program name excluded).

    Returns:
        The process exit code: 0 on success or help, 2 on usage errors or
        command failures.
    """
    if not argv:
        print(usage(), file=sys.stderr)
        return 2
    if argv[0] in ("-h", "-help"):
        print(usage())
        return 0
    mode = argv[0]
    if mode not in {"claim", "finish", "record"}:
        return _usage_error(f"unknown mode: {mode}")
    try:
        flags = parse_flags(argv[1:])
    except ValueError as exc:
        # parse_flags signals a help request through this sentinel message.
        if str(exc) == "help":
            print(usage())
            return 0
        return _usage_error(str(exc))

    progress = flags.get("-progress")

    if mode == "claim":
        plans = flags.get("-plans")
        if not plans or not progress:
            return _usage_error("-plans and -progress are required")
        return _report(*claim_plan(Path(plans), Path(progress)))

    if mode == "record":
        phase = flags.get("-phase")
        if not progress or not phase:
            return _usage_error("-progress and -phase are required")
        return _report(
            *record_workflow_state(
                Path(progress),
                phase,
                flags.get("-spec"),
                flags.get("-plan"),
                flags.get("-executor"),
                flags.get("-constraints"),
            )
        )

    # Remaining mode: "finish".
    plan = flags.get("-plan")
    status = flags.get("-status")
    if not plan or not status or not progress:
        return _usage_error("-plan, -status, and -progress are required")
    return _report(*finish_plan(plan, status, Path(progress), flags.get("-note")))


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))