diff options
Diffstat (limited to 'crates/jira-at-home')
| -rw-r--r-- | crates/jira-at-home/Cargo.toml | 27 | ||||
| -rw-r--r-- | crates/jira-at-home/src/main.rs | 64 | ||||
| -rw-r--r-- | crates/jira-at-home/src/mcp/catalog.rs | 140 | ||||
| -rw-r--r-- | crates/jira-at-home/src/mcp/fault.rs | 214 | ||||
| -rw-r--r-- | crates/jira-at-home/src/mcp/host/binary.rs | 41 | ||||
| -rw-r--r-- | crates/jira-at-home/src/mcp/host/mod.rs | 3 | ||||
| -rw-r--r-- | crates/jira-at-home/src/mcp/host/process.rs | 262 | ||||
| -rw-r--r-- | crates/jira-at-home/src/mcp/host/runtime.rs | 952 | ||||
| -rw-r--r-- | crates/jira-at-home/src/mcp/mod.rs | 10 | ||||
| -rw-r--r-- | crates/jira-at-home/src/mcp/output.rs | 195 | ||||
| -rw-r--r-- | crates/jira-at-home/src/mcp/protocol.rs | 78 | ||||
| -rw-r--r-- | crates/jira-at-home/src/mcp/service.rs | 336 | ||||
| -rw-r--r-- | crates/jira-at-home/src/mcp/telemetry.rs | 228 | ||||
| -rw-r--r-- | crates/jira-at-home/src/store.rs | 287 | ||||
| -rw-r--r-- | crates/jira-at-home/tests/mcp_hardening.rs | 411 |
15 files changed, 3248 insertions, 0 deletions
diff --git a/crates/jira-at-home/Cargo.toml b/crates/jira-at-home/Cargo.toml new file mode 100644 index 0000000..96192d0 --- /dev/null +++ b/crates/jira-at-home/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "jira-at-home" +categories.workspace = true +description.workspace = true +edition.workspace = true +keywords.workspace = true +license.workspace = true +publish = false +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +clap.workspace = true +dirs.workspace = true +libmcp.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +time.workspace = true + +[dev-dependencies] +libmcp-testkit = { git = "https://git.swarm.moe/libmcp.git", rev = "bb92a05eb5446e07c6288e266bd06d7b5899eee5", package = "libmcp-testkit" } + +[lints] +workspace = true diff --git a/crates/jira-at-home/src/main.rs b/crates/jira-at-home/src/main.rs new file mode 100644 index 0000000..932bc63 --- /dev/null +++ b/crates/jira-at-home/src/main.rs @@ -0,0 +1,64 @@ +mod mcp; +mod store; + +use std::path::PathBuf; + +use clap::{Args, Parser, Subcommand}; +#[cfg(test)] +use libmcp_testkit as _; + +#[derive(Parser)] +#[command( + author, + version, + about = "Per-project issue notebook MCP with a hardened host/worker spine" +)] +struct Cli { + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand)] +enum Command { + /// Serve the stdio MCP host. + Mcp { + #[command(subcommand)] + command: McpCommand, + }, +} + +#[derive(Subcommand)] +enum McpCommand { + /// Run the durable stdio host. + Serve(McpServeArgs), + /// Run the disposable worker process. + Worker(McpWorkerArgs), +} + +#[derive(Args)] +struct McpServeArgs { + /// Optional project path to bind immediately on startup. + #[arg(long)] + project: Option<PathBuf>, +} + +#[derive(Args)] +struct McpWorkerArgs { + /// Bound project root. + #[arg(long)] + project: PathBuf, + /// Logical worker generation assigned by the host. + #[arg(long)] + generation: u64, +} + +fn main() -> Result<(), Box<dyn std::error::Error>> { + let cli = Cli::parse(); + match cli.command { + Command::Mcp { command } => match command { + McpCommand::Serve(args) => mcp::run_host(args.project)?, + McpCommand::Worker(args) => mcp::run_worker(args.project, args.generation)?, + }, + } + Ok(()) +} diff --git a/crates/jira-at-home/src/mcp/catalog.rs b/crates/jira-at-home/src/mcp/catalog.rs new file mode 100644 index 0000000..2ff8e81 --- /dev/null +++ b/crates/jira-at-home/src/mcp/catalog.rs @@ -0,0 +1,140 @@ +use libmcp::ReplayContract; +use serde_json::{Value, json}; + +use crate::mcp::output::with_common_presentation; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(crate) enum DispatchTarget { + Host, + Worker, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(crate) struct ToolSpec { + pub(crate) name: &'static str, + pub(crate) description: &'static str, + pub(crate) dispatch: DispatchTarget, + pub(crate) replay: ReplayContract, +} + +impl ToolSpec { + fn annotation_json(self) -> Value { + json!({ + "title": self.name, + "readOnlyHint": self.replay == ReplayContract::Convergent, + "destructiveHint": self.replay == ReplayContract::NeverReplay, + "jiraAtHome": { + "dispatch": match self.dispatch { + DispatchTarget::Host => "host", + DispatchTarget::Worker => "worker", + }, + "replayContract": match self.replay { + ReplayContract::Convergent => "convergent", + ReplayContract::ProbeRequired => "probe_required", + ReplayContract::NeverReplay => "never_replay", + }, + } + }) + } +} + +const TOOL_SPECS: &[ToolSpec] = &[ + ToolSpec { + name: "project.bind", + description: "Bind this MCP session to a project root or a nested path inside one.", + dispatch: DispatchTarget::Host, + replay: ReplayContract::NeverReplay, + }, + ToolSpec { + name: "issue.save", + description: "Create or overwrite one issue note at `issues/<slug>.md`.", + dispatch: DispatchTarget::Worker, + replay: ReplayContract::NeverReplay, + }, + ToolSpec { + name: "issue.list", + description: "List the currently open issues. There is no close state; all existing issue files are open.", + dispatch: DispatchTarget::Worker, + replay: ReplayContract::Convergent, + }, + ToolSpec { + name: "issue.read", + description: "Read one issue note by slug.", + dispatch: DispatchTarget::Worker, + replay: ReplayContract::Convergent, + }, + ToolSpec { + name: "system.health", + description: "Read MCP host health, binding state, worker generation, and rollout state.", + dispatch: DispatchTarget::Host, + replay: ReplayContract::Convergent, + }, + ToolSpec { + name: "system.telemetry", + description: "Read aggregate MCP host telemetry and top hot methods for this session.", + dispatch: DispatchTarget::Host, + replay: ReplayContract::Convergent, + }, +]; + +pub(crate) fn tool_spec(name: &str) -> Option<ToolSpec> { + TOOL_SPECS.iter().copied().find(|spec| spec.name == name) +} + +pub(crate) fn tool_definitions() -> Vec<Value> { + TOOL_SPECS + .iter() + .map(|spec| { + json!({ + "name": spec.name, + "description": spec.description, + "inputSchema": tool_schema(spec.name), + "annotations": spec.annotation_json(), + }) + }) + .collect() +} + +fn tool_schema(name: &str) -> Value { + match name { + "project.bind" => with_common_presentation(json!({ + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "Project root or any nested path inside the target project." + } + }, + "required": ["path"] + })), + "issue.save" => with_common_presentation(json!({ + "type": "object", + "properties": { + "slug": { + "type": "string", + "description": "Stable slug. Stored at `issues/<slug>.md`." + }, + "body": { + "type": "string", + "description": "Freeform issue body. Markdown is fine." + } + }, + "required": ["slug", "body"] + })), + "issue.list" | "system.health" | "system.telemetry" => with_common_presentation(json!({ + "type": "object", + "properties": {} + })), + "issue.read" => with_common_presentation(json!({ + "type": "object", + "properties": { + "slug": { + "type": "string", + "description": "Issue slug to read." + } + }, + "required": ["slug"] + })), + _ => Value::Null, + } +} diff --git a/crates/jira-at-home/src/mcp/fault.rs b/crates/jira-at-home/src/mcp/fault.rs new file mode 100644 index 0000000..5f71e6a --- /dev/null +++ b/crates/jira-at-home/src/mcp/fault.rs @@ -0,0 +1,214 @@ +use libmcp::{Fault, FaultClass, FaultCode, Generation, RecoveryDirective, ToolErrorDetail}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; + +#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) enum FaultStage { + Host, + Worker, + Store, + Transport, + Protocol, + Rollout, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) struct FaultRecord { + pub(crate) fault: Fault, + pub(crate) stage: FaultStage, + pub(crate) operation: String, + pub(crate) jsonrpc_code: i64, + pub(crate) retryable: bool, + pub(crate) retried: bool, +} + +impl FaultRecord { + pub(crate) fn invalid_input( + generation: Generation, + stage: FaultStage, + operation: impl Into<String>, + detail: impl Into<String>, + ) -> Self { + Self::new( + generation, + FaultClass::Protocol, + "invalid_input", + RecoveryDirective::AbortRequest, + stage, + operation, + detail, + -32602, + ) + } + + pub(crate) fn not_initialized( + generation: Generation, + stage: FaultStage, + operation: impl Into<String>, + detail: impl Into<String>, + ) -> Self { + Self::new( + generation, + FaultClass::Protocol, + "not_initialized", + RecoveryDirective::AbortRequest, + stage, + operation, + detail, + -32002, + ) + } + + pub(crate) fn unavailable( + generation: Generation, + stage: FaultStage, + operation: impl Into<String>, + detail: impl Into<String>, + ) -> Self { + Self::new( + generation, + FaultClass::Resource, + "unavailable", + RecoveryDirective::AbortRequest, + stage, + operation, + detail, + -32004, + ) + } + + pub(crate) fn transport( + generation: Generation, + stage: FaultStage, + operation: impl Into<String>, + detail: impl Into<String>, + ) -> Self { + Self::new( + generation, + FaultClass::Transport, + "transport_failure", + RecoveryDirective::RestartAndReplay, + stage, + operation, + detail, + -32603, + ) + } + + pub(crate) fn process( + generation: Generation, + stage: FaultStage, + operation: impl Into<String>, + detail: impl Into<String>, + ) -> Self { + Self::new( + generation, + FaultClass::Process, + "process_failure", + RecoveryDirective::RestartAndReplay, + stage, + operation, + detail, + -32603, + ) + } + + pub(crate) fn internal( + generation: Generation, + stage: FaultStage, + operation: impl Into<String>, + detail: impl Into<String>, + ) -> Self { + Self::new( + generation, + FaultClass::Invariant, + "internal_failure", + RecoveryDirective::AbortRequest, + stage, + operation, + detail, + -32603, + ) + } + + pub(crate) fn rollout( + generation: Generation, + operation: impl Into<String>, + detail: impl Into<String>, + ) -> Self { + Self::new( + generation, + FaultClass::Rollout, + "rollout_failure", + RecoveryDirective::RestartAndReplay, + FaultStage::Rollout, + operation, + detail, + -32603, + ) + } + + pub(crate) fn mark_retried(mut self) -> Self { + self.retried = true; + self + } + + pub(crate) fn message(&self) -> &str { + self.fault.detail.as_str() + } + + pub(crate) fn error_detail(&self) -> ToolErrorDetail { + ToolErrorDetail { + code: Some(self.jsonrpc_code), + kind: Some(self.fault.code.as_str().to_owned()), + message: Some(self.message().to_owned()), + } + } + + pub(crate) fn into_jsonrpc_error(self) -> Value { + json!({ + "code": self.jsonrpc_code, + "message": self.message(), + "data": self, + }) + } + + pub(crate) fn into_tool_result(self) -> Value { + json!({ + "content": [{ + "type": "text", + "text": self.message(), + }], + "structuredContent": self, + "isError": true, + }) + } + + fn new( + generation: Generation, + class: FaultClass, + code: &'static str, + directive: RecoveryDirective, + stage: FaultStage, + operation: impl Into<String>, + detail: impl Into<String>, + jsonrpc_code: i64, + ) -> Self { + let fault = Fault::new(generation, class, fault_code(code), directive, detail); + Self { + retryable: directive != RecoveryDirective::AbortRequest, + fault, + stage, + operation: operation.into(), + jsonrpc_code, + retried: false, + } + } +} + +fn fault_code(code: &'static str) -> FaultCode { + match FaultCode::try_new(code.to_owned()) { + Ok(value) => value, + Err(_) => std::process::abort(), + } +} diff --git a/crates/jira-at-home/src/mcp/host/binary.rs b/crates/jira-at-home/src/mcp/host/binary.rs new file mode 100644 index 0000000..9ec7721 --- /dev/null +++ b/crates/jira-at-home/src/mcp/host/binary.rs @@ -0,0 +1,41 @@ +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; + +use crate::mcp::protocol::BinaryFingerprint; + +pub(crate) struct BinaryRuntime { + pub(crate) path: PathBuf, + startup_fingerprint: BinaryFingerprint, + pub(crate) launch_path_stable: bool, +} + +impl BinaryRuntime { + pub(crate) fn new(path: PathBuf) -> io::Result<Self> { + let startup_fingerprint = fingerprint_binary(&path)?; + Ok(Self { + launch_path_stable: !path + .components() + .any(|component| component.as_os_str().to_string_lossy() == "target"), + path, + startup_fingerprint, + }) + } + + pub(crate) fn rollout_pending(&self) -> io::Result<bool> { + Ok(fingerprint_binary(&self.path)? != self.startup_fingerprint) + } +} + +fn fingerprint_binary(path: &Path) -> io::Result<BinaryFingerprint> { + let metadata = fs::metadata(path)?; + let modified_unix_nanos = metadata + .modified()? + .duration_since(std::time::UNIX_EPOCH) + .map_err(|error| io::Error::other(format!("invalid binary mtime: {error}")))? + .as_nanos(); + Ok(BinaryFingerprint { + length_bytes: metadata.len(), + modified_unix_nanos, + }) +} diff --git a/crates/jira-at-home/src/mcp/host/mod.rs b/crates/jira-at-home/src/mcp/host/mod.rs new file mode 100644 index 0000000..29cdcb1 --- /dev/null +++ b/crates/jira-at-home/src/mcp/host/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod binary; +pub(crate) mod process; +pub(crate) mod runtime; diff --git a/crates/jira-at-home/src/mcp/host/process.rs b/crates/jira-at-home/src/mcp/host/process.rs new file mode 100644 index 0000000..492be55 --- /dev/null +++ b/crates/jira-at-home/src/mcp/host/process.rs @@ -0,0 +1,262 @@ +use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::path::PathBuf; +use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; + +use libmcp::Generation; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::mcp::fault::{FaultRecord, FaultStage}; +use crate::mcp::protocol::{ + HostRequestId, WorkerOperation, WorkerOutcome, WorkerRequest, WorkerResponse, WorkerSpawnConfig, +}; + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(super) struct ProjectBinding { + pub(super) requested_path: PathBuf, + pub(super) project_root: PathBuf, + pub(super) issues_root: PathBuf, + pub(super) state_root: PathBuf, +} + +pub(super) struct WorkerSupervisor { + config: WorkerSpawnConfig, + generation: Generation, + has_spawned: bool, + crash_before_reply_once: bool, + bound_project_root: Option<PathBuf>, + child: Option<Child>, + stdin: Option<BufWriter<ChildStdin>>, + stdout: Option<BufReader<ChildStdout>>, +} + +impl WorkerSupervisor { + pub(super) fn new( + config: WorkerSpawnConfig, + generation: Generation, + has_spawned: bool, + ) -> Self { + Self { + config, + generation, + has_spawned, + crash_before_reply_once: false, + bound_project_root: None, + child: None, + stdin: None, + stdout: None, + } + } + + pub(super) fn generation(&self) -> Generation { + self.generation + } + + pub(super) fn has_spawned(&self) -> bool { + self.has_spawned + } + + pub(super) fn rebind(&mut self, project_root: PathBuf) { + if self + .bound_project_root + .as_ref() + .is_some_and(|current| current == &project_root) + { + return; + } + self.kill_current_worker(); + self.bound_project_root = Some(project_root); + } + + pub(super) fn refresh_binding(&mut self, project_root: PathBuf) { + self.kill_current_worker(); + self.bound_project_root = Some(project_root); + } + + pub(super) fn execute( + &mut self, + request_id: HostRequestId, + operation: WorkerOperation, + ) -> Result<Value, FaultRecord> { + self.ensure_worker()?; + let request = WorkerRequest::Execute { + id: request_id, + operation, + }; + let stdin = self.stdin.as_mut().ok_or_else(|| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.stdin", + "worker stdin is not available", + ) + })?; + serde_json::to_writer(&mut *stdin, &request).map_err(|error| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.write", + format!("failed to encode worker request: {error}"), + ) + })?; + stdin.write_all(b"\n").map_err(|error| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.write", + format!("failed to frame worker request: {error}"), + ) + })?; + stdin.flush().map_err(|error| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.write", + format!("failed to flush worker request: {error}"), + ) + })?; + + if self.crash_before_reply_once { + self.crash_before_reply_once = false; + self.kill_current_worker(); + return Err(FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.read", + "worker crashed before replying", + )); + } + + let stdout = self.stdout.as_mut().ok_or_else(|| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.stdout", + "worker stdout is not available", + ) + })?; + let mut line = String::new(); + let bytes = stdout.read_line(&mut line).map_err(|error| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.read", + format!("failed to read worker response: {error}"), + ) + })?; + if bytes == 0 { + self.kill_current_worker(); + return Err(FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.read", + "worker exited before replying", + )); + } + let response = serde_json::from_str::<WorkerResponse>(&line).map_err(|error| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.read", + format!("invalid worker response: {error}"), + ) + })?; + match response.outcome { + WorkerOutcome::Success { result } => Ok(result), + WorkerOutcome::Fault { fault } => Err(fault), + } + } + + pub(super) fn restart(&mut self) -> Result<(), FaultRecord> { + self.kill_current_worker(); + self.ensure_worker() + } + + pub(super) fn is_alive(&mut self) -> bool { + let Some(child) = self.child.as_mut() else { + return false; + }; + if let Ok(None) = child.try_wait() { + true + } else { + self.child = None; + self.stdin = None; + self.stdout = None; + false + } + } + + pub(super) fn arm_crash_once(&mut self) { + self.crash_before_reply_once = true; + } + + fn ensure_worker(&mut self) -> Result<(), FaultRecord> { + if self.is_alive() { + return Ok(()); + } + let Some(project_root) = self.bound_project_root.as_ref() else { + return Err(FaultRecord::unavailable( + self.generation, + FaultStage::Host, + "worker.spawn", + "project is not bound; call project.bind before using issue tools", + )); + }; + let generation = if self.has_spawned { + self.generation.next() + } else { + self.generation + }; + let mut child = Command::new(&self.config.executable) + .arg("mcp") + .arg("worker") + .arg("--project") + .arg(project_root) + .arg("--generation") + .arg(generation.get().to_string()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .map_err(|error| { + FaultRecord::process( + generation, + FaultStage::Transport, + "worker.spawn", + format!("failed to spawn worker: {error}"), + ) + })?; + let stdin = child.stdin.take().ok_or_else(|| { + FaultRecord::internal( + generation, + FaultStage::Transport, + "worker.spawn", + "worker stdin pipe was not created", + ) + })?; + let stdout = child.stdout.take().ok_or_else(|| { + FaultRecord::internal( + generation, + FaultStage::Transport, + "worker.spawn", + "worker stdout pipe was not created", + ) + })?; + self.generation = generation; + self.has_spawned = true; + self.child = Some(child); + self.stdin = Some(BufWriter::new(stdin)); + self.stdout = Some(BufReader::new(stdout)); + Ok(()) + } + + fn kill_current_worker(&mut self) { + if let Some(child) = self.child.as_mut() { + let _ = child.kill(); + let _ = child.wait(); + } + self.child = None; + self.stdin = None; + self.stdout = None; + } +} diff --git a/crates/jira-at-home/src/mcp/host/runtime.rs b/crates/jira-at-home/src/mcp/host/runtime.rs new file mode 100644 index 0000000..b2049a0 --- /dev/null +++ b/crates/jira-at-home/src/mcp/host/runtime.rs @@ -0,0 +1,952 @@ +use std::io::{self, BufRead, Write}; +#[cfg(unix)] +use std::os::unix::process::CommandExt; +use std::path::PathBuf; +use std::process::Command; +use std::time::Instant; + +use libmcp::{ + FramedMessage, Generation, HostSessionKernel, ReplayContract, RequestId, RolloutState, + TelemetryLog, ToolOutcome, load_snapshot_file_from_env, remove_snapshot_file, + write_snapshot_file, +}; +use serde::Serialize; +use serde_json::{Map, Value, json}; + +use crate::mcp::catalog::{DispatchTarget, tool_definitions, tool_spec}; +use crate::mcp::fault::{FaultRecord, FaultStage}; +use crate::mcp::host::binary::BinaryRuntime; +use crate::mcp::host::process::{ProjectBinding, WorkerSupervisor}; +use crate::mcp::output::{ + ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success, +}; +use crate::mcp::protocol::{ + CRASH_ONCE_ENV, FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed, + PROTOCOL_VERSION, ProjectBindingSeed, SERVER_NAME, WorkerOperation, WorkerSpawnConfig, +}; +use crate::mcp::telemetry::ServerTelemetry; +use crate::store::IssueStore; + +pub(crate) fn run_host(initial_project: Option<PathBuf>) -> Result<(), Box<dyn std::error::Error>> { + let stdin = io::stdin(); + let mut stdout = io::stdout().lock(); + let mut host = HostRuntime::new(initial_project)?; + + for line in stdin.lock().lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + + let maybe_response = host.handle_line(&line); + if let Some(response) = maybe_response { + write_message(&mut stdout, &response)?; + } + host.maybe_roll_forward()?; + } + + Ok(()) +} + +struct HostRuntime { + initial_project: Option<PathBuf>, + binding: Option<ProjectBinding>, + session_kernel: HostSessionKernel, + telemetry: ServerTelemetry, + telemetry_log: Option<TelemetryLog>, + next_request_id: u64, + worker: WorkerSupervisor, + binary: BinaryRuntime, + force_rollout_key: Option<String>, + force_rollout_consumed: bool, + rollout_requested: bool, + crash_once_key: Option<String>, + crash_once_consumed: bool, +} + +impl HostRuntime { + fn new(initial_project: Option<PathBuf>) -> Result<Self, Box<dyn std::error::Error>> { + let executable = std::env::current_exe()?; + let binary = BinaryRuntime::new(executable.clone())?; + let restored = restore_host_state()?; + let session_kernel = restored + .as_ref() + .map(|seed| seed.session_kernel.clone().restore()) + .transpose()? + .map_or_else(HostSessionKernel::cold, HostSessionKernel::from_restored); + let telemetry = restored + .as_ref() + .map_or_else(ServerTelemetry::default, |seed| seed.telemetry.clone()); + let next_request_id = restored + .as_ref() + .map_or(1, |seed| seed.next_request_id.max(1)); + let worker_generation = restored + .as_ref() + .map_or(Generation::genesis(), |seed| seed.worker_generation); + let worker_spawned = restored.as_ref().is_some_and(|seed| seed.worker_spawned); + let force_rollout_consumed = restored + .as_ref() + .is_some_and(|seed| seed.force_rollout_consumed); + let crash_once_consumed = restored + .as_ref() + .is_some_and(|seed| seed.crash_once_consumed); + let binding = if let Some(seed) = restored.as_ref().and_then(|seed| seed.binding.clone()) { + Some(restore_binding(seed)?) + } else if let Some(path) = initial_project.clone() { + Some(resolve_project_binding(path)?.binding) + } else { + None + }; + let telemetry_log = binding.as_ref().map(open_telemetry_log).transpose()?; + + let mut worker = WorkerSupervisor::new( + WorkerSpawnConfig { + executable: executable.clone(), + }, + worker_generation, + worker_spawned, + ); + if let Some(project_root) = binding.as_ref().map(|binding| binding.project_root.clone()) { + worker.rebind(project_root); + } + + Ok(Self { + initial_project, + binding, + session_kernel, + telemetry, + telemetry_log, + next_request_id, + worker, + binary, + force_rollout_key: std::env::var(FORCE_ROLLOUT_ENV).ok(), + force_rollout_consumed, + rollout_requested: false, + crash_once_key: std::env::var(CRASH_ONCE_ENV).ok(), + crash_once_consumed, + }) + } + + fn handle_line(&mut self, line: &str) -> Option<Value> { + let frame = match FramedMessage::parse(line.as_bytes().to_vec()) { + Ok(frame) => frame, + Err(error) => { + return Some(jsonrpc_error( + Value::Null, + FaultRecord::invalid_input( + self.worker.generation(), + FaultStage::Protocol, + "jsonrpc.parse", + format!("parse error: {error}"), + ), + )); + } + }; + self.handle_frame(frame) + } + + fn handle_frame(&mut self, frame: FramedMessage) -> Option<Value> { + self.session_kernel.observe_client_frame(&frame); + let Some(object) = frame.value.as_object() else { + return Some(jsonrpc_error( + Value::Null, + FaultRecord::invalid_input( + self.worker.generation(), + FaultStage::Protocol, + "jsonrpc.message", + "invalid request: expected JSON object", + ), + )); + }; + let method = object.get("method").and_then(Value::as_str)?; + let id = object.get("id").cloned(); + let params = object.get("params").cloned().unwrap_or_else(|| json!({})); + let operation_key = operation_key(method, ¶ms); + let started_at = Instant::now(); + + self.telemetry.record_request(&operation_key); + let response = match self.dispatch(&frame, method, params, id.clone()) { + Ok(Some(result)) => { + let latency_ms = elapsed_ms(started_at.elapsed()); + self.telemetry.record_success( + &operation_key, + latency_ms, + self.worker.generation(), + self.worker.is_alive(), + ); + id.map(|id| jsonrpc_result(id, result)) + } + Ok(None) => { + let latency_ms = elapsed_ms(started_at.elapsed()); + self.telemetry.record_success( + &operation_key, + latency_ms, + self.worker.generation(), + self.worker.is_alive(), + ); + None + } + Err(fault) => { + let latency_ms = elapsed_ms(started_at.elapsed()); + self.telemetry.record_error( + &operation_key, + &fault, + latency_ms, + self.worker.generation(), + ); + Some(match id { + Some(id) if method == "tools/call" => { + jsonrpc_result(id, fault.into_tool_result()) + } + Some(id) => jsonrpc_error(id, fault), + None => jsonrpc_error(Value::Null, fault), + }) + } + }; + + if self.should_force_rollout(&operation_key) { + self.force_rollout_consumed = true; + self.telemetry.record_rollout(); + self.rollout_requested = true; + } + + response + } + + fn dispatch( + &mut self, + request_frame: &FramedMessage, + method: &str, + params: Value, + request_id: Option<Value>, + ) -> Result<Option<Value>, FaultRecord> { + match method { + "initialize" => Ok(Some(json!({ + "protocolVersion": PROTOCOL_VERSION, + "capabilities": { + "tools": { "listChanged": false } + }, + "serverInfo": { + "name": SERVER_NAME, + "version": env!("CARGO_PKG_VERSION") + }, + "instructions": "Bind the session with project.bind, then use issue.save to park ideas in issues/<slug>.md. issue.list enumerates every existing issue file because there is no closed state." + }))), + "notifications/initialized" => { + if !self.seed_captured() { + return Err(FaultRecord::not_initialized( + self.worker.generation(), + FaultStage::Host, + "notifications/initialized", + "received initialized notification before initialize", + )); + } + Ok(None) + } + "notifications/cancelled" => Ok(None), + "ping" => Ok(Some(json!({}))), + other => { + self.require_initialized(other)?; + match other { + "tools/list" => Ok(Some(json!({ "tools": tool_definitions() }))), + "tools/call" => Ok(Some(self.dispatch_tool_call( + request_frame, + params, + request_id, + )?)), + _ => Err(FaultRecord::invalid_input( + self.worker.generation(), + FaultStage::Protocol, + other, + format!("method `{other}` is not implemented"), + )), + } + } + } + } + + fn dispatch_tool_call( + &mut self, + request_frame: &FramedMessage, + params: Value, + _request_id: Option<Value>, + ) -> Result<Value, FaultRecord> { + let envelope = + deserialize::<ToolCallEnvelope>(params, "tools/call", self.worker.generation())?; + let spec = tool_spec(&envelope.name).ok_or_else(|| { + FaultRecord::invalid_input( + self.worker.generation(), + FaultStage::Host, + format!("tools/call:{}", envelope.name), + format!("unknown tool `{}`", envelope.name), + ) + })?; + match spec.dispatch { + DispatchTarget::Host => { + let started_at = Instant::now(); + let request_id = request_id_from_frame(request_frame); + let result = self.handle_host_tool(&envelope.name, envelope.arguments); + self.record_host_tool_completion( + request_frame, + request_id.as_ref(), + elapsed_ms(started_at.elapsed()), + result.as_ref().err(), + ); + result + } + DispatchTarget::Worker => { + self.dispatch_worker_tool(request_frame, spec, envelope.arguments) + } + } + } + + fn dispatch_worker_tool( + &mut self, + request_frame: &FramedMessage, + spec: crate::mcp::catalog::ToolSpec, + arguments: Value, + ) -> Result<Value, FaultRecord> { + let operation = format!("tools/call:{}", spec.name); + self.dispatch_worker_operation( + request_frame, + operation, + spec.replay, + WorkerOperation::CallTool { + name: spec.name.to_owned(), + arguments, + }, + ) + } + + fn dispatch_worker_operation( + &mut self, + request_frame: &FramedMessage, + operation: String, + replay: ReplayContract, + worker_operation: WorkerOperation, + ) -> Result<Value, FaultRecord> { + let binding = self.require_bound_project(&operation)?; + self.worker.rebind(binding.project_root.clone()); + + if self.should_crash_worker_once(&operation) { + self.worker.arm_crash_once(); + } + + self.session_kernel + .record_forwarded_request(request_frame, replay); + let forwarded_request_id = request_id_from_frame(request_frame); + let host_request_id = self.allocate_request_id(); + let started_at = Instant::now(); + let mut replay_attempts = 0; + + let outcome = match self + .worker + .execute(host_request_id, worker_operation.clone()) + { + Ok(result) => Ok(result), + Err(fault) => { + if replay == ReplayContract::Convergent && fault.retryable { + replay_attempts = 1; + self.telemetry.record_retry(&operation); + self.worker + .restart() + .map_err(|restart_fault| restart_fault.mark_retried())?; + self.telemetry + .record_worker_restart(self.worker.generation()); + self.worker + .execute(host_request_id, worker_operation) + .map_err(FaultRecord::mark_retried) + } else { + Err(fault) + } + } + }; + + let completed = forwarded_request_id + .as_ref() + .and_then(|request_id| self.session_kernel.take_completed_request(request_id)); + self.record_worker_tool_completion( + forwarded_request_id.as_ref(), + completed.as_ref(), + elapsed_ms(started_at.elapsed()), + replay_attempts, + outcome.as_ref().err(), + ); + outcome + } + + fn handle_host_tool(&mut self, name: &str, arguments: Value) -> Result<Value, FaultRecord> { + let operation = format!("tools/call:{name}"); + let generation = self.worker.generation(); + let (presentation, arguments) = + split_presentation(arguments, &operation, generation, FaultStage::Host)?; + match name { + "project.bind" => { + let args = deserialize::<ProjectBindArgs>( + arguments, + "tools/call:project.bind", + generation, + )?; + let resolved = + resolve_project_binding(PathBuf::from(args.path)).map_err(|error| { + FaultRecord::invalid_input( + generation, + FaultStage::Host, + "tools/call:project.bind", + error.to_string(), + ) + })?; + self.worker + .refresh_binding(resolved.binding.project_root.clone()); + self.telemetry_log = + Some(open_telemetry_log(&resolved.binding).map_err(|error| { + FaultRecord::internal( + generation, + FaultStage::Host, + "tools/call:project.bind", + error.to_string(), + ) + })?); + self.binding = Some(resolved.binding); + tool_success( + project_bind_output(&resolved.status, generation)?, + presentation, + generation, + FaultStage::Host, + "tools/call:project.bind", + ) + } + "system.health" => { + let rollout = if self.binary.rollout_pending().map_err(|error| { + FaultRecord::rollout(generation, &operation, error.to_string()) + })? { + RolloutState::Pending + } else { + RolloutState::Stable + }; + let health = self.telemetry.health_snapshot(rollout); + tool_success( + system_health_output( + &health, + self.binding.as_ref(), + self.worker.is_alive(), + self.binary.launch_path_stable, + generation, + )?, + presentation, + generation, + FaultStage::Host, + &operation, + ) + } + "system.telemetry" => { + let snapshot = self.telemetry.telemetry_snapshot(); + tool_success( + system_telemetry_output(&snapshot, self.telemetry.host_rollouts(), generation)?, + presentation, + generation, + FaultStage::Host, + &operation, + ) + } + other => Err(FaultRecord::invalid_input( + generation, + FaultStage::Host, + format!("tools/call:{other}"), + format!("unknown host tool `{other}`"), + )), + } + } + + fn require_initialized(&self, operation: &str) -> Result<(), FaultRecord> { + if self.session_initialized() { + return Ok(()); + } + Err(FaultRecord::not_initialized( + self.worker.generation(), + FaultStage::Host, + operation, + "client must call initialize and notifications/initialized before normal operations", + )) + } + + fn require_bound_project(&self, operation: &str) -> Result<&ProjectBinding, FaultRecord> { + self.binding.as_ref().ok_or_else(|| { + FaultRecord::unavailable( + self.worker.generation(), + FaultStage::Host, + operation, + "project is not bound; call project.bind with the target project root or a nested path inside it", + ) + }) + } + + fn session_initialized(&self) -> bool { + self.session_kernel + .initialization_seed() + .is_some_and(|seed| seed.initialized_notification.is_some()) + } + + fn seed_captured(&self) -> bool { + self.session_kernel.initialization_seed().is_some() + } + + fn allocate_request_id(&mut self) -> HostRequestId { + let id = HostRequestId(self.next_request_id); + self.next_request_id += 1; + id + } + + fn maybe_roll_forward(&mut self) -> Result<(), Box<dyn std::error::Error>> { + let binary_pending = self.binary.rollout_pending()?; + if !self.rollout_requested && !binary_pending { + return Ok(()); + } + if binary_pending && !self.rollout_requested { + self.telemetry.record_rollout(); + } + self.roll_forward() + } + + fn roll_forward(&mut self) -> Result<(), Box<dyn std::error::Error>> { + let state = HostStateSeed { + session_kernel: self.session_kernel.snapshot(), + telemetry: self.telemetry.clone(), + next_request_id: self.next_request_id, + binding: self.binding.as_ref().map(ProjectBindingSeed::from), + worker_generation: self.worker.generation(), + worker_spawned: self.worker.has_spawned(), + force_rollout_consumed: self.force_rollout_consumed, + crash_once_consumed: self.crash_once_consumed, + }; + let state_path = write_snapshot_file("jira-at-home-mcp-host-reexec", &state)?; + let mut command = Command::new(&self.binary.path); + let _ = command.arg("mcp").arg("serve"); + if let Some(project) = self.initial_project.as_ref() { + let _ = command.arg("--project").arg(project); + } + let _ = command.env(HOST_STATE_ENV, &state_path); + #[cfg(unix)] + { + let error = command.exec(); + let _ = remove_snapshot_file(&state_path); + Err(Box::new(error)) + } + #[cfg(not(unix))] + { + let _ = remove_snapshot_file(&state_path); + Err(Box::new(io::Error::new( + io::ErrorKind::Unsupported, + "host rollout requires unix exec support", + ))) + } + } + + fn should_force_rollout(&self, operation: &str) -> bool { + self.force_rollout_key + .as_deref() + .is_some_and(|key| key == operation) + && !self.force_rollout_consumed + } + + fn should_crash_worker_once(&mut self, operation: &str) -> bool { + let should_crash = self + .crash_once_key + .as_deref() + .is_some_and(|key| key == operation) + && !self.crash_once_consumed; + if should_crash { + self.crash_once_consumed = true; + } + should_crash + } + + fn record_host_tool_completion( + &mut self, + request_frame: &FramedMessage, + request_id: Option<&RequestId>, + latency_ms: u64, + fault: Option<&FaultRecord>, + ) { + let Some(request_id) = request_id else { + return; + }; + let Some(tool_meta) = libmcp::parse_tool_call_meta(request_frame, "tools/call") else { + return; + }; + self.record_tool_completion(request_id, &tool_meta, latency_ms, 0, fault); + } + + fn record_worker_tool_completion( + &mut self, + request_id: Option<&RequestId>, + completed: Option<&libmcp::CompletedPendingRequest>, + latency_ms: u64, + replay_attempts: u8, + fault: Option<&FaultRecord>, + ) { + let Some(request_id) = request_id else { + return; + }; + let Some(completed) = completed else { + return; + }; + let Some(tool_meta) = completed.request.tool_call_meta.as_ref() else { + return; + }; + self.record_tool_completion(request_id, tool_meta, latency_ms, replay_attempts, fault); + } + + fn record_tool_completion( + &mut self, + request_id: &RequestId, + tool_meta: &libmcp::ToolCallMeta, + latency_ms: u64, + replay_attempts: u8, + fault: Option<&FaultRecord>, + ) { + let Some(log) = self.telemetry_log.as_mut() else { + return; + }; + let result = log.record_tool_completion( + request_id, + tool_meta, + latency_ms, + replay_attempts, + if fault.is_some() { + ToolOutcome::Error + } else { + ToolOutcome::Ok + }, + fault.map_or_else(libmcp::ToolErrorDetail::default, FaultRecord::error_detail), + ); + if let Err(error) = result { + eprintln!("jira_at_home telemetry write failed: {error}"); + } + } +} + +struct ResolvedProjectBinding { + binding: ProjectBinding, + status: ProjectBindStatus, +} + +#[derive(Debug, Serialize)] +struct ProjectBindStatus { + requested_path: String, + project_root: String, + issues_root: String, + state_root: String, + issue_count: usize, +} + +fn resolve_project_binding( + requested_path: PathBuf, +) -> Result<ResolvedProjectBinding, Box<dyn std::error::Error>> { + let store = IssueStore::bind(requested_path.clone())?; + let layout = store.layout().clone(); + let status = store.status()?; + Ok(ResolvedProjectBinding { + binding: ProjectBinding { + requested_path: requested_path.clone(), + project_root: layout.project_root.clone(), + issues_root: layout.issues_root.clone(), + state_root: layout.state_root.clone(), + }, + status: ProjectBindStatus { + requested_path: requested_path.display().to_string(), + project_root: layout.project_root.display().to_string(), + issues_root: layout.issues_root.display().to_string(), + state_root: layout.state_root.display().to_string(), + issue_count: status.issue_count, + }, + }) +} + +fn restore_binding(seed: ProjectBindingSeed) -> Result<ProjectBinding, Box<dyn std::error::Error>> { + Ok(resolve_project_binding(seed.requested_path)?.binding) +} + +fn restore_host_state() -> Result<Option<HostStateSeed>, Box<dyn std::error::Error>> { + Ok(load_snapshot_file_from_env(HOST_STATE_ENV)?) +} + +fn open_telemetry_log(binding: &ProjectBinding) -> io::Result<TelemetryLog> { + TelemetryLog::new( + binding + .state_root + .join("mcp") + .join("telemetry.jsonl") + .as_path(), + binding.project_root.as_path(), + 1, + ) +} + +fn project_bind_output( + status: &ProjectBindStatus, + generation: Generation, +) -> Result<ToolOutput, FaultRecord> { + let mut concise = Map::new(); + let _ = concise.insert("project_root".to_owned(), json!(status.project_root)); + let _ = concise.insert("issues_root".to_owned(), json!(status.issues_root)); + let _ = concise.insert("state_root".to_owned(), json!(status.state_root)); + let _ = concise.insert("issue_count".to_owned(), json!(status.issue_count)); + if status.requested_path != status.project_root { + let _ = concise.insert("requested_path".to_owned(), json!(status.requested_path)); + } + fallback_detailed_tool_output( + &Value::Object(concise), + status, + [ + format!("bound project {}", status.project_root), + format!("issues: {}", status.issues_root), + format!("state: {}", status.state_root), + format!("issues tracked: {}", status.issue_count), + ] + .join("\n"), + None, + libmcp::SurfaceKind::Mutation, + generation, + FaultStage::Host, + "tools/call:project.bind", + ) +} + +fn system_health_output( + health: &libmcp::HealthSnapshot, + binding: Option<&ProjectBinding>, + worker_alive: bool, + launch_path_stable: bool, + generation: Generation, +) -> Result<ToolOutput, FaultRecord> { + let rollout_pending = matches!(health.rollout, Some(RolloutState::Pending)); + let mut concise = Map::new(); + let _ = concise.insert( + "ready".to_owned(), + json!(matches!(health.state, libmcp::LifecycleState::Ready)), + ); + let _ = concise.insert("bound".to_owned(), json!(binding.is_some())); + let _ = concise.insert( + "worker_generation".to_owned(), + json!(health.generation.get()), + ); + let _ = concise.insert("worker_alive".to_owned(), json!(worker_alive)); + let _ = concise.insert("rollout_pending".to_owned(), json!(rollout_pending)); + let _ = concise.insert("launch_path_stable".to_owned(), json!(launch_path_stable)); + if let Some(binding) = binding { + let _ = concise.insert( + "project_root".to_owned(), + json!(binding.project_root.display().to_string()), + ); + let _ = concise.insert( + "issues_root".to_owned(), + json!(binding.issues_root.display().to_string()), + ); + } + let full = json!({ + "health": health, + "binding": binding.map(|binding| json!({ + "requested_path": binding.requested_path.display().to_string(), + "project_root": binding.project_root.display().to_string(), + "issues_root": binding.issues_root.display().to_string(), + "state_root": binding.state_root.display().to_string(), + })), + "worker_alive": worker_alive, + "launch_path_stable": launch_path_stable, + }); + let mut lines = vec![format!( + "{} | {}", + if matches!(health.state, libmcp::LifecycleState::Ready) { + "ready" + } else { + "not-ready" + }, + if binding.is_some() { + "bound" + } else { + "unbound" + } + )]; + if let Some(binding) = binding { + lines.push(format!("project: {}", binding.project_root.display())); + lines.push(format!("issues: {}", binding.issues_root.display())); + } + lines.push(format!( + "worker: gen {} {}", + health.generation.get(), + if worker_alive { "alive" } else { "dead" } + )); + lines.push(format!( + "binary: {}{}", + if launch_path_stable { + "stable" + } else { + "unstable" + }, + if rollout_pending { + " rollout-pending" + } else { + "" + } + )); + fallback_detailed_tool_output( + &Value::Object(concise), + &full, + lines.join("\n"), + None, + libmcp::SurfaceKind::Ops, + generation, + FaultStage::Host, + "tools/call:system.health", + ) +} + +fn system_telemetry_output( + telemetry: &libmcp::TelemetrySnapshot, + host_rollouts: u64, + generation: Generation, +) -> Result<ToolOutput, FaultRecord> { + let hot_methods = telemetry.methods.iter().take(6).collect::<Vec<_>>(); + let concise = json!({ + "requests": telemetry.totals.request_count, + "successes": telemetry.totals.success_count, + "response_errors": telemetry.totals.response_error_count, + "transport_faults": telemetry.totals.transport_fault_count, + "retries": telemetry.totals.retry_count, + "worker_restarts": telemetry.restart_count, + "host_rollouts": host_rollouts, + "hot_methods": hot_methods.iter().map(|method| json!({ + "method": method.method, + "requests": method.request_count, + "response_errors": method.response_error_count, + "transport_faults": method.transport_fault_count, + "retries": method.retry_count, + })).collect::<Vec<_>>(), + }); + let full = json!({ + "telemetry": telemetry, + "host_rollouts": host_rollouts, + }); + let mut lines = vec![format!( + "requests={} success={} response_error={} transport_fault={} retry={}", + telemetry.totals.request_count, + telemetry.totals.success_count, + telemetry.totals.response_error_count, + telemetry.totals.transport_fault_count, + telemetry.totals.retry_count + )]; + lines.push(format!( + "worker_restarts={} host_rollouts={host_rollouts}", + telemetry.restart_count, + )); + if !hot_methods.is_empty() { + lines.push("hot methods:".to_owned()); + for method in hot_methods { + lines.push(format!( + "{} req={} err={} transport={} retry={}", + method.method, + method.request_count, + method.response_error_count, + method.transport_fault_count, + method.retry_count, + )); + } + } + fallback_detailed_tool_output( + &concise, + &full, + lines.join("\n"), + None, + libmcp::SurfaceKind::Ops, + generation, + FaultStage::Host, + "tools/call:system.telemetry", + ) +} + +fn deserialize<T: for<'de> serde::Deserialize<'de>>( + value: Value, + operation: &str, + generation: Generation, +) -> Result<T, FaultRecord> { + serde_json::from_value(value).map_err(|error| { + FaultRecord::invalid_input( + generation, + FaultStage::Protocol, + operation, + format!("invalid params: {error}"), + ) + }) +} + +fn operation_key(method: &str, params: &Value) -> String { + match method { + "tools/call" => params.get("name").and_then(Value::as_str).map_or_else( + || "tools/call".to_owned(), + |name| format!("tools/call:{name}"), + ), + other => other.to_owned(), + } +} + +fn request_id_from_frame(frame: &FramedMessage) -> Option<RequestId> { + match frame.classify() { + libmcp::RpcEnvelopeKind::Request { id, .. } => Some(id), + libmcp::RpcEnvelopeKind::Notification { .. } + | libmcp::RpcEnvelopeKind::Response { .. } + | libmcp::RpcEnvelopeKind::Unknown => None, + } +} + +fn jsonrpc_result(id: Value, result: Value) -> Value { + json!({ + "jsonrpc": "2.0", + "id": id, + "result": result, + }) +} + +fn jsonrpc_error(id: Value, fault: FaultRecord) -> Value { + json!({ + "jsonrpc": "2.0", + "id": id, + "error": fault.into_jsonrpc_error(), + }) +} + +fn write_message(stdout: &mut impl Write, message: &Value) -> io::Result<()> { + serde_json::to_writer(&mut *stdout, message)?; + stdout.write_all(b"\n")?; + stdout.flush()?; + Ok(()) +} + +fn elapsed_ms(duration: std::time::Duration) -> u64 { + u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) +} + +#[derive(Debug, serde::Deserialize)] +struct ToolCallEnvelope { + name: String, + #[serde(default = "empty_json_object")] + arguments: Value, +} + +fn empty_json_object() -> Value { + json!({}) +} + +#[derive(Debug, serde::Deserialize)] +struct ProjectBindArgs { + path: String, +} + +impl From<&ProjectBinding> for ProjectBindingSeed { + fn from(value: &ProjectBinding) -> Self { + Self { + requested_path: value.requested_path.clone(), + project_root: value.project_root.clone(), + } + } +} diff --git a/crates/jira-at-home/src/mcp/mod.rs b/crates/jira-at-home/src/mcp/mod.rs new file mode 100644 index 0000000..666598f --- /dev/null +++ b/crates/jira-at-home/src/mcp/mod.rs @@ -0,0 +1,10 @@ +mod catalog; +mod fault; +mod host; +mod output; +mod protocol; +mod service; +mod telemetry; + +pub(crate) use host::runtime::run_host; +pub(crate) use service::run_worker; diff --git a/crates/jira-at-home/src/mcp/output.rs b/crates/jira-at-home/src/mcp/output.rs new file mode 100644 index 0000000..90673b3 --- /dev/null +++ b/crates/jira-at-home/src/mcp/output.rs @@ -0,0 +1,195 @@ +use libmcp::{ + DetailLevel, FallbackJsonProjection, JsonPorcelainConfig, ProjectionError, RenderMode, + SurfaceKind, ToolProjection, render_json_porcelain, with_presentation_properties, +}; +use serde::Serialize; +use serde_json::{Value, json}; + +use crate::mcp::fault::{FaultRecord, FaultStage}; + +const FULL_PORCELAIN_MAX_LINES: usize = 40; +const FULL_PORCELAIN_MAX_INLINE_CHARS: usize = 512; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct Presentation { + pub(crate) render: RenderMode, + pub(crate) detail: DetailLevel, +} + +#[derive(Debug, Clone)] +pub(crate) struct ToolOutput { + concise: Value, + full: Value, + concise_text: String, + full_text: Option<String>, +} + +impl ToolOutput { + pub(crate) fn from_values( + concise: Value, + full: Value, + concise_text: impl Into<String>, + full_text: Option<String>, + ) -> Self { + Self { + concise, + full, + concise_text: concise_text.into(), + full_text, + } + } + + fn structured(&self, detail: DetailLevel) -> &Value { + match detail { + DetailLevel::Concise => &self.concise, + DetailLevel::Full => &self.full, + } + } + + fn porcelain_text(&self, detail: DetailLevel) -> String { + match detail { + DetailLevel::Concise => self.concise_text.clone(), + DetailLevel::Full => self + .full_text + .clone() + .unwrap_or_else(|| render_json_porcelain(&self.full, full_porcelain_config())), + } + } +} + +impl Default for Presentation { + fn default() -> Self { + Self { + render: RenderMode::Porcelain, + detail: DetailLevel::Concise, + } + } +} + +pub(crate) fn split_presentation( + arguments: Value, + operation: &str, + generation: libmcp::Generation, + stage: FaultStage, +) -> Result<(Presentation, Value), FaultRecord> { + let Value::Object(mut object) = arguments else { + return Ok((Presentation::default(), arguments)); + }; + let render = object + .remove("render") + .map(|value| { + serde_json::from_value::<RenderMode>(value).map_err(|error| { + FaultRecord::invalid_input( + generation, + stage, + operation, + format!("invalid render mode: {error}"), + ) + }) + }) + .transpose()? + .unwrap_or(RenderMode::Porcelain); + let detail = object + .remove("detail") + .map(|value| { + serde_json::from_value::<DetailLevel>(value).map_err(|error| { + FaultRecord::invalid_input( + generation, + stage, + operation, + format!("invalid detail level: {error}"), + ) + }) + }) + .transpose()? + .unwrap_or(DetailLevel::Concise); + Ok((Presentation { render, detail }, Value::Object(object))) +} + +pub(crate) fn projected_tool_output( + projection: &impl ToolProjection, + concise_text: impl Into<String>, + full_text: Option<String>, + generation: libmcp::Generation, + stage: FaultStage, + operation: &str, +) -> Result<ToolOutput, FaultRecord> { + let concise = projection + .concise_projection() + .map_err(|error| projection_fault(error, generation, stage, operation))?; + let full = projection + .full_projection() + .map_err(|error| projection_fault(error, generation, stage, operation))?; + Ok(ToolOutput::from_values( + concise, + full, + concise_text, + full_text, + )) +} + +pub(crate) fn fallback_detailed_tool_output( + concise: &impl Serialize, + full: &impl Serialize, + concise_text: impl Into<String>, + full_text: Option<String>, + kind: SurfaceKind, + generation: libmcp::Generation, + stage: FaultStage, + operation: &str, +) -> Result<ToolOutput, FaultRecord> { + let projection = FallbackJsonProjection::new(concise, full, kind) + .map_err(|error| projection_fault(error, generation, stage, operation))?; + projected_tool_output( + &projection, + concise_text, + full_text, + generation, + stage, + operation, + ) +} + +pub(crate) fn tool_success( + output: ToolOutput, + presentation: Presentation, + generation: libmcp::Generation, + stage: FaultStage, + operation: &str, +) -> Result<Value, FaultRecord> { + let structured = output.structured(presentation.detail).clone(); + let text = match presentation.render { + RenderMode::Porcelain => output.porcelain_text(presentation.detail), + RenderMode::Json => serde_json::to_string_pretty(&structured).map_err(|error| { + FaultRecord::internal(generation, stage, operation, error.to_string()) + })?, + }; + Ok(json!({ + "content": [{ + "type": "text", + "text": text, + }], + "structuredContent": structured, + "isError": false, + })) +} + +pub(crate) fn with_common_presentation(schema: Value) -> Value { + with_presentation_properties(schema) +} + +fn projection_fault( + error: ProjectionError, + generation: libmcp::Generation, + stage: FaultStage, + operation: &str, +) -> FaultRecord { + FaultRecord::internal(generation, stage, operation, error.to_string()) +} + +const fn full_porcelain_config() -> JsonPorcelainConfig { + JsonPorcelainConfig { + max_lines: FULL_PORCELAIN_MAX_LINES, + max_inline_chars: FULL_PORCELAIN_MAX_INLINE_CHARS, + } +} diff --git a/crates/jira-at-home/src/mcp/protocol.rs b/crates/jira-at-home/src/mcp/protocol.rs new file mode 100644 index 0000000..9226a36 --- /dev/null +++ b/crates/jira-at-home/src/mcp/protocol.rs @@ -0,0 +1,78 @@ +use std::path::PathBuf; + +use libmcp::{Generation, HostSessionKernelSnapshot}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::mcp::telemetry::ServerTelemetry; + +pub(crate) const PROTOCOL_VERSION: &str = "2025-11-25"; +pub(crate) const SERVER_NAME: &str = "jira-at-home"; +pub(crate) const HOST_STATE_ENV: &str = "JIRA_AT_HOME_MCP_HOST_STATE"; +pub(crate) const FORCE_ROLLOUT_ENV: &str = "JIRA_AT_HOME_MCP_TEST_FORCE_ROLLOUT_KEY"; +pub(crate) const CRASH_ONCE_ENV: &str = "JIRA_AT_HOME_MCP_TEST_HOST_CRASH_ONCE_KEY"; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct HostStateSeed { + pub(crate) session_kernel: HostSessionKernelSnapshot, + pub(crate) telemetry: ServerTelemetry, + pub(crate) next_request_id: u64, + pub(crate) binding: Option<ProjectBindingSeed>, + pub(crate) worker_generation: Generation, + pub(crate) worker_spawned: bool, + pub(crate) force_rollout_consumed: bool, + pub(crate) crash_once_consumed: bool, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) struct ProjectBindingSeed { + pub(crate) requested_path: PathBuf, + pub(crate) project_root: PathBuf, +} + +#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] +#[serde(transparent)] +pub(crate) struct HostRequestId(pub(crate) u64); + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub(crate) enum WorkerRequest { + Execute { + id: HostRequestId, + operation: WorkerOperation, + }, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub(crate) enum WorkerOperation { + CallTool { name: String, arguments: Value }, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) struct WorkerResponse { + pub(crate) id: HostRequestId, + pub(crate) outcome: WorkerOutcome, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(tag = "status", rename_all = "snake_case")] +pub(crate) enum WorkerOutcome { + Success { + result: Value, + }, + Fault { + fault: crate::mcp::fault::FaultRecord, + }, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) struct BinaryFingerprint { + pub(crate) length_bytes: u64, + pub(crate) modified_unix_nanos: u128, +} + +#[derive(Clone, Debug)] +pub(crate) struct WorkerSpawnConfig { + pub(crate) executable: PathBuf, +} diff --git a/crates/jira-at-home/src/mcp/service.rs b/crates/jira-at-home/src/mcp/service.rs new file mode 100644 index 0000000..fc9dbf0 --- /dev/null +++ b/crates/jira-at-home/src/mcp/service.rs @@ -0,0 +1,336 @@ +use std::io::{self, BufRead, Write}; +use std::path::{Path, PathBuf}; + +use libmcp::{Generation, SurfaceKind}; +use serde::Deserialize; +use serde_json::{Value, json}; + +use crate::mcp::fault::{FaultRecord, FaultStage}; +use crate::mcp::output::{ + ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success, +}; +use crate::store::{ + IssueBody, IssueRecord, IssueSlug, IssueStore, SaveReceipt, StoreError, format_timestamp, +}; + +pub(crate) fn run_worker( + project_root: PathBuf, + generation: u64, +) -> Result<(), Box<dyn std::error::Error>> { + let generation = generation_from_wire(generation); + let store = IssueStore::bind(project_root)?; + let stdin = io::stdin(); + let mut stdout = io::stdout().lock(); + let mut service = WorkerService::new(store, generation); + + for line in stdin.lock().lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + let request = serde_json::from_str::<crate::mcp::protocol::WorkerRequest>(&line)?; + let response = match request { + crate::mcp::protocol::WorkerRequest::Execute { id, operation } => { + let outcome = match service.execute(operation) { + Ok(result) => crate::mcp::protocol::WorkerOutcome::Success { result }, + Err(fault) => crate::mcp::protocol::WorkerOutcome::Fault { fault }, + }; + crate::mcp::protocol::WorkerResponse { id, outcome } + } + }; + serde_json::to_writer(&mut stdout, &response)?; + stdout.write_all(b"\n")?; + stdout.flush()?; + } + + Ok(()) +} + +struct WorkerService { + store: IssueStore, + generation: Generation, +} + +impl WorkerService { + fn new(store: IssueStore, generation: Generation) -> Self { + Self { store, generation } + } + + fn execute( + &mut self, + operation: crate::mcp::protocol::WorkerOperation, + ) -> Result<Value, FaultRecord> { + match operation { + crate::mcp::protocol::WorkerOperation::CallTool { name, arguments } => { + self.call_tool(&name, arguments) + } + } + } + + fn call_tool(&mut self, name: &str, arguments: Value) -> Result<Value, FaultRecord> { + let operation = format!("tools/call:{name}"); + let (presentation, arguments) = + split_presentation(arguments, &operation, self.generation, FaultStage::Worker)?; + let output = match name { + "issue.save" => { + let args = deserialize::<IssueSaveArgs>(arguments, &operation, self.generation)?; + let slug = IssueSlug::parse(args.slug) + .map_err(store_fault(self.generation, &operation))?; + let body = IssueBody::parse(args.body) + .map_err(store_fault(self.generation, &operation))?; + let receipt = self + .store + .save(slug, body) + .map_err(store_fault(self.generation, &operation))?; + issue_save_output( + &receipt, + self.store.layout().project_root.as_path(), + self.generation, + &operation, + )? + } + "issue.list" => { + let issues = self + .store + .list() + .map_err(store_fault(self.generation, &operation))?; + issue_list_output( + &issues, + self.store.layout().project_root.as_path(), + self.generation, + &operation, + )? + } + "issue.read" => { + let args = deserialize::<IssueReadArgs>(arguments, &operation, self.generation)?; + let slug = IssueSlug::parse(args.slug) + .map_err(store_fault(self.generation, &operation))?; + let record = self + .store + .read(slug) + .map_err(store_fault(self.generation, &operation))?; + issue_read_output( + &record, + self.store.layout().project_root.as_path(), + self.generation, + &operation, + )? + } + other => { + return Err(FaultRecord::invalid_input( + self.generation, + FaultStage::Worker, + &operation, + format!("unknown worker tool `{other}`"), + )); + } + }; + tool_success( + output, + presentation, + self.generation, + FaultStage::Worker, + &operation, + ) + } +} + +#[derive(Debug, Deserialize)] +struct IssueSaveArgs { + slug: String, + body: String, +} + +#[derive(Debug, Deserialize)] +struct IssueReadArgs { + slug: String, +} + +fn deserialize<T: for<'de> Deserialize<'de>>( + value: Value, + operation: &str, + generation: Generation, +) -> Result<T, FaultRecord> { + serde_json::from_value(value).map_err(|error| { + FaultRecord::invalid_input( + generation, + FaultStage::Protocol, + operation, + format!("invalid params: {error}"), + ) + }) +} + +fn store_fault( + generation: Generation, + operation: &str, +) -> impl FnOnce(StoreError) -> FaultRecord + '_ { + move |error| { + let stage = if matches!(error, StoreError::Io(_)) { + FaultStage::Store + } else { + FaultStage::Worker + }; + match error { + StoreError::InvalidSlug(_) + | StoreError::EmptyIssueBody + | StoreError::IssueNotFound(_) + | StoreError::MalformedIssueEntry(_, _) + | StoreError::MissingProjectPath(_) + | StoreError::ProjectPathNotDirectory(_) => { + FaultRecord::invalid_input(generation, stage, operation, error.to_string()) + } + StoreError::Io(_) => { + FaultRecord::internal(generation, stage, operation, error.to_string()) + } + } + } +} + +fn issue_save_output( + receipt: &SaveReceipt, + project_root: &Path, + generation: Generation, + operation: &str, +) -> Result<ToolOutput, FaultRecord> { + let relative_path = relative_issue_path(&receipt.path, project_root); + let status = if receipt.created { + "created" + } else { + "updated" + }; + let concise = json!({ + "slug": receipt.slug, + "status": status, + "path": relative_path, + "updated_at": format_timestamp(receipt.updated_at), + }); + let full = json!({ + "slug": receipt.slug, + "status": status, + "path": relative_path, + "updated_at": format_timestamp(receipt.updated_at), + "bytes": receipt.bytes, + }); + fallback_detailed_tool_output( + &concise, + &full, + [ + format!("saved issue {}", receipt.slug), + format!("status: {status}"), + format!("path: {}", relative_issue_path(&receipt.path, project_root)), + format!("updated: {}", format_timestamp(receipt.updated_at)), + ] + .join("\n"), + None, + SurfaceKind::Mutation, + generation, + FaultStage::Worker, + operation, + ) +} + +fn issue_list_output( + issues: &[crate::store::IssueSummary], + project_root: &Path, + generation: Generation, + operation: &str, +) -> Result<ToolOutput, FaultRecord> { + let concise_items = issues + .iter() + .map(|issue| { + json!({ + "slug": issue.slug, + "updated_at": format_timestamp(issue.updated_at), + }) + }) + .collect::<Vec<_>>(); + let full_items = issues + .iter() + .map(|issue| { + let path = relative_issue_path( + &project_root + .join("issues") + .join(format!("{}.md", issue.slug)), + project_root, + ); + json!({ + "slug": issue.slug, + "path": path, + "updated_at": format_timestamp(issue.updated_at), + }) + }) + .collect::<Vec<_>>(); + let mut lines = vec![format!("{} issue(s)", issues.len())]; + lines.extend(issues.iter().map(|issue| issue.slug.to_string())); + fallback_detailed_tool_output( + &json!({ "count": issues.len(), "issues": concise_items }), + &json!({ "count": issues.len(), "issues": full_items }), + lines.join("\n"), + None, + SurfaceKind::List, + generation, + FaultStage::Worker, + operation, + ) +} + +fn issue_read_output( + record: &IssueRecord, + project_root: &Path, + generation: Generation, + operation: &str, +) -> Result<ToolOutput, FaultRecord> { + let relative_path = relative_issue_path(&record.path, project_root); + let concise = json!({ + "slug": record.slug, + "updated_at": format_timestamp(record.updated_at), + "body": record.body, + }); + let full = json!({ + "slug": record.slug, + "path": relative_path, + "updated_at": format_timestamp(record.updated_at), + "bytes": record.bytes, + "body": record.body, + }); + let concise_text = format!( + "issue {}\nupdated: {}\n\n{}", + record.slug, + format_timestamp(record.updated_at), + record.body, + ); + let full_text = Some(format!( + "issue {}\npath: {}\nupdated: {}\nbytes: {}\n\n{}", + record.slug, + relative_issue_path(&record.path, project_root), + format_timestamp(record.updated_at), + record.bytes, + record.body, + )); + fallback_detailed_tool_output( + &concise, + &full, + concise_text, + full_text, + SurfaceKind::Read, + generation, + FaultStage::Worker, + operation, + ) +} + +fn relative_issue_path(path: &Path, project_root: &Path) -> String { + path.strip_prefix(project_root).map_or_else( + |_| path.display().to_string(), + |relative| relative.display().to_string(), + ) +} + +fn generation_from_wire(raw: u64) -> Generation { + let mut generation = Generation::genesis(); + for _ in 1..raw { + generation = generation.next(); + } + generation +} diff --git a/crates/jira-at-home/src/mcp/telemetry.rs b/crates/jira-at-home/src/mcp/telemetry.rs new file mode 100644 index 0000000..8df0009 --- /dev/null +++ b/crates/jira-at-home/src/mcp/telemetry.rs @@ -0,0 +1,228 @@ +use std::collections::BTreeMap; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use libmcp::{ + Fault, Generation, HealthSnapshot, LifecycleState, MethodTelemetry, RolloutState, + TelemetrySnapshot, TelemetryTotals, +}; +use serde::{Deserialize, Serialize}; + +use crate::mcp::fault::FaultRecord; + +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] +struct MethodStats { + request_count: u64, + success_count: u64, + response_error_count: u64, + transport_fault_count: u64, + retry_count: u64, + total_latency_ms: u128, + max_latency_ms: u64, + last_latency_ms: Option<u64>, + last_error: Option<String>, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) struct ServerTelemetry { + started_unix_ms: u64, + state: LifecycleState, + generation: Generation, + consecutive_failures: u32, + restart_count: u64, + host_rollouts: u64, + totals: TelemetryTotals, + methods: BTreeMap<String, MethodStats>, + last_fault: Option<Fault>, +} + +impl Default for ServerTelemetry { + fn default() -> Self { + Self { + started_unix_ms: unix_ms_now(), + state: LifecycleState::Cold, + generation: Generation::genesis(), + consecutive_failures: 0, + restart_count: 0, + host_rollouts: 0, + totals: TelemetryTotals { + request_count: 0, + success_count: 0, + response_error_count: 0, + transport_fault_count: 0, + retry_count: 0, + }, + methods: BTreeMap::new(), + last_fault: None, + } + } +} + +impl ServerTelemetry { + pub(crate) fn record_request(&mut self, operation: &str) { + self.totals.request_count += 1; + self.methods + .entry(operation.to_owned()) + .or_default() + .request_count += 1; + } + + pub(crate) fn record_success( + &mut self, + operation: &str, + latency_ms: u64, + generation: Generation, + worker_alive: bool, + ) { + self.generation = generation; + self.state = if worker_alive { + LifecycleState::Ready + } else { + LifecycleState::Cold + }; + self.consecutive_failures = 0; + self.last_fault = None; + self.totals.success_count += 1; + let entry = self.methods.entry(operation.to_owned()).or_default(); + entry.success_count += 1; + entry.total_latency_ms = entry + .total_latency_ms + .saturating_add(u128::from(latency_ms)); + entry.max_latency_ms = entry.max_latency_ms.max(latency_ms); + entry.last_latency_ms = Some(latency_ms); + entry.last_error = None; + } + + pub(crate) fn record_error( + &mut self, + operation: &str, + fault: &FaultRecord, + latency_ms: u64, + generation: Generation, + ) { + self.generation = generation; + self.consecutive_failures = self.consecutive_failures.saturating_add(1); + self.last_fault = Some(fault.fault.clone()); + let transportish = matches!( + fault.fault.class, + libmcp::FaultClass::Transport + | libmcp::FaultClass::Process + | libmcp::FaultClass::Timeout + | libmcp::FaultClass::Resource + | libmcp::FaultClass::Replay + | libmcp::FaultClass::Rollout + ); + if transportish { + self.state = LifecycleState::Recovering; + self.totals.transport_fault_count += 1; + } else { + self.totals.response_error_count += 1; + } + let entry = self.methods.entry(operation.to_owned()).or_default(); + if transportish { + entry.transport_fault_count += 1; + } else { + entry.response_error_count += 1; + } + entry.total_latency_ms = entry + .total_latency_ms + .saturating_add(u128::from(latency_ms)); + entry.max_latency_ms = entry.max_latency_ms.max(latency_ms); + entry.last_latency_ms = Some(latency_ms); + entry.last_error = Some(fault.message().to_owned()); + } + + pub(crate) fn record_retry(&mut self, operation: &str) { + self.totals.retry_count += 1; + self.methods + .entry(operation.to_owned()) + .or_default() + .retry_count += 1; + } + + pub(crate) fn record_worker_restart(&mut self, generation: Generation) { + self.generation = generation; + self.restart_count += 1; + self.state = LifecycleState::Recovering; + } + + pub(crate) fn record_rollout(&mut self) { + self.host_rollouts += 1; + } + + pub(crate) fn host_rollouts(&self) -> u64 { + self.host_rollouts + } + + pub(crate) fn health_snapshot(&self, rollout: RolloutState) -> HealthSnapshot { + HealthSnapshot { + state: self.state, + generation: self.generation, + uptime_ms: self.uptime_ms(), + consecutive_failures: self.consecutive_failures, + restart_count: self.restart_count, + rollout: Some(rollout), + last_fault: self.last_fault.clone(), + } + } + + pub(crate) fn telemetry_snapshot(&self) -> TelemetrySnapshot { + TelemetrySnapshot { + uptime_ms: self.uptime_ms(), + state: self.state, + generation: self.generation, + consecutive_failures: self.consecutive_failures, + restart_count: self.restart_count, + totals: self.totals.clone(), + methods: self.ranked_methods(), + last_fault: self.last_fault.clone(), + } + } + + pub(crate) fn ranked_methods(&self) -> Vec<MethodTelemetry> { + let mut methods = self + .methods + .iter() + .map(|(method, stats)| MethodTelemetry { + method: method.clone(), + request_count: stats.request_count, + success_count: stats.success_count, + response_error_count: stats.response_error_count, + transport_fault_count: stats.transport_fault_count, + retry_count: stats.retry_count, + last_latency_ms: stats.last_latency_ms, + max_latency_ms: stats.max_latency_ms, + avg_latency_ms: average_latency_ms(stats), + last_error: stats.last_error.clone(), + }) + .collect::<Vec<_>>(); + methods.sort_by(|left, right| { + right + .request_count + .cmp(&left.request_count) + .then_with(|| right.transport_fault_count.cmp(&left.transport_fault_count)) + .then_with(|| right.response_error_count.cmp(&left.response_error_count)) + .then_with(|| left.method.cmp(&right.method)) + }); + methods + } + + fn uptime_ms(&self) -> u64 { + unix_ms_now().saturating_sub(self.started_unix_ms) + } +} + +fn average_latency_ms(stats: &MethodStats) -> u64 { + if stats.request_count == 0 { + return 0; + } + let average = stats.total_latency_ms / u128::from(stats.request_count); + u64::try_from(average).unwrap_or(u64::MAX) +} + +fn unix_ms_now() -> u64 { + let duration = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO); + let millis = duration.as_millis(); + u64::try_from(millis).unwrap_or(u64::MAX) +} diff --git a/crates/jira-at-home/src/store.rs b/crates/jira-at-home/src/store.rs new file mode 100644 index 0000000..faf2c9a --- /dev/null +++ b/crates/jira-at-home/src/store.rs @@ -0,0 +1,287 @@ +use std::ffi::OsStr; +use std::fs; +use std::io; +use std::path::{Component, Path, PathBuf}; +use std::time::SystemTime; + +use serde::Serialize; +use thiserror::Error; +use time::OffsetDateTime; + +pub(crate) const ISSUES_DIR_NAME: &str = "issues"; +const APP_STATE_DIR_NAME: &str = "jira_at_home"; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub(crate) struct IssueSlug(String); + +impl IssueSlug { + pub(crate) fn parse(raw: impl Into<String>) -> Result<Self, StoreError> { + let raw = raw.into(); + if raw.is_empty() { + return Err(StoreError::InvalidSlug("slug must not be empty".to_owned())); + } + if raw.starts_with('-') || raw.ends_with('-') { + return Err(StoreError::InvalidSlug( + "slug must not start or end with `-`".to_owned(), + )); + } + if !raw + .bytes() + .all(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit() || byte == b'-') + { + return Err(StoreError::InvalidSlug( + "slug must use lowercase ascii letters, digits, and `-` only".to_owned(), + )); + } + if raw.split('-').any(str::is_empty) { + return Err(StoreError::InvalidSlug( + "slug must not contain empty `-` segments".to_owned(), + )); + } + Ok(Self(raw)) + } + + pub(crate) fn as_str(&self) -> &str { + self.0.as_str() + } + + fn from_issue_path(path: &Path) -> Result<Self, StoreError> { + let extension = path.extension().and_then(OsStr::to_str); + if extension != Some("md") { + return Err(StoreError::MalformedIssueEntry( + path.display().to_string(), + "issue file must use the `.md` extension".to_owned(), + )); + } + let stem = path + .file_stem() + .and_then(OsStr::to_str) + .ok_or_else(|| { + StoreError::MalformedIssueEntry( + path.display().to_string(), + "issue file name must be valid UTF-8".to_owned(), + ) + })? + .to_owned(); + Self::parse(stem) + } +} + +impl std::fmt::Display for IssueSlug { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str(self.as_str()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub(crate) struct IssueBody(String); + +impl IssueBody { + pub(crate) fn parse(raw: impl Into<String>) -> Result<Self, StoreError> { + let raw = raw.into(); + if raw.trim().is_empty() { + return Err(StoreError::EmptyIssueBody); + } + Ok(Self(raw)) + } + + pub(crate) fn into_inner(self) -> String { + self.0 + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub(crate) struct ProjectLayout { + pub(crate) requested_path: PathBuf, + pub(crate) project_root: PathBuf, + pub(crate) issues_root: PathBuf, + pub(crate) state_root: PathBuf, +} + +impl ProjectLayout { + pub(crate) fn bind(requested_path: impl Into<PathBuf>) -> Result<Self, StoreError> { + let requested_path = requested_path.into(); + let project_root = resolve_project_root(&requested_path)?; + let issues_root = project_root.join(ISSUES_DIR_NAME); + fs::create_dir_all(&issues_root)?; + let state_root = external_state_root(&project_root)?; + fs::create_dir_all(state_root.join("mcp"))?; + Ok(Self { + requested_path, + project_root, + issues_root, + state_root, + }) + } + + pub(crate) fn issue_path(&self, slug: &IssueSlug) -> PathBuf { + self.issues_root.join(format!("{slug}.md")) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub(crate) struct ProjectStatus { + pub(crate) issue_count: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub(crate) struct IssueSummary { + pub(crate) slug: IssueSlug, + pub(crate) updated_at: OffsetDateTime, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub(crate) struct IssueRecord { + pub(crate) slug: IssueSlug, + pub(crate) body: String, + pub(crate) path: PathBuf, + pub(crate) updated_at: OffsetDateTime, + pub(crate) bytes: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub(crate) struct SaveReceipt { + pub(crate) slug: IssueSlug, + pub(crate) path: PathBuf, + pub(crate) created: bool, + pub(crate) updated_at: OffsetDateTime, + pub(crate) bytes: usize, +} + +#[derive(Debug, Clone)] +pub(crate) struct IssueStore { + layout: ProjectLayout, +} + +impl IssueStore { + pub(crate) fn bind(requested_path: impl Into<PathBuf>) -> Result<Self, StoreError> { + Ok(Self { + layout: ProjectLayout::bind(requested_path)?, + }) + } + + pub(crate) fn layout(&self) -> &ProjectLayout { + &self.layout + } + + pub(crate) fn status(&self) -> Result<ProjectStatus, StoreError> { + Ok(ProjectStatus { + issue_count: self.list()?.len(), + }) + } + + pub(crate) fn save(&self, slug: IssueSlug, body: IssueBody) -> Result<SaveReceipt, StoreError> { + let path = self.layout.issue_path(&slug); + let created = !path.exists(); + let body = body.into_inner(); + fs::write(&path, body.as_bytes())?; + let metadata = fs::metadata(&path)?; + Ok(SaveReceipt { + slug, + path, + created, + updated_at: metadata_modified_at(&metadata.modified()?), + bytes: body.len(), + }) + } + + pub(crate) fn list(&self) -> Result<Vec<IssueSummary>, StoreError> { + let mut issues = Vec::new(); + for entry in fs::read_dir(&self.layout.issues_root)? { + let entry = entry?; + let path = entry.path(); + let file_type = entry.file_type()?; + if !file_type.is_file() { + continue; + } + let slug = IssueSlug::from_issue_path(&path)?; + let updated_at = metadata_modified_at(&entry.metadata()?.modified()?); + issues.push(IssueSummary { slug, updated_at }); + } + issues.sort_by(|left, right| left.slug.as_str().cmp(right.slug.as_str())); + Ok(issues) + } + + pub(crate) fn read(&self, slug: IssueSlug) -> Result<IssueRecord, StoreError> { + let path = self.layout.issue_path(&slug); + if !path.is_file() { + return Err(StoreError::IssueNotFound(slug.to_string())); + } + let body = fs::read_to_string(&path)?; + let metadata = fs::metadata(&path)?; + Ok(IssueRecord { + slug, + bytes: body.len(), + body, + path, + updated_at: metadata_modified_at(&metadata.modified()?), + }) + } +} + +#[derive(Debug, Error)] +pub(crate) enum StoreError { + #[error("project path `{0}` does not exist")] + MissingProjectPath(String), + #[error("project path `{0}` does not resolve to a directory")] + ProjectPathNotDirectory(String), + #[error("invalid issue slug: {0}")] + InvalidSlug(String), + #[error("issue body must not be blank")] + EmptyIssueBody, + #[error("issue `{0}` does not exist")] + IssueNotFound(String), + #[error("malformed issue entry `{0}`: {1}")] + MalformedIssueEntry(String, String), + #[error(transparent)] + Io(#[from] io::Error), +} + +pub(crate) fn format_timestamp(timestamp: OffsetDateTime) -> String { + let format = &time::format_description::well_known::Rfc3339; + timestamp + .format(format) + .unwrap_or_else(|_| timestamp.unix_timestamp().to_string()) +} + +fn resolve_project_root(requested_path: &Path) -> Result<PathBuf, StoreError> { + if !requested_path.exists() { + return Err(StoreError::MissingProjectPath( + requested_path.display().to_string(), + )); + } + let canonical = requested_path.canonicalize()?; + let search_root = if canonical.is_dir() { + canonical + } else { + canonical.parent().map(Path::to_path_buf).ok_or_else(|| { + StoreError::ProjectPathNotDirectory(requested_path.display().to_string()) + })? + }; + + for ancestor in search_root.ancestors() { + if ancestor.join(".git").exists() { + return Ok(ancestor.to_path_buf()); + } + } + Ok(search_root) +} + +fn external_state_root(project_root: &Path) -> Result<PathBuf, StoreError> { + let mut base = dirs::state_dir().unwrap_or_else(std::env::temp_dir); + base.push(APP_STATE_DIR_NAME); + base.push("projects"); + for component in project_root.components() { + match component { + Component::Normal(part) => base.push(part), + Component::Prefix(prefix) => base.push(prefix.as_os_str()), + Component::CurDir | Component::ParentDir | Component::RootDir => {} + } + } + fs::create_dir_all(&base)?; + Ok(base) +} + +fn metadata_modified_at(system_time: &SystemTime) -> OffsetDateTime { + OffsetDateTime::from(*system_time) +} diff --git a/crates/jira-at-home/tests/mcp_hardening.rs b/crates/jira-at-home/tests/mcp_hardening.rs new file mode 100644 index 0000000..02f4fda --- /dev/null +++ b/crates/jira-at-home/tests/mcp_hardening.rs @@ -0,0 +1,411 @@ +use clap as _; +use dirs as _; +use std::fs; +use std::io::{self, BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; +use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; + +use libmcp as _; +use libmcp_testkit::read_json_lines; +use serde as _; +use serde_json::{Value, json}; +use thiserror as _; +use time as _; + +type TestResult<T = ()> = Result<T, Box<dyn std::error::Error>>; + +fn must<T, E: std::fmt::Display, C: std::fmt::Display>( + result: Result<T, E>, + context: C, +) -> TestResult<T> { + result.map_err(|error| io::Error::other(format!("{context}: {error}")).into()) +} + +fn must_some<T>(value: Option<T>, context: &str) -> TestResult<T> { + value.ok_or_else(|| io::Error::other(context).into()) +} + +fn temp_project_root(name: &str) -> TestResult<PathBuf> { + let root = std::env::temp_dir().join(format!( + "jira_at_home_{name}_{}_{}", + std::process::id(), + must( + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH), + "current time after unix epoch", + )? + .as_nanos() + )); + must(fs::create_dir_all(&root), "create temp project root")?; + Ok(root) +} + +fn binary_path() -> PathBuf { + PathBuf::from(env!("CARGO_BIN_EXE_jira-at-home")) +} + +struct McpHarness { + child: Child, + stdin: ChildStdin, + stdout: BufReader<ChildStdout>, +} + +impl McpHarness { + fn spawn( + project_root: Option<&Path>, + state_home: &Path, + extra_env: &[(&str, &str)], + ) -> TestResult<Self> { + let mut command = Command::new(binary_path()); + let _ = command + .arg("mcp") + .arg("serve") + .env("XDG_STATE_HOME", state_home) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()); + if let Some(project_root) = project_root { + let _ = command.arg("--project").arg(project_root); + } + for (key, value) in extra_env { + let _ = command.env(key, value); + } + let mut child = must(command.spawn(), "spawn mcp host")?; + let stdin = must_some(child.stdin.take(), "host stdin")?; + let stdout = BufReader::new(must_some(child.stdout.take(), "host stdout")?); + Ok(Self { + child, + stdin, + stdout, + }) + } + + fn initialize(&mut self) -> TestResult<Value> { + self.request(json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2025-11-25", + "capabilities": {}, + "clientInfo": { "name": "mcp-hardening-test", "version": "0" } + } + })) + } + + fn notify_initialized(&mut self) -> TestResult { + self.notify(json!({ + "jsonrpc": "2.0", + "method": "notifications/initialized", + })) + } + + fn tools_list(&mut self) -> TestResult<Value> { + self.request(json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/list", + "params": {}, + })) + } + + fn bind_project(&mut self, id: u64, path: &Path) -> TestResult<Value> { + self.call_tool( + id, + "project.bind", + json!({ "path": path.display().to_string() }), + ) + } + + fn call_tool(&mut self, id: u64, name: &str, arguments: Value) -> TestResult<Value> { + self.request(json!({ + "jsonrpc": "2.0", + "id": id, + "method": "tools/call", + "params": { + "name": name, + "arguments": arguments, + } + })) + } + + fn call_tool_full(&mut self, id: u64, name: &str, arguments: Value) -> TestResult<Value> { + let mut arguments = arguments.as_object().cloned().unwrap_or_default(); + let _ = arguments.insert("render".to_owned(), json!("json")); + let _ = arguments.insert("detail".to_owned(), json!("full")); + self.call_tool(id, name, Value::Object(arguments)) + } + + fn request(&mut self, message: Value) -> TestResult<Value> { + let encoded = must(serde_json::to_string(&message), "request json")?; + must(writeln!(self.stdin, "{encoded}"), "write request")?; + must(self.stdin.flush(), "flush request")?; + let mut line = String::new(); + let byte_count = must(self.stdout.read_line(&mut line), "read response")?; + if byte_count == 0 { + return Err(io::Error::other("unexpected EOF reading response").into()); + } + must(serde_json::from_str(&line), "response json") + } + + fn notify(&mut self, message: Value) -> TestResult { + let encoded = must(serde_json::to_string(&message), "notify json")?; + must(writeln!(self.stdin, "{encoded}"), "write notify")?; + must(self.stdin.flush(), "flush notify")?; + Ok(()) + } +} + +impl Drop for McpHarness { + fn drop(&mut self) { + let _ = self.child.kill(); + let _ = self.child.wait(); + } +} + +fn assert_tool_ok(response: &Value) { + assert_eq!( + response["result"]["isError"].as_bool(), + Some(false), + "tool response unexpectedly errored: {response:#}" + ); +} + +fn tool_content(response: &Value) -> &Value { + &response["result"]["structuredContent"] +} + +fn tool_names(response: &Value) -> Vec<&str> { + response["result"]["tools"] + .as_array() + .into_iter() + .flatten() + .filter_map(|tool| tool["name"].as_str()) + .collect() +} + +#[test] +fn cold_start_exposes_basic_toolset_and_binding_surface() -> TestResult { + let project_root = temp_project_root("cold_start")?; + let state_home = project_root.join("state-home"); + must(fs::create_dir_all(&state_home), "create state home")?; + + let mut harness = McpHarness::spawn(None, &state_home, &[])?; + let initialize = harness.initialize()?; + assert_eq!( + initialize["result"]["protocolVersion"].as_str(), + Some("2025-11-25") + ); + harness.notify_initialized()?; + + let tools = harness.tools_list()?; + let tool_names = tool_names(&tools); + assert!(tool_names.contains(&"project.bind")); + assert!(tool_names.contains(&"issue.save")); + assert!(tool_names.contains(&"issue.list")); + assert!(tool_names.contains(&"issue.read")); + assert!(tool_names.contains(&"system.health")); + assert!(tool_names.contains(&"system.telemetry")); + + let health = harness.call_tool(3, "system.health", json!({}))?; + assert_tool_ok(&health); + assert_eq!(tool_content(&health)["bound"].as_bool(), Some(false)); + + let nested = project_root.join("nested").join("deeper"); + must(fs::create_dir_all(&nested), "create nested path")?; + must( + fs::create_dir_all(project_root.join(".git")), + "create fake git root", + )?; + let bind = harness.bind_project(4, &nested)?; + assert_tool_ok(&bind); + assert_eq!( + tool_content(&bind)["project_root"].as_str(), + Some(project_root.display().to_string().as_str()) + ); + assert_eq!(tool_content(&bind)["issue_count"].as_u64(), Some(0)); + + let rebound_health = harness.call_tool(5, "system.health", json!({}))?; + assert_tool_ok(&rebound_health); + assert_eq!(tool_content(&rebound_health)["bound"].as_bool(), Some(true)); + Ok(()) +} + +#[test] +fn save_list_and_read_roundtrip_through_canonical_issue_dir() -> TestResult { + let project_root = temp_project_root("roundtrip")?; + let state_home = project_root.join("state-home"); + must(fs::create_dir_all(&state_home), "create state home")?; + let mut harness = McpHarness::spawn(None, &state_home, &[])?; + let _ = harness.initialize()?; + harness.notify_initialized()?; + + let bind = harness.bind_project(2, &project_root)?; + assert_tool_ok(&bind); + let state_root = must_some( + tool_content(&bind)["state_root"] + .as_str() + .map(PathBuf::from), + "state root in bind response", + )?; + + let body = "# Feral Machine\n\nMake note parking brutally small."; + let save = harness.call_tool( + 3, + "issue.save", + json!({ + "slug": "feral-machine", + "body": body, + }), + )?; + assert_tool_ok(&save); + assert_eq!( + tool_content(&save)["path"].as_str(), + Some("issues/feral-machine.md") + ); + + let saved_path = project_root.join("issues").join("feral-machine.md"); + assert_eq!( + must(fs::read_to_string(&saved_path), "read saved issue")?, + body + ); + + let list = harness.call_tool(4, "issue.list", json!({}))?; + assert_tool_ok(&list); + assert_eq!(tool_content(&list)["count"].as_u64(), Some(1)); + assert_eq!( + tool_content(&list)["issues"][0]["slug"].as_str(), + Some("feral-machine") + ); + assert!(tool_content(&list)["issues"][0].get("body").is_none()); + + let read = harness.call_tool_full( + 5, + "issue.read", + json!({ + "slug": "feral-machine", + }), + )?; + assert_tool_ok(&read); + assert_eq!(tool_content(&read)["body"].as_str(), Some(body)); + assert_eq!( + tool_content(&read)["path"].as_str(), + Some("issues/feral-machine.md") + ); + + let telemetry_path = state_root.join("mcp").join("telemetry.jsonl"); + let events = must( + read_json_lines::<Value>(&telemetry_path), + "read telemetry log", + )?; + assert!( + events + .iter() + .any(|event| event["event"] == "tool_call" && event["tool_name"] == "issue.save"), + "expected issue.save tool_call event: {events:#?}" + ); + assert!( + events + .iter() + .any(|event| event["event"] == "hot_paths_snapshot"), + "expected hot_paths_snapshot event: {events:#?}" + ); + Ok(()) +} + +#[test] +fn convergent_issue_list_survives_worker_crash() -> TestResult { + let project_root = temp_project_root("worker_retry")?; + let state_home = project_root.join("state-home"); + must(fs::create_dir_all(&state_home), "create state home")?; + let mut harness = McpHarness::spawn( + Some(&project_root), + &state_home, + &[( + "JIRA_AT_HOME_MCP_TEST_HOST_CRASH_ONCE_KEY", + "tools/call:issue.list", + )], + )?; + let _ = harness.initialize()?; + harness.notify_initialized()?; + + let save = harness.call_tool( + 2, + "issue.save", + json!({ + "slug": "one-shot", + "body": "body", + }), + )?; + assert_tool_ok(&save); + + let list = harness.call_tool(3, "issue.list", json!({}))?; + assert_tool_ok(&list); + assert_eq!(tool_content(&list)["count"].as_u64(), Some(1)); + + let telemetry = harness.call_tool_full(4, "system.telemetry", json!({}))?; + assert_tool_ok(&telemetry); + assert_eq!( + tool_content(&telemetry)["telemetry"]["totals"]["retry_count"].as_u64(), + Some(1) + ); + assert!( + tool_content(&telemetry)["telemetry"]["restart_count"] + .as_u64() + .is_some_and(|count| count >= 1) + ); + Ok(()) +} + +#[test] +fn host_rollout_reexec_preserves_session_and_binding() -> TestResult { + let project_root = temp_project_root("rollout")?; + let state_home = project_root.join("state-home"); + must(fs::create_dir_all(&state_home), "create state home")?; + let mut harness = McpHarness::spawn( + Some(&project_root), + &state_home, + &[( + "JIRA_AT_HOME_MCP_TEST_FORCE_ROLLOUT_KEY", + "tools/call:issue.list", + )], + )?; + let _ = harness.initialize()?; + harness.notify_initialized()?; + + let save = harness.call_tool( + 2, + "issue.save", + json!({ + "slug": "after-rollout", + "body": "body", + }), + )?; + assert_tool_ok(&save); + + let list = harness.call_tool(3, "issue.list", json!({}))?; + assert_tool_ok(&list); + assert_eq!(tool_content(&list)["count"].as_u64(), Some(1)); + + let health = harness.call_tool(4, "system.health", json!({}))?; + assert_tool_ok(&health); + assert_eq!(tool_content(&health)["bound"].as_bool(), Some(true)); + + let read = harness.call_tool( + 5, + "issue.read", + json!({ + "slug": "after-rollout", + }), + )?; + assert_tool_ok(&read); + assert_eq!(tool_content(&read)["body"].as_str(), Some("body")); + + let telemetry = harness.call_tool_full(6, "system.telemetry", json!({}))?; + assert_tool_ok(&telemetry); + assert!( + tool_content(&telemetry)["host_rollouts"] + .as_u64() + .is_some_and(|count| count >= 1) + ); + Ok(()) +} |