diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index ab0ad24..f107999 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,14 +1,13 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; -use init4_bin_base::perms::SharedToken; -use reqwest::{Client, Url}; -use signet_tx_cache::types::{TxCacheBundle, TxCacheBundlesResponse}; +use init4_bin_base::perms::tx_cache::BuilderTxCache; +use signet_tx_cache::{TxCacheError, types::TxCacheBundle}; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, debug, error, trace, trace_span}; +use tracing::{Instrument, error, trace, trace_span}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -18,10 +17,10 @@ const POLL_INTERVAL_MS: u64 = 1000; pub struct BundlePoller { /// The builder configuration values. config: &'static BuilderConfig, - /// Authentication module that periodically fetches and stores auth tokens. - token: SharedToken, - /// Holds a Reqwest client - client: Client, + + /// Client for the tx cache. + tx_cache: BuilderTxCache, + /// Defines the interval at which the bundler polls the tx-pool for bundles. poll_interval_ms: u64, } @@ -42,26 +41,9 @@ impl BundlePoller { /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { let config = crate::config(); - let token = config.oauth_token(); - Self { config, token, client: Client::new(), poll_interval_ms } - } - - /// Fetches bundles from the transaction cache and returns them. - pub async fn check_bundle_cache(&mut self) -> eyre::Result> { - let bundle_url: Url = self.config.tx_pool_url.join("bundles")?; - let token = - self.token.secret().await.map_err(|e| eyre::eyre!("Failed to read token: {e}"))?; - - self.client - .get(bundle_url) - .bearer_auth(token) - .send() - .await? - .error_for_status()? - .json() - .await - .map(|resp: TxCacheBundlesResponse| resp.bundles) - .map_err(Into::into) + let cache = signet_tx_cache::TxCache::new(config.tx_pool_url.clone()); + let tx_cache = BuilderTxCache::new(cache, config.oauth_token()); + Self { config, tx_cache, poll_interval_ms } } /// Returns the poll duration as a [`Duration`]. @@ -69,7 +51,27 @@ impl BundlePoller { Duration::from_millis(self.poll_interval_ms) } - async fn task_future(mut self, outbound: UnboundedSender) { + /// Checks the bundle cache for new bundles. + pub async fn check_bundle_cache(&self) -> Result, TxCacheError> { + let res = self.tx_cache.get_bundles().await; + + match res { + Ok(bundles) => { + trace!(count = ?bundles.len(), "found bundles"); + Ok(bundles) + } + Err(TxCacheError::NotOurSlot) => { + trace!("Not our slot to fetch bundles"); + Err(TxCacheError::NotOurSlot) + } + Err(err) => { + error!(?err, "Failed to fetch bundles from tx-cache"); + Err(err) + } + } + } + + async fn task_future(self, outbound: UnboundedSender) { loop { let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); @@ -85,17 +87,10 @@ impl BundlePoller { // exit the span after the check. drop(_guard); - if let Ok(bundles) = self - .check_bundle_cache() - .instrument(span.clone()) - .await - .inspect_err(|err| debug!(%err, "Error fetching bundles")) - { - let _guard = span.entered(); - trace!(count = ?bundles.len(), "found bundles"); + if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await { for bundle in bundles.into_iter() { if let Err(err) = outbound.send(bundle) { - error!(err = ?err, "Failed to send bundle - channel is dropped"); + span_debug!(span, ?err, "Failed to send bundle - channel is dropped"); break; } } diff --git a/src/tasks/env.rs b/src/tasks/env.rs index 2d65397..7859794 100644 --- a/src/tasks/env.rs +++ b/src/tasks/env.rs @@ -287,10 +287,10 @@ impl EnvTask { .expect("valid timestamp"); let sim_slot = self.config.slot_calculator.current_slot().expect("chain has started"); - // Create a `SimEnv` span + // Create a `BlockConstruction` span let span = info_span!( parent: None, - "SimEnv", + "BlockConstruction", confirmed.host.number = host_block_number, confirmed.host.hash = tracing::field::Empty, confirmed.ru.number = rollup_block_number, @@ -321,7 +321,7 @@ impl EnvTask { Err(QuinceyError::NotOurSlot) => { span_debug!( span, - "not our slot according to quincey - skipping block submission" + "not our slot according to quincey - skipping block construction" ); continue; } @@ -329,7 +329,7 @@ impl EnvTask { span_error!( span, %err, - "error during quincey preflight check - skipping block submission" + "error during quincey preflight check - skipping block construction" ); continue; } @@ -339,13 +339,13 @@ impl EnvTask { let host_block_opt = res_unwrap_or_continue!( host_block_res, span, - error!("error fetching previous host block - skipping block submission") + error!("error fetching previous host block - skipping block construction") ); let host_header = opt_unwrap_or_continue!( host_block_opt, span, - warn!("previous host block not found - skipping block submission") + warn!("previous host block not found - skipping block construction") ) .header .inner; diff --git a/tests/bundle_poller_test.rs b/tests/bundle_poller_test.rs index c49ac8f..440e057 100644 --- a/tests/bundle_poller_test.rs +++ b/tests/bundle_poller_test.rs @@ -7,7 +7,7 @@ async fn test_bundle_poller_roundtrip() -> Result<()> { setup_logging(); setup_test_config(); - let mut bundle_poller = builder::tasks::cache::BundlePoller::new(); + let bundle_poller = builder::tasks::cache::BundlePoller::new(); let _ = bundle_poller.check_bundle_cache().await?;