diff --git a/packages/sds/src/sds.spec.ts b/packages/sds/src/sds.spec.ts index db69b14a04..9caba42c60 100644 --- a/packages/sds/src/sds.spec.ts +++ b/packages/sds/src/sds.spec.ts @@ -9,8 +9,8 @@ import { } from "./sds.js"; const channelId = "test-channel"; -const callback = (_message: Message): boolean => { - return true; +const callback = (_message: Message): Promise => { + return Promise.resolve(true); }; const getBloomFilter = (channel: MessageChannel): DefaultBloomFilter => { @@ -36,31 +36,31 @@ describe("MessageChannel", function () { channelA = new MessageChannel(channelId); }); - it("should increase lamport timestamp", () => { + it("should increase lamport timestamp", async () => { const timestampBefore = (channelA as any).lamportTimestamp; - channelA.sendMessage(new Uint8Array(), callback); + await channelA.sendMessage(new Uint8Array(), callback); const timestampAfter = (channelA as any).lamportTimestamp; expect(timestampAfter).to.equal(timestampBefore + 1); }); - it("should push the message to the outgoing buffer", () => { + it("should push the message to the outgoing buffer", async () => { const bufferLengthBefore = (channelA as any).outgoingBuffer.length; - channelA.sendMessage(new Uint8Array(), callback); + await channelA.sendMessage(new Uint8Array(), callback); const bufferLengthAfter = (channelA as any).outgoingBuffer.length; expect(bufferLengthAfter).to.equal(bufferLengthBefore + 1); }); - it("should insert message into bloom filter", () => { + it("should insert message into bloom filter", async () => { const messageId = MessageChannel.getMessageId(new Uint8Array()); - channelA.sendMessage(new Uint8Array(), callback); + await channelA.sendMessage(new Uint8Array(), callback); const bloomFilter = getBloomFilter(channelA); expect(bloomFilter.lookup(messageId)).to.equal(true); }); - it("should insert message id into causal history", () => { + it("should insert message id into causal history", async () => { const expectedTimestamp = (channelA as any).lamportTimestamp + 1; const messageId = MessageChannel.getMessageId(new Uint8Array()); - channelA.sendMessage(new Uint8Array(), callback); + await channelA.sendMessage(new Uint8Array(), callback); const messageIdLog = (channelA as any).messageIdLog as { timestamp: number; messageId: string; @@ -74,7 +74,7 @@ describe("MessageChannel", function () { ).to.equal(true); }); - it("should attach causal history and bloom filter to each message", () => { + it("should attach causal history and bloom filter to each message", async () => { const bloomFilter = new DefaultBloomFilter(DEFAULT_BLOOM_FILTER_OPTIONS); const causalHistorySize = (channelA as any).causalHistorySize; const filterBytes = new Array(); @@ -82,11 +82,11 @@ describe("MessageChannel", function () { .fill("message") .map((message, index) => `${message}-${index}`); - messages.forEach((message) => { + for (const message of messages) { filterBytes.push(bloomFilter.toBytes()); - channelA.sendMessage(utf8ToBytes(message), callback); + await channelA.sendMessage(utf8ToBytes(message), callback); bloomFilter.insert(MessageChannel.getMessageId(utf8ToBytes(message))); - }); + } const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; expect(outgoingBuffer.length).to.equal(messages.length); @@ -115,49 +115,49 @@ describe("MessageChannel", function () { channelB = new MessageChannel(channelId); }); - it("should increase lamport timestamp", () => { + it("should increase lamport timestamp", async () => { const timestampBefore = (channelA as any).lamportTimestamp; - channelB.sendMessage(new Uint8Array(), (message) => { + await channelB.sendMessage(new Uint8Array(), (message) => { channelA.receiveMessage(message); - return true; + return Promise.resolve(true); }); const timestampAfter = (channelA as any).lamportTimestamp; expect(timestampAfter).to.equal(timestampBefore + 1); }); - it("should update lamport timestamp if greater than current timestamp and dependencies are met", () => { - messagesA.forEach((m) => { - channelA.sendMessage(utf8ToBytes(m), callback); - }); - messagesB.forEach((m) => { - channelB.sendMessage(utf8ToBytes(m), (message) => { + it("should update lamport timestamp if greater than current timestamp and dependencies are met", async () => { + for (const m of messagesA) { + await channelA.sendMessage(utf8ToBytes(m), callback); + } + for (const m of messagesB) { + await channelB.sendMessage(utf8ToBytes(m), (message) => { channelA.receiveMessage(message); - return true; + return Promise.resolve(true); }); - }); + } const timestampAfter = (channelA as any).lamportTimestamp; expect(timestampAfter).to.equal(messagesB.length); }); - it("should maintain proper timestamps if all messages received", () => { + it("should maintain proper timestamps if all messages received", async () => { let timestamp = 0; - messagesA.forEach((m) => { - channelA.sendMessage(utf8ToBytes(m), (message) => { + for (const m of messagesA) { + await channelA.sendMessage(utf8ToBytes(m), (message) => { timestamp++; channelB.receiveMessage(message); expect((channelB as any).lamportTimestamp).to.equal(timestamp); - return true; + return Promise.resolve(true); }); - }); + } - messagesB.forEach((m) => { - channelB.sendMessage(utf8ToBytes(m), (message) => { + for (const m of messagesB) { + await channelB.sendMessage(utf8ToBytes(m), (message) => { timestamp++; channelA.receiveMessage(message); expect((channelA as any).lamportTimestamp).to.equal(timestamp); - return true; + return Promise.resolve(true); }); - }); + } const expectedLength = messagesA.length + messagesB.length; expect((channelA as any).lamportTimestamp).to.equal(expectedLength); @@ -166,29 +166,29 @@ describe("MessageChannel", function () { ); }); - it("should add received messages to bloom filter", () => { - messagesA.forEach((m) => { - channelA.sendMessage(utf8ToBytes(m), (message) => { + it("should add received messages to bloom filter", async () => { + for (const m of messagesA) { + await channelA.sendMessage(utf8ToBytes(m), (message) => { channelB.receiveMessage(message); const bloomFilter = getBloomFilter(channelB); expect(bloomFilter.lookup(message.messageId)).to.equal(true); - return true; + return Promise.resolve(true); }); - }); + } }); - it("should add to incoming buffer if dependencies are not met", () => { - messagesA.forEach((m) => { - channelA.sendMessage(utf8ToBytes(m), callback); - }); + it("should add to incoming buffer if dependencies are not met", async () => { + for (const m of messagesA) { + await channelA.sendMessage(utf8ToBytes(m), callback); + } let receivedMessage: Message | null = null; const timestampBefore = (channelB as any).lamportTimestamp; - channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { receivedMessage = message; channelB.receiveMessage(message); - return true; + return Promise.resolve(true); }); const incomingBuffer = (channelB as any).incomingBuffer as Message[]; @@ -216,27 +216,27 @@ describe("MessageChannel", function () { channelB = new MessageChannel(channelId); }); - it("should mark all messages in causal history as acknowledged", () => { - messagesA.forEach((m) => { - channelA.sendMessage(utf8ToBytes(m), (message) => { + it("should mark all messages in causal history as acknowledged", async () => { + for (const m of messagesA) { + await channelA.sendMessage(utf8ToBytes(m), (message) => { channelB.receiveMessage(message); - return true; + return Promise.resolve(true); }); - }); + } let notInHistory: Message | null = null; - channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => { + await channelA.sendMessage(utf8ToBytes("not-in-history"), (message) => { notInHistory = message; - return true; + return Promise.resolve(true); }); expect((channelA as any).outgoingBuffer.length).to.equal( messagesA.length + 1 ); - channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + await channelB.sendMessage(utf8ToBytes(messagesB[0]), (message) => { channelA.receiveMessage(message); - return true; + return Promise.resolve(true); }); // Since messagesA are in causal history of channel B's message @@ -247,7 +247,7 @@ describe("MessageChannel", function () { expect(outgoingBuffer[0].messageId).to.equal(notInHistory!.messageId); }); - it("should track probabilistic acknowledgements of messages received in bloom filter", () => { + it("should track probabilistic acknowledgements of messages received in bloom filter", async () => { const acknowledgementCount = (channelA as any).acknowledgementCount; const causalHistorySize = (channelA as any).causalHistorySize; @@ -258,24 +258,24 @@ describe("MessageChannel", function () { ]; const messages = [...messagesA, ...messagesB.slice(0, -1)]; // Send messages to be received by channel B - messages.forEach((m) => { - channelA.sendMessage(utf8ToBytes(m), (message) => { + for (const m of messages) { + await channelA.sendMessage(utf8ToBytes(m), (message) => { channelB.receiveMessage(message); - return true; + return Promise.resolve(true); }); - }); + } // Send messages not received by channel B - unacknowledgedMessages.forEach((m) => { - channelA.sendMessage(utf8ToBytes(m), callback); - }); + for (const m of unacknowledgedMessages) { + await channelA.sendMessage(utf8ToBytes(m), callback); + } // Channel B sends a message to channel A - channelB.sendMessage( + await channelB.sendMessage( utf8ToBytes(messagesB[messagesB.length - 1]), (message) => { channelA.receiveMessage(message); - return true; + return Promise.resolve(true); } ); @@ -307,9 +307,9 @@ describe("MessageChannel", function () { // in the bloom filter as before, which should mark them as fully acknowledged in channel A for (let i = 1; i < acknowledgementCount; i++) { // Send messages until acknowledgement count is reached - channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => { + await channelB.sendMessage(utf8ToBytes(`x-${i}`), (message) => { channelA.receiveMessage(message); - return true; + return Promise.resolve(true); }); } @@ -337,15 +337,15 @@ describe("MessageChannel", function () { channelB = new MessageChannel(channelId); }); - it("should detect messages with missing dependencies", () => { + it("should detect messages with missing dependencies", async () => { const causalHistorySize = (channelA as any).causalHistorySize; - messagesA.forEach((m) => { - channelA.sendMessage(utf8ToBytes(m), callback); - }); + for (const m of messagesA) { + await channelA.sendMessage(utf8ToBytes(m), callback); + } - channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { channelB.receiveMessage(message); - return true; + return Promise.resolve(true); }); const incomingBuffer = (channelB as any).incomingBuffer as Message[]; @@ -361,19 +361,19 @@ describe("MessageChannel", function () { ); }); - it("should deliver messages after dependencies are met", () => { + it("should deliver messages after dependencies are met", async () => { const causalHistorySize = (channelA as any).causalHistorySize; const sentMessages = new Array(); - messagesA.forEach((m) => { - channelA.sendMessage(utf8ToBytes(m), (message) => { + for (const m of messagesA) { + await channelA.sendMessage(utf8ToBytes(m), (message) => { sentMessages.push(message); - return true; + return Promise.resolve(true); }); - }); + } - channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { channelB.receiveMessage(message); - return true; + return Promise.resolve(true); }); const missingMessages = channelB.sweepIncomingBuffer(); @@ -406,13 +406,13 @@ describe("MessageChannel", function () { 10 ); - messagesA.forEach((m) => { - channelA.sendMessage(utf8ToBytes(m), callback); - }); + for (const m of messagesA) { + await channelA.sendMessage(utf8ToBytes(m), callback); + } - channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { + await channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => { channelC.receiveMessage(message); - return true; + return Promise.resolve(true); }); const missingMessages = channelC.sweepIncomingBuffer(); @@ -434,15 +434,15 @@ describe("MessageChannel", function () { channelB = new MessageChannel(channelId); }); - it("should partition messages based on acknowledgement status", () => { + it("should partition messages based on acknowledgement status", async () => { const unacknowledgedMessages: Message[] = []; - messagesA.forEach((m) => { - channelA.sendMessage(utf8ToBytes(m), (message) => { + for (const m of messagesA) { + await channelA.sendMessage(utf8ToBytes(m), (message) => { unacknowledgedMessages.push(message); channelB.receiveMessage(message); - return true; + return Promise.resolve(true); }); - }); + } let { unacknowledged, possiblyAcknowledged } = channelA.sweepOutgoingBuffer(); @@ -451,15 +451,15 @@ describe("MessageChannel", function () { // Make sure messages sent by channel A are not in causal history const causalHistorySize = (channelA as any).causalHistorySize; - messagesB.slice(0, causalHistorySize).forEach((m) => { - channelB.sendMessage(utf8ToBytes(m), callback); - }); + for (const m of messagesB.slice(0, causalHistorySize)) { + await channelB.sendMessage(utf8ToBytes(m), callback); + } - channelB.sendMessage( + await channelB.sendMessage( utf8ToBytes(messagesB[causalHistorySize]), (message) => { channelA.receiveMessage(message); - return true; + return Promise.resolve(true); } ); @@ -471,4 +471,80 @@ describe("MessageChannel", function () { expect(possiblyAcknowledged.length).to.equal(messagesA.length); }); }); + + describe("Sync messages", () => { + beforeEach(() => { + channelA = new MessageChannel(channelId); + channelB = new MessageChannel(channelId); + }); + + it("should be sent with empty content", async () => { + await channelA.sendSyncMessage((message) => { + expect(message.content?.length).to.equal(0); + return Promise.resolve(true); + }); + }); + + it("should not be added to outgoing buffer, bloom filter, or local log", async () => { + await channelA.sendSyncMessage(); + + const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + expect(outgoingBuffer.length).to.equal(0); + + const bloomFilter = getBloomFilter(channelA); + expect( + bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array())) + ).to.equal(false); + + const localLog = (channelA as any).messageIdLog as { + timestamp: number; + messageId: string; + }[]; + expect(localLog.length).to.equal(0); + }); + + it("should be delivered but not added to local log or bloom filter", async () => { + const timestampBefore = (channelB as any).lamportTimestamp; + let expectedTimestamp: number | undefined; + await channelA.sendSyncMessage((message) => { + expectedTimestamp = message.lamportTimestamp; + channelB.receiveMessage(message); + return Promise.resolve(true); + }); + const timestampAfter = (channelB as any).lamportTimestamp; + expect(timestampAfter).to.equal(expectedTimestamp); + expect(timestampAfter).to.be.greaterThan(timestampBefore); + + const localLog = (channelB as any).messageIdLog as { + timestamp: number; + messageId: string; + }[]; + expect(localLog.length).to.equal(0); + + const bloomFilter = getBloomFilter(channelB); + expect( + bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array())) + ).to.equal(false); + }); + + it("should update ack status of messages in outgoing buffer", async () => { + for (const m of messagesA) { + await channelA.sendMessage(utf8ToBytes(m), (message) => { + channelB.receiveMessage(message); + return Promise.resolve(true); + }); + } + + await channelB.sendSyncMessage((message) => { + channelA.receiveMessage(message); + return Promise.resolve(true); + }); + + const causalHistorySize = (channelA as any).causalHistorySize; + const outgoingBuffer = (channelA as any).outgoingBuffer as Message[]; + expect(outgoingBuffer.length).to.equal( + messagesA.length - causalHistorySize + ); + }); + }); }); diff --git a/packages/sds/src/sds.ts b/packages/sds/src/sds.ts index 1e2f73662c..0582085cb5 100644 --- a/packages/sds/src/sds.ts +++ b/packages/sds/src/sds.ts @@ -65,10 +65,10 @@ export class MessageChannel { * @param payload - The payload to send. * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. */ - public sendMessage( + public async sendMessage( payload: Uint8Array, - callback?: (message: Message) => boolean - ): void { + callback?: (message: Message) => Promise + ): Promise { this.lamportTimestamp++; const messageId = MessageChannel.getMessageId(payload); @@ -87,7 +87,7 @@ export class MessageChannel { this.outgoingBuffer.push(message); if (callback) { - const success = callback(message); + const success = await callback(message); if (success) { this.filter.insert(messageId); this.messageIdLog.push({ timestamp: this.lamportTimestamp, messageId }); @@ -112,8 +112,10 @@ export class MessageChannel { public receiveMessage(message: Message): void { // review ack status this.reviewAckStatus(message); - // add to bloom filter - this.filter.insert(message.messageId); + // add to bloom filter (skip for messages with empty content) + if (message.content?.length && message.content.length > 0) { + this.filter.insert(message.messageId); + } // verify causal history const dependenciesMet = message.causalHistory.every((messageId) => this.messageIdLog.some( @@ -203,6 +205,40 @@ export class MessageChannel { ); } + /** + * Send a sync message to the SDS channel. + * + * Increments the lamport timestamp, constructs a `Message` object + * with an empty load. Skips outgoing buffer, filter, and local log. + * + * See https://rfc.vac.dev/vac/raw/sds/#send-sync-message + * + * @param callback - A callback function that returns a boolean indicating whether the message was sent successfully. + */ + public sendSyncMessage( + callback?: (message: Message) => Promise + ): Promise { + this.lamportTimestamp++; + + const emptyMessage = new Uint8Array(); + + const message: Message = { + messageId: MessageChannel.getMessageId(emptyMessage), + channelId: this.channelId, + lamportTimestamp: this.lamportTimestamp, + causalHistory: this.messageIdLog + .slice(-this.causalHistorySize) + .map(({ messageId }) => messageId), + bloomFilter: this.filter.toBytes(), + content: emptyMessage + }; + + if (callback) { + return callback(message); + } + return Promise.resolve(false); + } + // See https://rfc.vac.dev/vac/raw/sds/#deliver-message private deliverMessage(message: Message): void { const messageLamportTimestamp = message.lamportTimestamp ?? 0; @@ -210,6 +246,12 @@ export class MessageChannel { this.lamportTimestamp = messageLamportTimestamp; } + if (message.content?.length === 0) { + // Messages with empty content are sync messages. + // They are not added to the local log or bloom filter. + return; + } + // The participant MUST insert the message ID into its local log, // based on Lamport timestamp. // If one or more message IDs with the same Lamport timestamp already exists, @@ -227,6 +269,8 @@ export class MessageChannel { }); } + // For each received message (including sync messages), inspect the causal history and bloom filter + // to determine the acknowledgement status of messages in the outgoing buffer. // See https://rfc.vac.dev/vac/raw/sds/#review-ack-status private reviewAckStatus(receivedMessage: Message): void { // the participant MUST mark all messages in the received causal_history as acknowledged.