swarm repositories / source
aboutsummaryrefslogtreecommitdiff
path: root/crates/jira-at-home/src/mcp
diff options
context:
space:
mode:
Diffstat (limited to 'crates/jira-at-home/src/mcp')
-rw-r--r--crates/jira-at-home/src/mcp/catalog.rs140
-rw-r--r--crates/jira-at-home/src/mcp/fault.rs214
-rw-r--r--crates/jira-at-home/src/mcp/host/binary.rs41
-rw-r--r--crates/jira-at-home/src/mcp/host/mod.rs3
-rw-r--r--crates/jira-at-home/src/mcp/host/process.rs262
-rw-r--r--crates/jira-at-home/src/mcp/host/runtime.rs952
-rw-r--r--crates/jira-at-home/src/mcp/mod.rs10
-rw-r--r--crates/jira-at-home/src/mcp/output.rs195
-rw-r--r--crates/jira-at-home/src/mcp/protocol.rs78
-rw-r--r--crates/jira-at-home/src/mcp/service.rs336
-rw-r--r--crates/jira-at-home/src/mcp/telemetry.rs228
11 files changed, 2459 insertions, 0 deletions
diff --git a/crates/jira-at-home/src/mcp/catalog.rs b/crates/jira-at-home/src/mcp/catalog.rs
new file mode 100644
index 0000000..2ff8e81
--- /dev/null
+++ b/crates/jira-at-home/src/mcp/catalog.rs
@@ -0,0 +1,140 @@
+use libmcp::ReplayContract;
+use serde_json::{Value, json};
+
+use crate::mcp::output::with_common_presentation;
+
/// Where a tool call is executed: directly in the host process or in the
/// supervised worker subprocess.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum DispatchTarget {
    /// Handled by the host runtime itself (project binding, health, telemetry).
    Host,
    /// Forwarded to the worker subprocess (issue storage operations).
    Worker,
}
+
/// Static description of one MCP tool: its advertised name and description,
/// where it executes, and its replay contract.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct ToolSpec {
    /// Tool name as advertised over MCP (e.g. `issue.save`).
    pub(crate) name: &'static str,
    /// One-line description surfaced in `tools/list`.
    pub(crate) description: &'static str,
    /// Whether the call runs on the host or is forwarded to the worker.
    pub(crate) dispatch: DispatchTarget,
    /// Replay semantics; also drives the read-only/destructive annotation hints.
    pub(crate) replay: ReplayContract,
}
+
+impl ToolSpec {
+ fn annotation_json(self) -> Value {
+ json!({
+ "title": self.name,
+ "readOnlyHint": self.replay == ReplayContract::Convergent,
+ "destructiveHint": self.replay == ReplayContract::NeverReplay,
+ "jiraAtHome": {
+ "dispatch": match self.dispatch {
+ DispatchTarget::Host => "host",
+ DispatchTarget::Worker => "worker",
+ },
+ "replayContract": match self.replay {
+ ReplayContract::Convergent => "convergent",
+ ReplayContract::ProbeRequired => "probe_required",
+ ReplayContract::NeverReplay => "never_replay",
+ },
+ }
+ })
+ }
+}
+
/// The full, fixed tool catalog served by this MCP host. Order here is the
/// order tools appear in `tools/list`.
const TOOL_SPECS: &[ToolSpec] = &[
    ToolSpec {
        name: "project.bind",
        description: "Bind this MCP session to a project root or a nested path inside one.",
        dispatch: DispatchTarget::Host,
        replay: ReplayContract::NeverReplay,
    },
    ToolSpec {
        name: "issue.save",
        description: "Create or overwrite one issue note at `issues/<slug>.md`.",
        dispatch: DispatchTarget::Worker,
        replay: ReplayContract::NeverReplay,
    },
    ToolSpec {
        name: "issue.list",
        description: "List the currently open issues. There is no close state; all existing issue files are open.",
        dispatch: DispatchTarget::Worker,
        replay: ReplayContract::Convergent,
    },
    ToolSpec {
        name: "issue.read",
        description: "Read one issue note by slug.",
        dispatch: DispatchTarget::Worker,
        replay: ReplayContract::Convergent,
    },
    ToolSpec {
        name: "system.health",
        description: "Read MCP host health, binding state, worker generation, and rollout state.",
        dispatch: DispatchTarget::Host,
        replay: ReplayContract::Convergent,
    },
    ToolSpec {
        name: "system.telemetry",
        description: "Read aggregate MCP host telemetry and top hot methods for this session.",
        dispatch: DispatchTarget::Host,
        replay: ReplayContract::Convergent,
    },
];
+
+pub(crate) fn tool_spec(name: &str) -> Option<ToolSpec> {
+ TOOL_SPECS.iter().copied().find(|spec| spec.name == name)
+}
+
+pub(crate) fn tool_definitions() -> Vec<Value> {
+ TOOL_SPECS
+ .iter()
+ .map(|spec| {
+ json!({
+ "name": spec.name,
+ "description": spec.description,
+ "inputSchema": tool_schema(spec.name),
+ "annotations": spec.annotation_json(),
+ })
+ })
+ .collect()
+}
+
+fn tool_schema(name: &str) -> Value {
+ match name {
+ "project.bind" => with_common_presentation(json!({
+ "type": "object",
+ "properties": {
+ "path": {
+ "type": "string",
+ "description": "Project root or any nested path inside the target project."
+ }
+ },
+ "required": ["path"]
+ })),
+ "issue.save" => with_common_presentation(json!({
+ "type": "object",
+ "properties": {
+ "slug": {
+ "type": "string",
+ "description": "Stable slug. Stored at `issues/<slug>.md`."
+ },
+ "body": {
+ "type": "string",
+ "description": "Freeform issue body. Markdown is fine."
+ }
+ },
+ "required": ["slug", "body"]
+ })),
+ "issue.list" | "system.health" | "system.telemetry" => with_common_presentation(json!({
+ "type": "object",
+ "properties": {}
+ })),
+ "issue.read" => with_common_presentation(json!({
+ "type": "object",
+ "properties": {
+ "slug": {
+ "type": "string",
+ "description": "Issue slug to read."
+ }
+ },
+ "required": ["slug"]
+ })),
+ _ => Value::Null,
+ }
+}
diff --git a/crates/jira-at-home/src/mcp/fault.rs b/crates/jira-at-home/src/mcp/fault.rs
new file mode 100644
index 0000000..5f71e6a
--- /dev/null
+++ b/crates/jira-at-home/src/mcp/fault.rs
@@ -0,0 +1,214 @@
+use libmcp::{Fault, FaultClass, FaultCode, Generation, RecoveryDirective, ToolErrorDetail};
+use serde::{Deserialize, Serialize};
+use serde_json::{Value, json};
+
/// Coarse location of a failure within the request pipeline. Serialized into
/// fault payloads so clients and logs can see where a request died.
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub(crate) enum FaultStage {
    /// Host-side logic (dispatch, binding checks, host tools).
    Host,
    /// Inside the worker subprocess.
    Worker,
    /// Issue storage layer — presumably used by worker-side code; not
    /// constructed in the files shown here.
    Store,
    /// The host↔worker stdio pipe (spawn, write, read, framing).
    Transport,
    /// JSON-RPC parsing / protocol violations.
    Protocol,
    /// Binary rollout detection or re-exec.
    Rollout,
}
+
/// A fully-classified failure: the underlying libmcp `Fault` plus the routing
/// metadata the host needs to render JSON-RPC errors and tool error results.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub(crate) struct FaultRecord {
    /// The libmcp fault (generation, class, code, recovery directive, detail).
    pub(crate) fault: Fault,
    /// Pipeline stage where the failure occurred.
    pub(crate) stage: FaultStage,
    /// Operation key, e.g. `tools/call:issue.save` or `worker.read`.
    pub(crate) operation: String,
    /// JSON-RPC error code surfaced to the client (e.g. -32602).
    pub(crate) jsonrpc_code: i64,
    /// True when the recovery directive permits a restart-and-replay retry.
    pub(crate) retryable: bool,
    /// Set once the host has already retried the operation for this fault.
    pub(crate) retried: bool,
}
+
+impl FaultRecord {
+ pub(crate) fn invalid_input(
+ generation: Generation,
+ stage: FaultStage,
+ operation: impl Into<String>,
+ detail: impl Into<String>,
+ ) -> Self {
+ Self::new(
+ generation,
+ FaultClass::Protocol,
+ "invalid_input",
+ RecoveryDirective::AbortRequest,
+ stage,
+ operation,
+ detail,
+ -32602,
+ )
+ }
+
+ pub(crate) fn not_initialized(
+ generation: Generation,
+ stage: FaultStage,
+ operation: impl Into<String>,
+ detail: impl Into<String>,
+ ) -> Self {
+ Self::new(
+ generation,
+ FaultClass::Protocol,
+ "not_initialized",
+ RecoveryDirective::AbortRequest,
+ stage,
+ operation,
+ detail,
+ -32002,
+ )
+ }
+
+ pub(crate) fn unavailable(
+ generation: Generation,
+ stage: FaultStage,
+ operation: impl Into<String>,
+ detail: impl Into<String>,
+ ) -> Self {
+ Self::new(
+ generation,
+ FaultClass::Resource,
+ "unavailable",
+ RecoveryDirective::AbortRequest,
+ stage,
+ operation,
+ detail,
+ -32004,
+ )
+ }
+
+ pub(crate) fn transport(
+ generation: Generation,
+ stage: FaultStage,
+ operation: impl Into<String>,
+ detail: impl Into<String>,
+ ) -> Self {
+ Self::new(
+ generation,
+ FaultClass::Transport,
+ "transport_failure",
+ RecoveryDirective::RestartAndReplay,
+ stage,
+ operation,
+ detail,
+ -32603,
+ )
+ }
+
+ pub(crate) fn process(
+ generation: Generation,
+ stage: FaultStage,
+ operation: impl Into<String>,
+ detail: impl Into<String>,
+ ) -> Self {
+ Self::new(
+ generation,
+ FaultClass::Process,
+ "process_failure",
+ RecoveryDirective::RestartAndReplay,
+ stage,
+ operation,
+ detail,
+ -32603,
+ )
+ }
+
+ pub(crate) fn internal(
+ generation: Generation,
+ stage: FaultStage,
+ operation: impl Into<String>,
+ detail: impl Into<String>,
+ ) -> Self {
+ Self::new(
+ generation,
+ FaultClass::Invariant,
+ "internal_failure",
+ RecoveryDirective::AbortRequest,
+ stage,
+ operation,
+ detail,
+ -32603,
+ )
+ }
+
+ pub(crate) fn rollout(
+ generation: Generation,
+ operation: impl Into<String>,
+ detail: impl Into<String>,
+ ) -> Self {
+ Self::new(
+ generation,
+ FaultClass::Rollout,
+ "rollout_failure",
+ RecoveryDirective::RestartAndReplay,
+ FaultStage::Rollout,
+ operation,
+ detail,
+ -32603,
+ )
+ }
+
+ pub(crate) fn mark_retried(mut self) -> Self {
+ self.retried = true;
+ self
+ }
+
+ pub(crate) fn message(&self) -> &str {
+ self.fault.detail.as_str()
+ }
+
+ pub(crate) fn error_detail(&self) -> ToolErrorDetail {
+ ToolErrorDetail {
+ code: Some(self.jsonrpc_code),
+ kind: Some(self.fault.code.as_str().to_owned()),
+ message: Some(self.message().to_owned()),
+ }
+ }
+
+ pub(crate) fn into_jsonrpc_error(self) -> Value {
+ json!({
+ "code": self.jsonrpc_code,
+ "message": self.message(),
+ "data": self,
+ })
+ }
+
+ pub(crate) fn into_tool_result(self) -> Value {
+ json!({
+ "content": [{
+ "type": "text",
+ "text": self.message(),
+ }],
+ "structuredContent": self,
+ "isError": true,
+ })
+ }
+
+ fn new(
+ generation: Generation,
+ class: FaultClass,
+ code: &'static str,
+ directive: RecoveryDirective,
+ stage: FaultStage,
+ operation: impl Into<String>,
+ detail: impl Into<String>,
+ jsonrpc_code: i64,
+ ) -> Self {
+ let fault = Fault::new(generation, class, fault_code(code), directive, detail);
+ Self {
+ retryable: directive != RecoveryDirective::AbortRequest,
+ fault,
+ stage,
+ operation: operation.into(),
+ jsonrpc_code,
+ retried: false,
+ }
+ }
+}
+
+fn fault_code(code: &'static str) -> FaultCode {
+ match FaultCode::try_new(code.to_owned()) {
+ Ok(value) => value,
+ Err(_) => std::process::abort(),
+ }
+}
diff --git a/crates/jira-at-home/src/mcp/host/binary.rs b/crates/jira-at-home/src/mcp/host/binary.rs
new file mode 100644
index 0000000..9ec7721
--- /dev/null
+++ b/crates/jira-at-home/src/mcp/host/binary.rs
@@ -0,0 +1,41 @@
+use std::fs;
+use std::io;
+use std::path::{Path, PathBuf};
+
+use crate::mcp::protocol::BinaryFingerprint;
+
/// Tracks the serving binary on disk so the host can notice when a new build
/// has replaced it (a pending rollout).
pub(crate) struct BinaryRuntime {
    /// Path to the executable that was launched.
    pub(crate) path: PathBuf,
    // Fingerprint (length + mtime) captured when this runtime was created;
    // compared against the current file to detect rollouts.
    startup_fingerprint: BinaryFingerprint,
    /// False when the launch path contains a `target` component, i.e. the
    /// binary appears to run out of a Cargo build directory.
    pub(crate) launch_path_stable: bool,
}
+
+impl BinaryRuntime {
+ pub(crate) fn new(path: PathBuf) -> io::Result<Self> {
+ let startup_fingerprint = fingerprint_binary(&path)?;
+ Ok(Self {
+ launch_path_stable: !path
+ .components()
+ .any(|component| component.as_os_str().to_string_lossy() == "target"),
+ path,
+ startup_fingerprint,
+ })
+ }
+
+ pub(crate) fn rollout_pending(&self) -> io::Result<bool> {
+ Ok(fingerprint_binary(&self.path)? != self.startup_fingerprint)
+ }
+}
+
+fn fingerprint_binary(path: &Path) -> io::Result<BinaryFingerprint> {
+ let metadata = fs::metadata(path)?;
+ let modified_unix_nanos = metadata
+ .modified()?
+ .duration_since(std::time::UNIX_EPOCH)
+ .map_err(|error| io::Error::other(format!("invalid binary mtime: {error}")))?
+ .as_nanos();
+ Ok(BinaryFingerprint {
+ length_bytes: metadata.len(),
+ modified_unix_nanos,
+ })
+}
diff --git a/crates/jira-at-home/src/mcp/host/mod.rs b/crates/jira-at-home/src/mcp/host/mod.rs
new file mode 100644
index 0000000..29cdcb1
--- /dev/null
+++ b/crates/jira-at-home/src/mcp/host/mod.rs
@@ -0,0 +1,3 @@
// Host-side building blocks: binary fingerprinting for rollout detection,
// worker process supervision, and the host runtime loop.
pub(crate) mod binary;
pub(crate) mod process;
pub(crate) mod runtime;
diff --git a/crates/jira-at-home/src/mcp/host/process.rs b/crates/jira-at-home/src/mcp/host/process.rs
new file mode 100644
index 0000000..492be55
--- /dev/null
+++ b/crates/jira-at-home/src/mcp/host/process.rs
@@ -0,0 +1,262 @@
+use std::io::{BufRead, BufReader, BufWriter, Write};
+use std::path::PathBuf;
+use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
+
+use libmcp::Generation;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+use crate::mcp::fault::{FaultRecord, FaultStage};
+use crate::mcp::protocol::{
+ HostRequestId, WorkerOperation, WorkerOutcome, WorkerRequest, WorkerResponse, WorkerSpawnConfig,
+};
+
/// Resolved filesystem paths for a bound project.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub(super) struct ProjectBinding {
    /// The path the client originally asked to bind (may be nested inside
    /// the project root).
    pub(super) requested_path: PathBuf,
    /// Resolved project root directory.
    pub(super) project_root: PathBuf,
    /// Directory holding issue notes — presumably `<root>/issues`; the
    /// resolution code is not in this file, confirm against it.
    pub(super) issues_root: PathBuf,
    /// Directory for host/session state — assumed; verify against
    /// `resolve_project_binding`.
    pub(super) state_root: PathBuf,
}
+
/// Owns the worker subprocess: spawning, line-framed JSON request/response
/// exchange over its stdio, and kill/restart on failure.
pub(super) struct WorkerSupervisor {
    // How to spawn a worker (the executable to run).
    config: WorkerSpawnConfig,
    // Current worker generation; advanced on each respawn after the first.
    generation: Generation,
    // True once any worker has ever been spawned in this session.
    has_spawned: bool,
    // Test hook: when armed, kill the worker after writing the next request
    // but before reading its reply (see `arm_crash_once`).
    crash_before_reply_once: bool,
    // Project root the worker is bound to, if any; required before spawning.
    bound_project_root: Option<PathBuf>,
    // Live child process handle, when a worker is running.
    child: Option<Child>,
    // Buffered writer over the worker's stdin (one JSON request per line).
    stdin: Option<BufWriter<ChildStdin>>,
    // Buffered reader over the worker's stdout (one JSON response per line).
    stdout: Option<BufReader<ChildStdout>>,
}
+
impl WorkerSupervisor {
    /// Creates a supervisor with no running worker. `generation` and
    /// `has_spawned` may be restored from a previous host incarnation.
    pub(super) fn new(
        config: WorkerSpawnConfig,
        generation: Generation,
        has_spawned: bool,
    ) -> Self {
        Self {
            config,
            generation,
            has_spawned,
            crash_before_reply_once: false,
            bound_project_root: None,
            child: None,
            stdin: None,
            stdout: None,
        }
    }

    /// Current worker generation (also used to tag faults raised here).
    pub(super) fn generation(&self) -> Generation {
        self.generation
    }

    /// Whether any worker has ever been spawned in this session.
    pub(super) fn has_spawned(&self) -> bool {
        self.has_spawned
    }

    /// Points the supervisor at `project_root`. A no-op when already bound
    /// to the same root; otherwise the current worker is killed so the next
    /// request spawns a fresh one against the new root.
    pub(super) fn rebind(&mut self, project_root: PathBuf) {
        if self
            .bound_project_root
            .as_ref()
            .is_some_and(|current| current == &project_root)
        {
            return;
        }
        self.kill_current_worker();
        self.bound_project_root = Some(project_root);
    }

    /// Like `rebind`, but unconditionally kills the worker even when the
    /// root is unchanged (used by explicit `project.bind` calls).
    pub(super) fn refresh_binding(&mut self, project_root: PathBuf) {
        self.kill_current_worker();
        self.bound_project_root = Some(project_root);
    }

    /// Sends one operation to the worker and reads its single-line JSON
    /// reply. Spawns a worker first if none is alive. Any pipe failure is
    /// surfaced as a retryable transport fault; EOF kills the worker so the
    /// next attempt respawns it.
    pub(super) fn execute(
        &mut self,
        request_id: HostRequestId,
        operation: WorkerOperation,
    ) -> Result<Value, FaultRecord> {
        self.ensure_worker()?;
        let request = WorkerRequest::Execute {
            id: request_id,
            operation,
        };
        let stdin = self.stdin.as_mut().ok_or_else(|| {
            FaultRecord::transport(
                self.generation,
                FaultStage::Transport,
                "worker.stdin",
                "worker stdin is not available",
            )
        })?;
        // Frame: one JSON document followed by a newline.
        serde_json::to_writer(&mut *stdin, &request).map_err(|error| {
            FaultRecord::transport(
                self.generation,
                FaultStage::Transport,
                "worker.write",
                format!("failed to encode worker request: {error}"),
            )
        })?;
        stdin.write_all(b"\n").map_err(|error| {
            FaultRecord::transport(
                self.generation,
                FaultStage::Transport,
                "worker.write",
                format!("failed to frame worker request: {error}"),
            )
        })?;
        stdin.flush().map_err(|error| {
            FaultRecord::transport(
                self.generation,
                FaultStage::Transport,
                "worker.write",
                format!("failed to flush worker request: {error}"),
            )
        })?;

        // Test hook: simulate a worker that dies after receiving the request
        // but before replying, to exercise the retry/restart path.
        if self.crash_before_reply_once {
            self.crash_before_reply_once = false;
            self.kill_current_worker();
            return Err(FaultRecord::transport(
                self.generation,
                FaultStage::Transport,
                "worker.read",
                "worker crashed before replying",
            ));
        }

        let stdout = self.stdout.as_mut().ok_or_else(|| {
            FaultRecord::transport(
                self.generation,
                FaultStage::Transport,
                "worker.stdout",
                "worker stdout is not available",
            )
        })?;
        let mut line = String::new();
        let bytes = stdout.read_line(&mut line).map_err(|error| {
            FaultRecord::transport(
                self.generation,
                FaultStage::Transport,
                "worker.read",
                format!("failed to read worker response: {error}"),
            )
        })?;
        // Zero bytes means EOF: the worker exited without answering.
        if bytes == 0 {
            self.kill_current_worker();
            return Err(FaultRecord::transport(
                self.generation,
                FaultStage::Transport,
                "worker.read",
                "worker exited before replying",
            ));
        }
        let response = serde_json::from_str::<WorkerResponse>(&line).map_err(|error| {
            FaultRecord::transport(
                self.generation,
                FaultStage::Transport,
                "worker.read",
                format!("invalid worker response: {error}"),
            )
        })?;
        match response.outcome {
            WorkerOutcome::Success { result } => Ok(result),
            WorkerOutcome::Fault { fault } => Err(fault),
        }
    }

    /// Kills the current worker (if any) and spawns a fresh one.
    pub(super) fn restart(&mut self) -> Result<(), FaultRecord> {
        self.kill_current_worker();
        self.ensure_worker()
    }

    /// Non-blocking liveness check. A worker that has exited (or whose
    /// status cannot be queried) is discarded along with its pipes.
    pub(super) fn is_alive(&mut self) -> bool {
        let Some(child) = self.child.as_mut() else {
            return false;
        };
        // `try_wait` returning Ok(None) means still running; anything else
        // (exited, or the wait itself failed) counts as dead.
        if let Ok(None) = child.try_wait() {
            true
        } else {
            self.child = None;
            self.stdin = None;
            self.stdout = None;
            false
        }
    }

    /// Arms the one-shot crash-before-reply test hook (see `execute`).
    pub(super) fn arm_crash_once(&mut self) {
        self.crash_before_reply_once = true;
    }

    /// Spawns a worker if none is alive. Requires a bound project root.
    /// The generation advances only on respawns (not on the first spawn),
    /// and is committed only after the child and both pipes are in hand.
    fn ensure_worker(&mut self) -> Result<(), FaultRecord> {
        if self.is_alive() {
            return Ok(());
        }
        let Some(project_root) = self.bound_project_root.as_ref() else {
            return Err(FaultRecord::unavailable(
                self.generation,
                FaultStage::Host,
                "worker.spawn",
                "project is not bound; call project.bind before using issue tools",
            ));
        };
        let generation = if self.has_spawned {
            self.generation.next()
        } else {
            self.generation
        };
        let mut child = Command::new(&self.config.executable)
            .arg("mcp")
            .arg("worker")
            .arg("--project")
            .arg(project_root)
            .arg("--generation")
            .arg(generation.get().to_string())
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::inherit())
            .spawn()
            .map_err(|error| {
                FaultRecord::process(
                    generation,
                    FaultStage::Transport,
                    "worker.spawn",
                    format!("failed to spawn worker: {error}"),
                )
            })?;
        let stdin = child.stdin.take().ok_or_else(|| {
            FaultRecord::internal(
                generation,
                FaultStage::Transport,
                "worker.spawn",
                "worker stdin pipe was not created",
            )
        })?;
        let stdout = child.stdout.take().ok_or_else(|| {
            FaultRecord::internal(
                generation,
                FaultStage::Transport,
                "worker.spawn",
                "worker stdout pipe was not created",
            )
        })?;
        self.generation = generation;
        self.has_spawned = true;
        self.child = Some(child);
        self.stdin = Some(BufWriter::new(stdin));
        self.stdout = Some(BufReader::new(stdout));
        Ok(())
    }

    /// Kills and reaps the current worker (ignoring errors) and drops the
    /// pipe handles so subsequent calls see no live worker.
    fn kill_current_worker(&mut self) {
        if let Some(child) = self.child.as_mut() {
            let _ = child.kill();
            let _ = child.wait();
        }
        self.child = None;
        self.stdin = None;
        self.stdout = None;
    }
}
diff --git a/crates/jira-at-home/src/mcp/host/runtime.rs b/crates/jira-at-home/src/mcp/host/runtime.rs
new file mode 100644
index 0000000..b2049a0
--- /dev/null
+++ b/crates/jira-at-home/src/mcp/host/runtime.rs
@@ -0,0 +1,952 @@
+use std::io::{self, BufRead, Write};
+#[cfg(unix)]
+use std::os::unix::process::CommandExt;
+use std::path::PathBuf;
+use std::process::Command;
+use std::time::Instant;
+
+use libmcp::{
+ FramedMessage, Generation, HostSessionKernel, ReplayContract, RequestId, RolloutState,
+ TelemetryLog, ToolOutcome, load_snapshot_file_from_env, remove_snapshot_file,
+ write_snapshot_file,
+};
+use serde::Serialize;
+use serde_json::{Map, Value, json};
+
+use crate::mcp::catalog::{DispatchTarget, tool_definitions, tool_spec};
+use crate::mcp::fault::{FaultRecord, FaultStage};
+use crate::mcp::host::binary::BinaryRuntime;
+use crate::mcp::host::process::{ProjectBinding, WorkerSupervisor};
+use crate::mcp::output::{
+ ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success,
+};
+use crate::mcp::protocol::{
+ CRASH_ONCE_ENV, FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed,
+ PROTOCOL_VERSION, ProjectBindingSeed, SERVER_NAME, WorkerOperation, WorkerSpawnConfig,
+};
+use crate::mcp::telemetry::ServerTelemetry;
+use crate::store::IssueStore;
+
+pub(crate) fn run_host(initial_project: Option<PathBuf>) -> Result<(), Box<dyn std::error::Error>> {
+ let stdin = io::stdin();
+ let mut stdout = io::stdout().lock();
+ let mut host = HostRuntime::new(initial_project)?;
+
+ for line in stdin.lock().lines() {
+ let line = line?;
+ if line.trim().is_empty() {
+ continue;
+ }
+
+ let maybe_response = host.handle_line(&line);
+ if let Some(response) = maybe_response {
+ write_message(&mut stdout, &response)?;
+ }
+ host.maybe_roll_forward()?;
+ }
+
+ Ok(())
+}
+
/// Host-side state for one MCP session: project binding, session kernel,
/// telemetry, worker supervision, and the rollout/crash test hooks.
struct HostRuntime {
    // `--project` value from the command line, re-passed on re-exec.
    initial_project: Option<PathBuf>,
    // Currently bound project, if any.
    binding: Option<ProjectBinding>,
    // libmcp kernel tracking initialization and forwarded requests.
    session_kernel: HostSessionKernel,
    // In-memory aggregate telemetry; carried across rollouts via the seed.
    telemetry: ServerTelemetry,
    // Per-project telemetry log, opened once a project is bound.
    telemetry_log: Option<TelemetryLog>,
    // Monotonic id source for host→worker requests.
    next_request_id: u64,
    // Supervisor for the worker subprocess.
    worker: WorkerSupervisor,
    // Fingerprint of the serving binary, used to detect pending rollouts.
    binary: BinaryRuntime,
    // Test hook (from FORCE_ROLLOUT_ENV): operation key that forces one rollout.
    force_rollout_key: Option<String>,
    // True once the forced rollout has fired (persisted across re-exec).
    force_rollout_consumed: bool,
    // Set when a rollout should happen after the current message.
    rollout_requested: bool,
    // Test hook (from CRASH_ONCE_ENV): operation key that crashes the worker once.
    crash_once_key: Option<String>,
    // True once the forced crash has fired (persisted across re-exec).
    crash_once_consumed: bool,
}
+
+impl HostRuntime {
+ fn new(initial_project: Option<PathBuf>) -> Result<Self, Box<dyn std::error::Error>> {
+ let executable = std::env::current_exe()?;
+ let binary = BinaryRuntime::new(executable.clone())?;
+ let restored = restore_host_state()?;
+ let session_kernel = restored
+ .as_ref()
+ .map(|seed| seed.session_kernel.clone().restore())
+ .transpose()?
+ .map_or_else(HostSessionKernel::cold, HostSessionKernel::from_restored);
+ let telemetry = restored
+ .as_ref()
+ .map_or_else(ServerTelemetry::default, |seed| seed.telemetry.clone());
+ let next_request_id = restored
+ .as_ref()
+ .map_or(1, |seed| seed.next_request_id.max(1));
+ let worker_generation = restored
+ .as_ref()
+ .map_or(Generation::genesis(), |seed| seed.worker_generation);
+ let worker_spawned = restored.as_ref().is_some_and(|seed| seed.worker_spawned);
+ let force_rollout_consumed = restored
+ .as_ref()
+ .is_some_and(|seed| seed.force_rollout_consumed);
+ let crash_once_consumed = restored
+ .as_ref()
+ .is_some_and(|seed| seed.crash_once_consumed);
+ let binding = if let Some(seed) = restored.as_ref().and_then(|seed| seed.binding.clone()) {
+ Some(restore_binding(seed)?)
+ } else if let Some(path) = initial_project.clone() {
+ Some(resolve_project_binding(path)?.binding)
+ } else {
+ None
+ };
+ let telemetry_log = binding.as_ref().map(open_telemetry_log).transpose()?;
+
+ let mut worker = WorkerSupervisor::new(
+ WorkerSpawnConfig {
+ executable: executable.clone(),
+ },
+ worker_generation,
+ worker_spawned,
+ );
+ if let Some(project_root) = binding.as_ref().map(|binding| binding.project_root.clone()) {
+ worker.rebind(project_root);
+ }
+
+ Ok(Self {
+ initial_project,
+ binding,
+ session_kernel,
+ telemetry,
+ telemetry_log,
+ next_request_id,
+ worker,
+ binary,
+ force_rollout_key: std::env::var(FORCE_ROLLOUT_ENV).ok(),
+ force_rollout_consumed,
+ rollout_requested: false,
+ crash_once_key: std::env::var(CRASH_ONCE_ENV).ok(),
+ crash_once_consumed,
+ })
+ }
+
+ fn handle_line(&mut self, line: &str) -> Option<Value> {
+ let frame = match FramedMessage::parse(line.as_bytes().to_vec()) {
+ Ok(frame) => frame,
+ Err(error) => {
+ return Some(jsonrpc_error(
+ Value::Null,
+ FaultRecord::invalid_input(
+ self.worker.generation(),
+ FaultStage::Protocol,
+ "jsonrpc.parse",
+ format!("parse error: {error}"),
+ ),
+ ));
+ }
+ };
+ self.handle_frame(frame)
+ }
+
+ fn handle_frame(&mut self, frame: FramedMessage) -> Option<Value> {
+ self.session_kernel.observe_client_frame(&frame);
+ let Some(object) = frame.value.as_object() else {
+ return Some(jsonrpc_error(
+ Value::Null,
+ FaultRecord::invalid_input(
+ self.worker.generation(),
+ FaultStage::Protocol,
+ "jsonrpc.message",
+ "invalid request: expected JSON object",
+ ),
+ ));
+ };
+ let method = object.get("method").and_then(Value::as_str)?;
+ let id = object.get("id").cloned();
+ let params = object.get("params").cloned().unwrap_or_else(|| json!({}));
+ let operation_key = operation_key(method, &params);
+ let started_at = Instant::now();
+
+ self.telemetry.record_request(&operation_key);
+ let response = match self.dispatch(&frame, method, params, id.clone()) {
+ Ok(Some(result)) => {
+ let latency_ms = elapsed_ms(started_at.elapsed());
+ self.telemetry.record_success(
+ &operation_key,
+ latency_ms,
+ self.worker.generation(),
+ self.worker.is_alive(),
+ );
+ id.map(|id| jsonrpc_result(id, result))
+ }
+ Ok(None) => {
+ let latency_ms = elapsed_ms(started_at.elapsed());
+ self.telemetry.record_success(
+ &operation_key,
+ latency_ms,
+ self.worker.generation(),
+ self.worker.is_alive(),
+ );
+ None
+ }
+ Err(fault) => {
+ let latency_ms = elapsed_ms(started_at.elapsed());
+ self.telemetry.record_error(
+ &operation_key,
+ &fault,
+ latency_ms,
+ self.worker.generation(),
+ );
+ Some(match id {
+ Some(id) if method == "tools/call" => {
+ jsonrpc_result(id, fault.into_tool_result())
+ }
+ Some(id) => jsonrpc_error(id, fault),
+ None => jsonrpc_error(Value::Null, fault),
+ })
+ }
+ };
+
+ if self.should_force_rollout(&operation_key) {
+ self.force_rollout_consumed = true;
+ self.telemetry.record_rollout();
+ self.rollout_requested = true;
+ }
+
+ response
+ }
+
+ fn dispatch(
+ &mut self,
+ request_frame: &FramedMessage,
+ method: &str,
+ params: Value,
+ request_id: Option<Value>,
+ ) -> Result<Option<Value>, FaultRecord> {
+ match method {
+ "initialize" => Ok(Some(json!({
+ "protocolVersion": PROTOCOL_VERSION,
+ "capabilities": {
+ "tools": { "listChanged": false }
+ },
+ "serverInfo": {
+ "name": SERVER_NAME,
+ "version": env!("CARGO_PKG_VERSION")
+ },
+ "instructions": "Bind the session with project.bind, then use issue.save to park ideas in issues/<slug>.md. issue.list enumerates every existing issue file because there is no closed state."
+ }))),
+ "notifications/initialized" => {
+ if !self.seed_captured() {
+ return Err(FaultRecord::not_initialized(
+ self.worker.generation(),
+ FaultStage::Host,
+ "notifications/initialized",
+ "received initialized notification before initialize",
+ ));
+ }
+ Ok(None)
+ }
+ "notifications/cancelled" => Ok(None),
+ "ping" => Ok(Some(json!({}))),
+ other => {
+ self.require_initialized(other)?;
+ match other {
+ "tools/list" => Ok(Some(json!({ "tools": tool_definitions() }))),
+ "tools/call" => Ok(Some(self.dispatch_tool_call(
+ request_frame,
+ params,
+ request_id,
+ )?)),
+ _ => Err(FaultRecord::invalid_input(
+ self.worker.generation(),
+ FaultStage::Protocol,
+ other,
+ format!("method `{other}` is not implemented"),
+ )),
+ }
+ }
+ }
+ }
+
+ fn dispatch_tool_call(
+ &mut self,
+ request_frame: &FramedMessage,
+ params: Value,
+ _request_id: Option<Value>,
+ ) -> Result<Value, FaultRecord> {
+ let envelope =
+ deserialize::<ToolCallEnvelope>(params, "tools/call", self.worker.generation())?;
+ let spec = tool_spec(&envelope.name).ok_or_else(|| {
+ FaultRecord::invalid_input(
+ self.worker.generation(),
+ FaultStage::Host,
+ format!("tools/call:{}", envelope.name),
+ format!("unknown tool `{}`", envelope.name),
+ )
+ })?;
+ match spec.dispatch {
+ DispatchTarget::Host => {
+ let started_at = Instant::now();
+ let request_id = request_id_from_frame(request_frame);
+ let result = self.handle_host_tool(&envelope.name, envelope.arguments);
+ self.record_host_tool_completion(
+ request_frame,
+ request_id.as_ref(),
+ elapsed_ms(started_at.elapsed()),
+ result.as_ref().err(),
+ );
+ result
+ }
+ DispatchTarget::Worker => {
+ self.dispatch_worker_tool(request_frame, spec, envelope.arguments)
+ }
+ }
+ }
+
+ fn dispatch_worker_tool(
+ &mut self,
+ request_frame: &FramedMessage,
+ spec: crate::mcp::catalog::ToolSpec,
+ arguments: Value,
+ ) -> Result<Value, FaultRecord> {
+ let operation = format!("tools/call:{}", spec.name);
+ self.dispatch_worker_operation(
+ request_frame,
+ operation,
+ spec.replay,
+ WorkerOperation::CallTool {
+ name: spec.name.to_owned(),
+ arguments,
+ },
+ )
+ }
+
+ fn dispatch_worker_operation(
+ &mut self,
+ request_frame: &FramedMessage,
+ operation: String,
+ replay: ReplayContract,
+ worker_operation: WorkerOperation,
+ ) -> Result<Value, FaultRecord> {
+ let binding = self.require_bound_project(&operation)?;
+ self.worker.rebind(binding.project_root.clone());
+
+ if self.should_crash_worker_once(&operation) {
+ self.worker.arm_crash_once();
+ }
+
+ self.session_kernel
+ .record_forwarded_request(request_frame, replay);
+ let forwarded_request_id = request_id_from_frame(request_frame);
+ let host_request_id = self.allocate_request_id();
+ let started_at = Instant::now();
+ let mut replay_attempts = 0;
+
+ let outcome = match self
+ .worker
+ .execute(host_request_id, worker_operation.clone())
+ {
+ Ok(result) => Ok(result),
+ Err(fault) => {
+ if replay == ReplayContract::Convergent && fault.retryable {
+ replay_attempts = 1;
+ self.telemetry.record_retry(&operation);
+ self.worker
+ .restart()
+ .map_err(|restart_fault| restart_fault.mark_retried())?;
+ self.telemetry
+ .record_worker_restart(self.worker.generation());
+ self.worker
+ .execute(host_request_id, worker_operation)
+ .map_err(FaultRecord::mark_retried)
+ } else {
+ Err(fault)
+ }
+ }
+ };
+
+ let completed = forwarded_request_id
+ .as_ref()
+ .and_then(|request_id| self.session_kernel.take_completed_request(request_id));
+ self.record_worker_tool_completion(
+ forwarded_request_id.as_ref(),
+ completed.as_ref(),
+ elapsed_ms(started_at.elapsed()),
+ replay_attempts,
+ outcome.as_ref().err(),
+ );
+ outcome
+ }
+
+ fn handle_host_tool(&mut self, name: &str, arguments: Value) -> Result<Value, FaultRecord> {
+ let operation = format!("tools/call:{name}");
+ let generation = self.worker.generation();
+ let (presentation, arguments) =
+ split_presentation(arguments, &operation, generation, FaultStage::Host)?;
+ match name {
+ "project.bind" => {
+ let args = deserialize::<ProjectBindArgs>(
+ arguments,
+ "tools/call:project.bind",
+ generation,
+ )?;
+ let resolved =
+ resolve_project_binding(PathBuf::from(args.path)).map_err(|error| {
+ FaultRecord::invalid_input(
+ generation,
+ FaultStage::Host,
+ "tools/call:project.bind",
+ error.to_string(),
+ )
+ })?;
+ self.worker
+ .refresh_binding(resolved.binding.project_root.clone());
+ self.telemetry_log =
+ Some(open_telemetry_log(&resolved.binding).map_err(|error| {
+ FaultRecord::internal(
+ generation,
+ FaultStage::Host,
+ "tools/call:project.bind",
+ error.to_string(),
+ )
+ })?);
+ self.binding = Some(resolved.binding);
+ tool_success(
+ project_bind_output(&resolved.status, generation)?,
+ presentation,
+ generation,
+ FaultStage::Host,
+ "tools/call:project.bind",
+ )
+ }
+ "system.health" => {
+ let rollout = if self.binary.rollout_pending().map_err(|error| {
+ FaultRecord::rollout(generation, &operation, error.to_string())
+ })? {
+ RolloutState::Pending
+ } else {
+ RolloutState::Stable
+ };
+ let health = self.telemetry.health_snapshot(rollout);
+ tool_success(
+ system_health_output(
+ &health,
+ self.binding.as_ref(),
+ self.worker.is_alive(),
+ self.binary.launch_path_stable,
+ generation,
+ )?,
+ presentation,
+ generation,
+ FaultStage::Host,
+ &operation,
+ )
+ }
+ "system.telemetry" => {
+ let snapshot = self.telemetry.telemetry_snapshot();
+ tool_success(
+ system_telemetry_output(&snapshot, self.telemetry.host_rollouts(), generation)?,
+ presentation,
+ generation,
+ FaultStage::Host,
+ &operation,
+ )
+ }
+ other => Err(FaultRecord::invalid_input(
+ generation,
+ FaultStage::Host,
+ format!("tools/call:{other}"),
+ format!("unknown host tool `{other}`"),
+ )),
+ }
+ }
+
+ fn require_initialized(&self, operation: &str) -> Result<(), FaultRecord> {
+ if self.session_initialized() {
+ return Ok(());
+ }
+ Err(FaultRecord::not_initialized(
+ self.worker.generation(),
+ FaultStage::Host,
+ operation,
+ "client must call initialize and notifications/initialized before normal operations",
+ ))
+ }
+
+ fn require_bound_project(&self, operation: &str) -> Result<&ProjectBinding, FaultRecord> {
+ self.binding.as_ref().ok_or_else(|| {
+ FaultRecord::unavailable(
+ self.worker.generation(),
+ FaultStage::Host,
+ operation,
+ "project is not bound; call project.bind with the target project root or a nested path inside it",
+ )
+ })
+ }
+
+ fn session_initialized(&self) -> bool {
+ self.session_kernel
+ .initialization_seed()
+ .is_some_and(|seed| seed.initialized_notification.is_some())
+ }
+
+ fn seed_captured(&self) -> bool {
+ self.session_kernel.initialization_seed().is_some()
+ }
+
+ fn allocate_request_id(&mut self) -> HostRequestId {
+ let id = HostRequestId(self.next_request_id);
+ self.next_request_id += 1;
+ id
+ }
+
+ fn maybe_roll_forward(&mut self) -> Result<(), Box<dyn std::error::Error>> {
+ let binary_pending = self.binary.rollout_pending()?;
+ if !self.rollout_requested && !binary_pending {
+ return Ok(());
+ }
+ if binary_pending && !self.rollout_requested {
+ self.telemetry.record_rollout();
+ }
+ self.roll_forward()
+ }
+
    /// Replaces this process with a fresh copy of the launch binary (`exec`),
    /// handing the current host state to the next process through a snapshot
    /// file referenced by the `HOST_STATE_ENV` environment variable.
    ///
    /// On unix a successful `exec` never returns; any return is therefore an
    /// error. On non-unix targets rollout is unsupported and always errors.
    fn roll_forward(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        // Capture everything the successor needs to resume seamlessly,
        // including the one-shot test flags so they stay one-shot across exec.
        let state = HostStateSeed {
            session_kernel: self.session_kernel.snapshot(),
            telemetry: self.telemetry.clone(),
            next_request_id: self.next_request_id,
            binding: self.binding.as_ref().map(ProjectBindingSeed::from),
            worker_generation: self.worker.generation(),
            worker_spawned: self.worker.has_spawned(),
            force_rollout_consumed: self.force_rollout_consumed,
            crash_once_consumed: self.crash_once_consumed,
        };
        let state_path = write_snapshot_file("jira-at-home-mcp-host-reexec", &state)?;
        let mut command = Command::new(&self.binary.path);
        let _ = command.arg("mcp").arg("serve");
        if let Some(project) = self.initial_project.as_ref() {
            let _ = command.arg("--project").arg(project);
        }
        let _ = command.env(HOST_STATE_ENV, &state_path);
        #[cfg(unix)]
        {
            // `exec` only returns on failure; clean up the snapshot, then
            // surface the launch error to the caller.
            let error = command.exec();
            let _ = remove_snapshot_file(&state_path);
            Err(Box::new(error))
        }
        #[cfg(not(unix))]
        {
            let _ = remove_snapshot_file(&state_path);
            Err(Box::new(io::Error::new(
                io::ErrorKind::Unsupported,
                "host rollout requires unix exec support",
            )))
        }
    }
+
+ fn should_force_rollout(&self, operation: &str) -> bool {
+ self.force_rollout_key
+ .as_deref()
+ .is_some_and(|key| key == operation)
+ && !self.force_rollout_consumed
+ }
+
+ fn should_crash_worker_once(&mut self, operation: &str) -> bool {
+ let should_crash = self
+ .crash_once_key
+ .as_deref()
+ .is_some_and(|key| key == operation)
+ && !self.crash_once_consumed;
+ if should_crash {
+ self.crash_once_consumed = true;
+ }
+ should_crash
+ }
+
+ fn record_host_tool_completion(
+ &mut self,
+ request_frame: &FramedMessage,
+ request_id: Option<&RequestId>,
+ latency_ms: u64,
+ fault: Option<&FaultRecord>,
+ ) {
+ let Some(request_id) = request_id else {
+ return;
+ };
+ let Some(tool_meta) = libmcp::parse_tool_call_meta(request_frame, "tools/call") else {
+ return;
+ };
+ self.record_tool_completion(request_id, &tool_meta, latency_ms, 0, fault);
+ }
+
    /// Records telemetry for a tool call that was dispatched to the worker.
    ///
    /// Silently skips notifications (no request id), calls that never
    /// completed, and completions carrying no tool-call metadata.
    fn record_worker_tool_completion(
        &mut self,
        request_id: Option<&RequestId>,
        completed: Option<&libmcp::CompletedPendingRequest>,
        latency_ms: u64,
        replay_attempts: u8,
        fault: Option<&FaultRecord>,
    ) {
        let Some(request_id) = request_id else {
            return;
        };
        let Some(completed) = completed else {
            return;
        };
        let Some(tool_meta) = completed.request.tool_call_meta.as_ref() else {
            return;
        };
        self.record_tool_completion(request_id, tool_meta, latency_ms, replay_attempts, fault);
    }
+
    /// Appends a tool-completion record to the telemetry log, if one is open.
    ///
    /// A present `fault` marks the outcome as an error and supplies its error
    /// detail; otherwise the outcome is recorded as ok. Log-write failures are
    /// reported on stderr and never fail the request itself.
    fn record_tool_completion(
        &mut self,
        request_id: &RequestId,
        tool_meta: &libmcp::ToolCallMeta,
        latency_ms: u64,
        replay_attempts: u8,
        fault: Option<&FaultRecord>,
    ) {
        // No bound project means no telemetry log: drop silently.
        let Some(log) = self.telemetry_log.as_mut() else {
            return;
        };
        let result = log.record_tool_completion(
            request_id,
            tool_meta,
            latency_ms,
            replay_attempts,
            if fault.is_some() {
                ToolOutcome::Error
            } else {
                ToolOutcome::Ok
            },
            fault.map_or_else(libmcp::ToolErrorDetail::default, FaultRecord::error_detail),
        );
        if let Err(error) = result {
            // Telemetry is best-effort; never propagate its failures.
            eprintln!("jira_at_home telemetry write failed: {error}");
        }
    }
+}
+
/// A freshly resolved binding plus the status payload reported back to the
/// client by `project.bind`.
struct ResolvedProjectBinding {
    binding: ProjectBinding,
    status: ProjectBindStatus,
}

/// Serializable summary of a successful `project.bind`, with every path
/// rendered as a display string.
#[derive(Debug, Serialize)]
struct ProjectBindStatus {
    // Path the caller asked to bind; may be nested inside the resolved root.
    requested_path: String,
    project_root: String,
    issues_root: String,
    state_root: String,
    // Number of issues currently tracked by the store.
    issue_count: usize,
}
+
+fn resolve_project_binding(
+ requested_path: PathBuf,
+) -> Result<ResolvedProjectBinding, Box<dyn std::error::Error>> {
+ let store = IssueStore::bind(requested_path.clone())?;
+ let layout = store.layout().clone();
+ let status = store.status()?;
+ Ok(ResolvedProjectBinding {
+ binding: ProjectBinding {
+ requested_path: requested_path.clone(),
+ project_root: layout.project_root.clone(),
+ issues_root: layout.issues_root.clone(),
+ state_root: layout.state_root.clone(),
+ },
+ status: ProjectBindStatus {
+ requested_path: requested_path.display().to_string(),
+ project_root: layout.project_root.display().to_string(),
+ issues_root: layout.issues_root.display().to_string(),
+ state_root: layout.state_root.display().to_string(),
+ issue_count: status.issue_count,
+ },
+ })
+}
+
+fn restore_binding(seed: ProjectBindingSeed) -> Result<ProjectBinding, Box<dyn std::error::Error>> {
+ Ok(resolve_project_binding(seed.requested_path)?.binding)
+}
+
+fn restore_host_state() -> Result<Option<HostStateSeed>, Box<dyn std::error::Error>> {
+ Ok(load_snapshot_file_from_env(HOST_STATE_ENV)?)
+}
+
+fn open_telemetry_log(binding: &ProjectBinding) -> io::Result<TelemetryLog> {
+ TelemetryLog::new(
+ binding
+ .state_root
+ .join("mcp")
+ .join("telemetry.jsonl")
+ .as_path(),
+ binding.project_root.as_path(),
+ 1,
+ )
+}
+
+fn project_bind_output(
+ status: &ProjectBindStatus,
+ generation: Generation,
+) -> Result<ToolOutput, FaultRecord> {
+ let mut concise = Map::new();
+ let _ = concise.insert("project_root".to_owned(), json!(status.project_root));
+ let _ = concise.insert("issues_root".to_owned(), json!(status.issues_root));
+ let _ = concise.insert("state_root".to_owned(), json!(status.state_root));
+ let _ = concise.insert("issue_count".to_owned(), json!(status.issue_count));
+ if status.requested_path != status.project_root {
+ let _ = concise.insert("requested_path".to_owned(), json!(status.requested_path));
+ }
+ fallback_detailed_tool_output(
+ &Value::Object(concise),
+ status,
+ [
+ format!("bound project {}", status.project_root),
+ format!("issues: {}", status.issues_root),
+ format!("state: {}", status.state_root),
+ format!("issues tracked: {}", status.issue_count),
+ ]
+ .join("\n"),
+ None,
+ libmcp::SurfaceKind::Mutation,
+ generation,
+ FaultStage::Host,
+ "tools/call:project.bind",
+ )
+}
+
/// Builds the `system.health` tool output.
///
/// The concise payload is a flat readiness summary; the full payload embeds
/// the raw health snapshot and the binding's paths. The porcelain text mirrors
/// the concise fields line by line.
fn system_health_output(
    health: &libmcp::HealthSnapshot,
    binding: Option<&ProjectBinding>,
    worker_alive: bool,
    launch_path_stable: bool,
    generation: Generation,
) -> Result<ToolOutput, FaultRecord> {
    let rollout_pending = matches!(health.rollout, Some(RolloutState::Pending));
    let mut concise = Map::new();
    let _ = concise.insert(
        "ready".to_owned(),
        json!(matches!(health.state, libmcp::LifecycleState::Ready)),
    );
    let _ = concise.insert("bound".to_owned(), json!(binding.is_some()));
    let _ = concise.insert(
        "worker_generation".to_owned(),
        json!(health.generation.get()),
    );
    let _ = concise.insert("worker_alive".to_owned(), json!(worker_alive));
    let _ = concise.insert("rollout_pending".to_owned(), json!(rollout_pending));
    let _ = concise.insert("launch_path_stable".to_owned(), json!(launch_path_stable));
    // Paths appear in the concise view only when a project is bound.
    if let Some(binding) = binding {
        let _ = concise.insert(
            "project_root".to_owned(),
            json!(binding.project_root.display().to_string()),
        );
        let _ = concise.insert(
            "issues_root".to_owned(),
            json!(binding.issues_root.display().to_string()),
        );
    }
    let full = json!({
        "health": health,
        "binding": binding.map(|binding| json!({
            "requested_path": binding.requested_path.display().to_string(),
            "project_root": binding.project_root.display().to_string(),
            "issues_root": binding.issues_root.display().to_string(),
            "state_root": binding.state_root.display().to_string(),
        })),
        "worker_alive": worker_alive,
        "launch_path_stable": launch_path_stable,
    });
    // Porcelain: "<ready|not-ready> | <bound|unbound>" headline, then details.
    let mut lines = vec![format!(
        "{} | {}",
        if matches!(health.state, libmcp::LifecycleState::Ready) {
            "ready"
        } else {
            "not-ready"
        },
        if binding.is_some() {
            "bound"
        } else {
            "unbound"
        }
    )];
    if let Some(binding) = binding {
        lines.push(format!("project: {}", binding.project_root.display()));
        lines.push(format!("issues: {}", binding.issues_root.display()));
    }
    lines.push(format!(
        "worker: gen {} {}",
        health.generation.get(),
        if worker_alive { "alive" } else { "dead" }
    ));
    lines.push(format!(
        "binary: {}{}",
        if launch_path_stable {
            "stable"
        } else {
            "unstable"
        },
        if rollout_pending {
            " rollout-pending"
        } else {
            ""
        }
    ));
    fallback_detailed_tool_output(
        &Value::Object(concise),
        &full,
        lines.join("\n"),
        None,
        libmcp::SurfaceKind::Ops,
        generation,
        FaultStage::Host,
        "tools/call:system.health",
    )
}
+
/// Builds the `system.telemetry` tool output.
///
/// The concise payload summarizes the totals plus the top six methods (in the
/// snapshot's own ordering — presumably hottest first, TODO confirm in
/// libmcp); the full payload embeds the entire snapshot.
fn system_telemetry_output(
    telemetry: &libmcp::TelemetrySnapshot,
    host_rollouts: u64,
    generation: Generation,
) -> Result<ToolOutput, FaultRecord> {
    let hot_methods = telemetry.methods.iter().take(6).collect::<Vec<_>>();
    let concise = json!({
        "requests": telemetry.totals.request_count,
        "successes": telemetry.totals.success_count,
        "response_errors": telemetry.totals.response_error_count,
        "transport_faults": telemetry.totals.transport_fault_count,
        "retries": telemetry.totals.retry_count,
        "worker_restarts": telemetry.restart_count,
        "host_rollouts": host_rollouts,
        "hot_methods": hot_methods.iter().map(|method| json!({
            "method": method.method,
            "requests": method.request_count,
            "response_errors": method.response_error_count,
            "transport_faults": method.transport_fault_count,
            "retries": method.retry_count,
        })).collect::<Vec<_>>(),
    });
    let full = json!({
        "telemetry": telemetry,
        "host_rollouts": host_rollouts,
    });
    // Porcelain: one totals line, one restarts/rollouts line, then per-method
    // lines for the hot methods (omitted entirely when there are none).
    let mut lines = vec![format!(
        "requests={} success={} response_error={} transport_fault={} retry={}",
        telemetry.totals.request_count,
        telemetry.totals.success_count,
        telemetry.totals.response_error_count,
        telemetry.totals.transport_fault_count,
        telemetry.totals.retry_count
    )];
    lines.push(format!(
        "worker_restarts={} host_rollouts={host_rollouts}",
        telemetry.restart_count,
    ));
    if !hot_methods.is_empty() {
        lines.push("hot methods:".to_owned());
        for method in hot_methods {
            lines.push(format!(
                "{} req={} err={} transport={} retry={}",
                method.method,
                method.request_count,
                method.response_error_count,
                method.transport_fault_count,
                method.retry_count,
            ));
        }
    }
    fallback_detailed_tool_output(
        &concise,
        &full,
        lines.join("\n"),
        None,
        libmcp::SurfaceKind::Ops,
        generation,
        FaultStage::Host,
        "tools/call:system.telemetry",
    )
}
+
+fn deserialize<T: for<'de> serde::Deserialize<'de>>(
+ value: Value,
+ operation: &str,
+ generation: Generation,
+) -> Result<T, FaultRecord> {
+ serde_json::from_value(value).map_err(|error| {
+ FaultRecord::invalid_input(
+ generation,
+ FaultStage::Protocol,
+ operation,
+ format!("invalid params: {error}"),
+ )
+ })
+}
+
+fn operation_key(method: &str, params: &Value) -> String {
+ match method {
+ "tools/call" => params.get("name").and_then(Value::as_str).map_or_else(
+ || "tools/call".to_owned(),
+ |name| format!("tools/call:{name}"),
+ ),
+ other => other.to_owned(),
+ }
+}
+
+fn request_id_from_frame(frame: &FramedMessage) -> Option<RequestId> {
+ match frame.classify() {
+ libmcp::RpcEnvelopeKind::Request { id, .. } => Some(id),
+ libmcp::RpcEnvelopeKind::Notification { .. }
+ | libmcp::RpcEnvelopeKind::Response { .. }
+ | libmcp::RpcEnvelopeKind::Unknown => None,
+ }
+}
+
+fn jsonrpc_result(id: Value, result: Value) -> Value {
+ json!({
+ "jsonrpc": "2.0",
+ "id": id,
+ "result": result,
+ })
+}
+
+fn jsonrpc_error(id: Value, fault: FaultRecord) -> Value {
+ json!({
+ "jsonrpc": "2.0",
+ "id": id,
+ "error": fault.into_jsonrpc_error(),
+ })
+}
+
+fn write_message(stdout: &mut impl Write, message: &Value) -> io::Result<()> {
+ serde_json::to_writer(&mut *stdout, message)?;
+ stdout.write_all(b"\n")?;
+ stdout.flush()?;
+ Ok(())
+}
+
/// Converts a duration to whole milliseconds, saturating at `u64::MAX`.
fn elapsed_ms(duration: std::time::Duration) -> u64 {
    match u64::try_from(duration.as_millis()) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}
+
/// Parsed `tools/call` params: the tool name plus its arguments object.
#[derive(Debug, serde::Deserialize)]
struct ToolCallEnvelope {
    name: String,
    // A missing `arguments` key deserializes as `{}` rather than erroring.
    #[serde(default = "empty_json_object")]
    arguments: Value,
}

/// Serde default: an empty JSON object for omitted tool arguments.
fn empty_json_object() -> Value {
    json!({})
}
+
/// Arguments for the `project.bind` tool: the path to bind (a project root or
/// any nested path inside one — resolution happens in the store).
#[derive(Debug, serde::Deserialize)]
struct ProjectBindArgs {
    path: String,
}
+
+impl From<&ProjectBinding> for ProjectBindingSeed {
+ fn from(value: &ProjectBinding) -> Self {
+ Self {
+ requested_path: value.requested_path.clone(),
+ project_root: value.project_root.clone(),
+ }
+ }
+}
diff --git a/crates/jira-at-home/src/mcp/mod.rs b/crates/jira-at-home/src/mcp/mod.rs
new file mode 100644
index 0000000..666598f
--- /dev/null
+++ b/crates/jira-at-home/src/mcp/mod.rs
@@ -0,0 +1,10 @@
//! MCP server plumbing: tool catalog, fault model, host process, worker
//! service, rendering, wire protocol, and telemetry.

mod catalog;
mod fault;
mod host;
mod output;
mod protocol;
mod service;
mod telemetry;

// The rest of the crate only drives these two entry points.
pub(crate) use host::runtime::run_host;
pub(crate) use service::run_worker;
diff --git a/crates/jira-at-home/src/mcp/output.rs b/crates/jira-at-home/src/mcp/output.rs
new file mode 100644
index 0000000..90673b3
--- /dev/null
+++ b/crates/jira-at-home/src/mcp/output.rs
@@ -0,0 +1,195 @@
+use libmcp::{
+ DetailLevel, FallbackJsonProjection, JsonPorcelainConfig, ProjectionError, RenderMode,
+ SurfaceKind, ToolProjection, render_json_porcelain, with_presentation_properties,
+};
+use serde::Serialize;
+use serde_json::{Value, json};
+
+use crate::mcp::fault::{FaultRecord, FaultStage};
+
// Caps applied when auto-rendering a full JSON payload as porcelain text.
const FULL_PORCELAIN_MAX_LINES: usize = 40;
const FULL_PORCELAIN_MAX_INLINE_CHARS: usize = 512;

/// How a tool result should be rendered: output format plus detail level.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct Presentation {
    pub(crate) render: RenderMode,
    pub(crate) detail: DetailLevel,
}
+
/// A tool result at both detail levels plus pre-rendered porcelain text.
#[derive(Debug, Clone)]
pub(crate) struct ToolOutput {
    // Structured payloads for the two detail levels.
    concise: Value,
    full: Value,
    // Porcelain text for concise detail; full text is optional and, when
    // absent, auto-rendered from `full` at read time.
    concise_text: String,
    full_text: Option<String>,
}
+
+impl ToolOutput {
+ pub(crate) fn from_values(
+ concise: Value,
+ full: Value,
+ concise_text: impl Into<String>,
+ full_text: Option<String>,
+ ) -> Self {
+ Self {
+ concise,
+ full,
+ concise_text: concise_text.into(),
+ full_text,
+ }
+ }
+
+ fn structured(&self, detail: DetailLevel) -> &Value {
+ match detail {
+ DetailLevel::Concise => &self.concise,
+ DetailLevel::Full => &self.full,
+ }
+ }
+
+ fn porcelain_text(&self, detail: DetailLevel) -> String {
+ match detail {
+ DetailLevel::Concise => self.concise_text.clone(),
+ DetailLevel::Full => self
+ .full_text
+ .clone()
+ .unwrap_or_else(|| render_json_porcelain(&self.full, full_porcelain_config())),
+ }
+ }
+}
+
+impl Default for Presentation {
+ fn default() -> Self {
+ Self {
+ render: RenderMode::Porcelain,
+ detail: DetailLevel::Concise,
+ }
+ }
+}
+
+pub(crate) fn split_presentation(
+ arguments: Value,
+ operation: &str,
+ generation: libmcp::Generation,
+ stage: FaultStage,
+) -> Result<(Presentation, Value), FaultRecord> {
+ let Value::Object(mut object) = arguments else {
+ return Ok((Presentation::default(), arguments));
+ };
+ let render = object
+ .remove("render")
+ .map(|value| {
+ serde_json::from_value::<RenderMode>(value).map_err(|error| {
+ FaultRecord::invalid_input(
+ generation,
+ stage,
+ operation,
+ format!("invalid render mode: {error}"),
+ )
+ })
+ })
+ .transpose()?
+ .unwrap_or(RenderMode::Porcelain);
+ let detail = object
+ .remove("detail")
+ .map(|value| {
+ serde_json::from_value::<DetailLevel>(value).map_err(|error| {
+ FaultRecord::invalid_input(
+ generation,
+ stage,
+ operation,
+ format!("invalid detail level: {error}"),
+ )
+ })
+ })
+ .transpose()?
+ .unwrap_or(DetailLevel::Concise);
+ Ok((Presentation { render, detail }, Value::Object(object)))
+}
+
+pub(crate) fn projected_tool_output(
+ projection: &impl ToolProjection,
+ concise_text: impl Into<String>,
+ full_text: Option<String>,
+ generation: libmcp::Generation,
+ stage: FaultStage,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let concise = projection
+ .concise_projection()
+ .map_err(|error| projection_fault(error, generation, stage, operation))?;
+ let full = projection
+ .full_projection()
+ .map_err(|error| projection_fault(error, generation, stage, operation))?;
+ Ok(ToolOutput::from_values(
+ concise,
+ full,
+ concise_text,
+ full_text,
+ ))
+}
+
+pub(crate) fn fallback_detailed_tool_output(
+ concise: &impl Serialize,
+ full: &impl Serialize,
+ concise_text: impl Into<String>,
+ full_text: Option<String>,
+ kind: SurfaceKind,
+ generation: libmcp::Generation,
+ stage: FaultStage,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let projection = FallbackJsonProjection::new(concise, full, kind)
+ .map_err(|error| projection_fault(error, generation, stage, operation))?;
+ projected_tool_output(
+ &projection,
+ concise_text,
+ full_text,
+ generation,
+ stage,
+ operation,
+ )
+}
+
/// Wraps a tool output in an MCP `tools/call` success payload.
///
/// The text content honors the requested render mode (porcelain text or
/// pretty-printed JSON); `structuredContent` always carries the structured
/// payload for the requested detail level.
pub(crate) fn tool_success(
    output: ToolOutput,
    presentation: Presentation,
    generation: libmcp::Generation,
    stage: FaultStage,
    operation: &str,
) -> Result<Value, FaultRecord> {
    let structured = output.structured(presentation.detail).clone();
    let text = match presentation.render {
        RenderMode::Porcelain => output.porcelain_text(presentation.detail),
        // Serializing an already-built Value should not fail; map that
        // (theoretical) case to an internal fault rather than panicking.
        RenderMode::Json => serde_json::to_string_pretty(&structured).map_err(|error| {
            FaultRecord::internal(generation, stage, operation, error.to_string())
        })?,
    };
    Ok(json!({
        "content": [{
            "type": "text",
            "text": text,
        }],
        "structuredContent": structured,
        "isError": false,
    }))
}
+
/// Augments a tool input schema with the shared `render`/`detail` properties.
pub(crate) fn with_common_presentation(schema: Value) -> Value {
    with_presentation_properties(schema)
}

/// Maps a projection failure to an internal fault for `operation`.
fn projection_fault(
    error: ProjectionError,
    generation: libmcp::Generation,
    stage: FaultStage,
    operation: &str,
) -> FaultRecord {
    FaultRecord::internal(generation, stage, operation, error.to_string())
}

/// Limits applied when auto-rendering full payloads as porcelain text.
const fn full_porcelain_config() -> JsonPorcelainConfig {
    JsonPorcelainConfig {
        max_lines: FULL_PORCELAIN_MAX_LINES,
        max_inline_chars: FULL_PORCELAIN_MAX_INLINE_CHARS,
    }
}
diff --git a/crates/jira-at-home/src/mcp/protocol.rs b/crates/jira-at-home/src/mcp/protocol.rs
new file mode 100644
index 0000000..9226a36
--- /dev/null
+++ b/crates/jira-at-home/src/mcp/protocol.rs
@@ -0,0 +1,78 @@
+use std::path::PathBuf;
+
+use libmcp::{Generation, HostSessionKernelSnapshot};
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+use crate::mcp::telemetry::ServerTelemetry;
+
/// MCP protocol revision this server advertises.
pub(crate) const PROTOCOL_VERSION: &str = "2025-11-25";
pub(crate) const SERVER_NAME: &str = "jira-at-home";
/// Env var pointing a re-exec'd host at its state snapshot file.
pub(crate) const HOST_STATE_ENV: &str = "JIRA_AT_HOME_MCP_HOST_STATE";
// Test-only knobs: each names an operation key that forces a rollout /
// a single worker crash when that operation is next seen.
pub(crate) const FORCE_ROLLOUT_ENV: &str = "JIRA_AT_HOME_MCP_TEST_FORCE_ROLLOUT_KEY";
pub(crate) const CRASH_ONCE_ENV: &str = "JIRA_AT_HOME_MCP_TEST_HOST_CRASH_ONCE_KEY";

/// Everything the host carries across an in-place re-exec (rollout).
#[derive(Clone, Debug, Deserialize, Serialize)]
pub(crate) struct HostStateSeed {
    pub(crate) session_kernel: HostSessionKernelSnapshot,
    pub(crate) telemetry: ServerTelemetry,
    pub(crate) next_request_id: u64,
    pub(crate) binding: Option<ProjectBindingSeed>,
    pub(crate) worker_generation: Generation,
    pub(crate) worker_spawned: bool,
    // One-shot test flags must survive the exec so they stay one-shot.
    pub(crate) force_rollout_consumed: bool,
    pub(crate) crash_once_consumed: bool,
}

/// Minimal persisted form of a project binding; re-resolved after restore.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub(crate) struct ProjectBindingSeed {
    pub(crate) requested_path: PathBuf,
    pub(crate) project_root: PathBuf,
}

/// Monotonic id for host->worker requests; serialized as a bare integer.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
#[serde(transparent)]
pub(crate) struct HostRequestId(pub(crate) u64);
+
/// One host->worker message; tagged as `{"kind": "execute", ...}` on the wire.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub(crate) enum WorkerRequest {
    Execute {
        id: HostRequestId,
        operation: WorkerOperation,
    },
}

/// The work itself; currently only tool invocations exist.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub(crate) enum WorkerOperation {
    CallTool { name: String, arguments: Value },
}

/// One worker->host reply, correlated to its request by `id`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub(crate) struct WorkerResponse {
    pub(crate) id: HostRequestId,
    pub(crate) outcome: WorkerOutcome,
}

/// Result of executing an operation; tagged by `status` on the wire.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub(crate) enum WorkerOutcome {
    Success {
        result: Value,
    },
    Fault {
        fault: crate::mcp::fault::FaultRecord,
    },
}
+
/// Identity of an on-disk binary, used to detect a pending rollout.
/// NOTE(review): length + mtime, not a content hash — a rewrite preserving
/// both would go undetected; confirm that trade-off is intentional.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub(crate) struct BinaryFingerprint {
    pub(crate) length_bytes: u64,
    pub(crate) modified_unix_nanos: u128,
}

/// How to launch a worker process.
#[derive(Clone, Debug)]
pub(crate) struct WorkerSpawnConfig {
    pub(crate) executable: PathBuf,
}
diff --git a/crates/jira-at-home/src/mcp/service.rs b/crates/jira-at-home/src/mcp/service.rs
new file mode 100644
index 0000000..fc9dbf0
--- /dev/null
+++ b/crates/jira-at-home/src/mcp/service.rs
@@ -0,0 +1,336 @@
+use std::io::{self, BufRead, Write};
+use std::path::{Path, PathBuf};
+
+use libmcp::{Generation, SurfaceKind};
+use serde::Deserialize;
+use serde_json::{Value, json};
+
+use crate::mcp::fault::{FaultRecord, FaultStage};
+use crate::mcp::output::{
+ ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success,
+};
+use crate::store::{
+ IssueBody, IssueRecord, IssueSlug, IssueStore, SaveReceipt, StoreError, format_timestamp,
+};
+
+pub(crate) fn run_worker(
+ project_root: PathBuf,
+ generation: u64,
+) -> Result<(), Box<dyn std::error::Error>> {
+ let generation = generation_from_wire(generation);
+ let store = IssueStore::bind(project_root)?;
+ let stdin = io::stdin();
+ let mut stdout = io::stdout().lock();
+ let mut service = WorkerService::new(store, generation);
+
+ for line in stdin.lock().lines() {
+ let line = line?;
+ if line.trim().is_empty() {
+ continue;
+ }
+ let request = serde_json::from_str::<crate::mcp::protocol::WorkerRequest>(&line)?;
+ let response = match request {
+ crate::mcp::protocol::WorkerRequest::Execute { id, operation } => {
+ let outcome = match service.execute(operation) {
+ Ok(result) => crate::mcp::protocol::WorkerOutcome::Success { result },
+ Err(fault) => crate::mcp::protocol::WorkerOutcome::Fault { fault },
+ };
+ crate::mcp::protocol::WorkerResponse { id, outcome }
+ }
+ };
+ serde_json::to_writer(&mut stdout, &response)?;
+ stdout.write_all(b"\n")?;
+ stdout.flush()?;
+ }
+
+ Ok(())
+}
+
/// Per-process worker state: the bound issue store plus the generation this
/// worker was spawned as (echoed into every fault it reports).
struct WorkerService {
    store: IssueStore,
    generation: Generation,
}
+
impl WorkerService {
    fn new(store: IssueStore, generation: Generation) -> Self {
        Self { store, generation }
    }

    /// Executes one worker operation; currently only tool calls exist.
    fn execute(
        &mut self,
        operation: crate::mcp::protocol::WorkerOperation,
    ) -> Result<Value, FaultRecord> {
        match operation {
            crate::mcp::protocol::WorkerOperation::CallTool { name, arguments } => {
                self.call_tool(&name, arguments)
            }
        }
    }

    /// Dispatches a tool call by name against the bound issue store.
    ///
    /// The shared presentation controls are stripped from the arguments first
    /// and applied to the final rendering. Unknown tool names produce an
    /// invalid-input fault.
    fn call_tool(&mut self, name: &str, arguments: Value) -> Result<Value, FaultRecord> {
        // Operation key mirrors the host's telemetry key for this call.
        let operation = format!("tools/call:{name}");
        let (presentation, arguments) =
            split_presentation(arguments, &operation, self.generation, FaultStage::Worker)?;
        let output = match name {
            "issue.save" => {
                let args = deserialize::<IssueSaveArgs>(arguments, &operation, self.generation)?;
                let slug = IssueSlug::parse(args.slug)
                    .map_err(store_fault(self.generation, &operation))?;
                let body = IssueBody::parse(args.body)
                    .map_err(store_fault(self.generation, &operation))?;
                let receipt = self
                    .store
                    .save(slug, body)
                    .map_err(store_fault(self.generation, &operation))?;
                issue_save_output(
                    &receipt,
                    self.store.layout().project_root.as_path(),
                    self.generation,
                    &operation,
                )?
            }
            "issue.list" => {
                let issues = self
                    .store
                    .list()
                    .map_err(store_fault(self.generation, &operation))?;
                issue_list_output(
                    &issues,
                    self.store.layout().project_root.as_path(),
                    self.generation,
                    &operation,
                )?
            }
            "issue.read" => {
                let args = deserialize::<IssueReadArgs>(arguments, &operation, self.generation)?;
                let slug = IssueSlug::parse(args.slug)
                    .map_err(store_fault(self.generation, &operation))?;
                let record = self
                    .store
                    .read(slug)
                    .map_err(store_fault(self.generation, &operation))?;
                issue_read_output(
                    &record,
                    self.store.layout().project_root.as_path(),
                    self.generation,
                    &operation,
                )?
            }
            other => {
                return Err(FaultRecord::invalid_input(
                    self.generation,
                    FaultStage::Worker,
                    &operation,
                    format!("unknown worker tool `{other}`"),
                ));
            }
        };
        tool_success(
            output,
            presentation,
            self.generation,
            FaultStage::Worker,
            &operation,
        )
    }
}
+
/// Arguments for `issue.save`: slug identifying the issue, and its full body
/// (the save replaces the whole document).
#[derive(Debug, Deserialize)]
struct IssueSaveArgs {
    slug: String,
    body: String,
}

/// Arguments for `issue.read`: the slug to fetch.
#[derive(Debug, Deserialize)]
struct IssueReadArgs {
    slug: String,
}
+
+fn deserialize<T: for<'de> Deserialize<'de>>(
+ value: Value,
+ operation: &str,
+ generation: Generation,
+) -> Result<T, FaultRecord> {
+ serde_json::from_value(value).map_err(|error| {
+ FaultRecord::invalid_input(
+ generation,
+ FaultStage::Protocol,
+ operation,
+ format!("invalid params: {error}"),
+ )
+ })
+}
+
+fn store_fault(
+ generation: Generation,
+ operation: &str,
+) -> impl FnOnce(StoreError) -> FaultRecord + '_ {
+ move |error| {
+ let stage = if matches!(error, StoreError::Io(_)) {
+ FaultStage::Store
+ } else {
+ FaultStage::Worker
+ };
+ match error {
+ StoreError::InvalidSlug(_)
+ | StoreError::EmptyIssueBody
+ | StoreError::IssueNotFound(_)
+ | StoreError::MalformedIssueEntry(_, _)
+ | StoreError::MissingProjectPath(_)
+ | StoreError::ProjectPathNotDirectory(_) => {
+ FaultRecord::invalid_input(generation, stage, operation, error.to_string())
+ }
+ StoreError::Io(_) => {
+ FaultRecord::internal(generation, stage, operation, error.to_string())
+ }
+ }
+ }
+}
+
+fn issue_save_output(
+ receipt: &SaveReceipt,
+ project_root: &Path,
+ generation: Generation,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let relative_path = relative_issue_path(&receipt.path, project_root);
+ let status = if receipt.created {
+ "created"
+ } else {
+ "updated"
+ };
+ let concise = json!({
+ "slug": receipt.slug,
+ "status": status,
+ "path": relative_path,
+ "updated_at": format_timestamp(receipt.updated_at),
+ });
+ let full = json!({
+ "slug": receipt.slug,
+ "status": status,
+ "path": relative_path,
+ "updated_at": format_timestamp(receipt.updated_at),
+ "bytes": receipt.bytes,
+ });
+ fallback_detailed_tool_output(
+ &concise,
+ &full,
+ [
+ format!("saved issue {}", receipt.slug),
+ format!("status: {status}"),
+ format!("path: {}", relative_issue_path(&receipt.path, project_root)),
+ format!("updated: {}", format_timestamp(receipt.updated_at)),
+ ]
+ .join("\n"),
+ None,
+ SurfaceKind::Mutation,
+ generation,
+ FaultStage::Worker,
+ operation,
+ )
+}
+
+fn issue_list_output(
+ issues: &[crate::store::IssueSummary],
+ project_root: &Path,
+ generation: Generation,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let concise_items = issues
+ .iter()
+ .map(|issue| {
+ json!({
+ "slug": issue.slug,
+ "updated_at": format_timestamp(issue.updated_at),
+ })
+ })
+ .collect::<Vec<_>>();
+ let full_items = issues
+ .iter()
+ .map(|issue| {
+ let path = relative_issue_path(
+ &project_root
+ .join("issues")
+ .join(format!("{}.md", issue.slug)),
+ project_root,
+ );
+ json!({
+ "slug": issue.slug,
+ "path": path,
+ "updated_at": format_timestamp(issue.updated_at),
+ })
+ })
+ .collect::<Vec<_>>();
+ let mut lines = vec![format!("{} issue(s)", issues.len())];
+ lines.extend(issues.iter().map(|issue| issue.slug.to_string()));
+ fallback_detailed_tool_output(
+ &json!({ "count": issues.len(), "issues": concise_items }),
+ &json!({ "count": issues.len(), "issues": full_items }),
+ lines.join("\n"),
+ None,
+ SurfaceKind::List,
+ generation,
+ FaultStage::Worker,
+ operation,
+ )
+}
+
+fn issue_read_output(
+ record: &IssueRecord,
+ project_root: &Path,
+ generation: Generation,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let relative_path = relative_issue_path(&record.path, project_root);
+ let concise = json!({
+ "slug": record.slug,
+ "updated_at": format_timestamp(record.updated_at),
+ "body": record.body,
+ });
+ let full = json!({
+ "slug": record.slug,
+ "path": relative_path,
+ "updated_at": format_timestamp(record.updated_at),
+ "bytes": record.bytes,
+ "body": record.body,
+ });
+ let concise_text = format!(
+ "issue {}\nupdated: {}\n\n{}",
+ record.slug,
+ format_timestamp(record.updated_at),
+ record.body,
+ );
+ let full_text = Some(format!(
+ "issue {}\npath: {}\nupdated: {}\nbytes: {}\n\n{}",
+ record.slug,
+ relative_issue_path(&record.path, project_root),
+ format_timestamp(record.updated_at),
+ record.bytes,
+ record.body,
+ ));
+ fallback_detailed_tool_output(
+ &concise,
+ &full,
+ concise_text,
+ full_text,
+ SurfaceKind::Read,
+ generation,
+ FaultStage::Worker,
+ operation,
+ )
+}
+
/// Renders `path` relative to `project_root` when it lives inside it;
/// otherwise renders the path unchanged.
fn relative_issue_path(path: &Path, project_root: &Path) -> String {
    match path.strip_prefix(project_root) {
        Ok(relative) => relative.display().to_string(),
        Err(_) => path.display().to_string(),
    }
}
+
/// Reconstructs a `Generation` from its wire integer by stepping forward from
/// genesis.
///
/// NOTE(review): `raw` values 0 and 1 both yield genesis (the loop body never
/// runs for either) — confirm callers never send 0, or reject it upstream.
/// Cost is O(raw), which presumably stays small for worker generations.
fn generation_from_wire(raw: u64) -> Generation {
    let mut generation = Generation::genesis();
    for _ in 1..raw {
        generation = generation.next();
    }
    generation
}
diff --git a/crates/jira-at-home/src/mcp/telemetry.rs b/crates/jira-at-home/src/mcp/telemetry.rs
new file mode 100644
index 0000000..8df0009
--- /dev/null
+++ b/crates/jira-at-home/src/mcp/telemetry.rs
@@ -0,0 +1,228 @@
+use std::collections::BTreeMap;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use libmcp::{
+ Fault, Generation, HealthSnapshot, LifecycleState, MethodTelemetry, RolloutState,
+ TelemetrySnapshot, TelemetryTotals,
+};
+use serde::{Deserialize, Serialize};
+
+use crate::mcp::fault::FaultRecord;
+
/// Per-method rollup counters backing the `MethodTelemetry` rows produced by
/// `ServerTelemetry::ranked_methods`.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
struct MethodStats {
    // Bumped when the request starts (`record_request`); a completed call
    // then lands in exactly one of the outcome counters below.
    request_count: u64,
    success_count: u64,
    // Well-formed error responses (non-"transportish" faults).
    response_error_count: u64,
    // Transport/process/timeout/resource/replay/rollout faults — see the
    // classification in `record_error`.
    transport_fault_count: u64,
    retry_count: u64,
    // Running sum of completed-call latencies, kept in u128 with saturating
    // adds so long uptimes cannot overflow it.
    total_latency_ms: u128,
    max_latency_ms: u64,
    last_latency_ms: Option<u64>,
    // Message of the most recent failure; cleared on the next success.
    last_error: Option<String>,
}
+
/// Mutable telemetry state for the MCP server: lifecycle, global and
/// per-method counters, and the most recent fault. Health and telemetry
/// snapshots are derived from this on demand.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub(crate) struct ServerTelemetry {
    // Wall-clock start; uptime is computed from it lazily in `uptime_ms`.
    started_unix_ms: u64,
    state: LifecycleState,
    generation: Generation,
    // Reset to zero by every success; grows (saturating) on each error.
    consecutive_failures: u32,
    restart_count: u64,
    host_rollouts: u64,
    totals: TelemetryTotals,
    // BTreeMap keeps method keys in stable sorted order for serialization.
    methods: BTreeMap<String, MethodStats>,
    // Cleared on success, replaced on each error.
    last_fault: Option<Fault>,
}
+
impl Default for ServerTelemetry {
    /// Fresh telemetry: the uptime clock starts now, the lifecycle is `Cold`
    /// at the genesis generation, and every counter is zero.
    fn default() -> Self {
        Self {
            started_unix_ms: unix_ms_now(),
            state: LifecycleState::Cold,
            generation: Generation::genesis(),
            consecutive_failures: 0,
            restart_count: 0,
            host_rollouts: 0,
            // Spelled out field-by-field; presumably TelemetryTotals does
            // not implement Default in libmcp — TODO confirm.
            totals: TelemetryTotals {
                request_count: 0,
                success_count: 0,
                response_error_count: 0,
                transport_fault_count: 0,
                retry_count: 0,
            },
            methods: BTreeMap::new(),
            last_fault: None,
        }
    }
}
+
+impl ServerTelemetry {
+ pub(crate) fn record_request(&mut self, operation: &str) {
+ self.totals.request_count += 1;
+ self.methods
+ .entry(operation.to_owned())
+ .or_default()
+ .request_count += 1;
+ }
+
+ pub(crate) fn record_success(
+ &mut self,
+ operation: &str,
+ latency_ms: u64,
+ generation: Generation,
+ worker_alive: bool,
+ ) {
+ self.generation = generation;
+ self.state = if worker_alive {
+ LifecycleState::Ready
+ } else {
+ LifecycleState::Cold
+ };
+ self.consecutive_failures = 0;
+ self.last_fault = None;
+ self.totals.success_count += 1;
+ let entry = self.methods.entry(operation.to_owned()).or_default();
+ entry.success_count += 1;
+ entry.total_latency_ms = entry
+ .total_latency_ms
+ .saturating_add(u128::from(latency_ms));
+ entry.max_latency_ms = entry.max_latency_ms.max(latency_ms);
+ entry.last_latency_ms = Some(latency_ms);
+ entry.last_error = None;
+ }
+
+ pub(crate) fn record_error(
+ &mut self,
+ operation: &str,
+ fault: &FaultRecord,
+ latency_ms: u64,
+ generation: Generation,
+ ) {
+ self.generation = generation;
+ self.consecutive_failures = self.consecutive_failures.saturating_add(1);
+ self.last_fault = Some(fault.fault.clone());
+ let transportish = matches!(
+ fault.fault.class,
+ libmcp::FaultClass::Transport
+ | libmcp::FaultClass::Process
+ | libmcp::FaultClass::Timeout
+ | libmcp::FaultClass::Resource
+ | libmcp::FaultClass::Replay
+ | libmcp::FaultClass::Rollout
+ );
+ if transportish {
+ self.state = LifecycleState::Recovering;
+ self.totals.transport_fault_count += 1;
+ } else {
+ self.totals.response_error_count += 1;
+ }
+ let entry = self.methods.entry(operation.to_owned()).or_default();
+ if transportish {
+ entry.transport_fault_count += 1;
+ } else {
+ entry.response_error_count += 1;
+ }
+ entry.total_latency_ms = entry
+ .total_latency_ms
+ .saturating_add(u128::from(latency_ms));
+ entry.max_latency_ms = entry.max_latency_ms.max(latency_ms);
+ entry.last_latency_ms = Some(latency_ms);
+ entry.last_error = Some(fault.message().to_owned());
+ }
+
+ pub(crate) fn record_retry(&mut self, operation: &str) {
+ self.totals.retry_count += 1;
+ self.methods
+ .entry(operation.to_owned())
+ .or_default()
+ .retry_count += 1;
+ }
+
+ pub(crate) fn record_worker_restart(&mut self, generation: Generation) {
+ self.generation = generation;
+ self.restart_count += 1;
+ self.state = LifecycleState::Recovering;
+ }
+
+ pub(crate) fn record_rollout(&mut self) {
+ self.host_rollouts += 1;
+ }
+
+ pub(crate) fn host_rollouts(&self) -> u64 {
+ self.host_rollouts
+ }
+
+ pub(crate) fn health_snapshot(&self, rollout: RolloutState) -> HealthSnapshot {
+ HealthSnapshot {
+ state: self.state,
+ generation: self.generation,
+ uptime_ms: self.uptime_ms(),
+ consecutive_failures: self.consecutive_failures,
+ restart_count: self.restart_count,
+ rollout: Some(rollout),
+ last_fault: self.last_fault.clone(),
+ }
+ }
+
+ pub(crate) fn telemetry_snapshot(&self) -> TelemetrySnapshot {
+ TelemetrySnapshot {
+ uptime_ms: self.uptime_ms(),
+ state: self.state,
+ generation: self.generation,
+ consecutive_failures: self.consecutive_failures,
+ restart_count: self.restart_count,
+ totals: self.totals.clone(),
+ methods: self.ranked_methods(),
+ last_fault: self.last_fault.clone(),
+ }
+ }
+
+ pub(crate) fn ranked_methods(&self) -> Vec<MethodTelemetry> {
+ let mut methods = self
+ .methods
+ .iter()
+ .map(|(method, stats)| MethodTelemetry {
+ method: method.clone(),
+ request_count: stats.request_count,
+ success_count: stats.success_count,
+ response_error_count: stats.response_error_count,
+ transport_fault_count: stats.transport_fault_count,
+ retry_count: stats.retry_count,
+ last_latency_ms: stats.last_latency_ms,
+ max_latency_ms: stats.max_latency_ms,
+ avg_latency_ms: average_latency_ms(stats),
+ last_error: stats.last_error.clone(),
+ })
+ .collect::<Vec<_>>();
+ methods.sort_by(|left, right| {
+ right
+ .request_count
+ .cmp(&left.request_count)
+ .then_with(|| right.transport_fault_count.cmp(&left.transport_fault_count))
+ .then_with(|| right.response_error_count.cmp(&left.response_error_count))
+ .then_with(|| left.method.cmp(&right.method))
+ });
+ methods
+ }
+
+ fn uptime_ms(&self) -> u64 {
+ unix_ms_now().saturating_sub(self.started_unix_ms)
+ }
+}
+
+fn average_latency_ms(stats: &MethodStats) -> u64 {
+ if stats.request_count == 0 {
+ return 0;
+ }
+ let average = stats.total_latency_ms / u128::from(stats.request_count);
+ u64::try_from(average).unwrap_or(u64::MAX)
+}
+
/// Current wall-clock time as milliseconds since the Unix epoch. Reads
/// before the epoch collapse to 0; values past `u64::MAX` ms clamp there.
fn unix_ms_now() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
        })
}