Skip to content

Commit b37f7b4

Browse files
Xfaider48ramunasd
authored andcommitted
fix: Mark channels closed when connection is closed
1 parent 853986d commit b37f7b4

File tree

3 files changed

+100
-0
lines changed

3 files changed

+100
-0
lines changed

PhpAmqpLib/Channel/AMQPChannel.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,11 @@ public function close($reply_code = 0, $reply_text = '', $method_sig = array(0,
207207
), false, $this->channel_rpc_timeout);
208208
}
209209

210+
public function markClosed()
211+
{
212+
$this->do_close();
213+
}
214+
210215
/**
211216
* @param AMQPReader $reader
212217
* @throws AMQPProtocolChannelException

PhpAmqpLib/Connection/AbstractConnection.php

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,7 @@ protected function do_close()
424424
$this->frame_queue = new \SplQueue();
425425
$this->method_queue = [];
426426
$this->setIsConnected(false);
427+
$this->markChannelsClosed();
427428
$this->close_input();
428429
$this->close_socket();
429430
}
@@ -1112,6 +1113,21 @@ protected function closeChannels()
11121113
}
11131114
}
11141115

1116+
/**
1117+
* Mark all available channels as closed
1118+
*/
1119+
protected function markChannelsClosed()
1120+
{
1121+
foreach ($this->channels as $key => $channel) {
1122+
// channels[0] is this connection object, so don't close it yet
1123+
if ($key === 0) {
1124+
continue;
1125+
}
1126+
1127+
$channel->markClosed();
1128+
}
1129+
}
1130+
11151131
/**
11161132
* Should the connection be attempted during construction?
11171133
*

tests/Functional/Connection/ConnectionClosedTest.php

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
99
use PhpAmqpLib\Message\AMQPMessage;
1010
use PhpAmqpLib\Tests\Functional\AbstractConnectionTest;
11+
use PhpAmqpLib\Tests\Functional\ToxiProxy;
1112

1213
/**
1314
* @group connection
@@ -278,4 +279,82 @@ public function must_throw_exception_after_connection_was_restored($type)
278279
$this->assertChannelClosed($channel);
279280
$this->assertConnectionClosed($connection);
280281
}
282+
283+
/**
284+
* Try to close and reopen connection with two channels.
285+
*
286+
* @test
287+
* @small
288+
* @group connection
289+
* @group proxy
290+
* @testWith ["stream"]
291+
* ["socket"]
292+
* @covers \PhpAmqpLib\Wire\IO\StreamIO::write()
293+
* @covers \PhpAmqpLib\Wire\IO\SocketIO::write()
294+
*
295+
* @param string $type
296+
*/
297+
public function must_throw_exception_after_connection_was_restored_with_two_channels($type)
298+
{
299+
$timeout = 1;
300+
301+
// Create proxy part
302+
$host = trim(getenv('TOXIPROXY_AMQP_TARGET'));
303+
if (empty($host)) {
304+
$host = HOST;
305+
}
306+
$proxy = new ToxiProxy('amqp_connection', $this->get_toxiproxy_host());
307+
$proxy->open($host, PORT, $this->get_toxiproxy_amqp_port());
308+
309+
/** @var AbstractConnection $connection */
310+
$connection = $this->connection_create(
311+
$type,
312+
$proxy->getHost(),
313+
$proxy->getPort(),
314+
array('timeout' => $timeout)
315+
);
316+
317+
$channel = $connection->channel();
318+
$anotherChannel = $connection->channel();
319+
320+
$this->assertTrue($channel->is_open());
321+
$this->assertTrue($anotherChannel->is_open());
322+
323+
$this->queue_bind($channel, $exchange_name = 'test_exchange_broken', $queue_name);
324+
$message = new AMQPMessage(
325+
'test',
326+
['delivery_mode' => AMQPMessage::DELIVERY_MODE_NON_PERSISTENT]
327+
);
328+
$channel->basic_publish($message, $exchange_name, $queue_name);
329+
330+
// drop proxy connection and wait longer than timeout
331+
$proxy->close();
332+
sleep($timeout);
333+
usleep(100000);
334+
// Reopen proxy
335+
$proxy->open($host, PORT, $this->get_toxiproxy_amqp_port());
336+
337+
$retry = 0;
338+
$exception = null;
339+
do {
340+
try {
341+
$channel->basic_publish($message, $exchange_name, $queue_name);
342+
} catch (\PHPUnit_Exception $exception) {
343+
throw $exception;
344+
} catch (\Exception $exception) {
345+
break;
346+
}
347+
} while (!$exception && ++$retry < 100);
348+
349+
$this->assertInstanceOf(AMQPConnectionClosedException::class, $exception);
350+
$this->assertGreaterThan(0, $exception->getCode());
351+
$this->assertChannelClosed($channel);
352+
353+
// Now lets reconnect
354+
$connection->reconnect();
355+
356+
// Both old channels must be closed
357+
$this->assertChannelClosed($channel);
358+
$this->assertChannelClosed($anotherChannel);
359+
}
281360
}

0 commit comments

Comments
 (0)