From d986442e8e4bc2d716c9d63159a1cfa7b1e6ed76 Mon Sep 17 00:00:00 2001 From: main Date: Sun, 22 Mar 2026 22:20:17 -0400 Subject: Bootstrap consultative Claude Code MCP --- .gitignore | 2 + Cargo.lock | 809 ++++++++++++++++++++++++++++++ Cargo.toml | 116 +++++ README.md | 43 ++ check.py | 177 +++++++ clippy.toml | 6 + crates/phone-opus/Cargo.toml | 25 + crates/phone-opus/src/main.rs | 51 ++ crates/phone-opus/src/mcp/catalog.rs | 107 ++++ crates/phone-opus/src/mcp/fault.rs | 214 ++++++++ crates/phone-opus/src/mcp/host/binary.rs | 41 ++ crates/phone-opus/src/mcp/host/mod.rs | 3 + crates/phone-opus/src/mcp/host/process.rs | 223 ++++++++ crates/phone-opus/src/mcp/host/runtime.rs | 779 ++++++++++++++++++++++++++++ crates/phone-opus/src/mcp/mod.rs | 10 + crates/phone-opus/src/mcp/output.rs | 195 +++++++ crates/phone-opus/src/mcp/protocol.rs | 74 +++ crates/phone-opus/src/mcp/service.rs | 550 ++++++++++++++++++++ crates/phone-opus/src/mcp/telemetry.rs | 228 +++++++++ crates/phone-opus/tests/mcp_hardening.rs | 422 ++++++++++++++++ rust-toolchain.toml | 5 + 21 files changed, 4080 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 README.md create mode 100644 check.py create mode 100644 clippy.toml create mode 100644 crates/phone-opus/Cargo.toml create mode 100644 crates/phone-opus/src/main.rs create mode 100644 crates/phone-opus/src/mcp/catalog.rs create mode 100644 crates/phone-opus/src/mcp/fault.rs create mode 100644 crates/phone-opus/src/mcp/host/binary.rs create mode 100644 crates/phone-opus/src/mcp/host/mod.rs create mode 100644 crates/phone-opus/src/mcp/host/process.rs create mode 100644 crates/phone-opus/src/mcp/host/runtime.rs create mode 100644 crates/phone-opus/src/mcp/mod.rs create mode 100644 crates/phone-opus/src/mcp/output.rs create mode 100644 crates/phone-opus/src/mcp/protocol.rs create mode 100644 crates/phone-opus/src/mcp/service.rs create mode 100644 crates/phone-opus/src/mcp/telemetry.rs create mode 100644 crates/phone-opus/tests/mcp_hardening.rs create mode 100644 rust-toolchain.toml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..54466f5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target + diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..f87b4e0 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,809 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "anstream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys", +] + +[[package]] +name = "bytes" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e748733b7cbc798e1434b6ac524f0c1ff2ab456fe201501e6497c8417a4fc33" + +[[package]] +name = "cfg-if" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" + +[[package]] +name = "clap" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" + +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + +[[package]] +name = "deranged" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" +dependencies = [ + "powerfmt", +] + +[[package]] +name = "dirs" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e01a3366d27ee9890022452ee61b2b63a67e6f13f58900b651ff5665f0bb1fab" +dependencies = [ + "libc", + "option-ext", + "redox_users", + "windows-sys", +] + +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + +[[package]] +name = "getrandom" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + +[[package]] +name = "icu_collections" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" + +[[package]] +name = "icu_properties" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" + +[[package]] +name = "icu_provider" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + +[[package]] +name = "libc" +version = "0.2.183" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" + +[[package]] +name = "libmcp" +version = "1.1.0" +source = "git+https://git.swarm.moe/libmcp.git?rev=e325cd23f19378f543981071673c1d03be438fa5#e325cd23f19378f543981071673c1d03be438fa5" +dependencies = [ + "libmcp-derive", + "schemars", + "serde", + "serde_json", + "thiserror", + "time", + "tokio", + "url", +] + +[[package]] +name = "libmcp-derive" +version = "1.1.0" +source = "git+https://git.swarm.moe/libmcp.git?rev=e325cd23f19378f543981071673c1d03be438fa5#e325cd23f19378f543981071673c1d03be438fa5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "libmcp-testkit" +version = "1.1.0" +source = "git+https://git.swarm.moe/libmcp.git?rev=e325cd23f19378f543981071673c1d03be438fa5#e325cd23f19378f543981071673c1d03be438fa5" +dependencies = [ + "libmcp", + "serde", + "serde_json", +] + +[[package]] +name = "libredox" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1744e39d1d6a9948f4f388969627434e31128196de472883b39f148769bfe30a" +dependencies = [ + "libc", +] + +[[package]] +name = "litemap" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" + +[[package]] +name = "memchr" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" + +[[package]] +name = "num-conv" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" + +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + +[[package]] +name = "option-ext" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" + +[[package]] +name = "percent-encoding" +version = "2.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" + +[[package]] +name = "phone-opus" +version = "0.1.0" +dependencies = [ + "clap", + "dirs", + "libmcp", + "libmcp-testkit", + "serde", + "serde_json", + "thiserror", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" + +[[package]] +name = "potential_utf" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" +dependencies = [ + "zerovec", +] + +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + +[[package]] +name = "proc-macro2" +version = "1.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fd00f0bb2e90d81d1044c2b32617f68fcb9fa3bb7640c23e9c748e53fb30934" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41f2619966050689382d2b44f664f4bc593e129785a36d6ee376ddf37259b924" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "redox_users" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" +dependencies = [ + "getrandom", + "libredox", + "thiserror", +] + +[[package]] +name = "ref-cast" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "schemars" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" +dependencies = [ + "dyn-clone", + "ref-cast", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d115b50f4aaeea07e79c1912f645c7513d81715d0420f8bc77a18c6260b307f" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn", +] + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", + "serde_derive", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + +[[package]] +name = "syn" +version = "2.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thiserror" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "time" +version = "0.3.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde_core", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" + +[[package]] +name = "time-macros" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" +dependencies = [ + "num-conv", + "time-core", +] + +[[package]] +name = "tinystr" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" +dependencies = [ + "displaydoc", + "zerovec", +] + +[[package]] +name = "tokio" +version = "1.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +dependencies = [ + "bytes", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-ident" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" + +[[package]] +name = "url" +version = "2.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + +[[package]] +name = "writeable" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" + +[[package]] +name = "yoke" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" +dependencies = [ + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + +[[package]] +name = "zerotrie" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..221e0c7 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,116 @@ +[workspace] +members = ["crates/phone-opus"] +resolver = "3" + +[workspace.package] +categories = ["development-tools", "command-line-utilities"] +description = "Consultative Claude Code MCP with a durable host, disposable worker, and read-only tool posture." +edition = "2024" +keywords = ["mcp", "claude", "consultation", "ai", "tooling"] +license = "MIT" +readme = "README.md" +repository = "https://git.swarm.moe/phone_opus.git" +rust-version = "1.94" +version = "0.1.0" + +[workspace.dependencies] +clap = { version = "4.5", features = ["derive"] } +dirs = "6" +libmcp = { git = "https://git.swarm.moe/libmcp.git", rev = "e325cd23f19378f543981071673c1d03be438fa5" } +libmcp-testkit = { git = "https://git.swarm.moe/libmcp.git", rev = "e325cd23f19378f543981071673c1d03be438fa5", package = "libmcp-testkit" } +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.145" +thiserror = "2.0.17" + +[workspace.lints.rust] +elided_lifetimes_in_paths = "deny" +unexpected_cfgs = "deny" +unsafe_code = "deny" +unused_crate_dependencies = "warn" +unused_lifetimes = "deny" +unused_qualifications = "deny" +unused_results = "deny" + +[workspace.lints.rustdoc] +bare_urls = "deny" +broken_intra_doc_links = "deny" + +[workspace.lints.clippy] +all = { level = "deny", priority = -2 } +pedantic = { level = "deny", priority = -1 } +cargo = { level = "warn", priority = -3 } + +dbg_macro = "deny" +expect_used = "deny" +panic = "deny" +todo = "deny" +unimplemented = "deny" +unwrap_used = "deny" +allow_attributes_without_reason = "deny" + +cargo_common_metadata = "allow" +missing_errors_doc = "allow" +missing_panics_doc = "allow" +multiple_crate_versions = "allow" + +items_after_statements = "allow" +many_single_char_names = "allow" +match_same_arms = "allow" +module_name_repetitions = "allow" +similar_names = "allow" +struct_field_names = "allow" +too_many_arguments = "allow" +too_many_lines = "allow" +unnested_or_patterns = "allow" + +cast_lossless = "allow" +cast_possible_truncation = "allow" +cast_possible_wrap = "allow" +cast_precision_loss = "allow" +cast_sign_loss = "allow" +float_cmp = "allow" +implicit_hasher = "allow" +manual_let_else = "allow" +map_unwrap_or = "allow" +uninlined_format_args = "allow" + +ignored_unit_patterns = "allow" +must_use_candidate = "allow" +needless_pass_by_value = "allow" +no_effect_underscore_binding = "allow" +redundant_closure_for_method_calls = "allow" +ref_option = "allow" +return_self_not_must_use = "allow" +trivially_copy_pass_by_ref = "allow" +unused_async = "allow" +used_underscore_binding = "allow" + +[workspace.metadata.rust-starter] +format_command = ["cargo", "fmt", "--all", "--check"] +clippy_command = [ + "cargo", + "clippy", + "--workspace", + "--all-targets", + "--all-features", + "--", + "-D", + "warnings", +] +test_command = ["cargo", "test", "--workspace", "--all-targets", "--all-features"] +doc_command = ["cargo", "doc", "--workspace", "--all-features", "--no-deps"] +fix_command = [ + "cargo", + "clippy", + "--fix", + "--workspace", + "--all-targets", + "--all-features", + "--allow-dirty", + "--allow-staged", +] + +[workspace.metadata.rust-starter.source_files] +max_lines = 2500 +include = ["*.rs", "**/*.rs"] +exclude = [] diff --git a/README.md b/README.md new file mode 100644 index 0000000..86c66f6 --- /dev/null +++ b/README.md @@ -0,0 +1,43 @@ +# phone_opus + +`phone_opus` is a deliberately narrow MCP server for consultative Claude Code +calls. + +It exposes one blocking domain tool: + +- `consult`: run the system `claude` install in print mode, wait for the answer, + and return the response plus execution metadata + +The server keeps the public MCP session in a durable host, isolates the actual +Claude invocation in a disposable worker, and ships standard health and +telemetry surfaces: + +- `health_snapshot` +- `telemetry_snapshot` + +## Runtime posture + +Each `consult` call runs Claude Code with: + +- the system `claude` binary +- the normal settings stack, including user-level defaults +- no configured MCP servers (`--strict-mcp-config --mcp-config '{"mcpServers":{}}'`) +- a read-only built-in toolset: + - `Bash,Read,Grep,Glob,LS,WebFetch,WebSearch` +- `--permission-mode dontAsk`, so only preapproved read-only Bash patterns can + execute and edit tools never appear in the session + +## Development + +Run the fast gate with: + +```bash +python check.py +``` + +Run the server locally with: + +```bash +cargo run -- mcp serve +``` + diff --git a/check.py b/check.py new file mode 100644 index 0000000..9f31d1a --- /dev/null +++ b/check.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import os +import subprocess +import tomllib +from dataclasses import dataclass +from pathlib import Path +from pathlib import PurePosixPath + + +ROOT = Path(__file__).resolve().parent +WORKSPACE_MANIFEST = ROOT / "Cargo.toml" +DEFAULT_MAX_SOURCE_FILE_LINES = 2500 +DEFAULT_SOURCE_FILE_INCLUDE = ("*.rs", "**/*.rs") +IGNORED_SOURCE_DIRS = frozenset( + {".direnv", ".git", ".hg", ".jj", ".svn", "__pycache__", "node_modules", "target", "vendor"} +) + + +@dataclass(frozen=True, slots=True) +class SourceFilePolicy: + max_lines: int + include: tuple[str, ...] + exclude: tuple[str, ...] + + +def load_workspace_metadata() -> dict[str, object]: + workspace = tomllib.loads(WORKSPACE_MANIFEST.read_text(encoding="utf-8")) + return workspace["workspace"]["metadata"]["rust-starter"] + + +def load_commands(metadata: dict[str, object]) -> dict[str, list[str]]: + commands: dict[str, list[str]] = {} + for key in ("format_command", "clippy_command", "test_command", "doc_command", "fix_command"): + value = metadata.get(key) + if isinstance(value, list) and value and all(isinstance(part, str) for part in value): + commands[key] = value + return commands + + +def load_patterns( + value: object, + *, + default: tuple[str, ...], + key_path: str, + allow_empty: bool, +) -> tuple[str, ...]: + if value is None: + return default + if not isinstance(value, list) or not all(isinstance(pattern, str) and pattern for pattern in value): + raise SystemExit(f"[check] invalid {key_path}: expected a string list") + if not allow_empty and not value: + raise SystemExit(f"[check] invalid {key_path}: expected at least one pattern") + return tuple(value) + + +def load_source_file_policy(metadata: dict[str, object]) -> SourceFilePolicy: + raw_policy = metadata.get("source_files") + if raw_policy is None: + return SourceFilePolicy(DEFAULT_MAX_SOURCE_FILE_LINES, DEFAULT_SOURCE_FILE_INCLUDE, ()) + if not isinstance(raw_policy, dict): + raise SystemExit("[check] invalid workspace.metadata.rust-starter.source_files: expected a table") + + max_lines = raw_policy.get("max_lines", DEFAULT_MAX_SOURCE_FILE_LINES) + if not isinstance(max_lines, int) or max_lines <= 0: + raise SystemExit( + "[check] invalid workspace.metadata.rust-starter.source_files.max_lines: expected a positive integer" + ) + + include = load_patterns( + raw_policy.get("include"), + default=DEFAULT_SOURCE_FILE_INCLUDE, + key_path="workspace.metadata.rust-starter.source_files.include", + allow_empty=False, + ) + exclude = load_patterns( + raw_policy.get("exclude"), + default=(), + key_path="workspace.metadata.rust-starter.source_files.exclude", + allow_empty=True, + ) + return SourceFilePolicy(max_lines, include, exclude) + + +def run(name: str, argv: list[str]) -> None: + print(f"[check] {name}: {' '.join(argv)}", flush=True) + proc = subprocess.run(argv, cwd=ROOT) + if proc.returncode != 0: + raise SystemExit(proc.returncode) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Thin Rust starter check runner") + parser.add_argument( + "mode", + nargs="?", + choices=("check", "deep", "fix"), + default="check", + help="Run the fast gate, include docs for the deep gate, or run the fix command.", + ) + return parser.parse_args() + + +def matches_pattern(path: PurePosixPath, pattern: str) -> bool: + if path.match(pattern): + return True + prefix = "**/" + return pattern.startswith(prefix) and path.match(pattern.removeprefix(prefix)) + + +def iter_source_files(policy: SourceFilePolicy) -> list[Path]: + paths: list[Path] = [] + for current_root, dirnames, filenames in os.walk(ROOT): + dirnames[:] = sorted(name for name in dirnames if name not in IGNORED_SOURCE_DIRS) + current = Path(current_root) + for filename in filenames: + path = current / filename + relative_path = PurePosixPath(path.relative_to(ROOT).as_posix()) + if not any(matches_pattern(relative_path, pattern) for pattern in policy.include): + continue + if any(matches_pattern(relative_path, pattern) for pattern in policy.exclude): + continue + paths.append(path) + return sorted(paths) + + +def line_count(path: Path) -> int: + return len(path.read_text(encoding="utf-8").splitlines()) + + +def enforce_source_file_policy(policy: SourceFilePolicy) -> None: + paths = iter_source_files(policy) + print(f"[check] source-files: max {policy.max_lines} lines", flush=True) + violations: list[tuple[str, int]] = [] + for path in paths: + lines = line_count(path) + if lines > policy.max_lines: + violations.append((path.relative_to(ROOT).as_posix(), lines)) + if not violations: + return + + print( + f"[check] source-files: {len(violations)} file(s) exceed the configured limit", + flush=True, + ) + for relative_path, lines in violations: + print(f"[check] source-files: {relative_path}: {lines} lines", flush=True) + raise SystemExit(1) + + +def main() -> None: + metadata = load_workspace_metadata() + commands = load_commands(metadata) + source_file_policy = load_source_file_policy(metadata) + args = parse_args() + + if args.mode == "fix": + run("fix", commands["fix_command"]) + return + + enforce_source_file_policy(source_file_policy) + run("fmt", commands["format_command"]) + run("clippy", commands["clippy_command"]) + run("test", commands["test_command"]) + + if args.mode == "deep" and "doc_command" in commands: + run("doc", commands["doc_command"]) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + raise SystemExit(130) + diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 0000000..520b038 --- /dev/null +++ b/clippy.toml @@ -0,0 +1,6 @@ +# Keep this file tiny. Do not move the repo-wide allow/deny architecture here. + +allow-expect-in-tests = true +allow-unwrap-in-tests = true +allow-panic-in-tests = true + diff --git a/crates/phone-opus/Cargo.toml b/crates/phone-opus/Cargo.toml new file mode 100644 index 0000000..402d459 --- /dev/null +++ b/crates/phone-opus/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "phone-opus" +edition.workspace = true +license.workspace = true +rust-version.workspace = true +version.workspace = true +categories.workspace = true +description.workspace = true +keywords.workspace = true +readme.workspace = true +repository.workspace = true + +[dependencies] +clap.workspace = true +dirs.workspace = true +libmcp.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true + +[dev-dependencies] +libmcp-testkit.workspace = true + +[lints] +workspace = true diff --git a/crates/phone-opus/src/main.rs b/crates/phone-opus/src/main.rs new file mode 100644 index 0000000..79ace26 --- /dev/null +++ b/crates/phone-opus/src/main.rs @@ -0,0 +1,51 @@ +mod mcp; + +use clap::{Args, Parser, Subcommand}; +#[cfg(test)] +use libmcp_testkit as _; + +#[derive(Parser)] +#[command( + author, + version, + about = "Consultative Claude Code MCP with a hardened host/worker spine" +)] +struct Cli { + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand)] +enum Command { + /// Serve the stdio MCP host. + Mcp { + #[command(subcommand)] + command: McpCommand, + }, +} + +#[derive(Subcommand)] +enum McpCommand { + /// Run the durable stdio host. + Serve, + /// Run the disposable worker process. + Worker(McpWorkerArgs), +} + +#[derive(Args)] +struct McpWorkerArgs { + /// Logical worker generation assigned by the host. + #[arg(long)] + generation: u64, +} + +fn main() -> Result<(), Box> { + let cli = Cli::parse(); + match cli.command { + Command::Mcp { command } => match command { + McpCommand::Serve => mcp::run_host()?, + McpCommand::Worker(args) => mcp::run_worker(args.generation)?, + }, + } + Ok(()) +} diff --git a/crates/phone-opus/src/mcp/catalog.rs b/crates/phone-opus/src/mcp/catalog.rs new file mode 100644 index 0000000..a7e7cf6 --- /dev/null +++ b/crates/phone-opus/src/mcp/catalog.rs @@ -0,0 +1,107 @@ +use libmcp::ReplayContract; +use serde_json::{Value, json}; + +use crate::mcp::output::with_common_presentation; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(crate) enum DispatchTarget { + Host, + Worker, +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(crate) struct ToolSpec { + pub(crate) name: &'static str, + pub(crate) description: &'static str, + pub(crate) dispatch: DispatchTarget, + pub(crate) replay: ReplayContract, +} + +impl ToolSpec { + fn annotation_json(self) -> Value { + json!({ + "title": self.name, + "readOnlyHint": true, + "destructiveHint": false, + "phoneOpus": { + "dispatch": match self.dispatch { + DispatchTarget::Host => "host", + DispatchTarget::Worker => "worker", + }, + "replayContract": match self.replay { + ReplayContract::Convergent => "convergent", + ReplayContract::ProbeRequired => "probe_required", + ReplayContract::NeverReplay => "never_replay", + }, + } + }) + } +} + +const TOOL_SPECS: &[ToolSpec] = &[ + ToolSpec { + name: "consult", + description: "Run a blocking consult against the system Claude Code install using a read-only built-in toolset and return the response.", + dispatch: DispatchTarget::Worker, + replay: ReplayContract::NeverReplay, + }, + ToolSpec { + name: "health_snapshot", + description: "Read host lifecycle, worker generation, rollout state, and latest fault. Defaults to render=porcelain; use render=json for structured output.", + dispatch: DispatchTarget::Host, + replay: ReplayContract::Convergent, + }, + ToolSpec { + name: "telemetry_snapshot", + description: "Read aggregate request and recovery telemetry for this session. Defaults to render=porcelain; use render=json for structured output.", + dispatch: DispatchTarget::Host, + replay: ReplayContract::Convergent, + }, +]; + +pub(crate) fn tool_spec(name: &str) -> Option { + TOOL_SPECS.iter().copied().find(|spec| spec.name == name) +} + +pub(crate) fn tool_definitions() -> Vec { + TOOL_SPECS + .iter() + .map(|spec| { + json!({ + "name": spec.name, + "description": spec.description, + "inputSchema": tool_schema(spec.name), + "annotations": spec.annotation_json(), + }) + }) + .collect() +} + +fn tool_schema(name: &str) -> Value { + match name { + "consult" => with_common_presentation(json!({ + "type": "object", + "properties": { + "prompt": { + "type": "string", + "description": "Prompt to send to Claude Code." + }, + "cwd": { + "type": "string", + "description": "Optional working directory for the Claude Code session. Relative paths resolve against the MCP host working directory." + }, + "max_turns": { + "type": "integer", + "minimum": 1, + "description": "Optional maximum number of Claude agent turns before stopping." + } + }, + "required": ["prompt"] + })), + "health_snapshot" | "telemetry_snapshot" => with_common_presentation(json!({ + "type": "object", + "properties": {} + })), + _ => Value::Null, + } +} diff --git a/crates/phone-opus/src/mcp/fault.rs b/crates/phone-opus/src/mcp/fault.rs new file mode 100644 index 0000000..5b23f79 --- /dev/null +++ b/crates/phone-opus/src/mcp/fault.rs @@ -0,0 +1,214 @@ +use libmcp::{Fault, FaultClass, FaultCode, Generation, RecoveryDirective, ToolErrorDetail}; +use serde::{Deserialize, Serialize}; +use serde_json::{Value, json}; + +#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) enum FaultStage { + Host, + Worker, + Claude, + Transport, + Protocol, + Rollout, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) struct FaultRecord { + pub(crate) fault: Fault, + pub(crate) stage: FaultStage, + pub(crate) operation: String, + pub(crate) jsonrpc_code: i64, + pub(crate) retryable: bool, + pub(crate) retried: bool, +} + +impl FaultRecord { + pub(crate) fn invalid_input( + generation: Generation, + stage: FaultStage, + operation: impl Into, + detail: impl Into, + ) -> Self { + Self::new( + generation, + FaultClass::Protocol, + "invalid_input", + RecoveryDirective::AbortRequest, + stage, + operation, + detail, + -32602, + ) + } + + pub(crate) fn not_initialized( + generation: Generation, + stage: FaultStage, + operation: impl Into, + detail: impl Into, + ) -> Self { + Self::new( + generation, + FaultClass::Protocol, + "not_initialized", + RecoveryDirective::AbortRequest, + stage, + operation, + detail, + -32002, + ) + } + + pub(crate) fn transport( + generation: Generation, + stage: FaultStage, + operation: impl Into, + detail: impl Into, + ) -> Self { + Self::new( + generation, + FaultClass::Transport, + "transport_failure", + RecoveryDirective::RestartAndReplay, + stage, + operation, + detail, + -32603, + ) + } + + pub(crate) fn process( + generation: Generation, + stage: FaultStage, + operation: impl Into, + detail: impl Into, + ) -> Self { + Self::new( + generation, + FaultClass::Process, + "process_failure", + RecoveryDirective::RestartAndReplay, + stage, + operation, + detail, + -32603, + ) + } + + pub(crate) fn downstream( + generation: Generation, + stage: FaultStage, + operation: impl Into, + detail: impl Into, + ) -> Self { + Self::new( + generation, + FaultClass::Downstream, + "downstream_failure", + RecoveryDirective::AbortRequest, + stage, + operation, + detail, + -32603, + ) + } + + pub(crate) fn internal( + generation: Generation, + stage: FaultStage, + operation: impl Into, + detail: impl Into, + ) -> Self { + Self::new( + generation, + FaultClass::Invariant, + "internal_failure", + RecoveryDirective::AbortRequest, + stage, + operation, + detail, + -32603, + ) + } + + pub(crate) fn rollout( + generation: Generation, + operation: impl Into, + detail: impl Into, + ) -> Self { + Self::new( + generation, + FaultClass::Rollout, + "rollout_failure", + RecoveryDirective::RestartAndReplay, + FaultStage::Rollout, + operation, + detail, + -32603, + ) + } + + pub(crate) fn mark_retried(mut self) -> Self { + self.retried = true; + self + } + + pub(crate) fn message(&self) -> &str { + self.fault.detail.as_str() + } + + pub(crate) fn error_detail(&self) -> ToolErrorDetail { + ToolErrorDetail { + code: Some(self.jsonrpc_code), + kind: Some(self.fault.code.as_str().to_owned()), + message: Some(self.message().to_owned()), + } + } + + pub(crate) fn into_jsonrpc_error(self) -> Value { + json!({ + "code": self.jsonrpc_code, + "message": self.message(), + "data": self, + }) + } + + pub(crate) fn into_tool_result(self) -> Value { + json!({ + "content": [{ + "type": "text", + "text": self.message(), + }], + "structuredContent": self, + "isError": true, + }) + } + + fn new( + generation: Generation, + class: FaultClass, + code: &'static str, + directive: RecoveryDirective, + stage: FaultStage, + operation: impl Into, + detail: impl Into, + jsonrpc_code: i64, + ) -> Self { + let fault = Fault::new(generation, class, fault_code(code), directive, detail); + Self { + retryable: directive != RecoveryDirective::AbortRequest, + fault, + stage, + operation: operation.into(), + jsonrpc_code, + retried: false, + } + } +} + +fn fault_code(code: &'static str) -> FaultCode { + match FaultCode::try_new(code.to_owned()) { + Ok(value) => value, + Err(_) => std::process::abort(), + } +} diff --git a/crates/phone-opus/src/mcp/host/binary.rs b/crates/phone-opus/src/mcp/host/binary.rs new file mode 100644 index 0000000..9ec7721 --- /dev/null +++ b/crates/phone-opus/src/mcp/host/binary.rs @@ -0,0 +1,41 @@ +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; + +use crate::mcp::protocol::BinaryFingerprint; + +pub(crate) struct BinaryRuntime { + pub(crate) path: PathBuf, + startup_fingerprint: BinaryFingerprint, + pub(crate) launch_path_stable: bool, +} + +impl BinaryRuntime { + pub(crate) fn new(path: PathBuf) -> io::Result { + let startup_fingerprint = fingerprint_binary(&path)?; + Ok(Self { + launch_path_stable: !path + .components() + .any(|component| component.as_os_str().to_string_lossy() == "target"), + path, + startup_fingerprint, + }) + } + + pub(crate) fn rollout_pending(&self) -> io::Result { + Ok(fingerprint_binary(&self.path)? != self.startup_fingerprint) + } +} + +fn fingerprint_binary(path: &Path) -> io::Result { + let metadata = fs::metadata(path)?; + let modified_unix_nanos = metadata + .modified()? + .duration_since(std::time::UNIX_EPOCH) + .map_err(|error| io::Error::other(format!("invalid binary mtime: {error}")))? + .as_nanos(); + Ok(BinaryFingerprint { + length_bytes: metadata.len(), + modified_unix_nanos, + }) +} diff --git a/crates/phone-opus/src/mcp/host/mod.rs b/crates/phone-opus/src/mcp/host/mod.rs new file mode 100644 index 0000000..29cdcb1 --- /dev/null +++ b/crates/phone-opus/src/mcp/host/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod binary; +pub(crate) mod process; +pub(crate) mod runtime; diff --git a/crates/phone-opus/src/mcp/host/process.rs b/crates/phone-opus/src/mcp/host/process.rs new file mode 100644 index 0000000..d3c62c8 --- /dev/null +++ b/crates/phone-opus/src/mcp/host/process.rs @@ -0,0 +1,223 @@ +use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; + +use libmcp::Generation; +use serde_json::Value; + +use crate::mcp::fault::{FaultRecord, FaultStage}; +use crate::mcp::protocol::{ + HostRequestId, WorkerOperation, WorkerOutcome, WorkerRequest, WorkerResponse, WorkerSpawnConfig, +}; + +pub(super) struct WorkerSupervisor { + config: WorkerSpawnConfig, + generation: Generation, + has_spawned: bool, + crash_before_reply_once: bool, + child: Option, + stdin: Option>, + stdout: Option>, +} + +impl WorkerSupervisor { + pub(super) fn new( + config: WorkerSpawnConfig, + generation: Generation, + has_spawned: bool, + ) -> Self { + Self { + config, + generation, + has_spawned, + crash_before_reply_once: false, + child: None, + stdin: None, + stdout: None, + } + } + + pub(super) fn generation(&self) -> Generation { + self.generation + } + + pub(super) fn has_spawned(&self) -> bool { + self.has_spawned + } + + pub(super) fn execute( + &mut self, + request_id: HostRequestId, + operation: WorkerOperation, + ) -> Result { + self.ensure_worker()?; + let request = WorkerRequest::Execute { + id: request_id, + operation, + }; + let stdin = self.stdin.as_mut().ok_or_else(|| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.stdin", + "worker stdin is not available", + ) + })?; + serde_json::to_writer(&mut *stdin, &request).map_err(|error| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.write", + format!("failed to encode worker request: {error}"), + ) + })?; + stdin.write_all(b"\n").map_err(|error| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.write", + format!("failed to frame worker request: {error}"), + ) + })?; + stdin.flush().map_err(|error| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.write", + format!("failed to flush worker request: {error}"), + ) + })?; + + if self.crash_before_reply_once { + self.crash_before_reply_once = false; + self.kill_current_worker(); + return Err(FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.read", + "worker crashed before replying", + )); + } + + let stdout = self.stdout.as_mut().ok_or_else(|| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.stdout", + "worker stdout is not available", + ) + })?; + let mut line = String::new(); + let bytes = stdout.read_line(&mut line).map_err(|error| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.read", + format!("failed to read worker response: {error}"), + ) + })?; + if bytes == 0 { + self.kill_current_worker(); + return Err(FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.read", + "worker exited before replying", + )); + } + let response = serde_json::from_str::(&line).map_err(|error| { + FaultRecord::transport( + self.generation, + FaultStage::Transport, + "worker.read", + format!("invalid worker response: {error}"), + ) + })?; + match response.outcome { + WorkerOutcome::Success { result } => Ok(result), + WorkerOutcome::Fault { fault } => Err(fault), + } + } + + pub(super) fn restart(&mut self) -> Result<(), FaultRecord> { + self.kill_current_worker(); + self.ensure_worker() + } + + pub(super) fn is_alive(&mut self) -> bool { + let Some(child) = self.child.as_mut() else { + return false; + }; + if let Ok(None) = child.try_wait() { + true + } else { + self.child = None; + self.stdin = None; + self.stdout = None; + false + } + } + + pub(super) fn arm_crash_once(&mut self) { + self.crash_before_reply_once = true; + } + + fn ensure_worker(&mut self) -> Result<(), FaultRecord> { + if self.is_alive() { + return Ok(()); + } + let generation = if self.has_spawned { + self.generation.next() + } else { + self.generation + }; + let mut child = Command::new(&self.config.executable) + .arg("mcp") + .arg("worker") + .arg("--generation") + .arg(generation.get().to_string()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()) + .spawn() + .map_err(|error| { + FaultRecord::process( + generation, + FaultStage::Transport, + "worker.spawn", + format!("failed to spawn worker: {error}"), + ) + })?; + let stdin = child.stdin.take().ok_or_else(|| { + FaultRecord::internal( + generation, + FaultStage::Transport, + "worker.spawn", + "worker stdin pipe was not created", + ) + })?; + let stdout = child.stdout.take().ok_or_else(|| { + FaultRecord::internal( + generation, + FaultStage::Transport, + "worker.spawn", + "worker stdout pipe was not created", + ) + })?; + self.generation = generation; + self.has_spawned = true; + self.child = Some(child); + self.stdin = Some(BufWriter::new(stdin)); + self.stdout = Some(BufReader::new(stdout)); + Ok(()) + } + + fn kill_current_worker(&mut self) { + if let Some(child) = self.child.as_mut() { + let _ = child.kill(); + let _ = child.wait(); + } + self.child = None; + self.stdin = None; + self.stdout = None; + } +} diff --git a/crates/phone-opus/src/mcp/host/runtime.rs b/crates/phone-opus/src/mcp/host/runtime.rs new file mode 100644 index 0000000..5922766 --- /dev/null +++ b/crates/phone-opus/src/mcp/host/runtime.rs @@ -0,0 +1,779 @@ +use std::fs; +use std::io::{self, BufRead, Write}; +#[cfg(unix)] +use std::os::unix::process::CommandExt; +use std::path::PathBuf; +use std::process::Command; +use std::time::Instant; + +use dirs::{home_dir, state_dir}; +use libmcp::{ + FramedMessage, Generation, HostSessionKernel, ReplayContract, RequestId, RolloutState, + TelemetryLog, ToolOutcome, load_snapshot_file_from_env, remove_snapshot_file, + write_snapshot_file, +}; +use serde_json::{Value, json}; + +use crate::mcp::catalog::{DispatchTarget, tool_definitions, tool_spec}; +use crate::mcp::fault::{FaultRecord, FaultStage}; +use crate::mcp::host::binary::BinaryRuntime; +use crate::mcp::host::process::WorkerSupervisor; +use crate::mcp::output::{ + ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success, +}; +use crate::mcp::protocol::{ + FORCE_ROLLOUT_ENV, HOST_STATE_ENV, HostRequestId, HostStateSeed, PROTOCOL_VERSION, SERVER_NAME, + WORKER_CRASH_ONCE_ENV, WorkerOperation, WorkerSpawnConfig, +}; +use crate::mcp::telemetry::ServerTelemetry; + +pub(crate) fn run_host() -> Result<(), Box> { + let stdin = io::stdin(); + let mut stdout = io::stdout().lock(); + let mut host = HostRuntime::new()?; + + for line in stdin.lock().lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + + let maybe_response = host.handle_line(&line); + if let Some(response) = maybe_response { + write_message(&mut stdout, &response)?; + } + host.maybe_roll_forward()?; + } + + Ok(()) +} + +struct HostRuntime { + session_kernel: HostSessionKernel, + telemetry: ServerTelemetry, + telemetry_log: TelemetryLog, + next_request_id: u64, + worker: WorkerSupervisor, + binary: BinaryRuntime, + force_rollout_key: Option, + force_rollout_consumed: bool, + rollout_requested: bool, + worker_crash_once_key: Option, + worker_crash_once_consumed: bool, +} + +impl HostRuntime { + fn new() -> Result> { + let executable = std::env::current_exe()?; + let binary = BinaryRuntime::new(executable.clone())?; + let restored = restore_host_state()?; + let session_kernel = restored + .as_ref() + .map(|seed| seed.session_kernel.clone().restore()) + .transpose()? + .map_or_else(HostSessionKernel::cold, HostSessionKernel::from_restored); + let telemetry = restored + .as_ref() + .map_or_else(ServerTelemetry::default, |seed| seed.telemetry.clone()); + let next_request_id = restored + .as_ref() + .map_or(1, |seed| seed.next_request_id.max(1)); + let worker_generation = restored + .as_ref() + .map_or(Generation::genesis(), |seed| seed.worker_generation); + let worker_spawned = restored.as_ref().is_some_and(|seed| seed.worker_spawned); + let force_rollout_consumed = restored + .as_ref() + .is_some_and(|seed| seed.force_rollout_consumed); + let worker_crash_once_consumed = restored + .as_ref() + .is_some_and(|seed| seed.worker_crash_once_consumed); + let telemetry_log = open_telemetry_log()?; + let worker = WorkerSupervisor::new( + WorkerSpawnConfig { + executable: executable.clone(), + }, + worker_generation, + worker_spawned, + ); + + Ok(Self { + session_kernel, + telemetry, + telemetry_log, + next_request_id, + worker, + binary, + force_rollout_key: std::env::var(FORCE_ROLLOUT_ENV).ok(), + force_rollout_consumed, + rollout_requested: false, + worker_crash_once_key: std::env::var(WORKER_CRASH_ONCE_ENV).ok(), + worker_crash_once_consumed, + }) + } + + fn handle_line(&mut self, line: &str) -> Option { + let frame = match FramedMessage::parse(line.as_bytes().to_vec()) { + Ok(frame) => frame, + Err(error) => { + return Some(jsonrpc_error( + Value::Null, + FaultRecord::invalid_input( + self.worker.generation(), + FaultStage::Protocol, + "jsonrpc.parse", + format!("parse error: {error}"), + ), + )); + } + }; + self.handle_frame(frame) + } + + fn handle_frame(&mut self, frame: FramedMessage) -> Option { + self.session_kernel.observe_client_frame(&frame); + let Some(object) = frame.value.as_object() else { + return Some(jsonrpc_error( + Value::Null, + FaultRecord::invalid_input( + self.worker.generation(), + FaultStage::Protocol, + "jsonrpc.message", + "invalid request: expected JSON object", + ), + )); + }; + let method = object.get("method").and_then(Value::as_str)?; + let id = object.get("id").cloned(); + let params = object.get("params").cloned().unwrap_or_else(|| json!({})); + let operation_key = operation_key(method, ¶ms); + let started_at = Instant::now(); + + self.telemetry.record_request(&operation_key); + let response = match self.dispatch(&frame, method, params, id.clone()) { + Ok(Some(result)) => { + let latency_ms = elapsed_ms(started_at.elapsed()); + self.telemetry.record_success( + &operation_key, + latency_ms, + self.worker.generation(), + self.worker.is_alive(), + ); + id.map(|id| jsonrpc_result(id, result)) + } + Ok(None) => { + let latency_ms = elapsed_ms(started_at.elapsed()); + self.telemetry.record_success( + &operation_key, + latency_ms, + self.worker.generation(), + self.worker.is_alive(), + ); + None + } + Err(fault) => { + let latency_ms = elapsed_ms(started_at.elapsed()); + self.telemetry.record_error( + &operation_key, + &fault, + latency_ms, + self.worker.generation(), + ); + Some(match id { + Some(id) if method == "tools/call" => { + jsonrpc_result(id, fault.into_tool_result()) + } + Some(id) => jsonrpc_error(id, fault), + None => jsonrpc_error(Value::Null, fault), + }) + } + }; + + if self.should_force_rollout(&operation_key) { + self.force_rollout_consumed = true; + self.telemetry.record_rollout(); + self.rollout_requested = true; + } + + response + } + + fn dispatch( + &mut self, + request_frame: &FramedMessage, + method: &str, + params: Value, + request_id: Option, + ) -> Result, FaultRecord> { + match method { + "initialize" => Ok(Some(json!({ + "protocolVersion": PROTOCOL_VERSION, + "capabilities": { + "tools": { "listChanged": false } + }, + "serverInfo": { + "name": SERVER_NAME, + "version": env!("CARGO_PKG_VERSION") + }, + "instructions": "Use consult for blocking Claude Code consultations. Each call runs the system Claude Code binary in print-mode JSON with no configured MCP servers, a read-only built-in toolset, and dontAsk permission mode. Edit tools are unavailable." + }))), + "notifications/initialized" => { + if !self.seed_captured() { + return Err(FaultRecord::not_initialized( + self.worker.generation(), + FaultStage::Host, + "notifications/initialized", + "received initialized notification before initialize", + )); + } + Ok(None) + } + "notifications/cancelled" => Ok(None), + "ping" => Ok(Some(json!({}))), + other => { + self.require_initialized(other)?; + match other { + "tools/list" => Ok(Some(json!({ "tools": tool_definitions() }))), + "tools/call" => Ok(Some(self.dispatch_tool_call( + request_frame, + params, + request_id, + )?)), + _ => Err(FaultRecord::invalid_input( + self.worker.generation(), + FaultStage::Protocol, + other, + format!("method `{other}` is not implemented"), + )), + } + } + } + } + + fn dispatch_tool_call( + &mut self, + request_frame: &FramedMessage, + params: Value, + _request_id: Option, + ) -> Result { + let envelope = + deserialize::(params, "tools/call", self.worker.generation())?; + let spec = tool_spec(&envelope.name).ok_or_else(|| { + FaultRecord::invalid_input( + self.worker.generation(), + FaultStage::Host, + format!("tools/call:{}", envelope.name), + format!("unknown tool `{}`", envelope.name), + ) + })?; + match spec.dispatch { + DispatchTarget::Host => { + let started_at = Instant::now(); + let request_id = request_id_from_frame(request_frame); + let result = self.handle_host_tool(&envelope.name, envelope.arguments); + self.record_host_tool_completion( + request_frame, + request_id.as_ref(), + elapsed_ms(started_at.elapsed()), + result.as_ref().err(), + ); + result + } + DispatchTarget::Worker => { + self.dispatch_worker_tool(request_frame, spec, envelope.arguments) + } + } + } + + fn dispatch_worker_tool( + &mut self, + request_frame: &FramedMessage, + spec: crate::mcp::catalog::ToolSpec, + arguments: Value, + ) -> Result { + let operation = format!("tools/call:{}", spec.name); + self.dispatch_worker_operation( + request_frame, + operation, + spec.replay, + WorkerOperation::CallTool { + name: spec.name.to_owned(), + arguments, + }, + ) + } + + fn dispatch_worker_operation( + &mut self, + request_frame: &FramedMessage, + operation: String, + replay: ReplayContract, + worker_operation: WorkerOperation, + ) -> Result { + if self.should_crash_worker_once(&operation) { + self.worker.arm_crash_once(); + } + + self.session_kernel + .record_forwarded_request(request_frame, replay); + let forwarded_request_id = request_id_from_frame(request_frame); + let host_request_id = self.allocate_request_id(); + let started_at = Instant::now(); + let mut replay_attempts = 0; + + let outcome = match self + .worker + .execute(host_request_id, worker_operation.clone()) + { + Ok(result) => Ok(result), + Err(fault) => { + if replay == ReplayContract::Convergent && fault.retryable { + replay_attempts = 1; + self.telemetry.record_retry(&operation); + self.worker + .restart() + .map_err(|restart_fault| restart_fault.mark_retried())?; + self.telemetry + .record_worker_restart(self.worker.generation()); + self.worker + .execute(host_request_id, worker_operation) + .map_err(FaultRecord::mark_retried) + } else { + Err(fault) + } + } + }; + + let completed = forwarded_request_id + .as_ref() + .and_then(|request_id| self.session_kernel.take_completed_request(request_id)); + self.record_worker_tool_completion( + forwarded_request_id.as_ref(), + completed.as_ref(), + elapsed_ms(started_at.elapsed()), + replay_attempts, + outcome.as_ref().err(), + ); + outcome + } + + fn handle_host_tool(&mut self, name: &str, arguments: Value) -> Result { + let operation = format!("tools/call:{name}"); + let generation = self.worker.generation(); + let (presentation, _arguments) = + split_presentation(arguments, &operation, generation, FaultStage::Host)?; + match name { + "health_snapshot" => { + let rollout = if self.binary.rollout_pending().map_err(|error| { + FaultRecord::rollout(generation, &operation, error.to_string()) + })? { + RolloutState::Pending + } else { + RolloutState::Stable + }; + let health = self.telemetry.health_snapshot(rollout); + tool_success( + health_snapshot_output( + &health, + self.worker.is_alive(), + self.binary.launch_path_stable, + generation, + )?, + presentation, + generation, + FaultStage::Host, + &operation, + ) + } + "telemetry_snapshot" => { + let snapshot = self.telemetry.telemetry_snapshot(); + tool_success( + telemetry_snapshot_output( + &snapshot, + self.telemetry.host_rollouts(), + generation, + )?, + presentation, + generation, + FaultStage::Host, + &operation, + ) + } + other => Err(FaultRecord::invalid_input( + generation, + FaultStage::Host, + format!("tools/call:{other}"), + format!("unknown host tool `{other}`"), + )), + } + } + + fn require_initialized(&self, operation: &str) -> Result<(), FaultRecord> { + if self.session_initialized() { + return Ok(()); + } + Err(FaultRecord::not_initialized( + self.worker.generation(), + FaultStage::Host, + operation, + "client must call initialize and notifications/initialized before normal operations", + )) + } + + fn session_initialized(&self) -> bool { + self.session_kernel + .initialization_seed() + .is_some_and(|seed| seed.initialized_notification.is_some()) + } + + fn seed_captured(&self) -> bool { + self.session_kernel.initialization_seed().is_some() + } + + fn allocate_request_id(&mut self) -> HostRequestId { + let id = HostRequestId(self.next_request_id); + self.next_request_id += 1; + id + } + + fn maybe_roll_forward(&mut self) -> Result<(), Box> { + let binary_pending = self.binary.rollout_pending()?; + if !self.rollout_requested && !binary_pending { + return Ok(()); + } + if binary_pending && !self.rollout_requested { + self.telemetry.record_rollout(); + } + self.roll_forward() + } + + fn roll_forward(&mut self) -> Result<(), Box> { + let state = HostStateSeed { + session_kernel: self.session_kernel.snapshot(), + telemetry: self.telemetry.clone(), + next_request_id: self.next_request_id, + worker_generation: self.worker.generation(), + worker_spawned: self.worker.has_spawned(), + force_rollout_consumed: self.force_rollout_consumed, + worker_crash_once_consumed: self.worker_crash_once_consumed, + }; + let state_path = write_snapshot_file("phone-opus-mcp-host-reexec", &state)?; + let mut command = Command::new(&self.binary.path); + let _ = command.arg("mcp").arg("serve"); + let _ = command.env(HOST_STATE_ENV, &state_path); + #[cfg(unix)] + { + let error = command.exec(); + let _ = remove_snapshot_file(&state_path); + Err(Box::new(error)) + } + #[cfg(not(unix))] + { + let _ = remove_snapshot_file(&state_path); + Err(Box::new(io::Error::new( + io::ErrorKind::Unsupported, + "host rollout requires unix exec support", + ))) + } + } + + fn should_force_rollout(&self, operation: &str) -> bool { + self.force_rollout_key + .as_deref() + .is_some_and(|key| key == operation) + && !self.force_rollout_consumed + } + + fn should_crash_worker_once(&mut self, operation: &str) -> bool { + let should_crash = self + .worker_crash_once_key + .as_deref() + .is_some_and(|key| key == operation) + && !self.worker_crash_once_consumed; + if should_crash { + self.worker_crash_once_consumed = true; + } + should_crash + } + + fn record_host_tool_completion( + &mut self, + request_frame: &FramedMessage, + request_id: Option<&RequestId>, + latency_ms: u64, + fault: Option<&FaultRecord>, + ) { + let Some(request_id) = request_id else { + return; + }; + let Some(tool_meta) = libmcp::parse_tool_call_meta(request_frame, "tools/call") else { + return; + }; + self.record_tool_completion(request_id, &tool_meta, latency_ms, 0, fault); + } + + fn record_worker_tool_completion( + &mut self, + request_id: Option<&RequestId>, + completed: Option<&libmcp::CompletedPendingRequest>, + latency_ms: u64, + replay_attempts: u8, + fault: Option<&FaultRecord>, + ) { + let Some(request_id) = request_id else { + return; + }; + let Some(completed) = completed else { + return; + }; + let Some(tool_meta) = completed.request.tool_call_meta.as_ref() else { + return; + }; + self.record_tool_completion(request_id, tool_meta, latency_ms, replay_attempts, fault); + } + + fn record_tool_completion( + &mut self, + request_id: &RequestId, + tool_meta: &libmcp::ToolCallMeta, + latency_ms: u64, + replay_attempts: u8, + fault: Option<&FaultRecord>, + ) { + let result = self.telemetry_log.record_tool_completion( + request_id, + tool_meta, + latency_ms, + replay_attempts, + if fault.is_some() { + ToolOutcome::Error + } else { + ToolOutcome::Ok + }, + fault.map_or_else(libmcp::ToolErrorDetail::default, FaultRecord::error_detail), + ); + if let Err(error) = result { + eprintln!("phone_opus telemetry write failed: {error}"); + } + } +} + +fn restore_host_state() -> Result, Box> { + Ok(load_snapshot_file_from_env(HOST_STATE_ENV)?) +} + +fn open_telemetry_log() -> io::Result { + let log_root = state_root()?.join("mcp"); + fs::create_dir_all(&log_root)?; + TelemetryLog::new( + log_root.join("telemetry.jsonl").as_path(), + repo_root()?.as_path(), + 1, + ) +} + +fn repo_root() -> io::Result { + std::env::current_dir() +} + +fn state_root() -> io::Result { + if let Some(root) = state_dir() { + return Ok(root.join(SERVER_NAME)); + } + if let Some(home) = home_dir() { + return Ok(home.join(".local").join("state").join(SERVER_NAME)); + } + Err(io::Error::new( + io::ErrorKind::NotFound, + "failed to resolve a state directory for telemetry", + )) +} + +fn health_snapshot_output( + health: &libmcp::HealthSnapshot, + worker_alive: bool, + launch_path_stable: bool, + generation: Generation, +) -> Result { + let rollout_pending = matches!(health.rollout, Some(RolloutState::Pending)); + let concise = json!({ + "ready": matches!(health.state, libmcp::LifecycleState::Ready), + "worker_generation": health.generation.get(), + "worker_alive": worker_alive, + "rollout_pending": rollout_pending, + "launch_path_stable": launch_path_stable, + }); + let full = json!({ + "health": health, + "worker_alive": worker_alive, + "launch_path_stable": launch_path_stable, + }); + let mut line = format!( + "{} gen={}", + if matches!(health.state, libmcp::LifecycleState::Ready) { + "ready" + } else { + "not-ready" + }, + health.generation.get() + ); + if let Some(last_fault) = health.last_fault.as_ref() { + line.push_str(format!(" last_fault={}", last_fault.code.as_str()).as_str()); + } + line.push_str(if worker_alive { + " worker=alive" + } else { + " worker=dead" + }); + if rollout_pending { + line.push_str(" rollout=pending"); + } + if !launch_path_stable { + line.push_str(" launch_path=unstable"); + } + fallback_detailed_tool_output( + &concise, + &full, + line, + None, + libmcp::SurfaceKind::Ops, + generation, + FaultStage::Host, + "tools/call:health_snapshot", + ) +} + +fn telemetry_snapshot_output( + telemetry: &libmcp::TelemetrySnapshot, + host_rollouts: u64, + generation: Generation, +) -> Result { + let hot_methods = telemetry.methods.iter().take(6).collect::>(); + let concise = json!({ + "requests": telemetry.totals.request_count, + "successes": telemetry.totals.success_count, + "response_errors": telemetry.totals.response_error_count, + "transport_faults": telemetry.totals.transport_fault_count, + "retries": telemetry.totals.retry_count, + "worker_restarts": telemetry.restart_count, + "host_rollouts": host_rollouts, + "hot_methods": hot_methods.iter().map(|method| json!({ + "method": method.method, + "requests": method.request_count, + "response_errors": method.response_error_count, + "transport_faults": method.transport_fault_count, + "retries": method.retry_count, + })).collect::>(), + }); + let full = json!({ + "telemetry": telemetry, + "host_rollouts": host_rollouts, + }); + let mut lines = vec![format!( + "requests={} success={} response_error={} transport_fault={} retry={}", + telemetry.totals.request_count, + telemetry.totals.success_count, + telemetry.totals.response_error_count, + telemetry.totals.transport_fault_count, + telemetry.totals.retry_count + )]; + lines.push(format!( + "worker_restarts={} host_rollouts={host_rollouts}", + telemetry.restart_count, + )); + if !hot_methods.is_empty() { + lines.push("hot methods:".to_owned()); + for method in hot_methods { + lines.push(format!( + "{} req={} err={} transport={} retry={}", + method.method, + method.request_count, + method.response_error_count, + method.transport_fault_count, + method.retry_count, + )); + } + } + fallback_detailed_tool_output( + &concise, + &full, + lines.join("\n"), + None, + libmcp::SurfaceKind::Ops, + generation, + FaultStage::Host, + "tools/call:telemetry_snapshot", + ) +} + +fn deserialize serde::Deserialize<'de>>( + value: Value, + operation: &str, + generation: Generation, +) -> Result { + serde_json::from_value(value).map_err(|error| { + FaultRecord::invalid_input( + generation, + FaultStage::Protocol, + operation, + format!("invalid params: {error}"), + ) + }) +} + +fn operation_key(method: &str, params: &Value) -> String { + match method { + "tools/call" => params.get("name").and_then(Value::as_str).map_or_else( + || "tools/call".to_owned(), + |name| format!("tools/call:{name}"), + ), + other => other.to_owned(), + } +} + +fn request_id_from_frame(frame: &FramedMessage) -> Option { + match frame.classify() { + libmcp::RpcEnvelopeKind::Request { id, .. } => Some(id), + libmcp::RpcEnvelopeKind::Notification { .. } + | libmcp::RpcEnvelopeKind::Response { .. } + | libmcp::RpcEnvelopeKind::Unknown => None, + } +} + +fn jsonrpc_result(id: Value, result: Value) -> Value { + json!({ + "jsonrpc": "2.0", + "id": id, + "result": result, + }) +} + +fn jsonrpc_error(id: Value, fault: FaultRecord) -> Value { + json!({ + "jsonrpc": "2.0", + "id": id, + "error": fault.into_jsonrpc_error(), + }) +} + +fn write_message(stdout: &mut impl Write, message: &Value) -> io::Result<()> { + serde_json::to_writer(&mut *stdout, message)?; + stdout.write_all(b"\n")?; + stdout.flush()?; + Ok(()) +} + +fn elapsed_ms(duration: std::time::Duration) -> u64 { + u64::try_from(duration.as_millis()).unwrap_or(u64::MAX) +} + +#[derive(Debug, serde::Deserialize)] +struct ToolCallEnvelope { + name: String, + #[serde(default = "empty_json_object")] + arguments: Value, +} + +fn empty_json_object() -> Value { + json!({}) +} diff --git a/crates/phone-opus/src/mcp/mod.rs b/crates/phone-opus/src/mcp/mod.rs new file mode 100644 index 0000000..666598f --- /dev/null +++ b/crates/phone-opus/src/mcp/mod.rs @@ -0,0 +1,10 @@ +mod catalog; +mod fault; +mod host; +mod output; +mod protocol; +mod service; +mod telemetry; + +pub(crate) use host::runtime::run_host; +pub(crate) use service::run_worker; diff --git a/crates/phone-opus/src/mcp/output.rs b/crates/phone-opus/src/mcp/output.rs new file mode 100644 index 0000000..90673b3 --- /dev/null +++ b/crates/phone-opus/src/mcp/output.rs @@ -0,0 +1,195 @@ +use libmcp::{ + DetailLevel, FallbackJsonProjection, JsonPorcelainConfig, ProjectionError, RenderMode, + SurfaceKind, ToolProjection, render_json_porcelain, with_presentation_properties, +}; +use serde::Serialize; +use serde_json::{Value, json}; + +use crate::mcp::fault::{FaultRecord, FaultStage}; + +const FULL_PORCELAIN_MAX_LINES: usize = 40; +const FULL_PORCELAIN_MAX_INLINE_CHARS: usize = 512; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct Presentation { + pub(crate) render: RenderMode, + pub(crate) detail: DetailLevel, +} + +#[derive(Debug, Clone)] +pub(crate) struct ToolOutput { + concise: Value, + full: Value, + concise_text: String, + full_text: Option, +} + +impl ToolOutput { + pub(crate) fn from_values( + concise: Value, + full: Value, + concise_text: impl Into, + full_text: Option, + ) -> Self { + Self { + concise, + full, + concise_text: concise_text.into(), + full_text, + } + } + + fn structured(&self, detail: DetailLevel) -> &Value { + match detail { + DetailLevel::Concise => &self.concise, + DetailLevel::Full => &self.full, + } + } + + fn porcelain_text(&self, detail: DetailLevel) -> String { + match detail { + DetailLevel::Concise => self.concise_text.clone(), + DetailLevel::Full => self + .full_text + .clone() + .unwrap_or_else(|| render_json_porcelain(&self.full, full_porcelain_config())), + } + } +} + +impl Default for Presentation { + fn default() -> Self { + Self { + render: RenderMode::Porcelain, + detail: DetailLevel::Concise, + } + } +} + +pub(crate) fn split_presentation( + arguments: Value, + operation: &str, + generation: libmcp::Generation, + stage: FaultStage, +) -> Result<(Presentation, Value), FaultRecord> { + let Value::Object(mut object) = arguments else { + return Ok((Presentation::default(), arguments)); + }; + let render = object + .remove("render") + .map(|value| { + serde_json::from_value::(value).map_err(|error| { + FaultRecord::invalid_input( + generation, + stage, + operation, + format!("invalid render mode: {error}"), + ) + }) + }) + .transpose()? + .unwrap_or(RenderMode::Porcelain); + let detail = object + .remove("detail") + .map(|value| { + serde_json::from_value::(value).map_err(|error| { + FaultRecord::invalid_input( + generation, + stage, + operation, + format!("invalid detail level: {error}"), + ) + }) + }) + .transpose()? + .unwrap_or(DetailLevel::Concise); + Ok((Presentation { render, detail }, Value::Object(object))) +} + +pub(crate) fn projected_tool_output( + projection: &impl ToolProjection, + concise_text: impl Into, + full_text: Option, + generation: libmcp::Generation, + stage: FaultStage, + operation: &str, +) -> Result { + let concise = projection + .concise_projection() + .map_err(|error| projection_fault(error, generation, stage, operation))?; + let full = projection + .full_projection() + .map_err(|error| projection_fault(error, generation, stage, operation))?; + Ok(ToolOutput::from_values( + concise, + full, + concise_text, + full_text, + )) +} + +pub(crate) fn fallback_detailed_tool_output( + concise: &impl Serialize, + full: &impl Serialize, + concise_text: impl Into, + full_text: Option, + kind: SurfaceKind, + generation: libmcp::Generation, + stage: FaultStage, + operation: &str, +) -> Result { + let projection = FallbackJsonProjection::new(concise, full, kind) + .map_err(|error| projection_fault(error, generation, stage, operation))?; + projected_tool_output( + &projection, + concise_text, + full_text, + generation, + stage, + operation, + ) +} + +pub(crate) fn tool_success( + output: ToolOutput, + presentation: Presentation, + generation: libmcp::Generation, + stage: FaultStage, + operation: &str, +) -> Result { + let structured = output.structured(presentation.detail).clone(); + let text = match presentation.render { + RenderMode::Porcelain => output.porcelain_text(presentation.detail), + RenderMode::Json => serde_json::to_string_pretty(&structured).map_err(|error| { + FaultRecord::internal(generation, stage, operation, error.to_string()) + })?, + }; + Ok(json!({ + "content": [{ + "type": "text", + "text": text, + }], + "structuredContent": structured, + "isError": false, + })) +} + +pub(crate) fn with_common_presentation(schema: Value) -> Value { + with_presentation_properties(schema) +} + +fn projection_fault( + error: ProjectionError, + generation: libmcp::Generation, + stage: FaultStage, + operation: &str, +) -> FaultRecord { + FaultRecord::internal(generation, stage, operation, error.to_string()) +} + +const fn full_porcelain_config() -> JsonPorcelainConfig { + JsonPorcelainConfig { + max_lines: FULL_PORCELAIN_MAX_LINES, + max_inline_chars: FULL_PORCELAIN_MAX_INLINE_CHARS, + } +} diff --git a/crates/phone-opus/src/mcp/protocol.rs b/crates/phone-opus/src/mcp/protocol.rs new file mode 100644 index 0000000..6662fa9 --- /dev/null +++ b/crates/phone-opus/src/mcp/protocol.rs @@ -0,0 +1,74 @@ +use std::path::PathBuf; + +use libmcp::{Generation, HostSessionKernelSnapshot}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::mcp::telemetry::ServerTelemetry; + +pub(crate) const PROTOCOL_VERSION: &str = "2025-11-25"; +pub(crate) const SERVER_NAME: &str = "phone_opus"; +pub(crate) const HOST_STATE_ENV: &str = "PHONE_OPUS_MCP_HOST_STATE"; +pub(crate) const FORCE_ROLLOUT_ENV: &str = "PHONE_OPUS_MCP_TEST_FORCE_ROLLOUT_KEY"; +pub(crate) const WORKER_CRASH_ONCE_ENV: &str = "PHONE_OPUS_MCP_TEST_WORKER_CRASH_ONCE_KEY"; +pub(crate) const CLAUDE_BIN_ENV: &str = "PHONE_OPUS_CLAUDE_BIN"; +pub(crate) const CLAUDE_TOOLSET: &str = "Bash,Read,Grep,Glob,LS,WebFetch,WebSearch"; +pub(crate) const EMPTY_MCP_CONFIG: &str = "{\"mcpServers\":{}}"; + +#[derive(Clone, Debug, Deserialize, Serialize)] +pub(crate) struct HostStateSeed { + pub(crate) session_kernel: HostSessionKernelSnapshot, + pub(crate) telemetry: ServerTelemetry, + pub(crate) next_request_id: u64, + pub(crate) worker_generation: Generation, + pub(crate) worker_spawned: bool, + pub(crate) force_rollout_consumed: bool, + pub(crate) worker_crash_once_consumed: bool, +} + +#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] +#[serde(transparent)] +pub(crate) struct HostRequestId(pub(crate) u64); + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub(crate) enum WorkerRequest { + Execute { + id: HostRequestId, + operation: WorkerOperation, + }, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub(crate) enum WorkerOperation { + CallTool { name: String, arguments: Value }, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) struct WorkerResponse { + pub(crate) id: HostRequestId, + pub(crate) outcome: WorkerOutcome, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +#[serde(tag = "status", rename_all = "snake_case")] +pub(crate) enum WorkerOutcome { + Success { + result: Value, + }, + Fault { + fault: crate::mcp::fault::FaultRecord, + }, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) struct BinaryFingerprint { + pub(crate) length_bytes: u64, + pub(crate) modified_unix_nanos: u128, +} + +#[derive(Clone, Debug)] +pub(crate) struct WorkerSpawnConfig { + pub(crate) executable: PathBuf, +} diff --git a/crates/phone-opus/src/mcp/service.rs b/crates/phone-opus/src/mcp/service.rs new file mode 100644 index 0000000..2472887 --- /dev/null +++ b/crates/phone-opus/src/mcp/service.rs @@ -0,0 +1,550 @@ +use std::collections::BTreeMap; +use std::io::{self, BufRead, Write}; +use std::path::{Path, PathBuf}; +use std::process::Command; + +use libmcp::{Generation, SurfaceKind}; +use serde::Deserialize; +use serde_json::{Value, json}; +use thiserror::Error; + +use crate::mcp::fault::{FaultRecord, FaultStage}; +use crate::mcp::output::{ + ToolOutput, fallback_detailed_tool_output, split_presentation, tool_success, +}; +use crate::mcp::protocol::{CLAUDE_BIN_ENV, CLAUDE_TOOLSET, EMPTY_MCP_CONFIG}; + +pub(crate) fn run_worker(generation: u64) -> Result<(), Box> { + let generation = generation_from_wire(generation); + let stdin = io::stdin(); + let mut stdout = io::stdout().lock(); + let mut service = WorkerService::new(generation); + + for line in stdin.lock().lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + let request = serde_json::from_str::(&line)?; + let response = match request { + crate::mcp::protocol::WorkerRequest::Execute { id, operation } => { + let outcome = match service.execute(operation) { + Ok(result) => crate::mcp::protocol::WorkerOutcome::Success { result }, + Err(fault) => crate::mcp::protocol::WorkerOutcome::Fault { fault }, + }; + crate::mcp::protocol::WorkerResponse { id, outcome } + } + }; + serde_json::to_writer(&mut stdout, &response)?; + stdout.write_all(b"\n")?; + stdout.flush()?; + } + + Ok(()) +} + +struct WorkerService { + generation: Generation, +} + +impl WorkerService { + fn new(generation: Generation) -> Self { + Self { generation } + } + + fn execute( + &mut self, + operation: crate::mcp::protocol::WorkerOperation, + ) -> Result { + match operation { + crate::mcp::protocol::WorkerOperation::CallTool { name, arguments } => { + self.call_tool(&name, arguments) + } + } + } + + fn call_tool(&mut self, name: &str, arguments: Value) -> Result { + let operation = format!("tools/call:{name}"); + let (presentation, arguments) = + split_presentation(arguments, &operation, self.generation, FaultStage::Worker)?; + let output = match name { + "consult" => { + let args = deserialize::(arguments, &operation, self.generation)?; + let request = ConsultRequest::parse(args) + .map_err(|error| invalid_consult_request(self.generation, &operation, error))?; + let response = invoke_claude(&request) + .map_err(|error| consult_fault(self.generation, &operation, error))?; + consult_output(&request, &response, self.generation, &operation)? + } + other => { + return Err(FaultRecord::invalid_input( + self.generation, + FaultStage::Worker, + &operation, + format!("unknown worker tool `{other}`"), + )); + } + }; + tool_success( + output, + presentation, + self.generation, + FaultStage::Worker, + &operation, + ) + } +} + +#[derive(Debug, Deserialize)] +struct ConsultArgs { + prompt: String, + cwd: Option, + max_turns: Option, +} + +#[derive(Debug, Clone)] +struct ConsultRequest { + prompt: PromptText, + cwd: WorkingDirectory, + max_turns: Option, +} + +impl ConsultRequest { + fn parse(args: ConsultArgs) -> Result { + Ok(Self { + prompt: PromptText::parse(args.prompt)?, + cwd: WorkingDirectory::resolve(args.cwd)?, + max_turns: args.max_turns.map(TurnLimit::parse).transpose()?, + }) + } +} + +#[derive(Debug, Clone)] +struct PromptText(String); + +impl PromptText { + fn parse(raw: String) -> Result { + if raw.trim().is_empty() { + return Err(ConsultRequestError::EmptyPrompt); + } + Ok(Self(raw)) + } + + fn as_str(&self) -> &str { + self.0.as_str() + } +} + +#[derive(Debug, Clone)] +struct WorkingDirectory(PathBuf); + +impl WorkingDirectory { + fn resolve(raw: Option) -> Result { + let base = + std::env::current_dir().map_err(|source| ConsultRequestError::CurrentDir { source })?; + let requested = raw.map_or_else(|| base.clone(), PathBuf::from); + let candidate = if requested.is_absolute() { + requested + } else { + base.join(requested) + }; + let canonical = + candidate + .canonicalize() + .map_err(|source| ConsultRequestError::Canonicalize { + path: candidate.display().to_string(), + source, + })?; + if !canonical.is_dir() { + return Err(ConsultRequestError::NotDirectory( + canonical.display().to_string(), + )); + } + Ok(Self(canonical)) + } + + fn as_path(&self) -> &Path { + self.0.as_path() + } + + fn display(&self) -> String { + self.0.display().to_string() + } +} + +#[derive(Debug, Clone, Copy)] +struct TurnLimit(u64); + +impl TurnLimit { + fn parse(raw: u64) -> Result { + if raw == 0 { + return Err(ConsultRequestError::InvalidTurnLimit); + } + Ok(Self(raw)) + } + + fn get(self) -> u64 { + self.0 + } +} + +#[derive(Debug, Error)] +enum ConsultRequestError { + #[error("prompt must not be empty")] + EmptyPrompt, + #[error("failed to resolve the current working directory: {source}")] + CurrentDir { source: io::Error }, + #[error("failed to resolve working directory `{path}`: {source}")] + Canonicalize { path: String, source: io::Error }, + #[error("working directory `{0}` is not a directory")] + NotDirectory(String), + #[error("max_turns must be greater than zero")] + InvalidTurnLimit, +} + +#[derive(Debug, Error)] +enum ConsultInvocationError { + #[error("failed to spawn Claude Code: {0}")] + Spawn(#[source] io::Error), + #[error("Claude Code returned non-JSON output: {0}")] + InvalidJson(String), + #[error("{0}")] + Downstream(String), +} + +#[derive(Debug, Deserialize)] +struct ClaudeJsonEnvelope { + #[serde(rename = "type")] + envelope_type: String, + subtype: Option, + is_error: bool, + duration_ms: Option, + duration_api_ms: Option, + num_turns: Option, + result: Option, + stop_reason: Option, + session_id: Option, + total_cost_usd: Option, + usage: Option, + #[serde(rename = "modelUsage")] + model_usage: Option, + #[serde(default)] + permission_denials: Vec, + fast_mode_state: Option, + uuid: Option, +} + +#[derive(Debug)] +struct ConsultResponse { + cwd: WorkingDirectory, + result: String, + duration_ms: u64, + duration_api_ms: Option, + num_turns: u64, + stop_reason: Option, + session_id: Option, + total_cost_usd: Option, + usage: Option, + model_usage: Option, + permission_denials: Vec, + fast_mode_state: Option, + uuid: Option, +} + +impl ConsultResponse { + fn model_name(&self) -> Option { + let Value::Object(models) = self.model_usage.as_ref()? else { + return None; + }; + models.keys().next().cloned() + } +} + +fn deserialize Deserialize<'de>>( + value: Value, + operation: &str, + generation: Generation, +) -> Result { + serde_json::from_value(value).map_err(|error| { + FaultRecord::invalid_input( + generation, + FaultStage::Protocol, + operation, + format!("invalid params: {error}"), + ) + }) +} + +fn invalid_consult_request( + generation: Generation, + operation: &str, + error: ConsultRequestError, +) -> FaultRecord { + FaultRecord::invalid_input(generation, FaultStage::Worker, operation, error.to_string()) +} + +fn consult_fault( + generation: Generation, + operation: &str, + error: ConsultInvocationError, +) -> FaultRecord { + match error { + ConsultInvocationError::Spawn(source) => FaultRecord::process( + generation, + FaultStage::Claude, + operation, + source.to_string(), + ), + ConsultInvocationError::InvalidJson(detail) + | ConsultInvocationError::Downstream(detail) => { + FaultRecord::downstream(generation, FaultStage::Claude, operation, detail) + } + } +} + +fn invoke_claude(request: &ConsultRequest) -> Result { + let mut command = Command::new(claude_binary()); + let _ = command + .arg("-p") + .arg("--output-format") + .arg("json") + .arg("--strict-mcp-config") + .arg("--mcp-config") + .arg(EMPTY_MCP_CONFIG) + .arg("--disable-slash-commands") + .arg("--no-chrome") + .arg("--tools") + .arg(CLAUDE_TOOLSET) + .arg("--permission-mode") + .arg("dontAsk"); + if let Some(max_turns) = request.max_turns { + let _ = command.arg("--max-turns").arg(max_turns.get().to_string()); + } + let output = command + .current_dir(request.cwd.as_path()) + .arg(request.prompt.as_str()) + .output() + .map_err(ConsultInvocationError::Spawn)?; + let stdout = String::from_utf8_lossy(&output.stdout).trim().to_owned(); + let stderr = String::from_utf8_lossy(&output.stderr).trim().to_owned(); + let envelope = match serde_json::from_slice::(&output.stdout) { + Ok(envelope) => envelope, + Err(_error) if !output.status.success() => { + return Err(ConsultInvocationError::Downstream(downstream_message( + output.status.code(), + &stdout, + &stderr, + ))); + } + Err(error) => { + return Err(ConsultInvocationError::InvalidJson(format!( + "{error}; stdout={stdout}; stderr={stderr}" + ))); + } + }; + if envelope.envelope_type != "result" { + return Err(ConsultInvocationError::Downstream(format!( + "unexpected Claude envelope type `{}`", + envelope.envelope_type + ))); + } + if !output.status.success() + || envelope.is_error + || envelope.subtype.as_deref() != Some("success") + { + return Err(ConsultInvocationError::Downstream( + envelope + .result + .filter(|value| !value.trim().is_empty()) + .unwrap_or_else(|| downstream_message(output.status.code(), &stdout, &stderr)), + )); + } + Ok(ConsultResponse { + cwd: request.cwd.clone(), + result: envelope.result.unwrap_or_default(), + duration_ms: envelope.duration_ms.unwrap_or(0), + duration_api_ms: envelope.duration_api_ms, + num_turns: envelope.num_turns.unwrap_or(0), + stop_reason: envelope.stop_reason, + session_id: envelope.session_id, + total_cost_usd: envelope.total_cost_usd, + usage: envelope.usage, + model_usage: envelope.model_usage, + permission_denials: envelope.permission_denials, + fast_mode_state: envelope.fast_mode_state, + uuid: envelope.uuid, + }) +} + +fn downstream_message(status_code: Option, stdout: &str, stderr: &str) -> String { + if !stderr.is_empty() { + return stderr.to_owned(); + } + if !stdout.is_empty() { + return stdout.to_owned(); + } + format!("Claude Code exited with status {status_code:?}") +} + +fn claude_binary() -> PathBuf { + std::env::var_os(CLAUDE_BIN_ENV) + .map(PathBuf::from) + .unwrap_or_else(|| PathBuf::from("claude")) +} + +fn consult_output( + request: &ConsultRequest, + response: &ConsultResponse, + generation: Generation, + operation: &str, +) -> Result { + let concise = json!({ + "response": response.result, + "cwd": response.cwd.display(), + "model": response.model_name(), + "duration_ms": response.duration_ms, + "num_turns": response.num_turns, + "stop_reason": response.stop_reason, + "session_id": response.session_id, + "total_cost_usd": response.total_cost_usd, + "permission_denial_count": response.permission_denials.len(), + }); + let full = json!({ + "response": response.result, + "cwd": response.cwd.display(), + "prompt": request.prompt.as_str(), + "max_turns": request.max_turns.map(TurnLimit::get), + "duration_ms": response.duration_ms, + "duration_api_ms": response.duration_api_ms, + "num_turns": response.num_turns, + "stop_reason": response.stop_reason, + "session_id": response.session_id, + "total_cost_usd": response.total_cost_usd, + "usage": response.usage, + "model_usage": response.model_usage, + "permission_denials": response.permission_denials, + "fast_mode_state": response.fast_mode_state, + "uuid": response.uuid, + }); + fallback_detailed_tool_output( + &concise, + &full, + concise_text(response), + Some(full_text(response)), + SurfaceKind::Read, + generation, + FaultStage::Worker, + operation, + ) +} + +fn concise_text(response: &ConsultResponse) -> String { + let mut status = vec![ + "consult ok".to_owned(), + format!("turns={}", response.num_turns), + format!("duration={}", render_duration_ms(response.duration_ms)), + ]; + if let Some(model) = response.model_name() { + status.push(format!("model={model}")); + } + if let Some(stop_reason) = response.stop_reason.as_deref() { + status.push(format!("stop={stop_reason}")); + } + if let Some(cost) = response.total_cost_usd { + status.push(format!("cost=${cost:.6}")); + } + + let mut lines = vec![status.join(" ")]; + lines.push(format!("cwd: {}", response.cwd.display())); + if let Some(session_id) = response.session_id.as_deref() { + lines.push(format!("session: {session_id}")); + } + if !response.permission_denials.is_empty() { + lines.push(format!( + "permission_denials: {}", + response.permission_denials.len() + )); + } + lines.push("response:".to_owned()); + lines.push(response.result.clone()); + lines.join("\n") +} + +fn full_text(response: &ConsultResponse) -> String { + let mut lines = vec![ + format!("consult ok turns={}", response.num_turns), + format!("cwd: {}", response.cwd.display()), + format!("duration: {}", render_duration_ms(response.duration_ms)), + ]; + if let Some(duration_api_ms) = response.duration_api_ms { + lines.push(format!( + "api_duration: {}", + render_duration_ms(duration_api_ms) + )); + } + if let Some(model) = response.model_name() { + lines.push(format!("model: {model}")); + } + if let Some(stop_reason) = response.stop_reason.as_deref() { + lines.push(format!("stop: {stop_reason}")); + } + if let Some(session_id) = response.session_id.as_deref() { + lines.push(format!("session: {session_id}")); + } + if let Some(cost) = response.total_cost_usd { + lines.push(format!("cost_usd: {cost:.6}")); + } + lines.push(format!( + "permission_denials: {}", + response.permission_denials.len() + )); + if let Some(fast_mode_state) = response.fast_mode_state.as_deref() { + lines.push(format!("fast_mode: {fast_mode_state}")); + } + if let Some(uuid) = response.uuid.as_deref() { + lines.push(format!("uuid: {uuid}")); + } + if let Some(usage) = usage_summary(response.usage.as_ref()) { + lines.push(format!("usage: {usage}")); + } + lines.push("response:".to_owned()); + lines.push(response.result.clone()); + lines.join("\n") +} + +fn usage_summary(usage: Option<&Value>) -> Option { + let Value::Object(usage) = usage? else { + return None; + }; + let summary = usage + .iter() + .filter_map(|(key, value)| match value { + Value::Number(number) => Some((key.clone(), number.to_string())), + Value::String(text) if !text.is_empty() => Some((key.clone(), text.clone())), + _ => None, + }) + .collect::>(); + (!summary.is_empty()).then(|| { + summary + .into_iter() + .map(|(key, value)| format!("{key}={value}")) + .collect::>() + .join(" ") + }) +} + +fn render_duration_ms(duration_ms: u64) -> String { + if duration_ms < 1_000 { + return format!("{duration_ms}ms"); + } + let seconds = duration_ms as f64 / 1_000.0; + format!("{seconds:.3}s") +} + +fn generation_from_wire(raw: u64) -> Generation { + let mut generation = Generation::genesis(); + for _ in 1..raw { + generation = generation.next(); + } + generation +} diff --git a/crates/phone-opus/src/mcp/telemetry.rs b/crates/phone-opus/src/mcp/telemetry.rs new file mode 100644 index 0000000..8df0009 --- /dev/null +++ b/crates/phone-opus/src/mcp/telemetry.rs @@ -0,0 +1,228 @@ +use std::collections::BTreeMap; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use libmcp::{ + Fault, Generation, HealthSnapshot, LifecycleState, MethodTelemetry, RolloutState, + TelemetrySnapshot, TelemetryTotals, +}; +use serde::{Deserialize, Serialize}; + +use crate::mcp::fault::FaultRecord; + +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] +struct MethodStats { + request_count: u64, + success_count: u64, + response_error_count: u64, + transport_fault_count: u64, + retry_count: u64, + total_latency_ms: u128, + max_latency_ms: u64, + last_latency_ms: Option, + last_error: Option, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub(crate) struct ServerTelemetry { + started_unix_ms: u64, + state: LifecycleState, + generation: Generation, + consecutive_failures: u32, + restart_count: u64, + host_rollouts: u64, + totals: TelemetryTotals, + methods: BTreeMap, + last_fault: Option, +} + +impl Default for ServerTelemetry { + fn default() -> Self { + Self { + started_unix_ms: unix_ms_now(), + state: LifecycleState::Cold, + generation: Generation::genesis(), + consecutive_failures: 0, + restart_count: 0, + host_rollouts: 0, + totals: TelemetryTotals { + request_count: 0, + success_count: 0, + response_error_count: 0, + transport_fault_count: 0, + retry_count: 0, + }, + methods: BTreeMap::new(), + last_fault: None, + } + } +} + +impl ServerTelemetry { + pub(crate) fn record_request(&mut self, operation: &str) { + self.totals.request_count += 1; + self.methods + .entry(operation.to_owned()) + .or_default() + .request_count += 1; + } + + pub(crate) fn record_success( + &mut self, + operation: &str, + latency_ms: u64, + generation: Generation, + worker_alive: bool, + ) { + self.generation = generation; + self.state = if worker_alive { + LifecycleState::Ready + } else { + LifecycleState::Cold + }; + self.consecutive_failures = 0; + self.last_fault = None; + self.totals.success_count += 1; + let entry = self.methods.entry(operation.to_owned()).or_default(); + entry.success_count += 1; + entry.total_latency_ms = entry + .total_latency_ms + .saturating_add(u128::from(latency_ms)); + entry.max_latency_ms = entry.max_latency_ms.max(latency_ms); + entry.last_latency_ms = Some(latency_ms); + entry.last_error = None; + } + + pub(crate) fn record_error( + &mut self, + operation: &str, + fault: &FaultRecord, + latency_ms: u64, + generation: Generation, + ) { + self.generation = generation; + self.consecutive_failures = self.consecutive_failures.saturating_add(1); + self.last_fault = Some(fault.fault.clone()); + let transportish = matches!( + fault.fault.class, + libmcp::FaultClass::Transport + | libmcp::FaultClass::Process + | libmcp::FaultClass::Timeout + | libmcp::FaultClass::Resource + | libmcp::FaultClass::Replay + | libmcp::FaultClass::Rollout + ); + if transportish { + self.state = LifecycleState::Recovering; + self.totals.transport_fault_count += 1; + } else { + self.totals.response_error_count += 1; + } + let entry = self.methods.entry(operation.to_owned()).or_default(); + if transportish { + entry.transport_fault_count += 1; + } else { + entry.response_error_count += 1; + } + entry.total_latency_ms = entry + .total_latency_ms + .saturating_add(u128::from(latency_ms)); + entry.max_latency_ms = entry.max_latency_ms.max(latency_ms); + entry.last_latency_ms = Some(latency_ms); + entry.last_error = Some(fault.message().to_owned()); + } + + pub(crate) fn record_retry(&mut self, operation: &str) { + self.totals.retry_count += 1; + self.methods + .entry(operation.to_owned()) + .or_default() + .retry_count += 1; + } + + pub(crate) fn record_worker_restart(&mut self, generation: Generation) { + self.generation = generation; + self.restart_count += 1; + self.state = LifecycleState::Recovering; + } + + pub(crate) fn record_rollout(&mut self) { + self.host_rollouts += 1; + } + + pub(crate) fn host_rollouts(&self) -> u64 { + self.host_rollouts + } + + pub(crate) fn health_snapshot(&self, rollout: RolloutState) -> HealthSnapshot { + HealthSnapshot { + state: self.state, + generation: self.generation, + uptime_ms: self.uptime_ms(), + consecutive_failures: self.consecutive_failures, + restart_count: self.restart_count, + rollout: Some(rollout), + last_fault: self.last_fault.clone(), + } + } + + pub(crate) fn telemetry_snapshot(&self) -> TelemetrySnapshot { + TelemetrySnapshot { + uptime_ms: self.uptime_ms(), + state: self.state, + generation: self.generation, + consecutive_failures: self.consecutive_failures, + restart_count: self.restart_count, + totals: self.totals.clone(), + methods: self.ranked_methods(), + last_fault: self.last_fault.clone(), + } + } + + pub(crate) fn ranked_methods(&self) -> Vec { + let mut methods = self + .methods + .iter() + .map(|(method, stats)| MethodTelemetry { + method: method.clone(), + request_count: stats.request_count, + success_count: stats.success_count, + response_error_count: stats.response_error_count, + transport_fault_count: stats.transport_fault_count, + retry_count: stats.retry_count, + last_latency_ms: stats.last_latency_ms, + max_latency_ms: stats.max_latency_ms, + avg_latency_ms: average_latency_ms(stats), + last_error: stats.last_error.clone(), + }) + .collect::>(); + methods.sort_by(|left, right| { + right + .request_count + .cmp(&left.request_count) + .then_with(|| right.transport_fault_count.cmp(&left.transport_fault_count)) + .then_with(|| right.response_error_count.cmp(&left.response_error_count)) + .then_with(|| left.method.cmp(&right.method)) + }); + methods + } + + fn uptime_ms(&self) -> u64 { + unix_ms_now().saturating_sub(self.started_unix_ms) + } +} + +fn average_latency_ms(stats: &MethodStats) -> u64 { + if stats.request_count == 0 { + return 0; + } + let average = stats.total_latency_ms / u128::from(stats.request_count); + u64::try_from(average).unwrap_or(u64::MAX) +} + +fn unix_ms_now() -> u64 { + let duration = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO); + let millis = duration.as_millis(); + u64::try_from(millis).unwrap_or(u64::MAX) +} diff --git a/crates/phone-opus/tests/mcp_hardening.rs b/crates/phone-opus/tests/mcp_hardening.rs new file mode 100644 index 0000000..b47b365 --- /dev/null +++ b/crates/phone-opus/tests/mcp_hardening.rs @@ -0,0 +1,422 @@ +use clap as _; +use dirs as _; +use libmcp as _; +use std::fs; +use std::io::{self, BufRead, BufReader, Write}; +use std::os::unix::fs::PermissionsExt; +use std::path::{Path, PathBuf}; +use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; + +use libmcp_testkit::read_json_lines; +use serde as _; +use serde_json::{Value, json}; +use thiserror as _; + +type TestResult = Result>; + +fn must( + result: Result, + context: C, +) -> TestResult { + result.map_err(|error| io::Error::other(format!("{context}: {error}")).into()) +} + +fn must_some(value: Option, context: &str) -> TestResult { + value.ok_or_else(|| io::Error::other(context).into()) +} + +fn temp_root(name: &str) -> TestResult { + let root = std::env::temp_dir().join(format!( + "phone_opus_{name}_{}_{}", + std::process::id(), + must( + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH), + "current time after unix epoch", + )? + .as_nanos() + )); + must(fs::create_dir_all(&root), "create temp root")?; + Ok(root) +} + +fn binary_path() -> PathBuf { + PathBuf::from(env!("CARGO_BIN_EXE_phone-opus")) +} + +struct McpHarness { + child: Child, + stdin: ChildStdin, + stdout: BufReader, +} + +impl McpHarness { + fn spawn(state_home: &Path, extra_env: &[(&str, &str)]) -> TestResult { + let mut command = Command::new(binary_path()); + let _ = command + .arg("mcp") + .arg("serve") + .env("XDG_STATE_HOME", state_home) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::inherit()); + for (key, value) in extra_env { + let _ = command.env(key, value); + } + let mut child = must(command.spawn(), "spawn mcp host")?; + let stdin = must_some(child.stdin.take(), "host stdin")?; + let stdout = BufReader::new(must_some(child.stdout.take(), "host stdout")?); + Ok(Self { + child, + stdin, + stdout, + }) + } + + fn initialize(&mut self) -> TestResult { + self.request(json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "initialize", + "params": { + "protocolVersion": "2025-11-25", + "capabilities": {}, + "clientInfo": { "name": "mcp-hardening-test", "version": "0" } + } + })) + } + + fn notify_initialized(&mut self) -> TestResult { + self.notify(json!({ + "jsonrpc": "2.0", + "method": "notifications/initialized", + })) + } + + fn tools_list(&mut self) -> TestResult { + self.request(json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "tools/list", + "params": {}, + })) + } + + fn call_tool(&mut self, id: u64, name: &str, arguments: Value) -> TestResult { + self.request(json!({ + "jsonrpc": "2.0", + "id": id, + "method": "tools/call", + "params": { + "name": name, + "arguments": arguments, + } + })) + } + + fn request(&mut self, message: Value) -> TestResult { + let encoded = must(serde_json::to_string(&message), "request json")?; + must(writeln!(self.stdin, "{encoded}"), "write request")?; + must(self.stdin.flush(), "flush request")?; + let mut line = String::new(); + let byte_count = must(self.stdout.read_line(&mut line), "read response")?; + if byte_count == 0 { + return Err(io::Error::other("unexpected EOF reading response").into()); + } + must(serde_json::from_str(&line), "response json") + } + + fn notify(&mut self, message: Value) -> TestResult { + let encoded = must(serde_json::to_string(&message), "notify json")?; + must(writeln!(self.stdin, "{encoded}"), "write notify")?; + must(self.stdin.flush(), "flush notify")?; + Ok(()) + } +} + +impl Drop for McpHarness { + fn drop(&mut self) { + let _ = self.child.kill(); + let _ = self.child.wait(); + } +} + +fn assert_tool_ok(response: &Value) { + assert_eq!( + response["result"]["isError"].as_bool(), + Some(false), + "tool response unexpectedly errored: {response:#}" + ); +} + +fn assert_tool_error(response: &Value) { + assert_eq!( + response["result"]["isError"].as_bool(), + Some(true), + "tool response unexpectedly succeeded: {response:#}" + ); +} + +fn tool_content(response: &Value) -> &Value { + &response["result"]["structuredContent"] +} + +fn tool_names(response: &Value) -> Vec<&str> { + response["result"]["tools"] + .as_array() + .into_iter() + .flatten() + .filter_map(|tool| tool["name"].as_str()) + .collect() +} + +fn write_fake_claude_script(path: &Path) -> TestResult { + let script = r#"#!/bin/sh +set -eu +if [ -n "${PHONE_OPUS_TEST_PWD_FILE:-}" ]; then + pwd >"$PHONE_OPUS_TEST_PWD_FILE" +fi +if [ -n "${PHONE_OPUS_TEST_ARGS_FILE:-}" ]; then + printf '%s\n' "$@" >"$PHONE_OPUS_TEST_ARGS_FILE" +fi +if [ -n "${PHONE_OPUS_TEST_STDERR:-}" ]; then + printf '%s\n' "$PHONE_OPUS_TEST_STDERR" >&2 +fi +if [ -n "${PHONE_OPUS_TEST_STDOUT_FILE:-}" ]; then + cat "$PHONE_OPUS_TEST_STDOUT_FILE" +fi +exit "${PHONE_OPUS_TEST_EXIT_CODE:-0}" +"#; + must(fs::write(path, script), "write fake claude script")?; + let mut permissions = must(fs::metadata(path), "fake claude metadata")?.permissions(); + permissions.set_mode(0o755); + must( + fs::set_permissions(path, permissions), + "chmod fake claude script", + )?; + Ok(()) +} + +#[test] +fn cold_start_exposes_consult_and_ops_tools() -> TestResult { + let root = temp_root("cold_start")?; + let state_home = root.join("state-home"); + must(fs::create_dir_all(&state_home), "create state home")?; + + let mut harness = McpHarness::spawn(&state_home, &[])?; + let initialize = harness.initialize()?; + assert_eq!( + initialize["result"]["protocolVersion"].as_str(), + Some("2025-11-25") + ); + harness.notify_initialized()?; + + let tools = harness.tools_list()?; + let tool_names = tool_names(&tools); + assert!(tool_names.contains(&"consult")); + assert!(tool_names.contains(&"health_snapshot")); + assert!(tool_names.contains(&"telemetry_snapshot")); + + let health = harness.call_tool(3, "health_snapshot", json!({}))?; + assert_tool_ok(&health); + assert_eq!(tool_content(&health)["worker_generation"].as_u64(), Some(1)); + Ok(()) +} + +#[test] +fn consult_uses_read_only_toolset_and_requested_working_directory() -> TestResult { + let root = temp_root("consult_success")?; + let state_home = root.join("state-home"); + let sandbox = root.join("sandbox"); + must(fs::create_dir_all(&state_home), "create state home")?; + must(fs::create_dir_all(&sandbox), "create sandbox")?; + + let fake_claude = root.join("claude"); + let stdout_file = root.join("stdout.json"); + let args_file = root.join("args.txt"); + let pwd_file = root.join("pwd.txt"); + write_fake_claude_script(&fake_claude)?; + must( + fs::write( + &stdout_file, + serde_json::to_string(&json!({ + "type": "result", + "subtype": "success", + "is_error": false, + "duration_ms": 1234, + "duration_api_ms": 1200, + "num_turns": 2, + "result": "oracle", + "stop_reason": "end_turn", + "session_id": "session-123", + "total_cost_usd": 0.125, + "usage": { + "input_tokens": 10, + "output_tokens": 5 + }, + "modelUsage": { + "claude-sonnet-4-6": { + "inputTokens": 10, + "outputTokens": 5 + } + }, + "permission_denials": [], + "fast_mode_state": "off", + "uuid": "uuid-123" + }))?, + ), + "write fake stdout", + )?; + + let claude_bin = fake_claude.display().to_string(); + let stdout_path = stdout_file.display().to_string(); + let args_path = args_file.display().to_string(); + let pwd_path = pwd_file.display().to_string(); + let env = [ + ("PHONE_OPUS_CLAUDE_BIN", claude_bin.as_str()), + ("PHONE_OPUS_TEST_STDOUT_FILE", stdout_path.as_str()), + ("PHONE_OPUS_TEST_ARGS_FILE", args_path.as_str()), + ("PHONE_OPUS_TEST_PWD_FILE", pwd_path.as_str()), + ]; + let mut harness = McpHarness::spawn(&state_home, &env)?; + let _ = harness.initialize()?; + harness.notify_initialized()?; + + let consult = harness.call_tool( + 3, + "consult", + json!({ + "prompt": "say oracle", + "cwd": sandbox.display().to_string(), + "max_turns": 7 + }), + )?; + assert_tool_ok(&consult); + assert_eq!(tool_content(&consult)["response"].as_str(), Some("oracle")); + assert_eq!( + tool_content(&consult)["cwd"].as_str(), + Some(sandbox.display().to_string().as_str()) + ); + assert_eq!(tool_content(&consult)["num_turns"].as_u64(), Some(2)); + + let pwd = must(fs::read_to_string(&pwd_file), "read fake pwd file")?; + assert_eq!(pwd.trim(), sandbox.display().to_string()); + + let args = must(fs::read_to_string(&args_file), "read fake args file")?; + let lines = args.lines().collect::>(); + assert!(lines.contains(&"-p")); + assert!(lines.contains(&"--output-format")); + assert!(lines.contains(&"json")); + assert!(lines.contains(&"--strict-mcp-config")); + assert!(lines.contains(&"--mcp-config")); + assert!(lines.contains(&"{\"mcpServers\":{}}")); + assert!(lines.contains(&"--disable-slash-commands")); + assert!(lines.contains(&"--no-chrome")); + assert!(lines.contains(&"--tools")); + assert!(lines.contains(&"Bash,Read,Grep,Glob,LS,WebFetch,WebSearch")); + assert!(lines.contains(&"--permission-mode")); + assert!(lines.contains(&"dontAsk")); + assert!(lines.contains(&"--max-turns")); + assert!(lines.contains(&"7")); + assert_eq!(lines.last().copied(), Some("say oracle")); + + let telemetry = harness.call_tool(4, "telemetry_snapshot", json!({}))?; + assert_tool_ok(&telemetry); + let hot_methods = tool_content(&telemetry)["hot_methods"] + .as_array() + .cloned() + .unwrap_or_default(); + assert!( + hot_methods + .iter() + .any(|value| value["method"] == "tools/call:consult") + ); + Ok(()) +} + +#[test] +fn consult_surfaces_downstream_cli_failures() -> TestResult { + let root = temp_root("consult_failure")?; + let state_home = root.join("state-home"); + let fake_claude = root.join("claude"); + must(fs::create_dir_all(&state_home), "create state home")?; + write_fake_claude_script(&fake_claude)?; + + let claude_bin = fake_claude.display().to_string(); + let env = [ + ("PHONE_OPUS_CLAUDE_BIN", claude_bin.as_str()), + ("PHONE_OPUS_TEST_EXIT_CODE", "17"), + ("PHONE_OPUS_TEST_STDERR", "permission denied by fake claude"), + ]; + let mut harness = McpHarness::spawn(&state_home, &env)?; + let _ = harness.initialize()?; + harness.notify_initialized()?; + + let consult = harness.call_tool(3, "consult", json!({ "prompt": "fail" }))?; + assert_tool_error(&consult); + assert_eq!( + tool_content(&consult)["fault"]["class"].as_str(), + Some("downstream") + ); + assert!( + tool_content(&consult)["fault"]["detail"] + .as_str() + .is_some_and(|value| value.contains("permission denied by fake claude")) + ); + Ok(()) +} + +#[test] +fn consult_never_replays_after_worker_transport_failure() -> TestResult { + let root = temp_root("consult_no_replay")?; + let state_home = root.join("state-home"); + let fake_claude = root.join("claude"); + must(fs::create_dir_all(&state_home), "create state home")?; + write_fake_claude_script(&fake_claude)?; + + let claude_bin = fake_claude.display().to_string(); + let env = [ + ("PHONE_OPUS_CLAUDE_BIN", claude_bin.as_str()), + ( + "PHONE_OPUS_MCP_TEST_WORKER_CRASH_ONCE_KEY", + "tools/call:consult", + ), + ]; + let mut harness = McpHarness::spawn(&state_home, &env)?; + let _ = harness.initialize()?; + harness.notify_initialized()?; + + let consult = harness.call_tool(3, "consult", json!({ "prompt": "crash once" }))?; + assert_tool_error(&consult); + assert_eq!( + tool_content(&consult)["fault"]["class"].as_str(), + Some("transport") + ); + assert_eq!(tool_content(&consult)["retryable"].as_bool(), Some(true)); + assert_eq!(tool_content(&consult)["retried"].as_bool(), Some(false)); + + let telemetry = harness.call_tool(4, "telemetry_snapshot", json!({ "render": "json" }))?; + assert_tool_ok(&telemetry); + assert_eq!(tool_content(&telemetry)["retries"].as_u64(), Some(0)); + let hot_methods = tool_content(&telemetry)["hot_methods"] + .as_array() + .cloned() + .unwrap_or_default(); + let consult_method = hot_methods + .iter() + .find(|value| value["method"] == "tools/call:consult") + .cloned() + .unwrap_or_default(); + assert_eq!(consult_method["transport_faults"].as_u64(), Some(1)); + + let telemetry_log_path = state_home + .join("phone_opus") + .join("mcp") + .join("telemetry.jsonl"); + let telemetry_rows = read_json_lines::(&telemetry_log_path)?; + assert!( + telemetry_rows + .iter() + .any(|row| row["event"] == "tool_call" && row["tool_name"] == "consult") + ); + Ok(()) +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..5d3ba6e --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,5 @@ +[toolchain] +channel = "1.94.0" +profile = "minimal" +components = ["clippy", "rustfmt"] + -- cgit v1.2.3