-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use event-based mechanism for stream discovery #14130
Conversation
3a468d2
to
91685aa
Compare
1194284
to
bcbeb9f
Compare
bcbeb9f
to
8a7d56b
Compare
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl
Outdated
Show resolved
Hide resolved
apps/emqx/src/emqx_persistent_session_ds/emqx_persistent_session_ds_subs.erl
Outdated
Show resolved
Hide resolved
%% all nodes. | ||
-spec notify_new_stream(emqx_ds:db(), emqx_ds:topic_filter()) -> ok. | ||
notify_new_stream(DB, TF) -> | ||
emqx_ds_new_streams_proto_v1:notify([node() | nodes()], DB, TF). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need running_nodes here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In theory, yes. But I don't like the kludge of using mria:running_nodes()
for unintended purpose.
It's mostly used as a workaround to avoid sending rpc calls to nodes that can't handle them (e.g. rshell), it's less relevant for casts, since reply is not expected.
%% "active" flag. Whenever it receives a notification about a new | ||
%% stream, it matches all active subscriptions' topic-filters against | ||
%% the topic-filter of the event, and sets `active' flags for every | ||
%% match. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably some explantion of "dirty" therminology is needed.
|
||
register_db(DB, Backend) -> | ||
ets:insert(?TAB, {DB, Backend}), | ||
_ = supervisor:start_child(?WATCH_SUP, [DB]), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this supervisor stopped on db close/drop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's never stopped by design. Stopping it would lose subscriptions and all events. I'll add a comment.
|
||
%% @doc This module is used for notifying processes about new streams. | ||
%% It's not a replacement for `emqx_ds:get_streams' function, it's | ||
%% only meant to optimize its usage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The relation of the API that this module provides to the DS API may be still unclear: is it available for all DS'es? Which DS'es should implement it? How is it possible for a DS consumer understand whether it is possible to use it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now the idea is that any DS backend/layout can use this module at its own discretion. I didn't have time to design a fancy layered API.
Fixes
Release version: v/e5.?
Summary
PR Checklist
Please convert it to a draft if any of the following conditions are not met. Reviewers may skip over until all the items are checked:
changes/(ce|ee)/(feat|perf|fix|breaking)-<PR-id>.en.md
filesChecklist for CI (.github/workflows) changes
changes/
dir for user-facing artifacts update