Skip to content

Commit 247b47b

Browse files
committed
Optionally queue outgoing data
Support queueing outgoing stanzas and stream management elements for up to a configurable number of milliseconds (with a configurable queue size limit). This allows for batching up multiple XML elements into a single TCP packet in order to reduce the TCP/IP overhead.
1 parent a90c53e commit 247b47b

File tree

3 files changed

+249
-47
lines changed

3 files changed

+249
-47
lines changed

src/xmpp_socket.erl

+7
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
compress/1,
2929
compress/2,
3030
reset_stream/1,
31+
send_elements/2,
3132
send_element/2,
3233
send_header/2,
3334
send_trailer/1,
@@ -196,6 +197,12 @@ reset_stream(#socket_state{xml_stream = XMLStream,
196197
SocketData#socket_state{socket = Socket1}
197198
end.
198199

200+
-spec send_elements(socket_state(), [fxml:xmlel()]) -> ok | {error, inet:posix()}.
201+
send_elements(#socket_state{xml_stream = undefined}, _Els) ->
202+
erlang:error(not_implemented);
203+
send_elements(SocketData, Els) ->
204+
send(SocketData, list_to_binary([fxml:element_to_binary(El) || El <- Els])).
205+
199206
-spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
200207
send_element(#socket_state{xml_stream = undefined} = SocketData, El) ->
201208
send_xml(SocketData, {xmlstreamelement, El});

src/xmpp_stream_in.erl

+127-25
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
%% API
2626
-export([start/3, start_link/3, call/3, cast/2, reply/2, stop/1, stop_async/1,
2727
accept/1, send/2, close/1, close/2, send_error/3, establish/1,
28-
get_transport/1, change_shaper/2, set_timeout/2, format_error/1,
29-
send_ws_ping/1]).
28+
get_transport/1, change_shaper/2, configure_queue/3, set_timeout/2,
29+
format_error/1, send_ws_ping/1]).
3030

3131
%% gen_server callbacks
3232
-export([init/1, handle_cast/2, handle_call/3, handle_info/2,
@@ -58,6 +58,9 @@
5858
stream_encrypted => boolean(),
5959
stream_version => {non_neg_integer(), non_neg_integer()},
6060
stream_authenticated => boolean(),
61+
stream_queue := [xmpp_element() | xmlel()],
62+
stream_queue_max := non_neg_integer(),
63+
stream_queue_timeout => {non_neg_integer(), integer()},
6164
ip => {inet:ip_address(), inet:port_number()},
6265
codec_options => [xmpp:decode_option()],
6366
xmlns => binary(),
@@ -226,7 +229,21 @@ close(Pid, Reason) ->
226229
establish(State) ->
227230
process_stream_established(State).
228231

229-
-spec set_timeout(state(), non_neg_integer() | infinity) -> state().
232+
-spec configure_queue(state(), non_neg_integer(), non_neg_integer()) -> state().
233+
configure_queue(#{owner := Owner} = State, MaxSize, MaxDelay)
234+
when Owner == self() ->
235+
flush_queue(State), % Support reconfiguration.
236+
if MaxSize == 0; MaxDelay == 0 ->
237+
State#{stream_queue_max => 0};
238+
true ->
239+
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
240+
State#{stream_queue_max => MaxSize,
241+
stream_queue_timeout => {MaxDelay, CurrentTime}}
242+
end;
243+
configure_queue(_, _, _) ->
244+
erlang:error(badarg).
245+
246+
-spec set_timeout(state(), timeout()) -> state().
230247
set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() ->
231248
case Timeout of
232249
infinity -> State#{stream_timeout => infinity};
@@ -280,7 +297,9 @@ init([Mod, {SockMod, Socket}, Opts]) ->
280297
socket_mod => SockMod,
281298
socket_opts => Opts,
282299
stream_timeout => {Timeout, Time},
283-
stream_state => accepting},
300+
stream_state => accepting,
301+
stream_queue => [],
302+
stream_queue_max => 0},
284303
{ok, State, Timeout}.
285304

286305
-spec handle_cast(term(), state()) -> next_state().
@@ -424,6 +443,8 @@ handle_info({'$gen_all_state_event', {xmlstreamcdata, Data}},
424443
noreply(try callback(handle_cdata, Data, State)
425444
catch _:{?MODULE, undef} -> State
426445
end);
446+
handle_info(timeout, #{stream_queue := [_|_]} = State) ->
447+
noreply(flush_queue(State));
427448
handle_info(timeout, #{lang := Lang} = State) ->
428449
Disconnected = is_disconnected(State),
429450
noreply(try callback(handle_timeout, State)
@@ -522,15 +543,32 @@ init_state(#{socket := Socket, mod := Mod} = State, Opts) ->
522543
end.
523544

524545
-spec noreply(state()) -> noreply();
525-
({stop, state()}) -> {stop, normal, state()}.
546+
({stop, state()}) -> {stop, normal, state()};
547+
({stop, normal, state()}) -> {stop, normal, state()}.
526548
noreply({stop, State}) ->
527549
{stop, normal, State};
528-
noreply(#{stream_timeout := infinity} = State) ->
529-
{noreply, State, infinity};
530-
noreply(#{stream_timeout := {MSecs, StartTime}} = State) ->
550+
noreply({stop, normal, State}) ->
551+
{stop, normal, State};
552+
noreply(State) ->
553+
{noreply, State, get_timeout(State)}.
554+
555+
-spec get_timeout(state()) -> timeout().
556+
get_timeout(State) ->
557+
min(get_stream_timeout(State), get_queue_timeout(State)).
558+
559+
-spec get_stream_timeout(state()) -> timeout().
560+
get_stream_timeout(#{stream_timeout := infinity}) ->
561+
infinity;
562+
get_stream_timeout(#{stream_timeout := {MSecs, StartTime}}) ->
531563
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
532-
Timeout = max(0, MSecs - CurrentTime + StartTime),
533-
{noreply, State, Timeout}.
564+
max(0, MSecs - CurrentTime + StartTime).
565+
566+
-spec get_queue_timeout(state()) -> timeout().
567+
get_queue_timeout(#{stream_queue := []}) ->
568+
infinity;
569+
get_queue_timeout(#{stream_queue_timeout := {MSecs, StartTime}}) ->
570+
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
571+
max(0, MSecs - CurrentTime + StartTime).
534572

535573
-spec is_disconnected(state()) -> boolean().
536574
is_disconnected(#{stream_state := StreamState}) ->
@@ -1193,21 +1231,29 @@ send_header(State, _) ->
11931231

11941232
-spec send_pkt(state(), xmpp_element() | xmlel()) -> state().
11951233
send_pkt(State, Pkt) ->
1196-
Result = socket_send(State, Pkt),
1197-
State1 = try callback(handle_send, Pkt, Result, State)
1198-
catch _:{?MODULE, undef} -> State
1199-
end,
1200-
case Result of
1201-
_ when is_record(Pkt, stream_error) ->
1202-
process_stream_end({stream, {out, Pkt}}, State1);
1203-
ok ->
1204-
State1;
1205-
{error, _Why} ->
1206-
% Queue process_stream_end instead of calling it directly,
1207-
% so we have opportunity to process incoming queued messages before
1208-
% terminating session.
1209-
self() ! {'$gen_event', closed},
1210-
State1
1234+
case check_queue(State, Pkt) of
1235+
flush ->
1236+
flush_queue(State, Pkt);
1237+
queue ->
1238+
queue_pkt(State, Pkt);
1239+
noqueue ->
1240+
State1 = flush_queue(State),
1241+
Result = socket_send(State1, Pkt),
1242+
State2 = try callback(handle_send, Pkt, Result, State1)
1243+
catch _:{?MODULE, undef} -> State1
1244+
end,
1245+
case Result of
1246+
_ when is_record(Pkt, stream_error) ->
1247+
process_stream_end({stream, {out, Pkt}}, State2);
1248+
ok ->
1249+
State2;
1250+
{error, _Why} ->
1251+
% Queue process_stream_end instead of calling it directly,
1252+
% so we have the opportunity to process incoming queued
1253+
% messages before terminating the session.
1254+
self() ! {'$gen_event', closed},
1255+
State2
1256+
end
12111257
end.
12121258

12131259
-spec send_error(state(), xmpp_element() | xmlel(), stanza_error()) -> state().
@@ -1258,6 +1304,62 @@ close_socket(#{socket := Socket} = State) ->
12581304
close_socket(State) ->
12591305
State.
12601306

1307+
-spec check_queue(state(), xmpp_element() | xmlel()) -> flush | queue | noqueue.
1308+
check_queue(#{stream_queue_max := 0}, _Pkt) ->
1309+
noqueue;
1310+
check_queue(#{stream_state := StreamState}, _Pkt)
1311+
when StreamState /= established->
1312+
noqueue;
1313+
check_queue(_State, Pkt)
1314+
when not ?is_stanza(Pkt),
1315+
not is_record(Pkt, sm_a),
1316+
not is_record(Pkt, sm_r) ->
1317+
noqueue;
1318+
check_queue(#{stream_queue := Q, stream_queue_max := MaxQueue}, _Pkt)
1319+
when length(Q) >= MaxQueue ->
1320+
flush;
1321+
check_queue(_State, _Pkt) ->
1322+
queue.
1323+
1324+
-spec queue_pkt(state(), xmpp_element() | xmlel()) -> state().
1325+
queue_pkt(#{stream_queue := [],
1326+
stream_queue_timeout := {MSecs, _PrevTime}} = State, Pkt) ->
1327+
CurrentTime = p1_time_compat:monotonic_time(milli_seconds),
1328+
State#{stream_queue := [Pkt],
1329+
stream_queue_timeout := {MSecs, CurrentTime}};
1330+
queue_pkt(#{stream_queue := Q} = State, Pkt) ->
1331+
State#{stream_queue := [Pkt|Q]}.
1332+
1333+
-spec flush_queue(state(), xmpp_element() | xmlel()) -> state().
1334+
flush_queue(State, Pkt) ->
1335+
flush_queue(queue_pkt(State, Pkt)).
1336+
1337+
-spec flush_queue(state()) -> state().
1338+
flush_queue(#{stream_queue := []} = State) ->
1339+
State;
1340+
flush_queue(#{stream_queue := Q0,
1341+
socket := Sock,
1342+
xmlns := NS} = State0) ->
1343+
Q = lists:reverse(Q0),
1344+
Els = [xmpp:encode(Pkt, NS) || Pkt <- Q],
1345+
Result = xmpp_socket:send_elements(Sock, Els),
1346+
State1 = State0#{stream_queue := []},
1347+
State2 = try lists:foldl(
1348+
fun(Pkt, State) ->
1349+
callback(handle_send, Pkt, Result, State)
1350+
end, State1, Q)
1351+
catch _:{?MODULE, undef} -> State1
1352+
end,
1353+
case Result of
1354+
ok ->
1355+
State2;
1356+
{error, _Why} ->
1357+
self() ! {'$gen_event', closed},
1358+
State2
1359+
end;
1360+
flush_queue(#{stream_queue := _Q} = State) -> % Socket has been released.
1361+
maps:remove(stream_queue, State).
1362+
12611363
-spec select_lang(binary(), binary()) -> binary().
12621364
select_lang(Lang, <<"">>) -> Lang;
12631365
select_lang(_, Lang) -> Lang.

0 commit comments

Comments
 (0)