Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
add tag the final batch of block in an sync
Browse files Browse the repository at this point in the history
deduplicate block send code
update rebar.lock depends on helium/proto#125
  • Loading branch information
joecaswell committed Mar 15, 2022
1 parent edf69ec commit e5693e2
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 43 deletions.
2 changes: 1 addition & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
{<<"hackney">>,{pkg,<<"hackney">>,<<"1.18.0">>},0},
{<<"helium_proto">>,
{git,"https://github.com/helium/proto.git",
{ref,"30f17c5d1a7942297923f4e743c681c46f917fc3"}},
{ref,"4e1e0d97358d736ce861ef8d8d4a52eeb6ae9a4b"}},
0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.2.3">>},2},
{<<"idna">>,{pkg,<<"idna">>,<<"6.1.1">>},1},
Expand Down
75 changes: 33 additions & 42 deletions src/handlers/blockchain_sync_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ handle_data(client, Data0, #state{blockchain=Chain, path=Path, gossiped_hash=Gos
?SYNC_PROTOCOL_V1 -> Data0;
?SYNC_PROTOCOL_V2 -> zlib:uncompress(Data0)
end,
#blockchain_sync_blocks_pb{blocks=BinBlocks} =
#blockchain_sync_blocks_pb{final=Final, blocks=BinBlocks} =
blockchain_sync_handler_pb:decode_msg(Data, blockchain_sync_blocks_pb),

Blocks = [blockchain_block:deserialize(B) || B <- BinBlocks],
Expand All @@ -139,8 +139,12 @@ handle_data(client, Data0, #state{blockchain=Chain, path=Path, gossiped_hash=Gos
%% nothing was plausible, see if it has anything else
{noreply, State, blockchain_sync_handler_pb:encode_msg(#blockchain_sync_req_pb{msg={response, true}})};
HighestPlausible ->
lager:info("Eagerly re-gossiping ~p", [blockchain_block:height(HighestPlausible)]),
blockchain_gossip_handler:regossip_block(HighestPlausible, SwarmTID),
case Final of
true ->
lager:info("Eagerly re-gossiping ~p", [blockchain_block:height(HighestPlausible)]),
blockchain_gossip_handler:regossip_block(HighestPlausible, SwarmTID);
false -> ok
end,
%% do this in a spawn so that the connection dying does not stop adding blocks
{Pid, Ref} = spawn_monitor(fun() ->
%% this will check any plausible blocks we have and add them to the chain if possible
Expand All @@ -154,54 +158,22 @@ handle_data(client, Data0, #state{blockchain=Chain, path=Path, gossiped_hash=Gos
{stop, normal, State, blockchain_sync_handler_pb:encode_msg(#blockchain_sync_req_pb{msg={response, false}})}
end
end;

handle_data(server, Data, #state{blockchain=Blockchain, batch_size=BatchSize,
batches_sent=Sent, batch_limit=Limit,
path=Path, requested=StRequested}=State) ->
requested=StRequested}=State) ->
case blockchain_sync_handler_pb:decode_msg(Data, blockchain_sync_req_pb) of
#blockchain_sync_req_pb{msg={hash,
#blockchain_sync_hash_pb{hash = Hash,
heights = Requested}}} ->
{Blocks, Requested1} =
build_blocks(Requested, Hash, Blockchain, BatchSize),
case Blocks of
[] ->
{stop, normal, State};
[_|_] ->
Msg = mk_msg(Blocks, Path),
case Requested1 == [] andalso Requested /= [] of
true ->
{stop, normal, State, Msg};
_ ->
lager:info("sending blocks ~p to sync peer", [element(1, lists:unzip(Blocks))]),
{LastHeight, _LastBlock} = lists:last(Blocks),
{noreply, State#state{batches_sent=Sent+1,
last_block_height=LastHeight,
requested = Requested1},
Msg}
end
end;
maybe_send_blocks(Blocks, Requested1, Requested, State);
#blockchain_sync_req_pb{msg={response, true}} when Sent < Limit, State#state.last_block_height /= undefined ->
StartingBlockHeight = State#state.last_block_height,
{Blocks, Requested1} =
build_blocks(StRequested, StartingBlockHeight, Blockchain, BatchSize),
case Blocks of
[] ->
{stop, normal, State};
_ ->
Msg = mk_msg(Blocks, Path),
case Requested1 == [] andalso StRequested /= [] of
true ->
{stop, normal, State, Msg};
_ ->
lager:info("sending blocks ~p to sync peer", [element(1, lists:unzip(Blocks))]),
{LastHeight, _LastBlock} = lists:last(Blocks),
{noreply, State#state{batches_sent=Sent+1,
last_block_height=LastHeight,
requested = Requested1},
Msg}
end
end;
maybe_send_blocks(Blocks, Requested1, StRequested, State);
_ ->
%% ack was false, block was undefined, limit was hit or the message was not understood
{stop, normal, State}
Expand Down Expand Up @@ -265,11 +237,30 @@ build_blocks(R, Hash, Blockchain, BatchSize) when is_list(R) ->
{Blocks ++ ExtraBlocks,
R -- R2}.

mk_msg(Blocks, Path) ->
Msg1 = #blockchain_sync_blocks_pb{blocks= [B || {_H, B} <- Blocks]},
-spec maybe_send_blocks(list(), list(), list(), #state{}) -> {stop, normal, #state{}} |
{stop, normal, #state{}, binary()} |
{noreply, #state{}, binary()}.
maybe_send_blocks([], _, _, State) ->
{stop, normal, State};
maybe_send_blocks(BlockTuples, Requested, OldRequested,#state{path=Path, batches_sent=Sent} = State) ->
Final = Requested == [] andalso OldRequested /= [],
{Heights, Blocks} = lists:unzip(BlockTuples),
lager:info("sending blocks ~p to sync peer", [Heights]),
Msg1 = #blockchain_sync_blocks_pb{final=Final, blocks=Blocks},
Msg0 = blockchain_sync_handler_pb:encode_msg(Msg1),
Msg = case Path of
?SYNC_PROTOCOL_V1 -> Msg0;
?SYNC_PROTOCOL_V2 -> zlib:compress(Msg0)
end,
Msg.
case Final of
true ->
{stop, normal, State, Msg};
false ->
{LastHeight, _LastBlock} = lists:last(BlockTuples),
{noreply, State#state{batches_sent=Sent+1,
last_block_height=LastHeight,
requested = Requested},
Msg}
end.


0 comments on commit e5693e2

Please sign in to comment.