diff --git a/lib/off_broadway/emqqt/broker.ex b/lib/off_broadway/emqqt/broker.ex index 349a764..df569cd 100644 --- a/lib/off_broadway/emqqt/broker.ex +++ b/lib/off_broadway/emqqt/broker.ex @@ -40,7 +40,14 @@ defmodule OffBroadway.EMQTT.Broker do # Create a public ETS table to act as message buffer. It needs to be public # because the Producer process will read directly from it to avoid copying # the content across processes. - :ets.new(state.ets_table, [:ordered_set, :named_table, :public]) + :ets.new(state.ets_table, [ + :ordered_set, + :named_table, + :public, + {:read_concurrency, true}, + {:write_concurrency, true} + ]) + {:noreply, state, {:continue, :subscribe_to_topics}} end