Writing a streaming twitter waterflow solution
Last updated: Jan 3, 2025
In this post Philip Nilsson describes an inspiring, principled approach to solving a toy problem posed in a programming interview. I wanted to implement a solution to a variant of the problem where we’d like to process a stream. It was pretty easy to sketch a solution out on paper but Philip’s solution was invaluable in testing and debugging my implementation. (See also Chris Done’s mind-melting loeb approach)
My goal was to have a function:
waterStream :: [Int] -> [Int]
that would take a possibly-infinite list of columns and return a stream of known water quantities, where volumes of water were output as soon as possible. We can get a solution to the original problem, then, with
ourWaterFlow = sum . waterStream
Here is the solution I came up with, with inline explanation:
{-# LANGUAGE BangPatterns #-}
-- start processing `str` initializing the highest column to the left at 0, and
-- an empty stack.
waterStream :: [Int] -> [Int]
waterStream str = processWithMax 0 str []
processWithMax :: Int -> [Int] -> [(Int,Int)] -> [Int]
processWithMax prevMax = process
where
process [] = const []
-- output the quantity of water we know we can get, given the column at the
-- head of the stream, `y`:
process (y:ys) = eat 1
where
eat !n xxs@((offset,x):xs)
-- done with `y`, push it and its offset onto the stack
| y < x = process ys ((n,y):xxs)
-- at each "rise" we can output some known quantity of water;
-- storing the "offset" as we did above lets us calculate water
-- above a previously filled "valley"
| otherwise = let col = offset*(min y prevMax - x)
cols = eat (n+offset) xs
-- filter out zeros:
in if col == 0 then cols else col : cols
-- if we got to the end of the stack, then `y` is the new highest
-- column we've seen.
eat !n [] = processWithMax y ys [(n,y)]
The bit about “offsets” is the tricky part which I don’t know how to explain without a pretty animation.
Correctness
It took me much longer than I was expecting to code up the solution above that
worked on a few hand-drawn test cases, and at that point I didn’t have high
confidence that the code was correct, so I turned to
quickcheck and
assert
.
First I wanted to make sure the invariant that the “column” values in the stack were strictly increasing held:
import Control.Exception (assert)
...
--process (y:ys) = eat 1
process (y:ys) stack = assert (stackSane stack) $ eat 1 stack
...
Then I used Philip’s solution (which I had confidence in):
waterFlow :: [Int] -> Int
waterFlow h = sum $
zipWith (-)
(zipWith min (scanl1 max h) (scanr1 max h))
h
to test my implementation:
*Waterflow> import Test.QuickCheck
*Waterflow Test.QuickCheck> quickCheck (\l -> waterFlow l == ourWaterFlow l)
*** Failed! Falsifiable (after 21 tests and 28 shrinks):
[1,0,0,0,1]
Oops! It turned out I had a bug in this line (fixed above):
--old buggy:
--cols = eat (n+1) xs
--new fixed:
cols = eat (n+offset) xs
Performance
The solution seems to perform pretty well, processing 1,000,000 Int
s in 30ms
on my machine:
import Criterion.Main
main = do
gen <- create
rs <- replicateM 1000000 $ uniformR (0,100) gen
defaultMain [ bench "ourWaterFlow" $ whnf ourWaterFlow rs
I didn’t get a good look at space usage over time, as I was testing with
mwc-random
which doesn’t seem to support creating a lazy infinite list of
randoms and didn’t want to hunt down another library. Obviously on a stream
that simply descends forever, our stack of (Int,Int)
will grow to infinite
size.
It seems as though there is a decent amount of parallelism that could be exploited in this problem, but I didn’t have any luck on a quick attempt.
Thoughts?
Have a parallel solution, or something just faster? Or an implementation that doesn’t need a big stack of previous values?