diff options
Diffstat (limited to 'crates/phone-opus/src/mcp')
| -rw-r--r-- | crates/phone-opus/src/mcp/service.rs | 172 |
1 files changed, 34 insertions, 138 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index e152fa7..b92f92e 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -5,7 +5,6 @@ use std::io::{self, BufRead, Read, Write}; use std::os::unix::fs::symlink; use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; -use std::sync::mpsc; use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -659,8 +658,6 @@ enum ConsultInvocationError { #[error("Claude Code returned non-JSON output: {0}")] InvalidJson(String), #[error("{0}")] - Stalled(String), - #[error("{0}")] Downstream(String), } @@ -693,12 +690,6 @@ struct ClaudeStreamCapture { final_envelope: Option<ClaudeJsonEnvelope>, } -enum ClaudeStreamEvent { - Line(String), - Eof, - Error(io::Error), -} - #[derive(Debug)] struct ConsultResponse { cwd: WorkingDirectory, @@ -755,8 +746,6 @@ const SHARED_TMP_ROOTS: [&str; 2] = ["/tmp", "/var/tmp"]; const CONSULT_OUTPUT_ROOT: &str = "/tmp/phone_opus-consults"; const CONSULT_OUTPUT_KEEP_COUNT: usize = 256; const CONSULT_OUTPUT_MAX_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60); -const DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT: Duration = Duration::from_secs(60); -const CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV: &str = "PHONE_OPUS_CLAUDE_INITIAL_OUTPUT_TIMEOUT_MS"; const CONSULT_TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] = time::macros::format_description!("[year][month][day]T[hour][minute][second]Z"); const CLAUDE_MIRROR_FILES: [&str; 4] = [ @@ -1006,7 +995,6 @@ fn consult_fault( source.to_string(), ), ConsultInvocationError::InvalidJson(detail) - | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => { FaultRecord::downstream(generation, FaultStage::Claude, operation, detail) } @@ -1018,7 +1006,6 @@ fn consult_fault_context(request: &ConsultRequest, error: &ConsultInvocationErro let detail = match error { ConsultInvocationError::Spawn(_) => None, ConsultInvocationError::InvalidJson(detail) - | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => Some(detail.as_str()), }; let quota_reset_hint = detail.and_then(quota_reset_hint); @@ -1044,13 +1031,8 @@ fn consult_retry_hint(quota_limited: bool, error: &ConsultInvocationError) -> Op if quota_limited { return Some("wait for the quota window to reset, then retry the consult".to_owned()); } - match error { - ConsultInvocationError::Stalled(_) => Some( - "Claude stalled before producing output; retry the consult as a fresh one-shot call" - .to_owned(), - ), - _ => None, - } + let _ = error; + None } pub(crate) fn consult_job_tool_output( @@ -1245,7 +1227,6 @@ fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure detail: source.to_string(), }, ConsultInvocationError::InvalidJson(detail) - | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure { class: "downstream".to_owned(), detail, @@ -1424,7 +1405,6 @@ fn unix_ms_now() -> u64 { fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInvocationError> { let sandbox = ClaudeSandbox::prepare().map_err(ConsultInvocationError::Spawn)?; - let transient_unit = consult_transient_unit(); let mut command = Command::new(SYSTEMD_RUN_BINARY); let runtime_dir = caller_runtime_dir(); let _ = command @@ -1438,8 +1418,6 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv .arg("--pipe") .arg("--collect") .arg("--quiet") - .arg("--unit") - .arg(transient_unit.as_str()) .arg("--working-directory") .arg(request.cwd.as_path()); for property in [ @@ -1508,7 +1486,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv let _ = stderr.read_to_string(&mut buffer)?; Ok(buffer) }); - let capture = capture_claude_stream(stdout, request, &mut child, transient_unit.as_str())?; + let capture = capture_claude_stream(stdout, request)?; let status = child.wait().map_err(ConsultInvocationError::Spawn)?; let stderr = stderr_reader .join() @@ -1584,127 +1562,45 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv } fn capture_claude_stream( - stdout: impl Read + Send + 'static, + stdout: impl Read, request: &ConsultRequest, - child: &mut std::process::Child, - transient_unit: &str, ) -> Result<ClaudeStreamCapture, ConsultInvocationError> { - let (sender, receiver) = mpsc::channel(); - let reader = thread::spawn(move || { - let reader = io::BufReader::new(stdout); - for line in reader.lines() { - match line { - Ok(line) => { - if sender.send(ClaudeStreamEvent::Line(line)).is_err() { - return; - } - } - Err(error) => { - let _ = sender.send(ClaudeStreamEvent::Error(error)); - return; - } - } - } - let _ = sender.send(ClaudeStreamEvent::Eof); - }); let mut capture = ClaudeStreamCapture::default(); - let first_event = receiver - .recv_timeout(claude_initial_output_timeout()) - .map_err(|_| { - cull_consult_unit(child, transient_unit); - ConsultInvocationError::Stalled(format!( - "Claude Code produced no stream output within {} ms; terminated stalled consult unit `{transient_unit}`", - claude_initial_output_timeout().as_millis() - )) - })?; - let first_event_finished = ingest_claude_stream_event(first_event, &mut capture, request)?; - if first_event_finished { - reader.join().map_err(|_| { - ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked")) - })?; - return Ok(capture); - } - while let Ok(event) = receiver.recv() { - if ingest_claude_stream_event(event, &mut capture, request)? { - break; + let reader = io::BufReader::new(stdout); + for line in reader.lines() { + let line = line.map_err(ConsultInvocationError::Spawn)?; + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; } - } - reader.join().map_err(|_| { - ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked")) - })?; - Ok(capture) -} - -fn ingest_claude_stream_event( - event: ClaudeStreamEvent, - capture: &mut ClaudeStreamCapture, - request: &ConsultRequest, -) -> Result<bool, ConsultInvocationError> { - let line = match event { - ClaudeStreamEvent::Line(line) => line, - ClaudeStreamEvent::Eof => return Ok(true), - ClaudeStreamEvent::Error(error) => return Err(ConsultInvocationError::Spawn(error)), - }; - let trimmed = line.trim(); - if trimmed.is_empty() { - return Ok(false); - } - capture.stdout.push_str(trimmed); - capture.stdout.push('\n'); - let Ok(value) = serde_json::from_str::<Value>(trimmed) else { - return Ok(false); - }; - if capture.observed_session_id.is_none() - && let Some(session_id) = stream_session_id(&value) - { - request - .remember_context(Some(session_id.as_str())) - .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id); - } - if value.get("type").and_then(Value::as_str) == Some("result") { - let envelope = serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| { - ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}")) - })?; - if let Some(session_id) = envelope.session_id.as_deref() { + capture.stdout.push_str(trimmed); + capture.stdout.push('\n'); + let Ok(value) = serde_json::from_str::<Value>(trimmed) else { + continue; + }; + if capture.observed_session_id.is_none() + && let Some(session_id) = stream_session_id(&value) + { request - .remember_context(Some(session_id)) + .remember_context(Some(session_id.as_str())) .map_err(ConsultInvocationError::Spawn)?; - capture.observed_session_id = Some(session_id.to_owned()); + capture.observed_session_id = Some(session_id); + } + if value.get("type").and_then(Value::as_str) == Some("result") { + let envelope = + serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| { + ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}")) + })?; + if let Some(session_id) = envelope.session_id.as_deref() { + request + .remember_context(Some(session_id)) + .map_err(ConsultInvocationError::Spawn)?; + capture.observed_session_id = Some(session_id.to_owned()); + } + capture.final_envelope = Some(envelope); } - capture.final_envelope = Some(envelope); } - Ok(false) -} - -fn claude_initial_output_timeout() -> Duration { - std::env::var(CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV) - .ok() - .and_then(|value| value.parse::<u64>().ok()) - .map(Duration::from_millis) - .unwrap_or(DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT) -} - -fn consult_transient_unit() -> String { - format!("phone-opus-consult-{}", Uuid::new_v4().simple()) -} - -fn cull_consult_unit(child: &mut std::process::Child, transient_unit: &str) { - let runtime_dir = caller_runtime_dir(); - let _ = Command::new("systemctl") - .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str()) - .env( - "DBUS_SESSION_BUS_ADDRESS", - caller_dbus_session_bus_address(runtime_dir.as_path()), - ) - .arg("--user") - .arg("stop") - .arg(transient_unit) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .status(); - let _ = child.kill(); - let _ = child.wait(); + Ok(capture) } fn stream_session_id(value: &Value) -> Option<String> { |