From 5c32e12f5efcc70a261a01a58e05e55f9dafdf12 Mon Sep 17 00:00:00 2001 From: Asher Gomez Date: Sat, 27 Apr 2024 15:33:53 +1000 Subject: [PATCH] work --- deno.json | 2 +- mod.ts | 24 ++++++++++------------ x.ts | 59 ++++++++++++++++++++++++++++++++++++++++++++----------- x_test.ts | 35 +++++++++++++++++++++++++++++---- 4 files changed, 90 insertions(+), 30 deletions(-) diff --git a/deno.json b/deno.json index 992ce7e..cc12bd5 100644 --- a/deno.json +++ b/deno.json @@ -12,7 +12,7 @@ }, "tasks": { "redis:start": "redis-server --save \"\" --appendonly no --daemonize yes", - "test": "deno test --allow-net --trace-leaks --coverage --doc --parallel", + "test": "deno test --allow-net --trace-leaks --coverage --doc --parallel x_test.ts", "test:dev": "deno task redis:start && deno task test || redis-cli SHUTDOWN", "bench": "deno bench --allow-net --allow-env", "bench:dev": "deno task redis:start && deno task bench", diff --git a/mod.ts b/mod.ts index 1e35d95..400ea60 100644 --- a/mod.ts +++ b/mod.ts @@ -269,15 +269,6 @@ async function* readReplies( } } -class AsyncQueue { - #queue: Promise = Promise.resolve(); - - async enqueue(task: () => Promise): Promise { - this.#queue = this.#queue.then(task); - return await this.#queue; - } -} - /** * A Redis client that can be used to send commands to a Redis server. * @@ -297,12 +288,17 @@ class AsyncQueue { */ export class RedisClient { #conn: Deno.TcpConn | Deno.TlsConn; - #queue: AsyncQueue; + #queue: Promise; /** Constructs a new instance. */ constructor(conn: Deno.TcpConn | Deno.TlsConn) { this.#conn = conn; - this.#queue = new AsyncQueue(); + this.#queue = Promise.resolve(); + } + + async #enqueue(task: () => Promise): Promise { + this.#queue = this.#queue.then(task); + return await this.#queue; } /** @@ -323,7 +319,7 @@ export class RedisClient { * ``` */ async sendCommand(command: Command, raw = false): Promise { - return await this.#queue.enqueue( + return await this.#enqueue( async () => await sendCommand(this.#conn, command, raw), ); } @@ -342,7 +338,7 @@ export class RedisClient { * ``` */ async writeCommand(command: Command) { - await this.#queue.enqueue( + await this.#enqueue( async () => await writeCommand(this.#conn, command), ); } @@ -389,7 +385,7 @@ export class RedisClient { * ``` */ async pipelineCommands(commands: Command[]): Promise { - return await this.#queue.enqueue( + return await this.#enqueue( async () => await pipelineCommands(this.#conn, commands), ); } diff --git a/x.ts b/x.ts index be8c679..2cbd4ac 100644 --- a/x.ts +++ b/x.ts @@ -144,6 +144,26 @@ class RedisEncoderStream extends TransformStream { } } +async function sendCommand( + writable: WritableStream, + reader: ReadableStreamDefaultReader, + command: RedisCommand, +) { + await writeCommand(writable, command); + return await readReply(reader); +} + +async function pipeline( + writable: WritableStream, + reader: ReadableStreamDefaultReader, + commands: RedisCommand[], +): Promise { + for (const command of commands) { + await writeCommand(writable, command); + } + return await readReplies(reader, commands.length); +} + interface Conn { readonly readable: ReadableStream; readonly writable: WritableStream; @@ -164,9 +184,21 @@ class TextDecoderStream extends TransformStream { } } +async function writeCommand( + writable: WritableStream, + command: RedisCommand, +) { + await ReadableStream.from([command]) + .pipeThrough(new RedisEncoderStream()) + .pipeThrough(new TextEncoderStream()) + .pipeTo(writable, { preventClose: true }); +} + export class RedisClient { #reader: ReadableStreamDefaultReader; #writable: WritableStream; + // deno-lint-ignore no-explicit-any + #queue: Promise; constructor(conn: Conn) { this.#reader = conn.readable @@ -174,28 +206,33 @@ export class RedisClient { .pipeThrough(new RedisLineStream()) .getReader(); this.#writable = conn.writable; + this.#queue = Promise.resolve(); + } + + async #enqueue(task: () => Promise): Promise { + this.#queue = this.#queue.then(task); + return await this.#queue; } async readReply(): Promise { - return await readReply(this.#reader); + return await this.#enqueue(async () => await readReply(this.#reader)); } async writeCommand(command: RedisCommand) { - await ReadableStream.from([command]) - .pipeThrough(new RedisEncoderStream()) - .pipeThrough(new TextEncoderStream()) - .pipeTo(this.#writable, { preventClose: true }); + await this.#enqueue(async () => + await writeCommand(this.#writable, command) + ); } async sendCommand(command: RedisCommand): Promise { - await this.writeCommand(command); - return await this.readReply(); + return await this.#enqueue(async () => + await sendCommand(this.#writable, this.#reader, command) + ); } async pipeline(commands: RedisCommand[]): Promise { - for (const command of commands) { - await this.writeCommand(command); - } - return await readReplies(this.#reader, commands.length); + return await this.#enqueue(async () => + await pipeline(this.#writable, this.#reader, commands) + ); } } diff --git a/x_test.ts b/x_test.ts index e62392e..01eb4be 100644 --- a/x_test.ts +++ b/x_test.ts @@ -138,6 +138,8 @@ const PORT = 6379; const redisConn = await Deno.connect({ hostname: HOSTNAME, port: PORT }); const redisClient = new RedisClient(redisConn); +await redisClient.sendCommand(["FLUSHALL"]); + async function assertSendCommandEquals( command: RedisCommand, expected: RedisReply, @@ -152,9 +154,12 @@ Deno.test("RedisClient.sendCommand() transactions", async () => { await assertSendCommandEquals(["EXEC"], [1, 1]); }); -/* Deno.test("RedisClient.sendCommand() eval script", async () => { - await assertSendCommandEquals("EVAL return ARGV[1] 0 hello", "hello"); -}); */ +Deno.test("RedisClient.sendCommand() eval script", async () => { + await assertSendCommandEquals( + ["EVAL", "return ARGV[1]", 0, "hello"], + "hello", + ); +}); Deno.test("redisClient.sendCommand() Lua script", async () => { await assertSendCommandEquals([ @@ -174,7 +179,7 @@ Deno.test("redisClient.sendCommand() RESP3", async () => { }); }); -/* Deno.test("redisClient.sendCommand() race condition", async () => { +Deno.test("redisClient.sendCommand() race condition", async () => { async function fn() { const key = crypto.randomUUID(); const value = crypto.randomUUID(); @@ -204,4 +209,26 @@ Deno.test("redisClient.sendCommand() RESP3", async () => { fn(), fn(), ]); +}); + +Deno.test("redisClient.pipelineCommands()", async () => { + assertEquals( + await redisClient.pipeline([ + ["INCR", "X"], + ["INCR", "X"], + ["INCR", "X"], + ["INCR", "X"], + ]), + [1, 2, 3, 4], + ); +}); + +/* Deno.test("redisClient.sendCommand() - no reply", async () => { + await assertRejects( + async () => await redisClient.sendCommand(["SHUTDOWN"]), + RedisError, + "No reply received", + ); }); */ + +// addEventListener("unload", () => redisConn.close());