-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnode.go
196 lines (158 loc) · 4.41 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
package main
import (
"context"
"errors"
"os"
"reflect"
"time"
peer "gx/ipfs/QmZcUPvPhD1Xvk6mwijYF8AfR3mG31S1YsEfHG4khrFPRr/go-libp2p-peer"
"gx/ipfs/QmQa2wf1sLFKkjHCVEbna8y5qhdMjL8vtTJSAc48vZGTer/go-ipfs/core"
"gx/ipfs/QmQa2wf1sLFKkjHCVEbna8y5qhdMjL8vtTJSAc48vZGTer/go-ipfs/core/corenet"
"gx/ipfs/QmQa2wf1sLFKkjHCVEbna8y5qhdMjL8vtTJSAc48vZGTer/go-ipfs/repo/config"
"gx/ipfs/QmQa2wf1sLFKkjHCVEbna8y5qhdMjL8vtTJSAc48vZGTer/go-ipfs/repo/fsrepo"
)
const (
// nBitsForKeypair sets the strength of keypair
nBitsForKeypair = 2048
// BootstrapPeerID is the peer id of agora's bootstrap node
BootstrapPeerID = "QmdtfJBMitotUWBX5YZ6rYeaYRFu6zfXXMZP6fygEWK2iu"
// BootstrapMultiAddr is the ipfs address of agora's bootstrap node
BootstrapMultiAddr = "/ip4/54.178.171.10/tcp/4001/ipfs/" + BootstrapPeerID
)
func monitorPeers(n *Node) {
go func() {
for {
printPeers(n.IpfsNode)
time.Sleep(2 * time.Second)
}
}()
}
func printPeers(n *core.IpfsNode) {
conns := n.PeerHost.Network().Conns()
Info.Println("---- PeerList")
for _, c := range conns {
pid := c.RemotePeer()
addr := c.RemoteMultiaddr()
Info.Println(pid, "\t", addr, "\t", n.Peerstore.LatencyEWMA(pid))
}
}
// Node provides an abstraction for IpfsNode and is the prefered way
// of accessing Nodes in our application. Note that IpfsNode is an
// embedded type.
type Node struct {
*core.IpfsNode
*Model
ID string
cancel context.CancelFunc
}
// NewNode creates a new Node from an existing node repository
func NewNode(path string) (*Node, error) {
// Open and check node repository
r, err := fsrepo.Open(path)
if err != nil {
return nil, err
}
// Run Node
cfg := &core.BuildCfg{
Repo: r,
Online: true,
}
ctx, cancel := context.WithCancel(context.Background())
node, err := core.NewNode(ctx, cfg)
if err != nil {
cancel()
return nil, err
}
// Open Node's DB Instance
db, err := OpenDB(path + "/agora.db")
if err != nil {
cancel()
return nil, err
}
return &Node{
IpfsNode: node,
Model: db,
ID: node.Identity.Pretty(),
cancel: cancel,
}, nil
}
// CreateNodeIfNotExists creates a new ipfs repo at given location if
// needed and returns an agora node istance
func CreateNodeIfNotExists(path string) (*Node, error) {
if !Exists(path) {
err := NewNodeRepo(path, nil)
if err != nil {
return nil, err
}
}
return NewNode(MyNodePath)
}
var ErrSkipBlacklisted = errors.New("Peer is blacklisted, will skip this request")
// Request is the generalized method to connect to another peer and
// send requests and receive responses. This is used by Client defined
// in peerapi.go and should not be used directly.
func (n *Client) Request(targetPeer string, path string, body interface{}, resp interface{}) error {
// @TODO not the most elegant solution due to time pressure
isBlacklist, err := n.Node.IsBlacklisted(targetPeer)
if err != nil {
return err
}
if isBlacklist {
return ErrSkipBlacklisted
}
// Check if Node hash is valid
target, err := peer.IDB58Decode(targetPeer)
if err != nil {
return err
}
// Connect to targetPeer
stream, err := corenet.Dial(n.Node.IpfsNode, target, path)
if err != nil {
return err
}
// This gives you a warning if you accidentially send a
// pointer instead of the struct as body, note that the
// warning will not stop the transaction
if reflect.ValueOf(resp).Kind() != reflect.Ptr {
Warning.Println("You must pass resp by &reference and not by value. This is not done for a request to", targetPeer, path)
}
// Exchange request and response
WriteJSON(stream, &body)
ReadJSON(stream, &resp)
return nil
}
// NewNodeRepo will create a new data and configuration folder for a
// new IPFS node at the provided location
func NewNodeRepo(repoRoot string, addr *config.Addresses) error {
err := os.MkdirAll(repoRoot, 0755)
if err != nil {
return err
}
if fsrepo.IsInitialized(repoRoot) {
return errors.New("Repo already exists")
}
conf, err := config.Init(os.Stdout, nBitsForKeypair)
if err != nil {
return err
}
if addr != nil {
conf.Addresses = *addr
}
own, err := config.ParseBootstrapPeer(BootstrapMultiAddr)
if err != nil {
return err
}
defaults, err := config.DefaultBootstrapPeers()
if err != nil {
return err
}
bps := []config.BootstrapPeer{own}
bps = append(bps, defaults...)
// Add our own bootstrap node
conf.SetBootstrapPeers(bps)
fsrepo.Init(repoRoot, conf)
if err != nil {
return err
}
return initializeIpnsKeyspace(repoRoot)
}