Skip to content

Commit

Permalink
feat: migrate kafka consumer bridge to source + connector
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Feb 26, 2024
1 parent 5326cc8 commit 37a43f0
Show file tree
Hide file tree
Showing 15 changed files with 1,087 additions and 208 deletions.
1 change: 1 addition & 0 deletions apps/emqx_bridge/src/emqx_action_info.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_gcp_pubsub_consumer_action_info,
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_action_info,
emqx_bridge_kafka_consumer_action_info,
emqx_bridge_kinesis_action_info,
emqx_bridge_hstreamdb_action_info,
emqx_bridge_matrix_action_info,
Expand Down
27 changes: 19 additions & 8 deletions apps/emqx_bridge/src/emqx_bridge_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -765,19 +765,26 @@ create_dry_run(ConfRootKey, Type, Conf0) ->
{error, Reason1}
end.

create_dry_run_helper(ConfRootKey, BridgeType, ConnectorRawConf, BridgeV2RawConf) ->
create_dry_run_helper(ConfRootKey, BridgeV2Type, ConnectorRawConf, BridgeV2RawConf) ->
BridgeName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
ConnectorType = connector_type(BridgeType),
ConnectorType = connector_type(BridgeV2Type),
OnReadyCallback =
fun(ConnectorId) ->
{_, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId),
ChannelTestId = id(BridgeType, BridgeName, ConnectorName),
Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2RawConf),
ChannelTestId = id(BridgeV2Type, BridgeName, ConnectorName),
BridgeV2Conf0 = fill_defaults(
BridgeV2Type,
BridgeV2RawConf,
bin(ConfRootKey),
emqx_bridge_v2_schema,
#{make_serializable => false}
),
BridgeV2Conf = emqx_utils_maps:unsafe_atom_key_map(BridgeV2Conf0),
AugmentedConf = augment_channel_config(
ConfRootKey,
BridgeType,
BridgeV2Type,
BridgeName,
Conf
BridgeV2Conf
),
case emqx_resource_manager:add_channel(ConnectorId, ChannelTestId, AugmentedConf) of
{error, Reason} ->
Expand Down Expand Up @@ -1204,8 +1211,11 @@ perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], E
perform_bridge_changes(Tasks, Errors).

fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) ->
fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, _Opts = #{}).

fill_defaults(Type, RawConf, TopLevelConf, SchemaModule, Opts) ->
PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf),
FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, #{}),
FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, Opts),
unpack_bridge_conf(Type, FullConf, TopLevelConf).

pack_bridge_conf(Type, RawConf, TopLevelConf) ->
Expand Down Expand Up @@ -1608,9 +1618,10 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousR
}
}
},
SchemaMod = emqx_schema,
try
hocon_tconf:check_plain(
emqx_schema,
SchemaMod,
NewFakeGlobalConfig,
#{atom_key => false, required => false}
)
Expand Down
2 changes: 1 addition & 1 deletion apps/emqx_bridge/src/emqx_bridge_v2_api.erl
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ handle_update(ConfRootKey, Id, Conf0) ->
Id,
case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of
{ok, _} ->
RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}),
RawConf = emqx:get_raw_config([ConfRootKey, BridgeType, BridgeName], #{}),
Conf = emqx_utils:deobfuscate(Conf1, RawConf),
update_bridge(ConfRootKey, BridgeType, BridgeName, Conf);
{error, not_found} ->
Expand Down
62 changes: 41 additions & 21 deletions apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ end_per_testcase(_Testcase, Config) ->
%% in CI, apparently this needs more time since the
%% machines struggle with all the containers running...
emqx_common_test_helpers:call_janitor(60_000),
delete_all_bridges_and_connectors(),
ok = snabbkaffe:stop(),
ok
end.
Expand Down Expand Up @@ -132,15 +133,27 @@ parse_and_check(Kind, Type, Name, InnerConfigMap0) ->
TypeBin = emqx_utils_conv:bin(Type),
RawConf = #{RootBin => #{TypeBin => #{Name => InnerConfigMap0}}},
#{RootBin := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}
emqx_bridge_v2_schema,
RawConf,
#{
required => false,
atom_key => false,
make_serializable => true
}
),
InnerConfigMap.

parse_and_check_connector(Type, Name, InnerConfigMap0) ->
TypeBin = emqx_utils_conv:bin(Type),
RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap0}}},
#{<<"connectors">> := #{TypeBin := #{Name := InnerConfigMap}}} = hocon_tconf:check_plain(
emqx_connector_schema, RawConf, #{required => false, atom_key => false}
emqx_connector_schema,
RawConf,
#{
required => false,
atom_key => false,
make_serializable => true
}
),
InnerConfigMap.

Expand Down Expand Up @@ -282,20 +295,23 @@ list_bridges_api() ->
ct:pal("list bridges result: ~p", [Res]),
Res.

get_source_api(BridgeType, BridgeName) ->
get_bridge_api(source, BridgeType, BridgeName).

get_bridge_api(BridgeType, BridgeName) ->
get_bridge_api(action, BridgeType, BridgeName).

get_bridge_api(BridgeKind, BridgeType, BridgeName) ->
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
Params = [],
Path = emqx_mgmt_api_test_util:api_path(["actions", BridgeId]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("get bridge ~p (via http)", [{BridgeType, BridgeName}]),
Res =
case emqx_mgmt_api_test_util:request_api(get, Path, "", AuthHeader, Params, Opts) of
{ok, {Status, Headers, Body0}} ->
{ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
Error ->
Error
Root =
case BridgeKind of
source -> "sources";
action -> "actions"
end,
Path = emqx_mgmt_api_test_util:api_path([Root, BridgeId]),
ct:pal("get bridge ~p (via http)", [{BridgeKind, BridgeType, BridgeName}]),
Res = request(get, Path, Params),
ct:pal("get bridge ~p result: ~p", [{BridgeType, BridgeName}, Res]),
Res.

Expand Down Expand Up @@ -672,7 +688,8 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
end,
ok.

%% - `ProduceFn': produces a message in the remote system that shall be consumed.
%% - `ProduceFn': produces a message in the remote system that shall be consumed. May be
%% a `{function(), integer()}' tuple.
%% - `Tracepoint': marks the end of consumed message processing.
t_consume(Config, Opts) ->
#{
Expand All @@ -683,14 +700,17 @@ t_consume(Config, Opts) ->
} = Opts,
?check_trace(
begin
?assertMatch(
{{ok, _}, {ok, _}},
snabbkaffe:wait_async_action(
fun() -> create_bridge_api(Config) end,
ConsumerReadyTPFn,
15_000
)
),
ConsumerReadyTimeout = maps:get(consumer_ready_timeout, Opts, 15_000),
case ConsumerReadyTPFn of
{Predicate, NEvents} when is_function(Predicate) ->
{ok, SRef0} = snabbkaffe:subscribe(Predicate, NEvents, ConsumerReadyTimeout);
Predicate when is_function(Predicate) ->
{ok, SRef0} = snabbkaffe:subscribe(
Predicate, _NEvents = 1, ConsumerReadyTimeout
)
end,
?assertMatch({ok, _}, create_bridge_api(Config)),
?assertMatch({ok, _}, snabbkaffe:receive_events(SRef0)),
ok = add_source_hookpoint(Config),
?retry(
_Sleep = 200,
Expand Down
7 changes: 6 additions & 1 deletion apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
brod,
brod_gssapi
]},
{env, [{emqx_action_info_modules, [emqx_bridge_kafka_action_info]}]},
{env, [
{emqx_action_info_modules, [
emqx_bridge_kafka_action_info,
emqx_bridge_kafka_consumer_action_info
]}
]},
{modules, []},

{links, []}
Expand Down
105 changes: 105 additions & 0 deletions apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_action_info.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

-module(emqx_bridge_kafka_consumer_action_info).

-behaviour(emqx_action_info).

-export([
is_source/0,
is_action/0,
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0,
connector_action_config_to_bridge_v1_config/2,
bridge_v1_config_to_action_config/2
]).

is_source() -> true.

is_action() -> false.

bridge_v1_type_name() -> kafka_consumer.

action_type_name() -> kafka_consumer.

connector_type_name() -> kafka_consumer.

schema_module() -> emqx_bridge_kafka_consumer_schema.

connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
V1Config1 = maps:remove(<<"connector">>, ActionConfig),
V1Config2 = emqx_utils_maps:deep_merge(ConnectorConfig, V1Config1),
V1Config3 = maybe_fabricate_topic_mapping(V1Config2),
{Params1, V1Config4} = maps:take(<<"parameters">>, V1Config3),
TopLevelCfgKeys = [to_bin(K) || {K, _} <- emqx_bridge_kafka:fields(consumer_opts), K =/= kafka],
TopLevelCfg = maps:with(TopLevelCfgKeys, Params1),
%% `topic' is v2-only
Params = maps:without([<<"topic">> | TopLevelCfgKeys], Params1),
V1Config5 = emqx_utils_maps:deep_merge(V1Config4, TopLevelCfg),
V1Config = emqx_utils_maps:update_if_present(
<<"resource_opts">>,
%% Slightly different from default source resource opts...
fun(RO) -> maps:with(v1_fields(connector_resource_opts), RO) end,
V1Config5
),
maps:put(<<"kafka">>, Params, V1Config).

bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, schema_module(), source_parameters
),
TopicMapping = maps:get(<<"topic_mapping">>, BridgeV1Conf, []),
Params0 = maps:get(<<"kafka">>, BridgeV1Conf, #{}),
Params1 = maps:with(source_parameters_field_keys(), Params0),
Params2 = emqx_utils_maps:put_if(
Params1, <<"topic_mapping">>, TopicMapping, TopicMapping =/= []
),
Params = maybe_set_kafka_topic(Params2),
{source, action_type_name(), maps:put(<<"parameters">>, Params, Config0)}.

%%------------------------------------------------------------------------------------------
%% Internal helper functions
%%------------------------------------------------------------------------------------------

%% The new schema has a single kafka topic, so we take it from topic mapping when
%% converting from v1.
maybe_set_kafka_topic(#{<<"topic_mapping">> := [#{<<"kafka_topic">> := Topic} | _]} = Params) ->
Params#{<<"topic">> => Topic};
maybe_set_kafka_topic(Params) ->
Params.

%% The old schema requires `topic_mapping', which is now hidden.
maybe_fabricate_topic_mapping(#{<<"parameters">> := Params0} = BridgeV1Config0) ->
#{<<"topic">> := Topic} = Params0,
case maps:get(<<"topic_mapping">>, Params0, undefined) of
[_ | _] ->
BridgeV1Config0;
_ ->
%% Have to fabricate an MQTT topic, unfortunately... QoS and payload already
%% have defaults.
FakeTopicMapping = #{
<<"kafka_topic">> => Topic,
<<"mqtt_topic">> => <<>>
},
Params = Params0#{<<"topic_mapping">> => [FakeTopicMapping]},
BridgeV1Config0#{<<"parameters">> := Params}
end.

v1_fields(StructName) ->
[
to_bin(K)
|| {K, _} <- emqx_bridge_kafka:fields(StructName)
].

source_parameters_field_keys() ->
[
to_bin(K)
|| {K, _} <- emqx_bridge_kafka_consumer_schema:fields(source_parameters)
].

to_bin(B) when is_binary(B) -> B;
to_bin(L) when is_list(L) -> list_to_binary(L);
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
Loading

0 comments on commit 37a43f0

Please sign in to comment.