GeekFactory

int128.hatenablog.com

大量のエンティティを処理するデザインパターン

データストアにある大量のエンティティを処理したい場合、クエリ結果を複数のタスクに分散して処理する必要があります。クエリ結果のカーソルを次のタスクに引き継ぐパターンをテンプレート化してみました。

基本的な流れはこんな感じ。

  1. タスクが実行される。
  2. 件数制限付きのクエリを実行する。
    • abstract query()
  3. 結果リストを処理する。
    • abstract run(List)
  4. タスク実行経過時間が6秒以内*1の場合は2に戻る。
  5. すべてのエンティティを処理済みの場合は終了する。
  6. 結果リストのカーソルをパラメータに保存し、次のタスクをenqueueする。

使い方はこんな感じ。

  • QueryTask をextendsする。
  • query() にクエリを書く。
protected S3QueryResultList<Hoge> query() throws Exception
{
    return Datastore.query(getMeta())
        .filter(getMeta().hage.equal(...))
        .limit(50)
        .asQueryResultList();
}
  • run(List) にエンティティ処理を書く。
protected void run(List<Hoge> resultList) throws Exception
{
    for(Hoge hoge : resultList) {
        hoge.setHage(...);
    }
    Datastore.put(resultList);
}

QueryTaskクラスはこんな感じ。

public abstract class QueryTask<M> extends Controller
//...
protected Navigation run() throws Exception
{
    startedTimeMillis = System.currentTimeMillis();
    int done = 0;
    
    Integer unit = asInteger("u");
    String cursor = asString("c");
    String filters = asString("f");
    String sorts = asString("s");
    if(cursor==null || filters==null || sorts==null || unit==null) {
        S3QueryResultList<M> resultList = query();
        unit = resultList.size();
        run(resultList);
        done += unit;
        if(!resultList.hasNext()) {
            logger.info(done+" entities processed in "+getElapsedTimeMillis()+" ms");
            return null;
        }
        cursor = resultList.getEncodedCursor();
        filters = resultList.getEncodedFilters();
        sorts = resultList.getEncodedSorts();
    }
    for(;;) {
        S3QueryResultList<M> resultList = Datastore.query(getMeta())
                .encodedCursor(cursor)
                .encodedFilters(filters)
                .encodedSorts(sorts)
                .limit(unit)
                .asQueryResultList();
        run(resultList);
        done += unit;
        if(!resultList.hasNext()) {
            logger.info(done+" entities processed in "+getElapsedTimeMillis()+" ms");
            return null;
        }
        cursor = resultList.getEncodedCursor();
        filters = resultList.getEncodedFilters();
        sorts = resultList.getEncodedSorts();
        if(getElapsedTimeMillis() > taskTimeMillis) {
            TaskHandle task = queue.add(TaskOptions.Builder
                    .method(Method.POST)
                    .url(request.getRequestURI())
                    .taskName(UUID.randomUUID().toString())
                    .param("u", unit.toString())
                    .param("c", cursor)
                    .param("f", filters)
                    .param("s", sorts)
                    );
            logger.info("Next task queued as "+task.getName());
            logger.info(done+" entities processed in "+getElapsedTimeMillis()+" ms");
            return null;
        }
    }
}
http://github.com/int128/healthcheck2/blob/9377e61054eeb52714c2d16e7392fadca6103153/src/org/hidetake/app/healthcheck2/util/QueryTask.java

*1:タスクの実行時間を10秒以内にする場合