From bc1c0e776a7b752b4be9361332d45cab90b0f3fa Mon Sep 17 00:00:00 2001
From: tmcgroul
Date: Tue, 6 Jan 2026 14:57:15 +0530
Subject: [PATCH] clean up unused datasets from hotblocks storage

---
 crates/hotblocks/src/data_service.rs | 11 +++++++++
 crates/storage/src/db/db.rs          | 22 +++++++++++++++++
 crates/storage/src/db/write/tx.rs    |  5 ++++
 crates/storage/tests/database_ops.rs | 35 ++++++++++++++++++++++++++++
 4 files changed, 73 insertions(+)

diff --git a/crates/hotblocks/src/data_service.rs b/crates/hotblocks/src/data_service.rs
index 491f5f93..db3e2f3c 100644
--- a/crates/hotblocks/src/data_service.rs
+++ b/crates/hotblocks/src/data_service.rs
@@ -7,6 +7,7 @@ use futures::FutureExt;
 use futures::{StreamExt, TryStreamExt};
 use sqd_data_client::reqwest::ReqwestDataClient;
 use sqd_storage::db::DatasetId;
+use tracing::{error, info};
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
 
@@ -21,6 +22,16 @@ pub struct DataService {
 
 impl DataService {
     pub async fn start(db: DBRef, datasets: BTreeMap) -> anyhow::Result {
+        let all_datasets = db.get_all_datasets()?;
+        for dataset in all_datasets {
+            if !datasets.contains_key(&dataset.id) {
+                info!("deleting unconfigured dataset {}", dataset.id);
+                if let Err(err) = db.delete_dataset(dataset.id) {
+                    error!("failed to delete dataset {}: {}", dataset.id, err);
+                }
+            }
+        }
+
         let mut controllers = futures::stream::iter(datasets.into_iter())
             .map(|(dataset_id, cfg)| {
                 let db = db.clone();
diff --git a/crates/storage/src/db/db.rs b/crates/storage/src/db/db.rs
index 9bb3eee1..1b75b312 100644
--- a/crates/storage/src/db/db.rs
+++ b/crates/storage/src/db/db.rs
@@ -257,6 +257,28 @@ impl Database {
         perform_dataset_compaction(&self.db, dataset_id, max_chunk_size, write_amplification_limit, compaction_len_limit)
     }
 
+    pub fn delete_dataset(&self, dataset_id: DatasetId) -> anyhow::Result<()> {
+        Tx::new(&self.db).run(|tx| {
+            let label = tx.find_label_for_update(dataset_id)?;
+            if label.is_none() {
+                return Ok(());
+            }
+
+            let chunks = tx.list_chunks(dataset_id, 0, None);
+            for chunk_result in chunks {
+                let chunk = chunk_result?;
+                tx.delete_chunk(dataset_id, &chunk)?;
+            }
+
+            tx.delete_label(dataset_id)?;
+
+            Ok(())
+        })?;
+
+        self.cleanup()?;
+        Ok(())
+    }
+
     pub fn cleanup(&self) -> anyhow::Result {
         delete_deleted_tables(&self.db)
     }
diff --git a/crates/storage/src/db/write/tx.rs b/crates/storage/src/db/write/tx.rs
index ccf65ceb..ad5f7db0 100644
--- a/crates/storage/src/db/write/tx.rs
+++ b/crates/storage/src/db/write/tx.rs
@@ -110,6 +110,11 @@ impl <'a> Tx<'a> {
         Ok(())
     }
 
+    pub fn delete_label(&self, dataset_id: DatasetId) -> anyhow::Result<()> {
+        self.transaction.delete_cf(self.cf_handle(CF_DATASETS), dataset_id)?;
+        Ok(())
+    }
+
     pub fn write_chunk(&self, dataset_id: DatasetId, chunk: &Chunk) -> anyhow::Result<()> {
         self.transaction.put_cf(
             self.cf_handle(CF_CHUNKS),
diff --git a/crates/storage/tests/database_ops.rs b/crates/storage/tests/database_ops.rs
index ae62831a..febff1d3 100644
--- a/crates/storage/tests/database_ops.rs
+++ b/crates/storage/tests/database_ops.rs
@@ -397,3 +397,38 @@ fn labels() {
     assert_eq!(label.version(), 3); // 2 successful updates, 1 set finalized head
     assert_eq!(label.finalized_head(), Some(&finalized_head));
 }
+
+#[test]
+fn delete_dataset() {
+    let (db, dataset_id) = setup_db();
+
+    let chunk1 = Chunk::V0 {
+        first_block: 0,
+        last_block: 100,
+        last_block_hash: "last_1".to_owned(),
+        parent_block_hash: "base".to_owned(),
+        tables: Default::default(),
+    };
+    let chunk2 = Chunk::V0 {
+        first_block: 101,
+        last_block: 200,
+        last_block_hash: "last_2".to_owned(),
+        parent_block_hash: "last_1".to_owned(),
+        tables: Default::default(),
+    };
+
+    assert!(db.insert_chunk(dataset_id, &chunk1).is_ok());
+    assert!(db.insert_chunk(dataset_id, &chunk2).is_ok());
+
+    let datasets = db.get_all_datasets().unwrap();
+    assert_eq!(datasets.len(), 1);
+    assert_eq!(datasets[0].id, dataset_id);
+    validate_chunks(&db, dataset_id, [&chunk1, &chunk2].to_vec());
+
+    assert!(db.delete_dataset(dataset_id).is_ok());
+
+    let datasets = db.get_all_datasets().unwrap();
+    assert_eq!(datasets.len(), 0);
+
+    validate_chunks(&db, dataset_id, [].to_vec());
+}