From 16d9d946432c8e67e06263d071e12f8b0b29dc40 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 6 Mar 2025 17:04:42 -0800 Subject: [PATCH 1/5] kvclient: inline some helper methods in txnWriteBuffer helpers Given that we only have two scan variants (Scan and ReverseScan) and will only support two scan formats for now (KEY_VALUES and BATCH_RESPONSE), it seems cleaner and less verbose to use `if` blocks rather than `switch` statements. Thus, this commit inlines some of the helper methods and gets rid of `switch`es. I think it'll be easier to follow the logic for BATCH_RESPONSE scan format added in the following commit. The only somewhat meaningful change is the fact that we now check the scan format first as opposed to checking whether we have a Scan or a ReverseScan (this will make logic for BATCH_RESPONSE cleaner) Release note: None --- .../kvcoord/txn_interceptor_write_buffer.go | 227 ++++++------------ 1 file changed, 78 insertions(+), 149 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go index ef35d8afb909..f9ca1988b91b 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go @@ -529,10 +529,10 @@ func (twb *txnWriteBuffer) mergeWithScanResp( return rm.toScanResp(), nil } -// mergeWithScanResp takes a ReverseScanRequest, that was sent to the KV layer, -// and the response returned by the KV layer, and merges it with any writes that -// were buffered by the transaction to correctly uphold read-your-own-write -// semantics. +// mergeWithReverseScanResp takes a ReverseScanRequest, that was sent to the KV +// layer, and the response returned by the KV layer, and merges it with any +// writes that were buffered by the transaction to correctly uphold +// read-your-own-write semantics. func (twb *txnWriteBuffer) mergeWithReverseScanResp( req *kvpb.ReverseScanRequest, resp *kvpb.ReverseScanResponse, ) (*kvpb.ReverseScanResponse, error) { @@ -967,16 +967,23 @@ type respIter struct { scanResp *kvpb.ScanResponse reverseScanReq *kvpb.ReverseScanRequest reverseScanResp *kvpb.ReverseScanResponse - index int + // scanFormat indicates the ScanFormat of the request. Only KEY_VALUES is + // supported right now. + scanFormat kvpb.ScanFormat + + rowsIndex int } // newScanRespIter constructs and returns a new iterator to iterate over a // ScanRequest/Response. func newScanRespIter(req *kvpb.ScanRequest, resp *kvpb.ScanResponse) *respIter { + if req.ScanFormat != kvpb.KEY_VALUES { + panic("unexpected") + } return &respIter{ - scanReq: req, - scanResp: resp, - index: 0, + scanReq: req, + scanResp: resp, + scanFormat: req.ScanFormat, } } @@ -985,128 +992,73 @@ func newScanRespIter(req *kvpb.ScanRequest, resp *kvpb.ScanResponse) *respIter { func newReverseScanRespIter( req *kvpb.ReverseScanRequest, resp *kvpb.ReverseScanResponse, ) *respIter { + if req.ScanFormat != kvpb.KEY_VALUES { + panic("unexpected") + } return &respIter{ reverseScanReq: req, reverseScanResp: resp, - index: 0, + scanFormat: req.ScanFormat, } } // peekKey returns the key at the current iterator position. 
func (s *respIter) peekKey() roachpb.Key { - switch s.method() { - case kvpb.Scan: - switch s.scanFormat() { - case kvpb.KEY_VALUES: - return s.scanResp.Rows[s.index].Key - default: - panic("unexpected") + if s.scanFormat == kvpb.KEY_VALUES { + if s.scanReq != nil { + return s.scanResp.Rows[s.rowsIndex].Key } - - case kvpb.ReverseScan: - switch s.scanFormat() { - case kvpb.KEY_VALUES: - return s.reverseScanResp.Rows[s.index].Key - default: - panic("unexpected") - } - default: - panic("unexpected") + return s.reverseScanResp.Rows[s.rowsIndex].Key } + panic("unexpected") } // next moves the iterator forward. func (s *respIter) next() { - s.index++ + s.rowsIndex++ } // valid returns whether the iterator is (still) positioned to a valid index. func (s *respIter) valid() bool { - switch s.method() { - case kvpb.Scan: - switch s.scanFormat() { - case kvpb.KEY_VALUES: - return s.index < len(s.scanResp.Rows) - default: - panic("unexpected") - } - case kvpb.ReverseScan: - switch s.scanFormat() { - case kvpb.KEY_VALUES: - return s.index < len(s.reverseScanResp.Rows) - default: - panic("unexpected") + if s.scanFormat == kvpb.KEY_VALUES { + if s.scanReq != nil { + return s.rowsIndex < len(s.scanResp.Rows) } - default: - panic("unexpected") + return s.rowsIndex < len(s.reverseScanResp.Rows) } + panic("unexpected") } // reset re-positions the iterator to the beginning of the response. func (s *respIter) reset() { - s.index = 0 -} - -// scanFormat returns the scan format of the request/response. -func (s *respIter) scanFormat() kvpb.ScanFormat { - switch s.method() { - case kvpb.Scan: - return s.scanReq.ScanFormat - case kvpb.ReverseScan: - return s.reverseScanReq.ScanFormat - default: - panic("unexpected") - } + s.rowsIndex = 0 } // startKey returns the start key of the request in response to which the // iterator was created. func (s *respIter) startKey() roachpb.Key { - switch s.method() { - case kvpb.Scan: + if s.scanReq != nil { return s.scanReq.Key - case kvpb.ReverseScan: - return s.reverseScanReq.Key - default: - panic("unexpected") } + return s.reverseScanReq.Key } // endKey returns the end key of the request in response to which the iterator // was created. func (s *respIter) endKey() roachpb.Key { - switch s.method() { - case kvpb.Scan: + if s.scanReq != nil { return s.scanReq.EndKey - case kvpb.ReverseScan: - return s.reverseScanReq.EndKey - default: - panic("unexpected") } + return s.reverseScanReq.EndKey } // seq returns the sequence number of the request in response to which the // iterator was created. func (s *respIter) seq() enginepb.TxnSeq { - switch s.method() { - case kvpb.Scan: + if s.scanReq != nil { return s.scanReq.Sequence - case kvpb.ReverseScan: - return s.reverseScanReq.Sequence - default: - panic("unexpected") - } -} - -func (s *respIter) method() kvpb.Method { - switch { - case s.scanReq != nil: - return kvpb.Scan - case s.reverseScanReq != nil: - return kvpb.ReverseScan - default: - panic("unexpected") } + return s.reverseScanReq.Sequence } // respMerger encapsulates state to combine a {,Reverse}ScanResponse, returned @@ -1119,66 +1071,56 @@ type respMerger struct { // reverseScanResp; the other field should be nil. scanResp *kvpb.ScanResponse reverseScanResp *kvpb.ReverseScanResponse - respIdx int + + // rowsIdx tracks the position within Rows slice of the response to be + // populated next. + rowsIdx int } // makeRespMerger constructs and returns a new respMerger. 
func makeRespMerger(serverSideRespIter *respIter, size int) respMerger { m := respMerger{ serverRespIter: serverSideRespIter, - respIdx: 0, } - switch serverSideRespIter.method() { - case kvpb.Scan: + if serverSideRespIter.scanReq != nil { resp := serverSideRespIter.scanResp.ShallowCopy().(*kvpb.ScanResponse) - switch serverSideRespIter.scanFormat() { - case kvpb.KEY_VALUES: + if serverSideRespIter.scanFormat == kvpb.KEY_VALUES { resp.Rows = make([]roachpb.KeyValue, size) - default: + } else { panic("unexpected") } m.scanResp = resp - case kvpb.ReverseScan: - resp := serverSideRespIter.reverseScanResp.ShallowCopy().(*kvpb.ReverseScanResponse) - switch serverSideRespIter.scanFormat() { - case kvpb.KEY_VALUES: - resp.Rows = make([]roachpb.KeyValue, size) - default: - panic("unexpected") - } - m.reverseScanResp = resp - default: + return m + } + resp := serverSideRespIter.reverseScanResp.ShallowCopy().(*kvpb.ReverseScanResponse) + if serverSideRespIter.scanFormat == kvpb.KEY_VALUES { + resp.Rows = make([]roachpb.KeyValue, size) + } else { panic("unexpected") } + m.reverseScanResp = resp return m } // acceptKV takes a key and a value (presumably from the write buffer) and adds // it to the result set. func (m *respMerger) acceptKV(key roachpb.Key, value *roachpb.Value) { - switch m.serverRespIter.method() { - case kvpb.Scan: - switch m.serverRespIter.scanFormat() { - case kvpb.KEY_VALUES: - m.scanResp.Rows[m.respIdx] = roachpb.KeyValue{ + if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { + if m.serverRespIter.scanReq != nil { + m.scanResp.Rows[m.rowsIdx] = roachpb.KeyValue{ Key: key, Value: *value, } - default: - panic("unexpected") - } - case kvpb.ReverseScan: - switch m.serverRespIter.scanFormat() { - case kvpb.KEY_VALUES: - m.reverseScanResp.Rows[m.respIdx] = roachpb.KeyValue{ + } else { + m.reverseScanResp.Rows[m.rowsIdx] = roachpb.KeyValue{ Key: key, Value: *value, } - default: - panic("unexpected") } + m.rowsIdx++ + return } - m.respIdx++ + panic("unexpected") } // acceptServerResp accepts the current server response and adds it to the @@ -1187,52 +1129,39 @@ func (m *respMerger) acceptKV(key roachpb.Key, value *roachpb.Value) { // Note that the iterator is not moved forward after accepting the response; the // responsibility of doing so, if desired, is the caller's. func (m *respMerger) acceptServerResp() { - switch m.serverRespIter.method() { - case kvpb.Scan: - switch m.serverRespIter.scanFormat() { - case kvpb.KEY_VALUES: - m.scanResp.Rows[m.respIdx] = m.serverRespIter.scanResp.Rows[m.serverRespIter.index] - default: - panic("unexpected") - } - case kvpb.ReverseScan: - switch m.serverRespIter.scanFormat() { - case kvpb.KEY_VALUES: - m.reverseScanResp.Rows[m.respIdx] = m.serverRespIter.reverseScanResp.Rows[m.serverRespIter.index] - default: - panic("unexpected") + if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { + if m.serverRespIter.scanReq != nil { + m.scanResp.Rows[m.rowsIdx] = m.serverRespIter.scanResp.Rows[m.serverRespIter.rowsIndex] + } else { + m.reverseScanResp.Rows[m.rowsIdx] = m.serverRespIter.reverseScanResp.Rows[m.serverRespIter.rowsIndex] } - default: - panic("unexpected") + m.rowsIdx++ + return } - m.respIdx++ + panic("unexpected") } // toScanResp returns the final merged ScanResponse. 
func (m *respMerger) toScanResp() *kvpb.ScanResponse { - assertTrue(m.serverRespIter.method() == kvpb.Scan, "weren't accumulating a scan resp") + assertTrue(m.serverRespIter.scanReq != nil, "weren't accumulating a scan resp") // If we've done everything correctly, resIdx == len(response rows). - switch m.serverRespIter.scanFormat() { - case kvpb.KEY_VALUES: - assertTrue(m.respIdx == len(m.scanResp.Rows), "did not fill in all rows; did we miscount?") - default: - panic("unexpected") + if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { + assertTrue(m.rowsIdx == len(m.scanResp.Rows), "did not fill in all rows; did we miscount?") + return m.scanResp } - return m.scanResp + panic("unexpected") } // toReverseScanResp returns the final merged ReverseScanResponse. func (m *respMerger) toReverseScanResp() *kvpb.ReverseScanResponse { - assertTrue(m.serverRespIter.method() == kvpb.ReverseScan, - "weren't accumulating a reverse scan resp") + assertTrue(m.serverRespIter.scanReq == nil, "weren't accumulating a reverse scan resp") // If we've done everything correctly, resIdx == len(response rows). - switch m.serverRespIter.scanFormat() { - case kvpb.KEY_VALUES: - assertTrue(m.respIdx == len(m.reverseScanResp.Rows), "did not fill in all rows; did we miscount?") - default: - panic("unexpected") + if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { + assertTrue(m.rowsIdx == len(m.reverseScanResp.Rows), "did not fill in all rows; did we miscount?") + return m.reverseScanResp } - return m.reverseScanResp + panic("unexpected") + } // assertTrue panics with a message if the supplied condition isn't true. From 6ab9b9b42981bd4b289c457af278a2eca37bf24c Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 6 Mar 2025 17:28:17 -0800 Subject: [PATCH 2/5] kvcoord: refactor txnWriteBuffer helpers This commit refactors a couple of helpers that we use when merging Scan and ReverseScan responses with the buffered values. This is done in preparation for supporting BATCH_RESPONSE scan format. In particular: - it introduces a helper struct that is used on the first iteration to calculate the size of the responses - it "disassembles" the responses into its contents (i.e. `Rows` field). This also allows us to remove some code duplication between Scans and ReverseScans. It also exposes the need to adjust NumKeys and NumBytes fields of the response. Addressing that is left as a TODO. Release note: None --- .../kvcoord/txn_interceptor_write_buffer.go | 177 +++++++++--------- 1 file changed, 86 insertions(+), 91 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go index f9ca1988b91b..a60e782936af 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go @@ -513,20 +513,14 @@ func (twb *txnWriteBuffer) mergeWithScanResp( respIter := newScanRespIter(req, resp) // First, calculate the size of the merged response. This then allows us to - // exactly pre-allocate the response slice when constructing the - // respMerger. - respSize := 0 - twb.mergeBufferAndResp( - respIter, - func(roachpb.Key, *roachpb.Value) { respSize++ }, - func() { respSize++ }, - false, /* reverse */ - ) + // exactly pre-allocate the response slice when constructing the respMerger. 
+ h := makeRespSizeHelper(respIter) + twb.mergeBufferAndResp(respIter, h.acceptBuffer, h.acceptResp, false /* reverse */) respIter.reset() - rm := makeRespMerger(respIter, respSize) + rm := makeRespMerger(respIter, h) twb.mergeBufferAndResp(respIter, rm.acceptKV, rm.acceptServerResp, false /* reverse */) - return rm.toScanResp(), nil + return rm.toScanResp(resp), nil } // mergeWithReverseScanResp takes a ReverseScanRequest, that was sent to the KV @@ -547,20 +541,14 @@ func (twb *txnWriteBuffer) mergeWithReverseScanResp( respIter := newReverseScanRespIter(req, resp) // First, calculate the size of the merged response. This then allows us to - // exactly pre-allocate the response slice when constructing the - // respMerger. - respSize := 0 - twb.mergeBufferAndResp( - respIter, - func(roachpb.Key, *roachpb.Value) { respSize++ }, - func() { respSize++ }, - true, /* reverse */ - ) + // exactly pre-allocate the response slice when constructing the respMerger. + h := makeRespSizeHelper(respIter) + twb.mergeBufferAndResp(respIter, h.acceptBuffer, h.acceptResp, true /* reverse */) respIter.reset() - rm := makeRespMerger(respIter, respSize) + rm := makeRespMerger(respIter, h) twb.mergeBufferAndResp(respIter, rm.acceptKV, rm.acceptServerResp, true /* reverse */) - return rm.toReverseScanResp(), nil + return rm.toReverseScanResp(resp), nil } // mergeBufferAndScanResp merges (think the merge step from merge sort) the @@ -961,16 +949,23 @@ func (bw *bufferedWrite) toRequest() kvpb.RequestUnion { // respIter is an iterator over a scan or reverse scan response returned by // the KV layer. type respIter struct { - // One and only one of scanReq/reverseScanReq (and by extension, - // scanResp/reverseScanResp) should ever be set. - scanReq *kvpb.ScanRequest - scanResp *kvpb.ScanResponse - reverseScanReq *kvpb.ReverseScanRequest - reverseScanResp *kvpb.ReverseScanResponse + // One and only one of scanReq/reverseScanReq should ever be set. + scanReq *kvpb.ScanRequest + reverseScanReq *kvpb.ReverseScanRequest // scanFormat indicates the ScanFormat of the request. Only KEY_VALUES is // supported right now. scanFormat kvpb.ScanFormat + // rows is the Rows field of the corresponding response. + // + // Only set with KEY_VALUES scan format. + rows []roachpb.KeyValue + + // Fields below will be modified when advancing the iterator. + + // rowsIndex is the current index into Rows field of the response. + // + // Used in the KEY_VALUES scan format. rowsIndex int } @@ -982,8 +977,8 @@ func newScanRespIter(req *kvpb.ScanRequest, resp *kvpb.ScanResponse) *respIter { } return &respIter{ scanReq: req, - scanResp: resp, scanFormat: req.ScanFormat, + rows: resp.Rows, } } @@ -996,19 +991,16 @@ func newReverseScanRespIter( panic("unexpected") } return &respIter{ - reverseScanReq: req, - reverseScanResp: resp, - scanFormat: req.ScanFormat, + reverseScanReq: req, + scanFormat: req.ScanFormat, + rows: resp.Rows, } } // peekKey returns the key at the current iterator position. func (s *respIter) peekKey() roachpb.Key { if s.scanFormat == kvpb.KEY_VALUES { - if s.scanReq != nil { - return s.scanResp.Rows[s.rowsIndex].Key - } - return s.reverseScanResp.Rows[s.rowsIndex].Key + return s.rows[s.rowsIndex].Key } panic("unexpected") } @@ -1021,10 +1013,7 @@ func (s *respIter) next() { // valid returns whether the iterator is (still) positioned to a valid index. 
func (s *respIter) valid() bool { if s.scanFormat == kvpb.KEY_VALUES { - if s.scanReq != nil { - return s.rowsIndex < len(s.scanResp.Rows) - } - return s.rowsIndex < len(s.reverseScanResp.Rows) + return s.rowsIndex < len(s.rows) } panic("unexpected") } @@ -1061,61 +1050,64 @@ func (s *respIter) seq() enginepb.TxnSeq { return s.reverseScanReq.Sequence } +type respSizeHelper struct { + it *respIter + + // rowsSize tracks the total number of KVs that we'll include in the Rows + // field of the merged {,Reverse}ScanResponse when KEY_VALUES scan format is + // used. + rowsSize int +} + +func makeRespSizeHelper(it *respIter) respSizeHelper { + return respSizeHelper{it: it} +} + +func (h *respSizeHelper) acceptBuffer(roachpb.Key, *roachpb.Value) { + h.rowsSize++ +} + +func (h *respSizeHelper) acceptResp() { + h.rowsSize++ +} + // respMerger encapsulates state to combine a {,Reverse}ScanResponse, returned // by the KV layer, with any overlapping buffered writes to correctly uphold // read-your-own-write semantics. It can be used to accumulate a response when // merging a {,Reverse}ScanResponse with buffered writes. type respMerger struct { serverRespIter *respIter - // We should only ever be accumulating either one of scanResp or - // reverseScanResp; the other field should be nil. - scanResp *kvpb.ScanResponse - reverseScanResp *kvpb.ReverseScanResponse - // rowsIdx tracks the position within Rows slice of the response to be + // rows is the Rows field of the corresponding response. The merged response + // will be accumulated here first before being injected into one of the + // response structs. + // + // Only populated with KEY_VALUES scan format. + rows []roachpb.KeyValue + + // rowsIdx tracks the position within rows slice of the response to be // populated next. rowsIdx int } // makeRespMerger constructs and returns a new respMerger. -func makeRespMerger(serverSideRespIter *respIter, size int) respMerger { - m := respMerger{ - serverRespIter: serverSideRespIter, - } - if serverSideRespIter.scanReq != nil { - resp := serverSideRespIter.scanResp.ShallowCopy().(*kvpb.ScanResponse) - if serverSideRespIter.scanFormat == kvpb.KEY_VALUES { - resp.Rows = make([]roachpb.KeyValue, size) - } else { - panic("unexpected") - } - m.scanResp = resp - return m - } - resp := serverSideRespIter.reverseScanResp.ShallowCopy().(*kvpb.ReverseScanResponse) - if serverSideRespIter.scanFormat == kvpb.KEY_VALUES { - resp.Rows = make([]roachpb.KeyValue, size) - } else { +func makeRespMerger(serverSideRespIter *respIter, h respSizeHelper) respMerger { + if serverSideRespIter.scanFormat != kvpb.KEY_VALUES { panic("unexpected") } - m.reverseScanResp = resp - return m + return respMerger{ + serverRespIter: serverSideRespIter, + rows: make([]roachpb.KeyValue, h.rowsSize), + } } // acceptKV takes a key and a value (presumably from the write buffer) and adds // it to the result set. func (m *respMerger) acceptKV(key roachpb.Key, value *roachpb.Value) { if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { - if m.serverRespIter.scanReq != nil { - m.scanResp.Rows[m.rowsIdx] = roachpb.KeyValue{ - Key: key, - Value: *value, - } - } else { - m.reverseScanResp.Rows[m.rowsIdx] = roachpb.KeyValue{ - Key: key, - Value: *value, - } + m.rows[m.rowsIdx] = roachpb.KeyValue{ + Key: key, + Value: *value, } m.rowsIdx++ return @@ -1130,38 +1122,41 @@ func (m *respMerger) acceptKV(key roachpb.Key, value *roachpb.Value) { // responsibility of doing so, if desired, is the caller's. 
func (m *respMerger) acceptServerResp() { if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { - if m.serverRespIter.scanReq != nil { - m.scanResp.Rows[m.rowsIdx] = m.serverRespIter.scanResp.Rows[m.serverRespIter.rowsIndex] - } else { - m.reverseScanResp.Rows[m.rowsIdx] = m.serverRespIter.reverseScanResp.Rows[m.serverRespIter.rowsIndex] - } + m.rows[m.rowsIdx] = m.serverRespIter.rows[m.serverRespIter.rowsIndex] m.rowsIdx++ return } panic("unexpected") } -// toScanResp returns the final merged ScanResponse. -func (m *respMerger) toScanResp() *kvpb.ScanResponse { +// toScanResp populates a copy of the given response with the final merged +// state. +func (m *respMerger) toScanResp(resp *kvpb.ScanResponse) *kvpb.ScanResponse { assertTrue(m.serverRespIter.scanReq != nil, "weren't accumulating a scan resp") - // If we've done everything correctly, resIdx == len(response rows). + // TODO(yuzefovich): we need to update NumKeys and NumBytes. + result := resp.ShallowCopy().(*kvpb.ScanResponse) if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { - assertTrue(m.rowsIdx == len(m.scanResp.Rows), "did not fill in all rows; did we miscount?") - return m.scanResp + // If we've done everything correctly, resIdx == len(response rows). + assertTrue(m.rowsIdx == len(m.rows), "did not fill in all rows; did we miscount?") + result.Rows = m.rows + return result } panic("unexpected") } -// toReverseScanResp returns the final merged ReverseScanResponse. -func (m *respMerger) toReverseScanResp() *kvpb.ReverseScanResponse { +// toReverseScanResp populates a copy of the given response with the final +// merged state. +func (m *respMerger) toReverseScanResp(resp *kvpb.ReverseScanResponse) *kvpb.ReverseScanResponse { assertTrue(m.serverRespIter.scanReq == nil, "weren't accumulating a reverse scan resp") - // If we've done everything correctly, resIdx == len(response rows). + // TODO(yuzefovich): we need to update NumKeys and NumBytes. + result := resp.ShallowCopy().(*kvpb.ReverseScanResponse) if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { - assertTrue(m.rowsIdx == len(m.reverseScanResp.Rows), "did not fill in all rows; did we miscount?") - return m.reverseScanResp + // If we've done everything correctly, resIdx == len(response rows). + assertTrue(m.rowsIdx == len(m.rows), "did not fill in all rows; did we miscount?") + result.Rows = m.rows + return result } panic("unexpected") - } // assertTrue panics with a message if the supplied condition isn't true. From eb2d2a7842977ca04f4835739f00e467bad6ce45 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 6 Mar 2025 19:30:55 -0800 Subject: [PATCH 3/5] kvcoord: support BATCH_RESPONSE scan format with buffered writes This commit extends the txnWriteBuffer helper structs to support BATCH_RESPONSE scan format of Scans and ReverseScans. In this format `BatchResponses` field contains `[][]byte` slices where a single `[]byte` slice can have multiple KVs in the "engine" format. (Each `[]byte` is growing in size exponentially, until a hard limit, for large scans.) The contribution of this commit is two-fold: - the size helper struct is responsible for calculating the precise capacity that each `[]byte` is needed in the merged response - during the main iteration, when accepting the KV from the buffer we need to write it in the same "engine" format as the KV server did. For simplicity, this commit keeps the hierarchy of slices in `BatchResponses` unchanged, i.e. we inject all buffered KVs into the "current position" of `BatchResponses[i]` from the server response. 
This means that in the extreme case when the server response is empty, all buffered KVs will be included in a single slice. This could be suboptimal from a memory GC perspective, but adding a smarter heuristic is left as a TODO. Release note: None --- pkg/kv/kvclient/kvcoord/BUILD.bazel | 1 + .../kvcoord/txn_interceptor_write_buffer.go | 226 +++++++++++++++--- .../txn_interceptor_write_buffer_test.go | 152 +++++++----- pkg/kv/kvclient/kvcoord/txn_test.go | 92 +++++-- 4 files changed, 358 insertions(+), 113 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 8cc68c157d35..93dd018800a1 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -62,6 +62,7 @@ go_library( "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/storage/enginepb", + "//pkg/storage/mvccencoding", "//pkg/util", "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go index a60e782936af..766650fdcbec 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go @@ -7,12 +7,14 @@ package kvcoord import ( "context" + "encoding/binary" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/mvccencoding" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -506,10 +508,6 @@ func (twb *txnWriteBuffer) mergeWithScanResp( return nil, errors.AssertionFailedf("unexpectedly called mergeWithScanResp on a ScanRequest " + "with COL_BATCH_RESPONSE scan format") } - if req.ScanFormat == kvpb.BATCH_RESPONSE { - // TODO(arul): See pebbleResults.put for how this should be done. - return nil, errors.AssertionFailedf("unimplemented") - } respIter := newScanRespIter(req, resp) // First, calculate the size of the merged response. This then allows us to @@ -534,10 +532,6 @@ func (twb *txnWriteBuffer) mergeWithReverseScanResp( return nil, errors.AssertionFailedf("unexpectedly called mergeWithReverseScanResp on a " + "ReverseScanRequest with COL_BATCH_RESPONSE scan format") } - if req.ScanFormat == kvpb.BATCH_RESPONSE { - // TODO(arul): See pebbleResults.put for how this should be done. - return nil, errors.AssertionFailedf("unimplemented") - } respIter := newReverseScanRespIter(req, resp) // First, calculate the size of the merged response. This then allows us to @@ -557,6 +551,9 @@ func (twb *txnWriteBuffer) mergeWithReverseScanResp( // function, based on which KV pair should be preferred[2] by the combined // response. // +// Note that acceptBuffer and acceptResp functions should not advance the +// iterator. acceptResp will only be called when respIter is in valid state. +// // [1] Forward or reverse order, depending on the direction of the scan. // [2] See inline comments for more details on what "preferred" means. func (twb *txnWriteBuffer) mergeBufferAndResp( @@ -946,20 +943,72 @@ func (bw *bufferedWrite) toRequest() kvpb.RequestUnion { return ru } +// getKey reads the key for the next KV from a slice of BatchResponses field of +// {,Reverse}ScanResponse.
The KV is encoded in the following format: +// +// <lenValue:Uint32><lenKey:Uint32><Key><Value> +// +// Furthermore, MVCC timestamp might be included in the suffix of the <Key> part, so +// we need to split it away. +// +// The method assumes that the encoding is valid. +func getKey(br []byte) []byte { + lenKey := int(binary.LittleEndian.Uint32(br[4:8])) + key, _, _ := enginepb.SplitMVCCKey(br[8 : 8+lenKey]) + return key +} + +// getFirstKVLength returns the number of bytes used to encode the first KV from +// the given slice (which is assumed to have come from BatchResponses field of +// {,Reverse}ScanResponse). +func getFirstKVLength(br []byte) int { + // See comment on getKey for more details. + lenValue := int(binary.LittleEndian.Uint32(br[0:4])) + lenKey := int(binary.LittleEndian.Uint32(br[4:8])) + return 8 + lenKey + lenValue +} + +// encKVLength returns the number of bytes that will be required to encode the +// given key/value pair as well as just encoding length of the key (including +// the timestamp). +func encKVLength(key roachpb.Key, value *roachpb.Value) (lenKV, lenKey int) { + // See comment on getKey for more details. + lenKey = mvccencoding.EncodedMVCCKeyLength(key, value.Timestamp) + lenKV = 8 + lenKey + len(value.RawBytes) + return lenKV, lenKey +} + +// appendKV appends the given key/value pair to the provided slice. It is +// assumed that the slice already has enough capacity. The updated slice is +// returned. +func appendKV(toAppend []byte, key roachpb.Key, value *roachpb.Value) []byte { + lenKV, lenKey := encKVLength(key, value) + buf := toAppend[len(toAppend) : len(toAppend)+lenKV] + binary.LittleEndian.PutUint32(buf[0:4], uint32(len(value.RawBytes))) + binary.LittleEndian.PutUint32(buf[4:8], uint32(lenKey)) + mvccencoding.EncodeMVCCKeyToBufSized(buf[8:8+lenKey], key, value.Timestamp, lenKey) + copy(buf[8+lenKey:], value.RawBytes) + return toAppend[:len(toAppend)+lenKV] +} + // respIter is an iterator over a scan or reverse scan response returned by // the KV layer. type respIter struct { // One and only one of scanReq/reverseScanReq should ever be set. scanReq *kvpb.ScanRequest reverseScanReq *kvpb.ReverseScanRequest - // scanFormat indicates the ScanFormat of the request. Only KEY_VALUES is - // supported right now. + // scanFormat indicates the ScanFormat of the request. Only KEY_VALUES and + // BATCH_RESPONSE are supported right now. scanFormat kvpb.ScanFormat // rows is the Rows field of the corresponding response. // // Only set with KEY_VALUES scan format. rows []roachpb.KeyValue + // batchResponses is the BatchResponses field of the corresponding response. + // + // Only set with BATCH_RESPONSE scan format. + batchResponses [][]byte // Fields below will be modified when advancing the iterator. // rowsIndex is the current index into Rows field of the response. // // Used in the KEY_VALUES scan format. rowsIndex int + // brIndex and brOffset describe the current position within BatchResponses + // field of the response. The next KV starts at + // batchResponses[brIndex][brOffset]. When the end of the slice is reached, + // brIndex is incremented and brOffset is reset to 0. + // + // Additionally, brIndex controls which BatchResponses[i] slice KVs are + // being written into when merging the response with the buffered writes. + // + // Used in the BATCH_RESPONSE scan format. + brIndex int + brOffset int } // newScanRespIter constructs and returns a new iterator to iterate over a // ScanRequest/Response.
func newScanRespIter(req *kvpb.ScanRequest, resp *kvpb.ScanResponse) *respIter { - if req.ScanFormat != kvpb.KEY_VALUES { + if req.ScanFormat != kvpb.KEY_VALUES && req.ScanFormat != kvpb.BATCH_RESPONSE { panic("unexpected") } return &respIter{ - scanReq: req, - scanFormat: req.ScanFormat, - rows: resp.Rows, + scanReq: req, + scanFormat: req.ScanFormat, + rows: resp.Rows, + batchResponses: resp.BatchResponses, } } @@ -987,27 +1048,42 @@ func newScanRespIter(req *kvpb.ScanRequest, resp *kvpb.ScanResponse) *respIter { func newReverseScanRespIter( req *kvpb.ReverseScanRequest, resp *kvpb.ReverseScanResponse, ) *respIter { - if req.ScanFormat != kvpb.KEY_VALUES { + if req.ScanFormat != kvpb.KEY_VALUES && req.ScanFormat != kvpb.BATCH_RESPONSE { panic("unexpected") } return &respIter{ reverseScanReq: req, scanFormat: req.ScanFormat, rows: resp.Rows, + batchResponses: resp.BatchResponses, } } // peekKey returns the key at the current iterator position. +// +// peekKey should only be called if the iterator is in valid state (i.e. +// valid() returned true). func (s *respIter) peekKey() roachpb.Key { if s.scanFormat == kvpb.KEY_VALUES { return s.rows[s.rowsIndex].Key } - panic("unexpected") + return getKey(s.batchResponses[s.brIndex][s.brOffset:]) } // next moves the iterator forward. +// +// next should only be called if the iterator is in valid state (i.e. valid() +// returned true). func (s *respIter) next() { - s.rowsIndex++ + if s.scanFormat == kvpb.KEY_VALUES { + s.rowsIndex++ + return + } + s.brOffset += getFirstKVLength(s.batchResponses[s.brIndex][s.brOffset:]) + if s.brOffset >= len(s.batchResponses[s.brIndex]) { + s.brIndex++ + s.brOffset = 0 + } } // valid returns whether the iterator is (still) positioned to a valid index. @@ -1015,12 +1091,14 @@ func (s *respIter) valid() bool { if s.scanFormat == kvpb.KEY_VALUES { return s.rowsIndex < len(s.rows) } - panic("unexpected") + return s.brIndex < len(s.batchResponses) } // reset re-positions the iterator to the beginning of the response. func (s *respIter) reset() { s.rowsIndex = 0 + s.brIndex = 0 + s.brOffset = 0 } // startKey returns the start key of the request in response to which the @@ -1057,18 +1135,51 @@ type respSizeHelper struct { // field of the merged {,Reverse}ScanResponse when KEY_VALUES scan format is // used. rowsSize int + // batchResponseSize tracks the lengths of each []byte that we'll include in + // the BatchResponses field of the merged {,Reverse}ScanResponse when + // BATCH_RESPONSE scan format is used. + // + // At the moment, we'll rely on the "structure" produced by the server + // meaning that we'll "inject" the buffered KVs into responses from the + // server while maintaining the "layering" of slices. In the extreme case + // when the server produced an empty response this means that we'll include + // all buffered KVs in a single slice. + // TODO(yuzefovich): add better sizing heuristic which will allow for faster + // garbage collection of already processed KVs by the SQL layer. + // + // Length of this slice is always 1 greater than the length of the + // BatchResponses field from the server response in order to include a + // "spill-over" slice - this is done to accommodate any buffered writes + // after the server response is fully processed. 
+ batchResponseSize []int } func makeRespSizeHelper(it *respIter) respSizeHelper { - return respSizeHelper{it: it} + h := respSizeHelper{it: it} + if it.scanFormat == kvpb.BATCH_RESPONSE { + h.batchResponseSize = make([]int, len(it.batchResponses)+1) + } + return h } -func (h *respSizeHelper) acceptBuffer(roachpb.Key, *roachpb.Value) { - h.rowsSize++ +func (h *respSizeHelper) acceptBuffer(key roachpb.Key, value *roachpb.Value) { + if h.it.scanFormat == kvpb.KEY_VALUES { + h.rowsSize++ + return + } + lenKV, _ := encKVLength(key, value) + // Note that this will always be in bounds even when h.it is no longer + // valid (due to the "spill-over" slice). + h.batchResponseSize[h.it.brIndex] += lenKV } func (h *respSizeHelper) acceptResp() { - h.rowsSize++ + if h.it.scanFormat == kvpb.KEY_VALUES { + h.rowsSize++ + return + } + br := h.it.batchResponses[h.it.brIndex][h.it.brOffset:] + h.batchResponseSize[h.it.brIndex] += getFirstKVLength(br) } // respMerger encapsulates state to combine a {,Reverse}ScanResponse, returned @@ -1088,23 +1199,40 @@ type respMerger struct { // rowsIdx tracks the position within rows slice of the response to be // populated next. rowsIdx int + + // batchResponses is the BatchResponses field of the corresponding response. + // The merged response will be accumulated here first before being injected + // into one of the response structs. + // + // Only populated with BATCH_RESPONSE scan format. + // + // Note that unlike for rows, we don't have any position tracking in this + // struct for batchResponses -- this is because we reuse respIter.brIndex to + // indicate which []byte to write into. + batchResponses [][]byte } // makeRespMerger constructs and returns a new respMerger. func makeRespMerger(serverSideRespIter *respIter, h respSizeHelper) respMerger { - if serverSideRespIter.scanFormat != kvpb.KEY_VALUES { - panic("unexpected") - } - return respMerger{ + m := respMerger{ serverRespIter: serverSideRespIter, - rows: make([]roachpb.KeyValue, h.rowsSize), } + if serverSideRespIter.scanFormat == kvpb.KEY_VALUES { + m.rows = make([]roachpb.KeyValue, h.rowsSize) + } else { + m.batchResponses = make([][]byte, len(h.batchResponseSize)) + for i, size := range h.batchResponseSize { + m.batchResponses[i] = make([]byte, 0, size) + } + } + return m } // acceptKV takes a key and a value (presumably from the write buffer) and adds // it to the result set. func (m *respMerger) acceptKV(key roachpb.Key, value *roachpb.Value) { - if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { + it := m.serverRespIter + if it.scanFormat == kvpb.KEY_VALUES { m.rows[m.rowsIdx] = roachpb.KeyValue{ Key: key, Value: *value, @@ -1112,21 +1240,23 @@ func (m *respMerger) acceptKV(key roachpb.Key, value *roachpb.Value) { m.rowsIdx++ return } - panic("unexpected") + // Note that this will always be in bounds even when the server resp + // iterator is no longer valid (due to the "spill-over" slice). + m.batchResponses[it.brIndex] = appendKV(m.batchResponses[it.brIndex], key, value) } // acceptServerResp accepts the current server response and adds it to the // result set. -// -// Note that the iterator is not moved forward after accepting the response; the -// responsibility of doing so, if desired, is the caller's. 
func (m *respMerger) acceptServerResp() { - if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { - m.rows[m.rowsIdx] = m.serverRespIter.rows[m.serverRespIter.rowsIndex] + it := m.serverRespIter + if it.scanFormat == kvpb.KEY_VALUES { + m.rows[m.rowsIdx] = it.rows[it.rowsIndex] m.rowsIdx++ return } - panic("unexpected") + br := it.batchResponses[it.brIndex][it.brOffset:] + toAppend := br[:getFirstKVLength(br)] + m.batchResponses[it.brIndex] = append(m.batchResponses[it.brIndex], toAppend...) } // toScanResp populates a copy of the given response with the final merged @@ -1140,8 +1270,19 @@ func (m *respMerger) toScanResp(resp *kvpb.ScanResponse) *kvpb.ScanResponse { assertTrue(m.rowsIdx == len(m.rows), "did not fill in all rows; did we miscount?") result.Rows = m.rows return result + } else { + // If we've done everything correctly, then each BatchResponses[i] slice + // should've been filled up to capacity. + for _, br := range m.batchResponses { + assertTrue(len(br) == cap(br), "incorrect calculation of BatchResponses[i] slice capacity") + } + if lastIdx := len(m.batchResponses) - 1; lastIdx > 0 && len(m.batchResponses[lastIdx]) == 0 { + // If we didn't use the "spill-over" slice, then remove it. + m.batchResponses = m.batchResponses[:lastIdx] + } + result.BatchResponses = m.batchResponses + return result } - panic("unexpected") } // toReverseScanResp populates a copy of the given response with the final @@ -1155,8 +1296,19 @@ func (m *respMerger) toReverseScanResp(resp *kvpb.ReverseScanResponse) *kvpb.Rev assertTrue(m.rowsIdx == len(m.rows), "did not fill in all rows; did we miscount?") result.Rows = m.rows return result + } else { + // If we've done everything correctly, then each BatchResponses[i] slice + // should've been filled up to capacity. + for _, br := range m.batchResponses { + assertTrue(len(br) == cap(br), "incorrect calculation of BatchResponses[i] slice capacity") + } + if lastIdx := len(m.batchResponses) - 1; lastIdx > 0 && len(m.batchResponses[lastIdx]) == 0 { + // If we didn't use the "spill-over" slice, then remove it. + m.batchResponses = m.batchResponses[:lastIdx] + } + result.BatchResponses = m.batchResponses + return result } - panic("unexpected") } // assertTrue panics with a message if the supplied condition isn't true. diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go index c72ee20b6a31..a901961e97b6 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" @@ -660,8 +661,9 @@ func TestTxnWriteBufferServesPointReadsLocally(t *testing.T) { require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner()) } -// TestTxnWriteBufferServesOverlappingReadsCorrectly ensures that Scan requests -// that overlap with buffered writes are correctly served from the buffer. +// TestTxnWriteBufferServesOverlappingReadsCorrectly ensures that Scan and +// ReverseScan requests that overlap with buffered writes are correctly served +// from the buffer. 
func TestTxnWriteBufferServesOverlappingReadsCorrectly(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -713,7 +715,7 @@ func TestTxnWriteBufferServesOverlappingReadsCorrectly(t *testing.T) { // Verify the writes were buffered correctly. expBufferedWrites := []bufferedWrite{ - makeBufferedWrite(keyA, makeBufferedValue("valA10", 10)), + makeBufferedWrite(keyA, makeBufferedValue(valA10, 10)), makeBufferedWrite(keyB, makeBufferedValue("", 10)), } require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice()) @@ -733,21 +735,101 @@ func TestTxnWriteBufferServesOverlappingReadsCorrectly(t *testing.T) { } require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice()) - // First up, perform reads on key A at various sequence numbers and ensure the - // correct value is served from the buffer. - for seq, expVal := range map[enginepb.TxnSeq]string{ - 10: valA10, 11: valA10, 12: valA12, 13: valA12, 14: valA14, 15: valA14, + for _, tc := range []struct { + scanFormat kvpb.ScanFormat + numKVs func(rows []roachpb.KeyValue, batchResponses [][]byte) int + firstKV func(rows []roachpb.KeyValue, batchResponses [][]byte) (roachpb.Key, roachpb.Value) + }{ + { + scanFormat: kvpb.KEY_VALUES, + numKVs: func(rows []roachpb.KeyValue, _ [][]byte) int { + return len(rows) + }, + firstKV: func(rows []roachpb.KeyValue, _ [][]byte) (roachpb.Key, roachpb.Value) { + return rows[0].Key, rows[0].Value + }, + }, + { + scanFormat: kvpb.BATCH_RESPONSE, + numKVs: func(_ []roachpb.KeyValue, batchResponses [][]byte) int { + var numKVs int + err := enginepb.ScanDecodeKeyValues(batchResponses, func([]byte, hlc.Timestamp, []byte) error { + numKVs++ + return nil + }) + require.NoError(t, err) + return numKVs + }, + firstKV: func(_ []roachpb.KeyValue, batchResponses [][]byte) (roachpb.Key, roachpb.Value) { + key, rawBytes, _, err := enginepb.ScanDecodeKeyValueNoTS(batchResponses[0]) + require.NoError(t, err) + return key, roachpb.Value{RawBytes: rawBytes} + }, + }, } { + // First up, perform reads on key A at various sequence numbers and + // ensure the correct value is served from the buffer. + for seq, expVal := range map[enginepb.TxnSeq]string{ + 10: valA10, 11: valA10, 12: valA12, 13: valA12, 14: valA14, 15: valA14, + } { + ba = &kvpb.BatchRequest{} + txn.Sequence = seq + ba.Header = kvpb.Header{Txn: &txn} + scan := &kvpb.ScanRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence}, + ScanFormat: tc.scanFormat, + } + reverseScan := &kvpb.ReverseScanRequest{ + RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence}, + ScanFormat: tc.scanFormat, + } + ba.Add(scan) + ba.Add(reverseScan) + + numCalled = mockSender.NumCalled() + mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + require.Len(t, ba.Requests, 2) + require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[0].GetInner()) + require.IsType(t, &kvpb.ReverseScanRequest{}, ba.Requests[1].GetInner()) + + br = ba.CreateReply() + br.Txn = ba.Txn + return br, nil + }) + + br, pErr = twb.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + require.Len(t, br.Responses, 2) + // There should only be a single response, for Key A, as Key B was + // deleted. 
+ sr := br.Responses[0].GetInner().(*kvpb.ScanResponse) + require.Equal(t, 1, tc.numKVs(sr.Rows, sr.BatchResponses)) + rsr := br.Responses[1].GetInner().(*kvpb.ReverseScanResponse) + require.Equal(t, 1, tc.numKVs(rsr.Rows, rsr.BatchResponses)) + sk, sv := tc.firstKV(sr.Rows, sr.BatchResponses) + require.Equal(t, keyA, sk) + require.Equal(t, roachpb.MakeValueFromString(expVal), sv) + rsk, rsv := tc.firstKV(rsr.Rows, rsr.BatchResponses) + require.Equal(t, keyA, rsk) + require.Equal(t, roachpb.MakeValueFromString(expVal), rsv) + // Assert that the request was sent to the KV layer. + require.Equal(t, mockSender.NumCalled(), numCalled+1) + } + + // Perform a scan at a lower sequence number than the minimum buffered + // write. This should be sent to the KV layer, like above, but the + // result shouldn't include any buffered writes. + txn.Sequence = 9 ba = &kvpb.BatchRequest{} - txn.Sequence = seq ba.Header = kvpb.Header{Txn: &txn} scan := &kvpb.ScanRequest{ RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence}, - ScanFormat: kvpb.KEY_VALUES, + ScanFormat: tc.scanFormat, } reverseScan := &kvpb.ReverseScanRequest{ RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence}, - ScanFormat: kvpb.KEY_VALUES, + ScanFormat: tc.scanFormat, } ba.Add(scan) ba.Add(reverseScan) @@ -762,59 +844,19 @@ func TestTxnWriteBufferServesOverlappingReadsCorrectly(t *testing.T) { br.Txn = ba.Txn return br, nil }) - br, pErr = twb.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) require.Len(t, br.Responses, 2) - // There should only be a single response, for Key A, as Key B was deleted. - require.Len(t, br.Responses[0].GetInner().(*kvpb.ScanResponse).Rows, 1) - require.Len(t, br.Responses[1].GetInner().(*kvpb.ReverseScanResponse).Rows, 1) - require.Equal(t, keyA, br.Responses[0].GetInner().(*kvpb.ScanResponse).Rows[0].Key) - require.Equal(t, roachpb.MakeValueFromString(expVal), br.Responses[0].GetInner().(*kvpb.ScanResponse).Rows[0].Value) - require.Equal(t, keyA, br.Responses[1].GetInner().(*kvpb.ReverseScanResponse).Rows[0].Key) - require.Equal(t, roachpb.MakeValueFromString(expVal), br.Responses[1].GetInner().(*kvpb.ReverseScanResponse).Rows[0].Value) + // Assert that no buffered write was returned. + sr := br.Responses[0].GetInner().(*kvpb.ScanResponse) + require.Equal(t, 0, tc.numKVs(sr.Rows, sr.BatchResponses)) + rsr := br.Responses[1].GetInner().(*kvpb.ReverseScanResponse) + require.Equal(t, 0, tc.numKVs(rsr.Rows, rsr.BatchResponses)) // Assert that the request was sent to the KV layer. require.Equal(t, mockSender.NumCalled(), numCalled+1) } - // Perform a scan at a lower sequence number than the minimum buffered write. - // This should be sent to the KV layer, like above, but the result shouldn't - // include any buffered writes. 
- txn.Sequence = 9 - ba = &kvpb.BatchRequest{} - ba.Header = kvpb.Header{Txn: &txn} - scan := &kvpb.ScanRequest{ - RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence}, - ScanFormat: kvpb.KEY_VALUES, - } - reverseScan := &kvpb.ReverseScanRequest{ - RequestHeader: kvpb.RequestHeader{Key: keyA, EndKey: keyC, Sequence: txn.Sequence}, - ScanFormat: kvpb.KEY_VALUES, - } - ba.Add(scan) - ba.Add(reverseScan) - - numCalled = mockSender.NumCalled() - mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { - require.Len(t, ba.Requests, 2) - require.IsType(t, &kvpb.ScanRequest{}, ba.Requests[0].GetInner()) - require.IsType(t, &kvpb.ReverseScanRequest{}, ba.Requests[1].GetInner()) - - br = ba.CreateReply() - br.Txn = ba.Txn - return br, nil - }) - br, pErr = twb.SendLocked(ctx, ba) - require.Nil(t, pErr) - require.NotNil(t, br) - require.Len(t, br.Responses, 2) - // Assert that no buffered write was returned. - require.Len(t, br.Responses[0].GetInner().(*kvpb.ScanResponse).Rows, 0) - require.Len(t, br.Responses[1].GetInner().(*kvpb.ReverseScanResponse).Rows, 0) - // Assert that the request was sent to the KV layer. - require.Equal(t, mockSender.NumCalled(), numCalled+1) - // Lastly, for completeness, commit the transaction and ensure that the buffer // is correctly flushed. ba = &kvpb.BatchRequest{} diff --git a/pkg/kv/kvclient/kvcoord/txn_test.go b/pkg/kv/kvclient/kvcoord/txn_test.go index d0a3fb8e4cc0..4ea7a8d4db45 100644 --- a/pkg/kv/kvclient/kvcoord/txn_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils" @@ -1709,6 +1710,19 @@ func TestTxnBufferedWritesOverlappingScan(t *testing.T) { s := createTestDB(t) defer s.Stop() + extractKVs := func(rows []roachpb.KeyValue, batchResponses [][]byte) []roachpb.KeyValue { + if rows != nil { + return rows + } + var kvs []roachpb.KeyValue + err := storage.MVCCScanDecodeKeyValues(batchResponses, func(key storage.MVCCKey, rawBytes []byte) error { + kvs = append(kvs, roachpb.KeyValue{Key: key.Key, Value: roachpb.Value{RawBytes: rawBytes}}) + return nil + }) + require.NoError(t, err) + return kvs + } + testutils.RunTrueAndFalse(t, "reverse", func(t *testing.T, reverse bool) { makeKV := func(key []byte, val []byte) roachpb.KeyValue { return roachpb.KeyValue{Key: key, Value: roachpb.Value{RawBytes: val}} @@ -1798,6 +1812,14 @@ func TestTxnBufferedWritesOverlappingScan(t *testing.T) { makeKV(keyB, valueTxn), makeKV(keyC, valueTxn), }, }, + { + // Entirely within the buffer and the server response is empty. + key: keyB, + endKey: keyC, + expRes: []roachpb.KeyValue{ + makeKV(keyB, valueTxn), + }, + }, { // End key is present in the buffer, but isn't returned because the scan // is exclusive. @@ -1834,27 +1856,55 @@ func TestTxnBufferedWritesOverlappingScan(t *testing.T) { }, }, } { - var res []kv.KeyValue - var err error - if reverse { - res, err = txn.ReverseScan(ctx, tc.key, tc.endKey, 0 /* maxRows */) - require.NoError(t, err) - } else { - res, err = txn.Scan(ctx, tc.key, tc.endKey, 0 /* maxRows */) - require.NoError(t, err) - } - if reverse { - // Reverse the expected result. 
- sort.Slice(tc.expRes, func(i, j int) bool { - return bytes.Compare(tc.expRes[i].Key, tc.expRes[j].Key) > 0 - }) - } - require.Len(t, res, len(tc.expRes), "failed %d", i) - for i, exp := range tc.expRes { - require.Equal(t, exp.Key, res[i].Key, "failed %d", i) - val, err := res[i].Value.GetBytes() - require.NoError(t, err) - require.Equal(t, exp.Value.RawBytes, val) + for _, sf := range []kvpb.ScanFormat{ + kvpb.KEY_VALUES, + kvpb.BATCH_RESPONSE, + } { + var req kvpb.Request + if reverse { + req = &kvpb.ReverseScanRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: tc.key, + EndKey: tc.endKey, + }, + ScanFormat: sf, + } + } else { + req = &kvpb.ScanRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: tc.key, + EndKey: tc.endKey, + }, + ScanFormat: sf, + } + } + b := txn.NewBatch() + b.AddRawRequest(req) + require.NoError(t, txn.Run(ctx, b)) + br := b.RawResponse() + + require.Equal(t, 1, len(br.Responses)) + var kvs []roachpb.KeyValue + if reverse { + rsr := br.Responses[0].GetInner().(*kvpb.ReverseScanResponse) + kvs = extractKVs(rsr.Rows, rsr.BatchResponses) + } else { + sr := br.Responses[0].GetInner().(*kvpb.ScanResponse) + kvs = extractKVs(sr.Rows, sr.BatchResponses) + } + if reverse { + // Reverse the expected result. + sort.Slice(tc.expRes, func(i, j int) bool { + return bytes.Compare(tc.expRes[i].Key, tc.expRes[j].Key) > 0 + }) + } + require.Len(t, kvs, len(tc.expRes), "failed %d", i) + for i, exp := range tc.expRes { + require.Equal(t, exp.Key, kvs[i].Key, "failed %d", i) + val, err := kvs[i].Value.GetBytes() + require.NoError(t, err) + require.Equal(t, exp.Value.RawBytes, val) + } } } return nil From 3d740ceb8fa5a3044d2befaadda96a2b76160623 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 17 Mar 2025 20:48:44 -0700 Subject: [PATCH 4/5] kvcoord: correctly set NumKeys and NumBytes on merged scan responses This commit utilizes the respSizeHelper struct to calculate the final values for `NumKeys` and `NumBytes` fields of merged Scan and ReverseScan responses when they are produced by the txnWriteBuffer. The existing test has been extended to verify these values which required some minor adjustments to correctly simulate already committed expected values (they have the timestamp set). 
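For reference, the per-KV accounting works as follows: NumKeys grows by one per accepted KV, and NumBytes grows by the pair's encoded size, i.e. an 8-byte length prefix (value length and key length, 4 bytes each), the MVCC-encoded key (including its timestamp), and the raw value bytes. A minimal sketch (not part of the patch itself), assuming the roachpb and mvccencoding packages already imported by the interceptor; mergedKVSize is a hypothetical helper name used only here for illustration:

// mergedKVSize returns the contribution of a single key/value pair to the
// NumBytes of the merged response, mirroring encKVLength in the diff below.
func mergedKVSize(key roachpb.Key, value *roachpb.Value) int64 {
	lenKey := mvccencoding.EncodedMVCCKeyLength(key, value.Timestamp)
	return int64(8 + lenKey + len(value.RawBytes))
}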
Release note: None --- pkg/kv/kvclient/kvcoord/BUILD.bazel | 1 + .../kvcoord/txn_interceptor_write_buffer.go | 33 ++++++++++++----- pkg/kv/kvclient/kvcoord/txn_test.go | 35 +++++++++++++++++-- 3 files changed, 58 insertions(+), 11 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index 93dd018800a1..86a282f34cc1 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -206,6 +206,7 @@ go_test( "//pkg/sql/pgwire/pgerror", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/storage/mvccencoding", "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/kvclientutils", diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go index 766650fdcbec..55d7806c32e2 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go @@ -518,7 +518,7 @@ func (twb *txnWriteBuffer) mergeWithScanResp( respIter.reset() rm := makeRespMerger(respIter, h) twb.mergeBufferAndResp(respIter, rm.acceptKV, rm.acceptServerResp, false /* reverse */) - return rm.toScanResp(resp), nil + return rm.toScanResp(resp, h), nil } // mergeWithReverseScanResp takes a ReverseScanRequest, that was sent to the KV @@ -542,7 +542,7 @@ func (twb *txnWriteBuffer) mergeWithReverseScanResp( respIter.reset() rm := makeRespMerger(respIter, h) twb.mergeBufferAndResp(respIter, rm.acceptKV, rm.acceptServerResp, true /* reverse */) - return rm.toReverseScanResp(resp), nil + return rm.toReverseScanResp(resp, h), nil } // mergeBufferAndScanResp merges (think the merge step from merge sort) the @@ -1131,6 +1131,11 @@ func (s *respIter) seq() enginepb.TxnSeq { type respSizeHelper struct { it *respIter + // numKeys and numBytes track the values that NumKeys and NumBytes fields of + // the merged {,Reverse}ScanResponse should be set to. + numKeys int64 + numBytes int64 + // rowsSize tracks the total number of KVs that we'll include in the Rows // field of the merged {,Reverse}ScanResponse when KEY_VALUES scan format is // used. @@ -1163,23 +1168,31 @@ func makeRespSizeHelper(it *respIter) respSizeHelper { } func (h *respSizeHelper) acceptBuffer(key roachpb.Key, value *roachpb.Value) { + h.numKeys++ + lenKV, _ := encKVLength(key, value) + h.numBytes += int64(lenKV) if h.it.scanFormat == kvpb.KEY_VALUES { h.rowsSize++ return } - lenKV, _ := encKVLength(key, value) // Note that this will always be in bounds even when h.it is no longer // valid (due to the "spill-over" slice). h.batchResponseSize[h.it.brIndex] += lenKV } func (h *respSizeHelper) acceptResp() { + h.numKeys++ if h.it.scanFormat == kvpb.KEY_VALUES { + kv := h.it.rows[h.it.rowsIndex] + lenKV, _ := encKVLength(kv.Key, &kv.Value) + h.numBytes += int64(lenKV) h.rowsSize++ return } br := h.it.batchResponses[h.it.brIndex][h.it.brOffset:] - h.batchResponseSize[h.it.brIndex] += getFirstKVLength(br) + lenKV := getFirstKVLength(br) + h.numBytes += int64(lenKV) + h.batchResponseSize[h.it.brIndex] += lenKV } // respMerger encapsulates state to combine a {,Reverse}ScanResponse, returned @@ -1261,10 +1274,11 @@ func (m *respMerger) acceptServerResp() { // toScanResp populates a copy of the given response with the final merged // state. 
-func (m *respMerger) toScanResp(resp *kvpb.ScanResponse) *kvpb.ScanResponse { +func (m *respMerger) toScanResp(resp *kvpb.ScanResponse, h respSizeHelper) *kvpb.ScanResponse { assertTrue(m.serverRespIter.scanReq != nil, "weren't accumulating a scan resp") - // TODO(yuzefovich): we need to update NumKeys and NumBytes. result := resp.ShallowCopy().(*kvpb.ScanResponse) + result.NumKeys = h.numKeys + result.NumBytes = h.numBytes if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { // If we've done everything correctly, resIdx == len(response rows). assertTrue(m.rowsIdx == len(m.rows), "did not fill in all rows; did we miscount?") @@ -1287,10 +1301,13 @@ func (m *respMerger) toScanResp(resp *kvpb.ScanResponse) *kvpb.ScanResponse { // toReverseScanResp populates a copy of the given response with the final // merged state. -func (m *respMerger) toReverseScanResp(resp *kvpb.ReverseScanResponse) *kvpb.ReverseScanResponse { +func (m *respMerger) toReverseScanResp( + resp *kvpb.ReverseScanResponse, h respSizeHelper, +) *kvpb.ReverseScanResponse { assertTrue(m.serverRespIter.scanReq == nil, "weren't accumulating a reverse scan resp") - // TODO(yuzefovich): we need to update NumKeys and NumBytes. result := resp.ShallowCopy().(*kvpb.ReverseScanResponse) + result.NumKeys = h.numKeys + result.NumBytes = h.numBytes if m.serverRespIter.scanFormat == kvpb.KEY_VALUES { // If we've done everything correctly, resIdx == len(response rows). assertTrue(m.rowsIdx == len(m.rows), "did not fill in all rows; did we miscount?") diff --git a/pkg/kv/kvclient/kvcoord/txn_test.go b/pkg/kv/kvclient/kvcoord/txn_test.go index 4ea7a8d4db45..f517ce6aa0b2 100644 --- a/pkg/kv/kvclient/kvcoord/txn_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/mvccencoding" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils" "github.com/cockroachdb/cockroach/pkg/testutils/localtestcluster" @@ -1724,8 +1725,23 @@ func TestTxnBufferedWritesOverlappingScan(t *testing.T) { } testutils.RunTrueAndFalse(t, "reverse", func(t *testing.T, reverse bool) { + // valueTxn is a special value that indicates that a particular KV has + // been modified in the current txn but hasn't been committed yet. + valueTxn := []byte("valueTxn") makeKV := func(key []byte, val []byte) roachpb.KeyValue { - return roachpb.KeyValue{Key: key, Value: roachpb.Value{RawBytes: val}} + var ts hlc.Timestamp + if !bytes.Equal(val, valueTxn) { + // If we have a value other than valueTxn, it means that the KV + // has been committed. As such, it'll have the Timestamp set, so + // we simulate that here (this is needed to get the correct + // NumBytes estimate). (A particular timestamp doesn't matter, + // just that both WallTime and Logical parts are set.) 
+ ts = hlc.Timestamp{WallTime: 1, Logical: 1} + } + var value roachpb.Value + value.SetBytes(val) + value.Timestamp = ts + return roachpb.KeyValue{Key: key, Value: value} } ctx := context.Background() @@ -1733,7 +1749,6 @@ func TestTxnBufferedWritesOverlappingScan(t *testing.T) { valueC := []byte("valueC") valueF := []byte("valueF") valueG := []byte("valueG") - valueTxn := []byte("valueTxn") keyA := []byte("keyA") keyB := []byte("keyB") @@ -1885,12 +1900,15 @@ func TestTxnBufferedWritesOverlappingScan(t *testing.T) { require.Equal(t, 1, len(br.Responses)) var kvs []roachpb.KeyValue + var numKeys, numBytes int if reverse { rsr := br.Responses[0].GetInner().(*kvpb.ReverseScanResponse) kvs = extractKVs(rsr.Rows, rsr.BatchResponses) + numKeys, numBytes = int(rsr.NumKeys), int(rsr.NumBytes) } else { sr := br.Responses[0].GetInner().(*kvpb.ScanResponse) kvs = extractKVs(sr.Rows, sr.BatchResponses) + numKeys, numBytes = int(sr.NumKeys), int(sr.NumBytes) } if reverse { // Reverse the expected result. @@ -1901,10 +1919,21 @@ func TestTxnBufferedWritesOverlappingScan(t *testing.T) { require.Len(t, kvs, len(tc.expRes), "failed %d", i) for i, exp := range tc.expRes { require.Equal(t, exp.Key, kvs[i].Key, "failed %d", i) + expVal, err := exp.Value.GetBytes() + require.NoError(t, err) val, err := kvs[i].Value.GetBytes() require.NoError(t, err) - require.Equal(t, exp.Value.RawBytes, val) + require.Equal(t, expVal, val) + } + // Additionally verify NumKeys and NumBytes fields. + require.Equal(t, len(tc.expRes), numKeys, "incorrect NumKeys value") + var expNumBytes int + for _, r := range tc.expRes { + // See encKVLength in txn_interceptor_write_buffer.go + // for more details on the expected sizing. + expNumBytes += 8 + mvccencoding.EncodedMVCCKeyLength(r.Key, r.Value.Timestamp) + len(r.Value.RawBytes) } + require.Equal(t, expNumBytes, numBytes, "incorrect NumBytes value") } } return nil From 0f857d10f379ea2cb0a7380892a8b9bd65e2e2f8 Mon Sep 17 00:00:00 2001 From: Herko Lategan Date: Wed, 19 Mar 2025 12:46:44 +0000 Subject: [PATCH 5/5] microbench-ci: benchmark suite changes wip --- .github/workflows/microbenchmarks-ci.yaml | 2 +- .../microbench-ci/config/pull-request-suite.yml | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/microbenchmarks-ci.yaml b/.github/workflows/microbenchmarks-ci.yaml index 9e27cc5f51ae..c1bea8f8cfc1 100644 --- a/.github/workflows/microbenchmarks-ci.yaml +++ b/.github/workflows/microbenchmarks-ci.yaml @@ -1,6 +1,6 @@ name: Microbenchmarks CI on: - pull_request_target: + pull_request: types: [ opened, reopened, synchronize ] branches: [ master ] concurrency: diff --git a/pkg/cmd/microbench-ci/config/pull-request-suite.yml b/pkg/cmd/microbench-ci/config/pull-request-suite.yml index ec31ad59e700..03356083b98b 100644 --- a/pkg/cmd/microbench-ci/config/pull-request-suite.yml +++ b/pkg/cmd/microbench-ci/config/pull-request-suite.yml @@ -13,12 +13,12 @@ benchmarks: - "allocs/op" - display_name: Sysbench - labels: ["KV", "1node", "local", "oltp_read_only"] - name: "BenchmarkSysbench/KV/1node_local/oltp_read_only" + labels: ["KV", "3node", "oltp_read_only"] + name: "BenchmarkSysbench/KV/3node/oltp_read_only" package: "pkg/sql/tests" runner_group: 2 - count: 20 - iterations: 6000 + count: 15 + iterations: 3000 compare_alpha: 0.025 retries: 3 metrics: @@ -26,12 +26,12 @@ benchmarks: - "allocs/op" - display_name: Sysbench - labels: ["KV", "1node", "local", "oltp_write_only"] - name: "BenchmarkSysbench/KV/1node_local/oltp_write_only" + labels: 
["KV", "3node", "oltp_write_only"] + name: "BenchmarkSysbench/KV/3node/oltp_write_only" package: "pkg/sql/tests" runner_group: 2 - count: 20 - iterations: 6000 + count: 15 + iterations: 3000 compare_alpha: 0.025 retries: 3 metrics: