#!/usr/bin/env python3
"""QA checklist driver for the ``adequate-rust-mcp`` server binary.

The script builds the binary with cargo, spawns it as a child process, sends a
fixed sequence of JSON-RPC requests over its stdin/stdout (one JSON document
per line), and prints a short preview of every result so a human can inspect
the output. The process exits with status 1 when any step fails.
"""
from __future__ import annotations

import json
import os
import select
import shutil
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any

# Workspace layout, resolved relative to this script's directory.
ROOT = Path(__file__).resolve().parent
BINARY = ROOT / "target" / "debug" / "adequate-rust-mcp"

# Per-request timeouts (seconds) and transient-failure retry policy.
DEFAULT_TIMEOUT_S = 60.0
CLIPPY_TIMEOUT_S = 180.0
TRANSIENT_RETRIES = 4
TRANSIENT_BACKOFF_S = 0.25

# Maximum number of characters printed for a single result preview.
PREVIEW_LIMIT = 220

# Source files the checklist steps point the server at.
MAIN_REL = Path("crates/adequate-rust-mcp/src/main.rs")
WORKER_MOD_REL = Path("crates/adequate-rust-mcp/src/worker/mod.rs")
MAIN_PATH = ROOT / MAIN_REL
WORKER_MOD_PATH = ROOT / WORKER_MOD_REL
MAIN_URI = MAIN_PATH.resolve().as_uri()

JsonValue = Any


@dataclass(frozen=True)
class QaStep:
    """One checklist entry: a JSON-RPC request plus how to run it."""

    # Human-readable name printed in the progress output.
    label: str
    # JSON-RPC method name sent to the server.
    method: str
    # JSON-RPC params object sent with the request.
    params: dict[str, JsonValue]
    # Seconds to wait for the matching response.
    timeout_s: float = DEFAULT_TIMEOUT_S
    # Whether `run_step` may retry the request on a transient failure.
    transient_retry: bool = False
    # Whether to send `notifications/initialized` after a successful response.
    send_initialized_notification: bool = False


class JsonRpcFailure(RuntimeError):
    """Raised when a JSON-RPC response carries an ``error`` member."""

    def __init__(self, method: str, error: JsonValue) -> None:
        self.method = method
        self.error = error
        super().__init__(f"json-rpc error for `{method}`: {compact_json(error)}")


class ToolPayloadFailure(RuntimeError):
    """Raised when a ``tools/call`` result is flagged with ``isError``."""

    def __init__(self, tool_name: str, payload: JsonValue) -> None:
        self.tool_name = tool_name
        self.payload = payload
        super().__init__(f"tool `{tool_name}` returned error payload: {compact_json(payload)}")


class McpSession:
    """Line-delimited JSON-RPC client bound to a spawned server process."""

    def __init__(self, proc: subprocess.Popen[str]) -> None:
        self.proc = proc
        self.stdin = proc.stdin
        self.stdout = proc.stdout
        self.next_id = 1
        # Responses that arrived while waiting for a different request id.
        self.buffered_responses: dict[int, JsonValue] = {}
        # Raw bytes read from the server's stdout that do not yet form a
        # complete line. Reads bypass the text wrapper's internal buffer so
        # that `select()` readiness always matches the data left to consume.
        self._rx_pending = b""

    @classmethod
    def spawn(cls) -> McpSession:
        """Start the server binary with piped stdio and wrap it in a session.

        The child runs with ``cwd=ROOT`` and ``ADEQUATE_MCP_WORKSPACE_ROOT``
        set to ``ROOT``; its stderr is discarded.

        Raises:
            RuntimeError: if the child's stdin or stdout pipe is unavailable.
        """
        env = os.environ.copy()
        env["ADEQUATE_MCP_WORKSPACE_ROOT"] = str(ROOT)
        proc = subprocess.Popen(
            [str(BINARY)],
            cwd=ROOT,
            env=env,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            bufsize=1,
        )
        if proc.stdin is None or proc.stdout is None:
            raise RuntimeError("failed to capture host stdio")
        return cls(proc)

    def close(self) -> None:
        """Kill the server if it is still running and release the pipes."""
        if self.proc.poll() is None:
            self.proc.kill()
            self.proc.wait(timeout=5)
        for stream in (self.stdin, self.stdout):
            if stream is None:
                continue
            try:
                stream.close()
            except OSError:
                # Best-effort cleanup: flushing into a dead child's pipe can
                # fail, and there is nothing useful to do about it here.
                pass

    def notify(self, method: str, params: dict[str, JsonValue]) -> None:
        """Send a JSON-RPC notification (no ``id``, no response expected)."""
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
        }
        self._write_frame(payload)

    def request(
        self,
        method: str,
        params: dict[str, JsonValue],
        *,
        timeout_s: float,
    ) -> JsonValue:
        """Send a JSON-RPC request and return the ``result`` of its response.

        Args:
            method: JSON-RPC method name.
            params: JSON-RPC params object.
            timeout_s: seconds to wait for the matching response.

        Raises:
            JsonRpcFailure: if the response carries an ``error`` member.
            RuntimeError: if the response has neither ``error`` nor ``result``,
                or the server's stdio is closed.
            TimeoutError: if no matching response arrives in time.
        """
        request_id = self.next_id
        self.next_id += 1
        payload = {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": method,
            "params": params,
        }
        self._write_frame(payload)
        response = self._read_response(request_id, timeout_s=timeout_s)
        if "error" in response:
            raise JsonRpcFailure(method, response["error"])
        if "result" not in response:
            raise RuntimeError(f"json-rpc response for `{method}` missing result")
        return response["result"]

    def _write_frame(self, payload: dict[str, JsonValue]) -> None:
        """Serialize ``payload`` as one compact JSON line and flush it."""
        if self.stdin is None:
            raise RuntimeError("server stdin is closed")
        self.stdin.write(json.dumps(payload, separators=(",", ":")) + "\n")
        self.stdin.flush()

    def _read_line(self, deadline: float, request_id: int) -> str:
        """Return the next line from the server's stdout, without the newline.

        Bytes are read straight from the file descriptor into
        ``self._rx_pending``. Going through the buffered text wrapper would
        let it swallow several lines at once, after which ``select()`` would
        report the pipe as empty even though lines are still waiting.

        Raises:
            TimeoutError: if ``deadline`` passes before a full line arrives.
            RuntimeError: if stdout is unavailable or reaches end-of-file.
        """
        if self.stdout is None:
            raise RuntimeError("server stdout is closed")
        fd = self.stdout.fileno()
        while b"\n" not in self._rx_pending:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(f"timed out waiting for response id {request_id}")
            ready, _, _ = select.select([fd], [], [], remaining)
            if not ready:
                raise TimeoutError(f"timed out waiting for response id {request_id}")
            chunk = os.read(fd, 65536)
            if not chunk:
                if self._rx_pending:
                    # End-of-file with an unterminated final line: hand it out
                    # once; the next call reports the closed stream.
                    tail, self._rx_pending = self._rx_pending, b""
                    return tail.decode("utf-8", errors="replace")
                raise RuntimeError("server stdout closed while awaiting response")
            self._rx_pending += chunk
        raw, _, self._rx_pending = self._rx_pending.partition(b"\n")
        return raw.decode("utf-8", errors="replace")

    def _read_response(self, request_id: int, *, timeout_s: float) -> JsonValue:
        """Wait for the response whose ``id`` equals ``request_id``.

        Lines that are not valid JSON, are not JSON objects, or have no
        integer ``id`` are skipped. Responses for other ids are stored in
        ``self.buffered_responses`` for a later call.

        Raises:
            TimeoutError: if ``timeout_s`` elapses first.
            RuntimeError: if the server's stdout is closed.
        """
        buffered = self.buffered_responses.pop(request_id, None)
        if buffered is not None:
            return buffered

        deadline = time.monotonic() + timeout_s
        while True:
            if deadline - time.monotonic() <= 0:
                raise TimeoutError(f"timed out waiting for response id {request_id}")
            line = self._read_line(deadline, request_id)
            try:
                message = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(message, dict):
                # A bare list/number/string is not a response object.
                continue
            response_id = message.get("id")
            if not isinstance(response_id, int):
                continue
            if response_id == request_id:
                return message
            self.buffered_responses[response_id] = message


def compact_json(value: JsonValue) -> str:
    """Render ``value`` as JSON with sorted keys and no extra whitespace."""
    return json.dumps(value, separators=(",", ":"), sort_keys=True)


def truncate(text: str, limit: int = PREVIEW_LIMIT) -> str:
    """Collapse whitespace runs in ``text`` and clip it to ``limit`` characters.

    Text longer than ``limit`` ends in ``"..."``. When ``limit`` is too small
    to hold the ellipsis, the text is hard-clipped instead so the result never
    exceeds ``limit``.
    """
    normalized = " ".join(text.split())
    if len(normalized) <= limit:
        return normalized
    if limit < 3:
        return normalized[: max(limit, 0)]
    return normalized[: limit - 3] + "..."


def range_brief(range_payload: JsonValue) -> str | None:
    """Format a ``{"start": {...}, "end": {...}}`` range as ``l:c-l:c``.

    Returns ``None`` unless both endpoints are dicts with integer ``line`` and
    ``column`` members.
    """
    if not isinstance(range_payload, dict):
        return None
    start = range_payload.get("start")
    end = range_payload.get("end")
    if not isinstance(start, dict) or not isinstance(end, dict):
        return None
    start_line = start.get("line")
    start_column = start.get("column")
    end_line = end.get("line")
    end_column = end.get("column")
    if not all(isinstance(value, int) for value in (start_line, start_column, end_line, end_column)):
        return None
    return f"{start_line}:{start_column}-{end_line}:{end_column}"


def location_brief(location: JsonValue) -> str | None:
    """Format a location dict as ``<file name>:<line>:<column>``.

    Returns ``None`` unless ``file_path`` is a string and ``line``/``column``
    are integers.
    """
    if not isinstance(location, dict):
        return None
    file_path = location.get("file_path")
    line = location.get("line")
    column = location.get("column")
    if not isinstance(file_path, str) or not isinstance(line, int) or not isinstance(column, int):
        return None
    return f"{Path(file_path).name}:{line}:{column}"


def hover_preview(payload: dict[str, JsonValue]) -> str:
    """Preview a hover payload: its range (when parseable) plus rendered text.

    Falls back to the compact JSON of the payload when ``rendered`` is not a
    non-blank string.
    """
    rendered = payload.get("rendered")
    range_text = range_brief(payload.get("range"))
    if isinstance(rendered, str) and rendered.strip():
        prefix = f"{range_text} " if range_text else ""
        return truncate(prefix + rendered)
    return truncate(compact_json(payload))


def locations_preview(payload: dict[str, JsonValue]) -> str:
    """Preview a ``locations`` list: total count plus up to three samples."""
    locations = payload.get("locations")
    if not isinstance(locations, list):
        return truncate(compact_json(payload))
    samples = [location_brief(location) for location in locations[:3]]
    visible = [sample for sample in samples if sample is not None]
    joined = ", ".join(visible)
    suffix = f": {joined}" if joined else ""
    return truncate(f"{len(locations)} location(s){suffix}")


def advanced_preview(payload: dict[str, JsonValue]) -> str:
    """Preview an ``advanced_lsp_request`` payload based on its result shape.

    List results are summarized by length and up to three item names. Dict
    results are matched, in order, against the key sets checked below
    (``data``, ``items``+``kind``, ``isIncomplete``+``items``, ``signatures``,
    ``start``+``end``). Anything else falls back to the compact JSON of the
    whole payload.
    """
    result = payload.get("result")
    if isinstance(result, list):
        names = [
            item.get("name")
            for item in result[:3]
            if isinstance(item, dict) and isinstance(item.get("name"), str)
        ]
        if names:
            return truncate(f"{len(result)} item(s): {', '.join(names)}")
        return truncate(f"{len(result)} item(s)")
    if isinstance(result, dict):
        if "data" in result and isinstance(result.get("data"), list):
            return truncate(f"semantic tokens: {len(result['data'])} integers")
        if "items" in result and "kind" in result:
            items = result.get("items")
            item_count = len(items) if isinstance(items, list) else 0
            kind = result.get("kind")
            return truncate(f"document diagnostic kind={kind} items={item_count}")
        if "isIncomplete" in result and isinstance(result.get("items"), list):
            return truncate(
                f"completion incomplete={result['isIncomplete']} items={len(result['items'])}"
            )
        if "signatures" in result and isinstance(result.get("signatures"), list):
            return truncate(
                f"signature help signatures={len(result['signatures'])} active={result.get('activeSignature')}"
            )
        if {"start", "end"} <= result.keys():
            brief = range_brief(result)
            if brief is not None:
                return brief
    return truncate(compact_json(payload))


def extract_tool_payload(result: JsonValue, *, tool_name: str = "") -> JsonValue:
    """Pull the useful payload out of a ``tools/call`` result.

    Preference order: ``structuredContent`` when it is not ``None``, then the
    ``text`` of the first ``content`` entry, then the result itself.

    Args:
        result: the JSON-RPC ``result`` of a ``tools/call`` request.
        tool_name: name recorded on the raised failure; defaults to ``""``.

    Raises:
        ToolPayloadFailure: if ``result`` is a dict with ``isError`` set to
            ``True``.
    """
    if not isinstance(result, dict):
        return result
    if result.get("isError") is True:
        raise ToolPayloadFailure(tool_name, result)
    structured = result.get("structuredContent")
    if structured is not None:
        return structured
    content = result.get("content")
    if isinstance(content, list) and content:
        first = content[0]
        if isinstance(first, dict):
            text = first.get("text")
            if isinstance(text, str):
                return text
    return result


def preview_result(step: QaStep, result: JsonValue) -> str:
    """Build the one-line preview printed for a successful step.

    The format depends on ``step.method`` and, for ``tools/call``, on the tool
    name in ``step.params``. Unrecognized shapes fall back to truncated
    compact JSON.
    """
    if step.method == "initialize":
        if isinstance(result, dict):
            preview = {
                "protocolVersion": result.get("protocolVersion"),
                "serverInfo": result.get("serverInfo"),
            }
            return truncate(compact_json(preview))
        return truncate(compact_json(result))

    if step.method == "tools/list":
        tools = result.get("tools") if isinstance(result, dict) else None
        if isinstance(tools, list):
            names = [
                tool.get("name")
                for tool in tools
                if isinstance(tool, dict) and isinstance(tool.get("name"), str)
            ]
            joined = ", ".join(names)
            return truncate(f"{len(names)} tools: {joined}")
        return truncate(compact_json(result))

    if step.method == "tools/call":
        tool_name = step.params.get("name", "")
        payload = extract_tool_payload(result, tool_name=str(tool_name))
        if isinstance(payload, str):
            return truncate(payload)
        if isinstance(payload, dict) and tool_name == "hover":
            return hover_preview(payload)
        if isinstance(payload, dict) and tool_name in {"definition", "references"}:
            return locations_preview(payload)
        if isinstance(payload, dict) and tool_name == "advanced_lsp_request":
            return advanced_preview(payload)
        if tool_name == "health_snapshot" and isinstance(payload, dict):
            preview = {
                "state": payload.get("state"),
                "generation": payload.get("generation"),
                "restart_count": payload.get("restart_count"),
                "consecutive_failures": payload.get("consecutive_failures"),
            }
            return truncate(compact_json(preview))
        return truncate(compact_json(payload))

    return truncate(compact_json(result))


def is_transient_failure(error: BaseException) -> bool:
    """Report whether ``error`` looks like a retryable, transient failure.

    A `JsonRpcFailure` counts as transient when its error object has
    ``data.retryable`` set to ``True``, ``data.kind`` equal to
    ``"transient_retryable"``, code ``-32801`` (``ContentModified`` in the LSP
    specification), or a message mentioning "content modified" or
    "document changed". Any other error is judged by the same markers in its
    string form.
    """
    if isinstance(error, JsonRpcFailure):
        error_object = error.error if isinstance(error.error, dict) else {}
        data = error_object.get("data")
        if isinstance(data, dict):
            if data.get("retryable") is True:
                return True
            if data.get("kind") == "transient_retryable":
                return True
        code = error_object.get("code")
        if code == -32801:
            return True
        message = error_object.get("message")
        if isinstance(message, str):
            lowered = message.lower()
            if "content modified" in lowered or "document changed" in lowered:
                return True

    text = str(error).lower()
    return (
        "\"kind\":\"transient_retryable\"" in text
        or "content modified" in text
        or "document changed" in text
    )


def tool_step(
    label: str,
    tool_name: str,
    arguments: dict[str, JsonValue],
    *,
    timeout_s: float = DEFAULT_TIMEOUT_S,
) -> QaStep:
    """Build a ``tools/call`` step for ``tool_name`` with transient retry on."""
    return QaStep(
        label=label,
        method="tools/call",
        params={"name": tool_name, "arguments": arguments},
        timeout_s=timeout_s,
        transient_retry=True,
    )


def build_steps() -> list[QaStep]:
    """Return the ordered checklist of requests run against the server."""
    multi_file_args = {
        "file_paths": [str(MAIN_PATH), str(WORKER_MOD_PATH)],
        "render": "json",
    }
    # Params deliberately pre-serialized to a JSON string for the
    # "definition via stringified params" step.
    definition_string_params = json.dumps(
        {
            "textDocument": {"uri": MAIN_URI},
            "position": {"line": 29, "character": 34},
        },
        separators=(",", ":"),
    )
    return [
        QaStep(
            label="initialize",
            method="initialize",
            params={
                "protocolVersion": "2025-11-25",
                "capabilities": {},
                "clientInfo": {
                    "name": "adequate-qa-checklist",
                    "version": "1.0.0",
                },
            },
            send_initialized_notification=True,
        ),
        QaStep(label="tools/list", method="tools/list", params={}),
        tool_step(
            "advanced documentSymbol warm-up",
            "advanced_lsp_request",
            {
                "method": "textDocument/documentSymbol",
                "params": {"textDocument": {"uri": MAIN_URI}},
            },
        ),
        tool_step(
            "advanced prepareRename warm-up",
            "advanced_lsp_request",
            {
                "method": "textDocument/prepareRename",
                "params": {
                    "textDocument": {"uri": MAIN_URI},
                    "position": {"line": 28, "character": 10},
                },
            },
        ),
        tool_step(
            "advanced completion warm-up",
            "advanced_lsp_request",
            {
                "method": "textDocument/completion",
                "params": {
                    "textDocument": {"uri": MAIN_URI},
                    "position": {"line": 26, "character": 53},
                },
            },
        ),
        tool_step(
            "advanced signatureHelp warm-up",
            "advanced_lsp_request",
            {
                "method": "textDocument/signatureHelp",
                "params": {
                    "textDocument": {"uri": MAIN_URI},
                    "position": {"line": 27, "character": 41},
                },
            },
        ),
        tool_step(
            "advanced documentHighlight warm-up",
            "advanced_lsp_request",
            {
                "method": "textDocument/documentHighlight",
                "params": {
                    "textDocument": {"uri": MAIN_URI},
                    "position": {"line": 28, "character": 10},
                },
            },
        ),
        tool_step("health_snapshot", "health_snapshot", {}),
        tool_step(
            "hover main.rs:27:16",
            "hover",
            {"file_path": str(MAIN_REL), "line": 27, "column": 16},
        ),
        tool_step(
            "definition main.rs:27:16",
            "definition",
            {"file_path": str(MAIN_REL), "line": 27, "column": 16},
        ),
        tool_step(
            "references main.rs:27:16",
            "references",
            {"file_path": str(MAIN_REL), "line": 27, "column": 16},
        ),
        tool_step(
            "diagnostics default porcelain",
            "diagnostics",
            {"file_path": str(MAIN_PATH)},
        ),
        tool_step(
            "diagnostics render=json",
            "diagnostics",
            {"file_path": str(MAIN_PATH), "render": "json"},
        ),
        tool_step(
            "diagnostics mode=full render=json",
            "diagnostics",
            {"file_path": str(MAIN_PATH), "mode": "full", "render": "json"},
        ),
        tool_step("diagnostics multi-file fused", "diagnostics", multi_file_args),
        tool_step(
            "diagnostics path_style=relative",
            "diagnostics",
            {"file_path": str(MAIN_PATH), "path_style": "relative"},
        ),
        tool_step(
            "diagnostics file URI",
            "diagnostics",
            {"file_path": MAIN_URI},
        ),
        tool_step(
            "diagnostics workspace-relative path",
            "diagnostics",
            {"file_path": str(MAIN_REL)},
        ),
        tool_step(
            "clippy_diagnostics default porcelain",
            "clippy_diagnostics",
            {"file_path": str(MAIN_PATH)},
            timeout_s=CLIPPY_TIMEOUT_S,
        ),
        tool_step(
            "clippy_diagnostics render=json",
            "clippy_diagnostics",
            {"file_path": str(MAIN_PATH), "render": "json"},
            timeout_s=CLIPPY_TIMEOUT_S,
        ),
        tool_step(
            "clippy_diagnostics mode=full render=json",
            "clippy_diagnostics",
            {"file_path": str(MAIN_PATH), "mode": "full", "render": "json"},
            timeout_s=CLIPPY_TIMEOUT_S,
        ),
        tool_step(
            "clippy_diagnostics multi-file fused",
            "clippy_diagnostics",
            multi_file_args,
            timeout_s=CLIPPY_TIMEOUT_S,
        ),
        tool_step(
            "advanced workspace/symbol",
            "advanced_lsp_request",
            {"method": "workspace/symbol", "params": {"query": "LaunchMode"}},
        ),
        tool_step(
            "advanced definition via stringified params",
            "advanced_lsp_request",
            {
                "method": "textDocument/definition",
                "params": definition_string_params,
            },
        ),
        tool_step(
            "advanced foldingRange",
            "advanced_lsp_request",
            {
                "method": "textDocument/foldingRange",
                "params": {"textDocument": {"uri": MAIN_URI}},
            },
        ),
        tool_step(
            "advanced selectionRange",
            "advanced_lsp_request",
            {
                "method": "textDocument/selectionRange",
                "params": {
                    "textDocument": {"uri": MAIN_URI},
                    "positions": [{"line": 29, "character": 34}],
                },
            },
        ),
        tool_step(
            "advanced inlayHint",
            "advanced_lsp_request",
            {
                "method": "textDocument/inlayHint",
                "params": {
                    "textDocument": {"uri": MAIN_URI},
                    "range": {
                        "start": {"line": 0, "character": 0},
                        "end": {"line": 40, "character": 0},
                    },
                },
            },
        ),
        tool_step(
            "advanced documentDiagnostic",
            "advanced_lsp_request",
            {
                "method": "textDocument/diagnostic",
                "params": {"textDocument": {"uri": MAIN_URI}},
            },
        ),
        tool_step(
            "advanced semanticTokens/range",
            "advanced_lsp_request",
            {
                "method": "textDocument/semanticTokens/range",
                "params": {
                    "textDocument": {"uri": MAIN_URI},
                    "range": {
                        "start": {"line": 26, "character": 0},
                        "end": {"line": 31, "character": 80},
                    },
                },
            },
        ),
    ]


def ensure_prerequisites() -> None:
    """Exit early unless the target files and rust-analyzer are available.

    The rust-analyzer executable name comes from ``ADEQUATE_MCP_RA_BINARY``
    (default ``rust-analyzer``) and is looked up on ``PATH``.

    Raises:
        SystemExit: if a target file is missing or the binary is not found.
    """
    if not MAIN_PATH.is_file():
        raise SystemExit(f"missing target file: {MAIN_PATH}")
    if not WORKER_MOD_PATH.is_file():
        raise SystemExit(f"missing target file: {WORKER_MOD_PATH}")
    ra_binary = os.environ.get("ADEQUATE_MCP_RA_BINARY", "rust-analyzer")
    if shutil.which(ra_binary) is None:
        raise SystemExit(
            f"rust-analyzer binary `{ra_binary}` not found on PATH; set ADEQUATE_MCP_RA_BINARY if needed"
        )


def build_binary() -> None:
    """Run ``cargo build`` for the server package and verify the output.

    Raises:
        SystemExit: with cargo's exit status if the build fails, or with a
            message if the expected binary is absent afterwards.
    """
    command = ["cargo", "build", "-q", "-p", "adequate-rust-mcp"]
    print("+", " ".join(command), flush=True)
    proc = subprocess.run(command, cwd=ROOT)
    if proc.returncode != 0:
        raise SystemExit(proc.returncode)
    if not BINARY.is_file():
        raise SystemExit(f"expected built binary at {BINARY}")


def run_step(session: McpSession, step: QaStep) -> JsonValue:
    """Execute one checklist step and return the JSON-RPC result.

    Sends ``notifications/initialized`` after the response when the step asks
    for it. A failed attempt is retried after ``TRANSIENT_BACKOFF_S`` seconds
    when the step allows retries, fewer than ``TRANSIENT_RETRIES`` attempts
    have been made, and `is_transient_failure` accepts the error.

    Raises:
        ToolPayloadFailure: if a ``tools/call`` result has ``isError`` set.
        Exception: any other error from the request once retries do not apply.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            result = session.request(step.method, step.params, timeout_s=step.timeout_s)
            if step.send_initialized_notification:
                session.notify("notifications/initialized", {})
            if step.method == "tools/call":
                tool_name = step.params.get("name", "")
                if isinstance(result, dict) and result.get("isError") is True:
                    raise ToolPayloadFailure(str(tool_name), result)
            return result
        except Exception as error:
            # Only ordinary errors are retry candidates; KeyboardInterrupt and
            # SystemExit propagate untouched.
            if step.transient_retry and attempt < TRANSIENT_RETRIES and is_transient_failure(error):
                time.sleep(TRANSIENT_BACKOFF_S)
                continue
            raise


def main() -> None:
    """Run the whole checklist and report the outcome.

    Raises:
        SystemExit: with status 1 if any step failed, or as raised by the
            prerequisite check or the build.
    """
    ensure_prerequisites()
    build_binary()
    steps = build_steps()
    failures: list[tuple[QaStep, Exception]] = []
    session = McpSession.spawn()
    try:
        for index, step in enumerate(steps, start=1):
            print(f"[{index:02d}/{len(steps):02d}] {step.label}", flush=True)
            try:
                result = run_step(session, step)
            except Exception as error:
                # Record the failure and keep going. Catching BaseException
                # here would also swallow Ctrl-C and make the run impossible
                # to interrupt.
                failures.append((step, error))
                print(f"  error: {error}", flush=True)
                continue
            print(f"  preview: {preview_result(step, result)}", flush=True)
    finally:
        session.close()

    print()
    print(f"completed {len(steps)} checklist steps")
    if failures:
        print(f"{len(failures)} step(s) failed:")
        for step, error in failures:
            print(f"- {step.label}: {error}")
        raise SystemExit(1)
    print("all steps returned without error")
    print("manual vibe check: inspect the previews above and answer yes/no")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        raise SystemExit(130)