From 481aaa4ee150671d86655d566f52aa1bd7254c16 Mon Sep 17 00:00:00 2001 From: main Date: Wed, 25 Mar 2026 00:03:55 -0400 Subject: Fail fast on silent Claude resume stalls --- crates/phone-opus/src/mcp/service.rs | 164 +++++++++++++++++++++++++------ crates/phone-opus/tests/mcp_hardening.rs | 51 ++++++++++ 2 files changed, 183 insertions(+), 32 deletions(-) (limited to 'crates') diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index 39cc825..bb56ad3 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -5,6 +5,7 @@ use std::io::{self, BufRead, Read, Write}; use std::os::unix::fs::symlink; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; +use std::sync::mpsc; use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -659,6 +660,8 @@ enum ConsultInvocationError { #[error("Claude Code returned non-JSON output: {0}")] InvalidJson(String), #[error("{0}")] + Stalled(String), + #[error("{0}")] Downstream(String), } @@ -691,6 +694,12 @@ struct ClaudeStreamCapture { final_envelope: Option, } +enum ClaudeStreamEvent { + Line(String), + Eof, + Error(io::Error), +} + #[derive(Debug)] struct ConsultResponse { cwd: WorkingDirectory, @@ -731,6 +740,8 @@ const SHARED_TMP_ROOTS: [&str; 2] = ["/tmp", "/var/tmp"]; const CONSULT_OUTPUT_ROOT: &str = "/tmp/phone_opus-consults"; const CONSULT_OUTPUT_KEEP_COUNT: usize = 256; const CONSULT_OUTPUT_MAX_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60); +const DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT: Duration = Duration::from_secs(60); +const CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV: &str = "PHONE_OPUS_CLAUDE_INITIAL_OUTPUT_TIMEOUT_MS"; const CONSULT_TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] = time::macros::format_description!("[year][month][day]T[hour][minute][second]Z"); const CLAUDE_MIRROR_FILES: [&str; 4] = [ @@ -984,6 +995,7 @@ fn consult_fault( source.to_string(), ), ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => { FaultRecord::downstream(generation, FaultStage::Claude, operation, detail) } @@ -995,6 +1007,7 @@ fn consult_fault_context(request: &ConsultRequest, error: &ConsultInvocationErro let detail = match error { ConsultInvocationError::Spawn(_) => None, ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => Some(detail.as_str()), }; let reused_session_id = request.reused_session_id(); @@ -1247,6 +1260,7 @@ fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure detail: source.to_string(), }, ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure { class: "downstream".to_owned(), detail, @@ -1425,6 +1439,7 @@ fn unix_ms_now() -> u64 { fn invoke_claude(request: &ConsultRequest) -> Result { let sandbox = ClaudeSandbox::prepare().map_err(ConsultInvocationError::Spawn)?; + let transient_unit = consult_transient_unit(); let mut command = Command::new(SYSTEMD_RUN_BINARY); let runtime_dir = caller_runtime_dir(); let _ = command @@ -1438,6 +1453,8 @@ fn invoke_claude(request: &ConsultRequest) -> Result Result Result Result Result { + let (sender, receiver) = mpsc::channel(); + let reader = thread::spawn(move || { + let reader = io::BufReader::new(stdout); + for line in reader.lines() { + match line { + Ok(line) => { + if sender.send(ClaudeStreamEvent::Line(line)).is_err() { + return; + } + } + Err(error) => { + let _ = sender.send(ClaudeStreamEvent::Error(error)); + return; + } + } + } + let _ = sender.send(ClaudeStreamEvent::Eof); + }); let mut capture = ClaudeStreamCapture::default(); - let reader = io::BufReader::new(stdout); - for line in reader.lines() { - let line = line.map_err(ConsultInvocationError::Spawn)?; - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; + let first_event = receiver + .recv_timeout(claude_initial_output_timeout()) + .map_err(|_| { + cull_consult_unit(child, transient_unit); + ConsultInvocationError::Stalled(format!( + "Claude Code produced no stream output within {} ms; terminated stalled consult unit `{transient_unit}`", + claude_initial_output_timeout().as_millis() + )) + })?; + let first_event_finished = ingest_claude_stream_event(first_event, &mut capture, request)?; + if first_event_finished { + reader.join().map_err(|_| { + ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked")) + })?; + return Ok(capture); + } + while let Ok(event) = receiver.recv() { + if ingest_claude_stream_event(event, &mut capture, request)? { + break; } - capture.stdout.push_str(trimmed); - capture.stdout.push('\n'); - let Ok(value) = serde_json::from_str::(trimmed) else { - continue; - }; - if capture.observed_session_id.is_none() - && let Some(session_id) = stream_session_id(&value) - { + } + reader.join().map_err(|_| { + ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked")) + })?; + Ok(capture) +} + +fn ingest_claude_stream_event( + event: ClaudeStreamEvent, + capture: &mut ClaudeStreamCapture, + request: &ConsultRequest, +) -> Result { + let line = match event { + ClaudeStreamEvent::Line(line) => line, + ClaudeStreamEvent::Eof => return Ok(true), + ClaudeStreamEvent::Error(error) => return Err(ConsultInvocationError::Spawn(error)), + }; + let trimmed = line.trim(); + if trimmed.is_empty() { + return Ok(false); + } + capture.stdout.push_str(trimmed); + capture.stdout.push('\n'); + let Ok(value) = serde_json::from_str::(trimmed) else { + return Ok(false); + }; + if capture.observed_session_id.is_none() + && let Some(session_id) = stream_session_id(&value) + { + request + .remember_context(Some(session_id.as_str())) + .map_err(ConsultInvocationError::Spawn)?; + capture.observed_session_id = Some(session_id); + } + if value.get("type").and_then(Value::as_str) == Some("result") { + let envelope = serde_json::from_value::(value).map_err(|error| { + ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}")) + })?; + if let Some(session_id) = envelope.session_id.as_deref() { request - .remember_context(Some(session_id.as_str())) + .remember_context(Some(session_id)) .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id); - } - if value.get("type").and_then(Value::as_str) == Some("result") { - let envelope = - serde_json::from_value::(value).map_err(|error| { - ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}")) - })?; - if let Some(session_id) = envelope.session_id.as_deref() { - request - .remember_context(Some(session_id)) - .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id.to_owned()); - } - capture.final_envelope = Some(envelope); + capture.observed_session_id = Some(session_id.to_owned()); } + capture.final_envelope = Some(envelope); } - Ok(capture) + Ok(false) +} + +fn claude_initial_output_timeout() -> Duration { + std::env::var(CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV) + .ok() + .and_then(|value| value.parse::().ok()) + .map(Duration::from_millis) + .unwrap_or(DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT) +} + +fn consult_transient_unit() -> String { + format!("phone-opus-consult-{}", Uuid::new_v4().simple()) +} + +fn cull_consult_unit(child: &mut std::process::Child, transient_unit: &str) { + let runtime_dir = caller_runtime_dir(); + let _ = Command::new("systemctl") + .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str()) + .env( + "DBUS_SESSION_BUS_ADDRESS", + caller_dbus_session_bus_address(runtime_dir.as_path()), + ) + .arg("--user") + .arg("stop") + .arg(transient_unit) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status(); + let _ = child.kill(); + let _ = child.wait(); } fn stream_session_id(value: &Value) -> Option { diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs index 0d53c33..0b32442 100644 --- a/crates/phone-opus/tests/mcp_hardening.rs +++ b/crates/phone-opus/tests/mcp_hardening.rs @@ -631,6 +631,7 @@ fn consult_reuses_context_per_cwd_by_default_and_fresh_context_opts_out() -> Tes let args = must(fs::read_to_string(&args_file), "read fake args file")?; let lines = args.lines().collect::>(); assert!(lines.contains(&"-p")); + assert!(lines.contains(&"--verbose")); assert!(lines.contains(&"--output-format")); assert!(lines.contains(&"stream-json")); assert!(lines.contains(&"--strict-mcp-config")); @@ -837,6 +838,56 @@ fn consult_surfaces_downstream_cli_failures() -> TestResult { Ok(()) } +#[test] +fn silent_claude_processes_fail_fast_instead_of_wedging() -> TestResult { + let root = temp_root("consult_stall")?; + let state_home = root.join("state-home"); + let fake_claude = root.join("claude"); + let caller_home = root.join("caller-home"); + must(fs::create_dir_all(&state_home), "create state home")?; + must(fs::create_dir_all(&caller_home), "create caller home")?; + seed_caller_claude_home(&caller_home)?; + write_fake_claude_script(&fake_claude)?; + + let claude_bin = fake_claude.display().to_string(); + let caller_home_path = caller_home.display().to_string(); + let env = [ + ("HOME", caller_home_path.as_str()), + ("PHONE_OPUS_CLAUDE_BIN", claude_bin.as_str()), + ("PHONE_OPUS_CLAUDE_INITIAL_OUTPUT_TIMEOUT_MS", "100"), + ("PHONE_OPUS_TEST_SLEEP_MS", "5000"), + ]; + let mut harness = McpHarness::spawn(&state_home, &env)?; + let _ = harness.initialize()?; + harness.notify_initialized()?; + + let started = std::time::Instant::now(); + let consult = harness.call_tool(3, "consult", json!({ "prompt": "hang forever" }))?; + let elapsed = started.elapsed(); + + assert_tool_error(&consult); + assert_eq!( + tool_content(&consult)["fault"]["class"].as_str(), + Some("downstream") + ); + assert!( + tool_content(&consult)["fault"]["detail"] + .as_str() + .is_some_and(|value| value.contains("produced no stream output within 100 ms")) + ); + assert!(elapsed < std::time::Duration::from_secs(3)); + assert_eq!( + tool_content(&consult)["context"]["consult"]["context_mode"].as_str(), + Some("fresh") + ); + assert!( + tool_content(&consult)["context"]["consult"]["planned_session_id"] + .as_str() + .is_some_and(|value| !value.is_empty()) + ); + Ok(()) +} + #[test] fn quota_failures_surface_resume_context_for_same_cwd() -> TestResult { let root = temp_root("consult_quota_failure")?; -- cgit v1.2.3