Skip to content

Commit efc7597

Browse files
committed
feat(sds): send and receive sync messages
1 parent 9c20b6e commit efc7597

File tree

1 file changed

+44
-2
lines changed

1 file changed

+44
-2
lines changed

packages/sds/src/sds.ts

+44-2
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,10 @@ export class MessageChannel {
112112
public receiveMessage(message: Message): void {
113113
// review ack status
114114
this.reviewAckStatus(message);
115-
// add to bloom filter
116-
this.filter.insert(message.messageId);
115+
// add to bloom filter (skip for messages with empty content)
116+
if (message.content?.length && message.content.length > 0) {
117+
this.filter.insert(message.messageId);
118+
}
117119
// verify causal history
118120
const dependenciesMet = message.causalHistory.every((messageId) =>
119121
this.messageIdLog.some(
@@ -203,13 +205,51 @@ export class MessageChannel {
203205
);
204206
}
205207

208+
/**
209+
* Send a sync message to the SDS channel.
210+
*
211+
* Increments the lamport timestamp, constructs a `Message` object
212+
* with an empty load. Skips outgoing buffer, filter, and local log.
213+
*
214+
* See https://rfc.vac.dev/vac/raw/sds/#send-sync-message
215+
*
216+
* @param callback - A callback function that returns a boolean indicating whether the message was sent successfully.
217+
*/
218+
public sendSyncMessage(callback?: (message: Message) => boolean): boolean {
219+
this.lamportTimestamp++;
220+
221+
const emptyMessage = new Uint8Array();
222+
223+
const message: Message = {
224+
messageId: MessageChannel.getMessageId(emptyMessage),
225+
channelId: this.channelId,
226+
lamportTimestamp: this.lamportTimestamp,
227+
causalHistory: this.messageIdLog
228+
.slice(-this.causalHistorySize)
229+
.map(({ messageId }) => messageId),
230+
bloomFilter: this.filter.toBytes(),
231+
content: emptyMessage
232+
};
233+
234+
if (callback) {
235+
return callback(message);
236+
}
237+
return false;
238+
}
239+
206240
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
207241
private deliverMessage(message: Message): void {
208242
const messageLamportTimestamp = message.lamportTimestamp ?? 0;
209243
if (messageLamportTimestamp > this.lamportTimestamp) {
210244
this.lamportTimestamp = messageLamportTimestamp;
211245
}
212246

247+
if (message.content?.length === 0) {
248+
// Messages with empty content are sync messages.
249+
// They are not added to the local log or bloom filter.
250+
return;
251+
}
252+
213253
// The participant MUST insert the message ID into its local log,
214254
// based on Lamport timestamp.
215255
// If one or more message IDs with the same Lamport timestamp already exists,
@@ -227,6 +267,8 @@ export class MessageChannel {
227267
});
228268
}
229269

270+
// For each received message (including sync messages), inspect the causal history and bloom filter
271+
// to determine the acknowledgement status of messages in the outgoing buffer.
230272
// See https://rfc.vac.dev/vac/raw/sds/#review-ack-status
231273
private reviewAckStatus(receivedMessage: Message): void {
232274
// the participant MUST mark all messages in the received causal_history as acknowledged.

0 commit comments

Comments
 (0)