swarm repositories / source
about | summary | refs | log | tree | commit | diff
path: root/crates
diff options
context:
space:
mode:
Diffstat (limited to 'crates')
-rw-r--r-- crates/phone-opus/src/mcp/service.rs 164
-rw-r--r-- crates/phone-opus/tests/mcp_hardening.rs 51
2 files changed, 183 insertions, 32 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs
index 39cc825..bb56ad3 100644
--- a/crates/phone-opus/src/mcp/service.rs
+++ b/crates/phone-opus/src/mcp/service.rs
@@ -5,6 +5,7 @@ use std::io::{self, BufRead, Read, Write};
use std::os::unix::fs::symlink;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
+use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
@@ -659,6 +660,8 @@ enum ConsultInvocationError {
#[error("Claude Code returned non-JSON output: {0}")]
InvalidJson(String),
#[error("{0}")]
+ Stalled(String),
+ #[error("{0}")]
Downstream(String),
}
@@ -691,6 +694,12 @@ struct ClaudeStreamCapture {
final_envelope: Option<ClaudeJsonEnvelope>,
}
+enum ClaudeStreamEvent {
+ Line(String),
+ Eof,
+ Error(io::Error),
+}
+
#[derive(Debug)]
struct ConsultResponse {
cwd: WorkingDirectory,
@@ -731,6 +740,8 @@ const SHARED_TMP_ROOTS: [&str; 2] = ["/tmp", "/var/tmp"];
const CONSULT_OUTPUT_ROOT: &str = "/tmp/phone_opus-consults";
const CONSULT_OUTPUT_KEEP_COUNT: usize = 256;
const CONSULT_OUTPUT_MAX_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60);
+const DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT: Duration = Duration::from_secs(60);
+const CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV: &str = "PHONE_OPUS_CLAUDE_INITIAL_OUTPUT_TIMEOUT_MS";
const CONSULT_TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] =
time::macros::format_description!("[year][month][day]T[hour][minute][second]Z");
const CLAUDE_MIRROR_FILES: [&str; 4] = [
@@ -984,6 +995,7 @@ fn consult_fault(
source.to_string(),
),
ConsultInvocationError::InvalidJson(detail)
+ | ConsultInvocationError::Stalled(detail)
| ConsultInvocationError::Downstream(detail) => {
FaultRecord::downstream(generation, FaultStage::Claude, operation, detail)
}
@@ -995,6 +1007,7 @@ fn consult_fault_context(request: &ConsultRequest, error: &ConsultInvocationErro
let detail = match error {
ConsultInvocationError::Spawn(_) => None,
ConsultInvocationError::InvalidJson(detail)
+ | ConsultInvocationError::Stalled(detail)
| ConsultInvocationError::Downstream(detail) => Some(detail.as_str()),
};
let reused_session_id = request.reused_session_id();
@@ -1247,6 +1260,7 @@ fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure
detail: source.to_string(),
},
ConsultInvocationError::InvalidJson(detail)
+ | ConsultInvocationError::Stalled(detail)
| ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure {
class: "downstream".to_owned(),
detail,
@@ -1425,6 +1439,7 @@ fn unix_ms_now() -> u64 {
fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInvocationError> {
let sandbox = ClaudeSandbox::prepare().map_err(ConsultInvocationError::Spawn)?;
+ let transient_unit = consult_transient_unit();
let mut command = Command::new(SYSTEMD_RUN_BINARY);
let runtime_dir = caller_runtime_dir();
let _ = command
@@ -1438,6 +1453,8 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
.arg("--pipe")
.arg("--collect")
.arg("--quiet")
+ .arg("--unit")
+ .arg(transient_unit.as_str())
.arg("--working-directory")
.arg(request.cwd.as_path());
for property in [
@@ -1465,6 +1482,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
let _ = command
.arg(claude_binary())
.arg("-p")
+ .arg("--verbose")
.arg("--output-format")
.arg("stream-json")
.arg("--strict-mcp-config")
@@ -1508,7 +1526,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
let _ = stderr.read_to_string(&mut buffer)?;
Ok(buffer)
});
- let capture = capture_claude_stream(stdout, request)?;
+ let capture = capture_claude_stream(stdout, request, &mut child, transient_unit.as_str())?;
let status = child.wait().map_err(ConsultInvocationError::Spawn)?;
let stderr = stderr_reader
.join()
@@ -1590,45 +1608,127 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
}
fn capture_claude_stream(
- stdout: impl Read,
+ stdout: impl Read + Send + 'static,
request: &ConsultRequest,
+ child: &mut std::process::Child,
+ transient_unit: &str,
) -> Result<ClaudeStreamCapture, ConsultInvocationError> {
+ let (sender, receiver) = mpsc::channel();
+ let reader = thread::spawn(move || {
+ let reader = io::BufReader::new(stdout);
+ for line in reader.lines() {
+ match line {
+ Ok(line) => {
+ if sender.send(ClaudeStreamEvent::Line(line)).is_err() {
+ return;
+ }
+ }
+ Err(error) => {
+ let _ = sender.send(ClaudeStreamEvent::Error(error));
+ return;
+ }
+ }
+ }
+ let _ = sender.send(ClaudeStreamEvent::Eof);
+ });
let mut capture = ClaudeStreamCapture::default();
- let reader = io::BufReader::new(stdout);
- for line in reader.lines() {
- let line = line.map_err(ConsultInvocationError::Spawn)?;
- let trimmed = line.trim();
- if trimmed.is_empty() {
- continue;
+ let first_event = receiver
+ .recv_timeout(claude_initial_output_timeout())
+ .map_err(|_| {
+ cull_consult_unit(child, transient_unit);
+ ConsultInvocationError::Stalled(format!(
+ "Claude Code produced no stream output within {} ms; terminated stalled consult unit `{transient_unit}`",
+ claude_initial_output_timeout().as_millis()
+ ))
+ })?;
+ let first_event_finished = ingest_claude_stream_event(first_event, &mut capture, request)?;
+ if first_event_finished {
+ reader.join().map_err(|_| {
+ ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked"))
+ })?;
+ return Ok(capture);
+ }
+ while let Ok(event) = receiver.recv() {
+ if ingest_claude_stream_event(event, &mut capture, request)? {
+ break;
}
- capture.stdout.push_str(trimmed);
- capture.stdout.push('\n');
- let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
- continue;
- };
- if capture.observed_session_id.is_none()
- && let Some(session_id) = stream_session_id(&value)
- {
+ }
+ reader.join().map_err(|_| {
+ ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked"))
+ })?;
+ Ok(capture)
+}
+
+fn ingest_claude_stream_event(
+ event: ClaudeStreamEvent,
+ capture: &mut ClaudeStreamCapture,
+ request: &ConsultRequest,
+) -> Result<bool, ConsultInvocationError> {
+ let line = match event {
+ ClaudeStreamEvent::Line(line) => line,
+ ClaudeStreamEvent::Eof => return Ok(true),
+ ClaudeStreamEvent::Error(error) => return Err(ConsultInvocationError::Spawn(error)),
+ };
+ let trimmed = line.trim();
+ if trimmed.is_empty() {
+ return Ok(false);
+ }
+ capture.stdout.push_str(trimmed);
+ capture.stdout.push('\n');
+ let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
+ return Ok(false);
+ };
+ if capture.observed_session_id.is_none()
+ && let Some(session_id) = stream_session_id(&value)
+ {
+ request
+ .remember_context(Some(session_id.as_str()))
+ .map_err(ConsultInvocationError::Spawn)?;
+ capture.observed_session_id = Some(session_id);
+ }
+ if value.get("type").and_then(Value::as_str) == Some("result") {
+ let envelope = serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| {
+ ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}"))
+ })?;
+ if let Some(session_id) = envelope.session_id.as_deref() {
request
- .remember_context(Some(session_id.as_str()))
+ .remember_context(Some(session_id))
.map_err(ConsultInvocationError::Spawn)?;
- capture.observed_session_id = Some(session_id);
- }
- if value.get("type").and_then(Value::as_str) == Some("result") {
- let envelope =
- serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| {
- ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}"))
- })?;
- if let Some(session_id) = envelope.session_id.as_deref() {
- request
- .remember_context(Some(session_id))
- .map_err(ConsultInvocationError::Spawn)?;
- capture.observed_session_id = Some(session_id.to_owned());
- }
- capture.final_envelope = Some(envelope);
+ capture.observed_session_id = Some(session_id.to_owned());
}
+ capture.final_envelope = Some(envelope);
}
- Ok(capture)
+ Ok(false)
+}
+
+fn claude_initial_output_timeout() -> Duration {
+ std::env::var(CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV)
+ .ok()
+ .and_then(|value| value.parse::<u64>().ok())
+ .map(Duration::from_millis)
+ .unwrap_or(DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT)
+}
+
+fn consult_transient_unit() -> String {
+ format!("phone-opus-consult-{}", Uuid::new_v4().simple())
+}
+
+fn cull_consult_unit(child: &mut std::process::Child, transient_unit: &str) {
+ let runtime_dir = caller_runtime_dir();
+ let _ = Command::new("systemctl")
+ .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str())
+ .env(
+ "DBUS_SESSION_BUS_ADDRESS",
+ caller_dbus_session_bus_address(runtime_dir.as_path()),
+ )
+ .arg("--user")
+ .arg("stop")
+ .arg(transient_unit)
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .status();
+ let _ = child.kill();
+ let _ = child.wait();
}
fn stream_session_id(value: &Value) -> Option<String> {
diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs
index 0d53c33..0b32442 100644
--- a/crates/phone-opus/tests/mcp_hardening.rs
+++ b/crates/phone-opus/tests/mcp_hardening.rs
@@ -631,6 +631,7 @@ fn consult_reuses_context_per_cwd_by_default_and_fresh_context_opts_out() -> Tes
let args = must(fs::read_to_string(&args_file), "read fake args file")?;
let lines = args.lines().collect::<Vec<_>>();
assert!(lines.contains(&"-p"));
+ assert!(lines.contains(&"--verbose"));
assert!(lines.contains(&"--output-format"));
assert!(lines.contains(&"stream-json"));
assert!(lines.contains(&"--strict-mcp-config"));
@@ -838,6 +839,56 @@ fn consult_surfaces_downstream_cli_failures() -> TestResult {
}
#[test]
+fn silent_claude_processes_fail_fast_instead_of_wedging() -> TestResult {
+ let root = temp_root("consult_stall")?;
+ let state_home = root.join("state-home");
+ let fake_claude = root.join("claude");
+ let caller_home = root.join("caller-home");
+ must(fs::create_dir_all(&state_home), "create state home")?;
+ must(fs::create_dir_all(&caller_home), "create caller home")?;
+ seed_caller_claude_home(&caller_home)?;
+ write_fake_claude_script(&fake_claude)?;
+
+ let claude_bin = fake_claude.display().to_string();
+ let caller_home_path = caller_home.display().to_string();
+ let env = [
+ ("HOME", caller_home_path.as_str()),
+ ("PHONE_OPUS_CLAUDE_BIN", claude_bin.as_str()),
+ ("PHONE_OPUS_CLAUDE_INITIAL_OUTPUT_TIMEOUT_MS", "100"),
+ ("PHONE_OPUS_TEST_SLEEP_MS", "5000"),
+ ];
+ let mut harness = McpHarness::spawn(&state_home, &env)?;
+ let _ = harness.initialize()?;
+ harness.notify_initialized()?;
+
+ let started = std::time::Instant::now();
+ let consult = harness.call_tool(3, "consult", json!({ "prompt": "hang forever" }))?;
+ let elapsed = started.elapsed();
+
+ assert_tool_error(&consult);
+ assert_eq!(
+ tool_content(&consult)["fault"]["class"].as_str(),
+ Some("downstream")
+ );
+ assert!(
+ tool_content(&consult)["fault"]["detail"]
+ .as_str()
+ .is_some_and(|value| value.contains("produced no stream output within 100 ms"))
+ );
+ assert!(elapsed < std::time::Duration::from_secs(3));
+ assert_eq!(
+ tool_content(&consult)["context"]["consult"]["context_mode"].as_str(),
+ Some("fresh")
+ );
+ assert!(
+ tool_content(&consult)["context"]["consult"]["planned_session_id"]
+ .as_str()
+ .is_some_and(|value| !value.is_empty())
+ );
+ Ok(())
+}
+
+#[test]
fn quota_failures_surface_resume_context_for_same_cwd() -> TestResult {
let root = temp_root("consult_quota_failure")?;
let state_home = root.join("state-home");