swarm repositories / source
aboutsummaryrefslogtreecommitdiff
path: root/crates/phone-opus/src/mcp
diff options
context:
space:
mode:
Diffstat (limited to 'crates/phone-opus/src/mcp')
-rw-r--r--crates/phone-opus/src/mcp/service.rs145
1 files changed, 33 insertions, 112 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs
index b92f92e..5a6ca91 100644
--- a/crates/phone-opus/src/mcp/service.rs
+++ b/crates/phone-opus/src/mcp/service.rs
@@ -1,6 +1,6 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fs;
-use std::io::{self, BufRead, Read, Write};
+use std::io::{self, BufRead, Write};
#[cfg(unix)]
use std::os::unix::fs::symlink;
use std::path::{Path, PathBuf};
@@ -169,10 +169,18 @@ impl ConsultRequest {
self.session_plan.planned_session().display()
}
+ #[allow(
+ dead_code,
+ reason = "session metadata is retained internally but hidden from the public surface"
+ )]
fn context_mode(&self) -> &'static str {
self.session_plan.context_mode()
}
+ #[allow(
+ dead_code,
+ reason = "session metadata is retained internally but hidden from the public surface"
+ )]
fn reused_session_id(&self) -> Option<String> {
self.session_plan.reused_session_id()
}
@@ -683,13 +691,6 @@ struct ClaudeJsonEnvelope {
uuid: Option<String>,
}
-#[derive(Debug, Default)]
-struct ClaudeStreamCapture {
- stdout: String,
- observed_session_id: Option<String>,
- final_envelope: Option<ClaudeJsonEnvelope>,
-}
-
#[derive(Debug)]
struct ConsultResponse {
cwd: WorkingDirectory,
@@ -1445,9 +1446,8 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
let _ = command
.arg(claude_binary())
.arg("-p")
- .arg("--verbose")
.arg("--output-format")
- .arg("stream-json")
+ .arg("json")
.arg("--strict-mcp-config")
.arg("--mcp-config")
.arg(EMPTY_MCP_CONFIG)
@@ -1465,63 +1465,27 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
if let Some(session_id) = request.launch_session_id() {
let _ = command.arg("--session-id").arg(session_id);
}
- let mut child = command
+ let child = command
.arg(request.prompt.rendered())
.spawn()
.map_err(ConsultInvocationError::Spawn)?;
request
.remember_planned_context()
.map_err(ConsultInvocationError::Spawn)?;
- let stdout = child
- .stdout
- .take()
- .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stdout")))?;
- let stderr = child
- .stderr
- .take()
- .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stderr")))?;
- let stderr_reader = thread::spawn(move || -> io::Result<String> {
- let mut stderr = stderr;
- let mut buffer = String::new();
- let _ = stderr.read_to_string(&mut buffer)?;
- Ok(buffer)
- });
- let capture = capture_claude_stream(stdout, request)?;
- let status = child.wait().map_err(ConsultInvocationError::Spawn)?;
- let stderr = stderr_reader
- .join()
- .map_err(|_| {
- ConsultInvocationError::Spawn(io::Error::other("Claude stderr reader panicked"))
- })?
- .map_err(ConsultInvocationError::Spawn)?
- .trim()
- .to_owned();
- let stdout = capture.stdout.trim().to_owned();
- let envelope = match capture.final_envelope {
- Some(envelope) => envelope,
- None if !status.success() => {
- return Err(ConsultInvocationError::Downstream(downstream_message(
- status.code(),
- &stdout,
- &stderr,
- )));
- }
- None => {
- return Err(ConsultInvocationError::InvalidJson(format!(
- "missing Claude result envelope; stdout={stdout}; stderr={stderr}"
- )));
- }
- };
+ let output = child
+ .wait_with_output()
+ .map_err(ConsultInvocationError::Spawn)?;
+ let status = output.status;
+ let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned();
+ let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned();
+ let envelope = parse_claude_json_envelope(&stdout, &stderr, status.code())?;
if envelope.envelope_type != "result" {
- return Err(ConsultInvocationError::Downstream(format!(
+ return Err(ConsultInvocationError::InvalidJson(format!(
"unexpected Claude envelope type `{}`",
envelope.envelope_type
)));
}
- let observed_session_id = envelope
- .session_id
- .clone()
- .or(capture.observed_session_id.clone());
+ let observed_session_id = envelope.session_id.clone();
if !status.success() || envelope.is_error || envelope.subtype.as_deref() != Some("success") {
return Err(ConsultInvocationError::Downstream(
envelope
@@ -1561,63 +1525,20 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
})
}
-fn capture_claude_stream(
- stdout: impl Read,
- request: &ConsultRequest,
-) -> Result<ClaudeStreamCapture, ConsultInvocationError> {
- let mut capture = ClaudeStreamCapture::default();
- let reader = io::BufReader::new(stdout);
- for line in reader.lines() {
- let line = line.map_err(ConsultInvocationError::Spawn)?;
- let trimmed = line.trim();
- if trimmed.is_empty() {
- continue;
- }
- capture.stdout.push_str(trimmed);
- capture.stdout.push('\n');
- let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
- continue;
- };
- if capture.observed_session_id.is_none()
- && let Some(session_id) = stream_session_id(&value)
- {
- request
- .remember_context(Some(session_id.as_str()))
- .map_err(ConsultInvocationError::Spawn)?;
- capture.observed_session_id = Some(session_id);
- }
- if value.get("type").and_then(Value::as_str) == Some("result") {
- let envelope =
- serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| {
- ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}"))
- })?;
- if let Some(session_id) = envelope.session_id.as_deref() {
- request
- .remember_context(Some(session_id))
- .map_err(ConsultInvocationError::Spawn)?;
- capture.observed_session_id = Some(session_id.to_owned());
- }
- capture.final_envelope = Some(envelope);
+fn parse_claude_json_envelope(
+ stdout: &str,
+ stderr: &str,
+ status_code: Option<i32>,
+) -> Result<ClaudeJsonEnvelope, ConsultInvocationError> {
+ serde_json::from_str(stdout).map_err(|error| {
+ if status_code == Some(0) {
+ ConsultInvocationError::InvalidJson(format!(
+ "{error}; stdout={stdout}; stderr={stderr}"
+ ))
+ } else {
+ ConsultInvocationError::Downstream(downstream_message(status_code, stdout, stderr))
}
- }
- Ok(capture)
-}
-
-fn stream_session_id(value: &Value) -> Option<String> {
- match value {
- Value::Object(object) => object
- .iter()
- .find_map(|(key, value)| {
- ((key == "session_id") || (key == "sessionId"))
- .then_some(value)
- .and_then(Value::as_str)
- .and_then(SessionHandle::parse)
- .map(|session| session.display())
- })
- .or_else(|| object.values().find_map(stream_session_id)),
- Value::Array(array) => array.iter().find_map(stream_session_id),
- _ => None,
- }
+ })
}
fn downstream_message(status_code: Option<i32>, stdout: &str, stderr: &str) -> String {