swarm repositories / source
aboutsummaryrefslogtreecommitdiff
path: root/crates/fidget-spinner-cli/src/mcp/host
diff options
context:
space:
mode:
authormain <main@swarm.moe>2026-03-19 11:19:42 -0400
committermain <main@swarm.moe>2026-03-19 11:19:42 -0400
commiteb6b1af642f5829d5dc08aa61138d893b91b60b2 (patch)
tree0f1f5a81424f2a98ea08a8743995303769763e32 /crates/fidget-spinner-cli/src/mcp/host
parent7b9bd8b42883f82b090718175b8316296ef18236 (diff)
downloadfidget_spinner-eb6b1af642f5829d5dc08aa61138d893b91b60b2.zip
Retrofit MCP host onto libmcp
Diffstat (limited to 'crates/fidget-spinner-cli/src/mcp/host')
-rw-r--r--crates/fidget-spinner-cli/src/mcp/host/runtime.rs235
1 files changed, 148 insertions, 87 deletions
diff --git a/crates/fidget-spinner-cli/src/mcp/host/runtime.rs b/crates/fidget-spinner-cli/src/mcp/host/runtime.rs
index dd75544..17c26c7 100644
--- a/crates/fidget-spinner-cli/src/mcp/host/runtime.rs
+++ b/crates/fidget-spinner-cli/src/mcp/host/runtime.rs
@@ -5,6 +5,10 @@ use std::path::PathBuf;
use std::process::Command;
use std::time::Instant;
+use libmcp::{
+ FramedMessage, HostSessionKernel, ReplayContract, RequestId, load_snapshot_file_from_env,
+ remove_snapshot_file, write_snapshot_file,
+};
use serde::Serialize;
use serde_json::{Value, json};
@@ -14,13 +18,13 @@ use super::{
process::{ProjectBinding, WorkerSupervisor},
};
use crate::mcp::catalog::{
- DispatchTarget, ReplayContract, list_resources, resource_spec, tool_definitions, tool_spec,
+ DispatchTarget, list_resources, resource_spec, tool_definitions, tool_spec,
};
use crate::mcp::fault::{FaultKind, FaultRecord, FaultStage};
+use crate::mcp::output::split_render_mode;
use crate::mcp::protocol::{
CRASH_ONCE_ENV, FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed,
- PROTOCOL_VERSION, ProjectBindingSeed, SERVER_NAME, SessionSeed, WorkerOperation,
- WorkerSpawnConfig,
+ PROTOCOL_VERSION, ProjectBindingSeed, SERVER_NAME, WorkerOperation, WorkerSpawnConfig,
};
use crate::mcp::telemetry::{
BinaryHealth, BindingHealth, HealthSnapshot, InitializationHealth, ServerTelemetry,
@@ -59,7 +63,7 @@ pub(crate) fn run_host(
struct HostRuntime {
config: HostConfig,
binding: Option<ProjectBinding>,
- session: SessionSeed,
+ session_kernel: HostSessionKernel,
telemetry: ServerTelemetry,
next_request_id: u64,
worker: WorkerSupervisor,
@@ -73,10 +77,13 @@ struct HostRuntime {
impl HostRuntime {
fn new(config: HostConfig) -> Result<Self, fidget_spinner_store_sqlite::StoreError> {
- let restored = restore_host_state();
- let session = restored
+ let restored = restore_host_state()?;
+ let session_kernel = restored
.as_ref()
- .map_or_else(SessionSeed::default, |seed| seed.session.clone());
+ .map(|seed| seed.session_kernel.clone().restore())
+ .transpose()
+ .map_err(fidget_spinner_store_sqlite::StoreError::Io)?
+ .map_or_else(HostSessionKernel::cold, HostSessionKernel::from_restored);
let telemetry = restored
.as_ref()
.map_or_else(ServerTelemetry::default, |seed| seed.telemetry.clone());
@@ -117,7 +124,7 @@ impl HostRuntime {
Ok(Self {
config: config.clone(),
binding,
- session,
+ session_kernel,
telemetry,
next_request_id,
worker,
@@ -131,8 +138,8 @@ impl HostRuntime {
}
fn handle_line(&mut self, line: &str) -> Option<Value> {
- let message = match serde_json::from_str::<Value>(line) {
- Ok(message) => message,
+ let frame = match FramedMessage::parse(line.as_bytes().to_vec()) {
+ Ok(frame) => frame,
Err(error) => {
return Some(jsonrpc_error(
Value::Null,
@@ -145,11 +152,12 @@ impl HostRuntime {
));
}
};
- self.handle_message(message)
+ self.handle_frame(frame)
}
- fn handle_message(&mut self, message: Value) -> Option<Value> {
- let Some(object) = message.as_object() else {
+ fn handle_frame(&mut self, frame: FramedMessage) -> Option<Value> {
+ self.session_kernel.observe_client_frame(&frame);
+ let Some(object) = frame.value.as_object() else {
return Some(jsonrpc_error(
Value::Null,
FaultRecord::new(
@@ -168,7 +176,7 @@ impl HostRuntime {
let started_at = Instant::now();
self.telemetry.record_request(&operation_key);
- let response = match self.dispatch(method, params, id.clone()) {
+ let response = match self.dispatch(&frame, method, params, id.clone()) {
Ok(Some(result)) => {
self.telemetry
.record_success(&operation_key, started_at.elapsed().as_millis());
@@ -206,29 +214,26 @@ impl HostRuntime {
fn dispatch(
&mut self,
+ request_frame: &FramedMessage,
method: &str,
params: Value,
request_id: Option<Value>,
) -> Result<Option<Value>, FaultRecord> {
match method {
- "initialize" => {
- self.session.initialize_params = Some(params.clone());
- self.session.initialized = false;
- Ok(Some(json!({
- "protocolVersion": PROTOCOL_VERSION,
- "capabilities": {
- "tools": { "listChanged": false },
- "resources": { "listChanged": false, "subscribe": false }
- },
- "serverInfo": {
- "name": SERVER_NAME,
- "version": env!("CARGO_PKG_VERSION")
- },
- "instructions": "The DAG is canonical truth. Frontier state is derived. Bind the session with project.bind before project-local DAG operations when the MCP is running unbound."
- })))
- }
+ "initialize" => Ok(Some(json!({
+ "protocolVersion": PROTOCOL_VERSION,
+ "capabilities": {
+ "tools": { "listChanged": false },
+ "resources": { "listChanged": false, "subscribe": false }
+ },
+ "serverInfo": {
+ "name": SERVER_NAME,
+ "version": env!("CARGO_PKG_VERSION")
+ },
+ "instructions": "The DAG is canonical truth. Frontier state is derived. Bind the session with project.bind before project-local DAG operations when the MCP is running unbound."
+ }))),
"notifications/initialized" => {
- if self.session.initialize_params.is_none() {
+ if !self.seed_captured() {
return Err(FaultRecord::new(
FaultKind::NotInitialized,
FaultStage::Host,
@@ -236,7 +241,6 @@ impl HostRuntime {
"received initialized notification before initialize",
));
}
- self.session.initialized = true;
Ok(None)
}
"notifications/cancelled" => Ok(None),
@@ -246,8 +250,14 @@ impl HostRuntime {
match other {
"tools/list" => Ok(Some(json!({ "tools": tool_definitions() }))),
"resources/list" => Ok(Some(json!({ "resources": list_resources() }))),
- "tools/call" => Ok(Some(self.dispatch_tool_call(params, request_id)?)),
- "resources/read" => Ok(Some(self.dispatch_resource_read(params)?)),
+ "tools/call" => Ok(Some(self.dispatch_tool_call(
+ request_frame,
+ params,
+ request_id,
+ )?)),
+ "resources/read" => {
+ Ok(Some(self.dispatch_resource_read(request_frame, params)?))
+ }
_ => Err(FaultRecord::new(
FaultKind::InvalidInput,
FaultStage::Protocol,
@@ -261,6 +271,7 @@ impl HostRuntime {
fn dispatch_tool_call(
&mut self,
+ request_frame: &FramedMessage,
params: Value,
_request_id: Option<Value>,
) -> Result<Value, FaultRecord> {
@@ -275,11 +286,17 @@ impl HostRuntime {
})?;
match spec.dispatch {
DispatchTarget::Host => self.handle_host_tool(&envelope.name, envelope.arguments),
- DispatchTarget::Worker => self.dispatch_worker_tool(spec, envelope.arguments),
+ DispatchTarget::Worker => {
+ self.dispatch_worker_tool(request_frame, spec, envelope.arguments)
+ }
}
}
- fn dispatch_resource_read(&mut self, params: Value) -> Result<Value, FaultRecord> {
+ fn dispatch_resource_read(
+ &mut self,
+ request_frame: &FramedMessage,
+ params: Value,
+ ) -> Result<Value, FaultRecord> {
let args = deserialize::<ReadResourceArgs>(params, "resources/read")?;
let spec = resource_spec(&args.uri).ok_or_else(|| {
FaultRecord::new(
@@ -292,6 +309,7 @@ impl HostRuntime {
match spec.dispatch {
DispatchTarget::Host => Ok(Self::handle_host_resource(spec.uri)),
DispatchTarget::Worker => self.dispatch_worker_operation(
+ request_frame,
format!("resources/read:{}", args.uri),
spec.replay,
WorkerOperation::ReadResource { uri: args.uri },
@@ -301,11 +319,13 @@ impl HostRuntime {
fn dispatch_worker_tool(
&mut self,
+ request_frame: &FramedMessage,
spec: crate::mcp::catalog::ToolSpec,
arguments: Value,
) -> Result<Value, FaultRecord> {
let operation = format!("tools/call:{}", spec.name);
self.dispatch_worker_operation(
+ request_frame,
operation.clone(),
spec.replay,
WorkerOperation::CallTool {
@@ -317,6 +337,7 @@ impl HostRuntime {
fn dispatch_worker_operation(
&mut self,
+ request_frame: &FramedMessage,
operation: String,
replay: ReplayContract,
worker_operation: WorkerOperation,
@@ -328,21 +349,34 @@ impl HostRuntime {
self.worker.arm_crash_once();
}
+ self.session_kernel
+ .record_forwarded_request(request_frame, replay);
+ let forwarded_request_id = request_id_from_frame(request_frame);
let request_id = self.allocate_request_id();
match self.worker.execute(request_id, worker_operation.clone()) {
- Ok(result) => Ok(result),
+ Ok(result) => {
+ self.complete_forwarded_request(forwarded_request_id.as_ref());
+ Ok(result)
+ }
Err(fault) => {
- if replay == ReplayContract::SafeReplay && fault.retryable {
+ if replay == ReplayContract::Convergent && fault.retryable {
self.telemetry.record_retry(&operation);
self.telemetry.record_worker_restart();
self.worker
.restart()
.map_err(|restart_fault| restart_fault.mark_retried())?;
match self.worker.execute(request_id, worker_operation) {
- Ok(result) => Ok(result),
- Err(retry_fault) => Err(retry_fault.mark_retried()),
+ Ok(result) => {
+ self.complete_forwarded_request(forwarded_request_id.as_ref());
+ Ok(result)
+ }
+ Err(retry_fault) => {
+ self.complete_forwarded_request(forwarded_request_id.as_ref());
+ Err(retry_fault.mark_retried())
+ }
}
} else {
+ self.complete_forwarded_request(forwarded_request_id.as_ref());
Err(fault)
}
}
@@ -350,6 +384,8 @@ impl HostRuntime {
}
fn handle_host_tool(&mut self, name: &str, arguments: Value) -> Result<Value, FaultRecord> {
+ let operation = format!("tools/call:{name}");
+ let (render, arguments) = split_render_mode(arguments, &operation, FaultStage::Host)?;
match name {
"project.bind" => {
let args = deserialize::<ProjectBindArgs>(arguments, "tools/call:project.bind")?;
@@ -357,11 +393,14 @@ impl HostRuntime {
.map_err(host_store_fault("tools/call:project.bind"))?;
self.worker.rebind(resolved.binding.project_root.clone());
self.binding = Some(resolved.binding);
- tool_success(&resolved.status)
+ tool_success(&resolved.status, render)
}
- "skill.list" => tool_success(&json!({
- "skills": crate::bundled_skill::bundled_skill_summaries(),
- })),
+ "skill.list" => tool_success(
+ &json!({
+ "skills": crate::bundled_skill::bundled_skill_summaries(),
+ }),
+ render,
+ ),
"skill.show" => {
let args = deserialize::<SkillShowArgs>(arguments, "tools/call:skill.show")?;
let skill = args.name.as_deref().map_or_else(
@@ -377,31 +416,37 @@ impl HostRuntime {
})
},
)?;
- tool_success(&json!({
- "name": skill.name,
- "description": skill.description,
- "resource_uri": skill.resource_uri,
- "body": skill.body,
- }))
+ tool_success(
+ &json!({
+ "name": skill.name,
+ "description": skill.description,
+ "resource_uri": skill.resource_uri,
+ "body": skill.body,
+ }),
+ render,
+ )
}
- "system.health" => tool_success(&HealthSnapshot {
- initialization: InitializationHealth {
- ready: self.session.initialized,
- seed_captured: self.session.initialize_params.is_some(),
- },
- binding: binding_health(self.binding.as_ref()),
- worker: WorkerHealth {
- worker_generation: self.worker.generation(),
- alive: self.worker.is_alive(),
- },
- binary: BinaryHealth {
- current_executable: self.binary.path.display().to_string(),
- launch_path_stable: self.binary.launch_path_stable,
- rollout_pending: self.binary.rollout_pending().unwrap_or(false),
+ "system.health" => tool_success(
+ &HealthSnapshot {
+ initialization: InitializationHealth {
+ ready: self.session_initialized(),
+ seed_captured: self.seed_captured(),
+ },
+ binding: binding_health(self.binding.as_ref()),
+ worker: WorkerHealth {
+ worker_generation: self.worker.generation(),
+ alive: self.worker.is_alive(),
+ },
+ binary: BinaryHealth {
+ current_executable: self.binary.path.display().to_string(),
+ launch_path_stable: self.binary.launch_path_stable,
+ rollout_pending: self.binary.rollout_pending().unwrap_or(false),
+ },
+ last_fault: self.telemetry.last_fault.clone(),
},
- last_fault: self.telemetry.last_fault.clone(),
- }),
- "system.telemetry" => tool_success(&self.telemetry),
+ render,
+ ),
+ "system.telemetry" => tool_success(&self.telemetry, render),
other => Err(FaultRecord::new(
FaultKind::InvalidInput,
FaultStage::Host,
@@ -425,7 +470,7 @@ impl HostRuntime {
}
fn require_initialized(&self, operation: &str) -> Result<(), FaultRecord> {
- if self.session.initialized {
+ if self.session_initialized() {
return Ok(());
}
Err(FaultRecord::new(
@@ -447,6 +492,22 @@ impl HostRuntime {
})
}
+ fn session_initialized(&self) -> bool {
+ self.session_kernel
+ .initialization_seed()
+ .is_some_and(|seed| seed.initialized_notification.is_some())
+ }
+
+ fn seed_captured(&self) -> bool {
+ self.session_kernel.initialization_seed().is_some()
+ }
+
+ fn complete_forwarded_request(&mut self, request_id: Option<&RequestId>) {
+ if let Some(request_id) = request_id {
+ let _ = self.session_kernel.take_completed_request(request_id);
+ }
+ }
+
fn allocate_request_id(&mut self) -> HostRequestId {
let id = HostRequestId(self.next_request_id);
self.next_request_id += 1;
@@ -466,7 +527,7 @@ impl HostRuntime {
fn roll_forward(&mut self) -> Result<(), fidget_spinner_store_sqlite::StoreError> {
let state = HostStateSeed {
- session: self.session.clone(),
+ session_kernel: self.session_kernel.snapshot(),
telemetry: self.telemetry.clone(),
next_request_id: self.next_request_id,
binding: self.binding.clone().map(ProjectBindingSeed::from),
@@ -474,20 +535,23 @@ impl HostRuntime {
force_rollout_consumed: self.force_rollout_consumed,
crash_once_consumed: self.crash_once_consumed,
};
- let serialized = serde_json::to_string(&state)?;
+ let state_path = write_snapshot_file("fidget-spinner-mcp-host-reexec", &state)
+ .map_err(fidget_spinner_store_sqlite::StoreError::Io)?;
let mut command = Command::new(&self.binary.path);
let _ = command.arg("mcp").arg("serve");
if let Some(project) = self.config.initial_project.as_ref() {
let _ = command.arg("--project").arg(project);
}
- let _ = command.env(HOST_STATE_ENV, serialized);
+ let _ = command.env(HOST_STATE_ENV, &state_path);
#[cfg(unix)]
{
let error = command.exec();
+ let _removed = remove_snapshot_file(&state_path);
Err(fidget_spinner_store_sqlite::StoreError::Io(error))
}
#[cfg(not(unix))]
{
+ let _removed = remove_snapshot_file(&state_path);
return Err(fidget_spinner_store_sqlite::StoreError::Io(io::Error::new(
io::ErrorKind::Unsupported,
"host rollout requires unix exec support",
@@ -605,9 +669,8 @@ impl From<ProjectBinding> for ProjectBindingSeed {
}
}
-fn restore_host_state() -> Option<HostStateSeed> {
- let raw = std::env::var(HOST_STATE_ENV).ok()?;
- serde_json::from_str::<HostStateSeed>(&raw).ok()
+fn restore_host_state() -> Result<Option<HostStateSeed>, fidget_spinner_store_sqlite::StoreError> {
+ load_snapshot_file_from_env(HOST_STATE_ENV).map_err(fidget_spinner_store_sqlite::StoreError::Io)
}
fn deserialize<T: for<'de> serde::Deserialize<'de>>(
@@ -638,19 +701,17 @@ fn operation_key(method: &str, params: &Value) -> String {
}
}
-fn tool_success(value: &impl Serialize) -> Result<Value, FaultRecord> {
- Ok(json!({
- "content": [{
- "type": "text",
- "text": crate::to_pretty_json(value).map_err(|error| {
- FaultRecord::new(FaultKind::Internal, FaultStage::Host, "tool_success", error.to_string())
- })?,
- }],
- "structuredContent": serde_json::to_value(value).map_err(|error| {
- FaultRecord::new(FaultKind::Internal, FaultStage::Host, "tool_success", error.to_string())
- })?,
- "isError": false,
- }))
+fn request_id_from_frame(frame: &FramedMessage) -> Option<RequestId> {
+ match frame.classify() {
+ libmcp::RpcEnvelopeKind::Request { id, .. } => Some(id),
+ libmcp::RpcEnvelopeKind::Notification { .. }
+ | libmcp::RpcEnvelopeKind::Response { .. }
+ | libmcp::RpcEnvelopeKind::Unknown => None,
+ }
+}
+
+fn tool_success(value: &impl Serialize, render: libmcp::RenderMode) -> Result<Value, FaultRecord> {
+ crate::mcp::output::tool_success(value, render, FaultStage::Host, "tool_success")
}
fn host_store_fault(