Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-25.1: sql: don't disable streamer due to scanBufferNode #143167

Merged
merged 1 commit into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,10 @@ func (dsp *DistSQLPlanner) Run(
// then we are ignorant of the details of the execution plan, so we choose
// to be on the safe side and mark 'noMutations' as 'false'.
noMutations := planCtx.planner != nil && !planCtx.planner.curPlan.flags.IsSet(planFlagContainsMutation)
log.VEventf(ctx, 3, "noMutations = %t", noMutations)

if txn == nil {
log.VEventf(ctx, 3, "nil txn")
// Txn can be nil in some cases, like BulkIO flows. In such a case, we
// cannot create a LeafTxn, so we cannot parallelize scans.
planCtx.parallelizeScansIfLocal = false
Expand Down Expand Up @@ -754,6 +756,7 @@ func (dsp *DistSQLPlanner) Run(
// TODO(yuzefovich): fix the propagation of the lock spans with the leaf
// txns and remove this check. See #94290.
containsLocking := planCtx.planner != nil && planCtx.planner.curPlan.flags.IsSet(planFlagContainsLocking)
log.VEventf(ctx, 3, "containsLocking = %t", containsLocking)

// We also currently disable the usage of the Streamer API whenever
// we have a wrapped planNode. This is done to prevent scenarios
Expand All @@ -768,8 +771,15 @@ func (dsp *DistSQLPlanner) Run(
// cases.
mustUseRootTxn := func() bool {
for _, p := range plan.Processors {
if p.Spec.Core.LocalPlanNode != nil {
return true
if n := p.Spec.Core.LocalPlanNode; n != nil {
switch n.Name {
case "scan buffer", "buffer":
// scanBufferNode and bufferNode don't interact with
// txns directly, so they are safe.
default:
log.VEventf(ctx, 3, "must use root txn due to %q wrapped planNode", n.Name)
return true
}
}
}
return false
Expand Down
126 changes: 73 additions & 53 deletions pkg/sql/opt/exec/execbuilder/testdata/inverted_join_geospatial
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ CREATE TABLE rtable(
geom geometry,
rk2 string,
PRIMARY KEY (rk1, rk2),
INVERTED INDEX geom_index(geom)
INVERTED INDEX geom_index(geom),
FAMILY (rk1, rk2, geom)
)

query T
Expand Down Expand Up @@ -295,7 +296,7 @@ vectorized: true
spans: FULL SCAN

query T
EXPLAIN (VERBOSE)
EXPLAIN ANALYZE
WITH q AS (
SELECT * FROM ltable WHERE lk > 2
)
Expand All @@ -305,86 +306,105 @@ SELECT count(*), (SELECT count(*) FROM q) FROM (
LEFT JOIN rtable ON ST_Intersects(q.geom1, rtable.geom)
) GROUP BY lk
----
distribution: local
vectorized: true
planning time: 10µs
execution time: 100µs
distribution: <hidden>
vectorized: <hidden>
plan type: custom
maximum memory usage: <hidden>
network usage: <hidden>
regions: <hidden>
isolation level: serializable
priority: normal
quality of service: regular
·
• root
│ columns: (count, count)
├── • render
│ │ columns: (count, count)
│ │ render count: @S2
│ │ render count_rows: count_rows
│ │
│ └── • group (hash)
│ │ columns: (lk, count_rows)
│ │ estimated row count: 333 (missing stats)
│ │ aggregate 0: count_rows()
│ │ sql nodes: <hidden>
│ │ regions: <hidden>
│ │ actual row count: 0
│ │ estimated max memory allocated: 0 B
│ │ estimated max sql temp disk usage: 0 B
│ │ group by: lk
│ │
│ └── • project
│ │ columns: (lk)
│ └── • lookup join (left outer) (streamer)
│ │ sql nodes: <hidden>
│ │ regions: <hidden>
│ │ actual row count: 0
│ │ KV time: 0µs
│ │ KV contention time: 0µs
│ │ KV rows decoded: 0
│ │ KV bytes read: 0 B
│ │ KV gRPC calls: 0
│ │ estimated max memory allocated: 0 B
│ │ table: rtable@rtable_pkey
│ │ equality: (rk1, rk2) = (rk1, rk2)
│ │ equality cols are key
│ │ pred: st_intersects(geom1, geom)
│ │
│ └── • project
│ │ columns: (lk, geom1, geom)
│ └── • inverted join (left outer)
│ │ sql nodes: <hidden>
│ │ regions: <hidden>
│ │ actual row count: 0
│ │ KV time: 0µs
│ │ KV contention time: 0µs
│ │ KV rows decoded: 0
│ │ KV bytes read: 0 B
│ │ KV gRPC calls: 0
│ │ estimated max memory allocated: 0 B
│ │ estimated max sql temp disk usage: 0 B
│ │ table: rtable@geom_index
│ │
│ └── • lookup join (left outer)
│ │ columns: (lk, geom1, rk1, rk2, cont, geom)
│ │ estimated row count: 3,333 (missing stats)
│ │ table: rtable@rtable_pkey
│ │ equality: (rk1, rk2) = (rk1, rk2)
│ │ equality cols are key
│ │ pred: st_intersects(geom1, geom)
│ │
│ └── • project
│ │ columns: (lk, geom1, rk1, rk2, cont)
│ │
│ └── • inverted join (left outer)
│ │ columns: (lk, geom1, rk1, rk2, geom_inverted_key, cont)
│ │ estimated row count: 3,333 (missing stats)
│ │ table: rtable@geom_index
│ │ inverted expr: st_intersects(geom1, geom_inverted_key)
│ │
│ └── • project
│ │ columns: (lk, geom1)
│ │
│ └── • scan buffer
│ columns: (lk, geom1, geom2)
│ estimated row count: 333 (missing stats)
│ label: buffer 1 (q)
│ └── • scan buffer
│ sql nodes: <hidden>
│ regions: <hidden>
│ actual row count: 0
│ label: buffer 1 (q)
├── • subquery
│ │ id: @S1
│ │ original sql: SELECT * FROM ltable WHERE lk > 2
│ │ exec mode: discard all rows
│ │
│ └── • buffer
│ │ columns: (lk, geom1, geom2)
│ │ sql nodes: <hidden>
│ │ regions: <hidden>
│ │ actual row count: 0
│ │ label: buffer 1 (q)
│ │
│ └── • scan
│ columns: (lk, geom1, geom2)
│ estimated row count: 333 (missing stats)
│ sql nodes: <hidden>
│ kv nodes: <hidden>
│ regions: <hidden>
│ actual row count: 0
│ KV time: 0µs
│ KV contention time: 0µs
│ KV rows decoded: 0
│ KV bytes read: 0 B
│ KV gRPC calls: 0
│ estimated max memory allocated: 0 B
│ missing stats
│ table: ltable@ltable_pkey
│ spans: /3-
│ spans: [/3 - ]
└── • subquery
│ id: @S2
│ original sql: (SELECT count(*) FROM q)
│ exec mode: one row
└── • group (scalar)
columns: (count_rows)
estimated row count: 1 (missing stats)
aggregate 0: count_rows()
sql nodes: <hidden>
regions: <hidden>
actual row count: 1
└── • project
│ columns: ()
└── • scan buffer
columns: (lk, geom1, geom2)
estimated row count: 333 (missing stats)
label: buffer 1 (q)
└── • scan buffer
sql nodes: <hidden>
regions: <hidden>
actual row count: 0
label: buffer 1 (q)

# Anti joins are also converted to paired joins by the optimizer.
query T
Expand Down