Skip to content

Commit

Permalink
refactor(BREAKING): command pipe writes may now return a promise (#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
dsherret authored Jan 27, 2024
1 parent c885ba5 commit c3128fc
Show file tree
Hide file tree
Showing 18 changed files with 247 additions and 189 deletions.
25 changes: 9 additions & 16 deletions mod.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ Deno.test("should not get stderr when set to writer", async () => {
assertThrows(
() => output.stderr,
Error,
`Stderr was not piped (was streamed). Call .stderr(\"piped\") or .stderr(\"inheritPiped\") when building the command.`,
`Stderr was streamed to another source and is no longer available.`,
);
});

Expand Down Expand Up @@ -374,14 +374,11 @@ Deno.test("should handle boolean list 'and'", async () => {

Deno.test("should support custom command handlers", async () => {
const builder = new CommandBuilder()
.registerCommand("zardoz-speaks", (context) => {
.registerCommand("zardoz-speaks", async (context) => {
if (context.args.length != 1) {
context.stderr.writeLine("zardoz-speaks: expected 1 argument");
return {
code: 1,
};
return context.error("zardoz-speaks: expected 1 argument");
}
context.stdout.writeLine(`zardoz speaks to ${context.args[0]}`);
await context.stdout.writeLine(`zardoz speaks to ${context.args[0]}`);
return {
code: 0,
};
Expand Down Expand Up @@ -802,7 +799,7 @@ Deno.test("piping to stdin", async () => {
.stderr("piped")
.noThrow();
assertEquals(result.code, 1);
assertEquals(result.stderr, "stdin pipe broken. Error: Exited with code: 1\n");
assertEquals(result.stderr, "stdin pipe broken. Exited with code: 1\n");
}
});

Expand Down Expand Up @@ -862,13 +859,9 @@ Deno.test("piping to a writable that throws", async () => {
throw new Error("failed");
},
});
await assertRejects(
async () => {
await $`echo 1`.stdout(writableStream);
},
Error,
"failed",
);
const result = await $`echo 1`.stdout(writableStream).stderr("piped").noThrow();
assertEquals(result.code, 1);
assertEquals(result.stderr, "echo: failed\n");
});

Deno.test("piping stdout/stderr to a file", async () => {
Expand Down Expand Up @@ -1030,7 +1023,7 @@ Deno.test("streaming api errors while streaming", async () => {
.stdout("piped")
.stderr("piped")
.spawn();
assertEquals(result.stderr, "stdin pipe broken. Error: Exited with code: 1\n");
assertEquals(result.stderr, "stdin pipe broken. Exited with code: 1\n");
assertEquals(result.stdout, "1\n2\n");
}
});
Expand Down
88 changes: 45 additions & 43 deletions src/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,22 @@ import { Delay } from "./common.ts";
import { Buffer, colors, path, readerFromStreamReader } from "./deps.ts";
import {
CapturingBufferWriter,
CapturingBufferWriterSync,
InheritStaticTextBypassWriter,
NullPipeWriter,
PipedBuffer,
Reader,
ShellPipeReaderKind,
ShellPipeWriter,
ShellPipeWriterKind,
Writer,
WriterSync,
} from "./pipes.ts";
import { parseCommand, spawn } from "./shell.ts";
import { isShowingProgressBars } from "./console/progress/interval.ts";
import { PathRef } from "./path.ts";
import { RequestBuilder } from "./request.ts";
import { writerFromStreamWriter } from "https://deno.land/[email protected]/streams/writer_from_stream_writer.ts";

type BufferStdio = "inherit" | "null" | "streamed" | Buffer;

Expand Down Expand Up @@ -823,74 +826,57 @@ export function parseAndSpawnCommand(state: CommandBuilderState) {
}
const combinedBuffer = new Buffer();
return [
new CapturingBufferWriter(stdoutBuffer, combinedBuffer),
new CapturingBufferWriter(stderrBuffer, combinedBuffer),
getCapturingBuffer(stdoutBuffer, combinedBuffer),
getCapturingBuffer(stderrBuffer, combinedBuffer),
combinedBuffer,
] as const;
}
return [stdoutBuffer, stderrBuffer, undefined] as const;

function getOutputBuffer(innerWriter: WriterSync, { kind, options }: ShellPipeWriterKindWithOptions) {
function getCapturingBuffer(buffer: Writer | WriterSync, combinedBuffer: Buffer) {
if ("write" in buffer) {
return new CapturingBufferWriter(buffer, combinedBuffer);
} else {
return new CapturingBufferWriterSync(buffer, combinedBuffer);
}
}

function getOutputBuffer(inheritWriter: WriterSync, { kind, options }: ShellPipeWriterKindWithOptions) {
if (typeof kind === "object") {
if (kind instanceof PathRef) {
const file = kind.openSync({ write: true, truncate: true, create: true });
disposables.push(file);
return file;
} else if (kind instanceof WritableStream) {
// this is sketch
const writer = kind.getWriter();
const promiseMap = new Map<number, Promise<void>>();
let hadError = false;
let foundErr: unknown = undefined;
let index = 0;
const streamWriter = kind.getWriter();
asyncDisposables.push({
async [Symbol.asyncDispose]() {
await Promise.all(promiseMap.values());
if (foundErr) {
throw foundErr;
}
if (!options?.preventClose && !hadError) {
await writer.close();
streamWriter.releaseLock();
if (!options?.preventClose) {
try {
await kind.close();
} catch {
// ignore, the stream have errored
}
}
},
});
return {
writeSync(buffer: Uint8Array) {
if (foundErr) {
const errorToThrow = foundErr;
foundErr = undefined;
throw errorToThrow;
}
const newIndex = index++;
promiseMap.set(
newIndex,
writer.write(buffer).catch((err) => {
if (err != null) {
foundErr = err;
hadError = true;
}
}).finally(() => {
promiseMap.delete(newIndex);
}),
);
return buffer.length;
},
};
return writerFromStreamWriter(streamWriter);
} else {
return kind;
}
}
switch (kind) {
case "inherit":
if (hasProgressBars) {
return new InheritStaticTextBypassWriter(innerWriter);
return new InheritStaticTextBypassWriter(inheritWriter);
} else {
return "inherit";
}
case "piped":
return new PipedBuffer();
case "inheritPiped":
return new CapturingBufferWriter(innerWriter, new Buffer());
return new CapturingBufferWriterSync(inheritWriter, new Buffer());
case "null":
return "null";
default: {
Expand All @@ -902,9 +888,17 @@ export function parseAndSpawnCommand(state: CommandBuilderState) {
}

function finalizeCommandResultBuffer(
buffer: PipedBuffer | "inherit" | "null" | CapturingBufferWriter | InheritStaticTextBypassWriter | WriterSync,
buffer:
| PipedBuffer
| "inherit"
| "null"
| CapturingBufferWriter
| CapturingBufferWriterSync
| InheritStaticTextBypassWriter
| Writer
| WriterSync,
): BufferStdio {
if (buffer instanceof CapturingBufferWriter) {
if (buffer instanceof CapturingBufferWriterSync || buffer instanceof CapturingBufferWriter) {
return buffer.getBuffer();
} else if (buffer instanceof InheritStaticTextBypassWriter) {
buffer.flush(); // this is line buffered, so flush anything left
Expand All @@ -920,7 +914,15 @@ export function parseAndSpawnCommand(state: CommandBuilderState) {
}

function finalizeCommandResultBufferForError(
buffer: PipedBuffer | "inherit" | "null" | CapturingBufferWriter | InheritStaticTextBypassWriter | WriterSync,
buffer:
| PipedBuffer
| "inherit"
| "null"
| CapturingBufferWriter
| CapturingBufferWriterSync
| InheritStaticTextBypassWriter
| Writer
| WriterSync,
error: Error,
) {
if (buffer instanceof InheritStaticTextBypassWriter) {
Expand Down Expand Up @@ -1013,7 +1015,7 @@ export class CommandResult {

/** Raw stderr bytes. */
get stderrBytes(): Uint8Array {
if (this.#stdout === "streamed") {
if (this.#stderr === "streamed") {
throw new Error(
`Stderr was streamed to another source and is no longer available.`,
);
Expand Down
14 changes: 9 additions & 5 deletions src/command_handler.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import { ExecuteResult } from "./result.ts";
import type { KillSignal } from "./command.ts";
import { Reader, WriterSync } from "./pipes.ts";
import { Reader } from "./pipes.ts";

/** Used to read from stdin. */
export type CommandPipeReader = "inherit" | "null" | Reader;

/** Used to write to stdout or stderr. */
export interface CommandPipeWriter extends WriterSync {
writeSync(p: Uint8Array): number;
writeText(text: string): void;
writeLine(text: string): void;
export interface CommandPipeWriter {
write(p: Uint8Array): Promise<number> | number;
writeText(text: string): Promise<void> | void;
writeLine(text: string): Promise<void> | void;
}

/** Context of the currently executing command. */
Expand All @@ -21,6 +21,10 @@ export interface CommandContext {
get stdout(): CommandPipeWriter;
get stderr(): CommandPipeWriter;
get signal(): KillSignal;
/// Helper function for writing a line to stderr and returning a 1 exit code.
error(message: string): Promise<ExecuteResult> | ExecuteResult;
/// Helper function for writing a line to stderr and returning the provided exit code.
error(code: number, message: string): Promise<ExecuteResult> | ExecuteResult;
}

/** Handler for executing a command. */
Expand Down
41 changes: 30 additions & 11 deletions src/commands/cat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ export async function catCommand(
const code = await executeCat(context);
return { code };
} catch (err) {
context.stderr.writeLine(`cat: ${err?.message ?? err}`);
return { code: 1 };
return context.error(`cat: ${err?.message ?? err}`);
}
}

Expand All @@ -29,8 +28,14 @@ async function executeCat(context: CommandContext) {
if (typeof context.stdin === "object") { // stdin is a Reader
while (!context.signal.aborted) {
const size = await context.stdin.read(buf);
if (!size || size === 0) break;
else context.stdout.writeSync(buf.slice(0, size));
if (!size || size === 0) {
break;
} else {
const maybePromise = context.stdout.write(buf.slice(0, size));
if (maybePromise instanceof Promise) {
await maybePromise;
}
}
}
exitCode = context.signal.abortedExitCode ?? 0;
} else {
Expand All @@ -44,15 +49,24 @@ async function executeCat(context: CommandContext) {
while (!context.signal.aborted) {
// NOTE: rust supports cancellation here
const size = file.readSync(buf);
if (!size || size === 0) break;
else context.stdout.writeSync(buf.slice(0, size));
if (!size || size === 0) {
break;
} else {
const maybePromise = context.stdout.write(buf.slice(0, size));
if (maybePromise instanceof Promise) {
await maybePromise;
}
}
}
exitCode = context.signal.abortedExitCode ?? 0;
} catch (err) {
context.stderr.writeLine(`cat ${path}: ${err}`);
const maybePromise = context.stderr.writeLine(`cat ${path}: ${err?.message ?? err}`);
if (maybePromise instanceof Promise) {
await maybePromise;
}
exitCode = 1;
} finally {
if (file) file.close();
file?.close();
}
}
}
Expand All @@ -62,10 +76,15 @@ async function executeCat(context: CommandContext) {
export function parseCatArgs(args: string[]): CatFlags {
const paths = [];
for (const arg of parseArgKinds(args)) {
if (arg.kind === "Arg") paths.push(arg.arg);
else bailUnsupported(arg); // for now, we don't support any arguments
if (arg.kind === "Arg") {
paths.push(arg.arg);
} else {
bailUnsupported(arg); // for now, we don't support any arguments
}
}

if (paths.length === 0) paths.push("-");
if (paths.length === 0) {
paths.push("-");
}
return { paths };
}
3 changes: 1 addition & 2 deletions src/commands/cd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ export async function cdCommand(context: CommandContext): Promise<ExecuteResult>
}],
};
} catch (err) {
context.stderr.writeLine(`cd: ${err?.message ?? err}`);
return { code: 1 };
return context.error(`cd: ${err?.message ?? err}`);
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/commands/cp_mv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ export async function cpCommand(
await executeCp(context.cwd, context.args);
return { code: 0 };
} catch (err) {
context.stderr.writeLine(`cp: ${err?.message ?? err}`);
return { code: 1 };
return context.error(`cp: ${err?.message ?? err}`);
}
}

Expand Down Expand Up @@ -101,8 +100,7 @@ export async function mvCommand(
await executeMove(context.cwd, context.args);
return { code: 0 };
} catch (err) {
context.stderr.writeLine(`mv: ${err?.message ?? err}`);
return { code: 1 };
return context.error(`mv: ${err?.message ?? err}`);
}
}

Expand Down
18 changes: 15 additions & 3 deletions src/commands/echo.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
import { CommandContext } from "../command_handler.ts";
import { ExecuteResult } from "../result.ts";

export function echoCommand(context: CommandContext): ExecuteResult {
context.stdout.writeLine(context.args.join(" "));
return { code: 0 };
export function echoCommand(context: CommandContext): ExecuteResult | Promise<ExecuteResult> {
try {
const maybePromise = context.stdout.writeLine(context.args.join(" "));
if (maybePromise instanceof Promise) {
return maybePromise.then(() => ({ code: 0 })).catch((err) => handleFailure(context, err));
} else {
return { code: 0 };
}
} catch (err) {
return handleFailure(context, err);
}
}

function handleFailure(context: CommandContext, err: any) {
return context.error(`echo: ${err?.message ?? err}`);
}
Loading

0 comments on commit c3128fc

Please sign in to comment.