use std::collections::{BTreeMap, BTreeSet}; use std::fs; use std::io::{self, BufRead, Read, Write}; #[cfg(unix)] use std::os::unix::fs::symlink; use std::path::{Path, PathBuf}; use std::process::{Child, Command, ExitStatus, Stdio}; use std::sync::mpsc; use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use dirs::{home_dir, state_dir}; use libmcp::{Generation, SurfaceKind}; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use thiserror::Error; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use users::get_current_uid; use uuid::Uuid; use crate::mcp::fault::{ConsultFaultContext, FaultContext, FaultRecord, FaultStage}; use crate::mcp::output::{ ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success, }; use crate::mcp::protocol::{ CLAUDE_BIN_ENV, CLAUDE_CONSULT_PREFIX, CLAUDE_EFFORT, CLAUDE_MODEL, CLAUDE_TOOLSET, EMPTY_MCP_CONFIG, render_claude_toolset, }; pub(crate) fn run_worker(generation: u64) -> Result<(), Box> { let generation = generation_from_wire(generation); let stdin = io::stdin(); let mut stdout = io::stdout().lock(); let mut service = WorkerService::new(generation); for line in stdin.lock().lines() { let line = line?; if line.trim().is_empty() { continue; } let request = serde_json::from_str::(&line)?; let response = match request { crate::mcp::protocol::WorkerRequest::Execute { id, operation } => { let outcome = match service.execute(operation) { Ok(result) => crate::mcp::protocol::WorkerOutcome::Success { result }, Err(fault) => crate::mcp::protocol::WorkerOutcome::Fault { fault }, }; crate::mcp::protocol::WorkerResponse { id, outcome } } }; serde_json::to_writer(&mut stdout, &response)?; stdout.write_all(b"\n")?; stdout.flush()?; } Ok(()) } struct WorkerService { generation: Generation, } impl WorkerService { fn new(generation: Generation) -> Self { Self { generation } } fn execute( &mut self, operation: crate::mcp::protocol::WorkerOperation, ) -> Result { match 
operation { crate::mcp::protocol::WorkerOperation::CallTool { name, arguments } => { self.call_tool(&name, arguments) } } } fn call_tool(&mut self, name: &str, arguments: Value) -> Result { let operation = format!("tools/call:{name}"); let (presentation, arguments) = split_presentation(arguments, &operation, self.generation, FaultStage::Worker)?; let output = match name { "consult" => { let args = deserialize::(arguments, &operation, self.generation)?; let request = ConsultRequest::parse(args) .map_err(|error| invalid_consult_request(self.generation, &operation, error))?; let response = invoke_claude(&request) .map_err(|error| consult_fault(self.generation, &operation, &request, error))?; consult_output(&request, &response, self.generation, &operation)? } other => { return Err(FaultRecord::invalid_input( self.generation, FaultStage::Worker, &operation, format!("unknown worker tool `{other}`"), )); } }; tool_success( output, presentation, self.generation, FaultStage::Worker, &operation, ) } } #[derive(Debug, Deserialize)] struct ConsultArgs { prompt: String, cwd: Option, } #[derive(Debug, Clone)] struct ConsultRequest { prompt: PromptText, cwd: WorkingDirectory, session: SessionHandle, } impl ConsultRequest { fn parse(args: ConsultArgs) -> Result { let prompt = PromptText::parse(args.prompt)?; let cwd = WorkingDirectory::resolve(args.cwd)?; Ok(Self { prompt, cwd, session: SessionHandle::fresh(), }) } fn session_id(&self) -> String { self.session.display() } } #[derive(Debug, Clone)] struct PromptText { original: String, rendered: String, } impl PromptText { fn parse(raw: String) -> Result { if raw.trim().is_empty() { return Err(ConsultRequestError::EmptyPrompt); } Ok(Self { rendered: format!("{CLAUDE_CONSULT_PREFIX}\n\n{raw}"), original: raw, }) } fn as_str(&self) -> &str { self.original.as_str() } fn rendered(&self) -> &str { self.rendered.as_str() } } #[derive(Debug, Clone)] struct WorkingDirectory(PathBuf); impl WorkingDirectory { fn resolve(raw: Option) -> 
Result { let requested = raw.map(PathBuf::from); let base = requested .as_ref() .is_none_or(|path| !path.is_absolute()) .then(|| { std::env::current_dir().map_err(|source| ConsultRequestError::CurrentDir { source }) }) .transpose()?; let candidate = Self::resolve_candidate(base.as_deref(), requested)?; Self::canonicalize_directory(candidate) } fn resolve_candidate( base: Option<&Path>, requested: Option, ) -> Result { match requested { Some(path) if path.is_absolute() => Ok(path), Some(path) => Ok(Self::require_base(base)?.join(path)), None => Ok(Self::require_base(base)?.to_path_buf()), } } fn require_base(base: Option<&Path>) -> Result<&Path, ConsultRequestError> { base.ok_or_else(|| ConsultRequestError::CurrentDir { source: io::Error::new( io::ErrorKind::NotFound, "current working directory is unavailable", ), }) } fn canonicalize_directory(candidate: PathBuf) -> Result { let canonical = candidate .canonicalize() .map_err(|source| ConsultRequestError::Canonicalize { path: candidate.display().to_string(), source, })?; if !canonical.is_dir() { return Err(ConsultRequestError::NotDirectory( canonical.display().to_string(), )); } Ok(Self(canonical)) } fn as_path(&self) -> &Path { self.0.as_path() } fn display(&self) -> String { self.0.display().to_string() } } #[derive(Debug, Clone)] struct SessionHandle(Uuid); impl SessionHandle { fn fresh() -> Self { Self(Uuid::new_v4()) } fn display(&self) -> String { self.0.to_string() } } #[derive(Debug, Error)] enum ConsultRequestError { #[error("prompt must not be empty")] EmptyPrompt, #[error("failed to resolve the current working directory: {source}")] CurrentDir { source: io::Error }, #[error("failed to resolve working directory `{path}`: {source}")] Canonicalize { path: String, source: io::Error }, #[error("working directory `{0}` is not a directory")] NotDirectory(String), } #[derive(Debug, Error)] enum ConsultInvocationError { #[error("failed to spawn Claude Code: {0}")] Spawn(#[source] io::Error), #[error("Claude Code 
returned non-JSON output: {0}")] InvalidJson(String), #[error("{0}")] Stalled(String), #[error("{0}")] Downstream(String), } #[derive(Debug, Deserialize)] struct ClaudeJsonEnvelope { #[serde(rename = "type")] envelope_type: String, subtype: Option, is_error: bool, duration_ms: Option, duration_api_ms: Option, num_turns: Option, result: Option, stop_reason: Option, total_cost_usd: Option, usage: Option, #[serde(rename = "modelUsage")] model_usage: Option, #[serde(default)] permission_denials: Vec, fast_mode_state: Option, uuid: Option, } #[derive(Debug)] struct ConsultResponse { cwd: WorkingDirectory, result: String, persisted_output_path: PersistedConsultPath, duration_ms: u64, duration_api_ms: Option, num_turns: u64, stop_reason: Option, total_cost_usd: Option, usage: Option, model_usage: Option, permission_denials: Vec, fast_mode_state: Option, uuid: Option, } impl ConsultResponse { fn model_name(&self) -> Option { model_name(self.model_usage.as_ref()) } } #[derive(Debug)] struct CompletedClaudeInvocation { status: ExitStatus, stdout: String, stderr: String, } #[derive(Debug)] struct ClaudeProgressMonitor { transcript: ClaudeTranscriptMonitor, last_progress_at: Instant, stall_timeout: Duration, } impl ClaudeProgressMonitor { fn new(transcript: ClaudeTranscriptMonitor) -> Self { Self { transcript, last_progress_at: Instant::now(), stall_timeout: claude_stall_timeout(), } } fn record_pipe_activity(&mut self) { self.last_progress_at = Instant::now(); } fn record_transcript_activity(&mut self) -> io::Result<()> { if self.transcript.observe_progress()? 
{ self.last_progress_at = Instant::now(); } Ok(()) } fn stalled(&self) -> bool { self.last_progress_at.elapsed() >= self.stall_timeout } fn stall_message(&self) -> String { let timeout_ms = u64::try_from(self.stall_timeout.as_millis()).unwrap_or(u64::MAX); match self.transcript.current_path() { Some(path) => format!( "Claude made no observable progress for {} while running {}; transcript={}", render_duration_ms(timeout_ms), self.transcript.session_id(), path.display() ), None => format!( "Claude made no observable progress for {} while running {}", render_duration_ms(timeout_ms), self.transcript.session_id() ), } } } #[derive(Debug)] struct ClaudeTranscriptMonitor { projects_root: PathBuf, session_id: String, resolved_path: Option, observed_signature: Option, } impl ClaudeTranscriptMonitor { fn new(projects_root: PathBuf, session_id: String) -> Self { Self { projects_root, session_id, resolved_path: None, observed_signature: None, } } fn session_id(&self) -> &str { self.session_id.as_str() } fn current_path(&self) -> Option<&Path> { self.resolved_path.as_deref() } fn observe_progress(&mut self) -> io::Result { if self.resolved_path.is_none() { self.resolved_path = self.find_transcript_path()?; } let Some(path) = self.resolved_path.as_ref() else { return Ok(false); }; let metadata = match fs::metadata(path) { Ok(metadata) => metadata, Err(error) if error.kind() == io::ErrorKind::NotFound => { self.resolved_path = None; self.observed_signature = None; return Ok(false); } Err(error) => return Err(error), }; let signature = TranscriptProgressSignature::from_metadata(&metadata)?; let progressed = self.observed_signature.as_ref() != Some(&signature); self.observed_signature = Some(signature); Ok(progressed) } fn find_transcript_path(&self) -> io::Result> { let entries = match fs::read_dir(&self.projects_root) { Ok(entries) => entries, Err(error) if error.kind() == io::ErrorKind::NotFound => return Ok(None), Err(error) => return Err(error), }; for entry in entries { let 
entry = entry?; if !entry.file_type()?.is_dir() { continue; } let candidate = entry.path().join(format!("{}.jsonl", self.session_id)); if candidate.exists() { return Ok(Some(candidate)); } } Ok(None) } } #[derive(Debug, Clone, Eq, PartialEq)] struct TranscriptProgressSignature { modified_unix_nanos: u128, length_bytes: u64, } impl TranscriptProgressSignature { fn from_metadata(metadata: &fs::Metadata) -> io::Result { Ok(Self { modified_unix_nanos: metadata .modified()? .duration_since(UNIX_EPOCH) .map_err(|error| io::Error::other(format!("invalid transcript mtime: {error}")))? .as_nanos(), length_bytes: metadata.len(), }) } } const SYSTEMD_RUN_BINARY: &str = "systemd-run"; const SYSTEMCTL_BINARY: &str = "systemctl"; const DEFAULT_PATH: &str = "/usr/local/bin:/usr/bin:/bin"; const PHONE_OPUS_STATE_ROOT_NAME: &str = "phone_opus"; const CLAUDE_HOME_DIR_NAME: &str = "claude-home"; const XDG_CONFIG_DIR_NAME: &str = "xdg-config"; const XDG_CACHE_DIR_NAME: &str = "xdg-cache"; const XDG_STATE_DIR_NAME: &str = "xdg-state"; const SHARED_TMP_ROOTS: [&str; 2] = ["/tmp", "/var/tmp"]; const CONSULT_OUTPUT_ROOT: &str = "/tmp/phone_opus-consults"; const CONSULT_OUTPUT_KEEP_COUNT: usize = 256; const CONSULT_OUTPUT_MAX_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60); const CLAUDE_PROGRESS_POLL_INTERVAL: Duration = Duration::from_millis(250); const DEFAULT_CLAUDE_STALL_TIMEOUT: Duration = Duration::from_secs(20 * 60); const CONSULT_TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] = time::macros::format_description!("[year][month][day]T[hour][minute][second]Z"); const CLAUDE_MIRROR_FILES: [&str; 4] = [ "settings.json", "settings.local.json", ".claude/settings.local.json", "CLAUDE.md", ]; const SERVICE_ENV_ALLOWLIST: [&str; 19] = [ "LANG", "LC_ALL", "LC_CTYPE", "TERM", "COLORTERM", "USER", "LOGNAME", "HTTP_PROXY", "HTTPS_PROXY", "NO_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "no_proxy", "all_proxy", "SSL_CERT_FILE", "SSL_CERT_DIR", "REQUESTS_CA_BUNDLE", 
"NODE_EXTRA_CA_CERTS", ]; const SERVICE_ENV_PREFIX_ALLOWLIST: [&str; 2] = ["ANTHROPIC_", "PHONE_OPUS_TEST_"]; #[derive(Debug, Clone)] struct ClaudeSandbox { source_home: PathBuf, state_root: PathBuf, claude_home: PathBuf, xdg_config_home: PathBuf, xdg_cache_home: PathBuf, xdg_state_home: PathBuf, } impl ClaudeSandbox { fn prepare() -> io::Result { let source_home = caller_home_dir().ok_or_else(|| { io::Error::new( io::ErrorKind::NotFound, "failed to resolve the caller home directory", ) })?; let state_root = phone_opus_state_root()?; let sandbox = Self { source_home, claude_home: state_root.join(CLAUDE_HOME_DIR_NAME), xdg_config_home: state_root.join(XDG_CONFIG_DIR_NAME), xdg_cache_home: state_root.join(XDG_CACHE_DIR_NAME), xdg_state_home: state_root.join(XDG_STATE_DIR_NAME), state_root, }; sandbox.create_layout()?; sandbox.sync_seed_claude_files()?; Ok(sandbox) } fn create_layout(&self) -> io::Result<()> { fs::create_dir_all(&self.claude_home)?; fs::create_dir_all(self.claude_config_dir())?; fs::create_dir_all(&self.xdg_config_home)?; fs::create_dir_all(&self.xdg_cache_home)?; fs::create_dir_all(&self.xdg_state_home)?; Ok(()) } fn claude_config_dir(&self) -> PathBuf { self.claude_home.join(".claude") } fn source_claude_dir(&self) -> PathBuf { self.source_home.join(".claude") } fn source_credentials_path(&self) -> PathBuf { self.source_claude_dir().join(".credentials.json") } fn destination_credentials_path(&self) -> PathBuf { self.claude_config_dir().join(".credentials.json") } fn sync_seed_claude_files(&self) -> io::Result<()> { let source_root = self.source_claude_dir(); let destination_root = self.claude_config_dir(); for relative in CLAUDE_MIRROR_FILES { sync_optional_seed_file( source_root.join(relative).as_path(), destination_root.join(relative).as_path(), )?; } self.sync_live_credentials()?; Ok(()) } fn read_only_paths(&self, request: &ConsultRequest) -> BTreeSet { let cwd = request.cwd.as_path(); let mut paths = BTreeSet::from([self.source_home.clone()]); 
if self.force_read_only_cwd(cwd) { let _ = paths.insert(cwd.to_path_buf()); } paths } fn force_read_only_cwd(&self, cwd: &Path) -> bool { self.read_write_paths() .iter() .any(|path| cwd.starts_with(path)) && !SHARED_TMP_ROOTS.iter().any(|root| cwd == Path::new(root)) } fn read_write_paths(&self) -> BTreeSet { let mut paths = BTreeSet::new(); let _ = paths.insert(self.state_root.clone()); if self.source_credentials_path().exists() { let _ = paths.insert(self.source_credentials_path()); } for root in SHARED_TMP_ROOTS { let _ = paths.insert(PathBuf::from(root)); } paths } fn service_environment(&self) -> BTreeMap { let runtime_dir = caller_runtime_dir(); let mut environment = BTreeMap::from([ ("HOME".to_owned(), self.claude_home.display().to_string()), ( "XDG_CONFIG_HOME".to_owned(), self.xdg_config_home.display().to_string(), ), ( "XDG_CACHE_HOME".to_owned(), self.xdg_cache_home.display().to_string(), ), ( "XDG_STATE_HOME".to_owned(), self.xdg_state_home.display().to_string(), ), ( "XDG_RUNTIME_DIR".to_owned(), runtime_dir.display().to_string(), ), ( "DBUS_SESSION_BUS_ADDRESS".to_owned(), caller_dbus_session_bus_address(runtime_dir.as_path()), ), ("PATH".to_owned(), caller_path()), ]); for name in SERVICE_ENV_ALLOWLIST { if let Some(value) = caller_env(name) { let _ = environment.insert(name.to_owned(), value); } } for (name, value) in std::env::vars() { if SERVICE_ENV_PREFIX_ALLOWLIST .iter() .any(|prefix| name.starts_with(prefix)) { let _ = environment.insert(name, value); } } environment } fn sync_live_credentials(&self) -> io::Result<()> { let source = self.source_credentials_path(); let destination = self.destination_credentials_path(); if !source.exists() { remove_optional_path(destination.as_path())?; return Ok(()); } if fs::read_link(destination.as_path()).ok().as_ref() == Some(&source) { return Ok(()); } remove_optional_path(destination.as_path())?; symlink(source.as_path(), destination.as_path()) } } #[derive(Debug, Clone)] struct 
PersistedConsultPath(PathBuf); impl PersistedConsultPath { fn new(request: &ConsultRequest) -> io::Result { fs::create_dir_all(CONSULT_OUTPUT_ROOT)?; let timestamp = OffsetDateTime::now_utc() .format(CONSULT_TIMESTAMP_FORMAT) .map_err(|error| io::Error::other(error.to_string()))?; let slug = consult_slug(request.prompt.as_str()); Ok(Self(Path::new(CONSULT_OUTPUT_ROOT).join(format!( "{timestamp}-{slug}-{}.json", Uuid::new_v4() )))) } fn as_path(&self) -> &Path { self.0.as_path() } fn display(&self) -> String { self.0.display().to_string() } } fn deserialize Deserialize<'de>>( value: Value, operation: &str, generation: Generation, ) -> Result { serde_json::from_value(value).map_err(|error| { FaultRecord::invalid_input( generation, FaultStage::Protocol, operation, format!("invalid params: {error}"), ) }) } fn invalid_consult_request( generation: Generation, operation: &str, error: ConsultRequestError, ) -> FaultRecord { FaultRecord::invalid_input(generation, FaultStage::Worker, operation, error.to_string()) } fn consult_fault( generation: Generation, operation: &str, request: &ConsultRequest, error: ConsultInvocationError, ) -> FaultRecord { let context = consult_fault_context(request, &error); let record = match error { ConsultInvocationError::Spawn(source) => FaultRecord::process( generation, FaultStage::Claude, operation, source.to_string(), ), ConsultInvocationError::InvalidJson(detail) | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => { FaultRecord::downstream(generation, FaultStage::Claude, operation, detail) } }; record.with_context(context) } fn consult_fault_context(request: &ConsultRequest, error: &ConsultInvocationError) -> FaultContext { let detail = match error { ConsultInvocationError::Spawn(_) => None, ConsultInvocationError::InvalidJson(detail) | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => Some(detail.as_str()), }; let quota_reset_hint = 
detail.and_then(quota_reset_hint); let quota_limited = quota_reset_hint.is_some(); let retry_hint = consult_retry_hint(quota_limited, error); FaultContext { consult: Some(ConsultFaultContext { cwd: request.cwd.display(), quota_limited, quota_reset_hint, retry_hint, }), } } fn quota_reset_hint(detail: &str) -> Option { let (_, suffix) = detail.split_once("resets ")?; let hint = suffix.trim(); (!hint.is_empty()).then(|| hint.to_owned()) } fn consult_retry_hint(quota_limited: bool, error: &ConsultInvocationError) -> Option { if quota_limited { return Some("wait for the quota window to reset, then retry the consult".to_owned()); } match error { ConsultInvocationError::Stalled(_) => { Some("Claude stopped making observable progress; retry the consult".to_owned()) } ConsultInvocationError::Spawn(_) | ConsultInvocationError::InvalidJson(_) | ConsultInvocationError::Downstream(_) => None, } } fn write_json_file(path: &Path, value: &T) -> io::Result<()> { let payload = serde_json::to_vec_pretty(value).map_err(|error| io::Error::other(error.to_string()))?; let temp_path = path.with_extension("json.tmp"); fs::write(&temp_path, payload)?; fs::rename(temp_path, path)?; Ok(()) } fn unix_ms_now() -> u64 { let duration = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or(Duration::ZERO); u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) } fn invoke_claude(request: &ConsultRequest) -> Result { let sandbox = ClaudeSandbox::prepare().map_err(ConsultInvocationError::Spawn)?; let unit_name = claude_unit_name(request); let transcript = ClaudeTranscriptMonitor::new( sandbox.claude_config_dir().join("projects"), request.session_id(), ); let mut command = Command::new(SYSTEMD_RUN_BINARY); let runtime_dir = caller_runtime_dir(); let _ = command .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str()) .env( "DBUS_SESSION_BUS_ADDRESS", caller_dbus_session_bus_address(runtime_dir.as_path()), ) .arg(format!("--unit={unit_name}")) .arg("--user") .arg("--wait") .arg("--pipe") 
.arg("--collect") .arg("--quiet") .arg("--working-directory") .arg(request.cwd.as_path()); for property in [ "ProtectSystem=strict", "NoNewPrivileges=yes", "PrivateDevices=yes", "RestrictSUIDSGID=yes", "RestrictAddressFamilies=AF_UNIX AF_INET AF_INET6", ] { let _ = command.arg("-p").arg(property); } for path in sandbox.read_write_paths() { let _ = command .arg("-p") .arg(format!("ReadWritePaths={}", path.display())); } for path in sandbox.read_only_paths(request) { let _ = command .arg("-p") .arg(format!("ReadOnlyPaths={}", path.display())); } for (name, value) in sandbox.service_environment() { let _ = command.arg(format!("--setenv={name}={value}")); } let _ = command .arg(claude_binary()) .arg("-p") .arg("--output-format") .arg("json") .arg("--strict-mcp-config") .arg("--mcp-config") .arg(EMPTY_MCP_CONFIG) .arg("--disable-slash-commands") .arg("--no-chrome") .arg("--model") .arg(CLAUDE_MODEL) .arg("--effort") .arg(CLAUDE_EFFORT) .arg("--tools") .arg(render_claude_toolset(CLAUDE_TOOLSET)) .arg("--dangerously-skip-permissions") .arg("--session-id") .arg(request.session_id()) .stdout(Stdio::piped()) .stderr(Stdio::piped()); let child = command .arg(request.prompt.rendered()) .spawn() .map_err(ConsultInvocationError::Spawn)?; let output = wait_for_claude_completion(child, unit_name.as_str(), transcript)?; let status = output.status; let stdout = output.stdout.trim().to_owned(); let stderr = output.stderr.trim().to_owned(); let envelope = parse_claude_json_envelope(&stdout, &stderr, status.code())?; if envelope.envelope_type != "result" { return Err(ConsultInvocationError::InvalidJson(format!( "unexpected Claude envelope type `{}`", envelope.envelope_type ))); } if !status.success() || envelope.is_error || envelope.subtype.as_deref() != Some("success") { return Err(ConsultInvocationError::Downstream( envelope .result .filter(|value| !value.trim().is_empty()) .unwrap_or_else(|| downstream_message(status.code(), &stdout, &stderr)), )); } let result = 
envelope.result.clone().unwrap_or_default(); let persisted_output_path = persist_consult_output(request, &result, &envelope) .map_err(ConsultInvocationError::Spawn)?; Ok(ConsultResponse { cwd: request.cwd.clone(), result, persisted_output_path, duration_ms: envelope.duration_ms.unwrap_or(0), duration_api_ms: envelope.duration_api_ms, num_turns: envelope.num_turns.unwrap_or(0), stop_reason: envelope.stop_reason, total_cost_usd: envelope.total_cost_usd, usage: envelope.usage, model_usage: envelope.model_usage, permission_denials: envelope.permission_denials, fast_mode_state: envelope.fast_mode_state, uuid: envelope.uuid, }) } fn wait_for_claude_completion( mut child: Child, unit_name: &str, transcript: ClaudeTranscriptMonitor, ) -> Result { let stdout = child .stdout .take() .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stdout")))?; let stderr = child .stderr .take() .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stderr")))?; let (activity_tx, activity_rx) = mpsc::channel::<()>(); let stdout_reader = spawn_pipe_reader(stdout, activity_tx.clone()); let stderr_reader = spawn_pipe_reader(stderr, activity_tx); let mut progress = ClaudeProgressMonitor::new(transcript); loop { while activity_rx.try_recv().is_ok() { progress.record_pipe_activity(); } progress .record_transcript_activity() .map_err(ConsultInvocationError::Spawn)?; if let Some(status) = child.try_wait().map_err(ConsultInvocationError::Spawn)? 
{ return Ok(CompletedClaudeInvocation { status, stdout: join_output_reader(stdout_reader, "stdout")?, stderr: join_output_reader(stderr_reader, "stderr")?, }); } if progress.stalled() { terminate_claude_invocation(unit_name, &mut child); let stdout = join_output_reader(stdout_reader, "stdout").unwrap_or_default(); let stderr = join_output_reader(stderr_reader, "stderr").unwrap_or_default(); let mut detail = progress.stall_message(); if !stderr.trim().is_empty() { detail.push_str("; stderr="); detail.push_str(stderr.trim()); } else if !stdout.trim().is_empty() { detail.push_str("; stdout="); detail.push_str(stdout.trim()); } return Err(ConsultInvocationError::Stalled(detail)); } thread::sleep(CLAUDE_PROGRESS_POLL_INTERVAL); } } fn parse_claude_json_envelope( stdout: &str, stderr: &str, status_code: Option, ) -> Result { serde_json::from_str(stdout).map_err(|error| { if status_code == Some(0) { ConsultInvocationError::InvalidJson(format!( "{error}; stdout={stdout}; stderr={stderr}" )) } else { ConsultInvocationError::Downstream(downstream_message(status_code, stdout, stderr)) } }) } fn downstream_message(status_code: Option, stdout: &str, stderr: &str) -> String { if !stderr.is_empty() { return stderr.to_owned(); } if !stdout.is_empty() { return stdout.to_owned(); } format!("Claude Code exited with status {status_code:?}") } fn spawn_pipe_reader( mut pipe: impl Read + Send + 'static, activity_tx: mpsc::Sender<()>, ) -> thread::JoinHandle> { thread::spawn(move || { let mut bytes = Vec::new(); let mut buffer = [0_u8; 8192]; loop { let read = pipe.read(&mut buffer)?; if read == 0 { break; } bytes.extend_from_slice(&buffer[..read]); let _ = activity_tx.send(()); } Ok(String::from_utf8_lossy(&bytes).into_owned()) }) } fn join_output_reader( reader: thread::JoinHandle>, stream_name: &str, ) -> Result { reader .join() .map_err(|_| { ConsultInvocationError::Spawn(io::Error::other(format!( "Claude {stream_name} reader panicked" ))) })? 
.map_err(ConsultInvocationError::Spawn) } fn terminate_claude_invocation(unit_name: &str, child: &mut Child) { let _ = stop_transient_unit(unit_name); let _ = child.kill(); let _ = child.wait(); } fn stop_transient_unit(unit_name: &str) -> io::Result<()> { let runtime_dir = caller_runtime_dir(); let status = Command::new(SYSTEMCTL_BINARY) .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str()) .env( "DBUS_SESSION_BUS_ADDRESS", caller_dbus_session_bus_address(runtime_dir.as_path()), ) .stdout(Stdio::null()) .stderr(Stdio::null()) .arg("--user") .arg("stop") .arg(unit_name) .status()?; let _ = Command::new(SYSTEMCTL_BINARY) .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str()) .env( "DBUS_SESSION_BUS_ADDRESS", caller_dbus_session_bus_address(runtime_dir.as_path()), ) .stdout(Stdio::null()) .stderr(Stdio::null()) .arg("--user") .arg("reset-failed") .arg(unit_name) .status(); if status.success() { Ok(()) } else { Err(io::Error::other(format!( "failed to stop transient unit `{unit_name}`" ))) } } fn claude_unit_name(request: &ConsultRequest) -> String { format!("phone-opus-claude-{}", request.session_id()) } fn claude_stall_timeout() -> Duration { std::env::var("PHONE_OPUS_CLAUDE_STALL_TIMEOUT_MS") .ok() .and_then(|value| value.parse::().ok()) .filter(|value| *value > 0) .map(Duration::from_millis) .unwrap_or(DEFAULT_CLAUDE_STALL_TIMEOUT) } fn claude_binary() -> PathBuf { std::env::var_os(CLAUDE_BIN_ENV) .map(PathBuf::from) .unwrap_or_else(|| PathBuf::from("claude")) } fn phone_opus_state_root() -> io::Result { let base = std::env::var_os("XDG_STATE_HOME") .filter(|value| !value.is_empty()) .map(PathBuf::from) .or_else(state_dir) .or_else(|| caller_home_dir().map(|home| home.join(".local").join("state"))) .ok_or_else(|| { io::Error::new( io::ErrorKind::NotFound, "failed to resolve phone_opus state root", ) })?; let root = base.join(PHONE_OPUS_STATE_ROOT_NAME); fs::create_dir_all(&root)?; Ok(root) } fn caller_home_dir() -> Option { std::env::var_os("HOME") .filter(|value| 
!value.is_empty()) .map(PathBuf::from) .or_else(home_dir) } fn caller_env(name: &str) -> Option { std::env::var(name).ok().filter(|value| !value.is_empty()) } fn caller_path() -> String { caller_env("PATH").unwrap_or_else(|| DEFAULT_PATH.to_owned()) } fn caller_runtime_dir() -> PathBuf { std::env::var_os("XDG_RUNTIME_DIR") .filter(|value| !value.is_empty()) .map(PathBuf::from) .unwrap_or_else(|| PathBuf::from(format!("/run/user/{}", get_current_uid()))) } fn caller_dbus_session_bus_address(runtime_dir: &Path) -> String { caller_env("DBUS_SESSION_BUS_ADDRESS") .unwrap_or_else(|| format!("unix:path={}", runtime_dir.join("bus").display())) } fn sync_optional_seed_file(source: &Path, destination: &Path) -> io::Result<()> { if source.exists() { let bytes = fs::read(source)?; write_bytes_file(destination, &bytes)?; } else if destination.exists() { fs::remove_file(destination)?; } Ok(()) } fn remove_optional_path(path: &Path) -> io::Result<()> { match fs::symlink_metadata(path) { Ok(metadata) if metadata.is_dir() => fs::remove_dir_all(path), Ok(_) => fs::remove_file(path), Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(()), Err(error) => Err(error), } } fn write_bytes_file(path: &Path, bytes: &[u8]) -> io::Result<()> { let parent = path.parent().ok_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, format!("path `{}` has no parent directory", path.display()), ) })?; fs::create_dir_all(parent)?; let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4())); fs::write(&temp_path, bytes)?; fs::rename(temp_path, path)?; Ok(()) } fn persist_consult_output( request: &ConsultRequest, result: &str, envelope: &ClaudeJsonEnvelope, ) -> io::Result { let path = PersistedConsultPath::new(request)?; let saved_at = OffsetDateTime::now_utc() .format(&Rfc3339) .map_err(|error| io::Error::other(error.to_string()))?; let artifact = json!({ "kind": "phone_opus_consult", "saved_at": saved_at, "saved_unix_ms": unix_ms_now(), "cwd": request.cwd.display(), "prompt": 
request.prompt.as_str(), "prompt_prefix": CLAUDE_CONSULT_PREFIX, "effective_prompt": request.prompt.rendered(), "response": result, "model": model_name(envelope.model_usage.as_ref()), "duration_ms": envelope.duration_ms.unwrap_or(0), "duration_api_ms": envelope.duration_api_ms, "num_turns": envelope.num_turns.unwrap_or(0), "stop_reason": envelope.stop_reason, "total_cost_usd": envelope.total_cost_usd, "usage": envelope.usage, "model_usage": envelope.model_usage, "permission_denials": envelope.permission_denials, "fast_mode_state": envelope.fast_mode_state, "uuid": envelope.uuid, }); write_json_file(path.as_path(), &artifact)?; let _ = prune_persisted_consult_outputs(); Ok(path) }

/// Deletes persisted consult artifacts that fall outside the retention policy.
///
/// Only regular files with a `json` extension directly under `CONSULT_OUTPUT_ROOT` are
/// considered. They are ordered by file name, descending. Every entry past the first
/// `CONSULT_OUTPUT_KEEP_COUNT` is removed, and a retained entry is also removed when its
/// modification time lies more than `CONSULT_OUTPUT_MAX_AGE` in the past.
///
/// # Errors
///
/// Returns the error from `fs::read_dir` when the root exists but cannot be listed.
/// Failures to inspect or delete an individual file are deliberately ignored.
fn prune_persisted_consult_outputs() -> io::Result<()> {
    let root = Path::new(CONSULT_OUTPUT_ROOT);
    if !root.exists() {
        return Ok(());
    }
    let now = SystemTime::now();
    let mut artifacts = fs::read_dir(root)?
        .filter_map(Result::ok)
        .filter(|entry| entry.file_type().is_ok_and(|kind| kind.is_file()))
        .map(|entry| entry.path())
        .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("json"))
        .collect::<Vec<_>>();
    // Descending by file name: the entries that sort highest are the ones retained.
    artifacts.sort_unstable_by(|left, right| right.file_name().cmp(&left.file_name()));
    for (rank, path) in artifacts.iter().enumerate() {
        let over_quota = rank >= CONSULT_OUTPUT_KEEP_COUNT;
        // An unreadable mtime, or an mtime later than `now`, counts as "not expired".
        let expired = || {
            path.metadata()
                .and_then(|metadata| metadata.modified())
                .ok()
                .and_then(|modified| now.duration_since(modified).ok())
                .is_some_and(|age| age > CONSULT_OUTPUT_MAX_AGE)
        };
        if over_quota || expired() {
            // Best effort: a file that cannot be removed is simply left in place.
            let _ = fs::remove_file(path);
        }
    }
    Ok(())
}

/// Derives a short lowercase ASCII slug from a prompt.
///
/// ASCII letters and digits are kept (lowercased); ASCII whitespace and any of `-_./:`
/// become a single `-`, with leading, repeated, and trailing dashes suppressed. All other
/// characters are dropped. Scanning stops once the slug reaches 48 bytes. An empty result
/// is replaced by `"consult"`.
fn consult_slug(prompt: &str) -> String {
    const MAX_SLUG_LEN: usize = 48;

    let mut slug = String::new();
    let mut last_was_dash = false;
    for ch in prompt.chars() {
        let next = if ch.is_ascii_alphanumeric() {
            ch.to_ascii_lowercase()
        } else if ch.is_ascii_whitespace() || "-_./:".contains(ch) {
            '-'
        } else {
            continue;
        };
        if next == '-' {
            // Never start with a dash and never emit two in a row.
            if slug.is_empty() || last_was_dash {
                continue;
            }
            last_was_dash = true;
        } else {
            last_was_dash = false;
        }
        slug.push(next);
        // Every pushed char is ASCII, so the byte length equals the char count.
        if slug.len() >= MAX_SLUG_LEN {
            break;
        }
    }
    while slug.ends_with('-') {
        let _ = slug.pop();
    }
    if slug.is_empty() {
        "consult".to_owned()
    } else {
        slug
    }
}

/// Builds the tool output for a completed consult.
///
/// Assembles a concise and a full JSON rendering of `response` (the full one also carries
/// the original prompt, the prefix, and the effective prompt from `request`), pairs them
/// with the text renderings from [`concise_text`] and [`full_text`], and hands everything
/// to `fallback_detailed_tool_output` with `SurfaceKind::Read` and `FaultStage::Worker`.
///
/// # Errors
///
/// Returns whatever fault `fallback_detailed_tool_output` reports.
fn consult_output(
    request: &ConsultRequest,
    response: &ConsultResponse,
    generation: Generation,
    operation: &str,
) -> Result<ToolOutput, FaultRecord> {
    let concise = json!({
        "response": response.result,
        "cwd": response.cwd.display(),
        "persisted_output_path": response.persisted_output_path.display(),
        "prompt_prefix_injected": true,
        "model": response.model_name(),
        "duration_ms": response.duration_ms,
        "num_turns": response.num_turns,
        "stop_reason": response.stop_reason,
        "total_cost_usd": response.total_cost_usd,
        "permission_denial_count": response.permission_denials.len(),
    });
    let full = json!({
        "response": response.result,
        "cwd": response.cwd.display(),
        "persisted_output_path": response.persisted_output_path.display(),
        "prompt": request.prompt.as_str(),
        "prompt_prefix": CLAUDE_CONSULT_PREFIX,
        "effective_prompt": request.prompt.rendered(),
        "duration_ms": response.duration_ms,
        "duration_api_ms": response.duration_api_ms,
        "num_turns": response.num_turns,
        "stop_reason": response.stop_reason,
        "total_cost_usd": response.total_cost_usd,
        "usage": response.usage,
        "model_usage": response.model_usage,
        "permission_denials": response.permission_denials,
        "fast_mode_state": response.fast_mode_state,
        "uuid": response.uuid,
    });
    fallback_detailed_tool_output(
        &concise,
        &full,
        concise_text(request, response),
        Some(full_text(request, response)),
        SurfaceKind::Read,
        generation,
        FaultStage::Worker,
        operation,
    )
}

/// Renders the short text form of a consult result.
///
/// The first line is a space-separated status (`consult ok`, turn count, duration, then
/// model, stop reason, and cost when present). It is followed by the working directory,
/// the persisted artifact path, the permission-denial count (only when non-zero), and
/// finally the response body under a `response:` heading.
fn concise_text(_request: &ConsultRequest, response: &ConsultResponse) -> String {
    let mut status = vec![
        "consult ok".to_owned(),
        format!("turns={}", response.num_turns),
        format!("duration={}", render_duration_ms(response.duration_ms)),
    ];
    if let Some(model) = response.model_name() {
        status.push(format!("model={model}"));
    }
    if let Some(stop_reason) = response.stop_reason.as_deref() {
        status.push(format!("stop={stop_reason}"));
    }
    if let Some(cost) = response.total_cost_usd {
        status.push(format!("cost=${cost:.6}"));
    }
    let mut lines = vec![status.join(" ")];
    lines.push(format!("cwd: {}", response.cwd.display()));
    lines.push(format!(
        "saved: {}",
        response.persisted_output_path.display()
    ));
    if !response.permission_denials.is_empty() {
        lines.push(format!(
            "permission_denials: {}",
            response.permission_denials.len()
        ));
    }
    lines.push("response:".to_owned());
    lines.push(response.result.clone());
    lines.join("\n")
}

/// Renders the detailed text form of a consult result, one `key: value` fact per line.
///
/// Turn count, working directory, duration, artifact path, and permission-denial count are
/// always emitted. API duration, model, stop reason, cost, fast-mode state, uuid, and the
/// usage summary appear only when available. The response body comes last under a
/// `response:` heading.
fn full_text(_request: &ConsultRequest, response: &ConsultResponse) -> String {
    let mut lines = vec![
        format!("consult ok turns={}", response.num_turns),
        format!("cwd: {}", response.cwd.display()),
        format!("duration: {}", render_duration_ms(response.duration_ms)),
    ];
    if let Some(duration_api_ms) = response.duration_api_ms {
        lines.push(format!(
            "api_duration: {}",
            render_duration_ms(duration_api_ms)
        ));
    }
    if let Some(model) = response.model_name() {
        lines.push(format!("model: {model}"));
    }
    if let Some(stop_reason) = response.stop_reason.as_deref() {
        lines.push(format!("stop: {stop_reason}"));
    }
    lines.push(format!(
        "saved: {}",
        response.persisted_output_path.display()
    ));
    if let Some(cost) = response.total_cost_usd {
        lines.push(format!("cost_usd: {cost:.6}"));
    }
    lines.push(format!(
        "permission_denials: {}",
        response.permission_denials.len()
    ));
    if let Some(fast_mode_state) = response.fast_mode_state.as_deref() {
        lines.push(format!("fast_mode: {fast_mode_state}"));
    }
    if let Some(uuid) = response.uuid.as_deref() {
        lines.push(format!("uuid: {uuid}"));
    }
    if let Some(usage) = usage_summary(response.usage.as_ref()) {
        lines.push(format!("usage: {usage}"));
    }
    lines.push("response:".to_owned());
    lines.push(response.result.clone());
    lines.join("\n")
}

/// Flattens the scalar fields of a usage object into a `key=value key=value` string.
///
/// Only numbers and non-empty strings are included; nested objects, arrays, booleans,
/// nulls, and empty strings are skipped. Fields are emitted in ascending key order.
/// Returns `None` when `usage` is absent, is not a JSON object, or has no such fields.
fn usage_summary(usage: Option<&Value>) -> Option<String> {
    let Value::Object(usage) = usage? else {
        return None;
    };
    // A BTreeMap keeps the rendering order stable regardless of how the JSON map iterates.
    let fields = usage
        .iter()
        .filter_map(|(key, value)| {
            let rendered = match value {
                Value::Number(number) => number.to_string(),
                Value::String(text) if !text.is_empty() => text.clone(),
                _ => return None,
            };
            Some((key.as_str(), rendered))
        })
        .collect::<BTreeMap<_, _>>();
    if fields.is_empty() {
        return None;
    }
    Some(
        fields
            .iter()
            .map(|(key, value)| format!("{key}={value}"))
            .collect::<Vec<_>>()
            .join(" "),
    )
}

/// Returns the first key of the `model_usage` object, used as the model's display name.
///
/// Yields `None` when `model_usage` is absent, is not a JSON object, or is empty.
fn model_name(model_usage: Option<&Value>) -> Option<String> {
    let Value::Object(models) = model_usage? else {
        return None;
    };
    models.keys().next().cloned()
}

/// Formats a millisecond count for display.
///
/// Values below one second are shown as `<n>ms`; anything longer is shown in seconds with
/// exactly three decimals (e.g. `1500` becomes `1.500s`). Integer arithmetic keeps the
/// result exact for the whole `u64` range.
fn render_duration_ms(duration_ms: u64) -> String {
    if duration_ms < 1_000 {
        return format!("{duration_ms}ms");
    }
    format!("{}.{:03}s", duration_ms / 1_000, duration_ms % 1_000)
}

/// Rebuilds a [`Generation`] from its wire number by stepping forward from genesis.
///
/// `next()` is applied `raw - 1` times, so both `0` and `1` yield the genesis generation.
// NOTE(review): this is linear in `raw`; a very large wire value would stall the worker.
fn generation_from_wire(raw: u64) -> Generation {
    (1..raw).fold(Generation::genesis(), |generation, _| generation.next())
}

#[cfg(test)]
mod tests {
    use super::WorkingDirectory;
    use std::path::{Path, PathBuf};

    // An absolute candidate must be returned as-is, even when the base does not exist.
    #[test]
    fn absolute_working_directory_resolution_bypasses_base() {
        let absolute = PathBuf::from("/tmp");
        let nonexistent = Path::new("/definitely/not/a/real/base");
        let resolved =
            WorkingDirectory::resolve_candidate(Some(nonexistent), Some(absolute.clone())).unwrap();
        assert_eq!(resolved, absolute);
    }

    // A relative candidate must be joined onto the supplied base.
    #[test]
    fn relative_working_directory_resolution_uses_base() {
        let base = Path::new("/tmp");
        let resolved =
            WorkingDirectory::resolve_candidate(Some(base), Some(PathBuf::from("phone_opus")))
                .unwrap();
        assert_eq!(resolved, PathBuf::from("/tmp/phone_opus"));
    }
}