diff options
| author | main <main@swarm.moe> | 2026-03-25 11:16:43 -0400 |
|---|---|---|
| committer | main <main@swarm.moe> | 2026-03-25 11:16:43 -0400 |
| commit | da6410fd33148e7dd0fec9190a1624e34f745b96 (patch) | |
| tree | 2dc0d974c9edb9d2e734cd2d6eab3f27f3aa5786 /crates/phone-opus | |
| parent | 401fb8edcd52880c70597f1c49c5309a78aaded6 (diff) | |
| download | phone_opus-da6410fd33148e7dd0fec9190a1624e34f745b96.zip | |
Simplify consults to final JSON output
Diffstat (limited to 'crates/phone-opus')
| -rw-r--r-- | crates/phone-opus/src/mcp/service.rs | 145 | ||||
| -rw-r--r-- | crates/phone-opus/tests/mcp_hardening.rs | 88 |
2 files changed, 71 insertions, 162 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index b92f92e..5a6ca91 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fs; -use std::io::{self, BufRead, Read, Write}; +use std::io::{self, BufRead, Write}; #[cfg(unix)] use std::os::unix::fs::symlink; use std::path::{Path, PathBuf}; @@ -169,10 +169,18 @@ impl ConsultRequest { self.session_plan.planned_session().display() } + #[allow( + dead_code, + reason = "session metadata is retained internally but hidden from the public surface" + )] fn context_mode(&self) -> &'static str { self.session_plan.context_mode() } + #[allow( + dead_code, + reason = "session metadata is retained internally but hidden from the public surface" + )] fn reused_session_id(&self) -> Option<String> { self.session_plan.reused_session_id() } @@ -683,13 +691,6 @@ struct ClaudeJsonEnvelope { uuid: Option<String>, } -#[derive(Debug, Default)] -struct ClaudeStreamCapture { - stdout: String, - observed_session_id: Option<String>, - final_envelope: Option<ClaudeJsonEnvelope>, -} - #[derive(Debug)] struct ConsultResponse { cwd: WorkingDirectory, @@ -1445,9 +1446,8 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv let _ = command .arg(claude_binary()) .arg("-p") - .arg("--verbose") .arg("--output-format") - .arg("stream-json") + .arg("json") .arg("--strict-mcp-config") .arg("--mcp-config") .arg(EMPTY_MCP_CONFIG) @@ -1465,63 +1465,27 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv if let Some(session_id) = request.launch_session_id() { let _ = command.arg("--session-id").arg(session_id); } - let mut child = command + let child = command .arg(request.prompt.rendered()) .spawn() .map_err(ConsultInvocationError::Spawn)?; request .remember_planned_context() .map_err(ConsultInvocationError::Spawn)?; - let stdout = child - .stdout - .take() - .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stdout")))?; - let stderr = child - .stderr - .take() - .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stderr")))?; - let stderr_reader = thread::spawn(move || -> io::Result<String> { - let mut stderr = stderr; - let mut buffer = String::new(); - let _ = stderr.read_to_string(&mut buffer)?; - Ok(buffer) - }); - let capture = capture_claude_stream(stdout, request)?; - let status = child.wait().map_err(ConsultInvocationError::Spawn)?; - let stderr = stderr_reader - .join() - .map_err(|_| { - ConsultInvocationError::Spawn(io::Error::other("Claude stderr reader panicked")) - })? - .map_err(ConsultInvocationError::Spawn)? - .trim() - .to_owned(); - let stdout = capture.stdout.trim().to_owned(); - let envelope = match capture.final_envelope { - Some(envelope) => envelope, - None if !status.success() => { - return Err(ConsultInvocationError::Downstream(downstream_message( - status.code(), - &stdout, - &stderr, - ))); - } - None => { - return Err(ConsultInvocationError::InvalidJson(format!( - "missing Claude result envelope; stdout={stdout}; stderr={stderr}" - ))); - } - }; + let output = child + .wait_with_output() + .map_err(ConsultInvocationError::Spawn)?; + let status = output.status; + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned(); + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned(); + let envelope = parse_claude_json_envelope(&stdout, &stderr, status.code())?; if envelope.envelope_type != "result" { - return Err(ConsultInvocationError::Downstream(format!( + return Err(ConsultInvocationError::InvalidJson(format!( "unexpected Claude envelope type `{}`", envelope.envelope_type ))); } - let observed_session_id = envelope - .session_id - .clone() - .or(capture.observed_session_id.clone()); + let observed_session_id = envelope.session_id.clone(); if !status.success() || envelope.is_error || envelope.subtype.as_deref() != Some("success") { return Err(ConsultInvocationError::Downstream( envelope @@ -1561,63 +1525,20 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv }) } -fn capture_claude_stream( - stdout: impl Read, - request: &ConsultRequest, -) -> Result<ClaudeStreamCapture, ConsultInvocationError> { - let mut capture = ClaudeStreamCapture::default(); - let reader = io::BufReader::new(stdout); - for line in reader.lines() { - let line = line.map_err(ConsultInvocationError::Spawn)?; - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - capture.stdout.push_str(trimmed); - capture.stdout.push('\n'); - let Ok(value) = serde_json::from_str::<Value>(trimmed) else { - continue; - }; - if capture.observed_session_id.is_none() - && let Some(session_id) = stream_session_id(&value) - { - request - .remember_context(Some(session_id.as_str())) - .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id); - } - if value.get("type").and_then(Value::as_str) == Some("result") { - let envelope = - serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| { - ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}")) - })?; - if let Some(session_id) = envelope.session_id.as_deref() { - request - .remember_context(Some(session_id)) - .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id.to_owned()); - } - capture.final_envelope = Some(envelope); +fn parse_claude_json_envelope( + stdout: &str, + stderr: &str, + status_code: Option<i32>, +) -> Result<ClaudeJsonEnvelope, ConsultInvocationError> { + serde_json::from_str(stdout).map_err(|error| { + if status_code == Some(0) { + ConsultInvocationError::InvalidJson(format!( + "{error}; stdout={stdout}; stderr={stderr}" + )) + } else { + ConsultInvocationError::Downstream(downstream_message(status_code, stdout, stderr)) } - } - Ok(capture) -} - -fn stream_session_id(value: &Value) -> Option<String> { - match value { - Value::Object(object) => object - .iter() - .find_map(|(key, value)| { - ((key == "session_id") || (key == "sessionId")) - .then_some(value) - .and_then(Value::as_str) - .and_then(SessionHandle::parse) - .map(|session| session.display()) - }) - .or_else(|| object.values().find_map(stream_session_id)), - Value::Array(array) => array.iter().find_map(stream_session_id), - _ => None, - } + }) } fn downstream_message(status_code: Option<i32>, stdout: &str, stderr: &str) -> String { diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs index 29e90c4..ed996db 100644 --- a/crates/phone-opus/tests/mcp_hardening.rs +++ b/crates/phone-opus/tests/mcp_hardening.rs @@ -280,62 +280,45 @@ fn seed_caller_claude_home(home: &Path) -> TestResult { Ok(()) } -fn write_fake_claude_stream_success( +fn write_fake_claude_json_success( path: &Path, result: &str, session_id: &str, uuid: &str, ) -> TestResult { - let payload = [ - serde_json::to_string(&json!({ - "type": "system", - "subtype": "init", - "session_id": session_id, - }))?, - serde_json::to_string(&json!({ - "type": "result", - "subtype": "success", - "is_error": false, - "duration_ms": 1234, - "duration_api_ms": 1200, - "num_turns": 2, - "result": result, - "stop_reason": "end_turn", - "session_id": session_id, - "total_cost_usd": 0.125, - "usage": { - "input_tokens": 10, - "output_tokens": 5 - }, - "modelUsage": { - "claude-opus-4-6": { - "inputTokens": 10, - "outputTokens": 5 - } - }, - "permission_denials": [], - "fast_mode_state": "off", - "uuid": uuid - }))?, - ] - .join("\n"); - must(fs::write(path, format!("{payload}\n")), "write fake stdout") -} - -fn write_fake_claude_stream_init(path: &Path, session_id: &str) -> TestResult { must( fs::write( path, format!( "{}\n", serde_json::to_string(&json!({ - "type": "system", - "subtype": "init", + "type": "result", + "subtype": "success", + "is_error": false, + "duration_ms": 1234, + "duration_api_ms": 1200, + "num_turns": 2, + "result": result, + "stop_reason": "end_turn", "session_id": session_id, + "total_cost_usd": 0.125, + "usage": { + "input_tokens": 10, + "output_tokens": 5 + }, + "modelUsage": { + "claude-opus-4-6": { + "inputTokens": 10, + "outputTokens": 5 + } + }, + "permission_denials": [], + "fast_mode_state": "off", + "uuid": uuid }))? ), ), - "write fake init stream", + "write fake stdout", ) } @@ -411,7 +394,7 @@ fn consult_is_one_shot_and_hides_session_state() -> TestResult { let first_observed_session = "81f218eb-568b-409b-871b-f6e86d8f666f"; let second_observed_session = "dbd3b6c2-4757-4b45-a8f0-f3d877e1a13f"; write_fake_claude_script(&fake_claude)?; - write_fake_claude_stream_success(&stdout_file, "oracle", first_observed_session, "uuid-123")?; + write_fake_claude_json_success(&stdout_file, "oracle", first_observed_session, "uuid-123")?; let claude_bin = fake_claude.display().to_string(); let stdout_path = stdout_file.display().to_string(); @@ -481,7 +464,7 @@ fn consult_is_one_shot_and_hides_session_state() -> TestResult { )?; assert!(uuid::Uuid::parse_str(&first_session_id).is_ok()); - write_fake_claude_stream_success( + write_fake_claude_json_success( &stdout_file, "oracle again", second_observed_session, @@ -573,9 +556,8 @@ fn consult_is_one_shot_and_hides_session_state() -> TestResult { let args = must(fs::read_to_string(&args_file), "read fake args file")?; let lines = args.lines().collect::<Vec<_>>(); assert!(lines.contains(&"-p")); - assert!(lines.contains(&"--verbose")); assert!(lines.contains(&"--output-format")); - assert!(lines.contains(&"stream-json")); + assert!(lines.contains(&"json")); assert!(lines.contains(&"--strict-mcp-config")); assert!(lines.contains(&"--mcp-config")); assert!(lines.contains(&"{\"mcpServers\":{}}")); @@ -795,7 +777,7 @@ fn quota_failures_hide_session_state_on_public_surface() -> TestResult { let stdout_file = root.join("stdout.json"); let remembered_session = "84b9d462-5af9-4a4e-8e44-379a8d0c46d7"; write_fake_claude_script(&fake_claude)?; - write_fake_claude_stream_success(&stdout_file, "ok", remembered_session, "uuid-remembered")?; + write_fake_claude_json_success(&stdout_file, "ok", remembered_session, "uuid-remembered")?; let claude_bin = fake_claude.display().to_string(); let stdout_path = stdout_file.display().to_string(); @@ -890,20 +872,19 @@ fn quota_failures_hide_session_state_on_public_surface() -> TestResult { #[test] fn fresh_failures_keep_internal_session_state_without_public_leakage() -> TestResult { - let root = temp_root("consult_fresh_stream_failure")?; + let root = temp_root("consult_fresh_json_failure")?; let state_home = root.join("state-home"); let sandbox = root.join("sandbox"); let caller_home = root.join("caller-home"); let fake_claude = root.join("claude"); let stdout_file = root.join("stdout.json"); let args_file = root.join("args.txt"); - let init_session = "550e8400-e29b-41d4-a716-446655440000"; must(fs::create_dir_all(&state_home), "create state home")?; must(fs::create_dir_all(&sandbox), "create sandbox")?; must(fs::create_dir_all(&caller_home), "create caller home")?; seed_caller_claude_home(&caller_home)?; write_fake_claude_script(&fake_claude)?; - write_fake_claude_stream_init(&stdout_file, init_session)?; + must(fs::write(&stdout_file, ""), "write empty fake stdout")?; let claude_bin = fake_claude.display().to_string(); let stdout_path = stdout_file.display().to_string(); @@ -944,6 +925,13 @@ fn fresh_failures_keep_internal_session_state_without_public_leakage() -> TestRe let args = must(fs::read_to_string(&args_file), "read fresh failure args")?; assert!(args.contains("--session-id")); assert!(!args.contains("--resume")); + let planned_session_id = must_some( + args.lines() + .collect::<Vec<_>>() + .windows(2) + .find_map(|window| (window[0] == "--session-id").then_some(window[1].to_owned())), + "planned session id", + )?; let consult_context_index = must( fs::read_to_string( state_home @@ -959,7 +947,7 @@ fn fresh_failures_keep_internal_session_state_without_public_leakage() -> TestRe )?; assert_eq!( consult_context_index["by_cwd"][sandbox.display().to_string()]["session_id"].as_str(), - Some(init_session) + Some(planned_session_id.as_str()) ); assert!( failed["result"]["content"] |