-
Notifications
You must be signed in to change notification settings - Fork 6.4k
/
Copy pathmerge_operator.cc
167 lines (140 loc) · 6.03 KB
/
merge_operator.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
/**
* Back-end implementation details specific to the Merge Operator.
*/
#include "rocksdb/merge_operator.h"
#include <type_traits>
#include "db/wide/wide_columns_helper.h"
#include "util/overload.h"
namespace ROCKSDB_NAMESPACE {
bool MergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const {
// If FullMergeV2 is not implemented, we convert the operand_list to
// std::deque<std::string> and pass it to FullMerge
std::deque<std::string> operand_list_str;
for (auto& op : merge_in.operand_list) {
operand_list_str.emplace_back(op.data(), op.size());
}
return FullMerge(merge_in.key, merge_in.existing_value, operand_list_str,
&merge_out->new_value, merge_in.logger);
}
bool MergeOperator::FullMergeV3(const MergeOperationInputV3& merge_in,
MergeOperationOutputV3* merge_out) const {
assert(merge_out);
MergeOperationInput in_v2(merge_in.key, nullptr, merge_in.operand_list,
merge_in.logger);
std::string new_value;
Slice existing_operand(nullptr, 0);
MergeOperationOutput out_v2(new_value, existing_operand);
return std::visit(
overload{
[&](const auto& existing) -> bool {
using T = std::decay_t<decltype(existing)>;
if constexpr (std::is_same_v<T, Slice>) {
in_v2.existing_value = &existing;
}
const bool result = FullMergeV2(in_v2, &out_v2);
if (!result) {
merge_out->op_failure_scope = out_v2.op_failure_scope;
return false;
}
if (existing_operand.data()) {
merge_out->new_value = existing_operand;
} else {
merge_out->new_value = std::move(new_value);
}
return true;
},
[&](const WideColumns& existing_columns) -> bool {
const bool has_default_column =
WideColumnsHelper::HasDefaultColumn(existing_columns);
Slice value_of_default;
if (has_default_column) {
value_of_default = existing_columns.front().value();
}
in_v2.existing_value = &value_of_default;
const bool result = FullMergeV2(in_v2, &out_v2);
if (!result) {
merge_out->op_failure_scope = out_v2.op_failure_scope;
return false;
}
merge_out->new_value = MergeOperationOutputV3::NewColumns();
auto& new_columns = std::get<MergeOperationOutputV3::NewColumns>(
merge_out->new_value);
new_columns.reserve(has_default_column
? existing_columns.size()
: (existing_columns.size() + 1));
if (existing_operand.data()) {
new_columns.emplace_back(kDefaultWideColumnName.ToString(),
existing_operand.ToString());
} else {
new_columns.emplace_back(kDefaultWideColumnName.ToString(),
std::move(new_value));
}
for (size_t i = has_default_column ? 1 : 0;
i < existing_columns.size(); ++i) {
new_columns.emplace_back(existing_columns[i].name().ToString(),
existing_columns[i].value().ToString());
}
return true;
}},
merge_in.existing_value);
}
// The default implementation of PartialMergeMulti, which invokes
// PartialMerge multiple times internally and merges two operands at
// a time.
bool MergeOperator::PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value,
Logger* logger) const {
assert(operand_list.size() >= 2);
// Simply loop through the operands
Slice temp_slice(operand_list[0]);
for (size_t i = 1; i < operand_list.size(); ++i) {
auto& operand = operand_list[i];
std::string temp_value;
if (!PartialMerge(key, temp_slice, operand, &temp_value, logger)) {
return false;
}
swap(temp_value, *new_value);
temp_slice = Slice(*new_value);
}
// The result will be in *new_value. All merges succeeded.
return true;
}
// Given a "real" merge from the library, call the user's
// associative merge function one-by-one on each of the operands.
// NOTE: It is assumed that the client's merge-operator will handle any errors.
bool AssociativeMergeOperator::FullMergeV2(
const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const {
// Simply loop through the operands
Slice temp_existing;
const Slice* existing_value = merge_in.existing_value;
for (const auto& operand : merge_in.operand_list) {
std::string temp_value;
if (!Merge(merge_in.key, existing_value, operand, &temp_value,
merge_in.logger)) {
return false;
}
swap(temp_value, merge_out->new_value);
temp_existing = Slice(merge_out->new_value);
existing_value = &temp_existing;
}
// The result will be in *new_value. All merges succeeded.
return true;
}
// Call the user defined simple merge on the operands;
// NOTE: It is assumed that the client's merge-operator will handle any errors.
bool AssociativeMergeOperator::PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const {
return Merge(key, &left_operand, right_operand, new_value, logger);
}
} // namespace ROCKSDB_NAMESPACE