Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
iuioiua committed Apr 27, 2024
1 parent 2327ffb commit 5c32e12
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 30 deletions.
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
},
"tasks": {
"redis:start": "redis-server --save \"\" --appendonly no --daemonize yes",
"test": "deno test --allow-net --trace-leaks --coverage --doc --parallel",
"test": "deno test --allow-net --trace-leaks --coverage --doc --parallel x_test.ts",
"test:dev": "deno task redis:start && deno task test || redis-cli SHUTDOWN",
"bench": "deno bench --allow-net --allow-env",
"bench:dev": "deno task redis:start && deno task bench",
Expand Down
24 changes: 10 additions & 14 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,15 +269,6 @@ async function* readReplies(
}
}

class AsyncQueue {
#queue: Promise<any> = Promise.resolve();

async enqueue<T>(task: () => Promise<T>): Promise<T> {
this.#queue = this.#queue.then(task);
return await this.#queue;
}
}

/**
* A Redis client that can be used to send commands to a Redis server.
*
Expand All @@ -297,12 +288,17 @@ class AsyncQueue {
*/
export class RedisClient {
#conn: Deno.TcpConn | Deno.TlsConn;
#queue: AsyncQueue;
#queue: Promise<any>;

/** Constructs a new instance. */
constructor(conn: Deno.TcpConn | Deno.TlsConn) {
this.#conn = conn;
this.#queue = new AsyncQueue();
this.#queue = Promise.resolve();
}

async #enqueue<T>(task: () => Promise<T>): Promise<T> {
this.#queue = this.#queue.then(task);
return await this.#queue;
}

/**
Expand All @@ -323,7 +319,7 @@ export class RedisClient {
* ```
*/
async sendCommand(command: Command, raw = false): Promise<Reply> {
return await this.#queue.enqueue(
return await this.#enqueue(
async () => await sendCommand(this.#conn, command, raw),
);
}
Expand All @@ -342,7 +338,7 @@ export class RedisClient {
* ```
*/
async writeCommand(command: Command) {
await this.#queue.enqueue(
await this.#enqueue(
async () => await writeCommand(this.#conn, command),
);
}
Expand Down Expand Up @@ -389,7 +385,7 @@ export class RedisClient {
* ```
*/
async pipelineCommands(commands: Command[]): Promise<Reply[]> {
return await this.#queue.enqueue(
return await this.#enqueue(
async () => await pipelineCommands(this.#conn, commands),
);
}
Expand Down
59 changes: 48 additions & 11 deletions x.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,26 @@ class RedisEncoderStream extends TransformStream<RedisCommand, string> {
}
}

async function sendCommand(
writable: WritableStream<Uint8Array>,
reader: ReadableStreamDefaultReader<string>,
command: RedisCommand,
) {
await writeCommand(writable, command);
return await readReply(reader);
}

async function pipeline(
writable: WritableStream<Uint8Array>,
reader: ReadableStreamDefaultReader<string>,
commands: RedisCommand[],
): Promise<RedisReply[]> {
for (const command of commands) {
await writeCommand(writable, command);
}
return await readReplies(reader, commands.length);
}

interface Conn {
readonly readable: ReadableStream<Uint8Array>;
readonly writable: WritableStream<Uint8Array>;
Expand All @@ -164,38 +184,55 @@ class TextDecoderStream extends TransformStream<Uint8Array, string> {
}
}

async function writeCommand(
writable: WritableStream<Uint8Array>,
command: RedisCommand,
) {
await ReadableStream.from([command])
.pipeThrough(new RedisEncoderStream())
.pipeThrough(new TextEncoderStream())
.pipeTo(writable, { preventClose: true });
}

export class RedisClient {
#reader: ReadableStreamDefaultReader<string>;
#writable: WritableStream<Uint8Array>;
// deno-lint-ignore no-explicit-any
#queue: Promise<any>;

constructor(conn: Conn) {
this.#reader = conn.readable
.pipeThrough(new TextDecoderStream())
.pipeThrough(new RedisLineStream())
.getReader();
this.#writable = conn.writable;
this.#queue = Promise.resolve();
}

async #enqueue<T>(task: () => Promise<T>): Promise<T> {
this.#queue = this.#queue.then(task);
return await this.#queue;
}

async readReply(): Promise<RedisReply> {
return await readReply(this.#reader);
return await this.#enqueue(async () => await readReply(this.#reader));
}

Check warning on line 219 in x.ts

View check run for this annotation

Codecov / codecov/patch

x.ts#L217-L219

Added lines #L217 - L219 were not covered by tests

async writeCommand(command: RedisCommand) {
await ReadableStream.from([command])
.pipeThrough(new RedisEncoderStream())
.pipeThrough(new TextEncoderStream())
.pipeTo(this.#writable, { preventClose: true });
await this.#enqueue(async () =>
await writeCommand(this.#writable, command)

Check warning on line 223 in x.ts

View check run for this annotation

Codecov / codecov/patch

x.ts#L221-L223

Added lines #L221 - L223 were not covered by tests
);
}

Check warning on line 225 in x.ts

View check run for this annotation

Codecov / codecov/patch

x.ts#L225

Added line #L225 was not covered by tests

async sendCommand(command: RedisCommand): Promise<RedisReply> {
await this.writeCommand(command);
return await this.readReply();
return await this.#enqueue(async () =>
await sendCommand(this.#writable, this.#reader, command)
);
}

async pipeline(commands: RedisCommand[]): Promise<RedisReply[]> {
for (const command of commands) {
await this.writeCommand(command);
}
return await readReplies(this.#reader, commands.length);
return await this.#enqueue(async () =>
await pipeline(this.#writable, this.#reader, commands)
);
}
}
35 changes: 31 additions & 4 deletions x_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ const PORT = 6379;
const redisConn = await Deno.connect({ hostname: HOSTNAME, port: PORT });
const redisClient = new RedisClient(redisConn);

await redisClient.sendCommand(["FLUSHALL"]);

async function assertSendCommandEquals(
command: RedisCommand,
expected: RedisReply,
Expand All @@ -152,9 +154,12 @@ Deno.test("RedisClient.sendCommand() transactions", async () => {
await assertSendCommandEquals(["EXEC"], [1, 1]);
});

/* Deno.test("RedisClient.sendCommand() eval script", async () => {
await assertSendCommandEquals("EVAL return ARGV[1] 0 hello", "hello");
}); */
Deno.test("RedisClient.sendCommand() eval script", async () => {
await assertSendCommandEquals(
["EVAL", "return ARGV[1]", 0, "hello"],
"hello",
);
});

Deno.test("redisClient.sendCommand() Lua script", async () => {
await assertSendCommandEquals([
Expand All @@ -174,7 +179,7 @@ Deno.test("redisClient.sendCommand() RESP3", async () => {
});
});

/* Deno.test("redisClient.sendCommand() race condition", async () => {
Deno.test("redisClient.sendCommand() race condition", async () => {
async function fn() {
const key = crypto.randomUUID();
const value = crypto.randomUUID();
Expand Down Expand Up @@ -204,4 +209,26 @@ Deno.test("redisClient.sendCommand() RESP3", async () => {
fn(),
fn(),
]);
});

Deno.test("redisClient.pipelineCommands()", async () => {
assertEquals(
await redisClient.pipeline([
["INCR", "X"],
["INCR", "X"],
["INCR", "X"],
["INCR", "X"],
]),
[1, 2, 3, 4],
);
});

/* Deno.test("redisClient.sendCommand() - no reply", async () => {
await assertRejects(
async () => await redisClient.sendCommand(["SHUTDOWN"]),
RedisError,
"No reply received",
);
}); */

// addEventListener("unload", () => redisConn.close());

0 comments on commit 5c32e12

Please sign in to comment.