Skip to content

Commit

Permalink
Add the Memory Protector to limit the resource usage of the query ope…
Browse files Browse the repository at this point in the history
…rations (#599)
  • Loading branch information
hanahmily authored Jan 23, 2025
1 parent 765af71 commit 9724431
Show file tree
Hide file tree
Showing 29 changed files with 580 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Release Notes.
- Add the "api version" service to gRPC and HTTP server.
- Metadata: Wait for the existing registration to be removed before registering the node.
- Stream: Introduce the batch scan to improve the performance of the query and limit the memory usage.
- Add the memory protector to guard the memory usage of the system. It limits the memory consumed by query operations.

### Bug Fixes

Expand Down
2 changes: 1 addition & 1 deletion banyand/dquery/dquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (q *queryService) Name() string {

func (q *queryService) FlagSet() *run.FlagSet {
fs := run.NewFlagSet("distributed-query")
fs.DurationVar(&q.slowQuery, "dst-slow-query", 0, "distributed slow query threshold, 0 means no slow query log")
fs.DurationVar(&q.slowQuery, "dst-slow-query", 5*time.Second, "distributed slow query threshold, 0 means no slow query log")
return fs
}

Expand Down
2 changes: 1 addition & 1 deletion banyand/liaison/grpc/property.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/query"
)

const defaultQueryTimeout = 30 * time.Second
const defaultQueryTimeout = 10 * time.Second

type propertyServer struct {
propertyv1.UnimplementedPropertyServiceServer
Expand Down
6 changes: 5 additions & 1 deletion banyand/measure/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
Expand Down Expand Up @@ -53,6 +54,7 @@ type option struct {

type measure struct {
databaseSupplier schema.Supplier
pm *protector.Memory
indexTagMap map[string]struct{}
l *logger.Logger
schema *databasev1.Measure
Expand Down Expand Up @@ -122,14 +124,16 @@ type measureSpec struct {
topNAggregations []*databasev1.TopNAggregation
}

func openMeasure(shardNum uint32, db schema.Supplier, spec measureSpec, l *logger.Logger, pipeline queue.Queue,
func openMeasure(shardNum uint32, db schema.Supplier, spec measureSpec,
l *logger.Logger, pipeline queue.Queue, pm *protector.Memory,
) (*measure, error) {
m := &measure{
shardNum: shardNum,
schema: spec.schema,
indexRules: spec.indexRules,
topNAggregations: spec.topNAggregations,
l: l,
pm: pm,
}
if err := m.parseSpec(); err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion banyand/measure/measure_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/embeddedserver"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/banyand/query"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
Expand Down Expand Up @@ -75,8 +76,9 @@ func setUp() (*services, func()) {
gomega.Expect(err).NotTo(gomega.HaveOccurred())

metricSvc := observability.NewMetricService(metadataService, pipeline, "test", nil)
pm := protector.NewMemory(metricSvc)
// Init Measure Service
measureService, err := measure.NewService(context.TODO(), metadataService, pipeline, nil, metricSvc)
measureService, err := measure.NewService(metadataService, pipeline, nil, metricSvc, pm)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService}
querySvc, err := query.NewService(context.TODO(), nil, measureService, metadataService, pipeline)
Expand Down
7 changes: 5 additions & 2 deletions banyand/measure/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
Expand Down Expand Up @@ -267,6 +268,7 @@ type supplier struct {
pipeline queue.Queue
omr observability.MetricsRegistry
l *logger.Logger
pm *protector.Memory
path string
option option
}
Expand All @@ -279,6 +281,7 @@ func newSupplier(path string, svc *service) *supplier {
pipeline: svc.localPipeline,
option: svc.option,
omr: svc.omr,
pm: svc.pm,
}
}

Expand All @@ -288,7 +291,7 @@ func (s *supplier) OpenResource(shardNum uint32, supplier resourceSchema.Supplie
schema: measureSchema,
indexRules: spec.IndexRules(),
topNAggregations: spec.TopN(),
}, s.l, s.pipeline)
}, s.l, s.pipeline, s.pm)
}

func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) {
Expand Down Expand Up @@ -351,5 +354,5 @@ func (s *portableSupplier) OpenResource(shardNum uint32, _ resourceSchema.Suppli
schema: measureSchema,
indexRules: spec.IndexRules(),
topNAggregations: spec.TopN(),
}, s.l, nil)
}, s.l, nil, nil)
}
5 changes: 5 additions & 0 deletions banyand/measure/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids []
return fmt.Errorf("cannot init tstIter: %w", tstIter.Error())
}
var hit int
var totalBlockBytes uint64
for tstIter.nextBlock() {
if hit%checkDoneEvery == 0 {
select {
Expand All @@ -366,10 +367,14 @@ func (s *measure) searchBlocks(ctx context.Context, result *queryResult, sids []
p := tstIter.piHeap[0]
bc.init(p.p, p.curBlock, qo)
result.data = append(result.data, bc)
totalBlockBytes += bc.bm.uncompressedSizeBytes
}
if tstIter.Error() != nil {
return fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error())
}
if err := s.pm.AcquireResource(ctx, totalBlockBytes); err != nil {
return err
}
result.sidToIndex = make(map[common.SeriesID]int)
for i, si := range originalSids {
result.sidToIndex[si] = i
Expand Down
5 changes: 4 additions & 1 deletion banyand/measure/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
Expand Down Expand Up @@ -60,6 +61,7 @@ type service struct {
omr observability.MetricsRegistry
schemaRepo *schemaRepo
l *logger.Logger
pm *protector.Memory
root string
option option
maxDiskUsagePercent int
Expand Down Expand Up @@ -147,11 +149,12 @@ func (s *service) GracefulStop() {
}

// NewService returns a new service.
func NewService(_ context.Context, metadata metadata.Repo, pipeline queue.Server, metricPipeline queue.Server, omr observability.MetricsRegistry) (Service, error) {
func NewService(metadata metadata.Repo, pipeline queue.Server, metricPipeline queue.Server, omr observability.MetricsRegistry, pm *protector.Memory) (Service, error) {
return &service{
metadata: metadata,
pipeline: pipeline,
metricPipeline: metricPipeline,
omr: omr,
pm: pm,
}, nil
}
191 changes: 191 additions & 0 deletions banyand/protector/protector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package protector provides a set of protectors that stop the query services when the resource usage exceeds the limit.
package protector

import (
"context"
"errors"
"fmt"
"runtime/metrics"
"sync/atomic"
"time"

"github.com/dustin/go-humanize"

"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/cgroups"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
)

// scope is the "memory_protector" sub-scope of observability.RootScope; NewMemory
// passes it to the metrics registry when creating the protector's gauges.
var scope = observability.RootScope.SubScope("memory_protector")

// Memory is a protector that stops the query services when the memory usage exceeds the limit.
//
// It is configured through the flags registered in FlagSet, resolves its effective
// limit in PreRun, samples memory usage in the goroutine started by Serve, and is
// consulted by callers through AcquireResource.
type Memory struct {
// omr is the metrics registry handed to NewMemory.
omr observability.MetricsRegistry
// limitGauge is the gauge named "limit"; PreRun sets it to the effective limit.
limitGauge meter.Gauge
// usageGauge is the gauge named "usage" created in NewMemory.
usageGauge meter.Gauge
// l is the logger obtained in PreRun under the protector's name.
l *logger.Logger
// closed is closed by GracefulStop; Serve returns it as the stop notification
// and the sampling goroutine exits when it is closed.
closed chan struct{}
// blockedChan is a buffered channel (capacity cgroups.CPUs()) whose slots
// AcquireResource takes while it waits, bounding the number of concurrent waiters.
blockedChan chan struct{}
// allowedPercent is the value of the "allowed-percent" flag (default 75).
allowedPercent int
// allowedBytes is the value of the "allowed-bytes" flag; when greater than 0,
// PreRun uses it directly as the limit.
allowedBytes run.Bytes
// limit is the effective limit in bytes computed by PreRun; 0 means the
// protector is disabled.
limit uint64
// usage is the most recent sampled memory usage in bytes. It is written with
// atomic.StoreUint64 by the Serve goroutine and read with atomic.LoadUint64
// by AcquireResource.
usage uint64
}

// NewMemory builds a Memory protector bound to the given metrics registry.
//
// The waiter queue is sized to the CPU count reported by cgroups, and the
// "limit" and "usage" gauges are registered under the protector's metrics scope.
// The limit itself stays zero (disabled) until PreRun resolves it.
func NewMemory(omr observability.MetricsRegistry) *Memory {
	slots := cgroups.CPUs()
	gauges := omr.With(scope)

	m := &Memory{omr: omr}
	m.blockedChan = make(chan struct{}, slots)
	m.closed = make(chan struct{})
	m.limitGauge = gauges.NewGauge("limit")
	m.usageGauge = gauges.NewGauge("usage")
	return m
}

// AcquireResource attempts to acquire a `size` amount of memory.
//
// It does not reserve anything: it waits until the most recently sampled usage
// plus size fits under the configured limit, re-checking every 100ms, and
// returns nil as soon as it does. While waiting, the call occupies one slot of
// the bounded waiter queue; the slot is released on return.
//
// A nil protector or a zero limit means protection is disabled and the call
// returns nil immediately. If ctx is done before a queue slot is obtained or
// before the request fits, an error wrapping ctx.Err() is returned.
func (m *Memory) AcquireResource(ctx context.Context, size uint64) error {
	// Callers may be wired without a protector (nil); treat that like "disabled"
	// instead of dereferencing a nil receiver.
	if m == nil || m.limit == 0 {
		return nil
	}
	start := time.Now()

	select {
	case m.blockedChan <- struct{}{}:
		defer func() { <-m.blockedChan }()
	case <-ctx.Done():
		return fmt.Errorf("context canceled while waiting for blocked queue slot: %w", ctx.Err())
	}

	// fits is written as a subtraction so that usage+size cannot wrap around
	// uint64 and let an oversized request through.
	fits := func(usage uint64) bool {
		return size <= m.limit && usage <= m.limit-size
	}

	currentUsage := atomic.LoadUint64(&m.usage)
	if fits(currentUsage) {
		return nil
	}

	// One ticker for the whole wait instead of a fresh timer per retry.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return fmt.Errorf(
				"context canceled: memory acquisition failed (currentUsage: %d, limit: %d, size: %d, blockedDuration: %v): %w",
				currentUsage, m.limit, size, time.Since(start), ctx.Err(),
			)
		}

		currentUsage = atomic.LoadUint64(&m.usage)
		if fits(currentUsage) {
			return nil
		}
	}
}

// Name returns the name of the protector.
// The same string names the flag set built in FlagSet and the logger obtained in PreRun.
func (m *Memory) Name() string {
return "memory-protector"
}

// FlagSet returns the flag set for the protector.
//
// It registers two flags:
//   - allowed-percent (default 75): the limit as a percentage of the cgroup
//     memory limit; PreRun uses it only when allowed-bytes is not set.
//   - allowed-bytes: an absolute limit in bytes; when greater than 0 it takes
//     precedence over allowed-percent.
func (m *Memory) FlagSet() *run.FlagSet {
	flagS := run.NewFlagSet(m.Name())
	// NOTE(review): the flag name is deliberately not back-quoted in the help text.
	// run.FlagSet looks pflag-backed, and pflag treats a back-quoted word in a usage
	// string as the value placeholder for that flag — confirm.
	flagS.IntVarP(&m.allowedPercent, "allowed-percent", "", 75,
		"Allowed percentage of total memory usage. If usage exceeds this value, the query services will stop. "+
			"This takes effect only if allowed-bytes is 0. If usage is too high, it may cause OS page cache eviction.")
	flagS.VarP(&m.allowedBytes, "allowed-bytes", "", "Allowed bytes of memory usage. If the memory usage exceeds this value, the query services will stop. "+
		"Setting a large value may evict data from the OS page cache, causing high disk I/O.")
	return flagS
}

// Validate validates the protector's flags.
//
// An explicit byte limit (allowed-bytes > 0) is what PreRun uses, so in that
// case allowed-percent is not consulted and is not validated. Otherwise the
// limit is derived from allowed-percent, which must be in the range (0, 100].
func (m *Memory) Validate() error {
	if m.allowedBytes > 0 {
		return nil
	}
	if m.allowedPercent <= 0 || m.allowedPercent > 100 {
		return errors.New("allowed-percent must be in the range (0, 100]")
	}
	return nil
}

// PreRun initializes the protector.
//
// It obtains the logger and resolves the effective limit:
//   - if allowed-bytes is greater than 0, that value is the limit;
//   - otherwise the limit is allowed-percent of the cgroup memory limit.
//
// When the cgroup limit cannot be read or is outside (0, 1e18], a warning is
// logged and the limit stays 0, which leaves the protector disabled. PreRun
// never returns a non-nil error. Whenever a limit is resolved it is also
// published on the "limit" gauge.
func (m *Memory) PreRun(context.Context) error {
	m.l = logger.GetLogger(m.Name())

	if m.allowedBytes > 0 {
		m.limit = uint64(m.allowedBytes)
		m.l.Info().
			Str("limit", humanize.Bytes(m.limit)).
			Msg("memory protector enabled")
		m.limitGauge.Set(float64(m.limit))
		return nil
	}

	cgLimit, err := cgroups.MemoryLimit()
	if err != nil {
		m.l.Warn().Err(err).Msg("failed to get memory limit from cgroups, disable memory protector")
		return nil
	}
	if cgLimit <= 0 || cgLimit > 1e18 {
		m.l.Warn().Int64("cgroup_memory_limit", cgLimit).Msg("cgroup memory limit is invalid, disable memory protector")
		return nil
	}

	// Compute floor(total*percent/100) without forming total*percent, which can
	// exceed uint64 for totals near the accepted 1e18 upper bound.
	total := uint64(cgLimit)
	percent := uint64(m.allowedPercent)
	m.limit = total/100*percent + total%100*percent/100

	m.l.Info().
		Str("limit", humanize.Bytes(m.limit)).
		Str("cgroup_limit", humanize.Bytes(total)).
		Int("percent", m.allowedPercent).
		Msg("memory protector enabled")
	m.limitGauge.Set(float64(m.limit))
	return nil
}

// GracefulStop stops the protector by closing the `closed` channel. That ends the
// sampling goroutine started by Serve and fires the stop notification Serve returned.
// NOTE(review): a second call would panic on the already-closed channel; presumably
// the run framework invokes this exactly once — confirm.
func (m *Memory) GracefulStop() {
close(m.closed)
}

// Serve starts the protector.
//
// When a limit is configured, it launches a goroutine that samples the Go
// runtime's "/memory/classes/total:bytes" metric immediately and then every
// 5 seconds. Each sample is stored for AcquireResource, published on the
// "usage" gauge, and logged as a warning when it exceeds the limit. The
// goroutine exits when GracefulStop closes the `closed` channel.
//
// When the limit is 0 (protector disabled) no goroutine is started. In both
// cases the returned notification is the `closed` channel.
func (m *Memory) Serve() run.StopNotify {
	if m.limit == 0 {
		return m.closed
	}
	go func() {
		// The sample slice is reused across ticks; metrics.Read overwrites Value.
		samples := []metrics.Sample{
			{Name: "/memory/classes/total:bytes"},
		}
		sample := func() {
			metrics.Read(samples)
			// Value.Uint64 panics on any other kind (e.g. KindBad for an
			// unsupported metric), so skip the sample instead.
			if samples[0].Value.Kind() != metrics.KindUint64 {
				return
			}
			usedBytes := samples[0].Value.Uint64()

			atomic.StoreUint64(&m.usage, usedBytes)
			m.usageGauge.Set(float64(usedBytes))

			if usedBytes > m.limit {
				m.l.Warn().Str("used", humanize.Bytes(usedBytes)).Str("limit", humanize.Bytes(m.limit)).Msg("memory usage exceeds limit")
			}
		}

		// Take a first sample right away so usage is not reported as 0 (and the
		// limit effectively unenforced) until the first tick.
		sample()

		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-m.closed:
				return
			case <-ticker.C:
				sample()
			}
		}
	}()
	return m.closed
}
Loading

0 comments on commit 9724431

Please sign in to comment.