Commit 1fe27fe

apacheGH-45263: [MATLAB] Add ability to construct RecordBatchStreamReader from uint8 array (apache#45274)
### Rationale for this change

To enable more workflows using the IPC Stream format in the MATLAB interface, this pull request adds the ability to construct a `RecordBatchStreamReader` from a MATLAB `uint8` array.

This is helpful, for example, to enable Arrow-over-HTTP workflows in conjunction with the [MATLAB `webread` function](https://www.mathworks.com/help/matlab/ref/webread.html) (which can return a `uint8` array from an HTTP request).

This is a follow-up issue to apache#44923.

### What changes are included in this PR?

1. Added a new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromBytes(bytes)`.
2. Added a new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromFile(filename)`.
3. Changed the signature of the `arrow.io.ipc.RecordBatchStreamReader` constructor to no longer directly accept a `filename` as an input. Instead, an `arrow.io.ipc.RecordBatchStreamReader` can now only be directly constructed from a `libmexclass.proxy.Proxy` instance. This mirrors the design of other MATLAB classes which wrap `Proxy` instances in the MATLAB interface. To construct `RecordBatchStreamReader` objects from an Arrow IPC Stream file on disk, users can instead use the new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromFile(filename)`.

### Are these changes tested?

Yes.

1. Updated tests in `arrow/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m` to be parameterized over the `fromFile` and `fromBytes` "construction functions".
2. Added a new test to verify that an appropriate error is thrown if the `RecordBatchStreamReader` constructor is called directly with an input that is not a `libmexclass.proxy.Proxy` instance.

### Are there any user-facing changes?

Yes.

1. Users can now create `arrow.io.ipc.RecordBatchStreamReader` objects from an Arrow IPC Stream file on disk using the new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromFile(filename)`.
2. Users can now create `arrow.io.ipc.RecordBatchStreamReader` objects from an in-memory MATLAB `uint8` "bytes" array using the new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromBytes(bytes)`.

**This PR includes breaking changes to public APIs.**

This PR changes the signature of the public `arrow.io.ipc.RecordBatchStreamReader` constructor to no longer directly accept a `filename` as an input. Instead, an `arrow.io.ipc.RecordBatchStreamReader` can now only be directly constructed from a `libmexclass.proxy.Proxy` instance. This mirrors the design of other MATLAB classes which wrap `Proxy` instances in the MATLAB interface. To construct `RecordBatchStreamReader` objects from an Arrow IPC Stream file on disk, users can instead use the new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromFile(filename)`.

### Future Directions

1. Use the new `static` "construction function" `arrow.io.ipc.RecordBatchStreamReader.fromBytes(bytes)` in an example to demonstrate how to read an Arrow IPC Stream from an HTTP endpoint as part of [apache/arrow-experiments](https://github.com/apache/arrow-experiments/tree/main/http/get_simple).

### Notes

1. Thank you @sgilmore10 for your help with this pull request!

* GitHub Issue: apache#45263

Authored-by: Kevin Gurney <[email protected]>
Signed-off-by: Kevin Gurney <[email protected]>
1 parent c9f417f commit 1fe27fe

5 files changed: +144 -42 lines changed

matlab/src/cpp/arrow/matlab/error/error.h

+2
@@ -247,6 +247,8 @@ static const char* IPC_RECORD_BATCH_WRITE_FAILED =
 static const char* IPC_RECORD_BATCH_WRITE_CLOSE_FAILED = "arrow:io:ipc:CloseFailed";
 static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED =
     "arrow:io:ipc:FailedToOpenRecordBatchReader";
+static const char* IPC_RECORD_BATCH_READER_INVALID_CONSTRUCTION_TYPE =
+    "arrow:io:ipc:InvalidConstructionType";
 static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX = "arrow:io:ipc:InvalidIndex";
 static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed";
 static const char* IPC_TABLE_READ_FAILED = "arrow:io:ipc:TableReadFailed";

matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc

+40 -2
@@ -17,6 +17,8 @@
 
 #include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h"
 #include "arrow/io/file.h"
+#include "arrow/io/memory.h"
+#include "arrow/matlab/buffer/matlab_buffer.h"
 #include "arrow/matlab/error/error.h"
 #include "arrow/matlab/tabular/proxy/record_batch.h"
 #include "arrow/matlab/tabular/proxy/schema.h"
@@ -36,14 +38,13 @@ RecordBatchStreamReader::RecordBatchStreamReader(
   REGISTER_METHOD(RecordBatchStreamReader, readTable);
 }
 
-libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
+libmexclass::proxy::MakeResult RecordBatchStreamReader::fromFile(
     const libmexclass::proxy::FunctionArguments& constructor_arguments) {
   namespace mda = ::matlab::data;
   using RecordBatchStreamReaderProxy =
       arrow::matlab::io::ipc::proxy::RecordBatchStreamReader;
 
   const mda::StructArray opts = constructor_arguments[0];
-
   const mda::StringArray filename_mda = opts[0]["Filename"];
   const auto filename_utf16 = std::u16string(filename_mda[0]);
   MATLAB_ASSIGN_OR_ERROR(const auto filename_utf8,
@@ -60,6 +61,43 @@ libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
   return std::make_shared<RecordBatchStreamReaderProxy>(std::move(reader));
 }
 
+libmexclass::proxy::MakeResult RecordBatchStreamReader::fromBytes(
+    const libmexclass::proxy::FunctionArguments& constructor_arguments) {
+  namespace mda = ::matlab::data;
+  using RecordBatchStreamReaderProxy =
+      arrow::matlab::io::ipc::proxy::RecordBatchStreamReader;
+
+  const mda::StructArray opts = constructor_arguments[0];
+  const ::matlab::data::TypedArray<uint8_t> bytes_mda = opts[0]["Bytes"];
+  const auto matlab_buffer =
+      std::make_shared<arrow::matlab::buffer::MatlabBuffer>(bytes_mda);
+  auto buffer_reader = std::make_shared<arrow::io::BufferReader>(matlab_buffer);
+  MATLAB_ASSIGN_OR_ERROR(auto reader,
+                         arrow::ipc::RecordBatchStreamReader::Open(buffer_reader),
+                         error::IPC_RECORD_BATCH_READER_OPEN_FAILED);
+  return std::make_shared<RecordBatchStreamReaderProxy>(std::move(reader));
+}
+
+libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
+    const libmexclass::proxy::FunctionArguments& constructor_arguments) {
+  namespace mda = ::matlab::data;
+  const mda::StructArray opts = constructor_arguments[0];
+
+  // Dispatch to the appropriate static "make" method depending
+  // on the input type.
+  const mda::StringArray type_mda = opts[0]["Type"];
+  const auto type_utf16 = std::u16string(type_mda[0]);
+  if (type_utf16 == u"Bytes") {
+    return RecordBatchStreamReader::fromBytes(constructor_arguments);
+  } else if (type_utf16 == u"File") {
+    return RecordBatchStreamReader::fromFile(constructor_arguments);
+  } else {
+    return libmexclass::error::Error{
+        "arrow:io:ipc:InvalidConstructionType",
+        "Invalid construction type for RecordBatchStreamReader."};
+  }
+}
+
 void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& context) {
   namespace mda = ::matlab::data;
   using SchemaProxy = arrow::matlab::tabular::proxy::Schema;
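
The type-tagged dispatch performed by `make` in this file can be illustrated without Arrow or libmexclass. The following is a minimal, standard-library-only sketch, not the proxy code itself: `ConstructorArgs`, `Reader`, `Error`, `MakeResult`, and `kInvalidConstructionType` are hypothetical stand-ins introduced for illustration, and the helper bodies only record which path was taken instead of opening a real IPC stream.

```cpp
#include <cstdint>
#include <string>
#include <variant>
#include <vector>

// Hypothetical stand-in for the options struct that MATLAB passes to the proxy.
struct ConstructorArgs {
  std::string type;                 // "File" or "Bytes"
  std::string filename;             // used when type == "File"
  std::vector<std::uint8_t> bytes;  // used when type == "Bytes"
};

// Hypothetical stand-in for a successfully constructed reader proxy.
struct Reader {
  std::string source;  // records which construction path produced the reader
};

// Hypothetical stand-in for an error carrying an identifier and a message.
struct Error {
  std::string id;
  std::string message;
};

using MakeResult = std::variant<Reader, Error>;

// Same identifier string as the one added to error.h in this commit.
constexpr const char* kInvalidConstructionType =
    "arrow:io:ipc:InvalidConstructionType";

// Placeholder: a real implementation would open the file as an IPC stream.
MakeResult FromFile(const ConstructorArgs& args) {
  return Reader{"file:" + args.filename};
}

// Placeholder: a real implementation would wrap the bytes in a buffer reader.
MakeResult FromBytes(const ConstructorArgs& args) {
  return Reader{"bytes:" + std::to_string(args.bytes.size())};
}

// Dispatch on the "Type" tag, returning an error for unknown tags.
MakeResult Make(const ConstructorArgs& args) {
  if (args.type == "Bytes") {
    return FromBytes(args);
  }
  if (args.type == "File") {
    return FromFile(args);
  }
  return Error{kInvalidConstructionType,
               "Invalid construction type for RecordBatchStreamReader."};
}
```

In this sketch a caller would inspect the result with `std::holds_alternative<Error>(result)` before using the reader. Whether the real `libmexclass::proxy::MakeResult` exposes the same interface is an assumption and should be checked against the libmexclass sources.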

matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h

+4
@@ -30,6 +30,10 @@ class RecordBatchStreamReader : public libmexclass::proxy::Proxy {
 
   static libmexclass::proxy::MakeResult make(
       const libmexclass::proxy::FunctionArguments& constructor_arguments);
+  static libmexclass::proxy::MakeResult fromFile(
+      const libmexclass::proxy::FunctionArguments& constructor_arguments);
+  static libmexclass::proxy::MakeResult fromBytes(
+      const libmexclass::proxy::FunctionArguments& constructor_arguments);
 
  protected:
   std::shared_ptr<arrow::ipc::RecordBatchStreamReader> reader;

matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m

+24 -4
@@ -26,14 +26,34 @@
         Schema
     end
 
-    methods
-        function obj = RecordBatchStreamReader(filename)
+    methods (Static)
+        function obj = fromBytes(bytes)
+            arguments
+                bytes(:, 1) uint8
+            end
+            args = struct(Bytes=bytes, Type="Bytes");
+            proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader";
+            proxy = arrow.internal.proxy.create(proxyName, args);
+            obj = arrow.io.ipc.RecordBatchStreamReader(proxy);
+        end
+
+        function obj = fromFile(filename)
             arguments
                 filename(1, 1) string {mustBeNonzeroLengthText}
             end
-            args = struct(Filename=filename);
+            args = struct(Filename=filename, Type="File");
             proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader";
-            obj.Proxy = arrow.internal.proxy.create(proxyName, args);
+            proxy = arrow.internal.proxy.create(proxyName, args);
+            obj = arrow.io.ipc.RecordBatchStreamReader(proxy);
+        end
+    end
+
+    methods
+        function obj = RecordBatchStreamReader(proxy)
+            arguments
+                proxy(1, 1) libmexclass.proxy.Proxy
+            end
+            obj.Proxy = proxy;
         end
 
         function schema = get.Schema(obj)
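
The MATLAB class above now follows a "construct only from a proxy, create through static functions" shape. A rough C++ analogue of that shape is sketched below to make the design easier to see. It is an illustration under stated assumptions, not code from this commit: `Proxy` and `StreamReader` are hypothetical names, and the `Proxy` struct merely stores the arguments that the real interface forwards to `arrow.internal.proxy.create`.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a libmexclass proxy; it only records its inputs.
struct Proxy {
  std::string type;                 // "File" or "Bytes"
  std::string filename;             // set for the "File" path
  std::vector<std::uint8_t> bytes;  // set for the "Bytes" path
};

// Hypothetical wrapper mirroring the MATLAB class layout after this change.
class StreamReader {
 public:
  // The constructor accepts only an existing proxy, never a filename.
  explicit StreamReader(std::shared_ptr<Proxy> proxy) : proxy_(std::move(proxy)) {
    assert(proxy_ != nullptr);
  }

  // Static "construction function" for a stream stored in a file on disk.
  static StreamReader FromFile(const std::string& filename) {
    auto proxy = std::make_shared<Proxy>(Proxy{"File", filename, {}});
    return StreamReader(std::move(proxy));
  }

  // Static "construction function" for a stream held in memory as bytes.
  static StreamReader FromBytes(std::vector<std::uint8_t> bytes) {
    auto proxy = std::make_shared<Proxy>(Proxy{"Bytes", "", std::move(bytes)});
    return StreamReader(std::move(proxy));
  }

  const Proxy& proxy() const { return *proxy_; }

 private:
  std::shared_ptr<Proxy> proxy_;
};
```

With this layout, adding another input source means adding another static function and another tag value, while the constructor signature stays unchanged.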
