大量のエンティティを処理するデザインパターン
データストアにある大量のエンティティを処理したい場合、クエリ結果を複数のタスクに分散して処理する必要があります。クエリ結果のカーソルを次のタスクに引き継ぐパターンをテンプレート化してみました。
基本的な流れはこんな感じ。
- タスクが実行される。
- 件数制限付きのクエリを実行する。
- abstract query()
- 結果リストを処理する。
- abstract run(List
)
- abstract run(List
- タスク実行経過時間が6秒以内*1の場合は2に戻る。
- すべてのエンティティを処理済みの場合は終了する。
- 結果リストのカーソルをパラメータに保存し、次のタスクをenqueueする。
使い方はこんな感じ。
- QueryTask
をextendsする。 - query() にクエリを書く。
protected S3QueryResultList<Hoge> query() throws Exception { return Datastore.query(getMeta()) .filter(getMeta().hage.equal(...)) .limit(50) .asQueryResultList(); }
- run(List
) にエンティティ処理を書く。
protected void run(List<Hoge> resultList) throws Exception { for(Hoge hoge : resultList) { hoge.setHage(...); } Datastore.put(resultList); }
QueryTaskクラスはこんな感じ。
public abstract class QueryTask<M> extends Controller //... protected Navigation run() throws Exception { startedTimeMillis = System.currentTimeMillis(); int done = 0; Integer unit = asInteger("u"); String cursor = asString("c"); String filters = asString("f"); String sorts = asString("s"); if(cursor==null || filters==null || sorts==null || unit==null) { S3QueryResultList<M> resultList = query(); unit = resultList.size(); run(resultList); done += unit; if(!resultList.hasNext()) { logger.info(done+" entities processed in "+getElapsedTimeMillis()+" ms"); return null; } cursor = resultList.getEncodedCursor(); filters = resultList.getEncodedFilters(); sorts = resultList.getEncodedSorts(); } for(;;) { S3QueryResultList<M> resultList = Datastore.query(getMeta()) .encodedCursor(cursor) .encodedFilters(filters) .encodedSorts(sorts) .limit(unit) .asQueryResultList(); run(resultList); done += unit; if(!resultList.hasNext()) { logger.info(done+" entities processed in "+getElapsedTimeMillis()+" ms"); return null; } cursor = resultList.getEncodedCursor(); filters = resultList.getEncodedFilters(); sorts = resultList.getEncodedSorts(); if(getElapsedTimeMillis() > taskTimeMillis) { TaskHandle task = queue.add(TaskOptions.Builder .method(Method.POST) .url(request.getRequestURI()) .taskName(UUID.randomUUID().toString()) .param("u", unit.toString()) .param("c", cursor) .param("f", filters) .param("s", sorts) ); logger.info("Next task queued as "+task.getName()); logger.info(done+" entities processed in "+getElapsedTimeMillis()+" ms"); return null; } } }http://github.com/int128/healthcheck2/blob/9377e61054eeb52714c2d16e7392fadca6103153/src/org/hidetake/app/healthcheck2/util/QueryTask.java
*1:タスクの実行時間を10秒以内にする場合