
feat: add decompress interceptor #3274

Open · adrianfalleiro wants to merge 7 commits into `main` from `feat/decompress-interceptor`
Conversation

adrianfalleiro

Added a new interceptor to decompress the response body. I lifted some of the implementation from how decompression is handled in the fetch client.

Changes

  • Adds a response interceptor to decompress gzip, brotli and deflate responses


@adrianfalleiro adrianfalleiro force-pushed the feat/decompress-interceptor branch from 938fa4b to df4804c Compare May 18, 2024 17:52
@Uzlopak (Contributor) commented May 18, 2024

Should we also handle rawdeflate?

@adrianfalleiro (Author)

> Should we also handle rawdeflate?

Yep, I'm using the existing createInflate() util from the fetch client, which handles both deflate and deflateRaw, so it should handle both already. I'll add a test.

```js
  this.#inputStream,
  ...restDecoders,
  this.#onDecompressStreamFinished.bind(this)
).on('data', (chunk) => this.#handler.onData(chunk))
```

Member commented:

Missing backpressure

@metcoder95 (Member) left a comment:

Documentation seems to be missing.

```js
  }
}

function createDecompressionInterceptor () {
```

Member commented:

We might need to support hinting the supported encodings, in case implementers want to limit this (I can only think of performance reasons). They could pass a set of supported encodings, and the decompress interceptor would advertise it via the accept-encoding header.
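The suggestion above could look roughly like this. Note that the option and function names (`decompress`, `encodings`) are hypothetical, not part of the PR or undici's API:

```javascript
// Hypothetical sketch of an encoding allow-list option: the caller restricts
// which encodings the interceptor handles, and that set is advertised
// upstream via the accept-encoding request header.
function decompress({ encodings = ['gzip', 'br', 'deflate'] } = {}) {
  const supported = new Set(encodings);
  return (dispatch) => (opts, handler) => {
    // Advertise only what we are willing to decode.
    const headers = { ...opts.headers, 'accept-encoding': [...supported].join(', ') };
    // A real implementation would also wrap `handler` so that only
    // encodings in `supported` get a decoder attached.
    return dispatch({ ...opts, headers }, handler);
  };
}

// Demo with a stub dispatch that records the options it receives:
const dispatched = [];
const dispatch = (opts) => (dispatched.push(opts), true);
decompress({ encodings: ['gzip'] })(dispatch)({ path: '/', headers: {} }, {});
console.log(dispatched[0].headers['accept-encoding']); // -> 'gzip'
```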

```js
outputStream.on('data', (chunk) => {
  if (!this.#handler.onData(chunk)) {
    this.#inputStream.pause()
    this.#inputStream.once('drain', () => this.#inputStream.resume())
```

Member commented:

This is wrong. Drain is signalled through the resume function passed to onHeaders.
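In the pre-v7 handler API the reviewer is referring to, pausing is signalled by returning `false` from `onData`, and the `resume` callback received in `onHeaders` is what restarts the flow; pausing a decoder stream directly bypasses that contract. A toy sketch of the shape (not undici internals):

```javascript
// Toy handler wrapper showing the backpressure contract of the old undici
// handler API: onData's boolean return value asks the dispatcher to pause,
// and the `resume` callback from onHeaders is the only real drain signal.
class PassthroughHandler {
  constructor(inner) {
    this.inner = inner;
    this.resume = null;
  }
  onHeaders(statusCode, rawHeaders, resume, statusMessage) {
    this.resume = resume; // keep it: this is how flow restarts later
    return this.inner.onHeaders(statusCode, rawHeaders, resume, statusMessage);
  }
  onData(chunk) {
    // Propagate the downstream verdict instead of pausing streams ourselves.
    return this.inner.onData(chunk);
  }
}

// Demo with a stub inner handler that asks for a pause:
let paused = false;
const inner = { onHeaders: () => true, onData: () => !paused };
const h = new PassthroughHandler(inner);
h.onHeaders(200, [], () => { paused = false; }, 'OK');
paused = true;
console.log(h.onData(Buffer.from('x'))); // -> false: dispatcher should pause
```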

```js
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
if (requestEncodings.length !== 0 && method !== 'HEAD' && method !== 'CONNECT' && !nullBodyStatus.includes(statusCode)) {
  const decoders = []
  for (let i = 0; i < requestEncodings.length; ++i) {
```

@tsctx (Member) commented Jun 18, 2024:

Suggested change:

```diff
- for (let i = 0; i < requestEncodings.length; ++i) {
+ for (let i = requestEncodings.length - 1; i >= 0; --i) {
```

Contributor:

The other way round, right?

Member:

Yeah, I made the same change in #3343.

Author:

Thanks, I'll come back to this PR this week.

```js
  }
}

return this.#handler.onHeaders(
```

Thanks for putting up this PR! I added this interceptor into my project.

This interceptor can also be used with fetch if you remove the content-encoding header. My use case: I have a LoggingInterceptor that sends the response body to GCP logging. This worked great except that one API sends a gzipped response. Now I have the following flow:

Decompress -> Logging -> Fetch

```ts
delete parsedHeaders['content-encoding'];
const newRawHeaders = Object.entries(parsedHeaders)
  .flat()
  .map((e) => Buffer.from(e));

return this.#handler.onHeaders(statusCode, newRawHeaders, resume, statusMessage);
```

Welp - I ended up having issues with the interceptor. I haven't been able to dive in yet, but I feel like my flow should work. Will try again once this is reviewed/merged.

@tjhiggins

Last piece for this to work for me with fetch: fetch does some form of backpressure handling.

```ts
const wrappedResume = () => {
  if (this.#inputStream) {
    this.#inputStream.resume();
  }
  return resume();
};

return this.#handler.onHeaders!(statusCode, newRawHeaders, wrappedResume, statusText);
```

Full `onHeaders`:

```ts
onHeaders(statusCode: number, rawHeaders: Buffer[], resume: () => void, statusText: string) {
  const parsedHeaders = util.parseHeaders(rawHeaders);
  const contentEncoding = parsedHeaders['content-encoding'] as string | undefined;
  const requestEncodings = contentEncoding ? contentEncoding.split(',').map((e) => e.trim().toLowerCase()) : [];

  const { method } = this.#opts;

  // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
  if (
    requestEncodings.length !== 0 &&
    method !== 'HEAD' &&
    method !== 'CONNECT' &&
    !nullBodyStatus.includes(statusCode)
  ) {
    const decoders: Transform[] = [];
    for (let i = requestEncodings.length - 1; i >= 0; --i) {
      const requestEncoding = requestEncodings[i];
      // https://www.rfc-editor.org/rfc/rfc9112.html#section-7.2
      if (requestEncoding === 'x-gzip' || requestEncoding === 'gzip') {
        decoders.push(
          zlib.createGunzip({
            // Be less strict when decoding compressed responses, since sometimes
            // servers send slightly invalid responses that are still accepted
            // by common browsers.
            // Always using Z_SYNC_FLUSH is what cURL does.
            flush: zlib.constants.Z_SYNC_FLUSH,
            finishFlush: zlib.constants.Z_SYNC_FLUSH,
          }),
        );
      } else if (requestEncoding === 'deflate') {
        throw new NotImplementedException('deflate is not supported');
      } else if (requestEncoding === 'br') {
        decoders.push(zlib.createBrotliDecompress());
      } else {
        decoders.length = 0;
        break;
      }
    }

    if (decoders.length !== 0) {
      const [firstDecoder, ...restDecoders] = decoders;
      this.#inputStream = firstDecoder;
      this.#inputStream.on('drain', () => {
        if (this.#inputStream) {
          this.#inputStream.resume();
        }
      });
      let outputStream = firstDecoder;

      if (restDecoders.length !== 0) {
        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
        // @ts-expect-error
        outputStream = pipeline(this.#inputStream, ...restDecoders, this.#onDecompressStreamFinished.bind(this));
      } else {
        finished(this.#inputStream, this.#onDecompressStreamFinished.bind(this));
      }

      outputStream.on('data', (chunk) => {
        if (!this.#handler.onData!(chunk)) {
          if (this.#inputStream) {
            this.#inputStream.pause();
          }
        }
      });
    }
  }

  delete parsedHeaders['content-encoding'];
  const newRawHeaders = Object.entries(parsedHeaders)
    .map(([key, value]) => {
      if (Array.isArray(value)) {
        return value.map((v) => [key, v]).flat();
      } else {
        return [key, value];
      }
    })
    .flat()
    .map((v) => Buffer.from(v));

  const wrappedResume = () => {
    if (this.#inputStream) {
      this.#inputStream.resume();
    }
    return resume();
  };

  return this.#handler.onHeaders!(statusCode, newRawHeaders, wrappedResume, statusText);
}
```

@adrianfalleiro (Author)

@tjhiggins Do you want to collaborate on this PR with me? I can give you commit access to the base branch.

I think the interceptor interface has changed a little since I opened this PR. I can work on ensuring it works with the latest version of Undici.

Once I've done that, do you want to add the changes above for onHeaders and backpressure? Sound good?

@tjhiggins

> @tjhiggins Do you want to collaborate on this PR with me? I can give you commit access to the base branch.
>
> I think the interceptor interface has changed a little since I opened this PR. I can work on ensuring it works with the latest version of Undici.
>
> Once I've done that, do you want to add the changes above for onHeaders and backpressure? Sound good?

I only just saw the new hooks for v7. Fetch does decompress, but only after the interceptors run in v6. Maybe with the new onResponseData hook I won't even need an interceptor. I'll wait for you to update before making any changes; I'm not in a huge hurry to spend more time on this now that I have it working for v6.
