tsl-devkit/lsp-server/test/run_lsp_json_tests.py

222 lines
7.6 KiB
Python

#!/usr/bin/env python3
import argparse
import json
import os
import select
import subprocess
import sys
import time
from pathlib import Path
def path_to_uri(path: Path) -> str:
return path.resolve().as_uri()
def load_sequence(sequence_path: Path) -> list[str]:
lines = []
for raw in sequence_path.read_text().splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
lines.append(line)
return lines
def load_messages(request_dir: Path, replacements: dict[str, str], files: list[str]) -> list[dict]:
messages = []
for name in files:
text = (request_dir / name).read_text()
for key, value in replacements.items():
text = text.replace(key, value)
try:
messages.append(json.loads(text))
except json.JSONDecodeError as exc:
raise RuntimeError(f"Invalid JSON in {name}: {exc}") from exc
return messages
def build_payload(message: dict) -> bytes:
body = json.dumps(message, separators=(",", ":")).encode("utf-8")
header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
return header + body
def read_response(stream, buffer: bytes, timeout_seconds: float) -> tuple[dict, bytes]:
deadline = time.time() + timeout_seconds
fd = stream.fileno()
while True:
header_end = buffer.find(b"\r\n\r\n")
if header_end != -1:
header = buffer[:header_end].decode("ascii", errors="replace")
length = None
for line in header.splitlines():
if line.lower().startswith("content-length:"):
length = int(line.split(":", 1)[1].strip())
break
if length is None:
raise RuntimeError("Missing Content-Length in response header")
body_start = header_end + 4
if len(buffer) - body_start >= length:
body = buffer[body_start:body_start + length]
buffer = buffer[body_start + length:]
try:
return json.loads(body.decode("utf-8")), buffer
except json.JSONDecodeError as exc:
raise RuntimeError(f"Failed to decode response JSON: {exc}") from exc
remaining = deadline - time.time()
if remaining <= 0:
raise TimeoutError("Timed out waiting for response from tsl-server")
ready, _, _ = select.select([fd], [], [], remaining)
if not ready:
continue
chunk = os.read(fd, 4096)
if not chunk:
raise RuntimeError("tsl-server closed stdout unexpectedly")
buffer += chunk
def index_by_id(responses: list[dict]) -> dict[str, dict]:
by_id = {}
for response in responses:
if "id" in response:
by_id[str(response["id"])] = response
return by_id
def assert_has_key(obj: dict, key: str, context: str) -> None:
if key not in obj:
raise RuntimeError(f"Missing '{key}' in {context}")
def assert_error_code(response: dict, expected_code: int, context: str) -> None:
assert_has_key(response, "error", context)
error = response["error"]
if error.get("code") != expected_code:
raise RuntimeError(f"{context} error code {error.get('code')} != {expected_code}")
def validate_responses(by_id: dict[str, dict]) -> None:
for request_id in ("1", "2", "3", "4", "5", "6", "7"):
if request_id not in by_id:
raise RuntimeError(f"Missing response for request id {request_id}")
init = by_id["1"]
assert_has_key(init, "result", "initialize response")
capabilities = init["result"].get("capabilities", {})
if "completionProvider" not in capabilities:
raise RuntimeError("initialize response missing completionProvider")
completion = by_id["2"]
assert_has_key(completion, "result", "completion response")
result = completion["result"]
items = result.get("items") if isinstance(result, dict) else result
if not isinstance(items, list):
raise RuntimeError("completion response items missing or not a list")
if not any(item.get("label") == "function" for item in items if isinstance(item, dict)):
raise RuntimeError("completion response missing keyword completions")
resolved = by_id["3"]
if "result" in resolved:
if not isinstance(resolved["result"], dict) or resolved["result"].get("label") != "Widget":
raise RuntimeError("completion resolve should return the resolved item")
else:
assert_error_code(resolved, -32603, "completion resolve response")
assert_error_code(by_id["4"], -32601, "definition response")
assert_error_code(by_id["5"], -32601, "rename response")
assert_error_code(by_id["6"], -32601, "references response")
shutdown = by_id["7"]
if "error" in shutdown:
raise RuntimeError("shutdown response should not include error")
def main() -> int:
parser = argparse.ArgumentParser(description="Run LSP JSON tests against tsl-server.")
parser.add_argument("--server", default=None, help="Path to tsl-server binary")
parser.add_argument("--no-validate", action="store_true", help="Skip response validation")
args = parser.parse_args()
repo_root = Path(__file__).resolve().parents[2]
request_dir = repo_root / "lsp-server" / "test" / "request_json"
sequence_path = request_dir / "sequence.txt"
fixtures_dir = repo_root / "lsp-server" / "test" / "test_provider" / "fixtures"
main_unit = fixtures_dir / "main_unit.tsf"
rename_case = fixtures_dir / "rename_case.tsl"
workspace_dir = fixtures_dir / "workspace"
if not request_dir.exists():
raise RuntimeError(f"request_json not found: {request_dir}")
server_path = Path(args.server) if args.server else (repo_root / "vscode" / "bin" / "tsl-server")
if not server_path.exists():
raise RuntimeError(f"tsl-server not found: {server_path}")
replacements = {
"{{WORKSPACE_PATH}}": str(workspace_dir.resolve()),
"{{WORKSPACE_URI}}": path_to_uri(workspace_dir),
"{{MAIN_UNIT_URI}}": path_to_uri(main_unit),
"{{RENAME_CASE_URI}}": path_to_uri(rename_case),
"{{MAIN_UNIT_TEXT}}": json.dumps(main_unit.read_text()),
}
files = load_sequence(sequence_path)
messages = load_messages(request_dir, replacements, files)
proc = subprocess.Popen(
[str(server_path), "--log=off", "--log-stderr"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
responses = []
by_id = {}
buffer = b""
try:
for message in messages:
if proc.stdin is None:
raise RuntimeError("tsl-server stdin is not available")
proc.stdin.write(build_payload(message))
proc.stdin.flush()
if "id" not in message:
continue
expected_id = str(message["id"])
while expected_id not in by_id:
response, buffer = read_response(proc.stdout, buffer, timeout_seconds=10)
responses.append(response)
if "id" in response:
by_id[str(response["id"])] = response
if proc.stdin:
proc.stdin.close()
proc.wait(timeout=5)
except Exception:
proc.kill()
proc.wait(timeout=5)
raise
if proc.returncode not in (0, None):
sys.stderr.write(proc.stderr.read().decode("utf-8", errors="replace"))
raise RuntimeError(f"tsl-server exited with code {proc.returncode}")
if not args.no_validate:
validate_responses(by_id)
return 0
if __name__ == "__main__":
raise SystemExit(main())