From da6410fd33148e7dd0fec9190a1624e34f745b96 Mon Sep 17 00:00:00 2001 From: main Date: Wed, 25 Mar 2026 11:16:43 -0400 Subject: Simplify consults to final JSON output --- crates/phone-opus/src/mcp/service.rs | 145 +++++++------------------------ crates/phone-opus/tests/mcp_hardening.rs | 88 ++++++++----------- 2 files changed, 71 insertions(+), 162 deletions(-) (limited to 'crates') diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index b92f92e..5a6ca91 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fs; -use std::io::{self, BufRead, Read, Write}; +use std::io::{self, BufRead, Write}; #[cfg(unix)] use std::os::unix::fs::symlink; use std::path::{Path, PathBuf}; @@ -169,10 +169,18 @@ impl ConsultRequest { self.session_plan.planned_session().display() } + #[allow( + dead_code, + reason = "session metadata is retained internally but hidden from the public surface" + )] fn context_mode(&self) -> &'static str { self.session_plan.context_mode() } + #[allow( + dead_code, + reason = "session metadata is retained internally but hidden from the public surface" + )] fn reused_session_id(&self) -> Option { self.session_plan.reused_session_id() } @@ -683,13 +691,6 @@ struct ClaudeJsonEnvelope { uuid: Option, } -#[derive(Debug, Default)] -struct ClaudeStreamCapture { - stdout: String, - observed_session_id: Option, - final_envelope: Option, -} - #[derive(Debug)] struct ConsultResponse { cwd: WorkingDirectory, @@ -1445,9 +1446,8 @@ fn invoke_claude(request: &ConsultRequest) -> Result Result io::Result { - let mut stderr = stderr; - let mut buffer = String::new(); - let _ = stderr.read_to_string(&mut buffer)?; - Ok(buffer) - }); - let capture = capture_claude_stream(stdout, request)?; - let status = child.wait().map_err(ConsultInvocationError::Spawn)?; - let stderr = stderr_reader - .join() - .map_err(|_| { - ConsultInvocationError::Spawn(io::Error::other("Claude stderr reader panicked")) - })? - .map_err(ConsultInvocationError::Spawn)? - .trim() - .to_owned(); - let stdout = capture.stdout.trim().to_owned(); - let envelope = match capture.final_envelope { - Some(envelope) => envelope, - None if !status.success() => { - return Err(ConsultInvocationError::Downstream(downstream_message( - status.code(), - &stdout, - &stderr, - ))); - } - None => { - return Err(ConsultInvocationError::InvalidJson(format!( - "missing Claude result envelope; stdout={stdout}; stderr={stderr}" - ))); - } - }; + let output = child + .wait_with_output() + .map_err(ConsultInvocationError::Spawn)?; + let status = output.status; + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned(); + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned(); + let envelope = parse_claude_json_envelope(&stdout, &stderr, status.code())?; if envelope.envelope_type != "result" { - return Err(ConsultInvocationError::Downstream(format!( + return Err(ConsultInvocationError::InvalidJson(format!( "unexpected Claude envelope type `{}`", envelope.envelope_type ))); } - let observed_session_id = envelope - .session_id - .clone() - .or(capture.observed_session_id.clone()); + let observed_session_id = envelope.session_id.clone(); if !status.success() || envelope.is_error || envelope.subtype.as_deref() != Some("success") { return Err(ConsultInvocationError::Downstream( envelope @@ -1561,63 +1525,20 @@ fn invoke_claude(request: &ConsultRequest) -> Result Result { - let mut capture = ClaudeStreamCapture::default(); - let reader = io::BufReader::new(stdout); - for line in reader.lines() { - let line = line.map_err(ConsultInvocationError::Spawn)?; - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - capture.stdout.push_str(trimmed); - capture.stdout.push('\n'); - let Ok(value) = serde_json::from_str::(trimmed) else { - continue; - }; - if capture.observed_session_id.is_none() - && let Some(session_id) = stream_session_id(&value) - { - request - .remember_context(Some(session_id.as_str())) - .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id); - } - if value.get("type").and_then(Value::as_str) == Some("result") { - let envelope = - serde_json::from_value::(value).map_err(|error| { - ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}")) - })?; - if let Some(session_id) = envelope.session_id.as_deref() { - request - .remember_context(Some(session_id)) - .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id.to_owned()); - } - capture.final_envelope = Some(envelope); +fn parse_claude_json_envelope( + stdout: &str, + stderr: &str, + status_code: Option, +) -> Result { + serde_json::from_str(stdout).map_err(|error| { + if status_code == Some(0) { + ConsultInvocationError::InvalidJson(format!( + "{error}; stdout={stdout}; stderr={stderr}" + )) + } else { + ConsultInvocationError::Downstream(downstream_message(status_code, stdout, stderr)) } - } - Ok(capture) -} - -fn stream_session_id(value: &Value) -> Option { - match value { - Value::Object(object) => object - .iter() - .find_map(|(key, value)| { - ((key == "session_id") || (key == "sessionId")) - .then_some(value) - .and_then(Value::as_str) - .and_then(SessionHandle::parse) - .map(|session| session.display()) - }) - .or_else(|| object.values().find_map(stream_session_id)), - Value::Array(array) => array.iter().find_map(stream_session_id), - _ => None, - } + }) } fn downstream_message(status_code: Option, stdout: &str, stderr: &str) -> String { diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs index 29e90c4..ed996db 100644 --- a/crates/phone-opus/tests/mcp_hardening.rs +++ b/crates/phone-opus/tests/mcp_hardening.rs @@ -280,62 +280,45 @@ fn seed_caller_claude_home(home: &Path) -> TestResult { Ok(()) } -fn write_fake_claude_stream_success( +fn write_fake_claude_json_success( path: &Path, result: &str, session_id: &str, uuid: &str, ) -> TestResult { - let payload = [ - serde_json::to_string(&json!({ - "type": "system", - "subtype": "init", - "session_id": session_id, - }))?, - serde_json::to_string(&json!({ - "type": "result", - "subtype": "success", - "is_error": false, - "duration_ms": 1234, - "duration_api_ms": 1200, - "num_turns": 2, - "result": result, - "stop_reason": "end_turn", - "session_id": session_id, - "total_cost_usd": 0.125, - "usage": { - "input_tokens": 10, - "output_tokens": 5 - }, - "modelUsage": { - "claude-opus-4-6": { - "inputTokens": 10, - "outputTokens": 5 - } - }, - "permission_denials": [], - "fast_mode_state": "off", - "uuid": uuid - }))?, - ] - .join("\n"); - must(fs::write(path, format!("{payload}\n")), "write fake stdout") -} - -fn write_fake_claude_stream_init(path: &Path, session_id: &str) -> TestResult { must( fs::write( path, format!( "{}\n", serde_json::to_string(&json!({ - "type": "system", - "subtype": "init", + "type": "result", + "subtype": "success", + "is_error": false, + "duration_ms": 1234, + "duration_api_ms": 1200, + "num_turns": 2, + "result": result, + "stop_reason": "end_turn", "session_id": session_id, + "total_cost_usd": 0.125, + "usage": { + "input_tokens": 10, + "output_tokens": 5 + }, + "modelUsage": { + "claude-opus-4-6": { + "inputTokens": 10, + "outputTokens": 5 + } + }, + "permission_denials": [], + "fast_mode_state": "off", + "uuid": uuid }))? ), ), - "write fake init stream", + "write fake stdout", ) } @@ -411,7 +394,7 @@ fn consult_is_one_shot_and_hides_session_state() -> TestResult { let first_observed_session = "81f218eb-568b-409b-871b-f6e86d8f666f"; let second_observed_session = "dbd3b6c2-4757-4b45-a8f0-f3d877e1a13f"; write_fake_claude_script(&fake_claude)?; - write_fake_claude_stream_success(&stdout_file, "oracle", first_observed_session, "uuid-123")?; + write_fake_claude_json_success(&stdout_file, "oracle", first_observed_session, "uuid-123")?; let claude_bin = fake_claude.display().to_string(); let stdout_path = stdout_file.display().to_string(); @@ -481,7 +464,7 @@ fn consult_is_one_shot_and_hides_session_state() -> TestResult { )?; assert!(uuid::Uuid::parse_str(&first_session_id).is_ok()); - write_fake_claude_stream_success( + write_fake_claude_json_success( &stdout_file, "oracle again", second_observed_session, @@ -573,9 +556,8 @@ fn consult_is_one_shot_and_hides_session_state() -> TestResult { let args = must(fs::read_to_string(&args_file), "read fake args file")?; let lines = args.lines().collect::>(); assert!(lines.contains(&"-p")); - assert!(lines.contains(&"--verbose")); assert!(lines.contains(&"--output-format")); - assert!(lines.contains(&"stream-json")); + assert!(lines.contains(&"json")); assert!(lines.contains(&"--strict-mcp-config")); assert!(lines.contains(&"--mcp-config")); assert!(lines.contains(&"{\"mcpServers\":{}}")); @@ -795,7 +777,7 @@ fn quota_failures_hide_session_state_on_public_surface() -> TestResult { let stdout_file = root.join("stdout.json"); let remembered_session = "84b9d462-5af9-4a4e-8e44-379a8d0c46d7"; write_fake_claude_script(&fake_claude)?; - write_fake_claude_stream_success(&stdout_file, "ok", remembered_session, "uuid-remembered")?; + write_fake_claude_json_success(&stdout_file, "ok", remembered_session, "uuid-remembered")?; let claude_bin = fake_claude.display().to_string(); let stdout_path = stdout_file.display().to_string(); @@ -890,20 +872,19 @@ fn quota_failures_hide_session_state_on_public_surface() -> TestResult { #[test] fn fresh_failures_keep_internal_session_state_without_public_leakage() -> TestResult { - let root = temp_root("consult_fresh_stream_failure")?; + let root = temp_root("consult_fresh_json_failure")?; let state_home = root.join("state-home"); let sandbox = root.join("sandbox"); let caller_home = root.join("caller-home"); let fake_claude = root.join("claude"); let stdout_file = root.join("stdout.json"); let args_file = root.join("args.txt"); - let init_session = "550e8400-e29b-41d4-a716-446655440000"; must(fs::create_dir_all(&state_home), "create state home")?; must(fs::create_dir_all(&sandbox), "create sandbox")?; must(fs::create_dir_all(&caller_home), "create caller home")?; seed_caller_claude_home(&caller_home)?; write_fake_claude_script(&fake_claude)?; - write_fake_claude_stream_init(&stdout_file, init_session)?; + must(fs::write(&stdout_file, ""), "write empty fake stdout")?; let claude_bin = fake_claude.display().to_string(); let stdout_path = stdout_file.display().to_string(); @@ -944,6 +925,13 @@ fn fresh_failures_keep_internal_session_state_without_public_leakage() -> TestRe let args = must(fs::read_to_string(&args_file), "read fresh failure args")?; assert!(args.contains("--session-id")); assert!(!args.contains("--resume")); + let planned_session_id = must_some( + args.lines() + .collect::>() + .windows(2) + .find_map(|window| (window[0] == "--session-id").then_some(window[1].to_owned())), + "planned session id", + )?; let consult_context_index = must( fs::read_to_string( state_home @@ -959,7 +947,7 @@ fn fresh_failures_keep_internal_session_state_without_public_leakage() -> TestRe )?; assert_eq!( consult_context_index["by_cwd"][sandbox.display().to_string()]["session_id"].as_str(), - Some(init_session) + Some(planned_session_id.as_str()) ); assert!( failed["result"]["content"] -- cgit v1.2.3