Impedindo um producer GenStage de parar por não cumprir a demanda

17/08/2022 tipselixir

Um producer é feito para lidar com grande volume de dados, se em uma chamada do handle_demand o seu producer não conseguir produzir dados o suficiente para a demanda solicitada, ele simplesmente para, pois se assume que não tem mais nada ali para ele entregar.

Agendando uma nova entrega

Uma maneira simples de contornar isso é, caso a demanda não possa ser atendida, "agendar" uma nova entrega. Isso é feito programando o envio de um evento que irá servir para "despertar" o producer.

Código comentado

defmodule Producer do
  use GenStage
  
  def init(_state), do: {:producer, :no_state}

  # nosso handle demand vai ser delegado à função send_demand/1
  def handle_demand(demand, _state) when demand > 0, do: send_demand(demand)

  # o handle_info irá ser chamado quando o nosso producer receber o evento que foi agendado
  def handle_info({:send_demand, demand}, _state), do: send_demand(demand)

  # a função send_demand/1 é quem irá realmente enviar os dados do producer para quem quer que esteja consumindo
  defp send_demand(demand) do
    data = get_data(demand)

    # se a demanda não for atendida, agenda uma mensagem que chegará daqui a 1s no handle_info
    if length(data) < demand do
      Process.send_after(self(), {:send_demand, demand}, :timer.seconds(1))
    end

    {:noreply, data, :no_state}
  end

  # o get_data vai simular a produção de dados do producer
  # aqui ele está sempre retornando somente 1 item (bem menos que a demanda)
  def get_data(_demand), do: [:rand.uniform()]
end

Disponibilizei aqui um livebook com um producer e um consumer implementados como nesse exemplo.