Commit

feat: remove web worker
share121 committed Feb 7, 2025
1 parent 5327529 commit 8a84304
Showing 5 changed files with 93 additions and 98 deletions.
Empty file.
deno.json (5 changes: 1 addition & 4 deletions)
@@ -1,8 +1,5 @@
 {
   "tasks": {
-    "build": "deno compile --allow-write --allow-net --allow-read --include worker.ts main.ts"
-  },
-  "compilerOptions": {
-    "lib": ["deno.window", "deno.worker"]
+    "build": "deno compile --allow-write --allow-net --allow-read main.ts"
   }
 }
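
A quick sketch of how the simplified build would be run (assuming the standard Deno CLI; the binary name follows deno compile's default of naming the output after the entry module, and the URL/thread-count arguments are placeholders matching main.ts):

deno task build
./main https://example.com/file.bin 32   # hypothetical run: URL, then thread count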
main.ts (40 changes: 18 additions & 22 deletions)
@@ -1,6 +1,16 @@
 import { basename, join } from "jsr:@std/path";
 import { Mutex } from "npm:async-mutex";
-import { downloadChunk } from "./workerpool.ts";
+import { Chunk, downloadChunk } from "./workerpool.ts";
 
+export interface DownloadOptions {
+  url: string;
+  dirpath: string;
+  threads: number;
+  chunkSize?: number;
+  headers?: HeadersInit;
+  startChunk?: number;
+  endChunk?: number;
+}
+
 export async function download({
   url,
@@ -10,15 +20,7 @@ export async function download({
   headers = {},
   startChunk = 0,
   endChunk = Infinity,
-}: {
-  url: string;
-  dirpath: string;
-  threads: number;
-  chunkSize?: number;
-  headers?: HeadersInit;
-  startChunk?: number;
-  endChunk?: number;
-}) {
+}: DownloadOptions) {
   async function getURLInfo() {
     while (true) {
       const r = await fetch(url, {
@@ -59,12 +61,12 @@ export async function download({
       start: startChunk + i * chunkSize,
       end: startChunk + Math.min((i + 1) * chunkSize, contentLength) - 1,
     }));
-    await downloadChunk<Chunk, ArrayBuffer>(
-      threads,
+    await downloadChunk({
+      threadCount: threads,
       url,
       headers,
-      chunks,
-      async (i) => {
+      data: chunks,
+      onProgress: async (i) => {
        const release = await mutex.acquire();
        while (true) {
          try {
@@ -78,21 +80,15 @@ export async function download({
        release();
        console.log(`chunk ${i.origin.start} - ${i.origin.end} done`);
      },
-      Infinity
-    );
+      maxRetries: Infinity,
+    });
     await mutex.waitForUnlock();
   } finally {
     file.close();
   }
 
   return filePath;
 }
-
-export interface Chunk {
-  start: number;
-  end: number;
-}
-
 if (import.meta.main) {
   const url = Deno.args[0];
   const threads = +Deno.args[1] || 32;
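
For reference, a minimal sketch of calling the reworked download() with the new DownloadOptions object; the field names come from the interface added above, while the URL and directory values are placeholders:

import { download } from "./main.ts";

const filePath = await download({
  url: "https://example.com/file.bin", // placeholder
  dirpath: "./downloads",              // placeholder
  threads: 32,
  headers: { "User-Agent": "demo" },   // optional, defaults to {}
});
console.log(`saved to ${filePath}`);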
worker.ts (38 changes: 8 additions & 30 deletions)
@@ -1,45 +1,23 @@
-import type { Chunk } from "./main.ts";
+import type { Chunk } from "./workerpool.ts";
 
-interface EventData {
+export interface FetchChunksOptions {
   url: string;
   headers: HeadersInit;
   chunks: Chunk[];
+  signal: AbortSignal;
 }
 
-addEventListener("message", async ({ data }: MessageEvent<EventData>) => {
+export async function* fetchChunks(data: FetchChunksOptions) {
   for (const item of data.chunks) {
+    if (data.signal.aborted) return;
     const r = await fetch(data.url, {
       headers: {
         ...data.headers,
         Range: `bytes=${item.start}-${item.end}`,
       },
+      signal: data.signal,
     });
     if (r.status !== 206) throw new Error(`Invalid status code ${r.status}"}`);
-    const buf = await r.arrayBuffer();
-    self.postMessage(buf, [buf]);
+    yield r.arrayBuffer();
   }
-});
-
-addEventListener("error", (e) => {
-  // Deno can't catch worker errors in the main thread, so this is a workaround
-  e.preventDefault();
-  self.postMessage({
-    colno: e.colno,
-    lineno: e.lineno,
-    message: e.message,
-    filename: e.filename,
-    error: e.error,
-    isTrusted: e.isTrusted,
-    type: e.type,
-  });
-});
-
-addEventListener("unhandledrejection", (e) => {
-  // Deno can't catch worker errors in the main thread, so this is a workaround
-  e.preventDefault();
-  self.postMessage({
-    reason: e.reason,
-    isTrusted: e.isTrusted,
-    type: e.type,
-  });
-});
+}
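
As an illustration, the new fetchChunks async generator can be consumed directly with for await and cancelled through its AbortSignal; this is a sketch with placeholder values, independent of the pool logic in workerpool.ts:

import { fetchChunks } from "./worker.ts";

const abort = new AbortController();
const stream = fetchChunks({
  url: "https://example.com/file.bin", // placeholder
  headers: {},
  chunks: [
    { start: 0, end: 1023 },
    { start: 1024, end: 2047 },
  ],
  signal: abort.signal,
});
for await (const buf of stream) {
  console.log(`received ${buf.byteLength} bytes`); // each yield resolves to an ArrayBuffer
}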
workerpool.ts (108 changes: 66 additions & 42 deletions)
@@ -1,16 +1,33 @@
-// deno-lint-ignore-file ban-ts-comment
-export function downloadChunk<T, R>(
-  threadCount: number,
-  url: string,
-  headers: HeadersInit,
-  data: T[],
-  onProgress: (data: { origin: T; data: R; index: number }) => void,
-  maxRetries: number = 3
-) {
+import { fetchChunks } from "./worker.ts";
+
+export interface Chunk {
+  start: number;
+  end: number;
+}
+
+export interface DownloadChunkOptions {
+  threadCount: number;
+  url: string;
+  headers: HeadersInit;
+  data: Chunk[];
+  onProgress: (data: {
+    origin: Chunk;
+    data: ArrayBuffer;
+    index: number;
+  }) => void;
+  maxRetries: number;
+}
+
+export function downloadChunk({
+  threadCount,
+  url,
+  headers,
+  data,
+  onProgress,
+  maxRetries = 3,
+}: DownloadChunkOptions) {
   if (data.length < 1) return Promise.resolve([]);
   if (threadCount < 1) throw new Error("threadCount must be greater than 0");
-  const workerFactory = () =>
-    new Worker(import.meta.resolve("./worker.ts"), { type: "module" });
   return new Promise<void>((resolve, reject) => {
     const baseChunkCount = Math.floor(data.length / threadCount);
     const remainingChunks = data.length % threadCount;
@@ -21,36 +38,34 @@ export function downloadChunk<T, R>(
       if (startChunk >= data.length) break;
       const endChunk =
         startChunk + baseChunkCount + (i < remainingChunks ? 1 : 0);
+      const abortController = new AbortController();
       const workerData = (workerPool[i] = {
-        worker: workerFactory(),
+        worker: fetchChunks({
+          url,
+          headers,
+          chunks: data.slice(startChunk, endChunk),
+          signal: abortController.signal,
+        }),
+        abortController,
         startChunk,
         endChunk,
         currentChunk: startChunk,
         retryCount: 0,
         stolen: false,
       } as WorkerData);
       printWorkerData(workerData, i);
-      const messageHandle = (e: MessageEvent<R>) => {
-        // Deno can't catch worker errors in the main thread, so this is a workaround (start)
-        // @ts-ignore
-        if (e.data?.type === "error" || e.data?.type === "unhandledrejection") {
-          // @ts-ignore
-          return errorHandel(e.data);
-        }
-        // Deno can't catch worker errors in the main thread, so this is a workaround (end)
-        workerData.retryCount = 0;
+      const messageHandle = (result: ArrayBuffer) => {
        onProgress({
          index: workerData.currentChunk,
          origin: data[workerData.currentChunk],
-          data: e.data,
+          data: result,
        });
+        workerData.retryCount = 0;
        workerData.currentChunk++;
        if (workerData.currentChunk < workerData.endChunk) return;
        if (workerData.stolen) {
-          workerData.worker.terminate();
-          workerData.worker = workerFactory();
-          workerData.worker.addEventListener("message", messageHandle);
-          workerData.worker.addEventListener("error", errorHandel);
+          workerData.abortController.abort();
+          workerData.abortController = new AbortController();
          workerData.stolen = false;
        }
        let maxRemain = 0;
@@ -66,7 +81,6 @@ export function downloadChunk<T, R>(
        }
        if (maxRemain < 1) {
          delete workerPool[i];
-          workerData.worker.terminate();
          activeWorkers--;
          if (activeWorkers === 0) resolve();
          return;
@@ -77,7 +91,7 @@ export function downloadChunk<T, R>(
        printWorkerData(targetWorker, targetWorkerIndex);
        targetWorker.stolen = true;
        const splitPoint = Math.ceil(
-          (targetWorker.currentChunk + targetWorker.endChunk) >>> 1
+          (targetWorker.currentChunk + targetWorker.endChunk) / 2
        );
        workerData.endChunk = targetWorker.endChunk;
        workerData.currentChunk =
@@ -86,41 +100,51 @@ export function downloadChunk<T, R>(
            splitPoint;
        printWorkerData(workerData, i);
        printWorkerData(targetWorker, targetWorkerIndex);
-        workerData.worker.postMessage({
+        workerData.worker = fetchChunks({
          url,
          headers,
          chunks: data.slice(splitPoint, workerData.endChunk),
+          signal: workerData.abortController.signal,
        });
+        addEventListener(workerData.worker, messageHandle, errorHandel);
      };
-      const errorHandel = (err: ErrorEvent) => {
+      const errorHandel = (err: unknown) => {
        if (workerData.retryCount >= maxRetries) {
-          for (let i = 0; i < workerPool.length; i++) {
-            if (i in workerPool) workerPool[i].worker.terminate();
-          }
+          for (let i = 0; i < workerPool.length; i++)
+            if (i in workerPool) workerPool[i].abortController.abort();
          return reject(err);
        }
        workerData.retryCount++;
        workerData.stolen = false;
        printWorkerData(workerData, i, "try: ");
-        workerData.worker.postMessage({
+        workerData.worker = fetchChunks({
          url,
          headers,
          chunks: data.slice(workerData.currentChunk, workerData.endChunk),
+          signal: workerData.abortController.signal,
        });
+        addEventListener(workerData.worker, messageHandle, errorHandel);
      };
-      workerData.worker.addEventListener("message", messageHandle);
-      workerData.worker.addEventListener("error", errorHandel);
-      workerData.worker.postMessage({
-        url,
-        headers,
-        chunks: data.slice(startChunk, endChunk),
-      });
+      addEventListener(workerData.worker, messageHandle, errorHandel);
     }
   });
 }
 
+async function addEventListener(
+  stream: AsyncGenerator<ArrayBuffer, void, unknown>,
+  messageHandle: (e: ArrayBuffer) => void,
+  errorHandle: (e: unknown) => void
+) {
+  try {
+    for await (const data of stream) messageHandle(data);
+  } catch (e) {
+    errorHandle(e);
+  }
+}
+
 interface WorkerData {
-  worker: Worker;
+  worker: AsyncGenerator<ArrayBuffer, void, unknown>;
+  abortController: AbortController;
   startChunk: number;
   endChunk: number;
   currentChunk: number;
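
Usage sketch: downloadChunk now takes a single options object instead of positional arguments. The field names below mirror DownloadChunkOptions; the URL and chunk list are placeholders:

import { downloadChunk } from "./workerpool.ts";

await downloadChunk({
  threadCount: 4,
  url: "https://example.com/file.bin", // placeholder
  headers: {},
  data: [
    { start: 0, end: 1023 },
    { start: 1024, end: 2047 },
  ],
  onProgress: ({ index, origin, data }) => {
    console.log(`chunk ${index} (${origin.start}-${origin.end}): ${data.byteLength} bytes`);
  },
  maxRetries: 3,
});

Cancellation that previously relied on worker.terminate() now goes through each entry's AbortController, so a failed or finished run aborts the in-flight fetches instead of killing worker threads.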
