Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions lib/poseidon/partition_hasher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Spec'ed by:
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244 (Java)
# https://github.com/mumrah/kafka-python/blob/master/kafka/partitioner/hashed.py#L40 (Python client)

module Poseidon
# Partition hasher compatible with the Java Client.
class PartitionHasher
def initialize(num_partitions)
@partitions = num_partitions
end

def partition_for(key)
# Java does 32-bit abs, followed by positive modulus.
(PartitionHasher.murmur2_hash(key) & 0x7fffffff) % @partitions
end

private

def self.murmur2_hash(str)
data = str.to_s.bytes.to_a

length = data.length
# Magic MurMur constants.
seed = 0x9747b28c
m = 0x5bd1e995
r = 24
h = seed ^ length
restrict32 = lambda { |x| x & 0xffffffff }

i4 = 0
while i4 + 4 <= length do
k = data[i4 + 0] + (data[i4 + 1] << 8) + (data[i4 + 2] << 16) \
+ (data[i4 + 3] << 24)
k = restrict32[k * m]
k ^= k >> r # triple ("unsigned") shift not needed in Ruby
k = restrict32[k * m]
h = restrict32[h * m]
h ^= k
i4 += 4
end

# Handle the last few bytes of the input array. Java does this with a Duff
# device, oh well.
h ^= data[i4 + 2] << 16 if length > i4 + 2
h ^= data[i4 + 1] << 8 if length > i4 + 1
if length > i4
h ^= data[i4] & 0xff
h = restrict32[h * m]
end
h ^= h >> 13
h = restrict32[h * m]
h ^= h >> 15
return h
end
end
end
26 changes: 26 additions & 0 deletions spec/unit/HashingTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Class for verifyinng hash functions agains Java's implementation.
//
// It's intentionally free of any build environment baggage. To run:
//
// (export CLASSPATH=/usr/local/...YADA-YADA.../kafka-clients-0.8.X.X.jar:. ; javac HashingTest.java && java HashingTest)

import org.apache.kafka.common.utils.Utils;

public class HashingTest {
public final static int PARTITIONS = 50;
public final static String[] KEYS = {
"foobar", "baz", "rm-foo", "yello", "poseidon says hi",
// Unicode.
"Здравствуй, мир!", "你好世界", "こんにちは世界",
// Corner cases.
"", " ", " ", " ", " "
};

public static void main(String[] args) throws Exception {
if (args.length > 0) throw new RuntimeException("No args allowed");
for (String k : KEYS) {
int part = Utils.abs(Utils.murmur2(k.getBytes("UTF-8"))) % PARTITIONS;
System.out.println("'" + k + "' => " + part + " , ");
}
}
}
40 changes: 40 additions & 0 deletions spec/unit/partition_hasher_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# coding: utf-8
require 'spec_helper'
require 'poseidon/partition_hasher'

RSpec.describe PartitionHasher do
it 'should spread between 50 buckets matching Java implementation' do
# The values below are generated using the Java Kafka client, via the
# test class HashingTest.java. See instructions therein; it generates
# lines that can be pasted right in here.
test_cases = {'foobar' => 16 ,
'baz' => 30 ,
'rm-foo' => 45 ,
'yello' => 43 ,
'poseidon says hi' => 29 ,
'Здравствуй, мир!' => 37 ,
'你好世界' => 37 ,
'こんにちは世界' => 12 ,
'' => 31 ,
' ' => 21 ,
' ' => 5 ,
' ' => 2 ,
' ' => 44 ,
}
hasher = PartitionHasher.new 50
test_cases.each do |key, expected_part|
part = hasher.partition_for key
expect(part).to eq(expected_part)
end
end

it 'should handle a one partition case well' do
rnd = Random.new RSpec.configuration.seed
hasher = PartitionHasher.new 1
100.times do |size|
key = rnd.bytes(size)
expect(hasher.partition_for key).to eq(0)
end
end

end