diff options
| author | main <main@swarm.moe> | 2026-03-25 00:03:55 -0400 |
|---|---|---|
| committer | main <main@swarm.moe> | 2026-03-25 00:03:55 -0400 |
| commit | 481aaa4ee150671d86655d566f52aa1bd7254c16 (patch) | |
| tree | e8fcdabeca5cbdce8565350102eeadaed40c360f /crates/phone-opus/src | |
| parent | 57db4dc94dbf571ac8a393f61549def5afaa0209 (diff) | |
| download | phone_opus-481aaa4ee150671d86655d566f52aa1bd7254c16.zip | |
Fail fast on silent Claude resume stalls
Diffstat (limited to 'crates/phone-opus/src')
| -rw-r--r-- | crates/phone-opus/src/mcp/service.rs | 164 |
1 files changed, 132 insertions, 32 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index 39cc825..bb56ad3 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -5,6 +5,7 @@ use std::io::{self, BufRead, Read, Write}; use std::os::unix::fs::symlink; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; +use std::sync::mpsc; use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -659,6 +660,8 @@ enum ConsultInvocationError { #[error("Claude Code returned non-JSON output: {0}")] InvalidJson(String), #[error("{0}")] + Stalled(String), + #[error("{0}")] Downstream(String), } @@ -691,6 +694,12 @@ struct ClaudeStreamCapture { final_envelope: Option<ClaudeJsonEnvelope>, } +enum ClaudeStreamEvent { + Line(String), + Eof, + Error(io::Error), +} + #[derive(Debug)] struct ConsultResponse { cwd: WorkingDirectory, @@ -731,6 +740,8 @@ const SHARED_TMP_ROOTS: [&str; 2] = ["/tmp", "/var/tmp"]; const CONSULT_OUTPUT_ROOT: &str = "/tmp/phone_opus-consults"; const CONSULT_OUTPUT_KEEP_COUNT: usize = 256; const CONSULT_OUTPUT_MAX_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60); +const DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT: Duration = Duration::from_secs(60); +const CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV: &str = "PHONE_OPUS_CLAUDE_INITIAL_OUTPUT_TIMEOUT_MS"; const CONSULT_TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] = time::macros::format_description!("[year][month][day]T[hour][minute][second]Z"); const CLAUDE_MIRROR_FILES: [&str; 4] = [ @@ -984,6 +995,7 @@ fn consult_fault( source.to_string(), ), ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => { FaultRecord::downstream(generation, FaultStage::Claude, operation, detail) } @@ -995,6 +1007,7 @@ fn consult_fault_context(request: &ConsultRequest, error: &ConsultInvocationErro let detail = match error { ConsultInvocationError::Spawn(_) => None, ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => Some(detail.as_str()), }; let reused_session_id = request.reused_session_id(); @@ -1247,6 +1260,7 @@ fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure detail: source.to_string(), }, ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure { class: "downstream".to_owned(), detail, @@ -1425,6 +1439,7 @@ fn unix_ms_now() -> u64 { fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInvocationError> { let sandbox = ClaudeSandbox::prepare().map_err(ConsultInvocationError::Spawn)?; + let transient_unit = consult_transient_unit(); let mut command = Command::new(SYSTEMD_RUN_BINARY); let runtime_dir = caller_runtime_dir(); let _ = command @@ -1438,6 +1453,8 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv .arg("--pipe") .arg("--collect") .arg("--quiet") + .arg("--unit") + .arg(transient_unit.as_str()) .arg("--working-directory") .arg(request.cwd.as_path()); for property in [ @@ -1465,6 +1482,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv let _ = command .arg(claude_binary()) .arg("-p") + .arg("--verbose") .arg("--output-format") .arg("stream-json") .arg("--strict-mcp-config") @@ -1508,7 +1526,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv let _ = stderr.read_to_string(&mut buffer)?; Ok(buffer) }); - let capture = capture_claude_stream(stdout, request)?; + let capture = capture_claude_stream(stdout, request, &mut child, transient_unit.as_str())?; let status = child.wait().map_err(ConsultInvocationError::Spawn)?; let stderr = stderr_reader .join() @@ -1590,45 +1608,127 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv } fn capture_claude_stream( - stdout: impl Read, + stdout: impl Read + Send + 'static, request: &ConsultRequest, + child: &mut std::process::Child, + transient_unit: &str, ) -> Result<ClaudeStreamCapture, ConsultInvocationError> { + let (sender, receiver) = mpsc::channel(); + let reader = thread::spawn(move || { + let reader = io::BufReader::new(stdout); + for line in reader.lines() { + match line { + Ok(line) => { + if sender.send(ClaudeStreamEvent::Line(line)).is_err() { + return; + } + } + Err(error) => { + let _ = sender.send(ClaudeStreamEvent::Error(error)); + return; + } + } + } + let _ = sender.send(ClaudeStreamEvent::Eof); + }); let mut capture = ClaudeStreamCapture::default(); - let reader = io::BufReader::new(stdout); - for line in reader.lines() { - let line = line.map_err(ConsultInvocationError::Spawn)?; - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; + let first_event = receiver + .recv_timeout(claude_initial_output_timeout()) + .map_err(|_| { + cull_consult_unit(child, transient_unit); + ConsultInvocationError::Stalled(format!( + "Claude Code produced no stream output within {} ms; terminated stalled consult unit `{transient_unit}`", + claude_initial_output_timeout().as_millis() + )) + })?; + let first_event_finished = ingest_claude_stream_event(first_event, &mut capture, request)?; + if first_event_finished { + reader.join().map_err(|_| { + ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked")) + })?; + return Ok(capture); + } + while let Ok(event) = receiver.recv() { + if ingest_claude_stream_event(event, &mut capture, request)? { + break; } - capture.stdout.push_str(trimmed); - capture.stdout.push('\n'); - let Ok(value) = serde_json::from_str::<Value>(trimmed) else { - continue; - }; - if capture.observed_session_id.is_none() - && let Some(session_id) = stream_session_id(&value) - { + } + reader.join().map_err(|_| { + ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked")) + })?; + Ok(capture) +} + +fn ingest_claude_stream_event( + event: ClaudeStreamEvent, + capture: &mut ClaudeStreamCapture, + request: &ConsultRequest, +) -> Result<bool, ConsultInvocationError> { + let line = match event { + ClaudeStreamEvent::Line(line) => line, + ClaudeStreamEvent::Eof => return Ok(true), + ClaudeStreamEvent::Error(error) => return Err(ConsultInvocationError::Spawn(error)), + }; + let trimmed = line.trim(); + if trimmed.is_empty() { + return Ok(false); + } + capture.stdout.push_str(trimmed); + capture.stdout.push('\n'); + let Ok(value) = serde_json::from_str::<Value>(trimmed) else { + return Ok(false); + }; + if capture.observed_session_id.is_none() + && let Some(session_id) = stream_session_id(&value) + { + request + .remember_context(Some(session_id.as_str())) + .map_err(ConsultInvocationError::Spawn)?; + capture.observed_session_id = Some(session_id); + } + if value.get("type").and_then(Value::as_str) == Some("result") { + let envelope = serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| { + ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}")) + })?; + if let Some(session_id) = envelope.session_id.as_deref() { request - .remember_context(Some(session_id.as_str())) + .remember_context(Some(session_id)) .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id); - } - if value.get("type").and_then(Value::as_str) == Some("result") { - let envelope = - serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| { - ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}")) - })?; - if let Some(session_id) = envelope.session_id.as_deref() { - request - .remember_context(Some(session_id)) - .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id.to_owned()); - } - capture.final_envelope = Some(envelope); + capture.observed_session_id = Some(session_id.to_owned()); } + capture.final_envelope = Some(envelope); } - Ok(capture) + Ok(false) +} + +fn claude_initial_output_timeout() -> Duration { + std::env::var(CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV) + .ok() + .and_then(|value| value.parse::<u64>().ok()) + .map(Duration::from_millis) + .unwrap_or(DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT) +} + +fn consult_transient_unit() -> String { + format!("phone-opus-consult-{}", Uuid::new_v4().simple()) +} + +fn cull_consult_unit(child: &mut std::process::Child, transient_unit: &str) { + let runtime_dir = caller_runtime_dir(); + let _ = Command::new("systemctl") + .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str()) + .env( + "DBUS_SESSION_BUS_ADDRESS", + caller_dbus_session_bus_address(runtime_dir.as_path()), + ) + .arg("--user") + .arg("stop") + .arg(transient_unit) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status(); + let _ = child.kill(); + let _ = child.wait(); } fn stream_session_id(value: &Value) -> Option<String> { |