Skip to content

Commit

Permalink
fix(conn): postpone arming stats timer until CONNECT is processed
Browse files Browse the repository at this point in the history
  • Loading branch information
keynslug committed Nov 21, 2024
1 parent ad16ccf commit c76970f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 32 deletions.
67 changes: 40 additions & 27 deletions apps/emqx/src/emqx_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,13 @@
%% GC State
gc_state :: option(emqx_gc:gc_state()),
%% Stats Timer
stats_timer :: disabled | option(reference()),
%% Idle Timeout
idle_timeout :: integer() | infinity,
%% When `disabled` stats are never reported.
%% When `paused` stats are not reported until complete CONNECT packet received.
%% Connection starts with `paused` by default.
stats_timer :: disabled | paused | option(reference()),
%% Idle Timer
idle_timer :: option(reference()),
%% Idle Timeout
%% Hibernate connection process if inactive for
hibernate_after :: integer() | infinity,
%% Zone name
zone :: atom(),
Expand Down Expand Up @@ -341,10 +342,9 @@ init_state(
end,
StatsTimer =
case emqx_config:get_zone_conf(Zone, [stats, enable]) of
true -> undefined;
true -> paused;
false -> disabled
end,
IdleTimeout = emqx_channel:get_mqtt_conf(Zone, idle_timeout),

#state{
transport = Transport,
Expand All @@ -358,8 +358,7 @@ init_state(
channel = Channel,
gc_state = GcState,
stats_timer = StatsTimer,
idle_timeout = IdleTimeout,
hibernate_after = maps:get(hibernate_after, Opts, IdleTimeout),
hibernate_after = maps:get(hibernate_after, Opts, get_zone_idle_timeout(Zone)),
zone = Zone,
listener = Listener,
limiter_buffer = queue:new(),
Expand All @@ -374,20 +373,17 @@ run_loop(
transport = Transport,
socket = Socket,
peername = Peername,
channel = Channel,
listener = Listener,
idle_timeout = IdleTimeout
zone = Zone
}
) ->
emqx_logger:set_metadata_peername(esockd:format(Peername)),
ShutdownPolicy = emqx_config:get_zone_conf(
emqx_channel:info(zone, Channel),
[force_shutdown]
),
ShutdownPolicy = emqx_config:get_zone_conf(Zone, [force_shutdown]),
emqx_utils:tune_heap_size(ShutdownPolicy),
case activate_socket(State) of
{ok, NState} ->
ok = set_tcp_keepalive(Listener),
IdleTimeout = get_zone_idle_timeout(Zone),
IdleTimer = start_timer(IdleTimeout, idle_timeout),
hibernate(Parent, NState#state{idle_timer = IdleTimer});
{error, Reason} ->
Expand Down Expand Up @@ -430,7 +426,7 @@ recvloop(
true ->
recvloop(Parent, State);
false ->
_ = set_chan_stats(State),
_ = try_set_chan_stats(State),
hibernate(Parent, cancel_stats_timer(State))
end
end.
Expand All @@ -440,8 +436,8 @@ handle_recv({system, From, Request}, Parent, State) ->
handle_recv({'EXIT', Parent, Reason}, Parent, State) ->
%% FIXME: it's not trapping exit, should never receive an EXIT
terminate(Reason, State);
handle_recv(Msg, Parent, State = #state{idle_timeout = IdleTimeout}) ->
case process_msg([Msg], ensure_stats_timer(IdleTimeout, State)) of
handle_recv(Msg, Parent, State) ->
case process_msg([Msg], ensure_stats_timer(State)) of
{ok, NewState} ->
?MODULE:recvloop(Parent, NewState);
{stop, Reason, NewSate} ->
Expand All @@ -458,10 +454,23 @@ wakeup_from_hib(Parent, State) ->
%%--------------------------------------------------------------------
%% Ensure/cancel stats timer

-compile({inline, [ensure_stats_timer/2]}).
-compile({inline, [ensure_stats_timer/1, ensure_stats_timer/2]}).
%% Arms the stats timer using the zone's idle timeout, but only when no timer
%% is currently set (`stats_timer` is `undefined`). For any other value of
%% `stats_timer` the state is returned untouched.
ensure_stats_timer(#state{stats_timer = undefined, zone = Zone} = State) ->
    IdleTimeout = get_zone_idle_timeout(Zone),
    ensure_stats_timer(IdleTimeout, State);
ensure_stats_timer(Unchanged) ->
    Unchanged.

%% When no stats timer is set (`stats_timer` is `undefined`), calls
%% `start_timer(Timeout, emit_stats)` and stores the result in `stats_timer`.
%% Otherwise returns the state as is.
ensure_stats_timer(Timeout, #state{stats_timer = undefined} = State) ->
    TRef = start_timer(Timeout, emit_stats),
    State#state{stats_timer = TRef};
ensure_stats_timer(_Timeout, Unchanged) ->
    %% Timer is already active, disabled, or paused: leave it alone.
    Unchanged.

-compile({inline, [resume_stats_timer/1]}).
%% Lifts the `paused` stats timer state; called while handling a CONNECT packet.
%%
%% * `paused`   -> arm the timer with a zero timeout, so that stats are
%%                 reported right after the CONNECT packet is processed.
%% * `disabled` -> stats are never reported, nothing to do.
%% * `undefined` or a timer reference -> the timer was already resumed
%%   earlier, keep the state as is.
%%
%% The last two clauses make the function total over the declared
%% `stats_timer` type. Without them a repeated call would crash the
%% connection process with `function_clause`.
%% NOTE(review): presumably reachable when a client sends a second CONNECT
%% packet on the same connection -- confirm against the calling clause.
resume_stats_timer(State = #state{stats_timer = paused}) ->
    ensure_stats_timer(0, State#state{stats_timer = undefined});
resume_stats_timer(State = #state{stats_timer = disabled}) ->
    State;
resume_stats_timer(State = #state{stats_timer = undefined}) ->
    State;
resume_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) ->
    State.

-compile({inline, [cancel_stats_timer/1]}).
Expand All @@ -472,6 +481,10 @@ cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) -
cancel_stats_timer(State) ->
State.

-compile({inline, [get_zone_idle_timeout/1]}).
%% Looks up the `idle_timeout` MQTT setting of the given zone through
%% `emqx_channel:get_mqtt_conf/2`.
get_zone_idle_timeout(Zone) ->
emqx_channel:get_mqtt_conf(Zone, idle_timeout).

%%--------------------------------------------------------------------
%% Process next Msg

Expand Down Expand Up @@ -552,12 +565,13 @@ handle_msg(
State = #state{idle_timer = IdleTimer}
) ->
ok = emqx_utils:cancel_timer(IdleTimer),
Serialize = emqx_frame:serialize_opts(ConnPkt),
NState = State#state{
serialize = Serialize,
serialize = emqx_frame:serialize_opts(ConnPkt),
idle_timer = undefined
},
handle_incoming(Packet, NState);
%% Arms the stats timer (if enabled) with a zero timeout, so that stats are
%% emitted right after CONNECT is processed.
FState = resume_stats_timer(NState),
handle_incoming(Packet, FState);
handle_msg({incoming, Packet}, State) ->
?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
handle_incoming(Packet, State);
Expand Down Expand Up @@ -724,7 +738,8 @@ handle_timeout(
socket = Socket
}
) ->
_ = set_chan_stats(State),
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_stats(ClientId, stats(State)),
emqx_congestion:maybe_alarm_conn_congestion(Socket, Transport, Channel),
{ok, State#state{stats_timer = undefined}};
handle_timeout(
Expand All @@ -743,7 +758,7 @@ handle_timeout(
handle_timeout(TRef, Msg, State) ->
with_channel(handle_timeout, [TRef, Msg], State).

set_chan_stats(State = #state{channel = Channel}) ->
try_set_chan_stats(State = #state{channel = Channel}) ->
case emqx_channel:info(clientid, Channel) of
%% ClientID is not yet known, nothing to report.
undefined -> false;
Expand Down Expand Up @@ -1111,10 +1126,8 @@ run_gc(Pubs, Bytes, State = #state{gc_state = GcSt, zone = Zone}) ->
{_IsGC, GcSt1} -> State#state{gc_state = GcSt1}
end.

check_oom(Pubs, Bytes, State = #state{channel = Channel}) ->
ShutdownPolicy = emqx_config:get_zone_conf(
emqx_channel:info(zone, Channel), [force_shutdown]
),
check_oom(Pubs, Bytes, State = #state{zone = Zone}) ->
ShutdownPolicy = emqx_config:get_zone_conf(Zone, [force_shutdown]),
case emqx_utils:check_oom(ShutdownPolicy) of
{shutdown, Reason} ->
%% triggers terminate/2 callback immediately
Expand Down
10 changes: 5 additions & 5 deletions apps/emqx/test/emqx_connection_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,15 @@ t_process_msg(_) ->
).

t_ensure_stats_timer(_) ->
NStats = emqx_connection:ensure_stats_timer(100, st()),
Stats_timer = emqx_connection:info(stats_timer, NStats),
?assert(is_reference(Stats_timer)),
NStats = emqx_connection:ensure_stats_timer(100, st(#{stats_timer => undefined})),
StatsTimer = emqx_connection:info(stats_timer, NStats),
?assert(is_reference(StatsTimer)),
?assertEqual(NStats, emqx_connection:ensure_stats_timer(100, NStats)).

t_cancel_stats_timer(_) ->
NStats = emqx_connection:cancel_stats_timer(st(#{stats_timer => make_ref()})),
Stats_timer = emqx_connection:info(stats_timer, NStats),
?assertEqual(undefined, Stats_timer),
StatsTimer = emqx_connection:info(stats_timer, NStats),
?assertEqual(undefined, StatsTimer),
?assertEqual(NStats, emqx_connection:cancel_stats_timer(NStats)).

t_append_msg(_) ->
Expand Down

0 comments on commit c76970f

Please sign in to comment.