From c3ad44cf3ec3bcd080f62c19d915ac1749576302 Mon Sep 17 00:00:00 2001 From: main Date: Mon, 23 Mar 2026 16:28:52 -0400 Subject: Add optional background consult jobs --- crates/phone-opus/src/mcp/service.rs | 564 ++++++++++++++++++++++++++++++++++- 1 file changed, 559 insertions(+), 5 deletions(-) (limited to 'crates/phone-opus/src/mcp/service.rs') diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index d42c0a0..5b38c4b 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -1,10 +1,13 @@ use std::collections::BTreeMap; +use std::fs; use std::io::{self, BufRead, Write}; use std::path::{Path, PathBuf}; -use std::process::Command; +use std::process::{Command, Stdio}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use dirs::{home_dir, state_dir}; use libmcp::{Generation, SurfaceKind}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use thiserror::Error; use uuid::Uuid; @@ -47,6 +50,11 @@ pub(crate) fn run_worker(generation: u64) -> Result<(), Box Result<(), Box> { + execute_background_consult(job_file)?; + Ok(()) +} + struct WorkerService { generation: Generation, } @@ -76,9 +84,19 @@ impl WorkerService { let args = deserialize::(arguments, &operation, self.generation)?; let request = ConsultRequest::parse(args) .map_err(|error| invalid_consult_request(self.generation, &operation, error))?; - let response = invoke_claude(&request) - .map_err(|error| consult_fault(self.generation, &operation, error))?; - consult_output(&request, &response, self.generation, &operation)? + match request.mode() { + ConsultMode::Sync => { + let response = invoke_claude(&request) + .map_err(|error| consult_fault(self.generation, &operation, error))?; + consult_output(&request, &response, self.generation, &operation)? + } + ConsultMode::Background => submit_background_consult( + &request, + self.generation, + FaultStage::Worker, + &operation, + )?, + } } other => { return Err(FaultRecord::invalid_input( @@ -105,6 +123,17 @@ struct ConsultArgs { cwd: Option, max_turns: Option, session_id: Option, + background: Option, +} + +#[derive(Debug, Deserialize)] +struct ConsultJobArgs { + job_id: String, +} + +#[derive(Debug, Default, Deserialize)] +struct ConsultJobsArgs { + limit: Option, } #[derive(Debug, Clone)] @@ -113,6 +142,7 @@ struct ConsultRequest { cwd: WorkingDirectory, max_turns: Option, session: Option, + mode: ConsultMode, } impl ConsultRequest { @@ -122,9 +152,14 @@ impl ConsultRequest { cwd: WorkingDirectory::resolve(args.cwd)?, max_turns: args.max_turns.map(TurnLimit::parse).transpose()?, session: args.session_id.map(SessionHandle::parse).transpose()?, + mode: ConsultMode::from_background(args.background), }) } + fn mode(&self) -> ConsultMode { + self.mode + } + fn session_mode(&self) -> &'static str { if self.session.is_some() { "resumed" @@ -136,6 +171,38 @@ impl ConsultRequest { fn requested_session_id(&self) -> Option { self.session.as_ref().map(SessionHandle::display) } + + fn background_request(&self) -> BackgroundConsultRequest { + BackgroundConsultRequest { + prompt: self.prompt.as_str().to_owned(), + cwd: self.cwd.display(), + max_turns: self.max_turns.map(TurnLimit::get), + session_id: self.requested_session_id(), + } + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +enum ConsultMode { + Sync, + Background, +} + +impl ConsultMode { + fn from_background(raw: Option) -> Self { + if raw.unwrap_or(false) { + Self::Background + } else { + Self::Sync + } + } + + fn as_str(self) -> &'static str { + match self { + Self::Sync => "sync", + Self::Background => "background", + } + } } #[derive(Debug, Clone)] @@ -232,6 +299,172 @@ impl SessionHandle { } } +#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] +struct BackgroundConsultRequest { + prompt: String, + cwd: String, + max_turns: Option, + session_id: Option, +} + +impl BackgroundConsultRequest { + fn into_consult_request(self) -> Result { + ConsultRequest::parse(ConsultArgs { + prompt: self.prompt, + cwd: Some(self.cwd), + max_turns: self.max_turns, + session_id: self.session_id, + background: Some(false), + }) + } +} + +#[derive(Debug, Clone, Copy, Deserialize, Eq, PartialEq, Serialize)] +#[serde(rename_all = "snake_case")] +enum BackgroundConsultStatus { + Queued, + Running, + Succeeded, + Failed, +} + +impl BackgroundConsultStatus { + fn done(self) -> bool { + matches!(self, Self::Succeeded | Self::Failed) + } + + fn success(self) -> bool { + matches!(self, Self::Succeeded) + } + + fn failed(self) -> bool { + matches!(self, Self::Failed) + } +} + +#[derive(Debug, Clone, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] +#[serde(transparent)] +struct BackgroundConsultJobId(Uuid); + +impl BackgroundConsultJobId { + fn new() -> Self { + Self(Uuid::new_v4()) + } + + fn parse(raw: String) -> Result { + Uuid::parse_str(&raw) + .map(Self) + .map_err(|_| ConsultRequestError::InvalidJobHandle(raw)) + } + + fn display(&self) -> String { + self.0.to_string() + } +} + +#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] +struct BackgroundConsultFailure { + class: String, + detail: String, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +struct BackgroundConsultResponseRecord { + cwd: String, + response: String, + duration_ms: u64, + duration_api_ms: Option, + num_turns: u64, + stop_reason: Option, + session_id: Option, + total_cost_usd: Option, + usage: Option, + model_usage: Option, + permission_denials: Vec, + fast_mode_state: Option, + uuid: Option, +} + +impl BackgroundConsultResponseRecord { + fn from_response(response: ConsultResponse) -> Self { + Self { + cwd: response.cwd.display(), + response: response.result, + duration_ms: response.duration_ms, + duration_api_ms: response.duration_api_ms, + num_turns: response.num_turns, + stop_reason: response.stop_reason, + session_id: response.session_id, + total_cost_usd: response.total_cost_usd, + usage: response.usage, + model_usage: response.model_usage, + permission_denials: response.permission_denials, + fast_mode_state: response.fast_mode_state, + uuid: response.uuid, + } + } + + fn model_name(&self) -> Option { + let Value::Object(models) = self.model_usage.as_ref()? else { + return None; + }; + models.keys().next().cloned() + } +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +struct BackgroundConsultJobRecord { + job_id: BackgroundConsultJobId, + status: BackgroundConsultStatus, + created_unix_ms: u64, + updated_unix_ms: u64, + started_unix_ms: Option, + finished_unix_ms: Option, + runner_pid: Option, + request: BackgroundConsultRequest, + prompt_prefix_injected: bool, + result: Option, + failure: Option, +} + +impl BackgroundConsultJobRecord { + fn new(job_id: BackgroundConsultJobId, request: BackgroundConsultRequest) -> Self { + let now = unix_ms_now(); + Self { + job_id, + status: BackgroundConsultStatus::Queued, + created_unix_ms: now, + updated_unix_ms: now, + started_unix_ms: None, + finished_unix_ms: None, + runner_pid: None, + request, + prompt_prefix_injected: true, + result: None, + failure: None, + } + } + + fn summary(&self) -> Value { + json!({ + "job_id": self.job_id.display(), + "status": self.status, + "done": self.status.done(), + "succeeded": self.status.success(), + "failed": self.status.failed(), + "created_unix_ms": self.created_unix_ms, + "updated_unix_ms": self.updated_unix_ms, + "started_unix_ms": self.started_unix_ms, + "finished_unix_ms": self.finished_unix_ms, + "runner_pid": self.runner_pid, + "cwd": self.request.cwd, + "max_turns": self.request.max_turns, + "requested_session_id": self.request.session_id, + "prompt_prefix_injected": self.prompt_prefix_injected, + }) + } +} + #[derive(Debug, Error)] enum ConsultRequestError { #[error("prompt must not be empty")] @@ -246,6 +479,8 @@ enum ConsultRequestError { InvalidTurnLimit, #[error("session_id must be a valid UUID, got `{0}`")] InvalidSessionHandle(String), + #[error("job_id must be a valid UUID, got `{0}`")] + InvalidJobHandle(String), } #[derive(Debug, Error)] @@ -348,6 +583,323 @@ fn consult_fault( } } +pub(crate) fn consult_job_tool_output( + arguments: Value, + generation: Generation, + stage: FaultStage, + operation: &str, +) -> Result { + let args = deserialize::(arguments, operation, generation)?; + let job_id = BackgroundConsultJobId::parse(args.job_id) + .map_err(|error| invalid_consult_request(generation, operation, error))?; + let record = load_background_consult_job(&job_id).map_err(|error| { + FaultRecord::downstream(generation, stage, operation, error.to_string()) + })?; + background_job_tool_output(&record, generation, stage, operation) +} + +pub(crate) fn consult_jobs_tool_output( + arguments: Value, + generation: Generation, + stage: FaultStage, + operation: &str, +) -> Result { + let args = deserialize::(arguments, operation, generation)?; + let limit = args.limit.unwrap_or(10).clamp(1, 50); + let jobs = load_recent_background_consult_jobs(limit as usize).map_err(|error| { + FaultRecord::downstream(generation, stage, operation, error.to_string()) + })?; + let concise = json!({ + "jobs": jobs.iter().map(BackgroundConsultJobRecord::summary).collect::>(), + "count": jobs.len(), + }); + let full = json!({ + "jobs": jobs, + "count": jobs.len(), + }); + let lines = if jobs.is_empty() { + "no background consult jobs".to_owned() + } else { + jobs.iter() + .map(|job| { + format!( + "{} status={} cwd={}", + job.job_id.display(), + serde_json::to_string(&job.status).unwrap_or_else(|_| "\"unknown\"".to_owned()), + job.request.cwd + ) + }) + .collect::>() + .join("\n") + }; + fallback_detailed_tool_output( + &concise, + &full, + lines, + None, + SurfaceKind::Read, + generation, + stage, + operation, + ) +} + +fn submit_background_consult( + request: &ConsultRequest, + generation: Generation, + stage: FaultStage, + operation: &str, +) -> Result { + let job_id = BackgroundConsultJobId::new(); + let mut record = BackgroundConsultJobRecord::new(job_id, request.background_request()); + persist_background_consult_job(&record) + .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; + + let executable = std::env::current_exe() + .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; + let job_file = background_consult_job_path(&record.job_id) + .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; + let child = Command::new(executable) + .arg("mcp") + .arg("background-consult") + .arg("--job-file") + .arg(&job_file) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; + + record.status = BackgroundConsultStatus::Running; + record.started_unix_ms = Some(unix_ms_now()); + record.updated_unix_ms = unix_ms_now(); + record.runner_pid = Some(child.id()); + persist_background_consult_job(&record) + .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; + + let concise = json!({ + "mode": request.mode().as_str(), + "job_id": record.job_id.display(), + "status": record.status, + "done": false, + "requested_session_id": request.requested_session_id(), + "session_mode": request.session_mode(), + "prompt_prefix_injected": true, + "follow_up_tools": ["consult_job", "consult_jobs"], + }); + let full = json!({ + "mode": request.mode().as_str(), + "job_id": record.job_id.display(), + "status": record.status, + "done": false, + "requested_session_id": request.requested_session_id(), + "session_mode": request.session_mode(), + "prompt_prefix_injected": true, + "prompt": request.prompt.as_str(), + "effective_prompt": request.prompt.rendered(), + "cwd": request.cwd.display(), + "max_turns": request.max_turns.map(TurnLimit::get), + "follow_up_tools": ["consult_job", "consult_jobs"], + }); + fallback_detailed_tool_output( + &concise, + &full, + format!( + "background consult submitted job={} status=running", + record.job_id.display() + ), + None, + SurfaceKind::Read, + generation, + stage, + operation, + ) +} + +fn execute_background_consult(job_file: PathBuf) -> io::Result<()> { + let mut record = read_json_file::(&job_file)?; + record.status = BackgroundConsultStatus::Running; + let _ = record.runner_pid.get_or_insert(std::process::id()); + let _ = record.started_unix_ms.get_or_insert_with(unix_ms_now); + record.updated_unix_ms = unix_ms_now(); + persist_background_consult_job_to_path(&job_file, &record)?; + + let request = record + .request + .clone() + .into_consult_request() + .map_err(|error| io::Error::other(error.to_string()))?; + match invoke_claude(&request) { + Ok(response) => { + record.status = BackgroundConsultStatus::Succeeded; + record.result = Some(BackgroundConsultResponseRecord::from_response(response)); + record.failure = None; + } + Err(error) => { + record.status = BackgroundConsultStatus::Failed; + record.result = None; + record.failure = Some(background_failure(error)); + } + } + record.finished_unix_ms = Some(unix_ms_now()); + record.updated_unix_ms = unix_ms_now(); + record.runner_pid = None; + persist_background_consult_job_to_path(&job_file, &record) +} + +fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure { + match error { + ConsultInvocationError::Spawn(source) => BackgroundConsultFailure { + class: "process".to_owned(), + detail: source.to_string(), + }, + ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure { + class: "downstream".to_owned(), + detail, + }, + } +} + +fn background_job_tool_output( + record: &BackgroundConsultJobRecord, + generation: Generation, + stage: FaultStage, + operation: &str, +) -> Result { + let concise = json!({ + "job_id": record.job_id.display(), + "status": record.status, + "done": record.status.done(), + "succeeded": record.status.success(), + "failed": record.status.failed(), + "created_unix_ms": record.created_unix_ms, + "updated_unix_ms": record.updated_unix_ms, + "started_unix_ms": record.started_unix_ms, + "finished_unix_ms": record.finished_unix_ms, + "runner_pid": record.runner_pid, + "cwd": record.request.cwd, + "requested_session_id": record.request.session_id, + "prompt_prefix_injected": record.prompt_prefix_injected, + "result": record.result.as_ref().map(|result| json!({ + "response": result.response, + "duration_ms": result.duration_ms, + "num_turns": result.num_turns, + "session_id": result.session_id, + "model": result.model_name(), + })), + "failure": record.failure, + }); + let full = json!({ + "job": record, + }); + let mut lines = vec![format!( + "job={} status={:?}", + record.job_id.display(), + record.status + )]; + if let Some(result) = record.result.as_ref() { + lines.push(format!( + "result ready model={} turns={} duration={}", + result.model_name().unwrap_or_else(|| "unknown".to_owned()), + result.num_turns, + render_duration_ms(result.duration_ms) + )); + lines.push("response:".to_owned()); + lines.push(result.response.clone()); + } + if let Some(failure) = record.failure.as_ref() { + lines.push(format!("failure={} {}", failure.class, failure.detail)); + } + fallback_detailed_tool_output( + &concise, + &full, + lines.join("\n"), + None, + SurfaceKind::Read, + generation, + stage, + operation, + ) +} + +fn background_consult_job_root() -> io::Result { + let root = state_dir() + .map(|root| root.join("phone_opus")) + .or_else(|| home_dir().map(|home| home.join(".local").join("state").join("phone_opus"))) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + "failed to resolve phone_opus state root", + ) + })?; + let path = root.join("mcp").join("consult_jobs"); + fs::create_dir_all(&path)?; + Ok(path) +} + +fn background_consult_job_path(job_id: &BackgroundConsultJobId) -> io::Result { + Ok(background_consult_job_root()?.join(format!("{}.json", job_id.display()))) +} + +fn persist_background_consult_job(record: &BackgroundConsultJobRecord) -> io::Result<()> { + let path = background_consult_job_path(&record.job_id)?; + persist_background_consult_job_to_path(&path, record) +} + +fn persist_background_consult_job_to_path( + path: &Path, + record: &BackgroundConsultJobRecord, +) -> io::Result<()> { + write_json_file(path, record) +} + +fn load_background_consult_job( + job_id: &BackgroundConsultJobId, +) -> io::Result { + read_json_file(background_consult_job_path(job_id)?.as_path()) +} + +fn load_recent_background_consult_jobs( + limit: usize, +) -> io::Result> { + let mut jobs = fs::read_dir(background_consult_job_root()?)? + .filter_map(Result::ok) + .map(|entry| entry.path()) + .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("json")) + .filter_map(|path| read_json_file::(&path).ok()) + .collect::>(); + jobs.sort_by(|left, right| { + right + .updated_unix_ms + .cmp(&left.updated_unix_ms) + .then_with(|| left.job_id.cmp(&right.job_id)) + }); + jobs.truncate(limit); + Ok(jobs) +} + +fn read_json_file Deserialize<'de>>(path: &Path) -> io::Result { + let bytes = fs::read(path)?; + serde_json::from_slice(&bytes).map_err(|error| io::Error::other(error.to_string())) +} + +fn write_json_file(path: &Path, value: &T) -> io::Result<()> { + let payload = + serde_json::to_vec_pretty(value).map_err(|error| io::Error::other(error.to_string()))?; + let temp_path = path.with_extension("json.tmp"); + fs::write(&temp_path, payload)?; + fs::rename(temp_path, path)?; + Ok(()) +} + +fn unix_ms_now() -> u64 { + let duration = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO); + u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) +} + fn invoke_claude(request: &ConsultRequest) -> Result { let mut command = Command::new(claude_binary()); let _ = command @@ -452,6 +1004,7 @@ fn consult_output( operation: &str, ) -> Result { let concise = json!({ + "mode": request.mode().as_str(), "response": response.result, "cwd": response.cwd.display(), "session_mode": request.session_mode(), @@ -466,6 +1019,7 @@ fn consult_output( "permission_denial_count": response.permission_denials.len(), }); let full = json!({ + "mode": request.mode().as_str(), "response": response.result, "cwd": response.cwd.display(), "prompt": request.prompt.as_str(), -- cgit v1.2.3