Refactor and optimize persistent session #12251
ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
    %% TODO: hash collisions
    Key = {SubId, erlang:phash2(Stream)},
This is the same problem as Thales was facing... I need to come up with an idea how to store the streams without worrying about collisions.
Shouldn't we use the stream itself as the key? That was what I was doing for the DS cache. 🙈
RocksDB doesn't like maps in the keys; all match expressions will fail with badarg.
Right. The binary representation of the stream term then?
It's kinda fat though 🤔
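The trade-off discussed in this thread can be sketched roughly as follows. This is only an illustration, not the PR's code: the module name `stream_key_sketch` and its function names are hypothetical, and the stream is treated as an opaque term. Only `erlang:phash2/1` and `term_to_binary/1` are real BIFs here.

```erlang
-module(stream_key_sketch).
-export([hash_key/2, binary_key/2, key_sizes/2]).

%% Compact key, as in the diff under review. phash2/1 maps any term to
%% an integer in 0..2^27-1, so two distinct streams may collide.
hash_key(SubId, Stream) ->
    {SubId, erlang:phash2(Stream)}.

%% Alternative raised in the thread: use the binary (external term
%% format) representation of the stream. Distinct terms never encode to
%% the same binary, and the key contains no maps. Assumption to verify:
%% equal terms must also encode identically wherever keys are compared;
%% OTP 24.1+ offers term_to_binary(Term, [deterministic]) for that.
binary_key(SubId, Stream) ->
    {SubId, term_to_binary(Stream)}.

%% Serialized size in bytes of both key variants, to quantify how
%% "fat" the binary key is for a given stream.
key_sizes(SubId, Stream) ->
    {byte_size(term_to_binary(hash_key(SubId, Stream))),
     byte_size(term_to_binary(binary_key(SubId, Stream)))}.
```

Calling `stream_key_sketch:key_sizes(SubId, Stream)` on a representative stream gives a concrete number for the size overhead before deciding between the two.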
-define(created_at, created_at).
-define(last_alive_at, last_alive_at).
-define(conninfo, conninfo).
%% Unique integer used to create unique identities
-define(last_id, last_id).
Q: shouldn't these use integer values to reduce the storage burden when persisting?
Translation of a map from/to a compact representation will be done here: https://github.com/emqx/emqx/pull/12251/files#diff-4f926b90b373b839679213fbdb3c30c784f5575bc3d4b721afce21466dded77eR394 (I haven't implemented any encoder callbacks yet).
I found that integer-keyed maps are rather unpleasant when debugging, so I'd like to avoid them when possible.
@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
Auto-update copyright hook...
This change is accidental, but I guess it won't hurt.
@@ -1777,7 +1777,7 @@ fields("session_persistence") ->
             sc(
                 pos_integer(),
                 #{
-                    default => 1000,
+                    default => 100,
Since the introduction of the mqueue buffer for persistent sessions, it's no longer necessary to use huge batches.
-    case enrich_deliver(ClientInfo, D, UpgradeQoS, Session) of
-        [] ->
-            enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session);
-        Msg ->
-            [Msg | enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session)]
-    end.
+    enrich_deliver(ClientInfo, D, UpgradeQoS, Session) ++
+        enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session).
What's the point of this change? It seems to just introduce a couple of extra heap allocations per message.
`++` is a BIF though, and we can get rid of the pattern-matching. Not sure what is more expensive in terms of micro-optimizations, but always returning a list is at least nicer from the typing perspective.
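The two shapes under discussion can be compared side by side in a reduced sketch. The module `enrich_sketch` and the per-item callback `F` are hypothetical stand-ins for `enrich_delivers/4` and `enrich_deliver/4`; the real functions carry more arguments.

```erlang
-module(enrich_sketch).
-export([collect_case/2, collect_append/2]).

%% Old shape: F returns either [] (drop the item) or a bare message,
%% so the caller has to pattern-match on the result.
collect_case(_F, []) ->
    [];
collect_case(F, [D | Rest]) ->
    case F(D) of
        [] -> collect_case(F, Rest);
        Msg -> [Msg | collect_case(F, Rest)]
    end.

%% New shape: F always returns a list (empty or singleton), and the
%% results are concatenated. ++ copies its left operand, which here
%% holds at most one element.
collect_append(_F, []) ->
    [];
collect_append(F, [D | Rest]) ->
    F(D) ++ collect_append(F, Rest).
```

Both variants yield the same list; the difference is the return type of the callback (`[] | Msg` versus `[Msg]`) and the extra singleton list built per kept message in the second variant.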
-spec update_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) ->
    {ok, emqx_types:message(), session()} | {error, _}.
update_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) ->
    SeqNo = packet_id_to_seqno(PacketId, S),
Q: Should we reject things like `PUBREC(PacketID = 42)` (i.e. a packet ID from a different track)?
%%

transaction(Fun) ->
    mria:async_dirty(?DS_MRIA_SHARD, Fun).
This looks risky. For consistency to be preserved, the order of operations in `Fun` should be correct. Right now it almost certainly isn't (e.g. because not-yet-persisted state is kept in a map, streams' iterators are updated in unspecified order, etc.), so the consistency looks easy to break.
Agree that this part is a bit sketchy. However, the session must NOT rely on `mnesia_locker`. If we need a locking mechanism to prevent multiple versions of a session from clobbering each other's persistent data, it should be explicit.
It's not about concurrent updates, it's more about atomicity: `async_dirty` is not atomic in the presence of failures, I believe. It would have been much less of a problem if the operations were issued in a consistency-preserving order, but they are not.
If we intend to move the sessions to DS, we cannot rely on the atomicity of updates either. What would be the consistency-preserving order here?
> we cannot rely on the atomicity of updates either
Why? There are usable transactions both in rocksdb and fdb.
> What would be the consistency-preserving order here?
Hard to define simply enough. There might be none actually, given that more pieces of state are now tracked in separate records. Persisting `?dup` markers first, then streams in the order of their respective seqno, then `?commited` markers could be a good start, though I'm still unsure if holes during replay would be impossible (probably still possible), and when to persist ranks (is it reconstructible from list of `ifs`s?).
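The ordering tentatively proposed above (dup markers, then streams by seqno, then committed markers) could be expressed as a sort over pending writes. This is a sketch under invented assumptions: the module `persist_order_sketch` and the tuple shapes of the operations are hypothetical and do not mirror the PR's records. As the comment itself notes, this order is only a starting point and may not rule out holes during replay.

```erlang
-module(persist_order_sketch).
-export([order/1]).

%% Hypothetical operation shapes, invented for this sketch:
%%   {dup, Key}            - a ?dup marker
%%   {stream, SeqNo, Key}  - a stream record and its seqno
%%   {committed, Key}      - a ?commited marker
%% Returns the operations in the proposed persistence order.
order(Ops) ->
    Dups = [Op || {dup, _} = Op <- Ops],
    %% keysort/2 is stable, so streams with equal seqno keep their
    %% relative input order.
    Streams = lists:keysort(2, [Op || {stream, _, _} = Op <- Ops]),
    Committed = [Op || {committed, _} = Op <- Ops],
    Dups ++ Streams ++ Committed.
```

With non-atomic writes such as `mria:async_dirty/2`, applying the operations in this order would at least make a partial failure leave the earlier groups persisted before the later ones.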
    ),
    {ok, [], Session};
handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0 = #{s := S0}) ->
    S = emqx_persistent_session_ds_state:commit(bump_last_alive(S0)),
I'd expect users will want this to be configurable, especially when QoS2 flows are involved.
Not sure if I understand the remark. The `bump_interval()` function inside `bump_last_alive()` reads the configuration. `?TIMER_BUMP_LAST_ALIVE_AT` is the id of the timer event.
Sorry, I was thinking about the configurability of the circumstances under which the session state is persisted.
I specifically tried to avoid adding new config parameters in this PR, since I find the constant addition and deprecation of config parameters quite annoying.
If there is a real-life need to fine-tune these parameters separately, we can always add a new timer or a new config.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------

t_non_persistent_session_subscription(_Config) ->
I removed this testcase because it tests the in-memory session. Not sure if it's relevant for this testsuite. Also, testcases that verify the absence of certain trace events are unreliable on their own: if a trace event is renamed or removed, the test will always pass.
Fixes https://emqx.atlassian.net/browse/EMQX-11556
Release version: v/e5.?
Summary
Refactor code to make more invariants correct by construction.
Performance test
Fan-out to wildcard subscribers (message size = 0)
https://snapshots.raintank.io/dashboard/snapshot/ccmEzWR08hLfU9zdxQL9iEqG4gD6eqKC
Storage inefficiency factor:
Fan-out to wildcard subscribers (message size = 1k)
https://snapshots.raintank.io/dashboard/snapshot/fg0qoTi8v3vzvzoTCcw7uGGFdJnI1dGa
PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:
`changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md` files
Checklist for CI (.github/workflows) changes
`changes/` dir for user-facing artifacts update