
Commit a563025

kvserver: mark replica as unavailable if leaderless for a long time
This commit marks the replica as unavailable in the leaderlessWatcher if it has been leaderless for a duration above `kv.replica_raft.leaderless_unavailable_threshold`. This helps requests bail out early on unavailable ranges without relying on the replica circuit breaker to trip. This has multiple benefits:

1) Faster reaction time than the replica circuit breaker: if two nodes fail (assuming R=3), many ranges become unavailable. With the replica circuit breaker, the breaker is tripped on the request path, causing added delays. With this approach, the replica becomes unavailable on the tick path rather than on the request path.

2) Lighter weight: instead of relying on the replica circuit breaker to probe the replication pipeline before it marks the range as available again, this approach relies on the Raft signal that a leader is known again, indicating that the range is available.

3) With leader leases, a replica won't propose a lease if it's not the leader. This means that with leader leases, the replica circuit breaker might not trip if the range has lost quorum. With this commit, however, the replica will eventually forget who the leader was, and the leaderlessWatcher will eventually mark it as unavailable.

Fixes: #139638

Release note: None
1 parent 41da6e7 commit a563025

6 files changed: +323 −12 lines
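
Before the per-file diffs, here is a distilled, self-contained sketch of the tick-path check the commit message describes. It is simplified from maybeMarkReplicaUnavailableInLeaderlessWatcher in the replica_raft.go diff below; the real code lives on *Replica and holds the watcher's mutex, and `watcher`, `step`, and `noLeader` here are illustrative names, not identifiers from the patch.

package main

import (
	"fmt"
	"time"
)

// watcher is a simplified stand-in for the patch's leaderlessWatcher state.
type watcher struct {
	leaderlessTimestamp time.Time // zero while a leader is known
	unavailable         bool
}

const noLeader uint64 = 0 // stand-in for raft.None

// step mirrors the per-tick logic: reset when the watcher is disabled or a
// leader is known; otherwise start the leaderless timer on the first
// leaderless tick, and trip once the leaderless duration reaches the
// threshold.
func (w *watcher) step(lead uint64, now time.Time, threshold time.Duration) {
	if threshold == 0 || lead != noLeader {
		// Disabled, or a leader is known: reset to available.
		w.leaderlessTimestamp = time.Time{}
		w.unavailable = false
		return
	}
	if w.leaderlessTimestamp.IsZero() {
		w.leaderlessTimestamp = now // first leaderless tick
		return
	}
	if !w.unavailable && now.Sub(w.leaderlessTimestamp) >= threshold {
		w.unavailable = true // requests will now fail fast
	}
}

func main() {
	w := &watcher{}
	start := time.Now()
	w.step(noLeader, start, time.Minute)                     // timer starts
	w.step(noLeader, start.Add(61*time.Second), time.Minute) // trips
	fmt.Println(w.unavailable) // true
	w.step(1, start.Add(62*time.Second), time.Minute)        // leader is back: reset
	fmt.Println(w.unavailable) // false
}

The threshold corresponds to the `kv.replica_raft.leaderless_unavailable_threshold` cluster setting; as in the sketch, a zero threshold disables the watcher and resets any previously tripped state.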

pkg/kv/kvserver/client_replica_test.go

+137
@@ -39,6 +39,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptutil"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftutil"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
+	"github.com/cockroachdb/cockroach/pkg/raft"
 	"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server"
@@ -2728,6 +2729,142 @@ func TestChangeReplicasGeneration(t *testing.T) {
 	assert.EqualValues(t, repl.Desc().Generation, oldGeneration+3, "\nold: %+v\nnew: %+v", oldDesc, newDesc)
 }
 
+// TestLossQuorumCauseLeaderlessWatcherToSignalUnavailable checks that if a
+// range loses its quorum, the remaining replicas in that range will have
+// their leaderlessWatcher indicate that the range is unavailable. It also
+// checks that when the range regains quorum, the leaderlessWatcher will
+// indicate that the range is available.
+func TestLossQuorumCauseLeaderlessWatcherToSignalUnavailable(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+	manualClock := hlc.NewHybridManualClock()
+
+	stickyVFSRegistry := fs.NewStickyRegistry()
+	lisReg := listenerutil.NewListenerRegistry()
+	defer lisReg.Close()
+
+	// Perform a basic test setup with two nodes.
+	const numServers int = 2
+	st := cluster.MakeTestingClusterSettings()
+
+	// Set `kv.replica_raft.leaderless_unavailable_threshold` to 10 seconds.
+	threshold := time.Second * 10
+	kvserver.ReplicaLeaderlessUnavailableThreshold.Override(ctx, &st.SV, threshold)
+
+	stickyServerArgs := make(map[int]base.TestServerArgs)
+	for i := 0; i < numServers; i++ {
+		stickyServerArgs[i] = base.TestServerArgs{
+			Settings: st,
+			StoreSpecs: []base.StoreSpec{
+				{
+					InMemory:    true,
+					StickyVFSID: strconv.FormatInt(int64(i), 10),
+				},
+			},
+			Knobs: base.TestingKnobs{
+				Server: &server.TestingKnobs{
+					StickyVFSRegistry: stickyVFSRegistry,
+					// Wire up the manual clock so that incrementing it below
+					// actually advances the servers' clocks.
+					WallClock: manualClock,
+				},
+			},
+		}
+	}
+
+	tc := testcluster.StartTestCluster(t, numServers,
+		base.TestClusterArgs{
+			ReplicationMode:     base.ReplicationManual,
+			ReusableListenerReg: lisReg,
+			ServerArgsPerNode:   stickyServerArgs,
+		})
+
+	defer tc.Stopper().Stop(ctx)
+
+	key := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, key, tc.Targets(1)...)
+	desc, err := tc.LookupRange(key)
+	require.NoError(t, err)
+
+	// Randomly stop server index 0 or 1.
+	stoppedNodeIdx := rand.Intn(2)
+	aliveNodeIdx := 1 - stoppedNodeIdx
+	tc.StopServer(stoppedNodeIdx)
+	repl := tc.GetFirstStoreFromServer(t, aliveNodeIdx).LookupReplica(roachpb.RKey(key))
+
+	// The range is available initially.
+	require.False(t, repl.LeaderlessWatcher.IsUnavailable())
+
+	// Wait until the remaining replica becomes leaderless.
+	testutils.SucceedsSoon(t, func() error {
+		if repl.RaftStatus().Lead != raft.None {
+			return errors.New("leader still exists")
+		}
+		return nil
+	})
+
+	// Increment the clock by the leaderlessWatcher unavailable threshold.
+	manualClock.Increment(threshold.Nanoseconds())
+
+	// Wait for the leaderlessWatcher to indicate that the range is unavailable.
+	testutils.SucceedsSoon(t, func() error {
+		if !repl.LeaderlessWatcher.IsUnavailable() {
+			return errors.New("range is still available")
+		}
+		return nil
+	})
+
+	sendPutRequestWithTimeout := func(repl *kvserver.Replica, timeout time.Duration) (*kvpb.Error, error) {
+		ctx, cancel := context.WithTimeout(ctx, timeout)
+		defer cancel()
+		ba := &kvpb.BatchRequest{}
+		ba.RangeID = desc.RangeID
+		ba.Timestamp = repl.Clock().Now()
+		ba.Add(putArgs(key, []byte("foo")))
+		_, pErr := repl.Send(ctx, ba)
+		return pErr, ctx.Err()
+	}
+
+	// Requests should immediately return an error indicating that the range is
+	// unavailable.
+	pErr, ctxErr := sendPutRequestWithTimeout(repl, 2*time.Second)
+	require.NoError(t, ctxErr)
+	require.Regexp(t, "replica has been leaderless for 10s", pErr)
+	require.True(t, errors.HasType(pErr.GoError(), (*kvpb.ReplicaUnavailableError)(nil)),
+		"expected ReplicaUnavailableError, got %v", pErr)
+
+	// At this point we know that the replica is considered unavailable. Regain
+	// the quorum and check that the leaderlessWatcher indicates that the range
+	// is available.
+	require.NoError(t, tc.RestartServer(stoppedNodeIdx))
+
+	testutils.SucceedsSoon(t, func() error {
+		repl = tc.GetFirstStoreFromServer(t, aliveNodeIdx).LookupReplica(roachpb.RKey(key))
+		if repl.LeaderlessWatcher.IsUnavailable() {
+			return errors.New("range is still unavailable")
+		}
+		return nil
+	})
+
+	// Requests should now succeed. We need to try both replicas to avoid
+	// NotLeaseHolderErrors.
+	testutils.SucceedsSoon(t, func() error {
+		for i := range numServers {
+			repl := tc.GetFirstStoreFromServer(t, i).LookupReplica(roachpb.RKey(key))
+			pErr, ctxErr = sendPutRequestWithTimeout(repl, 2*time.Second)
+			if ctxErr == nil && pErr == nil {
+				return nil
+			}
+		}
+		// If we reach this point, the request failed; return the error.
+		if ctxErr != nil {
+			return ctxErr
+		}
+		return pErr.GoError()
+	})
+}
+
 func TestClearRange(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

pkg/kv/kvserver/replica.go

+5
@@ -232,6 +232,11 @@ func (lw *leaderlessWatcher) IsUnavailable() bool {
 	return lw.mu.unavailable
 }
 
+// resetLocked resets the leaderless watcher state, marking the replica as
+// available. The caller must hold the watcher's mutex.
+func (lw *leaderlessWatcher) resetLocked() {
+	lw.mu.leaderlessTimestamp = time.Time{}
+	lw.mu.unavailable = false
+}
+
 // ReplicaMutex is an RWMutex. It has its own type to make it easier to look for
 // usages specific to the replica mutex.
 type ReplicaMutex syncutil.RWMutex

pkg/kv/kvserver/replica_backpressure.go

+16-5
@@ -118,18 +118,29 @@ func canBackpressureBatch(ba *kvpb.BatchRequest) bool {
 	return false
 }
 
-// signallerForBatch returns the signaller to use for this batch. This is the
-// Replica's breaker's signaller except if any request in the batch uses
-// poison.Policy_Wait, in which case it's a neverTripSignaller. In particular,
-// `(signaller).C() == nil` signals that the request bypasses the circuit
-// breakers.
+// signallerForBatch returns the signaller to use for this batch, in the
+// following priority order:
+// 1. If the batch contains a request that uses poison.Policy_Wait, return a
+//    neverTripSignaller.
+// 2. If the replica has been leaderless for longer than the threshold in
+//    `kv.replica_raft.leaderless_unavailable_threshold`, use the
+//    leaderlessWatcher signal.
+// 3. Otherwise, use the replica's breaker's signaller.
 func (r *Replica) signallerForBatch(ba *kvpb.BatchRequest) signaller {
 	for _, ru := range ba.Requests {
 		req := ru.GetInner()
 		if kvpb.BypassesReplicaCircuitBreaker(req) {
 			return neverTripSignaller{}
 		}
 	}
+
+	// If the leaderless watcher indicates that this replica has been leaderless
+	// for a long time, use it as the signal.
+	if r.LeaderlessWatcher.IsUnavailable() {
+		return r.LeaderlessWatcher
+	}
+
+	// Otherwise, use the replica's breaker.
 	return r.breaker.Signal()
 }
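For context on how this signal is consumed on the request path, here is a minimal sketch, assuming the signaller contract matches what the replaced comment implied (a nil `C()` channel means the request bypasses the circuit breakers). The interface shape and `checkBeforeProposal` are illustrative assumptions, not the exact API in this package.

// Assumed shape of the signaller contract (illustrative only).
type signaller interface {
	Err() error         // non-nil once the signal has tripped
	C() <-chan struct{} // nil means the request bypasses circuit breakers
}

// checkBeforeProposal is a hypothetical caller: with this commit, a replica
// whose leaderlessWatcher is unavailable fails fast here without the replica
// circuit breaker ever having to trip.
func checkBeforeProposal(sig signaller) error {
	if sig.C() == nil {
		return nil // bypass requested (e.g. poison.Policy_Wait)
	}
	return sig.Err() // fast-fail if the signal has already tripped
}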
pkg/kv/kvserver/replica_raft.go

+52-5
@@ -1540,19 +1540,25 @@ func (r *Replica) tick(
 	//
 	// This is likely unintentional, and the leader should likely consider itself
 	// live even when quiesced.
+	nowPhysicalTime := r.Clock().PhysicalTime()
 	if r.isRaftLeaderRLocked() {
-		r.mu.lastUpdateTimes.update(r.replicaID, r.Clock().PhysicalTime())
+		r.mu.lastUpdateTimes.update(r.replicaID, nowPhysicalTime)
 		// We also update lastUpdateTimes for replicas that provide store liveness
 		// support to the leader.
 		r.updateLastUpdateTimesUsingStoreLivenessRLocked(storeClockTimestamp)
 	}
 
 	r.mu.ticks++
-	preTickState := r.mu.internalRaftGroup.BasicStatus().RaftState
+	preTickStatus := r.mu.internalRaftGroup.BasicStatus()
 	r.mu.internalRaftGroup.Tick()
-	postTickState := r.mu.internalRaftGroup.BasicStatus().RaftState
-	if preTickState != postTickState {
-		if postTickState == raftpb.StatePreCandidate {
+	postTickStatus := r.mu.internalRaftGroup.BasicStatus()
+
+	// Check if the replica has been leaderless for too long, and potentially
+	// mark the replica as unavailable in the leaderless watcher.
+	r.maybeMarkReplicaUnavailableInLeaderlessWatcher(ctx, postTickStatus.Lead, nowPhysicalTime)
+
+	if preTickStatus.RaftState != postTickStatus.RaftState {
+		if postTickStatus.RaftState == raftpb.StatePreCandidate {
 			r.store.Metrics().RaftTimeoutCampaign.Inc(1)
 			if k := r.store.TestingKnobs(); k != nil && k.OnRaftTimeoutCampaign != nil {
 				k.OnRaftTimeoutCampaign(r.RangeID)
@@ -2179,6 +2185,47 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
 	}
 }
 
+// maybeMarkReplicaUnavailableInLeaderlessWatcher marks the replica as
+// unavailable in the leaderless watcher if the replica has been leaderless
+// for a duration greater than or equal to the threshold.
+func (r *Replica) maybeMarkReplicaUnavailableInLeaderlessWatcher(
+	ctx context.Context, postTickLead raftpb.PeerID, storeClockTime time.Time,
+) {
+	r.LeaderlessWatcher.mu.Lock()
+	defer r.LeaderlessWatcher.mu.Unlock()
+
+	threshold := ReplicaLeaderlessUnavailableThreshold.Get(&r.store.cfg.Settings.SV)
+	if threshold == time.Duration(0) {
+		// The leaderless watcher is disabled. It's important to reset the
+		// leaderless watcher when it's disabled so that any replica that was
+		// marked as unavailable before the watcher was disabled is reset.
+		r.LeaderlessWatcher.resetLocked()
+		return
+	}
+
+	if postTickLead != raft.None {
+		// If we know about the leader, reset the leaderless timer and mark the
+		// replica as available.
+		r.LeaderlessWatcher.resetLocked()
+	} else if r.LeaderlessWatcher.mu.leaderlessTimestamp.IsZero() {
+		// If we don't know about the leader, and we haven't been leaderless
+		// before, record the time we became leaderless.
+		r.LeaderlessWatcher.mu.leaderlessTimestamp = storeClockTime
+	} else if !r.LeaderlessWatcher.mu.unavailable {
+		// At this point we know that we have been leaderless for some time, and
+		// we haven't marked the replica as unavailable yet. If the leaderless
+		// duration meets or exceeds the threshold, mark the replica as
+		// unavailable.
+		durationSinceLeaderless := storeClockTime.Sub(r.LeaderlessWatcher.mu.leaderlessTimestamp)
+		if durationSinceLeaderless >= threshold {
+			err := errors.Errorf("have been leaderless for %.2fs, setting the "+
+				"leaderless watcher replica's state as unavailable",
+				durationSinceLeaderless.Seconds())
+			log.Warningf(ctx, "%s", err)
+			r.LeaderlessWatcher.mu.unavailable = true
+		}
+	}
+}
+
 type snapTruncationInfo struct {
 	index          kvpb.RaftIndex
 	recipientStore roachpb.StoreID

pkg/kv/kvserver/replica_raft_test.go

+112
@@ -18,6 +18,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
+	"github.com/cockroachdb/cockroach/pkg/raft"
 	"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
 	"github.com/cockroachdb/cockroach/pkg/raft/tracker"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -386,3 +387,114 @@ func checkNoLeakedTraceSpans(t *testing.T, store *Store) {
 		return errors.Newf("%s\n\ngoroutines of interest: %v\nstacks:\n\n%s", buf.String(), ids, sl)
 	})
 }
+
+// TestMaybeMarkReplicaUnavailableInLeaderlessWatcher is a basic unit test for
+// the function maybeMarkReplicaUnavailableInLeaderlessWatcher.
+func TestMaybeMarkReplicaUnavailableInLeaderlessWatcher(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	now := time.Now()
+	leaderlessThreshold := time.Second * 60
+
+	testCases := []struct {
+		name                   string
+		initReplicaUnavailable bool
+		// The initial leaderless timestamp of the replica in the
+		// leaderlessWatcher.
+		initLeaderlessTimestamp time.Time
+		leader                  raftpb.PeerID
+		disableWatcher          bool
+		expectedLeaderlessTime  time.Time
+		expectedUnavailable     bool
+	}{
+		{
+			name:                    "leader known",
+			initLeaderlessTimestamp: time.Time{},
+			leader:                  raftpb.PeerID(1),
+			disableWatcher:          false,
+			// Since the leader is known, we expect that the replica is considered
+			// available, and the leaderless timestamp is reset.
+			expectedLeaderlessTime: time.Time{},
+			expectedUnavailable:    false,
+		},
+		{
+			name:                    "leader was unknown but is now known",
+			initLeaderlessTimestamp: now.Add(-10 * time.Second),
+			leader:                  raftpb.PeerID(1),
+			disableWatcher:          false,
+			// Since the leader is known, we expect that the replica is considered
+			// available, and the leaderless timestamp is reset.
+			expectedLeaderlessTime: time.Time{},
+			expectedUnavailable:    false,
+		},
+		{
+			name:                    "leader unknown less than threshold",
+			initLeaderlessTimestamp: now.Add(-10 * time.Second),
+			leader:                  raft.None,
+			disableWatcher:          false,
+			// Since the leader has been unknown for less than the threshold, we
+			// expect that the replica is considered available, and the leaderless
+			// time is set properly.
+			expectedLeaderlessTime: now.Add(-10 * time.Second),
+			expectedUnavailable:    false,
+		},
+		{
+			name:                    "leader unknown exceeds threshold",
+			initLeaderlessTimestamp: now.Add(-leaderlessThreshold),
+			leader:                  raft.None,
+			disableWatcher:          false,
+			// Since the leader has been unknown for the threshold period, we
+			// expect that the replica is considered unavailable, and the
+			// leaderless time is set properly.
+			expectedLeaderlessTime: now.Add(-leaderlessThreshold),
+			expectedUnavailable:    true,
+		},
+		{
+			name:                    "leader unknown exceeds threshold and watcher disabled",
+			initReplicaUnavailable:  true,
+			initLeaderlessTimestamp: now.Add(-leaderlessThreshold),
+			leader:                  raft.None,
+			disableWatcher:          true,
+			// Since the watcher is disabled, the replica won't be considered
+			// unavailable even if the replica was marked as unavailable and it
+			// has been leaderless for a long time.
+			expectedLeaderlessTime: time.Time{},
+			expectedUnavailable:    false,
+		},
+	}
+
+	for _, tc := range testCases {
+		ctx := context.Background()
+		stopper := stop.NewStopper()
+
+		tContext := testContext{}
+		cfg := TestStoreConfig(nil)
+
+		if tc.disableWatcher {
+			ReplicaLeaderlessUnavailableThreshold.Override(ctx, &cfg.Settings.SV, time.Duration(0))
+		} else {
+			ReplicaLeaderlessUnavailableThreshold.Override(ctx, &cfg.Settings.SV, leaderlessThreshold)
+		}
+		tContext.StartWithStoreConfig(ctx, t, stopper, cfg)
+
+		repl := tContext.repl
+		repl.LeaderlessWatcher.mu.unavailable = tc.initReplicaUnavailable
+		repl.LeaderlessWatcher.mu.leaderlessTimestamp = tc.initLeaderlessTimestamp
+		repl.maybeMarkReplicaUnavailableInLeaderlessWatcher(ctx, tc.leader, now)
+		require.Equal(t, tc.expectedUnavailable, repl.LeaderlessWatcher.IsUnavailable())
+		require.Equal(t, tc.expectedLeaderlessTime, repl.LeaderlessWatcher.mu.leaderlessTimestamp)
+
+		// Attempt to write to the replica to ensure that if it's considered
+		// unavailable, we get an error.
+		key := roachpb.Key("a")
+		write := putArgs(key, []byte("foo"))
+		_, pErr := tContext.SendWrapped(&write)
+
+		if tc.expectedUnavailable {
+			require.Regexp(t, "replica has been leaderless for 1m0s", pErr)
+		} else {
+			require.NoError(t, pErr.GoError())
+		}
+
+		stopper.Stop(ctx)
+	}
+}
