From f706910944ee8abe7b27a248596f7705059969d9 Mon Sep 17 00:00:00 2001 From: main Date: Thu, 19 Mar 2026 22:28:01 -0400 Subject: Polish MCP ingest and schema surfaces --- crates/fidget-spinner-store-sqlite/src/lib.rs | 2305 ++++++++++++++++++++++++- 1 file changed, 2247 insertions(+), 58 deletions(-) (limited to 'crates/fidget-spinner-store-sqlite') diff --git a/crates/fidget-spinner-store-sqlite/src/lib.rs b/crates/fidget-spinner-store-sqlite/src/lib.rs index da9fa42..b970629 100644 --- a/crates/fidget-spinner-store-sqlite/src/lib.rs +++ b/crates/fidget-spinner-store-sqlite/src/lib.rs @@ -1,4 +1,5 @@ -use std::collections::BTreeSet; +use std::cmp::Ordering; +use std::collections::{BTreeMap, BTreeSet}; use std::fmt::Write as _; use std::fs; use std::io; @@ -7,11 +8,13 @@ use std::process::Command; use camino::{Utf8Path, Utf8PathBuf}; use fidget_spinner_core::{ AnnotationVisibility, CheckpointDisposition, CheckpointRecord, CheckpointSnapshotRef, - CodeSnapshotRef, CommandRecipe, CompletedExperiment, DagEdge, DagNode, EdgeKind, - ExecutionBackend, ExperimentResult, FrontierContract, FrontierNote, FrontierProjection, - FrontierRecord, FrontierStatus, FrontierVerdict, GitCommitHash, JsonObject, MetricObservation, - MetricSpec, MetricUnit, NodeAnnotation, NodeClass, NodeDiagnostics, NodePayload, NonEmptyText, - OptimizationObjective, ProjectSchema, RunRecord, RunStatus, TagName, TagRecord, + CodeSnapshotRef, CommandRecipe, CompletedExperiment, DagEdge, DagNode, DiagnosticSeverity, + EdgeKind, ExecutionBackend, ExperimentResult, FieldPresence, FieldRole, FieldValueType, + FrontierContract, FrontierNote, FrontierProjection, FrontierRecord, FrontierStatus, + FrontierVerdict, GitCommitHash, InferencePolicy, JsonObject, MetricDefinition, MetricSpec, + MetricUnit, MetricValue, NodeAnnotation, NodeClass, NodeDiagnostics, NodePayload, NonEmptyText, + OptimizationObjective, ProjectFieldSpec, ProjectSchema, RunDimensionDefinition, + RunDimensionValue, RunRecord, RunStatus, TagName, TagRecord, }; use rusqlite::types::Value as SqlValue; use rusqlite::{Connection, OptionalExtension, Transaction, params, params_from_iter}; @@ -63,8 +66,50 @@ pub enum StoreError { DuplicateTag(TagName), #[error("note nodes require an explicit tag list; use an empty list if no tags apply")] NoteTagsRequired, + #[error("{0} nodes require a non-empty summary")] + ProseSummaryRequired(NodeClass), + #[error("{0} nodes require a non-empty string payload field `body`")] + ProseBodyRequired(NodeClass), #[error("git repository inspection failed for {0}")] GitInspectionFailed(Utf8PathBuf), + #[error("metric `{0}` is not registered")] + UnknownMetricDefinition(NonEmptyText), + #[error( + "metric `{key}` conflicts with existing definition ({existing_unit}/{existing_objective} vs {new_unit}/{new_objective})" + )] + ConflictingMetricDefinition { + key: String, + existing_unit: String, + existing_objective: String, + new_unit: String, + new_objective: String, + }, + #[error("run dimension `{0}` is not registered")] + UnknownRunDimension(NonEmptyText), + #[error("run dimension `{0}` already exists")] + DuplicateRunDimension(NonEmptyText), + #[error( + "run dimension `{key}` conflicts with existing definition ({existing_type} vs {new_type})" + )] + ConflictingRunDimensionDefinition { + key: String, + existing_type: String, + new_type: String, + }, + #[error("run dimension `{key}` expects {expected} values, got {observed}")] + InvalidRunDimensionValue { + key: String, + expected: String, + observed: String, + }, + #[error("schema field `{0}` was not found")] + SchemaFieldNotFound(String), + #[error("metric key `{key}` is ambiguous across sources: {sources}")] + AmbiguousMetricKey { key: String, sources: String }, + #[error("metric key `{key}` for source `{metric_source}` requires an explicit order")] + MetricOrderRequired { key: String, metric_source: String }, + #[error("metric key `{key}` for source `{metric_source}` has conflicting semantics")] + MetricSemanticsAmbiguous { key: String, metric_source: String }, } #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -165,6 +210,155 @@ pub struct NodeSummary { pub updated_at: OffsetDateTime, } +#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum MetricFieldSource { + RunMetric, + ChangePayload, + RunPayload, + AnalysisPayload, + DecisionPayload, +} + +impl MetricFieldSource { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::RunMetric => "run_metric", + Self::ChangePayload => "change_payload", + Self::RunPayload => "run_payload", + Self::AnalysisPayload => "analysis_payload", + Self::DecisionPayload => "decision_payload", + } + } + + #[must_use] + pub const fn from_payload_class(class: NodeClass) -> Option { + match class { + NodeClass::Change => Some(Self::ChangePayload), + NodeClass::Run => Some(Self::RunPayload), + NodeClass::Analysis => Some(Self::AnalysisPayload), + NodeClass::Decision => Some(Self::DecisionPayload), + NodeClass::Contract | NodeClass::Research | NodeClass::Enabling | NodeClass::Note => { + None + } + } + } +} + +#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum MetricRankOrder { + Asc, + Desc, +} + +impl MetricRankOrder { + #[must_use] + pub const fn as_str(self) -> &'static str { + match self { + Self::Asc => "asc", + Self::Desc => "desc", + } + } +} + +#[derive(Clone, Debug)] +pub struct MetricBestQuery { + pub key: NonEmptyText, + pub frontier_id: Option, + pub source: Option, + pub dimensions: BTreeMap, + pub order: Option, + pub limit: u32, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct MetricKeySummary { + pub key: NonEmptyText, + pub source: MetricFieldSource, + pub experiment_count: u64, + pub unit: Option, + pub objective: Option, + pub description: Option, + pub requires_order: bool, +} + +#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] +pub struct MetricBestEntry { + pub key: NonEmptyText, + pub source: MetricFieldSource, + pub value: f64, + pub order: MetricRankOrder, + pub experiment_id: fidget_spinner_core::ExperimentId, + pub frontier_id: fidget_spinner_core::FrontierId, + pub change_node_id: fidget_spinner_core::NodeId, + pub change_title: NonEmptyText, + pub run_id: fidget_spinner_core::RunId, + pub verdict: FrontierVerdict, + pub candidate_checkpoint_id: fidget_spinner_core::CheckpointId, + pub candidate_commit_hash: GitCommitHash, + pub unit: Option, + pub objective: Option, + pub dimensions: BTreeMap, +} + +#[derive(Clone, Debug, Default)] +pub struct MetricKeyQuery { + pub frontier_id: Option, + pub source: Option, + pub dimensions: BTreeMap, +} + +#[derive(Clone, Debug)] +pub struct DefineMetricRequest { + pub key: NonEmptyText, + pub unit: MetricUnit, + pub objective: OptimizationObjective, + pub description: Option, +} + +#[derive(Clone, Debug)] +pub struct DefineRunDimensionRequest { + pub key: NonEmptyText, + pub value_type: FieldValueType, + pub description: Option, +} + +#[derive(Clone, Debug)] +pub struct UpsertSchemaFieldRequest { + pub name: NonEmptyText, + pub node_classes: BTreeSet, + pub presence: FieldPresence, + pub severity: DiagnosticSeverity, + pub role: FieldRole, + pub inference_policy: InferencePolicy, + pub value_type: Option, +} + +#[derive(Clone, Debug)] +pub struct RemoveSchemaFieldRequest { + pub name: NonEmptyText, + pub node_classes: Option>, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct RunDimensionSummary { + pub key: NonEmptyText, + pub value_type: FieldValueType, + pub description: Option, + pub observed_run_count: u64, + pub distinct_value_count: u64, + pub sample_values: Vec, +} + +#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] +pub struct MetricPlaneMigrationReport { + pub inserted_metric_definitions: u64, + pub inserted_dimension_definitions: u64, + pub inserted_dimension_values: u64, +} + #[derive(Clone, Debug)] pub struct CreateFrontierRequest { pub label: NonEmptyText, @@ -190,11 +384,11 @@ pub struct CloseExperimentRequest { pub run_title: NonEmptyText, pub run_summary: Option, pub backend: ExecutionBackend, - pub benchmark_suite: NonEmptyText, + pub dimensions: BTreeMap, pub command: CommandRecipe, pub code_snapshot: Option, - pub primary_metric: MetricObservation, - pub supporting_metrics: Vec, + pub primary_metric: MetricValue, + pub supporting_metrics: Vec, pub note: FrontierNote, pub verdict: FrontierVerdict, pub decision_title: NonEmptyText, @@ -233,8 +427,8 @@ impl ProjectStore { let schema = ProjectSchema::default_with_namespace(schema_namespace); write_json_file(&state_root.join(PROJECT_SCHEMA_NAME), &schema)?; - let connection = Connection::open(state_root.join(STATE_DB_NAME).as_std_path())?; - migrate(&connection)?; + let mut connection = Connection::open(state_root.join(STATE_DB_NAME).as_std_path())?; + upgrade_store(&mut connection)?; Ok(Self { project_root, @@ -252,8 +446,8 @@ impl ProjectStore { let state_root = state_root(&project_root); let config = read_json_file::(&state_root.join(PROJECT_CONFIG_NAME))?; let schema = read_json_file::(&state_root.join(PROJECT_SCHEMA_NAME))?; - let connection = Connection::open(state_root.join(STATE_DB_NAME).as_std_path())?; - migrate(&connection)?; + let mut connection = Connection::open(state_root.join(STATE_DB_NAME).as_std_path())?; + upgrade_store(&mut connection)?; Ok(Self { project_root, state_root, @@ -273,6 +467,59 @@ impl ProjectStore { &self.schema } + pub fn upsert_schema_field( + &mut self, + request: UpsertSchemaFieldRequest, + ) -> Result { + let field = ProjectFieldSpec { + name: request.name, + node_classes: request.node_classes, + presence: request.presence, + severity: request.severity, + role: request.role, + inference_policy: request.inference_policy, + value_type: request.value_type, + }; + if let Some(existing) = self.schema.fields.iter_mut().find(|existing| { + existing.name == field.name && existing.node_classes == field.node_classes + }) { + if *existing == field { + return Ok(field); + } + *existing = field.clone(); + } else { + self.schema.fields.push(field.clone()); + } + sort_schema_fields(&mut self.schema.fields); + self.bump_schema_version(); + self.save_schema()?; + Ok(field) + } + + pub fn remove_schema_field( + &mut self, + request: RemoveSchemaFieldRequest, + ) -> Result { + let before = self.schema.fields.len(); + self.schema.fields.retain(|field| { + field.name != request.name + || request + .node_classes + .as_ref() + .is_some_and(|node_classes| field.node_classes != *node_classes) + }); + let removed = before.saturating_sub(self.schema.fields.len()) as u64; + if removed == 0 { + return Err(StoreError::SchemaFieldNotFound( + request.name.as_str().to_owned(), + )); + } + sort_schema_fields(&mut self.schema.fields); + self.bump_schema_version(); + self.save_schema()?; + Ok(removed) + } + #[must_use] pub fn project_root(&self) -> &Utf8Path { &self.project_root @@ -283,6 +530,14 @@ impl ProjectStore { &self.state_root } + fn bump_schema_version(&mut self) { + self.schema.version = self.schema.version.saturating_add(1); + } + + fn save_schema(&self) -> Result<(), StoreError> { + write_json_file(&self.state_root.join(PROJECT_SCHEMA_NAME), &self.schema) + } + pub fn create_frontier( &mut self, request: CreateFrontierRequest, @@ -304,6 +559,31 @@ impl ProjectStore { let frontier = FrontierRecord::with_id(frontier_id, request.label, contract_node.id); let tx = self.connection.transaction()?; + let _ = upsert_metric_definition_tx( + &tx, + &MetricDefinition::new( + request + .contract + .evaluation + .primary_metric + .metric_key + .clone(), + request.contract.evaluation.primary_metric.unit, + request.contract.evaluation.primary_metric.objective, + None, + ), + )?; + for metric in &request.contract.evaluation.supporting_metrics { + let _ = upsert_metric_definition_tx( + &tx, + &MetricDefinition::new( + metric.metric_key.clone(), + metric.unit, + metric.objective, + None, + ), + )?; + } insert_node(&tx, &contract_node)?; insert_frontier(&tx, &frontier)?; if let Some(seed) = request.initial_checkpoint { @@ -330,6 +610,75 @@ impl ProjectStore { self.frontier_projection(frontier.id) } + pub fn define_metric( + &mut self, + request: DefineMetricRequest, + ) -> Result { + let record = MetricDefinition::new( + request.key, + request.unit, + request.objective, + request.description, + ); + let tx = self.connection.transaction()?; + let _ = upsert_metric_definition_tx(&tx, &record)?; + tx.commit()?; + Ok(record) + } + + pub fn list_metric_definitions(&self) -> Result, StoreError> { + let mut statement = self.connection.prepare( + "SELECT metric_key, unit, objective, description, created_at + FROM metric_definitions + ORDER BY metric_key ASC", + )?; + let mut rows = statement.query([])?; + let mut items = Vec::new(); + while let Some(row) = rows.next()? { + items.push(MetricDefinition { + key: NonEmptyText::new(row.get::<_, String>(0)?)?, + unit: decode_metric_unit(&row.get::<_, String>(1)?)?, + objective: decode_optimization_objective(&row.get::<_, String>(2)?)?, + description: row + .get::<_, Option>(3)? + .map(NonEmptyText::new) + .transpose()?, + created_at: decode_timestamp(&row.get::<_, String>(4)?)?, + }); + } + Ok(items) + } + + pub fn define_run_dimension( + &mut self, + request: DefineRunDimensionRequest, + ) -> Result { + let record = + RunDimensionDefinition::new(request.key, request.value_type, request.description); + let tx = self.connection.transaction()?; + let _ = insert_run_dimension_definition_tx(&tx, &record)?; + tx.commit()?; + Ok(record) + } + + pub fn list_run_dimensions(&self) -> Result, StoreError> { + load_run_dimension_summaries(self) + } + + pub fn coerce_run_dimensions( + &self, + raw_dimensions: BTreeMap, + ) -> Result, StoreError> { + coerce_run_dimension_map(&run_dimension_definitions_by_key(self)?, raw_dimensions) + } + + pub fn migrate_metric_plane(&mut self) -> Result { + let tx = self.connection.transaction()?; + let report = normalize_metric_plane_tx(&tx)?; + tx.commit()?; + Ok(report) + } + pub fn add_tag( &mut self, name: TagName, @@ -372,6 +721,7 @@ impl ProjectStore { } pub fn add_node(&mut self, request: CreateNodeRequest) -> Result { + validate_prose_node_request(&request)?; let diagnostics = self.schema.validate_node(request.class, &request.payload); let mut node = DagNode::new( request.class, @@ -406,6 +756,100 @@ impl ProjectStore { Ok(node) } + pub fn list_metric_keys(&self) -> Result, StoreError> { + self.list_metric_keys_filtered(MetricKeyQuery::default()) + } + + pub fn list_metric_keys_filtered( + &self, + query: MetricKeyQuery, + ) -> Result, StoreError> { + let mut summaries = collect_metric_samples(self, &query)? + .into_iter() + .fold( + BTreeMap::<(MetricFieldSource, String), MetricKeyAccumulator>::new(), + |mut accumulators, sample| { + let key = (sample.source, sample.key.as_str().to_owned()); + let _ = accumulators + .entry(key) + .and_modify(|entry| entry.observe(&sample)) + .or_insert_with(|| MetricKeyAccumulator::from_sample(&sample)); + accumulators + }, + ) + .into_values() + .map(MetricKeyAccumulator::finish) + .collect::>(); + if query + .source + .is_none_or(|source| source == MetricFieldSource::RunMetric) + { + merge_registered_run_metric_summaries(self, &mut summaries)?; + } + summaries.sort_by(|left, right| { + left.key + .cmp(&right.key) + .then(left.source.cmp(&right.source)) + }); + Ok(summaries) + } + + pub fn best_metrics(&self, query: MetricBestQuery) -> Result, StoreError> { + let matching = collect_metric_samples( + self, + &MetricKeyQuery { + frontier_id: query.frontier_id, + source: query.source, + dimensions: query.dimensions.clone(), + }, + )? + .into_iter() + .filter(|sample| sample.key == query.key) + .collect::>(); + if matching.is_empty() { + return Ok(Vec::new()); + } + + let source = if let Some(source) = query.source { + source + } else { + let sources = matching + .iter() + .map(|sample| sample.source) + .collect::>(); + if sources.len() != 1 { + return Err(StoreError::AmbiguousMetricKey { + key: query.key.as_str().to_owned(), + sources: sources + .into_iter() + .map(MetricFieldSource::as_str) + .collect::>() + .join(", "), + }); + } + let Some(source) = sources.iter().copied().next() else { + return Ok(Vec::new()); + }; + source + }; + + let mut matching = matching + .into_iter() + .filter(|sample| sample.source == source) + .collect::>(); + if matching.is_empty() { + return Ok(Vec::new()); + } + + let order = resolve_metric_order(&matching, &query, source)?; + matching.sort_by(|left, right| compare_metric_samples(left, right, order)); + matching.truncate(query.limit as usize); + Ok(matching + .into_iter() + .map(|sample| sample.into_entry(order)) + .collect()) + } + pub fn archive_node(&mut self, node_id: fidget_spinner_core::NodeId) -> Result<(), StoreError> { let updated_at = encode_timestamp(OffsetDateTime::now_utc())?; let changed = self.connection.execute( @@ -668,11 +1112,26 @@ impl ProjectStore { if base_checkpoint.frontier_id != request.frontier_id { return Err(StoreError::CheckpointNotFound(request.base_checkpoint_id)); } + let tx = self.connection.transaction()?; + let dimensions = validate_run_dimensions_tx(&tx, &request.dimensions)?; + let primary_metric_definition = + load_metric_definition_tx(&tx, &request.primary_metric.key)?.ok_or_else(|| { + StoreError::UnknownMetricDefinition(request.primary_metric.key.clone()) + })?; + let supporting_metric_definitions = request + .supporting_metrics + .iter() + .map(|metric| { + load_metric_definition_tx(&tx, &metric.key)? + .ok_or_else(|| StoreError::UnknownMetricDefinition(metric.key.clone())) + }) + .collect::, StoreError>>()?; + let benchmark_suite = benchmark_suite_label(&dimensions); let run_payload = NodePayload::with_schema( self.schema.schema_ref(), json_object(json!({ - "benchmark_suite": request.benchmark_suite.as_str(), + "dimensions": run_dimensions_json(&dimensions), "backend": format!("{:?}", request.backend), "command": request.command.argv.iter().map(NonEmptyText::as_str).collect::>(), }))?, @@ -695,7 +1154,7 @@ impl ProjectStore { status: RunStatus::Succeeded, backend: request.backend, code_snapshot: request.code_snapshot, - benchmark_suite: Some(request.benchmark_suite.clone()), + dimensions: dimensions.clone(), command: request.command, started_at: Some(now), finished_at: Some(now), @@ -748,7 +1207,7 @@ impl ProjectStore { analysis_node_id: request.analysis_node_id, decision_node_id: decision_node.id, result: ExperimentResult { - benchmark_suite: request.benchmark_suite, + dimensions: dimensions.clone(), primary_metric: request.primary_metric, supporting_metrics: request.supporting_metrics, benchmark_bundle: None, @@ -757,8 +1216,6 @@ impl ProjectStore { verdict: request.verdict, created_at: now, }; - - let tx = self.connection.transaction()?; insert_node(&tx, &run_node)?; insert_node(&tx, &decision_node)?; insert_edge( @@ -780,9 +1237,13 @@ impl ProjectStore { insert_run( &tx, &run, + benchmark_suite.as_deref(), &experiment.result.primary_metric, + &primary_metric_definition, &experiment.result.supporting_metrics, + supporting_metric_definitions.as_slice(), )?; + insert_run_dimensions(&tx, run.run_id, &dimensions)?; match request.verdict { FrontierVerdict::PromoteToChampion => { demote_previous_champion(&tx, request.frontier_id)?; @@ -887,6 +1348,375 @@ impl ProjectStore { } } +fn upgrade_store(connection: &mut Connection) -> Result<(), StoreError> { + migrate(connection)?; + backfill_prose_summaries(connection)?; + let tx = connection.transaction()?; + let _ = normalize_metric_plane_tx(&tx)?; + tx.commit()?; + Ok(()) +} + +fn validate_prose_node_request(request: &CreateNodeRequest) -> Result<(), StoreError> { + if !matches!(request.class, NodeClass::Note | NodeClass::Research) { + return Ok(()); + } + if request.summary.is_none() { + return Err(StoreError::ProseSummaryRequired(request.class)); + } + match request.payload.field("body") { + Some(Value::String(body)) if !body.trim().is_empty() => Ok(()), + _ => Err(StoreError::ProseBodyRequired(request.class)), + } +} + +#[derive(Clone, Debug)] +struct MetricSample { + key: NonEmptyText, + source: MetricFieldSource, + value: f64, + frontier_id: fidget_spinner_core::FrontierId, + experiment_id: fidget_spinner_core::ExperimentId, + change_node_id: fidget_spinner_core::NodeId, + change_title: NonEmptyText, + run_id: fidget_spinner_core::RunId, + verdict: FrontierVerdict, + candidate_checkpoint_id: fidget_spinner_core::CheckpointId, + candidate_commit_hash: GitCommitHash, + unit: Option, + objective: Option, + dimensions: BTreeMap, +} + +impl MetricSample { + fn into_entry(self, order: MetricRankOrder) -> MetricBestEntry { + MetricBestEntry { + key: self.key, + source: self.source, + value: self.value, + order, + experiment_id: self.experiment_id, + frontier_id: self.frontier_id, + change_node_id: self.change_node_id, + change_title: self.change_title, + run_id: self.run_id, + verdict: self.verdict, + candidate_checkpoint_id: self.candidate_checkpoint_id, + candidate_commit_hash: self.candidate_commit_hash, + unit: self.unit, + objective: self.objective, + dimensions: self.dimensions, + } + } +} + +#[derive(Clone, Debug)] +struct MetricKeyAccumulator { + key: NonEmptyText, + source: MetricFieldSource, + experiment_ids: BTreeSet, + unit: Option, + objective: Option, + ambiguous_semantics: bool, +} + +impl MetricKeyAccumulator { + fn from_sample(sample: &MetricSample) -> Self { + Self { + key: sample.key.clone(), + source: sample.source, + experiment_ids: BTreeSet::from([sample.experiment_id]), + unit: sample.unit, + objective: sample.objective, + ambiguous_semantics: false, + } + } + + fn observe(&mut self, sample: &MetricSample) { + let _ = self.experiment_ids.insert(sample.experiment_id); + if self.unit != sample.unit || self.objective != sample.objective { + self.ambiguous_semantics = true; + self.unit = None; + self.objective = None; + } + } + + fn finish(self) -> MetricKeySummary { + MetricKeySummary { + key: self.key, + source: self.source, + experiment_count: self.experiment_ids.len() as u64, + unit: self.unit, + objective: self.objective, + description: None, + requires_order: self.source != MetricFieldSource::RunMetric + || self.ambiguous_semantics + || !matches!( + self.objective, + Some(OptimizationObjective::Minimize | OptimizationObjective::Maximize) + ), + } + } +} + +fn collect_metric_samples( + store: &ProjectStore, + query: &MetricKeyQuery, +) -> Result, StoreError> { + let rows = load_experiment_rows(store)?; + let metric_definitions = metric_definitions_by_key(store)?; + let mut samples = Vec::new(); + for row in rows { + if query + .frontier_id + .is_some_and(|frontier_id| row.frontier_id != frontier_id) + { + continue; + } + if !dimensions_match(&row.dimensions, &query.dimensions) { + continue; + } + samples.extend(metric_samples_for_row( + store.schema(), + &row, + &metric_definitions, + )); + } + Ok(if let Some(source) = query.source { + samples + .into_iter() + .filter(|sample| sample.source == source) + .collect() + } else { + samples + }) +} + +fn resolve_metric_order( + matching: &[MetricSample], + query: &MetricBestQuery, + source: MetricFieldSource, +) -> Result { + if let Some(order) = query.order { + return Ok(order); + } + if source != MetricFieldSource::RunMetric { + return Err(StoreError::MetricOrderRequired { + key: query.key.as_str().to_owned(), + metric_source: source.as_str().to_owned(), + }); + } + let objectives = matching + .iter() + .map(|sample| sample.objective) + .collect::>(); + match objectives.len() { + 1 => match objectives.into_iter().next().flatten() { + Some(OptimizationObjective::Minimize) => Ok(MetricRankOrder::Asc), + Some(OptimizationObjective::Maximize) => Ok(MetricRankOrder::Desc), + Some(OptimizationObjective::Target) | None => Err(StoreError::MetricOrderRequired { + key: query.key.as_str().to_owned(), + metric_source: source.as_str().to_owned(), + }), + }, + _ => Err(StoreError::MetricSemanticsAmbiguous { + key: query.key.as_str().to_owned(), + metric_source: source.as_str().to_owned(), + }), + } +} + +fn compare_metric_samples( + left: &MetricSample, + right: &MetricSample, + order: MetricRankOrder, +) -> Ordering { + let metric_order = match order { + MetricRankOrder::Asc => left + .value + .partial_cmp(&right.value) + .unwrap_or(Ordering::Equal), + MetricRankOrder::Desc => right + .value + .partial_cmp(&left.value) + .unwrap_or(Ordering::Equal), + }; + metric_order + .then_with(|| right.experiment_id.cmp(&left.experiment_id)) + .then_with(|| left.key.cmp(&right.key)) +} + +#[derive(Clone, Debug)] +struct ExperimentMetricRow { + experiment_id: fidget_spinner_core::ExperimentId, + frontier_id: fidget_spinner_core::FrontierId, + run_id: fidget_spinner_core::RunId, + verdict: FrontierVerdict, + candidate_checkpoint: CheckpointRecord, + change_node: DagNode, + run_node: DagNode, + analysis_node: Option, + decision_node: DagNode, + primary_metric: MetricValue, + supporting_metrics: Vec, + dimensions: BTreeMap, +} + +fn load_experiment_rows(store: &ProjectStore) -> Result, StoreError> { + let run_dimensions = load_run_dimensions_by_run_id(store)?; + let mut statement = store.connection.prepare( + "SELECT + id, + frontier_id, + run_id, + change_node_id, + run_node_id, + analysis_node_id, + decision_node_id, + candidate_checkpoint_id, + primary_metric_json, + supporting_metrics_json, + verdict + FROM experiments", + )?; + let mut rows = statement.query([])?; + let mut items = Vec::new(); + while let Some(row) = rows.next()? { + let change_node_id = parse_node_id(&row.get::<_, String>(3)?)?; + let run_id = parse_run_id(&row.get::<_, String>(2)?)?; + let run_node_id = parse_node_id(&row.get::<_, String>(4)?)?; + let analysis_node_id = row + .get::<_, Option>(5)? + .map(|raw| parse_node_id(&raw)) + .transpose()?; + let decision_node_id = parse_node_id(&row.get::<_, String>(6)?)?; + let candidate_checkpoint_id = parse_checkpoint_id(&row.get::<_, String>(7)?)?; + items.push(ExperimentMetricRow { + experiment_id: parse_experiment_id(&row.get::<_, String>(0)?)?, + frontier_id: parse_frontier_id(&row.get::<_, String>(1)?)?, + run_id, + verdict: parse_frontier_verdict(&row.get::<_, String>(10)?)?, + candidate_checkpoint: store + .load_checkpoint(candidate_checkpoint_id)? + .ok_or(StoreError::CheckpointNotFound(candidate_checkpoint_id))?, + change_node: store + .get_node(change_node_id)? + .ok_or(StoreError::NodeNotFound(change_node_id))?, + run_node: store + .get_node(run_node_id)? + .ok_or(StoreError::NodeNotFound(run_node_id))?, + analysis_node: analysis_node_id + .map(|node_id| { + store + .get_node(node_id)? + .ok_or(StoreError::NodeNotFound(node_id)) + }) + .transpose()?, + decision_node: store + .get_node(decision_node_id)? + .ok_or(StoreError::NodeNotFound(decision_node_id))?, + primary_metric: decode_json(&row.get::<_, String>(8)?)?, + supporting_metrics: decode_json(&row.get::<_, String>(9)?)?, + dimensions: run_dimensions.get(&run_id).cloned().unwrap_or_default(), + }); + } + Ok(items) +} + +fn metric_samples_for_row( + schema: &ProjectSchema, + row: &ExperimentMetricRow, + metric_definitions: &BTreeMap, +) -> Vec { + let mut samples = vec![metric_sample_from_observation( + row, + &row.primary_metric, + metric_definitions, + MetricFieldSource::RunMetric, + )]; + samples.extend(row.supporting_metrics.iter().map(|metric| { + metric_sample_from_observation( + row, + metric, + metric_definitions, + MetricFieldSource::RunMetric, + ) + })); + samples.extend(metric_samples_from_payload(schema, row, &row.change_node)); + samples.extend(metric_samples_from_payload(schema, row, &row.run_node)); + if let Some(node) = row.analysis_node.as_ref() { + samples.extend(metric_samples_from_payload(schema, row, node)); + } + samples.extend(metric_samples_from_payload(schema, row, &row.decision_node)); + samples +} + +fn metric_sample_from_observation( + row: &ExperimentMetricRow, + metric: &MetricValue, + metric_definitions: &BTreeMap, + source: MetricFieldSource, +) -> MetricSample { + let registry = metric_definitions.get(metric.key.as_str()); + MetricSample { + key: metric.key.clone(), + source, + value: metric.value, + frontier_id: row.frontier_id, + experiment_id: row.experiment_id, + change_node_id: row.change_node.id, + change_title: row.change_node.title.clone(), + run_id: row.run_id, + verdict: row.verdict, + candidate_checkpoint_id: row.candidate_checkpoint.id, + candidate_commit_hash: row.candidate_checkpoint.snapshot.commit_hash.clone(), + unit: registry.map(|definition| definition.unit), + objective: registry.map(|definition| definition.objective), + dimensions: row.dimensions.clone(), + } +} + +fn metric_samples_from_payload( + schema: &ProjectSchema, + row: &ExperimentMetricRow, + node: &DagNode, +) -> Vec { + let Some(source) = MetricFieldSource::from_payload_class(node.class) else { + return Vec::new(); + }; + node.payload + .fields + .iter() + .filter_map(|(key, value)| { + let value = value.as_f64()?; + let spec = schema.field_spec(node.class, key); + if spec.is_some_and(|field| { + field + .value_type + .is_some_and(|kind| kind != FieldValueType::Numeric) + }) { + return None; + } + Some(MetricSample { + key: NonEmptyText::new(key.clone()).ok()?, + source, + value, + frontier_id: row.frontier_id, + experiment_id: row.experiment_id, + change_node_id: row.change_node.id, + change_title: row.change_node.title.clone(), + run_id: row.run_id, + verdict: row.verdict, + candidate_checkpoint_id: row.candidate_checkpoint.id, + candidate_commit_hash: row.candidate_checkpoint.snapshot.commit_hash.clone(), + unit: None, + objective: None, + dimensions: row.dimensions.clone(), + }) + }) + .collect() +} + fn migrate(connection: &Connection) -> Result<(), StoreError> { connection.execute_batch( " @@ -986,6 +1816,32 @@ fn migrate(connection: &Connection) -> Result<(), StoreError> { value REAL NOT NULL ); + CREATE TABLE IF NOT EXISTS metric_definitions ( + metric_key TEXT PRIMARY KEY, + unit TEXT NOT NULL, + objective TEXT NOT NULL, + description TEXT, + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS run_dimension_definitions ( + dimension_key TEXT PRIMARY KEY, + value_type TEXT NOT NULL, + description TEXT, + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS run_dimensions ( + run_id TEXT NOT NULL REFERENCES runs(run_id) ON DELETE CASCADE, + dimension_key TEXT NOT NULL REFERENCES run_dimension_definitions(dimension_key) ON DELETE RESTRICT, + value_type TEXT NOT NULL, + value_text TEXT, + value_numeric REAL, + value_boolean INTEGER, + value_timestamp TEXT, + PRIMARY KEY (run_id, dimension_key) + ); + CREATE TABLE IF NOT EXISTS experiments ( id TEXT PRIMARY KEY, frontier_id TEXT NOT NULL REFERENCES frontiers(id) ON DELETE CASCADE, @@ -1005,6 +1861,12 @@ fn migrate(connection: &Connection) -> Result<(), StoreError> { created_at TEXT NOT NULL ); + CREATE INDEX IF NOT EXISTS metrics_by_key ON metrics(metric_key); + CREATE INDEX IF NOT EXISTS run_dimensions_by_key_text ON run_dimensions(dimension_key, value_text); + CREATE INDEX IF NOT EXISTS run_dimensions_by_key_numeric ON run_dimensions(dimension_key, value_numeric); + CREATE INDEX IF NOT EXISTS run_dimensions_by_run ON run_dimensions(run_id, dimension_key); + CREATE INDEX IF NOT EXISTS experiments_by_frontier ON experiments(frontier_id, created_at DESC); + CREATE TABLE IF NOT EXISTS events ( id INTEGER PRIMARY KEY AUTOINCREMENT, entity_kind TEXT NOT NULL, @@ -1018,30 +1880,696 @@ fn migrate(connection: &Connection) -> Result<(), StoreError> { Ok(()) } -fn insert_node(tx: &Transaction<'_>, node: &DagNode) -> Result<(), StoreError> { - let schema_namespace = node - .payload - .schema - .as_ref() - .map(|schema| schema.namespace.as_str()); - let schema_version = node - .payload - .schema - .as_ref() - .map(|schema| i64::from(schema.version)); - let _ = tx.execute( - "INSERT INTO nodes ( - id, - class, - track, - frontier_id, - archived, - title, - summary, - payload_schema_namespace, - payload_schema_version, - payload_json, - diagnostics_json, +fn backfill_prose_summaries(connection: &Connection) -> Result<(), StoreError> { + let mut statement = connection.prepare( + "SELECT id, payload_json + FROM nodes + WHERE class IN ('note', 'research') + AND (summary IS NULL OR trim(summary) = '')", + )?; + let mut rows = statement.query([])?; + let mut updates = Vec::new(); + while let Some(row) = rows.next()? { + let node_id = row.get::<_, String>(0)?; + let payload = decode_json::(&row.get::<_, String>(1)?)?; + let Some(Value::String(body)) = payload.field("body") else { + continue; + }; + let Some(summary) = derive_summary_from_body(body) else { + continue; + }; + updates.push((node_id, summary)); + } + for (node_id, summary) in updates { + let _ = connection.execute( + "UPDATE nodes SET summary = ?1 WHERE id = ?2", + params![summary.as_str(), node_id], + )?; + } + Ok(()) +} + +fn sort_schema_fields(fields: &mut [ProjectFieldSpec]) { + fields.sort_by(|left, right| { + left.name + .cmp(&right.name) + .then_with(|| left.node_classes.iter().cmp(right.node_classes.iter())) + }); +} + +fn normalize_metric_plane_tx( + tx: &Transaction<'_>, +) -> Result { + let mut report = MetricPlaneMigrationReport::default(); + + if insert_run_dimension_definition_tx( + tx, + &RunDimensionDefinition::new( + NonEmptyText::new("benchmark_suite")?, + FieldValueType::String, + Some(NonEmptyText::new("Legacy coarse benchmark label")?), + ), + )? { + report.inserted_dimension_definitions += 1; + } + + { + let mut statement = tx.prepare( + "SELECT DISTINCT metric_key, unit, objective + FROM metrics + ORDER BY metric_key ASC", + )?; + let mut rows = statement.query([])?; + while let Some(row) = rows.next()? { + let definition = MetricDefinition::new( + NonEmptyText::new(row.get::<_, String>(0)?)?, + decode_metric_unit(&row.get::<_, String>(1)?)?, + decode_optimization_objective(&row.get::<_, String>(2)?)?, + None, + ); + if upsert_metric_definition_tx(tx, &definition)? { + report.inserted_metric_definitions += 1; + } + } + } + + { + let mut statement = tx.prepare( + "SELECT payload_json + FROM nodes + WHERE class = 'contract'", + )?; + let mut rows = statement.query([])?; + while let Some(row) = rows.next()? { + let payload = decode_json::(&row.get::<_, String>(0)?)?; + for definition in contract_metric_definitions(&payload)? { + if upsert_metric_definition_tx(tx, &definition)? { + report.inserted_metric_definitions += 1; + } + } + } + } + + { + let mut statement = tx.prepare( + "SELECT run_id, benchmark_suite + FROM runs + WHERE benchmark_suite IS NOT NULL + AND trim(benchmark_suite) != ''", + )?; + let mut rows = statement.query([])?; + while let Some(row) = rows.next()? { + let run_id = parse_run_id(&row.get::<_, String>(0)?)?; + let value = RunDimensionValue::String(NonEmptyText::new(row.get::<_, String>(1)?)?); + if insert_run_dimension_value_tx( + tx, + run_id, + &NonEmptyText::new("benchmark_suite")?, + &value, + )? { + report.inserted_dimension_values += 1; + } + } + } + + Ok(report) +} + +fn contract_metric_definitions(payload: &NodePayload) -> Result, StoreError> { + let mut definitions = Vec::new(); + if let Some(primary) = payload.field("primary_metric") { + definitions.push(metric_definition_from_json(primary, None)?); + } + if let Some(Value::Array(items)) = payload.field("supporting_metrics") { + for item in items { + definitions.push(metric_definition_from_json(item, None)?); + } + } + Ok(definitions) +} + +fn metric_definition_from_json( + value: &Value, + description: Option, +) -> Result { + let Some(object) = value.as_object() else { + return Err(StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + "metric definition payload must be an object", + )))); + }; + let key = object + .get("metric_key") + .or_else(|| object.get("key")) + .and_then(Value::as_str) + .ok_or_else(|| { + StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + "metric definition missing key", + ))) + })?; + let unit = object.get("unit").and_then(Value::as_str).ok_or_else(|| { + StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + "metric definition missing unit", + ))) + })?; + let objective = object + .get("objective") + .and_then(Value::as_str) + .ok_or_else(|| { + StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + "metric definition missing objective", + ))) + })?; + Ok(MetricDefinition::new( + NonEmptyText::new(key)?, + decode_metric_unit(unit)?, + decode_optimization_objective(objective)?, + description, + )) +} + +fn upsert_metric_definition_tx( + tx: &Transaction<'_>, + definition: &MetricDefinition, +) -> Result { + let existing = tx + .query_row( + "SELECT unit, objective, description + FROM metric_definitions + WHERE metric_key = ?1", + params![definition.key.as_str()], + |row| { + Ok(( + row.get::<_, String>(0)?, + row.get::<_, String>(1)?, + row.get::<_, Option>(2)?, + )) + }, + ) + .optional()?; + if let Some((existing_unit, existing_objective, existing_description)) = existing { + let new_unit = encode_metric_unit(definition.unit).to_owned(); + let new_objective = encode_optimization_objective(definition.objective).to_owned(); + if existing_unit != new_unit || existing_objective != new_objective { + return Err(StoreError::ConflictingMetricDefinition { + key: definition.key.as_str().to_owned(), + existing_unit, + existing_objective, + new_unit, + new_objective, + }); + } + if existing_description.is_none() && definition.description.is_some() { + let _ = tx.execute( + "UPDATE metric_definitions SET description = ?2 WHERE metric_key = ?1", + params![ + definition.key.as_str(), + definition.description.as_ref().map(NonEmptyText::as_str) + ], + )?; + } + Ok(false) + } else { + let _ = tx.execute( + "INSERT INTO metric_definitions (metric_key, unit, objective, description, created_at) + VALUES (?1, ?2, ?3, ?4, ?5)", + params![ + definition.key.as_str(), + encode_metric_unit(definition.unit), + encode_optimization_objective(definition.objective), + definition.description.as_ref().map(NonEmptyText::as_str), + encode_timestamp(definition.created_at)?, + ], + )?; + Ok(true) + } +} + +fn insert_run_dimension_definition_tx( + tx: &Transaction<'_>, + definition: &RunDimensionDefinition, +) -> Result { + let existing = tx + .query_row( + "SELECT value_type, description + FROM run_dimension_definitions + WHERE dimension_key = ?1", + params![definition.key.as_str()], + |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option>(1)?)), + ) + .optional()?; + if let Some((existing_type, existing_description)) = existing { + let new_type = encode_field_value_type(definition.value_type).to_owned(); + if existing_type != new_type { + return Err(StoreError::ConflictingRunDimensionDefinition { + key: definition.key.as_str().to_owned(), + existing_type, + new_type, + }); + } + if existing_description.is_none() && definition.description.is_some() { + let _ = tx.execute( + "UPDATE run_dimension_definitions SET description = ?2 WHERE dimension_key = ?1", + params![ + definition.key.as_str(), + definition.description.as_ref().map(NonEmptyText::as_str) + ], + )?; + } + Ok(false) + } else { + let _ = tx.execute( + "INSERT INTO run_dimension_definitions (dimension_key, value_type, description, created_at) + VALUES (?1, ?2, ?3, ?4)", + params![ + definition.key.as_str(), + encode_field_value_type(definition.value_type), + definition.description.as_ref().map(NonEmptyText::as_str), + encode_timestamp(definition.created_at)?, + ], + )?; + Ok(true) + } +} + +fn load_metric_definition_tx( + tx: &Transaction<'_>, + key: &NonEmptyText, +) -> Result, StoreError> { + tx.query_row( + "SELECT metric_key, unit, objective, description, created_at + FROM metric_definitions + WHERE metric_key = ?1", + params![key.as_str()], + |row| { + Ok(MetricDefinition { + key: NonEmptyText::new(row.get::<_, String>(0)?) + .map_err(core_to_sql_conversion_error)?, + unit: decode_metric_unit(&row.get::<_, String>(1)?) + .map_err(to_sql_conversion_error)?, + objective: decode_optimization_objective(&row.get::<_, String>(2)?) + .map_err(to_sql_conversion_error)?, + description: row + .get::<_, Option>(3)? + .map(NonEmptyText::new) + .transpose() + .map_err(core_to_sql_conversion_error)?, + created_at: decode_timestamp(&row.get::<_, String>(4)?) + .map_err(to_sql_conversion_error)?, + }) + }, + ) + .optional() + .map_err(StoreError::from) +} + +fn metric_definitions_by_key( + store: &ProjectStore, +) -> Result, StoreError> { + Ok(store + .list_metric_definitions()? + .into_iter() + .map(|definition| (definition.key.as_str().to_owned(), definition)) + .collect()) +} + +fn run_dimension_definitions_by_key( + store: &ProjectStore, +) -> Result, StoreError> { + let mut statement = store.connection.prepare( + "SELECT dimension_key, value_type, description, created_at + FROM run_dimension_definitions", + )?; + let mut rows = statement.query([])?; + let mut items = BTreeMap::new(); + while let Some(row) = rows.next()? { + let definition = RunDimensionDefinition { + key: NonEmptyText::new(row.get::<_, String>(0)?)?, + value_type: decode_field_value_type(&row.get::<_, String>(1)?)?, + description: row + .get::<_, Option>(2)? + .map(NonEmptyText::new) + .transpose()?, + created_at: decode_timestamp(&row.get::<_, String>(3)?)?, + }; + let _ = items.insert(definition.key.as_str().to_owned(), definition); + } + Ok(items) +} + +fn coerce_run_dimension_map( + definitions: &BTreeMap, + raw_dimensions: BTreeMap, +) -> Result, StoreError> { + let mut dimensions = BTreeMap::new(); + for (raw_key, raw_value) in raw_dimensions { + let key = NonEmptyText::new(raw_key)?; + let Some(definition) = definitions.get(key.as_str()) else { + return Err(StoreError::UnknownRunDimension(key)); + }; + let value = coerce_run_dimension_value(definition, raw_value)?; + let _ = dimensions.insert(key, value); + } + Ok(dimensions) +} + +fn coerce_run_dimension_value( + definition: &RunDimensionDefinition, + raw_value: Value, +) -> Result { + match definition.value_type { + FieldValueType::String => match raw_value { + Value::String(value) => Ok(RunDimensionValue::String(NonEmptyText::new(value)?)), + other => Err(StoreError::InvalidRunDimensionValue { + key: definition.key.as_str().to_owned(), + expected: definition.value_type.as_str().to_owned(), + observed: value_kind_name(&other).to_owned(), + }), + }, + FieldValueType::Numeric => match raw_value.as_f64() { + Some(value) => Ok(RunDimensionValue::Numeric(value)), + None => Err(StoreError::InvalidRunDimensionValue { + key: definition.key.as_str().to_owned(), + expected: definition.value_type.as_str().to_owned(), + observed: value_kind_name(&raw_value).to_owned(), + }), + }, + FieldValueType::Boolean => match raw_value { + Value::Bool(value) => Ok(RunDimensionValue::Boolean(value)), + other => Err(StoreError::InvalidRunDimensionValue { + key: definition.key.as_str().to_owned(), + expected: definition.value_type.as_str().to_owned(), + observed: value_kind_name(&other).to_owned(), + }), + }, + FieldValueType::Timestamp => match raw_value { + Value::String(value) => { + let _ = OffsetDateTime::parse(&value, &Rfc3339)?; + Ok(RunDimensionValue::Timestamp(NonEmptyText::new(value)?)) + } + other => Err(StoreError::InvalidRunDimensionValue { + key: definition.key.as_str().to_owned(), + expected: definition.value_type.as_str().to_owned(), + observed: value_kind_name(&other).to_owned(), + }), + }, + } +} + +fn insert_run_dimension_value_tx( + tx: &Transaction<'_>, + run_id: fidget_spinner_core::RunId, + key: &NonEmptyText, + value: &RunDimensionValue, +) -> Result { + let (value_text, value_numeric, value_boolean, value_timestamp) = + encode_run_dimension_columns(value)?; + let changed = tx.execute( + "INSERT OR IGNORE INTO run_dimensions ( + run_id, + dimension_key, + value_type, + value_text, + value_numeric, + value_boolean, + value_timestamp + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + params![ + run_id.to_string(), + key.as_str(), + encode_field_value_type(value.value_type()), + value_text, + value_numeric, + value_boolean, + value_timestamp, + ], + )?; + Ok(changed > 0) +} + +fn insert_run_dimensions( + tx: &Transaction<'_>, + run_id: fidget_spinner_core::RunId, + dimensions: &BTreeMap, +) -> Result<(), StoreError> { + for (key, value) in dimensions { + let _ = insert_run_dimension_value_tx(tx, run_id, key, value)?; + } + Ok(()) +} + +fn validate_run_dimensions_tx( + tx: &Transaction<'_>, + dimensions: &BTreeMap, +) -> Result, StoreError> { + for (key, value) in dimensions { + let Some(expected_type) = tx + .query_row( + "SELECT value_type + FROM run_dimension_definitions + WHERE dimension_key = ?1", + params![key.as_str()], + |row| row.get::<_, String>(0), + ) + .optional()? + else { + return Err(StoreError::UnknownRunDimension(key.clone())); + }; + let expected_type = decode_field_value_type(&expected_type)?; + let observed_type = value.value_type(); + if expected_type != observed_type { + return Err(StoreError::InvalidRunDimensionValue { + key: key.as_str().to_owned(), + expected: expected_type.as_str().to_owned(), + observed: observed_type.as_str().to_owned(), + }); + } + if matches!(value, RunDimensionValue::Timestamp(raw) if OffsetDateTime::parse(raw.as_str(), &Rfc3339).is_err()) + { + return Err(StoreError::InvalidRunDimensionValue { + key: key.as_str().to_owned(), + expected: FieldValueType::Timestamp.as_str().to_owned(), + observed: "string".to_owned(), + }); + } + } + Ok(dimensions.clone()) +} + +fn load_run_dimensions_by_run_id( + store: &ProjectStore, +) -> Result< + BTreeMap>, + StoreError, +> { + let mut statement = store.connection.prepare( + "SELECT run_id, dimension_key, value_type, value_text, value_numeric, value_boolean, value_timestamp + FROM run_dimensions + ORDER BY dimension_key ASC", + )?; + let mut rows = statement.query([])?; + let mut values = + BTreeMap::>::new(); + while let Some(row) = rows.next()? { + let run_id = parse_run_id(&row.get::<_, String>(0)?)?; + let key = NonEmptyText::new(row.get::<_, String>(1)?)?; + let value_type = decode_field_value_type(&row.get::<_, String>(2)?)?; + let value = decode_run_dimension_value( + value_type, + row.get::<_, Option>(3)?, + row.get::<_, Option>(4)?, + row.get::<_, Option>(5)?, + row.get::<_, Option>(6)?, + )?; + let _ = values.entry(run_id).or_default().insert(key, value); + } + Ok(values) +} + +fn load_run_dimension_summaries( + store: &ProjectStore, +) -> Result, StoreError> { + let definitions = { + let mut statement = store.connection.prepare( + "SELECT dimension_key, value_type, description, created_at + FROM run_dimension_definitions + ORDER BY dimension_key ASC", + )?; + let mut rows = statement.query([])?; + let mut items = Vec::new(); + while let Some(row) = rows.next()? { + items.push(RunDimensionDefinition { + key: NonEmptyText::new(row.get::<_, String>(0)?)?, + value_type: decode_field_value_type(&row.get::<_, String>(1)?)?, + description: row + .get::<_, Option>(2)? + .map(NonEmptyText::new) + .transpose()?, + created_at: decode_timestamp(&row.get::<_, String>(3)?)?, + }); + } + items + }; + + let mut summaries = Vec::new(); + for definition in definitions { + let mut statement = store.connection.prepare( + "SELECT value_text, value_numeric, value_boolean, value_timestamp + FROM run_dimensions + WHERE dimension_key = ?1", + )?; + let mut rows = statement.query(params![definition.key.as_str()])?; + let mut observed_run_count = 0_u64; + let mut distinct = BTreeSet::new(); + let mut sample_values = Vec::new(); + while let Some(row) = rows.next()? { + observed_run_count += 1; + let value = decode_run_dimension_value( + definition.value_type, + row.get::<_, Option>(0)?, + row.get::<_, Option>(1)?, + row.get::<_, Option>(2)?, + row.get::<_, Option>(3)?, + )?; + let serialized = encode_json(&value.as_json())?; + if distinct.insert(serialized) && sample_values.len() < 5 { + sample_values.push(value.as_json()); + } + } + summaries.push(RunDimensionSummary { + key: definition.key, + value_type: definition.value_type, + description: definition.description, + observed_run_count, + distinct_value_count: distinct.len() as u64, + sample_values, + }); + } + Ok(summaries) +} + +fn merge_registered_run_metric_summaries( + store: &ProjectStore, + summaries: &mut Vec, +) -> Result<(), StoreError> { + let definitions = store.list_metric_definitions()?; + for definition in definitions { + if let Some(summary) = summaries.iter_mut().find(|summary| { + summary.source == MetricFieldSource::RunMetric && summary.key == definition.key + }) { + summary.unit = Some(definition.unit); + summary.objective = Some(definition.objective); + summary.description.clone_from(&definition.description); + summary.requires_order = matches!(definition.objective, OptimizationObjective::Target); + continue; + } + summaries.push(MetricKeySummary { + key: definition.key, + source: MetricFieldSource::RunMetric, + experiment_count: 0, + unit: Some(definition.unit), + objective: Some(definition.objective), + description: definition.description, + requires_order: matches!(definition.objective, OptimizationObjective::Target), + }); + } + Ok(()) +} + +fn dimensions_match( + haystack: &BTreeMap, + needle: &BTreeMap, +) -> bool { + needle + .iter() + .all(|(key, value)| haystack.get(key) == Some(value)) +} + +fn run_dimensions_json(dimensions: &BTreeMap) -> Value { + Value::Object( + dimensions + .iter() + .map(|(key, value)| (key.to_string(), value.as_json())) + .collect::>(), + ) +} + +fn benchmark_suite_label(dimensions: &BTreeMap) -> Option { + dimensions + .get(&NonEmptyText::new("benchmark_suite").ok()?) + .and_then(|value| match value { + RunDimensionValue::String(item) => Some(item.to_string()), + _ => None, + }) + .or_else(|| { + if dimensions.is_empty() { + None + } else { + Some( + dimensions + .iter() + .map(|(key, value)| format!("{key}={}", dimension_value_text(value))) + .collect::>() + .join(", "), + ) + } + }) +} + +fn derive_summary_from_body(body: &str) -> Option { + const MAX_SUMMARY_CHARS: usize = 240; + + let paragraph = body + .split("\n\n") + .map(collapse_inline_whitespace) + .map(|text| text.trim().to_owned()) + .find(|text| !text.is_empty())?; + let summary = truncate_chars(¶graph, MAX_SUMMARY_CHARS); + NonEmptyText::new(summary).ok() +} + +fn collapse_inline_whitespace(raw: &str) -> String { + raw.split_whitespace().collect::>().join(" ") +} + +fn truncate_chars(value: &str, max_chars: usize) -> String { + if value.chars().count() <= max_chars { + return value.to_owned(); + } + let mut truncated = value.chars().take(max_chars).collect::(); + if let Some(index) = truncated.rfind(char::is_whitespace) { + truncated.truncate(index); + } + format!("{}…", truncated.trim_end()) +} + +fn insert_node(tx: &Transaction<'_>, node: &DagNode) -> Result<(), StoreError> { + let schema_namespace = node + .payload + .schema + .as_ref() + .map(|schema| schema.namespace.as_str()); + let schema_version = node + .payload + .schema + .as_ref() + .map(|schema| i64::from(schema.version)); + let _ = tx.execute( + "INSERT INTO nodes ( + id, + class, + track, + frontier_id, + archived, + title, + summary, + payload_schema_namespace, + payload_schema_version, + payload_json, + diagnostics_json, agent_session_id, created_at, updated_at @@ -1210,8 +2738,11 @@ fn insert_checkpoint( fn insert_run( tx: &Transaction<'_>, run: &RunRecord, - primary_metric: &MetricObservation, - supporting_metrics: &[MetricObservation], + benchmark_suite: Option<&str>, + primary_metric: &MetricValue, + primary_metric_definition: &MetricDefinition, + supporting_metrics: &[MetricValue], + supporting_metric_definitions: &[MetricDefinition], ) -> Result<(), StoreError> { let (repo_root, worktree_root, worktree_name, head_commit, dirty_paths) = run .code_snapshot @@ -1273,7 +2804,7 @@ fn insert_run( worktree_name.map(|item| item.to_string()), head_commit.map(|item| item.to_string()), dirty_paths_json, - run.benchmark_suite.as_ref().map(NonEmptyText::as_str), + benchmark_suite, run.command.working_directory.as_str(), encode_json(&run.command.argv)?, encode_json(&run.command.env)?, @@ -1282,15 +2813,19 @@ fn insert_run( ], )?; - for metric in std::iter::once(primary_metric).chain(supporting_metrics.iter()) { + for (metric, definition) in std::iter::once((primary_metric, primary_metric_definition)).chain( + supporting_metrics + .iter() + .zip(supporting_metric_definitions.iter()), + ) { let _ = tx.execute( "INSERT INTO metrics (run_id, metric_key, unit, objective, value) VALUES (?1, ?2, ?3, ?4, ?5)", params![ run.run_id.to_string(), - metric.metric_key.as_str(), - encode_metric_unit(metric.unit), - encode_optimization_objective(metric.objective), + metric.key.as_str(), + encode_metric_unit(definition.unit), + encode_optimization_objective(definition.objective), metric.value, ], )?; @@ -1331,7 +2866,7 @@ fn insert_experiment( experiment.run_id.to_string(), experiment.analysis_node_id.map(|id| id.to_string()), experiment.decision_node_id.to_string(), - experiment.result.benchmark_suite.as_str(), + benchmark_suite_label(&experiment.result.dimensions), encode_json(&experiment.result.primary_metric)?, encode_json(&experiment.result.supporting_metrics)?, experiment.note.summary.as_str(), @@ -1623,6 +3158,16 @@ fn parse_checkpoint_id(raw: &str) -> Result Result { + Ok(fidget_spinner_core::ExperimentId::from_uuid(parse_uuid( + raw, + )?)) +} + +fn parse_run_id(raw: &str) -> Result { + Ok(fidget_spinner_core::RunId::from_uuid(parse_uuid(raw)?)) +} + fn parse_agent_session_id(raw: &str) -> Result { Ok(fidget_spinner_core::AgentSessionId::from_uuid(parse_uuid( raw, @@ -1762,6 +3307,23 @@ fn encode_backend(backend: ExecutionBackend) -> &'static str { } } +fn encode_field_value_type(value_type: FieldValueType) -> &'static str { + value_type.as_str() +} + +fn decode_field_value_type(raw: &str) -> Result { + match raw { + "string" => Ok(FieldValueType::String), + "numeric" => Ok(FieldValueType::Numeric), + "boolean" => Ok(FieldValueType::Boolean), + "timestamp" => Ok(FieldValueType::Timestamp), + other => Err(StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + format!("unknown field value type `{other}`"), + )))), + } +} + fn encode_metric_unit(unit: MetricUnit) -> &'static str { match unit { MetricUnit::Seconds => "seconds", @@ -1772,6 +3334,20 @@ fn encode_metric_unit(unit: MetricUnit) -> &'static str { } } +fn decode_metric_unit(raw: &str) -> Result { + match raw { + "seconds" => Ok(MetricUnit::Seconds), + "bytes" => Ok(MetricUnit::Bytes), + "count" => Ok(MetricUnit::Count), + "ratio" => Ok(MetricUnit::Ratio), + "custom" => Ok(MetricUnit::Custom), + other => Err(StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + format!("unknown metric unit `{other}`"), + )))), + } +} + fn encode_optimization_objective(objective: OptimizationObjective) -> &'static str { match objective { OptimizationObjective::Minimize => "minimize", @@ -1780,6 +3356,18 @@ fn encode_optimization_objective(objective: OptimizationObjective) -> &'static s } } +fn decode_optimization_objective(raw: &str) -> Result { + match raw { + "minimize" => Ok(OptimizationObjective::Minimize), + "maximize" => Ok(OptimizationObjective::Maximize), + "target" => Ok(OptimizationObjective::Target), + other => Err(StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + format!("unknown optimization objective `{other}`"), + )))), + } +} + fn encode_frontier_verdict(verdict: FrontierVerdict) -> &'static str { match verdict { FrontierVerdict::PromoteToChampion => "promote-to-champion", @@ -1790,18 +3378,117 @@ fn encode_frontier_verdict(verdict: FrontierVerdict) -> &'static str { } } +fn parse_frontier_verdict(raw: &str) -> Result { + match raw { + "promote-to-champion" => Ok(FrontierVerdict::PromoteToChampion), + "keep-on-frontier" => Ok(FrontierVerdict::KeepOnFrontier), + "revert-to-champion" => Ok(FrontierVerdict::RevertToChampion), + "archive-dead-end" => Ok(FrontierVerdict::ArchiveDeadEnd), + "needs-more-evidence" => Ok(FrontierVerdict::NeedsMoreEvidence), + other => Err(StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + format!("unknown frontier verdict `{other}`"), + )))), + } +} + +type RunDimensionColumns = (Option, Option, Option, Option); + +fn encode_run_dimension_columns( + value: &RunDimensionValue, +) -> Result { + match value { + RunDimensionValue::String(item) => Ok((Some(item.to_string()), None, None, None)), + RunDimensionValue::Numeric(item) => Ok((None, Some(*item), None, None)), + RunDimensionValue::Boolean(item) => Ok((None, None, Some(i64::from(*item)), None)), + RunDimensionValue::Timestamp(item) => { + let _ = OffsetDateTime::parse(item.as_str(), &Rfc3339)?; + Ok((None, None, None, Some(item.to_string()))) + } + } +} + +fn decode_run_dimension_value( + value_type: FieldValueType, + value_text: Option, + value_numeric: Option, + value_boolean: Option, + value_timestamp: Option, +) -> Result { + match value_type { + FieldValueType::String => Ok(RunDimensionValue::String(NonEmptyText::new( + value_text.ok_or_else(|| { + StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + "missing string dimension value", + ))) + })?, + )?)), + FieldValueType::Numeric => Ok(RunDimensionValue::Numeric(value_numeric.ok_or_else( + || { + StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + "missing numeric dimension value", + ))) + }, + )?)), + FieldValueType::Boolean => Ok(RunDimensionValue::Boolean( + value_boolean.ok_or_else(|| { + StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + "missing boolean dimension value", + ))) + })? != 0, + )), + FieldValueType::Timestamp => { + let value = value_timestamp.ok_or_else(|| { + StoreError::Json(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + "missing timestamp dimension value", + ))) + })?; + let _ = OffsetDateTime::parse(&value, &Rfc3339)?; + Ok(RunDimensionValue::Timestamp(NonEmptyText::new(value)?)) + } + } +} + +fn dimension_value_text(value: &RunDimensionValue) -> String { + match value { + RunDimensionValue::String(item) | RunDimensionValue::Timestamp(item) => item.to_string(), + RunDimensionValue::Numeric(item) => item.to_string(), + RunDimensionValue::Boolean(item) => item.to_string(), + } +} + +fn value_kind_name(value: &Value) -> &'static str { + match value { + Value::Null => "null", + Value::Bool(_) => "boolean", + Value::Number(_) => "numeric", + Value::String(_) => "string", + Value::Array(_) => "array", + Value::Object(_) => "object", + } +} + #[cfg(test)] mod tests { - use std::collections::BTreeSet; + use std::collections::{BTreeMap, BTreeSet}; use serde_json::json; use super::{ - CreateFrontierRequest, CreateNodeRequest, ListNodesQuery, PROJECT_SCHEMA_NAME, ProjectStore, + CloseExperimentRequest, CreateFrontierRequest, CreateNodeRequest, DefineMetricRequest, + DefineRunDimensionRequest, ListNodesQuery, MetricBestQuery, MetricFieldSource, + MetricKeyQuery, MetricRankOrder, PROJECT_SCHEMA_NAME, ProjectStore, + RemoveSchemaFieldRequest, UpsertSchemaFieldRequest, }; use fidget_spinner_core::{ - CheckpointSnapshotRef, EvaluationProtocol, FrontierContract, MetricSpec, MetricUnit, - NodeAnnotation, NodeClass, NodePayload, NonEmptyText, OptimizationObjective, TagName, + CheckpointSnapshotRef, CommandRecipe, DiagnosticSeverity, EvaluationProtocol, + FieldPresence, FieldRole, FieldValueType, FrontierContract, FrontierNote, FrontierVerdict, + GitCommitHash, InferencePolicy, MetricSpec, MetricUnit, MetricValue, NodeAnnotation, + NodeClass, NodePayload, NonEmptyText, OptimizationObjective, RunDimensionValue, TagName, }; fn temp_project_root(label: &str) -> camino::Utf8PathBuf { @@ -1893,7 +3580,7 @@ mod tests { repo_root: root.clone(), worktree_root: root, worktree_name: Some(NonEmptyText::new("main")?), - commit_hash: fidget_spinner_core::GitCommitHash::new("0123456789abcdef")?, + commit_hash: GitCommitHash::new("0123456789abcdef")?, }, }), })?; @@ -1914,7 +3601,7 @@ mod tests { class: NodeClass::Note, frontier_id: None, title: NonEmptyText::new("quick note")?, - summary: None, + summary: Some(NonEmptyText::new("quick note summary")?), tags: Some(BTreeSet::new()), payload: NodePayload::with_schema( store.schema().schema_ref(), @@ -1987,7 +3674,7 @@ mod tests { class: NodeClass::Note, frontier_id: None, title: NonEmptyText::new("quick note")?, - summary: None, + summary: Some(NonEmptyText::new("quick note summary")?), tags: None, payload: NodePayload::with_schema( store.schema().schema_ref(), @@ -2021,7 +3708,7 @@ mod tests { class: NodeClass::Note, frontier_id: None, title: NonEmptyText::new("tagged note")?, - summary: None, + summary: Some(NonEmptyText::new("tagged note summary")?), tags: Some(BTreeSet::from([cuts.name.clone(), heuristics.name.clone()])), payload: NodePayload::with_schema( store.schema().schema_ref(), @@ -2044,4 +3731,506 @@ mod tests { assert_eq!(filtered[0].tags.len(), 2); Ok(()) } + + #[test] + fn prose_nodes_require_summary_and_body() -> Result<(), super::StoreError> { + let root = temp_project_root("prose-summary"); + let mut store = ProjectStore::init( + &root, + NonEmptyText::new("test project")?, + NonEmptyText::new("local.test")?, + )?; + + let missing_summary = store.add_node(CreateNodeRequest { + class: NodeClass::Research, + frontier_id: None, + title: NonEmptyText::new("research note")?, + summary: None, + tags: None, + payload: NodePayload::with_schema( + store.schema().schema_ref(), + super::json_object(json!({"body": "research body"}))?, + ), + annotations: Vec::new(), + attachments: Vec::new(), + }); + assert!(matches!( + missing_summary, + Err(super::StoreError::ProseSummaryRequired(NodeClass::Research)) + )); + + let missing_body = store.add_node(CreateNodeRequest { + class: NodeClass::Note, + frontier_id: None, + title: NonEmptyText::new("quick note")?, + summary: Some(NonEmptyText::new("quick note summary")?), + tags: Some(BTreeSet::new()), + payload: NodePayload::with_schema(store.schema().schema_ref(), serde_json::Map::new()), + annotations: Vec::new(), + attachments: Vec::new(), + }); + assert!(matches!( + missing_body, + Err(super::StoreError::ProseBodyRequired(NodeClass::Note)) + )); + Ok(()) + } + + #[test] + fn opening_store_backfills_missing_prose_summaries() -> Result<(), super::StoreError> { + let root = temp_project_root("summary-backfill"); + let mut store = ProjectStore::init( + &root, + NonEmptyText::new("test project")?, + NonEmptyText::new("local.test")?, + )?; + let node = store.add_node(CreateNodeRequest { + class: NodeClass::Research, + frontier_id: None, + title: NonEmptyText::new("research note")?, + summary: Some(NonEmptyText::new("temporary summary")?), + tags: None, + payload: NodePayload::with_schema( + store.schema().schema_ref(), + super::json_object(json!({"body": "First paragraph.\n\nSecond paragraph."}))?, + ), + annotations: Vec::new(), + attachments: Vec::new(), + })?; + drop(store); + + let connection = rusqlite::Connection::open( + root.join(super::STORE_DIR_NAME) + .join(super::STATE_DB_NAME) + .as_std_path(), + )?; + let _ = connection.execute( + "UPDATE nodes SET summary = NULL WHERE id = ?1", + rusqlite::params![node.id.to_string()], + )?; + drop(connection); + + let reopened = ProjectStore::open(&root)?; + let loaded = reopened + .get_node(node.id)? + .ok_or(super::StoreError::NodeNotFound(node.id))?; + assert_eq!( + loaded.summary.as_ref().map(NonEmptyText::as_str), + Some("First paragraph.") + ); + Ok(()) + } + + #[test] + fn schema_field_upsert_remove_persists_and_bumps_version() -> Result<(), super::StoreError> { + let root = temp_project_root("schema-upsert-remove"); + let mut store = ProjectStore::init( + &root, + NonEmptyText::new("test project")?, + NonEmptyText::new("local.test")?, + )?; + let initial_version = store.schema().version; + + let field = store.upsert_schema_field(UpsertSchemaFieldRequest { + name: NonEmptyText::new("scenario")?, + node_classes: BTreeSet::from([NodeClass::Change, NodeClass::Analysis]), + presence: FieldPresence::Recommended, + severity: DiagnosticSeverity::Warning, + role: FieldRole::ProjectionGate, + inference_policy: InferencePolicy::ManualOnly, + value_type: Some(FieldValueType::String), + })?; + assert_eq!(field.name.as_str(), "scenario"); + assert_eq!(store.schema().version, initial_version + 1); + assert!( + store + .schema() + .fields + .iter() + .any(|item| item.name.as_str() == "scenario") + ); + drop(store); + + let mut reopened = ProjectStore::open(&root)?; + assert_eq!(reopened.schema().version, initial_version + 1); + assert!( + reopened + .schema() + .fields + .iter() + .any(|item| item.name.as_str() == "scenario") + ); + + let removed = reopened.remove_schema_field(RemoveSchemaFieldRequest { + name: NonEmptyText::new("scenario")?, + node_classes: Some(BTreeSet::from([NodeClass::Change, NodeClass::Analysis])), + })?; + assert_eq!(removed, 1); + assert_eq!(reopened.schema().version, initial_version + 2); + assert!( + !reopened + .schema() + .fields + .iter() + .any(|item| item.name.as_str() == "scenario") + ); + Ok(()) + } + + #[test] + fn metric_queries_surface_canonical_and_payload_numeric_fields() -> Result<(), super::StoreError> + { + let root = temp_project_root("metric-best"); + let mut store = ProjectStore::init( + &root, + NonEmptyText::new("test project")?, + NonEmptyText::new("local.test")?, + )?; + let projection = store.create_frontier(CreateFrontierRequest { + label: NonEmptyText::new("optimization frontier")?, + contract_title: NonEmptyText::new("contract root")?, + contract_summary: None, + contract: FrontierContract { + objective: NonEmptyText::new("improve wall time")?, + evaluation: EvaluationProtocol { + benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]), + primary_metric: MetricSpec { + metric_key: NonEmptyText::new("wall_clock_s")?, + unit: MetricUnit::Seconds, + objective: OptimizationObjective::Minimize, + }, + supporting_metrics: BTreeSet::new(), + }, + promotion_criteria: vec![NonEmptyText::new("strict speedup")?], + }, + initial_checkpoint: Some(super::CheckpointSeed { + summary: NonEmptyText::new("seed")?, + snapshot: checkpoint_snapshot(&root, "aaaaaaaaaaaaaaaa")?, + }), + })?; + let frontier_id = projection.frontier.id; + let base_checkpoint_id = projection + .champion_checkpoint_id + .ok_or_else(|| super::StoreError::MissingChampionCheckpoint { frontier_id })?; + let _ = store.define_metric(DefineMetricRequest { + key: NonEmptyText::new("wall_clock_s")?, + unit: MetricUnit::Seconds, + objective: OptimizationObjective::Minimize, + description: Some(NonEmptyText::new("elapsed wall time")?), + })?; + let _ = store.define_run_dimension(DefineRunDimensionRequest { + key: NonEmptyText::new("scenario")?, + value_type: FieldValueType::String, + description: Some(NonEmptyText::new("workload family")?), + })?; + let _ = store.define_run_dimension(DefineRunDimensionRequest { + key: NonEmptyText::new("duration_s")?, + value_type: FieldValueType::Numeric, + description: Some(NonEmptyText::new("time budget in seconds")?), + })?; + + let first_change = store.add_node(CreateNodeRequest { + class: NodeClass::Change, + frontier_id: Some(frontier_id), + title: NonEmptyText::new("first change")?, + summary: Some(NonEmptyText::new("first change summary")?), + tags: None, + payload: NodePayload::with_schema( + store.schema().schema_ref(), + super::json_object(json!({"body": "first body", "latency_hint": 14.0}))?, + ), + annotations: Vec::new(), + attachments: Vec::new(), + })?; + let second_change = store.add_node(CreateNodeRequest { + class: NodeClass::Change, + frontier_id: Some(frontier_id), + title: NonEmptyText::new("second change")?, + summary: Some(NonEmptyText::new("second change summary")?), + tags: None, + payload: NodePayload::with_schema( + store.schema().schema_ref(), + super::json_object(json!({"body": "second body", "latency_hint": 7.0}))?, + ), + annotations: Vec::new(), + attachments: Vec::new(), + })?; + + let first_receipt = store.close_experiment(experiment_request( + &root, + frontier_id, + base_checkpoint_id, + first_change.id, + "bbbbbbbbbbbbbbbb", + "first run", + 10.0, + run_dimensions("belt_4x5", 20.0)?, + )?)?; + let second_receipt = store.close_experiment(experiment_request( + &root, + frontier_id, + base_checkpoint_id, + second_change.id, + "cccccccccccccccc", + "second run", + 5.0, + run_dimensions("belt_4x5", 60.0)?, + )?)?; + + let keys = store.list_metric_keys()?; + assert!(keys.iter().any(|key| { + key.key.as_str() == "wall_clock_s" && key.source == MetricFieldSource::RunMetric + })); + assert!(keys.iter().any(|key| { + key.key.as_str() == "latency_hint" && key.source == MetricFieldSource::ChangePayload + })); + assert!(keys.iter().any(|key| { + key.key.as_str() == "wall_clock_s" + && key.source == MetricFieldSource::RunMetric + && key.description.as_ref().map(NonEmptyText::as_str) == Some("elapsed wall time") + })); + + let filtered_keys = store.list_metric_keys_filtered(MetricKeyQuery { + frontier_id: Some(frontier_id), + source: Some(MetricFieldSource::RunMetric), + dimensions: run_dimensions("belt_4x5", 60.0)?, + })?; + assert_eq!(filtered_keys.len(), 1); + assert_eq!(filtered_keys[0].experiment_count, 1); + + let dimension_summaries = store.list_run_dimensions()?; + assert!(dimension_summaries.iter().any(|dimension| { + dimension.key.as_str() == "benchmark_suite" + && dimension.value_type == FieldValueType::String + && dimension.observed_run_count == 2 + })); + assert!(dimension_summaries.iter().any(|dimension| { + dimension.key.as_str() == "scenario" + && dimension.description.as_ref().map(NonEmptyText::as_str) + == Some("workload family") + })); + assert!(dimension_summaries.iter().any(|dimension| { + dimension.key.as_str() == "duration_s" + && dimension.value_type == FieldValueType::Numeric + && dimension.distinct_value_count == 2 + })); + + let canonical_best = store.best_metrics(MetricBestQuery { + key: NonEmptyText::new("wall_clock_s")?, + frontier_id: Some(frontier_id), + source: Some(MetricFieldSource::RunMetric), + dimensions: run_dimensions("belt_4x5", 60.0)?, + order: None, + limit: 5, + })?; + assert_eq!(canonical_best.len(), 1); + assert_eq!(canonical_best[0].value, 5.0); + assert_eq!( + canonical_best[0].candidate_checkpoint_id, + second_receipt.checkpoint.id + ); + assert_eq!( + canonical_best[0] + .dimensions + .get(&NonEmptyText::new("duration_s")?), + Some(&RunDimensionValue::Numeric(60.0)) + ); + + let payload_best = store.best_metrics(MetricBestQuery { + key: NonEmptyText::new("latency_hint")?, + frontier_id: Some(frontier_id), + source: Some(MetricFieldSource::ChangePayload), + dimensions: run_dimensions("belt_4x5", 60.0)?, + order: Some(MetricRankOrder::Asc), + limit: 5, + })?; + assert_eq!(payload_best.len(), 1); + assert_eq!(payload_best[0].value, 7.0); + assert_eq!(payload_best[0].change_node_id, second_change.id); + + let missing_order = store.best_metrics(MetricBestQuery { + key: NonEmptyText::new("latency_hint")?, + frontier_id: Some(frontier_id), + source: Some(MetricFieldSource::ChangePayload), + dimensions: BTreeMap::new(), + order: None, + limit: 5, + }); + assert!(matches!( + missing_order, + Err(super::StoreError::MetricOrderRequired { .. }) + )); + assert_eq!( + first_receipt.checkpoint.snapshot.commit_hash.as_str(), + "bbbbbbbbbbbbbbbb" + ); + Ok(()) + } + + #[test] + fn opening_store_backfills_legacy_benchmark_suite_dimensions() -> Result<(), super::StoreError> + { + let root = temp_project_root("metric-plane-backfill"); + let mut store = ProjectStore::init( + &root, + NonEmptyText::new("test project")?, + NonEmptyText::new("local.test")?, + )?; + let projection = store.create_frontier(CreateFrontierRequest { + label: NonEmptyText::new("migration frontier")?, + contract_title: NonEmptyText::new("migration contract")?, + contract_summary: None, + contract: FrontierContract { + objective: NonEmptyText::new("exercise metric migration")?, + evaluation: EvaluationProtocol { + benchmark_suites: BTreeSet::from([NonEmptyText::new("smoke")?]), + primary_metric: MetricSpec { + metric_key: NonEmptyText::new("wall_clock_s")?, + unit: MetricUnit::Seconds, + objective: OptimizationObjective::Minimize, + }, + supporting_metrics: BTreeSet::new(), + }, + promotion_criteria: vec![NonEmptyText::new("keep the metric plane queryable")?], + }, + initial_checkpoint: Some(super::CheckpointSeed { + summary: NonEmptyText::new("seed")?, + snapshot: checkpoint_snapshot(&root, "aaaaaaaaaaaaaaaa")?, + }), + })?; + let frontier_id = projection.frontier.id; + let base_checkpoint_id = projection + .champion_checkpoint_id + .ok_or_else(|| super::StoreError::MissingChampionCheckpoint { frontier_id })?; + let change = store.add_node(CreateNodeRequest { + class: NodeClass::Change, + frontier_id: Some(frontier_id), + title: NonEmptyText::new("candidate change")?, + summary: Some(NonEmptyText::new("candidate change summary")?), + tags: None, + payload: NodePayload::with_schema( + store.schema().schema_ref(), + super::json_object(json!({"latency_hint": 9.0}))?, + ), + annotations: Vec::new(), + attachments: Vec::new(), + })?; + let _ = store.close_experiment(experiment_request( + &root, + frontier_id, + base_checkpoint_id, + change.id, + "bbbbbbbbbbbbbbbb", + "migration run", + 11.0, + BTreeMap::from([( + NonEmptyText::new("benchmark_suite")?, + RunDimensionValue::String(NonEmptyText::new("smoke")?), + )]), + )?)?; + drop(store); + + let connection = rusqlite::Connection::open( + root.join(super::STORE_DIR_NAME) + .join(super::STATE_DB_NAME) + .as_std_path(), + )?; + let _ = connection.execute("DELETE FROM run_dimensions", [])?; + drop(connection); + + let reopened = ProjectStore::open(&root)?; + let dimensions = reopened.list_run_dimensions()?; + assert!(dimensions.iter().any(|dimension| { + dimension.key.as_str() == "benchmark_suite" && dimension.observed_run_count == 1 + })); + + let best = reopened.best_metrics(MetricBestQuery { + key: NonEmptyText::new("wall_clock_s")?, + frontier_id: Some(frontier_id), + source: Some(MetricFieldSource::RunMetric), + dimensions: BTreeMap::from([( + NonEmptyText::new("benchmark_suite")?, + RunDimensionValue::String(NonEmptyText::new("smoke")?), + )]), + order: None, + limit: 5, + })?; + assert_eq!(best.len(), 1); + assert_eq!(best[0].value, 11.0); + Ok(()) + } + + fn checkpoint_snapshot( + root: &camino::Utf8Path, + commit: &str, + ) -> Result { + Ok(CheckpointSnapshotRef { + repo_root: root.to_path_buf(), + worktree_root: root.to_path_buf(), + worktree_name: Some(NonEmptyText::new("main")?), + commit_hash: GitCommitHash::new(commit)?, + }) + } + + fn experiment_request( + root: &camino::Utf8Path, + frontier_id: fidget_spinner_core::FrontierId, + base_checkpoint_id: fidget_spinner_core::CheckpointId, + change_node_id: fidget_spinner_core::NodeId, + candidate_commit: &str, + run_title: &str, + wall_clock_s: f64, + dimensions: BTreeMap, + ) -> Result { + Ok(CloseExperimentRequest { + frontier_id, + base_checkpoint_id, + change_node_id, + candidate_summary: NonEmptyText::new(format!("candidate {candidate_commit}"))?, + candidate_snapshot: checkpoint_snapshot(root, candidate_commit)?, + run_title: NonEmptyText::new(run_title)?, + run_summary: Some(NonEmptyText::new("run summary")?), + backend: fidget_spinner_core::ExecutionBackend::WorktreeProcess, + dimensions, + command: CommandRecipe::new( + root.to_path_buf(), + vec![NonEmptyText::new("true")?], + BTreeMap::new(), + )?, + code_snapshot: None, + primary_metric: MetricValue { + key: NonEmptyText::new("wall_clock_s")?, + value: wall_clock_s, + }, + supporting_metrics: Vec::new(), + note: FrontierNote { + summary: NonEmptyText::new("note summary")?, + next_hypotheses: Vec::new(), + }, + verdict: FrontierVerdict::KeepOnFrontier, + decision_title: NonEmptyText::new("decision")?, + decision_rationale: NonEmptyText::new("decision rationale")?, + analysis_node_id: None, + }) + } + + fn run_dimensions( + scenario: &str, + duration_s: f64, + ) -> Result, super::StoreError> { + Ok(BTreeMap::from([ + ( + NonEmptyText::new("benchmark_suite")?, + RunDimensionValue::String(NonEmptyText::new("smoke")?), + ), + ( + NonEmptyText::new("scenario")?, + RunDimensionValue::String(NonEmptyText::new(scenario)?), + ), + ( + NonEmptyText::new("duration_s")?, + RunDimensionValue::Numeric(duration_s), + ), + ])) + } } -- cgit v1.2.3