swarm repositories / source
about | summary | refs | log | tree | commit | diff
path: root/crates/phone-opus/src/mcp
diff options
context:
space:
mode:
Diffstat (limited to 'crates/phone-opus/src/mcp')
-rw-r--r-- crates/phone-opus/src/mcp/catalog.rs 107
-rw-r--r-- crates/phone-opus/src/mcp/fault.rs 214
-rw-r--r-- crates/phone-opus/src/mcp/host/binary.rs 41
-rw-r--r-- crates/phone-opus/src/mcp/host/mod.rs 3
-rw-r--r-- crates/phone-opus/src/mcp/host/process.rs 223
-rw-r--r-- crates/phone-opus/src/mcp/host/runtime.rs 779
-rw-r--r-- crates/phone-opus/src/mcp/mod.rs 10
-rw-r--r-- crates/phone-opus/src/mcp/output.rs 195
-rw-r--r-- crates/phone-opus/src/mcp/protocol.rs 74
-rw-r--r-- crates/phone-opus/src/mcp/service.rs 550
-rw-r--r-- crates/phone-opus/src/mcp/telemetry.rs 228
11 files changed, 2424 insertions, 0 deletions
diff --git a/crates/phone-opus/src/mcp/catalog.rs b/crates/phone-opus/src/mcp/catalog.rs
new file mode 100644
index 0000000..a7e7cf6
--- /dev/null
+++ b/crates/phone-opus/src/mcp/catalog.rs
@@ -0,0 +1,107 @@
+use libmcp::ReplayContract;
+use serde_json::{Value, json};
+
+use crate::mcp::output::with_common_presentation;
+
+/// Which side of the host/worker split executes a tool call.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub(crate) enum DispatchTarget {
+    /// Advertised as `"host"` in the tool annotations.
+    Host,
+    /// Advertised as `"worker"` in the tool annotations.
+    Worker,
+}
+
+/// Static catalog entry describing one MCP tool.
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+pub(crate) struct ToolSpec {
+    /// Tool name; also used as the annotation `title`.
+    pub(crate) name: &'static str,
+    /// Human-readable description advertised in the tool definition.
+    pub(crate) description: &'static str,
+    /// Whether the host or the worker runs the tool.
+    pub(crate) dispatch: DispatchTarget,
+    /// Replay contract recorded for this tool.
+    pub(crate) replay: ReplayContract,
+}
+
+impl ToolSpec {
+    /// Builds the `annotations` object for this tool, including the
+    /// `phoneOpus` extension carrying the dispatch target and replay contract.
+    fn annotation_json(self) -> Value {
+        let dispatch_label = match self.dispatch {
+            DispatchTarget::Host => "host",
+            DispatchTarget::Worker => "worker",
+        };
+        let replay_label = match self.replay {
+            ReplayContract::Convergent => "convergent",
+            ReplayContract::ProbeRequired => "probe_required",
+            ReplayContract::NeverReplay => "never_replay",
+        };
+        json!({
+            "title": self.name,
+            "readOnlyHint": true,
+            "destructiveHint": false,
+            "phoneOpus": {
+                "dispatch": dispatch_label,
+                "replayContract": replay_label,
+            }
+        })
+    }
+}
+
+/// Every tool this server advertises, in the order returned by `tools/list`.
+const TOOL_SPECS: &[ToolSpec] = &[
+    // The only worker-dispatched tool; it is never replayed.
+    ToolSpec {
+        name: "consult",
+        description: "Run a blocking consult against the system Claude Code install using a read-only built-in toolset and return the response.",
+        dispatch: DispatchTarget::Worker,
+        replay: ReplayContract::NeverReplay,
+    },
+    // Host-side introspection tools.
+    ToolSpec {
+        name: "health_snapshot",
+        description: "Read host lifecycle, worker generation, rollout state, and latest fault. Defaults to render=porcelain; use render=json for structured output.",
+        dispatch: DispatchTarget::Host,
+        replay: ReplayContract::Convergent,
+    },
+    ToolSpec {
+        name: "telemetry_snapshot",
+        description: "Read aggregate request and recovery telemetry for this session. Defaults to render=porcelain; use render=json for structured output.",
+        dispatch: DispatchTarget::Host,
+        replay: ReplayContract::Convergent,
+    },
+];
+
+/// Looks up a catalog entry by exact tool name; `None` when the tool is unknown.
+pub(crate) fn tool_spec(name: &str) -> Option<ToolSpec> {
+    TOOL_SPECS.iter().find(|spec| spec.name == name).copied()
+}
+
+/// Renders every catalog entry as a tool definition object
+/// (`name`, `description`, `inputSchema`, `annotations`).
+pub(crate) fn tool_definitions() -> Vec<Value> {
+    let mut definitions = Vec::with_capacity(TOOL_SPECS.len());
+    for spec in TOOL_SPECS {
+        definitions.push(json!({
+            "name": spec.name,
+            "description": spec.description,
+            "inputSchema": tool_schema(spec.name),
+            "annotations": spec.annotation_json(),
+        }));
+    }
+    definitions
+}
+
+/// Returns the JSON input schema for the named tool.
+///
+/// Known tools get their schema passed through `with_common_presentation`;
+/// an unknown name yields `Value::Null`.
+fn tool_schema(name: &str) -> Value {
+    let base_schema = match name {
+        "consult" => json!({
+            "type": "object",
+            "properties": {
+                "prompt": {
+                    "type": "string",
+                    "description": "Prompt to send to Claude Code."
+                },
+                "cwd": {
+                    "type": "string",
+                    "description": "Optional working directory for the Claude Code session. Relative paths resolve against the MCP host working directory."
+                },
+                "max_turns": {
+                    "type": "integer",
+                    "minimum": 1,
+                    "description": "Optional maximum number of Claude agent turns before stopping."
+                }
+            },
+            "required": ["prompt"]
+        }),
+        "health_snapshot" | "telemetry_snapshot" => json!({
+            "type": "object",
+            "properties": {}
+        }),
+        _ => return Value::Null,
+    };
+    with_common_presentation(base_schema)
+}
diff --git a/crates/phone-opus/src/mcp/fault.rs b/crates/phone-opus/src/mcp/fault.rs
new file mode 100644
index 0000000..5b23f79
--- /dev/null
+++ b/crates/phone-opus/src/mcp/fault.rs
@@ -0,0 +1,214 @@
+use libmcp::{Fault, FaultClass, FaultCode, Generation, RecoveryDirective, ToolErrorDetail};
+use serde::{Deserialize, Serialize};
+use serde_json::{Value, json};
+
+/// Stage label attached to a `FaultRecord` to say where the fault was raised.
+#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub(crate) enum FaultStage {
+    Host,
+    Worker,
+    Claude,
+    Transport,
+    Protocol,
+    /// Always used by `FaultRecord::rollout`.
+    Rollout,
+}
+
+/// A fault plus the host-side context needed to report it over JSON-RPC.
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub(crate) struct FaultRecord {
+    /// Underlying `libmcp` fault (class, code, directive, detail).
+    pub(crate) fault: Fault,
+    /// Stage that raised the fault.
+    pub(crate) stage: FaultStage,
+    /// Operation label, e.g. a method name or `tools/call:<tool>`.
+    pub(crate) operation: String,
+    /// Error code used when the fault is rendered as a JSON-RPC error.
+    pub(crate) jsonrpc_code: i64,
+    /// True when the recovery directive is anything other than `AbortRequest`.
+    pub(crate) retryable: bool,
+    /// Set by `mark_retried`; starts out false.
+    pub(crate) retried: bool,
+}
+
+impl FaultRecord {
+    /// JSON-RPC 2.0 "Invalid params" error code.
+    const JSONRPC_INVALID_PARAMS: i64 = -32602;
+    /// Implementation-defined code reported when the session is not initialized.
+    const JSONRPC_NOT_INITIALIZED: i64 = -32002;
+    /// JSON-RPC 2.0 "Internal error" error code.
+    const JSONRPC_INTERNAL_ERROR: i64 = -32603;
+
+    /// Non-retryable protocol fault for malformed or unsupported input.
+    pub(crate) fn invalid_input(
+        generation: Generation,
+        stage: FaultStage,
+        operation: impl Into<String>,
+        detail: impl Into<String>,
+    ) -> Self {
+        Self::new(
+            generation,
+            FaultClass::Protocol,
+            "invalid_input",
+            RecoveryDirective::AbortRequest,
+            stage,
+            operation,
+            detail,
+            Self::JSONRPC_INVALID_PARAMS,
+        )
+    }
+
+    /// Non-retryable protocol fault for requests made before initialization.
+    pub(crate) fn not_initialized(
+        generation: Generation,
+        stage: FaultStage,
+        operation: impl Into<String>,
+        detail: impl Into<String>,
+    ) -> Self {
+        Self::new(
+            generation,
+            FaultClass::Protocol,
+            "not_initialized",
+            RecoveryDirective::AbortRequest,
+            stage,
+            operation,
+            detail,
+            Self::JSONRPC_NOT_INITIALIZED,
+        )
+    }
+
+    /// Retryable transport fault (directive `RestartAndReplay`).
+    pub(crate) fn transport(
+        generation: Generation,
+        stage: FaultStage,
+        operation: impl Into<String>,
+        detail: impl Into<String>,
+    ) -> Self {
+        Self::new(
+            generation,
+            FaultClass::Transport,
+            "transport_failure",
+            RecoveryDirective::RestartAndReplay,
+            stage,
+            operation,
+            detail,
+            Self::JSONRPC_INTERNAL_ERROR,
+        )
+    }
+
+    /// Retryable process fault (directive `RestartAndReplay`).
+    pub(crate) fn process(
+        generation: Generation,
+        stage: FaultStage,
+        operation: impl Into<String>,
+        detail: impl Into<String>,
+    ) -> Self {
+        Self::new(
+            generation,
+            FaultClass::Process,
+            "process_failure",
+            RecoveryDirective::RestartAndReplay,
+            stage,
+            operation,
+            detail,
+            Self::JSONRPC_INTERNAL_ERROR,
+        )
+    }
+
+    /// Non-retryable fault reported for a downstream failure.
+    pub(crate) fn downstream(
+        generation: Generation,
+        stage: FaultStage,
+        operation: impl Into<String>,
+        detail: impl Into<String>,
+    ) -> Self {
+        Self::new(
+            generation,
+            FaultClass::Downstream,
+            "downstream_failure",
+            RecoveryDirective::AbortRequest,
+            stage,
+            operation,
+            detail,
+            Self::JSONRPC_INTERNAL_ERROR,
+        )
+    }
+
+    /// Non-retryable invariant-violation fault.
+    pub(crate) fn internal(
+        generation: Generation,
+        stage: FaultStage,
+        operation: impl Into<String>,
+        detail: impl Into<String>,
+    ) -> Self {
+        Self::new(
+            generation,
+            FaultClass::Invariant,
+            "internal_failure",
+            RecoveryDirective::AbortRequest,
+            stage,
+            operation,
+            detail,
+            Self::JSONRPC_INTERNAL_ERROR,
+        )
+    }
+
+    /// Retryable rollout fault; the stage is always `FaultStage::Rollout`.
+    pub(crate) fn rollout(
+        generation: Generation,
+        operation: impl Into<String>,
+        detail: impl Into<String>,
+    ) -> Self {
+        Self::new(
+            generation,
+            FaultClass::Rollout,
+            "rollout_failure",
+            RecoveryDirective::RestartAndReplay,
+            FaultStage::Rollout,
+            operation,
+            detail,
+            Self::JSONRPC_INTERNAL_ERROR,
+        )
+    }
+
+    /// Returns the record with `retried` set.
+    pub(crate) fn mark_retried(mut self) -> Self {
+        self.retried = true;
+        self
+    }
+
+    /// Human-readable message: the detail text of the underlying fault.
+    pub(crate) fn message(&self) -> &str {
+        self.fault.detail.as_str()
+    }
+
+    /// Condenses the record into the code/kind/message triple used by telemetry.
+    pub(crate) fn error_detail(&self) -> ToolErrorDetail {
+        let kind = self.fault.code.as_str().to_owned();
+        let message = self.message().to_owned();
+        ToolErrorDetail {
+            code: Some(self.jsonrpc_code),
+            kind: Some(kind),
+            message: Some(message),
+        }
+    }
+
+    /// Renders the record as a JSON-RPC `error` object, with the full record as `data`.
+    pub(crate) fn into_jsonrpc_error(self) -> Value {
+        json!({
+            "code": self.jsonrpc_code,
+            "message": self.message(),
+            "data": self,
+        })
+    }
+
+    /// Renders the record as an MCP tool result flagged with `isError: true`.
+    pub(crate) fn into_tool_result(self) -> Value {
+        json!({
+            "content": [{
+                "type": "text",
+                "text": self.message(),
+            }],
+            "structuredContent": self,
+            "isError": true,
+        })
+    }
+
+    /// Shared constructor: builds the `Fault` and derives `retryable` from the directive.
+    fn new(
+        generation: Generation,
+        class: FaultClass,
+        code: &'static str,
+        directive: RecoveryDirective,
+        stage: FaultStage,
+        operation: impl Into<String>,
+        detail: impl Into<String>,
+        jsonrpc_code: i64,
+    ) -> Self {
+        // Only an abort directive makes the fault non-retryable.
+        let retryable = directive != RecoveryDirective::AbortRequest;
+        let fault = Fault::new(generation, class, fault_code(code), directive, detail);
+        Self {
+            fault,
+            stage,
+            operation: operation.into(),
+            jsonrpc_code,
+            retryable,
+            retried: false,
+        }
+    }
+}
+
+/// Converts a static code string into a `FaultCode`.
+///
+/// Aborts the process if `FaultCode::try_new` rejects the string.
+fn fault_code(code: &'static str) -> FaultCode {
+    FaultCode::try_new(code.to_owned()).unwrap_or_else(|_| std::process::abort())
+}
diff --git a/crates/phone-opus/src/mcp/host/binary.rs b/crates/phone-opus/src/mcp/host/binary.rs
new file mode 100644
index 0000000..9ec7721
--- /dev/null
+++ b/crates/phone-opus/src/mcp/host/binary.rs
@@ -0,0 +1,41 @@
+use std::fs;
+use std::io;
+use std::path::{Path, PathBuf};
+
+use crate::mcp::protocol::BinaryFingerprint;
+
+/// Tracks the host executable on disk so a replaced binary can be detected.
+pub(crate) struct BinaryRuntime {
+    /// Path of the executable being watched.
+    pub(crate) path: PathBuf,
+    /// Fingerprint taken when this value was constructed.
+    startup_fingerprint: BinaryFingerprint,
+    /// False when any component of `path` is named `target`.
+    pub(crate) launch_path_stable: bool,
+}
+
+impl BinaryRuntime {
+    /// Fingerprints `path` and records whether it looks like a stable launch path.
+    ///
+    /// Fails when the file metadata cannot be read.
+    pub(crate) fn new(path: PathBuf) -> io::Result<Self> {
+        let startup_fingerprint = fingerprint_binary(&path)?;
+        // A `target` component marks the path as unstable.
+        let under_target = path
+            .components()
+            .any(|component| component.as_os_str() == "target");
+        Ok(Self {
+            path,
+            startup_fingerprint,
+            launch_path_stable: !under_target,
+        })
+    }
+
+    /// Returns true when the on-disk fingerprint differs from the startup one.
+    pub(crate) fn rollout_pending(&self) -> io::Result<bool> {
+        let current = fingerprint_binary(&self.path)?;
+        Ok(current != self.startup_fingerprint)
+    }
+}
+
+/// Fingerprints a file by its length and modification time (nanoseconds since the Unix epoch).
+///
+/// Fails when metadata is unavailable or the mtime precedes the epoch.
+fn fingerprint_binary(path: &Path) -> io::Result<BinaryFingerprint> {
+    let metadata = fs::metadata(path)?;
+    let modified = metadata.modified()?;
+    let since_epoch = modified
+        .duration_since(std::time::UNIX_EPOCH)
+        .map_err(|error| io::Error::other(format!("invalid binary mtime: {error}")))?;
+    Ok(BinaryFingerprint {
+        length_bytes: metadata.len(),
+        modified_unix_nanos: since_epoch.as_nanos(),
+    })
+}
diff --git a/crates/phone-opus/src/mcp/host/mod.rs b/crates/phone-opus/src/mcp/host/mod.rs
new file mode 100644
index 0000000..29cdcb1
--- /dev/null
+++ b/crates/phone-opus/src/mcp/host/mod.rs
@@ -0,0 +1,3 @@
+pub(crate) mod binary;
+pub(crate) mod process;
+pub(crate) mod runtime;
diff --git a/crates/phone-opus/src/mcp/host/process.rs b/crates/phone-opus/src/mcp/host/process.rs
new file mode 100644
index 0000000..d3c62c8
--- /dev/null
+++ b/crates/phone-opus/src/mcp/host/process.rs
@@ -0,0 +1,223 @@
+use std::io::{BufRead, BufReader, BufWriter, Write};
+use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
+
+use libmcp::Generation;
+use serde_json::Value;
+
+use crate::mcp::fault::{FaultRecord, FaultStage};
+use crate::mcp::protocol::{
+ HostRequestId, WorkerOperation, WorkerOutcome, WorkerRequest, WorkerResponse, WorkerSpawnConfig,
+};
+
+/// Owns the worker child process and its line-delimited JSON pipes.
+pub(super) struct WorkerSupervisor {
+    /// Spawn settings (the executable to launch).
+    config: WorkerSpawnConfig,
+    /// Generation of the current (or most recently spawned) worker.
+    generation: Generation,
+    /// Whether a worker has ever been spawned; later spawns advance the generation.
+    has_spawned: bool,
+    /// One-shot flag: kill the worker right after the next request is written.
+    crash_before_reply_once: bool,
+    child: Option<Child>,
+    stdin: Option<BufWriter<ChildStdin>>,
+    stdout: Option<BufReader<ChildStdout>>,
+}
+
+impl WorkerSupervisor {
+    /// Creates a supervisor with no running worker; one is spawned lazily on first use.
+    pub(super) fn new(
+        config: WorkerSpawnConfig,
+        generation: Generation,
+        has_spawned: bool,
+    ) -> Self {
+        Self {
+            config,
+            generation,
+            has_spawned,
+            // The crash hook stays disarmed until `arm_crash_once` is called.
+            crash_before_reply_once: false,
+            child: None,
+            stdin: None,
+            stdout: None,
+        }
+    }
+
+    /// Generation of the current (or most recently spawned) worker.
+    pub(super) fn generation(&self) -> Generation {
+        self.generation
+    }
+
+    /// Whether a worker has been spawned at least once (including restored state).
+    pub(super) fn has_spawned(&self) -> bool {
+        self.has_spawned
+    }
+
+    /// Sends one operation to the worker and blocks until its reply line arrives.
+    ///
+    /// Spawns a worker first if none is alive. The request is written as a
+    /// single JSON line; the reply is read as a single JSON line.
+    ///
+    /// # Errors
+    ///
+    /// Returns the worker's own fault when it replies with one, or a transport
+    /// fault when the pipes are missing, the write fails, the worker exits, or
+    /// the reply cannot be read or decoded. Whenever the reply stream can no
+    /// longer be trusted (EOF, read error, undecodable line) the worker is
+    /// killed so a late or partial reply is never mistaken for the answer to a
+    /// later request.
+    pub(super) fn execute(
+        &mut self,
+        request_id: HostRequestId,
+        operation: WorkerOperation,
+    ) -> Result<Value, FaultRecord> {
+        self.ensure_worker()?;
+        // Capture the generation once so faults can be built without borrowing `self`.
+        let generation = self.generation;
+        let transport = |operation: &'static str, detail: String| {
+            FaultRecord::transport(generation, FaultStage::Transport, operation, detail)
+        };
+
+        let request = WorkerRequest::Execute {
+            id: request_id,
+            operation,
+        };
+        let stdin = self.stdin.as_mut().ok_or_else(|| {
+            transport("worker.stdin", "worker stdin is not available".to_owned())
+        })?;
+        serde_json::to_writer(&mut *stdin, &request).map_err(|error| {
+            transport(
+                "worker.write",
+                format!("failed to encode worker request: {error}"),
+            )
+        })?;
+        stdin.write_all(b"\n").map_err(|error| {
+            transport(
+                "worker.write",
+                format!("failed to frame worker request: {error}"),
+            )
+        })?;
+        stdin.flush().map_err(|error| {
+            transport(
+                "worker.write",
+                format!("failed to flush worker request: {error}"),
+            )
+        })?;
+
+        // Fault-injection hook: simulate a worker crash after the request was sent.
+        if self.crash_before_reply_once {
+            self.crash_before_reply_once = false;
+            self.kill_current_worker();
+            return Err(transport(
+                "worker.read",
+                "worker crashed before replying".to_owned(),
+            ));
+        }
+
+        let mut line = String::new();
+        // Finish the read before touching `self` again so the worker can be
+        // killed on failure.
+        let read = match self.stdout.as_mut() {
+            Some(stdout) => stdout.read_line(&mut line),
+            None => {
+                return Err(transport(
+                    "worker.stdout",
+                    "worker stdout is not available".to_owned(),
+                ));
+            }
+        };
+        let bytes = match read {
+            Ok(bytes) => bytes,
+            Err(error) => {
+                // The pipe position is unknown after a failed read.
+                self.kill_current_worker();
+                return Err(transport(
+                    "worker.read",
+                    format!("failed to read worker response: {error}"),
+                ));
+            }
+        };
+        if bytes == 0 {
+            self.kill_current_worker();
+            return Err(transport(
+                "worker.read",
+                "worker exited before replying".to_owned(),
+            ));
+        }
+        let response = match serde_json::from_str::<WorkerResponse>(&line) {
+            Ok(response) => response,
+            Err(error) => {
+                // An undecodable line means the real reply may still be in
+                // flight; drop the worker rather than desynchronize.
+                self.kill_current_worker();
+                return Err(transport(
+                    "worker.read",
+                    format!("invalid worker response: {error}"),
+                ));
+            }
+        };
+        match response.outcome {
+            WorkerOutcome::Success { result } => Ok(result),
+            WorkerOutcome::Fault { fault } => Err(fault),
+        }
+    }
+
+    /// Kills the current worker (if any) and spawns a replacement.
+    pub(super) fn restart(&mut self) -> Result<(), FaultRecord> {
+        self.kill_current_worker();
+        self.ensure_worker()
+    }
+
+    /// Reports whether the worker process is still running.
+    ///
+    /// When the child has exited, or its status cannot be queried, the child
+    /// handle and both pipes are dropped.
+    pub(super) fn is_alive(&mut self) -> bool {
+        let still_running = match self.child.as_mut() {
+            Some(child) => matches!(child.try_wait(), Ok(None)),
+            None => return false,
+        };
+        if !still_running {
+            self.child = None;
+            self.stdin = None;
+            self.stdout = None;
+        }
+        still_running
+    }
+
+    /// Arms the one-shot hook that kills the worker after the next request is written.
+    pub(super) fn arm_crash_once(&mut self) {
+        self.crash_before_reply_once = true;
+    }
+
+    /// Makes sure a live worker exists, spawning `<executable> mcp worker --generation N` if needed.
+    ///
+    /// The first spawn uses the seeded generation; every later spawn advances
+    /// it. Supervisor state is only updated once the child and both pipes
+    /// have been obtained.
+    fn ensure_worker(&mut self) -> Result<(), FaultRecord> {
+        if self.is_alive() {
+            return Ok(());
+        }
+        let generation = match self.has_spawned {
+            true => self.generation.next(),
+            false => self.generation,
+        };
+        let spawned = Command::new(&self.config.executable)
+            .arg("mcp")
+            .arg("worker")
+            .arg("--generation")
+            .arg(generation.get().to_string())
+            .stdin(Stdio::piped())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::inherit())
+            .spawn();
+        let mut child = match spawned {
+            Ok(child) => child,
+            Err(error) => {
+                return Err(FaultRecord::process(
+                    generation,
+                    FaultStage::Transport,
+                    "worker.spawn",
+                    format!("failed to spawn worker: {error}"),
+                ));
+            }
+        };
+        let Some(stdin) = child.stdin.take() else {
+            return Err(FaultRecord::internal(
+                generation,
+                FaultStage::Transport,
+                "worker.spawn",
+                "worker stdin pipe was not created",
+            ));
+        };
+        let Some(stdout) = child.stdout.take() else {
+            return Err(FaultRecord::internal(
+                generation,
+                FaultStage::Transport,
+                "worker.spawn",
+                "worker stdout pipe was not created",
+            ));
+        };
+        self.generation = generation;
+        self.has_spawned = true;
+        self.child = Some(child);
+        self.stdin = Some(BufWriter::new(stdin));
+        self.stdout = Some(BufReader::new(stdout));
+        Ok(())
+    }
+
+    /// Kills and reaps the current worker (best effort), then drops its pipes.
+    fn kill_current_worker(&mut self) {
+        if let Some(mut child) = self.child.take() {
+            // Failures are ignored: the process may already be gone.
+            let _ = child.kill();
+            let _ = child.wait();
+        }
+        self.stdin = None;
+        self.stdout = None;
+    }
+}
diff --git a/crates/phone-opus/src/mcp/host/runtime.rs b/crates/phone-opus/src/mcp/host/runtime.rs
new file mode 100644
index 0000000..5922766
--- /dev/null
+++ b/crates/phone-opus/src/mcp/host/runtime.rs
@@ -0,0 +1,779 @@
+use std::fs;
+use std::io::{self, BufRead, Write};
+#[cfg(unix)]
+use std::os::unix::process::CommandExt;
+use std::path::PathBuf;
+use std::process::Command;
+use std::time::Instant;
+
+use dirs::{home_dir, state_dir};
+use libmcp::{
+ FramedMessage, Generation, HostSessionKernel, ReplayContract, RequestId, RolloutState,
+ TelemetryLog, ToolOutcome, load_snapshot_file_from_env, remove_snapshot_file,
+ write_snapshot_file,
+};
+use serde_json::{Value, json};
+
+use crate::mcp::catalog::{DispatchTarget, tool_definitions, tool_spec};
+use crate::mcp::fault::{FaultRecord, FaultStage};
+use crate::mcp::host::binary::BinaryRuntime;
+use crate::mcp::host::process::WorkerSupervisor;
+use crate::mcp::output::{
+ ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success,
+};
+use crate::mcp::protocol::{
+ FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed, PROTOCOL_VERSION, SERVER_NAME,
+ WORKER_CRASH_ONCE_ENV, WorkerOperation, WorkerSpawnConfig,
+};
+use crate::mcp::telemetry::ServerTelemetry;
+
+/// Runs the MCP host loop: reads one JSON-RPC message per stdin line, writes
+/// any response to stdout, and checks for a pending rollout after each message.
+///
+/// Blank lines are skipped. Returns when stdin reaches EOF or on the first
+/// I/O or rollout error.
+pub(crate) fn run_host() -> Result<(), Box<dyn std::error::Error>> {
+    let stdin = io::stdin();
+    let mut stdout = io::stdout().lock();
+    let mut host = HostRuntime::new()?;
+
+    for line in stdin.lock().lines() {
+        let line = line?;
+        if line.trim().is_empty() {
+            continue;
+        }
+
+        if let Some(response) = host.handle_line(&line) {
+            write_message(&mut stdout, &response)?;
+        }
+        host.maybe_roll_forward()?;
+    }
+
+    Ok(())
+}
+
+/// State of the long-lived MCP host process.
+struct HostRuntime {
+    session_kernel: HostSessionKernel,
+    /// In-memory counters; carried across re-exec via `HostStateSeed`.
+    telemetry: ServerTelemetry,
+    /// Append-only tool-completion log on disk.
+    telemetry_log: TelemetryLog,
+    /// Next id handed out by `allocate_request_id`.
+    next_request_id: u64,
+    worker: WorkerSupervisor,
+    binary: BinaryRuntime,
+    /// Operation key (from `FORCE_ROLLOUT_ENV`) that triggers a forced rollout once.
+    force_rollout_key: Option<String>,
+    force_rollout_consumed: bool,
+    /// Set when a forced rollout should happen after the current message.
+    rollout_requested: bool,
+    /// Operation (from `WORKER_CRASH_ONCE_ENV`) that triggers one injected worker crash.
+    worker_crash_once_key: Option<String>,
+    worker_crash_once_consumed: bool,
+}
+
+impl HostRuntime {
+    /// Builds the host, restoring state from a snapshot when one is provided
+    /// via `HOST_STATE_ENV` and falling back to cold defaults otherwise.
+    fn new() -> Result<Self, Box<dyn std::error::Error>> {
+        let executable = std::env::current_exe()?;
+        let binary = BinaryRuntime::new(executable.clone())?;
+        let restored = restore_host_state()?;
+        let seed = restored.as_ref();
+
+        let session_kernel = match seed {
+            Some(seed) => HostSessionKernel::from_restored(seed.session_kernel.clone().restore()?),
+            None => HostSessionKernel::cold(),
+        };
+        let telemetry = seed.map_or_else(ServerTelemetry::default, |seed| seed.telemetry.clone());
+        // Request ids start at 1 even if a snapshot carries 0.
+        let next_request_id = seed.map_or(1, |seed| seed.next_request_id.max(1));
+        let worker_generation = seed.map_or(Generation::genesis(), |seed| seed.worker_generation);
+        let worker_spawned = seed.is_some_and(|seed| seed.worker_spawned);
+        let force_rollout_consumed = seed.is_some_and(|seed| seed.force_rollout_consumed);
+        let worker_crash_once_consumed = seed.is_some_and(|seed| seed.worker_crash_once_consumed);
+
+        let telemetry_log = open_telemetry_log()?;
+        let worker = WorkerSupervisor::new(
+            WorkerSpawnConfig { executable },
+            worker_generation,
+            worker_spawned,
+        );
+
+        Ok(Self {
+            session_kernel,
+            telemetry,
+            telemetry_log,
+            next_request_id,
+            worker,
+            binary,
+            force_rollout_key: std::env::var(FORCE_ROLLOUT_ENV).ok(),
+            force_rollout_consumed,
+            rollout_requested: false,
+            worker_crash_once_key: std::env::var(WORKER_CRASH_ONCE_ENV).ok(),
+            worker_crash_once_consumed,
+        })
+    }
+
+    /// Parses one input line and handles it; a parse failure becomes a
+    /// JSON-RPC error with a null id.
+    fn handle_line(&mut self, line: &str) -> Option<Value> {
+        match FramedMessage::parse(line.as_bytes().to_vec()) {
+            Ok(frame) => self.handle_frame(frame),
+            Err(error) => Some(jsonrpc_error(
+                Value::Null,
+                FaultRecord::invalid_input(
+                    self.worker.generation(),
+                    FaultStage::Protocol,
+                    "jsonrpc.parse",
+                    format!("parse error: {error}"),
+                ),
+            )),
+        }
+    }
+
+    /// Handles one parsed frame: dispatches it, records telemetry, and builds the reply.
+    ///
+    /// Returns `None` for frames without a `method` and for successful
+    /// requests that carry no id. Failed `tools/call` requests are reported
+    /// as tool results; other failures become JSON-RPC errors (null id when
+    /// the message had none).
+    fn handle_frame(&mut self, frame: FramedMessage) -> Option<Value> {
+        self.session_kernel.observe_client_frame(&frame);
+        let object = match frame.value.as_object() {
+            Some(object) => object,
+            None => {
+                return Some(jsonrpc_error(
+                    Value::Null,
+                    FaultRecord::invalid_input(
+                        self.worker.generation(),
+                        FaultStage::Protocol,
+                        "jsonrpc.message",
+                        "invalid request: expected JSON object",
+                    ),
+                ));
+            }
+        };
+        let method = object.get("method").and_then(Value::as_str)?;
+        let id = object.get("id").cloned();
+        let params = object.get("params").cloned().unwrap_or_else(|| json!({}));
+        let operation_key = operation_key(method, &params);
+        let started_at = Instant::now();
+
+        self.telemetry.record_request(&operation_key);
+        let dispatched = self.dispatch(&frame, method, params, id.clone());
+        let latency_ms = elapsed_ms(started_at.elapsed());
+        let response = match dispatched {
+            Ok(result) => {
+                self.telemetry.record_success(
+                    &operation_key,
+                    latency_ms,
+                    self.worker.generation(),
+                    self.worker.is_alive(),
+                );
+                // A reply needs both an id and a result.
+                id.zip(result).map(|(id, result)| jsonrpc_result(id, result))
+            }
+            Err(fault) => {
+                self.telemetry.record_error(
+                    &operation_key,
+                    &fault,
+                    latency_ms,
+                    self.worker.generation(),
+                );
+                let reply = match id {
+                    Some(id) if method == "tools/call" => {
+                        jsonrpc_result(id, fault.into_tool_result())
+                    }
+                    Some(id) => jsonrpc_error(id, fault),
+                    None => jsonrpc_error(Value::Null, fault),
+                };
+                Some(reply)
+            }
+        };
+
+        if self.should_force_rollout(&operation_key) {
+            self.force_rollout_consumed = true;
+            self.telemetry.record_rollout();
+            self.rollout_requested = true;
+        }
+
+        response
+    }
+
+    /// Routes a JSON-RPC method to its handler.
+    ///
+    /// `initialize`, `ping` and the handled notifications work at any time;
+    /// every other method requires a fully initialized session. `Ok(None)`
+    /// means there is nothing to reply with.
+    fn dispatch(
+        &mut self,
+        request_frame: &FramedMessage,
+        method: &str,
+        params: Value,
+        request_id: Option<Value>,
+    ) -> Result<Option<Value>, FaultRecord> {
+        match method {
+            "initialize" => Ok(Some(json!({
+                "protocolVersion": PROTOCOL_VERSION,
+                "capabilities": {
+                    "tools": { "listChanged": false }
+                },
+                "serverInfo": {
+                    "name": SERVER_NAME,
+                    "version": env!("CARGO_PKG_VERSION")
+                },
+                "instructions": "Use consult for blocking Claude Code consultations. Each call runs the system Claude Code binary in print-mode JSON with no configured MCP servers, a read-only built-in toolset, and dontAsk permission mode. Edit tools are unavailable."
+            }))),
+            "notifications/initialized" => {
+                if self.seed_captured() {
+                    Ok(None)
+                } else {
+                    Err(FaultRecord::not_initialized(
+                        self.worker.generation(),
+                        FaultStage::Host,
+                        "notifications/initialized",
+                        "received initialized notification before initialize",
+                    ))
+                }
+            }
+            "notifications/cancelled" => Ok(None),
+            "ping" => Ok(Some(json!({}))),
+            "tools/list" => {
+                self.require_initialized(method)?;
+                Ok(Some(json!({ "tools": tool_definitions() })))
+            }
+            "tools/call" => {
+                self.require_initialized(method)?;
+                let result = self.dispatch_tool_call(request_frame, params, request_id)?;
+                Ok(Some(result))
+            }
+            other => {
+                // The initialization check comes first, as for known methods.
+                self.require_initialized(other)?;
+                Err(FaultRecord::invalid_input(
+                    self.worker.generation(),
+                    FaultStage::Protocol,
+                    other,
+                    format!("method `{other}` is not implemented"),
+                ))
+            }
+        }
+    }
+
+    /// Decodes a `tools/call` envelope and runs the tool on the host or the worker.
+    ///
+    /// Host tools are timed here and logged to the telemetry log; worker
+    /// tools do their own bookkeeping in `dispatch_worker_operation`.
+    fn dispatch_tool_call(
+        &mut self,
+        request_frame: &FramedMessage,
+        params: Value,
+        _request_id: Option<Value>,
+    ) -> Result<Value, FaultRecord> {
+        let envelope =
+            deserialize::<ToolCallEnvelope>(params, "tools/call", self.worker.generation())?;
+        let Some(spec) = tool_spec(&envelope.name) else {
+            return Err(FaultRecord::invalid_input(
+                self.worker.generation(),
+                FaultStage::Host,
+                format!("tools/call:{}", envelope.name),
+                format!("unknown tool `{}`", envelope.name),
+            ));
+        };
+        if spec.dispatch == DispatchTarget::Worker {
+            return self.dispatch_worker_tool(request_frame, spec, envelope.arguments);
+        }
+
+        let started_at = Instant::now();
+        let request_id = request_id_from_frame(request_frame);
+        let result = self.handle_host_tool(&envelope.name, envelope.arguments);
+        self.record_host_tool_completion(
+            request_frame,
+            request_id.as_ref(),
+            elapsed_ms(started_at.elapsed()),
+            result.as_ref().err(),
+        );
+        result
+    }
+
+    /// Wraps a tool call as a `WorkerOperation::CallTool` and forwards it to
+    /// the worker under the operation label `tools/call:<name>`.
+    fn dispatch_worker_tool(
+        &mut self,
+        request_frame: &FramedMessage,
+        spec: crate::mcp::catalog::ToolSpec,
+        arguments: Value,
+    ) -> Result<Value, FaultRecord> {
+        let worker_operation = WorkerOperation::CallTool {
+            name: spec.name.to_owned(),
+            arguments,
+        };
+        self.dispatch_worker_operation(
+            request_frame,
+            format!("tools/call:{}", spec.name),
+            spec.replay,
+            worker_operation,
+        )
+    }
+
+    /// Forwards one operation to the worker, retrying once when allowed.
+    ///
+    /// A retry happens only when the replay contract is `Convergent` and the
+    /// first fault is retryable: the worker is restarted and the operation is
+    /// re-sent under the same host request id. Faults produced on the retry
+    /// path (including a failed restart) are marked as retried.
+    ///
+    /// Completion bookkeeping always runs, whatever the outcome: the pending
+    /// request is taken back from the session kernel and the completion is
+    /// written to the telemetry log. A failed worker restart therefore no
+    /// longer leaves the forwarded request pending or missing from the log.
+    fn dispatch_worker_operation(
+        &mut self,
+        request_frame: &FramedMessage,
+        operation: String,
+        replay: ReplayContract,
+        worker_operation: WorkerOperation,
+    ) -> Result<Value, FaultRecord> {
+        // Fault-injection hook driven by `WORKER_CRASH_ONCE_ENV`.
+        if self.should_crash_worker_once(&operation) {
+            self.worker.arm_crash_once();
+        }
+
+        self.session_kernel
+            .record_forwarded_request(request_frame, replay);
+        let forwarded_request_id = request_id_from_frame(request_frame);
+        let host_request_id = self.allocate_request_id();
+        let started_at = Instant::now();
+        let mut replay_attempts = 0;
+
+        let outcome = match self
+            .worker
+            .execute(host_request_id, worker_operation.clone())
+        {
+            Err(fault) if replay == ReplayContract::Convergent && fault.retryable => {
+                replay_attempts = 1;
+                self.telemetry.record_retry(&operation);
+                match self.worker.restart() {
+                    Ok(()) => {
+                        self.telemetry
+                            .record_worker_restart(self.worker.generation());
+                        self.worker
+                            .execute(host_request_id, worker_operation)
+                            .map_err(FaultRecord::mark_retried)
+                    }
+                    // Keep the restart failure as the outcome instead of
+                    // returning early, so the bookkeeping below still runs.
+                    Err(restart_fault) => Err(restart_fault.mark_retried()),
+                }
+            }
+            first_attempt => first_attempt,
+        };
+
+        let completed = forwarded_request_id
+            .as_ref()
+            .and_then(|request_id| self.session_kernel.take_completed_request(request_id));
+        self.record_worker_tool_completion(
+            forwarded_request_id.as_ref(),
+            completed.as_ref(),
+            elapsed_ms(started_at.elapsed()),
+            replay_attempts,
+            outcome.as_ref().err(),
+        );
+        outcome
+    }
+
+    /// Runs a host-side tool (`health_snapshot` or `telemetry_snapshot`).
+    ///
+    /// Presentation options are split off the arguments first; the remaining
+    /// arguments are unused. Any other tool name is an invalid-input fault.
+    fn handle_host_tool(&mut self, name: &str, arguments: Value) -> Result<Value, FaultRecord> {
+        let operation = format!("tools/call:{name}");
+        let generation = self.worker.generation();
+        let (presentation, _arguments) =
+            split_presentation(arguments, &operation, generation, FaultStage::Host)?;
+        match name {
+            "health_snapshot" => {
+                let pending = self.binary.rollout_pending().map_err(|error| {
+                    FaultRecord::rollout(generation, &operation, error.to_string())
+                })?;
+                let rollout = if pending {
+                    RolloutState::Pending
+                } else {
+                    RolloutState::Stable
+                };
+                let health = self.telemetry.health_snapshot(rollout);
+                let output = health_snapshot_output(
+                    &health,
+                    self.worker.is_alive(),
+                    self.binary.launch_path_stable,
+                    generation,
+                )?;
+                tool_success(
+                    output,
+                    presentation,
+                    generation,
+                    FaultStage::Host,
+                    &operation,
+                )
+            }
+            "telemetry_snapshot" => {
+                let snapshot = self.telemetry.telemetry_snapshot();
+                let output = telemetry_snapshot_output(
+                    &snapshot,
+                    self.telemetry.host_rollouts(),
+                    generation,
+                )?;
+                tool_success(
+                    output,
+                    presentation,
+                    generation,
+                    FaultStage::Host,
+                    &operation,
+                )
+            }
+            other => Err(FaultRecord::invalid_input(
+                generation,
+                FaultStage::Host,
+                format!("tools/call:{other}"),
+                format!("unknown host tool `{other}`"),
+            )),
+        }
+    }
+
+    /// Fails with a not-initialized fault unless the session handshake is complete.
+    fn require_initialized(&self, operation: &str) -> Result<(), FaultRecord> {
+        if !self.session_initialized() {
+            return Err(FaultRecord::not_initialized(
+                self.worker.generation(),
+                FaultStage::Host,
+                operation,
+                "client must call initialize and notifications/initialized before normal operations",
+            ));
+        }
+        Ok(())
+    }
+
+    /// True once the session kernel holds an initialization seed that also
+    /// carries the initialized notification.
+    fn session_initialized(&self) -> bool {
+        match self.session_kernel.initialization_seed() {
+            Some(seed) => seed.initialized_notification.is_some(),
+            None => false,
+        }
+    }
+
+    /// True once the session kernel holds an initialization seed.
+    fn seed_captured(&self) -> bool {
+        let seed = self.session_kernel.initialization_seed();
+        seed.is_some()
+    }
+
+    /// Hands out the next host request id and advances the counter.
+    fn allocate_request_id(&mut self) -> HostRequestId {
+        let current = self.next_request_id;
+        self.next_request_id += 1;
+        HostRequestId(current)
+    }
+
+    /// Re-executes the host when a rollout was requested or the binary changed on disk.
+    ///
+    /// A rollout triggered only by a changed binary is counted in telemetry
+    /// here; a requested rollout is not counted again.
+    fn maybe_roll_forward(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+        let binary_pending = self.binary.rollout_pending()?;
+        if self.rollout_requested {
+            return self.roll_forward();
+        }
+        if !binary_pending {
+            return Ok(());
+        }
+        self.telemetry.record_rollout();
+        self.roll_forward()
+    }
+
+    /// Snapshots host state to a file and re-executes `<binary> mcp serve`
+    /// with `HOST_STATE_ENV` pointing at that snapshot.
+    ///
+    /// On Unix a successful `exec` never returns; reaching the code after it
+    /// means the exec failed. The snapshot file is then removed (best effort)
+    /// and the error is returned. Non-Unix targets always fail with `Unsupported`.
+    fn roll_forward(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+        let state = HostStateSeed {
+            session_kernel: self.session_kernel.snapshot(),
+            telemetry: self.telemetry.clone(),
+            next_request_id: self.next_request_id,
+            worker_generation: self.worker.generation(),
+            worker_spawned: self.worker.has_spawned(),
+            force_rollout_consumed: self.force_rollout_consumed,
+            worker_crash_once_consumed: self.worker_crash_once_consumed,
+        };
+        let state_path = write_snapshot_file("phone-opus-mcp-host-reexec", &state)?;
+        let mut command = Command::new(&self.binary.path);
+        let _ = command
+            .arg("mcp")
+            .arg("serve")
+            .env(HOST_STATE_ENV, &state_path);
+        #[cfg(unix)]
+        {
+            let error = command.exec();
+            let _ = remove_snapshot_file(&state_path);
+            Err(Box::new(error))
+        }
+        #[cfg(not(unix))]
+        {
+            let _ = remove_snapshot_file(&state_path);
+            Err(Box::new(io::Error::new(
+                io::ErrorKind::Unsupported,
+                "host rollout requires unix exec support",
+            )))
+        }
+    }
+
+    /// True when the forced-rollout key matches `operation` and has not been used yet.
+    fn should_force_rollout(&self, operation: &str) -> bool {
+        !self.force_rollout_consumed && self.force_rollout_key.as_deref() == Some(operation)
+    }
+
+    /// True the first time the crash-once key matches `operation`; the hook
+    /// is consumed as a side effect.
+    fn should_crash_worker_once(&mut self, operation: &str) -> bool {
+        let armed = !self.worker_crash_once_consumed
+            && self.worker_crash_once_key.as_deref() == Some(operation);
+        if armed {
+            self.worker_crash_once_consumed = true;
+        }
+        armed
+    }
+
+    /// Logs completion of a host-side tool call.
+    ///
+    /// Nothing is logged when the request has no id or the frame yields no
+    /// tool-call metadata. Host tools are never replayed, so the replay count is 0.
+    fn record_host_tool_completion(
+        &mut self,
+        request_frame: &FramedMessage,
+        request_id: Option<&RequestId>,
+        latency_ms: u64,
+        fault: Option<&FaultRecord>,
+    ) {
+        if let Some(request_id) = request_id {
+            if let Some(tool_meta) = libmcp::parse_tool_call_meta(request_frame, "tools/call") {
+                self.record_tool_completion(request_id, &tool_meta, latency_ms, 0, fault);
+            }
+        }
+    }
+
+    /// Logs completion of a worker-side tool call.
+    ///
+    /// Nothing is logged unless the request id, the completed pending
+    /// request, and its tool-call metadata are all present.
+    fn record_worker_tool_completion(
+        &mut self,
+        request_id: Option<&RequestId>,
+        completed: Option<&libmcp::CompletedPendingRequest>,
+        latency_ms: u64,
+        replay_attempts: u8,
+        fault: Option<&FaultRecord>,
+    ) {
+        let tool_meta = completed.and_then(|completed| completed.request.tool_call_meta.as_ref());
+        if let (Some(request_id), Some(tool_meta)) = (request_id, tool_meta) {
+            self.record_tool_completion(request_id, tool_meta, latency_ms, replay_attempts, fault);
+        }
+    }
+
+    /// Appends one tool completion to the telemetry log.
+    ///
+    /// A write failure is reported on stderr and otherwise ignored, so
+    /// logging never fails the request.
+    fn record_tool_completion(
+        &mut self,
+        request_id: &RequestId,
+        tool_meta: &libmcp::ToolCallMeta,
+        latency_ms: u64,
+        replay_attempts: u8,
+        fault: Option<&FaultRecord>,
+    ) {
+        let outcome = match fault {
+            Some(_) => ToolOutcome::Error,
+            None => ToolOutcome::Ok,
+        };
+        let detail = fault.map_or_else(libmcp::ToolErrorDetail::default, FaultRecord::error_detail);
+        let written = self.telemetry_log.record_tool_completion(
+            request_id,
+            tool_meta,
+            latency_ms,
+            replay_attempts,
+            outcome,
+            detail,
+        );
+        if let Err(error) = written {
+            eprintln!("phone_opus telemetry write failed: {error}");
+        }
+    }
+}
+
+/// Loads the host state snapshot named by `HOST_STATE_ENV`, if there is one.
+fn restore_host_state() -> Result<Option<HostStateSeed>, Box<dyn std::error::Error>> {
+    let seed = load_snapshot_file_from_env(HOST_STATE_ENV)?;
+    Ok(seed)
+}
+
+/// Opens `<state root>/mcp/telemetry.jsonl`, creating the directory if needed.
+fn open_telemetry_log() -> io::Result<TelemetryLog> {
+    let log_root = state_root()?.join("mcp");
+    fs::create_dir_all(&log_root)?;
+    let log_path = log_root.join("telemetry.jsonl");
+    let repo = repo_root()?;
+    TelemetryLog::new(log_path.as_path(), repo.as_path(), 1)
+}
+
+/// Returns the process's current working directory, which is used as the repository root.
+fn repo_root() -> io::Result<PathBuf> {
+    let cwd = std::env::current_dir()?;
+    Ok(cwd)
+}
+
+/// Resolves the per-server state directory.
+///
+/// Uses the platform state directory when available, otherwise
+/// `~/.local/state`; either way `SERVER_NAME` is appended. Fails with
+/// `NotFound` when neither can be resolved.
+fn state_root() -> io::Result<PathBuf> {
+    state_dir()
+        .or_else(|| home_dir().map(|home| home.join(".local").join("state")))
+        .map(|root| root.join(SERVER_NAME))
+        .ok_or_else(|| {
+            io::Error::new(
+                io::ErrorKind::NotFound,
+                "failed to resolve a state directory for telemetry",
+            )
+        })
+}
+
+/// Renders a health snapshot as concise JSON, full JSON, and a one-line summary.
+///
+/// The summary always carries readiness, generation and worker liveness; the
+/// last fault, a pending rollout and an unstable launch path are appended
+/// only when they apply.
+fn health_snapshot_output(
+    health: &libmcp::HealthSnapshot,
+    worker_alive: bool,
+    launch_path_stable: bool,
+    generation: Generation,
+) -> Result<ToolOutput, FaultRecord> {
+    let ready = matches!(health.state, libmcp::LifecycleState::Ready);
+    let rollout_pending = matches!(health.rollout, Some(RolloutState::Pending));
+    let worker_generation = health.generation.get();
+
+    let concise = json!({
+        "ready": ready,
+        "worker_generation": worker_generation,
+        "worker_alive": worker_alive,
+        "rollout_pending": rollout_pending,
+        "launch_path_stable": launch_path_stable,
+    });
+    let full = json!({
+        "health": health,
+        "worker_alive": worker_alive,
+        "launch_path_stable": launch_path_stable,
+    });
+
+    let state_label = if ready { "ready" } else { "not-ready" };
+    let mut line = format!("{state_label} gen={worker_generation}");
+    if let Some(last_fault) = health.last_fault.as_ref() {
+        line.push_str(&format!(" last_fault={}", last_fault.code.as_str()));
+    }
+    let worker_label = if worker_alive {
+        " worker=alive"
+    } else {
+        " worker=dead"
+    };
+    line.push_str(worker_label);
+    if rollout_pending {
+        line.push_str(" rollout=pending");
+    }
+    if !launch_path_stable {
+        line.push_str(" launch_path=unstable");
+    }
+
+    fallback_detailed_tool_output(
+        &concise,
+        &full,
+        line,
+        None,
+        libmcp::SurfaceKind::Ops,
+        generation,
+        FaultStage::Host,
+        "tools/call:health_snapshot",
+    )
+}
+
+/// Builds the tool output for the `telemetry_snapshot` host tool.
+///
+/// The concise view and the text rendering list aggregate counters plus the
+/// first six entries of `telemetry.methods` (in the order the snapshot holds
+/// them); the full view embeds the entire snapshot. `host_rollouts` is reported
+/// alongside the snapshot's own restart counter.
+fn telemetry_snapshot_output(
+    telemetry: &libmcp::TelemetrySnapshot,
+    host_rollouts: u64,
+    generation: Generation,
+) -> Result<ToolOutput, FaultRecord> {
+    let totals = &telemetry.totals;
+    let hot_methods: Vec<_> = telemetry.methods.iter().take(6).collect();
+
+    let hot_methods_json: Vec<Value> = hot_methods
+        .iter()
+        .map(|method| {
+            json!({
+                "method": method.method,
+                "requests": method.request_count,
+                "response_errors": method.response_error_count,
+                "transport_faults": method.transport_fault_count,
+                "retries": method.retry_count,
+            })
+        })
+        .collect();
+    let concise = json!({
+        "requests": totals.request_count,
+        "successes": totals.success_count,
+        "response_errors": totals.response_error_count,
+        "transport_faults": totals.transport_fault_count,
+        "retries": totals.retry_count,
+        "worker_restarts": telemetry.restart_count,
+        "host_rollouts": host_rollouts,
+        "hot_methods": hot_methods_json,
+    });
+    let full = json!({
+        "telemetry": telemetry,
+        "host_rollouts": host_rollouts,
+    });
+
+    let mut lines = vec![
+        format!(
+            "requests={} success={} response_error={} transport_fault={} retry={}",
+            totals.request_count,
+            totals.success_count,
+            totals.response_error_count,
+            totals.transport_fault_count,
+            totals.retry_count
+        ),
+        format!(
+            "worker_restarts={} host_rollouts={host_rollouts}",
+            telemetry.restart_count,
+        ),
+    ];
+    if !hot_methods.is_empty() {
+        lines.push("hot methods:".to_owned());
+        lines.extend(hot_methods.iter().map(|method| {
+            format!(
+                "{} req={} err={} transport={} retry={}",
+                method.method,
+                method.request_count,
+                method.response_error_count,
+                method.transport_fault_count,
+                method.retry_count,
+            )
+        }));
+    }
+
+    fallback_detailed_tool_output(
+        &concise,
+        &full,
+        lines.join("\n"),
+        None,
+        libmcp::SurfaceKind::Ops,
+        generation,
+        FaultStage::Host,
+        "tools/call:telemetry_snapshot",
+    )
+}
+
+/// Decodes request parameters into `T`.
+///
+/// A decoding failure becomes an invalid-input fault at the protocol stage,
+/// tagged with `operation` and `generation`, whose detail is
+/// `invalid params: <serde error>`.
+fn deserialize<T: for<'de> serde::Deserialize<'de>>(
+    value: Value,
+    operation: &str,
+    generation: Generation,
+) -> Result<T, FaultRecord> {
+    match serde_json::from_value(value) {
+        Ok(parsed) => Ok(parsed),
+        Err(error) => Err(FaultRecord::invalid_input(
+            generation,
+            FaultStage::Protocol,
+            operation,
+            format!("invalid params: {error}"),
+        )),
+    }
+}
+
+/// Derives the operation key for a request.
+///
+/// `tools/call` requests are keyed as `tools/call:<name>` when `params.name`
+/// is a string, and as plain `tools/call` otherwise. Every other method is
+/// keyed by its own name.
+fn operation_key(method: &str, params: &Value) -> String {
+    if method != "tools/call" {
+        return method.to_owned();
+    }
+    match params.get("name").and_then(Value::as_str) {
+        Some(name) => format!("tools/call:{name}"),
+        None => "tools/call".to_owned(),
+    }
+}
+
+/// Extracts the request id from a framed message.
+///
+/// Only frames classified as requests carry an id here; notifications,
+/// responses and unclassifiable frames yield `None`. The match stays
+/// exhaustive so a new envelope kind forces a decision at compile time.
+fn request_id_from_frame(frame: &FramedMessage) -> Option<RequestId> {
+    let envelope = frame.classify();
+    match envelope {
+        libmcp::RpcEnvelopeKind::Notification { .. }
+        | libmcp::RpcEnvelopeKind::Response { .. }
+        | libmcp::RpcEnvelopeKind::Unknown => None,
+        libmcp::RpcEnvelopeKind::Request { id, .. } => Some(id),
+    }
+}
+
+/// Wraps `result` in a JSON-RPC 2.0 success response addressed to `id`.
+fn jsonrpc_result(id: Value, result: Value) -> Value {
+    json!({ "jsonrpc": "2.0", "id": id, "result": result })
+}
+
+/// Wraps `fault` in a JSON-RPC 2.0 error response addressed to `id`.
+///
+/// The `error` member is whatever `FaultRecord::into_jsonrpc_error` produces.
+fn jsonrpc_error(id: Value, fault: FaultRecord) -> Value {
+    let error = fault.into_jsonrpc_error();
+    json!({ "jsonrpc": "2.0", "id": id, "error": error })
+}
+
+/// Writes one newline-terminated JSON message and flushes the writer.
+///
+/// Serialization errors are converted to `io::Error`, so callers handle a
+/// single error type.
+fn write_message(stdout: &mut impl Write, message: &Value) -> io::Result<()> {
+    serde_json::to_writer(&mut *stdout, message).map_err(io::Error::from)?;
+    stdout.write_all(b"\n")?;
+    stdout.flush()
+}
+
+/// Converts a duration to whole milliseconds, saturating at `u64::MAX` when
+/// the millisecond count does not fit in 64 bits.
+fn elapsed_ms(duration: std::time::Duration) -> u64 {
+    let millis: u128 = duration.as_millis();
+    millis.try_into().unwrap_or(u64::MAX)
+}
+
+/// Deserialized shape of a tool-call parameter object: a tool `name` plus its
+/// `arguments`.
+#[derive(Debug, serde::Deserialize)]
+struct ToolCallEnvelope {
+ /// Name of the tool being invoked.
+ name: String,
+ /// Tool arguments; an empty JSON object is substituted when the field is
+ /// absent from the input.
+ #[serde(default = "empty_json_object")]
+ arguments: Value,
+}
+
+/// Serde default for missing tool arguments: the empty JSON object `{}`.
+fn empty_json_object() -> Value {
+    Value::Object(serde_json::Map::new())
+}
diff --git a/crates/phone-opus/src/mcp/mod.rs b/crates/phone-opus/src/mcp/mod.rs
new file mode 100644
index 0000000..666598f
--- /dev/null
+++ b/crates/phone-opus/src/mcp/mod.rs
@@ -0,0 +1,10 @@
+mod catalog;
+mod fault;
+mod host;
+mod output;
+mod protocol;
+mod service;
+mod telemetry;
+
+pub(crate) use host::runtime::run_host;
+pub(crate) use service::run_worker;
diff --git a/crates/phone-opus/src/mcp/output.rs b/crates/phone-opus/src/mcp/output.rs
new file mode 100644
index 0000000..90673b3
--- /dev/null
+++ b/crates/phone-opus/src/mcp/output.rs
@@ -0,0 +1,195 @@
+use libmcp::{
+ DetailLevel, FallbackJsonProjection, JsonPorcelainConfig, ProjectionError, RenderMode,
+ SurfaceKind, ToolProjection, render_json_porcelain, with_presentation_properties,
+};
+use serde::Serialize;
+use serde_json::{Value, json};
+
+use crate::mcp::fault::{FaultRecord, FaultStage};
+
+/// `max_lines` value handed to the JSON porcelain renderer for full-detail
+/// output (see `full_porcelain_config`).
+const FULL_PORCELAIN_MAX_LINES: usize = 40;
+/// `max_inline_chars` value handed to the JSON porcelain renderer for
+/// full-detail output (see `full_porcelain_config`).
+const FULL_PORCELAIN_MAX_INLINE_CHARS: usize = 512;
+
+/// How a tool result should be presented: which text rendering to use and how
+/// much detail to include. Extracted from tool arguments by
+/// `split_presentation`.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub(crate) struct Presentation {
+ /// Text rendering mode (porcelain text or pretty-printed JSON).
+ pub(crate) render: RenderMode,
+ /// Detail level selecting the concise or the full projection.
+ pub(crate) detail: DetailLevel,
+}
+
+/// A tool result prepared at both detail levels, in structured and text form.
+#[derive(Debug, Clone)]
+pub(crate) struct ToolOutput {
+ /// Structured payload for concise detail.
+ concise: Value,
+ /// Structured payload for full detail.
+ full: Value,
+ /// Porcelain text for concise detail.
+ concise_text: String,
+ /// Porcelain text for full detail; when `None`, the text is rendered from
+ /// `full` on demand.
+ full_text: Option<String>,
+}
+
+impl ToolOutput {
+    /// Assembles an output from already-projected values and texts.
+    ///
+    /// `full_text` may be `None`, in which case full-detail porcelain text is
+    /// derived from `full` when requested.
+    pub(crate) fn from_values(
+        concise: Value,
+        full: Value,
+        concise_text: impl Into<String>,
+        full_text: Option<String>,
+    ) -> Self {
+        let concise_text = concise_text.into();
+        Self {
+            concise,
+            full,
+            concise_text,
+            full_text,
+        }
+    }
+
+    /// Borrows the structured payload matching `detail`.
+    fn structured(&self, detail: DetailLevel) -> &Value {
+        match detail {
+            DetailLevel::Full => &self.full,
+            DetailLevel::Concise => &self.concise,
+        }
+    }
+
+    /// Produces the porcelain text matching `detail`.
+    ///
+    /// Full detail prefers the explicit `full_text` and otherwise renders
+    /// `full` with the limits from `full_porcelain_config`.
+    fn porcelain_text(&self, detail: DetailLevel) -> String {
+        match detail {
+            DetailLevel::Concise => self.concise_text.clone(),
+            DetailLevel::Full => match self.full_text.as_deref() {
+                Some(text) => text.to_owned(),
+                None => render_json_porcelain(&self.full, full_porcelain_config()),
+            },
+        }
+    }
+}
+
+impl Default for Presentation {
+    /// Porcelain rendering at concise detail.
+    fn default() -> Self {
+        let render = RenderMode::Porcelain;
+        let detail = DetailLevel::Concise;
+        Self { render, detail }
+    }
+}
+
+/// Separates the presentation controls (`render`, `detail`) from tool arguments.
+///
+/// Returns the parsed [`Presentation`] together with the remaining arguments.
+/// Non-object arguments are returned untouched with the default presentation.
+/// For objects, both keys are removed; a missing key falls back to porcelain /
+/// concise respectively.
+///
+/// # Errors
+///
+/// An invalid-input fault (attributed to `stage`, `operation`, `generation`)
+/// when `render` or `detail` is present but not a valid value. `render` is
+/// checked first.
+pub(crate) fn split_presentation(
+    arguments: Value,
+    operation: &str,
+    generation: libmcp::Generation,
+    stage: FaultStage,
+) -> Result<(Presentation, Value), FaultRecord> {
+    let mut object = match arguments {
+        Value::Object(object) => object,
+        other => return Ok((Presentation::default(), other)),
+    };
+
+    let render = match object.remove("render") {
+        None => RenderMode::Porcelain,
+        Some(raw) => serde_json::from_value::<RenderMode>(raw).map_err(|error| {
+            FaultRecord::invalid_input(
+                generation,
+                stage,
+                operation,
+                format!("invalid render mode: {error}"),
+            )
+        })?,
+    };
+
+    let detail = match object.remove("detail") {
+        None => DetailLevel::Concise,
+        Some(raw) => serde_json::from_value::<DetailLevel>(raw).map_err(|error| {
+            FaultRecord::invalid_input(
+                generation,
+                stage,
+                operation,
+                format!("invalid detail level: {error}"),
+            )
+        })?,
+    };
+
+    let presentation = Presentation { render, detail };
+    Ok((presentation, Value::Object(object)))
+}
+
+/// Builds a [`ToolOutput`] from a projection and its text renderings.
+///
+/// The concise projection is computed first, then the full one; a failure in
+/// either is reported as an internal fault attributed to `stage`, `operation`
+/// and `generation`.
+pub(crate) fn projected_tool_output(
+    projection: &impl ToolProjection,
+    concise_text: impl Into<String>,
+    full_text: Option<String>,
+    generation: libmcp::Generation,
+    stage: FaultStage,
+    operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+    let to_fault =
+        |error: ProjectionError| projection_fault(error, generation, stage, operation);
+    let concise = projection.concise_projection().map_err(&to_fault)?;
+    let full = projection.full_projection().map_err(&to_fault)?;
+    let output = ToolOutput::from_values(concise, full, concise_text, full_text);
+    Ok(output)
+}
+
+/// Builds a [`ToolOutput`] from two serializable payloads via
+/// `FallbackJsonProjection`.
+///
+/// `kind` is forwarded to the projection constructor. Any projection failure
+/// is reported as an internal fault attributed to `stage`, `operation` and
+/// `generation`.
+pub(crate) fn fallback_detailed_tool_output(
+    concise: &impl Serialize,
+    full: &impl Serialize,
+    concise_text: impl Into<String>,
+    full_text: Option<String>,
+    kind: SurfaceKind,
+    generation: libmcp::Generation,
+    stage: FaultStage,
+    operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+    let projection = match FallbackJsonProjection::new(concise, full, kind) {
+        Ok(projection) => projection,
+        Err(error) => return Err(projection_fault(error, generation, stage, operation)),
+    };
+    projected_tool_output(
+        &projection,
+        concise_text,
+        full_text,
+        generation,
+        stage,
+        operation,
+    )
+}
+
+/// Renders a successful tool result as a tool-call result object.
+///
+/// The result carries a single text content item, the structured payload for
+/// the requested detail level under `structuredContent`, and `isError: false`.
+/// In JSON render mode the text is the pretty-printed structured payload;
+/// otherwise it is the porcelain text.
+///
+/// # Errors
+///
+/// An internal fault if pretty-printing the structured payload fails.
+pub(crate) fn tool_success(
+    output: ToolOutput,
+    presentation: Presentation,
+    generation: libmcp::Generation,
+    stage: FaultStage,
+    operation: &str,
+) -> Result<Value, FaultRecord> {
+    let detail = presentation.detail;
+    let structured = output.structured(detail).clone();
+    let text = match presentation.render {
+        RenderMode::Json => match serde_json::to_string_pretty(&structured) {
+            Ok(pretty) => pretty,
+            Err(error) => {
+                return Err(FaultRecord::internal(
+                    generation,
+                    stage,
+                    operation,
+                    error.to_string(),
+                ));
+            }
+        },
+        RenderMode::Porcelain => output.porcelain_text(detail),
+    };
+    let content = json!([{ "type": "text", "text": text }]);
+    Ok(json!({
+        "content": content,
+        "structuredContent": structured,
+        "isError": false,
+    }))
+}
+
+/// Returns `schema` after passing it through libmcp's
+/// `with_presentation_properties`. Presumably this adds the shared `render`
+/// and `detail` properties to a tool input schema — confirm against libmcp.
+pub(crate) fn with_common_presentation(schema: Value) -> Value {
+ with_presentation_properties(schema)
+}
+
+/// Converts a projection error into an internal fault whose detail is the
+/// error's display text.
+fn projection_fault(
+    error: ProjectionError,
+    generation: libmcp::Generation,
+    stage: FaultStage,
+    operation: &str,
+) -> FaultRecord {
+    let detail = error.to_string();
+    FaultRecord::internal(generation, stage, operation, detail)
+}
+
+/// Renderer limits used when full-detail porcelain text has to be derived from
+/// the structured payload: `FULL_PORCELAIN_MAX_LINES` lines and
+/// `FULL_PORCELAIN_MAX_INLINE_CHARS` inline characters.
+const fn full_porcelain_config() -> JsonPorcelainConfig {
+ JsonPorcelainConfig {
+ max_lines: FULL_PORCELAIN_MAX_LINES,
+ max_inline_chars: FULL_PORCELAIN_MAX_INLINE_CHARS,
+ }
+}
diff --git a/crates/phone-opus/src/mcp/protocol.rs b/crates/phone-opus/src/mcp/protocol.rs
new file mode 100644
index 0000000..6662fa9
--- /dev/null
+++ b/crates/phone-opus/src/mcp/protocol.rs
@@ -0,0 +1,74 @@
+use std::path::PathBuf;
+
+use libmcp::{Generation, HostSessionKernelSnapshot};
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+use crate::mcp::telemetry::ServerTelemetry;
+
+/// Protocol version string (presumably the MCP revision this server
+/// advertises — confirm at the use site).
+pub(crate) const PROTOCOL_VERSION: &str = "2025-11-25";
+/// Server name; also used as the leaf directory name under the state root.
+pub(crate) const SERVER_NAME: &str = "phone_opus";
+/// Environment variable consulted when restoring a persisted host state seed.
+pub(crate) const HOST_STATE_ENV: &str = "PHONE_OPUS_MCP_HOST_STATE";
+/// Environment variable name; judging by the name, a test hook that forces a
+/// rollout — confirm at the use site.
+pub(crate) const FORCE_ROLLOUT_ENV: &str = "PHONE_OPUS_MCP_TEST_FORCE_ROLLOUT_KEY";
+/// Environment variable name; judging by the name, a test hook that crashes
+/// the worker once — confirm at the use site.
+pub(crate) const WORKER_CRASH_ONCE_ENV: &str = "PHONE_OPUS_MCP_TEST_WORKER_CRASH_ONCE_KEY";
+/// Environment variable that overrides the Claude Code executable path
+/// (the default executable name is `claude`).
+pub(crate) const CLAUDE_BIN_ENV: &str = "PHONE_OPUS_CLAUDE_BIN";
+/// Comma-separated tool list passed to Claude Code via `--tools`.
+pub(crate) const CLAUDE_TOOLSET: &str = "Bash,Read,Grep,Glob,LS,WebFetch,WebSearch";
+/// MCP configuration passed to Claude Code via `--mcp-config`: no servers.
+pub(crate) const EMPTY_MCP_CONFIG: &str = "{\"mcpServers\":{}}";
+
+/// Serializable snapshot of host state, used to seed a host from a previously
+/// persisted snapshot (see `HOST_STATE_ENV`).
+///
+/// NOTE(review): field meanings below are inferred from names and types;
+/// confirm against the host runtime that produces and consumes the seed.
+#[derive(Clone, Debug, Deserialize, Serialize)]
+pub(crate) struct HostStateSeed {
+ /// Snapshot of the libmcp host session kernel.
+ pub(crate) session_kernel: HostSessionKernelSnapshot,
+ /// Accumulated server telemetry.
+ pub(crate) telemetry: ServerTelemetry,
+ /// Presumably the next host request id to hand out.
+ pub(crate) next_request_id: u64,
+ /// Generation of the worker at snapshot time.
+ pub(crate) worker_generation: Generation,
+ /// Presumably whether a worker had been spawned at snapshot time.
+ pub(crate) worker_spawned: bool,
+ /// Presumably whether the forced-rollout test hook has already fired.
+ pub(crate) force_rollout_consumed: bool,
+ /// Presumably whether the crash-once test hook has already fired.
+ pub(crate) worker_crash_once_consumed: bool,
+}
+
+/// Identifier attached to a host-to-worker request and echoed in the worker's
+/// response. Serialized transparently as a bare integer.
+#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
+#[serde(transparent)]
+pub(crate) struct HostRequestId(pub(crate) u64);
+
+/// Message sent to the worker, one JSON document per line.
+///
+/// Serialized with an internal `kind` tag in snake_case.
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+#[serde(tag = "kind", rename_all = "snake_case")]
+pub(crate) enum WorkerRequest {
+ /// Run `operation` and answer with a `WorkerResponse` carrying the same `id`.
+ Execute {
+ id: HostRequestId,
+ operation: WorkerOperation,
+ },
+}
+
+/// Work the worker can be asked to perform.
+///
+/// Serialized with an internal `kind` tag in snake_case.
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+#[serde(tag = "kind", rename_all = "snake_case")]
+pub(crate) enum WorkerOperation {
+ /// Invoke the tool `name` with the given JSON `arguments`.
+ CallTool { name: String, arguments: Value },
+}
+
+/// Worker's answer to a `WorkerRequest::Execute`.
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub(crate) struct WorkerResponse {
+ /// Id copied from the request being answered.
+ pub(crate) id: HostRequestId,
+ /// Result of the operation.
+ pub(crate) outcome: WorkerOutcome,
+}
+
+/// Success-or-fault result of a worker operation.
+///
+/// Serialized with an internal `status` tag in snake_case.
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+#[serde(tag = "status", rename_all = "snake_case")]
+pub(crate) enum WorkerOutcome {
+ /// The operation completed; `result` is its JSON value.
+ Success {
+ result: Value,
+ },
+ /// The operation failed; `fault` describes why.
+ Fault {
+ fault: crate::mcp::fault::FaultRecord,
+ },
+}
+
+/// Size and modification time of a file, comparable for equality.
+///
+/// NOTE(review): presumably fingerprints the server binary so a replaced
+/// executable can be detected — confirm at the use site.
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub(crate) struct BinaryFingerprint {
+ /// File length in bytes.
+ pub(crate) length_bytes: u64,
+ /// Modification time as nanoseconds since the Unix epoch.
+ pub(crate) modified_unix_nanos: u128,
+}
+
+/// Settings needed to launch a worker process.
+#[derive(Clone, Debug)]
+pub(crate) struct WorkerSpawnConfig {
+ /// Path of the executable to launch.
+ pub(crate) executable: PathBuf,
+}
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs
new file mode 100644
index 0000000..2472887
--- /dev/null
+++ b/crates/phone-opus/src/mcp/service.rs
@@ -0,0 +1,550 @@
+use std::collections::BTreeMap;
+use std::io::{self, BufRead, Write};
+use std::path::{Path, PathBuf};
+use std::process::Command;
+
+use libmcp::{Generation, SurfaceKind};
+use serde::Deserialize;
+use serde_json::{Value, json};
+use thiserror::Error;
+
+use crate::mcp::fault::{FaultRecord, FaultStage};
+use crate::mcp::output::{
+ ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success,
+};
+use crate::mcp::protocol::{CLAUDE_BIN_ENV, CLAUDE_TOOLSET, EMPTY_MCP_CONFIG};
+
+/// Runs the worker loop: reads one JSON `WorkerRequest` per stdin line and
+/// writes one JSON `WorkerResponse` per stdout line, flushing after each.
+///
+/// Blank lines are skipped. Tool failures are reported in-band as
+/// `WorkerOutcome::Fault`. The loop ends cleanly when stdin is exhausted.
+///
+/// # Errors
+///
+/// Returns on the first stdin read error, undecodable request line, or stdout
+/// write failure; the loop does not continue past such an error.
+pub(crate) fn run_worker(generation: u64) -> Result<(), Box<dyn std::error::Error>> {
+    use crate::mcp::protocol::{WorkerOutcome, WorkerRequest, WorkerResponse};
+
+    let mut service = WorkerService::new(generation_from_wire(generation));
+    let stdin = io::stdin();
+    let mut stdout = io::stdout().lock();
+
+    for line in stdin.lock().lines() {
+        let line = line?;
+        if line.trim().is_empty() {
+            continue;
+        }
+
+        // `Execute` is the only request variant, so this pattern is irrefutable.
+        let WorkerRequest::Execute { id, operation } =
+            serde_json::from_str::<WorkerRequest>(&line)?;
+        let outcome = service.execute(operation).map_or_else(
+            |fault| WorkerOutcome::Fault { fault },
+            |result| WorkerOutcome::Success { result },
+        );
+        let response = WorkerResponse { id, outcome };
+
+        serde_json::to_writer(&mut stdout, &response)?;
+        stdout.write_all(b"\n")?;
+        stdout.flush()?;
+    }
+
+    Ok(())
+}
+
+/// Executes worker operations on behalf of the host.
+struct WorkerService {
+    /// Generation stamped onto every fault this worker reports.
+    generation: Generation,
+}
+
+impl WorkerService {
+    /// Creates a service reporting faults under `generation`.
+    fn new(generation: Generation) -> Self {
+        Self { generation }
+    }
+
+    /// Dispatches a worker operation and returns its JSON result.
+    fn execute(
+        &mut self,
+        operation: crate::mcp::protocol::WorkerOperation,
+    ) -> Result<Value, FaultRecord> {
+        let crate::mcp::protocol::WorkerOperation::CallTool { name, arguments } = operation;
+        self.call_tool(&name, arguments)
+    }
+
+    /// Runs the named tool and renders its output.
+    ///
+    /// Presentation controls are split off the arguments first, so an invalid
+    /// `render`/`detail` value is reported even for an unknown tool name.
+    /// `consult` is the only tool handled here; any other name is an
+    /// invalid-input fault.
+    fn call_tool(&mut self, name: &str, arguments: Value) -> Result<Value, FaultRecord> {
+        let generation = self.generation;
+        let operation = format!("tools/call:{name}");
+        let (presentation, arguments) =
+            split_presentation(arguments, &operation, generation, FaultStage::Worker)?;
+
+        if name != "consult" {
+            return Err(FaultRecord::invalid_input(
+                generation,
+                FaultStage::Worker,
+                &operation,
+                format!("unknown worker tool `{name}`"),
+            ));
+        }
+
+        let args = deserialize::<ConsultArgs>(arguments, &operation, generation)?;
+        let request = ConsultRequest::parse(args)
+            .map_err(|error| invalid_consult_request(generation, &operation, error))?;
+        let response = invoke_claude(&request)
+            .map_err(|error| consult_fault(generation, &operation, error))?;
+        let output = consult_output(&request, &response, generation, &operation)?;
+
+        tool_success(
+            output,
+            presentation,
+            generation,
+            FaultStage::Worker,
+            &operation,
+        )
+    }
+}
+
+/// Raw, unvalidated arguments of the `consult` tool as they arrive on the wire.
+#[derive(Debug, Deserialize)]
+struct ConsultArgs {
+ /// Prompt text; required.
+ prompt: String,
+ /// Optional working directory; relative paths are resolved against the
+ /// worker's current directory.
+ cwd: Option<String>,
+ /// Optional turn limit; must be greater than zero when present.
+ max_turns: Option<u64>,
+}
+
+/// A validated `consult` request.
+#[derive(Debug, Clone)]
+struct ConsultRequest {
+    /// Non-blank prompt.
+    prompt: PromptText,
+    /// Canonical, existing directory to run in.
+    cwd: WorkingDirectory,
+    /// Optional positive turn limit.
+    max_turns: Option<TurnLimit>,
+}
+
+impl ConsultRequest {
+    /// Validates raw arguments field by field: prompt, then working
+    /// directory, then turn limit. The first failure is returned.
+    fn parse(args: ConsultArgs) -> Result<Self, ConsultRequestError> {
+        let prompt = PromptText::parse(args.prompt)?;
+        let cwd = WorkingDirectory::resolve(args.cwd)?;
+        let max_turns = match args.max_turns {
+            Some(raw) => Some(TurnLimit::parse(raw)?),
+            None => None,
+        };
+        Ok(Self {
+            prompt,
+            cwd,
+            max_turns,
+        })
+    }
+}
+
+/// Prompt text that is guaranteed to contain at least one non-whitespace
+/// character. The text is stored exactly as supplied (not trimmed).
+#[derive(Debug, Clone)]
+struct PromptText(String);
+
+impl PromptText {
+    /// Accepts `raw` unless it is empty or whitespace-only.
+    fn parse(raw: String) -> Result<Self, ConsultRequestError> {
+        let has_content = !raw.trim().is_empty();
+        if has_content {
+            Ok(Self(raw))
+        } else {
+            Err(ConsultRequestError::EmptyPrompt)
+        }
+    }
+
+    /// Borrows the prompt text.
+    fn as_str(&self) -> &str {
+        &self.0
+    }
+}
+
+/// A canonicalized path that was verified to be a directory at resolution time.
+#[derive(Debug, Clone)]
+struct WorkingDirectory(PathBuf);
+
+impl WorkingDirectory {
+    /// Resolves the requested working directory.
+    ///
+    /// * `None` selects the process's current directory.
+    /// * A relative path is joined onto the process's current directory.
+    /// * An absolute path is used as given.
+    ///
+    /// The process's current directory is only looked up when it is actually
+    /// needed, so an absolute `raw` still resolves when the worker's own
+    /// current directory is unavailable (e.g. it has been removed).
+    ///
+    /// # Errors
+    ///
+    /// * `CurrentDir` — the current directory was needed but could not be read.
+    /// * `Canonicalize` — the candidate path could not be canonicalized
+    ///   (for instance, it does not exist).
+    /// * `NotDirectory` — the canonical path exists but is not a directory.
+    fn resolve(raw: Option<String>) -> Result<Self, ConsultRequestError> {
+        let candidate = match raw.map(PathBuf::from) {
+            Some(requested) if requested.is_absolute() => requested,
+            requested => {
+                let base = std::env::current_dir()
+                    .map_err(|source| ConsultRequestError::CurrentDir { source })?;
+                match requested {
+                    Some(relative) => base.join(relative),
+                    None => base,
+                }
+            }
+        };
+        let canonical =
+            candidate
+                .canonicalize()
+                .map_err(|source| ConsultRequestError::Canonicalize {
+                    path: candidate.display().to_string(),
+                    source,
+                })?;
+        if !canonical.is_dir() {
+            return Err(ConsultRequestError::NotDirectory(
+                canonical.display().to_string(),
+            ));
+        }
+        Ok(Self(canonical))
+    }
+
+    /// Borrows the canonical path.
+    fn as_path(&self) -> &Path {
+        self.0.as_path()
+    }
+
+    /// Renders the canonical path for display (lossy for non-UTF-8 paths).
+    fn display(&self) -> String {
+        self.0.display().to_string()
+    }
+}
+
+/// A turn limit that is guaranteed to be greater than zero.
+#[derive(Debug, Clone, Copy)]
+struct TurnLimit(u64);
+
+impl TurnLimit {
+    /// Accepts any non-zero value; zero is rejected as `InvalidTurnLimit`.
+    fn parse(raw: u64) -> Result<Self, ConsultRequestError> {
+        match raw {
+            0 => Err(ConsultRequestError::InvalidTurnLimit),
+            limit => Ok(Self(limit)),
+        }
+    }
+
+    /// Returns the limit as a plain integer.
+    fn get(self) -> u64 {
+        let Self(limit) = self;
+        limit
+    }
+}
+
+/// Reasons a `consult` request is rejected during validation, before Claude
+/// Code is invoked.
+#[derive(Debug, Error)]
+enum ConsultRequestError {
+ /// The prompt was empty or whitespace-only.
+ #[error("prompt must not be empty")]
+ EmptyPrompt,
+ /// The process's current directory could not be determined.
+ #[error("failed to resolve the current working directory: {source}")]
+ CurrentDir { source: io::Error },
+ /// The requested working directory could not be canonicalized.
+ #[error("failed to resolve working directory `{path}`: {source}")]
+ Canonicalize { path: String, source: io::Error },
+ /// The requested working directory resolved to something that is not a
+ /// directory.
+ #[error("working directory `{0}` is not a directory")]
+ NotDirectory(String),
+ /// `max_turns` was zero.
+ #[error("max_turns must be greater than zero")]
+ InvalidTurnLimit,
+}
+
+/// Reasons an invocation of Claude Code fails after the request was validated.
+#[derive(Debug, Error)]
+enum ConsultInvocationError {
+ /// The Claude Code process could not be started or its output collected.
+ #[error("failed to spawn Claude Code: {0}")]
+ Spawn(#[source] io::Error),
+ /// The process exited successfully but its stdout was not the expected JSON.
+ #[error("Claude Code returned non-JSON output: {0}")]
+ InvalidJson(String),
+ /// The process or its result envelope reported a failure; the payload is
+ /// the message to surface.
+ #[error("{0}")]
+ Downstream(String),
+}
+
+/// The JSON object parsed from Claude Code's stdout.
+///
+/// `type` and `is_error` are required; every `Option` field may be absent and
+/// `permission_denials` defaults to an empty list.
+#[derive(Debug, Deserialize)]
+struct ClaudeJsonEnvelope {
+ /// Envelope discriminator; only `"result"` is accepted by the caller.
+ #[serde(rename = "type")]
+ envelope_type: String,
+ /// Result subtype; the caller treats anything but `"success"` as a failure.
+ subtype: Option<String>,
+ /// Whether Claude Code flagged the run as an error.
+ is_error: bool,
+ duration_ms: Option<u64>,
+ duration_api_ms: Option<u64>,
+ num_turns: Option<u64>,
+ /// Response text on success; used as the error message on failure when
+ /// non-blank.
+ result: Option<String>,
+ stop_reason: Option<String>,
+ session_id: Option<String>,
+ total_cost_usd: Option<f64>,
+ /// Opaque usage object, passed through untouched.
+ usage: Option<Value>,
+ /// Opaque per-model usage object (camelCase on the wire); the first key is
+ /// reported as the model name.
+ #[serde(rename = "modelUsage")]
+ model_usage: Option<Value>,
+ #[serde(default)]
+ permission_denials: Vec<Value>,
+ fast_mode_state: Option<String>,
+ uuid: Option<String>,
+}
+
+/// A successful consult: the envelope's fields with defaults applied, plus the
+/// working directory the request ran in.
+#[derive(Debug)]
+struct ConsultResponse {
+ /// Working directory copied from the request.
+ cwd: WorkingDirectory,
+ /// Response text; empty when the envelope carried none.
+ result: String,
+ /// Reported duration; `0` when the envelope carried none.
+ duration_ms: u64,
+ duration_api_ms: Option<u64>,
+ /// Reported turn count; `0` when the envelope carried none.
+ num_turns: u64,
+ stop_reason: Option<String>,
+ session_id: Option<String>,
+ total_cost_usd: Option<f64>,
+ usage: Option<Value>,
+ model_usage: Option<Value>,
+ permission_denials: Vec<Value>,
+ fast_mode_state: Option<String>,
+ uuid: Option<String>,
+}
+
+impl ConsultResponse {
+    /// Returns the first key of the `model_usage` object, if any.
+    ///
+    /// `None` when `model_usage` is absent, is not a JSON object, or is empty.
+    fn model_name(&self) -> Option<String> {
+        self.model_usage
+            .as_ref()
+            .and_then(Value::as_object)
+            .and_then(|models| models.keys().next().cloned())
+    }
+}
+
+/// Decodes tool arguments into `T`.
+///
+/// A decoding failure becomes an invalid-input fault at the protocol stage,
+/// tagged with `operation` and `generation`, whose detail is
+/// `invalid params: <serde error>`.
+fn deserialize<T: for<'de> Deserialize<'de>>(
+    value: Value,
+    operation: &str,
+    generation: Generation,
+) -> Result<T, FaultRecord> {
+    match serde_json::from_value(value) {
+        Ok(parsed) => Ok(parsed),
+        Err(error) => Err(FaultRecord::invalid_input(
+            generation,
+            FaultStage::Protocol,
+            operation,
+            format!("invalid params: {error}"),
+        )),
+    }
+}
+
+/// Converts a request-validation error into an invalid-input fault at the
+/// worker stage, using the error's display text as the detail.
+fn invalid_consult_request(
+    generation: Generation,
+    operation: &str,
+    error: ConsultRequestError,
+) -> FaultRecord {
+    let detail = error.to_string();
+    FaultRecord::invalid_input(generation, FaultStage::Worker, operation, detail)
+}
+
+/// Converts an invocation error into a fault at the Claude stage.
+///
+/// Spawn failures become process faults; malformed output and reported
+/// failures both become downstream faults carrying the error's message.
+fn consult_fault(
+    generation: Generation,
+    operation: &str,
+    error: ConsultInvocationError,
+) -> FaultRecord {
+    let detail = match error {
+        ConsultInvocationError::Spawn(source) => {
+            return FaultRecord::process(
+                generation,
+                FaultStage::Claude,
+                operation,
+                source.to_string(),
+            );
+        }
+        ConsultInvocationError::InvalidJson(detail)
+        | ConsultInvocationError::Downstream(detail) => detail,
+    };
+    FaultRecord::downstream(generation, FaultStage::Claude, operation, detail)
+}
+
+/// Runs Claude Code once for `request` and decodes its JSON result envelope.
+///
+/// The process is started in the request's working directory in print mode
+/// with JSON output, an empty strict MCP configuration, slash commands
+/// disabled, the fixed toolset, and `dontAsk` permission mode. The prompt is
+/// passed as the final positional argument. The call blocks until the process
+/// exits.
+///
+/// # Errors
+///
+/// * `Spawn` — the process could not be started.
+/// * `Downstream` — the process failed, the envelope is not a `result`, or the
+///   envelope reports an error / non-`success` subtype.
+/// * `InvalidJson` — the process succeeded but stdout is not a valid envelope.
+fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInvocationError> {
+    let mut command = Command::new(claude_binary());
+    let _ = command.args([
+        "-p",
+        "--output-format",
+        "json",
+        "--strict-mcp-config",
+        "--mcp-config",
+        EMPTY_MCP_CONFIG,
+        "--disable-slash-commands",
+        "--no-chrome",
+        "--tools",
+        CLAUDE_TOOLSET,
+        "--permission-mode",
+        "dontAsk",
+    ]);
+    if let Some(limit) = request.max_turns {
+        let _ = command.arg("--max-turns").arg(limit.get().to_string());
+    }
+    // The prompt must stay the last argument.
+    let _ = command
+        .current_dir(request.cwd.as_path())
+        .arg(request.prompt.as_str());
+    let output = command.output().map_err(ConsultInvocationError::Spawn)?;
+
+    let exited_ok = output.status.success();
+    let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned();
+    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned();
+
+    let envelope = match serde_json::from_slice::<ClaudeJsonEnvelope>(&output.stdout) {
+        Ok(envelope) => envelope,
+        Err(error) => {
+            // A failed process with unparseable output is a downstream failure;
+            // only a "successful" process with bad output is a format problem.
+            return Err(if exited_ok {
+                ConsultInvocationError::InvalidJson(format!(
+                    "{error}; stdout={stdout}; stderr={stderr}"
+                ))
+            } else {
+                ConsultInvocationError::Downstream(downstream_message(
+                    output.status.code(),
+                    &stdout,
+                    &stderr,
+                ))
+            });
+        }
+    };
+
+    if envelope.envelope_type != "result" {
+        return Err(ConsultInvocationError::Downstream(format!(
+            "unexpected Claude envelope type `{}`",
+            envelope.envelope_type
+        )));
+    }
+
+    let succeeded =
+        exited_ok && !envelope.is_error && envelope.subtype.as_deref() == Some("success");
+    if !succeeded {
+        let message = match envelope.result {
+            Some(text) if !text.trim().is_empty() => text,
+            _ => downstream_message(output.status.code(), &stdout, &stderr),
+        };
+        return Err(ConsultInvocationError::Downstream(message));
+    }
+
+    Ok(ConsultResponse {
+        cwd: request.cwd.clone(),
+        result: envelope.result.unwrap_or_default(),
+        duration_ms: envelope.duration_ms.unwrap_or(0),
+        duration_api_ms: envelope.duration_api_ms,
+        num_turns: envelope.num_turns.unwrap_or(0),
+        stop_reason: envelope.stop_reason,
+        session_id: envelope.session_id,
+        total_cost_usd: envelope.total_cost_usd,
+        usage: envelope.usage,
+        model_usage: envelope.model_usage,
+        permission_denials: envelope.permission_denials,
+        fast_mode_state: envelope.fast_mode_state,
+        uuid: envelope.uuid,
+    })
+}
+
+/// Picks the most useful human-readable description of a failed Claude Code run.
+///
+/// Preference order: non-empty `stderr`, then non-empty `stdout`, then a
+/// generic message built from the exit status.
+///
+/// `status_code` is `None` when the process ended without an exit code (on
+/// Unix this happens when it is terminated by a signal). That case gets its
+/// own wording so the `Debug` form of `Option` (`Some(1)` / `None`) never
+/// leaks into a user-facing message.
+fn downstream_message(status_code: Option<i32>, stdout: &str, stderr: &str) -> String {
+    if !stderr.is_empty() {
+        return stderr.to_owned();
+    }
+    if !stdout.is_empty() {
+        return stdout.to_owned();
+    }
+    match status_code {
+        Some(code) => format!("Claude Code exited with status {code}"),
+        None => "Claude Code exited without a status code (terminated by a signal?)".to_owned(),
+    }
+}
+
+/// Returns the Claude Code executable to run: the value of the
+/// `CLAUDE_BIN_ENV` environment variable when set, otherwise `claude`
+/// (looked up via `PATH` by the process spawner).
+fn claude_binary() -> PathBuf {
+    match std::env::var_os(CLAUDE_BIN_ENV) {
+        Some(configured) => PathBuf::from(configured),
+        None => PathBuf::from("claude"),
+    }
+}
+
+/// Builds the tool output for a successful consult.
+///
+/// The concise payload carries the response plus headline metadata (including
+/// the derived model name and the number of permission denials). The full
+/// payload additionally echoes the prompt and turn limit and includes the raw
+/// usage objects and the permission denials themselves. Both text renderings
+/// are supplied explicitly.
+fn consult_output(
+    request: &ConsultRequest,
+    response: &ConsultResponse,
+    generation: Generation,
+    operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+    let cwd = response.cwd.display();
+    let denials = &response.permission_denials;
+    let turn_limit = request.max_turns.map(TurnLimit::get);
+
+    let concise = json!({
+        "response": response.result,
+        "cwd": cwd,
+        "model": response.model_name(),
+        "duration_ms": response.duration_ms,
+        "num_turns": response.num_turns,
+        "stop_reason": response.stop_reason,
+        "session_id": response.session_id,
+        "total_cost_usd": response.total_cost_usd,
+        "permission_denial_count": denials.len(),
+    });
+    let full = json!({
+        "response": response.result,
+        "cwd": cwd,
+        "prompt": request.prompt.as_str(),
+        "max_turns": turn_limit,
+        "duration_ms": response.duration_ms,
+        "duration_api_ms": response.duration_api_ms,
+        "num_turns": response.num_turns,
+        "stop_reason": response.stop_reason,
+        "session_id": response.session_id,
+        "total_cost_usd": response.total_cost_usd,
+        "usage": response.usage,
+        "model_usage": response.model_usage,
+        "permission_denials": denials,
+        "fast_mode_state": response.fast_mode_state,
+        "uuid": response.uuid,
+    });
+
+    let short_text = concise_text(response);
+    let long_text = full_text(response);
+    fallback_detailed_tool_output(
+        &concise,
+        &full,
+        short_text,
+        Some(long_text),
+        SurfaceKind::Read,
+        generation,
+        FaultStage::Worker,
+        operation,
+    )
+}
+
+/// Renders the concise porcelain text for a consult.
+///
+/// Layout: a space-separated status line (`consult ok`, turns, duration, then
+/// model / stop reason / cost when known), the working directory, the session
+/// id when known, the permission-denial count when non-zero, and finally the
+/// response under a `response:` heading.
+fn concise_text(response: &ConsultResponse) -> String {
+    let mut status = vec![
+        "consult ok".to_owned(),
+        format!("turns={}", response.num_turns),
+        format!("duration={}", render_duration_ms(response.duration_ms)),
+    ];
+    status.extend(response.model_name().map(|model| format!("model={model}")));
+    status.extend(
+        response
+            .stop_reason
+            .as_deref()
+            .map(|stop_reason| format!("stop={stop_reason}")),
+    );
+    status.extend(response.total_cost_usd.map(|cost| format!("cost=${cost:.6}")));
+
+    let mut lines = vec![
+        status.join(" "),
+        format!("cwd: {}", response.cwd.display()),
+    ];
+    lines.extend(
+        response
+            .session_id
+            .as_deref()
+            .map(|session_id| format!("session: {session_id}")),
+    );
+    let denial_count = response.permission_denials.len();
+    if denial_count != 0 {
+        lines.push(format!("permission_denials: {denial_count}"));
+    }
+    lines.push("response:".to_owned());
+    lines.push(response.result.clone());
+    lines.join("\n")
+}
+
+/// Renders the full porcelain text for a consult, one `key: value` per line.
+///
+/// Optional metadata lines are emitted only when the value is known; the
+/// permission-denial count is always present. The response follows under a
+/// `response:` heading.
+fn full_text(response: &ConsultResponse) -> String {
+    let mut lines = vec![
+        format!("consult ok turns={}", response.num_turns),
+        format!("cwd: {}", response.cwd.display()),
+        format!("duration: {}", render_duration_ms(response.duration_ms)),
+    ];
+    lines.extend(
+        response
+            .duration_api_ms
+            .map(|api_ms| format!("api_duration: {}", render_duration_ms(api_ms))),
+    );
+    lines.extend(response.model_name().map(|model| format!("model: {model}")));
+    lines.extend(
+        response
+            .stop_reason
+            .as_deref()
+            .map(|stop_reason| format!("stop: {stop_reason}")),
+    );
+    lines.extend(
+        response
+            .session_id
+            .as_deref()
+            .map(|session_id| format!("session: {session_id}")),
+    );
+    lines.extend(
+        response
+            .total_cost_usd
+            .map(|cost| format!("cost_usd: {cost:.6}")),
+    );
+    lines.push(format!(
+        "permission_denials: {}",
+        response.permission_denials.len()
+    ));
+    lines.extend(
+        response
+            .fast_mode_state
+            .as_deref()
+            .map(|state| format!("fast_mode: {state}")),
+    );
+    lines.extend(response.uuid.as_deref().map(|uuid| format!("uuid: {uuid}")));
+    lines.extend(usage_summary(response.usage.as_ref()).map(|usage| format!("usage: {usage}")));
+    lines.push("response:".to_owned());
+    lines.push(response.result.clone());
+    lines.join("\n")
+}
+
+/// Summarizes the scalar entries of a usage object as `key=value` pairs.
+///
+/// Only top-level numbers and non-empty strings are included; nested objects,
+/// arrays, booleans and nulls are skipped. Pairs are ordered by key and
+/// separated by single spaces. Returns `None` when `usage` is absent, is not
+/// an object, or has no scalar entries.
+fn usage_summary(usage: Option<&Value>) -> Option<String> {
+    let fields = usage?.as_object()?;
+    let mut pairs: Vec<(&String, String)> = Vec::new();
+    for (key, value) in fields {
+        match value {
+            Value::Number(number) => pairs.push((key, number.to_string())),
+            Value::String(text) if !text.is_empty() => pairs.push((key, text.clone())),
+            _ => {}
+        }
+    }
+    if pairs.is_empty() {
+        return None;
+    }
+    // Keys of a JSON object are unique, so ordering by key is total.
+    pairs.sort_by(|left, right| left.0.cmp(right.0));
+    let rendered: Vec<String> = pairs
+        .into_iter()
+        .map(|(key, value)| format!("{key}={value}"))
+        .collect();
+    Some(rendered.join(" "))
+}
+
+/// Formats a millisecond count for display: values under one second as
+/// `<n>ms`, everything else as seconds with three decimals (`<s.mmm>s`).
+fn render_duration_ms(duration_ms: u64) -> String {
+    match duration_ms {
+        0..=999 => format!("{duration_ms}ms"),
+        _ => {
+            let seconds = duration_ms as f64 / 1_000.0;
+            format!("{seconds:.3}s")
+        }
+    }
+}
+
+/// Rebuilds a `Generation` from its integer wire form by advancing the genesis
+/// generation `raw - 1` times.
+///
+/// Both `0` and `1` map to the genesis generation. The cost is linear in `raw`.
+fn generation_from_wire(raw: u64) -> Generation {
+    (1..raw).fold(Generation::genesis(), |generation, _| generation.next())
+}
diff --git a/crates/phone-opus/src/mcp/telemetry.rs b/crates/phone-opus/src/mcp/telemetry.rs
new file mode 100644
index 0000000..8df0009
--- /dev/null
+++ b/crates/phone-opus/src/mcp/telemetry.rs
@@ -0,0 +1,228 @@
+use std::collections::BTreeMap;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use libmcp::{
+ Fault, Generation, HealthSnapshot, LifecycleState, MethodTelemetry, RolloutState,
+ TelemetrySnapshot, TelemetryTotals,
+};
+use serde::{Deserialize, Serialize};
+
+use crate::mcp::fault::FaultRecord;
+
+/// Counters and latency figures accumulated for a single operation name.
+///
+/// One instance is kept per key of `ServerTelemetry::methods`; all fields start
+/// at their `Default` values and are only mutated by the `record_*` methods of
+/// `ServerTelemetry`.
+#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
+struct MethodStats {
+ /// Incremented by `record_request`.
+ request_count: u64,
+ /// Incremented by `record_success`.
+ success_count: u64,
+ /// Incremented by `record_error` for faults outside the transport-like classes.
+ response_error_count: u64,
+ /// Incremented by `record_error` for transport-like fault classes.
+ transport_fault_count: u64,
+ /// Incremented by `record_retry`.
+ retry_count: u64,
+ /// Saturating sum of every latency passed to `record_success` / `record_error`.
+ total_latency_ms: u128,
+ /// Largest single latency seen so far.
+ max_latency_ms: u64,
+ /// Latency of the most recent success or error, if any was recorded.
+ last_latency_ms: Option<u64>,
+ /// Message of the most recent error; reset to `None` by the next success.
+ last_error: Option<String>,
+}
+
+/// Aggregated health and request telemetry, updated through the `record_*`
+/// methods and exported via `health_snapshot` / `telemetry_snapshot`.
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub(crate) struct ServerTelemetry {
+ /// Wall-clock creation time in Unix milliseconds; the base for `uptime_ms`.
+ started_unix_ms: u64,
+ /// Current lifecycle state: `Cold` initially, `Ready`/`Cold` after a success,
+ /// `Recovering` after a transport-like fault or a worker restart.
+ state: LifecycleState,
+ /// Generation most recently reported by a success, error, or worker restart.
+ generation: Generation,
+ /// Errors recorded since the last success (saturating).
+ consecutive_failures: u32,
+ /// Number of worker restarts recorded.
+ restart_count: u64,
+ /// Number of rollouts recorded via `record_rollout`.
+ host_rollouts: u64,
+ /// Counters summed across all operations.
+ totals: TelemetryTotals,
+ /// Per-operation statistics keyed by operation name.
+ methods: BTreeMap<String, MethodStats>,
+ /// Fault from the most recent error; cleared by the next success.
+ last_fault: Option<Fault>,
+}
+
+impl Default for ServerTelemetry {
+    /// Creates a fresh telemetry record: started "now", in the `Cold` state at
+    /// the genesis generation, with every counter zeroed and no fault or
+    /// per-method statistics recorded.
+    fn default() -> Self {
+        let started_unix_ms = unix_ms_now();
+        let zeroed_totals = TelemetryTotals {
+            request_count: 0,
+            success_count: 0,
+            response_error_count: 0,
+            transport_fault_count: 0,
+            retry_count: 0,
+        };
+        Self {
+            started_unix_ms,
+            state: LifecycleState::Cold,
+            generation: Generation::genesis(),
+            consecutive_failures: 0,
+            restart_count: 0,
+            host_rollouts: 0,
+            totals: zeroed_totals,
+            methods: BTreeMap::new(),
+            last_fault: None,
+        }
+    }
+}
+
+impl ServerTelemetry {
+    /// Counts one request for `operation`, both globally and per method.
+    pub(crate) fn record_request(&mut self, operation: &str) {
+        self.totals.request_count += 1;
+        let stats = self.methods.entry(operation.to_owned()).or_default();
+        stats.request_count += 1;
+    }
+
+    /// Records a successful call of `operation`.
+    ///
+    /// Adopts `generation`, moves the state to `Ready` when the worker is alive
+    /// (otherwise `Cold`), resets the failure streak and last fault, and folds
+    /// `latency_ms` into the method's latency figures.
+    pub(crate) fn record_success(
+        &mut self,
+        operation: &str,
+        latency_ms: u64,
+        generation: Generation,
+        worker_alive: bool,
+    ) {
+        let next_state = if worker_alive {
+            LifecycleState::Ready
+        } else {
+            LifecycleState::Cold
+        };
+        self.generation = generation;
+        self.state = next_state;
+        self.consecutive_failures = 0;
+        self.last_fault = None;
+        self.totals.success_count += 1;
+
+        let stats = self.methods.entry(operation.to_owned()).or_default();
+        stats.success_count += 1;
+        Self::fold_latency(stats, latency_ms);
+        stats.last_error = None;
+    }
+
+    /// Records a failed call of `operation`.
+    ///
+    /// Adopts `generation`, extends the failure streak, and remembers the
+    /// fault. Transport, process, timeout, resource, replay, and rollout faults
+    /// count as transport faults and switch the state to `Recovering`; every
+    /// other class counts as a response error and leaves the state untouched.
+    pub(crate) fn record_error(
+        &mut self,
+        operation: &str,
+        fault: &FaultRecord,
+        latency_ms: u64,
+        generation: Generation,
+    ) {
+        use libmcp::FaultClass;
+
+        self.generation = generation;
+        self.consecutive_failures = self.consecutive_failures.saturating_add(1);
+        self.last_fault = Some(fault.fault.clone());
+
+        let transportish = matches!(
+            fault.fault.class,
+            FaultClass::Transport
+                | FaultClass::Process
+                | FaultClass::Timeout
+                | FaultClass::Resource
+                | FaultClass::Replay
+                | FaultClass::Rollout
+        );
+        let stats = self.methods.entry(operation.to_owned()).or_default();
+        if transportish {
+            self.state = LifecycleState::Recovering;
+            self.totals.transport_fault_count += 1;
+            stats.transport_fault_count += 1;
+        } else {
+            self.totals.response_error_count += 1;
+            stats.response_error_count += 1;
+        }
+        Self::fold_latency(stats, latency_ms);
+        stats.last_error = Some(fault.message().to_owned());
+    }
+
+    /// Counts one retry of `operation`, both globally and per method.
+    pub(crate) fn record_retry(&mut self, operation: &str) {
+        self.totals.retry_count += 1;
+        let stats = self.methods.entry(operation.to_owned()).or_default();
+        stats.retry_count += 1;
+    }
+
+    /// Notes a worker restart: adopts `generation`, bumps the restart counter,
+    /// and marks the server as `Recovering`.
+    pub(crate) fn record_worker_restart(&mut self, generation: Generation) {
+        self.restart_count += 1;
+        self.generation = generation;
+        self.state = LifecycleState::Recovering;
+    }
+
+    /// Counts one rollout.
+    pub(crate) fn record_rollout(&mut self) {
+        self.host_rollouts += 1;
+    }
+
+    /// Number of rollouts recorded so far.
+    pub(crate) fn host_rollouts(&self) -> u64 {
+        self.host_rollouts
+    }
+
+    /// Builds a health snapshot from the current state, attaching the given
+    /// `rollout` state.
+    pub(crate) fn health_snapshot(&self, rollout: RolloutState) -> HealthSnapshot {
+        let uptime_ms = self.uptime_ms();
+        let last_fault = self.last_fault.clone();
+        HealthSnapshot {
+            state: self.state,
+            generation: self.generation,
+            uptime_ms,
+            consecutive_failures: self.consecutive_failures,
+            restart_count: self.restart_count,
+            rollout: Some(rollout),
+            last_fault,
+        }
+    }
+
+    /// Builds a full telemetry snapshot, including totals and ranked
+    /// per-method statistics.
+    pub(crate) fn telemetry_snapshot(&self) -> TelemetrySnapshot {
+        let uptime_ms = self.uptime_ms();
+        let totals = self.totals.clone();
+        let methods = self.ranked_methods();
+        let last_fault = self.last_fault.clone();
+        TelemetrySnapshot {
+            uptime_ms,
+            state: self.state,
+            generation: self.generation,
+            consecutive_failures: self.consecutive_failures,
+            restart_count: self.restart_count,
+            totals,
+            methods,
+            last_fault,
+        }
+    }
+
+    /// Exports per-method statistics, busiest first.
+    ///
+    /// Ordering: request count descending, then transport faults descending,
+    /// then response errors descending, then method name ascending.
+    pub(crate) fn ranked_methods(&self) -> Vec<MethodTelemetry> {
+        let mut ranked: Vec<MethodTelemetry> = Vec::with_capacity(self.methods.len());
+        for (name, stats) in &self.methods {
+            ranked.push(MethodTelemetry {
+                method: name.clone(),
+                request_count: stats.request_count,
+                success_count: stats.success_count,
+                response_error_count: stats.response_error_count,
+                transport_fault_count: stats.transport_fault_count,
+                retry_count: stats.retry_count,
+                last_latency_ms: stats.last_latency_ms,
+                max_latency_ms: stats.max_latency_ms,
+                avg_latency_ms: average_latency_ms(stats),
+                last_error: stats.last_error.clone(),
+            });
+        }
+        // Tuples compare lexicographically; swapping the sides for the three
+        // counters yields descending order while the name stays ascending.
+        ranked.sort_by(|left, right| {
+            (
+                right.request_count,
+                right.transport_fault_count,
+                right.response_error_count,
+                &left.method,
+            )
+                .cmp(&(
+                    left.request_count,
+                    left.transport_fault_count,
+                    left.response_error_count,
+                    &right.method,
+                ))
+        });
+        ranked
+    }
+
+    /// Milliseconds elapsed since this record was created, floored at zero.
+    fn uptime_ms(&self) -> u64 {
+        let now = unix_ms_now();
+        now.saturating_sub(self.started_unix_ms)
+    }
+
+    /// Adds one latency observation to a method's sum, maximum, and
+    /// most-recent figures.
+    fn fold_latency(stats: &mut MethodStats, latency_ms: u64) {
+        stats.total_latency_ms = stats
+            .total_latency_ms
+            .saturating_add(u128::from(latency_ms));
+        stats.max_latency_ms = stats.max_latency_ms.max(latency_ms);
+        stats.last_latency_ms = Some(latency_ms);
+    }
+}
+
+/// Mean latency, in milliseconds, over every latency sample recorded for a
+/// method.
+///
+/// A sample is added to `total_latency_ms` once per recorded success, response
+/// error, or transport fault, so the divisor is the sum of those three
+/// counters rather than `request_count`. Dividing by requests would understate
+/// the mean while requests are still in flight and could overstate it, even
+/// beyond `max_latency_ms`, when one request yields several samples.
+///
+/// Returns `0` when no sample has been recorded and saturates at `u64::MAX`.
+fn average_latency_ms(stats: &MethodStats) -> u64 {
+    // Widen before summing so three u64 counters cannot overflow.
+    let samples = u128::from(stats.success_count)
+        + u128::from(stats.response_error_count)
+        + u128::from(stats.transport_fault_count);
+    if samples == 0 {
+        return 0;
+    }
+    u64::try_from(stats.total_latency_ms / samples).unwrap_or(u64::MAX)
+}
+
+/// Current wall-clock time as milliseconds since the Unix epoch.
+///
+/// Yields `0` if the system clock reads earlier than the epoch and saturates
+/// at `u64::MAX` if the millisecond count does not fit in a `u64`.
+fn unix_ms_now() -> u64 {
+    // `Duration::default()` is the zero duration, matching a pre-epoch clock.
+    let since_epoch: Duration = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap_or_default();
+    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
+}