use std::collections::BTreeMap; use std::io::{self, BufRead, Write}; use std::path::{Path, PathBuf}; use std::process::Command; use libmcp::{Generation, SurfaceKind}; use serde::Deserialize; use serde_json::{Value, json}; use thiserror::Error; use uuid::Uuid; use crate::mcp::fault::{FaultRecord, FaultStage}; use crate::mcp::output::{ ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success, }; use crate::mcp::protocol::{ CLAUDE_BIN_ENV, CLAUDE_CONSULT_PREFIX, CLAUDE_EFFORT, CLAUDE_MODEL, CLAUDE_TOOLSET, EMPTY_MCP_CONFIG, }; pub(crate) fn run_worker(generation: u64) -> Result<(), Box> { let generation = generation_from_wire(generation); let stdin = io::stdin(); let mut stdout = io::stdout().lock(); let mut service = WorkerService::new(generation); for line in stdin.lock().lines() { let line = line?; if line.trim().is_empty() { continue; } let request = serde_json::from_str::(&line)?; let response = match request { crate::mcp::protocol::WorkerRequest::Execute { id, operation } => { let outcome = match service.execute(operation) { Ok(result) => crate::mcp::protocol::WorkerOutcome::Success { result }, Err(fault) => crate::mcp::protocol::WorkerOutcome::Fault { fault }, }; crate::mcp::protocol::WorkerResponse { id, outcome } } }; serde_json::to_writer(&mut stdout, &response)?; stdout.write_all(b"\n")?; stdout.flush()?; } Ok(()) } struct WorkerService { generation: Generation, } impl WorkerService { fn new(generation: Generation) -> Self { Self { generation } } fn execute( &mut self, operation: crate::mcp::protocol::WorkerOperation, ) -> Result { match operation { crate::mcp::protocol::WorkerOperation::CallTool { name, arguments } => { self.call_tool(&name, arguments) } } } fn call_tool(&mut self, name: &str, arguments: Value) -> Result { let operation = format!("tools/call:{name}"); let (presentation, arguments) = split_presentation(arguments, &operation, self.generation, FaultStage::Worker)?; let output = match name { "consult" => { let args = deserialize::(arguments, &operation, self.generation)?; let request = ConsultRequest::parse(args) .map_err(|error| invalid_consult_request(self.generation, &operation, error))?; let response = invoke_claude(&request) .map_err(|error| consult_fault(self.generation, &operation, error))?; consult_output(&request, &response, self.generation, &operation)? } other => { return Err(FaultRecord::invalid_input( self.generation, FaultStage::Worker, &operation, format!("unknown worker tool `{other}`"), )); } }; tool_success( output, presentation, self.generation, FaultStage::Worker, &operation, ) } } #[derive(Debug, Deserialize)] struct ConsultArgs { prompt: String, cwd: Option, max_turns: Option, session_id: Option, } #[derive(Debug, Clone)] struct ConsultRequest { prompt: PromptText, cwd: WorkingDirectory, max_turns: Option, session: Option, } impl ConsultRequest { fn parse(args: ConsultArgs) -> Result { Ok(Self { prompt: PromptText::parse(args.prompt)?, cwd: WorkingDirectory::resolve(args.cwd)?, max_turns: args.max_turns.map(TurnLimit::parse).transpose()?, session: args.session_id.map(SessionHandle::parse).transpose()?, }) } fn session_mode(&self) -> &'static str { if self.session.is_some() { "resumed" } else { "new" } } fn requested_session_id(&self) -> Option { self.session.as_ref().map(SessionHandle::display) } } #[derive(Debug, Clone)] struct PromptText { original: String, rendered: String, } impl PromptText { fn parse(raw: String) -> Result { if raw.trim().is_empty() { return Err(ConsultRequestError::EmptyPrompt); } Ok(Self { rendered: format!("{CLAUDE_CONSULT_PREFIX}\n\n{raw}"), original: raw, }) } fn as_str(&self) -> &str { self.original.as_str() } fn rendered(&self) -> &str { self.rendered.as_str() } } #[derive(Debug, Clone)] struct WorkingDirectory(PathBuf); impl WorkingDirectory { fn resolve(raw: Option) -> Result { let base = std::env::current_dir().map_err(|source| ConsultRequestError::CurrentDir { source })?; let requested = raw.map_or_else(|| base.clone(), PathBuf::from); let candidate = if requested.is_absolute() { requested } else { base.join(requested) }; let canonical = candidate .canonicalize() .map_err(|source| ConsultRequestError::Canonicalize { path: candidate.display().to_string(), source, })?; if !canonical.is_dir() { return Err(ConsultRequestError::NotDirectory( canonical.display().to_string(), )); } Ok(Self(canonical)) } fn as_path(&self) -> &Path { self.0.as_path() } fn display(&self) -> String { self.0.display().to_string() } } #[derive(Debug, Clone, Copy)] struct TurnLimit(u64); impl TurnLimit { fn parse(raw: u64) -> Result { if raw == 0 { return Err(ConsultRequestError::InvalidTurnLimit); } Ok(Self(raw)) } fn get(self) -> u64 { self.0 } } #[derive(Debug, Clone)] struct SessionHandle(Uuid); impl SessionHandle { fn parse(raw: String) -> Result { Uuid::parse_str(&raw) .map(Self) .map_err(|_| ConsultRequestError::InvalidSessionHandle(raw)) } fn display(&self) -> String { self.0.to_string() } } #[derive(Debug, Error)] enum ConsultRequestError { #[error("prompt must not be empty")] EmptyPrompt, #[error("failed to resolve the current working directory: {source}")] CurrentDir { source: io::Error }, #[error("failed to resolve working directory `{path}`: {source}")] Canonicalize { path: String, source: io::Error }, #[error("working directory `{0}` is not a directory")] NotDirectory(String), #[error("max_turns must be greater than zero")] InvalidTurnLimit, #[error("session_id must be a valid UUID, got `{0}`")] InvalidSessionHandle(String), } #[derive(Debug, Error)] enum ConsultInvocationError { #[error("failed to spawn Claude Code: {0}")] Spawn(#[source] io::Error), #[error("Claude Code returned non-JSON output: {0}")] InvalidJson(String), #[error("{0}")] Downstream(String), } #[derive(Debug, Deserialize)] struct ClaudeJsonEnvelope { #[serde(rename = "type")] envelope_type: String, subtype: Option, is_error: bool, duration_ms: Option, duration_api_ms: Option, num_turns: Option, result: Option, stop_reason: Option, session_id: Option, total_cost_usd: Option, usage: Option, #[serde(rename = "modelUsage")] model_usage: Option, #[serde(default)] permission_denials: Vec, fast_mode_state: Option, uuid: Option, } #[derive(Debug)] struct ConsultResponse { cwd: WorkingDirectory, result: String, duration_ms: u64, duration_api_ms: Option, num_turns: u64, stop_reason: Option, session_id: Option, total_cost_usd: Option, usage: Option, model_usage: Option, permission_denials: Vec, fast_mode_state: Option, uuid: Option, } impl ConsultResponse { fn model_name(&self) -> Option { let Value::Object(models) = self.model_usage.as_ref()? else { return None; }; models.keys().next().cloned() } } fn deserialize Deserialize<'de>>( value: Value, operation: &str, generation: Generation, ) -> Result { serde_json::from_value(value).map_err(|error| { FaultRecord::invalid_input( generation, FaultStage::Protocol, operation, format!("invalid params: {error}"), ) }) } fn invalid_consult_request( generation: Generation, operation: &str, error: ConsultRequestError, ) -> FaultRecord { FaultRecord::invalid_input(generation, FaultStage::Worker, operation, error.to_string()) } fn consult_fault( generation: Generation, operation: &str, error: ConsultInvocationError, ) -> FaultRecord { match error { ConsultInvocationError::Spawn(source) => FaultRecord::process( generation, FaultStage::Claude, operation, source.to_string(), ), ConsultInvocationError::InvalidJson(detail) | ConsultInvocationError::Downstream(detail) => { FaultRecord::downstream(generation, FaultStage::Claude, operation, detail) } } } fn invoke_claude(request: &ConsultRequest) -> Result { let mut command = Command::new(claude_binary()); let _ = command .arg("-p") .arg("--output-format") .arg("json") .arg("--strict-mcp-config") .arg("--mcp-config") .arg(EMPTY_MCP_CONFIG) .arg("--disable-slash-commands") .arg("--no-chrome") .arg("--model") .arg(CLAUDE_MODEL) .arg("--effort") .arg(CLAUDE_EFFORT) .arg("--tools") .arg(CLAUDE_TOOLSET) .arg("--permission-mode") .arg("dontAsk"); if let Some(session) = request.session.as_ref() { let _ = command.arg("--resume").arg(session.display()); } if let Some(max_turns) = request.max_turns { let _ = command.arg("--max-turns").arg(max_turns.get().to_string()); } let output = command .current_dir(request.cwd.as_path()) .arg(request.prompt.rendered()) .output() .map_err(ConsultInvocationError::Spawn)?; let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned(); let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned(); let envelope = match serde_json::from_slice::(&output.stdout) { Ok(envelope) => envelope, Err(_error) if !output.status.success() => { return Err(ConsultInvocationError::Downstream(downstream_message( output.status.code(), &stdout, &stderr, ))); } Err(error) => { return Err(ConsultInvocationError::InvalidJson(format!( "{error}; stdout={stdout}; stderr={stderr}" ))); } }; if envelope.envelope_type != "result" { return Err(ConsultInvocationError::Downstream(format!( "unexpected Claude envelope type `{}`", envelope.envelope_type ))); } if !output.status.success() || envelope.is_error || envelope.subtype.as_deref() != Some("success") { return Err(ConsultInvocationError::Downstream( envelope .result .filter(|value| !value.trim().is_empty()) .unwrap_or_else(|| downstream_message(output.status.code(), &stdout, &stderr)), )); } Ok(ConsultResponse { cwd: request.cwd.clone(), result: envelope.result.unwrap_or_default(), duration_ms: envelope.duration_ms.unwrap_or(0), duration_api_ms: envelope.duration_api_ms, num_turns: envelope.num_turns.unwrap_or(0), stop_reason: envelope.stop_reason, session_id: envelope.session_id, total_cost_usd: envelope.total_cost_usd, usage: envelope.usage, model_usage: envelope.model_usage, permission_denials: envelope.permission_denials, fast_mode_state: envelope.fast_mode_state, uuid: envelope.uuid, }) } fn downstream_message(status_code: Option, stdout: &str, stderr: &str) -> String { if !stderr.is_empty() { return stderr.to_owned(); } if !stdout.is_empty() { return stdout.to_owned(); } format!("Claude Code exited with status {status_code:?}") } fn claude_binary() -> PathBuf { std::env::var_os(CLAUDE_BIN_ENV) .map(PathBuf::from) .unwrap_or_else(|| PathBuf::from("claude")) } fn consult_output( request: &ConsultRequest, response: &ConsultResponse, generation: Generation, operation: &str, ) -> Result { let concise = json!({ "response": response.result, "cwd": response.cwd.display(), "session_mode": request.session_mode(), "requested_session_id": request.requested_session_id(), "prompt_prefix_injected": true, "model": response.model_name(), "duration_ms": response.duration_ms, "num_turns": response.num_turns, "stop_reason": response.stop_reason, "session_id": response.session_id, "total_cost_usd": response.total_cost_usd, "permission_denial_count": response.permission_denials.len(), }); let full = json!({ "response": response.result, "cwd": response.cwd.display(), "prompt": request.prompt.as_str(), "prompt_prefix": CLAUDE_CONSULT_PREFIX, "effective_prompt": request.prompt.rendered(), "max_turns": request.max_turns.map(TurnLimit::get), "session_mode": request.session_mode(), "requested_session_id": request.requested_session_id(), "duration_ms": response.duration_ms, "duration_api_ms": response.duration_api_ms, "num_turns": response.num_turns, "stop_reason": response.stop_reason, "session_id": response.session_id, "total_cost_usd": response.total_cost_usd, "usage": response.usage, "model_usage": response.model_usage, "permission_denials": response.permission_denials, "fast_mode_state": response.fast_mode_state, "uuid": response.uuid, }); fallback_detailed_tool_output( &concise, &full, concise_text(request, response), Some(full_text(request, response)), SurfaceKind::Read, generation, FaultStage::Worker, operation, ) } fn concise_text(request: &ConsultRequest, response: &ConsultResponse) -> String { let mut status = vec![ "consult ok".to_owned(), format!("session={}", request.session_mode()), format!("turns={}", response.num_turns), format!("duration={}", render_duration_ms(response.duration_ms)), ]; if let Some(model) = response.model_name() { status.push(format!("model={model}")); } if let Some(stop_reason) = response.stop_reason.as_deref() { status.push(format!("stop={stop_reason}")); } if let Some(cost) = response.total_cost_usd { status.push(format!("cost=${cost:.6}")); } let mut lines = vec![status.join(" ")]; lines.push(format!("cwd: {}", response.cwd.display())); if let Some(session_id) = request.requested_session_id() { lines.push(format!("requested_session: {session_id}")); } if let Some(session_id) = response.session_id.as_deref() { lines.push(format!("session: {session_id}")); } if !response.permission_denials.is_empty() { lines.push(format!( "permission_denials: {}", response.permission_denials.len() )); } lines.push("response:".to_owned()); lines.push(response.result.clone()); lines.join("\n") } fn full_text(request: &ConsultRequest, response: &ConsultResponse) -> String { let mut lines = vec![ format!( "consult ok session={} turns={}", request.session_mode(), response.num_turns ), format!("cwd: {}", response.cwd.display()), format!("duration: {}", render_duration_ms(response.duration_ms)), ]; if let Some(session_id) = request.requested_session_id() { lines.push(format!("requested_session: {session_id}")); } if let Some(duration_api_ms) = response.duration_api_ms { lines.push(format!( "api_duration: {}", render_duration_ms(duration_api_ms) )); } if let Some(model) = response.model_name() { lines.push(format!("model: {model}")); } if let Some(stop_reason) = response.stop_reason.as_deref() { lines.push(format!("stop: {stop_reason}")); } if let Some(session_id) = response.session_id.as_deref() { lines.push(format!("session: {session_id}")); } if let Some(cost) = response.total_cost_usd { lines.push(format!("cost_usd: {cost:.6}")); } lines.push(format!( "permission_denials: {}", response.permission_denials.len() )); if let Some(fast_mode_state) = response.fast_mode_state.as_deref() { lines.push(format!("fast_mode: {fast_mode_state}")); } if let Some(uuid) = response.uuid.as_deref() { lines.push(format!("uuid: {uuid}")); } if let Some(usage) = usage_summary(response.usage.as_ref()) { lines.push(format!("usage: {usage}")); } lines.push("response:".to_owned()); lines.push(response.result.clone()); lines.join("\n") } fn usage_summary(usage: Option<&Value>) -> Option { let Value::Object(usage) = usage? else { return None; }; let summary = usage .iter() .filter_map(|(key, value)| match value { Value::Number(number) => Some((key.clone(), number.to_string())), Value::String(text) if !text.is_empty() => Some((key.clone(), text.clone())), _ => None, }) .collect::>(); (!summary.is_empty()).then(|| { summary .into_iter() .map(|(key, value)| format!("{key}={value}")) .collect::>() .join(" ") }) } fn render_duration_ms(duration_ms: u64) -> String { if duration_ms < 1_000 { return format!("{duration_ms}ms"); } let seconds = duration_ms as f64 / 1_000.0; format!("{seconds:.3}s") } fn generation_from_wire(raw: u64) -> Generation { let mut generation = Generation::genesis(); for _ in 1..raw { generation = generation.next(); } generation }