Y's note

Web技術・プロダクトマネジメント・そして経営について

本ブログの更新を停止しており、今後は下記Noteに記載していきます。
https://note.com/yutakikuchi/

FluentdとMysqlを利用した簡単なRecommendEngineの開発

WEB+DB PRESS Vol.72

WEB+DB PRESS Vol.72

  • 作者: 近藤宇智朗,生井智司,Dr.Kein,tokuhirom,森田創,中島聡,堤智代,A-Listers,はまちや2,竹原,川添貴生,久保達彦,道井俊介,飯田祐基,中村知成,規世やよい,後藤秀宣,天野祐介,奥野幹也,WEB+DB PRESS編集部
  • 出版社/メーカー: 技術評論社
  • 発売日: 2012/12/22
  • メディア: 大型本
  • 購入: 11人 クリック: 94回
  • この商品を含むブログ (10件) を見る

RecommendEngineを作りたい

Fluentd Casual Talks LT #fluentd #fluentdcasual はてなブックマーク - Fluentd Casual Talks LT #fluentd #fluentdcasual
Fluentdを使ってNginxLogをMysqlにリアルタイムで格納する - Yuta.Kikuchiの日記 はてなブックマーク - Fluentdを使ってNginxLogをMysqlにリアルタイムで格納する - Yuta.Kikuchiの日記
興味連動型広告におけるマッチングの微妙な知識だけを活かして最近は仕事をしている@yutakikucです。このエントリーではSimpleなRecommendEngineの仕組みを考えたいと思います。Userの行動履歴から類似ItemのSuggestを行うために相関データを導き出し、関連性上位のItemを抽出します。今回Logの蓄積を行うのにFluentd、AccessDataや類似度Dataを管理するためにMysql、COS類似度を計算するバッチ処理Javaで記述します。Fluentdを使ってLogをMysqlに格納するための仕組み設定は上のエントリーを確認していただければと思います。

AccessItem相関のCOS類似度

コサイン類似度 はてなブックマーク - コサイン類似度
AccessItemの相関を求めるためにCOS類似度を利用します。COS類似度は広く利用されており今回のItem相関だけでなく、複数の単語類似度などにも利用されています。COS類似度はベクトルと内積から算出され、計算は以下の通りです。
cos(a,b)=\frac{a \cdot b}{ |a||b| }a \cdot b=\Sigma_{i=1}^{n} a_{i}*b_{i}|a|=\sqrt{\Sigma_{i=1}^{n} a_{i}^{2}}|b|=\sqrt{\Sigma_{i=1}^{n} b_{i}^{2}}

AccessItem格納用DB構築

Tableの用途

Userの行動履歴から類似Itemを提案することを目的としています。それを実現するためにDBのTableは2個作成します。一つは選択中のItemに対してのAccess履歴を蓄積するTableで、もう一つはItem間のCOS類似度を格納するTableです。Tableにデータを書き込むタイミングはAccess履歴はNginxLogに書き込まれるタイミングでリアルタイムでitem_accessテーブルに格納し、COS類似度計算用のrecommend_itemTableにはJavaバッチ処理で2つのitem間の類似度を計算したタイミングで格納します。

item_access Table
item_id user_id access_count created_time modified_time
int(11) int(11) int(11) datetime datatime
CREATE DATABASE IF NOT EXISTS `recommend`;
use recommend;
CREATE TABLE IF NOT EXISTS `item_access` (
`item_id` int(11) unsigned NOT NULL,
`user_id` int(11) unsigned NOT NULL,
`access_count` int(11) unsigned NOT NULL,
`created_time` datetime NOT NULL,
`modified_time` datetime NOT NULL,
PRIMARY KEY (`item_id`,`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
recommend_item Table
item_id user_id access_count created_time modified_time
int(11) int(11) int(11) datatime datatime
CREATE DATABASE IF NOT EXISTS `recommend`;
use recommend;
CREATE TABLE IF NOT EXISTS `recommend_item` (
`main_item_id` int(11) unsigned NOT NULL,
`recommend_item_id` int(11) unsigned NOT NULL,
`cos_score` int(11) unsigned NOT NULL,
`created_time` datetime NOT NULL,
`modified_time` datetime NOT NULL,
PRIMARY KEY (`main_item_id`, `recommend_item_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

AccessItemログのリアルタイム格納

error_log

今回WebServerにはNginxを利用します。NginxでのItemAccessLogを以下のPathに記録してリアルタイムでMysqlにデータを格納することを行います。Logの流れとしてはPHP→NginxLog→Fluentd→Mysqlのようになります。NginxのPHPApplicationではItemAccessが発生した場合は以下のようなログ出力Scriptで/var/log/nginx/item/access.logにデータを記録します。item_idとuser_idはそれぞれGETパラメータで渡ってきている事を前提としています。出力するデータ項目はaccess時間/item_id/user_idの4つで、項目間はタブで区切っています。下にSampleLogも載せておきます。

<?php

define( 'LOG_PATH', '/var/log/nginx/item/access.log' );
$item_id = $_GET['item_id'];
$user_id = $_GET['user_id'];
$error_string = $date . "\t" . $user_id . "\t" . $item_id . "\n" ;
error_log( $error_string, 3, LOG_PATH );
$ tail -n 10 /var/log/nginx/item/access.log
2013-05-29 02:08:18	218	34
2013-05-29 02:08:18	192	35
2013-05-29 02:08:18	884	36
2013-05-29 02:08:18	967	29
2013-05-29 02:08:18	412	31
2013-05-29 02:08:18	653	1
2013-05-29 02:08:18	615	2
2013-05-29 02:08:18	939	44
2013-05-29 02:08:18	632	11
2013-05-29 02:08:18	100	44
td-agent conf

td-agentの設定ファイルを次のように定義します。タグ中では独自formatを定義できるので上で指定した内容に合わせてformat /^(?.*)\t(?.*)\t(?.*)$/ のように正規表現を記述します。条件にマッチしたLogはタグで記述したkey_namesに従ってINSERT文が実行されます。

$ cat /etc/td-agent/td-agent.conf

<source>
  type tail
  format /^(?<datetime>.*)\t(?<item_id>.*)\t(?<user_id>.*)$/ 
  path /var/log/nginx/item/access.log
  tag nginx.access
</source>

<match nginx.access>
  type mysql
  host localhost
  database recommend
  key_names user_id,item_id
  sql INSERT INTO item_access (user_id,item_id,access_count,created_at,modified_at) VALUES (?,?,1,NOW(),NOW()) ON DUPLICATE KEY UPDATE access_count = access_count + 1, modified_at = NOW()
  username root
  #password ""
  flush_interval 10s
</match>
Mysqlへのリアルタイム格納ログ

UserがItemにAccessした時に以下のようなAccess情報がDBに記録されます。

mysql> SELECT * FROM item_access;
+---------+---------+--------------+---------------------+---------------------+
| item_id | user_id | access_count | created_time        | modified_time       |
+---------+---------+--------------+---------------------+---------------------+
|       1 |       1 |            4 | 2013-05-31 04:58:03 | 2013-05-31 04:58:32 |
|       1 |       2 |            7 | 2013-05-31 04:57:59 | 2013-05-31 04:58:31 |
|       1 |       3 |            5 | 2013-05-31 04:57:58 | 2013-05-31 04:58:32 |
|       1 |       4 |            4 | 2013-05-31 04:58:06 | 2013-05-31 04:58:30 |
|       1 |       5 |            4 | 2013-05-31 04:58:03 | 2013-05-31 04:58:19 |
|       1 |       6 |            9 | 2013-05-31 04:57:56 | 2013-05-31 04:58:25 |
|       1 |       7 |            3 | 2013-05-31 04:57:53 | 2013-05-31 04:58:20 |
|       1 |       8 |            4 | 2013-05-31 04:57:54 | 2013-05-31 04:58:25 |
|       1 |       9 |            4 | 2013-05-31 04:58:12 | 2013-05-31 04:58:32 |
|       1 |      10 |            5 | 2013-05-31 04:57:56 | 2013-05-31 04:58:27 |
|       1 |      11 |            5 | 2013-05-31 04:57:59 | 2013-05-31 04:58:29 |
|       1 |      12 |            3 | 2013-05-31 04:58:00 | 2013-05-31 04:58:32 |
|       1 |      13 |            3 | 2013-05-31 04:57:59 | 2013-05-31 04:58:17 |
|       1 |      14 |            3 | 2013-05-31 04:58:05 | 2013-05-31 04:58:29 |

COS類似度計算

JavaConfig

JavaからMysqlに接続するためにjdkmysql-connector-javaをinstallします。またJAVA_HOMEとCLASSPATHを以下のように設定します。

$ yum install java-1.6.0-openjdk mysql-connector-java -y
$ sudo vim /etc/profile

#以下を追記
export JAVA_HOME=/usr/java/default/
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:/usr/share/java/mysql-connector-java.jar

$ source /etc/profile
Item間のベクトルデータ抽出

以下のSQLで2Item間でUser毎に抽出したベクトル一覧を取得します。2Item間のベクトルを閲覧User数分足し合わせる事になります。User数が増えすぎると計算量が膨大になってしまうので、厳密な類似度を求めない場合はデータ量に合わせて間引いても良いと思います。

SELECT item1.user_id,item1.item_id AS item1_id,item1.access_count AS item1_count,item2.item_id AS item2_id,item2.access_count AS item2_count FROM item_access AS item1 INNER JOIN item_access AS item2 USING(user_id) WHERE item2.item_id <> item1.item_id ORDER BY item1.item_id, item2.item_id;
+---------+----------+-------------+----------+-------------+
| user_id | item1_id | item1_count | item2_id | item2_count |
+---------+----------+-------------+----------+-------------+
|       1 |        1 |          53 |        2 |          31 |
|       8 |        1 |          24 |        2 |          25 |
|       4 |        1 |          39 |        2 |          36 |
|       7 |        1 |          36 |        2 |          27 |
|       3 |        1 |          37 |        2 |          30 |
|      10 |        1 |          22 |        2 |          28 |
|       6 |        1 |          28 |        2 |          34 |
|       2 |        1 |          31 |        2 |          32 |
|       9 |        1 |          39 |        2 |          28 |
|       5 |        1 |          39 |        2 |          39 |
+---------+----------+-------------+----------+-------------+
COS類似度計算

以下のJavaでCOS類似度を計算します。以下の処理ではMysqlへのConnection数を減らすためにSELECT文を1回、BULKINSERTを1回としていますがデータ量に合わせて実行回数を分割をすると良いと思います。データ量が多いとJavaのHeapErrorが出そうなのでそこは十分に気をつけて下さい。計算されたCOS類似度はrecommend_itemTableに格納されます。下のCosCalculatorをjavacした後に実行します。またFluentdでのデータ格納と異なりCosCalculatorはリアルタイムで計算しようとしていません。1時間毎にCronで計算をし直すなどの設定を入れておくと良いと思います。

import java.sql.*;
import java.util.*;
class CosCalculator {
   public static void main(String[] args) {
      int id1 = 0, id2 = 0, inner_product = 0, cos = 0;
      double item1_vector = 0,item2_vector = 0;
      try {
         String driver = "com.mysql.jdbc.Driver";
         String server = "localhost";
         String dbname = "recommend";
         String url = "jdbc:mysql://" + server + "/" + dbname + "?useUnicode=true&characterEncoding=UTF-8";
         String user = "root";
         String password = "";
         Class.forName(driver);
         Connection con = DriverManager.getConnection(url, user, password);
         Statement stmt = con.createStatement();
         String search_query = "SELECT item1.user_id,item1.item_id AS item1_id,item1.access_count AS item1_count,item2.item_id AS item2_id,item2.access_count AS item2_count FROM item_access AS item1 INNER JOIN item_access AS item2 USING(user_id) WHERE item2.item_id <> item1.item_id ORDER BY item1.item_id, item2.item_id";
         String insert_query = "INSERT INTO recommend_item(main_item_id,recommend_item_id,cos_score,created_time,modified_time) VALUES(?,?,?,NOW(),NOW()) ON DUPLICATE KEY UPDATE cos_score = ?, modified_time = NOW()";
         PreparedStatement insert_stmt = con.prepareStatement( insert_query );
         ResultSet rs = stmt.executeQuery( search_query );
         while( rs.next() ) {
            if( ( id1 != 0 && id2 != 0 ) && ( id1 != rs.getInt("item1_id") || id2 != rs.getInt("item2_id") ) ) {
               cos = (int)( inner_product / Math.sqrt( item1_vector ) * Math.sqrt( item2_vector ) );
               insert_stmt.setInt(1, id1);
               insert_stmt.setInt(2, id2);
               insert_stmt.setInt(3, cos);
               insert_stmt.setInt(4, cos);
               insert_stmt.addBatch();
               inner_product = 0;
               item1_vector = 0;
               item2_vector = 0;
            } 
            id1 = rs.getInt("item1_id");
            id2 = rs.getInt("item2_id");
            inner_product += rs.getInt("item1_count") * rs.getInt("item2_count");
            item1_vector += Math.pow(rs.getInt("item1_count"), 2);
            item2_vector += Math.pow(rs.getInt("item2_count"), 2);
         } 
         con.setAutoCommit(false);
         insert_stmt.executeBatch();
         con.commit();
         con.setAutoCommit(true);
         rs.close();
         stmt.close();
         insert_stmt.close();
         con.close();
         } catch (SQLException e) {
            System.err.println("SQL failed.");
            e.printStackTrace ();
         } catch (ClassNotFoundException ex) {
            ex.printStackTrace ();
         }
      }
}
$ javac CosCalculator.java
$ java CosCalculator
COS類似度上位5件を取得

recommend_itemTableに格納されたデータを取得します。以下の例ではitem_id = 10の関連性が強いと判断された上位5件のitem_idのリストです。関連性が強いと判断されたitem_idが抽出出来たので、後はitem情報が格納されているTableとJOINしてもらえれば類似ItemをSuggestできるかと思います。今日の話はここまでとします。

mysql> SELECT * FROM recommend_item WHERE main_item_id = 10 ORDER BY cos_score DESC LIMIT 5;
+--------------+-------------------+-----------+---------------------+---------------------+
| main_item_id | recommend_item_id | cos_score | created_time        | modified_time       |
+--------------+-------------------+-----------+---------------------+---------------------+
|           10 |                14 |     13207 | 2013-05-31 05:42:42 | 2013-05-31 06:22:29 |
|           10 |                15 |     12947 | 2013-05-31 05:42:42 | 2013-05-31 06:22:29 |
|           10 |                27 |     12839 | 2013-05-31 05:42:42 | 2013-05-31 06:22:29 |
|           10 |                12 |     12645 | 2013-05-31 05:42:42 | 2013-05-31 06:22:29 |
|           10 |                 1 |     12444 | 2013-05-31 05:42:42 | 2013-05-31 06:22:29 |
+--------------+-------------------+-----------+---------------------+---------------------+
5 rows in set (0.03 sec)