diff options
| author | main <main@swarm.moe> | 2026-03-22 22:20:17 -0400 |
|---|---|---|
| committer | main <main@swarm.moe> | 2026-03-22 22:20:17 -0400 |
| commit | d986442e8e4bc2d716c9d63159a1cfa7b1e6ed76 (patch) | |
| tree | b9ca3d0cb62b5c59e614abfb6f74ac5310c69c2f /crates/phone-opus/src/mcp/telemetry.rs | |
| download | phone_opus-d986442e8e4bc2d716c9d63159a1cfa7b1e6ed76.zip | |
Bootstrap consultative Claude Code MCP
Diffstat (limited to 'crates/phone-opus/src/mcp/telemetry.rs')
| -rw-r--r-- | crates/phone-opus/src/mcp/telemetry.rs | 228 |
1 files changed, 228 insertions, 0 deletions
diff --git a/crates/phone-opus/src/mcp/telemetry.rs b/crates/phone-opus/src/mcp/telemetry.rs new file mode 100644 index 0000000..8df0009 --- /dev/null +++ b/crates/phone-opus/src/mcp/telemetry.rs @@ -0,0 +1,228 @@ +use std::collections::BTreeMap; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use libmcp::{ + Fault, Generation, HealthSnapshot, LifecycleState, MethodTelemetry, RolloutState, + TelemetrySnapshot, TelemetryTotals, +}; +use serde::{Deserialize, Serialize}; + +use crate::mcp::fault::FaultRecord; + +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] +struct MethodStats { + request_count: u64, + success_count: u64, + response_error_count: u64, + transport_fault_count: u64, + retry_count: u64, + total_latency_ms: u128, + max_latency_ms: u64, + last_latency_ms: Option<u64>, + last_error: Option<String>, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) struct ServerTelemetry { + started_unix_ms: u64, + state: LifecycleState, + generation: Generation, + consecutive_failures: u32, + restart_count: u64, + host_rollouts: u64, + totals: TelemetryTotals, + methods: BTreeMap<String, MethodStats>, + last_fault: Option<Fault>, +} + +impl Default for ServerTelemetry { + fn default() -> Self { + Self { + started_unix_ms: unix_ms_now(), + state: LifecycleState::Cold, + generation: Generation::genesis(), + consecutive_failures: 0, + restart_count: 0, + host_rollouts: 0, + totals: TelemetryTotals { + request_count: 0, + success_count: 0, + response_error_count: 0, + transport_fault_count: 0, + retry_count: 0, + }, + methods: BTreeMap::new(), + last_fault: None, + } + } +} + +impl ServerTelemetry { + pub(crate) fn record_request(&mut self, operation: &str) { + self.totals.request_count += 1; + self.methods + .entry(operation.to_owned()) + .or_default() + .request_count += 1; + } + + pub(crate) fn record_success( + &mut self, + operation: &str, + latency_ms: u64, + generation: Generation, + worker_alive: bool, + ) { + self.generation = generation; + self.state = if worker_alive { + LifecycleState::Ready + } else { + LifecycleState::Cold + }; + self.consecutive_failures = 0; + self.last_fault = None; + self.totals.success_count += 1; + let entry = self.methods.entry(operation.to_owned()).or_default(); + entry.success_count += 1; + entry.total_latency_ms = entry + .total_latency_ms + .saturating_add(u128::from(latency_ms)); + entry.max_latency_ms = entry.max_latency_ms.max(latency_ms); + entry.last_latency_ms = Some(latency_ms); + entry.last_error = None; + } + + pub(crate) fn record_error( + &mut self, + operation: &str, + fault: &FaultRecord, + latency_ms: u64, + generation: Generation, + ) { + self.generation = generation; + self.consecutive_failures = self.consecutive_failures.saturating_add(1); + self.last_fault = Some(fault.fault.clone()); + let transportish = matches!( + fault.fault.class, + libmcp::FaultClass::Transport + | libmcp::FaultClass::Process + | libmcp::FaultClass::Timeout + | libmcp::FaultClass::Resource + | libmcp::FaultClass::Replay + | libmcp::FaultClass::Rollout + ); + if transportish { + self.state = LifecycleState::Recovering; + self.totals.transport_fault_count += 1; + } else { + self.totals.response_error_count += 1; + } + let entry = self.methods.entry(operation.to_owned()).or_default(); + if transportish { + entry.transport_fault_count += 1; + } else { + entry.response_error_count += 1; + } + entry.total_latency_ms = entry + .total_latency_ms + .saturating_add(u128::from(latency_ms)); + entry.max_latency_ms = entry.max_latency_ms.max(latency_ms); + entry.last_latency_ms = Some(latency_ms); + entry.last_error = Some(fault.message().to_owned()); + } + + pub(crate) fn record_retry(&mut self, operation: &str) { + self.totals.retry_count += 1; + self.methods + .entry(operation.to_owned()) + .or_default() + .retry_count += 1; + } + + pub(crate) fn record_worker_restart(&mut self, generation: Generation) { + self.generation = generation; + self.restart_count += 1; + self.state = LifecycleState::Recovering; + } + + pub(crate) fn record_rollout(&mut self) { + self.host_rollouts += 1; + } + + pub(crate) fn host_rollouts(&self) -> u64 { + self.host_rollouts + } + + pub(crate) fn health_snapshot(&self, rollout: RolloutState) -> HealthSnapshot { + HealthSnapshot { + state: self.state, + generation: self.generation, + uptime_ms: self.uptime_ms(), + consecutive_failures: self.consecutive_failures, + restart_count: self.restart_count, + rollout: Some(rollout), + last_fault: self.last_fault.clone(), + } + } + + pub(crate) fn telemetry_snapshot(&self) -> TelemetrySnapshot { + TelemetrySnapshot { + uptime_ms: self.uptime_ms(), + state: self.state, + generation: self.generation, + consecutive_failures: self.consecutive_failures, + restart_count: self.restart_count, + totals: self.totals.clone(), + methods: self.ranked_methods(), + last_fault: self.last_fault.clone(), + } + } + + pub(crate) fn ranked_methods(&self) -> Vec<MethodTelemetry> { + let mut methods = self + .methods + .iter() + .map(|(method, stats)| MethodTelemetry { + method: method.clone(), + request_count: stats.request_count, + success_count: stats.success_count, + response_error_count: stats.response_error_count, + transport_fault_count: stats.transport_fault_count, + retry_count: stats.retry_count, + last_latency_ms: stats.last_latency_ms, + max_latency_ms: stats.max_latency_ms, + avg_latency_ms: average_latency_ms(stats), + last_error: stats.last_error.clone(), + }) + .collect::<Vec<_>>(); + methods.sort_by(|left, right| { + right + .request_count + .cmp(&left.request_count) + .then_with(|| right.transport_fault_count.cmp(&left.transport_fault_count)) + .then_with(|| right.response_error_count.cmp(&left.response_error_count)) + .then_with(|| left.method.cmp(&right.method)) + }); + methods + } + + fn uptime_ms(&self) -> u64 { + unix_ms_now().saturating_sub(self.started_unix_ms) + } +} + +fn average_latency_ms(stats: &MethodStats) -> u64 { + if stats.request_count == 0 { + return 0; + } + let average = stats.total_latency_ms / u128::from(stats.request_count); + u64::try_from(average).unwrap_or(u64::MAX) +} + +fn unix_ms_now() -> u64 { + let duration = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO); + let millis = duration.as_millis(); + u64::try_from(millis).unwrap_or(u64::MAX) +} |