ログ集計システムを自前で作る
Index
ログ集計システムの要件
爆弾ログ処理班の@yutakikuchi_です。
ログ集計システムというものを作る時に皆さんはどのように対応していますか? 以下の候補から要件のレベルで使い分けをしている人が多いと予想しています。ざっくりの評価ですが、導入難易度、正確性、可視化、リアルタイム、長期集計、スケール、運用費用という点で評価を書いています。
ツール 導入難易度 正確性 可視化 リアルタイム 長期集計 スケール 運用費用 リンク GA(スタンダード) ○ × ○ ○ ○ ○ ○ Google アナリティクス公式サイト - ウェブ解析とレポート機能 – Google アナリティクス ![]()
自前Hadoop × ○ × × ○ △ ○ NTTデータのHadoop報告書がすごかった - 科学と非科学の迷宮 ![]()
Kibana ○ ○ ○ ○ × × ○ Kibana入門 // Speaker Deck ![]()
TresureData ○ ○ ○ △ ○ ○ × 数百億件のデータを30秒で解析――クラウド型DWH「Treasure Data」に新サービス - ITmedia エンタープライズ ![]()
Redshift ○ ○ ○ △ ○ ○ × Amazon Redshiftではじめるビッグデータ処理入門:連載|gihyo.jp … 技術評論社 ![]()
GoogleBigQuery △ ○ ○ △ ○ ○ × (2/6)システム部が担うデジタルマーケティング - 第1回 ビッグデータチームの結成を──Google BigQueryがデー...:ITpro ![]()
mysql × ○ × ○ ○ × ○ ソーシャルゲームのためのMySQL入門 - Technology of DeNA ![]()
集計要件の切り口はリアルタイム性を求めるか、集計結果に正確性を求めるのか、大量データを後からスケールさせるか、長期保存が必要なログなのかのという点を考えればある程度は使えるツールが見えてくると思います。世間の動きとしてはTresureData、RedShift、GoogleBigQueryのようにクラウド型DWHにデータを転送して、BIツールで集計結果を見るという流れにある気がします。クラウド型DWHを使う事で開発コストを抑えれる、大量データを突っ込んでもスケール面で安心感があります。Kibanaのように検索エンジンを裏側で持つ仕組みを見た時はなるほどと思ったのですが、大量データを長期間保存するのには不向きな面があります。GAも素晴らしいツールですがスタンダード会員だと集計できる数に制限がでてきてしまいます。僕の今の仕事はクラウド型DWHは使わないという条件があり... 非効率だと叩かれそうですが安定と実績があるmysqlを利用してログを貯めるような設計を考えています。
DB設計
データ保存方針
上に貼った画像のように「食べたパンの枚数」を忘れてしまうようではディオ様にも怒られてしまいます。システム開発の現場でも過去の履歴を辿って数を抽出することはとても重要な話で、生ログに近いデータをDBに保存しておくのはシステム障害の場合等にも有効です。(当然生ログファイルも残します。mysqlで無くてMongodbでも良いです。) この生ログに近いデータをtableに保存しつつ、バッチ処理で定期的に集計値を算出し別tableに記録しBIツールはそちらを参照するような形がとれれば良いと思います。以前は生ログに近いデータを保存するのに反対で集計tableだけを用意し1行の集計カラムだけを都度updateする方針でやっていたのですが、集計値がずれた場合にはログファイルから再集計をやる必要があったので、今後は生ログtableと集計tableを分けて使いたいと思っています。その他生ログをDBに保存する用途としてはDBはAND条件指定が得意なのでURL×UAなどのクロス集計も簡単に抽出できます。
table設計
mysqlでログ集計を考える時に重要な要素は2つあってログ保存期間の仕様を決める、用途毎のtableを用意するという内容があるかと思います。大量の生ログに近いデータをmysqlに延々と貯め続けるのは非効率なのである程度のところでデータを削除しなければなりません。(削除は仕様として割り切るしかないかなと考えています。mysqlのパージコストは大きいですが)それに関連してログを保存するテーブルと集計結果を格納するtableを分けBIツールからは集計用のテーブルを参照するような仕組みが良いと思います。そこで以下のようなテーブルを設計します。パージし易いようにPARTITIONを使います。
tableを3つ用意します。
table名 用途 書き込み元 log リアルタイムでログを書き込む。1行で1アクセスを管理。 fluentd summary_realtime 短時間でlogから集計結果をsummaryする。 batch summary_date デイリーでlogから集計結果をsummaryする。 batch batch_history バッチが正常終了とsummary_realtimeで何分迄集計したかを記録。 batch SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0; SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0; SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='TRADITIONAL'; CREATE SCHEMA IF NOT EXISTS `analytics` DEFAULT CHARACTER SET utf8 ; USE `analytics` ; -- ----------------------------------------------------- -- Table `analytics`.`log` -- ----------------------------------------------------- CREATE TABLE IF NOT EXISTS `analytics`.`log` ( `log_id` INT NOT NULL AUTO_INCREMENT , `path` VARCHAR(255) NOT NULL , `code` INT NOT NULL , `ua` VARCHAR(512) NOT NULL , `referer` VARCHAR(255) NOT NULL , `created_at` DATETIME NOT NULL , PRIMARY KEY (`log_id`, `created_at`) , INDEX `PATH` (`path` ASC, `code` ASC, `created_at` ASC) ) ENGINE = InnoDB PARTITION BY RANGE(TO_DAYS(created_at)) PARTITIONS 5( PARTITION p1 VALUES LESS THAN (TO_DAYS('2014-02-01 00:00:00')), PARTITION p2 VALUES LESS THAN (TO_DAYS('2014-03-01 00:00:00')), PARTITION p3 VALUES LESS THAN (TO_DAYS('2014-04-01 00:00:00')), PARTITION p4 VALUES LESS THAN (TO_DAYS('2014-05-01 00:00:00')), PARTITION p5 VALUES LESS THAN (TO_DAYS('2014-06-01 00:00:00'))); -- ----------------------------------------------------- -- Table `analytics`.`realcreated_at_summary` -- ----------------------------------------------------- CREATE TABLE IF NOT EXISTS `analytics`.`summary_realtime` ( `summary_realtime_id` INT NOT NULL AUTO_INCREMENT , `path` VARCHAR(255) NOT NULL , `count` INT NOT NULL , `start_at` DATETIME NOT NULL , `end_at` DATETIME NOT NULL , `created_at` DATETIME NOT NULL , `modified_at` DATETIME NOT NULL , PRIMARY KEY (`summary_realtime_id`) , UNIQUE(`path`, `start_at`, `end_at`), INDEX `PATH1` (`path` ASC, `created_at` ASC) ) ENGINE = InnoDB; -- ----------------------------------------------------- -- Table `analytics`.`summary_date` -- ----------------------------------------------------- CREATE TABLE IF NOT EXISTS `analytics`.`summary_date` ( `summary_date_id` INT NOT NULL AUTO_INCREMENT , `path` VARCHAR(255) NOT NULL , `count` INT NOT NULL , `date` DATE NOT NULL , `created_at` DATETIME NOT NULL , `modified_at` DATETIME NOT NULL , PRIMARY KEY (`summary_date_id`) , INDEX `PATH` (`path` ASC, `date` ASC) ) ENGINE = InnoDB; -- ----------------------------------------------------- -- Table `analytics`.`batch_history` -- ----------------------------------------------------- CREATE TABLE IF NOT EXISTS `analytics`.`batch_history` ( `batch_history_id` INT NOT NULL AUTO_INCREMENT , `type` CHAR(1) NOT NULL , `date` DATE NOT NULL , `count` INT NOT NULL , PRIMARY KEY (`batch_history_id`) , UNIQUE(`type`, `date`), INDEX `DATE` (`type` ASC, `date` ASC)) ENGINE = InnoDB; SET SQL_MODE=@OLD_SQL_MODE; SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEYサーバ構成
サーバ構成は次のようなものを想定しています。一応半年前ぐらいにもmysqlとinfinidbでログ集計を扱ったエントリーを書いていますので参考になれば。
【進撃の巨大データ】Log集計用DBとシステム構成の美しい設計を考える - Yuta.Kikuchiの日記
Fluentd
fluentd,fluent-plugin-mysql-bulk install
Apacheのログをリアルタイムでmysqlに格納する為の準備をします。Fluentd本体とfluent-plugin-mysql-bulkを設定します。※fluentdのmysqlpluginは他にもありますが、1レコードずつinsertするとmysqlへの接続負荷が高くなってしまうのでbulkinsertできるpluignを選びました。blukinsertとはINSERT INTO (A,B,C) VALUES(aaa,bbb,ccc), (ddd,eee,fff)のように1つのSQL文でINSERTが出来る為に高速と言われています。
Fluentd: Open Source Log Management
toyama0919/fluent-plugin-mysql-bulk · GitHub$ curl -L http://toolbelt.treasuredata.com/sh/install-redhat.sh | sh $ sudo /usr/lib64/fluent/ruby/bin/fluent-gem install fluent-plugin-mysql-bulk
td-agent.conf
上のサーバ構成のLogAggregator Masterの設定を想定して記述します。次の内容を/etc/td-agent/td-agent.confに設定するとmysqlに接続してbulkinsertしてくれます。
<source> type forward port 24224 </source> <match apache.access> type file path /var/log/apache/access.log </match> <source> type tail path /var/log/httpd/access_log format apache tag apache.log pos_file /var/log/td-agent/apache.pos </source> <match apache.log> type mysql_bulk host localhost database analytics username root column_names path,code,ua,referer,created_at key_names path,code,agent,referer,${time} table log flush_interval 5s </match>$ sudo /etc/init.d/td-agent restart
mysqlにデータが格納される事を確認する
apacheへのRequest => apache logへの書き込み => fluent-plugin-mysql-bulk => mysqlの流れを確認します。
$ for i in {1..100} do curl "http://localhost/index.html" done $ tail -n 3 /var/log/httpd/access_log ::1 - - [08/Feb/2014:23:10:40 +0900] "GET /index.html HTTP/1.1" 200 - "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2" ::1 - - [08/Feb/2014:23:10:40 +0900] "GET /index.html HTTP/1.1" 200 - "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2" ::1 - - [08/Feb/2014:23:10:40 +0900] "GET /index.html HTTP/1.1" 200 - "-" "curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2" $ tail -n 3 /var/log/td-agent/td-agent.log 2014-02-08 23:10:42 +0900 [info]: bulk insert sql => [INSERT INTO log (path,code,ua,referer,created_at) VALUES ('/index.html','200','curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2','-','2014-02-08 23:10:37'),('/index.html','200','curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2','-','2014-02-08 23:10:37'),....mysql> SELECT * FROM analytics.log ORDER BY log_id DESC LIMIT 3 ; +--------+-------------+------+--------------------------------------------------------------------------------------------------------+---------+---------------------+ | log_id | path | code | ua | referer | created_at | +--------+-------------+------+--------------------------------------------------------------------------------------------------------+---------+---------------------+ | 100 | /index.html | 200 | curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2 | - | 2014-02-08 23:10:40 | | 99 | /index.html | 200 | curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2 | - | 2014-02-08 23:10:40 | | 98 | /index.html | 200 | curl/7.19.7 (x86_64-redhat-linux-gnu) libcurl/7.19.7 NSS/3.14.0.0 zlib/1.2.3 libidn/1.18 libssh2/1.4.2 | - | 2014-02-08 23:10:40 | +--------+-------------+------+--------------------------------------------------------------------------------------------------------+---------+---------------------+
集計用のバッチ
以下のpythonコードでlogtableから5分毎に集計結果をsummary_realtimeに格納、1日毎にsummary_dateに格納する事ができます。それぞれのプログラムをcronで回せばOKですね。実行した結果mysqlにデータが格納されている事も確認します。logtableの格納件数とsummarytableの集計値が一致しているので問題無さそうでした。
#!/usr/bin/env python #coding:utf-8 import sys,mysql.connector from datetime import * import time date = datetime.now().strftime( '%Y-%m-%d' ) base_timestamp = int( time.mktime( datetime.strptime( date + ' 00:00:00', "%Y-%m-%d %H:%M:%S" ).timetuple() ) ) # mysql connector con = mysql.connector.connect( host='localhost', db='analytics', user='root', passwd='', buffered=True) cur = con.cursor() # get batch_count cur.execute('SELECT count FROM %s WHERE date = "%s" AND type = "%s"' % ('batch_history', date, 'R') ) res = cur.fetchone() try: batch_count = res[0] except TypeError: batch_count = 0 start_timestamp = base_timestamp + 300 * ( batch_count -1 ); end_timestamp = base_timestamp + 300 * batch_count; # get logdata cur.execute('SELECT COUNT(path),path,ua FROM %s WHERE code = %d AND created_at BETWEEN FROM_UNIXTIME("%s") AND FROM_UNIXTIME("%s") GROUP BY path' % ('log', 200, start_timestamp, end_timestamp)) res = cur.fetchall() # insert into summary_realtime for row in res: cur.execute('INSERT INTO %s (path, count, start_at, end_at, created_at, modified_at) VALUES("%s", %d, FROM_UNIXTIME("%s"), FROM_UNIXTIME("%s"), NOW(), NOW()) ON DUPLICATE KEY UPDATE count = %d, start_at = FROM_UNIXTIME("%s"), end_at = FROM_UNIXTIME("%s"), modified_at = NOW(), summary_realtime_id = LAST_INSERT_ID(summary_realtime_id)' %('summary_realtime', row[1], row[0], start_timestamp, end_timestamp, row[0], start_timestamp, end_timestamp )) # history cur.execute('INSERT INTO %s (type,date,count) VALUES("%s", "%s", 1) ON DUPLICATE KEY UPDATE batch_history_id = LAST_INSERT_ID(batch_history_id), count = count + 1' % ('batch_history', 'R', date)) con.commit() cur.close() con.close()#!/usr/bin/env python #coding:utf-8 import sys,mysql.connector from datetime import * import time yesterday = datetime.now() - timedelta(0,3600*24) start_date = yesterday.strftime( '%Y-%m-%d 00:00:00' ) end_date = datetime.now().strftime( '%Y-%m-%d 00:00:00' ) date = yesterday.strftime( '%Y-%m-%d' ) # mysql connector con = mysql.connector.connect( host='localhost', db='analytics', user='root', passwd='', buffered=True) cur = con.cursor() # get batch_count cur.execute('SELECT count FROM %s WHERE date = "%s" AND type = "%s"' % ('batch_history', date, 'D') ) res = cur.fetchone() try: batch_count = res[0] except TypeError: batch_count = 0 # get logdata cur.execute('SELECT COUNT(path),path,ua FROM %s WHERE code = %d AND created_at BETWEEN "%s" AND "%s" GROUP BY path' % ('log', 200, start_date, end_date)) res = cur.fetchall() # insert into summary_date for row in res: cur.execute('INSERT INTO %s (path, count, date, created_at, modified_at) VALUES("%s", %d, "%s", NOW(), NOW()) ON DUPLICATE KEY UPDATE count = %d, date = "%s", modified_at = NOW(), summary_date_id = LAST_INSERT_ID(summary_date_id)' %('summary_date', row[1], row[0], end_date, row[0], end_date)) # history cur.execute('INSERT INTO %s (type,date,count) VALUES("%s", "%s", 1) ON DUPLICATE KEY UPDATE batch_history_id = LAST_INSERT_ID(batch_history_id), count = count + 1' % ('batch_history', 'D', date)) con.commit() cur.close() con.close()mysql> SELECT COUNT(*) FROM log WHERE created_at BETWEEN '2014-02-08' AND '2014-02-09'; +----------+ | COUNT(*) | +----------+ | 100 | +----------+ 1 row in set (0.00 sec) mysql> SELECT COUNT(*) FROM log WHERE created_at BETWEEN '2014-02-09' AND '2014-02-10'; +----------+ | COUNT(*) | +----------+ | 1288 | +----------+ 1 row in set (0.00 sec) mysql> SELECT * FROM summary_date; +-----------------+-------------+-------+------------+---------------------+---------------------+ | summary_date_id | path | count | date | created_at | modified_at | +-----------------+-------------+-------+------------+---------------------+---------------------+ | 1 | /index.html | 100 | 2014-02-09 | 2014-02-09 15:48:37 | 2014-02-09 15:48:37 | +-----------------+-------------+-------+------------+---------------------+---------------------+ 1 row in set (0.00 sec) mysql> SELECT SUM(count) FROM summary_date; +------------+ | SUM(count) | +------------+ | 100 | +------------+ 1 row in set (0.00 sec) mysql> SELECT * FROM summary_realtime; +---------------------+-------------+-------+---------------------+---------------------+---------------------+---------------------+ | summary_realtime_id | path | count | start_at | end_at | created_at | modified_at | +---------------------+-------------+-------+---------------------+---------------------+---------------------+---------------------+ | 1 | /index.html | 144 | 2014-02-09 01:00:00 | 2014-02-09 01:05:00 | 2014-02-09 15:45:21 | 2014-02-09 15:45:21 | | 2 | /index.html | 144 | 2014-02-09 01:10:00 | 2014-02-09 01:15:00 | 2014-02-09 15:45:21 | 2014-02-09 15:45:21 | | 3 | /index.html | 1000 | 2014-02-09 01:50:00 | 2014-02-09 01:55:00 | 2014-02-09 15:45:23 | 2014-02-09 15:45:23 | +---------------------+-------------+-------+---------------------+---------------------+---------------------+---------------------+ 3 rows in set (0.00 sec) mysql> SELECT SUM(count) FROM summary_realtime; +------------+ | SUM(count) | +------------+ | 1288 | +------------+ 1 row in set (0.00 sec)
その他
Table肥大化防止
logtableが肥大化して来た時はパージを行います。予めpartitionを月毎に切っているので仕様で決めた蓄積期間を経過したらサヨナラします。summary_dateのtableは容量を食わないはずなのでそのまま残しておいても良いと思います。
mysql> ALTER TABLE log DROP PARTITION p1;可視化
Highcharts - Interactive JavaScript charts for your webpage
morris.js
グラフ描画のJSで注目したいのはHighchartsとmorris.jsぐらいでしょうか。Highchartsは商用での利用はライセンス契約しないとだめなので、その辺を気にせず使いたい方はmorris.jsが良いかなと思います。