Oath: å®å ¨ãé«éãåæå¯è½ãªä¸¦è¡å¦ç
TL;DR
並è¡å¦çãç°¡æ½ãã¤å®å
¨ã«è¨è¿°ã§ããã©ã¤ãã©ãªãä½ã£ããApplicativeDoæ¡å¼µã使ã£ã¦ã以ä¸ã®ããã«oath
ã®å¼æ°ã¨ãã¦ä¸ããIOã¢ã¯ã·ã§ã³ãåæã«å®è¡ããçµæãéããå¦çãæ¸ããããããããä¾å¤ãæããå ´åãæ®ãããã£ã³ã»ã«ãããããªã½ã¼ã¹ãæ¼ããå¿é
ããªãã
evalOath $ do a <- oath $ ... b <- oath $ ... f a b
çµç·¯
Haskellã¯ä¸¦è¡å¦çãå¾æã¨ããã¦ãããäºå®ã軽éã¹ã¬ãããMVarãSTMã¨ãã£ãå¦çç³»ã«ãããµãã¼ããå å®ãã¦ãããHackageã®Concurrencyã«ãã´ãªã«ã¯235ãã®ããã±ã¼ã¸ããããã¨ãããã¦ã¼ã¶ã®é¢å¿ã®é«ãã窺ããã 並è¡å¦çãå®ç¨ããä¸ã§ã¯ãåã«ã¹ã¬ããããã©ã¼ã¯ããã ãã§ãªããè¨ç®ã®çµæãéããå¿ è¦ãããââScalaãªã©ã§è¨ãFutureã«è¿ããã®ãããã¨å¬ãããæ¡ã®å®ã並è¡è¨ç®ã®çµæãåãåºãããã®Haskellã©ã¤ãã©ãªã¯æ°å¤ãä½ããã¦ããã
futuresã¨ãããªããªãããååã®ããã±ã¼ã¸ããããAPIãããªãã·ã³ãã«ã ã ã¹ã¬ããããã©ã¼ã¯ãã¦ãè¨ç®çµæãMVarã«ä»£å ¥ããMVarã®ä¸èº«ãåãåºãã¢ã¯ã·ã§ã³ãè¿ãã¨ãããã®ã ã
fork :: IO a -> IO (Future a) fork io = do mv <- newEmptyMVar forkIO $ do a <- io putMVar mv a return $ Future $ readMVar mv
èãæ¹ã¯ãããããããããã®å®è£
ã«ã¯è´å½çãªæ¬ ç¹ãããââä¾å¤å¦çã§ãããforkã«ä¸ããã¢ã¯ã·ã§ã³ãä¾å¤ãçºçããã¦ãã誰ã対å¿ãã§ããªããããããputMVar
ãå¼ã°ããªããªãã®ã§readMVar
ãçºæ£ãã¦ãã¾ã(å®è¡æã¨ã©ã¼ã«ãªã)ãããã§ã¯å®ç¨ããã®ã¯é£ãããã¾ããforkIOã®çµæã§ããThreadIdãæ¨ã¦ããã¦ãããã¨ãããããããã«ãä¸åº¦å§ããè¨ç®ãå¤é¨ããæ¢ãããã¹ã¯ãªãã
spawnã¯ãFutureåã§ã¯ãªãç´æ¥IOãè¿ãç¹ãé¤ãã°futuresã¨ä¼¼ã¦ããã
spawnTry :: IO a -> IO (IO (Result a)) spawnTry m = do v <- newEmptyMVar _ <- mask $ \restore -> forkIO (try (restore m) >>= putMVar v) return (readMVar v) spawn :: IO a -> IO (IO a) spawn m = do r <- spawnTry m return (r >>= either throwIO return)
ãã¡ãã¯ãã¡ãã¨ä¾å¤å¦çããã¦ãããputMVarãå¼ã°ãããã¨ãä¿è¨¼ãããããããããå®ç¨çã ããããããã¯ãè¨ç®ã¯æ¢ããããªãã
promiseã¯ãApplicativeã¨Monadã®ã¤ã³ã¹ã¿ã³ã¹ã§ããPromiseåã売ãã ã
newtype Promise a = Promise { unPromise :: IO (Async a) } deriving (Functor) instance Applicative Promise where pure = Promise . async . return Promise mf <*> Promise mx = Promise $ do f <- mf x <- mx (f', x') <- waitBoth f x async $ return $ f' x' instance Monad Promise where return = liftIO . return Promise m >>= f = Promise $ m >>= wait >>= unPromise . f liftIO :: IO a -> Promise a liftIO = Promise . async
å¤å´ã®IOã§ã¹ã¬ããããã©ã¼ã¯ããå
å´ã®Asyncã§çµæãå¾
ã¤ã¨ããä»çµã¿ã®ããã ãã ããm <*> n
ã¯mã¨nããããã並è¡ãã¦å®è¡ããã®ã¡ã両æ¹ã®çµæãè¿ã£ã¦ããã®ããã®å ´ã§å¾
ã¤å¿
è¦ããã(ãå¾
ã¤Asyncãè¿ããã®ã§ã¯ãªãããå¾
ã£ã¦ããAsyncãè¿ãã)ãããã§ã¯(a *> b) *> c
ã¨a *> (b *> c)
ã®æåãç°ãªã£ã¦ãã¾ããããApplicativeã®åãæºãããªããã¾ããAsyncã¯æ¬æ¥cancel`ã§ããã¯ãã ããåæä¸ã«çµæãå¾
ã£ã¦ãã¾ãã®ã§äºå®ä¸ä¸æãã§ããªã(ã©ã¡ãã«ãããPromiseã®å®è£
ãé è½ããã¦ããã®ã§ã©ããããããªãã)ã
ããã«ãMonadã¯å·¦ã®è¨ç®ã®çµæãå¾
ã£ã¦ããå³ã«é²ãã¨ããæåã§ãApplicativeã¨ã®ä¸è²«æ§ããªãããããã¤ã³ã¹ã¿ã³ã¹ããã£ã¦ããããã§ããã°ã©ã ãçµã¿ç«ã¦ãã®ã¯é£ããã ããã
éåæå¦çã®å®çªã§ããasyncããã±ã¼ã¸ã«ã¯ãConcurrently
ã¨ããåãããã
newtype Concurrently a = Concurrently { runConcurrently :: IO a } instance Applicative Concurrently where pure = Concurrently . return Concurrently fs <*> Concurrently as = Concurrently $ (\(f, a) -> f a) <$> concurrently fs as instance Alternative Concurrently where empty = Concurrently $ forever (threadDelay maxBound) Concurrently as <|> Concurrently bs = Concurrently $ either id id <$> race as bs
(<*>)
ã¯concurrently :: IO a -> IO b -> IO (a, b)
ã使ã£ã¦ä¸¡æ¹ã®çµæãå¾
ã¤è¨ç®ã表ç¾ãã(<|>)
ã¯race :: IO a -> IO b -> IO (Either a b)
ã¯ã©ã¡ãããè¿ã£ã¦ããã¾ã§å¾
ã¤è¨ç®ã表ç¾ãããpromise
ã¨éãããã®å ´ã§å¾
æ©ããå¿
è¦ããªãã®ã§ãApplicativeåãæºããããã ã
concurrently
ããã³race
ã¯ãã¹ã¬ãããå¿
ãäºã¤ãã©ã¼ã¯ããä¸ãããªãè¤éãªå®è£
ãªã®ã§ããªã¼ãã¼ãããã大ãããã ãããã¾ã§è¦ãä¸ã§ã¯ä¸çªæ£ãããã§ä½¿ããããããªå®è£
ã ãããã£ã¨ããæ¹æ³ã¯ãªãã ãããã
ç¶ç¶ã¨STMã®ã³ã³ã
éåæå¦çãå®å ¨ã«åæããæ¹æ³ã«ã¯è¦ããããã単純で頑強なメッセージングシステム、franz - モナドとわたしとコモナドã§ç´¹ä»ãããç¶ç¶æ¸¡ããç¨ãã¦çµæãå¾ ã¤ãã©ã³ã¶ã¯ã·ã§ã³ã渡ãã¨ããä»çµã¿ã ããã®æ¹æ³ãªããç¶ç¶ãçµäºããã¿ã¤ãã³ã°ã§å¦çãä¸æã§ãããããå¾ ã¤ãããããèªç±èªå¨ããªã½ã¼ã¹ã®è§£æ¾æ¼ãã®å¿é ã¯ãªã(franzã¯ãã®ã¡ã«ããºã ã«ãã£ã¦éåæãªã¯ã¨ã¹ãã管çãã¦ãã)ããã®ã¨ãã»ã³ã¹ãç¬ç«ããã©ã¤ãã©ãªã«è¸çã§ãããã ãããã£ã½ãååã¯ã ãããåããã¦ããã®ã§ãPromiseããã®é£æ³ã§Oathã¨åä»ããã
newtype Oath a = Oath { runOath :: forall r. (STM a -> IO r) -> IO r } deriving Functor instance Applicative Oath where pure a = Oath $ \cont -> cont (pure a) Oath m <*> Oath n = Oath $ \cont -> m $ \f -> n $ \x -> cont (f <*> x)
Oath
ã¯ãIO (STM a)
ãCPSå¤æãããã®ã§ãCompose (Codensity IO) STM
ã¨ç価ã§ãã*1 *2ãå¤å´ã®IO
ã§è¨ç®ãèµ·åããå
å´ã®STM
ã§çµæãå¾
ã¤ãOath
ã¯Applicative
ã¤ã³ã¹ã¿ã³ã¹ãæã¡ãSTM (a -> b)
ã¨STM a
ãç¨æãã¦ãããããããåæããSTM b
ãè¿ãââè¨ç®ã®èµ·åã¨ãçµæã®å¾
æ©ãå¥ã
ã«åæããã¨ããããã ã
oath :: IO a -> Oath a oath act = Oath $ \cont -> do v <- newEmptyTMVarIO tid <- forkFinally act (atomically . putTMVar v) let await = readTMVar v >>= either throwSTM pure cont await `finally` killThread tid
oath
ã¯ãIOã¢ã¯ã·ã§ã³ãå¥ã®ã¹ã¬ããã§å®è¡ãããã®çµæãTMVarçµç±ã§åãåºããforkFinally
ãä¾å¤ãåãæ¢ããthrowSTM
ããããä¼ããã
ç殺ä¸å¥ªã®æ¨©ã¯ç¶ç¶ãæ¡ã£ã¦ãããçµäºããã¨ãã«ThreadKilled
ä¾å¤ãæããããããã®ãããªæåãwithAsyncãªã©ã§å®å
¨ã«è¡¨ç¾ãããã¨ããã¨ãwithAsync foo $ \a -> withAsync bar $ \b -> ...
ã®ããã«
ãã¹ããæ·±ããªã£ã¦ãã¾ããã¡ã ããOathã¯ç¶ç¶æ¸¡ãã®æ½è±¡åã«ãã£ã¦ãcombine <$> oathSTM foo <*> oathSTM bar
ã®ããã«ãã©ããã«æ¸ãããããã ãã§ãªããtraverse
ã§ã³ã³ããã«å¯¾ãã¦ãç°¡åã«é©ç¨ã§ããã¨ããå©ç¹ãããã
Oath
ãå®è¡ããã«ã¯ãåã«evalOath
ãå¼ã³åºãããã¡ãããçµæãå¾
ã¤ã¿ã¤ãã³ã°ãå¶å¾¡ãããå ´åã¯runOath
ãç´æ¥å¼ã¶ã¹ãå ´é¢ãããã ããã
evalOath :: Oath a -> IO a evalOath m = runOath m atomically
Alternative
ã®<|>
ã¯ãConcurrently
ã¨åæ§ä¸¡æ¹ã®è¨ç®ãèµ·åããããä¸æ¹ãå®äºãã段éã§ããçæ¹ã¯ãã£ã³ã»ã«ãã(STMã®Alternativeã¤ã³ã¹ã¿ã³ã¹ãç¶æ¿ãã¦ãã)ã
Control.Concurrent.STM.Delayã®ã©ããã¼ãæä¾ãã¦ããã<|>
ã§åæããã ãã§ãã¿ã¤ã ã¢ã¦ãå¦çãé常ã«ç°¡åã«è¨è¿°ã§ããã
ã¾ããããªã½ã¼ã¹ã®ç¢ºä¿ãã解æ¾ãã使ç¨ãã¨ããä¸ã¤ã®æåãä¸ã¤ã«ã¾ã¨ããããã¨ããç¶ç¶æ¸¡ãã®å¼·ã¿ãdelay
ã®å®è£
ã«ãã表ãã¦ããã
instance Alternative Oath where empty = Oath $ \cont -> cont empty Oath m <|> Oath n = Oath $ \cont -> m $ \a -> n $ \b -> cont (a <|> b) delay :: Int -> Oath () delay dur = Oath $ \cont -> bracket (newDelay dur) cancelDelay (cont . waitDelay)
Concurrently
ã¨éããOath
ã¯ãã©ã¼ã¯ã¯å¿
é ã§ã¯ãªãã®ããã¤ã³ãã ãä¾ãã°ãããã¯ã¼ã¯çµç±ã§ãªã¯ã¨ã¹ããéä¿¡ããå¦çã¯æ¸ããé åºã§å®è¡ããçµæãå¾
ã¤é¨åã ãéåæã«ããã¨ãã£ãå®è£
ãã§ãããforkOn
ãªã©ãforkIO
以å¤ã®ãã©ã¼ã¯ã使ããããèªç±åº¦ãé«ãããããããã©ã¼ã¯ããªãã¨ããé¸æè¢ãããããã決å®æ§ãæ±ããããåä½ãã¹ãã®å®è£
ãªã©ã«ãå½¹ã«ç«ã¤ã ããã
ããã©ã¼ãã³ã¹
æå¾ã«ãConcurrently
ã¨æ¯è¼ããããã®ãã³ããã¼ã¯ãè¡ã£ããä»ã®ããã±ã¼ã¸ã¯ã¾ã¨ãã«åä½ããªããããè©ä¾¡ã®å¯¾è±¡ããå¤ãã(åä½ãã¹ãã¯æ®ãã¦ãã)ã
, bench "oath STM 100" $ nfIO $ O.evalOath $ traverse (O.oath . pure) [0 :: Int ..99] , bench "async 100" $ nfIO $ A.runConcurrently $ traverse (A.Concurrently . pure) [0 :: Int ..99]
oath IO 10: OK (0.86s) 3.18 μs ± 206 ns oath STM 10: OK (0.34s) 5.10 μs ± 169 ns async 10: OK (0.66s) 10.0 μs ± 190 ns oath IO 100: OK (0.60s) 35.8 μs ± 2.4 μs oath STM 100: OK (0.39s) 48.0 μs ± 1.3 μs async 100: OK (0.21s) 100 μs ± 5.6 μs
ãªã¼ãã¼ãããã¯Concurrently
ã®ç´ååã«æãããã¦ãããConcurrentlyã¯(<*>)
ã®å·¦å³ãã¨ã«ã¹ã¬ããããã©ã¼ã¯ãããããé
ã®äºåãã©ã¼ã¯ããå¿
è¦ãããã¨ããç¹ã表ãã¦ããã®ã ããã
ã¾ã¨ã
Oath
ã¯ãç¶ç¶æ¸¡ãã¹ã¿ã¤ã«ã¨STMãçµã¿åããããã¨ã«ãã£ã¦ãæè»ãå®å
¨ãé«éãããã¦åæå¯è½ãªéåæå¦çã®è¡¨ç¾ãå¯è½ã«ãããåã«IOã¢ã¯ã·ã§ã³ãçµã¿ç«ã¦ãéå
·ã¨ãã¦ã便å©ã ããä»å¾ã¯Oathã«åºã¥ããAPIãã¶ã¤ã³ã«ãä¸èã®ä½å°ãããã¨èãã¦ããã並è¡å¦çã®æ°å®çªãçã£ã¦ããããã