erlangã§fluentdã¯ã©ã¤ã¢ã³ãã®OTPã¢ããªã±ã¼ã·ã§ã³ãå®è£ ãã
ã¯ããã«
ã¢ããªã±ã¼ã·ã§ã³ã«ãã°ããã¼ã¿ãéãããããã®ã«ãã°ãã°å©ç¨ããã Fluentd ã¨ããã½ããã¦ã§ã¢ã®ã¯ã©ã¤ã¢ã³ããErlang/OTPã§å®è£ ãã話ã§ãã
æ¬å®¶ã®erlangã®fluentdã®ã¯ã©ã¤ã¢ã³ã ã®å®è£ ãçºãã¦ãã¦ã gen_eventã§å®è£ ããã¦ãããããéè² è·ãé害æãªã©ãèæ ®ãããå ´åã¯ã²ã¨å·¥å¤«ãå ¥ããå¿ è¦ãããããã¨æãã ä»åæ¬å®¶ã¨ç°ãªãããã¤ãã¢ã§å®è£ ãã¦ã¿ã¾ããã
ä½æ gen_event使ããªãã£ãã®ãï¼
çµè«ããè¨ãã¨ãå¥ã·ã¹ãã ã«ãã¼ã¿ãéä¿¡ãããããªã±ã¼ã¹ã§ã®gen_eventã®å©ç¨ã¯é¿ãããã£ãããã§ãã ä½æ é¿ããã®ãã¨ãã説æãããããã«ãç°¡åã«gen_eventã®è©±ããã¾ãã
gen_eventã¯ä¸ã¤ã®ã¤ãã³ãããã¼ã¸ã£ã¼ã¨0å以ä¸ã®ã¤ãã³ããã³ãã©ã¼ã§æ§æããã ããã¼ã¸ã£ã¼ã«ã¡ãã»ã¼ã¸ãéä¿¡ãã(notify, sync_notifyé¢æ°)ã¨ã ããã¼ã¸ã£ã«ç»é²ãããã³ãã©ã¼ã®å¦çãèµ°ãã¨ããæãã®ããã¤ãã¢ã¼ã§ãã
gen_eventã®ä¾¿å©ãªã¨ããã¯ãæµãããã¤ãã³ãã¯åãã ããã©å¥ç®çã§ããããå¦çããããã¨ããã®ãç°¡åã«å®è£ ã§ãããã¨ãæãããã¾ãã ä¾ãã°ããã°ã®ã³ã³ã½ã¼ã«åºåããã¡ã¤ã«æ¸ãè¾¼ã¿ãã¨ã©ã¼ã®ã¢ã©ã¼ãçºå ±ãä¸ã¤ã®gen_eventã§ã¾ã¨ãããã¨ãã§ãã¾ãã
ãã ãgen_eventãæ±ãå ´åã«æ°ãã¤ããªããã°ãããªãç¹ãããã¾ãã ã¾ãããããã®è¤æ°ãã³ãã©ã¼ã¯ä¸¦åå®è¡ãããªãç¹ã§ãããã®ãããä¸ã¤ã®ãã³ãã©ã¼ã®å¦çã«æéããããã¨ãã³ãã©ã¼å ¨ä½ã«å½±é¿ãåºã¾ãã 次ã«ã1ããã»ã¹ã«ã¡ãã»ã¼ã¸ãéä¸ããã®ã§éè² è·ã«å¼±ãã§ãã äºåã«è¨è¨ããã¤ãã³ãããã¼ã¸ã£ã®æµéãå¤ãã¨åãã£ã¦ããå ´åã ãã®ããã¼ã¸ã£ã¼ã«å¦çãå¤§å¹ ã«é ãããããªãã³ãã©ã¼ãä¸ã¤ã ãã§ã追å ããã¨ããã®ã¯é¿ããã»ããè¯ãã§ãããã
fluentdã¨ã®ããåãããã®ã¾ã¾ãã³ãã©ã¼ã«æ¸ãè¾¼ãã¨ã ã¬ã¤ãã³ã·ã大ãããªã£ãããfluentdãã¦ã³æãªã©ã®å½±é¿ã§ã·ã¹ãã å ¨ä½ãé ããªãå¯è½æ§ãããã¾ãã
ãªã®ã§ãä»åãã®ç¹ã注åããfluentdã®ã¯ã©ã¤ã¢ã³ãã¢ããªã±ã¼ã·ã§ã³ãå¥ããã¤ãã¢ã§å®è£ ãããã¨ã«ãã¾ããã
å®è£
é¸æããbehaviorã¯gen_statemã§ãã gen_serverã§ãè¯ãã£ãã®ã§ããã以ä¸ã®çç±ã§gen_statemã«ãã¾ããã
- gen_statemã®ç·´ç¿
- ç¶æ ãé¢æ°åã¨ãã¦å®ç¾©ã§ããã®ã§ãç¶æ å¤æ°ãå°ãæ¸ã
- åæ¥ç¶ã®å¦çãæ¸ãããã
以ä¸ãæ¦è¦å³ã§ãã
efluentc_supãsimple_one_for_oneã§efluentcã®ã¯ã©ã¤ã¢ã³ãããã»ã¹ãåºå®æ°èµ·åãã¾ãï¼ããã©ã«ãã¯ï¼ï¼ã 以ä¸ãefluentc_supã®start_linkã§ãã
start_link() -> Num = application:get_env(efluentc, worker_size, ?DEFAULT_WORKER_SIZE), Ret = supervisor:start_link({local, ?SERVER}, ?MODULE, []), [supervisor:start_child(?SERVER, [Id]) || Id <- lists:seq(0, Num-1)], Ret.
gen_statemããã¤ãã¢ã§å®è£ ãã¦ããefluentc_clientã®ç¶æ é·ç§»å³ã¯ä»¥ä¸ã®æãã«ãªã£ã¦ãã¾ãã
åæåé¢æ°(init/1)ã¨åç¶æ ã«ã¤ãã¦ãã³ã¼ãã交ãã¤ã¤è§£èª¬ãã¦ããã¾ãã
init([Id]) -> Host = application:get_env(efluentc, host, localhost), Port = application:get_env(efluentc, port, 24224), BuffSize = application:get_env(efluentc, flush_buffer_size, ?DEFAULT_FLUSH_BUFFER_SIZE), Timeout = application:get_env(efluentc, flush_timeout, ?DEFAULT_FLUSH_TIMEOUT), State = #state{id = Id, host = Host, port = Port, flush_buffer_size = BuffSize, flush_timeout = Timeout}, process_flag(trap_exit, true), {ok, closed, State, [{next_event, cast, connect}]}. closed(cast, connect, State) -> case try_connect(State) of {ok, NewState, empty} -> {next_state, empty, NewState}; {ok, NewState, buffered} -> {next_state, bufferd, NewState, flush_action()}; {error, Reason} -> ?ERR_LOG("connect failed: ~p", [Reason]), {next_state, closed, State, reconnect_action(0)} end;
init/1ããclosedã®ç¶æ é·ç§»ããã¨ããinitå ã§ã¯æ¥ç¶ã¾ã§ã¯ããã«fluentdã®æ¥ç¶è¨å®ãç¶æ å¤æ°(State)ã«æããã¦closedã¸é·ç§»ãã¾ãã ããã¯ãinitæã«æ¥ç¶ã¾ã§ãã¦ãã¾ãã¨ãfluentdãæ»ãã§ãã¾ã£ãã¨ãã«ããã»ã¹ã®èµ·åãåºæ¥ãã«æçµçã«ã·ã¹ãã å ¨ä½ããã¦ã³ãã¦ãã¾ãã¿ãããªãã¨ãé¿ããããã§ãã
åæç¶æ ã«ããã«ã¤ãã³ããçºç«ãããããnext_eventã®ã¢ã¯ã·ã§ã³ããªãã·ã§ã³ã«æããã¦è¿ãã¦ãã¾ãã closedã®ç¶æ ã§connectã®ã¤ãã³ããæ¥ãã¨ããtry_connecté¢æ°ã§fluentdã¨ã®æ¥ç¶ã試ã¿ã æåããå ´åã¯emptyç¶æ ãbufferedç¶æ ã«é·ç§»ãã¾ãã emptyãbufferedãã¯ç¶æ å¤æ°ã®bufferã空ãã©ããã§å¤æããã¾ãã 失æããå ´åã¯ãåæ¥ç¶ã®ã¢ã¯ã·ã§ã³ã«é£ã°ããã¾ãã
% @private reconnect_action(N) -> [{{timeout, connect}, backoff(N), {reconnect, N}}].
reconnectã¢ã¯ã·ã§ã³ã¯timeoutã®ã¢ã¯ã·ã§ã³ã使ã£ã¦ãã¾ãã
backoff(N) ããªç§å¾ã«ã¿ã¤ã ã¢ã¦ããçºç«ããã¦ãclose({timeout, connect}, Req, State)
ã¸callbackããããã¨ãæå¾
ãã¦ãã¾ãã
closed({timeout, connect}, {reconnect, N}, State) -> case try_connect(State) of {ok, NewState, empty} -> {next_state, empty, NewState}; {ok, NewState, buffered} -> {next_state, buffered, NewState, flush_action()}; {error, Reason} -> ?ERR_LOG("connect failed: ~p", [Reason]), {next_state, closed, State, reconnect_action(N+1)} end;
callbackå ã§å度æ¥ç¶ã試ã¿ã¦ãæåãããemptyãbufferedã失æãããreconnect_actionã§ã¿ã¤ã ã¢ã¦ããå¾ ã¡ã¾ãã ãã®ä¸é£ã®æ¥ç¶å¦çã®æµãã¯æ¥ç¶ãæåããã¾ã§æ°¸é ã¨ç¹°ãè¿ããã¾ãã
ã¾ããclosedã®ç¶æ ã§ãpostããããã¿ã¼ã³ãããããããã®ã±ã¢ããã¾ãã ä¸å®ãã¤ãæ°ã¯ç¶æ å¤æ°ã®ãããã¡ã¼ã«ãããã¦ãæ¥ç¶ãéå§ãããã¨åæã«é 次éä¿¡ããããã«ãã¦ã¾ãã
closed(cast, {post, Tag, Data}, #state{buffer = Buffs, buffer_caches = Caches, flush_buffer_size = FBS} = State) when FBS < byte_size(Buffs) -> Msg = encode_msg(Tag, Data), NewCaches = append_buffer_caches(Buffs, Caches), {next_state, closed, State#state{buffer = Msg, buffer_caches = NewCaches}}; closed(cast, {post, Tag, Data}, #state{buffer = Buffs} = State) -> Msg = encode_msg(Tag, Data), NewBuffs = <<Buffs/binary, Msg/binary>>, {next_state, closed, State#state{buffer = NewBuffs}}.
é¾å¤ã¯ï¼ã¤ããã¾ãã ä¸åº¦ã«éä¿¡ããã¡ãã»ã¼ã¸ã®ãã¤ãæ°(flush_buffer_size)ã¨ããã®å¡ã®æ°(buffer_cachesã®é·ã)ã§ãã ã¡ãã»ã¼ã¸ã®ãµã¤ãºãflush_buffer_sizeãè¶ ããå ´åãbuffer_cachesã®ãªã¹ãã«è¿½å ããã ãã®ã¨ãã«buffer_cachesã®é·ããé¾å¤ãè¶ ãã¦ãããããã®ã¡ãã»ã¼ã¸ã¯ãã¹ããã¾ãã
次ã«ãemptyã¨bufferedç¶æ ã®ã¨ãã®å¦çã§ãã
empty(cast, {post, Tag, Data}, State = #state{buffer = Buffs, flush_timeout = Timeout}) -> Msg = encode_msg(Tag, Data), NewBuffs = <<Buffs/binary, Msg/binary>>, {next_state, buffered, State#state{buffer = NewBuffs}, flush_timeout_action(Timeout)};
emptyç¶æ ã¯bufferã«ãã¼ã¿ãããã®ã¨ãã®ç¶æ ã®ãããpostããã¦bufferã«ãã¼ã¿ãè©°ãããããbufferedç¶æ ã«ç§»è¡ãã¾ãã bufferã«è©°ãããããã¼ã¿ã¯flush_buffered_sizeãè¶ ããã¨ãããflush_timeoutå¾ã«fluentdã¸éä¿¡ããã¾ãã fluentdã¸ã®éä¿¡ã¯ãbufferedç¶æ ããã®ã¿å®è¡ããã¾ãã
buffered(_Event, flush, State) -> case buffered_send(State) of {{error, timeout}, NextState, NewState} -> ?ERR_LOG("send timeout", []), {next_state, NextState, NewState}; {{error, Reason}, NextState, NewState} -> ?ERR_LOG("send failed: ~p", [Reason]), {next_state, NextState, NewState, reconnect_action(0)}; {ok, NextState, NewState} -> {next_state, NextState, NewState} end; buffered(cast, {post, Tag, Data}, State = #state{flush_timeout = Timeout}) -> Msg = encode_msg(Tag, Data), case append_buffer(Msg, State) of {buffer_overflow, NewState} -> {next_state, buffered, NewState, flush_action()}; {ok, NewState} -> {next_state, buffered, NewState, flush_timeout_action(Timeout)} end; % @private flush_action() -> [{next_event, cast, flush}]. % @private flush_timeout_action(Timeout) -> [{state_timeout, Timeout, flush}].
flushã®ãªã¯ã¨ã¹ããæ¥ãã¨ãã¯fluentdã¸ãã¼ã¿ãéä¿¡ãpostãªã¯ã¨ã¹ããæ¥ãå ´åã¯bufferã¸ãã¼ã¿ãè©°ãã¦ããã¾ãã flush_buffer_sizeãè¶ ããå ´åããããã¯flush_timeoutå¾ã«ãã¼ã¿ãflushãã¾ãã
以ä¸ãgithubã®ãªãã¸ããªã§ãã GitHub - tkyshm/efluentc: efluentc is the Client OTP application for Fluentd
使ãæ¹
使ãæ¹ã¯ãefluentc:post(Tag, Message)
ã¨ããã ãã§ãã
> efluentc:post(<<"test.tag">>, <<"message">>). ok > efluentc:post('test.tag', <<"message">>). ok > efluentc:post('test.tag', "message"). ok > efluentc:post('test.tag', #{<<"key">> => <<"value">>}). ok
ã¾ããclientæ°ãflush_buffer_sizeãflush_timeoutãªã©ã®é¾å¤ãå¤æ´ããå ´åã¯ã次ã®ããã«configãæ¸ãã¾ãã
[ {efluentc, [{worker_size, 10}, % 10 workers {fluesh_timeout, 500}, % 500msec {buffer_size, 1048576}, % 1MiB {host, localhost}, {port, 24224}]} ]
ææ³
gen_statemããã¤ãã¢ã®ç·´ç¿ã¨ãã¦ã å¤é¨ã¨éä¿¡ãããããªã¤ã³ã¿ã¼ãã§ã¼ã¹ãä½ãã¯çµæ§è¯ãé¡æã ã£ãããããªãããªã¨æãã¾ããã
ä»åä½ã£ãefluentcã®ã¢ã¤ãã£ã¢ãè¯ããã©ããã¯ç½®ãã¦ããã¦ã gen_statemã¯æ±ç¨æ§é«ãã¦è²ã ã¨å¿ç¨ãå¹ãããã§è¯ãæãã ãªã¨æãã¾ããã
ãã¥ã¼ãå ¥ãã¦ãéä¿¡ã§ããªãå ´åã®ä¸æéé¿ããé åãä½ã£ããã©ã æãè¿ãã¨ãã è¤éã«ããã ãããç¥ããªãã¨ããæãããã£ããã éä¿¡ã§ããªãå ´åã«ã¢ã©ã¼ããä¸ããããããã«ãã¦ãåé¡ãããã«çºè¦ã§ãããããªä½ãã®ã»ããè¯ãã£ãããç¥ããªãã§ãã
ãã¨ãgen_eventã«çµã¿è¾¼ã¿ãããªã£ãã¨ãã¦ãã åã handlerãå®è£ ãã¦ããã£ã¦ããã®ä¸ã§efluentc:postãã©ãããã¦ãã¾ãã°è¯ãããã ã¨æã...ã