playbook/scripts/main_loop.py

585 lines
18 KiB
Python

#!/usr/bin/env python3
from contextlib import contextmanager
import os
import platform
import re
import sys
import threading
import time
from pathlib import Path
from typing import Optional
try:
import fcntl
except ImportError: # pragma: no cover
fcntl = None
try:
import msvcrt
except ImportError: # pragma: no cover
msvcrt = None
# HTML-comment markers delimiting the plan-status list inside the progress file.
PLAN_STATUS_START = "<!-- plan-status:start -->"
PLAN_STATUS_END = "<!-- plan-status:end -->"
# HTML-comment markers delimiting the workflow-state block inside the progress file.
WORKFLOW_STATE_START = "<!-- workflow-state:start -->"
WORKFLOW_STATE_END = "<!-- workflow-state:end -->"
# Plan file names: a YYYY-MM-DD date, a hyphen, at least one character, then ".md".
PLAN_FILE_RE = re.compile(r"^(\d{4}-\d{2}-\d{2})-.+\.md$")
# One plan-status entry: "- [<check>] `<plan>` <status>" with an optional ": <note>" tail.
PLAN_LINE_RE = re.compile(
r"^- \[(?P<check>[ xX])\] `(?P<plan>[^`]+)` "
r"(?P<status>done|blocked|pending|in-progress|skipped)"
r"(?:: (?P<note>.*))?$"
)
# Statuses that finish_plan accepts.
FINISH_STATUSES = {"done", "blocked", "skipped"}
# Notes of the form "env:<name>:<detail>"; choose_claim_entry compares <name>
# with the value returned by detect_env().
ENV_BLOCKED_RE = re.compile(r"^env:([^:]+):(.+)$")
# Phases that record_workflow_state accepts.
WORKFLOW_PHASES = {"brainstorming", "planning", "executing", "done", "blocked"}
# In-process locks keyed by resolved lock-file path (see get_thread_lock);
# THREAD_LOCKS_GUARD serializes access to the dict itself.
THREAD_LOCKS: dict[str, threading.Lock] = {}
THREAD_LOCKS_GUARD = threading.Lock()
def usage() -> str:
    """Return the command-line help text.

    The text lists the invocation forms (claim, finish, record, -h) followed
    by the recognised options; every line, including the last, ends with a
    newline.
    """
    finish_synopsis = (
        " python scripts/main_loop.py finish -plan <path> -status <status> "
        "-progress <file> [-note <text>]"
    )
    record_synopsis = (
        " python scripts/main_loop.py record -progress <file> -phase <phase> "
        "[-spec <path>] [-plan <path>] [-executor <name>] "
        "[-constraints <csv>]"
    )
    help_lines = [
        "Usage:",
        " python scripts/main_loop.py claim -plans <dir> -progress <file>",
        finish_synopsis,
        record_synopsis,
        " python scripts/main_loop.py -h",
        "Options:",
        " -plans DIR",
        " -plan PATH",
        " -status done|blocked|skipped",
        " -progress FILE",
        " -phase brainstorming|planning|executing|done|blocked",
        " -spec PATH",
        " -executor NAME",
        " -constraints CSV",
        " -note TEXT",
        " -h, -help Show this help.",
    ]
    return "".join(f"{entry}\n" for entry in help_lines)
def parse_flags(args: list[str]) -> dict[str, str]:
    """Collect ``-flag value`` pairs from *args* into a dict.

    Arguments are consumed two at a time: a flag name (anything starting
    with ``-``) followed by its value, which is taken verbatim even when it
    starts with ``-`` itself. A repeated flag keeps its last value.

    Raises:
        ValueError: with the message ``"help"`` when ``-h``/``-help`` appears
            in a flag position, ``"unexpected arg: ..."`` when a flag
            position does not start with ``-``, or ``"missing value for
            ..."`` when the final flag has no value.
    """
    parsed: dict[str, str] = {}
    remaining = iter(args)
    for name in remaining:
        if name == "-h" or name == "-help":
            raise ValueError("help")
        if name[:1] != "-":
            raise ValueError(f"unexpected arg: {name}")
        try:
            # Pull the value off the same iterator so the loop resumes at the
            # next flag name.
            parsed[name] = next(remaining)
        except StopIteration:
            raise ValueError(f"missing value for {name}") from None
    return parsed
def normalize_plan_key(plan_value: str) -> str:
    """Reduce a plan path to the key used in the plan-status list.

    Surrounding whitespace is trimmed and backslashes become forward slashes.
    Leading ``./`` segments and leading slashes are dropped, and everything
    up to and including a ``docs/superpowers/plans/`` directory is removed.
    Paths outside that directory are returned in their cleaned form.

    Only whole ``./`` prefixes are removed. ``str.lstrip("./")`` would strip
    any run of ``.`` and ``/`` characters and so corrupt names that begin
    with a dot (``.hidden.md``) or a parent reference (``../x.md``).
    """
    plans_prefix = "docs/superpowers/plans/"
    raw = plan_value.strip().replace("\\", "/")
    while True:
        if raw.startswith("./"):
            raw = raw[2:]
        elif raw.startswith("/"):
            raw = raw.lstrip("/")
        else:
            break
    if raw.startswith(plans_prefix):
        return raw[len(plans_prefix):]
    marker = "/" + plans_prefix
    if marker in raw:
        return raw.split(marker, 1)[1]
    return raw
def normalize_note(note: str) -> str:
    """Flatten *note* so it fits on a single plan-status line.

    Each newline and carriage return becomes a space, each backtick becomes
    an apostrophe, and surrounding whitespace is trimmed.
    """
    substitutions = str.maketrans({"\n": " ", "\r": " ", "`": "'"})
    flattened = note.translate(substitutions)
    return flattened.strip()
def render_plan_line(plan_key: str, status: str, note: Optional[str]) -> str:
    """Format one plan-status entry.

    The checkbox is ticked only for the ``done`` status. A non-empty *note*
    is appended after the status, separated by ``": "``.
    """
    box = " "
    if status == "done":
        box = "x"
    tail = f"{status}: {note}" if note else status
    return f"- [{box}] `{plan_key}` {tail}"
def list_plan_files(plans_dir: Path) -> list[str]:
    """Return the sorted names of plan files directly inside *plans_dir*.

    Only regular files whose name matches ``PLAN_FILE_RE`` are included;
    subdirectories are not descended into.
    """
    return sorted(
        child.name
        for child in plans_dir.iterdir()
        if child.is_file() and PLAN_FILE_RE.match(child.name)
    )
def find_block(lines: list[str]) -> Optional[tuple[int, int]]:
    """Locate the plan-status block in *lines*.

    Returns the indices of the first ``PLAN_STATUS_START`` line and of the
    first ``PLAN_STATUS_END`` line after it, or ``None`` when either marker
    is missing. Lines are compared after stripping whitespace.

    Delegates to ``find_named_block`` so the marker-scanning logic lives in
    one place instead of being duplicated here.
    """
    return find_named_block(lines, PLAN_STATUS_START, PLAN_STATUS_END)
def find_named_block(
    lines: list[str], start_marker: str, end_marker: str
) -> Optional[tuple[int, int]]:
    """Locate a marker-delimited block in *lines*.

    Returns ``(start, end)``: the index of the first line equal to
    *start_marker* and the index of the first later line equal to
    *end_marker*. Lines are compared after stripping whitespace. Returns
    ``None`` when either marker cannot be found.
    """
    stripped = [text.strip() for text in lines]
    try:
        opening = stripped.index(start_marker)
        # Search strictly after the opening marker.
        closing = stripped.index(end_marker, opening + 1)
    except ValueError:
        return None
    return opening, closing
def parse_entries(
    lines: list[str], start_idx: int, end_idx: int
) -> list[tuple[str, str, Optional[str], int]]:
    """Parse the plan entries strictly between *start_idx* and *end_idx*.

    Each line matching ``PLAN_LINE_RE`` (after stripping) yields a tuple
    ``(plan_key, status, note, line_index)``. The plan name is passed through
    ``normalize_plan_key`` and ``note`` is ``None`` when the line has none.
    Non-matching lines are skipped.
    """
    found: list[tuple[str, str, Optional[str], int]] = []
    for position in range(start_idx + 1, end_idx):
        matched = PLAN_LINE_RE.match(lines[position].strip())
        if matched:
            found.append(
                (
                    normalize_plan_key(matched["plan"]),
                    matched["status"],
                    matched["note"],
                    position,
                )
            )
    return found
def render_progress_lines(plans: list[str]) -> list[str]:
    """Build the lines of a fresh progress document.

    The document holds a title, an empty workflow-state block, and a
    plan-status block with one ``pending`` entry per key in *plans*.
    """
    preamble = [
        "# 当前进展",
        "",
        "## Workflow State",
        "",
        WORKFLOW_STATE_START,
        WORKFLOW_STATE_END,
        "",
        "## Plan Status",
        "",
        PLAN_STATUS_START,
    ]
    pending_entries = [render_plan_line(key, "pending", None) for key in plans]
    return [*preamble, *pending_entries, PLAN_STATUS_END]
def render_workflow_state_lines(
    phase: Optional[str] = None,
    spec: Optional[str] = None,
    plan: Optional[str] = None,
    executor: Optional[str] = None,
    constraints: Optional[str] = None,
) -> list[str]:
    """Render the workflow-state block, markers included.

    Each truthy argument becomes a ``"<name>: <value>"`` line, in the fixed
    order phase, spec, plan, executor, constraints. Falsy values (``None``
    or an empty string) are omitted.
    """
    fields = (
        ("phase", phase),
        ("spec", spec),
        ("plan", plan),
        ("executor", executor),
        ("constraints", constraints),
    )
    body = [f"{label}: {value}" for label, value in fields if value]
    return [WORKFLOW_STATE_START, *body, WORKFLOW_STATE_END]
def parse_workflow_state(
    lines: list[str], start_idx: int, end_idx: int
) -> dict[str, str]:
    """Read ``key: value`` pairs strictly between *start_idx* and *end_idx*.

    Only the keys phase, spec, plan, executor and constraints are kept. A
    line is split at its first ``": "``; lines without that separator are
    ignored. When a key repeats, the last occurrence wins.
    """
    recognised = ("phase", "spec", "plan", "executor", "constraints")
    parsed: dict[str, str] = {}
    for position in range(start_idx + 1, end_idx):
        key, separator, value = lines[position].strip().partition(": ")
        if separator and key in recognised:
            parsed[key] = value
    return parsed
def parse_env_blocked_note(note: Optional[str]) -> Optional[tuple[str, str]]:
    """Split a note matching ``ENV_BLOCKED_RE`` into its two captured groups.

    Returns ``None`` when *note* is empty/``None`` or does not match the
    pattern.
    """
    matched = ENV_BLOCKED_RE.match(note) if note else None
    if matched is None:
        return None
    env_name, detail = matched.groups()
    return env_name, detail
def detect_env() -> Optional[str]:
    """Return the lower-cased ``platform.system()`` name.

    Only ``windows``, ``linux`` and ``darwin`` are recognised; any other
    system name yields ``None``.
    """
    system_name = platform.system().lower()
    if system_name in ("windows", "linux", "darwin"):
        return system_name
    return None
def load_progress_lines(progress_path: Path) -> list[str]:
    """Read the progress file as a list of lines without line endings.

    The parent directory is created if necessary. A missing file yields an
    empty list.
    """
    progress_path.parent.mkdir(parents=True, exist_ok=True)
    if not progress_path.exists():
        return []
    content = progress_path.read_text(encoding="utf-8")
    return content.splitlines()
def write_progress_lines(progress_path: Path, lines: list[str]) -> None:
    """Overwrite the progress file with *lines* as UTF-8 text.

    Lines are joined with newlines and a final newline is always written,
    so an empty list produces a file holding a single newline.
    """
    body = "\n".join(lines)
    with progress_path.open("w", encoding="utf-8") as handle:
        handle.write(body)
        handle.write("\n")
def get_thread_lock(lock_path: Path) -> threading.Lock:
    """Return the in-process lock associated with *lock_path*.

    Locks are stored in ``THREAD_LOCKS`` keyed by the resolved path, so every
    caller naming the same file gets the same lock object. The lookup and
    the creation happen while holding ``THREAD_LOCKS_GUARD``.
    """
    registry_key = str(lock_path.resolve())
    with THREAD_LOCKS_GUARD:
        try:
            return THREAD_LOCKS[registry_key]
        except KeyError:
            created = THREAD_LOCKS[registry_key] = threading.Lock()
            return created
@contextmanager
def locked_progress(progress_path: Path):
    """Context manager holding an exclusive lock for *progress_path*.

    A sidecar file named ``<progress file name>.lock`` is opened next to the
    progress file (the parent directory is created if needed). The lock has
    two layers: the per-path lock from ``get_thread_lock``, then a file lock
    taken with ``fcntl.flock`` when ``fcntl`` is available, otherwise with
    ``msvcrt.locking`` when ``msvcrt`` is available. With neither module
    only the in-process lock is held.

    If the ``PLAYBOOK_MAIN_LOOP_HOLD_LOCK_MS`` environment variable is set,
    the manager sleeps that many milliseconds (negative values are clamped
    to zero) after acquiring the lock and before yielding.
    """
    progress_path.parent.mkdir(parents=True, exist_ok=True)
    sidecar = progress_path.with_name(f"{progress_path.name}.lock")
    with get_thread_lock(sidecar), sidecar.open("a+b") as handle:
        descriptor = handle.fileno()
        use_flock = fcntl is not None
        use_msvcrt = not use_flock and msvcrt is not None
        if use_flock:
            fcntl.flock(descriptor, fcntl.LOCK_EX)
        elif use_msvcrt:  # pragma: no cover
            acquired = False
            while not acquired:
                try:
                    handle.seek(0)
                    msvcrt.locking(descriptor, msvcrt.LK_LOCK, 1)
                except OSError:
                    # Lock attempt failed; wait briefly and try again.
                    time.sleep(0.05)
                else:
                    acquired = True
        try:
            hold_ms = os.environ.get("PLAYBOOK_MAIN_LOOP_HOLD_LOCK_MS")
            if hold_ms:
                time.sleep(max(0.0, float(hold_ms) / 1000.0))
            yield
        finally:
            if use_flock:
                fcntl.flock(descriptor, fcntl.LOCK_UN)
            elif use_msvcrt:  # pragma: no cover
                handle.seek(0)
                msvcrt.locking(descriptor, msvcrt.LK_UNLCK, 1)
def ensure_section(lines: list[str], heading: str) -> list[str]:
    """Make sure *heading* appears in *lines*, appending it if absent.

    When no line equals *heading* after stripping, the heading is appended
    in place, preceded by a blank line if the last line is not already empty
    and followed by one blank line. The same list object is returned.
    """
    for existing in lines:
        if existing.strip() == heading:
            return lines
    needs_gap = bool(lines) and lines[-1] != ""
    if needs_gap:
        lines.append("")
    lines += [heading, ""]
    return lines
def ensure_block_with_lines(
    lines: list[str],
    start_marker: str,
    end_marker: str,
    default_lines: list[str],
    heading: Optional[str] = None,
) -> tuple[list[str], int, int]:
    """Return *lines* with a marker-delimited block guaranteed to exist.

    If the block is already present its start/end indices are returned
    unchanged. Otherwise *default_lines* is appended at the end of the
    document: an empty document first receives the title lines, *heading*
    (when given) is added through ``ensure_section``, and a blank separator
    is inserted if the last line is not empty.

    Returns:
        ``(lines, start_idx, end_idx)`` where the indices are the first and
        last line of the block.
    """
    located = find_named_block(lines, start_marker, end_marker)
    if located is not None:
        first, last = located
        return lines, first, last
    if not lines:
        lines = ["# 当前进展", ""]
    if heading:
        lines = ensure_section(lines, heading)
    if lines and lines[-1] != "":
        lines.append("")
    block_start = len(lines)
    lines.extend(default_lines)
    return lines, block_start, len(lines) - 1
def ensure_plan_block(
    lines: list[str], progress_path: Path, plan_keys: list[str]
) -> tuple[list[str], int, int]:
    """Ensure both managed blocks exist, then persist the document.

    The workflow-state block is ensured first, then the plan-status block
    under the ``## Plan Status`` heading. The resulting lines are written to
    *progress_path* on every call. *plan_keys* is accepted but not used by
    this function.

    Returns:
        ``(lines, start_idx, end_idx)`` for the plan-status block.
    """
    with_state = ensure_workflow_state_block(lines)[0]
    result = ensure_block_with_lines(
        with_state,
        PLAN_STATUS_START,
        PLAN_STATUS_END,
        [PLAN_STATUS_START, PLAN_STATUS_END],
        "## Plan Status",
    )
    write_progress_lines(progress_path, result[0])
    return result
def ensure_workflow_state_block(
    lines: list[str],
) -> tuple[list[str], int, int]:
    """Ensure the workflow-state block exists under ``## Workflow State``.

    Returns:
        ``(lines, start_idx, end_idx)`` for the workflow-state block; a newly
        created block contains only its two marker lines.
    """
    empty_block = [WORKFLOW_STATE_START, WORKFLOW_STATE_END]
    return ensure_block_with_lines(
        lines,
        start_marker=WORKFLOW_STATE_START,
        end_marker=WORKFLOW_STATE_END,
        default_lines=empty_block,
        heading="## Workflow State",
    )
def update_workflow_state(
    lines: list[str],
    phase: Optional[str] = None,
    spec: Optional[str] = None,
    plan: Optional[str] = None,
    executor: Optional[str] = None,
    constraints: Optional[str] = None,
) -> list[str]:
    """Merge the given fields into the workflow-state block of *lines*.

    The block is created if missing. Arguments left as ``None`` keep the
    value currently stored in the block; every other argument overrides it.
    The block is then re-rendered in place and the lines are returned.
    """
    lines, first, last = ensure_workflow_state_block(lines)
    merged = parse_workflow_state(lines, first, last)
    overrides = {
        "phase": phase,
        "spec": spec,
        "plan": plan,
        "executor": executor,
        "constraints": constraints,
    }
    merged.update(
        {name: value for name, value in overrides.items() if value is not None}
    )
    lines[first : last + 1] = render_workflow_state_lines(
        phase=merged.get("phase"),
        spec=merged.get("spec"),
        plan=merged.get("plan"),
        executor=merged.get("executor"),
        constraints=merged.get("constraints"),
    )
    return lines
def ensure_all_plans_present(
    lines: list[str], start_idx: int, end_idx: int, progress_path: Path, plan_keys: list[str]
) -> list[tuple[str, str, Optional[str], int]]:
    """Add a ``pending`` entry for every plan key missing from the block.

    New entries are inserted just before the end marker (mutating *lines*)
    and the file is rewritten only when something was added. Returns the
    parsed entries of the resulting block.
    """
    entries = parse_entries(lines, start_idx, end_idx)
    known = {entry[0] for entry in entries}
    additions = [
        render_plan_line(key, "pending", None)
        for key in plan_keys
        if key not in known
    ]
    if not additions:
        return entries
    lines[end_idx:end_idx] = additions
    write_progress_lines(progress_path, lines)
    # The end marker moved down by the number of inserted lines.
    return parse_entries(lines, start_idx, end_idx + len(additions))
def filter_existing_entries(
    entries: list[tuple[str, str, Optional[str], int]], plan_keys: list[str]
) -> list[tuple[str, str, Optional[str], int]]:
    """Keep only the entries whose plan key appears in *plan_keys*.

    The relative order of the surviving entries is preserved.
    """
    wanted = frozenset(plan_keys)

    def is_wanted(entry: tuple[str, str, Optional[str], int]) -> bool:
        return entry[0] in wanted

    return list(filter(is_wanted, entries))
def choose_claim_entry(
    entries: list[tuple[str, str, Optional[str], int]], current_env: Optional[str]
) -> Optional[tuple[str, Optional[str], int]]:
    """Pick the entry to claim next, as ``(plan_key, note, line_index)``.

    Priority order: the first ``in-progress`` entry, then the first
    ``pending`` entry, then, only when *current_env* is truthy, the first
    ``blocked`` entry whose note parses via ``parse_env_blocked_note`` to an
    environment name equal to *current_env*. Returns ``None`` when nothing
    qualifies.
    """
    for wanted_status in ("in-progress", "pending"):
        for plan_key, status, note, idx in entries:
            if status == wanted_status:
                return plan_key, note, idx
    if not current_env:
        return None
    for plan_key, status, note, idx in entries:
        if status == "blocked":
            env_info = parse_env_blocked_note(note)
            if env_info and env_info[0] == current_env:
                return plan_key, note, idx
    return None
def claim_plan(plans_dir: Path, progress_path: Path) -> tuple[int, str]:
    """Claim the next plan and mark it ``in-progress`` in the progress file.

    Under the progress lock, the plan-status block is synchronised with the
    plan files in *plans_dir*, one entry is chosen via ``choose_claim_entry``
    (ignoring entries whose file no longer exists), its line is rewritten as
    ``in-progress`` with the existing note kept, and the workflow state is
    set to phase ``executing`` with the plan's path.

    Returns:
        ``(exit_code, message)``. Code 2 with an ``ERROR: ...`` message when
        the directory is missing, holds no plan files, or ensuring the block
        raises ``ValueError``. Code 0 with ``NOOP: no claimable plans`` when
        nothing can be claimed. Otherwise code 0 with ``PLAN=<path>`` and,
        if the entry has a note, a second ``NOTE=<note>`` line.
    """
    if not plans_dir.is_dir():
        return 2, f"ERROR: plans dir not found: {plans_dir}"
    plan_keys = list_plan_files(plans_dir)
    if not plan_keys:
        return 2, "ERROR: no plan files found"
    with locked_progress(progress_path):
        lines = load_progress_lines(progress_path)
        try:
            lines, start_idx, end_idx = ensure_plan_block(lines, progress_path, plan_keys)
        except ValueError as exc:
            return 2, f"ERROR: {exc}"
        candidates = filter_existing_entries(
            ensure_all_plans_present(lines, start_idx, end_idx, progress_path, plan_keys),
            plan_keys,
        )
        chosen = choose_claim_entry(candidates, detect_env())
        if not chosen:
            return 0, "NOOP: no claimable plans"
        plan_key, note, idx = chosen
        plan_path = (plans_dir / plan_key).as_posix()
        # Rewrite the entry before touching the workflow-state block, because
        # re-rendering that block can shift later line indices.
        lines[idx] = render_plan_line(plan_key, "in-progress", note)
        lines = update_workflow_state(lines, phase="executing", plan=plan_path)
        write_progress_lines(progress_path, lines)
        message = f"PLAN={plan_path}"
        if note:
            message += f"\nNOTE={note}"
        return 0, message
def finish_plan(
    plan: str, status: str, progress_path: Path, note: Optional[str]
) -> tuple[int, str]:
    """Record the final status of *plan* in the progress file.

    Under the progress lock, the first entry whose key equals the normalized
    plan key is rewritten; if there is none, a new entry is inserted just
    before the plan-status end marker. The workflow state is then set to
    phase ``done`` (for status ``done``) or ``blocked`` (for any other finish
    status) with plan ``docs/superpowers/plans/<key>``, and the file is
    written.

    Args:
        plan: Plan path or key; normalized with ``normalize_plan_key``.
        status: One of ``FINISH_STATUSES``.
        progress_path: Progress file to update.
        note: Optional note; flattened with ``normalize_note``.

    Returns:
        ``(exit_code, message)``. Code 2 with an ``ERROR: ...`` message for
        an invalid status, a missing plan, a plan that normalizes to an
        empty key, or a ``ValueError`` while ensuring the block. Otherwise
        code 0 and the rendered entry line.
    """
    if status not in FINISH_STATUSES:
        return 2, f"ERROR: invalid status: {status}"
    if not plan:
        return 2, "ERROR: plan is required"
    plan_key = normalize_plan_key(plan)
    if not plan_key:
        # Values such as "./" or whitespace normalize to an empty key. The
        # rendered entry would never match PLAN_LINE_RE again, so every call
        # would append another unparseable line.
        return 2, f"ERROR: invalid plan: {plan}"
    rendered_note = normalize_note(note) if note else None
    updated_line = render_plan_line(plan_key, status, rendered_note)
    workflow_phase = "done" if status == "done" else "blocked"
    with locked_progress(progress_path):
        lines = load_progress_lines(progress_path)
        try:
            lines, start_idx, end_idx = ensure_plan_block(lines, progress_path, [plan_key])
        except ValueError as exc:
            return 2, f"ERROR: {exc}"
        existing_idx = next(
            (
                idx
                for entry_plan, _, _, idx in parse_entries(lines, start_idx, end_idx)
                if entry_plan == plan_key
            ),
            None,
        )
        # Edit the plan list before the workflow-state block, because
        # re-rendering that block can shift later line indices.
        if existing_idx is None:
            lines[end_idx:end_idx] = [updated_line]
        else:
            lines[existing_idx] = updated_line
        lines = update_workflow_state(
            lines,
            phase=workflow_phase,
            plan=f"docs/superpowers/plans/{plan_key}",
        )
        write_progress_lines(progress_path, lines)
    return 0, updated_line
def record_workflow_state(
    progress_path: Path,
    phase: str,
    spec: Optional[str],
    plan: Optional[str],
    executor: Optional[str],
    constraints: Optional[str],
) -> tuple[int, str]:
    """Write workflow-state fields into the progress file.

    *phase* must be one of ``WORKFLOW_PHASES``. Under the progress lock the
    file is loaded, the given fields are merged into the workflow-state
    block (``None`` arguments keep the stored value), and the file is
    rewritten.

    Returns:
        ``(2, "ERROR: invalid phase: ...")`` for an unknown phase, otherwise
        ``(0, "OK")``.
    """
    if phase not in WORKFLOW_PHASES:
        return 2, f"ERROR: invalid phase: {phase}"
    with locked_progress(progress_path):
        updated = update_workflow_state(
            load_progress_lines(progress_path),
            phase=phase,
            spec=spec,
            plan=plan,
            executor=executor,
            constraints=constraints,
        )
        write_progress_lines(progress_path, updated)
    return 0, "OK"
def main(argv: list[str]) -> int:
    """Run the command-line interface and return the process exit code.

    *argv* holds the arguments after the program name. The first one selects
    the mode (``claim``, ``finish`` or ``record``) or asks for help; the rest
    are ``-flag value`` pairs. Help goes to stdout with exit code 0. Usage
    errors print an ``ERROR:`` line plus the usage text to stderr and return
    2. A mode's result message goes to stdout on success, or to stderr with
    the mode's non-zero code on failure.
    """

    def reject(problem: str) -> int:
        # Usage error: explain, show the help text, and signal failure.
        print(f"ERROR: {problem}", file=sys.stderr)
        print(usage(), file=sys.stderr)
        return 2

    def report(outcome: tuple[int, str]) -> int:
        # Route the message to stderr for non-zero codes, stdout otherwise.
        code, message = outcome
        print(message, file=sys.stderr if code != 0 else sys.stdout)
        return code

    if not argv:
        print(usage(), file=sys.stderr)
        return 2
    mode, *rest = argv
    if mode in ("-h", "-help"):
        print(usage())
        return 0
    if mode not in {"claim", "finish", "record"}:
        return reject(f"unknown mode: {mode}")
    try:
        flags = parse_flags(rest)
    except ValueError as exc:
        if str(exc) == "help":
            print(usage())
            return 0
        return reject(str(exc))

    progress = flags.get("-progress")
    if mode == "claim":
        plans = flags.get("-plans")
        if not (plans and progress):
            return reject("-plans and -progress are required")
        return report(claim_plan(Path(plans), Path(progress)))
    if mode == "record":
        phase = flags.get("-phase")
        if not (progress and phase):
            return reject("-progress and -phase are required")
        return report(
            record_workflow_state(
                Path(progress),
                phase,
                flags.get("-spec"),
                flags.get("-plan"),
                flags.get("-executor"),
                flags.get("-constraints"),
            )
        )
    # Remaining mode: finish.
    plan = flags.get("-plan")
    status = flags.get("-status")
    if not (plan and status and progress):
        return reject("-plan, -status, and -progress are required")
    return report(finish_plan(plan, status, Path(progress), flags.get("-note")))
# Script entry point: run the CLI and use main()'s return value as the exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))