実用的な同時並行ソートを実装する話

scala の parallel collection は、普通の collection を使うように使っているだけで、並列計算の恩恵を受けられる場合も多いのですが、そうではない場合も多いです。

その典型例が、ソートです。実は、並列のソートは、まだ実装されていません。

じゃあ、自分で実装すればいいのか、というと、これは半分正しくて半分間違いです。

どういう事かというと、シングルスレッドの java.util.Arrays.sort がとても高速なので、これより速いソートを自前で実装するのは難しいのです。Arrays.sort が速いのは、単にアルゴリズムだけの問題ではありません。自分で Array の整列処理を書くとすると、当然、配列の要素に添字でアクセスしますよね。すると、JVMは配列の範囲内かどうかをチェックしますよね。このオーバヘッドがあるために、java.util.Arrays.sort に勝つ処理を自前で書くのは困難なのです。

そこで、java.util.Arrays.sort を利用しつつ、並列でソートをする処理を考えてみました。

アイディアは簡単です。

  • 配列を前半と後半の2つに分けます
  • それぞれを同時並行して整列(Arrays.sort)します
  • 最後にマージします。

これをコードに直しましょう。

import java.util.Arrays

abstract class Sorter {
  def sorted(a: Array[Int]): Array[Int]
}

// 普通のソート
object SimpleSorter extends Sorter {
  def sorted(a: Array[Int]) = {
    Arrays.sort(a)
    a
  }
}

// 並行ソート
object DivideAndMergeParallelSorter extends Sorter {
  def sorted(a: Array[Int]) = {
    require(a.length >= 2)
    import scala.annotation.tailrec
    import scala.concurrent.ops._
    val len = a.length
    val half = len / 2
    
    // 注目部分!
    par(Arrays.sort(a, 0, half), Arrays.sort(a, half, len))
    
    val ret = new Array[Int](a.length)
    @tailrec
    def merge(i: Int, j: Int, k: Int) {
      if (a(j) <= a(k)) {
        ret(i) = a(j)
        if (j < half - 1) merge(i + 1, j + 1, k)
        else System.arraycopy(a, k, ret, i + 1, len - k)
      } else {
        ret(i) = a(k)
        if (k < len - 1) merge(i + 1, j, k + 1)
        else System.arraycopy(a, j, ret, i + 1, half - j)
      }
    }
    merge(0, 0, half)
    ret
  }
}

上記のように工夫するだけで、処理時間が4割ほど削減できます。(当方の環境で 1000000要素の Array[Int] を整列するのに要する時間が 138[ms] から 83[ms] に短縮できました)


さて、上記のコードはそれなりに実用的なんですが、ちょっと気になる点がありますね。
CPUのコア数がいくらあっても2コアしか使っていないという点です。なんとなくもったいないですね。8コア環境なら8コアを使うように書きなおしてみましょう。

object DivideAndMergeParallelSorter2 extends Sorter {
  def sorted(a: Array[Int]) = {
    require(a.length >= scala.collection.parallel.availableProcessors)
    import scala.annotation.tailrec
    import scala.concurrent.ops._
    
    val nDiv = collection.parallel.availableProcessors // 整列範囲をコア数に対応するように分割
    val len = a.length
    val pslices = (0 until nDiv).par map {i => Arrays.copyOfRange(a, i * len / nDiv, (i + 1) * len / nDiv)}
    pslices foreach (Arrays.sort _)
    
    def merge(a: Array[Int], b: Array[Int]): Array[Int] = {
      val alen = a.length
      val blen = b.length
      val ret = new Array[Int](alen + blen);
      @tailrec def rec(i: Int, j: Int, k: Int) {
        if (a(j) <= b(k)) {
          ret(i) = a(j)
          if (j < alen - 1) rec(i + 1, j + 1, k)
          else System.arraycopy(b, k, ret, i + 1, blen - k)
        } else {
          ret(i) = b(k)
          if (k < blen - 1) rec(i + 1, j, k + 1)
          else System.arraycopy(a, j, ret, i + 1, alen - j)          
        }
      }
      rec(0, 0, 0)
      ret
    }
    pslices reduce merge
  }
}

4コア以上((Hyper Threading のコアじゃなくて、実コアじゃないとダメかもしれませんが)))環境なら、さらに高速に整列できるようになりました。HT8コア(実4コア)の環境での実測で、シングルスレッドの4割の実行時間にまで短縮できました。

手前味噌ですが、上記の手法は、それなりに実用的と言えるんではないでしょうか。(ただし、上記コード自体は、Int限定だったり整列される配列の要素数がコア数以上だと決め打ちしていたりしていますから、実際に使うには若干の修正が必要です。)


ベンチマーク付きのソースコードをgithubに上げました。