å®ç¨çãªåæ並è¡ã½ã¼ããå®è£ ãã話
scala ã® parallel collection ã¯ãæ®éã® collection ã使ãããã«ä½¿ã£ã¦ããã ãã§ã並åè¨ç®ã®æ©æµãåããããå ´åãå¤ãã®ã§ãããããã§ã¯ãªãå ´åãå¤ãã§ãã
ãã®å ¸åä¾ããã½ã¼ãã§ããå®ã¯ã並åã®ã½ã¼ãã¯ãã¾ã å®è£ ããã¦ãã¾ããã
ããããèªåã§å®è£ ããã°ããã®ããã¨ããã¨ãããã¯ååæ£ããã¦ååééãã§ãã
ã©ãããäºãã¨ããã¨ãã·ã³ã°ã«ã¹ã¬ããã® java.util.Arrays.sort ãã¨ã¦ãé«éãªã®ã§ãããããéãã½ã¼ããèªåã§å®è£ ããã®ã¯é£ããã®ã§ããArrays.sort ãéãã®ã¯ãåã«ã¢ã«ã´ãªãºã ã ãã®åé¡ã§ã¯ããã¾ãããèªå㧠Array ã®æ´åå¦çãæ¸ãã¨ããã¨ãå½ç¶ãé åã®è¦ç´ ã«æ·»åã§ã¢ã¯ã»ã¹ãã¾ããããããã¨ãJVMã¯é åã®ç¯å²å ãã©ããããã§ãã¯ãã¾ãããããã®ãªã¼ãããããããããã«ãjava.util.Arrays.sort ã«åã¤å¦çãèªåã§æ¸ãã®ã¯å°é£ãªã®ã§ãã
ããã§ãjava.util.Arrays.sort ãå©ç¨ãã¤ã¤ã並åã§ã½ã¼ããããå¦çãèãã¦ã¿ã¾ããã
ã¢ã¤ãã£ã¢ã¯ç°¡åã§ãã
- é åãååã¨å¾åã®ï¼ã¤ã«åãã¾ã
- ãããããåæ並è¡ãã¦æ´å(Arrays.sort)ãã¾ã
- æå¾ã«ãã¼ã¸ãã¾ãã
ãããã³ã¼ãã«ç´ãã¾ãããã
import java.util.Arrays abstract class Sorter { def sorted(a: Array[Int]): Array[Int] } // æ®éã®ã½ã¼ã object SimpleSorter extends Sorter { def sorted(a: Array[Int]) = { Arrays.sort(a) a } } // 並è¡ã½ã¼ã object DivideAndMergeParallelSorter extends Sorter { def sorted(a: Array[Int]) = { require(a.length >= 2) import scala.annotation.tailrec import scala.concurrent.ops._ val len = a.length val half = len / 2 // 注ç®é¨åï¼ par(Arrays.sort(a, 0, half), Arrays.sort(a, half, len)) val ret = new Array[Int](a.length) @tailrec def merge(i: Int, j: Int, k: Int) { if (a(j) <= a(k)) { ret(i) = a(j) if (j < half - 1) merge(i + 1, j + 1, k) else System.arraycopy(a, k, ret, i + 1, len - k) } else { ret(i) = a(k) if (k < len - 1) merge(i + 1, j, k + 1) else System.arraycopy(a, j, ret, i + 1, half - j) } } merge(0, 0, half) ret } }
ä¸è¨ã®ããã«å·¥å¤«ããã ãã§ãå¦çæéã4å²ã»ã©åæ¸ã§ãã¾ãã(å½æ¹ã®ç°å¢ã§ 1000000è¦ç´ ã® Array[Int] ãæ´åããã®ã«è¦ããæéã 138[ms] ãã 83[ms] ã«ç縮ã§ãã¾ããï¼
ãã¦ãä¸è¨ã®ã³ã¼ãã¯ãããªãã«å®ç¨çãªãã§ãããã¡ãã£ã¨æ°ã«ãªãç¹ãããã¾ããã
CPUã®ã³ã¢æ°ãããããã£ã¦ã2ã³ã¢ãã使ã£ã¦ããªãã¨ããç¹ã§ãããªãã¨ãªããã£ãããªãã§ããã8ã³ã¢ç°å¢ãªã8ã³ã¢ã使ãããã«æ¸ããªããã¦ã¿ã¾ãããã
object DivideAndMergeParallelSorter2 extends Sorter { def sorted(a: Array[Int]) = { require(a.length >= scala.collection.parallel.availableProcessors) import scala.annotation.tailrec import scala.concurrent.ops._ val nDiv = collection.parallel.availableProcessors // æ´åç¯å²ãã³ã¢æ°ã«å¯¾å¿ããããã«åå² val len = a.length val pslices = (0 until nDiv).par map {i => Arrays.copyOfRange(a, i * len / nDiv, (i + 1) * len / nDiv)} pslices foreach (Arrays.sort _) def merge(a: Array[Int], b: Array[Int]): Array[Int] = { val alen = a.length val blen = b.length val ret = new Array[Int](alen + blen); @tailrec def rec(i: Int, j: Int, k: Int) { if (a(j) <= b(k)) { ret(i) = a(j) if (j < alen - 1) rec(i + 1, j + 1, k) else System.arraycopy(b, k, ret, i + 1, blen - k) } else { ret(i) = b(k) if (k < blen - 1) rec(i + 1, j, k + 1) else System.arraycopy(a, j, ret, i + 1, alen - j) } } rec(0, 0, 0) ret } pslices reduce merge } }
4ã³ã¢ä»¥ä¸((Hyper Threading ã®ã³ã¢ãããªãã¦ãå®ã³ã¢ãããªãã¨ãã¡ããããã¾ããã)))ç°å¢ãªããããã«é«éã«æ´åã§ããããã«ãªãã¾ãããHT8ã³ã¢(å®4ã³ã¢)ã®ç°å¢ã§ã®å®æ¸¬ã§ãã·ã³ã°ã«ã¹ã¬ããã®4å²ã®å®è¡æéã«ã¾ã§ç縮ã§ãã¾ããã
æåå³åã§ãããä¸è¨ã®ææ³ã¯ããããªãã«å®ç¨çã¨è¨ãããã§ã¯ãªãã§ãããããï¼ãã ããä¸è¨ã³ã¼ãèªä½ã¯ãIntéå®ã ã£ããæ´åãããé åã®è¦ç´ æ°ãã³ã¢æ°ä»¥ä¸ã ã¨æ±ºãæã¡ãã¦ããããã¦ãã¾ããããå®éã«ä½¿ãã«ã¯è¥å¹²ã®ä¿®æ£ãå¿ è¦ã§ããï¼
ãã³ããã¼ã¯ä»ãã®ã½ã¼ã¹ã³ã¼ããgithubに上げましたã