swarm repositories / source
aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormain <main@swarm.moe>2026-03-23 19:16:10 -0400
committermain <main@swarm.moe>2026-03-23 19:16:10 -0400
commit00949559a8a4757e1198e1ea582ebfcf7268fec4 (patch)
tree7bb8821c1d597eb386998a6f0b4cba1b9e24de41
parentdd2b64ed08b8ac55d6aaeb54d635b33b51eea790 (diff)
downloadphone_opus-00949559a8a4757e1198e1ea582ebfcf7268fec4.zip
Add blocking wait for background consult jobs
-rw-r--r--README.md1
-rw-r--r--assets/codex-skills/phone-opus/SKILL.md7
-rw-r--r--crates/phone-opus/src/mcp/catalog.rs26
-rw-r--r--crates/phone-opus/src/mcp/host/runtime.rs11
-rw-r--r--crates/phone-opus/src/mcp/service.rs173
-rw-r--r--crates/phone-opus/tests/mcp_hardening.rs79
6 files changed, 244 insertions, 53 deletions
diff --git a/README.md b/README.md
index caa5f2f..062fdd8 100644
--- a/README.md
+++ b/README.md
@@ -11,6 +11,7 @@ It exposes one blocking domain tool:
- a fixed consult prefix is prepended before the caller-supplied prompt
- pass `background: true` to queue the consult and get back a job handle instead of blocking
- `consult_job`: inspect one background consult job and receive the final result once it completes
+- `consult_wait`: block on one background consult job until it completes or a timeout elapses, then return the current job state plus any final result
- `consult_jobs`: list recent background consult jobs
The server keeps the public MCP session in a durable host, isolates the actual
diff --git a/assets/codex-skills/phone-opus/SKILL.md b/assets/codex-skills/phone-opus/SKILL.md
index 6b1ae3d..d1b22bd 100644
--- a/assets/codex-skills/phone-opus/SKILL.md
+++ b/assets/codex-skills/phone-opus/SKILL.md
@@ -16,7 +16,7 @@ should be taken as authoritative or final. It is a pure consultant.
- Ask for a second opinion on code, architecture, debugging, or design.
- Point Claude at a specific repository with `cwd`.
- Reuse `session_id` from an earlier call when you want Claude to continue the same conversation.
-- Set `background: true` when you want to launch a consult, keep working, and poll for the answer later.
+- Set `background: true` when you want to launch a consult, keep working, and join it later with `consult_wait` or inspect it with `consult_job`.
## Tool surface
@@ -25,6 +25,9 @@ should be taken as authoritative or final. It is a pure consultant.
- optional: `cwd`, `session_id`, `background`, `render`, `detail`
- `consult_job`
- required: `job_id`
+- `consult_wait`
+ - required: `job_id`
+ - optional: `timeout_ms`, `poll_interval_ms`, `render`, `detail`
- `consult_jobs`
- optional: `limit`, `render`, `detail`
- `health_snapshot`
@@ -40,7 +43,7 @@ should be taken as authoritative or final. It is a pure consultant.
- For related follow-ups, strongly prefer reusing `session_id`; cold-start Opus burns quota rereading files, while session reuse is much cheaper.
- This surface is consultative only. Edit tools are unavailable.
- The returned `session_id` is reusable: pass it back into a later `consult` call to continue that Claude conversation.
-- Background consults return a `job_id`; use `consult_job` to poll one job or `consult_jobs` to rediscover recent ones.
+- Background consults return a `job_id`; use `consult_wait` when you want a blocking join, `consult_job` to inspect one job directly, or `consult_jobs` to rediscover recent ones.
## Example
diff --git a/crates/phone-opus/src/mcp/catalog.rs b/crates/phone-opus/src/mcp/catalog.rs
index f3f0925..a4a1780 100644
--- a/crates/phone-opus/src/mcp/catalog.rs
+++ b/crates/phone-opus/src/mcp/catalog.rs
@@ -52,6 +52,12 @@ const TOOL_SPECS: &[ToolSpec] = &[
replay: ReplayContract::Convergent,
},
ToolSpec {
+ name: "consult_wait",
+ description: "Block until one background consult job finishes or a timeout elapses. When the job has finished, the final Claude response or failure is included.",
+ dispatch: DispatchTarget::Host,
+ replay: ReplayContract::Convergent,
+ },
+ ToolSpec {
name: "consult_jobs",
description: "List recent background consult jobs. Defaults to render=porcelain; use render=json for structured output.",
dispatch: DispatchTarget::Host,
@@ -123,6 +129,26 @@ fn tool_schema(name: &str) -> Value {
},
"required": ["job_id"]
})),
+ "consult_wait" => with_common_presentation(json!({
+ "type": "object",
+ "properties": {
+ "job_id": {
+ "type": "string",
+ "description": "Background consult job handle returned by consult with background=true."
+ },
+ "timeout_ms": {
+ "type": "integer",
+ "minimum": 0,
+ "description": "Maximum time to wait for completion before returning the current job state. Defaults to 1800000 (30 minutes)."
+ },
+ "poll_interval_ms": {
+ "type": "integer",
+ "minimum": 10,
+ "description": "Polling interval used while waiting. Defaults to 1000."
+ }
+ },
+ "required": ["job_id"]
+ })),
"consult_jobs" => with_common_presentation(json!({
"type": "object",
"properties": {
diff --git a/crates/phone-opus/src/mcp/host/runtime.rs b/crates/phone-opus/src/mcp/host/runtime.rs
index 688ffd6..df54844 100644
--- a/crates/phone-opus/src/mcp/host/runtime.rs
+++ b/crates/phone-opus/src/mcp/host/runtime.rs
@@ -25,7 +25,9 @@ use crate::mcp::protocol::{
FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed, PROTOCOL_VERSION, SERVER_NAME,
WORKER_CRASH_ONCE_ENV, WorkerOperation, WorkerSpawnConfig,
};
-use crate::mcp::service::{consult_job_tool_output, consult_jobs_tool_output};
+use crate::mcp::service::{
+ consult_job_tool_output, consult_jobs_tool_output, consult_wait_tool_output,
+};
use crate::mcp::telemetry::ServerTelemetry;
pub(crate) fn run_host() -> Result<(), Box<dyn std::error::Error>> {
@@ -371,6 +373,13 @@ impl HostRuntime {
FaultStage::Host,
&operation,
),
+ "consult_wait" => tool_success(
+ consult_wait_tool_output(arguments, generation, FaultStage::Host, &operation)?,
+ presentation,
+ generation,
+ FaultStage::Host,
+ &operation,
+ ),
"consult_jobs" => tool_success(
consult_jobs_tool_output(arguments, generation, FaultStage::Host, &operation)?,
presentation,
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs
index ff25433..773a516 100644
--- a/crates/phone-opus/src/mcp/service.rs
+++ b/crates/phone-opus/src/mcp/service.rs
@@ -3,12 +3,12 @@ use std::fs;
use std::io::{self, BufRead, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use dirs::{home_dir, state_dir};
use libmcp::{Generation, SurfaceKind};
use serde::{Deserialize, Serialize};
-use serde_json::{Value, json};
+use serde_json::{Map, Value, json};
use thiserror::Error;
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
use users::get_current_uid;
@@ -132,11 +132,22 @@ struct ConsultJobArgs {
job_id: String,
}
+#[derive(Debug, Deserialize)]
+struct ConsultWaitArgs {
+ job_id: String,
+ timeout_ms: Option<u64>,
+ poll_interval_ms: Option<u64>,
+}
+
#[derive(Debug, Default, Deserialize)]
struct ConsultJobsArgs {
limit: Option<u64>,
}
+const DEFAULT_CONSULT_WAIT_TIMEOUT_MS: u64 = 30 * 60 * 1_000;
+const DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 1_000;
+const MIN_CONSULT_WAIT_POLL_INTERVAL_MS: u64 = 10;
+
#[derive(Debug, Clone)]
struct ConsultRequest {
prompt: PromptText,
@@ -406,6 +417,50 @@ struct BackgroundConsultJobRecord {
failure: Option<BackgroundConsultFailure>,
}
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+struct ConsultWaitConfig {
+ timeout_ms: u64,
+ poll_interval_ms: u64,
+}
+
+impl ConsultWaitConfig {
+ fn parse(timeout_ms: Option<u64>, poll_interval_ms: Option<u64>) -> Self {
+ Self {
+ timeout_ms: timeout_ms.unwrap_or(DEFAULT_CONSULT_WAIT_TIMEOUT_MS),
+ poll_interval_ms: poll_interval_ms
+ .unwrap_or(DEFAULT_CONSULT_WAIT_POLL_INTERVAL_MS)
+ .max(MIN_CONSULT_WAIT_POLL_INTERVAL_MS),
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+struct BackgroundConsultWaitRequest {
+ job_id: BackgroundConsultJobId,
+ config: ConsultWaitConfig,
+}
+
+impl BackgroundConsultWaitRequest {
+ fn parse(args: ConsultWaitArgs) -> Result<Self, ConsultRequestError> {
+ Ok(Self {
+ job_id: BackgroundConsultJobId::parse(args.job_id)?,
+ config: ConsultWaitConfig::parse(args.timeout_ms, args.poll_interval_ms),
+ })
+ }
+}
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+struct BackgroundConsultWaitMetadata {
+ waited_ms: u64,
+ timed_out: bool,
+}
+
+#[derive(Debug, Clone)]
+struct BackgroundConsultWaitOutcome {
+ record: BackgroundConsultJobRecord,
+ metadata: BackgroundConsultWaitMetadata,
+}
+
impl BackgroundConsultJobRecord {
fn new(job_id: BackgroundConsultJobId, request: BackgroundConsultRequest) -> Self {
let now = unix_ms_now();
@@ -770,7 +825,28 @@ pub(crate) fn consult_job_tool_output(
let record = load_background_consult_job(&job_id).map_err(|error| {
FaultRecord::downstream(generation, stage, operation, error.to_string())
})?;
- background_job_tool_output(&record, generation, stage, operation)
+ background_job_tool_output(&record, None, generation, stage, operation)
+}
+
+pub(crate) fn consult_wait_tool_output(
+ arguments: Value,
+ generation: Generation,
+ stage: FaultStage,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let args = deserialize::<ConsultWaitArgs>(arguments, operation, generation)?;
+ let request = BackgroundConsultWaitRequest::parse(args)
+ .map_err(|error| invalid_consult_request(generation, operation, error))?;
+ let outcome = wait_for_background_consult(&request).map_err(|error| {
+ FaultRecord::downstream(generation, stage, operation, error.to_string())
+ })?;
+ background_job_tool_output(
+ &outcome.record,
+ Some(outcome.metadata),
+ generation,
+ stage,
+ operation,
+ )
}
pub(crate) fn consult_jobs_tool_output(
@@ -860,7 +936,7 @@ fn submit_background_consult(
"requested_session_id": request.requested_session_id(),
"session_mode": request.session_mode(),
"prompt_prefix_injected": true,
- "follow_up_tools": ["consult_job", "consult_jobs"],
+ "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"],
});
let full = json!({
"mode": request.mode().as_str(),
@@ -873,7 +949,7 @@ fn submit_background_consult(
"prompt": request.prompt.as_str(),
"effective_prompt": request.prompt.rendered(),
"cwd": request.cwd.display(),
- "follow_up_tools": ["consult_job", "consult_jobs"],
+ "follow_up_tools": ["consult_wait", "consult_job", "consult_jobs"],
});
fallback_detailed_tool_output(
&concise,
@@ -935,44 +1011,81 @@ fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure
}
}
+fn wait_for_background_consult(
+ request: &BackgroundConsultWaitRequest,
+) -> io::Result<BackgroundConsultWaitOutcome> {
+ let started_at = Instant::now();
+ loop {
+ let record = load_background_consult_job(&request.job_id)?;
+ let waited_ms = elapsed_duration_ms(started_at.elapsed());
+ if record.status.done() {
+ return Ok(BackgroundConsultWaitOutcome {
+ record,
+ metadata: BackgroundConsultWaitMetadata {
+ waited_ms,
+ timed_out: false,
+ },
+ });
+ }
+ if waited_ms >= request.config.timeout_ms {
+ return Ok(BackgroundConsultWaitOutcome {
+ record,
+ metadata: BackgroundConsultWaitMetadata {
+ waited_ms,
+ timed_out: true,
+ },
+ });
+ }
+ let remaining_ms = request.config.timeout_ms.saturating_sub(waited_ms);
+ let sleep_ms = remaining_ms.min(request.config.poll_interval_ms);
+ std::thread::sleep(Duration::from_millis(sleep_ms));
+ }
+}
+
fn background_job_tool_output(
record: &BackgroundConsultJobRecord,
+ wait: Option<BackgroundConsultWaitMetadata>,
generation: Generation,
stage: FaultStage,
operation: &str,
) -> Result<ToolOutput, FaultRecord> {
- let concise = json!({
- "job_id": record.job_id.display(),
- "status": record.status,
- "done": record.status.done(),
- "succeeded": record.status.success(),
- "failed": record.status.failed(),
- "created_unix_ms": record.created_unix_ms,
- "updated_unix_ms": record.updated_unix_ms,
- "started_unix_ms": record.started_unix_ms,
- "finished_unix_ms": record.finished_unix_ms,
- "runner_pid": record.runner_pid,
- "cwd": record.request.cwd,
- "requested_session_id": record.request.session_id,
- "prompt_prefix_injected": record.prompt_prefix_injected,
- "result": record.result.as_ref().map(|result| json!({
+ let mut concise = match record.summary() {
+ Value::Object(object) => object,
+ _ => Map::new(),
+ };
+ if let Some(wait) = wait {
+ let _ = concise.insert("waited_ms".to_owned(), json!(wait.waited_ms));
+ let _ = concise.insert("timed_out".to_owned(), json!(wait.timed_out));
+ }
+ let _ = concise.insert(
+ "result".to_owned(),
+ json!(record.result.as_ref().map(|result| json!({
"response": result.response,
"persisted_output_path": result.persisted_output_path,
"duration_ms": result.duration_ms,
"num_turns": result.num_turns,
"session_id": result.session_id,
"model": result.model_name(),
- })),
- "failure": record.failure,
- });
- let full = json!({
- "job": record,
- });
+ }))),
+ );
+ let _ = concise.insert("failure".to_owned(), json!(record.failure));
+ let mut full = Map::from_iter([("job".to_owned(), json!(record))]);
+ if let Some(wait) = wait {
+ let _ = full.insert("waited_ms".to_owned(), json!(wait.waited_ms));
+ let _ = full.insert("timed_out".to_owned(), json!(wait.timed_out));
+ }
let mut lines = vec![format!(
"job={} status={:?}",
record.job_id.display(),
record.status
)];
+ if let Some(wait) = wait {
+ lines.push(format!(
+ "waited={} timed_out={}",
+ render_duration_ms(wait.waited_ms),
+ wait.timed_out
+ ));
+ }
if let Some(result) = record.result.as_ref() {
lines.push(format!(
"result ready model={} turns={} duration={}",
@@ -988,8 +1101,8 @@ fn background_job_tool_output(
lines.push(format!("failure={} {}", failure.class, failure.detail));
}
fallback_detailed_tool_output(
- &concise,
- &full,
+ &Value::Object(concise),
+ &Value::Object(full),
lines.join("\n"),
None,
SurfaceKind::Read,
@@ -1552,6 +1665,10 @@ fn model_name(model_usage: Option<&Value>) -> Option<String> {
models.keys().next().cloned()
}
+fn elapsed_duration_ms(duration: Duration) -> u64 {
+ duration.as_millis().try_into().unwrap_or(u64::MAX)
+}
+
fn render_duration_ms(duration_ms: u64) -> String {
if duration_ms < 1_000 {
return format!("{duration_ms}ms");
diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs
index f4a8a89..cbe3354 100644
--- a/crates/phone-opus/tests/mcp_hardening.rs
+++ b/crates/phone-opus/tests/mcp_hardening.rs
@@ -6,7 +6,6 @@ use std::io::{self, BufRead, BufReader, Write};
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
-use std::time::Duration;
use libmcp_testkit::read_json_lines;
use serde as _;
@@ -197,6 +196,9 @@ fi
if [ -n "${PHONE_OPUS_TEST_ARGS_FILE:-}" ]; then
printf '%s\n' "$@" >"$PHONE_OPUS_TEST_ARGS_FILE"
fi
+if [ -n "${PHONE_OPUS_TEST_SLEEP_MS:-}" ]; then
+ python3 -c 'import os,time; time.sleep(int(os.environ["PHONE_OPUS_TEST_SLEEP_MS"]) / 1000.0)'
+fi
if [ -n "${PHONE_OPUS_TEST_CWD_WRITE_PROBE_FILE:-}" ]; then
probe_target="${PWD}/.phone_opus_write_probe"
probe_error="${PHONE_OPUS_TEST_CWD_WRITE_ERROR_FILE:-/tmp/phone-opus-write.err}"
@@ -287,6 +289,7 @@ fn cold_start_exposes_consult_and_ops_tools() -> TestResult {
let tool_names = tool_names(&tools);
assert!(tool_names.contains(&"consult"));
assert!(tool_names.contains(&"consult_job"));
+ assert!(tool_names.contains(&"consult_wait"));
assert!(tool_names.contains(&"consult_jobs"));
assert!(tool_names.contains(&"health_snapshot"));
assert!(tool_names.contains(&"telemetry_snapshot"));
@@ -536,7 +539,7 @@ fn consult_can_resume_a_prior_session_with_read_only_toolset_and_requested_worki
}
#[test]
-fn consult_can_run_in_background_and_be_polled() -> TestResult {
+fn consult_can_run_in_background_and_be_waited_on_or_polled() -> TestResult {
let root = temp_root("consult_background")?;
let state_home = root.join("state-home");
let sandbox = root.join("sandbox");
@@ -593,6 +596,7 @@ fn consult_can_run_in_background_and_be_polled() -> TestResult {
("PHONE_OPUS_TEST_STDOUT_FILE", stdout_path.as_str()),
("PHONE_OPUS_TEST_ARGS_FILE", args_path.as_str()),
("PHONE_OPUS_TEST_PWD_FILE", pwd_path.as_str()),
+ ("PHONE_OPUS_TEST_SLEEP_MS", "100"),
];
let mut harness = McpHarness::spawn(&state_home, &env)?;
let _ = harness.initialize()?;
@@ -609,6 +613,13 @@ fn consult_can_run_in_background_and_be_polled() -> TestResult {
)?;
assert_tool_ok(&submit);
assert_eq!(tool_content(&submit)["mode"].as_str(), Some("background"));
+ assert!(
+ tool_content(&submit)["follow_up_tools"]
+ .as_array()
+ .into_iter()
+ .flatten()
+ .any(|value| value == "consult_wait")
+ );
let job_id = must_some(
tool_content(&submit)["job_id"].as_str().map(str::to_owned),
"background job id",
@@ -616,30 +627,43 @@ fn consult_can_run_in_background_and_be_polled() -> TestResult {
let _ = uuid::Uuid::parse_str(&job_id)
.map_err(|error| io::Error::other(format!("job id uuid parse: {error}")))?;
- let mut job = Value::Null;
- for _ in 0..100 {
- job = harness.call_tool(
- 4,
- "consult_job",
- json!({
- "job_id": job_id,
- "render": "json"
- }),
- )?;
- assert_tool_ok(&job);
- if tool_content(&job)["status"].as_str() == Some("succeeded") {
- break;
- }
- std::thread::sleep(Duration::from_millis(10));
- }
+ let timed_out = harness.call_tool(
+ 4,
+ "consult_wait",
+ json!({
+ "job_id": job_id,
+ "timeout_ms": 0,
+ "render": "json"
+ }),
+ )?;
+ assert_tool_ok(&timed_out);
+ assert_eq!(tool_content(&timed_out)["timed_out"].as_bool(), Some(true));
+ assert_eq!(tool_content(&timed_out)["done"].as_bool(), Some(false));
- assert_eq!(tool_content(&job)["status"].as_str(), Some("succeeded"));
+ let waited = harness.call_tool(
+ 5,
+ "consult_wait",
+ json!({
+ "job_id": job_id,
+ "timeout_ms": 5_000,
+ "poll_interval_ms": 10,
+ "render": "json"
+ }),
+ )?;
+ assert_tool_ok(&waited);
+ assert_eq!(tool_content(&waited)["timed_out"].as_bool(), Some(false));
+ assert_eq!(tool_content(&waited)["status"].as_str(), Some("succeeded"));
+ assert!(
+ tool_content(&waited)["waited_ms"]
+ .as_u64()
+ .is_some_and(|value| value >= 50)
+ );
assert_eq!(
- tool_content(&job)["result"]["response"].as_str(),
+ tool_content(&waited)["result"]["response"].as_str(),
Some("background oracle")
);
let persisted_output_path = must_some(
- tool_content(&job)["result"]["persisted_output_path"]
+ tool_content(&waited)["result"]["persisted_output_path"]
.as_str()
.map(str::to_owned),
"background persisted output path",
@@ -659,7 +683,18 @@ fn consult_can_run_in_background_and_be_polled() -> TestResult {
Some("background oracle")
);
- let jobs = harness.call_tool(5, "consult_jobs", json!({ "render": "json" }))?;
+ let job = harness.call_tool(
+ 6,
+ "consult_job",
+ json!({
+ "job_id": job_id,
+ "render": "json"
+ }),
+ )?;
+ assert_tool_ok(&job);
+ assert_eq!(tool_content(&job)["status"].as_str(), Some("succeeded"));
+
+ let jobs = harness.call_tool(7, "consult_jobs", json!({ "render": "json" }))?;
assert_tool_ok(&jobs);
assert!(
tool_content(&jobs)["jobs"]