diff options
Diffstat (limited to 'crates/phone-opus/src')
| -rw-r--r-- | crates/phone-opus/src/mcp/service.rs | 317 |
1 files changed, 308 insertions, 9 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs index 5a6ca91..38e472e 100644 --- a/crates/phone-opus/src/mcp/service.rs +++ b/crates/phone-opus/src/mcp/service.rs @@ -1,10 +1,11 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fs; -use std::io::{self, BufRead, Write}; +use std::io::{self, BufRead, Read, Write}; #[cfg(unix)] use std::os::unix::fs::symlink; use std::path::{Path, PathBuf}; -use std::process::{Command, Stdio}; +use std::process::{Child, Command, ExitStatus, Stdio}; +use std::sync::mpsc; use std::thread; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -666,6 +667,8 @@ enum ConsultInvocationError { #[error("Claude Code returned non-JSON output: {0}")] InvalidJson(String), #[error("{0}")] + Stalled(String), + #[error("{0}")] Downstream(String), } @@ -735,7 +738,151 @@ impl ConsultResponse { } } +#[derive(Debug)] +struct CompletedClaudeInvocation { + status: ExitStatus, + stdout: String, + stderr: String, +} + +#[derive(Debug)] +struct ClaudeProgressMonitor { + transcript: ClaudeTranscriptMonitor, + last_progress_at: Instant, + stall_timeout: Duration, +} + +impl ClaudeProgressMonitor { + fn new(transcript: ClaudeTranscriptMonitor) -> Self { + Self { + transcript, + last_progress_at: Instant::now(), + stall_timeout: claude_stall_timeout(), + } + } + + fn record_pipe_activity(&mut self) { + self.last_progress_at = Instant::now(); + } + + fn record_transcript_activity(&mut self) -> io::Result<()> { + if self.transcript.observe_progress()? { + self.last_progress_at = Instant::now(); + } + Ok(()) + } + + fn stalled(&self) -> bool { + self.last_progress_at.elapsed() >= self.stall_timeout + } + + fn stall_message(&self) -> String { + let timeout_ms = u64::try_from(self.stall_timeout.as_millis()).unwrap_or(u64::MAX); + match self.transcript.current_path() { + Some(path) => format!( + "Claude made no observable progress for {} while running {}; transcript={}", + render_duration_ms(timeout_ms), + self.transcript.session_id(), + path.display() + ), + None => format!( + "Claude made no observable progress for {} while running {}", + render_duration_ms(timeout_ms), + self.transcript.session_id() + ), + } + } +} + +#[derive(Debug)] +struct ClaudeTranscriptMonitor { + projects_root: PathBuf, + session_id: String, + resolved_path: Option<PathBuf>, + observed_signature: Option<TranscriptProgressSignature>, +} + +impl ClaudeTranscriptMonitor { + fn new(projects_root: PathBuf, session_id: String) -> Self { + Self { + projects_root, + session_id, + resolved_path: None, + observed_signature: None, + } + } + + fn session_id(&self) -> &str { + self.session_id.as_str() + } + + fn current_path(&self) -> Option<&Path> { + self.resolved_path.as_deref() + } + + fn observe_progress(&mut self) -> io::Result<bool> { + if self.resolved_path.is_none() { + self.resolved_path = self.find_transcript_path()?; + } + let Some(path) = self.resolved_path.as_ref() else { + return Ok(false); + }; + let metadata = match fs::metadata(path) { + Ok(metadata) => metadata, + Err(error) if error.kind() == io::ErrorKind::NotFound => { + self.resolved_path = None; + self.observed_signature = None; + return Ok(false); + } + Err(error) => return Err(error), + }; + let signature = TranscriptProgressSignature::from_metadata(&metadata)?; + let progressed = self.observed_signature.as_ref() != Some(&signature); + self.observed_signature = Some(signature); + Ok(progressed) + } + + fn find_transcript_path(&self) -> io::Result<Option<PathBuf>> { + let entries = match fs::read_dir(&self.projects_root) { + Ok(entries) => entries, + Err(error) if error.kind() == io::ErrorKind::NotFound => return Ok(None), + Err(error) => return Err(error), + }; + for entry in entries { + let entry = entry?; + if !entry.file_type()?.is_dir() { + continue; + } + let candidate = entry.path().join(format!("{}.jsonl", self.session_id)); + if candidate.exists() { + return Ok(Some(candidate)); + } + } + Ok(None) + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +struct TranscriptProgressSignature { + modified_unix_nanos: u128, + length_bytes: u64, +} + +impl TranscriptProgressSignature { + fn from_metadata(metadata: &fs::Metadata) -> io::Result<Self> { + Ok(Self { + modified_unix_nanos: metadata + .modified()? + .duration_since(UNIX_EPOCH) + .map_err(|error| io::Error::other(format!("invalid transcript mtime: {error}")))? + .as_nanos(), + length_bytes: metadata.len(), + }) + } +} + const SYSTEMD_RUN_BINARY: &str = "systemd-run"; +const SYSTEMCTL_BINARY: &str = "systemctl"; const DEFAULT_PATH: &str = "/usr/local/bin:/usr/bin:/bin"; const PHONE_OPUS_STATE_ROOT_NAME: &str = "phone_opus"; const CONSULT_CONTEXT_INDEX_FILE_NAME: &str = "consult_contexts.json"; @@ -747,6 +894,8 @@ const SHARED_TMP_ROOTS: [&str; 2] = ["/tmp", "/var/tmp"]; const CONSULT_OUTPUT_ROOT: &str = "/tmp/phone_opus-consults"; const CONSULT_OUTPUT_KEEP_COUNT: usize = 256; const CONSULT_OUTPUT_MAX_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60); +const CLAUDE_PROGRESS_POLL_INTERVAL: Duration = Duration::from_millis(250); +const DEFAULT_CLAUDE_STALL_TIMEOUT: Duration = Duration::from_secs(30); const CONSULT_TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] = time::macros::format_description!("[year][month][day]T[hour][minute][second]Z"); const CLAUDE_MIRROR_FILES: [&str; 4] = [ @@ -996,6 +1145,7 @@ fn consult_fault( source.to_string(), ), ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => { FaultRecord::downstream(generation, FaultStage::Claude, operation, detail) } @@ -1007,6 +1157,7 @@ fn consult_fault_context(request: &ConsultRequest, error: &ConsultInvocationErro let detail = match error { ConsultInvocationError::Spawn(_) => None, ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => Some(detail.as_str()), }; let quota_reset_hint = detail.and_then(quota_reset_hint); @@ -1032,8 +1183,14 @@ fn consult_retry_hint(quota_limited: bool, error: &ConsultInvocationError) -> Op if quota_limited { return Some("wait for the quota window to reset, then retry the consult".to_owned()); } - let _ = error; - None + match error { + ConsultInvocationError::Stalled(_) => { + Some("Claude stopped making observable progress; retry the consult".to_owned()) + } + ConsultInvocationError::Spawn(_) + | ConsultInvocationError::InvalidJson(_) + | ConsultInvocationError::Downstream(_) => None, + } } pub(crate) fn consult_job_tool_output( @@ -1228,6 +1385,7 @@ fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure detail: source.to_string(), }, ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Stalled(detail) | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure { class: "downstream".to_owned(), detail, @@ -1406,6 +1564,11 @@ fn unix_ms_now() -> u64 { fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInvocationError> { let sandbox = ClaudeSandbox::prepare().map_err(ConsultInvocationError::Spawn)?; + let unit_name = claude_unit_name(request); + let transcript = ClaudeTranscriptMonitor::new( + sandbox.claude_config_dir().join("projects"), + request.planned_session_id(), + ); let mut command = Command::new(SYSTEMD_RUN_BINARY); let runtime_dir = caller_runtime_dir(); let _ = command @@ -1414,6 +1577,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv "DBUS_SESSION_BUS_ADDRESS", caller_dbus_session_bus_address(runtime_dir.as_path()), ) + .arg(format!("--unit={unit_name}")) .arg("--user") .arg("--wait") .arg("--pipe") @@ -1472,12 +1636,10 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv request .remember_planned_context() .map_err(ConsultInvocationError::Spawn)?; - let output = child - .wait_with_output() - .map_err(ConsultInvocationError::Spawn)?; + let output = wait_for_claude_completion(child, unit_name.as_str(), transcript)?; let status = output.status; - let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned(); - let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned(); + let stdout = output.stdout.trim().to_owned(); + let stderr = output.stderr.trim().to_owned(); let envelope = parse_claude_json_envelope(&stdout, &stderr, status.code())?; if envelope.envelope_type != "result" { return Err(ConsultInvocationError::InvalidJson(format!( @@ -1525,6 +1687,56 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv }) } +fn wait_for_claude_completion( + mut child: Child, + unit_name: &str, + transcript: ClaudeTranscriptMonitor, +) -> Result<CompletedClaudeInvocation, ConsultInvocationError> { + let stdout = child + .stdout + .take() + .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stdout")))?; + let stderr = child + .stderr + .take() + .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stderr")))?; + let (activity_tx, activity_rx) = mpsc::channel::<()>(); + let stdout_reader = spawn_pipe_reader(stdout, activity_tx.clone()); + let stderr_reader = spawn_pipe_reader(stderr, activity_tx); + let mut progress = ClaudeProgressMonitor::new(transcript); + + loop { + while activity_rx.try_recv().is_ok() { + progress.record_pipe_activity(); + } + progress + .record_transcript_activity() + .map_err(ConsultInvocationError::Spawn)?; + if let Some(status) = child.try_wait().map_err(ConsultInvocationError::Spawn)? { + return Ok(CompletedClaudeInvocation { + status, + stdout: join_output_reader(stdout_reader, "stdout")?, + stderr: join_output_reader(stderr_reader, "stderr")?, + }); + } + if progress.stalled() { + terminate_claude_invocation(unit_name, &mut child); + let stdout = join_output_reader(stdout_reader, "stdout").unwrap_or_default(); + let stderr = join_output_reader(stderr_reader, "stderr").unwrap_or_default(); + let mut detail = progress.stall_message(); + if !stderr.trim().is_empty() { + detail.push_str("; stderr="); + detail.push_str(stderr.trim()); + } else if !stdout.trim().is_empty() { + detail.push_str("; stdout="); + detail.push_str(stdout.trim()); + } + return Err(ConsultInvocationError::Stalled(detail)); + } + thread::sleep(CLAUDE_PROGRESS_POLL_INTERVAL); + } +} + fn parse_claude_json_envelope( stdout: &str, stderr: &str, @@ -1551,6 +1763,93 @@ fn downstream_message(status_code: Option<i32>, stdout: &str, stderr: &str) -> S format!("Claude Code exited with status {status_code:?}") } +fn spawn_pipe_reader( + mut pipe: impl Read + Send + 'static, + activity_tx: mpsc::Sender<()>, +) -> thread::JoinHandle<io::Result<String>> { + thread::spawn(move || { + let mut bytes = Vec::new(); + let mut buffer = [0_u8; 8192]; + loop { + let read = pipe.read(&mut buffer)?; + if read == 0 { + break; + } + bytes.extend_from_slice(&buffer[..read]); + let _ = activity_tx.send(()); + } + Ok(String::from_utf8_lossy(&bytes).into_owned()) + }) +} + +fn join_output_reader( + reader: thread::JoinHandle<io::Result<String>>, + stream_name: &str, +) -> Result<String, ConsultInvocationError> { + reader + .join() + .map_err(|_| { + ConsultInvocationError::Spawn(io::Error::other(format!( + "Claude {stream_name} reader panicked" + ))) + })? + .map_err(ConsultInvocationError::Spawn) +} + +fn terminate_claude_invocation(unit_name: &str, child: &mut Child) { + let _ = stop_transient_unit(unit_name); + let _ = child.kill(); + let _ = child.wait(); +} + +fn stop_transient_unit(unit_name: &str) -> io::Result<()> { + let runtime_dir = caller_runtime_dir(); + let status = Command::new(SYSTEMCTL_BINARY) + .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str()) + .env( + "DBUS_SESSION_BUS_ADDRESS", + caller_dbus_session_bus_address(runtime_dir.as_path()), + ) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .arg("--user") + .arg("stop") + .arg(unit_name) + .status()?; + let _ = Command::new(SYSTEMCTL_BINARY) + .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str()) + .env( + "DBUS_SESSION_BUS_ADDRESS", + caller_dbus_session_bus_address(runtime_dir.as_path()), + ) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .arg("--user") + .arg("reset-failed") + .arg(unit_name) + .status(); + if status.success() { + Ok(()) + } else { + Err(io::Error::other(format!( + "failed to stop transient unit `{unit_name}`" + ))) + } +} + +fn claude_unit_name(request: &ConsultRequest) -> String { + format!("phone-opus-claude-{}", request.planned_session_id()) +} + +fn claude_stall_timeout() -> Duration { + std::env::var("PHONE_OPUS_CLAUDE_STALL_TIMEOUT_MS") + .ok() + .and_then(|value| value.parse::<u64>().ok()) + .filter(|value| *value > 0) + .map(Duration::from_millis) + .unwrap_or(DEFAULT_CLAUDE_STALL_TIMEOUT) +} + fn claude_binary() -> PathBuf { std::env::var_os(CLAUDE_BIN_ENV) .map(PathBuf::from) |