diff --git a/Win11_24H2_Pro_Chinese_Simplified_x64.iso b/Win11_24H2_Pro_Chinese_Simplified_x64.iso deleted file mode 100644 index e69de29..0000000 diff --git a/deno.json b/deno.json index c366228..6fe140d 100644 --- a/deno.json +++ b/deno.json @@ -1,8 +1,5 @@ { "tasks": { - "build": "deno compile --allow-write --allow-net --allow-read --include worker.ts main.ts" - }, - "compilerOptions": { - "lib": ["deno.window", "deno.worker"] + "build": "deno compile --allow-write --allow-net --allow-read main.ts" } } diff --git a/main.ts b/main.ts index df30083..466de48 100644 --- a/main.ts +++ b/main.ts @@ -1,6 +1,16 @@ import { basename, join } from "jsr:@std/path"; import { Mutex } from "npm:async-mutex"; -import { downloadChunk } from "./workerpool.ts"; +import { Chunk, downloadChunk } from "./workerpool.ts"; + +export interface DownloadOptions { + url: string; + dirpath: string; + threads: number; + chunkSize?: number; + headers?: HeadersInit; + startChunk?: number; + endChunk?: number; +} export async function download({ url, @@ -10,15 +20,7 @@ export async function download({ headers = {}, startChunk = 0, endChunk = Infinity, -}: { - url: string; - dirpath: string; - threads: number; - chunkSize?: number; - headers?: HeadersInit; - startChunk?: number; - endChunk?: number; -}) { +}: DownloadOptions) { async function getURLInfo() { while (true) { const r = await fetch(url, { @@ -59,12 +61,12 @@ export async function download({ start: startChunk + i * chunkSize, end: startChunk + Math.min((i + 1) * chunkSize, contentLength) - 1, })); - await downloadChunk( - threads, + await downloadChunk({ + threadCount: threads, url, headers, - chunks, - async (i) => { + data: chunks, + onProgress: async (i) => { const release = await mutex.acquire(); while (true) { try { @@ -78,21 +80,15 @@ export async function download({ release(); console.log(`chunk ${i.origin.start} - ${i.origin.end} done`); }, - Infinity - ); + maxRetries: Infinity, + }); await mutex.waitForUnlock(); } finally { file.close(); } - return filePath; } -export interface Chunk { - start: number; - end: number; -} - if (import.meta.main) { const url = Deno.args[0]; const threads = +Deno.args[1] || 32; diff --git a/worker.ts b/worker.ts index 8b01cb8..da2b795 100644 --- a/worker.ts +++ b/worker.ts @@ -1,45 +1,23 @@ -import type { Chunk } from "./main.ts"; +import type { Chunk } from "./workerpool.ts"; -interface EventData { +export interface FetchChunksOptions { url: string; headers: HeadersInit; chunks: Chunk[]; + signal: AbortSignal; } -addEventListener("message", async ({ data }: MessageEvent) => { +export async function* fetchChunks(data: FetchChunksOptions) { for (const item of data.chunks) { + if (data.signal.aborted) return; const r = await fetch(data.url, { headers: { ...data.headers, Range: `bytes=${item.start}-${item.end}`, }, + signal: data.signal, }); if (r.status !== 206) throw new Error(`Invalid status code ${r.status}"}`); - const buf = await r.arrayBuffer(); - self.postMessage(buf, [buf]); + yield r.arrayBuffer(); } -}); - -addEventListener("error", (e) => { - // Deno 不能在主线程中捕获错误,所以这是折中的办法 - e.preventDefault(); - self.postMessage({ - colno: e.colno, - lineno: e.lineno, - message: e.message, - filename: e.filename, - error: e.error, - isTrusted: e.isTrusted, - type: e.type, - }); -}); - -addEventListener("unhandledrejection", (e) => { - // Deno 不能在主线程中捕获错误,所以这是折中的办法 - e.preventDefault(); - self.postMessage({ - reason: e.reason, - isTrusted: e.isTrusted, - type: e.type, - }); -}); +} diff --git a/workerpool.ts b/workerpool.ts index 3495536..7f8a806 100644 --- a/workerpool.ts +++ b/workerpool.ts @@ -1,16 +1,33 @@ -// deno-lint-ignore-file ban-ts-comment -export function downloadChunk( - threadCount: number, - url: string, - headers: HeadersInit, - data: T[], - onProgress: (data: { origin: T; data: R; index: number }) => void, - maxRetries: number = 3 -) { +import { fetchChunks } from "./worker.ts"; + +export interface Chunk { + start: number; + end: number; +} + +export interface DownloadChunkOptions { + threadCount: number; + url: string; + headers: HeadersInit; + data: Chunk[]; + onProgress: (data: { + origin: Chunk; + data: ArrayBuffer; + index: number; + }) => void; + maxRetries: number; +} + +export function downloadChunk({ + threadCount, + url, + headers, + data, + onProgress, + maxRetries = 3, +}: DownloadChunkOptions) { if (data.length < 1) return Promise.resolve([]); if (threadCount < 1) throw new Error("threadCount must be greater than 0"); - const workerFactory = () => - new Worker(import.meta.resolve("./worker.ts"), { type: "module" }); return new Promise((resolve, reject) => { const baseChunkCount = Math.floor(data.length / threadCount); const remainingChunks = data.length % threadCount; @@ -21,8 +38,15 @@ export function downloadChunk( if (startChunk >= data.length) break; const endChunk = startChunk + baseChunkCount + (i < remainingChunks ? 1 : 0); + const abortController = new AbortController(); const workerData = (workerPool[i] = { - worker: workerFactory(), + worker: fetchChunks({ + url, + headers, + chunks: data.slice(startChunk, endChunk), + signal: abortController.signal, + }), + abortController, startChunk, endChunk, currentChunk: startChunk, @@ -30,27 +54,18 @@ export function downloadChunk( stolen: false, } as WorkerData); printWorkerData(workerData, i); - const messageHandle = (e: MessageEvent) => { - // Deno 不能在主线程中捕获错误,所以这是折中的办法 start - // @ts-ignore - if (e.data?.type === "error" || e.data?.type === "unhandledrejection") { - // @ts-ignore - return errorHandel(e.data); - } - // Deno 不能在主线程中捕获错误,所以这是折中的办法 end - workerData.retryCount = 0; + const messageHandle = (result: ArrayBuffer) => { onProgress({ index: workerData.currentChunk, origin: data[workerData.currentChunk], - data: e.data, + data: result, }); + workerData.retryCount = 0; workerData.currentChunk++; if (workerData.currentChunk < workerData.endChunk) return; if (workerData.stolen) { - workerData.worker.terminate(); - workerData.worker = workerFactory(); - workerData.worker.addEventListener("message", messageHandle); - workerData.worker.addEventListener("error", errorHandel); + workerData.abortController.abort(); + workerData.abortController = new AbortController(); workerData.stolen = false; } let maxRemain = 0; @@ -66,7 +81,6 @@ export function downloadChunk( } if (maxRemain < 1) { delete workerPool[i]; - workerData.worker.terminate(); activeWorkers--; if (activeWorkers === 0) resolve(); return; @@ -77,7 +91,7 @@ export function downloadChunk( printWorkerData(targetWorker, targetWorkerIndex); targetWorker.stolen = true; const splitPoint = Math.ceil( - (targetWorker.currentChunk + targetWorker.endChunk) >>> 1 + (targetWorker.currentChunk + targetWorker.endChunk) / 2 ); workerData.endChunk = targetWorker.endChunk; workerData.currentChunk = @@ -86,41 +100,51 @@ export function downloadChunk( splitPoint; printWorkerData(workerData, i); printWorkerData(targetWorker, targetWorkerIndex); - workerData.worker.postMessage({ + workerData.worker = fetchChunks({ url, headers, chunks: data.slice(splitPoint, workerData.endChunk), + signal: workerData.abortController.signal, }); + addEventListener(workerData.worker, messageHandle, errorHandel); }; - const errorHandel = (err: ErrorEvent) => { + const errorHandel = (err: unknown) => { if (workerData.retryCount >= maxRetries) { - for (let i = 0; i < workerPool.length; i++) { - if (i in workerPool) workerPool[i].worker.terminate(); - } + for (let i = 0; i < workerPool.length; i++) + if (i in workerPool) workerPool[i].abortController.abort(); return reject(err); } workerData.retryCount++; workerData.stolen = false; printWorkerData(workerData, i, "try: "); - workerData.worker.postMessage({ + workerData.worker = fetchChunks({ url, headers, chunks: data.slice(workerData.currentChunk, workerData.endChunk), + signal: workerData.abortController.signal, }); + addEventListener(workerData.worker, messageHandle, errorHandel); }; - workerData.worker.addEventListener("message", messageHandle); - workerData.worker.addEventListener("error", errorHandel); - workerData.worker.postMessage({ - url, - headers, - chunks: data.slice(startChunk, endChunk), - }); + addEventListener(workerData.worker, messageHandle, errorHandel); } }); } +async function addEventListener( + stream: AsyncGenerator, + messageHandle: (e: ArrayBuffer) => void, + errorHandle: (e: unknown) => void +) { + try { + for await (const data of stream) messageHandle(data); + } catch (e) { + errorHandle(e); + } +} + interface WorkerData { - worker: Worker; + worker: AsyncGenerator; + abortController: AbortController; startChunk: number; endChunk: number; currentChunk: number;