author      main <main@swarm.moe>    2026-03-20 16:00:30 -0400
committer   main <main@swarm.moe>    2026-03-20 16:00:30 -0400
commit      9d63844f3a28fde70b19500422f17379e99e588a (patch)
tree        163cfbd65a8d3528346561410ef39eb1183a16f2 /crates/fidget-spinner-store-sqlite
parent      22fe3d2ce7478450a1d7443c4ecbd85fd4c46716 (diff)
Refound Spinner as an austere frontier ledger
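A minimal usage sketch of the refounded surface, grounded only in the request and record shapes visible in the diff below. The literal paths and strings are illustrative; `NonEmptyText::new` taking a `String` is inferred from call sites in the old code, and the slug-as-selector convention is inferred from the new `Unknown*Selector` errors rather than documented here.

```rust
use std::collections::BTreeSet;

use fidget_spinner_core::NonEmptyText;
use fidget_spinner_store_sqlite::{
    CreateFrontierRequest, CreateHypothesisRequest, OpenExperimentRequest, ProjectStore,
    StoreError,
};

fn text(value: &str) -> Result<NonEmptyText, StoreError> {
    // Assumption: NonEmptyText::new(String) fails with CoreError, and
    // StoreError::Core(#[from] CoreError) converts it, as the diff suggests.
    Ok(NonEmptyText::new(value.to_owned())?)
}

fn demo() -> Result<(), StoreError> {
    let mut store = ProjectStore::init("./demo-project", text("Demo")?)?;
    let frontier = store.create_frontier(CreateFrontierRequest {
        label: text("tokenizer-throughput")?,
        objective: text("cut tokenizer latency without hurting accuracy")?,
        slug: None, // the store derives a unique slug from the label
    })?;
    let hypothesis = store.create_hypothesis(CreateHypothesisRequest {
        frontier: frontier.slug.to_string(), // selector by slug (assumed) or id
        slug: None,
        title: text("batched lookups amortize cache misses")?,
        summary: text("batching trie lookups should cut per-token overhead")?,
        body: text("a single paragraph, as the new validation requires")?,
        tags: BTreeSet::new(),
        parents: Vec::new(),
    })?;
    let _experiment = store.open_experiment(OpenExperimentRequest {
        hypothesis: hypothesis.slug.to_string(),
        slug: None,
        title: text("batch size 64 benchmark")?,
        summary: None,
        tags: BTreeSet::new(),
        parents: Vec::new(),
    })?;
    Ok(())
}
```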
Diffstat (limited to 'crates/fidget-spinner-store-sqlite')
-rw-r--r--    crates/fidget-spinner-store-sqlite/Cargo.toml       2
-rw-r--r--    crates/fidget-spinner-store-sqlite/src/lib.rs    6259
2 files changed, 2699 insertions, 3562 deletions
diff --git a/crates/fidget-spinner-store-sqlite/Cargo.toml b/crates/fidget-spinner-store-sqlite/Cargo.toml
index 00fd070..01d6f44 100644
--- a/crates/fidget-spinner-store-sqlite/Cargo.toml
+++ b/crates/fidget-spinner-store-sqlite/Cargo.toml
@@ -1,7 +1,7 @@
 [package]
 name = "fidget-spinner-store-sqlite"
 categories.workspace = true
-description = "SQLite-backed per-project store for Fidget Spinner DAG projects"
+description = "SQLite-backed per-project frontier ledger store for Fidget Spinner"
 edition.workspace = true
 keywords.workspace = true
 license.workspace = true
diff --git a/crates/fidget-spinner-store-sqlite/src/lib.rs b/crates/fidget-spinner-store-sqlite/src/lib.rs
index bbe7038..3680471 100644
--- a/crates/fidget-spinner-store-sqlite/src/lib.rs
+++ b/crates/fidget-spinner-store-sqlite/src/lib.rs
@@ -1,23 +1,20 @@
-use std::cmp::Ordering;
 use std::collections::{BTreeMap, BTreeSet};
-use std::fmt::Write as _;
 use std::fs;
 use std::io;
 
 use camino::{Utf8Path, Utf8PathBuf};
 use fidget_spinner_core::{
-    AnnotationVisibility, CommandRecipe, CompletedExperiment, DagEdge, DagNode, DiagnosticSeverity,
-    EdgeKind, ExecutionBackend, ExperimentResult, FieldPresence, FieldRole, FieldValueType,
-    FrontierContract, FrontierNote, FrontierProjection, FrontierRecord, FrontierStatus,
-    FrontierVerdict, FrontierVerdictCounts, InferencePolicy, JsonObject, MetricDefinition,
-    MetricSpec, MetricUnit, MetricValue, NodeAnnotation, NodeClass, NodeDiagnostics, NodePayload,
-    NonEmptyText, OpenExperiment, OptimizationObjective, ProjectFieldSpec, ProjectSchema,
-    RunDimensionDefinition, RunDimensionValue, RunRecord, RunStatus, TagName, TagRecord,
+    ArtifactId, ArtifactKind, ArtifactRecord, AttachmentTargetRef, CommandRecipe, CoreError,
+    ExecutionBackend, ExperimentAnalysis, ExperimentId, ExperimentOutcome, ExperimentRecord,
+    ExperimentStatus, FieldValueType, FrontierBrief, FrontierId, FrontierRecord,
+    FrontierRoadmapItem, FrontierStatus, FrontierVerdict, HypothesisId, HypothesisRecord,
+    MetricDefinition, MetricUnit, MetricValue, MetricVisibility, NonEmptyText,
+    OptimizationObjective, RunDimensionDefinition, RunDimensionValue, Slug, TagName, TagRecord,
+    VertexRef,
 };
-use rusqlite::types::Value as SqlValue;
-use rusqlite::{Connection, OptionalExtension, Transaction, params, params_from_iter};
+use rusqlite::{Connection, OptionalExtension, Transaction, params};
 use serde::{Deserialize, Serialize};
-use serde_json::{Value, json};
+use serde_json::Value;
 use thiserror::Error;
 use time::OffsetDateTime;
 use time::format_description::well_known::Rfc3339;
@@ -26,8 +23,7 @@ use uuid::Uuid;
 pub const STORE_DIR_NAME: &str = ".fidget_spinner";
 pub const STATE_DB_NAME: &str = "state.sqlite";
 pub const PROJECT_CONFIG_NAME: &str = "project.json";
-pub const PROJECT_SCHEMA_NAME: &str = "schema.json";
-pub const CURRENT_STORE_FORMAT_VERSION: u32 = 3;
+pub const CURRENT_STORE_FORMAT_VERSION: u32 = 4;
 
 #[derive(Debug, Error)]
 pub enum StoreError {
@@ -49,17 +45,11 @@ pub enum StoreError {
     #[error("time format failure")]
     TimeFormat(#[from] time::error::Format),
     #[error("core domain failure")]
-    Core(#[from] fidget_spinner_core::CoreError),
+    Core(#[from] CoreError),
     #[error("UUID parse failure")]
     Uuid(#[from] uuid::Error),
-    #[error("node {0} was not found")]
-    NodeNotFound(fidget_spinner_core::NodeId),
-    #[error("frontier {0} was not found")]
-    FrontierNotFound(fidget_spinner_core::FrontierId),
-    #[error("experiment {0} was not found")]
-    ExperimentNotFound(fidget_spinner_core::ExperimentId),
-    #[error("node {0} is not a hypothesis node")]
-    NodeNotHypothesis(fidget_spinner_core::NodeId),
+    #[error("{0}")]
+    InvalidInput(String),
     #[error(
         "project store format {observed} is incompatible with this binary (expected {expected}); reinitialize the store"
     )]
@@ -68,50 +58,53 @@ pub enum StoreError {
     IncompatibleStoreFormatVersion { observed: u32, expected: u32 },
     #[error("unknown tag `{0}`")]
     UnknownTag(TagName),
     #[error("tag `{0}` already exists")]
     DuplicateTag(TagName),
-    #[error("note nodes require an explicit tag list; use an empty list if no tags apply")]
-    NoteTagsRequired,
-    #[error("{0} nodes require a non-empty summary")]
-    ProseSummaryRequired(NodeClass),
-    #[error("{0} nodes require a non-empty string payload field `body`")]
-    ProseBodyRequired(NodeClass),
     #[error("metric `{0}` is not registered")]
     UnknownMetricDefinition(NonEmptyText),
-    #[error(
-        "metric `{key}` conflicts with existing definition ({existing_unit}/{existing_objective} vs {new_unit}/{new_objective})"
-    )]
-    ConflictingMetricDefinition {
-        key: String,
-        existing_unit: String,
-        existing_objective: String,
-        new_unit: String,
-        new_objective: String,
-    },
+    #[error("metric `{0}` already exists")]
+    DuplicateMetricDefinition(NonEmptyText),
     #[error("run dimension `{0}` is not registered")]
     UnknownRunDimension(NonEmptyText),
     #[error("run dimension `{0}` already exists")]
     DuplicateRunDimension(NonEmptyText),
+    #[error("frontier selector `{0}` did not resolve")]
+    UnknownFrontierSelector(String),
+    #[error("hypothesis selector `{0}` did not resolve")]
+    UnknownHypothesisSelector(String),
+    #[error("experiment selector `{0}` did not resolve")]
+    UnknownExperimentSelector(String),
+    #[error("artifact selector `{0}` did not resolve")]
+    UnknownArtifactSelector(String),
     #[error(
-        "run dimension `{key}` conflicts with existing definition ({existing_type} vs {new_type})"
+        "entity revision mismatch for {kind} `{selector}`: expected {expected}, observed {observed}"
     )]
-    ConflictingRunDimensionDefinition {
-        key: String,
-        existing_type: String,
-        new_type: String,
+    RevisionMismatch {
+        kind: &'static str,
+        selector: String,
+        expected: u64,
+        observed: u64,
     },
-    #[error("run dimension `{key}` expects {expected} values, got {observed}")]
-    InvalidRunDimensionValue {
-        key: String,
-        expected: String,
-        observed: String,
-    },
-    #[error("schema field `{0}` was not found")]
-    SchemaFieldNotFound(String),
-    #[error("metric key `{key}` is ambiguous across sources: {sources}")]
-    AmbiguousMetricKey { key: String, sources: String },
-    #[error("metric key `{key}` for source `{metric_source}` requires an explicit order")]
-    MetricOrderRequired { key: String, metric_source: String },
-    #[error("metric key `{key}` for source `{metric_source}` has conflicting semantics")]
-    MetricSemanticsAmbiguous { key: String, metric_source: String },
+    #[error("hypothesis body must be exactly one paragraph")]
+    HypothesisBodyMustBeSingleParagraph,
+    #[error("experiments must hang off exactly one hypothesis")]
+    ExperimentHypothesisRequired,
+    #[error("experiment `{0}` is already closed")]
+    ExperimentAlreadyClosed(ExperimentId),
+    #[error("experiment `{0}` is still open")]
+    ExperimentStillOpen(ExperimentId),
+    #[error("influence edge crosses frontier scope")]
+    CrossFrontierInfluence,
+    #[error("self edges are not allowed")]
+    SelfEdge,
+    #[error("unknown roadmap hypothesis `{0}`")]
+    UnknownRoadmapHypothesis(String),
+    #[error(
+        "manual experiments may omit command context only by using an empty argv surrogate explicitly"
+    )]
+    ManualExperimentRequiresCommand,
+    #[error("metric key `{key}` requires an explicit ranking order")]
+    MetricOrderRequired { key: String },
+    #[error("dimension filter references unknown run dimension `{0}`")]
+    UnknownDimensionFilter(String),
 }
 
 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
@@ -132,181 +125,294 @@ impl ProjectConfig {
     }
 }
 
-#[derive(Clone, Debug)]
-pub struct CreateNodeRequest {
-    pub class: NodeClass,
-    pub frontier_id: Option<fidget_spinner_core::FrontierId>,
-    pub title: NonEmptyText,
-    pub summary: Option<NonEmptyText>,
-    pub tags: Option<BTreeSet<TagName>>,
-    pub payload: NodePayload,
-    pub annotations: Vec<NodeAnnotation>,
-    pub attachments: Vec<EdgeAttachment>,
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct ProjectStatus {
+    pub project_root: Utf8PathBuf,
+    pub display_name: NonEmptyText,
+    pub store_format_version: u32,
+    pub frontier_count: u64,
+    pub hypothesis_count: u64,
+    pub experiment_count: u64,
+    pub open_experiment_count: u64,
+    pub artifact_count: u64,
 }
 
 #[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub enum EdgeAttachmentDirection {
-    ExistingToNew,
-    NewToExisting,
+#[serde(rename_all = "snake_case")]
+pub enum MetricScope {
+    Live,
+    Visible,
+    All,
+}
+
+#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum MetricRankOrder {
+    Asc,
+    Desc,
 }
 
 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub struct EdgeAttachment {
-    pub node_id: fidget_spinner_core::NodeId,
-    pub kind: EdgeKind,
-    pub direction: EdgeAttachmentDirection,
+#[serde(tag = "kind", content = "selector", rename_all = "snake_case")]
+pub enum VertexSelector {
+    Hypothesis(String),
+    Experiment(String),
 }
 
-impl EdgeAttachment {
-    #[must_use]
-    pub fn materialize(&self, new_node_id: fidget_spinner_core::NodeId) -> DagEdge {
-        match self.direction {
-            EdgeAttachmentDirection::ExistingToNew => DagEdge {
-                source_id: self.node_id,
-                target_id: new_node_id,
-                kind: self.kind,
-            },
-            EdgeAttachmentDirection::NewToExisting => DagEdge {
-                source_id: new_node_id,
-                target_id: self.node_id,
-                kind: self.kind,
-            },
-        }
-    }
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+#[serde(tag = "kind", content = "selector", rename_all = "snake_case")]
+pub enum AttachmentSelector {
+    Frontier(String),
+    Hypothesis(String),
+    Experiment(String),
+}
+
+#[derive(Clone, Debug)]
+pub struct CreateFrontierRequest {
+    pub label: NonEmptyText,
+    pub objective: NonEmptyText,
+    pub slug: Option<Slug>,
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct FrontierSummary {
+    pub id: FrontierId,
+    pub slug: Slug,
+    pub label: NonEmptyText,
+    pub objective: NonEmptyText,
+    pub status: FrontierStatus,
+    pub active_hypothesis_count: u64,
+    pub open_experiment_count: u64,
+    pub updated_at: OffsetDateTime,
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct FrontierRoadmapItemDraft {
+    pub rank: u32,
+    pub hypothesis: String,
+    pub summary: Option<NonEmptyText>,
+}
+
+#[derive(Clone, Debug)]
+pub enum TextPatch<T> {
+    Set(T),
+    Clear,
+}
+
+#[derive(Clone, Debug)]
+pub struct UpdateFrontierBriefRequest {
+    pub frontier: String,
+    pub expected_revision: Option<u64>,
+    pub situation: Option<TextPatch<NonEmptyText>>,
+    pub roadmap: Option<Vec<FrontierRoadmapItemDraft>>,
+    pub unknowns: Option<Vec<NonEmptyText>>,
 }
 
 #[derive(Clone, Debug)]
-pub struct ListNodesQuery {
-    pub frontier_id: Option<fidget_spinner_core::FrontierId>,
-    pub class: Option<NodeClass>,
+pub struct CreateHypothesisRequest {
+    pub frontier: String,
+    pub slug: Option<Slug>,
+    pub title: NonEmptyText,
+    pub summary: NonEmptyText,
+    pub body: NonEmptyText,
     pub tags: BTreeSet<TagName>,
-    pub include_archived: bool,
-    pub limit: u32,
+    pub parents: Vec<VertexSelector>,
 }
 
-impl Default for ListNodesQuery {
-    fn default() -> Self {
-        Self {
-            frontier_id: None,
-            class: None,
-            tags: BTreeSet::new(),
-            include_archived: false,
-            limit: 20,
-        }
-    }
+#[derive(Clone, Debug)]
+pub struct UpdateHypothesisRequest {
+    pub hypothesis: String,
+    pub expected_revision: Option<u64>,
+    pub title: Option<NonEmptyText>,
+    pub summary: Option<NonEmptyText>,
+    pub body: Option<NonEmptyText>,
+    pub tags: Option<BTreeSet<TagName>>,
+    pub parents: Option<Vec<VertexSelector>>,
+    pub archived: Option<bool>,
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct ListHypothesesQuery {
+    pub frontier: Option<String>,
+    pub tags: BTreeSet<TagName>,
+    pub include_archived: bool,
+    pub limit: Option<u32>,
 }
 
 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub struct NodeSummary {
-    pub id: fidget_spinner_core::NodeId,
-    pub class: NodeClass,
-    pub track: fidget_spinner_core::NodeTrack,
-    pub frontier_id: Option<fidget_spinner_core::FrontierId>,
+pub struct VertexSummary {
+    pub vertex: VertexRef,
+    pub frontier_id: FrontierId,
+    pub slug: Slug,
     pub archived: bool,
     pub title: NonEmptyText,
     pub summary: Option<NonEmptyText>,
-    pub tags: BTreeSet<TagName>,
-    pub diagnostic_count: u64,
-    pub hidden_annotation_count: u64,
-    pub created_at: OffsetDateTime,
     pub updated_at: OffsetDateTime,
 }
 
-#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
-#[serde(rename_all = "snake_case")]
-pub enum MetricFieldSource {
-    RunMetric,
-    HypothesisPayload,
-    RunPayload,
-    AnalysisPayload,
-    DecisionPayload,
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct HypothesisSummary {
+    pub id: HypothesisId,
+    pub slug: Slug,
+    pub frontier_id: FrontierId,
+    pub archived: bool,
+    pub title: NonEmptyText,
+    pub summary: NonEmptyText,
+    pub tags: Vec<TagName>,
+    pub open_experiment_count: u64,
+    pub latest_verdict: Option<FrontierVerdict>,
+    pub updated_at: OffsetDateTime,
 }
 
-impl MetricFieldSource {
-    #[must_use]
-    pub const fn as_str(self) -> &'static str {
-        match self {
-            Self::RunMetric => "run_metric",
-            Self::HypothesisPayload => "hypothesis_payload",
-            Self::RunPayload => "run_payload",
-            Self::AnalysisPayload => "analysis_payload",
-            Self::DecisionPayload => "decision_payload",
-        }
-    }
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct HypothesisDetail {
+    pub record: HypothesisRecord,
+    pub parents: Vec<VertexSummary>,
+    pub children: Vec<VertexSummary>,
+    pub open_experiments: Vec<ExperimentSummary>,
+    pub closed_experiments: Vec<ExperimentSummary>,
+    pub artifacts: Vec<ArtifactSummary>,
+}
 
-    #[must_use]
-    pub const fn from_payload_class(class: NodeClass) -> Option<Self> {
-        match class {
-            NodeClass::Hypothesis => Some(Self::HypothesisPayload),
-            NodeClass::Run => Some(Self::RunPayload),
-            NodeClass::Analysis => Some(Self::AnalysisPayload),
-            NodeClass::Decision => Some(Self::DecisionPayload),
-            NodeClass::Contract | NodeClass::Source | NodeClass::Note => None,
-        }
-    }
+#[derive(Clone, Debug)]
+pub struct OpenExperimentRequest {
+    pub hypothesis: String,
+    pub slug: Option<Slug>,
+    pub title: NonEmptyText,
+    pub summary: Option<NonEmptyText>,
+    pub tags: BTreeSet<TagName>,
+    pub parents: Vec<VertexSelector>,
 }
 
-#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
-#[serde(rename_all = "snake_case")]
-pub enum MetricRankOrder {
-    Asc,
-    Desc,
+#[derive(Clone, Debug)]
+pub struct UpdateExperimentRequest {
+    pub experiment: String,
+    pub expected_revision: Option<u64>,
+    pub title: Option<NonEmptyText>,
+    pub summary: Option<TextPatch<NonEmptyText>>,
+    pub tags: Option<BTreeSet<TagName>>,
+    pub parents: Option<Vec<VertexSelector>>,
+    pub archived: Option<bool>,
+    pub outcome: Option<ExperimentOutcomePatch>,
 }
 
-impl MetricRankOrder {
-    #[must_use]
-    pub const fn as_str(self) -> &'static str {
-        match self {
-            Self::Asc => "asc",
-            Self::Desc => "desc",
-        }
-    }
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct ExperimentOutcomePatch {
+    pub backend: ExecutionBackend,
+    pub command: CommandRecipe,
+    pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
+    pub primary_metric: MetricValue,
+    pub supporting_metrics: Vec<MetricValue>,
+    pub verdict: FrontierVerdict,
+    pub rationale: NonEmptyText,
+    pub analysis: Option<ExperimentAnalysis>,
 }
 
 #[derive(Clone, Debug)]
-pub struct MetricBestQuery {
-    pub key: NonEmptyText,
-    pub frontier_id: Option<fidget_spinner_core::FrontierId>,
-    pub source: Option<MetricFieldSource>,
+pub struct CloseExperimentRequest {
+    pub experiment: String,
+    pub expected_revision: Option<u64>,
+    pub backend: ExecutionBackend,
+    pub command: CommandRecipe,
     pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
-    pub order: Option<MetricRankOrder>,
-    pub limit: u32,
+    pub primary_metric: MetricValue,
+    pub supporting_metrics: Vec<MetricValue>,
+    pub verdict: FrontierVerdict,
+    pub rationale: NonEmptyText,
+    pub analysis: Option<ExperimentAnalysis>,
 }
 
-#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub struct MetricKeySummary {
-    pub key: NonEmptyText,
-    pub source: MetricFieldSource,
-    pub experiment_count: u64,
-    pub unit: Option<MetricUnit>,
-    pub objective: Option<OptimizationObjective>,
-    pub description: Option<NonEmptyText>,
-    pub requires_order: bool,
+#[derive(Clone, Debug, Default)]
+pub struct ListExperimentsQuery {
+    pub frontier: Option<String>,
+    pub hypothesis: Option<String>,
+    pub tags: BTreeSet<TagName>,
+    pub include_archived: bool,
+    pub status: Option<ExperimentStatus>,
+    pub limit: Option<u32>,
 }
 
 #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
-pub struct MetricBestEntry {
+pub struct MetricObservationSummary {
     pub key: NonEmptyText,
-    pub source: MetricFieldSource,
     pub value: f64,
-    pub order: MetricRankOrder,
-    pub experiment_id: fidget_spinner_core::ExperimentId,
-    pub experiment_title: NonEmptyText,
-    pub frontier_id: fidget_spinner_core::FrontierId,
-    pub hypothesis_node_id: fidget_spinner_core::NodeId,
-    pub hypothesis_title: NonEmptyText,
-    pub run_id: fidget_spinner_core::RunId,
-    pub verdict: FrontierVerdict,
-    pub unit: Option<MetricUnit>,
-    pub objective: Option<OptimizationObjective>,
-    pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
+    pub unit: MetricUnit,
+    pub objective: OptimizationObjective,
+}
+
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct ExperimentSummary {
+    pub id: ExperimentId,
+    pub slug: Slug,
+    pub frontier_id: FrontierId,
+    pub hypothesis_id: HypothesisId,
+    pub archived: bool,
+    pub title: NonEmptyText,
+    pub summary: Option<NonEmptyText>,
+    pub tags: Vec<TagName>,
+    pub status: ExperimentStatus,
+    pub verdict: Option<FrontierVerdict>,
+    pub primary_metric: Option<MetricObservationSummary>,
+    pub updated_at: OffsetDateTime,
+    pub closed_at: Option<OffsetDateTime>,
+}
+
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct ExperimentDetail {
+    pub record: ExperimentRecord,
+    pub owning_hypothesis: HypothesisSummary,
+    pub parents: Vec<VertexSummary>,
+    pub children: Vec<VertexSummary>,
+    pub artifacts: Vec<ArtifactSummary>,
+}
+
+#[derive(Clone, Debug)]
+pub struct CreateArtifactRequest {
+    pub slug: Option<Slug>,
+    pub kind: ArtifactKind,
+    pub label: NonEmptyText,
+    pub summary: Option<NonEmptyText>,
+    pub locator: NonEmptyText,
+    pub media_type: Option<NonEmptyText>,
+    pub attachments: Vec<AttachmentSelector>,
+}
+
+#[derive(Clone, Debug)]
+pub struct UpdateArtifactRequest {
+    pub artifact: String,
+    pub expected_revision: Option<u64>,
+    pub kind: Option<ArtifactKind>,
+    pub label: Option<NonEmptyText>,
+    pub summary: Option<TextPatch<NonEmptyText>>,
+    pub locator: Option<NonEmptyText>,
+    pub media_type: Option<TextPatch<NonEmptyText>>,
+    pub attachments: Option<Vec<AttachmentSelector>>,
 }
 
 #[derive(Clone, Debug, Default)]
-pub struct MetricKeyQuery {
-    pub frontier_id: Option<fidget_spinner_core::FrontierId>,
-    pub source: Option<MetricFieldSource>,
-    pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
+pub struct ListArtifactsQuery {
+    pub frontier: Option<String>,
+    pub kind: Option<ArtifactKind>,
+    pub attached_to: Option<AttachmentSelector>,
+    pub limit: Option<u32>,
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct ArtifactSummary {
+    pub id: ArtifactId,
+    pub slug: Slug,
+    pub kind: ArtifactKind,
+    pub label: NonEmptyText,
+    pub summary: Option<NonEmptyText>,
+    pub locator: NonEmptyText,
+    pub media_type: Option<NonEmptyText>,
+    pub updated_at: OffsetDateTime,
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct ArtifactDetail {
+    pub record: ArtifactRecord,
+    pub attachments: Vec<AttachmentTargetRef>,
 }
 
 #[derive(Clone, Debug)]
@@ -314,6 +420,7 @@ pub struct DefineMetricRequest {
     pub key: NonEmptyText,
     pub unit: MetricUnit,
     pub objective: OptimizationObjective,
+    pub visibility: MetricVisibility,
    pub description: Option<NonEmptyText>,
 }
 
@@ -325,154 +432,131 @@ pub struct DefineRunDimensionRequest {
 }
 
 #[derive(Clone, Debug)]
-pub struct UpsertSchemaFieldRequest {
-    pub name: NonEmptyText,
-    pub node_classes: BTreeSet<NodeClass>,
-    pub presence: FieldPresence,
-    pub severity: DiagnosticSeverity,
-    pub role: FieldRole,
-    pub inference_policy: InferencePolicy,
-    pub value_type: Option<FieldValueType>,
-}
-
-#[derive(Clone, Debug)]
-pub struct RemoveSchemaFieldRequest {
-    pub name: NonEmptyText,
-    pub node_classes: Option<BTreeSet<NodeClass>>,
+pub struct MetricKeysQuery {
+    pub frontier: Option<String>,
+    pub scope: MetricScope,
 }
 
 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub struct RunDimensionSummary {
+pub struct MetricKeySummary {
     pub key: NonEmptyText,
-    pub value_type: FieldValueType,
+    pub unit: MetricUnit,
+    pub objective: OptimizationObjective,
+    pub visibility: MetricVisibility,
     pub description: Option<NonEmptyText>,
-    pub observed_run_count: u64,
-    pub distinct_value_count: u64,
-    pub sample_values: Vec<Value>,
-}
-
-#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
-pub struct MetricPlaneMigrationReport {
-    pub inserted_metric_definitions: u64,
-    pub inserted_dimension_definitions: u64,
-    pub inserted_dimension_values: u64,
+    pub reference_count: u64,
 }
 
 #[derive(Clone, Debug)]
-pub struct CreateFrontierRequest {
-    pub label: NonEmptyText,
-    pub contract_title: NonEmptyText,
-    pub contract_summary: Option<NonEmptyText>,
-    pub contract: FrontierContract,
+pub struct MetricBestQuery {
+    pub frontier: Option<String>,
+    pub hypothesis: Option<String>,
+    pub key: NonEmptyText,
+    pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
+    pub include_rejected: bool,
+    pub limit: Option<u32>,
+    pub order: Option<MetricRankOrder>,
 }
 
-#[derive(Clone, Debug)]
-pub struct OpenExperimentRequest {
-    pub frontier_id: fidget_spinner_core::FrontierId,
-    pub hypothesis_node_id: fidget_spinner_core::NodeId,
-    pub title: NonEmptyText,
-    pub summary: Option<NonEmptyText>,
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct MetricBestEntry {
+    pub experiment: ExperimentSummary,
+    pub hypothesis: HypothesisSummary,
+    pub value: f64,
+    pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
 }
 
 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub struct OpenExperimentSummary {
-    pub id: fidget_spinner_core::ExperimentId,
-    pub frontier_id: fidget_spinner_core::FrontierId,
-    pub hypothesis_node_id: fidget_spinner_core::NodeId,
-    pub title: NonEmptyText,
-    pub summary: Option<NonEmptyText>,
-    pub created_at: OffsetDateTime,
+pub struct EntityHistoryEntry {
+    pub revision: u64,
+    pub event_kind: NonEmptyText,
+    pub occurred_at: OffsetDateTime,
+    pub snapshot: Value,
 }
 
-#[derive(Clone, Debug)]
-pub struct ExperimentAnalysisDraft {
-    pub title: NonEmptyText,
-    pub summary: NonEmptyText,
-    pub body: NonEmptyText,
-}
-
-#[derive(Clone, Debug)]
-pub struct CloseExperimentRequest {
-    pub experiment_id: fidget_spinner_core::ExperimentId,
-    pub run_title: NonEmptyText,
-    pub run_summary: Option<NonEmptyText>,
-    pub backend: ExecutionBackend,
-    pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
-    pub command: CommandRecipe,
-    pub primary_metric: MetricValue,
-    pub supporting_metrics: Vec<MetricValue>,
-    pub note: FrontierNote,
-    pub verdict: FrontierVerdict,
-    pub analysis: Option<ExperimentAnalysisDraft>,
-    pub decision_title: NonEmptyText,
-    pub decision_rationale: NonEmptyText,
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct HypothesisCurrentState {
+    pub hypothesis: HypothesisSummary,
+    pub open_experiments: Vec<ExperimentSummary>,
+    pub latest_closed_experiment: Option<ExperimentSummary>,
 }
 
 #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
-pub struct ExperimentReceipt {
-    pub open_experiment: OpenExperiment,
-    pub run_node: DagNode,
-    pub run: RunRecord,
-    pub analysis_node: Option<DagNode>,
-    pub decision_node: DagNode,
-    pub experiment: CompletedExperiment,
+pub struct FrontierOpenProjection {
+    pub frontier: FrontierRecord,
+    pub active_tags: Vec<TagName>,
+    pub active_metric_keys: Vec<MetricKeySummary>,
+    pub active_hypotheses: Vec<HypothesisCurrentState>,
+    pub open_experiments: Vec<ExperimentSummary>,
 }
 
 pub struct ProjectStore {
     project_root: Utf8PathBuf,
     state_root: Utf8PathBuf,
-    connection: Connection,
     config: ProjectConfig,
-    schema: ProjectSchema,
+    connection: Connection,
 }
 
 impl ProjectStore {
     pub fn init(
         project_root: impl AsRef<Utf8Path>,
         display_name: NonEmptyText,
-        schema_namespace: NonEmptyText,
     ) -> Result<Self, StoreError> {
         let project_root = project_root.as_ref().to_path_buf();
+        fs::create_dir_all(project_root.as_std_path())?;
         let state_root = state_root(&project_root);
-        fs::create_dir_all(state_root.join("blobs"))?;
+        fs::create_dir_all(state_root.as_std_path())?;
         let config = ProjectConfig::new(display_name);
         write_json_file(&state_root.join(PROJECT_CONFIG_NAME), &config)?;
-        let schema = ProjectSchema::default_with_namespace(schema_namespace);
-        write_json_file(&state_root.join(PROJECT_SCHEMA_NAME), &schema)?;
-        let mut connection = Connection::open(state_root.join(STATE_DB_NAME).as_std_path())?;
-        upgrade_store(&mut connection)?;
+        let database_path = state_root.join(STATE_DB_NAME);
+        let connection = Connection::open(database_path.as_std_path())?;
+        connection.pragma_update(None, "foreign_keys", 1_i64)?;
+        connection.pragma_update(
+            None,
+            "user_version",
+            i64::from(CURRENT_STORE_FORMAT_VERSION),
+        )?;
+        install_schema(&connection)?;
         Ok(Self {
             project_root,
             state_root,
-            connection,
             config,
-            schema,
+            connection,
         })
     }
 
     pub fn open(project_root: impl AsRef<Utf8Path>) -> Result<Self, StoreError> {
-        let requested_root = project_root.as_ref().to_path_buf();
-        let project_root = discover_project_root(&requested_root)
-            .ok_or(StoreError::MissingProjectStore(requested_root))?;
+        let project_root = project_root.as_ref().to_path_buf();
         let state_root = state_root(&project_root);
-        let config = read_json_file::<ProjectConfig>(&state_root.join(PROJECT_CONFIG_NAME))?;
+        if !state_root.exists() {
+            return Err(StoreError::MissingProjectStore(project_root));
+        }
+        let config: ProjectConfig = read_json_file(&state_root.join(PROJECT_CONFIG_NAME))?;
         if config.store_format_version != CURRENT_STORE_FORMAT_VERSION {
             return Err(StoreError::IncompatibleStoreFormatVersion {
                 observed: config.store_format_version,
                 expected: CURRENT_STORE_FORMAT_VERSION,
             });
         }
-        let schema = read_json_file::<ProjectSchema>(&state_root.join(PROJECT_SCHEMA_NAME))?;
-        let mut connection = Connection::open(state_root.join(STATE_DB_NAME).as_std_path())?;
-        upgrade_store(&mut connection)?;
+        let database_path = state_root.join(STATE_DB_NAME);
+        let connection = Connection::open(database_path.as_std_path())?;
+        connection.pragma_update(None, "foreign_keys", 1_i64)?;
+        let observed_version: i64 =
+            connection.pragma_query_value(None, "user_version", |row| row.get(0))?;
+        if u32::try_from(observed_version).ok() != Some(CURRENT_STORE_FORMAT_VERSION) {
+            return Err(StoreError::IncompatibleStoreFormatVersion {
+                observed: u32::try_from(observed_version).unwrap_or(0),
+                expected: CURRENT_STORE_FORMAT_VERSION,
+            });
+        }
         Ok(Self {
             project_root,
             state_root,
-            connection,
             config,
-            schema,
+            connection,
        })
    }
@@ -482,64 +566,6 @@ impl ProjectStore {
     }
 
     #[must_use]
-    pub fn schema(&self) -> &ProjectSchema {
-        &self.schema
-    }
-
-    pub fn upsert_schema_field(
-        &mut self,
-        request: UpsertSchemaFieldRequest,
-    ) -> Result<ProjectFieldSpec, StoreError> {
-        let field = ProjectFieldSpec {
-            name: request.name,
-            node_classes: request.node_classes,
-            presence: request.presence,
-            severity: request.severity,
-            role: request.role,
-            inference_policy: request.inference_policy,
-            value_type: request.value_type,
-        };
-        if let Some(existing) = self.schema.fields.iter_mut().find(|existing| {
-            existing.name == field.name && existing.node_classes == field.node_classes
-        }) {
-            if *existing == field {
-                return Ok(field);
-            }
-            *existing = field.clone();
-        } else {
-            self.schema.fields.push(field.clone());
-        }
-        sort_schema_fields(&mut self.schema.fields);
-        self.bump_schema_version();
-        self.save_schema()?;
-        Ok(field)
-    }
-
-    pub fn remove_schema_field(
-        &mut self,
-        request: RemoveSchemaFieldRequest,
-    ) -> Result<u64, StoreError> {
-        let before = self.schema.fields.len();
-        self.schema.fields.retain(|field| {
-            field.name != request.name
-                || request
-                    .node_classes
-                    .as_ref()
-                    .is_some_and(|node_classes| field.node_classes != *node_classes)
-        });
-        let removed = before.saturating_sub(self.schema.fields.len()) as u64;
-        if removed == 0 {
-            return Err(StoreError::SchemaFieldNotFound(
-                request.name.as_str().to_owned(),
-            ));
-        }
-        sort_schema_fields(&mut self.schema.fields);
-        self.bump_schema_version();
-        self.save_schema()?;
-        Ok(removed)
-    }
-
-    #[must_use]
     pub fn project_root(&self) -> &Utf8Path {
         &self.project_root
     }
@@ -549,3634 +575,2745 @@ impl ProjectStore {
         &self.state_root
     }
 
-    fn bump_schema_version(&mut self) {
-        self.schema.version = self.schema.version.saturating_add(1);
-    }
-
-    fn save_schema(&self) -> Result<(), StoreError> {
-        write_json_file(&self.state_root.join(PROJECT_SCHEMA_NAME), &self.schema)
+    pub fn status(&self) -> Result<ProjectStatus, StoreError> {
+        Ok(ProjectStatus {
+            project_root: self.project_root.clone(),
+            display_name: self.config.display_name.clone(),
+            store_format_version: self.config.store_format_version,
+            frontier_count: count_rows(&self.connection, "frontiers")?,
+            hypothesis_count: count_rows(&self.connection, "hypotheses")?,
+            experiment_count: count_rows(&self.connection, "experiments")?,
+            open_experiment_count: count_rows_where(
+                &self.connection,
+                "experiments",
+                "status = 'open'",
+            )?,
+            artifact_count: count_rows(&self.connection, "artifacts")?,
+        })
     }
 
-    pub fn create_frontier(
+    pub fn register_tag(
         &mut self,
-        request: CreateFrontierRequest,
-    ) -> Result<FrontierProjection, StoreError> {
-        let frontier_id = fidget_spinner_core::FrontierId::fresh();
-        let payload = NodePayload::with_schema(
-            self.schema.schema_ref(),
-            frontier_contract_payload(&request.contract)?,
-        );
-        let diagnostics = self.schema.validate_node(NodeClass::Contract, &payload);
-        let contract_node = DagNode::new(
-            NodeClass::Contract,
-            Some(frontier_id),
-            request.contract_title,
-            request.contract_summary,
-            payload,
-            diagnostics,
-        );
-        let frontier = FrontierRecord::with_id(frontier_id, request.label, contract_node.id);
-
-        let tx = self.connection.transaction()?;
-        let _ = upsert_metric_definition_tx(
-            &tx,
-            &MetricDefinition::new(
-                request
-                    .contract
-                    .evaluation
-                    .primary_metric
-                    .metric_key
-                    .clone(),
-                request.contract.evaluation.primary_metric.unit,
-                request.contract.evaluation.primary_metric.objective,
-                None,
-            ),
-        )?;
-        for metric in &request.contract.evaluation.supporting_metrics {
-            let _ = upsert_metric_definition_tx(
-                &tx,
-                &MetricDefinition::new(
-                    metric.metric_key.clone(),
-                    metric.unit,
-                    metric.objective,
-                    None,
-                ),
-            )?;
+        name: TagName,
+        description: NonEmptyText,
+    ) -> Result<TagRecord, StoreError> {
+        if self
+            .connection
+            .query_row(
+                "SELECT 1 FROM tags WHERE name = ?1",
+                params![name.as_str()],
+                |_| Ok(()),
+            )
+            .optional()?
+            .is_some()
+        {
+            return Err(StoreError::DuplicateTag(name));
         }
-        insert_node(&tx, &contract_node)?;
-        insert_frontier(&tx, &frontier)?;
-        insert_event(
-            &tx,
-            "frontier",
-            &frontier.id.to_string(),
-            "frontier.created",
-            json!({"root_contract_node_id": contract_node.id}),
+        let created_at = OffsetDateTime::now_utc();
+        let _ = self.connection.execute(
+            "INSERT INTO tags (name, description, created_at) VALUES (?1, ?2, ?3)",
+            params![
+                name.as_str(),
+                description.as_str(),
+                encode_timestamp(created_at)?
+            ],
         )?;
-        tx.commit()?;
+        Ok(TagRecord {
+            name,
+            description,
+            created_at,
+        })
+    }
 
-        self.frontier_projection(frontier.id)
+    pub fn list_tags(&self) -> Result<Vec<TagRecord>, StoreError> {
+        let mut statement = self
+            .connection
+            .prepare("SELECT name, description, created_at FROM tags ORDER BY name ASC")?;
+        let rows = statement.query_map([], |row| {
+            Ok(TagRecord {
+                name: parse_tag_name(&row.get::<_, String>(0)?)?,
+                description: parse_non_empty_text(&row.get::<_, String>(1)?)?,
+                created_at: parse_timestamp_sql(&row.get::<_, String>(2)?)?,
+            })
+        })?;
+        rows.collect::<Result<Vec<_>, _>>()
+            .map_err(StoreError::from)
     }
 
     pub fn define_metric(
         &mut self,
         request: DefineMetricRequest,
     ) -> Result<MetricDefinition, StoreError> {
+        if self.metric_definition(&request.key)?.is_some() {
+            return Err(StoreError::DuplicateMetricDefinition(request.key));
+        }
         let record = MetricDefinition::new(
             request.key,
             request.unit,
             request.objective,
+            request.visibility,
             request.description,
         );
-        let tx = self.connection.transaction()?;
-        let _ = upsert_metric_definition_tx(&tx, &record)?;
-        tx.commit()?;
+        let _ = self.connection.execute(
+            "INSERT INTO metric_definitions (key, unit, objective, visibility, description, created_at, updated_at)
+             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
+            params![
+                record.key.as_str(),
+                record.unit.as_str(),
+                record.objective.as_str(),
+                record.visibility.as_str(),
+                record.description.as_ref().map(NonEmptyText::as_str),
+                encode_timestamp(record.created_at)?,
+                encode_timestamp(record.updated_at)?,
+            ],
+        )?;
         Ok(record)
     }
 
     pub fn list_metric_definitions(&self) -> Result<Vec<MetricDefinition>, StoreError> {
         let mut statement = self.connection.prepare(
-            "SELECT metric_key, unit, objective, description, created_at
+            "SELECT key, unit, objective, visibility, description, created_at, updated_at
              FROM metric_definitions
-             ORDER BY metric_key ASC",
+             ORDER BY key ASC",
         )?;
-        let mut rows = statement.query([])?;
-        let mut items = Vec::new();
-        while let Some(row) = rows.next()? {
-            items.push(MetricDefinition {
-                key: NonEmptyText::new(row.get::<_, String>(0)?)?,
-                unit: decode_metric_unit(&row.get::<_, String>(1)?)?,
-                objective: decode_optimization_objective(&row.get::<_, String>(2)?)?,
-                description: row
-                    .get::<_, Option<String>>(3)?
-                    .map(NonEmptyText::new)
-                    .transpose()?,
-                created_at: decode_timestamp(&row.get::<_, String>(4)?)?,
-            });
-        }
-        Ok(items)
+        let rows = statement.query_map([], decode_metric_definition_row)?;
+        rows.collect::<Result<Vec<_>, _>>()
+            .map_err(StoreError::from)
     }
 
     pub fn define_run_dimension(
         &mut self,
         request: DefineRunDimensionRequest,
     ) -> Result<RunDimensionDefinition, StoreError> {
+        if self.run_dimension_definition(&request.key)?.is_some() {
+            return Err(StoreError::DuplicateRunDimension(request.key));
+        }
         let record =
             RunDimensionDefinition::new(request.key, request.value_type, request.description);
-        let tx = self.connection.transaction()?;
-        let _ = insert_run_dimension_definition_tx(&tx, &record)?;
-        tx.commit()?;
+        let _ = self.connection.execute(
+            "INSERT INTO run_dimension_definitions (key, value_type, description, created_at, updated_at)
+             VALUES (?1, ?2, ?3, ?4, ?5)",
+            params![
+                record.key.as_str(),
+                record.value_type.as_str(),
+                record.description.as_ref().map(NonEmptyText::as_str),
+                encode_timestamp(record.created_at)?,
+                encode_timestamp(record.updated_at)?,
+            ],
+        )?;
         Ok(record)
     }
 
-    pub fn list_run_dimensions(&self) -> Result<Vec<RunDimensionSummary>, StoreError> {
-        load_run_dimension_summaries(self)
-    }
-
-    pub fn coerce_run_dimensions(
-        &self,
-        raw_dimensions: BTreeMap<String, Value>,
-    ) -> Result<BTreeMap<NonEmptyText, RunDimensionValue>, StoreError> {
-        coerce_run_dimension_map(&run_dimension_definitions_by_key(self)?, raw_dimensions)
-    }
-
-    pub fn migrate_metric_plane(&mut self) -> Result<MetricPlaneMigrationReport, StoreError> {
-        let tx = self.connection.transaction()?;
-        let report = normalize_metric_plane_tx(&tx)?;
-        tx.commit()?;
-        Ok(report)
+    pub fn list_run_dimensions(&self) -> Result<Vec<RunDimensionDefinition>, StoreError> {
+        let mut statement = self.connection.prepare(
+            "SELECT key, value_type, description, created_at, updated_at
+             FROM run_dimension_definitions
+             ORDER BY key ASC",
+        )?;
+        let rows = statement.query_map([], decode_run_dimension_definition_row)?;
+        rows.collect::<Result<Vec<_>, _>>()
+            .map_err(StoreError::from)
     }
 
-    pub fn add_tag(
+    pub fn create_frontier(
         &mut self,
-        name: TagName,
-        description: NonEmptyText,
-    ) -> Result<TagRecord, StoreError> {
-        let record = TagRecord {
-            name,
-            description,
-            created_at: OffsetDateTime::now_utc(),
+        request: CreateFrontierRequest,
+    ) -> Result<FrontierRecord, StoreError> {
+        let id = FrontierId::fresh();
+        let slug = self.unique_frontier_slug(request.slug, &request.label)?;
+        let now = OffsetDateTime::now_utc();
+        let record = FrontierRecord {
+            id,
+            slug,
+            label: request.label,
+            objective: request.objective,
+            status: FrontierStatus::Exploring,
+            brief: FrontierBrief::default(),
+            revision: 1,
+            created_at: now,
+            updated_at: now,
         };
-        let tx = self.connection.transaction()?;
-        insert_tag(&tx, &record)?;
-        insert_event(
-            &tx,
-            "tag",
-            record.name.as_str(),
-            "tag.created",
-            json!({"description": record.description.as_str()}),
+        let transaction = self.connection.transaction()?;
+        insert_frontier(&transaction, &record)?;
+        record_event(
+            &transaction,
+            "frontier",
+            &record.id.to_string(),
+            1,
+            "created",
+            &record,
         )?;
-        tx.commit()?;
+        transaction.commit()?;
         Ok(record)
     }
 
-    pub fn list_tags(&self) -> Result<Vec<TagRecord>, StoreError> {
+    pub fn list_frontiers(&self) -> Result<Vec<FrontierSummary>, StoreError> {
         let mut statement = self.connection.prepare(
-            "SELECT name, description, created_at
-             FROM tags
-             ORDER BY name ASC",
+            "SELECT id, slug, label, objective, status, brief_json, revision, created_at, updated_at
+             FROM frontiers
+             ORDER BY updated_at DESC, created_at DESC",
         )?;
-        let mut rows = statement.query([])?;
-        let mut items = Vec::new();
-        while let Some(row) = rows.next()? {
-            items.push(TagRecord {
-                name: TagName::new(row.get::<_, String>(0)?)?,
-                description: NonEmptyText::new(row.get::<_, String>(1)?)?,
-                created_at: decode_timestamp(&row.get::<_, String>(2)?)?,
-            });
-        }
-        Ok(items)
+        let rows = statement.query_map([], decode_frontier_row)?;
+        rows.collect::<Result<Vec<_>, _>>()
+            .map_err(StoreError::from)?
+            .into_iter()
+            .map(|record| {
+                Ok(FrontierSummary {
+                    active_hypothesis_count: self.active_hypothesis_count(record.id)?,
+                    open_experiment_count: self.open_experiment_count(Some(record.id))?,
+                    id: record.id,
+                    slug: record.slug,
+                    label: record.label,
+                    objective: record.objective,
+                    status: record.status,
+                    updated_at: record.updated_at,
+                })
+            })
+            .collect()
     }
 
-    pub fn add_node(&mut self, request: CreateNodeRequest) -> Result<DagNode, StoreError> {
-        validate_prose_node_request(&request)?;
-        let diagnostics = self.schema.validate_node(request.class, &request.payload);
-        let mut node = DagNode::new(
-            request.class,
-            request.frontier_id,
-            request.title,
-            request.summary,
-            request.payload,
-            diagnostics,
-        );
-        node.tags = match (request.class, request.tags) {
-            (NodeClass::Note, Some(tags)) => tags,
-            (NodeClass::Note, None) => return Err(StoreError::NoteTagsRequired),
-            (_, Some(tags)) => tags,
-            (_, None) => BTreeSet::new(),
-        };
-        node.annotations = request.annotations;
+    pub fn read_frontier(&self, selector: &str) -> Result<FrontierRecord, StoreError> {
+        self.resolve_frontier(selector)
+    }
 
-        let tx = self.connection.transaction()?;
-        ensure_known_tags(&tx, &node.tags)?;
-        insert_node(&tx, &node)?;
-        for attachment in &request.attachments {
-            insert_edge(&tx, &attachment.materialize(node.id))?;
-        }
-        insert_event(
-            &tx,
-            "node",
-            &node.id.to_string(),
-            "node.created",
-            json!({"class": node.class.as_str(), "frontier_id": node.frontier_id}),
+    pub fn update_frontier_brief(
+        &mut self,
+        request: UpdateFrontierBriefRequest,
+    ) -> Result<FrontierRecord, StoreError> {
+        let frontier = self.resolve_frontier(&request.frontier)?;
+        enforce_revision(
+            "frontier",
+            &request.frontier,
+            request.expected_revision,
+            frontier.revision,
+        )?;
+        let now = OffsetDateTime::now_utc();
+        let brief = FrontierBrief {
+            situation: apply_optional_text_patch(
+                request.situation,
+                frontier.brief.situation.clone(),
+            ),
+            roadmap: match request.roadmap {
+                Some(items) => items
+                    .into_iter()
+                    .map(|item| {
+                        Ok(FrontierRoadmapItem {
+                            rank: item.rank,
+                            hypothesis_id: self.resolve_hypothesis(&item.hypothesis)?.id,
+                            summary: item.summary,
+                        })
+                    })
+                    .collect::<Result<Vec<_>, StoreError>>()?,
+                None => frontier.brief.roadmap.clone(),
+            },
+            unknowns: request.unknowns.unwrap_or(frontier.brief.unknowns.clone()),
+            revision: frontier.brief.revision.saturating_add(1),
+            updated_at: Some(now),
+        };
+        let updated = FrontierRecord {
+            brief,
+            revision: frontier.revision.saturating_add(1),
+            updated_at: now,
+            ..frontier
+        };
+        let transaction = self.connection.transaction()?;
+        update_frontier(&transaction, &updated)?;
+        record_event(
+            &transaction,
+            "frontier",
+            &updated.id.to_string(),
+            updated.revision,
+            "brief_updated",
+            &updated,
         )?;
-        tx.commit()?;
-        Ok(node)
+        transaction.commit()?;
+        Ok(updated)
     }
 
-    pub fn list_metric_keys(&self) -> Result<Vec<MetricKeySummary>, StoreError> {
-        self.list_metric_keys_filtered(MetricKeyQuery::default())
+    pub fn create_hypothesis(
+        &mut self,
+        request: CreateHypothesisRequest,
+    ) -> Result<HypothesisRecord, StoreError> {
+        validate_hypothesis_body(&request.body)?;
+        self.assert_known_tags(&request.tags)?;
+        let frontier = self.resolve_frontier(&request.frontier)?;
+        let id = HypothesisId::fresh();
+        let slug = self.unique_hypothesis_slug(request.slug, &request.title)?;
+        let now = OffsetDateTime::now_utc();
+        let record = HypothesisRecord {
+            id,
+            slug,
+            frontier_id: frontier.id,
+            archived: false,
+            title: request.title,
+            summary: request.summary,
+            body: request.body,
+            tags: request.tags.iter().cloned().collect(),
+            revision: 1,
+            created_at: now,
+            updated_at: now,
+        };
+        let parents = self.resolve_vertex_parents(
+            frontier.id,
+            &request.parents,
+            Some(VertexRef::Hypothesis(id)),
+        )?;
+        let transaction = self.connection.transaction()?;
+        insert_hypothesis(&transaction, &record)?;
+        replace_hypothesis_tags(&transaction, record.id, &request.tags)?;
+        replace_influence_parents(&transaction, VertexRef::Hypothesis(id), &parents)?;
+        record_event(
+            &transaction,
+            "hypothesis",
+            &record.id.to_string(),
+            1,
+            "created",
+            &record,
+        )?;
+        transaction.commit()?;
+        Ok(record)
     }
 
-    pub fn list_metric_keys_filtered(
+    pub fn list_hypotheses(
         &self,
-        query: MetricKeyQuery,
-    ) -> Result<Vec<MetricKeySummary>, StoreError> {
-        let mut summaries = collect_metric_samples(self, &query)?
+        query: ListHypothesesQuery,
+    ) -> Result<Vec<HypothesisSummary>, StoreError> {
+        let frontier_id = query
+            .frontier
+            .as_deref()
+            .map(|selector| self.resolve_frontier(selector).map(|frontier| frontier.id))
+            .transpose()?;
+        let records = self.load_hypothesis_records(frontier_id, query.include_archived)?;
+        let filtered = records
             .into_iter()
-            .fold(
-                BTreeMap::<(MetricFieldSource, String), MetricKeyAccumulator>::new(),
-                |mut accumulators, sample| {
-                    let key = (sample.source, sample.key.as_str().to_owned());
-                    let _ = accumulators
-                        .entry(key)
-                        .and_modify(|entry| entry.observe(&sample))
-                        .or_insert_with(|| MetricKeyAccumulator::from_sample(&sample));
-                    accumulators
-                },
-            )
-            .into_values()
-            .map(MetricKeyAccumulator::finish)
-            .collect::<Vec<_>>();
-        if query
-            .source
-            .is_none_or(|source| source == MetricFieldSource::RunMetric)
-        {
-            merge_registered_run_metric_summaries(self, &mut summaries)?;
-        }
-        summaries.sort_by(|left, right| {
-            left.key
-                .cmp(&right.key)
-                .then(left.source.cmp(&right.source))
-        });
-        Ok(summaries)
+            .filter(|record| {
+                query.tags.is_empty() || query.tags.iter().all(|tag| record.tags.contains(tag))
+            })
+            .map(|record| self.hypothesis_summary_from_record(record))
+            .collect::<Result<Vec<_>, _>>()?;
+        Ok(apply_limit(filtered, query.limit))
     }
 
-    pub fn best_metrics(&self, query: MetricBestQuery) -> Result<Vec<MetricBestEntry>, StoreError> {
-        let matching = collect_metric_samples(
-            self,
-            &MetricKeyQuery {
-                frontier_id: query.frontier_id,
-                source: query.source,
-                dimensions: query.dimensions.clone(),
-            },
-        )?
-        .into_iter()
-        .filter(|sample| sample.key == query.key)
-        .collect::<Vec<_>>();
-        if matching.is_empty() {
-            return Ok(Vec::new());
-        }
-
-        let source = if let Some(source) = query.source {
-            source
-        } else {
-            let sources = matching
-                .iter()
-                .map(|sample| sample.source)
-                .collect::<BTreeSet<_>>();
-            if sources.len() != 1 {
-                return Err(StoreError::AmbiguousMetricKey {
-                    key: query.key.as_str().to_owned(),
-                    sources: sources
-                        .into_iter()
-                        .map(MetricFieldSource::as_str)
-                        .collect::<Vec<_>>()
-                        .join(", "),
-                });
-            }
-            let Some(source) = sources.iter().copied().next() else {
-                return Ok(Vec::new());
-            };
-            source
-        };
-
-        let mut matching = matching
-            .into_iter()
-            .filter(|sample| sample.source == source)
-            .collect::<Vec<_>>();
-        if matching.is_empty() {
-            return Ok(Vec::new());
-        }
-
-        let order = resolve_metric_order(&matching, &query, source)?;
-        matching.sort_by(|left, right| compare_metric_samples(left, right, order));
-        matching.truncate(query.limit as usize);
-        Ok(matching
+    pub fn read_hypothesis(&self, selector: &str) -> Result<HypothesisDetail, StoreError> {
+        let record = self.resolve_hypothesis(selector)?;
+        let parents = self.load_vertex_parents(VertexRef::Hypothesis(record.id))?;
+        let children = self.load_vertex_children(VertexRef::Hypothesis(record.id))?;
+        let experiments = self.list_experiments(ListExperimentsQuery {
+            hypothesis: Some(record.id.to_string()),
+            include_archived: true,
+            limit: None,
+            ..ListExperimentsQuery::default()
+        })?;
+        let (open_experiments, closed_experiments): (Vec<_>, Vec<_>) = experiments
             .into_iter()
-            .map(|sample| sample.into_entry(order))
-            .collect())
+            .partition(|experiment| experiment.status == ExperimentStatus::Open);
+        Ok(HypothesisDetail {
+            artifacts: self.list_artifacts(ListArtifactsQuery {
+                attached_to: Some(AttachmentSelector::Hypothesis(record.id.to_string())),
+                limit: None,
+                ..ListArtifactsQuery::default()
+            })?,
+            children,
+            closed_experiments,
+            open_experiments,
+            parents,
+            record,
+        })
     }
 
-    pub fn archive_node(&mut self, node_id: fidget_spinner_core::NodeId) -> Result<(), StoreError> {
-        let updated_at = encode_timestamp(OffsetDateTime::now_utc())?;
-        let changed = self.connection.execute(
-            "UPDATE nodes SET archived = 1, updated_at = ?1 WHERE id = ?2",
-            params![updated_at, node_id.to_string()],
+    pub fn update_hypothesis(
+        &mut self,
+        request: UpdateHypothesisRequest,
+    ) -> Result<HypothesisRecord, StoreError> {
+        let record = self.resolve_hypothesis(&request.hypothesis)?;
+        enforce_revision(
+            "hypothesis",
+            &request.hypothesis,
+            request.expected_revision,
+            record.revision,
         )?;
-        if changed == 0 {
-            return Err(StoreError::NodeNotFound(node_id));
+        if let Some(body) = request.body.as_ref() {
+            validate_hypothesis_body(body)?;
         }
-        Ok(())
+        if let Some(tags) = request.tags.as_ref() {
+            self.assert_known_tags(tags)?;
+        }
+        let updated = HypothesisRecord {
+            title: request.title.unwrap_or(record.title.clone()),
+            summary: request.summary.unwrap_or(record.summary.clone()),
+            body: request.body.unwrap_or(record.body.clone()),
+            tags: request
+                .tags
+                .clone()
+                .map_or_else(|| record.tags.clone(), |tags| tags.into_iter().collect()),
+            archived: request.archived.unwrap_or(record.archived),
+            revision: record.revision.saturating_add(1),
+            updated_at: OffsetDateTime::now_utc(),
+            ..record
+        };
+        let parents = request
+            .parents
+            .as_ref()
+            .map(|selectors| {
+                self.resolve_vertex_parents(
+                    updated.frontier_id,
+                    selectors,
+                    Some(VertexRef::Hypothesis(updated.id)),
+                )
+            })
+            .transpose()?;
+        let transaction = self.connection.transaction()?;
+        update_hypothesis_row(&transaction, &updated)?;
+        replace_hypothesis_tags(
+            &transaction,
+            updated.id,
+            &updated.tags.iter().cloned().collect::<BTreeSet<_>>(),
+        )?;
+        if let Some(parents) = parents.as_ref() {
+            replace_influence_parents(&transaction, VertexRef::Hypothesis(updated.id), parents)?;
+        }
+        record_event(
+            &transaction,
+            "hypothesis",
+            &updated.id.to_string(),
+            updated.revision,
+            "updated",
+            &updated,
+        )?;
+        transaction.commit()?;
+        Ok(updated)
     }
 
-    pub fn annotate_node(
+    pub fn open_experiment(
         &mut self,
-        node_id: fidget_spinner_core::NodeId,
-        annotation: NodeAnnotation,
-    ) -> Result<(), StoreError> {
-        let tx = self.connection.transaction()?;
-        let exists = tx
-            .query_row(
-                "SELECT 1 FROM nodes WHERE id = ?1",
-                params![node_id.to_string()],
-                |row| row.get::<_, i64>(0),
-            )
-            .optional()?;
-        if exists.is_none() {
-            return Err(StoreError::NodeNotFound(node_id));
-        }
-        insert_annotation(&tx, node_id, &annotation)?;
-        let _ = tx.execute(
-            "UPDATE nodes SET updated_at = ?1 WHERE id = ?2",
-            params![
-                encode_timestamp(OffsetDateTime::now_utc())?,
-                node_id.to_string()
-            ],
+        request: OpenExperimentRequest,
+    ) -> Result<ExperimentRecord, StoreError> {
+        self.assert_known_tags(&request.tags)?;
+        let hypothesis = self.resolve_hypothesis(&request.hypothesis)?;
+        let id = ExperimentId::fresh();
+        let slug = self.unique_experiment_slug(request.slug, &request.title)?;
+        let now = OffsetDateTime::now_utc();
+        let record = ExperimentRecord {
+            id,
+            slug,
+            frontier_id: hypothesis.frontier_id,
+            hypothesis_id: hypothesis.id,
+            archived: false,
+            title: request.title,
+            summary: request.summary,
+            tags: request.tags.iter().cloned().collect(),
+            status: ExperimentStatus::Open,
+            outcome: None,
+            revision: 1,
+            created_at: now,
+            updated_at: now,
+        };
+        let parents = self.resolve_vertex_parents(
+            hypothesis.frontier_id,
+            &request.parents,
+            Some(VertexRef::Experiment(id)),
         )?;
-        insert_event(
-            &tx,
-            "node",
-            &node_id.to_string(),
-            "node.annotated",
-            json!({"visibility": format!("{:?}", annotation.visibility)}),
+        let transaction = self.connection.transaction()?;
+        insert_experiment(&transaction, &record)?;
+        replace_experiment_tags(&transaction, record.id, &request.tags)?;
+        replace_influence_parents(&transaction, VertexRef::Experiment(id), &parents)?;
+        record_event(
+            &transaction,
+            "experiment",
+            &record.id.to_string(),
+            1,
+            "opened",
+            &record,
         )?;
-        tx.commit()?;
-        Ok(())
+        transaction.commit()?;
+        Ok(record)
     }
 
-    pub fn get_node(
+    pub fn list_experiments(
         &self,
-        node_id: fidget_spinner_core::NodeId,
-    ) -> Result<Option<DagNode>, StoreError> {
-        let mut statement = self.connection.prepare(
-            "SELECT
-                id,
-                class,
-                track,
-                frontier_id,
-                archived,
-                title,
-                summary,
-                payload_schema_namespace,
-                payload_schema_version,
-                payload_json,
-                diagnostics_json,
-                agent_session_id,
-                created_at,
-                updated_at
-             FROM nodes
-             WHERE id = ?1",
-        )?;
-        let node = statement
-            .query_row(params![node_id.to_string()], read_node_row)
-            .optional()?;
-        node.map(|mut item| {
-            item.tags = self.load_tags(item.id)?;
-            item.annotations = self.load_annotations(item.id)?;
-            Ok(item)
+        query: ListExperimentsQuery,
+    ) -> Result<Vec<ExperimentSummary>, StoreError> {
+        let frontier_id = query
+            .frontier
+            .as_deref()
+            .map(|selector| self.resolve_frontier(selector).map(|frontier| frontier.id))
+            .transpose()?;
+        let hypothesis_id = query
+            .hypothesis
+            .as_deref()
+            .map(|selector| {
+                self.resolve_hypothesis(selector)
+                    .map(|hypothesis| hypothesis.id)
+            })
+            .transpose()?;
+        let records =
+            self.load_experiment_records(frontier_id, hypothesis_id, query.include_archived)?;
+        let filtered = records
+            .into_iter()
+            .filter(|record| query.status.is_none_or(|status| record.status == status))
+            .filter(|record| {
+                query.tags.is_empty() || query.tags.iter().all(|tag| record.tags.contains(tag))
+            })
+            .map(|record| self.experiment_summary_from_record(record))
+            .collect::<Result<Vec<_>, _>>()?;
+        Ok(apply_limit(filtered, query.limit))
+    }
+
+    pub fn read_experiment(&self, selector: &str) -> Result<ExperimentDetail, StoreError> {
+        let record = self.resolve_experiment(selector)?;
+        Ok(ExperimentDetail {
+            artifacts: self.list_artifacts(ListArtifactsQuery {
+                attached_to: Some(AttachmentSelector::Experiment(record.id.to_string())),
+                limit: None,
+                ..ListArtifactsQuery::default()
+            })?,
+            children: self.load_vertex_children(VertexRef::Experiment(record.id))?,
+            owning_hypothesis: self
+                .hypothesis_summary_from_record(self.hypothesis_by_id(record.hypothesis_id)?)?,
+            parents: self.load_vertex_parents(VertexRef::Experiment(record.id))?,
+            record,
         })
-        .transpose()
     }
 
-    pub fn list_nodes(&self, query: ListNodesQuery) -> Result<Vec<NodeSummary>, StoreError> {
-        let frontier_id = query.frontier_id.map(|id| id.to_string());
-        let class = query.class.map(|item| item.as_str().to_owned());
-        let mut sql = String::from(
-            "SELECT
-                n.id,
-                n.class,
-                n.track,
-                n.frontier_id,
-                n.archived,
-                n.title,
-                n.summary,
-                n.diagnostics_json,
-                n.created_at,
-                n.updated_at,
-                (
-                    SELECT COUNT(*)
-                    FROM node_annotations AS a
-                    WHERE a.node_id = n.id AND a.visibility = 'hidden'
-                ) AS hidden_annotation_count
-             FROM nodes AS n
-             WHERE (?1 IS NULL OR n.frontier_id = ?1)
-               AND (?2 IS NULL OR n.class = ?2)
-               AND (?3 = 1 OR n.archived = 0)",
-        );
-        let mut parameters = vec![
-            frontier_id.map_or(SqlValue::Null, SqlValue::Text),
-            class.map_or(SqlValue::Null, SqlValue::Text),
-            SqlValue::Integer(i64::from(query.include_archived)),
-        ];
-        for (index, tag) in query.tags.iter().enumerate() {
-            let placeholder = parameters.len() + 1;
-            let _ = write!(
-                sql,
-                "
-               AND EXISTS (
-                   SELECT 1
-                   FROM node_tags AS nt{index}
-                   WHERE nt{index}.node_id = n.id AND nt{index}.tag_name = ?{placeholder}
-               )"
-            );
-            parameters.push(SqlValue::Text(tag.as_str().to_owned()));
-        }
-        let limit_placeholder = parameters.len() + 1;
-        let _ = write!(
-            sql,
-            "
-             ORDER BY n.updated_at DESC
-             LIMIT ?{limit_placeholder}"
-        );
-        parameters.push(SqlValue::Integer(i64::from(query.limit)));
-        let mut statement = self.connection.prepare(&sql)?;
-        let mut rows = statement.query(params_from_iter(parameters.iter()))?;
-        let mut items = Vec::new();
-        while let Some(row) = rows.next()? {
-            let diagnostics = decode_json::<NodeDiagnostics>(&row.get::<_, String>(7)?)?;
-            let node_id = parse_node_id(&row.get::<_, String>(0)?)?;
-            items.push(NodeSummary {
-                id: node_id,
-                class: parse_node_class(&row.get::<_, String>(1)?)?,
-                track: parse_node_track(&row.get::<_, String>(2)?)?,
-                frontier_id: row
-                    .get::<_, Option<String>>(3)?
-                    .map(|raw| parse_frontier_id(&raw))
-                    .transpose()?,
-                archived: row.get::<_, i64>(4)? != 0,
-                title: NonEmptyText::new(row.get::<_, String>(5)?)?,
-                summary: row
-                    .get::<_, Option<String>>(6)?
-                    .map(NonEmptyText::new)
-                    .transpose()?,
-                tags: self.load_tags(node_id)?,
-                diagnostic_count: diagnostics.items.len() as u64,
-                hidden_annotation_count: row.get::<_, i64>(10)? as u64,
-                created_at: decode_timestamp(&row.get::<_, String>(8)?)?,
-                updated_at: decode_timestamp(&row.get::<_, String>(9)?)?,
-            });
+    pub fn update_experiment(
+        &mut self,
+        request: UpdateExperimentRequest,
+    ) -> Result<ExperimentRecord, StoreError> {
+        let record = self.resolve_experiment(&request.experiment)?;
+        enforce_revision(
+            "experiment",
+            &request.experiment,
+            request.expected_revision,
+            record.revision,
+        )?;
+        if let Some(tags) = request.tags.as_ref() {
+            self.assert_known_tags(tags)?;
         }
-        Ok(items)
-    }
-
-    pub fn list_frontiers(&self) -> Result<Vec<FrontierRecord>, StoreError> {
-        let mut statement = self.connection.prepare(
-            "SELECT id, label, root_contract_node_id, status, created_at, updated_at
-             FROM frontiers
-             ORDER BY updated_at DESC",
+        let outcome = match request.outcome {
+            Some(patch) => Some(self.materialize_outcome(&patch)?),
+            None => record.outcome.clone(),
+        };
+        let updated = ExperimentRecord {
+            title: request.title.unwrap_or(record.title.clone()),
+            summary: apply_optional_text_patch(request.summary, record.summary.clone()),
+            tags: request
+                .tags
+                .clone()
+                .map_or_else(|| record.tags.clone(), |tags| tags.into_iter().collect()),
+            archived: request.archived.unwrap_or(record.archived),
+            status: if outcome.is_some() {
+                ExperimentStatus::Closed
+            } else {
+                record.status
+            },
+            outcome,
+            revision: record.revision.saturating_add(1),
+            updated_at: OffsetDateTime::now_utc(),
+            ..record
+        };
+        let parents = request
+            .parents
+            .as_ref()
+            .map(|selectors| {
+                self.resolve_vertex_parents(
+                    updated.frontier_id,
+                    selectors,
+                    Some(VertexRef::Experiment(updated.id)),
+                )
+            })
+            .transpose()?;
+        let transaction = self.connection.transaction()?;
+        update_experiment_row(&transaction, &updated)?;
+        replace_experiment_tags(
+            &transaction,
+            updated.id,
+            &updated.tags.iter().cloned().collect::<BTreeSet<_>>(),
         )?;
-        let mut rows = statement.query([])?;
-        let mut items = Vec::new();
-        while let Some(row) = rows.next()? {
-            items.push(read_frontier_row(row)?);
+        replace_experiment_dimensions(&transaction, updated.id, updated.outcome.as_ref())?;
+        replace_experiment_metrics(&transaction, updated.id, updated.outcome.as_ref())?;
+        if let Some(parents) = parents.as_ref() {
+            replace_influence_parents(&transaction, VertexRef::Experiment(updated.id), parents)?;
         }
-        Ok(items)
+        record_event(
+            &transaction,
+            "experiment",
+            &updated.id.to_string(),
+            updated.revision,
+            "updated",
+            &updated,
+        )?;
+        transaction.commit()?;
+        Ok(updated)
     }
 
-    pub fn frontier_projection(
-        &self,
-        frontier_id: fidget_spinner_core::FrontierId,
-    ) -> Result<FrontierProjection, StoreError> {
-        let frontier = self.load_frontier(frontier_id)?;
-        let open_experiment_count = self.connection.query_row(
-            "SELECT COUNT(*) FROM open_experiments WHERE frontier_id = ?1",
-            params![frontier_id.to_string()],
-            |row| row.get::<_, i64>(0),
-        )? as u64;
-        let completed_experiment_count = self.connection.query_row(
-            "SELECT COUNT(*) FROM experiments WHERE frontier_id = ?1",
-            params![frontier_id.to_string()],
-            |row| row.get::<_, i64>(0),
-        )? as u64;
-        let verdict_counts = self.connection.query_row(
-            "SELECT
-                SUM(CASE WHEN verdict = 'accepted' THEN 1 ELSE 0 END),
-                SUM(CASE WHEN verdict = 'kept' THEN 1 ELSE 0 END),
-                SUM(CASE WHEN verdict = 'parked' THEN 1 ELSE 0 END),
-                SUM(CASE WHEN verdict = 'rejected' THEN 1 ELSE 0 END)
-             FROM experiments
-             WHERE frontier_id = ?1",
-            params![frontier_id.to_string()],
-            |row| {
-                Ok(FrontierVerdictCounts {
-                    accepted: row.get::<_, Option<i64>>(0)?.unwrap_or(0) as u64,
-                    kept: row.get::<_, Option<i64>>(1)?.unwrap_or(0) as u64,
-                    parked: row.get::<_, Option<i64>>(2)?.unwrap_or(0) as u64,
-                    rejected: row.get::<_, Option<i64>>(3)?.unwrap_or(0) as u64,
-                })
-            },
+    pub fn close_experiment(
+        &mut self,
+        request: CloseExperimentRequest,
+    ) -> Result<ExperimentRecord, StoreError> {
+        let record = self.resolve_experiment(&request.experiment)?;
+        if record.status == ExperimentStatus::Closed {
+            return Err(StoreError::ExperimentAlreadyClosed(record.id));
+        }
+        enforce_revision(
+            "experiment",
+            &request.experiment,
+            request.expected_revision,
+            record.revision,
         )?;
-
-        Ok(FrontierProjection {
-            frontier,
-            open_experiment_count,
-            completed_experiment_count,
-            verdict_counts,
-        })
+        let outcome = self.materialize_outcome(&ExperimentOutcomePatch {
+            backend: request.backend,
+            command: request.command,
+            dimensions: request.dimensions,
+            primary_metric: request.primary_metric,
+            supporting_metrics: request.supporting_metrics,
+            verdict: request.verdict,
+            rationale: request.rationale,
+            analysis: request.analysis,
+        })?;
+        let updated = ExperimentRecord {
+            status: ExperimentStatus::Closed,
+            outcome: Some(outcome),
+            revision: record.revision.saturating_add(1),
+            updated_at: OffsetDateTime::now_utc(),
+            ..record
+        };
+        let transaction = self.connection.transaction()?;
+        update_experiment_row(&transaction, &updated)?;
+        replace_experiment_dimensions(&transaction, updated.id, updated.outcome.as_ref())?;
+        replace_experiment_metrics(&transaction, updated.id, updated.outcome.as_ref())?;
+        record_event(
+            &transaction,
+            "experiment",
+            &updated.id.to_string(),
+            updated.revision,
+            "closed",
+            &updated,
+        )?;
+        transaction.commit()?;
+        Ok(updated)
     }
 
-    pub fn open_experiment(
+    pub fn create_artifact(
         &mut self,
-        request: OpenExperimentRequest,
-    ) -> Result<OpenExperimentSummary, StoreError> {
-        let hypothesis_node = self
-            .get_node(request.hypothesis_node_id)?
-            .ok_or(StoreError::NodeNotFound(request.hypothesis_node_id))?;
-        if hypothesis_node.class != NodeClass::Hypothesis {
-            return Err(StoreError::NodeNotHypothesis(request.hypothesis_node_id));
-        }
-        if hypothesis_node.frontier_id != Some(request.frontier_id) {
-            return Err(StoreError::FrontierNotFound(request.frontier_id));
-        }
-        let experiment = OpenExperiment {
-            id: fidget_spinner_core::ExperimentId::fresh(),
-            frontier_id: request.frontier_id,
-            hypothesis_node_id: request.hypothesis_node_id,
-            title: request.title,
+        request: CreateArtifactRequest,
+    ) -> Result<ArtifactRecord, StoreError> {
+        let id = ArtifactId::fresh();
+        let slug = self.unique_artifact_slug(request.slug, &request.label)?;
+        let now = OffsetDateTime::now_utc();
+        let record = ArtifactRecord {
+            id,
+            slug,
+            kind: request.kind,
+            label: request.label,
             summary: request.summary,
-            created_at: OffsetDateTime::now_utc(),
+            locator: request.locator,
+            media_type: request.media_type,
+            revision: 1,
+            created_at: now,
+            updated_at: now,
         };
-        let tx = self.connection.transaction()?;
-        insert_open_experiment(&tx, &experiment)?;
-        touch_frontier(&tx, request.frontier_id)?;
-        insert_event(
-            &tx,
-            "experiment",
-            &experiment.id.to_string(),
-            "experiment.opened",
-            json!({
-                "frontier_id": experiment.frontier_id,
-                "hypothesis_node_id": experiment.hypothesis_node_id,
-            }),
+        let attachments = self.resolve_attachment_targets(&request.attachments)?;
+        let transaction = self.connection.transaction()?;
+        insert_artifact(&transaction, &record)?;
+        replace_artifact_attachments(&transaction, record.id, &attachments)?;
+        record_event(
+            &transaction,
+            "artifact",
+            &record.id.to_string(),
+            1,
+            "created",
+            &record,
         )?;
-        tx.commit()?;
-        Ok(summarize_open_experiment(&experiment))
+        transaction.commit()?;
+        Ok(record)
     }
 
-    pub fn list_open_experiments(
+    pub fn list_artifacts(
         &self,
-        frontier_id: Option<fidget_spinner_core::FrontierId>,
-    ) -> Result<Vec<OpenExperimentSummary>, StoreError> {
-        let mut statement = self.connection.prepare(
-            "SELECT
-                id,
-                frontier_id,
-                hypothesis_node_id,
-                title,
-                summary,
-                created_at
-             FROM open_experiments
-             WHERE (?1 IS NULL OR frontier_id = ?1)
-             ORDER BY created_at DESC",
-        )?;
-        let mut rows = statement.query(params![frontier_id.map(|id| id.to_string())])?;
-        let mut items = Vec::new();
-        while let Some(row) = rows.next()? {
-            items.push(OpenExperimentSummary {
-                id: parse_experiment_id(&row.get::<_, String>(0)?)?,
-                frontier_id: parse_frontier_id(&row.get::<_, String>(1)?)?,
-                hypothesis_node_id: parse_node_id(&row.get::<_, String>(2)?)?,
-                title: NonEmptyText::new(row.get::<_, String>(3)?)?,
-                summary: row
-                    .get::<_, Option<String>>(4)?
-                    .map(NonEmptyText::new)
-                    .transpose()?,
-                created_at: decode_timestamp(&row.get::<_, String>(5)?)?,
-            });
+        query: ListArtifactsQuery,
+    ) -> Result<Vec<ArtifactSummary>, StoreError> {
+        let records = self.load_artifact_records()?;
+        let frontier_id = query
+            .frontier
+            .as_deref()
+            .map(|selector| self.resolve_frontier(selector).map(|frontier| frontier.id))
+            .transpose()?;
+        let mut filtered = Vec::new();
+        for record in records {
+            if query.kind.is_some_and(|kind| record.kind != kind) {
+                continue;
+            }
+            if let Some(frontier_id) = frontier_id
+                && !self.artifact_attached_to_frontier(record.id, frontier_id)?
+ { + continue; + } + filtered.push(record); } - Ok(items) + let attached_filtered = match query.attached_to { + Some(selector) => { + let target = self.resolve_attachment_target(&selector)?; + filtered + .into_iter() + .filter(|record| { + self.artifact_attachment_targets(record.id) + .map(|targets| targets.contains(&target)) + .unwrap_or(false) + }) + .collect() + } + None => filtered, + }; + Ok(apply_limit( + attached_filtered + .into_iter() + .map(|record| ArtifactSummary { + id: record.id, + slug: record.slug, + kind: record.kind, + label: record.label, + summary: record.summary, + locator: record.locator, + media_type: record.media_type, + updated_at: record.updated_at, + }) + .collect(), + query.limit, + )) } - pub fn read_open_experiment( - &self, - experiment_id: fidget_spinner_core::ExperimentId, - ) -> Result<OpenExperimentSummary, StoreError> { - load_open_experiment(&self.connection, experiment_id)? - .map(|experiment| summarize_open_experiment(&experiment)) - .ok_or(StoreError::ExperimentNotFound(experiment_id)) + pub fn read_artifact(&self, selector: &str) -> Result<ArtifactDetail, StoreError> { + let record = self.resolve_artifact(selector)?; + Ok(ArtifactDetail { + attachments: self.artifact_attachment_targets(record.id)?, + record, + }) } - pub fn close_experiment( + pub fn update_artifact( &mut self, - request: CloseExperimentRequest, - ) -> Result<ExperimentReceipt, StoreError> { - let open_experiment = load_open_experiment(&self.connection, request.experiment_id)? - .ok_or(StoreError::ExperimentNotFound(request.experiment_id))?; - let hypothesis_node = self - .get_node(open_experiment.hypothesis_node_id)? - .ok_or(StoreError::NodeNotFound(open_experiment.hypothesis_node_id))?; - if hypothesis_node.class != NodeClass::Hypothesis { - return Err(StoreError::NodeNotHypothesis( - open_experiment.hypothesis_node_id, - )); + request: UpdateArtifactRequest, + ) -> Result<ArtifactRecord, StoreError> { + let record = self.resolve_artifact(&request.artifact)?; + enforce_revision( + "artifact", + &request.artifact, + request.expected_revision, + record.revision, + )?; + let updated = ArtifactRecord { + kind: request.kind.unwrap_or(record.kind), + label: request.label.unwrap_or(record.label.clone()), + summary: apply_optional_text_patch(request.summary, record.summary.clone()), + locator: request.locator.unwrap_or(record.locator.clone()), + media_type: apply_optional_text_patch(request.media_type, record.media_type.clone()), + revision: record.revision.saturating_add(1), + updated_at: OffsetDateTime::now_utc(), + ..record + }; + let attachments = request + .attachments + .as_ref() + .map(|selectors| self.resolve_attachment_targets(selectors)) + .transpose()?; + let transaction = self.connection.transaction()?; + update_artifact_row(&transaction, &updated)?; + if let Some(attachments) = attachments.as_ref() { + replace_artifact_attachments(&transaction, updated.id, attachments)?; } - let tx = self.connection.transaction()?; - let dimensions = validate_run_dimensions_tx(&tx, &request.dimensions)?; - let primary_metric_definition = - load_metric_definition_tx(&tx, &request.primary_metric.key)?.ok_or_else(|| { - StoreError::UnknownMetricDefinition(request.primary_metric.key.clone()) - })?; - let supporting_metric_definitions = request - .supporting_metrics - .iter() - .map(|metric| { - load_metric_definition_tx(&tx, &metric.key)? 
- .ok_or_else(|| StoreError::UnknownMetricDefinition(metric.key.clone())) + record_event( + &transaction, + "artifact", + &updated.id.to_string(), + updated.revision, + "updated", + &updated, + )?; + transaction.commit()?; + Ok(updated) + } + + pub fn frontier_open(&self, selector: &str) -> Result<FrontierOpenProjection, StoreError> { + let frontier = self.resolve_frontier(selector)?; + let active_hypothesis_ids = self.active_hypothesis_ids(frontier.id, &frontier.brief)?; + let active_hypotheses = active_hypothesis_ids + .into_iter() + .map(|hypothesis_id| { + let summary = + self.hypothesis_summary_from_record(self.hypothesis_by_id(hypothesis_id)?)?; + let open_experiments = self.list_experiments(ListExperimentsQuery { + hypothesis: Some(hypothesis_id.to_string()), + status: Some(ExperimentStatus::Open), + limit: None, + ..ListExperimentsQuery::default() + })?; + let latest_closed_experiment = self + .list_experiments(ListExperimentsQuery { + hypothesis: Some(hypothesis_id.to_string()), + status: Some(ExperimentStatus::Closed), + limit: Some(1), + ..ListExperimentsQuery::default() + })? + .into_iter() + .next(); + Ok(HypothesisCurrentState { + hypothesis: summary, + open_experiments, + latest_closed_experiment, + }) }) .collect::<Result<Vec<_>, StoreError>>()?; - let benchmark_suite = benchmark_suite_label(&dimensions); - - let run_payload = NodePayload::with_schema( - self.schema.schema_ref(), - json_object(json!({ - "dimensions": run_dimensions_json(&dimensions), - "backend": format!("{:?}", request.backend), - "command": request.command.argv.iter().map(NonEmptyText::as_str).collect::<Vec<_>>(), - }))?, - ); - let run_diagnostics = self.schema.validate_node(NodeClass::Run, &run_payload); - let run_node = DagNode::new( - NodeClass::Run, - Some(open_experiment.frontier_id), - request.run_title, - request.run_summary, - run_payload, - run_diagnostics, - ); - let run_id = fidget_spinner_core::RunId::fresh(); - let now = OffsetDateTime::now_utc(); - let run = RunRecord { - node_id: run_node.id, - run_id, - frontier_id: Some(open_experiment.frontier_id), - status: RunStatus::Succeeded, - backend: request.backend, - dimensions: dimensions.clone(), - command: request.command, - started_at: Some(now), - finished_at: Some(now), - }; + let open_experiments = self.list_experiments(ListExperimentsQuery { + frontier: Some(frontier.id.to_string()), + status: Some(ExperimentStatus::Open), + limit: None, + ..ListExperimentsQuery::default() + })?; + let active_tags = derive_active_tags(&active_hypotheses, &open_experiments); + let active_metric_keys = + self.live_metric_keys(frontier.id, &active_hypotheses, &open_experiments)?; + Ok(FrontierOpenProjection { + frontier, + active_tags, + active_metric_keys, + active_hypotheses, + open_experiments, + }) + } - let analysis_node = request - .analysis - .map(|analysis| -> Result<DagNode, StoreError> { - let payload = NodePayload::with_schema( - self.schema.schema_ref(), - json_object(json!({ - "body": analysis.body.as_str(), - }))?, - ); - let diagnostics = self.schema.validate_node(NodeClass::Analysis, &payload); - Ok(DagNode::new( - NodeClass::Analysis, - Some(open_experiment.frontier_id), - analysis.title, - Some(analysis.summary), - payload, - diagnostics, - )) + pub fn metric_keys(&self, query: MetricKeysQuery) -> Result<Vec<MetricKeySummary>, StoreError> { + let frontier_id = query + .frontier + .as_deref() + .map(|selector| self.resolve_frontier(selector).map(|frontier| frontier.id)) + .transpose()?; + let definitions = self.list_metric_definitions()?; 
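The `metric_best` query below derives its sort direction from the metric definition's optimization objective: an explicit order takes precedence; otherwise `Minimize` implies ascending, `Maximize` implies descending, and `Target` carries no direction, so the request fails with `MetricOrderRequired`. A minimal, self-contained sketch of that rule, using simplified stand-ins for the `fidget_spinner_core` types (`resolve_order` and `compare` are illustrative names, not the store's actual helpers; the NaN-tolerant comparison mirrors the removed `compare_metric_samples`):

```rust
use std::cmp::Ordering;

#[derive(Clone, Copy, Debug, PartialEq)]
enum OptimizationObjective { Minimize, Maximize, Target }

#[derive(Clone, Copy, Debug, PartialEq)]
enum MetricRankOrder { Asc, Desc }

/// Pick a rank order: an explicit request wins; otherwise the objective
/// implies a direction, and `Target` has none, so the caller must supply one.
fn resolve_order(
    explicit: Option<MetricRankOrder>,
    objective: OptimizationObjective,
) -> Result<MetricRankOrder, String> {
    if let Some(order) = explicit {
        return Ok(order);
    }
    match objective {
        OptimizationObjective::Minimize => Ok(MetricRankOrder::Asc),
        OptimizationObjective::Maximize => Ok(MetricRankOrder::Desc),
        OptimizationObjective::Target => Err("order required for target metrics".into()),
    }
}

/// Compare two observed values under a rank order; NaN pairs compare equal,
/// keeping the sort total instead of panicking on `partial_cmp`.
fn compare(left: f64, right: f64, order: MetricRankOrder) -> Ordering {
    let forward = left.partial_cmp(&right).unwrap_or(Ordering::Equal);
    match order {
        MetricRankOrder::Asc => forward,
        MetricRankOrder::Desc => forward.reverse(),
    }
}

fn main() {
    assert_eq!(
        resolve_order(None, OptimizationObjective::Minimize).unwrap(),
        MetricRankOrder::Asc
    );
    assert!(resolve_order(None, OptimizationObjective::Target).is_err());
    assert_eq!(compare(1.0, 2.0, MetricRankOrder::Desc), Ordering::Greater);
}
```

Treating NaN pairs as equal keeps the ranking stable without a panic, at the cost of leaving any NaN observations wherever the sort happens to place them.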
+ let live_keys = frontier_id + .map(|frontier_id| self.live_metric_key_names(frontier_id)) + .transpose()? + .unwrap_or_default(); + let mut keys = definitions + .into_iter() + .filter(|definition| match query.scope { + MetricScope::Live => live_keys.contains(definition.key.as_str()), + MetricScope::Visible => definition.visibility.is_default_visible(), + MetricScope::All => true, }) + .map(|definition| { + Ok(MetricKeySummary { + reference_count: self.metric_reference_count(frontier_id, &definition.key)?, + key: definition.key, + unit: definition.unit, + objective: definition.objective, + visibility: definition.visibility, + description: definition.description, + }) + }) + .collect::<Result<Vec<_>, StoreError>>()?; + keys.sort_by(|left, right| left.key.as_str().cmp(right.key.as_str())); + Ok(keys) + } + + pub fn metric_best(&self, query: MetricBestQuery) -> Result<Vec<MetricBestEntry>, StoreError> { + let definition = self + .metric_definition(&query.key)? + .ok_or_else(|| StoreError::UnknownMetricDefinition(query.key.clone()))?; + let frontier_id = query + .frontier + .as_deref() + .map(|selector| self.resolve_frontier(selector).map(|frontier| frontier.id)) .transpose()?; + let hypothesis_id = query + .hypothesis + .as_deref() + .map(|selector| { + self.resolve_hypothesis(selector) + .map(|hypothesis| hypothesis.id) + }) + .transpose()?; + let order = query.order.unwrap_or(match definition.objective { + OptimizationObjective::Minimize => MetricRankOrder::Asc, + OptimizationObjective::Maximize => MetricRankOrder::Desc, + OptimizationObjective::Target => { + return Err(StoreError::MetricOrderRequired { + key: query.key.to_string(), + }); + } + }); + let experiments = self + .load_experiment_records(frontier_id, hypothesis_id, true)? + .into_iter() + .filter(|record| record.status == ExperimentStatus::Closed) + .filter(|record| { + query.include_rejected + || record + .outcome + .as_ref() + .is_some_and(|outcome| outcome.verdict != FrontierVerdict::Rejected) + }) + .collect::<Vec<_>>(); + let mut entries = experiments + .into_iter() + .filter_map(|record| { + let outcome = record.outcome.clone()?; + if !dimension_subset_matches(&query.dimensions, &outcome.dimensions) { + return None; + } + let metric = all_metrics(&outcome) + .into_iter() + .find(|metric| metric.key == query.key)?; + Some((record, outcome.dimensions.clone(), metric.value)) + }) + .map(|(record, dimensions, value)| { + Ok(MetricBestEntry { + experiment: self.experiment_summary_from_record(record.clone())?, + hypothesis: self.hypothesis_summary_from_record( + self.hypothesis_by_id(record.hypothesis_id)?, + )?, + value, + dimensions, + }) + }) + .collect::<Result<Vec<_>, StoreError>>()?; + entries.sort_by(|left, right| compare_metric_values(left.value, right.value, order)); + Ok(apply_limit(entries, query.limit)) + } - let decision_payload = NodePayload::with_schema( - self.schema.schema_ref(), - json_object(json!({ - "verdict": format!("{:?}", request.verdict), - "rationale": request.decision_rationale.as_str(), - }))?, - ); - let decision_diagnostics = self - .schema - .validate_node(NodeClass::Decision, &decision_payload); - let decision_node = DagNode::new( - NodeClass::Decision, - Some(open_experiment.frontier_id), - request.decision_title, - Some(request.decision_rationale.clone()), - decision_payload, - decision_diagnostics, - ); + pub fn frontier_history(&self, selector: &str) -> Result<Vec<EntityHistoryEntry>, StoreError> { + let frontier = self.resolve_frontier(selector)?; + self.entity_history("frontier", 
&frontier.id.to_string()) + } - let experiment = CompletedExperiment { - id: open_experiment.id, - frontier_id: open_experiment.frontier_id, - hypothesis_node_id: open_experiment.hypothesis_node_id, - run_node_id: run_node.id, - run_id, - analysis_node_id: analysis_node.as_ref().map(|node| node.id), - decision_node_id: decision_node.id, - title: open_experiment.title.clone(), - summary: open_experiment.summary.clone(), - result: ExperimentResult { - dimensions: dimensions.clone(), - primary_metric: request.primary_metric, - supporting_metrics: request.supporting_metrics, - benchmark_bundle: None, - }, - note: request.note, - verdict: request.verdict, - created_at: now, + pub fn hypothesis_history( + &self, + selector: &str, + ) -> Result<Vec<EntityHistoryEntry>, StoreError> { + let hypothesis = self.resolve_hypothesis(selector)?; + self.entity_history("hypothesis", &hypothesis.id.to_string()) + } + + pub fn experiment_history( + &self, + selector: &str, + ) -> Result<Vec<EntityHistoryEntry>, StoreError> { + let experiment = self.resolve_experiment(selector)?; + self.entity_history("experiment", &experiment.id.to_string()) + } + + pub fn artifact_history(&self, selector: &str) -> Result<Vec<EntityHistoryEntry>, StoreError> { + let artifact = self.resolve_artifact(selector)?; + self.entity_history("artifact", &artifact.id.to_string()) + } + + fn metric_definition( + &self, + key: &NonEmptyText, + ) -> Result<Option<MetricDefinition>, StoreError> { + self.connection + .query_row( + "SELECT key, unit, objective, visibility, description, created_at, updated_at + FROM metric_definitions + WHERE key = ?1", + params![key.as_str()], + decode_metric_definition_row, + ) + .optional() + .map_err(StoreError::from) + } + + fn run_dimension_definition( + &self, + key: &NonEmptyText, + ) -> Result<Option<RunDimensionDefinition>, StoreError> { + self.connection + .query_row( + "SELECT key, value_type, description, created_at, updated_at + FROM run_dimension_definitions + WHERE key = ?1", + params![key.as_str()], + decode_run_dimension_definition_row, + ) + .optional() + .map_err(StoreError::from) + } + + fn hypothesis_by_id(&self, id: HypothesisId) -> Result<HypothesisRecord, StoreError> { + self.connection + .query_row( + "SELECT id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at + FROM hypotheses WHERE id = ?1", + params![id.to_string()], + |row| self.decode_hypothesis_row(row), + ) + .map_err(StoreError::from) + } + + fn resolve_frontier(&self, selector: &str) -> Result<FrontierRecord, StoreError> { + let record = match resolve_selector(selector)? { + Selector::Id(uuid) => self + .connection + .query_row( + "SELECT id, slug, label, objective, status, brief_json, revision, created_at, updated_at + FROM frontiers WHERE id = ?1", + params![uuid.to_string()], + decode_frontier_row, + ) + .optional()?, + Selector::Slug(slug) => self + .connection + .query_row( + "SELECT id, slug, label, objective, status, brief_json, revision, created_at, updated_at + FROM frontiers WHERE slug = ?1", + params![slug.as_str()], + decode_frontier_row, + ) + .optional()?, }; - insert_node(&tx, &run_node)?; - if let Some(node) = analysis_node.as_ref() { - insert_node(&tx, node)?; + record.ok_or_else(|| StoreError::UnknownFrontierSelector(selector.to_owned())) + } + + fn resolve_hypothesis(&self, selector: &str) -> Result<HypothesisRecord, StoreError> { + let record = match resolve_selector(selector)? 
{ + Selector::Id(uuid) => self + .connection + .query_row( + "SELECT id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at + FROM hypotheses WHERE id = ?1", + params![uuid.to_string()], + |row| self.decode_hypothesis_row(row), + ) + .optional()?, + Selector::Slug(slug) => self + .connection + .query_row( + "SELECT id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at + FROM hypotheses WHERE slug = ?1", + params![slug.as_str()], + |row| self.decode_hypothesis_row(row), + ) + .optional()?, + }; + record.ok_or_else(|| StoreError::UnknownHypothesisSelector(selector.to_owned())) + } + + fn resolve_experiment(&self, selector: &str) -> Result<ExperimentRecord, StoreError> { + let record = match resolve_selector(selector)? { + Selector::Id(uuid) => self + .connection + .query_row( + "SELECT id, slug, frontier_id, hypothesis_id, archived, title, summary, tags_json, status, outcome_json, revision, created_at, updated_at + FROM experiments WHERE id = ?1", + params![uuid.to_string()], + decode_experiment_row, + ) + .optional()?, + Selector::Slug(slug) => self + .connection + .query_row( + "SELECT id, slug, frontier_id, hypothesis_id, archived, title, summary, tags_json, status, outcome_json, revision, created_at, updated_at + FROM experiments WHERE slug = ?1", + params![slug.as_str()], + decode_experiment_row, + ) + .optional()?, + }; + record.ok_or_else(|| StoreError::UnknownExperimentSelector(selector.to_owned())) + } + + fn resolve_artifact(&self, selector: &str) -> Result<ArtifactRecord, StoreError> { + let record = match resolve_selector(selector)? { + Selector::Id(uuid) => self + .connection + .query_row( + "SELECT id, slug, kind, label, summary, locator, media_type, revision, created_at, updated_at + FROM artifacts WHERE id = ?1", + params![uuid.to_string()], + decode_artifact_row, + ) + .optional()?, + Selector::Slug(slug) => self + .connection + .query_row( + "SELECT id, slug, kind, label, summary, locator, media_type, revision, created_at, updated_at + FROM artifacts WHERE slug = ?1", + params![slug.as_str()], + decode_artifact_row, + ) + .optional()?, + }; + record.ok_or_else(|| StoreError::UnknownArtifactSelector(selector.to_owned())) + } + + fn resolve_vertex_parents( + &self, + frontier_id: FrontierId, + selectors: &[VertexSelector], + child: Option<VertexRef>, + ) -> Result<Vec<VertexRef>, StoreError> { + selectors + .iter() + .map(|selector| { + let vertex = match selector { + VertexSelector::Hypothesis(selector) => { + VertexRef::Hypothesis(self.resolve_hypothesis(selector)?.id) + } + VertexSelector::Experiment(selector) => { + VertexRef::Experiment(self.resolve_experiment(selector)?.id) + } + }; + let parent_frontier_id = match vertex { + VertexRef::Hypothesis(id) => self.hypothesis_by_id(id)?.frontier_id, + VertexRef::Experiment(id) => { + self.resolve_experiment(&id.to_string())?.frontier_id + } + }; + if parent_frontier_id != frontier_id { + return Err(StoreError::CrossFrontierInfluence); + } + if child.is_some_and(|child| child == vertex) { + return Err(StoreError::SelfEdge); + } + Ok(vertex) + }) + .collect() + } + + fn resolve_attachment_targets( + &self, + selectors: &[AttachmentSelector], + ) -> Result<Vec<AttachmentTargetRef>, StoreError> { + selectors + .iter() + .map(|selector| match selector { + AttachmentSelector::Frontier(selector) => Ok(AttachmentTargetRef::Frontier( + self.resolve_frontier(selector)?.id, + )), + AttachmentSelector::Hypothesis(selector) => Ok(AttachmentTargetRef::Hypothesis( + 
self.resolve_hypothesis(selector)?.id, + )), + AttachmentSelector::Experiment(selector) => Ok(AttachmentTargetRef::Experiment( + self.resolve_experiment(selector)?.id, + )), + }) + .collect() + } + + fn resolve_attachment_target( + &self, + selector: &AttachmentSelector, + ) -> Result<AttachmentTargetRef, StoreError> { + match selector { + AttachmentSelector::Frontier(selector) => Ok(AttachmentTargetRef::Frontier( + self.resolve_frontier(selector)?.id, + )), + AttachmentSelector::Hypothesis(selector) => Ok(AttachmentTargetRef::Hypothesis( + self.resolve_hypothesis(selector)?.id, + )), + AttachmentSelector::Experiment(selector) => Ok(AttachmentTargetRef::Experiment( + self.resolve_experiment(selector)?.id, + )), } - insert_node(&tx, &decision_node)?; - insert_edge( - &tx, - &DagEdge { - source_id: open_experiment.hypothesis_node_id, - target_id: run_node.id, - kind: EdgeKind::Lineage, - }, - )?; - if let Some(node) = analysis_node.as_ref() { - insert_edge( - &tx, - &DagEdge { - source_id: run_node.id, - target_id: node.id, - kind: EdgeKind::Evidence, - }, - )?; - insert_edge( - &tx, - &DagEdge { - source_id: node.id, - target_id: decision_node.id, - kind: EdgeKind::Evidence, - }, + } + + fn load_hypothesis_records( + &self, + frontier_id: Option<FrontierId>, + include_archived: bool, + ) -> Result<Vec<HypothesisRecord>, StoreError> { + let mut records = if let Some(frontier_id) = frontier_id { + let mut statement = self.connection.prepare( + "SELECT id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at + FROM hypotheses + WHERE frontier_id = ?1 + ORDER BY updated_at DESC, created_at DESC", )?; + let rows = statement.query_map(params![frontier_id.to_string()], |row| { + self.decode_hypothesis_row(row) + })?; + rows.collect::<Result<Vec<_>, _>>()? } else { - insert_edge( - &tx, - &DagEdge { - source_id: run_node.id, - target_id: decision_node.id, - kind: EdgeKind::Evidence, - }, + let mut statement = self.connection.prepare( + "SELECT id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at + FROM hypotheses + ORDER BY updated_at DESC, created_at DESC", )?; + let rows = statement.query_map([], |row| self.decode_hypothesis_row(row))?; + rows.collect::<Result<Vec<_>, _>>()? 
+ }; + if !include_archived { + records.retain(|record| !record.archived); } - insert_run( - &tx, - &run, - benchmark_suite.as_deref(), - &experiment.result.primary_metric, - &primary_metric_definition, - &experiment.result.supporting_metrics, - supporting_metric_definitions.as_slice(), - )?; - insert_run_dimensions(&tx, run.run_id, &dimensions)?; - insert_experiment(&tx, &experiment)?; - delete_open_experiment(&tx, open_experiment.id)?; - touch_frontier(&tx, open_experiment.frontier_id)?; - insert_event( - &tx, - "experiment", - &experiment.id.to_string(), - "experiment.closed", - json!({ - "frontier_id": open_experiment.frontier_id, - "hypothesis_node_id": open_experiment.hypothesis_node_id, - "verdict": format!("{:?}", request.verdict), - }), - )?; - tx.commit()?; - - Ok(ExperimentReceipt { - open_experiment, - run_node, - run, - analysis_node, - decision_node, - experiment, - }) + Ok(records) } - fn load_annotations( + fn load_experiment_records( &self, - node_id: fidget_spinner_core::NodeId, - ) -> Result<Vec<NodeAnnotation>, StoreError> { + frontier_id: Option<FrontierId>, + hypothesis_id: Option<HypothesisId>, + include_archived: bool, + ) -> Result<Vec<ExperimentRecord>, StoreError> { + let base_sql = "SELECT id, slug, frontier_id, hypothesis_id, archived, title, summary, tags_json, status, outcome_json, revision, created_at, updated_at FROM experiments"; + let records = match (frontier_id, hypothesis_id) { + (Some(frontier_id), Some(hypothesis_id)) => { + let mut statement = self.connection.prepare(&format!( + "{base_sql} WHERE frontier_id = ?1 AND hypothesis_id = ?2 ORDER BY updated_at DESC, created_at DESC" + ))?; + let rows = statement.query_map( + params![frontier_id.to_string(), hypothesis_id.to_string()], + decode_experiment_row, + )?; + rows.collect::<Result<Vec<_>, _>>()? + } + (Some(frontier_id), None) => { + let mut statement = self.connection.prepare(&format!( + "{base_sql} WHERE frontier_id = ?1 ORDER BY updated_at DESC, created_at DESC" + ))?; + let rows = + statement.query_map(params![frontier_id.to_string()], decode_experiment_row)?; + rows.collect::<Result<Vec<_>, _>>()? + } + (None, Some(hypothesis_id)) => { + let mut statement = self.connection.prepare(&format!( + "{base_sql} WHERE hypothesis_id = ?1 ORDER BY updated_at DESC, created_at DESC" + ))?; + let rows = statement + .query_map(params![hypothesis_id.to_string()], decode_experiment_row)?; + rows.collect::<Result<Vec<_>, _>>()? + } + (None, None) => { + let mut statement = self.connection.prepare(&format!( + "{base_sql} ORDER BY updated_at DESC, created_at DESC" + ))?; + let rows = statement.query_map([], decode_experiment_row)?; + rows.collect::<Result<Vec<_>, _>>()? + } + }; + Ok(if include_archived { + records + } else { + records + .into_iter() + .filter(|record| !record.archived) + .collect() + }) + } + + fn load_artifact_records(&self) -> Result<Vec<ArtifactRecord>, StoreError> { let mut statement = self.connection.prepare( - "SELECT id, visibility, label, body, created_at - FROM node_annotations - WHERE node_id = ?1 - ORDER BY created_at ASC", + "SELECT id, slug, kind, label, summary, locator, media_type, revision, created_at, updated_at + FROM artifacts + ORDER BY updated_at DESC, created_at DESC", )?; - let mut rows = statement.query(params![node_id.to_string()])?; - let mut items = Vec::new(); - while let Some(row) = rows.next()? 
{ - items.push(NodeAnnotation { - id: parse_annotation_id(&row.get::<_, String>(0)?)?, - visibility: parse_annotation_visibility(&row.get::<_, String>(1)?)?, - label: row - .get::<_, Option<String>>(2)? - .map(NonEmptyText::new) - .transpose()?, - body: NonEmptyText::new(row.get::<_, String>(3)?)?, - created_at: decode_timestamp(&row.get::<_, String>(4)?)?, - }); - } - Ok(items) + let rows = statement.query_map([], decode_artifact_row)?; + rows.collect::<Result<Vec<_>, _>>() + .map_err(StoreError::from) } - fn load_tags( + fn decode_hypothesis_row( &self, - node_id: fidget_spinner_core::NodeId, - ) -> Result<BTreeSet<TagName>, StoreError> { + row: &rusqlite::Row<'_>, + ) -> Result<HypothesisRecord, rusqlite::Error> { + let id = HypothesisId::from_uuid(parse_uuid_sql(&row.get::<_, String>(0)?)?); + Ok(HypothesisRecord { + id, + slug: parse_slug(&row.get::<_, String>(1)?)?, + frontier_id: FrontierId::from_uuid(parse_uuid_sql(&row.get::<_, String>(2)?)?), + archived: row.get::<_, i64>(3)? != 0, + title: parse_non_empty_text(&row.get::<_, String>(4)?)?, + summary: parse_non_empty_text(&row.get::<_, String>(5)?)?, + body: parse_non_empty_text(&row.get::<_, String>(6)?)?, + tags: self.hypothesis_tags(id)?, + revision: row.get::<_, u64>(7)?, + created_at: parse_timestamp_sql(&row.get::<_, String>(8)?)?, + updated_at: parse_timestamp_sql(&row.get::<_, String>(9)?)?, + }) + } + + fn hypothesis_tags(&self, id: HypothesisId) -> Result<Vec<TagName>, rusqlite::Error> { let mut statement = self.connection.prepare( - "SELECT tag_name - FROM node_tags - WHERE node_id = ?1 - ORDER BY tag_name ASC", + "SELECT tag_name FROM hypothesis_tags WHERE hypothesis_id = ?1 ORDER BY tag_name ASC", )?; - let mut rows = statement.query(params![node_id.to_string()])?; - let mut items = BTreeSet::new(); - while let Some(row) = rows.next()? { - let _ = items.insert(TagName::new(row.get::<_, String>(0)?)?); - } - Ok(items) + let rows = statement.query_map(params![id.to_string()], |row| { + parse_tag_name(&row.get::<_, String>(0)?) + })?; + rows.collect::<Result<Vec<_>, _>>() } - fn load_frontier( + fn hypothesis_summary_from_record( &self, - frontier_id: fidget_spinner_core::FrontierId, - ) -> Result<FrontierRecord, StoreError> { - let mut statement = self.connection.prepare( - "SELECT id, label, root_contract_node_id, status, created_at, updated_at - FROM frontiers - WHERE id = ?1", - )?; - let frontier = statement - .query_row(params![frontier_id.to_string()], |row| { - read_frontier_row(row).map_err(to_sql_conversion_error) - }) - .optional()?; - frontier.ok_or(StoreError::FrontierNotFound(frontier_id)) + record: HypothesisRecord, + ) -> Result<HypothesisSummary, StoreError> { + let latest_verdict = self + .latest_closed_experiment(record.id)? + .and_then(|experiment| experiment.outcome.map(|outcome| outcome.verdict)); + Ok(HypothesisSummary { + id: record.id, + slug: record.slug, + frontier_id: record.frontier_id, + archived: record.archived, + title: record.title, + summary: record.summary, + tags: record.tags, + open_experiment_count: self + .list_experiments(ListExperimentsQuery { + hypothesis: Some(record.id.to_string()), + status: Some(ExperimentStatus::Open), + limit: None, + ..ListExperimentsQuery::default() + })? 
+ .len() as u64, + latest_verdict, + updated_at: record.updated_at, + }) } -} -fn upgrade_store(connection: &mut Connection) -> Result<(), StoreError> { - migrate(connection)?; - backfill_prose_summaries(connection)?; - let tx = connection.transaction()?; - let _ = normalize_metric_plane_tx(&tx)?; - tx.commit()?; - Ok(()) -} + fn experiment_summary_from_record( + &self, + record: ExperimentRecord, + ) -> Result<ExperimentSummary, StoreError> { + Ok(ExperimentSummary { + id: record.id, + slug: record.slug, + frontier_id: record.frontier_id, + hypothesis_id: record.hypothesis_id, + archived: record.archived, + title: record.title, + summary: record.summary, + tags: record.tags, + status: record.status, + verdict: record.outcome.as_ref().map(|outcome| outcome.verdict), + primary_metric: record + .outcome + .as_ref() + .map(|outcome| self.metric_observation_summary(&outcome.primary_metric)) + .transpose()?, + updated_at: record.updated_at, + closed_at: record.outcome.as_ref().map(|outcome| outcome.closed_at), + }) + } -fn validate_prose_node_request(request: &CreateNodeRequest) -> Result<(), StoreError> { - if !matches!(request.class, NodeClass::Note | NodeClass::Source) { - return Ok(()); + fn metric_observation_summary( + &self, + metric: &MetricValue, + ) -> Result<MetricObservationSummary, StoreError> { + let definition = self + .metric_definition(&metric.key)? + .ok_or_else(|| StoreError::UnknownMetricDefinition(metric.key.clone()))?; + Ok(MetricObservationSummary { + key: metric.key.clone(), + value: metric.value, + unit: definition.unit, + objective: definition.objective, + }) } - if request.summary.is_none() { - return Err(StoreError::ProseSummaryRequired(request.class)); + + fn latest_closed_experiment( + &self, + hypothesis_id: HypothesisId, + ) -> Result<Option<ExperimentRecord>, StoreError> { + self.load_experiment_records(None, Some(hypothesis_id), true) + .map(|records| { + records + .into_iter() + .filter(|record| record.status == ExperimentStatus::Closed) + .max_by_key(|record| { + record + .outcome + .as_ref() + .map(|outcome| outcome.closed_at) + .unwrap_or(record.updated_at) + }) + }) } - match request.payload.field("body") { - Some(Value::String(body)) if !body.trim().is_empty() => Ok(()), - _ => Err(StoreError::ProseBodyRequired(request.class)), + + fn load_vertex_parents(&self, child: VertexRef) -> Result<Vec<VertexSummary>, StoreError> { + let mut statement = self.connection.prepare( + "SELECT parent_kind, parent_id + FROM influence_edges + WHERE child_kind = ?1 AND child_id = ?2 + ORDER BY ordinal ASC, parent_kind ASC, parent_id ASC", + )?; + let rows = statement.query_map( + params![vertex_kind_name(child), child.opaque_id()], + |row| -> Result<VertexRef, rusqlite::Error> { + decode_vertex_ref(&row.get::<_, String>(0)?, &row.get::<_, String>(1)?) + }, + )?; + rows.collect::<Result<Vec<_>, _>>()? 
+ .into_iter() + .map(|parent| self.vertex_summary(parent)) + .collect() } -} -#[derive(Clone, Debug)] -struct MetricSample { - key: NonEmptyText, - source: MetricFieldSource, - value: f64, - frontier_id: fidget_spinner_core::FrontierId, - experiment_id: fidget_spinner_core::ExperimentId, - experiment_title: NonEmptyText, - hypothesis_node_id: fidget_spinner_core::NodeId, - hypothesis_title: NonEmptyText, - run_id: fidget_spinner_core::RunId, - verdict: FrontierVerdict, - unit: Option<MetricUnit>, - objective: Option<OptimizationObjective>, - dimensions: BTreeMap<NonEmptyText, RunDimensionValue>, -} + fn load_vertex_children(&self, parent: VertexRef) -> Result<Vec<VertexSummary>, StoreError> { + let mut statement = self.connection.prepare( + "SELECT child_kind, child_id + FROM influence_edges + WHERE parent_kind = ?1 AND parent_id = ?2 + ORDER BY ordinal ASC, child_kind ASC, child_id ASC", + )?; + let rows = statement.query_map( + params![vertex_kind_name(parent), parent.opaque_id()], + |row| -> Result<VertexRef, rusqlite::Error> { + decode_vertex_ref(&row.get::<_, String>(0)?, &row.get::<_, String>(1)?) + }, + )?; + rows.collect::<Result<Vec<_>, _>>()? + .into_iter() + .map(|child| self.vertex_summary(child)) + .collect() + } -impl MetricSample { - fn into_entry(self, order: MetricRankOrder) -> MetricBestEntry { - MetricBestEntry { - key: self.key, - source: self.source, - value: self.value, - order, - experiment_id: self.experiment_id, - experiment_title: self.experiment_title, - frontier_id: self.frontier_id, - hypothesis_node_id: self.hypothesis_node_id, - hypothesis_title: self.hypothesis_title, - run_id: self.run_id, - verdict: self.verdict, - unit: self.unit, - objective: self.objective, - dimensions: self.dimensions, + fn vertex_summary(&self, vertex: VertexRef) -> Result<VertexSummary, StoreError> { + match vertex { + VertexRef::Hypothesis(id) => { + let record = self.hypothesis_by_id(id)?; + Ok(VertexSummary { + vertex, + frontier_id: record.frontier_id, + slug: record.slug, + archived: record.archived, + title: record.title, + summary: Some(record.summary), + updated_at: record.updated_at, + }) + } + VertexRef::Experiment(id) => { + let record = self.resolve_experiment(&id.to_string())?; + Ok(VertexSummary { + vertex, + frontier_id: record.frontier_id, + slug: record.slug, + archived: record.archived, + title: record.title, + summary: record.summary, + updated_at: record.updated_at, + }) + } } } -} -#[derive(Clone, Debug)] -struct MetricKeyAccumulator { - key: NonEmptyText, - source: MetricFieldSource, - experiment_ids: BTreeSet<fidget_spinner_core::ExperimentId>, - unit: Option<MetricUnit>, - objective: Option<OptimizationObjective>, - ambiguous_semantics: bool, -} + fn artifact_attachment_targets( + &self, + artifact_id: ArtifactId, + ) -> Result<Vec<AttachmentTargetRef>, StoreError> { + let mut statement = self.connection.prepare( + "SELECT target_kind, target_id + FROM artifact_attachments + WHERE artifact_id = ?1 + ORDER BY ordinal ASC, target_kind ASC, target_id ASC", + )?; + let rows = statement.query_map(params![artifact_id.to_string()], |row| { + decode_attachment_target(&row.get::<_, String>(0)?, &row.get::<_, String>(1)?) 
+ })?; + rows.collect::<Result<Vec<_>, _>>() + .map_err(StoreError::from) + } -impl MetricKeyAccumulator { - fn from_sample(sample: &MetricSample) -> Self { - Self { - key: sample.key.clone(), - source: sample.source, - experiment_ids: BTreeSet::from([sample.experiment_id]), - unit: sample.unit, - objective: sample.objective, - ambiguous_semantics: false, + fn artifact_attached_to_frontier( + &self, + artifact_id: ArtifactId, + frontier_id: FrontierId, + ) -> Result<bool, StoreError> { + let targets = self.artifact_attachment_targets(artifact_id)?; + if targets.contains(&AttachmentTargetRef::Frontier(frontier_id)) { + return Ok(true); + } + for target in targets { + match target { + AttachmentTargetRef::Hypothesis(hypothesis_id) => { + if self.hypothesis_by_id(hypothesis_id)?.frontier_id == frontier_id { + return Ok(true); + } + } + AttachmentTargetRef::Experiment(experiment_id) => { + if self + .resolve_experiment(&experiment_id.to_string())? + .frontier_id + == frontier_id + { + return Ok(true); + } + } + AttachmentTargetRef::Frontier(_) => {} + } } + Ok(false) } - fn observe(&mut self, sample: &MetricSample) { - let _ = self.experiment_ids.insert(sample.experiment_id); - if self.unit != sample.unit || self.objective != sample.objective { - self.ambiguous_semantics = true; - self.unit = None; - self.objective = None; + fn active_hypothesis_ids( + &self, + frontier_id: FrontierId, + brief: &FrontierBrief, + ) -> Result<BTreeSet<HypothesisId>, StoreError> { + let mut ids = brief + .roadmap + .iter() + .map(|item| item.hypothesis_id) + .collect::<BTreeSet<_>>(); + for experiment in self.list_experiments(ListExperimentsQuery { + frontier: Some(frontier_id.to_string()), + status: Some(ExperimentStatus::Open), + limit: None, + ..ListExperimentsQuery::default() + })? { + let _ = ids.insert(experiment.hypothesis_id); } + Ok(ids) } - fn finish(self) -> MetricKeySummary { - MetricKeySummary { - key: self.key, - source: self.source, - experiment_count: self.experiment_ids.len() as u64, - unit: self.unit, - objective: self.objective, - description: None, - requires_order: self.source != MetricFieldSource::RunMetric - || self.ambiguous_semantics - || !matches!( - self.objective, - Some(OptimizationObjective::Minimize | OptimizationObjective::Maximize) - ), - } + fn active_hypothesis_count(&self, frontier_id: FrontierId) -> Result<u64, StoreError> { + let frontier = self.read_frontier(&frontier_id.to_string())?; + Ok(self + .active_hypothesis_ids(frontier_id, &frontier.brief)? + .len() as u64) } -} -fn collect_metric_samples( - store: &ProjectStore, - query: &MetricKeyQuery, -) -> Result<Vec<MetricSample>, StoreError> { - let rows = load_experiment_rows(store)?; - let metric_definitions = metric_definitions_by_key(store)?; - let mut samples = Vec::new(); - for row in rows { - if query - .frontier_id - .is_some_and(|frontier_id| row.frontier_id != frontier_id) - { - continue; + fn open_experiment_count(&self, frontier_id: Option<FrontierId>) -> Result<u64, StoreError> { + Ok(self + .load_experiment_records(frontier_id, None, false)? + .into_iter() + .filter(|record| record.status == ExperimentStatus::Open) + .count() as u64) + } + + fn live_metric_keys( + &self, + frontier_id: FrontierId, + active_hypotheses: &[HypothesisCurrentState], + open_experiments: &[ExperimentSummary], + ) -> Result<Vec<MetricKeySummary>, StoreError> { + let live_names = self.live_metric_key_names_with_context( + frontier_id, + active_hypotheses, + open_experiments, + )?; + let mut keys = self + .list_metric_definitions()? 
+ .into_iter() + .filter(|definition| live_names.contains(definition.key.as_str())) + .filter(|definition| definition.visibility.is_default_visible()) + .map(|definition| { + Ok(MetricKeySummary { + reference_count: self + .metric_reference_count(Some(frontier_id), &definition.key)?, + key: definition.key, + unit: definition.unit, + objective: definition.objective, + visibility: definition.visibility, + description: definition.description, + }) + }) + .collect::<Result<Vec<_>, StoreError>>()?; + keys.sort_by(|left, right| left.key.as_str().cmp(right.key.as_str())); + Ok(keys) + } + + fn live_metric_key_names( + &self, + frontier_id: FrontierId, + ) -> Result<BTreeSet<String>, StoreError> { + let frontier = self.read_frontier(&frontier_id.to_string())?; + let active_hypotheses = self + .active_hypothesis_ids(frontier_id, &frontier.brief)? + .into_iter() + .map(|hypothesis_id| { + let summary = + self.hypothesis_summary_from_record(self.hypothesis_by_id(hypothesis_id)?)?; + let open_experiments = self.list_experiments(ListExperimentsQuery { + hypothesis: Some(hypothesis_id.to_string()), + status: Some(ExperimentStatus::Open), + limit: None, + ..ListExperimentsQuery::default() + })?; + let latest_closed_experiment = self + .list_experiments(ListExperimentsQuery { + hypothesis: Some(hypothesis_id.to_string()), + status: Some(ExperimentStatus::Closed), + limit: Some(1), + ..ListExperimentsQuery::default() + })? + .into_iter() + .next(); + Ok(HypothesisCurrentState { + hypothesis: summary, + open_experiments, + latest_closed_experiment, + }) + }) + .collect::<Result<Vec<_>, StoreError>>()?; + let open_experiments = self.list_experiments(ListExperimentsQuery { + frontier: Some(frontier_id.to_string()), + status: Some(ExperimentStatus::Open), + limit: None, + ..ListExperimentsQuery::default() + })?; + self.live_metric_key_names_with_context(frontier_id, &active_hypotheses, &open_experiments) + } + + fn live_metric_key_names_with_context( + &self, + _frontier_id: FrontierId, + active_hypotheses: &[HypothesisCurrentState], + open_experiments: &[ExperimentSummary], + ) -> Result<BTreeSet<String>, StoreError> { + let mut keys = BTreeSet::new(); + for state in active_hypotheses { + if let Some(experiment) = state.latest_closed_experiment.as_ref() { + keys.extend(self.experiment_metric_key_names(experiment.id)?); + } } - if !dimensions_match(&row.dimensions, &query.dimensions) { - continue; + for experiment in open_experiments { + for parent in self.load_vertex_parents(VertexRef::Experiment(experiment.id))? 
{ + if let VertexRef::Experiment(parent_id) = parent.vertex { + keys.extend(self.experiment_metric_key_names(parent_id)?); + } + } } - samples.extend(metric_samples_for_row( - store.schema(), - &row, - &metric_definitions, - )); + Ok(keys) } - Ok(if let Some(source) = query.source { - samples - .into_iter() - .filter(|sample| sample.source == source) - .collect() - } else { - samples - }) -} -fn resolve_metric_order( - matching: &[MetricSample], - query: &MetricBestQuery, - source: MetricFieldSource, -) -> Result<MetricRankOrder, StoreError> { - if let Some(order) = query.order { - return Ok(order); + fn experiment_metric_key_names( + &self, + experiment_id: ExperimentId, + ) -> Result<BTreeSet<String>, StoreError> { + let record = self.resolve_experiment(&experiment_id.to_string())?; + Ok(record + .outcome + .as_ref() + .map(all_metrics) + .unwrap_or_default() + .into_iter() + .map(|metric| metric.key.to_string()) + .collect()) } - if source != MetricFieldSource::RunMetric { - return Err(StoreError::MetricOrderRequired { - key: query.key.as_str().to_owned(), - metric_source: source.as_str().to_owned(), - }); + + fn metric_reference_count( + &self, + frontier_id: Option<FrontierId>, + key: &NonEmptyText, + ) -> Result<u64, StoreError> { + let base_sql = "SELECT COUNT(*) + FROM experiment_metrics metrics + JOIN experiments experiments ON experiments.id = metrics.experiment_id"; + let count = if let Some(frontier_id) = frontier_id { + self.connection.query_row( + &format!("{base_sql} WHERE metrics.key = ?1 AND experiments.frontier_id = ?2"), + params![key.as_str(), frontier_id.to_string()], + |row| row.get::<_, u64>(0), + )? + } else { + self.connection.query_row( + &format!("{base_sql} WHERE metrics.key = ?1"), + params![key.as_str()], + |row| row.get::<_, u64>(0), + )? + }; + Ok(count) } - let objectives = matching - .iter() - .map(|sample| sample.objective) - .collect::<BTreeSet<_>>(); - match objectives.len() { - 1 => match objectives.into_iter().next().flatten() { - Some(OptimizationObjective::Minimize) => Ok(MetricRankOrder::Asc), - Some(OptimizationObjective::Maximize) => Ok(MetricRankOrder::Desc), - Some(OptimizationObjective::Target) | None => Err(StoreError::MetricOrderRequired { - key: query.key.as_str().to_owned(), - metric_source: source.as_str().to_owned(), - }), - }, - _ => Err(StoreError::MetricSemanticsAmbiguous { - key: query.key.as_str().to_owned(), - metric_source: source.as_str().to_owned(), - }), + + fn materialize_outcome( + &self, + patch: &ExperimentOutcomePatch, + ) -> Result<ExperimentOutcome, StoreError> { + if patch.backend == ExecutionBackend::Manual && patch.command.argv.is_empty() { + return Err(StoreError::ManualExperimentRequiresCommand); + } + for key in patch.dimensions.keys() { + let definition = self + .run_dimension_definition(key)? + .ok_or_else(|| StoreError::UnknownRunDimension(key.clone()))?; + let observed = patch + .dimensions + .get(key) + .map(RunDimensionValue::value_type) + .ok_or_else(|| StoreError::UnknownRunDimension(key.clone()))?; + if definition.value_type != observed { + return Err(StoreError::UnknownDimensionFilter(key.to_string())); + } + } + let _ = self + .metric_definition(&patch.primary_metric.key)? + .ok_or_else(|| StoreError::UnknownMetricDefinition(patch.primary_metric.key.clone()))?; + for metric in &patch.supporting_metrics { + let _ = self + .metric_definition(&metric.key)? 
+ .ok_or_else(|| StoreError::UnknownMetricDefinition(metric.key.clone()))?; + } + Ok(ExperimentOutcome { + backend: patch.backend, + command: patch.command.clone(), + dimensions: patch.dimensions.clone(), + primary_metric: patch.primary_metric.clone(), + supporting_metrics: patch.supporting_metrics.clone(), + verdict: patch.verdict, + rationale: patch.rationale.clone(), + analysis: patch.analysis.clone(), + closed_at: OffsetDateTime::now_utc(), + }) } -} -fn compare_metric_samples( - left: &MetricSample, - right: &MetricSample, - order: MetricRankOrder, -) -> Ordering { - let metric_order = match order { - MetricRankOrder::Asc => left - .value - .partial_cmp(&right.value) - .unwrap_or(Ordering::Equal), - MetricRankOrder::Desc => right - .value - .partial_cmp(&left.value) - .unwrap_or(Ordering::Equal), - }; - metric_order - .then_with(|| right.experiment_id.cmp(&left.experiment_id)) - .then_with(|| left.key.cmp(&right.key)) -} + fn assert_known_tags(&self, tags: &BTreeSet<TagName>) -> Result<(), StoreError> { + for tag in tags { + if self + .connection + .query_row( + "SELECT 1 FROM tags WHERE name = ?1", + params![tag.as_str()], + |_| Ok(()), + ) + .optional()? + .is_none() + { + return Err(StoreError::UnknownTag(tag.clone())); + } + } + Ok(()) + } -#[derive(Clone, Debug)] -struct ExperimentMetricRow { - experiment_id: fidget_spinner_core::ExperimentId, - experiment_title: NonEmptyText, - frontier_id: fidget_spinner_core::FrontierId, - run_id: fidget_spinner_core::RunId, - verdict: FrontierVerdict, - hypothesis_node: DagNode, - run_node: DagNode, - analysis_node: Option<DagNode>, - decision_node: DagNode, - primary_metric: MetricValue, - supporting_metrics: Vec<MetricValue>, - dimensions: BTreeMap<NonEmptyText, RunDimensionValue>, -} + fn unique_frontier_slug( + &self, + explicit: Option<Slug>, + label: &NonEmptyText, + ) -> Result<Slug, StoreError> { + self.unique_slug("frontiers", "slug", explicit, label) + } -fn load_experiment_rows(store: &ProjectStore) -> Result<Vec<ExperimentMetricRow>, StoreError> { - let run_dimensions = load_run_dimensions_by_run_id(store)?; - let mut statement = store.connection.prepare( - "SELECT - id, - title, - frontier_id, - run_id, - hypothesis_node_id, - run_node_id, - analysis_node_id, - decision_node_id, - primary_metric_json, - supporting_metrics_json, - verdict - FROM experiments", - )?; - let mut rows = statement.query([])?; - let mut items = Vec::new(); - while let Some(row) = rows.next()? { - let hypothesis_node_id = parse_node_id(&row.get::<_, String>(4)?)?; - let run_id = parse_run_id(&row.get::<_, String>(3)?)?; - let run_node_id = parse_node_id(&row.get::<_, String>(5)?)?; - let analysis_node_id = row - .get::<_, Option<String>>(6)? - .map(|raw| parse_node_id(&raw)) - .transpose()?; - let decision_node_id = parse_node_id(&row.get::<_, String>(7)?)?; - items.push(ExperimentMetricRow { - experiment_id: parse_experiment_id(&row.get::<_, String>(0)?)?, - experiment_title: NonEmptyText::new(row.get::<_, String>(1)?)?, - frontier_id: parse_frontier_id(&row.get::<_, String>(2)?)?, - run_id, - verdict: parse_frontier_verdict(&row.get::<_, String>(10)?)?, - hypothesis_node: store - .get_node(hypothesis_node_id)? - .ok_or(StoreError::NodeNotFound(hypothesis_node_id))?, - run_node: store - .get_node(run_node_id)? - .ok_or(StoreError::NodeNotFound(run_node_id))?, - analysis_node: analysis_node_id - .map(|node_id| { - store - .get_node(node_id)? 
- .ok_or(StoreError::NodeNotFound(node_id)) - }) - .transpose()?, - decision_node: store - .get_node(decision_node_id)? - .ok_or(StoreError::NodeNotFound(decision_node_id))?, - primary_metric: decode_json(&row.get::<_, String>(8)?)?, - supporting_metrics: decode_json(&row.get::<_, String>(9)?)?, - dimensions: run_dimensions.get(&run_id).cloned().unwrap_or_default(), - }); + fn unique_hypothesis_slug( + &self, + explicit: Option<Slug>, + title: &NonEmptyText, + ) -> Result<Slug, StoreError> { + self.unique_slug("hypotheses", "slug", explicit, title) } - Ok(items) -} -fn metric_samples_for_row( - schema: &ProjectSchema, - row: &ExperimentMetricRow, - metric_definitions: &BTreeMap<String, MetricDefinition>, -) -> Vec<MetricSample> { - let mut samples = vec![metric_sample_from_observation( - row, - &row.primary_metric, - metric_definitions, - MetricFieldSource::RunMetric, - )]; - samples.extend(row.supporting_metrics.iter().map(|metric| { - metric_sample_from_observation( - row, - metric, - metric_definitions, - MetricFieldSource::RunMetric, - ) - })); - samples.extend(metric_samples_from_payload( - schema, - row, - &row.hypothesis_node, - )); - samples.extend(metric_samples_from_payload(schema, row, &row.run_node)); - if let Some(node) = row.analysis_node.as_ref() { - samples.extend(metric_samples_from_payload(schema, row, node)); + fn unique_experiment_slug( + &self, + explicit: Option<Slug>, + title: &NonEmptyText, + ) -> Result<Slug, StoreError> { + self.unique_slug("experiments", "slug", explicit, title) } - samples.extend(metric_samples_from_payload(schema, row, &row.decision_node)); - samples -} -fn metric_sample_from_observation( - row: &ExperimentMetricRow, - metric: &MetricValue, - metric_definitions: &BTreeMap<String, MetricDefinition>, - source: MetricFieldSource, -) -> MetricSample { - let registry = metric_definitions.get(metric.key.as_str()); - MetricSample { - key: metric.key.clone(), - source, - value: metric.value, - frontier_id: row.frontier_id, - experiment_id: row.experiment_id, - experiment_title: row.experiment_title.clone(), - hypothesis_node_id: row.hypothesis_node.id, - hypothesis_title: row.hypothesis_node.title.clone(), - run_id: row.run_id, - verdict: row.verdict, - unit: registry.map(|definition| definition.unit), - objective: registry.map(|definition| definition.objective), - dimensions: row.dimensions.clone(), + fn unique_artifact_slug( + &self, + explicit: Option<Slug>, + label: &NonEmptyText, + ) -> Result<Slug, StoreError> { + self.unique_slug("artifacts", "slug", explicit, label) } -} -fn metric_samples_from_payload( - schema: &ProjectSchema, - row: &ExperimentMetricRow, - node: &DagNode, -) -> Vec<MetricSample> { - let Some(source) = MetricFieldSource::from_payload_class(node.class) else { - return Vec::new(); - }; - node.payload - .fields - .iter() - .filter_map(|(key, value)| { - let value = value.as_f64()?; - let spec = schema.field_spec(node.class, key); - if spec.is_some_and(|field| { - field - .value_type - .is_some_and(|kind| kind != FieldValueType::Numeric) - }) { - return None; + fn unique_slug( + &self, + table: &str, + column: &str, + explicit: Option<Slug>, + seed: &NonEmptyText, + ) -> Result<Slug, StoreError> { + if let Some(explicit) = explicit { + return Ok(explicit); + } + let base = slugify(seed.as_str())?; + if !self.slug_exists(table, column, &base)? { + return Ok(base); + } + for ordinal in 2..10_000 { + let candidate = Slug::new(format!("{}-{ordinal}", base.as_str()))?; + if !self.slug_exists(table, column, &candidate)? 
{ + return Ok(candidate); } - Some(MetricSample { - key: NonEmptyText::new(key.clone()).ok()?, - source, - value, - frontier_id: row.frontier_id, - experiment_id: row.experiment_id, - experiment_title: row.experiment_title.clone(), - hypothesis_node_id: row.hypothesis_node.id, - hypothesis_title: row.hypothesis_node.title.clone(), - run_id: row.run_id, - verdict: row.verdict, - unit: None, - objective: None, - dimensions: row.dimensions.clone(), + } + Slug::new(format!("{}-{}", base.as_str(), Uuid::now_v7().simple())) + .map_err(StoreError::from) + } + + fn slug_exists(&self, table: &str, column: &str, slug: &Slug) -> Result<bool, StoreError> { + let sql = format!("SELECT 1 FROM {table} WHERE {column} = ?1"); + self.connection + .query_row(&sql, params![slug.as_str()], |_| Ok(())) + .optional() + .map(|value| value.is_some()) + .map_err(StoreError::from) + } + + fn entity_history( + &self, + entity_kind: &str, + entity_id: &str, + ) -> Result<Vec<EntityHistoryEntry>, StoreError> { + let mut statement = self.connection.prepare( + "SELECT revision, event_kind, occurred_at, snapshot_json + FROM events + WHERE entity_kind = ?1 AND entity_id = ?2 + ORDER BY revision DESC, occurred_at DESC", + )?; + let rows = statement.query_map(params![entity_kind, entity_id], |row| { + Ok(EntityHistoryEntry { + revision: row.get(0)?, + event_kind: parse_non_empty_text(&row.get::<_, String>(1)?)?, + occurred_at: parse_timestamp_sql(&row.get::<_, String>(2)?)?, + snapshot: decode_json(&row.get::<_, String>(3)?) + .map_err(to_sql_conversion_error)?, }) - }) - .collect() + })?; + rows.collect::<Result<Vec<_>, _>>() + .map_err(StoreError::from) + } } -fn migrate(connection: &Connection) -> Result<(), StoreError> { +fn install_schema(connection: &Connection) -> Result<(), StoreError> { connection.execute_batch( " - PRAGMA foreign_keys = ON; + CREATE TABLE IF NOT EXISTS tags ( + name TEXT PRIMARY KEY NOT NULL, + description TEXT NOT NULL, + created_at TEXT NOT NULL + ); - CREATE TABLE IF NOT EXISTS nodes ( - id TEXT PRIMARY KEY, - class TEXT NOT NULL, - track TEXT NOT NULL, - frontier_id TEXT, + CREATE TABLE IF NOT EXISTS frontiers ( + id TEXT PRIMARY KEY NOT NULL, + slug TEXT NOT NULL UNIQUE, + label TEXT NOT NULL, + objective TEXT NOT NULL, + status TEXT NOT NULL, + brief_json TEXT NOT NULL, + revision INTEGER NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS hypotheses ( + id TEXT PRIMARY KEY NOT NULL, + slug TEXT NOT NULL UNIQUE, + frontier_id TEXT NOT NULL REFERENCES frontiers(id) ON DELETE CASCADE, archived INTEGER NOT NULL, title TEXT NOT NULL, - summary TEXT, - payload_schema_namespace TEXT, - payload_schema_version INTEGER, - payload_json TEXT NOT NULL, - diagnostics_json TEXT NOT NULL, - agent_session_id TEXT, + summary TEXT NOT NULL, + body TEXT NOT NULL, + revision INTEGER NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); - CREATE TABLE IF NOT EXISTS node_annotations ( - id TEXT PRIMARY KEY, - node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE, - visibility TEXT NOT NULL, - label TEXT, - body TEXT NOT NULL, - created_at TEXT NOT NULL + CREATE TABLE IF NOT EXISTS hypothesis_tags ( + hypothesis_id TEXT NOT NULL REFERENCES hypotheses(id) ON DELETE CASCADE, + tag_name TEXT NOT NULL REFERENCES tags(name) ON DELETE CASCADE, + PRIMARY KEY (hypothesis_id, tag_name) ); - CREATE TABLE IF NOT EXISTS tags ( - name TEXT PRIMARY KEY, - description TEXT NOT NULL, - created_at TEXT NOT NULL + CREATE TABLE IF NOT EXISTS experiments ( + id TEXT 
PRIMARY KEY NOT NULL, + slug TEXT NOT NULL UNIQUE, + frontier_id TEXT NOT NULL REFERENCES frontiers(id) ON DELETE CASCADE, + hypothesis_id TEXT NOT NULL REFERENCES hypotheses(id) ON DELETE CASCADE, + archived INTEGER NOT NULL, + title TEXT NOT NULL, + summary TEXT, + tags_json TEXT NOT NULL, + status TEXT NOT NULL, + outcome_json TEXT, + revision INTEGER NOT NULL, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL ); - CREATE TABLE IF NOT EXISTS node_tags ( - node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE, - tag_name TEXT NOT NULL REFERENCES tags(name) ON DELETE RESTRICT, - PRIMARY KEY (node_id, tag_name) + CREATE TABLE IF NOT EXISTS experiment_tags ( + experiment_id TEXT NOT NULL REFERENCES experiments(id) ON DELETE CASCADE, + tag_name TEXT NOT NULL REFERENCES tags(name) ON DELETE CASCADE, + PRIMARY KEY (experiment_id, tag_name) ); - CREATE TABLE IF NOT EXISTS node_edges ( - source_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE, - target_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE, - kind TEXT NOT NULL, - PRIMARY KEY (source_id, target_id, kind) + CREATE TABLE IF NOT EXISTS influence_edges ( + parent_kind TEXT NOT NULL, + parent_id TEXT NOT NULL, + child_kind TEXT NOT NULL, + child_id TEXT NOT NULL, + ordinal INTEGER NOT NULL, + PRIMARY KEY (parent_kind, parent_id, child_kind, child_id) ); - CREATE TABLE IF NOT EXISTS frontiers ( - id TEXT PRIMARY KEY, + CREATE TABLE IF NOT EXISTS artifacts ( + id TEXT PRIMARY KEY NOT NULL, + slug TEXT NOT NULL UNIQUE, + kind TEXT NOT NULL, label TEXT NOT NULL, - root_contract_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT, - status TEXT NOT NULL, + summary TEXT, + locator TEXT NOT NULL, + media_type TEXT, + revision INTEGER NOT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); - CREATE TABLE IF NOT EXISTS runs ( - run_id TEXT PRIMARY KEY, - node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE, - frontier_id TEXT REFERENCES frontiers(id) ON DELETE SET NULL, - status TEXT NOT NULL, - backend TEXT NOT NULL, - benchmark_suite TEXT, - working_directory TEXT NOT NULL, - argv_json TEXT NOT NULL, - env_json TEXT NOT NULL, - started_at TEXT, - finished_at TEXT - ); - - CREATE TABLE IF NOT EXISTS metrics ( - run_id TEXT NOT NULL REFERENCES runs(run_id) ON DELETE CASCADE, - metric_key TEXT NOT NULL, - unit TEXT NOT NULL, - objective TEXT NOT NULL, - value REAL NOT NULL + CREATE TABLE IF NOT EXISTS artifact_attachments ( + artifact_id TEXT NOT NULL REFERENCES artifacts(id) ON DELETE CASCADE, + target_kind TEXT NOT NULL, + target_id TEXT NOT NULL, + ordinal INTEGER NOT NULL, + PRIMARY KEY (artifact_id, target_kind, target_id) ); CREATE TABLE IF NOT EXISTS metric_definitions ( - metric_key TEXT PRIMARY KEY, + key TEXT PRIMARY KEY NOT NULL, unit TEXT NOT NULL, objective TEXT NOT NULL, + visibility TEXT NOT NULL, description TEXT, - created_at TEXT NOT NULL + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS run_dimension_definitions ( - dimension_key TEXT PRIMARY KEY, + key TEXT PRIMARY KEY NOT NULL, value_type TEXT NOT NULL, description TEXT, - created_at TEXT NOT NULL + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL ); - CREATE TABLE IF NOT EXISTS run_dimensions ( - run_id TEXT NOT NULL REFERENCES runs(run_id) ON DELETE CASCADE, - dimension_key TEXT NOT NULL REFERENCES run_dimension_definitions(dimension_key) ON DELETE RESTRICT, - value_type TEXT NOT NULL, - value_text TEXT, - value_numeric REAL, - value_boolean INTEGER, - value_timestamp TEXT, - PRIMARY KEY 
(run_id, dimension_key) + CREATE TABLE IF NOT EXISTS experiment_dimensions ( + experiment_id TEXT NOT NULL REFERENCES experiments(id) ON DELETE CASCADE, + key TEXT NOT NULL REFERENCES run_dimension_definitions(key) ON DELETE CASCADE, + value_json TEXT NOT NULL, + PRIMARY KEY (experiment_id, key) ); - CREATE TABLE IF NOT EXISTS open_experiments ( - id TEXT PRIMARY KEY, - frontier_id TEXT NOT NULL REFERENCES frontiers(id) ON DELETE CASCADE, - hypothesis_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT, - title TEXT NOT NULL, - summary TEXT, - created_at TEXT NOT NULL + CREATE TABLE IF NOT EXISTS experiment_metrics ( + experiment_id TEXT NOT NULL REFERENCES experiments(id) ON DELETE CASCADE, + key TEXT NOT NULL REFERENCES metric_definitions(key) ON DELETE CASCADE, + ordinal INTEGER NOT NULL, + is_primary INTEGER NOT NULL, + value REAL NOT NULL, + PRIMARY KEY (experiment_id, key, ordinal) ); - CREATE TABLE IF NOT EXISTS experiments ( - id TEXT PRIMARY KEY, - frontier_id TEXT NOT NULL REFERENCES frontiers(id) ON DELETE CASCADE, - hypothesis_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT, - run_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT, - run_id TEXT NOT NULL REFERENCES runs(run_id) ON DELETE RESTRICT, - analysis_node_id TEXT REFERENCES nodes(id) ON DELETE RESTRICT, - decision_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT, - title TEXT NOT NULL, - summary TEXT, - benchmark_suite TEXT NOT NULL, - primary_metric_json TEXT NOT NULL, - supporting_metrics_json TEXT NOT NULL, - note_summary TEXT NOT NULL, - note_next_json TEXT NOT NULL, - verdict TEXT NOT NULL, - created_at TEXT NOT NULL - ); - - CREATE INDEX IF NOT EXISTS metrics_by_key ON metrics(metric_key); - CREATE INDEX IF NOT EXISTS run_dimensions_by_key_text ON run_dimensions(dimension_key, value_text); - CREATE INDEX IF NOT EXISTS run_dimensions_by_key_numeric ON run_dimensions(dimension_key, value_numeric); - CREATE INDEX IF NOT EXISTS run_dimensions_by_run ON run_dimensions(run_id, dimension_key); - CREATE INDEX IF NOT EXISTS open_experiments_by_frontier ON open_experiments(frontier_id, created_at DESC); - CREATE INDEX IF NOT EXISTS experiments_by_frontier ON experiments(frontier_id, created_at DESC); - CREATE TABLE IF NOT EXISTS events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, entity_kind TEXT NOT NULL, entity_id TEXT NOT NULL, + revision INTEGER NOT NULL, event_kind TEXT NOT NULL, - payload_json TEXT NOT NULL, - created_at TEXT NOT NULL + occurred_at TEXT NOT NULL, + snapshot_json TEXT NOT NULL, + PRIMARY KEY (entity_kind, entity_id, revision) ); ", )?; Ok(()) } -fn backfill_prose_summaries(connection: &Connection) -> Result<(), StoreError> { - let mut statement = connection.prepare( - "SELECT id, payload_json - FROM nodes - WHERE class IN ('note', 'source') - AND (summary IS NULL OR trim(summary) = '')", +fn insert_frontier( + transaction: &Transaction<'_>, + frontier: &FrontierRecord, +) -> Result<(), StoreError> { + let _ = transaction.execute( + "INSERT INTO frontiers (id, slug, label, objective, status, brief_json, revision, created_at, updated_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", + params![ + frontier.id.to_string(), + frontier.slug.as_str(), + frontier.label.as_str(), + frontier.objective.as_str(), + frontier.status.as_str(), + encode_json(&frontier.brief)?, + frontier.revision, + encode_timestamp(frontier.created_at)?, + encode_timestamp(frontier.updated_at)?, + ], )?; - let mut rows = statement.query([])?; - let mut updates = Vec::new(); - while let 
Some(row) = rows.next()? { - let node_id = row.get::<_, String>(0)?; - let payload = decode_json::<NodePayload>(&row.get::<_, String>(1)?)?; - let Some(Value::String(body)) = payload.field("body") else { - continue; - }; - let Some(summary) = derive_summary_from_body(body) else { - continue; - }; - updates.push((node_id, summary)); - } - for (node_id, summary) in updates { - let _ = connection.execute( - "UPDATE nodes SET summary = ?1 WHERE id = ?2", - params![summary.as_str(), node_id], - )?; - } Ok(()) } -fn sort_schema_fields(fields: &mut [ProjectFieldSpec]) { - fields.sort_by(|left, right| { - left.name - .cmp(&right.name) - .then_with(|| left.node_classes.iter().cmp(right.node_classes.iter())) - }); -} - -fn normalize_metric_plane_tx( - tx: &Transaction<'_>, -) -> Result<MetricPlaneMigrationReport, StoreError> { - let mut report = MetricPlaneMigrationReport::default(); - - if insert_run_dimension_definition_tx( - tx, - &RunDimensionDefinition::new( - NonEmptyText::new("benchmark_suite")?, - FieldValueType::String, - Some(NonEmptyText::new("Legacy coarse benchmark label")?), - ), - )? { - report.inserted_dimension_definitions += 1; - } - - { - let mut statement = tx.prepare( - "SELECT DISTINCT metric_key, unit, objective - FROM metrics - ORDER BY metric_key ASC", - )?; - let mut rows = statement.query([])?; - while let Some(row) = rows.next()? { - let definition = MetricDefinition::new( - NonEmptyText::new(row.get::<_, String>(0)?)?, - decode_metric_unit(&row.get::<_, String>(1)?)?, - decode_optimization_objective(&row.get::<_, String>(2)?)?, - None, - ); - if upsert_metric_definition_tx(tx, &definition)? { - report.inserted_metric_definitions += 1; - } - } - } - - { - let mut statement = tx.prepare( - "SELECT payload_json - FROM nodes - WHERE class = 'contract'", - )?; - let mut rows = statement.query([])?; - while let Some(row) = rows.next()? { - let payload = decode_json::<NodePayload>(&row.get::<_, String>(0)?)?; - for definition in contract_metric_definitions(&payload)? { - if upsert_metric_definition_tx(tx, &definition)? { - report.inserted_metric_definitions += 1; - } - } - } - } - - { - let mut statement = tx.prepare( - "SELECT run_id, benchmark_suite - FROM runs - WHERE benchmark_suite IS NOT NULL - AND trim(benchmark_suite) != ''", - )?; - let mut rows = statement.query([])?; - while let Some(row) = rows.next()? { - let run_id = parse_run_id(&row.get::<_, String>(0)?)?; - let value = RunDimensionValue::String(NonEmptyText::new(row.get::<_, String>(1)?)?); - if insert_run_dimension_value_tx( - tx, - run_id, - &NonEmptyText::new("benchmark_suite")?, - &value, - )? 
{ - report.inserted_dimension_values += 1; - } - } - } - - Ok(report) -} - -fn contract_metric_definitions(payload: &NodePayload) -> Result<Vec<MetricDefinition>, StoreError> { - let mut definitions = Vec::new(); - if let Some(primary) = payload.field("primary_metric") { - definitions.push(metric_definition_from_json(primary, None)?); - } - if let Some(Value::Array(items)) = payload.field("supporting_metrics") { - for item in items { - definitions.push(metric_definition_from_json(item, None)?); - } - } - Ok(definitions) -} - -fn metric_definition_from_json( - value: &Value, - description: Option<NonEmptyText>, -) -> Result<MetricDefinition, StoreError> { - let Some(object) = value.as_object() else { - return Err(StoreError::Json(serde_json::Error::io(io::Error::new( - io::ErrorKind::InvalidData, - "metric definition payload must be an object", - )))); - }; - let key = object - .get("metric_key") - .or_else(|| object.get("key")) - .and_then(Value::as_str) - .ok_or_else(|| { - StoreError::Json(serde_json::Error::io(io::Error::new( - io::ErrorKind::InvalidData, - "metric definition missing key", - ))) - })?; - let unit = object.get("unit").and_then(Value::as_str).ok_or_else(|| { - StoreError::Json(serde_json::Error::io(io::Error::new( - io::ErrorKind::InvalidData, - "metric definition missing unit", - ))) - })?; - let objective = object - .get("objective") - .and_then(Value::as_str) - .ok_or_else(|| { - StoreError::Json(serde_json::Error::io(io::Error::new( - io::ErrorKind::InvalidData, - "metric definition missing objective", - ))) - })?; - Ok(MetricDefinition::new( - NonEmptyText::new(key)?, - decode_metric_unit(unit)?, - decode_optimization_objective(objective)?, - description, - )) -} - -fn upsert_metric_definition_tx( - tx: &Transaction<'_>, - definition: &MetricDefinition, -) -> Result<bool, StoreError> { - let existing = tx - .query_row( - "SELECT unit, objective, description - FROM metric_definitions - WHERE metric_key = ?1", - params![definition.key.as_str()], - |row| { - Ok(( - row.get::<_, String>(0)?, - row.get::<_, String>(1)?, - row.get::<_, Option<String>>(2)?, - )) - }, - ) - .optional()?; - if let Some((existing_unit, existing_objective, existing_description)) = existing { - let new_unit = encode_metric_unit(definition.unit).to_owned(); - let new_objective = encode_optimization_objective(definition.objective).to_owned(); - if existing_unit != new_unit || existing_objective != new_objective { - return Err(StoreError::ConflictingMetricDefinition { - key: definition.key.as_str().to_owned(), - existing_unit, - existing_objective, - new_unit, - new_objective, - }); - } - if existing_description.is_none() && definition.description.is_some() { - let _ = tx.execute( - "UPDATE metric_definitions SET description = ?2 WHERE metric_key = ?1", - params![ - definition.key.as_str(), - definition.description.as_ref().map(NonEmptyText::as_str) - ], - )?; - } - Ok(false) - } else { - let _ = tx.execute( - "INSERT INTO metric_definitions (metric_key, unit, objective, description, created_at) - VALUES (?1, ?2, ?3, ?4, ?5)", - params![ - definition.key.as_str(), - encode_metric_unit(definition.unit), - encode_optimization_objective(definition.objective), - definition.description.as_ref().map(NonEmptyText::as_str), - encode_timestamp(definition.created_at)?, - ], - )?; - Ok(true) - } -} - -fn insert_run_dimension_definition_tx( - tx: &Transaction<'_>, - definition: &RunDimensionDefinition, -) -> Result<bool, StoreError> { - let existing = tx - .query_row( - "SELECT value_type, description - FROM 
run_dimension_definitions - WHERE dimension_key = ?1", - params![definition.key.as_str()], - |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?)), - ) - .optional()?; - if let Some((existing_type, existing_description)) = existing { - let new_type = encode_field_value_type(definition.value_type).to_owned(); - if existing_type != new_type { - return Err(StoreError::ConflictingRunDimensionDefinition { - key: definition.key.as_str().to_owned(), - existing_type, - new_type, - }); - } - if existing_description.is_none() && definition.description.is_some() { - let _ = tx.execute( - "UPDATE run_dimension_definitions SET description = ?2 WHERE dimension_key = ?1", - params![ - definition.key.as_str(), - definition.description.as_ref().map(NonEmptyText::as_str) - ], - )?; - } - Ok(false) - } else { - let _ = tx.execute( - "INSERT INTO run_dimension_definitions (dimension_key, value_type, description, created_at) - VALUES (?1, ?2, ?3, ?4)", - params![ - definition.key.as_str(), - encode_field_value_type(definition.value_type), - definition.description.as_ref().map(NonEmptyText::as_str), - encode_timestamp(definition.created_at)?, - ], - )?; - Ok(true) - } -} - -fn load_metric_definition_tx( - tx: &Transaction<'_>, - key: &NonEmptyText, -) -> Result<Option<MetricDefinition>, StoreError> { - tx.query_row( - "SELECT metric_key, unit, objective, description, created_at - FROM metric_definitions - WHERE metric_key = ?1", - params![key.as_str()], - |row| { - Ok(MetricDefinition { - key: NonEmptyText::new(row.get::<_, String>(0)?) - .map_err(core_to_sql_conversion_error)?, - unit: decode_metric_unit(&row.get::<_, String>(1)?) - .map_err(to_sql_conversion_error)?, - objective: decode_optimization_objective(&row.get::<_, String>(2)?) - .map_err(to_sql_conversion_error)?, - description: row - .get::<_, Option<String>>(3)? - .map(NonEmptyText::new) - .transpose() - .map_err(core_to_sql_conversion_error)?, - created_at: decode_timestamp(&row.get::<_, String>(4)?) - .map_err(to_sql_conversion_error)?, - }) - }, - ) - .optional() - .map_err(StoreError::from) -} - -fn metric_definitions_by_key( - store: &ProjectStore, -) -> Result<BTreeMap<String, MetricDefinition>, StoreError> { - Ok(store - .list_metric_definitions()? - .into_iter() - .map(|definition| (definition.key.as_str().to_owned(), definition)) - .collect()) -} - -fn run_dimension_definitions_by_key( - store: &ProjectStore, -) -> Result<BTreeMap<String, RunDimensionDefinition>, StoreError> { - let mut statement = store.connection.prepare( - "SELECT dimension_key, value_type, description, created_at - FROM run_dimension_definitions", +fn update_frontier( + transaction: &Transaction<'_>, + frontier: &FrontierRecord, +) -> Result<(), StoreError> { + let _ = transaction.execute( + "UPDATE frontiers + SET slug = ?2, label = ?3, objective = ?4, status = ?5, brief_json = ?6, revision = ?7, updated_at = ?8 + WHERE id = ?1", + params![ + frontier.id.to_string(), + frontier.slug.as_str(), + frontier.label.as_str(), + frontier.objective.as_str(), + frontier.status.as_str(), + encode_json(&frontier.brief)?, + frontier.revision, + encode_timestamp(frontier.updated_at)?, + ], )?; - let mut rows = statement.query([])?; - let mut items = BTreeMap::new(); - while let Some(row) = rows.next()? { - let definition = RunDimensionDefinition { - key: NonEmptyText::new(row.get::<_, String>(0)?)?, - value_type: decode_field_value_type(&row.get::<_, String>(1)?)?, - description: row - .get::<_, Option<String>>(2)? 
- .map(NonEmptyText::new) - .transpose()?, - created_at: decode_timestamp(&row.get::<_, String>(3)?)?, - }; - let _ = items.insert(definition.key.as_str().to_owned(), definition); - } - Ok(items) -} - -fn coerce_run_dimension_map( - definitions: &BTreeMap<String, RunDimensionDefinition>, - raw_dimensions: BTreeMap<String, Value>, -) -> Result<BTreeMap<NonEmptyText, RunDimensionValue>, StoreError> { - let mut dimensions = BTreeMap::new(); - for (raw_key, raw_value) in raw_dimensions { - let key = NonEmptyText::new(raw_key)?; - let Some(definition) = definitions.get(key.as_str()) else { - return Err(StoreError::UnknownRunDimension(key)); - }; - let value = coerce_run_dimension_value(definition, raw_value)?; - let _ = dimensions.insert(key, value); - } - Ok(dimensions) -} - -fn coerce_run_dimension_value( - definition: &RunDimensionDefinition, - raw_value: Value, -) -> Result<RunDimensionValue, StoreError> { - match definition.value_type { - FieldValueType::String => match raw_value { - Value::String(value) => Ok(RunDimensionValue::String(NonEmptyText::new(value)?)), - other => Err(StoreError::InvalidRunDimensionValue { - key: definition.key.as_str().to_owned(), - expected: definition.value_type.as_str().to_owned(), - observed: value_kind_name(&other).to_owned(), - }), - }, - FieldValueType::Numeric => match raw_value.as_f64() { - Some(value) => Ok(RunDimensionValue::Numeric(value)), - None => Err(StoreError::InvalidRunDimensionValue { - key: definition.key.as_str().to_owned(), - expected: definition.value_type.as_str().to_owned(), - observed: value_kind_name(&raw_value).to_owned(), - }), - }, - FieldValueType::Boolean => match raw_value { - Value::Bool(value) => Ok(RunDimensionValue::Boolean(value)), - other => Err(StoreError::InvalidRunDimensionValue { - key: definition.key.as_str().to_owned(), - expected: definition.value_type.as_str().to_owned(), - observed: value_kind_name(&other).to_owned(), - }), - }, - FieldValueType::Timestamp => match raw_value { - Value::String(value) => { - let _ = OffsetDateTime::parse(&value, &Rfc3339)?; - Ok(RunDimensionValue::Timestamp(NonEmptyText::new(value)?)) - } - other => Err(StoreError::InvalidRunDimensionValue { - key: definition.key.as_str().to_owned(), - expected: definition.value_type.as_str().to_owned(), - observed: value_kind_name(&other).to_owned(), - }), - }, - } + Ok(()) } -fn insert_run_dimension_value_tx( - tx: &Transaction<'_>, - run_id: fidget_spinner_core::RunId, - key: &NonEmptyText, - value: &RunDimensionValue, -) -> Result<bool, StoreError> { - let (value_text, value_numeric, value_boolean, value_timestamp) = - encode_run_dimension_columns(value)?; - let changed = tx.execute( - "INSERT OR IGNORE INTO run_dimensions ( - run_id, - dimension_key, - value_type, - value_text, - value_numeric, - value_boolean, - value_timestamp - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", +fn insert_hypothesis( + transaction: &Transaction<'_>, + hypothesis: &HypothesisRecord, +) -> Result<(), StoreError> { + let _ = transaction.execute( + "INSERT INTO hypotheses (id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", params![ - run_id.to_string(), - key.as_str(), - encode_field_value_type(value.value_type()), - value_text, - value_numeric, - value_boolean, - value_timestamp, + hypothesis.id.to_string(), + hypothesis.slug.as_str(), + hypothesis.frontier_id.to_string(), + bool_to_sql(hypothesis.archived), + hypothesis.title.as_str(), + hypothesis.summary.as_str(), + 
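+            // Param order mirrors the column list above; `body` is assumed to have
+            // already passed validate_hypothesis_body in the calling store method.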
hypothesis.body.as_str(), + hypothesis.revision, + encode_timestamp(hypothesis.created_at)?, + encode_timestamp(hypothesis.updated_at)?, ], )?; - Ok(changed > 0) + Ok(()) } -fn insert_run_dimensions( - tx: &Transaction<'_>, - run_id: fidget_spinner_core::RunId, - dimensions: &BTreeMap<NonEmptyText, RunDimensionValue>, +fn update_hypothesis_row( + transaction: &Transaction<'_>, + hypothesis: &HypothesisRecord, ) -> Result<(), StoreError> { - for (key, value) in dimensions { - let _ = insert_run_dimension_value_tx(tx, run_id, key, value)?; - } + let _ = transaction.execute( + "UPDATE hypotheses + SET slug = ?2, archived = ?3, title = ?4, summary = ?5, body = ?6, revision = ?7, updated_at = ?8 + WHERE id = ?1", + params![ + hypothesis.id.to_string(), + hypothesis.slug.as_str(), + bool_to_sql(hypothesis.archived), + hypothesis.title.as_str(), + hypothesis.summary.as_str(), + hypothesis.body.as_str(), + hypothesis.revision, + encode_timestamp(hypothesis.updated_at)?, + ], + )?; Ok(()) } -fn validate_run_dimensions_tx( - tx: &Transaction<'_>, - dimensions: &BTreeMap<NonEmptyText, RunDimensionValue>, -) -> Result<BTreeMap<NonEmptyText, RunDimensionValue>, StoreError> { - for (key, value) in dimensions { - let Some(expected_type) = tx - .query_row( - "SELECT value_type - FROM run_dimension_definitions - WHERE dimension_key = ?1", - params![key.as_str()], - |row| row.get::<_, String>(0), - ) - .optional()? - else { - return Err(StoreError::UnknownRunDimension(key.clone())); - }; - let expected_type = decode_field_value_type(&expected_type)?; - let observed_type = value.value_type(); - if expected_type != observed_type { - return Err(StoreError::InvalidRunDimensionValue { - key: key.as_str().to_owned(), - expected: expected_type.as_str().to_owned(), - observed: observed_type.as_str().to_owned(), - }); - } - if matches!(value, RunDimensionValue::Timestamp(raw) if OffsetDateTime::parse(raw.as_str(), &Rfc3339).is_err()) - { - return Err(StoreError::InvalidRunDimensionValue { - key: key.as_str().to_owned(), - expected: FieldValueType::Timestamp.as_str().to_owned(), - observed: "string".to_owned(), - }); - } - } - Ok(dimensions.clone()) -} - -fn load_run_dimensions_by_run_id( - store: &ProjectStore, -) -> Result< - BTreeMap<fidget_spinner_core::RunId, BTreeMap<NonEmptyText, RunDimensionValue>>, - StoreError, -> { - let mut statement = store.connection.prepare( - "SELECT run_id, dimension_key, value_type, value_text, value_numeric, value_boolean, value_timestamp - FROM run_dimensions - ORDER BY dimension_key ASC", +fn replace_hypothesis_tags( + transaction: &Transaction<'_>, + hypothesis_id: HypothesisId, + tags: &BTreeSet<TagName>, +) -> Result<(), StoreError> { + let _ = transaction.execute( + "DELETE FROM hypothesis_tags WHERE hypothesis_id = ?1", + params![hypothesis_id.to_string()], )?; - let mut rows = statement.query([])?; - let mut values = - BTreeMap::<fidget_spinner_core::RunId, BTreeMap<NonEmptyText, RunDimensionValue>>::new(); - while let Some(row) = rows.next()? 
{ - let run_id = parse_run_id(&row.get::<_, String>(0)?)?; - let key = NonEmptyText::new(row.get::<_, String>(1)?)?; - let value_type = decode_field_value_type(&row.get::<_, String>(2)?)?; - let value = decode_run_dimension_value( - value_type, - row.get::<_, Option<String>>(3)?, - row.get::<_, Option<f64>>(4)?, - row.get::<_, Option<i64>>(5)?, - row.get::<_, Option<String>>(6)?, - )?; - let _ = values.entry(run_id).or_default().insert(key, value); - } - Ok(values) -} - -fn load_run_dimension_summaries( - store: &ProjectStore, -) -> Result<Vec<RunDimensionSummary>, StoreError> { - let definitions = { - let mut statement = store.connection.prepare( - "SELECT dimension_key, value_type, description, created_at - FROM run_dimension_definitions - ORDER BY dimension_key ASC", - )?; - let mut rows = statement.query([])?; - let mut items = Vec::new(); - while let Some(row) = rows.next()? { - items.push(RunDimensionDefinition { - key: NonEmptyText::new(row.get::<_, String>(0)?)?, - value_type: decode_field_value_type(&row.get::<_, String>(1)?)?, - description: row - .get::<_, Option<String>>(2)? - .map(NonEmptyText::new) - .transpose()?, - created_at: decode_timestamp(&row.get::<_, String>(3)?)?, - }); - } - items - }; - - let mut summaries = Vec::new(); - for definition in definitions { - let mut statement = store.connection.prepare( - "SELECT value_text, value_numeric, value_boolean, value_timestamp - FROM run_dimensions - WHERE dimension_key = ?1", + for tag in tags { + let _ = transaction.execute( + "INSERT INTO hypothesis_tags (hypothesis_id, tag_name) VALUES (?1, ?2)", + params![hypothesis_id.to_string(), tag.as_str()], )?; - let mut rows = statement.query(params![definition.key.as_str()])?; - let mut observed_run_count = 0_u64; - let mut distinct = BTreeSet::new(); - let mut sample_values = Vec::new(); - while let Some(row) = rows.next()? 
{ - observed_run_count += 1; - let value = decode_run_dimension_value( - definition.value_type, - row.get::<_, Option<String>>(0)?, - row.get::<_, Option<f64>>(1)?, - row.get::<_, Option<i64>>(2)?, - row.get::<_, Option<String>>(3)?, - )?; - let serialized = encode_json(&value.as_json())?; - if distinct.insert(serialized) && sample_values.len() < 5 { - sample_values.push(value.as_json()); - } - } - summaries.push(RunDimensionSummary { - key: definition.key, - value_type: definition.value_type, - description: definition.description, - observed_run_count, - distinct_value_count: distinct.len() as u64, - sample_values, - }); - } - Ok(summaries) -} - -fn merge_registered_run_metric_summaries( - store: &ProjectStore, - summaries: &mut Vec<MetricKeySummary>, -) -> Result<(), StoreError> { - let definitions = store.list_metric_definitions()?; - for definition in definitions { - if let Some(summary) = summaries.iter_mut().find(|summary| { - summary.source == MetricFieldSource::RunMetric && summary.key == definition.key - }) { - summary.unit = Some(definition.unit); - summary.objective = Some(definition.objective); - summary.description.clone_from(&definition.description); - summary.requires_order = matches!(definition.objective, OptimizationObjective::Target); - continue; - } - summaries.push(MetricKeySummary { - key: definition.key, - source: MetricFieldSource::RunMetric, - experiment_count: 0, - unit: Some(definition.unit), - objective: Some(definition.objective), - description: definition.description, - requires_order: matches!(definition.objective, OptimizationObjective::Target), - }); } Ok(()) } -fn dimensions_match( - haystack: &BTreeMap<NonEmptyText, RunDimensionValue>, - needle: &BTreeMap<NonEmptyText, RunDimensionValue>, -) -> bool { - needle - .iter() - .all(|(key, value)| haystack.get(key) == Some(value)) -} - -fn run_dimensions_json(dimensions: &BTreeMap<NonEmptyText, RunDimensionValue>) -> Value { - Value::Object( - dimensions - .iter() - .map(|(key, value)| (key.to_string(), value.as_json())) - .collect::<serde_json::Map<String, Value>>(), - ) -} - -fn benchmark_suite_label(dimensions: &BTreeMap<NonEmptyText, RunDimensionValue>) -> Option<String> { - dimensions - .get(&NonEmptyText::new("benchmark_suite").ok()?) 
- .and_then(|value| match value { - RunDimensionValue::String(item) => Some(item.to_string()), - _ => None, - }) - .or_else(|| { - if dimensions.is_empty() { - None - } else { - Some( - dimensions - .iter() - .map(|(key, value)| format!("{key}={}", dimension_value_text(value))) - .collect::<Vec<_>>() - .join(", "), - ) - } - }) -} - -fn derive_summary_from_body(body: &str) -> Option<NonEmptyText> { - const MAX_SUMMARY_CHARS: usize = 240; - - let paragraph = body - .split("\n\n") - .map(collapse_inline_whitespace) - .map(|text| text.trim().to_owned()) - .find(|text| !text.is_empty())?; - let summary = truncate_chars(¶graph, MAX_SUMMARY_CHARS); - NonEmptyText::new(summary).ok() -} - -fn collapse_inline_whitespace(raw: &str) -> String { - raw.split_whitespace().collect::<Vec<_>>().join(" ") -} - -fn truncate_chars(value: &str, max_chars: usize) -> String { - if value.chars().count() <= max_chars { - return value.to_owned(); - } - let mut truncated = value.chars().take(max_chars).collect::<String>(); - if let Some(index) = truncated.rfind(char::is_whitespace) { - truncated.truncate(index); - } - format!("{}…", truncated.trim_end()) -} - -fn insert_node(tx: &Transaction<'_>, node: &DagNode) -> Result<(), StoreError> { - let schema_namespace = node - .payload - .schema - .as_ref() - .map(|schema| schema.namespace.as_str()); - let schema_version = node - .payload - .schema - .as_ref() - .map(|schema| i64::from(schema.version)); - let _ = tx.execute( - "INSERT INTO nodes ( - id, - class, - track, - frontier_id, - archived, - title, - summary, - payload_schema_namespace, - payload_schema_version, - payload_json, - diagnostics_json, - agent_session_id, - created_at, - updated_at - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)", +fn insert_experiment( + transaction: &Transaction<'_>, + experiment: &ExperimentRecord, +) -> Result<(), StoreError> { + let _ = transaction.execute( + "INSERT INTO experiments (id, slug, frontier_id, hypothesis_id, archived, title, summary, tags_json, status, outcome_json, revision, created_at, updated_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)", params![ - node.id.to_string(), - node.class.as_str(), - encode_node_track(node.track), - node.frontier_id.map(|id| id.to_string()), - i64::from(node.archived), - node.title.as_str(), - node.summary.as_ref().map(NonEmptyText::as_str), - schema_namespace, - schema_version, - encode_json(&node.payload)?, - encode_json(&node.diagnostics)?, - node.agent_session_id.map(|id| id.to_string()), - encode_timestamp(node.created_at)?, - encode_timestamp(node.updated_at)?, + experiment.id.to_string(), + experiment.slug.as_str(), + experiment.frontier_id.to_string(), + experiment.hypothesis_id.to_string(), + bool_to_sql(experiment.archived), + experiment.title.as_str(), + experiment.summary.as_ref().map(NonEmptyText::as_str), + encode_json(&experiment.tags)?, + experiment.status.as_str(), + experiment.outcome.as_ref().map(encode_json).transpose()?, + experiment.revision, + encode_timestamp(experiment.created_at)?, + encode_timestamp(experiment.updated_at)?, ], )?; - for annotation in &node.annotations { - insert_annotation(tx, node.id, annotation)?; - } - for tag in &node.tags { - insert_node_tag(tx, node.id, tag)?; - } Ok(()) } -fn insert_tag(tx: &Transaction<'_>, tag: &TagRecord) -> Result<(), StoreError> { - let existing = tx - .query_row( - "SELECT 1 FROM tags WHERE name = ?1", - params![tag.name.as_str()], - |row| row.get::<_, i64>(0), - ) - .optional()?; - if existing.is_some() { - return 
Err(StoreError::DuplicateTag(tag.name.clone())); - } - let _ = tx.execute( - "INSERT INTO tags (name, description, created_at) - VALUES (?1, ?2, ?3)", +fn update_experiment_row( + transaction: &Transaction<'_>, + experiment: &ExperimentRecord, +) -> Result<(), StoreError> { + let _ = transaction.execute( + "UPDATE experiments + SET slug = ?2, archived = ?3, title = ?4, summary = ?5, tags_json = ?6, status = ?7, outcome_json = ?8, revision = ?9, updated_at = ?10 + WHERE id = ?1", params![ - tag.name.as_str(), - tag.description.as_str(), - encode_timestamp(tag.created_at)?, + experiment.id.to_string(), + experiment.slug.as_str(), + bool_to_sql(experiment.archived), + experiment.title.as_str(), + experiment.summary.as_ref().map(NonEmptyText::as_str), + encode_json(&experiment.tags)?, + experiment.status.as_str(), + experiment.outcome.as_ref().map(encode_json).transpose()?, + experiment.revision, + encode_timestamp(experiment.updated_at)?, ], )?; Ok(()) } -fn insert_annotation( - tx: &Transaction<'_>, - node_id: fidget_spinner_core::NodeId, - annotation: &NodeAnnotation, +fn replace_experiment_tags( + transaction: &Transaction<'_>, + experiment_id: ExperimentId, + tags: &BTreeSet<TagName>, ) -> Result<(), StoreError> { - let _ = tx.execute( - "INSERT INTO node_annotations (id, node_id, visibility, label, body, created_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6)", - params![ - annotation.id.to_string(), - node_id.to_string(), - encode_annotation_visibility(annotation.visibility), - annotation.label.as_ref().map(NonEmptyText::as_str), - annotation.body.as_str(), - encode_timestamp(annotation.created_at)?, - ], + let _ = transaction.execute( + "DELETE FROM experiment_tags WHERE experiment_id = ?1", + params![experiment_id.to_string()], )?; + for tag in tags { + let _ = transaction.execute( + "INSERT INTO experiment_tags (experiment_id, tag_name) VALUES (?1, ?2)", + params![experiment_id.to_string(), tag.as_str()], + )?; + } Ok(()) } -fn insert_node_tag( - tx: &Transaction<'_>, - node_id: fidget_spinner_core::NodeId, - tag: &TagName, +fn replace_influence_parents( + transaction: &Transaction<'_>, + child: VertexRef, + parents: &[VertexRef], ) -> Result<(), StoreError> { - let _ = tx.execute( - "INSERT INTO node_tags (node_id, tag_name) - VALUES (?1, ?2)", - params![node_id.to_string(), tag.as_str()], + let _ = transaction.execute( + "DELETE FROM influence_edges WHERE child_kind = ?1 AND child_id = ?2", + params![vertex_kind_name(child), child.opaque_id()], )?; - Ok(()) -} - -fn ensure_known_tags(tx: &Transaction<'_>, tags: &BTreeSet<TagName>) -> Result<(), StoreError> { - let mut statement = tx.prepare("SELECT 1 FROM tags WHERE name = ?1")?; - for tag in tags { - let exists = statement - .query_row(params![tag.as_str()], |row| row.get::<_, i64>(0)) - .optional()?; - if exists.is_none() { - return Err(StoreError::UnknownTag(tag.clone())); - } + for (ordinal, parent) in parents.iter().enumerate() { + let _ = transaction.execute( + "INSERT INTO influence_edges (parent_kind, parent_id, child_kind, child_id, ordinal) + VALUES (?1, ?2, ?3, ?4, ?5)", + params![ + vertex_kind_name(*parent), + parent.opaque_id(), + vertex_kind_name(child), + child.opaque_id(), + i64::try_from(ordinal).unwrap_or(i64::MAX), + ], + )?; } Ok(()) } -fn insert_edge(tx: &Transaction<'_>, edge: &DagEdge) -> Result<(), StoreError> { - let _ = tx.execute( - "INSERT OR IGNORE INTO node_edges (source_id, target_id, kind) - VALUES (?1, ?2, ?3)", +fn insert_artifact( + transaction: &Transaction<'_>, + artifact: &ArtifactRecord, +) -> 
Result<(), StoreError> { + let _ = transaction.execute( + "INSERT INTO artifacts (id, slug, kind, label, summary, locator, media_type, revision, created_at, updated_at) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", params![ - edge.source_id.to_string(), - edge.target_id.to_string(), - encode_edge_kind(edge.kind), + artifact.id.to_string(), + artifact.slug.as_str(), + artifact.kind.as_str(), + artifact.label.as_str(), + artifact.summary.as_ref().map(NonEmptyText::as_str), + artifact.locator.as_str(), + artifact.media_type.as_ref().map(NonEmptyText::as_str), + artifact.revision, + encode_timestamp(artifact.created_at)?, + encode_timestamp(artifact.updated_at)?, ], )?; Ok(()) } -fn insert_frontier(tx: &Transaction<'_>, frontier: &FrontierRecord) -> Result<(), StoreError> { - let _ = tx.execute( - "INSERT INTO frontiers (id, label, root_contract_node_id, status, created_at, updated_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6)", +fn update_artifact_row( + transaction: &Transaction<'_>, + artifact: &ArtifactRecord, +) -> Result<(), StoreError> { + let _ = transaction.execute( + "UPDATE artifacts + SET slug = ?2, kind = ?3, label = ?4, summary = ?5, locator = ?6, media_type = ?7, revision = ?8, updated_at = ?9 + WHERE id = ?1", params![ - frontier.id.to_string(), - frontier.label.as_str(), - frontier.root_contract_node_id.to_string(), - encode_frontier_status(frontier.status), - encode_timestamp(frontier.created_at)?, - encode_timestamp(frontier.updated_at)?, + artifact.id.to_string(), + artifact.slug.as_str(), + artifact.kind.as_str(), + artifact.label.as_str(), + artifact.summary.as_ref().map(NonEmptyText::as_str), + artifact.locator.as_str(), + artifact.media_type.as_ref().map(NonEmptyText::as_str), + artifact.revision, + encode_timestamp(artifact.updated_at)?, ], )?; Ok(()) } -fn insert_run( - tx: &Transaction<'_>, - run: &RunRecord, - benchmark_suite: Option<&str>, - primary_metric: &MetricValue, - primary_metric_definition: &MetricDefinition, - supporting_metrics: &[MetricValue], - supporting_metric_definitions: &[MetricDefinition], +fn replace_artifact_attachments( + transaction: &Transaction<'_>, + artifact_id: ArtifactId, + attachments: &[AttachmentTargetRef], ) -> Result<(), StoreError> { - let started_at = match run.started_at { - Some(timestamp) => Some(encode_timestamp(timestamp)?), - None => None, - }; - let finished_at = match run.finished_at { - Some(timestamp) => Some(encode_timestamp(timestamp)?), - None => None, - }; - let _ = tx.execute( - "INSERT INTO runs ( - run_id, - node_id, - frontier_id, - status, - backend, - benchmark_suite, - working_directory, - argv_json, - env_json, - started_at, - finished_at - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", - params![ - run.run_id.to_string(), - run.node_id.to_string(), - run.frontier_id.map(|id| id.to_string()), - encode_run_status(run.status), - encode_backend(run.backend), - benchmark_suite, - run.command.working_directory.as_str(), - encode_json(&run.command.argv)?, - encode_json(&run.command.env)?, - started_at, - finished_at, - ], + let _ = transaction.execute( + "DELETE FROM artifact_attachments WHERE artifact_id = ?1", + params![artifact_id.to_string()], )?; - - for (metric, definition) in std::iter::once((primary_metric, primary_metric_definition)).chain( - supporting_metrics - .iter() - .zip(supporting_metric_definitions.iter()), - ) { - let _ = tx.execute( - "INSERT INTO metrics (run_id, metric_key, unit, objective, value) - VALUES (?1, ?2, ?3, ?4, ?5)", + for (ordinal, attachment) in 
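+    // Ordinals restart at zero on every replace, preserving the caller-supplied
+    // attachment order for readers that sort by ordinal.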
attachments.iter().enumerate() { + let _ = transaction.execute( + "INSERT INTO artifact_attachments (artifact_id, target_kind, target_id, ordinal) + VALUES (?1, ?2, ?3, ?4)", params![ - run.run_id.to_string(), - metric.key.as_str(), - encode_metric_unit(definition.unit), - encode_optimization_objective(definition.objective), - metric.value, + artifact_id.to_string(), + attachment_target_kind_name(*attachment), + attachment.opaque_id(), + i64::try_from(ordinal).unwrap_or(i64::MAX), ], )?; } Ok(()) } -fn insert_open_experiment( - tx: &Transaction<'_>, - experiment: &OpenExperiment, -) -> Result<(), StoreError> { - let _ = tx.execute( - "INSERT INTO open_experiments ( - id, - frontier_id, - hypothesis_node_id, - title, - summary, - created_at - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", - params![ - experiment.id.to_string(), - experiment.frontier_id.to_string(), - experiment.hypothesis_node_id.to_string(), - experiment.title.as_str(), - experiment.summary.as_ref().map(NonEmptyText::as_str), - encode_timestamp(experiment.created_at)?, - ], - )?; - Ok(()) -} - -fn delete_open_experiment( - tx: &Transaction<'_>, - experiment_id: fidget_spinner_core::ExperimentId, +fn replace_experiment_dimensions( + transaction: &Transaction<'_>, + experiment_id: ExperimentId, + outcome: Option<&ExperimentOutcome>, ) -> Result<(), StoreError> { - let _ = tx.execute( - "DELETE FROM open_experiments WHERE id = ?1", + let _ = transaction.execute( + "DELETE FROM experiment_dimensions WHERE experiment_id = ?1", params![experiment_id.to_string()], )?; + if let Some(outcome) = outcome { + for (key, value) in &outcome.dimensions { + let _ = transaction.execute( + "INSERT INTO experiment_dimensions (experiment_id, key, value_json) VALUES (?1, ?2, ?3)", + params![experiment_id.to_string(), key.as_str(), encode_json(value)?], + )?; + } + } Ok(()) } -fn insert_experiment( - tx: &Transaction<'_>, - experiment: &CompletedExperiment, +fn replace_experiment_metrics( + transaction: &Transaction<'_>, + experiment_id: ExperimentId, + outcome: Option<&ExperimentOutcome>, ) -> Result<(), StoreError> { - let _ = tx.execute( - "INSERT INTO experiments ( - id, - frontier_id, - hypothesis_node_id, - run_node_id, - run_id, - analysis_node_id, - decision_node_id, - title, - summary, - benchmark_suite, - primary_metric_json, - supporting_metrics_json, - note_summary, - note_next_json, - verdict, - created_at - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)", - params![ - experiment.id.to_string(), - experiment.frontier_id.to_string(), - experiment.hypothesis_node_id.to_string(), - experiment.run_node_id.to_string(), - experiment.run_id.to_string(), - experiment.analysis_node_id.map(|id| id.to_string()), - experiment.decision_node_id.to_string(), - experiment.title.as_str(), - experiment.summary.as_ref().map(NonEmptyText::as_str), - benchmark_suite_label(&experiment.result.dimensions), - encode_json(&experiment.result.primary_metric)?, - encode_json(&experiment.result.supporting_metrics)?, - experiment.note.summary.as_str(), - encode_json(&experiment.note.next_hypotheses)?, - encode_frontier_verdict(experiment.verdict), - encode_timestamp(experiment.created_at)?, - ], + let _ = transaction.execute( + "DELETE FROM experiment_metrics WHERE experiment_id = ?1", + params![experiment_id.to_string()], )?; + if let Some(outcome) = outcome { + for (ordinal, metric) in all_metrics(outcome).into_iter().enumerate() { + let _ = transaction.execute( + "INSERT INTO experiment_metrics (experiment_id, key, ordinal, is_primary, 
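+             -- is_primary flags ordinal 0: all_metrics is assumed to yield the
+             -- primary metric first, followed by supporting metrics in order.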
value) + VALUES (?1, ?2, ?3, ?4, ?5)", + params![ + experiment_id.to_string(), + metric.key.as_str(), + i64::try_from(ordinal).unwrap_or(i64::MAX), + bool_to_sql(ordinal == 0), + metric.value, + ], + )?; + } + } Ok(()) } -fn insert_event( - tx: &Transaction<'_>, +fn record_event( + transaction: &Transaction<'_>, entity_kind: &str, entity_id: &str, + revision: u64, event_kind: &str, - payload: Value, + snapshot: &impl Serialize, ) -> Result<(), StoreError> { - let _ = tx.execute( - "INSERT INTO events (entity_kind, entity_id, event_kind, payload_json, created_at) - VALUES (?1, ?2, ?3, ?4, ?5)", + let _ = transaction.execute( + "INSERT INTO events (entity_kind, entity_id, revision, event_kind, occurred_at, snapshot_json) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", params![ entity_kind, entity_id, + revision, event_kind, - payload.to_string(), encode_timestamp(OffsetDateTime::now_utc())?, + encode_json(snapshot)?, ], )?; Ok(()) } -fn load_open_experiment( - connection: &Connection, - experiment_id: fidget_spinner_core::ExperimentId, -) -> Result<Option<OpenExperiment>, StoreError> { - let mut statement = connection.prepare( - "SELECT - id, - frontier_id, - hypothesis_node_id, - title, - summary, - created_at - FROM open_experiments - WHERE id = ?1", - )?; - statement - .query_row(params![experiment_id.to_string()], |row| { - Ok(OpenExperiment { - id: parse_experiment_id(&row.get::<_, String>(0)?) - .map_err(to_sql_conversion_error)?, - frontier_id: parse_frontier_id(&row.get::<_, String>(1)?) - .map_err(to_sql_conversion_error)?, - hypothesis_node_id: parse_node_id(&row.get::<_, String>(2)?) - .map_err(to_sql_conversion_error)?, - title: NonEmptyText::new(row.get::<_, String>(3)?) - .map_err(core_to_sql_conversion_error)?, - summary: row - .get::<_, Option<String>>(4)? - .map(NonEmptyText::new) - .transpose() - .map_err(core_to_sql_conversion_error)?, - created_at: decode_timestamp(&row.get::<_, String>(5)?) 
- .map_err(to_sql_conversion_error)?, - }) - }) - .optional() - .map_err(StoreError::from) -} - -fn summarize_open_experiment(experiment: &OpenExperiment) -> OpenExperimentSummary { - OpenExperimentSummary { - id: experiment.id, - frontier_id: experiment.frontier_id, - hypothesis_node_id: experiment.hypothesis_node_id, - title: experiment.title.clone(), - summary: experiment.summary.clone(), - created_at: experiment.created_at, - } -} - -fn touch_frontier( - tx: &Transaction<'_>, - frontier_id: fidget_spinner_core::FrontierId, -) -> Result<(), StoreError> { - let _ = tx.execute( - "UPDATE frontiers SET updated_at = ?1 WHERE id = ?2", - params![ - encode_timestamp(OffsetDateTime::now_utc())?, - frontier_id.to_string() - ], - )?; - Ok(()) +fn decode_frontier_row(row: &rusqlite::Row<'_>) -> Result<FrontierRecord, rusqlite::Error> { + Ok(FrontierRecord { + id: FrontierId::from_uuid(parse_uuid_sql(&row.get::<_, String>(0)?)?), + slug: parse_slug(&row.get::<_, String>(1)?)?, + label: parse_non_empty_text(&row.get::<_, String>(2)?)?, + objective: parse_non_empty_text(&row.get::<_, String>(3)?)?, + status: parse_frontier_status(&row.get::<_, String>(4)?)?, + brief: decode_json(&row.get::<_, String>(5)?).map_err(to_sql_conversion_error)?, + revision: row.get(6)?, + created_at: parse_timestamp_sql(&row.get::<_, String>(7)?)?, + updated_at: parse_timestamp_sql(&row.get::<_, String>(8)?)?, + }) } -fn read_node_row(row: &rusqlite::Row<'_>) -> Result<DagNode, rusqlite::Error> { - let payload_json = row.get::<_, String>(9)?; - let diagnostics_json = row.get::<_, String>(10)?; - let payload = decode_json::<NodePayload>(&payload_json).map_err(to_sql_conversion_error)?; - let diagnostics = - decode_json::<NodeDiagnostics>(&diagnostics_json).map_err(to_sql_conversion_error)?; - Ok(DagNode { - id: parse_node_id(&row.get::<_, String>(0)?).map_err(to_sql_conversion_error)?, - class: parse_node_class(&row.get::<_, String>(1)?).map_err(to_sql_conversion_error)?, - track: parse_node_track(&row.get::<_, String>(2)?).map_err(to_sql_conversion_error)?, - frontier_id: row - .get::<_, Option<String>>(3)? - .map(|raw| parse_frontier_id(&raw)) - .transpose() - .map_err(to_sql_conversion_error)?, +fn decode_experiment_row(row: &rusqlite::Row<'_>) -> Result<ExperimentRecord, rusqlite::Error> { + Ok(ExperimentRecord { + id: ExperimentId::from_uuid(parse_uuid_sql(&row.get::<_, String>(0)?)?), + slug: parse_slug(&row.get::<_, String>(1)?)?, + frontier_id: FrontierId::from_uuid(parse_uuid_sql(&row.get::<_, String>(2)?)?), + hypothesis_id: HypothesisId::from_uuid(parse_uuid_sql(&row.get::<_, String>(3)?)?), archived: row.get::<_, i64>(4)? != 0, - title: NonEmptyText::new(row.get::<_, String>(5)?).map_err(core_to_sql_conversion_error)?, - summary: row - .get::<_, Option<String>>(6)? - .map(NonEmptyText::new) - .transpose() - .map_err(core_to_sql_conversion_error)?, - tags: BTreeSet::new(), - payload, - annotations: Vec::new(), - diagnostics, - agent_session_id: row - .get::<_, Option<String>>(11)? - .map(|raw| parse_agent_session_id(&raw)) - .transpose() - .map_err(to_sql_conversion_error)?, - created_at: decode_timestamp(&row.get::<_, String>(12)?) - .map_err(to_sql_conversion_error)?, - updated_at: decode_timestamp(&row.get::<_, String>(13)?) 
- .map_err(to_sql_conversion_error)?, + title: parse_non_empty_text(&row.get::<_, String>(5)?)?, + summary: parse_optional_non_empty_text(row.get::<_, Option<String>>(6)?)?, + tags: decode_json(&row.get::<_, String>(7)?).map_err(to_sql_conversion_error)?, + status: parse_experiment_status(&row.get::<_, String>(8)?)?, + outcome: row + .get::<_, Option<String>>(9)? + .map(|raw| decode_json(&raw).map_err(to_sql_conversion_error)) + .transpose()?, + revision: row.get(10)?, + created_at: parse_timestamp_sql(&row.get::<_, String>(11)?)?, + updated_at: parse_timestamp_sql(&row.get::<_, String>(12)?)?, }) } -fn read_frontier_row(row: &rusqlite::Row<'_>) -> Result<FrontierRecord, StoreError> { - Ok(FrontierRecord { - id: parse_frontier_id(&row.get::<_, String>(0)?)?, - label: NonEmptyText::new(row.get::<_, String>(1)?)?, - root_contract_node_id: parse_node_id(&row.get::<_, String>(2)?)?, - status: parse_frontier_status(&row.get::<_, String>(3)?)?, - created_at: decode_timestamp(&row.get::<_, String>(4)?)?, - updated_at: decode_timestamp(&row.get::<_, String>(5)?)?, +fn decode_artifact_row(row: &rusqlite::Row<'_>) -> Result<ArtifactRecord, rusqlite::Error> { + Ok(ArtifactRecord { + id: ArtifactId::from_uuid(parse_uuid_sql(&row.get::<_, String>(0)?)?), + slug: parse_slug(&row.get::<_, String>(1)?)?, + kind: parse_artifact_kind(&row.get::<_, String>(2)?)?, + label: parse_non_empty_text(&row.get::<_, String>(3)?)?, + summary: parse_optional_non_empty_text(row.get::<_, Option<String>>(4)?)?, + locator: parse_non_empty_text(&row.get::<_, String>(5)?)?, + media_type: parse_optional_non_empty_text(row.get::<_, Option<String>>(6)?)?, + revision: row.get(7)?, + created_at: parse_timestamp_sql(&row.get::<_, String>(8)?)?, + updated_at: parse_timestamp_sql(&row.get::<_, String>(9)?)?, }) } -fn frontier_contract_payload(contract: &FrontierContract) -> Result<JsonObject, StoreError> { - json_object(json!({ - "objective": contract.objective.as_str(), - "benchmark_suites": contract - .evaluation - .benchmark_suites - .iter() - .map(NonEmptyText::as_str) - .collect::<Vec<_>>(), - "primary_metric": metric_spec_json(&contract.evaluation.primary_metric), - "supporting_metrics": contract - .evaluation - .supporting_metrics - .iter() - .map(metric_spec_json) - .collect::<Vec<_>>(), - "promotion_criteria": contract - .promotion_criteria - .iter() - .map(NonEmptyText::as_str) - .collect::<Vec<_>>(), - })) +fn decode_metric_definition_row( + row: &rusqlite::Row<'_>, +) -> Result<MetricDefinition, rusqlite::Error> { + Ok(MetricDefinition { + key: parse_non_empty_text(&row.get::<_, String>(0)?)?, + unit: parse_metric_unit(&row.get::<_, String>(1)?)?, + objective: parse_optimization_objective(&row.get::<_, String>(2)?)?, + visibility: parse_metric_visibility(&row.get::<_, String>(3)?)?, + description: parse_optional_non_empty_text(row.get::<_, Option<String>>(4)?)?, + created_at: parse_timestamp_sql(&row.get::<_, String>(5)?)?, + updated_at: parse_timestamp_sql(&row.get::<_, String>(6)?)?, + }) } -fn metric_spec_json(metric: &MetricSpec) -> Value { - json!({ - "metric_key": metric.metric_key.as_str(), - "unit": encode_metric_unit(metric.unit), - "objective": encode_optimization_objective(metric.objective), +fn decode_run_dimension_definition_row( + row: &rusqlite::Row<'_>, +) -> Result<RunDimensionDefinition, rusqlite::Error> { + Ok(RunDimensionDefinition { + key: parse_non_empty_text(&row.get::<_, String>(0)?)?, + value_type: parse_field_value_type(&row.get::<_, String>(1)?)?, + description: 
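+        // NULL columns decode to None; malformed values are assumed to surface as
+        // FromSqlConversionFailure via the parse_* helpers rather than panicking.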
parse_optional_non_empty_text(row.get::<_, Option<String>>(2)?)?,
+        created_at: parse_timestamp_sql(&row.get::<_, String>(3)?)?,
+        updated_at: parse_timestamp_sql(&row.get::<_, String>(4)?)?,
     })
 }
 
-fn json_object(value: Value) -> Result<JsonObject, StoreError> {
-    match value {
-        Value::Object(map) => Ok(map),
-        other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
-            io::ErrorKind::InvalidInput,
-            format!("expected JSON object, got {other:?}"),
-        )))),
+fn enforce_revision(
+    kind: &'static str,
+    selector: &str,
+    expected: Option<u64>,
+    observed: u64,
+) -> Result<(), StoreError> {
+    if let Some(expected) = expected
+        && expected != observed
+    {
+        return Err(StoreError::RevisionMismatch {
+            kind,
+            selector: selector.to_owned(),
+            expected,
+            observed,
+        });
     }
-}
-
-fn write_json_file<T: Serialize>(path: &Utf8Path, value: &T) -> Result<(), StoreError> {
-    let serialized = serde_json::to_string_pretty(value)?;
-    fs::write(path.as_std_path(), serialized)?;
     Ok(())
 }
 
-fn read_json_file<T: for<'de> Deserialize<'de>>(path: &Utf8Path) -> Result<T, StoreError> {
-    let bytes = fs::read(path.as_std_path())?;
-    serde_json::from_slice(&bytes).map_err(StoreError::from)
-}
-
-fn encode_json<T: Serialize>(value: &T) -> Result<String, StoreError> {
-    serde_json::to_string(value).map_err(StoreError::from)
-}
-
-fn decode_json<T: for<'de> Deserialize<'de>>(raw: &str) -> Result<T, StoreError> {
-    serde_json::from_str(raw).map_err(StoreError::from)
-}
-
-fn encode_timestamp(timestamp: OffsetDateTime) -> Result<String, StoreError> {
-    timestamp.format(&Rfc3339).map_err(StoreError::from)
-}
-
-fn decode_timestamp(raw: &str) -> Result<OffsetDateTime, StoreError> {
-    OffsetDateTime::parse(raw, &Rfc3339).map_err(StoreError::from)
-}
-
-fn state_root(project_root: &Utf8Path) -> Utf8PathBuf {
-    project_root.join(STORE_DIR_NAME)
-}
-
-#[must_use]
-pub fn discover_project_root(path: impl AsRef<Utf8Path>) -> Option<Utf8PathBuf> {
-    let mut cursor = discovery_start(path.as_ref());
-    loop {
-        if state_root(&cursor).exists() {
-            return Some(cursor);
-        }
-        let parent = cursor.parent()?;
-        cursor = parent.to_path_buf();
-    }
-}
-
-fn discovery_start(path: &Utf8Path) -> Utf8PathBuf {
-    match fs::metadata(path.as_std_path()) {
-        Ok(metadata) if metadata.is_file() => path
-            .parent()
-            .map_or_else(|| path.to_path_buf(), Utf8Path::to_path_buf),
-        _ => path.to_path_buf(),
+fn validate_hypothesis_body(body: &NonEmptyText) -> Result<(), StoreError> {
+    let raw = body.as_str().trim();
+    // A body must read as one prose paragraph: interior blank lines (including
+    // CRLF or whitespace-only ones) and Markdown list/heading markers are all
+    // rejected. `lines()` normalizes `\r\n`, which a bare contains("\n\n")
+    // check would miss.
+    if raw.lines().any(|line| line.trim().is_empty())
+        || raw.lines().any(|line| {
+            let trimmed = line.trim_start();
+            trimmed.starts_with('-') || trimmed.starts_with('*') || trimmed.starts_with('#')
+        })
+    {
+        return Err(StoreError::HypothesisBodyMustBeSingleParagraph);
     }
+    Ok(())
 }
 
-fn to_sql_conversion_error(error: StoreError) -> rusqlite::Error {
-    rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(error))
-}
-
-fn core_to_sql_conversion_error(error: fidget_spinner_core::CoreError) -> rusqlite::Error {
-    to_sql_conversion_error(StoreError::from(error))
-}
-
-fn parse_uuid(raw: &str) -> Result<Uuid, StoreError> {
-    Uuid::parse_str(raw).map_err(StoreError::from)
-}
-
-fn parse_node_id(raw: &str) -> Result<fidget_spinner_core::NodeId, StoreError> {
-    Ok(fidget_spinner_core::NodeId::from_uuid(parse_uuid(raw)?))
-}
-
-fn parse_frontier_id(raw: &str) -> Result<fidget_spinner_core::FrontierId, StoreError> {
-    Ok(fidget_spinner_core::FrontierId::from_uuid(parse_uuid(raw)?))
-}
-
-fn parse_experiment_id(raw: &str) -> 
Result<fidget_spinner_core::ExperimentId, StoreError> { - Ok(fidget_spinner_core::ExperimentId::from_uuid(parse_uuid( - raw, - )?)) -} - -fn parse_run_id(raw: &str) -> Result<fidget_spinner_core::RunId, StoreError> { - Ok(fidget_spinner_core::RunId::from_uuid(parse_uuid(raw)?)) -} - -fn parse_agent_session_id(raw: &str) -> Result<fidget_spinner_core::AgentSessionId, StoreError> { - Ok(fidget_spinner_core::AgentSessionId::from_uuid(parse_uuid( - raw, - )?)) -} - -fn parse_annotation_id(raw: &str) -> Result<fidget_spinner_core::AnnotationId, StoreError> { - Ok(fidget_spinner_core::AnnotationId::from_uuid(parse_uuid( - raw, - )?)) -} - -fn parse_node_class(raw: &str) -> Result<NodeClass, StoreError> { +fn parse_frontier_status(raw: &str) -> Result<FrontierStatus, rusqlite::Error> { match raw { - "contract" => Ok(NodeClass::Contract), - "hypothesis" => Ok(NodeClass::Hypothesis), - "run" => Ok(NodeClass::Run), - "analysis" => Ok(NodeClass::Analysis), - "decision" => Ok(NodeClass::Decision), - "source" => Ok(NodeClass::Source), - "note" => Ok(NodeClass::Note), - other => Err(StoreError::Json(serde_json::Error::io(io::Error::new( - io::ErrorKind::InvalidData, - format!("unknown node class `{other}`"), - )))), - } -} - -fn encode_node_track(track: fidget_spinner_core::NodeTrack) -> &'static str { - match track { - fidget_spinner_core::NodeTrack::CorePath => "core-path", - fidget_spinner_core::NodeTrack::OffPath => "off-path", + "exploring" => Ok(FrontierStatus::Exploring), + "paused" => Ok(FrontierStatus::Paused), + "archived" => Ok(FrontierStatus::Archived), + _ => Err(to_sql_conversion_error(StoreError::Json( + serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid frontier status `{raw}`"), + )), + ))), } } -fn parse_node_track(raw: &str) -> Result<fidget_spinner_core::NodeTrack, StoreError> { +fn parse_metric_unit(raw: &str) -> Result<MetricUnit, rusqlite::Error> { match raw { - "core-path" => Ok(fidget_spinner_core::NodeTrack::CorePath), - "off-path" => Ok(fidget_spinner_core::NodeTrack::OffPath), - other => Err(StoreError::Json(serde_json::Error::io(io::Error::new( - io::ErrorKind::InvalidData, - format!("unknown node track `{other}`"), - )))), - } -} - -fn encode_annotation_visibility(visibility: AnnotationVisibility) -> &'static str { - match visibility { - AnnotationVisibility::HiddenByDefault => "hidden", - AnnotationVisibility::Visible => "visible", + "seconds" => Ok(MetricUnit::Seconds), + "bytes" => Ok(MetricUnit::Bytes), + "count" => Ok(MetricUnit::Count), + "ratio" => Ok(MetricUnit::Ratio), + "custom" => Ok(MetricUnit::Custom), + _ => Err(to_sql_conversion_error(StoreError::Json( + serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid metric unit `{raw}`"), + )), + ))), } } -fn parse_annotation_visibility(raw: &str) -> Result<AnnotationVisibility, StoreError> { +fn parse_optimization_objective(raw: &str) -> Result<OptimizationObjective, rusqlite::Error> { match raw { - "hidden" => Ok(AnnotationVisibility::HiddenByDefault), - "visible" => Ok(AnnotationVisibility::Visible), - other => Err(StoreError::Json(serde_json::Error::io(io::Error::new( - io::ErrorKind::InvalidData, - format!("unknown annotation visibility `{other}`"), - )))), + "minimize" => Ok(OptimizationObjective::Minimize), + "maximize" => Ok(OptimizationObjective::Maximize), + "target" => Ok(OptimizationObjective::Target), + _ => Err(to_sql_conversion_error(StoreError::Json( + serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + 
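+                    // Only reachable when the stored text was not produced by the
+                    // crate's own encoding (assumed to be the enum's as_str form).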
format!("invalid objective `{raw}`"), + )), + ))), } } -fn encode_edge_kind(kind: EdgeKind) -> &'static str { - match kind { - EdgeKind::Lineage => "lineage", - EdgeKind::Evidence => "evidence", - EdgeKind::Comparison => "comparison", - EdgeKind::Supersedes => "supersedes", - EdgeKind::Annotation => "annotation", +fn parse_metric_visibility(raw: &str) -> Result<MetricVisibility, rusqlite::Error> { + match raw { + "canonical" => Ok(MetricVisibility::Canonical), + "minor" => Ok(MetricVisibility::Minor), + "hidden" => Ok(MetricVisibility::Hidden), + "archived" => Ok(MetricVisibility::Archived), + _ => Err(to_sql_conversion_error(StoreError::Json( + serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid metric visibility `{raw}`"), + )), + ))), } } -fn encode_frontier_status(status: FrontierStatus) -> &'static str { - match status { - FrontierStatus::Exploring => "exploring", - FrontierStatus::Paused => "paused", - FrontierStatus::Saturated => "saturated", - FrontierStatus::Archived => "archived", +fn parse_field_value_type(raw: &str) -> Result<FieldValueType, rusqlite::Error> { + match raw { + "string" => Ok(FieldValueType::String), + "numeric" => Ok(FieldValueType::Numeric), + "boolean" => Ok(FieldValueType::Boolean), + "timestamp" => Ok(FieldValueType::Timestamp), + _ => Err(to_sql_conversion_error(StoreError::Json( + serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid field type `{raw}`"), + )), + ))), } } -fn parse_frontier_status(raw: &str) -> Result<FrontierStatus, StoreError> { +fn parse_experiment_status(raw: &str) -> Result<ExperimentStatus, rusqlite::Error> { match raw { - "exploring" => Ok(FrontierStatus::Exploring), - "paused" => Ok(FrontierStatus::Paused), - "saturated" => Ok(FrontierStatus::Saturated), - "archived" => Ok(FrontierStatus::Archived), - other => Err(StoreError::Json(serde_json::Error::io(io::Error::new( - io::ErrorKind::InvalidData, - format!("unknown frontier status `{other}`"), - )))), + "open" => Ok(ExperimentStatus::Open), + "closed" => Ok(ExperimentStatus::Closed), + _ => Err(to_sql_conversion_error(StoreError::Json( + serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid experiment status `{raw}`"), + )), + ))), } } -fn encode_run_status(status: RunStatus) -> &'static str { - match status { - RunStatus::Queued => "queued", - RunStatus::Running => "running", - RunStatus::Succeeded => "succeeded", - RunStatus::Failed => "failed", - RunStatus::Cancelled => "cancelled", +fn parse_artifact_kind(raw: &str) -> Result<ArtifactKind, rusqlite::Error> { + match raw { + "document" => Ok(ArtifactKind::Document), + "link" => Ok(ArtifactKind::Link), + "log" => Ok(ArtifactKind::Log), + "table" => Ok(ArtifactKind::Table), + "plot" => Ok(ArtifactKind::Plot), + "dump" => Ok(ArtifactKind::Dump), + "binary" => Ok(ArtifactKind::Binary), + "other" => Ok(ArtifactKind::Other), + _ => Err(to_sql_conversion_error(StoreError::Json( + serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid artifact kind `{raw}`"), + )), + ))), } } -fn encode_backend(backend: ExecutionBackend) -> &'static str { - match backend { - ExecutionBackend::LocalProcess => "local-process", - ExecutionBackend::WorktreeProcess => "worktree-process", - ExecutionBackend::SshProcess => "ssh-process", +fn resolve_selector(raw: &str) -> Result<Selector, StoreError> { + if let Ok(uuid) = Uuid::parse_str(raw) { + Ok(Selector::Id(uuid)) + } else { + Ok(Selector::Slug(Slug::new(raw.to_owned())?)) 
+fn resolve_selector(raw: &str) -> Result<Selector, StoreError> {
+    if let Ok(uuid) = Uuid::parse_str(raw) {
+        Ok(Selector::Id(uuid))
+    } else {
+        Ok(Selector::Slug(Slug::new(raw.to_owned())?))
     }
 }
 
-fn encode_field_value_type(value_type: FieldValueType) -> &'static str {
-    value_type.as_str()
+enum Selector {
+    Id(Uuid),
+    Slug(Slug),
 }
 
-fn decode_field_value_type(raw: &str) -> Result<FieldValueType, StoreError> {
-    match raw {
-        "string" => Ok(FieldValueType::String),
-        "numeric" => Ok(FieldValueType::Numeric),
-        "boolean" => Ok(FieldValueType::Boolean),
-        "timestamp" => Ok(FieldValueType::Timestamp),
-        other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
-            io::ErrorKind::InvalidData,
-            format!("unknown field value type `{other}`"),
-        )))),
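+/// Lowercases the input, keeps ASCII alphanumerics, collapses runs of
+/// ` -_/:` into single hyphens, and drops everything else; empty results
+/// fall back to "untitled". E.g. "Frontier: Cache/Locality" slugifies to
+/// "frontier-cache-locality".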
+fn slugify(raw: &str) -> Result<Slug, CoreError> {
+    let mut slug = String::with_capacity(raw.len());
+    let mut last_was_separator = true;
+    for character in raw.chars().flat_map(char::to_lowercase) {
+        if character.is_ascii_alphanumeric() {
+            slug.push(character);
+            last_was_separator = false;
+            continue;
+        }
+        if matches!(character, ' ' | '-' | '_' | '/' | ':') && !last_was_separator {
+            slug.push('-');
+            last_was_separator = true;
+        }
     }
-}
-
-fn encode_metric_unit(unit: MetricUnit) -> &'static str {
-    match unit {
-        MetricUnit::Seconds => "seconds",
-        MetricUnit::Bytes => "bytes",
-        MetricUnit::Count => "count",
-        MetricUnit::Ratio => "ratio",
-        MetricUnit::Custom => "custom",
+    if slug.ends_with('-') {
+        let _ = slug.pop();
     }
-}
-
-fn decode_metric_unit(raw: &str) -> Result<MetricUnit, StoreError> {
-    match raw {
-        "seconds" => Ok(MetricUnit::Seconds),
-        "bytes" => Ok(MetricUnit::Bytes),
-        "count" => Ok(MetricUnit::Count),
-        "ratio" => Ok(MetricUnit::Ratio),
-        "custom" => Ok(MetricUnit::Custom),
-        other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
-            io::ErrorKind::InvalidData,
-            format!("unknown metric unit `{other}`"),
-        )))),
+    if slug.is_empty() {
+        slug.push_str("untitled");
     }
+    Slug::new(slug)
 }
 
-fn encode_optimization_objective(objective: OptimizationObjective) -> &'static str {
-    match objective {
-        OptimizationObjective::Minimize => "minimize",
-        OptimizationObjective::Maximize => "maximize",
-        OptimizationObjective::Target => "target",
+fn vertex_kind_name(vertex: VertexRef) -> &'static str {
+    match vertex {
+        VertexRef::Hypothesis(_) => "hypothesis",
+        VertexRef::Experiment(_) => "experiment",
     }
 }
 
-fn decode_optimization_objective(raw: &str) -> Result<OptimizationObjective, StoreError> {
-    match raw {
-        "minimize" => Ok(OptimizationObjective::Minimize),
-        "maximize" => Ok(OptimizationObjective::Maximize),
-        "target" => Ok(OptimizationObjective::Target),
-        other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
-            io::ErrorKind::InvalidData,
-            format!("unknown optimization objective `{other}`"),
-        )))),
+fn attachment_target_kind_name(target: AttachmentTargetRef) -> &'static str {
+    match target {
+        AttachmentTargetRef::Frontier(_) => "frontier",
+        AttachmentTargetRef::Hypothesis(_) => "hypothesis",
+        AttachmentTargetRef::Experiment(_) => "experiment",
     }
 }
 
-fn encode_frontier_verdict(verdict: FrontierVerdict) -> &'static str {
-    match verdict {
-        FrontierVerdict::Accepted => "accepted",
-        FrontierVerdict::Kept => "kept",
-        FrontierVerdict::Parked => "parked",
-        FrontierVerdict::Rejected => "rejected",
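+/// Rehydrates a `VertexRef` from its stored (kind, uuid) column pair; an
+/// unknown kind string surfaces as a SQL conversion failure.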
+fn decode_vertex_ref(kind: &str, raw_id: &str) -> Result<VertexRef, rusqlite::Error> {
+    let uuid = parse_uuid_sql(raw_id)?;
+    match kind {
+        "hypothesis" => Ok(VertexRef::Hypothesis(HypothesisId::from_uuid(uuid))),
+        "experiment" => Ok(VertexRef::Experiment(ExperimentId::from_uuid(uuid))),
+        _ => Err(to_sql_conversion_error(StoreError::Json(
+            serde_json::Error::io(io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!("invalid vertex kind `{kind}`"),
+            )),
+        ))),
     }
 }
 
-fn parse_frontier_verdict(raw: &str) -> Result<FrontierVerdict, StoreError> {
-    match raw {
-        "accepted" => Ok(FrontierVerdict::Accepted),
-        "kept" => Ok(FrontierVerdict::Kept),
-        "parked" => Ok(FrontierVerdict::Parked),
-        "rejected" => Ok(FrontierVerdict::Rejected),
-        other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
-            io::ErrorKind::InvalidData,
-            format!("unknown frontier verdict `{other}`"),
-        )))),
+fn decode_attachment_target(
+    kind: &str,
+    raw_id: &str,
+) -> Result<AttachmentTargetRef, rusqlite::Error> {
+    let uuid = parse_uuid_sql(raw_id)?;
+    match kind {
+        "frontier" => Ok(AttachmentTargetRef::Frontier(FrontierId::from_uuid(uuid))),
+        "hypothesis" => Ok(AttachmentTargetRef::Hypothesis(HypothesisId::from_uuid(
+            uuid,
+        ))),
+        "experiment" => Ok(AttachmentTargetRef::Experiment(ExperimentId::from_uuid(
+            uuid,
+        ))),
+        _ => Err(to_sql_conversion_error(StoreError::Json(
+            serde_json::Error::io(io::Error::new(
+                io::ErrorKind::InvalidData,
+                format!("invalid attachment target kind `{kind}`"),
+            )),
+        ))),
     }
 }
 
-type RunDimensionColumns = (Option<String>, Option<f64>, Option<i64>, Option<String>);
-
-fn encode_run_dimension_columns(
-    value: &RunDimensionValue,
-) -> Result<RunDimensionColumns, StoreError> {
-    match value {
-        RunDimensionValue::String(item) => Ok((Some(item.to_string()), None, None, None)),
-        RunDimensionValue::Numeric(item) => Ok((None, Some(*item), None, None)),
-        RunDimensionValue::Boolean(item) => Ok((None, None, Some(i64::from(*item)), None)),
-        RunDimensionValue::Timestamp(item) => {
-            let _ = OffsetDateTime::parse(item.as_str(), &Rfc3339)?;
-            Ok((None, None, None, Some(item.to_string())))
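+/// Unions the tags of every active hypothesis, of each hypothesis's open
+/// experiments, and of the free-standing open experiments; the `BTreeSet`
+/// de-duplicates and yields a sorted list.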
NonEmptyText::new("feature sketch")?, - summary: Some(NonEmptyText::new("research note")?), - tags: None, - payload: NodePayload::with_schema( - store.schema().schema_ref(), - super::json_object(json!({"body": "freeform"}))?, - ), - annotations: vec![NodeAnnotation::hidden(NonEmptyText::new( - "private scratch", - )?)], - attachments: Vec::new(), - })?; - let loaded = store - .get_node(node.id)? - .ok_or(super::StoreError::NodeNotFound(node.id))?; - - assert_eq!(loaded.annotations.len(), 1); - assert_eq!( - loaded.annotations[0].visibility, - fidget_spinner_core::AnnotationVisibility::HiddenByDefault - ); - Ok(()) - } +fn count_rows(connection: &Connection, table: &str) -> Result<u64, StoreError> { + let sql = format!("SELECT COUNT(*) FROM {table}"); + connection + .query_row(&sql, [], |row| row.get::<_, u64>(0)) + .map_err(StoreError::from) +} - #[test] - fn frontier_projection_tracks_experiment_counts() -> Result<(), super::StoreError> { - let root = temp_project_root("frontier"); - let mut store = ProjectStore::init( - &root, - NonEmptyText::new("test project")?, - NonEmptyText::new("local.test")?, - )?; - let projection = store.create_frontier(CreateFrontierRequest { - label: NonEmptyText::new("optimization frontier")?, - contract_title: NonEmptyText::new("contract root")?, - contract_summary: None, - contract: FrontierContract { - objective: NonEmptyText::new("improve wall time")?, - evaluation: EvaluationProtocol { - benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]), - primary_metric: MetricSpec { - metric_key: NonEmptyText::new("wall_clock_s")?, - unit: MetricUnit::Seconds, - objective: OptimizationObjective::Minimize, - }, - supporting_metrics: BTreeSet::new(), - }, - promotion_criteria: vec![NonEmptyText::new("strict speedup")?], - }, - })?; +fn count_rows_where( + connection: &Connection, + table: &str, + predicate: &str, +) -> Result<u64, StoreError> { + let sql = format!("SELECT COUNT(*) FROM {table} WHERE {predicate}"); + connection + .query_row(&sql, [], |row| row.get::<_, u64>(0)) + .map_err(StoreError::from) +} - assert_eq!(projection.open_experiment_count, 0); - assert_eq!(projection.completed_experiment_count, 0); - assert_eq!(projection.verdict_counts.accepted, 0); - assert_eq!(projection.verdict_counts.kept, 0); - assert_eq!(projection.verdict_counts.parked, 0); - assert_eq!(projection.verdict_counts.rejected, 0); - Ok(()) +fn apply_limit<T>(items: Vec<T>, limit: Option<u32>) -> Vec<T> { + if let Some(limit) = limit { + items.into_iter().take(limit as usize).collect() + } else { + items } +} - #[test] - fn list_nodes_hides_archived_by_default() -> Result<(), super::StoreError> { - let root = temp_project_root("archive"); - let mut store = ProjectStore::init( - &root, - NonEmptyText::new("test project")?, - NonEmptyText::new("local.test")?, - )?; - let node = store.add_node(CreateNodeRequest { - class: NodeClass::Note, - frontier_id: None, - title: NonEmptyText::new("quick note")?, - summary: Some(NonEmptyText::new("quick note summary")?), - tags: Some(BTreeSet::new()), - payload: NodePayload::with_schema( - store.schema().schema_ref(), - super::json_object(json!({"body": "hello"}))?, - ), - annotations: Vec::new(), - attachments: Vec::new(), - })?; - store.archive_node(node.id)?; - - let visible = store.list_nodes(ListNodesQuery::default())?; - let hidden = store.list_nodes(ListNodesQuery { - include_archived: true, - ..ListNodesQuery::default() - })?; - - assert!(visible.is_empty()); - assert_eq!(hidden.len(), 1); - Ok(()) +fn 
+fn dimension_subset_matches(
+    expected: &BTreeMap<NonEmptyText, RunDimensionValue>,
+    observed: &BTreeMap<NonEmptyText, RunDimensionValue>,
+) -> bool {
+    expected.iter().all(|(key, value)| {
+        observed
+            .get(key)
+            .is_some_and(|candidate| candidate == value)
+    })
 }
 
-fn value_kind_name(value: &Value) -> &'static str {
-    match value {
-        Value::Null => "null",
-        Value::Bool(_) => "boolean",
-        Value::Number(_) => "numeric",
-        Value::String(_) => "string",
-        Value::Array(_) => "array",
-        Value::Object(_) => "object",
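+/// Ranks two metric readings; incomparable values (NaN) compare as equal,
+/// and `Desc` simply reverses the ascending order.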
Some(NonEmptyText::new("tagged note summary")?), - tags: Some(BTreeSet::from([cuts.name.clone(), heuristics.name.clone()])), - payload: NodePayload::with_schema( - store.schema().schema_ref(), - super::json_object(json!({"body": "tagged"}))?, - ), - annotations: Vec::new(), - attachments: Vec::new(), - })?; +fn decode_timestamp(raw: &str) -> Result<OffsetDateTime, time::error::Parse> { + OffsetDateTime::parse(raw, &Rfc3339) +} - let loaded = store - .get_node(note.id)? - .ok_or(super::StoreError::NodeNotFound(note.id))?; - assert_eq!(loaded.tags.len(), 2); +fn state_root(project_root: &Utf8Path) -> Utf8PathBuf { + project_root.join(STORE_DIR_NAME) +} - let filtered = store.list_nodes(ListNodesQuery { - tags: BTreeSet::from([cuts.name]), - ..ListNodesQuery::default() - })?; - assert_eq!(filtered.len(), 1); - assert_eq!(filtered[0].tags.len(), 2); - Ok(()) +#[must_use] +pub fn discover_project_root(path: impl AsRef<Utf8Path>) -> Option<Utf8PathBuf> { + let mut cursor = discovery_start(path.as_ref()); + loop { + if state_root(&cursor).exists() { + return Some(cursor); + } + let parent = cursor.parent()?; + cursor = parent.to_path_buf(); } +} - #[test] - fn prose_nodes_require_summary_and_body() -> Result<(), super::StoreError> { - let root = temp_project_root("prose-summary"); - let mut store = ProjectStore::init( - &root, - NonEmptyText::new("test project")?, - NonEmptyText::new("local.test")?, - )?; - - let missing_summary = store.add_node(CreateNodeRequest { - class: NodeClass::Source, - frontier_id: None, - title: NonEmptyText::new("research note")?, - summary: None, - tags: None, - payload: NodePayload::with_schema( - store.schema().schema_ref(), - super::json_object(json!({"body": "research body"}))?, - ), - annotations: Vec::new(), - attachments: Vec::new(), - }); - assert!(matches!( - missing_summary, - Err(super::StoreError::ProseSummaryRequired(NodeClass::Source)) - )); - - let missing_body = store.add_node(CreateNodeRequest { - class: NodeClass::Note, - frontier_id: None, - title: NonEmptyText::new("quick note")?, - summary: Some(NonEmptyText::new("quick note summary")?), - tags: Some(BTreeSet::new()), - payload: NodePayload::with_schema(store.schema().schema_ref(), serde_json::Map::new()), - annotations: Vec::new(), - attachments: Vec::new(), - }); - assert!(matches!( - missing_body, - Err(super::StoreError::ProseBodyRequired(NodeClass::Note)) - )); - Ok(()) +fn discovery_start(path: &Utf8Path) -> Utf8PathBuf { + match fs::metadata(path.as_std_path()) { + Ok(metadata) if metadata.is_file() => path + .parent() + .map_or_else(|| path.to_path_buf(), Utf8Path::to_path_buf), + _ => path.to_path_buf(), } +} - #[test] - fn opening_store_backfills_missing_prose_summaries() -> Result<(), super::StoreError> { - let root = temp_project_root("summary-backfill"); - let mut store = ProjectStore::init( - &root, - NonEmptyText::new("test project")?, - NonEmptyText::new("local.test")?, - )?; - let node = store.add_node(CreateNodeRequest { - class: NodeClass::Source, - frontier_id: None, - title: NonEmptyText::new("research note")?, - summary: Some(NonEmptyText::new("temporary summary")?), - tags: None, - payload: NodePayload::with_schema( - store.schema().schema_ref(), - super::json_object(json!({"body": "First paragraph.\n\nSecond paragraph."}))?, - ), - annotations: Vec::new(), - attachments: Vec::new(), - })?; - drop(store); - - let connection = rusqlite::Connection::open( - root.join(super::STORE_DIR_NAME) - .join(super::STATE_DB_NAME) - .as_std_path(), - )?; - let _ = connection.execute( - 
"UPDATE nodes SET summary = NULL WHERE id = ?1", - rusqlite::params![node.id.to_string()], - )?; - drop(connection); - - let reopened = ProjectStore::open(&root)?; - let loaded = reopened - .get_node(node.id)? - .ok_or(super::StoreError::NodeNotFound(node.id))?; - assert_eq!( - loaded.summary.as_ref().map(NonEmptyText::as_str), - Some("First paragraph.") - ); - Ok(()) - } +fn to_sql_conversion_error(error: StoreError) -> rusqlite::Error { + rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(error)) +} - #[test] - fn schema_field_upsert_remove_persists_and_bumps_version() -> Result<(), super::StoreError> { - let root = temp_project_root("schema-upsert-remove"); - let mut store = ProjectStore::init( - &root, - NonEmptyText::new("test project")?, - NonEmptyText::new("local.test")?, - )?; - let initial_version = store.schema().version; - - let field = store.upsert_schema_field(UpsertSchemaFieldRequest { - name: NonEmptyText::new("scenario")?, - node_classes: BTreeSet::from([NodeClass::Hypothesis, NodeClass::Analysis]), - presence: FieldPresence::Recommended, - severity: DiagnosticSeverity::Warning, - role: FieldRole::ProjectionGate, - inference_policy: InferencePolicy::ManualOnly, - value_type: Some(FieldValueType::String), - })?; - assert_eq!(field.name.as_str(), "scenario"); - assert_eq!(store.schema().version, initial_version + 1); - assert!( - store - .schema() - .fields - .iter() - .any(|item| item.name.as_str() == "scenario") - ); - drop(store); - - let mut reopened = ProjectStore::open(&root)?; - assert_eq!(reopened.schema().version, initial_version + 1); - assert!( - reopened - .schema() - .fields - .iter() - .any(|item| item.name.as_str() == "scenario") - ); +fn core_to_sql_conversion_error(error: CoreError) -> rusqlite::Error { + to_sql_conversion_error(StoreError::from(error)) +} - let removed = reopened.remove_schema_field(RemoveSchemaFieldRequest { - name: NonEmptyText::new("scenario")?, - node_classes: Some(BTreeSet::from([NodeClass::Hypothesis, NodeClass::Analysis])), - })?; - assert_eq!(removed, 1); - assert_eq!(reopened.schema().version, initial_version + 2); - assert!( - !reopened - .schema() - .fields - .iter() - .any(|item| item.name.as_str() == "scenario") - ); - Ok(()) - } +fn uuid_to_sql_conversion_error(error: uuid::Error) -> rusqlite::Error { + to_sql_conversion_error(StoreError::from(error)) +} - #[test] - fn metric_queries_surface_canonical_and_payload_numeric_fields() -> Result<(), super::StoreError> - { - let root = temp_project_root("metric-best"); - let mut store = ProjectStore::init( - &root, - NonEmptyText::new("test project")?, - NonEmptyText::new("local.test")?, - )?; - let projection = store.create_frontier(CreateFrontierRequest { - label: NonEmptyText::new("optimization frontier")?, - contract_title: NonEmptyText::new("contract root")?, - contract_summary: None, - contract: FrontierContract { - objective: NonEmptyText::new("improve wall time")?, - evaluation: EvaluationProtocol { - benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]), - primary_metric: MetricSpec { - metric_key: NonEmptyText::new("wall_clock_s")?, - unit: MetricUnit::Seconds, - objective: OptimizationObjective::Minimize, - }, - supporting_metrics: BTreeSet::new(), - }, - promotion_criteria: vec![NonEmptyText::new("strict speedup")?], - }, - })?; - let frontier_id = projection.frontier.id; - let _ = store.define_metric(DefineMetricRequest { - key: NonEmptyText::new("wall_clock_s")?, - unit: MetricUnit::Seconds, - objective: 
+fn apply_optional_text_patch<T>(patch: Option<TextPatch<T>>, current: Option<T>) -> Option<T> {
+    match patch {
+        None => current,
+        Some(TextPatch::Set(value)) => Some(value),
+        Some(TextPatch::Clear) => None,
     }
 }
 
-    #[test]
-    fn frontier_filter_includes_root_contract_node() -> Result<(), super::StoreError> {
-        let root = temp_project_root("contract-filter");
-        let mut store = ProjectStore::init(
-            &root,
-            NonEmptyText::new("test project")?,
-            NonEmptyText::new("local.test")?,
-        )?;
-        let projection = store.create_frontier(CreateFrontierRequest {
-            label: NonEmptyText::new("frontier")?,
-            contract_title: NonEmptyText::new("root contract")?,
-            contract_summary: None,
-            contract: FrontierContract {
-                objective: NonEmptyText::new("optimize")?,
-                evaluation: EvaluationProtocol {
-                    benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]),
-                    primary_metric: MetricSpec {
-                        metric_key: NonEmptyText::new("wall_clock_s")?,
-                        unit: MetricUnit::Seconds,
-                        objective: OptimizationObjective::Minimize,
-                    },
-                    supporting_metrics: BTreeSet::new(),
-                },
-                promotion_criteria: vec![NonEmptyText::new("faster")?],
-            },
-        })?;
-
-        let nodes = store.list_nodes(ListNodesQuery {
-            frontier_id: Some(projection.frontier.id),
-            ..ListNodesQuery::default()
-        })?;
 
+fn write_json_file<T: Serialize>(path: &Utf8Path, value: &T) -> Result<(), StoreError> {
+    let bytes = serde_json::to_vec_pretty(value)?;
+    fs::write(path.as_std_path(), bytes)?;
+    Ok(())
+}
 
-        assert_eq!(nodes.len(), 1);
-        assert_eq!(nodes[0].class, NodeClass::Contract);
-        Ok(())
-    }
 
+fn read_json_file<T: for<'de> Deserialize<'de>>(path: &Utf8Path) -> Result<T, StoreError> {
+    let bytes = fs::read(path.as_std_path())?;
+    serde_json::from_slice(&bytes).map_err(StoreError::from)
+}
 
-    #[test]
-    fn notes_require_explicit_tags_even_when_empty() -> Result<(), super::StoreError> {
-        let root = temp_project_root("note-tags-required");
-        let mut store = ProjectStore::init(
-            &root,
-            NonEmptyText::new("test project")?,
-            NonEmptyText::new("local.test")?,
-        )?;
 
+fn encode_json<T: Serialize>(value: &T) -> Result<String, StoreError> {
+    serde_json::to_string(value).map_err(StoreError::from)
+}
 
-        let result = store.add_node(CreateNodeRequest {
-            class: NodeClass::Note,
-            frontier_id: None,
-            title: NonEmptyText::new("quick note")?,
-            summary: Some(NonEmptyText::new("quick note summary")?),
-            tags: None,
-            payload: NodePayload::with_schema(
-                store.schema().schema_ref(),
-                super::json_object(json!({"body": "hello"}))?,
-            ),
-            annotations: Vec::new(),
-            attachments: Vec::new(),
-        });
 
+fn decode_json<T: for<'de> Deserialize<'de>>(raw: &str) -> Result<T, StoreError> {
+    serde_json::from_str(raw).map_err(StoreError::from)
+}
 
-        assert!(matches!(result, Err(super::StoreError::NoteTagsRequired)));
-        Ok(())
-    }
 
+fn encode_timestamp(timestamp: OffsetDateTime) -> Result<String, StoreError> {
+    timestamp.format(&Rfc3339).map_err(StoreError::from)
+}
 
-    #[test]
-    fn tags_round_trip_and_filter_node_list() -> Result<(), super::StoreError> {
-        let root = temp_project_root("tag-roundtrip");
-        let mut store = ProjectStore::init(
-            &root,
-            NonEmptyText::new("test project")?,
-            NonEmptyText::new("local.test")?,
-        )?;
-        let cuts = store.add_tag(
-            TagName::new("cuts/core")?,
-            NonEmptyText::new("Core cutset work")?,
-        )?;
-        let heuristics = store.add_tag(
-            TagName::new("heuristic")?,
-            NonEmptyText::new("Heuristic tuning")?,
-        )?;
-        let note = store.add_node(CreateNodeRequest {
-            class: NodeClass::Note,
-            frontier_id: None,
-            title: NonEmptyText::new("tagged note")?,
-            summary: Some(NonEmptyText::new("tagged note summary")?),
-            tags: Some(BTreeSet::from([cuts.name.clone(), heuristics.name.clone()])),
-            payload: NodePayload::with_schema(
-                store.schema().schema_ref(),
-                super::json_object(json!({"body": "tagged"}))?,
-            ),
-            annotations: Vec::new(),
-            attachments: Vec::new(),
-        })?;
 
+fn decode_timestamp(raw: &str) -> Result<OffsetDateTime, time::error::Parse> {
+    OffsetDateTime::parse(raw, &Rfc3339)
+}
 
-        let loaded = store
-            .get_node(note.id)?
-            .ok_or(super::StoreError::NodeNotFound(note.id))?;
-        assert_eq!(loaded.tags.len(), 2);
 
+fn state_root(project_root: &Utf8Path) -> Utf8PathBuf {
+    project_root.join(STORE_DIR_NAME)
+}
 
-        let filtered = store.list_nodes(ListNodesQuery {
-            tags: BTreeSet::from([cuts.name]),
-            ..ListNodesQuery::default()
-        })?;
-        assert_eq!(filtered.len(), 1);
-        assert_eq!(filtered[0].tags.len(), 2);
-        Ok(())
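+/// Walks from the starting path (or its parent, when given a file) up
+/// through its ancestors, returning the first directory that contains the
+/// `.fidget_spinner` state root.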
== "duration_s" - && dimension.value_type == FieldValueType::Numeric - && dimension.distinct_value_count == 2 - })); - - let canonical_best = store.best_metrics(MetricBestQuery { - key: NonEmptyText::new("wall_clock_s")?, - frontier_id: Some(frontier_id), - source: Some(MetricFieldSource::RunMetric), - dimensions: run_dimensions("belt_4x5", 60.0)?, - order: None, - limit: 5, - })?; - assert_eq!(canonical_best.len(), 1); - assert_eq!(canonical_best[0].value, 5.0); - assert_eq!( - canonical_best[0].experiment_title.as_str(), - "second experiment" - ); - assert_eq!(canonical_best[0].verdict, FrontierVerdict::Kept); - assert_eq!( - canonical_best[0] - .dimensions - .get(&NonEmptyText::new("duration_s")?), - Some(&RunDimensionValue::Numeric(60.0)) - ); +fn parse_non_empty_text(raw: &str) -> Result<NonEmptyText, rusqlite::Error> { + NonEmptyText::new(raw.to_owned()).map_err(core_to_sql_conversion_error) +} - let payload_best = store.best_metrics(MetricBestQuery { - key: NonEmptyText::new("latency_hint")?, - frontier_id: Some(frontier_id), - source: Some(MetricFieldSource::HypothesisPayload), - dimensions: run_dimensions("belt_4x5", 60.0)?, - order: Some(MetricRankOrder::Asc), - limit: 5, - })?; - assert_eq!(payload_best.len(), 1); - assert_eq!(payload_best[0].value, 7.0); - assert_eq!(payload_best[0].hypothesis_node_id, second_hypothesis.id); - - let missing_order = store.best_metrics(MetricBestQuery { - key: NonEmptyText::new("latency_hint")?, - frontier_id: Some(frontier_id), - source: Some(MetricFieldSource::HypothesisPayload), - dimensions: BTreeMap::new(), - order: None, - limit: 5, - }); - assert!(matches!( - missing_order, - Err(super::StoreError::MetricOrderRequired { .. }) - )); - assert_eq!( - second_receipt.experiment.title.as_str(), - "second experiment" - ); - Ok(()) - } +fn parse_optional_non_empty_text( + raw: Option<String>, +) -> Result<Option<NonEmptyText>, rusqlite::Error> { + raw.map(|value| parse_non_empty_text(&value)).transpose() +} - #[test] - fn opening_store_backfills_legacy_benchmark_suite_dimensions() -> Result<(), super::StoreError> - { - let root = temp_project_root("metric-plane-backfill"); - let mut store = ProjectStore::init( - &root, - NonEmptyText::new("test project")?, - NonEmptyText::new("local.test")?, - )?; - let projection = store.create_frontier(CreateFrontierRequest { - label: NonEmptyText::new("migration frontier")?, - contract_title: NonEmptyText::new("migration contract")?, - contract_summary: None, - contract: FrontierContract { - objective: NonEmptyText::new("exercise metric migration")?, - evaluation: EvaluationProtocol { - benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]), - primary_metric: MetricSpec { - metric_key: NonEmptyText::new("wall_clock_s")?, - unit: MetricUnit::Seconds, - objective: OptimizationObjective::Minimize, - }, - supporting_metrics: BTreeSet::new(), - }, - promotion_criteria: vec![NonEmptyText::new("keep the metric plane queryable")?], - }, - })?; - let frontier_id = projection.frontier.id; - let hypothesis = store.add_node(CreateNodeRequest { - class: NodeClass::Hypothesis, - frontier_id: Some(frontier_id), - title: NonEmptyText::new("candidate hypothesis")?, - summary: Some(NonEmptyText::new("candidate hypothesis summary")?), - tags: None, - payload: NodePayload::with_schema( - store.schema().schema_ref(), - super::json_object(json!({"latency_hint": 9.0}))?, - ), - annotations: Vec::new(), - attachments: Vec::new(), - })?; - let experiment = store.open_experiment(open_experiment_request( - frontier_id, - 
+fn to_sql_conversion_error(error: StoreError) -> rusqlite::Error {
+    rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(error))
+}
 
-    #[test]
-    fn schema_field_upsert_remove_persists_and_bumps_version() -> Result<(), super::StoreError> {
-        let root = temp_project_root("schema-upsert-remove");
-        let mut store = ProjectStore::init(
-            &root,
-            NonEmptyText::new("test project")?,
-            NonEmptyText::new("local.test")?,
-        )?;
-        let initial_version = store.schema().version;
-
-        let field = store.upsert_schema_field(UpsertSchemaFieldRequest {
-            name: NonEmptyText::new("scenario")?,
-            node_classes: BTreeSet::from([NodeClass::Hypothesis, NodeClass::Analysis]),
-            presence: FieldPresence::Recommended,
-            severity: DiagnosticSeverity::Warning,
-            role: FieldRole::ProjectionGate,
-            inference_policy: InferencePolicy::ManualOnly,
-            value_type: Some(FieldValueType::String),
-        })?;
-        assert_eq!(field.name.as_str(), "scenario");
-        assert_eq!(store.schema().version, initial_version + 1);
-        assert!(
-            store
-                .schema()
-                .fields
-                .iter()
-                .any(|item| item.name.as_str() == "scenario")
-        );
-        drop(store);
-
-        let mut reopened = ProjectStore::open(&root)?;
-        assert_eq!(reopened.schema().version, initial_version + 1);
-        assert!(
-            reopened
-                .schema()
-                .fields
-                .iter()
-                .any(|item| item.name.as_str() == "scenario")
-        );
 
+fn core_to_sql_conversion_error(error: CoreError) -> rusqlite::Error {
+    to_sql_conversion_error(StoreError::from(error))
+}
 
-        let removed = reopened.remove_schema_field(RemoveSchemaFieldRequest {
-            name: NonEmptyText::new("scenario")?,
-            node_classes: Some(BTreeSet::from([NodeClass::Hypothesis, NodeClass::Analysis])),
-        })?;
-        assert_eq!(removed, 1);
-        assert_eq!(reopened.schema().version, initial_version + 2);
-        assert!(
-            !reopened
-                .schema()
-                .fields
-                .iter()
-                .any(|item| item.name.as_str() == "scenario")
-        );
-        Ok(())
-    }
 
+fn uuid_to_sql_conversion_error(error: uuid::Error) -> rusqlite::Error {
+    to_sql_conversion_error(StoreError::from(error))
+}
 
-    #[test]
-    fn metric_queries_surface_canonical_and_payload_numeric_fields() -> Result<(), super::StoreError>
-    {
-        let root = temp_project_root("metric-best");
-        let mut store = ProjectStore::init(
-            &root,
-            NonEmptyText::new("test project")?,
-            NonEmptyText::new("local.test")?,
-        )?;
-        let projection = store.create_frontier(CreateFrontierRequest {
-            label: NonEmptyText::new("optimization frontier")?,
-            contract_title: NonEmptyText::new("contract root")?,
-            contract_summary: None,
-            contract: FrontierContract {
-                objective: NonEmptyText::new("improve wall time")?,
-                evaluation: EvaluationProtocol {
-                    benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]),
-                    primary_metric: MetricSpec {
-                        metric_key: NonEmptyText::new("wall_clock_s")?,
-                        unit: MetricUnit::Seconds,
-                        objective: OptimizationObjective::Minimize,
-                    },
-                    supporting_metrics: BTreeSet::new(),
-                },
-                promotion_criteria: vec![NonEmptyText::new("strict speedup")?],
-            },
-        })?;
-        let frontier_id = projection.frontier.id;
-        let _ = store.define_metric(DefineMetricRequest {
-            key: NonEmptyText::new("wall_clock_s")?,
-            unit: MetricUnit::Seconds,
-            objective: OptimizationObjective::Minimize,
-            description: Some(NonEmptyText::new("elapsed wall time")?),
-        })?;
-        let _ = store.define_run_dimension(DefineRunDimensionRequest {
-            key: NonEmptyText::new("scenario")?,
-            value_type: FieldValueType::String,
-            description: Some(NonEmptyText::new("workload family")?),
-        })?;
-        let _ = store.define_run_dimension(DefineRunDimensionRequest {
-            key: NonEmptyText::new("duration_s")?,
-            value_type: FieldValueType::Numeric,
-            description: Some(NonEmptyText::new("time budget in seconds")?),
-        })?;
 
+fn time_to_sql_conversion_error(error: time::error::Parse) -> rusqlite::Error {
+    to_sql_conversion_error(StoreError::from(error))
+}
 
-        let first_hypothesis = store.add_node(CreateNodeRequest {
-            class: NodeClass::Hypothesis,
-            frontier_id: Some(frontier_id),
-            title: NonEmptyText::new("first hypothesis")?,
-            summary: Some(NonEmptyText::new("first hypothesis summary")?),
-            tags: None,
-            payload: NodePayload::with_schema(
-                store.schema().schema_ref(),
-                super::json_object(json!({"body": "first body", "latency_hint": 14.0}))?,
-            ),
-            annotations: Vec::new(),
-            attachments: Vec::new(),
-        })?;
-        let second_hypothesis = store.add_node(CreateNodeRequest {
-            class: NodeClass::Hypothesis,
-            frontier_id: Some(frontier_id),
-            title: NonEmptyText::new("second hypothesis")?,
-            summary: Some(NonEmptyText::new("second hypothesis summary")?),
-            tags: None,
-            payload: NodePayload::with_schema(
-                store.schema().schema_ref(),
-                super::json_object(json!({"body": "second body", "latency_hint": 7.0}))?,
-            ),
-            annotations: Vec::new(),
-            attachments: Vec::new(),
-        })?;
-        let first_experiment = store.open_experiment(open_experiment_request(
-            frontier_id,
-            first_hypothesis.id,
-            "first experiment",
-        )?)?;
-        let second_experiment = store.open_experiment(open_experiment_request(
-            frontier_id,
-            second_hypothesis.id,
-            "second experiment",
-        )?)?;
-
-        let _first_receipt = store.close_experiment(experiment_request(
-            &root,
-            first_experiment.id,
-            "first run",
-            10.0,
-            run_dimensions("belt_4x5", 20.0)?,
-        )?)?;
-        let second_receipt = store.close_experiment(experiment_request(
-            &root,
-            second_experiment.id,
-            "second run",
-            5.0,
-            run_dimensions("belt_4x5", 60.0)?,
-        )?)?;
-
-        let keys = store.list_metric_keys()?;
-        assert!(keys.iter().any(|key| {
-            key.key.as_str() == "wall_clock_s" && key.source == MetricFieldSource::RunMetric
-        }));
-        assert!(keys.iter().any(|key| {
-            key.key.as_str() == "latency_hint" && key.source == MetricFieldSource::HypothesisPayload
-        }));
-        assert!(keys.iter().any(|key| {
-            key.key.as_str() == "wall_clock_s"
-                && key.source == MetricFieldSource::RunMetric
-                && key.description.as_ref().map(NonEmptyText::as_str) == Some("elapsed wall time")
-        }));
-
-        let filtered_keys = store.list_metric_keys_filtered(MetricKeyQuery {
-            frontier_id: Some(frontier_id),
-            source: Some(MetricFieldSource::RunMetric),
-            dimensions: run_dimensions("belt_4x5", 60.0)?,
-        })?;
-        assert_eq!(filtered_keys.len(), 1);
-        assert_eq!(filtered_keys[0].experiment_count, 1);
-
-        let dimension_summaries = store.list_run_dimensions()?;
-        assert!(dimension_summaries.iter().any(|dimension| {
-            dimension.key.as_str() == "benchmark_suite"
-                && dimension.value_type == FieldValueType::String
-                && dimension.observed_run_count == 2
-        }));
-        assert!(dimension_summaries.iter().any(|dimension| {
-            dimension.key.as_str() == "scenario"
-                && dimension.description.as_ref().map(NonEmptyText::as_str)
-                    == Some("workload family")
-        }));
-        assert!(dimension_summaries.iter().any(|dimension| {
-            dimension.key.as_str() == "duration_s"
-                && dimension.value_type == FieldValueType::Numeric
-                && dimension.distinct_value_count == 2
-        }));
-
-        let canonical_best = store.best_metrics(MetricBestQuery {
-            key: NonEmptyText::new("wall_clock_s")?,
-            frontier_id: Some(frontier_id),
-            source: Some(MetricFieldSource::RunMetric),
-            dimensions: run_dimensions("belt_4x5", 60.0)?,
-            order: None,
-            limit: 5,
-        })?;
-        assert_eq!(canonical_best.len(), 1);
-        assert_eq!(canonical_best[0].value, 5.0);
-        assert_eq!(
-            canonical_best[0].experiment_title.as_str(),
-            "second experiment"
-        );
-        assert_eq!(canonical_best[0].verdict, FrontierVerdict::Kept);
-        assert_eq!(
-            canonical_best[0]
-                .dimensions
-                .get(&NonEmptyText::new("duration_s")?),
-            Some(&RunDimensionValue::Numeric(60.0))
-        );
 
+fn parse_non_empty_text(raw: &str) -> Result<NonEmptyText, rusqlite::Error> {
+    NonEmptyText::new(raw.to_owned()).map_err(core_to_sql_conversion_error)
+}
 
-        let payload_best = store.best_metrics(MetricBestQuery {
-            key: NonEmptyText::new("latency_hint")?,
-            frontier_id: Some(frontier_id),
-            source: Some(MetricFieldSource::HypothesisPayload),
-            dimensions: run_dimensions("belt_4x5", 60.0)?,
-            order: Some(MetricRankOrder::Asc),
-            limit: 5,
-        })?;
-        assert_eq!(payload_best.len(), 1);
-        assert_eq!(payload_best[0].value, 7.0);
-        assert_eq!(payload_best[0].hypothesis_node_id, second_hypothesis.id);
-
-        let missing_order = store.best_metrics(MetricBestQuery {
-            key: NonEmptyText::new("latency_hint")?,
-            frontier_id: Some(frontier_id),
-            source: Some(MetricFieldSource::HypothesisPayload),
-            dimensions: BTreeMap::new(),
-            order: None,
-            limit: 5,
-        });
-        assert!(matches!(
-            missing_order,
-            Err(super::StoreError::MetricOrderRequired { .. })
-        ));
-        assert_eq!(
-            second_receipt.experiment.title.as_str(),
-            "second experiment"
-        );
-        Ok(())
-    }
 
+fn parse_optional_non_empty_text(
+    raw: Option<String>,
+) -> Result<Option<NonEmptyText>, rusqlite::Error> {
+    raw.map(|value| parse_non_empty_text(&value)).transpose()
+}
 
-    #[test]
-    fn opening_store_backfills_legacy_benchmark_suite_dimensions() -> Result<(), super::StoreError>
-    {
-        let root = temp_project_root("metric-plane-backfill");
-        let mut store = ProjectStore::init(
-            &root,
-            NonEmptyText::new("test project")?,
-            NonEmptyText::new("local.test")?,
-        )?;
-        let projection = store.create_frontier(CreateFrontierRequest {
-            label: NonEmptyText::new("migration frontier")?,
-            contract_title: NonEmptyText::new("migration contract")?,
-            contract_summary: None,
-            contract: FrontierContract {
-                objective: NonEmptyText::new("exercise metric migration")?,
-                evaluation: EvaluationProtocol {
-                    benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]),
-                    primary_metric: MetricSpec {
-                        metric_key: NonEmptyText::new("wall_clock_s")?,
-                        unit: MetricUnit::Seconds,
-                        objective: OptimizationObjective::Minimize,
-                    },
-                    supporting_metrics: BTreeSet::new(),
-                },
-                promotion_criteria: vec![NonEmptyText::new("keep the metric plane queryable")?],
-            },
-        })?;
-        let frontier_id = projection.frontier.id;
-        let hypothesis = store.add_node(CreateNodeRequest {
-            class: NodeClass::Hypothesis,
-            frontier_id: Some(frontier_id),
-            title: NonEmptyText::new("candidate hypothesis")?,
-            summary: Some(NonEmptyText::new("candidate hypothesis summary")?),
-            tags: None,
-            payload: NodePayload::with_schema(
-                store.schema().schema_ref(),
-                super::json_object(json!({"latency_hint": 9.0}))?,
-            ),
-            annotations: Vec::new(),
-            attachments: Vec::new(),
-        })?;
-        let experiment = store.open_experiment(open_experiment_request(
-            frontier_id,
-            hypothesis.id,
-            "migration experiment",
-        )?)?;
-        let _ = store.close_experiment(experiment_request(
-            &root,
-            experiment.id,
-            "migration run",
-            11.0,
-            BTreeMap::from([(
-                NonEmptyText::new("benchmark_suite")?,
-                RunDimensionValue::String(NonEmptyText::new("smoke")?),
-            )]),
-        )?)?;
-        drop(store);
-
-        let connection = rusqlite::Connection::open(
-            root.join(super::STORE_DIR_NAME)
-                .join(super::STATE_DB_NAME)
-                .as_std_path(),
-        )?;
-        let _ = connection.execute("DELETE FROM run_dimensions", [])?;
-        drop(connection);
-
-        let reopened = ProjectStore::open(&root)?;
-        let dimensions = reopened.list_run_dimensions()?;
-        assert!(dimensions.iter().any(|dimension| {
-            dimension.key.as_str() == "benchmark_suite" && dimension.observed_run_count == 1
-        }));
-
-        let best = reopened.best_metrics(MetricBestQuery {
-            key: NonEmptyText::new("wall_clock_s")?,
-            frontier_id: Some(frontier_id),
-            source: Some(MetricFieldSource::RunMetric),
-            dimensions: BTreeMap::from([(
-                NonEmptyText::new("benchmark_suite")?,
-                RunDimensionValue::String(NonEmptyText::new("smoke")?),
-            )]),
-            order: None,
-            limit: 5,
-        })?;
-        assert_eq!(best.len(), 1);
-        assert_eq!(best[0].value, 11.0);
-        Ok(())
-    }
 
+fn parse_slug(raw: &str) -> Result<Slug, rusqlite::Error> {
+    Slug::new(raw.to_owned()).map_err(core_to_sql_conversion_error)
+}
 
-    fn open_experiment_request(
-        frontier_id: fidget_spinner_core::FrontierId,
-        hypothesis_node_id: fidget_spinner_core::NodeId,
-        title: &str,
-    ) -> Result<OpenExperimentRequest, super::StoreError> {
-        Ok(OpenExperimentRequest {
-            frontier_id,
-            hypothesis_node_id,
-            title: NonEmptyText::new(title)?,
-            summary: Some(NonEmptyText::new(format!("{title} summary"))?),
-        })
-    }
 
+fn parse_tag_name(raw: &str) -> Result<TagName, rusqlite::Error> {
+    TagName::new(raw.to_owned()).map_err(core_to_sql_conversion_error)
+}
 
-    fn experiment_request(
-        root: &camino::Utf8Path,
-        experiment_id: fidget_spinner_core::ExperimentId,
-        run_title: &str,
-        wall_clock_s: f64,
-        dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
-    ) -> Result<CloseExperimentRequest, super::StoreError> {
-        Ok(CloseExperimentRequest {
-            experiment_id,
-            run_title: NonEmptyText::new(run_title)?,
-            run_summary: Some(NonEmptyText::new("run summary")?),
-            backend: fidget_spinner_core::ExecutionBackend::WorktreeProcess,
-            dimensions,
-            command: CommandRecipe::new(
-                root.to_path_buf(),
-                vec![NonEmptyText::new("true")?],
-                BTreeMap::new(),
-            )?,
-            primary_metric: MetricValue {
-                key: NonEmptyText::new("wall_clock_s")?,
-                value: wall_clock_s,
-            },
-            supporting_metrics: Vec::new(),
-            note: FrontierNote {
-                summary: NonEmptyText::new("note summary")?,
-                next_hypotheses: Vec::new(),
-            },
-            verdict: FrontierVerdict::Kept,
-            analysis: None,
-            decision_title: NonEmptyText::new("decision")?,
-            decision_rationale: NonEmptyText::new("decision rationale")?,
-        })
-    }
 
+fn parse_uuid_sql(raw: &str) -> Result<Uuid, rusqlite::Error> {
+    Uuid::parse_str(raw).map_err(uuid_to_sql_conversion_error)
+}
 
-    fn run_dimensions(
-        scenario: &str,
-        duration_s: f64,
-    ) -> Result<BTreeMap<NonEmptyText, RunDimensionValue>, super::StoreError> {
-        Ok(BTreeMap::from([
-            (
-                NonEmptyText::new("benchmark_suite")?,
-                RunDimensionValue::String(NonEmptyText::new("smoke")?),
-            ),
-            (
-                NonEmptyText::new("scenario")?,
-                RunDimensionValue::String(NonEmptyText::new(scenario)?),
-            ),
-            (
-                NonEmptyText::new("duration_s")?,
-                RunDimensionValue::Numeric(duration_s),
-            ),
-        ]))
-    }
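+/// Row-mapper variant of `decode_timestamp` that reports parse failures as
+/// `rusqlite` conversion errors.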
+fn parse_timestamp_sql(raw: &str) -> Result<OffsetDateTime, rusqlite::Error> {
+    decode_timestamp(raw).map_err(time_to_sql_conversion_error)
 }