Skip to content

Commit

Permalink
refactor: 添加块合并下载,流式报告
Browse files Browse the repository at this point in the history
  • Loading branch information
share121 committed Feb 7, 2025
1 parent 96b8ff8 commit f7fa5f5
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 17 deletions.
2 changes: 1 addition & 1 deletion main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export async function download({
while (true) {
try {
await file.seek(i.origin.start, Deno.SeekMode.Start);
await file.write(new Uint8Array(i.data));
await file.write(i.data);
break;
} catch (e) {
console.error("%c" + e, "color: red");
Expand Down
62 changes: 51 additions & 11 deletions worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,56 @@ export interface FetchChunksOptions {
signal: AbortSignal;
}

export async function* fetchChunks(data: FetchChunksOptions) {
for (const item of data.chunks) {
const r = await fetch(data.url, {
headers: {
...data.headers,
Range: `bytes=${item.start}-${item.end}`,
},
signal: data.signal,
});
if (r.status !== 206) throw new Error(`Invalid status code ${r.status}"}`);
yield r.arrayBuffer();
export async function* fetchChunks(
data: FetchChunksOptions
): AsyncGenerator<Uint8Array, void, unknown> {
const r = await fetch(data.url, {
headers: {
...data.headers,
Range: `bytes=${data.chunks[0].start}-${data.chunks.at(-1)!.end}`,
},
signal: data.signal,
});
if (r.status !== 206) throw new Error(`Invalid status code ${r.status}"}`);
if (!r.body) throw new Error("No body");
const reader = r.body.getReader();
let currentChunkIndex = 0;
let currentBuffer: Uint8Array | null = null;
let bufferOffset = 0;
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
let bytesProcessed = 0;
while (bytesProcessed < value.length) {
if (!currentBuffer && currentChunkIndex < data.chunks.length) {
const chunk = data.chunks[currentChunkIndex];
currentBuffer = new Uint8Array(chunk.end - chunk.start + 1);
bufferOffset = 0;
}
if (!currentBuffer) throw new Error("Unexpected extra data received");
const remainingBytes = currentBuffer.length - bufferOffset;
const bytesToCopy = Math.min(
remainingBytes,
value.length - bytesProcessed
);
currentBuffer.set(
value.subarray(bytesProcessed, bytesProcessed + bytesToCopy),
bufferOffset
);
bufferOffset += bytesToCopy;
bytesProcessed += bytesToCopy;
if (bufferOffset === currentBuffer.length) {
yield currentBuffer;
currentChunkIndex++;
currentBuffer = null;
}
}
}
if (currentChunkIndex !== data.chunks.length || currentBuffer) {
throw new Error("Incomplete data received");
}
} finally {
reader.releaseLock();
}
}
10 changes: 5 additions & 5 deletions workerpool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export interface DownloadChunkOptions {
data: Chunk[];
onProgress: (data: {
origin: Chunk;
data: ArrayBuffer;
data: Uint8Array;
index: number;
}) => void;
maxRetries: number;
Expand Down Expand Up @@ -54,7 +54,7 @@ export function downloadChunk({
stolen: false,
} as WorkerData);
printWorkerData(workerData, i);
const messageHandle = (result: ArrayBuffer) => {
const messageHandle = (result: Uint8Array) => {
onProgress({
index: workerData.currentChunk,
origin: data[workerData.currentChunk],
Expand Down Expand Up @@ -131,8 +131,8 @@ export function downloadChunk({
}

async function addEventListener(
stream: AsyncGenerator<ArrayBuffer, void, unknown>,
messageHandle: (e: ArrayBuffer) => void,
stream: AsyncGenerator<Uint8Array, void, unknown>,
messageHandle: (e: Uint8Array) => void,
errorHandle: (e: unknown) => void
) {
try {
Expand All @@ -144,7 +144,7 @@ async function addEventListener(
}

interface WorkerData {
worker: AsyncGenerator<ArrayBuffer, void, unknown>;
worker: AsyncGenerator<Uint8Array, void, unknown>;
abortController: AbortController;
startChunk: number;
endChunk: number;
Expand Down

0 comments on commit f7fa5f5

Please sign in to comment.