From c9337a12e5a64087be760460259bdc747e49a2d6 Mon Sep 17 00:00:00 2001 From: main Date: Fri, 20 Mar 2026 21:19:07 -0400 Subject: Bootstrap minimal issue MCP --- crates/jira-at-home/src/mcp/telemetry.rs | 228 +++++++++++++++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 crates/jira-at-home/src/mcp/telemetry.rs (limited to 'crates/jira-at-home/src/mcp/telemetry.rs') diff --git a/crates/jira-at-home/src/mcp/telemetry.rs b/crates/jira-at-home/src/mcp/telemetry.rs new file mode 100644 index 0000000..8df0009 --- /dev/null +++ b/crates/jira-at-home/src/mcp/telemetry.rs @@ -0,0 +1,228 @@ +use std::collections::BTreeMap; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use libmcp::{ + Fault, Generation, HealthSnapshot, LifecycleState, MethodTelemetry, RolloutState, + TelemetrySnapshot, TelemetryTotals, +}; +use serde::{Deserialize, Serialize}; + +use crate::mcp::fault::FaultRecord; + +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] +struct MethodStats { + request_count: u64, + success_count: u64, + response_error_count: u64, + transport_fault_count: u64, + retry_count: u64, + total_latency_ms: u128, + max_latency_ms: u64, + last_latency_ms: Option, + last_error: Option, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) struct ServerTelemetry { + started_unix_ms: u64, + state: LifecycleState, + generation: Generation, + consecutive_failures: u32, + restart_count: u64, + host_rollouts: u64, + totals: TelemetryTotals, + methods: BTreeMap, + last_fault: Option, +} + +impl Default for ServerTelemetry { + fn default() -> Self { + Self { + started_unix_ms: unix_ms_now(), + state: LifecycleState::Cold, + generation: Generation::genesis(), + consecutive_failures: 0, + restart_count: 0, + host_rollouts: 0, + totals: TelemetryTotals { + request_count: 0, + success_count: 0, + response_error_count: 0, + transport_fault_count: 0, + retry_count: 0, + }, + methods: BTreeMap::new(), + last_fault: None, + } + } +} + +impl ServerTelemetry { + pub(crate) fn record_request(&mut self, operation: &str) { + self.totals.request_count += 1; + self.methods + .entry(operation.to_owned()) + .or_default() + .request_count += 1; + } + + pub(crate) fn record_success( + &mut self, + operation: &str, + latency_ms: u64, + generation: Generation, + worker_alive: bool, + ) { + self.generation = generation; + self.state = if worker_alive { + LifecycleState::Ready + } else { + LifecycleState::Cold + }; + self.consecutive_failures = 0; + self.last_fault = None; + self.totals.success_count += 1; + let entry = self.methods.entry(operation.to_owned()).or_default(); + entry.success_count += 1; + entry.total_latency_ms = entry + .total_latency_ms + .saturating_add(u128::from(latency_ms)); + entry.max_latency_ms = entry.max_latency_ms.max(latency_ms); + entry.last_latency_ms = Some(latency_ms); + entry.last_error = None; + } + + pub(crate) fn record_error( + &mut self, + operation: &str, + fault: &FaultRecord, + latency_ms: u64, + generation: Generation, + ) { + self.generation = generation; + self.consecutive_failures = self.consecutive_failures.saturating_add(1); + self.last_fault = Some(fault.fault.clone()); + let transportish = matches!( + fault.fault.class, + libmcp::FaultClass::Transport + | libmcp::FaultClass::Process + | libmcp::FaultClass::Timeout + | libmcp::FaultClass::Resource + | libmcp::FaultClass::Replay + | libmcp::FaultClass::Rollout + ); + if transportish { + self.state = LifecycleState::Recovering; + self.totals.transport_fault_count += 1; + } else { + self.totals.response_error_count += 1; + } + let entry = self.methods.entry(operation.to_owned()).or_default(); + if transportish { + entry.transport_fault_count += 1; + } else { + entry.response_error_count += 1; + } + entry.total_latency_ms = entry + .total_latency_ms + .saturating_add(u128::from(latency_ms)); + entry.max_latency_ms = entry.max_latency_ms.max(latency_ms); + entry.last_latency_ms = Some(latency_ms); + entry.last_error = Some(fault.message().to_owned()); + } + + pub(crate) fn record_retry(&mut self, operation: &str) { + self.totals.retry_count += 1; + self.methods + .entry(operation.to_owned()) + .or_default() + .retry_count += 1; + } + + pub(crate) fn record_worker_restart(&mut self, generation: Generation) { + self.generation = generation; + self.restart_count += 1; + self.state = LifecycleState::Recovering; + } + + pub(crate) fn record_rollout(&mut self) { + self.host_rollouts += 1; + } + + pub(crate) fn host_rollouts(&self) -> u64 { + self.host_rollouts + } + + pub(crate) fn health_snapshot(&self, rollout: RolloutState) -> HealthSnapshot { + HealthSnapshot { + state: self.state, + generation: self.generation, + uptime_ms: self.uptime_ms(), + consecutive_failures: self.consecutive_failures, + restart_count: self.restart_count, + rollout: Some(rollout), + last_fault: self.last_fault.clone(), + } + } + + pub(crate) fn telemetry_snapshot(&self) -> TelemetrySnapshot { + TelemetrySnapshot { + uptime_ms: self.uptime_ms(), + state: self.state, + generation: self.generation, + consecutive_failures: self.consecutive_failures, + restart_count: self.restart_count, + totals: self.totals.clone(), + methods: self.ranked_methods(), + last_fault: self.last_fault.clone(), + } + } + + pub(crate) fn ranked_methods(&self) -> Vec { + let mut methods = self + .methods + .iter() + .map(|(method, stats)| MethodTelemetry { + method: method.clone(), + request_count: stats.request_count, + success_count: stats.success_count, + response_error_count: stats.response_error_count, + transport_fault_count: stats.transport_fault_count, + retry_count: stats.retry_count, + last_latency_ms: stats.last_latency_ms, + max_latency_ms: stats.max_latency_ms, + avg_latency_ms: average_latency_ms(stats), + last_error: stats.last_error.clone(), + }) + .collect::>(); + methods.sort_by(|left, right| { + right + .request_count + .cmp(&left.request_count) + .then_with(|| right.transport_fault_count.cmp(&left.transport_fault_count)) + .then_with(|| right.response_error_count.cmp(&left.response_error_count)) + .then_with(|| left.method.cmp(&right.method)) + }); + methods + } + + fn uptime_ms(&self) -> u64 { + unix_ms_now().saturating_sub(self.started_unix_ms) + } +} + +fn average_latency_ms(stats: &MethodStats) -> u64 { + if stats.request_count == 0 { + return 0; + } + let average = stats.total_latency_ms / u128::from(stats.request_count); + u64::try_from(average).unwrap_or(u64::MAX) +} + +fn unix_ms_now() -> u64 { + let duration = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO); + let millis = duration.as_millis(); + u64::try_from(millis).unwrap_or(u64::MAX) +} -- cgit v1.2.3