swarm repositories / source
aboutsummaryrefslogtreecommitdiff
path: root/crates/phone-opus/src/mcp/service.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/phone-opus/src/mcp/service.rs')
-rw-r--r--crates/phone-opus/src/mcp/service.rs564
1 files changed, 559 insertions, 5 deletions
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs
index d42c0a0..5b38c4b 100644
--- a/crates/phone-opus/src/mcp/service.rs
+++ b/crates/phone-opus/src/mcp/service.rs
@@ -1,10 +1,13 @@
use std::collections::BTreeMap;
+use std::fs;
use std::io::{self, BufRead, Write};
use std::path::{Path, PathBuf};
-use std::process::Command;
+use std::process::{Command, Stdio};
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use dirs::{home_dir, state_dir};
use libmcp::{Generation, SurfaceKind};
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use thiserror::Error;
use uuid::Uuid;
@@ -47,6 +50,11 @@ pub(crate) fn run_worker(generation: u64) -> Result<(), Box<dyn std::error::Erro
Ok(())
}
+pub(crate) fn run_background_consult(job_file: PathBuf) -> Result<(), Box<dyn std::error::Error>> {
+ execute_background_consult(job_file)?;
+ Ok(())
+}
+
struct WorkerService {
generation: Generation,
}
@@ -76,9 +84,19 @@ impl WorkerService {
let args = deserialize::<ConsultArgs>(arguments, &operation, self.generation)?;
let request = ConsultRequest::parse(args)
.map_err(|error| invalid_consult_request(self.generation, &operation, error))?;
- let response = invoke_claude(&request)
- .map_err(|error| consult_fault(self.generation, &operation, error))?;
- consult_output(&request, &response, self.generation, &operation)?
+ match request.mode() {
+ ConsultMode::Sync => {
+ let response = invoke_claude(&request)
+ .map_err(|error| consult_fault(self.generation, &operation, error))?;
+ consult_output(&request, &response, self.generation, &operation)?
+ }
+ ConsultMode::Background => submit_background_consult(
+ &request,
+ self.generation,
+ FaultStage::Worker,
+ &operation,
+ )?,
+ }
}
other => {
return Err(FaultRecord::invalid_input(
@@ -105,6 +123,17 @@ struct ConsultArgs {
cwd: Option<String>,
max_turns: Option<u64>,
session_id: Option<String>,
+ background: Option<bool>,
+}
+
+#[derive(Debug, Deserialize)]
+struct ConsultJobArgs {
+ job_id: String,
+}
+
+#[derive(Debug, Default, Deserialize)]
+struct ConsultJobsArgs {
+ limit: Option<u64>,
}
#[derive(Debug, Clone)]
@@ -113,6 +142,7 @@ struct ConsultRequest {
cwd: WorkingDirectory,
max_turns: Option<TurnLimit>,
session: Option<SessionHandle>,
+ mode: ConsultMode,
}
impl ConsultRequest {
@@ -122,9 +152,14 @@ impl ConsultRequest {
cwd: WorkingDirectory::resolve(args.cwd)?,
max_turns: args.max_turns.map(TurnLimit::parse).transpose()?,
session: args.session_id.map(SessionHandle::parse).transpose()?,
+ mode: ConsultMode::from_background(args.background),
})
}
+ fn mode(&self) -> ConsultMode {
+ self.mode
+ }
+
fn session_mode(&self) -> &'static str {
if self.session.is_some() {
"resumed"
@@ -136,6 +171,38 @@ impl ConsultRequest {
fn requested_session_id(&self) -> Option<String> {
self.session.as_ref().map(SessionHandle::display)
}
+
+ fn background_request(&self) -> BackgroundConsultRequest {
+ BackgroundConsultRequest {
+ prompt: self.prompt.as_str().to_owned(),
+ cwd: self.cwd.display(),
+ max_turns: self.max_turns.map(TurnLimit::get),
+ session_id: self.requested_session_id(),
+ }
+ }
+}
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+enum ConsultMode {
+ Sync,
+ Background,
+}
+
+impl ConsultMode {
+ fn from_background(raw: Option<bool>) -> Self {
+ if raw.unwrap_or(false) {
+ Self::Background
+ } else {
+ Self::Sync
+ }
+ }
+
+ fn as_str(self) -> &'static str {
+ match self {
+ Self::Sync => "sync",
+ Self::Background => "background",
+ }
+ }
}
#[derive(Debug, Clone)]
@@ -232,6 +299,172 @@ impl SessionHandle {
}
}
+#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)]
+struct BackgroundConsultRequest {
+ prompt: String,
+ cwd: String,
+ max_turns: Option<u64>,
+ session_id: Option<String>,
+}
+
+impl BackgroundConsultRequest {
+ fn into_consult_request(self) -> Result<ConsultRequest, ConsultRequestError> {
+ ConsultRequest::parse(ConsultArgs {
+ prompt: self.prompt,
+ cwd: Some(self.cwd),
+ max_turns: self.max_turns,
+ session_id: self.session_id,
+ background: Some(false),
+ })
+ }
+}
+
+#[derive(Debug, Clone, Copy, Deserialize, Eq, PartialEq, Serialize)]
+#[serde(rename_all = "snake_case")]
+enum BackgroundConsultStatus {
+ Queued,
+ Running,
+ Succeeded,
+ Failed,
+}
+
+impl BackgroundConsultStatus {
+ fn done(self) -> bool {
+ matches!(self, Self::Succeeded | Self::Failed)
+ }
+
+ fn success(self) -> bool {
+ matches!(self, Self::Succeeded)
+ }
+
+ fn failed(self) -> bool {
+ matches!(self, Self::Failed)
+ }
+}
+
+#[derive(Debug, Clone, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
+#[serde(transparent)]
+struct BackgroundConsultJobId(Uuid);
+
+impl BackgroundConsultJobId {
+ fn new() -> Self {
+ Self(Uuid::new_v4())
+ }
+
+ fn parse(raw: String) -> Result<Self, ConsultRequestError> {
+ Uuid::parse_str(&raw)
+ .map(Self)
+ .map_err(|_| ConsultRequestError::InvalidJobHandle(raw))
+ }
+
+ fn display(&self) -> String {
+ self.0.to_string()
+ }
+}
+
+#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)]
+struct BackgroundConsultFailure {
+ class: String,
+ detail: String,
+}
+
+#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
+struct BackgroundConsultResponseRecord {
+ cwd: String,
+ response: String,
+ duration_ms: u64,
+ duration_api_ms: Option<u64>,
+ num_turns: u64,
+ stop_reason: Option<String>,
+ session_id: Option<String>,
+ total_cost_usd: Option<f64>,
+ usage: Option<Value>,
+ model_usage: Option<Value>,
+ permission_denials: Vec<Value>,
+ fast_mode_state: Option<String>,
+ uuid: Option<String>,
+}
+
+impl BackgroundConsultResponseRecord {
+ fn from_response(response: ConsultResponse) -> Self {
+ Self {
+ cwd: response.cwd.display(),
+ response: response.result,
+ duration_ms: response.duration_ms,
+ duration_api_ms: response.duration_api_ms,
+ num_turns: response.num_turns,
+ stop_reason: response.stop_reason,
+ session_id: response.session_id,
+ total_cost_usd: response.total_cost_usd,
+ usage: response.usage,
+ model_usage: response.model_usage,
+ permission_denials: response.permission_denials,
+ fast_mode_state: response.fast_mode_state,
+ uuid: response.uuid,
+ }
+ }
+
+ fn model_name(&self) -> Option<String> {
+ let Value::Object(models) = self.model_usage.as_ref()? else {
+ return None;
+ };
+ models.keys().next().cloned()
+ }
+}
+
+#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
+struct BackgroundConsultJobRecord {
+ job_id: BackgroundConsultJobId,
+ status: BackgroundConsultStatus,
+ created_unix_ms: u64,
+ updated_unix_ms: u64,
+ started_unix_ms: Option<u64>,
+ finished_unix_ms: Option<u64>,
+ runner_pid: Option<u32>,
+ request: BackgroundConsultRequest,
+ prompt_prefix_injected: bool,
+ result: Option<BackgroundConsultResponseRecord>,
+ failure: Option<BackgroundConsultFailure>,
+}
+
+impl BackgroundConsultJobRecord {
+ fn new(job_id: BackgroundConsultJobId, request: BackgroundConsultRequest) -> Self {
+ let now = unix_ms_now();
+ Self {
+ job_id,
+ status: BackgroundConsultStatus::Queued,
+ created_unix_ms: now,
+ updated_unix_ms: now,
+ started_unix_ms: None,
+ finished_unix_ms: None,
+ runner_pid: None,
+ request,
+ prompt_prefix_injected: true,
+ result: None,
+ failure: None,
+ }
+ }
+
+ fn summary(&self) -> Value {
+ json!({
+ "job_id": self.job_id.display(),
+ "status": self.status,
+ "done": self.status.done(),
+ "succeeded": self.status.success(),
+ "failed": self.status.failed(),
+ "created_unix_ms": self.created_unix_ms,
+ "updated_unix_ms": self.updated_unix_ms,
+ "started_unix_ms": self.started_unix_ms,
+ "finished_unix_ms": self.finished_unix_ms,
+ "runner_pid": self.runner_pid,
+ "cwd": self.request.cwd,
+ "max_turns": self.request.max_turns,
+ "requested_session_id": self.request.session_id,
+ "prompt_prefix_injected": self.prompt_prefix_injected,
+ })
+ }
+}
+
#[derive(Debug, Error)]
enum ConsultRequestError {
#[error("prompt must not be empty")]
@@ -246,6 +479,8 @@ enum ConsultRequestError {
InvalidTurnLimit,
#[error("session_id must be a valid UUID, got `{0}`")]
InvalidSessionHandle(String),
+ #[error("job_id must be a valid UUID, got `{0}`")]
+ InvalidJobHandle(String),
}
#[derive(Debug, Error)]
@@ -348,6 +583,323 @@ fn consult_fault(
}
}
+pub(crate) fn consult_job_tool_output(
+ arguments: Value,
+ generation: Generation,
+ stage: FaultStage,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let args = deserialize::<ConsultJobArgs>(arguments, operation, generation)?;
+ let job_id = BackgroundConsultJobId::parse(args.job_id)
+ .map_err(|error| invalid_consult_request(generation, operation, error))?;
+ let record = load_background_consult_job(&job_id).map_err(|error| {
+ FaultRecord::downstream(generation, stage, operation, error.to_string())
+ })?;
+ background_job_tool_output(&record, generation, stage, operation)
+}
+
+pub(crate) fn consult_jobs_tool_output(
+ arguments: Value,
+ generation: Generation,
+ stage: FaultStage,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let args = deserialize::<ConsultJobsArgs>(arguments, operation, generation)?;
+ let limit = args.limit.unwrap_or(10).clamp(1, 50);
+ let jobs = load_recent_background_consult_jobs(limit as usize).map_err(|error| {
+ FaultRecord::downstream(generation, stage, operation, error.to_string())
+ })?;
+ let concise = json!({
+ "jobs": jobs.iter().map(BackgroundConsultJobRecord::summary).collect::<Vec<_>>(),
+ "count": jobs.len(),
+ });
+ let full = json!({
+ "jobs": jobs,
+ "count": jobs.len(),
+ });
+ let lines = if jobs.is_empty() {
+ "no background consult jobs".to_owned()
+ } else {
+ jobs.iter()
+ .map(|job| {
+ format!(
+ "{} status={} cwd={}",
+ job.job_id.display(),
+ serde_json::to_string(&job.status).unwrap_or_else(|_| "\"unknown\"".to_owned()),
+ job.request.cwd
+ )
+ })
+ .collect::<Vec<_>>()
+ .join("\n")
+ };
+ fallback_detailed_tool_output(
+ &concise,
+ &full,
+ lines,
+ None,
+ SurfaceKind::Read,
+ generation,
+ stage,
+ operation,
+ )
+}
+
+fn submit_background_consult(
+ request: &ConsultRequest,
+ generation: Generation,
+ stage: FaultStage,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let job_id = BackgroundConsultJobId::new();
+ let mut record = BackgroundConsultJobRecord::new(job_id, request.background_request());
+ persist_background_consult_job(&record)
+ .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
+
+ let executable = std::env::current_exe()
+ .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
+ let job_file = background_consult_job_path(&record.job_id)
+ .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
+ let child = Command::new(executable)
+ .arg("mcp")
+ .arg("background-consult")
+ .arg("--job-file")
+ .arg(&job_file)
+ .stdin(Stdio::null())
+ .stdout(Stdio::null())
+ .stderr(Stdio::null())
+ .spawn()
+ .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
+
+ record.status = BackgroundConsultStatus::Running;
+ record.started_unix_ms = Some(unix_ms_now());
+ record.updated_unix_ms = unix_ms_now();
+ record.runner_pid = Some(child.id());
+ persist_background_consult_job(&record)
+ .map_err(|error| FaultRecord::process(generation, stage, operation, error.to_string()))?;
+
+ let concise = json!({
+ "mode": request.mode().as_str(),
+ "job_id": record.job_id.display(),
+ "status": record.status,
+ "done": false,
+ "requested_session_id": request.requested_session_id(),
+ "session_mode": request.session_mode(),
+ "prompt_prefix_injected": true,
+ "follow_up_tools": ["consult_job", "consult_jobs"],
+ });
+ let full = json!({
+ "mode": request.mode().as_str(),
+ "job_id": record.job_id.display(),
+ "status": record.status,
+ "done": false,
+ "requested_session_id": request.requested_session_id(),
+ "session_mode": request.session_mode(),
+ "prompt_prefix_injected": true,
+ "prompt": request.prompt.as_str(),
+ "effective_prompt": request.prompt.rendered(),
+ "cwd": request.cwd.display(),
+ "max_turns": request.max_turns.map(TurnLimit::get),
+ "follow_up_tools": ["consult_job", "consult_jobs"],
+ });
+ fallback_detailed_tool_output(
+ &concise,
+ &full,
+ format!(
+ "background consult submitted job={} status=running",
+ record.job_id.display()
+ ),
+ None,
+ SurfaceKind::Read,
+ generation,
+ stage,
+ operation,
+ )
+}
+
+fn execute_background_consult(job_file: PathBuf) -> io::Result<()> {
+ let mut record = read_json_file::<BackgroundConsultJobRecord>(&job_file)?;
+ record.status = BackgroundConsultStatus::Running;
+ let _ = record.runner_pid.get_or_insert(std::process::id());
+ let _ = record.started_unix_ms.get_or_insert_with(unix_ms_now);
+ record.updated_unix_ms = unix_ms_now();
+ persist_background_consult_job_to_path(&job_file, &record)?;
+
+ let request = record
+ .request
+ .clone()
+ .into_consult_request()
+ .map_err(|error| io::Error::other(error.to_string()))?;
+ match invoke_claude(&request) {
+ Ok(response) => {
+ record.status = BackgroundConsultStatus::Succeeded;
+ record.result = Some(BackgroundConsultResponseRecord::from_response(response));
+ record.failure = None;
+ }
+ Err(error) => {
+ record.status = BackgroundConsultStatus::Failed;
+ record.result = None;
+ record.failure = Some(background_failure(error));
+ }
+ }
+ record.finished_unix_ms = Some(unix_ms_now());
+ record.updated_unix_ms = unix_ms_now();
+ record.runner_pid = None;
+ persist_background_consult_job_to_path(&job_file, &record)
+}
+
+fn background_failure(error: ConsultInvocationError) -> BackgroundConsultFailure {
+ match error {
+ ConsultInvocationError::Spawn(source) => BackgroundConsultFailure {
+ class: "process".to_owned(),
+ detail: source.to_string(),
+ },
+ ConsultInvocationError::InvalidJson(detail)
+ | ConsultInvocationError::Downstream(detail) => BackgroundConsultFailure {
+ class: "downstream".to_owned(),
+ detail,
+ },
+ }
+}
+
+fn background_job_tool_output(
+ record: &BackgroundConsultJobRecord,
+ generation: Generation,
+ stage: FaultStage,
+ operation: &str,
+) -> Result<ToolOutput, FaultRecord> {
+ let concise = json!({
+ "job_id": record.job_id.display(),
+ "status": record.status,
+ "done": record.status.done(),
+ "succeeded": record.status.success(),
+ "failed": record.status.failed(),
+ "created_unix_ms": record.created_unix_ms,
+ "updated_unix_ms": record.updated_unix_ms,
+ "started_unix_ms": record.started_unix_ms,
+ "finished_unix_ms": record.finished_unix_ms,
+ "runner_pid": record.runner_pid,
+ "cwd": record.request.cwd,
+ "requested_session_id": record.request.session_id,
+ "prompt_prefix_injected": record.prompt_prefix_injected,
+ "result": record.result.as_ref().map(|result| json!({
+ "response": result.response,
+ "duration_ms": result.duration_ms,
+ "num_turns": result.num_turns,
+ "session_id": result.session_id,
+ "model": result.model_name(),
+ })),
+ "failure": record.failure,
+ });
+ let full = json!({
+ "job": record,
+ });
+ let mut lines = vec![format!(
+ "job={} status={:?}",
+ record.job_id.display(),
+ record.status
+ )];
+ if let Some(result) = record.result.as_ref() {
+ lines.push(format!(
+ "result ready model={} turns={} duration={}",
+ result.model_name().unwrap_or_else(|| "unknown".to_owned()),
+ result.num_turns,
+ render_duration_ms(result.duration_ms)
+ ));
+ lines.push("response:".to_owned());
+ lines.push(result.response.clone());
+ }
+ if let Some(failure) = record.failure.as_ref() {
+ lines.push(format!("failure={} {}", failure.class, failure.detail));
+ }
+ fallback_detailed_tool_output(
+ &concise,
+ &full,
+ lines.join("\n"),
+ None,
+ SurfaceKind::Read,
+ generation,
+ stage,
+ operation,
+ )
+}
+
+fn background_consult_job_root() -> io::Result<PathBuf> {
+ let root = state_dir()
+ .map(|root| root.join("phone_opus"))
+ .or_else(|| home_dir().map(|home| home.join(".local").join("state").join("phone_opus")))
+ .ok_or_else(|| {
+ io::Error::new(
+ io::ErrorKind::NotFound,
+ "failed to resolve phone_opus state root",
+ )
+ })?;
+ let path = root.join("mcp").join("consult_jobs");
+ fs::create_dir_all(&path)?;
+ Ok(path)
+}
+
+fn background_consult_job_path(job_id: &BackgroundConsultJobId) -> io::Result<PathBuf> {
+ Ok(background_consult_job_root()?.join(format!("{}.json", job_id.display())))
+}
+
+fn persist_background_consult_job(record: &BackgroundConsultJobRecord) -> io::Result<()> {
+ let path = background_consult_job_path(&record.job_id)?;
+ persist_background_consult_job_to_path(&path, record)
+}
+
+fn persist_background_consult_job_to_path(
+ path: &Path,
+ record: &BackgroundConsultJobRecord,
+) -> io::Result<()> {
+ write_json_file(path, record)
+}
+
+fn load_background_consult_job(
+ job_id: &BackgroundConsultJobId,
+) -> io::Result<BackgroundConsultJobRecord> {
+ read_json_file(background_consult_job_path(job_id)?.as_path())
+}
+
+fn load_recent_background_consult_jobs(
+ limit: usize,
+) -> io::Result<Vec<BackgroundConsultJobRecord>> {
+ let mut jobs = fs::read_dir(background_consult_job_root()?)?
+ .filter_map(Result::ok)
+ .map(|entry| entry.path())
+ .filter(|path| path.extension().and_then(|ext| ext.to_str()) == Some("json"))
+ .filter_map(|path| read_json_file::<BackgroundConsultJobRecord>(&path).ok())
+ .collect::<Vec<_>>();
+ jobs.sort_by(|left, right| {
+ right
+ .updated_unix_ms
+ .cmp(&left.updated_unix_ms)
+ .then_with(|| left.job_id.cmp(&right.job_id))
+ });
+ jobs.truncate(limit);
+ Ok(jobs)
+}
+
+fn read_json_file<T: for<'de> Deserialize<'de>>(path: &Path) -> io::Result<T> {
+ let bytes = fs::read(path)?;
+ serde_json::from_slice(&bytes).map_err(|error| io::Error::other(error.to_string()))
+}
+
+fn write_json_file<T: Serialize>(path: &Path, value: &T) -> io::Result<()> {
+ let payload =
+ serde_json::to_vec_pretty(value).map_err(|error| io::Error::other(error.to_string()))?;
+ let temp_path = path.with_extension("json.tmp");
+ fs::write(&temp_path, payload)?;
+ fs::rename(temp_path, path)?;
+ Ok(())
+}
+
+fn unix_ms_now() -> u64 {
+ let duration = SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or(Duration::ZERO);
+ u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
+}
+
fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInvocationError> {
let mut command = Command::new(claude_binary());
let _ = command
@@ -452,6 +1004,7 @@ fn consult_output(
operation: &str,
) -> Result<ToolOutput, FaultRecord> {
let concise = json!({
+ "mode": request.mode().as_str(),
"response": response.result,
"cwd": response.cwd.display(),
"session_mode": request.session_mode(),
@@ -466,6 +1019,7 @@ fn consult_output(
"permission_denial_count": response.permission_denials.len(),
});
let full = json!({
+ "mode": request.mode().as_str(),
"response": response.result,
"cwd": response.cwd.display(),
"prompt": request.prompt.as_str(),