diff --git a/config/config.exs b/config/config.exs
index 33bd682e..8f365c3e 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -73,6 +73,6 @@ config :concentrate,
   file_tap: [
     enabled?: false
   ],
-  http_producer: Concentrate.Producer.HTTPoison
+  http_producer: Concentrate.Producer.Mint
 
 import_config "#{Mix.env()}.exs"
diff --git a/lib/concentrate/producer/mint.ex b/lib/concentrate/producer/mint.ex
new file mode 100644
index 00000000..6910ecab
--- /dev/null
+++ b/lib/concentrate/producer/mint.ex
@@ -0,0 +1,374 @@
+defmodule Concentrate.Producer.Mint do
+  @moduledoc """
+  HTTP producer implementation using Mint.
+  """
+  use GenStage
+  require Logger
+  alias Mint.HTTP
+  @start_link_opts ~w(name)a
+  @default_fetch_after 5_000
+  @default_timeout 30_000
+  @default_transport_opts [timeout: @default_timeout]
+  @default_headers %{"accept-encoding" => "gzip"}
+
+  defmodule State do
+    @moduledoc false
+
+    defstruct [
+      :url,
+      :parser,
+      fetch_after: nil,
+      opts: %{},
+      transport_opts: [],
+      headers: %{},
+      conn: :not_connected,
+      ref: :not_connected,
+      demand: 0,
+      response: nil,
+      events: []
+    ]
+  end
+
+  alias __MODULE__.State
+
+  def start_link({url, opts}) when is_binary(url) and is_list(opts) do
+    {start_link_opts, opts} = Keyword.split(opts, @start_link_opts)
+    GenStage.start_link(__MODULE__, {url, opts}, start_link_opts)
+  end
+
+  @impl GenStage
+  def init({url, opts}) do
+    opts = Map.new(opts)
+
+    # parser option: module, {module, opts}, or a 1-arity function
+    parser =
+      case Map.fetch!(opts, :parser) do
+        module when is_atom(module) ->
+          &module.parse(&1, [])
+
+        {module, opts} when is_atom(module) and is_list(opts) ->
+          &module.parse(&1, opts)
+
+        fun when is_function(fun, 1) ->
+          fun
+      end
+
+    state = %State{
+      url: url,
+      parser: parser,
+      fetch_after: Map.get(opts, :fetch_after, @default_fetch_after),
+      transport_opts:
+        Keyword.take(Map.get(opts, :get_opts, @default_transport_opts), ~w(timeout send_timeout)a),
+      headers: Map.merge(@default_headers, Map.get(opts, :headers, %{})),
+      opts: opts
+    }
+
+    {
+      :producer,
+      state,
+      dispatcher: GenStage.BroadcastDispatcher
+    }
+  end
+
+  @impl GenStage
+  def handle_demand(new_demand, %{demand: existing_demand} = state) do
+    demand = new_demand + existing_demand
+    state = %{state | demand: demand}
+
+    # only kick off a fetch if we weren't already waiting on one
+    state =
+      if existing_demand == 0 do
+        make_request(state, state.url)
+      else
+        state
+      end
+
+    {:noreply, [], state}
+  end
+
+  @impl GenStage
+  def handle_info({:fetch, url}, state) do
+    state =
+      if state.demand > 0 do
+        make_request(state, url)
+      else
+        state
+      end
+
+    {:noreply, [], state}
+  end
+
+  def handle_info({:fetch_timeout, ref}, %{ref: ref} = state) do
+    log_message(:warn, state, fn -> "fetch timed out, disconnecting" end)
+
+    _ =
+      if state.conn != :not_connected and HTTP.open?(state.conn) do
+        HTTP.close(state.conn)
+      end
+
+    state = %{state | conn: :not_connected, ref: :not_connected}
+    state = fetch_again!(state)
+    {:noreply, [], state}
+  end
+
+  def handle_info({:fetch_timeout, _}, state) do
+    # stale timeout for a request that already finished: ignore
+    {:noreply, [], state}
+  end
+
+  def handle_info(message, state) do
+    state =
+      case HTTP.stream(state.conn, message) do
+        {:ok, conn, responses} ->
+          Enum.reduce(responses, %{state | conn: conn}, &handle_responses/2)
+
+        {:error, conn, error, _responses} ->
+          log_message(:warn, state, fn ->
+            "HTTP error " <>
+              "error=#{inspect(error)}"
+          end)
+
+          state = %{state | conn: conn, ref: :not_connected}
+          fetch_again!(state)
+
+        :unknown ->
+          log_message(:warn, state, fn -> "unknown message message=#{inspect(message)}" end)
+          state
+      end
+
+    {:noreply, Enum.reverse(state.events), %{state | events: []}}
+  end
+
+  def handle_responses({:status, ref, status}, %{ref: ref} = state) do
+    %{state | response: {status, [], []}}
+  end
+
+  def handle_responses({:headers, ref, headers}, %{ref: ref} = state) do
+    {status, old_headers, body} = state.response
+
+    # remember validators so the next request can be conditional
+    cache_headers =
+      Enum.reduce(headers, state.headers, fn {header, value}, acc ->
+        case String.downcase(header) do
+          "last-modified" ->
+            Map.put(acc, "if-modified-since", value)
+
+          "etag" ->
+            Map.put(acc, "if-none-match", value)
+
+          _ ->
+            acc
+        end
+      end)
+
+    # don't use if-none-match if we already have if-modified-since
+    cache_headers =
+      case cache_headers do
+        %{"if-modified-since" => _, "if-none-match" => _} ->
+          Map.delete(cache_headers, "if-none-match")
+
+        _ ->
+          cache_headers
+      end
+
+    %{state | response: {status, headers ++ old_headers, body}, headers: cache_headers}
+  end
+
+  def handle_responses({:data, ref, data}, %{ref: ref} = state) do
+    {status, headers, body} = state.response
+
+    %{state | response: {status, headers, [body | data]}}
+  end
+
+  def handle_responses({:done, ref}, %{ref: ref} = state) do
+    {status, headers, body} = state.response
+    handle_http_response(state, status, headers, body)
+  end
+
+  def handle_responses(response, state) do
+    log_message(:warn, state, fn ->
+      "unexpected response=#{inspect(response)}"
+    end)
+
+    state
+  end
+
+  def make_request(state, url) do
+    state = connect(state, url)
+
+    if state.conn != :not_connected do
+      {_, _, _, path} = parse_url(url)
+      {:ok, conn, ref} = HTTP.request(state.conn, "GET", path, Map.to_list(state.headers), "")
+
+      Process.send_after(
+        self(),
+        {:fetch_timeout, ref},
+        Keyword.get(state.transport_opts, :timeout, @default_timeout)
+      )
+
+      %{state | conn: conn, ref: ref}
+    else
+      fetch_again!(state)
+    end
+  end
+
+  defp connect(%{conn: :not_connected} = state, url) do
+    {scheme, host, port, _} = parse_url(url)
+
+    case HTTP.connect(scheme, host, port,
+           transport_opts: state.transport_opts,
+           protocols: [:http1]
+         ) do
+      {:ok, conn} ->
+        %{state | conn: conn}
+
+      {:error, _} ->
+        state
+    end
+  end
+
+  defp connect(state, url) do
+    # NOTE(review): connections are deliberately not reused; reusing via
+    # HTTP.open?/1 was considered but is disabled — confirm before enabling:
+    state
+    |> disconnect()
+    |> connect(url)
+
+    # end
+  end
+
+  defp disconnect(%{conn: conn} = state) do
+    _ = HTTP.close(conn)
+    %{state | conn: :not_connected, ref: :not_connected}
+  end
+
+  defp handle_http_response(state, 200, headers, body) do
+    body = decode_body(body, find_header(headers, "content-encoding"))
+    {time, parsed} = :timer.tc(state.parser, [body])
+
+    log_message(:info, state, fn ->
+      "updated: records=#{length(parsed)} time=#{time / 1000}"
+    end)
+
+    state = %{
+      state
+      | events: [parsed | state.events],
+        demand: max(state.demand - 1, 0),
+        response: nil
+    }
+
+    fetch_again!(state)
+  rescue
+    error ->
+      state = log_parse_error(error, state, __STACKTRACE__)
+      fetch_again!(state)
+  catch
+    error ->
+      state = log_parse_error(error, state, __STACKTRACE__)
+      fetch_again!(state)
+  end
+
+  defp handle_http_response(state, redirect, headers, _body) when redirect in [301, 302] do
+    {:ok, location} = find_header(headers, "location")
+    state = disconnect(state)
+
+    if redirect == 301 do
+      state = %{state | url: location}
+      fetch_again!(state, fetch_after: 0)
+    else
+      fetch_again!(state, url: location, fetch_after: 0)
+    end
+  end
+
+  defp handle_http_response(state, 304, _headers, _body) do
+    log_message(:info, state, fn -> "not modified status=304" end)
+    fetch_again!(state)
+  end
+
+  defp handle_http_response(state, status, _headers, _body) do
+    log_message(:warn, state, fn -> "unexpected status=#{status}" end)
+    fetch_again!(state)
+  end
+
+  defp decode_body(body, {:ok, "gzip"}) do
+    :zlib.gunzip(body)
+  end
+
+  defp decode_body(body, _) do
+    IO.iodata_to_binary(body)
+  end
+
+  defp fetch_again!(state, opts \\ []) do
+    _ =
+      if state.demand > 0 do
+        url = Keyword.get(opts, :url, state.url)
+        fetch_after = Keyword.get(opts, :fetch_after, state.fetch_after)
+        Process.send_after(self(), {:fetch, url}, fetch_after)
+      end
+
+    state
+  end
+
+  def find_header(headers, query) do
+    value =
+      Enum.find_value(headers, fn {header, value} ->
+        if String.downcase(header) == query do
+          value
+        else
+          nil
+        end
+      end)
+
+    if value do
+      {:ok, value}
+    else
+      :error
+    end
+  end
+
+  defp log_parse_error(error, state, trace) do
+    _ =
+      Logger.error(fn ->
+        "#{__MODULE__}: parse error url=#{state.url} error=#{inspect(error)}\n#{
+          Exception.format_stacktrace(trace)
+        }"
+      end)
+
+    state
+  end
+
+  @doc """
+  Parse URL into the pieces needed for connecting to Mint.
+
+      iex> parse_url("https://mbta.com/developers")
+      {:https, "mbta.com", 443, "/developers"}
+
+      iex> parse_url("http://localhost:8080/path?query=string#fragment")
+      {:http, "localhost", 8080, "/path?query=string"}
+  """
+  def parse_url(url) when is_binary(url) do
+    uri = URI.parse(url)
+
+    scheme =
+      case uri.scheme do
+        "https" -> :https
+        "http" -> :http
+        nil -> :http
+      end
+
+    path =
+      if uri.query do
+        "#{uri.path}?#{uri.query}"
+      else
+        uri.path
+      end
+
+    {scheme, uri.host, uri.port, path}
+  end
+
+  defp log_message(level, state, log_fn) do
+    _ =
+      Logger.log(level, fn ->
+        "#{__MODULE__} #{log_fn.()} url=#{inspect(state.url)}"
+      end)
+
+    :ok
+  end
+end
diff --git a/mix.exs b/mix.exs
index 25dda8e1..2db0db47 100644
--- a/mix.exs
+++ b/mix.exs
@@ -71,6 +71,8 @@ defmodule Concentrate.MixProject do
       {:gen_stage, "~> 1.0"},
       {:gpb, "~> 4.7", only: :dev, runtime: false, only: :dev},
       {:httpoison, "~> 1.0"},
+      {:mint, "~> 1.0"},
+      {:castore, "~> 0.1"},
       {:jason, "~> 1.0"},
       {:stream_data, "~> 0.4", only: :test},
       {:tzdata, "~> 1.1.1"}
diff --git a/mix.lock b/mix.lock
index 83e6f6ba..003956f5 100644
--- a/mix.lock
+++ b/mix.lock
@@ -2,6 +2,7 @@
   "benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"},
   "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], [], "hexpm", "7af5c7e09fe1d40f76c8e4f9dd2be7cebd83909f31fee7cd0e9eadc567da8353"},
   "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
+  "castore": {:hex, :castore, "0.1.22", "4127549e411bedd012ca3a308dede574f43819fe9394254ca55ab4895abfa1a2", [:mix], [], "hexpm", "c17576df47eb5aa1ee40cc4134316a99f5cad3e215d5c77b8dd3cfef12a22cac"},
   "certifi": {:hex, :certifi, "2.9.0", "6f2a475689dd47f19fb74334859d460a2dc4e3252a3324bd2111b8f0429e7e21", [:rebar3], [], "hexpm", "266da46bdb06d6c6d35fde799bcb28d36d985d424ad7c08b5bb48f5b5cdd4641"},
   "cowboy": {:hex, :cowboy, "2.8.0", "f3dc62e35797ecd9ac1b50db74611193c29815401e53bac9a5c0577bd7bc667d", [:rebar3], [{:cowlib, "~> 2.9.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "4643e4fba74ac96d4d152c75803de6fad0b3fa5df354c71afdd6cbeeb15fac8a"},
   "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"},
@@ -19,6 +20,7 @@
   "gen_stage": {:hex, :gen_stage, "1.1.0", "dd0c0f8d2f3b993fdbd3d58e94abbe65380f4e78bdee3fa93d5618d7d14abe60", [:mix], [], "hexpm", "7f2b36a6d02f7ef2ba410733b540ec423af65ec9c99f3d1083da508aca3b9305"},
   "gpb": {:hex, :gpb, "4.18.0", "305548ad991583f4b9809e905d6a17475ab8df85116cb3c1269fda1e4424c7ea", [:make, :rebar3], [], "hexpm", "c2bb843866e627e1e0181c2f5e4b3ee79cbb8cd3574f66767e7f1d5130d6b025"},
   "hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~> 2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
+  "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
   "httpoison": {:hex, :httpoison, "1.8.0", "6b85dea15820b7804ef607ff78406ab449dd78bed923a49c7160e1886e987a3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "28089eaa98cf90c66265b6b5ad87c59a3729bea2e74e9d08f9b51eb9729b3c3a"},
   "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
   "jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"},
@@ -27,6 +29,7 @@
   "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
   "mime": {:hex, :mime, "1.5.0", "203ef35ef3389aae6d361918bf3f952fa17a09e8e43b5aa592b93eba05d0fb8d", [:mix], [], "hexpm", "55a94c0f552249fc1a3dd9cd2d3ab9de9d3c89b559c2bd01121f824834f24746"},
   "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
+  "mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"},
   "parallel_stream": {:hex, :parallel_stream, "1.0.6", "b967be2b23f0f6787fab7ed681b4c45a215a81481fb62b01a5b750fa8f30f76c", [:mix], [], "hexpm", "639b2e8749e11b87b9eb42f2ad325d161c170b39b288ac8d04c4f31f8f0823eb"},
   "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
   "plug": {:hex, :plug, "1.11.1", "f2992bac66fdae679453c9e86134a4201f6f43a687d8ff1cd1b2862d53c80259", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "23524e4fefbb587c11f0833b3910bfb414bf2e2534d61928e920f54e3a1b881f"},
diff --git a/test/concentrate/producer/mint_test.exs b/test/concentrate/producer/mint_test.exs
new file mode 100644
index 00000000..147b13c3
--- /dev/null
+++ b/test/concentrate/producer/mint_test.exs
@@ -0,0 +1,347 @@
+defmodule Concentrate.Producer.MintTest do
+  @moduledoc false
+  use ExUnit.Case, async: false
+  import Concentrate.Producer.Mint
+  import Plug.Conn, only: [get_req_header: 2, put_resp_header: 3, send_resp: 3]
+
+  doctest Concentrate.Producer.Mint
+
+  defmodule TestParser do
+    @behaviour Concentrate.Parser
+    def parse(_body, _opts), do: []
+  end
+
+  describe "init/1" do
+    test "parser can be a module" do
+      assert {:producer, state, _} = init({"http://url", parser: __MODULE__.TestParser})
+    end
+
+    test "parser can be a module with options" do
+      assert {:producer, state, _} =
+               init({"http://url", parser: {__MODULE__.TestParser, [opt: 1]}})
+    end
+  end
+
+  describe "handle_info/2" do
+    @tag :capture_log
+    test "ignores unknown messages" do
+      {:producer, state, _} = init({"url", parser: &[&1]})
+      assert {:noreply, [], ^state} = handle_info(:unknown, state)
+    end
+
+    @tag :capture_log
+    test "does not send more demand than requested" do
+      {:producer, state, _} = init({"url", parser: &[&1], demand: 1})
+      first_response = make_ref()
+
+      responses = [
+        {:status, first_response, 200},
+        {:headers, first_response, []},
+        {:data, first_response, "body"},
+        {:done, first_response}
+      ]
+
+      {state, events} =
+        Enum.reduce(responses, {state, []}, fn response, {state, events} ->
+          case handle_info(response, state) do
+            {:noreply, new_events, state} ->
+              {state, events ++ new_events}
+          end
+        end)
+
+      assert [_] = events
+
+      second_response = make_ref()
+
+      responses = [
+        {:status, second_response, 200},
+        {:headers, second_response, []},
+        {:data, second_response, "body"},
+        {:done, second_response}
+      ]
+
+      {state, events} =
+        Enum.reduce(responses, {state, []}, fn response, {state, events} ->
+          case handle_info(response, state) do
+            {:noreply, new_events, state} ->
+              {state, events ++ new_events}
+          end
+        end)
+
+      assert events == []
+      assert state.demand == 0
+    end
+  end
+
+  describe "handle_demand/3" do
+    test "only send messages if there was no previous demand" do
+      {_, state, _} = init({"url", parser: & &1})
+      {:noreply, _, state} = handle_demand(1, state)
+      assert_receive {:fetch, "url"}
+      # there's demand now, so more incoming demand shouldn't reschedule
+      handle_demand(1, state)
+      refute_receive {:fetch, "url"}
+    end
+  end
+
+  @moduletag timeout: 5_000
+
+  describe "bypass" do
+    setup do
+      Application.ensure_all_started(:bypass)
+      bypass = Bypass.open()
+      {:ok, bypass: bypass}
+    end
+
+    test "does not connect without a consumer", %{bypass: bypass} do
+      Bypass.down(bypass)
+
+      {:ok, _producer} = start_producer(bypass)
+
+      # make sure the producer doesn't crash
+      assert :timer.sleep(50)
+    end
+
+    test "sends the result of parsing", %{bypass: bypass} do
+      Bypass.expect_once(bypass, fn conn ->
+        send_resp(conn, 200, "body")
+      end)
+
+      {:ok, producer} = start_producer(bypass)
+      assert take_events(producer, 1) == [["body"]]
+    end
+
+    test "schedules a fetch again", %{bypass: bypass} do
+      {:ok, agent} = response_agent()
+
+      agent
+      |> add_response(fn conn ->
+        send_resp(conn, 200, "first")
+      end)
+      |> add_response(fn conn ->
+        send_resp(conn, 200, "second")
+      end)
+
+      Bypass.expect(bypass, fn conn -> agent_response(agent, conn) end)
+
+      {:ok, producer} = start_producer(bypass, fetch_after: 50)
+
+      assert take_events(producer, 2) == [["first"], ["second"]]
+    end
+
+    @tag timeout: 2_000
+    @tag :capture_log
+    test "schedules a fetch again if there's a disconnection", %{bypass: bypass} do
+      {:ok, agent} = response_agent()
+
+      agent
+      |> add_response(fn conn ->
+        Bypass.down(bypass)
+        Bypass.up(bypass)
+        send_resp(conn, 200, "first")
+      end)
+      |> add_response(fn conn ->
+        send_resp(conn, 200, "reconnect")
+      end)
+
+      Bypass.expect(bypass, fn conn -> agent_response(agent, conn) end)
+
+      {:ok, producer} = start_producer(bypass, fetch_after: 50, get_opts: [timeout: 100])
+
+      assert take_events(producer, 1) == [["reconnect"]]
+      Bypass.pass(bypass)
+    end
+
+    test "if there's a cached response, retries again with last-modified", %{bypass: bypass} do
+      {:ok, agent} = response_agent()
+
+      agent
+      |> add_response(fn conn ->
+        conn
+        |> put_resp_header("Last-Modified", "last mod")
+        |> put_resp_header("ETag", "tag")
+        |> send_resp(200, "first")
+      end)
+      |> add_response(fn conn ->
+        assert get_req_header(conn, "if-modified-since") == ["last mod"]
+        assert get_req_header(conn, "if-none-match") == []
+        send_resp(conn, 304, "")
+      end)
+      |> add_response(fn conn ->
+        assert get_req_header(conn, "if-modified-since") == ["last mod"]
+        assert get_req_header(conn, "if-none-match") == []
+        send_resp(conn, 200, "second")
+      end)
+
+      Bypass.expect(bypass, fn conn -> agent_response(agent, conn) end)
+
+      {:ok, producer} = start_producer(bypass, fetch_after: 50)
+      assert take_events(producer, 3) == [["first"], ["second"], ["agent"]]
+    end
+
+    test "if there's a cached response, retries again with etag if there isn't a last-modified header",
+         %{bypass: bypass} do
+      {:ok, agent} = response_agent()
+
+      agent
+      |> add_response(fn conn ->
+        conn
+        |> put_resp_header("ETag", "tag")
+        |> send_resp(200, "first")
+      end)
+      |> add_response(fn conn ->
+        assert get_req_header(conn, "if-none-match") == ["tag"]
+        send_resp(conn, 304, "")
+      end)
+      |> add_response(fn conn ->
+        assert get_req_header(conn, "if-none-match") == ["tag"]
+        send_resp(conn, 200, "second")
+      end)
+
+      Bypass.expect(bypass, fn conn -> agent_response(agent, conn) end)
+
+      {:ok, producer} = start_producer(bypass, fetch_after: 50)
+      assert take_events(producer, 3) == [["first"], ["second"], ["agent"]]
+    end
+
+    test "if there's a redirect, fetches from the new URL", %{bypass: bypass} do
+      # fetches are:
+      # 1. bypass (immediate)
+      # 2. temp redirect (immediate)
+      # 3. bypass (after timeout)
+      # 4. permanent redirect (immediate)
+      # 5. permanent redirect (after timeout)
+      temp_redirect = Bypass.open()
+      permanent_redirect = Bypass.open()
+      {:ok, agent} = response_agent()
+      {:ok, permanent_redirect_agent} = response_agent()
+
+      agent
+      |> add_response(fn conn ->
+        conn
+        |> put_resp_header("location", "http://127.0.0.1:#{temp_redirect.port}/temp")
+        |> send_resp(302, "should have temp redirected")
+      end)
+      |> add_response(fn conn ->
+        conn
+        |> put_resp_header("location", "http://127.0.0.1:#{permanent_redirect.port}/perm")
+        |> send_resp(301, "should have permanently redirect")
+      end)
+
+      permanent_redirect_agent
+      |> add_response(fn conn ->
+        send_resp(conn, 200, "in permanent redirect")
+      end)
+      |> add_response(fn conn ->
+        send_resp(conn, 200, "in permanent redirect again")
+      end)
+
+      Bypass.expect_once(temp_redirect, fn conn ->
+        send_resp(conn, 200, "in temp redirect")
+      end)
+
+      Bypass.expect(permanent_redirect, &agent_response(permanent_redirect_agent, &1))
+
+      Bypass.expect(bypass, fn conn -> agent_response(agent, conn) end)
+      {:ok, producer} = start_producer(bypass, fetch_after: 10)
+
+      assert take_events(producer, 3) == [
+               ["in temp redirect"],
+               ["in permanent redirect"],
+               ["in permanent redirect again"]
+             ]
+    end
+
+    @tag :capture_log
+    test "an error in parsing isn't fatal", %{bypass: bypass} do
+      {:ok, agent} = response_agent()
+
+      agent
+      |> add_response(fn conn ->
+        send_resp(conn, 200, "failure")
+      end)
+
+      Bypass.expect(bypass, fn conn -> agent_response(agent, conn) end)
+
+      {:ok, producer} =
+        start_producer(
+          bypass,
+          fetch_after: 10,
+          parser: fn
+            "failure" -> throw("error")
+            body -> [body]
+          end
+        )
+
+      assert take_events(producer, 1) == [["agent"]]
+    end
+
+    @tag :capture_log
+    test "a FunctionClauseError error in parsing isn't fatal", %{bypass: bypass} do
+      {:ok, agent} = response_agent()
+
+      agent
+      |> add_response(fn conn ->
+        send_resp(conn, 200, "failure")
+      end)
+
+      Bypass.expect(bypass, fn conn -> agent_response(agent, conn) end)
+
+      {:ok, producer} =
+        start_producer(
+          bypass,
+          fetch_after: 10,
+          parser: fn "agent" -> ["agent"] end
+        )
+
+      assert take_events(producer, 1) == [["agent"]]
+    end
+
+    @tag :capture_log
+    test "a fetch error is not fatal" do
+      {:ok, pid} =
+        start_supervised({Concentrate.Producer.Mint, {"http://nodomain.dne", parser: & &1}})
+
+      # this will never finish, so run it in a separate process
+      Task.async(fn -> take_events(pid, 1) end)
+      :timer.sleep(50)
+      assert Process.alive?(pid)
+    end
+
+    defp start_producer(bypass, opts \\ []) do
+      url = "http://127.0.0.1:#{bypass.port}/"
+      opts = Keyword.put_new(opts, :parser, fn body -> [body] end)
+
+      {:ok, _} = start_supervised({Concentrate.Producer.Mint, {url, opts}})
+    end
+
+    defp take_events(producer, event_count) do
+      [{producer, max_demand: event_count}]
+      |> GenStage.stream()
+      |> Enum.take(event_count)
+    end
+
+    defp response_agent do
+      Agent.start_link(fn -> [] end)
+    end
+
+    defp add_response(agent, fun) do
+      :ok = Agent.update(agent, fn funs -> funs ++ [fun] end)
+      agent
+    end
+
+    defp agent_response(agent, conn) do
+      fun =
+        Agent.get_and_update(agent, fn
+          [] -> {&default_response/1, []}
+          [fun | funs] -> {fun, funs}
+        end)
+
+      fun.(conn)
+    end
+
+    defp default_response(conn) do
+      send_resp(conn, 200, "agent")
+    end
+  end
+end