GenStage is a miracle of UX for developers. There's only one catch: because of its purpose, it's built for the machine, not for the developer. Let's go over what GenStage is for, and see just how simple it is to start using it.

Why would you use GenStage?

GenStage is what you would use if you had "some work" (events) that needed to go through one or more transformations, which you could easily split into multiple parallelizable steps. In the world of GenStage, the producer is what will hold the events that the consumers will then work on.

How to think about GenStage

GenStage is based on two fundamental principles:

  1. Consumers request events from producers. The producers' only job is to feed the consumer when it is hungry. Consumer not hungry? No feeding.
  2. GenStage "stages" exist to be split in a way that benefits the computer's constraints, not the programmer's constraints.

In other words:

  1. GenStage has a built-in backpressure mechanism: work is not pushed to the workers, the workers pull when they are ready for more work. The only complexity is that every callback should check the demand and make sure we produce enough to match that demand. Thankfully, this is always the same work, so it can be extracted to a function.
  2. Use producers, producer-consumers, and consumers to use computing resources (e.g. CPU, I/O, RAM) effectively. Do not use them to organize your domain model.
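To make the "workers pull" idea concrete before any GenStage code appears, here is a minimal sketch using plain processes and messages. It is not how GenStage is implemented: the module PullSketch, its functions, and the {:ask, count, from} message are all made up for illustration, and unlike GenStage this toy source does not remember unfulfilled demand.

```elixir
defmodule PullSketch do
  # Hypothetical stand-in for a producer: a plain process holding a list.
  def start_source(items) do
    spawn(fn -> source_loop(items) end)
  end

  # The source never sends anything on its own; it only answers requests.
  defp source_loop(items) do
    receive do
      {:ask, count, from} ->
        {batch, rest} = Enum.split(items, count)
        send(from, {:events, batch})
        source_loop(rest)
    end
  end

  # The worker decides when to pull and how much it can handle.
  def pull(source, count) do
    send(source, {:ask, count, self()})

    receive do
      {:events, batch} -> batch
    after
      1_000 -> :timeout
    end
  end
end

source = PullSketch.start_source(Enum.to_list(1..5))
IO.inspect(PullSketch.pull(source, 2))
IO.inspect(PullSketch.pull(source, 2))
IO.inspect(PullSketch.pull(source, 2))
```

The three pulls return two items, two items, and then the single item that is left: a slow worker simply asks less often, and nothing piles up in its mailbox.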

Let's start using GenStage

First, a test project

mix new genstage_test

cd genstage_test

Edit your mix.exs file and make sure your deps look like this:

  defp deps do
    [
      {:gen_stage, "~> 0.12.1"}
    ]
  end

mix deps.get

Now, GenStage

I'm not going to cover everything about GenStage here - once you've read this blog entry, you'll be able to read the documentation ( https://hexdocs.pm/gen_stage/GenStage.html ) and know which bits go where, without the shadow of a doubt.

GenStage is basically broken down into three kinds of processes: producer, producer-consumer, and consumer. A producer will hold the events that will be requested by consumers, and a producer-consumer will request events from a producer and then be able to produce events, which means you can create a chain of transformations very easily.

For our example, let us say the "events" will be incoming geolocations in the form of latitude/longitude pairs, which we will need to transform into the name of the closest city, and then count the number of times each city was found. Each "event" will be of the form {latitude, longitude} (e.g. {42, -72}).

This seems like it could be represented by a producer, a producer-consumer, and then a consumer! How wonderfully coincidental.

The first thing that we need to consider is the source of the events. It could be anything, such as a Stream, or a directory on the filesystem, or a database table. As long as the producer can query it to figure out what to produce, we're fine. To make the example perfectly clear and self-contained, we're going to use a simple list that we'll put in the producer's state.

First, let's write a mock geolocation service:

defmodule Geolocation do
  @cities ["New York", "Constantinople", "Istanbul", "Paris", "London"]
  def call({_lat, _long}) do
    :timer.sleep(500) # This operation takes a while
    Enum.random(@cities)
  end
end

Here's the code for the producer. Remember that this is only responsible for fulfilling demand. You'll see what I mentioned earlier: in every single callback, whether a cast, a call, or the demand callback, we have to "return events to be produced", so at least for us here it's easier to have a single function that creates our new state as well as the events to return.

This might not be the implementation you want in your code. The GenStage documentation has a slightly more streamlined version of this that always returns :noreply, at the cost of a slightly more complex handle_call behavior.

defmodule Producer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, %{})
  end

  def init(state) do
    new_state =
      state
      |> Map.put(:geolocations, [])
      |> Map.put(:demand, 0)

    {:producer, new_state}
  end

  def handle_cast({:add_geolocations, geolocations}, state) do
    new_geolocations = state.geolocations ++ geolocations
    state = Map.put(state, :geolocations, new_geolocations)
    {geolocations_to_produce, new_state} = prepare_return_values(state.demand, state)
    {:noreply, geolocations_to_produce, new_state}
  end

  def handle_call(:queue_size, _from, state) do
    {geolocations_to_produce, new_state} = prepare_return_values(state.demand, state)
    {:reply, length(new_state.geolocations), geolocations_to_produce, new_state}
  end

  def handle_demand(demand, state) when demand > 0 do
    total_demand = demand + state.demand
    {geolocations_to_produce, new_state} = prepare_return_values(total_demand, state)
    {:noreply, geolocations_to_produce, new_state}
  end

  defp prepare_return_values(demand, state) do
    {geolocations_to_produce, geolocations_left} = Enum.split(state.geolocations, demand)
    remaining_demand = demand - length(geolocations_to_produce)
    new_state =
      state
      |> Map.put(:geolocations, geolocations_left)
      |> Map.put(:demand, remaining_demand)
    {geolocations_to_produce, new_state}
  end
end

The state needs to keep track of the unfulfilled demand, because when we receive events, we have to be able to produce them and backfill demand - not just wait for the next time we get a request for events. That's what the :demand key is for.
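To see that bookkeeping in isolation, here is a small sketch that runs the same arithmetic as prepare_return_values/2 outside of any process. The module name DemandSketch and the function dispatch/2 are made up for this illustration; the state shape is the one the producer uses.

```elixir
defmodule DemandSketch do
  # Same arithmetic as prepare_return_values/2, without the GenStage process.
  def dispatch(%{geolocations: queue} = state, demand) do
    {to_produce, left} = Enum.split(queue, demand)
    remaining = demand - length(to_produce)
    {to_produce, %{state | geolocations: left, demand: remaining}}
  end
end

state = %{geolocations: [], demand: 0}

# A consumer asks for 10 events while the queue is still empty.
{events, state} = DemandSketch.dispatch(state, 10 + state.demand)
IO.inspect({events, state.demand})

# Three geolocations arrive later; the stored demand lets us send them at once.
state = %{state | geolocations: state.geolocations ++ [{1, 1}, {2, 2}, {3, 3}]}
{events, state} = DemandSketch.dispatch(state, state.demand)
IO.inspect({events, state.demand})
```

The first call has nothing to send and records a demand of 10. The second call sends all three geolocations and leaves a demand of 7, which is what lets the next batch of arrivals go out immediately as well.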

For painfully didactic reasons, I have a handle_cast and a handle_call, to show you that each of their return tuples carries a list of events to emit, just like handle_demand's return tuple. We cast to add to the queue because we don't care about the reply (maybe we should, but I'm a bad person), but we do care about knowing the size of the queue, maybe so we can add more consumers, so we use a call for that.

Now let's take a look at the next step in the chain: taking the geolocations and converting them to cities.

  defmodule ProducerConsumer do
    use GenStage

    def start_link, do: GenStage.start_link(__MODULE__, %{})

    def init(state) do
      {:producer_consumer, state}
    end

    def handle_events(geolocations, _from, state) do
      IO.inspect geolocations
      cities = Enum.map(geolocations, &Geolocation.call/1)
      {:noreply, cities, state}
    end
  end

And now let's take a look at the code for the final stage, counting the cities:

  defmodule Consumer do
    use GenStage

    def start_link() do
      GenStage.start_link(__MODULE__, :ok)
    end

    def init(:ok) do
      # A slight abstraction leak here requires us to return something as the state
      {:consumer, :ok}
    end

    def handle_events(cities, _from, state) do
      aggregated_cities = Enum.reduce(
        cities,
        %{},
        fn(city, acc) -> Map.update(acc, city, 1, &(&1 + 1)) end
      )
      IO.inspect aggregated_cities
      # A slight abstraction leak here requires us to return an empty list
      {:noreply, [], state}
    end
  end

The consumer is quite simple, isn't it? In the handle_events callback, we do the work: we count the cities in the batch we received and print the result to get a sense of what each consumer is doing. The artificial slowdown lives upstream, in the mock Geolocation service's sleep.
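Note that the handle_events shown earlier counts cities per batch and starts from an empty map every time. If you wanted running totals instead, one option (an assumption on my part, not something the example above does) is to keep a map of counts in the consumer's state and merge each batch into it. The merge itself can be sketched as a pure function; CitySketch and add_batch/2 are hypothetical names, and Enum.frequencies/1 needs Elixir 1.10 or later.

```elixir
defmodule CitySketch do
  # Count one batch, then fold it into the totals seen so far.
  def add_batch(totals, cities) do
    batch_counts = Enum.frequencies(cities)
    Map.merge(totals, batch_counts, fn _city, a, b -> a + b end)
  end
end

totals =
  %{}
  |> CitySketch.add_batch(["Paris", "London", "Paris"])
  |> CitySketch.add_batch(["London", "Istanbul"])

IO.inspect(totals)
```

After the two batches, the map holds Paris twice, London twice, and Istanbul once. In a real consumer, the totals would be the state returned from handle_events in place of :ok.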

So... Wait. Are we done? Let's try it. iex -S mix ...

{:ok, p_pid} = Producer.start_link()
{:ok, pc_pid} = ProducerConsumer.start_link()
{:ok, c_pid} = Consumer.start_link()
GenStage.sync_subscribe(c_pid, to: pc_pid, max_demand: 5)
GenStage.sync_subscribe(pc_pid, to: p_pid, max_demand: 10)

Well, we're clearly in a state where the consumers have asked for events and we couldn't fulfill the demand. Let's give the producer some things to produce. They won't be real locations, but then again, our location service is fake anyway.

locations = (1..1000) |> Enum.to_list |> Enum.map(fn(x) -> {x, x} end)
GenStage.cast(p_pid, {:add_geolocations, locations})

Well would you look at that. Yes, I'd say we're done. Isn't it beautiful? ... And maybe a little surprising? Explaining the rest is beyond the scope of this introduction, unfortunately, but you should now be ready for the gen_stage documentation.

What you've just read are the fundamentals of GenStage - it's what I wish I'd read when I started. Summary:

  • The consumer asks for events.
  • The producer keeps track of the number of requested events.
  • The producer pulls events from the source until demand hits zero or until there are no more events to pull, then waits.
  • The consumer gets whatever the producer is able to send back, and then does hopefully meaningful work.