Skip to content

Commit cc4bcaf

Browse files
committed
feat(sds): send and receive sync messages
1 parent 5b3a256 commit cc4bcaf

File tree

2 files changed

+120
-2
lines changed

2 files changed

+120
-2
lines changed

packages/sds/src/sds.spec.ts

+76
Original file line numberDiff line numberDiff line change
@@ -471,4 +471,80 @@ describe("MessageChannel", function () {
471471
expect(possiblyAcknowledged.length).to.equal(messagesA.length);
472472
});
473473
});
474+
475+
describe("Sync messages", () => {
476+
beforeEach(() => {
477+
channelA = new MessageChannel(channelId);
478+
channelB = new MessageChannel(channelId);
479+
});
480+
481+
it("should be sent with empty content", () => {
482+
channelA.sendSyncMessage((message) => {
483+
expect(message.content?.length).to.equal(0);
484+
return true;
485+
});
486+
});
487+
488+
it("should not be added to outgoing buffer, bloom filter, or local log", () => {
489+
channelA.sendSyncMessage();
490+
491+
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
492+
expect(outgoingBuffer.length).to.equal(0);
493+
494+
const bloomFilter = getBloomFilter(channelA);
495+
expect(
496+
bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array()))
497+
).to.equal(false);
498+
499+
const localLog = (channelA as any).messageIdLog as {
500+
timestamp: number;
501+
messageId: string;
502+
}[];
503+
expect(localLog.length).to.equal(0);
504+
});
505+
506+
it("should be delivered but not added to local log or bloom filter", () => {
507+
const timestampBefore = (channelB as any).lamportTimestamp;
508+
let expectedTimestamp: number | undefined;
509+
channelA.sendSyncMessage((message) => {
510+
expectedTimestamp = message.lamportTimestamp;
511+
channelB.receiveMessage(message);
512+
return true;
513+
});
514+
const timestampAfter = (channelB as any).lamportTimestamp;
515+
expect(timestampAfter).to.equal(expectedTimestamp);
516+
expect(timestampAfter).to.be.greaterThan(timestampBefore);
517+
518+
const localLog = (channelB as any).messageIdLog as {
519+
timestamp: number;
520+
messageId: string;
521+
}[];
522+
expect(localLog.length).to.equal(0);
523+
524+
const bloomFilter = getBloomFilter(channelB);
525+
expect(
526+
bloomFilter.lookup(MessageChannel.getMessageId(new Uint8Array()))
527+
).to.equal(false);
528+
});
529+
530+
it("should update ack status of messages in outgoing buffer", () => {
531+
messagesA.forEach((m) => {
532+
channelA.sendMessage(utf8ToBytes(m), (message) => {
533+
channelB.receiveMessage(message);
534+
return true;
535+
});
536+
});
537+
538+
channelB.sendSyncMessage((message) => {
539+
channelA.receiveMessage(message);
540+
return true;
541+
});
542+
543+
const causalHistorySize = (channelA as any).causalHistorySize;
544+
const outgoingBuffer = (channelA as any).outgoingBuffer as Message[];
545+
expect(outgoingBuffer.length).to.equal(
546+
messagesA.length - causalHistorySize
547+
);
548+
});
549+
});
474550
});

packages/sds/src/sds.ts

+44-2
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,10 @@ export class MessageChannel {
112112
public receiveMessage(message: Message): void {
113113
// review ack status
114114
this.reviewAckStatus(message);
115-
// add to bloom filter
116-
this.filter.insert(message.messageId);
115+
// add to bloom filter (skip for messages with empty content)
116+
if (message.content?.length && message.content.length > 0) {
117+
this.filter.insert(message.messageId);
118+
}
117119
// verify causal history
118120
const dependenciesMet = message.causalHistory.every((messageId) =>
119121
this.messageIdLog.some(
@@ -203,13 +205,51 @@ export class MessageChannel {
203205
);
204206
}
205207

208+
/**
209+
* Send a sync message to the SDS channel.
210+
*
211+
* Increments the lamport timestamp, constructs a `Message` object
212+
* with an empty load. Skips outgoing buffer, filter, and local log.
213+
*
214+
* See https://rfc.vac.dev/vac/raw/sds/#send-sync-message
215+
*
216+
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
217+
*/
218+
public sendSyncMessage(callback?: (message: Message) => boolean): boolean {
219+
this.lamportTimestamp++;
220+
221+
const emptyMessage = new Uint8Array();
222+
223+
const message: Message = {
224+
messageId: MessageChannel.getMessageId(emptyMessage),
225+
channelId: this.channelId,
226+
lamportTimestamp: this.lamportTimestamp,
227+
causalHistory: this.messageIdLog
228+
.slice(-this.causalHistorySize)
229+
.map(({ messageId }) => messageId),
230+
bloomFilter: this.filter.toBytes(),
231+
content: emptyMessage
232+
};
233+
234+
if (callback) {
235+
return callback(message);
236+
}
237+
return false;
238+
}
239+
206240
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
207241
private deliverMessage(message: Message): void {
208242
const messageLamportTimestamp = message.lamportTimestamp ?? 0;
209243
if (messageLamportTimestamp > this.lamportTimestamp) {
210244
this.lamportTimestamp = messageLamportTimestamp;
211245
}
212246

247+
if (message.content?.length === 0) {
248+
// Messages with empty content are sync messages.
249+
// They are not added to the local log or bloom filter.
250+
return;
251+
}
252+
213253
// The participant MUST insert the message ID into its local log,
214254
// based on Lamport timestamp.
215255
// If one or more message IDs with the same Lamport timestamp already exists,
@@ -227,6 +267,8 @@ export class MessageChannel {
227267
});
228268
}
229269

270+
// For each received message (including sync messages), inspect the causal history and bloom filter
271+
// to determine the acknowledgement status of messages in the outgoing buffer.
230272
// See https://rfc.vac.dev/vac/raw/sds/#review-ack-status
231273
private reviewAckStatus(receivedMessage: Message): void {
232274
// the participant MUST mark all messages in the received causal_history as acknowledged.

0 commit comments

Comments
 (0)