Impedindo um producer GenStage de parar por não cumprir a demanda
Um producer é feito para lidar com grande volume de dados, se em uma chamada do handle_demand
o seu producer não conseguir produzir dados o suficiente para a demanda solicitada, ele simplesmente para, pois se assume que não tem mais nada ali para ele entregar.
Agendando uma nova entrega
Uma maneira simples de contornar isso é, caso a demanda não possa ser atendida, "agendar" uma nova entrega. Isso é feito programando o envio de um evento que irá servir para "despertar" o producer.Código comentado
defmodule Producer do use GenStage def init(_state), do: {:producer, :no_state} # nosso handle demand vai ser delegado à função send_demand/1 def handle_demand(demand, _state) when demand > 0, do: send_demand(demand) # o handle_info irá ser chamado quando o nosso producer receber o evento que foi agendado def handle_info({:send_demand, demand}, _state), do: send_demand(demand) # a função send_demand/1 é quem irá realmente enviar os dados do producer para quem quer que esteja consumindo defp send_demand(demand) do data = get_data(demand) # se a demanda não for atendida, agenda uma mensagem que chegará daqui a 1s no handle_info if length(data) < demand do Process.send_after(self(), {:send_demand, demand}, :timer.seconds(1)) end {:noreply, data, :no_state} end # o get_data vai simular a produção de dados do producer # aqui ele está sempre retornando somente 1 item (bem menos que a demanda) def get_data(_demand), do: [:rand.uniform()] end
Disponibilizei aqui um livebook com um producer e um consumer implementados como nesse exemplo.