swarm repositories / source
aboutsummaryrefslogtreecommitdiff
path: root/crates/phone-opus/src
diff options
context:
space:
mode:
Diffstat (limited to 'crates/phone-opus/src')
-rw-r--r--crates/phone-opus/src/mcp/service.rs172
1 files changed, 34 insertions, 138 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs
index e152fa7..b92f92e 100644
--- a/crates/phone-opus/src/mcp/service.rs
+++ b/crates/phone-opus/src/mcp/service.rs
@@ -5,7 +5,6 @@ use std::io::{self, BufRead, Read, Write};
use std::os::unix::fs::symlink;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
-use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
@@ -659,8 +658,6 @@ enum ConsultInvocationError {
#[error("Claude Code returned non-JSON output: {0}")]
InvalidJson(String),
#[error("{0}")]
- Stalled(String),
- #[error("{0}")]
Downstream(String),
}
@@ -693,12 +690,6 @@ struct ClaudeStreamCapture {
final_envelope: Option<ClaudeJsonEnvelope>,
}
-enum ClaudeStreamEvent {
- Line(String),
- Eof,
- Error(io::Error),
-}
-
#[derive(Debug)]
struct ConsultResponse {
cwd: WorkingDirectory,
@@ -755,8 +746,6 @@ const SHARED_TMP_ROOTS: [&str; 2] = ["/tmp", "/var/tmp"];
const CONSULT_OUTPUT_ROOT: &str = "/tmp/phone_opus-consults";
const CONSULT_OUTPUT_KEEP_COUNT: usize = 256;
const CONSULT_OUTPUT_MAX_AGE: Duration = Duration::from_secs(7 * 24 * 60 * 60);
-const DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT: Duration = Duration::from_secs(60);
-const CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV: &str = "PHONE_OPUS_CLAUDE_INITIAL_OUTPUT_TIMEOUT_MS";
const CONSULT_TIMESTAMP_FORMAT: &[time::format_description::FormatItem<'static>] =
time::macros::format_description!("[year][month][day]T[hour][minute][second]Z");
const CLAUDE_MIRROR_FILES: [&str; 4] = [
@@ -1006,7 +995,6 @@ fn consult_fault(
source.to_string(),
),
ConsultInvocationError::InvalidJson(detail)
- | ConsultInvocationError::Stalled(detail)
| ConsultInvocationError::Downstream(detail) => {
FaultRecord::downstream(generation, FaultStage::Claude, operation, detail)
}
@@ -1018,7 +1006,6 @@ fn consult_fault_context(request: &ConsultRequest, error: &ConsultInvocationErro
let detail = match error {
ConsultInvocationError::Spawn(_) => None,
ConsultInvocationError::InvalidJson(detail)
- | ConsultInvocationError::Stalled(detail)
| ConsultInvocationError::Downstream(detail) => Some(detail.as_str()),
};
let quota_reset_hint = detail.and_then(quota_reset_hint);
@@ -1044,13 +1031,8 @@ fn consult_retry_hint(quota_limited: bool, error: &ConsultInvocationError) -> Op
if quota_limited {
return Some("wait for the quota window to reset, then retry the consult".to_owned());
}
- match error {
- ConsultInvocationError::Stalled(_) => Some(
- "Claude stalled before producing output; retry the consult as a fresh one-shot call"
- .to_owned(),
- ),
- _ => None,
- }
+ let _ = error;
+ None
}
pub(crate) fn consult_job_tool_output(
@@ -1245,7 +1227,6 @@ fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure
detail: source.to_string(),
},
ConsultInvocationError::InvalidJson(detail)
- | ConsultInvocationError::Stalled(detail)
| ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure {
class: "downstream".to_owned(),
detail,
@@ -1424,7 +1405,6 @@ fn unix_ms_now() -> u64 {
fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInvocationError> {
let sandbox = ClaudeSandbox::prepare().map_err(ConsultInvocationError::Spawn)?;
- let transient_unit = consult_transient_unit();
let mut command = Command::new(SYSTEMD_RUN_BINARY);
let runtime_dir = caller_runtime_dir();
let _ = command
@@ -1438,8 +1418,6 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
.arg("--pipe")
.arg("--collect")
.arg("--quiet")
- .arg("--unit")
- .arg(transient_unit.as_str())
.arg("--working-directory")
.arg(request.cwd.as_path());
for property in [
@@ -1508,7 +1486,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
let _ = stderr.read_to_string(&mut buffer)?;
Ok(buffer)
});
- let capture = capture_claude_stream(stdout, request, &mut child, transient_unit.as_str())?;
+ let capture = capture_claude_stream(stdout, request)?;
let status = child.wait().map_err(ConsultInvocationError::Spawn)?;
let stderr = stderr_reader
.join()
@@ -1584,127 +1562,45 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
}
fn capture_claude_stream(
- stdout: impl Read + Send + 'static,
+ stdout: impl Read,
request: &ConsultRequest,
- child: &mut std::process::Child,
- transient_unit: &str,
) -> Result<ClaudeStreamCapture, ConsultInvocationError> {
- let (sender, receiver) = mpsc::channel();
- let reader = thread::spawn(move || {
- let reader = io::BufReader::new(stdout);
- for line in reader.lines() {
- match line {
- Ok(line) => {
- if sender.send(ClaudeStreamEvent::Line(line)).is_err() {
- return;
- }
- }
- Err(error) => {
- let _ = sender.send(ClaudeStreamEvent::Error(error));
- return;
- }
- }
- }
- let _ = sender.send(ClaudeStreamEvent::Eof);
- });
let mut capture = ClaudeStreamCapture::default();
- let first_event = receiver
- .recv_timeout(claude_initial_output_timeout())
- .map_err(|_| {
- cull_consult_unit(child, transient_unit);
- ConsultInvocationError::Stalled(format!(
- "Claude Code produced no stream output within {} ms; terminated stalled consult unit `{transient_unit}`",
- claude_initial_output_timeout().as_millis()
- ))
- })?;
- let first_event_finished = ingest_claude_stream_event(first_event, &mut capture, request)?;
- if first_event_finished {
- reader.join().map_err(|_| {
- ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked"))
- })?;
- return Ok(capture);
- }
- while let Ok(event) = receiver.recv() {
- if ingest_claude_stream_event(event, &mut capture, request)? {
- break;
+ let reader = io::BufReader::new(stdout);
+ for line in reader.lines() {
+ let line = line.map_err(ConsultInvocationError::Spawn)?;
+ let trimmed = line.trim();
+ if trimmed.is_empty() {
+ continue;
}
- }
- reader.join().map_err(|_| {
- ConsultInvocationError::Spawn(io::Error::other("Claude stdout reader panicked"))
- })?;
- Ok(capture)
-}
-
-fn ingest_claude_stream_event(
- event: ClaudeStreamEvent,
- capture: &mut ClaudeStreamCapture,
- request: &ConsultRequest,
-) -> Result<bool, ConsultInvocationError> {
- let line = match event {
- ClaudeStreamEvent::Line(line) => line,
- ClaudeStreamEvent::Eof => return Ok(true),
- ClaudeStreamEvent::Error(error) => return Err(ConsultInvocationError::Spawn(error)),
- };
- let trimmed = line.trim();
- if trimmed.is_empty() {
- return Ok(false);
- }
- capture.stdout.push_str(trimmed);
- capture.stdout.push('\n');
- let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
- return Ok(false);
- };
- if capture.observed_session_id.is_none()
- && let Some(session_id) = stream_session_id(&value)
- {
- request
- .remember_context(Some(session_id.as_str()))
- .map_err(ConsultInvocationError::Spawn)?;
- capture.observed_session_id = Some(session_id);
- }
- if value.get("type").and_then(Value::as_str) == Some("result") {
- let envelope = serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| {
- ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}"))
- })?;
- if let Some(session_id) = envelope.session_id.as_deref() {
+ capture.stdout.push_str(trimmed);
+ capture.stdout.push('\n');
+ let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
+ continue;
+ };
+ if capture.observed_session_id.is_none()
+ && let Some(session_id) = stream_session_id(&value)
+ {
request
- .remember_context(Some(session_id))
+ .remember_context(Some(session_id.as_str()))
.map_err(ConsultInvocationError::Spawn)?;
- capture.observed_session_id = Some(session_id.to_owned());
+ capture.observed_session_id = Some(session_id);
+ }
+ if value.get("type").and_then(Value::as_str) == Some("result") {
+ let envelope =
+ serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| {
+ ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}"))
+ })?;
+ if let Some(session_id) = envelope.session_id.as_deref() {
+ request
+ .remember_context(Some(session_id))
+ .map_err(ConsultInvocationError::Spawn)?;
+ capture.observed_session_id = Some(session_id.to_owned());
+ }
+ capture.final_envelope = Some(envelope);
}
- capture.final_envelope = Some(envelope);
}
- Ok(false)
-}
-
-fn claude_initial_output_timeout() -> Duration {
- std::env::var(CLAUDE_INITIAL_OUTPUT_TIMEOUT_ENV)
- .ok()
- .and_then(|value| value.parse::<u64>().ok())
- .map(Duration::from_millis)
- .unwrap_or(DEFAULT_CLAUDE_INITIAL_OUTPUT_TIMEOUT)
-}
-
-fn consult_transient_unit() -> String {
- format!("phone-opus-consult-{}", Uuid::new_v4().simple())
-}
-
-fn cull_consult_unit(child: &mut std::process::Child, transient_unit: &str) {
- let runtime_dir = caller_runtime_dir();
- let _ = Command::new("systemctl")
- .env("XDG_RUNTIME_DIR", runtime_dir.as_os_str())
- .env(
- "DBUS_SESSION_BUS_ADDRESS",
- caller_dbus_session_bus_address(runtime_dir.as_path()),
- )
- .arg("--user")
- .arg("stop")
- .arg(transient_unit)
- .stdout(Stdio::null())
- .stderr(Stdio::null())
- .status();
- let _ = child.kill();
- let _ = child.wait();
+ Ok(capture)
}
fn stream_session_id(value: &Value) -> Option<String> {