ForkJoinPoolã¯Java 7ããå°å ¥ãããæ°ããExecutorã®ãã¬ã¼ã ã¯ã¼ã¯ã§ãã æ§æ¥ã®Executorã¨éãã®ã¯ãã¿ã¹ã¯ã®ã¹ã±ã¸ã¥ã¼ã«ã®ã¢ã«ã´ãªãºã ã¨ãã¦ãwork-stealingãæ¡ç¨ãã¦ãããã¨ã§ããããã¯å帰å¦çãã¿ã¹ã¯ã®ä¸ã§æ´ã«ç´°ããªåã¿ã¹ã¯ãçæããããããªè¨ç®å¦çã«é©ãã¦ãã¾ã(ä¾ãã°Webã¯ãã¼ã©ãªã©)
ForkJoinkPoolã«ç»é²ããã¦ããåã¯ã¼ã«ã¼ã¹ã¬ããã¯ãããããã¯ã¼ã«ã¼ãã¥ã¼(å®éã¯LIFOåã®ã¹ã¿ãã¯)ãæã£ã¦ãã¦ãForkJoinTaskãç©ããã¨ãã§ãã¾ããForkJoinTaskã¯å¤é¨ããForkJoinPoolã® execute
, invoke
, submit
ã¡ã½ããã使ã£ã¦ç»é²ãããããããã¯ã¿ã¹ã¯ã®ä¸ã§ç´æ¥å¥ã¿ã¹ã¯ãçæããã® fork
ã¡ã½ãããå¼ã¶ãã¨ã§ç»é²ãããã¨ãã§ãã¾ããforkãããã¿ã¹ã¯ã¯ join
ã¡ã½ããã使ããè¨ç®çµæãå¾
ã¡ã¾ãã
ã¨ãããã¨ã§ãWikipediaã®Work stealingã«ããã¢ãã«ãForkJoinPoolã¨ForkJoinTaskã使ã£ã¦å®è£ ãã¦ã¿ã¾ããForkJoinTaskã«ã¯ããã¤ãã®æ½è±¡ãµãã¯ã©ã¹ããããä»åã¯ãã®ä¸ã®RecursiveTaskã使ãã¾ãã
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; class ForkJoinPoolExample { public static void main(String[] args) { int poolSize = Integer.parseInt(args[0]); ForkJoinPool pool = new ForkJoinPool(poolSize); int result = pool.invoke(new F(1, 2)); log("Result is " + result); } static class F extends RecursiveTask<Integer> { private final int a, b; F(int a, int b) { this.a = a; this.b = b; } @Override protected Integer compute() { log(String.format("Start compute of f(%d, %d) = g(%d) + h(%d)", a, b, a, b)); G g = new G(a); g.fork(); sleep(1000); H h = new H(b); final int result = h.compute() + g.join(); log(String.format("f(%d, %d) = %d", a, b, result)); return result; } } static class G extends RecursiveTask<Integer> { private final int a; G(int a) { this.a = a; } @Override protected Integer compute() { log(String.format("Start compute of g(%d) = %<d * 2", a)); final int result = a * 2; log(String.format("g(%d) = %d", a, result)); return result; } } static class H extends RecursiveTask<Integer> { private final int a; H(int a) { this.a = a; } @Override protected Integer compute() { log(String.format("Start compute of h(%d) = g(%<d) + (%<d + 1)", a)); G g = new G(a); g.fork(); sleep(1000); int c = a + 1; final int result = c + g.join(); log(String.format("h(%d) = %d", a, result)); return result; } } private static void log(String message) { System.out.println(String.format("%tT.%<tL [%s] %s", System.currentTimeMillis(), Thread.currentThread().getName(), message)); } private static void sleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
æåã¯å¦çé ã確èªããããããã¼ã«ã®ãµã¤ãºã1ã«ãã¦ã¿ã¾ãã
$ java ForkJoinPoolExample 1 01:37:23.239 [ForkJoinPool-1-worker-1] Start compute of f(1, 2) = g(1) + h(2) 01:37:24.293 [ForkJoinPool-1-worker-1] Start compute of h(2) = g(2) + (2 + 1) 01:37:25.295 [ForkJoinPool-1-worker-1] Start compute of g(2) = 2 * 2 01:37:25.295 [ForkJoinPool-1-worker-1] g(2) = 4 01:37:25.295 [ForkJoinPool-1-worker-1] h(2) = 7 01:37:25.296 [ForkJoinPool-1-worker-1] Start compute of g(1) = 1 * 2 01:37:25.296 [ForkJoinPool-1-worker-1] g(1) = 2 01:37:25.297 [ForkJoinPool-1-worker-1] f(1, 2) = 9 01:37:25.297 [main] Result is 9
ãã¼ã«ãµã¤ãºã2以ä¸ã®å ´åã¯forkãããã¿ã¹ã¯ã¯ç©ºãã¹ã¬ãããããã°é 次æ¶è²»ããã¦ããã¾ãã
$ java ForkJoinPoolExample 2 01:37:43.282 [ForkJoinPool-1-worker-1] Start compute of f(1, 2) = g(1) + h(2) 01:37:43.292 [ForkJoinPool-1-worker-0] Start compute of g(1) = 1 * 2 01:37:43.293 [ForkJoinPool-1-worker-0] g(1) = 2 01:37:44.300 [ForkJoinPool-1-worker-1] Start compute of h(2) = g(2) + (2 + 1) 01:37:44.300 [ForkJoinPool-1-worker-0] Start compute of g(2) = 2 * 2 01:37:44.301 [ForkJoinPool-1-worker-0] g(2) = 4 01:37:45.305 [ForkJoinPool-1-worker-1] h(2) = 7 01:37:45.306 [ForkJoinPool-1-worker-1] f(1, 2) = 9 01:37:45.306 [main] Result is 9