@@ -13,15 +13,25 @@ import
  chronicles,
  metrics,
  chronos/timer,
-  std/[strformat, strutils],
+  chronos,
+  std/[strformat, strutils, os],
  stew/io2,
  beacon_chain/era_db,
  beacon_chain/networking/network_metadata,
  ./config,
  ./common/common,
  ./core/chain,
  ./db/era1_db,
-  ./utils/era_helpers
+  ./utils/era_helpers,
+  eth/common/keys, # rng
+  eth/net/nat, # setupAddress
+  eth/p2p/discoveryv5/protocol as discv5_protocol,
+  eth/p2p/discoveryv5/routing_table,
+  eth/p2p/discoveryv5/enr,
+  ../fluffy/portal_node,
+  ../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
+  ../fluffy/network_metadata,
+  ../fluffy/version

declareGauge nec_import_block_number, "Latest imported block number"

@@ -87,7 +97,177 @@ template boolFlag(flags, b): PersistBlockFlags =
  else:
    {}

-proc importBlocks*(conf: NimbusConf, com: CommonRef) =
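+# Sets up discovery v5 and a Portal node on the history subnetwork; this node
+# is used as the block source when conf.usePortal is enabled.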
+proc run(config: NimbusConf): PortalNode {.
+    raises: [CatchableError]
+.} =
+  let rng = newRng()
+
+  ## Network configuration
+  let
+    bindIp = config.listenAddress
+    udpPort = Port(config.udpPort)
+    # TODO: allow for no TCP port mapping!
+    (extIp, _, extUdpPort) =
+      try:
+        setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
+      except CatchableError as exc:
+        raiseAssert exc.msg
+        # raise exc # TODO: Ideally we don't have the Exception here
+      except Exception as exc:
+        raiseAssert exc.msg
+    (netkey, newNetKey) =
+      # if config.netKey.isSome():
+      #   (config.netKey.get(), true)
+      # else:
+      getPersistentNetKey(rng[], config.dataDir / "netkey")
+
+    enrFilePath = config.dataDir / "nimbus_portal_node.enr"
+    previousEnr =
+      if not newNetKey:
+        getPersistentEnr(enrFilePath)
+      else:
+        Opt.none(enr.Record)
+
+  var bootstrapRecords: seq[Record]
+  # loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
+  # bootstrapRecords.add(config.bootstrapNodes)
+
+  # case config.network
+  # of PortalNetwork.none:
+  #   discard # don't connect to any network bootstrap nodes
+  # of PortalNetwork.mainnet:
+  #   for enrURI in mainnetBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+  # of PortalNetwork.angelfood:
+  #   for enrURI in angelfoodBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+
+  # Only mainnet
+  for enrURI in mainnetBootstrapNodes:
+    let res = enr.Record.fromURI(enrURI)
+    if res.isOk():
+      bootstrapRecords.add(res.value)
+
+  ## Discovery v5 protocol setup
+  let
+    discoveryConfig =
+      DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
+    d = newProtocol(
+      netkey,
+      extIp,
+      Opt.none(Port),
+      extUdpPort,
+      # Note: The addition of default clientInfo to the ENR is a temporary
+      # measure to easily identify & debug the clients used in the testnet.
+      # Might make this into a, default off, cli option.
+      localEnrFields = {"c": enrClientInfoShort},
+      bootstrapRecords = bootstrapRecords,
+      previousRecord = previousEnr,
+      bindIp = bindIp,
+      bindPort = udpPort,
+      enrAutoUpdate = true,
+      config = discoveryConfig,
+      rng = rng,
+    )
+
+  d.open()
+
+  ## Portal node setup
+  let
+    portalProtocolConfig = PortalProtocolConfig.init(
+      DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249),
+      defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
+      defaultDisableContentCache, defaultMaxConcurrentOffers
+    )
+
+    portalNodeConfig = PortalNodeConfig(
+      accumulatorFile: Opt.none(string),
+      disableStateRootValidation: true,
+      trustedBlockRoot: Opt.none(Digest),
+      portalConfig: portalProtocolConfig,
+      dataDir: string config.dataDir,
+      storageCapacity: 0,
+      contentRequestRetries: 1
+    )
+
+    node = PortalNode.new(
+      PortalNetwork.mainnet,
+      portalNodeConfig,
+      d,
+      {PortalSubnetwork.history},
+      bootstrapRecords = bootstrapRecords,
+      rng = rng,
+    )
+
+  let enrFile = config.dataDir / "nimbus_portal_node.enr"
+  if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
+    fatal "Failed to write the enr file", file = enrFile
+    quit 1
+
+  ## Start the Portal node.
+  node.start()
+
+  node
+
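+# Downloads blocks from the Portal history network, starting at startBlock,
+# using portalWorkers concurrent workers, and pushes them to blockQueue in
+# batches of 8192.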
+proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]], startBlock: uint64, portalWorkers: int): Future[void] {.async.} =
+  let historyNetwork = node.historyNetwork.value()
+  var blockNumber = startBlock
+
+  let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048)
+  var blocks: seq[EthBlock] = newSeq[EthBlock](8192)
+  var count = 0
+  var failureCount = 0
+
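+  # Each worker takes a (batch start, offset) pair from blockNumberQueue and
+  # fetches that block from the Portal history network, retrying until it
+  # succeeds and aborting the import if a single block fails more than 10 times.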
+  proc blockWorker(node: PortalNode): Future[void] {.async.} =
+    while true:
+      let (blockNumber, i) = await blockNumberQueue.popFirst()
+      var currentBlockFailures = 0
+      while true:
+        let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr:
+          currentBlockFailures.inc()
+          if currentBlockFailures > 10:
+            fatal "Block download failed too many times", blockNumber = blockNumber + i
+            quit(QuitFailure)
+
+          debug "Failed to get block", blockNumber = blockNumber + i, currentBlockFailures
+          failureCount.inc()
+          continue
+
+        blocks[i] = init(EthBlock, header, body)
+        count.inc()
+
+        break
+
+  var workers: seq[Future[void]] = @[]
+  for i in 0 ..< portalWorkers:
+    workers.add node.blockWorker()
+
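+  # Main loop: schedule the next 8192 block numbers for the workers, wait for
+  # the batch to complete and pass it on to the importer via blockQueue.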
+  while true:
+    blocks = newSeq[EthBlock](8192)
+    count = 0
+    failureCount = 0
+    info "Downloading 8192 blocks", startBlock = blockNumber
+    let t0 = Moment.now()
+    for i in 0 .. 8191'u64:
+      await blockNumberQueue.addLast((blockNumber, i))
+
+    # Not great :/
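+    # Busy-wait until the workers have filled in all 8192 blocks of the batch.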
+    while count != 8192:
+      await sleepAsync(10.milliseconds)
+    let t1 = Moment.now()
+    let diff = (t1 - t0).nanoseconds().float / 1000000000
+    let bps = 8192.float / diff
+
+    info "Finished downloading 8192 blocks", startBlock = blockNumber, bps = bps, failureCount = failureCount
+    await blockQueue.addLast(blocks)
+
+    blockNumber += 8192
+
+proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[seq[EthBlock]]) {.async.} =
  proc controlCHandler() {.noconv.} =
    when defined(windows):
      # workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -119,7 +299,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
      boolFlag(NoPersistBodies, not conf.storeBodies) +
      boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
      boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
-    blk: Block
+    blk: blocks.Block
    persister = Persister.init(chain, flags)
    cstats: PersistStats # stats at start of chunk
@@ -293,11 +473,19 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =

    while running and persister.stats.blocks.uint64 < conf.maxBlocks and
        blockNumber <= lastEra1Block:
-      if not loadEraBlock(blockNumber):
-        notice "No more `era1` blocks to import", blockNumber, slot
-        break
-      persistBlock()
-      checkpoint()
+      if not conf.usePortal:
+        if not loadEraBlock(blockNumber):
+          notice "No more `era1` blocks to import", blockNumber, slot
+          break
+        persistBlock()
+        checkpoint()
+      else:
+        let blockSeq = await blockQueue.popFirst()
+        for blck in blockSeq:
+          blk = blck
+          persistBlock()
+          checkpoint()
+          # debugEcho "blck:" & $blck.header.number

  block era1Import:
    if blockNumber > lastEra1Block:
@@ -369,3 +557,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
    blocks = persister.stats.blocks,
    txs = persister.stats.txs,
    mgas = f(persister.stats.gas.float / 1000000)
+
+proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {.
+    raises: [CatchableError]
+.} =
+  let
+    portalNode = run(conf)
+    blockQueue = newAsyncQueue[seq[EthBlock]](4)
+    start = com.db.baseTxFrame().getSavedStateBlockNumber() + 1
+
+  if conf.usePortal:
+    asyncSpawn portalNode.getBlockLoop(blockQueue, start, conf.portalWorkers)
+
+  asyncSpawn importBlocks(conf, com, portalNode, blockQueue)
+
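+  # Drive the chronos event loop; getBlockLoop and importBlocks run as the
+  # futures spawned above.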
+  while running:
+    try:
+      poll()
+    except CatchableError as e:
+      warn "Exception in poll()", exc = e.name, err = e.msg