Skip to content

Commit 9c20b6e

Browse files
committed
feat(sds): adds logic to sweep incoming and outgoing buffers
1 parent 4cd1eea commit 9c20b6e

File tree

2 files changed

+223
-1
lines changed

2 files changed

+223
-1
lines changed

packages/sds/src/sds.spec.ts

+141
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,145 @@ describe("MessageChannel", function () {
330330
});
331331
});
332332
});
333+
334+
describe("Sweeping incoming buffer", () => {
335+
beforeEach(() => {
336+
channelA = new MessageChannel(channelId);
337+
channelB = new MessageChannel(channelId);
338+
});
339+
340+
it("should detect messages with missing dependencies", () => {
341+
const causalHistorySize = (channelA as any).causalHistorySize;
342+
messagesA.forEach((m) => {
343+
channelA.sendMessage(utf8ToBytes(m), callback);
344+
});
345+
346+
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
347+
channelB.receiveMessage(message);
348+
return true;
349+
});
350+
351+
const incomingBuffer = (channelB as any).incomingBuffer as Message[];
352+
expect(incomingBuffer.length).to.equal(1);
353+
expect(incomingBuffer[0].messageId).to.equal(
354+
MessageChannel.getMessageId(utf8ToBytes(messagesB[0]))
355+
);
356+
357+
const missingMessages = channelB.sweepIncomingBuffer();
358+
expect(missingMessages.length).to.equal(causalHistorySize);
359+
expect(missingMessages[0]).to.equal(
360+
MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
361+
);
362+
});
363+
364+
it("should deliver messages after dependencies are met", () => {
365+
const causalHistorySize = (channelA as any).causalHistorySize;
366+
const sentMessages = new Array<Message>();
367+
messagesA.forEach((m) => {
368+
channelA.sendMessage(utf8ToBytes(m), (message) => {
369+
sentMessages.push(message);
370+
return true;
371+
});
372+
});
373+
374+
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
375+
channelB.receiveMessage(message);
376+
return true;
377+
});
378+
379+
const missingMessages = channelB.sweepIncomingBuffer();
380+
expect(missingMessages.length).to.equal(causalHistorySize);
381+
expect(missingMessages[0]).to.equal(
382+
MessageChannel.getMessageId(utf8ToBytes(messagesA[0]))
383+
);
384+
385+
let incomingBuffer = (channelB as any).incomingBuffer as Message[];
386+
expect(incomingBuffer.length).to.equal(1);
387+
388+
sentMessages.forEach((m) => {
389+
channelB.receiveMessage(m);
390+
});
391+
392+
const missingMessages2 = channelB.sweepIncomingBuffer();
393+
expect(missingMessages2.length).to.equal(0);
394+
395+
incomingBuffer = (channelB as any).incomingBuffer as Message[];
396+
expect(incomingBuffer.length).to.equal(0);
397+
});
398+
399+
it("should remove messages without delivering if timeout is exceeded", async () => {
400+
const causalHistorySize = (channelA as any).causalHistorySize;
401+
// Create a channel with very very short timeout
402+
const channelC: MessageChannel = new MessageChannel(
403+
channelId,
404+
causalHistorySize,
405+
true,
406+
10
407+
);
408+
409+
messagesA.forEach((m) => {
410+
channelA.sendMessage(utf8ToBytes(m), callback);
411+
});
412+
413+
channelA.sendMessage(utf8ToBytes(messagesB[0]), (message) => {
414+
channelC.receiveMessage(message);
415+
return true;
416+
});
417+
418+
const missingMessages = channelC.sweepIncomingBuffer();
419+
expect(missingMessages.length).to.equal(causalHistorySize);
420+
let incomingBuffer = (channelC as any).incomingBuffer as Message[];
421+
expect(incomingBuffer.length).to.equal(1);
422+
423+
await new Promise((resolve) => setTimeout(resolve, 20));
424+
425+
channelC.sweepIncomingBuffer();
426+
incomingBuffer = (channelC as any).incomingBuffer as Message[];
427+
expect(incomingBuffer.length).to.equal(0);
428+
});
429+
});
430+
431+
describe("Sweeping outgoing buffer", () => {
432+
beforeEach(() => {
433+
channelA = new MessageChannel(channelId);
434+
channelB = new MessageChannel(channelId);
435+
});
436+
437+
it("should partition messages based on acknowledgement status", () => {
438+
const unacknowledgedMessages: Message[] = [];
439+
messagesA.forEach((m) => {
440+
channelA.sendMessage(utf8ToBytes(m), (message) => {
441+
unacknowledgedMessages.push(message);
442+
channelB.receiveMessage(message);
443+
return true;
444+
});
445+
});
446+
447+
let { unacknowledged, possiblyAcknowledged } =
448+
channelA.sweepOutgoingBuffer();
449+
expect(unacknowledged.length).to.equal(messagesA.length);
450+
expect(possiblyAcknowledged.length).to.equal(0);
451+
452+
// Make sure messages sent by channel A are not in causal history
453+
const causalHistorySize = (channelA as any).causalHistorySize;
454+
messagesB.slice(0, causalHistorySize).forEach((m) => {
455+
channelB.sendMessage(utf8ToBytes(m), callback);
456+
});
457+
458+
channelB.sendMessage(
459+
utf8ToBytes(messagesB[causalHistorySize]),
460+
(message) => {
461+
channelA.receiveMessage(message);
462+
return true;
463+
}
464+
);
465+
466+
// All messages that were previously unacknowledged should now be possibly acknowledged
467+
// since they were included in one of the bloom filters sent from channel B
468+
({ unacknowledged, possiblyAcknowledged } =
469+
channelA.sweepOutgoingBuffer());
470+
expect(unacknowledged.length).to.equal(0);
471+
expect(possiblyAcknowledged.length).to.equal(messagesA.length);
472+
});
473+
});
333474
});

packages/sds/src/sds.ts

+82-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export const DEFAULT_BLOOM_FILTER_OPTIONS = {
1313
};
1414

1515
const DEFAULT_CAUSAL_HISTORY_SIZE = 2;
16+
const DEFAULT_RECEIVED_MESSAGE_TIMEOUT = 1000 * 60 * 5; // 5 minutes
1617

1718
export class MessageChannel {
1819
private lamportTimestamp: number;
@@ -24,10 +25,13 @@ export class MessageChannel {
2425
private channelId: ChannelId;
2526
private causalHistorySize: number;
2627
private acknowledgementCount: number;
28+
private timeReceived: Map<string, number>;
2729

2830
public constructor(
2931
channelId: ChannelId,
30-
causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE
32+
causalHistorySize: number = DEFAULT_CAUSAL_HISTORY_SIZE,
33+
private receivedMessageTimeoutEnabled: boolean = false,
34+
private receivedMessageTimeout: number = DEFAULT_RECEIVED_MESSAGE_TIMEOUT
3135
) {
3236
this.channelId = channelId;
3337
this.lamportTimestamp = 0;
@@ -38,6 +42,7 @@ export class MessageChannel {
3842
this.messageIdLog = [];
3943
this.causalHistorySize = causalHistorySize;
4044
this.acknowledgementCount = this.getAcknowledgementCount();
45+
this.timeReceived = new Map();
4146
}
4247

4348
public static getMessageId(payload: Uint8Array): string {
@@ -117,11 +122,87 @@ export class MessageChannel {
117122
);
118123
if (!dependenciesMet) {
119124
this.incomingBuffer.push(message);
125+
this.timeReceived.set(message.messageId, Date.now());
120126
} else {
121127
this.deliverMessage(message);
122128
}
123129
}
124130

131+
// https://rfc.vac.dev/vac/raw/sds/#periodic-incoming-buffer-sweep
132+
public sweepIncomingBuffer(): string[] {
133+
const { buffer, missing } = this.incomingBuffer.reduce<{
134+
buffer: Message[];
135+
missing: string[];
136+
}>(
137+
({ buffer, missing }, message) => {
138+
// Check each message for missing dependencies
139+
const missingDependencies = message.causalHistory.filter(
140+
(messageId) =>
141+
!this.messageIdLog.some(
142+
({ messageId: logMessageId }) => logMessageId === messageId
143+
)
144+
);
145+
if (missingDependencies.length === 0) {
146+
// Any message with no missing dependencies is delivered
147+
// and removed from the buffer (implicitly by not adding it to the new incoming buffer)
148+
this.deliverMessage(message);
149+
return { buffer, missing };
150+
}
151+
152+
// Optionally, if a message has not been received after a predetermined amount of time,
153+
// it is marked as irretrievably lost (implicitly by removing it from the buffer without delivery)
154+
if (this.receivedMessageTimeoutEnabled) {
155+
const timeReceived = this.timeReceived.get(message.messageId);
156+
if (
157+
timeReceived &&
158+
Date.now() - timeReceived > this.receivedMessageTimeout
159+
) {
160+
return { buffer, missing };
161+
}
162+
}
163+
// Any message with missing dependencies stays in the buffer
164+
// and the missing message IDs are returned for processing.
165+
return {
166+
buffer: buffer.concat(message),
167+
missing: missing.concat(missingDependencies)
168+
};
169+
},
170+
{ buffer: new Array<Message>(), missing: new Array<string>() }
171+
);
172+
// Update the incoming buffer to only include messages with no missing dependencies
173+
this.incomingBuffer = buffer;
174+
return missing;
175+
}
176+
177+
// https://rfc.vac.dev/vac/raw/sds/#periodic-outgoing-buffer-sweep
178+
public sweepOutgoingBuffer(): {
179+
unacknowledged: Message[];
180+
possiblyAcknowledged: Message[];
181+
} {
182+
// Partition all messages in the outgoing buffer into unacknowledged and possibly acknowledged messages
183+
return this.outgoingBuffer.reduce<{
184+
unacknowledged: Message[];
185+
possiblyAcknowledged: Message[];
186+
}>(
187+
({ unacknowledged, possiblyAcknowledged }, message) => {
188+
if (this.acknowledgements.has(message.messageId)) {
189+
return {
190+
unacknowledged,
191+
possiblyAcknowledged: possiblyAcknowledged.concat(message)
192+
};
193+
}
194+
return {
195+
unacknowledged: unacknowledged.concat(message),
196+
possiblyAcknowledged
197+
};
198+
},
199+
{
200+
unacknowledged: new Array<Message>(),
201+
possiblyAcknowledged: new Array<Message>()
202+
}
203+
);
204+
}
205+
125206
// See https://rfc.vac.dev/vac/raw/sds/#deliver-message
126207
private deliverMessage(message: Message): void {
127208
const messageLamportTimestamp = message.lamportTimestamp ?? 0;

0 commit comments

Comments
 (0)