ä»åº¦ã¯ãFutures and Promisesã§ãã
http://docs.scala-lang.org/overviews/core/futures.html
æ¥æ¬èªè¨³ã
http://docs.scala-lang.org/ja/overviews/core/futures.html
ããããéãå¤ããªããããªã®ã§ãåå²ãã¦ããã¾ããã¾ãã¯Futureããã
Futureã£ã¦ï¼
ããè¨ç®å¦çãéåæï¼ä¸¦åï¼ã«å®è¡ãã¦ããã¦ãçµæã¯å¾ããåå¾ãããã¨ãã£ããã®ã§ãããã«ãã¹ã¬ããç³»ã®Futureãã¿ã¼ã³ããJavaã ã¨java.util.concurrent.Futureã§ã馴æã¿ããã§ãããããããã°ãScalaèªèº«ã®Actorããã±ã¼ã¸ã«ããã£ãæ°ããã¾ããâ¦ã
ã¾ãã2.10.0ã§è¿½å ãããFutureã¨ãããã¨ã§ã
ãµã³ãã«ã¯ãããªæãã§ãã
import scala.concurrent._ import ExecutionContext.Implicits.global import scala.util.{Failure, Success} val session = ... val f: Future[List[String]] = future { session.getRecentPosts } f onComplete { case Success(posts) => for (post <- posts) println(post) case Failure(t) => println("ã¨ã©ã¼ãçºçãã: " + t.getMessage) }
futureé¢æ°ã§ãFutureãä½æãããã¨ãã§ãã¾ããã¾ããFutureã®è¨ç®çµæã«ã¯æåã¨å¤±æãããã¾ããè¨ç®å¦çã§ä¾å¤ãã¹ãã¼ãããã¨ã失æã¨ãªãã¿ããã§ããã
以ä¸ã®importæã¯ãããã©ã«ãã®å®è¡ã³ã³ããã¹ãã使ç¨ãããã¨ã示ãã¦ãã¾ãã
import ExecutionContext.Implicits.global
ãã®å¤ããFutureãä½æãããå®è¡ãããããæã®å®è¡ã³ã³ããã¹ãã¨ãã¦Implicit Parameterã¨ããå½¢ã§ä¸ãããã¾ããããã©ã«ãã®å®è¡ã³ã³ããã¹ã以å¤ãã©ããã£ã¦ä½ãã®ãã¯â¦ã§ãããã¨ãããããä»ã¯ããã®importãå¿ è¦ã§ãããã¨ã
ã¾ããfutureé¢æ°ã®çµæã¨ãã¦è¿ã£ã¦ããFutureãã¬ã¤ãã®ã¤ã³ã¹ã¿ã³ã¹ã§ãããããèªèº«ã«å¾
ã¡åããã®æ©è½ã¯ããã¾ããã
ï¼Javaã¯ãFuture#getã§è¨ç®çµæãå¾ãããã¾ã§å¾
ã¡åããããã¨ãã§ãã¾ã
http://www.scala-lang.org/api/current/index.html#scala.concurrent.Future
ã¹ã¿ã³ã¹ã¨ãã¦ã¯ãFutureã使ãæã¯çµæãåæçã«å¾
ã¤ã®ã§ã¯ãªãã¦ãFutureãã¬ã¤ãã®ã¤ã³ã¹ã¿ã³ã¹ã«ã³ã¼ã«ããã¯é¢æ°ãç»é²ãã¦ï¼å
ã®ãµã³ãã«ã§ã¯ãFuture#onCompleteï¼ãéåæã«çµæãåãåã£ã¦å¦çããªãããã¨ããã¹ã¿ã³ã¹ãªããã§ãã
ï¼ãã®æ¹ããæ§è½çã«å¥½ã¾ããã¨ãããã¨ã ããã§
Awaitã¨Duration
ã¨ã¯ãã£ã¦ããFutureã®ããã¯ã°ã©ã¦ã³ãã§ä½¿ããã¦ããã¹ã¬ããã¯ForkJoinPoolã®Workerã¹ã¬ããï¼Daemonã¹ã¬ããï¼ãªã®ã§ãæ®éã«ããã°ã©ã ãå®è¡ãã¦ãã以éã®å¦çããªãã¨ãJavaVMãçµäºãã¦ãã¾ãã¾ãâ¦ã
ï¼ãã®è¾ºããClojureã¨éãã¾ããã
REPLã§ãã£ã¦ãããªãã¨ãããããµã³ãã«ã³ã¼ããæ¸ãã¦ããã¨ããã¥ããã¦ä»æ¹ããªãã®ã§ãå ã«å¾ ã¡åããã®æ¹æ³ããç´¹ä»ãã¾ãã
å¾ ã¡åãããããã«ã¯scala.concurrent.Awaitããå¾ ã¡åããã®æéãæå®ããã«ã¯scala.concurrent.duration.Durationã使ç¨ãã¾ãã
以ä¸ããµã³ãã«ã§ãã
val f = future { Thread.sleep(3000L); 10 } println(Await.result(f, Duration.Inf)) // => 10
ãã®ãµã³ãã«ãå®è¡ããã¨ã3ç§å¾ã«ã³ã¡ã³ãã§è¨è¼ãã¦ãããããªå 容ãã³ã³ã½ã¼ã«ã«åºåããã¾ãã
Await#result
def result[T](awaitable: Awaitable[T], atMost: Duration): T
ã§ãæéã表ãDurationãæå®ãã¦Futureã®çµæãåå¾ãã¾ããçµæã¯ãfutureé¢æ°ã§ä½æããè¨ç®çµæã§ããè¨ç®å¦çã失æããï¼ä¾å¤ãã¹ãã¼ãããï¼å ´åã¯ããã®ã¡ã½ããããä¾å¤ãé£ãã§ãã¾ãã
Await#ready
def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type
Futureã®è¨ç®çµæãå¾ ã¡åãããã ãã®ã¡ã½ããã§ããçµæã¯ãæåï¼å¤±æã«é¢ããããè¨ç®çµæãçµäºããFutureãè¿ã£ã¦ãã¾ãããã¡ãããå¼æ°ã«ä¸ããFutureãè¦ã¦ãããã§ããã
両ã¡ã½ããã¨ããæå®ããDurationã¾ã§ã«çµäºããªãã£ãå ´åã«ã¯java.util.concurrent.TimeoutExceptionãã¹ãã¼ããã¾ãã
Durationã¯ãæéãé·ãã表ãåºåºã¯ã©ã¹ã§ãæéãç¡éã表ç¾ã§ãã¾ããå ã®ä¾
Duration.Inf
ã¯ç¡éã§ããã
Durationã®ã¤ã³ã¹ã¿ã³ã¹ã¯ãä¸è¨ã®ãããªæ¹æ³ã§ä½æãããã¨ãã§ãã¾ããã¾ããããã¯ã¬ã¤ãã®åãã§ããã
import scala.concurrent.Duration import scala.concurrent.duration._ import java.util.concurrent.TimeUnit._ // ä½æ val d1 = Duration(100, MILLISECONDS) // from Long and TimeUnit val d2 = Duration(100, "millis") // from Long and String val d3 = 100 millis // implicitly from Long, Int or Double val d4 = Duration("1.2 µs") // from String
ãã¿ã¼ã³ããããå¯è½ã
// ãã¿ã¼ã³ãããã³ã° val Duration(length, unit) = 5 millis
以éã®ä¾ã§ã¯ããã¹ã¦importã¨ãã¦
import scala.concurrent._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.util.{Failure, Success}
ãæå®ãã¦ãããã®ã¨ããå¾ ã¡åãããè¡ãæéã¯
Duration.Inf
ã§è¡ããã®ã¨ãã¾ãã
ã¾ããFutureãä½æããæã«ã¯
// wait-and-get def wag(waitSec: Int, n: Int): Int = { Thread.sleep(waitSec * 1000L) n }
ã¨æå®æéå¾ æ©ãããããªé¢æ°ã使ããæéã®è¨æ¸¬ããã£ãæ¹ã並åã«åãã¦ãããã¨ãããããããã®ã§
// stop-watch def sw[A](body: => A): Unit = { val start = System.currentTimeMillis try { println(s"got [$body], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec") } catch { case th: Throwable => println(s"got Exception[$th], elapsed time [${(System.currentTimeMillis - start) / 1000.0}]msec") } }
ã¨ããé¢æ°ã§å ãã§å®è¡ãããã®ã¨ãã¾ãã
ä¾ãã°ããããªæãã§ãã
sw { val f = future { wag(3, 5) } Await.result(f, Duration.Inf) } // => got [5], elapsed time [3.019]msec
Future#value
Futureã®å¤ãåå¾ã§ãã¾ãã
sw { val f = future { wag(3, 5) } f.value } // => got [None], elapsed time [0.012]msec sw { val f = future { wag(3, 5) } Await.ready(f, Duration.Inf) f.value } // => got [Some(Success(5))], elapsed time [3.003]msec sw { val f = future { throw new Exception } Await.ready(f, Duration.Inf) f.value } // => got [Some(Failure(java.lang.Exception))], elapsed time [0.001]msec
å¥ã«çµæã®å¾ ã¡åãããããããã§ããªãã®ã§ãè¨ç®å¦çãçµäºãã¦ããªãæã«ãã®ã¡ã½ãããå¼ã¶ã¨Noneãè¿ãã¾ããè¨ç®å¦çãçµäºãã¦ããå ´åã¯ãSomeã«å ã¾ãã¦çµæãè¿ãã¾ãã
Future#onComplete
Futureã®è¨ç®ã®æåã失æã«é¢ããããçµäºæã«å¼ã³åºãããã³ã¼ã«ããã¯é¢æ°ãç»é²ãã¾ããçµæã¯scala.util.Tryã®ã¤ã³ã¹ã¿ã³ã¹ã¨ãã¦æ¸¡ã£ã¦ããã®ã§ããã¿ã¼ã³ãããã§å¤å®ãã¾ãã
æåã¯Successã失æã¯Failureã¨ãªãã¾ãã
sw { val f = future { wag(3, 5) } f onComplete { case Success(n) => println(s"got success = $n") // => got success = 5 case Failure(e) => println(s"got error = $e") } Await.result(f, Duration.Inf) } // => got [5], elapsed time [3.018]msec sw { val f = future { throw new Exception } f onComplete { case Success(n) => println(s"got success = $n") case Failure(e) => println(s"got error = $e") // => got error = java.lang.Exception } Await.result(f, Duration.Inf) } // => got Exception[java.lang.Exception], elapsed time [0.002]msec
2ã¤ç®ã®æ¹ã¯ãFutureãä¾å¤ãæãã¦ããã®ã§swé¢æ°ã®catchç¯ã«æã¾ã£ããã¨ã«ãªãã¾ãããªããã³ã¼ã«ããã¯é¢æ°ã¯ï¼ããã¥ã¡ã³ãã®æ¸ãæ¹ã¯ææ§ã§ããï¼éåæã«å®è¡ããã¦ããããã§ãã
ã³ã¼ã«ããã¯é¢æ°ã¯ãè¤æ°ç»é²ãããã¨ãã§ãã¾ãã
sw { val f = future { wag(3, 5) } f onComplete { case Success(n) => println(s"1. got success = $n") case Failure(e) => println(s"1. got error = $e") } f onComplete { case Success(n) => println(s"2. got success = $n") case Failure(e) => println(s"2. got error = $e") } Await.result(f, Duration.Inf) }
ãã®å ´åããç»é²ããé¢æ°ã¯ç»é²é ã«é¢ä¿ãªãéåæã«å®è¡ãããããã§ãã
Future#onSuccessãFuture#onFailure
ãããããonCompleteã®ããã«ã³ã¼ã«ããã¯é¢æ°ãç»é²ããã¡ã½ããã§ãããããããæåæã¾ãã¯å¤±ææã®ã¿ã«ããå¼ã³åºãããªããã¨ãç°ãªãã¾ãã
sw { val f = future { wag(3, 5) } f onSuccess { case n => println(s"got success $n") // => got success 5 } f onFailure { case t => println(s"got failure $t") } Await.result(f, Duration.Inf) } // => got [5], elapsed time [3.017]msec sw { val f = future { throw new Exception } f onSuccess { case n => println(s"got success $n") } f onFailure { case t => println(s"got failure $t") // => got failure java.lang.Exception } Await.result(f, Duration.Inf) } // => got Exception[java.lang.Exception], elapsed time [0.008]msec
æåæã«ã¯onFailureã«ç»é²ããé¢æ°ãã失ææã«ã¯onSuccessã«ç»é²ããé¢æ°ãç¡è¦ããããã¨ã«ãªãã¾ãã
ãã¡ãããè¤æ°ã®ã³ã¼ã«ããã¯é¢æ°ãç»é²ãããã¨ãå¯è½ã§ãã
Future#mapãFuture#flatMapãFuture#foreachãFuture#withFilterãFuture#collect
ã馴æã¿ã®å¤æãå復ã¡ã½ããã§ããã
sw { val f = future { wag(3, 5) } map { n => wag(5, 10) } f foreach println // => 10 Await.result(f, Duration.Inf) } // => got [10], elapsed time [8.022]msec sw { val f1 = future { wag(3, 5) } val f2 = future { wag(5, 10) } val fs = f1 flatMap { n1 => f2 map { n2 => n1 + n2 } } Await.result(fs, Duration.Inf) } // => got [15], elapsed time [5.003]msec
æåã®mapã®ä¾ã§ã¯ãFutureã®è¨ç®çµæã«å¯¾ãã¦ããã«Futureã使ã£ãè¨ç®ããã¦ãã¾ãããã£ã¦ã2ã¤ç®ã®Futureãæåã®Futureã®çµæã«ä¾åãã¦ãããããé次å¦çã¨ãªãå®è¡æéã¯8ç§ã¨ãªã£ã¦ãã¾ãã
対ãã¦flatMapã®ä¾ã§ã¯ã2ã¤ã®Futureã®çµæãæããªããã°çµæã¯å¾ããã¾ãããã2ã¤ã®Futureã¯å¥ã
ã«è¨ç®ã§ãããã5ç§ã§å®è¡ãå®äºãã¦ãã¾ãã2ã¤ã®Futureã¯ã並åã«å®è¡ãããã¨ãããã¨ã§ããã
Futureã«ããã¦foreachã¯ãonSuccessã¨åãã¨èãã¦ããã¿ããã§ããã¾ããmapã¨flatMapã«æ¸¡ããé¢æ°ããFutureã®è¨ç®ãæåããªããã°å®è¡ããã¾ããããã®è¾ºãã¯ãOptionã®SomeãNoneã®é¢ä¿ã¨åãã¨èãã¦ããããã§ããã
mapãflatMapã使ããã®ã§ãå ã»ã©ã®flatMapã®ä¾ã¯forå¼ã§æ¸ãç´ããã¨ãã§ãã¾ãã
sw { val f1 = future { wag(3, 5) } val f2 = future { wag(5, 10) } val fs = for { n1 <- f1 n2 <- f2 } yield n1 + n2 Await.result(fs, Duration.Inf) }
withFilterã¨filterãããã®ã§ãforå¼å ã§ifã使ç¨ãããã¨ãã§ãã¾ãããã ãwithFilterï¼ã¾ãã¯filterï¼ã®çµæãfalseã¨ãªã£ãå ´åã¯ãFutureã®çµæã¨ãã¦ã¯java.util.NoSuchElementExceptionãã¹ãã¼ããããã¨ã«ãªãã¾ãã
sw { val f1 = future { wag(3, 5) } val f2 = future { wag(5, 10) } val f = for { n1 <- f1 if n1 > 5 n2 <- f2 } yield n1 + n2 Await.result(f, Duration.Inf) } // => got Exception[java.util.NoSuchElementException: Future.filter predicate is not satisfied], elapsed time [3.002]msec
collectã¯å²æãã
Future#recoverãFuture#recoverWith
Futureã®è¨ç®ã失æããå ´åã«ãä¾å¤ã®çµæããæ°ããªFutureãä½æãã¦ç¶ç¶ããããã®ã¡ã½ããã§ãã
sw { val f = future { 1 / 0 } recover { case th: ArithmeticException => 5 } Await.result(f, Duration.Inf) } // => got [5], elapsed time [0.035]msec sw { val f = future { 1 / 0 } recoverWith { case th: ArithmeticException => future { wag(3, 5) } } Await.result(f, Duration.Inf) } // => got [5], elapsed time [3.004]msec
ã¬ã¤ãã«ãæ¸ãã¦ãã¾ãããããããmapã¨flatMapã«ããä¼¼ã¦ãã¾ããããªããå¼æ°ã¯PartialFunctionãªã®ã§ãé¨åé¢æ°ã«æ¸¡ã£ã¦ããå¼æ°ã®ä¾å¤ã«å¯¾ãã¦ãã¿ã¼ã³ããããæåããªãã£ãå ´åã¯ãrecoverããã³recoverWithã®çµæãã¾ã失æãã¾ãã
Future#fallbackTo
recoverãrecoverWithã«ä¼¼ã¦ãã¾ããã失ææã®ä»£æ¿ã¨ãªãFutureãç´æ¥è¨å®ãã¾ãã
sw { val f = future { 1 / 0 } fallbackTo future { wag(3, 5) } Await.result(f, Duration.Inf) } // => got [5], elapsed time [3.017]msec
Future#andThen
å¯ä½ç¨ãèµ·ãããã¨ãç®çã¨ãã¦ãPartialFunctionãæå®ãã¾ããçµæã¯Futureãè¿ãã¾ãããandThenã§å¼ã³åºãããé¢æ°ãä½ãè¿ããã¨ãå ã®Futureã®å¤ãå¼ã³åºãå ã«ã¯è¿ãã¾ãã
sw { val f = future { wag(3, 5) } .andThen { case Success(n) => println(n); 20 } // 5 .andThen { case Success(n) => println(n); 10 } // 5 Await.result(f, Duration.Inf) } // => got [5], elapsed time [3.065]msec
ãã®ä¾ã§ã¯ãPartialFunctionã§æåã®Futureã¨å ¨ç¶é¢ä¿ãªãå¤ãè¿ãã¦ãã¾ãããandThenã«æ¸¡ãããå¼æ°ãAwait#resultã§å¾ãããçµæããæåã®Futureã®çµæã§å¾ããããã®ã§ãã
Future#zip
2ã¤ã®Futureãã¾ã¨ãããã¨ãã§ãã¾ãã
sw { val f = future { wag(3, 5) } zip future { wag(5, 10) } Await.result(f, Duration.Inf) } // => got [(5,10)], elapsed time [5.027]msec
並åå®è¡ããã¦ãããããªã®ã§ãå®è¡æéã¯5ç§ã¨ãªã£ã¦ãã¾ãã
æ®éã«ã¿ãã«ã«ããå ´åã¨ãä½ãéãã®ï¼ã¨ããã¨ããã§ãã
sw { val f = future { throw new Exception } zip future { wag(5, 10) } Await.result(f, Duration.Inf) } // => got Exception[java.lang.Exception], elapsed time [0.026]msec sw { val f = future { wag(3, 5) } zip future { throw new Exception } Await.result(f, Duration.Inf) } // => got Exception[java.lang.Exception], elapsed time [3.003]msec
ã©ã¡ããçæ¹ã§ã失æããå ´åã¯ããã®ä¾å¤ãå¾ããããã¨ã«ãªãã¾ãã
Future#failed
å®äºããFutureã失æããå ´åã«ãFuture[Throwable]ãè¿ãforeachï¼ãããã¯forå å 表è¨ï¼ã§ä½¿ããã¨ãã§ãã¾ãã
sw { val f = future { wag(3, 5) } f.failed.foreach(th => println(s"got Exception[${th}]")) Await.result(f, Duration.Inf) } sw { val f = future { Thread.sleep(3000L); 1 / 0 } f.failed.foreach(th => println(s"got Exception[${th}]")) // => got Exception[java.lang.ArithmeticException: / by zero] Await.result(f, Duration.Inf) } sw { val f = future { Thread.sleep(3000L); 1 / 0 } for (th <- f.failed) println(s"got Exception[${th}]") // => got Exception[java.lang.ArithmeticException: / by zero] Await.result(f, Duration.Inf) }
æåã®ã³ã¼ãã¯foreachã§ä½ãåºåãã¾ããã
Future#transform
Futureã®çµæãå¤æãããã¨ãã§ãã¾ããå¤æããããã®é¢æ°ã¯ãæåæã¨å¤±ææã«å¯¾ãã¦ããããè¨å®ãã¾ãã
sw { val f = future { wag(3, 5) } transform ({ n => n * 10}, { th => new RuntimeException(th) }) Await.result(f, Duration.Inf) } // => got [50], elapsed time [3.016]msec sw { val f = future { 1 / 0 } transform ({ n => n * 10}, { th => new RuntimeException(th) }) Await.result(f, Duration.Inf) } // => got Exception[java.lang.RuntimeException: java.lang.ArithmeticException: / by zero], elapsed time [0.004]msec
è¦ãç®çã«ãããªã«å¤ãããªãã®ã§ãããFuture#transformã®å®£è¨ã
def transform[S](s: (T) â S, f: (Throwable) â Throwable)(implicit executor: ExecutionContext): Future[S]
ã¨ãä¾å¤ãåããå ´åã¯ä¾å¤ãè¿å´ããã°ããã®ã§ãå¥ã«ã¹ãã¼ããªãã¦ãããã§ãããããAwait#resultã§ã¯ãå¤æããä¾å¤ãé£ãã§ãã¦ããããã§ããã
ããã¼ããã£ããé¢ç½ãã§ããã¾ã Futureãªãã¸ã§ã¯ãã¨Promiseãæ®ã£ã¦ããã®ã§ãããã¯æ¬¡å以éã«ã