Skip to content

Commit 9b1ac4c

Browse files
committed
add push message handler registration and make all pubsub use it.
1 parent f925235 commit 9b1ac4c

File tree

2 files changed

+73
-43
lines changed

2 files changed

+73
-43
lines changed

packages/client/lib/client/commands-queue.ts

+50-17
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
22
import encodeCommand from '../RESP/encoder';
33
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
4-
import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types';
5-
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
4+
import { TypeMapping, ReplyUnion, RespVersions, CommandArguments } from '../RESP/types';
5+
import { COMMANDS, ChannelListeners, PUBSUB_TYPE, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
66
import { AbortError, ErrorReply } from '../errors';
77
import { MonitorCallback } from '.';
88

@@ -51,6 +51,8 @@ export default class RedisCommandsQueue {
5151
#chainInExecution: symbol | undefined;
5252
readonly decoder;
5353
readonly #pubSub = new PubSub();
54+
readonly #pushHandlers: Map<string, (pushMsg: Array<any>) => unknown> = new Map();
55+
readonly #builtInSet = new Set<string>;
5456

5557
get isPubSubActive() {
5658
return this.#pubSub.isActive;
@@ -64,6 +66,21 @@ export default class RedisCommandsQueue {
6466
this.#respVersion = respVersion;
6567
this.#maxLength = maxLength;
6668
this.#onShardedChannelMoved = onShardedChannelMoved;
69+
70+
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString(), this.#pubSub.handleMessageReplyChannel.bind(this.#pubSub));
71+
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.toString(), this.#handleStatusReply.bind(this));
72+
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.toString(), this.#handleStatusReply.bind(this));
73+
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].message.toString(), this.#pubSub.handleMessageReplyPattern.bind(this.#pubSub));
74+
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.toString(), this.#handleStatusReply.bind(this));
75+
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.toString(), this.#handleStatusReply.bind(this));
76+
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub));
77+
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this));
78+
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this));
79+
80+
for (const str in this.#pushHandlers.keys) {
81+
this.#builtInSet.add(str);
82+
}
83+
6784
this.decoder = this.#initiateDecoder();
6885
}
6986

@@ -75,28 +92,44 @@ export default class RedisCommandsQueue {
7592
this.#waitingForReply.shift()!.reject(err);
7693
}
7794

78-
#onPush(push: Array<any>) {
79-
// TODO: type
80-
if (this.#pubSub.handleMessageReply(push)) return true;
81-
82-
const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push);
83-
if (isShardedUnsubscribe && !this.#waitingForReply.length) {
95+
#handleStatusReply(push: Array<any>) {
96+
const head = this.#waitingForReply.head!.value;
97+
if (
98+
(Number.isNaN(head.channelsCounter!) && push[2] === 0) ||
99+
--head.channelsCounter! === 0
100+
) {
101+
this.#waitingForReply.shift()!.resolve();
102+
}
103+
}
104+
105+
#handleShardedUnsubscribe(push: Array<any>) {
106+
if (!this.#waitingForReply.length) {
84107
const channel = push[1].toString();
85108
this.#onShardedChannelMoved(
86109
channel,
87110
this.#pubSub.removeShardedListeners(channel)
88111
);
89-
return true;
90-
} else if (isShardedUnsubscribe || PubSub.isStatusReply(push)) {
91-
const head = this.#waitingForReply.head!.value;
92-
if (
93-
(Number.isNaN(head.channelsCounter!) && push[2] === 0) ||
94-
--head.channelsCounter! === 0
95-
) {
96-
this.#waitingForReply.shift()!.resolve();
97-
}
112+
} else {
113+
this.#handleStatusReply(push);
114+
}
115+
}
116+
117+
addPushHandler(messageType: string, handler: (pushMsg: Array<any>) => unknown) {
118+
if (this.#builtInSet.has(messageType)) {
119+
throw new Error("Cannot override built in push message handler");
120+
}
121+
122+
this.#pushHandlers.set(messageType, handler);
123+
}
124+
125+
#onPush(push: Array<any>) {
126+
const handler = this.#pushHandlers.get(push[0].toString());
127+
if (handler) {
128+
handler(push);
98129
return true;
99130
}
131+
132+
return false;
100133
}
101134

102135
#getTypeMapping() {

packages/client/lib/client/pub-sub.ts

+23-26
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export type PUBSUB_TYPE = typeof PUBSUB_TYPE;
1111

1212
export type PubSubType = PUBSUB_TYPE[keyof PUBSUB_TYPE];
1313

14-
const COMMANDS = {
14+
export const COMMANDS = {
1515
[PUBSUB_TYPE.CHANNELS]: {
1616
subscribe: Buffer.from('subscribe'),
1717
unsubscribe: Buffer.from('unsubscribe'),
@@ -344,32 +344,29 @@ export class PubSub {
344344
return commands;
345345
}
346346

347-
handleMessageReply(reply: Array<Buffer>): boolean {
348-
if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(reply[0])) {
349-
this.#emitPubSubMessage(
350-
PUBSUB_TYPE.CHANNELS,
351-
reply[2],
352-
reply[1]
353-
);
354-
return true;
355-
} else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(reply[0])) {
356-
this.#emitPubSubMessage(
357-
PUBSUB_TYPE.PATTERNS,
358-
reply[3],
359-
reply[2],
360-
reply[1]
361-
);
362-
return true;
363-
} else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(reply[0])) {
364-
this.#emitPubSubMessage(
365-
PUBSUB_TYPE.SHARDED,
366-
reply[2],
367-
reply[1]
368-
);
369-
return true;
370-
}
347+
handleMessageReplyChannel(push: Array<Buffer>) {
348+
this.#emitPubSubMessage(
349+
PUBSUB_TYPE.CHANNELS,
350+
push[2],
351+
push[1]
352+
);
353+
}
371354

372-
return false;
355+
handleMessageReplyPattern(push: Array<Buffer>) {
356+
this.#emitPubSubMessage(
357+
PUBSUB_TYPE.PATTERNS,
358+
push[3],
359+
push[2],
360+
push[1]
361+
);
362+
}
363+
364+
handleMessageReplySharded(push: Array<Buffer>) {
365+
this.#emitPubSubMessage(
366+
PUBSUB_TYPE.SHARDED,
367+
push[2],
368+
push[1]
369+
);
373370
}
374371

375372
removeShardedListeners(channel: string): ChannelListeners {

0 commit comments

Comments
 (0)