# Writing a streaming Twitter waterflow solution

In this post Philip Nilsson describes an inspiring, principled approach to solving a toy problem posed in a programming interview. I wanted to implement a solution to a variant of the problem in which the input is processed as a stream. It was pretty easy to sketch a solution out on paper, but Philip’s solution was invaluable in testing and debugging my implementation. (See also Chris Done’s mind-melting `loeb` approach.)

My goal was to have a function:

```
waterStream :: [Int] -> [Int]
```

that would take a possibly-infinite list of columns and return a stream of
*known water quantities*, where volumes of water were output as soon as
possible. We can get a solution to the original problem, then, with

```
ourWaterFlow = sum . waterStream
```

Here is the solution I came up with, with inline explanation:

```
{-# LANGUAGE BangPatterns #-}

-- start processing `str` initializing the highest column to the left at 0, and
-- an empty stack.
waterStream :: [Int] -> [Int]
waterStream str = processWithMax 0 str []

processWithMax :: Int -> [Int] -> [(Int,Int)] -> [Int]
processWithMax prevMax = process
  where
    process [] = const []
    -- output the quantity of water we know we can get, given the column at the
    -- head of the stream, `y`:
    process (y:ys) = eat 1
      where
        eat !n xxs@((offset,x):xs)
          -- done with `y`, push it and its offset onto the stack
          | y < x = process ys ((n,y):xxs)
          -- at each "rise" we can output some known quantity of water;
          -- storing the "offset" as we did above lets us calculate water
          -- above a previously filled "valley"
          | otherwise = let col = offset*(min y prevMax - x)
                            cols = eat (n+offset) xs
                            -- filter out zeros:
                         in if col == 0 then cols else col : cols
        -- if we got to the end of the stack, then `y` is the new highest
        -- column we've seen.
        eat !n [] = processWithMax y ys [(n,y)]
```

The bit about “offsets” is the tricky part which I don’t know how to explain without a pretty animation.

## Correctness

It took me much longer than I was expecting to code up a version of the
solution above that worked on a few hand-drawn test cases, and at that point I
didn’t have high confidence that the code was correct, so I turned to
QuickCheck and `assert`.

First I wanted to check the invariant that the “column” values in the stack are strictly increasing:

```
import Control.Exception (assert)
...
--process (y:ys) = eat 1
process (y:ys) stack = assert (stackSane stack) $ eat 1 stack
...
```
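The definition of `stackSane` isn't shown here. A minimal sketch of what such a check might look like, assuming the stack holds `(offset, column)` pairs with the most recently pushed entry at the head, so that column heights strictly increase from head to tail:

```haskell
-- Hypothetical helper for illustration; not the definition used in the post.
-- Each stack entry is (offset, column height); the head is the latest push.
stackSane :: [(Int, Int)] -> Bool
stackSane stack = and (zipWith (<) heights (drop 1 heights))
  where
    -- heights read from the top of the stack downwards
    heights = map snd stack
```

An empty or single-entry stack passes trivially, and two adjacent entries of equal height fail the check.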

Then I used Philip’s solution (which I had confidence in):

```
waterFlow :: [Int] -> Int
waterFlow h = sum $
  zipWith (-)
    (zipWith min (scanl1 max h) (scanr1 max h))
    h
```
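To see why this reference works, it can help to split it into its intermediate lists. The sketch below does only that; `waterLevels` and `waterPerColumn` are names made up for this illustration. Note that `scanr1` has to reach the end of the list, so this version cannot run on an infinite stream.

```haskell
-- Water surface height over each column: the lower of the highest wall
-- seen so far from the left and the highest wall seen from the right.
waterLevels :: [Int] -> [Int]
waterLevels h = zipWith min (scanl1 max h) (scanr1 max h)

-- Water held above each column: surface height minus the column itself.
waterPerColumn :: [Int] -> [Int]
waterPerColumn h = zipWith (-) (waterLevels h) h

-- For h = [2,5,1,2,3,4,7,7,6]:
--   scanl1 max h     == [2,5,5,5,5,5,7,7,7]
--   scanr1 max h     == [7,7,7,7,7,7,7,7,6]
--   waterLevels h    == [2,5,5,5,5,5,7,7,6]
--   waterPerColumn h == [0,0,4,3,2,1,0,0,0]   -- sums to 10
```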

to test my implementation:

```
*Waterflow> import Test.QuickCheck
*Waterflow Test.QuickCheck> quickCheck (\l -> waterFlow l == ourWaterFlow l)
*** Failed! Falsifiable (after 21 tests and 28 shrinks):
[1,0,0,0,1]
```

Oops! It turned out I had a bug in this line (fixed above):

```
--old buggy:
--cols = eat (n+1) xs
--new fixed:
cols = eat (n+offset) xs
```
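To make the difference concrete, here is a sketch that reuses the same algorithm but takes the width update as a parameter, so the two variants can be compared side by side. The names `waterStreamWith`, `step`, `buggy` and `fixed` are made up for this comparison. As I read the code, an entry's offset is the number of columns it stands for: itself plus any already-filled columns that were popped and merged into it.

```haskell
{-# LANGUAGE BangPatterns #-}

-- `step` is a hypothetical parameter added only for this comparison: it
-- decides how the running width `n` grows when a stack entry is popped.
waterStreamWith :: (Int -> Int -> Int) -> [Int] -> [Int]
waterStreamWith step str = go 0 str []
  where
    go _ [] _ = []
    go prevMax (y:ys) stack = eat 1 stack
      where
        eat !n xxs@((offset, x) : xs)
          -- `y` is lower than the top of the stack: push it with its width
          | y < x     = go prevMax ys ((n, y) : xxs)
          -- otherwise pop the entry and emit the water it can now hold
          | otherwise =
              let col  = offset * (min y prevMax - x)
                  cols = eat (step n offset) xs
              in if col == 0 then cols else col : cols
        -- stack exhausted: `y` is the new highest column
        eat !n [] = go y ys [(n, y)]

buggy, fixed :: [Int] -> [Int]
buggy = waterStreamWith (\n _ -> n + 1)  -- each popped entry counted as width 1
fixed = waterStreamWith (+)              -- popped entry contributes its full width
```

On the QuickCheck counterexample, `buggy [1,0,0,0,1]` evaluates to `[2]` and `fixed [1,0,0,0,1]` to `[3]`. In the buggy version the flat run of zeros is re-pushed with width 2 each time instead of growing to 3, so one column of water is lost at the final rise.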

## Performance

The solution seems to perform pretty well, processing 1,000,000 `Int`s in 30ms
on my machine:

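```
import Control.Monad (replicateM)
import Criterion.Main
import System.Random.MWC (create, uniformR)

-- `ourWaterFlow` is the function defined earlier in the post
main = do
    gen <- create
    rs <- replicateM 1000000 $ uniformR (0,100) gen
    defaultMain [ bench "ourWaterFlow" $ whnf ourWaterFlow rs ]
```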

I didn’t get a good look at space usage over time, as I was testing with
`mwc-random`, which doesn’t seem to support creating a lazy infinite list of
randoms, and I didn’t want to hunt down another library. Obviously on a stream
that simply descends forever, our stack of `(Int,Int)` will grow without
bound.
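For a rough look at space behaviour without another dependency, one option is a hand-rolled lazy generator. The sketch below is only an assumption about what would be good enough here: `randomColumns` and the linear congruential constants are choices made for this example, the statistical quality is poor, and it assumes a 64-bit `Int`.

```haskell
-- An infinite, lazily generated list of column heights in [0,100],
-- produced by a simple linear congruential generator (illustrative only).
randomColumns :: Int -> [Int]
randomColumns seed = map toColumn (drop 1 (iterate next seed))
  where
    -- one LCG step, kept within 31 bits
    next s     = (s * 1103515245 + 12345) `mod` 2147483648
    -- discard the weak low bits, then map into the range 0..100
    toColumn s = (s `div` 65536) `mod` 101
```

Something like `sum (take 1000 (waterStream (randomColumns 42)))` then consumes only as much of the input as it needs. The `random` package's `randomRs` is another way to get an infinite lazy list.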

It seems as though there is a decent amount of parallelism that could be exploited in this problem, but I didn’t have any luck on a quick attempt.

## Thoughts?

Have a parallel solution, or something just faster? Or an implementation that doesn’t need a big stack of previous values?