Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions apps/sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ alloy-primitives = { workspace = true }
alloy-u256-literal = { workspace = true }

anyhow = { workspace = true }
bridgetree = { git = "https://github.com/zcash/incrementalmerkletree", package = "bridgetree" }
bytes = { workspace = true }
chrono = { workspace = true }
console-subscriber = { workspace = true }
eyre = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
incrementalmerkletree = { git = "https://github.com/zcash/incrementalmerkletree", package = "incrementalmerkletree" }
once_cell = { workspace = true }
paste = { workspace = true }
pprof = { version = "0.11", features = ["flamegraph"] }
Expand Down
114 changes: 104 additions & 10 deletions apps/sequencer/src/providers/eth_send_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ use actix_web::{rt::time::interval, web::Data};
use alloy::{
hex,
network::TransactionBuilder,
primitives::{Address, Bytes, U256},
primitives::{Address, Bytes},
providers::{Provider, ProviderBuilder},
rpc::types::{eth::TransactionRequest, TransactionReceipt},
};
use alloy_primitives::{FixedBytes, TxHash};

use alloy_primitives::{keccak256, FixedBytes, TxHash, U256};
use blocksense_config::{FeedStrideAndDecimals, GNOSIS_SAFE_CONTRACT_NAME};
use blocksense_data_feeds::feeds_processing::{BatchedAggregatesToSend, VotedFeedUpdate};
use blocksense_registry::config::FeedConfig;
Expand All @@ -20,7 +21,7 @@ use tokio::{

use crate::{
providers::provider::{
parse_eth_address, ProviderStatus, ProviderType, ProvidersMetrics, RpcProvider,
parse_eth_address, HashValue, ProviderStatus, ProviderType, ProvidersMetrics, RpcProvider,
SharedRpcProviders,
},
sequencer_state::SequencerState,
Expand Down Expand Up @@ -396,7 +397,37 @@ pub async fn eth_batch_send_to_contract(
let provider_metrics = &provider.provider_metrics;
let rpc_handle = &provider.provider;

let input = Bytes::from(serialized_updates);
let latest_call_data_hash = keccak256(&serialized_updates);

let mut next_calldata_merkle_tree = provider.calldata_merkle_tree_frontier.clone();
next_calldata_merkle_tree.append(HashValue(latest_call_data_hash));

let prev_calldata_merkle_tree_root = match &provider.merkle_root_in_contract {
Some(stored_hash) => stored_hash.clone(),
None => provider.calldata_merkle_tree_frontier.root(),
};
let next_calldata_merkle_tree_root = next_calldata_merkle_tree.root();

// Merkle tree over all call data management for ADFS contracts
let prev_root_bytes = prev_calldata_merkle_tree_root.0.as_slice();
let next_root_bytes = next_calldata_merkle_tree_root.0.as_slice();

// Compute (with just one allocation) the new input bytes
// from the serialized updates with roots
let mut serialized_updates_with_roots = Vec::with_capacity(
1 + prev_root_bytes.len() + next_root_bytes.len() + serialized_updates.len(),
);

// - Command index (`1` is for writing)
serialized_updates_with_roots.push(1);
// - Previous merkle root bytes
serialized_updates_with_roots.extend_from_slice(prev_root_bytes);
// - Next merkle root bytes
serialized_updates_with_roots.extend_from_slice(next_root_bytes);
// - Original serialized updates
serialized_updates_with_roots.extend_from_slice(&serialized_updates);

let input = Bytes::from(serialized_updates_with_roots);

let receipt;
let tx_time = Instant::now();
Expand Down Expand Up @@ -465,7 +496,7 @@ pub async fn eth_batch_send_to_contract(
return Ok(("timeout".to_string(), feeds_to_update_ids));
}

match get_nonce(
let latest_nonce = match get_nonce(
&net,
rpc_handle,
&sender_address,
Expand All @@ -489,8 +520,23 @@ pub async fn eth_batch_send_to_contract(
"Detected previously submitted transaction included on-chain: {included_tx_hash:?} in network `{net}` block height {block_height}"
);
}

// TODO: maybe move into an else clause of the `if` above,
// i.e. only do it when there were no included transactions found
try_to_sync(
net.as_str(),
&mut provider,
&contract_address,
block_height,
&next_calldata_merkle_tree_root,
latest_nonce,
nonce,
)
.await;

return Ok(("true".to_string(), feeds_to_update_ids));
}
latest_nonce
}
Err(err) => {
warn!("{err}");
Expand Down Expand Up @@ -643,6 +689,16 @@ pub async fn eth_batch_send_to_contract(
Err(err) => {
warn!("Error while submitting transaction in network `{net}` block height {block_height} and address {sender_address} due to {err}");
if err.to_string().contains("execution revert") {
try_to_sync(
net.as_str(),
&mut provider,
&contract_address,
block_height,
&next_calldata_merkle_tree_root,
latest_nonce,
nonce,
)
.await;
return Ok(("false".to_string(), feeds_to_update_ids));
} else {
inc_retries_with_backoff(
Expand Down Expand Up @@ -722,10 +778,18 @@ pub async fn eth_batch_send_to_contract(
log_gas_used(&net, &receipt, transaction_time, provider_metrics).await;

provider.update_history(&updates.updates);
let result = receipt.status().to_string();
if result == "true" {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Make an enum that contains the tx results from alloy and adds a timeout status.

//Transaction was successfully confirmed therefore we update the latest state hash
let root = next_calldata_merkle_tree.root();
provider.calldata_merkle_tree_frontier = next_calldata_merkle_tree;
provider.merkle_root_in_contract = None;
debug!("Successfully updated contract in network `{net}` block height {block_height} Merkle root {root:?}");
} // TODO: Reread round counters + latest state hash from contract
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment not relevant to this code path. TODO: Remove.

drop(provider);
debug!("Released a read/write lock on provider state in network `{net}` block height {block_height}");

Ok((receipt.status().to_string(), feeds_to_update_ids))
Ok((result, feeds_to_update_ids))
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -824,6 +888,36 @@ pub async fn get_gas_limit(
}
}

async fn try_to_sync(
net: &str,
provider: &mut RpcProvider,
contract_address: &Address,
block_height: u64,
next_calldata_merkle_tree_root: &HashValue,
latest_nonce: u64,
previous_nonce: u64,
) {
let rpc_handle = &provider.provider;
match rpc_handle
.get_storage_at(*contract_address, U256::from(0))
.await
{
Ok(root) => {
if root != next_calldata_merkle_tree_root.0.into() {
// If the nonce in the contract increased and the next state root hash is not as we expect,
// another sequencer was able to post updates for the current block height before this one.
// We need to take this into account and reread the round counters of the feeds.
info!("Updates to contract already posted, network {net}, block_height {block_height}, latest_nonce {latest_nonce}, previous_nonce {previous_nonce}, merkle_root in contract {root}");
provider.merkle_root_in_contract = Some(HashValue(root.into()));
// TODO: Read round counters from contract
}
}
Err(e) => {
warn!("Failed to read root from network {net} with contract address {contract_address} : {e}");
}
}
}

pub async fn get_nonce(
net: &str,
rpc_handle: &ProviderType,
Expand Down Expand Up @@ -1594,7 +1688,7 @@ mod tests {
.await
.expect("Could not serialize updates!");

assert_eq!(serialized_updates.to_bytes().encode_hex(), "01000000000000000000000002000303e00701026869000401ffe00801036279650101000000000000000000000000000000000000000000000000000000000000000701ff0000000000000000000000000000000000000000000000000000000000000008");
assert_eq!(serialized_updates.to_bytes().encode_hex(), "00000002000303e00701026869000401ffe00801036279650101000000000000000000000000000000000000000000000000000000000000000701ff0000000000000000000000000000000000000000000000000000000000000008");
}

use blocksense_feed_registry::types::FeedType;
Expand Down Expand Up @@ -1658,7 +1752,7 @@ mod tests {
// Note: bye is filtered out:
assert_eq!(
serialized_updates.to_bytes().encode_hex(),
"01000000000000000000000001000303e0070102686901010000000000000000000000000000000000000000000000000000000000000007"
"00000001000303e0070102686901010000000000000000000000000000000000000000000000000000000000000007"
);

// Berachain
Expand Down Expand Up @@ -1689,7 +1783,7 @@ mod tests {

assert_eq!(
serialized_updates.to_bytes().encode_hex(),
"01000000000000000000000001000303e0070102686901010000000000000000000000000000000000000000000000000000000000000007"
"00000001000303e0070102686901010000000000000000000000000000000000000000000000000000000000000007"
);

// Manta
Expand Down Expand Up @@ -1719,7 +1813,7 @@ mod tests {

assert_eq!(
serialized_updates.to_bytes().encode_hex(),
"01000000000000000000000001000303e0070102686901010000000000000000000000000000000000000000000000000000000000000007"
"00000001000303e0070102686901010000000000000000000000000000000000000000000000000000000000000007"
);
}

Expand Down
43 changes: 36 additions & 7 deletions apps/sequencer/src/providers/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ use alloy::{
},
signers::local::PrivateKeySigner,
};
use alloy_primitives::U256;
use alloy_primitives::{keccak256, B256, U256};
use alloy_u256_literal::u256;
use blocksense_feeds_processing::adfs_gen_calldata::{
calc_row_index, RoundBufferIndices, MAX_HISTORY_ELEMENTS_PER_FEED,
NUM_FEED_IDS_IN_RB_INDEX_RECORD,
};
use blocksense_utils::{EncodedFeedId, FeedId};
use futures::future::join_all;
use incrementalmerkletree::{frontier::Frontier, Hashable, Level};
use reqwest::Url; // TODO @ymadzhunkov include URL directly from url crate

use blocksense_config::{
Expand Down Expand Up @@ -86,7 +87,25 @@ impl Contract {
}
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct HashValue(pub B256);

impl Hashable for HashValue {
fn combine(_level: Level, a: &Self, b: &Self) -> Self {
HashValue(keccak256([a.0.to_vec(), b.0.to_vec()].concat()))
Copy link
Contributor Author

@HristoStaykov HristoStaykov Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reo101: TODO: refactor to not make heap allocations.

}
fn empty_root(_level: Level) -> Self {
HashValue(B256::ZERO)
}

fn empty_leaf() -> Self {
HashValue(B256::ZERO)
}
}

pub struct RpcProvider {
pub calldata_merkle_tree_frontier: Frontier<HashValue, 32>,
pub merkle_root_in_contract: Option<HashValue>,
pub network: String,
pub provider: ProviderType,
pub signer: PrivateKeySigner,
Expand Down Expand Up @@ -233,12 +252,12 @@ async fn load_data_from_chain(
}
}

async fn log_if_contract_exists(rpc_provider: Arc<Mutex<RpcProvider>>, contract_name: String) {
rpc_provider
async fn log_if_contract_exists(provider_mutex: Arc<Mutex<RpcProvider>>, contract_name: String) {
provider_mutex
.lock()
.await
.log_if_contract_exists(&contract_name)
.await
.log_if_contract_exists_and_get_latest_root(contract_name.as_str())
.await;
}

#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -317,6 +336,8 @@ impl RpcProvider {
}
}
RpcProvider {
calldata_merkle_tree_frontier: Frontier::<HashValue, 32>::empty(),
merkle_root_in_contract: None,
network: network.to_string(),
provider,
signer: signer.clone(),
Expand Down Expand Up @@ -764,7 +785,7 @@ impl RpcProvider {
self.rpc_url.clone()
}

pub async fn log_if_contract_exists(&self, contract_name: &str) {
pub async fn log_if_contract_exists_and_get_latest_root(&mut self, contract_name: &str) {
let address = self.get_contract_address(contract_name).ok();
let network = self.network.as_str();
if let Some(addr) = address {
Expand All @@ -783,6 +804,14 @@ impl RpcProvider {
let same_byte_code = expected_byte_code.eq(&a);
if same_byte_code {
info!("Contract {contract_name} exists in network {network} on {addr} matching byte code!");
match self.provider.get_storage_at(addr, U256::from(0)).await {
Ok(root) => {
self.merkle_root_in_contract = Some(HashValue(root.into()));
}
Err(e) => {
warn!("Failed to read root from network {network} with contract address {addr} : {e}");
}
};
} else {
warn!("Contract {contract_name} exists in network {network} on {addr} but bytecode differs! Found {byte_code:?} expected {expected_byte_code:?}");
}
Expand Down Expand Up @@ -1362,7 +1391,7 @@ mod tests {
{
use alloy::hex::FromHex;
// Provided calldata to pre-populate counters/values
let calldata_hex = "0x01000000000000000100000001000303e3970120000000000000000000000000000015d41642f71aa02900000000003647f7d78101010000000000000000000000000000000000000000000000000000000000002397";
let calldata_hex = "0x010000000000000000000000000000000000000000000000000000000000000000908e112c5ba35c399906f7e029ec927d3390b62188d7c51208f3d30e1283da4e00000001000303e3970120000000000000000000000000000015d41642f71aa02900000000003647f7d78101010000000000000000000000000000000000000000000000000000000000002397";

let pre_data: Bytes = Bytes::from_hex(calldata_hex).expect("Invalid calldata hex");

Expand Down
Loading
Loading