From 95be7222eb4ffdfef153c358a1e27bce1e436877 Mon Sep 17 00:00:00 2001 From: Sule-balogun Olanrewaju Date: Sat, 3 Jan 2026 12:19:29 +0000 Subject: [PATCH] Add missing handler for resource subscribe and unsubscribe --- .github/workflows/pipeline.yaml | 2 +- src/Capability/Registry.php | 44 ++++++ src/Capability/RegistryInterface.php | 18 +++ src/Server/Builder.php | 4 +- .../Request/ResourceSubscribeHandler.php | 66 ++++++++ .../Request/ResourceUnsubscribeHandler.php | 66 ++++++++ tests/Conformance/server.php | 5 +- tests/Unit/Capability/RegistryTest.php | 89 +++++++++++ .../Handler/Request/ResourceSubscribeTest.php | 136 +++++++++++++++++ .../Request/ResourceUnsubscribeTest.php | 141 ++++++++++++++++++ 10 files changed, 568 insertions(+), 3 deletions(-) create mode 100644 src/Server/Handler/Request/ResourceSubscribeHandler.php create mode 100644 src/Server/Handler/Request/ResourceUnsubscribeHandler.php create mode 100644 tests/Unit/Server/Handler/Request/ResourceSubscribeTest.php create mode 100644 tests/Unit/Server/Handler/Request/ResourceUnsubscribeTest.php diff --git a/.github/workflows/pipeline.yaml b/.github/workflows/pipeline.yaml index 812217c2..b1a2d528 100644 --- a/.github/workflows/pipeline.yaml +++ b/.github/workflows/pipeline.yaml @@ -100,7 +100,7 @@ jobs: passedTests=$(echo "$OUTPUT" | sed -nE 's/.*Total: ([0-9]+) passed.*/\1/p') passedTests=${passedTests:-0} - REQUIRED_TESTS_TO_PASS=22 + REQUIRED_TESTS_TO_PASS=25 echo "Required tests to pass: $REQUIRED_TESTS_TO_PASS" [ "$passedTests" -ge "$REQUIRED_TESTS_TO_PASS" ] || exit $exit_code diff --git a/src/Capability/Registry.php b/src/Capability/Registry.php index d0813412..e75820fd 100644 --- a/src/Capability/Registry.php +++ b/src/Capability/Registry.php @@ -25,11 +25,14 @@ use Mcp\Exception\PromptNotFoundException; use Mcp\Exception\ResourceNotFoundException; use Mcp\Exception\ToolNotFoundException; +use Mcp\Schema\Notification\ResourceUpdatedNotification; use Mcp\Schema\Page; use Mcp\Schema\Prompt; use Mcp\Schema\Resource; use Mcp\Schema\ResourceTemplate; use Mcp\Schema\Tool; +use Mcp\Server\Protocol; +use Mcp\Server\Session\SessionInterface; use Psr\EventDispatcher\EventDispatcherInterface; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -61,6 +64,11 @@ final class Registry implements RegistryInterface */ private array $resourceTemplates = []; + /** + * @var array> + */ + private array $resourceSubscriptions = []; + public function __construct( private readonly ?EventDispatcherInterface $eventDispatcher = null, private readonly LoggerInterface $logger = new NullLogger(), @@ -449,4 +457,40 @@ private function paginateResults(array $items, int $limit, ?string $cursor = nul return array_values(\array_slice($items, $offset, $limit)); } + + public function subscribe(SessionInterface $session, string $uri): void + { + if (!isset($this->resourceSubscriptions[$uri])) { + $this->resourceSubscriptions[$uri] = []; + } + + $sessionId = $session->getId()->toRfc4122(); + $this->resourceSubscriptions[$uri][$sessionId] = $session; + } + + public function unsubscribe(SessionInterface $session, string $uri): void + { + if (!isset($this->resourceSubscriptions[$uri])) { + return; + } + + $sessionId = $session->getId()->toRfc4122(); + + unset($this->resourceSubscriptions[$uri][$sessionId]); + + if ([] === $this->resourceSubscriptions[$uri]) { + unset($this->resourceSubscriptions[$uri]); + } + } + + public function notifyResourceChanged(Protocol $protocol, string $uri): void + { + if (!isset($this->resourceSubscriptions[$uri])) { + return; + } + + foreach ($this->resourceSubscriptions[$uri] as $session) { + $protocol->sendNotification(new ResourceUpdatedNotification($uri), $session); + } + } } diff --git a/src/Capability/RegistryInterface.php b/src/Capability/RegistryInterface.php index 67295681..f62db4f5 100644 --- a/src/Capability/RegistryInterface.php +++ b/src/Capability/RegistryInterface.php @@ -25,6 +25,8 @@ use Mcp\Schema\Resource; use Mcp\Schema\ResourceTemplate; use Mcp\Schema\Tool; +use Mcp\Server\Protocol; +use Mcp\Server\Session\SessionInterface; /** * @phpstan-import-type Handler from ElementReference @@ -157,4 +159,20 @@ public function getPrompts(?int $limit = null, ?string $cursor = null): Page; * @throws PromptNotFoundException */ public function getPrompt(string $name): PromptReference; + + /** + * Subscribes a session to a specific resource URI. + */ + public function subscribe(SessionInterface $session, string $uri): void; + + /** + * Unsubscribes a session from a specific resource URI. + */ + public function unsubscribe(SessionInterface $session, string $uri): void; + + /** + * Notifies all sessions subscribed to the given resource URI that the + * resource has changed. Sends a ResourceUpdatedNotification for each subscriber. + */ + public function notifyResourceChanged(Protocol $protocol, string $uri): void; } diff --git a/src/Server/Builder.php b/src/Server/Builder.php index 363a09c5..eb2ddc10 100644 --- a/src/Server/Builder.php +++ b/src/Server/Builder.php @@ -486,7 +486,7 @@ public function build(): Server tools: $registry->hasTools(), toolsListChanged: $this->eventDispatcher instanceof EventDispatcherInterface, resources: $registry->hasResources() || $registry->hasResourceTemplates(), - resourcesSubscribe: false, + resourcesSubscribe: $registry->hasResources() || $registry->hasResourceTemplates(), resourcesListChanged: $this->eventDispatcher instanceof EventDispatcherInterface, prompts: $registry->hasPrompts(), promptsListChanged: $this->eventDispatcher instanceof EventDispatcherInterface, @@ -509,6 +509,8 @@ public function build(): Server new Handler\Request\ListToolsHandler($registry, $this->paginationLimit), new Handler\Request\PingHandler(), new Handler\Request\ReadResourceHandler($registry, $referenceHandler, $logger), + new Handler\Request\ResourceSubscribeHandler($registry, $logger), + new Handler\Request\ResourceUnsubscribeHandler($registry, $logger), new Handler\Request\SetLogLevelHandler(), ]); diff --git a/src/Server/Handler/Request/ResourceSubscribeHandler.php b/src/Server/Handler/Request/ResourceSubscribeHandler.php new file mode 100644 index 00000000..189d751f --- /dev/null +++ b/src/Server/Handler/Request/ResourceSubscribeHandler.php @@ -0,0 +1,66 @@ + + * + * @author Larry Sule-balogun + */ +final class ResourceSubscribeHandler implements RequestHandlerInterface +{ + public function __construct( + private readonly RegistryInterface $registry, + private readonly LoggerInterface $logger = new NullLogger(), + ) { + } + + public function supports(Request $request): bool + { + return $request instanceof ResourceSubscribeRequest; + } + + public function handle(Request $request, SessionInterface $session): Response|Error + { + \assert($request instanceof ResourceSubscribeRequest); + + $uri = $request->uri; + + try { + $this->registry->getResource($uri); + } catch (ResourceNotFoundException $e) { + $this->logger->error('Resource not found', ['uri' => $uri]); + + return Error::forResourceNotFound($e->getMessage(), $request->getId()); + } + + $this->logger->debug('Subscribing to resource', ['uri' => $uri]); + + $this->registry->subscribe($session, $uri); + + return new Response( + $request->getId(), + new EmptyResult(), + ); + } +} diff --git a/src/Server/Handler/Request/ResourceUnsubscribeHandler.php b/src/Server/Handler/Request/ResourceUnsubscribeHandler.php new file mode 100644 index 00000000..91144662 --- /dev/null +++ b/src/Server/Handler/Request/ResourceUnsubscribeHandler.php @@ -0,0 +1,66 @@ + + * + * @author Larry Sule-balogun + */ +final class ResourceUnsubscribeHandler implements RequestHandlerInterface +{ + public function __construct( + private readonly RegistryInterface $registry, + private readonly LoggerInterface $logger = new NullLogger(), + ) { + } + + public function supports(Request $request): bool + { + return $request instanceof ResourceUnsubscribeRequest; + } + + public function handle(Request $request, SessionInterface $session): Response|Error + { + \assert($request instanceof ResourceUnsubscribeRequest); + + $uri = $request->uri; + + try { + $this->registry->getResource($uri); + } catch (ResourceNotFoundException $e) { + $this->logger->error('Resource not found', ['uri' => $uri]); + + return Error::forResourceNotFound($e->getMessage(), $request->getId()); + } + + $this->logger->debug('Unsubscribing from resource', ['uri' => $uri]); + + $this->registry->unsubscribe($session, $uri); + + return new Response( + $request->getId(), + new EmptyResult(), + ); + } +} diff --git a/tests/Conformance/server.php b/tests/Conformance/server.php index 802fa05a..af250067 100644 --- a/tests/Conformance/server.php +++ b/tests/Conformance/server.php @@ -9,10 +9,13 @@ * file that was distributed with this source code. */ +ini_set('display_errors', '0'); + require_once dirname(__DIR__, 2).'/vendor/autoload.php'; use Http\Discovery\Psr17Factory; use Laminas\HttpHandlerRunner\Emitter\SapiEmitter; +use Mcp\Capability\Registry; use Mcp\Schema\Content\AudioContent; use Mcp\Schema\Content\EmbeddedResource; use Mcp\Schema\Content\ImageContent; @@ -32,6 +35,7 @@ $request = $psr17Factory->createServerRequestFromGlobals(); $transport = new StreamableHttpTransport($request, logger: $logger); +$registry = new Registry(null, $logger); $server = Server::builder() ->setServerInfo('mcp-conformance-test-server', '1.0.0') @@ -51,7 +55,6 @@ ->addResource(fn () => 'This is the content of the static text resource.', 'test://static-text', 'static-text', 'A static text resource for testing') ->addResource(fn () => fopen('data://image/png;base64,'.Elements::TEST_IMAGE_BASE64, 'r'), 'test://static-binary', 'static-binary', 'A static binary resource (image) for testing') ->addResourceTemplate([Elements::class, 'resourceTemplate'], 'test://template/{id}/data', 'template', 'A resource template with parameter substitution', 'application/json') - // TODO: Handler for resources/subscribe and resources/unsubscribe ->addResource(fn () => 'Watched resource content', 'test://watched-resource', 'watched-resource', 'A resource that can be watched') // Prompts ->addPrompt(fn () => [['role' => 'user', 'content' => 'This is a simple prompt for testing.']], 'test_simple_prompt', 'A simple prompt without arguments') diff --git a/tests/Unit/Capability/RegistryTest.php b/tests/Unit/Capability/RegistryTest.php index d97ccf41..5f5176b3 100644 --- a/tests/Unit/Capability/RegistryTest.php +++ b/tests/Unit/Capability/RegistryTest.php @@ -20,23 +20,29 @@ use Mcp\Exception\PromptNotFoundException; use Mcp\Exception\ResourceNotFoundException; use Mcp\Exception\ToolNotFoundException; +use Mcp\Schema\Notification\ResourceUpdatedNotification; use Mcp\Schema\Prompt; use Mcp\Schema\Resource; use Mcp\Schema\ResourceTemplate; use Mcp\Schema\Tool; +use Mcp\Server\Protocol; +use Mcp\Server\Session\SessionInterface; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; +use Symfony\Component\Uid\Uuid; class RegistryTest extends TestCase { private Registry $registry; + private Protocol|MockObject $protocol; private LoggerInterface|MockObject $logger; protected function setUp(): void { $this->logger = $this->createMock(LoggerInterface::class); $this->registry = new Registry(null, $this->logger); + $this->protocol = $this->createMock(Protocol::class); } public function testHasserReturnFalseForEmptyRegistry(): void @@ -527,6 +533,89 @@ public function testMultipleRegistrationsOfSameElementWithSameType(): void $this->assertEquals('second', ($toolRef->handler)()); } + public function testSubscribeAndSendsNotification(): void + { + $session1 = $this->createMock(SessionInterface::class); + $session2 = $this->createMock(SessionInterface::class); + + $uuid1 = Uuid::v4(); + $uuid2 = Uuid::v4(); + + $session1->method('getId')->willReturn($uuid1); + $session2->method('getId')->willReturn($uuid2); + + $uri = 'test://resource1'; + + // Expect notification to be sent for each subscriber + $this->protocol->expects($this->exactly(2)) + ->method('sendNotification') + ->with($this->callback(function ($notification) use ($uri) { + return $notification instanceof ResourceUpdatedNotification + && $notification->uri === $uri; + })); + + // Subscribe both sessions + $this->registry->subscribe($session1, $uri); + $this->registry->subscribe($session2, $uri); + + $this->registry->notifyResourceChanged($this->protocol, $uri); + } + + public function testUnsubscribeRemovesOnlyTargetSession(): void + { + $session1 = $this->createMock(SessionInterface::class); + $uuid1 = Uuid::v4(); + $session1->method('getId')->willReturn($uuid1); + + $uri = 'test://resource'; + + // Subscribe both sessions + $this->registry->subscribe($session1, $uri); + + $this->protocol->expects($this->exactly(1)) + ->method('sendNotification') + ->with($this->callback(fn ($notification) => $notification instanceof ResourceUpdatedNotification && $notification->uri === $uri + )); + + $this->registry->notifyResourceChanged($this->protocol, $uri); + + // Unsubscribe only session1 + $this->registry->unsubscribe($session1, $uri); + } + + public function testUnsubscribeStopsNotifications(): void + { + $protocol = $this->createMock(Protocol::class); + $session = $this->createMock(SessionInterface::class); + $session->method('getId')->willReturn(Uuid::v4()); + $uri = 'test://resource'; + + $this->registry->subscribe($session, $uri); + $this->registry->unsubscribe($session, $uri); + + $protocol->expects($this->never())->method('sendNotification'); + + $this->registry->notifyResourceChanged($protocol, $uri); + } + + public function testDuplicateSubscribeDoesNotTriggerDuplicateNotifications(): void + { + $session = $this->createMock(SessionInterface::class); + $uuid = Uuid::v4(); + $session->method('getId')->willReturn($uuid); + + $uri = 'test://resource'; + $this->registry->subscribe($session, $uri); + $this->registry->subscribe($session, $uri); + + $this->protocol->expects($this->once()) + ->method('sendNotification') + ->with($this->callback(fn ($notification) => $notification instanceof ResourceUpdatedNotification && $notification->uri === $uri + )); + + $this->registry->notifyResourceChanged($this->protocol, $uri); + } + private function createValidTool(string $name): Tool { return new Tool( diff --git a/tests/Unit/Server/Handler/Request/ResourceSubscribeTest.php b/tests/Unit/Server/Handler/Request/ResourceSubscribeTest.php new file mode 100644 index 00000000..45865430 --- /dev/null +++ b/tests/Unit/Server/Handler/Request/ResourceSubscribeTest.php @@ -0,0 +1,136 @@ +registry = $this->createMock(RegistryInterface::class); + $this->session = $this->createMock(SessionInterface::class); + + $this->handler = new ResourceSubscribeHandler($this->registry); + } + + public function testHandleSuccessfulSubscribe(): void + { + $uri = 'file://documents/readme.txt'; + $request = $this->createResourceSubscribeRequest($uri); + $resourceReference = $this->getMockBuilder(ResourceReference::class) + ->setConstructorArgs([new Resource($uri, 'test', mimeType: 'text/plain'), []]) + ->getMock(); + + $this->registry + ->expects($this->once()) + ->method('getResource') + ->with($uri) + ->willReturn($resourceReference); + + $this->registry->expects($this->once()) + ->method('subscribe') + ->with($this->session, $uri); + + $response = $this->handler->handle($request, $this->session); + + $this->assertInstanceOf(Response::class, $response); + $this->assertEquals($request->getId(), $response->id); + $this->assertInstanceOf(EmptyResult::class, $response->result); + } + + public function testDuplicateSubscribeDoesNotError(): void + { + $uri = 'file://documents/readme.txt'; + $request = $this->createResourceSubscribeRequest($uri); + $resourceReference = $this->getMockBuilder(ResourceReference::class) + ->setConstructorArgs([new Resource($uri, 'test', mimeType: 'text/plain'), []]) + ->getMock(); + + $this->registry + ->expects($this->exactly(2)) + ->method('getResource') + ->with($uri) + ->willReturn($resourceReference); + + $this->registry + ->expects($this->exactly(2)) + ->method('subscribe') + ->with($this->session, $uri); + + $response1 = $this->handler->handle($request, $this->session); + $response2 = $this->handler->handle($request, $this->session); + + // No exception thrown, response is still EmptyResult + $this->assertInstanceOf(Response::class, $response1); + $this->assertInstanceOf(Response::class, $response2); + $this->assertEquals($request->getId(), $response1->id); + $this->assertEquals($request->getId(), $response2->id); + $this->assertInstanceOf(EmptyResult::class, $response1->result); + $this->assertInstanceOf(EmptyResult::class, $response2->result); + } + + public function testSubscribeWithEmptyUriThrowsError(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Missing or invalid "uri" parameter for resources/subscribe.'); + + $this->createResourceSubscribeRequest(''); + } + + public function testHandleSubscribeResourceNotFoundException(): void + { + $uri = 'file://missing/file.txt'; + $request = $this->createResourceSubscribeRequest($uri); + $exception = new ResourceNotFoundException($uri); + + $this->registry + ->expects($this->once()) + ->method('getResource') + ->with($uri) + ->willThrowException($exception); + + $response = $this->handler->handle($request, $this->session); + + $this->assertInstanceOf(Error::class, $response); + $this->assertEquals(Error::RESOURCE_NOT_FOUND, $response->code); + $this->assertEquals(\sprintf('Resource not found for uri: "%s".', $uri), $response->message); + } + + private function createResourceSubscribeRequest(string $uri): ResourceSubscribeRequest + { + return ResourceSubscribeRequest::fromArray([ + 'jsonrpc' => '2.0', + 'method' => ResourceSubscribeRequest::getMethod(), + 'id' => 'test-request-'.uniqid(), + 'params' => [ + 'uri' => $uri, + ], + ]); + } +} diff --git a/tests/Unit/Server/Handler/Request/ResourceUnsubscribeTest.php b/tests/Unit/Server/Handler/Request/ResourceUnsubscribeTest.php new file mode 100644 index 00000000..f4b21b46 --- /dev/null +++ b/tests/Unit/Server/Handler/Request/ResourceUnsubscribeTest.php @@ -0,0 +1,141 @@ +registry = $this->createMock(RegistryInterface::class); + $this->session = $this->createMock(SessionInterface::class); + + $this->handler = new ResourceUnsubscribeHandler($this->registry); + } + + public function testHandleSuccessfulUnsubscribe(): void + { + // Arrange + $uri = 'file://documents/readme.txt'; + $request = $this->createResourceUnsubscribeRequest($uri); + $resourceReference = $this->getMockBuilder(ResourceReference::class) + ->setConstructorArgs([new Resource($uri, 'test', mimeType: 'text/plain'), []]) + ->getMock(); + + $this->registry + ->expects($this->once()) + ->method('getResource') + ->with($uri) + ->willReturn($resourceReference); + + $this->registry->expects($this->once()) + ->method('unsubscribe') + ->with($this->session, $uri); + + // Act + $response = $this->handler->handle($request, $this->session); + + // Assert + $this->assertInstanceOf(Response::class, $response); + $this->assertEquals($request->getId(), $response->id); + $this->assertInstanceOf(EmptyResult::class, $response->result); + } + + public function testDuplicateUnsubscribeDoesNotError(): void + { + // Arrange + $uri = 'file://documents/readme.txt'; + $request = $this->createResourceUnsubscribeRequest($uri); + $resourceReference = $this->getMockBuilder(ResourceReference::class) + ->setConstructorArgs([new Resource($uri, 'test', mimeType: 'text/plain'), []]) + ->getMock(); + + $this->registry + ->expects($this->exactly(2)) + ->method('getResource') + ->with($uri) + ->willReturn($resourceReference); + + $this->registry + ->expects($this->exactly(2)) + ->method('unsubscribe') + ->with($this->session, $uri); + + // Act + $response1 = $this->handler->handle($request, $this->session); + $response2 = $this->handler->handle($request, $this->session); + + // Assert + $this->assertInstanceOf(Response::class, $response1); + $this->assertInstanceOf(Response::class, $response2); + $this->assertEquals($request->getId(), $response1->id); + $this->assertEquals($request->getId(), $response2->id); + $this->assertInstanceOf(EmptyResult::class, $response1->result); + $this->assertInstanceOf(EmptyResult::class, $response2->result); + } + + public function testHandleUnsubscribeResourceNotFoundException(): void + { + $uri = 'file://missing/file.txt'; + $request = $this->createResourceUnsubscribeRequest($uri); + $exception = new ResourceNotFoundException($uri); + + $this->registry + ->expects($this->once()) + ->method('getResource') + ->with($uri) + ->willThrowException($exception); + + $response = $this->handler->handle($request, $this->session); + + $this->assertInstanceOf(Error::class, $response); + $this->assertEquals(Error::RESOURCE_NOT_FOUND, $response->code); + $this->assertEquals(\sprintf('Resource not found for uri: "%s".', $uri), $response->message); + } + + public function testUnsubscribeWithEmptyUriThrowsError(): void + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Missing or invalid "uri" parameter for resources/unsubscribe.'); + + $this->createResourceUnsubscribeRequest(''); + } + + private function createResourceUnsubscribeRequest(string $uri): ResourceUnsubscribeRequest + { + return ResourceUnsubscribeRequest::fromArray([ + 'jsonrpc' => '2.0', + 'method' => ResourceUnsubscribeRequest::getMethod(), + 'id' => 'test-request-'.uniqid(), + 'params' => [ + 'uri' => $uri, + ], + ]); + } +}