Skip to content

Commit

Permalink
refactor:
Browse files Browse the repository at this point in the history
  • Loading branch information
share121 committed Feb 7, 2025
1 parent af62bad commit 40beb05
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name: build
on:
push:
tags:
- "v*"
- v*

permissions:
packages: write
Expand Down
41 changes: 19 additions & 22 deletions main.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import { basename, join } from "jsr:@std/path";
import { Mutex } from "npm:async-mutex";
import { downloadChunk } from "./workerpool.ts";
import { downloadChunk, type Chunk } from "./workerpool.ts";

export interface DownloadOptions {
url: string;
dirpath: string;
threads: number;
chunkSize?: number;
headers?: HeadersInit;
startChunk?: number;
endChunk?: number;
}

export async function download({
url,
Expand All @@ -10,15 +20,7 @@ export async function download({
headers = {},
startChunk = 0,
endChunk = Infinity,
}: {
url: string;
dirpath: string;
threads: number;
chunkSize?: number;
headers?: HeadersInit;
startChunk?: number;
endChunk?: number;
}) {
}: DownloadOptions) {
async function getURLInfo() {
while (true) {
const r = await fetch(url, {
Expand Down Expand Up @@ -59,17 +61,17 @@ export async function download({
start: startChunk + i * chunkSize,
end: startChunk + Math.min((i + 1) * chunkSize, contentLength) - 1,
}));
await downloadChunk<Chunk, ArrayBuffer>(
threads,
await downloadChunk({
threadCount: threads,
url,
headers,
chunks,
async (i) => {
data: chunks,
onProgress: async (i) => {
const release = await mutex.acquire();
while (true) {
try {
await file.seek(i.origin.start, Deno.SeekMode.Start);
await file.write(new Uint8Array(i.data));
await file.write(i.data);
break;
} catch (e) {
console.error("%c" + e, "color: red");
Expand All @@ -78,8 +80,8 @@ export async function download({
release();
console.log(`chunk ${i.origin.start} - ${i.origin.end} done`);
},
Infinity
);
maxRetries: Infinity,
});
await mutex.waitForUnlock();
} finally {
file.close();
Expand All @@ -88,11 +90,6 @@ export async function download({
return filePath;
}

export interface Chunk {
start: number;
end: number;
}

if (import.meta.main) {
const url = Deno.args[0];
const threads = +Deno.args[1] || 32;
Expand Down
6 changes: 3 additions & 3 deletions worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { Chunk } from "./main.ts";
import type { Chunk } from "./workerpool.ts";

interface EventData {
export interface EventData {
url: string;
headers: HeadersInit;
chunks: Chunk[];
Expand All @@ -15,7 +15,7 @@ addEventListener("message", async ({ data }: MessageEvent<EventData>) => {
},
});
if (r.status !== 206) throw new Error(`Invalid status code ${r.status}"}`);
const buf = await r.arrayBuffer();
const buf = new Uint8Array(await r.arrayBuffer());
self.postMessage(buf, [buf]);
}
});
Expand Down
38 changes: 28 additions & 10 deletions workerpool.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
// deno-lint-ignore-file ban-ts-comment
export function downloadChunk<T, R>(
threadCount: number,
url: string,
headers: HeadersInit,
data: T[],
onProgress: (data: { origin: T; data: R; index: number }) => void,
maxRetries: number = 3
) {
export interface DownloadChunkOptions {
threadCount: number;
url: string;
headers: HeadersInit;
data: Chunk[];
onProgress: (data: {
origin: Chunk;
data: Uint8Array;
index: number;
}) => void;
maxRetries: number;
}

export function downloadChunk({
threadCount,
url,
headers,
data,
onProgress,
maxRetries = 3,
}: DownloadChunkOptions) {
if (data.length < 1) return Promise.resolve([]);
if (threadCount < 1) throw new Error("threadCount must be greater than 0");
const workerFactory = () =>
Expand All @@ -30,20 +43,20 @@ export function downloadChunk<T, R>(
stolen: false,
} as WorkerData);
printWorkerData(workerData, i);
const messageHandle = (e: MessageEvent<R>) => {
const messageHandle = (e: MessageEvent<Uint8Array>) => {
// Deno 不能在主线程中捕获错误,所以这是折中的办法 start
// @ts-ignore
if (e.data?.type === "error" || e.data?.type === "unhandledrejection") {
// @ts-ignore
return errorHandel(e.data);
}
// Deno 不能在主线程中捕获错误,所以这是折中的办法 end
workerData.retryCount = 0;
onProgress({
index: workerData.currentChunk,
origin: data[workerData.currentChunk],
data: e.data,
});
workerData.retryCount = 0;
workerData.currentChunk++;
if (workerData.currentChunk < workerData.endChunk) return;
if (workerData.stolen) {
Expand Down Expand Up @@ -119,6 +132,11 @@ export function downloadChunk<T, R>(
});
}

export interface Chunk {
start: number;
end: number;
}

interface WorkerData {
worker: Worker;
startChunk: number;
Expand Down

0 comments on commit 40beb05

Please sign in to comment.