swarm repositories / source
aboutsummaryrefslogtreecommitdiff
path: root/crates/phone-opus
diff options
context:
space:
mode:
Diffstat (limited to 'crates/phone-opus')
-rw-r--r--crates/phone-opus/src/main.rs11
-rw-r--r--crates/phone-opus/src/mcp/catalog.rs38
-rw-r--r--crates/phone-opus/src/mcp/host/runtime.rs17
-rw-r--r--crates/phone-opus/src/mcp/mod.rs2
-rw-r--r--crates/phone-opus/src/mcp/service.rs564
-rw-r--r--crates/phone-opus/tests/mcp_hardening.rs121
6 files changed, 745 insertions, 8 deletions
diff --git a/crates/phone-opus/src/main.rs b/crates/phone-opus/src/main.rs
index 79ace26..a1cace7 100644
--- a/crates/phone-opus/src/main.rs
+++ b/crates/phone-opus/src/main.rs
@@ -3,6 +3,7 @@ mod mcp;
use clap::{Args, Parser, Subcommand};
#[cfg(test)]
use libmcp_testkit as _;
+use std::path::PathBuf;
#[derive(Parser)]
#[command(
@@ -30,6 +31,8 @@ enum McpCommand {
Serve,
/// Run the disposable worker process.
Worker(McpWorkerArgs),
+ /// Run one detached background consult job.
+ BackgroundConsult(McpBackgroundConsultArgs),
}
#[derive(Args)]
@@ -39,12 +42,20 @@ struct McpWorkerArgs {
generation: u64,
}
+#[derive(Args)]
+struct McpBackgroundConsultArgs {
+ /// Persisted background job file to execute and update.
+ #[arg(long)]
+ job_file: PathBuf,
+}
+
fn main() -> Result<(), Box<dyn std::error::Error>> {
let cli = Cli::parse();
match cli.command {
Command::Mcp { command } => match command {
McpCommand::Serve => mcp::run_host()?,
McpCommand::Worker(args) => mcp::run_worker(args.generation)?,
+ McpCommand::BackgroundConsult(args) => mcp::run_background_consult(args.job_file)?,
},
}
Ok(())
diff --git a/crates/phone-opus/src/mcp/catalog.rs b/crates/phone-opus/src/mcp/catalog.rs
index 4c71e83..839c477 100644
--- a/crates/phone-opus/src/mcp/catalog.rs
+++ b/crates/phone-opus/src/mcp/catalog.rs
@@ -41,11 +41,23 @@ impl ToolSpec {
const TOOL_SPECS: &[ToolSpec] = &[
ToolSpec {
name: "consult",
- description: "Run a blocking consult against the system Claude Code install using a read-only built-in toolset, optionally resume a prior Claude session by session_id, and return the response.",
+ description: "Run a consult against the system Claude Code install using a read-only built-in toolset, optionally resume a prior Claude session by session_id, optionally queue the consult in the background, and return the response or job handle.",
dispatch: DispatchTarget::Worker,
replay: ReplayContract::NeverReplay,
},
ToolSpec {
+ name: "consult_job",
+ description: "Read the status of one background consult job by job_id. When the job has finished, the final Claude response or failure is included.",
+ dispatch: DispatchTarget::Host,
+ replay: ReplayContract::Convergent,
+ },
+ ToolSpec {
+ name: "consult_jobs",
+ description: "List recent background consult jobs. Defaults to render=porcelain; use render=json for structured output.",
+ dispatch: DispatchTarget::Host,
+ replay: ReplayContract::Convergent,
+ },
+ ToolSpec {
name: "health_snapshot",
description: "Read host lifecycle, worker generation, rollout state, and latest fault. Defaults to render=porcelain; use render=json for structured output.",
dispatch: DispatchTarget::Host,
@@ -98,10 +110,34 @@ fn tool_schema(name: &str) -> Value {
"session_id": {
"type": "string",
"description": "Optional Claude session handle returned by a previous consult call. When set, phone_opus resumes that conversation instead of starting a fresh one."
+ },
+ "background": {
+ "type": "boolean",
+ "description": "When true, queue the consult as a background job and return immediately with a job handle. The default is false, which keeps consult synchronous."
}
},
"required": ["prompt"]
})),
+ "consult_job" => with_common_presentation(json!({
+ "type": "object",
+ "properties": {
+ "job_id": {
+ "type": "string",
+ "description": "Background consult job handle returned by consult with background=true."
+ }
+ },
+ "required": ["job_id"]
+ })),
+ "consult_jobs" => with_common_presentation(json!({
+ "type": "object",
+ "properties": {
+ "limit": {
+ "type": "integer",
+ "minimum": 1,
+ "description": "Maximum number of recent background jobs to return. Defaults to 10."
+ }
+ }
+ })),
"health_snapshot" | "telemetry_snapshot" => with_common_presentation(json!({
"type": "object",
"properties": {}
diff --git a/crates/phone-opus/src/mcp/host/runtime.rs b/crates/phone-opus/src/mcp/host/runtime.rs
index 5922766..1d18453 100644
--- a/crates/phone-opus/src/mcp/host/runtime.rs
+++ b/crates/phone-opus/src/mcp/host/runtime.rs
@@ -25,6 +25,7 @@ use crate::mcp::protocol::{
FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed, PROTOCOL_VERSION, SERVER_NAME,
WORKER_CRASH_ONCE_ENV, WorkerOperation, WorkerSpawnConfig,
};
+use crate::mcp::service::{consult_job_tool_output, consult_jobs_tool_output};
use crate::mcp::telemetry::ServerTelemetry;
pub(crate) fn run_host() -> Result<(), Box<dyn std::error::Error>> {
@@ -360,9 +361,23 @@ impl HostRuntime {
fn handle_host_tool(&mut self, name: &str, arguments: Value) -> Result<Value, FaultRecord> {
let operation = format!("tools/call:{name}");
let generation = self.worker.generation();
- let (presentation, _arguments) =
+ let (presentation, arguments) =
split_presentation(arguments, &operation, generation, FaultStage::Host)?;
match name {
+ "consult_job" => tool_success(
+ consult_job_tool_output(arguments, generation, FaultStage::Host, &operation)?,
+ presentation,
+ generation,
+ FaultStage::Host,
+ &operation,
+ ),
+ "consult_jobs" => tool_success(
+ consult_jobs_tool_output(arguments, generation, FaultStage::Host, &operation)?,
+ presentation,
+ generation,
+ FaultStage::Host,
+ &operation,
+ ),
"health_snapshot" => {
let rollout = if self.binary.rollout_pending().map_err(|error| {
FaultRecord::rollout(generation, &operation, error.to_string())
diff --git a/crates/phone-opus/src/mcp/mod.rs b/crates/phone-opus/src/mcp/mod.rs
index 666598f..ecf5aad 100644
--- a/crates/phone-opus/src/mcp/mod.rs
+++ b/crates/phone-opus/src/mcp/mod.rs
@@ -7,4 +7,4 @@ mod service;
mod telemetry;
pub(crate) use host::runtime::run_host;
-pub(crate) use service::run_worker;
+pub(crate) use service::{run_background_consult, run_worker};
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs
index d42c0a0..5b38c4b 100644
--- a/crates/phone-opus/src/mcp/service.rs
+++ b/crates/phone-opus/src/mcp/service.rs
@@ -1,10 +1,13 @@
use std::collections::BTreeMap;
+use std::fs;
use std::io::{self, BufRead, Write};
use std::path::{Path, PathBuf};
-use std::process::Command;
+use std::process::{Command, Stdio};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use dirs::{home_dir, state_dir};
use libmcp::{Generation, SurfaceKind};
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use thiserror::Error;
use uuid::Uuid;
@@ -47,6 +50,11 @@ pub(crate) fn run_worker(generation: u64) -> Result<(), Box<dyn std::error::Erro
Ok(())
}
+pub(crate) fn run_background_consult(job_file: PathBuf) -> Result<(), Box<dyn std::error::Error>> {
+ execute_background_consult(job_file)?;
+ Ok(())
+}
+
struct WorkerService {
generation: Generation,
}
@@ -76,9 +84,19 @@ impl WorkerService {
let args = deserialize::<ConsultArgs>(arguments, &operation, self.generation)?;
let request = ConsultRequest::parse(args)
.map_err(|error| invalid_consult_request(self.generation, &operation, error))?;
- let response = invoke_claude(&request)
- .map_err(|error| consult_fault(self.generation, &operation, error))?;
- consult_output(&request, &response, self.generation, &operation)?
+ match request.mode() {
+ ConsultMode::Sync => {
+ let response = invoke_claude(&request)
+ .map_err(|error| consult_fault(self.generation, &operation, error))?;
+ consult_output(&request, &response, self.generation, &operation)?
+ }
+ ConsultMode::Background => submit_background_consult(
+ &request,
+ self.generation,
+ FaultStage::Worker,
+ &operation,
+ )?,
+ }
}
other => {
return Err(FaultRecord::invalid_input(
@@ -105,6 +123,17 @@ struct ConsultArgs {
cwd: Option<String>,
max_turns: Option<u64>,
session_id: Option<String>,
+ background: Option<bool>,
+}
+
+#[derive(Debug, Deserialize)]
+struct ConsultJobArgs {
+ job_id: String,
+}
+
+#[derive(Debug, Default, Deserialize)]
+struct ConsultJobsArgs {
+ limit: Option<u64>,
}
#[derive(Debug, Clone)]
@@ -113,6 +142,7 @@ struct ConsultRequest {
cwd: WorkingDirectory,
max_turns: Option<TurnLimit>,
session: Option<SessionHandle>,
+ mode: ConsultMode,
}
impl ConsultRequest {
@@ -122,9 +152,14 @@ impl ConsultRequest {
cwd: WorkingDirectory::resolve(args.cwd)?,
max_turns: args.max_turns.map(TurnLimit::parse).transpose()?,
session: args.session_id.map(SessionHandle::parse).transpose()?,
+ mode: ConsultMode::from_background(args.background),
})
}
+ fn mode(&self) -> ConsultMode {
+ self.mode
+ }
+
fn session_mode(&self) -> &'static str {
if self.session.is_some() {
"resumed"
@@ -136,6 +171,38 @@ impl ConsultRequest {
fn requested_session_id(&self) -> Option<String> {
self.session.as_ref().map(SessionHandle::display)
}
+
+ fn background_request(&self) -> BackgroundConsultRequest {
+ BackgroundConsultRequest {
+ prompt: self.prompt.as_str().to_owned(),
+ cwd: self.cwd.display(),
+ max_turns: self.max_turns.map(TurnLimit::get),
+ session_id: self.requested_session_id(),
+ }
+ }
+}
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+enum ConsultMode {
+ Sync,
+ Background,
+}
+
+impl ConsultMode {
+ fn from_background(raw: Option<bool>) -> Self {
+ if raw.unwrap_or(false) {
+ Self::Background
+ } else {
+ Self::Sync
+ }
+ }
+
+ fn as_str(self) -> &'static str {
+ match self {
+ Self::Sync => "sync",
+ Self::Background => "background",
+ }
+ }
}
#[derive(Debug, Clone)]
@@ -232,6 +299,172 @@ impl SessionHandle {
}
}
+#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)]
+struct BackgroundConsultRequest {
+ prompt: String,
+ cwd: String,
+ max_turns: Option<u64>,
+ session_id: Option<String>,
+}
+
+impl BackgroundConsultRequest {
+ fn into_consult_request(self) -> Result<ConsultRequest, ConsultRequestError> {
+ ConsultRequest::parse(ConsultArgs {
+ prompt: self.prompt,
+ cwd: Some(self.cwd),
+ max_turns: self.max_turns,
+ session_id: self.session_id,
+ background: Some(false),
+ })
+ }
+}
+
+#[derive(Debug, Clone, Copy, Deserialize, Eq, PartialEq, Serialize)]
+#[serde(rename_all = "snake_case")]
+enum BackgroundConsultStatus {
+ Queued,
+ Running,
+ Succeeded,
+ Failed,
+}
+
+impl BackgroundConsultStatus {
+ fn done(self) -> bool {
+ matches!(self, Self::Succeeded | Self::Failed)
+ }
+
+ fn success(self) -> bool {
+ matches!(self, Self::Succeeded)
+ }
+
+ fn failed(self) -> bool {
+ matches!(self, Self::Failed)
+ }
+}
+
+#[derive(Debug, Clone, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
+#[serde(transparent)]
+struct BackgroundConsultJobId(Uuid);
+
+impl BackgroundConsultJobId {
+ fn new() -> Self {
+ Self(Uuid::new_v4())
+ }
+
+ fn parse(raw: String) -> Result<Self, ConsultRequestError> {
+ Uuid::parse_str(&raw)
+ .map(Self)
+ .map_err(|_| ConsultRequestError::InvalidJobHandle(raw))
+ }
+
+ fn display(&self) -> String {
+ self.0.to_string()
+ }
+}
+
+#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)]
+struct BackgroundConsultFailure {
+ class: String,
+ detail: String,
+}
+
+#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
+struct BackgroundConsultResponseRecord {
+ cwd: String,
+ response: String,
+ duration_ms: u64,
+ duration_api_ms: Option<u64>,
+ num_turns: u64,
+ stop_reason: Option<String>,
+ session_id: Option<String>,
+ total_cost_usd: Option<f64>,
+ usage: Option<Value>,
+ model_usage: Option<Value>,
+ permission_denials: Vec<Value>,
+ fast_mode_state: Option<String>,
+ uuid: Option<String>,
+}
+
+impl BackgroundConsultResponseRecord {
+ fn from_response(response: ConsultResponse) -> Self {
+ Self {
+ cwd: response.cwd.display(),
+ response: response.result,
+ duration_ms: response.duration_ms,
+ duration_api_ms: response.duration_api_ms,
+ num_turns: response.num_turns,
+ stop_reason: response.stop_reason,
+ session_id: response.session_id,
+ total_cost_usd: response.total_cost_usd,
+ usage: response.usage,
+ model_usage: response.model_usage,
+ permission_denials: response.permission_denials,
+ fast_mode_state: response.fast_mode_state,
+ uuid: response.uuid,
+ }
+ }
+
+ fn model_name(&self) -> Option<String> {
+ let Value::Object(models) = self.model_usage.as_ref()? else {
+ return None;
+ };
+ models.keys().next().cloned()
+ }
+}
+
+#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
+struct BackgroundConsultJobRecord {
+ job_id: BackgroundConsultJobId,
+ status: BackgroundConsultStatus,
+ created_unix_ms: u64,
+ updated_unix_ms: u64,
+ started_unix_ms: Option<u64>,
+ finished_unix_ms: Option<u64>,
+ runner_pid: Option<u32>,
+ request: BackgroundConsultRequest,
+ prompt_prefix_injected: bool,
+ result: Option<BackgroundConsultResponseRecord>,
+ failure: Option<BackgroundConsultFailure>,
+}
+
+impl BackgroundConsultJobRecord {
+ fn new(job_id: BackgroundConsultJobId, request: BackgroundConsultRequest) -> Self {
+ let now = unix_ms_now();
+ Self {
+ job_id,
+ status: BackgroundConsultStatus::Queued,
+ created_unix_ms: now,
+ updated_unix_ms: now,
+ started_unix_ms: None,
+ finished_unix_ms: None,
+ runner_pid: None,
+ request,
+ prompt_prefix_injected: true,
+ result: None,
+ failure: None,
+ }
+ }
+
+ fn summary(&self) -> Value {
+ json!({
+ "job_id": self.job_id.display(),
+ "status": self.status,
+ "done": self.status.done(),
+ "succeeded": self.status.success(),
+ "failed": self.status.failed(),
+ "created_unix_ms": self.created_unix_ms,
+ "updated_unix_ms": self.updated_unix_ms,
+ "started_unix_ms": self.started_unix_ms,
+ "finished_unix_ms": self.finished_unix_ms,
+ "runner_pid": self.runner_pid,
+ "cwd": self.request.cwd,
+ "max_turns": self.request.max_turns,
+ "requested_session_id": self.request.session_id,
+ "prompt_prefix_injected": self.prompt_prefix_injected,
+ })
+ }
+}
+
#[derive(Debug, Error)]
enum ConsultRequestError {
#[error("prompt must not be empty")]
@@ -246,6 +479,8 @@ enum ConsultRequestError {
InvalidTurnLimit,
#[error("session_id must be a valid UUID, got `{0}`")]
InvalidSessionHandle(String),
+ #[error("job_id must be a valid UUID, got `{0}`")]
+ InvalidJobHandle(String),
}
#[derive(Debug, Error)]
@@ -348,6 +583,323 @@ fn consult_fault(
}
}
+pub(crate) fn consult_job_tool_output(
+ arguments: Value,
+ generation: Generation,
+ stage: FaultStage,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let args = deserialize::<ConsultJobArgs>(arguments, operation, generation)?;
+ let job_id = BackgroundConsultJobId::parse(args.job_id)
+ .map_err(|error| invalid_consult_request(generation, operation, error))?;
+ let record = load_background_consult_job(&job_id).map_err(|error| {
+ FaultRecord::downstream(generation, stage, operation, error.to_string())
+ })?;
+ background_job_tool_output(&record, generation, stage, operation)
+}
+
+pub(crate) fn consult_jobs_tool_output(
+ arguments: Value,
+ generation: Generation,
+ stage: FaultStage,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let args = deserialize::<ConsultJobsArgs>(arguments, operation, generation)?;
+ let limit = args.limit.unwrap_or(10).clamp(1, 50);
+ let jobs = load_recent_background_consult_jobs(limit as usize).map_err(|error| {
+ FaultRecord::downstream(generation, stage, operation, error.to_string())
+ })?;
+ let concise = json!({
+ "jobs": jobs.iter().map(BackgroundConsultJobRecord::summary).collect::<Vec<_>>(),
+ "count": jobs.len(),
+ });
+ let full = json!({
+ "jobs": jobs,
+ "count": jobs.len(),
+ });
+ let lines = if jobs.is_empty() {
+ "no background consult jobs".to_owned()
+ } else {
+ jobs.iter()
+ .map(|job| {
+ format!(
+ "{} status={} cwd={}",
+ job.job_id.display(),
+ serde_json::to_string(&job.status).unwrap_or_else(|_| "\"unknown\"".to_owned()),
+ job.request.cwd
+ )
+ })
+ .collect::<Vec<_>>()
+ .join("\n")
+ };
+ fallback_detailed_tool_output(
+ &concise,
+ &full,
+ lines,
+ None,
+ SurfaceKind::Read,
+ generation,
+ stage,
+ operation,
+ )
+}
+
+fn submit_background_consult(
+ request: &ConsultRequest,
+ generation: Generation,
+ stage: FaultStage,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let job_id = BackgroundConsultJobId::new();
+ let mut record = BackgroundConsultJobRecord::new(job_id, request.background_request());
+ persist_background_consult_job(&record)
+ .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
+
+ let executable = std::env::current_exe()
+ .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
+ let job_file = background_consult_job_path(&record.job_id)
+ .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
+ let child = Command::new(executable)
+ .arg("mcp")
+ .arg("background-consult")
+ .arg("--job-file")
+ .arg(&job_file)
+ .stdin(Stdio::null())
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .spawn()
+ .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
+
+ record.status = BackgroundConsultStatus::Running;
+ record.started_unix_ms = Some(unix_ms_now());
+ record.updated_unix_ms = unix_ms_now();
+ record.runner_pid = Some(child.id());
+ persist_background_consult_job(&record)
+ .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
+
+ let concise = json!({
+ "mode": request.mode().as_str(),
+ "job_id": record.job_id.display(),
+ "status": record.status,
+ "done": false,
+ "requested_session_id": request.requested_session_id(),
+ "session_mode": request.session_mode(),
+ "prompt_prefix_injected": true,
+ "follow_up_tools": ["consult_job", "consult_jobs"],
+ });
+ let full = json!({
+ "mode": request.mode().as_str(),
+ "job_id": record.job_id.display(),
+ "status": record.status,
+ "done": false,
+ "requested_session_id": request.requested_session_id(),
+ "session_mode": request.session_mode(),
+ "prompt_prefix_injected": true,
+ "prompt": request.prompt.as_str(),
+ "effective_prompt": request.prompt.rendered(),
+ "cwd": request.cwd.display(),
+ "max_turns": request.max_turns.map(TurnLimit::get),
+ "follow_up_tools": ["consult_job", "consult_jobs"],
+ });
+ fallback_detailed_tool_output(
+ &concise,
+ &full,
+ format!(
+ "background consult submitted job={} status=running",
+ record.job_id.display()
+ ),
+ None,
+ SurfaceKind::Read,
+ generation,
+ stage,
+ operation,
+ )
+}
+
+fn execute_background_consult(job_file: PathBuf) -> io::Result<()> {
+ let mut record = read_json_file::<BackgroundConsultJobRecord>(&job_file)?;
+ record.status = BackgroundConsultStatus::Running;
+ let _ = record.runner_pid.get_or_insert(std::process::id());
+ let _ = record.started_unix_ms.get_or_insert_with(unix_ms_now);
+ record.updated_unix_ms = unix_ms_now();
+ persist_background_consult_job_to_path(&job_file, &record)?;
+
+ let request = record
+ .request
+ .clone()
+ .into_consult_request()
+ .map_err(|error| io::Error::other(error.to_string()))?;
+ match invoke_claude(&request) {
+ Ok(response) => {
+ record.status = BackgroundConsultStatus::Succeeded;
+ record.result = Some(BackgroundConsultResponseRecord::from_response(response));
+ record.failure = None;
+ }
+ Err(error) => {
+ record.status = BackgroundConsultStatus::Failed;
+ record.result = None;
+ record.failure = Some(background_failure(error));
+ }
+ }
+ record.finished_unix_ms = Some(unix_ms_now());
+ record.updated_unix_ms = unix_ms_now();
+ record.runner_pid = None;
+ persist_background_consult_job_to_path(&job_file, &record)
+}
+
+fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure {
+ match error {
+ ConsultInvocationError::Spawn(source) => BackgroundConsultFailure {
+ class: "process".to_owned(),
+ detail: source.to_string(),
+ },
+ ConsultInvocationError::InvalidJson(detail)
+ | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure {
+ class: "downstream".to_owned(),
+ detail,
+ },
+ }
+}
+
+fn background_job_tool_output(
+ record: &BackgroundConsultJobRecord,
+ generation: Generation,
+ stage: FaultStage,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let concise = json!({
+ "job_id": record.job_id.display(),
+ "status": record.status,
+ "done": record.status.done(),
+ "succeeded": record.status.success(),
+ "failed": record.status.failed(),
+ "created_unix_ms": record.created_unix_ms,
+ "updated_unix_ms": record.updated_unix_ms,
+ "started_unix_ms": record.started_unix_ms,
+ "finished_unix_ms": record.finished_unix_ms,
+ "runner_pid": record.runner_pid,
+ "cwd": record.request.cwd,
+ "requested_session_id": record.request.session_id,
+ "prompt_prefix_injected": record.prompt_prefix_injected,
+ "result": record.result.as_ref().map(|result| json!({
+ "response": result.response,
+ "duration_ms": result.duration_ms,
+ "num_turns": result.num_turns,
+ "session_id": result.session_id,
+ "model": result.model_name(),
+ })),
+ "failure": record.failure,
+ });
+ let full = json!({
+ "job": record,
+ });
+ let mut lines = vec![format!(
+ "job={} status={:?}",
+ record.job_id.display(),
+ record.status
+ )];
+ if let Some(result) = record.result.as_ref() {
+ lines.push(format!(
+ "result ready model={} turns={} duration={}",
+ result.model_name().unwrap_or_else(|| "unknown".to_owned()),
+ result.num_turns,
+ render_duration_ms(result.duration_ms)
+ ));
+ lines.push("response:".to_owned());
+ lines.push(result.response.clone());
+ }
+ if let Some(failure) = record.failure.as_ref() {
+ lines.push(format!("failure={} {}", failure.class, failure.detail));
+ }
+ fallback_detailed_tool_output(
+ &concise,
+ &full,
+ lines.join("\n"),
+ None,
+ SurfaceKind::Read,
+ generation,
+ stage,
+ operation,
+ )
+}
+
+fn background_consult_job_root() -> io::Result<PathBuf> {
+ let root = state_dir()
+ .map(|root| root.join("phone_opus"))
+ .or_else(|| home_dir().map(|home| home.join(".local").join("state").join("phone_opus")))
+ .ok_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::NotFound,
+ "failed to resolve phone_opus state root",
+ )
+ })?;
+ let path = root.join("mcp").join("consult_jobs");
+ fs::create_dir_all(&path)?;
+ Ok(path)
+}
+
+fn background_consult_job_path(job_id: &BackgroundConsultJobId) -> io::Result<PathBuf> {
+ Ok(background_consult_job_root()?.join(format!("{}.json", job_id.display())))
+}
+
+fn persist_background_consult_job(record: &BackgroundConsultJobRecord) -> io::Result<()> {
+ let path = background_consult_job_path(&record.job_id)?;
+ persist_background_consult_job_to_path(&path, record)
+}
+
+fn persist_background_consult_job_to_path(
+ path: &Path,
+ record: &BackgroundConsultJobRecord,
+) -> io::Result<()> {
+ write_json_file(path, record)
+}
+
+fn load_background_consult_job(
+ job_id: &BackgroundConsultJobId,
+) -> io::Result<BackgroundConsultJobRecord> {
+ read_json_file(background_consult_job_path(job_id)?.as_path())
+}
+
+fn load_recent_background_consult_jobs(
+ limit: usize,
+) -> io::Result<Vec<BackgroundConsultJobRecord>> {
+ let mut jobs = fs::read_dir(background_consult_job_root()?)?
+ .filter_map(Result::ok)
+ .map(|entry| entry.path())
+ .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("json"))
+ .filter_map(|path| read_json_file::<BackgroundConsultJobRecord>(&path).ok())
+ .collect::<Vec<_>>();
+ jobs.sort_by(|left, right| {
+ right
+ .updated_unix_ms
+ .cmp(&left.updated_unix_ms)
+ .then_with(|| left.job_id.cmp(&right.job_id))
+ });
+ jobs.truncate(limit);
+ Ok(jobs)
+}
+
+fn read_json_file<T: for<'de> Deserialize<'de>>(path: &Path) -> io::Result<T> {
+ let bytes = fs::read(path)?;
+ serde_json::from_slice(&bytes).map_err(|error| io::Error::other(error.to_string()))
+}
+
+fn write_json_file<T: Serialize>(path: &Path, value: &T) -> io::Result<()> {
+ let payload =
+ serde_json::to_vec_pretty(value).map_err(|error| io::Error::other(error.to_string()))?;
+ let temp_path = path.with_extension("json.tmp");
+ fs::write(&temp_path, payload)?;
+ fs::rename(temp_path, path)?;
+ Ok(())
+}
+
+fn unix_ms_now() -> u64 {
+ let duration = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or(Duration::ZERO);
+ u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
+}
+
fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInvocationError> {
let mut command = Command::new(claude_binary());
let _ = command
@@ -452,6 +1004,7 @@ fn consult_output(
operation: &str,
) -> Result<ToolOutput, FaultRecord> {
let concise = json!({
+ "mode": request.mode().as_str(),
"response": response.result,
"cwd": response.cwd.display(),
"session_mode": request.session_mode(),
@@ -466,6 +1019,7 @@ fn consult_output(
"permission_denial_count": response.permission_denials.len(),
});
let full = json!({
+ "mode": request.mode().as_str(),
"response": response.result,
"cwd": response.cwd.display(),
"prompt": request.prompt.as_str(),
diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs
index e9ee06b..a1fb6ae 100644
--- a/crates/phone-opus/tests/mcp_hardening.rs
+++ b/crates/phone-opus/tests/mcp_hardening.rs
@@ -6,6 +6,7 @@ use std::io::{self, BufRead, BufReader, Write};
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
+use std::time::Duration;
use libmcp_testkit::read_json_lines;
use serde as _;
@@ -221,6 +222,8 @@ fn cold_start_exposes_consult_and_ops_tools() -> TestResult {
let tools = harness.tools_list()?;
let tool_names = tool_names(&tools);
assert!(tool_names.contains(&"consult"));
+ assert!(tool_names.contains(&"consult_job"));
+ assert!(tool_names.contains(&"consult_jobs"));
assert!(tool_names.contains(&"health_snapshot"));
assert!(tool_names.contains(&"telemetry_snapshot"));
@@ -371,6 +374,124 @@ fn consult_can_resume_a_prior_session_with_read_only_toolset_and_requested_worki
}
#[test]
+fn consult_can_run_in_background_and_be_polled() -> TestResult {
+ let root = temp_root("consult_background")?;
+ let state_home = root.join("state-home");
+ let sandbox = root.join("sandbox");
+ must(fs::create_dir_all(&state_home), "create state home")?;
+ must(fs::create_dir_all(&sandbox), "create sandbox")?;
+
+ let fake_claude = root.join("claude");
+ let stdout_file = root.join("stdout.json");
+ let args_file = root.join("args.txt");
+ let pwd_file = root.join("pwd.txt");
+ write_fake_claude_script(&fake_claude)?;
+ must(
+ fs::write(
+ &stdout_file,
+ serde_json::to_string(&json!({
+ "type": "result",
+ "subtype": "success",
+ "is_error": false,
+ "duration_ms": 4321,
+ "duration_api_ms": 4200,
+ "num_turns": 3,
+ "result": "background oracle",
+ "stop_reason": "end_turn",
+ "session_id": "3fc69f58-7752-4d9d-a95d-19a217814b6a",
+ "total_cost_usd": 0.25,
+ "usage": {
+ "input_tokens": 11,
+ "output_tokens": 7
+ },
+ "modelUsage": {
+ "claude-opus-4-6": {
+ "inputTokens": 11,
+ "outputTokens": 7
+ }
+ },
+ "permission_denials": [],
+ "fast_mode_state": "off",
+ "uuid": "uuid-456"
+ }))?,
+ ),
+ "write fake stdout",
+ )?;
+
+ let claude_bin = fake_claude.display().to_string();
+ let stdout_path = stdout_file.display().to_string();
+ let args_path = args_file.display().to_string();
+ let pwd_path = pwd_file.display().to_string();
+ let env = [
+ ("PHONE_OPUS_CLAUDE_BIN", claude_bin.as_str()),
+ ("PHONE_OPUS_TEST_STDOUT_FILE", stdout_path.as_str()),
+ ("PHONE_OPUS_TEST_ARGS_FILE", args_path.as_str()),
+ ("PHONE_OPUS_TEST_PWD_FILE", pwd_path.as_str()),
+ ];
+ let mut harness = McpHarness::spawn(&state_home, &env)?;
+ let _ = harness.initialize()?;
+ harness.notify_initialized()?;
+
+ let submit = harness.call_tool(
+ 3,
+ "consult",
+ json!({
+ "prompt": "background oracle",
+ "cwd": sandbox.display().to_string(),
+ "background": true
+ }),
+ )?;
+ assert_tool_ok(&submit);
+ assert_eq!(tool_content(&submit)["mode"].as_str(), Some("background"));
+ let job_id = must_some(
+ tool_content(&submit)["job_id"].as_str().map(str::to_owned),
+ "background job id",
+ )?;
+ let _ = uuid::Uuid::parse_str(&job_id)
+ .map_err(|error| io::Error::other(format!("job id uuid parse: {error}")))?;
+
+ let mut job = Value::Null;
+ for _ in 0..100 {
+ job = harness.call_tool(
+ 4,
+ "consult_job",
+ json!({
+ "job_id": job_id,
+ "render": "json"
+ }),
+ )?;
+ assert_tool_ok(&job);
+ if tool_content(&job)["status"].as_str() == Some("succeeded") {
+ break;
+ }
+ std::thread::sleep(Duration::from_millis(10));
+ }
+
+ assert_eq!(tool_content(&job)["status"].as_str(), Some("succeeded"));
+ assert_eq!(
+ tool_content(&job)["result"]["response"].as_str(),
+ Some("background oracle")
+ );
+
+ let jobs = harness.call_tool(5, "consult_jobs", json!({ "render": "json" }))?;
+ assert_tool_ok(&jobs);
+ assert!(
+ tool_content(&jobs)["jobs"]
+ .as_array()
+ .into_iter()
+ .flatten()
+ .any(|value| value["job_id"] == job_id)
+ );
+
+ let args = must(fs::read_to_string(&args_file), "read fake args file")?;
+ assert!(args.contains(PROMPT_PREFIX));
+ assert!(args.contains("background oracle"));
+ let pwd = must(fs::read_to_string(&pwd_file), "read fake pwd file")?;
+ assert_eq!(pwd.trim(), sandbox.display().to_string());
+ Ok(())
+}
+
+#[test]
fn consult_rejects_invalid_session_handles() -> TestResult {
let root = temp_root("consult_invalid_session")?;
let state_home = root.join("state-home");