33
33
} ,
34
34
std:: {
35
35
borrow:: Cow ,
36
- collections:: { BTreeMap , HashMap } ,
36
+ collections:: { BTreeMap , BTreeSet , HashMap } ,
37
37
sync:: {
38
38
atomic:: { AtomicBool , Ordering } ,
39
39
Arc ,
@@ -783,12 +783,19 @@ impl SolanaRpc {
783
783
hash,
784
784
time,
785
785
height,
786
- parent_slot : _parent_slot ,
786
+ parent_slot,
787
787
parent_hash : _parent_hash,
788
788
transactions,
789
789
} => {
790
- let info =
791
- StreamsSlotInfo :: new ( leader, slot, hash, time, height, transactions) ;
790
+ let info = StreamsSlotInfo :: new (
791
+ leader,
792
+ slot,
793
+ parent_slot,
794
+ hash,
795
+ time,
796
+ height,
797
+ transactions,
798
+ ) ;
792
799
let _ = streams_tx. send ( Arc :: new ( StreamsUpdateMessage :: Slot { info } ) ) ;
793
800
}
794
801
} ,
@@ -832,13 +839,13 @@ impl SolanaRpc {
832
839
hash,
833
840
time,
834
841
height,
835
- parent_slot: _parent_slot ,
842
+ parent_slot,
836
843
parent_hash: _parent_hash,
837
844
transactions,
838
845
} => {
839
- latest_blockhash_storage. push_block( slot, height, hash) ;
846
+ latest_blockhash_storage. push_block( slot, parent_slot , height, hash) ;
840
847
841
- let info = StreamsSlotInfo :: new( leader, slot, hash, time, height, transactions) ;
848
+ let info = StreamsSlotInfo :: new( leader, slot, parent_slot , hash, time, height, transactions) ;
842
849
slots_info. insert( slot, info. clone( ) ) ;
843
850
while slots_info. len( ) > MAX_NUM_RECENT_SLOT_INFO {
844
851
slots_info. pop_first( ) ;
@@ -923,37 +930,17 @@ impl SolanaRpc {
923
930
SolanaRpc :: internal_error_with_data ( "no slot" ) ,
924
931
) ;
925
932
} ;
926
- if rollback > 0 {
927
- let Some ( first_available_slot) =
928
- latest_blockhash_storage. slots . keys ( ) . next ( ) . copied ( )
933
+ for _ in 0 ..rollback {
934
+ let Some ( parent_value) = latest_blockhash_storage. slots . get ( & value. parent )
929
935
else {
930
936
return Self :: create_failure (
931
937
jsonrpc,
932
938
id,
933
- JsonrpcError :: invalid_params ( "empty slots storage" ) ,
939
+ JsonrpcError :: invalid_params ( "not enought slots in the storage" ) ,
934
940
) ;
935
941
} ;
936
-
937
- for _ in 0 ..rollback {
938
- loop {
939
- slot -= 1 ;
940
-
941
- if let Some ( prev_value) = latest_blockhash_storage. slots . get ( & slot) {
942
- if prev_value. height + 1 == value. height {
943
- value = prev_value;
944
- break ;
945
- }
946
- } else if slot < first_available_slot {
947
- return Self :: create_failure (
948
- jsonrpc,
949
- id,
950
- JsonrpcError :: invalid_params (
951
- "not enought slots in the storage" ,
952
- ) ,
953
- ) ;
954
- }
955
- }
956
- }
942
+ slot = value. parent ;
943
+ value = parent_value;
957
944
}
958
945
959
946
Self :: create_success2 (
@@ -1154,60 +1141,138 @@ enum RpcRequest {
1154
1141
1155
1142
#[ derive( Debug , Default ) ]
1156
1143
struct LatestBlockhashStorage {
1157
- slots : BTreeMap < Slot , LatestBlockhashSlot > ,
1158
- finalized_total : usize ,
1159
1144
slot_processed : Slot ,
1160
1145
slot_confirmed : Slot ,
1161
1146
slot_finalized : Slot ,
1147
+ confirmed : BTreeSet < Slot > ,
1148
+ finalized : BTreeSet < Slot > ,
1149
+ slots : BTreeMap < Slot , LatestBlockhashSlot > ,
1162
1150
}
1163
1151
1164
1152
impl LatestBlockhashStorage {
1165
- fn push_block ( & mut self , slot : Slot , height : Slot , hash : Hash ) {
1153
+ fn update_slots ( & mut self ) {
1154
+ for ( slot, entry) in self . slots . iter ( ) . rev ( ) {
1155
+ match entry. commitment {
1156
+ CommitmentLevel :: Processed => {
1157
+ self . slot_processed = self . slot_processed . max ( * slot) ;
1158
+ }
1159
+ CommitmentLevel :: Confirmed => {
1160
+ self . slot_confirmed = self . slot_confirmed . max ( * slot) ;
1161
+ }
1162
+ CommitmentLevel :: Finalized => {
1163
+ self . slot_finalized = self . slot_finalized . max ( * slot) ;
1164
+ break ;
1165
+ }
1166
+ }
1167
+ }
1168
+
1169
+ // in case of epoch change with a lot of forks we can receive confirmed directly
1170
+ // and processed would be behind, so we take `max(processed, confirmed)`
1171
+ self . slot_processed = self . slot_processed . max ( self . slot_confirmed ) ;
1172
+ }
1173
+
1174
+ fn push_block ( & mut self , slot : Slot , parent : Slot , height : Slot , hash : Hash ) {
1175
+ // in case if slot status was received before block
1176
+ let mut commitment = CommitmentLevel :: Processed ;
1177
+ if self . confirmed . contains ( & slot) {
1178
+ commitment = CommitmentLevel :: Confirmed ;
1179
+ }
1180
+ if self . finalized . contains ( & slot) {
1181
+ commitment = CommitmentLevel :: Finalized ;
1182
+ }
1166
1183
self . slots . insert (
1167
1184
slot,
1168
1185
LatestBlockhashSlot {
1169
- hash,
1186
+ commitment,
1187
+ parent,
1170
1188
height,
1171
- commitment : CommitmentLevel :: Processed ,
1189
+ hash ,
1172
1190
} ,
1173
1191
) ;
1192
+
1193
+ // keep slots under the limit (based only on finalized count)
1194
+ let slots_to_remove = self
1195
+ . slots
1196
+ . values ( )
1197
+ . filter ( |entry| entry. commitment == CommitmentLevel :: Finalized )
1198
+ . count ( )
1199
+ . checked_sub ( MAX_PROCESSING_AGE + 10 )
1200
+ . unwrap_or_default ( ) ;
1201
+ for _ in 0 ..slots_to_remove {
1202
+ while let Some ( ( _slot, value) ) = self . slots . pop_first ( ) {
1203
+ if value. commitment == CommitmentLevel :: Finalized {
1204
+ break ;
1205
+ }
1206
+ }
1207
+ }
1208
+
1209
+ // update tips
1210
+ self . update_slots ( ) ;
1174
1211
}
1175
1212
1176
1213
fn update_commitment ( & mut self , slot : Slot , commitment : CommitmentLevel ) {
1177
- if let Some ( value) = self . slots . get_mut ( & slot) {
1178
- value. commitment = commitment;
1179
-
1180
- if commitment == CommitmentLevel :: Processed && slot > self . slot_processed {
1181
- self . slot_processed = slot;
1182
- } else if commitment == CommitmentLevel :: Confirmed {
1183
- self . slot_confirmed = slot;
1184
- } else if commitment == CommitmentLevel :: Finalized {
1185
- self . finalized_total += 1 ;
1186
- self . slot_finalized = slot;
1187
- }
1214
+ // save commitment
1215
+ if commitment == CommitmentLevel :: Confirmed {
1216
+ self . confirmed . insert ( slot) ;
1217
+ } else if commitment == CommitmentLevel :: Finalized {
1218
+ self . confirmed . insert ( slot) ;
1219
+ self . finalized . insert ( slot) ;
1220
+ }
1221
+ while self . confirmed . len ( ) > MAX_PROCESSING_AGE {
1222
+ self . confirmed . pop_first ( ) ;
1223
+ }
1224
+ while self . finalized . len ( ) > MAX_PROCESSING_AGE {
1225
+ self . finalized . pop_first ( ) ;
1188
1226
}
1189
1227
1190
- while self . finalized_total > MAX_PROCESSING_AGE + 10 {
1191
- if let Some ( ( _slot, value) ) = self . slots . pop_first ( ) {
1192
- if value. commitment == CommitmentLevel :: Finalized {
1193
- self . finalized_total -= 1 ;
1228
+ // update current and all slots before
1229
+ if let Some ( value) = self . slots . get_mut ( & slot) {
1230
+ value. commitment = value. commitment . max ( commitment) ;
1231
+
1232
+ let mut parent_slot = value. parent ;
1233
+ if commitment == CommitmentLevel :: Confirmed {
1234
+ loop {
1235
+ if let Some ( value) = self . slots . get_mut ( & parent_slot) {
1236
+ if value. commitment == CommitmentLevel :: Processed {
1237
+ value. commitment = CommitmentLevel :: Confirmed ;
1238
+ parent_slot = value. parent ;
1239
+ continue ;
1240
+ }
1241
+ }
1242
+ break ;
1243
+ }
1244
+ } else if commitment == CommitmentLevel :: Finalized {
1245
+ loop {
1246
+ if let Some ( value) = self . slots . get_mut ( & parent_slot) {
1247
+ if value. commitment != CommitmentLevel :: Finalized {
1248
+ value. commitment = CommitmentLevel :: Finalized ;
1249
+ parent_slot = value. parent ;
1250
+ continue ;
1251
+ }
1252
+ }
1253
+ break ;
1194
1254
}
1195
1255
}
1196
1256
}
1257
+
1258
+ // update tips
1259
+ self . update_slots ( ) ;
1197
1260
}
1198
1261
}
1199
1262
1200
1263
#[ derive( Debug ) ]
1201
1264
struct LatestBlockhashSlot {
1202
- hash : Hash ,
1203
- height : Slot ,
1204
1265
commitment : CommitmentLevel ,
1266
+ parent : Slot ,
1267
+ height : Slot ,
1268
+ hash : Hash ,
1205
1269
}
1206
1270
1207
1271
#[ derive( Debug , Clone ) ]
1208
1272
struct StreamsSlotInfo {
1209
1273
leader : Option < Pubkey > ,
1210
1274
slot : Slot ,
1275
+ parent_slot : Slot ,
1211
1276
commitment : CommitmentLevel ,
1212
1277
hash : Hash ,
1213
1278
time : UnixTimestamp ,
@@ -1223,6 +1288,7 @@ impl StreamsSlotInfo {
1223
1288
fn new (
1224
1289
leader : Option < Pubkey > ,
1225
1290
slot : Slot ,
1291
+ parent_slot : Slot ,
1226
1292
hash : Hash ,
1227
1293
time : UnixTimestamp ,
1228
1294
height : Slot ,
@@ -1242,6 +1308,7 @@ impl StreamsSlotInfo {
1242
1308
Self {
1243
1309
leader,
1244
1310
slot,
1311
+ parent_slot,
1245
1312
commitment : CommitmentLevel :: Processed ,
1246
1313
hash,
1247
1314
time,
@@ -1292,6 +1359,7 @@ impl StreamsSlotInfo {
1292
1359
SlotsSubscribeOutput :: Slot {
1293
1360
leader : self . leader . map ( |pk| pk. to_string ( ) ) . unwrap_or_default ( ) ,
1294
1361
slot : self . slot ,
1362
+ parent_slot : self . parent_slot ,
1295
1363
commitment : self . commitment ,
1296
1364
hash : self . hash . to_string ( ) ,
1297
1365
time : self . time ,
@@ -1467,6 +1535,7 @@ enum SlotsSubscribeOutput {
1467
1535
Slot {
1468
1536
leader : String ,
1469
1537
slot : Slot ,
1538
+ parent_slot : Slot ,
1470
1539
commitment : CommitmentLevel ,
1471
1540
hash : String ,
1472
1541
time : UnixTimestamp ,
0 commit comments