diff options
Diffstat (limited to 'crates/fidget-spinner-cli/src/mcp/host')
| -rw-r--r-- | crates/fidget-spinner-cli/src/mcp/host/binary.rs | 43 | ||||
| -rw-r--r-- | crates/fidget-spinner-cli/src/mcp/host/config.rs | 18 | ||||
| -rw-r--r-- | crates/fidget-spinner-cli/src/mcp/host/mod.rs | 8 | ||||
| -rw-r--r-- | crates/fidget-spinner-cli/src/mcp/host/process.rs | 246 | ||||
| -rw-r--r-- | crates/fidget-spinner-cli/src/mcp/host/runtime.rs | 719 |
5 files changed, 1034 insertions, 0 deletions
diff --git a/crates/fidget-spinner-cli/src/mcp/host/binary.rs b/crates/fidget-spinner-cli/src/mcp/host/binary.rs new file mode 100644 index 0000000..2107461 --- /dev/null +++ b/crates/fidget-spinner-cli/src/mcp/host/binary.rs @@ -0,0 +1,43 @@ +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; + +use fidget_spinner_store_sqlite::StoreError; + +use crate::mcp::protocol::BinaryFingerprint; + +pub(super) struct BinaryRuntime { + pub(super) path: PathBuf, + startup_fingerprint: BinaryFingerprint, + pub(super) launch_path_stable: bool, +} + +impl BinaryRuntime { + pub(super) fn new(path: PathBuf) -> Result<Self, StoreError> { + let startup_fingerprint = fingerprint_binary(&path)?; + Ok(Self { + launch_path_stable: !path + .components() + .any(|component| component.as_os_str().to_string_lossy() == "target"), + path, + startup_fingerprint, + }) + } + + pub(super) fn rollout_pending(&self) -> Result<bool, StoreError> { + Ok(fingerprint_binary(&self.path)? != self.startup_fingerprint) + } +} + +fn fingerprint_binary(path: &Path) -> Result<BinaryFingerprint, StoreError> { + let metadata = fs::metadata(path)?; + let modified_unix_nanos = metadata + .modified()? + .duration_since(std::time::UNIX_EPOCH) + .map_err(|error| io::Error::other(format!("invalid binary mtime: {error}")))? + .as_nanos(); + Ok(BinaryFingerprint { + length_bytes: metadata.len(), + modified_unix_nanos, + }) +} diff --git a/crates/fidget-spinner-cli/src/mcp/host/config.rs b/crates/fidget-spinner-cli/src/mcp/host/config.rs new file mode 100644 index 0000000..8d1ee4b --- /dev/null +++ b/crates/fidget-spinner-cli/src/mcp/host/config.rs @@ -0,0 +1,18 @@ +use std::path::PathBuf; + +use fidget_spinner_store_sqlite::StoreError; + +#[derive(Clone, Debug)] +pub(super) struct HostConfig { + pub(super) executable: PathBuf, + pub(super) initial_project: Option<PathBuf>, +} + +impl HostConfig { + pub(super) fn new(initial_project: Option<PathBuf>) -> Result<Self, StoreError> { + Ok(Self { + executable: std::env::current_exe()?, + initial_project, + }) + } +} diff --git a/crates/fidget-spinner-cli/src/mcp/host/mod.rs b/crates/fidget-spinner-cli/src/mcp/host/mod.rs new file mode 100644 index 0000000..21c19ec --- /dev/null +++ b/crates/fidget-spinner-cli/src/mcp/host/mod.rs @@ -0,0 +1,8 @@ +//! Stable host process that owns the MCP session and routes store work to a disposable worker. + +mod binary; +mod config; +mod process; +mod runtime; + +pub(crate) use runtime::run_host as serve; diff --git a/crates/fidget-spinner-cli/src/mcp/host/process.rs b/crates/fidget-spinner-cli/src/mcp/host/process.rs new file mode 100644 index 0000000..d4cbb4b --- /dev/null +++ b/crates/fidget-spinner-cli/src/mcp/host/process.rs @@ -0,0 +1,246 @@ +use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::path::PathBuf; +use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; + +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::mcp::fault::{FaultKind, FaultRecord, FaultStage}; +use crate::mcp::protocol::{ + HostRequestId, WorkerOperation, WorkerOutcome, WorkerRequest, WorkerResponse, WorkerSpawnConfig, +}; + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(super) struct ProjectBinding { + pub(super) requested_path: PathBuf, + pub(super) project_root: PathBuf, +} + +pub(super) struct WorkerSupervisor { + config: WorkerSpawnConfig, + generation: u64, + crash_before_reply_once: bool, + bound_project_root: Option<PathBuf>, + child: Option<Child>, + stdin: Option<BufWriter<ChildStdin>>, + stdout: Option<BufReader<ChildStdout>>, +} + +impl WorkerSupervisor { + pub(super) fn new(config: WorkerSpawnConfig, generation: u64) -> Self { + Self { + config, + generation, + crash_before_reply_once: false, + bound_project_root: None, + child: None, + stdin: None, + stdout: None, + } + } + + pub(super) fn generation(&self) -> u64 { + self.generation + } + + pub(super) fn rebind(&mut self, project_root: PathBuf) { + if self + .bound_project_root + .as_ref() + .is_some_and(|current| current == &project_root) + { + return; + } + self.kill_current_worker(); + self.bound_project_root = Some(project_root); + } + + pub(super) fn execute( + &mut self, + request_id: HostRequestId, + operation: WorkerOperation, + ) -> Result<Value, FaultRecord> { + self.ensure_worker()?; + let request = WorkerRequest::Execute { + id: request_id, + operation, + }; + let stdin = self.stdin.as_mut().ok_or_else(|| { + FaultRecord::new( + FaultKind::Transient, + FaultStage::Transport, + "worker.stdin", + "worker stdin is not available", + ) + .retryable(Some(self.generation)) + })?; + serde_json::to_writer(&mut *stdin, &request).map_err(|error| { + FaultRecord::new( + FaultKind::Transient, + FaultStage::Transport, + "worker.write", + format!("failed to write worker request: {error}"), + ) + .retryable(Some(self.generation)) + })?; + stdin.write_all(b"\n").map_err(|error| { + FaultRecord::new( + FaultKind::Transient, + FaultStage::Transport, + "worker.write", + format!("failed to frame worker request: {error}"), + ) + .retryable(Some(self.generation)) + })?; + stdin.flush().map_err(|error| { + FaultRecord::new( + FaultKind::Transient, + FaultStage::Transport, + "worker.write", + format!("failed to flush worker request: {error}"), + ) + .retryable(Some(self.generation)) + })?; + + if self.crash_before_reply_once { + self.crash_before_reply_once = false; + self.kill_current_worker(); + return Err(FaultRecord::new( + FaultKind::Transient, + FaultStage::Transport, + "worker.read", + "worker crashed before replying", + ) + .retryable(Some(self.generation))); + } + + let stdout = self.stdout.as_mut().ok_or_else(|| { + FaultRecord::new( + FaultKind::Transient, + FaultStage::Transport, + "worker.stdout", + "worker stdout is not available", + ) + .retryable(Some(self.generation)) + })?; + let mut line = String::new(); + let bytes = stdout.read_line(&mut line).map_err(|error| { + FaultRecord::new( + FaultKind::Transient, + FaultStage::Transport, + "worker.read", + format!("failed to read worker response: {error}"), + ) + .retryable(Some(self.generation)) + })?; + if bytes == 0 { + self.kill_current_worker(); + return Err(FaultRecord::new( + FaultKind::Transient, + FaultStage::Transport, + "worker.read", + "worker exited before replying", + ) + .retryable(Some(self.generation))); + } + let response = serde_json::from_str::<WorkerResponse>(&line).map_err(|error| { + FaultRecord::new( + FaultKind::Transient, + FaultStage::Protocol, + "worker.read", + format!("invalid worker response: {error}"), + ) + .retryable(Some(self.generation)) + })?; + match response.outcome { + WorkerOutcome::Success { result } => Ok(result), + WorkerOutcome::Fault { fault } => Err(fault), + } + } + + pub(super) fn restart(&mut self) -> Result<(), FaultRecord> { + self.kill_current_worker(); + self.ensure_worker() + } + + pub(super) fn is_alive(&mut self) -> bool { + let Some(child) = self.child.as_mut() else { + return false; + }; + if let Ok(None) = child.try_wait() { + true + } else { + self.child = None; + self.stdin = None; + self.stdout = None; + false + } + } + + pub(super) fn arm_crash_once(&mut self) { + self.crash_before_reply_once = true; + } + + fn ensure_worker(&mut self) -> Result<(), FaultRecord> { + if self.is_alive() { + return Ok(()); + } + let Some(project_root) = self.bound_project_root.as_ref() else { + return Err(FaultRecord::new( + FaultKind::Unavailable, + FaultStage::Host, + "worker.spawn", + "project is not bound; call project.bind before using project tools", + )); + }; + self.generation += 1; + let mut child = Command::new(&self.config.executable) + .arg("mcp") + .arg("worker") + .arg("--project") + .arg(project_root) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .map_err(|error| { + FaultRecord::new( + FaultKind::Transient, + FaultStage::Transport, + "worker.spawn", + format!("failed to spawn worker: {error}"), + ) + .retryable(Some(self.generation)) + })?; + let stdin = child.stdin.take().ok_or_else(|| { + FaultRecord::new( + FaultKind::Internal, + FaultStage::Transport, + "worker.spawn", + "worker stdin pipe was not created", + ) + })?; + let stdout = child.stdout.take().ok_or_else(|| { + FaultRecord::new( + FaultKind::Internal, + FaultStage::Transport, + "worker.spawn", + "worker stdout pipe was not created", + ) + })?; + self.child = Some(child); + self.stdin = Some(BufWriter::new(stdin)); + self.stdout = Some(BufReader::new(stdout)); + Ok(()) + } + + fn kill_current_worker(&mut self) { + if let Some(child) = self.child.as_mut() { + let _ = child.kill(); + let _ = child.wait(); + } + self.child = None; + self.stdin = None; + self.stdout = None; + } +} diff --git a/crates/fidget-spinner-cli/src/mcp/host/runtime.rs b/crates/fidget-spinner-cli/src/mcp/host/runtime.rs new file mode 100644 index 0000000..dd75544 --- /dev/null +++ b/crates/fidget-spinner-cli/src/mcp/host/runtime.rs @@ -0,0 +1,719 @@ +use std::io::{self, BufRead, Write}; +#[cfg(unix)] +use std::os::unix::process::CommandExt; +use std::path::PathBuf; +use std::process::Command; +use std::time::Instant; + +use serde::Serialize; +use serde_json::{Value, json}; + +use super::{ + binary::BinaryRuntime, + config::HostConfig, + process::{ProjectBinding, WorkerSupervisor}, +}; +use crate::mcp::catalog::{ + DispatchTarget, ReplayContract, list_resources, resource_spec, tool_definitions, tool_spec, +}; +use crate::mcp::fault::{FaultKind, FaultRecord, FaultStage}; +use crate::mcp::protocol::{ + CRASH_ONCE_ENV, FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed, + PROTOCOL_VERSION, ProjectBindingSeed, SERVER_NAME, SessionSeed, WorkerOperation, + WorkerSpawnConfig, +}; +use crate::mcp::telemetry::{ + BinaryHealth, BindingHealth, HealthSnapshot, InitializationHealth, ServerTelemetry, + WorkerHealth, +}; + +pub(crate) fn run_host( + initial_project: Option<PathBuf>, +) -> Result<(), fidget_spinner_store_sqlite::StoreError> { + let stdin = io::stdin(); + let mut stdout = io::stdout().lock(); + let mut host = HostRuntime::new(HostConfig::new(initial_project)?)?; + + for line in stdin.lock().lines() { + let line = match line { + Ok(line) => line, + Err(error) => { + eprintln!("mcp stdin failure: {error}"); + continue; + } + }; + if line.trim().is_empty() { + continue; + } + + let maybe_response = host.handle_line(&line); + if let Some(response) = maybe_response { + write_message(&mut stdout, &response)?; + } + host.maybe_roll_forward()?; + } + + Ok(()) +} + +struct HostRuntime { + config: HostConfig, + binding: Option<ProjectBinding>, + session: SessionSeed, + telemetry: ServerTelemetry, + next_request_id: u64, + worker: WorkerSupervisor, + binary: BinaryRuntime, + force_rollout_key: Option<String>, + force_rollout_consumed: bool, + rollout_requested: bool, + crash_once_key: Option<String>, + crash_once_consumed: bool, +} + +impl HostRuntime { + fn new(config: HostConfig) -> Result<Self, fidget_spinner_store_sqlite::StoreError> { + let restored = restore_host_state(); + let session = restored + .as_ref() + .map_or_else(SessionSeed::default, |seed| seed.session.clone()); + let telemetry = restored + .as_ref() + .map_or_else(ServerTelemetry::default, |seed| seed.telemetry.clone()); + let next_request_id = restored + .as_ref() + .map_or(1, |seed| seed.next_request_id.max(1)); + let worker_generation = restored.as_ref().map_or(0, |seed| seed.worker_generation); + let force_rollout_consumed = restored + .as_ref() + .is_some_and(|seed| seed.force_rollout_consumed); + let crash_once_consumed = restored + .as_ref() + .is_some_and(|seed| seed.crash_once_consumed); + let binding = restored + .as_ref() + .and_then(|seed| seed.binding.clone().map(ProjectBinding::from)) + .or(config + .initial_project + .clone() + .map(resolve_project_binding) + .transpose()? + .map(|resolved| resolved.binding)); + + let worker = { + let mut worker = WorkerSupervisor::new( + WorkerSpawnConfig { + executable: config.executable.clone(), + }, + worker_generation, + ); + if let Some(project_root) = binding.as_ref().map(|binding| binding.project_root.clone()) + { + worker.rebind(project_root); + } + worker + }; + + Ok(Self { + config: config.clone(), + binding, + session, + telemetry, + next_request_id, + worker, + binary: BinaryRuntime::new(config.executable.clone())?, + force_rollout_key: std::env::var(FORCE_ROLLOUT_ENV).ok(), + force_rollout_consumed, + rollout_requested: false, + crash_once_key: std::env::var(CRASH_ONCE_ENV).ok(), + crash_once_consumed, + }) + } + + fn handle_line(&mut self, line: &str) -> Option<Value> { + let message = match serde_json::from_str::<Value>(line) { + Ok(message) => message, + Err(error) => { + return Some(jsonrpc_error( + Value::Null, + FaultRecord::new( + FaultKind::InvalidInput, + FaultStage::Protocol, + "jsonrpc.parse", + format!("parse error: {error}"), + ), + )); + } + }; + self.handle_message(message) + } + + fn handle_message(&mut self, message: Value) -> Option<Value> { + let Some(object) = message.as_object() else { + return Some(jsonrpc_error( + Value::Null, + FaultRecord::new( + FaultKind::InvalidInput, + FaultStage::Protocol, + "jsonrpc.message", + "invalid request: expected JSON object", + ), + )); + }; + + let method = object.get("method").and_then(Value::as_str)?; + let id = object.get("id").cloned(); + let params = object.get("params").cloned().unwrap_or_else(|| json!({})); + let operation_key = operation_key(method, ¶ms); + let started_at = Instant::now(); + + self.telemetry.record_request(&operation_key); + let response = match self.dispatch(method, params, id.clone()) { + Ok(Some(result)) => { + self.telemetry + .record_success(&operation_key, started_at.elapsed().as_millis()); + id.map(|id| jsonrpc_result(id, result)) + } + Ok(None) => { + self.telemetry + .record_success(&operation_key, started_at.elapsed().as_millis()); + None + } + Err(fault) => { + self.telemetry.record_error( + &operation_key, + fault.clone(), + started_at.elapsed().as_millis(), + ); + Some(match id { + Some(id) => match method { + "tools/call" => jsonrpc_result(id, fault.into_tool_result()), + _ => jsonrpc_error(id, fault), + }, + None => jsonrpc_error(Value::Null, fault), + }) + } + }; + + if self.should_force_rollout(&operation_key) { + self.force_rollout_consumed = true; + self.telemetry.record_rollout(); + self.rollout_requested = true; + } + + response + } + + fn dispatch( + &mut self, + method: &str, + params: Value, + request_id: Option<Value>, + ) -> Result<Option<Value>, FaultRecord> { + match method { + "initialize" => { + self.session.initialize_params = Some(params.clone()); + self.session.initialized = false; + Ok(Some(json!({ + "protocolVersion": PROTOCOL_VERSION, + "capabilities": { + "tools": { "listChanged": false }, + "resources": { "listChanged": false, "subscribe": false } + }, + "serverInfo": { + "name": SERVER_NAME, + "version": env!("CARGO_PKG_VERSION") + }, + "instructions": "The DAG is canonical truth. Frontier state is derived. Bind the session with project.bind before project-local DAG operations when the MCP is running unbound." + }))) + } + "notifications/initialized" => { + if self.session.initialize_params.is_none() { + return Err(FaultRecord::new( + FaultKind::NotInitialized, + FaultStage::Host, + "notifications/initialized", + "received initialized notification before initialize", + )); + } + self.session.initialized = true; + Ok(None) + } + "notifications/cancelled" => Ok(None), + "ping" => Ok(Some(json!({}))), + other => { + self.require_initialized(other)?; + match other { + "tools/list" => Ok(Some(json!({ "tools": tool_definitions() }))), + "resources/list" => Ok(Some(json!({ "resources": list_resources() }))), + "tools/call" => Ok(Some(self.dispatch_tool_call(params, request_id)?)), + "resources/read" => Ok(Some(self.dispatch_resource_read(params)?)), + _ => Err(FaultRecord::new( + FaultKind::InvalidInput, + FaultStage::Protocol, + other, + format!("method `{other}` is not implemented"), + )), + } + } + } + } + + fn dispatch_tool_call( + &mut self, + params: Value, + _request_id: Option<Value>, + ) -> Result<Value, FaultRecord> { + let envelope = deserialize::<ToolCallEnvelope>(params, "tools/call")?; + let spec = tool_spec(&envelope.name).ok_or_else(|| { + FaultRecord::new( + FaultKind::InvalidInput, + FaultStage::Host, + format!("tools/call:{}", envelope.name), + format!("unknown tool `{}`", envelope.name), + ) + })?; + match spec.dispatch { + DispatchTarget::Host => self.handle_host_tool(&envelope.name, envelope.arguments), + DispatchTarget::Worker => self.dispatch_worker_tool(spec, envelope.arguments), + } + } + + fn dispatch_resource_read(&mut self, params: Value) -> Result<Value, FaultRecord> { + let args = deserialize::<ReadResourceArgs>(params, "resources/read")?; + let spec = resource_spec(&args.uri).ok_or_else(|| { + FaultRecord::new( + FaultKind::InvalidInput, + FaultStage::Host, + format!("resources/read:{}", args.uri), + format!("unknown resource `{}`", args.uri), + ) + })?; + match spec.dispatch { + DispatchTarget::Host => Ok(Self::handle_host_resource(spec.uri)), + DispatchTarget::Worker => self.dispatch_worker_operation( + format!("resources/read:{}", args.uri), + spec.replay, + WorkerOperation::ReadResource { uri: args.uri }, + ), + } + } + + fn dispatch_worker_tool( + &mut self, + spec: crate::mcp::catalog::ToolSpec, + arguments: Value, + ) -> Result<Value, FaultRecord> { + let operation = format!("tools/call:{}", spec.name); + self.dispatch_worker_operation( + operation.clone(), + spec.replay, + WorkerOperation::CallTool { + name: spec.name.to_owned(), + arguments, + }, + ) + } + + fn dispatch_worker_operation( + &mut self, + operation: String, + replay: ReplayContract, + worker_operation: WorkerOperation, + ) -> Result<Value, FaultRecord> { + let binding = self.require_bound_project(&operation)?; + self.worker.rebind(binding.project_root.clone()); + + if self.should_crash_worker_once(&operation) { + self.worker.arm_crash_once(); + } + + let request_id = self.allocate_request_id(); + match self.worker.execute(request_id, worker_operation.clone()) { + Ok(result) => Ok(result), + Err(fault) => { + if replay == ReplayContract::SafeReplay && fault.retryable { + self.telemetry.record_retry(&operation); + self.telemetry.record_worker_restart(); + self.worker + .restart() + .map_err(|restart_fault| restart_fault.mark_retried())?; + match self.worker.execute(request_id, worker_operation) { + Ok(result) => Ok(result), + Err(retry_fault) => Err(retry_fault.mark_retried()), + } + } else { + Err(fault) + } + } + } + } + + fn handle_host_tool(&mut self, name: &str, arguments: Value) -> Result<Value, FaultRecord> { + match name { + "project.bind" => { + let args = deserialize::<ProjectBindArgs>(arguments, "tools/call:project.bind")?; + let resolved = resolve_project_binding(PathBuf::from(args.path)) + .map_err(host_store_fault("tools/call:project.bind"))?; + self.worker.rebind(resolved.binding.project_root.clone()); + self.binding = Some(resolved.binding); + tool_success(&resolved.status) + } + "skill.list" => tool_success(&json!({ + "skills": crate::bundled_skill::bundled_skill_summaries(), + })), + "skill.show" => { + let args = deserialize::<SkillShowArgs>(arguments, "tools/call:skill.show")?; + let skill = args.name.as_deref().map_or_else( + || Ok(crate::bundled_skill::default_bundled_skill()), + |name| { + crate::bundled_skill::bundled_skill(name).ok_or_else(|| { + FaultRecord::new( + FaultKind::InvalidInput, + FaultStage::Host, + "tools/call:skill.show", + format!("unknown bundled skill `{name}`"), + ) + }) + }, + )?; + tool_success(&json!({ + "name": skill.name, + "description": skill.description, + "resource_uri": skill.resource_uri, + "body": skill.body, + })) + } + "system.health" => tool_success(&HealthSnapshot { + initialization: InitializationHealth { + ready: self.session.initialized, + seed_captured: self.session.initialize_params.is_some(), + }, + binding: binding_health(self.binding.as_ref()), + worker: WorkerHealth { + worker_generation: self.worker.generation(), + alive: self.worker.is_alive(), + }, + binary: BinaryHealth { + current_executable: self.binary.path.display().to_string(), + launch_path_stable: self.binary.launch_path_stable, + rollout_pending: self.binary.rollout_pending().unwrap_or(false), + }, + last_fault: self.telemetry.last_fault.clone(), + }), + "system.telemetry" => tool_success(&self.telemetry), + other => Err(FaultRecord::new( + FaultKind::InvalidInput, + FaultStage::Host, + format!("tools/call:{other}"), + format!("unknown host tool `{other}`"), + )), + } + } + + fn handle_host_resource(uri: &str) -> Value { + match uri { + "fidget-spinner://skill/fidget-spinner" => { + skill_resource(uri, crate::bundled_skill::default_bundled_skill().body) + } + "fidget-spinner://skill/frontier-loop" => skill_resource( + uri, + crate::bundled_skill::frontier_loop_bundled_skill().body, + ), + _ => unreachable!("host resources are catalog-gated"), + } + } + + fn require_initialized(&self, operation: &str) -> Result<(), FaultRecord> { + if self.session.initialized { + return Ok(()); + } + Err(FaultRecord::new( + FaultKind::NotInitialized, + FaultStage::Host, + operation, + "client must call initialize and notifications/initialized before normal operations", + )) + } + + fn require_bound_project(&self, operation: &str) -> Result<&ProjectBinding, FaultRecord> { + self.binding.as_ref().ok_or_else(|| { + FaultRecord::new( + FaultKind::Unavailable, + FaultStage::Host, + operation, + "project is not bound; call project.bind with the target project root or a nested path inside it", + ) + }) + } + + fn allocate_request_id(&mut self) -> HostRequestId { + let id = HostRequestId(self.next_request_id); + self.next_request_id += 1; + id + } + + fn maybe_roll_forward(&mut self) -> Result<(), fidget_spinner_store_sqlite::StoreError> { + let binary_pending = self.binary.rollout_pending()?; + if !self.rollout_requested && !binary_pending { + return Ok(()); + } + if binary_pending && !self.rollout_requested { + self.telemetry.record_rollout(); + } + self.roll_forward() + } + + fn roll_forward(&mut self) -> Result<(), fidget_spinner_store_sqlite::StoreError> { + let state = HostStateSeed { + session: self.session.clone(), + telemetry: self.telemetry.clone(), + next_request_id: self.next_request_id, + binding: self.binding.clone().map(ProjectBindingSeed::from), + worker_generation: self.worker.generation(), + force_rollout_consumed: self.force_rollout_consumed, + crash_once_consumed: self.crash_once_consumed, + }; + let serialized = serde_json::to_string(&state)?; + let mut command = Command::new(&self.binary.path); + let _ = command.arg("mcp").arg("serve"); + if let Some(project) = self.config.initial_project.as_ref() { + let _ = command.arg("--project").arg(project); + } + let _ = command.env(HOST_STATE_ENV, serialized); + #[cfg(unix)] + { + let error = command.exec(); + Err(fidget_spinner_store_sqlite::StoreError::Io(error)) + } + #[cfg(not(unix))] + { + return Err(fidget_spinner_store_sqlite::StoreError::Io(io::Error::new( + io::ErrorKind::Unsupported, + "host rollout requires unix exec support", + ))); + } + } + + fn should_force_rollout(&self, operation: &str) -> bool { + self.force_rollout_key + .as_deref() + .is_some_and(|key| key == operation) + && !self.force_rollout_consumed + } + + fn should_crash_worker_once(&mut self, operation: &str) -> bool { + let should_crash = self + .crash_once_key + .as_deref() + .is_some_and(|key| key == operation) + && !self.crash_once_consumed; + if should_crash { + self.crash_once_consumed = true; + } + should_crash + } +} + +#[derive(Debug, Serialize)] +struct ProjectBindStatus { + requested_path: String, + project_root: String, + state_root: String, + display_name: fidget_spinner_core::NonEmptyText, + schema: fidget_spinner_core::PayloadSchemaRef, + git_repo_detected: bool, +} + +struct ResolvedProjectBinding { + binding: ProjectBinding, + status: ProjectBindStatus, +} + +fn resolve_project_binding( + requested_path: PathBuf, +) -> Result<ResolvedProjectBinding, fidget_spinner_store_sqlite::StoreError> { + let store = crate::open_store(&requested_path)?; + Ok(ResolvedProjectBinding { + binding: ProjectBinding { + requested_path: requested_path.clone(), + project_root: PathBuf::from(store.project_root().as_str()), + }, + status: ProjectBindStatus { + requested_path: requested_path.display().to_string(), + project_root: store.project_root().to_string(), + state_root: store.state_root().to_string(), + display_name: store.config().display_name.clone(), + schema: store.schema().schema_ref(), + git_repo_detected: crate::run_git( + store.project_root(), + &["rev-parse", "--show-toplevel"], + )? + .is_some(), + }, + }) +} + +fn binding_health(binding: Option<&ProjectBinding>) -> BindingHealth { + match binding { + Some(binding) => BindingHealth { + bound: true, + requested_path: Some(binding.requested_path.display().to_string()), + project_root: Some(binding.project_root.display().to_string()), + state_root: Some( + binding + .project_root + .join(fidget_spinner_store_sqlite::STORE_DIR_NAME) + .display() + .to_string(), + ), + }, + None => BindingHealth { + bound: false, + requested_path: None, + project_root: None, + state_root: None, + }, + } +} + +fn skill_resource(uri: &str, body: &str) -> Value { + json!({ + "contents": [{ + "uri": uri, + "mimeType": "text/markdown", + "text": body, + }] + }) +} + +impl From<ProjectBindingSeed> for ProjectBinding { + fn from(value: ProjectBindingSeed) -> Self { + Self { + requested_path: value.requested_path, + project_root: value.project_root, + } + } +} + +impl From<ProjectBinding> for ProjectBindingSeed { + fn from(value: ProjectBinding) -> Self { + Self { + requested_path: value.requested_path, + project_root: value.project_root, + } + } +} + +fn restore_host_state() -> Option<HostStateSeed> { + let raw = std::env::var(HOST_STATE_ENV).ok()?; + serde_json::from_str::<HostStateSeed>(&raw).ok() +} + +fn deserialize<T: for<'de> serde::Deserialize<'de>>( + value: Value, + operation: &str, +) -> Result<T, FaultRecord> { + serde_json::from_value(value).map_err(|error| { + FaultRecord::new( + FaultKind::InvalidInput, + FaultStage::Protocol, + operation, + format!("invalid params: {error}"), + ) + }) +} + +fn operation_key(method: &str, params: &Value) -> String { + match method { + "tools/call" => params.get("name").and_then(Value::as_str).map_or_else( + || "tools/call".to_owned(), + |name| format!("tools/call:{name}"), + ), + "resources/read" => params.get("uri").and_then(Value::as_str).map_or_else( + || "resources/read".to_owned(), + |uri| format!("resources/read:{uri}"), + ), + other => other.to_owned(), + } +} + +fn tool_success(value: &impl Serialize) -> Result<Value, FaultRecord> { + Ok(json!({ + "content": [{ + "type": "text", + "text": crate::to_pretty_json(value).map_err(|error| { + FaultRecord::new(FaultKind::Internal, FaultStage::Host, "tool_success", error.to_string()) + })?, + }], + "structuredContent": serde_json::to_value(value).map_err(|error| { + FaultRecord::new(FaultKind::Internal, FaultStage::Host, "tool_success", error.to_string()) + })?, + "isError": false, + })) +} + +fn host_store_fault( + operation: &'static str, +) -> impl FnOnce(fidget_spinner_store_sqlite::StoreError) -> FaultRecord { + move |error| { + FaultRecord::new( + FaultKind::InvalidInput, + FaultStage::Host, + operation, + error.to_string(), + ) + } +} + +fn jsonrpc_result(id: Value, result: Value) -> Value { + json!({ + "jsonrpc": "2.0", + "id": id, + "result": result, + }) +} + +fn jsonrpc_error(id: Value, fault: FaultRecord) -> Value { + json!({ + "jsonrpc": "2.0", + "id": id, + "error": fault.into_jsonrpc_error(), + }) +} + +fn write_message( + stdout: &mut impl Write, + message: &Value, +) -> Result<(), fidget_spinner_store_sqlite::StoreError> { + serde_json::to_writer(&mut *stdout, message)?; + stdout.write_all(b"\n")?; + stdout.flush()?; + Ok(()) +} + +#[derive(Debug, serde::Deserialize)] +struct ToolCallEnvelope { + name: String, + #[serde(default = "empty_json_object")] + arguments: Value, +} + +fn empty_json_object() -> Value { + json!({}) +} + +#[derive(Debug, serde::Deserialize)] +struct ReadResourceArgs { + uri: String, +} + +#[derive(Debug, serde::Deserialize)] +struct ProjectBindArgs { + path: String, +} + +#[derive(Debug, serde::Deserialize)] +struct SkillShowArgs { + name: Option<String>, +} |