
Decoding only part of a stream? #128

Open

mcclure opened this issue Sep 30, 2020 · 6 comments

@mcclure

mcclure commented Sep 30, 2020

Scenario: I have a small app where I store a binary blob. The binary blob has metadata associated with it, so, I think: I will store first some sort of header, and then the binary blob. I decide that the header should be a msgpack item. So I encode() my header, write() the result to a file, and then write() my binary blob. (There are reasons why I do not simply include the binary blob inside the msgpack item.)

When it comes time to read my file back in, I get a ReadableStream for the file, I call decodeAsync() on the ReadableStream, and… I get an error, Extra 512 of 529 byte(s) found at buffer[17]. Which, yes, that is expected, I put it there.

My only options for decoding msgpack seem to be decode/decodeAsync, which error if there is extra data at the end of the stream; and decodeStream, which understands there can be many consecutive data items but assumes they are all msgpack.
I can decode a single msgpack item at the start of a stream by doing for await (const idk of decodeStream(fileStream)) { and then breaking inside the loop, but if I do this I find fileStream is exhausted (0 remaining bytes), so I cannot resume reading from the stream after that first msgpack item. (And I don't have a way of knowing how many bytes long the first msgpack item was, so I can't even start over from the beginning and skip past it.)
Alternately, I can do the for await trick and attempt to read from the stream inside the loop after msgpack has decoded one item, but this won't work either, because a ReadableStream is only allowed to have one Reader at a time.

How should I proceed? It seems like "read one item from this stream, but allow me to still use the stream when you are done with it" is not a particularly outlandish use case, but it does not seem to be supported.

@gfx
Member

gfx commented Oct 4, 2020

I think MessagePack is designed to handle fixed-sized data structures and it's not easy to handle your case.

You can define your own protocol on top of MessagePack, like this:

[data_len][data_msgpack][stream_buffer]
uint32     msgpack       chunk of bytes

Thus, the data structure does not depend on a particular MessagePack library.
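A minimal sketch of that framing in plain JavaScript, treating the already-encoded header as an opaque Uint8Array (frame/unframe are illustrative names, not part of any library):

```javascript
// Frame: [uint32 big-endian length][msgpack bytes][raw blob]
function frame(msgpackBytes, blob) {
  const out = new Uint8Array(4 + msgpackBytes.length + blob.length);
  new DataView(out.buffer).setUint32(0, msgpackBytes.length, false);
  out.set(msgpackBytes, 4);
  out.set(blob, 4 + msgpackBytes.length);
  return out;
}

// Unframe: returns the msgpack bytes (ready for decode()) and the rest.
function unframe(buf) {
  const len = new DataView(buf.buffer, buf.byteOffset).getUint32(0, false);
  return {
    header: buf.subarray(4, 4 + len),
    blob: buf.subarray(4 + len),
  };
}
```

On the read side you hand `header` to any MessagePack decoder and treat `blob` however you like; no decoder ever sees the foreign bytes.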

@covert-encryption

This needs to be fixed. It looks like every implementation of msgpack makes it very difficult to handle such a simple and presumably common use case. I managed to work around this in Python without hacking the module, but I haven't yet figured out any sane way to get the current reading position in JavaScript (the only option I have found involves parsing the text of that exception for the object length).

Adding an extra field for msgpack object length is not a solution, nor is encoding all other data as msgpack raw binary objects. The MessagePack format itself knows very well where the object ends, and applications should have access to this information too.

@covert-encryption

Essentially, this would need public access to doDecodeSync and the decoder's pos, to allow:

  1. Decoding a single object without throwing an exception when there is data left
  2. Finding out the current position afterwards (possibly also increment it externally to skip foreign data and decode the next msgpack object)

I suppose the function could be called decodeSingle, as opposed to decode (which throws when there is extra data) and decodeMulti (which loops over it).
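As a rough illustration of what such a decodeSingle could return, here is a toy decoder for a small subset of the format (positive fixint, fixstr, fixarray, fixmap); it is not this library's code, just a sketch showing that the format itself always tells you where an object ends:

```javascript
// Hypothetical decodeSingle: decode one object starting at `pos`,
// return the value and the position of the first byte after it.
// Only a tiny subset of MessagePack is handled, for illustration.
function decodeSingle(buf, pos = 0) {
  const b = buf[pos];
  if (b <= 0x7f) return { value: b, pos: pos + 1 };  // positive fixint
  if (b >= 0xa0 && b <= 0xbf) {                      // fixstr
    const len = b & 0x1f;
    const value = new TextDecoder().decode(buf.subarray(pos + 1, pos + 1 + len));
    return { value, pos: pos + 1 + len };
  }
  if (b >= 0x90 && b <= 0x9f) {                      // fixarray
    let p = pos + 1;
    const value = [];
    for (let i = 0; i < (b & 0x0f); i++) {
      const r = decodeSingle(buf, p);
      value.push(r.value);
      p = r.pos;
    }
    return { value, pos: p };
  }
  if (b >= 0x80 && b <= 0x8f) {                      // fixmap
    let p = pos + 1;
    const value = {};
    for (let i = 0; i < (b & 0x0f); i++) {
      const k = decodeSingle(buf, p);
      const v = decodeSingle(buf, k.pos);
      value[k.value] = v.value;
      p = v.pos;
    }
    return { value, pos: p };
  }
  throw new Error(`unsupported type byte 0x${b.toString(16)}`);
}
```

With this shape, the caller decodes the header, reads pos, and treats buf.subarray(pos) (or the rest of the stream) as foreign data.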

@gfx
Member

gfx commented Dec 15, 2021

@covert-encryption

I'm not exactly sure about your use cases, but I'm willing to make some methods public (with some changes, if needed). Feel free to make pull-requests with tests that simulate your use cases.

@covert-encryption

@gfx The use case is the same as @mcclure's: an ArrayBuffer (or another stream) that contains

[msgpack object][other data][msgpack object]...

Think of the msgpack object as a header for the other data that follows. Unfortunately, the other data cannot easily be a msgpack bin field, mainly because it is often too large to fit in memory. The other data itself knows when it ends, so we know when to ask the msgpack parser to step in again.

I'll see if I can make a sensible PR without complicating the already extensive API too much.

@ansemjo

ansemjo commented Jul 4, 2023

If I may chime in with another use-case: I'm not using MessagePack for persistent storage but for streaming data to a browser over WebTransport. The data contains some metadata and then a relatively large binary blob. Now imagine "relatively large" means something like a hundred megabytes. I'd like to avoid having to decode the entire binary blob into memory before I can do anything else with it.

It would be great if you could decode the metadata, get the size of the binary from it, and then pass the underlying stream along to something that can accept the binary blob as a ReadableStream with the correct length.

While experimenting with Protobuf at first, I worked around this by using it-length-prefixed (prepend each Protobuf message with a VarInt length, so you can stream multiple messages on the wire at all) and it-reader (read an exact number of bytes from a stream).


  • Using it-reader on the stream before passing it to decodeMultiStream probably won't work as it just eagerly takes all the bytes it gets (as @mcclure also found out above).
  • Prepending each msgpack with a VarInt length and using it-length-prefixed will probably work but I'd like to avoid that because a) MessagePack already supports streaming as-is and b) it makes writing to the wire harder.
  • However, after looking at the code I also don't see how you could elegantly pass such a ReadableStream onwards within an iteration because you'll only get to read the remaining bytes in the buffer at that moment. The buffer doesn't get refilled without resuming the loop inside the async generator for the next for await (const buffer of stream).
    • Maybe it would be viable to split the decoder in two: instantiation (wrapping the passed stream in a nested ReadableStream generator that supports reading an exact number of bytes, like it-reader) and a method yielding single decoded msgpack objects (which would only read as much data as required from this internal stream).
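For what it's worth, the "read an exact number of bytes but keep the stream usable" building block can be sketched with nothing but the WHATWG streams API (readExact is an illustrative name, not an existing helper):

```javascript
// Read exactly n bytes from a ReadableStream reader, buffering any
// overshoot so the caller can keep consuming the stream afterwards.
async function readExact(reader, n, leftover = new Uint8Array(0)) {
  const chunks = [leftover];
  let have = leftover.length;
  while (have < n) {
    const { value, done } = await reader.read();
    if (done) throw new Error(`stream ended with ${have} of ${n} bytes`);
    chunks.push(value);
    have += value.length;
  }
  const all = new Uint8Array(have);
  let off = 0;
  for (const c of chunks) { all.set(c, off); off += c.length; }
  // `rest` is the overshoot: bytes read past n, handed back in as
  // `leftover` on the next call.
  return { bytes: all.subarray(0, n), rest: all.subarray(n) };
}
```

The metadata could then be decoded from `bytes`, while `rest` plus the same reader are used to stream the blob onwards without buffering it whole.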


4 participants