swarm repositories / source
aboutsummaryrefslogtreecommitdiff
path: root/crates/phone-opus/src/mcp/service.rs
diff options
context:
space:
mode:
authormain <main@swarm.moe>2026-03-31 13:21:05 -0400
committermain <main@swarm.moe>2026-03-31 13:21:05 -0400
commit2fc866a3ce50b6ba9c5e84e0ad2f8c77517361ff (patch)
treedab1bc9d653794944369ec51c2ed250aaf680f44 /crates/phone-opus/src/mcp/service.rs
parent2160224b7ef21e3319a93d057165712aabe8cbe2 (diff)
downloadphone_opus-2fc866a3ce50b6ba9c5e84e0ad2f8c77517361ff.zip
Excise hidden consult machinery
Diffstat (limited to 'crates/phone-opus/src/mcp/service.rs')
-rw-r--r--crates/phone-opus/src/mcp/service.rs886
1 files changed, 9 insertions, 877 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs
index afb811e..5f6ab99 100644
--- a/crates/phone-opus/src/mcp/service.rs
+++ b/crates/phone-opus/src/mcp/service.rs
@@ -12,7 +12,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use dirs::{home_dir, state_dir};
use libmcp::{Generation, SurfaceKind};
use serde::{Deserialize, Serialize};
-use serde_json::{Map, Value, json};
+use serde_json::{Value, json};
use thiserror::Error;
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use users::get_current_uid;
@@ -56,11 +56,6 @@ pub(crate) fn run_worker(generation: u64) -> Result<(), Box<dyn std::error::Erro
Ok(())
}
-pub(crate) fn run_background_consult(job_file: PathBuf) -> Result<(), Box<dyn std::error::Error>> {
- execute_background_consult(job_file)?;
- Ok(())
-}
-
struct WorkerService {
generation: Generation,
}
@@ -119,98 +114,26 @@ struct ConsultArgs {
cwd: Option<String>,
}
-#[derive(Debug, Deserialize)]
-struct ConsultJobArgs {
- job_id: String,
-}
-
-#[derive(Debug, Deserialize)]
-struct ConsultWaitArgs {
- job_id: String,
- timeout_ms: Option<u64>,
- poll_interval_ms: Option<u64>,
-}
-
-#[derive(Debug, Default, Deserialize)]
-struct ConsultJobsArgs {
- limit: Option<u64>,
-}
-
-const DEFAULT_CONSULT_WAIT_TIMEOUT_MS: u64 = 30 * 60 * 1_000;
-const DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 1_000;
-const MIN_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 10;
-
#[derive(Debug, Clone)]
struct ConsultRequest {
prompt: PromptText,
cwd: WorkingDirectory,
- context_key: ConsultContextKey,
- session_plan: ConsultSessionPlan,
+ session: SessionHandle,
}
impl ConsultRequest {
fn parse(args: ConsultArgs) -> Result<Self, ConsultRequestError> {
let prompt = PromptText::parse(args.prompt)?;
let cwd = WorkingDirectory::resolve(args.cwd)?;
- let context_key = ConsultContextKey::from_cwd(&cwd);
- let session_plan = ConsultSessionPlan::fresh();
Ok(Self {
prompt,
cwd,
- context_key,
- session_plan,
+ session: SessionHandle::fresh(),
})
}
- fn planned_session_id(&self) -> String {
- self.session_plan.planned_session().display()
- }
-
- #[allow(
- dead_code,
- reason = "session metadata is retained internally but hidden from the public surface"
- )]
- fn context_mode(&self) -> &'static str {
- self.session_plan.context_mode()
- }
-
- #[allow(
- dead_code,
- reason = "session metadata is retained internally but hidden from the public surface"
- )]
- fn reused_session_id(&self) -> Option<String> {
- self.session_plan.reused_session_id()
- }
-
- fn launch_session_id(&self) -> Option<String> {
- match self.session_plan {
- ConsultSessionPlan::Start { .. } => Some(self.planned_session_id()),
- ConsultSessionPlan::Resume(_) => None,
- }
- }
-
- fn remember_context(&self, session_id: Option<&str>) -> io::Result<()> {
- confirm_consult_context(
- &self.context_key,
- session_id,
- self.session_plan.planned_session(),
- )
- }
-
- fn remember_planned_context(&self) -> io::Result<()> {
- if let ConsultSessionPlan::Start { session, .. } = &self.session_plan {
- remember_planned_consult_context(&self.context_key, session)
- } else {
- Ok(())
- }
- }
-
- #[allow(dead_code, reason = "background submission is parked but not exposed")]
- fn background_request(&self) -> BackgroundConsultRequest {
- BackgroundConsultRequest {
- prompt: self.prompt.as_str().to_owned(),
- cwd: self.cwd.display(),
- }
+ fn session_id(&self) -> String {
+ self.session.display()
}
}
@@ -310,353 +233,11 @@ impl SessionHandle {
Self(Uuid::new_v4())
}
- fn parse(raw: &str) -> Option<Self> {
- Uuid::parse_str(raw).ok().map(Self)
- }
-
- fn display(&self) -> String {
- self.0.to_string()
- }
-}
-
-#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
-struct ConsultContextKey(String);
-
-impl ConsultContextKey {
- fn from_cwd(cwd: &WorkingDirectory) -> Self {
- Self(cwd.display())
- }
-
- fn as_str(&self) -> &str {
- self.0.as_str()
- }
-}
-
-#[derive(Debug, Clone, Deserialize, Serialize)]
-struct StoredConsultContext {
- session_id: String,
- #[serde(default = "default_consult_context_state")]
- state: StoredConsultContextState,
- updated_unix_ms: u64,
-}
-
-#[derive(Debug, Clone, Copy, Default, Deserialize, Eq, PartialEq, Serialize)]
-#[serde(rename_all = "snake_case")]
-enum StoredConsultContextState {
- Planned,
- #[default]
- Confirmed,
-}
-
-const fn default_consult_context_state() -> StoredConsultContextState {
- StoredConsultContextState::Confirmed
-}
-
-#[derive(Debug, Default, Deserialize, Serialize)]
-struct ConsultContextIndex {
- by_cwd: BTreeMap<String, StoredConsultContext>,
-}
-
-impl ConsultContextIndex {
- #[allow(
- dead_code,
- reason = "context lookup is parked while session reuse stays disabled"
- )]
- fn context_for(&self, key: &ConsultContextKey) -> Option<StoredConsultContext> {
- self.by_cwd.get(key.as_str()).cloned()
- }
-
- fn remember(
- &mut self,
- key: &ConsultContextKey,
- session: &SessionHandle,
- state: StoredConsultContextState,
- ) {
- let _ = self.by_cwd.insert(
- key.as_str().to_owned(),
- StoredConsultContext {
- session_id: session.display(),
- state,
- updated_unix_ms: unix_ms_now(),
- },
- );
- }
-}
-
-#[derive(Debug, Clone)]
-enum ConsultSessionPlan {
- Start {
- session: SessionHandle,
- reused: bool,
- },
- #[allow(
- dead_code,
- reason = "resume plans are parked while one-shot consults are enforced"
- )]
- Resume(SessionHandle),
-}
-
-impl ConsultSessionPlan {
- fn fresh() -> Self {
- Self::Start {
- session: SessionHandle::fresh(),
- reused: false,
- }
- }
-
- #[allow(
- dead_code,
- reason = "stored-session revival is parked while one-shot consults are enforced"
- )]
- fn from_stored(context: StoredConsultContext) -> Option<Self> {
- let session = SessionHandle::parse(context.session_id.as_str())?;
- Some(match context.state {
- StoredConsultContextState::Planned => Self::Start {
- session,
- reused: true,
- },
- StoredConsultContextState::Confirmed => Self::Resume(session),
- })
- }
-
- fn planned_session(&self) -> &SessionHandle {
- match self {
- Self::Start { session, .. } | Self::Resume(session) => session,
- }
- }
-
- #[allow(
- dead_code,
- reason = "resume paths are parked while one-shot consults are enforced"
- )]
- fn resume_session(&self) -> Option<&SessionHandle> {
- match self {
- Self::Resume(session) => Some(session),
- Self::Start { .. } => None,
- }
- }
-
- fn context_mode(&self) -> &'static str {
- match self {
- Self::Start { reused: false, .. } => "fresh",
- Self::Start { reused: true, .. } | Self::Resume(_) => "reused",
- }
- }
-
- fn reused_session_id(&self) -> Option<String> {
- match self {
- Self::Start {
- reused: true,
- session,
- }
- | Self::Resume(session) => Some(session.display()),
- Self::Start { reused: false, .. } => None,
- }
- }
-}
-
-#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)]
-struct BackgroundConsultRequest {
- prompt: String,
- cwd: String,
-}
-
-impl BackgroundConsultRequest {
- fn into_consult_request(self) -> Result<ConsultRequest, ConsultRequestError> {
- ConsultRequest::parse(ConsultArgs {
- prompt: self.prompt,
- cwd: Some(self.cwd),
- })
- }
-}
-
-#[derive(Debug, Clone, Copy, Deserialize, Eq, PartialEq, Serialize)]
-#[serde(rename_all = "snake_case")]
-enum BackgroundConsultStatus {
- Queued,
- Running,
- Succeeded,
- Failed,
-}
-
-impl BackgroundConsultStatus {
- fn done(self) -> bool {
- matches!(self, Self::Succeeded | Self::Failed)
- }
-
- fn success(self) -> bool {
- matches!(self, Self::Succeeded)
- }
-
- fn failed(self) -> bool {
- matches!(self, Self::Failed)
- }
-}
-
-#[derive(Debug, Clone, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
-#[serde(transparent)]
-struct BackgroundConsultJobId(Uuid);
-
-impl BackgroundConsultJobId {
- fn new() -> Self {
- Self(Uuid::new_v4())
- }
-
- fn parse(raw: String) -> Result<Self, ConsultRequestError> {
- Uuid::parse_str(&raw)
- .map(Self)
- .map_err(|_| ConsultRequestError::InvalidJobHandle(raw))
- }
-
fn display(&self) -> String {
self.0.to_string()
}
}
-#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)]
-struct BackgroundConsultFailure {
- class: String,
- detail: String,
-}
-
-#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
-struct BackgroundConsultResponseRecord {
- cwd: String,
- response: String,
- persisted_output_path: String,
- duration_ms: u64,
- duration_api_ms: Option<u64>,
- num_turns: u64,
- stop_reason: Option<String>,
- session_id: Option<String>,
- total_cost_usd: Option<f64>,
- usage: Option<Value>,
- model_usage: Option<Value>,
- permission_denials: Vec<Value>,
- fast_mode_state: Option<String>,
- uuid: Option<String>,
-}
-
-impl BackgroundConsultResponseRecord {
- fn from_response(response: ConsultResponse) -> Self {
- Self {
- cwd: response.cwd.display(),
- response: response.result,
- persisted_output_path: response.persisted_output_path.display(),
- duration_ms: response.duration_ms,
- duration_api_ms: response.duration_api_ms,
- num_turns: response.num_turns,
- stop_reason: response.stop_reason,
- session_id: response.session_id,
- total_cost_usd: response.total_cost_usd,
- usage: response.usage,
- model_usage: response.model_usage,
- permission_denials: response.permission_denials,
- fast_mode_state: response.fast_mode_state,
- uuid: response.uuid,
- }
- }
-
- fn model_name(&self) -> Option<String> {
- model_name(self.model_usage.as_ref())
- }
-}
-
-#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
-struct BackgroundConsultJobRecord {
- job_id: BackgroundConsultJobId,
- status: BackgroundConsultStatus,
- created_unix_ms: u64,
- updated_unix_ms: u64,
- started_unix_ms: Option<u64>,
- finished_unix_ms: Option<u64>,
- runner_pid: Option<u32>,
- request: BackgroundConsultRequest,
- prompt_prefix_injected: bool,
- result: Option<BackgroundConsultResponseRecord>,
- failure: Option<BackgroundConsultFailure>,
-}
-
-#[derive(Debug, Clone, Copy, Eq, PartialEq)]
-struct ConsultWaitConfig {
- timeout_ms: u64,
- poll_interval_ms: u64,
-}
-
-impl ConsultWaitConfig {
- fn parse(timeout_ms: Option<u64>, poll_interval_ms: Option<u64>) -> Self {
- Self {
- timeout_ms: timeout_ms.unwrap_or(DEFAULT_CONSULT_WAIT_TIMEOUT_MS),
- poll_interval_ms: poll_interval_ms
- .unwrap_or(DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS)
- .max(MIN_CONSULT_WAIT_POLL_INTERVAL_MS),
- }
- }
-}
-
-#[derive(Debug, Clone)]
-struct BackgroundConsultWaitRequest {
- job_id: BackgroundConsultJobId,
- config: ConsultWaitConfig,
-}
-
-impl BackgroundConsultWaitRequest {
- fn parse(args: ConsultWaitArgs) -> Result<Self, ConsultRequestError> {
- Ok(Self {
- job_id: BackgroundConsultJobId::parse(args.job_id)?,
- config: ConsultWaitConfig::parse(args.timeout_ms, args.poll_interval_ms),
- })
- }
-}
-
-#[derive(Debug, Clone, Copy, Eq, PartialEq)]
-struct BackgroundConsultWaitMetadata {
- waited_ms: u64,
- timed_out: bool,
-}
-
-#[derive(Debug, Clone)]
-struct BackgroundConsultWaitOutcome {
- record: BackgroundConsultJobRecord,
- metadata: BackgroundConsultWaitMetadata,
-}
-
-impl BackgroundConsultJobRecord {
- fn new(job_id: BackgroundConsultJobId, request: BackgroundConsultRequest) -> Self {
- let now = unix_ms_now();
- Self {
- job_id,
- status: BackgroundConsultStatus::Queued,
- created_unix_ms: now,
- updated_unix_ms: now,
- started_unix_ms: None,
- finished_unix_ms: None,
- runner_pid: None,
- request,
- prompt_prefix_injected: true,
- result: None,
- failure: None,
- }
- }
-
- fn summary(&self) -> Value {
- json!({
- "job_id": self.job_id.display(),
- "status": self.status,
- "done": self.status.done(),
- "succeeded": self.status.success(),
- "failed": self.status.failed(),
- "created_unix_ms": self.created_unix_ms,
- "updated_unix_ms": self.updated_unix_ms,
- "started_unix_ms": self.started_unix_ms,
- "finished_unix_ms": self.finished_unix_ms,
- "runner_pid": self.runner_pid,
- "cwd": self.request.cwd,
- "prompt_prefix_injected": self.prompt_prefix_injected,
- })
- }
-}
-
#[derive(Debug, Error)]
enum ConsultRequestError {
#[error("prompt must not be empty")]
@@ -667,14 +248,6 @@ enum ConsultRequestError {
Canonicalize { path: String, source: io::Error },
#[error("working directory `{0}` is not a directory")]
NotDirectory(String),
- #[allow(
- dead_code,
- reason = "context index loading is parked while one-shot consults are enforced"
- )]
- #[error("failed to resolve consult context state: {source}")]
- ContextIndex { source: io::Error },
- #[error("job_id must be a valid UUID, got `{0}`")]
- InvalidJobHandle(String),
}
#[derive(Debug, Error)]
@@ -700,7 +273,6 @@ struct ClaudeJsonEnvelope {
num_turns: Option<u64>,
result: Option<String>,
stop_reason: Option<String>,
- session_id: Option<String>,
total_cost_usd: Option<f64>,
usage: Option<Value>,
#[serde(rename = "modelUsage")]
@@ -716,31 +288,10 @@ struct ConsultResponse {
cwd: WorkingDirectory,
result: String,
persisted_output_path: PersistedConsultPath,
- #[allow(
- dead_code,
- reason = "session metadata is retained internally but hidden from the public surface"
- )]
- context_mode: &'static str,
- #[allow(
- dead_code,
- reason = "session metadata is retained internally but hidden from the public surface"
- )]
- planned_session_id: String,
- #[allow(
- dead_code,
- reason = "session metadata is retained internally but hidden from the public surface"
- )]
- reused_session_id: Option<String>,
duration_ms: u64,
duration_api_ms: Option<u64>,
num_turns: u64,
stop_reason: Option<String>,
- session_id: Option<String>,
- #[allow(
- dead_code,
- reason = "session metadata is retained internally but hidden from the public surface"
- )]
- observed_session_id: Option<String>,
total_cost_usd: Option<f64>,
usage: Option<Value>,
model_usage: Option<Value>,
@@ -902,7 +453,6 @@ const SYSTEMD_RUN_BINARY: &str = "systemd-run";
const SYSTEMCTL_BINARY: &str = "systemctl";
const DEFAULT_PATH: &str = "/usr/local/bin:/usr/bin:/bin";
const PHONE_OPUS_STATE_ROOT_NAME: &str = "phone_opus";
-const CONSULT_CONTEXT_INDEX_FILE_NAME: &str = "consult_contexts.json";
const CLAUDE_HOME_DIR_NAME: &str = "claude-home";
const XDG_CONFIG_DIR_NAME: &str = "xdg-config";
const XDG_CACHE_DIR_NAME: &str = "xdg-cache";
@@ -1210,359 +760,6 @@ fn consult_retry_hint(quota_limited: bool, error: &ConsultInvocationError) -> Op
}
}
-pub(crate) fn consult_job_tool_output(
- arguments: Value,
- generation: Generation,
- stage: FaultStage,
- operation: &str,
-) -> Result<ToolOutput, FaultRecord> {
- let args = deserialize::<ConsultJobArgs>(arguments, operation, generation)?;
- let job_id = BackgroundConsultJobId::parse(args.job_id)
- .map_err(|error| invalid_consult_request(generation, operation, error))?;
- let record = load_background_consult_job(&job_id).map_err(|error| {
- FaultRecord::downstream(generation, stage, operation, error.to_string())
- })?;
- background_job_tool_output(&record, None, generation, stage, operation)
-}
-
-pub(crate) fn consult_wait_tool_output(
- arguments: Value,
- generation: Generation,
- stage: FaultStage,
- operation: &str,
-) -> Result<ToolOutput, FaultRecord> {
- let args = deserialize::<ConsultWaitArgs>(arguments, operation, generation)?;
- let request = BackgroundConsultWaitRequest::parse(args)
- .map_err(|error| invalid_consult_request(generation, operation, error))?;
- let outcome = wait_for_background_consult(&request).map_err(|error| {
- FaultRecord::downstream(generation, stage, operation, error.to_string())
- })?;
- background_job_tool_output(
- &outcome.record,
- Some(outcome.metadata),
- generation,
- stage,
- operation,
- )
-}
-
-pub(crate) fn consult_jobs_tool_output(
- arguments: Value,
- generation: Generation,
- stage: FaultStage,
- operation: &str,
-) -> Result<ToolOutput, FaultRecord> {
- let args = deserialize::<ConsultJobsArgs>(arguments, operation, generation)?;
- let limit = args.limit.unwrap_or(10).clamp(1, 50);
- let jobs = load_recent_background_consult_jobs(limit as usize).map_err(|error| {
- FaultRecord::downstream(generation, stage, operation, error.to_string())
- })?;
- let concise = json!({
- "jobs": jobs.iter().map(BackgroundConsultJobRecord::summary).collect::<Vec<_>>(),
- "count": jobs.len(),
- });
- let full = json!({
- "jobs": jobs,
- "count": jobs.len(),
- });
- let lines = if jobs.is_empty() {
- "no background consult jobs".to_owned()
- } else {
- jobs.iter()
- .map(|job| {
- format!(
- "{} status={} cwd={}",
- job.job_id.display(),
- serde_json::to_string(&job.status).unwrap_or_else(|_| "\"unknown\"".to_owned()),
- job.request.cwd
- )
- })
- .collect::<Vec<_>>()
- .join("\n")
- };
- fallback_detailed_tool_output(
- &concise,
- &full,
- lines,
- None,
- SurfaceKind::Read,
- generation,
- stage,
- operation,
- )
-}
-
-#[allow(dead_code, reason = "background submission is parked but not exposed")]
-fn submit_background_consult(
- request: &ConsultRequest,
- generation: Generation,
- stage: FaultStage,
- operation: &str,
-) -> Result<ToolOutput, FaultRecord> {
- let job_id = BackgroundConsultJobId::new();
- let mut record = BackgroundConsultJobRecord::new(job_id, request.background_request());
- persist_background_consult_job(&record)
- .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
-
- let executable = std::env::current_exe()
- .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
- let job_file = background_consult_job_path(&record.job_id)
- .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
- let child = Command::new(executable)
- .arg("mcp")
- .arg("background-consult")
- .arg("--job-file")
- .arg(&job_file)
- .stdin(Stdio::null())
- .stdout(Stdio::null())
- .stderr(Stdio::null())
- .spawn()
- .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
-
- record.status = BackgroundConsultStatus::Running;
- record.started_unix_ms = Some(unix_ms_now());
- record.updated_unix_ms = unix_ms_now();
- record.runner_pid = Some(child.id());
- persist_background_consult_job(&record)
- .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
-
- let concise = json!({
- "mode": "background",
- "job_id": record.job_id.display(),
- "status": record.status,
- "done": false,
- "reused_session_id": request.reused_session_id(),
- "context_mode": request.context_mode(),
- "prompt_prefix_injected": true,
- "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"],
- });
- let full = json!({
- "mode": "background",
- "job_id": record.job_id.display(),
- "status": record.status,
- "done": false,
- "reused_session_id": request.reused_session_id(),
- "context_mode": request.context_mode(),
- "prompt_prefix_injected": true,
- "prompt": request.prompt.as_str(),
- "effective_prompt": request.prompt.rendered(),
- "cwd": request.cwd.display(),
- "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"],
- });
- fallback_detailed_tool_output(
- &concise,
- &full,
- format!(
- "background consult submitted job={} status=running",
- record.job_id.display()
- ),
- None,
- SurfaceKind::Read,
- generation,
- stage,
- operation,
- )
-}
-
-fn execute_background_consult(job_file: PathBuf) -> io::Result<()> {
- let mut record = read_json_file::<BackgroundConsultJobRecord>(&job_file)?;
- record.status = BackgroundConsultStatus::Running;
- let _ = record.runner_pid.get_or_insert(std::process::id());
- let _ = record.started_unix_ms.get_or_insert_with(unix_ms_now);
- record.updated_unix_ms = unix_ms_now();
- persist_background_consult_job_to_path(&job_file, &record)?;
-
- let request = record
- .request
- .clone()
- .into_consult_request()
- .map_err(|error| io::Error::other(error.to_string()))?;
- match invoke_claude(&request) {
- Ok(response) => {
- record.status = BackgroundConsultStatus::Succeeded;
- record.result = Some(BackgroundConsultResponseRecord::from_response(response));
- record.failure = None;
- }
- Err(error) => {
- record.status = BackgroundConsultStatus::Failed;
- record.result = None;
- record.failure = Some(background_failure(error));
- }
- }
- record.finished_unix_ms = Some(unix_ms_now());
- record.updated_unix_ms = unix_ms_now();
- record.runner_pid = None;
- persist_background_consult_job_to_path(&job_file, &record)
-}
-
-fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure {
- match error {
- ConsultInvocationError::Spawn(source) => BackgroundConsultFailure {
- class: "process".to_owned(),
- detail: source.to_string(),
- },
- ConsultInvocationError::InvalidJson(detail)
- | ConsultInvocationError::Stalled(detail)
- | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure {
- class: "downstream".to_owned(),
- detail,
- },
- }
-}
-
-fn wait_for_background_consult(
- request: &BackgroundConsultWaitRequest,
-) -> io::Result<BackgroundConsultWaitOutcome> {
- let started_at = Instant::now();
- loop {
- let record = load_background_consult_job(&request.job_id)?;
- let waited_ms = elapsed_duration_ms(started_at.elapsed());
- if record.status.done() {
- return Ok(BackgroundConsultWaitOutcome {
- record,
- metadata: BackgroundConsultWaitMetadata {
- waited_ms,
- timed_out: false,
- },
- });
- }
- if waited_ms >= request.config.timeout_ms {
- return Ok(BackgroundConsultWaitOutcome {
- record,
- metadata: BackgroundConsultWaitMetadata {
- waited_ms,
- timed_out: true,
- },
- });
- }
- let remaining_ms = request.config.timeout_ms.saturating_sub(waited_ms);
- let sleep_ms = remaining_ms.min(request.config.poll_interval_ms);
- thread::sleep(Duration::from_millis(sleep_ms));
- }
-}
-
-fn background_job_tool_output(
- record: &BackgroundConsultJobRecord,
- wait: Option<BackgroundConsultWaitMetadata>,
- generation: Generation,
- stage: FaultStage,
- operation: &str,
-) -> Result<ToolOutput, FaultRecord> {
- let mut concise = match record.summary() {
- Value::Object(object) => object,
- _ => Map::new(),
- };
- if let Some(wait) = wait {
- let _ = concise.insert("waited_ms".to_owned(), json!(wait.waited_ms));
- let _ = concise.insert("timed_out".to_owned(), json!(wait.timed_out));
- }
- let _ = concise.insert(
- "result".to_owned(),
- json!(record.result.as_ref().map(|result| json!({
- "response": result.response,
- "persisted_output_path": result.persisted_output_path,
- "duration_ms": result.duration_ms,
- "num_turns": result.num_turns,
- "session_id": result.session_id,
- "model": result.model_name(),
- }))),
- );
- let _ = concise.insert("failure".to_owned(), json!(record.failure));
- let mut full = Map::from_iter([("job".to_owned(), json!(record))]);
- if let Some(wait) = wait {
- let _ = full.insert("waited_ms".to_owned(), json!(wait.waited_ms));
- let _ = full.insert("timed_out".to_owned(), json!(wait.timed_out));
- }
- let mut lines = vec![format!(
- "job={} status={:?}",
- record.job_id.display(),
- record.status
- )];
- if let Some(wait) = wait {
- lines.push(format!(
- "waited={} timed_out={}",
- render_duration_ms(wait.waited_ms),
- wait.timed_out
- ));
- }
- if let Some(result) = record.result.as_ref() {
- lines.push(format!(
- "result ready model={} turns={} duration={}",
- result.model_name().unwrap_or_else(|| "unknown".to_owned()),
- result.num_turns,
- render_duration_ms(result.duration_ms)
- ));
- lines.push(format!("saved={}", result.persisted_output_path));
- lines.push("response:".to_owned());
- lines.push(result.response.clone());
- }
- if let Some(failure) = record.failure.as_ref() {
- lines.push(format!("failure={} {}", failure.class, failure.detail));
- }
- fallback_detailed_tool_output(
- &Value::Object(concise),
- &Value::Object(full),
- lines.join("\n"),
- None,
- SurfaceKind::Read,
- generation,
- stage,
- operation,
- )
-}
-
-fn background_consult_job_root() -> io::Result<PathBuf> {
- let path = phone_opus_state_root()?.join("mcp").join("consult_jobs");
- fs::create_dir_all(&path)?;
- Ok(path)
-}
-
-fn background_consult_job_path(job_id: &BackgroundConsultJobId) -> io::Result<PathBuf> {
- Ok(background_consult_job_root()?.join(format!("{}.json", job_id.display())))
-}
-
-fn persist_background_consult_job(record: &BackgroundConsultJobRecord) -> io::Result<()> {
- let path = background_consult_job_path(&record.job_id)?;
- persist_background_consult_job_to_path(&path, record)
-}
-
-fn persist_background_consult_job_to_path(
- path: &Path,
- record: &BackgroundConsultJobRecord,
-) -> io::Result<()> {
- write_json_file(path, record)
-}
-
-fn load_background_consult_job(
- job_id: &BackgroundConsultJobId,
-) -> io::Result<BackgroundConsultJobRecord> {
- read_json_file(background_consult_job_path(job_id)?.as_path())
-}
-
-fn load_recent_background_consult_jobs(
- limit: usize,
-) -> io::Result<Vec<BackgroundConsultJobRecord>> {
- let mut jobs = fs::read_dir(background_consult_job_root()?)?
- .filter_map(Result::ok)
- .map(|entry| entry.path())
- .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("json"))
- .filter_map(|path| read_json_file::<BackgroundConsultJobRecord>(&path).ok())
- .collect::<Vec<_>>();
- jobs.sort_by(|left, right| {
- right
- .updated_unix_ms
- .cmp(&left.updated_unix_ms)
- .then_with(|| left.job_id.cmp(&right.job_id))
- });
- jobs.truncate(limit);
- Ok(jobs)
-}
-
-fn read_json_file<T: for<'de> Deserialize<'de>>(path: &Path) -> io::Result<T> {
- let bytes = fs::read(path)?;
- serde_json::from_slice(&bytes).map_err(|error| io::Error::other(error.to_string()))
-}
-
fn write_json_file<T: Serialize>(path: &Path, value: &T) -> io::Result<()> {
let payload =
serde_json::to_vec_pretty(value).map_err(|error| io::Error::other(error.to_string()))?;
@@ -1584,7 +781,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
let unit_name = claude_unit_name(request);
let transcript = ClaudeTranscriptMonitor::new(
sandbox.claude_config_dir().join("projects"),
- request.planned_session_id(),
+ request.session_id(),
);
let mut command = Command::new(SYSTEMD_RUN_BINARY);
let runtime_dir = caller_runtime_dir();
@@ -1641,18 +838,14 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
.arg("--tools")
.arg(render_claude_toolset(CLAUDE_TOOLSET))
.arg("--dangerously-skip-permissions")
+ .arg("--session-id")
+ .arg(request.session_id())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
- if let Some(session_id) = request.launch_session_id() {
- let _ = command.arg("--session-id").arg(session_id);
- }
let child = command
.arg(request.prompt.rendered())
.spawn()
.map_err(ConsultInvocationError::Spawn)?;
- request
- .remember_planned_context()
- .map_err(ConsultInvocationError::Spawn)?;
let output = wait_for_claude_completion(child, unit_name.as_str(), transcript)?;
let status = output.status;
let stdout = output.stdout.trim().to_owned();
@@ -1664,7 +857,6 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
envelope.envelope_type
)));
}
- let observed_session_id = envelope.session_id.clone();
if !status.success() || envelope.is_error || envelope.subtype.as_deref() != Some("success") {
return Err(ConsultInvocationError::Downstream(
envelope
@@ -1674,27 +866,16 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
));
}
let result = envelope.result.clone().unwrap_or_default();
- let canonical_session_id = observed_session_id
- .clone()
- .or_else(|| Some(request.planned_session_id()));
- request
- .remember_context(canonical_session_id.as_deref())
- .map_err(ConsultInvocationError::Spawn)?;
let persisted_output_path = persist_consult_output(request, &result, &envelope)
.map_err(ConsultInvocationError::Spawn)?;
Ok(ConsultResponse {
cwd: request.cwd.clone(),
result,
persisted_output_path,
- context_mode: request.context_mode(),
- planned_session_id: request.planned_session_id(),
- reused_session_id: request.reused_session_id(),
duration_ms: envelope.duration_ms.unwrap_or(0),
duration_api_ms: envelope.duration_api_ms,
num_turns: envelope.num_turns.unwrap_or(0),
stop_reason: envelope.stop_reason,
- session_id: canonical_session_id,
- observed_session_id,
total_cost_usd: envelope.total_cost_usd,
usage: envelope.usage,
model_usage: envelope.model_usage,
@@ -1855,7 +1036,7 @@ fn stop_transient_unit(unit_name: &str) -> io::Result<()> {
}
fn claude_unit_name(request: &ConsultRequest) -> String {
- format!("phone-opus-claude-{}", request.planned_session_id())
+ format!("phone-opus-claude-{}", request.session_id())
}
fn claude_stall_timeout() -> Duration {
@@ -1890,51 +1071,6 @@ fn phone_opus_state_root() -> io::Result<PathBuf> {
Ok(root)
}
-fn consult_context_index_path() -> io::Result<PathBuf> {
- let root = phone_opus_state_root()?.join("mcp");
- fs::create_dir_all(&root)?;
- Ok(root.join(CONSULT_CONTEXT_INDEX_FILE_NAME))
-}
-
-fn load_consult_context_index() -> io::Result<ConsultContextIndex> {
- let path = consult_context_index_path()?;
- match read_json_file::<ConsultContextIndex>(path.as_path()) {
- Ok(index) => Ok(index),
- Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(ConsultContextIndex::default()),
- Err(error) => Err(error),
- }
-}
-
-#[allow(
- dead_code,
- reason = "context lookup is parked while one-shot consults are enforced"
-)]
-fn load_consult_context(key: &ConsultContextKey) -> io::Result<Option<StoredConsultContext>> {
- Ok(load_consult_context_index()?.context_for(key))
-}
-
-fn remember_planned_consult_context(
- key: &ConsultContextKey,
- session_id: &SessionHandle,
-) -> io::Result<()> {
- let mut index = load_consult_context_index()?;
- index.remember(key, session_id, StoredConsultContextState::Planned);
- write_json_file(consult_context_index_path()?.as_path(), &index)
-}
-
-fn confirm_consult_context(
- key: &ConsultContextKey,
- observed_session_id: Option<&str>,
- fallback_session_id: &SessionHandle,
-) -> io::Result<()> {
- let session_id = observed_session_id
- .and_then(SessionHandle::parse)
- .unwrap_or_else(|| fallback_session_id.clone());
- let mut index = load_consult_context_index()?;
- index.remember(key, &session_id, StoredConsultContextState::Confirmed);
- write_json_file(consult_context_index_path()?.as_path(), &index)
-}
-
fn caller_home_dir() -> Option<PathBuf> {
std::env::var_os("HOME")
.filter(|value| !value.is_empty())
@@ -2250,10 +1386,6 @@ fn model_name(model_usage: Option<&Value>) -> Option<String> {
models.keys().next().cloned()
}
-fn elapsed_duration_ms(duration: Duration) -> u64 {
- duration.as_millis().try_into().unwrap_or(u64::MAX)
-}
-
fn render_duration_ms(duration_ms: u64) -> String {
if duration_ms < 1_000 {
return format!("{duration_ms}ms");