diff --git a/lib/poseidon/broker_pool.rb b/lib/poseidon/broker_pool.rb index fd99b57..cbfd7da 100644 --- a/lib/poseidon/broker_pool.rb +++ b/lib/poseidon/broker_pool.rb @@ -1,6 +1,6 @@ module Poseidon # BrokerPool allows you to send api calls to the a brokers Connection. - # + # # @api private class BrokerPool class UnknownBroker < StandardError; end @@ -26,7 +26,7 @@ def initialize(client_id, seed_brokers, socket_timeout_ms) def fetch_metadata(topics) @seed_brokers.each do |broker| if metadata = fetch_metadata_from_broker(broker, topics) - Poseidon.logger.debug { "Fetched metadata\n" + metadata.to_s } + Poseidon.logger.debug { "Fetched metadata from #{broker}:\n" + metadata.to_s } return metadata end end diff --git a/lib/poseidon/cluster_metadata.rb b/lib/poseidon/cluster_metadata.rb index 101f572..79c4224 100644 --- a/lib/poseidon/cluster_metadata.rb +++ b/lib/poseidon/cluster_metadata.rb @@ -71,6 +71,11 @@ def to_s out end + def reset + @brokers = {} + @topic_metadata = {} + end + private def update_topics(topics) topics.each do |topic| diff --git a/lib/poseidon/sync_producer.rb b/lib/poseidon/sync_producer.rb index b55d4f1..07cc0ae 100644 --- a/lib/poseidon/sync_producer.rb +++ b/lib/poseidon/sync_producer.rb @@ -58,7 +58,8 @@ def send_messages(messages) break else Kernel.sleep retry_backoff_ms / 1000.0 - refresh_metadata(messages_to_send.topic_set) + reset_metadata + ensure_metadata_available_for_topics(messages_to_send) end end @@ -80,14 +81,14 @@ def close def ensure_metadata_available_for_topics(messages_to_send) return if !messages_to_send.needs_metadata? - Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set}. (Attempt 1)" } + Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set.inspect}. (Attempt 1)" } refresh_metadata(messages_to_send.topic_set) return if !messages_to_send.needs_metadata? 2.times do |n| sleep 5 - Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set}. 
(Attempt #{n+2})" } + Poseidon.logger.debug { "Fetching metadata for #{messages_to_send.topic_set.inspect}. (Attempt #{n+2})" } refresh_metadata(messages_to_send.topic_set) return if !messages_to_send.needs_metadata? end @@ -134,6 +135,12 @@ def refresh_metadata(topics) @broker_pool.update_known_brokers(@cluster_metadata.brokers) end + def reset_metadata + Poseidon.logger.debug { "Resetting metadata" } + @cluster_metadata.reset + @broker_pool.close + end + def send_to_broker(messages_for_broker) return false if messages_for_broker.broker_id == -1 to_send = messages_for_broker.build_protocol_objects(@compression_config) diff --git a/lib/poseidon/topic_metadata.rb b/lib/poseidon/topic_metadata.rb index 3265c4b..cbce207 100644 --- a/lib/poseidon/topic_metadata.rb +++ b/lib/poseidon/topic_metadata.rb @@ -35,7 +35,7 @@ def ==(o) end def exists? - struct.error == 0 + struct.error == Errors::NO_ERROR_CODE end def eql?(o) @@ -56,7 +56,7 @@ def partition_count def available_partitions @available_partitions ||= struct.partitions.select do |partition| - partition.error == 0 && partition.leader != -1 + (partition.error == Errors::NO_ERROR_CODE || partition.error_class == Errors::ReplicaNotAvailable) && partition.leader != -1 end end diff --git a/poseidon.gemspec b/poseidon.gemspec index 0a397c6..08060ab 100644 --- a/poseidon.gemspec +++ b/poseidon.gemspec @@ -23,4 +23,5 @@ Gem::Specification.new do |gem| gem.add_development_dependency(%q) gem.add_development_dependency(%q) gem.add_development_dependency(%q) + gem.add_development_dependency(%q) end diff --git a/spec/integration/multiple_brokers/consumer_spec.rb b/spec/integration/multiple_brokers/consumer_spec.rb index 1fcb6e5..7a301e8 100644 --- a/spec/integration/multiple_brokers/consumer_spec.rb +++ b/spec/integration/multiple_brokers/consumer_spec.rb @@ -1,6 +1,8 @@ require 'integration/multiple_brokers/spec_helper' RSpec.describe "consuming with multiple brokers", :type => :request do + include_context "a multiple broker 
cluster" + before(:each) do # autocreate the topic by asking for information about it c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000) diff --git a/spec/integration/multiple_brokers/metadata_failures_spec.rb b/spec/integration/multiple_brokers/metadata_failures_spec.rb index 2c79c16..f8b801f 100644 --- a/spec/integration/multiple_brokers/metadata_failures_spec.rb +++ b/spec/integration/multiple_brokers/metadata_failures_spec.rb @@ -1,6 +1,22 @@ require 'integration/multiple_brokers/spec_helper' +# Created because you can't use a block form for receive with and_call_original +# https://github.com/rspec/rspec-mocks/issues/774 +RSpec::Matchers.define :a_broker_id_of do |id| + match { |actual| actual.broker_id == id } +end + +RSpec::Matchers.define :a_broker_id_not do |id| + match { |actual| actual.broker_id != id } +end + +RSpec::Matchers.define :needing_metadata do |val| + match { |actual| actual.send(:needs_metadata?) == val } +end + RSpec.describe "handling failures", :type => :request do + include_context "a multiple broker cluster" + describe "metadata failures" do before(:each) do @messages_to_send = [ @@ -32,4 +48,97 @@ }.to raise_error(Poseidon::Errors::UnableToFetchMetadata) end end + + describe "leader node loss" do + let(:topic) { "testing" } + let(:kafka_partitions) { 1 } + + it "is still able to send messages" do + @p = Producer.new(["localhost:9092","localhost:9093","localhost:9094"], "producer", :required_acks => 1) + + # Send one to force topic creation + expect(@p.send_messages([MessageToSend.new(topic, "hello")])).to be_truthy + + @producer = @p.instance_variable_get(:@producer) + @cluster_metadata = @producer.instance_variable_get(:@cluster_metadata) + topic_metadata = @cluster_metadata.metadata_for_topics([topic])[topic] + + expect(topic_metadata.available_partitions.length).to be(kafka_partitions) + + # Now, lets kill the topic leader + leader = topic_metadata.available_partitions.first.leader + broker = 
$tc.brokers[topic_metadata.available_partitions.first.leader] + expect(broker.id).to be(leader) + + broker.without_process do + expect(@cluster_metadata.metadata_for_topics([topic])[topic].available_partitions.first.leader).to be(leader) + expect(@producer.send(:refresh_interval_elapsed?)).to be_falsy + + # Setup expectations that the consumer updates its info + expect(@producer).to receive(:ensure_metadata_available_for_topics).with(needing_metadata(false)).ordered.and_call_original + expect(@producer).to receive(:send_to_broker).with(a_broker_id_of(leader)).ordered.and_call_original + + expect(@producer).to receive(:reset_metadata).ordered.and_call_original + expect(@producer).to receive(:ensure_metadata_available_for_topics).ordered.and_call_original + expect(@producer).to receive(:send_to_broker).with(a_broker_id_not(leader)).ordered.and_call_original + + expect(@p.send_messages([MessageToSend.new(topic, "hello")])).to be_truthy + end + end + end + + describe "partition replica loss" do + let(:topic) { "testing" } + let(:kafka_partitions) { 1 } + + it "refreshes metadata correctly" do + @p = Producer.new(["localhost:9092","localhost:9093","localhost:9094"], "producer", :required_acks => 1) + + # Send one to force topic creation + expect(@p.send_messages([MessageToSend.new(topic, "hello")])).to be_truthy + + @producer = @p.instance_variable_get(:@producer) + @cluster_metadata = @producer.instance_variable_get(:@cluster_metadata) + topic_metadata = @cluster_metadata.metadata_for_topics([topic])[topic] + + expect(topic_metadata.available_partitions.length).to be(kafka_partitions) + expect(topic_metadata.available_partitions.first.error).to be(0) + + # Now, lets kill the topic leader + partition_metadata = topic_metadata.available_partitions.first + expect(partition_metadata.replicas.length).to be(2) + leader = partition_metadata.leader + replica = (partition_metadata.replicas - [leader]).first + + broker = $tc.brokers[replica] + expect(broker.id).to_not 
be(partition_metadata.leader) + expect(broker.id).to be(replica) + + broker.without_process do + expect(@cluster_metadata.metadata_for_topics([topic])[topic].available_partitions.first.replicas).to include(replica) + expect(@producer.send(:refresh_interval_elapsed?)).to be_falsy + + Timecop.travel(@producer.metadata_refresh_interval_ms) do + expect(@producer.send(:refresh_interval_elapsed?)).to be_truthy + + # Setup expectations that the consumer updates its info + expect(@producer).to receive(:refresh_metadata).with(Set.new([topic])).ordered.and_call_original + expect(@producer).to receive(:ensure_metadata_available_for_topics).with(needing_metadata(false)).ordered.and_call_original + expect(@producer).to receive(:send_to_broker).with(a_broker_id_of(partition_metadata.leader)).ordered.and_call_original + + # Make sure we don't error out + expect(@producer).to_not receive(:reset_metadata) + + expect(@p.send_messages([MessageToSend.new(topic, "hello")])).to be_truthy + + # Check the valid metadata + updated_topic_metadata = @cluster_metadata.metadata_for_topics([topic])[topic] + expect(updated_topic_metadata.available_partitions.length).to be(kafka_partitions) + expect(updated_topic_metadata.available_partitions.first.leader).to be(leader) + expect(updated_topic_metadata.available_partitions.first.replicas).to eq([leader]) + expect(updated_topic_metadata.available_partitions.first.error).to be(9) + end + end + end + end end diff --git a/spec/integration/multiple_brokers/rebalance_spec.rb b/spec/integration/multiple_brokers/rebalance_spec.rb index 157f4b2..d943ae6 100644 --- a/spec/integration/multiple_brokers/rebalance_spec.rb +++ b/spec/integration/multiple_brokers/rebalance_spec.rb @@ -1,6 +1,8 @@ require 'integration/multiple_brokers/spec_helper' RSpec.describe "producer handles rebalancing", :type => :request do + include_context "a multiple broker cluster" + before(:each) do # autocreate the topic by asking for information about it @c = 
Connection.new("localhost", 9093, "metadata_fetcher", 10_000) diff --git a/spec/integration/multiple_brokers/round_robin_spec.rb b/spec/integration/multiple_brokers/round_robin_spec.rb index 2e1b26c..13aefef 100644 --- a/spec/integration/multiple_brokers/round_robin_spec.rb +++ b/spec/integration/multiple_brokers/round_robin_spec.rb @@ -1,6 +1,8 @@ require 'integration/multiple_brokers/spec_helper' RSpec.describe "round robin sending", :type => :request do + include_context "a multiple broker cluster" + describe "with small message batches" do it "evenly distributes messages across brokers" do c = Connection.new("localhost", 9092, "metadata_fetcher", 10_000) diff --git a/spec/integration/multiple_brokers/spec_helper.rb b/spec/integration/multiple_brokers/spec_helper.rb index bf07c52..4003aa8 100644 --- a/spec/integration/multiple_brokers/spec_helper.rb +++ b/spec/integration/multiple_brokers/spec_helper.rb @@ -3,10 +3,12 @@ require 'test_cluster' class ThreeBrokerCluster - def initialize(properties = {}) + attr_reader :brokers, :zookeeper + + def initialize(partitions, properties = {}) @zookeeper = ZookeeperRunner.new @brokers = (9092..9094).map { |port| BrokerRunner.new(port - 9092, port, - 3, + partitions, 2, properties) } end @@ -40,17 +42,19 @@ def start_first_broker end end -RSpec.configure do |config| - config.before(:each) do +RSpec.shared_context "a multiple broker cluster" do + let(:kafka_partitions) { 3 } + + before(:each) do JavaRunner.remove_tmp JavaRunner.set_kafka_path! 
- $tc = ThreeBrokerCluster.new + $tc = ThreeBrokerCluster.new(kafka_partitions) $tc.start SPEC_LOGGER.info "Waiting on cluster" sleep 10 # wait for cluster to come up end - config.after(:each) do + after(:each) do $tc.stop if $tc end end diff --git a/spec/integration/simple/compression_spec.rb b/spec/integration/simple/compression_spec.rb index 2d473bb..16fcd41 100644 --- a/spec/integration/simple/compression_spec.rb +++ b/spec/integration/simple/compression_spec.rb @@ -1,6 +1,8 @@ require 'integration/simple/spec_helper' RSpec.describe "compression", :type => :request do + include_context "a single broker cluster" + it "roundtrips" do i = rand(1000) diff --git a/spec/integration/simple/connection_spec.rb b/spec/integration/simple/connection_spec.rb index 63ab3cd..721ec09 100644 --- a/spec/integration/simple/connection_spec.rb +++ b/spec/integration/simple/connection_spec.rb @@ -2,6 +2,8 @@ include Protocol RSpec.describe Connection, :type => :request do + include_context "a single broker cluster" + before(:each) do @connection = Connection.new("localhost", 9092, "test", 10_000) end diff --git a/spec/integration/simple/multiple_brokers_spec.rb b/spec/integration/simple/multiple_brokers_spec.rb index 1d1fe8e..13de6a2 100644 --- a/spec/integration/simple/multiple_brokers_spec.rb +++ b/spec/integration/simple/multiple_brokers_spec.rb @@ -1,6 +1,8 @@ require 'integration/simple/spec_helper' RSpec.describe "three brokers in cluster", :type => :request do + include_context "a single broker cluster" + describe "sending batches of 1 message" do it "sends messages to all brokers" do end diff --git a/spec/integration/simple/simple_producer_and_consumer_spec.rb b/spec/integration/simple/simple_producer_and_consumer_spec.rb index 27fe25a..a942921 100644 --- a/spec/integration/simple/simple_producer_and_consumer_spec.rb +++ b/spec/integration/simple/simple_producer_and_consumer_spec.rb @@ -1,6 +1,7 @@ require 'integration/simple/spec_helper' RSpec.describe "simple producer and 
consumer", :type => :request do + include_context "a single broker cluster" describe "writing and consuming one topic" do it "fetches produced messages" do diff --git a/spec/integration/simple/spec_helper.rb b/spec/integration/simple/spec_helper.rb index 5884122..6072179 100644 --- a/spec/integration/simple/spec_helper.rb +++ b/spec/integration/simple/spec_helper.rb @@ -2,16 +2,15 @@ require 'test_cluster' -RSpec.configure do |config| - config.before(:each) do +RSpec.shared_context "a single broker cluster" do + before(:each) do JavaRunner.remove_tmp JavaRunner.set_kafka_path! $tc = TestCluster.new $tc.start - sleep 5 end - config.after(:each) do + after(:each) do $tc.stop end end diff --git a/spec/integration/simple/truncated_messages_spec.rb b/spec/integration/simple/truncated_messages_spec.rb index 5edff80..3bb2175 100644 --- a/spec/integration/simple/truncated_messages_spec.rb +++ b/spec/integration/simple/truncated_messages_spec.rb @@ -1,6 +1,8 @@ require 'integration/simple/spec_helper' RSpec.describe "truncated messages", :type => :request do + include_context "a single broker cluster" + before(:each) do @s1 = "a" * 335 @s2 = "b" * 338 diff --git a/spec/integration/simple/unavailable_broker_spec.rb b/spec/integration/simple/unavailable_broker_spec.rb index 1d75290..5c470f5 100644 --- a/spec/integration/simple/unavailable_broker_spec.rb +++ b/spec/integration/simple/unavailable_broker_spec.rb @@ -1,6 +1,8 @@ require 'integration/simple/spec_helper' RSpec.describe "unavailable broker scenarios:", :type => :request do + include_context "a single broker cluster" + context "producer with a dead broker in bootstrap list" do before(:each) do @p = Producer.new(["localhost:9091","localhost:9092"], "test") diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 5214d5f..7a703ea 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -27,3 +27,6 @@ require 'coveralls' Coveralls.wear! 
+ +require 'timecop' +Timecop.safe_mode = true diff --git a/spec/test_cluster.rb b/spec/test_cluster.rb index 8915629..7969f98 100644 --- a/spec/test_cluster.rb +++ b/spec/test_cluster.rb @@ -8,6 +8,7 @@ def initialize def start @zookeeper.start @broker.start + sleep 5 end def stop @@ -153,10 +154,12 @@ class BrokerRunner "leader.imbalance.check.interval.seconds" => 5 } + attr_reader :id + def initialize(id, port, partition_count = 1, replication_factor = 1, properties = {}) @id = id @port = port - @jr = JavaRunner.new("broker_#{id}", + @jr = JavaRunner.new("broker_#{id}", "#{ENV['KAFKA_PATH']}/bin/kafka-run-class.sh -daemon -name broker_#{id} kafka.Kafka", "ps ax | grep -i 'kafka\.Kafka' | grep java | grep broker_#{id} | grep -v grep | awk '{print $1}'", "SIGTERM", @@ -197,7 +200,7 @@ def initialize def pid @jr.pid end - + def start @jr.start end diff --git a/spec/unit/sync_producer_spec.rb b/spec/unit/sync_producer_spec.rb index 001864d..2046410 100644 --- a/spec/unit/sync_producer_spec.rb +++ b/spec/unit/sync_producer_spec.rb @@ -76,8 +76,19 @@ @sp.send_messages([Message.new(:topic => "topic", :value => "value")]) rescue StandardError end - it "refreshes metadata between retries" do - expect(@cluster_metadata).to receive(:update).exactly(4).times + it "resets and refreshes metadata between retries" do + # First check + expect(@mbts).to receive(:needs_metadata?).and_return(false).ordered + + # After first failure + 3.times do + expect(@cluster_metadata).to receive(:reset).ordered + expect(@broker_pool).to receive(:close).ordered + expect(@mbts).to receive(:needs_metadata?).and_return(true).ordered + expect(@cluster_metadata).to receive(:update).ordered + expect(@mbts).to receive(:needs_metadata?).and_return(false).ordered + end + @sp.send_messages([Message.new(:topic => "topic", :value => "value")]) rescue StandardError end diff --git a/spec/unit/topic_metadata_spec.rb b/spec/unit/topic_metadata_spec.rb index bea0804..077fa29 100644 --- 
a/spec/unit/topic_metadata_spec.rb +++ b/spec/unit/topic_metadata_spec.rb @@ -22,4 +22,22 @@ expect(tm.partition_leader(0)).to eq(0) end + + context "#available_partitions" do + it "includes when missing replicas" do + partition_metadata = Protocol::PartitionMetadata.new(9, 0, 0, [0], [0]) + partitions = [partition_metadata] + tm = TopicMetadata.new(Protocol::TopicMetadataStruct.new(0, "topic", partitions)) + + expect(tm.available_partitions.length).to be(1) + end + + it "ignores other errors" do + partition_metadata = Protocol::PartitionMetadata.new(2, 0, 0, [0], [0]) + partitions = [partition_metadata] + tm = TopicMetadata.new(Protocol::TopicMetadataStruct.new(0, "topic", partitions)) + + expect(tm.available_partitions.length).to be(0) + end + end end