249 lines
8.7 KiB
Python
249 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from _safe_files import is_safe_regular_file
|
|
from _project_paths import find_repo_root
|
|
from validate_skills import configure_utf8_output, parse_frontmatter
|
|
|
|
|
|
FRONTMATTER_PATTERN = re.compile(r"^---\s*\n(.*?)\n---", re.DOTALL)
|
|
TOP_LEVEL_KEY_PATTERN = re.compile(r"^[A-Za-z0-9_-]+:\s*")
|
|
SECURITY_DISCLAIMER_PATTERN = re.compile(r"AUTHORIZED USE ONLY", re.IGNORECASE)
|
|
SKILLS_ADD_PATTERN = re.compile(
|
|
r"\b(?:npx|pnpm\s+dlx|yarn\s+dlx|bunx)?\s*skills\s+add\s+([A-Za-z0-9_.-]+/[A-Za-z0-9_.-]+)"
|
|
)
|
|
SECTION_HEADING_PATTERN = re.compile(r"^##\s+", re.MULTILINE)
|
|
SOURCE_HEADING_PATTERN = re.compile(r"^##\s+Sources?\s*$", re.MULTILINE | re.IGNORECASE)
|
|
URL_PATTERN = re.compile(r"https?://[^\s)>'\"]+")
|
|
GITHUB_REPO_PATTERN = re.compile(r"^https?://github\.com/([^/\s]+)/([^/\s#?]+)")
|
|
|
|
|
|
def strip_frontmatter(content: str) -> tuple[str, str] | None:
|
|
match = FRONTMATTER_PATTERN.search(content)
|
|
if not match:
|
|
return None
|
|
return match.group(1), content[match.end():]
|
|
|
|
|
|
def repair_malformed_injected_metadata(content: str) -> str:
|
|
pattern = re.compile(
|
|
r"(^metadata:\n)(risk:\s+[^\n]+\nsource:\s+[^\n]+\n)((?:[ \t]+[^\n]*\n)+)",
|
|
re.MULTILINE,
|
|
)
|
|
return pattern.sub(lambda match: match.group(2) + match.group(1) + match.group(3), content, count=1)
|
|
|
|
|
|
def normalize_github_url(url: str) -> str:
|
|
match = GITHUB_REPO_PATTERN.match(url.rstrip("/"))
|
|
if not match:
|
|
return url.rstrip("/")
|
|
owner, repo = match.groups()
|
|
if repo.endswith(".git"):
|
|
repo = repo[:-4]
|
|
return f"https://github.com/{owner}/{repo}"
|
|
|
|
|
|
def extract_urls(text: str) -> list[str]:
|
|
return [match.group(0).rstrip(".,:;") for match in URL_PATTERN.finditer(text)]
|
|
|
|
|
|
def extract_source_section(body: str) -> str | None:
|
|
match = SOURCE_HEADING_PATTERN.search(body)
|
|
if not match:
|
|
return None
|
|
|
|
remainder = body[match.end():]
|
|
next_heading = SECTION_HEADING_PATTERN.search(remainder)
|
|
if next_heading:
|
|
return remainder[: next_heading.start()].strip()
|
|
return remainder.strip()
|
|
|
|
|
|
def infer_source(skill_name: str, body: str) -> str:
|
|
skills_add_match = SKILLS_ADD_PATTERN.search(body)
|
|
if skills_add_match:
|
|
return f"https://github.com/{skills_add_match.group(1)}"
|
|
|
|
source_section = extract_source_section(body)
|
|
if source_section:
|
|
urls = [normalize_github_url(url) for url in extract_urls(source_section)]
|
|
unique_urls = list(dict.fromkeys(urls))
|
|
if len(unique_urls) == 1:
|
|
return unique_urls[0]
|
|
|
|
non_empty_lines = [
|
|
line.strip(" -*`>")
|
|
for line in source_section.splitlines()
|
|
if line.strip() and not line.strip().startswith("```")
|
|
]
|
|
if len(non_empty_lines) == 1 and len(non_empty_lines[0]) <= 120:
|
|
return non_empty_lines[0]
|
|
|
|
urls = [normalize_github_url(url) for url in extract_urls(body)]
|
|
unique_urls = list(dict.fromkeys(urls))
|
|
github_urls = [url for url in unique_urls if GITHUB_REPO_PATTERN.match(url)]
|
|
|
|
normalized_skill_name = skill_name.lower().replace("-", "")
|
|
github_matches = []
|
|
for url in github_urls:
|
|
github_match = GITHUB_REPO_PATTERN.match(url)
|
|
if not github_match:
|
|
continue
|
|
owner, repo = github_match.groups()
|
|
normalized_repo = repo.lower().replace("-", "").replace("_", "")
|
|
if normalized_skill_name and normalized_skill_name in normalized_repo:
|
|
github_matches.append(normalize_github_url(url))
|
|
|
|
github_matches = list(dict.fromkeys(github_matches))
|
|
if len(github_matches) == 1:
|
|
return github_matches[0]
|
|
|
|
if len(github_urls) == 1:
|
|
github_match = GITHUB_REPO_PATTERN.match(github_urls[0])
|
|
if github_match:
|
|
_, repo = github_match.groups()
|
|
normalized_repo = repo.lower().replace("-", "").replace("_", "")
|
|
if normalized_skill_name and (
|
|
normalized_skill_name in normalized_repo or normalized_repo in normalized_skill_name
|
|
):
|
|
return github_urls[0]
|
|
|
|
return "community"
|
|
|
|
|
|
def infer_risk(body: str) -> str:
|
|
if SECURITY_DISCLAIMER_PATTERN.search(body):
|
|
return "offensive"
|
|
return "unknown"
|
|
|
|
|
|
def insert_metadata_keys(frontmatter_text: str, additions: dict[str, str]) -> str:
|
|
lines = frontmatter_text.splitlines()
|
|
insertion_index = len(lines)
|
|
|
|
for index, line in enumerate(lines):
|
|
stripped = line.strip()
|
|
indent = len(line) - len(line.lstrip(" "))
|
|
if not stripped:
|
|
continue
|
|
if indent == 0 and TOP_LEVEL_KEY_PATTERN.match(stripped) and not stripped.startswith(("name:", "description:")):
|
|
insertion_index = index
|
|
break
|
|
|
|
new_lines = [f'{key}: "{value}"' if ":" in value or value.startswith("http") else f"{key}: {value}" for key, value in additions.items()]
|
|
updated = lines[:insertion_index] + new_lines + lines[insertion_index:]
|
|
return "\n".join(updated)
|
|
|
|
|
|
def update_skill_file(skill_path: Path) -> tuple[bool, list[str]]:
|
|
if not is_safe_regular_file(skill_path):
|
|
return False, []
|
|
|
|
content = skill_path.read_text(encoding="utf-8")
|
|
repaired_content = repair_malformed_injected_metadata(content)
|
|
if repaired_content != content:
|
|
skill_path.write_text(repaired_content, encoding="utf-8")
|
|
content = repaired_content
|
|
|
|
frontmatter = strip_frontmatter(content)
|
|
if frontmatter is None:
|
|
return False, []
|
|
|
|
frontmatter_text, body = frontmatter
|
|
metadata, _ = parse_frontmatter(content, skill_path.as_posix())
|
|
if not metadata:
|
|
return False, []
|
|
|
|
additions: dict[str, str] = {}
|
|
changes: list[str] = []
|
|
skill_name = str(metadata.get("name") or skill_path.parent.name)
|
|
|
|
if "risk" not in metadata:
|
|
additions["risk"] = infer_risk(body)
|
|
changes.append("added_risk")
|
|
|
|
if "source" not in metadata:
|
|
additions["source"] = infer_source(skill_name, body)
|
|
changes.append("added_source")
|
|
|
|
if not additions:
|
|
return False, []
|
|
|
|
updated_frontmatter = insert_metadata_keys(frontmatter_text, additions)
|
|
updated_content = f"---\n{updated_frontmatter}\n---{body}"
|
|
if updated_content == content:
|
|
return False, []
|
|
|
|
skill_path.write_text(updated_content, encoding="utf-8")
|
|
return True, changes
|
|
|
|
|
|
def main() -> int:
|
|
configure_utf8_output()
|
|
|
|
parser = argparse.ArgumentParser(description="Add conservative defaults for missing skill risk/source metadata.")
|
|
parser.add_argument("--dry-run", action="store_true", help="Preview changes without writing files.")
|
|
args = parser.parse_args()
|
|
|
|
repo_root = find_repo_root(__file__)
|
|
skills_dir = repo_root / "skills"
|
|
|
|
modified = 0
|
|
for root, dirs, files in os.walk(skills_dir):
|
|
dirs[:] = [directory for directory in dirs if not directory.startswith(".")]
|
|
if "SKILL.md" not in files:
|
|
continue
|
|
|
|
skill_path = Path(root) / "SKILL.md"
|
|
if not is_safe_regular_file(skill_path):
|
|
print(f"SKIP {skill_path.relative_to(repo_root)} [symlinked_or_unreadable]")
|
|
continue
|
|
content = skill_path.read_text(encoding="utf-8")
|
|
repaired_content = repair_malformed_injected_metadata(content)
|
|
if repaired_content != content:
|
|
if args.dry_run:
|
|
modified += 1
|
|
print(f"FIX {skill_path.relative_to(repo_root)} [repaired_malformed_frontmatter]")
|
|
continue
|
|
skill_path.write_text(repaired_content, encoding="utf-8")
|
|
content = repaired_content
|
|
modified += 1
|
|
print(f"FIX {skill_path.relative_to(repo_root)} [repaired_malformed_frontmatter]")
|
|
|
|
metadata, _ = parse_frontmatter(content, skill_path.as_posix())
|
|
if not metadata:
|
|
continue
|
|
if "risk" in metadata and "source" in metadata:
|
|
continue
|
|
|
|
if args.dry_run:
|
|
changes: list[str] = []
|
|
frontmatter = strip_frontmatter(content)
|
|
body = frontmatter[1] if frontmatter else ""
|
|
if "risk" not in metadata:
|
|
changes.append(f"added_risk={infer_risk(body)}")
|
|
if "source" not in metadata:
|
|
skill_name = str(metadata.get("name") or skill_path.parent.name)
|
|
changes.append(f"added_source={infer_source(skill_name, body)}")
|
|
modified += 1
|
|
print(f"FIX {skill_path.relative_to(repo_root)} [{', '.join(changes)}]")
|
|
continue
|
|
|
|
changed, changes = update_skill_file(skill_path)
|
|
if changed:
|
|
modified += 1
|
|
print(f"FIX {skill_path.relative_to(repo_root)} [{', '.join(changes)}]")
|
|
|
|
print(f"\nModified: {modified}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|