Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
add tag for the final batch of blocks in a sync
Browse files Browse the repository at this point in the history
deduplicate block send code
update rebar.lock depends on helium/proto#125
  • Loading branch information
joecaswell committed Mar 15, 2022
1 parent 9433396 commit 02933c2
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 43 deletions.
2 changes: 1 addition & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
{<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},0},
{<<"helium_proto">>,
{git,"https://github.com/helium/proto.git",
{ref,"30f17c5d1a7942297923f4e743c681c46f917fc3"}},
{ref,"4e1e0d97358d736ce861ef8d8d4a52eeb6ae9a4b"}},
0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},2},
{<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},1},
Expand Down
75 changes: 33 additions & 42 deletions src/handlers/blockchain_sync_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ handle_data(client, Data0, #state{blockchain=Chain, path=Path, gossiped_hash=Gos
?SYNC_PROTOCOL_V1 -> Data0;
?SYNC_PROTOCOL_V2 -> zlib:uncompress(Data0)
end,
#blockchain_sync_blocks_pb{blocks=BinBlocks} =
#blockchain_sync_blocks_pb{final=Final, blocks=BinBlocks} =
blockchain_sync_handler_pb:decode_msg(Data, blockchain_sync_blocks_pb),

Blocks = [blockchain_block:deserialize(B) || B <- BinBlocks],
Expand All @@ -139,8 +139,12 @@ handle_data(client, Data0, #state{blockchain=Chain, path=Path, gossiped_hash=Gos
%% nothing was plausible, see if it has anything else
{noreply, State, blockchain_sync_handler_pb:encode_msg(#blockchain_sync_req_pb{msg={response, true}})};
HighestPlausible ->
lager:info("Eagerly re-gossiping ~p", [blockchain_block:height(HighestPlausible)]),
blockchain_gossip_handler:regossip_block(HighestPlausible, SwarmTID),
case Final of
true ->
lager:info("Eagerly re-gossiping ~p", [blockchain_block:height(HighestPlausible)]),
blockchain_gossip_handler:regossip_block(HighestPlausible, SwarmTID);
false -> ok
end,
%% do this in a spawn so that the connection dying does not stop adding blocks
{Pid, Ref} = spawn_monitor(fun() ->
%% this will check any plausible blocks we have and add them to the chain if possible
Expand All @@ -154,54 +158,22 @@ handle_data(client, Data0, #state{blockchain=Chain, path=Path, gossiped_hash=Gos
{stop, normal, State, blockchain_sync_handler_pb:encode_msg(#blockchain_sync_req_pb{msg={response, false}})}
end
end;

handle_data(server, Data, #state{blockchain=Blockchain, batch_size=BatchSize,
batches_sent=Sent, batch_limit=Limit,
path=Path, requested=StRequested}=State) ->
requested=StRequested}=State) ->
case blockchain_sync_handler_pb:decode_msg(Data, blockchain_sync_req_pb) of
#blockchain_sync_req_pb{msg={hash,
#blockchain_sync_hash_pb{hash = Hash,
heights = Requested}}} ->
{Blocks, Requested1} =
build_blocks(Requested, Hash, Blockchain, BatchSize),
case Blocks of
[] ->
{stop, normal, State};
[_|_] ->
Msg = mk_msg(Blocks, Path),
case Requested1 == [] andalso Requested /= [] of
true ->
{stop, normal, State, Msg};
_ ->
lager:info("sending blocks ~p to sync peer", [element(1, lists:unzip(Blocks))]),
{LastHeight, _LastBlock} = lists:last(Blocks),
{noreply, State#state{batches_sent=Sent+1,
last_block_height=LastHeight,
requested = Requested1},
Msg}
end
end;
maybe_send_blocks(Blocks, Requested1, Requested, State);
#blockchain_sync_req_pb{msg={response, true}} when Sent < Limit, State#state.last_block_height /= undefined ->
StartingBlockHeight = State#state.last_block_height,
{Blocks, Requested1} =
build_blocks(StRequested, StartingBlockHeight, Blockchain, BatchSize),
case Blocks of
[] ->
{stop, normal, State};
_ ->
Msg = mk_msg(Blocks, Path),
case Requested1 == [] andalso StRequested /= [] of
true ->
{stop, normal, State, Msg};
_ ->
lager:info("sending blocks ~p to sync peer", [element(1, lists:unzip(Blocks))]),
{LastHeight, _LastBlock} = lists:last(Blocks),
{noreply, State#state{batches_sent=Sent+1,
last_block_height=LastHeight,
requested = Requested1},
Msg}
end
end;
maybe_send_blocks(Blocks, Requested1, StRequested, State);
_ ->
%% ack was false, block was undefined, limit was hit or the message was not understood
{stop, normal, State}
Expand Down Expand Up @@ -265,11 +237,30 @@ build_blocks(R, Hash, Blockchain, BatchSize) when is_list(R) ->
{Blocks ++ ExtraBlocks,
R -- R2}.

mk_msg(Blocks, Path) ->
Msg1 = #blockchain_sync_blocks_pb{blocks= [B || {_H, B} <- Blocks]},
-spec maybe_send_blocks(list(), list(), list(), #state{}) -> {stop, normal, #state{}} |
{stop, normal, #state{}, binary()} |
{noreply, #state{}, binary()}.
maybe_send_blocks([], _, _, State) ->
{stop, normal, State};
maybe_send_blocks(BlockTuples, Requested, OldRequested,#state{path=Path, batches_sent=Sent} = State) ->
Final = Requested == [] andalso OldRequested /= [],
{Heights, Blocks} = lists:unzip(BlockTuples),
lager:info("sending blocks ~p to sync peer", [Heights]),
Msg1 = #blockchain_sync_blocks_pb{final=Final, blocks=Blocks},
Msg0 = blockchain_sync_handler_pb:encode_msg(Msg1),
Msg = case Path of
?SYNC_PROTOCOL_V1 -> Msg0;
?SYNC_PROTOCOL_V2 -> zlib:compress(Msg0)
end,
Msg.
case Final of
true ->
{stop, normal, State, Msg};
false ->
{LastHeight, _LastBlock} = lists:last(BlockTuples),
{noreply, State#state{batches_sent=Sent+1,
last_block_height=LastHeight,
requested = Requested},
Msg}
end.


0 comments on commit 02933c2

Please sign in to comment.