From cd891dd85027de880fc2d818ad53112320987dda Mon Sep 17 00:00:00 2001 From: Scott Clasen Date: Fri, 16 May 2014 11:10:20 -0700 Subject: [PATCH 1/3] check for errors when required acks > 0, refresh metadata on NotLeaderForPartition --- lib/poseidon/sync_producer.rb | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/lib/poseidon/sync_producer.rb b/lib/poseidon/sync_producer.rb index 6f13b95..df4df2d 100644 --- a/lib/poseidon/sync_producer.rb +++ b/lib/poseidon/sync_producer.rb @@ -50,7 +50,7 @@ def send_messages(messages) end messages_to_send.messages_for_brokers(@message_conductor).each do |messages_for_broker| - if send_to_broker(messages_for_broker) + if sent_ok(send_to_broker(messages_for_broker)) messages_to_send.successfully_sent(messages_for_broker) end end @@ -115,5 +115,27 @@ def send_to_broker(messages_for_broker) rescue Connection::ConnectionFailedError false end + + def sent_ok(produce_response) + if !produce_response + false + elsif @required_acks == 0 + true + else + has_errors = produce_response.topic_response.reduce(false){ |errors, response| + errors || response.partitions.reduce(false){ |part_errors, part_response| + part_errors || has_error(response, part_response) + } + } + ! has_errors + end + end + + def has_error(response, part_response) + if part_response.error == 6 + refresh_metadata(response.topic) + end + part_response.error != 0 + end end end From 7dc0fb9a35d248456632baf7d9043984dab0ebc2 Mon Sep 17 00:00:00 2001 From: Scott Clasen Date: Fri, 16 May 2014 14:34:02 -0700 Subject: [PATCH 2/3] log correlation ids and broker conn info, close connections on update known brokers --- lib/poseidon/broker_pool.rb | 1 + lib/poseidon/connection.rb | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/poseidon/broker_pool.rb b/lib/poseidon/broker_pool.rb index cee927c..9c71b52 100644 --- a/lib/poseidon/broker_pool.rb +++ b/lib/poseidon/broker_pool.rb @@ -29,6 +29,7 @@ def fetch_metadata(topics) # @param [Hash] brokers # Hash of broker_id => { :host => host, :port => port } def update_known_brokers(brokers) + @brokers.values(&:close) @brokers.update(brokers) nil end diff --git a/lib/poseidon/connection.rb b/lib/poseidon/connection.rb index 7bde395..dc71cf1 100644 --- a/lib/poseidon/connection.rb +++ b/lib/poseidon/connection.rb @@ -40,7 +40,8 @@ def produce(required_acks, timeout, messages_for_topics) req = ProduceRequest.new( request_common(:produce), required_acks, timeout, - messages_for_topics) + messages_for_topics) + puts "PRODUCE CORRELATION #{req.common.correlation_id} from CLIENT #{@client_id} TO #{@host} #{@port}" send_request(req) if required_acks != 0 read_response(ProduceResponse) From 76a4ee4e741829474ba4c59694844fc9b0efd15f Mon Sep 17 00:00:00 2001 From: Scott Clasen Date: Fri, 16 May 2014 15:42:55 -0700 Subject: [PATCH 3/3] debug logging --- lib/poseidon/connection.rb | 2 +- lib/poseidon/message_conductor.rb | 1 + lib/poseidon/sync_producer.rb | 4 +++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/poseidon/connection.rb b/lib/poseidon/connection.rb index dc71cf1..7c6c2ed 100644 --- a/lib/poseidon/connection.rb +++ b/lib/poseidon/connection.rb @@ -41,7 +41,7 @@ def produce(required_acks, timeout, messages_for_topics) required_acks, timeout, messages_for_topics) - puts "PRODUCE CORRELATION #{req.common.correlation_id} from CLIENT #{@client_id} TO #{@host} #{@port}" + puts "KAFKADEBUG PRODUCE CORRELATION #{req.common.correlation_id} from CLIENT #{@client_id} TO #{@host} #{@port}" send_request(req) if required_acks != 0 read_response(ProduceResponse) diff --git a/lib/poseidon/message_conductor.rb b/lib/poseidon/message_conductor.rb index 8636065..f10e503 100644 --- a/lib/poseidon/message_conductor.rb +++ b/lib/poseidon/message_conductor.rb @@ -32,6 +32,7 @@ def destination(topic, key = nil) if topic_metadata && topic_metadata.leader_available? partition_id = determine_partition(topic_metadata, key) broker_id = topic_metadata.partitions[partition_id].leader || NO_BROKER + puts "KAFKADEBUG LEADER FOR #{partition_id} is #{broker_id}" else partition_id = NO_PARTITION broker_id = NO_BROKER diff --git a/lib/poseidon/sync_producer.rb b/lib/poseidon/sync_producer.rb index df4df2d..f52038a 100644 --- a/lib/poseidon/sync_producer.rb +++ b/lib/poseidon/sync_producer.rb @@ -50,6 +50,8 @@ def send_messages(messages) end messages_to_send.messages_for_brokers(@message_conductor).each do |messages_for_broker| + puts "KAFKADEBUG #{@client_id} sending messages for broker #{messages_for_broker.broker_id}" + if sent_ok(send_to_broker(messages_for_broker)) messages_to_send.successfully_sent(messages_for_broker) end @@ -109,7 +111,7 @@ def refresh_metadata(topics) def send_to_broker(messages_for_broker) return false if messages_for_broker.broker_id == -1 to_send = messages_for_broker.build_protocol_objects(@compression_config) - @broker_pool.execute_api_call(messages_for_broker.broker_id, :produce, + @broker_pool.execute_api_call(messages_for_broker.broker_id, :produce, required_acks, ack_timeout_ms, to_send) rescue Connection::ConnectionFailedError