swarm repositories / source
aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormain <main@swarm.moe>2026-03-24 19:26:58 -0400
committermain <main@swarm.moe>2026-03-24 19:26:58 -0400
commit57db4dc94dbf571ac8a393f61549def5afaa0209 (patch)
treee625a2af169a7397c34339e6150fc7bee1f900a2
parent8b090c3d0daf8b336aab9074b0d8aa31a688e232 (diff)
downloadphone_opus-57db4dc94dbf571ac8a393f61549def5afaa0209.zip
Predeclare and stream consult session ids
-rw-r--r--README.md4
-rw-r--r--assets/codex-skills/phone-opus/SKILL.md4
-rw-r--r--crates/phone-opus/src/mcp/fault.rs11
-rw-r--r--crates/phone-opus/src/mcp/service.rs379
-rw-r--r--crates/phone-opus/tests/mcp_hardening.rs202
5 files changed, 503 insertions, 97 deletions
diff --git a/README.md b/README.md
index dfaba8c..c2aaefa 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,7 @@ It exposes one blocking domain tool:
- by default, phone_opus reuses the remembered Claude context for the consulted `cwd`
- pass `fresh_context: true` to opt out and start a fresh Claude context for that `cwd`
- a fixed consult prefix is prepended before the caller-supplied prompt
- - downstream failures still surface any reusable context handle for that `cwd`, so a post-reset retry can stick to the same session
+ - fresh consults predeclare a UUID session id and stream-confirm it eagerly, so downstream failures still surface a reusable context handle for that `cwd`
The server keeps the public MCP session in a durable host, isolates the actual
Claude invocation in a disposable worker, and ships standard health and
@@ -31,6 +31,8 @@ Each `consult` call runs Claude Code with:
- a read-only built-in toolset:
- `Bash,Read,Grep,Glob,LS,WebFetch`
- `--dangerously-skip-permissions`
+- `--session-id <uuid>` on fresh consults, or `--resume <uuid>` when reusing remembered `cwd` context
+- `--output-format stream-json` so phone_opus can capture the init/result session metadata eagerly instead of waiting for the terminal result blob
- an external `systemd-run --user` sandbox instead of Claude's internal permission gate
- the filesystem stays globally read-only under `ProtectSystem=strict`
- `phone_opus` gives Claude a separate persistent home and XDG state under its own state root
diff --git a/assets/codex-skills/phone-opus/SKILL.md b/assets/codex-skills/phone-opus/SKILL.md
index cf913a5..953a71b 100644
--- a/assets/codex-skills/phone-opus/SKILL.md
+++ b/assets/codex-skills/phone-opus/SKILL.md
@@ -35,9 +35,9 @@ should be taken as authoritative or final. It is a pure consultant.
- Web search is disabled; keep Opus focused on local inspection, reasoning, and any direct web fetches that are truly necessary.
- Previous consult outputs can be found in `/tmp/phone_opus-consults`.
- For related follow-ups on the same repository, keep using the same `cwd`; phone-opus will reuse that remembered context by default, which is much cheaper than cold-starting Opus.
-- If a consult fails downstream, inspect the returned error metadata; phone-opus still surfaces any reusable context handle for that `cwd`.
+- Fresh consults predeclare a UUID session id and stream-confirm it eagerly; if a consult fails downstream, inspect the returned error metadata for the reusable handle.
- This surface is consultative only. Edit tools are unavailable.
-- The returned `session_id` is still surfaced for traceability, but context reuse is now automatic per `cwd` instead of caller-managed.
+- The returned `session_id` is a resumable handle, but context reuse is automatic per `cwd` instead of caller-managed.
## Example
diff --git a/crates/phone-opus/src/mcp/fault.rs b/crates/phone-opus/src/mcp/fault.rs
index 4d438e3..c2d4a6c 100644
--- a/crates/phone-opus/src/mcp/fault.rs
+++ b/crates/phone-opus/src/mcp/fault.rs
@@ -14,10 +14,11 @@ pub(crate) struct FaultContext {
pub(crate) struct ConsultFaultContext {
pub(crate) cwd: String,
pub(crate) context_mode: String,
+ pub(crate) planned_session_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) reused_session_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
- pub(crate) downstream_session_id: Option<String>,
+ pub(crate) observed_session_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) resume_session_id: Option<String>,
#[serde(default, skip_serializing_if = "is_false")]
@@ -202,12 +203,16 @@ impl FaultRecord {
let mut fields: BTreeMap<String, String> = BTreeMap::from([
("cwd".to_owned(), consult.cwd.clone()),
("context_mode".to_owned(), consult.context_mode.clone()),
+ (
+ "planned_session".to_owned(),
+ consult.planned_session_id.clone(),
+ ),
]);
if let Some(session_id) = consult.reused_session_id.as_ref() {
let _ = fields.insert("reused_session".to_owned(), session_id.clone());
}
- if let Some(session_id) = consult.downstream_session_id.as_ref() {
- let _ = fields.insert("downstream_session".to_owned(), session_id.clone());
+ if let Some(session_id) = consult.observed_session_id.as_ref() {
+ let _ = fields.insert("observed_session".to_owned(), session_id.clone());
}
if let Some(session_id) = consult.resume_session_id.as_ref() {
let _ = fields.insert("resume_session".to_owned(), session_id.clone());
diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs
index c5c2d66..39cc825 100644
--- a/crates/phone-opus/src/mcp/service.rs
+++ b/crates/phone-opus/src/mcp/service.rs
@@ -1,10 +1,11 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fs;
-use std::io::{self, BufRead, Write};
+use std::io::{self, BufRead, Read, Write};
#[cfg(unix)]
use std::os::unix::fs::symlink;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
+use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use dirs::{home_dir, state_dir};
@@ -145,7 +146,7 @@ struct ConsultRequest {
cwd: WorkingDirectory,
context_key: ConsultContextKey,
fresh_context: bool,
- session: Option<SessionHandle>,
+ session_plan: ConsultSessionPlan,
}
impl ConsultRequest {
@@ -154,34 +155,70 @@ impl ConsultRequest {
let cwd = WorkingDirectory::resolve(args.cwd)?;
let context_key = ConsultContextKey::from_cwd(&cwd);
let fresh_context = args.fresh_context.unwrap_or(false);
+ let session_plan = if fresh_context {
+ ConsultSessionPlan::fresh()
+ } else {
+ load_consult_context(&context_key)
+ .map_err(|source| ConsultRequestError::ContextIndex { source })?
+ .and_then(ConsultSessionPlan::from_stored)
+ .unwrap_or_else(ConsultSessionPlan::fresh)
+ };
Ok(Self {
prompt,
cwd,
- session: if fresh_context {
- None
- } else {
- load_consult_context(&context_key)
- .map_err(|source| ConsultRequestError::ContextIndex { source })?
- },
context_key,
fresh_context,
+ session_plan,
})
}
fn context_mode(&self) -> &'static str {
- if self.session.is_some() {
- "reused"
- } else {
- "fresh"
- }
+ self.session_plan.context_mode()
}
fn reused_session_id(&self) -> Option<String> {
- self.session.as_ref().map(SessionHandle::display)
+ self.session_plan.reused_session_id()
+ }
+
+ fn planned_session_id(&self) -> String {
+ self.session_plan.planned_session().display()
+ }
+
+ fn launch_resume_session(&self) -> Option<String> {
+ self.session_plan
+ .resume_session()
+ .map(SessionHandle::display)
+ }
+
+ fn launch_session_id(&self) -> Option<String> {
+ match self.session_plan {
+ ConsultSessionPlan::Start { .. } => Some(self.planned_session_id()),
+ ConsultSessionPlan::Resume(_) => None,
+ }
}
fn remember_context(&self, session_id: Option<&str>) -> io::Result<()> {
- remember_consult_context(&self.context_key, session_id)
+ confirm_consult_context(
+ &self.context_key,
+ session_id,
+ self.session_plan.planned_session(),
+ )
+ }
+
+ fn remember_planned_context(&self) -> io::Result<()> {
+ if let ConsultSessionPlan::Start { session, .. } = &self.session_plan {
+ remember_planned_consult_context(&self.context_key, session)
+ } else {
+ Ok(())
+ }
+ }
+
+ fn current_context_session_id(&self) -> Option<String> {
+ load_consult_context(&self.context_key)
+ .ok()
+ .flatten()
+ .and_then(ConsultSessionPlan::from_stored)
+ .map(|plan| plan.planned_session().display())
}
#[allow(dead_code, reason = "background submission is parked but not exposed")]
@@ -261,6 +298,10 @@ impl WorkingDirectory {
struct SessionHandle(Uuid);
impl SessionHandle {
+ fn fresh() -> Self {
+ Self(Uuid::new_v4())
+ }
+
fn parse(raw: &str) -> Option<Self> {
Uuid::parse_str(raw).ok().map(Self)
}
@@ -286,32 +327,110 @@ impl ConsultContextKey {
#[derive(Debug, Clone, Deserialize, Serialize)]
struct StoredConsultContext {
session_id: String,
+ #[serde(default = "default_consult_context_state")]
+ state: StoredConsultContextState,
updated_unix_ms: u64,
}
+#[derive(Debug, Clone, Copy, Default, Deserialize, Eq, PartialEq, Serialize)]
+#[serde(rename_all = "snake_case")]
+enum StoredConsultContextState {
+ Planned,
+ #[default]
+ Confirmed,
+}
+
+const fn default_consult_context_state() -> StoredConsultContextState {
+ StoredConsultContextState::Confirmed
+}
+
#[derive(Debug, Default, Deserialize, Serialize)]
struct ConsultContextIndex {
by_cwd: BTreeMap<String, StoredConsultContext>,
}
impl ConsultContextIndex {
- fn session_for(&self, key: &ConsultContextKey) -> Option<SessionHandle> {
- self.by_cwd
- .get(key.as_str())
- .and_then(|entry| SessionHandle::parse(entry.session_id.as_str()))
+ fn context_for(&self, key: &ConsultContextKey) -> Option<StoredConsultContext> {
+ self.by_cwd.get(key.as_str()).cloned()
}
- fn remember(&mut self, key: &ConsultContextKey, session: &SessionHandle) {
+ fn remember(
+ &mut self,
+ key: &ConsultContextKey,
+ session: &SessionHandle,
+ state: StoredConsultContextState,
+ ) {
let _ = self.by_cwd.insert(
key.as_str().to_owned(),
StoredConsultContext {
session_id: session.display(),
+ state,
updated_unix_ms: unix_ms_now(),
},
);
}
}
+#[derive(Debug, Clone)]
+enum ConsultSessionPlan {
+ Start {
+ session: SessionHandle,
+ reused: bool,
+ },
+ Resume(SessionHandle),
+}
+
+impl ConsultSessionPlan {
+ fn fresh() -> Self {
+ Self::Start {
+ session: SessionHandle::fresh(),
+ reused: false,
+ }
+ }
+
+ fn from_stored(context: StoredConsultContext) -> Option<Self> {
+ let session = SessionHandle::parse(context.session_id.as_str())?;
+ Some(match context.state {
+ StoredConsultContextState::Planned => Self::Start {
+ session,
+ reused: true,
+ },
+ StoredConsultContextState::Confirmed => Self::Resume(session),
+ })
+ }
+
+ fn planned_session(&self) -> &SessionHandle {
+ match self {
+ Self::Start { session, .. } | Self::Resume(session) => session,
+ }
+ }
+
+ fn resume_session(&self) -> Option<&SessionHandle> {
+ match self {
+ Self::Resume(session) => Some(session),
+ Self::Start { .. } => None,
+ }
+ }
+
+ fn context_mode(&self) -> &'static str {
+ match self {
+ Self::Start { reused: false, .. } => "fresh",
+ Self::Start { reused: true, .. } | Self::Resume(_) => "reused",
+ }
+ }
+
+ fn reused_session_id(&self) -> Option<String> {
+ match self {
+ Self::Start {
+ reused: true,
+ session,
+ }
+ | Self::Resume(session) => Some(session.display()),
+ Self::Start { reused: false, .. } => None,
+ }
+ }
+}
+
#[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)]
struct BackgroundConsultRequest {
prompt: String,
@@ -565,18 +684,27 @@ struct ClaudeJsonEnvelope {
uuid: Option<String>,
}
+#[derive(Debug, Default)]
+struct ClaudeStreamCapture {
+ stdout: String,
+ observed_session_id: Option<String>,
+ final_envelope: Option<ClaudeJsonEnvelope>,
+}
+
#[derive(Debug)]
struct ConsultResponse {
cwd: WorkingDirectory,
result: String,
persisted_output_path: PersistedConsultPath,
context_mode: &'static str,
+ planned_session_id: String,
reused_session_id: Option<String>,
duration_ms: u64,
duration_api_ms: Option<u64>,
num_turns: u64,
stop_reason: Option<String>,
session_id: Option<String>,
+ observed_session_id: Option<String>,
total_cost_usd: Option<f64>,
usage: Option<Value>,
model_usage: Option<Value>,
@@ -870,10 +998,15 @@ fn consult_fault_context(request: &ConsultRequest, error: &ConsultInvocationErro
| ConsultInvocationError::Downstream(detail) => Some(detail.as_str()),
};
let reused_session_id = request.reused_session_id();
- let downstream_session_id = detail.and_then(downstream_session_id);
- let resume_session_id = downstream_session_id
+ let planned_session_id = request.planned_session_id();
+ let observed_session_id = detail
+ .and_then(downstream_session_id)
+ .clone()
+ .or_else(|| request.current_context_session_id());
+ let resume_session_id = observed_session_id
.clone()
- .or_else(|| reused_session_id.clone());
+ .or_else(|| reused_session_id.clone())
+ .or_else(|| Some(planned_session_id.clone()));
let quota_reset_hint = detail.and_then(quota_reset_hint);
let quota_limited = quota_reset_hint.is_some();
let retry_hint = consult_retry_hint(quota_limited, resume_session_id.as_deref());
@@ -881,8 +1014,9 @@ fn consult_fault_context(request: &ConsultRequest, error: &ConsultInvocationErro
consult: Some(ConsultFaultContext {
cwd: request.cwd.display(),
context_mode: request.context_mode().to_owned(),
+ planned_session_id,
reused_session_id,
- downstream_session_id,
+ observed_session_id,
resume_session_id,
quota_limited,
quota_reset_hint,
@@ -1147,7 +1281,7 @@ fn wait_for_background_consult(
}
let remaining_ms = request.config.timeout_ms.saturating_sub(waited_ms);
let sleep_ms = remaining_ms.min(request.config.poll_interval_ms);
- std::thread::sleep(Duration::from_millis(sleep_ms));
+ thread::sleep(Duration::from_millis(sleep_ms));
}
}
@@ -1332,7 +1466,7 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
.arg(claude_binary())
.arg("-p")
.arg("--output-format")
- .arg("json")
+ .arg("stream-json")
.arg("--strict-mcp-config")
.arg("--mcp-config")
.arg(EMPTY_MCP_CONFIG)
@@ -1344,28 +1478,59 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
.arg(CLAUDE_EFFORT)
.arg("--tools")
.arg(CLAUDE_TOOLSET)
- .arg("--dangerously-skip-permissions");
- if let Some(session) = request.session.as_ref() {
- let _ = command.arg("--resume").arg(session.display());
+ .arg("--dangerously-skip-permissions")
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped());
+ if let Some(session_id) = request.launch_session_id() {
+ let _ = command.arg("--session-id").arg(session_id);
}
- let output = command
+ if let Some(session_id) = request.launch_resume_session() {
+ let _ = command.arg("--resume").arg(session_id);
+ }
+ let mut child = command
.arg(request.prompt.rendered())
- .output()
+ .spawn()
.map_err(ConsultInvocationError::Spawn)?;
- let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned();
- let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned();
- let envelope = match serde_json::from_slice::<ClaudeJsonEnvelope>(&output.stdout) {
- Ok(envelope) => envelope,
- Err(_error) if !output.status.success() => {
+ request
+ .remember_planned_context()
+ .map_err(ConsultInvocationError::Spawn)?;
+ let stdout = child
+ .stdout
+ .take()
+ .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stdout")))?;
+ let stderr = child
+ .stderr
+ .take()
+ .ok_or_else(|| ConsultInvocationError::Spawn(io::Error::other("missing Claude stderr")))?;
+ let stderr_reader = thread::spawn(move || -> io::Result<String> {
+ let mut stderr = stderr;
+ let mut buffer = String::new();
+ let _ = stderr.read_to_string(&mut buffer)?;
+ Ok(buffer)
+ });
+ let capture = capture_claude_stream(stdout, request)?;
+ let status = child.wait().map_err(ConsultInvocationError::Spawn)?;
+ let stderr = stderr_reader
+ .join()
+ .map_err(|_| {
+ ConsultInvocationError::Spawn(io::Error::other("Claude stderr reader panicked"))
+ })?
+ .map_err(ConsultInvocationError::Spawn)?
+ .trim()
+ .to_owned();
+ let stdout = capture.stdout.trim().to_owned();
+ let envelope = match capture.final_envelope {
+ Some(envelope) => envelope,
+ None if !status.success() => {
return Err(ConsultInvocationError::Downstream(downstream_message(
- output.status.code(),
+ status.code(),
&stdout,
&stderr,
)));
}
- Err(error) => {
+ None => {
return Err(ConsultInvocationError::InvalidJson(format!(
- "{error}; stdout={stdout}; stderr={stderr}"
+ "missing Claude result envelope; stdout={stdout}; stderr={stderr}"
)));
}
};
@@ -1375,34 +1540,46 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
envelope.envelope_type
)));
}
- if !output.status.success()
- || envelope.is_error
- || envelope.subtype.as_deref() != Some("success")
- {
+ let observed_session_id = envelope
+ .session_id
+ .clone()
+ .or(capture.observed_session_id.clone());
+ if !status.success() || envelope.is_error || envelope.subtype.as_deref() != Some("success") {
return Err(ConsultInvocationError::Downstream(
envelope
.result
.filter(|value| !value.trim().is_empty())
- .unwrap_or_else(|| downstream_message(output.status.code(), &stdout, &stderr)),
+ .unwrap_or_else(|| downstream_message(status.code(), &stdout, &stderr)),
));
}
let result = envelope.result.clone().unwrap_or_default();
+ let canonical_session_id = observed_session_id
+ .clone()
+ .or_else(|| Some(request.planned_session_id()));
request
- .remember_context(envelope.session_id.as_deref())
- .map_err(ConsultInvocationError::Spawn)?;
- let persisted_output_path = persist_consult_output(request, &result, &envelope)
+ .remember_context(canonical_session_id.as_deref())
.map_err(ConsultInvocationError::Spawn)?;
+ let persisted_output_path = persist_consult_output(
+ request,
+ &result,
+ &envelope,
+ canonical_session_id.as_deref(),
+ observed_session_id.as_deref(),
+ )
+ .map_err(ConsultInvocationError::Spawn)?;
Ok(ConsultResponse {
cwd: request.cwd.clone(),
result,
persisted_output_path,
context_mode: request.context_mode(),
+ planned_session_id: request.planned_session_id(),
reused_session_id: request.reused_session_id(),
duration_ms: envelope.duration_ms.unwrap_or(0),
duration_api_ms: envelope.duration_api_ms,
num_turns: envelope.num_turns.unwrap_or(0),
stop_reason: envelope.stop_reason,
- session_id: envelope.session_id,
+ session_id: canonical_session_id,
+ observed_session_id,
total_cost_usd: envelope.total_cost_usd,
usage: envelope.usage,
model_usage: envelope.model_usage,
@@ -1412,6 +1589,65 @@ fn invoke_claude(request: &ConsultRequest) -> Result<ConsultResponse, ConsultInv
})
}
+fn capture_claude_stream(
+ stdout: impl Read,
+ request: &ConsultRequest,
+) -> Result<ClaudeStreamCapture, ConsultInvocationError> {
+ let mut capture = ClaudeStreamCapture::default();
+ let reader = io::BufReader::new(stdout);
+ for line in reader.lines() {
+ let line = line.map_err(ConsultInvocationError::Spawn)?;
+ let trimmed = line.trim();
+ if trimmed.is_empty() {
+ continue;
+ }
+ capture.stdout.push_str(trimmed);
+ capture.stdout.push('\n');
+ let Ok(value) = serde_json::from_str::<Value>(trimmed) else {
+ continue;
+ };
+ if capture.observed_session_id.is_none()
+ && let Some(session_id) = stream_session_id(&value)
+ {
+ request
+ .remember_context(Some(session_id.as_str()))
+ .map_err(ConsultInvocationError::Spawn)?;
+ capture.observed_session_id = Some(session_id);
+ }
+ if value.get("type").and_then(Value::as_str) == Some("result") {
+ let envelope =
+ serde_json::from_value::<ClaudeJsonEnvelope>(value).map_err(|error| {
+ ConsultInvocationError::InvalidJson(format!("{error}; stream_line={trimmed}"))
+ })?;
+ if let Some(session_id) = envelope.session_id.as_deref() {
+ request
+ .remember_context(Some(session_id))
+ .map_err(ConsultInvocationError::Spawn)?;
+ capture.observed_session_id = Some(session_id.to_owned());
+ }
+ capture.final_envelope = Some(envelope);
+ }
+ }
+ Ok(capture)
+}
+
+fn stream_session_id(value: &Value) -> Option<String> {
+ match value {
+ Value::Object(object) => object
+ .iter()
+ .find_map(|(key, value)| {
+ ((key == "session_id") || (key == "sessionId"))
+ .then_some(value)
+ .and_then(Value::as_str)
+ .and_then(SessionHandle::parse)
+ .map(|session| session.display())
+ })
+ .or_else(|| object.values().find_map(stream_session_id)),
+ Value::Array(array) => array.iter().find_map(stream_session_id),
+ _ => None,
+ }
+}
+
fn downstream_message(status_code: Option<i32>, stdout: &str, stderr: &str) -> String {
if !stderr.is_empty() {
return stderr.to_owned();
@@ -1460,16 +1696,29 @@ fn load_consult_context_index() -> io::Result<ConsultContextIndex> {
}
}
-fn load_consult_context(key: &ConsultContextKey) -> io::Result<Option<SessionHandle>> {
- Ok(load_consult_context_index()?.session_for(key))
+fn load_consult_context(key: &ConsultContextKey) -> io::Result<Option<StoredConsultContext>> {
+ Ok(load_consult_context_index()?.context_for(key))
}
-fn remember_consult_context(key: &ConsultContextKey, session_id: Option<&str>) -> io::Result<()> {
- let Some(session_id) = session_id.and_then(SessionHandle::parse) else {
- return Ok(());
- };
+fn remember_planned_consult_context(
+ key: &ConsultContextKey,
+ session_id: &SessionHandle,
+) -> io::Result<()> {
let mut index = load_consult_context_index()?;
- index.remember(key, &session_id);
+ index.remember(key, session_id, StoredConsultContextState::Planned);
+ write_json_file(consult_context_index_path()?.as_path(), &index)
+}
+
+fn confirm_consult_context(
+ key: &ConsultContextKey,
+ observed_session_id: Option<&str>,
+ fallback_session_id: &SessionHandle,
+) -> io::Result<()> {
+ let session_id = observed_session_id
+ .and_then(SessionHandle::parse)
+ .unwrap_or_else(|| fallback_session_id.clone());
+ let mut index = load_consult_context_index()?;
+ index.remember(key, &session_id, StoredConsultContextState::Confirmed);
write_json_file(consult_context_index_path()?.as_path(), &index)
}
@@ -1537,8 +1786,10 @@ fn persist_consult_output(
request: &ConsultRequest,
result: &str,
envelope: &ClaudeJsonEnvelope,
+ session_id: Option<&str>,
+ observed_session_id: Option<&str>,
) -> io::Result<PersistedConsultPath> {
- let path = PersistedConsultPath::new(request, envelope.session_id.as_deref())?;
+ let path = PersistedConsultPath::new(request, session_id)?;
let saved_at = OffsetDateTime::now_utc()
.format(&Rfc3339)
.map_err(|error| io::Error::other(error.to_string()))?;
@@ -1551,6 +1802,7 @@ fn persist_consult_output(
"prompt_prefix": CLAUDE_CONSULT_PREFIX,
"effective_prompt": request.prompt.rendered(),
"context_mode": request.context_mode(),
+ "planned_session_id": request.planned_session_id(),
"reused_session_id": request.reused_session_id(),
"response": result,
"model": model_name(envelope.model_usage.as_ref()),
@@ -1558,7 +1810,8 @@ fn persist_consult_output(
"duration_api_ms": envelope.duration_api_ms,
"num_turns": envelope.num_turns.unwrap_or(0),
"stop_reason": envelope.stop_reason,
- "session_id": envelope.session_id,
+ "session_id": session_id,
+ "observed_session_id": observed_session_id,
"total_cost_usd": envelope.total_cost_usd,
"usage": envelope.usage,
"model_usage": envelope.model_usage,
@@ -1650,6 +1903,7 @@ fn consult_output(
"cwd": response.cwd.display(),
"persisted_output_path": response.persisted_output_path.display(),
"context_mode": response.context_mode,
+ "planned_session_id": response.planned_session_id,
"reused_session_id": response.reused_session_id,
"prompt_prefix_injected": true,
"model": response.model_name(),
@@ -1657,6 +1911,7 @@ fn consult_output(
"num_turns": response.num_turns,
"stop_reason": response.stop_reason,
"session_id": response.session_id,
+ "observed_session_id": response.observed_session_id,
"total_cost_usd": response.total_cost_usd,
"permission_denial_count": response.permission_denials.len(),
});
@@ -1668,12 +1923,14 @@ fn consult_output(
"prompt_prefix": CLAUDE_CONSULT_PREFIX,
"effective_prompt": request.prompt.rendered(),
"context_mode": response.context_mode,
+ "planned_session_id": response.planned_session_id,
"reused_session_id": response.reused_session_id,
"duration_ms": response.duration_ms,
"duration_api_ms": response.duration_api_ms,
"num_turns": response.num_turns,
"stop_reason": response.stop_reason,
"session_id": response.session_id,
+ "observed_session_id": response.observed_session_id,
"total_cost_usd": response.total_cost_usd,
"usage": response.usage,
"model_usage": response.model_usage,
@@ -1712,9 +1969,13 @@ fn concise_text(_request: &ConsultRequest, response: &ConsultResponse) -> String
let mut lines = vec![status.join(" ")];
lines.push(format!("cwd: {}", response.cwd.display()));
+ lines.push(format!("planned_session: {}", response.planned_session_id));
if let Some(session_id) = response.reused_session_id.as_deref() {
lines.push(format!("reused_session: {session_id}"));
}
+ if let Some(session_id) = response.observed_session_id.as_deref() {
+ lines.push(format!("observed_session: {session_id}"));
+ }
if let Some(session_id) = response.session_id.as_deref() {
lines.push(format!("session: {session_id}"));
}
@@ -1740,11 +2001,15 @@ fn full_text(_request: &ConsultRequest, response: &ConsultResponse) -> String {
response.context_mode, response.num_turns
),
format!("cwd: {}", response.cwd.display()),
+ format!("planned_session: {}", response.planned_session_id),
format!("duration: {}", render_duration_ms(response.duration_ms)),
];
if let Some(session_id) = response.reused_session_id.as_deref() {
lines.push(format!("reused_session: {session_id}"));
}
+ if let Some(session_id) = response.observed_session_id.as_deref() {
+ lines.push(format!("observed_session: {session_id}"));
+ }
if let Some(duration_api_ms) = response.duration_api_ms {
lines.push(format!(
"api_duration: {}",
diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs
index e9a664b..0d53c33 100644
--- a/crates/phone-opus/tests/mcp_hardening.rs
+++ b/crates/phone-opus/tests/mcp_hardening.rs
@@ -280,37 +280,62 @@ fn seed_caller_claude_home(home: &Path) -> TestResult {
Ok(())
}
-fn write_fake_claude_stdout(path: &Path, result: &str, session_id: &str, uuid: &str) -> TestResult {
+fn write_fake_claude_stream_success(
+ path: &Path,
+ result: &str,
+ session_id: &str,
+ uuid: &str,
+) -> TestResult {
+ let payload = [
+ serde_json::to_string(&json!({
+ "type": "system",
+ "subtype": "init",
+ "session_id": session_id,
+ }))?,
+ serde_json::to_string(&json!({
+ "type": "result",
+ "subtype": "success",
+ "is_error": false,
+ "duration_ms": 1234,
+ "duration_api_ms": 1200,
+ "num_turns": 2,
+ "result": result,
+ "stop_reason": "end_turn",
+ "session_id": session_id,
+ "total_cost_usd": 0.125,
+ "usage": {
+ "input_tokens": 10,
+ "output_tokens": 5
+ },
+ "modelUsage": {
+ "claude-opus-4-6": {
+ "inputTokens": 10,
+ "outputTokens": 5
+ }
+ },
+ "permission_denials": [],
+ "fast_mode_state": "off",
+ "uuid": uuid
+ }))?,
+ ]
+ .join("\n");
+ must(fs::write(path, format!("{payload}\n")), "write fake stdout")
+}
+
+fn write_fake_claude_stream_init(path: &Path, session_id: &str) -> TestResult {
must(
fs::write(
path,
- serde_json::to_string(&json!({
- "type": "result",
- "subtype": "success",
- "is_error": false,
- "duration_ms": 1234,
- "duration_api_ms": 1200,
- "num_turns": 2,
- "result": result,
- "stop_reason": "end_turn",
- "session_id": session_id,
- "total_cost_usd": 0.125,
- "usage": {
- "input_tokens": 10,
- "output_tokens": 5
- },
- "modelUsage": {
- "claude-opus-4-6": {
- "inputTokens": 10,
- "outputTokens": 5
- }
- },
- "permission_denials": [],
- "fast_mode_state": "off",
- "uuid": uuid
- }))?,
+ format!(
+ "{}\n",
+ serde_json::to_string(&json!({
+ "type": "system",
+ "subtype": "init",
+ "session_id": session_id,
+ }))?
+ ),
),
- "write fake stdout",
+ "write fake init stream",
)
}
@@ -392,7 +417,7 @@ fn consult_reuses_context_per_cwd_by_default_and_fresh_context_opts_out() -> Tes
let fresh_session = "dbd3b6c2-4757-4b45-a8f0-f3d877e1a13f";
let sibling_session = "d9a9a472-a091-4268-a7dd-9f31cf61f87e";
write_fake_claude_script(&fake_claude)?;
- write_fake_claude_stdout(&stdout_file, "oracle", remembered_session, "uuid-123")?;
+ write_fake_claude_stream_success(&stdout_file, "oracle", remembered_session, "uuid-123")?;
let claude_bin = fake_claude.display().to_string();
let stdout_path = stdout_file.display().to_string();
@@ -448,16 +473,31 @@ fn consult_reuses_context_per_cwd_by_default_and_fresh_context_opts_out() -> Tes
tool_content(&consult)["context_mode"].as_str(),
Some("fresh")
);
+ assert!(
+ tool_content(&consult)["planned_session_id"]
+ .as_str()
+ .is_some_and(|value| !value.is_empty())
+ );
assert!(tool_content(&consult)["reused_session_id"].is_null());
assert_eq!(
+ tool_content(&consult)["observed_session_id"].as_str(),
+ Some(remembered_session)
+ );
+ assert_eq!(
tool_content(&consult)["session_id"].as_str(),
Some(remembered_session)
);
let first_args = must(fs::read_to_string(&args_file), "read first fake args file")?;
+ assert!(first_args.contains("--session-id"));
+ assert!(
+ tool_content(&consult)["planned_session_id"]
+ .as_str()
+ .is_some_and(|value| first_args.contains(value))
+ );
assert!(!first_args.contains("--resume"));
assert!(!first_args.contains("not-a-uuid"));
- write_fake_claude_stdout(
+ write_fake_claude_stream_success(
&stdout_file,
"oracle reused",
remembered_session,
@@ -488,7 +528,7 @@ fn consult_reuses_context_per_cwd_by_default_and_fresh_context_opts_out() -> Tes
assert!(reused_args.contains("--resume"));
assert!(reused_args.contains(remembered_session));
- write_fake_claude_stdout(&stdout_file, "oracle fresh", fresh_session, "uuid-125")?;
+ write_fake_claude_stream_success(&stdout_file, "oracle fresh", fresh_session, "uuid-125")?;
let fresh = harness.call_tool(
5,
"consult",
@@ -508,7 +548,7 @@ fn consult_reuses_context_per_cwd_by_default_and_fresh_context_opts_out() -> Tes
let fresh_args = must(fs::read_to_string(&args_file), "read fresh fake args file")?;
assert!(!fresh_args.contains("--resume"));
- write_fake_claude_stdout(
+ write_fake_claude_stream_success(
&stdout_file,
"oracle after fresh",
fresh_session,
@@ -538,7 +578,7 @@ fn consult_reuses_context_per_cwd_by_default_and_fresh_context_opts_out() -> Tes
assert!(after_fresh_args.contains("--resume"));
assert!(after_fresh_args.contains(fresh_session));
- write_fake_claude_stdout(&stdout_file, "oracle sibling", sibling_session, "uuid-127")?;
+ write_fake_claude_stream_success(&stdout_file, "oracle sibling", sibling_session, "uuid-127")?;
let sibling = harness.call_tool(
7,
"consult",
@@ -592,7 +632,7 @@ fn consult_reuses_context_per_cwd_by_default_and_fresh_context_opts_out() -> Tes
let lines = args.lines().collect::<Vec<_>>();
assert!(lines.contains(&"-p"));
assert!(lines.contains(&"--output-format"));
- assert!(lines.contains(&"json"));
+ assert!(lines.contains(&"stream-json"));
assert!(lines.contains(&"--strict-mcp-config"));
assert!(lines.contains(&"--mcp-config"));
assert!(lines.contains(&"{\"mcpServers\":{}}"));
@@ -605,6 +645,7 @@ fn consult_reuses_context_per_cwd_by_default_and_fresh_context_opts_out() -> Tes
assert!(lines.contains(&"--tools"));
assert!(lines.contains(&"Bash,Read,Grep,Glob,LS,WebFetch"));
assert!(lines.contains(&"--dangerously-skip-permissions"));
+ assert!(lines.contains(&"--session-id"));
assert!(!lines.contains(&"--permission-mode"));
assert!(!lines.contains(&"dontAsk"));
assert!(!lines.contains(&"--resume"));
@@ -811,7 +852,7 @@ fn quota_failures_surface_resume_context_for_same_cwd() -> TestResult {
let stdout_file = root.join("stdout.json");
let remembered_session = "84b9d462-5af9-4a4e-8e44-379a8d0c46d7";
write_fake_claude_script(&fake_claude)?;
- write_fake_claude_stdout(&stdout_file, "ok", remembered_session, "uuid-remembered")?;
+ write_fake_claude_stream_success(&stdout_file, "ok", remembered_session, "uuid-remembered")?;
let claude_bin = fake_claude.display().to_string();
let stdout_path = stdout_file.display().to_string();
@@ -875,10 +916,18 @@ fn quota_failures_surface_resume_context_for_same_cwd() -> TestResult {
Some("reused")
);
assert_eq!(
+ tool_content(&failed)["context"]["consult"]["planned_session_id"].as_str(),
+ Some(remembered_session)
+ );
+ assert_eq!(
tool_content(&failed)["context"]["consult"]["reused_session_id"].as_str(),
Some(remembered_session)
);
assert_eq!(
+ tool_content(&failed)["context"]["consult"]["observed_session_id"].as_str(),
+ Some(remembered_session)
+ );
+ assert_eq!(
tool_content(&failed)["context"]["consult"]["resume_session_id"].as_str(),
Some(remembered_session)
);
@@ -910,6 +959,91 @@ fn quota_failures_surface_resume_context_for_same_cwd() -> TestResult {
}
#[test]
+fn fresh_failures_capture_streamed_session_ids_eagerly() -> TestResult {
+ let root = temp_root("consult_fresh_stream_failure")?;
+ let state_home = root.join("state-home");
+ let sandbox = root.join("sandbox");
+ let caller_home = root.join("caller-home");
+ let fake_claude = root.join("claude");
+ let stdout_file = root.join("stdout.json");
+ let args_file = root.join("args.txt");
+ let init_session = "550e8400-e29b-41d4-a716-446655440000";
+ must(fs::create_dir_all(&state_home), "create state home")?;
+ must(fs::create_dir_all(&sandbox), "create sandbox")?;
+ must(fs::create_dir_all(&caller_home), "create caller home")?;
+ seed_caller_claude_home(&caller_home)?;
+ write_fake_claude_script(&fake_claude)?;
+ write_fake_claude_stream_init(&stdout_file, init_session)?;
+
+ let claude_bin = fake_claude.display().to_string();
+ let stdout_path = stdout_file.display().to_string();
+ let args_path = args_file.display().to_string();
+ let caller_home_path = caller_home.display().to_string();
+ let env = [
+ ("HOME", caller_home_path.as_str()),
+ ("PHONE_OPUS_CLAUDE_BIN", claude_bin.as_str()),
+ ("PHONE_OPUS_TEST_STDOUT_FILE", stdout_path.as_str()),
+ ("PHONE_OPUS_TEST_ARGS_FILE", args_path.as_str()),
+ ("PHONE_OPUS_TEST_EXIT_CODE", "17"),
+ (
+ "PHONE_OPUS_TEST_STDERR",
+ "You've hit your limit ยท resets 9pm (America/New_York)",
+ ),
+ ];
+ let mut harness = McpHarness::spawn(&state_home, &env)?;
+ let _ = harness.initialize()?;
+ harness.notify_initialized()?;
+
+ let failed = harness.call_tool(
+ 3,
+ "consult",
+ json!({
+ "prompt": "fresh expensive audit",
+ "cwd": sandbox.display().to_string()
+ }),
+ )?;
+ assert_tool_error(&failed);
+ assert_eq!(
+ tool_content(&failed)["context"]["consult"]["context_mode"].as_str(),
+ Some("fresh")
+ );
+ assert_eq!(
+ tool_content(&failed)["context"]["consult"]["observed_session_id"].as_str(),
+ Some(init_session)
+ );
+ assert_eq!(
+ tool_content(&failed)["context"]["consult"]["resume_session_id"].as_str(),
+ Some(init_session)
+ );
+ assert_eq!(
+ tool_content(&failed)["context"]["consult"]["quota_reset_hint"].as_str(),
+ Some("9pm (America/New_York)")
+ );
+ let planned_session = must_some(
+ tool_content(&failed)["context"]["consult"]["planned_session_id"]
+ .as_str()
+ .map(str::to_owned),
+ "planned session id on failure",
+ )?;
+ let args = must(fs::read_to_string(&args_file), "read fresh failure args")?;
+ assert!(args.contains("--session-id"));
+ assert!(args.contains(&planned_session));
+ assert!(!args.contains("--resume"));
+ assert!(
+ failed["result"]["content"]
+ .as_array()
+ .into_iter()
+ .flatten()
+ .filter_map(|entry| entry["text"].as_str())
+ .any(|text| {
+ text.contains("observed_session: 550e8400-e29b-41d4-a716-446655440000")
+ && text.contains("resume_session: 550e8400-e29b-41d4-a716-446655440000")
+ })
+ );
+ Ok(())
+}
+
+#[test]
fn consult_never_replays_after_worker_transport_failure() -> TestResult {
let root = temp_root("consult_no_replay")?;
let state_home = root.join("state-home");