Skip to content

Commit 9de7529

Browse files
committed
sql: don't disable streamer due to scanBufferNode
In ed3f640 we disabled the streamer whenever the plan contains a LocalPlanNode processor (which is just a wrapper around a `planNode`). This was done to prevent misuse of the txn API, and we knew at the time that we were disabling the streamer in more cases than strictly necessary. We just saw a support case where it was disabled due to `scanBufferNode`, so this commit adds `scanBufferNode` and `bufferNode` to the allow-list of planNodes that don't disable the streamer (since these don't interact with internal executors or the txn in any way). This commit also adds a few trace messages around this code. Release note: None
1 parent 256d7bd commit 9de7529

File tree

2 files changed

+84
-54
lines changed

2 files changed

+84
-54
lines changed

pkg/sql/distsql_running.go

+12-2
Original file line numberDiff line numberDiff line change
@@ -710,8 +710,10 @@ func (dsp *DistSQLPlanner) Run(
710710
// then we are ignorant of the details of the execution plan, so we choose
711711
// to be on the safe side and mark 'noMutations' as 'false'.
712712
noMutations := planCtx.planner != nil && !planCtx.planner.curPlan.flags.IsSet(planFlagContainsMutation)
713+
log.VEventf(ctx, 3, "noMutations = %t", noMutations)
713714

714715
if txn == nil {
716+
log.VEventf(ctx, 3, "nil txn")
715717
// Txn can be nil in some cases, like BulkIO flows. In such a case, we
716718
// cannot create a LeafTxn, so we cannot parallelize scans.
717719
planCtx.parallelizeScansIfLocal = false
@@ -754,6 +756,7 @@ func (dsp *DistSQLPlanner) Run(
754756
// TODO(yuzefovich): fix the propagation of the lock spans with the leaf
755757
// txns and remove this check. See #94290.
756758
containsLocking := planCtx.planner != nil && planCtx.planner.curPlan.flags.IsSet(planFlagContainsLocking)
759+
log.VEventf(ctx, 3, "containsLocking = %t", containsLocking)
757760

758761
// We also currently disable the usage of the Streamer API whenever
759762
// we have a wrapped planNode. This is done to prevent scenarios
@@ -768,8 +771,15 @@ func (dsp *DistSQLPlanner) Run(
768771
// cases.
769772
mustUseRootTxn := func() bool {
770773
for _, p := range plan.Processors {
771-
if p.Spec.Core.LocalPlanNode != nil {
772-
return true
774+
if n := p.Spec.Core.LocalPlanNode; n != nil {
775+
switch n.Name {
776+
case "scan buffer", "buffer":
777+
// scanBufferNode and bufferNode don't interact with
778+
// txns directly, so they are safe.
779+
default:
780+
log.VEventf(ctx, 3, "must use root txn due to %q wrapped planNode", n.Name)
781+
return true
782+
}
773783
}
774784
}
775785
return false

pkg/sql/opt/exec/execbuilder/testdata/inverted_join_geospatial

+72-52
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ vectorized: true
295295
spans: FULL SCAN
296296

297297
query T
298-
EXPLAIN (VERBOSE)
298+
EXPLAIN ANALYZE
299299
WITH q AS (
300300
SELECT * FROM ltable WHERE lk > 2
301301
)
@@ -305,86 +305,106 @@ SELECT count(*), (SELECT count(*) FROM q) FROM (
305305
LEFT JOIN rtable ON ST_Intersects(q.geom1, rtable.geom)
306306
) GROUP BY lk
307307
----
308-
distribution: local
309-
vectorized: true
308+
planning time: 10µs
309+
execution time: 100µs
310+
distribution: <hidden>
311+
vectorized: <hidden>
312+
plan type: custom
313+
maximum memory usage: <hidden>
314+
network usage: <hidden>
315+
regions: <hidden>
316+
isolation level: serializable
317+
priority: normal
318+
quality of service: regular
310319
·
311320
• root
312-
│ columns: (count, count)
313321
314322
├── • render
315-
│ │ columns: (count, count)
316-
│ │ render count: @S2
317-
│ │ render count_rows: count_rows
318323
│ │
319324
│ └── • group (hash)
320-
│ │ columns: (lk, count_rows)
321-
│ │ estimated row count: 333 (missing stats)
322-
│ │ aggregate 0: count_rows()
325+
│ │ sql nodes: <hidden>
326+
│ │ regions: <hidden>
327+
│ │ actual row count: 0
328+
│ │ estimated max memory allocated: 0 B
329+
│ │ estimated max sql temp disk usage: 0 B
323330
│ │ group by: lk
324331
│ │
325-
│ └── • project
326-
│ │ columns: (lk)
332+
│ └── • lookup join (left outer) (streamer)
333+
│ │ sql nodes: <hidden>
334+
│ │ regions: <hidden>
335+
│ │ actual row count: 0
336+
│ │ KV time: 0µs
337+
│ │ KV contention time: 0µs
338+
│ │ KV rows decoded: 0
339+
│ │ KV bytes read: 0 B
340+
│ │ KV gRPC calls: 0
341+
│ │ estimated max memory allocated: 0 B
342+
│ │ estimated max sql temp disk usage: 0 B
343+
│ │ table: rtable@rtable_pkey
344+
│ │ equality: (rk1, rk2) = (rk1, rk2)
345+
│ │ equality cols are key
346+
│ │ pred: st_intersects(geom1, geom)
327347
│ │
328-
│ └── • project
329-
│ │ columns: (lk, geom1, geom)
348+
│ └── • inverted join (left outer)
349+
│ │ sql nodes: <hidden>
350+
│ │ regions: <hidden>
351+
│ │ actual row count: 0
352+
│ │ KV time: 0µs
353+
│ │ KV contention time: 0µs
354+
│ │ KV rows decoded: 0
355+
│ │ KV bytes read: 0 B
356+
│ │ KV gRPC calls: 0
357+
│ │ estimated max memory allocated: 0 B
358+
│ │ estimated max sql temp disk usage: 0 B
359+
│ │ table: rtable@geom_index
330360
│ │
331-
│ └── • lookup join (left outer)
332-
│ │ columns: (lk, geom1, rk1, rk2, cont, geom)
333-
│ │ estimated row count: 3,333 (missing stats)
334-
│ │ table: rtable@rtable_pkey
335-
│ │ equality: (rk1, rk2) = (rk1, rk2)
336-
│ │ equality cols are key
337-
│ │ pred: st_intersects(geom1, geom)
338-
│ │
339-
│ └── • project
340-
│ │ columns: (lk, geom1, rk1, rk2, cont)
341-
│ │
342-
│ └── • inverted join (left outer)
343-
│ │ columns: (lk, geom1, rk1, rk2, geom_inverted_key, cont)
344-
│ │ estimated row count: 3,333 (missing stats)
345-
│ │ table: rtable@geom_index
346-
│ │ inverted expr: st_intersects(geom1, geom_inverted_key)
347-
│ │
348-
│ └── • project
349-
│ │ columns: (lk, geom1)
350-
│ │
351-
│ └── • scan buffer
352-
│ columns: (lk, geom1, geom2)
353-
│ estimated row count: 333 (missing stats)
354-
│ label: buffer 1 (q)
361+
│ └── • scan buffer
362+
│ sql nodes: <hidden>
363+
│ regions: <hidden>
364+
│ actual row count: 0
365+
│ label: buffer 1 (q)
355366
356367
├── • subquery
357368
│ │ id: @S1
358369
│ │ original sql: SELECT * FROM ltable WHERE lk > 2
359370
│ │ exec mode: discard all rows
360371
│ │
361372
│ └── • buffer
362-
│ │ columns: (lk, geom1, geom2)
373+
│ │ sql nodes: <hidden>
374+
│ │ regions: <hidden>
375+
│ │ actual row count: 0
363376
│ │ label: buffer 1 (q)
364377
│ │
365378
│ └── • scan
366-
│ columns: (lk, geom1, geom2)
367-
│ estimated row count: 333 (missing stats)
379+
│ sql nodes: <hidden>
380+
│ kv nodes: <hidden>
381+
│ regions: <hidden>
382+
│ actual row count: 0
383+
│ KV time: 0µs
384+
│ KV contention time: 0µs
385+
│ KV rows decoded: 0
386+
│ KV bytes read: 0 B
387+
│ KV gRPC calls: 0
388+
│ estimated max memory allocated: 0 B
389+
│ missing stats
368390
│ table: ltable@ltable_pkey
369-
│ spans: /3-
391+
│ spans: [/3 - ]
370392
371393
└── • subquery
372394
│ id: @S2
373395
│ original sql: (SELECT count(*) FROM q)
374396
│ exec mode: one row
375397
376398
└── • group (scalar)
377-
columns: (count_rows)
378-
estimated row count: 1 (missing stats)
379-
aggregate 0: count_rows()
399+
sql nodes: <hidden>
400+
regions: <hidden>
401+
actual row count: 1
380402
381-
└── • project
382-
│ columns: ()
383-
384-
└── • scan buffer
385-
columns: (lk, geom1, geom2)
386-
estimated row count: 333 (missing stats)
387-
label: buffer 1 (q)
403+
└── • scan buffer
404+
sql nodes: <hidden>
405+
regions: <hidden>
406+
actual row count: 0
407+
label: buffer 1 (q)
388408

389409
# Anti joins are also converted to paired joins by the optimizer.
390410
query T

0 commit comments

Comments
 (0)