Skip to content

Commit

Permalink
fix(conn): resume stats timer in on connected event instead
Browse files Browse the repository at this point in the history
Also don't fire zero-timeout timer because `connected` event handling
code already takes care of initialization the channel metrics.
  • Loading branch information
keynslug committed Nov 21, 2024
1 parent c76970f commit c74f7df
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 17 deletions.
18 changes: 6 additions & 12 deletions apps/emqx/src/emqx_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -454,22 +454,17 @@ wakeup_from_hib(Parent, State) ->
%%--------------------------------------------------------------------
%% Ensure/cancel stats timer

-compile({inline, [ensure_stats_timer/1, ensure_stats_timer/2]}).
-compile({inline, [ensure_stats_timer/1]}).
ensure_stats_timer(State = #state{stats_timer = undefined}) ->
Timeout = get_zone_idle_timeout(State#state.zone),
ensure_stats_timer(Timeout, State);
ensure_stats_timer(State) ->
State.

ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
State#state{stats_timer = start_timer(Timeout, emit_stats)};
ensure_stats_timer(_Timeout, State) ->
ensure_stats_timer(State) ->
%% Either already active, disabled, or paused.
State.

-compile({inline, [resume_stats_timer/1]}).
resume_stats_timer(State = #state{stats_timer = paused}) ->
ensure_stats_timer(0, State#state{stats_timer = undefined});
State#state{stats_timer = undefined};
resume_stats_timer(State = #state{stats_timer = disabled}) ->
State.

Expand Down Expand Up @@ -569,9 +564,7 @@ handle_msg(
serialize = emqx_frame:serialize_opts(ConnPkt),
idle_timer = undefined
},
%% Causes stats timer to be emitted (if enabled) right after CONNECT is processed.
FState = resume_stats_timer(NState),
handle_incoming(Packet, FState);
handle_incoming(Packet, NState);
handle_msg({incoming, Packet}, State) ->
?TRACE("MQTT", "mqtt_packet_received", #{packet => Packet}),
handle_incoming(Packet, State);
Expand Down Expand Up @@ -622,7 +615,8 @@ handle_msg(
maps:get(conn_pid, QSS), {PS, Serialize, Channel}
),
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:insert_channel_info(ClientId, info(State), stats(State));
emqx_cm:insert_channel_info(ClientId, info(State), stats(State)),
{ok, resume_stats_timer(State)};
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_info(ClientId, info(State)),
Expand Down
6 changes: 3 additions & 3 deletions apps/emqx/test/emqx_connection_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,10 @@ t_process_msg(_) ->
).

t_ensure_stats_timer(_) ->
NStats = emqx_connection:ensure_stats_timer(100, st(#{stats_timer => undefined})),
NStats = emqx_connection:ensure_stats_timer(st(#{stats_timer => undefined})),
StatsTimer = emqx_connection:info(stats_timer, NStats),
?assert(is_reference(StatsTimer)),
?assertEqual(NStats, emqx_connection:ensure_stats_timer(100, NStats)).
?assertEqual(NStats, emqx_connection:ensure_stats_timer(NStats)).

t_cancel_stats_timer(_) ->
NStats = emqx_connection:cancel_stats_timer(st(#{stats_timer => make_ref()})),
Expand Down Expand Up @@ -272,7 +272,7 @@ t_handle_msg_event(_) ->
ok = meck:expect(emqx_cm, register_channel, fun(_, _, _) -> ok end),
ok = meck:expect(emqx_cm, insert_channel_info, fun(_, _, _) -> ok end),
ok = meck:expect(emqx_cm, set_chan_info, fun(_, _) -> ok end),
?assertEqual(ok, handle_msg({event, connected}, st())),
?assertMatch({ok, _St}, handle_msg({event, connected}, st())),
?assertMatch({ok, _St}, handle_msg({event, disconnected}, st())),
?assertMatch({ok, _St}, handle_msg({event, undefined}, st())).

Expand Down
9 changes: 7 additions & 2 deletions apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,16 @@ t_connect_emit_stats_timeout('end', _Config) ->

t_connect_emit_stats_timeout(Config) ->
ConnFun = ?config(conn_fun, Config),
{_, IdleTimeout} = lists:keyfind(idle_timeout, 1, Config),
IdleTimeout = ?config(idle_timeout, Config),
{ok, Client} = emqtt:start_link([{proto_ver, v5}, {keepalive, 60} | Config]),
{ok, _} = emqtt:ConnFun(Client),
%% Poke the connection to ensure stats timer is armed.
pong = emqtt:ping(Client),
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
?assert(is_reference(emqx_connection:info(stats_timer, sys:get_state(ClientPid)))),
?assertMatch(
TRef when is_reference(TRef),
emqx_connection:info(stats_timer, sys:get_state(ClientPid))
),
?block_until(#{?snk_kind := cancel_stats_timer}, IdleTimeout * 2, _BackInTime = 0),
?assertEqual(undefined, emqx_connection:info(stats_timer, sys:get_state(ClientPid))),
ok = emqtt:disconnect(Client).
Expand Down

0 comments on commit c74f7df

Please sign in to comment.