From 7e5029b52ecef834237c7baf7662a6a9af7e9127 Mon Sep 17 00:00:00 2001 From: Ovidiu Gheorghioiu Date: Thu, 3 Dec 2015 16:10:02 -0800 Subject: [PATCH 1/3] Add Ruby implementation of Kafka's partition hasher (based on key). Test that it matches results from Java. Not hooked up yet (future commits), but can already be used manually by a client. --- lib/poseidon/partition_hasher.rb | 63 ++++++++++++++++++++++++++++++ spec/unit/HashingTest.java | 26 ++++++++++++ spec/unit/partition_hasher_spec.rb | 30 ++++++++++++++ 3 files changed, 119 insertions(+) create mode 100644 lib/poseidon/partition_hasher.rb create mode 100644 spec/unit/HashingTest.java create mode 100644 spec/unit/partition_hasher_spec.rb diff --git a/lib/poseidon/partition_hasher.rb b/lib/poseidon/partition_hasher.rb new file mode 100644 index 0000000..309e6ba --- /dev/null +++ b/lib/poseidon/partition_hasher.rb @@ -0,0 +1,63 @@ +# Spec'ed by: +# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 (Java) +# https://github.com/mumrah/kafka-python/blob/master/kafka/partitioner/hashed.py#L40 (Python client) + +module Poseidon + class PartitionHasher + def initialize(num_partitions) + @partitions = num_partitions + end + + def partition_for(key) + # Java does 32-bit abs, followed by positive modulus. + (PartitionHasher.murmur2_hash(key) & 0x7fffffff) % @partitions + end + + private + + def self.murmur2_hash(str) + data = str.to_s.bytes + + length = data.length + # Magic MurMur constants. + seed = 0x9747b28c + m = 0x5bd1e995 + r = 24 + h = seed ^ length + restrict32 = lambda { |x| x & 0xffffffff } + + i4 = 0 + while i4 + 4 <= length do + k = data[i4 + 0] + (data[i4 + 1] << 8) + (data[i4 + 2] << 16) \ + + (data[i4 + 3] << 24) + k = restrict32[k * m] + k ^= k >> r # triple ("unsigned") shift not needed in Ruby + k = restrict32[k * m] + h = restrict32[h * m] + h ^= k + i4 += 4 + end + + # Handle the last few bytes of the input array. 
Java does this with a Duff + # device, oh well. + h ^= data[i4 + 2] << 16 if length > i4 + 2 + h ^= data[i4 + 1] << 8 if length > i4 + 1 + if length > i4 + h ^= data[i4] & 0xff + h = restrict32[h * m] + end + h ^= h >> 13 + h = restrict32[h * m] + h ^= h >> 15 + return h + end + end +end + + +# FIXME +if __FILE__ == $PROGRAM_NAME + hasher = Poseidon::PartitionHasher.new 50 + puts "foobar: #{hasher.partition_for('foobar')}" + puts "baz: #{hasher.partition_for('baz')}" +end diff --git a/spec/unit/HashingTest.java b/spec/unit/HashingTest.java new file mode 100644 index 0000000..ce04daa --- /dev/null +++ b/spec/unit/HashingTest.java @@ -0,0 +1,26 @@ +// Class for verifying hash functions against Java's implementation. +// +// It's intentionally free of any build environment baggage. To run: +// +// (export CLASSPATH=/usr/local/...YADA-YADA.../kafka-clients-0.8.X.X.jar:. ; javac HashingTest.java && java HashingTest) + +import org.apache.kafka.common.utils.Utils; + +public class HashingTest { + public final static int PARTITIONS = 50; + public final static String[] KEYS = { + "foobar", "baz", "rm-foo", "yello", "poseidon says hi", + // Unicode. + "Здравствуй, мир!", "你好世界", "こんにちは世界", + // Corner cases. 
+ "", " ", " ", " ", " " + }; + + public static void main(String[] args) throws Exception { + if (args.length > 0) throw new RuntimeException("No args allowed"); + for (String k : KEYS) { + int part = Utils.abs(Utils.murmur2(k.getBytes("UTF-8"))) % PARTITIONS; + System.out.println("'" + k + "' => " + part + " , "); + } + } +} diff --git a/spec/unit/partition_hasher_spec.rb b/spec/unit/partition_hasher_spec.rb new file mode 100644 index 0000000..eb453ea --- /dev/null +++ b/spec/unit/partition_hasher_spec.rb @@ -0,0 +1,30 @@ +# coding: utf-8 +require 'spec_helper' +require 'poseidon/partition_hasher' + +RSpec.describe PartitionHasher do + it 'should spread between 50 buckets matching Java implementation' do + # The values below are generated using the Java Kafka client, via the + # test class HashingTest.java. See instructions therein; it generates + # lines that can be pasted right in here. + test_cases = {'foobar' => 16 , + 'baz' => 30 , + 'rm-foo' => 45 , + 'yello' => 43 , + 'poseidon says hi' => 29 , + 'Здравствуй, мир!' => 37 , + '你好世界' => 37 , + 'こんにちは世界' => 12 , + '' => 31 , + ' ' => 21 , + ' ' => 5 , + ' ' => 2 , + ' ' => 44 , + } + hasher = PartitionHasher.new 50 + test_cases.each do |key, expected_part| + part = hasher.partition_for key + expect(part).to eq(expected_part) + end + end +end From 31d2c724394b670f1346cd63e3180fd52dce76fa Mon Sep 17 00:00:00 2001 From: Ovidiu Gheorghioiu Date: Mon, 7 Dec 2015 10:43:02 -0800 Subject: [PATCH 2/3] Remove leftover test code from library. Add test for one partition case. 
--- lib/poseidon/partition_hasher.rb | 8 -------- spec/unit/partition_hasher_spec.rb | 10 ++++++++++ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/lib/poseidon/partition_hasher.rb b/lib/poseidon/partition_hasher.rb index 309e6ba..65053a8 100644 --- a/lib/poseidon/partition_hasher.rb +++ b/lib/poseidon/partition_hasher.rb @@ -53,11 +53,3 @@ def self.murmur2_hash(str) end end end - - -# FIXME -if __FILE__ == $PROGRAM_NAME - hasher = Poseidon::PartitionHasher.new 50 - puts "foobar: #{hasher.partition_for('foobar')}" - puts "baz: #{hasher.partition_for('baz')}" -end diff --git a/spec/unit/partition_hasher_spec.rb b/spec/unit/partition_hasher_spec.rb index eb453ea..33e3bc7 100644 --- a/spec/unit/partition_hasher_spec.rb +++ b/spec/unit/partition_hasher_spec.rb @@ -27,4 +27,14 @@ expect(part).to eq(expected_part) end end + + it 'should handle a one partition case well' do + rnd = Random.new RSpec.configuration.seed + hasher = PartitionHasher.new 1 + 100.times do |size| + key = rnd.bytes(size) + expect(hasher.partition_for key).to eq(0) + end + end + end From 4a62e0b28dc453c56d83c9230ba10bc1431ccde4 Mon Sep 17 00:00:00 2001 From: Ovidiu Gheorghioiu Date: Wed, 13 Jan 2016 13:02:40 -0800 Subject: [PATCH 3/3] Fix usage of String.bytes (Enumerator in 1.9.3 and jruby) to pass CI. --- lib/poseidon/partition_hasher.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/poseidon/partition_hasher.rb b/lib/poseidon/partition_hasher.rb index 65053a8..3826d04 100644 --- a/lib/poseidon/partition_hasher.rb +++ b/lib/poseidon/partition_hasher.rb @@ -3,6 +3,7 @@ # https://github.com/mumrah/kafka-python/blob/master/kafka/partitioner/hashed.py#L40 (Python client) module Poseidon + # Partition hasher compatible with the Java Client. 
class PartitionHasher def initialize(num_partitions) @partitions = num_partitions @@ -16,7 +17,7 @@ def partition_for(key) private def self.murmur2_hash(str) - data = str.to_s.bytes + data = str.to_s.bytes.to_a length = data.length # Magic MurMur constants.