Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ reth-rpc-eth-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1"
reth-optimism-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-rpc-convert = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-optimism-rpc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-optimism-payload-builder = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-optimism-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-optimism-chainspec = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-provider = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
Expand All @@ -69,6 +70,7 @@ reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-db = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-testing-utils = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-db-common = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }
reth-rpc-server-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.1" }

# revm
revm = { version = "31.0.0", default-features = false }
Expand Down Expand Up @@ -96,6 +98,8 @@ op-alloy-rpc-types-engine = { version = "0.22.0", default-features = false }
op-alloy-rpc-jsonrpsee = { version = "0.22.0", default-features = false }
op-alloy-network = { version = "0.22.0", default-features = false }
op-alloy-consensus = { version = "0.22.0", default-features = false }
op-alloy-flz = { version = "0.13.1", default-features = false }
rdkafka = { version = "0.37.0", default-features = false, features = ["tokio", "ssl-vendored", "libz-static"] }

# rollup-boost
rollup-boost = { git = "https://github.com/flashbots/rollup-boost", tag = "rollup-boost/v0.7.5" }
Expand Down Expand Up @@ -132,3 +136,6 @@ brotli = "8.0.1"
arc-swap = "1.7.1"
once_cell = "1.19"
rand = "0.9.2"
tracing-subscriber = "0.3.18"
parking_lot = "0.12.3"
indexmap = "2.7.0"
17 changes: 13 additions & 4 deletions crates/metering/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ reth-primitives-traits.workspace = true
reth-evm.workspace = true
reth-optimism-evm.workspace = true
reth-optimism-chainspec.workspace = true
reth-optimism-payload-builder.workspace = true
reth-optimism-primitives.workspace = true
reth-transaction-pool.workspace = true
reth-optimism-cli.workspace = true # Enables serde & codec traits for OpReceipt/OpTxEnvelope
Expand All @@ -34,6 +35,7 @@ alloy-eips.workspace = true

# op-alloy
op-alloy-consensus.workspace = true
op-alloy-flz.workspace = true

# revm
revm.workspace = true
Expand All @@ -45,6 +47,13 @@ jsonrpsee.workspace = true
tracing.workspace = true
serde.workspace = true
eyre.workspace = true
indexmap.workspace = true
parking_lot.workspace = true
tokio.workspace = true
rdkafka.workspace = true
serde_json.workspace = true
metrics.workspace = true
chrono.workspace = true

[dev-dependencies]
alloy-genesis.workspace = true
Expand All @@ -54,10 +63,10 @@ reth-db = { workspace = true, features = ["test-utils"] }
reth-db-common.workspace = true
reth-e2e-test-utils.workspace = true
reth-optimism-node.workspace = true
reth-optimism-payload-builder.workspace = true
reth-optimism-rpc.workspace = true
reth-rpc-server-types.workspace = true
reth-testing-utils.workspace = true
reth-tracing.workspace = true
reth-transaction-pool = { workspace = true, features = ["test-utils"] }
serde_json.workspace = true
tokio.workspace = true


tracing-subscriber.workspace = true
37 changes: 37 additions & 0 deletions crates/metering/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,40 @@ Note: While some fields like `revertingTxHashes` are part of the TIPS Bundle for
- Stops on first failure
- Automatically registered in `base` namespace

## Upcoming Features

- **In-memory metering cache:** Maintains per-flashblock resource snapshots (gas, DA bytes,
execution time, state-root time) for the latest 12 blocks to support pricing decisions.
- **Stream ingestion:** Background tasks will hydrate the cache by consuming the TIPS Kafka
feed (for timing metrics) and the flashblocks websocket stream (for inclusion order).
- **Priority-fee estimator:** Aggregates cached data in ascending priority-fee order to
project the fee a bundle must pay to satisfy each resource constraint, including
percentile-based recommendations. (Core logic implemented; awaiting live data feed.)
- **`base_meteredPriorityFeePerGas` RPC:** Accepts a TIPS Bundle, meters it locally, and
responds with per-resource fee suggestions for each flashblock index plus aggregated
min/max guidance for next-block and next-flashblock inclusion. Currently returns an error
until the ingestion tasks populate the metering cache.

### Observability Contract

- **Gauges**: `metering.kafka.lag_ms`, `metering.cache.latest_block`, `metering.cache.window_depth`,
  and `metering.pending.size` track data freshness, cache depth, and the pending-annotation backlog.
- **Counters**: `metering.kafka.messages_total`, `metering.kafka.errors_total`,
  `metering.kafka.messages_skipped`, `metering.kafka.tx_events_total`,
  `metering.streams.flashblocks_total`, `metering.streams.tx_misses_total`,
  `metering.streams.tx_matched_total`, `metering.pending.evicted` capture ingestion health.
- Detailed per-transaction diagnostics (missing flashblock index, snapshot drops) are emitted as
structured tracing events instead of high-cardinality metrics.

## Testing & Observability Plan

- **Unit coverage:** Exercise cache eviction, transaction ordering, and estimator threshold
logic with synthetic datasets (see `cache.rs` and `estimator.rs` tests). Extend with Kafka
parsing tests once the ingest message schema is integrated.
- **Integration harness:** Feed mocked Kafka + websocket streams into the ingest pipeline to
validate cache hydration and end-to-end RPC responses. Leverage existing async test
utilities in the workspace for deterministic sequencing.
- **Property-style checks:** Generate random transaction fee/usage distributions to ensure the
estimator produces monotonic thresholds and sensible percentiles across resource types.
- **Metrics & tracing:** Emit gauges for cache freshness (latest block/index), Kafka lag,
websocket heartbeat, and estimator latency. Reuse the existing `tracing` instrumentation
pattern in the repo so operators can alert on stale data paths.
135 changes: 135 additions & 0 deletions crates/metering/src/annotator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use crate::{MeteredTransaction, MeteringCache};
use alloy_primitives::TxHash;
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::{debug, info, warn};

/// Message received from the flashblocks websocket feed indicating which
/// transactions were included in a specific flashblock.
#[derive(Debug)]
pub struct FlashblockInclusion {
    /// L2 block number this flashblock belongs to.
    pub block_number: u64,
    /// Index of this flashblock within the block.
    pub flashblock_index: u64,
    /// Tx hashes included in this flashblock.
    /// NOTE(review): the name implies these are in inclusion order — the
    /// annotator preserves the order it receives; confirm against the feed.
    pub ordered_tx_hashes: Vec<TxHash>,
}

/// Maximum number of pending transactions before oldest entries are evicted.
/// Bounds memory use when Kafka metering events arrive for transactions whose
/// flashblock inclusion is never observed (evicted FIFO in `handle_tx_event`).
const MAX_PENDING_TRANSACTIONS: usize = 10_000;

/// Annotates flashblock transactions with their resource usage.
///
/// The flow is:
/// 1. Kafka sends `MeteredTransaction` with resource usage data keyed by tx hash
/// 2. These are stored in a pending lookup table
/// 3. Websocket sends `FlashblockInclusion` with actual (block, flashblock) location
/// 4. We look up pending transactions and insert them into the cache at the real location
pub struct ResourceAnnotator {
    /// Shared metering cache that receives confirmed (block, flashblock) entries.
    cache: Arc<RwLock<MeteringCache>>,
    /// Kafka-fed stream of per-transaction resource usage (step 1 above).
    tx_updates_rx: UnboundedReceiver<MeteredTransaction>,
    /// Websocket-fed stream of flashblock inclusion events (step 3 above).
    flashblock_rx: UnboundedReceiver<FlashblockInclusion>,
    /// Pending metering data awaiting flashblock inclusion confirmation.
    /// Uses IndexMap to maintain insertion order for FIFO eviction.
    pending_transactions: indexmap::IndexMap<TxHash, MeteredTransaction>,
}

impl ResourceAnnotator {
pub fn new(
cache: Arc<RwLock<MeteringCache>>,
tx_updates_rx: UnboundedReceiver<MeteredTransaction>,
flashblock_rx: UnboundedReceiver<FlashblockInclusion>,
) -> Self {
Self {
cache,
tx_updates_rx,
flashblock_rx,
pending_transactions: indexmap::IndexMap::new(),
}
}

pub async fn run(mut self) {
info!(target: "metering::annotator", "Starting ResourceAnnotator");
loop {
tokio::select! {
Some(tx_event) = self.tx_updates_rx.recv() => {
self.handle_tx_event(tx_event);
}
Some(flashblock_event) = self.flashblock_rx.recv() => {
self.handle_flashblock_event(flashblock_event);
}
else => {
info!(target: "metering::annotator", "ResourceAnnotator terminating");
break;
}
}
}
}

fn handle_tx_event(&mut self, tx: MeteredTransaction) {
debug!(
tx_hash = %tx.tx_hash,
gas_used = tx.gas_used,
"Storing metered transaction in pending map"
);
self.pending_transactions.insert(tx.tx_hash, tx);

// Evict oldest entries if we exceed the limit.
while self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS {
if let Some((evicted_hash, _)) = self.pending_transactions.shift_remove_index(0) {
info!(
tx_hash = %evicted_hash,
"Evicting old transaction from pending map (limit exceeded)"
);
metrics::counter!("metering.pending.evicted").increment(1);
}
}

metrics::gauge!("metering.pending.size").set(self.pending_transactions.len() as f64);
metrics::counter!("metering.kafka.tx_events_total").increment(1);
}

fn handle_flashblock_event(&mut self, event: FlashblockInclusion) {
metrics::counter!("metering.streams.flashblocks_total").increment(1);

let mut matched = 0usize;
let mut missed = 0usize;

{
let mut cache = self.cache.write();
for tx_hash in &event.ordered_tx_hashes {
if let Some(tx) = self.pending_transactions.shift_remove(tx_hash) {
cache.upsert_transaction(event.block_number, event.flashblock_index, tx);
matched += 1;
} else {
missed += 1;
}
}
}

if matched > 0 {
debug!(
block_number = event.block_number,
flashblock_index = event.flashblock_index,
matched,
"Inserted transactions into cache from flashblock"
);
}

// All transactions should come through as bundles. Any misses indicate
// the Kafka event hasn't arrived yet or was lost.
if missed > 0 {
warn!(
block_number = event.block_number,
flashblock_index = event.flashblock_index,
matched,
missed,
"Flashblock contained transactions not found in pending map"
);
metrics::counter!("metering.streams.tx_misses_total").increment(missed as u64);
}

metrics::gauge!("metering.pending.size").set(self.pending_transactions.len() as f64);
metrics::counter!("metering.streams.tx_matched_total").increment(matched as u64);
}
}
Loading