Skip to content

Commit d20d791

Browse files
committed
Optionally queue outgoing data
Support queueing outgoing stanzas and stream management elements for up to a configurable number of milliseconds (with a configurable queue size limit). This allows for batching up multiple XML elements into a single TCP packet in order to reduce the TCP/IP overhead.
1 parent a90c53e commit d20d791

File tree

3 files changed

+242
-46
lines changed

3 files changed

+242
-46
lines changed

src/xmpp_socket.erl

+7
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
compress/1,
2929
compress/2,
3030
reset_stream/1,
31+
send_elements/2,
3132
send_element/2,
3233
send_header/2,
3334
send_trailer/1,
@@ -196,6 +197,12 @@ reset_stream(#socket_state{xml_stream = XMLStream,
196197
SocketData#socket_state{socket = Socket1}
197198
end.
198199

200+
-spec send_elements(socket_state(), [fxml:xmlel()]) -> ok | {error, inet:posix()}.
201+
send_elements(#socket_state{xml_stream = undefined}, _Els) ->
202+
erlang:error(not_implemented);
203+
send_elements(SocketData, Els) ->
204+
send(SocketData, list_to_binary([fxml:element_to_binary(El) || El <- Els])).
205+
199206
-spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
200207
send_element(#socket_state{xml_stream = undefined} = SocketData, El) ->
201208
send_xml(SocketData, {xmlstreamelement, El});

src/xmpp_stream_in.erl

+121-24
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
%% API
2626
-export([start/3, start_link/3, call/3, cast/2, reply/2, stop/1, stop_async/1,
2727
accept/1, send/2, close/1, close/2, send_error/3, establish/1,
28-
get_transport/1, change_shaper/2, set_timeout/2, format_error/1,
29-
send_ws_ping/1]).
28+
get_transport/1, change_shaper/2, configure_queue/3, set_timeout/2,
29+
format_error/1, send_ws_ping/1]).
3030

3131
%% gen_server callbacks
3232
-export([init/1, handle_cast/2, handle_call/3, handle_info/2,
@@ -58,6 +58,9 @@
5858
stream_encrypted => boolean(),
5959
stream_version => {non_neg_integer(), non_neg_integer()},
6060
stream_authenticated => boolean(),
61+
stream_queue := [xmpp_element() | xmlel()],
62+
stream_queue_max := non_neg_integer(),
63+
stream_queue_timeout => {non_neg_integer(), integer()},
6164
ip => {inet:ip_address(), inet:port_number()},
6265
codec_options => [xmpp:decode_option()],
6366
xmlns => binary(),
@@ -226,7 +229,21 @@ close(Pid, Reason) ->
226229
establish(State) ->
227230
process_stream_established(State).
228231

229-
-spec set_timeout(state(), non_neg_integer() | infinity) -> state().
232+
-spec configure_queue(state(), non_neg_integer(), non_neg_integer()) -> state().
233+
configure_queue(#{owner := Owner} = State, MaxSize, MaxDelay)
234+
when Owner == self() ->
235+
flush_queue(State), % Support reconfiguration.
236+
if MaxSize == 0; MaxDelay == 0 ->
237+
State#{stream_queue_max => 0};
238+
true ->
239+
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
240+
State#{stream_queue_max => MaxSize,
241+
stream_queue_timeout => {MaxDelay, CurrentTime}}
242+
end;
243+
configure_queue(_, _, _) ->
244+
erlang:error(badarg).
245+
246+
-spec set_timeout(state(), timeout()) -> state().
230247
set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() ->
231248
case Timeout of
232249
infinity -> State#{stream_timeout => infinity};
@@ -280,7 +297,9 @@ init([Mod, {SockMod, Socket}, Opts]) ->
280297
socket_mod => SockMod,
281298
socket_opts => Opts,
282299
stream_timeout => {Timeout, Time},
283-
stream_state => accepting},
300+
stream_state => accepting,
301+
stream_queue => [],
302+
stream_queue_max => 0},
284303
{ok, State, Timeout}.
285304

286305
-spec handle_cast(term(), state()) -> next_state().
@@ -424,6 +443,8 @@ handle_info({'$gen_all_state_event', {xmlstreamcdata, Data}},
424443
noreply(try callback(handle_cdata, Data, State)
425444
catch _:{?MODULE, undef} -> State
426445
end);
446+
handle_info(timeout, #{stream_queue := [_|_]} = State) ->
447+
noreply(flush_queue(State));
427448
handle_info(timeout, #{lang := Lang} = State) ->
428449
Disconnected = is_disconnected(State),
429450
noreply(try callback(handle_timeout, State)
@@ -525,12 +546,26 @@ init_state(#{socket := Socket, mod := Mod} = State, Opts) ->
525546
({stop, state()}) -> {stop, normal, state()}.
526547
noreply({stop, State}) ->
527548
{stop, normal, State};
528-
noreply(#{stream_timeout := infinity} = State) ->
529-
{noreply, State, infinity};
530-
noreply(#{stream_timeout := {MSecs, StartTime}} = State) ->
549+
noreply(State) ->
550+
{noreply, State, get_timeout(State)}.
551+
552+
-spec get_timeout(state()) -> timeout().
553+
get_timeout(State) ->
554+
min(get_stream_timeout(State), get_queue_timeout(State)).
555+
556+
-spec get_stream_timeout(state()) -> timeout().
557+
get_stream_timeout(#{stream_timeout := infinity}) ->
558+
infinity;
559+
get_stream_timeout(#{stream_timeout := {MSecs, StartTime}}) ->
531560
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
532-
Timeout = max(0, MSecs - CurrentTime + StartTime),
533-
{noreply, State, Timeout}.
561+
max(0, MSecs - CurrentTime + StartTime).
562+
563+
-spec get_queue_timeout(state()) -> timeout().
564+
get_queue_timeout(#{stream_queue := []}) ->
565+
infinity;
566+
get_queue_timeout(#{stream_queue_timeout := {MSecs, StartTime}}) ->
567+
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
568+
max(0, MSecs - CurrentTime + StartTime).
534569

535570
-spec is_disconnected(state()) -> boolean().
536571
is_disconnected(#{stream_state := StreamState}) ->
@@ -1193,21 +1228,29 @@ send_header(State, _) ->
11931228

11941229
-spec send_pkt(state(), xmpp_element() | xmlel()) -> state().
11951230
send_pkt(State, Pkt) ->
1196-
Result = socket_send(State, Pkt),
1197-
State1 = try callback(handle_send, Pkt, Result, State)
1198-
catch _:{?MODULE, undef} -> State
1199-
end,
1200-
case Result of
1201-
_ when is_record(Pkt, stream_error) ->
1202-
process_stream_end({stream, {out, Pkt}}, State1);
1203-
ok ->
1204-
State1;
1205-
{error, _Why} ->
1206-
% Queue process_stream_end instead of calling it directly,
1207-
% so we have opportunity to process incoming queued messages before
1208-
% terminating session.
1209-
self() ! {'$gen_event', closed},
1210-
State1
1231+
case check_queue(State, Pkt) of
1232+
flush ->
1233+
flush_queue(State, Pkt);
1234+
queue ->
1235+
queue_pkt(State, Pkt);
1236+
noqueue ->
1237+
State1 = flush_queue(State),
1238+
Result = socket_send(State1, Pkt),
1239+
State2 = try callback(handle_send, Pkt, Result, State1)
1240+
catch _:{?MODULE, undef} -> State1
1241+
end,
1242+
case Result of
1243+
_ when is_record(Pkt, stream_error) ->
1244+
process_stream_end({stream, {out, Pkt}}, State2);
1245+
ok ->
1246+
State2;
1247+
{error, _Why} ->
1248+
% Queue process_stream_end instead of calling it directly,
1249+
% so we have the opportunity to process incoming queued
1250+
% messages before terminating the session.
1251+
self() ! {'$gen_event', closed},
1252+
State2
1253+
end
12111254
end.
12121255

12131256
-spec send_error(state(), xmpp_element() | xmlel(), stanza_error()) -> state().
@@ -1258,6 +1301,60 @@ close_socket(#{socket := Socket} = State) ->
12581301
close_socket(State) ->
12591302
State.
12601303

1304+
-spec check_queue(state(), xmpp_element() | xmlel()) -> flush | queue | noqueue.
1305+
check_queue(#{stream_queue_max := 0}, _Pkt) ->
1306+
noqueue;
1307+
check_queue(#{stream_state := StreamState}, _Pkt)
1308+
when StreamState /= established->
1309+
noqueue;
1310+
check_queue(_State, Pkt)
1311+
when not ?is_stanza(Pkt),
1312+
not is_record(Pkt, sm_a),
1313+
not is_record(Pkt, sm_r) ->
1314+
noqueue;
1315+
check_queue(#{stream_queue := Q, stream_queue_max := MaxQueue}, _Pkt)
1316+
when length(Q) >= MaxQueue ->
1317+
flush;
1318+
check_queue(_State, _Pkt) ->
1319+
queue.
1320+
1321+
-spec queue_pkt(state(), xmpp_element() | xmlel()) -> state().
1322+
queue_pkt(#{stream_queue := [],
1323+
stream_queue_timeout := {MSecs, _PrevTime}} = State, Pkt) ->
1324+
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
1325+
State#{stream_queue := [Pkt],
1326+
stream_queue_timeout := {MSecs, CurrentTime}};
1327+
queue_pkt(#{stream_queue := Q} = State, Pkt) ->
1328+
State#{stream_queue := [Pkt|Q]}.
1329+
1330+
-spec flush_queue(state(), xmpp_element() | xmlel()) -> state().
1331+
flush_queue(State, Pkt) ->
1332+
flush_queue(queue_pkt(State, Pkt)).
1333+
1334+
-spec flush_queue(state()) -> state().
1335+
flush_queue(#{stream_queue := []} = State) ->
1336+
State;
1337+
flush_queue(#{stream_queue := Q0,
1338+
socket := Sock,
1339+
xmlns := NS} = State0) ->
1340+
Q = lists:reverse(Q0),
1341+
Els = [xmpp:encode(Pkt, NS) || Pkt <- Q],
1342+
Result = xmpp_socket:send_elements(Sock, Els),
1343+
State1 = State0#{stream_queue := []},
1344+
State2 = try lists:foldl(
1345+
fun(Pkt, State) ->
1346+
callback(handle_send, Pkt, Result, State)
1347+
end, State1, Q)
1348+
catch _:{?MODULE, undef} -> State1
1349+
end,
1350+
case Result of
1351+
ok ->
1352+
State2;
1353+
{error, _Why} ->
1354+
self() ! {'$gen_event', closed},
1355+
State2
1356+
end.
1357+
12611358
-spec select_lang(binary(), binary()) -> binary().
12621359
select_lang(Lang, <<"">>) -> Lang;
12631360
select_lang(_, Lang) -> Lang.

0 commit comments

Comments
 (0)