From a7f97b7d0466cbbc87717544016008d892c85f8d Mon Sep 17 00:00:00 2001 From: Arseniy Klempner <arseniyk@status.im> Date: Fri, 7 Mar 2025 18:00:33 -0800 Subject: [PATCH 1/2] feat(sds): adds ephemeral messages, delivered message callback and event --- package-lock.json | 8 ++-- packages/sds/package.json | 1 + packages/sds/src/sds.spec.ts | 65 ++++++++++++++++++++++++--- packages/sds/src/sds.ts | 87 +++++++++++++++++++++++++++++++++--- 4 files changed, 144 insertions(+), 17 deletions(-) diff --git a/package-lock.json b/package-lock.json index 275649e54a..ca6de8411a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6451,10 +6451,9 @@ } }, "node_modules/@libp2p/interface": { - "version": "2.4.1", - "resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.4.1.tgz", - "integrity": "sha512-G80+rWn0d1+txM7TXMs+eK79qXdtS3yfepx2uGA5Kc7WSzXicwMN1Qw6ZJAB58SExdfQ0oWlS0E/v7kr8B025g==", - "license": "Apache-2.0 OR MIT", + "version": "2.7.0", + "resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.7.0.tgz", + "integrity": "sha512-lWmfIGzbSaw//yoEWWJh8dXNDGSCwUyXwC7P1Q6jCFWNoEtCaB1pvwOGBtri7Db/aNFZryMzN5covoq5ulldnA==", "dependencies": { "@multiformats/multiaddr": "^12.3.3", "it-pushable": "^3.2.3", @@ -42821,6 +42820,7 @@ "version": "0.0.1", "license": "MIT OR Apache-2.0", "dependencies": { + "@libp2p/interface": "^2.7.0", "@noble/hashes": "^1.7.1", "@waku/message-hash": "^0.1.17", "@waku/proto": "^0.0.8", diff --git a/packages/sds/package.json b/packages/sds/package.json index 69b7743151..caa047768b 100644 --- a/packages/sds/package.json +++ b/packages/sds/package.json @@ -59,6 +59,7 @@ "node": ">=20" }, "dependencies": { + "@libp2p/interface": "^2.7.0", "@noble/hashes": "^1.7.1", "@waku/message-hash": "^0.1.17", "@waku/proto": "^0.0.8", diff --git a/packages/sds/src/sds.spec.ts b/packages/sds/src/sds.spec.ts index 9caba42c60..b70bb6edac 100644 --- a/packages/sds/src/sds.spec.ts +++ b/packages/sds/src/sds.spec.ts @@ -5,7 +5,8 @@ import { DefaultBloomFilter } from "./bloom.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, Message, - MessageChannel + MessageChannel, + MessageChannelEvent } from "./sds.js"; const channelId = "test-channel"; @@ -399,12 +400,10 @@ describe("MessageChannel", function () { it("should remove messages without delivering if timeout is exceeded", async () => { const causalHistorySize = (channelA as any).causalHistorySize; // Create a channel with very very short timeout - const channelC: MessageChannel = new MessageChannel( - channelId, - causalHistorySize, - true, - 10 - ); + const channelC: MessageChannel = new MessageChannel(channelId, { + receivedMessageTimeoutEnabled: true, + receivedMessageTimeout: 10 + }); for (const m of messagesA) { await channelA.sendMessage(utf8ToBytes(m), callback); @@ -547,4 +546,56 @@ describe("MessageChannel", function () { ); }); }); + + describe("Ephemeral messages", () => { + beforeEach(() => { + channelA = new MessageChannel(channelId); + }); + + it("should be sent without a timestamp, causal history, or bloom filter", () => { + const timestampBefore = (channelA as any).lamportTimestamp; + channelA.sendEphemeralMessage(new Uint8Array(), (message) => { + expect(message.lamportTimestamp).to.equal(undefined); + expect(message.causalHistory).to.deep.equal([]); + expect(message.bloomFilter).to.equal(undefined); + return true; + }); + + const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + expect(outgoingBuffer.length).to.equal(0); + + const timestampAfter = (channelA as any).lamportTimestamp; + expect(timestampAfter).to.equal(timestampBefore); + }); + + it("should be delivered immediately if received", async () => { + let deliveredMessageId: string | undefined; + let sentMessage: Message | undefined; + + const channelB = new MessageChannel(channelId, { + deliveredMessageCallback: (messageId) => { + deliveredMessageId = messageId; + } + }); + + const waitForMessageDelivered = new Promise<string>((resolve) => { + channelB.addEventListener( + MessageChannelEvent.MessageDelivered, + (event) => { + resolve(event.detail); + } + ); + + channelA.sendEphemeralMessage(utf8ToBytes(messagesA[0]), (message) => { + sentMessage = message; + channelB.receiveMessage(message); + return true; + }); + }); + + const eventMessageId = await waitForMessageDelivered; + expect(deliveredMessageId).to.equal(sentMessage?.messageId); + expect(eventMessageId).to.equal(sentMessage?.messageId); + }); + }); }); diff --git a/packages/sds/src/sds.ts b/packages/sds/src/sds.ts index 0582085cb5..c316f88ff4 100644 --- a/packages/sds/src/sds.ts +++ b/packages/sds/src/sds.ts @@ -1,9 +1,17 @@ +import { TypedEventEmitter } from "@libp2p/interface"; import { sha256 } from "@noble/hashes/sha256"; import { bytesToHex } from "@noble/hashes/utils"; import { proto_sds_message } from "@waku/proto"; import { DefaultBloomFilter } from "./bloom.js"; +export enum MessageChannelEvent { + MessageDelivered = "messageDelivered" +} +type MessageChannelEvents = { + [MessageChannelEvent.MessageDelivered]: CustomEvent<string>; +}; + export type Message = proto_sds_message.SdsMessage; export type ChannelId = string; @@ -15,7 +23,14 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = { const DEFAULT_CAUSAL_HISTORY_SIZE = 2; const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes -export class MessageChannel { +interface MessageChannelOptions { + causalHistorySize?: number; + receivedMessageTimeoutEnabled?: boolean; + receivedMessageTimeout?: number; + deliveredMessageCallback?: (messageId: string) => void; +} + +export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { private lamportTimestamp: number; private filter: DefaultBloomFilter; private outgoingBuffer: Message[]; @@ -26,13 +41,15 @@ export class MessageChannel { private causalHistorySize: number; private acknowledgementCount: number; private timeReceived: Map<string, number>; + private receivedMessageTimeoutEnabled: boolean; + private receivedMessageTimeout: number; + private deliveredMessageCallback?: (messageId: string) => void; public constructor( channelId: ChannelId, - causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE, - private receivedMessageTimeoutEnabled: boolean = false, - private receivedMessageTimeout: number = DEFAULT_RECEIVED_MESSAGE_TIMEOUT + options: MessageChannelOptions = {} ) { + super(); this.channelId = channelId; this.lamportTimestamp = 0; this.filter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); @@ -40,9 +57,15 @@ export class MessageChannel { this.acknowledgements = new Map(); this.incomingBuffer = []; this.messageIdLog = []; - this.causalHistorySize = causalHistorySize; + this.causalHistorySize = + options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; this.acknowledgementCount = this.getAcknowledgementCount(); this.timeReceived = new Map(); + this.receivedMessageTimeoutEnabled = + options.receivedMessageTimeoutEnabled ?? false; + this.receivedMessageTimeout = + options.receivedMessageTimeout ?? DEFAULT_RECEIVED_MESSAGE_TIMEOUT; + this.deliveredMessageCallback = options.deliveredMessageCallback; } public static getMessageId(payload: Uint8Array): string { @@ -95,6 +118,36 @@ export class MessageChannel { } } + /** + * Sends a short-lived message without synchronization or reliability requirements. + * + * Sends a message without a timestamp, causal history, or bloom filter. + * Ephemeral messages are not added to the outgoing buffer. + * Upon reception, ephemeral messages are delivered immediately without + * checking for causal dependencies or including in the local log. + * + * See https://rfc.vac.dev/vac/raw/sds/#ephemeral-messages + * + * @param payload - The payload to send. + * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. + */ + public sendEphemeralMessage( + payload: Uint8Array, + callback?: (message: Message) => boolean + ): void { + const message: Message = { + messageId: MessageChannel.getMessageId(payload), + channelId: this.channelId, + content: payload, + lamportTimestamp: undefined, + causalHistory: [], + bloomFilter: undefined + }; + + if (callback) { + callback(message); + } + } /** * Process a received SDS message for this channel. * @@ -110,6 +163,11 @@ export class MessageChannel { * @param message - The received SDS message. */ public receiveMessage(message: Message): void { + if (!message.lamportTimestamp) { + // Messages with no timestamp are ephemeral messages and should be delivered immediately + this.deliverMessage(message); + return; + } // review ack status this.reviewAckStatus(message); // add to bloom filter (skip for messages with empty content) @@ -241,13 +299,19 @@ export class MessageChannel { // See https://rfc.vac.dev/vac/raw/sds/#deliver-message private deliverMessage(message: Message): void { + this.notifyDeliveredMessage(message.messageId); + const messageLamportTimestamp = message.lamportTimestamp ?? 0; if (messageLamportTimestamp > this.lamportTimestamp) { this.lamportTimestamp = messageLamportTimestamp; } - if (message.content?.length === 0) { + if ( + message.content?.length === 0 || + message.lamportTimestamp === undefined + ) { // Messages with empty content are sync messages. + // Messages with no timestamp are ephemeral messages. // They are not added to the local log or bloom filter. return; } @@ -312,4 +376,15 @@ export class MessageChannel { private getAcknowledgementCount(): number { return 2; } + + private notifyDeliveredMessage(messageId: string): void { + if (this.deliveredMessageCallback) { + this.deliveredMessageCallback(messageId); + } + this.dispatchEvent( + new CustomEvent(MessageChannelEvent.MessageDelivered, { + detail: messageId + }) + ); + } } From 23a0dc10080a729d45cf398b0f04b7917beba246 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner <arseniyk@status.im> Date: Mon, 10 Mar 2025 19:55:43 -0700 Subject: [PATCH 2/2] feat(sds): add retrieval hint to causal history --- packages/proto/src/generated/sds_message.ts | 78 ++++++++++++++++++++- packages/proto/src/lib/sds_message.proto | 7 +- packages/sds/src/sds.spec.ts | 76 +++++++++++--------- packages/sds/src/sds.ts | 65 ++++++++++------- 4 files changed, 163 insertions(+), 63 deletions(-) diff --git a/packages/proto/src/generated/sds_message.ts b/packages/proto/src/generated/sds_message.ts index 757756b6f9..8ad2dbef6c 100644 --- a/packages/proto/src/generated/sds_message.ts +++ b/packages/proto/src/generated/sds_message.ts @@ -7,11 +7,81 @@ import { type Codec, decodeMessage, type DecodeOptions, encodeMessage, MaxLengthError, message } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' +export interface HistoryEntry { + messageId: string + retrievalHint?: Uint8Array +} + +export namespace HistoryEntry { + let _codec: Codec<HistoryEntry> + + export const codec = (): Codec<HistoryEntry> => { + if (_codec == null) { + _codec = message<HistoryEntry>((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.messageId != null && obj.messageId !== '')) { + w.uint32(10) + w.string(obj.messageId) + } + + if (obj.retrievalHint != null) { + w.uint32(18) + w.bytes(obj.retrievalHint) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + messageId: '' + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.messageId = reader.string() + break + } + case 2: { + obj.retrievalHint = reader.bytes() + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial<HistoryEntry>): Uint8Array => { + return encodeMessage(obj, HistoryEntry.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions<HistoryEntry>): HistoryEntry => { + return decodeMessage(buf, HistoryEntry.codec(), opts) + } +} + export interface SdsMessage { messageId: string channelId: string lamportTimestamp?: number - causalHistory: string[] + causalHistory: HistoryEntry[] bloomFilter?: Uint8Array content?: Uint8Array } @@ -44,7 +114,7 @@ export namespace SdsMessage { if (obj.causalHistory != null) { for (const value of obj.causalHistory) { w.uint32(90) - w.string(value) + HistoryEntry.codec().encode(value, w) } } @@ -91,7 +161,9 @@ export namespace SdsMessage { throw new MaxLengthError('Decode error - map field "causalHistory" had too many elements') } - obj.causalHistory.push(reader.string()) + obj.causalHistory.push(HistoryEntry.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.causalHistory$ + })) break } case 12: { diff --git a/packages/proto/src/lib/sds_message.proto b/packages/proto/src/lib/sds_message.proto index f0396e6cc8..c75c8d7447 100644 --- a/packages/proto/src/lib/sds_message.proto +++ b/packages/proto/src/lib/sds_message.proto @@ -1,11 +1,16 @@ syntax = "proto3"; +message HistoryEntry { + string message_id = 1; // Unique identifier of the SDS message, as defined in `Message` + optional bytes retrieval_hint = 2; // Optional information to help remote parties retrieve this SDS message; For example, A Waku deterministic message hash or routing payload hash +} + message SdsMessage { // 1 Reserved for sender/participant id string message_id = 2; // Unique identifier of the message string channel_id = 3; // Identifier of the channel to which the message belongs optional int32 lamport_timestamp = 10; // Logical timestamp for causal ordering in channel - repeated string causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. + repeated HistoryEntry causal_history = 11; // List of preceding message IDs that this message causally depends on. Generally 2 or 3 message IDs are included. optional bytes bloom_filter = 12; // Bloom filter representing received message IDs in channel optional bytes content = 20; // Actual content of the message } \ No newline at end of file diff --git a/packages/sds/src/sds.spec.ts b/packages/sds/src/sds.spec.ts index b70bb6edac..64ced83926 100644 --- a/packages/sds/src/sds.spec.ts +++ b/packages/sds/src/sds.spec.ts @@ -4,14 +4,15 @@ import { expect } from "chai"; import { DefaultBloomFilter } from "./bloom.js"; import { DEFAULT_BLOOM_FILTER_OPTIONS, + HistoryEntry, Message, MessageChannel, MessageChannelEvent } from "./sds.js"; const channelId = "test-channel"; -const callback = (_message: Message): Promise<boolean> => { - return Promise.resolve(true); +const callback = (_message: Message): Promise<{ success: boolean }> => { + return Promise.resolve({ success: true }); }; const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { @@ -62,15 +63,16 @@ describe("MessageChannel", function () { const expectedTimestamp = (channelA as any).lamportTimestamp + 1; const messageId = MessageChannel.getMessageId(new Uint8Array()); await channelA.sendMessage(new Uint8Array(), callback); - const messageIdLog = (channelA as any).messageIdLog as { + const messageIdLog = (channelA as any).localHistory as { timestamp: number; - messageId: string; + historyEntry: HistoryEntry; }[]; expect(messageIdLog.length).to.equal(1); expect( messageIdLog.some( (log) => - log.timestamp === expectedTimestamp && log.messageId === messageId + log.timestamp === expectedTimestamp && + log.historyEntry.messageId === messageId ) ).to.equal(true); }); @@ -100,12 +102,15 @@ describe("MessageChannel", function () { // Causal history should only contain the last N messages as defined by causalHistorySize const causalHistory = outgoingBuffer[outgoingBuffer.length - 1] - .causalHistory as string[]; + .causalHistory as HistoryEntry[]; expect(causalHistory.length).to.equal(causalHistorySize); const expectedCausalHistory = messages .slice(-causalHistorySize - 1, -1) - .map((message) => MessageChannel.getMessageId(utf8ToBytes(message))); + .map((message) => ({ + messageId: MessageChannel.getMessageId(utf8ToBytes(message)), + retrievalHint: undefined + })); expect(causalHistory).to.deep.equal(expectedCausalHistory); }); }); @@ -120,7 +125,7 @@ describe("MessageChannel", function () { const timestampBefore = (channelA as any).lamportTimestamp; await channelB.sendMessage(new Uint8Array(), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); const timestampAfter = (channelA as any).lamportTimestamp; expect(timestampAfter).to.equal(timestampBefore + 1); @@ -133,7 +138,7 @@ describe("MessageChannel", function () { for (const m of messagesB) { await channelB.sendMessage(utf8ToBytes(m), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } const timestampAfter = (channelA as any).lamportTimestamp; @@ -147,7 +152,7 @@ describe("MessageChannel", function () { timestamp++; channelB.receiveMessage(message); expect((channelB as any).lamportTimestamp).to.equal(timestamp); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } @@ -156,7 +161,7 @@ describe("MessageChannel", function () { timestamp++; channelA.receiveMessage(message); expect((channelA as any).lamportTimestamp).to.equal(timestamp); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } @@ -173,7 +178,7 @@ describe("MessageChannel", function () { channelB.receiveMessage(message); const bloomFilter = getBloomFilter(channelB); expect(bloomFilter.lookup(message.messageId)).to.equal(true); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } }); @@ -189,7 +194,7 @@ describe("MessageChannel", function () { await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { receivedMessage = message; channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); const incomingBuffer = (channelB as any).incomingBuffer as Message[]; @@ -201,12 +206,15 @@ describe("MessageChannel", function () { expect(timestampAfter).to.equal(timestampBefore); // Message should not be in local history - const localHistory = (channelB as any).messageIdLog as { + const localHistory = (channelB as any).localHistory as { timestamp: number; - messageId: string; + historyEntry: HistoryEntry; }[]; expect( - localHistory.some((m) => m.messageId === receivedMessage!.messageId) + localHistory.some( + ({ historyEntry: { messageId } }) => + messageId === receivedMessage!.messageId + ) ).to.equal(false); }); }); @@ -221,14 +229,14 @@ describe("MessageChannel", function () { for (const m of messagesA) { await channelA.sendMessage(utf8ToBytes(m), (message) => { channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } let notInHistory: Message | null = null; await channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => { notInHistory = message; - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); expect((channelA as any).outgoingBuffer.length).to.equal( @@ -237,7 +245,7 @@ describe("MessageChannel", function () { await channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); // Since messagesA are in causal history of channel B's message @@ -262,7 +270,7 @@ describe("MessageChannel", function () { for (const m of messages) { await channelA.sendMessage(utf8ToBytes(m), (message) => { channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } @@ -276,7 +284,7 @@ describe("MessageChannel", function () { utf8ToBytes(messagesB[messagesB.length - 1]), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); } ); @@ -310,7 +318,7 @@ describe("MessageChannel", function () { // Send messages until acknowledgement count is reached await channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } @@ -344,9 +352,9 @@ describe("MessageChannel", function () { await channelA.sendMessage(utf8ToBytes(m), callback); } - await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + await channelA.sendMessage(utf8ToBytes(messagesB[0]), async (message) => { channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); const incomingBuffer = (channelB as any).incomingBuffer as Message[]; @@ -357,7 +365,7 @@ describe("MessageChannel", function () { const missingMessages = channelB.sweepIncomingBuffer(); expect(missingMessages.length).to.equal(causalHistorySize); - expect(missingMessages[0]).to.equal( + expect(missingMessages[0].messageId).to.equal( MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) ); }); @@ -368,18 +376,18 @@ describe("MessageChannel", function () { for (const m of messagesA) { await channelA.sendMessage(utf8ToBytes(m), (message) => { sentMessages.push(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); const missingMessages = channelB.sweepIncomingBuffer(); expect(missingMessages.length).to.equal(causalHistorySize); - expect(missingMessages[0]).to.equal( + expect(missingMessages[0].messageId).to.equal( MessageChannel.getMessageId(utf8ToBytes(messagesA[0])) ); @@ -411,7 +419,7 @@ describe("MessageChannel", function () { await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { channelC.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); const missingMessages = channelC.sweepIncomingBuffer(); @@ -439,7 +447,7 @@ describe("MessageChannel", function () { await channelA.sendMessage(utf8ToBytes(m), (message) => { unacknowledgedMessages.push(message); channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } @@ -458,7 +466,7 @@ describe("MessageChannel", function () { utf8ToBytes(messagesB[causalHistorySize]), (message) => { channelA.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); } ); @@ -495,7 +503,7 @@ describe("MessageChannel", function () { bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array())) ).to.equal(false); - const localLog = (channelA as any).messageIdLog as { + const localLog = (channelA as any).localHistory as { timestamp: number; messageId: string; }[]; @@ -514,7 +522,7 @@ describe("MessageChannel", function () { expect(timestampAfter).to.equal(expectedTimestamp); expect(timestampAfter).to.be.greaterThan(timestampBefore); - const localLog = (channelB as any).messageIdLog as { + const localLog = (channelB as any).localHistory as { timestamp: number; messageId: string; }[]; @@ -530,7 +538,7 @@ describe("MessageChannel", function () { for (const m of messagesA) { await channelA.sendMessage(utf8ToBytes(m), (message) => { channelB.receiveMessage(message); - return Promise.resolve(true); + return Promise.resolve({ success: true }); }); } diff --git a/packages/sds/src/sds.ts b/packages/sds/src/sds.ts index c316f88ff4..ea3c26a814 100644 --- a/packages/sds/src/sds.ts +++ b/packages/sds/src/sds.ts @@ -13,6 +13,7 @@ type MessageChannelEvents = { }; export type Message = proto_sds_message.SdsMessage; +export type HistoryEntry = proto_sds_message.HistoryEntry; export type ChannelId = string; export const DEFAULT_BLOOM_FILTER_OPTIONS = { @@ -36,7 +37,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { private outgoingBuffer: Message[]; private acknowledgements: Map<string, number>; private incomingBuffer: Message[]; - private messageIdLog: { timestamp: number; messageId: string }[]; + private localHistory: { timestamp: number; historyEntry: HistoryEntry }[]; private channelId: ChannelId; private causalHistorySize: number; private acknowledgementCount: number; @@ -56,7 +57,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { this.outgoingBuffer = []; this.acknowledgements = new Map(); this.incomingBuffer = []; - this.messageIdLog = []; + this.localHistory = []; this.causalHistorySize = options.causalHistorySize ?? DEFAULT_CAUSAL_HISTORY_SIZE; this.acknowledgementCount = this.getAcknowledgementCount(); @@ -90,7 +91,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { */ public async sendMessage( payload: Uint8Array, - callback?: (message: Message) => Promise<boolean> + callback?: (message: Message) => Promise<{ + success: boolean; + retrievalHint?: Uint8Array; + }> ): Promise<void> { this.lamportTimestamp++; @@ -100,9 +104,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { messageId, channelId: this.channelId, lamportTimestamp: this.lamportTimestamp, - causalHistory: this.messageIdLog + causalHistory: this.localHistory .slice(-this.causalHistorySize) - .map(({ messageId }) => messageId), + .map(({ historyEntry }) => historyEntry), bloomFilter: this.filter.toBytes(), content: payload }; @@ -110,10 +114,16 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { this.outgoingBuffer.push(message); if (callback) { - const success = await callback(message); + const { success, retrievalHint } = await callback(message); if (success) { this.filter.insert(messageId); - this.messageIdLog.push({ timestamp: this.lamportTimestamp, messageId }); + this.localHistory.push({ + timestamp: this.lamportTimestamp, + historyEntry: { + messageId, + retrievalHint + } + }); } } } @@ -175,9 +185,10 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { this.filter.insert(message.messageId); } // verify causal history - const dependenciesMet = message.causalHistory.every((messageId) => - this.messageIdLog.some( - ({ messageId: logMessageId }) => logMessageId === messageId + const dependenciesMet = message.causalHistory.every((historyEntry) => + this.localHistory.some( + ({ historyEntry: { messageId } }) => + messageId === historyEntry.messageId ) ); if (!dependenciesMet) { @@ -189,17 +200,18 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { } // https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep - public sweepIncomingBuffer(): string[] { + public sweepIncomingBuffer(): HistoryEntry[] { const { buffer, missing } = this.incomingBuffer.reduce<{ buffer: Message[]; - missing: string[]; + missing: HistoryEntry[]; }>( ({ buffer, missing }, message) => { // Check each message for missing dependencies const missingDependencies = message.causalHistory.filter( - (messageId) => - !this.messageIdLog.some( - ({ messageId: logMessageId }) => logMessageId === messageId + (messageHistoryEntry) => + !this.localHistory.some( + ({ historyEntry: { messageId } }) => + messageId === messageHistoryEntry.messageId ) ); if (missingDependencies.length === 0) { @@ -227,7 +239,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { missing: missing.concat(missingDependencies) }; }, - { buffer: new Array<Message>(), missing: new Array<string>() } + { buffer: new Array<Message>(), missing: new Array<HistoryEntry>() } ); // Update the incoming buffer to only include messages with no missing dependencies this.incomingBuffer = buffer; @@ -284,9 +296,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { messageId: MessageChannel.getMessageId(emptyMessage), channelId: this.channelId, lamportTimestamp: this.lamportTimestamp, - causalHistory: this.messageIdLog + causalHistory: this.localHistory .slice(-this.causalHistorySize) - .map(({ messageId }) => messageId), + .map(({ historyEntry }) => historyEntry), bloomFilter: this.filter.toBytes(), content: emptyMessage }; @@ -298,7 +310,7 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { } // See https://rfc.vac.dev/vac/raw/sds/#deliver-message - private deliverMessage(message: Message): void { + private deliverMessage(message: Message, retrievalHint?: Uint8Array): void { this.notifyDeliveredMessage(message.messageId); const messageLamportTimestamp = message.lamportTimestamp ?? 0; @@ -321,15 +333,18 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { // If one or more message IDs with the same Lamport timestamp already exists, // the participant MUST follow the Resolve Conflicts procedure. // https://rfc.vac.dev/vac/raw/sds/#resolve-conflicts - this.messageIdLog.push({ + this.localHistory.push({ timestamp: messageLamportTimestamp, - messageId: message.messageId + historyEntry: { + messageId: message.messageId, + retrievalHint + } }); - this.messageIdLog.sort((a, b) => { + this.localHistory.sort((a, b) => { if (a.timestamp !== b.timestamp) { return a.timestamp - b.timestamp; } - return a.messageId.localeCompare(b.messageId); + return a.historyEntry.messageId.localeCompare(b.historyEntry.messageId); }); } @@ -338,9 +353,9 @@ export class MessageChannel extends TypedEventEmitter<MessageChannelEvents> { // See https://rfc.vac.dev/vac/raw/sds/#review-ack-status private reviewAckStatus(receivedMessage: Message): void { // the participant MUST mark all messages in the received causal_history as acknowledged. - receivedMessage.causalHistory.forEach((messageId) => { + receivedMessage.causalHistory.forEach(({ messageId }) => { this.outgoingBuffer = this.outgoingBuffer.filter( - (msg) => msg.messageId !== messageId + ({ messageId: outgoingMessageId }) => outgoingMessageId !== messageId ); this.acknowledgements.delete(messageId); if (!this.filter.lookup(messageId)) {