diff options
Diffstat (limited to 'crates')
| -rw-r--r-- | crates/libmcp-testkit/Cargo.toml | 15 | ||||
| -rw-r--r-- | crates/libmcp-testkit/src/lib.rs | 32 | ||||
| -rw-r--r-- | crates/libmcp/Cargo.toml | 22 | ||||
| -rw-r--r-- | crates/libmcp/src/fault.rs | 118 | ||||
| -rw-r--r-- | crates/libmcp/src/health.rs | 116 | ||||
| -rw-r--r-- | crates/libmcp/src/jsonrpc.rs | 337 | ||||
| -rw-r--r-- | crates/libmcp/src/lib.rs | 28 | ||||
| -rw-r--r-- | crates/libmcp/src/normalize.rs | 133 | ||||
| -rw-r--r-- | crates/libmcp/src/render.rs | 138 | ||||
| -rw-r--r-- | crates/libmcp/src/replay.rs | 16 | ||||
| -rw-r--r-- | crates/libmcp/src/telemetry.rs | 299 | ||||
| -rw-r--r-- | crates/libmcp/src/types.rs | 50 |
12 files changed, 1304 insertions, 0 deletions
diff --git a/crates/libmcp-testkit/Cargo.toml b/crates/libmcp-testkit/Cargo.toml new file mode 100644 index 0000000..b8e68b1 --- /dev/null +++ b/crates/libmcp-testkit/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "libmcp-testkit" +version.workspace = true +edition.workspace = true +license.workspace = true +rust-version.workspace = true +description.workspace = true +readme.workspace = true + +[dependencies] +serde.workspace = true +serde_json.workspace = true + +[lints] +workspace = true diff --git a/crates/libmcp-testkit/src/lib.rs b/crates/libmcp-testkit/src/lib.rs new file mode 100644 index 0000000..9f33643 --- /dev/null +++ b/crates/libmcp-testkit/src/lib.rs @@ -0,0 +1,32 @@ +//! Shared test helpers for `libmcp` consumers. + +use serde::de::DeserializeOwned; +use std::{ + fs::File, + io::{self, BufRead, BufReader}, + path::Path, +}; + +/// Reads an append-only JSONL file into typed records. +pub fn read_json_lines<T>(path: &Path) -> io::Result<Vec<T>> +where + T: DeserializeOwned, +{ + let file = File::open(path)?; + let reader = BufReader::new(file); + let mut records = Vec::new(); + for line in reader.lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + let parsed = serde_json::from_str::<T>(line.as_str()).map_err(|error| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid JSONL test record: {error}"), + ) + })?; + records.push(parsed); + } + Ok(records) +} diff --git a/crates/libmcp/Cargo.toml b/crates/libmcp/Cargo.toml new file mode 100644 index 0000000..02ea9db --- /dev/null +++ b/crates/libmcp/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "libmcp" +version.workspace = true +edition.workspace = true +license.workspace = true +rust-version.workspace = true +description.workspace = true +readme.workspace = true + +[dependencies] +schemars.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tokio.workspace = true +url.workspace = true + +[dev-dependencies] 
+tempfile.workspace = true + +[lints] +workspace = true diff --git a/crates/libmcp/src/fault.rs b/crates/libmcp/src/fault.rs new file mode 100644 index 0000000..edbd05e --- /dev/null +++ b/crates/libmcp/src/fault.rs @@ -0,0 +1,118 @@ +//! Fault taxonomy and recovery directives. + +use crate::types::Generation; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Broad operational fault class. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum FaultClass { + /// Underlying transport or I/O failure. + Transport, + /// Process startup, liveness, or exit failure. + Process, + /// Protocol or framing failure. + Protocol, + /// Timeout or deadline failure. + Timeout, + /// Downstream service returned an error. + Downstream, + /// Resource budget or queue exhaustion. + Resource, + /// Replay or recovery budget exhaustion. + Replay, + /// Rollout or binary handoff failure. + Rollout, + /// Internal invariant breach. + Invariant, +} + +/// Recovery directive for an operational fault. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum RecoveryDirective { + /// Retry on the current live process. + RetryInPlace, + /// Restart or roll forward, then replay if the replay contract allows it. + RestartAndReplay, + /// Abort the request and surface the failure. + AbortRequest, +} + +/// A typed but extensible fault code. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema)] +#[serde(transparent)] +pub struct FaultCode(String); + +impl FaultCode { + /// Constructs a new fault code. + /// + /// The code must be non-empty and use lowercase ASCII with underscores. 
+ pub fn try_new(code: impl Into<String>) -> Result<Self, crate::types::InvariantViolation> { + let code = code.into(); + if code.is_empty() + || !code + .bytes() + .all(|byte| byte.is_ascii_lowercase() || byte == b'_' || byte.is_ascii_digit()) + { + return Err(crate::types::InvariantViolation::new( + "fault code must be non-empty lowercase ascii snake_case", + )); + } + Ok(Self(code)) + } + + /// Returns the code text. + #[must_use] + pub fn as_str(&self) -> &str { + self.0.as_str() + } +} + +/// Structured operational fault. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +pub struct Fault { + /// Generation in which the fault happened. + pub generation: Generation, + /// Broad fault class. + pub class: FaultClass, + /// Consumer-defined fine-grained code. + pub code: FaultCode, + /// Recovery directive implied by this fault. + pub directive: RecoveryDirective, + /// Human-facing detail. + pub detail: String, +} + +impl Fault { + /// Constructs a new fault. + #[must_use] + pub fn new( + generation: Generation, + class: FaultClass, + code: FaultCode, + directive: RecoveryDirective, + detail: impl Into<String>, + ) -> Self { + Self { + generation, + class, + code, + directive, + detail: detail.into(), + } + } +} + +#[cfg(test)] +mod tests { + use super::FaultCode; + + #[test] + fn fault_code_rejects_non_snake_case() { + assert!(FaultCode::try_new("broken_pipe").is_ok()); + assert!(FaultCode::try_new("BrokenPipe").is_err()); + assert!(FaultCode::try_new("").is_err()); + } +} diff --git a/crates/libmcp/src/health.rs b/crates/libmcp/src/health.rs new file mode 100644 index 0000000..96c4b4a --- /dev/null +++ b/crates/libmcp/src/health.rs @@ -0,0 +1,116 @@ +//! Standard health and telemetry payloads. + +use crate::{fault::Fault, types::Generation}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Coarse lifecycle state of the live worker set. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum LifecycleState { + /// No worker is currently alive. + Cold, + /// Startup is in progress. + Starting, + /// A worker is healthy and serving. + Ready, + /// Recovery is in progress after a fault. + Recovering, +} + +/// Rollout or reload state of the host. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum RolloutState { + /// No rollout is pending. + Stable, + /// A rollout has been detected but not yet executed. + Pending, + /// A rollout or reexec is in flight. + Reloading, +} + +/// Base health snapshot for a hardened MCP host or worker. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +pub struct HealthSnapshot { + /// Current lifecycle state. + pub state: LifecycleState, + /// Current generation. + pub generation: Generation, + /// Process uptime in milliseconds. + pub uptime_ms: u64, + /// Consecutive failures since the last healthy request. + pub consecutive_failures: u32, + /// Total restart count. + pub restart_count: u64, + /// Rollout state when the runtime exposes it. + #[serde(skip_serializing_if = "Option::is_none")] + pub rollout: Option<RolloutState>, + /// Most recent fault, if any. + #[serde(skip_serializing_if = "Option::is_none")] + pub last_fault: Option<Fault>, +} + +/// Aggregate request totals. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +pub struct TelemetryTotals { + /// Total requests observed. + pub request_count: u64, + /// Requests that completed successfully. + pub success_count: u64, + /// Requests that returned downstream response errors. + pub response_error_count: u64, + /// Requests that failed due to transport or process churn. + pub transport_fault_count: u64, + /// Requests retried by the runtime. + pub retry_count: u64, +} + +/// Per-method telemetry aggregate. 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +pub struct MethodTelemetry { + /// Method name. + pub method: String, + /// Total requests for this method. + pub request_count: u64, + /// Successful requests. + pub success_count: u64, + /// Response errors. + pub response_error_count: u64, + /// Transport/process faults. + pub transport_fault_count: u64, + /// Retry count. + pub retry_count: u64, + /// Most recent latency, if any. + #[serde(skip_serializing_if = "Option::is_none")] + pub last_latency_ms: Option<u64>, + /// Maximum latency. + pub max_latency_ms: u64, + /// Average latency. + pub avg_latency_ms: u64, + /// Most recent error text, if any. + #[serde(skip_serializing_if = "Option::is_none")] + pub last_error: Option<String>, +} + +/// Base telemetry snapshot for a hardened MCP runtime. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +pub struct TelemetrySnapshot { + /// Process uptime in milliseconds. + pub uptime_ms: u64, + /// Current lifecycle state. + pub state: LifecycleState, + /// Current generation. + pub generation: Generation, + /// Consecutive failures since last clean success. + pub consecutive_failures: u32, + /// Total restart count. + pub restart_count: u64, + /// Aggregate totals. + pub totals: TelemetryTotals, + /// Per-method aggregates. + pub methods: Vec<MethodTelemetry>, + /// Most recent fault, if any. + #[serde(skip_serializing_if = "Option::is_none")] + pub last_fault: Option<Fault>, +} diff --git a/crates/libmcp/src/jsonrpc.rs b/crates/libmcp/src/jsonrpc.rs new file mode 100644 index 0000000..a54b243 --- /dev/null +++ b/crates/libmcp/src/jsonrpc.rs @@ -0,0 +1,337 @@ +//! Lightweight JSON-RPC frame helpers. + +use crate::normalize::normalize_ascii_token; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::io; +use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; +use url::Url; + +/// JSON-RPC request identifier. 
+#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum RequestId { + /// Numeric identifier preserved as text for round-trip stability. + Number(String), + /// Text identifier. + Text(String), +} + +impl RequestId { + /// Parses a request ID from JSON. + #[must_use] + pub fn from_json_value(value: &Value) -> Option<Self> { + match value { + Value::Number(number) => Some(Self::Number(number.to_string())), + Value::String(text) => Some(Self::Text(text.clone())), + Value::Null | Value::Bool(_) | Value::Array(_) | Value::Object(_) => None, + } + } + + /// Converts the request ID back to JSON. + #[must_use] + pub fn to_json_value(&self) -> Value { + match self { + Self::Number(number) => { + let parsed = serde_json::from_str::<Value>(number); + match parsed { + Ok(value @ Value::Number(_)) => value, + Ok(_) | Err(_) => Value::String(number.clone()), + } + } + Self::Text(text) => Value::String(text.clone()), + } + } +} + +/// Parsed JSON-RPC frame. +#[derive(Debug, Clone)] +pub struct FramedMessage { + /// Original payload bytes. + pub payload: Vec<u8>, + /// Parsed JSON value. + pub value: Value, +} + +impl FramedMessage { + /// Parses a JSON-RPC frame payload. + pub fn parse(payload: Vec<u8>) -> io::Result<Self> { + let value = serde_json::from_slice::<Value>(&payload).map_err(|error| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid JSON-RPC frame payload: {error}"), + ) + })?; + if !value.is_object() { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "JSON-RPC frame root must be an object", + )); + } + Ok(Self { payload, value }) + } + + /// Classifies the envelope shape. 
+ #[must_use] + pub fn classify(&self) -> RpcEnvelopeKind { + let method = self + .value + .get("method") + .and_then(Value::as_str) + .map(ToOwned::to_owned); + let request_id = self.value.get("id").and_then(RequestId::from_json_value); + match (method, request_id) { + (Some(method), Some(id)) => RpcEnvelopeKind::Request { id, method }, + (Some(method), None) => RpcEnvelopeKind::Notification { method }, + (None, Some(id)) => RpcEnvelopeKind::Response { + id, + has_error: self.value.get("error").is_some(), + }, + (None, None) => RpcEnvelopeKind::Unknown, + } + } +} + +/// Coarse JSON-RPC envelope classification. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RpcEnvelopeKind { + /// Request with an ID. + Request { + /// Request identifier. + id: RequestId, + /// Method name. + method: String, + }, + /// Notification without an ID. + Notification { + /// Method name. + method: String, + }, + /// Response with an ID. + Response { + /// Request identifier. + id: RequestId, + /// Whether the response carries a JSON-RPC error payload. + has_error: bool, + }, + /// Frame shape did not match a recognized envelope. + Unknown, +} + +/// Tool call metadata extracted from a generic `tools/call` frame. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ToolCallMeta { + /// Tool name. + pub tool_name: String, + /// Nested LSP method when the tool proxies LSP-style requests. + pub lsp_method: Option<String>, + /// Best-effort path hint for telemetry grouping. + pub path_hint: Option<String>, +} + +/// One result of reading a line-delimited JSON-RPC stream. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FrameReadOutcome { + /// A frame payload was read. + Frame(Vec<u8>), + /// The stream ended cleanly. + EndOfStream, +} + +/// Extracts `tools/call` metadata from a JSON-RPC frame. 
+#[must_use] +pub fn parse_tool_call_meta(frame: &FramedMessage, rpc_method: &str) -> Option<ToolCallMeta> { + if rpc_method != "tools/call" { + return None; + } + let params = frame.value.get("params")?.as_object()?; + let tool_name = params.get("name")?.as_str()?.to_owned(); + let tool_arguments = params.get("arguments"); + let lsp_method = if normalize_ascii_token(tool_name.as_str()) == "advancedlsprequest" { + tool_arguments + .and_then(Value::as_object) + .and_then(|arguments| { + arguments + .get("method") + .or_else(|| arguments.get("lsp_method")) + .or_else(|| arguments.get("lspMethod")) + }) + .and_then(Value::as_str) + .map(str::to_owned) + } else { + None + }; + let path_hint = tool_arguments.and_then(extract_path_hint_from_value); + Some(ToolCallMeta { + tool_name, + lsp_method, + path_hint, + }) +} + +/// Reads one line-delimited JSON-RPC frame. +pub async fn read_frame<R>(reader: &mut BufReader<R>) -> io::Result<FrameReadOutcome> +where + R: AsyncRead + Unpin, +{ + loop { + let mut line = Vec::<u8>::new(); + let bytes_read = reader.read_until(b'\n', &mut line).await?; + if bytes_read == 0 { + return Ok(FrameReadOutcome::EndOfStream); + } + + while line + .last() + .is_some_and(|byte| *byte == b'\n' || *byte == b'\r') + { + let _popped = line.pop(); + } + + if line.is_empty() { + continue; + } + + return Ok(FrameReadOutcome::Frame(line)); + } +} + +/// Writes one line-delimited JSON-RPC frame. 
+pub async fn write_frame<W>(writer: &mut W, payload: &[u8]) -> io::Result<()> +where + W: AsyncWrite + Unpin, +{ + writer.write_all(payload).await?; + writer.write_all(b"\n").await?; + writer.flush().await?; + Ok(()) +} + +fn extract_path_hint_from_value(value: &Value) -> Option<String> { + match value { + Value::String(text) => { + let parsed = parse_nested_json_value(text)?; + extract_path_hint_from_value(&parsed) + } + Value::Object(_) => { + let direct = extract_direct_path_hint(value); + if let Some(path) = direct { + return Some(normalize_path_hint(path.as_str())); + } + value + .as_object()? + .values() + .find_map(extract_path_hint_from_value) + } + Value::Array(items) => items.iter().find_map(extract_path_hint_from_value), + Value::Null | Value::Bool(_) | Value::Number(_) => None, + } +} + +fn parse_nested_json_value(raw: &str) -> Option<Value> { + let trimmed = raw.trim(); + let first = trimmed.as_bytes().first()?; + if !matches!(*first, b'{' | b'[') { + return None; + } + serde_json::from_str(trimmed).ok() +} + +fn extract_direct_path_hint(value: &Value) -> Option<String> { + let object = value.as_object()?; + for key in ["file_path", "filePath", "path", "uri"] { + let path = object.get(key).and_then(Value::as_str); + if let Some(path) = path { + return Some(path.to_owned()); + } + } + + let text_document = object.get("textDocument").and_then(Value::as_object); + if let Some(text_document) = text_document { + for key in ["uri", "file_path", "filePath", "path"] { + let path = text_document.get(key).and_then(Value::as_str); + if let Some(path) = path { + return Some(path.to_owned()); + } + } + } + None +} + +fn normalize_path_hint(raw: &str) -> String { + let trimmed = raw.trim(); + if trimmed.starts_with("file://") { + let parsed = Url::parse(trimmed); + if let Ok(parsed) = parsed { + let to_path = parsed.to_file_path(); + if let Ok(path) = to_path { + return path.display().to_string(); + } + } + } + trimmed.to_owned() +} + +#[cfg(test)] +mod tests { + 
use super::{FramedMessage, RequestId, RpcEnvelopeKind, parse_tool_call_meta}; + use serde_json::json; + + #[test] + fn request_id_round_trips_numeric_and_textual_values() { + let numeric = RequestId::from_json_value(&json!(42)); + assert!(matches!(numeric, Some(RequestId::Number(ref value)) if value == "42")); + + let textual = RequestId::from_json_value(&json!("abc")); + assert!(matches!(textual, Some(RequestId::Text(ref value)) if value == "abc")); + + let round_trip = numeric.map(|value| value.to_json_value()); + assert_eq!(round_trip, Some(json!(42))); + } + + #[test] + fn classifies_request_frames() { + let frame = + FramedMessage::parse(br#"{"jsonrpc":"2.0","id":7,"method":"tools/call"}"#.to_vec()); + assert!(frame.is_ok()); + let frame = match frame { + Ok(value) => value, + Err(_) => return, + }; + assert!(matches!( + frame.classify(), + RpcEnvelopeKind::Request { method, .. } if method == "tools/call" + )); + } + + #[test] + fn extracts_tool_call_meta_with_nested_path_hint() { + let payload = br#"{ + "jsonrpc":"2.0", + "id":1, + "method":"tools/call", + "params":{ + "name":"advanced_lsp_request", + "arguments":{ + "method":"textDocument/hover", + "params":{"textDocument":{"uri":"file:///tmp/example.rs"}} + } + } + }"# + .to_vec(); + let frame = FramedMessage::parse(payload); + assert!(frame.is_ok()); + let frame = match frame { + Ok(value) => value, + Err(_) => return, + }; + let meta = parse_tool_call_meta(&frame, "tools/call"); + assert!(meta.is_some()); + let meta = match meta { + Some(value) => value, + None => return, + }; + assert_eq!(meta.tool_name, "advanced_lsp_request"); + assert_eq!(meta.lsp_method.as_deref(), Some("textDocument/hover")); + assert_eq!(meta.path_hint.as_deref(), Some("/tmp/example.rs")); + } +} diff --git a/crates/libmcp/src/lib.rs b/crates/libmcp/src/lib.rs new file mode 100644 index 0000000..b352d09 --- /dev/null +++ b/crates/libmcp/src/lib.rs @@ -0,0 +1,28 @@ +//! 
`libmcp` is the shared operational spine for hardened MCP servers. + +pub mod fault; +pub mod health; +pub mod jsonrpc; +pub mod normalize; +pub mod render; +pub mod replay; +pub mod telemetry; +pub mod types; + +pub use fault::{Fault, FaultClass, FaultCode, RecoveryDirective}; +pub use health::{ + HealthSnapshot, LifecycleState, MethodTelemetry, RolloutState, TelemetrySnapshot, + TelemetryTotals, +}; +pub use jsonrpc::{ + FrameReadOutcome, FramedMessage, RequestId, RpcEnvelopeKind, ToolCallMeta, + parse_tool_call_meta, read_frame, write_frame, +}; +pub use normalize::{ + NumericParseError, PathNormalizeError, normalize_ascii_token, normalize_local_path, + parse_human_unsigned_u64, saturating_u64_to_usize, +}; +pub use render::{PathStyle, RenderConfig, RenderMode, TruncatedText, collapse_inline_whitespace}; +pub use replay::ReplayContract; +pub use telemetry::{TelemetryLog, ToolErrorDetail, ToolOutcome}; +pub use types::{Generation, InvariantViolation}; diff --git a/crates/libmcp/src/normalize.rs b/crates/libmcp/src/normalize.rs new file mode 100644 index 0000000..ff24067 --- /dev/null +++ b/crates/libmcp/src/normalize.rs @@ -0,0 +1,133 @@ +//! Shared normalization helpers for model-facing input. + +use std::path::{Path, PathBuf}; +use thiserror::Error; +use url::Url; + +/// A numeric input could not be normalized. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)] +pub enum NumericParseError { + /// The input was empty. + #[error("numeric input must be non-empty")] + Empty, + /// The input could not be represented as a non-negative integer. + #[error("expected a non-negative integer")] + Invalid, +} + +/// A path-like value could not be normalized. +#[derive(Debug, Clone, PartialEq, Eq, Error)] +pub enum PathNormalizeError { + /// The input was empty. + #[error("path input must be non-empty")] + Empty, + /// The `file://` URI was malformed. + #[error("file URI is invalid")] + InvalidFileUri, + /// The URI does not reference a local path. 
+ #[error("file URI must resolve to a local path")] + NonLocalFileUri, +} + +/// Parses a human-facing unsigned integer. +/// +/// This accepts: +/// +/// - integer numbers +/// - integer-like floating-point spellings such as `42.0` +/// - numeric strings +#[must_use] +pub fn parse_human_unsigned_u64(raw: &str) -> Option<u64> { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return None; + } + if let Ok(value) = trimmed.parse::<u64>() { + return Some(value); + } + let parsed_float = trimmed.parse::<f64>().ok()?; + if !parsed_float.is_finite() || parsed_float < 0.0 || parsed_float.fract() != 0.0 { + return None; + } + let max = u64::MAX as f64; + if parsed_float > max { + return None; + } + Some(parsed_float as u64) +} + +/// Converts `u64` to `usize`, saturating on overflow. +#[must_use] +pub fn saturating_u64_to_usize(value: u64) -> usize { + usize::try_from(value).unwrap_or(usize::MAX) +} + +/// Normalizes a token by dropping non-alphanumeric ASCII and lowercasing. +#[must_use] +pub fn normalize_ascii_token(raw: &str) -> String { + raw.chars() + .filter(|character| character.is_ascii_alphanumeric()) + .map(|character| character.to_ascii_lowercase()) + .collect() +} + +/// Resolves a local path or `file://` URI to an absolute path. +pub fn normalize_local_path( + raw: &str, + workspace_root: Option<&Path>, +) -> Result<PathBuf, PathNormalizeError> { + let trimmed = raw.trim(); + if trimmed.is_empty() { + return Err(PathNormalizeError::Empty); + } + let parsed = if trimmed.starts_with("file://") { + let file_url = Url::parse(trimmed).map_err(|_| PathNormalizeError::InvalidFileUri)?; + file_url + .to_file_path() + .map_err(|()| PathNormalizeError::NonLocalFileUri)? 
+ } else { + PathBuf::from(trimmed) + }; + Ok(if parsed.is_absolute() { + parsed + } else if let Some(workspace_root) = workspace_root { + workspace_root.join(parsed) + } else { + parsed + }) +} + +#[cfg(test)] +mod tests { + use super::{normalize_ascii_token, normalize_local_path, parse_human_unsigned_u64}; + use std::path::Path; + + #[test] + fn parses_human_unsigned_integers() { + assert_eq!(parse_human_unsigned_u64("42"), Some(42)); + assert_eq!(parse_human_unsigned_u64("42.0"), Some(42)); + assert_eq!(parse_human_unsigned_u64(" 7 "), Some(7)); + assert_eq!(parse_human_unsigned_u64("-1"), None); + assert_eq!(parse_human_unsigned_u64("7.5"), None); + } + + #[test] + fn normalizes_ascii_tokens() { + assert_eq!( + normalize_ascii_token("textDocument/prepareRename"), + "textdocumentpreparerename" + ); + assert_eq!(normalize_ascii_token("prepare_rename"), "preparerename"); + } + + #[test] + fn resolves_relative_paths_against_workspace_root() { + let root = Path::new("/tmp/example-root"); + let resolved = normalize_local_path("src/lib.rs", Some(root)); + assert!(resolved.is_ok()); + assert_eq!( + resolved.ok().as_deref(), + Some(root.join("src/lib.rs").as_path()) + ); + } +} diff --git a/crates/libmcp/src/render.rs b/crates/libmcp/src/render.rs new file mode 100644 index 0000000..cbf2ae6 --- /dev/null +++ b/crates/libmcp/src/render.rs @@ -0,0 +1,138 @@ +//! Model-facing rendering helpers. + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use std::path::Path; + +/// Output render mode. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, Default)] +#[serde(rename_all = "snake_case")] +pub enum RenderMode { + /// Model-optimized text output. + #[default] + #[serde(alias = "text", alias = "plain", alias = "plain_text")] + Porcelain, + /// Structured JSON output. + #[serde(alias = "structured")] + Json, +} + +/// Path rendering style. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, Default)] +#[serde(rename_all = "snake_case")] +pub enum PathStyle { + /// Render absolute filesystem paths. + Absolute, + /// Render paths relative to the workspace root when possible. + #[default] + #[serde(alias = "rel")] + Relative, +} + +/// Common render configuration. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct RenderConfig { + /// Chosen render mode. + pub render: RenderMode, + /// Chosen path rendering style. + pub path_style: PathStyle, +} + +impl RenderConfig { + /// Builds a render configuration from user input, applying the default + /// path style implied by the render mode. + #[must_use] + pub fn from_user_input(render: Option<RenderMode>, path_style: Option<PathStyle>) -> Self { + let render = render.unwrap_or(RenderMode::Porcelain); + let default_path_style = match render { + RenderMode::Porcelain => PathStyle::Relative, + RenderMode::Json => PathStyle::Absolute, + }; + Self { + render, + path_style: path_style.unwrap_or(default_path_style), + } + } +} + +/// Result of text truncation. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TruncatedText { + /// Visible text after truncation. + pub text: String, + /// Whether text was truncated. + pub truncated: bool, +} + +/// Collapses all internal whitespace runs into single spaces. +#[must_use] +pub fn collapse_inline_whitespace(raw: &str) -> String { + raw.split_whitespace().collect::<Vec<_>>().join(" ") +} + +/// Truncates a string by Unicode scalar count. 
+#[must_use] +pub fn truncate_chars(raw: &str, limit: Option<usize>) -> TruncatedText { + let Some(limit) = limit else { + return TruncatedText { + text: raw.to_owned(), + truncated: false, + }; + }; + let truncated = raw.chars().take(limit).collect::<String>(); + let visible_len = truncated.chars().count(); + if raw.chars().count() > visible_len { + TruncatedText { + text: truncated, + truncated: true, + } + } else { + TruncatedText { + text: raw.to_owned(), + truncated: false, + } + } +} + +/// Renders a path according to the requested style. +#[must_use] +pub fn render_path(path: &Path, style: PathStyle, workspace_root: Option<&Path>) -> String { + match style { + PathStyle::Absolute => path.display().to_string(), + PathStyle::Relative => { + if let Some(workspace_root) = workspace_root + && let Ok(relative) = path.strip_prefix(workspace_root) + { + return relative.display().to_string(); + } + path.display().to_string() + } + } +} + +#[cfg(test)] +mod tests { + use super::{PathStyle, RenderConfig, RenderMode, collapse_inline_whitespace, render_path}; + use std::path::Path; + + #[test] + fn render_config_uses_mode_specific_defaults() { + let porcelain = RenderConfig::from_user_input(None, None); + assert_eq!(porcelain.render, RenderMode::Porcelain); + assert_eq!(porcelain.path_style, PathStyle::Relative); + + let json = RenderConfig::from_user_input(Some(RenderMode::Json), None); + assert_eq!(json.path_style, PathStyle::Absolute); + } + + #[test] + fn collapses_whitespace_and_renders_relative_paths() { + assert_eq!(collapse_inline_whitespace("a b\t c"), "a b c"); + let root = Path::new("/tmp/repo"); + let path = Path::new("/tmp/repo/src/lib.rs"); + assert_eq!( + render_path(path, PathStyle::Relative, Some(root)), + "src/lib.rs" + ); + } +} diff --git a/crates/libmcp/src/replay.rs b/crates/libmcp/src/replay.rs new file mode 100644 index 0000000..0f318a0 --- /dev/null +++ b/crates/libmcp/src/replay.rs @@ -0,0 +1,16 @@ +//! Replay contracts for request surfaces. 
+ +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Replay legality for a request surface. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum ReplayContract { + /// Repeated execution converges on the same observable outcome. + Convergent, + /// Replay is only legal after a probe or equivalent proof of safety. + ProbeRequired, + /// Replay is never allowed automatically. + NeverReplay, +} diff --git a/crates/libmcp/src/telemetry.rs b/crates/libmcp/src/telemetry.rs new file mode 100644 index 0000000..6cd4cff --- /dev/null +++ b/crates/libmcp/src/telemetry.rs @@ -0,0 +1,299 @@ +//! Append-only JSONL telemetry support. + +use crate::{ + jsonrpc::{RequestId, ToolCallMeta}, + render::render_path, +}; +use serde::Serialize; +use serde_json::Value; +use std::{ + collections::HashMap, + fs::OpenOptions, + io, + io::Write, + path::{Path, PathBuf}, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +/// Tool completion outcome. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum ToolOutcome { + /// The request completed successfully. + Ok, + /// The request completed with an error. + Error, +} + +/// Serializable tool error detail. +#[derive(Debug, Clone, Default)] +pub struct ToolErrorDetail { + /// Error code when one exists. + pub code: Option<i64>, + /// Structured error kind. + pub kind: Option<String>, + /// Human-facing error message. 
+ pub message: Option<String>, +} + +#[derive(Debug, Default)] +struct PathAggregate { + request_count: u64, + error_count: u64, + total_latency_ms: u128, + max_latency_ms: u64, +} + +#[derive(Debug, Clone, Serialize)] +struct ToolEventRecord { + event: &'static str, + ts_unix_ms: u64, + repo_root: String, + request_id: Value, + tool_name: String, + lsp_method: Option<String>, + path_hint: Option<String>, + latency_ms: u64, + replay_attempts: u8, + outcome: ToolOutcome, + error_code: Option<i64>, + error_kind: Option<String>, + error_message: Option<String>, +} + +#[derive(Debug, Clone, Serialize)] +struct HotPathsSnapshotRecord { + event: &'static str, + ts_unix_ms: u64, + repo_root: String, + total_tool_events: u64, + hottest_paths: Vec<HotPathLine>, + slowest_paths: Vec<HotPathLine>, +} + +#[derive(Debug, Clone, Serialize)] +struct HotPathLine { + path: String, + request_count: u64, + error_count: u64, + avg_latency_ms: u64, + max_latency_ms: u64, +} + +/// Append-only telemetry log. +#[derive(Debug)] +pub struct TelemetryLog { + sink: std::fs::File, + repo_root: String, + by_path: HashMap<String, PathAggregate>, + emitted_tool_events: u64, + snapshot_every: u64, +} + +impl TelemetryLog { + /// Opens or creates a telemetry log file. + pub fn new(path: &Path, repo_root: &Path, snapshot_every: u64) -> io::Result<Self> { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + let sink = OpenOptions::new().create(true).append(true).open(path)?; + let repo_root = render_path(repo_root, crate::render::PathStyle::Absolute, None); + Ok(Self { + sink, + repo_root, + by_path: HashMap::new(), + emitted_tool_events: 0, + snapshot_every: snapshot_every.max(1), + }) + } + + /// Records one tool completion and periodically emits a hot-path snapshot. 
+ pub fn record_tool_completion( + &mut self, + request_id: &RequestId, + tool_meta: &ToolCallMeta, + latency_ms: u64, + replay_attempts: u8, + outcome: ToolOutcome, + error: ToolErrorDetail, + ) -> io::Result<()> { + let now = unix_ms_now(); + let request_id = request_id.to_json_value(); + let is_error = matches!(outcome, ToolOutcome::Error); + let ToolErrorDetail { + code: error_code, + kind: error_kind, + message: error_message, + } = error; + let record = ToolEventRecord { + event: "tool_call", + ts_unix_ms: now, + repo_root: self.repo_root.clone(), + request_id, + tool_name: tool_meta.tool_name.clone(), + lsp_method: tool_meta.lsp_method.clone(), + path_hint: tool_meta.path_hint.clone(), + latency_ms, + replay_attempts, + outcome, + error_code, + error_kind, + error_message, + }; + self.write_json_line(&record)?; + + if let Some(path) = tool_meta.path_hint.as_ref() { + let aggregate = self.by_path.entry(path.clone()).or_default(); + aggregate.request_count = aggregate.request_count.saturating_add(1); + aggregate.total_latency_ms = aggregate + .total_latency_ms + .saturating_add(u128::from(latency_ms)); + aggregate.max_latency_ms = aggregate.max_latency_ms.max(latency_ms); + if is_error { + aggregate.error_count = aggregate.error_count.saturating_add(1); + } + } + + self.emitted_tool_events = self.emitted_tool_events.saturating_add(1); + if self.emitted_tool_events.is_multiple_of(self.snapshot_every) { + self.write_hot_paths_snapshot()?; + } + Ok(()) + } + + /// Emits a hot-path snapshot immediately. 
+ pub fn write_hot_paths_snapshot(&mut self) -> io::Result<()> { + let mut hottest = self + .by_path + .iter() + .map(|(path, aggregate)| hot_path_line(path.as_str(), aggregate)) + .collect::<Vec<_>>(); + hottest.sort_by(|left, right| { + right + .request_count + .cmp(&left.request_count) + .then_with(|| right.max_latency_ms.cmp(&left.max_latency_ms)) + .then_with(|| left.path.cmp(&right.path)) + }); + hottest.truncate(12); + + let mut slowest = self + .by_path + .iter() + .filter(|(_, aggregate)| aggregate.request_count > 0) + .map(|(path, aggregate)| hot_path_line(path.as_str(), aggregate)) + .collect::<Vec<_>>(); + slowest.sort_by(|left, right| { + right + .avg_latency_ms + .cmp(&left.avg_latency_ms) + .then_with(|| right.request_count.cmp(&left.request_count)) + .then_with(|| left.path.cmp(&right.path)) + }); + slowest.truncate(12); + + let snapshot = HotPathsSnapshotRecord { + event: "hot_paths_snapshot", + ts_unix_ms: unix_ms_now(), + repo_root: self.repo_root.clone(), + total_tool_events: self.emitted_tool_events, + hottest_paths: hottest, + slowest_paths: slowest, + }; + self.write_json_line(&snapshot) + } + + fn write_json_line<T: Serialize>(&mut self, value: &T) -> io::Result<()> { + let encoded = serde_json::to_vec(value).map_err(|error| { + io::Error::other(format!("telemetry serialization failed: {error}")) + })?; + self.sink.write_all(&encoded)?; + self.sink.write_all(b"\n")?; + Ok(()) + } +} + +fn hot_path_line(path: &str, aggregate: &PathAggregate) -> HotPathLine { + let avg_latency_ms = if aggregate.request_count == 0 { + 0 + } else { + let avg = aggregate.total_latency_ms / u128::from(aggregate.request_count); + u64::try_from(avg).unwrap_or(u64::MAX) + }; + HotPathLine { + path: PathBuf::from(path).display().to_string(), + request_count: aggregate.request_count, + error_count: aggregate.error_count, + avg_latency_ms, + max_latency_ms: aggregate.max_latency_ms, + } +} + +fn unix_ms_now() -> u64 { + let since_epoch = SystemTime::now() + 
        // A clock set before the Unix epoch yields Duration::ZERO rather
        // than panicking; an out-of-range millisecond count saturates to
        // u64::MAX via the try_from fallback below.
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    let millis = since_epoch.as_millis();
    u64::try_from(millis).unwrap_or(u64::MAX)
}

#[cfg(test)]
mod tests {
    use super::{TelemetryLog, ToolErrorDetail, ToolOutcome};
    use crate::jsonrpc::{RequestId, ToolCallMeta};
    use serde_json::Value;
    use std::fs;
    use tempfile::tempdir;

    // NOTE(review): the assert!/match-or-return pattern instead of
    // unwrap/expect presumably satisfies a workspace lint that forbids
    // panicking helpers even in tests -- confirm before simplifying.
    #[test]
    fn writes_tool_events_and_hot_path_snapshots() {
        let dir = tempdir();
        assert!(dir.is_ok());
        let dir = match dir {
            Ok(value) => value,
            Err(_) => return,
        };
        let log_path = dir.path().join("telemetry.jsonl");
        // snapshot_every = 1: every recorded tool event is immediately
        // followed by a hot-path snapshot line.
        let log = TelemetryLog::new(log_path.as_path(), dir.path(), 1);
        assert!(log.is_ok());
        let mut log = match log {
            Ok(value) => value,
            Err(_) => return,
        };
        let record = log.record_tool_completion(
            &RequestId::Text("abc".to_owned()),
            &ToolCallMeta {
                tool_name: "hover".to_owned(),
                lsp_method: None,
                path_hint: Some("/tmp/example.rs".to_owned()),
            },
            12,
            0,
            ToolOutcome::Ok,
            ToolErrorDetail::default(),
        );
        assert!(record.is_ok());
        let text = fs::read_to_string(log_path);
        assert!(text.is_ok());
        let text = match text {
            Ok(value) => value,
            Err(_) => return,
        };
        // Expect exactly two JSONL lines: the tool_call record, then the
        // snapshot triggered by snapshot_every = 1.
        let lines = text.lines().collect::<Vec<_>>();
        assert_eq!(lines.len(), 2);
        let first = serde_json::from_str::<Value>(lines[0]);
        assert!(first.is_ok());
        let first = match first {
            Ok(value) => value,
            Err(_) => return,
        };
        assert_eq!(first["event"], "tool_call");
        let second = serde_json::from_str::<Value>(lines[1]);
        assert!(second.is_ok());
        let second = match second {
            Ok(value) => value,
            Err(_) => return,
        };
        assert_eq!(second["event"], "hot_paths_snapshot");
    }
}
diff --git a/crates/libmcp/src/types.rs b/crates/libmcp/src/types.rs
new file mode 100644
index 0000000..f9a44a5
--- /dev/null
+++ b/crates/libmcp/src/types.rs
@@ -0,0 +1,50 @@
//! Fundamental library-wide types.
+ +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use std::num::NonZeroU64; +use thiserror::Error; + +/// A library invariant was violated. +#[derive(Debug, Clone, PartialEq, Eq, Error)] +#[error("libmcp invariant violated: {detail}")] +pub struct InvariantViolation { + detail: &'static str, +} + +impl InvariantViolation { + /// Creates a new invariant violation. + #[must_use] + pub const fn new(detail: &'static str) -> Self { + Self { detail } + } +} + +/// Monotonic worker generation identifier. +#[derive( + Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, +)] +#[serde(transparent)] +pub struct Generation(NonZeroU64); + +impl Generation { + /// Returns the first generation. + #[must_use] + pub const fn genesis() -> Self { + Self(NonZeroU64::MIN) + } + + /// Returns the inner integer value. + #[must_use] + pub const fn get(self) -> u64 { + self.0.get() + } + + /// Advances to the next generation, saturating on overflow. + #[must_use] + pub fn next(self) -> Self { + let next = self.get().saturating_add(1); + let non_zero = NonZeroU64::new(next).map_or(NonZeroU64::MAX, |value| value); + Self(non_zero) + } +} |