Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-23799 : Calcite. Hash join. #11770

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/_docs/SQL/sql-calcite.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ SELECT /*+ ENFORCE_JOIN_ORDER */ T1.V1, T2.V1, T2.V2, T3.V1, T3.V2, T3.V3 FROM T
SELECT t1.v1, t3.v2 FROM TBL1 t1 JOIN TBL3 t3 on t1.v3=t3.v3 WHERE t1.v2 in (SELECT /*+ ENFORCE_JOIN_ORDER */ t2.v2 FROM TBL2 t2 JOIN TBL3 t3 ON t2.v1=t3.v1)
----

==== MERGE_JOIN, NL_JOIN, CNL_JOIN
Forces certain join type: Merge, Nested Loop and Correlated Nested Loop respectively.
==== MERGE_JOIN, NL_JOIN, CNL_JOIN, HASH_JOIN
Forces certain join type: Merge, Nested Loop, Correlated Nested Loop and Hash Join respectively.

Every of those has the negation like 'NO_INDEX': CNL_JOIN, NO_CNL_JOIN. The negation hint disables certain join type.
Every of those has the negation like 'NO_INDEX': CNL_JOIN, NO_CNL_JOIN, NO_HASH_JOIN. The negation hint disables certain join type.

===== Parameters:
* Empty. To force or disable certain join type for every join.
Expand All @@ -389,7 +389,7 @@ SELECT /*+ NL_JOIN(TBL3,TBL1) */ t4.v1, t2.v2 FROM TBL1 t4 JOIN TBL2 t2 on t1.v3

SELECT t1.v1, t2.v2 FROM TBL2 t1 JOIN TBL1 t2 on t1.v3=t2.v3 WHERE t2.v3 in (SELECT /*+ NO_CNL_JOIN(TBL4) */ t3.v3 FROM TBL3 t3 JOIN TBL4 t4 on t3.v1=t4.v1)

SELECT t4.v1, t2.v2 FROM TBL1 t4 JOIN TBL2 t2 on t1.v3=t2.v3 WHERE t2.v1 in (SELECT t3.v3 FROM TBL3 t3 JOIN TBL1 /*+ NL_JOIN */ t4 on t3.v2=t4.v2)
SELECT t4.v1, t2.v2 FROM TBL1 t4 JOIN TBL2 t2 on t1.v3=t2.v3 WHERE t2.v1 in (SELECT t3.v3 FROM TBL3 t3 JOIN TBL1 /*+ HASH_JOIN */ t4 on t3.v2=t4.v2)
----

==== EXPAND_DISTINCT_AGG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.calcite.exec;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.apache.ignite.internal.processors.query.calcite.exec.rel.CorrelatedNestedLoopJoinNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.FilterNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashJoinNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Inbox;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.IndexSpoolNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.IntersectNode;
Expand All @@ -74,6 +76,7 @@
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteExchange;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteFilter;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashIndexSpool;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteHashJoin;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexBound;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexCount;
import org.apache.ignite.internal.processors.query.calcite.rel.IgniteIndexScan;
Expand Down Expand Up @@ -266,6 +269,16 @@ public LogicalRelImplementor(
return node;
}

/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteHashJoin rel) {
Node<Row> node = HashJoinNode.create(ctx, rel.getRowType(), rel.getLeft().getRowType(), rel.getRight().getRowType(),
rel.getJoinType(), rel.analyzeCondition());

node.register(Arrays.asList(visit(rel.getLeft()), visit(rel.getRight())));

return node;
}

/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteCorrelatedNestedLoopJoin rel) {
RelDataType outType = rel.getRowType();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec.rel;

import java.util.ArrayDeque;
import java.util.Deque;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;

/** Right-part materialized join node. Holds data from the right part locally. */
public abstract class AbstractRightMaterializedJoinNode<Row> extends MemoryTrackingNode<Row> {
/** Special flag which marks that all the rows are received. */
protected static final int NOT_WAITING = -1;

/** */
protected final Deque<Row> leftInBuf = new ArrayDeque<>(IN_BUFFER_SIZE);

/** */
protected boolean inLoop;

/** */
protected int requested;

/** */
protected int waitingLeft;

/** */
protected int waitingRight;

/** */
protected @Nullable Row left;

/** */
protected AbstractRightMaterializedJoinNode(ExecutionContext<Row> ctx, RelDataType rowType) {
super(ctx, rowType);
}

/** */
protected abstract void join() throws Exception;

/** */
protected abstract void pushRight(Row row) throws Exception;

/** {@inheritDoc} */
@Override public void request(int rowsCnt) throws Exception {
assert !F.isEmpty(sources()) && sources().size() == 2;
assert rowsCnt > 0 && requested == 0;

checkState();

requested = rowsCnt;

if (!inLoop)
context().execute(this::doJoin, this::onError);
}

/** {@inheritDoc} */
@Override protected void rewindInternal() {
requested = 0;
waitingLeft = 0;
waitingRight = 0;
left = null;

leftInBuf.clear();
}

/** {@inheritDoc} */
@Override protected Downstream<Row> requestDownstream(int idx) {
if (idx == 0) {
return new Downstream<>() {
@Override public void push(Row row) throws Exception {
pushLeft(row);
}

@Override public void end() throws Exception {
endLeft();
}

@Override public void onError(Throwable e) {
AbstractRightMaterializedJoinNode.this.onError(e);
}
};
}
else if (idx == 1) {
return new Downstream<>() {
@Override public void push(Row row) throws Exception {
pushRight(row);
}

@Override public void end() throws Exception {
endRight();
}

@Override public void onError(Throwable e) {
AbstractRightMaterializedJoinNode.this.onError(e);
}
};
}

throw new IndexOutOfBoundsException();
}

/** */
private void pushLeft(Row row) throws Exception {
assert downstream() != null;
assert waitingLeft > 0;

checkState();

--waitingLeft;

leftInBuf.add(row);

join();
}

/** */
private void endLeft() throws Exception {
assert downstream() != null;
assert waitingLeft > 0;

checkState();

waitingLeft = NOT_WAITING;

join();
}

/** */
private void endRight() throws Exception {
assert downstream() != null;
assert waitingRight > 0;

checkState();

waitingRight = NOT_WAITING;

join();
}

/** */
protected Node<Row> leftSource() {
return sources().get(0);
}

/** */
protected Node<Row> rightSource() {
return sources().get(1);
}

/** */
private void doJoin() throws Exception {
checkState();

join();
}
}
Loading