Refactor and optimize persistent session #12251

Merged Jan 26, 2024 (19 commits)

@ieQu1 ieQu1 commented Jan 3, 2024

Fixes https://emqx.atlassian.net/browse/EMQX-11556

Release version: v/e5.?

Summary

Refactor the persistent session code so that more invariants hold by construction.

  • Hide Mnesia tables behind a CRUD module. Introduce dirty session state to avoid frequent mria transactions
  • Factor out inflight buffer to a different module
  • Use separate tracks of PacketIds for QoS1 and QoS2 messages
  • Avoid storing pubranges, allow only one pubrange per stream
  • Better logical separation of concerns
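
A minimal sketch of the dirty-state idea from the list above, assuming a hypothetical module `dirty_state_sketch` that is not part of this PR. Writes only touch an in-memory map and mark the key as dirty; a later `commit/2` hands just the dirty keys to a caller-supplied persistence function. The names `new/0`, `set/3`, `lookup/2` and `commit/2` are illustrative, and the real CRUD module may be structured quite differently.

```erlang
-module(dirty_state_sketch).
-export([new/0, set/3, lookup/2, commit/2]).

%% Hypothetical state: the data itself plus the set of keys changed
%% since the last commit.
new() ->
    #{data => #{}, dirty => #{}}.

%% Update the in-memory copy only; nothing is written to storage here.
set(Key, Val, S = #{data := Data, dirty := Dirty}) ->
    S#{data := Data#{Key => Val}, dirty := Dirty#{Key => true}}.

lookup(Key, #{data := Data}) ->
    maps:get(Key, Data, undefined).

%% Flush dirty keys through PersistFun(Key, Val) and clear the dirty set.
%% NOTE: maps:fold/3 visits keys in an unspecified order.
commit(PersistFun, S = #{data := Data, dirty := Dirty}) ->
    ok = maps:fold(
        fun(Key, _, ok) ->
            PersistFun(Key, maps:get(Key, Data)),
            ok
        end,
        ok,
        Dirty
    ),
    S#{dirty := #{}}.
```

With this shape, many calls to `set/3` cost one storage round per commit rather than one per update. Since the fold order over a map is unspecified, an implementation that depends on write ordering would have to impose that order explicitly.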

Performance test

Fan-out to wildcard subscribers (message size = 0)

emqttb --pushgw --loiter 10min \
               @pub --metadata -N 1000 --qos 1 --topic foo/%n --pubinterval 100ms --size 0 \
               @sub --qos 1 --parse-metadata --topic foo/# -N 10 --expiry 30 \
               @a -a pub/pubinterval -V 10000000 --Ti 0.1 --Kp 0.0009 --pvar [scenarios,sub,{},metrics,e2e_latency] --setpoint 1000000 --min 1000 --max 100000000 \
               @g --inflight 1000 &

https://snapshots.raintank.io/dashboard/snapshot/ccmEzWR08hLfU9zdxQL9iEqG4gD6eqKC

Storage inefficiency factor:

tar cf data.tar data
tar czf data.tar.gz data
echo "$(stat --format %s data.tar) / $(stat --format %s data.tar.gz)" | bc -l

#+RESULTS:
: 7.470356703038198

Fan-out to wildcard subscribers (message size = 1k)

https://snapshots.raintank.io/dashboard/snapshot/fg0qoTi8v3vzvzoTCcw7uGGFdJnI1dGa

PR Checklist

Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:

  • Added tests for the changes
  • Added property-based tests for code which performs user input validation
  • Changed lines covered in coverage report
  • Change log has been added to changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md files
  • For internal contributor: there is a jira ticket to track this change
  • Created PR to emqx-docs if documentation update is required, or link to a follow-up jira ticket
  • Schema changes are backward compatible

Checklist for CI (.github/workflows) changes

  • If changed package build workflow, pass this action (manual trigger)
  • Change log has been added to changes/ dir for user-facing artifacts update

@ieQu1 ieQu1 force-pushed the dev/refactor-sessds branch from ac74c03 to e1a2514 Compare January 8, 2024 11:57

ensure_iterator(TopicFilter, StartTime, SubId, {{RankX, RankY}, Stream}, S) ->
    %% TODO: hash collisions
    Key = {SubId, erlang:phash2(Stream)},
Member Author

This is the same problem Thales was facing... I need to figure out how to store the streams without worrying about collisions.

Contributor

Shouldn't we use the stream itself as the key? That was what I was doing for the DS cache. 🙈

Member Author

@ieQu1 ieQu1 Jan 12, 2024

RocksDB doesn't like maps in the keys; all match expressions will fail with badarg.

Contributor

Right. The binary representation of the stream term then?

Member Author

It's kinda fat though 🤔
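
The trade-off discussed in this thread can be sketched as follows. This is an illustration only: the module `stream_key_sketch` and its function names are hypothetical, and `Stream` is treated as an arbitrary Erlang term. The first function mirrors the key from the snippet under review; the second is the "binary representation" alternative, which avoids collisions at the cost of a larger key.

```erlang
-module(stream_key_sketch).
-export([hashed_key/2, binary_key/2]).

%% Compact key, as in the reviewed snippet. erlang:phash2/1 maps any term
%% to an integer in 0..2^27-1, so two distinct streams can collide.
hashed_key(SubId, Stream) ->
    {SubId, erlang:phash2(Stream)}.

%% Collision-free alternative: the external term format of the stream.
%% The key contains no map (only a binary), but it grows with the size
%% of the stream term.
binary_key(SubId, Stream) ->
    {SubId, term_to_binary(Stream)}.
```

If such binary keys had to be byte-stable across nodes or releases, the `deterministic` option of `term_to_binary/2` would be worth checking before relying on them.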

@ieQu1 ieQu1 force-pushed the dev/refactor-sessds branch from e1a2514 to fe4bb00 Compare January 10, 2024 23:20

Comment on lines 84 to 88
-define(created_at, created_at).
-define(last_alive_at, last_alive_at).
-define(conninfo, conninfo).
%% Unique integer used to create unique identities
-define(last_id, last_id).
Contributor

Q: shouldn't these use integer values to reduce the storage burden when persisting?

Member Author

@ieQu1 ieQu1 Jan 12, 2024

Translation of a map from/to a compact representation will be done here: https://github.com/emqx/emqx/pull/12251/files#diff-4f926b90b373b839679213fbdb3c30c784f5575bc3d4b721afce21466dded77eR394 (I haven't implemented any encoder callbacks yet).

I found that integer-keyed maps are rather unpleasant when debugging; I'd like to avoid them when possible.

@ieQu1 ieQu1 force-pushed the dev/refactor-sessds branch from 1eaa2af to 334c4e7 Compare January 12, 2024 01:43
@@ -1,5 +1,5 @@
 %%--------------------------------------------------------------------
-%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
Member Author

Auto-update copyright hook...
This change is accidental, but I guess it won't hurt.

@@ -1777,7 +1777,7 @@ fields("session_persistence") ->
         sc(
             pos_integer(),
             #{
-                default => 1000,
+                default => 100,
Member Author

Since the introduction of the mqueue buffer for persistent sessions, it's no longer necessary to use huge batches.

@ieQu1 ieQu1 marked this pull request as ready for review January 12, 2024 10:25
@ieQu1 ieQu1 requested review from keynslug, a team, JimMoen, HJianBo and lafirest as code owners January 12, 2024 10:25
@ieQu1 ieQu1 force-pushed the dev/refactor-sessds branch from 14f60c6 to 90ba95c Compare January 12, 2024 10:27
@ieQu1 ieQu1 force-pushed the dev/refactor-sessds branch 2 times, most recently from d732d54 to 10cb2f0 Compare January 12, 2024 11:24
@ieQu1 ieQu1 changed the title Refactor persistent session Refactor and optimize persistent session Jan 12, 2024
@ieQu1 ieQu1 force-pushed the dev/refactor-sessds branch 2 times, most recently from 25e93ff to d725db4 Compare January 12, 2024 21:50
Comment on lines -412 to +413
-    case enrich_deliver(ClientInfo, D, UpgradeQoS, Session) of
-        [] ->
-            enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session);
-        Msg ->
-            [Msg | enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session)]
-    end.
+    enrich_deliver(ClientInfo, D, UpgradeQoS, Session) ++
+        enrich_delivers(ClientInfo, Rest, UpgradeQoS, Session).
Contributor

What's the point of this change? It seems to just introduce a couple of extra heap allocations per message.

Member Author

@ieQu1 ieQu1 Jan 15, 2024

++ is a BIF though, and we can get rid of the pattern-matching. Not sure what is more expensive in terms of micro-optimizations, but always returning a list is at least nicer from the typing perspective.
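
To make the two shapes in this thread concrete, here is a small self-contained sketch. The module `enrich_sketch` and the helpers `one_old/1` and `one_new/1` are hypothetical stand-ins for `enrich_deliver/4`: the old-style helper returns either `[]` or a bare message, while the new-style helper always returns a list, so the caller can concatenate without pattern matching.

```erlang
-module(enrich_sketch).
-export([enrich_old/1, enrich_new/1]).

%% Hypothetical stand-in for the old enrich_deliver/4:
%% returns [] to drop an item, or a bare message to keep it.
one_old(N) when N rem 2 =:= 0 -> {msg, N};
one_old(_) -> [].

%% Hypothetical stand-in for the new enrich_deliver/4:
%% always returns a list (empty, or with one message).
one_new(N) when N rem 2 =:= 0 -> [{msg, N}];
one_new(_) -> [].

%% Old shape: the caller must distinguish [] from a message.
enrich_old([]) ->
    [];
enrich_old([D | Rest]) ->
    case one_old(D) of
        [] -> enrich_old(Rest);
        Msg -> [Msg | enrich_old(Rest)]
    end.

%% New shape: uniform list return type, joined with ++.
enrich_new([]) ->
    [];
enrich_new([D | Rest]) ->
    one_new(D) ++ enrich_new(Rest).
```

Both functions produce the same result for the same input; the difference is the return type of the helper, which in the new shape is always a list.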

-spec update_seqno(puback | pubrec | pubcomp, emqx_types:packet_id(), session()) ->
    {ok, emqx_types:message(), session()} | {error, _}.
update_seqno(Track, PacketId, Session = #{id := SessionId, s := S}) ->
    SeqNo = packet_id_to_seqno(PacketId, S),
Contributor

Q: Should we reject things like PUBREC(PacketID = 42) (i.e. packet id from different track)?
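
One way the check asked about here could look, as a sketch under a loudly stated assumption: the lowest bit of the packet ID encodes the track (0 for QoS1, 1 for QoS2). That encoding, the module `track_sketch`, and all function names are hypothetical and are not confirmed by this PR; the point is only to show how an ack arriving on the wrong track could be rejected.

```erlang
-module(track_sketch).
-export([packet_id/2, track_of/1, check_ack/2]).

%% ASSUMPTION (not taken from the PR): the lowest bit of the packet ID
%% selects the track. N is a per-track counter in 1..32767, so the
%% resulting packet ID is never 0 and fits in 16 bits.
packet_id(qos1, N) when N >= 1, N =< 16#7FFF -> N bsl 1;
packet_id(qos2, N) when N >= 1, N =< 16#7FFF -> (N bsl 1) bor 1.

track_of(PacketId) when PacketId band 1 =:= 0 -> qos1;
track_of(_PacketId) -> qos2.

%% PUBACK belongs to the QoS1 flow; PUBREC and PUBCOMP to the QoS2 flow.
expected_track(puback) -> qos1;
expected_track(pubrec) -> qos2;
expected_track(pubcomp) -> qos2.

%% Reject an ack whose packet ID comes from the other track.
check_ack(AckType, PacketId) ->
    case track_of(PacketId) =:= expected_track(AckType) of
        true -> ok;
        false -> {error, track_mismatch}
    end.
```

Under this assumed encoding, the `PUBREC(PacketID = 42)` example from the question would be rejected, because 42 falls on the QoS1 track.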

%%

transaction(Fun) ->
    mria:async_dirty(?DS_MRIA_SHARD, Fun).
Contributor

This looks risky. For consistency to be preserved the order of operations in Fun should be correct. Right now it almost certainly isn't (e.g. because not-yet-persisted state is kept in a map, streams' iterators are updated in unspecified order, etc.), so the consistency looks easy to break.

Member Author

Agree that this part is a bit sketchy. However, the session must NOT rely on mnesia_locker. If we need a locking mechanism to prevent multiple versions of a session from clobbering each other's persistent data, it should be explicit.

Contributor

It's not about concurrent updates; it's more about atomicity: async_dirty is not atomic in the presence of failures, I believe. It would have been much less of a problem if the operations were issued in a consistency-preserving order, but they are not.

Member Author

If we intend to move the sessions to DS, we cannot rely on the atomicity of updates either. What would be the consistency preserving order here?

Contributor

> we cannot rely on the atomicity of updates either

Why? There are usable transactions both in rocksdb and fdb.

> What would be the consistency preserving order here?

Hard to define simply enough. There might be none actually, given that more pieces of state are now tracked in separate records. Persisting ?dup markers first, then streams in the order of their respective seqno, then ?commited markers could be a good start, though I'm still unsure if holes during replay would be impossible (probably still possible), and when to persist ranks (is it reconstructible from list of ifss?).

    ),
    {ok, [], Session};
handle_timeout(_ClientInfo, ?TIMER_BUMP_LAST_ALIVE_AT, Session0 = #{s := S0}) ->
    S = emqx_persistent_session_ds_state:commit(bump_last_alive(S0)),
Contributor

I'd expect users will want this configurable, especially when QoS2 flows are involved.

Member Author

Not sure I understand the remark. The bump_interval() function inside bump_last_alive() reads the configuration. ?TIMER_BUMP_LAST_ALIVE_AT is the id of the timer event.

Contributor

Sorry, I was thinking about the configurability of the circumstances under which the session state is persisted.

Member Author

I specifically tried to avoid adding new config parameters in this PR, since I find the constant addition and deprecation of config parameters quite annoying.

If there is a real-life need to fine-tune these parameters separately, we can always add a new timer or a new config parameter.

thalesmg previously approved these changes Jan 24, 2024
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------

t_non_persistent_session_subscription(_Config) ->
Member Author

I removed this testcase because it tests the in-memory session, and I'm not sure it's relevant for this testsuite. Also, testcases that verify the absence of certain trace events are unreliable on their own: if a trace event is renamed or removed, the test will always pass.

thalesmg previously approved these changes Jan 26, 2024
@ieQu1 ieQu1 merged commit 9e0bea0 into emqx:master Jan 26, 2024
162 of 164 checks passed