diff options
| author | main <main@swarm.moe> | 2026-03-19 11:19:42 -0400 |
|---|---|---|
| committer | main <main@swarm.moe> | 2026-03-19 11:19:42 -0400 |
| commit | eb6b1af642f5829d5dc08aa61138d893b91b60b2 (patch) | |
| tree | 0f1f5a81424f2a98ea08a8743995303769763e32 /crates/fidget-spinner-cli/src/mcp/host/runtime.rs | |
| parent | 7b9bd8b42883f82b090718175b8316296ef18236 (diff) | |
| download | fidget_spinner-eb6b1af642f5829d5dc08aa61138d893b91b60b2.zip | |
Retrofit MCP host onto libmcp
Diffstat (limited to 'crates/fidget-spinner-cli/src/mcp/host/runtime.rs')
| -rw-r--r-- | crates/fidget-spinner-cli/src/mcp/host/runtime.rs | 235 |
1 files changed, 148 insertions, 87 deletions
diff --git a/crates/fidget-spinner-cli/src/mcp/host/runtime.rs b/crates/fidget-spinner-cli/src/mcp/host/runtime.rs index dd75544..17c26c7 100644 --- a/crates/fidget-spinner-cli/src/mcp/host/runtime.rs +++ b/crates/fidget-spinner-cli/src/mcp/host/runtime.rs @@ -5,6 +5,10 @@ use std::path::PathBuf; use std::process::Command; use std::time::Instant; +use libmcp::{ + FramedMessage, HostSessionKernel, ReplayContract, RequestId, load_snapshot_file_from_env, + remove_snapshot_file, write_snapshot_file, +}; use serde::Serialize; use serde_json::{Value, json}; @@ -14,13 +18,13 @@ use super::{ process::{ProjectBinding, WorkerSupervisor}, }; use crate::mcp::catalog::{ - DispatchTarget, ReplayContract, list_resources, resource_spec, tool_definitions, tool_spec, + DispatchTarget, list_resources, resource_spec, tool_definitions, tool_spec, }; use crate::mcp::fault::{FaultKind, FaultRecord, FaultStage}; +use crate::mcp::output::split_render_mode; use crate::mcp::protocol::{ CRASH_ONCE_ENV, FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed, - PROTOCOL_VERSION, ProjectBindingSeed, SERVER_NAME, SessionSeed, WorkerOperation, - WorkerSpawnConfig, + PROTOCOL_VERSION, ProjectBindingSeed, SERVER_NAME, WorkerOperation, WorkerSpawnConfig, }; use crate::mcp::telemetry::{ BinaryHealth, BindingHealth, HealthSnapshot, InitializationHealth, ServerTelemetry, @@ -59,7 +63,7 @@ pub(crate) fn run_host( struct HostRuntime { config: HostConfig, binding: Option<ProjectBinding>, - session: SessionSeed, + session_kernel: HostSessionKernel, telemetry: ServerTelemetry, next_request_id: u64, worker: WorkerSupervisor, @@ -73,10 +77,13 @@ struct HostRuntime { impl HostRuntime { fn new(config: HostConfig) -> Result<Self, fidget_spinner_store_sqlite::StoreError> { - let restored = restore_host_state(); - let session = restored + let restored = restore_host_state()?; + let session_kernel = restored .as_ref() - .map_or_else(SessionSeed::default, |seed| seed.session.clone()); + .map(|seed| seed.session_kernel.clone().restore()) + .transpose() + .map_err(fidget_spinner_store_sqlite::StoreError::Io)? + .map_or_else(HostSessionKernel::cold, HostSessionKernel::from_restored); let telemetry = restored .as_ref() .map_or_else(ServerTelemetry::default, |seed| seed.telemetry.clone()); @@ -117,7 +124,7 @@ impl HostRuntime { Ok(Self { config: config.clone(), binding, - session, + session_kernel, telemetry, next_request_id, worker, @@ -131,8 +138,8 @@ impl HostRuntime { } fn handle_line(&mut self, line: &str) -> Option<Value> { - let message = match serde_json::from_str::<Value>(line) { - Ok(message) => message, + let frame = match FramedMessage::parse(line.as_bytes().to_vec()) { + Ok(frame) => frame, Err(error) => { return Some(jsonrpc_error( Value::Null, @@ -145,11 +152,12 @@ impl HostRuntime { )); } }; - self.handle_message(message) + self.handle_frame(frame) } - fn handle_message(&mut self, message: Value) -> Option<Value> { - let Some(object) = message.as_object() else { + fn handle_frame(&mut self, frame: FramedMessage) -> Option<Value> { + self.session_kernel.observe_client_frame(&frame); + let Some(object) = frame.value.as_object() else { return Some(jsonrpc_error( Value::Null, FaultRecord::new( @@ -168,7 +176,7 @@ impl HostRuntime { let started_at = Instant::now(); self.telemetry.record_request(&operation_key); - let response = match self.dispatch(method, params, id.clone()) { + let response = match self.dispatch(&frame, method, params, id.clone()) { Ok(Some(result)) => { self.telemetry .record_success(&operation_key, started_at.elapsed().as_millis()); @@ -206,29 +214,26 @@ impl HostRuntime { fn dispatch( &mut self, + request_frame: &FramedMessage, method: &str, params: Value, request_id: Option<Value>, ) -> Result<Option<Value>, FaultRecord> { match method { - "initialize" => { - self.session.initialize_params = Some(params.clone()); - self.session.initialized = false; - Ok(Some(json!({ - "protocolVersion": PROTOCOL_VERSION, - "capabilities": { - "tools": { "listChanged": false }, - "resources": { "listChanged": false, "subscribe": false } - }, - "serverInfo": { - "name": SERVER_NAME, - "version": env!("CARGO_PKG_VERSION") - }, - "instructions": "The DAG is canonical truth. Frontier state is derived. Bind the session with project.bind before project-local DAG operations when the MCP is running unbound." - }))) - } + "initialize" => Ok(Some(json!({ + "protocolVersion": PROTOCOL_VERSION, + "capabilities": { + "tools": { "listChanged": false }, + "resources": { "listChanged": false, "subscribe": false } + }, + "serverInfo": { + "name": SERVER_NAME, + "version": env!("CARGO_PKG_VERSION") + }, + "instructions": "The DAG is canonical truth. Frontier state is derived. Bind the session with project.bind before project-local DAG operations when the MCP is running unbound." + }))), "notifications/initialized" => { - if self.session.initialize_params.is_none() { + if !self.seed_captured() { return Err(FaultRecord::new( FaultKind::NotInitialized, FaultStage::Host, @@ -236,7 +241,6 @@ impl HostRuntime { "received initialized notification before initialize", )); } - self.session.initialized = true; Ok(None) } "notifications/cancelled" => Ok(None), @@ -246,8 +250,14 @@ impl HostRuntime { match other { "tools/list" => Ok(Some(json!({ "tools": tool_definitions() }))), "resources/list" => Ok(Some(json!({ "resources": list_resources() }))), - "tools/call" => Ok(Some(self.dispatch_tool_call(params, request_id)?)), - "resources/read" => Ok(Some(self.dispatch_resource_read(params)?)), + "tools/call" => Ok(Some(self.dispatch_tool_call( + request_frame, + params, + request_id, + )?)), + "resources/read" => { + Ok(Some(self.dispatch_resource_read(request_frame, params)?)) + } _ => Err(FaultRecord::new( FaultKind::InvalidInput, FaultStage::Protocol, @@ -261,6 +271,7 @@ impl HostRuntime { fn dispatch_tool_call( &mut self, + request_frame: &FramedMessage, params: Value, _request_id: Option<Value>, ) -> Result<Value, FaultRecord> { @@ -275,11 +286,17 @@ impl HostRuntime { })?; match spec.dispatch { DispatchTarget::Host => self.handle_host_tool(&envelope.name, envelope.arguments), - DispatchTarget::Worker => self.dispatch_worker_tool(spec, envelope.arguments), + DispatchTarget::Worker => { + self.dispatch_worker_tool(request_frame, spec, envelope.arguments) + } } } - fn dispatch_resource_read(&mut self, params: Value) -> Result<Value, FaultRecord> { + fn dispatch_resource_read( + &mut self, + request_frame: &FramedMessage, + params: Value, + ) -> Result<Value, FaultRecord> { let args = deserialize::<ReadResourceArgs>(params, "resources/read")?; let spec = resource_spec(&args.uri).ok_or_else(|| { FaultRecord::new( @@ -292,6 +309,7 @@ impl HostRuntime { match spec.dispatch { DispatchTarget::Host => Ok(Self::handle_host_resource(spec.uri)), DispatchTarget::Worker => self.dispatch_worker_operation( + request_frame, format!("resources/read:{}", args.uri), spec.replay, WorkerOperation::ReadResource { uri: args.uri }, @@ -301,11 +319,13 @@ impl HostRuntime { fn dispatch_worker_tool( &mut self, + request_frame: &FramedMessage, spec: crate::mcp::catalog::ToolSpec, arguments: Value, ) -> Result<Value, FaultRecord> { let operation = format!("tools/call:{}", spec.name); self.dispatch_worker_operation( + request_frame, operation.clone(), spec.replay, WorkerOperation::CallTool { @@ -317,6 +337,7 @@ impl HostRuntime { fn dispatch_worker_operation( &mut self, + request_frame: &FramedMessage, operation: String, replay: ReplayContract, worker_operation: WorkerOperation, @@ -328,21 +349,34 @@ impl HostRuntime { self.worker.arm_crash_once(); } + self.session_kernel + .record_forwarded_request(request_frame, replay); + let forwarded_request_id = request_id_from_frame(request_frame); let request_id = self.allocate_request_id(); match self.worker.execute(request_id, worker_operation.clone()) { - Ok(result) => Ok(result), + Ok(result) => { + self.complete_forwarded_request(forwarded_request_id.as_ref()); + Ok(result) + } Err(fault) => { - if replay == ReplayContract::SafeReplay && fault.retryable { + if replay == ReplayContract::Convergent && fault.retryable { self.telemetry.record_retry(&operation); self.telemetry.record_worker_restart(); self.worker .restart() .map_err(|restart_fault| restart_fault.mark_retried())?; match self.worker.execute(request_id, worker_operation) { - Ok(result) => Ok(result), - Err(retry_fault) => Err(retry_fault.mark_retried()), + Ok(result) => { + self.complete_forwarded_request(forwarded_request_id.as_ref()); + Ok(result) + } + Err(retry_fault) => { + self.complete_forwarded_request(forwarded_request_id.as_ref()); + Err(retry_fault.mark_retried()) + } } } else { + self.complete_forwarded_request(forwarded_request_id.as_ref()); Err(fault) } } @@ -350,6 +384,8 @@ impl HostRuntime { } fn handle_host_tool(&mut self, name: &str, arguments: Value) -> Result<Value, FaultRecord> { + let operation = format!("tools/call:{name}"); + let (render, arguments) = split_render_mode(arguments, &operation, FaultStage::Host)?; match name { "project.bind" => { let args = deserialize::<ProjectBindArgs>(arguments, "tools/call:project.bind")?; @@ -357,11 +393,14 @@ impl HostRuntime { .map_err(host_store_fault("tools/call:project.bind"))?; self.worker.rebind(resolved.binding.project_root.clone()); self.binding = Some(resolved.binding); - tool_success(&resolved.status) + tool_success(&resolved.status, render) } - "skill.list" => tool_success(&json!({ - "skills": crate::bundled_skill::bundled_skill_summaries(), - })), + "skill.list" => tool_success( + &json!({ + "skills": crate::bundled_skill::bundled_skill_summaries(), + }), + render, + ), "skill.show" => { let args = deserialize::<SkillShowArgs>(arguments, "tools/call:skill.show")?; let skill = args.name.as_deref().map_or_else( @@ -377,31 +416,37 @@ impl HostRuntime { }) }, )?; - tool_success(&json!({ - "name": skill.name, - "description": skill.description, - "resource_uri": skill.resource_uri, - "body": skill.body, - })) + tool_success( + &json!({ + "name": skill.name, + "description": skill.description, + "resource_uri": skill.resource_uri, + "body": skill.body, + }), + render, + ) } - "system.health" => tool_success(&HealthSnapshot { - initialization: InitializationHealth { - ready: self.session.initialized, - seed_captured: self.session.initialize_params.is_some(), - }, - binding: binding_health(self.binding.as_ref()), - worker: WorkerHealth { - worker_generation: self.worker.generation(), - alive: self.worker.is_alive(), - }, - binary: BinaryHealth { - current_executable: self.binary.path.display().to_string(), - launch_path_stable: self.binary.launch_path_stable, - rollout_pending: self.binary.rollout_pending().unwrap_or(false), + "system.health" => tool_success( + &HealthSnapshot { + initialization: InitializationHealth { + ready: self.session_initialized(), + seed_captured: self.seed_captured(), + }, + binding: binding_health(self.binding.as_ref()), + worker: WorkerHealth { + worker_generation: self.worker.generation(), + alive: self.worker.is_alive(), + }, + binary: BinaryHealth { + current_executable: self.binary.path.display().to_string(), + launch_path_stable: self.binary.launch_path_stable, + rollout_pending: self.binary.rollout_pending().unwrap_or(false), + }, + last_fault: self.telemetry.last_fault.clone(), }, - last_fault: self.telemetry.last_fault.clone(), - }), - "system.telemetry" => tool_success(&self.telemetry), + render, + ), + "system.telemetry" => tool_success(&self.telemetry, render), other => Err(FaultRecord::new( FaultKind::InvalidInput, FaultStage::Host, @@ -425,7 +470,7 @@ impl HostRuntime { } fn require_initialized(&self, operation: &str) -> Result<(), FaultRecord> { - if self.session.initialized { + if self.session_initialized() { return Ok(()); } Err(FaultRecord::new( @@ -447,6 +492,22 @@ impl HostRuntime { }) } + fn session_initialized(&self) -> bool { + self.session_kernel + .initialization_seed() + .is_some_and(|seed| seed.initialized_notification.is_some()) + } + + fn seed_captured(&self) -> bool { + self.session_kernel.initialization_seed().is_some() + } + + fn complete_forwarded_request(&mut self, request_id: Option<&RequestId>) { + if let Some(request_id) = request_id { + let _ = self.session_kernel.take_completed_request(request_id); + } + } + fn allocate_request_id(&mut self) -> HostRequestId { let id = HostRequestId(self.next_request_id); self.next_request_id += 1; @@ -466,7 +527,7 @@ impl HostRuntime { fn roll_forward(&mut self) -> Result<(), fidget_spinner_store_sqlite::StoreError> { let state = HostStateSeed { - session: self.session.clone(), + session_kernel: self.session_kernel.snapshot(), telemetry: self.telemetry.clone(), next_request_id: self.next_request_id, binding: self.binding.clone().map(ProjectBindingSeed::from), @@ -474,20 +535,23 @@ impl HostRuntime { force_rollout_consumed: self.force_rollout_consumed, crash_once_consumed: self.crash_once_consumed, }; - let serialized = serde_json::to_string(&state)?; + let state_path = write_snapshot_file("fidget-spinner-mcp-host-reexec", &state) + .map_err(fidget_spinner_store_sqlite::StoreError::Io)?; let mut command = Command::new(&self.binary.path); let _ = command.arg("mcp").arg("serve"); if let Some(project) = self.config.initial_project.as_ref() { let _ = command.arg("--project").arg(project); } - let _ = command.env(HOST_STATE_ENV, serialized); + let _ = command.env(HOST_STATE_ENV, &state_path); #[cfg(unix)] { let error = command.exec(); + let _removed = remove_snapshot_file(&state_path); Err(fidget_spinner_store_sqlite::StoreError::Io(error)) } #[cfg(not(unix))] { + let _removed = remove_snapshot_file(&state_path); return Err(fidget_spinner_store_sqlite::StoreError::Io(io::Error::new( io::ErrorKind::Unsupported, "host rollout requires unix exec support", @@ -605,9 +669,8 @@ impl From<ProjectBinding> for ProjectBindingSeed { } } -fn restore_host_state() -> Option<HostStateSeed> { - let raw = std::env::var(HOST_STATE_ENV).ok()?; - serde_json::from_str::<HostStateSeed>(&raw).ok() +fn restore_host_state() -> Result<Option<HostStateSeed>, fidget_spinner_store_sqlite::StoreError> { + load_snapshot_file_from_env(HOST_STATE_ENV).map_err(fidget_spinner_store_sqlite::StoreError::Io) } fn deserialize<T: for<'de> serde::Deserialize<'de>>( @@ -638,19 +701,17 @@ fn operation_key(method: &str, params: &Value) -> String { } } -fn tool_success(value: &impl Serialize) -> Result<Value, FaultRecord> { - Ok(json!({ - "content": [{ - "type": "text", - "text": crate::to_pretty_json(value).map_err(|error| { - FaultRecord::new(FaultKind::Internal, FaultStage::Host, "tool_success", error.to_string()) - })?, - }], - "structuredContent": serde_json::to_value(value).map_err(|error| { - FaultRecord::new(FaultKind::Internal, FaultStage::Host, "tool_success", error.to_string()) - })?, - "isError": false, - })) +fn request_id_from_frame(frame: &FramedMessage) -> Option<RequestId> { + match frame.classify() { + libmcp::RpcEnvelopeKind::Request { id, .. } => Some(id), + libmcp::RpcEnvelopeKind::Notification { .. } + | libmcp::RpcEnvelopeKind::Response { .. } + | libmcp::RpcEnvelopeKind::Unknown => None, + } +} + +fn tool_success(value: &impl Serialize, render: libmcp::RenderMode) -> Result<Value, FaultRecord> { + crate::mcp::output::tool_success(value, render, FaultStage::Host, "tool_success") } fn host_store_fault( |