diff options
| author | main <main@swarm.moe> | 2026-03-23 16:28:52 -0400 |
|---|---|---|
| committer | main <main@swarm.moe> | 2026-03-23 16:28:52 -0400 |
| commit | c3ad44cf3ec3bcd080f62c19d915ac1749576302 (patch) | |
| tree | a3da7810f794182f4740e0da270e9c40f3fdad2b | |
| parent | 5cf9432092da40a2653c3d156ca5a4746e853827 (diff) | |
| download | phone_opus-c3ad44cf3ec3bcd080f62c19d915ac1749576302.zip | |
Add optional background consult jobs
| -rw-r--r-- | Cargo.lock | 254 | ||||
| -rw-r--r-- | Cargo.toml | 2 | ||||
| -rw-r--r-- | README.md | 3 | ||||
| -rw-r--r-- | assets/codex-skills/phone-opus/SKILL.md | 8 | ||||
| -rw-r--r-- | crates/phone-opus/src/main.rs | 11 | ||||
| -rw-r--r-- | crates/phone-opus/src/mcp/catalog.rs | 38 | ||||
| -rw-r--r-- | crates/phone-opus/src/mcp/host/runtime.rs | 17 | ||||
| -rw-r--r-- | crates/phone-opus/src/mcp/mod.rs | 2 | ||||
| -rw-r--r-- | crates/phone-opus/src/mcp/service.rs | 564 | ||||
| -rw-r--r-- | crates/phone-opus/tests/mcp_hardening.rs | 121 |
10 files changed, 1009 insertions, 11 deletions
@@ -53,6 +53,18 @@ dependencies = [ ] [[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + +[[package]] +name = "bitflags" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" + +[[package]] name = "bumpalo" version = "3.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -164,6 +176,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" [[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + +[[package]] name = "form_urlencoded" version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -184,6 +208,34 @@ dependencies = [ ] [[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", + "wasip3", +] + +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" + +[[package]] name = "heck" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -271,6 +323,12 @@ dependencies = [ ] [[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + +[[package]] name = "idna" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -292,6 +350,18 @@ dependencies = [ ] [[package]] +name = "indexmap" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +dependencies = [ + "equivalent", + "hashbrown 0.16.1", + "serde", + "serde_core", +] + +[[package]] name = "is_terminal_polyfill" version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -314,6 +384,12 @@ dependencies = [ ] [[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + +[[package]] name = "libc" version = "0.2.183" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -370,6 +446,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" [[package]] +name = "log" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" + +[[package]] name = "memchr" version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -441,6 +523,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + +[[package]] name = "proc-macro2" version = "1.0.106" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -459,12 +551,18 @@ dependencies = [ ] [[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + +[[package]] name = "redox_users" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" dependencies = [ - "getrandom", + "getrandom 0.2.17", "libredox", "thiserror", ] @@ -521,6 +619,12 @@ dependencies = [ ] [[package]] +name = "semver" +version = "1.0.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" + +[[package]] name = "serde" version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -704,6 +808,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + +[[package]] name = "url" version = "2.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -733,7 +843,9 @@ version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" dependencies = [ + "getrandom 0.4.2", "js-sys", + "serde_core", "wasm-bindgen", ] @@ -744,6 +856,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] +name = "wasip2" +version = "1.0.2+wasi-0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9517f9239f02c069db75e65f174b3da828fe5f5b945c4dd26bd25d89c03ebcf5" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + +[[package]] name = "wasm-bindgen" version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -789,6 +919,40 @@ dependencies = [ ] [[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + +[[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -804,6 +968,94 @@ dependencies = [ ] [[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + +[[package]] name = "writeable" version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -21,7 +21,7 @@ libmcp-testkit = { git = "https://git.swarm.moe/libmcp.git", rev = "e325cd23f193 serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.145" thiserror = "2.0.17" -uuid = "1.18.1" +uuid = { version = "1.18.1", features = ["serde", "v4"] } [workspace.lints.rust] elided_lifetimes_in_paths = "deny" @@ -9,6 +9,9 @@ It exposes one blocking domain tool: and return the response plus execution metadata - pass `session_id` from a previous response to resume that Claude Code conversation - a fixed consult prefix is prepended before the caller-supplied prompt + - pass `background: true` to queue the consult and get back a job handle instead of blocking +- `consult_job`: inspect one background consult job and receive the final result once it completes +- `consult_jobs`: list recent background consult jobs The server keeps the public MCP session in a durable host, isolates the actual Claude invocation in a disposable worker, and ships standard health and diff --git a/assets/codex-skills/phone-opus/SKILL.md b/assets/codex-skills/phone-opus/SKILL.md index 8fb2000..305badb 100644 --- a/assets/codex-skills/phone-opus/SKILL.md +++ b/assets/codex-skills/phone-opus/SKILL.md @@ -17,12 +17,17 @@ should be taken as authoritative or final. It is a pure consultant. - Point Claude at a specific repository with `cwd`. - Cap the consultation with `max_turns` when needed. - Reuse `session_id` from an earlier call when you want Claude to continue the same conversation. +- Set `background: true` when you want to launch a consult, keep working, and poll for the answer later. ## Tool surface - `consult` - required: `prompt` - - optional: `cwd`, `max_turns`, `session_id`, `render`, `detail` + - optional: `cwd`, `max_turns`, `session_id`, `background`, `render`, `detail` +- `consult_job` + - required: `job_id` +- `consult_jobs` + - optional: `limit`, `render`, `detail` - `health_snapshot` - `telemetry_snapshot` @@ -33,6 +38,7 @@ should be taken as authoritative or final. It is a pure consultant. - Uses `--permission-mode dontAsk`, so only globally preapproved read-only Bash commands can execute. - This surface is consultative only. Edit tools are unavailable. - The returned `session_id` is reusable: pass it back into a later `consult` call to continue that Claude conversation. +- Background consults return a `job_id`; use `consult_job` to poll one job or `consult_jobs` to rediscover recent ones. ## Example diff --git a/crates/phone-opus/src/main.rs b/crates/phone-opus/src/main.rs index 79ace26..a1cace7 100644 --- a/crates/phone-opus/src/main.rs +++ b/crates/phone-opus/src/main.rs @@ -3,6 +3,7 @@ mod mcp; use clap::{Args, Parser, Subcommand}; #[cfg(test)] use libmcp_testkit as _; +use std::path::PathBuf; #[derive(Parser)] #[command( @@ -30,6 +31,8 @@ enum McpCommand { Serve, /// Run the disposable worker process. Worker(McpWorkerArgs), + /// Run one detached background consult job. + BackgroundConsult(McpBackgroundConsultArgs), } #[derive(Args)] @@ -39,12 +42,20 @@ struct McpWorkerArgs { generation: u64, } +#[derive(Args)] +struct McpBackgroundConsultArgs { + /// Persisted background job file to execute and update. + #[arg(long)] + job_file: PathBuf, +} + fn main() -> Result<(), Box<dyn std::error::Error>> { let cli = Cli::parse(); match cli.command { Command::Mcp { command } => match command { McpCommand::Serve => mcp::run_host()?, McpCommand::Worker(args) => mcp::run_worker(args.generation)?, + McpCommand::BackgroundConsult(args) => mcp::run_background_consult(args.job_file)?, }, } Ok(()) diff --git a/crates/phone-opus/src/mcp/catalog.rs b/crates/phone-opus/src/mcp/catalog.rs index 4c71e83..839c477 100644 --- a/crates/phone-opus/src/mcp/catalog.rs +++ b/crates/phone-opus/src/mcp/catalog.rs @@ -41,11 +41,23 @@ impl ToolSpec { const TOOL_SPECS: &[ToolSpec] = &[ ToolSpec { name: "consult", - description: "Run a blocking consult against the system Claude Code install using a read-only built-in toolset, optionally resume a prior Claude session by session_id, and return the response.", + description: "Run a consult against the system Claude Code install using a read-only built-in toolset, optionally resume a prior Claude session by session_id, optionally queue the consult in the background, and return the response or job handle.", dispatch: DispatchTarget::Worker, replay: ReplayContract::NeverReplay, }, ToolSpec { + name: "consult_job", + description: "Read the status of one background consult job by job_id. When the job has finished, the final Claude response or failure is included.", + dispatch: DispatchTarget::Host, + replay: ReplayContract::Convergent, + }, + ToolSpec { + name: "consult_jobs", + description: "List recent background consult jobs. Defaults to render=porcelain; use render=json for structured output.", + dispatch: DispatchTarget::Host, + replay: ReplayContract::Convergent, + }, + ToolSpec { name: "health_snapshot", description: "Read host lifecycle, worker generation, rollout state, and latest fault. Defaults to render=porcelain; use render=json for structured output.", dispatch: DispatchTarget::Host, @@ -98,10 +110,34 @@ fn tool_schema(name: &str) -> Value { "session_id": { "type": "string", "description": "Optional Claude session handle returned by a previous consult call. When set, phone_opus resumes that conversation instead of starting a fresh one." + }, + "background": { + "type": "boolean", + "description": "When true, queue the consult as a background job and return immediately with a job handle. The default is false, which keeps consult synchronous." } }, "required": ["prompt"] })), + "consult_job" => with_common_presentation(json!({ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "Background consult job handle returned by consult with background=true." + } + }, + "required": ["job_id"] + })), + "consult_jobs" => with_common_presentation(json!({ + "type": "object", + "properties": { + "limit": { + "type": "integer", + "minimum": 1, + "description": "Maximum number of recent background jobs to return. Defaults to 10." + } + } + })), "health_snapshot" | "telemetry_snapshot" => with_common_presentation(json!({ "type": "object", "properties": {} diff --git a/crates/phone-opus/src/mcp/host/runtime.rs b/crates/phone-opus/src/mcp/host/runtime.rs index 5922766..1d18453 100644 --- a/crates/phone-opus/src/mcp/host/runtime.rs +++ b/crates/phone-opus/src/mcp/host/runtime.rs @@ -25,6 +25,7 @@ use crate::mcp::protocol::{ FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed, PROTOCOL_VERSION, SERVER_NAME, WORKER_CRASH_ONCE_ENV, WorkerOperation, WorkerSpawnConfig, }; +use crate::mcp::service::{consult_job_tool_output, consult_jobs_tool_output}; use crate::mcp::telemetry::ServerTelemetry; pub(crate) fn run_host() -> Result<(), Box<dyn std::error::Error>> { @@ -360,9 +361,23 @@ impl HostRuntime { fn handle_host_tool(&mut self, name: &str, arguments: Value) -> Result<Value, FaultRecord> { let operation = format!("tools/call:{name}"); let generation = self.worker.generation(); - let (presentation, _arguments) = + let (presentation, arguments) = split_presentation(arguments, &operation, generation, FaultStage::Host)?; match name { + "consult_job" => tool_success( + consult_job_tool_output(arguments, generation, FaultStage::Host, &operation)?, + presentation, + generation, + FaultStage::Host, + &operation, + ), + "consult_jobs" => tool_success( + consult_jobs_tool_output(arguments, generation, FaultStage::Host, &operation)?, + presentation, + generation, + FaultStage::Host, + &operation, + ), "health_snapshot" => { let rollout = if self.binary.rollout_pending().map_err(|error| { FaultRecord::rollout(generation, &operation, error.to_string()) diff --git a/crates/phone-opus/src/mcp/mod.rs b/crates/phone-opus/src/mcp/mod.rs index 666598f..ecf5aad 100644 --- a/crates/phone-opus/src/mcp/mod.rs +++ b/crates/phone-opus/src/mcp/mod.rs @@ -7,4 +7,4 @@ mod service; mod telemetry; pub(crate) use host::runtime::run_host; -pub(crate) use service::run_worker; +pub(crate) use service::{run_background_consult, run_worker}; diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index d42c0a0..5b38c4b 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -1,10 +1,13 @@ use std::collections::BTreeMap; +use std::fs; use std::io::{self, BufRead, Write}; use std::path::{Path, PathBuf}; -use std::process::Command; +use std::process::{Command, Stdio}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use dirs::{home_dir, state_dir}; use libmcp::{Generation, SurfaceKind}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use thiserror::Error; use uuid::Uuid; @@ -47,6 +50,11 @@ pub(crate) fn run_worker(generation: u64) -> Result<(), Box<dyn std::error::Erro Ok(()) } +pub(crate) fn run_background_consult(job_file: PathBuf) -> Result<(), Box<dyn std::error::Error>> { + execute_background_consult(job_file)?; + Ok(()) +} + struct WorkerService { generation: Generation, } @@ -76,9 +84,19 @@ impl WorkerService { let args = deserialize::<ConsultArgs>(arguments, &operation, self.generation)?; let request = ConsultRequest::parse(args) .map_err(|error| invalid_consult_request(self.generation, &operation, error))?; - let response = invoke_claude(&request) - .map_err(|error| consult_fault(self.generation, &operation, error))?; - consult_output(&request, &response, self.generation, &operation)? + match request.mode() { + ConsultMode::Sync => { + let response = invoke_claude(&request) + .map_err(|error| consult_fault(self.generation, &operation, error))?; + consult_output(&request, &response, self.generation, &operation)? + } + ConsultMode::Background => submit_background_consult( + &request, + self.generation, + FaultStage::Worker, + &operation, + )?, + } } other => { return Err(FaultRecord::invalid_input( @@ -105,6 +123,17 @@ struct ConsultArgs { cwd: Option<String>, max_turns: Option<u64>, session_id: Option<String>, + background: Option<bool>, +} + +#[derive(Debug, Deserialize)] +struct ConsultJobArgs { + job_id: String, +} + +#[derive(Debug, Default, Deserialize)] +struct ConsultJobsArgs { + limit: Option<u64>, } #[derive(Debug, Clone)] @@ -113,6 +142,7 @@ struct ConsultRequest { cwd: WorkingDirectory, max_turns: Option<TurnLimit>, session: Option<SessionHandle>, + mode: ConsultMode, } impl ConsultRequest { @@ -122,9 +152,14 @@ impl ConsultRequest { cwd: WorkingDirectory::resolve(args.cwd)?, max_turns: args.max_turns.map(TurnLimit::parse).transpose()?, session: args.session_id.map(SessionHandle::parse).transpose()?, + mode: ConsultMode::from_background(args.background), }) } + fn mode(&self) -> ConsultMode { + self.mode + } + fn session_mode(&self) -> &'static str { if self.session.is_some() { "resumed" @@ -136,6 +171,38 @@ impl ConsultRequest { fn requested_session_id(&self) -> Option<String> { self.session.as_ref().map(SessionHandle::display) } + + fn background_request(&self) -> BackgroundConsultRequest { + BackgroundConsultRequest { + prompt: self.prompt.as_str().to_owned(), + cwd: self.cwd.display(), + max_turns: self.max_turns.map(TurnLimit::get), + session_id: self.requested_session_id(), + } + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +enum ConsultMode { + Sync, + Background, +} + +impl ConsultMode { + fn from_background(raw: Option<bool>) -> Self { + if raw.unwrap_or(false) { + Self::Background + } else { + Self::Sync + } + } + + fn as_str(self) -> &'static str { + match self { + Self::Sync => "sync", + Self::Background => "background", + } + } } #[derive(Debug, Clone)] @@ -232,6 +299,172 @@ impl SessionHandle { } } +#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] +struct BackgroundConsultRequest { + prompt: String, + cwd: String, + max_turns: Option<u64>, + session_id: Option<String>, +} + +impl BackgroundConsultRequest { + fn into_consult_request(self) -> Result<ConsultRequest, ConsultRequestError> { + ConsultRequest::parse(ConsultArgs { + prompt: self.prompt, + cwd: Some(self.cwd), + max_turns: self.max_turns, + session_id: self.session_id, + background: Some(false), + }) + } +} + +#[derive(Debug, Clone, Copy, Deserialize, Eq, PartialEq, Serialize)] +#[serde(rename_all = "snake_case")] +enum BackgroundConsultStatus { + Queued, + Running, + Succeeded, + Failed, +} + +impl BackgroundConsultStatus { + fn done(self) -> bool { + matches!(self, Self::Succeeded | Self::Failed) + } + + fn success(self) -> bool { + matches!(self, Self::Succeeded) + } + + fn failed(self) -> bool { + matches!(self, Self::Failed) + } +} + +#[derive(Debug, Clone, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] +#[serde(transparent)] +struct BackgroundConsultJobId(Uuid); + +impl BackgroundConsultJobId { + fn new() -> Self { + Self(Uuid::new_v4()) + } + + fn parse(raw: String) -> Result<Self, ConsultRequestError> { + Uuid::parse_str(&raw) + .map(Self) + .map_err(|_| ConsultRequestError::InvalidJobHandle(raw)) + } + + fn display(&self) -> String { + self.0.to_string() + } +} + +#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] +struct BackgroundConsultFailure { + class: String, + detail: String, +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +struct BackgroundConsultResponseRecord { + cwd: String, + response: String, + duration_ms: u64, + duration_api_ms: Option<u64>, + num_turns: u64, + stop_reason: Option<String>, + session_id: Option<String>, + total_cost_usd: Option<f64>, + usage: Option<Value>, + model_usage: Option<Value>, + permission_denials: Vec<Value>, + fast_mode_state: Option<String>, + uuid: Option<String>, +} + +impl BackgroundConsultResponseRecord { + fn from_response(response: ConsultResponse) -> Self { + Self { + cwd: response.cwd.display(), + response: response.result, + duration_ms: response.duration_ms, + duration_api_ms: response.duration_api_ms, + num_turns: response.num_turns, + stop_reason: response.stop_reason, + session_id: response.session_id, + total_cost_usd: response.total_cost_usd, + usage: response.usage, + model_usage: response.model_usage, + permission_denials: response.permission_denials, + fast_mode_state: response.fast_mode_state, + uuid: response.uuid, + } + } + + fn model_name(&self) -> Option<String> { + let Value::Object(models) = self.model_usage.as_ref()? else { + return None; + }; + models.keys().next().cloned() + } +} + +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] +struct BackgroundConsultJobRecord { + job_id: BackgroundConsultJobId, + status: BackgroundConsultStatus, + created_unix_ms: u64, + updated_unix_ms: u64, + started_unix_ms: Option<u64>, + finished_unix_ms: Option<u64>, + runner_pid: Option<u32>, + request: BackgroundConsultRequest, + prompt_prefix_injected: bool, + result: Option<BackgroundConsultResponseRecord>, + failure: Option<BackgroundConsultFailure>, +} + +impl BackgroundConsultJobRecord { + fn new(job_id: BackgroundConsultJobId, request: BackgroundConsultRequest) -> Self { + let now = unix_ms_now(); + Self { + job_id, + status: BackgroundConsultStatus::Queued, + created_unix_ms: now, + updated_unix_ms: now, + started_unix_ms: None, + finished_unix_ms: None, + runner_pid: None, + request, + prompt_prefix_injected: true, + result: None, + failure: None, + } + } + + fn summary(&self) -> Value { + json!({ + "job_id": self.job_id.display(), + "status": self.status, + "done": self.status.done(), + "succeeded": self.status.success(), + "failed": self.status.failed(), + "created_unix_ms": self.created_unix_ms, + "updated_unix_ms": self.updated_unix_ms, + "started_unix_ms": self.started_unix_ms, + "finished_unix_ms": self.finished_unix_ms, + "runner_pid": self.runner_pid, + "cwd": self.request.cwd, + "max_turns": self.request.max_turns, + "requested_session_id": self.request.session_id, + "prompt_prefix_injected": self.prompt_prefix_injected, + }) + } +} + #[derive(Debug, Error)] enum ConsultRequestError { #[error("prompt must not be empty")] @@ -246,6 +479,8 @@ enum ConsultRequestError { InvalidTurnLimit, #[error("session_id must be a valid UUID, got `{0}`")] InvalidSessionHandle(String), + #[error("job_id must be a valid UUID, got `{0}`")] + InvalidJobHandle(String), } #[derive(Debug, Error)] @@ -348,6 +583,323 @@ fn consult_fault( } } +pub(crate) fn consult_job_tool_output( + arguments: Value, + generation: Generation, + stage: FaultStage, + operation: &str, +) -> Result<ToolOutput, FaultRecord> { + let args = deserialize::<ConsultJobArgs>(arguments, operation, generation)?; + let job_id = BackgroundConsultJobId::parse(args.job_id) + .map_err(|error| invalid_consult_request(generation, operation, error))?; + let record = load_background_consult_job(&job_id).map_err(|error| { + FaultRecord::downstream(generation, stage, operation, error.to_string()) + })?; + background_job_tool_output(&record, generation, stage, operation) +} + +pub(crate) fn consult_jobs_tool_output( + arguments: Value, + generation: Generation, + stage: FaultStage, + operation: &str, +) -> Result<ToolOutput, FaultRecord> { + let args = deserialize::<ConsultJobsArgs>(arguments, operation, generation)?; + let limit = args.limit.unwrap_or(10).clamp(1, 50); + let jobs = load_recent_background_consult_jobs(limit as usize).map_err(|error| { + FaultRecord::downstream(generation, stage, operation, error.to_string()) + })?; + let concise = json!({ + "jobs": jobs.iter().map(BackgroundConsultJobRecord::summary).collect::<Vec<_>>(), + "count": jobs.len(), + }); + let full = json!({ + "jobs": jobs, + "count": jobs.len(), + }); + let lines = if jobs.is_empty() { + "no background consult jobs".to_owned() + } else { + jobs.iter() + .map(|job| { + format!( + "{} status={} cwd={}", + job.job_id.display(), + serde_json::to_string(&job.status).unwrap_or_else(|_| "\"unknown\"".to_owned()), + job.request.cwd + ) + }) + .collect::<Vec<_>>() + .join("\n") + }; + fallback_detailed_tool_output( + &concise, + &full, + lines, + None, + SurfaceKind::Read, + generation, + stage, + operation, + ) +} + +fn submit_background_consult( + request: &ConsultRequest, + generation: Generation, + stage: FaultStage, + operation: &str, +) -> Result<ToolOutput, FaultRecord> { + let job_id = BackgroundConsultJobId::new(); + let mut record = BackgroundConsultJobRecord::new(job_id, request.background_request()); + persist_background_consult_job(&record) + .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; + + let executable = std::env::current_exe() + .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; + let job_file = background_consult_job_path(&record.job_id) + .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; + let child = Command::new(executable) + .arg("mcp") + .arg("background-consult") + .arg("--job-file") + .arg(&job_file) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; + + record.status = BackgroundConsultStatus::Running; + record.started_unix_ms = Some(unix_ms_now()); + record.updated_unix_ms = unix_ms_now(); + record.runner_pid = Some(child.id()); + persist_background_consult_job(&record) + .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?; + + let concise = json!({ + "mode": request.mode().as_str(), + "job_id": record.job_id.display(), + "status": record.status, + "done": false, + "requested_session_id": request.requested_session_id(), + "session_mode": request.session_mode(), + "prompt_prefix_injected": true, + "follow_up_tools": ["consult_job", "consult_jobs"], + }); + let full = json!({ + "mode": request.mode().as_str(), + "job_id": record.job_id.display(), + "status": record.status, + "done": false, + "requested_session_id": request.requested_session_id(), + "session_mode": request.session_mode(), + "prompt_prefix_injected": true, + "prompt": request.prompt.as_str(), + "effective_prompt": request.prompt.rendered(), + "cwd": request.cwd.display(), + "max_turns": request.max_turns.map(TurnLimit::get), + "follow_up_tools": ["consult_job", "consult_jobs"], + }); + fallback_detailed_tool_output( + &concise, + &full, + format!( + "background consult submitted job={} status=running", + record.job_id.display() + ), + None, + SurfaceKind::Read, + generation, + stage, + operation, + ) +} + +fn execute_background_consult(job_file: PathBuf) -> io::Result<()> { + let mut record = read_json_file::<BackgroundConsultJobRecord>(&job_file)?; + record.status = BackgroundConsultStatus::Running; + let _ = record.runner_pid.get_or_insert(std::process::id()); + let _ = record.started_unix_ms.get_or_insert_with(unix_ms_now); + record.updated_unix_ms = unix_ms_now(); + persist_background_consult_job_to_path(&job_file, &record)?; + + let request = record + .request + .clone() + .into_consult_request() + .map_err(|error| io::Error::other(error.to_string()))?; + match invoke_claude(&request) { + Ok(response) => { + record.status = BackgroundConsultStatus::Succeeded; + record.result = Some(BackgroundConsultResponseRecord::from_response(response)); + record.failure = None; + } + Err(error) => { + record.status = BackgroundConsultStatus::Failed; + record.result = None; + record.failure = Some(background_failure(error)); + } + } + record.finished_unix_ms = Some(unix_ms_now()); + record.updated_unix_ms = unix_ms_now(); + record.runner_pid = None; + persist_background_consult_job_to_path(&job_file, &record) +} + +fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure { + match error { + ConsultInvocationError::Spawn(source) => BackgroundConsultFailure { + class: "process".to_owned(), + detail: source.to_string(), + }, + ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure { + class: "downstream".to_owned(), + detail, + }, + } +} + +fn background_job_tool_output( + record: &BackgroundConsultJobRecord, + generation: Generation, + stage: FaultStage, + operation: &str, +) -> Result<ToolOutput, FaultRecord> { + let concise = json!({ + "job_id": record.job_id.display(), + "status": record.status, + "done": record.status.done(), + "succeeded": record.status.success(), + "failed": record.status.failed(), + "created_unix_ms": record.created_unix_ms, + "updated_unix_ms": record.updated_unix_ms, + "started_unix_ms": record.started_unix_ms, + "finished_unix_ms": record.finished_unix_ms, + "runner_pid": record.runner_pid, + "cwd": record.request.cwd, + "requested_session_id": record.request.session_id, + "prompt_prefix_injected": record.prompt_prefix_injected, + "result": record.result.as_ref().map(|result| json!({ + "response": result.response, + "duration_ms": result.duration_ms, + "num_turns": result.num_turns, + "session_id": result.session_id, + "model": result.model_name(), + })), + "failure": record.failure, + }); + let full = json!({ + "job": record, + }); + let mut lines = vec![format!( + "job={} status={:?}", + record.job_id.display(), + record.status + )]; + if let Some(result) = record.result.as_ref() { + lines.push(format!( + "result ready model={} turns={} duration={}", + result.model_name().unwrap_or_else(|| "unknown".to_owned()), + result.num_turns, + render_duration_ms(result.duration_ms) + )); + lines.push("response:".to_owned()); + lines.push(result.response.clone()); + } + if let Some(failure) = record.failure.as_ref() { + lines.push(format!("failure={} {}", failure.class, failure.detail)); + } + fallback_detailed_tool_output( + &concise, + &full, + lines.join("\n"), + None, + SurfaceKind::Read, + generation, + stage, + operation, + ) +} + +fn background_consult_job_root() -> io::Result<PathBuf> { + let root = state_dir() + .map(|root| root.join("phone_opus")) + .or_else(|| home_dir().map(|home| home.join(".local").join("state").join("phone_opus"))) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + "failed to resolve phone_opus state root", + ) + })?; + let path = root.join("mcp").join("consult_jobs"); + fs::create_dir_all(&path)?; + Ok(path) +} + +fn background_consult_job_path(job_id: &BackgroundConsultJobId) -> io::Result<PathBuf> { + Ok(background_consult_job_root()?.join(format!("{}.json", job_id.display()))) +} + +fn persist_background_consult_job(record: &BackgroundConsultJobRecord) -> io::Result<()> { + let path = background_consult_job_path(&record.job_id)?; + persist_background_consult_job_to_path(&path, record) +} + +fn persist_background_consult_job_to_path( + path: &Path, + record: &BackgroundConsultJobRecord, +) -> io::Result<()> { + write_json_file(path, record) +} + +fn load_background_consult_job( + job_id: &BackgroundConsultJobId, +) -> io::Result<BackgroundConsultJobRecord> { + read_json_file(background_consult_job_path(job_id)?.as_path()) +} + +fn load_recent_background_consult_jobs( + limit: usize, +) -> io::Result<Vec<BackgroundConsultJobRecord>> { + let mut jobs = fs::read_dir(background_consult_job_root()?)? + .filter_map(Result::ok) + .map(|entry| entry.path()) + .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("json")) + .filter_map(|path| read_json_file::<BackgroundConsultJobRecord>(&path).ok()) + .collect::<Vec<_>>(); + jobs.sort_by(|left, right| { + right + .updated_unix_ms + .cmp(&left.updated_unix_ms) + .then_with(|| left.job_id.cmp(&right.job_id)) + }); + jobs.truncate(limit); + Ok(jobs) +} + +fn read_json_file<T: for<'de> Deserialize<'de>>(path: &Path) -> io::Result<T> { + let bytes = fs::read(path)?; + serde_json::from_slice(&bytes).map_err(|error| io::Error::other(error.to_string())) +} + +fn write_json_file<T: Serialize>(path: &Path, value: &T) -> io::Result<()> { + let payload = + serde_json::to_vec_pretty(value).map_err(|error| io::Error::other(error.to_string()))?; + let temp_path = path.with_extension("json.tmp"); + fs::write(&temp_path, payload)?; + fs::rename(temp_path, path)?; + Ok(()) +} + +fn unix_ms_now() -> u64 { + let duration = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO); + u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) +} + fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInvocationError> { let mut command = Command::new(claude_binary()); let _ = command @@ -452,6 +1004,7 @@ fn consult_output( operation: &str, ) -> Result<ToolOutput, FaultRecord> { let concise = json!({ + "mode": request.mode().as_str(), "response": response.result, "cwd": response.cwd.display(), "session_mode": request.session_mode(), @@ -466,6 +1019,7 @@ fn consult_output( "permission_denial_count": response.permission_denials.len(), }); let full = json!({ + "mode": request.mode().as_str(), "response": response.result, "cwd": response.cwd.display(), "prompt": request.prompt.as_str(), diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs index e9ee06b..a1fb6ae 100644 --- a/crates/phone-opus/tests/mcp_hardening.rs +++ b/crates/phone-opus/tests/mcp_hardening.rs @@ -6,6 +6,7 @@ use std::io::{self, BufRead, BufReader, Write}; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; +use std::time::Duration; use libmcp_testkit::read_json_lines; use serde as _; @@ -221,6 +222,8 @@ fn cold_start_exposes_consult_and_ops_tools() -> TestResult { let tools = harness.tools_list()?; let tool_names = tool_names(&tools); assert!(tool_names.contains(&"consult")); + assert!(tool_names.contains(&"consult_job")); + assert!(tool_names.contains(&"consult_jobs")); assert!(tool_names.contains(&"health_snapshot")); assert!(tool_names.contains(&"telemetry_snapshot")); @@ -371,6 +374,124 @@ fn consult_can_resume_a_prior_session_with_read_only_toolset_and_requested_worki } #[test] +fn consult_can_run_in_background_and_be_polled() -> TestResult { + let root = temp_root("consult_background")?; + let state_home = root.join("state-home"); + let sandbox = root.join("sandbox"); + must(fs::create_dir_all(&state_home), "create state home")?; + must(fs::create_dir_all(&sandbox), "create sandbox")?; + + let fake_claude = root.join("claude"); + let stdout_file = root.join("stdout.json"); + let args_file = root.join("args.txt"); + let pwd_file = root.join("pwd.txt"); + write_fake_claude_script(&fake_claude)?; + must( + fs::write( + &stdout_file, + serde_json::to_string(&json!({ + "type": "result", + "subtype": "success", + "is_error": false, + "duration_ms": 4321, + "duration_api_ms": 4200, + "num_turns": 3, + "result": "background oracle", + "stop_reason": "end_turn", + "session_id": "3fc69f58-7752-4d9d-a95d-19a217814b6a", + "total_cost_usd": 0.25, + "usage": { + "input_tokens": 11, + "output_tokens": 7 + }, + "modelUsage": { + "claude-opus-4-6": { + "inputTokens": 11, + "outputTokens": 7 + } + }, + "permission_denials": [], + "fast_mode_state": "off", + "uuid": "uuid-456" + }))?, + ), + "write fake stdout", + )?; + + let claude_bin = fake_claude.display().to_string(); + let stdout_path = stdout_file.display().to_string(); + let args_path = args_file.display().to_string(); + let pwd_path = pwd_file.display().to_string(); + let env = [ + ("PHONE_OPUS_CLAUDE_BIN", claude_bin.as_str()), + ("PHONE_OPUS_TEST_STDOUT_FILE", stdout_path.as_str()), + ("PHONE_OPUS_TEST_ARGS_FILE", args_path.as_str()), + ("PHONE_OPUS_TEST_PWD_FILE", pwd_path.as_str()), + ]; + let mut harness = McpHarness::spawn(&state_home, &env)?; + let _ = harness.initialize()?; + harness.notify_initialized()?; + + let submit = harness.call_tool( + 3, + "consult", + json!({ + "prompt": "background oracle", + "cwd": sandbox.display().to_string(), + "background": true + }), + )?; + assert_tool_ok(&submit); + assert_eq!(tool_content(&submit)["mode"].as_str(), Some("background")); + let job_id = must_some( + tool_content(&submit)["job_id"].as_str().map(str::to_owned), + "background job id", + )?; + let _ = uuid::Uuid::parse_str(&job_id) + .map_err(|error| io::Error::other(format!("job id uuid parse: {error}")))?; + + let mut job = Value::Null; + for _ in 0..100 { + job = harness.call_tool( + 4, + "consult_job", + json!({ + "job_id": job_id, + "render": "json" + }), + )?; + assert_tool_ok(&job); + if tool_content(&job)["status"].as_str() == Some("succeeded") { + break; + } + std::thread::sleep(Duration::from_millis(10)); + } + + assert_eq!(tool_content(&job)["status"].as_str(), Some("succeeded")); + assert_eq!( + tool_content(&job)["result"]["response"].as_str(), + Some("background oracle") + ); + + let jobs = harness.call_tool(5, "consult_jobs", json!({ "render": "json" }))?; + assert_tool_ok(&jobs); + assert!( + tool_content(&jobs)["jobs"] + .as_array() + .into_iter() + .flatten() + .any(|value| value["job_id"] == job_id) + ); + + let args = must(fs::read_to_string(&args_file), "read fake args file")?; + assert!(args.contains(PROMPT_PREFIX)); + assert!(args.contains("background oracle")); + let pwd = must(fs::read_to_string(&pwd_file), "read fake pwd file")?; + assert_eq!(pwd.trim(), sandbox.display().to_string()); + Ok(()) +} + +#[test] fn consult_rejects_invalid_session_handles() -> TestResult { let root = temp_root("consult_invalid_session")?; let state_home = root.join("state-home"); |