Skip to content

Commit

Permalink
New parameters app for testing and possibility to declare parameters;…
Browse files Browse the repository at this point in the history
… Refactor of ros_service and ros_client; Support for multiple ros nodes; New supervision tree for the ros app.
  • Loading branch information
ziopio committed Nov 2, 2021
1 parent de05ef9 commit 8197b34
Show file tree
Hide file tree
Showing 27 changed files with 898 additions and 478 deletions.
2 changes: 0 additions & 2 deletions demos/listener/src/listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
-export([start_link/0]).

-behaviour(gen_subscription_listener).

-export([on_topic_msg/2]).

-behaviour(gen_server).

-export([init/1, handle_call/3, handle_cast/2]).

% We are going to use String.msg so we include its header to use its record definition.
Expand Down
15 changes: 15 additions & 0 deletions demos/parameters/src/parameters.app.src
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{application, parameters,
[{description, "Examples of the parameter API"},
{vsn, "0.1.0"},
{registered, []},
{mod, {parameters_app, []}},
{applications,
[kernel,
stdlib,
ros
]},
{env,[]},
{modules, []},
{licenses, ["Apache 2.0"]},
{links, []}
]}.
55 changes: 55 additions & 0 deletions demos/parameters/src/parameters.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-module(parameters).

-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

% We are gonna use String.msg so we include its header to use its record definition.
-include_lib("std_msgs/src/_rosie/std_msgs_string_msg.hrl").

-record(state, {ros_node}).

start_link() ->
gen_server:start_link(?MODULE, #state{}, []).

init(_) ->
Node = ros_context:create_node("parameter_test"),

OtherNode = ros_context:create_node("secon_node"),

test_parameter_declaration_api(Node),

{ok, #state{ros_node = Node}}.

handle_call(_, _, S) ->
{reply, ok, S}.

handle_cast(_, S) ->
{noreply, S}.

handle_info(_, S) ->
{noreply, S}.

test_parameter_declaration_api(Node) ->
SA = ros_node:declare_parameter(Node, "scale_angular", 1.0),
SL = ros_node:declare_parameter(Node, "scale_linear", 1.0),
% Code to test other param type declarations
S = ros_node:declare_parameter(Node, "dummy_string", "Hellow world!"),
I = ros_node:declare_parameter(Node, "int_val", -4),
B = ros_node:declare_parameter(Node, "bool_val", false),
Str_L = ros_node:declare_parameter(Node, "str_list", ["coco","Wow BRO! |-> .\n"]),
I_L = ros_node:declare_parameter(Node, "int_list", [1,23,78,0,99]),
B_L = ros_node:declare_parameter(Node, "bool_list", [true,false]),
E_L = ros_node:declare_parameter(Node, "empty_list", []),
N = ros_node:declare_parameter(Node, "not_set"),


R1 = ros_node:declare_parameter(Node, "to_be_removed"),
R2 = ros_node:undeclare_parameter(Node, "to_be_removed"),
R3 = ros_node:undeclare_parameter(Node, "never_declared"),

I1 = ros_node:declare_parameter(Node, "Invalid example 1", ["ciao", 99]),
I2 = ros_node:declare_parameter(Node, "Invalid example 2", [1.2, 99]),
I3 = ros_node:declare_parameter(Node, "Invalid example 3", [1, false]),
[ io:format("~p\n",[P]) || P <- [SA,SL,S,I,B,Str_L,I_L,B_L,E_L,N, R1,R2,R3, I1,I2,I3 ]].
13 changes: 13 additions & 0 deletions demos/parameters/src/parameters_app.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-module(parameters_app).

-behaviour(application).

-export([start/2, stop/1]).

start(_StartType, _StartArgs) ->
parameters_sup:start_link().

stop(_State) ->
ok.

%% internal functions
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-module(ros_nodes_pool_sup).
-module(parameters_sup).

-behaviour(supervisor).

Expand All @@ -21,18 +21,19 @@ start_link() ->
%% modules => modules()} % optional

init([]) ->
%io:format("~p.erl STARTED!\n",[?MODULE]),
SupFlags =
#{strategy => simple_one_for_one,
#{strategy => one_for_all,
intensity => 0,
period => 1},
Node =
[#{id => ros_node_sup,
start => {ros_node_sup, start_link, []},
restart => permanent,
shutdown => 5000,
type => supervisor}],

{ok, {SupFlags, Node}}.
Params =
#{id => parameters,
start => {parameters, start_link, []},
restart => transient,
shutdown => 5000,
type => worker},

ChildSpecs = [Params],

{ok, {SupFlags, ChildSpecs}}.

%% internal functions
1 change: 1 addition & 0 deletions demos/turtlesim/src/turtle_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ print_spawn_responce(#turtlesim_spawn_rp{name = Name}) ->
init(_) ->
Node = ros_context:create_node("turtle_controller"),


Pub = ros_node:create_publisher(Node, geometry_msgs_twist_msg, "turtle1/cmd_vel"),

Client = ros_node:create_client(Node, turtlesim_spawn_srv, fun print_spawn_responce/1),
Expand Down
2 changes: 1 addition & 1 deletion rosie/dds/include/dds_types.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@
{durability = ?VOLATILE_DURABILITY_QOS,
reliability = ?RELIABLE_RELIABILITY_QOS,
history = {?KEEP_LAST_HISTORY_QOS, 1}}).
-record(user_topic, {type_name, name, qos_profile = #qos_profile{}}).
-record(dds_user_topic, {type_name, name, qos_profile = #qos_profile{}}).

-endif.
2 changes: 1 addition & 1 deletion rosie/dds/src/protocol/rtps_full_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ init({Participant, #endPoint{guid = GUID} = WriterConfig}) ->
#state{participant = Participant,
entity = WriterConfig,
history_cache = {cache_of, GUID}},
rtps_history_cache:set_listener({cache_of, GUID}, {GUID, ?MODULE}),
rtps_history_cache:set_listener({cache_of, GUID}, {?MODULE, GUID}),

erlang:send_after(10, self(), heartbeat_loop),
erlang:send_after(10, self(), write_loop),
Expand Down
4 changes: 2 additions & 2 deletions rosie/dds/src/protocol/rtps_history_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ handle_call({get_change, WriterGuid, SequenceNumber}, _, State) ->
{reply, h_get_change(State, WriterGuid, SequenceNumber), State};
handle_call({add_change, Change}, _, #state{listener = L} = S) when L == undefined ->
{reply, ok, h_add_change(S, Change)};
handle_call({add_change, Change}, _, #state{listener = {ID, Module}} = S) ->
handle_call({add_change, Change}, _, #state{listener = {Module, ID}} = S) ->
Module:on_change_available(ID,
{Change#cacheChange.writerGuid, Change#cacheChange.sequenceNumber}),
{reply, ok, h_add_change(S, Change)};
Expand Down Expand Up @@ -85,7 +85,7 @@ h_get_change(#state{cache = C}, WriterGuid, SequenceNumber) ->
not_found
end.

h_remove_change(#state{cache = C, listener = {ID, Module}} = State,
h_remove_change(#state{cache = C, listener = {Module, ID}} = State,
WriterGuid,
SequenceNumber) ->
Module:on_change_removed(ID, {WriterGuid, SequenceNumber}),
Expand Down
4 changes: 2 additions & 2 deletions rosie/dds/src/protocol/rtps_participant.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-include_lib("dds/include/rtps_constants.hrl").

-record(state,
{participant = #participant{}, spdp_writer_guid, spdp_reader_guid, spdp_data}).
{participant = #participant{}, spdp_writer_guid, spdp_reader_guid}).

start_link() ->
gen_server:start_link({local, participant}, ?MODULE, #state{}, []).
Expand Down Expand Up @@ -95,7 +95,7 @@ handle_cast({start_discovery, EndPointSet},
Change = rtps_writer:new_change(W_GUID, produce_SPDP_data(P, EndPointSet)),
rtps_history_cache:add_change({cache_of, W_GUID}, Change),

rtps_history_cache:set_listener({cache_of, R_GUID}, {participant, ?MODULE}),
rtps_history_cache:set_listener({cache_of, R_GUID}, { ?MODULE, participant}),
self() ! discovery_loop,
{noreply, State};
handle_cast({send_to_all_readers, Msg}, State) ->
Expand Down
2 changes: 1 addition & 1 deletion rosie/dds/src/protocol/rtps_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ init({Participant, #endPoint{guid = GUID} = WriterConfig}) ->
%io:format("~p.erl STARTED!\n",[?MODULE]),
pg:join(GUID, self()),
Cache = {cache_of, GUID},
rtps_history_cache:set_listener(Cache, {GUID, ?MODULE}),
rtps_history_cache:set_listener(Cache, {?MODULE, GUID}),
erlang:send_after(1000, self(), writer_loop),
{ok,
#state{participant = Participant,
Expand Down
7 changes: 4 additions & 3 deletions rosie/dds/src/service/dds_data_r.erl
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
-module(dds_data_r).

-behaviour(gen_server).

-export([start_link/1, read/2, get_matched_publications/1, read_all/1,
on_change_available/2, on_change_removed/2, set_listener/2, remote_writer_add/2,
remote_writer_remove/2, match_remote_writers/2]).

-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-include_lib("dds/include/rtps_structure.hrl").
Expand Down Expand Up @@ -55,7 +56,7 @@ remote_writer_remove(Name, W) ->
init({Topic, #participant{guid = ID}, GUID}) ->
%io:format("~p.erl STARTED!\n",[?MODULE]),
pg:join({data_r_of, GUID}, self()),
rtps_history_cache:set_listener({cache_of, GUID}, {{data_r_of, GUID}, ?MODULE}),
rtps_history_cache:set_listener({cache_of, GUID}, {?MODULE, {data_r_of, GUID}}),
% [P|_] = pg:get_members(ID),
% R = rtps_participant:create_full_reader(P,ReaderConfig,Cache),
{ok,
Expand All @@ -77,7 +78,7 @@ handle_call(_, _, State) ->
handle_cast({on_change_available, _}, #state{listener = L} = S) when L == not_set ->
{noreply, S};
handle_cast({on_change_available, ChangeKey},
#state{rtps_reader = GUID, listener = {Name, Module}} = S) ->
#state{rtps_reader = GUID, listener = {Module, Name}} = S) ->
Module:on_data_available(Name, {{data_r_of, GUID}, ChangeKey}),
{noreply, S};
handle_cast({match_remote_writers, Writers},
Expand Down
3 changes: 2 additions & 1 deletion rosie/dds/src/service/dds_data_w.erl
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
-module(dds_data_w).

-behaviour(gen_server).

-export([start_link/1, write/2, get_matched_subscriptions/1, remote_reader_add/2, is_sample_acknowledged/2,
remote_reader_remove/2, match_remote_readers/2, wait_for_acknoledgements/1,
flush_all_changes/1]).

-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-include_lib("dds/include/rtps_structure.hrl").
Expand Down
8 changes: 4 additions & 4 deletions rosie/dds/src/service/dds_publisher.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ init([]) ->
% the publisher listens to the sub_detector to add remote readers to its writers
SubDetector =
dds_subscriber:lookup_datareader(dds_default_subscriber, builtin_sub_detector),
dds_data_r:set_listener(SubDetector, {dds_default_publisher, ?MODULE}),
dds_data_r:set_listener(SubDetector, {?MODULE, dds_default_publisher}),

% The builtin message writer for general purpose comunications between DDS participants
GUID_MW =
Expand Down Expand Up @@ -165,7 +165,7 @@ handle_cast({on_data_available, {R, ChangeKey}}, #state{data_writers = DW} = S)
ToBeMatched =
[Pid
|| {_, T, Pid} <- DW,
T#user_topic.name == Data#sedp_disc_endpoint_data.topic_name],
T#dds_user_topic.name == Data#sedp_disc_endpoint_data.topic_name],
%io:format("DDS: node willing to subscribe to topic : ~p\n", [Data#sedp_disc_endpoint_data.topic_name]),
%io:format("DDS: i have theese topics: ~p\n", [[ T || {_,T,Pid} <- DW]]),
%io:format("DDS: interested writers are: ~p\n", [ToBeMatched]),
Expand All @@ -178,7 +178,7 @@ handle_cast({on_data_available, {R, ChangeKey}}, #state{data_writers = DW} = S)
produce_sedp_disc_enpoint_data(#participant{guid = #guId{prefix = P},
vendorId = VID,
protocolVersion = PVER},
#user_topic{type_name = TN,
#dds_user_topic{type_name = TN,
name = N,
qos_profile =
#qos_profile{reliability = R,
Expand All @@ -199,7 +199,7 @@ produce_sedp_endpoint_leaving(#participant{guid = #guId{prefix = P}}, EntityID)
#sedp_endpoint_state{guid = #guId{prefix = P, entityId = EntityID},
status_flags = ?STATUS_INFO_UNREGISTERED + ?STATUS_INFO_DISPOSED}.

match_with_discovered_readers(DW, #user_topic{name = Tname, type_name = Ttype}) ->
match_with_discovered_readers(DW, #dds_user_topic{name = Tname, type_name = Ttype}) ->
SubDetector =
dds_subscriber:lookup_datareader(dds_default_subscriber, builtin_sub_detector),
RemoteReaders = [D || #cacheChange{data = D} <- dds_data_r:read_all(SubDetector)],
Expand Down
17 changes: 11 additions & 6 deletions rosie/dds/src/service/dds_subscriber.erl
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
-module(dds_subscriber).

-behaviour(gen_server).


-export([start_link/0, get_all_data_readers/1, create_datareader/2, lookup_datareader/2,
on_data_available/2, dispose_data_readers/1]). %set_subscription_publisher/2,
dispose_data_readers/1]). %set_subscription_publisher/2,

-behaviour(gen_data_reader_listener).
-export([on_data_available/2]).

-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-include_lib("dds/include/dds_types.hrl").
Expand Down Expand Up @@ -63,7 +68,7 @@ init([]) ->
supervisor:start_child(dds_datareaders_pool_sup,
[{data_reader, dds_pub_detector, P_info, SEDP_Pub_Config}]),

dds_data_r:set_listener({data_r_of, GUID_p}, {dds_default_subscriber, ?MODULE}),
dds_data_r:set_listener({data_r_of, GUID_p}, {?MODULE, dds_default_subscriber}),

% The subscription-reader(aka detector) will listen to which topics the other participants want to subscribe
GUID_s =
Expand Down Expand Up @@ -147,7 +152,7 @@ handle_cast({on_data_available, {R, ChangeKey}}, #state{data_readers = DR} = S)
ToBeMatched =
[Name
|| {_, T, Name} <- DR,
T#user_topic.name == Data#sedp_disc_endpoint_data.topic_name],
T#dds_user_topic.name == Data#sedp_disc_endpoint_data.topic_name],
%io:format("DDS: discovered publisher of topic: ~p\n", [Data#sedp_disc_endpoint_data.topic_name]),
%io:format("DDS: i have theese topics: ~p\n", [[ T || {_,T,_} <- DR]]),
%io:format("DDS: interested readers are: ~p\n", [ToBeMatched]),
Expand All @@ -166,7 +171,7 @@ handle_info(_, State) ->
produce_sedp_disc_enpoint_data(#participant{guid = #guId{prefix = P},
vendorId = VID,
protocolVersion = PVER},
#user_topic{type_name = TN,
#dds_user_topic{type_name = TN,
name = N,
qos_profile =
#qos_profile{reliability = R,
Expand All @@ -188,7 +193,7 @@ produce_sedp_endpoint_leaving(#participant{guid = #guId{prefix = P}}, EntityID)
status_flags = ?STATUS_INFO_UNREGISTERED + ?STATUS_INFO_DISPOSED}.

match_with_discovered_writers(DR,
#user_topic{name = Tname, type_name = Ttype},
#dds_user_topic{name = Tname, type_name = Ttype},
#state{builtin_pub_detector = PubDetector}) ->
RemoteWriters = [D || #cacheChange{data = D} <- dds_data_r:read_all(PubDetector)],
%io:format("Remote writers for topic ~p are ~p\n",[Tname,RemoteWriters]),
Expand Down
4 changes: 2 additions & 2 deletions rosie/dds/src/service/gen_data_reader_listener.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-module(gen_data_reader_listener).

-callback on_data_available(Listener :: pid(),
{DataReader :: pid(), ChangeKey :: term()}) ->
-callback on_data_available(Listener :: term(),
{DataReader :: term(), ChangeKey :: term()}) ->
term().
3 changes: 3 additions & 0 deletions rosie/ros/src/behaviours/gen_dds_entity_owner.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-module(gen_dds_entity_owner).

-callback get_all_dds_entities(ProcName::term()) -> { DataWriters::list(), DataReaders::list()}.
Loading

0 comments on commit 8197b34

Please sign in to comment.