Skip to content

Commit 03c8f7f

Browse files
authored
[fix][dingo-executor] Support selection for coprocessorV2 (#1345)
1 parent 64bee41 commit 03c8f7f

16 files changed

+999
-13
lines changed

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexFullScanVisitFun.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ private DingoIndexFullScanVisitFun() {
148148
relOp,
149149
rel.isPushDown(),
150150
rel.getSelection(),
151-
0
151+
0,
152+
transaction.isAutoCommit()
152153
));
153154
}
154155
assert indexScanvertex != null;

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexRangeScanVisitFun.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ private DingoIndexRangeScanVisitFun() {
175175
relOp,
176176
rel.isPushDown(),
177177
rel.getSelection(),
178-
0
178+
0,
179+
transaction.isAutoCommit()
179180
));
180181
}
181182
OutputHint hint = new OutputHint();

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoIndexScanWithRelOpVisitFun.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,8 @@ private DingoIndexScanWithRelOpVisitFun() {
188188
rel.isPushDown(),
189189
td.version,
190190
0,
191-
td.getCodecVersion()
191+
td.getCodecVersion(),
192+
null
192193
);
193194
if (relOp instanceof PipeOp) {
194195
return new Vertex(SCAN_WITH_PIPE_OP, param);
@@ -231,7 +232,9 @@ private DingoIndexScanWithRelOpVisitFun() {
231232
rel.isPushDown(),
232233
td.version,
233234
0,
234-
td.getCodecVersion()
235+
td.getCodecVersion(),
236+
null,
237+
transaction.isAutoCommit()
235238
);
236239
if (relOp instanceof PipeOp) {
237240
return new Vertex(TXN_SCAN_WITH_PIPE_OP, param);

dingo-calcite/src/main/java/io/dingodb/calcite/visitor/function/DingoScanWithRelOpVisitFun.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,8 @@ private DingoScanWithRelOpVisitFun() {
212212
rel.isPushDown(),
213213
td.version,
214214
rel.getLimit(),
215-
td.getCodecVersion()
215+
td.getCodecVersion(),
216+
null
216217
);
217218
if (relOp instanceof PipeOp) {
218219
return new Vertex(SCAN_WITH_PIPE_OP, param);
@@ -256,7 +257,9 @@ private DingoScanWithRelOpVisitFun() {
256257
rel.isPushDown(),
257258
td.version,
258259
rel.getLimit(),
259-
td.getCodecVersion()
260+
td.getCodecVersion(),
261+
null,
262+
transaction.isAutoCommit()
260263
);
261264
if (relOp instanceof PipeOp) {
262265
return new Vertex(TXN_SCAN_WITH_PIPE_OP, param);

dingo-exec/src/main/java/io/dingodb/exec/operator/params/ScanWithRelOpParam.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import io.dingodb.expr.runtime.expr.NullaryAggExpr;
4343
import lombok.Getter;
4444
import lombok.Setter;
45+
import lombok.extern.slf4j.Slf4j;
4546
import org.checkerframework.checker.nullness.qual.NonNull;
4647

4748
import java.io.ByteArrayOutputStream;
@@ -52,6 +53,7 @@
5253
import java.util.stream.Collectors;
5354
import java.util.stream.IntStream;
5455

56+
@Slf4j
5557
@JsonTypeName("scanRel")
5658
@JsonPropertyOrder({
5759
"tableId",
@@ -79,6 +81,9 @@ public class ScanWithRelOpParam extends ScanParam {
7981
@Setter
8082
protected int limit;
8183

84+
@JsonProperty("selection")
85+
protected List<Integer> selection;
86+
8287
@Getter
8388
protected transient CoprocessorV2 coprocessor;
8489

@@ -126,7 +131,8 @@ public ScanWithRelOpParam(
126131
boolean pushDown,
127132
int schemaVersion,
128133
int limit,
129-
int codecVersion
134+
int codecVersion,
135+
List<Integer> selection
130136
) {
131137
super(tableId, schema, keyMapping, schemaVersion, codecVersion);
132138
this.relOp = relOp;
@@ -135,6 +141,7 @@ public ScanWithRelOpParam(
135141
coprocessor = null;
136142
this.limit = limit;
137143
config = new DingoRelConfig();
144+
this.selection = selection;
138145
}
139146

140147
@Override

dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnIndexRangeScanParam.java

+51-4
Original file line numberDiff line numberDiff line change
@@ -23,24 +23,34 @@
2323
import io.dingodb.codec.KeyValueCodec;
2424
import io.dingodb.common.CommonId;
2525
import io.dingodb.common.CoprocessorV2;
26+
import io.dingodb.common.log.LogUtils;
2627
import io.dingodb.common.type.DingoType;
2728
import io.dingodb.common.type.TupleMapping;
2829
import io.dingodb.exec.dag.Vertex;
2930
import io.dingodb.exec.expr.DingoCompileContext;
3031
import io.dingodb.exec.utils.SchemaWrapperUtils;
32+
import io.dingodb.exec.utils.relop.RelOpMappingVisitor;
33+
import io.dingodb.exec.utils.relop.RelOpSelectionVisitor;
34+
import io.dingodb.exec.utils.relop.SelectionFlag;
35+
import io.dingodb.exec.utils.relop.SelectionObj;
3136
import io.dingodb.expr.coding.CodingFlag;
3237
import io.dingodb.expr.coding.RelOpCoder;
3338
import io.dingodb.expr.common.type.TupleType;
3439
import io.dingodb.expr.rel.RelOp;
3540
import io.dingodb.meta.entity.Table;
3641
import lombok.Getter;
42+
import lombok.extern.slf4j.Slf4j;
3743

3844
import java.io.ByteArrayOutputStream;
3945
import java.util.Arrays;
46+
import java.util.Comparator;
47+
import java.util.HashSet;
4048
import java.util.List;
49+
import java.util.Set;
4150
import java.util.stream.Collectors;
4251
import java.util.stream.IntStream;
4352

53+
@Slf4j
4454
@Getter
4555
public class TxnIndexRangeScanParam extends ScanWithRelOpParam {
4656

@@ -68,6 +78,8 @@ public class TxnIndexRangeScanParam extends ScanWithRelOpParam {
6878
protected List<Integer> mapList;
6979
@JsonProperty("selection")
7080
private TupleMapping selection;
81+
@JsonProperty("isAutoCommit")
82+
private final boolean isAutoCommit;
7183

7284
public TxnIndexRangeScanParam(CommonId indexTableId,
7385
CommonId tableId,
@@ -82,9 +94,10 @@ public TxnIndexRangeScanParam(CommonId indexTableId,
8294
RelOp relOp,
8395
boolean pushDown,
8496
TupleMapping selection,
85-
int limit) {
97+
int limit,
98+
boolean isAutoCommit) {
8699
super(tableId, index.tupleType(), keyMapping, relOp, outputSchema,
87-
pushDown, index.getVersion(), limit, table.getCodecVersion());
100+
pushDown, index.getVersion(), limit, table.getCodecVersion(), selection.stream().boxed().collect(Collectors.toList()));
88101
this.indexSchema = index.tupleType();
89102
this.indexTableId = indexTableId;
90103
this.isLookup = isLookup;
@@ -94,6 +107,7 @@ public TxnIndexRangeScanParam(CommonId indexTableId,
94107
this.scanTs = scanTs;
95108
this.timeout = timeout;
96109
this.selection = selection;
110+
this.isAutoCommit = isAutoCommit;
97111
this.codec = CodecService.getDefault().createKeyValueCodec(
98112
index.getCodecVersion(), index.version, index.tupleType(), index.keyMapping());
99113
if (isLookup) {
@@ -109,16 +123,48 @@ public void init(Vertex vertex) {
109123
if (relOp == null) {
110124
return;
111125
}
112-
relOp = relOp.compile(new DingoCompileContext(
126+
RelOp relOpCompile = relOp.compile(new DingoCompileContext(
113127
(TupleType) indexSchema.getType(),
114128
(TupleType) vertex.getParasType().getType()
115129
), config);
116130
if (pushDown) {
117131
ByteArrayOutputStream os = new ByteArrayOutputStream();
118-
if (RelOpCoder.INSTANCE.visit(relOp, os) == CodingFlag.OK) {
132+
if (RelOpCoder.INSTANCE.visit(relOpCompile, os) == CodingFlag.OK) {
119133
List<Integer> selection = IntStream.range(0, indexSchema.fieldCount())
120134
.boxed()
121135
.collect(Collectors.toList());
136+
Set<Integer> selections = new HashSet<>();
137+
SelectionObj selectionObj = new SelectionObj(selections, true);
138+
boolean isSelection = false;
139+
if (isAutoCommit() && RelOpSelectionVisitor.INSTANCE.visit(relOp, selectionObj) == SelectionFlag.OK
140+
&& selectionObj.isProject() && selections.size() != selection.size()) {
141+
try {
142+
selection.clear();
143+
selection.addAll(selections);
144+
selection.sort(Comparator.naturalOrder());
145+
relOpCompile = RelOpMappingVisitor.INSTANCE.visit(relOp, selection);
146+
LogUtils.debug(log, "jobId:{}, new relOp: {}", vertex.getTask().getJobId(), relOpCompile);
147+
isSelection = true;
148+
} catch (Exception e) {
149+
LogUtils.error(log, e.getMessage(), e);
150+
selection = IntStream.range(0, indexSchema.fieldCount())
151+
.boxed()
152+
.collect(Collectors.toList());
153+
}
154+
} else {
155+
LogUtils.debug(log, "jobId:{}, origin relOp: {}", vertex.getTask().getJobId(), relOp);
156+
}
157+
if (isSelection) {
158+
relOpCompile = relOpCompile.compile(new DingoCompileContext(
159+
(TupleType) indexSchema.select(TupleMapping.of(selection)).getType(),
160+
(TupleType) vertex.getParasType().getType()
161+
), config);
162+
os = new ByteArrayOutputStream();
163+
if (RelOpCoder.INSTANCE.visit(relOpCompile, os) != CodingFlag.OK) {
164+
relOp = relOpCompile;
165+
return;
166+
}
167+
}
122168
TupleMapping keyMapping = indexKeyMapping();
123169
TupleMapping outputKeyMapping = TupleMapping.of(new int[]{});
124170
coprocessor = CoprocessorV2.builder()
@@ -130,6 +176,7 @@ public void init(Vertex vertex) {
130176
.build();
131177
}
132178
}
179+
relOp = relOpCompile;
133180
}
134181

135182
public TupleMapping indexKeyMapping() {

dingo-exec/src/main/java/io/dingodb/exec/operator/params/TxnScanWithRelOpParam.java

+91-2
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,34 @@
2020
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
2121
import com.fasterxml.jackson.annotation.JsonTypeName;
2222
import io.dingodb.common.CommonId;
23+
import io.dingodb.common.CoprocessorV2;
24+
import io.dingodb.common.log.LogUtils;
2325
import io.dingodb.common.type.DingoType;
2426
import io.dingodb.common.type.TupleMapping;
27+
import io.dingodb.exec.dag.Vertex;
28+
import io.dingodb.exec.expr.DingoCompileContext;
29+
import io.dingodb.exec.utils.SchemaWrapperUtils;
30+
import io.dingodb.exec.utils.relop.RelOpMappingVisitor;
31+
import io.dingodb.exec.utils.relop.RelOpSelectionVisitor;
32+
import io.dingodb.exec.utils.relop.SelectionFlag;
33+
import io.dingodb.exec.utils.relop.SelectionObj;
34+
import io.dingodb.expr.coding.CodingFlag;
35+
import io.dingodb.expr.coding.RelOpCoder;
36+
import io.dingodb.expr.common.type.TupleType;
2537
import io.dingodb.expr.rel.RelOp;
2638
import lombok.Getter;
39+
import lombok.extern.slf4j.Slf4j;
2740
import org.checkerframework.checker.nullness.qual.NonNull;
2841

42+
import java.io.ByteArrayOutputStream;
43+
import java.util.Comparator;
44+
import java.util.HashSet;
45+
import java.util.List;
46+
import java.util.Set;
47+
import java.util.stream.Collectors;
48+
import java.util.stream.IntStream;
49+
50+
@Slf4j
2951
@Getter
3052
@JsonTypeName("txnScanRel")
3153
@JsonPropertyOrder({
@@ -46,6 +68,7 @@ public class TxnScanWithRelOpParam extends ScanWithRelOpParam {
4668
private final long timeOut;
4769
@JsonProperty("scanTs")
4870
private long scanTs;
71+
private final boolean isAutoCommit;
4972

5073
public TxnScanWithRelOpParam(
5174
CommonId tableId,
@@ -59,17 +82,83 @@ public TxnScanWithRelOpParam(
5982
boolean pushDown,
6083
int schemaVersion,
6184
int limit,
62-
int codecVersion
85+
int codecVersion,
86+
List<Integer> selection,
87+
boolean isAutoCommit
6388
) {
64-
super(tableId, schema, keyMapping, relOp, outputSchema, pushDown, schemaVersion, limit, codecVersion);
89+
super(tableId, schema, keyMapping, relOp, outputSchema, pushDown, schemaVersion, limit, codecVersion, selection);
6590
this.scanTs = scanTs;
6691
this.isolationLevel = isolationLevel;
6792
this.timeOut = timeOut;
93+
this.isAutoCommit = isAutoCommit;
6894
}
6995

7096
@Override
7197
public void setStartTs(long startTs) {
7298
super.setStartTs(startTs);
7399
this.scanTs = startTs;
74100
}
101+
102+
@Override
103+
public void init(Vertex vertex) {
104+
if (selection == null) {
105+
selection = IntStream.range(0, schema.fieldCount())
106+
.boxed()
107+
.collect(Collectors.toList());
108+
}
109+
RelOp relOpCompile = relOp.compile(new DingoCompileContext(
110+
(TupleType) schema.getType(),
111+
(TupleType) vertex.getParasType().getType()
112+
), config);
113+
if (pushDown) {
114+
ByteArrayOutputStream os = new ByteArrayOutputStream();
115+
if (RelOpCoder.INSTANCE.visit(relOpCompile, os) == CodingFlag.OK) {
116+
Set<Integer> selections = new HashSet<>();
117+
SelectionObj selectionObj = new SelectionObj(selections, true);
118+
boolean isSelection = false;
119+
if (isAutoCommit() && RelOpSelectionVisitor.INSTANCE.visit(relOp, selectionObj) == SelectionFlag.OK
120+
&& selectionObj.isProject() && selections.size() != selection.size()) {
121+
try {
122+
selection.clear();
123+
selection.addAll(selections);
124+
selection.sort(Comparator.naturalOrder());
125+
relOpCompile = RelOpMappingVisitor.INSTANCE.visit(relOp, selection);
126+
LogUtils.debug(log, "jobId:{}, new relOp: {}", vertex.getTask().getJobId(), relOpCompile);
127+
isSelection = true;
128+
} catch (Exception e) {
129+
LogUtils.error(log, e.getMessage(), e);
130+
selection = IntStream.range(0, schema.fieldCount())
131+
.boxed()
132+
.collect(Collectors.toList());
133+
}
134+
} else {
135+
LogUtils.debug(log, "jobId:{}, origin relOp: {}", vertex.getTask().getJobId(), relOp);
136+
}
137+
if (isSelection) {
138+
relOpCompile = relOpCompile.compile(new DingoCompileContext(
139+
(TupleType) schema.select(TupleMapping.of(selection)).getType(),
140+
(TupleType) vertex.getParasType().getType()
141+
), config);
142+
os = new ByteArrayOutputStream();
143+
if (RelOpCoder.INSTANCE.visit(relOpCompile, os) != CodingFlag.OK) {
144+
relOp = relOpCompile;
145+
return;
146+
}
147+
}
148+
TupleMapping outputKeyMapping = TupleMapping.of(new int[]{});
149+
coprocessor = CoprocessorV2.builder()
150+
.originalSchema(SchemaWrapperUtils.buildSchemaWrapper(schema, keyMapping, tableId.seq))
151+
.resultSchema(SchemaWrapperUtils.buildSchemaWrapper(outputSchema, outputKeyMapping, tableId.seq))
152+
.selection(selection)
153+
.relExpr(os.toByteArray())
154+
.codecVersion(codecVersion)
155+
.build();
156+
if (limit > 0) {
157+
coprocessor.setLimit(limit);
158+
}
159+
}
160+
}
161+
relOp = relOpCompile;
162+
}
163+
75164
}

0 commit comments

Comments
 (0)