Skip to content

Commit 7ee9542

Browse files
authored
Improve all rule checking slots with standard BlockError representation (#187)
* Improve all rule checking slots with standard BlockError representation Signed-off-by: Eric Zhao <[email protected]>
1 parent 1f1c71b commit 7ee9542

11 files changed

+86
-71
lines changed

api/api_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func Test_entryWithArgsAndChainBlock(t *testing.T) {
103103
blockType := base.BlockTypeFlow
104104

105105
ps1.On("Prepare", mock.Anything).Return()
106-
rcs1.On("Check", mock.Anything).Return(base.NewTokenResultBlocked(blockType, "Flow"))
106+
rcs1.On("Check", mock.Anything).Return(base.NewTokenResultBlocked(blockType))
107107
rcs2.On("Check", mock.Anything).Return(base.NewTokenResultPass())
108108
ssm.On("OnEntryPassed", mock.Anything).Return()
109109
ssm.On("OnEntryBlocked", mock.Anything, mock.Anything).Return()

core/base/block_error.go

+14-6
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import "fmt"
55
// BlockError indicates the request was blocked by Sentinel.
66
type BlockError struct {
77
blockType BlockType
8-
blockMsg string
8+
// blockMsg provides additional message for the block error.
9+
blockMsg string
910

10-
rule SentinelRule
11+
rule SentinelRule
12+
// snapshotValue represents the triggered "snapshot" value
1113
snapshotValue interface{}
1214
}
1315

@@ -36,15 +38,21 @@ func NewBlockErrorFromDeepCopy(from *BlockError) *BlockError {
3638
}
3739
}
3840

39-
func NewBlockError(blockType BlockType, blockMsg string) *BlockError {
40-
return &BlockError{blockType: blockType, blockMsg: blockMsg}
41+
func NewBlockError(blockType BlockType) *BlockError {
42+
return &BlockError{blockType: blockType}
43+
}
44+
45+
func NewBlockErrorWithMessage(blockType BlockType, message string) *BlockError {
46+
return &BlockError{blockType: blockType, blockMsg: message}
4147
}
4248

4349
func NewBlockErrorWithCause(blockType BlockType, blockMsg string, rule SentinelRule, snapshot interface{}) *BlockError {
4450
return &BlockError{blockType: blockType, blockMsg: blockMsg, rule: rule, snapshotValue: snapshot}
4551
}
4652

4753
func (e *BlockError) Error() string {
48-
return fmt.Sprintf("Sentinel block error: blockType:%s, message: %s, rule: %+v, snapshotValue: %+v",
49-
e.blockType, e.blockMsg, e.rule, e.snapshotValue)
54+
if len(e.blockMsg) == 0 {
55+
return fmt.Sprintf("SentinelBlockError: %s", e.blockType.String())
56+
}
57+
return fmt.Sprintf("SentinelBlockError: %s, message: %s", e.blockType.String(), e.blockMsg)
5058
}

core/base/context_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ import (
99
func TestEntryContext_IsBlocked(t *testing.T) {
1010
ctx := NewEmptyEntryContext()
1111
assert.False(t, ctx.IsBlocked(), "empty context with no result should indicate pass")
12-
ctx.RuleCheckResult = NewTokenResultBlocked(BlockTypeUnknown, "Unknown")
12+
ctx.RuleCheckResult = NewTokenResultBlocked(BlockTypeUnknown)
1313
assert.True(t, ctx.IsBlocked(), "context with blocked request should indicate blocked")
1414
}

core/base/result.go

+36-28
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (r *TokenResult) DeepCopyFrom(newResult *TokenResult) {
7070
snapshotValue: newResult.blockErr.snapshotValue,
7171
}
7272
} else {
73+
// TODO: review the reusing logic
7374
r.blockErr.blockType = newResult.blockErr.blockType
7475
r.blockErr.blockMsg = newResult.blockErr.blockMsg
7576
r.blockErr.rule = newResult.blockErr.rule
@@ -83,29 +84,36 @@ func (r *TokenResult) ResetToPass() {
8384
r.waitMs = 0
8485
}
8586

86-
func (r *TokenResult) ResetToBlockedFrom(blockType BlockType, blockMsg string) {
87+
func (r *TokenResult) ResetToBlocked(blockType BlockType) {
8788
r.status = ResultStatusBlocked
8889
if r.blockErr == nil {
89-
r.blockErr = &BlockError{
90-
blockType: blockType,
91-
blockMsg: blockMsg,
92-
}
90+
r.blockErr = NewBlockError(blockType)
91+
} else {
92+
r.blockErr.blockType = blockType
93+
r.blockErr.blockMsg = ""
94+
r.blockErr.rule = nil
95+
r.blockErr.snapshotValue = nil
96+
}
97+
r.waitMs = 0
98+
}
99+
100+
func (r *TokenResult) ResetToBlockedWithMessage(blockType BlockType, blockMsg string) {
101+
r.status = ResultStatusBlocked
102+
if r.blockErr == nil {
103+
r.blockErr = NewBlockErrorWithMessage(blockType, blockMsg)
93104
} else {
94105
r.blockErr.blockType = blockType
95106
r.blockErr.blockMsg = blockMsg
107+
r.blockErr.rule = nil
108+
r.blockErr.snapshotValue = nil
96109
}
97110
r.waitMs = 0
98111
}
99112

100-
func (r *TokenResult) ResetToBlockedWithCauseFrom(blockType BlockType, blockMsg string, rule SentinelRule, snapshot interface{}) {
113+
func (r *TokenResult) ResetToBlockedWithCause(blockType BlockType, blockMsg string, rule SentinelRule, snapshot interface{}) {
101114
r.status = ResultStatusBlocked
102115
if r.blockErr == nil {
103-
r.blockErr = &BlockError{
104-
blockType: blockType,
105-
blockMsg: blockMsg,
106-
rule: rule,
107-
snapshotValue: snapshot,
108-
}
116+
r.blockErr = NewBlockErrorWithCause(blockType, blockMsg, rule, snapshot)
109117
} else {
110118
r.blockErr.blockType = blockType
111119
r.blockErr.blockMsg = blockMsg
@@ -142,7 +150,7 @@ func (r *TokenResult) String() string {
142150
} else {
143151
blockMsg = r.blockErr.Error()
144152
}
145-
return fmt.Sprintf("TokenResult{status=%+v, blockErr=%s, waitMs=%d}", r.status, blockMsg, r.waitMs)
153+
return fmt.Sprintf("TokenResult{status=%s, blockErr=%s, waitMs=%d}", r.status.String(), blockMsg, r.waitMs)
146154
}
147155

148156
func NewTokenResultPass() *TokenResult {
@@ -153,27 +161,27 @@ func NewTokenResultPass() *TokenResult {
153161
}
154162
}
155163

156-
func NewTokenResultBlocked(blockType BlockType, blockMsg string) *TokenResult {
164+
func NewTokenResultBlocked(blockType BlockType) *TokenResult {
157165
return &TokenResult{
158-
status: ResultStatusBlocked,
159-
blockErr: &BlockError{
160-
blockType: blockType,
161-
blockMsg: blockMsg,
162-
},
163-
waitMs: 0,
166+
status: ResultStatusBlocked,
167+
blockErr: NewBlockError(blockType),
168+
waitMs: 0,
169+
}
170+
}
171+
172+
func NewTokenResultBlockedWithMessage(blockType BlockType, blockMsg string) *TokenResult {
173+
return &TokenResult{
174+
status: ResultStatusBlocked,
175+
blockErr: NewBlockErrorWithMessage(blockType, blockMsg),
176+
waitMs: 0,
164177
}
165178
}
166179

167180
func NewTokenResultBlockedWithCause(blockType BlockType, blockMsg string, rule SentinelRule, snapshot interface{}) *TokenResult {
168181
return &TokenResult{
169-
status: ResultStatusBlocked,
170-
blockErr: &BlockError{
171-
blockType: blockType,
172-
blockMsg: blockMsg,
173-
rule: rule,
174-
snapshotValue: snapshot,
175-
},
176-
waitMs: 0,
182+
status: ResultStatusBlocked,
183+
blockErr: NewBlockErrorWithCause(blockType, blockMsg, rule, snapshot),
184+
waitMs: 0,
177185
}
178186
}
179187

core/base/slot_chain_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func TestSlotChain_Entry_Block(t *testing.T) {
283283

284284
rbs.On("Prepare", mock.Anything).Return()
285285
fsm.On("Check", mock.Anything).Return(NewTokenResultPass())
286-
dsm.On("Check", mock.Anything).Return(NewTokenResultBlocked(blockType, "Unknown"))
286+
dsm.On("Check", mock.Anything).Return(NewTokenResultBlocked(blockType))
287287
ssm.On("OnEntryPassed", mock.Anything).Return()
288288
ssm.On("OnEntryBlocked", mock.Anything, mock.Anything).Return()
289289
ssm.On("OnCompleted", mock.Anything).Return()
@@ -339,7 +339,7 @@ func TestSlotChain_Entry_With_Panic(t *testing.T) {
339339

340340
rbs.On("Prepare", mock.Anything).Return()
341341
fsm.On("Check", mock.Anything).Return(NewTokenResultPass())
342-
dsm.On("Check", mock.Anything).Return(NewTokenResultBlocked(BlockTypeUnknown, "Unknown"))
342+
dsm.On("Check", mock.Anything).Return(NewTokenResultBlocked(BlockTypeUnknown))
343343
ssm.On("OnEntryPassed", mock.Anything).Return()
344344
ssm.On("OnEntryBlocked", mock.Anything, mock.Anything).Return()
345345
ssm.On("OnCompleted", mock.Anything).Return()

core/circuitbreaker/slot.go

+11-7
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,23 @@ func (b *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
1313
if len(resource) == 0 {
1414
return result
1515
}
16-
if !checkPass(ctx) {
17-
result.ResetToBlockedFrom(base.BlockTypeCircuitBreaking, "CircuitBreaking")
16+
if passed, rule := checkPass(ctx); !passed {
17+
if result == nil {
18+
result = base.NewTokenResultBlockedWithCause(base.BlockTypeCircuitBreaking, "", rule, nil)
19+
} else {
20+
result.ResetToBlockedWithCause(base.BlockTypeCircuitBreaking, "", rule, nil)
21+
}
1822
}
1923
return result
2024
}
2125

22-
func checkPass(ctx *base.EntryContext) bool {
26+
func checkPass(ctx *base.EntryContext) (bool, Rule) {
2327
breakers := getResBreakers(ctx.Resource.Name())
2428
for _, breaker := range breakers {
25-
isPass := breaker.TryPass(ctx)
26-
if !isPass {
27-
return false
29+
passed := breaker.TryPass(ctx)
30+
if !passed {
31+
return false, breaker.BoundRule()
2832
}
2933
}
30-
return true
34+
return true, nil
3135
}

core/flow/rule_manager.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ var (
3030
func init() {
3131
// Initialize the traffic shaping controller generator map for existing control behaviors.
3232
tcGenFuncMap[Reject] = func(rule *FlowRule) *TrafficShapingController {
33-
return NewTrafficShapingController(NewDefaultTrafficShapingCalculator(rule.Count), NewDefaultTrafficShapingChecker(rule.MetricType), rule)
33+
return NewTrafficShapingController(NewDefaultTrafficShapingCalculator(rule.Count), NewDefaultTrafficShapingChecker(rule), rule)
3434
}
3535
tcGenFuncMap[Throttling] = func(rule *FlowRule) *TrafficShapingController {
3636
return NewTrafficShapingController(NewDefaultTrafficShapingCalculator(rule.Count), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule)

core/flow/tc_default.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,25 @@ func (d *DefaultTrafficShapingCalculator) CalculateAllowedTokens(base.StatNode,
1717
}
1818

1919
type DefaultTrafficShapingChecker struct {
20-
metricType MetricType
20+
rule *FlowRule
2121
}
2222

23-
func NewDefaultTrafficShapingChecker(metricType MetricType) *DefaultTrafficShapingChecker {
24-
return &DefaultTrafficShapingChecker{metricType: metricType}
23+
func NewDefaultTrafficShapingChecker(rule *FlowRule) *DefaultTrafficShapingChecker {
24+
return &DefaultTrafficShapingChecker{rule: rule}
2525
}
2626

2727
func (d *DefaultTrafficShapingChecker) DoCheck(node base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult {
2828
if node == nil {
2929
return nil
3030
}
3131
var curCount float64
32-
if d.metricType == Concurrency {
32+
if d.rule.MetricType == Concurrency {
3333
curCount = float64(node.CurrentGoroutineNum())
3434
} else {
3535
curCount = node.GetQPS(base.MetricEventPass)
3636
}
3737
if curCount+float64(acquireCount) > threshold {
38-
return base.NewTokenResultBlocked(base.BlockTypeFlow, "Flow")
38+
return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, "", d.rule, curCount)
3939
}
4040
return nil
4141
}

core/flow/tc_throttling.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (c *ThrottlingChecker) DoCheck(_ base.StatNode, acquireCount uint32, thresh
3434
return nil
3535
}
3636
if threshold <= 0 {
37-
return base.NewTokenResultBlocked(base.BlockTypeFlow, "Flow")
37+
return base.NewTokenResultBlocked(base.BlockTypeFlow)
3838
}
3939
// Here we use nanosecond so that we could control the queueing time more accurately.
4040
curNano := util.CurrentTimeNano()
@@ -50,15 +50,15 @@ func (c *ThrottlingChecker) DoCheck(_ base.StatNode, acquireCount uint32, thresh
5050
}
5151
estimatedQueueingDuration := atomic.LoadUint64(&c.lastPassedTime) + interval - util.CurrentTimeNano()
5252
if estimatedQueueingDuration > c.maxQueueingTimeNs {
53-
return base.NewTokenResultBlocked(base.BlockTypeFlow, "Flow")
53+
return base.NewTokenResultBlocked(base.BlockTypeFlow)
5454
}
5555

5656
oldTime := atomic.AddUint64(&c.lastPassedTime, interval)
5757
estimatedQueueingDuration = oldTime - util.CurrentTimeNano()
5858
if estimatedQueueingDuration > c.maxQueueingTimeNs {
5959
// Subtract the interval.
6060
atomic.AddUint64(&c.lastPassedTime, ^(interval - 1))
61-
return base.NewTokenResultBlocked(base.BlockTypeFlow, "Flow")
61+
return base.NewTokenResultBlocked(base.BlockTypeFlow)
6262
}
6363
if estimatedQueueingDuration > 0 {
6464
return base.NewTokenResultShouldWait(estimatedQueueingDuration / util.UnixTimeUnitOffset)

core/hotspot/traffic_shaping.go

+9-14
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,14 @@ func (c *baseTrafficShapingController) performCheckingForConcurrencyMetric(arg i
8787
return nil
8888
}
8989
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow,
90-
fmt.Sprintf("Blocked by specific concurrency threshold, arg: %+v, current concurrency: %d, specific concurrency: %d", arg, concurrency, specificConcurrency),
91-
c.BoundRule(), concurrency)
90+
fmt.Sprintf("arg=%v", arg), c.BoundRule(), concurrency)
9291
}
9392
threshold := int64(c.threshold)
9493
if concurrency <= threshold {
9594
return nil
9695
}
9796
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow,
98-
fmt.Sprintf("Blocked by concurrency threshold, arg: %+v, current concurrency: %d, threshold concurrency: %d", arg, concurrency, threshold),
99-
c.BoundRule(), concurrency)
97+
fmt.Sprintf("arg=%v", arg), c.BoundRule(), concurrency)
10098
}
10199

102100
// rejectTrafficShapingController use Reject strategy
@@ -145,14 +143,13 @@ func (c *rejectTrafficShapingController) PerformChecking(arg interface{}, acquir
145143
}
146144
if tokenCount <= 0 {
147145
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow,
148-
fmt.Sprintf("Blocked by reject traffic shaping controller, arg: %+v", arg), c.BoundRule(), "")
146+
fmt.Sprintf("arg=%v", arg), c.BoundRule(), nil)
149147
}
150148
maxCount := tokenCount + c.burstCount
151149
if acquireCount > maxCount {
152150
// return blocked because the acquired number is more than max count of rejectTrafficShapingController
153151
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow,
154-
fmt.Sprintf("Blocked by reject traffic shaping controller, arg: %+v, the acquired number(%d) is more than max count(%d) of rejectTrafficShapingController", arg, acquireCount, maxCount),
155-
c.BoundRule(), "")
152+
fmt.Sprintf("arg=%v", arg), c.BoundRule(), nil)
156153
}
157154

158155
for {
@@ -187,8 +184,7 @@ func (c *rejectTrafficShapingController) PerformChecking(arg interface{}, acquir
187184
}
188185
if newQps < 0 {
189186
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow,
190-
fmt.Sprintf("rejectTrafficShapingController, the new QPS after subbing acquire(%d) is less than 0.", acquireCount),
191-
c.BoundRule(), "")
187+
fmt.Sprintf("arg=%v", arg), c.BoundRule(), nil)
192188
}
193189
if atomic.CompareAndSwapInt64(oldQpsPtr, restQps, newQps) {
194190
atomic.StoreInt64(lastAddTokenTimePtr, currentTimeInMs)
@@ -208,8 +204,7 @@ func (c *rejectTrafficShapingController) PerformChecking(arg interface{}, acquir
208204
}
209205
} else {
210206
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow,
211-
fmt.Sprintf("rejectTrafficShapingController, the rest token is not enough, oldRestToken: %d, acquire: %d", oldRestToken, acquireCount),
212-
c.BoundRule(), "")
207+
fmt.Sprintf("arg=%v", arg), c.BoundRule(), nil)
213208
}
214209
}
215210
runtime.Gosched()
@@ -242,7 +237,8 @@ func (c *throttlingTrafficShapingController) PerformChecking(arg interface{}, ac
242237
tokenCount = val
243238
}
244239
if tokenCount <= 0 {
245-
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, "throttlingTrafficShapingController, the setting tokenCount is <= 0", c.BoundRule(), "")
240+
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow,
241+
fmt.Sprintf("arg=%v", arg), c.BoundRule(), nil)
246242
}
247243
intervalCostTime := int64(math.Round(float64(acquireCount * c.durationInSec * 1000 / tokenCount)))
248244
for {
@@ -270,8 +266,7 @@ func (c *throttlingTrafficShapingController) PerformChecking(arg interface{}, ac
270266
}
271267
} else {
272268
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow,
273-
fmt.Sprintf("throttlingTrafficShapingController, current time(%d) is not reaching to expected time(%d)", currentTimeInMs, expectedTime),
274-
c.BoundRule(), "")
269+
fmt.Sprintf("arg=%v", arg), c.BoundRule(), nil)
275270
}
276271
}
277272
}

core/system/slot.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ func (s *SystemAdaptiveSlot) Check(ctx *base.EntryContext) *base.TokenResult {
1717
rules := GetRules()
1818
result := ctx.RuleCheckResult
1919
for _, rule := range rules {
20-
passed, m := s.doCheckRule(rule)
20+
passed, snapshotValue := s.doCheckRule(rule)
2121
if passed {
2222
continue
2323
}
2424
if result == nil {
25-
result = base.NewTokenResultBlockedWithCause(base.BlockTypeSystemFlow, base.BlockTypeSystemFlow.String(), rule, m)
25+
result = base.NewTokenResultBlockedWithCause(base.BlockTypeSystemFlow, rule.MetricType.String(), rule, snapshotValue)
2626
} else {
27-
result.ResetToBlockedWithCauseFrom(base.BlockTypeSystemFlow, base.BlockTypeSystemFlow.String(), rule, m)
27+
result.ResetToBlockedWithCause(base.BlockTypeSystemFlow, rule.MetricType.String(), rule, snapshotValue)
2828
}
2929
return result
3030
}

0 commit comments

Comments
 (0)