diff options
| author | main <main@swarm.moe> | 2026-03-25 00:03:55 -0400 |
|---|---|---|
| committer | main <main@swarm.moe> | 2026-03-25 00:03:55 -0400 |
| commit | 481aaa4ee150671d86655d566f52aa1bd7254c16 (patch) | |
| tree | e8fcdabeca5cbdce8565350102eeadaed40c360f /crates/phone-opus | |
| parent | 57db4dc94dbf571ac8a393f61549def5afaa0209 (diff) | |
| download | phone_opus-481aaa4ee150671d86655d566f52aa1bd7254c16.zip | |
Fail fast on silent Claude resume stalls
Diffstat (limited to 'crates/phone-opus')
| -rw-r--r-- | crates/phone-opus/src/mcp/service.rs | 164 | ||||
| -rw-r--r-- | crates/phone-opus/tests/mcp_hardening.rs | 51 |
2 files changed, 183 insertions, 32 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index 39cc825..bb56ad3 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -5,6 +5,7 @@ use std::io::{self, BufRead, Read, Write}; use std::os::unix::fs::symlink; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; +use std::sync::mpsc; use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -659,6 +660,8 @@ enum ConsultInvocationError { #[error("Claude Code returned non-JSON output: {0}")] InvalidJson(String), #[error("{0}")] + Stalled(String), + #[error("{0}")] Downstream(String), } @@ -691,6 +694,12 @@ struct ClaudeStreamCapture { final_envelope: Option<ClaudeJsonEnvelope>, } +enum ClaudeStreamEvent { + Line(String), + Eof, + Error(io::Error), +} + #[derive(Debug)] struct ConsultResponse { cwd: WorkingDirectory, @@ -731,6 +740,8 @@ const SHARED_TMP_ROOTS: [&str; 2] = ["/tmp", "/var/tmp"]; const CONSULT_OUTPUT_ROOT: &str = "/tmp/phone_opus-consults"; const CONSULT_OUTPUT_KEEP_COUNT: usize = 256; const CONSULT_OUTPUT_MAX_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60); +const DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT: Duration = Duration::from_secs(60); +const CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV: &str = "PHONE_OPUS_CLAUDE_INITIAL_OUTPUT_TIMEOUT_MS"; const CONSULT_TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] = time::macros::format_description!("[year][month][day]T[hour][minute][second]Z"); const CLAUDE_MIRROR_FILES: [&str; 4] = [ @@ -984,6 +995,7 @@ fn consult_fault( source.to_string(), ), ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => { FaultRecord::downstream(generation, FaultStage::Claude, operation, detail) } @@ -995,6 +1007,7 @@ fn consult_fault_context(request: &ConsultRequest, error: &ConsultInvocationErro let detail = match error { ConsultInvocationError::Spawn(_) => None, ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => Some(detail.as_str()), }; let reused_session_id = request.reused_session_id(); @@ -1247,6 +1260,7 @@ fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure detail: source.to_string(), }, ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure { class: "downstream".to_owned(), detail, @@ -1425,6 +1439,7 @@ fn unix_ms_now() -> u64 { fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInvocationError> { let sandbox = ClaudeSandbox::prepare().map_err(ConsultInvocationError::Spawn)?; + let transient_unit = consult_transient_unit(); let mut command = Command::new(SYSTEMD_RUN_BINARY); let runtime_dir = caller_runtime_dir(); let _ = command @@ -1438,6 +1453,8 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv .arg("--pipe") .arg("--collect") .arg("--quiet") + .arg("--unit") + .arg(transient_unit.as_str()) .arg("--working-directory") .arg(request.cwd.as_path()); for property in [ @@ -1465,6 +1482,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv let _ = command .arg(claude_binary()) .arg("-p") + .arg("--verbose") .arg("--output-format") .arg("stream-json") .arg("--strict-mcp-config") @@ -1508,7 +1526,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv let _ = stderr.read_to_string(&mut buffer)?; Ok(buffer) }); - let capture = capture_claude_stream(stdout, request)?; + let capture = capture_claude_stream(stdout, request, &mut child, transient_unit.as_str())?; let status = child.wait().map_err(ConsultInvocationError::Spawn)?; let stderr = stderr_reader .join() @@ -1590,45 +1608,127 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv } fn capture_claude_stream( - stdout: impl Read, + stdout: impl Read + Send + 'static, request: &ConsultRequest, + child: &mut std::process::Child, + transient_unit: &str, ) -> Result<ClaudeStreamCapture, ConsultInvocationError> { + let (sender, receiver) = mpsc::channel(); + let reader = thread::spawn(move || { + let reader = io::BufReader::new(stdout); + for line in reader.lines() { + match line { + Ok(line) => { + if sender.send(ClaudeStreamEvent::Line(line)).is_err() { + return; + } + } + Err(error) => { + let _ = sender.send(ClaudeStreamEvent::Error(error)); + return; + } + } + } + let _ = sender.send(ClaudeStreamEvent::Eof); + }); let mut capture = ClaudeStreamCapture::default(); - let reader = io::BufReader::new(stdout); - for line in reader.lines() { - let line = line.map_err(ConsultInvocationError::Spawn)?; - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; + let first_event = receiver + .recv_timeout(claude_initial_output_timeout()) + .map_err(|_| { + cull_consult_unit(child, transient_unit); + ConsultInvocationError::Stalled(format!( + "Claude Code produced no stream output within {} ms; terminated stalled consult unit `{transient_unit}`", + claude_initial_output_timeout().as_millis() + )) + })?; + let first_event_finished = ingest_claude_stream_event(first_event, &mut capture, request)?; + if first_event_finished { + reader.join().map_err(|_| { + ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked")) + })?; + return Ok(capture); + } + while let Ok(event) = receiver.recv() { + if ingest_claude_stream_event(event, &mut capture, request)? { + break; } - capture.stdout.push_str(trimmed); - capture.stdout.push('\n'); - let Ok(value) = serde_json::from_str::<Value>(trimmed) else { - continue; - }; - if capture.observed_session_id.is_none() - && let Some(session_id) = stream_session_id(&value) - { + } + reader.join().map_err(|_| { + ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked")) + })?; + Ok(capture) +} + +fn ingest_claude_stream_event( + event: ClaudeStreamEvent, + capture: &mut ClaudeStreamCapture, + request: &ConsultRequest, +) -> Result<bool, ConsultInvocationError> { + let line = match event { + ClaudeStreamEvent::Line(line) => line, + ClaudeStreamEvent::Eof => return Ok(true), + ClaudeStreamEvent::Error(error) => return Err(ConsultInvocationError::Spawn(error)), + }; + let trimmed = line.trim(); + if trimmed.is_empty() { + return Ok(false); + } + capture.stdout.push_str(trimmed); + capture.stdout.push('\n'); + let Ok(value) = serde_json::from_str::<Value>(trimmed) else { + return Ok(false); + }; + if capture.observed_session_id.is_none() + && let Some(session_id) = stream_session_id(&value) + { + request + .remember_context(Some(session_id.as_str())) + .map_err(ConsultInvocationError::Spawn)?; + capture.observed_session_id = Some(session_id); + } + if value.get("type").and_then(Value::as_str) == Some("result") { + let envelope = serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| { + ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}")) + })?; + if let Some(session_id) = envelope.session_id.as_deref() { request - .remember_context(Some(session_id.as_str())) + .remember_context(Some(session_id)) .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id); - } - if value.get("type").and_then(Value::as_str) == Some("result") { - let envelope = - serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| { - ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}")) - })?; - if let Some(session_id) = envelope.session_id.as_deref() { - request - .remember_context(Some(session_id)) - .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id.to_owned()); - } - capture.final_envelope = Some(envelope); + capture.observed_session_id = Some(session_id.to_owned()); } + capture.final_envelope = Some(envelope); } - Ok(capture) + Ok(false) +} + +fn claude_initial_output_timeout() -> Duration { + std::env::var(CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV) + .ok() + .and_then(|value| value.parse::<u64>().ok()) + .map(Duration::from_millis) + .unwrap_or(DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT) +} + +fn consult_transient_unit() -> String { + format!("phone-opus-consult-{}", Uuid::new_v4().simple()) +} + +fn cull_consult_unit(child: &mut std::process::Child, transient_unit: &str) { + let runtime_dir = caller_runtime_dir(); + let _ = Command::new("systemctl") + .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str()) + .env( + "DBUS_SESSION_BUS_ADDRESS", + caller_dbus_session_bus_address(runtime_dir.as_path()), + ) + .arg("--user") + .arg("stop") + .arg(transient_unit) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status(); + let _ = child.kill(); + let _ = child.wait(); } fn stream_session_id(value: &Value) -> Option<String> { diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs index 0d53c33..0b32442 100644 --- a/crates/phone-opus/tests/mcp_hardening.rs +++ b/crates/phone-opus/tests/mcp_hardening.rs @@ -631,6 +631,7 @@ fn consult_reuses_context_per_cwd_by_default_and_fresh_context_opts_out() -> Tes let args = must(fs::read_to_string(&args_file), "read fake args file")?; let lines = args.lines().collect::<Vec<_>>(); assert!(lines.contains(&"-p")); + assert!(lines.contains(&"--verbose")); assert!(lines.contains(&"--output-format")); assert!(lines.contains(&"stream-json")); assert!(lines.contains(&"--strict-mcp-config")); @@ -838,6 +839,56 @@ fn consult_surfaces_downstream_cli_failures() -> TestResult { } #[test] +fn silent_claude_processes_fail_fast_instead_of_wedging() -> TestResult { + let root = temp_root("consult_stall")?; + let state_home = root.join("state-home"); + let fake_claude = root.join("claude"); + let caller_home = root.join("caller-home"); + must(fs::create_dir_all(&state_home), "create state home")?; + must(fs::create_dir_all(&caller_home), "create caller home")?; + seed_caller_claude_home(&caller_home)?; + write_fake_claude_script(&fake_claude)?; + + let claude_bin = fake_claude.display().to_string(); + let caller_home_path = caller_home.display().to_string(); + let env = [ + ("HOME", caller_home_path.as_str()), + ("PHONE_OPUS_CLAUDE_BIN", claude_bin.as_str()), + ("PHONE_OPUS_CLAUDE_INITIAL_OUTPUT_TIMEOUT_MS", "100"), + ("PHONE_OPUS_TEST_SLEEP_MS", "5000"), + ]; + let mut harness = McpHarness::spawn(&state_home, &env)?; + let _ = harness.initialize()?; + harness.notify_initialized()?; + + let started = std::time::Instant::now(); + let consult = harness.call_tool(3, "consult", json!({ "prompt": "hang forever" }))?; + let elapsed = started.elapsed(); + + assert_tool_error(&consult); + assert_eq!( + tool_content(&consult)["fault"]["class"].as_str(), + Some("downstream") + ); + assert!( + tool_content(&consult)["fault"]["detail"] + .as_str() + .is_some_and(|value| value.contains("produced no stream output within 100 ms")) + ); + assert!(elapsed < std::time::Duration::from_secs(3)); + assert_eq!( + tool_content(&consult)["context"]["consult"]["context_mode"].as_str(), + Some("fresh") + ); + assert!( + tool_content(&consult)["context"]["consult"]["planned_session_id"] + .as_str() + .is_some_and(|value| !value.is_empty()) + ); + Ok(()) +} + +#[test] fn quota_failures_surface_resume_context_for_same_cwd() -> TestResult { let root = temp_root("consult_quota_failure")?; let state_home = root.join("state-home"); |