4 changes: 2 additions & 2 deletions lib/poseidon/broker_pool.rb
@@ -1,6 +1,6 @@
module Poseidon
# BrokerPool allows you to send API calls to a broker's Connection.
#
#
# @api private
class BrokerPool
class UnknownBroker < StandardError; end
@@ -26,7 +26,7 @@ def initialize(client_id, seed_brokers, socket_timeout_ms)
def fetch_metadata(topics)
@seed_brokers.each do |broker|
if metadata = fetch_metadata_from_broker(broker, topics)
Poseidon.logger.debug { "Fetched metadata\n" + metadata.to_s }
Poseidon.logger.debug { "Fetched metadata from #{broker}:\n" + metadata.to_s }
return metadata
end
end
5 changes: 5 additions & 0 deletions lib/poseidon/cluster_metadata.rb
@@ -71,6 +71,11 @@ def to_s
out
end

def reset
@brokers = {}
@topic_metadata = {}
end

private
def update_topics(topics)
topics.each do |topic|
13 changes: 10 additions & 3 deletions lib/poseidon/sync_producer.rb
@@ -58,7 +58,8 @@ def send_messages(messages)
break
else
Kernel.sleep retry_backoff_ms / 1000.0
refresh_metadata(messages_to_send.topic_set)
reset_metadata
ensure_metadata_available_for_topics(messages_to_send)
end
end

@@ -80,14 +81,14 @@ def close
def ensure_metadata_available_for_topics(messages_to_send)
return if !messages_to_send.needs_metadata?

Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set}. (Attempt 1)" }
Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set.inspect}. (Attempt 1)" }
refresh_metadata(messages_to_send.topic_set)
return if !messages_to_send.needs_metadata?

2.times do |n|
sleep 5

Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set}. (Attempt #{n+2})" }
Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set.inspect}. (Attempt #{n+2})" }
refresh_metadata(messages_to_send.topic_set)
return if !messages_to_send.needs_metadata?
end
@@ -134,6 +135,12 @@ def refresh_metadata(topics)
@broker_pool.update_known_brokers(@cluster_metadata.brokers)
end

def reset_metadata
Poseidon.logger.debug { "Resetting metdata" }
@cluster_metadata.reset
@broker_pool.close
end

def send_to_broker(messages_for_broker)
return false if messages_for_broker.broker_id == -1
to_send = messages_for_broker.build_protocol_objects(@compression_config)
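Taken together, the hunks above change what happens after a failed send attempt: instead of a bare refresh_metadata call, the producer now drops all cached cluster state and re-runs the bounded metadata fetch before retrying. A small self-contained sketch of that sequence, with stand-in method bodies (the real ones live in Poseidon::SyncProducer and Poseidon::ClusterMetadata):

# Stand-ins for illustration only; the comments describe what the library versions do.
def reset_metadata
  # @cluster_metadata.reset -> forget all known brokers and topic metadata
  # @broker_pool.close      -> drop any open broker connections
end

def ensure_metadata_available_for_topics(messages_to_send)
  # refresh_metadata(messages_to_send.topic_set), retried up to three times
  # total with a 5 second pause between attempts (see the "Attempt" hunk above)
end

# Failure branch of the send loop, simplified:
retry_backoff_ms = 100
Kernel.sleep retry_backoff_ms / 1000.0
reset_metadata
ensure_metadata_available_for_topics(:messages_to_send)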
4 changes: 2 additions & 2 deletions lib/poseidon/topic_metadata.rb
@@ -35,7 +35,7 @@ def ==(o)
end

def exists?
struct.error == 0
struct.error == Errors::NO_ERROR_CODE
end

def eql?(o)
@@ -56,7 +56,7 @@ def partition_count

def available_partitions
@available_partitions ||= struct.partitions.select do |partition|
partition.error == 0 && partition.leader != -1
(partition.error == Errors::NO_ERROR_CODE || partition.error_class == Errors::ReplicaNotAvailable) && partition.leader != -1
end
end

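The second change above loosens what counts as an available partition: a partition whose only reported problem is a missing replica can still accept produce requests as long as it has a leader. A self-contained sketch of the rule (constants are stubbed here; in the library they are Errors::NO_ERROR_CODE and Errors::ReplicaNotAvailable, and Kafka uses wire error code 9 for ReplicaNotAvailable, which is also what the new spec below asserts):

NO_ERROR              = 0
REPLICA_NOT_AVAILABLE = 9 # Kafka protocol error code for ReplicaNotAvailable

Partition = Struct.new(:error, :leader)

# Mirrors the select predicate in #available_partitions
def usable_for_produce?(partition)
  (partition.error == NO_ERROR || partition.error == REPLICA_NOT_AVAILABLE) &&
    partition.leader != -1
end

puts usable_for_produce?(Partition.new(REPLICA_NOT_AVAILABLE, 2)) # => true, leader still present
puts usable_for_produce?(Partition.new(NO_ERROR, -1))             # => false, no leader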
1 change: 1 addition & 0 deletions poseidon.gemspec
@@ -23,4 +23,5 @@ Gem::Specification.new do |gem|
gem.add_development_dependency(%q<yard>)
gem.add_development_dependency(%q<simplecov>)
gem.add_development_dependency(%q<snappy>)
gem.add_development_dependency(%q<timecop>)
end
2 changes: 2 additions & 0 deletions spec/integration/multiple_brokers/consumer_spec.rb
@@ -1,6 +1,8 @@
require 'integration/multiple_brokers/spec_helper'

RSpec.describe "consuming with multiple brokers", :type => :request do
include_context "a multiple broker cluster"

before(:each) do
# autocreate the topic by asking for information about it
c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000)
109 changes: 109 additions & 0 deletions spec/integration/multiple_brokers/metadata_failures_spec.rb
@@ -1,6 +1,22 @@
require 'integration/multiple_brokers/spec_helper'

# Created because you can't use the block form of receive with and_call_original
# https://github.com/rspec/rspec-mocks/issues/774
RSpec::Matchers.define :a_broker_id_of do |id|
match { |actual| actual.broker_id == id }
end

RSpec::Matchers.define :a_broker_id_not do |id|
match { |actual| actual.broker_id != id }
end

RSpec::Matchers.define :needing_metadata do |val|
match { |actual| actual.send(:needs_metadata?) == val }
end

RSpec.describe "handling failures", :type => :request do
include_context "a multiple broker cluster"

describe "metadata failures" do
before(:each) do
@messages_to_send = [
@@ -32,4 +48,97 @@
}.to raise_error(Poseidon::Errors::UnableToFetchMetadata)
end
end

describe "leader node loss" do
let(:topic) { "testing" }
let(:kafka_partitions) { 1 }

it "is still able to send messages" do
@p = Producer.new(["localhost:9092","localhost:9093","localhost:9094"], "producer", :required_acks => 1)

# Send one to force topic creation
expect(@p.send_messages([MessageToSend.new(topic, "hello")])).to be_truthy

@producer = @p.instance_variable_get(:@producer)
@cluster_metadata = @producer.instance_variable_get(:@cluster_metadata)
topic_metadata = @cluster_metadata.metadata_for_topics([topic])[topic]

expect(topic_metadata.available_partitions.length).to be(kafka_partitions)

# Now, let's kill the topic leader
leader = topic_metadata.available_partitions.first.leader
broker = $tc.brokers[topic_metadata.available_partitions.first.leader]
expect(broker.id).to be(leader)

broker.without_process do
expect(@cluster_metadata.metadata_for_topics([topic])[topic].available_partitions.first.leader).to be(leader)
expect(@producer.send(:refresh_interval_elapsed?)).to be_falsy

# Set up expectations that the producer updates its info
expect(@producer).to receive(:ensure_metadata_available_for_topics).with(needing_metadata(false)).ordered.and_call_original
expect(@producer).to receive(:send_to_broker).with(a_broker_id_of(leader)).ordered.and_call_original

expect(@producer).to receive(:reset_metadata).ordered.and_call_original
expect(@producer).to receive(:ensure_metadata_available_for_topics).ordered.and_call_original
expect(@producer).to receive(:send_to_broker).with(a_broker_id_not(leader)).ordered.and_call_original

expect(@p.send_messages([MessageToSend.new(topic, "hello")])).to be_truthy
end
end
end

describe "partition replica loss" do
let(:topic) { "testing" }
let(:kafka_partitions) { 1 }

it "refreshes metadata correctly" do
@p = Producer.new(["localhost:9092","localhost:9093","localhost:9094"], "producer", :required_acks => 1)

# Send one to force topic creation
expect(@p.send_messages([MessageToSend.new(topic, "hello")])).to be_truthy

@producer = @p.instance_variable_get(:@producer)
@cluster_metadata = @producer.instance_variable_get(:@cluster_metadata)
topic_metadata = @cluster_metadata.metadata_for_topics([topic])[topic]

expect(topic_metadata.available_partitions.length).to be(kafka_partitions)
expect(topic_metadata.available_partitions.first.error).to be(0)

# Now, let's kill a non-leader replica of the partition
partition_metadata = topic_metadata.available_partitions.first
expect(partition_metadata.replicas.length).to be(2)
leader = partition_metadata.leader
replica = (partition_metadata.replicas - [leader]).first

broker = $tc.brokers[replica]
expect(broker.id).to_not be(partition_metadata.leader)
expect(broker.id).to be(replica)

broker.without_process do
expect(@cluster_metadata.metadata_for_topics([topic])[topic].available_partitions.first.replicas).to include(replica)
expect(@producer.send(:refresh_interval_elapsed?)).to be_falsy

Timecop.travel(@producer.metadata_refresh_interval_ms) do
expect(@producer.send(:refresh_interval_elapsed?)).to be_truthy

# Set up expectations that the producer updates its info
expect(@producer).to receive(:refresh_metadata).with(Set.new([topic])).ordered.and_call_original
expect(@producer).to receive(:ensure_metadata_available_for_topics).with(needing_metadata(false)).ordered.and_call_original
expect(@producer).to receive(:send_to_broker).with(a_broker_id_of(partition_metadata.leader)).ordered.and_call_original

# Make sure we don't error out
expect(@producer).to_not receive(:reset_metadata)

expect(@p.send_messages([MessageToSend.new(topic, "hello")])).to be_truthy

# Check the refreshed metadata
updated_topic_metadata = @cluster_metadata.metadata_for_topics([topic])[topic]
expect(updated_topic_metadata.available_partitions.length).to be(kafka_partitions)
expect(updated_topic_metadata.available_partitions.first.leader).to be(leader)
expect(updated_topic_metadata.available_partitions.first.replicas).to eq([leader])
expect(updated_topic_metadata.available_partitions.first.error).to be(9)
end
end
end
end
end
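The custom matchers at the top of this spec exist because of the rspec-mocks limitation linked in the comment: an implementation block attached to receive cannot be combined with and_call_original, so the argument check has to move into a matcher passed to .with. A self-contained illustration of the pattern (the Sender struct and deliver method are hypothetical, not from this repo):

require 'rspec/autorun'

Sender = Struct.new(:broker_id)

RSpec::Matchers.define :a_broker_id_of do |id|
  match { |actual| actual.broker_id == id }
end

RSpec.describe "argument checks combined with and_call_original" do
  it "moves the check into a custom matcher" do
    target = Object.new
    def target.deliver(msg); msg.broker_id; end

    # Not allowed: expect(target).to receive(:deliver) { |m| ... }.and_call_original
    expect(target).to receive(:deliver).with(a_broker_id_of(7)).and_call_original
    expect(target.deliver(Sender.new(7))).to eq(7)
  end
end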
2 changes: 2 additions & 0 deletions spec/integration/multiple_brokers/rebalance_spec.rb
@@ -1,6 +1,8 @@
require 'integration/multiple_brokers/spec_helper'

RSpec.describe "producer handles rebalancing", :type => :request do
include_context "a multiple broker cluster"

before(:each) do
# autocreate the topic by asking for information about it
@c = Connection.new("localhost", 9093, "metadata_fetcher", 10_000)
2 changes: 2 additions & 0 deletions spec/integration/multiple_brokers/round_robin_spec.rb
@@ -1,6 +1,8 @@
require 'integration/multiple_brokers/spec_helper'

RSpec.describe "round robin sending", :type => :request do
include_context "a multiple broker cluster"

describe "with small message batches" do
it "evenly distributes messages across brokers" do
c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000)
16 changes: 10 additions & 6 deletions spec/integration/multiple_brokers/spec_helper.rb
@@ -3,10 +3,12 @@
require 'test_cluster'

class ThreeBrokerCluster
def initialize(properties = {})
attr_reader :brokers, :zookeeper

def initialize(partitions, properties = {})
@zookeeper = ZookeeperRunner.new
@brokers = (9092..9094).map { |port| BrokerRunner.new(port - 9092, port,
3,
partitions,
2,
properties) }
end
@@ -40,17 +42,19 @@ def start_first_broker
end
end

RSpec.configure do |config|
config.before(:each) do
RSpec.shared_context "a multiple broker cluster" do
let(:kafka_partitions) { 3 }

before(:each) do
JavaRunner.remove_tmp
JavaRunner.set_kafka_path!
$tc = ThreeBrokerCluster.new
$tc = ThreeBrokerCluster.new(kafka_partitions)
$tc.start
SPEC_LOGGER.info "Waiting on cluster"
sleep 10 # wait for cluster to come up
end

config.after(:each) do
after(:each) do
$tc.stop if $tc
end
end
2 changes: 2 additions & 0 deletions spec/integration/simple/compression_spec.rb
@@ -1,6 +1,8 @@
require 'integration/simple/spec_helper'

RSpec.describe "compression", :type => :request do
include_context "a single broker cluster"

it "roundtrips" do
i = rand(1000)

2 changes: 2 additions & 0 deletions spec/integration/simple/connection_spec.rb
@@ -2,6 +2,8 @@

include Protocol
RSpec.describe Connection, :type => :request do
include_context "a single broker cluster"

before(:each) do
@connection = Connection.new("localhost", 9092, "test", 10_000)
end
2 changes: 2 additions & 0 deletions spec/integration/simple/multiple_brokers_spec.rb
@@ -1,6 +1,8 @@
require 'integration/simple/spec_helper'

RSpec.describe "three brokers in cluster", :type => :request do
include_context "a single broker cluster"

describe "sending batches of 1 message" do
it "sends messages to all brokers" do
end
@@ -1,6 +1,7 @@
require 'integration/simple/spec_helper'

RSpec.describe "simple producer and consumer", :type => :request do
include_context "a single broker cluster"

describe "writing and consuming one topic" do
it "fetches produced messages" do
7 changes: 3 additions & 4 deletions spec/integration/simple/spec_helper.rb
@@ -2,16 +2,15 @@

require 'test_cluster'

RSpec.configure do |config|
config.before(:each) do
RSpec.shared_context "a single broker cluster" do
before(:each) do
JavaRunner.remove_tmp
JavaRunner.set_kafka_path!
$tc = TestCluster.new
$tc.start
sleep 5
end

config.after(:each) do
after(:each) do
$tc.stop
end
end
2 changes: 2 additions & 0 deletions spec/integration/simple/truncated_messages_spec.rb
@@ -1,6 +1,8 @@
require 'integration/simple/spec_helper'

RSpec.describe "truncated messages", :type => :request do
include_context "a single broker cluster"

before(:each) do
@s1 = "a" * 335
@s2 = "b" * 338
2 changes: 2 additions & 0 deletions spec/integration/simple/unavailable_broker_spec.rb
@@ -1,6 +1,8 @@
require 'integration/simple/spec_helper'

RSpec.describe "unavailable broker scenarios:", :type => :request do
include_context "a single broker cluster"

context "producer with a dead broker in bootstrap list" do
before(:each) do
@p = Producer.new(["localhost:9091","localhost:9092"], "test")
3 changes: 3 additions & 0 deletions spec/spec_helper.rb
@@ -27,3 +27,6 @@

require 'coveralls'
Coveralls.wear!

require 'timecop'
Timecop.safe_mode = true
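Timecop's safe mode, enabled here, makes Timecop raise unless time travel is scoped to a block, so a failing spec can never leak a shifted clock into later examples; that is why the replica-loss spec above wraps its assertions in Timecop.travel(...) do ... end. A short self-contained illustration:

require 'timecop'

Timecop.safe_mode = true

# Block form: the clock is shifted only inside the block, then restored.
Timecop.travel(Time.now + 60) do
  puts Time.now # roughly one minute ahead
end

# The bare form is rejected while safe mode is on:
begin
  Timecop.travel(Time.now + 60)
rescue Timecop::SafeModeException => e
  puts e.message
end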
7 changes: 5 additions & 2 deletions spec/test_cluster.rb
@@ -8,6 +8,7 @@ def initialize
def start
@zookeeper.start
@broker.start
sleep 5
end

def stop
@@ -153,10 +154,12 @@ class BrokerRunner
"leader.imbalance.check.interval.seconds" => 5
}

attr_reader :id

def initialize(id, port, partition_count = 1, replication_factor = 1, properties = {})
@id = id
@port = port
@jr = JavaRunner.new("broker_#{id}",
@jr = JavaRunner.new("broker_#{id}",
"#{ENV['KAFKA_PATH']}/bin/kafka-run-class.sh -daemon -name broker_#{id} kafka.Kafka",
"ps ax | grep -i 'kafka\.Kafka' | grep java | grep broker_#{id} | grep -v grep | awk '{print $1}'",
"SIGTERM",
@@ -197,7 +200,7 @@ def initialize
def pid
@jr.pid
end

def start
@jr.start
end