diff options
Diffstat (limited to 'crates/phone-opus/src/mcp/service.rs')
| -rw-r--r-- | crates/phone-opus/src/mcp/service.rs | 886 |
1 files changed, 9 insertions, 877 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index afb811e..5f6ab99 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -12,7 +12,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use dirs::{home_dir, state_dir}; use libmcp::{Generation, SurfaceKind}; use serde::{Deserialize, Serialize}; -use serde_json::{Map, Value, json}; +use serde_json::{Value, json}; use thiserror::Error; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use users::get_current_uid; @@ -56,11 +56,6 @@ pub(crate) fn run_worker(generation: u64) -> Result<(), Box<dyn std::error::Erro Ok(()) } -pub(crate) fn run_background_consult(job_file: PathBuf) -> Result<(), Box<dyn std::error::Error>> { - execute_background_consult(job_file)?; - Ok(()) -} - struct WorkerService { generation: Generation, } @@ -119,98 +114,26 @@ struct ConsultArgs { cwd: Option<String>, } -#[derive(Debug, Deserialize)] -struct ConsultJobArgs { - job_id: String, -} - -#[derive(Debug, Deserialize)] -struct ConsultWaitArgs { - job_id: String, - timeout_ms: Option<u64>, - poll_interval_ms: Option<u64>, -} - -#[derive(Debug, Default, Deserialize)] -struct ConsultJobsArgs { - limit: Option<u64>, -} - -const DEFAULT_CONSULT_WAIT_TIMEOUT_MS: u64 = 30 * 60 * 1_000; -const DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 1_000; -const MIN_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 10; - #[derive(Debug, Clone)] struct ConsultRequest { prompt: PromptText, cwd: WorkingDirectory, - context_key: ConsultContextKey, - session_plan: ConsultSessionPlan, + session: SessionHandle, } impl ConsultRequest { fn parse(args: ConsultArgs) -> Result<Self, ConsultRequestError> { let prompt = PromptText::parse(args.prompt)?; let cwd = WorkingDirectory::resolve(args.cwd)?; - let context_key = ConsultContextKey::from_cwd(&cwd); - let session_plan = ConsultSessionPlan::fresh(); Ok(Self { prompt, cwd, - context_key, - session_plan, + session: SessionHandle::fresh(), }) } - fn planned_session_id(&self) -> String { - self.session_plan.planned_session().display() - } - - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - fn context_mode(&self) -> &'static str { - self.session_plan.context_mode() - } - - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - fn reused_session_id(&self) -> Option<String> { - self.session_plan.reused_session_id() - } - - fn launch_session_id(&self) -> Option<String> { - match self.session_plan { - ConsultSessionPlan::Start { .. } => Some(self.planned_session_id()), - ConsultSessionPlan::Resume(_) => None, - } - } - - fn remember_context(&self, session_id: Option<&str>) -> io::Result<()> { - confirm_consult_context( - &self.context_key, - session_id, - self.session_plan.planned_session(), - ) - } - - fn remember_planned_context(&self) -> io::Result<()> { - if let ConsultSessionPlan::Start { session, .. } = &self.session_plan { - remember_planned_consult_context(&self.context_key, session) - } else { - Ok(()) - } - } - - #[allow(dead_code, reason = "background submission is parked but not exposed")] - fn background_request(&self) -> BackgroundConsultRequest { - BackgroundConsultRequest { - prompt: self.prompt.as_str().to_owned(), - cwd: self.cwd.display(), - } + fn session_id(&self) -> String { + self.session.display() } } @@ -310,353 +233,11 @@ impl SessionHandle { Self(Uuid::new_v4()) } - fn parse(raw: &str) -> Option<Self> { - Uuid::parse_str(raw).ok().map(Self) - } - - fn display(&self) -> String { - self.0.to_string() - } -} - -#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)] -struct ConsultContextKey(String); - -impl ConsultContextKey { - fn from_cwd(cwd: &WorkingDirectory) -> Self { - Self(cwd.display()) - } - - fn as_str(&self) -> &str { - self.0.as_str() - } -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -struct StoredConsultContext { - session_id: String, - #[serde(default = "default_consult_context_state")] - state: StoredConsultContextState, - updated_unix_ms: u64, -} - -#[derive(Debug, Clone, Copy, Default, Deserialize, Eq, PartialEq, Serialize)] -#[serde(rename_all = "snake_case")] -enum StoredConsultContextState { - Planned, - #[default] - Confirmed, -} - -const fn default_consult_context_state() -> StoredConsultContextState { - StoredConsultContextState::Confirmed -} - -#[derive(Debug, Default, Deserialize, Serialize)] -struct ConsultContextIndex { - by_cwd: BTreeMap<String, StoredConsultContext>, -} - -impl ConsultContextIndex { - #[allow( - dead_code, - reason = "context lookup is parked while session reuse stays disabled" - )] - fn context_for(&self, key: &ConsultContextKey) -> Option<StoredConsultContext> { - self.by_cwd.get(key.as_str()).cloned() - } - - fn remember( - &mut self, - key: &ConsultContextKey, - session: &SessionHandle, - state: StoredConsultContextState, - ) { - let _ = self.by_cwd.insert( - key.as_str().to_owned(), - StoredConsultContext { - session_id: session.display(), - state, - updated_unix_ms: unix_ms_now(), - }, - ); - } -} - -#[derive(Debug, Clone)] -enum ConsultSessionPlan { - Start { - session: SessionHandle, - reused: bool, - }, - #[allow( - dead_code, - reason = "resume plans are parked while one-shot consults are enforced" - )] - Resume(SessionHandle), -} - -impl ConsultSessionPlan { - fn fresh() -> Self { - Self::Start { - session: SessionHandle::fresh(), - reused: false, - } - } - - #[allow( - dead_code, - reason = "stored-session revival is parked while one-shot consults are enforced" - )] - fn from_stored(context: StoredConsultContext) -> Option<Self> { - let session = SessionHandle::parse(context.session_id.as_str())?; - Some(match context.state { - StoredConsultContextState::Planned => Self::Start { - session, - reused: true, - }, - StoredConsultContextState::Confirmed => Self::Resume(session), - }) - } - - fn planned_session(&self) -> &SessionHandle { - match self { - Self::Start { session, .. } | Self::Resume(session) => session, - } - } - - #[allow( - dead_code, - reason = "resume paths are parked while one-shot consults are enforced" - )] - fn resume_session(&self) -> Option<&SessionHandle> { - match self { - Self::Resume(session) => Some(session), - Self::Start { .. } => None, - } - } - - fn context_mode(&self) -> &'static str { - match self { - Self::Start { reused: false, .. } => "fresh", - Self::Start { reused: true, .. } | Self::Resume(_) => "reused", - } - } - - fn reused_session_id(&self) -> Option<String> { - match self { - Self::Start { - reused: true, - session, - } - | Self::Resume(session) => Some(session.display()), - Self::Start { reused: false, .. } => None, - } - } -} - -#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] -struct BackgroundConsultRequest { - prompt: String, - cwd: String, -} - -impl BackgroundConsultRequest { - fn into_consult_request(self) -> Result<ConsultRequest, ConsultRequestError> { - ConsultRequest::parse(ConsultArgs { - prompt: self.prompt, - cwd: Some(self.cwd), - }) - } -} - -#[derive(Debug, Clone, Copy, Deserialize, Eq, PartialEq, Serialize)] -#[serde(rename_all = "snake_case")] -enum BackgroundConsultStatus { - Queued, - Running, - Succeeded, - Failed, -} - -impl BackgroundConsultStatus { - fn done(self) -> bool { - matches!(self, Self::Succeeded | Self::Failed) - } - - fn success(self) -> bool { - matches!(self, Self::Succeeded) - } - - fn failed(self) -> bool { - matches!(self, Self::Failed) - } -} - -#[derive(Debug, Clone, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] -#[serde(transparent)] -struct BackgroundConsultJobId(Uuid); - -impl BackgroundConsultJobId { - fn new() -> Self { - Self(Uuid::new_v4()) - } - - fn parse(raw: String) -> Result<Self, ConsultRequestError> { - Uuid::parse_str(&raw) - .map(Self) - .map_err(|_| ConsultRequestError::InvalidJobHandle(raw)) - } - fn display(&self) -> String { self.0.to_string() } } -#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] -struct BackgroundConsultFailure { - class: String, - detail: String, -} - -#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] -struct BackgroundConsultResponseRecord { - cwd: String, - response: String, - persisted_output_path: String, - duration_ms: u64, - duration_api_ms: Option<u64>, - num_turns: u64, - stop_reason: Option<String>, - session_id: Option<String>, - total_cost_usd: Option<f64>, - usage: Option<Value>, - model_usage: Option<Value>, - permission_denials: Vec<Value>, - fast_mode_state: Option<String>, - uuid: Option<String>, -} - -impl BackgroundConsultResponseRecord { - fn from_response(response: ConsultResponse) -> Self { - Self { - cwd: response.cwd.display(), - response: response.result, - persisted_output_path: response.persisted_output_path.display(), - duration_ms: response.duration_ms, - duration_api_ms: response.duration_api_ms, - num_turns: response.num_turns, - stop_reason: response.stop_reason, - session_id: response.session_id, - total_cost_usd: response.total_cost_usd, - usage: response.usage, - model_usage: response.model_usage, - permission_denials: response.permission_denials, - fast_mode_state: response.fast_mode_state, - uuid: response.uuid, - } - } - - fn model_name(&self) -> Option<String> { - model_name(self.model_usage.as_ref()) - } -} - -#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] -struct BackgroundConsultJobRecord { - job_id: BackgroundConsultJobId, - status: BackgroundConsultStatus, - created_unix_ms: u64, - updated_unix_ms: u64, - started_unix_ms: Option<u64>, - finished_unix_ms: Option<u64>, - runner_pid: Option<u32>, - request: BackgroundConsultRequest, - prompt_prefix_injected: bool, - result: Option<BackgroundConsultResponseRecord>, - failure: Option<BackgroundConsultFailure>, -} - -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -struct ConsultWaitConfig { - timeout_ms: u64, - poll_interval_ms: u64, -} - -impl ConsultWaitConfig { - fn parse(timeout_ms: Option<u64>, poll_interval_ms: Option<u64>) -> Self { - Self { - timeout_ms: timeout_ms.unwrap_or(DEFAULT_CONSULT_WAIT_TIMEOUT_MS), - poll_interval_ms: poll_interval_ms - .unwrap_or(DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS) - .max(MIN_CONSULT_WAIT_POLL_INTERVAL_MS), - } - } -} - -#[derive(Debug, Clone)] -struct BackgroundConsultWaitRequest { - job_id: BackgroundConsultJobId, - config: ConsultWaitConfig, -} - -impl BackgroundConsultWaitRequest { - fn parse(args: ConsultWaitArgs) -> Result<Self, ConsultRequestError> { - Ok(Self { - job_id: BackgroundConsultJobId::parse(args.job_id)?, - config: ConsultWaitConfig::parse(args.timeout_ms, args.poll_interval_ms), - }) - } -} - -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -struct BackgroundConsultWaitMetadata { - waited_ms: u64, - timed_out: bool, -} - -#[derive(Debug, Clone)] -struct BackgroundConsultWaitOutcome { - record: BackgroundConsultJobRecord, - metadata: BackgroundConsultWaitMetadata, -} - -impl BackgroundConsultJobRecord { - fn new(job_id: BackgroundConsultJobId, request: BackgroundConsultRequest) -> Self { - let now = unix_ms_now(); - Self { - job_id, - status: BackgroundConsultStatus::Queued, - created_unix_ms: now, - updated_unix_ms: now, - started_unix_ms: None, - finished_unix_ms: None, - runner_pid: None, - request, - prompt_prefix_injected: true, - result: None, - failure: None, - } - } - - fn summary(&self) -> Value { - json!({ - "job_id": self.job_id.display(), - "status": self.status, - "done": self.status.done(), - "succeeded": self.status.success(), - "failed": self.status.failed(), - "created_unix_ms": self.created_unix_ms, - "updated_unix_ms": self.updated_unix_ms, - "started_unix_ms": self.started_unix_ms, - "finished_unix_ms": self.finished_unix_ms, - "runner_pid": self.runner_pid, - "cwd": self.request.cwd, - "prompt_prefix_injected": self.prompt_prefix_injected, - }) - } -} - #[derive(Debug, Error)] enum ConsultRequestError { #[error("prompt must not be empty")] @@ -667,14 +248,6 @@ enum ConsultRequestError { Canonicalize { path: String, source: io::Error }, #[error("working directory `{0}` is not a directory")] NotDirectory(String), - #[allow( - dead_code, - reason = "context index loading is parked while one-shot consults are enforced" - )] - #[error("failed to resolve consult context state: {source}")] - ContextIndex { source: io::Error }, - #[error("job_id must be a valid UUID, got `{0}`")] - InvalidJobHandle(String), } #[derive(Debug, Error)] @@ -700,7 +273,6 @@ struct ClaudeJsonEnvelope { num_turns: Option<u64>, result: Option<String>, stop_reason: Option<String>, - session_id: Option<String>, total_cost_usd: Option<f64>, usage: Option<Value>, #[serde(rename = "modelUsage")] @@ -716,31 +288,10 @@ struct ConsultResponse { cwd: WorkingDirectory, result: String, persisted_output_path: PersistedConsultPath, - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - context_mode: &'static str, - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - planned_session_id: String, - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - reused_session_id: Option<String>, duration_ms: u64, duration_api_ms: Option<u64>, num_turns: u64, stop_reason: Option<String>, - session_id: Option<String>, - #[allow( - dead_code, - reason = "session metadata is retained internally but hidden from the public surface" - )] - observed_session_id: Option<String>, total_cost_usd: Option<f64>, usage: Option<Value>, model_usage: Option<Value>, @@ -902,7 +453,6 @@ const SYSTEMD_RUN_BINARY: &str = "systemd-run"; const SYSTEMCTL_BINARY: &str = "systemctl"; const DEFAULT_PATH: &str = "/usr/local/bin:/usr/bin:/bin"; const PHONE_OPUS_STATE_ROOT_NAME: &str = "phone_opus"; -const CONSULT_CONTEXT_INDEX_FILE_NAME: &str = "consult_contexts.json"; const CLAUDE_HOME_DIR_NAME: &str = "claude-home"; const XDG_CONFIG_DIR_NAME: &str = "xdg-config"; const XDG_CACHE_DIR_NAME: &str = "xdg-cache"; @@ -1210,359 +760,6 @@ fn consult_retry_hint(quota_limited: bool, error: &ConsultInvocationError) -> Op } } -pub(crate) fn consult_job_tool_output( - arguments: Value, - generation: Generation, - stage: FaultStage, - operation: &str, -) -> Result<ToolOutput, FaultRecord> { - let args = deserialize::<ConsultJobArgs>(arguments, operation, generation)?; - let job_id = BackgroundConsultJobId::parse(args.job_id) - .map_err(|error| invalid_consult_request(generation, operation, error))?; - let record = load_background_consult_job(&job_id).map_err(|error| { - FaultRecord::downstream(generation, stage, operation, error.to_string()) - })?; - background_job_tool_output(&record, None, generation, stage, operation) -} - -pub(crate) fn consult_wait_tool_output( - arguments: Value, - generation: Generation, - stage: FaultStage, - operation: &str, -) -> Result<ToolOutput, FaultRecord> { - let args = deserialize::<ConsultWaitArgs>(arguments, operation, generation)?; - let request = BackgroundConsultWaitRequest::parse(args) - .map_err(|error| invalid_consult_request(generation, operation, error))?; - let outcome = wait_for_background_consult(&request).map_err(|error| { - FaultRecord::downstream(generation, stage, operation, error.to_string()) - })?; - background_job_tool_output( - &outcome.record, - Some(outcome.metadata), - generation, - stage, - operation, - ) -} - -pub(crate) fn consult_jobs_tool_output( - arguments: Value, - generation: Generation, - stage: FaultStage, - operation: &str, -) -> Result<ToolOutput, FaultRecord> { - let args = deserialize::<ConsultJobsArgs>(arguments, operation, generation)?; - let limit = args.limit.unwrap_or(10).clamp(1, 50); - let jobs = load_recent_background_consult_jobs(limit as usize).map_err(|error| { - FaultRecord::downstream(generation, stage, operation, error.to_string()) - })?; - let concise = json!({ - "jobs": jobs.iter().map(BackgroundConsultJobRecord::summary).collect::<Vec<_>>(), - "count": jobs.len(), - }); - let full = json!({ - "jobs": jobs, - "count": jobs.len(), - }); - let lines = if jobs.is_empty() { - "no background consult jobs".to_owned() - } else { - jobs.iter() - .map(|job| { - format!( - "{} status={} cwd={}", - job.job_id.display(), - serde_json::to_string(&job.status).unwrap_or_else(|_| "\"unknown\"".to_owned()), - job.request.cwd - ) - }) - .collect::<Vec<_>>() - .join("\n") - }; - fallback_detailed_tool_output( - &concise, - &full, - lines, - None, - SurfaceKind::Read, - generation, - stage, - operation, - ) -} - -#[allow(dead_code, reason = "background submission is parked but not exposed")] -fn submit_background_consult( - request: &ConsultRequest, - generation: Generation, - stage: FaultStage, - operation: &str, -) -> Result<ToolOutput, FaultRecord> { - let job_id = BackgroundConsultJobId::new(); - let mut record = BackgroundConsultJobRecord::new(job_id, request.background_request()); - persist_background_consult_job(&record) - .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; - - let executable = std::env::current_exe() - .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; - let job_file = background_consult_job_path(&record.job_id) - .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; - let child = Command::new(executable) - .arg("mcp") - .arg("background-consult") - .arg("--job-file") - .arg(&job_file) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .spawn() - .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; - - record.status = BackgroundConsultStatus::Running; - record.started_unix_ms = Some(unix_ms_now()); - record.updated_unix_ms = unix_ms_now(); - record.runner_pid = Some(child.id()); - persist_background_consult_job(&record) - .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; - - let concise = json!({ - "mode": "background", - "job_id": record.job_id.display(), - "status": record.status, - "done": false, - "reused_session_id": request.reused_session_id(), - "context_mode": request.context_mode(), - "prompt_prefix_injected": true, - "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"], - }); - let full = json!({ - "mode": "background", - "job_id": record.job_id.display(), - "status": record.status, - "done": false, - "reused_session_id": request.reused_session_id(), - "context_mode": request.context_mode(), - "prompt_prefix_injected": true, - "prompt": request.prompt.as_str(), - "effective_prompt": request.prompt.rendered(), - "cwd": request.cwd.display(), - "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"], - }); - fallback_detailed_tool_output( - &concise, - &full, - format!( - "background consult submitted job={} status=running", - record.job_id.display() - ), - None, - SurfaceKind::Read, - generation, - stage, - operation, - ) -} - -fn execute_background_consult(job_file: PathBuf) -> io::Result<()> { - let mut record = read_json_file::<BackgroundConsultJobRecord>(&job_file)?; - record.status = BackgroundConsultStatus::Running; - let _ = record.runner_pid.get_or_insert(std::process::id()); - let _ = record.started_unix_ms.get_or_insert_with(unix_ms_now); - record.updated_unix_ms = unix_ms_now(); - persist_background_consult_job_to_path(&job_file, &record)?; - - let request = record - .request - .clone() - .into_consult_request() - .map_err(|error| io::Error::other(error.to_string()))?; - match invoke_claude(&request) { - Ok(response) => { - record.status = BackgroundConsultStatus::Succeeded; - record.result = Some(BackgroundConsultResponseRecord::from_response(response)); - record.failure = None; - } - Err(error) => { - record.status = BackgroundConsultStatus::Failed; - record.result = None; - record.failure = Some(background_failure(error)); - } - } - record.finished_unix_ms = Some(unix_ms_now()); - record.updated_unix_ms = unix_ms_now(); - record.runner_pid = None; - persist_background_consult_job_to_path(&job_file, &record) -} - -fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure { - match error { - ConsultInvocationError::Spawn(source) => BackgroundConsultFailure { - class: "process".to_owned(), - detail: source.to_string(), - }, - ConsultInvocationError::InvalidJson(detail) - | ConsultInvocationError::Stalled(detail) - | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure { - class: "downstream".to_owned(), - detail, - }, - } -} - -fn wait_for_background_consult( - request: &BackgroundConsultWaitRequest, -) -> io::Result<BackgroundConsultWaitOutcome> { - let started_at = Instant::now(); - loop { - let record = load_background_consult_job(&request.job_id)?; - let waited_ms = elapsed_duration_ms(started_at.elapsed()); - if record.status.done() { - return Ok(BackgroundConsultWaitOutcome { - record, - metadata: BackgroundConsultWaitMetadata { - waited_ms, - timed_out: false, - }, - }); - } - if waited_ms >= request.config.timeout_ms { - return Ok(BackgroundConsultWaitOutcome { - record, - metadata: BackgroundConsultWaitMetadata { - waited_ms, - timed_out: true, - }, - }); - } - let remaining_ms = request.config.timeout_ms.saturating_sub(waited_ms); - let sleep_ms = remaining_ms.min(request.config.poll_interval_ms); - thread::sleep(Duration::from_millis(sleep_ms)); - } -} - -fn background_job_tool_output( - record: &BackgroundConsultJobRecord, - wait: Option<BackgroundConsultWaitMetadata>, - generation: Generation, - stage: FaultStage, - operation: &str, -) -> Result<ToolOutput, FaultRecord> { - let mut concise = match record.summary() { - Value::Object(object) => object, - _ => Map::new(), - }; - if let Some(wait) = wait { - let _ = concise.insert("waited_ms".to_owned(), json!(wait.waited_ms)); - let _ = concise.insert("timed_out".to_owned(), json!(wait.timed_out)); - } - let _ = concise.insert( - "result".to_owned(), - json!(record.result.as_ref().map(|result| json!({ - "response": result.response, - "persisted_output_path": result.persisted_output_path, - "duration_ms": result.duration_ms, - "num_turns": result.num_turns, - "session_id": result.session_id, - "model": result.model_name(), - }))), - ); - let _ = concise.insert("failure".to_owned(), json!(record.failure)); - let mut full = Map::from_iter([("job".to_owned(), json!(record))]); - if let Some(wait) = wait { - let _ = full.insert("waited_ms".to_owned(), json!(wait.waited_ms)); - let _ = full.insert("timed_out".to_owned(), json!(wait.timed_out)); - } - let mut lines = vec![format!( - "job={} status={:?}", - record.job_id.display(), - record.status - )]; - if let Some(wait) = wait { - lines.push(format!( - "waited={} timed_out={}", - render_duration_ms(wait.waited_ms), - wait.timed_out - )); - } - if let Some(result) = record.result.as_ref() { - lines.push(format!( - "result ready model={} turns={} duration={}", - result.model_name().unwrap_or_else(|| "unknown".to_owned()), - result.num_turns, - render_duration_ms(result.duration_ms) - )); - lines.push(format!("saved={}", result.persisted_output_path)); - lines.push("response:".to_owned()); - lines.push(result.response.clone()); - } - if let Some(failure) = record.failure.as_ref() { - lines.push(format!("failure={} {}", failure.class, failure.detail)); - } - fallback_detailed_tool_output( - &Value::Object(concise), - &Value::Object(full), - lines.join("\n"), - None, - SurfaceKind::Read, - generation, - stage, - operation, - ) -} - -fn background_consult_job_root() -> io::Result<PathBuf> { - let path = phone_opus_state_root()?.join("mcp").join("consult_jobs"); - fs::create_dir_all(&path)?; - Ok(path) -} - -fn background_consult_job_path(job_id: &BackgroundConsultJobId) -> io::Result<PathBuf> { - Ok(background_consult_job_root()?.join(format!("{}.json", job_id.display()))) -} - -fn persist_background_consult_job(record: &BackgroundConsultJobRecord) -> io::Result<()> { - let path = background_consult_job_path(&record.job_id)?; - persist_background_consult_job_to_path(&path, record) -} - -fn persist_background_consult_job_to_path( - path: &Path, - record: &BackgroundConsultJobRecord, -) -> io::Result<()> { - write_json_file(path, record) -} - -fn load_background_consult_job( - job_id: &BackgroundConsultJobId, -) -> io::Result<BackgroundConsultJobRecord> { - read_json_file(background_consult_job_path(job_id)?.as_path()) -} - -fn load_recent_background_consult_jobs( - limit: usize, -) -> io::Result<Vec<BackgroundConsultJobRecord>> { - let mut jobs = fs::read_dir(background_consult_job_root()?)? - .filter_map(Result::ok) - .map(|entry| entry.path()) - .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("json")) - .filter_map(|path| read_json_file::<BackgroundConsultJobRecord>(&path).ok()) - .collect::<Vec<_>>(); - jobs.sort_by(|left, right| { - right - .updated_unix_ms - .cmp(&left.updated_unix_ms) - .then_with(|| left.job_id.cmp(&right.job_id)) - }); - jobs.truncate(limit); - Ok(jobs) -} - -fn read_json_file<T: for<'de> Deserialize<'de>>(path: &Path) -> io::Result<T> { - let bytes = fs::read(path)?; - serde_json::from_slice(&bytes).map_err(|error| io::Error::other(error.to_string())) -} - fn write_json_file<T: Serialize>(path: &Path, value: &T) -> io::Result<()> { let payload = serde_json::to_vec_pretty(value).map_err(|error| io::Error::other(error.to_string()))?; @@ -1584,7 +781,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv let unit_name = claude_unit_name(request); let transcript = ClaudeTranscriptMonitor::new( sandbox.claude_config_dir().join("projects"), - request.planned_session_id(), + request.session_id(), ); let mut command = Command::new(SYSTEMD_RUN_BINARY); let runtime_dir = caller_runtime_dir(); @@ -1641,18 +838,14 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv .arg("--tools") .arg(render_claude_toolset(CLAUDE_TOOLSET)) .arg("--dangerously-skip-permissions") + .arg("--session-id") + .arg(request.session_id()) .stdout(Stdio::piped()) .stderr(Stdio::piped()); - if let Some(session_id) = request.launch_session_id() { - let _ = command.arg("--session-id").arg(session_id); - } let child = command .arg(request.prompt.rendered()) .spawn() .map_err(ConsultInvocationError::Spawn)?; - request - .remember_planned_context() - .map_err(ConsultInvocationError::Spawn)?; let output = wait_for_claude_completion(child, unit_name.as_str(), transcript)?; let status = output.status; let stdout = output.stdout.trim().to_owned(); @@ -1664,7 +857,6 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv envelope.envelope_type ))); } - let observed_session_id = envelope.session_id.clone(); if !status.success() || envelope.is_error || envelope.subtype.as_deref() != Some("success") { return Err(ConsultInvocationError::Downstream( envelope @@ -1674,27 +866,16 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv )); } let result = envelope.result.clone().unwrap_or_default(); - let canonical_session_id = observed_session_id - .clone() - .or_else(|| Some(request.planned_session_id())); - request - .remember_context(canonical_session_id.as_deref()) - .map_err(ConsultInvocationError::Spawn)?; let persisted_output_path = persist_consult_output(request, &result, &envelope) .map_err(ConsultInvocationError::Spawn)?; Ok(ConsultResponse { cwd: request.cwd.clone(), result, persisted_output_path, - context_mode: request.context_mode(), - planned_session_id: request.planned_session_id(), - reused_session_id: request.reused_session_id(), duration_ms: envelope.duration_ms.unwrap_or(0), duration_api_ms: envelope.duration_api_ms, num_turns: envelope.num_turns.unwrap_or(0), stop_reason: envelope.stop_reason, - session_id: canonical_session_id, - observed_session_id, total_cost_usd: envelope.total_cost_usd, usage: envelope.usage, model_usage: envelope.model_usage, @@ -1855,7 +1036,7 @@ fn stop_transient_unit(unit_name: &str) -> io::Result<()> { } fn claude_unit_name(request: &ConsultRequest) -> String { - format!("phone-opus-claude-{}", request.planned_session_id()) + format!("phone-opus-claude-{}", request.session_id()) } fn claude_stall_timeout() -> Duration { @@ -1890,51 +1071,6 @@ fn phone_opus_state_root() -> io::Result<PathBuf> { Ok(root) } -fn consult_context_index_path() -> io::Result<PathBuf> { - let root = phone_opus_state_root()?.join("mcp"); - fs::create_dir_all(&root)?; - Ok(root.join(CONSULT_CONTEXT_INDEX_FILE_NAME)) -} - -fn load_consult_context_index() -> io::Result<ConsultContextIndex> { - let path = consult_context_index_path()?; - match read_json_file::<ConsultContextIndex>(path.as_path()) { - Ok(index) => Ok(index), - Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(ConsultContextIndex::default()), - Err(error) => Err(error), - } -} - -#[allow( - dead_code, - reason = "context lookup is parked while one-shot consults are enforced" -)] -fn load_consult_context(key: &ConsultContextKey) -> io::Result<Option<StoredConsultContext>> { - Ok(load_consult_context_index()?.context_for(key)) -} - -fn remember_planned_consult_context( - key: &ConsultContextKey, - session_id: &SessionHandle, -) -> io::Result<()> { - let mut index = load_consult_context_index()?; - index.remember(key, session_id, StoredConsultContextState::Planned); - write_json_file(consult_context_index_path()?.as_path(), &index) -} - -fn confirm_consult_context( - key: &ConsultContextKey, - observed_session_id: Option<&str>, - fallback_session_id: &SessionHandle, -) -> io::Result<()> { - let session_id = observed_session_id - .and_then(SessionHandle::parse) - .unwrap_or_else(|| fallback_session_id.clone()); - let mut index = load_consult_context_index()?; - index.remember(key, &session_id, StoredConsultContextState::Confirmed); - write_json_file(consult_context_index_path()?.as_path(), &index) -} - fn caller_home_dir() -> Option<PathBuf> { std::env::var_os("HOME") .filter(|value| !value.is_empty()) @@ -2250,10 +1386,6 @@ fn model_name(model_usage: Option<&Value>) -> Option<String> { models.keys().next().cloned() } -fn elapsed_duration_ms(duration: Duration) -> u64 { - duration.as_millis().try_into().unwrap_or(u64::MAX) -} - fn render_duration_ms(duration_ms: u64) -> String { if duration_ms < 1_000 { return format!("{duration_ms}ms"); |