When it’s ready.

出来るまで出来ない

GAE/Python でフルテキストサーチ実装した。 redisを使ったインチキバージョン

GAEにどんどん機能が追加されていく中、なかなか実装されないのが全文検索。品詞がとれるセグメンターだけでも提供してくれたら全然便利だと思うんだけどそんなアナウンスはまだ有りません。

なきゃ作ればいいじゃんという事で、全文検索もどきを実装してみました。ひとつ前のエントリー通りTriGramです。
以前、恵比寿のイケメン イアンさんと一緒に作ったmisopotetoというモジュールをベースにしています。
今回のポイントは、転置インデックスをredisサーバに送っているところ、GAE(とうかDB全般)は、インサートがめちゃくちゃ遅いので、Ngramでgram毎にエントリーIDをappendしていくというのは辛いです。Twitterの検索結果15個x100文字位をTriGramでインデックスを作ろうとすると、1500個くらいをgetしてappendして、putする必要があります。以前は、TaskQueueが無かったのでおれおれQueueを作って非同期でIndexを作成していましたが今はTaskQueueがあるので問題なく実装できると思っていました。がしかし、全然遅いし、なぞのリトライが多発して、30分くらいで2000+個のTaskQueueが積まれっぱなしで全然消化される気配がありませんでした。
 最近メチャ熱な、redisを使うことでサクサクIndex作成をしてみました。この時点でGAE/Pだけじゃなくなってるんですが・・・時間かければGAEでも出来るのでまぁまぁまぁ。redisは、sakuraのVPSにubuntu入れて立ててます。Pythonのredisモジュールはsocketが必須でGAEで使えないので、VPS上にredisプロキシーなFlaskAppを作ってPOSTメソッドでredisのKVSを使えるようにしています。

ソースは、bitbucket.orgに上げました。まるごと上げたので問題有るかも知れないですが、問題あったら教えてください。
ソース:http://bitbucket.org/a2c/a2c-fts/overview
a2c-fts:http://a2c-fts.appspot.com/
(注意:ChromeかSafari以外で見ると悲しい見た目になります)

セグメンター

utilsモジュールにNgramSegmenterクラスを入れてます。これでNgramでぶつ切ります。デフォルトでBigramですが、ノイズがハンパないので今回はTrigramで使用しています。以下ソース

class NgramSegmenter:
  _word_delimiter_regex = u"[。、" + string.punctuation + " ]" 
  def __init__(self, text, sp=2, word_delimiter_regex=None):

    if not word_delimiter_regex:
        word_delimiter_regex = self._word_delimiter_regex

    self.text = re.sub(word_delimiter_regex, r'', text)
    self.ngramArr = []
    for pos in range(len(self.text)-sp+1):
      self.ngramArr.append({
        'word_pos' : pos,
        'word_text' : self.text[pos:pos+sp]})
 
  def getText(self):
    return self.text

  def getNgramArr(self):
    return self.ngramArr

  def getSegmenter(self):
    a = [x['word_text'] for x in self.ngramArr]
    return a
    #return ' '.join(a)

if __name__ == '__main__':
  LOG_FILENAME = 'segmenter.log'
  logging.basicConfig(filename=LOG_FILENAME,level=logging.DEBUG)
  text = u'やってみせ 言って聞かせて させて見せ ほめてやらねば 人は動かじ' 
  ana2Str = NgramSegmenter(text , 2)
  print ana2Str.getSegmenter()

Reidsプロクシ

GAEからredisサーバを使用する為にシンプルなWebアプリ作りました。redisはappend操作もアトミックに動くのでとても楽ちんです。登録する時には、GETでもPOSTでも受け付けます。参照するときにはGETのみ。テスト用にルートにアクセスすると登録用のフォームもあります。abかけてみましたが、秒間数百行けてるです。

#!/usr/bin/env python
import os
import redis
from flask import Flask, request
import json
app = Flask(__name__)
app.debug = True

@app.route('/')
def redis_input():
  html = '''
  <!doctype html>
  <form action="/api/post" method="post">
  Key:<input type=text name="key"><br>
  Val:<input type=text name="val"><br>
  <input type=submit value="save">
  </form>
  '''
  return html

# GET method ================================================
@app.route('/api/set/<key>/<val>')
def set_id(key, val):
  gram = 'redis_' + key
  twit_id = str(val)
  r = redis.Redis(host='localhost', port=6379, db=0)
  r.rpush(gram, twit_id)
  cur_list = r.lrange(gram, 0, -1)
  return json.dumps(cur_list, indent=2)


@app.route('/api/get/<key>')
def get_id(key):
  gram = 'redis_' + key
  r = redis.Redis(host='localhost', port=6379, db=0)
  cur_list = r.lrange(gram, 0, -1)
  return json.dumps(cur_list, indent=2) 


# POST method ================================================
@app.route('/api/post', methods=['POST'])
def set_post_id():
  gram = 'redis_' + request.form['key']
  twit_id = str(request.form['val'])
  r = redis.Redis(host='localhost', port=6379, db=0)
  r.rpush(gram, twit_id)
  cur_list = r.lrange(gram, 0, -1)
  return json.dumps(cur_list, indent=2)

if __name__ == '__main__':
  app.run(host='0.0.0.0', port=8080)

TaskQueueに超ハマッた!

GAE-jのグループにも聞いてしまったのですが、TaskQueueに登録するところではまりました。
TaskQueueで登録できるTaskは、自アプリの特定URLオンリーなので外部URLをTaskに登録することが出来ません。そこで、外部サーバーを叩くエンドポイントを作ってそこをTaskQueueで叩くようにしたのですが、正常終了しているはずなのにTaskが削除されずに延々リトライを続けて、GAEのCPUとバンドをドンドン食いつぶす病にかかってしまいました。

以下、駄目だったコード

@app.route('/api/send_redis', methods=['POST'])
def saveRedisTwitSearchIndex():
  gram = request.form['gram']
  twit_id =request.form['twit_id']
  ext_url = 'http://redis.hoge.com/api/post'
  form_fields = {
     "key": gram,
     "val": twit_id,
     }
  form_data = urllib.urlencode(form_fields)
  result = urlfetch.fetch(url=ext_url,
     payload=form_data,
     method=urlfetch.POST,
     )
  if result.status_code == 200:
    return 1
  return 1

ほぼサンプルからコピペした寄せ集めなので動くはずなんですが、全然だめで、原因もわかりません。
今も原因が分からないですが、どうやら if があるとダメぽいです。

  if result.status_code == 200:
    return 1

この部分をなくすとうまくいきました。動いている現在のコード

@app.route('/api/send_redis', methods=['POST'])
def saveRedisTwitSearchIndex():
  gram = request.form['gram']
  twit_id =request.form['twit_id'] 
  ext_url = 'http://redis.atusi.me/api/post'
  form_fields = {
      "key": gram,
      "val": twit_id,
      }
  form_data = urllib.urlencode(form_fields)
  result = urlfetch.fetch(url=ext_url,
      payload=form_data,
      method=urlfetch.POST,
      )
  '''
  if result.status_code == 200:
    #return result.status_code
    return json.loads(result.content)
  '''
  return "gram: %s<br>twitid: %s"%(gram, twit_id)

コメントアウトするだけで動きました。なんでやねん!
上記変更を加えてから、1000以上のTaskQueueも数分で消化できるようになりました。

検索結果は、GAEの上限の1000件まで返すようになってます。トップページは、いっぱい出ても意味が無いので100件に絞ってます。他にも色々つまずいた所とがあったような気がしたけど忘れた・・・

redisへのFlaskアプリのベンチーマーク結果

Document Path:          /api/set/hoge1/1
Document Length:        17 bytes

Concurrency Level:      100
Time taken for tests:   0.439 seconds
Complete requests:      100
Failed requests:        98
   (Connect: 0, Receive: 0, Length: 98, Exceptions: 0)
Write errors:           0
Total transferred:      59828 bytes
HTML transferred:       40740 bytes
Requests per second:    227.68 [#/sec] (mean)
Time per request:       439.219 [ms] (mean)
Time per request:       4.392 [ms] (mean, across all concurrent requests)
Transfer rate:          133.02 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        8   15   3.3     16      21
Processing:    38  154  81.8    144     431
Waiting:       38  154  81.6    144     428
Total:         48  169  83.6    160     439