Pagination beyond uint64 #417
Pull request overview
This PR extends Ghostferry's pagination capability beyond int64/BIGINT columns to support binary pagination keys (UUID, VARBINARY, CHAR types). The implementation introduces a PaginationKey interface with two implementations: Uint64Key (existing behavior) and BinaryKey (new support). State serialization is fully backward compatible, allowing resumption from old uint64-only states.
Key Changes:
- New `PaginationKey` interface with `Uint64Key` and `BinaryKey` implementations supporting lexicographic comparison and progress approximation
- Backward-compatible state serialization with type information (`{"type": "uint64", "value": 123}` / `{"type": "binary", "value": "deadbeef..."}`)
- Updated components (cursor, batch_writer, verifiers, data_iterator) to handle both key types with comprehensive test coverage (+836 test lines)
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| pagination_key.go | New file implementing PaginationKey interface with Uint64Key and BinaryKey types |
| state_tracker.go | Custom JSON marshaling/unmarshaling for backward-compatible state serialization |
| table_schema_cache.go | Updated to accept binary/string column types for pagination, modified maxPaginationKey function |
| cursor.go | Pagination iteration updated to handle both uint64 and binary keys with type-specific extraction |
| data_iterator.go | Iterator tracking updated for both key types with fingerprint generation |
| batch_writer.go | Row batch handling updated to extract and track pagination keys by type |
| inline_verifier.go | Fingerprint checking adapted for binary keys with string-based hash storage |
| iterative_verifier.go | Hash calculation and verification updated for mixed key types |
| compression_verifier.go | Decompression logic updated to handle binary pagination keys |
| ferry.go | Progress API converts all keys to uint64 via NumericPosition() for backward compatibility |
| filter.go | CopyFilter interface updated to accept PaginationKey parameter |
| sharding/filter.go | ShardedCopyFilter updated to use PaginationKey.SQLValue() |
| dml_events.go | DML event pagination key extraction returns string representation |
| target_verifier.go | Error message formatting updated for string pagination keys |
| row_batch.go | Fingerprints map changed from uint64 to string keys |
| config.go | Documentation updated with uniqueness requirement warnings |
| test/go/*.go | Comprehensive unit tests for pagination keys, state serialization, and verifiers |
| test/integration/*.rb | Integration tests for UUID table interrupt/resume scenarios |
| test/helpers/db_helper.rb | Helper functions for UUID table seeding and validation |
test/helpers/db_helper.rb
Outdated
    def generate_uuid_bytes
      uuid_string = SecureRandom.uuid
      uuid_string.gsub("-", "").scan(/../).map { |x| x.hex.chr }.join
There's a dedicated set of methods for packing and unpacking such things; this might be easier to understand:

    [SecureRandom.uuid].pack("H32")

Removing the "-" characters first...
grodowski
left a comment
Not a finished review, but these are the two things I noticed so far
    }

    // NumericPosition calculates a rough float position.
    func (k BinaryKey) NumericPosition() float64 {
`BinaryKey.NumericPosition` is used for progress tracking and for progress deltas (where we subtract two positions). As currently implemented, it seems it will report correct progress with UUID v7 (or anything whose leading bits increase monotonically), but will progress reporting be unpredictable with random data?
Should this be explained in docs or comments? Do we need an alternate progress tracking mode?
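The body of `NumericPosition` is not shown in this thread, so the following is only a sketch of the concern, assuming the position is derived from the first 8 bytes of the key read as a big-endian integer. The helper names `approxPosition` and `progressDelta` are hypothetical and not part of Ghostferry.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// approxPosition is a hypothetical stand-in for BinaryKey.NumericPosition.
// Assumption: only the first 8 bytes contribute, read as a big-endian integer;
// shorter keys are zero-padded on the right.
func approxPosition(key []byte) float64 {
	var buf [8]byte
	copy(buf[:], key) // copies at most 8 bytes
	return float64(binary.BigEndian.Uint64(buf[:]))
}

// progressDelta subtracts two positions, as a progress report might.
func progressDelta(from, to []byte) float64 {
	return approxPosition(to) - approxPosition(from)
}

func main() {
	// Keys that differ in the leading bytes (for example a timestamp prefix).
	early := []byte{0x01, 0x8f, 0x00, 0x00, 0x00, 0x01, 0, 0, 0xaa}
	later := []byte{0x01, 0x8f, 0x00, 0x00, 0x00, 0x02, 0, 0, 0xbb}
	fmt.Println(progressDelta(early, later) > 0)

	// Keys that differ only after byte 8 are indistinguishable to this estimate.
	a := []byte{1, 2, 3, 4, 5, 6, 7, 8, 0x00}
	b := []byte{1, 2, 3, 4, 5, 6, 7, 8, 0xff}
	fmt.Println(progressDelta(a, b) == 0)
}
```

Under this assumption the estimate is only as informative as the distribution of the leading bytes, which is the property the question above is probing.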
      tables, _ := ghostferry.LoadTables(t.Ferry.SourceDB, tableFilter, nil, nil, nil, nil)

    - t.unsortedTables = make(map[*ghostferry.TableSchema]uint64, len(tables))
    + t.unsortedTables = make(map[*ghostferry.TableSchema]ghostferry.PaginationKey, len(tables))
Would it make sense to validate that the PK column uses a binary collation, either here or in `LoadTables`? Some quick analysis with Claude suggested that a non-binary collation is unlikely to cause data loss and will just fail the move, but the errors could be unintuitive to debug.
Lines 140 to 145 in b01bcec
    if paginationKeypos.Compare(c.lastSuccessfulPaginationKey) <= 0 {
    	tx.Rollback()
    	err = fmt.Errorf("new paginationKeypos %s <= lastSuccessfulPaginationKey %s", paginationKeypos.String(), c.lastSuccessfulPaginationKey.String())
    	c.logger.WithError(err).Errorf("last successful paginationKey position did not advance")
    	return err
    }

It seems to be theoretically possible to either lose data or fail with "last successful paginationKey position did not advance" when the ordering of Go's `bytes.Compare` differs from the ordering of the MySQL collation.
Validation to fail earlier and with a clear message added on a new branch: https://github.com/Shopify/ghostferry/compare/uuid-as-id...grodowski/uuid-as-id?expand=1
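To illustrate the ordering mismatch, here is a small sketch comparing byte order with a case-insensitive order. `caseInsensitiveCompare` is a hypothetical and deliberately crude stand-in: it lowercases both strings and is not how `utf8mb4_unicode_ci` is actually implemented; it only reproduces the one property that matters here, namely that `b...` sorts before `C...`.

```go
package main

import (
	"bytes"
	"fmt"
	"strings"
)

// byteOrderCompare orders keys the way bytes.Compare does.
func byteOrderCompare(a, b string) int {
	return bytes.Compare([]byte(a), []byte(b))
}

// caseInsensitiveCompare is a rough approximation of a case-insensitive
// collation (assumption: lowercasing is enough for these ASCII keys).
func caseInsensitiveCompare(a, b string) int {
	return strings.Compare(strings.ToLower(a), strings.ToLower(b))
}

func main() {
	last, next := "b199", "C000"

	// A case-insensitive ORDER BY returns "C000" after "b199" ...
	fmt.Println(caseInsensitiveCompare(last, next)) // -1

	// ... but in byte order 'C' (0x43) sorts before 'b' (0x62),
	// so a byte-wise cursor sees the new key as not having advanced.
	fmt.Println(byteOrderCompare(last, next)) // 1
}
```

With keys like these, a row stream ordered by the collation can hand the cursor a key that is "smaller" in byte order than the previous one, which is the situation the check in `cursor.go` guards against.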
Ruby integration test (written with Claude) that demonstrates the "did not advance" condition:

    def test_safety_check_catches_ordering_mismatch
      source_db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}")
      source_db.query("CREATE TABLE #{DEFAULT_FULL_TABLE_NAME} (id VARCHAR(10) COLLATE utf8mb4_unicode_ci NOT NULL PRIMARY KEY, data TEXT)")
      200.times { |i| source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES ('b#{i.to_s.rjust(3, '0')}', 'data#{i}')") }
      200.times { |i| source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES ('C#{i.to_s.rjust(3, '0')}', 'upper#{i}')") }
      source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES ('z999', 'sentinel')")
      target_db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}")
      target_db.query("CREATE TABLE #{DEFAULT_FULL_TABLE_NAME} (id VARCHAR(10) COLLATE utf8mb4_unicode_ci NOT NULL PRIMARY KEY, data TEXT)")
      ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
      ghostferry.run_expecting_failure
      error_logs = ghostferry.logrus_lines.values.flatten.select { |line| line["level"] == "error" }
      assert error_logs.any? { |line|
        line["msg"]&.include?("paginationKey position did not advance")
      }, "Expected safety check error, got: #{error_logs.inspect}"
    end

The potential data loss occurs when all keys in the last batch compare as less than the previous batch's keys:

    def test_silent_data_loss
      source_db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}")
      source_db.query("CREATE TABLE #{DEFAULT_FULL_TABLE_NAME} (id VARCHAR(10) COLLATE utf8mb4_unicode_ci NOT NULL PRIMARY KEY, data TEXT)")
      200.times { |i| source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES ('b#{i.to_s.rjust(3, '0')}', 'data#{i}')") }
      200.times { |i| source_db.query("INSERT INTO #{DEFAULT_FULL_TABLE_NAME} VALUES ('C#{i.to_s.rjust(3, '0')}', 'upper#{i}')") }
      target_db.query("CREATE DATABASE IF NOT EXISTS #{DEFAULT_DB}")
      target_db.query("CREATE TABLE #{DEFAULT_FULL_TABLE_NAME} (id VARCHAR(10) COLLATE utf8mb4_unicode_ci NOT NULL PRIMARY KEY, data TEXT)")
      ghostferry = new_ghostferry(MINIMAL_GHOSTFERRY)
      ghostferry.run
      source_count = source_db.query("SELECT COUNT(*) as count FROM #{DEFAULT_FULL_TABLE_NAME}").first["count"].to_i
      target_count = target_db.query("SELECT COUNT(*) as count FROM #{DEFAULT_FULL_TABLE_NAME}").first["count"].to_i
      assert_equal 400, source_count
      assert_equal 200, target_count, "Expected silent data loss: 200 Cherry rows never copied"
    end

Please poke holes in this.
      func setupSingleTableDatabase(f *testhelpers.TestFerry, sourceDB, targetDB *sql.DB) {
    - 	testhelpers.SeedInitialData(sourceDB, "gftest", "table1", 1000)
    + 	testhelpers.SeedInitialData(sourceDB, "gftest", "table1", 100)
What's the reason for lowering this? It could hide some issues, since it's less than the default batch size.
    	SQLValue() interface{}
    	Compare(other PaginationKey) int
    	NumericPosition() float64
    	String() string
    	MarshalJSON() ([]byte, error)
    	IsMax() bool
    }
A few comments would be helpful, it's not quite clear from the names what they refer to.
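As a starting point for such comments, here is a sketch of the interface with doc comments inferred from how the PR overview describes each method. The wording of every comment is an assumption, in particular the meaning of `IsMax`. `demoUint64Key` is a hypothetical implementation written for this example, not the PR's `Uint64Key`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
	"strconv"
)

// PaginationKey abstracts the value of a table's pagination column.
type PaginationKey interface {
	// SQLValue returns the value to bind as a SQL query argument.
	SQLValue() interface{}
	// Compare returns -1, 0, or 1 relative to other (same key type assumed).
	Compare(other PaginationKey) int
	// NumericPosition returns an approximate position used for progress reporting.
	NumericPosition() float64
	// String returns a human-readable form for logs and error messages.
	String() string
	// MarshalJSON serializes the key together with its type tag.
	MarshalJSON() ([]byte, error)
	// IsMax reports whether the key is the largest representable value (assumed meaning).
	IsMax() bool
}

// demoUint64Key is a hypothetical uint64-backed key.
type demoUint64Key uint64

func (k demoUint64Key) SQLValue() interface{}    { return uint64(k) }
func (k demoUint64Key) NumericPosition() float64 { return float64(k) }
func (k demoUint64Key) String() string           { return strconv.FormatUint(uint64(k), 10) }
func (k demoUint64Key) IsMax() bool              { return k == math.MaxUint64 }

func (k demoUint64Key) Compare(other PaginationKey) int {
	o, ok := other.(demoUint64Key)
	if !ok {
		panic(fmt.Sprintf("cannot compare demoUint64Key with %T", other))
	}
	switch {
	case k < o:
		return -1
	case k > o:
		return 1
	}
	return 0
}

func (k demoUint64Key) MarshalJSON() ([]byte, error) {
	return json.Marshal(struct {
		Type  string `json:"type"`
		Value uint64 `json:"value"`
	}{"uint64", uint64(k)})
}

func main() {
	var key PaginationKey = demoUint64Key(123)
	out, _ := json.Marshal(key)
	fmt.Println(string(out)) // {"type":"uint64","value":123}
}
```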
    switch c.paginationKeyColumn.Type {
    case schema.TYPE_NUMBER, schema.TYPE_MEDIUM_INT:
    	var value uint64
    	value, err = lastRowData.GetUint64(paginationKeyIndex)
    	if err != nil {
    		logger.WithError(err).Error("failed to get uint64 paginationKey value")
    		return
    	}
    	paginationKeypos = NewUint64Key(value)

    case schema.TYPE_BINARY, schema.TYPE_STRING:
    	valueInterface := lastRowData[paginationKeyIndex]

    	var valueBytes []byte
    	switch v := valueInterface.(type) {
    	case []byte:
    		valueBytes = v
    	case string:
    		valueBytes = []byte(v)
    	default:
    		err = fmt.Errorf("expected binary pagination key to be []byte or string, got %T", valueInterface)
    		logger.WithError(err).Error("failed to get binary paginationKey value")
    		return
    	}

    	paginationKeypos = NewBinaryKey(valueBytes)

    default:
    	// Fallback for other integer types
    	var value uint64
    	value, err = lastRowData.GetUint64(paginationKeyIndex)
    	if err != nil {
    		logger.WithError(err).Error("failed to get uint64 paginationKey value")
    		return
    	}
    	paginationKeypos = NewUint64Key(value)
There are several similar copies of this block; it might be worth creating a function `NewPaginationKeyFromValue(input interface{}) (PaginationKey, error)` in pagination.go to encapsulate all this.
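A minimal sketch of what such a helper could look like, using the signature suggested above. The types `uint64Key` and `binaryKey` and the reduced `PaginationKey` interface are simplified stand-ins defined only for this example.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strconv"
)

// PaginationKey is reduced to one method here to keep the sketch short.
type PaginationKey interface{ String() string }

type uint64Key uint64

func (k uint64Key) String() string { return strconv.FormatUint(uint64(k), 10) }

type binaryKey []byte

func (k binaryKey) String() string { return hex.EncodeToString(k) }

// NewPaginationKeyFromValue picks a key type based on the Go type of input.
func NewPaginationKeyFromValue(input interface{}) (PaginationKey, error) {
	switch v := input.(type) {
	case uint64:
		return uint64Key(v), nil
	case int64:
		if v < 0 {
			return nil, fmt.Errorf("negative pagination key %d", v)
		}
		return uint64Key(v), nil
	case []byte:
		// Copy so the key does not alias a row buffer the caller may reuse.
		return binaryKey(append([]byte(nil), v...)), nil
	case string:
		return binaryKey(v), nil
	default:
		return nil, fmt.Errorf("unsupported pagination key type %T", input)
	}
}

func main() {
	for _, in := range []interface{}{uint64(42), []byte{0xde, 0xad}, 3.14} {
		key, err := NewPaginationKeyFromValue(in)
		fmt.Println(key, err)
	}
}
```

One caveat with dispatching on the Go type alone: the block in `cursor.go` switches on the column type and calls `GetUint64` for numeric columns. If a numeric column's value can arrive as `[]byte`, this sketch would misclassify it as a binary key, so a real helper would probably need the column type as a second argument.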
This commit adds support for binary pagination keys (UUID, VARBINARY, CHAR types) to Ghostferry, previously limited to uint64/BIGINT columns.
This is the AI summary of changes.
Key Changes
{"type": "uint64", "value": 123}
{"type": "binary", "value": "deadbeef..."}
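The two typed shapes above, combined with the claim that old uint64-only states can still be resumed, suggest a decoder along the following lines. This is a sketch under two assumptions that the thread does not confirm: that the legacy state stored the key as a bare JSON number, and that the binary value is hex-encoded. `decodedKey` and `decodeKey` are hypothetical names.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// decodedKey holds whichever representation was found in the state.
type decodedKey struct {
	Kind   string // "uint64" or "binary"
	Uint   uint64
	Binary []byte
}

// decodeKey accepts either a bare number (assumed legacy format)
// or a {"type": ..., "value": ...} object.
func decodeKey(raw []byte) (decodedKey, error) {
	var legacy uint64
	if err := json.Unmarshal(raw, &legacy); err == nil {
		return decodedKey{Kind: "uint64", Uint: legacy}, nil
	}

	var typed struct {
		Type  string          `json:"type"`
		Value json.RawMessage `json:"value"`
	}
	if err := json.Unmarshal(raw, &typed); err != nil {
		return decodedKey{}, fmt.Errorf("decode pagination key: %w", err)
	}

	switch typed.Type {
	case "uint64":
		var v uint64
		if err := json.Unmarshal(typed.Value, &v); err != nil {
			return decodedKey{}, fmt.Errorf("decode uint64 value: %w", err)
		}
		return decodedKey{Kind: "uint64", Uint: v}, nil
	case "binary":
		var s string
		if err := json.Unmarshal(typed.Value, &s); err != nil {
			return decodedKey{}, fmt.Errorf("decode binary value: %w", err)
		}
		b, err := hex.DecodeString(s) // assumption: value is hex-encoded
		if err != nil {
			return decodedKey{}, fmt.Errorf("decode hex: %w", err)
		}
		return decodedKey{Kind: "binary", Binary: b}, nil
	default:
		return decodedKey{}, fmt.Errorf("unknown pagination key type %q", typed.Type)
	}
}

func main() {
	for _, raw := range []string{
		`123`,
		`{"type": "uint64", "value": 123}`,
		`{"type": "binary", "value": "deadbeef"}`,
	} {
		key, err := decodeKey([]byte(raw))
		fmt.Printf("%+v %v\n", key, err)
	}
}
```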
Areas Requiring Careful Review
🔴 Critical: Uniqueness Requirement
The pagination column MUST have unique values for data integrity. The algorithm uses:
WHERE pagination_key > last_key ORDER BY pagination_key LIMIT batch_size
If duplicates exist at batch boundaries, rows will be skipped → data loss during migration.
Recommendation: Consider adding a validation step during initialization that checks for a unique constraint on the pagination column, or at minimum log a warning if no unique constraint is detected.
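The skipped-row scenario can be reproduced without a database. The sketch below emulates the `WHERE pagination_key > last_key ORDER BY pagination_key LIMIT batch_size` loop over an in-memory slice of integer keys. `nextBatch` and `copyAll` are hypothetical helpers, and non-negative keys are assumed so that `-1` can serve as the starting position.

```go
package main

import (
	"fmt"
	"sort"
)

// nextBatch emulates: WHERE key > last ORDER BY key LIMIT batchSize.
// sortedKeys must already be sorted in ascending order.
func nextBatch(sortedKeys []int, last, batchSize int) []int {
	start := sort.SearchInts(sortedKeys, last+1) // first index with key > last
	end := start + batchSize
	if end > len(sortedKeys) {
		end = len(sortedKeys)
	}
	return sortedKeys[start:end]
}

// copyAll pages through keys and returns how many rows were "copied".
func copyAll(keys []int, batchSize int) int {
	sorted := append([]int(nil), keys...)
	sort.Ints(sorted)

	copied, last := 0, -1 // assumption: all keys are >= 0
	for {
		batch := nextBatch(sorted, last, batchSize)
		if len(batch) == 0 {
			return copied
		}
		copied += len(batch)
		last = batch[len(batch)-1]
	}
}

func main() {
	fmt.Println(copyAll([]int{1, 2, 3, 4, 5, 6}, 2)) // 6: every row copied
	fmt.Println(copyAll([]int{1, 2, 2, 2, 3, 4}, 2)) // 4: two rows with key 2 skipped
}
```

In the second call the first batch ends on the first `2`, so the next query asks for keys greater than `2` and the remaining duplicates are never selected.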
🟡 REST API Conversion
The /api/status endpoint converts all pagination keys to uint64 (progress.go:13-20, ferry.go:1027):
🟢 Binary Key Comparison
BinaryKey.Compare() uses bytes.Compare() for lexicographic ordering:
🟢 State Resume Logic
State serialization is well-tested but verify:
Testing
Run all Go tests
./bin/gotestsum --format short-verbose ./test/go
Run interrupt/resume integration tests with binary keys
ruby -Itest test/integration/interrupt_resume_test.rb
Key test files:
Migration Path
For existing Ghostferry users:
Files changed: 28 files, +1920 insertions, -280 deletions