Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: go through DB for GC #143124

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions pkg/kv/kvserver/kvserverbase/base.go
Original file line number Diff line number Diff line change
@@ -132,14 +132,15 @@ var _ redact.SafeFormatter = CmdIDKey("")

// FilterArgs groups the arguments to a ReplicaCommandFilter.
type FilterArgs struct {
Ctx context.Context
CmdID CmdIDKey
Index int
Sid roachpb.StoreID
Req kvpb.Request
Hdr kvpb.Header
Version roachpb.Version
Err error // only used for TestingPostEvalFilter
Ctx context.Context
CmdID CmdIDKey
Index int
Sid roachpb.StoreID
Req kvpb.Request
Hdr kvpb.Header
AdmissionHdr kvpb.AdmissionHeader
Version roachpb.Version
Err error // only used for TestingPostEvalFilter
}

// ProposalFilterArgs groups the arguments to ReplicaProposalFilter.
51 changes: 13 additions & 38 deletions pkg/kv/kvserver/mvcc_gc_queue.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc"
@@ -123,16 +124,6 @@ var EnqueueInMvccGCQueueOnSpanConfigUpdateEnabled = settings.RegisterBoolSetting
false,
)

// See https://github.com/cockroachdb/cockroach/pull/143122.
var mvccGCQueueFullyEnableAC = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.mvcc_gc.queue_kv_admission_control.enabled",
"when true, MVCC GC queue operations are subject to store admission control. If set to false, "+
"since store admission control will be disabled, replication flow control will also be effectively disabled. "+
"This setting does not affect CPU admission control.",
true,
)

func largeAbortSpan(ms enginepb.MVCCStats) bool {
// Checks if the size of the abort span exceeds the given threshold.
// The abort span is not supposed to become that large, but it does
@@ -594,6 +585,11 @@ func (r *replicaGCer) template() kvpb.GCRequest {
desc := r.repl.Desc()
var template kvpb.GCRequest
template.Key = desc.StartKey.AsRawKey()
if r.repl.RangeID == 1 {
// r1 should really start at LocalMax but it starts "officially" at KeyMin
// which is not addressable.
template.Key = keys.LocalMax
}
template.EndKey = desc.EndKey.AsRawKey()

return template
@@ -603,34 +599,13 @@ func (r *replicaGCer) send(ctx context.Context, req kvpb.GCRequest) error {
n := atomic.AddInt32(&r.count, 1)
log.Eventf(ctx, "sending batch %d (%d keys, %d rangekeys)", n, len(req.Keys), len(req.RangeKeys))

ba := &kvpb.BatchRequest{}
// Technically not needed since we're talking directly to the Replica.
ba.RangeID = r.repl.Desc().RangeID
ba.Timestamp = r.repl.Clock().Now()
ba.Add(&req)
// Since we are talking directly to the replica, we need to explicitly do
// admission control here, as we are bypassing server.Node.
var admissionHandle kvadmission.Handle
if r.admissionController != nil {
ba.AdmissionHeader = gcAdmissionHeader(r.repl.ClusterSettings())
ba.Replica.StoreID = r.storeID
var err error
admissionHandle, err = r.admissionController.AdmitKVWork(ctx, roachpb.SystemTenantID, ba)
if err != nil {
return err
}
if mvccGCQueueFullyEnableAC.Get(&r.repl.ClusterSettings().SV) {
ctx = admissionHandle.AnnotateCtx(ctx)
}
}
_, writeBytes, pErr := r.repl.SendWithWriteBytes(ctx, ba)
defer writeBytes.Release()
if r.admissionController != nil {
r.admissionController.AdmittedKVWorkDone(admissionHandle, writeBytes)
}
if pErr != nil {
log.VErrEventf(ctx, 2, "%v", pErr.String())
return pErr.GoError()
var b kv.Batch
b.AddRawRequest(&req)
b.AdmissionHeader = gcAdmissionHeader(r.repl.ClusterSettings())

if err := r.repl.store.cfg.DB.Run(ctx, &b); err != nil {
log.VErrEventf(ctx, 2, "%s", err)
return err
}
return nil
}
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/mvcc_gc_queue_test.go
Original file line number Diff line number Diff line change
@@ -1397,6 +1397,9 @@ func TestMVCCGCQueueChunkRequests(t *testing.T) {
func(filterArgs kvserverbase.FilterArgs) *kvpb.Error {
if _, ok := filterArgs.Req.(*kvpb.GCRequest); ok {
atomic.AddInt32(&gcRequests, 1)
// Verify that all MVCC GC requests have their admission control header
// populated.
assert.NotZero(t, filterArgs.AdmissionHdr.CreateTime)
return nil
}
return nil
30 changes: 16 additions & 14 deletions pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
@@ -324,13 +324,14 @@ func evaluateBatch(
// If a unittest filter was installed, check for an injected error; otherwise, continue.
if filter := rec.EvalKnobs().TestingEvalFilter; filter != nil {
filterArgs := kvserverbase.FilterArgs{
Ctx: ctx,
CmdID: idKey,
Index: index,
Sid: rec.StoreID(),
Req: args,
Version: rec.ClusterSettings().Version.ActiveVersionOrEmpty(ctx).Version,
Hdr: baHeader,
Ctx: ctx,
CmdID: idKey,
Index: index,
Sid: rec.StoreID(),
Req: args,
Version: rec.ClusterSettings().Version.ActiveVersionOrEmpty(ctx).Version,
Hdr: baHeader,
AdmissionHdr: ba.AdmissionHeader,
}
if pErr := filter(filterArgs); pErr != nil {
if pErr.GetTxn() == nil {
@@ -356,13 +357,14 @@ func evaluateBatch(

if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil {
filterArgs := kvserverbase.FilterArgs{
Ctx: ctx,
CmdID: idKey,
Index: index,
Sid: rec.StoreID(),
Req: args,
Hdr: baHeader,
Err: err,
Ctx: ctx,
CmdID: idKey,
Index: index,
Sid: rec.StoreID(),
Req: args,
Hdr: baHeader,
AdmissionHdr: ba.AdmissionHeader,
Err: err,
}
if pErr := filter(filterArgs); pErr != nil {
if pErr.GetTxn() == nil {
1 change: 1 addition & 0 deletions pkg/settings/registry.go
Original file line number Diff line number Diff line change
@@ -259,6 +259,7 @@ var retiredSettings = map[InternalKey]struct{}{

// removed as of 25.2
"kv.snapshot_receiver.excise.enabled": {},
"kv.mvcc_gc.queue_kv_admission_control.enabled": {},
"sql.catalog.experimental_use_session_based_leasing": {},
"bulkio.backup.merge_file_buffer_size": {},
"changefeed.new_webhook_sink_enabled": {},