Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: cleanup readReply() logic #181

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
"@std/bytes": "jsr:@std/bytes@^0.220.1",
"@std/collections": "jsr:@std/collections@^0.220.1",
"@std/fmt": "jsr:@std/fmt@^0.220.1",
"@std/io": "jsr:@std/io@^0.220.1",
"jsr:@iuioiua/r2d2/": "./"
"@std/io": "jsr:@std/io@^0.220.1"
},
"tasks": {
"redis:start": "redis-server --save \"\" --appendonly no --daemonize yes",
Expand Down
242 changes: 75 additions & 167 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ export type Reply =

const encoder = new TextEncoder();
const decoder = new TextDecoder();
const CRLF = "\r\n";
const CRLF_RAW = encoder.encode(CRLF);

const CRLF_STRING = "\r\n";
const ARRAY_PREFIX_STRING = "*";
const BULK_STRING_PREFIX_STRING = "$";

const CRLF = encoder.encode(CRLF_STRING);
const ARRAY_PREFIX = ARRAY_PREFIX_STRING.charCodeAt(0);
const ATTRIBUTE_PREFIX = "|".charCodeAt(0);
const BIG_NUMBER_PREFIX = "(".charCodeAt(0);
Expand All @@ -66,16 +66,20 @@ const VERBATIM_STRING_PREFIX = "=".charCodeAt(0);
* @see {@link https://redis.io/docs/reference/protocol-spec/#send-commands-to-a-redis-server}
*/
function createRequest(command: Command): Uint8Array {
const lines = [encoder.encode(ARRAY_PREFIX_STRING + command.length + CRLF)];
const lines = [
encoder.encode(ARRAY_PREFIX_STRING + command.length + CRLF_STRING),
];
for (const arg of command) {
const bytes = arg instanceof Uint8Array
? arg
: encoder.encode(arg.toString());
lines.push(
encoder.encode(BULK_STRING_PREFIX_STRING + bytes.byteLength + CRLF),
encoder.encode(
BULK_STRING_PREFIX_STRING + bytes.byteLength + CRLF_STRING,
),
bytes,
CRLF,
);
lines.push(bytes);
lines.push(CRLF_RAW);
}
return concat(lines);
}
Expand All @@ -87,110 +91,12 @@ async function writeCommand(
await writeAll(writer, createRequest(command));
}

function removePrefix(line: Uint8Array): string {
return decoder.decode(line.slice(1));
}

function toObject(array: any[]): Record<string, any> {
return Object.fromEntries(chunk(array, 2));
}

async function readNReplies(
function readNReplies(
length: number,
iterator: AsyncIterableIterator<Uint8Array>,
raw = false,
): Promise<Reply[]> {
const replies: Reply[] = [];
for (let i = 0; i < length; i++) {
replies.push(await readReply(iterator, raw));
}
return replies;
}

async function readArray(
line: Uint8Array,
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<null | Reply[]> {
const length = readNumberOrDouble(line);
return length === -1 ? null : await readNReplies(length, iterator);
}

/**
* Read but don't return attribute data.
*
* @todo include attribute data somehow
*/
async function readAttribute(
line: Uint8Array,
iterator: AsyncIterableIterator<Uint8Array>,
raw = false,
): Promise<null | Reply> {
await readMap(line, iterator);
return await readReply(iterator, raw);
}

function readBigNumber(line: Uint8Array): bigint {
return BigInt(removePrefix(line));
}

async function readBlobError(
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<never> {
/** Skip to reading the next line, which is a string */
const { value } = await iterator.next();
return await Promise.reject(decoder.decode(value));
}

function readBoolean(line: Uint8Array): boolean {
return removePrefix(line) === "t";
}

async function readBulkOrVerbatimString(
line: Uint8Array,
iterator: AsyncIterableIterator<Uint8Array>,
raw = false,
): Promise<string | null> {
if (readNumberOrDouble(line) === -1) {
return null;
}
const { value } = await iterator.next();
return raw ? value : decoder.decode(value);
}

async function readError(line: Uint8Array): Promise<never> {
return await Promise.reject(removePrefix(line));
}

async function readMap(
line: Uint8Array,
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<Record<string, any>> {
const length = readNumberOrDouble(line) * 2;
const array = await readNReplies(length, iterator);
return toObject(array);
}

function readNumberOrDouble(line: Uint8Array): number {
const number = removePrefix(line);
switch (number) {
case "inf":
return Infinity;
case "-inf":
return -Infinity;
default:
return Number(number);
}
}

async function readSet(
line: Uint8Array,
iterator: AsyncIterableIterator<Uint8Array>,
): Promise<Set<Reply>> {
return new Set(await readArray(line, iterator));
}

function readSimpleString(line: Uint8Array): string {
return removePrefix(line);
return Array.fromAsync({ length }, () => readReply(iterator, raw));
}

/**
Expand All @@ -206,36 +112,66 @@ export async function readReply(
): Promise<Reply> {
const { value } = await iterator.next();
if (value.length === 0) {
return await Promise.reject(new TypeError("No reply received"));
return Promise.reject(new TypeError("No reply received"));
}
const line = decoder.decode(value.slice(1));
switch (value[0]) {
case ARRAY_PREFIX:
case PUSH_PREFIX:
return await readArray(value, iterator);
case ATTRIBUTE_PREFIX:
return await readAttribute(value, iterator);
case PUSH_PREFIX: {
const length = Number(line);
return length === -1 ? null : await readNReplies(length, iterator);
}
case ATTRIBUTE_PREFIX: {
/**
* Read but don't return attribute data.
*
* @todo include attribute data somehow
*/
const length = Number(line) * 2;
await readNReplies(length, iterator);
return readReply(iterator, raw);
}
case BIG_NUMBER_PREFIX:
return readBigNumber(value);
case BLOB_ERROR_PREFIX:
return readBlobError(iterator);
return BigInt(line);
case BLOB_ERROR_PREFIX: {
/** Skip to reading the next line, which is a string */
const { value } = await iterator.next();
return Promise.reject(decoder.decode(value));
}
case BOOLEAN_PREFIX:
return readBoolean(value);
return line === "t";
case BULK_STRING_PREFIX:
case VERBATIM_STRING_PREFIX:
return await readBulkOrVerbatimString(value, iterator, raw);
case VERBATIM_STRING_PREFIX: {
if (Number(line) === -1) {
return null;
}
const { value } = await iterator.next();
return raw ? value : decoder.decode(value);
}
case DOUBLE_PREFIX:
case INTEGER_PREFIX:
return readNumberOrDouble(value);
case INTEGER_PREFIX: {
switch (line) {
case "inf":
return Infinity;
case "-inf":
return -Infinity;
default:
return Number(line);
}
}
case ERROR_PREFIX:
return readError(value);
case MAP_PREFIX:
return await readMap(value, iterator);
return Promise.reject(line);
case MAP_PREFIX: {
const length = Number(line) * 2;
const array = await readNReplies(length, iterator);
return Object.fromEntries(chunk(array, 2));
}
case NULL_PREFIX:
return null;
case SET_PREFIX:
return await readSet(value, iterator);
return new Set(await readNReplies(Number(line), iterator, raw));
case SIMPLE_STRING_PREFIX:
return readSimpleString(value);
return line;
/** No prefix */
default:
return decoder.decode(value);
Expand All @@ -248,7 +184,7 @@ async function sendCommand(
raw = false,
): Promise<Reply> {
await writeCommand(redisConn, command);
return await readReply(readDelim(redisConn, CRLF_RAW), raw);
return readReply(readDelim(redisConn, CRLF), raw);
}

async function pipelineCommands(
Expand All @@ -257,53 +193,31 @@ async function pipelineCommands(
): Promise<Reply[]> {
const bytes = commands.map(createRequest);
await writeAll(redisConn, concat(bytes));
return readNReplies(commands.length, readDelim(redisConn, CRLF_RAW));
return readNReplies(commands.length, readDelim(redisConn, CRLF));
}

async function* readReplies(
redisConn: Deno.Conn,
raw = false,
): AsyncIterableIterator<Reply> {
const iterator = readDelim(redisConn, CRLF_RAW);
const iterator = readDelim(redisConn, CRLF);
while (true) {
yield await readReply(iterator, raw);
}
}

class AsyncQueue {
#queue: Promise<any> = Promise.resolve();

async enqueue<T>(task: () => Promise<T>): Promise<T> {
this.#queue = this.#queue.then(task);
return await this.#queue;
}
}

/**
* A Redis client that can be used to send commands to a Redis server.
*
* @example
* ```ts ignore
* import { RedisClient } from "jsr:@iuioiua/r2d2";
*
* const redisConn = await Deno.connect({ port: 6379 });
* const redisClient = new RedisClient(redisConn);
*
* // Returns "OK"
* await redisClient.sendCommand(["SET", "hello", "world"]);
*
* // Returns "world"
* await redisClient.sendCommand(["GET", "hello"]);
* ```
*/
export class RedisClient {
#conn: Deno.TcpConn | Deno.TlsConn;
#queue: AsyncQueue;
#queue: Promise<any> = Promise.resolve();

/** Constructs a new instance. */
constructor(conn: Deno.TcpConn | Deno.TlsConn) {
this.#conn = conn;
this.#queue = new AsyncQueue();
}

#enqueue<T>(task: () => Promise<T>): Promise<T> {
this.#queue = this.#queue.then(task);
return this.#queue;
}

/**
Expand All @@ -323,10 +237,8 @@ export class RedisClient {
* await redisClient.sendCommand(["GET", "hello"]);
* ```
*/
async sendCommand(command: Command, raw = false): Promise<Reply> {
return await this.#queue.enqueue(
async () => await sendCommand(this.#conn, command, raw),
);
sendCommand(command: Command, raw = false): Promise<Reply> {
return this.#enqueue(() => sendCommand(this.#conn, command, raw));
}

/**
Expand All @@ -342,10 +254,8 @@ export class RedisClient {
* await redisClient.writeCommand(["SHUTDOWN"]);
* ```
*/
async writeCommand(command: Command) {
await this.#queue.enqueue(
async () => await writeCommand(this.#conn, command),
);
writeCommand(command: Command): Promise<void> {
return this.#enqueue(() => writeCommand(this.#conn, command));
}

/**
Expand Down Expand Up @@ -389,9 +299,7 @@ export class RedisClient {
* ]);
* ```
*/
async pipelineCommands(commands: Command[]): Promise<Reply[]> {
return await this.#queue.enqueue(
async () => await pipelineCommands(this.#conn, commands),
);
pipelineCommands(commands: Command[]): Promise<Reply[]> {
return this.#enqueue(() => pipelineCommands(this.#conn, commands));
}
}
Loading