swarm repositories / source
aboutsummaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
authormain <main@swarm.moe>2026-03-25 12:49:33 -0400
committermain <main@swarm.moe>2026-03-25 12:49:33 -0400
commit7f28cb7356092934a25ab5fd277dce38b91eb8c0 (patch)
tree054bcf33708f253e5aaa8668105dc066cc7aa73c /crates
parentda6410fd33148e7dd0fec9190a1624e34f745b96 (diff)
downloadphone_opus-7f28cb7356092934a25ab5fd277dce38b91eb8c0.zip
Fail fast on inert Claude consults
Diffstat (limited to 'crates')
-rw-r--r--crates/phone-opus/src/mcp/service.rs317
-rw-r--r--crates/phone-opus/tests/mcp_hardening.rs138
2 files changed, 446 insertions, 9 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs
index 5a6ca91..38e472e 100644
--- a/crates/phone-opus/src/mcp/service.rs
+++ b/crates/phone-opus/src/mcp/service.rs
@@ -1,10 +1,11 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fs;
-use std::io::{self, BufRead, Write};
+use std::io::{self, BufRead, Read, Write};
#[cfg(unix)]
use std::os::unix::fs::symlink;
use std::path::{Path, PathBuf};
-use std::process::{Command, Stdio};
+use std::process::{Child, Command, ExitStatus, Stdio};
+use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
@@ -666,6 +667,8 @@ enum ConsultInvocationError {
#[error("Claude Code returned non-JSON output: {0}")]
InvalidJson(String),
#[error("{0}")]
+ Stalled(String),
+ #[error("{0}")]
Downstream(String),
}
@@ -735,7 +738,151 @@ impl ConsultResponse {
}
}
+#[derive(Debug)]
+struct CompletedClaudeInvocation {
+ status: ExitStatus,
+ stdout: String,
+ stderr: String,
+}
+
+#[derive(Debug)]
+struct ClaudeProgressMonitor {
+ transcript: ClaudeTranscriptMonitor,
+ last_progress_at: Instant,
+ stall_timeout: Duration,
+}
+
+impl ClaudeProgressMonitor {
+ fn new(transcript: ClaudeTranscriptMonitor) -> Self {
+ Self {
+ transcript,
+ last_progress_at: Instant::now(),
+ stall_timeout: claude_stall_timeout(),
+ }
+ }
+
+ fn record_pipe_activity(&mut self) {
+ self.last_progress_at = Instant::now();
+ }
+
+ fn record_transcript_activity(&mut self) -> io::Result<()> {
+ if self.transcript.observe_progress()? {
+ self.last_progress_at = Instant::now();
+ }
+ Ok(())
+ }
+
+ fn stalled(&self) -> bool {
+ self.last_progress_at.elapsed() >= self.stall_timeout
+ }
+
+ fn stall_message(&self) -> String {
+ let timeout_ms = u64::try_from(self.stall_timeout.as_millis()).unwrap_or(u64::MAX);
+ match self.transcript.current_path() {
+ Some(path) => format!(
+ "Claude made no observable progress for {} while running {}; transcript={}",
+ render_duration_ms(timeout_ms),
+ self.transcript.session_id(),
+ path.display()
+ ),
+ None => format!(
+ "Claude made no observable progress for {} while running {}",
+ render_duration_ms(timeout_ms),
+ self.transcript.session_id()
+ ),
+ }
+ }
+}
+
+#[derive(Debug)]
+struct ClaudeTranscriptMonitor {
+ projects_root: PathBuf,
+ session_id: String,
+ resolved_path: Option<PathBuf>,
+ observed_signature: Option<TranscriptProgressSignature>,
+}
+
+impl ClaudeTranscriptMonitor {
+ fn new(projects_root: PathBuf, session_id: String) -> Self {
+ Self {
+ projects_root,
+ session_id,
+ resolved_path: None,
+ observed_signature: None,
+ }
+ }
+
+ fn session_id(&self) -> &str {
+ self.session_id.as_str()
+ }
+
+ fn current_path(&self) -> Option<&Path> {
+ self.resolved_path.as_deref()
+ }
+
+ fn observe_progress(&mut self) -> io::Result<bool> {
+ if self.resolved_path.is_none() {
+ self.resolved_path = self.find_transcript_path()?;
+ }
+ let Some(path) = self.resolved_path.as_ref() else {
+ return Ok(false);
+ };
+ let metadata = match fs::metadata(path) {
+ Ok(metadata) => metadata,
+ Err(error) if error.kind() == io::ErrorKind::NotFound => {
+ self.resolved_path = None;
+ self.observed_signature = None;
+ return Ok(false);
+ }
+ Err(error) => return Err(error),
+ };
+ let signature = TranscriptProgressSignature::from_metadata(&metadata)?;
+ let progressed = self.observed_signature.as_ref() != Some(&signature);
+ self.observed_signature = Some(signature);
+ Ok(progressed)
+ }
+
+ fn find_transcript_path(&self) -> io::Result<Option<PathBuf>> {
+ let entries = match fs::read_dir(&self.projects_root) {
+ Ok(entries) => entries,
+ Err(error) if error.kind() == io::ErrorKind::NotFound => return Ok(None),
+ Err(error) => return Err(error),
+ };
+ for entry in entries {
+ let entry = entry?;
+ if !entry.file_type()?.is_dir() {
+ continue;
+ }
+ let candidate = entry.path().join(format!("{}.jsonl", self.session_id));
+ if candidate.exists() {
+ return Ok(Some(candidate));
+ }
+ }
+ Ok(None)
+ }
+}
+
+#[derive(Debug, Clone, Eq, PartialEq)]
+struct TranscriptProgressSignature {
+ modified_unix_nanos: u128,
+ length_bytes: u64,
+}
+
+impl TranscriptProgressSignature {
+ fn from_metadata(metadata: &fs::Metadata) -> io::Result<Self> {
+ Ok(Self {
+ modified_unix_nanos: metadata
+ .modified()?
+ .duration_since(UNIX_EPOCH)
+ .map_err(|error| io::Error::other(format!("invalid transcript mtime: {error}")))?
+ .as_nanos(),
+ length_bytes: metadata.len(),
+ })
+ }
+}
+
const SYSTEMD_RUN_BINARY: &str = "systemd-run";
+const SYSTEMCTL_BINARY: &str = "systemctl";
const DEFAULT_PATH: &str = "/usr/local/bin:/usr/bin:/bin";
const PHONE_OPUS_STATE_ROOT_NAME: &str = "phone_opus";
const CONSULT_CONTEXT_INDEX_FILE_NAME: &str = "consult_contexts.json";
@@ -747,6 +894,8 @@ const SHARED_TMP_ROOTS: [&str; 2] = ["/tmp", "/var/tmp"];
const CONSULT_OUTPUT_ROOT: &str = "/tmp/phone_opus-consults";
const CONSULT_OUTPUT_KEEP_COUNT: usize = 256;
const CONSULT_OUTPUT_MAX_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60);
+const CLAUDE_PROGRESS_POLL_INTERVAL: Duration = Duration::from_millis(250);
+const DEFAULT_CLAUDE_STALL_TIMEOUT: Duration = Duration::from_secs(30);
const CONSULT_TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] =
time::macros::format_description!("[year][month][day]T[hour][minute][second]Z");
const CLAUDE_MIRROR_FILES: [&str; 4] = [
@@ -996,6 +1145,7 @@ fn consult_fault(
source.to_string(),
),
ConsultInvocationError::InvalidJson(detail)
+ | ConsultInvocationError::Stalled(detail)
| ConsultInvocationError::Downstream(detail) => {
FaultRecord::downstream(generation, FaultStage::Claude, operation, detail)
}
@@ -1007,6 +1157,7 @@ fn consult_fault_context(request: &ConsultRequest, error: &ConsultInvocationErro
let detail = match error {
ConsultInvocationError::Spawn(_) => None,
ConsultInvocationError::InvalidJson(detail)
+ | ConsultInvocationError::Stalled(detail)
| ConsultInvocationError::Downstream(detail) => Some(detail.as_str()),
};
let quota_reset_hint = detail.and_then(quota_reset_hint);
@@ -1032,8 +1183,14 @@ fn consult_retry_hint(quota_limited: bool, error: &ConsultInvocationError) -> Op
if quota_limited {
return Some("wait for the quota window to reset, then retry the consult".to_owned());
}
- let _ = error;
- None
+ match error {
+ ConsultInvocationError::Stalled(_) => {
+ Some("Claude stopped making observable progress; retry the consult".to_owned())
+ }
+ ConsultInvocationError::Spawn(_)
+ | ConsultInvocationError::InvalidJson(_)
+ | ConsultInvocationError::Downstream(_) => None,
+ }
}
pub(crate) fn consult_job_tool_output(
@@ -1228,6 +1385,7 @@ fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure
detail: source.to_string(),
},
ConsultInvocationError::InvalidJson(detail)
+ | ConsultInvocationError::Stalled(detail)
| ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure {
class: "downstream".to_owned(),
detail,
@@ -1406,6 +1564,11 @@ fn unix_ms_now() -> u64 {
fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInvocationError> {
let sandbox = ClaudeSandbox::prepare().map_err(ConsultInvocationError::Spawn)?;
+ let unit_name = claude_unit_name(request);
+ let transcript = ClaudeTranscriptMonitor::new(
+ sandbox.claude_config_dir().join("projects"),
+ request.planned_session_id(),
+ );
let mut command = Command::new(SYSTEMD_RUN_BINARY);
let runtime_dir = caller_runtime_dir();
let _ = command
@@ -1414,6 +1577,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
"DBUS_SESSION_BUS_ADDRESS",
caller_dbus_session_bus_address(runtime_dir.as_path()),
)
+ .arg(format!("--unit={unit_name}"))
.arg("--user")
.arg("--wait")
.arg("--pipe")
@@ -1472,12 +1636,10 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
request
.remember_planned_context()
.map_err(ConsultInvocationError::Spawn)?;
- let output = child
- .wait_with_output()
- .map_err(ConsultInvocationError::Spawn)?;
+ let output = wait_for_claude_completion(child, unit_name.as_str(), transcript)?;
let status = output.status;
- let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned();
- let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned();
+ let stdout = output.stdout.trim().to_owned();
+ let stderr = output.stderr.trim().to_owned();
let envelope = parse_claude_json_envelope(&stdout, &stderr, status.code())?;
if envelope.envelope_type != "result" {
return Err(ConsultInvocationError::InvalidJson(format!(
@@ -1525,6 +1687,56 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
})
}
+fn wait_for_claude_completion(
+ mut child: Child,
+ unit_name: &str,
+ transcript: ClaudeTranscriptMonitor,
+) -> Result<CompletedClaudeInvocation, ConsultInvocationError> {
+ let stdout = child
+ .stdout
+ .take()
+ .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stdout")))?;
+ let stderr = child
+ .stderr
+ .take()
+ .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stderr")))?;
+ let (activity_tx, activity_rx) = mpsc::channel::<()>();
+ let stdout_reader = spawn_pipe_reader(stdout, activity_tx.clone());
+ let stderr_reader = spawn_pipe_reader(stderr, activity_tx);
+ let mut progress = ClaudeProgressMonitor::new(transcript);
+
+ loop {
+ while activity_rx.try_recv().is_ok() {
+ progress.record_pipe_activity();
+ }
+ progress
+ .record_transcript_activity()
+ .map_err(ConsultInvocationError::Spawn)?;
+ if let Some(status) = child.try_wait().map_err(ConsultInvocationError::Spawn)? {
+ return Ok(CompletedClaudeInvocation {
+ status,
+ stdout: join_output_reader(stdout_reader, "stdout")?,
+ stderr: join_output_reader(stderr_reader, "stderr")?,
+ });
+ }
+ if progress.stalled() {
+ terminate_claude_invocation(unit_name, &mut child);
+ let stdout = join_output_reader(stdout_reader, "stdout").unwrap_or_default();
+ let stderr = join_output_reader(stderr_reader, "stderr").unwrap_or_default();
+ let mut detail = progress.stall_message();
+ if !stderr.trim().is_empty() {
+ detail.push_str("; stderr=");
+ detail.push_str(stderr.trim());
+ } else if !stdout.trim().is_empty() {
+ detail.push_str("; stdout=");
+ detail.push_str(stdout.trim());
+ }
+ return Err(ConsultInvocationError::Stalled(detail));
+ }
+ thread::sleep(CLAUDE_PROGRESS_POLL_INTERVAL);
+ }
+}
+
fn parse_claude_json_envelope(
stdout: &str,
stderr: &str,
@@ -1551,6 +1763,93 @@ fn downstream_message(status_code: Option<i32>, stdout: &str, stderr: &str) -> S
format!("Claude Code exited with status {status_code:?}")
}
+fn spawn_pipe_reader(
+ mut pipe: impl Read + Send + 'static,
+ activity_tx: mpsc::Sender<()>,
+) -> thread::JoinHandle<io::Result<String>> {
+ thread::spawn(move || {
+ let mut bytes = Vec::new();
+ let mut buffer = [0_u8; 8192];
+ loop {
+ let read = pipe.read(&mut buffer)?;
+ if read == 0 {
+ break;
+ }
+ bytes.extend_from_slice(&buffer[..read]);
+ let _ = activity_tx.send(());
+ }
+ Ok(String::from_utf8_lossy(&bytes).into_owned())
+ })
+}
+
+fn join_output_reader(
+ reader: thread::JoinHandle<io::Result<String>>,
+ stream_name: &str,
+) -> Result<String, ConsultInvocationError> {
+ reader
+ .join()
+ .map_err(|_| {
+ ConsultInvocationError::Spawn(io::Error::other(format!(
+ "Claude {stream_name} reader panicked"
+ )))
+ })?
+ .map_err(ConsultInvocationError::Spawn)
+}
+
+fn terminate_claude_invocation(unit_name: &str, child: &mut Child) {
+ let _ = stop_transient_unit(unit_name);
+ let _ = child.kill();
+ let _ = child.wait();
+}
+
+fn stop_transient_unit(unit_name: &str) -> io::Result<()> {
+ let runtime_dir = caller_runtime_dir();
+ let status = Command::new(SYSTEMCTL_BINARY)
+ .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str())
+ .env(
+ "DBUS_SESSION_BUS_ADDRESS",
+ caller_dbus_session_bus_address(runtime_dir.as_path()),
+ )
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .arg("--user")
+ .arg("stop")
+ .arg(unit_name)
+ .status()?;
+ let _ = Command::new(SYSTEMCTL_BINARY)
+ .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str())
+ .env(
+ "DBUS_SESSION_BUS_ADDRESS",
+ caller_dbus_session_bus_address(runtime_dir.as_path()),
+ )
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .arg("--user")
+ .arg("reset-failed")
+ .arg(unit_name)
+ .status();
+ if status.success() {
+ Ok(())
+ } else {
+ Err(io::Error::other(format!(
+ "failed to stop transient unit `{unit_name}`"
+ )))
+ }
+}
+
+fn claude_unit_name(request: &ConsultRequest) -> String {
+ format!("phone-opus-claude-{}", request.planned_session_id())
+}
+
+fn claude_stall_timeout() -> Duration {
+ std::env::var("PHONE_OPUS_CLAUDE_STALL_TIMEOUT_MS")
+ .ok()
+ .and_then(|value| value.parse::<u64>().ok())
+ .filter(|value| *value > 0)
+ .map(Duration::from_millis)
+ .unwrap_or(DEFAULT_CLAUDE_STALL_TIMEOUT)
+}
+
fn claude_binary() -> PathBuf {
std::env::var_os(CLAUDE_BIN_ENV)
.map(PathBuf::from)
diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs
index ed996db..754ee79 100644
--- a/crates/phone-opus/tests/mcp_hardening.rs
+++ b/crates/phone-opus/tests/mcp_hardening.rs
@@ -6,6 +6,7 @@ use std::io::{self, BufRead, BufReader, Write};
use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
+use std::time::{Duration, Instant};
use libmcp_testkit::read_json_lines;
use serde as _;
@@ -196,6 +197,56 @@ fi
if [ -n "${PHONE_OPUS_TEST_ARGS_FILE:-}" ]; then
printf '%s\n' "$@" >"$PHONE_OPUS_TEST_ARGS_FILE"
fi
+if [ -n "${PHONE_OPUS_TEST_TRANSCRIPT_HEARTBEAT_MS:-}" ]; then
+ session_id=""
+ previous=""
+ for argument in "$@"; do
+ if [ "$previous" = "--session-id" ]; then
+ session_id="$argument"
+ break
+ fi
+ previous="$argument"
+ done
+ if [ -n "$session_id" ]; then
+ SESSION_ID="$session_id" python3 - <<'PY'
+import os
+import time
+from pathlib import Path
+
+session_id = os.environ["SESSION_ID"]
+cwd = Path.cwd()
+slug = []
+last_dash = False
+for ch in str(cwd):
+ nxt = ch.lower() if ch.isalnum() else "-"
+ if nxt == "-":
+ if not slug:
+ slug.append("-")
+ last_dash = True
+ continue
+ if last_dash:
+ continue
+ last_dash = True
+ else:
+ last_dash = False
+ slug.append(nxt)
+transcript = (
+ Path(os.environ["HOME"])
+ / ".claude"
+ / "projects"
+ / "".join(slug)
+ / f"{session_id}.jsonl"
+)
+transcript.parent.mkdir(parents=True, exist_ok=True)
+interval = int(os.environ["PHONE_OPUS_TEST_TRANSCRIPT_HEARTBEAT_MS"]) / 1000.0
+count = int(os.environ.get("PHONE_OPUS_TEST_TRANSCRIPT_HEARTBEAT_COUNT", "1"))
+for index in range(count):
+ with transcript.open("a", encoding="utf-8") as handle:
+ handle.write(f'{{"kind":"heartbeat","index":{index}}}\n')
+ time.sleep(interval)
+PY
+ fi
+fi
if [ -n "${PHONE_OPUS_TEST_SLEEP_MS:-}" ]; then
python3 -c 'import os,time; time.sleep(int(os.environ["PHONE_OPUS_TEST_SLEEP_MS"]) / 1000.0)'
fi
@@ -665,6 +716,93 @@ fn consult_is_one_shot_and_hides_session_state() -> TestResult {
}
#[test]
+fn silent_claude_is_failed_fast_when_progress_stalls() -> TestResult {
+ let root = temp_root("consult_stall_timeout")?;
+ let state_home = root.join("state-home");
+ let caller_home = root.join("caller-home");
+ let fake_claude = root.join("claude");
+ must(fs::create_dir_all(&state_home), "create state home")?;
+ must(fs::create_dir_all(&caller_home), "create caller home")?;
+ seed_caller_claude_home(&caller_home)?;
+ write_fake_claude_script(&fake_claude)?;
+
+ let claude_bin = fake_claude.display().to_string();
+ let caller_home_path = caller_home.display().to_string();
+ let env = [
+ ("HOME", caller_home_path.as_str()),
+ ("PHONE_OPUS_CLAUDE_BIN", claude_bin.as_str()),
+ ("PHONE_OPUS_TEST_SLEEP_MS", "5000"),
+ ("PHONE_OPUS_CLAUDE_STALL_TIMEOUT_MS", "750"),
+ ];
+ let mut harness = McpHarness::spawn(&state_home, &env)?;
+ let _ = harness.initialize()?;
+ harness.notify_initialized()?;
+
+ let started = Instant::now();
+ let stalled = harness.call_tool(3, "consult", json!({ "prompt": "stall" }))?;
+ assert_tool_error(&stalled);
+ assert!(started.elapsed() < Duration::from_secs(4));
+ assert_eq!(
+ tool_content(&stalled)["fault"]["class"].as_str(),
+ Some("downstream")
+ );
+ assert!(
+ tool_content(&stalled)["fault"]["detail"]
+ .as_str()
+ .is_some_and(|value| value.contains("no observable progress"))
+ );
+ assert_eq!(
+ tool_content(&stalled)["context"]["consult"]["retry_hint"].as_str(),
+ Some("Claude stopped making observable progress; retry the consult")
+ );
+ Ok(())
+}
+
+#[test]
+fn transcript_progress_prevents_false_stall_timeout() -> TestResult {
+ let root = temp_root("consult_transcript_progress")?;
+ let state_home = root.join("state-home");
+ let caller_home = root.join("caller-home");
+ let fake_claude = root.join("claude");
+ let stdout_file = root.join("stdout.json");
+ must(fs::create_dir_all(&state_home), "create state home")?;
+ must(fs::create_dir_all(&caller_home), "create caller home")?;
+ seed_caller_claude_home(&caller_home)?;
+ write_fake_claude_script(&fake_claude)?;
+ write_fake_claude_json_success(
+ &stdout_file,
+ "heartbeat oracle",
+ "1bfb2c8a-c6d8-42f6-8f18-6b3c70ad2e11",
+ "uuid-heartbeat",
+ )?;
+
+ let claude_bin = fake_claude.display().to_string();
+ let stdout_path = stdout_file.display().to_string();
+ let caller_home_path = caller_home.display().to_string();
+ let env = [
+ ("HOME", caller_home_path.as_str()),
+ ("PHONE_OPUS_CLAUDE_BIN", claude_bin.as_str()),
+ ("PHONE_OPUS_TEST_STDOUT_FILE", stdout_path.as_str()),
+ ("PHONE_OPUS_TEST_TRANSCRIPT_HEARTBEAT_MS", "200"),
+ ("PHONE_OPUS_TEST_TRANSCRIPT_HEARTBEAT_COUNT", "5"),
+ ("PHONE_OPUS_CLAUDE_STALL_TIMEOUT_MS", "500"),
+ ];
+ let mut harness = McpHarness::spawn(&state_home, &env)?;
+ let _ = harness.initialize()?;
+ harness.notify_initialized()?;
+
+ let started = Instant::now();
+ let consult = harness.call_tool(3, "consult", json!({ "prompt": "heartbeat" }))?;
+ assert_tool_ok(&consult);
+ assert!(started.elapsed() >= Duration::from_millis(800));
+ assert_eq!(
+ tool_content(&consult)["response"].as_str(),
+ Some("heartbeat oracle")
+ );
+ Ok(())
+}
+
+#[test]
fn background_surfaces_are_hidden_from_public_mcp() -> TestResult {
let root = temp_root("consult_hidden_background")?;
let state_home = root.join("state-home");