技術をかじる猫

適当に気になった技術や言語、思ったこと考えた事など。

JavaでActorを書く

並列で処理させるには複数のインスタンスを作る必要がありますが、ざっくりActorを実装。

Actor ってなんぞや?

http://ja.wikipedia.org/wiki/%E3%82%A2%E3%82%AF%E3%82%BF%E3%83%BC%E3%83%A2%E3%83%87%E3%83%AB

アクターモデルのこと。 Simla とか Smalltalk やら色々参考に作られてるらしい。 重要な処理は下記。

  1. 何等かの呼び出しに対する処理を Actor という処理単位にまとめる。
  2. Actor の外からメッセージという形で処理するデータを渡す。
  3. Actor 内部では、受け取ったメッセージをメッセージキューに貯めておく。*1
  4. Actor 内部スレッドにより、キューの中の処理を1件づつ処理する。

見た目のインパクト優先で利用方法だけ先に。

public static void main(String[] args) throws Exception {
    // 上記 1 に該当
    SimpleActor<String, String> sampleActor = new SimpleActor<String, String>() {

        // 上記の 4 に該当。
        @Override
        protected String execute(String message) {
            // 何か超重い処理なんかここに。
            return "Hello! mr," + message;
        }
    };

    // 上記 2 の処理。Future は非同期に値を取得するための予約チケットみたいなもの
    Future<String, String> future = sampleActor.sendToFuture("White - azalea");

    // 他にすることもないので処理完了待ち。
    // (この程度の処理ならこのループが回ることはほぼ無いが、、、)
    while(!future.isFinished()) {
        Thread.sleep(10);
    }

    System.out.println("Finish : " + future.getResult().result);
}

すると、以下のような利点が得られる。

  • 非同期実行なので、重そうな処理を Actor に流して、他の処理ができる。
  • Actor インスタンスを増やせば、その分並列実行できる。
  • 見ての通り syncronized なし、共有リソースも Actor で囲ってしまえば、デッドロックもない筈。*2

もちろん、使いやすい場所と使いにくい場所があって、使いやすいのは線型的な処理や集計等、使いにくいのは継続実行型無限ループなスレッド(ゲームのメインループとか)。

きちんとした実装や、高機能なActorが欲しい場合は Akka が Java にも対応しています。

いざ実装へ

もちろんお仕事とかだと「OSS 勝手に入れるな!」というのはごもっとも。後、実装の勉強するのにAkka 読むとか正気の沙汰じゃない。

そんなあなたに Java の割と必要最低限機能セットなコードをぽとり。

まずは結果型。Actor の処理結果をまとめるクラス。

    package com.company.actor;
    
    /**
     * 結果を通知するためのデータ構造
     */
    public class Result<R> {
        public R result;
        public Exception error;
    }

後、サンプルでは使ってなかったけども、処理結果のコールバック。

package com.company.actor;

/**
 * Actor の処理が終わった際のコールバック
 */
public interface IFinishCallback<R> {
    void success(R result);
    void failed(Exception error);
}

上記を踏まえて、Future(上の説明で言う所のActor処理結果待ちの予約票)

package com.company.actor;

public class Future<M, R> {

    private M message = null;

    public Future(M message) {
        this.message = message;
    }

    public M getMessage() {
        return this.message;
    }

    private Result<R> result = null;

    /**
     * 処理が終わったかどうか
     * @return
     */
    public boolean isFinished() {
        synchronized (this) {
            return result != null;
        }
    }

    /**
     * 処理結果を取得する。
     */
    public Result<R> getResult() throws Exception {
        if (this.result == null)
            throw new IllegalAccessException("Item does not initialised.");
        return this.result;
    }

    /**
     * 結果を記録する
     * @param r
     */
    void setResult(Result<R> r) {
        synchronized (this) {
            this.result = r;
        }
        executeCallback();
    }

    private IFinishCallback<R> callback = null;

    /**
     * 成功・失敗時のコールバックを設定する。
     * @param callback
     */
    public void setCallback(IFinishCallback<R> callback) {
        synchronized (this) {
            this.callback = callback;

            // 既に成功済みなら実行する
            if (this.result != null)
                executeCallback();
        }
    }

    private void executeCallback() {
        synchronized (this) {
             if (this.callback != null) {
                 if (this.result.error != null) {
                     this.callback.failed(this.result.error);
                 } else {
                     this.callback.success(this.result.result);
                 }
             }
        }
    }
}

そしてお待ちかね Actor の処理本体。

package com.company.actor;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public abstract class SimpleActor<M, R> implements Runnable {

    private Queue<M>           messages = new ConcurrentLinkedQueue<M>();
    private List<Future<M, R>> futures  = new ArrayList<Future<M, R>>();

    Thread self = new Thread(this);

    private boolean isStateRunnable() {
        Thread.State state = self.getState();
        return state == Thread.State.NEW || state == Thread.State.TERMINATED;
    }

    /**
     * メッセージを送信する。
     * @param message
     */
    public void send(M message) {
        synchronized (self) {
            messages.add(message);
            if (self != null && !isStateRunnable()) {
                self.start();
            }
        }
    }

    /**
     * 結果予約を返す。
     * @param message
     * @return
     */
    public Future<M, R> sendToFuture(M message) {
        Future<M, R> future = new Future<M, R>(message);
        synchronized (self) {
            futures.add(future);
            messages.add(message);
            if (self != null && isStateRunnable()) {
                self.start();
            }
        }
        return future;
    }

    @Override
    public void run() {
        while(messages.size() > 0) {
            M message = messages.poll();
            Result<R> res = new Result<R>();
            try {
                R result = execute(message);
                res.result = result;
            } catch (Exception e) {
                error(message, e);
                res.error = e;
            }
            synchronized (self) {
                setResult(message, res);
            }
        }
    }

    /**
     * 実行結果を対応するイベントハンドラへ通知する。
     * @param message
     * @param result
     */
    private void setResult(M message, Result<R> result) {
        Future<M, R> target = null;
        for (Future<M, R> value : futures) {
            if (value.getMessage() == message) {
                target = value;
                break;
            }
        }

        // 該当するものがあれば結果を通知してリストから削除する
        if (target != null) {
            target.setResult(result);
            futures.remove(target);
        }
    }

    /**
     * このアクターの処理
     * @param message
     * @return
     */
    protected abstract R execute(M message);

    /**
     * 何か問題があった時にイベントを受け取るハンドラ
     * @param message
     * @param e
     */
    protected void error(M message, Exception e) {}
}

たかだか 2h 程度で作ったにしては上出来か、、、。 UT 書いてないから異常系とかボロクソだと思うけどもw

*1:ここの動作をプログラマは意識しなくてよい

*2:Actor 内部の処理でクラス変数なんかにアクセスしてるとスレッドセーフは知らないけども