diff options
Diffstat (limited to 'crates/fidget-spinner-store-sqlite/src')
| -rw-r--r-- | crates/fidget-spinner-store-sqlite/src/lib.rs | 534 |
1 file changed, 426 insertions, 108 deletions
diff --git a/crates/fidget-spinner-store-sqlite/src/lib.rs b/crates/fidget-spinner-store-sqlite/src/lib.rs index bcdbc01..1862590 100644 --- a/crates/fidget-spinner-store-sqlite/src/lib.rs +++ b/crates/fidget-spinner-store-sqlite/src/lib.rs @@ -13,7 +13,7 @@ use fidget_spinner_core::{ FrontierContract, FrontierNote, FrontierProjection, FrontierRecord, FrontierStatus, FrontierVerdict, GitCommitHash, InferencePolicy, JsonObject, MetricDefinition, MetricSpec, MetricUnit, MetricValue, NodeAnnotation, NodeClass, NodeDiagnostics, NodePayload, NonEmptyText, - OptimizationObjective, ProjectFieldSpec, ProjectSchema, RunDimensionDefinition, + OpenExperiment, OptimizationObjective, ProjectFieldSpec, ProjectSchema, RunDimensionDefinition, RunDimensionValue, RunRecord, RunStatus, TagName, TagRecord, }; use rusqlite::types::Value as SqlValue; @@ -29,6 +29,7 @@ pub const STORE_DIR_NAME: &str = ".fidget_spinner"; pub const STATE_DB_NAME: &str = "state.sqlite"; pub const PROJECT_CONFIG_NAME: &str = "project.json"; pub const PROJECT_SCHEMA_NAME: &str = "schema.json"; +pub const CURRENT_STORE_FORMAT_VERSION: u32 = 2; #[derive(Debug, Error)] pub enum StoreError { @@ -59,8 +60,14 @@ pub enum StoreError { FrontierNotFound(fidget_spinner_core::FrontierId), #[error("checkpoint {0} was not found")] CheckpointNotFound(fidget_spinner_core::CheckpointId), - #[error("node {0} is not a change node")] - NodeNotChange(fidget_spinner_core::NodeId), + #[error("experiment {0} was not found")] + ExperimentNotFound(fidget_spinner_core::ExperimentId), + #[error("node {0} is not a hypothesis node")] + NodeNotHypothesis(fidget_spinner_core::NodeId), + #[error( + "project store format {observed} is incompatible with this binary (expected {expected}); reinitialize the store" + )] + IncompatibleStoreFormatVersion { observed: u32, expected: u32 }, #[error("frontier {frontier_id} has no champion checkpoint")] MissingChampionCheckpoint { frontier_id: fidget_spinner_core::FrontierId, @@ -130,7 +137,7 @@ impl 
ProjectConfig { Self { display_name, created_at: OffsetDateTime::now_utc(), - store_format_version: 1, + store_format_version: CURRENT_STORE_FORMAT_VERSION, } } } @@ -219,7 +226,7 @@ pub struct NodeSummary { #[serde(rename_all = "snake_case")] pub enum MetricFieldSource { RunMetric, - ChangePayload, + HypothesisPayload, RunPayload, AnalysisPayload, DecisionPayload, @@ -230,7 +237,7 @@ impl MetricFieldSource { pub const fn as_str(self) -> &'static str { match self { Self::RunMetric => "run_metric", - Self::ChangePayload => "change_payload", + Self::HypothesisPayload => "hypothesis_payload", Self::RunPayload => "run_payload", Self::AnalysisPayload => "analysis_payload", Self::DecisionPayload => "decision_payload", @@ -240,13 +247,11 @@ impl MetricFieldSource { #[must_use] pub const fn from_payload_class(class: NodeClass) -> Option<Self> { match class { - NodeClass::Change => Some(Self::ChangePayload), + NodeClass::Hypothesis => Some(Self::HypothesisPayload), NodeClass::Run => Some(Self::RunPayload), NodeClass::Analysis => Some(Self::AnalysisPayload), NodeClass::Decision => Some(Self::DecisionPayload), - NodeClass::Contract | NodeClass::Research | NodeClass::Enabling | NodeClass::Note => { - None - } + NodeClass::Contract | NodeClass::Source | NodeClass::Note => None, } } } @@ -297,8 +302,8 @@ pub struct MetricBestEntry { pub order: MetricRankOrder, pub experiment_id: fidget_spinner_core::ExperimentId, pub frontier_id: fidget_spinner_core::FrontierId, - pub change_node_id: fidget_spinner_core::NodeId, - pub change_title: NonEmptyText, + pub hypothesis_node_id: fidget_spinner_core::NodeId, + pub hypothesis_title: NonEmptyText, pub run_id: fidget_spinner_core::RunId, pub verdict: FrontierVerdict, pub candidate_checkpoint_id: fidget_spinner_core::CheckpointId, @@ -380,10 +385,35 @@ pub struct CheckpointSeed { } #[derive(Clone, Debug)] -pub struct CloseExperimentRequest { +pub struct OpenExperimentRequest { pub frontier_id: fidget_spinner_core::FrontierId, pub 
base_checkpoint_id: fidget_spinner_core::CheckpointId, - pub change_node_id: fidget_spinner_core::NodeId, + pub hypothesis_node_id: fidget_spinner_core::NodeId, + pub title: NonEmptyText, + pub summary: Option<NonEmptyText>, +} + +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct OpenExperimentSummary { + pub id: fidget_spinner_core::ExperimentId, + pub frontier_id: fidget_spinner_core::FrontierId, + pub base_checkpoint_id: fidget_spinner_core::CheckpointId, + pub hypothesis_node_id: fidget_spinner_core::NodeId, + pub title: NonEmptyText, + pub summary: Option<NonEmptyText>, + pub created_at: OffsetDateTime, +} + +#[derive(Clone, Debug)] +pub struct ExperimentAnalysisDraft { + pub title: NonEmptyText, + pub summary: NonEmptyText, + pub body: NonEmptyText, +} + +#[derive(Clone, Debug)] +pub struct CloseExperimentRequest { + pub experiment_id: fidget_spinner_core::ExperimentId, pub candidate_summary: NonEmptyText, pub candidate_snapshot: CheckpointSnapshotRef, pub run_title: NonEmptyText, @@ -396,16 +426,18 @@ pub struct CloseExperimentRequest { pub supporting_metrics: Vec<MetricValue>, pub note: FrontierNote, pub verdict: FrontierVerdict, + pub analysis: Option<ExperimentAnalysisDraft>, pub decision_title: NonEmptyText, pub decision_rationale: NonEmptyText, - pub analysis_node_id: Option<fidget_spinner_core::NodeId>, } #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct ExperimentReceipt { + pub open_experiment: OpenExperiment, pub checkpoint: CheckpointRecord, pub run_node: DagNode, pub run: RunRecord, + pub analysis_node: Option<DagNode>, pub decision_node: DagNode, pub experiment: CompletedExperiment, } @@ -450,6 +482,12 @@ impl ProjectStore { .ok_or(StoreError::MissingProjectStore(requested_root))?; let state_root = state_root(&project_root); let config = read_json_file::<ProjectConfig>(&state_root.join(PROJECT_CONFIG_NAME))?; + if config.store_format_version != CURRENT_STORE_FORMAT_VERSION { + return 
Err(StoreError::IncompatibleStoreFormatVersion { + observed: config.store_format_version, + expected: CURRENT_STORE_FORMAT_VERSION, + }); + } let schema = read_json_file::<ProjectSchema>(&state_root.join(PROJECT_SCHEMA_NAME))?; let mut connection = Connection::open(state_root.join(STATE_DB_NAME).as_std_path())?; upgrade_store(&mut connection)?; @@ -1098,17 +1136,17 @@ impl ProjectStore { .map_err(StoreError::from) } - pub fn close_experiment( + pub fn open_experiment( &mut self, - request: CloseExperimentRequest, - ) -> Result<ExperimentReceipt, StoreError> { - let change_node = self - .get_node(request.change_node_id)? - .ok_or(StoreError::NodeNotFound(request.change_node_id))?; - if change_node.class != NodeClass::Change { - return Err(StoreError::NodeNotChange(request.change_node_id)); + request: OpenExperimentRequest, + ) -> Result<OpenExperimentSummary, StoreError> { + let hypothesis_node = self + .get_node(request.hypothesis_node_id)? + .ok_or(StoreError::NodeNotFound(request.hypothesis_node_id))?; + if hypothesis_node.class != NodeClass::Hypothesis { + return Err(StoreError::NodeNotHypothesis(request.hypothesis_node_id)); } - if change_node.frontier_id != Some(request.frontier_id) { + if hypothesis_node.frontier_id != Some(request.frontier_id) { return Err(StoreError::FrontierNotFound(request.frontier_id)); } let base_checkpoint = self @@ -1117,6 +1155,102 @@ impl ProjectStore { if base_checkpoint.frontier_id != request.frontier_id { return Err(StoreError::CheckpointNotFound(request.base_checkpoint_id)); } + let experiment = OpenExperiment { + id: fidget_spinner_core::ExperimentId::fresh(), + frontier_id: request.frontier_id, + base_checkpoint_id: request.base_checkpoint_id, + hypothesis_node_id: request.hypothesis_node_id, + title: request.title, + summary: request.summary, + created_at: OffsetDateTime::now_utc(), + }; + let tx = self.connection.transaction()?; + insert_open_experiment(&tx, &experiment)?; + touch_frontier(&tx, request.frontier_id)?; + 
insert_event( + &tx, + "experiment", + &experiment.id.to_string(), + "experiment.opened", + json!({ + "frontier_id": experiment.frontier_id, + "hypothesis_node_id": experiment.hypothesis_node_id, + "base_checkpoint_id": experiment.base_checkpoint_id, + }), + )?; + tx.commit()?; + Ok(summarize_open_experiment(&experiment)) + } + + pub fn list_open_experiments( + &self, + frontier_id: Option<fidget_spinner_core::FrontierId>, + ) -> Result<Vec<OpenExperimentSummary>, StoreError> { + let mut statement = self.connection.prepare( + "SELECT + id, + frontier_id, + base_checkpoint_id, + hypothesis_node_id, + title, + summary, + created_at + FROM open_experiments + WHERE (?1 IS NULL OR frontier_id = ?1) + ORDER BY created_at DESC", + )?; + let mut rows = statement.query(params![frontier_id.map(|id| id.to_string())])?; + let mut items = Vec::new(); + while let Some(row) = rows.next()? { + items.push(OpenExperimentSummary { + id: parse_experiment_id(&row.get::<_, String>(0)?)?, + frontier_id: parse_frontier_id(&row.get::<_, String>(1)?)?, + base_checkpoint_id: parse_checkpoint_id(&row.get::<_, String>(2)?)?, + hypothesis_node_id: parse_node_id(&row.get::<_, String>(3)?)?, + title: NonEmptyText::new(row.get::<_, String>(4)?)?, + summary: row + .get::<_, Option<String>>(5)? + .map(NonEmptyText::new) + .transpose()?, + created_at: decode_timestamp(&row.get::<_, String>(6)?)?, + }); + } + Ok(items) + } + + pub fn read_open_experiment( + &self, + experiment_id: fidget_spinner_core::ExperimentId, + ) -> Result<OpenExperimentSummary, StoreError> { + load_open_experiment(&self.connection, experiment_id)? + .map(|experiment| summarize_open_experiment(&experiment)) + .ok_or(StoreError::ExperimentNotFound(experiment_id)) + } + + pub fn close_experiment( + &mut self, + request: CloseExperimentRequest, + ) -> Result<ExperimentReceipt, StoreError> { + let open_experiment = load_open_experiment(&self.connection, request.experiment_id)? 
+ .ok_or(StoreError::ExperimentNotFound(request.experiment_id))?; + let hypothesis_node = self + .get_node(open_experiment.hypothesis_node_id)? + .ok_or(StoreError::NodeNotFound(open_experiment.hypothesis_node_id))?; + if hypothesis_node.class != NodeClass::Hypothesis { + return Err(StoreError::NodeNotHypothesis( + open_experiment.hypothesis_node_id, + )); + } + let base_checkpoint = self + .load_checkpoint(open_experiment.base_checkpoint_id)? + .ok_or(StoreError::CheckpointNotFound( + open_experiment.base_checkpoint_id, + ))?; + if base_checkpoint.frontier_id != open_experiment.frontier_id { + return Err(StoreError::CheckpointNotFound( + open_experiment.base_checkpoint_id, + )); + } let tx = self.connection.transaction()?; let dimensions = validate_run_dimensions_tx(&tx, &request.dimensions)?; let primary_metric_definition = @@ -1144,7 +1278,7 @@ impl ProjectStore { let run_diagnostics = self.schema.validate_node(NodeClass::Run, &run_payload); let run_node = DagNode::new( NodeClass::Run, - Some(request.frontier_id), + Some(open_experiment.frontier_id), request.run_title, request.run_summary, run_payload, @@ -1155,7 +1289,7 @@ impl ProjectStore { let run = RunRecord { node_id: run_node.id, run_id, - frontier_id: Some(request.frontier_id), + frontier_id: Some(open_experiment.frontier_id), status: RunStatus::Succeeded, backend: request.backend, code_snapshot: request.code_snapshot, @@ -1165,6 +1299,27 @@ impl ProjectStore { finished_at: Some(now), }; + let analysis_node = request + .analysis + .map(|analysis| -> Result<DagNode, StoreError> { + let payload = NodePayload::with_schema( + self.schema.schema_ref(), + json_object(json!({ + "body": analysis.body.as_str(), + }))?, + ); + let diagnostics = self.schema.validate_node(NodeClass::Analysis, &payload); + Ok(DagNode::new( + NodeClass::Analysis, + Some(open_experiment.frontier_id), + analysis.title, + Some(analysis.summary), + payload, + diagnostics, + )) + }) + .transpose()?; + let decision_payload = 
NodePayload::with_schema( self.schema.schema_ref(), json_object(json!({ @@ -1177,7 +1332,7 @@ impl ProjectStore { .validate_node(NodeClass::Decision, &decision_payload); let decision_node = DagNode::new( NodeClass::Decision, - Some(request.frontier_id), + Some(open_experiment.frontier_id), request.decision_title, Some(request.decision_rationale.clone()), decision_payload, @@ -1186,7 +1341,7 @@ impl ProjectStore { let checkpoint = CheckpointRecord { id: fidget_spinner_core::CheckpointId::fresh(), - frontier_id: request.frontier_id, + frontier_id: open_experiment.frontier_id, node_id: run_node.id, snapshot: request.candidate_snapshot, disposition: match request.verdict { @@ -1202,15 +1357,17 @@ impl ProjectStore { }; let experiment = CompletedExperiment { - id: fidget_spinner_core::ExperimentId::fresh(), - frontier_id: request.frontier_id, - base_checkpoint_id: request.base_checkpoint_id, + id: open_experiment.id, + frontier_id: open_experiment.frontier_id, + base_checkpoint_id: open_experiment.base_checkpoint_id, candidate_checkpoint_id: checkpoint.id, - change_node_id: request.change_node_id, + hypothesis_node_id: open_experiment.hypothesis_node_id, run_node_id: run_node.id, run_id, - analysis_node_id: request.analysis_node_id, + analysis_node_id: analysis_node.as_ref().map(|node| node.id), decision_node_id: decision_node.id, + title: open_experiment.title.clone(), + summary: open_experiment.summary.clone(), result: ExperimentResult { dimensions: dimensions.clone(), primary_metric: request.primary_metric, @@ -1222,23 +1379,45 @@ impl ProjectStore { created_at: now, }; insert_node(&tx, &run_node)?; + if let Some(node) = analysis_node.as_ref() { + insert_node(&tx, node)?; + } insert_node(&tx, &decision_node)?; insert_edge( &tx, &DagEdge { - source_id: request.change_node_id, + source_id: open_experiment.hypothesis_node_id, target_id: run_node.id, kind: EdgeKind::Lineage, }, )?; - insert_edge( - &tx, - &DagEdge { - source_id: run_node.id, - target_id: 
decision_node.id, - kind: EdgeKind::Evidence, - }, - )?; + if let Some(node) = analysis_node.as_ref() { + insert_edge( + &tx, + &DagEdge { + source_id: run_node.id, + target_id: node.id, + kind: EdgeKind::Evidence, + }, + )?; + insert_edge( + &tx, + &DagEdge { + source_id: node.id, + target_id: decision_node.id, + kind: EdgeKind::Evidence, + }, + )?; + } else { + insert_edge( + &tx, + &DagEdge { + source_id: run_node.id, + target_id: decision_node.id, + kind: EdgeKind::Evidence, + }, + )?; + } insert_run( &tx, &run, @@ -1251,7 +1430,7 @@ impl ProjectStore { insert_run_dimensions(&tx, run.run_id, &dimensions)?; match request.verdict { FrontierVerdict::PromoteToChampion => { - demote_previous_champion(&tx, request.frontier_id)?; + demote_previous_champion(&tx, open_experiment.frontier_id)?; } FrontierVerdict::KeepOnFrontier | FrontierVerdict::NeedsMoreEvidence @@ -1260,14 +1439,16 @@ impl ProjectStore { } insert_checkpoint(&tx, &checkpoint)?; insert_experiment(&tx, &experiment)?; - touch_frontier(&tx, request.frontier_id)?; + delete_open_experiment(&tx, open_experiment.id)?; + touch_frontier(&tx, open_experiment.frontier_id)?; insert_event( &tx, "experiment", &experiment.id.to_string(), "experiment.closed", json!({ - "frontier_id": request.frontier_id, + "frontier_id": open_experiment.frontier_id, + "hypothesis_node_id": open_experiment.hypothesis_node_id, "verdict": format!("{:?}", request.verdict), "candidate_checkpoint_id": checkpoint.id, }), @@ -1275,9 +1456,11 @@ impl ProjectStore { tx.commit()?; Ok(ExperimentReceipt { + open_experiment, checkpoint, run_node, run, + analysis_node, decision_node, experiment, }) @@ -1363,7 +1546,7 @@ fn upgrade_store(connection: &mut Connection) -> Result<(), StoreError> { } fn validate_prose_node_request(request: &CreateNodeRequest) -> Result<(), StoreError> { - if !matches!(request.class, NodeClass::Note | NodeClass::Research) { + if !matches!(request.class, NodeClass::Note | NodeClass::Source) { return Ok(()); } if 
request.summary.is_none() { @@ -1382,8 +1565,8 @@ struct MetricSample { value: f64, frontier_id: fidget_spinner_core::FrontierId, experiment_id: fidget_spinner_core::ExperimentId, - change_node_id: fidget_spinner_core::NodeId, - change_title: NonEmptyText, + hypothesis_node_id: fidget_spinner_core::NodeId, + hypothesis_title: NonEmptyText, run_id: fidget_spinner_core::RunId, verdict: FrontierVerdict, candidate_checkpoint_id: fidget_spinner_core::CheckpointId, @@ -1402,8 +1585,8 @@ impl MetricSample { order, experiment_id: self.experiment_id, frontier_id: self.frontier_id, - change_node_id: self.change_node_id, - change_title: self.change_title, + hypothesis_node_id: self.hypothesis_node_id, + hypothesis_title: self.hypothesis_title, run_id: self.run_id, verdict: self.verdict, candidate_checkpoint_id: self.candidate_checkpoint_id, @@ -1558,7 +1741,7 @@ struct ExperimentMetricRow { run_id: fidget_spinner_core::RunId, verdict: FrontierVerdict, candidate_checkpoint: CheckpointRecord, - change_node: DagNode, + hypothesis_node: DagNode, run_node: DagNode, analysis_node: Option<DagNode>, decision_node: DagNode, @@ -1574,7 +1757,7 @@ fn load_experiment_rows(store: &ProjectStore) -> Result<Vec<ExperimentMetricRow> id, frontier_id, run_id, - change_node_id, + hypothesis_node_id, run_node_id, analysis_node_id, decision_node_id, @@ -1587,7 +1770,7 @@ fn load_experiment_rows(store: &ProjectStore) -> Result<Vec<ExperimentMetricRow> let mut rows = statement.query([])?; let mut items = Vec::new(); while let Some(row) = rows.next()? { - let change_node_id = parse_node_id(&row.get::<_, String>(3)?)?; + let hypothesis_node_id = parse_node_id(&row.get::<_, String>(3)?)?; let run_id = parse_run_id(&row.get::<_, String>(2)?)?; let run_node_id = parse_node_id(&row.get::<_, String>(4)?)?; let analysis_node_id = row @@ -1604,9 +1787,9 @@ fn load_experiment_rows(store: &ProjectStore) -> Result<Vec<ExperimentMetricRow> candidate_checkpoint: store .load_checkpoint(candidate_checkpoint_id)? 
.ok_or(StoreError::CheckpointNotFound(candidate_checkpoint_id))?, - change_node: store - .get_node(change_node_id)? - .ok_or(StoreError::NodeNotFound(change_node_id))?, + hypothesis_node: store + .get_node(hypothesis_node_id)? + .ok_or(StoreError::NodeNotFound(hypothesis_node_id))?, run_node: store .get_node(run_node_id)? .ok_or(StoreError::NodeNotFound(run_node_id))?, @@ -1647,7 +1830,11 @@ fn metric_samples_for_row( MetricFieldSource::RunMetric, ) })); - samples.extend(metric_samples_from_payload(schema, row, &row.change_node)); + samples.extend(metric_samples_from_payload( + schema, + row, + &row.hypothesis_node, + )); samples.extend(metric_samples_from_payload(schema, row, &row.run_node)); if let Some(node) = row.analysis_node.as_ref() { samples.extend(metric_samples_from_payload(schema, row, node)); @@ -1669,8 +1856,8 @@ fn metric_sample_from_observation( value: metric.value, frontier_id: row.frontier_id, experiment_id: row.experiment_id, - change_node_id: row.change_node.id, - change_title: row.change_node.title.clone(), + hypothesis_node_id: row.hypothesis_node.id, + hypothesis_title: row.hypothesis_node.title.clone(), run_id: row.run_id, verdict: row.verdict, candidate_checkpoint_id: row.candidate_checkpoint.id, @@ -1708,8 +1895,8 @@ fn metric_samples_from_payload( value, frontier_id: row.frontier_id, experiment_id: row.experiment_id, - change_node_id: row.change_node.id, - change_title: row.change_node.title.clone(), + hypothesis_node_id: row.hypothesis_node.id, + hypothesis_title: row.hypothesis_node.title.clone(), run_id: row.run_id, verdict: row.verdict, candidate_checkpoint_id: row.candidate_checkpoint.id, @@ -1847,16 +2034,28 @@ fn migrate(connection: &Connection) -> Result<(), StoreError> { PRIMARY KEY (run_id, dimension_key) ); + CREATE TABLE IF NOT EXISTS open_experiments ( + id TEXT PRIMARY KEY, + frontier_id TEXT NOT NULL REFERENCES frontiers(id) ON DELETE CASCADE, + base_checkpoint_id TEXT NOT NULL REFERENCES checkpoints(id) ON DELETE RESTRICT, 
+ hypothesis_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT, + title TEXT NOT NULL, + summary TEXT, + created_at TEXT NOT NULL + ); + CREATE TABLE IF NOT EXISTS experiments ( id TEXT PRIMARY KEY, frontier_id TEXT NOT NULL REFERENCES frontiers(id) ON DELETE CASCADE, base_checkpoint_id TEXT NOT NULL REFERENCES checkpoints(id) ON DELETE RESTRICT, candidate_checkpoint_id TEXT NOT NULL REFERENCES checkpoints(id) ON DELETE RESTRICT, - change_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT, + hypothesis_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT, run_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT, run_id TEXT NOT NULL REFERENCES runs(run_id) ON DELETE RESTRICT, analysis_node_id TEXT REFERENCES nodes(id) ON DELETE RESTRICT, decision_node_id TEXT NOT NULL REFERENCES nodes(id) ON DELETE RESTRICT, + title TEXT NOT NULL, + summary TEXT, benchmark_suite TEXT NOT NULL, primary_metric_json TEXT NOT NULL, supporting_metrics_json TEXT NOT NULL, @@ -1870,6 +2069,7 @@ fn migrate(connection: &Connection) -> Result<(), StoreError> { CREATE INDEX IF NOT EXISTS run_dimensions_by_key_text ON run_dimensions(dimension_key, value_text); CREATE INDEX IF NOT EXISTS run_dimensions_by_key_numeric ON run_dimensions(dimension_key, value_numeric); CREATE INDEX IF NOT EXISTS run_dimensions_by_run ON run_dimensions(run_id, dimension_key); + CREATE INDEX IF NOT EXISTS open_experiments_by_frontier ON open_experiments(frontier_id, created_at DESC); CREATE INDEX IF NOT EXISTS experiments_by_frontier ON experiments(frontier_id, created_at DESC); CREATE TABLE IF NOT EXISTS events ( @@ -1889,7 +2089,7 @@ fn backfill_prose_summaries(connection: &Connection) -> Result<(), StoreError> { let mut statement = connection.prepare( "SELECT id, payload_json FROM nodes - WHERE class IN ('note', 'research') + WHERE class IN ('note', 'source') AND (summary IS NULL OR trim(summary) = '')", )?; let mut rows = statement.query([])?; @@ -2838,6 +3038,44 @@ fn 
insert_run( Ok(()) } +fn insert_open_experiment( + tx: &Transaction<'_>, + experiment: &OpenExperiment, +) -> Result<(), StoreError> { + let _ = tx.execute( + "INSERT INTO open_experiments ( + id, + frontier_id, + base_checkpoint_id, + hypothesis_node_id, + title, + summary, + created_at + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)", + params![ + experiment.id.to_string(), + experiment.frontier_id.to_string(), + experiment.base_checkpoint_id.to_string(), + experiment.hypothesis_node_id.to_string(), + experiment.title.as_str(), + experiment.summary.as_ref().map(NonEmptyText::as_str), + encode_timestamp(experiment.created_at)?, + ], + )?; + Ok(()) +} + +fn delete_open_experiment( + tx: &Transaction<'_>, + experiment_id: fidget_spinner_core::ExperimentId, +) -> Result<(), StoreError> { + let _ = tx.execute( + "DELETE FROM open_experiments WHERE id = ?1", + params![experiment_id.to_string()], + )?; + Ok(()) +} + fn insert_experiment( tx: &Transaction<'_>, experiment: &CompletedExperiment, @@ -2848,11 +3086,13 @@ fn insert_experiment( frontier_id, base_checkpoint_id, candidate_checkpoint_id, - change_node_id, + hypothesis_node_id, run_node_id, run_id, analysis_node_id, decision_node_id, + title, + summary, benchmark_suite, primary_metric_json, supporting_metrics_json, @@ -2860,17 +3100,19 @@ fn insert_experiment( note_next_json, verdict, created_at - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16)", + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18)", params![ experiment.id.to_string(), experiment.frontier_id.to_string(), experiment.base_checkpoint_id.to_string(), experiment.candidate_checkpoint_id.to_string(), - experiment.change_node_id.to_string(), + experiment.hypothesis_node_id.to_string(), experiment.run_node_id.to_string(), experiment.run_id.to_string(), experiment.analysis_node_id.map(|id| id.to_string()), experiment.decision_node_id.to_string(), + experiment.title.as_str(), + 
experiment.summary.as_ref().map(NonEmptyText::as_str), benchmark_suite_label(&experiment.result.dimensions), encode_json(&experiment.result.primary_metric)?, encode_json(&experiment.result.supporting_metrics)?, @@ -2904,6 +3146,60 @@ fn insert_event( Ok(()) } +fn load_open_experiment( + connection: &Connection, + experiment_id: fidget_spinner_core::ExperimentId, +) -> Result<Option<OpenExperiment>, StoreError> { + let mut statement = connection.prepare( + "SELECT + id, + frontier_id, + base_checkpoint_id, + hypothesis_node_id, + title, + summary, + created_at + FROM open_experiments + WHERE id = ?1", + )?; + statement + .query_row(params![experiment_id.to_string()], |row| { + Ok(OpenExperiment { + id: parse_experiment_id(&row.get::<_, String>(0)?) + .map_err(to_sql_conversion_error)?, + frontier_id: parse_frontier_id(&row.get::<_, String>(1)?) + .map_err(to_sql_conversion_error)?, + base_checkpoint_id: parse_checkpoint_id(&row.get::<_, String>(2)?) + .map_err(to_sql_conversion_error)?, + hypothesis_node_id: parse_node_id(&row.get::<_, String>(3)?) + .map_err(to_sql_conversion_error)?, + title: NonEmptyText::new(row.get::<_, String>(4)?) + .map_err(core_to_sql_conversion_error)?, + summary: row + .get::<_, Option<String>>(5)? + .map(NonEmptyText::new) + .transpose() + .map_err(core_to_sql_conversion_error)?, + created_at: decode_timestamp(&row.get::<_, String>(6)?) 
+ .map_err(to_sql_conversion_error)?, + }) + }) + .optional() + .map_err(StoreError::from) +} + +fn summarize_open_experiment(experiment: &OpenExperiment) -> OpenExperimentSummary { + OpenExperimentSummary { + id: experiment.id, + frontier_id: experiment.frontier_id, + base_checkpoint_id: experiment.base_checkpoint_id, + hypothesis_node_id: experiment.hypothesis_node_id, + title: experiment.title.clone(), + summary: experiment.summary.clone(), + created_at: experiment.created_at, + } +} + fn touch_frontier( tx: &Transaction<'_>, frontier_id: fidget_spinner_core::FrontierId, @@ -3188,12 +3484,11 @@ fn parse_annotation_id(raw: &str) -> Result<fidget_spinner_core::AnnotationId, S fn parse_node_class(raw: &str) -> Result<NodeClass, StoreError> { match raw { "contract" => Ok(NodeClass::Contract), - "change" => Ok(NodeClass::Change), + "hypothesis" => Ok(NodeClass::Hypothesis), "run" => Ok(NodeClass::Run), "analysis" => Ok(NodeClass::Analysis), "decision" => Ok(NodeClass::Decision), - "research" => Ok(NodeClass::Research), - "enabling" => Ok(NodeClass::Enabling), + "source" => Ok(NodeClass::Source), "note" => Ok(NodeClass::Note), other => Err(StoreError::Json(serde_json::Error::io(io::Error::new( io::ErrorKind::InvalidData, @@ -3486,7 +3781,7 @@ mod tests { use super::{ CloseExperimentRequest, CreateFrontierRequest, CreateNodeRequest, DefineMetricRequest, DefineRunDimensionRequest, ListNodesQuery, MetricBestQuery, MetricFieldSource, - MetricKeyQuery, MetricRankOrder, PROJECT_SCHEMA_NAME, ProjectStore, + MetricKeyQuery, MetricRankOrder, OpenExperimentRequest, PROJECT_SCHEMA_NAME, ProjectStore, RemoveSchemaFieldRequest, UpsertSchemaFieldRequest, }; use fidget_spinner_core::{ @@ -3528,7 +3823,7 @@ mod tests { NonEmptyText::new("local.test")?, )?; let node = store.add_node(CreateNodeRequest { - class: NodeClass::Research, + class: NodeClass::Source, frontier_id: None, title: NonEmptyText::new("feature sketch")?, summary: Some(NonEmptyText::new("research note")?), @@ -3747,7 
+4042,7 @@ mod tests { )?; let missing_summary = store.add_node(CreateNodeRequest { - class: NodeClass::Research, + class: NodeClass::Source, frontier_id: None, title: NonEmptyText::new("research note")?, summary: None, @@ -3761,7 +4056,7 @@ mod tests { }); assert!(matches!( missing_summary, - Err(super::StoreError::ProseSummaryRequired(NodeClass::Research)) + Err(super::StoreError::ProseSummaryRequired(NodeClass::Source)) )); let missing_body = store.add_node(CreateNodeRequest { @@ -3790,7 +4085,7 @@ mod tests { NonEmptyText::new("local.test")?, )?; let node = store.add_node(CreateNodeRequest { - class: NodeClass::Research, + class: NodeClass::Source, frontier_id: None, title: NonEmptyText::new("research note")?, summary: Some(NonEmptyText::new("temporary summary")?), @@ -3838,7 +4133,7 @@ mod tests { let field = store.upsert_schema_field(UpsertSchemaFieldRequest { name: NonEmptyText::new("scenario")?, - node_classes: BTreeSet::from([NodeClass::Change, NodeClass::Analysis]), + node_classes: BTreeSet::from([NodeClass::Hypothesis, NodeClass::Analysis]), presence: FieldPresence::Recommended, severity: DiagnosticSeverity::Warning, role: FieldRole::ProjectionGate, @@ -3868,7 +4163,7 @@ mod tests { let removed = reopened.remove_schema_field(RemoveSchemaFieldRequest { name: NonEmptyText::new("scenario")?, - node_classes: Some(BTreeSet::from([NodeClass::Change, NodeClass::Analysis])), + node_classes: Some(BTreeSet::from([NodeClass::Hypothesis, NodeClass::Analysis])), })?; assert_eq!(removed, 1); assert_eq!(reopened.schema().version, initial_version + 2); @@ -3934,11 +4229,11 @@ mod tests { description: Some(NonEmptyText::new("time budget in seconds")?), })?; - let first_change = store.add_node(CreateNodeRequest { - class: NodeClass::Change, + let first_hypothesis = store.add_node(CreateNodeRequest { + class: NodeClass::Hypothesis, frontier_id: Some(frontier_id), - title: NonEmptyText::new("first change")?, - summary: Some(NonEmptyText::new("first change summary")?), + 
title: NonEmptyText::new("first hypothesis")?, + summary: Some(NonEmptyText::new("first hypothesis summary")?), tags: None, payload: NodePayload::with_schema( store.schema().schema_ref(), @@ -3947,11 +4242,11 @@ mod tests { annotations: Vec::new(), attachments: Vec::new(), })?; - let second_change = store.add_node(CreateNodeRequest { - class: NodeClass::Change, + let second_hypothesis = store.add_node(CreateNodeRequest { + class: NodeClass::Hypothesis, frontier_id: Some(frontier_id), - title: NonEmptyText::new("second change")?, - summary: Some(NonEmptyText::new("second change summary")?), + title: NonEmptyText::new("second hypothesis")?, + summary: Some(NonEmptyText::new("second hypothesis summary")?), tags: None, payload: NodePayload::with_schema( store.schema().schema_ref(), @@ -3960,12 +4255,22 @@ mod tests { annotations: Vec::new(), attachments: Vec::new(), })?; + let first_experiment = store.open_experiment(open_experiment_request( + frontier_id, + base_checkpoint_id, + first_hypothesis.id, + "first experiment", + )?)?; + let second_experiment = store.open_experiment(open_experiment_request( + frontier_id, + base_checkpoint_id, + second_hypothesis.id, + "second experiment", + )?)?; let first_receipt = store.close_experiment(experiment_request( &root, - frontier_id, - base_checkpoint_id, - first_change.id, + first_experiment.id, "bbbbbbbbbbbbbbbb", "first run", 10.0, @@ -3973,9 +4278,7 @@ mod tests { )?)?; let second_receipt = store.close_experiment(experiment_request( &root, - frontier_id, - base_checkpoint_id, - second_change.id, + second_experiment.id, "cccccccccccccccc", "second run", 5.0, @@ -3987,7 +4290,7 @@ mod tests { key.key.as_str() == "wall_clock_s" && key.source == MetricFieldSource::RunMetric })); assert!(keys.iter().any(|key| { - key.key.as_str() == "latency_hint" && key.source == MetricFieldSource::ChangePayload + key.key.as_str() == "latency_hint" && key.source == MetricFieldSource::HypothesisPayload })); assert!(keys.iter().any(|key| { 
key.key.as_str() == "wall_clock_s" @@ -4044,19 +4347,19 @@ mod tests { let payload_best = store.best_metrics(MetricBestQuery { key: NonEmptyText::new("latency_hint")?, frontier_id: Some(frontier_id), - source: Some(MetricFieldSource::ChangePayload), + source: Some(MetricFieldSource::HypothesisPayload), dimensions: run_dimensions("belt_4x5", 60.0)?, order: Some(MetricRankOrder::Asc), limit: 5, })?; assert_eq!(payload_best.len(), 1); assert_eq!(payload_best[0].value, 7.0); - assert_eq!(payload_best[0].change_node_id, second_change.id); + assert_eq!(payload_best[0].hypothesis_node_id, second_hypothesis.id); let missing_order = store.best_metrics(MetricBestQuery { key: NonEmptyText::new("latency_hint")?, frontier_id: Some(frontier_id), - source: Some(MetricFieldSource::ChangePayload), + source: Some(MetricFieldSource::HypothesisPayload), dimensions: BTreeMap::new(), order: None, limit: 5, @@ -4107,11 +4410,11 @@ mod tests { let base_checkpoint_id = projection .champion_checkpoint_id .ok_or_else(|| super::StoreError::MissingChampionCheckpoint { frontier_id })?; - let change = store.add_node(CreateNodeRequest { - class: NodeClass::Change, + let hypothesis = store.add_node(CreateNodeRequest { + class: NodeClass::Hypothesis, frontier_id: Some(frontier_id), - title: NonEmptyText::new("candidate change")?, - summary: Some(NonEmptyText::new("candidate change summary")?), + title: NonEmptyText::new("candidate hypothesis")?, + summary: Some(NonEmptyText::new("candidate hypothesis summary")?), tags: None, payload: NodePayload::with_schema( store.schema().schema_ref(), @@ -4120,11 +4423,15 @@ mod tests { annotations: Vec::new(), attachments: Vec::new(), })?; - let _ = store.close_experiment(experiment_request( - &root, + let experiment = store.open_experiment(open_experiment_request( frontier_id, base_checkpoint_id, - change.id, + hypothesis.id, + "migration experiment", + )?)?; + let _ = store.close_experiment(experiment_request( + &root, + experiment.id, "bbbbbbbbbbbbbbbb", 
"migration run", 11.0, @@ -4177,20 +4484,31 @@ mod tests { }) } - fn experiment_request( - root: &camino::Utf8Path, + fn open_experiment_request( frontier_id: fidget_spinner_core::FrontierId, base_checkpoint_id: fidget_spinner_core::CheckpointId, - change_node_id: fidget_spinner_core::NodeId, + hypothesis_node_id: fidget_spinner_core::NodeId, + title: &str, + ) -> Result<OpenExperimentRequest, super::StoreError> { + Ok(OpenExperimentRequest { + frontier_id, + base_checkpoint_id, + hypothesis_node_id, + title: NonEmptyText::new(title)?, + summary: Some(NonEmptyText::new(format!("{title} summary"))?), + }) + } + + fn experiment_request( + root: &camino::Utf8Path, + experiment_id: fidget_spinner_core::ExperimentId, candidate_commit: &str, run_title: &str, wall_clock_s: f64, dimensions: BTreeMap<NonEmptyText, RunDimensionValue>, ) -> Result<CloseExperimentRequest, super::StoreError> { Ok(CloseExperimentRequest { - frontier_id, - base_checkpoint_id, - change_node_id, + experiment_id, candidate_summary: NonEmptyText::new(format!("candidate {candidate_commit}"))?, candidate_snapshot: checkpoint_snapshot(root, candidate_commit)?, run_title: NonEmptyText::new(run_title)?, @@ -4213,9 +4531,9 @@ mod tests { next_hypotheses: Vec::new(), }, verdict: FrontierVerdict::KeepOnFrontier, + analysis: None, decision_title: NonEmptyText::new("decision")?, decision_rationale: NonEmptyText::new("decision rationale")?, - analysis_node_id: None, }) } |