forked from tendermint/tendermint
-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathbyzantine_test.go
273 lines (232 loc) · 8.23 KB
/
byzantine_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
package consensus
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
//----------------------------------------------
// byzantine failures
// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals).
// byzantine validator sends conflicting proposals into A and B,
// and prevotes/precommits on both of them.
// B sees a commit, A doesn't.
// Byzantine validator refuses to prevote.
// Heal partition and ensure A sees the commit
func TestByzantine(t *testing.T) {
N := 4
logger := consensusLogger().With("test", "byzantine")
css, cleanup := randConsensusNet(N, "consensus_byzantine_test", newMockTickerFunc(false), newCounter)
defer cleanup()
// give the byzantine validator a normal ticker
ticker := NewTimeoutTicker()
ticker.SetLogger(css[0].Logger)
css[0].SetTimeoutTicker(ticker)
switches := make([]*p2p.Switch, N)
p2pLogger := logger.With("module", "p2p")
for i := 0; i < N; i++ {
switches[i] = p2p.MakeSwitch(
config.P2P,
i,
"foo", "1.0.0",
func(i int, sw *p2p.Switch) *p2p.Switch {
return sw
})
switches[i].SetLogger(p2pLogger.With("validator", i))
}
blocksSubs := make([]types.Subscription, N)
reactors := make([]p2p.Reactor, N)
for i := 0; i < N; i++ {
// make first val byzantine
if i == 0 {
// NOTE: Now, test validators are MockPV, which by default doesn't
// do any safety checks.
css[i].privValidator.(*types.MockPV).DisableChecks()
css[i].decideProposal = func(j int) func(int64, int) {
return func(height int64, round int) {
byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
}
}(i)
css[i].doPrevote = func(height int64, round int) {}
}
eventBus := css[i].eventBus
eventBus.SetLogger(logger.With("module", "events", "validator", i))
var err error
blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err)
conR := NewConsensusReactor(css[i], true) // so we don't start the consensus states
conR.SetLogger(logger.With("validator", i))
conR.SetEventBus(eventBus)
var conRI p2p.Reactor = conR
// make first val byzantine
if i == 0 {
conRI = NewByzantineReactor(conR)
}
reactors[i] = conRI
sm.SaveState(css[i].blockExec.DB(), css[i].state) //for save height 1's validators info
}
defer func() {
for _, r := range reactors {
if rr, ok := r.(*ByzantineReactor); ok {
rr.reactor.Switch.Stop()
} else {
r.(*ConsensusReactor).Switch.Stop()
}
}
}()
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
// ignore new switch s, we already made ours
switches[i].AddReactor("CONSENSUS", reactors[i])
return switches[i]
}, func(sws []*p2p.Switch, i, j int) {
// the network starts partitioned with globally active adversary
if i != 0 {
return
}
p2p.Connect2Switches(sws, i, j)
})
// start the non-byz state machines.
// note these must be started before the byz
for i := 1; i < N; i++ {
cr := reactors[i].(*ConsensusReactor)
cr.SwitchToConsensus(cr.conS.GetState(), 0)
}
// start the byzantine state machine
byzR := reactors[0].(*ByzantineReactor)
s := byzR.reactor.conS.GetState()
byzR.reactor.SwitchToConsensus(s, 0)
// byz proposer sends one block to peers[0]
// and the other block to peers[1] and peers[2].
// note peers and switches order don't match.
peers := switches[0].Peers().List()
// partition A
ind0 := getSwitchIndex(switches, peers[0])
// partition B
ind1 := getSwitchIndex(switches, peers[1])
ind2 := getSwitchIndex(switches, peers[2])
p2p.Connect2Switches(switches, ind1, ind2)
// wait for someone in the big partition (B) to make a block
<-blocksSubs[ind2].Out()
t.Log("A block has been committed. Healing partition")
p2p.Connect2Switches(switches, ind0, ind1)
p2p.Connect2Switches(switches, ind0, ind2)
// wait till everyone makes the first new block
// (one of them already has)
wg := new(sync.WaitGroup)
wg.Add(2)
for i := 1; i < N-1; i++ {
go func(j int) {
<-blocksSubs[j].Out()
wg.Done()
}(i)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
tick := time.NewTicker(time.Second * 10)
select {
case <-done:
case <-tick.C:
for i, reactor := range reactors {
t.Log(fmt.Sprintf("Consensus Reactor %v", i))
t.Log(fmt.Sprintf("%v", reactor))
}
t.Fatalf("Timed out waiting for all validators to commit first block")
}
}
//-------------------------------
// byzantine consensus functions
func byzantineDecideProposalFunc(t *testing.T, height int64, round int, cs *ConsensusState, sw *p2p.Switch) {
// byzantine user should create two proposals and try to split the vote.
// Avoid sending on internalMsgQueue and running consensus state.
// Create a new proposal block from state/txs from the mempool.
block1, blockParts1 := cs.createProposalBlock()
polRound, propBlockID := cs.ValidRound, types.BlockID{Hash: block1.Hash(), PartsHeader: blockParts1.Header()}
proposal1 := types.NewProposal(height, round, polRound, propBlockID)
if err := cs.privValidator.SignProposal(cs.state.ChainID, proposal1); err != nil {
t.Error(err)
}
// Create a new proposal block from state/txs from the mempool.
block2, blockParts2 := cs.createProposalBlock()
polRound, propBlockID = cs.ValidRound, types.BlockID{Hash: block2.Hash(), PartsHeader: blockParts2.Header()}
proposal2 := types.NewProposal(height, round, polRound, propBlockID)
if err := cs.privValidator.SignProposal(cs.state.ChainID, proposal2); err != nil {
t.Error(err)
}
block1Hash := block1.Hash()
block2Hash := block2.Hash()
// broadcast conflicting proposals/block parts to peers
peers := sw.Peers().List()
t.Logf("Byzantine: broadcasting conflicting proposals to %d peers", len(peers))
for i, peer := range peers {
if i < len(peers)/2 {
go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1)
} else {
go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2)
}
}
}
func sendProposalAndParts(height int64, round int, cs *ConsensusState, peer p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) {
// proposal
msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg))
// parts
for i := 0; i < int(parts.Total()); i++ {
part := parts.GetPart(i)
msg := &BlockPartMessage{
Height: height, // This tells peer that this part applies to us.
Round: round, // This tells peer that this part applies to us.
Part: part,
}
peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg))
}
// votes
cs.mtx.Lock()
prevote, _ := cs.signVote(types.PrevoteType, blockHash, parts.Header())
precommit, _ := cs.signVote(types.PrecommitType, blockHash, parts.Header())
cs.mtx.Unlock()
peer.Send(VoteChannel, cdc.MustMarshalBinaryBare(&VoteMessage{prevote}))
peer.Send(VoteChannel, cdc.MustMarshalBinaryBare(&VoteMessage{precommit}))
}
//----------------------------------------
// byzantine consensus reactor
type ByzantineReactor struct {
cmn.Service
reactor *ConsensusReactor
}
func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor {
return &ByzantineReactor{
Service: conR,
reactor: conR,
}
}
func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) }
func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() }
func (br *ByzantineReactor) AddPeer(peer p2p.Peer) {
if !br.reactor.IsRunning() {
return
}
// Create peerState for peer
peerState := NewPeerState(peer).SetLogger(br.reactor.Logger)
peer.Set(types.PeerStateKey, peerState)
// Send our state to peer.
// If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus().
if !br.reactor.fastSync {
br.reactor.sendNewRoundStepMessage(peer)
}
}
func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
br.reactor.RemovePeer(peer, reason)
}
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }