Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trie: parallelize committer #30461

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ func (s *StateDB) fastDeleteStorage(snaps *snapshot.Tree, addrHash common.Hash,
slots = make(map[common.Hash][]byte)
)
stack := trie.NewStackTrie(func(path []byte, hash common.Hash, blob []byte) {
nodes.AddNode(path, trienode.NewDeleted())
nodes.AddNode(string(path), trienode.NewDeleted())
})
for iter.Next() {
slot := common.CopyBytes(iter.Slot())
Expand Down Expand Up @@ -991,7 +991,7 @@ func (s *StateDB) slowDeleteStorage(addr common.Address, addrHash common.Hash, r
if it.Hash() == (common.Hash{}) {
continue
}
nodes.AddNode(it.Path(), trienode.NewDeleted())
nodes.AddNode(string(it.Path()), trienode.NewDeleted())
}
if err := it.Error(); err != nil {
return nil, nil, err
Expand Down
102 changes: 76 additions & 26 deletions trie/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package trie

import (
"fmt"
"sync"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/trie/trienode"
Expand All @@ -30,28 +31,44 @@ type committer struct {
nodes *trienode.NodeSet
tracer *tracer
collectLeaf bool
parallel bool
}

// newCommitter creates a new committer or picks one from the pool.
func newCommitter(nodeset *trienode.NodeSet, tracer *tracer, collectLeaf bool) *committer {
func newCommitter(nodes *trienode.NodeSet, tracer *tracer, collectLeaf bool, parallel bool) *committer {
return &committer{
nodes: nodeset,
nodes: nodes,
tracer: tracer,
collectLeaf: collectLeaf,
parallel: parallel,
}
}

type wrapNode struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole concept of wrapping nodes -- I don't see the point in it. Why is that needed? Couldn't you just copy the path for each goroutine, and then let each goroutine work on it's own path-copy individually without risking any cross-goroutine disruptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because multiple goroutines calling AddNode will cause concurrent map writes.
This could be solved by using mutex, but using wrapping nodes to avoid lock/unlock can save a little more time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, if we're parallelizing things, you could have a chanwhere you send off the nodes, and then have a dedicated goroutine which just reads the chan and invokes AddNode sequentially. If you make the chan use some buffering, then it should be faster than using a lock.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I did try this approach before. However, it still requires a new struct, similar to WrapNode, to initialize the chan since AddNode requires both the node itself and the path. Additionally, we would need an extra mechanism to monitor when the send operation is completed. Here's the old draft PR illustrating this approach, rough version.

For the idea of parallelizing committer and invoking AddNode sequentially, current PR can also achieve in a cleaner way, meanwhile just as fast. I believe both approaches are valid and can work well, and I'm open to further discussion if you think there's a strong case for the alternative.

node *trienode.Node
path string
leafHash common.Hash // optional, the parent hash of the related leaf
leafBlob []byte // optional, the blob of the related leaf
}

// Commit collapses a node down into a hash node.
func (c *committer) Commit(n node) hashNode {
return c.commit(nil, n).(hashNode)
hn, wnodes := c.commit(nil, n, true)
for _, wn := range wnodes {
c.nodes.AddNode(wn.path, wn.node)
if wn.leafHash != (common.Hash{}) {
c.nodes.AddLeaf(wn.leafHash, wn.leafBlob)
}
}
return hn.(hashNode)
}

// commit collapses a node down into a hash node and returns it.
func (c *committer) commit(path []byte, n node) node {
func (c *committer) commit(path []byte, n node, topmost bool) (node, []*wrapNode) {
// if this path is clean, use available cached data
hash, dirty := n.cache()
if hash != nil && !dirty {
return hash
return hash, nil
}
// Commit children, then parent, and remove the dirty flag.
switch cn := n.(type) {
Expand All @@ -61,38 +78,49 @@ func (c *committer) commit(path []byte, n node) node {

// If the child is fullNode, recursively commit,
// otherwise it can only be hashNode or valueNode.
var nodes []*wrapNode
if _, ok := cn.Val.(*fullNode); ok {
collapsed.Val = c.commit(append(path, cn.Key...), cn.Val)
collapsed.Val, nodes = c.commit(append(path, cn.Key...), cn.Val, false)
}
// The key needs to be copied, since we're adding it to the
// modified nodeset.
collapsed.Key = hexToCompact(cn.Key)
hashedNode := c.store(path, collapsed)
hashedNode, wNode := c.store(path, collapsed)
if wNode != nil {
nodes = append(nodes, wNode)
}
if hn, ok := hashedNode.(hashNode); ok {
return hn
return hn, nodes
}
return collapsed
return collapsed, nodes
case *fullNode:
hashedKids := c.commitChildren(path, cn)
hashedKids, nodes := c.commitChildren(path, cn, topmost && c.parallel)
collapsed := cn.copy()
collapsed.Children = hashedKids

hashedNode := c.store(path, collapsed)
hashedNode, wNode := c.store(path, collapsed)
if wNode != nil {
nodes = append(nodes, wNode)
}
if hn, ok := hashedNode.(hashNode); ok {
return hn
return hn, nodes
}
return collapsed
return collapsed, nodes
case hashNode:
return cn
return cn, nil
default:
// nil, valuenode shouldn't be committed
panic(fmt.Sprintf("%T: invalid node: %v", n, n))
}
}

// commitChildren commits the children of the given fullnode
func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
var children [17]node
func (c *committer) commitChildren(path []byte, n *fullNode, parallel bool) ([17]node, []*wrapNode) {
var (
wg sync.WaitGroup
children [17]node
results [16][]*wrapNode
)
for i := 0; i < 16; i++ {
child := n.Children[i]
if child == nil {
Expand All @@ -108,18 +136,35 @@ func (c *committer) commitChildren(path []byte, n *fullNode) [17]node {
// Commit the child recursively and store the "hashed" value.
// Note the returned node can be some embedded nodes, so it's
// possible the type is not hashNode.
children[i] = c.commit(append(path, byte(i)), child)
if !parallel {
children[i], results[i] = c.commit(append(path, byte(i)), child, false)
} else {
wg.Add(1)
go func(index int) {
defer wg.Done()
children[index], results[index] = c.commit(append(path, byte(index)), child, false)
}(i)
}
}
if parallel {
wg.Wait()
}
// For the 17th child, it's possible the type is valuenode.
if n.Children[16] != nil {
children[16] = n.Children[16]
}
return children
var wnodes []*wrapNode
for i := 0; i < 16; i++ {
if results[i] != nil {
wnodes = append(wnodes, results[i]...)
}
}
return children, wnodes
}

// store hashes the node n and adds it to the modified nodeset. If leaf collection
// is enabled, leaf nodes will be tracked in the modified nodeset as well.
func (c *committer) store(path []byte, n node) node {
func (c *committer) store(path []byte, n node) (node, *wrapNode) {
// Larger nodes are replaced by their hash and stored in the database.
var hash, _ = n.cache()

Expand All @@ -133,25 +178,30 @@ func (c *committer) store(path []byte, n node) node {
// deleted only if the node was existent in database before.
_, ok := c.tracer.accessList[string(path)]
if ok {
c.nodes.AddNode(path, trienode.NewDeleted())
return n, &wrapNode{
path: string(path),
node: trienode.NewDeleted(),
}
}
return n
return n, nil
}
// Collect the dirty node to nodeset for return.
nhash := common.BytesToHash(hash)
c.nodes.AddNode(path, trienode.New(nhash, nodeToBytes(n)))

wNode := &wrapNode{
path: string(path),
node: trienode.New(nhash, nodeToBytes(n)),
}
// Collect the corresponding leaf node if it's required. We don't check
// full node since it's impossible to store value in fullNode. The key
// length of leaves should be exactly same.
if c.collectLeaf {
if sn, ok := n.(*shortNode); ok {
if val, ok := sn.Val.(valueNode); ok {
c.nodes.AddLeaf(nhash, val)
wNode.leafHash = nhash
wNode.leafBlob = val
}
}
}
return hash
return hash, wNode
}

// ForGatherChildren decodes the provided node and traverses the children inside.
Expand Down
46 changes: 20 additions & 26 deletions trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,13 @@ import (
//
// Trie is not safe for concurrent use.
type Trie struct {
root node
owner common.Hash

// Flag whether the commit operation is already performed. If so the
// trie is not usable(latest states is invisible).
committed bool

// Keep track of the number leaves which have been inserted since the last
// hashing operation. This number will not directly map to the number of
// actually unhashed nodes.
unhashed int

// reader is the handler trie can retrieve nodes from.
reader *trieReader

// tracer is the tool to track the trie changes.
tracer *tracer
root node
owner common.Hash
committed bool // The Flag whether the commit operation is already performed
reader *trieReader // The handler trie can retrieve nodes from
tracer *tracer // The tool to track the trie changes
mutate int // The number of trie mutations that have been performed
hashed int // The number of mutations that have been hashed
}

// newFlag returns the cache flag value for a newly created node.
Expand All @@ -67,9 +57,10 @@ func (t *Trie) Copy() *Trie {
root: t.root,
owner: t.owner,
committed: t.committed,
unhashed: t.unhashed,
reader: t.reader,
tracer: t.tracer.copy(),
mutate: t.mutate,
hashed: t.hashed,
}
}

Expand Down Expand Up @@ -304,11 +295,11 @@ func (t *Trie) Update(key, value []byte) error {
if t.committed {
return ErrCommitted
}
t.mutate++
return t.update(key, value)
}

func (t *Trie) update(key, value []byte) error {
t.unhashed++
k := keybytesToHex(key)
if len(value) != 0 {
_, n, err := t.insert(t.root, nil, k, valueNode(value))
Expand Down Expand Up @@ -422,7 +413,7 @@ func (t *Trie) Delete(key []byte) error {
if t.committed {
return ErrCommitted
}
t.unhashed++
t.mutate++
k := keybytesToHex(key)
_, n, err := t.delete(t.root, nil, k)
if err != nil {
Expand Down Expand Up @@ -622,7 +613,7 @@ func (t *Trie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) {
}
nodes := trienode.NewNodeSet(t.owner)
for _, path := range paths {
nodes.AddNode([]byte(path), trienode.NewDeleted())
nodes.AddNode(path, trienode.NewDeleted())
}
return types.EmptyRootHash, nodes // case (b)
}
Expand All @@ -640,9 +631,11 @@ func (t *Trie) Commit(collectLeaf bool) (common.Hash, *trienode.NodeSet) {
}
nodes := trienode.NewNodeSet(t.owner)
for _, path := range t.tracer.deletedNodes() {
nodes.AddNode([]byte(path), trienode.NewDeleted())
nodes.AddNode(path, trienode.NewDeleted())
}
t.root = newCommitter(nodes, t.tracer, collectLeaf).Commit(t.root)
// If the number of changes is below 100, we let one thread handle it
t.root = newCommitter(nodes, t.tracer, collectLeaf, t.mutate > 100).Commit(t.root)
t.mutate = 0
return rootHash, nodes
}

Expand All @@ -652,10 +645,10 @@ func (t *Trie) hashRoot() (node, node) {
return hashNode(types.EmptyRootHash.Bytes()), nil
}
// If the number of changes is below 100, we let one thread handle it
h := newHasher(t.unhashed >= 100)
h := newHasher(t.mutate-t.hashed >= 100)
defer func() {
returnHasherToPool(h)
t.unhashed = 0
t.hashed = t.mutate
}()
hashed, cached := h.hash(t.root, true)
return hashed, cached
Expand All @@ -677,7 +670,8 @@ func (t *Trie) Witness() map[string]struct{} {
func (t *Trie) Reset() {
t.root = nil
t.owner = common.Hash{}
t.unhashed = 0
t.tracer.reset()
t.committed = false
t.hashed = 0
t.mutate = 0
}
Loading