Preventing a GenStage producer from stopping due to insufficient demand
A producer is designed to handle large volumes of data. If, in a handle_demand
call, your producer cannot produce enough data to meet the requested demand, it simply stops, as it assumes there is nothing more to deliver.
Scheduling a new delivery
A simple way to work around this is to "schedule" a new delivery if the demand cannot be met. This is done by scheduling an event that will "wake up" the producer.
Commented code
defmodule Producer do use GenStage def init(_state), do: {:producer, :no_state} # our handle_demand will be delegated to the send_demand/1 function def handle_demand(demand, _state) when demand > 0, do: send_demand(demand) # handle_info will be called when our producer receives the scheduled event def handle_info({:send_demand, demand}, _state), do: send_demand(demand) # the send_demand/1 function will actually send the data from the producer to the consumer defp send_demand(demand) do data = get_data(demand) # if the demand is not met, schedule a message that will arrive in 1s in handle_info if length(data) < demand do Process.send_after(self(), {:send_demand, demand}, :timer.seconds(1)) end {:noreply, data, :no_state} end # get_data will simulate the production of data from the producer # here it is always returning only 1 item (well below the demand) def get_data(_demand), do: [:rand.uniform()] end
I have made available here a livebook with a producer and a consumer implemented as in this example.