222 lines
7.6 KiB
Python
222 lines
7.6 KiB
Python
#!/usr/bin/env python3
|
|
import argparse
|
|
import json
|
|
import os
|
|
import select
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
|
|
def path_to_uri(path: Path) -> str:
|
|
return path.resolve().as_uri()
|
|
|
|
|
|
def load_sequence(sequence_path: Path) -> list[str]:
|
|
lines = []
|
|
for raw in sequence_path.read_text().splitlines():
|
|
line = raw.strip()
|
|
if not line or line.startswith("#"):
|
|
continue
|
|
lines.append(line)
|
|
return lines
|
|
|
|
|
|
def load_messages(request_dir: Path, replacements: dict[str, str], files: list[str]) -> list[dict]:
|
|
messages = []
|
|
for name in files:
|
|
text = (request_dir / name).read_text()
|
|
for key, value in replacements.items():
|
|
text = text.replace(key, value)
|
|
try:
|
|
messages.append(json.loads(text))
|
|
except json.JSONDecodeError as exc:
|
|
raise RuntimeError(f"Invalid JSON in {name}: {exc}") from exc
|
|
return messages
|
|
|
|
|
|
def build_payload(message: dict) -> bytes:
|
|
body = json.dumps(message, separators=(",", ":")).encode("utf-8")
|
|
header = f"Content-Length: {len(body)}\r\n\r\n".encode("ascii")
|
|
return header + body
|
|
|
|
|
|
def read_response(stream, buffer: bytes, timeout_seconds: float) -> tuple[dict, bytes]:
|
|
deadline = time.time() + timeout_seconds
|
|
fd = stream.fileno()
|
|
|
|
while True:
|
|
header_end = buffer.find(b"\r\n\r\n")
|
|
if header_end != -1:
|
|
header = buffer[:header_end].decode("ascii", errors="replace")
|
|
length = None
|
|
for line in header.splitlines():
|
|
if line.lower().startswith("content-length:"):
|
|
length = int(line.split(":", 1)[1].strip())
|
|
break
|
|
if length is None:
|
|
raise RuntimeError("Missing Content-Length in response header")
|
|
|
|
body_start = header_end + 4
|
|
if len(buffer) - body_start >= length:
|
|
body = buffer[body_start:body_start + length]
|
|
buffer = buffer[body_start + length:]
|
|
try:
|
|
return json.loads(body.decode("utf-8")), buffer
|
|
except json.JSONDecodeError as exc:
|
|
raise RuntimeError(f"Failed to decode response JSON: {exc}") from exc
|
|
|
|
remaining = deadline - time.time()
|
|
if remaining <= 0:
|
|
raise TimeoutError("Timed out waiting for response from tsl-server")
|
|
|
|
ready, _, _ = select.select([fd], [], [], remaining)
|
|
if not ready:
|
|
continue
|
|
|
|
chunk = os.read(fd, 4096)
|
|
if not chunk:
|
|
raise RuntimeError("tsl-server closed stdout unexpectedly")
|
|
buffer += chunk
|
|
|
|
|
|
def index_by_id(responses: list[dict]) -> dict[str, dict]:
|
|
by_id = {}
|
|
for response in responses:
|
|
if "id" in response:
|
|
by_id[str(response["id"])] = response
|
|
return by_id
|
|
|
|
|
|
def assert_has_key(obj: dict, key: str, context: str) -> None:
|
|
if key not in obj:
|
|
raise RuntimeError(f"Missing '{key}' in {context}")
|
|
|
|
|
|
def assert_error_code(response: dict, expected_code: int, context: str) -> None:
|
|
assert_has_key(response, "error", context)
|
|
error = response["error"]
|
|
if error.get("code") != expected_code:
|
|
raise RuntimeError(f"{context} error code {error.get('code')} != {expected_code}")
|
|
|
|
|
|
def validate_responses(by_id: dict[str, dict]) -> None:
|
|
for request_id in ("1", "2", "3", "4", "5", "6", "7"):
|
|
if request_id not in by_id:
|
|
raise RuntimeError(f"Missing response for request id {request_id}")
|
|
|
|
init = by_id["1"]
|
|
assert_has_key(init, "result", "initialize response")
|
|
capabilities = init["result"].get("capabilities", {})
|
|
if "completionProvider" not in capabilities:
|
|
raise RuntimeError("initialize response missing completionProvider")
|
|
|
|
completion = by_id["2"]
|
|
assert_has_key(completion, "result", "completion response")
|
|
result = completion["result"]
|
|
items = result.get("items") if isinstance(result, dict) else result
|
|
if not isinstance(items, list):
|
|
raise RuntimeError("completion response items missing or not a list")
|
|
if not any(item.get("label") == "function" for item in items if isinstance(item, dict)):
|
|
raise RuntimeError("completion response missing keyword completions")
|
|
|
|
resolved = by_id["3"]
|
|
if "result" in resolved:
|
|
if not isinstance(resolved["result"], dict) or resolved["result"].get("label") != "Widget":
|
|
raise RuntimeError("completion resolve should return the resolved item")
|
|
else:
|
|
assert_error_code(resolved, -32603, "completion resolve response")
|
|
|
|
assert_error_code(by_id["4"], -32601, "definition response")
|
|
assert_error_code(by_id["5"], -32601, "rename response")
|
|
assert_error_code(by_id["6"], -32601, "references response")
|
|
|
|
shutdown = by_id["7"]
|
|
if "error" in shutdown:
|
|
raise RuntimeError("shutdown response should not include error")
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="Run LSP JSON tests against tsl-server.")
|
|
parser.add_argument("--server", default=None, help="Path to tsl-server binary")
|
|
parser.add_argument("--no-validate", action="store_true", help="Skip response validation")
|
|
args = parser.parse_args()
|
|
|
|
repo_root = Path(__file__).resolve().parents[2]
|
|
request_dir = repo_root / "lsp-server" / "test" / "request_json"
|
|
sequence_path = request_dir / "sequence.txt"
|
|
|
|
fixtures_dir = repo_root / "lsp-server" / "test" / "test_provider" / "fixtures"
|
|
main_unit = fixtures_dir / "main_unit.tsf"
|
|
rename_case = fixtures_dir / "rename_case.tsl"
|
|
workspace_dir = fixtures_dir / "workspace"
|
|
|
|
if not request_dir.exists():
|
|
raise RuntimeError(f"request_json not found: {request_dir}")
|
|
|
|
server_path = Path(args.server) if args.server else (repo_root / "vscode" / "bin" / "tsl-server")
|
|
if not server_path.exists():
|
|
raise RuntimeError(f"tsl-server not found: {server_path}")
|
|
|
|
replacements = {
|
|
"{{WORKSPACE_PATH}}": str(workspace_dir.resolve()),
|
|
"{{WORKSPACE_URI}}": path_to_uri(workspace_dir),
|
|
"{{MAIN_UNIT_URI}}": path_to_uri(main_unit),
|
|
"{{RENAME_CASE_URI}}": path_to_uri(rename_case),
|
|
"{{MAIN_UNIT_TEXT}}": json.dumps(main_unit.read_text()),
|
|
}
|
|
|
|
files = load_sequence(sequence_path)
|
|
messages = load_messages(request_dir, replacements, files)
|
|
|
|
proc = subprocess.Popen(
|
|
[str(server_path), "--log=off", "--log-stderr"],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
)
|
|
|
|
responses = []
|
|
by_id = {}
|
|
buffer = b""
|
|
|
|
try:
|
|
for message in messages:
|
|
if proc.stdin is None:
|
|
raise RuntimeError("tsl-server stdin is not available")
|
|
proc.stdin.write(build_payload(message))
|
|
proc.stdin.flush()
|
|
|
|
if "id" not in message:
|
|
continue
|
|
|
|
expected_id = str(message["id"])
|
|
while expected_id not in by_id:
|
|
response, buffer = read_response(proc.stdout, buffer, timeout_seconds=10)
|
|
responses.append(response)
|
|
if "id" in response:
|
|
by_id[str(response["id"])] = response
|
|
|
|
if proc.stdin:
|
|
proc.stdin.close()
|
|
|
|
proc.wait(timeout=5)
|
|
except Exception:
|
|
proc.kill()
|
|
proc.wait(timeout=5)
|
|
raise
|
|
|
|
if proc.returncode not in (0, None):
|
|
sys.stderr.write(proc.stderr.read().decode("utf-8", errors="replace"))
|
|
raise RuntimeError(f"tsl-server exited with code {proc.returncode}")
|
|
|
|
if not args.no_validate:
|
|
validate_responses(by_id)
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|