diff --git a/lib/poseidon/broker_pool.rb b/lib/poseidon/broker_pool.rb index cee927c..9c71b52 100644 --- a/lib/poseidon/broker_pool.rb +++ b/lib/poseidon/broker_pool.rb @@ -29,6 +29,7 @@ def fetch_metadata(topics) # @param [Hash] brokers # Hash of broker_id => { :host => host, :port => port } def update_known_brokers(brokers) + @brokers.values(&:close) @brokers.update(brokers) nil end diff --git a/lib/poseidon/connection.rb b/lib/poseidon/connection.rb index 7bde395..7c6c2ed 100644 --- a/lib/poseidon/connection.rb +++ b/lib/poseidon/connection.rb @@ -40,7 +40,8 @@ def produce(required_acks, timeout, messages_for_topics) req = ProduceRequest.new( request_common(:produce), required_acks, timeout, - messages_for_topics) + messages_for_topics) + puts "KAFKADEBUG PRODUCE CORRELATION #{req.common.correlation_id} from CLIENT #{@client_id} TO #{@host} #{@port}" send_request(req) if required_acks != 0 read_response(ProduceResponse) diff --git a/lib/poseidon/message_conductor.rb b/lib/poseidon/message_conductor.rb index 8636065..f10e503 100644 --- a/lib/poseidon/message_conductor.rb +++ b/lib/poseidon/message_conductor.rb @@ -32,6 +32,7 @@ def destination(topic, key = nil) if topic_metadata && topic_metadata.leader_available? partition_id = determine_partition(topic_metadata, key) broker_id = topic_metadata.partitions[partition_id].leader || NO_BROKER + puts "KAFKADEBUG LEADER FOR #{partition_id} is #{broker_id}" else partition_id = NO_PARTITION broker_id = NO_BROKER diff --git a/lib/poseidon/sync_producer.rb b/lib/poseidon/sync_producer.rb index 6f13b95..f52038a 100644 --- a/lib/poseidon/sync_producer.rb +++ b/lib/poseidon/sync_producer.rb @@ -50,7 +50,9 @@ def send_messages(messages) end messages_to_send.messages_for_brokers(@message_conductor).each do |messages_for_broker| - if send_to_broker(messages_for_broker) + puts "KAFKADEBUG #{@client_id} sending messages for broker #{messages_for_broker.broker_id}" + + if sent_ok(send_to_broker(messages_for_broker)) messages_to_send.successfully_sent(messages_for_broker) end end @@ -109,11 +111,33 @@ def refresh_metadata(topics) def send_to_broker(messages_for_broker) return false if messages_for_broker.broker_id == -1 to_send = messages_for_broker.build_protocol_objects(@compression_config) - @broker_pool.execute_api_call(messages_for_broker.broker_id, :produce, + @broker_pool.execute_api_call(messages_for_broker.broker_id, :produce, required_acks, ack_timeout_ms, to_send) rescue Connection::ConnectionFailedError false end + + def sent_ok(produce_response) + if !produce_response + false + elsif @required_acks == 0 + true + else + has_errors = produce_response.topic_response.reduce(false){ |errors, response| + errors || response.partitions.reduce(false){ |part_errors, part_response| + part_errors || has_error(response, part_response) + } + } + ! has_errors + end + end + + def has_error(response, part_response) + if part_response.error == 6 + refresh_metadata(response.topic) + end + part_response.error != 0 + end end end