Skip to content

Commit aeb5a7b

Browse files
committed
catalog/lease: properly handle session ID changes
Previously, the lease manager did not properly handle cases where the session ID changes. This could happen in failover scenarios, where a new session ID would be assigned. If the descriptor version did not change, the lease manager would not update its in-memory or on-disk state to pick up the new session ID. This could lead to an infinite loop inside lease acquisition. To address this, this patch allows upserting leases with a new session ID and the same version. Fixes: #141567 Fixes: #141556 Fixes: #141555 Fixes: #141554 Fixes: #141553 Fixes: #141552 Fixes: #141549 Fixes: #141548 Fixes: #141547 Fixes: #141546 Fixes: #141545 Fixes: #141544 Fixes: #141543 Fixes: #141542 Fixes: #141541 Fixes: #141540 Fixes: #141539 Fixes: #141538 Fixes: #141484 Fixes: #141481 Fixes: #141480 Fixes: #141473 Fixes: #141467 Fixes: #141685 Fixes: #141585 Fixes: #141566 Fixes: #141513 Fixes: #141479 Release note: None
1 parent 945d97e commit aeb5a7b

File tree

6 files changed

+169
-14
lines changed

6 files changed

+169
-14
lines changed

pkg/sql/catalog/lease/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ go_test(
127127
"//pkg/sql/sqlliveness",
128128
"//pkg/sql/sqlliveness/slbase",
129129
"//pkg/sql/sqlliveness/slprovider",
130+
"//pkg/sql/sqlliveness/sqllivenesstestutils",
130131
"//pkg/sql/sqltestutils",
131132
"//pkg/sql/stats",
132133
"//pkg/sql/types",

pkg/sql/catalog/lease/descriptor_state.go

+22-4
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (t *descriptorState) upsertLeaseLocked(
125125
desc catalog.Descriptor,
126126
session sqlliveness.Session,
127127
regionEnumPrefix []byte,
128-
) (createdDescriptorVersionState *descriptorVersionState, _ error) {
128+
) error {
129129
if t.mu.maxVersionSeen < desc.GetVersion() {
130130
t.mu.maxVersionSeen = desc.GetVersion()
131131
}
@@ -136,10 +136,28 @@ func (t *descriptorState) upsertLeaseLocked(
136136
}
137137
descState := newDescriptorVersionState(t, desc, hlc.Timestamp{}, session, regionEnumPrefix, true /* isLease */)
138138
t.mu.active.insert(descState)
139-
return descState, nil
139+
t.m.names.insert(ctx, descState)
140+
return nil
141+
}
142+
// If the version already exists and the session ID matches, nothing
143+
// needs to be done.
144+
if s.getSessionID() == session.ID() {
145+
return nil
140146
}
141-
// If the version already exists, nothing needs to be done.
142-
return nil, nil
147+
148+
// Otherwise, we need to update the existing lease to fix the session ID. The
149+
// previously stored lease also needs to be deleted.
150+
var existingLease storedLease
151+
func() {
152+
s.mu.Lock()
153+
defer s.mu.Unlock()
154+
existingLease = *s.mu.lease
155+
s.mu.lease.sessionID = session.ID().UnsafeBytes()
156+
s.mu.session = session
157+
}()
158+
// Delete the existing lease on behalf of the caller.
159+
t.m.storage.release(ctx, t.m.stopper, &existingLease)
160+
return nil
143161
}
144162

145163
var _ redact.SafeFormatter = (*descriptorVersionState)(nil)

pkg/sql/catalog/lease/descriptor_version_state.go

+7
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,13 @@ func (s *descriptorVersionState) stringLocked() redact.RedactableString {
108108
return redact.Sprintf("%d(%q,%s) ver=%d:%s, refcount=%d", s.GetID(), s.GetName(), redact.SafeString(sessionID), s.GetVersion(), s.mu.expiration, s.mu.refcount)
109109
}
110110

111+
// getSessionID returns the ID of the session currently held in s.mu.session.
112+
func (s *descriptorVersionState) getSessionID() sqlliveness.SessionID {
113+
s.mu.Lock()
114+
defer s.mu.Unlock()
115+
return s.mu.session.ID()
116+
}
117+
111118
// hasExpired checks if the descriptor is too old to be used (by a txn
112119
// operating) at the given timestamp.
113120
func (s *descriptorVersionState) hasExpired(ctx context.Context, timestamp hlc.Timestamp) bool {

pkg/sql/catalog/lease/lease.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -853,15 +853,17 @@ func acquireNodeLease(
853853
}
854854
newest := m.findNewest(id)
855855
var currentVersion descpb.DescriptorVersion
856+
var currentSessionID sqlliveness.SessionID
856857
if newest != nil {
857858
currentVersion = newest.GetVersion()
859+
currentSessionID = newest.getSessionID()
858860
}
859861
// A session will always be populated, since we use session based leasing.
860862
session, err := m.storage.livenessProvider.Session(ctx)
861863
if err != nil {
862864
return false, errors.Wrapf(err, "lease acquisition was unable to resolve liveness session")
863865
}
864-
desc, regionPrefix, err := m.storage.acquire(ctx, session, id, currentVersion)
866+
desc, regionPrefix, err := m.storage.acquire(ctx, session, id, currentVersion, currentSessionID)
865867
if err != nil {
866868
return nil, err
867869
}
@@ -877,14 +879,10 @@ func acquireNodeLease(
877879
t.mu.Lock()
878880
t.mu.takenOffline = false
879881
defer t.mu.Unlock()
880-
var newDescVersionState *descriptorVersionState
881-
newDescVersionState, err = t.upsertLeaseLocked(ctx, desc, session, regionPrefix)
882+
err = t.upsertLeaseLocked(ctx, desc, session, regionPrefix)
882883
if err != nil {
883884
return nil, err
884885
}
885-
if newDescVersionState != nil {
886-
m.names.insert(ctx, newDescVersionState)
887-
}
888886
return true, nil
889887
})
890888
if m.testingKnobs.LeaseStoreTestingKnobs.LeaseAcquireResultBlockEvent != nil {

pkg/sql/catalog/lease/lease_internal_test.go

+118
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@ import (
2929
"github.com/cockroachdb/cockroach/pkg/sql/enum"
3030
"github.com/cockroachdb/cockroach/pkg/sql/isql"
3131
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
32+
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils"
3233
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
3334
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
3435
"github.com/cockroachdb/cockroach/pkg/util/admission"
3536
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3637
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3738
"github.com/cockroachdb/cockroach/pkg/util/log"
39+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
3840
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
3941
"github.com/cockroachdb/errors"
4042
"github.com/cockroachdb/logtags"
@@ -1606,3 +1608,119 @@ func TestLeaseCountDetailSessionBased(t *testing.T) {
16061608
require.Equal(t, 1, detail.numSQLInstances)
16071609
require.Equal(t, 0, detail.sampleSQLInstanceID)
16081610
}
1611+
1612+
// fakeSessionProvider is a session provider that only overrides the Session function
1613+
// with a callback.
1614+
type fakeSessionProvider struct {
1615+
syncutil.Mutex
1616+
sqlliveness.Provider
1617+
getSession func() *sqllivenesstestutils.FakeSession
1618+
}
1619+
1620+
var _ sqlliveness.Provider = &fakeSessionProvider{}
1621+
1622+
// Session implements sqlliveness.Provider
1623+
func (p *fakeSessionProvider) Session(ctx context.Context) (sqlliveness.Session, error) {
1624+
p.Lock()
1625+
defer p.Unlock()
1626+
if f := p.getSession(); f != nil {
1627+
return f, nil
1628+
}
1629+
return p.Provider.Session(ctx)
1630+
}
1631+
1632+
// TestLeaseManagerSessionIDChanges validates that the lease manager can acquire
1633+
// and release leases properly even if the SessionID changes. This can happen
1634+
// during a failover scenario, where a new SessionID could be picked up, which
1635+
// should cause us to reacquire leases.
1636+
func TestLeaseManagerSessionIDChanges(t *testing.T) {
1637+
defer leaktest.AfterTest(t)()
1638+
defer log.Scope(t).Close(t)
1639+
1640+
srv, sqlDB, kvDB := serverutils.StartServer(
1641+
t, base.TestServerArgs{
1642+
// Avoid using tenants since async tenant migration steps can acquire
1643+
// leases on our user tables.
1644+
DefaultTestTenant: base.TestNeedsTightIntegrationBetweenAPIsAndTestingKnobs,
1645+
})
1646+
defer srv.Stopper().Stop(context.Background())
1647+
s := srv.ApplicationLayer()
1648+
leaseManager := s.LeaseManager().(*Manager)
1649+
1650+
runner := sqlutils.MakeSQLRunner(sqlDB)
1651+
runner.Exec(t, `SET CLUSTER SETTING sql.stats.automatic_collection.enabled = false`)
1652+
runner.Exec(t, `SET CLUSTER SETTING sql.catalog.descriptor_wait_for_initial_version.enabled = false`)
1653+
1654+
runner.Exec(t, `
1655+
CREATE DATABASE t;
1656+
CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR);
1657+
`)
1658+
tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "t", "test")
1659+
1660+
getLatestLeasedDesc := func() *descriptorVersionState {
1661+
state := leaseManager.findDescriptorState(tableDesc.GetID(), false)
1662+
state.mu.Lock()
1663+
defer state.mu.Unlock()
1664+
descState := state.mu.active.findNewest()
1665+
return descState
1666+
}
1667+
1668+
// Set up a fake session provider that will keep changing IDs and every session
1669+
// will instantly expire. This is like having a zero duration lease in the expiry
1670+
// model.
1671+
var nextSessionID atomic.Int64
1672+
var enableHook atomic.Bool
1673+
1674+
fs := fakeSessionProvider{
1675+
Provider: leaseManager.storage.livenessProvider,
1676+
getSession: func() *sqllivenesstestutils.FakeSession {
1677+
if !enableHook.Load() {
1678+
return nil
1679+
}
1680+
now := s.Clock().Now()
1681+
return &sqllivenesstestutils.FakeSession{
1682+
SessionID: sqlliveness.SessionID(fmt.Sprintf("session-%d", nextSessionID.Load())),
1683+
ExpTS: now,
1684+
StartTS: now,
1685+
}
1686+
},
1687+
}
1688+
1689+
// Replace the session provider with the fake one, which returns already-expired sessions while the hook is enabled.
1690+
leaseManager.storage.livenessProvider = &fs
1691+
defer func() {
1692+
// Restore the original provider so valid session IDs
1693+
// are assigned.
1694+
leaseManager.storage.livenessProvider = fs.Provider
1695+
}()
1696+
1697+
// Repeatedly lease the same descriptor, with the session ID continuously changing
1698+
// and each one always being expired. This validates that in failover scenarios,
1699+
// nothing bad happens if the session is expired.
1700+
ctx := context.Background()
1701+
var previousSessionID sqlliveness.SessionID
1702+
var previousExpiry hlc.Timestamp
1703+
for count := 0; count < 10; count++ {
1704+
enableHook.Swap(true)
1705+
nextSessionID.Add(1)
1706+
now := s.Clock().Now()
1707+
desc, err := leaseManager.Acquire(ctx, now, tableDesc.GetID())
1708+
require.NoError(t, err)
1709+
// We expect a new session ID each time, and the descriptor
1710+
// to be expired.
1711+
newSessionID := getLatestLeasedDesc().getSessionID()
1712+
newExpiry := getLatestLeasedDesc().getExpiration(ctx)
1713+
require.NotEqualf(t, previousSessionID, newSessionID, "session ID should not match")
1714+
require.Truef(t, previousExpiry.Less(newExpiry), "session expiry should be later.")
1715+
// Disable the hook before the lease query.
1716+
enableHook.Swap(false)
1717+
// Sanity: Validate that system.lease only has a single lease with new
1718+
// id.
1719+
runner.CheckQueryResults(t, fmt.Sprintf("SELECT session_id FROM system.lease WHERE desc_id=%d", tableDesc.GetID()),
1720+
[][]string{{string(newSessionID.UnsafeBytes())}})
1721+
previousExpiry = newExpiry
1722+
previousSessionID = newSessionID
1723+
desc.Release(ctx)
1724+
}
1725+
1726+
}

pkg/sql/catalog/lease/storage.go

+17-4
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,15 @@ func (s storage) crossValidateDuringRenewal() bool {
120120
// acquire a lease on the most recent version of a descriptor. The lease is tied
121121
// to the provided sqlliveness.Session. If a newer version (than lastVersion) of
122122
// the descriptor exists, this function will attempt to acquire a lease on it.
123-
// If no newer version exists, it returns a nil descriptor. If the lease cannot
123+
// If no newer version exists, it returns a nil descriptor. If the lease cannot
124124
// be obtained because the descriptor is being dropped or is offline (currently
125125
// only applicable to tables), an inactiveTableError is returned.
126126
func (s storage) acquire(
127127
ctx context.Context,
128128
session sqlliveness.Session,
129129
id descpb.ID,
130130
lastVersion descpb.DescriptorVersion,
131+
lastSessionID sqlliveness.SessionID,
131132
) (desc catalog.Descriptor, prefix []byte, _ error) {
132133
ctx = multitenant.WithTenantCostControlExemption(ctx)
133134
prefix = s.getRegionPrefix()
@@ -151,7 +152,7 @@ func (s storage) acquire(
151152
// written a value to the database, which we'd leak if we did not delete it.
152153
// Note that the expiration is part of the primary key in the table, so we
153154
// would not overwrite the old entry if we just were to do another insert.
154-
//repeatIteration = desc != nil
155+
// repeatIteration = desc != nil
155156
if sessionID != nil && desc != nil {
156157
if err := s.writer.deleteLease(ctx, txn, leaseFields{
157158
regionPrefix: prefix,
@@ -168,10 +169,17 @@ func (s storage) acquire(
168169
// any retryable error. If we run into an error then the delete
169170
// above may need to be executed again.
170171
latestDesc, err := s.mustGetDescriptorByID(ctx, txn, id)
171-
// If the descriptor version hasn't changed, then nothing needs to be done.
172-
if err != nil || latestDesc.GetVersion() == lastVersion {
172+
173+
if err != nil {
173174
return err
174175
}
176+
// If the descriptor version hasn't changed, then no new lease needs to be
177+
// inserted unless the session ID has changed on us. No descriptor will
178+
// be set indicating to the caller that no new version exists.
179+
if latestDesc.GetVersion() == lastVersion &&
180+
lastSessionID == session.ID() {
181+
return nil
182+
}
175183
desc = latestDesc
176184
if err := catalog.FilterAddingDescriptor(desc); err != nil {
177185
return err
@@ -234,6 +242,11 @@ func (s storage) acquire(
234242
case err != nil:
235243
return nil, nil, err
236244
}
245+
// If desc is nil then no new descriptor was available to be leased.
246+
// i.e. the last version we leased is the latest and still held.
247+
if desc == nil {
248+
return nil, nil, nil
249+
}
237250
log.VEventf(ctx, 2, "storage acquired lease %v", desc)
238251
if s.testingKnobs.LeaseAcquiredEvent != nil {
239252
s.testingKnobs.LeaseAcquiredEvent(desc, err)

0 commit comments

Comments
 (0)