Skip to content

Commit

Permalink
refactor: rename TripUpdate -> TripDescriptor
Browse files Browse the repository at this point in the history
In GTFS-RT, TripUpdate is the combination of a TripDescriptor and the
relevant StopTimeUpdates. This renames the Concentrate struct to more closely
match that naming.
  • Loading branch information
paulswartz committed Dec 28, 2020
1 parent c51fabf commit 6e86872
Show file tree
Hide file tree
Showing 51 changed files with 400 additions and 400 deletions.
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ config :concentrate,
Concentrate.GroupFilter.VehicleAtSkippedStop,
Concentrate.GroupFilter.VehicleStopMatch,
Concentrate.GroupFilter.SkippedStopOnAddedTrip,
Concentrate.GroupFilter.TripUpdateTimestamp
Concentrate.GroupFilter.TripDescriptorTimestamp
],
reporters: [
Concentrate.Reporter.VehicleLatency,
Expand Down
4 changes: 2 additions & 2 deletions guides/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Overall, Concentrate is modeled as a [GenStage](https://github.com/elixir-lang/g

Throughout the pipeline, data is represented as one of three structs:

* `TripUpdate`: basic trip information like ID, route ID, and direction
* `TripDescriptor`: basic trip information like ID, route ID, and direction
* `VehiclePosition`: where the vehicle is, both latitude/longitude and on the trip
* `StopTimeUpdate`: a prediction about when a vehicle will arrive/depart a stop

Expand All @@ -30,7 +30,7 @@ provided struc. Some filters depend on outside data such as alerts or GTFS:
those filters maintain external state and refer to it during the filtering.

After the first pass of filtering, the data are grouped by their trip
ID. This more closely matches the GTFS-RT format, by having a TripUpdate
ID. This more closely matches the GTFS-RT format, by having a TripDescriptor
grouped with the VehiclePositions and StopTimeUpdates for that trip.

The groups are then passed through modules implementing the GroupFilter
Expand Down
4 changes: 2 additions & 2 deletions guides/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ Concentrate can be configured either by updating `config/config.exs` file, or by
* `"max_future_time"`: amount of time (seconds) after which StopTImeUpdates will be ignored
* `"fetch_after"`: amount of time (milliseconds) to wait between fetches
* `"headers"`: an object with additional HTTP headers to send. The values can optionally be {“system”: “<ENV_VAR>”} to fetch the header value from the environment.
* `"drop_fields"`: an object with `"VehiclePosition"`, `"TripUpdate"`, and/or `"StopTimeUpdate"` keys, and values as a list of fields on those struct. The provided fields will be replace with `null` when being parsed.
* `"drop_fields"`: an object with `"VehiclePosition"`, `"TripDescriptor"`, and/or `"StopTimeUpdate"` keys, and values as a list of fields on those struct. The provided fields will be replace with `null` when being parsed.

### GTFS (required)
* Top-level key: `"gtfs"`
Expand Down Expand Up @@ -92,7 +92,7 @@ Concentrate can be configured either by updating `config/config.exs` file, or by
"enhanced_1": {
"url": "url_3",
"drop_fields": {
"TripUpdate": ["start_time"]
"TripDescriptor": ["start_time"]
}
}
}
Expand Down
64 changes: 32 additions & 32 deletions lib/concentrate/encoder/gtfs_realtime_helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,35 @@ defmodule Concentrate.Encoder.GTFSRealtimeHelpers do
@moduledoc """
Helper functions for encoding GTFS-Realtime files.
"""
alias Concentrate.{StopTimeUpdate, TripUpdate, VehiclePosition}
alias Concentrate.{StopTimeUpdate, TripDescriptor, VehiclePosition}
import Calendar.ISO, only: [date_to_string: 4]

@type trip_group :: {TripUpdate.t() | nil, [VehiclePosition.t()], [StopTimeUpdate.t()]}
@type trip_group :: {TripDescriptor.t() | nil, [VehiclePosition.t()], [StopTimeUpdate.t()]}

@doc """
Given a list of parsed data, returns a list of tuples:
{TripUpdate.t() | nil, [VehiclePosition.t()], [StopTimeUpdate.t]}
{TripDescriptor.t() | nil, [VehiclePosition.t()], [StopTimeUpdate.t]}
The VehiclePositions/StopTimeUpdates will share the same trip ID.
"""
@spec group([TripUpdate.t() | VehiclePosition.t() | StopTimeUpdate.t()]) :: [trip_group]
@spec group([TripDescriptor.t() | VehiclePosition.t() | StopTimeUpdate.t()]) :: [trip_group]
def group(parsed) do
# we sort by the initial size, which keeps the trip updates in their original ordering
parsed
|> Enum.reduce(%{}, &group_by_trip_id/2)
|> Map.values()
|> Enum.flat_map(fn
{%TripUpdate{} = tu, [], []} ->
if TripUpdate.schedule_relationship(tu) == :CANCELED do
[{tu, [], []}]
{%TripDescriptor{} = td, [], []} ->
if TripDescriptor.schedule_relationship(td) == :CANCELED do
[{td, [], []}]
else
[]
end

{tu, vps, stus} ->
{td, vps, stus} ->
stus = Enum.sort_by(stus, &StopTimeUpdate.stop_sequence/1)
[{tu, vps, stus}]
[{td, vps, stus}]
end)
end

Expand Down Expand Up @@ -91,7 +91,7 @@ defmodule Concentrate.Encoder.GTFSRealtimeHelpers do
end

@doc """
Builds a list of TripUpdate FeedEntities.
Builds a list of TripDescriptor FeedEntities.
Takes a function to turn a StopTimeUpdate struct into the GTFS-RT version.
"""
Expand Down Expand Up @@ -140,9 +140,9 @@ defmodule Concentrate.Encoder.GTFSRealtimeHelpers do
def schedule_relationship(:SCHEDULED), do: nil
def schedule_relationship(relationship), do: relationship

defp group_by_trip_id(%TripUpdate{} = tu, map) do
if trip_id = TripUpdate.trip_id(tu) do
Map.update(map, trip_id, {tu, [], []}, &add_trip_update(&1, tu))
defp group_by_trip_id(%TripDescriptor{} = td, map) do
if trip_id = TripDescriptor.trip_id(td) do
Map.update(map, trip_id, {td, [], []}, &add_trip_descriptor(&1, td))
else
map
end
Expand All @@ -160,43 +160,43 @@ defmodule Concentrate.Encoder.GTFSRealtimeHelpers do
Map.update(map, trip_id, {nil, [], [stu]}, &add_stop_time_update(&1, stu))
end

defp add_trip_update({_tu, vps, stus}, tu) do
{tu, vps, stus}
defp add_trip_descriptor({_td, vps, stus}, td) do
{td, vps, stus}
end

defp add_vehicle_position({tu, vps, stus}, vp) do
{tu, [vp | vps], stus}
defp add_vehicle_position({td, vps, stus}, vp) do
{td, [vp | vps], stus}
end

defp add_stop_time_update({tu, vps, stus}, stu) do
{tu, vps, [stu | stus]}
defp add_stop_time_update({td, vps, stus}, stu) do
{td, vps, [stu | stus]}
end

defp build_trip_update_entity(
{%TripUpdate{} = update, vps, stus},
{%TripDescriptor{} = td, vps, stus},
stop_time_update_fn,
enhanced_data_fn
) do
trip_id = TripUpdate.trip_id(update)
id = trip_id || "#{:erlang.phash2(update)}"
trip_id = TripDescriptor.trip_id(td)
id = trip_id || "#{:erlang.phash2(td)}"

trip_data = %{
trip_id: trip_id,
route_id: TripUpdate.route_id(update),
direction_id: TripUpdate.direction_id(update),
start_time: TripUpdate.start_time(update),
start_date: encode_date(TripUpdate.start_date(update)),
schedule_relationship: schedule_relationship(TripUpdate.schedule_relationship(update))
route_id: TripDescriptor.route_id(td),
direction_id: TripDescriptor.direction_id(td),
start_time: TripDescriptor.start_time(td),
start_date: encode_date(TripDescriptor.start_date(td)),
schedule_relationship: schedule_relationship(TripDescriptor.schedule_relationship(td))
}

timestamp = TripUpdate.timestamp(update)
timestamp = TripDescriptor.timestamp(td)

trip =
trip_data
|> Map.merge(enhanced_data_fn.(update))
|> Map.merge(enhanced_data_fn.(td))
|> drop_nil_values()

vehicle = trip_update_vehicle(update, vps)
vehicle = trip_update_vehicle(td, vps)

stop_time_update =
case stus do
Expand All @@ -219,7 +219,7 @@ defmodule Concentrate.Encoder.GTFSRealtimeHelpers do
}
]

TripUpdate.schedule_relationship(update) == :CANCELED ->
TripDescriptor.schedule_relationship(td) == :CANCELED ->
[
%{
id: id,
Expand Down Expand Up @@ -257,7 +257,7 @@ defmodule Concentrate.Encoder.GTFSRealtimeHelpers do
end

defp trip_update_vehicle(update, []) do
if vehicle_id = TripUpdate.vehicle_id(update) do
if vehicle_id = TripDescriptor.vehicle_id(update) do
%{id: vehicle_id}
else
nil
Expand Down
4 changes: 2 additions & 2 deletions lib/concentrate/encoder/trip_updates_enhanced.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Concentrate.Encoder.TripUpdatesEnhanced do
Encodes a list of parsed data into an enhanced.pb file.
"""
@behaviour Concentrate.Encoder
alias Concentrate.{StopTimeUpdate, TripUpdate}
alias Concentrate.{StopTimeUpdate, TripDescriptor}
import Concentrate.Encoder.GTFSRealtimeHelpers

@impl Concentrate.Encoder
Expand All @@ -17,7 +17,7 @@ defmodule Concentrate.Encoder.TripUpdatesEnhanced do
end

defp enhanced_data(update) do
%{route_pattern_id: TripUpdate.route_pattern_id(update)}
%{route_pattern_id: TripDescriptor.route_pattern_id(update)}
end

defp build_stop_time_update(update) do
Expand Down
18 changes: 9 additions & 9 deletions lib/concentrate/encoder/vehicle_positions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Concentrate.Encoder.VehiclePositions do
Encodes a list of parsed data into a VehiclePositions.pb file.
"""
@behaviour Concentrate.Encoder
alias Concentrate.{TripUpdate, VehiclePosition}
alias Concentrate.{TripDescriptor, VehiclePosition}
import Concentrate.Encoder.GTFSRealtimeHelpers

@impl Concentrate.Encoder
Expand All @@ -22,8 +22,8 @@ defmodule Concentrate.Encoder.VehiclePositions do
|> Enum.flat_map(&build_entity/1)
end

def build_entity({%TripUpdate{} = update, vps, _stus}) do
trip = trip_descriptor(update)
def build_entity({%TripDescriptor{} = td, vps, _stus}) do
trip = trip_descriptor(td)

for vp <- vps do
%{
Expand Down Expand Up @@ -87,12 +87,12 @@ defmodule Concentrate.Encoder.VehiclePositions do

def trip_descriptor(update) do
drop_nil_values(%{
trip_id: TripUpdate.trip_id(update),
route_id: TripUpdate.route_id(update),
direction_id: TripUpdate.direction_id(update),
start_time: TripUpdate.start_time(update),
start_date: encode_date(TripUpdate.start_date(update)),
schedule_relationship: TripUpdate.schedule_relationship(update)
trip_id: TripDescriptor.trip_id(update),
route_id: TripDescriptor.route_id(update),
direction_id: TripDescriptor.direction_id(update),
start_time: TripDescriptor.start_time(update),
start_date: encode_date(TripDescriptor.start_date(update)),
schedule_relationship: TripDescriptor.schedule_relationship(update)
})
end
end
6 changes: 3 additions & 3 deletions lib/concentrate/encoder/vehicle_positions_enhanced.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Concentrate.Encoder.VehiclePositionsEnhanced do
Encodes a list of parsed data into a VehiclePositions.pb file.
"""
@behaviour Concentrate.Encoder
alias Concentrate.{TripUpdate, VehiclePosition}
alias Concentrate.{TripDescriptor, VehiclePosition}
alias VehiclePosition.Consist, as: VehiclePositionConsist
import Concentrate.Encoder.GTFSRealtimeHelpers
import Concentrate.Encoder.VehiclePositions, only: [entity_id: 1, trip_descriptor: 1]
Expand All @@ -18,8 +18,8 @@ defmodule Concentrate.Encoder.VehiclePositionsEnhanced do
Jason.encode!(message)
end

def build_entity({%TripUpdate{} = update, vps, _stus}) do
trip = trip_descriptor(update)
def build_entity({%TripDescriptor{} = td, vps, _stus}) do
trip = trip_descriptor(td)

for vp <- vps do
%{
Expand Down
30 changes: 15 additions & 15 deletions lib/concentrate/filter/include_route_direction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,37 @@ defmodule Concentrate.Filter.IncludeRouteDirection do
"""
@behaviour Concentrate.Filter
alias Concentrate.Filter.GTFS.Trips
alias Concentrate.TripUpdate
alias Concentrate.TripDescriptor

@impl Concentrate.Filter
def filter(item, module \\ Trips)

def filter(%TripUpdate{} = tu, module) do
trip_id = TripUpdate.trip_id(tu)
tu = update_route_direction(tu, trip_id, module)
{:cont, tu}
def filter(%TripDescriptor{} = td, module) do
trip_id = TripDescriptor.trip_id(td)
td = update_route_direction(td, trip_id, module)
{:cont, td}
end

def filter(other, _module) do
{:cont, other}
end

defp update_route_direction(tu, nil, _) do
tu
defp update_route_direction(td, nil, _) do
td
end

defp update_route_direction(tu, trip_id, module) do
tu =
if TripUpdate.route_id(tu) do
tu
defp update_route_direction(td, trip_id, module) do
td =
if TripDescriptor.route_id(td) do
td
else
TripUpdate.update_route_id(tu, module.route_id(trip_id))
TripDescriptor.update_route_id(td, module.route_id(trip_id))
end

if TripUpdate.direction_id(tu) do
tu
if TripDescriptor.direction_id(td) do
td
else
TripUpdate.update_direction_id(tu, module.direction_id(trip_id))
TripDescriptor.update_direction_id(td, module.direction_id(trip_id))
end
end
end
2 changes: 1 addition & 1 deletion lib/concentrate/group_filter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Concentrate.GroupFilter do
@moduledoc """
Defines a behavior for filtering over grouped realtime data.
Each filter gets called for each trip group. A trip group is a TripUpdate
Each filter gets called for each trip group. A trip group is a TripDescriptor
(optional), a list of VehiclePositions, and a list of StopTimeUpdates. All
the items in a group share a trip ID, and the StopTimeUpdates will be in
order of their stop_sequence.
Expand Down
16 changes: 8 additions & 8 deletions lib/concentrate/group_filter/cancelled_trip.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ defmodule Concentrate.GroupFilter.CancelledTrip do
"""
@behaviour Concentrate.GroupFilter
alias Concentrate.Filter.Alert.CancelledTrips
alias Concentrate.{StopTimeUpdate, TripUpdate}
alias Concentrate.{StopTimeUpdate, TripDescriptor}

@impl Concentrate.GroupFilter
def filter(trip_group, module \\ CancelledTrips)

def filter({%TripUpdate{} = tu, _vps, [stu | _]} = group, module) do
trip_id = TripUpdate.trip_id(tu)
route_id = TripUpdate.route_id(tu)
def filter({%TripDescriptor{} = td, _vps, [stu | _]} = group, module) do
trip_id = TripDescriptor.trip_id(td)
route_id = TripDescriptor.route_id(td)
time = StopTimeUpdate.time(stu)

cond do
TripUpdate.schedule_relationship(tu) == :CANCELED ->
TripDescriptor.schedule_relationship(td) == :CANCELED ->
cancel_group(group)

is_nil(time) ->
Expand All @@ -34,9 +34,9 @@ defmodule Concentrate.GroupFilter.CancelledTrip do

def filter(other, _module), do: other

defp cancel_group({tu, vps, stus}) do
tu = TripUpdate.cancel(tu)
defp cancel_group({td, vps, stus}) do
td = TripDescriptor.cancel(td)
stus = Enum.map(stus, &StopTimeUpdate.skip/1)
{tu, vps, stus}
{td, vps, stus}
end
end
12 changes: 6 additions & 6 deletions lib/concentrate/group_filter/closed_stop.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ defmodule Concentrate.GroupFilter.ClosedStop do
Skips StopTimeUpdates for closed stops.
"""
@behaviour Concentrate.GroupFilter
alias Concentrate.{Alert.InformedEntity, StopTimeUpdate, TripUpdate}
alias Concentrate.{Alert.InformedEntity, StopTimeUpdate, TripDescriptor}
alias Concentrate.Filter.Alert.ClosedStops

@impl Concentrate.GroupFilter
def filter(update, stops_module \\ ClosedStops)

def filter({%TripUpdate{} = tu, vps, stus}, stops_module) do
def filter({%TripDescriptor{} = td, vps, stus}, stops_module) do
match = [
trip_id: TripUpdate.trip_id(tu),
route_id: TripUpdate.route_id(tu),
direction_id: TripUpdate.direction_id(tu)
trip_id: TripDescriptor.trip_id(td),
route_id: TripDescriptor.route_id(td),
direction_id: TripDescriptor.direction_id(td)
]

stus =
Expand All @@ -28,7 +28,7 @@ defmodule Concentrate.GroupFilter.ClosedStop do
end
end

{tu, vps, stus}
{td, vps, stus}
end

def filter({_, _, _} = group, _) do
Expand Down
Loading

0 comments on commit 6e86872

Please sign in to comment.