Skip to content

Commit 0dfc7b5

Browse files
committed
sql: don't disable streamer due to scanBufferNode
In ed3f640 we disabled the streamer whenever the plan contains a LocalPlanNode processor (which is just a wrapper around a `planNode`). This was done to prevent misuse of the txn API, and we knew at the time that we were disabling the streamer in more cases than strictly necessary. We just saw a support case where it was disabled due to `scanBufferNode`, so this commit adds `scanBufferNode` and `bufferNode` to the allow-list of planNodes that don't disable the streamer (since these don't interact with internal executors or the txn in any way). It also adds a few trace messages around this code. Release note: None
1 parent 256d7bd commit 0dfc7b5

File tree

2 files changed

+85
-55
lines changed

2 files changed

+85
-55
lines changed

pkg/sql/distsql_running.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -710,8 +710,10 @@ func (dsp *DistSQLPlanner) Run(
710710
// then we are ignorant of the details of the execution plan, so we choose
711711
// to be on the safe side and mark 'noMutations' as 'false'.
712712
noMutations := planCtx.planner != nil && !planCtx.planner.curPlan.flags.IsSet(planFlagContainsMutation)
713+
log.VEventf(ctx, 3, "noMutations = %t", noMutations)
713714

714715
if txn == nil {
716+
log.VEventf(ctx, 3, "nil txn")
715717
// Txn can be nil in some cases, like BulkIO flows. In such a case, we
716718
// cannot create a LeafTxn, so we cannot parallelize scans.
717719
planCtx.parallelizeScansIfLocal = false
@@ -754,6 +756,7 @@ func (dsp *DistSQLPlanner) Run(
754756
// TODO(yuzefovich): fix the propagation of the lock spans with the leaf
755757
// txns and remove this check. See #94290.
756758
containsLocking := planCtx.planner != nil && planCtx.planner.curPlan.flags.IsSet(planFlagContainsLocking)
759+
log.VEventf(ctx, 3, "containsLocking = %t", containsLocking)
757760

758761
// We also currently disable the usage of the Streamer API whenever
759762
// we have a wrapped planNode. This is done to prevent scenarios
@@ -768,8 +771,15 @@ func (dsp *DistSQLPlanner) Run(
768771
// cases.
769772
mustUseRootTxn := func() bool {
770773
for _, p := range plan.Processors {
771-
if p.Spec.Core.LocalPlanNode != nil {
772-
return true
774+
if n := p.Spec.Core.LocalPlanNode; n != nil {
775+
switch n.Name {
776+
case "scan buffer", "buffer":
777+
// scanBufferNode and bufferNode don't interact with
778+
// txns directly, so they are safe.
779+
default:
780+
log.VEventf(ctx, 3, "must use root txn due to %q wrapped planNode", n.Name)
781+
return true
782+
}
773783
}
774784
}
775785
return false

pkg/sql/opt/exec/execbuilder/testdata/inverted_join_geospatial

+73-53
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ CREATE TABLE rtable(
1515
geom geometry,
1616
rk2 string,
1717
PRIMARY KEY (rk1, rk2),
18-
INVERTED INDEX geom_index(geom)
18+
INVERTED INDEX geom_index(geom),
19+
FAMILY (rk1, rk2, geom)
1920
)
2021

2122
query T
@@ -295,7 +296,7 @@ vectorized: true
295296
spans: FULL SCAN
296297

297298
query T
298-
EXPLAIN (VERBOSE)
299+
EXPLAIN ANALYZE
299300
WITH q AS (
300301
SELECT * FROM ltable WHERE lk > 2
301302
)
@@ -305,86 +306,105 @@ SELECT count(*), (SELECT count(*) FROM q) FROM (
305306
LEFT JOIN rtable ON ST_Intersects(q.geom1, rtable.geom)
306307
) GROUP BY lk
307308
----
308-
distribution: local
309-
vectorized: true
309+
planning time: 10µs
310+
execution time: 100µs
311+
distribution: <hidden>
312+
vectorized: <hidden>
313+
plan type: custom
314+
maximum memory usage: <hidden>
315+
network usage: <hidden>
316+
regions: <hidden>
317+
isolation level: serializable
318+
priority: normal
319+
quality of service: regular
310320
·
311321
• root
312-
│ columns: (count, count)
313322
314323
├── • render
315-
│ │ columns: (count, count)
316-
│ │ render count: @S2
317-
│ │ render count_rows: count_rows
318324
│ │
319325
│ └── • group (hash)
320-
│ │ columns: (lk, count_rows)
321-
│ │ estimated row count: 333 (missing stats)
322-
│ │ aggregate 0: count_rows()
326+
│ │ sql nodes: <hidden>
327+
│ │ regions: <hidden>
328+
│ │ actual row count: 0
329+
│ │ estimated max memory allocated: 0 B
330+
│ │ estimated max sql temp disk usage: 0 B
323331
│ │ group by: lk
324332
│ │
325-
│ └── • project
326-
│ │ columns: (lk)
333+
│ └── • lookup join (left outer) (streamer)
334+
│ │ sql nodes: <hidden>
335+
│ │ regions: <hidden>
336+
│ │ actual row count: 0
337+
│ │ KV time: 0µs
338+
│ │ KV contention time: 0µs
339+
│ │ KV rows decoded: 0
340+
│ │ KV bytes read: 0 B
341+
│ │ KV gRPC calls: 0
342+
│ │ estimated max memory allocated: 0 B
343+
│ │ table: rtable@rtable_pkey
344+
│ │ equality: (rk1, rk2) = (rk1, rk2)
345+
│ │ equality cols are key
346+
│ │ pred: st_intersects(geom1, geom)
327347
│ │
328-
│ └── • project
329-
│ │ columns: (lk, geom1, geom)
348+
│ └── • inverted join (left outer)
349+
│ │ sql nodes: <hidden>
350+
│ │ regions: <hidden>
351+
│ │ actual row count: 0
352+
│ │ KV time: 0µs
353+
│ │ KV contention time: 0µs
354+
│ │ KV rows decoded: 0
355+
│ │ KV bytes read: 0 B
356+
│ │ KV gRPC calls: 0
357+
│ │ estimated max memory allocated: 0 B
358+
│ │ estimated max sql temp disk usage: 0 B
359+
│ │ table: rtable@geom_index
330360
│ │
331-
│ └── • lookup join (left outer)
332-
│ │ columns: (lk, geom1, rk1, rk2, cont, geom)
333-
│ │ estimated row count: 3,333 (missing stats)
334-
│ │ table: rtable@rtable_pkey
335-
│ │ equality: (rk1, rk2) = (rk1, rk2)
336-
│ │ equality cols are key
337-
│ │ pred: st_intersects(geom1, geom)
338-
│ │
339-
│ └── • project
340-
│ │ columns: (lk, geom1, rk1, rk2, cont)
341-
│ │
342-
│ └── • inverted join (left outer)
343-
│ │ columns: (lk, geom1, rk1, rk2, geom_inverted_key, cont)
344-
│ │ estimated row count: 3,333 (missing stats)
345-
│ │ table: rtable@geom_index
346-
│ │ inverted expr: st_intersects(geom1, geom_inverted_key)
347-
│ │
348-
│ └── • project
349-
│ │ columns: (lk, geom1)
350-
│ │
351-
│ └── • scan buffer
352-
│ columns: (lk, geom1, geom2)
353-
│ estimated row count: 333 (missing stats)
354-
│ label: buffer 1 (q)
361+
│ └── • scan buffer
362+
│ sql nodes: <hidden>
363+
│ regions: <hidden>
364+
│ actual row count: 0
365+
│ label: buffer 1 (q)
355366
356367
├── • subquery
357368
│ │ id: @S1
358369
│ │ original sql: SELECT * FROM ltable WHERE lk > 2
359370
│ │ exec mode: discard all rows
360371
│ │
361372
│ └── • buffer
362-
│ │ columns: (lk, geom1, geom2)
373+
│ │ sql nodes: <hidden>
374+
│ │ regions: <hidden>
375+
│ │ actual row count: 0
363376
│ │ label: buffer 1 (q)
364377
│ │
365378
│ └── • scan
366-
│ columns: (lk, geom1, geom2)
367-
│ estimated row count: 333 (missing stats)
379+
│ sql nodes: <hidden>
380+
│ kv nodes: <hidden>
381+
│ regions: <hidden>
382+
│ actual row count: 0
383+
│ KV time: 0µs
384+
│ KV contention time: 0µs
385+
│ KV rows decoded: 0
386+
│ KV bytes read: 0 B
387+
│ KV gRPC calls: 0
388+
│ estimated max memory allocated: 0 B
389+
│ missing stats
368390
│ table: ltable@ltable_pkey
369-
│ spans: /3-
391+
│ spans: [/3 - ]
370392
371393
└── • subquery
372394
│ id: @S2
373395
│ original sql: (SELECT count(*) FROM q)
374396
│ exec mode: one row
375397
376398
└── • group (scalar)
377-
columns: (count_rows)
378-
estimated row count: 1 (missing stats)
379-
aggregate 0: count_rows()
399+
sql nodes: <hidden>
400+
regions: <hidden>
401+
actual row count: 1
380402
381-
└── • project
382-
│ columns: ()
383-
384-
└── • scan buffer
385-
columns: (lk, geom1, geom2)
386-
estimated row count: 333 (missing stats)
387-
label: buffer 1 (q)
403+
└── • scan buffer
404+
sql nodes: <hidden>
405+
regions: <hidden>
406+
actual row count: 0
407+
label: buffer 1 (q)
388408

389409
# Anti joins are also converted to paired joins by the optimizer.
390410
query T

0 commit comments

Comments
 (0)