#!/usr/bin/env python3
"""Run LSP JSON tests against tsl-server.

The script loads JSON request templates listed in ``sequence.txt``, fills in
fixture placeholders, sends each message to a ``tsl-server`` subprocess over
stdio using ``Content-Length`` framing, collects the responses by request id
and (unless ``--no-validate`` is given) checks them.
"""
import argparse
import json
import os
import select
import subprocess
import sys
import time
from pathlib import Path


def path_to_uri(path: Path) -> str:
    """Return the ``file://`` URI of *path* after resolving it to an absolute path."""
    return path.resolve().as_uri()


def load_sequence(sequence_path: Path) -> list[str]:
    """Return the entries listed in *sequence_path*.

    Each line is stripped; blank lines and lines starting with ``#`` are
    skipped. The file is read as UTF-8 so the result does not depend on the
    platform's locale encoding.
    """
    stripped = (raw.strip() for raw in sequence_path.read_text(encoding="utf-8").splitlines())
    return [line for line in stripped if line and not line.startswith("#")]


def load_messages(request_dir: Path, replacements: dict[str, str], files: list[str]) -> list[dict]:
    """Load and parse the JSON message templates named in *files*.

    Every key of *replacements* found in a template's text is replaced by its
    value before the text is parsed as JSON. Values are inserted verbatim, so
    they must already be valid in the JSON context where the key appears.

    Raises:
        RuntimeError: if a template is not valid JSON after substitution.
    """
    messages = []
    for name in files:
        # JSON templates are UTF-8; do not rely on the locale default encoding.
        text = (request_dir / name).read_text(encoding="utf-8")
        for key, value in replacements.items():
            text = text.replace(key, value)
        try:
            messages.append(json.loads(text))
        except json.JSONDecodeError as exc:
            raise RuntimeError(f"Invalid JSON in {name}: {exc}") from exc
    return messages


def build_payload(message: dict) -> bytes:
    """Serialize *message* as compact JSON prefixed with a ``Content-Length`` header."""
    body = json.dumps(message, separators=(",", ":")).encode("utf-8")
    header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
    return header + body


def _parse_content_length(header: bytes) -> int:
    """Extract the ``Content-Length`` value from a raw header block.

    Raises:
        RuntimeError: if the header is missing, not an integer, or negative.
    """
    for line in header.decode("ascii", errors="replace").splitlines():
        if not line.lower().startswith("content-length:"):
            continue
        raw_value = line.split(":", 1)[1].strip()
        try:
            length = int(raw_value)
        except ValueError as exc:
            raise RuntimeError(
                f"Invalid Content-Length in response header: {raw_value!r}"
            ) from exc
        if length < 0:
            raise RuntimeError(f"Invalid Content-Length in response header: {raw_value!r}")
        return length
    raise RuntimeError("Missing Content-Length in response header")


def read_response(stream, buffer: bytes, timeout_seconds: float) -> tuple[dict, bytes]:
    """Read one ``Content-Length``-framed JSON message from *stream*.

    Args:
        stream: object exposing ``fileno()``; bytes are read straight from the
            file descriptor with ``os.read``, bypassing any Python-level
            buffering on the object.
        buffer: bytes already read but not yet consumed by a previous call.
        timeout_seconds: maximum time to wait for a complete message.

    Returns:
        A ``(message, remaining_buffer)`` tuple; pass ``remaining_buffer`` to
        the next call.

    Raises:
        TimeoutError: if no complete message arrives before the deadline.
        RuntimeError: if the header is malformed, the body is not valid
            UTF-8 JSON, or the stream reaches end-of-file.
    """
    # Monotonic clock: the deadline must not move when the wall clock is adjusted.
    deadline = time.monotonic() + timeout_seconds
    fd = stream.fileno()
    while True:
        header_end = buffer.find(b"\r\n\r\n")
        if header_end != -1:
            length = _parse_content_length(buffer[:header_end])
            body_start = header_end + 4  # skip the "\r\n\r\n" separator
            body_end = body_start + length
            if len(buffer) >= body_end:
                body = buffer[body_start:body_end]
                leftover = buffer[body_end:]
                try:
                    return json.loads(body.decode("utf-8")), leftover
                except (UnicodeDecodeError, json.JSONDecodeError) as exc:
                    raise RuntimeError(f"Failed to decode response JSON: {exc}") from exc

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError("Timed out waiting for response from tsl-server")
        ready, _, _ = select.select([fd], [], [], remaining)
        if not ready:
            # select timed out; loop so the deadline check raises TimeoutError.
            continue
        chunk = os.read(fd, 4096)
        if not chunk:
            raise RuntimeError("tsl-server closed stdout unexpectedly")
        buffer += chunk


def index_by_id(responses: list[dict]) -> dict[str, dict]:
    """Map ``str(id)`` to each response that has an ``id``; later duplicates win."""
    return {str(response["id"]): response for response in responses if "id" in response}


def assert_has_key(obj: dict, key: str, context: str) -> None:
    """Raise ``RuntimeError`` naming *context* if *key* is not in *obj*."""
    if key not in obj:
        raise RuntimeError(f"Missing '{key}' in {context}")


def assert_error_code(response: dict, expected_code: int, context: str) -> None:
    """Raise ``RuntimeError`` unless *response* carries an error with *expected_code*."""
    assert_has_key(response, "error", context)
    error = response["error"]
    # A non-object error member is reported as a validation failure, not a crash.
    code = error.get("code") if isinstance(error, dict) else None
    if code != expected_code:
        raise RuntimeError(f"{context} error code {code} != {expected_code}")


def validate_responses(by_id: dict[str, dict]) -> None:
    """Check the responses to requests ``"1"`` through ``"7"``.

    Args:
        by_id: responses keyed by the string form of their request id.

    Raises:
        RuntimeError: if a response is missing or does not have the expected shape.
    """
    for request_id in ("1", "2", "3", "4", "5", "6", "7"):
        if request_id not in by_id:
            raise RuntimeError(f"Missing response for request id {request_id}")

    init = by_id["1"]
    assert_has_key(init, "result", "initialize response")
    init_result = init["result"]
    # Treat a null or non-object result the same as missing capabilities.
    capabilities = init_result.get("capabilities", {}) if isinstance(init_result, dict) else {}
    if not isinstance(capabilities, dict) or "completionProvider" not in capabilities:
        raise RuntimeError("initialize response missing completionProvider")

    completion = by_id["2"]
    assert_has_key(completion, "result", "completion response")
    result = completion["result"]
    # The result is either an object holding "items" or the item list itself.
    items = result.get("items") if isinstance(result, dict) else result
    if not isinstance(items, list):
        raise RuntimeError("completion response items missing or not a list")
    if not any(item.get("label") == "function" for item in items if isinstance(item, dict)):
        raise RuntimeError("completion response missing keyword completions")

    resolved = by_id["3"]
    if "result" in resolved:
        if not isinstance(resolved["result"], dict) or resolved["result"].get("label") != "Widget":
            raise RuntimeError("completion resolve should return the resolved item")
    else:
        # Without a result, the only accepted outcome is error code -32603.
        assert_error_code(resolved, -32603, "completion resolve response")

    definition = by_id["4"]
    assert_has_key(definition, "result", "definition response")
    definition_result = definition["result"]
    if definition_result is not None and not isinstance(definition_result, (dict, list)):
        raise RuntimeError("definition response should be a location, list of locations, or null")

    rename = by_id["5"]
    assert_has_key(rename, "result", "rename response")
    rename_result = rename["result"]
    if rename_result is not None and not isinstance(rename_result, dict):
        raise RuntimeError("rename response should be a workspace edit or null")

    references = by_id["6"]
    assert_has_key(references, "result", "references response")
    references_result = references["result"]
    if not isinstance(references_result, list):
        raise RuntimeError("references response should be a list of locations")

    shutdown = by_id["7"]
    if "error" in shutdown:
        raise RuntimeError("shutdown response should not include error")


def _forward_stderr(proc: subprocess.Popen) -> None:
    """Copy whatever the finished server wrote to its stderr pipe to our stderr."""
    if proc.stderr is None or proc.stderr.closed:
        return
    sys.stderr.write(proc.stderr.read().decode("utf-8", errors="replace"))


def main() -> int:
    """Run the request sequence against tsl-server and validate the responses.

    Returns:
        ``0`` on success; failures are reported by raising.

    Raises:
        RuntimeError: if fixtures or the server binary are missing, the server
            exits with a non-zero code, or validation fails.
    """
    parser = argparse.ArgumentParser(description="Run LSP JSON tests against tsl-server.")
    parser.add_argument("--server", default=None, help="Path to tsl-server binary")
    parser.add_argument("--no-validate", action="store_true", help="Skip response validation")
    args = parser.parse_args()

    repo_root = Path(__file__).resolve().parents[2]
    request_dir = repo_root / "lsp-server" / "test" / "request_json"
    sequence_path = request_dir / "sequence.txt"
    fixtures_dir = repo_root / "lsp-server" / "test" / "test_provider" / "fixtures"
    main_unit = fixtures_dir / "main_unit.tsf"
    rename_case = fixtures_dir / "rename_case.tsl"
    workspace_dir = fixtures_dir / "workspace"

    if not request_dir.exists():
        raise RuntimeError(f"request_json not found: {request_dir}")

    server_path = Path(args.server) if args.server else (repo_root / "vscode" / "bin" / "tsl-server")
    if not server_path.exists():
        raise RuntimeError(f"tsl-server not found: {server_path}")

    replacements = {
        "{{WORKSPACE_PATH}}": str(workspace_dir.resolve()),
        "{{WORKSPACE_URI}}": path_to_uri(workspace_dir),
        "{{MAIN_UNIT_URI}}": path_to_uri(main_unit),
        "{{RENAME_CASE_URI}}": path_to_uri(rename_case),
        # json.dumps yields a quoted, escaped JSON string literal for the file text.
        "{{MAIN_UNIT_TEXT}}": json.dumps(main_unit.read_text(encoding="utf-8")),
    }

    files = load_sequence(sequence_path)
    messages = load_messages(request_dir, replacements, files)

    proc = subprocess.Popen(
        [str(server_path), "--log=off", "--log-stderr"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    by_id = {}
    buffer = b""
    try:
        for message in messages:
            if proc.stdin is None:
                raise RuntimeError("tsl-server stdin is not available")
            proc.stdin.write(build_payload(message))
            proc.stdin.flush()
            if "id" not in message:
                # No id means no reply is awaited for this message.
                continue
            expected_id = str(message["id"])
            # Keep reading until the reply to this request arrives; anything
            # else received in between is recorded if it carries an id.
            # NOTE(review): a server-initiated request reusing one of our ids
            # would be stored as if it were the response - confirm the server
            # never sends such requests.
            while expected_id not in by_id:
                response, buffer = read_response(proc.stdout, buffer, timeout_seconds=10)
                if "id" in response:
                    by_id[str(response["id"])] = response
        if proc.stdin:
            proc.stdin.close()
        proc.wait(timeout=5)
    except Exception:
        proc.kill()
        proc.wait(timeout=5)
        # Surface the server's diagnostics so timeouts and crashes are debuggable.
        _forward_stderr(proc)
        raise

    if proc.returncode not in (0, None):
        _forward_stderr(proc)
        raise RuntimeError(f"tsl-server exited with code {proc.returncode}")

    if not args.no_validate:
        validate_responses(by_id)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())