@chadknutson

Under some conditions (such as a client crashing), amqproxy can send two separate channel.close frames to the upstream. This is catastrophic because the upstream will close the connection, which in turn closes all client connections going through the proxy. This pull request tries to ensure that duplicate channel.close frames are never sent.

Copilot AI left a comment

Pull request overview

This pull request attempts to prevent duplicate Channel.Close frames from being sent to the upstream AMQP server, which can cause catastrophic connection closures affecting all clients. The fix adds checks to ensure Close frames are only sent when channels are still active.

Key Changes

  • Added synchronization and existence checks in Upstream.close_channel() to prevent sending Close frames for already-closed channels
  • Implemented new Channel.Close handling logic in Client.read_loop() to properly track channel closing state
  • Added guards in Client.close_channel() and Client.write() to check channel state before sending Close frames

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.

File | Description
--- | ---
src/amqproxy/upstream.cr | Added synchronized check to only send Channel.Close if channel exists in @channels map
src/amqproxy/client.cr | Added comprehensive Channel.Close handling with state tracking, checks to prevent duplicate Close frames, and conditional guards in write() and close_channel() methods

Comments suppressed due to low confidence (1)

src/amqproxy/client.cr:216

  • The close_all_upstream_channels method iterates over @channel_map and modifies it (line 215 clears it) without any synchronization. This can race with concurrent operations in read_loop and write() that also access and modify @channel_map. While Crystal's Hash is designed to be safe for concurrent reads, concurrent iteration and modification can lead to undefined behavior.

Consider wrapping the iteration and clear operations in a synchronization block, or taking a snapshot of the channels to close before iterating. Note that other methods accessing @channel_map during iteration could also cause issues.

    private def close_all_upstream_channels(code = 500_u16, reason = "CLIENT_DISCONNECTED")
      @channel_map.each_value do |upstream_channel|
        upstream_channel.try &.close(code, reason)
      rescue Upstream::WriteError
        Log.debug { "Upstream write error while closing client's channels" }
        next # Nothing to do
      end
      @channel_map.clear
    end
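
A minimal sketch of the snapshot approach mentioned above, assuming a mutex guards the map. `SketchClient`, `open_channel`, and the `closed` log are hypothetical stand-ins, and a `String` takes the place of the real upstream channel object; none of this is amqproxy's actual code.

```crystal
# Hypothetical stand-in for the proxy's client class (illustration only).
class SketchClient
  # Records which channels were closed, in place of real upstream I/O.
  getter closed = [] of UInt16

  def initialize
    # A nil value means "close already in progress", mirroring the PR's convention.
    @channel_map = Hash(UInt16, String?).new
    @lock = Mutex.new
  end

  def open_channel(id : UInt16, upstream : String) : Nil
    @lock.synchronize { @channel_map[id] = upstream }
  end

  def close_all_upstream_channels : Nil
    # Copy the ids that are still open and clear the map in one critical section.
    snapshot = @lock.synchronize do
      ids = [] of UInt16
      @channel_map.each { |id, upstream| ids << id unless upstream.nil? }
      @channel_map.clear
      ids
    end
    # Work on the private snapshot outside the lock; the shared map is never
    # iterated while another fiber could be modifying it.
    snapshot.each { |id| @closed << id }
  end
end

client = SketchClient.new
client.open_channel(1_u16, "upstream-1")
client.open_channel(2_u16, "upstream-2")
client.close_all_upstream_channels
client.close_all_upstream_channels # finds an empty map, so it closes nothing more
```

Because the map is cleared inside the same critical section that builds the snapshot, a second caller cannot observe the same open channels again.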

Comment on lines 177 to +181
      case frame
      when AMQ::Protocol::Frame::Channel::Close
    -   @channel_map[frame.channel] = nil
    +   if @channel_map[frame.channel]
    +     @channel_map[frame.channel] = nil
    +   end

Copilot AI Dec 19, 2025

The modification of @channel_map at lines 179-181 happens outside the @lock.synchronize block that ends at line 176. This creates a race condition where multiple concurrent calls to write() with Channel::Close frames could interleave their checks and modifications of @channel_map.

Additionally, accessing @channel_map[frame.channel] without the ? operator can raise a KeyError if the channel was deleted from the map between the write operation and this check. This is especially problematic since the check happens after releasing the lock.

Move the @channel_map modification (lines 177-186) inside the @lock.synchronize block, or use @channel_map[frame.channel]? to safely handle missing keys. The former approach is preferred for consistency with how the rest of the write method handles synchronization.

Comment on lines +99 to +100
    + # Upstream write failed, send error close to client
    + close_channel(src_channel, 500_u16, "UPSTREAM_ERROR")

Copilot AI Dec 19, 2025

The order of operations here is problematic. The code sets @channel_map[frame.channel] = nil at line 97 BEFORE attempting to write to upstream. If the upstream write at line 95 fails with Upstream::WriteError, the code attempts to call close_channel at line 100, but since @channel_map was already set to nil, close_channel won't send a Close frame (it checks if the channel exists in the map).

However, since the client initiated this Close request, the proper response upon error would be to send a CloseOk acknowledging the client's close, not another Close frame. Consider sending CloseOk in the error handler instead of calling close_channel, or move the @channel_map[frame.channel] = nil assignment to after the successful write, only setting it to nil within the rescue block if needed.

Suggested change
    - # Upstream write failed, send error close to client
    - close_channel(src_channel, 500_u16, "UPSTREAM_ERROR")
    + # Upstream write to close failed; acknowledge client's close and clean up
    + write AMQ::Protocol::Frame::Channel::CloseOk.new(src_channel)
    + @channel_map.delete(src_channel)

Comment on lines +77 to +103
    + when AMQ::Protocol::Frame::Channel::Close
    +   src_channel = frame.channel
    +   begin
    +     upstream_channel = @channel_map[frame.channel]?
    +     if upstream_channel.nil?
    +       # Channel is already closing (value is nil) or doesn't exist
    +       if @channel_map.has_key?(frame.channel)
    +         # Channel exists but is nil (already closing from upstream side)
    +         # Send CloseOk to acknowledge the client's close request
    +         write AMQ::Protocol::Frame::Channel::CloseOk.new(frame.channel)
    +         @channel_map.delete(frame.channel)
    +       else
    +         # Channel doesn't exist at all - error condition
    +         close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame)
    +       end
    +     else
    +       # Channel is open, forward the close to upstream
    +       begin
    +         upstream_channel.write(frame)
    +         # Mark channel as closing so close_all_upstream_channels won't try to close it again
    +         @channel_map[frame.channel] = nil
    +       rescue ex : Upstream::WriteError
    +         # Upstream write failed, send error close to client
    +         close_channel(src_channel, 500_u16, "UPSTREAM_ERROR")
    +       end
    +     end
    +   end

Copilot AI Dec 19, 2025

The entire Channel::Close handling block accesses and modifies @channel_map (lines 80, 83, 87, 97) without any synchronization. This creates race conditions with:

  1. The write() method which modifies @channel_map at lines 179-181 (also unsynchronized)
  2. The close_channel() method which reads @channel_map at line 203
  3. The close_all_upstream_channels() method which iterates over @channel_map

Since @channel_map can be accessed concurrently from multiple fibers (e.g., when processing frames from upstream via DownstreamChannel.write which calls Client.write), all accesses to @channel_map should be protected by the same lock (such as @lock) to ensure thread safety. Consider wrapping all @channel_map operations in @lock.synchronize blocks or refactoring to ensure consistent synchronization throughout the class.

      end
    when AMQ::Protocol::Frame::Channel::CloseOk
      # Server closed channel, CloseOk reply to server is already sent
      @channel_map.delete(frame.channel)

Copilot AI Dec 19, 2025

The deletion of @channel_map entry at line 106 is not synchronized, which can race with other concurrent accesses to @channel_map from the write() method, close_channel() method, and close_all_upstream_channels() method. Ensure this operation is protected by @lock to maintain thread safety.

Suggested change
    - @channel_map.delete(frame.channel)
    + @lock.synchronize do
    +   @channel_map.delete(frame.channel)
    + end

Comment on lines 59 to 66
      def close_channel(id, code, reason)
    -   send AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16)
    +   # Only send Channel::Close if the channel is still open (exists in our map)
    +   @channels_lock.synchronize do
    +     if @channels.has_key?(id)
    +       send AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16)
    +     end
    +   end
      end

Copilot AI Dec 19, 2025

There's a potential race condition between this check and the channel deletion that happens in the read_loop. The read_loop deletes channels from @channels at lines 94 and 98 when receiving Channel::Close and Channel::CloseOk frames. Between the time this method checks has_key?(id) and sends the Close frame, the read_loop could receive and process a CloseOk, deleting the channel from @channels. This would still allow the duplicate Close frame to be sent.

To properly prevent duplicate Close frames, the deletion from @channels should also happen within this synchronized block, or the close_channel method should mark the channel as closing before sending the Close frame. Consider deleting the channel from @channels here before sending the Close frame, similar to how the read_loop handles it when receiving Close from upstream.
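
A rough sketch of the "delete inside the synchronized block" idea, under stated assumptions: `SketchUpstream`, `open_channel`, and the `sent` log are hypothetical, a `Set` replaces the real `@channels` map, and appending to an array stands in for `send`. It is meant only to show the check and the removal happening as one step.

```crystal
# Hypothetical stand-in for the proxy's upstream connection (illustration only).
class SketchUpstream
  # Records the frames that would have been written to the server.
  getter sent = [] of String

  def initialize
    @channels = Set(UInt16).new
    @channels_lock = Mutex.new
  end

  def open_channel(id : UInt16) : Nil
    @channels_lock.synchronize { @channels << id }
  end

  # Returns true only for the caller that actually sent Channel.Close.
  def close_channel(id : UInt16, code : UInt16, reason : String) : Bool
    @channels_lock.synchronize do
      # Set#delete reports whether the id was present, so the existence check
      # and the removal cannot be separated by another fiber.
      return false unless @channels.delete(id)
      @sent << "Channel.Close channel=#{id} code=#{code} reason=#{reason}"
      true
    end
  end
end

upstream = SketchUpstream.new
upstream.open_channel(1_u16)
upstream.close_channel(1_u16, 500_u16, "CLIENT_DISCONNECTED") # sends the frame
upstream.close_channel(1_u16, 500_u16, "CLIENT_DISCONNECTED") # no-op, channel already removed
```

One consequence of this design is that a later CloseOk from the server would find no entry for the channel, so the read loop would need to tolerate that case.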

Comment on lines 201 to 206
      def close_channel(id, code, reason)
    -   write AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16)
    +   # Only send Channel::Close if we haven't already sent one
    +   if @channel_map[id]?
    +     write AMQ::Protocol::Frame::Channel::Close.new(id, code, reason, 0_u16, 0_u16)
    +   end
      end

Copilot AI Dec 19, 2025

This method accesses @channel_map without any synchronization, which could lead to race conditions. The @channel_map is accessed concurrently by the read_loop (which runs in a fiber) and the write method (which uses @lock for synchronization). Without proper locking, there's a race condition where:

  1. This method reads @channel_map[id] and finds it non-nil
  2. Another fiber sets @channel_map[id] = nil or deletes it
  3. This method sends a duplicate Close frame

Since the write method uses @lock for synchronization when modifying @channel_map (lines 179-181), this method should use the same lock to ensure thread-safe access. Alternatively, consider setting @channel_map[id] = nil before writing the Close frame to prevent duplicates, similar to the pattern used in the write method.
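
The "mark as closing before writing" alternative could look roughly like the sketch below. `SketchChannelMap`, `begin_close`, and `finish_close` are hypothetical names, and a `String` stands in for the upstream channel object; the only point is that the read and the nil-marking happen under one lock, so at most one caller gets a handle to close.

```crystal
# Hypothetical helper that owns the channel map and its lock (illustration only).
class SketchChannelMap
  def initialize
    # Key present with a value: open. Key present with nil: closing. Key absent: closed.
    @map = Hash(UInt16, String?).new
    @lock = Mutex.new
  end

  def open(id : UInt16, upstream : String) : Nil
    @lock.synchronize { @map[id] = upstream }
  end

  # Returns the upstream handle to the first caller only and marks the entry
  # as closing; later callers (or unknown ids) get nil and must not send Close.
  def begin_close(id : UInt16) : String?
    @lock.synchronize do
      upstream = @map[id]?
      @map[id] = nil if upstream
      upstream
    end
  end

  # Called once the peer has acknowledged the close.
  def finish_close(id : UInt16) : Nil
    @lock.synchronize { @map.delete(id) }
  end
end

channels = SketchChannelMap.new
channels.open(1_u16, "upstream-1")
if handle = channels.begin_close(1_u16)
  # The real code would write Channel.Close here, exactly once.
  puts "closing via #{handle}"
end
channels.begin_close(1_u16) # returns nil, so no duplicate Close is written
channels.finish_close(1_u16)
```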

@baelter
Member

baelter commented Dec 19, 2025

I think there's a simpler way to address this. Check #234


3 participants