Synchronized Concurrent IO Actions
Publish date: Feb 24, 2011
Last updated: Jan 2, 2025
This is a bit of literate Haskell code that works through a system for
running concurrent IO computations that are synchronized on a stream. So we
have many “nodes” of the type :: a -> IO () running concurrently on a single
stream, where we would like each node to wait until all nodes have processed a
stream element before any are allowed to proceed to the next element.
This was motivated by the desire to simulate a system in which many nodes are interacting independently according to a time-synchronized algorithm. So the “stream” the nodes process is time.
I wanted to use real concurrency because it sounded fun. It would of course be possible to simulate a time-synchronized system without using real concurrency.
> module Main
> where
>
> import Control.Concurrent
> import Control.Monad
Demonstrating a system for coordinating independent threads that need to be
synchronized to a stream. For example we might be simulating some nodes that
each act independently performing some action at regular time intervals. In
this case the coordinating stream could be a list of ascending integers
representing time.
A node is a function over time. In this case it represents a clock, with each
print representing a tick. We want the nodes to be independent but
to simulate a time-synchronized system.
Here we have a simple example node that does nothing but take an Int and print
it:
> nodeSimple :: Int -> IO ()
> nodeSimple = putStr . show
The synchronizer takes a list of functions such as the one above, each
representing a different node, and a synchronizing stream of inputs. It then
runs the node functions concurrently and synchronously.
This is a simple version without worrying about actual concurrency:
> synchronizeSimple :: [a -> IO ()] -> [a] -> IO ()
> synchronizeSimple fs = mapM_ (\a-> mapM_ ($ a) fs)
Each "node" function executes on a given input before any of the nodes can move
on to the next input. This is the goal. In the concurrent implementation below,
the nodes may execute in any order during a single "step", but block until all
nodes have executed on the input.
> testSimpleSync = synchronizeSimple [nodeSimple,nodeSimple,nodeSimple] [1..5]
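To make the ordering concrete, here is a small standalone sketch (kept outside this module) that reuses the definition of synchronizeSimple but has each node append to a shared log instead of printing. The names logNode and traceSimple and the labels are made up for illustration.

```haskell
import Data.IORef

-- Same definition as in the text above.
synchronizeSimple :: [a -> IO ()] -> [a] -> IO ()
synchronizeSimple fs = mapM_ (\a -> mapM_ ($ a) fs)

-- Hypothetical helper: a node that records (label, input) in a shared log.
logNode :: IORef [(Char, Int)] -> Char -> Int -> IO ()
logNode ref label a = modifyIORef ref ((label, a) :)

-- Run three logging nodes over two inputs and return the log in the
-- order the nodes actually ran.
traceSimple :: IO [(Char, Int)]
traceSimple = do
    ref <- newIORef []
    synchronizeSimple [logNode ref 'x', logNode ref 'y', logNode ref 'z'] [1, 2]
    fmap reverse (readIORef ref)
```

Running traceSimple in GHCi shows every node handling input 1 before any node sees input 2, which is the property the concurrent version has to preserve.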
-------------------------------------------------------------------------------
Now we want to be able to replicate the above but have the "node" functions be
running in separate, but coordinating threads.
We will add to this the requirement and constraint that the input stream will be
infinite and each node must indicate when its job is done by returning a Bool at
each step, with False indicating it is no longer taking input:
> type Node a = a -> IO Bool
Here are three such Nodes. We expect them to exit at different times
according to their guard predicate:
> nodeA, nodeB, nodeC :: Node Int
> nodeA a | a < 50    = comp a `seq` (putStr "A") >> return True
>         | otherwise = return False
> nodeB a | a < 700   = comp a `seq` (putStr "B") >> return True
>         | otherwise = return False
> nodeC a | a < 1000  = comp a `seq` (putStr "C") >> return True
>         | otherwise = return False
>
> -- an expensive computation to see if things are actually running concurrently
> comp a = sum [a..10000]
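Since the three nodes differ only in their label and cutoff, they could also be generated by a small helper. The following is a standalone sketch, not part of this module; boundedNode is a name invented here, and comp is given an explicit Int type as an assumption.

```haskell
type Node a = a -> IO Bool

-- Stand-in for the expensive computation used in the text.
comp :: Int -> Int
comp a = sum [a .. 10000]

-- Hypothetical helper: a node that prints `label` and reports True while
-- the input is below `limit`, and reports False from then on.
boundedNode :: String -> Int -> Node Int
boundedNode label limit a
    | a < limit = comp a `seq` putStr label >> return True
    | otherwise = return False

nodeA, nodeB, nodeC :: Node Int
nodeA = boundedNode "A" 50
nodeB = boundedNode "B" 700
nodeC = boundedNode "C" 1000
```

With this, adding a fourth node with a different cutoff is a one-line change.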
And here is a synchronizing function that uses a set of channels to coordinate the
action of the nodes, each of which is now a separate thread:
> synchronize :: [Node a] -> [a] -> IO ()
> synchronize ns str = do
>     -- knowing the number of nodes just lets the programmer be lazy for now
>     let n = length ns
>
>     -- the input stream gets pushed down this pipe, and read from duplicates
>     -- of this Chan:
>     broadcastChan <- newChan
>
>     -- the pipe that all the nodes report back to and the synchronizer listens
>     -- to, in order to know when to feed another stream value down the
>     -- broadcast pipe:
>     receiverChan <- newChan
>
>     -- fork off the nodes, which will block until the coordinator starts:
>     let runNode node = do
>           ----SQUASHED BUG: forked before duplicating Chan caused Chan to
>           ----be duplicated after first input sent through (I think):
>           bChanDup <- dupChan broadcastChan
>           forkIO $ nodeLoop bChanDup receiverChan node
>     mapM_ runNode ns
>
>     -- Start the computation by running the coordinator which will start by
>     -- putting the first value in the broadcast Chan
>     coordinator broadcastChan receiverChan n str
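The squashed bug mentioned in the comments comes down to how dupChan behaves: a duplicate starts out empty and only sees values written after it was created. Here is a minimal standalone sketch of that behavior (separate from this module; dupDemo is a name made up here):

```haskell
import Control.Concurrent.Chan

dupDemo :: IO (String, String)
dupDemo = do
    original <- newChan
    writeChan original "early"     -- written before duplication
    dup <- dupChan original
    writeChan original "late"      -- written after duplication
    fromDup  <- readChan dup       -- the duplicate never sees "early"
    fromOrig <- readChan original  -- the original still has "early" at its head
    return (fromDup, fromOrig)
```

If the dupChan call ran inside the forked thread, it could race with the coordinator's first writeChan, and a node might miss the first input. That would be consistent with the bug described in the comment, which is why the duplication happens in the main thread before forkIO.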
This function repeatedly runs a node action on the inputs, reporting each run
back up the receiverChan:
> nodeLoop :: Chan a -> Chan Bool -> Node a -> IO ()
> nodeLoop bChan rChan n = loop where
>     loop = do
>         -- read the next input (blocked until all have done previous):
>         going <- n =<< readChan bChan
>         ---- TODO: should we fork either of these:
>         writeChan rChan going
>         when going loop
Finally we have the coordinator function that listens to the receiver Chan and
pushes the next input down the (duplicated) broadcast Chan after all nodes
report in, keeping track of the number of active nodes after each round so it
knows how many will be reporting in through the Chan on the next round.
> coordinator :: Chan a -> Chan Bool -> Int -> [a] -> IO ()
> coordinator bChan rChan = loop where
>     -- stream exhausted (only possible if a finite list is passed in):
>     loop _ [] = return ()
>     loop alive (a:as) = do
>         putStr "|"  --DEBUG: visually delimit rounds
>         writeChan bChan a
>         -- lazily view the reports; only the first `alive` are forced below
>         rs <- getChanContents rChan
>         let died = length $ filter not $ take alive rs
>             stillAlive = alive - died
>         when (stillAlive > 0) $ loop stillAlive as
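The coordinator calls getChanContents every round and relies on laziness to consume only as many reports as there are live nodes. An alternative that may be easier to reason about is to read exactly that many reports explicitly. This is a standalone sketch, not part of this module; collectRound is a hypothetical helper, not something from the code above.

```haskell
import Control.Concurrent.Chan
import Control.Monad (replicateM)

-- Hypothetical helper: block until `alive` nodes have reported, then
-- return how many of them are still running (reported True).
collectRound :: Chan Bool -> Int -> IO Int
collectRound rChan alive = do
    reports <- replicateM alive (readChan rChan)
    return (length (filter id reports))
```

Inside the coordinator loop, binding stillAlive with `stillAlive <- collectRound rChan alive` could then stand in for the getChanContents and let lines.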
And here we have some tests:
> test2 = synchronize [nodeA,nodeB,nodeC] [1..]
>
> test3 = synchronize (concatMap (replicate 3) [nodeA,nodeB,nodeC]) [1..]
>
> main = test3
If the above reminds you of Iteratee, I'm right with you. Will be exploring
that next.
Thanks!