From 2fc866a3ce50b6ba9c5e84e0ad2f8c77517361ff Mon Sep 17 00:00:00 2001 From: main Date: Tue, 31 Mar 2026 13:21:05 -0400 Subject: Excise hidden consult machinery --- crates/phone-opus/src/main.rs | 11 - crates/phone-opus/src/mcp/host/runtime.rs | 26 +- crates/phone-opus/src/mcp/mod.rs | 2 +- crates/phone-opus/src/mcp/service.rs | 886 +----------------------------- 4 files changed, 11 insertions(+), 914 deletions(-) (limited to 'crates/phone-opus/src') diff --git a/crates/phone-opus/src/main.rs b/crates/phone-opus/src/main.rs index a1cace7..79ace26 100644 --- a/crates/phone-opus/src/main.rs +++ b/crates/phone-opus/src/main.rs @@ -3,7 +3,6 @@ mod mcp; use clap::{Args, Parser, Subcommand}; #[cfg(test)] use libmcp_testkit as _; -use std::path::PathBuf; #[derive(Parser)] #[command( @@ -31,8 +30,6 @@ enum McpCommand { Serve, /// Run the disposable worker process. Worker(McpWorkerArgs), - /// Run one detached background consult job. - BackgroundConsult(McpBackgroundConsultArgs), } #[derive(Args)] @@ -42,20 +39,12 @@ struct McpWorkerArgs { generation: u64, } -#[derive(Args)] -struct McpBackgroundConsultArgs { - /// Persisted background job file to execute and update. - #[arg(long)] - job_file: PathBuf, -} - fn main() -> Result<(), Box> { let cli = Cli::parse(); match cli.command { Command::Mcp { command } => match command { McpCommand::Serve => mcp::run_host()?, McpCommand::Worker(args) => mcp::run_worker(args.generation)?, - McpCommand::BackgroundConsult(args) => mcp::run_background_consult(args.job_file)?, }, } Ok(()) diff --git a/crates/phone-opus/src/mcp/host/runtime.rs b/crates/phone-opus/src/mcp/host/runtime.rs index df54844..9855191 100644 --- a/crates/phone-opus/src/mcp/host/runtime.rs +++ b/crates/phone-opus/src/mcp/host/runtime.rs @@ -25,9 +25,6 @@ use crate::mcp::protocol::{ FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed, PROTOCOL_VERSION, SERVER_NAME, WORKER_CRASH_ONCE_ENV, WorkerOperation, WorkerSpawnConfig, }; -use crate::mcp::service::{ - consult_job_tool_output, consult_jobs_tool_output, consult_wait_tool_output, -}; use crate::mcp::telemetry::ServerTelemetry; pub(crate) fn run_host() -> Result<(), Box> { @@ -363,30 +360,9 @@ impl HostRuntime { fn handle_host_tool(&mut self, name: &str, arguments: Value) -> Result { let operation = format!("tools/call:{name}"); let generation = self.worker.generation(); - let (presentation, arguments) = + let (presentation, _arguments) = split_presentation(arguments, &operation, generation, FaultStage::Host)?; match name { - "consult_job" => tool_success( - consult_job_tool_output(arguments, generation, FaultStage::Host, &operation)?, - presentation, - generation, - FaultStage::Host, - &operation, - ), - "consult_wait" => tool_success( - consult_wait_tool_output(arguments, generation, FaultStage::Host, &operation)?, - presentation, - generation, - FaultStage::Host, - &operation, - ), - "consult_jobs" => tool_success( - consult_jobs_tool_output(arguments, generation, FaultStage::Host, &operation)?, - presentation, - generation, - FaultStage::Host, - &operation, - ), "health_snapshot" => { let rollout = if self.binary.rollout_pending().map_err(|error| { FaultRecord::rollout(generation, &operation, error.to_string()) diff --git a/crates/phone-opus/src/mcp/mod.rs b/crates/phone-opus/src/mcp/mod.rs index ecf5aad..666598f 100644 --- a/crates/phone-opus/src/mcp/mod.rs +++ b/crates/phone-opus/src/mcp/mod.rs @@ -7,4 +7,4 @@ mod service; mod telemetry; pub(crate) use host::runtime::run_host; -pub(crate) use service::{run_background_consult, run_worker}; +pub(crate) use service::run_worker; diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index afb811e..5f6ab99 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -12,7 +12,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use dirs::{home_dir, state_dir}; use libmcp::{Generation, SurfaceKind}; use serde::{Deserialize, Serialize}; -use serde_json::{Map, Value, json}; +use serde_json::{Value, json}; use thiserror::Error; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use users::get_current_uid; @@ -56,11 +56,6 @@ pub(crate) fn run_worker(generation: u64) -> Result<(), Box Result<(), Box> { - execute_background_consult(job_file)?; - Ok(()) -} - struct WorkerService { generation: Generation, } @@ -119,98 +114,26 @@ struct ConsultArgs { cwd: Option, } -#[derive(Debug, Deserialize)] -struct ConsultJobArgs { - job_id: String, -} - -#[derive(Debug, Deserialize)] -struct ConsultWaitArgs { - job_id: String, - timeout_ms: Option, - poll_interval_ms: Option, -} - -#[derive(Debug, Default, Deserialize)] -struct ConsultJobsArgs { - limit: Option, -} - -const DEFAULT_CONSULT_WAIT_TIMEOUT_MS: u64 = 30 * 60 * 1_000; -const DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 1_000; -const MIN_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 10; - #[derive(Debug, Clone)] struct ConsultRequest { prompt: PromptText, cwd: WorkingDirectory, - context_key: ConsultContextKey, - session_plan: ConsultSessionPlan, + session: SessionHandle, } impl ConsultRequest { fn parse(args: ConsultArgs) -> Result { let prompt = PromptText::parse(args.prompt)?; let cwd = WorkingDirectory::resolve(args.cwd)?; - let context_key = ConsultContextKey::from_cwd(&cwd); - let session_plan = ConsultSessionPlan::fresh(); Ok(Self { prompt, cwd, - context_key, - session_plan, + session: SessionHandle::fresh(), }) } - fn planned_session_id(&self) -> String { - self.session_plan.planned_session().display() - } - - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - fn context_mode(&self) -> &'static str { - self.session_plan.context_mode() - } - - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - fn reused_session_id(&self) -> Option { - self.session_plan.reused_session_id() - } - - fn launch_session_id(&self) -> Option { - match self.session_plan { - ConsultSessionPlan::Start { .. } => Some(self.planned_session_id()), - ConsultSessionPlan::Resume(_) => None, - } - } - - fn remember_context(&self, session_id: Option<&str>) -> io::Result<()> { - confirm_consult_context( - &self.context_key, - session_id, - self.session_plan.planned_session(), - ) - } - - fn remember_planned_context(&self) -> io::Result<()> { - if let ConsultSessionPlan::Start { session, .. } = &self.session_plan { - remember_planned_consult_context(&self.context_key, session) - } else { - Ok(()) - } - } - - #[allow(dead_code, reason = "background submission is parked but not exposed")] - fn background_request(&self) -> BackgroundConsultRequest { - BackgroundConsultRequest { - prompt: self.prompt.as_str().to_owned(), - cwd: self.cwd.display(), - } + fn session_id(&self) -> String { + self.session.display() } } @@ -310,353 +233,11 @@ impl SessionHandle { Self(Uuid::new_v4()) } - fn parse(raw: &str) -> Option { - Uuid::parse_str(raw).ok().map(Self) - } - - fn display(&self) -> String { - self.0.to_string() - } -} - -#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)] -struct ConsultContextKey(String); - -impl ConsultContextKey { - fn from_cwd(cwd: &WorkingDirectory) -> Self { - Self(cwd.display()) - } - - fn as_str(&self) -> &str { - self.0.as_str() - } -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -struct StoredConsultContext { - session_id: String, - #[serde(default = "default_consult_context_state")] - state: StoredConsultContextState, - updated_unix_ms: u64, -} - -#[derive(Debug, Clone, Copy, Default, Deserialize, Eq, PartialEq, Serialize)] -#[serde(rename_all = "snake_case")] -enum StoredConsultContextState { - Planned, - #[default] - Confirmed, -} - -const fn default_consult_context_state() -> StoredConsultContextState { - StoredConsultContextState::Confirmed -} - -#[derive(Debug, Default, Deserialize, Serialize)] -struct ConsultContextIndex { - by_cwd: BTreeMap, -} - -impl ConsultContextIndex { - #[allow( - dead_code, - reason = "context lookup is parked while session reuse stays disabled" - )] - fn context_for(&self, key: &ConsultContextKey) -> Option { - self.by_cwd.get(key.as_str()).cloned() - } - - fn remember( - &mut self, - key: &ConsultContextKey, - session: &SessionHandle, - state: StoredConsultContextState, - ) { - let _ = self.by_cwd.insert( - key.as_str().to_owned(), - StoredConsultContext { - session_id: session.display(), - state, - updated_unix_ms: unix_ms_now(), - }, - ); - } -} - -#[derive(Debug, Clone)] -enum ConsultSessionPlan { - Start { - session: SessionHandle, - reused: bool, - }, - #[allow( - dead_code, - reason = "resume plans are parked while one-shot consults are enforced" - )] - Resume(SessionHandle), -} - -impl ConsultSessionPlan { - fn fresh() -> Self { - Self::Start { - session: SessionHandle::fresh(), - reused: false, - } - } - - #[allow( - dead_code, - reason = "stored-session revival is parked while one-shot consults are enforced" - )] - fn from_stored(context: StoredConsultContext) -> Option { - let session = SessionHandle::parse(context.session_id.as_str())?; - Some(match context.state { - StoredConsultContextState::Planned => Self::Start { - session, - reused: true, - }, - StoredConsultContextState::Confirmed => Self::Resume(session), - }) - } - - fn planned_session(&self) -> &SessionHandle { - match self { - Self::Start { session, .. } | Self::Resume(session) => session, - } - } - - #[allow( - dead_code, - reason = "resume paths are parked while one-shot consults are enforced" - )] - fn resume_session(&self) -> Option<&SessionHandle> { - match self { - Self::Resume(session) => Some(session), - Self::Start { .. } => None, - } - } - - fn context_mode(&self) -> &'static str { - match self { - Self::Start { reused: false, .. } => "fresh", - Self::Start { reused: true, .. } | Self::Resume(_) => "reused", - } - } - - fn reused_session_id(&self) -> Option { - match self { - Self::Start { - reused: true, - session, - } - | Self::Resume(session) => Some(session.display()), - Self::Start { reused: false, .. } => None, - } - } -} - -#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] -struct BackgroundConsultRequest { - prompt: String, - cwd: String, -} - -impl BackgroundConsultRequest { - fn into_consult_request(self) -> Result { - ConsultRequest::parse(ConsultArgs { - prompt: self.prompt, - cwd: Some(self.cwd), - }) - } -} - -#[derive(Debug, Clone, Copy, Deserialize, Eq, PartialEq, Serialize)] -#[serde(rename_all = "snake_case")] -enum BackgroundConsultStatus { - Queued, - Running, - Succeeded, - Failed, -} - -impl BackgroundConsultStatus { - fn done(self) -> bool { - matches!(self, Self::Succeeded | Self::Failed) - } - - fn success(self) -> bool { - matches!(self, Self::Succeeded) - } - - fn failed(self) -> bool { - matches!(self, Self::Failed) - } -} - -#[derive(Debug, Clone, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] -#[serde(transparent)] -struct BackgroundConsultJobId(Uuid); - -impl BackgroundConsultJobId { - fn new() -> Self { - Self(Uuid::new_v4()) - } - - fn parse(raw: String) -> Result { - Uuid::parse_str(&raw) - .map(Self) - .map_err(|_| ConsultRequestError::InvalidJobHandle(raw)) - } - fn display(&self) -> String { self.0.to_string() } } -#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] -struct BackgroundConsultFailure { - class: String, - detail: String, -} - -#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] -struct BackgroundConsultResponseRecord { - cwd: String, - response: String, - persisted_output_path: String, - duration_ms: u64, - duration_api_ms: Option, - num_turns: u64, - stop_reason: Option, - session_id: Option, - total_cost_usd: Option, - usage: Option, - model_usage: Option, - permission_denials: Vec, - fast_mode_state: Option, - uuid: Option, -} - -impl BackgroundConsultResponseRecord { - fn from_response(response: ConsultResponse) -> Self { - Self { - cwd: response.cwd.display(), - response: response.result, - persisted_output_path: response.persisted_output_path.display(), - duration_ms: response.duration_ms, - duration_api_ms: response.duration_api_ms, - num_turns: response.num_turns, - stop_reason: response.stop_reason, - session_id: response.session_id, - total_cost_usd: response.total_cost_usd, - usage: response.usage, - model_usage: response.model_usage, - permission_denials: response.permission_denials, - fast_mode_state: response.fast_mode_state, - uuid: response.uuid, - } - } - - fn model_name(&self) -> Option { - model_name(self.model_usage.as_ref()) - } -} - -#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] -struct BackgroundConsultJobRecord { - job_id: BackgroundConsultJobId, - status: BackgroundConsultStatus, - created_unix_ms: u64, - updated_unix_ms: u64, - started_unix_ms: Option, - finished_unix_ms: Option, - runner_pid: Option, - request: BackgroundConsultRequest, - prompt_prefix_injected: bool, - result: Option, - failure: Option, -} - -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -struct ConsultWaitConfig { - timeout_ms: u64, - poll_interval_ms: u64, -} - -impl ConsultWaitConfig { - fn parse(timeout_ms: Option, poll_interval_ms: Option) -> Self { - Self { - timeout_ms: timeout_ms.unwrap_or(DEFAULT_CONSULT_WAIT_TIMEOUT_MS), - poll_interval_ms: poll_interval_ms - .unwrap_or(DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS) - .max(MIN_CONSULT_WAIT_POLL_INTERVAL_MS), - } - } -} - -#[derive(Debug, Clone)] -struct BackgroundConsultWaitRequest { - job_id: BackgroundConsultJobId, - config: ConsultWaitConfig, -} - -impl BackgroundConsultWaitRequest { - fn parse(args: ConsultWaitArgs) -> Result { - Ok(Self { - job_id: BackgroundConsultJobId::parse(args.job_id)?, - config: ConsultWaitConfig::parse(args.timeout_ms, args.poll_interval_ms), - }) - } -} - -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -struct BackgroundConsultWaitMetadata { - waited_ms: u64, - timed_out: bool, -} - -#[derive(Debug, Clone)] -struct BackgroundConsultWaitOutcome { - record: BackgroundConsultJobRecord, - metadata: BackgroundConsultWaitMetadata, -} - -impl BackgroundConsultJobRecord { - fn new(job_id: BackgroundConsultJobId, request: BackgroundConsultRequest) -> Self { - let now = unix_ms_now(); - Self { - job_id, - status: BackgroundConsultStatus::Queued, - created_unix_ms: now, - updated_unix_ms: now, - started_unix_ms: None, - finished_unix_ms: None, - runner_pid: None, - request, - prompt_prefix_injected: true, - result: None, - failure: None, - } - } - - fn summary(&self) -> Value { - json!({ - "job_id": self.job_id.display(), - "status": self.status, - "done": self.status.done(), - "succeeded": self.status.success(), - "failed": self.status.failed(), - "created_unix_ms": self.created_unix_ms, - "updated_unix_ms": self.updated_unix_ms, - "started_unix_ms": self.started_unix_ms, - "finished_unix_ms": self.finished_unix_ms, - "runner_pid": self.runner_pid, - "cwd": self.request.cwd, - "prompt_prefix_injected": self.prompt_prefix_injected, - }) - } -} - #[derive(Debug, Error)] enum ConsultRequestError { #[error("prompt must not be empty")] @@ -667,14 +248,6 @@ enum ConsultRequestError { Canonicalize { path: String, source: io::Error }, #[error("working directory `{0}` is not a directory")] NotDirectory(String), - #[allow( - dead_code, - reason = "context index loading is parked while one-shot consults are enforced" - )] - #[error("failed to resolve consult context state: {source}")] - ContextIndex { source: io::Error }, - #[error("job_id must be a valid UUID, got `{0}`")] - InvalidJobHandle(String), } #[derive(Debug, Error)] @@ -700,7 +273,6 @@ struct ClaudeJsonEnvelope { num_turns: Option, result: Option, stop_reason: Option, - session_id: Option, total_cost_usd: Option, usage: Option, #[serde(rename = "modelUsage")] @@ -716,31 +288,10 @@ struct ConsultResponse { cwd: WorkingDirectory, result: String, persisted_output_path: PersistedConsultPath, - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - context_mode: &'static str, - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - planned_session_id: String, - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - reused_session_id: Option, duration_ms: u64, duration_api_ms: Option, num_turns: u64, stop_reason: Option, - session_id: Option, - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - observed_session_id: Option, total_cost_usd: Option, usage: Option, model_usage: Option, @@ -902,7 +453,6 @@ const SYSTEMD_RUN_BINARY: &str = "systemd-run"; const SYSTEMCTL_BINARY: &str = "systemctl"; const DEFAULT_PATH: &str = "/usr/local/bin:/usr/bin:/bin"; const PHONE_OPUS_STATE_ROOT_NAME: &str = "phone_opus"; -const CONSULT_CONTEXT_INDEX_FILE_NAME: &str = "consult_contexts.json"; const CLAUDE_HOME_DIR_NAME: &str = "claude-home"; const XDG_CONFIG_DIR_NAME: &str = "xdg-config"; const XDG_CACHE_DIR_NAME: &str = "xdg-cache"; @@ -1210,359 +760,6 @@ fn consult_retry_hint(quota_limited: bool, error: &ConsultInvocationError) -> Op } } -pub(crate) fn consult_job_tool_output( - arguments: Value, - generation: Generation, - stage: FaultStage, - operation: &str, -) -> Result { - let args = deserialize::(arguments, operation, generation)?; - let job_id = BackgroundConsultJobId::parse(args.job_id) - .map_err(|error| invalid_consult_request(generation, operation, error))?; - let record = load_background_consult_job(&job_id).map_err(|error| { - FaultRecord::downstream(generation, stage, operation, error.to_string()) - })?; - background_job_tool_output(&record, None, generation, stage, operation) -} - -pub(crate) fn consult_wait_tool_output( - arguments: Value, - generation: Generation, - stage: FaultStage, - operation: &str, -) -> Result { - let args = deserialize::(arguments, operation, generation)?; - let request = BackgroundConsultWaitRequest::parse(args) - .map_err(|error| invalid_consult_request(generation, operation, error))?; - let outcome = wait_for_background_consult(&request).map_err(|error| { - FaultRecord::downstream(generation, stage, operation, error.to_string()) - })?; - background_job_tool_output( - &outcome.record, - Some(outcome.metadata), - generation, - stage, - operation, - ) -} - -pub(crate) fn consult_jobs_tool_output( - arguments: Value, - generation: Generation, - stage: FaultStage, - operation: &str, -) -> Result { - let args = deserialize::(arguments, operation, generation)?; - let limit = args.limit.unwrap_or(10).clamp(1, 50); - let jobs = load_recent_background_consult_jobs(limit as usize).map_err(|error| { - FaultRecord::downstream(generation, stage, operation, error.to_string()) - })?; - let concise = json!({ - "jobs": jobs.iter().map(BackgroundConsultJobRecord::summary).collect::>(), - "count": jobs.len(), - }); - let full = json!({ - "jobs": jobs, - "count": jobs.len(), - }); - let lines = if jobs.is_empty() { - "no background consult jobs".to_owned() - } else { - jobs.iter() - .map(|job| { - format!( - "{} status={} cwd={}", - job.job_id.display(), - serde_json::to_string(&job.status).unwrap_or_else(|_| "\"unknown\"".to_owned()), - job.request.cwd - ) - }) - .collect::>() - .join("\n") - }; - fallback_detailed_tool_output( - &concise, - &full, - lines, - None, - SurfaceKind::Read, - generation, - stage, - operation, - ) -} - -#[allow(dead_code, reason = "background submission is parked but not exposed")] -fn submit_background_consult( - request: &ConsultRequest, - generation: Generation, - stage: FaultStage, - operation: &str, -) -> Result { - let job_id = BackgroundConsultJobId::new(); - let mut record = BackgroundConsultJobRecord::new(job_id, request.background_request()); - persist_background_consult_job(&record) - .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; - - let executable = std::env::current_exe() - .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; - let job_file = background_consult_job_path(&record.job_id) - .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; - let child = Command::new(executable) - .arg("mcp") - .arg("background-consult") - .arg("--job-file") - .arg(&job_file) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .spawn() - .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; - - record.status = BackgroundConsultStatus::Running; - record.started_unix_ms = Some(unix_ms_now()); - record.updated_unix_ms = unix_ms_now(); - record.runner_pid = Some(child.id()); - persist_background_consult_job(&record) - .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; - - let concise = json!({ - "mode": "background", - "job_id": record.job_id.display(), - "status": record.status, - "done": false, - "reused_session_id": request.reused_session_id(), - "context_mode": request.context_mode(), - "prompt_prefix_injected": true, - "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"], - }); - let full = json!({ - "mode": "background", - "job_id": record.job_id.display(), - "status": record.status, - "done": false, - "reused_session_id": request.reused_session_id(), - "context_mode": request.context_mode(), - "prompt_prefix_injected": true, - "prompt": request.prompt.as_str(), - "effective_prompt": request.prompt.rendered(), - "cwd": request.cwd.display(), - "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"], - }); - fallback_detailed_tool_output( - &concise, - &full, - format!( - "background consult submitted job={} status=running", - record.job_id.display() - ), - None, - SurfaceKind::Read, - generation, - stage, - operation, - ) -} - -fn execute_background_consult(job_file: PathBuf) -> io::Result<()> { - let mut record = read_json_file::(&job_file)?; - record.status = BackgroundConsultStatus::Running; - let _ = record.runner_pid.get_or_insert(std::process::id()); - let _ = record.started_unix_ms.get_or_insert_with(unix_ms_now); - record.updated_unix_ms = unix_ms_now(); - persist_background_consult_job_to_path(&job_file, &record)?; - - let request = record - .request - .clone() - .into_consult_request() - .map_err(|error| io::Error::other(error.to_string()))?; - match invoke_claude(&request) { - Ok(response) => { - record.status = BackgroundConsultStatus::Succeeded; - record.result = Some(BackgroundConsultResponseRecord::from_response(response)); - record.failure = None; - } - Err(error) => { - record.status = BackgroundConsultStatus::Failed; - record.result = None; - record.failure = Some(background_failure(error)); - } - } - record.finished_unix_ms = Some(unix_ms_now()); - record.updated_unix_ms = unix_ms_now(); - record.runner_pid = None; - persist_background_consult_job_to_path(&job_file, &record) -} - -fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure { - match error { - ConsultInvocationError::Spawn(source) => BackgroundConsultFailure { - class: "process".to_owned(), - detail: source.to_string(), - }, - ConsultInvocationError::InvalidJson(detail) - | ConsultInvocationError::Stalled(detail) - | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure { - class: "downstream".to_owned(), - detail, - }, - } -} - -fn wait_for_background_consult( - request: &BackgroundConsultWaitRequest, -) -> io::Result { - let started_at = Instant::now(); - loop { - let record = load_background_consult_job(&request.job_id)?; - let waited_ms = elapsed_duration_ms(started_at.elapsed()); - if record.status.done() { - return Ok(BackgroundConsultWaitOutcome { - record, - metadata: BackgroundConsultWaitMetadata { - waited_ms, - timed_out: false, - }, - }); - } - if waited_ms >= request.config.timeout_ms { - return Ok(BackgroundConsultWaitOutcome { - record, - metadata: BackgroundConsultWaitMetadata { - waited_ms, - timed_out: true, - }, - }); - } - let remaining_ms = request.config.timeout_ms.saturating_sub(waited_ms); - let sleep_ms = remaining_ms.min(request.config.poll_interval_ms); - thread::sleep(Duration::from_millis(sleep_ms)); - } -} - -fn background_job_tool_output( - record: &BackgroundConsultJobRecord, - wait: Option, - generation: Generation, - stage: FaultStage, - operation: &str, -) -> Result { - let mut concise = match record.summary() { - Value::Object(object) => object, - _ => Map::new(), - }; - if let Some(wait) = wait { - let _ = concise.insert("waited_ms".to_owned(), json!(wait.waited_ms)); - let _ = concise.insert("timed_out".to_owned(), json!(wait.timed_out)); - } - let _ = concise.insert( - "result".to_owned(), - json!(record.result.as_ref().map(|result| json!({ - "response": result.response, - "persisted_output_path": result.persisted_output_path, - "duration_ms": result.duration_ms, - "num_turns": result.num_turns, - "session_id": result.session_id, - "model": result.model_name(), - }))), - ); - let _ = concise.insert("failure".to_owned(), json!(record.failure)); - let mut full = Map::from_iter([("job".to_owned(), json!(record))]); - if let Some(wait) = wait { - let _ = full.insert("waited_ms".to_owned(), json!(wait.waited_ms)); - let _ = full.insert("timed_out".to_owned(), json!(wait.timed_out)); - } - let mut lines = vec![format!( - "job={} status={:?}", - record.job_id.display(), - record.status - )]; - if let Some(wait) = wait { - lines.push(format!( - "waited={} timed_out={}", - render_duration_ms(wait.waited_ms), - wait.timed_out - )); - } - if let Some(result) = record.result.as_ref() { - lines.push(format!( - "result ready model={} turns={} duration={}", - result.model_name().unwrap_or_else(|| "unknown".to_owned()), - result.num_turns, - render_duration_ms(result.duration_ms) - )); - lines.push(format!("saved={}", result.persisted_output_path)); - lines.push("response:".to_owned()); - lines.push(result.response.clone()); - } - if let Some(failure) = record.failure.as_ref() { - lines.push(format!("failure={} {}", failure.class, failure.detail)); - } - fallback_detailed_tool_output( - &Value::Object(concise), - &Value::Object(full), - lines.join("\n"), - None, - SurfaceKind::Read, - generation, - stage, - operation, - ) -} - -fn background_consult_job_root() -> io::Result { - let path = phone_opus_state_root()?.join("mcp").join("consult_jobs"); - fs::create_dir_all(&path)?; - Ok(path) -} - -fn background_consult_job_path(job_id: &BackgroundConsultJobId) -> io::Result { - Ok(background_consult_job_root()?.join(format!("{}.json", job_id.display()))) -} - -fn persist_background_consult_job(record: &BackgroundConsultJobRecord) -> io::Result<()> { - let path = background_consult_job_path(&record.job_id)?; - persist_background_consult_job_to_path(&path, record) -} - -fn persist_background_consult_job_to_path( - path: &Path, - record: &BackgroundConsultJobRecord, -) -> io::Result<()> { - write_json_file(path, record) -} - -fn load_background_consult_job( - job_id: &BackgroundConsultJobId, -) -> io::Result { - read_json_file(background_consult_job_path(job_id)?.as_path()) -} - -fn load_recent_background_consult_jobs( - limit: usize, -) -> io::Result> { - let mut jobs = fs::read_dir(background_consult_job_root()?)? - .filter_map(Result::ok) - .map(|entry| entry.path()) - .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("json")) - .filter_map(|path| read_json_file::(&path).ok()) - .collect::>(); - jobs.sort_by(|left, right| { - right - .updated_unix_ms - .cmp(&left.updated_unix_ms) - .then_with(|| left.job_id.cmp(&right.job_id)) - }); - jobs.truncate(limit); - Ok(jobs) -} - -fn read_json_file Deserialize<'de>>(path: &Path) -> io::Result { - let bytes = fs::read(path)?; - serde_json::from_slice(&bytes).map_err(|error| io::Error::other(error.to_string())) -} - fn write_json_file(path: &Path, value: &T) -> io::Result<()> { let payload = serde_json::to_vec_pretty(value).map_err(|error| io::Error::other(error.to_string()))?; @@ -1584,7 +781,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result Result Result Result io::Result<()> { } fn claude_unit_name(request: &ConsultRequest) -> String { - format!("phone-opus-claude-{}", request.planned_session_id()) + format!("phone-opus-claude-{}", request.session_id()) } fn claude_stall_timeout() -> Duration { @@ -1890,51 +1071,6 @@ fn phone_opus_state_root() -> io::Result { Ok(root) } -fn consult_context_index_path() -> io::Result { - let root = phone_opus_state_root()?.join("mcp"); - fs::create_dir_all(&root)?; - Ok(root.join(CONSULT_CONTEXT_INDEX_FILE_NAME)) -} - -fn load_consult_context_index() -> io::Result { - let path = consult_context_index_path()?; - match read_json_file::(path.as_path()) { - Ok(index) => Ok(index), - Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(ConsultContextIndex::default()), - Err(error) => Err(error), - } -} - -#[allow( - dead_code, - reason = "context lookup is parked while one-shot consults are enforced" -)] -fn load_consult_context(key: &ConsultContextKey) -> io::Result> { - Ok(load_consult_context_index()?.context_for(key)) -} - -fn remember_planned_consult_context( - key: &ConsultContextKey, - session_id: &SessionHandle, -) -> io::Result<()> { - let mut index = load_consult_context_index()?; - index.remember(key, session_id, StoredConsultContextState::Planned); - write_json_file(consult_context_index_path()?.as_path(), &index) -} - -fn confirm_consult_context( - key: &ConsultContextKey, - observed_session_id: Option<&str>, - fallback_session_id: &SessionHandle, -) -> io::Result<()> { - let session_id = observed_session_id - .and_then(SessionHandle::parse) - .unwrap_or_else(|| fallback_session_id.clone()); - let mut index = load_consult_context_index()?; - index.remember(key, &session_id, StoredConsultContextState::Confirmed); - write_json_file(consult_context_index_path()?.as_path(), &index) -} - fn caller_home_dir() -> Option { std::env::var_os("HOME") .filter(|value| !value.is_empty()) @@ -2250,10 +1386,6 @@ fn model_name(model_usage: Option<&Value>) -> Option { models.keys().next().cloned() } -fn elapsed_duration_ms(duration: Duration) -> u64 { - duration.as_millis().try_into().unwrap_or(u64::MAX) -} - fn render_duration_ms(duration_ms: u64) -> String { if duration_ms < 1_000 { return format!("{duration_ms}ms"); -- cgit v1.2.3