Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/ocap-kernel/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Regenerate `incarnationId` when `resetStorage=true` clears the rest of kernel state, completing the #948 peer-restart detection on browser/extension kernel reloads ([#950](https://github.com/MetaMask/ocap-kernel/pull/950))
- The previous except-list preserved `incarnationId` across `resetStorage` wipes, so a restarted sender signalled the same incarnation it had before the wipe and the matching receiver's handshake decided "no restart" — leaving stale `highestReceivedSeq` in place and silently dropping the sender's fresh `seq=1` messages
- Register a new vat with its subcluster before awaiting `runVat`, so a garbage-collection pass during bundle load cannot delete the still-empty subcluster out from under the in-progress vat creation ([#952](https://github.com/MetaMask/ocap-kernel/pull/952))
- Use length-prefixed framing for remote messages so payloads larger than the underlying transport's per-frame cutoff (e.g. `@libp2p/webrtc`'s 16 KB datachannel limit) are reassembled correctly on the receiver ([#957](https://github.com/MetaMask/ocap-kernel/pull/957))
- Replace `byteStream` with `lpStream` on every remote channel; the byte-oriented stream did not preserve `write()` boundaries, so any message the transport split into multiple frames was parsed from the first frame only, silently dropped without acknowledgement, and the sender retried until giving up after `MAX_RETRIES`
- Surface receiver-side framing-cap violations (`InvalidDataLengthError`, `InvalidDataLengthLengthError`) as `ResourceLimitError` with `limitType: 'messageSize'` so size errors look the same whether they tripped on the sender's `validateMessageSize` or the receiver's framing decoder

## [0.7.0]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ function makeTestMultiaddr(protoNames: string[], host: string) {
};
}

// Simple ByteStream mock
// Simple LengthPrefixedStream mock
type MockByteStream = {
write: (chunk: Uint8Array) => Promise<void>;
read: () => Promise<Uint8Array | undefined>;
Expand All @@ -229,7 +229,7 @@ type MockByteStream = {

const streamMap = new WeakMap<object, MockByteStream>();
vi.mock('@libp2p/utils', () => ({
byteStream: (stream: object) => {
lpStream: (stream: object) => {
const bs: MockByteStream = {
writes: [],
async write(chunk: Uint8Array) {
Expand All @@ -243,6 +243,9 @@ vi.mock('@libp2p/utils', () => ({
return bs;
},
getByteStreamFor: (stream: object) => streamMap.get(stream),
InvalidDataLengthError: class InvalidDataLengthError extends Error {},
InvalidDataLengthLengthError: class InvalidDataLengthLengthError extends Error {},
UnexpectedEOFError: class UnexpectedEOFError extends Error {},
}));

const createLibp2p = vi.fn();
Expand Down
17 changes: 14 additions & 3 deletions packages/ocap-kernel/src/remotes/platform/connection-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
} from '@libp2p/interface';
import type { PrivateKey, Libp2p } from '@libp2p/interface';
import { ping } from '@libp2p/ping';
import { byteStream } from '@libp2p/utils';
import { lpStream } from '@libp2p/utils';
import { webRTC } from '@libp2p/webrtc';
import { webSockets } from '@libp2p/websockets';
import { webTransport } from '@libp2p/webtransport';
Expand All @@ -26,6 +26,7 @@ import type { Multiaddr } from '@multiformats/multiaddr';
import { createLibp2p } from 'libp2p';

import {
DEFAULT_MAX_MESSAGE_SIZE_BYTES,
RELAY_RECONNECT_BASE_DELAY_MS,
RELAY_RECONNECT_MAX_DELAY_MS,
RELAY_RECONNECT_MAX_ATTEMPTS,
Expand Down Expand Up @@ -59,6 +60,8 @@ export class ConnectionFactory {

readonly #maxRetryAttempts: number;

readonly #maxDataLength: number;

readonly #directTransports: DirectTransport[];

readonly #allowedWsHosts: string[];
Expand Down Expand Up @@ -87,6 +90,7 @@ export class ConnectionFactory {
* @param options.logger - The logger to use for the libp2p node.
* @param options.signal - The signal to use for the libp2p node.
* @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default).
* @param options.maxMessageSizeBytes - Maximum inbound message size in bytes, used as `maxDataLength` on every `lpStream`. Defaults to 1 MB.
* @param options.directTransports - Optional direct transports (e.g. QUIC, TCP) with listen addresses.
* @param options.allowedWsHosts - Hostnames/IPs allowed for plain ws:// connections beyond private ranges.
*/
Expand All @@ -97,6 +101,8 @@ export class ConnectionFactory {
this.#logger = options.logger;
this.#signal = options.signal;
this.#maxRetryAttempts = options.maxRetryAttempts ?? 0;
this.#maxDataLength =
options.maxMessageSizeBytes ?? DEFAULT_MAX_MESSAGE_SIZE_BYTES;
this.#directTransports = options.directTransports ?? [];
const explicitHosts = options.allowedWsHosts ?? [];
const relayHosts: string[] = [];
Expand Down Expand Up @@ -137,6 +143,7 @@ export class ConnectionFactory {
* @param options.logger - The logger to use for the libp2p node.
* @param options.signal - The signal to use for the libp2p node.
* @param options.maxRetryAttempts - Maximum number of reconnection attempts. 0 = infinite (default).
* @param options.maxMessageSizeBytes - Maximum inbound message size in bytes, used as `maxDataLength` on every `lpStream`. Defaults to 1 MB.
* @param options.directTransports - Optional direct transports (e.g. QUIC, TCP) with listen addresses.
* @param options.allowedWsHosts - Hostnames/IPs allowed for plain ws:// connections beyond private ranges.
* @returns A promise for the new ConnectionFactory instance.
Expand Down Expand Up @@ -217,7 +224,9 @@ export class ConnectionFactory {

// Set up inbound handler
await this.#libp2p.handle('whatever', async (stream, connection) => {
const msgStream = byteStream(stream);
const msgStream = lpStream(stream, {
maxDataLength: this.#maxDataLength,
});
const remotePeerId = connection.remotePeer.toString();
const connType = connection.direct ? 'direct' : 'relayed';
this.#logger.log(
Expand Down Expand Up @@ -401,7 +410,9 @@ export class ConnectionFactory {
this.#logger.log(
`successfully connected to ${peerId} via ${addressString}`,
);
const msgStream = byteStream(stream);
const msgStream = lpStream(stream, {
maxDataLength: this.#maxDataLength,
});
const channel: Channel = { msgStream, stream, peerId };
this.#logger.log(`opened channel to ${peerId}`);
return channel;
Expand Down
5 changes: 3 additions & 2 deletions packages/ocap-kernel/src/remotes/platform/handshake.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { UnexpectedEOFError } from '@libp2p/utils';
import { Logger } from '@metamask/logger';
import { describe, it, expect, beforeEach, vi } from 'vitest';

Expand Down Expand Up @@ -298,11 +299,11 @@ describe('handshake', () => {
it('throws when channel closes during read', async () => {
vi.spyOn(mockChannel.msgStream, 'read')
.mockImplementation()
.mockResolvedValueOnce(undefined);
.mockRejectedValueOnce(new UnexpectedEOFError('stream closed'));

await expect(
performInboundHandshake(mockChannel, mockDeps),
).rejects.toThrow('Channel closed during handshake');
).rejects.toThrow(UnexpectedEOFError);
});
});
});
3 changes: 0 additions & 3 deletions packages/ocap-kernel/src/remotes/platform/handshake.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,6 @@ async function readWithTimeout(

const readPromise = (async () => {
const readBuf = await channel.msgStream.read();
if (!readBuf) {
throw new Error('Channel closed during handshake');
}
return bufToString(readBuf.subarray());
})();

Expand Down
87 changes: 87 additions & 0 deletions packages/ocap-kernel/src/remotes/platform/lp-framing.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import type { Stream } from '@libp2p/interface';
import { lpStream, streamPair } from '@libp2p/utils';
import { fromString, toString as bufToString } from 'uint8arrays';
import { describe, expect, it } from 'vitest';

/**
 * Guards against the framing regression that prompted the move from
 * `byteStream` to `lpStream`. A transport such as `@libp2p/webrtc` may
 * break one write into several frames; a `byteStream` reader woke up on
 * the first frame and handed back a truncated payload. `lpStream`
 * prefixes every message with its length, so the reader keeps waiting
 * until the whole message has arrived.
 *
 * Every test provokes chunking by setting the underlying stream's
 * `maxMessageSize` far below the payload size, which makes the abstract
 * stream deliver the write as multiple `message` events. The contract
 * under test: one `lpStream.write()` yields exactly one complete
 * payload from `lpStream.read()`.
 */
describe('lpStream framing over a chunked transport', () => {
  /**
   * Create two connected streams whose underlying AbstractStream breaks
   * any write longer than `chunkSize` bytes into several `message`
   * events for the receiver.
   *
   * @param chunkSize - Per-frame cap applied to both directions.
   * @returns The `[outbound, inbound]` stream pair.
   */
  const chunkedStreamPair = async (
    chunkSize: number,
  ): Promise<[Stream, Stream]> =>
    streamPair({
      outbound: { maxMessageSize: chunkSize },
      inbound: { maxMessageSize: chunkSize },
    });

  it('reassembles a single payload that the transport splits into many frames', async () => {
    const [outgoing, incoming] = await chunkedStreamPair(1024);
    const writer = lpStream(outgoing, { maxDataLength: 1024 * 1024 });
    const reader = lpStream(incoming, { maxDataLength: 1024 * 1024 });

    // A 20 KB message against a 1 KB frame cap: the receiver sees
    // roughly twenty frames for this single write.
    const message = 'A'.repeat(20_000);
    await writer.write(fromString(message));

    const frame = await reader.read();
    expect(frame.byteLength).toBe(20_000);
    expect(bufToString(frame.subarray())).toBe(message);
  });

  it('preserves message boundaries when several large payloads are sent back-to-back', async () => {
    const [outgoing, incoming] = await chunkedStreamPair(2048);
    const writer = lpStream(outgoing, { maxDataLength: 1024 * 1024 });
    const reader = lpStream(incoming, { maxDataLength: 1024 * 1024 });

    // Three distinguishable messages, each several frames long.
    const filler = 'x'.repeat(8_000);
    const messages = ['alpha', 'bravo', 'charlie'].map(
      (label) => `${label}:${filler}`,
    );

    // Queue every write first, then drain, so the frames of consecutive
    // messages sit next to each other on the wire.
    for (const message of messages) {
      await writer.write(fromString(message));
    }

    for (const message of messages) {
      const frame = await reader.read();
      expect(bufToString(frame.subarray())).toBe(message);
    }
  });

  it('rejects an inbound message that announces a payload larger than maxDataLength', async () => {
    const [outgoing, incoming] = await chunkedStreamPair(1024);
    const writer = lpStream(outgoing, { maxDataLength: 1024 * 1024 });
    // The reader accepts at most 8 KB while the writer allows 1 MB. This
    // models two machines configured with different limits: the reader
    // must fail fast with an invalid-data-length error instead of
    // stalling while it waits to reassemble the message.
    const reader = lpStream(incoming, { maxDataLength: 8 * 1024 });

    const tooLarge = fromString('B'.repeat(16_000));
    await writer.write(tooLarge);

    await expect(reader.read()).rejects.toThrow(
      /Message length too long|InvalidDataLength/u,
    );
  });
});
23 changes: 17 additions & 6 deletions packages/ocap-kernel/src/remotes/platform/transport.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { UnexpectedEOFError } from '@libp2p/utils';
import { AbortError } from '@metamask/kernel-errors';
import { makeAbortSignalMock } from '@ocap/repo-tools/test-utils';
import {
Expand Down Expand Up @@ -288,6 +289,7 @@ describe('transport.initTransport', () => {
logger: expect.any(Object),
signal: expect.any(AbortSignal),
maxRetryAttempts: undefined,
maxMessageSizeBytes: 1024 * 1024,
directTransports: undefined,
});
});
Expand All @@ -305,6 +307,7 @@ describe('transport.initTransport', () => {
logger: expect.any(Object),
signal: expect.any(AbortSignal),
maxRetryAttempts,
maxMessageSizeBytes: 1024 * 1024,
directTransports: undefined,
});
});
Expand Down Expand Up @@ -333,6 +336,7 @@ describe('transport.initTransport', () => {
logger: expect.any(Object),
signal: expect.any(AbortSignal),
maxRetryAttempts: undefined,
maxMessageSizeBytes: 1024 * 1024,
directTransports,
});
});
Expand Down Expand Up @@ -736,7 +740,7 @@ describe('transport.initTransport', () => {
});
});

it('exits read loop when readBuf is undefined (stream ended)', async () => {
it('treats UnexpectedEOFError as connection loss and triggers reconnection', async () => {
let inboundHandler: ((channel: MockChannel) => void) | undefined;
mockConnectionFactory.onInboundConnection.mockImplementation(
(handler) => {
Expand All @@ -748,16 +752,23 @@ describe('transport.initTransport', () => {
await initTransport('0x1234', {}, remoteHandler);

const mockChannel = createMockChannel('peer-1');
// First read returns undefined, which means stream ended - loop should break
mockChannel.msgStream.read.mockResolvedValueOnce(undefined);
// lpStream throws UnexpectedEOFError on EOF — both for clean
// end-of-stream between messages and for a partial-message drop.
// The read loop can't distinguish the two, so it must treat any
// EOF as a potential mid-message drop and trigger reconnection
// (rather than silently exiting and leaving the sender to
// retransmit into a dead channel).
mockChannel.msgStream.read.mockRejectedValueOnce(
new UnexpectedEOFError('stream closed'),
);

inboundHandler?.(mockChannel);

await vi.waitFor(() => {
// Stream ended, so no messages should be processed
expect(remoteHandler).not.toHaveBeenCalled();
// Should log that stream ended
expect(mockLogger.log).toHaveBeenCalledWith('peer-1:: stream ended');
expect(mockReconnectionManager.startReconnection).toHaveBeenCalledWith(
'peer-1',
);
});
});

Expand Down
42 changes: 33 additions & 9 deletions packages/ocap-kernel/src/remotes/platform/transport.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import { StreamResetError } from '@libp2p/interface';
import type { StreamCloseEvent } from '@libp2p/interface';
import {
InvalidDataLengthError,
InvalidDataLengthLengthError,
} from '@libp2p/utils';
import {
AbortError,
IntentionalCloseError,
Expand Down Expand Up @@ -171,6 +175,7 @@ export async function initTransport(
logger,
signal,
maxRetryAttempts,
maxMessageSizeBytes,
directTransports,
allowedWsHosts,
});
Expand Down Expand Up @@ -463,6 +468,31 @@ export async function initTransport(
try {
readBuf = await channel.msgStream.read();
} catch (problem) {
if (
problem instanceof InvalidDataLengthError ||
problem instanceof InvalidDataLengthLengthError
) {
// Peer announced a payload larger than maxMessageSizeBytes. The
// length-prefixed framing is now poisoned (subsequent bytes are
// not on a message boundary), so we cannot continue on this
// stream. Surface a uniform "message too long" error to match the
// sender-side validator.
const sizeError = new ResourceLimitError(
`Inbound message exceeds size limit: ${problem.message}`,
{
cause: problem,
data: { limitType: 'messageSize' },
},
);
outputError(
channel.peerId,
`reading message from ${channel.peerId}`,
sizeError,
);
handleConnectionLoss(channel.peerId);
logger.log(`closed channel to ${channel.peerId}`);
throw sizeError;
}
if (problem instanceof StreamResetError) {
// Remote-initiated stream reset: treat as connection loss and
// reconnect. Do NOT mark as intentionally closed — a malicious
Expand All @@ -487,15 +517,9 @@ export async function initTransport(
logger.log(`closed channel to ${channel.peerId}`);
throw problem;
}
if (readBuf) {
reconnectionManager.resetBackoff(channel.peerId); // successful inbound traffic
peerStateManager.updateConnectionTime(channel.peerId);
await receiveMessage(channel.peerId, bufToString(readBuf.subarray()));
} else {
// Stream ended (returned undefined), exit the read loop
logger.log(`${channel.peerId}:: stream ended`);
break;
}
reconnectionManager.resetBackoff(channel.peerId); // successful inbound traffic
peerStateManager.updateConnectionTime(channel.peerId);
await receiveMessage(channel.peerId, bufToString(readBuf.subarray()));
}
} finally {
// Always remove the channel when readChannel exits to prevent stale channels
Expand Down
12 changes: 10 additions & 2 deletions packages/ocap-kernel/src/remotes/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Stream } from '@libp2p/interface';
import type { ByteStream } from '@libp2p/utils';
import type { LengthPrefixedStream } from '@libp2p/utils';
import type { Logger } from '@metamask/logger';

import type { KRef } from '../types.ts';
Expand All @@ -11,7 +11,7 @@ export type InboundConnectionHandler = (
export type PeerDisconnectHandler = (peerId: string) => void;

export type Channel = {
msgStream: ByteStream<Stream>;
msgStream: LengthPrefixedStream<Stream>;
stream: Stream;
peerId: string;
};
Expand Down Expand Up @@ -211,6 +211,14 @@ export type ConnectionFactoryOptions = {
logger: Logger;
signal: AbortSignal;
maxRetryAttempts?: number | undefined;
/**
* Maximum inbound message payload size in bytes. Used as `maxDataLength`
* on every `lpStream` constructed for a channel — must match the
* sender-side validator's limit (`maxMessageSizeBytes` on
* `RemoteCommsOptions`) so that a deployment which raises one also raises
* the other. Defaults to `DEFAULT_MAX_MESSAGE_SIZE_BYTES` (1 MB).
*/
maxMessageSizeBytes?: number | undefined;
directTransports?: DirectTransport[] | undefined;
allowedWsHosts?: string[] | undefined;
};
Expand Down
Loading