From a8c1a603777fa4becb90343b739f033ba478073f Mon Sep 17 00:00:00 2001
From: Steven Landers
Date: Fri, 26 Sep 2025 15:39:57 -0400
Subject: [PATCH 1/2] make prewarm slower

---
 main.go              |  2 +-
 sender/dispatcher.go | 56 ++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/main.go b/main.go
index 67111bc..bd15b26 100644
--- a/main.go
+++ b/main.go
@@ -234,7 +234,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
 	if settings.Prewarm {
 		log.Printf("🔥 Creating prewarm generator...")
 		prewarmGen := generator.NewPrewarmGenerator(cfg, gen)
-		dispatcher.SetPrewarmGenerator(prewarmGen)
+		dispatcher.SetPrewarmGenerator(prewarmGen, cfg.Endpoints[0])
 		log.Printf("✅ Prewarm generator ready")
 		log.Printf("📝 Prewarm mode: Accounts will be prewarmed")
 	}
diff --git a/sender/dispatcher.go b/sender/dispatcher.go
index 3e0c5c7..1fc288b 100644
--- a/sender/dispatcher.go
+++ b/sender/dispatcher.go
@@ -2,12 +2,18 @@ package sender
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"log"
 	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum"
+	"github.com/ethereum/go-ethereum/ethclient"
 
 	"github.com/sei-protocol/sei-load/generator"
 	"github.com/sei-protocol/sei-load/stats"
+	"github.com/sei-protocol/sei-load/types"
 	"github.com/sei-protocol/sei-load/utils"
 )
 
@@ -15,6 +21,7 @@
 type Dispatcher struct {
 	generator  generator.Generator
 	prewarmGen utils.Option[generator.Generator] // Optional prewarm generator
+	prewarmRPC string
 	sender     TxSender
 
 	// Statistics
@@ -39,16 +46,18 @@ func (d *Dispatcher) SetStatsCollector(collector *stats.Collector) {
 }
 
 // SetPrewarmGenerator sets the prewarm generator for this dispatcher
-func (d *Dispatcher) SetPrewarmGenerator(prewarmGen generator.Generator) {
+func (d *Dispatcher) SetPrewarmGenerator(prewarmGen generator.Generator, rpcEndpoint string) {
 	d.mu.Lock()
 	defer d.mu.Unlock()
 	d.prewarmGen = utils.Some(prewarmGen)
+	d.prewarmRPC = rpcEndpoint
 }
 
 // Prewarm runs the prewarm generator to completion before starting the main load test
 func (d *Dispatcher) Prewarm(ctx context.Context) error {
 	d.mu.RLock()
 	prewarmGen := d.prewarmGen
+	endpoint := d.prewarmRPC
 	d.mu.RUnlock()
 
 	gen, ok := prewarmGen.Get()
@@ -56,6 +65,16 @@
 		return nil
 	} // No prewarming configured
+	if endpoint == "" {
+		return fmt.Errorf("prewarm endpoint not configured")
+	}
+
+	client, err := ethclient.Dial(endpoint)
+	if err != nil {
+		return fmt.Errorf("failed to connect to prewarm endpoint: %w", err)
+	}
+	defer client.Close()
+
 	log.Print("🔥 Starting account prewarming...")
 	processedAccounts := 0
 	logInterval := 100
 
@@ -73,6 +92,10 @@
 			continue
 		}
 
+		if err := waitForReceipt(ctx, client, tx); err != nil {
+			return fmt.Errorf("failed waiting for prewarm receipt for account %s: %w", tx.Scenario.Sender.Address.Hex(), err)
+		}
+
 		processedAccounts++
 
 		// Log progress periodically
@@ -85,6 +108,37 @@
 	return nil
 }
 
+func waitForReceipt(ctx context.Context, client *ethclient.Client, tx *types.LoadTx) error {
+	const receiptTimeout = 30 * time.Second
+	const pollInterval = 200 * time.Millisecond
+
+	waitCtx, cancel := context.WithTimeout(ctx, receiptTimeout)
+	defer cancel()
+
+	ticker := time.NewTicker(pollInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-waitCtx.Done():
+			return fmt.Errorf("timeout waiting for receipt for tx %s", tx.EthTx.Hash().Hex())
+		case <-ticker.C:
+			receipt, err := client.TransactionReceipt(waitCtx, tx.EthTx.Hash())
+			if err != nil {
+				if errors.Is(err, ethereum.NotFound) {
+					continue
+				}
+				log.Printf("🔥 Error fetching receipt for tx %s: %v", tx.EthTx.Hash().Hex(), err)
+				continue
+			}
+			if receipt.Status != 1 {
+				return fmt.Errorf("transaction %s failed with status %d", tx.EthTx.Hash().Hex(), receipt.Status)
+			}
+			return nil
+		}
+	}
+}
+
 // Start begins the dispatcher's transaction generation and sending loop
 func (d *Dispatcher) Run(ctx context.Context) error {
 	for ctx.Err() == nil {

From 0c6ec6e3555ad54bdac4fae29102dbe3c6b8e6f8 Mon Sep 17 00:00:00 2001
From: Steven Landers
Date: Fri, 26 Sep 2025 16:00:39 -0400
Subject: [PATCH 2/2] make prewarming a LITTLE faster

---
 README.md               |   4 ++
 config/settings.go      | 106 +++++++++++++++++----------
 config/settings_test.go |  28 ++++++---
 main.go                 |  10 ++-
 sender/dispatcher.go    | 131 ++++++++++++++++++++++++++++++++++------
 5 files changed, 200 insertions(+), 79 deletions(-)

diff --git a/README.md b/README.md
index a6b5e05..22ca8c7 100644
--- a/README.md
+++ b/README.md
@@ -60,6 +60,8 @@ Edit `my-config.json`:
 | `--track-blocks` | false | Track block statistics |
 | `--track-user-latency` | false | Track user latency metrics |
 | `--prewarm` | false | Prewarm accounts before test |
+| `--prewarm-tps` | 100 | Target transactions per second during prewarm (0 = unlimited) |
+| `--prewarm-parallelism` | 100 | Maximum in-flight prewarm transactions |
 
 ## Examples
 
@@ -142,6 +144,8 @@ Available settings:
 - `trackBlocks`: Track block statistics
 - `trackUserLatency`: Track user latency metrics
 - `prewarm`: Prewarm accounts before test
+- `prewarmTPS`: Target transactions per second during prewarm (0 = unlimited)
+- `prewarmParallelism`: Maximum number of concurrent prewarm transactions
 
 ## Available Scenarios
 
diff --git a/config/settings.go b/config/settings.go
index 9224072..5e68b8b 100644
--- a/config/settings.go
+++ b/config/settings.go
@@ -12,35 +12,39 @@
 
 // Settings holds all CLI-configurable parameters
 type Settings struct {
-	Workers          int      `json:"workers,omitempty"`
-	TPS              float64  `json:"tps,omitempty"`
-	StatsInterval    Duration `json:"statsInterval,omitempty"`
-	BufferSize       int      `json:"bufferSize,omitempty"`
-	DryRun           bool     `json:"dryRun,omitempty"`
-	Debug            bool     `json:"debug,omitempty"`
-	TrackReceipts    bool     `json:"trackReceipts,omitempty"`
-	TrackBlocks      bool     `json:"trackBlocks,omitempty"`
-	TrackUserLatency bool     `json:"trackUserLatency,omitempty"`
-	Prewarm          bool     `json:"prewarm,omitempty"`
-	RampUp           bool     `json:"rampUp,omitempty"`
-	ReportPath       string   `json:"reportPath,omitempty"`
+	Workers            int      `json:"workers,omitempty"`
+	TPS                float64  `json:"tps,omitempty"`
+	StatsInterval      Duration `json:"statsInterval,omitempty"`
+	BufferSize         int      `json:"bufferSize,omitempty"`
+	DryRun             bool     `json:"dryRun,omitempty"`
+	Debug              bool     `json:"debug,omitempty"`
+	TrackReceipts      bool     `json:"trackReceipts,omitempty"`
+	TrackBlocks        bool     `json:"trackBlocks,omitempty"`
+	TrackUserLatency   bool     `json:"trackUserLatency,omitempty"`
+	Prewarm            bool     `json:"prewarm,omitempty"`
+	PrewarmTPS         float64  `json:"prewarmTPS,omitempty"`
+	PrewarmParallelism int      `json:"prewarmParallelism,omitempty"`
+	RampUp             bool     `json:"rampUp,omitempty"`
+	ReportPath         string   `json:"reportPath,omitempty"`
 }
 
 // DefaultSettings returns the default configuration values
 func DefaultSettings() Settings {
 	return Settings{
-		Workers:          1,
-		TPS:              0.0,
-		StatsInterval:    Duration(10 * time.Second),
-		BufferSize:       1000,
-		DryRun:           false,
-		Debug:            false,
-		TrackReceipts:    false,
-		TrackBlocks:      false,
-		TrackUserLatency: false,
-		Prewarm:          false,
-		RampUp:           false,
-		ReportPath:       "",
+		Workers:            1,
+		TPS:                0.0,
+		StatsInterval:      Duration(10 * time.Second),
+		BufferSize:         1000,
+		DryRun:             false,
+		Debug:              false,
+		TrackReceipts:      false,
+		TrackBlocks:        false,
+		TrackUserLatency:   false,
+		Prewarm:            false,
+		PrewarmTPS:         100.0,
+		PrewarmParallelism: 100,
+		RampUp:             false,
+		ReportPath:         "",
 	}
 }
 
@@ -48,18 +52,20 @@
 func InitializeViper(cmd *cobra.Command) error {
 	// Bind flags to viper with error checking
 	flagBindings := map[string]string{
-		"statsInterval":    "stats-interval",
-		"bufferSize":       "buffer-size",
-		"tps":              "tps",
-		"dryRun":           "dry-run",
-		"debug":            "debug",
-		"trackReceipts":    "track-receipts",
-		"trackBlocks":      "track-blocks",
-		"prewarm":          "prewarm",
-		"trackUserLatency": "track-user-latency",
-		"workers":          "workers",
-		"rampUp":           "ramp-up",
-		"reportPath":       "report-path",
+		"statsInterval":      "stats-interval",
+		"bufferSize":         "buffer-size",
+		"tps":                "tps",
+		"dryRun":             "dry-run",
+		"debug":              "debug",
+		"trackReceipts":      "track-receipts",
+		"trackBlocks":        "track-blocks",
+		"prewarm":            "prewarm",
+		"prewarmTPS":         "prewarm-tps",
+		"prewarmParallelism": "prewarm-parallelism",
+		"trackUserLatency":   "track-user-latency",
+		"workers":            "workers",
+		"rampUp":             "ramp-up",
+		"reportPath":         "report-path",
 	}
 
 	for viperKey, flagName := range flagBindings {
@@ -78,6 +84,8 @@
 	viper.SetDefault("trackReceipts", defaults.TrackReceipts)
 	viper.SetDefault("trackBlocks", defaults.TrackBlocks)
 	viper.SetDefault("prewarm", defaults.Prewarm)
+	viper.SetDefault("prewarmTPS", defaults.PrewarmTPS)
+	viper.SetDefault("prewarmParallelism", defaults.PrewarmParallelism)
 	viper.SetDefault("trackUserLatency", defaults.TrackUserLatency)
 	viper.SetDefault("workers", defaults.Workers)
 	viper.SetDefault("rampUp", defaults.RampUp)
@@ -108,17 +116,19 @@ func LoadSettings(settings *Settings) error {
 // ResolveSettings gets the final resolved settings from Viper
 func ResolveSettings() Settings {
 	return Settings{
-		Workers:          viper.GetInt("workers"),
-		TPS:              viper.GetFloat64("tps"),
-		StatsInterval:    Duration(viper.GetDuration("statsInterval")),
-		BufferSize:       viper.GetInt("bufferSize"),
-		DryRun:           viper.GetBool("dryRun"),
-		Debug:            viper.GetBool("debug"),
-		TrackReceipts:    viper.GetBool("trackReceipts"),
-		TrackBlocks:      viper.GetBool("trackBlocks"),
-		TrackUserLatency: viper.GetBool("trackUserLatency"),
-		Prewarm:          viper.GetBool("prewarm"),
-		RampUp:           viper.GetBool("rampUp"),
-		ReportPath:       viper.GetString("reportPath"),
+		Workers:            viper.GetInt("workers"),
+		TPS:                viper.GetFloat64("tps"),
+		StatsInterval:      Duration(viper.GetDuration("statsInterval")),
+		BufferSize:         viper.GetInt("bufferSize"),
+		DryRun:             viper.GetBool("dryRun"),
+		Debug:              viper.GetBool("debug"),
+		TrackReceipts:      viper.GetBool("trackReceipts"),
+		TrackBlocks:        viper.GetBool("trackBlocks"),
+		TrackUserLatency:   viper.GetBool("trackUserLatency"),
+		Prewarm:            viper.GetBool("prewarm"),
+		PrewarmTPS:         viper.GetFloat64("prewarmTPS"),
+		PrewarmParallelism: viper.GetInt("prewarmParallelism"),
+		RampUp:             viper.GetBool("rampUp"),
+		ReportPath:         viper.GetString("reportPath"),
 	}
 }
diff --git a/config/settings_test.go b/config/settings_test.go
index 95f54ae..89b5008 100644
--- a/config/settings_test.go
+++ b/config/settings_test.go
@@ -89,6 +89,8 @@ func TestArgumentPrecedence(t *testing.T) {
 	cmd.Flags().Bool("track-receipts", false, "Track receipts")
 	cmd.Flags().Bool("track-blocks", false, "Track blocks")
 	cmd.Flags().Bool("prewarm", false, "Prewarm")
+	cmd.Flags().Float64("prewarm-tps", 0, "Prewarm TPS")
+	cmd.Flags().Int("prewarm-parallelism", 0, "Prewarm parallelism")
 	cmd.Flags().Bool("track-user-latency", false, "Track user latency")
 	cmd.Flags().Int("buffer-size", 0, "Buffer size")
 	cmd.Flags().Bool("ramp-up", false, "Ramp up loadtest")
@@ -121,18 +123,20 @@ func TestDefaultSettings(t *testing.T) {
 	defaults := DefaultSettings()
 
 	expected := Settings{
-		Workers:          1,
-		TPS:              0.0,
-		StatsInterval:    Duration(10 * time.Second),
-		BufferSize:       1000,
-		DryRun:           false,
-		Debug:            false,
-		TrackReceipts:    false,
-		TrackBlocks:      false,
-		TrackUserLatency: false,
-		Prewarm:          false,
-		RampUp:           false,
-		ReportPath:       "",
+		Workers:            1,
+		TPS:                0.0,
+		StatsInterval:      Duration(10 * time.Second),
+		BufferSize:         1000,
+		DryRun:             false,
+		Debug:              false,
+		TrackReceipts:      false,
+		TrackBlocks:        false,
+		TrackUserLatency:   false,
+		Prewarm:            false,
+		PrewarmTPS:         100.0,
+		PrewarmParallelism: 100,
+		RampUp:             false,
+		ReportPath:         "",
 	}
 
 	if defaults != expected {
diff --git a/main.go b/main.go
index bd15b26..1195cf2 100644
--- a/main.go
+++ b/main.go
@@ -59,6 +59,8 @@ func init() {
 	rootCmd.Flags().Bool("track-receipts", false, "Track receipts")
 	rootCmd.Flags().Bool("track-blocks", false, "Track blocks")
 	rootCmd.Flags().Bool("prewarm", false, "Prewarm accounts with self-transactions")
+	rootCmd.Flags().Float64("prewarm-tps", 100, "Target transactions per second during prewarm (0 = unlimited)")
+	rootCmd.Flags().Int("prewarm-parallelism", 100, "Maximum number of in-flight prewarm transactions (0 = default)")
 	rootCmd.Flags().Bool("track-user-latency", false, "Track user latency")
 	rootCmd.Flags().IntP("workers", "w", 0, "Number of workers")
 	rootCmd.Flags().IntP("nodes", "n", 0, "Number of nodes/endpoints to use (0 = use all)")
@@ -234,9 +236,15 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command, args []string) error {
 	if settings.Prewarm {
 		log.Printf("🔥 Creating prewarm generator...")
 		prewarmGen := generator.NewPrewarmGenerator(cfg, gen)
-		dispatcher.SetPrewarmGenerator(prewarmGen, cfg.Endpoints[0])
+		dispatcher.SetPrewarmGenerator(prewarmGen, cfg.Endpoints[0], settings.PrewarmTPS, settings.PrewarmParallelism)
 		log.Printf("✅ Prewarm generator ready")
 		log.Printf("📝 Prewarm mode: Accounts will be prewarmed")
+		if settings.PrewarmParallelism > 1 {
+			log.Printf("🔥 Prewarm parallelism: %d in-flight transactions", settings.PrewarmParallelism)
+		}
+		if settings.PrewarmTPS > 0 {
+			log.Printf("🔥 Prewarm TPS limit: %.2f", settings.PrewarmTPS)
+		}
 	}
 
 	// Start the sender (starts all workers)
diff --git a/sender/dispatcher.go b/sender/dispatcher.go
index 1fc288b..591546a 100644
--- a/sender/dispatcher.go
+++ b/sender/dispatcher.go
@@ -5,11 +5,13 @@ import (
 	"errors"
 	"fmt"
 	"log"
+	"math"
 	"sync"
 	"time"
 
 	"github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/ethclient"
+	"golang.org/x/time/rate"
 
 	"github.com/sei-protocol/sei-load/generator"
 	"github.com/sei-protocol/sei-load/stats"
@@ -19,10 +21,12 @@
 
 // Dispatcher continuously generates transactions and dispatches them to the sender
 type Dispatcher struct {
-	generator  generator.Generator
-	prewarmGen utils.Option[generator.Generator] // Optional prewarm generator
-	prewarmRPC string
-	sender     TxSender
+	generator          generator.Generator
+	prewarmGen         utils.Option[generator.Generator] // Optional prewarm generator
+	prewarmRPC         string
+	prewarmRate        float64
+	prewarmMaxInFlight int
+	sender             TxSender
 
 	// Statistics
 	totalSent uint64
@@ -46,11 +50,16 @@ func (d *Dispatcher) SetStatsCollector(collector *stats.Collector) {
 }
 
 // SetPrewarmGenerator sets the prewarm generator for this dispatcher
-func (d *Dispatcher) SetPrewarmGenerator(prewarmGen generator.Generator, rpcEndpoint string) {
+func (d *Dispatcher) SetPrewarmGenerator(prewarmGen generator.Generator, rpcEndpoint string, rate float64, maxInFlight int) {
 	d.mu.Lock()
 	defer d.mu.Unlock()
 	d.prewarmGen = utils.Some(prewarmGen)
 	d.prewarmRPC = rpcEndpoint
+	if maxInFlight <= 0 {
+		maxInFlight = 1
+	}
+	d.prewarmRate = rate
+	d.prewarmMaxInFlight = maxInFlight
 }
 
 // Prewarm runs the prewarm generator to completion before starting the main load test
@@ -58,6 +67,8 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error {
 	d.mu.RLock()
 	prewarmGen := d.prewarmGen
 	endpoint := d.prewarmRPC
+	rateLimit := d.prewarmRate
+	maxInFlight := d.prewarmMaxInFlight
 	d.mu.RUnlock()
 
 	gen, ok := prewarmGen.Get()
@@ -69,39 +80,123 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error {
 		return fmt.Errorf("prewarm endpoint not configured")
 	}
 
+	if maxInFlight <= 0 {
+		maxInFlight = 1
+	}
+
 	client, err := ethclient.Dial(endpoint)
 	if err != nil {
 		return fmt.Errorf("failed to connect to prewarm endpoint: %w", err)
 	}
 	defer client.Close()
 
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	var limiter *rate.Limiter
+	if rateLimit > 0 {
+		burst := int(math.Ceil(rateLimit))
+		if burst < maxInFlight {
+			burst = maxInFlight
+		}
+		limiter = rate.NewLimiter(rate.Limit(rateLimit), burst)
+	}
+
+	type prewarmResult struct {
+		account string
+		txHash  string
+		err     error
+	}
+
 	log.Print("🔥 Starting account prewarming...")
-	processedAccounts := 0
 	logInterval := 100
+	processedAccounts := 0
+	inFlight := 0
+	results := make(chan prewarmResult, maxInFlight)
+	generatorDone := false
+	var prewarmErr error
+
+	handleResult := func(res prewarmResult) {
+		inFlight--
+		if res.err != nil {
+			if prewarmErr == nil {
+				prewarmErr = fmt.Errorf("failed waiting for prewarm receipt for account %s (tx %s): %w", res.account, res.txHash, res.err)
+			}
+			cancel()
+			return
+		}
+		processedAccounts++
+		if processedAccounts%logInterval == 0 {
+			log.Printf("🔥 Prewarming progress: %d accounts processed...", processedAccounts)
+		}
+	}
+
+	for {
+		if generatorDone && inFlight == 0 {
+			break
+		}
+		if ctx.Err() != nil && inFlight == 0 {
+			break
+		}
+
+		if inFlight > 0 {
+			select {
+			case res := <-results:
+				handleResult(res)
+				continue
+			default:
+			}
+		}
+
+		if generatorDone || ctx.Err() != nil {
+			if inFlight > 0 {
+				res := <-results
+				handleResult(res)
+				continue
+			}
+			break
+		}
+
+		if maxInFlight > 0 && inFlight >= maxInFlight {
+			res := <-results
+			handleResult(res)
+			continue
+		}
 
-	// Run prewarm generator until completion
-	for ctx.Err() == nil {
 		tx, ok := gen.Generate()
 		if !ok {
-			break // Prewarming is complete
+			generatorDone = true
+			continue
+		}
+
+		if limiter != nil {
+			if err := limiter.Wait(ctx); err != nil {
+				if prewarmErr == nil {
+					prewarmErr = fmt.Errorf("prewarm rate limiter wait failed: %w", err)
+				}
+				cancel()
+				generatorDone = true
+				continue
+			}
 		}
 
-		// Send the prewarming transaction
 		if err := d.sender.Send(ctx, tx); err != nil {
 			log.Printf("🔥 Failed to send prewarm transaction for account %s: %v", tx.Scenario.Sender.Address.Hex(), err)
 			continue
 		}
 
-		if err := waitForReceipt(ctx, client, tx); err != nil {
-			return fmt.Errorf("failed waiting for prewarm receipt for account %s: %w", tx.Scenario.Sender.Address.Hex(), err)
-		}
+		inFlight++
+		account := tx.Scenario.Sender.Address.Hex()
+		txHash := tx.EthTx.Hash().Hex()
 
-		processedAccounts++
+		go func(tx *types.LoadTx, account string, txHash string) {
+			err := waitForReceipt(ctx, client, tx)
+			results <- prewarmResult{account: account, txHash: txHash, err: err}
+		}(tx, account, txHash)
+	}
 
-		// Log progress periodically
-		if processedAccounts%logInterval == 0 {
-			log.Printf("🔥 Prewarming progress: %d accounts processed...", processedAccounts)
-		}
+	if prewarmErr != nil {
+		return prewarmErr
 	}
 
 	log.Printf("🔥 Prewarming complete! Processed %d accounts", processedAccounts)
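
For reference, the pacing pattern these patches apply to prewarming — a golang.org/x/time/rate limiter for the TPS cap plus a bounded number of in-flight receipt waits — reduced to a minimal standalone sketch. This is illustrative only and not part of either patch: sendAndWait, the constants, and the loop bound are placeholders, not sei-load code.

    package main

    import (
        "context"
        "log"
        "sync"
        "time"

        "golang.org/x/time/rate"
    )

    // sendAndWait is a placeholder for "send one prewarm tx and poll for its receipt".
    func sendAndWait(ctx context.Context, n int) error {
        time.Sleep(50 * time.Millisecond)
        return nil
    }

    func main() {
        const (
            prewarmTPS  = 100.0 // submissions per second; 0 would mean unlimited
            maxInFlight = 100   // cap on concurrent receipt waits
        )

        ctx := context.Background()
        limiter := rate.NewLimiter(rate.Limit(prewarmTPS), maxInFlight)

        sem := make(chan struct{}, maxInFlight) // buffered channel used as a semaphore
        var wg sync.WaitGroup

        for i := 0; i < 1000; i++ {
            // Block until the limiter allows another submission (paces to ~prewarmTPS).
            if err := limiter.Wait(ctx); err != nil {
                log.Fatal(err)
            }
            sem <- struct{}{} // blocks once maxInFlight goroutines are outstanding
            wg.Add(1)
            go func(n int) {
                defer wg.Done()
                defer func() { <-sem }() // release the in-flight slot
                if err := sendAndWait(ctx, n); err != nil {
                    log.Printf("prewarm %d failed: %v", n, err)
                }
            }(i)
        }
        wg.Wait()
    }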