Preventing a GenStage producer from stalling when it cannot meet demand

17/08/2022 tipselixir

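A GenStage producer is designed to handle large volumes of data. If a handle_demand call returns fewer events than were requested, the producer appears to stop. The process is still alive, but GenStage only calls handle_demand again when a consumer sends new demand, and the consumer does not ask for more until it has received enough of the events it already requested. Nothing prompts the producer to deliver the rest.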

Scheduling a new delivery

A simple way to work around this is to "schedule" a new delivery whenever the demand cannot be met. The producer sends itself a delayed message that "wakes it up" so it can try to deliver the missing events.
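
The wake-up mechanism is ordinary OTP messaging and can be seen in isolation. The sketch below uses a plain GenServer rather than a GenStage producer; the module name WakeUp, the :wake_up and :woke_up messages, and the 100 ms delay are all illustrative assumptions.

```elixir
defmodule WakeUp do
  use GenServer

  # The state is the pid that should be notified when the process wakes up
  @impl true
  def init(caller), do: {:ok, caller}

  # Schedule a message to ourselves; it is delivered after 100 ms
  @impl true
  def handle_cast(:schedule, caller) do
    Process.send_after(self(), :wake_up, 100)
    {:noreply, caller}
  end

  # Messages sent with Process.send_after arrive in handle_info
  @impl true
  def handle_info(:wake_up, caller) do
    send(caller, :woke_up)
    {:noreply, caller}
  end
end

{:ok, pid} = GenServer.start_link(WakeUp, self())
GenServer.cast(pid, :schedule)

receive do
  :woke_up -> IO.puts("woke up")
after
  1_000 -> IO.puts("timed out")
end
```

A GenStage producer handles such messages the same way, except that its handle_info can also return events to dispatch.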

Commented code

defmodule Producer do
  use GenStage
  
  def init(_state), do: {:producer, :no_state}

  # our handle_demand will be delegated to the send_demand/1 function
  def handle_demand(demand, _state) when demand > 0, do: send_demand(demand)

  # handle_info will be called when our producer receives the scheduled event
  def handle_info({:send_demand, demand}, _state), do: send_demand(demand)

  # the send_demand/1 function will actually send the data from the producer to the consumer
  defp send_demand(demand) do
    data = get_data(demand)
    remaining = demand - length(data)

    # if the demand is not met, schedule a message for the unmet part only;
    # it will arrive in handle_info in 1s
    if remaining > 0 do
      Process.send_after(self(), {:send_demand, remaining}, :timer.seconds(1))
    end

    {:noreply, data, :no_state}
  end

  # get_data will simulate the production of data from the producer
  # here it is always returning only 1 item (well below the demand)
  def get_data(_demand), do: [:rand.uniform()]
end
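
To see the producer at work, it needs a consumer subscribed to it. What follows is a minimal sketch, not the consumer from the livebook: the module name Consumer and the max_demand: 5 setting are illustrative assumptions. It is meant to run in the same session where Producer is defined, with the gen_stage dependency already installed.

```elixir
# Assumes the Producer module above is already compiled in this session
# and that the :gen_stage dependency is available.
defmodule Consumer do
  use GenStage

  def init(:ok), do: {:consumer, :no_state}

  # Print each batch of events; GenStage sends further demand upstream
  # automatically once enough events have been handled
  def handle_events(events, _from, state) do
    IO.inspect(events, label: "received")
    {:noreply, [], state}
  end
end

{:ok, producer} = GenStage.start_link(Producer, :ok)
{:ok, consumer} = GenStage.start_link(Consumer, :ok)

# max_demand: 5 is an arbitrary value chosen to keep the output readable
{:ok, _subscription} = GenStage.sync_subscribe(consumer, to: producer, max_demand: 5)
```

Because get_data returns a single item per call, the events should trickle in one at a time instead of the flow halting after the first delivery.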

A livebook with a producer and a consumer implemented as in this example is available here.