swarm repositories / source
aboutsummaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
Diffstat (limited to 'crates')
-rw-r--r--crates/phone-opus/src/mcp/service.rs145
-rw-r--r--crates/phone-opus/tests/mcp_hardening.rs88
2 files changed, 71 insertions, 162 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs
index b92f92e..5a6ca91 100644
--- a/crates/phone-opus/src/mcp/service.rs
+++ b/crates/phone-opus/src/mcp/service.rs
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fs;
-use std::io::{self, BufRead, Read, Write};
+use std::io::{self, BufRead, Write};
#[cfg(unix)]
use std::os::unix::fs::symlink;
use std::path::{Path, PathBuf};
@@ -169,10 +169,18 @@ impl ConsultRequest {
self.session_plan.planned_session().display()
}
+ #[allow(
+ dead_code,
+ reason = "session metadata is retained internally but hidden from the public surface"
+ )]
fn context_mode(&self) -> &'static str {
self.session_plan.context_mode()
}
+ #[allow(
+ dead_code,
+ reason = "session metadata is retained internally but hidden from the public surface"
+ )]
fn reused_session_id(&self) -> Option<String> {
self.session_plan.reused_session_id()
}
@@ -683,13 +691,6 @@ struct ClaudeJsonEnvelope {
uuid: Option<String>,
}
-#[derive(Debug, Default)]
-struct ClaudeStreamCapture {
- stdout: String,
- observed_session_id: Option<String>,
- final_envelope: Option<ClaudeJsonEnvelope>,
-}
-
#[derive(Debug)]
struct ConsultResponse {
cwd: WorkingDirectory,
@@ -1445,9 +1446,8 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
let _ = command
.arg(claude_binary())
.arg("-p")
- .arg("--verbose")
.arg("--output-format")
- .arg("stream-json")
+ .arg("json")
.arg("--strict-mcp-config")
.arg("--mcp-config")
.arg(EMPTY_MCP_CONFIG)
@@ -1465,63 +1465,27 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
if let Some(session_id) = request.launch_session_id() {
let _ = command.arg("--session-id").arg(session_id);
}
- let mut child = command
+ let child = command
.arg(request.prompt.rendered())
.spawn()
.map_err(ConsultInvocationError::Spawn)?;
request
.remember_planned_context()
.map_err(ConsultInvocationError::Spawn)?;
- let stdout = child
- .stdout
- .take()
- .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stdout")))?;
- let stderr = child
- .stderr
- .take()
- .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stderr")))?;
- let stderr_reader = thread::spawn(move || -> io::Result<String> {
- let mut stderr = stderr;
- let mut buffer = String::new();
- let _ = stderr.read_to_string(&mut buffer)?;
- Ok(buffer)
- });
- let capture = capture_claude_stream(stdout, request)?;
- let status = child.wait().map_err(ConsultInvocationError::Spawn)?;
- let stderr = stderr_reader
- .join()
- .map_err(|_| {
- ConsultInvocationError::Spawn(io::Error::other("Claude stderr reader panicked"))
- })?
- .map_err(ConsultInvocationError::Spawn)?
- .trim()
- .to_owned();
- let stdout = capture.stdout.trim().to_owned();
- let envelope = match capture.final_envelope {
- Some(envelope) => envelope,
- None if !status.success() => {
- return Err(ConsultInvocationError::Downstream(downstream_message(
- status.code(),
- &stdout,
- &stderr,
- )));
- }
- None => {
- return Err(ConsultInvocationError::InvalidJson(format!(
- "missing Claude result envelope; stdout={stdout}; stderr={stderr}"
- )));
- }
- };
+ let output = child
+ .wait_with_output()
+ .map_err(ConsultInvocationError::Spawn)?;
+ let status = output.status;
+ let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned();
+ let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned();
+ let envelope = parse_claude_json_envelope(&stdout, &stderr, status.code())?;
if envelope.envelope_type != "result" {
- return Err(ConsultInvocationError::Downstream(format!(
+ return Err(ConsultInvocationError::InvalidJson(format!(
"unexpected Claude envelope type `{}`",
envelope.envelope_type
)));
}
- let observed_session_id = envelope
- .session_id
- .clone()
- .or(capture.observed_session_id.clone());
+ let observed_session_id = envelope.session_id.clone();
if !status.success() || envelope.is_error || envelope.subtype.as_deref() != Some("success") {
return Err(ConsultInvocationError::Downstream(
envelope
@@ -1561,63 +1525,20 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
})
}
-fn capture_claude_stream(
- stdout: impl Read,
- request: &ConsultRequest,
-) -> Result<ClaudeStreamCapture, ConsultInvocationError> {
- let mut capture = ClaudeStreamCapture::default();
- let reader = io::BufReader::new(stdout);
- for line in reader.lines() {
- let line = line.map_err(ConsultInvocationError::Spawn)?;
- let trimmed = line.trim();
- if trimmed.is_empty() {
- continue;
- }
- capture.stdout.push_str(trimmed);
- capture.stdout.push('\n');
- let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
- continue;
- };
- if capture.observed_session_id.is_none()
- && let Some(session_id) = stream_session_id(&value)
- {
- request
- .remember_context(Some(session_id.as_str()))
- .map_err(ConsultInvocationError::Spawn)?;
- capture.observed_session_id = Some(session_id);
- }
- if value.get("type").and_then(Value::as_str) == Some("result") {
- let envelope =
- serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| {
- ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}"))
- })?;
- if let Some(session_id) = envelope.session_id.as_deref() {
- request
- .remember_context(Some(session_id))
- .map_err(ConsultInvocationError::Spawn)?;
- capture.observed_session_id = Some(session_id.to_owned());
- }
- capture.final_envelope = Some(envelope);
+fn parse_claude_json_envelope(
+ stdout: &str,
+ stderr: &str,
+ status_code: Option<i32>,
+) -> Result<ClaudeJsonEnvelope, ConsultInvocationError> {
+ serde_json::from_str(stdout).map_err(|error| {
+ if status_code == Some(0) {
+ ConsultInvocationError::InvalidJson(format!(
+ "{error}; stdout={stdout}; stderr={stderr}"
+ ))
+ } else {
+ ConsultInvocationError::Downstream(downstream_message(status_code, stdout, stderr))
}
- }
- Ok(capture)
-}
-
-fn stream_session_id(value: &Value) -> Option<String> {
- match value {
- Value::Object(object) => object
- .iter()
- .find_map(|(key, value)| {
- ((key == "session_id") || (key == "sessionId"))
- .then_some(value)
- .and_then(Value::as_str)
- .and_then(SessionHandle::parse)
- .map(|session| session.display())
- })
- .or_else(|| object.values().find_map(stream_session_id)),
- Value::Array(array) => array.iter().find_map(stream_session_id),
- _ => None,
- }
+ })
}
fn downstream_message(status_code: Option<i32>, stdout: &str, stderr: &str) -> String {
diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs
index 29e90c4..ed996db 100644
--- a/crates/phone-opus/tests/mcp_hardening.rs
+++ b/crates/phone-opus/tests/mcp_hardening.rs
@@ -280,62 +280,45 @@ fn seed_caller_claude_home(home: &Path) -> TestResult {
Ok(())
}
-fn write_fake_claude_stream_success(
+fn write_fake_claude_json_success(
path: &Path,
result: &str,
session_id: &str,
uuid: &str,
) -> TestResult {
- let payload = [
- serde_json::to_string(&json!({
- "type": "system",
- "subtype": "init",
- "session_id": session_id,
- }))?,
- serde_json::to_string(&json!({
- "type": "result",
- "subtype": "success",
- "is_error": false,
- "duration_ms": 1234,
- "duration_api_ms": 1200,
- "num_turns": 2,
- "result": result,
- "stop_reason": "end_turn",
- "session_id": session_id,
- "total_cost_usd": 0.125,
- "usage": {
- "input_tokens": 10,
- "output_tokens": 5
- },
- "modelUsage": {
- "claude-opus-4-6": {
- "inputTokens": 10,
- "outputTokens": 5
- }
- },
- "permission_denials": [],
- "fast_mode_state": "off",
- "uuid": uuid
- }))?,
- ]
- .join("\n");
- must(fs::write(path, format!("{payload}\n")), "write fake stdout")
-}
-
-fn write_fake_claude_stream_init(path: &Path, session_id: &str) -> TestResult {
must(
fs::write(
path,
format!(
"{}\n",
serde_json::to_string(&json!({
- "type": "system",
- "subtype": "init",
+ "type": "result",
+ "subtype": "success",
+ "is_error": false,
+ "duration_ms": 1234,
+ "duration_api_ms": 1200,
+ "num_turns": 2,
+ "result": result,
+ "stop_reason": "end_turn",
"session_id": session_id,
+ "total_cost_usd": 0.125,
+ "usage": {
+ "input_tokens": 10,
+ "output_tokens": 5
+ },
+ "modelUsage": {
+ "claude-opus-4-6": {
+ "inputTokens": 10,
+ "outputTokens": 5
+ }
+ },
+ "permission_denials": [],
+ "fast_mode_state": "off",
+ "uuid": uuid
}))?
),
),
- "write fake init stream",
+ "write fake stdout",
)
}
@@ -411,7 +394,7 @@ fn consult_is_one_shot_and_hides_session_state() -> TestResult {
let first_observed_session = "81f218eb-568b-409b-871b-f6e86d8f666f";
let second_observed_session = "dbd3b6c2-4757-4b45-a8f0-f3d877e1a13f";
write_fake_claude_script(&fake_claude)?;
- write_fake_claude_stream_success(&stdout_file, "oracle", first_observed_session, "uuid-123")?;
+ write_fake_claude_json_success(&stdout_file, "oracle", first_observed_session, "uuid-123")?;
let claude_bin = fake_claude.display().to_string();
let stdout_path = stdout_file.display().to_string();
@@ -481,7 +464,7 @@ fn consult_is_one_shot_and_hides_session_state() -> TestResult {
)?;
assert!(uuid::Uuid::parse_str(&first_session_id).is_ok());
- write_fake_claude_stream_success(
+ write_fake_claude_json_success(
&stdout_file,
"oracle again",
second_observed_session,
@@ -573,9 +556,8 @@ fn consult_is_one_shot_and_hides_session_state() -> TestResult {
let args = must(fs::read_to_string(&args_file), "read fake args file")?;
let lines = args.lines().collect::<Vec<_>>();
assert!(lines.contains(&"-p"));
- assert!(lines.contains(&"--verbose"));
assert!(lines.contains(&"--output-format"));
- assert!(lines.contains(&"stream-json"));
+ assert!(lines.contains(&"json"));
assert!(lines.contains(&"--strict-mcp-config"));
assert!(lines.contains(&"--mcp-config"));
assert!(lines.contains(&"{\"mcpServers\":{}}"));
@@ -795,7 +777,7 @@ fn quota_failures_hide_session_state_on_public_surface() -> TestResult {
let stdout_file = root.join("stdout.json");
let remembered_session = "84b9d462-5af9-4a4e-8e44-379a8d0c46d7";
write_fake_claude_script(&fake_claude)?;
- write_fake_claude_stream_success(&stdout_file, "ok", remembered_session, "uuid-remembered")?;
+ write_fake_claude_json_success(&stdout_file, "ok", remembered_session, "uuid-remembered")?;
let claude_bin = fake_claude.display().to_string();
let stdout_path = stdout_file.display().to_string();
@@ -890,20 +872,19 @@ fn quota_failures_hide_session_state_on_public_surface() -> TestResult {
#[test]
fn fresh_failures_keep_internal_session_state_without_public_leakage() -> TestResult {
- let root = temp_root("consult_fresh_stream_failure")?;
+ let root = temp_root("consult_fresh_json_failure")?;
let state_home = root.join("state-home");
let sandbox = root.join("sandbox");
let caller_home = root.join("caller-home");
let fake_claude = root.join("claude");
let stdout_file = root.join("stdout.json");
let args_file = root.join("args.txt");
- let init_session = "550e8400-e29b-41d4-a716-446655440000";
must(fs::create_dir_all(&state_home), "create state home")?;
must(fs::create_dir_all(&sandbox), "create sandbox")?;
must(fs::create_dir_all(&caller_home), "create caller home")?;
seed_caller_claude_home(&caller_home)?;
write_fake_claude_script(&fake_claude)?;
- write_fake_claude_stream_init(&stdout_file, init_session)?;
+ must(fs::write(&stdout_file, ""), "write empty fake stdout")?;
let claude_bin = fake_claude.display().to_string();
let stdout_path = stdout_file.display().to_string();
@@ -944,6 +925,13 @@ fn fresh_failures_keep_internal_session_state_without_public_leakage() -> TestRe
let args = must(fs::read_to_string(&args_file), "read fresh failure args")?;
assert!(args.contains("--session-id"));
assert!(!args.contains("--resume"));
+ let planned_session_id = must_some(
+ args.lines()
+ .collect::<Vec<_>>()
+ .windows(2)
+ .find_map(|window| (window[0] == "--session-id").then_some(window[1].to_owned())),
+ "planned session id",
+ )?;
let consult_context_index = must(
fs::read_to_string(
state_home
@@ -959,7 +947,7 @@ fn fresh_failures_keep_internal_session_state_without_public_leakage() -> TestRe
)?;
assert_eq!(
consult_context_index["by_cwd"][sandbox.display().to_string()]["session_id"].as_str(),
- Some(init_session)
+ Some(planned_session_id.as_str())
);
assert!(
failed["result"]["content"]