Refactor: storage manager trait split into multiple subtraits #311
base: v0.41-dev
Conversation
…d command communication
This PR has merge conflicts with the base branch. Please rebase or merge the base branch into your branch to resolve them.
📝 Walkthrough

This PR refactors the SPV storage architecture from in-memory state management to a persistent, trait-based storage system. It removes the …

Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
dash-spv/src/storage/segments.rs (1)
413-413: Segment size violates documented storage architecture: change from 10,000 to 50,000 items per segment is not reflected in guidelines.

The coding guideline specifies "Store headers in 10,000-header segments", but the implementation uses `const ITEMS_PER_SEGMENT: u32 = 50_000;` (line 413). This 5x increase impacts memory usage, disk I/O patterns, and persistence granularity. Either revert to 10,000 items per segment to match the documented architecture, or update CLAUDE.md to reflect and justify the 50,000-item design decision.
🧹 Nitpick comments (15)
dash-spv/src/client/progress.rs (1)
41-43: Consider logging storage errors for observability.

The API change from `Result<Option<u32>>` to `Option<u32>` means storage errors are now silently ignored when retrieving the tip height. While acceptable for stats collection, consider logging storage failures at the debug or trace level to aid diagnostics.

🔎 Example with error logging

```diff
-if let Some(header_height) = storage.get_tip_height().await {
+match storage.get_tip_height().await {
+    Some(header_height) => {
         stats.header_height = header_height;
+    }
+    None => {
+        tracing::trace!("Could not retrieve tip height from storage for stats");
+    }
 }
```

dash-spv/src/client/core.rs (1)
192-198: Simplify the double `??` pattern.

The chained `??` operators on line 195 are harder to read than necessary. Consider flattening this logic:

🔎 Suggested refactor

```diff
 pub async fn tip_hash(&self) -> Option<dashcore::BlockHash> {
     let storage = self.storage.lock().await;
     let tip_height = storage.get_tip_height().await?;
-    let header = storage.get_header(tip_height).await.ok()??;
+    let header = storage.get_header(tip_height).await.ok().flatten()?;
     Some(header.block_hash())
 }
```

Alternatively, you could use `and_then` for clarity:

```diff
 pub async fn tip_hash(&self) -> Option<dashcore::BlockHash> {
     let storage = self.storage.lock().await;
-    let tip_height = storage.get_tip_height().await?;
-    let header = storage.get_header(tip_height).await.ok()??;
-
-    Some(header.block_hash())
+    storage
+        .get_tip_height()
+        .await
+        .and_then(|height| storage.get_header(height).await.ok().flatten())
+        .map(|header| header.block_hash())
 }
```

dash-spv/src/sync/transitions.rs (1)
180-180: Inconsistent error handling between `get_tip_height` and `get_filter_tip_height`.

`get_tip_height().await.unwrap_or(0)` silently defaults to 0 on storage errors, while `get_filter_tip_height()` (lines 412-416) still maps and propagates errors. This inconsistency could mask storage issues during header sync while surfacing them during filter sync.

Consider either:

- Using consistent error handling for both (propagate errors or use defaults)
- Adding a log/trace when defaulting to 0 to aid debugging

🔎 Suggestion to add trace logging

```diff
-        let start_height = storage.get_tip_height().await.unwrap_or(0);
+        let start_height = storage.get_tip_height().await.unwrap_or_else(|| {
+            tracing::trace!("No tip height in storage, starting from height 0");
+            0
+        });
```

dash-spv/src/storage/chainstate.rs (2)
74-97: Consider using serde derive for ChainState deserialization.

Manual JSON field extraction is verbose and error-prone. If `ChainState` derives `Serialize` and `Deserialize`, you can simplify this significantly and ensure field consistency between store and load.

🔎 Suggested simplification

If ChainState derives serde traits:

```diff
-    let value: serde_json::Value = serde_json::from_str(&content).map_err(|e| {
-        crate::error::StorageError::Serialization(format!("Failed to parse chain state: {}", e))
-    })?;
-
-    let state = ChainState {
-        last_chainlock_height: value
-            .get("last_chainlock_height")
-            .and_then(|v| v.as_u64())
-            .map(|h| h as u32),
-        // ... many more lines
-    };
+    let mut state: ChainState = serde_json::from_str(&content).map_err(|e| {
+        crate::error::StorageError::Serialization(format!("Failed to parse chain state: {}", e))
+    })?;
+    // Reset runtime-only field
+    state.masternode_engine = None;
```
43-61: Manual JSON construction could be replaced with serde serialization.

If `ChainState` derives `Serialize`, use `serde_json::to_string(&state)` for consistency and automatic handling of all fields.
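As a rough, self-contained sketch of that approach (the struct and its fields here are placeholders for illustration, not the crate's actual `ChainState`):

```rust
use serde::{Deserialize, Serialize};

// Placeholder stand-in for the real ChainState; fields are assumptions for
// illustration only. Runtime-only state (such as the masternode engine) could
// be marked #[serde(skip)] so it is never written to disk.
#[derive(Serialize, Deserialize, Default)]
struct ChainState {
    last_chainlock_height: Option<u32>,
}

// One call each way replaces the hand-built JSON object and the manual
// field-by-field extraction.
fn encode(state: &ChainState) -> Result<String, serde_json::Error> {
    serde_json::to_string(state)
}

fn decode(content: &str) -> Result<ChainState, serde_json::Error> {
    serde_json::from_str(content)
}
```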
dash-spv/src/storage/transactions.rs (1)

39-42: Naming may be misleading for non-persistent storage.

`PersistentTransactionStorage` doesn't actually persist data (as noted in line 57). Consider renaming to `InMemoryTransactionStorage` or adding a doc comment explaining the design rationale.

🔎 Suggested documentation

```diff
+/// In-memory transaction storage for mempool data.
+///
+/// Note: Despite the "Persistent" prefix (for trait conformance), this storage
+/// intentionally does not persist mempool data to disk, as mempool state is
+/// transient and rebuilds from network on restart.
 pub struct PersistentTransactionStorage {
     mempool_transactions: HashMap<Txid, UnconfirmedTransaction>,
     mempool_state: Option<MempoolState>,
 }
```

dash-spv/src/storage/blocks.rs (1)
117-127: `persist_dirty` does not update the hash-to-height index.

If the application crashes after calling `persist_dirty` but before a full `persist`, the index file may be stale on next startup. The fallback in `load()` (line 94) rebuilds the index from segments, so this is safe but potentially slow on restart.

Consider persisting the index in `persist_dirty` as well, or documenting this as intentional behavior.

dash-spv/src/sync/headers/manager.rs (4)
143-153: Consider combining tip retrieval into a single storage operation.

The current implementation makes two separate async calls to get the tip height and then the header at that height. If storage state changes between calls (e.g., in concurrent scenarios), this could lead to inconsistency. While unlikely in the current architecture, a combined `get_tip_header()` method would be more robust.
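A minimal sketch of what such a combined accessor could look like as a provided trait method — the trait shape, placeholder types, and error type below are assumptions for illustration, not the crate's actual API:

```rust
use async_trait::async_trait;

// Placeholder types standing in for the crate's real ones.
type BlockHeader = Vec<u8>;
type StorageResult<T> = Result<T, String>;

#[async_trait]
pub trait BlockHeaderStorage: Send + Sync {
    async fn get_tip_height(&self) -> Option<u32>;
    async fn get_header(&self, height: u32) -> StorageResult<Option<BlockHeader>>;

    /// Combined accessor: callers never observe a tip height without the
    /// matching header, and make one trait call instead of two.
    async fn get_tip_header(&self) -> StorageResult<Option<(u32, BlockHeader)>> {
        match self.get_tip_height().await {
            Some(height) => Ok(self.get_header(height).await?.map(|h| (height, h))),
            None => Ok(None),
        }
    }
}
```

A concrete implementation that holds its own lock could override the default so both lookups happen under a single guard, which is what actually closes the race the comment describes.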
496-505: Checkpoint header is fetched unconditionally but only used in some branches.

The `checkpoint_header` is retrieved at the start of the method but is only used when `effective_tip_height` is `None` with checkpoint sync, or when at checkpoint height. This adds unnecessary I/O overhead for normal non-checkpoint sync paths.

🔎 Proposed refactor: fetch checkpoint header lazily

Move the checkpoint header retrieval inside the branches that actually need it, or use a lazy evaluation pattern to avoid the upfront I/O cost when not needed.
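A small sketch of the shape this could take; every name below (`get_header`, `starting_header`, `checkpoint_height`, the `Header` alias) is a placeholder for illustration, not the code of the actual method:

```rust
// Placeholder types and a stub loader for this sketch only.
type Header = Vec<u8>;
type SyncResult<T> = Result<T, String>;

async fn get_header(height: u32) -> SyncResult<Option<Header>> {
    // Stand-in for storage.get_header(height).await
    Ok(Some(vec![height as u8]))
}

/// Resolve the header to start from, reading the checkpoint header only in
/// the branch that actually needs it.
async fn starting_header(
    effective_tip_height: Option<u32>,
    checkpoint_height: u32,
) -> SyncResult<Option<Header>> {
    match effective_tip_height {
        // Normal path: continue from the existing tip; no checkpoint I/O.
        Some(tip_height) => get_header(tip_height).await,
        // Checkpoint path: only here is the checkpoint header loaded.
        None => get_header(checkpoint_height).await,
    }
}
```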
523-541: Redundant `get_header(0)` calls.

`storage.get_header(0)` is called at line 523 and again at lines 530-535 for the same purpose. The second call checks if the header exists to decide whether to store it, but the first call already retrieved it.

🔎 Proposed fix

```diff
-        if let Some(genesis_header) = storage.get_header(0).await.map_err(|e| {
-            SyncError::Storage(format!(
-                "Error trying to get genesis block from storage: {}",
-                e
-            ))
-        })? {
-            // Store genesis in storage if not already there
-            if storage
-                .get_header(0)
-                .await
-                .map_err(|e| {
-                    SyncError::Storage(format!("Failed to check genesis: {}", e))
-                })?
-                .is_none()
-            {
-                tracing::info!("Storing genesis block in storage");
-                storage.store_headers(&[genesis_header]).await.map_err(|e| {
-                    SyncError::Storage(format!("Failed to store genesis: {}", e))
-                })?;
-            }
+        let existing_genesis = storage.get_header(0).await.map_err(|e| {
+            SyncError::Storage(format!(
+                "Error trying to get genesis block from storage: {}",
+                e
+            ))
+        })?;
+
+        if let Some(genesis_header) = existing_genesis {
             let genesis_hash = genesis_header.block_hash();
             tracing::info!("Starting from genesis block: {}", genesis_hash);
             Some(genesis_hash)
```
655-679: Checkpoint header retrieved unconditionally; consider lazy loading.

Similar to `prepare_sync`, the checkpoint header is fetched at the start of the timeout handling path but is only used when `current_tip_height` is `None` and syncing from a checkpoint. For most timeout recovery scenarios (when we have a tip), this I/O is unnecessary.

dash-spv/src/storage/mod.rs (4)
45-57: Consider adding documentation clarifying thread-safety expectations.

Based on learnings, the `StorageManager` trait pattern with `&mut self` methods combined with `Send + Sync` bounds can be confusing. The implementations use interior mutability (`Arc<RwLock<_>>`), so explicit documentation would help clarify the thread-safety model and API design rationale.

🔎 Suggested documentation

```diff
 #[async_trait]
+/// Composite trait for SPV storage operations.
+///
+/// Implementations use interior mutability (e.g., `Arc<RwLock<_>>`) for thread-safe
+/// concurrent access while maintaining the `Send + Sync` bounds required for async contexts.
 pub trait StorageManager: blocks::BlockHeaderStorage + filters::FilterHeaderStorage
```
78-87: Synchronous I/O in async context.

`std::fs::create_dir_all` at line 84 performs blocking I/O. In an async context, this could briefly block the executor. Consider using `tokio::fs::create_dir_all` for consistency with the rest of the async operations.

🔎 Proposed fix

```diff
 pub async fn new(storage_path: impl Into<PathBuf> + Send) -> StorageResult<Self> {
-    use std::fs;
-
     let storage_path = storage_path.into();

     // Create directories if they don't exist
-    fs::create_dir_all(&storage_path)?;
+    tokio::fs::create_dir_all(&storage_path).await?;
```
156-161: `stop_worker` does not clear `worker_handle` after aborting.

After calling `abort()`, the `worker_handle` field remains `Some(...)` with an aborted handle. This could cause confusion if `stop_worker` is called multiple times or if code checks `worker_handle.is_some()`. The `shutdown` method correctly uses `take()`.

🔎 Proposed fix

```diff
 /// Stop the background worker without forcing a save.
-pub(super) fn stop_worker(&self) {
-    if let Some(handle) = &self.worker_handle {
+pub(super) fn stop_worker(&mut self) {
+    if let Some(handle) = self.worker_handle.take() {
         handle.abort();
     }
 }
```
218-227: Consider logging or propagating errors from `persist()`.

The `persist()` method silently ignores all errors. During shutdown, failure to persist data could result in data loss. Consider logging warnings or returning a result to allow callers to handle failures.

🔎 Proposed fix: log persist errors

```diff
 async fn persist(&self) {
     let storage_path = &self.storage_path;
-    let _ = self.block_headers.write().await.persist(storage_path).await;
-    let _ = self.filter_headers.write().await.persist(storage_path).await;
-    let _ = self.filters.write().await.persist(storage_path).await;
-    let _ = self.transactions.write().await.persist(storage_path).await;
-    let _ = self.metadata.write().await.persist(storage_path).await;
-    let _ = self.chainstate.write().await.persist(storage_path).await;
+    if let Err(e) = self.block_headers.write().await.persist(storage_path).await {
+        tracing::error!("Failed to persist block headers on shutdown: {}", e);
+    }
+    if let Err(e) = self.filter_headers.write().await.persist(storage_path).await {
+        tracing::error!("Failed to persist filter headers on shutdown: {}", e);
+    }
+    if let Err(e) = self.filters.write().await.persist(storage_path).await {
+        tracing::error!("Failed to persist filters on shutdown: {}", e);
+    }
+    if let Err(e) = self.transactions.write().await.persist(storage_path).await {
+        tracing::error!("Failed to persist transactions on shutdown: {}", e);
+    }
+    if let Err(e) = self.metadata.write().await.persist(storage_path).await {
+        tracing::error!("Failed to persist metadata on shutdown: {}", e);
+    }
+    if let Err(e) = self.chainstate.write().await.persist(storage_path).await {
+        tracing::error!("Failed to persist chainstate on shutdown: {}", e);
+    }
 }
```
The idea of this PR is to split responsibilities when it comes to the storage system. Instead of a single large trait handling all storage operations, we would have multiple traits, each one specialized in a specific system (BlockHeaders, Filters, etc.).
The reason for this change is the need to refactor how peer reputation is stored, along with other systems that we may eventually need. This approach makes the code more scalable and testable, following good design patterns.
To make this change as smooth as possible, I preserved the `StorageManager` trait, which now combines all of the new specialized traits as supertraits; this way `DiskStorageManager` can still be used as a facade for all the specialized storage structs.
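A minimal, self-contained sketch of this layout — the method signatures, placeholder types, and blanket impl below are illustrative assumptions rather than the PR's exact code (the real subtraits live in the storage submodules, e.g. `blocks::BlockHeaderStorage` and `filters::FilterHeaderStorage`):

```rust
use async_trait::async_trait;

// Placeholder types standing in for the crate's real ones.
type BlockHeader = Vec<u8>;
type StorageResult<T> = Result<T, String>;

/// One specialized trait per storage concern.
#[async_trait]
pub trait BlockHeaderStorage: Send + Sync {
    async fn store_headers(&mut self, headers: &[BlockHeader]) -> StorageResult<()>;
    async fn get_header(&self, height: u32) -> StorageResult<Option<BlockHeader>>;
    async fn get_tip_height(&self) -> Option<u32>;
}

#[async_trait]
pub trait FilterHeaderStorage: Send + Sync {
    async fn get_filter_tip_height(&self) -> StorageResult<Option<u32>>;
}

/// Composite trait kept as a facade so existing call sites keep compiling.
pub trait StorageManager: BlockHeaderStorage + FilterHeaderStorage {}

/// One possible wiring: anything implementing every subtrait is automatically
/// a full StorageManager (the PR may instead implement it explicitly).
impl<T: BlockHeaderStorage + FilterHeaderStorage> StorageManager for T {}
```

With a shape like this, a component that only needs headers can take a bound on `BlockHeaderStorage` alone, which is what makes each piece easier to mock and test in isolation.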
I also removed some redundant documentation. If you find any change that you think needs documentation or comments because it is not clear from reading the code, feel free to request changes.
This PR is built on top of:
#278
#292
Summary by CodeRabbit
New Features
Bug Fixes
Refactor
`clear_filters()` public method from client API.