From eb05ef43dc0041297438600d4d7e210611ae727f Mon Sep 17 00:00:00 2001 From: Adrian Falleiro Date: Sat, 18 May 2024 16:49:24 +0000 Subject: [PATCH 1/7] feat: add decompress interceptor --- index.js | 3 +- lib/interceptor/decompress.js | 113 +++++++++++++ test/interceptors/decompress.js | 282 ++++++++++++++++++++++++++++++++ test/types/index.test-d.ts | 1 + types/index.d.ts | 1 + types/interceptors.d.ts | 1 + 6 files changed, 400 insertions(+), 1 deletion(-) create mode 100644 lib/interceptor/decompress.js create mode 100644 test/interceptors/decompress.js diff --git a/index.js b/index.js index 7a68d04abb3..3a63c085b48 100644 --- a/index.js +++ b/index.js @@ -41,7 +41,8 @@ module.exports.createRedirectInterceptor = createRedirectInterceptor module.exports.interceptors = { redirect: require('./lib/interceptor/redirect'), retry: require('./lib/interceptor/retry'), - dump: require('./lib/interceptor/dump') + dump: require('./lib/interceptor/dump'), + decompress: require('./lib/interceptor/decompress') } module.exports.buildConnector = buildConnector diff --git a/lib/interceptor/decompress.js b/lib/interceptor/decompress.js new file mode 100644 index 00000000000..58d9aea745b --- /dev/null +++ b/lib/interceptor/decompress.js @@ -0,0 +1,113 @@ +'use strict' + +const util = require('../core/util') +const DecoratorHandler = require('../handler/decorator-handler') +const zlib = require('node:zlib') +const { pipeline } = require('node:stream') + +const nullBodyStatus = [101, 204, 205, 304] + +class DecompressHandler extends DecoratorHandler { + #handler + #opts + + #inputStream = null + #trailers = null + + constructor (opts, handler) { + super(handler) + this.#handler = handler + this.#opts = opts + } + + onHeaders (statusCode, rawHeaders, resume, statusMessage) { + const parsedHeaders = util.parseHeaders(rawHeaders) + const contentEncoding = parsedHeaders['content-encoding'] + const encodings = contentEncoding ? contentEncoding.split(',').map(e => e.trim().toLowerCase()) : [] + + const { method } = this.#opts + + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding + if (encodings.length !== 0 && method !== 'HEAD' && method !== 'CONNECT' && !nullBodyStatus.includes(statusCode)) { + const decoders = [] + for (let i = 0; i < encodings.length; ++i) { + const encoding = encodings[i] + // https://www.rfc-editor.org/rfc/rfc9112.html#section-7.2 + if (encoding === 'x-gzip' || encoding === 'gzip') { + decoders.push(zlib.createGunzip({ + // Be less strict when decoding compressed responses, since sometimes + // servers send slightly invalid responses that are still accepted + // by common browsers. + // Always using Z_SYNC_FLUSH is what cURL does. + flush: zlib.constants.Z_SYNC_FLUSH, + finishFlush: zlib.constants.Z_SYNC_FLUSH + })) + } else if (encoding === 'deflate') { + decoders.push(zlib.createInflate()) + } else if (encoding === 'br') { + decoders.push(zlib.createBrotliDecompress()) + } else { + decoders.length = 0 + break + } + } + + if (decoders.length !== 0) { + const [first, ...rest] = decoders + this.#inputStream = first + + if (rest.length !== 0) { + pipeline( + this.#inputStream, + ...rest, + err => { + if (err) { + this.#handler.onError(err) + } else { + this.#handler.onComplete(this.#trailers) + } + } + ).on('data', (chunk) => this.#handler.onData(chunk)) + } else { + this.#inputStream.on('data', (chunk) => this.#handler.onData(chunk)) + this.#inputStream.on('end', () => this.#handler.onComplete(this.#trailers)) + this.#inputStream.on('error', (err) => this.#handler.onError(err)) + } + } + } + + return this.#handler.onHeaders( + statusCode, + rawHeaders, + resume, + statusMessage + ) + } + + onData (chunk) { + if (this.#inputStream) { + return this.#inputStream.write(chunk) + } + return this.#handler.onData(chunk) + } + + onComplete (trailers) { + if (this.#inputStream) { + this.#trailers = trailers + this.#inputStream.end() + return + } + + return this.#handler.onComplete(trailers) + } +} + +function createDecompressionInterceptor () { + return dispatch => { + return function Decompress (opts, handler) { + return dispatch(opts, new DecompressHandler(opts, handler)) + } + } +} + +module.exports = createDecompressionInterceptor diff --git a/test/interceptors/decompress.js b/test/interceptors/decompress.js new file mode 100644 index 00000000000..7d003c2da95 --- /dev/null +++ b/test/interceptors/decompress.js @@ -0,0 +1,282 @@ +'use strict' +const { test, after } = require('node:test') +const { createServer } = require('node:http') +const { once } = require('node:events') +const { tspl } = require('@matteo.collina/tspl') +const { createBrotliCompress, createGzip, createDeflate } = require('node:zlib') + +const { Client, interceptors } = require('../..') +const { decompress } = interceptors + +test('decompresses gzip encoding', async (t) => { + t = tspl(t, { plan: 1 }) + const contentEncodings = 'gzip' + const text = 'Hello, World!' + + const server = createServer((req, res) => { + const gzip = createGzip() + + res.setHeader('Content-Encoding', contentEncodings) + res.setHeader('Content-Type', 'text/plain') + + gzip.pipe(res) + gzip.write(text) + gzip.end() + }).listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(decompress()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request({ + method: 'GET', + path: '/' + }) + + t.equal(await response.body.text(), text) + + await t.completed +}) + +test('decompresses deflate encoding', async (t) => { + t = tspl(t, { plan: 1 }) + const contentEncodings = 'deflate' + const text = 'Hello, World!' + + const server = createServer((req, res) => { + const deflate = createDeflate() + + res.setHeader('Content-Encoding', contentEncodings) + res.setHeader('Content-Type', 'text/plain') + + deflate.pipe(res) + deflate.write(text) + deflate.end() + }).listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(decompress()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request({ + method: 'GET', + path: '/' + }) + + t.equal(await response.body.text(), text) + + await t.completed +}) + +test('decompresses brotli encoding', async (t) => { + t = tspl(t, { plan: 1 }) + const contentEncodings = 'br' + const text = 'Hello, World!' + + const server = createServer((req, res) => { + const brotli = createBrotliCompress() + + res.setHeader('Content-Encoding', contentEncodings) + res.setHeader('Content-Type', 'text/plain') + + brotli.pipe(res) + brotli.write(text) + brotli.end() + }).listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(decompress()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request({ + method: 'GET', + path: '/' + }) + t.equal(await response.body.text(), text) + + await t.completed +}) + +test('decompresses multiple encodings', async (t) => { + t = tspl(t, { plan: 1 }) + const contentEncodings = 'gzip, br' + const text = 'Hello, World!' + + const server = createServer((req, res) => { + const gzip = createGzip() + const brotli = createBrotliCompress() + + res.setHeader('Content-Encoding', contentEncodings) + res.setHeader('Content-Type', 'text/plain') + + brotli.pipe(gzip).pipe(res) + + brotli.write(text) + brotli.end() + }).listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(decompress()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request({ + method: 'GET', + path: '/' + }) + + t.equal(await response.body.text(), text) + + await t.completed +}) + +test('content-encoding header is case-iNsENsITIve', async (t) => { + t = tspl(t, { plan: 1 }) + const contentCodings = 'GZiP, Br' + const text = 'Hello, World!' + + const server = createServer((req, res) => { + const gzip = createGzip() + const brotli = createBrotliCompress() + + res.setHeader('Content-Encoding', contentCodings) + res.setHeader('Content-Type', 'text/plain') + + brotli.pipe(gzip).pipe(res) + brotli.write(text) + brotli.end() + }).listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(decompress()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request({ + method: 'GET', + path: '/' + }) + + t.equal(await response.body.text(), text) + + await t.completed +}) + +test('does not throw when an unsupported content encoding is encountered', async (t) => { + t = tspl(t, { plan: 1 }) + const contentCodings = 'UNSUPPORTED' + const text = 'Hello, World!' + + const server = createServer((req, res) => { + res.setHeader('Content-Encoding', contentCodings) + res.setHeader('Content-Type', 'text/plain') + res.write(text) + res.end() + }).listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(decompress()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request({ + method: 'GET', + path: '/' + } + ) + t.equal(await response.body.text(), text) + + await t.completed +}) + +test('response decompression according to content-encoding should be handled in a correct order', async (t) => { + t = tspl(t, { plan: 1 }) + const contentCodings = 'deflate, gzip' + const text = 'Hello, World!' + + const server = createServer((req, res) => { + const gzip = createGzip() + const deflate = createDeflate() + + res.setHeader('Content-Encoding', contentCodings) + res.setHeader('Content-Type', 'text/plain') + + gzip.pipe(deflate).pipe(res) + + gzip.write(text) + gzip.end() + }).listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(decompress()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request({ + method: 'GET', + path: '/' + }) + + t.equal(await response.body.text(), text) + + await t.completed +}) diff --git a/test/types/index.test-d.ts b/test/types/index.test-d.ts index 39e413c02c8..9401352eac6 100644 --- a/test/types/index.test-d.ts +++ b/test/types/index.test-d.ts @@ -16,6 +16,7 @@ expectAssignable(Undici.FileReader) expectAssignable(Undici.interceptors.dump()) expectAssignable(Undici.interceptors.redirect()) expectAssignable(Undici.interceptors.retry()) +expectAssignable(Undici.interceptors.decompress()) const client = new Undici.Client('', {}) const handler: Dispatcher.DispatchHandlers = {} diff --git a/types/index.d.ts b/types/index.d.ts index 9e5eaeb3d54..89598f00443 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -70,5 +70,6 @@ declare namespace Undici { dump: typeof import('./interceptors').dump; retry: typeof import('./interceptors').retry; redirect: typeof import('./interceptors').redirect; + decompress: typeof import('./interceptors').decompress; } } diff --git a/types/interceptors.d.ts b/types/interceptors.d.ts index d546ac344e3..3b95a99ee42 100644 --- a/types/interceptors.d.ts +++ b/types/interceptors.d.ts @@ -9,3 +9,4 @@ export declare function createRedirectInterceptor (opts: RedirectInterceptorOpts export declare function dump(opts?: DumpInterceptorOpts): Dispatcher.DispatcherComposeInterceptor export declare function retry(opts?: RetryInterceptorOpts): Dispatcher.DispatcherComposeInterceptor export declare function redirect(opts?: RedirectInterceptorOpts): Dispatcher.DispatcherComposeInterceptor +export declare function decompress(): Dispatcher.DispatcherComposeInterceptor From df4804c0c7de417328e2d4c8dc678c25f2a3ac73 Mon Sep 17 00:00:00 2001 From: Adrian Falleiro Date: Sat, 18 May 2024 17:05:46 +0000 Subject: [PATCH 2/7] fix: use existing `createInflate` util --- lib/interceptor/decompress.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/interceptor/decompress.js b/lib/interceptor/decompress.js index 58d9aea745b..310dff8ef1e 100644 --- a/lib/interceptor/decompress.js +++ b/lib/interceptor/decompress.js @@ -4,6 +4,7 @@ const util = require('../core/util') const DecoratorHandler = require('../handler/decorator-handler') const zlib = require('node:zlib') const { pipeline } = require('node:stream') +const { createInflate } = require('../web/fetch/util') const nullBodyStatus = [101, 204, 205, 304] @@ -43,7 +44,7 @@ class DecompressHandler extends DecoratorHandler { finishFlush: zlib.constants.Z_SYNC_FLUSH })) } else if (encoding === 'deflate') { - decoders.push(zlib.createInflate()) + decoders.push(createInflate()) } else if (encoding === 'br') { decoders.push(zlib.createBrotliDecompress()) } else { From 61a721cc1774d874a0f9c21332c4fdeaf595d33a Mon Sep 17 00:00:00 2001 From: Adrian Falleiro Date: Sat, 18 May 2024 16:26:24 -0400 Subject: [PATCH 3/7] fix: add test for raw deflate --- test/interceptors/decompress.js | 41 ++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/test/interceptors/decompress.js b/test/interceptors/decompress.js index 7d003c2da95..e7ef16a26e9 100644 --- a/test/interceptors/decompress.js +++ b/test/interceptors/decompress.js @@ -3,7 +3,7 @@ const { test, after } = require('node:test') const { createServer } = require('node:http') const { once } = require('node:events') const { tspl } = require('@matteo.collina/tspl') -const { createBrotliCompress, createGzip, createDeflate } = require('node:zlib') +const { createBrotliCompress, createGzip, createDeflate, createDeflateRaw } = require('node:zlib') const { Client, interceptors } = require('../..') const { decompress } = interceptors @@ -86,6 +86,45 @@ test('decompresses deflate encoding', async (t) => { await t.completed }) +test('decompresses raw deflate encoding', async (t) => { + t = tspl(t, { plan: 1 }) + const contentEncodings = 'deflate' + const text = 'Hello, World!' + + const server = createServer((req, res) => { + const deflate = createDeflateRaw() + + res.setHeader('Content-Encoding', contentEncodings) + res.setHeader('Content-Type', 'text/plain') + + deflate.pipe(res) + deflate.write(text) + deflate.end() + }).listen(0) + + await once(server, 'listening') + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(decompress()) + + after(async () => { + await client.close() + + server.close() + await once(server, 'close') + }) + + const response = await client.request({ + method: 'GET', + path: '/' + }) + + t.equal(await response.body.text(), text) + + await t.completed +}) + test('decompresses brotli encoding', async (t) => { t = tspl(t, { plan: 1 }) const contentEncodings = 'br' From cd172cbe1ce22381f71410ebeb2699fee4488857 Mon Sep 17 00:00:00 2001 From: Adrian Falleiro Date: Sat, 18 May 2024 17:19:55 -0400 Subject: [PATCH 4/7] Update lib/interceptor/decompress.js Co-authored-by: Robert Nagy --- lib/interceptor/decompress.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/interceptor/decompress.js b/lib/interceptor/decompress.js index 310dff8ef1e..0015e4a5b00 100644 --- a/lib/interceptor/decompress.js +++ b/lib/interceptor/decompress.js @@ -20,7 +20,11 @@ class DecompressHandler extends DecoratorHandler { this.#handler = handler this.#opts = opts } - +onConnect(abort) { + this.#inputStream = null + this.#trailers = null + return this.#handler.onConnect(abort) +} onHeaders (statusCode, rawHeaders, resume, statusMessage) { const parsedHeaders = util.parseHeaders(rawHeaders) const contentEncoding = parsedHeaders['content-encoding'] From deee27f71c5758767dfa16de2cce4f8672f1e143 Mon Sep 17 00:00:00 2001 From: Adrian Falleiro Date: Sat, 18 May 2024 17:19:30 -0400 Subject: [PATCH 5/7] use `closeClientAndServerAsPromise` --- test/interceptors/decompress.js | 57 ++++++--------------------------- 1 file changed, 9 insertions(+), 48 deletions(-) diff --git a/test/interceptors/decompress.js b/test/interceptors/decompress.js index e7ef16a26e9..0f1ea9f888b 100644 --- a/test/interceptors/decompress.js +++ b/test/interceptors/decompress.js @@ -4,6 +4,7 @@ const { createServer } = require('node:http') const { once } = require('node:events') const { tspl } = require('@matteo.collina/tspl') const { createBrotliCompress, createGzip, createDeflate, createDeflateRaw } = require('node:zlib') +const { closeClientAndServerAsPromise } = require('../utils/node-http') const { Client, interceptors } = require('../..') const { decompress } = interceptors @@ -30,12 +31,7 @@ test('decompresses gzip encoding', async (t) => { `http://localhost:${server.address().port}` ).compose(decompress()) - after(async () => { - await client.close() - - server.close() - await once(server, 'close') - }) + after(closeClientAndServerAsPromise(client, server)) const response = await client.request({ method: 'GET', @@ -69,12 +65,7 @@ test('decompresses deflate encoding', async (t) => { `http://localhost:${server.address().port}` ).compose(decompress()) - after(async () => { - await client.close() - - server.close() - await once(server, 'close') - }) + after(closeClientAndServerAsPromise(server, client)) const response = await client.request({ method: 'GET', @@ -108,12 +99,7 @@ test('decompresses raw deflate encoding', async (t) => { `http://localhost:${server.address().port}` ).compose(decompress()) - after(async () => { - await client.close() - - server.close() - await once(server, 'close') - }) + after(closeClientAndServerAsPromise(server, client)) const response = await client.request({ method: 'GET', @@ -147,12 +133,7 @@ test('decompresses brotli encoding', async (t) => { `http://localhost:${server.address().port}` ).compose(decompress()) - after(async () => { - await client.close() - - server.close() - await once(server, 'close') - }) + after(closeClientAndServerAsPromise(server, client)) const response = await client.request({ method: 'GET', @@ -187,12 +168,7 @@ test('decompresses multiple encodings', async (t) => { `http://localhost:${server.address().port}` ).compose(decompress()) - after(async () => { - await client.close() - - server.close() - await once(server, 'close') - }) + after(closeClientAndServerAsPromise(server, client)) const response = await client.request({ method: 'GET', @@ -227,12 +203,7 @@ test('content-encoding header is case-iNsENsITIve', async (t) => { `http://localhost:${server.address().port}` ).compose(decompress()) - after(async () => { - await client.close() - - server.close() - await once(server, 'close') - }) + after(closeClientAndServerAsPromise(server, client)) const response = await client.request({ method: 'GET', @@ -262,12 +233,7 @@ test('does not throw when an unsupported content encoding is encountered', async `http://localhost:${server.address().port}` ).compose(decompress()) - after(async () => { - await client.close() - - server.close() - await once(server, 'close') - }) + after(closeClientAndServerAsPromise(server, client)) const response = await client.request({ method: 'GET', @@ -303,12 +269,7 @@ test('response decompression according to content-encoding should be handled in `http://localhost:${server.address().port}` ).compose(decompress()) - after(async () => { - await client.close() - - server.close() - await once(server, 'close') - }) + after(closeClientAndServerAsPromise(server, client)) const response = await client.request({ method: 'GET', From f1068be5d0101e8bc233f4ec6319578d990c7403 Mon Sep 17 00:00:00 2001 From: Adrian Falleiro Date: Sat, 18 May 2024 17:36:53 -0400 Subject: [PATCH 6/7] refactor: prevent `onError` call after `onComplete` for single decoder --- lib/interceptor/decompress.js | 45 +++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/lib/interceptor/decompress.js b/lib/interceptor/decompress.js index 0015e4a5b00..a13ae7defe2 100644 --- a/lib/interceptor/decompress.js +++ b/lib/interceptor/decompress.js @@ -3,7 +3,7 @@ const util = require('../core/util') const DecoratorHandler = require('../handler/decorator-handler') const zlib = require('node:zlib') -const { pipeline } = require('node:stream') +const { pipeline, finished } = require('node:stream') const { createInflate } = require('../web/fetch/util') const nullBodyStatus = [101, 204, 205, 304] @@ -20,11 +20,21 @@ class DecompressHandler extends DecoratorHandler { this.#handler = handler this.#opts = opts } -onConnect(abort) { - this.#inputStream = null - this.#trailers = null - return this.#handler.onConnect(abort) -} + + onConnect (abort) { + this.#inputStream = null + this.#trailers = null + return this.#handler.onConnect(abort) + } + + #onDecompressStreamFinished (err) { + if (err) { + this.#handler.onError(err) + } else { + this.#handler.onComplete(this.#trailers) + } + } + onHeaders (statusCode, rawHeaders, resume, statusMessage) { const parsedHeaders = util.parseHeaders(rawHeaders) const contentEncoding = parsedHeaders['content-encoding'] @@ -58,25 +68,20 @@ onConnect(abort) { } if (decoders.length !== 0) { - const [first, ...rest] = decoders - this.#inputStream = first + const [firstDecoder, ...restDecoders] = decoders + this.#inputStream = firstDecoder - if (rest.length !== 0) { + if (restDecoders.length !== 0) { pipeline( this.#inputStream, - ...rest, - err => { - if (err) { - this.#handler.onError(err) - } else { - this.#handler.onComplete(this.#trailers) - } - } + ...restDecoders, + this.#onDecompressStreamFinished.bind(this) ).on('data', (chunk) => this.#handler.onData(chunk)) } else { - this.#inputStream.on('data', (chunk) => this.#handler.onData(chunk)) - this.#inputStream.on('end', () => this.#handler.onComplete(this.#trailers)) - this.#inputStream.on('error', (err) => this.#handler.onError(err)) + finished( + this.#inputStream.on('data', (chunk) => this.#handler.onData(chunk)), + this.#onDecompressStreamFinished.bind(this) + ) } } } From 13ce6ac879de971eb432ed3bd5d988cafe26ac28 Mon Sep 17 00:00:00 2001 From: Adrian Falleiro Date: Mon, 20 May 2024 00:44:06 -0400 Subject: [PATCH 7/7] handle backpressure --- lib/interceptor/decompress.js | 28 +++++++++++------ test/interceptors/decompress.js | 55 ++++++++++++++++++++++++++++++++- 2 files changed, 72 insertions(+), 11 deletions(-) diff --git a/lib/interceptor/decompress.js b/lib/interceptor/decompress.js index a13ae7defe2..c17901e864b 100644 --- a/lib/interceptor/decompress.js +++ b/lib/interceptor/decompress.js @@ -38,17 +38,17 @@ class DecompressHandler extends DecoratorHandler { onHeaders (statusCode, rawHeaders, resume, statusMessage) { const parsedHeaders = util.parseHeaders(rawHeaders) const contentEncoding = parsedHeaders['content-encoding'] - const encodings = contentEncoding ? contentEncoding.split(',').map(e => e.trim().toLowerCase()) : [] + const requestEncodings = contentEncoding ? contentEncoding.split(',').map(e => e.trim().toLowerCase()) : [] const { method } = this.#opts // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding - if (encodings.length !== 0 && method !== 'HEAD' && method !== 'CONNECT' && !nullBodyStatus.includes(statusCode)) { + if (requestEncodings.length !== 0 && method !== 'HEAD' && method !== 'CONNECT' && !nullBodyStatus.includes(statusCode)) { const decoders = [] - for (let i = 0; i < encodings.length; ++i) { - const encoding = encodings[i] + for (let i = 0; i < requestEncodings.length; ++i) { + const requestEncoding = requestEncodings[i] // https://www.rfc-editor.org/rfc/rfc9112.html#section-7.2 - if (encoding === 'x-gzip' || encoding === 'gzip') { + if ((requestEncoding === 'x-gzip' || requestEncoding === 'gzip')) { decoders.push(zlib.createGunzip({ // Be less strict when decoding compressed responses, since sometimes // servers send slightly invalid responses that are still accepted @@ -57,9 +57,9 @@ class DecompressHandler extends DecoratorHandler { flush: zlib.constants.Z_SYNC_FLUSH, finishFlush: zlib.constants.Z_SYNC_FLUSH })) - } else if (encoding === 'deflate') { + } else if (requestEncoding === 'deflate') { decoders.push(createInflate()) - } else if (encoding === 'br') { + } else if (requestEncoding === 'br') { decoders.push(zlib.createBrotliDecompress()) } else { decoders.length = 0 @@ -70,19 +70,27 @@ class DecompressHandler extends DecoratorHandler { if (decoders.length !== 0) { const [firstDecoder, ...restDecoders] = decoders this.#inputStream = firstDecoder + let outputStream = firstDecoder if (restDecoders.length !== 0) { - pipeline( + outputStream = pipeline( this.#inputStream, ...restDecoders, this.#onDecompressStreamFinished.bind(this) - ).on('data', (chunk) => this.#handler.onData(chunk)) + ) } else { finished( - this.#inputStream.on('data', (chunk) => this.#handler.onData(chunk)), + this.#inputStream, this.#onDecompressStreamFinished.bind(this) ) } + + outputStream.on('data', (chunk) => { + if (!this.#handler.onData(chunk)) { + this.#inputStream.pause() + this.#inputStream.once('drain', () => this.#inputStream.resume()) + } + }) } } diff --git a/test/interceptors/decompress.js b/test/interceptors/decompress.js index 0f1ea9f888b..700927a38cc 100644 --- a/test/interceptors/decompress.js +++ b/test/interceptors/decompress.js @@ -7,6 +7,7 @@ const { createBrotliCompress, createGzip, createDeflate, createDeflateRaw } = re const { closeClientAndServerAsPromise } = require('../utils/node-http') const { Client, interceptors } = require('../..') +const { PassThrough } = require('node:stream') const { decompress } = interceptors test('decompresses gzip encoding', async (t) => { @@ -215,7 +216,7 @@ test('content-encoding header is case-iNsENsITIve', async (t) => { await t.completed }) -test('does not throw when an unsupported content encoding is encountered', async (t) => { +test('does not throw when an unknown content encoding is received in the response', async (t) => { t = tspl(t, { plan: 1 }) const contentCodings = 'UNSUPPORTED' const text = 'Hello, World!' @@ -280,3 +281,55 @@ test('response decompression according to content-encoding should be handled in await t.completed }) + +test('handles backpressure', async (t) => { + t = tspl(t, { plan: 1 }) + const contentCodings = 'deflate, gzip' + + const text = Buffer.alloc(1e6).toString() + + const server = createServer((req, res) => { + const gzip = createGzip() + const deflate = createDeflate() + + res.setHeader('Content-Encoding', contentCodings) + res.setHeader('Content-Type', 'text/plain') + + gzip.pipe(deflate).pipe(res) + + gzip.write(text) + gzip.end() + }).listen(0) + + await once(server, 'listening') + + const dst = new PassThrough() + + dst.on('data', () => { + dst.pause() + setImmediate(() => dst.resume()) + }).on('end', () => { + t.ok(true, 'pass') + }) + + const client = new Client( + `http://localhost:${server.address().port}` + ).compose(decompress()) + + after(closeClientAndServerAsPromise(server, client)) + + await client.dispatch({ + method: 'GET', + path: '/' + }, + { + onError: (err) => { throw err }, + onConnect: () => {}, + onBodySent: () => {}, + onHeaders: () => {}, + onComplete: () => dst.end(), + onData: (chunk) => dst.write(chunk) + }) + + await t.completed +})