diff --git a/packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts b/packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts index f18ece846..5673a8ccd 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesClient.test.ts @@ -419,8 +419,7 @@ describe('PlatformServicesClient', () => { ); await delay(50); - expect(remoteHandler).toHaveBeenCalledOnce(); - expect(remoteHandler).toHaveBeenCalledWith( + expect(remoteHandler).toHaveBeenCalledExactlyOnceWith( 'peer-123', 'test-message', ); @@ -471,8 +470,7 @@ describe('PlatformServicesClient', () => { ); await delay(50); - expect(giveUpHandler).toHaveBeenCalledOnce(); - expect(giveUpHandler).toHaveBeenCalledWith('peer-456'); + expect(giveUpHandler).toHaveBeenCalledExactlyOnceWith('peer-456'); const successResponse = outputs.find( (message) => diff --git a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts index 62ec4eaab..daad608b1 100644 --- a/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts +++ b/packages/kernel-browser-runtime/src/PlatformServicesServer.test.ts @@ -231,8 +231,7 @@ describe('PlatformServicesServer', () => { await stream.receiveInput(makeMessageEvent('m0', { method: 'foo' })); await delay(10); - expect(errorSpy).toHaveBeenCalledOnce(); - expect(errorSpy).toHaveBeenCalledWith( + expect(errorSpy).toHaveBeenCalledExactlyOnceWith( 'Error handling "foo" request:', rpcErrors.methodNotFound(), ); @@ -275,8 +274,9 @@ describe('PlatformServicesServer', () => { await delay(10); expect(workers).toHaveLength(1); - expect(workers[0]?.launch).toHaveBeenCalledOnce(); - expect(workers[0]?.launch).toHaveBeenCalledWith(makeVatConfig()); + expect(workers[0]?.launch).toHaveBeenCalledExactlyOnceWith( + makeVatConfig(), + ); }); it('logs error if a vat with the same id already exists', async () => { @@ -285,8 +285,7 @@ describe('PlatformServicesServer', () => { await stream.receiveInput(makeLaunchMessageEvent('m1', 'v0')); await delay(10); - expect(errorSpy).toHaveBeenCalledOnce(); - expect(errorSpy).toHaveBeenCalledWith( + expect(errorSpy).toHaveBeenCalledExactlyOnceWith( 'Error handling "launch" request:', new VatAlreadyExistsError('v0'), ); @@ -306,8 +305,7 @@ describe('PlatformServicesServer', () => { await delay(10); expect(workers).toHaveLength(1); - expect(workers[0]?.terminate).toHaveBeenCalledOnce(); - expect(workers[0]?.terminate).toHaveBeenCalledWith(); + expect(workers[0]?.terminate).toHaveBeenCalledExactlyOnceWith(); }); it('logs error if a vat with the specified id does not exist', async () => { @@ -315,8 +313,7 @@ describe('PlatformServicesServer', () => { await stream.receiveInput(makeTerminateMessageEvent('m0', 'v0')); await delay(10); - expect(errorSpy).toHaveBeenCalledOnce(); - expect(errorSpy).toHaveBeenCalledWith( + expect(errorSpy).toHaveBeenCalledExactlyOnceWith( 'Error handling "terminate" request:', new VatNotFoundError('v0'), ); @@ -336,8 +333,7 @@ describe('PlatformServicesServer', () => { await stream.receiveInput(makeTerminateMessageEvent('m1', vatId)); await delay(10); - expect(errorSpy).toHaveBeenCalledOnce(); - expect(errorSpy).toHaveBeenCalledWith( + expect(errorSpy).toHaveBeenCalledExactlyOnceWith( 'Error handling "terminate" request:', vatNotFoundError, ); @@ -380,8 +376,7 @@ describe('PlatformServicesServer', () => { await stream.receiveInput(makeTerminateAllMessageEvent('m1')); await delay(10); - expect(errorSpy).toHaveBeenCalledOnce(); - expect(errorSpy).toHaveBeenCalledWith( + expect(errorSpy).toHaveBeenCalledExactlyOnceWith( 'Error handling "terminateAll" request:', vatNotFoundError, ); diff --git a/packages/kernel-browser-runtime/src/internal-comms/internal-connections.test.ts b/packages/kernel-browser-runtime/src/internal-comms/internal-connections.test.ts index 3dd09c888..004065841 100644 --- a/packages/kernel-browser-runtime/src/internal-comms/internal-connections.test.ts +++ b/packages/kernel-browser-runtime/src/internal-comms/internal-connections.test.ts @@ -183,7 +183,7 @@ describe('internal-connections', () => { it('should handle new internal process connections', async () => { receiveInternalConnections({ - handleInternalMessage: mockHandleMessage, + handler: mockHandleMessage, logger, }); @@ -208,7 +208,7 @@ describe('internal-connections', () => { it('should handle valid message', async () => { receiveInternalConnections({ - handleInternalMessage: mockHandleMessage, + handler: mockHandleMessage, logger, }); @@ -257,7 +257,7 @@ describe('internal-connections', () => { it('should handle JSON-RPC notifications', async () => { receiveInternalConnections({ - handleInternalMessage: mockHandleMessage, + handler: mockHandleMessage, logger, }); @@ -300,7 +300,7 @@ describe('internal-connections', () => { it('should handle multiple simultaneous connections', async () => { receiveInternalConnections({ - handleInternalMessage: mockHandleMessage, + handler: mockHandleMessage, logger, }); @@ -335,7 +335,7 @@ describe('internal-connections', () => { it('should forget ids of closed channels', async () => { receiveInternalConnections({ - handleInternalMessage: mockHandleMessage, + handler: mockHandleMessage, logger, }); const controlChannel = MockBroadcastChannel.channels.get( @@ -376,7 +376,7 @@ describe('internal-connections', () => { it('should reject duplicate connections', () => { receiveInternalConnections({ - handleInternalMessage: mockHandleMessage, + handler: mockHandleMessage, logger, }); const controlChannel = MockBroadcastChannel.channels.get( @@ -401,7 +401,7 @@ describe('internal-connections', () => { it('should reject invalid control commands', () => { receiveInternalConnections({ - handleInternalMessage: mockHandleMessage, + handler: mockHandleMessage, logger, }); const controlChannel = MockBroadcastChannel.channels.get( @@ -425,7 +425,7 @@ describe('internal-connections', () => { it('should handle comms channel message errors', async () => { receiveInternalConnections({ - handleInternalMessage: mockHandleMessage, + handler: mockHandleMessage, logger, }); @@ -455,5 +455,140 @@ describe('internal-connections', () => { expect.any(Error), ); }); + + it('should handle messages with handlerPromise after resolution', async () => { + const handlerPromise = Promise.resolve(mockHandleMessage); + + receiveInternalConnections({ + handlerPromise, + logger, + }); + + const controlChannel = MockBroadcastChannel.channels.get( + COMMS_CONTROL_CHANNEL_NAME, + ); + controlChannel?.onmessage?.( + new MessageEvent('message', { + data: { + method: 'init', + params: { channelName: 'internal-process-channel' }, + }, + }), + ); + + await delay(); + const commsStream = streamInstances[0]!; + expect(commsStream).toBeDefined(); + const commsStreamWriteSpy = vi.spyOn(commsStream, 'write'); + + const commsChannel = MockBroadcastChannel.channels.get( + 'internal-process-channel', + )!; + + // Send first message + commsChannel.onmessage?.( + new MessageEvent('message', { + data: { + method: 'getStatus', + params: null, + id: 1, + }, + }), + ); + await delay(); + + expect(mockHandleMessage).toHaveBeenCalledWith({ + method: 'getStatus', + params: null, + id: 1, + }); + expect(commsStreamWriteSpy).toHaveBeenCalledWith({ + jsonrpc: '2.0', + id: 1, + result: { vats: [], clusterConfig: makeClusterConfig() }, + }); + + // Send second message to verify caching (handler should be used directly) + commsChannel.onmessage?.( + new MessageEvent('message', { + data: { + method: 'getStatus', + params: null, + id: 2, + }, + }), + ); + await delay(); + + expect(mockHandleMessage).toHaveBeenCalledTimes(2); + expect(commsStreamWriteSpy).toHaveBeenCalledTimes(2); + }); + + it('should queue messages until handlerPromise resolves', async () => { + let resolveHandler: (handler: typeof mockHandleMessage) => void; + const handlerPromise = new Promise( + (resolve) => { + resolveHandler = resolve; + }, + ); + + receiveInternalConnections({ + handlerPromise, + logger, + }); + + const controlChannel = MockBroadcastChannel.channels.get( + COMMS_CONTROL_CHANNEL_NAME, + ); + controlChannel?.onmessage?.( + new MessageEvent('message', { + data: { + method: 'init', + params: { channelName: 'internal-process-channel' }, + }, + }), + ); + + await delay(); + const commsStream = streamInstances[0]!; + expect(commsStream).toBeDefined(); + const commsStreamWriteSpy = vi.spyOn(commsStream, 'write'); + + const commsChannel = MockBroadcastChannel.channels.get( + 'internal-process-channel', + )!; + + // Send message before handler is ready + commsChannel.onmessage?.( + new MessageEvent('message', { + data: { + method: 'getStatus', + params: null, + id: 1, + }, + }), + ); + + // Handler should not be called yet + await delay(); + expect(mockHandleMessage).not.toHaveBeenCalled(); + expect(commsStreamWriteSpy).not.toHaveBeenCalled(); + + // Now resolve the handler + resolveHandler!(mockHandleMessage); + await delay(); + + // Now the message should be handled + expect(mockHandleMessage).toHaveBeenCalledWith({ + method: 'getStatus', + params: null, + id: 1, + }); + expect(commsStreamWriteSpy).toHaveBeenCalledWith({ + jsonrpc: '2.0', + id: 1, + result: { vats: [], clusterConfig: makeClusterConfig() }, + }); + }); }); }); diff --git a/packages/kernel-browser-runtime/src/internal-comms/internal-connections.ts b/packages/kernel-browser-runtime/src/internal-comms/internal-connections.ts index 960b3c8e0..4e461dbfc 100644 --- a/packages/kernel-browser-runtime/src/internal-comms/internal-connections.ts +++ b/packages/kernel-browser-runtime/src/internal-comms/internal-connections.ts @@ -102,9 +102,17 @@ const connectToInternalProcess = async ( return stream; }; -type ReceiveConnectionsOptions = Omit & { - handleInternalMessage: HandleInternalMessage; -}; +type ReceiveConnectionsOptions = Omit & + ( + | { + handler: HandleInternalMessage; + handlerPromise?: never; + } + | { + handler?: never; + handlerPromise: Promise; + } + ); /** * Listens for connections between the kernel and an internal process, e.g. a UI instance. @@ -112,16 +120,33 @@ type ReceiveConnectionsOptions = Omit & { * processes have attempted to connect. * * @param options - The options for the connection. - * @param options.handleInternalMessage - The function to handle the internal message. + * @param options.handler - The function to handle internal messages. Mutually exclusive + * with `handlerPromise`. + * @param options.handlerPromise - A promise that resolves to the handler function. + * Mutually exclusive with `handler`. * @param options.logger - The logger instance. * @param options.controlChannelName - The name of the control channel. Must match * the name used by {@link connectToKernel} on the other end. */ export const receiveInternalConnections = ({ - handleInternalMessage, + handler: directHandler, + handlerPromise, logger, controlChannelName = COMMS_CONTROL_CHANNEL_NAME, }: ReceiveConnectionsOptions): void => { + let handler: HandleInternalMessage | null = null; + let handlerReady: Promise; + + if (handlerPromise === undefined) { + handler = directHandler; + handlerReady = Promise.resolve(directHandler); + } else { + handlerReady = handlerPromise.then((resolvedHandler) => { + handler = resolvedHandler; + return resolvedHandler; + }); + } + const seenChannels = new Set(); new BroadcastChannel(controlChannelName).onmessage = (event) => { if (!isCommsControlMessage(event.data)) { @@ -148,7 +173,8 @@ export const receiveInternalConnections = ({ `Received message from internal process "${channelName}": ${JSON.stringify(message)}`, ); - const reply = await handleInternalMessage(message); + const messageHandler = handler ?? (await handlerReady); + const reply = await messageHandler(message); if (reply !== undefined) { await kernelRpcStream.write(reply); } diff --git a/packages/kernel-browser-runtime/src/kernel-worker/kernel-worker.ts b/packages/kernel-browser-runtime/src/kernel-worker/kernel-worker.ts index 4fe244572..894711634 100644 --- a/packages/kernel-browser-runtime/src/kernel-worker/kernel-worker.ts +++ b/packages/kernel-browser-runtime/src/kernel-worker/kernel-worker.ts @@ -54,18 +54,18 @@ async function main(): Promise { resetStorage, }, ); - const serverP = kernelP.then((kernel) => { - return new JsonRpcServer({ + const handlerP = kernelP.then((kernel) => { + const server = new JsonRpcServer({ middleware: [ makeLoggingMiddleware(logger.subLogger('kernel-command')), makePanelMessageMiddleware(kernel, kernelDatabase), ], }); + return async (request: JsonRpcCall) => server.handle(request); }); receiveInternalConnections({ - handleInternalMessage: async (request) => - serverP.then(async (rpcServer) => rpcServer.handle(request)), + handlerPromise: handlerP, logger, });