diff --git a/config/config.exs b/config/config.exs index 7de7a411..3ce3966f 100644 --- a/config/config.exs +++ b/config/config.exs @@ -29,6 +29,7 @@ config :concentrate, Concentrate.GroupFilter.TimeOutOfRange, Concentrate.GroupFilter.RemoveUnneededTimes, Concentrate.GroupFilter.VehiclePastStop, + Concentrate.GroupFilter.VehicleBeforeStop, Concentrate.GroupFilter.Shuttle, Concentrate.GroupFilter.SkippedDepartures, Concentrate.GroupFilter.CancelledTrip, diff --git a/lib/concentrate/group_filter/cache/vehicle_before_stop.ex b/lib/concentrate/group_filter/cache/vehicle_before_stop.ex new file mode 100644 index 00000000..2e83a4aa --- /dev/null +++ b/lib/concentrate/group_filter/cache/vehicle_before_stop.ex @@ -0,0 +1,101 @@ +defmodule Concentrate.GroupFilter.Cache.VehicleBeforeStop do + @moduledoc """ + Server to maintain a cache of previously seen StopTimeUpdates for a given trip. + + As the vehicle moves through, we'll remove the older updates. Periodically, + we'll scan for StopTimeUpdates in the past and remove them. + """ + use GenServer + alias Concentrate.{VehiclePosition, StopTimeUpdate} + + @table __MODULE__ + # 5 minutes + @stale_timeout_seconds 300 + + @spec stop_time_updates_for_vehicle(VehiclePosition.t(), [StopTimeUpdate.t()]) :: [ + StopTimeUpdate.t() + ] + def stop_time_updates_for_vehicle(vehicle_position, stop_time_updates) do + if is_integer(VehiclePosition.stop_sequence(vehicle_position)) do + insert_new_updates!(stop_time_updates) + delete_old_updates(vehicle_position) + fetch_updates_with_stop_sequence_ge_than_vehicle(vehicle_position) + else + stop_time_updates + end + rescue + ArgumentError -> + stop_time_updates + end + + defp insert_new_updates!(stop_time_updates) do + inserts = + for stu <- stop_time_updates, + stop_sequence <- List.wrap(StopTimeUpdate.stop_sequence(stu)) do + trip_id = StopTimeUpdate.trip_id(stu) + time = StopTimeUpdate.time(stu) + :ets.match_delete(@table, {trip_id, stop_sequence, :_, :_}) + {trip_id, stop_sequence, time, stu} + end + + :ets.insert(@table, inserts) + end + + defp fetch_updates_with_stop_sequence_ge_than_vehicle(vp) do + unsorted = + :ets.select(@table, [ + { + {VehiclePosition.trip_id(vp), :"$1", :_, :"$2"}, + [{:>=, :"$1", VehiclePosition.stop_sequence(vp)}], + [:"$2"] + } + ]) + + Enum.sort_by(unsorted, &StopTimeUpdate.stop_sequence/1) + end + + defp delete_old_updates(vp) do + :ets.select_delete(@table, [ + { + {VehiclePosition.trip_id(vp), :"$1", :_, :_}, + [{:<, :"$1", VehiclePosition.stop_sequence(vp)}], + [true] + } + ]) + end + + def start_link([]) do + GenServer.start_link(__MODULE__, []) + end + + @impl GenServer + def init([]) do + @table = :ets.new(@table, [:bag, :named_table, :public]) + schedule_clear!() + {:ok, []} + end + + @impl GenServer + def handle_info(:clear, state) do + now = System.system_time(:seconds) + minimum_time = now - @stale_timeout_seconds + + :ets.select_delete(@table, [ + { + {:_, :_, :"$1", :_}, + [{:<, :"$1", minimum_time}], + [true] + } + ]) + + {:noreply, state} + end + + def handle_info(message, state) do + super(message, state) + end + + defp schedule_clear! do + send(self(), :clear) + end +end diff --git a/lib/concentrate/group_filter/vehicle_before_stop.ex b/lib/concentrate/group_filter/vehicle_before_stop.ex new file mode 100644 index 00000000..961fbb78 --- /dev/null +++ b/lib/concentrate/group_filter/vehicle_before_stop.ex @@ -0,0 +1,17 @@ +defmodule Concentrate.GroupFilter.VehicleBeforeStop do + @moduledoc """ + Adds a historic StopTimeUpdate for the trip if the vehicle hasn't moved past it yet. + """ + @behaviour Concentrate.GroupFilter + alias Concentrate.TripUpdate + alias Concentrate.GroupFilter.Cache.VehicleBeforeStop, as: Cache + + @impl Concentrate.GroupFilter + def filter({%TripUpdate{} = tu, [vp], stus}) do + stus = Cache.stop_time_updates_for_vehicle(vp, stus) + + {tu, [vp], stus} + end + + def filter(other), do: other +end diff --git a/lib/concentrate/supervisor.ex b/lib/concentrate/supervisor.ex index a714c07e..e5f466e9 100644 --- a/lib/concentrate/supervisor.ex +++ b/lib/concentrate/supervisor.ex @@ -14,16 +14,17 @@ defmodule Concentrate.Supervisor do end def children(config) do - pool = pool() + misc = misc() alerts = alerts(config[:alerts]) gtfs = gtfs(config[:gtfs]) pipeline = pipeline(config) - Enum.concat([pool, alerts, gtfs, pipeline]) + Enum.concat([misc, alerts, gtfs, pipeline]) end - def pool do + def misc do [ - :hackney_pool.child_spec(:http_producer_pool, timeout: 30_000, max_connections: 100) + :hackney_pool.child_spec(:http_producer_pool, timeout: 30_000, max_connections: 100), + Concentrate.GroupFilter.Cache.VehicleBeforeStop ] end diff --git a/test/concentrate/group_filter/cache/vehicle_before_stop_test.exs b/test/concentrate/group_filter/cache/vehicle_before_stop_test.exs new file mode 100644 index 00000000..7f585d51 --- /dev/null +++ b/test/concentrate/group_filter/cache/vehicle_before_stop_test.exs @@ -0,0 +1,81 @@ +defmodule Concentrate.GroupFilter.Cache.VehicleBeforeStopTest do + @moduledoc false + use ExUnit.Case + import Concentrate.GroupFilter.Cache.VehicleBeforeStop + alias Concentrate.{VehiclePosition, StopTimeUpdate} + + defp supervised(_) do + {:ok, _} = start_supervised(Concentrate.GroupFilter.Cache.VehicleBeforeStop) + :ok + end + + describe "stop_time_updates_for_vehicle/2" do + setup :supervised + + test "restores older StopTimeUpdate values if the vehicle hasn't reached them" do + trip_id = "before_stop_test" + + vp = + VehiclePosition.new( + id: "vehicle", + trip_id: trip_id, + stop_sequence: 1, + latitude: 1.0, + longitude: 1.0 + ) + + stus = + for stop_sequence <- 1..4 do + StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence) + end + + assert stop_time_updates_for_vehicle(vp, stus) == stus + # restores the first two StopTimeUpdates since the vehicle hasn't + # reached them + assert stop_time_updates_for_vehicle(vp, Enum.drop(stus, 2)) == stus + + # restores the second StopTimeUpdate since the vehicle is past the + # first one + vp = VehiclePosition.update_stop_sequence(vp, 2) + assert stop_time_updates_for_vehicle(vp, Enum.drop(stus, 2)) == Enum.drop(stus, 1) + end + + test "uses updated stop time updates for future changes" do + trip_id = "before_stop_test" + + vp = + VehiclePosition.new( + id: "vehicle", + trip_id: trip_id, + stop_sequence: 1, + latitude: 1.0, + longitude: 1.0 + ) + + stus = + for stop_sequence <- 1..4 do + StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence) + end + + assert stop_time_updates_for_vehicle(vp, stus) == stus + vp = VehiclePosition.update_stop_sequence(vp, 2) + + new_stus = + for stop_sequence <- 3..4 do + StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence, arrival_time: 5) + end + + # we expect one old update, plus the two new ones + expected = Enum.slice(stus, 1..1) ++ new_stus + assert stop_time_updates_for_vehicle(vp, new_stus) == expected + end + end + + describe "missing ETS table" do + test "stop_time_updates_for_vehicle returns same updates" do + vp = VehiclePosition.new(latitude: 1, longitude: 1) + stu = StopTimeUpdate.new([]) + assert stop_time_updates_for_vehicle(vp, [stu]) == [stu] + end + end +end diff --git a/test/concentrate/group_filter/vehicle_before_stop_test.exs b/test/concentrate/group_filter/vehicle_before_stop_test.exs new file mode 100644 index 00000000..ac1db8f2 --- /dev/null +++ b/test/concentrate/group_filter/vehicle_before_stop_test.exs @@ -0,0 +1,47 @@ +defmodule Concentrate.GroupFilter.VehicleBeforeStopTest do + @moduledoc false + use ExUnit.Case + import Concentrate.GroupFilter.VehicleBeforeStop + alias Concentrate.{TripUpdate, VehiclePosition, StopTimeUpdate} + + describe "filter/1" do + setup do + {:ok, _} = start_supervised(Concentrate.GroupFilter.Cache.VehicleBeforeStop) + :ok + end + + test "restores older StopTimeUpdate values if the vehicle hasn't reached them yet" do + trip_id = "before_stop_test" + tu = TripUpdate.new(trip_id: trip_id) + + vp = + VehiclePosition.new( + id: "vehicle", + trip_id: trip_id, + stop_sequence: 1, + latitude: 1.0, + longitude: 1.0 + ) + + stus = + for stop_sequence <- 1..4 do + StopTimeUpdate.new(trip_id: trip_id, stop_sequence: stop_sequence) + end + + group = {tu, [vp], stus} + assert filter(group) == group + # restores the first two StopTimeUpdates since the vehicle hasn't + # reached them + assert filter({tu, [vp], Enum.drop(stus, 2)}) == group + + # restores the second StopTimeUpdate since the vehicle is past the + # first one + vp = VehiclePosition.update_stop_sequence(vp, 2) + assert filter({tu, [vp], Enum.drop(stus, 2)}) == {tu, [vp], Enum.drop(stus, 1)} + end + + test "ignores unknown values" do + assert filter(:value) == :value + end + end +end diff --git a/test/concentrate/supervisor_test.exs b/test/concentrate/supervisor_test.exs index 40f5aaad..c69f0bf1 100644 --- a/test/concentrate/supervisor_test.exs +++ b/test/concentrate/supervisor_test.exs @@ -15,10 +15,10 @@ defmodule Concentrate.SupervisorTest do describe "children/1" do test "builds the right number of children" do - # currently, the right number is 4: HTTP pool, alerts, GTFS, pipeline + # currently, the right number is 5: HTTP pool, alerts, GTFS, cache, pipeline actual = children([]) - assert length(actual) == 4 + assert length(actual) == 5 end end end