swarm repositories / source
aboutsummaryrefslogtreecommitdiff
path: root/crates/fidget-spinner-store-sqlite/src/lib.rs
diff options
context:
space:
mode:
authormain <main@swarm.moe>2026-03-20 16:00:30 -0400
committermain <main@swarm.moe>2026-03-20 16:00:30 -0400
commit9d63844f3a28fde70b19500422f17379e99e588a (patch)
tree163cfbd65a8d3528346561410ef39eb1183a16f2 /crates/fidget-spinner-store-sqlite/src/lib.rs
parent22fe3d2ce7478450a1d7443c4ecbd85fd4c46716 (diff)
downloadfidget_spinner-9d63844f3a28fde70b19500422f17379e99e588a.zip
Refound Spinner as an austere frontier ledger
Diffstat (limited to 'crates/fidget-spinner-store-sqlite/src/lib.rs')
-rw-r--r--crates/fidget-spinner-store-sqlite/src/lib.rs6259
1 files changed, 2698 insertions, 3561 deletions
diff --git a/crates/fidget-spinner-store-sqlite/src/lib.rs b/crates/fidget-spinner-store-sqlite/src/lib.rs
index bbe7038..3680471 100644
--- a/crates/fidget-spinner-store-sqlite/src/lib.rs
+++ b/crates/fidget-spinner-store-sqlite/src/lib.rs
@@ -1,23 +1,20 @@
-use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
-use std::fmt::Write as _;
use std::fs;
use std::io;
use camino::{Utf8Path, Utf8PathBuf};
use fidget_spinner_core::{
- AnnotationVisibility, CommandRecipe, CompletedExperiment, DagEdge, DagNode, DiagnosticSeverity,
- EdgeKind, ExecutionBackend, ExperimentResult, FieldPresence, FieldRole, FieldValueType,
- FrontierContract, FrontierNote, FrontierProjection, FrontierRecord, FrontierStatus,
- FrontierVerdict, FrontierVerdictCounts, InferencePolicy, JsonObject, MetricDefinition,
- MetricSpec, MetricUnit, MetricValue, NodeAnnotation, NodeClass, NodeDiagnostics, NodePayload,
- NonEmptyText, OpenExperiment, OptimizationObjective, ProjectFieldSpec, ProjectSchema,
- RunDimensionDefinition, RunDimensionValue, RunRecord, RunStatus, TagName, TagRecord,
+ ArtifactId, ArtifactKind, ArtifactRecord, AttachmentTargetRef, CommandRecipe, CoreError,
+ ExecutionBackend, ExperimentAnalysis, ExperimentId, ExperimentOutcome, ExperimentRecord,
+ ExperimentStatus, FieldValueType, FrontierBrief, FrontierId, FrontierRecord,
+ FrontierRoadmapItem, FrontierStatus, FrontierVerdict, HypothesisId, HypothesisRecord,
+ MetricDefinition, MetricUnit, MetricValue, MetricVisibility, NonEmptyText,
+ OptimizationObjective, RunDimensionDefinition, RunDimensionValue, Slug, TagName, TagRecord,
+ VertexRef,
};
-use rusqlite::types::Value as SqlValue;
-use rusqlite::{Connection, OptionalExtension, Transaction, params, params_from_iter};
+use rusqlite::{Connection, OptionalExtension, Transaction, params};
use serde::{Deserialize, Serialize};
-use serde_json::{Value, json};
+use serde_json::Value;
use thiserror::Error;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
@@ -26,8 +23,7 @@ use uuid::Uuid;
pub const STORE_DIR_NAME: &str = ".fidget_spinner";
pub const STATE_DB_NAME: &str = "state.sqlite";
pub const PROJECT_CONFIG_NAME: &str = "project.json";
-pub const PROJECT_SCHEMA_NAME: &str = "schema.json";
-pub const CURRENT_STORE_FORMAT_VERSION: u32 = 3;
+pub const CURRENT_STORE_FORMAT_VERSION: u32 = 4;
#[derive(Debug, Error)]
pub enum StoreError {
@@ -49,17 +45,11 @@ pub enum StoreError {
#[error("time format failure")]
TimeFormat(#[from] time::error::Format),
#[error("core domain failure")]
- Core(#[from] fidget_spinner_core::CoreError),
+ Core(#[from] CoreError),
#[error("UUID parse failure")]
Uuid(#[from] uuid::Error),
- #[error("node {0} was not found")]
- NodeNotFound(fidget_spinner_core::NodeId),
- #[error("frontier {0} was not found")]
- FrontierNotFound(fidget_spinner_core::FrontierId),
- #[error("experiment {0} was not found")]
- ExperimentNotFound(fidget_spinner_core::ExperimentId),
- #[error("node {0} is not a hypothesis node")]
- NodeNotHypothesis(fidget_spinner_core::NodeId),
+ #[error("{0}")]
+ InvalidInput(String),
#[error(
"project store format {observed} is incompatible with this binary (expected {expected}); reinitialize the store"
)]
@@ -68,50 +58,53 @@ pub enum StoreError {
UnknownTag(TagName),
#[error("tag `{0}` already exists")]
DuplicateTag(TagName),
- #[error("note nodes require an explicit tag list; use an empty list if no tags apply")]
- NoteTagsRequired,
- #[error("{0} nodes require a non-empty summary")]
- ProseSummaryRequired(NodeClass),
- #[error("{0} nodes require a non-empty string payload field `body`")]
- ProseBodyRequired(NodeClass),
#[error("metric `{0}` is not registered")]
UnknownMetricDefinition(NonEmptyText),
- #[error(
- "metric `{key}` conflicts with existing definition ({existing_unit}/{existing_objective} vs {new_unit}/{new_objective})"
- )]
- ConflictingMetricDefinition {
- key: String,
- existing_unit: String,
- existing_objective: String,
- new_unit: String,
- new_objective: String,
- },
+ #[error("metric `{0}` already exists")]
+ DuplicateMetricDefinition(NonEmptyText),
#[error("run dimension `{0}` is not registered")]
UnknownRunDimension(NonEmptyText),
#[error("run dimension `{0}` already exists")]
DuplicateRunDimension(NonEmptyText),
+ #[error("frontier selector `{0}` did not resolve")]
+ UnknownFrontierSelector(String),
+ #[error("hypothesis selector `{0}` did not resolve")]
+ UnknownHypothesisSelector(String),
+ #[error("experiment selector `{0}` did not resolve")]
+ UnknownExperimentSelector(String),
+ #[error("artifact selector `{0}` did not resolve")]
+ UnknownArtifactSelector(String),
#[error(
- "run dimension `{key}` conflicts with existing definition ({existing_type} vs {new_type})"
+ "entity revision mismatch for {kind} `{selector}`: expected {expected}, observed {observed}"
)]
- ConflictingRunDimensionDefinition {
- key: String,
- existing_type: String,
- new_type: String,
+ RevisionMismatch {
+ kind: &'static str,
+ selector: String,
+ expected: u64,
+ observed: u64,
},
- #[error("run dimension `{key}` expects {expected} values, got {observed}")]
- InvalidRunDimensionValue {
- key: String,
- expected: String,
- observed: String,
- },
- #[error("schema field `{0}` was not found")]
- SchemaFieldNotFound(String),
- #[error("metric key `{key}` is ambiguous across sources: {sources}")]
- AmbiguousMetricKey { key: String, sources: String },
- #[error("metric key `{key}` for source `{metric_source}` requires an explicit order")]
- MetricOrderRequired { key: String, metric_source: String },
- #[error("metric key `{key}` for source `{metric_source}` has conflicting semantics")]
- MetricSemanticsAmbiguous { key: String, metric_source: String },
+ #[error("hypothesis body must be exactly one paragraph")]
+ HypothesisBodyMustBeSingleParagraph,
+ #[error("experiments must hang off exactly one hypothesis")]
+ ExperimentHypothesisRequired,
+ #[error("experiment `{0}` is already closed")]
+ ExperimentAlreadyClosed(ExperimentId),
+ #[error("experiment `{0}` is still open")]
+ ExperimentStillOpen(ExperimentId),
+ #[error("influence edge crosses frontier scope")]
+ CrossFrontierInfluence,
+ #[error("self edges are not allowed")]
+ SelfEdge,
+ #[error("unknown roadmap hypothesis `{0}`")]
+ UnknownRoadmapHypothesis(String),
+ #[error(
+ "manual experiments may omit command context only by using an empty argv surrogate explicitly"
+ )]
+ ManualExperimentRequiresCommand,
+ #[error("metric key `{key}` requires an explicit ranking order")]
+ MetricOrderRequired { key: String },
+ #[error("dimension filter references unknown run dimension `{0}`")]
+ UnknownDimensionFilter(String),
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
@@ -132,181 +125,294 @@ impl ProjectConfig {
}
}
-#[derive(Clone, Debug)]
-pub struct CreateNodeRequest {
- pub class: NodeClass,
- pub frontier_id: Option<fidget_spinner_core::FrontierId>,
- pub title: NonEmptyText,
- pub summary: Option<NonEmptyText>,
- pub tags: Option<BTreeSet<TagName>>,
- pub payload: NodePayload,
- pub annotations: Vec<NodeAnnotation>,
- pub attachments: Vec<EdgeAttachment>,
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct ProjectStatus {
+ pub project_root: Utf8PathBuf,
+ pub display_name: NonEmptyText,
+ pub store_format_version: u32,
+ pub frontier_count: u64,
+ pub hypothesis_count: u64,
+ pub experiment_count: u64,
+ pub open_experiment_count: u64,
+ pub artifact_count: u64,
}
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub enum EdgeAttachmentDirection {
- ExistingToNew,
- NewToExisting,
+#[serde(rename_all = "snake_case")]
+pub enum MetricScope {
+ Live,
+ Visible,
+ All,
+}
+
+#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum MetricRankOrder {
+ Asc,
+ Desc,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub struct EdgeAttachment {
- pub node_id: fidget_spinner_core::NodeId,
- pub kind: EdgeKind,
- pub direction: EdgeAttachmentDirection,
+#[serde(tag = "kind", content = "selector", rename_all = "snake_case")]
+pub enum VertexSelector {
+ Hypothesis(String),
+ Experiment(String),
}
-impl EdgeAttachment {
- #[must_use]
- pub fn materialize(&self, new_node_id: fidget_spinner_core::NodeId) -> DagEdge {
- match self.direction {
- EdgeAttachmentDirection::ExistingToNew => DagEdge {
- source_id: self.node_id,
- target_id: new_node_id,
- kind: self.kind,
- },
- EdgeAttachmentDirection::NewToExisting => DagEdge {
- source_id: new_node_id,
- target_id: self.node_id,
- kind: self.kind,
- },
- }
- }
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+#[serde(tag = "kind", content = "selector", rename_all = "snake_case")]
+pub enum AttachmentSelector {
+ Frontier(String),
+ Hypothesis(String),
+ Experiment(String),
+}
+
+#[derive(Clone, Debug)]
+pub struct CreateFrontierRequest {
+ pub label: NonEmptyText,
+ pub objective: NonEmptyText,
+ pub slug: Option<Slug>,
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct FrontierSummary {
+ pub id: FrontierId,
+ pub slug: Slug,
+ pub label: NonEmptyText,
+ pub objective: NonEmptyText,
+ pub status: FrontierStatus,
+ pub active_hypothesis_count: u64,
+ pub open_experiment_count: u64,
+ pub updated_at: OffsetDateTime,
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct FrontierRoadmapItemDraft {
+ pub rank: u32,
+ pub hypothesis: String,
+ pub summary: Option<NonEmptyText>,
+}
+
+#[derive(Clone, Debug)]
+pub enum TextPatch<T> {
+ Set(T),
+ Clear,
+}
+
+#[derive(Clone, Debug)]
+pub struct UpdateFrontierBriefRequest {
+ pub frontier: String,
+ pub expected_revision: Option<u64>,
+ pub situation: Option<TextPatch<NonEmptyText>>,
+ pub roadmap: Option<Vec<FrontierRoadmapItemDraft>>,
+ pub unknowns: Option<Vec<NonEmptyText>>,
}
#[derive(Clone, Debug)]
-pub struct ListNodesQuery {
- pub frontier_id: Option<fidget_spinner_core::FrontierId>,
- pub class: Option<NodeClass>,
+pub struct CreateHypothesisRequest {
+ pub frontier: String,
+ pub slug: Option<Slug>,
+ pub title: NonEmptyText,
+ pub summary: NonEmptyText,
+ pub body: NonEmptyText,
pub tags: BTreeSet<TagName>,
- pub include_archived: bool,
- pub limit: u32,
+ pub parents: Vec<VertexSelector>,
}
-impl Default for ListNodesQuery {
- fn default() -> Self {
- Self {
- frontier_id: None,
- class: None,
- tags: BTreeSet::new(),
- include_archived: false,
- limit: 20,
- }
- }
+#[derive(Clone, Debug)]
+pub struct UpdateHypothesisRequest {
+ pub hypothesis: String,
+ pub expected_revision: Option<u64>,
+ pub title: Option<NonEmptyText>,
+ pub summary: Option<NonEmptyText>,
+ pub body: Option<NonEmptyText>,
+ pub tags: Option<BTreeSet<TagName>>,
+ pub parents: Option<Vec<VertexSelector>>,
+ pub archived: Option<bool>,
+}
+
+#[derive(Clone, Debug, Default)]
+pub struct ListHypothesesQuery {
+ pub frontier: Option<String>,
+ pub tags: BTreeSet<TagName>,
+ pub include_archived: bool,
+ pub limit: Option<u32>,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub struct NodeSummary {
- pub id: fidget_spinner_core::NodeId,
- pub class: NodeClass,
- pub track: fidget_spinner_core::NodeTrack,
- pub frontier_id: Option<fidget_spinner_core::FrontierId>,
+pub struct VertexSummary {
+ pub vertex: VertexRef,
+ pub frontier_id: FrontierId,
+ pub slug: Slug,
pub archived: bool,
pub title: NonEmptyText,
pub summary: Option<NonEmptyText>,
- pub tags: BTreeSet<TagName>,
- pub diagnostic_count: u64,
- pub hidden_annotation_count: u64,
- pub created_at: OffsetDateTime,
pub updated_at: OffsetDateTime,
}
-#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
-#[serde(rename_all = "snake_case")]
-pub enum MetricFieldSource {
- RunMetric,
- HypothesisPayload,
- RunPayload,
- AnalysisPayload,
- DecisionPayload,
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct HypothesisSummary {
+ pub id: HypothesisId,
+ pub slug: Slug,
+ pub frontier_id: FrontierId,
+ pub archived: bool,
+ pub title: NonEmptyText,
+ pub summary: NonEmptyText,
+ pub tags: Vec<TagName>,
+ pub open_experiment_count: u64,
+ pub latest_verdict: Option<FrontierVerdict>,
+ pub updated_at: OffsetDateTime,
}
-impl MetricFieldSource {
- #[must_use]
- pub const fn as_str(self) -> &'static str {
- match self {
- Self::RunMetric => "run_metric",
- Self::HypothesisPayload => "hypothesis_payload",
- Self::RunPayload => "run_payload",
- Self::AnalysisPayload => "analysis_payload",
- Self::DecisionPayload => "decision_payload",
- }
- }
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct HypothesisDetail {
+ pub record: HypothesisRecord,
+ pub parents: Vec<VertexSummary>,
+ pub children: Vec<VertexSummary>,
+ pub open_experiments: Vec<ExperimentSummary>,
+ pub closed_experiments: Vec<ExperimentSummary>,
+ pub artifacts: Vec<ArtifactSummary>,
+}
- #[must_use]
- pub const fn from_payload_class(class: NodeClass) -> Option<Self> {
- match class {
- NodeClass::Hypothesis => Some(Self::HypothesisPayload),
- NodeClass::Run => Some(Self::RunPayload),
- NodeClass::Analysis => Some(Self::AnalysisPayload),
- NodeClass::Decision => Some(Self::DecisionPayload),
- NodeClass::Contract | NodeClass::Source | NodeClass::Note => None,
- }
- }
+#[derive(Clone, Debug)]
+pub struct OpenExperimentRequest {
+ pub hypothesis: String,
+ pub slug: Option<Slug>,
+ pub title: NonEmptyText,
+ pub summary: Option<NonEmptyText>,
+ pub tags: BTreeSet<TagName>,
+ pub parents: Vec<VertexSelector>,
}
-#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)]
-#[serde(rename_all = "snake_case")]
-pub enum MetricRankOrder {
- Asc,
- Desc,
+#[derive(Clone, Debug)]
+pub struct UpdateExperimentRequest {
+ pub experiment: String,
+ pub expected_revision: Option<u64>,
+ pub title: Option<NonEmptyText>,
+ pub summary: Option<TextPatch<NonEmptyText>>,
+ pub tags: Option<BTreeSet<TagName>>,
+ pub parents: Option<Vec<VertexSelector>>,
+ pub archived: Option<bool>,
+ pub outcome: Option<ExperimentOutcomePatch>,
}
-impl MetricRankOrder {
- #[must_use]
- pub const fn as_str(self) -> &'static str {
- match self {
- Self::Asc => "asc",
- Self::Desc => "desc",
- }
- }
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct ExperimentOutcomePatch {
+ pub backend: ExecutionBackend,
+ pub command: CommandRecipe,
+ pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
+ pub primary_metric: MetricValue,
+ pub supporting_metrics: Vec<MetricValue>,
+ pub verdict: FrontierVerdict,
+ pub rationale: NonEmptyText,
+ pub analysis: Option<ExperimentAnalysis>,
}
#[derive(Clone, Debug)]
-pub struct MetricBestQuery {
- pub key: NonEmptyText,
- pub frontier_id: Option<fidget_spinner_core::FrontierId>,
- pub source: Option<MetricFieldSource>,
+pub struct CloseExperimentRequest {
+ pub experiment: String,
+ pub expected_revision: Option<u64>,
+ pub backend: ExecutionBackend,
+ pub command: CommandRecipe,
pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
- pub order: Option<MetricRankOrder>,
- pub limit: u32,
+ pub primary_metric: MetricValue,
+ pub supporting_metrics: Vec<MetricValue>,
+ pub verdict: FrontierVerdict,
+ pub rationale: NonEmptyText,
+ pub analysis: Option<ExperimentAnalysis>,
}
-#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub struct MetricKeySummary {
- pub key: NonEmptyText,
- pub source: MetricFieldSource,
- pub experiment_count: u64,
- pub unit: Option<MetricUnit>,
- pub objective: Option<OptimizationObjective>,
- pub description: Option<NonEmptyText>,
- pub requires_order: bool,
+#[derive(Clone, Debug, Default)]
+pub struct ListExperimentsQuery {
+ pub frontier: Option<String>,
+ pub hypothesis: Option<String>,
+ pub tags: BTreeSet<TagName>,
+ pub include_archived: bool,
+ pub status: Option<ExperimentStatus>,
+ pub limit: Option<u32>,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
-pub struct MetricBestEntry {
+pub struct MetricObservationSummary {
pub key: NonEmptyText,
- pub source: MetricFieldSource,
pub value: f64,
- pub order: MetricRankOrder,
- pub experiment_id: fidget_spinner_core::ExperimentId,
- pub experiment_title: NonEmptyText,
- pub frontier_id: fidget_spinner_core::FrontierId,
- pub hypothesis_node_id: fidget_spinner_core::NodeId,
- pub hypothesis_title: NonEmptyText,
- pub run_id: fidget_spinner_core::RunId,
- pub verdict: FrontierVerdict,
- pub unit: Option<MetricUnit>,
- pub objective: Option<OptimizationObjective>,
- pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
+ pub unit: MetricUnit,
+ pub objective: OptimizationObjective,
+}
+
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct ExperimentSummary {
+ pub id: ExperimentId,
+ pub slug: Slug,
+ pub frontier_id: FrontierId,
+ pub hypothesis_id: HypothesisId,
+ pub archived: bool,
+ pub title: NonEmptyText,
+ pub summary: Option<NonEmptyText>,
+ pub tags: Vec<TagName>,
+ pub status: ExperimentStatus,
+ pub verdict: Option<FrontierVerdict>,
+ pub primary_metric: Option<MetricObservationSummary>,
+ pub updated_at: OffsetDateTime,
+ pub closed_at: Option<OffsetDateTime>,
+}
+
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct ExperimentDetail {
+ pub record: ExperimentRecord,
+ pub owning_hypothesis: HypothesisSummary,
+ pub parents: Vec<VertexSummary>,
+ pub children: Vec<VertexSummary>,
+ pub artifacts: Vec<ArtifactSummary>,
+}
+
+#[derive(Clone, Debug)]
+pub struct CreateArtifactRequest {
+ pub slug: Option<Slug>,
+ pub kind: ArtifactKind,
+ pub label: NonEmptyText,
+ pub summary: Option<NonEmptyText>,
+ pub locator: NonEmptyText,
+ pub media_type: Option<NonEmptyText>,
+ pub attachments: Vec<AttachmentSelector>,
+}
+
+#[derive(Clone, Debug)]
+pub struct UpdateArtifactRequest {
+ pub artifact: String,
+ pub expected_revision: Option<u64>,
+ pub kind: Option<ArtifactKind>,
+ pub label: Option<NonEmptyText>,
+ pub summary: Option<TextPatch<NonEmptyText>>,
+ pub locator: Option<NonEmptyText>,
+ pub media_type: Option<TextPatch<NonEmptyText>>,
+ pub attachments: Option<Vec<AttachmentSelector>>,
}
#[derive(Clone, Debug, Default)]
-pub struct MetricKeyQuery {
- pub frontier_id: Option<fidget_spinner_core::FrontierId>,
- pub source: Option<MetricFieldSource>,
- pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
+pub struct ListArtifactsQuery {
+ pub frontier: Option<String>,
+ pub kind: Option<ArtifactKind>,
+ pub attached_to: Option<AttachmentSelector>,
+ pub limit: Option<u32>,
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct ArtifactSummary {
+ pub id: ArtifactId,
+ pub slug: Slug,
+ pub kind: ArtifactKind,
+ pub label: NonEmptyText,
+ pub summary: Option<NonEmptyText>,
+ pub locator: NonEmptyText,
+ pub media_type: Option<NonEmptyText>,
+ pub updated_at: OffsetDateTime,
+}
+
+#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
+pub struct ArtifactDetail {
+ pub record: ArtifactRecord,
+ pub attachments: Vec<AttachmentTargetRef>,
}
#[derive(Clone, Debug)]
@@ -314,6 +420,7 @@ pub struct DefineMetricRequest {
pub key: NonEmptyText,
pub unit: MetricUnit,
pub objective: OptimizationObjective,
+ pub visibility: MetricVisibility,
pub description: Option<NonEmptyText>,
}
@@ -325,154 +432,131 @@ pub struct DefineRunDimensionRequest {
}
#[derive(Clone, Debug)]
-pub struct UpsertSchemaFieldRequest {
- pub name: NonEmptyText,
- pub node_classes: BTreeSet<NodeClass>,
- pub presence: FieldPresence,
- pub severity: DiagnosticSeverity,
- pub role: FieldRole,
- pub inference_policy: InferencePolicy,
- pub value_type: Option<FieldValueType>,
-}
-
-#[derive(Clone, Debug)]
-pub struct RemoveSchemaFieldRequest {
- pub name: NonEmptyText,
- pub node_classes: Option<BTreeSet<NodeClass>>,
+pub struct MetricKeysQuery {
+ pub frontier: Option<String>,
+ pub scope: MetricScope,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub struct RunDimensionSummary {
+pub struct MetricKeySummary {
pub key: NonEmptyText,
- pub value_type: FieldValueType,
+ pub unit: MetricUnit,
+ pub objective: OptimizationObjective,
+ pub visibility: MetricVisibility,
pub description: Option<NonEmptyText>,
- pub observed_run_count: u64,
- pub distinct_value_count: u64,
- pub sample_values: Vec<Value>,
-}
-
-#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
-pub struct MetricPlaneMigrationReport {
- pub inserted_metric_definitions: u64,
- pub inserted_dimension_definitions: u64,
- pub inserted_dimension_values: u64,
+ pub reference_count: u64,
}
#[derive(Clone, Debug)]
-pub struct CreateFrontierRequest {
- pub label: NonEmptyText,
- pub contract_title: NonEmptyText,
- pub contract_summary: Option<NonEmptyText>,
- pub contract: FrontierContract,
+pub struct MetricBestQuery {
+ pub frontier: Option<String>,
+ pub hypothesis: Option<String>,
+ pub key: NonEmptyText,
+ pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
+ pub include_rejected: bool,
+ pub limit: Option<u32>,
+ pub order: Option<MetricRankOrder>,
}
-#[derive(Clone, Debug)]
-pub struct OpenExperimentRequest {
- pub frontier_id: fidget_spinner_core::FrontierId,
- pub hypothesis_node_id: fidget_spinner_core::NodeId,
- pub title: NonEmptyText,
- pub summary: Option<NonEmptyText>,
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct MetricBestEntry {
+ pub experiment: ExperimentSummary,
+ pub hypothesis: HypothesisSummary,
+ pub value: f64,
+ pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
-pub struct OpenExperimentSummary {
- pub id: fidget_spinner_core::ExperimentId,
- pub frontier_id: fidget_spinner_core::FrontierId,
- pub hypothesis_node_id: fidget_spinner_core::NodeId,
- pub title: NonEmptyText,
- pub summary: Option<NonEmptyText>,
- pub created_at: OffsetDateTime,
+pub struct EntityHistoryEntry {
+ pub revision: u64,
+ pub event_kind: NonEmptyText,
+ pub occurred_at: OffsetDateTime,
+ pub snapshot: Value,
}
-#[derive(Clone, Debug)]
-pub struct ExperimentAnalysisDraft {
- pub title: NonEmptyText,
- pub summary: NonEmptyText,
- pub body: NonEmptyText,
-}
-
-#[derive(Clone, Debug)]
-pub struct CloseExperimentRequest {
- pub experiment_id: fidget_spinner_core::ExperimentId,
- pub run_title: NonEmptyText,
- pub run_summary: Option<NonEmptyText>,
- pub backend: ExecutionBackend,
- pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
- pub command: CommandRecipe,
- pub primary_metric: MetricValue,
- pub supporting_metrics: Vec<MetricValue>,
- pub note: FrontierNote,
- pub verdict: FrontierVerdict,
- pub analysis: Option<ExperimentAnalysisDraft>,
- pub decision_title: NonEmptyText,
- pub decision_rationale: NonEmptyText,
+#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
+pub struct HypothesisCurrentState {
+ pub hypothesis: HypothesisSummary,
+ pub open_experiments: Vec<ExperimentSummary>,
+ pub latest_closed_experiment: Option<ExperimentSummary>,
}
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
-pub struct ExperimentReceipt {
- pub open_experiment: OpenExperiment,
- pub run_node: DagNode,
- pub run: RunRecord,
- pub analysis_node: Option<DagNode>,
- pub decision_node: DagNode,
- pub experiment: CompletedExperiment,
+pub struct FrontierOpenProjection {
+ pub frontier: FrontierRecord,
+ pub active_tags: Vec<TagName>,
+ pub active_metric_keys: Vec<MetricKeySummary>,
+ pub active_hypotheses: Vec<HypothesisCurrentState>,
+ pub open_experiments: Vec<ExperimentSummary>,
}
pub struct ProjectStore {
project_root: Utf8PathBuf,
state_root: Utf8PathBuf,
- connection: Connection,
config: ProjectConfig,
- schema: ProjectSchema,
+ connection: Connection,
}
impl ProjectStore {
pub fn init(
project_root: impl AsRef<Utf8Path>,
display_name: NonEmptyText,
- schema_namespace: NonEmptyText,
) -> Result<Self, StoreError> {
let project_root = project_root.as_ref().to_path_buf();
+ fs::create_dir_all(project_root.as_std_path())?;
let state_root = state_root(&project_root);
- fs::create_dir_all(state_root.join("blobs"))?;
+ fs::create_dir_all(state_root.as_std_path())?;
let config = ProjectConfig::new(display_name);
write_json_file(&state_root.join(PROJECT_CONFIG_NAME), &config)?;
- let schema = ProjectSchema::default_with_namespace(schema_namespace);
- write_json_file(&state_root.join(PROJECT_SCHEMA_NAME), &schema)?;
- let mut connection = Connection::open(state_root.join(STATE_DB_NAME).as_std_path())?;
- upgrade_store(&mut connection)?;
+ let database_path = state_root.join(STATE_DB_NAME);
+ let connection = Connection::open(database_path.as_std_path())?;
+ connection.pragma_update(None, "foreign_keys", 1_i64)?;
+ connection.pragma_update(
+ None,
+ "user_version",
+ i64::from(CURRENT_STORE_FORMAT_VERSION),
+ )?;
+ install_schema(&connection)?;
Ok(Self {
project_root,
state_root,
- connection,
config,
- schema,
+ connection,
})
}
pub fn open(project_root: impl AsRef<Utf8Path>) -> Result<Self, StoreError> {
- let requested_root = project_root.as_ref().to_path_buf();
- let project_root = discover_project_root(&requested_root)
- .ok_or(StoreError::MissingProjectStore(requested_root))?;
+ let project_root = project_root.as_ref().to_path_buf();
let state_root = state_root(&project_root);
- let config = read_json_file::<ProjectConfig>(&state_root.join(PROJECT_CONFIG_NAME))?;
+ if !state_root.exists() {
+ return Err(StoreError::MissingProjectStore(project_root));
+ }
+ let config: ProjectConfig = read_json_file(&state_root.join(PROJECT_CONFIG_NAME))?;
if config.store_format_version != CURRENT_STORE_FORMAT_VERSION {
return Err(StoreError::IncompatibleStoreFormatVersion {
observed: config.store_format_version,
expected: CURRENT_STORE_FORMAT_VERSION,
});
}
- let schema = read_json_file::<ProjectSchema>(&state_root.join(PROJECT_SCHEMA_NAME))?;
- let mut connection = Connection::open(state_root.join(STATE_DB_NAME).as_std_path())?;
- upgrade_store(&mut connection)?;
+ let database_path = state_root.join(STATE_DB_NAME);
+ let connection = Connection::open(database_path.as_std_path())?;
+ connection.pragma_update(None, "foreign_keys", 1_i64)?;
+ let observed_version: i64 =
+ connection.pragma_query_value(None, "user_version", |row| row.get(0))?;
+ if u32::try_from(observed_version).ok() != Some(CURRENT_STORE_FORMAT_VERSION) {
+ return Err(StoreError::IncompatibleStoreFormatVersion {
+ observed: u32::try_from(observed_version).unwrap_or(0),
+ expected: CURRENT_STORE_FORMAT_VERSION,
+ });
+ }
+
Ok(Self {
project_root,
state_root,
- connection,
config,
- schema,
+ connection,
})
}
@@ -482,64 +566,6 @@ impl ProjectStore {
}
#[must_use]
- pub fn schema(&self) -> &ProjectSchema {
- &self.schema
- }
-
- pub fn upsert_schema_field(
- &mut self,
- request: UpsertSchemaFieldRequest,
- ) -> Result<ProjectFieldSpec, StoreError> {
- let field = ProjectFieldSpec {
- name: request.name,
- node_classes: request.node_classes,
- presence: request.presence,
- severity: request.severity,
- role: request.role,
- inference_policy: request.inference_policy,
- value_type: request.value_type,
- };
- if let Some(existing) = self.schema.fields.iter_mut().find(|existing| {
- existing.name == field.name && existing.node_classes == field.node_classes
- }) {
- if *existing == field {
- return Ok(field);
- }
- *existing = field.clone();
- } else {
- self.schema.fields.push(field.clone());
- }
- sort_schema_fields(&mut self.schema.fields);
- self.bump_schema_version();
- self.save_schema()?;
- Ok(field)
- }
-
- pub fn remove_schema_field(
- &mut self,
- request: RemoveSchemaFieldRequest,
- ) -> Result<u64, StoreError> {
- let before = self.schema.fields.len();
- self.schema.fields.retain(|field| {
- field.name != request.name
- || request
- .node_classes
- .as_ref()
- .is_some_and(|node_classes| field.node_classes != *node_classes)
- });
- let removed = before.saturating_sub(self.schema.fields.len()) as u64;
- if removed == 0 {
- return Err(StoreError::SchemaFieldNotFound(
- request.name.as_str().to_owned(),
- ));
- }
- sort_schema_fields(&mut self.schema.fields);
- self.bump_schema_version();
- self.save_schema()?;
- Ok(removed)
- }
-
- #[must_use]
pub fn project_root(&self) -> &Utf8Path {
&self.project_root
}
@@ -549,3634 +575,2745 @@ impl ProjectStore {
&self.state_root
}
- fn bump_schema_version(&mut self) {
- self.schema.version = self.schema.version.saturating_add(1);
- }
-
- fn save_schema(&self) -> Result<(), StoreError> {
- write_json_file(&self.state_root.join(PROJECT_SCHEMA_NAME), &self.schema)
+ pub fn status(&self) -> Result<ProjectStatus, StoreError> {
+ Ok(ProjectStatus {
+ project_root: self.project_root.clone(),
+ display_name: self.config.display_name.clone(),
+ store_format_version: self.config.store_format_version,
+ frontier_count: count_rows(&self.connection, "frontiers")?,
+ hypothesis_count: count_rows(&self.connection, "hypotheses")?,
+ experiment_count: count_rows(&self.connection, "experiments")?,
+ open_experiment_count: count_rows_where(
+ &self.connection,
+ "experiments",
+ "status = 'open'",
+ )?,
+ artifact_count: count_rows(&self.connection, "artifacts")?,
+ })
}
- pub fn create_frontier(
+ pub fn register_tag(
&mut self,
- request: CreateFrontierRequest,
- ) -> Result<FrontierProjection, StoreError> {
- let frontier_id = fidget_spinner_core::FrontierId::fresh();
- let payload = NodePayload::with_schema(
- self.schema.schema_ref(),
- frontier_contract_payload(&request.contract)?,
- );
- let diagnostics = self.schema.validate_node(NodeClass::Contract, &payload);
- let contract_node = DagNode::new(
- NodeClass::Contract,
- Some(frontier_id),
- request.contract_title,
- request.contract_summary,
- payload,
- diagnostics,
- );
- let frontier = FrontierRecord::with_id(frontier_id, request.label, contract_node.id);
-
- let tx = self.connection.transaction()?;
- let _ = upsert_metric_definition_tx(
- &tx,
- &MetricDefinition::new(
- request
- .contract
- .evaluation
- .primary_metric
- .metric_key
- .clone(),
- request.contract.evaluation.primary_metric.unit,
- request.contract.evaluation.primary_metric.objective,
- None,
- ),
- )?;
- for metric in &request.contract.evaluation.supporting_metrics {
- let _ = upsert_metric_definition_tx(
- &tx,
- &MetricDefinition::new(
- metric.metric_key.clone(),
- metric.unit,
- metric.objective,
- None,
- ),
- )?;
+ name: TagName,
+ description: NonEmptyText,
+ ) -> Result<TagRecord, StoreError> {
+ if self
+ .connection
+ .query_row(
+ "SELECT 1 FROM tags WHERE name = ?1",
+ params![name.as_str()],
+ |_| Ok(()),
+ )
+ .optional()?
+ .is_some()
+ {
+ return Err(StoreError::DuplicateTag(name));
}
- insert_node(&tx, &contract_node)?;
- insert_frontier(&tx, &frontier)?;
- insert_event(
- &tx,
- "frontier",
- &frontier.id.to_string(),
- "frontier.created",
- json!({"root_contract_node_id": contract_node.id}),
+ let created_at = OffsetDateTime::now_utc();
+ let _ = self.connection.execute(
+ "INSERT INTO tags (name, description, created_at) VALUES (?1, ?2, ?3)",
+ params![
+ name.as_str(),
+ description.as_str(),
+ encode_timestamp(created_at)?
+ ],
)?;
- tx.commit()?;
+ Ok(TagRecord {
+ name,
+ description,
+ created_at,
+ })
+ }
- self.frontier_projection(frontier.id)
+ pub fn list_tags(&self) -> Result<Vec<TagRecord>, StoreError> {
+ let mut statement = self
+ .connection
+ .prepare("SELECT name, description, created_at FROM tags ORDER BY name ASC")?;
+ let rows = statement.query_map([], |row| {
+ Ok(TagRecord {
+ name: parse_tag_name(&row.get::<_, String>(0)?)?,
+ description: parse_non_empty_text(&row.get::<_, String>(1)?)?,
+ created_at: parse_timestamp_sql(&row.get::<_, String>(2)?)?,
+ })
+ })?;
+ rows.collect::<Result<Vec<_>, _>>()
+ .map_err(StoreError::from)
}
pub fn define_metric(
&mut self,
request: DefineMetricRequest,
) -> Result<MetricDefinition, StoreError> {
+ if self.metric_definition(&request.key)?.is_some() {
+ return Err(StoreError::DuplicateMetricDefinition(request.key));
+ }
let record = MetricDefinition::new(
request.key,
request.unit,
request.objective,
+ request.visibility,
request.description,
);
- let tx = self.connection.transaction()?;
- let _ = upsert_metric_definition_tx(&tx, &record)?;
- tx.commit()?;
+ let _ = self.connection.execute(
+ "INSERT INTO metric_definitions (key, unit, objective, visibility, description, created_at, updated_at)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
+ params![
+ record.key.as_str(),
+ record.unit.as_str(),
+ record.objective.as_str(),
+ record.visibility.as_str(),
+ record.description.as_ref().map(NonEmptyText::as_str),
+ encode_timestamp(record.created_at)?,
+ encode_timestamp(record.updated_at)?,
+ ],
+ )?;
Ok(record)
}
pub fn list_metric_definitions(&self) -> Result<Vec<MetricDefinition>, StoreError> {
let mut statement = self.connection.prepare(
- "SELECT metric_key, unit, objective, description, created_at
+ "SELECT key, unit, objective, visibility, description, created_at, updated_at
FROM metric_definitions
- ORDER BY metric_key ASC",
+ ORDER BY key ASC",
)?;
- let mut rows = statement.query([])?;
- let mut items = Vec::new();
- while let Some(row) = rows.next()? {
- items.push(MetricDefinition {
- key: NonEmptyText::new(row.get::<_, String>(0)?)?,
- unit: decode_metric_unit(&row.get::<_, String>(1)?)?,
- objective: decode_optimization_objective(&row.get::<_, String>(2)?)?,
- description: row
- .get::<_, Option<String>>(3)?
- .map(NonEmptyText::new)
- .transpose()?,
- created_at: decode_timestamp(&row.get::<_, String>(4)?)?,
- });
- }
- Ok(items)
+ let rows = statement.query_map([], decode_metric_definition_row)?;
+ rows.collect::<Result<Vec<_>, _>>()
+ .map_err(StoreError::from)
}
pub fn define_run_dimension(
&mut self,
request: DefineRunDimensionRequest,
) -> Result<RunDimensionDefinition, StoreError> {
+ if self.run_dimension_definition(&request.key)?.is_some() {
+ return Err(StoreError::DuplicateRunDimension(request.key));
+ }
let record =
RunDimensionDefinition::new(request.key, request.value_type, request.description);
- let tx = self.connection.transaction()?;
- let _ = insert_run_dimension_definition_tx(&tx, &record)?;
- tx.commit()?;
+ let _ = self.connection.execute(
+ "INSERT INTO run_dimension_definitions (key, value_type, description, created_at, updated_at)
+ VALUES (?1, ?2, ?3, ?4, ?5)",
+ params![
+ record.key.as_str(),
+ record.value_type.as_str(),
+ record.description.as_ref().map(NonEmptyText::as_str),
+ encode_timestamp(record.created_at)?,
+ encode_timestamp(record.updated_at)?,
+ ],
+ )?;
Ok(record)
}
- pub fn list_run_dimensions(&self) -> Result<Vec<RunDimensionSummary>, StoreError> {
- load_run_dimension_summaries(self)
- }
-
- pub fn coerce_run_dimensions(
- &self,
- raw_dimensions: BTreeMap<String, Value>,
- ) -> Result<BTreeMap<NonEmptyText, RunDimensionValue>, StoreError> {
- coerce_run_dimension_map(&run_dimension_definitions_by_key(self)?, raw_dimensions)
- }
-
- pub fn migrate_metric_plane(&mut self) -> Result<MetricPlaneMigrationReport, StoreError> {
- let tx = self.connection.transaction()?;
- let report = normalize_metric_plane_tx(&tx)?;
- tx.commit()?;
- Ok(report)
+ pub fn list_run_dimensions(&self) -> Result<Vec<RunDimensionDefinition>, StoreError> {
+ let mut statement = self.connection.prepare(
+ "SELECT key, value_type, description, created_at, updated_at
+ FROM run_dimension_definitions
+ ORDER BY key ASC",
+ )?;
+ let rows = statement.query_map([], decode_run_dimension_definition_row)?;
+ rows.collect::<Result<Vec<_>, _>>()
+ .map_err(StoreError::from)
}
- pub fn add_tag(
+ pub fn create_frontier(
&mut self,
- name: TagName,
- description: NonEmptyText,
- ) -> Result<TagRecord, StoreError> {
- let record = TagRecord {
- name,
- description,
- created_at: OffsetDateTime::now_utc(),
+ request: CreateFrontierRequest,
+ ) -> Result<FrontierRecord, StoreError> {
+ let id = FrontierId::fresh();
+ let slug = self.unique_frontier_slug(request.slug, &request.label)?;
+ let now = OffsetDateTime::now_utc();
+ let record = FrontierRecord {
+ id,
+ slug,
+ label: request.label,
+ objective: request.objective,
+ status: FrontierStatus::Exploring,
+ brief: FrontierBrief::default(),
+ revision: 1,
+ created_at: now,
+ updated_at: now,
};
- let tx = self.connection.transaction()?;
- insert_tag(&tx, &record)?;
- insert_event(
- &tx,
- "tag",
- record.name.as_str(),
- "tag.created",
- json!({"description": record.description.as_str()}),
+ let transaction = self.connection.transaction()?;
+ insert_frontier(&transaction, &record)?;
+ record_event(
+ &transaction,
+ "frontier",
+ &record.id.to_string(),
+ 1,
+ "created",
+ &record,
)?;
- tx.commit()?;
+ transaction.commit()?;
Ok(record)
}
- pub fn list_tags(&self) -> Result<Vec<TagRecord>, StoreError> {
+ pub fn list_frontiers(&self) -> Result<Vec<FrontierSummary>, StoreError> {
let mut statement = self.connection.prepare(
- "SELECT name, description, created_at
- FROM tags
- ORDER BY name ASC",
+ "SELECT id, slug, label, objective, status, brief_json, revision, created_at, updated_at
+ FROM frontiers
+ ORDER BY updated_at DESC, created_at DESC",
)?;
- let mut rows = statement.query([])?;
- let mut items = Vec::new();
- while let Some(row) = rows.next()? {
- items.push(TagRecord {
- name: TagName::new(row.get::<_, String>(0)?)?,
- description: NonEmptyText::new(row.get::<_, String>(1)?)?,
- created_at: decode_timestamp(&row.get::<_, String>(2)?)?,
- });
- }
- Ok(items)
+ let rows = statement.query_map([], decode_frontier_row)?;
+ rows.collect::<Result<Vec<_>, _>>()
+ .map_err(StoreError::from)?
+ .into_iter()
+ .map(|record| {
+ Ok(FrontierSummary {
+ active_hypothesis_count: self.active_hypothesis_count(record.id)?,
+ open_experiment_count: self.open_experiment_count(Some(record.id))?,
+ id: record.id,
+ slug: record.slug,
+ label: record.label,
+ objective: record.objective,
+ status: record.status,
+ updated_at: record.updated_at,
+ })
+ })
+ .collect()
}
- pub fn add_node(&mut self, request: CreateNodeRequest) -> Result<DagNode, StoreError> {
- validate_prose_node_request(&request)?;
- let diagnostics = self.schema.validate_node(request.class, &request.payload);
- let mut node = DagNode::new(
- request.class,
- request.frontier_id,
- request.title,
- request.summary,
- request.payload,
- diagnostics,
- );
- node.tags = match (request.class, request.tags) {
- (NodeClass::Note, Some(tags)) => tags,
- (NodeClass::Note, None) => return Err(StoreError::NoteTagsRequired),
- (_, Some(tags)) => tags,
- (_, None) => BTreeSet::new(),
- };
- node.annotations = request.annotations;
+ pub fn read_frontier(&self, selector: &str) -> Result<FrontierRecord, StoreError> {
+ self.resolve_frontier(selector)
+ }
- let tx = self.connection.transaction()?;
- ensure_known_tags(&tx, &node.tags)?;
- insert_node(&tx, &node)?;
- for attachment in &request.attachments {
- insert_edge(&tx, &attachment.materialize(node.id))?;
- }
- insert_event(
- &tx,
- "node",
- &node.id.to_string(),
- "node.created",
- json!({"class": node.class.as_str(), "frontier_id": node.frontier_id}),
+ pub fn update_frontier_brief(
+ &mut self,
+ request: UpdateFrontierBriefRequest,
+ ) -> Result<FrontierRecord, StoreError> {
+ let frontier = self.resolve_frontier(&request.frontier)?;
+ enforce_revision(
+ "frontier",
+ &request.frontier,
+ request.expected_revision,
+ frontier.revision,
+ )?;
+ let now = OffsetDateTime::now_utc();
+ let brief = FrontierBrief {
+ situation: apply_optional_text_patch(
+ request.situation,
+ frontier.brief.situation.clone(),
+ ),
+ roadmap: match request.roadmap {
+ Some(items) => items
+ .into_iter()
+ .map(|item| {
+ Ok(FrontierRoadmapItem {
+ rank: item.rank,
+ hypothesis_id: self.resolve_hypothesis(&item.hypothesis)?.id,
+ summary: item.summary,
+ })
+ })
+ .collect::<Result<Vec<_>, StoreError>>()?,
+ None => frontier.brief.roadmap.clone(),
+ },
+ unknowns: request.unknowns.unwrap_or(frontier.brief.unknowns.clone()),
+ revision: frontier.brief.revision.saturating_add(1),
+ updated_at: Some(now),
+ };
+ let updated = FrontierRecord {
+ brief,
+ revision: frontier.revision.saturating_add(1),
+ updated_at: now,
+ ..frontier
+ };
+ let transaction = self.connection.transaction()?;
+ update_frontier(&transaction, &updated)?;
+ record_event(
+ &transaction,
+ "frontier",
+ &updated.id.to_string(),
+ updated.revision,
+ "brief_updated",
+ &updated,
)?;
- tx.commit()?;
- Ok(node)
+ transaction.commit()?;
+ Ok(updated)
}
- pub fn list_metric_keys(&self) -> Result<Vec<MetricKeySummary>, StoreError> {
- self.list_metric_keys_filtered(MetricKeyQuery::default())
+ pub fn create_hypothesis(
+ &mut self,
+ request: CreateHypothesisRequest,
+ ) -> Result<HypothesisRecord, StoreError> {
+ validate_hypothesis_body(&request.body)?;
+ self.assert_known_tags(&request.tags)?;
+ let frontier = self.resolve_frontier(&request.frontier)?;
+ let id = HypothesisId::fresh();
+ let slug = self.unique_hypothesis_slug(request.slug, &request.title)?;
+ let now = OffsetDateTime::now_utc();
+ let record = HypothesisRecord {
+ id,
+ slug,
+ frontier_id: frontier.id,
+ archived: false,
+ title: request.title,
+ summary: request.summary,
+ body: request.body,
+ tags: request.tags.iter().cloned().collect(),
+ revision: 1,
+ created_at: now,
+ updated_at: now,
+ };
+ let parents = self.resolve_vertex_parents(
+ frontier.id,
+ &request.parents,
+ Some(VertexRef::Hypothesis(id)),
+ )?;
+ let transaction = self.connection.transaction()?;
+ insert_hypothesis(&transaction, &record)?;
+ replace_hypothesis_tags(&transaction, record.id, &request.tags)?;
+ replace_influence_parents(&transaction, VertexRef::Hypothesis(id), &parents)?;
+ record_event(
+ &transaction,
+ "hypothesis",
+ &record.id.to_string(),
+ 1,
+ "created",
+ &record,
+ )?;
+ transaction.commit()?;
+ Ok(record)
}
- pub fn list_metric_keys_filtered(
+ pub fn list_hypotheses(
&self,
- query: MetricKeyQuery,
- ) -> Result<Vec<MetricKeySummary>, StoreError> {
- let mut summaries = collect_metric_samples(self, &query)?
+ query: ListHypothesesQuery,
+ ) -> Result<Vec<HypothesisSummary>, StoreError> {
+ let frontier_id = query
+ .frontier
+ .as_deref()
+ .map(|selector| self.resolve_frontier(selector).map(|frontier| frontier.id))
+ .transpose()?;
+ let records = self.load_hypothesis_records(frontier_id, query.include_archived)?;
+ let filtered = records
.into_iter()
- .fold(
- BTreeMap::<(MetricFieldSource, String), MetricKeyAccumulator>::new(),
- |mut accumulators, sample| {
- let key = (sample.source, sample.key.as_str().to_owned());
- let _ = accumulators
- .entry(key)
- .and_modify(|entry| entry.observe(&sample))
- .or_insert_with(|| MetricKeyAccumulator::from_sample(&sample));
- accumulators
- },
- )
- .into_values()
- .map(MetricKeyAccumulator::finish)
- .collect::<Vec<_>>();
- if query
- .source
- .is_none_or(|source| source == MetricFieldSource::RunMetric)
- {
- merge_registered_run_metric_summaries(self, &mut summaries)?;
- }
- summaries.sort_by(|left, right| {
- left.key
- .cmp(&right.key)
- .then(left.source.cmp(&right.source))
- });
- Ok(summaries)
+ .filter(|record| {
+ query.tags.is_empty() || query.tags.iter().all(|tag| record.tags.contains(tag))
+ })
+ .map(|record| self.hypothesis_summary_from_record(record))
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(apply_limit(filtered, query.limit))
}
- pub fn best_metrics(&self, query: MetricBestQuery) -> Result<Vec<MetricBestEntry>, StoreError> {
- let matching = collect_metric_samples(
- self,
- &MetricKeyQuery {
- frontier_id: query.frontier_id,
- source: query.source,
- dimensions: query.dimensions.clone(),
- },
- )?
- .into_iter()
- .filter(|sample| sample.key == query.key)
- .collect::<Vec<_>>();
- if matching.is_empty() {
- return Ok(Vec::new());
- }
-
- let source = if let Some(source) = query.source {
- source
- } else {
- let sources = matching
- .iter()
- .map(|sample| sample.source)
- .collect::<BTreeSet<_>>();
- if sources.len() != 1 {
- return Err(StoreError::AmbiguousMetricKey {
- key: query.key.as_str().to_owned(),
- sources: sources
- .into_iter()
- .map(MetricFieldSource::as_str)
- .collect::<Vec<_>>()
- .join(", "),
- });
- }
- let Some(source) = sources.iter().copied().next() else {
- return Ok(Vec::new());
- };
- source
- };
-
- let mut matching = matching
- .into_iter()
- .filter(|sample| sample.source == source)
- .collect::<Vec<_>>();
- if matching.is_empty() {
- return Ok(Vec::new());
- }
-
- let order = resolve_metric_order(&matching, &query, source)?;
- matching.sort_by(|left, right| compare_metric_samples(left, right, order));
- matching.truncate(query.limit as usize);
- Ok(matching
+ pub fn read_hypothesis(&self, selector: &str) -> Result<HypothesisDetail, StoreError> {
+ let record = self.resolve_hypothesis(selector)?;
+ let parents = self.load_vertex_parents(VertexRef::Hypothesis(record.id))?;
+ let children = self.load_vertex_children(VertexRef::Hypothesis(record.id))?;
+ let experiments = self.list_experiments(ListExperimentsQuery {
+ hypothesis: Some(record.id.to_string()),
+ include_archived: true,
+ limit: None,
+ ..ListExperimentsQuery::default()
+ })?;
+ let (open_experiments, closed_experiments): (Vec<_>, Vec<_>) = experiments
.into_iter()
- .map(|sample| sample.into_entry(order))
- .collect())
+ .partition(|experiment| experiment.status == ExperimentStatus::Open);
+ Ok(HypothesisDetail {
+ artifacts: self.list_artifacts(ListArtifactsQuery {
+ attached_to: Some(AttachmentSelector::Hypothesis(record.id.to_string())),
+ limit: None,
+ ..ListArtifactsQuery::default()
+ })?,
+ children,
+ closed_experiments,
+ open_experiments,
+ parents,
+ record,
+ })
}
- pub fn archive_node(&mut self, node_id: fidget_spinner_core::NodeId) -> Result<(), StoreError> {
- let updated_at = encode_timestamp(OffsetDateTime::now_utc())?;
- let changed = self.connection.execute(
- "UPDATE nodes SET archived = 1, updated_at = ?1 WHERE id = ?2",
- params![updated_at, node_id.to_string()],
+ pub fn update_hypothesis(
+ &mut self,
+ request: UpdateHypothesisRequest,
+ ) -> Result<HypothesisRecord, StoreError> {
+ let record = self.resolve_hypothesis(&request.hypothesis)?;
+ enforce_revision(
+ "hypothesis",
+ &request.hypothesis,
+ request.expected_revision,
+ record.revision,
)?;
- if changed == 0 {
- return Err(StoreError::NodeNotFound(node_id));
+ if let Some(body) = request.body.as_ref() {
+ validate_hypothesis_body(body)?;
}
- Ok(())
+ if let Some(tags) = request.tags.as_ref() {
+ self.assert_known_tags(tags)?;
+ }
+ let updated = HypothesisRecord {
+ title: request.title.unwrap_or(record.title.clone()),
+ summary: request.summary.unwrap_or(record.summary.clone()),
+ body: request.body.unwrap_or(record.body.clone()),
+ tags: request
+ .tags
+ .clone()
+ .map_or_else(|| record.tags.clone(), |tags| tags.into_iter().collect()),
+ archived: request.archived.unwrap_or(record.archived),
+ revision: record.revision.saturating_add(1),
+ updated_at: OffsetDateTime::now_utc(),
+ ..record
+ };
+ let parents = request
+ .parents
+ .as_ref()
+ .map(|selectors| {
+ self.resolve_vertex_parents(
+ updated.frontier_id,
+ selectors,
+ Some(VertexRef::Hypothesis(updated.id)),
+ )
+ })
+ .transpose()?;
+ let transaction = self.connection.transaction()?;
+ update_hypothesis_row(&transaction, &updated)?;
+ replace_hypothesis_tags(
+ &transaction,
+ updated.id,
+ &updated.tags.iter().cloned().collect::<BTreeSet<_>>(),
+ )?;
+ if let Some(parents) = parents.as_ref() {
+ replace_influence_parents(&transaction, VertexRef::Hypothesis(updated.id), parents)?;
+ }
+ record_event(
+ &transaction,
+ "hypothesis",
+ &updated.id.to_string(),
+ updated.revision,
+ "updated",
+ &updated,
+ )?;
+ transaction.commit()?;
+ Ok(updated)
}
- pub fn annotate_node(
+ pub fn open_experiment(
&mut self,
- node_id: fidget_spinner_core::NodeId,
- annotation: NodeAnnotation,
- ) -> Result<(), StoreError> {
- let tx = self.connection.transaction()?;
- let exists = tx
- .query_row(
- "SELECT 1 FROM nodes WHERE id = ?1",
- params![node_id.to_string()],
- |row| row.get::<_, i64>(0),
- )
- .optional()?;
- if exists.is_none() {
- return Err(StoreError::NodeNotFound(node_id));
- }
- insert_annotation(&tx, node_id, &annotation)?;
- let _ = tx.execute(
- "UPDATE nodes SET updated_at = ?1 WHERE id = ?2",
- params![
- encode_timestamp(OffsetDateTime::now_utc())?,
- node_id.to_string()
- ],
+ request: OpenExperimentRequest,
+ ) -> Result<ExperimentRecord, StoreError> {
+ self.assert_known_tags(&request.tags)?;
+ let hypothesis = self.resolve_hypothesis(&request.hypothesis)?;
+ let id = ExperimentId::fresh();
+ let slug = self.unique_experiment_slug(request.slug, &request.title)?;
+ let now = OffsetDateTime::now_utc();
+ let record = ExperimentRecord {
+ id,
+ slug,
+ frontier_id: hypothesis.frontier_id,
+ hypothesis_id: hypothesis.id,
+ archived: false,
+ title: request.title,
+ summary: request.summary,
+ tags: request.tags.iter().cloned().collect(),
+ status: ExperimentStatus::Open,
+ outcome: None,
+ revision: 1,
+ created_at: now,
+ updated_at: now,
+ };
+ let parents = self.resolve_vertex_parents(
+ hypothesis.frontier_id,
+ &request.parents,
+ Some(VertexRef::Experiment(id)),
)?;
- insert_event(
- &tx,
- "node",
- &node_id.to_string(),
- "node.annotated",
- json!({"visibility": format!("{:?}", annotation.visibility)}),
+ let transaction = self.connection.transaction()?;
+ insert_experiment(&transaction, &record)?;
+ replace_experiment_tags(&transaction, record.id, &request.tags)?;
+ replace_influence_parents(&transaction, VertexRef::Experiment(id), &parents)?;
+ record_event(
+ &transaction,
+ "experiment",
+ &record.id.to_string(),
+ 1,
+ "opened",
+ &record,
)?;
- tx.commit()?;
- Ok(())
+ transaction.commit()?;
+ Ok(record)
}
- pub fn get_node(
+ pub fn list_experiments(
&self,
- node_id: fidget_spinner_core::NodeId,
- ) -> Result<Option<DagNode>, StoreError> {
- let mut statement = self.connection.prepare(
- "SELECT
- id,
- class,
- track,
- frontier_id,
- archived,
- title,
- summary,
- payload_schema_namespace,
- payload_schema_version,
- payload_json,
- diagnostics_json,
- agent_session_id,
- created_at,
- updated_at
- FROM nodes
- WHERE id = ?1",
- )?;
- let node = statement
- .query_row(params![node_id.to_string()], read_node_row)
- .optional()?;
- node.map(|mut item| {
- item.tags = self.load_tags(item.id)?;
- item.annotations = self.load_annotations(item.id)?;
- Ok(item)
+ query: ListExperimentsQuery,
+ ) -> Result<Vec<ExperimentSummary>, StoreError> {
+ let frontier_id = query
+ .frontier
+ .as_deref()
+ .map(|selector| self.resolve_frontier(selector).map(|frontier| frontier.id))
+ .transpose()?;
+ let hypothesis_id = query
+ .hypothesis
+ .as_deref()
+ .map(|selector| {
+ self.resolve_hypothesis(selector)
+ .map(|hypothesis| hypothesis.id)
+ })
+ .transpose()?;
+ let records =
+ self.load_experiment_records(frontier_id, hypothesis_id, query.include_archived)?;
+ let filtered = records
+ .into_iter()
+ .filter(|record| query.status.is_none_or(|status| record.status == status))
+ .filter(|record| {
+ query.tags.is_empty() || query.tags.iter().all(|tag| record.tags.contains(tag))
+ })
+ .map(|record| self.experiment_summary_from_record(record))
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(apply_limit(filtered, query.limit))
+ }
+
+ pub fn read_experiment(&self, selector: &str) -> Result<ExperimentDetail, StoreError> {
+ let record = self.resolve_experiment(selector)?;
+ Ok(ExperimentDetail {
+ artifacts: self.list_artifacts(ListArtifactsQuery {
+ attached_to: Some(AttachmentSelector::Experiment(record.id.to_string())),
+ limit: None,
+ ..ListArtifactsQuery::default()
+ })?,
+ children: self.load_vertex_children(VertexRef::Experiment(record.id))?,
+ owning_hypothesis: self
+ .hypothesis_summary_from_record(self.hypothesis_by_id(record.hypothesis_id)?)?,
+ parents: self.load_vertex_parents(VertexRef::Experiment(record.id))?,
+ record,
})
- .transpose()
}
- pub fn list_nodes(&self, query: ListNodesQuery) -> Result<Vec<NodeSummary>, StoreError> {
- let frontier_id = query.frontier_id.map(|id| id.to_string());
- let class = query.class.map(|item| item.as_str().to_owned());
- let mut sql = String::from(
- "SELECT
- n.id,
- n.class,
- n.track,
- n.frontier_id,
- n.archived,
- n.title,
- n.summary,
- n.diagnostics_json,
- n.created_at,
- n.updated_at,
- (
- SELECT COUNT(*)
- FROM node_annotations AS a
- WHERE a.node_id = n.id AND a.visibility = 'hidden'
- ) AS hidden_annotation_count
- FROM nodes AS n
- WHERE (?1 IS NULL OR n.frontier_id = ?1)
- AND (?2 IS NULL OR n.class = ?2)
- AND (?3 = 1 OR n.archived = 0)",
- );
- let mut parameters = vec![
- frontier_id.map_or(SqlValue::Null, SqlValue::Text),
- class.map_or(SqlValue::Null, SqlValue::Text),
- SqlValue::Integer(i64::from(query.include_archived)),
- ];
- for (index, tag) in query.tags.iter().enumerate() {
- let placeholder = parameters.len() + 1;
- let _ = write!(
- sql,
- "
- AND EXISTS (
- SELECT 1
- FROM node_tags AS nt{index}
- WHERE nt{index}.node_id = n.id AND nt{index}.tag_name = ?{placeholder}
- )"
- );
- parameters.push(SqlValue::Text(tag.as_str().to_owned()));
- }
- let limit_placeholder = parameters.len() + 1;
- let _ = write!(
- sql,
- "
- ORDER BY n.updated_at DESC
- LIMIT ?{limit_placeholder}"
- );
- parameters.push(SqlValue::Integer(i64::from(query.limit)));
- let mut statement = self.connection.prepare(&sql)?;
- let mut rows = statement.query(params_from_iter(parameters.iter()))?;
- let mut items = Vec::new();
- while let Some(row) = rows.next()? {
- let diagnostics = decode_json::<NodeDiagnostics>(&row.get::<_, String>(7)?)?;
- let node_id = parse_node_id(&row.get::<_, String>(0)?)?;
- items.push(NodeSummary {
- id: node_id,
- class: parse_node_class(&row.get::<_, String>(1)?)?,
- track: parse_node_track(&row.get::<_, String>(2)?)?,
- frontier_id: row
- .get::<_, Option<String>>(3)?
- .map(|raw| parse_frontier_id(&raw))
- .transpose()?,
- archived: row.get::<_, i64>(4)? != 0,
- title: NonEmptyText::new(row.get::<_, String>(5)?)?,
- summary: row
- .get::<_, Option<String>>(6)?
- .map(NonEmptyText::new)
- .transpose()?,
- tags: self.load_tags(node_id)?,
- diagnostic_count: diagnostics.items.len() as u64,
- hidden_annotation_count: row.get::<_, i64>(10)? as u64,
- created_at: decode_timestamp(&row.get::<_, String>(8)?)?,
- updated_at: decode_timestamp(&row.get::<_, String>(9)?)?,
- });
+ pub fn update_experiment(
+ &mut self,
+ request: UpdateExperimentRequest,
+ ) -> Result<ExperimentRecord, StoreError> {
+ let record = self.resolve_experiment(&request.experiment)?;
+ enforce_revision(
+ "experiment",
+ &request.experiment,
+ request.expected_revision,
+ record.revision,
+ )?;
+ if let Some(tags) = request.tags.as_ref() {
+ self.assert_known_tags(tags)?;
}
- Ok(items)
- }
-
- pub fn list_frontiers(&self) -> Result<Vec<FrontierRecord>, StoreError> {
- let mut statement = self.connection.prepare(
- "SELECT id, label, root_contract_node_id, status, created_at, updated_at
- FROM frontiers
- ORDER BY updated_at DESC",
+ let outcome = match request.outcome {
+ Some(patch) => Some(self.materialize_outcome(&patch)?),
+ None => record.outcome.clone(),
+ };
+ let updated = ExperimentRecord {
+ title: request.title.unwrap_or(record.title.clone()),
+ summary: apply_optional_text_patch(request.summary, record.summary.clone()),
+ tags: request
+ .tags
+ .clone()
+ .map_or_else(|| record.tags.clone(), |tags| tags.into_iter().collect()),
+ archived: request.archived.unwrap_or(record.archived),
+ status: if outcome.is_some() {
+ ExperimentStatus::Closed
+ } else {
+ record.status
+ },
+ outcome,
+ revision: record.revision.saturating_add(1),
+ updated_at: OffsetDateTime::now_utc(),
+ ..record
+ };
+ let parents = request
+ .parents
+ .as_ref()
+ .map(|selectors| {
+ self.resolve_vertex_parents(
+ updated.frontier_id,
+ selectors,
+ Some(VertexRef::Experiment(updated.id)),
+ )
+ })
+ .transpose()?;
+ let transaction = self.connection.transaction()?;
+ update_experiment_row(&transaction, &updated)?;
+ replace_experiment_tags(
+ &transaction,
+ updated.id,
+ &updated.tags.iter().cloned().collect::<BTreeSet<_>>(),
)?;
- let mut rows = statement.query([])?;
- let mut items = Vec::new();
- while let Some(row) = rows.next()? {
- items.push(read_frontier_row(row)?);
+ replace_experiment_dimensions(&transaction, updated.id, updated.outcome.as_ref())?;
+ replace_experiment_metrics(&transaction, updated.id, updated.outcome.as_ref())?;
+ if let Some(parents) = parents.as_ref() {
+ replace_influence_parents(&transaction, VertexRef::Experiment(updated.id), parents)?;
}
- Ok(items)
+ record_event(
+ &transaction,
+ "experiment",
+ &updated.id.to_string(),
+ updated.revision,
+ "updated",
+ &updated,
+ )?;
+ transaction.commit()?;
+ Ok(updated)
}
- pub fn frontier_projection(
- &self,
- frontier_id: fidget_spinner_core::FrontierId,
- ) -> Result<FrontierProjection, StoreError> {
- let frontier = self.load_frontier(frontier_id)?;
- let open_experiment_count = self.connection.query_row(
- "SELECT COUNT(*) FROM open_experiments WHERE frontier_id = ?1",
- params![frontier_id.to_string()],
- |row| row.get::<_, i64>(0),
- )? as u64;
- let completed_experiment_count = self.connection.query_row(
- "SELECT COUNT(*) FROM experiments WHERE frontier_id = ?1",
- params![frontier_id.to_string()],
- |row| row.get::<_, i64>(0),
- )? as u64;
- let verdict_counts = self.connection.query_row(
- "SELECT
- SUM(CASE WHEN verdict = 'accepted' THEN 1 ELSE 0 END),
- SUM(CASE WHEN verdict = 'kept' THEN 1 ELSE 0 END),
- SUM(CASE WHEN verdict = 'parked' THEN 1 ELSE 0 END),
- SUM(CASE WHEN verdict = 'rejected' THEN 1 ELSE 0 END)
- FROM experiments
- WHERE frontier_id = ?1",
- params![frontier_id.to_string()],
- |row| {
- Ok(FrontierVerdictCounts {
- accepted: row.get::<_, Option<i64>>(0)?.unwrap_or(0) as u64,
- kept: row.get::<_, Option<i64>>(1)?.unwrap_or(0) as u64,
- parked: row.get::<_, Option<i64>>(2)?.unwrap_or(0) as u64,
- rejected: row.get::<_, Option<i64>>(3)?.unwrap_or(0) as u64,
- })
- },
+ pub fn close_experiment(
+ &mut self,
+ request: CloseExperimentRequest,
+ ) -> Result<ExperimentRecord, StoreError> {
+ let record = self.resolve_experiment(&request.experiment)?;
+ if record.status == ExperimentStatus::Closed {
+ return Err(StoreError::ExperimentAlreadyClosed(record.id));
+ }
+ enforce_revision(
+ "experiment",
+ &request.experiment,
+ request.expected_revision,
+ record.revision,
)?;
-
- Ok(FrontierProjection {
- frontier,
- open_experiment_count,
- completed_experiment_count,
- verdict_counts,
- })
+ let outcome = self.materialize_outcome(&ExperimentOutcomePatch {
+ backend: request.backend,
+ command: request.command,
+ dimensions: request.dimensions,
+ primary_metric: request.primary_metric,
+ supporting_metrics: request.supporting_metrics,
+ verdict: request.verdict,
+ rationale: request.rationale,
+ analysis: request.analysis,
+ })?;
+ let updated = ExperimentRecord {
+ status: ExperimentStatus::Closed,
+ outcome: Some(outcome),
+ revision: record.revision.saturating_add(1),
+ updated_at: OffsetDateTime::now_utc(),
+ ..record
+ };
+ let transaction = self.connection.transaction()?;
+ update_experiment_row(&transaction, &updated)?;
+ replace_experiment_dimensions(&transaction, updated.id, updated.outcome.as_ref())?;
+ replace_experiment_metrics(&transaction, updated.id, updated.outcome.as_ref())?;
+ record_event(
+ &transaction,
+ "experiment",
+ &updated.id.to_string(),
+ updated.revision,
+ "closed",
+ &updated,
+ )?;
+ transaction.commit()?;
+ Ok(updated)
}
- pub fn open_experiment(
+ pub fn create_artifact(
&mut self,
- request: OpenExperimentRequest,
- ) -> Result<OpenExperimentSummary, StoreError> {
- let hypothesis_node = self
- .get_node(request.hypothesis_node_id)?
- .ok_or(StoreError::NodeNotFound(request.hypothesis_node_id))?;
- if hypothesis_node.class != NodeClass::Hypothesis {
- return Err(StoreError::NodeNotHypothesis(request.hypothesis_node_id));
- }
- if hypothesis_node.frontier_id != Some(request.frontier_id) {
- return Err(StoreError::FrontierNotFound(request.frontier_id));
- }
- let experiment = OpenExperiment {
- id: fidget_spinner_core::ExperimentId::fresh(),
- frontier_id: request.frontier_id,
- hypothesis_node_id: request.hypothesis_node_id,
- title: request.title,
+ request: CreateArtifactRequest,
+ ) -> Result<ArtifactRecord, StoreError> {
+ let id = ArtifactId::fresh();
+ let slug = self.unique_artifact_slug(request.slug, &request.label)?;
+ let now = OffsetDateTime::now_utc();
+ let record = ArtifactRecord {
+ id,
+ slug,
+ kind: request.kind,
+ label: request.label,
summary: request.summary,
- created_at: OffsetDateTime::now_utc(),
+ locator: request.locator,
+ media_type: request.media_type,
+ revision: 1,
+ created_at: now,
+ updated_at: now,
};
- let tx = self.connection.transaction()?;
- insert_open_experiment(&tx, &experiment)?;
- touch_frontier(&tx, request.frontier_id)?;
- insert_event(
- &tx,
- "experiment",
- &experiment.id.to_string(),
- "experiment.opened",
- json!({
- "frontier_id": experiment.frontier_id,
- "hypothesis_node_id": experiment.hypothesis_node_id,
- }),
+ let attachments = self.resolve_attachment_targets(&request.attachments)?;
+ let transaction = self.connection.transaction()?;
+ insert_artifact(&transaction, &record)?;
+ replace_artifact_attachments(&transaction, record.id, &attachments)?;
+ record_event(
+ &transaction,
+ "artifact",
+ &record.id.to_string(),
+ 1,
+ "created",
+ &record,
)?;
- tx.commit()?;
- Ok(summarize_open_experiment(&experiment))
+ transaction.commit()?;
+ Ok(record)
}
- pub fn list_open_experiments(
+ pub fn list_artifacts(
&self,
- frontier_id: Option<fidget_spinner_core::FrontierId>,
- ) -> Result<Vec<OpenExperimentSummary>, StoreError> {
- let mut statement = self.connection.prepare(
- "SELECT
- id,
- frontier_id,
- hypothesis_node_id,
- title,
- summary,
- created_at
- FROM open_experiments
- WHERE (?1 IS NULL OR frontier_id = ?1)
- ORDER BY created_at DESC",
- )?;
- let mut rows = statement.query(params![frontier_id.map(|id| id.to_string())])?;
- let mut items = Vec::new();
- while let Some(row) = rows.next()? {
- items.push(OpenExperimentSummary {
- id: parse_experiment_id(&row.get::<_, String>(0)?)?,
- frontier_id: parse_frontier_id(&row.get::<_, String>(1)?)?,
- hypothesis_node_id: parse_node_id(&row.get::<_, String>(2)?)?,
- title: NonEmptyText::new(row.get::<_, String>(3)?)?,
- summary: row
- .get::<_, Option<String>>(4)?
- .map(NonEmptyText::new)
- .transpose()?,
- created_at: decode_timestamp(&row.get::<_, String>(5)?)?,
- });
+ query: ListArtifactsQuery,
+ ) -> Result<Vec<ArtifactSummary>, StoreError> {
+ let records = self.load_artifact_records()?;
+ let frontier_id = query
+ .frontier
+ .as_deref()
+ .map(|selector| self.resolve_frontier(selector).map(|frontier| frontier.id))
+ .transpose()?;
+ let mut filtered = Vec::new();
+ for record in records {
+ if query.kind.is_some_and(|kind| record.kind != kind) {
+ continue;
+ }
+ if let Some(frontier_id) = frontier_id
+ && !self.artifact_attached_to_frontier(record.id, frontier_id)?
+ {
+ continue;
+ }
+ filtered.push(record);
}
- Ok(items)
+ let attached_filtered = match query.attached_to {
+ Some(selector) => {
+ let target = self.resolve_attachment_target(&selector)?;
+ filtered
+ .into_iter()
+ .filter(|record| {
+ self.artifact_attachment_targets(record.id)
+ .map(|targets| targets.contains(&target))
+ .unwrap_or(false)
+ })
+ .collect()
+ }
+ None => filtered,
+ };
+ Ok(apply_limit(
+ attached_filtered
+ .into_iter()
+ .map(|record| ArtifactSummary {
+ id: record.id,
+ slug: record.slug,
+ kind: record.kind,
+ label: record.label,
+ summary: record.summary,
+ locator: record.locator,
+ media_type: record.media_type,
+ updated_at: record.updated_at,
+ })
+ .collect(),
+ query.limit,
+ ))
}
- pub fn read_open_experiment(
- &self,
- experiment_id: fidget_spinner_core::ExperimentId,
- ) -> Result<OpenExperimentSummary, StoreError> {
- load_open_experiment(&self.connection, experiment_id)?
- .map(|experiment| summarize_open_experiment(&experiment))
- .ok_or(StoreError::ExperimentNotFound(experiment_id))
+ pub fn read_artifact(&self, selector: &str) -> Result<ArtifactDetail, StoreError> {
+ let record = self.resolve_artifact(selector)?;
+ Ok(ArtifactDetail {
+ attachments: self.artifact_attachment_targets(record.id)?,
+ record,
+ })
}
- pub fn close_experiment(
+ pub fn update_artifact(
&mut self,
- request: CloseExperimentRequest,
- ) -> Result<ExperimentReceipt, StoreError> {
- let open_experiment = load_open_experiment(&self.connection, request.experiment_id)?
- .ok_or(StoreError::ExperimentNotFound(request.experiment_id))?;
- let hypothesis_node = self
- .get_node(open_experiment.hypothesis_node_id)?
- .ok_or(StoreError::NodeNotFound(open_experiment.hypothesis_node_id))?;
- if hypothesis_node.class != NodeClass::Hypothesis {
- return Err(StoreError::NodeNotHypothesis(
- open_experiment.hypothesis_node_id,
- ));
+ request: UpdateArtifactRequest,
+ ) -> Result<ArtifactRecord, StoreError> {
+ let record = self.resolve_artifact(&request.artifact)?;
+ enforce_revision(
+ "artifact",
+ &request.artifact,
+ request.expected_revision,
+ record.revision,
+ )?;
+ let updated = ArtifactRecord {
+ kind: request.kind.unwrap_or(record.kind),
+ label: request.label.unwrap_or(record.label.clone()),
+ summary: apply_optional_text_patch(request.summary, record.summary.clone()),
+ locator: request.locator.unwrap_or(record.locator.clone()),
+ media_type: apply_optional_text_patch(request.media_type, record.media_type.clone()),
+ revision: record.revision.saturating_add(1),
+ updated_at: OffsetDateTime::now_utc(),
+ ..record
+ };
+ let attachments = request
+ .attachments
+ .as_ref()
+ .map(|selectors| self.resolve_attachment_targets(selectors))
+ .transpose()?;
+ let transaction = self.connection.transaction()?;
+ update_artifact_row(&transaction, &updated)?;
+ if let Some(attachments) = attachments.as_ref() {
+ replace_artifact_attachments(&transaction, updated.id, attachments)?;
}
- let tx = self.connection.transaction()?;
- let dimensions = validate_run_dimensions_tx(&tx, &request.dimensions)?;
- let primary_metric_definition =
- load_metric_definition_tx(&tx, &request.primary_metric.key)?.ok_or_else(|| {
- StoreError::UnknownMetricDefinition(request.primary_metric.key.clone())
- })?;
- let supporting_metric_definitions = request
- .supporting_metrics
- .iter()
- .map(|metric| {
- load_metric_definition_tx(&tx, &metric.key)?
- .ok_or_else(|| StoreError::UnknownMetricDefinition(metric.key.clone()))
+ record_event(
+ &transaction,
+ "artifact",
+ &updated.id.to_string(),
+ updated.revision,
+ "updated",
+ &updated,
+ )?;
+ transaction.commit()?;
+ Ok(updated)
+ }
+
+ pub fn frontier_open(&self, selector: &str) -> Result<FrontierOpenProjection, StoreError> {
+ let frontier = self.resolve_frontier(selector)?;
+ let active_hypothesis_ids = self.active_hypothesis_ids(frontier.id, &frontier.brief)?;
+ let active_hypotheses = active_hypothesis_ids
+ .into_iter()
+ .map(|hypothesis_id| {
+ let summary =
+ self.hypothesis_summary_from_record(self.hypothesis_by_id(hypothesis_id)?)?;
+ let open_experiments = self.list_experiments(ListExperimentsQuery {
+ hypothesis: Some(hypothesis_id.to_string()),
+ status: Some(ExperimentStatus::Open),
+ limit: None,
+ ..ListExperimentsQuery::default()
+ })?;
+ let latest_closed_experiment = self
+ .list_experiments(ListExperimentsQuery {
+ hypothesis: Some(hypothesis_id.to_string()),
+ status: Some(ExperimentStatus::Closed),
+ limit: Some(1),
+ ..ListExperimentsQuery::default()
+ })?
+ .into_iter()
+ .next();
+ Ok(HypothesisCurrentState {
+ hypothesis: summary,
+ open_experiments,
+ latest_closed_experiment,
+ })
})
.collect::<Result<Vec<_>, StoreError>>()?;
- let benchmark_suite = benchmark_suite_label(&dimensions);
-
- let run_payload = NodePayload::with_schema(
- self.schema.schema_ref(),
- json_object(json!({
- "dimensions": run_dimensions_json(&dimensions),
- "backend": format!("{:?}", request.backend),
- "command": request.command.argv.iter().map(NonEmptyText::as_str).collect::<Vec<_>>(),
- }))?,
- );
- let run_diagnostics = self.schema.validate_node(NodeClass::Run, &run_payload);
- let run_node = DagNode::new(
- NodeClass::Run,
- Some(open_experiment.frontier_id),
- request.run_title,
- request.run_summary,
- run_payload,
- run_diagnostics,
- );
- let run_id = fidget_spinner_core::RunId::fresh();
- let now = OffsetDateTime::now_utc();
- let run = RunRecord {
- node_id: run_node.id,
- run_id,
- frontier_id: Some(open_experiment.frontier_id),
- status: RunStatus::Succeeded,
- backend: request.backend,
- dimensions: dimensions.clone(),
- command: request.command,
- started_at: Some(now),
- finished_at: Some(now),
- };
+ let open_experiments = self.list_experiments(ListExperimentsQuery {
+ frontier: Some(frontier.id.to_string()),
+ status: Some(ExperimentStatus::Open),
+ limit: None,
+ ..ListExperimentsQuery::default()
+ })?;
+ let active_tags = derive_active_tags(&active_hypotheses, &open_experiments);
+ let active_metric_keys =
+ self.live_metric_keys(frontier.id, &active_hypotheses, &open_experiments)?;
+ Ok(FrontierOpenProjection {
+ frontier,
+ active_tags,
+ active_metric_keys,
+ active_hypotheses,
+ open_experiments,
+ })
+ }
- let analysis_node = request
- .analysis
- .map(|analysis| -> Result<DagNode, StoreError> {
- let payload = NodePayload::with_schema(
- self.schema.schema_ref(),
- json_object(json!({
- "body": analysis.body.as_str(),
- }))?,
- );
- let diagnostics = self.schema.validate_node(NodeClass::Analysis, &payload);
- Ok(DagNode::new(
- NodeClass::Analysis,
- Some(open_experiment.frontier_id),
- analysis.title,
- Some(analysis.summary),
- payload,
- diagnostics,
- ))
+ pub fn metric_keys(&self, query: MetricKeysQuery) -> Result<Vec<MetricKeySummary>, StoreError> {
+ let frontier_id = query
+ .frontier
+ .as_deref()
+ .map(|selector| self.resolve_frontier(selector).map(|frontier| frontier.id))
+ .transpose()?;
+ let definitions = self.list_metric_definitions()?;
+ let live_keys = frontier_id
+ .map(|frontier_id| self.live_metric_key_names(frontier_id))
+ .transpose()?
+ .unwrap_or_default();
+ let mut keys = definitions
+ .into_iter()
+ .filter(|definition| match query.scope {
+ MetricScope::Live => live_keys.contains(definition.key.as_str()),
+ MetricScope::Visible => definition.visibility.is_default_visible(),
+ MetricScope::All => true,
})
+ .map(|definition| {
+ Ok(MetricKeySummary {
+ reference_count: self.metric_reference_count(frontier_id, &definition.key)?,
+ key: definition.key,
+ unit: definition.unit,
+ objective: definition.objective,
+ visibility: definition.visibility,
+ description: definition.description,
+ })
+ })
+ .collect::<Result<Vec<_>, StoreError>>()?;
+ keys.sort_by(|left, right| left.key.as_str().cmp(right.key.as_str()));
+ Ok(keys)
+ }
+
+ pub fn metric_best(&self, query: MetricBestQuery) -> Result<Vec<MetricBestEntry>, StoreError> {
+ let definition = self
+ .metric_definition(&query.key)?
+ .ok_or_else(|| StoreError::UnknownMetricDefinition(query.key.clone()))?;
+ let frontier_id = query
+ .frontier
+ .as_deref()
+ .map(|selector| self.resolve_frontier(selector).map(|frontier| frontier.id))
.transpose()?;
+ let hypothesis_id = query
+ .hypothesis
+ .as_deref()
+ .map(|selector| {
+ self.resolve_hypothesis(selector)
+ .map(|hypothesis| hypothesis.id)
+ })
+ .transpose()?;
+ let order = query.order.unwrap_or(match definition.objective {
+ OptimizationObjective::Minimize => MetricRankOrder::Asc,
+ OptimizationObjective::Maximize => MetricRankOrder::Desc,
+ OptimizationObjective::Target => {
+ return Err(StoreError::MetricOrderRequired {
+ key: query.key.to_string(),
+ });
+ }
+ });
+ let experiments = self
+ .load_experiment_records(frontier_id, hypothesis_id, true)?
+ .into_iter()
+ .filter(|record| record.status == ExperimentStatus::Closed)
+ .filter(|record| {
+ query.include_rejected
+ || record
+ .outcome
+ .as_ref()
+ .is_some_and(|outcome| outcome.verdict != FrontierVerdict::Rejected)
+ })
+ .collect::<Vec<_>>();
+ let mut entries = experiments
+ .into_iter()
+ .filter_map(|record| {
+ let outcome = record.outcome.clone()?;
+ if !dimension_subset_matches(&query.dimensions, &outcome.dimensions) {
+ return None;
+ }
+ let metric = all_metrics(&outcome)
+ .into_iter()
+ .find(|metric| metric.key == query.key)?;
+ Some((record, outcome.dimensions.clone(), metric.value))
+ })
+ .map(|(record, dimensions, value)| {
+ Ok(MetricBestEntry {
+ experiment: self.experiment_summary_from_record(record.clone())?,
+ hypothesis: self.hypothesis_summary_from_record(
+ self.hypothesis_by_id(record.hypothesis_id)?,
+ )?,
+ value,
+ dimensions,
+ })
+ })
+ .collect::<Result<Vec<_>, StoreError>>()?;
+ entries.sort_by(|left, right| compare_metric_values(left.value, right.value, order));
+ Ok(apply_limit(entries, query.limit))
+ }
- let decision_payload = NodePayload::with_schema(
- self.schema.schema_ref(),
- json_object(json!({
- "verdict": format!("{:?}", request.verdict),
- "rationale": request.decision_rationale.as_str(),
- }))?,
- );
- let decision_diagnostics = self
- .schema
- .validate_node(NodeClass::Decision, &decision_payload);
- let decision_node = DagNode::new(
- NodeClass::Decision,
- Some(open_experiment.frontier_id),
- request.decision_title,
- Some(request.decision_rationale.clone()),
- decision_payload,
- decision_diagnostics,
- );
+ pub fn frontier_history(&self, selector: &str) -> Result<Vec<EntityHistoryEntry>, StoreError> {
+ let frontier = self.resolve_frontier(selector)?;
+ self.entity_history("frontier", &frontier.id.to_string())
+ }
- let experiment = CompletedExperiment {
- id: open_experiment.id,
- frontier_id: open_experiment.frontier_id,
- hypothesis_node_id: open_experiment.hypothesis_node_id,
- run_node_id: run_node.id,
- run_id,
- analysis_node_id: analysis_node.as_ref().map(|node| node.id),
- decision_node_id: decision_node.id,
- title: open_experiment.title.clone(),
- summary: open_experiment.summary.clone(),
- result: ExperimentResult {
- dimensions: dimensions.clone(),
- primary_metric: request.primary_metric,
- supporting_metrics: request.supporting_metrics,
- benchmark_bundle: None,
- },
- note: request.note,
- verdict: request.verdict,
- created_at: now,
+ pub fn hypothesis_history(
+ &self,
+ selector: &str,
+ ) -> Result<Vec<EntityHistoryEntry>, StoreError> {
+ let hypothesis = self.resolve_hypothesis(selector)?;
+ self.entity_history("hypothesis", &hypothesis.id.to_string())
+ }
+
+ pub fn experiment_history(
+ &self,
+ selector: &str,
+ ) -> Result<Vec<EntityHistoryEntry>, StoreError> {
+ let experiment = self.resolve_experiment(selector)?;
+ self.entity_history("experiment", &experiment.id.to_string())
+ }
+
+ pub fn artifact_history(&self, selector: &str) -> Result<Vec<EntityHistoryEntry>, StoreError> {
+ let artifact = self.resolve_artifact(selector)?;
+ self.entity_history("artifact", &artifact.id.to_string())
+ }
+
+ fn metric_definition(
+ &self,
+ key: &NonEmptyText,
+ ) -> Result<Option<MetricDefinition>, StoreError> {
+ self.connection
+ .query_row(
+ "SELECT key, unit, objective, visibility, description, created_at, updated_at
+ FROM metric_definitions
+ WHERE key = ?1",
+ params![key.as_str()],
+ decode_metric_definition_row,
+ )
+ .optional()
+ .map_err(StoreError::from)
+ }
+
+ fn run_dimension_definition(
+ &self,
+ key: &NonEmptyText,
+ ) -> Result<Option<RunDimensionDefinition>, StoreError> {
+ self.connection
+ .query_row(
+ "SELECT key, value_type, description, created_at, updated_at
+ FROM run_dimension_definitions
+ WHERE key = ?1",
+ params![key.as_str()],
+ decode_run_dimension_definition_row,
+ )
+ .optional()
+ .map_err(StoreError::from)
+ }
+
+ fn hypothesis_by_id(&self, id: HypothesisId) -> Result<HypothesisRecord, StoreError> {
+ self.connection
+ .query_row(
+ "SELECT id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at
+ FROM hypotheses WHERE id = ?1",
+ params![id.to_string()],
+ |row| self.decode_hypothesis_row(row),
+ )
+ .map_err(StoreError::from)
+ }
+
+ fn resolve_frontier(&self, selector: &str) -> Result<FrontierRecord, StoreError> {
+ let record = match resolve_selector(selector)? {
+ Selector::Id(uuid) => self
+ .connection
+ .query_row(
+ "SELECT id, slug, label, objective, status, brief_json, revision, created_at, updated_at
+ FROM frontiers WHERE id = ?1",
+ params![uuid.to_string()],
+ decode_frontier_row,
+ )
+ .optional()?,
+ Selector::Slug(slug) => self
+ .connection
+ .query_row(
+ "SELECT id, slug, label, objective, status, brief_json, revision, created_at, updated_at
+ FROM frontiers WHERE slug = ?1",
+ params![slug.as_str()],
+ decode_frontier_row,
+ )
+ .optional()?,
};
- insert_node(&tx, &run_node)?;
- if let Some(node) = analysis_node.as_ref() {
- insert_node(&tx, node)?;
+ record.ok_or_else(|| StoreError::UnknownFrontierSelector(selector.to_owned()))
+ }
+
+ fn resolve_hypothesis(&self, selector: &str) -> Result<HypothesisRecord, StoreError> {
+ let record = match resolve_selector(selector)? {
+ Selector::Id(uuid) => self
+ .connection
+ .query_row(
+ "SELECT id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at
+ FROM hypotheses WHERE id = ?1",
+ params![uuid.to_string()],
+ |row| self.decode_hypothesis_row(row),
+ )
+ .optional()?,
+ Selector::Slug(slug) => self
+ .connection
+ .query_row(
+ "SELECT id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at
+ FROM hypotheses WHERE slug = ?1",
+ params![slug.as_str()],
+ |row| self.decode_hypothesis_row(row),
+ )
+ .optional()?,
+ };
+ record.ok_or_else(|| StoreError::UnknownHypothesisSelector(selector.to_owned()))
+ }
+
+ fn resolve_experiment(&self, selector: &str) -> Result<ExperimentRecord, StoreError> {
+ let record = match resolve_selector(selector)? {
+ Selector::Id(uuid) => self
+ .connection
+ .query_row(
+ "SELECT id, slug, frontier_id, hypothesis_id, archived, title, summary, tags_json, status, outcome_json, revision, created_at, updated_at
+ FROM experiments WHERE id = ?1",
+ params![uuid.to_string()],
+ decode_experiment_row,
+ )
+ .optional()?,
+ Selector::Slug(slug) => self
+ .connection
+ .query_row(
+ "SELECT id, slug, frontier_id, hypothesis_id, archived, title, summary, tags_json, status, outcome_json, revision, created_at, updated_at
+ FROM experiments WHERE slug = ?1",
+ params![slug.as_str()],
+ decode_experiment_row,
+ )
+ .optional()?,
+ };
+ record.ok_or_else(|| StoreError::UnknownExperimentSelector(selector.to_owned()))
+ }
+
+ fn resolve_artifact(&self, selector: &str) -> Result<ArtifactRecord, StoreError> {
+ let record = match resolve_selector(selector)? {
+ Selector::Id(uuid) => self
+ .connection
+ .query_row(
+ "SELECT id, slug, kind, label, summary, locator, media_type, revision, created_at, updated_at
+ FROM artifacts WHERE id = ?1",
+ params![uuid.to_string()],
+ decode_artifact_row,
+ )
+ .optional()?,
+ Selector::Slug(slug) => self
+ .connection
+ .query_row(
+ "SELECT id, slug, kind, label, summary, locator, media_type, revision, created_at, updated_at
+ FROM artifacts WHERE slug = ?1",
+ params![slug.as_str()],
+ decode_artifact_row,
+ )
+ .optional()?,
+ };
+ record.ok_or_else(|| StoreError::UnknownArtifactSelector(selector.to_owned()))
+ }
+
+ fn resolve_vertex_parents(
+ &self,
+ frontier_id: FrontierId,
+ selectors: &[VertexSelector],
+ child: Option<VertexRef>,
+ ) -> Result<Vec<VertexRef>, StoreError> {
+ selectors
+ .iter()
+ .map(|selector| {
+ let vertex = match selector {
+ VertexSelector::Hypothesis(selector) => {
+ VertexRef::Hypothesis(self.resolve_hypothesis(selector)?.id)
+ }
+ VertexSelector::Experiment(selector) => {
+ VertexRef::Experiment(self.resolve_experiment(selector)?.id)
+ }
+ };
+ let parent_frontier_id = match vertex {
+ VertexRef::Hypothesis(id) => self.hypothesis_by_id(id)?.frontier_id,
+ VertexRef::Experiment(id) => {
+ self.resolve_experiment(&id.to_string())?.frontier_id
+ }
+ };
+ if parent_frontier_id != frontier_id {
+ return Err(StoreError::CrossFrontierInfluence);
+ }
+ if child.is_some_and(|child| child == vertex) {
+ return Err(StoreError::SelfEdge);
+ }
+ Ok(vertex)
+ })
+ .collect()
+ }
+
+ fn resolve_attachment_targets(
+ &self,
+ selectors: &[AttachmentSelector],
+ ) -> Result<Vec<AttachmentTargetRef>, StoreError> {
+ selectors
+ .iter()
+ .map(|selector| match selector {
+ AttachmentSelector::Frontier(selector) => Ok(AttachmentTargetRef::Frontier(
+ self.resolve_frontier(selector)?.id,
+ )),
+ AttachmentSelector::Hypothesis(selector) => Ok(AttachmentTargetRef::Hypothesis(
+ self.resolve_hypothesis(selector)?.id,
+ )),
+ AttachmentSelector::Experiment(selector) => Ok(AttachmentTargetRef::Experiment(
+ self.resolve_experiment(selector)?.id,
+ )),
+ })
+ .collect()
+ }
+
+ fn resolve_attachment_target(
+ &self,
+ selector: &AttachmentSelector,
+ ) -> Result<AttachmentTargetRef, StoreError> {
+ match selector {
+ AttachmentSelector::Frontier(selector) => Ok(AttachmentTargetRef::Frontier(
+ self.resolve_frontier(selector)?.id,
+ )),
+ AttachmentSelector::Hypothesis(selector) => Ok(AttachmentTargetRef::Hypothesis(
+ self.resolve_hypothesis(selector)?.id,
+ )),
+ AttachmentSelector::Experiment(selector) => Ok(AttachmentTargetRef::Experiment(
+ self.resolve_experiment(selector)?.id,
+ )),
}
- insert_node(&tx, &decision_node)?;
- insert_edge(
- &tx,
- &DagEdge {
- source_id: open_experiment.hypothesis_node_id,
- target_id: run_node.id,
- kind: EdgeKind::Lineage,
- },
- )?;
- if let Some(node) = analysis_node.as_ref() {
- insert_edge(
- &tx,
- &DagEdge {
- source_id: run_node.id,
- target_id: node.id,
- kind: EdgeKind::Evidence,
- },
- )?;
- insert_edge(
- &tx,
- &DagEdge {
- source_id: node.id,
- target_id: decision_node.id,
- kind: EdgeKind::Evidence,
- },
+ }
+
+ fn load_hypothesis_records(
+ &self,
+ frontier_id: Option<FrontierId>,
+ include_archived: bool,
+ ) -> Result<Vec<HypothesisRecord>, StoreError> {
+ let mut records = if let Some(frontier_id) = frontier_id {
+ let mut statement = self.connection.prepare(
+ "SELECT id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at
+ FROM hypotheses
+ WHERE frontier_id = ?1
+ ORDER BY updated_at DESC, created_at DESC",
)?;
+ let rows = statement.query_map(params![frontier_id.to_string()], |row| {
+ self.decode_hypothesis_row(row)
+ })?;
+ rows.collect::<Result<Vec<_>, _>>()?
} else {
- insert_edge(
- &tx,
- &DagEdge {
- source_id: run_node.id,
- target_id: decision_node.id,
- kind: EdgeKind::Evidence,
- },
+ let mut statement = self.connection.prepare(
+ "SELECT id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at
+ FROM hypotheses
+ ORDER BY updated_at DESC, created_at DESC",
)?;
+ let rows = statement.query_map([], |row| self.decode_hypothesis_row(row))?;
+ rows.collect::<Result<Vec<_>, _>>()?
+ };
+ if !include_archived {
+ records.retain(|record| !record.archived);
}
- insert_run(
- &tx,
- &run,
- benchmark_suite.as_deref(),
- &experiment.result.primary_metric,
- &primary_metric_definition,
- &experiment.result.supporting_metrics,
- supporting_metric_definitions.as_slice(),
- )?;
- insert_run_dimensions(&tx, run.run_id, &dimensions)?;
- insert_experiment(&tx, &experiment)?;
- delete_open_experiment(&tx, open_experiment.id)?;
- touch_frontier(&tx, open_experiment.frontier_id)?;
- insert_event(
- &tx,
- "experiment",
- &experiment.id.to_string(),
- "experiment.closed",
- json!({
- "frontier_id": open_experiment.frontier_id,
- "hypothesis_node_id": open_experiment.hypothesis_node_id,
- "verdict": format!("{:?}", request.verdict),
- }),
- )?;
- tx.commit()?;
-
- Ok(ExperimentReceipt {
- open_experiment,
- run_node,
- run,
- analysis_node,
- decision_node,
- experiment,
- })
+ Ok(records)
}
- fn load_annotations(
+ fn load_experiment_records(
&self,
- node_id: fidget_spinner_core::NodeId,
- ) -> Result<Vec<NodeAnnotation>, StoreError> {
+ frontier_id: Option<FrontierId>,
+ hypothesis_id: Option<HypothesisId>,
+ include_archived: bool,
+ ) -> Result<Vec<ExperimentRecord>, StoreError> {
+ let base_sql = "SELECT id, slug, frontier_id, hypothesis_id, archived, title, summary, tags_json, status, outcome_json, revision, created_at, updated_at FROM experiments";
+ let records = match (frontier_id, hypothesis_id) {
+ (Some(frontier_id), Some(hypothesis_id)) => {
+ let mut statement = self.connection.prepare(&format!(
+ "{base_sql} WHERE frontier_id = ?1 AND hypothesis_id = ?2 ORDER BY updated_at DESC, created_at DESC"
+ ))?;
+ let rows = statement.query_map(
+ params![frontier_id.to_string(), hypothesis_id.to_string()],
+ decode_experiment_row,
+ )?;
+ rows.collect::<Result<Vec<_>, _>>()?
+ }
+ (Some(frontier_id), None) => {
+ let mut statement = self.connection.prepare(&format!(
+ "{base_sql} WHERE frontier_id = ?1 ORDER BY updated_at DESC, created_at DESC"
+ ))?;
+ let rows =
+ statement.query_map(params![frontier_id.to_string()], decode_experiment_row)?;
+ rows.collect::<Result<Vec<_>, _>>()?
+ }
+ (None, Some(hypothesis_id)) => {
+ let mut statement = self.connection.prepare(&format!(
+ "{base_sql} WHERE hypothesis_id = ?1 ORDER BY updated_at DESC, created_at DESC"
+ ))?;
+ let rows = statement
+ .query_map(params![hypothesis_id.to_string()], decode_experiment_row)?;
+ rows.collect::<Result<Vec<_>, _>>()?
+ }
+ (None, None) => {
+ let mut statement = self.connection.prepare(&format!(
+ "{base_sql} ORDER BY updated_at DESC, created_at DESC"
+ ))?;
+ let rows = statement.query_map([], decode_experiment_row)?;
+ rows.collect::<Result<Vec<_>, _>>()?
+ }
+ };
+ Ok(if include_archived {
+ records
+ } else {
+ records
+ .into_iter()
+ .filter(|record| !record.archived)
+ .collect()
+ })
+ }
+
+ fn load_artifact_records(&self) -> Result<Vec<ArtifactRecord>, StoreError> {
let mut statement = self.connection.prepare(
- "SELECT id, visibility, label, body, created_at
- FROM node_annotations
- WHERE node_id = ?1
- ORDER BY created_at ASC",
+ "SELECT id, slug, kind, label, summary, locator, media_type, revision, created_at, updated_at
+ FROM artifacts
+ ORDER BY updated_at DESC, created_at DESC",
)?;
- let mut rows = statement.query(params![node_id.to_string()])?;
- let mut items = Vec::new();
- while let Some(row) = rows.next()? {
- items.push(NodeAnnotation {
- id: parse_annotation_id(&row.get::<_, String>(0)?)?,
- visibility: parse_annotation_visibility(&row.get::<_, String>(1)?)?,
- label: row
- .get::<_, Option<String>>(2)?
- .map(NonEmptyText::new)
- .transpose()?,
- body: NonEmptyText::new(row.get::<_, String>(3)?)?,
- created_at: decode_timestamp(&row.get::<_, String>(4)?)?,
- });
- }
- Ok(items)
+ let rows = statement.query_map([], decode_artifact_row)?;
+ rows.collect::<Result<Vec<_>, _>>()
+ .map_err(StoreError::from)
}
- fn load_tags(
+ fn decode_hypothesis_row(
&self,
- node_id: fidget_spinner_core::NodeId,
- ) -> Result<BTreeSet<TagName>, StoreError> {
+ row: &rusqlite::Row<'_>,
+ ) -> Result<HypothesisRecord, rusqlite::Error> {
+ let id = HypothesisId::from_uuid(parse_uuid_sql(&row.get::<_, String>(0)?)?);
+ Ok(HypothesisRecord {
+ id,
+ slug: parse_slug(&row.get::<_, String>(1)?)?,
+ frontier_id: FrontierId::from_uuid(parse_uuid_sql(&row.get::<_, String>(2)?)?),
+ archived: row.get::<_, i64>(3)? != 0,
+ title: parse_non_empty_text(&row.get::<_, String>(4)?)?,
+ summary: parse_non_empty_text(&row.get::<_, String>(5)?)?,
+ body: parse_non_empty_text(&row.get::<_, String>(6)?)?,
+ tags: self.hypothesis_tags(id)?,
+ revision: row.get::<_, u64>(7)?,
+ created_at: parse_timestamp_sql(&row.get::<_, String>(8)?)?,
+ updated_at: parse_timestamp_sql(&row.get::<_, String>(9)?)?,
+ })
+ }
+
+ fn hypothesis_tags(&self, id: HypothesisId) -> Result<Vec<TagName>, rusqlite::Error> {
let mut statement = self.connection.prepare(
- "SELECT tag_name
- FROM node_tags
- WHERE node_id = ?1
- ORDER BY tag_name ASC",
+ "SELECT tag_name FROM hypothesis_tags WHERE hypothesis_id = ?1 ORDER BY tag_name ASC",
)?;
- let mut rows = statement.query(params![node_id.to_string()])?;
- let mut items = BTreeSet::new();
- while let Some(row) = rows.next()? {
- let _ = items.insert(TagName::new(row.get::<_, String>(0)?)?);
- }
- Ok(items)
+ let rows = statement.query_map(params![id.to_string()], |row| {
+ parse_tag_name(&row.get::<_, String>(0)?)
+ })?;
+ rows.collect::<Result<Vec<_>, _>>()
}
- fn load_frontier(
+ fn hypothesis_summary_from_record(
&self,
- frontier_id: fidget_spinner_core::FrontierId,
- ) -> Result<FrontierRecord, StoreError> {
- let mut statement = self.connection.prepare(
- "SELECT id, label, root_contract_node_id, status, created_at, updated_at
- FROM frontiers
- WHERE id = ?1",
- )?;
- let frontier = statement
- .query_row(params![frontier_id.to_string()], |row| {
- read_frontier_row(row).map_err(to_sql_conversion_error)
- })
- .optional()?;
- frontier.ok_or(StoreError::FrontierNotFound(frontier_id))
+ record: HypothesisRecord,
+ ) -> Result<HypothesisSummary, StoreError> {
+ let latest_verdict = self
+ .latest_closed_experiment(record.id)?
+ .and_then(|experiment| experiment.outcome.map(|outcome| outcome.verdict));
+ Ok(HypothesisSummary {
+ id: record.id,
+ slug: record.slug,
+ frontier_id: record.frontier_id,
+ archived: record.archived,
+ title: record.title,
+ summary: record.summary,
+ tags: record.tags,
+ open_experiment_count: self
+ .list_experiments(ListExperimentsQuery {
+ hypothesis: Some(record.id.to_string()),
+ status: Some(ExperimentStatus::Open),
+ limit: None,
+ ..ListExperimentsQuery::default()
+ })?
+ .len() as u64,
+ latest_verdict,
+ updated_at: record.updated_at,
+ })
}
-}
-fn upgrade_store(connection: &mut Connection) -> Result<(), StoreError> {
- migrate(connection)?;
- backfill_prose_summaries(connection)?;
- let tx = connection.transaction()?;
- let _ = normalize_metric_plane_tx(&tx)?;
- tx.commit()?;
- Ok(())
-}
+ fn experiment_summary_from_record(
+ &self,
+ record: ExperimentRecord,
+ ) -> Result<ExperimentSummary, StoreError> {
+ Ok(ExperimentSummary {
+ id: record.id,
+ slug: record.slug,
+ frontier_id: record.frontier_id,
+ hypothesis_id: record.hypothesis_id,
+ archived: record.archived,
+ title: record.title,
+ summary: record.summary,
+ tags: record.tags,
+ status: record.status,
+ verdict: record.outcome.as_ref().map(|outcome| outcome.verdict),
+ primary_metric: record
+ .outcome
+ .as_ref()
+ .map(|outcome| self.metric_observation_summary(&outcome.primary_metric))
+ .transpose()?,
+ updated_at: record.updated_at,
+ closed_at: record.outcome.as_ref().map(|outcome| outcome.closed_at),
+ })
+ }
-fn validate_prose_node_request(request: &CreateNodeRequest) -> Result<(), StoreError> {
- if !matches!(request.class, NodeClass::Note | NodeClass::Source) {
- return Ok(());
+ fn metric_observation_summary(
+ &self,
+ metric: &MetricValue,
+ ) -> Result<MetricObservationSummary, StoreError> {
+ let definition = self
+ .metric_definition(&metric.key)?
+ .ok_or_else(|| StoreError::UnknownMetricDefinition(metric.key.clone()))?;
+ Ok(MetricObservationSummary {
+ key: metric.key.clone(),
+ value: metric.value,
+ unit: definition.unit,
+ objective: definition.objective,
+ })
}
- if request.summary.is_none() {
- return Err(StoreError::ProseSummaryRequired(request.class));
+
+ fn latest_closed_experiment(
+ &self,
+ hypothesis_id: HypothesisId,
+ ) -> Result<Option<ExperimentRecord>, StoreError> {
+ self.load_experiment_records(None, Some(hypothesis_id), true)
+ .map(|records| {
+ records
+ .into_iter()
+ .filter(|record| record.status == ExperimentStatus::Closed)
+ .max_by_key(|record| {
+ record
+ .outcome
+ .as_ref()
+ .map(|outcome| outcome.closed_at)
+ .unwrap_or(record.updated_at)
+ })
+ })
}
- match request.payload.field("body") {
- Some(Value::String(body)) if !body.trim().is_empty() => Ok(()),
- _ => Err(StoreError::ProseBodyRequired(request.class)),
+
+ fn load_vertex_parents(&self, child: VertexRef) -> Result<Vec<VertexSummary>, StoreError> {
+ let mut statement = self.connection.prepare(
+ "SELECT parent_kind, parent_id
+ FROM influence_edges
+ WHERE child_kind = ?1 AND child_id = ?2
+ ORDER BY ordinal ASC, parent_kind ASC, parent_id ASC",
+ )?;
+ let rows = statement.query_map(
+ params![vertex_kind_name(child), child.opaque_id()],
+ |row| -> Result<VertexRef, rusqlite::Error> {
+ decode_vertex_ref(&row.get::<_, String>(0)?, &row.get::<_, String>(1)?)
+ },
+ )?;
+ rows.collect::<Result<Vec<_>, _>>()?
+ .into_iter()
+ .map(|parent| self.vertex_summary(parent))
+ .collect()
}
-}
-#[derive(Clone, Debug)]
-struct MetricSample {
- key: NonEmptyText,
- source: MetricFieldSource,
- value: f64,
- frontier_id: fidget_spinner_core::FrontierId,
- experiment_id: fidget_spinner_core::ExperimentId,
- experiment_title: NonEmptyText,
- hypothesis_node_id: fidget_spinner_core::NodeId,
- hypothesis_title: NonEmptyText,
- run_id: fidget_spinner_core::RunId,
- verdict: FrontierVerdict,
- unit: Option<MetricUnit>,
- objective: Option<OptimizationObjective>,
- dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
-}
+ fn load_vertex_children(&self, parent: VertexRef) -> Result<Vec<VertexSummary>, StoreError> {
+ let mut statement = self.connection.prepare(
+ "SELECT child_kind, child_id
+ FROM influence_edges
+ WHERE parent_kind = ?1 AND parent_id = ?2
+ ORDER BY ordinal ASC, child_kind ASC, child_id ASC",
+ )?;
+ let rows = statement.query_map(
+ params![vertex_kind_name(parent), parent.opaque_id()],
+ |row| -> Result<VertexRef, rusqlite::Error> {
+ decode_vertex_ref(&row.get::<_, String>(0)?, &row.get::<_, String>(1)?)
+ },
+ )?;
+ rows.collect::<Result<Vec<_>, _>>()?
+ .into_iter()
+ .map(|child| self.vertex_summary(child))
+ .collect()
+ }
-impl MetricSample {
- fn into_entry(self, order: MetricRankOrder) -> MetricBestEntry {
- MetricBestEntry {
- key: self.key,
- source: self.source,
- value: self.value,
- order,
- experiment_id: self.experiment_id,
- experiment_title: self.experiment_title,
- frontier_id: self.frontier_id,
- hypothesis_node_id: self.hypothesis_node_id,
- hypothesis_title: self.hypothesis_title,
- run_id: self.run_id,
- verdict: self.verdict,
- unit: self.unit,
- objective: self.objective,
- dimensions: self.dimensions,
+ fn vertex_summary(&self, vertex: VertexRef) -> Result<VertexSummary, StoreError> {
+ match vertex {
+ VertexRef::Hypothesis(id) => {
+ let record = self.hypothesis_by_id(id)?;
+ Ok(VertexSummary {
+ vertex,
+ frontier_id: record.frontier_id,
+ slug: record.slug,
+ archived: record.archived,
+ title: record.title,
+ summary: Some(record.summary),
+ updated_at: record.updated_at,
+ })
+ }
+ VertexRef::Experiment(id) => {
+ let record = self.resolve_experiment(&id.to_string())?;
+ Ok(VertexSummary {
+ vertex,
+ frontier_id: record.frontier_id,
+ slug: record.slug,
+ archived: record.archived,
+ title: record.title,
+ summary: record.summary,
+ updated_at: record.updated_at,
+ })
+ }
}
}
-}
-#[derive(Clone, Debug)]
-struct MetricKeyAccumulator {
- key: NonEmptyText,
- source: MetricFieldSource,
- experiment_ids: BTreeSet<fidget_spinner_core::ExperimentId>,
- unit: Option<MetricUnit>,
- objective: Option<OptimizationObjective>,
- ambiguous_semantics: bool,
-}
+ fn artifact_attachment_targets(
+ &self,
+ artifact_id: ArtifactId,
+ ) -> Result<Vec<AttachmentTargetRef>, StoreError> {
+ let mut statement = self.connection.prepare(
+ "SELECT target_kind, target_id
+ FROM artifact_attachments
+ WHERE artifact_id = ?1
+ ORDER BY ordinal ASC, target_kind ASC, target_id ASC",
+ )?;
+ let rows = statement.query_map(params![artifact_id.to_string()], |row| {
+ decode_attachment_target(&row.get::<_, String>(0)?, &row.get::<_, String>(1)?)
+ })?;
+ rows.collect::<Result<Vec<_>, _>>()
+ .map_err(StoreError::from)
+ }
-impl MetricKeyAccumulator {
- fn from_sample(sample: &MetricSample) -> Self {
- Self {
- key: sample.key.clone(),
- source: sample.source,
- experiment_ids: BTreeSet::from([sample.experiment_id]),
- unit: sample.unit,
- objective: sample.objective,
- ambiguous_semantics: false,
+ fn artifact_attached_to_frontier(
+ &self,
+ artifact_id: ArtifactId,
+ frontier_id: FrontierId,
+ ) -> Result<bool, StoreError> {
+ let targets = self.artifact_attachment_targets(artifact_id)?;
+ if targets.contains(&AttachmentTargetRef::Frontier(frontier_id)) {
+ return Ok(true);
+ }
+ for target in targets {
+ match target {
+ AttachmentTargetRef::Hypothesis(hypothesis_id) => {
+ if self.hypothesis_by_id(hypothesis_id)?.frontier_id == frontier_id {
+ return Ok(true);
+ }
+ }
+ AttachmentTargetRef::Experiment(experiment_id) => {
+ if self
+ .resolve_experiment(&experiment_id.to_string())?
+ .frontier_id
+ == frontier_id
+ {
+ return Ok(true);
+ }
+ }
+ AttachmentTargetRef::Frontier(_) => {}
+ }
}
+ Ok(false)
}
- fn observe(&mut self, sample: &MetricSample) {
- let _ = self.experiment_ids.insert(sample.experiment_id);
- if self.unit != sample.unit || self.objective != sample.objective {
- self.ambiguous_semantics = true;
- self.unit = None;
- self.objective = None;
+ fn active_hypothesis_ids(
+ &self,
+ frontier_id: FrontierId,
+ brief: &FrontierBrief,
+ ) -> Result<BTreeSet<HypothesisId>, StoreError> {
+ let mut ids = brief
+ .roadmap
+ .iter()
+ .map(|item| item.hypothesis_id)
+ .collect::<BTreeSet<_>>();
+ for experiment in self.list_experiments(ListExperimentsQuery {
+ frontier: Some(frontier_id.to_string()),
+ status: Some(ExperimentStatus::Open),
+ limit: None,
+ ..ListExperimentsQuery::default()
+ })? {
+ let _ = ids.insert(experiment.hypothesis_id);
}
+ Ok(ids)
}
- fn finish(self) -> MetricKeySummary {
- MetricKeySummary {
- key: self.key,
- source: self.source,
- experiment_count: self.experiment_ids.len() as u64,
- unit: self.unit,
- objective: self.objective,
- description: None,
- requires_order: self.source != MetricFieldSource::RunMetric
- || self.ambiguous_semantics
- || !matches!(
- self.objective,
- Some(OptimizationObjective::Minimize | OptimizationObjective::Maximize)
- ),
- }
+ fn active_hypothesis_count(&self, frontier_id: FrontierId) -> Result<u64, StoreError> {
+ let frontier = self.read_frontier(&frontier_id.to_string())?;
+ Ok(self
+ .active_hypothesis_ids(frontier_id, &frontier.brief)?
+ .len() as u64)
}
-}
-fn collect_metric_samples(
- store: &ProjectStore,
- query: &MetricKeyQuery,
-) -> Result<Vec<MetricSample>, StoreError> {
- let rows = load_experiment_rows(store)?;
- let metric_definitions = metric_definitions_by_key(store)?;
- let mut samples = Vec::new();
- for row in rows {
- if query
- .frontier_id
- .is_some_and(|frontier_id| row.frontier_id != frontier_id)
- {
- continue;
+ fn open_experiment_count(&self, frontier_id: Option<FrontierId>) -> Result<u64, StoreError> {
+ Ok(self
+ .load_experiment_records(frontier_id, None, false)?
+ .into_iter()
+ .filter(|record| record.status == ExperimentStatus::Open)
+ .count() as u64)
+ }
+
+ fn live_metric_keys(
+ &self,
+ frontier_id: FrontierId,
+ active_hypotheses: &[HypothesisCurrentState],
+ open_experiments: &[ExperimentSummary],
+ ) -> Result<Vec<MetricKeySummary>, StoreError> {
+ let live_names = self.live_metric_key_names_with_context(
+ frontier_id,
+ active_hypotheses,
+ open_experiments,
+ )?;
+ let mut keys = self
+ .list_metric_definitions()?
+ .into_iter()
+ .filter(|definition| live_names.contains(definition.key.as_str()))
+ .filter(|definition| definition.visibility.is_default_visible())
+ .map(|definition| {
+ Ok(MetricKeySummary {
+ reference_count: self
+ .metric_reference_count(Some(frontier_id), &definition.key)?,
+ key: definition.key,
+ unit: definition.unit,
+ objective: definition.objective,
+ visibility: definition.visibility,
+ description: definition.description,
+ })
+ })
+ .collect::<Result<Vec<_>, StoreError>>()?;
+ keys.sort_by(|left, right| left.key.as_str().cmp(right.key.as_str()));
+ Ok(keys)
+ }
+
+ fn live_metric_key_names(
+ &self,
+ frontier_id: FrontierId,
+ ) -> Result<BTreeSet<String>, StoreError> {
+ let frontier = self.read_frontier(&frontier_id.to_string())?;
+ let active_hypotheses = self
+ .active_hypothesis_ids(frontier_id, &frontier.brief)?
+ .into_iter()
+ .map(|hypothesis_id| {
+ let summary =
+ self.hypothesis_summary_from_record(self.hypothesis_by_id(hypothesis_id)?)?;
+ let open_experiments = self.list_experiments(ListExperimentsQuery {
+ hypothesis: Some(hypothesis_id.to_string()),
+ status: Some(ExperimentStatus::Open),
+ limit: None,
+ ..ListExperimentsQuery::default()
+ })?;
+ let latest_closed_experiment = self
+ .list_experiments(ListExperimentsQuery {
+ hypothesis: Some(hypothesis_id.to_string()),
+ status: Some(ExperimentStatus::Closed),
+ limit: Some(1),
+ ..ListExperimentsQuery::default()
+ })?
+ .into_iter()
+ .next();
+ Ok(HypothesisCurrentState {
+ hypothesis: summary,
+ open_experiments,
+ latest_closed_experiment,
+ })
+ })
+ .collect::<Result<Vec<_>, StoreError>>()?;
+ let open_experiments = self.list_experiments(ListExperimentsQuery {
+ frontier: Some(frontier_id.to_string()),
+ status: Some(ExperimentStatus::Open),
+ limit: None,
+ ..ListExperimentsQuery::default()
+ })?;
+ self.live_metric_key_names_with_context(frontier_id, &active_hypotheses, &open_experiments)
+ }
+
+ fn live_metric_key_names_with_context(
+ &self,
+ _frontier_id: FrontierId,
+ active_hypotheses: &[HypothesisCurrentState],
+ open_experiments: &[ExperimentSummary],
+ ) -> Result<BTreeSet<String>, StoreError> {
+ let mut keys = BTreeSet::new();
+ for state in active_hypotheses {
+ if let Some(experiment) = state.latest_closed_experiment.as_ref() {
+ keys.extend(self.experiment_metric_key_names(experiment.id)?);
+ }
}
- if !dimensions_match(&row.dimensions, &query.dimensions) {
- continue;
+ for experiment in open_experiments {
+ for parent in self.load_vertex_parents(VertexRef::Experiment(experiment.id))? {
+ if let VertexRef::Experiment(parent_id) = parent.vertex {
+ keys.extend(self.experiment_metric_key_names(parent_id)?);
+ }
+ }
}
- samples.extend(metric_samples_for_row(
- store.schema(),
- &row,
- &metric_definitions,
- ));
+ Ok(keys)
}
- Ok(if let Some(source) = query.source {
- samples
- .into_iter()
- .filter(|sample| sample.source == source)
- .collect()
- } else {
- samples
- })
-}
-fn resolve_metric_order(
- matching: &[MetricSample],
- query: &MetricBestQuery,
- source: MetricFieldSource,
-) -> Result<MetricRankOrder, StoreError> {
- if let Some(order) = query.order {
- return Ok(order);
+ fn experiment_metric_key_names(
+ &self,
+ experiment_id: ExperimentId,
+ ) -> Result<BTreeSet<String>, StoreError> {
+ let record = self.resolve_experiment(&experiment_id.to_string())?;
+ Ok(record
+ .outcome
+ .as_ref()
+ .map(all_metrics)
+ .unwrap_or_default()
+ .into_iter()
+ .map(|metric| metric.key.to_string())
+ .collect())
}
- if source != MetricFieldSource::RunMetric {
- return Err(StoreError::MetricOrderRequired {
- key: query.key.as_str().to_owned(),
- metric_source: source.as_str().to_owned(),
- });
+
+ fn metric_reference_count(
+ &self,
+ frontier_id: Option<FrontierId>,
+ key: &NonEmptyText,
+ ) -> Result<u64, StoreError> {
+ let base_sql = "SELECT COUNT(*)
+ FROM experiment_metrics metrics
+ JOIN experiments experiments ON experiments.id = metrics.experiment_id";
+ let count = if let Some(frontier_id) = frontier_id {
+ self.connection.query_row(
+ &format!("{base_sql} WHERE metrics.key = ?1 AND experiments.frontier_id = ?2"),
+ params![key.as_str(), frontier_id.to_string()],
+ |row| row.get::<_, u64>(0),
+ )?
+ } else {
+ self.connection.query_row(
+ &format!("{base_sql} WHERE metrics.key = ?1"),
+ params![key.as_str()],
+ |row| row.get::<_, u64>(0),
+ )?
+ };
+ Ok(count)
}
- let objectives = matching
- .iter()
- .map(|sample| sample.objective)
- .collect::<BTreeSet<_>>();
- match objectives.len() {
- 1 => match objectives.into_iter().next().flatten() {
- Some(OptimizationObjective::Minimize) => Ok(MetricRankOrder::Asc),
- Some(OptimizationObjective::Maximize) => Ok(MetricRankOrder::Desc),
- Some(OptimizationObjective::Target) | None => Err(StoreError::MetricOrderRequired {
- key: query.key.as_str().to_owned(),
- metric_source: source.as_str().to_owned(),
- }),
- },
- _ => Err(StoreError::MetricSemanticsAmbiguous {
- key: query.key.as_str().to_owned(),
- metric_source: source.as_str().to_owned(),
- }),
+
+ fn materialize_outcome(
+ &self,
+ patch: &ExperimentOutcomePatch,
+ ) -> Result<ExperimentOutcome, StoreError> {
+ if patch.backend == ExecutionBackend::Manual && patch.command.argv.is_empty() {
+ return Err(StoreError::ManualExperimentRequiresCommand);
+ }
+ for key in patch.dimensions.keys() {
+ let definition = self
+ .run_dimension_definition(key)?
+ .ok_or_else(|| StoreError::UnknownRunDimension(key.clone()))?;
+ let observed = patch
+ .dimensions
+ .get(key)
+ .map(RunDimensionValue::value_type)
+ .ok_or_else(|| StoreError::UnknownRunDimension(key.clone()))?;
+ if definition.value_type != observed {
+ return Err(StoreError::UnknownDimensionFilter(key.to_string()));
+ }
+ }
+ let _ = self
+ .metric_definition(&patch.primary_metric.key)?
+ .ok_or_else(|| StoreError::UnknownMetricDefinition(patch.primary_metric.key.clone()))?;
+ for metric in &patch.supporting_metrics {
+ let _ = self
+ .metric_definition(&metric.key)?
+ .ok_or_else(|| StoreError::UnknownMetricDefinition(metric.key.clone()))?;
+ }
+ Ok(ExperimentOutcome {
+ backend: patch.backend,
+ command: patch.command.clone(),
+ dimensions: patch.dimensions.clone(),
+ primary_metric: patch.primary_metric.clone(),
+ supporting_metrics: patch.supporting_metrics.clone(),
+ verdict: patch.verdict,
+ rationale: patch.rationale.clone(),
+ analysis: patch.analysis.clone(),
+ closed_at: OffsetDateTime::now_utc(),
+ })
}
-}
-fn compare_metric_samples(
- left: &MetricSample,
- right: &MetricSample,
- order: MetricRankOrder,
-) -> Ordering {
- let metric_order = match order {
- MetricRankOrder::Asc => left
- .value
- .partial_cmp(&right.value)
- .unwrap_or(Ordering::Equal),
- MetricRankOrder::Desc => right
- .value
- .partial_cmp(&left.value)
- .unwrap_or(Ordering::Equal),
- };
- metric_order
- .then_with(|| right.experiment_id.cmp(&left.experiment_id))
- .then_with(|| left.key.cmp(&right.key))
-}
+ fn assert_known_tags(&self, tags: &BTreeSet<TagName>) -> Result<(), StoreError> {
+ for tag in tags {
+ if self
+ .connection
+ .query_row(
+ "SELECT 1 FROM tags WHERE name = ?1",
+ params![tag.as_str()],
+ |_| Ok(()),
+ )
+ .optional()?
+ .is_none()
+ {
+ return Err(StoreError::UnknownTag(tag.clone()));
+ }
+ }
+ Ok(())
+ }
-#[derive(Clone, Debug)]
-struct ExperimentMetricRow {
- experiment_id: fidget_spinner_core::ExperimentId,
- experiment_title: NonEmptyText,
- frontier_id: fidget_spinner_core::FrontierId,
- run_id: fidget_spinner_core::RunId,
- verdict: FrontierVerdict,
- hypothesis_node: DagNode,
- run_node: DagNode,
- analysis_node: Option<DagNode>,
- decision_node: DagNode,
- primary_metric: MetricValue,
- supporting_metrics: Vec<MetricValue>,
- dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
-}
+ fn unique_frontier_slug(
+ &self,
+ explicit: Option<Slug>,
+ label: &NonEmptyText,
+ ) -> Result<Slug, StoreError> {
+ self.unique_slug("frontiers", "slug", explicit, label)
+ }
-fn load_experiment_rows(store: &ProjectStore) -> Result<Vec<ExperimentMetricRow>, StoreError> {
- let run_dimensions = load_run_dimensions_by_run_id(store)?;
- let mut statement = store.connection.prepare(
- "SELECT
- id,
- title,
- frontier_id,
- run_id,
- hypothesis_node_id,
- run_node_id,
- analysis_node_id,
- decision_node_id,
- primary_metric_json,
- supporting_metrics_json,
- verdict
- FROM experiments",
- )?;
- let mut rows = statement.query([])?;
- let mut items = Vec::new();
- while let Some(row) = rows.next()? {
- let hypothesis_node_id = parse_node_id(&row.get::<_, String>(4)?)?;
- let run_id = parse_run_id(&row.get::<_, String>(3)?)?;
- let run_node_id = parse_node_id(&row.get::<_, String>(5)?)?;
- let analysis_node_id = row
- .get::<_, Option<String>>(6)?
- .map(|raw| parse_node_id(&raw))
- .transpose()?;
- let decision_node_id = parse_node_id(&row.get::<_, String>(7)?)?;
- items.push(ExperimentMetricRow {
- experiment_id: parse_experiment_id(&row.get::<_, String>(0)?)?,
- experiment_title: NonEmptyText::new(row.get::<_, String>(1)?)?,
- frontier_id: parse_frontier_id(&row.get::<_, String>(2)?)?,
- run_id,
- verdict: parse_frontier_verdict(&row.get::<_, String>(10)?)?,
- hypothesis_node: store
- .get_node(hypothesis_node_id)?
- .ok_or(StoreError::NodeNotFound(hypothesis_node_id))?,
- run_node: store
- .get_node(run_node_id)?
- .ok_or(StoreError::NodeNotFound(run_node_id))?,
- analysis_node: analysis_node_id
- .map(|node_id| {
- store
- .get_node(node_id)?
- .ok_or(StoreError::NodeNotFound(node_id))
- })
- .transpose()?,
- decision_node: store
- .get_node(decision_node_id)?
- .ok_or(StoreError::NodeNotFound(decision_node_id))?,
- primary_metric: decode_json(&row.get::<_, String>(8)?)?,
- supporting_metrics: decode_json(&row.get::<_, String>(9)?)?,
- dimensions: run_dimensions.get(&run_id).cloned().unwrap_or_default(),
- });
+ fn unique_hypothesis_slug(
+ &self,
+ explicit: Option<Slug>,
+ title: &NonEmptyText,
+ ) -> Result<Slug, StoreError> {
+ self.unique_slug("hypotheses", "slug", explicit, title)
}
- Ok(items)
-}
-fn metric_samples_for_row(
- schema: &ProjectSchema,
- row: &ExperimentMetricRow,
- metric_definitions: &BTreeMap<String, MetricDefinition>,
-) -> Vec<MetricSample> {
- let mut samples = vec![metric_sample_from_observation(
- row,
- &row.primary_metric,
- metric_definitions,
- MetricFieldSource::RunMetric,
- )];
- samples.extend(row.supporting_metrics.iter().map(|metric| {
- metric_sample_from_observation(
- row,
- metric,
- metric_definitions,
- MetricFieldSource::RunMetric,
- )
- }));
- samples.extend(metric_samples_from_payload(
- schema,
- row,
- &row.hypothesis_node,
- ));
- samples.extend(metric_samples_from_payload(schema, row, &row.run_node));
- if let Some(node) = row.analysis_node.as_ref() {
- samples.extend(metric_samples_from_payload(schema, row, node));
+ fn unique_experiment_slug(
+ &self,
+ explicit: Option<Slug>,
+ title: &NonEmptyText,
+ ) -> Result<Slug, StoreError> {
+ self.unique_slug("experiments", "slug", explicit, title)
}
- samples.extend(metric_samples_from_payload(schema, row, &row.decision_node));
- samples
-}
-fn metric_sample_from_observation(
- row: &ExperimentMetricRow,
- metric: &MetricValue,
- metric_definitions: &BTreeMap<String, MetricDefinition>,
- source: MetricFieldSource,
-) -> MetricSample {
- let registry = metric_definitions.get(metric.key.as_str());
- MetricSample {
- key: metric.key.clone(),
- source,
- value: metric.value,
- frontier_id: row.frontier_id,
- experiment_id: row.experiment_id,
- experiment_title: row.experiment_title.clone(),
- hypothesis_node_id: row.hypothesis_node.id,
- hypothesis_title: row.hypothesis_node.title.clone(),
- run_id: row.run_id,
- verdict: row.verdict,
- unit: registry.map(|definition| definition.unit),
- objective: registry.map(|definition| definition.objective),
- dimensions: row.dimensions.clone(),
+ fn unique_artifact_slug(
+ &self,
+ explicit: Option<Slug>,
+ label: &NonEmptyText,
+ ) -> Result<Slug, StoreError> {
+ self.unique_slug("artifacts", "slug", explicit, label)
}
-}
-fn metric_samples_from_payload(
- schema: &ProjectSchema,
- row: &ExperimentMetricRow,
- node: &DagNode,
-) -> Vec<MetricSample> {
- let Some(source) = MetricFieldSource::from_payload_class(node.class) else {
- return Vec::new();
- };
- node.payload
- .fields
- .iter()
- .filter_map(|(key, value)| {
- let value = value.as_f64()?;
- let spec = schema.field_spec(node.class, key);
- if spec.is_some_and(|field| {
- field
- .value_type
- .is_some_and(|kind| kind != FieldValueType::Numeric)
- }) {
- return None;
+ fn unique_slug(
+ &self,
+ table: &str,
+ column: &str,
+ explicit: Option<Slug>,
+ seed: &NonEmptyText,
+ ) -> Result<Slug, StoreError> {
+ if let Some(explicit) = explicit {
+ return Ok(explicit);
+ }
+ let base = slugify(seed.as_str())?;
+ if !self.slug_exists(table, column, &base)? {
+ return Ok(base);
+ }
+ for ordinal in 2..10_000 {
+ let candidate = Slug::new(format!("{}-{ordinal}", base.as_str()))?;
+ if !self.slug_exists(table, column, &candidate)? {
+ return Ok(candidate);
}
- Some(MetricSample {
- key: NonEmptyText::new(key.clone()).ok()?,
- source,
- value,
- frontier_id: row.frontier_id,
- experiment_id: row.experiment_id,
- experiment_title: row.experiment_title.clone(),
- hypothesis_node_id: row.hypothesis_node.id,
- hypothesis_title: row.hypothesis_node.title.clone(),
- run_id: row.run_id,
- verdict: row.verdict,
- unit: None,
- objective: None,
- dimensions: row.dimensions.clone(),
+ }
+ Slug::new(format!("{}-{}", base.as_str(), Uuid::now_v7().simple()))
+ .map_err(StoreError::from)
+ }
+
+ fn slug_exists(&self, table: &str, column: &str, slug: &Slug) -> Result<bool, StoreError> {
+ let sql = format!("SELECT 1 FROM {table} WHERE {column} = ?1");
+ self.connection
+ .query_row(&sql, params![slug.as_str()], |_| Ok(()))
+ .optional()
+ .map(|value| value.is_some())
+ .map_err(StoreError::from)
+ }
+
+ fn entity_history(
+ &self,
+ entity_kind: &str,
+ entity_id: &str,
+ ) -> Result<Vec<EntityHistoryEntry>, StoreError> {
+ let mut statement = self.connection.prepare(
+ "SELECT revision, event_kind, occurred_at, snapshot_json
+ FROM events
+ WHERE entity_kind = ?1 AND entity_id = ?2
+ ORDER BY revision DESC, occurred_at DESC",
+ )?;
+ let rows = statement.query_map(params![entity_kind, entity_id], |row| {
+ Ok(EntityHistoryEntry {
+ revision: row.get(0)?,
+ event_kind: parse_non_empty_text(&row.get::<_, String>(1)?)?,
+ occurred_at: parse_timestamp_sql(&row.get::<_, String>(2)?)?,
+ snapshot: decode_json(&row.get::<_, String>(3)?)
+ .map_err(to_sql_conversion_error)?,
})
- })
- .collect()
+ })?;
+ rows.collect::<Result<Vec<_>, _>>()
+ .map_err(StoreError::from)
+ }
}
-fn migrate(connection: &Connection) -> Result<(), StoreError> {
+fn install_schema(connection: &Connection) -> Result<(), StoreError> {
connection.execute_batch(
"
- PRAGMA foreign_keys = ON;
+ CREATE TABLE IF NOT EXISTS tags (
+ name TEXT PRIMARY KEY NOT NULL,
+ description TEXT NOT NULL,
+ created_at TEXT NOT NULL
+ );
- CREATE TABLE IF NOT EXISTS nodes (
- id TEXT PRIMARY KEY,
- class TEXT NOT NULL,
- track TEXT NOT NULL,
- frontier_id TEXT,
+ CREATE TABLE IF NOT EXISTS frontiers (
+ id TEXT PRIMARY KEY NOT NULL,
+ slug TEXT NOT NULL UNIQUE,
+ label TEXT NOT NULL,
+ objective TEXT NOT NULL,
+ status TEXT NOT NULL,
+ brief_json TEXT NOT NULL,
+ revision INTEGER NOT NULL,
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL
+ );
+
+ CREATE TABLE IF NOT EXISTS hypotheses (
+ id TEXT PRIMARY KEY NOT NULL,
+ slug TEXT NOT NULL UNIQUE,
+ frontier_id TEXT NOT NULL REFERENCES frontiers(id) ON DELETE CASCADE,
archived INTEGER NOT NULL,
title TEXT NOT NULL,
- summary TEXT,
- payload_schema_namespace TEXT,
- payload_schema_version INTEGER,
- payload_json TEXT NOT NULL,
- diagnostics_json TEXT NOT NULL,
- agent_session_id TEXT,
+ summary TEXT NOT NULL,
+ body TEXT NOT NULL,
+ revision INTEGER NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
- CREATE TABLE IF NOT EXISTS node_annotations (
- id TEXT PRIMARY KEY,
- node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
- visibility TEXT NOT NULL,
- label TEXT,
- body TEXT NOT NULL,
- created_at TEXT NOT NULL
+ CREATE TABLE IF NOT EXISTS hypothesis_tags (
+ hypothesis_id TEXT NOT NULL REFERENCES hypotheses(id) ON DELETE CASCADE,
+ tag_name TEXT NOT NULL REFERENCES tags(name) ON DELETE CASCADE,
+ PRIMARY KEY (hypothesis_id, tag_name)
);
- CREATE TABLE IF NOT EXISTS tags (
- name TEXT PRIMARY KEY,
- description TEXT NOT NULL,
- created_at TEXT NOT NULL
+ CREATE TABLE IF NOT EXISTS experiments (
+ id TEXT PRIMARY KEY NOT NULL,
+ slug TEXT NOT NULL UNIQUE,
+ frontier_id TEXT NOT NULL REFERENCES frontiers(id) ON DELETE CASCADE,
+ hypothesis_id TEXT NOT NULL REFERENCES hypotheses(id) ON DELETE CASCADE,
+ archived INTEGER NOT NULL,
+ title TEXT NOT NULL,
+ summary TEXT,
+ tags_json TEXT NOT NULL,
+ status TEXT NOT NULL,
+ outcome_json TEXT,
+ revision INTEGER NOT NULL,
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL
);
- CREATE TABLE IF NOT EXISTS node_tags (
- node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
- tag_name TEXT NOT NULL REFERENCES tags(name) ON DELETE RESTRICT,
- PRIMARY KEY (node_id, tag_name)
+ CREATE TABLE IF NOT EXISTS experiment_tags (
+ experiment_id TEXT NOT NULL REFERENCES experiments(id) ON DELETE CASCADE,
+ tag_name TEXT NOT NULL REFERENCES tags(name) ON DELETE CASCADE,
+ PRIMARY KEY (experiment_id, tag_name)
);
- CREATE TABLE IF NOT EXISTS node_edges (
- source_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
- target_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
- kind TEXT NOT NULL,
- PRIMARY KEY (source_id, target_id, kind)
+ CREATE TABLE IF NOT EXISTS influence_edges (
+ parent_kind TEXT NOT NULL,
+ parent_id TEXT NOT NULL,
+ child_kind TEXT NOT NULL,
+ child_id TEXT NOT NULL,
+ ordinal INTEGER NOT NULL,
+ PRIMARY KEY (parent_kind, parent_id, child_kind, child_id)
);
- CREATE TABLE IF NOT EXISTS frontiers (
- id TEXT PRIMARY KEY,
+ CREATE TABLE IF NOT EXISTS artifacts (
+ id TEXT PRIMARY KEY NOT NULL,
+ slug TEXT NOT NULL UNIQUE,
+ kind TEXT NOT NULL,
label TEXT NOT NULL,
- root_contract_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT,
- status TEXT NOT NULL,
+ summary TEXT,
+ locator TEXT NOT NULL,
+ media_type TEXT,
+ revision INTEGER NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
- CREATE TABLE IF NOT EXISTS runs (
- run_id TEXT PRIMARY KEY,
- node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE CASCADE,
- frontier_id TEXT REFERENCES frontiers(id) ON DELETE SET NULL,
- status TEXT NOT NULL,
- backend TEXT NOT NULL,
- benchmark_suite TEXT,
- working_directory TEXT NOT NULL,
- argv_json TEXT NOT NULL,
- env_json TEXT NOT NULL,
- started_at TEXT,
- finished_at TEXT
- );
-
- CREATE TABLE IF NOT EXISTS metrics (
- run_id TEXT NOT NULL REFERENCES runs(run_id) ON DELETE CASCADE,
- metric_key TEXT NOT NULL,
- unit TEXT NOT NULL,
- objective TEXT NOT NULL,
- value REAL NOT NULL
+ CREATE TABLE IF NOT EXISTS artifact_attachments (
+ artifact_id TEXT NOT NULL REFERENCES artifacts(id) ON DELETE CASCADE,
+ target_kind TEXT NOT NULL,
+ target_id TEXT NOT NULL,
+ ordinal INTEGER NOT NULL,
+ PRIMARY KEY (artifact_id, target_kind, target_id)
);
CREATE TABLE IF NOT EXISTS metric_definitions (
- metric_key TEXT PRIMARY KEY,
+ key TEXT PRIMARY KEY NOT NULL,
unit TEXT NOT NULL,
objective TEXT NOT NULL,
+ visibility TEXT NOT NULL,
description TEXT,
- created_at TEXT NOT NULL
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS run_dimension_definitions (
- dimension_key TEXT PRIMARY KEY,
+ key TEXT PRIMARY KEY NOT NULL,
value_type TEXT NOT NULL,
description TEXT,
- created_at TEXT NOT NULL
+ created_at TEXT NOT NULL,
+ updated_at TEXT NOT NULL
);
- CREATE TABLE IF NOT EXISTS run_dimensions (
- run_id TEXT NOT NULL REFERENCES runs(run_id) ON DELETE CASCADE,
- dimension_key TEXT NOT NULL REFERENCES run_dimension_definitions(dimension_key) ON DELETE RESTRICT,
- value_type TEXT NOT NULL,
- value_text TEXT,
- value_numeric REAL,
- value_boolean INTEGER,
- value_timestamp TEXT,
- PRIMARY KEY (run_id, dimension_key)
+ CREATE TABLE IF NOT EXISTS experiment_dimensions (
+ experiment_id TEXT NOT NULL REFERENCES experiments(id) ON DELETE CASCADE,
+ key TEXT NOT NULL REFERENCES run_dimension_definitions(key) ON DELETE CASCADE,
+ value_json TEXT NOT NULL,
+ PRIMARY KEY (experiment_id, key)
);
- CREATE TABLE IF NOT EXISTS open_experiments (
- id TEXT PRIMARY KEY,
- frontier_id TEXT NOT NULL REFERENCES frontiers(id) ON DELETE CASCADE,
- hypothesis_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT,
- title TEXT NOT NULL,
- summary TEXT,
- created_at TEXT NOT NULL
+ CREATE TABLE IF NOT EXISTS experiment_metrics (
+ experiment_id TEXT NOT NULL REFERENCES experiments(id) ON DELETE CASCADE,
+ key TEXT NOT NULL REFERENCES metric_definitions(key) ON DELETE CASCADE,
+ ordinal INTEGER NOT NULL,
+ is_primary INTEGER NOT NULL,
+ value REAL NOT NULL,
+ PRIMARY KEY (experiment_id, key, ordinal)
);
- CREATE TABLE IF NOT EXISTS experiments (
- id TEXT PRIMARY KEY,
- frontier_id TEXT NOT NULL REFERENCES frontiers(id) ON DELETE CASCADE,
- hypothesis_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT,
- run_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT,
- run_id TEXT NOT NULL REFERENCES runs(run_id) ON DELETE RESTRICT,
- analysis_node_id TEXT REFERENCES nodes(id) ON DELETE RESTRICT,
- decision_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT,
- title TEXT NOT NULL,
- summary TEXT,
- benchmark_suite TEXT NOT NULL,
- primary_metric_json TEXT NOT NULL,
- supporting_metrics_json TEXT NOT NULL,
- note_summary TEXT NOT NULL,
- note_next_json TEXT NOT NULL,
- verdict TEXT NOT NULL,
- created_at TEXT NOT NULL
- );
-
- CREATE INDEX IF NOT EXISTS metrics_by_key ON metrics(metric_key);
- CREATE INDEX IF NOT EXISTS run_dimensions_by_key_text ON run_dimensions(dimension_key, value_text);
- CREATE INDEX IF NOT EXISTS run_dimensions_by_key_numeric ON run_dimensions(dimension_key, value_numeric);
- CREATE INDEX IF NOT EXISTS run_dimensions_by_run ON run_dimensions(run_id, dimension_key);
- CREATE INDEX IF NOT EXISTS open_experiments_by_frontier ON open_experiments(frontier_id, created_at DESC);
- CREATE INDEX IF NOT EXISTS experiments_by_frontier ON experiments(frontier_id, created_at DESC);
-
CREATE TABLE IF NOT EXISTS events (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
entity_kind TEXT NOT NULL,
entity_id TEXT NOT NULL,
+ revision INTEGER NOT NULL,
event_kind TEXT NOT NULL,
- payload_json TEXT NOT NULL,
- created_at TEXT NOT NULL
+ occurred_at TEXT NOT NULL,
+ snapshot_json TEXT NOT NULL,
+ PRIMARY KEY (entity_kind, entity_id, revision)
);
",
)?;
Ok(())
}
-fn backfill_prose_summaries(connection: &Connection) -> Result<(), StoreError> {
- let mut statement = connection.prepare(
- "SELECT id, payload_json
- FROM nodes
- WHERE class IN ('note', 'source')
- AND (summary IS NULL OR trim(summary) = '')",
+fn insert_frontier(
+ transaction: &Transaction<'_>,
+ frontier: &FrontierRecord,
+) -> Result<(), StoreError> {
+ let _ = transaction.execute(
+ "INSERT INTO frontiers (id, slug, label, objective, status, brief_json, revision, created_at, updated_at)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
+ params![
+ frontier.id.to_string(),
+ frontier.slug.as_str(),
+ frontier.label.as_str(),
+ frontier.objective.as_str(),
+ frontier.status.as_str(),
+ encode_json(&frontier.brief)?,
+ frontier.revision,
+ encode_timestamp(frontier.created_at)?,
+ encode_timestamp(frontier.updated_at)?,
+ ],
)?;
- let mut rows = statement.query([])?;
- let mut updates = Vec::new();
- while let Some(row) = rows.next()? {
- let node_id = row.get::<_, String>(0)?;
- let payload = decode_json::<NodePayload>(&row.get::<_, String>(1)?)?;
- let Some(Value::String(body)) = payload.field("body") else {
- continue;
- };
- let Some(summary) = derive_summary_from_body(body) else {
- continue;
- };
- updates.push((node_id, summary));
- }
- for (node_id, summary) in updates {
- let _ = connection.execute(
- "UPDATE nodes SET summary = ?1 WHERE id = ?2",
- params![summary.as_str(), node_id],
- )?;
- }
Ok(())
}
-fn sort_schema_fields(fields: &mut [ProjectFieldSpec]) {
- fields.sort_by(|left, right| {
- left.name
- .cmp(&right.name)
- .then_with(|| left.node_classes.iter().cmp(right.node_classes.iter()))
- });
-}
-
-fn normalize_metric_plane_tx(
- tx: &Transaction<'_>,
-) -> Result<MetricPlaneMigrationReport, StoreError> {
- let mut report = MetricPlaneMigrationReport::default();
-
- if insert_run_dimension_definition_tx(
- tx,
- &RunDimensionDefinition::new(
- NonEmptyText::new("benchmark_suite")?,
- FieldValueType::String,
- Some(NonEmptyText::new("Legacy coarse benchmark label")?),
- ),
- )? {
- report.inserted_dimension_definitions += 1;
- }
-
- {
- let mut statement = tx.prepare(
- "SELECT DISTINCT metric_key, unit, objective
- FROM metrics
- ORDER BY metric_key ASC",
- )?;
- let mut rows = statement.query([])?;
- while let Some(row) = rows.next()? {
- let definition = MetricDefinition::new(
- NonEmptyText::new(row.get::<_, String>(0)?)?,
- decode_metric_unit(&row.get::<_, String>(1)?)?,
- decode_optimization_objective(&row.get::<_, String>(2)?)?,
- None,
- );
- if upsert_metric_definition_tx(tx, &definition)? {
- report.inserted_metric_definitions += 1;
- }
- }
- }
-
- {
- let mut statement = tx.prepare(
- "SELECT payload_json
- FROM nodes
- WHERE class = 'contract'",
- )?;
- let mut rows = statement.query([])?;
- while let Some(row) = rows.next()? {
- let payload = decode_json::<NodePayload>(&row.get::<_, String>(0)?)?;
- for definition in contract_metric_definitions(&payload)? {
- if upsert_metric_definition_tx(tx, &definition)? {
- report.inserted_metric_definitions += 1;
- }
- }
- }
- }
-
- {
- let mut statement = tx.prepare(
- "SELECT run_id, benchmark_suite
- FROM runs
- WHERE benchmark_suite IS NOT NULL
- AND trim(benchmark_suite) != ''",
- )?;
- let mut rows = statement.query([])?;
- while let Some(row) = rows.next()? {
- let run_id = parse_run_id(&row.get::<_, String>(0)?)?;
- let value = RunDimensionValue::String(NonEmptyText::new(row.get::<_, String>(1)?)?);
- if insert_run_dimension_value_tx(
- tx,
- run_id,
- &NonEmptyText::new("benchmark_suite")?,
- &value,
- )? {
- report.inserted_dimension_values += 1;
- }
- }
- }
-
- Ok(report)
-}
-
-fn contract_metric_definitions(payload: &NodePayload) -> Result<Vec<MetricDefinition>, StoreError> {
- let mut definitions = Vec::new();
- if let Some(primary) = payload.field("primary_metric") {
- definitions.push(metric_definition_from_json(primary, None)?);
- }
- if let Some(Value::Array(items)) = payload.field("supporting_metrics") {
- for item in items {
- definitions.push(metric_definition_from_json(item, None)?);
- }
- }
- Ok(definitions)
-}
-
-fn metric_definition_from_json(
- value: &Value,
- description: Option<NonEmptyText>,
-) -> Result<MetricDefinition, StoreError> {
- let Some(object) = value.as_object() else {
- return Err(StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- "metric definition payload must be an object",
- ))));
- };
- let key = object
- .get("metric_key")
- .or_else(|| object.get("key"))
- .and_then(Value::as_str)
- .ok_or_else(|| {
- StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- "metric definition missing key",
- )))
- })?;
- let unit = object.get("unit").and_then(Value::as_str).ok_or_else(|| {
- StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- "metric definition missing unit",
- )))
- })?;
- let objective = object
- .get("objective")
- .and_then(Value::as_str)
- .ok_or_else(|| {
- StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- "metric definition missing objective",
- )))
- })?;
- Ok(MetricDefinition::new(
- NonEmptyText::new(key)?,
- decode_metric_unit(unit)?,
- decode_optimization_objective(objective)?,
- description,
- ))
-}
-
-fn upsert_metric_definition_tx(
- tx: &Transaction<'_>,
- definition: &MetricDefinition,
-) -> Result<bool, StoreError> {
- let existing = tx
- .query_row(
- "SELECT unit, objective, description
- FROM metric_definitions
- WHERE metric_key = ?1",
- params![definition.key.as_str()],
- |row| {
- Ok((
- row.get::<_, String>(0)?,
- row.get::<_, String>(1)?,
- row.get::<_, Option<String>>(2)?,
- ))
- },
- )
- .optional()?;
- if let Some((existing_unit, existing_objective, existing_description)) = existing {
- let new_unit = encode_metric_unit(definition.unit).to_owned();
- let new_objective = encode_optimization_objective(definition.objective).to_owned();
- if existing_unit != new_unit || existing_objective != new_objective {
- return Err(StoreError::ConflictingMetricDefinition {
- key: definition.key.as_str().to_owned(),
- existing_unit,
- existing_objective,
- new_unit,
- new_objective,
- });
- }
- if existing_description.is_none() && definition.description.is_some() {
- let _ = tx.execute(
- "UPDATE metric_definitions SET description = ?2 WHERE metric_key = ?1",
- params![
- definition.key.as_str(),
- definition.description.as_ref().map(NonEmptyText::as_str)
- ],
- )?;
- }
- Ok(false)
- } else {
- let _ = tx.execute(
- "INSERT INTO metric_definitions (metric_key, unit, objective, description, created_at)
- VALUES (?1, ?2, ?3, ?4, ?5)",
- params![
- definition.key.as_str(),
- encode_metric_unit(definition.unit),
- encode_optimization_objective(definition.objective),
- definition.description.as_ref().map(NonEmptyText::as_str),
- encode_timestamp(definition.created_at)?,
- ],
- )?;
- Ok(true)
- }
-}
-
-fn insert_run_dimension_definition_tx(
- tx: &Transaction<'_>,
- definition: &RunDimensionDefinition,
-) -> Result<bool, StoreError> {
- let existing = tx
- .query_row(
- "SELECT value_type, description
- FROM run_dimension_definitions
- WHERE dimension_key = ?1",
- params![definition.key.as_str()],
- |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?)),
- )
- .optional()?;
- if let Some((existing_type, existing_description)) = existing {
- let new_type = encode_field_value_type(definition.value_type).to_owned();
- if existing_type != new_type {
- return Err(StoreError::ConflictingRunDimensionDefinition {
- key: definition.key.as_str().to_owned(),
- existing_type,
- new_type,
- });
- }
- if existing_description.is_none() && definition.description.is_some() {
- let _ = tx.execute(
- "UPDATE run_dimension_definitions SET description = ?2 WHERE dimension_key = ?1",
- params![
- definition.key.as_str(),
- definition.description.as_ref().map(NonEmptyText::as_str)
- ],
- )?;
- }
- Ok(false)
- } else {
- let _ = tx.execute(
- "INSERT INTO run_dimension_definitions (dimension_key, value_type, description, created_at)
- VALUES (?1, ?2, ?3, ?4)",
- params![
- definition.key.as_str(),
- encode_field_value_type(definition.value_type),
- definition.description.as_ref().map(NonEmptyText::as_str),
- encode_timestamp(definition.created_at)?,
- ],
- )?;
- Ok(true)
- }
-}
-
-fn load_metric_definition_tx(
- tx: &Transaction<'_>,
- key: &NonEmptyText,
-) -> Result<Option<MetricDefinition>, StoreError> {
- tx.query_row(
- "SELECT metric_key, unit, objective, description, created_at
- FROM metric_definitions
- WHERE metric_key = ?1",
- params![key.as_str()],
- |row| {
- Ok(MetricDefinition {
- key: NonEmptyText::new(row.get::<_, String>(0)?)
- .map_err(core_to_sql_conversion_error)?,
- unit: decode_metric_unit(&row.get::<_, String>(1)?)
- .map_err(to_sql_conversion_error)?,
- objective: decode_optimization_objective(&row.get::<_, String>(2)?)
- .map_err(to_sql_conversion_error)?,
- description: row
- .get::<_, Option<String>>(3)?
- .map(NonEmptyText::new)
- .transpose()
- .map_err(core_to_sql_conversion_error)?,
- created_at: decode_timestamp(&row.get::<_, String>(4)?)
- .map_err(to_sql_conversion_error)?,
- })
- },
- )
- .optional()
- .map_err(StoreError::from)
-}
-
-fn metric_definitions_by_key(
- store: &ProjectStore,
-) -> Result<BTreeMap<String, MetricDefinition>, StoreError> {
- Ok(store
- .list_metric_definitions()?
- .into_iter()
- .map(|definition| (definition.key.as_str().to_owned(), definition))
- .collect())
-}
-
-fn run_dimension_definitions_by_key(
- store: &ProjectStore,
-) -> Result<BTreeMap<String, RunDimensionDefinition>, StoreError> {
- let mut statement = store.connection.prepare(
- "SELECT dimension_key, value_type, description, created_at
- FROM run_dimension_definitions",
+fn update_frontier(
+ transaction: &Transaction<'_>,
+ frontier: &FrontierRecord,
+) -> Result<(), StoreError> {
+ let _ = transaction.execute(
+ "UPDATE frontiers
+ SET slug = ?2, label = ?3, objective = ?4, status = ?5, brief_json = ?6, revision = ?7, updated_at = ?8
+ WHERE id = ?1",
+ params![
+ frontier.id.to_string(),
+ frontier.slug.as_str(),
+ frontier.label.as_str(),
+ frontier.objective.as_str(),
+ frontier.status.as_str(),
+ encode_json(&frontier.brief)?,
+ frontier.revision,
+ encode_timestamp(frontier.updated_at)?,
+ ],
)?;
- let mut rows = statement.query([])?;
- let mut items = BTreeMap::new();
- while let Some(row) = rows.next()? {
- let definition = RunDimensionDefinition {
- key: NonEmptyText::new(row.get::<_, String>(0)?)?,
- value_type: decode_field_value_type(&row.get::<_, String>(1)?)?,
- description: row
- .get::<_, Option<String>>(2)?
- .map(NonEmptyText::new)
- .transpose()?,
- created_at: decode_timestamp(&row.get::<_, String>(3)?)?,
- };
- let _ = items.insert(definition.key.as_str().to_owned(), definition);
- }
- Ok(items)
-}
-
-fn coerce_run_dimension_map(
- definitions: &BTreeMap<String, RunDimensionDefinition>,
- raw_dimensions: BTreeMap<String, Value>,
-) -> Result<BTreeMap<NonEmptyText, RunDimensionValue>, StoreError> {
- let mut dimensions = BTreeMap::new();
- for (raw_key, raw_value) in raw_dimensions {
- let key = NonEmptyText::new(raw_key)?;
- let Some(definition) = definitions.get(key.as_str()) else {
- return Err(StoreError::UnknownRunDimension(key));
- };
- let value = coerce_run_dimension_value(definition, raw_value)?;
- let _ = dimensions.insert(key, value);
- }
- Ok(dimensions)
-}
-
-fn coerce_run_dimension_value(
- definition: &RunDimensionDefinition,
- raw_value: Value,
-) -> Result<RunDimensionValue, StoreError> {
- match definition.value_type {
- FieldValueType::String => match raw_value {
- Value::String(value) => Ok(RunDimensionValue::String(NonEmptyText::new(value)?)),
- other => Err(StoreError::InvalidRunDimensionValue {
- key: definition.key.as_str().to_owned(),
- expected: definition.value_type.as_str().to_owned(),
- observed: value_kind_name(&other).to_owned(),
- }),
- },
- FieldValueType::Numeric => match raw_value.as_f64() {
- Some(value) => Ok(RunDimensionValue::Numeric(value)),
- None => Err(StoreError::InvalidRunDimensionValue {
- key: definition.key.as_str().to_owned(),
- expected: definition.value_type.as_str().to_owned(),
- observed: value_kind_name(&raw_value).to_owned(),
- }),
- },
- FieldValueType::Boolean => match raw_value {
- Value::Bool(value) => Ok(RunDimensionValue::Boolean(value)),
- other => Err(StoreError::InvalidRunDimensionValue {
- key: definition.key.as_str().to_owned(),
- expected: definition.value_type.as_str().to_owned(),
- observed: value_kind_name(&other).to_owned(),
- }),
- },
- FieldValueType::Timestamp => match raw_value {
- Value::String(value) => {
- let _ = OffsetDateTime::parse(&value, &Rfc3339)?;
- Ok(RunDimensionValue::Timestamp(NonEmptyText::new(value)?))
- }
- other => Err(StoreError::InvalidRunDimensionValue {
- key: definition.key.as_str().to_owned(),
- expected: definition.value_type.as_str().to_owned(),
- observed: value_kind_name(&other).to_owned(),
- }),
- },
- }
+ Ok(())
}
-fn insert_run_dimension_value_tx(
- tx: &Transaction<'_>,
- run_id: fidget_spinner_core::RunId,
- key: &NonEmptyText,
- value: &RunDimensionValue,
-) -> Result<bool, StoreError> {
- let (value_text, value_numeric, value_boolean, value_timestamp) =
- encode_run_dimension_columns(value)?;
- let changed = tx.execute(
- "INSERT OR IGNORE INTO run_dimensions (
- run_id,
- dimension_key,
- value_type,
- value_text,
- value_numeric,
- value_boolean,
- value_timestamp
- ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
+fn insert_hypothesis(
+ transaction: &Transaction<'_>,
+ hypothesis: &HypothesisRecord,
+) -> Result<(), StoreError> {
+ let _ = transaction.execute(
+ "INSERT INTO hypotheses (id, slug, frontier_id, archived, title, summary, body, revision, created_at, updated_at)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
params![
- run_id.to_string(),
- key.as_str(),
- encode_field_value_type(value.value_type()),
- value_text,
- value_numeric,
- value_boolean,
- value_timestamp,
+ hypothesis.id.to_string(),
+ hypothesis.slug.as_str(),
+ hypothesis.frontier_id.to_string(),
+ bool_to_sql(hypothesis.archived),
+ hypothesis.title.as_str(),
+ hypothesis.summary.as_str(),
+ hypothesis.body.as_str(),
+ hypothesis.revision,
+ encode_timestamp(hypothesis.created_at)?,
+ encode_timestamp(hypothesis.updated_at)?,
],
)?;
- Ok(changed > 0)
+ Ok(())
}
-fn insert_run_dimensions(
- tx: &Transaction<'_>,
- run_id: fidget_spinner_core::RunId,
- dimensions: &BTreeMap<NonEmptyText, RunDimensionValue>,
+fn update_hypothesis_row(
+ transaction: &Transaction<'_>,
+ hypothesis: &HypothesisRecord,
) -> Result<(), StoreError> {
- for (key, value) in dimensions {
- let _ = insert_run_dimension_value_tx(tx, run_id, key, value)?;
- }
+ let _ = transaction.execute(
+ "UPDATE hypotheses
+ SET slug = ?2, archived = ?3, title = ?4, summary = ?5, body = ?6, revision = ?7, updated_at = ?8
+ WHERE id = ?1",
+ params![
+ hypothesis.id.to_string(),
+ hypothesis.slug.as_str(),
+ bool_to_sql(hypothesis.archived),
+ hypothesis.title.as_str(),
+ hypothesis.summary.as_str(),
+ hypothesis.body.as_str(),
+ hypothesis.revision,
+ encode_timestamp(hypothesis.updated_at)?,
+ ],
+ )?;
Ok(())
}
-fn validate_run_dimensions_tx(
- tx: &Transaction<'_>,
- dimensions: &BTreeMap<NonEmptyText, RunDimensionValue>,
-) -> Result<BTreeMap<NonEmptyText, RunDimensionValue>, StoreError> {
- for (key, value) in dimensions {
- let Some(expected_type) = tx
- .query_row(
- "SELECT value_type
- FROM run_dimension_definitions
- WHERE dimension_key = ?1",
- params![key.as_str()],
- |row| row.get::<_, String>(0),
- )
- .optional()?
- else {
- return Err(StoreError::UnknownRunDimension(key.clone()));
- };
- let expected_type = decode_field_value_type(&expected_type)?;
- let observed_type = value.value_type();
- if expected_type != observed_type {
- return Err(StoreError::InvalidRunDimensionValue {
- key: key.as_str().to_owned(),
- expected: expected_type.as_str().to_owned(),
- observed: observed_type.as_str().to_owned(),
- });
- }
- if matches!(value, RunDimensionValue::Timestamp(raw) if OffsetDateTime::parse(raw.as_str(), &Rfc3339).is_err())
- {
- return Err(StoreError::InvalidRunDimensionValue {
- key: key.as_str().to_owned(),
- expected: FieldValueType::Timestamp.as_str().to_owned(),
- observed: "string".to_owned(),
- });
- }
- }
- Ok(dimensions.clone())
-}
-
-fn load_run_dimensions_by_run_id(
- store: &ProjectStore,
-) -> Result<
- BTreeMap<fidget_spinner_core::RunId, BTreeMap<NonEmptyText, RunDimensionValue>>,
- StoreError,
-> {
- let mut statement = store.connection.prepare(
- "SELECT run_id, dimension_key, value_type, value_text, value_numeric, value_boolean, value_timestamp
- FROM run_dimensions
- ORDER BY dimension_key ASC",
+fn replace_hypothesis_tags(
+ transaction: &Transaction<'_>,
+ hypothesis_id: HypothesisId,
+ tags: &BTreeSet<TagName>,
+) -> Result<(), StoreError> {
+ let _ = transaction.execute(
+ "DELETE FROM hypothesis_tags WHERE hypothesis_id = ?1",
+ params![hypothesis_id.to_string()],
)?;
- let mut rows = statement.query([])?;
- let mut values =
- BTreeMap::<fidget_spinner_core::RunId, BTreeMap<NonEmptyText, RunDimensionValue>>::new();
- while let Some(row) = rows.next()? {
- let run_id = parse_run_id(&row.get::<_, String>(0)?)?;
- let key = NonEmptyText::new(row.get::<_, String>(1)?)?;
- let value_type = decode_field_value_type(&row.get::<_, String>(2)?)?;
- let value = decode_run_dimension_value(
- value_type,
- row.get::<_, Option<String>>(3)?,
- row.get::<_, Option<f64>>(4)?,
- row.get::<_, Option<i64>>(5)?,
- row.get::<_, Option<String>>(6)?,
- )?;
- let _ = values.entry(run_id).or_default().insert(key, value);
- }
- Ok(values)
-}
-
-fn load_run_dimension_summaries(
- store: &ProjectStore,
-) -> Result<Vec<RunDimensionSummary>, StoreError> {
- let definitions = {
- let mut statement = store.connection.prepare(
- "SELECT dimension_key, value_type, description, created_at
- FROM run_dimension_definitions
- ORDER BY dimension_key ASC",
- )?;
- let mut rows = statement.query([])?;
- let mut items = Vec::new();
- while let Some(row) = rows.next()? {
- items.push(RunDimensionDefinition {
- key: NonEmptyText::new(row.get::<_, String>(0)?)?,
- value_type: decode_field_value_type(&row.get::<_, String>(1)?)?,
- description: row
- .get::<_, Option<String>>(2)?
- .map(NonEmptyText::new)
- .transpose()?,
- created_at: decode_timestamp(&row.get::<_, String>(3)?)?,
- });
- }
- items
- };
-
- let mut summaries = Vec::new();
- for definition in definitions {
- let mut statement = store.connection.prepare(
- "SELECT value_text, value_numeric, value_boolean, value_timestamp
- FROM run_dimensions
- WHERE dimension_key = ?1",
+ for tag in tags {
+ let _ = transaction.execute(
+ "INSERT INTO hypothesis_tags (hypothesis_id, tag_name) VALUES (?1, ?2)",
+ params![hypothesis_id.to_string(), tag.as_str()],
)?;
- let mut rows = statement.query(params![definition.key.as_str()])?;
- let mut observed_run_count = 0_u64;
- let mut distinct = BTreeSet::new();
- let mut sample_values = Vec::new();
- while let Some(row) = rows.next()? {
- observed_run_count += 1;
- let value = decode_run_dimension_value(
- definition.value_type,
- row.get::<_, Option<String>>(0)?,
- row.get::<_, Option<f64>>(1)?,
- row.get::<_, Option<i64>>(2)?,
- row.get::<_, Option<String>>(3)?,
- )?;
- let serialized = encode_json(&value.as_json())?;
- if distinct.insert(serialized) && sample_values.len() < 5 {
- sample_values.push(value.as_json());
- }
- }
- summaries.push(RunDimensionSummary {
- key: definition.key,
- value_type: definition.value_type,
- description: definition.description,
- observed_run_count,
- distinct_value_count: distinct.len() as u64,
- sample_values,
- });
- }
- Ok(summaries)
-}
-
-fn merge_registered_run_metric_summaries(
- store: &ProjectStore,
- summaries: &mut Vec<MetricKeySummary>,
-) -> Result<(), StoreError> {
- let definitions = store.list_metric_definitions()?;
- for definition in definitions {
- if let Some(summary) = summaries.iter_mut().find(|summary| {
- summary.source == MetricFieldSource::RunMetric && summary.key == definition.key
- }) {
- summary.unit = Some(definition.unit);
- summary.objective = Some(definition.objective);
- summary.description.clone_from(&definition.description);
- summary.requires_order = matches!(definition.objective, OptimizationObjective::Target);
- continue;
- }
- summaries.push(MetricKeySummary {
- key: definition.key,
- source: MetricFieldSource::RunMetric,
- experiment_count: 0,
- unit: Some(definition.unit),
- objective: Some(definition.objective),
- description: definition.description,
- requires_order: matches!(definition.objective, OptimizationObjective::Target),
- });
}
Ok(())
}
-fn dimensions_match(
- haystack: &BTreeMap<NonEmptyText, RunDimensionValue>,
- needle: &BTreeMap<NonEmptyText, RunDimensionValue>,
-) -> bool {
- needle
- .iter()
- .all(|(key, value)| haystack.get(key) == Some(value))
-}
-
-fn run_dimensions_json(dimensions: &BTreeMap<NonEmptyText, RunDimensionValue>) -> Value {
- Value::Object(
- dimensions
- .iter()
- .map(|(key, value)| (key.to_string(), value.as_json()))
- .collect::<serde_json::Map<String, Value>>(),
- )
-}
-
-fn benchmark_suite_label(dimensions: &BTreeMap<NonEmptyText, RunDimensionValue>) -> Option<String> {
- dimensions
- .get(&NonEmptyText::new("benchmark_suite").ok()?)
- .and_then(|value| match value {
- RunDimensionValue::String(item) => Some(item.to_string()),
- _ => None,
- })
- .or_else(|| {
- if dimensions.is_empty() {
- None
- } else {
- Some(
- dimensions
- .iter()
- .map(|(key, value)| format!("{key}={}", dimension_value_text(value)))
- .collect::<Vec<_>>()
- .join(", "),
- )
- }
- })
-}
-
-fn derive_summary_from_body(body: &str) -> Option<NonEmptyText> {
- const MAX_SUMMARY_CHARS: usize = 240;
-
- let paragraph = body
- .split("\n\n")
- .map(collapse_inline_whitespace)
- .map(|text| text.trim().to_owned())
- .find(|text| !text.is_empty())?;
- let summary = truncate_chars(&paragraph, MAX_SUMMARY_CHARS);
- NonEmptyText::new(summary).ok()
-}
-
-fn collapse_inline_whitespace(raw: &str) -> String {
- raw.split_whitespace().collect::<Vec<_>>().join(" ")
-}
-
-fn truncate_chars(value: &str, max_chars: usize) -> String {
- if value.chars().count() <= max_chars {
- return value.to_owned();
- }
- let mut truncated = value.chars().take(max_chars).collect::<String>();
- if let Some(index) = truncated.rfind(char::is_whitespace) {
- truncated.truncate(index);
- }
- format!("{}…", truncated.trim_end())
-}
-
-fn insert_node(tx: &Transaction<'_>, node: &DagNode) -> Result<(), StoreError> {
- let schema_namespace = node
- .payload
- .schema
- .as_ref()
- .map(|schema| schema.namespace.as_str());
- let schema_version = node
- .payload
- .schema
- .as_ref()
- .map(|schema| i64::from(schema.version));
- let _ = tx.execute(
- "INSERT INTO nodes (
- id,
- class,
- track,
- frontier_id,
- archived,
- title,
- summary,
- payload_schema_namespace,
- payload_schema_version,
- payload_json,
- diagnostics_json,
- agent_session_id,
- created_at,
- updated_at
- ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)",
+fn insert_experiment(
+ transaction: &Transaction<'_>,
+ experiment: &ExperimentRecord,
+) -> Result<(), StoreError> {
+ let _ = transaction.execute(
+ "INSERT INTO experiments (id, slug, frontier_id, hypothesis_id, archived, title, summary, tags_json, status, outcome_json, revision, created_at, updated_at)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
params![
- node.id.to_string(),
- node.class.as_str(),
- encode_node_track(node.track),
- node.frontier_id.map(|id| id.to_string()),
- i64::from(node.archived),
- node.title.as_str(),
- node.summary.as_ref().map(NonEmptyText::as_str),
- schema_namespace,
- schema_version,
- encode_json(&node.payload)?,
- encode_json(&node.diagnostics)?,
- node.agent_session_id.map(|id| id.to_string()),
- encode_timestamp(node.created_at)?,
- encode_timestamp(node.updated_at)?,
+ experiment.id.to_string(),
+ experiment.slug.as_str(),
+ experiment.frontier_id.to_string(),
+ experiment.hypothesis_id.to_string(),
+ bool_to_sql(experiment.archived),
+ experiment.title.as_str(),
+ experiment.summary.as_ref().map(NonEmptyText::as_str),
+ encode_json(&experiment.tags)?,
+ experiment.status.as_str(),
+ experiment.outcome.as_ref().map(encode_json).transpose()?,
+ experiment.revision,
+ encode_timestamp(experiment.created_at)?,
+ encode_timestamp(experiment.updated_at)?,
],
)?;
- for annotation in &node.annotations {
- insert_annotation(tx, node.id, annotation)?;
- }
- for tag in &node.tags {
- insert_node_tag(tx, node.id, tag)?;
- }
Ok(())
}
-fn insert_tag(tx: &Transaction<'_>, tag: &TagRecord) -> Result<(), StoreError> {
- let existing = tx
- .query_row(
- "SELECT 1 FROM tags WHERE name = ?1",
- params![tag.name.as_str()],
- |row| row.get::<_, i64>(0),
- )
- .optional()?;
- if existing.is_some() {
- return Err(StoreError::DuplicateTag(tag.name.clone()));
- }
- let _ = tx.execute(
- "INSERT INTO tags (name, description, created_at)
- VALUES (?1, ?2, ?3)",
+fn update_experiment_row(
+ transaction: &Transaction<'_>,
+ experiment: &ExperimentRecord,
+) -> Result<(), StoreError> {
+ let _ = transaction.execute(
+ "UPDATE experiments
+ SET slug = ?2, archived = ?3, title = ?4, summary = ?5, tags_json = ?6, status = ?7, outcome_json = ?8, revision = ?9, updated_at = ?10
+ WHERE id = ?1",
params![
- tag.name.as_str(),
- tag.description.as_str(),
- encode_timestamp(tag.created_at)?,
+ experiment.id.to_string(),
+ experiment.slug.as_str(),
+ bool_to_sql(experiment.archived),
+ experiment.title.as_str(),
+ experiment.summary.as_ref().map(NonEmptyText::as_str),
+ encode_json(&experiment.tags)?,
+ experiment.status.as_str(),
+ experiment.outcome.as_ref().map(encode_json).transpose()?,
+ experiment.revision,
+ encode_timestamp(experiment.updated_at)?,
],
)?;
Ok(())
}
-fn insert_annotation(
- tx: &Transaction<'_>,
- node_id: fidget_spinner_core::NodeId,
- annotation: &NodeAnnotation,
+fn replace_experiment_tags(
+ transaction: &Transaction<'_>,
+ experiment_id: ExperimentId,
+ tags: &BTreeSet<TagName>,
) -> Result<(), StoreError> {
- let _ = tx.execute(
- "INSERT INTO node_annotations (id, node_id, visibility, label, body, created_at)
- VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
- params![
- annotation.id.to_string(),
- node_id.to_string(),
- encode_annotation_visibility(annotation.visibility),
- annotation.label.as_ref().map(NonEmptyText::as_str),
- annotation.body.as_str(),
- encode_timestamp(annotation.created_at)?,
- ],
+ let _ = transaction.execute(
+ "DELETE FROM experiment_tags WHERE experiment_id = ?1",
+ params![experiment_id.to_string()],
)?;
+ for tag in tags {
+ let _ = transaction.execute(
+ "INSERT INTO experiment_tags (experiment_id, tag_name) VALUES (?1, ?2)",
+ params![experiment_id.to_string(), tag.as_str()],
+ )?;
+ }
Ok(())
}
-fn insert_node_tag(
- tx: &Transaction<'_>,
- node_id: fidget_spinner_core::NodeId,
- tag: &TagName,
+fn replace_influence_parents(
+ transaction: &Transaction<'_>,
+ child: VertexRef,
+ parents: &[VertexRef],
) -> Result<(), StoreError> {
- let _ = tx.execute(
- "INSERT INTO node_tags (node_id, tag_name)
- VALUES (?1, ?2)",
- params![node_id.to_string(), tag.as_str()],
+ let _ = transaction.execute(
+ "DELETE FROM influence_edges WHERE child_kind = ?1 AND child_id = ?2",
+ params![vertex_kind_name(child), child.opaque_id()],
)?;
- Ok(())
-}
-
-fn ensure_known_tags(tx: &Transaction<'_>, tags: &BTreeSet<TagName>) -> Result<(), StoreError> {
- let mut statement = tx.prepare("SELECT 1 FROM tags WHERE name = ?1")?;
- for tag in tags {
- let exists = statement
- .query_row(params![tag.as_str()], |row| row.get::<_, i64>(0))
- .optional()?;
- if exists.is_none() {
- return Err(StoreError::UnknownTag(tag.clone()));
- }
+ for (ordinal, parent) in parents.iter().enumerate() {
+ let _ = transaction.execute(
+ "INSERT INTO influence_edges (parent_kind, parent_id, child_kind, child_id, ordinal)
+ VALUES (?1, ?2, ?3, ?4, ?5)",
+ params![
+ vertex_kind_name(*parent),
+ parent.opaque_id(),
+ vertex_kind_name(child),
+ child.opaque_id(),
+ i64::try_from(ordinal).unwrap_or(i64::MAX),
+ ],
+ )?;
}
Ok(())
}
-fn insert_edge(tx: &Transaction<'_>, edge: &DagEdge) -> Result<(), StoreError> {
- let _ = tx.execute(
- "INSERT OR IGNORE INTO node_edges (source_id, target_id, kind)
- VALUES (?1, ?2, ?3)",
+fn insert_artifact(
+ transaction: &Transaction<'_>,
+ artifact: &ArtifactRecord,
+) -> Result<(), StoreError> {
+ let _ = transaction.execute(
+ "INSERT INTO artifacts (id, slug, kind, label, summary, locator, media_type, revision, created_at, updated_at)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
params![
- edge.source_id.to_string(),
- edge.target_id.to_string(),
- encode_edge_kind(edge.kind),
+ artifact.id.to_string(),
+ artifact.slug.as_str(),
+ artifact.kind.as_str(),
+ artifact.label.as_str(),
+ artifact.summary.as_ref().map(NonEmptyText::as_str),
+ artifact.locator.as_str(),
+ artifact.media_type.as_ref().map(NonEmptyText::as_str),
+ artifact.revision,
+ encode_timestamp(artifact.created_at)?,
+ encode_timestamp(artifact.updated_at)?,
],
)?;
Ok(())
}
-fn insert_frontier(tx: &Transaction<'_>, frontier: &FrontierRecord) -> Result<(), StoreError> {
- let _ = tx.execute(
- "INSERT INTO frontiers (id, label, root_contract_node_id, status, created_at, updated_at)
- VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
+fn update_artifact_row(
+ transaction: &Transaction<'_>,
+ artifact: &ArtifactRecord,
+) -> Result<(), StoreError> {
+ let _ = transaction.execute(
+ "UPDATE artifacts
+ SET slug = ?2, kind = ?3, label = ?4, summary = ?5, locator = ?6, media_type = ?7, revision = ?8, updated_at = ?9
+ WHERE id = ?1",
params![
- frontier.id.to_string(),
- frontier.label.as_str(),
- frontier.root_contract_node_id.to_string(),
- encode_frontier_status(frontier.status),
- encode_timestamp(frontier.created_at)?,
- encode_timestamp(frontier.updated_at)?,
+ artifact.id.to_string(),
+ artifact.slug.as_str(),
+ artifact.kind.as_str(),
+ artifact.label.as_str(),
+ artifact.summary.as_ref().map(NonEmptyText::as_str),
+ artifact.locator.as_str(),
+ artifact.media_type.as_ref().map(NonEmptyText::as_str),
+ artifact.revision,
+ encode_timestamp(artifact.updated_at)?,
],
)?;
Ok(())
}
-fn insert_run(
- tx: &Transaction<'_>,
- run: &RunRecord,
- benchmark_suite: Option<&str>,
- primary_metric: &MetricValue,
- primary_metric_definition: &MetricDefinition,
- supporting_metrics: &[MetricValue],
- supporting_metric_definitions: &[MetricDefinition],
+fn replace_artifact_attachments(
+ transaction: &Transaction<'_>,
+ artifact_id: ArtifactId,
+ attachments: &[AttachmentTargetRef],
) -> Result<(), StoreError> {
- let started_at = match run.started_at {
- Some(timestamp) => Some(encode_timestamp(timestamp)?),
- None => None,
- };
- let finished_at = match run.finished_at {
- Some(timestamp) => Some(encode_timestamp(timestamp)?),
- None => None,
- };
- let _ = tx.execute(
- "INSERT INTO runs (
- run_id,
- node_id,
- frontier_id,
- status,
- backend,
- benchmark_suite,
- working_directory,
- argv_json,
- env_json,
- started_at,
- finished_at
- ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
- params![
- run.run_id.to_string(),
- run.node_id.to_string(),
- run.frontier_id.map(|id| id.to_string()),
- encode_run_status(run.status),
- encode_backend(run.backend),
- benchmark_suite,
- run.command.working_directory.as_str(),
- encode_json(&run.command.argv)?,
- encode_json(&run.command.env)?,
- started_at,
- finished_at,
- ],
+ let _ = transaction.execute(
+ "DELETE FROM artifact_attachments WHERE artifact_id = ?1",
+ params![artifact_id.to_string()],
)?;
-
- for (metric, definition) in std::iter::once((primary_metric, primary_metric_definition)).chain(
- supporting_metrics
- .iter()
- .zip(supporting_metric_definitions.iter()),
- ) {
- let _ = tx.execute(
- "INSERT INTO metrics (run_id, metric_key, unit, objective, value)
- VALUES (?1, ?2, ?3, ?4, ?5)",
+ for (ordinal, attachment) in attachments.iter().enumerate() {
+ let _ = transaction.execute(
+ "INSERT INTO artifact_attachments (artifact_id, target_kind, target_id, ordinal)
+ VALUES (?1, ?2, ?3, ?4)",
params![
- run.run_id.to_string(),
- metric.key.as_str(),
- encode_metric_unit(definition.unit),
- encode_optimization_objective(definition.objective),
- metric.value,
+ artifact_id.to_string(),
+ attachment_target_kind_name(*attachment),
+ attachment.opaque_id(),
+ i64::try_from(ordinal).unwrap_or(i64::MAX),
],
)?;
}
Ok(())
}
-fn insert_open_experiment(
- tx: &Transaction<'_>,
- experiment: &OpenExperiment,
-) -> Result<(), StoreError> {
- let _ = tx.execute(
- "INSERT INTO open_experiments (
- id,
- frontier_id,
- hypothesis_node_id,
- title,
- summary,
- created_at
- ) VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
- params![
- experiment.id.to_string(),
- experiment.frontier_id.to_string(),
- experiment.hypothesis_node_id.to_string(),
- experiment.title.as_str(),
- experiment.summary.as_ref().map(NonEmptyText::as_str),
- encode_timestamp(experiment.created_at)?,
- ],
- )?;
- Ok(())
-}
-
-fn delete_open_experiment(
- tx: &Transaction<'_>,
- experiment_id: fidget_spinner_core::ExperimentId,
+fn replace_experiment_dimensions(
+ transaction: &Transaction<'_>,
+ experiment_id: ExperimentId,
+ outcome: Option<&ExperimentOutcome>,
) -> Result<(), StoreError> {
- let _ = tx.execute(
- "DELETE FROM open_experiments WHERE id = ?1",
+ let _ = transaction.execute(
+ "DELETE FROM experiment_dimensions WHERE experiment_id = ?1",
params![experiment_id.to_string()],
)?;
+ if let Some(outcome) = outcome {
+ for (key, value) in &outcome.dimensions {
+ let _ = transaction.execute(
+ "INSERT INTO experiment_dimensions (experiment_id, key, value_json) VALUES (?1, ?2, ?3)",
+ params![experiment_id.to_string(), key.as_str(), encode_json(value)?],
+ )?;
+ }
+ }
Ok(())
}
-fn insert_experiment(
- tx: &Transaction<'_>,
- experiment: &CompletedExperiment,
+fn replace_experiment_metrics(
+ transaction: &Transaction<'_>,
+ experiment_id: ExperimentId,
+ outcome: Option<&ExperimentOutcome>,
) -> Result<(), StoreError> {
- let _ = tx.execute(
- "INSERT INTO experiments (
- id,
- frontier_id,
- hypothesis_node_id,
- run_node_id,
- run_id,
- analysis_node_id,
- decision_node_id,
- title,
- summary,
- benchmark_suite,
- primary_metric_json,
- supporting_metrics_json,
- note_summary,
- note_next_json,
- verdict,
- created_at
- ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)",
- params![
- experiment.id.to_string(),
- experiment.frontier_id.to_string(),
- experiment.hypothesis_node_id.to_string(),
- experiment.run_node_id.to_string(),
- experiment.run_id.to_string(),
- experiment.analysis_node_id.map(|id| id.to_string()),
- experiment.decision_node_id.to_string(),
- experiment.title.as_str(),
- experiment.summary.as_ref().map(NonEmptyText::as_str),
- benchmark_suite_label(&experiment.result.dimensions),
- encode_json(&experiment.result.primary_metric)?,
- encode_json(&experiment.result.supporting_metrics)?,
- experiment.note.summary.as_str(),
- encode_json(&experiment.note.next_hypotheses)?,
- encode_frontier_verdict(experiment.verdict),
- encode_timestamp(experiment.created_at)?,
- ],
+ let _ = transaction.execute(
+ "DELETE FROM experiment_metrics WHERE experiment_id = ?1",
+ params![experiment_id.to_string()],
)?;
+ if let Some(outcome) = outcome {
+ for (ordinal, metric) in all_metrics(outcome).into_iter().enumerate() {
+ let _ = transaction.execute(
+ "INSERT INTO experiment_metrics (experiment_id, key, ordinal, is_primary, value)
+ VALUES (?1, ?2, ?3, ?4, ?5)",
+ params![
+ experiment_id.to_string(),
+ metric.key.as_str(),
+ i64::try_from(ordinal).unwrap_or(i64::MAX),
+ bool_to_sql(ordinal == 0),
+ metric.value,
+ ],
+ )?;
+ }
+ }
Ok(())
}
-fn insert_event(
- tx: &Transaction<'_>,
+fn record_event(
+ transaction: &Transaction<'_>,
entity_kind: &str,
entity_id: &str,
+ revision: u64,
event_kind: &str,
- payload: Value,
+ snapshot: &impl Serialize,
) -> Result<(), StoreError> {
- let _ = tx.execute(
- "INSERT INTO events (entity_kind, entity_id, event_kind, payload_json, created_at)
- VALUES (?1, ?2, ?3, ?4, ?5)",
+ let _ = transaction.execute(
+ "INSERT INTO events (entity_kind, entity_id, revision, event_kind, occurred_at, snapshot_json)
+ VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![
entity_kind,
entity_id,
+ revision,
event_kind,
- payload.to_string(),
encode_timestamp(OffsetDateTime::now_utc())?,
+ encode_json(snapshot)?,
],
)?;
Ok(())
}
-fn load_open_experiment(
- connection: &Connection,
- experiment_id: fidget_spinner_core::ExperimentId,
-) -> Result<Option<OpenExperiment>, StoreError> {
- let mut statement = connection.prepare(
- "SELECT
- id,
- frontier_id,
- hypothesis_node_id,
- title,
- summary,
- created_at
- FROM open_experiments
- WHERE id = ?1",
- )?;
- statement
- .query_row(params![experiment_id.to_string()], |row| {
- Ok(OpenExperiment {
- id: parse_experiment_id(&row.get::<_, String>(0)?)
- .map_err(to_sql_conversion_error)?,
- frontier_id: parse_frontier_id(&row.get::<_, String>(1)?)
- .map_err(to_sql_conversion_error)?,
- hypothesis_node_id: parse_node_id(&row.get::<_, String>(2)?)
- .map_err(to_sql_conversion_error)?,
- title: NonEmptyText::new(row.get::<_, String>(3)?)
- .map_err(core_to_sql_conversion_error)?,
- summary: row
- .get::<_, Option<String>>(4)?
- .map(NonEmptyText::new)
- .transpose()
- .map_err(core_to_sql_conversion_error)?,
- created_at: decode_timestamp(&row.get::<_, String>(5)?)
- .map_err(to_sql_conversion_error)?,
- })
- })
- .optional()
- .map_err(StoreError::from)
-}
-
-fn summarize_open_experiment(experiment: &OpenExperiment) -> OpenExperimentSummary {
- OpenExperimentSummary {
- id: experiment.id,
- frontier_id: experiment.frontier_id,
- hypothesis_node_id: experiment.hypothesis_node_id,
- title: experiment.title.clone(),
- summary: experiment.summary.clone(),
- created_at: experiment.created_at,
- }
-}
-
-fn touch_frontier(
- tx: &Transaction<'_>,
- frontier_id: fidget_spinner_core::FrontierId,
-) -> Result<(), StoreError> {
- let _ = tx.execute(
- "UPDATE frontiers SET updated_at = ?1 WHERE id = ?2",
- params![
- encode_timestamp(OffsetDateTime::now_utc())?,
- frontier_id.to_string()
- ],
- )?;
- Ok(())
+fn decode_frontier_row(row: &rusqlite::Row<'_>) -> Result<FrontierRecord, rusqlite::Error> {
+ Ok(FrontierRecord {
+ id: FrontierId::from_uuid(parse_uuid_sql(&row.get::<_, String>(0)?)?),
+ slug: parse_slug(&row.get::<_, String>(1)?)?,
+ label: parse_non_empty_text(&row.get::<_, String>(2)?)?,
+ objective: parse_non_empty_text(&row.get::<_, String>(3)?)?,
+ status: parse_frontier_status(&row.get::<_, String>(4)?)?,
+ brief: decode_json(&row.get::<_, String>(5)?).map_err(to_sql_conversion_error)?,
+ revision: row.get(6)?,
+ created_at: parse_timestamp_sql(&row.get::<_, String>(7)?)?,
+ updated_at: parse_timestamp_sql(&row.get::<_, String>(8)?)?,
+ })
}
-fn read_node_row(row: &rusqlite::Row<'_>) -> Result<DagNode, rusqlite::Error> {
- let payload_json = row.get::<_, String>(9)?;
- let diagnostics_json = row.get::<_, String>(10)?;
- let payload = decode_json::<NodePayload>(&payload_json).map_err(to_sql_conversion_error)?;
- let diagnostics =
- decode_json::<NodeDiagnostics>(&diagnostics_json).map_err(to_sql_conversion_error)?;
- Ok(DagNode {
- id: parse_node_id(&row.get::<_, String>(0)?).map_err(to_sql_conversion_error)?,
- class: parse_node_class(&row.get::<_, String>(1)?).map_err(to_sql_conversion_error)?,
- track: parse_node_track(&row.get::<_, String>(2)?).map_err(to_sql_conversion_error)?,
- frontier_id: row
- .get::<_, Option<String>>(3)?
- .map(|raw| parse_frontier_id(&raw))
- .transpose()
- .map_err(to_sql_conversion_error)?,
+fn decode_experiment_row(row: &rusqlite::Row<'_>) -> Result<ExperimentRecord, rusqlite::Error> {
+ Ok(ExperimentRecord {
+ id: ExperimentId::from_uuid(parse_uuid_sql(&row.get::<_, String>(0)?)?),
+ slug: parse_slug(&row.get::<_, String>(1)?)?,
+ frontier_id: FrontierId::from_uuid(parse_uuid_sql(&row.get::<_, String>(2)?)?),
+ hypothesis_id: HypothesisId::from_uuid(parse_uuid_sql(&row.get::<_, String>(3)?)?),
archived: row.get::<_, i64>(4)? != 0,
- title: NonEmptyText::new(row.get::<_, String>(5)?).map_err(core_to_sql_conversion_error)?,
- summary: row
- .get::<_, Option<String>>(6)?
- .map(NonEmptyText::new)
- .transpose()
- .map_err(core_to_sql_conversion_error)?,
- tags: BTreeSet::new(),
- payload,
- annotations: Vec::new(),
- diagnostics,
- agent_session_id: row
- .get::<_, Option<String>>(11)?
- .map(|raw| parse_agent_session_id(&raw))
- .transpose()
- .map_err(to_sql_conversion_error)?,
- created_at: decode_timestamp(&row.get::<_, String>(12)?)
- .map_err(to_sql_conversion_error)?,
- updated_at: decode_timestamp(&row.get::<_, String>(13)?)
- .map_err(to_sql_conversion_error)?,
+ title: parse_non_empty_text(&row.get::<_, String>(5)?)?,
+ summary: parse_optional_non_empty_text(row.get::<_, Option<String>>(6)?)?,
+ tags: decode_json(&row.get::<_, String>(7)?).map_err(to_sql_conversion_error)?,
+ status: parse_experiment_status(&row.get::<_, String>(8)?)?,
+ outcome: row
+ .get::<_, Option<String>>(9)?
+ .map(|raw| decode_json(&raw).map_err(to_sql_conversion_error))
+ .transpose()?,
+ revision: row.get(10)?,
+ created_at: parse_timestamp_sql(&row.get::<_, String>(11)?)?,
+ updated_at: parse_timestamp_sql(&row.get::<_, String>(12)?)?,
})
}
-fn read_frontier_row(row: &rusqlite::Row<'_>) -> Result<FrontierRecord, StoreError> {
- Ok(FrontierRecord {
- id: parse_frontier_id(&row.get::<_, String>(0)?)?,
- label: NonEmptyText::new(row.get::<_, String>(1)?)?,
- root_contract_node_id: parse_node_id(&row.get::<_, String>(2)?)?,
- status: parse_frontier_status(&row.get::<_, String>(3)?)?,
- created_at: decode_timestamp(&row.get::<_, String>(4)?)?,
- updated_at: decode_timestamp(&row.get::<_, String>(5)?)?,
+fn decode_artifact_row(row: &rusqlite::Row<'_>) -> Result<ArtifactRecord, rusqlite::Error> {
+ Ok(ArtifactRecord {
+ id: ArtifactId::from_uuid(parse_uuid_sql(&row.get::<_, String>(0)?)?),
+ slug: parse_slug(&row.get::<_, String>(1)?)?,
+ kind: parse_artifact_kind(&row.get::<_, String>(2)?)?,
+ label: parse_non_empty_text(&row.get::<_, String>(3)?)?,
+ summary: parse_optional_non_empty_text(row.get::<_, Option<String>>(4)?)?,
+ locator: parse_non_empty_text(&row.get::<_, String>(5)?)?,
+ media_type: parse_optional_non_empty_text(row.get::<_, Option<String>>(6)?)?,
+ revision: row.get(7)?,
+ created_at: parse_timestamp_sql(&row.get::<_, String>(8)?)?,
+ updated_at: parse_timestamp_sql(&row.get::<_, String>(9)?)?,
})
}
-fn frontier_contract_payload(contract: &FrontierContract) -> Result<JsonObject, StoreError> {
- json_object(json!({
- "objective": contract.objective.as_str(),
- "benchmark_suites": contract
- .evaluation
- .benchmark_suites
- .iter()
- .map(NonEmptyText::as_str)
- .collect::<Vec<_>>(),
- "primary_metric": metric_spec_json(&contract.evaluation.primary_metric),
- "supporting_metrics": contract
- .evaluation
- .supporting_metrics
- .iter()
- .map(metric_spec_json)
- .collect::<Vec<_>>(),
- "promotion_criteria": contract
- .promotion_criteria
- .iter()
- .map(NonEmptyText::as_str)
- .collect::<Vec<_>>(),
- }))
+fn decode_metric_definition_row(
+ row: &rusqlite::Row<'_>,
+) -> Result<MetricDefinition, rusqlite::Error> {
+ Ok(MetricDefinition {
+ key: parse_non_empty_text(&row.get::<_, String>(0)?)?,
+ unit: parse_metric_unit(&row.get::<_, String>(1)?)?,
+ objective: parse_optimization_objective(&row.get::<_, String>(2)?)?,
+ visibility: parse_metric_visibility(&row.get::<_, String>(3)?)?,
+ description: parse_optional_non_empty_text(row.get::<_, Option<String>>(4)?)?,
+ created_at: parse_timestamp_sql(&row.get::<_, String>(5)?)?,
+ updated_at: parse_timestamp_sql(&row.get::<_, String>(6)?)?,
+ })
}
-fn metric_spec_json(metric: &MetricSpec) -> Value {
- json!({
- "metric_key": metric.metric_key.as_str(),
- "unit": encode_metric_unit(metric.unit),
- "objective": encode_optimization_objective(metric.objective),
+fn decode_run_dimension_definition_row(
+ row: &rusqlite::Row<'_>,
+) -> Result<RunDimensionDefinition, rusqlite::Error> {
+ Ok(RunDimensionDefinition {
+ key: parse_non_empty_text(&row.get::<_, String>(0)?)?,
+ value_type: parse_field_value_type(&row.get::<_, String>(1)?)?,
+ description: parse_optional_non_empty_text(row.get::<_, Option<String>>(2)?)?,
+ created_at: parse_timestamp_sql(&row.get::<_, String>(3)?)?,
+ updated_at: parse_timestamp_sql(&row.get::<_, String>(4)?)?,
})
}
-fn json_object(value: Value) -> Result<JsonObject, StoreError> {
- match value {
- Value::Object(map) => Ok(map),
- other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidInput,
- format!("expected JSON object, got {other:?}"),
- )))),
+fn enforce_revision(
+ kind: &'static str,
+ selector: &str,
+ expected: Option<u64>,
+ observed: u64,
+) -> Result<(), StoreError> {
+ if let Some(expected) = expected
+ && expected != observed
+ {
+ return Err(StoreError::RevisionMismatch {
+ kind,
+ selector: selector.to_owned(),
+ expected,
+ observed,
+ });
}
-}
-
-fn write_json_file<T: Serialize>(path: &Utf8Path, value: &T) -> Result<(), StoreError> {
- let serialized = serde_json::to_string_pretty(value)?;
- fs::write(path.as_std_path(), serialized)?;
Ok(())
}
-fn read_json_file<T: for<'de> Deserialize<'de>>(path: &Utf8Path) -> Result<T, StoreError> {
- let bytes = fs::read(path.as_std_path())?;
- serde_json::from_slice(&bytes).map_err(StoreError::from)
-}
-
-fn encode_json<T: Serialize>(value: &T) -> Result<String, StoreError> {
- serde_json::to_string(value).map_err(StoreError::from)
-}
-
-fn decode_json<T: for<'de> Deserialize<'de>>(raw: &str) -> Result<T, StoreError> {
- serde_json::from_str(raw).map_err(StoreError::from)
-}
-
-fn encode_timestamp(timestamp: OffsetDateTime) -> Result<String, StoreError> {
- timestamp.format(&Rfc3339).map_err(StoreError::from)
-}
-
-fn decode_timestamp(raw: &str) -> Result<OffsetDateTime, StoreError> {
- OffsetDateTime::parse(raw, &Rfc3339).map_err(StoreError::from)
-}
-
-fn state_root(project_root: &Utf8Path) -> Utf8PathBuf {
- project_root.join(STORE_DIR_NAME)
-}
-
-#[must_use]
-pub fn discover_project_root(path: impl AsRef<Utf8Path>) -> Option<Utf8PathBuf> {
- let mut cursor = discovery_start(path.as_ref());
- loop {
- if state_root(&cursor).exists() {
- return Some(cursor);
- }
- let parent = cursor.parent()?;
- cursor = parent.to_path_buf();
- }
-}
-
-fn discovery_start(path: &Utf8Path) -> Utf8PathBuf {
- match fs::metadata(path.as_std_path()) {
- Ok(metadata) if metadata.is_file() => path
- .parent()
- .map_or_else(|| path.to_path_buf(), Utf8Path::to_path_buf),
- _ => path.to_path_buf(),
+fn validate_hypothesis_body(body: &NonEmptyText) -> Result<(), StoreError> {
+ let raw = body.as_str().trim();
+ if raw.contains("\n\n")
+ || raw.lines().any(|line| {
+ let trimmed = line.trim_start();
+ trimmed.starts_with('-') || trimmed.starts_with('*') || trimmed.starts_with('#')
+ })
+ {
+ return Err(StoreError::HypothesisBodyMustBeSingleParagraph);
}
+ Ok(())
}
-fn to_sql_conversion_error(error: StoreError) -> rusqlite::Error {
- rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(error))
-}
-
-fn core_to_sql_conversion_error(error: fidget_spinner_core::CoreError) -> rusqlite::Error {
- to_sql_conversion_error(StoreError::from(error))
-}
-
-fn parse_uuid(raw: &str) -> Result<Uuid, StoreError> {
- Uuid::parse_str(raw).map_err(StoreError::from)
-}
-
-fn parse_node_id(raw: &str) -> Result<fidget_spinner_core::NodeId, StoreError> {
- Ok(fidget_spinner_core::NodeId::from_uuid(parse_uuid(raw)?))
-}
-
-fn parse_frontier_id(raw: &str) -> Result<fidget_spinner_core::FrontierId, StoreError> {
- Ok(fidget_spinner_core::FrontierId::from_uuid(parse_uuid(raw)?))
-}
-
-fn parse_experiment_id(raw: &str) -> Result<fidget_spinner_core::ExperimentId, StoreError> {
- Ok(fidget_spinner_core::ExperimentId::from_uuid(parse_uuid(
- raw,
- )?))
-}
-
-fn parse_run_id(raw: &str) -> Result<fidget_spinner_core::RunId, StoreError> {
- Ok(fidget_spinner_core::RunId::from_uuid(parse_uuid(raw)?))
-}
-
-fn parse_agent_session_id(raw: &str) -> Result<fidget_spinner_core::AgentSessionId, StoreError> {
- Ok(fidget_spinner_core::AgentSessionId::from_uuid(parse_uuid(
- raw,
- )?))
-}
-
-fn parse_annotation_id(raw: &str) -> Result<fidget_spinner_core::AnnotationId, StoreError> {
- Ok(fidget_spinner_core::AnnotationId::from_uuid(parse_uuid(
- raw,
- )?))
-}
-
-fn parse_node_class(raw: &str) -> Result<NodeClass, StoreError> {
+fn parse_frontier_status(raw: &str) -> Result<FrontierStatus, rusqlite::Error> {
match raw {
- "contract" => Ok(NodeClass::Contract),
- "hypothesis" => Ok(NodeClass::Hypothesis),
- "run" => Ok(NodeClass::Run),
- "analysis" => Ok(NodeClass::Analysis),
- "decision" => Ok(NodeClass::Decision),
- "source" => Ok(NodeClass::Source),
- "note" => Ok(NodeClass::Note),
- other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("unknown node class `{other}`"),
- )))),
- }
-}
-
-fn encode_node_track(track: fidget_spinner_core::NodeTrack) -> &'static str {
- match track {
- fidget_spinner_core::NodeTrack::CorePath => "core-path",
- fidget_spinner_core::NodeTrack::OffPath => "off-path",
+ "exploring" => Ok(FrontierStatus::Exploring),
+ "paused" => Ok(FrontierStatus::Paused),
+ "archived" => Ok(FrontierStatus::Archived),
+ _ => Err(to_sql_conversion_error(StoreError::Json(
+ serde_json::Error::io(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid frontier status `{raw}`"),
+ )),
+ ))),
}
}
-fn parse_node_track(raw: &str) -> Result<fidget_spinner_core::NodeTrack, StoreError> {
+fn parse_metric_unit(raw: &str) -> Result<MetricUnit, rusqlite::Error> {
match raw {
- "core-path" => Ok(fidget_spinner_core::NodeTrack::CorePath),
- "off-path" => Ok(fidget_spinner_core::NodeTrack::OffPath),
- other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("unknown node track `{other}`"),
- )))),
- }
-}
-
-fn encode_annotation_visibility(visibility: AnnotationVisibility) -> &'static str {
- match visibility {
- AnnotationVisibility::HiddenByDefault => "hidden",
- AnnotationVisibility::Visible => "visible",
+ "seconds" => Ok(MetricUnit::Seconds),
+ "bytes" => Ok(MetricUnit::Bytes),
+ "count" => Ok(MetricUnit::Count),
+ "ratio" => Ok(MetricUnit::Ratio),
+ "custom" => Ok(MetricUnit::Custom),
+ _ => Err(to_sql_conversion_error(StoreError::Json(
+ serde_json::Error::io(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid metric unit `{raw}`"),
+ )),
+ ))),
}
}
-fn parse_annotation_visibility(raw: &str) -> Result<AnnotationVisibility, StoreError> {
+fn parse_optimization_objective(raw: &str) -> Result<OptimizationObjective, rusqlite::Error> {
match raw {
- "hidden" => Ok(AnnotationVisibility::HiddenByDefault),
- "visible" => Ok(AnnotationVisibility::Visible),
- other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("unknown annotation visibility `{other}`"),
- )))),
+ "minimize" => Ok(OptimizationObjective::Minimize),
+ "maximize" => Ok(OptimizationObjective::Maximize),
+ "target" => Ok(OptimizationObjective::Target),
+ _ => Err(to_sql_conversion_error(StoreError::Json(
+ serde_json::Error::io(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid objective `{raw}`"),
+ )),
+ ))),
}
}
-fn encode_edge_kind(kind: EdgeKind) -> &'static str {
- match kind {
- EdgeKind::Lineage => "lineage",
- EdgeKind::Evidence => "evidence",
- EdgeKind::Comparison => "comparison",
- EdgeKind::Supersedes => "supersedes",
- EdgeKind::Annotation => "annotation",
+fn parse_metric_visibility(raw: &str) -> Result<MetricVisibility, rusqlite::Error> {
+ match raw {
+ "canonical" => Ok(MetricVisibility::Canonical),
+ "minor" => Ok(MetricVisibility::Minor),
+ "hidden" => Ok(MetricVisibility::Hidden),
+ "archived" => Ok(MetricVisibility::Archived),
+ _ => Err(to_sql_conversion_error(StoreError::Json(
+ serde_json::Error::io(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid metric visibility `{raw}`"),
+ )),
+ ))),
}
}
-fn encode_frontier_status(status: FrontierStatus) -> &'static str {
- match status {
- FrontierStatus::Exploring => "exploring",
- FrontierStatus::Paused => "paused",
- FrontierStatus::Saturated => "saturated",
- FrontierStatus::Archived => "archived",
+fn parse_field_value_type(raw: &str) -> Result<FieldValueType, rusqlite::Error> {
+ match raw {
+ "string" => Ok(FieldValueType::String),
+ "numeric" => Ok(FieldValueType::Numeric),
+ "boolean" => Ok(FieldValueType::Boolean),
+ "timestamp" => Ok(FieldValueType::Timestamp),
+ _ => Err(to_sql_conversion_error(StoreError::Json(
+ serde_json::Error::io(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid field type `{raw}`"),
+ )),
+ ))),
}
}
-fn parse_frontier_status(raw: &str) -> Result<FrontierStatus, StoreError> {
+fn parse_experiment_status(raw: &str) -> Result<ExperimentStatus, rusqlite::Error> {
match raw {
- "exploring" => Ok(FrontierStatus::Exploring),
- "paused" => Ok(FrontierStatus::Paused),
- "saturated" => Ok(FrontierStatus::Saturated),
- "archived" => Ok(FrontierStatus::Archived),
- other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("unknown frontier status `{other}`"),
- )))),
+ "open" => Ok(ExperimentStatus::Open),
+ "closed" => Ok(ExperimentStatus::Closed),
+ _ => Err(to_sql_conversion_error(StoreError::Json(
+ serde_json::Error::io(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid experiment status `{raw}`"),
+ )),
+ ))),
}
}
-fn encode_run_status(status: RunStatus) -> &'static str {
- match status {
- RunStatus::Queued => "queued",
- RunStatus::Running => "running",
- RunStatus::Succeeded => "succeeded",
- RunStatus::Failed => "failed",
- RunStatus::Cancelled => "cancelled",
+fn parse_artifact_kind(raw: &str) -> Result<ArtifactKind, rusqlite::Error> {
+ match raw {
+ "document" => Ok(ArtifactKind::Document),
+ "link" => Ok(ArtifactKind::Link),
+ "log" => Ok(ArtifactKind::Log),
+ "table" => Ok(ArtifactKind::Table),
+ "plot" => Ok(ArtifactKind::Plot),
+ "dump" => Ok(ArtifactKind::Dump),
+ "binary" => Ok(ArtifactKind::Binary),
+ "other" => Ok(ArtifactKind::Other),
+ _ => Err(to_sql_conversion_error(StoreError::Json(
+ serde_json::Error::io(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid artifact kind `{raw}`"),
+ )),
+ ))),
}
}
-fn encode_backend(backend: ExecutionBackend) -> &'static str {
- match backend {
- ExecutionBackend::LocalProcess => "local-process",
- ExecutionBackend::WorktreeProcess => "worktree-process",
- ExecutionBackend::SshProcess => "ssh-process",
+fn resolve_selector(raw: &str) -> Result<Selector, StoreError> {
+ if let Ok(uuid) = Uuid::parse_str(raw) {
+ Ok(Selector::Id(uuid))
+ } else {
+ Ok(Selector::Slug(Slug::new(raw.to_owned())?))
}
}
-fn encode_field_value_type(value_type: FieldValueType) -> &'static str {
- value_type.as_str()
+enum Selector {
+ Id(Uuid),
+ Slug(Slug),
}
-fn decode_field_value_type(raw: &str) -> Result<FieldValueType, StoreError> {
- match raw {
- "string" => Ok(FieldValueType::String),
- "numeric" => Ok(FieldValueType::Numeric),
- "boolean" => Ok(FieldValueType::Boolean),
- "timestamp" => Ok(FieldValueType::Timestamp),
- other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("unknown field value type `{other}`"),
- )))),
+fn slugify(raw: &str) -> Result<Slug, CoreError> {
+ let mut slug = String::with_capacity(raw.len());
+ let mut last_was_separator = true;
+ for character in raw.chars().flat_map(char::to_lowercase) {
+ if character.is_ascii_alphanumeric() {
+ slug.push(character);
+ last_was_separator = false;
+ continue;
+ }
+ if matches!(character, ' ' | '-' | '_' | '/' | ':') && !last_was_separator {
+ slug.push('-');
+ last_was_separator = true;
+ }
}
-}
-
-fn encode_metric_unit(unit: MetricUnit) -> &'static str {
- match unit {
- MetricUnit::Seconds => "seconds",
- MetricUnit::Bytes => "bytes",
- MetricUnit::Count => "count",
- MetricUnit::Ratio => "ratio",
- MetricUnit::Custom => "custom",
+ if slug.ends_with('-') {
+ let _ = slug.pop();
}
-}
-
-fn decode_metric_unit(raw: &str) -> Result<MetricUnit, StoreError> {
- match raw {
- "seconds" => Ok(MetricUnit::Seconds),
- "bytes" => Ok(MetricUnit::Bytes),
- "count" => Ok(MetricUnit::Count),
- "ratio" => Ok(MetricUnit::Ratio),
- "custom" => Ok(MetricUnit::Custom),
- other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("unknown metric unit `{other}`"),
- )))),
+ if slug.is_empty() {
+ slug.push_str("untitled");
}
+ Slug::new(slug)
}
-fn encode_optimization_objective(objective: OptimizationObjective) -> &'static str {
- match objective {
- OptimizationObjective::Minimize => "minimize",
- OptimizationObjective::Maximize => "maximize",
- OptimizationObjective::Target => "target",
+fn vertex_kind_name(vertex: VertexRef) -> &'static str {
+ match vertex {
+ VertexRef::Hypothesis(_) => "hypothesis",
+ VertexRef::Experiment(_) => "experiment",
}
}
-fn decode_optimization_objective(raw: &str) -> Result<OptimizationObjective, StoreError> {
- match raw {
- "minimize" => Ok(OptimizationObjective::Minimize),
- "maximize" => Ok(OptimizationObjective::Maximize),
- "target" => Ok(OptimizationObjective::Target),
- other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("unknown optimization objective `{other}`"),
- )))),
+fn attachment_target_kind_name(target: AttachmentTargetRef) -> &'static str {
+ match target {
+ AttachmentTargetRef::Frontier(_) => "frontier",
+ AttachmentTargetRef::Hypothesis(_) => "hypothesis",
+ AttachmentTargetRef::Experiment(_) => "experiment",
}
}
-fn encode_frontier_verdict(verdict: FrontierVerdict) -> &'static str {
- match verdict {
- FrontierVerdict::Accepted => "accepted",
- FrontierVerdict::Kept => "kept",
- FrontierVerdict::Parked => "parked",
- FrontierVerdict::Rejected => "rejected",
+fn decode_vertex_ref(kind: &str, raw_id: &str) -> Result<VertexRef, rusqlite::Error> {
+ let uuid = parse_uuid_sql(raw_id)?;
+ match kind {
+ "hypothesis" => Ok(VertexRef::Hypothesis(HypothesisId::from_uuid(uuid))),
+ "experiment" => Ok(VertexRef::Experiment(ExperimentId::from_uuid(uuid))),
+ _ => Err(to_sql_conversion_error(StoreError::Json(
+ serde_json::Error::io(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid vertex kind `{kind}`"),
+ )),
+ ))),
}
}
-fn parse_frontier_verdict(raw: &str) -> Result<FrontierVerdict, StoreError> {
- match raw {
- "accepted" => Ok(FrontierVerdict::Accepted),
- "kept" => Ok(FrontierVerdict::Kept),
- "parked" => Ok(FrontierVerdict::Parked),
- "rejected" => Ok(FrontierVerdict::Rejected),
- other => Err(StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- format!("unknown frontier verdict `{other}`"),
- )))),
+fn decode_attachment_target(
+ kind: &str,
+ raw_id: &str,
+) -> Result<AttachmentTargetRef, rusqlite::Error> {
+ let uuid = parse_uuid_sql(raw_id)?;
+ match kind {
+ "frontier" => Ok(AttachmentTargetRef::Frontier(FrontierId::from_uuid(uuid))),
+ "hypothesis" => Ok(AttachmentTargetRef::Hypothesis(HypothesisId::from_uuid(
+ uuid,
+ ))),
+ "experiment" => Ok(AttachmentTargetRef::Experiment(ExperimentId::from_uuid(
+ uuid,
+ ))),
+ _ => Err(to_sql_conversion_error(StoreError::Json(
+ serde_json::Error::io(io::Error::new(
+ io::ErrorKind::InvalidData,
+ format!("invalid attachment target kind `{kind}`"),
+ )),
+ ))),
}
}
-type RunDimensionColumns = (Option<String>, Option<f64>, Option<i64>, Option<String>);
-
-fn encode_run_dimension_columns(
- value: &RunDimensionValue,
-) -> Result<RunDimensionColumns, StoreError> {
- match value {
- RunDimensionValue::String(item) => Ok((Some(item.to_string()), None, None, None)),
- RunDimensionValue::Numeric(item) => Ok((None, Some(*item), None, None)),
- RunDimensionValue::Boolean(item) => Ok((None, None, Some(i64::from(*item)), None)),
- RunDimensionValue::Timestamp(item) => {
- let _ = OffsetDateTime::parse(item.as_str(), &Rfc3339)?;
- Ok((None, None, None, Some(item.to_string())))
+fn derive_active_tags(
+ active_hypotheses: &[HypothesisCurrentState],
+ open_experiments: &[ExperimentSummary],
+) -> Vec<TagName> {
+ let mut tags = BTreeSet::new();
+ for state in active_hypotheses {
+ tags.extend(state.hypothesis.tags.iter().cloned());
+ for experiment in &state.open_experiments {
+ tags.extend(experiment.tags.iter().cloned());
}
}
-}
-
-fn decode_run_dimension_value(
- value_type: FieldValueType,
- value_text: Option<String>,
- value_numeric: Option<f64>,
- value_boolean: Option<i64>,
- value_timestamp: Option<String>,
-) -> Result<RunDimensionValue, StoreError> {
- match value_type {
- FieldValueType::String => Ok(RunDimensionValue::String(NonEmptyText::new(
- value_text.ok_or_else(|| {
- StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- "missing string dimension value",
- )))
- })?,
- )?)),
- FieldValueType::Numeric => Ok(RunDimensionValue::Numeric(value_numeric.ok_or_else(
- || {
- StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- "missing numeric dimension value",
- )))
- },
- )?)),
- FieldValueType::Boolean => Ok(RunDimensionValue::Boolean(
- value_boolean.ok_or_else(|| {
- StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- "missing boolean dimension value",
- )))
- })? != 0,
- )),
- FieldValueType::Timestamp => {
- let value = value_timestamp.ok_or_else(|| {
- StoreError::Json(serde_json::Error::io(io::Error::new(
- io::ErrorKind::InvalidData,
- "missing timestamp dimension value",
- )))
- })?;
- let _ = OffsetDateTime::parse(&value, &Rfc3339)?;
- Ok(RunDimensionValue::Timestamp(NonEmptyText::new(value)?))
- }
+ for experiment in open_experiments {
+ tags.extend(experiment.tags.iter().cloned());
}
+ tags.into_iter().collect()
}
-fn dimension_value_text(value: &RunDimensionValue) -> String {
- match value {
- RunDimensionValue::String(item) | RunDimensionValue::Timestamp(item) => item.to_string(),
- RunDimensionValue::Numeric(item) => item.to_string(),
- RunDimensionValue::Boolean(item) => item.to_string(),
- }
+fn dimension_subset_matches(
+ expected: &BTreeMap<NonEmptyText, RunDimensionValue>,
+ observed: &BTreeMap<NonEmptyText, RunDimensionValue>,
+) -> bool {
+ expected.iter().all(|(key, value)| {
+ observed
+ .get(key)
+ .is_some_and(|candidate| candidate == value)
+ })
}
-fn value_kind_name(value: &Value) -> &'static str {
- match value {
- Value::Null => "null",
- Value::Bool(_) => "boolean",
- Value::Number(_) => "numeric",
- Value::String(_) => "string",
- Value::Array(_) => "array",
- Value::Object(_) => "object",
+fn compare_metric_values(left: f64, right: f64, order: MetricRankOrder) -> std::cmp::Ordering {
+ let ordering = left
+ .partial_cmp(&right)
+ .unwrap_or(std::cmp::Ordering::Equal);
+ match order {
+ MetricRankOrder::Asc => ordering,
+ MetricRankOrder::Desc => ordering.reverse(),
}
}
-#[cfg(test)]
-mod tests {
- use std::collections::{BTreeMap, BTreeSet};
-
- use serde_json::json;
-
- use super::{
- CloseExperimentRequest, CreateFrontierRequest, CreateNodeRequest, DefineMetricRequest,
- DefineRunDimensionRequest, ListNodesQuery, MetricBestQuery, MetricFieldSource,
- MetricKeyQuery, MetricRankOrder, OpenExperimentRequest, PROJECT_SCHEMA_NAME, ProjectStore,
- RemoveSchemaFieldRequest, UpsertSchemaFieldRequest,
- };
- use fidget_spinner_core::{
- CommandRecipe, DiagnosticSeverity, EvaluationProtocol, FieldPresence, FieldRole,
- FieldValueType, FrontierContract, FrontierNote, FrontierVerdict, InferencePolicy,
- MetricSpec, MetricUnit, MetricValue, NodeAnnotation, NodeClass, NodePayload, NonEmptyText,
- OptimizationObjective, RunDimensionValue, TagName,
- };
-
- fn temp_project_root(label: &str) -> camino::Utf8PathBuf {
- let mut path = std::env::temp_dir();
- path.push(format!(
- "fidget_spinner_store_test_{}_{}",
- label,
- uuid::Uuid::now_v7()
- ));
- camino::Utf8PathBuf::from(path.to_string_lossy().into_owned())
- }
-
- #[test]
- fn init_writes_model_facing_schema_file() -> Result<(), super::StoreError> {
- let root = temp_project_root("schema");
- let store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
+fn all_metrics(outcome: &ExperimentOutcome) -> Vec<MetricValue> {
+ std::iter::once(outcome.primary_metric.clone())
+ .chain(outcome.supporting_metrics.clone())
+ .collect()
+}
- assert!(store.state_root().join(PROJECT_SCHEMA_NAME).exists());
- Ok(())
- }
+fn bool_to_sql(value: bool) -> i64 {
+ i64::from(value)
+}
- #[test]
- fn add_node_persists_hidden_annotations() -> Result<(), super::StoreError> {
- let root = temp_project_root("notes");
- let mut store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
- let node = store.add_node(CreateNodeRequest {
- class: NodeClass::Source,
- frontier_id: None,
- title: NonEmptyText::new("feature sketch")?,
- summary: Some(NonEmptyText::new("research note")?),
- tags: None,
- payload: NodePayload::with_schema(
- store.schema().schema_ref(),
- super::json_object(json!({"body": "freeform"}))?,
- ),
- annotations: vec![NodeAnnotation::hidden(NonEmptyText::new(
- "private scratch",
- )?)],
- attachments: Vec::new(),
- })?;
- let loaded = store
- .get_node(node.id)?
- .ok_or(super::StoreError::NodeNotFound(node.id))?;
-
- assert_eq!(loaded.annotations.len(), 1);
- assert_eq!(
- loaded.annotations[0].visibility,
- fidget_spinner_core::AnnotationVisibility::HiddenByDefault
- );
- Ok(())
- }
+fn count_rows(connection: &Connection, table: &str) -> Result<u64, StoreError> {
+ let sql = format!("SELECT COUNT(*) FROM {table}");
+ connection
+ .query_row(&sql, [], |row| row.get::<_, u64>(0))
+ .map_err(StoreError::from)
+}
- #[test]
- fn frontier_projection_tracks_experiment_counts() -> Result<(), super::StoreError> {
- let root = temp_project_root("frontier");
- let mut store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
- let projection = store.create_frontier(CreateFrontierRequest {
- label: NonEmptyText::new("optimization frontier")?,
- contract_title: NonEmptyText::new("contract root")?,
- contract_summary: None,
- contract: FrontierContract {
- objective: NonEmptyText::new("improve wall time")?,
- evaluation: EvaluationProtocol {
- benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]),
- primary_metric: MetricSpec {
- metric_key: NonEmptyText::new("wall_clock_s")?,
- unit: MetricUnit::Seconds,
- objective: OptimizationObjective::Minimize,
- },
- supporting_metrics: BTreeSet::new(),
- },
- promotion_criteria: vec![NonEmptyText::new("strict speedup")?],
- },
- })?;
+fn count_rows_where(
+ connection: &Connection,
+ table: &str,
+ predicate: &str,
+) -> Result<u64, StoreError> {
+ let sql = format!("SELECT COUNT(*) FROM {table} WHERE {predicate}");
+ connection
+ .query_row(&sql, [], |row| row.get::<_, u64>(0))
+ .map_err(StoreError::from)
+}
- assert_eq!(projection.open_experiment_count, 0);
- assert_eq!(projection.completed_experiment_count, 0);
- assert_eq!(projection.verdict_counts.accepted, 0);
- assert_eq!(projection.verdict_counts.kept, 0);
- assert_eq!(projection.verdict_counts.parked, 0);
- assert_eq!(projection.verdict_counts.rejected, 0);
- Ok(())
+fn apply_limit<T>(items: Vec<T>, limit: Option<u32>) -> Vec<T> {
+ if let Some(limit) = limit {
+ items.into_iter().take(limit as usize).collect()
+ } else {
+ items
}
+}
- #[test]
- fn list_nodes_hides_archived_by_default() -> Result<(), super::StoreError> {
- let root = temp_project_root("archive");
- let mut store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
- let node = store.add_node(CreateNodeRequest {
- class: NodeClass::Note,
- frontier_id: None,
- title: NonEmptyText::new("quick note")?,
- summary: Some(NonEmptyText::new("quick note summary")?),
- tags: Some(BTreeSet::new()),
- payload: NodePayload::with_schema(
- store.schema().schema_ref(),
- super::json_object(json!({"body": "hello"}))?,
- ),
- annotations: Vec::new(),
- attachments: Vec::new(),
- })?;
- store.archive_node(node.id)?;
-
- let visible = store.list_nodes(ListNodesQuery::default())?;
- let hidden = store.list_nodes(ListNodesQuery {
- include_archived: true,
- ..ListNodesQuery::default()
- })?;
-
- assert!(visible.is_empty());
- assert_eq!(hidden.len(), 1);
- Ok(())
+fn apply_optional_text_patch<T>(patch: Option<TextPatch<T>>, current: Option<T>) -> Option<T> {
+ match patch {
+ None => current,
+ Some(TextPatch::Set(value)) => Some(value),
+ Some(TextPatch::Clear) => None,
}
+}
- #[test]
- fn frontier_filter_includes_root_contract_node() -> Result<(), super::StoreError> {
- let root = temp_project_root("contract-filter");
- let mut store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
- let projection = store.create_frontier(CreateFrontierRequest {
- label: NonEmptyText::new("frontier")?,
- contract_title: NonEmptyText::new("root contract")?,
- contract_summary: None,
- contract: FrontierContract {
- objective: NonEmptyText::new("optimize")?,
- evaluation: EvaluationProtocol {
- benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]),
- primary_metric: MetricSpec {
- metric_key: NonEmptyText::new("wall_clock_s")?,
- unit: MetricUnit::Seconds,
- objective: OptimizationObjective::Minimize,
- },
- supporting_metrics: BTreeSet::new(),
- },
- promotion_criteria: vec![NonEmptyText::new("faster")?],
- },
- })?;
-
- let nodes = store.list_nodes(ListNodesQuery {
- frontier_id: Some(projection.frontier.id),
- ..ListNodesQuery::default()
- })?;
+fn write_json_file<T: Serialize>(path: &Utf8Path, value: &T) -> Result<(), StoreError> {
+ let bytes = serde_json::to_vec_pretty(value)?;
+ fs::write(path.as_std_path(), bytes)?;
+ Ok(())
+}
- assert_eq!(nodes.len(), 1);
- assert_eq!(nodes[0].class, NodeClass::Contract);
- Ok(())
- }
+fn read_json_file<T: for<'de> Deserialize<'de>>(path: &Utf8Path) -> Result<T, StoreError> {
+ let bytes = fs::read(path.as_std_path())?;
+ serde_json::from_slice(&bytes).map_err(StoreError::from)
+}
- #[test]
- fn notes_require_explicit_tags_even_when_empty() -> Result<(), super::StoreError> {
- let root = temp_project_root("note-tags-required");
- let mut store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
+fn encode_json<T: Serialize>(value: &T) -> Result<String, StoreError> {
+ serde_json::to_string(value).map_err(StoreError::from)
+}
- let result = store.add_node(CreateNodeRequest {
- class: NodeClass::Note,
- frontier_id: None,
- title: NonEmptyText::new("quick note")?,
- summary: Some(NonEmptyText::new("quick note summary")?),
- tags: None,
- payload: NodePayload::with_schema(
- store.schema().schema_ref(),
- super::json_object(json!({"body": "hello"}))?,
- ),
- annotations: Vec::new(),
- attachments: Vec::new(),
- });
+fn decode_json<T: for<'de> Deserialize<'de>>(raw: &str) -> Result<T, StoreError> {
+ serde_json::from_str(raw).map_err(StoreError::from)
+}
- assert!(matches!(result, Err(super::StoreError::NoteTagsRequired)));
- Ok(())
- }
+fn encode_timestamp(timestamp: OffsetDateTime) -> Result<String, StoreError> {
+ timestamp.format(&Rfc3339).map_err(StoreError::from)
+}
- #[test]
- fn tags_round_trip_and_filter_node_list() -> Result<(), super::StoreError> {
- let root = temp_project_root("tag-roundtrip");
- let mut store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
- let cuts = store.add_tag(
- TagName::new("cuts/core")?,
- NonEmptyText::new("Core cutset work")?,
- )?;
- let heuristics = store.add_tag(
- TagName::new("heuristic")?,
- NonEmptyText::new("Heuristic tuning")?,
- )?;
- let note = store.add_node(CreateNodeRequest {
- class: NodeClass::Note,
- frontier_id: None,
- title: NonEmptyText::new("tagged note")?,
- summary: Some(NonEmptyText::new("tagged note summary")?),
- tags: Some(BTreeSet::from([cuts.name.clone(), heuristics.name.clone()])),
- payload: NodePayload::with_schema(
- store.schema().schema_ref(),
- super::json_object(json!({"body": "tagged"}))?,
- ),
- annotations: Vec::new(),
- attachments: Vec::new(),
- })?;
+fn decode_timestamp(raw: &str) -> Result<OffsetDateTime, time::error::Parse> {
+ OffsetDateTime::parse(raw, &Rfc3339)
+}
- let loaded = store
- .get_node(note.id)?
- .ok_or(super::StoreError::NodeNotFound(note.id))?;
- assert_eq!(loaded.tags.len(), 2);
+fn state_root(project_root: &Utf8Path) -> Utf8PathBuf {
+ project_root.join(STORE_DIR_NAME)
+}
- let filtered = store.list_nodes(ListNodesQuery {
- tags: BTreeSet::from([cuts.name]),
- ..ListNodesQuery::default()
- })?;
- assert_eq!(filtered.len(), 1);
- assert_eq!(filtered[0].tags.len(), 2);
- Ok(())
+#[must_use]
+pub fn discover_project_root(path: impl AsRef<Utf8Path>) -> Option<Utf8PathBuf> {
+ let mut cursor = discovery_start(path.as_ref());
+ loop {
+ if state_root(&cursor).exists() {
+ return Some(cursor);
+ }
+ let parent = cursor.parent()?;
+ cursor = parent.to_path_buf();
}
+}
- #[test]
- fn prose_nodes_require_summary_and_body() -> Result<(), super::StoreError> {
- let root = temp_project_root("prose-summary");
- let mut store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
-
- let missing_summary = store.add_node(CreateNodeRequest {
- class: NodeClass::Source,
- frontier_id: None,
- title: NonEmptyText::new("research note")?,
- summary: None,
- tags: None,
- payload: NodePayload::with_schema(
- store.schema().schema_ref(),
- super::json_object(json!({"body": "research body"}))?,
- ),
- annotations: Vec::new(),
- attachments: Vec::new(),
- });
- assert!(matches!(
- missing_summary,
- Err(super::StoreError::ProseSummaryRequired(NodeClass::Source))
- ));
-
- let missing_body = store.add_node(CreateNodeRequest {
- class: NodeClass::Note,
- frontier_id: None,
- title: NonEmptyText::new("quick note")?,
- summary: Some(NonEmptyText::new("quick note summary")?),
- tags: Some(BTreeSet::new()),
- payload: NodePayload::with_schema(store.schema().schema_ref(), serde_json::Map::new()),
- annotations: Vec::new(),
- attachments: Vec::new(),
- });
- assert!(matches!(
- missing_body,
- Err(super::StoreError::ProseBodyRequired(NodeClass::Note))
- ));
- Ok(())
+fn discovery_start(path: &Utf8Path) -> Utf8PathBuf {
+ match fs::metadata(path.as_std_path()) {
+ Ok(metadata) if metadata.is_file() => path
+ .parent()
+ .map_or_else(|| path.to_path_buf(), Utf8Path::to_path_buf),
+ _ => path.to_path_buf(),
}
+}
- #[test]
- fn opening_store_backfills_missing_prose_summaries() -> Result<(), super::StoreError> {
- let root = temp_project_root("summary-backfill");
- let mut store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
- let node = store.add_node(CreateNodeRequest {
- class: NodeClass::Source,
- frontier_id: None,
- title: NonEmptyText::new("research note")?,
- summary: Some(NonEmptyText::new("temporary summary")?),
- tags: None,
- payload: NodePayload::with_schema(
- store.schema().schema_ref(),
- super::json_object(json!({"body": "First paragraph.\n\nSecond paragraph."}))?,
- ),
- annotations: Vec::new(),
- attachments: Vec::new(),
- })?;
- drop(store);
-
- let connection = rusqlite::Connection::open(
- root.join(super::STORE_DIR_NAME)
- .join(super::STATE_DB_NAME)
- .as_std_path(),
- )?;
- let _ = connection.execute(
- "UPDATE nodes SET summary = NULL WHERE id = ?1",
- rusqlite::params![node.id.to_string()],
- )?;
- drop(connection);
-
- let reopened = ProjectStore::open(&root)?;
- let loaded = reopened
- .get_node(node.id)?
- .ok_or(super::StoreError::NodeNotFound(node.id))?;
- assert_eq!(
- loaded.summary.as_ref().map(NonEmptyText::as_str),
- Some("First paragraph.")
- );
- Ok(())
- }
+fn to_sql_conversion_error(error: StoreError) -> rusqlite::Error {
+ rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(error))
+}
- #[test]
- fn schema_field_upsert_remove_persists_and_bumps_version() -> Result<(), super::StoreError> {
- let root = temp_project_root("schema-upsert-remove");
- let mut store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
- let initial_version = store.schema().version;
-
- let field = store.upsert_schema_field(UpsertSchemaFieldRequest {
- name: NonEmptyText::new("scenario")?,
- node_classes: BTreeSet::from([NodeClass::Hypothesis, NodeClass::Analysis]),
- presence: FieldPresence::Recommended,
- severity: DiagnosticSeverity::Warning,
- role: FieldRole::ProjectionGate,
- inference_policy: InferencePolicy::ManualOnly,
- value_type: Some(FieldValueType::String),
- })?;
- assert_eq!(field.name.as_str(), "scenario");
- assert_eq!(store.schema().version, initial_version + 1);
- assert!(
- store
- .schema()
- .fields
- .iter()
- .any(|item| item.name.as_str() == "scenario")
- );
- drop(store);
-
- let mut reopened = ProjectStore::open(&root)?;
- assert_eq!(reopened.schema().version, initial_version + 1);
- assert!(
- reopened
- .schema()
- .fields
- .iter()
- .any(|item| item.name.as_str() == "scenario")
- );
+fn core_to_sql_conversion_error(error: CoreError) -> rusqlite::Error {
+ to_sql_conversion_error(StoreError::from(error))
+}
- let removed = reopened.remove_schema_field(RemoveSchemaFieldRequest {
- name: NonEmptyText::new("scenario")?,
- node_classes: Some(BTreeSet::from([NodeClass::Hypothesis, NodeClass::Analysis])),
- })?;
- assert_eq!(removed, 1);
- assert_eq!(reopened.schema().version, initial_version + 2);
- assert!(
- !reopened
- .schema()
- .fields
- .iter()
- .any(|item| item.name.as_str() == "scenario")
- );
- Ok(())
- }
+fn uuid_to_sql_conversion_error(error: uuid::Error) -> rusqlite::Error {
+ to_sql_conversion_error(StoreError::from(error))
+}
- #[test]
- fn metric_queries_surface_canonical_and_payload_numeric_fields() -> Result<(), super::StoreError>
- {
- let root = temp_project_root("metric-best");
- let mut store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
- let projection = store.create_frontier(CreateFrontierRequest {
- label: NonEmptyText::new("optimization frontier")?,
- contract_title: NonEmptyText::new("contract root")?,
- contract_summary: None,
- contract: FrontierContract {
- objective: NonEmptyText::new("improve wall time")?,
- evaluation: EvaluationProtocol {
- benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]),
- primary_metric: MetricSpec {
- metric_key: NonEmptyText::new("wall_clock_s")?,
- unit: MetricUnit::Seconds,
- objective: OptimizationObjective::Minimize,
- },
- supporting_metrics: BTreeSet::new(),
- },
- promotion_criteria: vec![NonEmptyText::new("strict speedup")?],
- },
- })?;
- let frontier_id = projection.frontier.id;
- let _ = store.define_metric(DefineMetricRequest {
- key: NonEmptyText::new("wall_clock_s")?,
- unit: MetricUnit::Seconds,
- objective: OptimizationObjective::Minimize,
- description: Some(NonEmptyText::new("elapsed wall time")?),
- })?;
- let _ = store.define_run_dimension(DefineRunDimensionRequest {
- key: NonEmptyText::new("scenario")?,
- value_type: FieldValueType::String,
- description: Some(NonEmptyText::new("workload family")?),
- })?;
- let _ = store.define_run_dimension(DefineRunDimensionRequest {
- key: NonEmptyText::new("duration_s")?,
- value_type: FieldValueType::Numeric,
- description: Some(NonEmptyText::new("time budget in seconds")?),
- })?;
+fn time_to_sql_conversion_error(error: time::error::Parse) -> rusqlite::Error {
+ to_sql_conversion_error(StoreError::from(error))
+}
- let first_hypothesis = store.add_node(CreateNodeRequest {
- class: NodeClass::Hypothesis,
- frontier_id: Some(frontier_id),
- title: NonEmptyText::new("first hypothesis")?,
- summary: Some(NonEmptyText::new("first hypothesis summary")?),
- tags: None,
- payload: NodePayload::with_schema(
- store.schema().schema_ref(),
- super::json_object(json!({"body": "first body", "latency_hint": 14.0}))?,
- ),
- annotations: Vec::new(),
- attachments: Vec::new(),
- })?;
- let second_hypothesis = store.add_node(CreateNodeRequest {
- class: NodeClass::Hypothesis,
- frontier_id: Some(frontier_id),
- title: NonEmptyText::new("second hypothesis")?,
- summary: Some(NonEmptyText::new("second hypothesis summary")?),
- tags: None,
- payload: NodePayload::with_schema(
- store.schema().schema_ref(),
- super::json_object(json!({"body": "second body", "latency_hint": 7.0}))?,
- ),
- annotations: Vec::new(),
- attachments: Vec::new(),
- })?;
- let first_experiment = store.open_experiment(open_experiment_request(
- frontier_id,
- first_hypothesis.id,
- "first experiment",
- )?)?;
- let second_experiment = store.open_experiment(open_experiment_request(
- frontier_id,
- second_hypothesis.id,
- "second experiment",
- )?)?;
-
- let _first_receipt = store.close_experiment(experiment_request(
- &root,
- first_experiment.id,
- "first run",
- 10.0,
- run_dimensions("belt_4x5", 20.0)?,
- )?)?;
- let second_receipt = store.close_experiment(experiment_request(
- &root,
- second_experiment.id,
- "second run",
- 5.0,
- run_dimensions("belt_4x5", 60.0)?,
- )?)?;
-
- let keys = store.list_metric_keys()?;
- assert!(keys.iter().any(|key| {
- key.key.as_str() == "wall_clock_s" && key.source == MetricFieldSource::RunMetric
- }));
- assert!(keys.iter().any(|key| {
- key.key.as_str() == "latency_hint" && key.source == MetricFieldSource::HypothesisPayload
- }));
- assert!(keys.iter().any(|key| {
- key.key.as_str() == "wall_clock_s"
- && key.source == MetricFieldSource::RunMetric
- && key.description.as_ref().map(NonEmptyText::as_str) == Some("elapsed wall time")
- }));
-
- let filtered_keys = store.list_metric_keys_filtered(MetricKeyQuery {
- frontier_id: Some(frontier_id),
- source: Some(MetricFieldSource::RunMetric),
- dimensions: run_dimensions("belt_4x5", 60.0)?,
- })?;
- assert_eq!(filtered_keys.len(), 1);
- assert_eq!(filtered_keys[0].experiment_count, 1);
-
- let dimension_summaries = store.list_run_dimensions()?;
- assert!(dimension_summaries.iter().any(|dimension| {
- dimension.key.as_str() == "benchmark_suite"
- && dimension.value_type == FieldValueType::String
- && dimension.observed_run_count == 2
- }));
- assert!(dimension_summaries.iter().any(|dimension| {
- dimension.key.as_str() == "scenario"
- && dimension.description.as_ref().map(NonEmptyText::as_str)
- == Some("workload family")
- }));
- assert!(dimension_summaries.iter().any(|dimension| {
- dimension.key.as_str() == "duration_s"
- && dimension.value_type == FieldValueType::Numeric
- && dimension.distinct_value_count == 2
- }));
-
- let canonical_best = store.best_metrics(MetricBestQuery {
- key: NonEmptyText::new("wall_clock_s")?,
- frontier_id: Some(frontier_id),
- source: Some(MetricFieldSource::RunMetric),
- dimensions: run_dimensions("belt_4x5", 60.0)?,
- order: None,
- limit: 5,
- })?;
- assert_eq!(canonical_best.len(), 1);
- assert_eq!(canonical_best[0].value, 5.0);
- assert_eq!(
- canonical_best[0].experiment_title.as_str(),
- "second experiment"
- );
- assert_eq!(canonical_best[0].verdict, FrontierVerdict::Kept);
- assert_eq!(
- canonical_best[0]
- .dimensions
- .get(&NonEmptyText::new("duration_s")?),
- Some(&RunDimensionValue::Numeric(60.0))
- );
+fn parse_non_empty_text(raw: &str) -> Result<NonEmptyText, rusqlite::Error> {
+ NonEmptyText::new(raw.to_owned()).map_err(core_to_sql_conversion_error)
+}
- let payload_best = store.best_metrics(MetricBestQuery {
- key: NonEmptyText::new("latency_hint")?,
- frontier_id: Some(frontier_id),
- source: Some(MetricFieldSource::HypothesisPayload),
- dimensions: run_dimensions("belt_4x5", 60.0)?,
- order: Some(MetricRankOrder::Asc),
- limit: 5,
- })?;
- assert_eq!(payload_best.len(), 1);
- assert_eq!(payload_best[0].value, 7.0);
- assert_eq!(payload_best[0].hypothesis_node_id, second_hypothesis.id);
-
- let missing_order = store.best_metrics(MetricBestQuery {
- key: NonEmptyText::new("latency_hint")?,
- frontier_id: Some(frontier_id),
- source: Some(MetricFieldSource::HypothesisPayload),
- dimensions: BTreeMap::new(),
- order: None,
- limit: 5,
- });
- assert!(matches!(
- missing_order,
- Err(super::StoreError::MetricOrderRequired { .. })
- ));
- assert_eq!(
- second_receipt.experiment.title.as_str(),
- "second experiment"
- );
- Ok(())
- }
+fn parse_optional_non_empty_text(
+ raw: Option<String>,
+) -> Result<Option<NonEmptyText>, rusqlite::Error> {
+ raw.map(|value| parse_non_empty_text(&value)).transpose()
+}
- #[test]
- fn opening_store_backfills_legacy_benchmark_suite_dimensions() -> Result<(), super::StoreError>
- {
- let root = temp_project_root("metric-plane-backfill");
- let mut store = ProjectStore::init(
- &root,
- NonEmptyText::new("test project")?,
- NonEmptyText::new("local.test")?,
- )?;
- let projection = store.create_frontier(CreateFrontierRequest {
- label: NonEmptyText::new("migration frontier")?,
- contract_title: NonEmptyText::new("migration contract")?,
- contract_summary: None,
- contract: FrontierContract {
- objective: NonEmptyText::new("exercise metric migration")?,
- evaluation: EvaluationProtocol {
- benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]),
- primary_metric: MetricSpec {
- metric_key: NonEmptyText::new("wall_clock_s")?,
- unit: MetricUnit::Seconds,
- objective: OptimizationObjective::Minimize,
- },
- supporting_metrics: BTreeSet::new(),
- },
- promotion_criteria: vec![NonEmptyText::new("keep the metric plane queryable")?],
- },
- })?;
- let frontier_id = projection.frontier.id;
- let hypothesis = store.add_node(CreateNodeRequest {
- class: NodeClass::Hypothesis,
- frontier_id: Some(frontier_id),
- title: NonEmptyText::new("candidate hypothesis")?,
- summary: Some(NonEmptyText::new("candidate hypothesis summary")?),
- tags: None,
- payload: NodePayload::with_schema(
- store.schema().schema_ref(),
- super::json_object(json!({"latency_hint": 9.0}))?,
- ),
- annotations: Vec::new(),
- attachments: Vec::new(),
- })?;
- let experiment = store.open_experiment(open_experiment_request(
- frontier_id,
- hypothesis.id,
- "migration experiment",
- )?)?;
- let _ = store.close_experiment(experiment_request(
- &root,
- experiment.id,
- "migration run",
- 11.0,
- BTreeMap::from([(
- NonEmptyText::new("benchmark_suite")?,
- RunDimensionValue::String(NonEmptyText::new("smoke")?),
- )]),
- )?)?;
- drop(store);
-
- let connection = rusqlite::Connection::open(
- root.join(super::STORE_DIR_NAME)
- .join(super::STATE_DB_NAME)
- .as_std_path(),
- )?;
- let _ = connection.execute("DELETE FROM run_dimensions", [])?;
- drop(connection);
-
- let reopened = ProjectStore::open(&root)?;
- let dimensions = reopened.list_run_dimensions()?;
- assert!(dimensions.iter().any(|dimension| {
- dimension.key.as_str() == "benchmark_suite" && dimension.observed_run_count == 1
- }));
-
- let best = reopened.best_metrics(MetricBestQuery {
- key: NonEmptyText::new("wall_clock_s")?,
- frontier_id: Some(frontier_id),
- source: Some(MetricFieldSource::RunMetric),
- dimensions: BTreeMap::from([(
- NonEmptyText::new("benchmark_suite")?,
- RunDimensionValue::String(NonEmptyText::new("smoke")?),
- )]),
- order: None,
- limit: 5,
- })?;
- assert_eq!(best.len(), 1);
- assert_eq!(best[0].value, 11.0);
- Ok(())
- }
+fn parse_slug(raw: &str) -> Result<Slug, rusqlite::Error> {
+ Slug::new(raw.to_owned()).map_err(core_to_sql_conversion_error)
+}
- fn open_experiment_request(
- frontier_id: fidget_spinner_core::FrontierId,
- hypothesis_node_id: fidget_spinner_core::NodeId,
- title: &str,
- ) -> Result<OpenExperimentRequest, super::StoreError> {
- Ok(OpenExperimentRequest {
- frontier_id,
- hypothesis_node_id,
- title: NonEmptyText::new(title)?,
- summary: Some(NonEmptyText::new(format!("{title} summary"))?),
- })
- }
+fn parse_tag_name(raw: &str) -> Result<TagName, rusqlite::Error> {
+ TagName::new(raw.to_owned()).map_err(core_to_sql_conversion_error)
+}
- fn experiment_request(
- root: &camino::Utf8Path,
- experiment_id: fidget_spinner_core::ExperimentId,
- run_title: &str,
- wall_clock_s: f64,
- dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
- ) -> Result<CloseExperimentRequest, super::StoreError> {
- Ok(CloseExperimentRequest {
- experiment_id,
- run_title: NonEmptyText::new(run_title)?,
- run_summary: Some(NonEmptyText::new("run summary")?),
- backend: fidget_spinner_core::ExecutionBackend::WorktreeProcess,
- dimensions,
- command: CommandRecipe::new(
- root.to_path_buf(),
- vec![NonEmptyText::new("true")?],
- BTreeMap::new(),
- )?,
- primary_metric: MetricValue {
- key: NonEmptyText::new("wall_clock_s")?,
- value: wall_clock_s,
- },
- supporting_metrics: Vec::new(),
- note: FrontierNote {
- summary: NonEmptyText::new("note summary")?,
- next_hypotheses: Vec::new(),
- },
- verdict: FrontierVerdict::Kept,
- analysis: None,
- decision_title: NonEmptyText::new("decision")?,
- decision_rationale: NonEmptyText::new("decision rationale")?,
- })
- }
+fn parse_uuid_sql(raw: &str) -> Result<Uuid, rusqlite::Error> {
+ Uuid::parse_str(raw).map_err(uuid_to_sql_conversion_error)
+}
- fn run_dimensions(
- scenario: &str,
- duration_s: f64,
- ) -> Result<BTreeMap<NonEmptyText, RunDimensionValue>, super::StoreError> {
- Ok(BTreeMap::from([
- (
- NonEmptyText::new("benchmark_suite")?,
- RunDimensionValue::String(NonEmptyText::new("smoke")?),
- ),
- (
- NonEmptyText::new("scenario")?,
- RunDimensionValue::String(NonEmptyText::new(scenario)?),
- ),
- (
- NonEmptyText::new("duration_s")?,
- RunDimensionValue::Numeric(duration_s),
- ),
- ]))
- }
+fn parse_timestamp_sql(raw: &str) -> Result<OffsetDateTime, rusqlite::Error> {
+ decode_timestamp(raw).map_err(time_to_sql_conversion_error)
}