diff --git a/ffi/.gitignore b/ffi/.gitignore index b65905e63..899de29d0 100644 --- a/ffi/.gitignore +++ b/ffi/.gitignore @@ -1,2 +1,3 @@ cffi-test cffi-test.o +examples/read-table/.clangd diff --git a/ffi/Cargo.toml b/ffi/Cargo.toml index 08162a505..52c7b8cb9 100644 --- a/ffi/Cargo.toml +++ b/ffi/Cargo.toml @@ -30,7 +30,7 @@ arrow-schema = { version = "53.0", default-features = false, features = [ arrow-data = { version = "53.0", default-features = false, features = [ "ffi", ], optional = true } -arrow-array = { version = "53.0", default-features = false, optional = true } +arrow-array = { version = "53.0", default-features = false, features = [ "ffi" ], optional = true } [build-dependencies] cbindgen = "0.27.0" diff --git a/ffi/examples/read-table/CMakeLists.txt b/ffi/examples/read-table/CMakeLists.txt index 22461b513..2f60b689e 100644 --- a/ffi/examples/read-table/CMakeLists.txt +++ b/ffi/examples/read-table/CMakeLists.txt @@ -1,46 +1,123 @@ cmake_minimum_required(VERSION 3.12) project(read_table) + +# Options option(PRINT_DATA "Print out the table data. Requires arrow-glib" ON) -option(VERBOSE "Enable for more diagnostics messages." OFF) -add_executable(read_table read_table.c arrow.c kernel_utils.c) -target_compile_definitions(read_table PUBLIC DEFINE_DEFAULT_ENGINE) -target_include_directories(read_table PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../../target/ffi-headers") -target_link_directories(read_table PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../../target/debug") -target_link_libraries(read_table PUBLIC delta_kernel_ffi) -target_compile_options(read_table PUBLIC) - -# Add the test -include(CTest) -set(TestRunner "../../../tests/read-table-testing/run_test.sh") -set(DatPath "../../../../acceptance/tests/dat/out/reader_tests/generated") -set(ExpectedPath "../../../tests/read-table-testing/expected-data") -add_test(NAME read_and_print_all_prim COMMAND ${TestRunner} ${DatPath}/all_primitive_types/delta/ ${ExpectedPath}/all-prim-types.expected) -add_test(NAME read_and_print_basic_partitioned COMMAND ${TestRunner} ${DatPath}/basic_partitioned/delta/ ${ExpectedPath}/basic-partitioned.expected) - -if(WIN32) - set(CMAKE_C_FLAGS_DEBUG "/MT") - target_link_libraries(read_table PUBLIC ws2_32 userenv bcrypt ncrypt crypt32 secur32 ntdll RuntimeObject) -endif(WIN32) - -if(MSVC) - target_compile_options(read_table PRIVATE /W3 /WX) -else() - # no-strict-prototypes because arrow headers have fn defs without prototypes - target_compile_options(read_table PRIVATE -Wall -Wextra -Wpedantic -Werror -Wno-strict-prototypes -g -fsanitize=address) - target_link_options(read_table PRIVATE -g -fsanitize=address) +option(VERBOSE "Enable for more diagnostic messages." 
OFF)
+
+# Paths to directories
+set(FFI_HEADERS_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../../../target/ffi-headers")
+set(TARGET_DEBUG_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../../../target/debug")
+set(COMMON_DIR "${CMAKE_CURRENT_SOURCE_DIR}/common")
+
+# Include the pkg-config module if needed
+if(PRINT_DATA)
+  include(FindPkgConfig)
+  pkg_check_modules(GLIB REQUIRED glib-2.0)
+  pkg_check_modules(ARROW_GLIB REQUIRED arrow-glib)
+  pkg_check_modules(PARQUET_GLIB REQUIRED parquet-glib)
 endif()

+# Create the common static library
+add_library(common_lib STATIC
+  ${COMMON_DIR}/arrow.c
+  ${COMMON_DIR}/kernel_utils.c
+)
+
+# Include directories for common_lib
+target_include_directories(common_lib PUBLIC
+  ${COMMON_DIR}
+  ${FFI_HEADERS_DIR}
+  $<IF:$<BOOL:${PRINT_DATA}>,${ARROW_GLIB_INCLUDE_DIRS},>
+  $<IF:$<BOOL:${PRINT_DATA}>,${PARQUET_GLIB_INCLUDE_DIRS},>
+  $<IF:$<BOOL:${PRINT_DATA}>,${CMAKE_CURRENT_SOURCE_DIR},>
+)
+
+# Link directories for common_lib
+target_link_directories(common_lib PUBLIC
+  ${TARGET_DEBUG_DIR}
+  $<IF:$<BOOL:${PRINT_DATA}>,${ARROW_GLIB_LIBRARY_DIRS},>
+  $<IF:$<BOOL:${PRINT_DATA}>,${PARQUET_GLIB_LIBRARY_DIRS},>
+)
+
+# Link libraries for common_lib
+target_link_libraries(common_lib PUBLIC
+  delta_kernel_ffi
+  $<IF:$<BOOL:${PRINT_DATA}>,${ARROW_GLIB_LIBRARIES},>
+  $<IF:$<BOOL:${PRINT_DATA}>,${PARQUET_GLIB_LIBRARIES},>
+)
+
+# Compile definitions for common_lib
+target_compile_definitions(common_lib PUBLIC DEFINE_DEFAULT_ENGINE)
+
 if(VERBOSE)
-  target_compile_definitions(read_table PUBLIC VERBOSE)
-endif(VERBOSE)
+  target_compile_definitions(common_lib PUBLIC VERBOSE)
+endif()

 if(PRINT_DATA)
-  include(FindPkgConfig)
-  pkg_check_modules(GLIB REQUIRED glib-2.0)
-  pkg_check_modules(ARROW_GLIB REQUIRED arrow-glib)
-  target_include_directories(read_table PUBLIC ${ARROW_GLIB_INCLUDE_DIRS})
-  target_link_directories(read_table PUBLIC ${ARROW_GLIB_LIBRARY_DIRS})
-  target_link_libraries(read_table PUBLIC ${ARROW_GLIB_LIBRARIES})
-  target_compile_options(read_table PUBLIC ${ARROW_GLIB_CFLAGS_OTHER})
-  target_compile_definitions(read_table PUBLIC PRINT_ARROW_DATA)
-endif(PRINT_DATA)
+  target_compile_definitions(common_lib PUBLIC PRINT_ARROW_DATA)
+  target_compile_options(common_lib PUBLIC ${ARROW_GLIB_CFLAGS_OTHER})
+  target_compile_options(common_lib PUBLIC ${PARQUET_GLIB_CFLAGS_OTHER})
+endif()
+
+# List of executables
+set(EXECUTABLES read_table write_table)
+
+foreach(EXE ${EXECUTABLES})
+  add_executable(${EXE} ${EXE}.c)
+
+  # Include directories for the executable
+  target_include_directories(${EXE} PUBLIC
+    "${CMAKE_CURRENT_SOURCE_DIR}"
+    ${FFI_HEADERS_DIR}
+  )
+
+  # Link directories for the executable
+  target_link_directories(${EXE} PUBLIC ${TARGET_DEBUG_DIR})
+
+  # Link libraries
+  target_link_libraries(${EXE} PUBLIC common_lib)
+
+  # Compile definitions for the executable (if needed)
+  if(VERBOSE)
+    target_compile_definitions(${EXE} PUBLIC VERBOSE)
+  endif()
+
+  if(PRINT_DATA)
+    target_compile_definitions(${EXE} PUBLIC PRINT_ARROW_DATA)
+    target_compile_options(${EXE} PUBLIC ${ARROW_GLIB_CFLAGS_OTHER})
+    target_compile_options(${EXE} PUBLIC ${PARQUET_GLIB_CFLAGS_OTHER})
+  endif()
+
+  # Compiler options
+  if(MSVC)
+    target_compile_options(${EXE} PRIVATE /W3 /WX)
+  else()
+    # Suppress warnings about function prototypes due to arrow headers
+    target_compile_options(${EXE} PRIVATE
+      -Wall -Wextra -Wpedantic -Werror -Wno-strict-prototypes -g -fsanitize=address
+    )
+    target_link_options(${EXE} PRIVATE -g -fsanitize=address)
+  endif()
+
+  # Platform-specific libraries
+  if(WIN32)
+    set(CMAKE_C_FLAGS_DEBUG "/MT")
+    target_link_libraries(${EXE} PUBLIC
+      ws2_32 userenv bcrypt ncrypt crypt32 secur32 ntdll RuntimeObject
+    )
+  endif()
+endforeach()
+
+# Add the tests
+include(CTest)
+set(TestRunner "../../../tests/read-table-testing/run_test.sh")
+set(DatPath "../../../../acceptance/tests/dat/out/reader_tests/generated")
+set(ExpectedPath "../../../tests/read-table-testing/expected-data")
+
+add_test(NAME read_and_print_all_prim
+  COMMAND ${TestRunner} ${DatPath}/all_primitive_types/delta/ ${ExpectedPath}/all-prim-types.expected
+)
+add_test(NAME read_and_print_basic_partitioned
+  COMMAND ${TestRunner} ${DatPath}/basic_partitioned/delta/ ${ExpectedPath}/basic-partitioned.expected
+)
diff --git a/ffi/examples/read-table/arrow.c b/ffi/examples/read-table/common/arrow.c
similarity index 100%
rename from ffi/examples/read-table/arrow.c
rename to ffi/examples/read-table/common/arrow.c
diff --git a/ffi/examples/read-table/arrow.h b/ffi/examples/read-table/common/arrow.h
similarity index 100%
rename from ffi/examples/read-table/arrow.h
rename to ffi/examples/read-table/common/arrow.h
diff --git a/ffi/examples/read-table/kernel_utils.c b/ffi/examples/read-table/common/kernel_utils.c
similarity index 93%
rename from ffi/examples/read-table/kernel_utils.c
rename to ffi/examples/read-table/common/kernel_utils.c
index 64262414a..8844dbfc5 100644
--- a/ffi/examples/read-table/kernel_utils.c
+++ b/ffi/examples/read-table/common/kernel_utils.c
@@ -1,6 +1,7 @@
-#include <delta_kernel_ffi.h>
 #include <stdio.h>
 #include <stdlib.h>
+
+#include "delta_kernel_ffi.h"
 #include "kernel_utils.h"

 // some diagnostic functions
@@ -61,7 +62,7 @@ void* allocate_string(const KernelStringSlice slice)
 }

 // utility function to convert key/val into slices and set them on a builder
-void set_builder_opt(EngineBuilder* engine_builder, char* key, char* val)
+void set_builder_opt(struct EngineBuilder* engine_builder, char* key, char* val)
 {
   KernelStringSlice key_slice = { key, strlen(key) };
   KernelStringSlice val_slice = { val, strlen(val) };
diff --git a/ffi/examples/read-table/kernel_utils.h b/ffi/examples/read-table/common/kernel_utils.h
similarity index 96%
rename from ffi/examples/read-table/kernel_utils.h
rename to ffi/examples/read-table/common/kernel_utils.h
index c6e60b960..1bc024596 100644
--- a/ffi/examples/read-table/kernel_utils.h
+++ b/ffi/examples/read-table/common/kernel_utils.h
@@ -1,6 +1,6 @@
 #pragma once

-#include <delta_kernel_ffi.h>
+#include "delta_kernel_ffi.h"

 // This is how we represent our errors. The kernel will ask us to construct this struct whenever it
 // encounters an error, and then return the constructed EngineError to us
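A minimal allocator satisfying that contract, sketched under the assumption that kernel_utils.h wraps the kernel's EngineError in an engine-owned Error struct holding a copied message (allocate_string is the helper shown in the hunk above):

    // Engine-side error wrapper: embeds the kernel's EngineError and owns a copy
    // of the (non-null-terminated) message slice the kernel hands us.
    typedef struct Error
    {
      struct EngineError etype;
      char* msg;
    } Error;

    // The kernel calls this whenever it encounters an error; the EngineError we
    // return comes back to us as the `err` field of the failed call's ExternResult.
    struct EngineError* allocate_error(enum KernelError etype, const KernelStringSlice msg)
    {
      Error* error = malloc(sizeof(Error));
      error->etype.etype = etype;
      error->msg = allocate_string(msg); // copy the slice into a NUL-terminated string
      return (struct EngineError*)error;
    }

The engine can later downcast the returned pointer back to Error, print msg, and free both allocations.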
diff --git a/ffi/examples/read-table/schema.h b/ffi/examples/read-table/common/schema.h
similarity index 100%
rename from ffi/examples/read-table/schema.h
rename to ffi/examples/read-table/common/schema.h
diff --git a/ffi/examples/read-table/data/test_table/_delta_log/00000000000000000000.json b/ffi/examples/read-table/data/test_table/_delta_log/00000000000000000000.json
new file mode 100644
index 000000000..4b27e91a1
--- /dev/null
+++ b/ffi/examples/read-table/data/test_table/_delta_log/00000000000000000000.json
@@ -0,0 +1,4 @@
+{"commitInfo":{"timestamp":1678020185201,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"548"},"engineInfo":"Apache-Spark/3.3.0 Delta-Lake/2.3.0rc1","txnId":"07c0f996-3854-4456-b68b-d1e35e3888cd"}}
+{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":[],"writerFeatures":[]}}
+{"metaData":{"id":"6524c99f-9a76-4ea1-8ad4-e428a7e065d7","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1678020184802}}
+{"add":{"path":"part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet","partitionValues":{},"size":548,"modificationTime":1678020185157,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0}}"}}
diff --git a/ffi/examples/read-table/data/test_table/_delta_log/00000000000000000001.json b/ffi/examples/read-table/data/test_table/_delta_log/00000000000000000001.json
new file mode 100644
index 000000000..565dcc808
--- /dev/null
+++ b/ffi/examples/read-table/data/test_table/_delta_log/00000000000000000001.json
@@ -0,0 +1 @@
+{"commitInfo":{"timestamp":1732749680780,"operation":"UNKNOWN","operationParameters":{},"kernelVersion":"v0.5.0","engineCommitInfo":{"engineInfo":"default engine C FFI"}}}
diff --git a/ffi/examples/read-table/data/test_table/_delta_log/00000000000000000002.json b/ffi/examples/read-table/data/test_table/_delta_log/00000000000000000002.json
new file mode 100644
index 000000000..f3d37e384
--- /dev/null
+++ b/ffi/examples/read-table/data/test_table/_delta_log/00000000000000000002.json
@@ -0,0 +1 @@
+{"commitInfo":{"timestamp":1732749716500,"operation":"UNKNOWN","operationParameters":{},"kernelVersion":"v0.5.0","engineCommitInfo":{"engineInfo":"default engine C FFI"}}}
diff --git a/ffi/examples/read-table/data/test_table/part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet b/ffi/examples/read-table/data/test_table/part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet
new file mode 100644
index 000000000..7ce78a86b
Binary files /dev/null and b/ffi/examples/read-table/data/test_table/part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet differ
diff --git a/ffi/examples/read-table/data/unsupported_write_table/_delta_log/00000000000000000000.json b/ffi/examples/read-table/data/unsupported_write_table/_delta_log/00000000000000000000.json
new file mode 100644
index 000000000..a7941cd08
--- /dev/null
+++ b/ffi/examples/read-table/data/unsupported_write_table/_delta_log/00000000000000000000.json
@@ -0,0 +1,4 @@
+{"commitInfo":{"timestamp":1678020185201,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"548"},"engineInfo":"Apache-Spark/3.3.0 Delta-Lake/2.3.0rc1","txnId":"07c0f996-3854-4456-b68b-d1e35e3888cd"}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"6524c99f-9a76-4ea1-8ad4-e428a7e065d7","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1678020184802}} +{"add":{"path":"part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet","partitionValues":{},"size":548,"modificationTime":1678020185157,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0}}"}} diff --git a/ffi/examples/read-table/data/unsupported_write_table/part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet b/ffi/examples/read-table/data/unsupported_write_table/part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet new file mode 100644 index 000000000..7ce78a86b Binary files /dev/null and b/ffi/examples/read-table/data/unsupported_write_table/part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet differ diff --git a/ffi/examples/read-table/read_table.h b/ffi/examples/read-table/read_table.h index 28d9c72dc..826f58e25 100644 --- a/ffi/examples/read-table/read_table.h +++ b/ffi/examples/read-table/read_table.h @@ -1,6 +1,6 @@ #pragma once -#include +#include "delta_kernel_ffi.h" // A list of partition column names typedef struct PartitionList diff --git a/ffi/examples/read-table/write_table.c b/ffi/examples/read-table/write_table.c new file mode 100644 index 000000000..f60d6aaef --- /dev/null +++ b/ffi/examples/read-table/write_table.c @@ -0,0 +1,571 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "delta_kernel_ffi.h" +#include "arrow.h" +#include "schema.h" +#include "kernel_utils.h" + +bool record_batch_to_ffi(GArrowRecordBatch *batch, + ArrowFFIData *out_ffi, + GError **error) { + if (!batch || !out_ffi) { + g_set_error(error, + GARROW_ERROR, + GARROW_ERROR_INVALID, + "record_batch_to_ffi: batch/out_ffi cannot be NULL"); + return false; + } + + struct ArrowArray *tmp_array = NULL; + struct ArrowSchema *tmp_schema = NULL; + + if (!garrow_record_batch_export(batch, + (gpointer *)&tmp_array, + (gpointer *)&tmp_schema, + error)) { + return false; // *error set by the export + } + + // shallow copy into out_ffi->array / out_ffi->schema. 
+ memcpy(&out_ffi->array, tmp_array, sizeof(struct ArrowArray)); + memcpy(&out_ffi->schema, tmp_schema, sizeof(struct ArrowSchema)); + + // don't double-free + tmp_array->release = NULL; + tmp_schema->release = NULL; + + g_free(tmp_array); + g_free(tmp_schema); + + // must eventually call: + // - out_ffi->array.release(&out_ffi->array); + // - out_ffi->schema.release(&out_ffi->schema); + return true; +} + +GArrowRecordBatch *create_write_metadata(const gchar *path, + gint64 size_value, + gint64 mod_time_value, + GError **error) +{ + GArrowDataType *string_type = GARROW_DATA_TYPE(garrow_string_data_type_new()); + GArrowDataType *key_type = GARROW_DATA_TYPE(garrow_string_data_type_new()); + GArrowDataType *val_type = GARROW_DATA_TYPE(garrow_string_data_type_new()); + GArrowMapDataType *map_type = garrow_map_data_type_new(key_type, val_type); + GArrowDataType *int64_type = GARROW_DATA_TYPE(garrow_int64_data_type_new()); + GArrowDataType *bool_type = GARROW_DATA_TYPE(garrow_boolean_data_type_new()); + + GArrowField *field_path = garrow_field_new("path", string_type); + GArrowField *field_partition_values = garrow_field_new("partitionValues", GARROW_DATA_TYPE(map_type)); + GArrowField *field_size = garrow_field_new("size", int64_type); + GArrowField *field_mod_time = garrow_field_new("modificationTime", int64_type); + GArrowField *field_data_change = garrow_field_new("dataChange", bool_type); + + // unref the data types now that fields hold references + g_object_unref(string_type); + g_object_unref(key_type); + g_object_unref(val_type); + g_object_unref(map_type); + g_object_unref(int64_type); + g_object_unref(bool_type); + + // schema + GList *fields = NULL; + fields = g_list_append(fields, field_path); + fields = g_list_append(fields, field_partition_values); + fields = g_list_append(fields, field_size); + fields = g_list_append(fields, field_mod_time); + fields = g_list_append(fields, field_data_change); + GArrowSchema *schema = garrow_schema_new(fields); + g_list_free(fields); + + // schema now holds refs to the fields + g_object_unref(field_path); + g_object_unref(field_partition_values); + g_object_unref(field_size); + g_object_unref(field_mod_time); + g_object_unref(field_data_change); + + if (!schema) { + g_set_error_literal(error, + GARROW_ERROR, + GARROW_ERROR_INVALID, + "Failed to create schema"); + return NULL; + } + + // path + GArrowStringArrayBuilder *path_builder = + GARROW_STRING_ARRAY_BUILDER(garrow_string_array_builder_new()); + if (!garrow_string_array_builder_append_string(path_builder, path, error)) { + goto error_cleanup; + } + GArrowArray *path_array = + garrow_array_builder_finish(GARROW_ARRAY_BUILDER(path_builder), error); + g_object_unref(path_builder); + if (!path_array) { + goto error_cleanup; + } + + // partitionValues + GArrowMapArrayBuilder *map_builder = + garrow_map_array_builder_new(garrow_map_data_type_new( + GARROW_DATA_TYPE(garrow_string_data_type_new()), + GARROW_DATA_TYPE(garrow_string_data_type_new())), + error); + if (!map_builder) { + g_object_unref(path_array); + goto error_cleanup; + } + if (!garrow_map_array_builder_append_value(map_builder, error)) { + g_object_unref(path_array); + g_object_unref(map_builder); + goto error_cleanup; + } + GArrowArray *partition_values_array = + garrow_array_builder_finish(GARROW_ARRAY_BUILDER(map_builder), error); + g_object_unref(map_builder); + if (!partition_values_array) { + g_object_unref(path_array); + goto error_cleanup; + } + + // size + GArrowInt64ArrayBuilder *size_builder = + 
GARROW_INT64_ARRAY_BUILDER(garrow_int64_array_builder_new()); + if (!garrow_int64_array_builder_append_value(size_builder, size_value, error)) { + g_object_unref(path_array); + g_object_unref(partition_values_array); + goto error_cleanup; + } + GArrowArray *size_array = + garrow_array_builder_finish(GARROW_ARRAY_BUILDER(size_builder), error); + g_object_unref(size_builder); + if (!size_array) { + g_object_unref(path_array); + g_object_unref(partition_values_array); + goto error_cleanup; + } + + // modificationTime + GArrowInt64ArrayBuilder *mod_time_builder = + GARROW_INT64_ARRAY_BUILDER(garrow_int64_array_builder_new()); + if (!garrow_int64_array_builder_append_value(mod_time_builder, mod_time_value, error)) { + g_object_unref(path_array); + g_object_unref(partition_values_array); + g_object_unref(size_array); + goto error_cleanup; + } + GArrowArray *mod_time_array = + garrow_array_builder_finish(GARROW_ARRAY_BUILDER(mod_time_builder), error); + g_object_unref(mod_time_builder); + if (!mod_time_array) { + g_object_unref(path_array); + g_object_unref(partition_values_array); + g_object_unref(size_array); + goto error_cleanup; + } + + // dataChange + GArrowBooleanArrayBuilder *bool_builder = + GARROW_BOOLEAN_ARRAY_BUILDER(garrow_boolean_array_builder_new()); + if (!garrow_boolean_array_builder_append_value(bool_builder, TRUE, error)) { + g_object_unref(path_array); + g_object_unref(partition_values_array); + g_object_unref(size_array); + g_object_unref(mod_time_array); + goto error_cleanup; + } + GArrowArray *data_change_array = + garrow_array_builder_finish(GARROW_ARRAY_BUILDER(bool_builder), error); + g_object_unref(bool_builder); + if (!data_change_array) { + g_object_unref(path_array); + g_object_unref(partition_values_array); + g_object_unref(size_array); + g_object_unref(mod_time_array); + goto error_cleanup; + } + + const guint32 n_rows = 1; + GList *columns = NULL; + columns = g_list_append(columns, path_array); + columns = g_list_append(columns, partition_values_array); + columns = g_list_append(columns, size_array); + columns = g_list_append(columns, mod_time_array); + columns = g_list_append(columns, data_change_array); + GArrowRecordBatch *record_batch = + garrow_record_batch_new(schema, n_rows, columns, error); + g_list_free(columns); + + // record batch holds refs to arrays/schema + g_object_unref(path_array); + g_object_unref(partition_values_array); + g_object_unref(size_array); + g_object_unref(mod_time_array); + g_object_unref(data_change_array); + g_object_unref(schema); + + if (!record_batch) { + // 'error' is set by garrow_record_batch_new() if it failed + return NULL; + } + + return record_batch; + +error_cleanup: + g_object_unref(schema); + return NULL; +} + +// create map type column "engineCommitInfo" with one row and one entry: +// {"engineInfo": "default engine"} +GArrowRecordBatch *create_commit_info(GError **error) { + GArrowDataType *key_type = GARROW_DATA_TYPE(garrow_string_data_type_new()); + GArrowDataType *val_type = GARROW_DATA_TYPE(garrow_string_data_type_new()); + GArrowMapDataType *map_data_type = + garrow_map_data_type_new(key_type, val_type); + + GArrowField *map_field = + garrow_field_new("engineCommitInfo", GARROW_DATA_TYPE(map_data_type)); + + GList *schema_fields = NULL; + schema_fields = g_list_append(schema_fields, map_field); + GArrowSchema *schema = garrow_schema_new(schema_fields); + g_list_free(schema_fields); + + GArrowMapArrayBuilder *map_builder = + garrow_map_array_builder_new(map_data_type, error); + + if (!map_builder) { + 
g_object_unref(schema); + g_object_unref(map_field); + g_object_unref(key_type); + g_object_unref(val_type); + g_object_unref(map_data_type); + return NULL; + } + + GArrowArrayBuilder *key_builder = + garrow_map_array_builder_get_key_builder(map_builder); + GArrowArrayBuilder *item_builder = + garrow_map_array_builder_get_item_builder(map_builder); + + if (!garrow_map_array_builder_append_value(map_builder, error)) { + goto error_cleanup; + } + + if (!garrow_string_array_builder_append_string( + GARROW_STRING_ARRAY_BUILDER(key_builder), + "engineInfo", + error)) { + goto error_cleanup; + } + if (!garrow_string_array_builder_append_string( + GARROW_STRING_ARRAY_BUILDER(item_builder), + "default engine", + error)) { + goto error_cleanup; + } + + GArrowArray *map_array = + garrow_array_builder_finish(GARROW_ARRAY_BUILDER(map_builder), error); + if (!map_array) { + goto error_cleanup; + } + + guint32 n_rows = garrow_array_get_length(map_array); + GList *columns = NULL; + columns = g_list_append(columns, map_array); + GArrowRecordBatch *record_batch = + garrow_record_batch_new(schema, n_rows, columns, error); + + g_list_free(columns); + g_object_unref(map_array); + + if (!record_batch) { + // record_batch creation failed, error is set + goto error_cleanup; + } + + g_object_unref(map_builder); + g_object_unref(schema); + g_object_unref(map_field); + g_object_unref(key_type); + g_object_unref(val_type); + g_object_unref(map_data_type); + + return record_batch; + +error_cleanup: + printf("Error creating commit info record batch: %s\n", (*error)->message); + if (map_builder) { + g_object_unref(map_builder); + } + if (schema) { + g_object_unref(schema); + } + if (map_field) { + g_object_unref(map_field); + } + if (key_type) { + g_object_unref(key_type); + } + if (val_type) { + g_object_unref(val_type); + } + if (map_data_type) { + g_object_unref(map_data_type); + } + return NULL; +} + +gboolean write_record_batch(gchar *output_path, + GArrowRecordBatch *record_batch, + GError **error) { + // scrub prefix if it exists + const char prefix[] = "file://"; + size_t prefix_len = strlen(prefix); + if (strncmp(output_path, prefix, prefix_len) == 0) { + memmove(output_path, output_path + prefix_len, strlen(output_path + prefix_len) + 1); + } + GArrowSchema *schema = garrow_record_batch_get_schema(record_batch); + GParquetArrowFileWriter *writer = gparquet_arrow_file_writer_new_path ( + schema, + output_path, + NULL, + error + ); + + if (writer == NULL) { + printf("Error creating file writer: %s\n", (*error)->message); + return FALSE; + } + + g_object_unref(schema); + + if (!gparquet_arrow_file_writer_write_record_batch(writer, record_batch, error)) { + g_object_unref(writer); + return FALSE; + } + + if (!gparquet_arrow_file_writer_close(writer, error)) { + g_object_unref(writer); + return FALSE; + } + + g_object_unref(writer); + return TRUE; +} + +// (from build dir) run with ./write_table ../data/test_table/ +// just appends (10, 11, 12) to the existing table +int main(int argc, char* argv[]) +{ + if (argc < 2) { + printf("Usage: %s table/path\n", argv[0]); + return -1; + } + + char* table_path = argv[1]; + printf("Writing to table at %s\n", table_path); + + KernelStringSlice table_path_slice = { table_path, strlen(table_path) }; + + ExternResultEngineBuilder engine_builder_res = + get_engine_builder(table_path_slice, allocate_error); + if (engine_builder_res.tag != OkEngineBuilder) { + print_error("Could not get engine builder.", (Error*)engine_builder_res.err); + 
free_error((Error*)engine_builder_res.err);
+    return -1;
+  }
+
+  ExternResultHandleSharedExternEngine engine_res =
+    get_default_engine(table_path_slice, allocate_error);
+
+  if (engine_res.tag != OkHandleSharedExternEngine) {
+    print_error("Failed to get engine.", (Error*)engine_res.err);
+    free_error((Error*)engine_res.err);
+    return -1;
+  }
+
+  SharedExternEngine* engine = engine_res.ok;
+
+  ExternResultHandleExclusiveTransaction txn_res = transaction(table_path_slice, engine);
+  if (txn_res.tag != OkHandleExclusiveTransaction) {
+    print_error("Failed to create transaction.", (Error*)txn_res.err);
+    free_error((Error*)txn_res.err);
+    return -1;
+  }
+
+  ExclusiveTransaction* txn = txn_res.ok;
+
+  GError *error = NULL;
+  GArrowRecordBatch *commit_info_batch = create_commit_info(&error);
+
+  if (!commit_info_batch) {
+    if (error) {
+      fprintf(stderr, "Error: %s\n", error->message);
+      g_error_free(error);
+    }
+    return 1;
+  }
+
+  ArrowFFIData commit_info_ffi;
+  if (!record_batch_to_ffi(commit_info_batch, &commit_info_ffi, &error)) {
+    if (error) {
+      fprintf(stderr, "Error: %s\n", error->message);
+      g_error_free(error);
+    }
+    // FIXME
+    // handle error, cleanup
+    return 1;
+  }
+
+  // FIXME
+  // g_object_unref(commit_info);
+
+  ExternResultHandleExclusiveEngineData commit_info_res =
+    get_engine_data(&commit_info_ffi, engine);
+
+  if (commit_info_res.tag != OkHandleExclusiveEngineData) {
+    print_error("Failed to get commit info as engine data.", (Error*)commit_info_res.err);
+    free_error((Error*)commit_info_res.err);
+    return -1;
+  }
+
+  ExclusiveEngineData* commit_info = commit_info_res.ok;
+
+  ExclusiveTransaction* txn_with_commit_info = with_commit_info(txn, commit_info);
+
+  SharedWriteContext* write_ctx = get_write_context(txn_with_commit_info);
+
+  // TODO
+  // SharedSchema* write_schema = get_write_schema(write_ctx);
+
+  char* table_root = get_write_path(write_ctx, allocate_string);
+
+  uuid_t uuid;
+  uuid_generate(uuid);
+
+  char uuid_str[37]; // 36 characters + null terminator
+  uuid_unparse_lower(uuid, uuid_str);
+
+  char parquet_name[45]; // uuid + ".parquet" + null terminator
+  // format from the separate uuid buffer: snprintf with overlapping
+  // source/destination is undefined behavior
+  snprintf(parquet_name, sizeof(parquet_name), "%s.parquet", uuid_str);
+
+  size_t path_size = strlen(table_root) + strlen(parquet_name) + 1;
+  char *write_path = malloc(path_size);
+  if (!write_path) {
+    // TODO
+    return 1;
+  }
+
+  snprintf(write_path, path_size, "%s%s", table_root, parquet_name);
+  print_diag("writing to: %s\n", write_path);
+
+  // build arrow array of data we want to append
+  GArrowInt64ArrayBuilder *builder = garrow_int64_array_builder_new();
+  if (!garrow_int64_array_builder_append_value(builder, 10, &error) ||
+      !garrow_int64_array_builder_append_value(builder, 11, &error) ||
+      !garrow_int64_array_builder_append_value(builder, 12, &error)) {
+    g_print("Error appending value: %s\n", error->message);
+    g_clear_error(&error);
+    g_object_unref(builder);
+    return EXIT_FAILURE;
+  }
+
+  GArrowArray *array = garrow_array_builder_finish(GARROW_ARRAY_BUILDER(builder), &error);
+  if (!array) {
+    g_print("Error building array: %s\n", error->message);
+    g_clear_error(&error);
+    g_object_unref(builder);
+    return EXIT_FAILURE;
+  }
+
+  GArrowDataType *data_type = GARROW_DATA_TYPE(garrow_int64_data_type_new());
+  GArrowField *field = garrow_field_new("value", data_type);
+  g_object_unref(data_type);
+
+  GList *fields = NULL;
+  fields = g_list_append(fields, field);
+
+  GArrowSchema *schema = garrow_schema_new(fields);
+  g_list_free(fields);
+  g_object_unref(field);
+
+  if (!schema) {
+    g_printerr("Error creating schema.\n");
+    // 'field' was already unreffed above
+    g_object_unref(array);
+    return 1;
+  }
+
+  guint32 n_rows = garrow_array_get_length(array);
+  GList *columns = NULL;
+  columns = g_list_append(columns, array);
+
+  GArrowRecordBatch *record_batch =
+    garrow_record_batch_new(schema, n_rows, columns, &error);
+
+  g_list_free(columns);
+  g_object_unref(schema);
+
+  if (!record_batch) {
+    g_printerr("Error creating RecordBatch: %s\n", error->message);
+    g_clear_error(&error);
+    g_object_unref(array);
+    return 1;
+  }
+  g_object_unref(array);
+
+  printf("writing %lld rows...\n", (long long)garrow_record_batch_get_n_rows(record_batch));
+
+  write_record_batch(write_path, record_batch, &error);
+  // FIXME
+  GArrowRecordBatch *write_meta_batch = create_write_metadata(parquet_name, 1234, 5678, &error);
+
+  ArrowFFIData write_meta_ffi;
+  if (!record_batch_to_ffi(write_meta_batch, &write_meta_ffi, &error)) {
+    if (error) {
+      fprintf(stderr, "Error: %s\n", error->message);
+      g_error_free(error);
+    }
+    // FIXME
+    // handle error, cleanup
+    return 1;
+  }
+
+  ExternResultHandleExclusiveEngineData write_meta_res =
+    get_engine_data(&write_meta_ffi, engine);
+
+  if (write_meta_res.tag != OkHandleExclusiveEngineData) {
+    print_error("Failed to get write metadata as engine data.", (Error*)write_meta_res.err);
+    free_error((Error*)write_meta_res.err);
+    return -1;
+  }
+
+  ExclusiveEngineData* write_meta = write_meta_res.ok;
+
+  add_write_metadata(txn_with_commit_info, write_meta);
+
+  // commit! WARN: txn is consumed by this call
+  commit(txn_with_commit_info, engine);
+
+  // Clean up
+  g_object_unref(record_batch);
+  g_object_unref(builder);
+
+  free(write_path);
+
+  // free_transaction(txn); // txn is consumed by commit
+  free_engine(engine);
+
+  return 0;
+}
diff --git a/ffi/src/engine_data.rs b/ffi/src/engine_data.rs
index 3363c9034..99ed13d91 100644
--- a/ffi/src/engine_data.rs
+++ b/ffi/src/engine_data.rs
@@ -84,3 +84,26 @@ fn get_raw_arrow_data_impl(data: Box<dyn EngineData>) -> DeltaResult<*mut ArrowFFIData> {
     let ret_data = Box::new(ArrowFFIData { array, schema });
     Ok(Box::leak(ret_data))
 }
+
+#[cfg(feature = "default-engine")]
+#[no_mangle]
+pub unsafe extern "C" fn get_engine_data(
+    data: *mut ArrowFFIData,
+    engine: Handle<SharedExternEngine>,
+) -> ExternResult<Handle<ExclusiveEngineData>> {
+    get_engine_data_impl(data)
+        .map(|engine_data| engine_data.into())
+        .into_extern_result(&engine.as_ref())
+}
+
+#[cfg(feature = "default-engine")]
+unsafe fn get_engine_data_impl(
+    data: *mut ArrowFFIData,
+) -> DeltaResult<Box<delta_kernel::engine::arrow_data::ArrowEngineData>> {
+    // take ownership of the FFI structs; the engine must not touch them afterwards
+    let array_local = std::ptr::read(&(*data).array);
+    let schema_local = std::ptr::read(&(*data).schema);
+
+    let array: arrow_array::StructArray =
+        arrow_array::ffi::from_ffi(array_local, &schema_local)?.into();
+    let record_batch: arrow_array::RecordBatch = array.into();
+    let engine_data = delta_kernel::engine::arrow_data::ArrowEngineData::from(record_batch);
+    Ok(Box::new(engine_data))
+}
diff --git a/ffi/src/engine_funcs.rs b/ffi/src/engine_funcs.rs
index f8534dfc0..d0ce765ea 100644
--- a/ffi/src/engine_funcs.rs
+++ b/ffi/src/engine_funcs.rs
@@ -8,7 +8,7 @@ use tracing::debug;
 use url::Url;

 use crate::{
-    scan::SharedSchema, ExclusiveEngineData, ExternEngine, ExternResult, IntoExternResult,
+    SharedSchema, ExclusiveEngineData, ExternEngine, ExternResult, IntoExternResult,
     KernelStringSlice, NullableCvoid, SharedExternEngine, TryFromStringSlice,
 };
diff --git a/ffi/src/error.rs b/ffi/src/error.rs
index a615d0330..8f8514ca3 100644
--- a/ffi/src/error.rs
+++ b/ffi/src/error.rs
@@ -183,6 +183,7 @@ impl AllocateError for &dyn ExternEngine {
         etype: KernelError,
         msg: KernelStringSlice,
     ) -> *mut EngineError {
+        println!("etype: {etype:?}");
         self.error_allocator().allocate_error(etype, msg)
     }
 }
diff --git a/ffi/src/handle.rs b/ffi/src/handle.rs
index 27b35bea5..99b4ce124 100644
--- a/ffi/src/handle.rs
+++ b/ffi/src/handle.rs
@@ -3,19 +3,19 @@
 //!
 //! Creating a [`Handle`] always implies some kind of ownership transfer. A mutable handle takes
 //! ownership of the object itself (analogous to [`Box`]), while a non-mutable (shared) handle
-//! takes ownership of a shared reference to the object (analogous to [`std::sync::Arc`]). Thus, a created
-//! handle remains [valid][Handle#Validity], and its underlying object remains accessible, until the
-//! handle is explicitly dropped or consumed. Dropping a mutable handle always drops the underlying
-//! object as well; dropping a shared handle only drops the underlying object if the handle was the
-//! last reference to that object.
+//! takes ownership of a shared reference to the object (analogous to [`std::sync::Arc`]). Thus,
+//! a created handle remains [valid][Handle#Validity], and its underlying object remains
+//! accessible, until the handle is explicitly dropped or consumed. Dropping a mutable handle
+//! always drops the underlying object as well; dropping a shared handle only drops the underlying
+//! object if the handle was the last reference to that object.
 //!
 //! Because handles carry ownership semantics, and lifetime information is not preserved across the
 //! FFI boundary, handles are always opaque types to avoid confusing them with normal references
 //! and pointers (`&Foo`, `*const Foo`, etc) that are possibly only valid until the FFI call
-//! returns. For the same reason, mutable handles implement neither [`Copy`] nor [`Clone`]. However,
-//! this only helps on the Rust side, because handles appear as simple pointers in the FFI and are
-//! thus easily duplicated there. In order to improve safety, external (non-Rust) code is strongly
-//! advised to maintain "unique pointer" semantics for mutable handles.
+//! returns. For the same reason, mutable handles implement neither [`Copy`] nor [`Clone`].
+//! However, this only helps on the Rust side, because handles appear as simple pointers in the FFI
+//! and are thus easily duplicated there. In order to improve safety, external (non-Rust) code is
+//! strongly advised to maintain "unique pointer" semantics for mutable handles.
 //!
 //! NOTE: While shared handles could conceptually impl [`Clone`], cloning would require unsafe code
 //! and so we can't actually implement the trait. Use [`Handle::clone_handle`] instead.
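Read from the C side, the discipline these docs ask for looks like the following sketch; the functions are the FFI entry points defined elsewhere in this diff, and engine_res, txn_res, and commit_info stand in for values obtained as in write_table.c:

    // Shared handle (Arc-like): dropping it releases one reference.
    SharedExternEngine* engine = engine_res.ok;
    // Mutable handle (Box-like): calls documented as consuming it take ownership,
    // so the old pointer must never be reused afterwards.
    ExclusiveTransaction* txn = txn_res.ok;
    txn = with_commit_info(txn, commit_info); // consumes txn and commit_info
    commit(txn, engine);                      // consumes txn: never use or free it again
    free_engine(engine);                      // drops our reference to the engine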
diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs
index 323f02ac9..f192539ae 100644
--- a/ffi/src/lib.rs
+++ b/ffi/src/lib.rs
@@ -11,7 +11,9 @@ use std::sync::Arc;
 use tracing::debug;
 use url::Url;

+use delta_kernel::schema::Schema;
 use delta_kernel::snapshot::Snapshot;
+use delta_kernel::transaction::{CommitResult, Transaction};
 use delta_kernel::{DeltaResult, Engine, EngineData, Table};
 use delta_kernel_ffi_macros::handle_descriptor;
@@ -325,6 +327,9 @@ pub unsafe extern "C" fn free_row_indexes(slice: KernelRowIndexArray) {
     let _ = slice.into_vec();
 }

+#[handle_descriptor(target=Schema, mutable=false, sized=true)]
+pub struct SharedSchema;
+
 // TODO: Do we want this handle at all? Perhaps we should just _always_ pass raw *mut c_void pointers
 // that are the engine data? Even if we want the type, should it be a shared handle instead?
 /// an opaque struct that encapsulates data read by an engine. this handle can be passed back into
@@ -662,6 +667,137 @@ pub unsafe extern "C" fn free_string_slice_data(data: Handle<StringSliceData>) {
+#[handle_descriptor(target=Transaction, mutable=true, sized=true)]
+pub struct ExclusiveTransaction;
+
+/// TODO
+///
+/// # Safety
+///
+/// Caller is responsible for passing a valid path and engine handle.
+#[no_mangle]
+pub unsafe extern "C" fn transaction(
+    path: KernelStringSlice,
+    engine: Handle<SharedExternEngine>,
+) -> ExternResult<Handle<ExclusiveTransaction>> {
+    let url = unsafe { unwrap_and_parse_path_as_url(path) };
+    let engine = unsafe { engine.as_ref() };
+    transaction_impl(url, engine).into_extern_result(&engine)
+}
+
+fn transaction_impl(
+    url: DeltaResult<Url>,
+    extern_engine: &dyn ExternEngine,
+) -> DeltaResult<Handle<ExclusiveTransaction>> {
+    let table = Table::try_from_uri(url?)?;
+    let transaction = table.new_transaction(extern_engine.engine().as_ref())?;
+    Ok(Box::new(transaction).into())
+}
+
+/// # Safety
+///
+/// Caller is responsible for passing a valid handle.
+#[no_mangle]
+pub unsafe extern "C" fn free_transaction(txn: Handle<ExclusiveTransaction>) {
+    debug!("engine released transaction");
+    txn.drop_handle();
+}
+
+/// TODO
+///
+/// # Safety
+///
+/// Caller is responsible for passing a valid handle. Consumes both the transaction and the
+/// commit info.
+#[no_mangle]
+pub unsafe extern "C" fn with_commit_info(
+    txn: Handle<ExclusiveTransaction>,
+    commit_info: Handle<ExclusiveEngineData>,
+) -> Handle<ExclusiveTransaction> {
+    let txn = unsafe { txn.into_inner() };
+    let commit_info = unsafe { commit_info.into_inner() };
+    Box::new(txn.with_commit_info(commit_info)).into()
+}
+
+use delta_kernel::transaction::WriteContext;
+
+#[handle_descriptor(target=WriteContext, mutable=false, sized=true)]
+pub struct SharedWriteContext;
+
+#[no_mangle]
+pub unsafe extern "C" fn get_write_context(
+    txn: Handle<ExclusiveTransaction>,
+) -> Handle<SharedWriteContext> {
+    let txn = unsafe { txn.as_ref() };
+    Arc::new(txn.get_write_context()).into()
+}
+
+/// TODO
+///
+/// # Safety
+/// Engine is responsible for providing a valid WriteContext pointer
+#[no_mangle]
+pub unsafe extern "C" fn get_write_schema(
+    write_context: Handle<SharedWriteContext>,
+) -> Handle<SharedSchema> {
+    let write_context = unsafe { write_context.as_ref() };
+    write_context.schema().clone().into()
+}
+
+/// TODO
+///
+/// # Safety
+/// Engine is responsible for providing a valid WriteContext pointer
+#[no_mangle]
+pub unsafe extern "C" fn get_write_path(
+    write_context: Handle<SharedWriteContext>,
+    allocate_fn: AllocateStringFn,
+) -> NullableCvoid {
+    let write_context = unsafe { write_context.as_ref() };
+    let write_path = write_context.target_dir().to_string();
+    allocate_fn(kernel_string_slice!(write_path))
+}
+
+/// TODO
+///
+/// # Safety
+///
+/// Caller is responsible for passing a valid handle. Assumes no concurrent txn usage. Consumes
+/// write_metadata.
+#[no_mangle]
+pub unsafe extern "C" fn add_write_metadata(
+    mut txn: Handle<ExclusiveTransaction>,
+    write_metadata: Handle<ExclusiveEngineData>,
+) {
+    let txn = unsafe { txn.as_mut() };
+    let write_metadata = unsafe { write_metadata.into_inner() };
+    txn.add_write_metadata(write_metadata);
+}
+
+/// TODO
+///
+/// # Safety
+///
+/// Caller is responsible for passing a valid handle, and must not use the transaction after this
+/// method is called.
+#[no_mangle]
+pub unsafe extern "C" fn commit(
+    txn: Handle<ExclusiveTransaction>,
+    engine: Handle<SharedExternEngine>,
+) -> ExternResult<Version> {
+    let txn = unsafe { txn.into_inner() };
+    let extern_engine = unsafe { engine.as_ref() };
+    let engine = extern_engine.engine();
+    // FIXME for now just erasing the enum
+    match txn.commit(engine.as_ref()) {
+        Ok(CommitResult::Committed(v)) => Ok(v),
+        Ok(CommitResult::Conflict(_, v)) => Err(delta_kernel::Error::Generic(format!(
+            "commit conflict at version {v}"
+        ))),
+        Err(e) => Err(e),
+    }
+    .into_extern_result(&extern_engine)
+}
+
 // A set that can identify its contents by address
 pub struct ReferenceSet<T> {
     map: std::collections::HashMap<usize, T>,
diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs
index d5695c130..9a6e87050 100644
--- a/ffi/src/scan.rs
+++ b/ffi/src/scan.rs
@@ -3,9 +3,10 @@
 use std::collections::HashMap;
 use std::sync::{Arc, Mutex};

+use crate::SharedSchema;
+
 use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
 use delta_kernel::scan::{Scan, ScanData};
-use delta_kernel::schema::Schema;
 use delta_kernel::snapshot::Snapshot;
 use delta_kernel::{DeltaResult, Error};
 use delta_kernel_ffi_macros::handle_descriptor;
@@ -70,8 +71,6 @@ fn scan_impl(
 #[handle_descriptor(target=GlobalScanState, mutable=false, sized=true)]
 pub struct SharedGlobalScanState;

-#[handle_descriptor(target=Schema, mutable=false, sized=true)]
-pub struct SharedSchema;

 /// Get the global state for a scan. See the docs for [`delta_kernel::scan::state::GlobalScanState`]
 /// for more information.
diff --git a/kernel/src/engine/arrow_data.rs b/kernel/src/engine/arrow_data.rs
index 7c2dd5f40..17f2662c6 100644
--- a/kernel/src/engine/arrow_data.rs
+++ b/kernel/src/engine/arrow_data.rs
@@ -6,7 +6,7 @@ use arrow_array::cast::AsArray;
 use arrow_array::types::{Int32Type, Int64Type};
 use arrow_array::{Array, ArrayRef, GenericListArray, MapArray, OffsetSizeTrait, RecordBatch, StructArray};
 use arrow_schema::{FieldRef, DataType as ArrowDataType};
-use tracing::{debug};
+use tracing::debug;

 use std::collections::{HashMap, HashSet};
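One note on calling commit from C: write_table.c above discards its result. A caller that wants the committed version can check it, as in this sketch; it assumes cbindgen emits the monomorphized ExternResult<Version> as ExternResultu64 with an Oku64 tag, following the naming pattern of the other monomorphized results in delta_kernel_ffi.h:

    // Commit and report the new table version; txn is consumed either way.
    ExternResultu64 commit_res = commit(txn_with_commit_info, engine);
    if (commit_res.tag != Oku64) {
      print_error("Failed to commit transaction.", (Error*)commit_res.err);
      free_error((Error*)commit_res.err);
      return -1;
    }
    printf("committed version %" PRIu64 "\n", commit_res.ok); // needs <inttypes.h>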