FluentdとMysqlを利用した簡単なRecommendEngineの開発
- 作者: 近藤宇智朗,生井智司,Dr.Kein,tokuhirom,森田創,中島聡,堤智代,A-Listers,はまちや2,竹原,川添貴生,久保達彦,道井俊介,飯田祐基,中村知成,規世やよい,後藤秀宣,天野祐介,奥野幹也,WEB+DB PRESS編集部
- 出版社/メーカー: 技術評論社
- 発売日: 2012/12/22
- メディア: 大型本
- 購入: 11人 クリック: 94回
- この商品を含むブログ (10件) を見る
RecommendEngineを作りたい
Fluentd Casual Talks LT #fluentd #fluentdcasual
Fluentdを使ってNginxLogをMysqlにリアルタイムで格納する - Yuta.Kikuchiの日記
興味連動型広告におけるマッチングの微妙な知識だけを活かして最近は仕事をしている@yutakikucです。このエントリーではSimpleなRecommendEngineの仕組みを考えたいと思います。Userの行動履歴から類似ItemのSuggestを行うために相関データを導き出し、関連性上位のItemを抽出します。今回Logの蓄積を行うのにFluentd、AccessDataや類似度Dataを管理するためにMysql、COS類似度を計算するバッチ処理をJavaで記述します。Fluentdを使ってLogをMysqlに格納するための仕組み設定は上のエントリーを確認していただければと思います。
AccessItem相関のCOS類似度
コサイン類似度
AccessItemの相関を求めるためにCOS類似度を利用します。COS類似度は広く利用されており今回のItem相関だけでなく、複数の単語類似度などにも利用されています。COS類似度はベクトルと内積から算出され、計算は以下の通りです。
、 、 、
AccessItem格納用DB構築
Tableの用途
Userの行動履歴から類似Itemを提案することを目的としています。それを実現するためにDBのTableは2個作成します。一つは選択中のItemに対してのAccess履歴を蓄積するTableで、もう一つはItem間のCOS類似度を格納するTableです。Tableにデータを書き込むタイミングはAccess履歴はNginxLogに書き込まれるタイミングでリアルタイムでitem_accessテーブルに格納し、COS類似度計算用のrecommend_itemTableにはJavaのバッチ処理で2つのitem間の類似度を計算したタイミングで格納します。
item_access Table
item_id user_id access_count created_time modified_time int(11) int(11) int(11) datetime datatime CREATE DATABASE IF NOT EXISTS `recommend`; use recommend; CREATE TABLE IF NOT EXISTS `item_access` ( `item_id` int(11) unsigned NOT NULL, `user_id` int(11) unsigned NOT NULL, `access_count` int(11) unsigned NOT NULL, `created_time` datetime NOT NULL, `modified_time` datetime NOT NULL, PRIMARY KEY (`item_id`,`user_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;recommend_item Table
item_id user_id access_count created_time modified_time int(11) int(11) int(11) datatime datatime CREATE DATABASE IF NOT EXISTS `recommend`; use recommend; CREATE TABLE IF NOT EXISTS `recommend_item` ( `main_item_id` int(11) unsigned NOT NULL, `recommend_item_id` int(11) unsigned NOT NULL, `cos_score` int(11) unsigned NOT NULL, `created_time` datetime NOT NULL, `modified_time` datetime NOT NULL, PRIMARY KEY (`main_item_id`, `recommend_item_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
AccessItemログのリアルタイム格納
error_log
今回WebServerにはNginxを利用します。NginxでのItemAccessLogを以下のPathに記録してリアルタイムでMysqlにデータを格納することを行います。Logの流れとしてはPHP→NginxLog→Fluentd→Mysqlのようになります。NginxのPHPApplicationではItemAccessが発生した場合は以下のようなログ出力Scriptで/var/log/nginx/item/access.logにデータを記録します。item_idとuser_idはそれぞれGETパラメータで渡ってきている事を前提としています。出力するデータ項目はaccess時間/item_id/user_idの4つで、項目間はタブで区切っています。下にSampleLogも載せておきます。
<?php define( 'LOG_PATH', '/var/log/nginx/item/access.log' ); $item_id = $_GET['item_id']; $user_id = $_GET['user_id']; $error_string = $date . "\t" . $user_id . "\t" . $item_id . "\n" ; error_log( $error_string, 3, LOG_PATH );$ tail -n 10 /var/log/nginx/item/access.log 2013-05-29 02:08:18 218 34 2013-05-29 02:08:18 192 35 2013-05-29 02:08:18 884 36 2013-05-29 02:08:18 967 29 2013-05-29 02:08:18 412 31 2013-05-29 02:08:18 653 1 2013-05-29 02:08:18 615 2 2013-05-29 02:08:18 939 44 2013-05-29 02:08:18 632 11 2013-05-29 02:08:18 100 44td-agent conf
td-agentの設定ファイルを次のように定義します。
$ cat /etc/td-agent/td-agent.conf <source> type tail format /^(?<datetime>.*)\t(?<item_id>.*)\t(?<user_id>.*)$/ path /var/log/nginx/item/access.log tag nginx.access </source> <match nginx.access> type mysql host localhost database recommend key_names user_id,item_id sql INSERT INTO item_access (user_id,item_id,access_count,created_at,modified_at) VALUES (?,?,1,NOW(),NOW()) ON DUPLICATE KEY UPDATE access_count = access_count + 1, modified_at = NOW() username root #password "" flush_interval 10s </match>Mysqlへのリアルタイム格納ログ
UserがItemにAccessした時に以下のようなAccess情報がDBに記録されます。
mysql> SELECT * FROM item_access; +---------+---------+--------------+---------------------+---------------------+ | item_id | user_id | access_count | created_time | modified_time | +---------+---------+--------------+---------------------+---------------------+ | 1 | 1 | 4 | 2013-05-31 04:58:03 | 2013-05-31 04:58:32 | | 1 | 2 | 7 | 2013-05-31 04:57:59 | 2013-05-31 04:58:31 | | 1 | 3 | 5 | 2013-05-31 04:57:58 | 2013-05-31 04:58:32 | | 1 | 4 | 4 | 2013-05-31 04:58:06 | 2013-05-31 04:58:30 | | 1 | 5 | 4 | 2013-05-31 04:58:03 | 2013-05-31 04:58:19 | | 1 | 6 | 9 | 2013-05-31 04:57:56 | 2013-05-31 04:58:25 | | 1 | 7 | 3 | 2013-05-31 04:57:53 | 2013-05-31 04:58:20 | | 1 | 8 | 4 | 2013-05-31 04:57:54 | 2013-05-31 04:58:25 | | 1 | 9 | 4 | 2013-05-31 04:58:12 | 2013-05-31 04:58:32 | | 1 | 10 | 5 | 2013-05-31 04:57:56 | 2013-05-31 04:58:27 | | 1 | 11 | 5 | 2013-05-31 04:57:59 | 2013-05-31 04:58:29 | | 1 | 12 | 3 | 2013-05-31 04:58:00 | 2013-05-31 04:58:32 | | 1 | 13 | 3 | 2013-05-31 04:57:59 | 2013-05-31 04:58:17 | | 1 | 14 | 3 | 2013-05-31 04:58:05 | 2013-05-31 04:58:29 |
COS類似度計算
JavaConfig
JavaからMysqlに接続するためにjdkとmysql-connector-javaをinstallします。またJAVA_HOMEとCLASSPATHを以下のように設定します。
$ yum install java-1.6.0-openjdk mysql-connector-java -y $ sudo vim /etc/profile #以下を追記 export JAVA_HOME=/usr/java/default/ export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:/usr/share/java/mysql-connector-java.jar $ source /etc/profileItem間のベクトルデータ抽出
以下のSQLで2Item間でUser毎に抽出したベクトル一覧を取得します。2Item間のベクトルを閲覧User数分足し合わせる事になります。User数が増えすぎると計算量が膨大になってしまうので、厳密な類似度を求めない場合はデータ量に合わせて間引いても良いと思います。
SELECT item1.user_id,item1.item_id AS item1_id,item1.access_count AS item1_count,item2.item_id AS item2_id,item2.access_count AS item2_count FROM item_access AS item1 INNER JOIN item_access AS item2 USING(user_id) WHERE item2.item_id <> item1.item_id ORDER BY item1.item_id, item2.item_id; +---------+----------+-------------+----------+-------------+ | user_id | item1_id | item1_count | item2_id | item2_count | +---------+----------+-------------+----------+-------------+ | 1 | 1 | 53 | 2 | 31 | | 8 | 1 | 24 | 2 | 25 | | 4 | 1 | 39 | 2 | 36 | | 7 | 1 | 36 | 2 | 27 | | 3 | 1 | 37 | 2 | 30 | | 10 | 1 | 22 | 2 | 28 | | 6 | 1 | 28 | 2 | 34 | | 2 | 1 | 31 | 2 | 32 | | 9 | 1 | 39 | 2 | 28 | | 5 | 1 | 39 | 2 | 39 | +---------+----------+-------------+----------+-------------+COS類似度計算
以下のJavaでCOS類似度を計算します。以下の処理ではMysqlへのConnection数を減らすためにSELECT文を1回、BULKINSERTを1回としていますがデータ量に合わせて実行回数を分割をすると良いと思います。データ量が多いとJavaのHeapErrorが出そうなのでそこは十分に気をつけて下さい。計算されたCOS類似度はrecommend_itemTableに格納されます。下のCosCalculatorをjavacした後に実行します。またFluentdでのデータ格納と異なりCosCalculatorはリアルタイムで計算しようとしていません。1時間毎にCronで計算をし直すなどの設定を入れておくと良いと思います。
import java.sql.*; import java.util.*; class CosCalculator { public static void main(String[] args) { int id1 = 0, id2 = 0, inner_product = 0, cos = 0; double item1_vector = 0,item2_vector = 0; try { String driver = "com.mysql.jdbc.Driver"; String server = "localhost"; String dbname = "recommend"; String url = "jdbc:mysql://" + server + "/" + dbname + "?useUnicode=true&characterEncoding=UTF-8"; String user = "root"; String password = ""; Class.forName(driver); Connection con = DriverManager.getConnection(url, user, password); Statement stmt = con.createStatement(); String search_query = "SELECT item1.user_id,item1.item_id AS item1_id,item1.access_count AS item1_count,item2.item_id AS item2_id,item2.access_count AS item2_count FROM item_access AS item1 INNER JOIN item_access AS item2 USING(user_id) WHERE item2.item_id <> item1.item_id ORDER BY item1.item_id, item2.item_id"; String insert_query = "INSERT INTO recommend_item(main_item_id,recommend_item_id,cos_score,created_time,modified_time) VALUES(?,?,?,NOW(),NOW()) ON DUPLICATE KEY UPDATE cos_score = ?, modified_time = NOW()"; PreparedStatement insert_stmt = con.prepareStatement( insert_query ); ResultSet rs = stmt.executeQuery( search_query ); while( rs.next() ) { if( ( id1 != 0 && id2 != 0 ) && ( id1 != rs.getInt("item1_id") || id2 != rs.getInt("item2_id") ) ) { cos = (int)( inner_product / Math.sqrt( item1_vector ) * Math.sqrt( item2_vector ) ); insert_stmt.setInt(1, id1); insert_stmt.setInt(2, id2); insert_stmt.setInt(3, cos); insert_stmt.setInt(4, cos); insert_stmt.addBatch(); inner_product = 0; item1_vector = 0; item2_vector = 0; } id1 = rs.getInt("item1_id"); id2 = rs.getInt("item2_id"); inner_product += rs.getInt("item1_count") * rs.getInt("item2_count"); item1_vector += Math.pow(rs.getInt("item1_count"), 2); item2_vector += Math.pow(rs.getInt("item2_count"), 2); } con.setAutoCommit(false); insert_stmt.executeBatch(); con.commit(); con.setAutoCommit(true); rs.close(); stmt.close(); insert_stmt.close(); con.close(); } catch (SQLException e) { System.err.println("SQL failed."); e.printStackTrace (); } catch (ClassNotFoundException ex) { ex.printStackTrace (); } } }$ javac CosCalculator.java $ java CosCalculatorCOS類似度上位5件を取得
recommend_itemTableに格納されたデータを取得します。以下の例ではitem_id = 10の関連性が強いと判断された上位5件のitem_idのリストです。関連性が強いと判断されたitem_idが抽出出来たので、後はitem情報が格納されているTableとJOINしてもらえれば類似ItemをSuggestできるかと思います。今日の話はここまでとします。
mysql> SELECT * FROM recommend_item WHERE main_item_id = 10 ORDER BY cos_score DESC LIMIT 5; +--------------+-------------------+-----------+---------------------+---------------------+ | main_item_id | recommend_item_id | cos_score | created_time | modified_time | +--------------+-------------------+-----------+---------------------+---------------------+ | 10 | 14 | 13207 | 2013-05-31 05:42:42 | 2013-05-31 06:22:29 | | 10 | 15 | 12947 | 2013-05-31 05:42:42 | 2013-05-31 06:22:29 | | 10 | 27 | 12839 | 2013-05-31 05:42:42 | 2013-05-31 06:22:29 | | 10 | 12 | 12645 | 2013-05-31 05:42:42 | 2013-05-31 06:22:29 | | 10 | 1 | 12444 | 2013-05-31 05:42:42 | 2013-05-31 06:22:29 | +--------------+-------------------+-----------+---------------------+---------------------+ 5 rows in set (0.03 sec)