use std::collections::BTreeMap; use std::fs; use std::io::{self, BufRead, Write}; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use dirs::{home_dir, state_dir}; use libmcp::{Generation, SurfaceKind}; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use thiserror::Error; use uuid::Uuid; use crate::mcp::fault::{FaultRecord, FaultStage}; use crate::mcp::output::{ ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success, }; use crate::mcp::protocol::{ CLAUDE_BIN_ENV, CLAUDE_CONSULT_PREFIX, CLAUDE_EFFORT, CLAUDE_MODEL, CLAUDE_TOOLSET, EMPTY_MCP_CONFIG, }; pub(crate) fn run_worker(generation: u64) -> Result<(), Box> { let generation = generation_from_wire(generation); let stdin = io::stdin(); let mut stdout = io::stdout().lock(); let mut service = WorkerService::new(generation); for line in stdin.lock().lines() { let line = line?; if line.trim().is_empty() { continue; } let request = serde_json::from_str::(&line)?; let response = match request { crate::mcp::protocol::WorkerRequest::Execute { id, operation } => { let outcome = match service.execute(operation) { Ok(result) => crate::mcp::protocol::WorkerOutcome::Success { result }, Err(fault) => crate::mcp::protocol::WorkerOutcome::Fault { fault }, }; crate::mcp::protocol::WorkerResponse { id, outcome } } }; serde_json::to_writer(&mut stdout, &response)?; stdout.write_all(b"\n")?; stdout.flush()?; } Ok(()) } pub(crate) fn run_background_consult(job_file: PathBuf) -> Result<(), Box> { execute_background_consult(job_file)?; Ok(()) } struct WorkerService { generation: Generation, } impl WorkerService { fn new(generation: Generation) -> Self { Self { generation } } fn execute( &mut self, operation: crate::mcp::protocol::WorkerOperation, ) -> Result { match operation { crate::mcp::protocol::WorkerOperation::CallTool { name, arguments } => { self.call_tool(&name, arguments) } } } fn call_tool(&mut self, name: &str, arguments: Value) -> Result { let operation = format!("tools/call:{name}"); let (presentation, arguments) = split_presentation(arguments, &operation, self.generation, FaultStage::Worker)?; let output = match name { "consult" => { let args = deserialize::(arguments, &operation, self.generation)?; let request = ConsultRequest::parse(args) .map_err(|error| invalid_consult_request(self.generation, &operation, error))?; match request.mode() { ConsultMode::Sync => { let response = invoke_claude(&request) .map_err(|error| consult_fault(self.generation, &operation, error))?; consult_output(&request, &response, self.generation, &operation)? } ConsultMode::Background => submit_background_consult( &request, self.generation, FaultStage::Worker, &operation, )?, } } other => { return Err(FaultRecord::invalid_input( self.generation, FaultStage::Worker, &operation, format!("unknown worker tool `{other}`"), )); } }; tool_success( output, presentation, self.generation, FaultStage::Worker, &operation, ) } } #[derive(Debug, Deserialize)] struct ConsultArgs { prompt: String, cwd: Option, max_turns: Option, session_id: Option, background: Option, } #[derive(Debug, Deserialize)] struct ConsultJobArgs { job_id: String, } #[derive(Debug, Default, Deserialize)] struct ConsultJobsArgs { limit: Option, } #[derive(Debug, Clone)] struct ConsultRequest { prompt: PromptText, cwd: WorkingDirectory, max_turns: Option, session: Option, mode: ConsultMode, } impl ConsultRequest { fn parse(args: ConsultArgs) -> Result { Ok(Self { prompt: PromptText::parse(args.prompt)?, cwd: WorkingDirectory::resolve(args.cwd)?, max_turns: args.max_turns.map(TurnLimit::parse).transpose()?, session: args.session_id.map(SessionHandle::parse).transpose()?, mode: ConsultMode::from_background(args.background), }) } fn mode(&self) -> ConsultMode { self.mode } fn session_mode(&self) -> &'static str { if self.session.is_some() { "resumed" } else { "new" } } fn requested_session_id(&self) -> Option { self.session.as_ref().map(SessionHandle::display) } fn background_request(&self) -> BackgroundConsultRequest { BackgroundConsultRequest { prompt: self.prompt.as_str().to_owned(), cwd: self.cwd.display(), max_turns: self.max_turns.map(TurnLimit::get), session_id: self.requested_session_id(), } } } #[derive(Debug, Clone, Copy, Eq, PartialEq)] enum ConsultMode { Sync, Background, } impl ConsultMode { fn from_background(raw: Option) -> Self { if raw.unwrap_or(false) { Self::Background } else { Self::Sync } } fn as_str(self) -> &'static str { match self { Self::Sync => "sync", Self::Background => "background", } } } #[derive(Debug, Clone)] struct PromptText { original: String, rendered: String, } impl PromptText { fn parse(raw: String) -> Result { if raw.trim().is_empty() { return Err(ConsultRequestError::EmptyPrompt); } Ok(Self { rendered: format!("{CLAUDE_CONSULT_PREFIX}\n\n{raw}"), original: raw, }) } fn as_str(&self) -> &str { self.original.as_str() } fn rendered(&self) -> &str { self.rendered.as_str() } } #[derive(Debug, Clone)] struct WorkingDirectory(PathBuf); impl WorkingDirectory { fn resolve(raw: Option) -> Result { let base = std::env::current_dir().map_err(|source| ConsultRequestError::CurrentDir { source })?; let requested = raw.map_or_else(|| base.clone(), PathBuf::from); let candidate = if requested.is_absolute() { requested } else { base.join(requested) }; let canonical = candidate .canonicalize() .map_err(|source| ConsultRequestError::Canonicalize { path: candidate.display().to_string(), source, })?; if !canonical.is_dir() { return Err(ConsultRequestError::NotDirectory( canonical.display().to_string(), )); } Ok(Self(canonical)) } fn as_path(&self) -> &Path { self.0.as_path() } fn display(&self) -> String { self.0.display().to_string() } } #[derive(Debug, Clone, Copy)] struct TurnLimit(u64); impl TurnLimit { fn parse(raw: u64) -> Result { if raw == 0 { return Err(ConsultRequestError::InvalidTurnLimit); } Ok(Self(raw)) } fn get(self) -> u64 { self.0 } } #[derive(Debug, Clone)] struct SessionHandle(Uuid); impl SessionHandle { fn parse(raw: String) -> Result { Uuid::parse_str(&raw) .map(Self) .map_err(|_| ConsultRequestError::InvalidSessionHandle(raw)) } fn display(&self) -> String { self.0.to_string() } } #[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] struct BackgroundConsultRequest { prompt: String, cwd: String, max_turns: Option, session_id: Option, } impl BackgroundConsultRequest { fn into_consult_request(self) -> Result { ConsultRequest::parse(ConsultArgs { prompt: self.prompt, cwd: Some(self.cwd), max_turns: self.max_turns, session_id: self.session_id, background: Some(false), }) } } #[derive(Debug, Clone, Copy, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] enum BackgroundConsultStatus { Queued, Running, Succeeded, Failed, } impl BackgroundConsultStatus { fn done(self) -> bool { matches!(self, Self::Succeeded | Self::Failed) } fn success(self) -> bool { matches!(self, Self::Succeeded) } fn failed(self) -> bool { matches!(self, Self::Failed) } } #[derive(Debug, Clone, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] #[serde(transparent)] struct BackgroundConsultJobId(Uuid); impl BackgroundConsultJobId { fn new() -> Self { Self(Uuid::new_v4()) } fn parse(raw: String) -> Result { Uuid::parse_str(&raw) .map(Self) .map_err(|_| ConsultRequestError::InvalidJobHandle(raw)) } fn display(&self) -> String { self.0.to_string() } } #[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] struct BackgroundConsultFailure { class: String, detail: String, } #[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] struct BackgroundConsultResponseRecord { cwd: String, response: String, duration_ms: u64, duration_api_ms: Option, num_turns: u64, stop_reason: Option, session_id: Option, total_cost_usd: Option, usage: Option, model_usage: Option, permission_denials: Vec, fast_mode_state: Option, uuid: Option, } impl BackgroundConsultResponseRecord { fn from_response(response: ConsultResponse) -> Self { Self { cwd: response.cwd.display(), response: response.result, duration_ms: response.duration_ms, duration_api_ms: response.duration_api_ms, num_turns: response.num_turns, stop_reason: response.stop_reason, session_id: response.session_id, total_cost_usd: response.total_cost_usd, usage: response.usage, model_usage: response.model_usage, permission_denials: response.permission_denials, fast_mode_state: response.fast_mode_state, uuid: response.uuid, } } fn model_name(&self) -> Option { let Value::Object(models) = self.model_usage.as_ref()? else { return None; }; models.keys().next().cloned() } } #[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] struct BackgroundConsultJobRecord { job_id: BackgroundConsultJobId, status: BackgroundConsultStatus, created_unix_ms: u64, updated_unix_ms: u64, started_unix_ms: Option, finished_unix_ms: Option, runner_pid: Option, request: BackgroundConsultRequest, prompt_prefix_injected: bool, result: Option, failure: Option, } impl BackgroundConsultJobRecord { fn new(job_id: BackgroundConsultJobId, request: BackgroundConsultRequest) -> Self { let now = unix_ms_now(); Self { job_id, status: BackgroundConsultStatus::Queued, created_unix_ms: now, updated_unix_ms: now, started_unix_ms: None, finished_unix_ms: None, runner_pid: None, request, prompt_prefix_injected: true, result: None, failure: None, } } fn summary(&self) -> Value { json!({ "job_id": self.job_id.display(), "status": self.status, "done": self.status.done(), "succeeded": self.status.success(), "failed": self.status.failed(), "created_unix_ms": self.created_unix_ms, "updated_unix_ms": self.updated_unix_ms, "started_unix_ms": self.started_unix_ms, "finished_unix_ms": self.finished_unix_ms, "runner_pid": self.runner_pid, "cwd": self.request.cwd, "max_turns": self.request.max_turns, "requested_session_id": self.request.session_id, "prompt_prefix_injected": self.prompt_prefix_injected, }) } } #[derive(Debug, Error)] enum ConsultRequestError { #[error("prompt must not be empty")] EmptyPrompt, #[error("failed to resolve the current working directory: {source}")] CurrentDir { source: io::Error }, #[error("failed to resolve working directory `{path}`: {source}")] Canonicalize { path: String, source: io::Error }, #[error("working directory `{0}` is not a directory")] NotDirectory(String), #[error("max_turns must be greater than zero")] InvalidTurnLimit, #[error("session_id must be a valid UUID, got `{0}`")] InvalidSessionHandle(String), #[error("job_id must be a valid UUID, got `{0}`")] InvalidJobHandle(String), } #[derive(Debug, Error)] enum ConsultInvocationError { #[error("failed to spawn Claude Code: {0}")] Spawn(#[source] io::Error), #[error("Claude Code returned non-JSON output: {0}")] InvalidJson(String), #[error("{0}")] Downstream(String), } #[derive(Debug, Deserialize)] struct ClaudeJsonEnvelope { #[serde(rename = "type")] envelope_type: String, subtype: Option, is_error: bool, duration_ms: Option, duration_api_ms: Option, num_turns: Option, result: Option, stop_reason: Option, session_id: Option, total_cost_usd: Option, usage: Option, #[serde(rename = "modelUsage")] model_usage: Option, #[serde(default)] permission_denials: Vec, fast_mode_state: Option, uuid: Option, } #[derive(Debug)] struct ConsultResponse { cwd: WorkingDirectory, result: String, duration_ms: u64, duration_api_ms: Option, num_turns: u64, stop_reason: Option, session_id: Option, total_cost_usd: Option, usage: Option, model_usage: Option, permission_denials: Vec, fast_mode_state: Option, uuid: Option, } impl ConsultResponse { fn model_name(&self) -> Option { let Value::Object(models) = self.model_usage.as_ref()? else { return None; }; models.keys().next().cloned() } } fn deserialize Deserialize<'de>>( value: Value, operation: &str, generation: Generation, ) -> Result { serde_json::from_value(value).map_err(|error| { FaultRecord::invalid_input( generation, FaultStage::Protocol, operation, format!("invalid params: {error}"), ) }) } fn invalid_consult_request( generation: Generation, operation: &str, error: ConsultRequestError, ) -> FaultRecord { FaultRecord::invalid_input(generation, FaultStage::Worker, operation, error.to_string()) } fn consult_fault( generation: Generation, operation: &str, error: ConsultInvocationError, ) -> FaultRecord { match error { ConsultInvocationError::Spawn(source) => FaultRecord::process( generation, FaultStage::Claude, operation, source.to_string(), ), ConsultInvocationError::InvalidJson(detail) | ConsultInvocationError::Downstream(detail) => { FaultRecord::downstream(generation, FaultStage::Claude, operation, detail) } } } pub(crate) fn consult_job_tool_output( arguments: Value, generation: Generation, stage: FaultStage, operation: &str, ) -> Result { let args = deserialize::(arguments, operation, generation)?; let job_id = BackgroundConsultJobId::parse(args.job_id) .map_err(|error| invalid_consult_request(generation, operation, error))?; let record = load_background_consult_job(&job_id).map_err(|error| { FaultRecord::downstream(generation, stage, operation, error.to_string()) })?; background_job_tool_output(&record, generation, stage, operation) } pub(crate) fn consult_jobs_tool_output( arguments: Value, generation: Generation, stage: FaultStage, operation: &str, ) -> Result { let args = deserialize::(arguments, operation, generation)?; let limit = args.limit.unwrap_or(10).clamp(1, 50); let jobs = load_recent_background_consult_jobs(limit as usize).map_err(|error| { FaultRecord::downstream(generation, stage, operation, error.to_string()) })?; let concise = json!({ "jobs": jobs.iter().map(BackgroundConsultJobRecord::summary).collect::>(), "count": jobs.len(), }); let full = json!({ "jobs": jobs, "count": jobs.len(), }); let lines = if jobs.is_empty() { "no background consult jobs".to_owned() } else { jobs.iter() .map(|job| { format!( "{} status={} cwd={}", job.job_id.display(), serde_json::to_string(&job.status).unwrap_or_else(|_| "\"unknown\"".to_owned()), job.request.cwd ) }) .collect::>() .join("\n") }; fallback_detailed_tool_output( &concise, &full, lines, None, SurfaceKind::Read, generation, stage, operation, ) } fn submit_background_consult( request: &ConsultRequest, generation: Generation, stage: FaultStage, operation: &str, ) -> Result { let job_id = BackgroundConsultJobId::new(); let mut record = BackgroundConsultJobRecord::new(job_id, request.background_request()); persist_background_consult_job(&record) .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; let executable = std::env::current_exe() .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; let job_file = background_consult_job_path(&record.job_id) .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; let child = Command::new(executable) .arg("mcp") .arg("background-consult") .arg("--job-file") .arg(&job_file) .stdin(Stdio::null()) .stdout(Stdio::null()) .stderr(Stdio::null()) .spawn() .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; record.status = BackgroundConsultStatus::Running; record.started_unix_ms = Some(unix_ms_now()); record.updated_unix_ms = unix_ms_now(); record.runner_pid = Some(child.id()); persist_background_consult_job(&record) .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; let concise = json!({ "mode": request.mode().as_str(), "job_id": record.job_id.display(), "status": record.status, "done": false, "requested_session_id": request.requested_session_id(), "session_mode": request.session_mode(), "prompt_prefix_injected": true, "follow_up_tools": ["consult_job", "consult_jobs"], }); let full = json!({ "mode": request.mode().as_str(), "job_id": record.job_id.display(), "status": record.status, "done": false, "requested_session_id": request.requested_session_id(), "session_mode": request.session_mode(), "prompt_prefix_injected": true, "prompt": request.prompt.as_str(), "effective_prompt": request.prompt.rendered(), "cwd": request.cwd.display(), "max_turns": request.max_turns.map(TurnLimit::get), "follow_up_tools": ["consult_job", "consult_jobs"], }); fallback_detailed_tool_output( &concise, &full, format!( "background consult submitted job={} status=running", record.job_id.display() ), None, SurfaceKind::Read, generation, stage, operation, ) } fn execute_background_consult(job_file: PathBuf) -> io::Result<()> { let mut record = read_json_file::(&job_file)?; record.status = BackgroundConsultStatus::Running; let _ = record.runner_pid.get_or_insert(std::process::id()); let _ = record.started_unix_ms.get_or_insert_with(unix_ms_now); record.updated_unix_ms = unix_ms_now(); persist_background_consult_job_to_path(&job_file, &record)?; let request = record .request .clone() .into_consult_request() .map_err(|error| io::Error::other(error.to_string()))?; match invoke_claude(&request) { Ok(response) => { record.status = BackgroundConsultStatus::Succeeded; record.result = Some(BackgroundConsultResponseRecord::from_response(response)); record.failure = None; } Err(error) => { record.status = BackgroundConsultStatus::Failed; record.result = None; record.failure = Some(background_failure(error)); } } record.finished_unix_ms = Some(unix_ms_now()); record.updated_unix_ms = unix_ms_now(); record.runner_pid = None; persist_background_consult_job_to_path(&job_file, &record) } fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure { match error { ConsultInvocationError::Spawn(source) => BackgroundConsultFailure { class: "process".to_owned(), detail: source.to_string(), }, ConsultInvocationError::InvalidJson(detail) | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure { class: "downstream".to_owned(), detail, }, } } fn background_job_tool_output( record: &BackgroundConsultJobRecord, generation: Generation, stage: FaultStage, operation: &str, ) -> Result { let concise = json!({ "job_id": record.job_id.display(), "status": record.status, "done": record.status.done(), "succeeded": record.status.success(), "failed": record.status.failed(), "created_unix_ms": record.created_unix_ms, "updated_unix_ms": record.updated_unix_ms, "started_unix_ms": record.started_unix_ms, "finished_unix_ms": record.finished_unix_ms, "runner_pid": record.runner_pid, "cwd": record.request.cwd, "requested_session_id": record.request.session_id, "prompt_prefix_injected": record.prompt_prefix_injected, "result": record.result.as_ref().map(|result| json!({ "response": result.response, "duration_ms": result.duration_ms, "num_turns": result.num_turns, "session_id": result.session_id, "model": result.model_name(), })), "failure": record.failure, }); let full = json!({ "job": record, }); let mut lines = vec![format!( "job={} status={:?}", record.job_id.display(), record.status )]; if let Some(result) = record.result.as_ref() { lines.push(format!( "result ready model={} turns={} duration={}", result.model_name().unwrap_or_else(|| "unknown".to_owned()), result.num_turns, render_duration_ms(result.duration_ms) )); lines.push("response:".to_owned()); lines.push(result.response.clone()); } if let Some(failure) = record.failure.as_ref() { lines.push(format!("failure={} {}", failure.class, failure.detail)); } fallback_detailed_tool_output( &concise, &full, lines.join("\n"), None, SurfaceKind::Read, generation, stage, operation, ) } fn background_consult_job_root() -> io::Result { let root = state_dir() .map(|root| root.join("phone_opus")) .or_else(|| home_dir().map(|home| home.join(".local").join("state").join("phone_opus"))) .ok_or_else(|| { io::Error::new( io::ErrorKind::NotFound, "failed to resolve phone_opus state root", ) })?; let path = root.join("mcp").join("consult_jobs"); fs::create_dir_all(&path)?; Ok(path) } fn background_consult_job_path(job_id: &BackgroundConsultJobId) -> io::Result { Ok(background_consult_job_root()?.join(format!("{}.json", job_id.display()))) } fn persist_background_consult_job(record: &BackgroundConsultJobRecord) -> io::Result<()> { let path = background_consult_job_path(&record.job_id)?; persist_background_consult_job_to_path(&path, record) } fn persist_background_consult_job_to_path( path: &Path, record: &BackgroundConsultJobRecord, ) -> io::Result<()> { write_json_file(path, record) } fn load_background_consult_job( job_id: &BackgroundConsultJobId, ) -> io::Result { read_json_file(background_consult_job_path(job_id)?.as_path()) } fn load_recent_background_consult_jobs( limit: usize, ) -> io::Result> { let mut jobs = fs::read_dir(background_consult_job_root()?)? .filter_map(Result::ok) .map(|entry| entry.path()) .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("json")) .filter_map(|path| read_json_file::(&path).ok()) .collect::>(); jobs.sort_by(|left, right| { right .updated_unix_ms .cmp(&left.updated_unix_ms) .then_with(|| left.job_id.cmp(&right.job_id)) }); jobs.truncate(limit); Ok(jobs) } fn read_json_file Deserialize<'de>>(path: &Path) -> io::Result { let bytes = fs::read(path)?; serde_json::from_slice(&bytes).map_err(|error| io::Error::other(error.to_string())) } fn write_json_file(path: &Path, value: &T) -> io::Result<()> { let payload = serde_json::to_vec_pretty(value).map_err(|error| io::Error::other(error.to_string()))?; let temp_path = path.with_extension("json.tmp"); fs::write(&temp_path, payload)?; fs::rename(temp_path, path)?; Ok(()) } fn unix_ms_now() -> u64 { let duration = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or(Duration::ZERO); u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) } fn invoke_claude(request: &ConsultRequest) -> Result { let mut command = Command::new(claude_binary()); let _ = command .arg("-p") .arg("--output-format") .arg("json") .arg("--strict-mcp-config") .arg("--mcp-config") .arg(EMPTY_MCP_CONFIG) .arg("--disable-slash-commands") .arg("--no-chrome") .arg("--model") .arg(CLAUDE_MODEL) .arg("--effort") .arg(CLAUDE_EFFORT) .arg("--tools") .arg(CLAUDE_TOOLSET) .arg("--permission-mode") .arg("dontAsk"); if let Some(session) = request.session.as_ref() { let _ = command.arg("--resume").arg(session.display()); } if let Some(max_turns) = request.max_turns { let _ = command.arg("--max-turns").arg(max_turns.get().to_string()); } let output = command .current_dir(request.cwd.as_path()) .arg(request.prompt.rendered()) .output() .map_err(ConsultInvocationError::Spawn)?; let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned(); let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned(); let envelope = match serde_json::from_slice::(&output.stdout) { Ok(envelope) => envelope, Err(_error) if !output.status.success() => { return Err(ConsultInvocationError::Downstream(downstream_message( output.status.code(), &stdout, &stderr, ))); } Err(error) => { return Err(ConsultInvocationError::InvalidJson(format!( "{error}; stdout={stdout}; stderr={stderr}" ))); } }; if envelope.envelope_type != "result" { return Err(ConsultInvocationError::Downstream(format!( "unexpected Claude envelope type `{}`", envelope.envelope_type ))); } if !output.status.success() || envelope.is_error || envelope.subtype.as_deref() != Some("success") { return Err(ConsultInvocationError::Downstream( envelope .result .filter(|value| !value.trim().is_empty()) .unwrap_or_else(|| downstream_message(output.status.code(), &stdout, &stderr)), )); } Ok(ConsultResponse { cwd: request.cwd.clone(), result: envelope.result.unwrap_or_default(), duration_ms: envelope.duration_ms.unwrap_or(0), duration_api_ms: envelope.duration_api_ms, num_turns: envelope.num_turns.unwrap_or(0), stop_reason: envelope.stop_reason, session_id: envelope.session_id, total_cost_usd: envelope.total_cost_usd, usage: envelope.usage, model_usage: envelope.model_usage, permission_denials: envelope.permission_denials, fast_mode_state: envelope.fast_mode_state, uuid: envelope.uuid, }) } fn downstream_message(status_code: Option, stdout: &str, stderr: &str) -> String { if !stderr.is_empty() { return stderr.to_owned(); } if !stdout.is_empty() { return stdout.to_owned(); } format!("Claude Code exited with status {status_code:?}") } fn claude_binary() -> PathBuf { std::env::var_os(CLAUDE_BIN_ENV) .map(PathBuf::from) .unwrap_or_else(|| PathBuf::from("claude")) } fn consult_output( request: &ConsultRequest, response: &ConsultResponse, generation: Generation, operation: &str, ) -> Result { let concise = json!({ "mode": request.mode().as_str(), "response": response.result, "cwd": response.cwd.display(), "session_mode": request.session_mode(), "requested_session_id": request.requested_session_id(), "prompt_prefix_injected": true, "model": response.model_name(), "duration_ms": response.duration_ms, "num_turns": response.num_turns, "stop_reason": response.stop_reason, "session_id": response.session_id, "total_cost_usd": response.total_cost_usd, "permission_denial_count": response.permission_denials.len(), }); let full = json!({ "mode": request.mode().as_str(), "response": response.result, "cwd": response.cwd.display(), "prompt": request.prompt.as_str(), "prompt_prefix": CLAUDE_CONSULT_PREFIX, "effective_prompt": request.prompt.rendered(), "max_turns": request.max_turns.map(TurnLimit::get), "session_mode": request.session_mode(), "requested_session_id": request.requested_session_id(), "duration_ms": response.duration_ms, "duration_api_ms": response.duration_api_ms, "num_turns": response.num_turns, "stop_reason": response.stop_reason, "session_id": response.session_id, "total_cost_usd": response.total_cost_usd, "usage": response.usage, "model_usage": response.model_usage, "permission_denials": response.permission_denials, "fast_mode_state": response.fast_mode_state, "uuid": response.uuid, }); fallback_detailed_tool_output( &concise, &full, concise_text(request, response), Some(full_text(request, response)), SurfaceKind::Read, generation, FaultStage::Worker, operation, ) } fn concise_text(request: &ConsultRequest, response: &ConsultResponse) -> String { let mut status = vec![ "consult ok".to_owned(), format!("session={}", request.session_mode()), format!("turns={}", response.num_turns), format!("duration={}", render_duration_ms(response.duration_ms)), ]; if let Some(model) = response.model_name() { status.push(format!("model={model}")); } if let Some(stop_reason) = response.stop_reason.as_deref() { status.push(format!("stop={stop_reason}")); } if let Some(cost) = response.total_cost_usd { status.push(format!("cost=${cost:.6}")); } let mut lines = vec![status.join(" ")]; lines.push(format!("cwd: {}", response.cwd.display())); if let Some(session_id) = request.requested_session_id() { lines.push(format!("requested_session: {session_id}")); } if let Some(session_id) = response.session_id.as_deref() { lines.push(format!("session: {session_id}")); } if !response.permission_denials.is_empty() { lines.push(format!( "permission_denials: {}", response.permission_denials.len() )); } lines.push("response:".to_owned()); lines.push(response.result.clone()); lines.join("\n") } fn full_text(request: &ConsultRequest, response: &ConsultResponse) -> String { let mut lines = vec![ format!( "consult ok session={} turns={}", request.session_mode(), response.num_turns ), format!("cwd: {}", response.cwd.display()), format!("duration: {}", render_duration_ms(response.duration_ms)), ]; if let Some(session_id) = request.requested_session_id() { lines.push(format!("requested_session: {session_id}")); } if let Some(duration_api_ms) = response.duration_api_ms { lines.push(format!( "api_duration: {}", render_duration_ms(duration_api_ms) )); } if let Some(model) = response.model_name() { lines.push(format!("model: {model}")); } if let Some(stop_reason) = response.stop_reason.as_deref() { lines.push(format!("stop: {stop_reason}")); } if let Some(session_id) = response.session_id.as_deref() { lines.push(format!("session: {session_id}")); } if let Some(cost) = response.total_cost_usd { lines.push(format!("cost_usd: {cost:.6}")); } lines.push(format!( "permission_denials: {}", response.permission_denials.len() )); if let Some(fast_mode_state) = response.fast_mode_state.as_deref() { lines.push(format!("fast_mode: {fast_mode_state}")); } if let Some(uuid) = response.uuid.as_deref() { lines.push(format!("uuid: {uuid}")); } if let Some(usage) = usage_summary(response.usage.as_ref()) { lines.push(format!("usage: {usage}")); } lines.push("response:".to_owned()); lines.push(response.result.clone()); lines.join("\n") } fn usage_summary(usage: Option<&Value>) -> Option { let Value::Object(usage) = usage? else { return None; }; let summary = usage .iter() .filter_map(|(key, value)| match value { Value::Number(number) => Some((key.clone(), number.to_string())), Value::String(text) if !text.is_empty() => Some((key.clone(), text.clone())), _ => None, }) .collect::>(); (!summary.is_empty()).then(|| { summary .into_iter() .map(|(key, value)| format!("{key}={value}")) .collect::>() .join(" ") }) } fn render_duration_ms(duration_ms: u64) -> String { if duration_ms < 1_000 { return format!("{duration_ms}ms"); } let seconds = duration_ms as f64 / 1_000.0; format!("{seconds:.3}s") } fn generation_from_wire(raw: u64) -> Generation { let mut generation = Generation::genesis(); for _ in 1..raw { generation = generation.next(); } generation }