The async package provides functionality for performing actions asynchronously, across multiple threads. While it's built on top of the forkIO function from base (in Control.Concurrent), the async package improves on this in many ways:
- It has graceful and thorough handling of exceptions
- It builds in a way to get results back from a child thread
- There is an STM interface for accessing thread results, providing for a convenient way to deal with such things as blocking operations waiting for results
- Thread cancelation is made easy and reliable
- For some very common use cases, the race and concurrently functions, as well as the Concurrently newtype wrapper, can give you a huge bang for your buck.
There is little cognitive overhead to using this package. The primary datatype it exposes is Async. An Async a value represents a separate thread which will ultimately generate a value of type a. The package follows some pretty standard naming conventions:
- async* forks a thread and returns an Async value
- withAsync* forks a thread and provides the Async value to the provided inner action. The forked thread is killed when the inner action exits.
- You can wait for the result of an Async, poll to check if it's complete, or cancel it
- By default, waiting will rethrow any exceptions thrown by the forked thread. Use the variants that say Catch to catch the exceptions.
- Many wait operations have STM variants to make them more easily composable
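As a small illustration of the wait and Catch conventions, the sketch below forks an action that fails and inspects the outcome with waitCatch instead of letting wait rethrow it. The names failing and describe, and the choice of a division by zero as the failure, are assumptions made up for this example.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent.Async
import Control.Exception

-- | Hypothetical failing action: forcing a division by zero throws
-- an ArithException inside the forked thread.
failing :: IO Int
failing = evaluate (1 `div` 0)

-- | Describe the outcome of an Async without rethrowing its exception.
describe :: Async Int -> IO String
describe a = do
    res <- waitCatch a
    return $ case res of
        Left _  -> "failed"
        Right x -> "finished: " ++ show x

main :: IO ()
main = do
    withAsync failing $ \a -> describe a >>= putStrLn
    withAsync (return 5) $ \a -> describe a >>= putStrLn
```

Swapping waitCatch for wait in describe would instead rethrow the exception in the calling thread.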
To warm up, we'll start with examples of using concurrently, race, and the Concurrently newtype. These are simpler (and more efficient) variants of the more general Async-based interface. The general rule with this library is: if you can get away with concurrently/race/Concurrently, you should.

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent
import Control.Concurrent.Async

action1 :: IO Int
action1 = do
    threadDelay 500000 -- just to make it interesting
    return 5

action2 :: IO String
action2 = do
    threadDelay 1000000
    return "action2 result"

main :: IO ()
main = do
    res <- concurrently action1 action2
    print (res :: (Int, String))

As you can see, the concurrently function waits until both operations complete, and then returns both results in a tuple. In contrast, the race function returns only the first one to complete:

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent
import Control.Concurrent.Async

action1 :: IO Int
action1 = do
    threadDelay 500000 -- just to make it interesting
    return 5

action2 :: IO String
action2 = do
    threadDelay 1000000
    return "action2 result"

main :: IO ()
main = do
    res <- race action1 action2
    print (res :: Either Int String)

Exercise: what will this program output?
The Concurrently newtype wrapper uses the concurrently function to implement its Applicative instance, and race to implement its Alternative instance. We can demonstrate that, though the code will be quite a bit more verbose:

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.Async

action1 :: IO Int
action1 = do
    threadDelay 500000 -- just to make it interesting
    return 5

action2 :: IO String
action2 = do
    threadDelay 1000000
    return "action2 result"

main :: IO ()
main = do
    res1 <- runConcurrently $ (,)
        <$> Concurrently action1
        <*> Concurrently action2
    print (res1 :: (Int, String))

    res2 <- runConcurrently
          $ (Left <$> Concurrently action1)
        <|> (Right <$> Concurrently action2)
    print (res2 :: Either Int String)

While this seems tedious for such an example, the Concurrently newtype can be great for larger scale cases, such as when we want to discard some results.

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent.Async
import Data.Foldable (traverse_)

type Score = Int
data Person = Person FilePath Score

people :: [Person]
people =
    [ Person "alice.txt" 50
    , Person "bob.txt" 60
    , Person "charlie.txt" 70
    ]

-- | This function returns a unit value that we don't care about. Using
-- concurrently on two such actions would give us ((), ()).
writePerson :: Person -> IO ()
writePerson (Person fp score) = writeFile fp (show score)

-- | Let's write lots of people to their respective files in parallel, instead
-- of sequentially.
writePeople :: [Person] -> IO ()
writePeople = runConcurrently . traverse_ (Concurrently . writePerson)

-- Note: traverse_ is just mapM_ for Applicative instead of Monad.
-- Remember, Concurrently is _not_ a Monad instance.

main :: IO ()
main = writePeople people

When either child thread throws an exception, the other thread is killed with an asynchronous exception, and the original exception is rethrown in the thread that called concurrently:

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception

action1 :: IO Int
action1 = error "action1 errored"

action2 :: IO String
action2 = handle onErr $ do
    threadDelay 500000
    return "action2 completed"
  where
    onErr e = do
        putStrLn $ "action2 was killed by: " ++ displayException e
        throwIO (e :: SomeException)

main :: IO ()
main = do
    res <- concurrently action1 action2
    print res

You'll most likely get some interleaved output, since string-based I/O works character by character, but you should get the idea from running this.
Exercises:
- Use Data.Text.IO instead of string-based I/O to avoid the interleaved output
- Replace concurrently with race. What result do you get?
There's a neat trick you can accomplish with race when you want a companion thread to continue running as long as the main thread is in operation:

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception

-- | Print successive numbers to stdout. Notice how it returns @a@ instead of
-- @()@. This lets the type system know that, under normal circumstances, this
-- function will never exit.
counter :: IO a
counter =
    let loop i = do
            putStrLn $ "counter: " ++ show i
            threadDelay 1000000
            loop $! i + 1
     in loop 1

-- | This function will continue to run counter with whatever action you've
-- provided, and stop running counter once that action exits. If by some chance
-- counter throws an exception, it will take down your thread as well.
withCounter :: IO a -> IO a
withCounter inner = do
    res <- race counter inner
    case res of
        Left x -> assert False x
        Right x -> return x

-- More succinctly
-- withCounter = fmap (either id id) . race counter

main :: IO ()
main = do
    putStrLn "Before withCounter"
    threadDelay 2000000
    withCounter $ do
        threadDelay 2000000
        putStrLn "Inside withCounter"
        threadDelay 2000000
    threadDelay 2000000
    putStrLn "After withCounter"
    threadDelay 2000000
    putStrLn "Exiting!"

Exercises:
- Why does the assert False never get triggered (aka, why does race never return a Left value)?
- In the "more succinct" version of the code, how does the either id id accomplish the same job as the pattern matching?
- Extra credit: could you replace one of the ids in either id id with Data.Void.absurd?
Advanced: While it's nice to be able to run companion threads, it can be restricting to require that your main thread live in IO. Perhaps you want to have your main thread live in some monad transformer stack on top of IO instead. Using the powerful (and complex) monad-control package, we can capture the monadic state to make this work.

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
{-# LANGUAGE FlexibleContexts #-}
import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Control.Monad.Reader
import Control.Monad.Trans.Control

-- | Print successive numbers to stdout. Notice how it returns @a@ instead of
-- @()@. This lets the type system know that, under normal circumstances, this
-- function will never exit.
counter :: IO a
counter =
    let loop i = do
            putStrLn $ "counter: " ++ show i
            threadDelay 1000000
            loop $! i + 1
     in loop 1

-- | This function will continue to run counter with whatever action you've
-- provided, and stop running counter once that action exits. If by some chance
-- counter throws an exception, it will take down your thread as well.
withCounter :: MonadBaseControl IO m => m a -> m a
withCounter inner = control $ \runInIO -> do
    res <- race counter (runInIO inner)
    case res of
        Left x -> assert False x
        Right x -> return x

-- More succinctly
-- withCounter = fmap (either id id) . race counter

main :: IO ()
main = do
    putStrLn "Before withCounter"
    threadDelay 2000000
    flip runReaderT "some string" $ withCounter $ do
        liftIO $ threadDelay 2000000
        str <- ask
        liftIO $ putStrLn $ "Inside withCounter, str == " ++ str
        liftIO $ threadDelay 2000000
    threadDelay 2000000
    putStrLn "After withCounter"
    threadDelay 2000000
    putStrLn "Exiting!"

Sometimes you need a bit more control than is offered by the Concurrently family. In those cases, you'll want to use the Async type and its associated functions. The core type for all of this is:
data Async a
This represents an action running in a different thread which, if successful, will give a result of type a. There are lots of things you can do to interact with that thread:
- See if it's still running
- Wait for it to finish
- Get its result
- Kill it early
Most of this is pretty straightforward, but there are three possibly surprising things worth pointing out right off the bat:
- I used the phrase "if successful" above. It's possible that the action will instead fail with a runtime exception. Therefore, any function which gets a result also needs to deal with a possible SomeException. As you'll see, a number of the functions here deal with that case by simply rethrowing that exception in the calling thread.
- In order to allow for more composability (as we'll see later), the ability to query a thread's status can be performed from within an STM transaction. Most (if not all) of these STM functions also have IO variants, but that's just for user convenience.
- Killing a thread early, or canceling it, involves throwing it an async exception. If your action misbehaves with async exceptions, canceling may fail. We'll demonstrate that below, but I recommend checking out the safe-exceptions tutorial for advice on doing this right.
But before we can get into details, let's just see the launching of some basic Asyncs.

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Monad
import Say

talker :: String -> IO ()
talker str = forever $ do
    sayString str
    threadDelay 500000

getResult :: IO Int
getResult = do
    sayString "Doing some big computation..."
    threadDelay 2000000
    sayString "Done!"
    return 42

main :: IO ()
main = do
    async1 <- async $ talker "async"
    withAsync (talker "withAsync") $ \async2 -> do
        async3 <- async getResult

        res <- poll async3
        case res of
            Nothing -> sayString "getResult still running"
            Just (Left e) -> sayString $ "getResult failed: " ++ show e
            Just (Right x) -> sayString $ "getResult finished: " ++ show x

        res <- waitCatch async3
        case res of
            Left e -> sayString $ "getResult failed: " ++ show e
            Right x -> sayString $ "getResult finished: " ++ show x

        res <- wait async3
        sayString $ "getResult finished: " ++ show (res :: Int)

    sayString "withAsync talker should be dead, but not async"
    threadDelay 2000000

    sayString "Now killing async talker"
    cancel async1

    threadDelay 2000000
    sayString "Goodbye!"

This demonstrates the two most common ways of launching an Async:
- The async function will launch a new Async without any plans to kill the thread. It's your responsibility to do so if you wish, explicitly, via cancel
- withAsync will launch a thread and run an action with that thread running. When the supplied action finishes, that thread is canceled.
The advantage of withAsync is that it is exception safe: the forked thread is canceled even if the supplied action exits with an exception.
Exercise: Implement your own withAsync using bracket, async, and cancel.
This also demonstrated three of the most popular functions for querying an Async's status:
- poll checks if a thread is still running. If it's complete, it gives you a Just with Either the SomeException it failed with, or the result.
- waitCatch will block until the thread exits, and then give you Either the SomeException or the result.
- wait also blocks, but in the case of a SomeException will throw it as a runtime exception.
Exercise: Implement wait in terms of waitCatch. Can you efficiently implement waitCatch in terms of poll?
In general, you should prefer withAsync in place of async to avoid having unneeded threads running. If you specifically need a thread to outlive a certain block, then async is the right function to use.
Let's simplify our example above a bit, and instead of using the IO variants of the functions, use the STM functions. This should look pretty familiar:

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad
import Say

getResult :: IO Int
getResult = do
    sayString "Doing some big computation..."
    threadDelay 2000000
    sayString "Done!"
    return 42

main :: IO ()
main = withAsync getResult $ \a -> do
    res <- atomically $ pollSTM a
    case res of
        Nothing -> sayString "getResult still running"
        Just (Left e) -> sayString $ "getResult failed: " ++ show e
        Just (Right x) -> sayString $ "getResult finished: " ++ show x

    res <- atomically $ waitCatchSTM a
    case res of
        Left e -> sayString $ "getResult failed: " ++ show e
        Right x -> sayString $ "getResult finished: " ++ show x

    res <- atomically $ waitSTM a
    sayString $ "getResult finished: " ++ show (res :: Int)

All we've done is append STM to the function names and stick an atomically at the front. On its own, this isn't too exciting, just informative.
Exercise: Implement both waitCatchSTM and waitSTM in terms of pollSTM.
Exercise: Now implement pollSTM in terms of waitCatchSTM.
So what's so great about this STM business? For one, it can allow us to do more sophisticated queries, like racing two Asyncs:

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Applicative ((<|>))
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Say

getResult1 :: IO Int
getResult1 = do
    sayString "Doing some big computation..."
    threadDelay 2000000
    sayString "Done!"
    return 42

getResult2 :: IO Int
getResult2 = do
    sayString "Doing some smaller computation..."
    threadDelay 1000000
    sayString "Done!"
    return 41

main :: IO ()
main = do
    res <- withAsync getResult1 $ \a1 ->
           withAsync getResult2 $ \a2 ->
           atomically $ waitSTM a1 <|> waitSTM a2
    sayString $ "getResult finished: " ++ show (res :: Int)

Yes, in this case using the race function would make more sense, but as problems grow in complexity, this flexibility will be important.
Exercise: Identify the behavior of this program if getResult2 throws an exception. Can you modify the code so that, if either of the threads completes successfully, we get a result?
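To give one hedged illustration of that flexibility, the sketch below combines two waitSTM calls to wait for a result with a time limit. The helper waitWithTimeout and the action slow are names invented for this example, and running the timer as its own Async is just one of several ways this could be done.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Applicative ((<|>))
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM

-- | Hypothetical helper: wait for an Async, giving up after the given
-- number of microseconds. The timer is itself an Async, so both waits
-- compose inside a single STM transaction.
waitWithTimeout :: Int -> Async a -> IO (Maybe a)
waitWithTimeout micros a =
    withAsync (threadDelay micros) $ \timer ->
        atomically $
            (Just <$> waitSTM a) <|> (Nothing <$ waitSTM timer)

-- | Hypothetical slow computation, finishing after two seconds.
slow :: IO Int
slow = do
    threadDelay 2000000
    return 42

main :: IO ()
main = withAsync slow $ \a -> do
    early <- waitWithTimeout 500000 a  -- the timer wins: Nothing
    print early
    late <- waitWithTimeout 5000000 a  -- the result wins: Just 42
    print late
```

Because the timeout does not cancel the Async being waited on, the same Async can be waited on again later, as main does here.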
Look at how easy it is to break our program completely:

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Say

evil :: IO ()
evil = forever $ do
    eres <- try $ threadDelay 1000000
    sayShow (eres :: Either SomeException ())

main :: IO ()
main = withAsync evil $ const $ return ()

This code will loop forever, since the cancel call's exception is caught by the try and execution continues indefinitely. This is very bad code, don't do this!
Exercise: Switch the import from Control.Exception to UnliftIO.Exception. What happens? Why?
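One way to keep a loop like this cancelable, sketched here under the assumption that only IOException is worth recovering from, is to catch a specific synchronous exception type rather than SomeException. The name polite is made up for this example.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Exception
import Control.Monad

-- | Like evil, but only IOException is caught. The asynchronous exception
-- thrown by cancel is not an IOException, so it escapes the try and
-- terminates the loop.
polite :: IO ()
polite = forever $ do
    eres <- try $ threadDelay 1000000
    case eres of
        Left e ->
            putStrLn $ "recovered from: " ++ displayException (e :: IOException)
        Right () -> return ()

main :: IO ()
main = do
    withAsync polite $ const $ threadDelay 100000
    putStrLn "polite was canceled"
```

Unlike the evil version, this program exits, because leaving the withAsync block cancels the thread and nothing in polite swallows that cancelation.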
Let's say you want to run some kind of a background processing thread. You want to kick it off, leave it running, and essentially forget all about it. However, if that thread goes down for some reason, you need to ensure that your main thread goes down too. Such a situation makes a lot of sense, for example, with a job queue.
To make this work, you can use the link function, which ensures that if your Async ends with an exception, that exception is rethrown to the main thread.

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad (forever)
import Say (say)
import Data.Text (Text)

data Work = Work Text -- intentionally lazy, you'll see why below

jobQueue :: TChan Work -> IO a
jobQueue chan = forever $ do
    Work t <- atomically $ readTChan chan
    say t

main :: IO ()
main = do
    chan <- newTChanIO
    a <- async $ jobQueue chan
    link a
    forever $ do
        atomically $ do
            writeTChan chan $ Work "Hello"
            writeTChan chan $ Work undefined
            writeTChan chan $ Work "World"
        threadDelay 1000000

Question: What's the behavior without the link call?
Question: What's the behavior if Work was strict in its Text? How about if we forced evaluation with writeTChan chan $! Work $! undefined?
Exercise: Rewrite this program to use race instead of async and link.
All of the functions in Control.Concurrent.Async live in either the STM or IO types. Many of the IO functions, like cancel, could be lifted to MonadIO with a simple liftIO call. However, many others, like async, cannot (since they have the IO type appearing as an input, otherwise known as negative position). That would seem to exclude the possibility of using monad transformers.
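A minimal sketch of that distinction, with all three helper names (waitLifted, cancelLifted, asyncReader) invented for illustration: functions that only take an Async as input lift with liftIO alone, while forking a ReaderT action requires unwrapping it to IO by hand.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent.Async
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (ReaderT (ReaderT), asks, runReaderT)

-- | No IO action appears as an input to wait or cancel, so liftIO is
-- all we need. These names are made up for this sketch.
waitLifted :: MonadIO m => Async a -> m a
waitLifted = liftIO . wait

cancelLifted :: MonadIO m => Async a -> m ()
cancelLifted = liftIO . cancel

-- | Forking is different: async takes an IO action as input, so a
-- ReaderT action has to be unwrapped by hand before it can be forked.
asyncReader :: ReaderT r IO a -> ReaderT r IO (Async a)
asyncReader action = ReaderT $ \env -> async (runReaderT action env)

main :: IO ()
main = flip runReaderT (20 :: Int) $ do
    a <- asyncReader (asks (+ 1))
    res <- waitLifted a
    liftIO $ print res
```

Note that asyncReader only works for ReaderT specifically; a single definition covering every MonadIO instance is not possible with liftIO alone.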
We can make a transformer like ReaderT work by manually wrapping and unwrapping its constructor. Let's adapt the jobQueue example from above to use a ReaderT and see how that goes:

#!/usr/bin/env stack
-- stack script --resolver lts-11.10
{-# LANGUAGE OverloadedStrings #-}
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad (forever)
import Say (say)
import Data.Text (Text)
import Control.Monad.Reader

data Work = Work Text

jobQueue :: ReaderT (TChan Work) IO a
jobQueue = forever $ do
    chan <- ask
    Work t <- liftIO $ atomically $ readTChan chan
    say t

inner :: ReaderT (TChan Work) IO ()
inner = ReaderT $ \chan -> do
    race_ (runReaderT jobQueue chan) $ flip runReaderT chan $ do
        forever $ do
            chan <- ask
            liftIO $ atomically $ do
                writeTChan chan $ Work "Hello"
                writeTChan chan $ Work undefined
                writeTChan chan $ Work "World"
            liftIO $ threadDelay 1000000

main :: IO ()
main = do
    chan <- newTChanIO
    runReaderT inner chan

Workable, but tedious. Fortunately, the unliftio package provides a version of the async API which is lifted to many more monads. Let's see how a simple change in import lets us write much nicer code.

#!/usr/bin/env stack
-- stack script --resolver lts-10.3
{-# LANGUAGE OverloadedStrings #-}
import UnliftIO.Async
import UnliftIO.Concurrent (threadDelay)
import UnliftIO.STM
import Control.Monad (forever)
import Say (say)
import Data.Text (Text)
import Control.Monad.Reader

data Work = Work Text

jobQueue :: ReaderT (TChan Work) IO a
jobQueue = forever $ do
    chan <- ask
    Work t <- liftIO $ atomically $ readTChan chan
    say t

inner :: ReaderT (TChan Work) IO ()
inner = do
    race_ jobQueue $ do
        forever $ do
            chan <- ask
            liftIO $ atomically $ do
                writeTChan chan $ Work "Hello"
                writeTChan chan $ Work undefined
                writeTChan chan $ Work "World"
            liftIO $ threadDelay 1000000

main :: IO ()
main = do
    chan <- newTChanIO
    runReaderT inner chan

All of the runReaderT/ReaderT mess in inner completely disappeared.
This will work for any monad stack which is an instance of MonadUnliftIO, which allows for transformers like ReaderT or IdentityT, but disallows transformers like StateT and WriterT as they will result in discarded monadic state. Imagine a function like concurrently (put 5) (put 6). Which state will survive? It's frankly arbitrary, and there are three valid options to consider:
- The first one
- The second one
- Discard both states
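The ambiguity can be made concrete by running the two StateT actions by hand with the plain IO version of concurrently. This is only a sketch: the name bothStates and the initial state 0 are assumptions chosen for the example.

```haskell
#!/usr/bin/env stack
-- stack script --resolver lts-11.10
import Control.Concurrent.Async (concurrently)
import Control.Monad.State.Strict (put, runStateT)

-- | Hypothetical demonstration: run each StateT action separately,
-- both starting from state 0, and return both final states.
bothStates :: IO (Int, Int)
bothStates = do
    (((), s1), ((), s2)) <-
        concurrently (runStateT (put 5) 0) (runStateT (put 6) 0)
    return (s1, s2)

main :: IO ()
main = do
    -- Two final states come back; nothing says which one should "win".
    states <- bothStates
    print states -- prints (5,6)
```

Any lifted concurrently for StateT would have to collapse that pair into a single state, which is exactly the arbitrary choice described above.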
Using UnliftIO.Async as a drop-in replacement for Control.Concurrent.Async is straightforward. The only API change to be aware of is that the Concurrently newtype takes an extra type parameter for the underlying monad.
Write a helper function which allows you to pass actions to worker threads, and which properly handles exceptions for all of these actions. You'll want to use closable queues from stm-chans. Imagine how you'd allow parallelizing a loop that looks like:

myLoop = do
    mnext <- getNextItem
    case mnext of
        Nothing -> pure ()
        Just next -> do
            handleItem next -- want to do this in a worker
            myLoop

If you just use async or forkIO:
- You'll get unbounded worker threads created
- You'll incur the overhead of forking for each item
- You won't have any handling of exceptions
Hint: consider a helper function like:

type PerformInWorker = ... -- what should this type be?

withWorkers
    :: Int -- ^ worker count
    -> (PerformInWorker -> IO a)
    -> IO a

Bonus:
- Generalize to MonadUnliftIO