Skip to content

Commit 41da6e7

Browse files
committed
kvserver: add leaderlessWatcher
This commit adds a leaderlessWatcher to the replica. This will be used to signal when that replica has been leaderless for a long time. This signal will be used in future commits to return a ReplicaUnavailable error to requests that encounter replicas without a leader for a long time. This indicates that the range has lost quorum and can't establish leadership in time. This is important for leader leases in particular because replicas will not propose a lease if they are not the leader, which makes the replica circuit breakers ineffective when it comes to quorum loss with leader leases. References: #139638 Release note: None
1 parent 80d6692 commit 41da6e7

File tree

5 files changed

+117
-0
lines changed

5 files changed

+117
-0
lines changed

docs/generated/settings/settings.html

+1
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@
116116
<tr><td><div id="setting-kv-rangefeed-closed-timestamp-refresh-interval" class="anchored"><code>kv.rangefeed.closed_timestamp_refresh_interval</code></div></td><td>duration</td><td><code>3s</code></td><td>the interval at which closed-timestamp updatesare delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
117117
<tr><td><div id="setting-kv-rangefeed-enabled" class="anchored"><code>kv.rangefeed.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, rangefeed registration is enabled</td><td>Dedicated/Self-hosted (read-write); Serverless (read-only)</td></tr>
118118
<tr><td><div id="setting-kv-replica-circuit-breaker-slow-replication-threshold" class="anchored"><code>kv.replica_circuit_breaker.slow_replication_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>duration after which slow proposals trip the per-Replica circuit breaker (zero duration disables breakers)</td><td>Dedicated/Self-Hosted</td></tr>
119+
<tr><td><div id="setting-kv-replica-raft-leaderless-unavailable-threshold" class="anchored"><code>kv.replica_raft.leaderless_unavailable_threshold</code></div></td><td>duration</td><td><code>1m0s</code></td><td>duration after which leaderless replicas is considered unavailable. Set to 0 to disable leaderless replica availability checks</td><td>Dedicated/Self-Hosted</td></tr>
119120
<tr><td><div id="setting-kv-replica-stats-addsst-request-size-factor" class="anchored"><code>kv.replica_stats.addsst_request_size_factor</code></div></td><td>integer</td><td><code>50000</code></td><td>the divisor that is applied to addsstable request sizes, then recorded in a leaseholders QPS; 0 means all requests are treated as cost 1</td><td>Dedicated/Self-Hosted</td></tr>
120121
<tr><td><div id="setting-kv-replication-reports-interval" class="anchored"><code>kv.replication_reports.interval</code></div></td><td>duration</td><td><code>1m0s</code></td><td>the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)</td><td>Dedicated/Self-Hosted</td></tr>
121122
<tr><td><div id="setting-kv-snapshot-rebalance-max-rate" class="anchored"><code>kv.snapshot_rebalance.max_rate</code></div></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td><td>Dedicated/Self-Hosted</td></tr>

pkg/kv/kvserver/replica.go

+60
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,62 @@ func (c *atomicConnectionClass) set(cc rpc.ConnectionClass) {
176176
atomic.StoreUint32((*uint32)(c), uint32(cc))
177177
}
178178

179+
// leaderlessWatcher is a lightweight implementation of the signaller interface
180+
// that is used to signal when a replica doesn't know who the leader is for an
181+
// extended period of time. This is used to signal that the range in
182+
// unavailable.
183+
type leaderlessWatcher struct {
184+
mu struct {
185+
syncutil.RWMutex
186+
187+
// leaderlessTimestamp records the timestamp captured when the replica
188+
// didn't know who the leader was. This is reset on every tick if the
189+
// replica knows who the leader is.
190+
leaderlessTimestamp time.Time
191+
192+
// unavailable is set to true if the replica is leaderless for a long time
193+
// (longer than ReplicaLeaderlessUnavailableThreshold).
194+
unavailable bool
195+
}
196+
197+
// err is the error returned when the replica is leaderless for a long time.
198+
err error
199+
200+
// closedChannel is an already closed channel. Requests will use it to know
201+
// that the replica is leaderless, and can be considered unavailable. This
202+
// is primarily due to implementation details of the request path, where
203+
// the request grabs a signaller in signallerForBatch() and then checks if
204+
// the channel is closed to determine if the replica is available.
205+
closedChannel chan struct{}
206+
}
207+
208+
// NewLeaderlessWatcher initializes a new leaderlessWatcher with the default
209+
// values.
210+
func NewLeaderlessWatcher(r *Replica) *leaderlessWatcher {
211+
closedCh := make(chan struct{})
212+
close(closedCh)
213+
return &leaderlessWatcher{
214+
err: r.replicaUnavailableError(
215+
errors.Errorf("replica has been leaderless for %s",
216+
ReplicaLeaderlessUnavailableThreshold.Get(&r.store.cfg.Settings.SV))),
217+
closedChannel: closedCh,
218+
}
219+
}
220+
221+
func (lw *leaderlessWatcher) Err() error {
222+
return lw.err
223+
}
224+
225+
func (lw *leaderlessWatcher) C() <-chan struct{} {
226+
return lw.closedChannel
227+
}
228+
229+
func (lw *leaderlessWatcher) IsUnavailable() bool {
230+
lw.mu.RLock()
231+
defer lw.mu.RUnlock()
232+
return lw.mu.unavailable
233+
}
234+
179235
// ReplicaMutex is an RWMutex. It has its own type to make it easier to look for
180236
// usages specific to the replica mutex.
181237
type ReplicaMutex syncutil.RWMutex
@@ -941,6 +997,10 @@ type Replica struct {
941997
lastTickTimestamp hlc.ClockTimestamp
942998
}
943999

1000+
// LeaderlessWatcher is used to signal when a replica is leaderless for a long
1001+
// time.
1002+
LeaderlessWatcher *leaderlessWatcher
1003+
9441004
// The raft log truncations that are pending. Access is protected by its own
9451005
// mutex. All implementation details should be considered hidden except to
9461006
// the code in raft_log_truncator.go. External code should only use the

pkg/kv/kvserver/replica_init.go

+1
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ func newUninitializedReplicaWithoutRaftGroup(
245245
r.breaker = newReplicaCircuitBreaker(
246246
store.cfg.Settings, store.stopper, r.AmbientContext, r, onTrip, onReset,
247247
)
248+
r.LeaderlessWatcher = NewLeaderlessWatcher(r)
248249
r.mu.currentRACv2Mode = r.replicationAdmissionControlModeToUse(context.TODO())
249250
r.raftMu.flowControlLevel = kvflowcontrol.GetV2EnabledWhenLeaderLevel(
250251
r.raftCtx, store.ClusterSettings(), store.TestingKnobs().FlowControlTestingKnobs)

pkg/kv/kvserver/replica_raft.go

+15
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
3333
"github.com/cockroachdb/cockroach/pkg/raft/tracker"
3434
"github.com/cockroachdb/cockroach/pkg/roachpb"
35+
"github.com/cockroachdb/cockroach/pkg/settings"
3536
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
3637
"github.com/cockroachdb/cockroach/pkg/storage"
3738
"github.com/cockroachdb/cockroach/pkg/util"
@@ -54,6 +55,20 @@ import (
5455
var raftDisableLeaderFollowsLeaseholder = envutil.EnvOrDefaultBool(
5556
"COCKROACH_DISABLE_LEADER_FOLLOWS_LEASEHOLDER", false)
5657

58+
// ReplicaLeaderlessUnavailableThreshold is the duration after which leaderless
59+
// replicas are considered unavailable. Set to 0 to disable.
60+
var ReplicaLeaderlessUnavailableThreshold = settings.RegisterDurationSettingWithExplicitUnit(
61+
settings.SystemOnly,
62+
"kv.replica_raft.leaderless_unavailable_threshold",
63+
"duration after which leaderless replicas is considered unavailable. Set to 0"+
64+
" to disable leaderless replica availability checks",
65+
60*time.Second,
66+
settings.WithPublic,
67+
// Setting the duration too low could be very dangerous to cluster health as
68+
// replicas under normal operation could be considered unavailable.
69+
settings.DurationWithMinimumOrZeroDisable(5*time.Second),
70+
)
71+
5772
// evalAndPropose prepares the necessary pending command struct and initializes
5873
// a client command ID if one hasn't been. A verified lease is supplied as a
5974
// parameter if the command requires a lease; nil otherwise. It then evaluates

pkg/kv/kvserver/replica_test.go

+40
Original file line numberDiff line numberDiff line change
@@ -15400,3 +15400,43 @@ func TestLockAcquisitions1PCInteractions(t *testing.T) {
1540015400
})
1540115401
})
1540215402
}
15403+
15404+
// TestLeaderlessWatcherInit tests that the leaderless watcher is initialized
15405+
// correctly.
15406+
func TestLeaderlessWatcherInit(t *testing.T) {
15407+
defer leaktest.AfterTest(t)()
15408+
defer log.Scope(t).Close(t)
15409+
ctx := context.Background()
15410+
tc := testContext{}
15411+
stopper := stop.NewStopper()
15412+
defer stopper.Stop(ctx)
15413+
15414+
// Set the leaderless threshold to 10 second.
15415+
tsc := TestStoreConfig(nil /* clock */)
15416+
ReplicaLeaderlessUnavailableThreshold.Override(ctx, &tsc.Settings.SV, 10*time.Second)
15417+
tc.StartWithStoreConfig(ctx, t, stopper, tsc)
15418+
15419+
repl, err := tc.store.GetReplica(1)
15420+
require.NoError(t, err)
15421+
15422+
repl.LeaderlessWatcher.mu.RLock()
15423+
defer repl.LeaderlessWatcher.mu.RUnlock()
15424+
15425+
// Initially, the leaderWatcher doesn't consider the replica as unavailable.
15426+
require.False(t, repl.LeaderlessWatcher.IsUnavailable())
15427+
15428+
// The leaderless timestamp is not set.
15429+
require.Equal(t, time.Time{}, repl.LeaderlessWatcher.mu.leaderlessTimestamp)
15430+
15431+
// The error is always loaded.
15432+
require.Regexp(t, "replica has been leaderless for 10s", repl.LeaderlessWatcher.Err())
15433+
15434+
// The channel is closed.
15435+
c := repl.LeaderlessWatcher.C()
15436+
select {
15437+
case <-c:
15438+
// Channel is closed, which is expected
15439+
default:
15440+
t.Fatalf("expected LeaderlessWatcher channel to be closed")
15441+
}
15442+
}

0 commit comments

Comments
 (0)