#!/usr/bin/env python3 import re import sys from datetime import datetime, timezone from pathlib import Path from shutil import copy2, copytree, rmtree, which import subprocess import importlib.util from typing import Optional try: import tomllib except ModuleNotFoundError: # Python < 3.11 tomllib = None ORDER = [ "sync_rules", "sync_memory_bank", "sync_prompts", "sync_standards", "install_skills", "format_md", ] SCRIPT_DIR = Path(__file__).resolve().parent PLAYBOOK_ROOT = SCRIPT_DIR.parent MAIN_LOOP_SCRIPT = SCRIPT_DIR / "main_loop.py" MAIN_LOOP_SPEC = importlib.util.spec_from_file_location("playbook_main_loop", MAIN_LOOP_SCRIPT) assert MAIN_LOOP_SPEC and MAIN_LOOP_SPEC.loader MAIN_LOOP = importlib.util.module_from_spec(MAIN_LOOP_SPEC) MAIN_LOOP_SPEC.loader.exec_module(MAIN_LOOP) PATH_CONFIG_KEYS = { "project_root", "playbook_root", "deploy_root", "agents_home", "codex_home", "skill_link", } DOCS_INDEX_SECTION_HEADINGS = { "common": "## 跨语言(common)", "tsl": "## TSL(tsl/tsf)", "cpp": "## C++(cpp)", "python": "## Python(python)", "typescript": "## TypeScript(typescript)", "markdown": "## Markdown(markdown)", } def usage() -> str: return ( "Usage:\n" " python scripts/playbook.py -config \n" " python scripts/playbook.py -record-spec -progress \n" " python scripts/playbook.py -record-plan -progress \n" " python scripts/playbook.py -h" ) def parse_cli_value(argv: list[str], flag: str) -> Optional[str]: if flag not in argv: return None idx = argv.index(flag) if idx + 1 >= len(argv): return None value = argv[idx + 1].strip() return value or None def strip_inline_comment(value: str) -> str: in_single = False in_double = False escape = False for idx, ch in enumerate(value): if escape: escape = False continue if in_double and ch == "\\": escape = True continue if ch == "'" and not in_double: in_single = not in_single continue if ch == '"' and not in_single: in_double = not in_double continue if ch == "#" and not in_single and not in_double: return value[:idx].rstrip() return value def split_list_items(raw: str) -> list[str]: items: list[str] = [] buf: list[str] = [] in_single = False in_double = False escape = False for ch in raw: if escape: buf.append(ch) escape = False continue if in_double and ch == "\\": buf.append(ch) escape = True continue if ch == "'" and not in_double: in_single = not in_single buf.append(ch) continue if ch == '"' and not in_single: in_double = not in_double buf.append(ch) continue if ch == "," and not in_single and not in_double: items.append("".join(buf).strip()) buf = [] continue buf.append(ch) tail = "".join(buf).strip() if tail: items.append(tail) return items def parse_toml_value(raw: str) -> object: value = raw.strip() if not value: return "" if value.startswith("[") and value.endswith("]"): inner = value[1:-1].strip() if not inner: return [] return [parse_toml_value(item) for item in split_list_items(inner)] lowered = value.lower() if lowered == "true": return True if lowered == "false": return False if value[0] in ("'", '"') and value[-1] == value[0]: if value[0] == "'": return value[1:-1] import ast try: return ast.literal_eval(value) except (ValueError, SyntaxError): return value[1:-1] try: if "." in value: return float(value) return int(value) except ValueError: return value def loads_toml_minimal(raw: str) -> dict: data: dict[str, dict] = {} current = None for line in raw.splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#"): continue if stripped.startswith("[") and stripped.endswith("]"): section = stripped[1:-1].strip() if not section: raise ValueError("empty section header") current = data.setdefault(section, {}) if not isinstance(current, dict): raise ValueError(f"invalid section: {section}") continue if "=" not in stripped: raise ValueError(f"invalid line: {line}") key, value = stripped.split("=", 1) key = key.strip() if not key: raise ValueError("missing key") value = strip_inline_comment(value.strip()) target = current if current is not None else data target[key] = parse_toml_value(value) return data def normalize_path_config_strings(raw: str) -> str: normalized_lines: list[str] = [] for line in raw.splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#") or "=" not in line: normalized_lines.append(line) continue key_part, value_part = line.split("=", 1) key = key_part.strip() if key not in PATH_CONFIG_KEYS: normalized_lines.append(line) continue value = strip_inline_comment(value_part.strip()) if len(value) < 2 or value[0] != '"' or value[-1] != '"' or "\\" not in value[1:-1]: normalized_lines.append(line) continue inner = value[1:-1] has_lone_backslash = False probe_idx = 0 while probe_idx < len(inner): if inner[probe_idx] != "\\": probe_idx += 1 continue if probe_idx + 1 < len(inner) and inner[probe_idx + 1] == "\\": probe_idx += 2 continue has_lone_backslash = True break if not has_lone_backslash: normalized_lines.append(line) continue escaped: list[str] = [] idx = 0 while idx < len(inner): ch = inner[idx] if ch != "\\": escaped.append(ch) idx += 1 continue if idx + 1 < len(inner) and inner[idx + 1] == "\\": escaped.extend(["\\", "\\"]) idx += 2 continue escaped.extend(["\\", "\\"]) idx += 1 normalized_lines.append(f'{key_part}= "{"".join(escaped)}"') suffix = "\n" if raw.endswith("\n") else "" return "\n".join(normalized_lines) + suffix def load_config(path: Path) -> dict: raw = normalize_path_config_strings(path.read_text(encoding="utf-8")) if tomllib is not None: return tomllib.loads(raw) return loads_toml_minimal(raw) def log(message: str) -> None: print(message) def ensure_dir(path: Path) -> None: path.mkdir(parents=True, exist_ok=True) def normalize_langs(raw: object) -> list[str]: if raw is None: return ["tsl"] if isinstance(raw, str): langs = [raw] else: langs = list(raw) cleaned: list[str] = [] for lang in langs: item = str(lang).strip() if not item: continue if "/" in item or "\\" in item or ".." in item: raise ValueError(f"invalid lang: {item}") cleaned.append(item) if not cleaned: raise ValueError("langs is empty") return cleaned def normalize_relative_dir(raw: object, label: str) -> str: value = str(raw).strip() if not value: raise ValueError(f"{label} is empty") path = Path(value) if path.is_absolute() or ".." in path.parts: raise ValueError(f"invalid {label}: {value}") normalized = path.as_posix() return "." if normalized == "" else normalized def join_playbook_subpath(root: str, child: str) -> str: if root in ("", "."): return child.lstrip("/") return f"{root.rstrip('/')}/{child.lstrip('/')}" def resolve_in_project_playbook_root(project_root: Path) -> str | None: try: rel = PLAYBOOK_ROOT.resolve().relative_to(project_root.resolve()) if str(rel) != ".": return rel.as_posix() except ValueError: pass return None def config_uses_playbook_root(config: dict) -> bool: for key in ( "sync_rules", "sync_memory_bank", "sync_prompts", "sync_standards", "install_skills", ): if key in config: return True return False def validate_removed_config(config: dict) -> None: if "vendor" in config: raise ValueError( "[vendor] is no longer supported; use [playbook].install_mode = " '"snapshot" and configure languages in [sync_standards]' ) playbook_config = config.get("playbook", {}) if not isinstance(playbook_config, dict): raise ValueError("[playbook] must be a table") if "deploy_root" in playbook_config: raise ValueError( "playbook.deploy_root is no longer supported; use playbook.playbook_root" ) if "intall_mode" in playbook_config: raise ValueError( "playbook.intall_mode is not supported; use playbook.install_mode" ) def resolve_install_mode(config: dict) -> str: playbook_config = config.get("playbook", {}) raw = "subtree" if ( isinstance(playbook_config, dict) and playbook_config.get("install_mode") is not None ): raw = playbook_config.get("install_mode") mode = str(raw).strip().lower() if mode not in ("subtree", "snapshot"): raise ValueError( "playbook.install_mode must be one of: subtree, snapshot" ) return mode def resolve_configured_playbook_root(config: dict, project_root: Path) -> str: playbook_config = config.get("playbook", {}) raw = None if isinstance(playbook_config, dict): raw = playbook_config.get("playbook_root") if raw is not None and str(raw).strip(): return normalize_relative_dir(raw, "playbook_root") in_project_playbook_root = resolve_in_project_playbook_root(project_root) if in_project_playbook_root is not None: return in_project_playbook_root if resolve_install_mode(config) == "snapshot" or config_uses_playbook_root(config): raise ValueError( "playbook.playbook_root is required when running from an external clone; " "set it to the target project's relative Playbook path" ) return "docs/standards/playbook" def resolve_playbook_root(context: dict) -> str: return context["playbook_root"] def resolve_docs_prefix(context: dict) -> str: return join_playbook_subpath(resolve_playbook_root(context), "docs") def resolve_playbook_scripts(context: dict) -> str: return join_playbook_subpath(resolve_playbook_root(context), "scripts") def read_git_commit(root: Path) -> str: try: result = subprocess.run( ["git", "-C", str(root), "rev-parse", "HEAD"], capture_output=True, text=True, check=True, ) except (OSError, subprocess.CalledProcessError): return "N/A" return result.stdout.strip() or "N/A" def extract_docs_index_sections(lines: list[str]) -> dict[str, list[str]]: heading_to_key = {value: key for key, value in DOCS_INDEX_SECTION_HEADINGS.items()} starts: list[tuple[int, str]] = [] for idx, line in enumerate(lines): key = heading_to_key.get(line) if key is not None: starts.append((idx, key)) sections: dict[str, list[str]] = {} for idx, (start, key) in enumerate(starts): end = starts[idx + 1][0] if idx + 1 < len(starts) else len(lines) section_lines = lines[start:end] while section_lines and section_lines[-1] == "": section_lines = section_lines[:-1] sections[key] = section_lines return sections def build_docs_index_lines(langs: list[str], source_path: Path | None = None) -> list[str]: docs_index_path = source_path or (PLAYBOOK_ROOT / "docs" / "index.md") source_lines = docs_index_path.read_text(encoding="utf-8").splitlines() title = source_lines[0] if source_lines else "# 文档导航(Docs Index)" sections = extract_docs_index_sections(source_lines) ordered_keys = ["common", *langs] result = [ title, "", f"本快照为裁剪版 Playbook(langs: {','.join(langs)})。", "", ] for idx, key in enumerate(ordered_keys): section = sections.get(key) if section is None: raise ValueError(f"docs/index.md is missing section for {key}") if idx > 0: result.append("") result.extend(section) return result def write_docs_index(dest_prefix: Path, langs: list[str]) -> None: lines = build_docs_index_lines(langs) docs_index = dest_prefix / "docs/index.md" ensure_dir(docs_index.parent) docs_index.write_text("\n".join(lines) + "\n", encoding="utf-8", newline="\n") def write_snapshot_readme( dest_prefix: Path, playbook_root: str, langs: list[str] ) -> None: scripts_path = join_playbook_subpath(playbook_root, "scripts/playbook.py") docs_index_path = join_playbook_subpath(playbook_root, "docs/index.md") lines = [ "# Playbook(裁剪快照)", "", f"本目录为从 Playbook 部署到项目内的裁剪快照(langs: {','.join(langs)})。", "", "## 使用", "", "在目标项目根目录执行:", "", "```sh", f"python {scripts_path} -config playbook.toml", "```", "", f"配置示例:`{join_playbook_subpath(playbook_root, 'playbook.toml.example')}`", "", "文档入口:", "", f"- `{docs_index_path}`", "- `.agents/index.md`", ] (dest_prefix / "README.md").write_text( "\n".join(lines) + "\n", encoding="utf-8", newline="\n" ) def write_source_file(dest_prefix: Path, langs: list[str]) -> None: commit = read_git_commit(PLAYBOOK_ROOT) timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") lines = [ "# SOURCE", "", f"- Source: {PLAYBOOK_ROOT}", f"- Commit: {commit}", f"- Date: {timestamp}", f"- Langs: {','.join(langs)}", "- Generated-by: scripts/playbook.py", ] (dest_prefix / "SOURCE.md").write_text( "\n".join(lines) + "\n", encoding="utf-8", newline="\n" ) def snapshot_langs(config: dict) -> list[str]: sync_standards = config.get("sync_standards") if isinstance(sync_standards, dict): return normalize_langs(sync_standards.get("langs")) return normalize_langs(None) def is_generated_snapshot(path: Path) -> bool: source = path / "SOURCE.md" if not source.is_file(): return False try: return "Generated-by: scripts/playbook.py" in source.read_text(encoding="utf-8") except OSError: return False def install_snapshot(config: dict, context: dict) -> int: try: langs = snapshot_langs(config) except ValueError as exc: print(f"ERROR: {exc}", file=sys.stderr) return 2 playbook_root = context["playbook_root"] target_path = Path(playbook_root) project_root: Path = context["project_root"] dest_prefix = project_root / target_path dest_standards = dest_prefix.parent if dest_prefix.resolve() == PLAYBOOK_ROOT.resolve(): log(f"Snapshot already installed at {dest_prefix}") return 0 ensure_dir(dest_standards) if dest_prefix.exists(): if not is_generated_snapshot(dest_prefix): print( "ERROR: refusing to replace existing playbook_root because it is " "not a generated snapshot; use install_mode = \"subtree\" for a " "git subtree-managed Playbook", file=sys.stderr, ) return 2 timestamp = datetime.now().strftime("%Y%m%d%H%M%S") backup = dest_standards / f"{dest_prefix.name}.bak.{timestamp}" dest_prefix.rename(backup) log(f"Backed up existing snapshot -> {backup}") ensure_dir(dest_prefix) gitattributes_src = PLAYBOOK_ROOT / ".gitattributes" if gitattributes_src.is_file(): copy2(gitattributes_src, dest_prefix / ".gitattributes") copytree(PLAYBOOK_ROOT / "scripts", dest_prefix / "scripts") copytree(PLAYBOOK_ROOT / "skills", dest_prefix / "skills") copy2(PLAYBOOK_ROOT / "SKILLS.md", dest_prefix / "SKILLS.md") common_docs = PLAYBOOK_ROOT / "docs/common" if common_docs.is_dir(): copytree(common_docs, dest_prefix / "docs/common") rulesets_root = PLAYBOOK_ROOT / "rulesets" ensure_dir(dest_prefix / "rulesets") if (rulesets_root / "index.md").is_file(): copy2(rulesets_root / "index.md", dest_prefix / "rulesets/index.md") templates_root = PLAYBOOK_ROOT / "templates" templates_dst = dest_prefix / "templates" ensure_dir(templates_dst) templates_ci = templates_root / "ci" if templates_ci.is_dir(): copytree(templates_ci, templates_dst / "ci") for name in ("AGENTS.template.md", "AGENT_RULES.template.md", "README.md"): src = templates_root / name if src.is_file(): copy2(src, templates_dst / name) memory_src = templates_root / "memory-bank" if memory_src.is_dir(): copytree(memory_src, templates_dst / "memory-bank") prompts_src = templates_root / "prompts" if prompts_src.is_dir(): copytree(prompts_src, templates_dst / "prompts") for lang in langs: docs_src = PLAYBOOK_ROOT / "docs" / lang rules_src = PLAYBOOK_ROOT / "rulesets" / lang if not docs_src.is_dir(): print(f"ERROR: docs not found for lang={lang}", file=sys.stderr) return 2 if not rules_src.is_dir(): print(f"ERROR: rulesets not found for lang={lang}", file=sys.stderr) return 2 copytree(docs_src, dest_prefix / "docs" / lang) copytree(rules_src, dest_prefix / "rulesets" / lang) templates_src = PLAYBOOK_ROOT / "templates" / lang if templates_src.is_dir(): copytree(templates_src, dest_prefix / "templates" / lang) example_config = PLAYBOOK_ROOT / "playbook.toml.example" if example_config.is_file(): copy2(example_config, dest_prefix / "playbook.toml.example") write_docs_index(dest_prefix, langs) write_snapshot_readme(dest_prefix, playbook_root, langs) write_source_file(dest_prefix, langs) log(f"Installed snapshot -> {dest_prefix}") return 0 def validate_subtree_install(context: dict) -> int: project_root: Path = context["project_root"] playbook_root = context["playbook_root"] expected_root = (project_root / playbook_root).resolve() if PLAYBOOK_ROOT.resolve() == expected_root: return 0 print( "ERROR: install_mode = \"subtree\" requires running the project-local " f"Playbook script at " f"{join_playbook_subpath(playbook_root, 'scripts/playbook.py')}", file=sys.stderr, ) return 2 def replace_placeholders( text: str, project_name: str | None, date_value: str, playbook_scripts: str | None, playbook_root: str | None, ) -> str: result = text.replace("{{DATE}}", date_value) if project_name: result = result.replace("{{PROJECT_NAME}}", project_name) if playbook_root: result = result.replace("{{PLAYBOOK_ROOT}}", playbook_root) if playbook_scripts: result = result.replace("{{PLAYBOOK_SCRIPTS}}", playbook_scripts) return result def backup_path(path: Path, no_backup: bool) -> None: if not path.exists() or no_backup: return timestamp = datetime.now().strftime("%Y%m%d%H%M%S") backup = path.with_name(f"{path.name}.bak.{timestamp}") path.rename(backup) log(f"Backed up: {path} -> {backup}") def replace_placeholders_in_file( file_path: Path, project_name: str | None, date_value: str, playbook_scripts: str | None, playbook_root: str | None, ) -> None: if file_path.suffix != ".md": return text = file_path.read_text(encoding="utf-8") updated = replace_placeholders( text, project_name, date_value, playbook_scripts, playbook_root ) if updated != text: file_path.write_text(updated, encoding="utf-8", newline="\n") def resolve_template_target( template_file: Path, template_root: Path, target_root: Path ) -> Path: rel = template_file.relative_to(template_root) if rel.name.endswith(".template.md"): rel = rel.with_name(rel.name.replace(".template.md", ".md")) return target_root / rel def sync_directory( template_dir: Path, target_dir: Path, project_name: str | None, date_value: str, playbook_scripts: str | None, playbook_root: str | None, force: bool, no_backup: bool, ) -> None: for template_file in template_dir.rglob("*"): if not template_file.is_file(): continue target_file = resolve_template_target( template_file, template_dir, target_dir ) ensure_dir(target_file.parent) if target_file.exists(): if not force: continue backup_path(target_file, no_backup) copy2(template_file, target_file) replace_placeholders_in_file( target_file, project_name, date_value, playbook_scripts, playbook_root, ) def extract_block_lines(text: str, start: str, end: str) -> list[str]: lines = text.splitlines() block: list[str] = [] in_block = False for line in lines: if line.strip() == start: in_block = True if in_block: block.append(line) if in_block and line.strip() == end: break if not block or block[-1].strip() != end: return [] return block def update_agents_section( agents_path: Path, template_path: Path, start_marker: str, end_marker: str, project_name: str | None, date_value: str, playbook_scripts: str | None, playbook_root: str | None, ) -> None: template_text = template_path.read_text(encoding="utf-8") template_text = replace_placeholders( template_text, project_name, date_value, playbook_scripts, playbook_root ) block = extract_block_lines(template_text, start_marker, end_marker) if not block: log("Skip: markers not found in template") return if not agents_path.exists(): agents_path.write_text(template_text + "\n", encoding="utf-8", newline="\n") log("Created: AGENTS.md") return agents_text = agents_path.read_text(encoding="utf-8") if start_marker in agents_text: lines = agents_text.splitlines() updated: list[str] = [] in_block = False replaced = False for line in lines: if not replaced and line.strip() == start_marker: updated.extend(block) in_block = True replaced = True continue if in_block: if line.strip() == end_marker: in_block = False continue updated.append(line) agents_path.write_text( "\n".join(updated) + "\n", encoding="utf-8", newline="\n" ) log("Updated: AGENTS.md (section)") else: if ".agents/index.md" in agents_text: log("Skip: AGENTS.md already references .agents/index.md") return updated = agents_text.rstrip("\n") + "\n\n" + "\n".join(block) + "\n" agents_path.write_text(updated, encoding="utf-8", newline="\n") log("Appended: AGENTS.md (section)") def resolve_project_name(context: dict) -> str | None: config = context.get("config", {}) if not isinstance(config, dict): return None memory_conf = config.get("sync_memory_bank") if isinstance(memory_conf, dict): raw = memory_conf.get("project_name") if raw is not None and str(raw).strip(): return str(raw).strip() return None def resolve_template_date(context: dict) -> str: config = context.get("config", {}) if isinstance(config, dict): for key in ("sync_rules", "sync_memory_bank", "sync_prompts"): section = config.get(key) if isinstance(section, dict): value = section.get("date") if value: return str(value) return datetime.now().strftime("%Y-%m-%d") def sync_agents_template(context: dict) -> int: project_root: Path = context["project_root"] if project_root.resolve() == PLAYBOOK_ROOT.resolve(): log("Skip: playbook root equals project root.") return 0 templates_dir = PLAYBOOK_ROOT / "templates" agents_src = templates_dir / "AGENTS.template.md" if not agents_src.is_file(): return 0 project_name = resolve_project_name(context) playbook_scripts = resolve_playbook_scripts(context) playbook_root = resolve_playbook_root(context) date_value = resolve_template_date(context) agents_dst = project_root / "AGENTS.md" if agents_dst.exists(): agents_text = agents_dst.read_text(encoding="utf-8") if "" in agents_text: start_marker = "" end_marker = "" elif "" in agents_text: start_marker = "" end_marker = "" else: start_marker = "" end_marker = "" else: start_marker = "" end_marker = "" update_agents_section( agents_dst, agents_src, start_marker, end_marker, project_name, date_value, playbook_scripts, playbook_root, ) sync_claude_md(project_root, context.get("config", {})) return 0 _CLAUDE_BLOCK_START = "" _CLAUDE_BLOCK_END = "" _CLAUDE_MD_CANDIDATES = ["CLAUDE.md", ".claude/CLAUDE.md"] def _claude_block_needs_heading(text: str) -> bool: return text.lstrip().startswith(_CLAUDE_BLOCK_START) def sync_claude_md(project_root: Path, config: dict) -> None: claude_md_config = config.get("playbook", {}).get("claude_md") claude_md: Path | None = None if claude_md_config: claude_md = project_root / claude_md_config else: for candidate in _CLAUDE_MD_CANDIDATES: path = project_root / candidate if path.exists(): claude_md = path break if claude_md is None: claude_md = project_root / "CLAUDE.md" rel_prefix = "" try: rel = claude_md.parent.resolve().relative_to(project_root.resolve()) if rel != Path("."): depth = len(rel.parts) rel_prefix = "../" * depth except ValueError: pass block_lines = [ _CLAUDE_BLOCK_START, "", f"@{rel_prefix}AGENTS.md", f"@{rel_prefix}AGENT_RULES.md", "", _CLAUDE_BLOCK_END, ] if not claude_md.exists(): ensure_dir(claude_md.parent) claude_md.write_text( "# CLAUDE.md\n\n" + "\n".join(block_lines) + "\n", encoding="utf-8", newline="\n", ) log(f"Created {claude_md.relative_to(project_root)} with playbook block.") return text = claude_md.read_text(encoding="utf-8") if _CLAUDE_BLOCK_START in text: lines = text.splitlines() updated: list[str] = [] in_block = False replaced = False for line in lines: if not replaced and line.strip() == _CLAUDE_BLOCK_START: updated.extend(block_lines) in_block = True replaced = True continue if in_block: if line.strip() == _CLAUDE_BLOCK_END: in_block = False continue updated.append(line) updated_text = "\n".join(updated) + "\n" if _claude_block_needs_heading(updated_text): updated_text = "# CLAUDE.md\n\n" + updated_text.lstrip() claude_md.write_text(updated_text, encoding="utf-8", newline="\n") log("Updated CLAUDE.md (playbook block).") elif "@AGENTS.md" in text: log("Skip: CLAUDE.md already references AGENTS.md") else: appended = text.rstrip("\n") + "\n\n" + "\n".join(block_lines) + "\n" claude_md.write_text(appended, encoding="utf-8", newline="\n") log("Appended playbook block to CLAUDE.md") def should_sync_agents(config: dict) -> bool: for key in ("sync_rules", "sync_memory_bank", "sync_prompts", "sync_standards"): if key in config: return True return False def sync_rules_action(config: dict, context: dict) -> int: project_root: Path = context["project_root"] if project_root.resolve() == PLAYBOOK_ROOT.resolve(): log("Skip: playbook root equals project root.") return 0 templates_dir = PLAYBOOK_ROOT / "templates" rules_src = templates_dir / "AGENT_RULES.template.md" if not rules_src.is_file(): print(f"ERROR: template not found: {rules_src}", file=sys.stderr) return 2 rules_dst = project_root / "AGENT_RULES.md" force = bool(config.get("force", False)) if rules_dst.exists() and not force: log("AGENT_RULES.md already exists. Use force to overwrite.") return 0 project_name = resolve_project_name(context) playbook_scripts = resolve_playbook_scripts(context) playbook_root = resolve_playbook_root(context) date_value = config.get("date") or datetime.now().strftime("%Y-%m-%d") no_backup = bool(config.get("no_backup", False)) backup_path(rules_dst, no_backup) text = rules_src.read_text(encoding="utf-8") text = replace_placeholders( text, project_name, date_value, playbook_scripts, playbook_root ) rules_dst.write_text(text.rstrip("\n") + "\n", encoding="utf-8", newline="\n") log("Synced: AGENT_RULES.md") local_rules = project_root / "AGENT_RULES.local.md" if not local_rules.exists(): local_rules.write_text( "# AGENT_RULES.local\n" "\n" "项目私有规则(优先级高于 AGENT_RULES.md)。\n" "\n" "在此记录:\n" "\n" "- 项目特有的注意事项与常见陷阱\n" "- 同一错误发生 2 次以上时的修正规则\n" "- 团队约定的额外约束\n", encoding="utf-8", newline="\n", ) log("Created: AGENT_RULES.local.md") return 0 def sync_memory_bank_action(config: dict, context: dict) -> int: project_root: Path = context["project_root"] if project_root.resolve() == PLAYBOOK_ROOT.resolve(): log("Skip: playbook root equals project root.") return 0 templates_dir = PLAYBOOK_ROOT / "templates" memory_src = templates_dir / "memory-bank" if not memory_src.is_dir(): print(f"ERROR: templates not found: {memory_src}", file=sys.stderr) return 2 project_name = config.get("project_name") playbook_scripts = resolve_playbook_scripts(context) playbook_root = resolve_playbook_root(context) date_value = config.get("date") or datetime.now().strftime("%Y-%m-%d") force = bool(config.get("force", False)) no_backup = bool(config.get("no_backup", False)) memory_dst = project_root / "memory-bank" ensure_dir(memory_dst) sync_directory( memory_src, memory_dst, project_name, date_value, playbook_scripts, playbook_root, force, no_backup, ) log("Synced: memory-bank/") return 0 def sync_prompts_action(config: dict, context: dict) -> int: project_root: Path = context["project_root"] if project_root.resolve() == PLAYBOOK_ROOT.resolve(): log("Skip: playbook root equals project root.") return 0 templates_dir = PLAYBOOK_ROOT / "templates" prompts_src = templates_dir / "prompts" if not prompts_src.is_dir(): print(f"ERROR: templates not found: {prompts_src}", file=sys.stderr) return 2 project_name = resolve_project_name(context) playbook_scripts = resolve_playbook_scripts(context) playbook_root = resolve_playbook_root(context) date_value = config.get("date") or datetime.now().strftime("%Y-%m-%d") force = bool(config.get("force", False)) no_backup = bool(config.get("no_backup", False)) prompts_dst = project_root / "docs/prompts" ensure_dir(prompts_dst.parent) ensure_dir(prompts_dst) sync_directory( prompts_src, prompts_dst, project_name, date_value, playbook_scripts, playbook_root, force, no_backup, ) log("Synced: docs/prompts/") return 0 def render_agents_block(langs: list[str]) -> list[str]: entries = [f"`.agents/{lang}/index.md`" for lang in langs] langs_line = "、".join(entries) if entries else "" lines = [ "", "", "请以 `.agents/` 下的规则为准:", "", "- 入口:`.agents/index.md`", f"- 语言规则:{langs_line}" if langs_line else "- 语言规则:", "", ] return lines def update_agents_block(agents_md: Path, block_lines: list[str]) -> None: start = "" end = "" if not agents_md.exists(): content = "# Agent Instructions\n\n" + "\n".join(block_lines) + "\n" agents_md.write_text(content, encoding="utf-8", newline="\n") log("Created AGENTS.md") return text = agents_md.read_text(encoding="utf-8") if start in text: lines = text.splitlines() updated: list[str] = [] in_block = False replaced = False for line in lines: if not replaced and line.strip() == start: updated.extend(block_lines) in_block = True replaced = True continue if in_block: if line.strip() == end: in_block = False continue updated.append(line) agents_md.write_text("\n".join(updated) + "\n", encoding="utf-8", newline="\n") log("Updated AGENTS.md (playbook block).") else: if ".agents/index.md" in text: log("Skip: AGENTS.md already references .agents/index.md") return updated = text.rstrip("\n") + "\n\n" + "\n".join(block_lines) + "\n" agents_md.write_text(updated, encoding="utf-8", newline="\n") log("Appended playbook block to AGENTS.md") def create_agents_index(agents_root: Path, langs: list[str], docs_prefix: str | None) -> None: agents_index = agents_root / "index.md" lang_descriptions = { "tsl": "TSL 相关规则集(由 playbook 同步;适用于 `.tsl`/`.tsf`)", "cpp": "C++ 相关规则集(由 playbook 同步;适用于 C++23/Modules)", "python": "Python 相关规则集(由 playbook 同步)", "typescript": "TypeScript/JavaScript 相关规则集(由 playbook 同步)", "markdown": "Markdown 相关规则集(仅代码格式化)", } lines = [ "# .agents(多语言)", "", "本目录用于存放仓库级/语言级的代理规则集。", "", "建议约定:", "", ] for lang in langs: description = lang_descriptions.get(lang, "相关规则集(由 playbook 同步)") lines.append(f"- `.agents/{lang}/`:{description}") lines += [ "", "规则发生冲突时,建议以“更靠近代码的目录规则更具体”为准。", "", "入口建议从:", "", ] for lang in langs: lines.append(f"- `.agents/{lang}/index.md`") lines += [ "", "标准快照文档入口:", "", f"- {docs_prefix or 'docs/'}", ] agents_index.write_text("\n".join(lines) + "\n", encoding="utf-8", newline="\n") log("Synced .agents/index.md") def rewrite_docs_links_in_markdown(root: Path, docs_prefix: str, recursive: bool) -> None: replacements = { "tsl": f"{docs_prefix}/tsl/", "cpp": f"{docs_prefix}/cpp/", "python": f"{docs_prefix}/python/", "typescript": f"{docs_prefix}/typescript/", "markdown": f"{docs_prefix}/markdown/", "common": f"{docs_prefix}/common/", } iterator = root.rglob("*.md") if recursive else root.glob("*.md") patterns = [ (re.compile(rf"(? None: rewrite_docs_links_in_markdown(agents_dir, docs_prefix, recursive=False) def rewrite_skill_docs_links(skill_dir: Path, docs_prefix: str) -> None: rewrite_docs_links_in_markdown(skill_dir, docs_prefix, recursive=True) def read_gitattributes_entries(path: Path) -> list[str]: entries: list[str] = [] for line in path.read_text(encoding="utf-8").splitlines(): stripped = line.strip() if not stripped or stripped.startswith("#"): continue entries.append(stripped) return entries def sync_gitattributes_overwrite(src: Path, dst: Path, no_backup: bool) -> None: if src.resolve() == dst.resolve(): log("Skip: .gitattributes source equals destination.") return backup_path(dst, no_backup) copy2(src, dst) log("Synced .gitattributes from standards (overwrite).") def sync_gitattributes_append( src: Path, dst: Path, source_note: str, no_backup: bool ) -> None: src_entries = read_gitattributes_entries(src) dst_entries: list[str] = [] if dst.exists(): dst_entries = read_gitattributes_entries(dst) missing = [line for line in src_entries if line not in set(dst_entries)] if not missing: log("No missing .gitattributes rules to append.") return original = dst.read_text(encoding="utf-8") if dst.exists() else "" backup_path(dst, no_backup) header = f"# Added from playbook .gitattributes (source: {source_note})" content = original.rstrip("\n") if content: content += "\n\n" content += header + "\n" + "\n".join(missing) + "\n" dst.write_text(content, encoding="utf-8", newline="\n") log("Appended missing .gitattributes rules from standards.") def sync_gitattributes_block(src: Path, dst: Path, no_backup: bool) -> None: begin = "# BEGIN playbook .gitattributes" end = "# END playbook .gitattributes" begin_old = "# BEGIN tsl-playbook .gitattributes" end_old = "# END tsl-playbook .gitattributes" src_lines = src.read_text(encoding="utf-8").splitlines() block_lines = [begin] + src_lines + [end] if dst.exists(): original = dst.read_text(encoding="utf-8").splitlines() updated: list[str] = [] in_block = False replaced = False for line in original: if line == begin or line == begin_old: if not replaced: updated.extend(block_lines) replaced = True in_block = True continue if in_block: if line == end or line == end_old: in_block = False continue updated.append(line) if not replaced: if updated and updated[-1].strip(): updated.append("") updated.extend(block_lines) backup_path(dst, no_backup) dst.write_text("\n".join(updated) + "\n", encoding="utf-8", newline="\n") else: dst.write_text("\n".join(block_lines) + "\n", encoding="utf-8", newline="\n") log("Synced .gitattributes from standards (block).") def sync_standards_action(config: dict, context: dict) -> int: if "langs" not in config: print("ERROR: langs is required for sync_standards", file=sys.stderr) return 2 try: langs = normalize_langs(config.get("langs")) except ValueError as exc: print(f"ERROR: {exc}", file=sys.stderr) return 2 project_root: Path = context["project_root"] agents_root = project_root / ".agents" ensure_dir(agents_root) no_backup = bool(config.get("no_backup", False)) timestamp = datetime.now().strftime("%Y%m%d%H%M%S") for lang in langs: src = PLAYBOOK_ROOT / "rulesets" / lang if not src.is_dir(): print(f"ERROR: agents ruleset not found: {src}", file=sys.stderr) return 2 dst = agents_root / lang if dst.exists(): if no_backup: rmtree(dst) else: backup = agents_root / f"{lang}.bak.{timestamp}" dst.rename(backup) log(f"Backed up existing {lang} agents -> {backup.name}") copytree(src, dst) log(f"Synced .agents/{lang} from standards.") docs_prefix = resolve_docs_prefix(context) if docs_prefix: for lang in langs: rewrite_agents_docs_links(agents_root / lang, docs_prefix) agents_md = project_root / "AGENTS.md" block_lines = render_agents_block(langs) update_agents_block(agents_md, block_lines) create_agents_index(agents_root, langs, docs_prefix) gitattributes_src = PLAYBOOK_ROOT / ".gitattributes" if gitattributes_src.is_file(): mode = str(config.get("gitattr_mode", "append")).lower() gitattributes_dst = project_root / ".gitattributes" source_note = str(gitattributes_src) try: source_note = str(gitattributes_src.resolve().relative_to(project_root.resolve())) except ValueError: source_note = str(gitattributes_src) if mode == "skip": log("Skip: .gitattributes sync (mode=skip).") elif mode == "overwrite": sync_gitattributes_overwrite(gitattributes_src, gitattributes_dst, no_backup) elif mode == "block": sync_gitattributes_block(gitattributes_src, gitattributes_dst, no_backup) else: sync_gitattributes_append( gitattributes_src, gitattributes_dst, source_note, no_backup ) return 0 def normalize_names(raw: object, label: str) -> list[str]: if raw is None: raise ValueError(f"{label} is required") if isinstance(raw, str): items = [raw] else: items = list(raw) cleaned: list[str] = [] for item in items: name = str(item).strip() if not name: continue if "/" in name or "\\" in name or ".." in name: raise ValueError(f"invalid {label}: {name}") cleaned.append(name) if not cleaned: raise ValueError(f"{label} is empty") return cleaned def normalize_globs(raw: object) -> list[str]: if raw is None: return ["**/*.md"] if isinstance(raw, str): items = [raw] else: items = list(raw) cleaned = [str(item).strip() for item in items if str(item).strip()] return cleaned or ["**/*.md"] def install_skills_action(config: dict, context: dict) -> int: mode = str(config.get("mode", "list")).lower() if "codex_home" in config: print("ERROR: codex_home is no longer supported; use agents_home", file=sys.stderr) return 2 agents_home = Path(config.get("agents_home", "~/.agents")).expanduser() if not agents_home.is_absolute(): agents_home = (context["project_root"] / agents_home).resolve() skills_src_root = PLAYBOOK_ROOT / "skills" skills_thirdparty_root = skills_src_root / "thirdparty" if not skills_src_root.is_dir(): print(f"ERROR: skills source not found: {skills_src_root}", file=sys.stderr) return 2 skills_dst_root = agents_home / "skills" ensure_dir(skills_dst_root) if mode == "all": own_skills = [ (path.name, skills_src_root, "own") for path in skills_src_root.iterdir() if path.is_dir() and not path.name.startswith(".") and path.name != "thirdparty" ] third_skills = [ (path.name, skills_thirdparty_root, "thirdparty") for path in skills_thirdparty_root.iterdir() if path.is_dir() and not path.name.startswith(".") ] if skills_thirdparty_root.is_dir() else [] skill_entries = own_skills + third_skills elif mode == "list": try: names = normalize_names(config.get("skills"), "skills") except ValueError as exc: print(f"ERROR: {exc}", file=sys.stderr) return 2 skill_entries = [] for name in names: if (skills_src_root / name).is_dir(): skill_entries.append((name, skills_src_root, "own")) elif skills_thirdparty_root.is_dir() and (skills_thirdparty_root / name).is_dir(): skill_entries.append((name, skills_thirdparty_root, "thirdparty")) else: print(f"ERROR: skill not found: {name}", file=sys.stderr) return 2 else: print("ERROR: mode must be list or all", file=sys.stderr) return 2 timestamp = datetime.now().strftime("%Y%m%d%H%M%S") no_backup = bool(config.get("no_backup", False)) for name, src_root, origin in skill_entries: src = src_root / name dst = skills_dst_root / name if dst.exists(): if no_backup: rmtree(dst) else: backup = skills_dst_root / f"{name}.bak.{timestamp}" dst.rename(backup) log(f"Backed up existing skill: {name} -> {backup.name}") copytree(src, dst) rewrite_skill_docs_links(dst, resolve_docs_prefix(context)) tag = " [thirdparty]" if origin == "thirdparty" else "" log(f"Installed: {name}{tag}") skill_link_raw = config.get("skill_link") if skill_link_raw: skill_link_home = Path(str(skill_link_raw)).expanduser() if not skill_link_home.is_absolute(): skill_link_home = (context["project_root"] / skill_link_home).resolve() _create_skills_symlink(skill_link_home / "skills", skills_dst_root) return 0 def _is_junction(path: Path) -> bool: if sys.platform != "win32": return False try: import ctypes.wintypes attrs = ctypes.windll.kernel32.GetFileAttributesW(str(path)) return attrs != -1 and bool(attrs & 0x400) except Exception: return False def _create_skills_symlink(link_path: Path, target_path: Path) -> None: if link_path.is_symlink() or _is_junction(link_path): if link_path.resolve() == target_path.resolve(): log(f"Symlink already up to date: {link_path}") return if link_path.is_symlink(): link_path.unlink() elif _is_junction(link_path): link_path.rmdir() elif link_path.exists(): log(f"Skip symlink: {link_path} exists and is not a symlink") return ensure_dir(link_path.parent) try: link_path.symlink_to(target_path, target_is_directory=True) log(f"Created symlink: {link_path} -> {target_path}") except OSError: if sys.platform == "win32": result = subprocess.run( ["cmd", "/c", "mklink", "/J", str(link_path), str(target_path)], capture_output=True, ) if result.returncode == 0: log(f"Created junction: {link_path} -> {target_path}") else: log(f"Warning: could not create junction {link_path}") else: log(f"Warning: could not create symlink {link_path}") def format_md_action(config: dict, context: dict) -> int: tool = str(config.get("tool", "prettier")).lower() if tool != "prettier": print("ERROR: format_md.tool only supports prettier", file=sys.stderr) return 2 project_root: Path = context["project_root"] prettier = project_root / "node_modules/.bin/prettier" if not prettier.is_file(): prettier = PLAYBOOK_ROOT / "node_modules/.bin/prettier" if not prettier.is_file(): resolved = which("prettier") if resolved: prettier = Path(resolved) else: log("Skip: prettier not found.") return 0 globs_raw = config.get("globs", ["**/*.md"]) globs = normalize_globs(globs_raw) result = subprocess.run( [str(prettier), "-w", *globs], cwd=project_root, capture_output=True, text=True, ) if result.returncode != 0: sys.stderr.write(result.stderr) return result.returncode def run_action(name: str, config: dict, context: dict) -> int: print(f"[action] {name}") if name == "sync_rules": return sync_rules_action(config, context) if name == "sync_memory_bank": return sync_memory_bank_action(config, context) if name == "sync_prompts": return sync_prompts_action(config, context) if name == "sync_standards": return sync_standards_action(config, context) if name == "install_skills": return install_skills_action(config, context) if name == "format_md": return format_md_action(config, context) return 0 def main(argv: list[str]) -> int: if "-h" in argv or "-help" in argv: print(usage()) return 0 spec_path = parse_cli_value(argv, "-record-spec") if spec_path is not None: progress_path = parse_cli_value(argv, "-progress") if not progress_path: print("ERROR: -progress is required.\n" + usage(), file=sys.stderr) return 2 code, message = MAIN_LOOP.record_workflow_state( Path(progress_path), "planning", spec_path, None, None, None, ) if code != 0: print(message, file=sys.stderr) return code print(message) return 0 plan_path = parse_cli_value(argv, "-record-plan") if plan_path is not None: progress_path = parse_cli_value(argv, "-progress") if not progress_path: print("ERROR: -progress is required.\n" + usage(), file=sys.stderr) return 2 code, message = MAIN_LOOP.record_workflow_state( Path(progress_path), "planning", None, plan_path, "executing-plans", "karpathy-guidelines,.agents,AGENT_RULES", ) if code != 0: print(message, file=sys.stderr) return code print(message) return 0 if "-config" not in argv: print("ERROR: -config is required.\n" + usage(), file=sys.stderr) return 2 idx = argv.index("-config") if idx + 1 >= len(argv) or not argv[idx + 1]: print("ERROR: -config requires a path.\n" + usage(), file=sys.stderr) return 2 config_path = Path(argv[idx + 1]).expanduser() if not config_path.is_file(): print(f"ERROR: config not found: {config_path}", file=sys.stderr) return 2 config = load_config(config_path) try: validate_removed_config(config) install_mode = resolve_install_mode(config) except ValueError as exc: print(f"ERROR: {exc}", file=sys.stderr) return 2 playbook_config = config.get("playbook", {}) project_root = playbook_config.get("project_root") if project_root: root = Path(project_root).expanduser() if not root.is_absolute(): root = (config_path.parent / root).resolve() else: root = config_path.parent resolved_root = root.resolve() try: playbook_root = resolve_configured_playbook_root(config, resolved_root) except ValueError as exc: print(f"ERROR: {exc}", file=sys.stderr) return 2 context = { "project_root": resolved_root, "config_path": config_path.resolve(), "config": config, "install_mode": install_mode, "playbook_root": playbook_root, } if install_mode == "snapshot": result = install_snapshot(config, context) if result != 0: return result elif config_uses_playbook_root(config): result = validate_subtree_install(context) if result != 0: return result if should_sync_agents(config): result = sync_agents_template(context) if result != 0: return result for name in ORDER: if name in config: result = run_action(name, config[name], context) if result != 0: return result return 0 if __name__ == "__main__": raise SystemExit(main(sys.argv[1:]))