diff --git a/lib/poseidon/partition_hasher.rb b/lib/poseidon/partition_hasher.rb new file mode 100644 index 0000000..3826d04 --- /dev/null +++ b/lib/poseidon/partition_hasher.rb @@ -0,0 +1,56 @@ +# Spec'ed by: +# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 (Java) +# https://github.com/mumrah/kafka-python/blob/master/kafka/partitioner/hashed.py#L40 (Python client) + +module Poseidon + # Partition hasher compatible with the Java Client. + class PartitionHasher + def initialize(num_partitions) + @partitions = num_partitions + end + + def partition_for(key) + # Java does 32-bit abs, followed by positive modulus. + (PartitionHasher.murmur2_hash(key) & 0x7fffffff) % @partitions + end + + private + + def self.murmur2_hash(str) + data = str.to_s.bytes.to_a + + length = data.length + # Magic MurMur constants. + seed = 0x9747b28c + m = 0x5bd1e995 + r = 24 + h = seed ^ length + restrict32 = lambda { |x| x & 0xffffffff } + + i4 = 0 + while i4 + 4 <= length do + k = data[i4 + 0] + (data[i4 + 1] << 8) + (data[i4 + 2] << 16) \ + + (data[i4 + 3] << 24) + k = restrict32[k * m] + k ^= k >> r # triple ("unsigned") shift not needed in Ruby + k = restrict32[k * m] + h = restrict32[h * m] + h ^= k + i4 += 4 + end + + # Handle the last few bytes of the input array. Java does this with a Duff + # device, oh well. + h ^= data[i4 + 2] << 16 if length > i4 + 2 + h ^= data[i4 + 1] << 8 if length > i4 + 1 + if length > i4 + h ^= data[i4] & 0xff + h = restrict32[h * m] + end + h ^= h >> 13 + h = restrict32[h * m] + h ^= h >> 15 + return h + end + end +end diff --git a/spec/unit/HashingTest.java b/spec/unit/HashingTest.java new file mode 100644 index 0000000..ce04daa --- /dev/null +++ b/spec/unit/HashingTest.java @@ -0,0 +1,26 @@ +// Class for verifyinng hash functions agains Java's implementation. +// +// It's intentionally free of any build environment baggage. To run: +// +// (export CLASSPATH=/usr/local/...YADA-YADA.../kafka-clients-0.8.X.X.jar:. ; javac HashingTest.java && java HashingTest) + +import org.apache.kafka.common.utils.Utils; + +public class HashingTest { + public final static int PARTITIONS = 50; + public final static String[] KEYS = { + "foobar", "baz", "rm-foo", "yello", "poseidon says hi", + // Unicode. + "Здравствуй, мир!", "你好世界", "こんにちは世界", + // Corner cases. + "", " ", " ", " ", " " + }; + + public static void main(String[] args) throws Exception { + if (args.length > 0) throw new RuntimeException("No args allowed"); + for (String k : KEYS) { + int part = Utils.abs(Utils.murmur2(k.getBytes("UTF-8"))) % PARTITIONS; + System.out.println("'" + k + "' => " + part + " , "); + } + } +} diff --git a/spec/unit/partition_hasher_spec.rb b/spec/unit/partition_hasher_spec.rb new file mode 100644 index 0000000..33e3bc7 --- /dev/null +++ b/spec/unit/partition_hasher_spec.rb @@ -0,0 +1,40 @@ +# coding: utf-8 +require 'spec_helper' +require 'poseidon/partition_hasher' + +RSpec.describe PartitionHasher do + it 'should spread between 50 buckets matching Java implementation' do + # The values below are generated using the Java Kafka client, via the + # test class HashingTest.java. See instructions therein; it generates + # lines that can be pasted right in here. + test_cases = {'foobar' => 16 , + 'baz' => 30 , + 'rm-foo' => 45 , + 'yello' => 43 , + 'poseidon says hi' => 29 , + 'Здравствуй, мир!' => 37 , + '你好世界' => 37 , + 'こんにちは世界' => 12 , + '' => 31 , + ' ' => 21 , + ' ' => 5 , + ' ' => 2 , + ' ' => 44 , + } + hasher = PartitionHasher.new 50 + test_cases.each do |key, expected_part| + part = hasher.partition_for key + expect(part).to eq(expected_part) + end + end + + it 'should handle a one partition case well' do + rnd = Random.new RSpec.configuration.seed + hasher = PartitionHasher.new 1 + 100.times do |size| + key = rnd.bytes(size) + expect(hasher.partition_for key).to eq(0) + end + end + +end