use std::collections::{BTreeMap, BTreeSet}; use std::fs; use std::io::{self, BufRead, Write}; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use dirs::{home_dir, state_dir}; use libmcp::{Generation, SurfaceKind}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value, json}; use thiserror::Error; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use users::get_current_uid; use uuid::Uuid; use crate::mcp::fault::{FaultRecord, FaultStage}; use crate::mcp::output::{ ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success, }; use crate::mcp::protocol::{ CLAUDE_BIN_ENV, CLAUDE_CONSULT_PREFIX, CLAUDE_EFFORT, CLAUDE_MODEL, CLAUDE_TOOLSET, EMPTY_MCP_CONFIG, }; pub(crate) fn run_worker(generation: u64) -> Result<(), Box> { let generation = generation_from_wire(generation); let stdin = io::stdin(); let mut stdout = io::stdout().lock(); let mut service = WorkerService::new(generation); for line in stdin.lock().lines() { let line = line?; if line.trim().is_empty() { continue; } let request = serde_json::from_str::(&line)?; let response = match request { crate::mcp::protocol::WorkerRequest::Execute { id, operation } => { let outcome = match service.execute(operation) { Ok(result) => crate::mcp::protocol::WorkerOutcome::Success { result }, Err(fault) => crate::mcp::protocol::WorkerOutcome::Fault { fault }, }; crate::mcp::protocol::WorkerResponse { id, outcome } } }; serde_json::to_writer(&mut stdout, &response)?; stdout.write_all(b"\n")?; stdout.flush()?; } Ok(()) } pub(crate) fn run_background_consult(job_file: PathBuf) -> Result<(), Box> { execute_background_consult(job_file)?; Ok(()) } struct WorkerService { generation: Generation, } impl WorkerService { fn new(generation: Generation) -> Self { Self { generation } } fn execute( &mut self, operation: crate::mcp::protocol::WorkerOperation, ) -> Result { match operation { crate::mcp::protocol::WorkerOperation::CallTool { name, arguments } => { self.call_tool(&name, arguments) } } } fn call_tool(&mut self, name: &str, arguments: Value) -> Result { let operation = format!("tools/call:{name}"); let (presentation, arguments) = split_presentation(arguments, &operation, self.generation, FaultStage::Worker)?; let output = match name { "consult" => { let args = deserialize::(arguments, &operation, self.generation)?; let request = ConsultRequest::parse(args) .map_err(|error| invalid_consult_request(self.generation, &operation, error))?; match request.mode() { ConsultMode::Sync => { let response = invoke_claude(&request) .map_err(|error| consult_fault(self.generation, &operation, error))?; consult_output(&request, &response, self.generation, &operation)? } ConsultMode::Background => submit_background_consult( &request, self.generation, FaultStage::Worker, &operation, )?, } } other => { return Err(FaultRecord::invalid_input( self.generation, FaultStage::Worker, &operation, format!("unknown worker tool `{other}`"), )); } }; tool_success( output, presentation, self.generation, FaultStage::Worker, &operation, ) } } #[derive(Debug, Deserialize)] struct ConsultArgs { prompt: String, cwd: Option, session_id: Option, background: Option, } #[derive(Debug, Deserialize)] struct ConsultJobArgs { job_id: String, } #[derive(Debug, Deserialize)] struct ConsultWaitArgs { job_id: String, timeout_ms: Option, poll_interval_ms: Option, } #[derive(Debug, Default, Deserialize)] struct ConsultJobsArgs { limit: Option, } const DEFAULT_CONSULT_WAIT_TIMEOUT_MS: u64 = 30 * 60 * 1_000; const DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 1_000; const MIN_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 10; #[derive(Debug, Clone)] struct ConsultRequest { prompt: PromptText, cwd: WorkingDirectory, session: Option, mode: ConsultMode, } impl ConsultRequest { fn parse(args: ConsultArgs) -> Result { Ok(Self { prompt: PromptText::parse(args.prompt)?, cwd: WorkingDirectory::resolve(args.cwd)?, session: args.session_id.map(SessionHandle::parse).transpose()?, mode: ConsultMode::from_background(args.background), }) } fn mode(&self) -> ConsultMode { self.mode } fn session_mode(&self) -> &'static str { if self.session.is_some() { "resumed" } else { "new" } } fn requested_session_id(&self) -> Option { self.session.as_ref().map(SessionHandle::display) } fn background_request(&self) -> BackgroundConsultRequest { BackgroundConsultRequest { prompt: self.prompt.as_str().to_owned(), cwd: self.cwd.display(), session_id: self.requested_session_id(), } } } #[derive(Debug, Clone, Copy, Eq, PartialEq)] enum ConsultMode { Sync, Background, } impl ConsultMode { fn from_background(raw: Option) -> Self { if raw.unwrap_or(false) { Self::Background } else { Self::Sync } } fn as_str(self) -> &'static str { match self { Self::Sync => "sync", Self::Background => "background", } } } #[derive(Debug, Clone)] struct PromptText { original: String, rendered: String, } impl PromptText { fn parse(raw: String) -> Result { if raw.trim().is_empty() { return Err(ConsultRequestError::EmptyPrompt); } Ok(Self { rendered: format!("{CLAUDE_CONSULT_PREFIX}\n\n{raw}"), original: raw, }) } fn as_str(&self) -> &str { self.original.as_str() } fn rendered(&self) -> &str { self.rendered.as_str() } } #[derive(Debug, Clone)] struct WorkingDirectory(PathBuf); impl WorkingDirectory { fn resolve(raw: Option) -> Result { let base = std::env::current_dir().map_err(|source| ConsultRequestError::CurrentDir { source })?; let requested = raw.map_or_else(|| base.clone(), PathBuf::from); let candidate = if requested.is_absolute() { requested } else { base.join(requested) }; let canonical = candidate .canonicalize() .map_err(|source| ConsultRequestError::Canonicalize { path: candidate.display().to_string(), source, })?; if !canonical.is_dir() { return Err(ConsultRequestError::NotDirectory( canonical.display().to_string(), )); } Ok(Self(canonical)) } fn as_path(&self) -> &Path { self.0.as_path() } fn display(&self) -> String { self.0.display().to_string() } } #[derive(Debug, Clone)] struct SessionHandle(Uuid); impl SessionHandle { fn parse(raw: String) -> Result { Uuid::parse_str(&raw) .map(Self) .map_err(|_| ConsultRequestError::InvalidSessionHandle(raw)) } fn display(&self) -> String { self.0.to_string() } } #[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] struct BackgroundConsultRequest { prompt: String, cwd: String, session_id: Option, } impl BackgroundConsultRequest { fn into_consult_request(self) -> Result { ConsultRequest::parse(ConsultArgs { prompt: self.prompt, cwd: Some(self.cwd), session_id: self.session_id, background: Some(false), }) } } #[derive(Debug, Clone, Copy, Deserialize, Eq, PartialEq, Serialize)] #[serde(rename_all = "snake_case")] enum BackgroundConsultStatus { Queued, Running, Succeeded, Failed, } impl BackgroundConsultStatus { fn done(self) -> bool { matches!(self, Self::Succeeded | Self::Failed) } fn success(self) -> bool { matches!(self, Self::Succeeded) } fn failed(self) -> bool { matches!(self, Self::Failed) } } #[derive(Debug, Clone, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] #[serde(transparent)] struct BackgroundConsultJobId(Uuid); impl BackgroundConsultJobId { fn new() -> Self { Self(Uuid::new_v4()) } fn parse(raw: String) -> Result { Uuid::parse_str(&raw) .map(Self) .map_err(|_| ConsultRequestError::InvalidJobHandle(raw)) } fn display(&self) -> String { self.0.to_string() } } #[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] struct BackgroundConsultFailure { class: String, detail: String, } #[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] struct BackgroundConsultResponseRecord { cwd: String, response: String, persisted_output_path: String, duration_ms: u64, duration_api_ms: Option, num_turns: u64, stop_reason: Option, session_id: Option, total_cost_usd: Option, usage: Option, model_usage: Option, permission_denials: Vec, fast_mode_state: Option, uuid: Option, } impl BackgroundConsultResponseRecord { fn from_response(response: ConsultResponse) -> Self { Self { cwd: response.cwd.display(), response: response.result, persisted_output_path: response.persisted_output_path.display(), duration_ms: response.duration_ms, duration_api_ms: response.duration_api_ms, num_turns: response.num_turns, stop_reason: response.stop_reason, session_id: response.session_id, total_cost_usd: response.total_cost_usd, usage: response.usage, model_usage: response.model_usage, permission_denials: response.permission_denials, fast_mode_state: response.fast_mode_state, uuid: response.uuid, } } fn model_name(&self) -> Option { model_name(self.model_usage.as_ref()) } } #[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] struct BackgroundConsultJobRecord { job_id: BackgroundConsultJobId, status: BackgroundConsultStatus, created_unix_ms: u64, updated_unix_ms: u64, started_unix_ms: Option, finished_unix_ms: Option, runner_pid: Option, request: BackgroundConsultRequest, prompt_prefix_injected: bool, result: Option, failure: Option, } #[derive(Debug, Clone, Copy, Eq, PartialEq)] struct ConsultWaitConfig { timeout_ms: u64, poll_interval_ms: u64, } impl ConsultWaitConfig { fn parse(timeout_ms: Option, poll_interval_ms: Option) -> Self { Self { timeout_ms: timeout_ms.unwrap_or(DEFAULT_CONSULT_WAIT_TIMEOUT_MS), poll_interval_ms: poll_interval_ms .unwrap_or(DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS) .max(MIN_CONSULT_WAIT_POLL_INTERVAL_MS), } } } #[derive(Debug, Clone)] struct BackgroundConsultWaitRequest { job_id: BackgroundConsultJobId, config: ConsultWaitConfig, } impl BackgroundConsultWaitRequest { fn parse(args: ConsultWaitArgs) -> Result { Ok(Self { job_id: BackgroundConsultJobId::parse(args.job_id)?, config: ConsultWaitConfig::parse(args.timeout_ms, args.poll_interval_ms), }) } } #[derive(Debug, Clone, Copy, Eq, PartialEq)] struct BackgroundConsultWaitMetadata { waited_ms: u64, timed_out: bool, } #[derive(Debug, Clone)] struct BackgroundConsultWaitOutcome { record: BackgroundConsultJobRecord, metadata: BackgroundConsultWaitMetadata, } impl BackgroundConsultJobRecord { fn new(job_id: BackgroundConsultJobId, request: BackgroundConsultRequest) -> Self { let now = unix_ms_now(); Self { job_id, status: BackgroundConsultStatus::Queued, created_unix_ms: now, updated_unix_ms: now, started_unix_ms: None, finished_unix_ms: None, runner_pid: None, request, prompt_prefix_injected: true, result: None, failure: None, } } fn summary(&self) -> Value { json!({ "job_id": self.job_id.display(), "status": self.status, "done": self.status.done(), "succeeded": self.status.success(), "failed": self.status.failed(), "created_unix_ms": self.created_unix_ms, "updated_unix_ms": self.updated_unix_ms, "started_unix_ms": self.started_unix_ms, "finished_unix_ms": self.finished_unix_ms, "runner_pid": self.runner_pid, "cwd": self.request.cwd, "requested_session_id": self.request.session_id, "prompt_prefix_injected": self.prompt_prefix_injected, }) } } #[derive(Debug, Error)] enum ConsultRequestError { #[error("prompt must not be empty")] EmptyPrompt, #[error("failed to resolve the current working directory: {source}")] CurrentDir { source: io::Error }, #[error("failed to resolve working directory `{path}`: {source}")] Canonicalize { path: String, source: io::Error }, #[error("working directory `{0}` is not a directory")] NotDirectory(String), #[error("session_id must be a valid UUID, got `{0}`")] InvalidSessionHandle(String), #[error("job_id must be a valid UUID, got `{0}`")] InvalidJobHandle(String), } #[derive(Debug, Error)] enum ConsultInvocationError { #[error("failed to spawn Claude Code: {0}")] Spawn(#[source] io::Error), #[error("Claude Code returned non-JSON output: {0}")] InvalidJson(String), #[error("{0}")] Downstream(String), } #[derive(Debug, Deserialize)] struct ClaudeJsonEnvelope { #[serde(rename = "type")] envelope_type: String, subtype: Option, is_error: bool, duration_ms: Option, duration_api_ms: Option, num_turns: Option, result: Option, stop_reason: Option, session_id: Option, total_cost_usd: Option, usage: Option, #[serde(rename = "modelUsage")] model_usage: Option, #[serde(default)] permission_denials: Vec, fast_mode_state: Option, uuid: Option, } #[derive(Debug)] struct ConsultResponse { cwd: WorkingDirectory, result: String, persisted_output_path: PersistedConsultPath, duration_ms: u64, duration_api_ms: Option, num_turns: u64, stop_reason: Option, session_id: Option, total_cost_usd: Option, usage: Option, model_usage: Option, permission_denials: Vec, fast_mode_state: Option, uuid: Option, } impl ConsultResponse { fn model_name(&self) -> Option { model_name(self.model_usage.as_ref()) } } const SYSTEMD_RUN_BINARY: &str = "systemd-run"; const DEFAULT_PATH: &str = "/usr/local/bin:/usr/bin:/bin"; const PHONE_OPUS_STATE_ROOT_NAME: &str = "phone_opus"; const CLAUDE_HOME_DIR_NAME: &str = "claude-home"; const XDG_CONFIG_DIR_NAME: &str = "xdg-config"; const XDG_CACHE_DIR_NAME: &str = "xdg-cache"; const XDG_STATE_DIR_NAME: &str = "xdg-state"; const SHARED_TMP_ROOTS: [&str; 2] = ["/tmp", "/var/tmp"]; const CONSULT_OUTPUT_ROOT: &str = "/tmp/phone_opus-consults"; const CONSULT_OUTPUT_KEEP_COUNT: usize = 256; const CONSULT_OUTPUT_MAX_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60); const CONSULT_TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] = time::macros::format_description!("[year][month][day]T[hour][minute][second]Z"); const CLAUDE_SEED_FILES: [&str; 5] = [ ".credentials.json", "settings.json", "settings.local.json", ".claude/settings.local.json", "CLAUDE.md", ]; const SERVICE_ENV_ALLOWLIST: [&str; 19] = [ "LANG", "LC_ALL", "LC_CTYPE", "TERM", "COLORTERM", "USER", "LOGNAME", "HTTP_PROXY", "HTTPS_PROXY", "NO_PROXY", "ALL_PROXY", "http_proxy", "https_proxy", "no_proxy", "all_proxy", "SSL_CERT_FILE", "SSL_CERT_DIR", "REQUESTS_CA_BUNDLE", "NODE_EXTRA_CA_CERTS", ]; const SERVICE_ENV_PREFIX_ALLOWLIST: [&str; 2] = ["ANTHROPIC_", "PHONE_OPUS_TEST_"]; #[derive(Debug, Clone)] struct ClaudeSandbox { source_home: PathBuf, state_root: PathBuf, claude_home: PathBuf, xdg_config_home: PathBuf, xdg_cache_home: PathBuf, xdg_state_home: PathBuf, } impl ClaudeSandbox { fn prepare() -> io::Result { let source_home = caller_home_dir().ok_or_else(|| { io::Error::new( io::ErrorKind::NotFound, "failed to resolve the caller home directory", ) })?; let state_root = phone_opus_state_root()?; let sandbox = Self { source_home, claude_home: state_root.join(CLAUDE_HOME_DIR_NAME), xdg_config_home: state_root.join(XDG_CONFIG_DIR_NAME), xdg_cache_home: state_root.join(XDG_CACHE_DIR_NAME), xdg_state_home: state_root.join(XDG_STATE_DIR_NAME), state_root, }; sandbox.create_layout()?; sandbox.sync_seed_claude_files()?; Ok(sandbox) } fn create_layout(&self) -> io::Result<()> { fs::create_dir_all(&self.claude_home)?; fs::create_dir_all(self.claude_config_dir())?; fs::create_dir_all(&self.xdg_config_home)?; fs::create_dir_all(&self.xdg_cache_home)?; fs::create_dir_all(&self.xdg_state_home)?; Ok(()) } fn claude_config_dir(&self) -> PathBuf { self.claude_home.join(".claude") } fn source_claude_dir(&self) -> PathBuf { self.source_home.join(".claude") } fn sync_seed_claude_files(&self) -> io::Result<()> { let source_root = self.source_claude_dir(); let destination_root = self.claude_config_dir(); for relative in CLAUDE_SEED_FILES { sync_optional_seed_file( source_root.join(relative).as_path(), destination_root.join(relative).as_path(), )?; } Ok(()) } fn read_only_paths(&self, request: &ConsultRequest) -> BTreeSet { let cwd = request.cwd.as_path(); let mut paths = BTreeSet::from([self.source_home.clone()]); if self.force_read_only_cwd(cwd) { let _ = paths.insert(cwd.to_path_buf()); } paths } fn force_read_only_cwd(&self, cwd: &Path) -> bool { self.read_write_paths() .iter() .any(|path| cwd.starts_with(path)) && !SHARED_TMP_ROOTS.iter().any(|root| cwd == Path::new(root)) } fn read_write_paths(&self) -> BTreeSet { let mut paths = BTreeSet::new(); let _ = paths.insert(self.state_root.clone()); for root in SHARED_TMP_ROOTS { let _ = paths.insert(PathBuf::from(root)); } paths } fn service_environment(&self) -> BTreeMap { let runtime_dir = caller_runtime_dir(); let mut environment = BTreeMap::from([ ("HOME".to_owned(), self.claude_home.display().to_string()), ( "XDG_CONFIG_HOME".to_owned(), self.xdg_config_home.display().to_string(), ), ( "XDG_CACHE_HOME".to_owned(), self.xdg_cache_home.display().to_string(), ), ( "XDG_STATE_HOME".to_owned(), self.xdg_state_home.display().to_string(), ), ( "XDG_RUNTIME_DIR".to_owned(), runtime_dir.display().to_string(), ), ( "DBUS_SESSION_BUS_ADDRESS".to_owned(), caller_dbus_session_bus_address(runtime_dir.as_path()), ), ("PATH".to_owned(), caller_path()), ]); for name in SERVICE_ENV_ALLOWLIST { if let Some(value) = caller_env(name) { let _ = environment.insert(name.to_owned(), value); } } for (name, value) in std::env::vars() { if SERVICE_ENV_PREFIX_ALLOWLIST .iter() .any(|prefix| name.starts_with(prefix)) { let _ = environment.insert(name, value); } } environment } } #[derive(Debug, Clone)] struct PersistedConsultPath(PathBuf); impl PersistedConsultPath { fn new(request: &ConsultRequest, session_id: Option<&str>) -> io::Result { fs::create_dir_all(CONSULT_OUTPUT_ROOT)?; let timestamp = OffsetDateTime::now_utc() .format(CONSULT_TIMESTAMP_FORMAT) .map_err(|error| io::Error::other(error.to_string()))?; let slug = consult_slug(request.prompt.as_str()); let session_slug = session_id.map_or_else( || "session-none".to_owned(), |session_id| format!("session-{}", consult_slug(session_id)), ); Ok(Self(Path::new(CONSULT_OUTPUT_ROOT).join(format!( "{timestamp}-{slug}-{session_slug}-{}.json", Uuid::new_v4() )))) } fn as_path(&self) -> &Path { self.0.as_path() } fn display(&self) -> String { self.0.display().to_string() } } fn deserialize Deserialize<'de>>( value: Value, operation: &str, generation: Generation, ) -> Result { serde_json::from_value(value).map_err(|error| { FaultRecord::invalid_input( generation, FaultStage::Protocol, operation, format!("invalid params: {error}"), ) }) } fn invalid_consult_request( generation: Generation, operation: &str, error: ConsultRequestError, ) -> FaultRecord { FaultRecord::invalid_input(generation, FaultStage::Worker, operation, error.to_string()) } fn consult_fault( generation: Generation, operation: &str, error: ConsultInvocationError, ) -> FaultRecord { match error { ConsultInvocationError::Spawn(source) => FaultRecord::process( generation, FaultStage::Claude, operation, source.to_string(), ), ConsultInvocationError::InvalidJson(detail) | ConsultInvocationError::Downstream(detail) => { FaultRecord::downstream(generation, FaultStage::Claude, operation, detail) } } } pub(crate) fn consult_job_tool_output( arguments: Value, generation: Generation, stage: FaultStage, operation: &str, ) -> Result { let args = deserialize::(arguments, operation, generation)?; let job_id = BackgroundConsultJobId::parse(args.job_id) .map_err(|error| invalid_consult_request(generation, operation, error))?; let record = load_background_consult_job(&job_id).map_err(|error| { FaultRecord::downstream(generation, stage, operation, error.to_string()) })?; background_job_tool_output(&record, None, generation, stage, operation) } pub(crate) fn consult_wait_tool_output( arguments: Value, generation: Generation, stage: FaultStage, operation: &str, ) -> Result { let args = deserialize::(arguments, operation, generation)?; let request = BackgroundConsultWaitRequest::parse(args) .map_err(|error| invalid_consult_request(generation, operation, error))?; let outcome = wait_for_background_consult(&request).map_err(|error| { FaultRecord::downstream(generation, stage, operation, error.to_string()) })?; background_job_tool_output( &outcome.record, Some(outcome.metadata), generation, stage, operation, ) } pub(crate) fn consult_jobs_tool_output( arguments: Value, generation: Generation, stage: FaultStage, operation: &str, ) -> Result { let args = deserialize::(arguments, operation, generation)?; let limit = args.limit.unwrap_or(10).clamp(1, 50); let jobs = load_recent_background_consult_jobs(limit as usize).map_err(|error| { FaultRecord::downstream(generation, stage, operation, error.to_string()) })?; let concise = json!({ "jobs": jobs.iter().map(BackgroundConsultJobRecord::summary).collect::>(), "count": jobs.len(), }); let full = json!({ "jobs": jobs, "count": jobs.len(), }); let lines = if jobs.is_empty() { "no background consult jobs".to_owned() } else { jobs.iter() .map(|job| { format!( "{} status={} cwd={}", job.job_id.display(), serde_json::to_string(&job.status).unwrap_or_else(|_| "\"unknown\"".to_owned()), job.request.cwd ) }) .collect::>() .join("\n") }; fallback_detailed_tool_output( &concise, &full, lines, None, SurfaceKind::Read, generation, stage, operation, ) } fn submit_background_consult( request: &ConsultRequest, generation: Generation, stage: FaultStage, operation: &str, ) -> Result { let job_id = BackgroundConsultJobId::new(); let mut record = BackgroundConsultJobRecord::new(job_id, request.background_request()); persist_background_consult_job(&record) .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; let executable = std::env::current_exe() .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; let job_file = background_consult_job_path(&record.job_id) .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; let child = Command::new(executable) .arg("mcp") .arg("background-consult") .arg("--job-file") .arg(&job_file) .stdin(Stdio::null()) .stdout(Stdio::null()) .stderr(Stdio::null()) .spawn() .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; record.status = BackgroundConsultStatus::Running; record.started_unix_ms = Some(unix_ms_now()); record.updated_unix_ms = unix_ms_now(); record.runner_pid = Some(child.id()); persist_background_consult_job(&record) .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; let concise = json!({ "mode": request.mode().as_str(), "job_id": record.job_id.display(), "status": record.status, "done": false, "requested_session_id": request.requested_session_id(), "session_mode": request.session_mode(), "prompt_prefix_injected": true, "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"], }); let full = json!({ "mode": request.mode().as_str(), "job_id": record.job_id.display(), "status": record.status, "done": false, "requested_session_id": request.requested_session_id(), "session_mode": request.session_mode(), "prompt_prefix_injected": true, "prompt": request.prompt.as_str(), "effective_prompt": request.prompt.rendered(), "cwd": request.cwd.display(), "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"], }); fallback_detailed_tool_output( &concise, &full, format!( "background consult submitted job={} status=running", record.job_id.display() ), None, SurfaceKind::Read, generation, stage, operation, ) } fn execute_background_consult(job_file: PathBuf) -> io::Result<()> { let mut record = read_json_file::(&job_file)?; record.status = BackgroundConsultStatus::Running; let _ = record.runner_pid.get_or_insert(std::process::id()); let _ = record.started_unix_ms.get_or_insert_with(unix_ms_now); record.updated_unix_ms = unix_ms_now(); persist_background_consult_job_to_path(&job_file, &record)?; let request = record .request .clone() .into_consult_request() .map_err(|error| io::Error::other(error.to_string()))?; match invoke_claude(&request) { Ok(response) => { record.status = BackgroundConsultStatus::Succeeded; record.result = Some(BackgroundConsultResponseRecord::from_response(response)); record.failure = None; } Err(error) => { record.status = BackgroundConsultStatus::Failed; record.result = None; record.failure = Some(background_failure(error)); } } record.finished_unix_ms = Some(unix_ms_now()); record.updated_unix_ms = unix_ms_now(); record.runner_pid = None; persist_background_consult_job_to_path(&job_file, &record) } fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure { match error { ConsultInvocationError::Spawn(source) => BackgroundConsultFailure { class: "process".to_owned(), detail: source.to_string(), }, ConsultInvocationError::InvalidJson(detail) | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure { class: "downstream".to_owned(), detail, }, } } fn wait_for_background_consult( request: &BackgroundConsultWaitRequest, ) -> io::Result { let started_at = Instant::now(); loop { let record = load_background_consult_job(&request.job_id)?; let waited_ms = elapsed_duration_ms(started_at.elapsed()); if record.status.done() { return Ok(BackgroundConsultWaitOutcome { record, metadata: BackgroundConsultWaitMetadata { waited_ms, timed_out: false, }, }); } if waited_ms >= request.config.timeout_ms { return Ok(BackgroundConsultWaitOutcome { record, metadata: BackgroundConsultWaitMetadata { waited_ms, timed_out: true, }, }); } let remaining_ms = request.config.timeout_ms.saturating_sub(waited_ms); let sleep_ms = remaining_ms.min(request.config.poll_interval_ms); std::thread::sleep(Duration::from_millis(sleep_ms)); } } fn background_job_tool_output( record: &BackgroundConsultJobRecord, wait: Option, generation: Generation, stage: FaultStage, operation: &str, ) -> Result { let mut concise = match record.summary() { Value::Object(object) => object, _ => Map::new(), }; if let Some(wait) = wait { let _ = concise.insert("waited_ms".to_owned(), json!(wait.waited_ms)); let _ = concise.insert("timed_out".to_owned(), json!(wait.timed_out)); } let _ = concise.insert( "result".to_owned(), json!(record.result.as_ref().map(|result| json!({ "response": result.response, "persisted_output_path": result.persisted_output_path, "duration_ms": result.duration_ms, "num_turns": result.num_turns, "session_id": result.session_id, "model": result.model_name(), }))), ); let _ = concise.insert("failure".to_owned(), json!(record.failure)); let mut full = Map::from_iter([("job".to_owned(), json!(record))]); if let Some(wait) = wait { let _ = full.insert("waited_ms".to_owned(), json!(wait.waited_ms)); let _ = full.insert("timed_out".to_owned(), json!(wait.timed_out)); } let mut lines = vec![format!( "job={} status={:?}", record.job_id.display(), record.status )]; if let Some(wait) = wait { lines.push(format!( "waited={} timed_out={}", render_duration_ms(wait.waited_ms), wait.timed_out )); } if let Some(result) = record.result.as_ref() { lines.push(format!( "result ready model={} turns={} duration={}", result.model_name().unwrap_or_else(|| "unknown".to_owned()), result.num_turns, render_duration_ms(result.duration_ms) )); lines.push(format!("saved={}", result.persisted_output_path)); lines.push("response:".to_owned()); lines.push(result.response.clone()); } if let Some(failure) = record.failure.as_ref() { lines.push(format!("failure={} {}", failure.class, failure.detail)); } fallback_detailed_tool_output( &Value::Object(concise), &Value::Object(full), lines.join("\n"), None, SurfaceKind::Read, generation, stage, operation, ) } fn background_consult_job_root() -> io::Result { let path = phone_opus_state_root()?.join("mcp").join("consult_jobs"); fs::create_dir_all(&path)?; Ok(path) } fn background_consult_job_path(job_id: &BackgroundConsultJobId) -> io::Result { Ok(background_consult_job_root()?.join(format!("{}.json", job_id.display()))) } fn persist_background_consult_job(record: &BackgroundConsultJobRecord) -> io::Result<()> { let path = background_consult_job_path(&record.job_id)?; persist_background_consult_job_to_path(&path, record) } fn persist_background_consult_job_to_path( path: &Path, record: &BackgroundConsultJobRecord, ) -> io::Result<()> { write_json_file(path, record) } fn load_background_consult_job( job_id: &BackgroundConsultJobId, ) -> io::Result { read_json_file(background_consult_job_path(job_id)?.as_path()) } fn load_recent_background_consult_jobs( limit: usize, ) -> io::Result> { let mut jobs = fs::read_dir(background_consult_job_root()?)? .filter_map(Result::ok) .map(|entry| entry.path()) .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("json")) .filter_map(|path| read_json_file::(&path).ok()) .collect::>(); jobs.sort_by(|left, right| { right .updated_unix_ms .cmp(&left.updated_unix_ms) .then_with(|| left.job_id.cmp(&right.job_id)) }); jobs.truncate(limit); Ok(jobs) } fn read_json_file Deserialize<'de>>(path: &Path) -> io::Result { let bytes = fs::read(path)?; serde_json::from_slice(&bytes).map_err(|error| io::Error::other(error.to_string())) } fn write_json_file(path: &Path, value: &T) -> io::Result<()> { let payload = serde_json::to_vec_pretty(value).map_err(|error| io::Error::other(error.to_string()))?; let temp_path = path.with_extension("json.tmp"); fs::write(&temp_path, payload)?; fs::rename(temp_path, path)?; Ok(()) } fn unix_ms_now() -> u64 { let duration = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or(Duration::ZERO); u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) } fn invoke_claude(request: &ConsultRequest) -> Result { let sandbox = ClaudeSandbox::prepare().map_err(ConsultInvocationError::Spawn)?; let mut command = Command::new(SYSTEMD_RUN_BINARY); let runtime_dir = caller_runtime_dir(); let _ = command .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str()) .env( "DBUS_SESSION_BUS_ADDRESS", caller_dbus_session_bus_address(runtime_dir.as_path()), ) .arg("--user") .arg("--wait") .arg("--pipe") .arg("--collect") .arg("--quiet") .arg("--working-directory") .arg(request.cwd.as_path()); for property in [ "ProtectSystem=strict", "NoNewPrivileges=yes", "PrivateDevices=yes", "RestrictSUIDSGID=yes", "RestrictAddressFamilies=AF_UNIX AF_INET AF_INET6", ] { let _ = command.arg("-p").arg(property); } for path in sandbox.read_write_paths() { let _ = command .arg("-p") .arg(format!("ReadWritePaths={}", path.display())); } for path in sandbox.read_only_paths(request) { let _ = command .arg("-p") .arg(format!("ReadOnlyPaths={}", path.display())); } for (name, value) in sandbox.service_environment() { let _ = command.arg(format!("--setenv={name}={value}")); } let _ = command .arg(claude_binary()) .arg("-p") .arg("--output-format") .arg("json") .arg("--strict-mcp-config") .arg("--mcp-config") .arg(EMPTY_MCP_CONFIG) .arg("--disable-slash-commands") .arg("--no-chrome") .arg("--model") .arg(CLAUDE_MODEL) .arg("--effort") .arg(CLAUDE_EFFORT) .arg("--tools") .arg(CLAUDE_TOOLSET) .arg("--dangerously-skip-permissions"); if let Some(session) = request.session.as_ref() { let _ = command.arg("--resume").arg(session.display()); } let output = command .arg(request.prompt.rendered()) .output() .map_err(ConsultInvocationError::Spawn)?; let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned(); let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned(); let envelope = match serde_json::from_slice::(&output.stdout) { Ok(envelope) => envelope, Err(_error) if !output.status.success() => { return Err(ConsultInvocationError::Downstream(downstream_message( output.status.code(), &stdout, &stderr, ))); } Err(error) => { return Err(ConsultInvocationError::InvalidJson(format!( "{error}; stdout={stdout}; stderr={stderr}" ))); } }; if envelope.envelope_type != "result" { return Err(ConsultInvocationError::Downstream(format!( "unexpected Claude envelope type `{}`", envelope.envelope_type ))); } if !output.status.success() || envelope.is_error || envelope.subtype.as_deref() != Some("success") { return Err(ConsultInvocationError::Downstream( envelope .result .filter(|value| !value.trim().is_empty()) .unwrap_or_else(|| downstream_message(output.status.code(), &stdout, &stderr)), )); } let result = envelope.result.clone().unwrap_or_default(); let persisted_output_path = persist_consult_output(request, &result, &envelope) .map_err(ConsultInvocationError::Spawn)?; Ok(ConsultResponse { cwd: request.cwd.clone(), result, persisted_output_path, duration_ms: envelope.duration_ms.unwrap_or(0), duration_api_ms: envelope.duration_api_ms, num_turns: envelope.num_turns.unwrap_or(0), stop_reason: envelope.stop_reason, session_id: envelope.session_id, total_cost_usd: envelope.total_cost_usd, usage: envelope.usage, model_usage: envelope.model_usage, permission_denials: envelope.permission_denials, fast_mode_state: envelope.fast_mode_state, uuid: envelope.uuid, }) } fn downstream_message(status_code: Option, stdout: &str, stderr: &str) -> String { if !stderr.is_empty() { return stderr.to_owned(); } if !stdout.is_empty() { return stdout.to_owned(); } format!("Claude Code exited with status {status_code:?}") } fn claude_binary() -> PathBuf { std::env::var_os(CLAUDE_BIN_ENV) .map(PathBuf::from) .unwrap_or_else(|| PathBuf::from("claude")) } fn phone_opus_state_root() -> io::Result { let base = std::env::var_os("XDG_STATE_HOME") .filter(|value| !value.is_empty()) .map(PathBuf::from) .or_else(state_dir) .or_else(|| caller_home_dir().map(|home| home.join(".local").join("state"))) .ok_or_else(|| { io::Error::new( io::ErrorKind::NotFound, "failed to resolve phone_opus state root", ) })?; let root = base.join(PHONE_OPUS_STATE_ROOT_NAME); fs::create_dir_all(&root)?; Ok(root) } fn caller_home_dir() -> Option { std::env::var_os("HOME") .filter(|value| !value.is_empty()) .map(PathBuf::from) .or_else(home_dir) } fn caller_env(name: &str) -> Option { std::env::var(name).ok().filter(|value| !value.is_empty()) } fn caller_path() -> String { caller_env("PATH").unwrap_or_else(|| DEFAULT_PATH.to_owned()) } fn caller_runtime_dir() -> PathBuf { std::env::var_os("XDG_RUNTIME_DIR") .filter(|value| !value.is_empty()) .map(PathBuf::from) .unwrap_or_else(|| PathBuf::from(format!("/run/user/{}", get_current_uid()))) } fn caller_dbus_session_bus_address(runtime_dir: &Path) -> String { caller_env("DBUS_SESSION_BUS_ADDRESS") .unwrap_or_else(|| format!("unix:path={}", runtime_dir.join("bus").display())) } fn sync_optional_seed_file(source: &Path, destination: &Path) -> io::Result<()> { if source.exists() { let bytes = fs::read(source)?; write_bytes_file(destination, &bytes)?; } else if destination.exists() { fs::remove_file(destination)?; } Ok(()) } fn write_bytes_file(path: &Path, bytes: &[u8]) -> io::Result<()> { let parent = path.parent().ok_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, format!("path `{}` has no parent directory", path.display()), ) })?; fs::create_dir_all(parent)?; let temp_path = path.with_extension(format!("tmp-{}", Uuid::new_v4())); fs::write(&temp_path, bytes)?; fs::rename(temp_path, path)?; Ok(()) } fn persist_consult_output( request: &ConsultRequest, result: &str, envelope: &ClaudeJsonEnvelope, ) -> io::Result { let path = PersistedConsultPath::new(request, envelope.session_id.as_deref())?; let saved_at = OffsetDateTime::now_utc() .format(&Rfc3339) .map_err(|error| io::Error::other(error.to_string()))?; let artifact = json!({ "kind": "phone_opus_consult", "saved_at": saved_at, "saved_unix_ms": unix_ms_now(), "cwd": request.cwd.display(), "prompt": request.prompt.as_str(), "prompt_prefix": CLAUDE_CONSULT_PREFIX, "effective_prompt": request.prompt.rendered(), "session_mode": request.session_mode(), "requested_session_id": request.requested_session_id(), "response": result, "model": model_name(envelope.model_usage.as_ref()), "duration_ms": envelope.duration_ms.unwrap_or(0), "duration_api_ms": envelope.duration_api_ms, "num_turns": envelope.num_turns.unwrap_or(0), "stop_reason": envelope.stop_reason, "session_id": envelope.session_id, "total_cost_usd": envelope.total_cost_usd, "usage": envelope.usage, "model_usage": envelope.model_usage, "permission_denials": envelope.permission_denials, "fast_mode_state": envelope.fast_mode_state, "uuid": envelope.uuid, }); write_json_file(path.as_path(), &artifact)?; let _ = prune_persisted_consult_outputs(); Ok(path) } fn prune_persisted_consult_outputs() -> io::Result<()> { let root = Path::new(CONSULT_OUTPUT_ROOT); if !root.exists() { return Ok(()); } let now = SystemTime::now(); let mut entries = fs::read_dir(root)? .filter_map(Result::ok) .filter(|entry| entry.file_type().is_ok_and(|kind| kind.is_file())) .filter_map(|entry| { let path = entry.path(); (path.extension().and_then(|ext| ext.to_str()) == Some("json")).then_some(path) }) .collect::>(); entries.sort_unstable_by(|left, right| right.file_name().cmp(&left.file_name())); for stale in entries.iter().skip(CONSULT_OUTPUT_KEEP_COUNT) { let _ = fs::remove_file(stale); } for path in entries.iter().take(CONSULT_OUTPUT_KEEP_COUNT) { let age = path .metadata() .and_then(|metadata| metadata.modified()) .ok() .and_then(|modified| now.duration_since(modified).ok()); if age.is_some_and(|age| age > CONSULT_OUTPUT_MAX_AGE) { let _ = fs::remove_file(path); } } Ok(()) } fn consult_slug(prompt: &str) -> String { let mut slug = String::new(); let mut last_was_dash = false; for ch in prompt.chars() { let next = if ch.is_ascii_alphanumeric() { Some(ch.to_ascii_lowercase()) } else if ch.is_ascii_whitespace() || "-_./:".contains(ch) { Some('-') } else { None }; let Some(next) = next else { continue; }; if next == '-' { if slug.is_empty() || last_was_dash { continue; } last_was_dash = true; } else { last_was_dash = false; } slug.push(next); if slug.len() >= 48 { break; } } while slug.ends_with('-') { let _ = slug.pop(); } if slug.is_empty() { "consult".to_owned() } else { slug } } fn consult_output( request: &ConsultRequest, response: &ConsultResponse, generation: Generation, operation: &str, ) -> Result { let concise = json!({ "mode": request.mode().as_str(), "response": response.result, "cwd": response.cwd.display(), "persisted_output_path": response.persisted_output_path.display(), "session_mode": request.session_mode(), "requested_session_id": request.requested_session_id(), "prompt_prefix_injected": true, "model": response.model_name(), "duration_ms": response.duration_ms, "num_turns": response.num_turns, "stop_reason": response.stop_reason, "session_id": response.session_id, "total_cost_usd": response.total_cost_usd, "permission_denial_count": response.permission_denials.len(), }); let full = json!({ "mode": request.mode().as_str(), "response": response.result, "cwd": response.cwd.display(), "persisted_output_path": response.persisted_output_path.display(), "prompt": request.prompt.as_str(), "prompt_prefix": CLAUDE_CONSULT_PREFIX, "effective_prompt": request.prompt.rendered(), "session_mode": request.session_mode(), "requested_session_id": request.requested_session_id(), "duration_ms": response.duration_ms, "duration_api_ms": response.duration_api_ms, "num_turns": response.num_turns, "stop_reason": response.stop_reason, "session_id": response.session_id, "total_cost_usd": response.total_cost_usd, "usage": response.usage, "model_usage": response.model_usage, "permission_denials": response.permission_denials, "fast_mode_state": response.fast_mode_state, "uuid": response.uuid, }); fallback_detailed_tool_output( &concise, &full, concise_text(request, response), Some(full_text(request, response)), SurfaceKind::Read, generation, FaultStage::Worker, operation, ) } fn concise_text(request: &ConsultRequest, response: &ConsultResponse) -> String { let mut status = vec![ "consult ok".to_owned(), format!("session={}", request.session_mode()), format!("turns={}", response.num_turns), format!("duration={}", render_duration_ms(response.duration_ms)), ]; if let Some(model) = response.model_name() { status.push(format!("model={model}")); } if let Some(stop_reason) = response.stop_reason.as_deref() { status.push(format!("stop={stop_reason}")); } if let Some(cost) = response.total_cost_usd { status.push(format!("cost=${cost:.6}")); } let mut lines = vec![status.join(" ")]; lines.push(format!("cwd: {}", response.cwd.display())); if let Some(session_id) = request.requested_session_id() { lines.push(format!("requested_session: {session_id}")); } if let Some(session_id) = response.session_id.as_deref() { lines.push(format!("session: {session_id}")); } lines.push(format!( "saved: {}", response.persisted_output_path.display() )); if !response.permission_denials.is_empty() { lines.push(format!( "permission_denials: {}", response.permission_denials.len() )); } lines.push("response:".to_owned()); lines.push(response.result.clone()); lines.join("\n") } fn full_text(request: &ConsultRequest, response: &ConsultResponse) -> String { let mut lines = vec![ format!( "consult ok session={} turns={}", request.session_mode(), response.num_turns ), format!("cwd: {}", response.cwd.display()), format!("duration: {}", render_duration_ms(response.duration_ms)), ]; if let Some(session_id) = request.requested_session_id() { lines.push(format!("requested_session: {session_id}")); } if let Some(duration_api_ms) = response.duration_api_ms { lines.push(format!( "api_duration: {}", render_duration_ms(duration_api_ms) )); } if let Some(model) = response.model_name() { lines.push(format!("model: {model}")); } if let Some(stop_reason) = response.stop_reason.as_deref() { lines.push(format!("stop: {stop_reason}")); } if let Some(session_id) = response.session_id.as_deref() { lines.push(format!("session: {session_id}")); } lines.push(format!( "saved: {}", response.persisted_output_path.display() )); if let Some(cost) = response.total_cost_usd { lines.push(format!("cost_usd: {cost:.6}")); } lines.push(format!( "permission_denials: {}", response.permission_denials.len() )); if let Some(fast_mode_state) = response.fast_mode_state.as_deref() { lines.push(format!("fast_mode: {fast_mode_state}")); } if let Some(uuid) = response.uuid.as_deref() { lines.push(format!("uuid: {uuid}")); } if let Some(usage) = usage_summary(response.usage.as_ref()) { lines.push(format!("usage: {usage}")); } lines.push("response:".to_owned()); lines.push(response.result.clone()); lines.join("\n") } fn usage_summary(usage: Option<&Value>) -> Option { let Value::Object(usage) = usage? else { return None; }; let summary = usage .iter() .filter_map(|(key, value)| match value { Value::Number(number) => Some((key.clone(), number.to_string())), Value::String(text) if !text.is_empty() => Some((key.clone(), text.clone())), _ => None, }) .collect::>(); (!summary.is_empty()).then(|| { summary .into_iter() .map(|(key, value)| format!("{key}={value}")) .collect::>() .join(" ") }) } fn model_name(model_usage: Option<&Value>) -> Option { let Value::Object(models) = model_usage? else { return None; }; models.keys().next().cloned() } fn elapsed_duration_ms(duration: Duration) -> u64 { duration.as_millis().try_into().unwrap_or(u64::MAX) } fn render_duration_ms(duration_ms: u64) -> String { if duration_ms < 1_000 { return format!("{duration_ms}ms"); } let seconds = duration_ms as f64 / 1_000.0; format!("{seconds:.3}s") } fn generation_from_wire(raw: u64) -> Generation { let mut generation = Generation::genesis(); for _ in 1..raw { generation = generation.next(); } generation }