swarm repositories / source
aboutsummaryrefslogtreecommitdiff
path: root/crates/fidget-spinner-store-sqlite
diff options
context:
space:
mode:
Diffstat (limited to 'crates/fidget-spinner-store-sqlite')
-rw-r--r-- crates/fidget-spinner-store-sqlite/src/lib.rs | 515
1 file changed, 505 insertions(+), 10 deletions(-)
diff --git a/crates/fidget-spinner-store-sqlite/src/lib.rs b/crates/fidget-spinner-store-sqlite/src/lib.rs
index 253929e..8a80bbc 100644
--- a/crates/fidget-spinner-store-sqlite/src/lib.rs
+++ b/crates/fidget-spinner-store-sqlite/src/lib.rs
@@ -105,6 +105,8 @@ pub enum StoreError {
MetricOrderRequired { key: String },
#[error("dimension filter references unknown run dimension `{0}`")]
UnknownDimensionFilter(String),
+ #[error("metric scope `{scope}` requires a frontier selector")]
+ MetricScopeRequiresFrontier { scope: &'static str },
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
@@ -141,6 +143,7 @@ pub struct ProjectStatus {
#[serde(rename_all = "snake_case")]
pub enum MetricScope {
Live,
+ Scoreboard,
Visible,
All,
}
@@ -200,12 +203,14 @@ pub enum TextPatch<T> {
}
#[derive(Clone, Debug)]
-pub struct UpdateFrontierBriefRequest {
+pub struct UpdateFrontierRequest {
pub frontier: String,
pub expected_revision: Option<u64>,
+ pub objective: Option<NonEmptyText>,
pub situation: Option<TextPatch<NonEmptyText>>,
pub roadmap: Option<Vec<FrontierRoadmapItemDraft>>,
pub unknowns: Option<Vec<NonEmptyText>>,
+ pub scoreboard_metric_keys: Option<Vec<NonEmptyText>>,
}
#[derive(Clone, Debug)]
@@ -458,6 +463,36 @@ pub struct MetricBestQuery {
pub order: Option<MetricRankOrder>,
}
/// Parameters for `ProjectStore::experiment_nearest`.
///
/// All anchors are optional; the store derives missing context (frontier,
/// target dimensions, metric) from whichever anchors are provided.
#[derive(Clone, Debug)]
pub struct ExperimentNearestQuery {
    /// Frontier selector; when `None`, derived from the anchor experiment or
    /// hypothesis.
    pub frontier: Option<String>,
    /// Hypothesis selector used as an anchor for structural closeness.
    pub hypothesis: Option<String>,
    /// Experiment selector used as an anchor; its outcome dimensions become
    /// the target dimensions when `dimensions` is empty.
    pub experiment: Option<String>,
    /// Metric key to rank by; when `None`, falls back to the frontier's first
    /// scoreboard metric key.
    pub metric: Option<NonEmptyText>,
    /// Explicit target dimensions to match candidates against.
    pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
    /// Tags that every candidate (experiment plus owning hypothesis) must carry.
    pub tags: BTreeSet<TagName>,
    /// Overrides the rank order otherwise derived from the metric's objective.
    pub order: Option<MetricRankOrder>,
}
+
/// A single nearest-comparator match returned by `experiment_nearest`.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct ExperimentNearestHit {
    /// The matched (closed) experiment.
    pub experiment: ExperimentSummary,
    /// The hypothesis that owns the matched experiment.
    pub hypothesis: HypothesisSummary,
    /// Outcome dimensions of the matched experiment.
    pub dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
    /// Human-readable explanations of why this candidate was selected.
    pub reasons: Vec<NonEmptyText>,
    /// The candidate's value for the selected metric, when it reported one.
    pub metric_value: Option<MetricObservationSummary>,
}
+
/// Result of `ProjectStore::experiment_nearest`: the nearest closed
/// comparator per outcome-verdict bucket, plus an overall "champion".
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct ExperimentNearestResult {
    /// Summary of the metric used for ranking, when one was selected.
    pub metric: Option<MetricKeySummary>,
    /// The dimensions candidates were matched against.
    pub target_dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
    /// Nearest candidate whose outcome verdict was `Accepted`.
    pub accepted: Option<ExperimentNearestHit>,
    /// Nearest candidate whose outcome verdict was `Kept`.
    pub kept: Option<ExperimentNearestHit>,
    /// Nearest candidate whose outcome verdict was `Rejected`.
    pub rejected: Option<ExperimentNearestHit>,
    /// Best accepted/kept candidate by metric value; `None` when no rank
    /// order could be derived (e.g. a `Target` objective with no explicit order).
    pub champion: Option<ExperimentNearestHit>,
}
+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct MetricBestEntry {
pub experiment: ExperimentSummary,
@@ -486,6 +521,7 @@ pub struct FrontierOpenProjection {
pub frontier: FrontierRecord,
pub active_tags: Vec<TagName>,
pub active_metric_keys: Vec<MetricKeySummary>,
+ pub scoreboard_metric_keys: Vec<MetricKeySummary>,
pub active_hypotheses: Vec<HypothesisCurrentState>,
pub open_experiments: Vec<ExperimentSummary>,
}
@@ -793,9 +829,9 @@ impl ProjectStore {
self.resolve_frontier(selector)
}
- pub fn update_frontier_brief(
+ pub fn update_frontier(
&mut self,
- request: UpdateFrontierBriefRequest,
+ request: UpdateFrontierRequest,
) -> Result<FrontierRecord, StoreError> {
let frontier = self.resolve_frontier(&request.frontier)?;
enforce_revision(
@@ -805,6 +841,17 @@ impl ProjectStore {
frontier.revision,
)?;
let now = OffsetDateTime::now_utc();
+ if let Some(metric_keys) = request.scoreboard_metric_keys.as_ref() {
+ for metric_key in metric_keys {
+ let _ = self
+ .metric_definition(metric_key)?
+ .ok_or_else(|| StoreError::UnknownMetricDefinition(metric_key.clone()))?;
+ }
+ }
+ let brief_changed = request.situation.is_some()
+ || request.roadmap.is_some()
+ || request.unknowns.is_some()
+ || request.scoreboard_metric_keys.is_some();
let brief = FrontierBrief {
situation: apply_optional_text_patch(
request.situation,
@@ -824,23 +871,35 @@ impl ProjectStore {
None => frontier.brief.roadmap.clone(),
},
unknowns: request.unknowns.unwrap_or(frontier.brief.unknowns.clone()),
- revision: frontier.brief.revision.saturating_add(1),
- updated_at: Some(now),
+ scoreboard_metric_keys: request
+ .scoreboard_metric_keys
+ .unwrap_or(frontier.brief.scoreboard_metric_keys.clone()),
+ revision: if brief_changed {
+ frontier.brief.revision.saturating_add(1)
+ } else {
+ frontier.brief.revision
+ },
+ updated_at: if brief_changed {
+ Some(now)
+ } else {
+ frontier.brief.updated_at
+ },
};
let updated = FrontierRecord {
+ objective: request.objective.unwrap_or(frontier.objective.clone()),
brief,
revision: frontier.revision.saturating_add(1),
updated_at: now,
..frontier
};
let transaction = self.connection.transaction()?;
- update_frontier(&transaction, &updated)?;
+ update_frontier_row(&transaction, &updated)?;
record_event(
&transaction,
"frontier",
&updated.id.to_string(),
updated.revision,
- "brief_updated",
+ "updated",
&updated,
)?;
transaction.commit()?;
@@ -1389,10 +1448,12 @@ impl ProjectStore {
let active_tags = derive_active_tags(&active_hypotheses, &open_experiments);
let active_metric_keys =
self.live_metric_keys(frontier.id, &active_hypotheses, &open_experiments)?;
+ let scoreboard_metric_keys = self.frontier_scoreboard_metric_keys(&frontier)?;
Ok(FrontierOpenProjection {
frontier,
active_tags,
active_metric_keys,
+ scoreboard_metric_keys,
active_hypotheses,
open_experiments,
})
@@ -1451,11 +1512,25 @@ impl ProjectStore {
}
pub fn metric_keys(&self, query: MetricKeysQuery) -> Result<Vec<MetricKeySummary>, StoreError> {
- let frontier_id = query
+ let frontier = query
.frontier
.as_deref()
- .map(|selector| self.resolve_frontier(selector).map(|frontier| frontier.id))
+ .map(|selector| self.resolve_frontier(selector))
.transpose()?;
+ let frontier_id = frontier.as_ref().map(|frontier| frontier.id);
+ if query.scope == MetricScope::Scoreboard && frontier.is_none() {
+ return Err(StoreError::MetricScopeRequiresFrontier {
+ scope: "scoreboard",
+ });
+ }
+ if query.scope == MetricScope::Scoreboard {
+ return match frontier.as_ref() {
+ Some(frontier) => self.frontier_scoreboard_metric_keys(frontier),
+ None => Err(StoreError::MetricScopeRequiresFrontier {
+ scope: "scoreboard",
+ }),
+ };
+ }
let definitions = self.list_metric_definitions()?;
let live_keys = frontier_id
.map(|frontier_id| self.live_metric_key_names(frontier_id))
@@ -1465,6 +1540,7 @@ impl ProjectStore {
.into_iter()
.filter(|definition| match query.scope {
MetricScope::Live => live_keys.contains(definition.key.as_str()),
+ MetricScope::Scoreboard => unreachable!("handled above"),
MetricScope::Visible => definition.visibility.is_default_visible(),
MetricScope::All => true,
})
@@ -1548,6 +1624,187 @@ impl ProjectStore {
Ok(apply_limit(entries, query.limit))
}
    /// Finds the closed experiments "nearest" to an anchor (experiment,
    /// hypothesis, and/or explicit dimensions): one hit per outcome verdict
    /// plus a metric-ranked champion.
    ///
    /// Resolution order for missing context:
    /// - frontier: explicit selector, else the anchor experiment's frontier,
    ///   else the anchor hypothesis's frontier;
    /// - target dimensions: explicit `query.dimensions`, else the anchor
    ///   experiment's outcome dimensions, else empty;
    /// - metric: explicit `query.metric`, else the frontier's first
    ///   scoreboard metric key.
    ///
    /// # Errors
    /// Fails when a selector does not resolve or a referenced metric key has
    /// no definition.
    pub fn experiment_nearest(
        &self,
        query: ExperimentNearestQuery,
    ) -> Result<ExperimentNearestResult, StoreError> {
        let anchor_experiment = query
            .experiment
            .as_deref()
            .map(|selector| self.resolve_experiment(selector))
            .transpose()?;
        let anchor_hypothesis = query
            .hypothesis
            .as_deref()
            .map(|selector| self.resolve_hypothesis(selector))
            .transpose()?;
        // NOTE(review): `.or(...)` is eager, so the hypothesis frontier is
        // resolved (and its `?` can propagate an error) even when the anchor
        // experiment already supplied a frontier — confirm this is acceptable.
        let frontier = match query.frontier.as_deref() {
            Some(selector) => Some(self.resolve_frontier(selector)?),
            None => anchor_experiment
                .as_ref()
                .map(|experiment| self.resolve_frontier(&experiment.frontier_id.to_string()))
                .transpose()?
                .or(anchor_hypothesis
                    .as_ref()
                    .map(|hypothesis| self.resolve_frontier(&hypothesis.frontier_id.to_string()))
                    .transpose()?),
        };
        let frontier_id = frontier.as_ref().map(|frontier| frontier.id);
        // Prefer an explicitly-selected hypothesis; otherwise fall back to
        // the anchor experiment's owning hypothesis.
        let anchor_hypothesis_id = anchor_hypothesis
            .as_ref()
            .map(|hypothesis| hypothesis.id)
            .or_else(|| {
                anchor_experiment
                    .as_ref()
                    .map(|experiment| experiment.hypothesis_id)
            });
        let target_dimensions = if query.dimensions.is_empty() {
            anchor_experiment
                .as_ref()
                .and_then(|experiment| {
                    experiment
                        .outcome
                        .as_ref()
                        .map(|outcome| outcome.dimensions.clone())
                })
                .unwrap_or_default()
        } else {
            query.dimensions
        };
        let metric_definition = match query.metric.as_ref() {
            Some(key) => Some(
                self.metric_definition(key)?
                    .ok_or_else(|| StoreError::UnknownMetricDefinition(key.clone()))?,
            ),
            None => frontier
                .as_ref()
                .and_then(|frontier| frontier.brief.scoreboard_metric_keys.first())
                .map(|key| {
                    self.metric_definition(key)?
                        .ok_or_else(|| StoreError::UnknownMetricDefinition(key.clone()))
                })
                .transpose()?,
        };
        // No champion order is derived for `Target` objectives unless the
        // caller supplied an explicit order.
        let champion_order = metric_definition.as_ref().and_then(|definition| {
            query.order.or(match definition.objective {
                OptimizationObjective::Minimize => Some(MetricRankOrder::Asc),
                OptimizationObjective::Maximize => Some(MetricRankOrder::Desc),
                OptimizationObjective::Target => None,
            })
        });
        let influence_neighborhood =
            self.influence_neighborhood(anchor_experiment.as_ref(), anchor_hypothesis_id)?;
        let candidates = self
            .load_experiment_records(frontier_id, None, false)?
            .into_iter()
            // Only closed experiments can serve as comparators.
            .filter(|record| record.status == ExperimentStatus::Closed)
            // Never compare the anchor experiment with itself.
            .filter(|record| {
                anchor_experiment
                    .as_ref()
                    .is_none_or(|anchor| record.id != anchor.id)
            })
            // NOTE(review): this restricts candidates to the anchor
            // hypothesis only when a hypothesis selector was given
            // explicitly; a hypothesis id derived from the anchor experiment
            // does NOT filter — confirm this asymmetry is intended.
            .filter(|record| {
                anchor_hypothesis_id.is_none_or(|hypothesis_id| {
                    anchor_hypothesis.is_none() || record.hypothesis_id == hypothesis_id
                })
            })
            .map(|record| {
                // A closed record without an outcome cannot be ranked; drop it.
                let Some(outcome) = record.outcome.clone() else {
                    return Ok(None);
                };
                let hypothesis_record = self.hypothesis_by_id(record.hypothesis_id)?;
                // Every queried tag must appear in the union of the
                // experiment's and owning hypothesis's tags.
                if !query.tags.is_empty() {
                    let candidate_tags = record
                        .tags
                        .iter()
                        .cloned()
                        .chain(hypothesis_record.tags.iter().cloned())
                        .collect::<BTreeSet<_>>();
                    if !query.tags.iter().all(|tag| candidate_tags.contains(tag)) {
                        return Ok(None);
                    }
                }
                let structural_rank = comparator_rank(
                    &target_dimensions,
                    &outcome.dimensions,
                    anchor_hypothesis_id,
                    hypothesis_record.id,
                    record.id,
                    &influence_neighborhood,
                );
                // Look up the candidate's value for the selected metric, if
                // its outcome reported one.
                let metric_value = metric_definition.as_ref().and_then(|definition| {
                    all_metrics(&outcome)
                        .into_iter()
                        .find(|metric| metric.key == definition.key)
                        .map(|metric| MetricObservationSummary {
                            key: metric.key,
                            value: metric.value,
                            unit: definition.unit.clone(),
                            objective: definition.objective,
                        })
                });
                Ok(Some(NearestComparatorCandidate {
                    closed_at: outcome.closed_at,
                    verdict: outcome.verdict,
                    experiment: self.experiment_summary_from_record(record)?,
                    hypothesis: self.hypothesis_summary_from_record(hypothesis_record)?,
                    dimensions: outcome.dimensions,
                    structural_rank,
                    metric_value,
                }))
            })
            .collect::<Result<Vec<_>, StoreError>>()?
            .into_iter()
            .flatten()
            .collect::<Vec<_>>();
        let metric = if let Some(definition) = metric_definition.as_ref() {
            Some(MetricKeySummary {
                reference_count: self.metric_reference_count(frontier_id, &definition.key)?,
                key: definition.key.clone(),
                unit: definition.unit.clone(),
                objective: definition.objective,
                visibility: definition.visibility,
                description: definition.description.clone(),
            })
        } else {
            None
        };
        Ok(ExperimentNearestResult {
            metric,
            target_dimensions,
            accepted: pick_nearest_bucket(
                &candidates,
                FrontierVerdict::Accepted,
                metric_definition
                    .as_ref()
                    .map(|definition| definition.key.as_str()),
            ),
            kept: pick_nearest_bucket(
                &candidates,
                FrontierVerdict::Kept,
                metric_definition
                    .as_ref()
                    .map(|definition| definition.key.as_str()),
            ),
            rejected: pick_nearest_bucket(
                &candidates,
                FrontierVerdict::Rejected,
                metric_definition
                    .as_ref()
                    .map(|definition| definition.key.as_str()),
            ),
            champion: champion_order.and_then(|order| {
                pick_champion_candidate(
                    &candidates,
                    order,
                    metric_definition
                        .as_ref()
                        .map(|definition| definition.key.as_str()),
                )
            }),
        })
    }
+
pub fn frontier_history(&self, selector: &str) -> Result<Vec<EntityHistoryEntry>, StoreError> {
let frontier = self.resolve_frontier(selector)?;
self.entity_history("frontier", &frontier.id.to_string())
@@ -1784,6 +2041,41 @@ impl ProjectStore {
}
}
+ fn influence_neighborhood(
+ &self,
+ anchor_experiment: Option<&ExperimentRecord>,
+ anchor_hypothesis_id: Option<HypothesisId>,
+ ) -> Result<Vec<VertexRef>, StoreError> {
+ let mut neighborhood = Vec::new();
+ if let Some(hypothesis_id) = anchor_hypothesis_id {
+ let anchor = VertexRef::Hypothesis(hypothesis_id);
+ neighborhood.extend(
+ self.load_vertex_parents(anchor)?
+ .into_iter()
+ .map(|summary| summary.vertex),
+ );
+ neighborhood.extend(
+ self.load_vertex_children(anchor)?
+ .into_iter()
+ .map(|summary| summary.vertex),
+ );
+ }
+ if let Some(experiment) = anchor_experiment {
+ let anchor = VertexRef::Experiment(experiment.id);
+ neighborhood.extend(
+ self.load_vertex_parents(anchor)?
+ .into_iter()
+ .map(|summary| summary.vertex),
+ );
+ neighborhood.extend(
+ self.load_vertex_children(anchor)?
+ .into_iter()
+ .map(|summary| summary.vertex),
+ );
+ }
+ Ok(neighborhood)
+ }
+
fn load_hypothesis_records(
&self,
frontier_id: Option<FrontierId>,
@@ -2178,6 +2470,31 @@ impl ProjectStore {
Ok(keys)
}
+ fn frontier_scoreboard_metric_keys(
+ &self,
+ frontier: &FrontierRecord,
+ ) -> Result<Vec<MetricKeySummary>, StoreError> {
+ frontier
+ .brief
+ .scoreboard_metric_keys
+ .iter()
+ .map(|key| {
+ let definition = self
+ .metric_definition(key)?
+ .ok_or_else(|| StoreError::UnknownMetricDefinition(key.clone()))?;
+ Ok(MetricKeySummary {
+ reference_count: self
+ .metric_reference_count(Some(frontier.id), &definition.key)?,
+ key: definition.key,
+ unit: definition.unit,
+ objective: definition.objective,
+ visibility: definition.visibility,
+ description: definition.description,
+ })
+ })
+ .collect()
+ }
+
fn live_metric_key_names(
&self,
frontier_id: FrontierId,
@@ -2592,7 +2909,7 @@ fn insert_frontier(
Ok(())
}
-fn update_frontier(
+fn update_frontier_row(
transaction: &Transaction<'_>,
frontier: &FrontierRecord,
) -> Result<(), StoreError> {
@@ -3248,6 +3565,184 @@ fn all_metrics(outcome: &ExperimentOutcome) -> Vec<MetricValue> {
.collect()
}
/// Structural-closeness features for a comparator candidate; compared
/// lexicographically in this field order by `compare_structural_rank`.
#[derive(Clone)]
struct ComparatorRank {
    // Candidate dimensions equal the target dimensions exactly.
    exact_dimension_match: bool,
    // Matches among the core slice keys
    // (instance/profile/family/duration_s/budget_s).
    core_dimension_matches: usize,
    // Total number of target dimensions the candidate matched.
    matched_dimension_count: usize,
    // Candidate belongs to the anchor hypothesis.
    same_hypothesis: bool,
    // Candidate's hypothesis or experiment vertex appears in the anchor's
    // influence neighborhood.
    neighborhood_match: bool,
}
+
/// A pre-filtered closed experiment, carrying everything needed to rank it
/// and turn it into an `ExperimentNearestHit`.
#[derive(Clone)]
struct NearestComparatorCandidate {
    experiment: ExperimentSummary,
    hypothesis: HypothesisSummary,
    // Outcome dimensions of the candidate run.
    dimensions: BTreeMap<NonEmptyText, RunDimensionValue>,
    // Verdict recorded on the candidate's outcome.
    verdict: FrontierVerdict,
    // When the candidate closed; used as the final tie-break when ranking.
    closed_at: OffsetDateTime,
    structural_rank: ComparatorRank,
    // Value of the selected metric, when the outcome reported one.
    metric_value: Option<MetricObservationSummary>,
}
+
+fn comparator_rank(
+ target_dimensions: &BTreeMap<NonEmptyText, RunDimensionValue>,
+ candidate_dimensions: &BTreeMap<NonEmptyText, RunDimensionValue>,
+ anchor_hypothesis_id: Option<HypothesisId>,
+ candidate_hypothesis_id: HypothesisId,
+ candidate_experiment_id: ExperimentId,
+ influence_neighborhood: &[VertexRef],
+) -> ComparatorRank {
+ let matched_dimension_keys = target_dimensions
+ .iter()
+ .filter(|(key, value)| {
+ candidate_dimensions
+ .get(*key)
+ .is_some_and(|candidate| candidate == *value)
+ })
+ .map(|(key, _)| key.as_str())
+ .collect::<Vec<_>>();
+ let core_dimension_matches = matched_dimension_keys
+ .iter()
+ .filter(|key| {
+ matches!(
+ **key,
+ "instance" | "profile" | "family" | "duration_s" | "budget_s"
+ )
+ })
+ .count();
+ let exact_dimension_match = !target_dimensions.is_empty()
+ && target_dimensions.len() == candidate_dimensions.len()
+ && dimension_subset_matches(target_dimensions, candidate_dimensions);
+ let same_hypothesis = anchor_hypothesis_id == Some(candidate_hypothesis_id);
+ let neighborhood_match = influence_neighborhood.iter().any(|vertex| {
+ *vertex == VertexRef::Hypothesis(candidate_hypothesis_id)
+ || *vertex == VertexRef::Experiment(candidate_experiment_id)
+ });
+ ComparatorRank {
+ exact_dimension_match,
+ core_dimension_matches,
+ matched_dimension_count: matched_dimension_keys.len(),
+ same_hypothesis,
+ neighborhood_match,
+ }
+}
+
+fn compare_structural_rank(left: &ComparatorRank, right: &ComparatorRank) -> std::cmp::Ordering {
+ (
+ left.exact_dimension_match,
+ left.core_dimension_matches,
+ left.matched_dimension_count,
+ left.same_hypothesis,
+ left.neighborhood_match,
+ )
+ .cmp(&(
+ right.exact_dimension_match,
+ right.core_dimension_matches,
+ right.matched_dimension_count,
+ right.same_hypothesis,
+ right.neighborhood_match,
+ ))
+}
+
/// Ordering used when selecting the "preferred" of two metric values:
/// delegates to `compare_metric_values` and reverses the result, so the
/// value ranked lower there sorts as greater here (suitable for
/// `max_by`-style selection).
/// NOTE(review): exact semantics depend on `compare_metric_values`, which is
/// not visible in this chunk — confirm the reversal matches the intended
/// asc/desc meaning of `MetricRankOrder`.
fn preferred_metric_ordering(left: f64, right: f64, order: MetricRankOrder) -> std::cmp::Ordering {
    compare_metric_values(left, right, order).reverse()
}
+
+fn pick_nearest_bucket(
+ candidates: &[NearestComparatorCandidate],
+ verdict: FrontierVerdict,
+ metric_key: Option<&str>,
+) -> Option<ExperimentNearestHit> {
+ candidates
+ .iter()
+ .filter(|candidate| candidate.verdict == verdict)
+ .max_by(|left, right| {
+ compare_structural_rank(&left.structural_rank, &right.structural_rank)
+ .then_with(|| left.closed_at.cmp(&right.closed_at))
+ })
+ .map(|candidate| nearest_hit(candidate, metric_key, false))
+}
+
/// Picks the "champion" comparator: the best candidate among those whose
/// verdict is `Accepted` or `Kept` and which carry a value for the selected
/// metric.
///
/// Ranking: structural closeness first, then the metric value under `order`
/// (via `preferred_metric_ordering`), then latest `closed_at`.
/// NOTE(review): structural rank dominates the metric comparison, so this is
/// the metrically-best candidate *among the structurally closest*, not the
/// globally best metric value — confirm that is intended.
fn pick_champion_candidate(
    candidates: &[NearestComparatorCandidate],
    order: MetricRankOrder,
    metric_key: Option<&str>,
) -> Option<ExperimentNearestHit> {
    candidates
        .iter()
        .filter(|candidate| {
            matches!(
                candidate.verdict,
                FrontierVerdict::Accepted | FrontierVerdict::Kept
            ) && candidate.metric_value.is_some()
        })
        .max_by(|left, right| {
            compare_structural_rank(&left.structural_rank, &right.structural_rank)
                .then_with(|| match (&left.metric_value, &right.metric_value) {
                    (Some(left_metric), Some(right_metric)) => {
                        preferred_metric_ordering(left_metric.value, right_metric.value, order)
                    }
                    // The filter above guarantees `metric_value.is_some()`,
                    // so these arms are defensive and should be unreachable.
                    (Some(_), None) => std::cmp::Ordering::Greater,
                    (None, Some(_)) => std::cmp::Ordering::Less,
                    (None, None) => std::cmp::Ordering::Equal,
                })
                .then_with(|| left.closed_at.cmp(&right.closed_at))
        })
        .map(|candidate| nearest_hit(candidate, metric_key, true))
}
+
+fn nearest_hit(
+ candidate: &NearestComparatorCandidate,
+ metric_key: Option<&str>,
+ is_champion: bool,
+) -> ExperimentNearestHit {
+ let mut reasons = Vec::new();
+ if candidate.structural_rank.exact_dimension_match {
+ reasons.push(must_non_empty_reason("exact dimension match"));
+ } else if candidate.structural_rank.core_dimension_matches > 0 {
+ reasons.push(must_non_empty_reason(format!(
+ "matched {} core slice keys",
+ candidate.structural_rank.core_dimension_matches
+ )));
+ } else if candidate.structural_rank.matched_dimension_count > 0 {
+ reasons.push(must_non_empty_reason(format!(
+ "matched {} requested dimensions",
+ candidate.structural_rank.matched_dimension_count
+ )));
+ }
+ if candidate.structural_rank.same_hypothesis {
+ reasons.push(must_non_empty_reason("same owning hypothesis"));
+ } else if candidate.structural_rank.neighborhood_match {
+ reasons.push(must_non_empty_reason("same influence neighborhood"));
+ }
+ if is_champion {
+ reasons.push(must_non_empty_reason(format!(
+ "best closed non-rejected result{}",
+ metric_key.map_or_else(String::new, |key| format!(" for {key}"))
+ )));
+ } else {
+ reasons.push(must_non_empty_reason(format!(
+ "nearest {} comparator",
+ candidate.verdict.as_str()
+ )));
+ }
+ ExperimentNearestHit {
+ experiment: candidate.experiment.clone(),
+ hypothesis: candidate.hypothesis.clone(),
+ dimensions: candidate.dimensions.clone(),
+ reasons,
+ metric_value: candidate.metric_value.clone(),
+ }
+}
+
+fn must_non_empty_reason(text: impl Into<String>) -> NonEmptyText {
+ match NonEmptyText::new(text) {
+ Ok(text) => text,
+ Err(_) => unreachable!("comparator reasons must never be empty"),
+ }
+}
+
/// Maps a Rust `bool` to the integer encoding stored in SQLite (1 / 0).
fn bool_to_sql(value: bool) -> i64 {
    match value {
        true => 1,
        false => 0,
    }
}