Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrate kafka consumer bridge to source + connector #12595

Merged
merged 4 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/emqx_bridge/src/emqx_action_info.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_gcp_pubsub_consumer_action_info,
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info,
emqx_bridge_kafka_consumer_action_info,
emqx_bridge_kinesis_action_info,
emqx_bridge_hstreamdb_action_info,
emqx_bridge_matrix_action_info,
Expand Down
24 changes: 17 additions & 7 deletions apps/emqx_bridge/src/emqx_bridge_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -765,19 +765,26 @@ create_dry_run(ConfRootKey, Type, Conf0) ->
{error, Reason1}
end.

create_dry_run_helper(ConfRootKey, BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
create_dry_run_helper(ConfRootKey, BridgeV2Type, ConnectorRawConf, BridgeV2RawConf) ->
BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
ConnectorType = connector_type(BridgeType),
ConnectorType = connector_type(BridgeV2Type),
OnReadyCallback =
fun(ConnectorId) ->
{_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
ChannelTestId = id(BridgeType, BridgeName, ConnectorName),
Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf),
ChannelTestId = id(BridgeV2Type, BridgeName, ConnectorName),
BridgeV2Conf0 = fill_defaults(
BridgeV2Type,
BridgeV2RawConf,
bin(ConfRootKey),
emqx_bridge_v2_schema,
#{make_serializable => false}
),
BridgeV2Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2Conf0),
AugmentedConf = augment_channel_config(
ConfRootKey,
BridgeType,
BridgeV2Type,
BridgeName,
Conf
BridgeV2Conf
),
case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, AugmentedConf) of
{error, Reason} ->
Expand Down Expand Up @@ -1204,8 +1211,11 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], E
perform_bridge_changes(Tasks, Errors).

fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) ->
fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, _Opts = #{}).

fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, Opts) ->
PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf),
FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, #{}),
FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, Opts),
unpack_bridge_conf(Type, FullConf, TopLevelConf).

pack_bridge_conf(Type, RawConf, TopLevelConf) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge/src/emqx_bridge_v2_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ handle_update(ConfRootKey, Id, Conf0) ->
Id,
case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of
{ok, _} ->
RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙈

RawConf = emqx:get_raw_config([ConfRootKey, BridgeType, BridgeName], #{}),
Conf = emqx_utils:deobfuscate(Conf1, RawConf),
update_bridge(ConfRootKey, BridgeType, BridgeName, Conf);
{error, not_found} ->
Expand Down
62 changes: 41 additions & 21 deletions apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ end_per_testcase(_Testcase, Config) ->
%% in CI, apparently this needs more time since the
%% machines struggle with all the containers running...
emqx_common_test_helpers:call_janitor(60_000),
delete_all_bridges_and_connectors(),
ok = snabbkaffe:stop(),
ok
end.
Expand Down Expand Up @@ -132,15 +133,27 @@ parse_and_check(Kind, Type, Name, InnerConfigMap0) ->
TypeBin = emqx_utils_conv:bin(Type),
RawConf = #{RootBin => #{TypeBin => #{Name => InnerConfigMap0}}},
#{RootBin := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}
emqx_bridge_v2_schema,
RawConf,
#{
required => false,
atom_key => false,
make_serializable => true
}
),
InnerConfigMap.

parse_and_check_connector(Type, Name, InnerConfigMap0) ->
TypeBin = emqx_utils_conv:bin(Type),
RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap0}}},
#{<<"connectors">> := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
emqx_connector_schema, RawConf, #{required => false, atom_key => false}
emqx_connector_schema,
RawConf,
#{
required => false,
atom_key => false,
make_serializable => true
}
),
InnerConfigMap.

Expand Down Expand Up @@ -282,20 +295,23 @@ list_bridges_api() ->
ct:pal("list bridges result: ~p", [Res]),
Res.

get_source_api(BridgeType, BridgeName) ->
get_bridge_api(source, BridgeType, BridgeName).

get_bridge_api(BridgeType, BridgeName) ->
get_bridge_api(action, BridgeType, BridgeName).

get_bridge_api(BridgeKind, BridgeType, BridgeName) ->
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
Params = [],
Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("get bridge ~p (via http)", [{BridgeType, BridgeName}]),
Res =
case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} ->
{ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
Error ->
Error
Root =
case BridgeKind of
source -> "sources";
action -> "actions"
end,
Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]),
ct:pal("get bridge ~p (via http)", [{BridgeKind, BridgeType, BridgeName}]),
Res = request(get, Path, Params),
ct:pal("get bridge ~p result: ~p", [{BridgeType, BridgeName}, Res]),
Res.

Expand Down Expand Up @@ -672,7 +688,8 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
end,
ok.

%% - `ProduceFn': produces a message in the remote system that shall be consumed.
%% - `ProduceFn': produces a message in the remote system that shall be consumed. May be
%% a `{function(), integer()}' tuple.
%% - `Tracepoint': marks the end of consumed message processing.
t_consume(Config, Opts) ->
#{
Expand All @@ -683,14 +700,17 @@ t_consume(Config, Opts) ->
} = Opts,
?check_trace(
begin
?assertMatch(
{{ok, _}, {ok, _}},
snabbkaffe:wait_async_action(
fun() -> create_bridge_api(Config) end,
ConsumerReadyTPFn,
15_000
)
),
ConsumerReadyTimeout = maps:get(consumer_ready_timeout, Opts, 15_000),
case ConsumerReadyTPFn of
{Predicate, NEvents} when is_function(Predicate) ->
{ok, SRef0} = snabbkaffe:subscribe(Predicate, NEvents, ConsumerReadyTimeout);
Predicate when is_function(Predicate) ->
{ok, SRef0} = snabbkaffe:subscribe(
Predicate, _NEvents = 1, ConsumerReadyTimeout
)
end,
?assertMatch({ok, _}, create_bridge_api(Config)),
?assertMatch({ok, _}, snabbkaffe:receive_events(SRef0)),
ok = add_source_hookpoint(Config),
?retry(
_Sleep = 200,
Expand Down
7 changes: 6 additions & 1 deletion apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
brod,
brod_gssapi
]},
{env, [{emqx_action_info_modules, [emqx_bridge_kafka_action_info]}]},
{env, [
{emqx_action_info_modules, [
emqx_bridge_kafka_action_info,
emqx_bridge_kafka_consumer_action_info
]}
]},
{modules, []},

{links, []}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

-module(emqx_bridge_kafka_consumer_action_info).

-behaviour(emqx_action_info).

-export([
is_source/0,
is_action/0,
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
connector_action_config_to_bridge_v1_config/2,
bridge_v1_config_to_action_config/2
]).

is_source() -> true.

is_action() -> false.

bridge_v1_type_name() -> kafka_consumer.

action_type_name() -> kafka_consumer.

connector_type_name() -> kafka_consumer.

schema_module() -> emqx_bridge_kafka_consumer_schema.

connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
V1Config1 = maps:remove(<<"connector">>, ActionConfig),
V1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, V1Config1),
V1Config3 = maybe_fabricate_topic_mapping(V1Config2),
{Params1, V1Config4} = maps:take(<<"parameters">>, V1Config3),
TopLevelCfgKeys = [to_bin(K) || {K, _} <- emqx_bridge_kafka:fields(consumer_opts), K =/= kafka],
TopLevelCfg = maps:with(TopLevelCfgKeys, Params1),
%% `topic' is v2-only
Params = maps:without([<<"topic">> | TopLevelCfgKeys], Params1),
V1Config5 = emqx_utils_maps:deep_merge(V1Config4, TopLevelCfg),
V1Config = emqx_utils_maps:update_if_present(
<<"resource_opts">>,
%% Slightly different from default source resource opts...
fun(RO) -> maps:with(v1_fields(connector_resource_opts), RO) end,
V1Config5
),
maps:put(<<"kafka">>, Params, V1Config).

bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, schema_module(), source_parameters
),
TopicMapping = maps:get(<<"topic_mapping">>, BridgeV1Conf, []),
Params0 = maps:get(<<"kafka">>, BridgeV1Conf, #{}),
Params1 = maps:with(source_parameters_field_keys(), Params0),
Params2 = emqx_utils_maps:put_if(
Params1, <<"topic_mapping">>, TopicMapping, TopicMapping =/= []
),
Params = maybe_set_kafka_topic(Params2),
{source, action_type_name(), maps:put(<<"parameters">>, Params, Config0)}.

%%------------------------------------------------------------------------------------------
%% Internal helper functions
%%------------------------------------------------------------------------------------------

%% The new schema has a single kafka topic, so we take it from topic mapping when
%% converting from v1.
maybe_set_kafka_topic(#{<<"topic_mapping">> := [#{<<"kafka_topic">> := Topic} | _]} = Params) ->
Params#{<<"topic">> => Topic};
maybe_set_kafka_topic(Params) ->
Params.

%% The old schema requires `topic_mapping', which is now hidden.
maybe_fabricate_topic_mapping(#{<<"parameters">> := Params0} = BridgeV1Config0) ->
#{<<"topic">> := Topic} = Params0,
case maps:get(<<"topic_mapping">>, Params0, undefined) of
[_ | _] ->
BridgeV1Config0;
_ ->
%% Have to fabricate an MQTT topic, unfortunately... QoS and payload already
%% have defaults.
FakeTopicMapping = #{
<<"kafka_topic">> => Topic,
<<"mqtt_topic">> => <<>>
},
Params = Params0#{<<"topic_mapping">> => [FakeTopicMapping]},
BridgeV1Config0#{<<"parameters">> := Params}
end.

v1_fields(StructName) ->
[
to_bin(K)
|| {K, _} <- emqx_bridge_kafka:fields(StructName)
].

source_parameters_field_keys() ->
[
to_bin(K)
|| {K, _} <- emqx_bridge_kafka_consumer_schema:fields(source_parameters)
].

to_bin(B) when is_binary(B) -> B;
to_bin(L) when is_list(L) -> list_to_binary(L);
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
Loading
Loading