Skip to content

Commit f326f6a

Browse files
ffacswgtmac
authored andcommitted
ORC-1388: [C++] Support schema evolution from decimal to timestamp/string group
### What changes were proposed in this pull request? Support conversion from {decimal} to {string, char, varchar, timestamp, timestamp_instant} ### Why are the changes needed? Support schema evolution at cpp side. ### How was this patch tested? UT passed. ### Was this patch authored or co-authored using generative AI tooling? NO Closes #1761 from ffacs/ORC-1388. Authored-by: ffacs <[email protected]> Signed-off-by: Gang Wu <[email protected]>
1 parent 8f22732 commit f326f6a

4 files changed

+287
-13
lines changed

c++/src/ConvertColumnReader.cc

+126-12
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,107 @@ namespace orc {
593593
int32_t toScale;
594594
};
595595

596+
template <typename FileTypeBatch>
597+
class DecimalToTimestampColumnReader : public ConvertToTimestampColumnReader {
598+
public:
599+
DecimalToTimestampColumnReader(const Type& _readType, const Type& fileType,
600+
StripeStreams& stripe, bool _throwOnOverflow)
601+
: ConvertToTimestampColumnReader(_readType, fileType, stripe, _throwOnOverflow),
602+
precision(static_cast<int32_t>(fileType.getPrecision())),
603+
scale(static_cast<int32_t>(fileType.getScale())) {}
604+
605+
void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
606+
ConvertColumnReader::next(rowBatch, numValues, notNull);
607+
const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
608+
auto& dstBatch = *SafeCastBatchTo<TimestampVectorBatch*>(&rowBatch);
609+
for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
610+
if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
611+
convertDecimalToTimestamp(dstBatch, i, srcBatch);
612+
}
613+
}
614+
}
615+
616+
private:
617+
void convertDecimalToTimestamp(TimestampVectorBatch& dstBatch, uint64_t idx,
618+
const FileTypeBatch& srcBatch) {
619+
constexpr int SecondToNanoFactor = 9;
620+
// Following constant comes from java.time.Instant
621+
// '-1000000000-01-01T00:00Z'
622+
constexpr int64_t MIN_EPOCH_SECONDS = -31557014167219200L;
623+
// '1000000000-12-31T23:59:59.999999999Z'
624+
constexpr int64_t MAX_EPOCH_SECONDS = 31556889864403199L;
625+
// dummy variable, there's no risk of overflow
626+
bool overflow = false;
627+
628+
Int128 i128(srcBatch.values[idx]);
629+
Int128 integerPortion = scaleDownInt128ByPowerOfTen(i128, scale);
630+
if (integerPortion < MIN_EPOCH_SECONDS || integerPortion > MAX_EPOCH_SECONDS) {
631+
handleOverflow<Decimal, int64_t>(dstBatch, idx, throwOnOverflow);
632+
return;
633+
}
634+
i128 -= scaleUpInt128ByPowerOfTen(integerPortion, scale, overflow);
635+
Int128 fractionPortion = std::move(i128);
636+
if (scale < SecondToNanoFactor) {
637+
fractionPortion =
638+
scaleUpInt128ByPowerOfTen(fractionPortion, SecondToNanoFactor - scale, overflow);
639+
} else {
640+
fractionPortion = scaleDownInt128ByPowerOfTen(fractionPortion, scale - SecondToNanoFactor);
641+
}
642+
if (fractionPortion < 0) {
643+
fractionPortion += 1e9;
644+
integerPortion -= 1;
645+
}
646+
// line 630 has guaranteed toLong() will not overflow
647+
dstBatch.data[idx] = integerPortion.toLong();
648+
dstBatch.nanoseconds[idx] = fractionPortion.toLong();
649+
650+
if (needConvertTimezone) {
651+
dstBatch.data[idx] = readerTimezone.convertFromUTC(dstBatch.data[idx]);
652+
}
653+
}
654+
655+
const int32_t precision;
656+
const int32_t scale;
657+
};
658+
659+
template <typename FileTypeBatch>
660+
class DecimalToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
661+
public:
662+
DecimalToStringVariantColumnReader(const Type& _readType, const Type& fileType,
663+
StripeStreams& stripe, bool _throwOnOverflow)
664+
: ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow),
665+
scale(fileType.getScale()) {}
666+
667+
uint64_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override {
668+
uint64_t size = 0;
669+
strBuffer.resize(numValues);
670+
const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
671+
if (readType.getKind() == STRING) {
672+
for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
673+
if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
674+
strBuffer[i] = Int128(srcBatch.values[i]).toDecimalString(scale, true);
675+
size += strBuffer[i].size();
676+
}
677+
}
678+
} else {
679+
const auto maxLength = readType.getMaximumLength();
680+
for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
681+
if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
682+
strBuffer[i] = Int128(srcBatch.values[i]).toDecimalString(scale, true);
683+
}
684+
if (strBuffer[i].size() > maxLength) {
685+
strBuffer[i].resize(maxLength);
686+
}
687+
size += strBuffer[i].size();
688+
}
689+
}
690+
return size;
691+
}
692+
693+
private:
694+
const int32_t scale;
695+
};
696+
596697
#define DEFINE_NUMERIC_CONVERT_READER(FROM, TO, TYPE) \
597698
using FROM##To##TO##ColumnReader = \
598699
NumericConvertColumnReader<FROM##VectorBatch, TO##VectorBatch, TYPE>;
@@ -621,6 +722,14 @@ namespace orc {
621722
using Decimal128##To##TO##ColumnReader = \
622723
DecimalConvertColumnReader<Decimal128VectorBatch, TO##VectorBatch>;
623724

725+
#define DEFINE_DECIMAL_CONVERT_TO_TIMESTAMP_READER \
726+
using Decimal64ToTimestampColumnReader = DecimalToTimestampColumnReader<Decimal64VectorBatch>; \
727+
using Decimal128ToTimestampColumnReader = DecimalToTimestampColumnReader<Decimal128VectorBatch>;
728+
729+
#define DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(TO) \
730+
using Decimal64To##TO##ColumnReader = DecimalToStringVariantColumnReader<Decimal64VectorBatch>; \
731+
using Decimal128To##TO##ColumnReader = DecimalToStringVariantColumnReader<Decimal128VectorBatch>;
732+
624733
DEFINE_NUMERIC_CONVERT_READER(Boolean, Byte, int8_t)
625734
DEFINE_NUMERIC_CONVERT_READER(Boolean, Short, int16_t)
626735
DEFINE_NUMERIC_CONVERT_READER(Boolean, Int, int32_t)
@@ -720,6 +829,11 @@ namespace orc {
720829
DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal64)
721830
DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal128)
722831

832+
DEFINE_DECIMAL_CONVERT_TO_TIMESTAMP_READER
833+
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(String)
834+
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(Char)
835+
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(Varchar)
836+
723837
#define CREATE_READER(NAME) \
724838
return std::make_unique<NAME>(_readType, fileType, stripe, throwOnOverflow);
725839

@@ -935,13 +1049,6 @@ namespace orc {
9351049
CASE_EXCEPTION
9361050
}
9371051
}
938-
case STRING:
939-
case BINARY:
940-
case TIMESTAMP:
941-
case LIST:
942-
case MAP:
943-
case STRUCT:
944-
case UNION:
9451052
case DECIMAL: {
9461053
switch (_readType.getKind()) {
9471054
CASE_CREATE_FROM_DECIMAL_READER(BOOLEAN, Boolean)
@@ -951,6 +1058,11 @@ namespace orc {
9511058
CASE_CREATE_FROM_DECIMAL_READER(LONG, Long)
9521059
CASE_CREATE_FROM_DECIMAL_READER(FLOAT, Float)
9531060
CASE_CREATE_FROM_DECIMAL_READER(DOUBLE, Double)
1061+
CASE_CREATE_FROM_DECIMAL_READER(STRING, String)
1062+
CASE_CREATE_FROM_DECIMAL_READER(CHAR, Char)
1063+
CASE_CREATE_FROM_DECIMAL_READER(VARCHAR, Varchar)
1064+
CASE_CREATE_FROM_DECIMAL_READER(TIMESTAMP, Timestamp)
1065+
CASE_CREATE_FROM_DECIMAL_READER(TIMESTAMP_INSTANT, Timestamp)
9541066
case DECIMAL: {
9551067
if (isDecimal64(fileType)) {
9561068
if (isDecimal64(_readType)) {
@@ -966,11 +1078,6 @@ namespace orc {
9661078
}
9671079
}
9681080
}
969-
case STRING:
970-
case CHAR:
971-
case VARCHAR:
972-
case TIMESTAMP:
973-
case TIMESTAMP_INSTANT:
9741081
case BINARY:
9751082
case LIST:
9761083
case MAP:
@@ -980,6 +1087,13 @@ namespace orc {
9801087
CASE_EXCEPTION
9811088
}
9821089
}
1090+
case STRING:
1091+
case BINARY:
1092+
case TIMESTAMP:
1093+
case LIST:
1094+
case MAP:
1095+
case STRUCT:
1096+
case UNION:
9831097
case DATE:
9841098
case VARCHAR:
9851099
case CHAR:

c++/src/SchemaEvolution.cc

+2-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ namespace orc {
9999
break;
100100
}
101101
case DECIMAL: {
102-
ret.isValid = ret.needConvert = isNumeric(readType);
102+
ret.isValid = ret.needConvert =
103+
isNumeric(readType) || isStringVariant(readType) || isTimestamp(readType);
103104
break;
104105
}
105106
case STRING:

c++/test/TestConvertColumnReader.cc

+143
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19+
#include "Timezone.hh"
1920
#include "orc/Type.hh"
2021
#include "wrap/gtest-wrapper.h"
2122

@@ -672,4 +673,146 @@ namespace orc {
672673
}
673674
}
674675

676+
TEST(ConvertColumnReader, TestConvertDecimalToTimestamp) {
677+
constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024;
678+
constexpr int TEST_CASES = 1024;
679+
std::string writerTimezoneName = "America/New_York";
680+
std::string readerTimezoneName = "Australia/Sydney";
681+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
682+
std::unique_ptr<Type> fileType(
683+
Type::buildTypeFromString("struct<c1:decimal(14,4),c2:decimal(25,10)>"));
684+
std::shared_ptr<Type> readType(
685+
Type::buildTypeFromString("struct<c1:timestamp,c2:timestamp with local time zone>"));
686+
WriterOptions options;
687+
options.setUseTightNumericVector(true);
688+
options.setTimezoneName(writerTimezoneName);
689+
auto writer = createWriter(*fileType, &memStream, options);
690+
auto batch = writer->createRowBatch(TEST_CASES);
691+
auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
692+
auto& c1 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[0]);
693+
auto& c2 = dynamic_cast<Decimal128VectorBatch&>(*structBatch->fields[1]);
694+
695+
auto convertToSeconds = [](const Timezone& writerTimezone, const std::string& date) {
696+
tm timeStruct;
697+
if (strptime(date.c_str(), "%Y-%m-%d %H:%M:%S", &timeStruct) == nullptr) {
698+
throw TimezoneError("bad time " + date);
699+
}
700+
return writerTimezone.convertFromUTC(timegm(&timeStruct));
701+
};
702+
703+
std::vector<std::string> timeStrings;
704+
for (int i = 0; i < TEST_CASES; i++) {
705+
int64_t year = 1960 + (i / 12);
706+
int64_t month = i % 12 + 1;
707+
int64_t day = 27;
708+
std::string others = "23:45:56";
709+
std::stringstream ss;
710+
ss << year << "-";
711+
ss << std::setfill('0') << std::setw(2) << month << "-" << day << " " << others;
712+
timeStrings.push_back(ss.str());
713+
}
714+
std::vector<int64_t> ts[2];
715+
for (auto& time : timeStrings) {
716+
ts[0].emplace_back(convertToSeconds(getTimezoneByName("GMT"), time));
717+
ts[1].emplace_back(convertToSeconds(getTimezoneByName(writerTimezoneName), time));
718+
}
719+
bool overflow = false;
720+
721+
for (int i = 0; i < TEST_CASES; i++) {
722+
c1.values[i] = ts[0][i] * 10000 + 1234;
723+
c2.values[i] = scaleUpInt128ByPowerOfTen(Int128(ts[1][i]), 10, overflow) +=
724+
Int128("1234567895");
725+
assert(!overflow);
726+
}
727+
728+
structBatch->numElements = c1.numElements = c2.numElements = TEST_CASES;
729+
structBatch->hasNulls = c1.hasNulls = c2.hasNulls = false;
730+
writer->add(*batch);
731+
writer->close();
732+
auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength());
733+
auto pool = getDefaultPool();
734+
auto reader = createReader(*pool, std::move(inStream));
735+
RowReaderOptions rowReaderOptions;
736+
rowReaderOptions.setUseTightNumericVector(true);
737+
rowReaderOptions.setReadType(readType);
738+
rowReaderOptions.setTimezoneName(readerTimezoneName);
739+
auto rowReader = reader->createRowReader(rowReaderOptions);
740+
auto readBatch = rowReader->createRowBatch(TEST_CASES);
741+
EXPECT_EQ(true, rowReader->next(*readBatch));
742+
743+
auto& readSturctBatch = dynamic_cast<StructVectorBatch&>(*readBatch);
744+
auto& readC1 = dynamic_cast<TimestampVectorBatch&>(*readSturctBatch.fields[0]);
745+
auto& readC2 = dynamic_cast<TimestampVectorBatch&>(*readSturctBatch.fields[1]);
746+
for (int i = 0; i < TEST_CASES; i++) {
747+
size_t idx = static_cast<size_t>(i);
748+
EXPECT_TRUE(readC1.notNull[idx]) << i;
749+
EXPECT_TRUE(readC2.notNull[idx]) << i;
750+
EXPECT_EQ(getTimezoneByName(readerTimezoneName).convertToUTC(readC1.data[i]), ts[0][i]);
751+
EXPECT_TRUE(readC1.nanoseconds[i] == 123400000);
752+
EXPECT_EQ(readC2.data[i], ts[1][i]);
753+
if (readC2.data[i] < 0) {
754+
EXPECT_EQ(readC2.nanoseconds[i], 123456790) << timeStrings[i];
755+
} else {
756+
EXPECT_EQ(readC2.nanoseconds[i], 123456789) << timeStrings[i];
757+
}
758+
}
759+
}
760+
761+
TEST(ConvertColumnReader, TestConvertDecimalToStringVariant) {
762+
constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024;
763+
constexpr int TEST_CASES = 1024;
764+
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
765+
std::unique_ptr<Type> fileType(
766+
Type::buildTypeFromString("struct<c1:decimal(14,3),c2:decimal(14,3),c3:decimal(14,3)>"));
767+
std::shared_ptr<Type> readType(
768+
Type::buildTypeFromString("struct<c1:char(5),c2:varchar(5),c3:string>"));
769+
WriterOptions options;
770+
auto writer = createWriter(*fileType, &memStream, options);
771+
auto batch = writer->createRowBatch(TEST_CASES);
772+
auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
773+
auto& c1 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[0]);
774+
auto& c2 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[1]);
775+
auto& c3 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[2]);
776+
777+
for (int i = 0; i < TEST_CASES; i++) {
778+
c1.values[i] = i * 1000 + 123;
779+
c2.values[i] = i * 1000 + 456;
780+
c3.values[i] = i * 1000 + 789;
781+
}
782+
structBatch->numElements = c1.numElements = c2.numElements = TEST_CASES;
783+
structBatch->hasNulls = c1.hasNulls = c2.hasNulls = false;
784+
writer->add(*batch);
785+
writer->close();
786+
auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength());
787+
auto pool = getDefaultPool();
788+
auto reader = createReader(*pool, std::move(inStream));
789+
RowReaderOptions rowReaderOptions;
790+
rowReaderOptions.setUseTightNumericVector(true);
791+
rowReaderOptions.setReadType(readType);
792+
auto rowReader = reader->createRowReader(rowReaderOptions);
793+
auto readBatch = rowReader->createRowBatch(TEST_CASES);
794+
EXPECT_EQ(true, rowReader->next(*readBatch));
795+
796+
auto& readSturctBatch = dynamic_cast<StructVectorBatch&>(*readBatch);
797+
auto& readC1 = dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[0]);
798+
auto& readC2 = dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[1]);
799+
auto& readC3 = dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[2]);
800+
for (int i = 0; i < TEST_CASES; i++) {
801+
if (i < 10) {
802+
EXPECT_EQ(std::to_string(i) + ".123", std::string(readC1.data[i], readC1.length[i]));
803+
EXPECT_EQ(std::to_string(i) + ".456", std::string(readC2.data[i], readC2.length[i]));
804+
} else if (i >= 10 && i < 100) {
805+
EXPECT_EQ(std::to_string(i) + ".12", std::string(readC1.data[i], readC1.length[i]));
806+
EXPECT_EQ(std::to_string(i) + ".45", std::string(readC2.data[i], readC2.length[i]));
807+
} else if (i >= 100 && i < 1000) {
808+
EXPECT_EQ(std::to_string(i) + ".1", std::string(readC1.data[i], readC1.length[i]));
809+
EXPECT_EQ(std::to_string(i) + ".4", std::string(readC2.data[i], readC2.length[i]));
810+
} else {
811+
EXPECT_EQ(std::to_string(i) + ".", std::string(readC1.data[i], readC1.length[i]));
812+
EXPECT_EQ(std::to_string(i) + ".", std::string(readC2.data[i], readC2.length[i]));
813+
}
814+
EXPECT_EQ(std::to_string(i) + ".789", std::string(readC3.data[i], readC3.length[i]));
815+
}
816+
}
817+
675818
} // namespace orc

c++/test/TestSchemaEvolution.cc

+16
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,22 @@ namespace orc {
132132
}
133133
}
134134

135+
// conversion from decimal to string/char/varchar
136+
for (size_t i = 12; i <= 13; i++) {
137+
for (size_t j = 7; j <= 11; j++) {
138+
canConvert[i][j] = true;
139+
needConvert[i][j] = true;
140+
}
141+
}
142+
143+
// conversion from decimal to timestamp
144+
for (size_t i = 12; i <= 13; i++) {
145+
for (size_t j = 14; j <= 15; j++) {
146+
canConvert[i][j] = true;
147+
needConvert[i][j] = true;
148+
}
149+
}
150+
135151
for (size_t i = 0; i < typesSize; i++) {
136152
for (size_t j = 0; j < typesSize; j++) {
137153
testConvertReader(types[i], types[j], canConvert[i][j], needConvert[i][j]);

0 commit comments

Comments
 (0)