ComplexEventProcessingエンジン Setsuna をリリースしました!!

やっとリリースできました!

Sourceforge.jpダウンロードページ


名前はSetsuna(セツナ)としました。
時間の単位を表す言葉である刹那からきています。
刹那の短さでデータを処理出来ればいいなということでこの名前にしました。




このSetsunaですが、CEPといわれるカテゴリに属するものです。
ではCEPとは??なんですが、これは語源そのままです。


Complex Event Processing


複数のイベントつまりデータを処理するエンジンです。
イメージとしてはデータの流れに直接処理を行えるイメージです。




あまり文章ばかりでもつまらないので、実際Setsunaを使いながら説明していきたいと思います。
では題材はこんなのにしたいと思います。
「Apacheが重い!! ひょっとして攻撃されてる?!」


すいません、わけわからんタイトルですね。。
やりたいことは、Webサーバの負荷が一定値を超えた場合に、Apacheのアクセスログの直近1万件以内からの
IPアドレス別でアクセス回数を合計しその合計値が、規定回数を超えていた場合に、アラート内容をSystemログに
出力するというものです。


ではこの処理をどのようにするか順を追って見ていきたいと思います。
※環境はCentOS5.5(64bit)前提として進めます。


Step1.サーバの負荷を監視する
Step1:まずはサーバの負荷を監視しないとダメです。
ここではtopコマンドのロードアベレージを負荷の指標に利用したいと思います。


$top -d 1
結果は以下のような感じです。

top - 16:47:19 up 8 min, 2 users, load average: 0.06, 0.11, 0.08
Tasks: 78 total, 1 running, 77 sleeping, 0 stopped, 0 zombie
Cpu(s): 2.5%us, 3.0%sy, 0.0%ni, 93.9%id, 0.2%wa, 0.1%hi, 0.3%si, 0.0%st
Mem: 793284k total, 180116k used, 613168k free, 12272k buffers
Swap: 1048568k total, 0k used, 1048568k free, 130704k cached

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
1 root 15 0 2072 636 544 S 0.0 0.1 0:00.72 init
2 root RT -5 0 0 0 S 0.0 0.0 0:00.00 migration/0
               ・
               ・
               ・

では早速これをSetsunaに流し込みたいと思います。
Setsunaは標準でパイプで繋ぐことでデータを流し込むことが可能です。
Setsunaをダウンダウンロードしてもらって適当な場所に解凍してもらうと
中にsetsuna.jarというファイルがあると思います。それが本体です。


実行可能jarなので、javaが動く環境であればこのファイル以外は何も必要ありません。

$top -b -d 1 | java -jar setsuna.jar
上記を実行すると以下のようなエラーが出ると思います。

setsuna.core.util.SetsunaException: The read data differs from definition information. Adapter name is PIPE
at setsuna.core.adapter.SetsunaCoreAdapterEngine.doEvent(SetsunaCoreAdapterEngine.java:67)
at setsuna.core.AbstractCoreEngine.run(AbstractCoreEngine.java:54)
これは、Setsunaの自動テーブルマッピングの機能がこけています。つまりSetsunaまでは
topの結果は届いています。ではどういったエラーか??
 それは、1行目のデータと2行目のデータの形式が違いますよということを言っています。
Setsunaはパイプライン入力で渡されたデータを内蔵するH2Databseに自動的に格納します。
そのため、格納するためにデータをレコードとして認識しかつ、カラムに分解しようとします。
Setsunaはデフォルトではデータ(レコード)の区切りを改行とし、カラムレベルへの分解に
半角スペースを利用します。(2つ以上続くスペースは自動的に1つにトリムされます)
topの結果をもう一度みてみると、


1: top - 16:47:19 up 8 min, 2 users, load average: 0.06, 0.11, 0.08
2: Tasks: 78 total, 1 running, 77 sleeping, 0 stopped, 0 zombie
行番号1と2では全くフォーマットが異なります。フォーマットとはスペースで分解した際の分解できた数です。
そこでロードアベレージを含む1行目だけをSetsunaに送り込んでみましょう。これにはgrepを利用します。
そしてこのデータの流れに名前を付けたいと思います。-streamオプションを利用すると名前が付けれます。

$top -b -d 1 | grep --line-buffered "top -" | java -jar setsuna.jar -stream top


Exceptionが出なくなり変わりに以下のような結果になっていないでしょうか?

{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:01:58","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}
{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:01:59","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}
{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:02:00","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}
{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:02:01","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}

これはSetsunaが取り込んだデータをJSONフォーマットで
カラム情報とセットで出力しています。
Setsunaはパイプラインでデータを与えるだけの場合このような動きになります。
ではそれぞれ見ていきましょう。


topのロードアベレージの行

top - 16:47:19 up 8 min, 2 users, load average: 0.06, 0.11, 0.08
Setsunaの結果

{
"COLUMN0":"top",
"COLUMN1":"-",
"COLUMN2":"17:01:58",
"COLUMN3":"up",
"COLUMN4":"23",
"COLUMN5":"min,",
"COLUMN6":"2",
"COLUMN7":"users,",
"COLUMN8":"load",
"COLUMN9":"average:",
"COLUMN10":"0.20,",
"COLUMN11":"0.12,",
"COLUMN12":"0.09"
}
マッピング出来ていますね。
目当てのロードアベレージの値は"COLUMN10"の値です。
これでSetsunaへのデータの流し込みは完了です。
この流し込む処理の部分はAdapterと呼んでいます。




Step2
では次に、もう一つのデータであるApacheのアクセスログを流し込みたいと思います。
このデータはアクセスIPの抽出に利用したいと思います。
先ほどと同じ手順でSetsunaにデータを流し込みます。
このデータの流れにはlogという名前を付けます。

$tail -f /etc/httpd/logs/access_log | java -jar setsuna.jar -sep " - -" -stream log
今回は先ほどと違い、-sepという項目がついています。
これは、レコードデータをカラムに分解する際のセパレータを指定しています。
僕の環境のアクセスログは以下のようなフォーマットでした。

192.168.1.100 - - [15/Mar/2012:17:29:52 +0900] "GET /index.php HTTP/1.1" 200 130 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.79 Safari/535.11"
ここからIPだけを取り出したいので、データを分解するセパレータに" - -"という文字列を
利用しました。これで先頭のIP部分の直後で分解できます。
そして実際にSetsunaに流しこんだ結果が下です。


{"COLUMN0":"192.168.1.100","COLUMN1":" [15/Mar/2012:17:46:45 +0900] "GET /index.php HTTP/1.1" 200 130 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.11 (KHTML, like Gecko) Chrome/17.0.963.79 Safari/535.11""}

"COLUMN0"にIP情報がマッピングされているのがわかります。




Step3
ここまではデータをSetsunaに流し込むだけでした、ここからは実際にデータを処理して負荷を検知してみます。
負荷の検知には先ほどのtopのデータの流れのロードアベレージの値を使いたいと思います。
そして、負荷を判定するための条件を設定したいと思います。それには-triggerオプションを利用します。
そして、より複雑な条件であるアクセス頻度の高いIPを割り出すために、-queryオプションを使い、
SQLにて、アクセスログのデータ内から基底回数を超えるアクセスを行っているIPがあった場合に、
Systemログにアラーを書きだそうと思います。


まず、先ほどのデータをクリアするために、2つのSetsunaプロセスを止めてください。Crtl+Cなどで強制終了でOKです。
そしてまずアクセスログを流し込みます。

$tail -f /etc/httpd/logs/access_log | java -jar setsuna.jar -sep " - -" -stream log
そして、topの結果を流し込むんですが、その前に、-triggerを作りたいと思います。
triggerではロードアベレージが規定値を超えていないかをチェックさせます。

  • trigger "COLUMN10 > 1"

ここで利用するのは先ほどの流しことんだデータを参考にCOLUMN10を指定しています。
これは直近のロードアベレージの値です。そして、'>'という条件記号を使って、
'1'とい数値以上を指定しています。
ここで注目してもらいたいのは、上の分解したtopのCOLUMN10の値です。


"COLUMN10":"0.20,"
COLUMN10の値は数値の後ろに','が入っています。この文字である"0.20,"を数値変換すると
当然最後の','のせいで変換エラーになってしまいます。
しかし、Setsunaの持つ'>'や'<'条件記号はこういった値を可能な限り数値化します。
つまり最後の','は外して数値に変換してから比べます。
これは、対象データの中から数値を最初に見つけたところから数値ではなくなるまでの間の
値を数値として認識するように出来ているからです。なので、"Count=10"みたいな値で
あっても10と認識することができます。
このほかに条件記号は完全一致を指定する'='や、部分一致を指定する'like'が存在します。
ではこれでtriggerは完成です。


次にアクセス頻度の高いIPを見つける-queryです。これは完全にSQLで記述します。
既に実行中のアクセスログのストリームを利用したいと思います。
アクセス頻度の高いIPを抽出するクエリだけ記述します。

select
count(*)
from
(select count(column0) as gcnt,
column0
from
log \ // 20120321:修正
group by column0)
where (gcnt*3) < (select
avg(cnt)
from (select
count(column0) as cnt,
column0
from
log
where
C_TIME > DATEADD(SECOND, -60, current_timestamp)
group by column0)
);
これで直近の60秒のログからアクセスしてきたIPアクセス回数の平均を
大きく超える(3倍)IPが存在しているかをカウントいてます。
ここで注目していただもらいたいのは、下から3行目のC_TIMEという項目です。
これはSetsunaがデータをテーブルに格納する際に自動的に作成しているカラムです。
このカラムには、データを投入した日時がTimestamp型で登録されます。
なので、このように現在時刻から60秒前のような指定が可能になっています。
"C_TIME > DATEADD(SECOND, -60, current_timestamp)"の部分。
このほかにlong型のシーケンス値をもっています。これは0始まりのシーケンス値
なので、単純にデータをソートする際はそちらが便利です。
この条件指定の部分をQueryと呼んでいます。




さて-tirggerと-queryでデータをハンドリング出来れば後は、UserEventと言う最後の部分です。
ここは、ユーザが自由に定義した処理を指定する部分です。
今回の例ではloggerと言われるsyslogにログを出力する標準のライブラリを使いたいと思います。
UserEventを利用する場合は-eventと記述し、それ以降に実行したコマンドを記述します。
以下のようになります。

-event "logger Warning!! System busy"
そして今までの全てを繋ぐと以下のようになります。



$top -b -d 1 | grep --line-buffered "top -" | java -jar setsuna.jar -stream top\
-trigger "COLUMN10 > 1" \
-query "select \
* \
from \
(select count(column0) as gcnt,\
column0 \
from \
log \ // 20120321:修正
group by column0) \
where (gcnt*3) < (select \
avg(cnt) \
from (select \
count(column0) as cnt,\
column0\
from \
log\
where\
C_TIME > DATEADD(SECOND, -60, current_timestamp)\
group by column0)"\
-event "logger Warning!! System busy"
これでtopのロードアベレージ値が1以上の場合に、直近60秒間のアクセスの間に異常に
アクセス数が多いIPアドレスを見つけてsyslogにワーニングを出力できます。


※2012/03/20 追記
  • eventで指定したコマンドには引数としてこのUserEventを実行した引き金となる

Adapterからのデータが1レコードが渡されるので第一引数で取得して使えます。
こんな感じです。


-event "logger "
実際には下のようになります。

logger "{"COLUMN0":"top","COLUMN1":"-","COLUMN2":"17:01:58","COLUMN3":"up","COLUMN4":"23","COLUMN5":"min,","COLUMN6":"2","COLUMN7":"users,","COLUMN8":"load","COLUMN9":"average:","COLUMN10":"0.20,","COLUMN11":"0.12,","COLUMN12":"0.09"}"

1レコードではなく、SQLなどでデータを取得して、それを利用したい場合は

  • eventqueryを使います。

これはUserEventでSQLを実行するもので書き方は-queryと全く同じです。
ただ結果が標準出力に出力されるので、Setsunaそのものをさらに別のコマンドにパイプで繋いで、-eventqueryの結果を利用出来ます。
バージョン0.0.1では-eventか、-eventqueryのどちらかしか指定出来ませんが
次のバージョンで、このクエリの結果を-eventで指定したコマンドにも渡せる様にする予定です。




このように、複数のデータをSetsunaに流しておけばそれらを複合してデータの変化を掴むことが
出来ます。ユーザイベントも指定は自由なので、メールを出したり、今回の例で言うとiptablesなどで
IPを遮断することもできます。
ただし、データベースではないためデータを蓄積することはできません。SQLでデータが引けるため
データは蓄積されるように見えますが、自動的に古いデータから消えていきます。(デフォルト10分前)
これは大量のインプットデータに高速に処理を行うために内臓DBがインメモリであるためです。
あくまで処理を行うエンジンというところですね。




現在はパイプのインプットしかないので今後はサーバ型の開発を行い、外部からNW越しにデータを
流せるようにする予定です。ログコレクタも最近は熱いので、それらともつなげたいです(fluentdとか良いですね!!)。