-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: willmsg not published in takeover #11868
fix: willmsg not published in takeover #11868
Conversation
482d4af
to
a4c202e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💯
a4c202e
to
f60eb8b
Compare
I did some updates. but need to dig more about the possible reasons of shutdown and connection states where willmsg publishing is allowed. |
805ff6e
to
6f779dc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍🏼
?assertNot(IsWill2), | ||
emqtt:stop(CPid2), | ||
emqtt:stop(CPidSub), | ||
?assert(not is_process_alive(CPid1)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tiny nit. The assert_client_exit/3
call several lines before returns only when CPid1
process exits, so this line seems kinda unnecessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will leave it it is test code, assertions just helps.
%% a. expired (session expired) | ||
%% c. discarded (Session ends because another process starts new session with the same clientid) | ||
%% b. kicked. (kicked by operation) | ||
%% d. internal_error (maybe not recoverable) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. It seems a bit unsafe to assume that in the event of hitting internal error publishing should still work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just list them, may miss something, the entire call path, call stack is hard to follow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found this in spec:
In the case of a Server shutdown or failure, the Server MAY defer publication of Will Messages until a subsequent restart. If this happens, there might be a delay between the time the Server experienced failure and when the Will Message is published.
So I think it is ok to either publish/not publish/delay publish willmsg when server error.
apps/emqx/src/emqx_channel.erl
Outdated
terminate({shutdown, Reason}, Channel) when | ||
Reason =:= discarded; | ||
Reason =:= takenover | ||
Reason =:= expired orelse | ||
Reason =:= takenover orelse | ||
Reason =:= kicked orelse | ||
Reason =:= discarded | ||
-> | ||
run_terminate_hook(Reason, Channel); | ||
terminate(Reason, Channel = #channel{clientinfo = ClientInfo, will_msg = WillMsg}) -> | ||
%% since will_msg is set to undefined as soon as it is published, | ||
%% if will_msg still exists when the session is terminated, it | ||
%% must be published immediately. | ||
WillMsg =/= undefined andalso publish_will_msg(ClientInfo, WillMsg), | ||
run_terminate_hook(Reason, Channel). | ||
Channel1 = maybe_publish_will_msg(Reason, Channel), | ||
run_terminate_hook(Reason, Channel1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit. This clause's logic is identical to the next catch-all clause, the only difference is taking the inner atom from shutdown
tuple. I'd argue this makes the shutdown logic harder to follow.
In general, I suspect that one thing that could help to make things simpler to follow is separating maybe_publish_will_msg
into a set of 2 functions: one is for scheduling willmsg publishing (e.g. essentially called only when sock_closed
), another one is for deciding what to do on channel termination (when there's no point to schedule anything). This should also eliminate the need to do something explicitly on handle_call(kick, ...)
, i.e. here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit. This clause's logic is identical to the next catch-all clause, the only difference is taking the inner atom from shutdown tuple. I'd argue this makes the shutdown logic harder to follow.
I thought the same and I did remove it until I found run_terminate_hook
relies on the Reason (kicked) instead of {shutdown, kicked}
, the hooks are exposed to the users so I think it better to keep it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, I suspect that one thing that could help to make things simpler to follow is separating maybe_publish_will_msg into a set of 2 functions: one is for scheduling willmsg publishing (e.g. essentially called only when sock_closed), another one is for deciding what to do on channel termination (when there's no point to schedule anything). This should also eliminate the need to do something explicitly on handle_call(kick, ...), i.e. here.
I found scheduling willmsg publishing will not always work when the process terminates before the timer get fired.
Simple fix would be to just extend the process lifetime but that is costly and it cannot be done in terminate
without changing the whole call stack.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, good catch. However, it also means that terminate hook now will be called with expired
, instead of {shutdown, expired}
as before. Although I'm not sure if former behavior is important to preserve here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found scheduling willmsg publishing will not always work when the process terminates before the timer get fired.
I mean, that seems the reason why the logic is hard to follow: currently maybe_publish_will_msg
is called both from sock_closed context (where scheduling timers makes sense) and terminate context (where scheduling timers doesn't make sense). Yet some of its codepaths end up with timers being scheduled anyway, which is confusing: it's very hard to tell in which context they are scheduled. Thus it seems worthwhile to try to separate this into maybe_schedule_will_msg/1
and maybe_publish_will_msg_on_terminate/2
, at least on the first look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, I suspect that one thing that could help to make things simpler to follow is separating maybe_publish_will_msg into a set of 2 functions: one is for scheduling willmsg publishing (e.g. essentially called only when sock_closed), another one is for deciding what to do on channel termination (when there's no point to schedule anything). This should also eliminate the need to do something explicitly on handle_call(kick, ...), i.e. here.
I found scheduling willmsg publishing will not always work when the process terminates before the timer get fired. Simple fix would be to just extend the process lifetime but that is costly and it cannot be done in
terminate
without changing the whole call stack.
I am wrong. we could only schedule willmsg publishing when session expire > 0.
Reason =:= expired orelse | ||
Reason =:= discarded orelse | ||
Reason =:= kicked orelse | ||
Reason =:= ?chan_terminating orelse |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this keeps the old behavior.
6aff0da
to
23c7e59
Compare
78f4b14
to
a0c8c06
Compare
because kick means shutdown connection AND delete session
will delay > session expire will delay < session expire timer triggered events are handled in seq, exclude the case of (will delay == session expire)
a0c8c06
to
125091a
Compare
125091a
to
2ff33f9
Compare
26fc157
to
6c7b774
Compare
kick should not trigger will message due to security reason. |
we decided to handle it in separate PR. |
Fixes #10551
Following the content in MQTT 5.0 spec I draw the state diagram of willmsg.
(a.) MQTT-3.1.2-8
(b.) MQTT-3.1.2-10
(c.) MQTT 3.1.3-9
(d.) MQTT-3.1.4-3
(e.) MQTT-3.14.4-3
(f.) ch 3.1.3.2.2
and it could be simplified to (in EMQX terms)
So following applies
applies to 'discard', 'internal_error', 'kick' and 'expired'
applies to 'takeover' , 'sock_close' (socket error) this is also the default error handling
@note,
EMQX named two takeover scenarios to reflact the term 'takenover' in MQTT 5.0
a. Discard: takeover with clean_session = True
b. Takeover: takeover with clean_session = False
For 'internal_error', we don't have chance to keep the session?
'kick' in EMQX also removes the session
Already triggered will_msg publishing will not work if will_msg is absent.
Summary
🤖 Generated by Copilot at b497f84
This pull request fixes a bug with will messages not being published after session takeover, adds a feature to gracefully close the transport layer when shutting down a connection, refactors and simplifies the code for handling will messages and channel termination, and updates the test suite to cover different scenarios for session takeover. It also adds a change log file
changes/ce/fix-11868.en.md
to document the bug fix.PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:
changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md
filesChecklist for CI (.github/workflows) changes
changes/
dir for user-facing artifacts update