Javaã§Actorãæ¸ã
並åã§å¦çãããã«ã¯è¤æ°ã®ã¤ã³ã¹ã¿ã³ã¹ãä½ãå¿ è¦ãããã¾ããããã£ããActorãå®è£ ã
Actor ã£ã¦ãªãããï¼
http://ja.wikipedia.org/wiki/%E3%82%A2%E3%82%AF%E3%82%BF%E3%83%BC%E3%83%A2%E3%83%87%E3%83%AB
ã¢ã¯ã¿ã¼ã¢ãã«ã®ãã¨ã Simla ã¨ã Smalltalk ããè²ã åèã«ä½ããã¦ããããã éè¦ãªå¦çã¯ä¸è¨ã
- ä½çãã®å¼ã³åºãã«å¯¾ããå¦çã Actor ã¨ããå¦çåä½ã«ã¾ã¨ããã
- Actor ã®å¤ããã¡ãã»ã¼ã¸ã¨ããå½¢ã§å¦çãããã¼ã¿ã渡ãã
- Actor å é¨ã§ã¯ãåãåã£ãã¡ãã»ã¼ã¸ãã¡ãã»ã¼ã¸ãã¥ã¼ã«è²¯ãã¦ããã*1
- Actor å é¨ã¹ã¬ããã«ããããã¥ã¼ã®ä¸ã®å¦çã1件ã¥ã¤å¦çããã
è¦ãç®ã®ã¤ã³ãã¯ãåªå ã§å©ç¨æ¹æ³ã ãå ã«ã
public static void main(String[] args) throws Exception { // ä¸è¨ 1 ã«è©²å½ SimpleActor<String, String> sampleActor = new SimpleActor<String, String>() { // ä¸è¨ã® 4 ã«è©²å½ã @Override protected String execute(String message) { // ä½ãè¶ éãå¦çãªããããã«ã return "Hello! mr," + message; } }; // ä¸è¨ 2 ã®å¦çãFuture ã¯éåæã«å¤ãåå¾ããããã®äºç´ãã±ããã¿ãããªãã® Future<String, String> future = sampleActor.sendToFuture("White - azalea"); // ä»ã«ãããã¨ããªãã®ã§å¦çå®äºå¾ ã¡ã // (ãã®ç¨åº¦ã®å¦çãªããã®ã«ã¼ããåããã¨ã¯ã»ã¼ç¡ããããã) while(!future.isFinished()) { Thread.sleep(10); } System.out.println("Finish : " + future.getResult().result); }
ããã¨ã以ä¸ã®ãããªå©ç¹ãå¾ãããã
- éåæå®è¡ãªã®ã§ãéãããªå¦çã Actor ã«æµãã¦ãä»ã®å¦çãã§ããã
- Actor ã¤ã³ã¹ã¿ã³ã¹ãå¢ããã°ããã®å並åå®è¡ã§ããã
- è¦ã¦ã®éã syncronized ãªããå ±æãªã½ã¼ã¹ã Actor ã§å²ã£ã¦ãã¾ãã°ããããããã¯ããªãçã*2
ãã¡ããã使ããããå ´æã¨ä½¿ãã«ããå ´æããã£ã¦ã使ããããã®ã¯ç·åçãªå¦çãéè¨çã使ãã«ããã®ã¯ç¶ç¶å®è¡åç¡éã«ã¼ããªã¹ã¬ãã(ã²ã¼ã ã®ã¡ã¤ã³ã«ã¼ãã¨ã)ã
ãã¡ãã¨ããå®è£ ããé«æ©è½ãªActorã欲ããå ´å㯠Akka ã Java ã«ã対å¿ãã¦ãã¾ãã
ããå®è£ ã¸
ãã¡ãããä»äºã¨ãã ã¨ãOSS åæã«å ¥ãããªï¼ãã¨ããã®ã¯ããã£ã¨ããå¾ãå®è£ ã®åå¼·ããã®ã«Akka èªãã¨ãæ£æ°ã®æ²æ±°ãããªãã
ãããªããªãã« Java ã®å²ã¨å¿ è¦æä½éæ©è½ã»ãããªã³ã¼ããã½ã¨ãã
ã¾ãã¯çµæåãActor ã®å¦ççµæãã¾ã¨ããã¯ã©ã¹ã
package com.company.actor; /** * çµæãéç¥ããããã®ãã¼ã¿æ§é */ public class Result<R> { public R result; public Exception error; }
å¾ããµã³ãã«ã§ã¯ä½¿ã£ã¦ãªãã£ããã©ããå¦ççµæã®ã³ã¼ã«ããã¯ã
package com.company.actor; /** * Actor ã®å¦çãçµãã£ãéã®ã³ã¼ã«ãã㯠*/ public interface IFinishCallback<R> { void success(R result); void failed(Exception error); }
ä¸è¨ãè¸ã¾ãã¦ãFuture(ä¸ã®èª¬æã§è¨ãæã®Actorå¦ççµæå¾ ã¡ã®äºç´ç¥¨)
package com.company.actor; public class Future<M, R> { private M message = null; public Future(M message) { this.message = message; } public M getMessage() { return this.message; } private Result<R> result = null; /** * å¦çãçµãã£ããã©ãã * @return */ public boolean isFinished() { synchronized (this) { return result != null; } } /** * å¦ççµæãåå¾ããã */ public Result<R> getResult() throws Exception { if (this.result == null) throw new IllegalAccessException("Item does not initialised."); return this.result; } /** * çµæãè¨é²ãã * @param r */ void setResult(Result<R> r) { synchronized (this) { this.result = r; } executeCallback(); } private IFinishCallback<R> callback = null; /** * æåã»å¤±ææã®ã³ã¼ã«ããã¯ãè¨å®ããã * @param callback */ public void setCallback(IFinishCallback<R> callback) { synchronized (this) { this.callback = callback; // æ¢ã«æåæ¸ã¿ãªãå®è¡ãã if (this.result != null) executeCallback(); } } private void executeCallback() { synchronized (this) { if (this.callback != null) { if (this.result.error != null) { this.callback.failed(this.result.error); } else { this.callback.success(this.result.result); } } } } }
ããã¦ãå¾ ã¡ãã Actor ã®å¦çæ¬ä½ã
package com.company.actor; import java.util.ArrayList; import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public abstract class SimpleActor<M, R> implements Runnable { private Queue<M> messages = new ConcurrentLinkedQueue<M>(); private List<Future<M, R>> futures = new ArrayList<Future<M, R>>(); Thread self = new Thread(this); private boolean isStateRunnable() { Thread.State state = self.getState(); return state == Thread.State.NEW || state == Thread.State.TERMINATED; } /** * ã¡ãã»ã¼ã¸ãéä¿¡ããã * @param message */ public void send(M message) { synchronized (self) { messages.add(message); if (self != null && !isStateRunnable()) { self.start(); } } } /** * çµæäºç´ãè¿ãã * @param message * @return */ public Future<M, R> sendToFuture(M message) { Future<M, R> future = new Future<M, R>(message); synchronized (self) { futures.add(future); messages.add(message); if (self != null && isStateRunnable()) { self.start(); } } return future; } @Override public void run() { while(messages.size() > 0) { M message = messages.poll(); Result<R> res = new Result<R>(); try { R result = execute(message); res.result = result; } catch (Exception e) { error(message, e); res.error = e; } synchronized (self) { setResult(message, res); } } } /** * å®è¡çµæã対å¿ããã¤ãã³ããã³ãã©ã¸éç¥ããã * @param message * @param result */ private void setResult(M message, Result<R> result) { Future<M, R> target = null; for (Future<M, R> value : futures) { if (value.getMessage() == message) { target = value; break; } } // 該å½ãããã®ãããã°çµæãéç¥ãã¦ãªã¹ãããåé¤ãã if (target != null) { target.setResult(result); futures.remove(target); } } /** * ãã®ã¢ã¯ã¿ã¼ã®å¦ç * @param message * @return */ protected abstract R execute(M message); /** * ä½ãåé¡ããã£ãæã«ã¤ãã³ããåãåããã³ãã© * @param message * @param e */ protected void error(M message, Exception e) {} }
ããã ã 2h ç¨åº¦ã§ä½ã£ãã«ãã¦ã¯ä¸åºæ¥ããããã UT æ¸ãã¦ãªãããç°å¸¸ç³»ã¨ãããã¯ã½ã ã¨æããã©ãï½
*1:ããã®åä½ãããã°ã©ãã¯æèããªãã¦ãã
*2:Actor å é¨ã®å¦çã§ã¯ã©ã¹å¤æ°ãªããã«ã¢ã¯ã»ã¹ãã¦ãã¨ã¹ã¬ããã»ã¼ãã¯ç¥ããªããã©ã