diff options
| author | main <main@swarm.moe> | 2026-03-23 19:16:10 -0400 |
|---|---|---|
| committer | main <main@swarm.moe> | 2026-03-23 19:16:10 -0400 |
| commit | 00949559a8a4757e1198e1ea582ebfcf7268fec4 (patch) | |
| tree | 7bb8821c1d597eb386998a6f0b4cba1b9e24de41 | |
| parent | dd2b64ed08b8ac55d6aaeb54d635b33b51eea790 (diff) | |
| download | phone_opus-00949559a8a4757e1198e1ea582ebfcf7268fec4.zip | |
Add blocking wait for background consult jobs
| -rw-r--r-- | README.md | 1 | ||||
| -rw-r--r-- | assets/codex-skills/phone-opus/SKILL.md | 7 | ||||
| -rw-r--r-- | crates/phone-opus/src/mcp/catalog.rs | 26 | ||||
| -rw-r--r-- | crates/phone-opus/src/mcp/host/runtime.rs | 11 | ||||
| -rw-r--r-- | crates/phone-opus/src/mcp/service.rs | 173 | ||||
| -rw-r--r-- | crates/phone-opus/tests/mcp_hardening.rs | 79 |
6 files changed, 244 insertions, 53 deletions
@@ -11,6 +11,7 @@ It exposes one blocking domain tool: - a fixed consult prefix is prepended before the caller-supplied prompt - pass `background: true` to queue the consult and get back a job handle instead of blocking - `consult_job`: inspect one background consult job and receive the final result once it completes +- `consult_wait`: block on one background consult job until it completes or a timeout elapses, then return the current job state plus any final result - `consult_jobs`: list recent background consult jobs The server keeps the public MCP session in a durable host, isolates the actual diff --git a/assets/codex-skills/phone-opus/SKILL.md b/assets/codex-skills/phone-opus/SKILL.md index 6b1ae3d..d1b22bd 100644 --- a/assets/codex-skills/phone-opus/SKILL.md +++ b/assets/codex-skills/phone-opus/SKILL.md @@ -16,7 +16,7 @@ should be taken as authoritative or final. It is a pure consultant. - Ask for a second opinion on code, architecture, debugging, or design. - Point Claude at a specific repository with `cwd`. - Reuse `session_id` from an earlier call when you want Claude to continue the same conversation. -- Set `background: true` when you want to launch a consult, keep working, and poll for the answer later. +- Set `background: true` when you want to launch a consult, keep working, and join it later with `consult_wait` or inspect it with `consult_job`. ## Tool surface @@ -25,6 +25,9 @@ should be taken as authoritative or final. It is a pure consultant. - optional: `cwd`, `session_id`, `background`, `render`, `detail` - `consult_job` - required: `job_id` +- `consult_wait` + - required: `job_id` + - optional: `timeout_ms`, `poll_interval_ms`, `render`, `detail` - `consult_jobs` - optional: `limit`, `render`, `detail` - `health_snapshot` @@ -40,7 +43,7 @@ should be taken as authoritative or final. It is a pure consultant. - For related follow-ups, strongly prefer reusing `session_id`; cold-start Opus burns quota rereading files, while session reuse is much cheaper. - This surface is consultative only. Edit tools are unavailable. - The returned `session_id` is reusable: pass it back into a later `consult` call to continue that Claude conversation. -- Background consults return a `job_id`; use `consult_job` to poll one job or `consult_jobs` to rediscover recent ones. +- Background consults return a `job_id`; use `consult_wait` when you want a blocking join, `consult_job` to inspect one job directly, or `consult_jobs` to rediscover recent ones. ## Example diff --git a/crates/phone-opus/src/mcp/catalog.rs b/crates/phone-opus/src/mcp/catalog.rs index f3f0925..a4a1780 100644 --- a/crates/phone-opus/src/mcp/catalog.rs +++ b/crates/phone-opus/src/mcp/catalog.rs @@ -52,6 +52,12 @@ const TOOL_SPECS: &[ToolSpec] = &[ replay: ReplayContract::Convergent, }, ToolSpec { + name: "consult_wait", + description: "Block until one background consult job finishes or a timeout elapses. When the job has finished, the final Claude response or failure is included.", + dispatch: DispatchTarget::Host, + replay: ReplayContract::Convergent, + }, + ToolSpec { name: "consult_jobs", description: "List recent background consult jobs. Defaults to render=porcelain; use render=json for structured output.", dispatch: DispatchTarget::Host, @@ -123,6 +129,26 @@ fn tool_schema(name: &str) -> Value { }, "required": ["job_id"] })), + "consult_wait" => with_common_presentation(json!({ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "Background consult job handle returned by consult with background=true." + }, + "timeout_ms": { + "type": "integer", + "minimum": 0, + "description": "Maximum time to wait for completion before returning the current job state. Defaults to 1800000 (30 minutes)." + }, + "poll_interval_ms": { + "type": "integer", + "minimum": 10, + "description": "Polling interval used while waiting. Defaults to 1000." + } + }, + "required": ["job_id"] + })), "consult_jobs" => with_common_presentation(json!({ "type": "object", "properties": { diff --git a/crates/phone-opus/src/mcp/host/runtime.rs b/crates/phone-opus/src/mcp/host/runtime.rs index 688ffd6..df54844 100644 --- a/crates/phone-opus/src/mcp/host/runtime.rs +++ b/crates/phone-opus/src/mcp/host/runtime.rs @@ -25,7 +25,9 @@ use crate::mcp::protocol::{ FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed, PROTOCOL_VERSION, SERVER_NAME, WORKER_CRASH_ONCE_ENV, WorkerOperation, WorkerSpawnConfig, }; -use crate::mcp::service::{consult_job_tool_output, consult_jobs_tool_output}; +use crate::mcp::service::{ + consult_job_tool_output, consult_jobs_tool_output, consult_wait_tool_output, +}; use crate::mcp::telemetry::ServerTelemetry; pub(crate) fn run_host() -> Result<(), Box<dyn std::error::Error>> { @@ -371,6 +373,13 @@ impl HostRuntime { FaultStage::Host, &operation, ), + "consult_wait" => tool_success( + consult_wait_tool_output(arguments, generation, FaultStage::Host, &operation)?, + presentation, + generation, + FaultStage::Host, + &operation, + ), "consult_jobs" => tool_success( consult_jobs_tool_output(arguments, generation, FaultStage::Host, &operation)?, presentation, diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index ff25433..773a516 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -3,12 +3,12 @@ use std::fs; use std::io::{self, BufRead, Write}; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use dirs::{home_dir, state_dir}; use libmcp::{Generation, SurfaceKind}; use serde::{Deserialize, Serialize}; -use serde_json::{Value, json}; +use serde_json::{Map, Value, json}; use thiserror::Error; use time::{OffsetDateTime, format_description::well_known::Rfc3339}; use users::get_current_uid; @@ -132,11 +132,22 @@ struct ConsultJobArgs { job_id: String, } +#[derive(Debug, Deserialize)] +struct ConsultWaitArgs { + job_id: String, + timeout_ms: Option<u64>, + poll_interval_ms: Option<u64>, +} + #[derive(Debug, Default, Deserialize)] struct ConsultJobsArgs { limit: Option<u64>, } +const DEFAULT_CONSULT_WAIT_TIMEOUT_MS: u64 = 30 * 60 * 1_000; +const DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 1_000; +const MIN_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 10; + #[derive(Debug, Clone)] struct ConsultRequest { prompt: PromptText, @@ -406,6 +417,50 @@ struct BackgroundConsultJobRecord { failure: Option<BackgroundConsultFailure>, } +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +struct ConsultWaitConfig { + timeout_ms: u64, + poll_interval_ms: u64, +} + +impl ConsultWaitConfig { + fn parse(timeout_ms: Option<u64>, poll_interval_ms: Option<u64>) -> Self { + Self { + timeout_ms: timeout_ms.unwrap_or(DEFAULT_CONSULT_WAIT_TIMEOUT_MS), + poll_interval_ms: poll_interval_ms + .unwrap_or(DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS) + .max(MIN_CONSULT_WAIT_POLL_INTERVAL_MS), + } + } +} + +#[derive(Debug, Clone)] +struct BackgroundConsultWaitRequest { + job_id: BackgroundConsultJobId, + config: ConsultWaitConfig, +} + +impl BackgroundConsultWaitRequest { + fn parse(args: ConsultWaitArgs) -> Result<Self, ConsultRequestError> { + Ok(Self { + job_id: BackgroundConsultJobId::parse(args.job_id)?, + config: ConsultWaitConfig::parse(args.timeout_ms, args.poll_interval_ms), + }) + } +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +struct BackgroundConsultWaitMetadata { + waited_ms: u64, + timed_out: bool, +} + +#[derive(Debug, Clone)] +struct BackgroundConsultWaitOutcome { + record: BackgroundConsultJobRecord, + metadata: BackgroundConsultWaitMetadata, +} + impl BackgroundConsultJobRecord { fn new(job_id: BackgroundConsultJobId, request: BackgroundConsultRequest) -> Self { let now = unix_ms_now(); @@ -770,7 +825,28 @@ pub(crate) fn consult_job_tool_output( let record = load_background_consult_job(&job_id).map_err(|error| { FaultRecord::downstream(generation, stage, operation, error.to_string()) })?; - background_job_tool_output(&record, generation, stage, operation) + background_job_tool_output(&record, None, generation, stage, operation) +} + +pub(crate) fn consult_wait_tool_output( + arguments: Value, + generation: Generation, + stage: FaultStage, + operation: &str, +) -> Result<ToolOutput, FaultRecord> { + let args = deserialize::<ConsultWaitArgs>(arguments, operation, generation)?; + let request = BackgroundConsultWaitRequest::parse(args) + .map_err(|error| invalid_consult_request(generation, operation, error))?; + let outcome = wait_for_background_consult(&request).map_err(|error| { + FaultRecord::downstream(generation, stage, operation, error.to_string()) + })?; + background_job_tool_output( + &outcome.record, + Some(outcome.metadata), + generation, + stage, + operation, + ) } pub(crate) fn consult_jobs_tool_output( @@ -860,7 +936,7 @@ fn submit_background_consult( "requested_session_id": request.requested_session_id(), "session_mode": request.session_mode(), "prompt_prefix_injected": true, - "follow_up_tools": ["consult_job", "consult_jobs"], + "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"], }); let full = json!({ "mode": request.mode().as_str(), @@ -873,7 +949,7 @@ fn submit_background_consult( "prompt": request.prompt.as_str(), "effective_prompt": request.prompt.rendered(), "cwd": request.cwd.display(), - "follow_up_tools": ["consult_job", "consult_jobs"], + "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"], }); fallback_detailed_tool_output( &concise, @@ -935,44 +1011,81 @@ fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure } } +fn wait_for_background_consult( + request: &BackgroundConsultWaitRequest, +) -> io::Result<BackgroundConsultWaitOutcome> { + let started_at = Instant::now(); + loop { + let record = load_background_consult_job(&request.job_id)?; + let waited_ms = elapsed_duration_ms(started_at.elapsed()); + if record.status.done() { + return Ok(BackgroundConsultWaitOutcome { + record, + metadata: BackgroundConsultWaitMetadata { + waited_ms, + timed_out: false, + }, + }); + } + if waited_ms >= request.config.timeout_ms { + return Ok(BackgroundConsultWaitOutcome { + record, + metadata: BackgroundConsultWaitMetadata { + waited_ms, + timed_out: true, + }, + }); + } + let remaining_ms = request.config.timeout_ms.saturating_sub(waited_ms); + let sleep_ms = remaining_ms.min(request.config.poll_interval_ms); + std::thread::sleep(Duration::from_millis(sleep_ms)); + } +} + fn background_job_tool_output( record: &BackgroundConsultJobRecord, + wait: Option<BackgroundConsultWaitMetadata>, generation: Generation, stage: FaultStage, operation: &str, ) -> Result<ToolOutput, FaultRecord> { - let concise = json!({ - "job_id": record.job_id.display(), - "status": record.status, - "done": record.status.done(), - "succeeded": record.status.success(), - "failed": record.status.failed(), - "created_unix_ms": record.created_unix_ms, - "updated_unix_ms": record.updated_unix_ms, - "started_unix_ms": record.started_unix_ms, - "finished_unix_ms": record.finished_unix_ms, - "runner_pid": record.runner_pid, - "cwd": record.request.cwd, - "requested_session_id": record.request.session_id, - "prompt_prefix_injected": record.prompt_prefix_injected, - "result": record.result.as_ref().map(|result| json!({ + let mut concise = match record.summary() { + Value::Object(object) => object, + _ => Map::new(), + }; + if let Some(wait) = wait { + let _ = concise.insert("waited_ms".to_owned(), json!(wait.waited_ms)); + let _ = concise.insert("timed_out".to_owned(), json!(wait.timed_out)); + } + let _ = concise.insert( + "result".to_owned(), + json!(record.result.as_ref().map(|result| json!({ "response": result.response, "persisted_output_path": result.persisted_output_path, "duration_ms": result.duration_ms, "num_turns": result.num_turns, "session_id": result.session_id, "model": result.model_name(), - })), - "failure": record.failure, - }); - let full = json!({ - "job": record, - }); + }))), + ); + let _ = concise.insert("failure".to_owned(), json!(record.failure)); + let mut full = Map::from_iter([("job".to_owned(), json!(record))]); + if let Some(wait) = wait { + let _ = full.insert("waited_ms".to_owned(), json!(wait.waited_ms)); + let _ = full.insert("timed_out".to_owned(), json!(wait.timed_out)); + } let mut lines = vec![format!( "job={} status={:?}", record.job_id.display(), record.status )]; + if let Some(wait) = wait { + lines.push(format!( + "waited={} timed_out={}", + render_duration_ms(wait.waited_ms), + wait.timed_out + )); + } if let Some(result) = record.result.as_ref() { lines.push(format!( "result ready model={} turns={} duration={}", @@ -988,8 +1101,8 @@ fn background_job_tool_output( lines.push(format!("failure={} {}", failure.class, failure.detail)); } fallback_detailed_tool_output( - &concise, - &full, + &Value::Object(concise), + &Value::Object(full), lines.join("\n"), None, SurfaceKind::Read, @@ -1552,6 +1665,10 @@ fn model_name(model_usage: Option<&Value>) -> Option<String> { models.keys().next().cloned() } +fn elapsed_duration_ms(duration: Duration) -> u64 { + duration.as_millis().try_into().unwrap_or(u64::MAX) +} + fn render_duration_ms(duration_ms: u64) -> String { if duration_ms < 1_000 { return format!("{duration_ms}ms"); diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs index f4a8a89..cbe3354 100644 --- a/crates/phone-opus/tests/mcp_hardening.rs +++ b/crates/phone-opus/tests/mcp_hardening.rs @@ -6,7 +6,6 @@ use std::io::{self, BufRead, BufReader, Write}; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; -use std::time::Duration; use libmcp_testkit::read_json_lines; use serde as _; @@ -197,6 +196,9 @@ fi if [ -n "${PHONE_OPUS_TEST_ARGS_FILE:-}" ]; then printf '%s\n' "$@" >"$PHONE_OPUS_TEST_ARGS_FILE" fi +if [ -n "${PHONE_OPUS_TEST_SLEEP_MS:-}" ]; then + python3 -c 'import os,time; time.sleep(int(os.environ["PHONE_OPUS_TEST_SLEEP_MS"]) / 1000.0)' +fi if [ -n "${PHONE_OPUS_TEST_CWD_WRITE_PROBE_FILE:-}" ]; then probe_target="${PWD}/.phone_opus_write_probe" probe_error="${PHONE_OPUS_TEST_CWD_WRITE_ERROR_FILE:-/tmp/phone-opus-write.err}" @@ -287,6 +289,7 @@ fn cold_start_exposes_consult_and_ops_tools() -> TestResult { let tool_names = tool_names(&tools); assert!(tool_names.contains(&"consult")); assert!(tool_names.contains(&"consult_job")); + assert!(tool_names.contains(&"consult_wait")); assert!(tool_names.contains(&"consult_jobs")); assert!(tool_names.contains(&"health_snapshot")); assert!(tool_names.contains(&"telemetry_snapshot")); @@ -536,7 +539,7 @@ fn consult_can_resume_a_prior_session_with_read_only_toolset_and_requested_worki } #[test] -fn consult_can_run_in_background_and_be_polled() -> TestResult { +fn consult_can_run_in_background_and_be_waited_on_or_polled() -> TestResult { let root = temp_root("consult_background")?; let state_home = root.join("state-home"); let sandbox = root.join("sandbox"); @@ -593,6 +596,7 @@ fn consult_can_run_in_background_and_be_polled() -> TestResult { ("PHONE_OPUS_TEST_STDOUT_FILE", stdout_path.as_str()), ("PHONE_OPUS_TEST_ARGS_FILE", args_path.as_str()), ("PHONE_OPUS_TEST_PWD_FILE", pwd_path.as_str()), + ("PHONE_OPUS_TEST_SLEEP_MS", "100"), ]; let mut harness = McpHarness::spawn(&state_home, &env)?; let _ = harness.initialize()?; @@ -609,6 +613,13 @@ fn consult_can_run_in_background_and_be_polled() -> TestResult { )?; assert_tool_ok(&submit); assert_eq!(tool_content(&submit)["mode"].as_str(), Some("background")); + assert!( + tool_content(&submit)["follow_up_tools"] + .as_array() + .into_iter() + .flatten() + .any(|value| value == "consult_wait") + ); let job_id = must_some( tool_content(&submit)["job_id"].as_str().map(str::to_owned), "background job id", @@ -616,30 +627,43 @@ fn consult_can_run_in_background_and_be_polled() -> TestResult { let _ = uuid::Uuid::parse_str(&job_id) .map_err(|error| io::Error::other(format!("job id uuid parse: {error}")))?; - let mut job = Value::Null; - for _ in 0..100 { - job = harness.call_tool( - 4, - "consult_job", - json!({ - "job_id": job_id, - "render": "json" - }), - )?; - assert_tool_ok(&job); - if tool_content(&job)["status"].as_str() == Some("succeeded") { - break; - } - std::thread::sleep(Duration::from_millis(10)); - } + let timed_out = harness.call_tool( + 4, + "consult_wait", + json!({ + "job_id": job_id, + "timeout_ms": 0, + "render": "json" + }), + )?; + assert_tool_ok(&timed_out); + assert_eq!(tool_content(&timed_out)["timed_out"].as_bool(), Some(true)); + assert_eq!(tool_content(&timed_out)["done"].as_bool(), Some(false)); - assert_eq!(tool_content(&job)["status"].as_str(), Some("succeeded")); + let waited = harness.call_tool( + 5, + "consult_wait", + json!({ + "job_id": job_id, + "timeout_ms": 5_000, + "poll_interval_ms": 10, + "render": "json" + }), + )?; + assert_tool_ok(&waited); + assert_eq!(tool_content(&waited)["timed_out"].as_bool(), Some(false)); + assert_eq!(tool_content(&waited)["status"].as_str(), Some("succeeded")); + assert!( + tool_content(&waited)["waited_ms"] + .as_u64() + .is_some_and(|value| value >= 50) + ); assert_eq!( - tool_content(&job)["result"]["response"].as_str(), + tool_content(&waited)["result"]["response"].as_str(), Some("background oracle") ); let persisted_output_path = must_some( - tool_content(&job)["result"]["persisted_output_path"] + tool_content(&waited)["result"]["persisted_output_path"] .as_str() .map(str::to_owned), "background persisted output path", @@ -659,7 +683,18 @@ fn consult_can_run_in_background_and_be_polled() -> TestResult { Some("background oracle") ); - let jobs = harness.call_tool(5, "consult_jobs", json!({ "render": "json" }))?; + let job = harness.call_tool( + 6, + "consult_job", + json!({ + "job_id": job_id, + "render": "json" + }), + )?; + assert_tool_ok(&job); + assert_eq!(tool_content(&job)["status"].as_str(), Some("succeeded")); + + let jobs = harness.call_tool(7, "consult_jobs", json!({ "render": "json" }))?; assert_tool_ok(&jobs); assert!( tool_content(&jobs)["jobs"] |