diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f17d01c7..b6c24cbd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,7 @@ jobs: strategy: matrix: otp_vsn: ['27', '26', '25'] - rebar_vsn: ['3.23.0'] + rebar_vsn: ['3.24.0'] test-type: ['regular', 'integration'] runs-on: 'ubuntu-24.04' steps: diff --git a/guides/coordinator.md b/guides/coordinator.md index 0c6dfacc..3dff4251 100644 --- a/guides/coordinator.md +++ b/guides/coordinator.md @@ -1,6 +1,6 @@ ## API -See `amoc_coordinator`. +See `m:amoc_coordinator`. ## Description diff --git a/guides/telemetry.md b/guides/telemetry.md index 2d87e40c..1bf28553 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -62,7 +62,7 @@ This event is raised only on the master node. ```erlang event_name: [amoc, throttle, rate] -measurements: #{rate := non_neg_integer()} +measurements: #{rate := rate(), interval := interval()} metadata: #{monotonic_time := integer(), name := atom(), msg => binary()} ``` diff --git a/guides/throttle.md b/guides/throttle.md index ff904f3c..c6a83761 100644 --- a/guides/throttle.md +++ b/guides/throttle.md @@ -1,18 +1,22 @@ ## API -See `amoc_throttle` +See `m:amoc_throttle`. ## Overview Amoc throttle is a module that allows limiting the number of users' actions per given interval, no matter how many users there are in a test. -It works in both local and distributed environments, allows for dynamic rate changes during a test and exposes metrics which show the number of requests and executions. +It works in both local and distributed environments, allows for dynamic rate changes during a test and exposes telemetry events showing the number of requests and executions. -Amoc throttle allows setting the execution `Rate` per `Interval` or limiting the number of parallel executions when `Interval` is set to `0`. -Each `Rate` is identified with a `Name`. -The rate limiting mechanism allows responding to a request only when it does not exceed the given `Rate`. -Amoc throttle makes sure that the given `Rate` per `Interval` is maintained on a constant level. +Amoc throttle allows to: + +- Setting the execution `Rate` per `Interval`, or inversely, the `Interarrival` time between actions. +- Limiting the number of parallel executions when `interval` is set to `0`. + +Each throttle is identified with a `Name`. +The rate limiting mechanism allows responding to a request only when it does not exceed the given throttle. +Amoc throttle makes sure that the given throttle is maintained on a constant level. It prevents bursts of executions which could blurry the results, as they technically produce a desired rate in a given interval. -Because of that, it may happen that the actual `Rate` would be slightly below the demanded rate. However, it will never be exceeded. +Because of that, it may happen that the actual throttle rate would be slightly below the demanded rate. However, it will never be exceeded. ## Examples @@ -42,18 +46,21 @@ user_loop(Id) -> user_loop(Id). ``` Here a system should be under a continuous load of 100 messages per minute. -Note that if we used something like `amoc_throttle:run(messages_rate, fun() -> send_message(Id) end)` instead of `amoc_throttle:send_and_wait/2` the system would be flooded with requests. +Note that if we used something like `amoc_throttle:run(messages_rate, fun() -> send_message(Id) end)` instead of `amoc_throttle:wait/1` the system would be flooded with requests. A test may of course be much more complicated. For example it can have the load changing in time. A plan for that can be set for the whole test in `init/1`: ```erlang init() -> - %% init metrics amoc_throttle:start(messages_rate, 100), %% 9 steps of 100 increases in Rate, each lasting one minute - amoc_throttle:change_rate_gradually(messages_rate, 100, 1000, 60000, 60000, 9), - ok. + Gradual = #{from_rate => 100, + to_rate => 1000, + step_count => 9, + step_size => 100, + step_interval => timer:minutes(1)}, + amoc_throttle:change_rate_gradually(messages_rate, Gradual). ``` Normal Erlang messages can be used to schedule tasks for users by themselves or by some controller process. @@ -97,13 +104,13 @@ For a more comprehensive example please refer to the `throttle_test` scenario, w - `amoc_throttle_controller.erl` - a gen_server which is responsible for reacting to requests, and managing `throttle_processes`. In a distributed environment an instance of `throttle_controller` runs on every node, and the one running on the master Amoc node stores the state for all nodes. - `amoc_throttle_process.erl` - gen_server module, implements the logic responsible for limiting the rate. -For every `Name`, a `NoOfProcesses` are created, each responsible for keeping executions at a level proportional to their part of `Rate`. +For every `Name`, a number of processes are created, each responsible for keeping executions at a level proportional to their part of the throttle. ### Distributed environment #### Metrics -In a distributed environment every Amoc node with a throttle started, exposes metrics showing the numbers of requests and executions. -Those exposed by the master node show the sum of all metrics from all nodes. +In a distributed environment every Amoc node with a throttle started, exposes telemetry events showing the numbers of requests and executions. +Those exposed by the master node show the aggregate of all telemetry events from all nodes. This allows to quickly see the real rates across the whole system. #### Workflow @@ -112,12 +119,12 @@ Then a runner process is spawned on the same node. Its task will be to execute `Fun` asynchronously. A random throttle process which is assigned to the `Name` is asked for a permission for asynchronous runner to execute `Fun`. When the request reaches the master node, where throttle processes reside, the request metric on the master node is updated and the throttle process which got the request starts monitoring the asynchronous runner process. -Then, depending on the system's load and the current rate of executions, the asynchronous runner is allowed to run the `Fun` or compelled to wait, because executing the function would exceed the calculated `Rate` in an `Interval`. +Then, depending on the system's load and the current rate of executions, the asynchronous runner is allowed to run the `Fun` or compelled to wait, because executing the function would exceed the calculated throttle. When the rate finally allows it, the asynchronous runner gets the permission to run the function from the throttle process. Both processes increase the metrics which count executions, but for each the metric is assigned to their own node. Then the asynchronous runner tries to execute `Fun`. It may succeed or fail, either way it dies and an `'EXIT'` signal is sent to the throttle process. -This way it knows that the execution of a task has ended, and can allow a different process to run its task connected to the same `Name` if the current `Rate` allows it. +This way it knows that the execution of a task has ended, and can allow a different process to run its task connected to the same `Name` if the current throttle allows it. Below is a graph showing the communication between processes on different nodes described above. ![amoc_throttle_dist](assets/amoc_throttle_dist.svg) diff --git a/rebar.config b/rebar.config index b1f19dae..579959a3 100644 --- a/rebar.config +++ b/rebar.config @@ -4,7 +4,7 @@ ]}. {deps, [ - {telemetry, "1.2.1"} + {telemetry, "1.3.0"} ]}. {profiles, [ @@ -14,10 +14,10 @@ {meck, "0.9.2"}, {proper, "1.4.0"}, {bbmustache, "1.12.2"}, - {wait_helper, "0.2.0"} + {wait_helper, "0.2.1"} ]} ]}, - {elvis, [{plugins, [{rebar3_lint, "3.2.3"}]}]} + {elvis, [{plugins, [{rebar3_lint, "3.2.6"}]}]} ]}. {relx, [ @@ -62,7 +62,7 @@ {'guides/amoc_livebook.livemd', #{title => <<"Livebook tutorial">>}}, {'LICENSE', #{title => <<"License">>}} ]}, - {assets, <<"guides/assets">>}, + {assets, #{<<"guides/assets">> => <<"assets">>}}, {main, <<"readme">>} ]}. diff --git a/rebar.lock b/rebar.lock index 7be98b4f..fd6ebf15 100644 --- a/rebar.lock +++ b/rebar.lock @@ -1,8 +1,8 @@ {"1.2.0", -[{<<"telemetry">>,{pkg,<<"telemetry">>,<<"1.2.1">>},0}]}. +[{<<"telemetry">>,{pkg,<<"telemetry">>,<<"1.3.0">>},0}]}. [ {pkg_hash,[ - {<<"telemetry">>, <<"68FDFE8D8F05A8428483A97D7AAB2F268AAFF24B49E0F599FAA091F1D4E7F61C">>}]}, + {<<"telemetry">>, <<"FEDEBBAE410D715CF8E7062C96A1EF32EC22E764197F70CDA73D82778D61E7A2">>}]}, {pkg_hash_ext,[ - {<<"telemetry">>, <<"DAD9CE9D8EFFC621708F99EAC538EF1CBE05D6A874DD741DE2E689C47FEAFED5">>}]} + {<<"telemetry">>, <<"7015FC8919DBE63764F4B4B87A95B7C0996BD539E0D499BE6EC9D7F3875B79E6">>}]} ]. diff --git a/src/throttle/amoc_throttle.erl b/src/throttle/amoc_throttle.erl index 19885869..0c4e42fc 100644 --- a/src/throttle/amoc_throttle.erl +++ b/src/throttle/amoc_throttle.erl @@ -1,133 +1,169 @@ %% @copyright 2024 Erlang Solutions Ltd. -%% @doc This module allows to synchronize the users and act on groups of them. +%% @doc Allows limiting the number of users' actions per interval. -module(amoc_throttle). %% API --export([start/2, - start/3, - start/4, - send/3, - send/2, - send_and_wait/2, - wait/1, - run/2, - pause/1, - resume/1, - change_rate/3, - change_rate_gradually/6, - stop/1]). - --deprecated([{send_and_wait, 2, "use wait/1 instead"}]). - --define(DEFAULT_NO_PROCESSES, 10). --define(DEFAULT_INTERVAL, 60000). %% one minute --define(NONNEG_INT(N), (is_integer(N) andalso N >= 0)). --define(POS_INT(N), (is_integer(N) andalso N > 0)). +-export([start/2, stop/1, + send/2, send/3, wait/1, + run/2, pause/1, resume/1, unlock/1, + change_rate/2, change_rate_gradually/2]). -type name() :: atom(). --type rate() :: pos_integer(). +%% Atom representing the name of the throttle. + +-type rate() :: infinity | non_neg_integer(). +%% Number of events per given `t:interval/0'. +%% +%% It can also be: +%% + +-type interarrival() :: infinity | non_neg_integer(). +%% Time in milliseconds between two events. +%% +%% It can also be: +%% + -type interval() :: non_neg_integer(). -%% In milliseconds, defaults to 60000 (one minute) when not given. -%% An interval of 0 means no delay at all, only the number of simultaneous executions will be -%% controlled, which corresponds to the number of processes started --export_type([name/0, rate/0, interval/0]). +%% In milliseconds, defaults to 60000 (one minute). +%% +%% Note that an interval of zero means effectively allowing `t:rate/0' number of executions in +%% parallel. It might be expected for this to be always `infinity' as a result of the limit when +%% dividing by zero, but this needs to be made explicit in the `t:rate/0' by setting it to infinity. + +-type t() :: #{rate := rate(), interval => interval()} | + #{interarrival := interarrival()}. +%% Throttle unit of measurement + +-type gradual() :: #{from_rate := non_neg_integer(), + to_rate := non_neg_integer(), + interval => interval()} | + #{from_interarrival := non_neg_integer(), + to_interarrival := non_neg_integer()}. +%% Configuration throttle for a gradual rate change. +%% +%% "from" and "to" prefixed parameters, whether rates or interarrivals, are required. +%% `interval' applies only to rate and defaults to 1s. + +-type plan() :: #{step_interval := pos_integer(), + step_count := pos_integer()} | + #{duration := pos_integer()}. +%% Configuration plan for a gradual rate change. +%% +%% The throttle mechanism will take a series of discrete steps, +%% for as long as the duration given, +%% or in the shape of the `step_interval' and `step_count'. -%% @see start/4 --spec start(name(), rate()) -> ok | {error, any()}. -start(Name, Rate) -> - start(Name, Rate, ?DEFAULT_INTERVAL). +-type gradual_plan() :: #{throttle := gradual(), + plan := plan()}. +%% Gradual plan details. Must specify a `t:gradual/0', and a `t:plan/0'. -%% @see start/4 --spec start(name(), rate(), non_neg_integer()) -> ok | {error, any()}. -start(Name, Rate, Interval) -> - start(Name, Rate, Interval, ?DEFAULT_NO_PROCESSES). +-export_type([t/0, name/0, rate/0, interval/0, gradual_plan/0]). -%% @doc Starts the throttle mechanism for a given `Name' with a given `Rate' per `Interval'. +%% @doc Starts the throttle mechanism for a given `Name' with a given config. %% -%% The optional arguments are an `Interval' (default is one minute) and a ` NoOfProcesses' (default is 10). %% `Name' is needed to identify the rate as a single test can have different rates for different tasks. -%% `Interval' is given in milliseconds and can be changed to a different value for convenience or higher granularity. -%% It also accepts a special value of `0' which limits the number of parallel executions associated with `Name' to `Rate'. --spec start(name(), rate(), interval(), pos_integer()) -> ok | {error, any()}. -start(Name, Rate, Interval, NoOfProcesses) - when is_atom(Name), ?POS_INT(Rate), ?NONNEG_INT(Interval), ?POS_INT(NoOfProcesses) -> - amoc_throttle_controller:ensure_throttle_processes_started(Name, Rate, Interval, NoOfProcesses); -start(_Name, _Rate, _Interval, _NoOfProcesses) -> - {error, invalid_throttle}. +-spec start(name(), t() | rate()) -> {ok, started | already_started} | {error, any()}. +start(Name, #{} = Config) -> + case amoc_throttle_config:verify_config(Config) of + {error, Error} -> + {error, Error}; + VerifiedConfig -> + amoc_throttle_controller:ensure_throttle_processes_started(Name, VerifiedConfig) + end; +start(Name, Rate) when is_integer(Rate) -> + start(Name, #{rate => Rate}). %% @doc Pauses executions for the given `Name' as if `Rate' was set to `0'. %% -%% Does not stop the scheduled rate changes. +%% Does not stop the scheduled rate changes. `resume/1' undoes the pausing. -spec pause(name()) -> ok | {error, any()}. pause(Name) -> amoc_throttle_controller:pause(Name). -%% @doc Resumes the executions for the given `Name', to their original `Rate' and `Interval' values. +%% @doc Resumes the executions for the given `Name', to their original configuration value. +%% +%% It is the counterpart to the `pause/1' API, resuming the execution of what that mechanism paused. -spec resume(name()) -> ok | {error, any()}. resume(Name) -> amoc_throttle_controller:resume(Name). -%% @doc Sets `Rate' and `Interval' for `Name' according to the given values. -%% -%% Can change whether Amoc throttle limits `Name' to parallel executions or to `Rate' per `Interval', -%% according to the given `Interval' value. --spec change_rate(name(), rate(), interval()) -> ok | {error, any()}. -change_rate(Name, Rate, Interval) -> - amoc_throttle_controller:change_rate(Name, Rate, Interval). +%% @doc Unlocks executions for the given `Name' by setting `Rate' to `infinity'. +-spec unlock(name()) -> ok | {error, any()}. +unlock(Name) -> + change_rate(Name, #{rate => infinity, interval => 0}). + +%% @doc Sets the throttle `Config' for `Name' according to the given values. +-spec change_rate(name(), t() | rate()) -> ok | {error, any()}. +change_rate(Name, #{} = Config) -> + case amoc_throttle_config:verify_config(Config) of + {error, Error} -> + {error, Error}; + VerifiedConfig -> + amoc_throttle_controller:change_rate(Name, VerifiedConfig) + end; +change_rate(Name, Rate) when is_integer(Rate) -> + change_rate(Name, #{rate => Rate}). %% @doc Allows to set a plan of gradual rate changes for a given `Name'. %% -%% `Rate' will be changed from `FromRate' to `ToRate' in a series of consecutive steps. -%% Note that `FromRate' does not need to be lower than `ToRate', rates can be changed downwards. -%% -%% The rate is calculated at each step in relation to the `RateInterval', which can also be `0'. -%% There will be `NoOfSteps' steps, each taking `StepInterval' time in milliseconds. +%% The configuration will be changed in a series of consecutive steps. +%% Rates can be changed upwards as well as downwards. +%% See the documentation for `t:gradual_plan/0' for more info. %% -%% Be aware that, at first, the rate will be changed to `FromRate' per `RateInterval' and this is not considered a step. --spec change_rate_gradually(name(), rate(), rate(), interval(), pos_integer(), pos_integer()) -> +%% Be aware that, at first, the rate will be changed to the initial point given +%% in the configuration, and this is not considered a step. +-spec change_rate_gradually(name(), gradual_plan()) -> ok | {error, any()}. -change_rate_gradually(Name, FromRate, ToRate, RateInterval, StepInterval, NoOfSteps) -> - amoc_throttle_controller:change_rate_gradually( - Name, FromRate, ToRate, RateInterval, StepInterval, NoOfSteps). +change_rate_gradually(Name, Config) -> + case amoc_throttle_config:verify_gradual_config(Config) of + {error, _} = Error -> + Error; + VerifiedConfig -> + amoc_throttle_controller:change_rate_gradually(Name, VerifiedConfig) + end. %% @doc Executes a given function `Fn' when it does not exceed the rate for `Name'. %% %% `Fn' is executed in the context of a new process spawned on the same node on which %% the process executing `run/2' runs, so a call to `run/2' is non-blocking. -%% This function is used internally by both `send' and `send_and_wait/2' functions, -%% so all those actions will be limited to the same rate when called with the same `Name'. %% -%% Diagram showing function execution flow in distributed environment, -%% generated using https://sequencediagram.org/: +%% Diagram +%% showing function execution flow in distributed environment. %% ``` -%% title Amoc distributed -%% participantgroup **Slave node** -%% participant User -%% participant Async runner -%% end -%% participantgroup **Master node** -%% participant Throttle process -%% end -%% box left of User: inc req rate -%% -%% User -> *Async runner : Fun -%% -%% User -> Throttle process : {schedule, Async runner PID} -%% box right of Throttle process : inc req rate -%% -%% ==throtlling delay== -%% -%% Throttle process -> Async runner: scheduled -%% -%% box left of Async runner : inc exec rate -%% abox over Async runner : Fun() -%% activate Async runner -%% box right of Throttle process : inc exec rate -%% deactivate Async runner -%% Async runner ->Throttle process:'DOWN' -%% destroy Async runner +%% title Amoc distributed +%% participantgroup **Slave node** +%% participant User +%% participant Async runner +%% end +%% participantgroup **Master node** +%% participant Throttle process +%% end +%% box left of User: request telemetry event +%% +%% User -> *Async runner : Fun +%% +%% User -> Throttle process : {schedule, Async runner PID} +%% box right of Throttle process : request telemetry event +%% +%% ==throtlling delay== +%% +%% Throttle process -> Async runner: scheduled +%% box right of Throttle process : execution telemetry event +%% space -5 +%% box left of Async runner : execution telemetry event +%% abox over Async runner : scheduled action +%% activate Async runner +%% space +%% deactivate Async runner +%% Async runner ->Throttle process:'DOWN' +%% destroy Async runner %% ''' -%% for the local execution, req/exec rates are increased only by throttle process. -spec run(name(), fun(() -> any())) -> ok | {error, any()}. run(Name, Fn) -> amoc_throttle_runner:throttle(Name, Fn). @@ -145,13 +181,6 @@ send(Name, Msg) -> send(Name, Pid, Msg) -> amoc_throttle_runner:throttle(Name, {Pid, Msg}). -%% @doc Sends and receives the given message `Msg'. -%% -%% Deprecated in favour of `wait/1' --spec send_and_wait(name(), any()) -> ok | {error, any()}. -send_and_wait(Name, _) -> - amoc_throttle_runner:throttle(Name, wait). - %% @doc Blocks the caller until the throttle mechanism allows. -spec wait(name()) -> ok | {error, any()}. wait(Name) -> diff --git a/src/throttle/amoc_throttle_config.erl b/src/throttle/amoc_throttle_config.erl new file mode 100644 index 00000000..3292a139 --- /dev/null +++ b/src/throttle/amoc_throttle_config.erl @@ -0,0 +1,179 @@ +%% @private +%% @see amoc_throttle +%% @copyright 2024 Erlang Solutions Ltd. +-module(amoc_throttle_config). + +-include_lib("kernel/include/logger.hrl"). + +-define(TIMEOUT(N), (infinity =:= N orelse is_integer(N) andalso N >= 0)). +-define(NON_NEG_INT(N), (is_integer(N) andalso N >= 0)). +-define(POS_INT(N), (is_integer(N) andalso N > 0)). +-define(DEFAULT_INTERVAL, 60000). %% one minute +-define(DEFAULT_STEP_INTERVAL, 100). %% every 100ms + +-export([verify_config/1, verify_gradual_config/1, pool_config/2, process_pool_config/2]). +-export([no_of_processes/0]). +-export_type([config/0, gradual_plan/0, pool_config/0]). + +-type process_number() :: non_neg_integer(). +-type config() :: #{rate := amoc_throttle:rate(), + interval := amoc_throttle:interval()}. +-type gradual_plan() :: #{rates := [non_neg_integer()], + interval := amoc_throttle:interval(), + step_interval := non_neg_integer()}. +-type pool_config() :: #{process_number() := + #{max_n := infinity | non_neg_integer(), + delay := non_neg_integer(), + status := active | inactive, + pid := undefined | pid()}}. + +-spec verify_config(amoc_throttle:t()) -> config() | {error, any()}. +verify_config(#{interarrival := infinity} = Config) + when 1 =:= map_size(Config) -> + #{rate => 0, interval => ?DEFAULT_INTERVAL}; +verify_config(#{interarrival := 0} = Config) + when 1 =:= map_size(Config) -> + #{rate => infinity, interval => ?DEFAULT_INTERVAL}; +verify_config(#{interarrival := Interarrival} = Config) + when 1 =:= map_size(Config), ?POS_INT(Interarrival) -> + #{rate => ?DEFAULT_INTERVAL div Interarrival, interval => ?DEFAULT_INTERVAL}; +verify_config(#{rate := Rate, interval := Interval} = Config) + when 2 =:= map_size(Config), ?TIMEOUT(Rate), ?NON_NEG_INT(Interval) -> + Config; +verify_config(#{rate := Rate} = Config) + when 1 =:= map_size(Config), ?TIMEOUT(Rate) -> + Config#{interval => ?DEFAULT_INTERVAL}; +verify_config(_Config) -> + {error, invalid_throttle}. + +-spec verify_gradual_config(amoc_throttle:gradual_plan()) -> gradual_plan() | {error, any()}. +verify_gradual_config(Config) -> + try do_verify_gradual_config(Config) of + Change -> Change + catch error:Reason:Stacktrace -> + ?LOG_WARNING(#{what => bad_gradual_config, + reason => Reason, stacktrace => Stacktrace}), + {error, Reason} + end. + +-spec pool_config(amoc_throttle:rate(), amoc_throttle:interval()) -> pool_config(). +pool_config(infinity, _) -> + Config = #{max_n => infinity, delay => 0, status => active, pid => undefined}, + maps:from_keys(lists:seq(1, ?MODULE:no_of_processes()), Config); +pool_config(0, _) -> + Config = #{max_n => 0, delay => infinity, status => active, pid => undefined}, + maps:from_keys(lists:seq(1, ?MODULE:no_of_processes()), Config); +pool_config(Rate, 0) -> + Config = #{max_n => Rate, delay => 0, status => inactive, pid => undefined}, + PoolConfig = #{1 := First} = maps:from_keys(lists:seq(1, ?MODULE:no_of_processes()), Config), + PoolConfig#{1 := First#{status => active}}; +pool_config(Rate, Interval) when ?POS_INT(Rate), ?POS_INT(Interval) -> + NoOfProcesses = ?MODULE:no_of_processes(), + RatesPerProcess = calculate_rate_per_process(NoOfProcesses, Rate, Interval, +0.0, []), + #{} = lists:foldl(fun assign_process/2, #{}, RatesPerProcess). + +-define(THRESHOLD, 10). +calculate_rate_per_process(1, Rate, Interval, RoundingError, Acc) -> + case delay(RoundingError, Rate, Interval) of + {Delay, Remaining} when Delay =:= infinity; Remaining < 0.5 -> + [{1, Rate, Delay} | Acc]; + {Delay, _} -> + [{1, Rate, Delay + 1} | Acc] + end; +calculate_rate_per_process(N, Rate, Interval, RoundingError, Acc) when is_integer(N), N > 1 -> + ProcessRate = Rate div N, + case ProcessRate of + _ when ProcessRate =< ?THRESHOLD, Rate =< ?THRESHOLD -> + {Delay, RoundingError1} = delay(RoundingError, Rate, Interval), + Acc1 = [{N, Rate, Delay} | Acc], + calculate_rate_per_process(N - 1, 0, Interval, RoundingError1, Acc1); + _ when ProcessRate =< ?THRESHOLD -> + {Delay, RoundingError1} = delay(RoundingError, ?THRESHOLD, Interval), + Acc1 = [{N, ?THRESHOLD, Delay} | Acc], + calculate_rate_per_process(N - 1, Rate - ?THRESHOLD, Interval, RoundingError1, Acc1); + _ -> + {Delay, RoundingError1} = delay(RoundingError, ProcessRate, Interval), + Acc1 = [{N, ProcessRate, Delay} | Acc], + calculate_rate_per_process(N - 1, Rate - ProcessRate, Interval, RoundingError1, Acc1) + end. + +delay(RemainingError, 0, _Interval) -> + {infinity, RemainingError}; +delay(RemainingError, Rate, Interval) -> + Remaining = Interval rem Rate, + RemainingError1 = RemainingError + (Remaining / Rate), + case {Interval div Rate, RemainingError1} of + {DelayBetweenExecutions, _} when RemainingError1 >= 1.0 -> + {DelayBetweenExecutions + 1, RemainingError1 - 1}; + {DelayBetweenExecutions, _} -> + {DelayBetweenExecutions, RemainingError1} + end. + +assign_process({N, RatePerProcess, infinity}, Config) -> + Config#{N => #{max_n => RatePerProcess, + delay => infinity, + status => inactive, + pid => undefined}}; +assign_process({N, RatePerProcess, Delay}, Config) -> + Config#{N => #{max_n => RatePerProcess, + delay => Delay, + status => active, + pid => undefined}}. + +-spec process_pool_config(pid(), pool_config()) -> pool_config(). +process_pool_config(PoolSup, PoolConfig) -> + Workers = amoc_throttle_pool:get_workers(PoolSup), + Fun1 = fun(N, Config) -> Config#{pid => maps:get(N, Workers)} end, + maps:map(Fun1, PoolConfig). + +-spec no_of_processes() -> non_neg_integer(). +no_of_processes() -> + min(30, 2 * erlang:system_info(schedulers_online)). + +-spec do_verify_gradual_config(amoc_throttle:gradual_plan()) -> gradual_plan(). +do_verify_gradual_config( + #{throttle := #{from_rate := FromRate, to_rate := ToRate, interval := Interval} = Throttle, + plan := #{step_interval := StepInterval, step_count := StepCount} = Plan}) + when 3 =:= map_size(Throttle), 2 =:= map_size(Plan), + ?NON_NEG_INT(FromRate), ?NON_NEG_INT(ToRate), ?NON_NEG_INT(Interval), + ?POS_INT(StepInterval), ?POS_INT(StepCount) -> + StepRate = (ToRate - FromRate) / StepCount, + StepPlan = [ calculate_step(Step, StepCount, StepRate, FromRate, ToRate) + || Step <- lists:seq(0, StepCount) ], + #{rates => StepPlan, interval => Interval, step_interval => StepInterval}; + +do_verify_gradual_config( + #{throttle := #{from_rate := _, to_rate := _} = Throttle} = Config0) + when 2 =:= map_size(Throttle) -> + Config1 = Config0#{throttle := Throttle#{interval => ?DEFAULT_INTERVAL}}, + do_verify_gradual_config(Config1); + +do_verify_gradual_config( + #{throttle := #{from_interarrival := FromInterarrival, + to_interarrival := ToInterarrival} = Throttle} = Config0) + when ?NON_NEG_INT(FromInterarrival), ?NON_NEG_INT(ToInterarrival), 2 =:= map_size(Throttle) -> + FromRate = ?DEFAULT_INTERVAL div FromInterarrival, + ToRate = ?DEFAULT_INTERVAL div ToInterarrival, + Config1 = Config0#{throttle := #{from_rate => FromRate, to_rate => ToRate}}, + do_verify_gradual_config(Config1); + +do_verify_gradual_config( + #{throttle := #{from_rate := FromRate, to_rate := ToRate, interval := Interval} = Throttle, + plan := #{duration := Duration} = Plan} = Config0) + when 3 =:= map_size(Throttle), 1 =:= map_size(Plan), + ?NON_NEG_INT(FromRate), ?NON_NEG_INT(ToRate), ?NON_NEG_INT(Interval), ?POS_INT(Duration) -> + StepCount = abs(Duration div ?DEFAULT_STEP_INTERVAL), + Config1 = Config0#{plan := #{step_interval => ?DEFAULT_STEP_INTERVAL, step_count => StepCount}}, + do_verify_gradual_config(Config1). + +-spec calculate_step( + Step :: non_neg_integer(), + StepCount :: non_neg_integer(), + StepRate :: float(), + FromRate :: non_neg_integer(), + ToRate :: non_neg_integer()) -> + non_neg_integer(). +calculate_step(N, N, _, _, To) -> To; +calculate_step(0, _, _, From, _) -> From; +calculate_step(N, _, StepRate, From, _) -> + From + round(StepRate * N). diff --git a/src/throttle/amoc_throttle_controller.erl b/src/throttle/amoc_throttle_controller.erl index 94e97a22..ef12e0a9 100644 --- a/src/throttle/amoc_throttle_controller.erl +++ b/src/throttle/amoc_throttle_controller.erl @@ -8,32 +8,33 @@ %% API -export([start_link/0, - ensure_throttle_processes_started/4, - pause/1, resume/1, stop/1, - change_rate/3, change_rate_gradually/6, + ensure_throttle_processes_started/2, + pause/1, resume/1, stop/1, get_info/1, + change_rate/2, change_rate_gradually/2, + pg_scope/0, + get_throttle_process/1, raise_event_on_slave_node/2, telemetry_event/2]). %% gen_server callbacks --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2]). +-define(PG_SCOPE, amoc_throttle). -define(SERVER, ?MODULE). -define(MASTER_SERVER, {?SERVER, amoc_cluster:master_node()}). -record(throttle_info, { + pool_sup :: pid(), + pool_config :: amoc_throttle_config:pool_config(), rate :: amoc_throttle:rate(), interval :: amoc_throttle:interval(), - no_of_procs :: pos_integer(), - active :: boolean(), - change_plan :: change_rate_plan() | undefined + active = true :: boolean(), + change_plan :: undefined | change_rate_plan() }). -record(change_rate_plan, { - high_rate :: pos_integer(), - no_of_steps :: non_neg_integer(), - timer :: timer:tref()}). + rates :: [non_neg_integer()], + timer :: timer:tref() +}). -type name() :: amoc_throttle:name(). -type change_rate_plan() :: #change_rate_plan{}. @@ -41,22 +42,49 @@ -type state() :: #{name() => throttle_info()}. -type event() :: init | execute | request. +-type start_throttle() :: {start_throttle, name(), amoc_throttle_config:config()}. +-type change_rate() :: {change_rate, name(), amoc_throttle_config:config()}. +-type change_rate_gradually() :: + {change_rate_gradually, name(), amoc_throttle_config:gradual_plan()}. +-type operation() :: {pause | resume | stop, name()}. + %%%=================================================================== %%% API %%%=================================================================== --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +-spec pg_scope() -> atom(). +pg_scope() -> + ?PG_SCOPE. + +-spec get_throttle_process(amoc_throttle:name()) -> + {error, no_throttle_process_registered} | {ok, pid()}. +get_throttle_process(Name) -> + case pg:get_members(?PG_SCOPE, Name) of + [_ | _] = List -> %% nonempty list + N = rand:uniform(length(List)), + {ok, lists:nth(N, List)}; + [] -> + {error, no_throttle_process_registered} + end. + +-spec start_link() -> gen_server:start_ret(). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec ensure_throttle_processes_started(name(), amoc_throttle:rate(), - amoc_throttle:interval(), pos_integer()) -> +-spec ensure_throttle_processes_started(name(), amoc_throttle_config:config()) -> {ok, started | already_started} | - {error, wrong_reconfiguration | wrong_no_of_procs}. -ensure_throttle_processes_started(Name, Rate, Interval, NoOfProcesses) -> + {error, invalid_throttle | wrong_reconfiguration | error_starting_pool}. +ensure_throttle_processes_started(Name, Config) when is_atom(Name) -> raise_event_on_slave_node(Name, init), - gen_server:call(?MASTER_SERVER, {start_processes, Name, Rate, Interval, NoOfProcesses}). + gen_server:call(?MASTER_SERVER, {start_throttle, Name, Config}). + +-spec change_rate(name(), amoc_throttle_config:config()) -> ok | {error, any()}. +change_rate(Name, Config) -> + gen_server:call(?MASTER_SERVER, {change_rate, Name, Config}). + +-spec change_rate_gradually(name(), amoc_throttle_config:gradual_plan()) -> ok | {error, any()}. +change_rate_gradually(Name, Config) -> + gen_server:call(?MASTER_SERVER, {change_rate_gradually, Name, Config}). -spec pause(name()) -> ok | {error, any()}. pause(Name) -> @@ -66,16 +94,9 @@ pause(Name) -> resume(Name) -> gen_server:call(?MASTER_SERVER, {resume, Name}). --spec change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval()) -> ok | {error, any()}. -change_rate(Name, Rate, Interval) -> - gen_server:call(?MASTER_SERVER, {change_rate, Name, Rate, Interval}). - --spec change_rate_gradually(name(), amoc_throttle:rate(), amoc_throttle:rate(), - amoc_throttle:interval(), pos_integer(), pos_integer()) -> - ok | {error, any()}. -change_rate_gradually(Name, LowRate, HighRate, RateInterval, StepInterval, NoOfSteps) -> - gen_server:call(?MASTER_SERVER, {change_rate_gradually, Name, LowRate, HighRate, - RateInterval, StepInterval, NoOfSteps}). +-spec get_info(name()) -> #{_ := _} | {error, any()}. +get_info(Name) -> + gen_server:call(?MASTER_SERVER, {get_info, Name}). -spec stop(name()) -> ok | {error, any()}. stop(Name) -> @@ -98,87 +119,68 @@ raise_event_on_slave_node(Name, Event) -> %%% gen_server callbacks %%%=================================================================== --spec init([]) -> {ok, #{}}. +-spec init([]) -> {ok, state()}. init([]) -> {ok, #{}}. --spec handle_call({start_processes, name(), pos_integer(), amoc_throttle:interval(), pos_integer()}, - From :: {pid(), Tag :: term()}, state()) -> +-spec handle_call(start_throttle(), gen_server:from(), state()) -> {reply, {ok, started | already_started}, state()} | - {reply, {error, wrong_reconfiguration | wrong_no_of_procs}, state()}; - ({pause | resume | stop}, From :: {pid(), Tag :: term()}, state()) -> + {reply, {error, wrong_reconfiguration | error_starting_pool}, state()}; + (change_rate(), gen_server:from(), state()) -> {reply, ok, state()} | - {reply, Error :: any(), state()}; - ({change_rate, name(), amoc_throttle:rate(), amoc_throttle:interval()}, - From :: {pid(), Tag :: term()}, state()) -> + {reply, {error, any()}, state()}; + (change_rate_gradually(), gen_server:from(), state()) -> {reply, ok, state()} | {reply, {error, any()}, state()}; - ({change_rate_gradually, name(), amoc_throttle:rate(), amoc_throttle:rate(), - amoc_throttle:interval(), pos_integer(), pos_integer()}, - From :: {pid(), Tag :: term()}, state()) -> + (operation(), gen_server:from(), state()) -> {reply, ok, state()} | + {reply, Error :: any(), state()}; + ({get_info, name()}, gen_server:from(), state()) -> + {reply, #{_ := _}, state()} | {reply, {error, any()}, state()}. -handle_call({start_processes, Name, Rate, Interval, NoOfProcesses}, _From, State) -> - case amoc_throttle_process:get_throttle_processes(Name) of - {error, no_throttle_process_registered} -> - RealNoOfProcs = start_processes(Name, Rate, Interval, NoOfProcesses), - NewState = State#{Name => #throttle_info{rate = Rate, interval = Interval, - active = true, no_of_procs = RealNoOfProcs}}, - {reply, {ok, started}, NewState}; - {ok, Group} -> - verify_new_start_matches_running(Name, Rate, Interval, NoOfProcesses, Group, State) - end; -handle_call({pause, Name}, _From, State) -> - case run_in_all_processes(Name, pause) of - ok -> - Info = maps:get(Name, State), - {reply, ok, State#{Name => Info#throttle_info{active = false}}}; - Error -> - {reply, Error, State} +handle_call({start_throttle, Name, #{rate := Rate, interval := Interval}}, _From, State) -> + case State of + #{Name := #throttle_info{rate = Rate, interval = Interval}} -> + {reply, {ok, already_started}, State}; + #{Name := #throttle_info{}} -> + {reply, {error, wrong_reconfiguration}, State}; + _ -> + do_start_throttle(Name, Rate, Interval, State) end; -handle_call({resume, Name}, _From, State) -> - case run_in_all_processes(Name, resume) of - ok -> - Info = maps:get(Name, State), - {reply, ok, State#{Name => Info#throttle_info{active = true}}}; - Error -> {reply, Error, State} +handle_call({change_rate, Name, #{rate := Rate, interval := Interval}}, _From, State) -> + case State of + #{Name := #throttle_info{rate = Rate, interval = Interval}} -> + {reply, ok, State}; + #{Name := #throttle_info{change_plan = undefined} = Info} -> + do_change_rate(Name, Rate, Interval, Info, State); + #{Name := #throttle_info{}} -> + {reply, {error, cannot_change_rate}, State}; + _ -> + {reply, {error, {no_throttle_by_name, Name}}, State} end; -handle_call({change_rate, Name, Rate, Interval}, _From, State) -> +handle_call({change_rate_gradually, Name, GradualChangeRate}, _From, State) -> case State of - #{Name := Info} -> - case maybe_change_rate(Name, Rate, Interval, Info) of - ok -> - UpdatedInfo = Info#throttle_info{rate = Rate, interval = Interval}, - {reply, ok, State#{Name => UpdatedInfo}}; - Error -> - {reply, Error, State} - end; + #{Name := #throttle_info{change_plan = undefined} = Info} -> + do_gradual_change_rate(Name, Info, State, GradualChangeRate); + #{Name := _} -> + {reply, {error, cannot_change_rate}, State}; _ -> {reply, {error, {no_throttle_by_name, Name}}, State} end; -handle_call({change_rate_gradually, Name, LowRate, HighRate, - RateInterval, StepInterval, NoOfSteps}, - _From, State) -> +handle_call({Op, Name}, _From, State) + when stop =:= Op; pause =:= Op; resume =:= Op -> case State of #{Name := Info} -> - case Info#throttle_info.change_plan of - undefined -> - NewInfo = start_gradual_rate_change(Name, LowRate, HighRate, RateInterval, - StepInterval, NoOfSteps, Info), - {reply, ok, State#{Name => NewInfo}}; - _ -> - {reply, {error, cannot_change_rate}, State} - end; + do_run_op(Op, Name, Info, State); _ -> {reply, {error, {no_throttle_by_name, Name}}, State} end; -handle_call({stop, Name}, _From, State) -> - case run_in_all_processes(Name, stop) of - ok -> - {reply, ok, maps:remove(Name, State)}; - Error -> - {reply, Error, State} - end. +handle_call({get_info, Name}, _From, State) -> + Info = maps:get(Name, State), + Fields = record_info(fields, throttle_info), + [_ | Values] = tuple_to_list(Info), + Ret = maps:from_list(lists:zip(Fields, Values)), + {reply, Ret, State}. -spec(handle_cast(any(), state()) -> {noreply, state()}). handle_cast(_, State) -> @@ -189,12 +191,8 @@ handle_cast(_, State) -> handle_info({change_plan, Name}, State) -> Info = maps:get(Name, State), Plan = Info#throttle_info.change_plan, - case Plan#change_rate_plan.no_of_steps of - 1 -> NewState = change_rate_and_stop_plan(Name, State), - {noreply, NewState}; - N when N > 1 -> NewState = continue_plan(Name, State), - {noreply, NewState} - end. + NewState = continue_plan(Name, State, Info, Plan), + {noreply, NewState}. %%%=================================================================== %%% Internal functions @@ -203,20 +201,71 @@ handle_info({change_plan, Name}, State) -> raise_event(Name, Event) when Event =:= request; Event =:= execute; Event =:= init -> amoc_telemetry:execute([throttle, Event], #{count => 1}, #{name => Name}). -report_rate(Name, RatePerMinute) -> - amoc_telemetry:execute([throttle, rate], #{rate => RatePerMinute}, #{name => Name}). +report_rate(Name, infinity, _) -> + amoc_telemetry:execute([throttle, rate], #{rate => infinity}, #{name => Name}); +report_rate(Name, Rate, Interval) -> + Measurements = #{rate => Rate, interval => Interval}, + amoc_telemetry:execute([throttle, rate], Measurements, #{name => Name}). + +-spec do_start_throttle(name(), amoc_throttle:rate(), amoc_throttle:interval(), state()) -> + {reply, {ok, started}, state()} | + {reply, {error, error_starting_pool}, state()}. +do_start_throttle(Name, Rate, Interval, State) -> + PoolConfig = amoc_throttle_config:pool_config(Rate, Interval), + case amoc_throttle_pooler:start_pool(Name, PoolConfig) of + {ok, PoolSup} when is_pid(PoolSup) -> + PoolConfig1 = amoc_throttle_config:process_pool_config(PoolSup, PoolConfig), + process_pool(Name, PoolConfig1), + raise_event(Name, init), + report_rate(Name, Rate, Interval), + Info = #throttle_info{pool_sup = PoolSup, pool_config = PoolConfig1, + rate = Rate, interval = Interval}, + NewState = State#{Name => Info}, + {reply, {ok, started}, NewState}; + _ -> + {reply, {error, error_starting_pool}, State} + end. --spec change_rate_and_stop_plan(name(), state()) -> state(). -change_rate_and_stop_plan(Name, State) -> - Info = maps:get(Name, State), - Plan = Info#throttle_info.change_plan, +-spec do_change_rate( + name(), amoc_throttle:rate(), amoc_throttle:interval(), throttle_info(), state()) -> + {reply, ok, state()} | + {reply, {error, any()}, state()}. +do_change_rate(Name, Rate, Interval, Info, State) -> + NewInfo = do_change_rate(Name, Rate, Interval, Info), + {reply, ok, State#{Name => NewInfo}}. + +-spec do_change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval(), throttle_info()) -> + throttle_info(). +do_change_rate(Name, Rate, Interval, #throttle_info{pool_config = OldPoolConfig} = Info) -> + NewPoolConfig = amoc_throttle_config:pool_config(Rate, Interval), + report_rate(Name, Rate, Interval), + PoolConfig1 = update_throttle_processes(Name, OldPoolConfig, NewPoolConfig), + Info#throttle_info{rate = Rate, interval = Interval, pool_config = PoolConfig1}. + +-spec do_gradual_change_rate( + name(), throttle_info(), state(), amoc_throttle_config:gradual_plan()) -> + {reply, ok, state()}. +do_gradual_change_rate( + Name, Info, State, + #{rates := [FirstRate | Rates], interval := Interval, step_interval := StepInterval}) -> + Info1 = do_change_rate(Name, FirstRate, Interval, Info), + {ok, Timer} = timer:send_interval(StepInterval, {change_plan, Name}), + Plan = #change_rate_plan{rates = Rates, timer = Timer}, + NewInfo = Info1#throttle_info{rate = FirstRate, interval = Interval, change_plan = Plan}, + {reply, ok, State#{Name => NewInfo}}. + +-spec continue_plan(name(), state(), throttle_info(), change_rate_plan()) -> state(). +continue_plan(Name, State, Info, #change_rate_plan{rates = [Rate]} = Plan) -> Interval = Info#throttle_info.interval, TRef = Plan#change_rate_plan.timer, - HighRate = Plan#change_rate_plan.high_rate, - ok = do_change_rate(Name, HighRate, Interval), + Info1 = do_change_rate(Name, Rate, Interval, Info), {ok, cancel} = timer:cancel(TRef), consume_all_timer_ticks({change_plan, Name}), - State#{Name => Info#throttle_info{rate = HighRate, change_plan = undefined}}. + State#{Name => Info1#throttle_info{change_plan = undefined}}; +continue_plan(Name, State, Info, #change_rate_plan{rates = [Rate | Rates]} = Plan) -> + Info1 = do_change_rate(Name, Rate, Info#throttle_info.interval, Info), + NewPlan = Plan#change_rate_plan{rates = Rates}, + State#{Name => Info1#throttle_info{change_plan = NewPlan}}. consume_all_timer_ticks(Msg) -> receive @@ -224,102 +273,57 @@ consume_all_timer_ticks(Msg) -> after 0 -> ok end. --spec continue_plan(name(), state()) -> state(). -continue_plan(Name, State) -> - Info = maps:get(Name, State), - Plan = Info#throttle_info.change_plan, - LowRate = Info#throttle_info.rate, - HighRate = Plan#change_rate_plan.high_rate, - NoOfSteps = Plan#change_rate_plan.no_of_steps, - - Step = (HighRate - LowRate) div (NoOfSteps), - NewRate = LowRate + Step, - ok = do_change_rate(Name, NewRate, Info#throttle_info.interval), - - NewPlan = Plan#change_rate_plan{no_of_steps = NoOfSteps - 1}, - State#{Name => Info#throttle_info{rate = NewRate, change_plan = NewPlan}}. - --spec rate_per_minute(amoc_throttle:rate(), amoc_throttle:interval()) -> amoc_throttle:rate(). -rate_per_minute(_, 0) -> 0; -rate_per_minute(Rate, Interval) -> - (Rate * 60000) div Interval. - --spec start_processes(name(), amoc_throttle:rate(), amoc_throttle:interval(), pos_integer()) -> - pos_integer(). -start_processes(Name, Rate, Interval, NoOfProcesses) -> - raise_event(Name, init), - RatePerMinute = rate_per_minute(Rate, Interval), - report_rate(Name, RatePerMinute), - RealNoOfProcesses = min(Rate, NoOfProcesses), - start_throttle_processes(Name, Interval, Rate, RealNoOfProcesses), - RealNoOfProcesses. - --spec maybe_change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval(), throttle_info()) -> - ok | {error, any()}. -maybe_change_rate(Name, Rate, Interval, Info) -> - CurrentRatePerMin = rate_per_minute(Info#throttle_info.rate, Info#throttle_info.interval), - ReqRatePerMin = rate_per_minute(Rate, Interval), - case {CurrentRatePerMin, Info#throttle_info.change_plan} of - {ReqRatePerMin, _} -> ok; - {_, undefined} -> do_change_rate(Name, Rate, Interval); - _ -> {error, cannot_change_rate} - end. - --spec do_change_rate(name(), amoc_throttle:rate(), amoc_throttle:interval()) -> ok | {error, any()}. -do_change_rate(Name, Rate, Interval) -> - case amoc_throttle_process:get_throttle_processes(Name) of - {ok, List} -> - RatePerMinute = rate_per_minute(Rate, Interval), - report_rate(Name, RatePerMinute), - update_throttle_processes(List, Interval, Rate, length(List)), - ok; - Error -> - Error - end. - --spec start_gradual_rate_change( - name(), amoc_throttle:rate(), amoc_throttle:rate(), - amoc_throttle:interval(), pos_integer(), pos_integer(), throttle_info()) -> - throttle_info(). -start_gradual_rate_change(Name, LowRate, HighRate, RateInterval, StepInterval, NoOfSteps, Info) -> - ok = do_change_rate(Name, LowRate, RateInterval), - {ok, Timer} = timer:send_interval(StepInterval, {change_plan, Name}), - Plan = #change_rate_plan{high_rate = HighRate, no_of_steps = NoOfSteps, timer = Timer}, - Info#throttle_info{rate = LowRate, interval = RateInterval, change_plan = Plan}. - -start_throttle_processes(Name, Interval, Rate, N) -> - ok = amoc_throttle_pool:start_process_pool(Name, Interval, Rate, N). - -update_throttle_processes([Pid], Interval, Rate, 1) -> - amoc_throttle_process:update(Pid, Interval, Rate); -update_throttle_processes([Pid | Tail], Interval, Rate, N) when N > 1 -> - ProcessRate = Rate div N, - amoc_throttle_process:update(Pid, Interval, ProcessRate), - update_throttle_processes(Tail, Interval, Rate - ProcessRate, N - 1). - -run_in_all_processes(Name, Cmd) -> - case amoc_throttle_process:get_throttle_processes(Name) of - {ok, List} -> - [run_cmd(P, Cmd) || P <- List], - ok; - Error -> - Error - end. - -verify_new_start_matches_running(Name, Rate, Interval, NoOfProcesses, Group, State) -> - ExpectedNoOfProcesses = min(Rate, NoOfProcesses), - case {length(Group), State} of - {ExpectedNoOfProcesses, #{Name := #throttle_info{rate = Rate, interval = Interval}}} -> - {reply, {ok, already_started}, State}; - {ExpectedNoOfProcesses, #{Name := #throttle_info{}}} -> - {reply, {error, wrong_reconfiguration}, State}; - _ -> - {reply, {error, wrong_no_of_procs}, State} - end. - -run_cmd(Pid, stop) -> - amoc_throttle_process:stop(Pid); -run_cmd(Pid, pause) -> - amoc_throttle_process:pause(Pid); -run_cmd(Pid, resume) -> - amoc_throttle_process:resume(Pid). +do_run_op(stop, Name, #throttle_info{pool_sup = PoolSup}, State) -> + ok = amoc_throttle_pooler:stop_pool(PoolSup), + {reply, ok, maps:remove(Name, State)}; +do_run_op(pause, Name, #throttle_info{pool_config = PoolConfig, active = true} = Info, State) -> + Fun = fun(_, #{pid := Pid}) -> + amoc_throttle_process:update(Pid, 0, infinity) + end, + maps:foreach(Fun, PoolConfig), + {reply, ok, State#{Name => Info#throttle_info{active = false}}}; + +do_run_op(resume, Name, #throttle_info{pool_config = PoolConfig, active = false} = Info, State) -> + Fun = fun(_, #{max_n := MaxN, delay := Delay, pid := Pid}) -> + amoc_throttle_process:update(Pid, MaxN, Delay) + end, + maps:foreach(Fun, PoolConfig), + {reply, ok, State#{Name => Info#throttle_info{active = true}}}. + +-spec process_pool(amoc_throttle:name(), amoc_throttle_config:pool_config()) -> + amoc_throttle_config:pool_config(). +process_pool(Name, PoolConfig1) -> + Fun2 = fun({_, #{status := active, pid := Pid}}) -> + {true, Pid}; + (_) -> + false + end, + Pids = lists:filtermap(Fun2, maps:to_list(PoolConfig1)), + pg:join(?PG_SCOPE, Name, Pids), + PoolConfig1. + +-spec update_throttle_processes( + amoc_throttle:name(), + amoc_throttle_config:pool_config(), + amoc_throttle_config:pool_config()) -> + amoc_throttle_config:pool_config(). +update_throttle_processes(Name, OldPoolConfig, NewPoolConfig) -> + Fun = fun(N, #{status := Status, delay := Delay, max_n := MaxN} = V, {C, J, L}) -> + #{status := OldStatus, pid := Pid} = maps:get(N, C), + case {Status, OldStatus} of + {active, inactive} -> + %% we let the disabling process drain, without possibly setting its delay + %% to infinity and blocking all other processes. + {C#{N := V#{pid := Pid}}, [Pid | J], L}; + {inactive, active} -> + amoc_throttle_process:update(Pid, MaxN, Delay), + {C#{N := V#{pid := Pid}}, J, [Pid | L]}; + {Same, Same} -> + amoc_throttle_process:update(Pid, MaxN, Delay), + {C#{N := V#{pid := Pid}}, J, L} + end + end, + {PoolConfig, Join, Leave} = maps:fold(Fun, {OldPoolConfig, [], []}, NewPoolConfig), + pg:join(?PG_SCOPE, Name, Join), + pg:leave(?PG_SCOPE, Name, Leave), + PoolConfig. diff --git a/src/throttle/amoc_throttle_pool.erl b/src/throttle/amoc_throttle_pool.erl index 84eb3a12..a0685e5b 100644 --- a/src/throttle/amoc_throttle_pool.erl +++ b/src/throttle/amoc_throttle_pool.erl @@ -5,52 +5,32 @@ -behaviour(supervisor). --export([start_process_pool/4]). --export([start_link/4, init/1]). +-export([get_workers/1]). +-export([start_link/2, init/1]). --spec start_process_pool( - amoc_throttle:name(), - amoc_throttle:interval(), - amoc_throttle:rate(), - pos_integer() - ) -> ok | error. -start_process_pool(Name, Interval, Rate, NoOfProcesses) -> - {ok, _} = supervisor:start_child(amoc_throttle_pooler, [Name, Interval, Rate, NoOfProcesses]), - ok. +-spec get_workers(pid()) -> #{non_neg_integer() := pid()}. +get_workers(PoolSup) -> + Processes = supervisor:which_children(PoolSup), + Workers = [ {N, Pid} || {{amoc_throttle_process, N}, Pid, _, _} <- Processes, is_pid(Pid) ], + maps:from_list(Workers). --spec start_link( - amoc_throttle:name(), - amoc_throttle:interval(), - amoc_throttle:rate(), - pos_integer() - ) -> {ok, Pid :: pid()}. -start_link(Name, Interval, Rate, NoOfProcesses) when NoOfProcesses > 0 -> - supervisor:start_link(?MODULE, {Name, Interval, Rate, NoOfProcesses}). +-spec start_link(amoc_throttle:name(), amoc_throttle_config:pool_config()) -> + supervisor:startlink_ret(). +start_link(Name, PoolConfig) -> + supervisor:start_link(?MODULE, {Name, PoolConfig}). --spec init({amoc_throttle:name(), amoc_throttle:rate(), amoc_throttle:interval(), pos_integer()}) -> +-spec init({amoc_throttle:name(), amoc_throttle_config:pool_config()}) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. -init({Name, Interval, Rate, NoOfProcesses}) -> - RatesPerProcess = calculate_rate_per_process(Rate, NoOfProcesses), - Tags = lists:seq(1, NoOfProcesses), +init({Name, ConfigPerProcess}) -> Children = [ - #{id => {amoc_throttle_process, Name, N}, - start => {amoc_throttle_process, start_link, [Name, Interval, RatePerProcess]}, + #{id => {amoc_throttle_process, N}, + start => {amoc_throttle_process, start_link, [Name, MaxN, Delay]}, type => worker, - shutdown => timer:seconds(5), + shutdown => timer:seconds(60), restart => transient, modules => [amoc_throttle_process] } - || {RatePerProcess, N} <- lists:zip(RatesPerProcess, Tags) + || {N, #{max_n := MaxN, delay := Delay}} <- maps:to_list(ConfigPerProcess) ], SupFlags = #{strategy => one_for_one, intensity => 0}, {ok, {SupFlags, Children}}. - -%% Helpers -calculate_rate_per_process(Rate, NoOfProcesses) -> - calculate_rate_per_process([], Rate, NoOfProcesses). - -calculate_rate_per_process(Acc, Rate, 1) -> - [Rate | Acc]; -calculate_rate_per_process(Acc, Rate, N) when is_integer(N), N > 1 -> - ProcessRate = Rate div N, - calculate_rate_per_process([ProcessRate | Acc], Rate - ProcessRate, N - 1). diff --git a/src/throttle/amoc_throttle_pooler.erl b/src/throttle/amoc_throttle_pooler.erl index 62280f6f..8f2dabd0 100644 --- a/src/throttle/amoc_throttle_pooler.erl +++ b/src/throttle/amoc_throttle_pooler.erl @@ -5,13 +5,23 @@ -behaviour(supervisor). +-export([start_pool/2, stop_pool/1]). -export([start_link/0, init/1]). --spec start_link() -> {ok, Pid :: pid()}. +-spec start_pool(amoc_throttle:name(), amoc_throttle_config:pool_config()) -> + supervisor:startchild_ret(). +start_pool(Name, PoolConfig) -> + supervisor:start_child(?MODULE, [Name, PoolConfig]). + +-spec stop_pool(pid()) -> ok. +stop_pool(Pool) -> + ok = supervisor:terminate_child(?MODULE, Pool). + +-spec start_link() -> supervisor:startlink_ret(). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). --spec init(term()) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. +-spec init([]) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. init([]) -> ChildSpec = #{id => amoc_throttle_pool, start => {amoc_throttle_pool, start_link, []}, diff --git a/src/throttle/amoc_throttle_process.erl b/src/throttle/amoc_throttle_process.erl index 4b67d69a..a0a21025 100644 --- a/src/throttle/amoc_throttle_process.erl +++ b/src/throttle/amoc_throttle_process.erl @@ -7,15 +7,7 @@ -behaviour(gen_server). %% API --export([stop/1, - run/2, - update/3, - pause/1, - resume/1, - get_state/1, - get_throttle_process/1, - get_throttle_processes/1 - ]). +-export([run/2, update/3]). %% gen_server behaviour -export([start_link/3, @@ -24,117 +16,82 @@ handle_info/2, handle_cast/2, handle_continue/2, + terminate/2, format_status/1]). --define(PG_SCOPE, amoc_throttle). --define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute +-define(DEFAULT_MSG_TIMEOUT, 60000). %% one minute --record(state, {can_run_fn = true :: boolean(), - pause = false :: boolean(), - max_n :: non_neg_integer(), - name :: atom(), - n :: integer(), - interval = 0 :: amoc_throttle:interval(), %%ms - delay_between_executions = 0 :: non_neg_integer(), %%ms +-record(state, {name :: atom(), + delay_between_executions :: timeout(), %% ms + max_n :: infinity | non_neg_integer(), + n = 0 :: non_neg_integer(), + can_run_fn = true :: boolean(), tref :: timer:tref() | undefined, schedule = [] :: [AmocThrottleRunnerProcess :: pid()], - schedule_reversed = [] :: [AmocThrottleRunnerProcess :: pid()]}). + schedule_reversed = [] :: [AmocThrottleRunnerProcess :: pid()] + }). -type state() :: #state{}. %%------------------------------------------------------------------------------ %% Exported functions %%------------------------------------------------------------------------------ --spec start_link(atom(), amoc_throttle:interval(), amoc_throttle:rate()) -> {ok, pid()}. -start_link(Name, Interval, Rate) -> - gen_server:start_link(?MODULE, {Name, Interval, Rate}, []). - --spec stop(pid()) -> ok. -stop(Pid) -> - gen_server:cast(Pid, stop_process). +-spec start_link(atom(), amoc_throttle:rate(), timeout()) -> gen_server:start_ret(). +start_link(Name, MaxN, Delay) -> + gen_server:start_link(?MODULE, {Name, MaxN, Delay}, []). -spec run(pid(), pid()) -> ok. run(Pid, RunnerPid) -> gen_server:cast(Pid, {schedule, RunnerPid}). --spec update(pid(), amoc_throttle:interval(), amoc_throttle:rate()) -> ok. -update(Pid, Interval, Rate) -> - gen_server:cast(Pid, {update, Interval, Rate}). - --spec pause(pid()) -> ok. -pause(Pid) -> - gen_server:cast(Pid, pause_process). - --spec resume(pid()) -> ok. -resume(Pid) -> - gen_server:cast(Pid, resume_process). - --spec get_state(pid()) -> map(). -get_state(Pid) -> - gen_server:call(Pid, get_state). - --spec get_throttle_process(amoc_throttle:name()) -> - {error, no_throttle_process_registered} | {ok, pid()}. -get_throttle_process(Name) -> - case pg:get_members(?PG_SCOPE, Name) of - [] -> - {error, no_throttle_process_registered}; - List -> %% nonempty list - N = rand:uniform(length(List)), - {ok, lists:nth(N, List)} - end. - --spec get_throttle_processes(amoc_throttle:name()) -> - {error, no_throttle_process_registered} | {ok, [pid()]}. -get_throttle_processes(Name) -> - case pg:get_members(?PG_SCOPE, Name) of - [] -> - {error, no_throttle_process_registered}; - List -> - {ok, List} - end. +%% @doc See `initial_state/1'. +%% +%% Setting the delay to infinity results in the effective pausing of the process. +-spec update(pid(), amoc_throttle:rate(), timeout()) -> ok. +update(Pid, MaxN, Delay) -> + gen_server:cast(Pid, {update, MaxN, Delay}). %%------------------------------------------------------------------------------ %% gen_server behaviour %%------------------------------------------------------------------------------ --spec init({amoc_throttle:name(), amoc_throttle:interval(), amoc_throttle:rate()}) -> +%% We're assuming that this throttle process is getting configured sensible values +%% for Delay and MaxN, and it is the responsibility of the controller +%% to give all workers values that aggregate to the desired throttling. +-spec init({amoc_throttle:name(), amoc_throttle:rate(), timeout()}) -> {ok, state(), timeout()}. -init({Name, Interval, Rate}) -> - pg:join(?PG_SCOPE, Name, self()), - InitialState = initial_state(Name, Interval, Rate), +init({Name, MaxN, Delay}) -> + InitialState = initial_state(Name, MaxN, Delay), StateWithTimer = maybe_start_timer(InitialState), - {ok, StateWithTimer#state{name = Name}, timeout(InitialState)}. + {ok, StateWithTimer, timeout(InitialState)}. --spec handle_info(term(), state()) -> {noreply, state(), {continue, maybe_run_fn}}. +-spec handle_info(Req, state()) -> + {noreply, state(), {continue, maybe_run_fn}} + when Req :: {'DOWN', reference(), process, pid(), term()} + | delay_between_executions + | timeout. handle_info({'DOWN', _, process, _, _}, State) -> - {noreply, inc_n(State), {continue, maybe_run_fn}}; + {noreply, dec_n(State), {continue, maybe_run_fn}}; handle_info(delay_between_executions, State) -> {noreply, State#state{can_run_fn = true}, {continue, maybe_run_fn}}; handle_info(timeout, State) -> internal_event(<<"is inactive">>, State), {noreply, State, {continue, maybe_run_fn}}. --spec handle_cast(term(), state()) -> - {noreply, state(), {continue, maybe_run_fn}} | {stop, normal, state()}. -handle_cast(stop_process, State) -> - {stop, normal, State}; -handle_cast(pause_process, State) -> - {noreply, State#state{pause = true}, {continue, maybe_run_fn}}; -handle_cast(resume_process, State) -> - {noreply, State#state{pause = false}, {continue, maybe_run_fn}}; +-spec handle_cast(Req, state()) -> + {noreply, state(), {continue, maybe_run_fn}} + when Req :: {update, amoc_throttle:rate(), timeout()} + | {schedule, pid()}. handle_cast({schedule, RunnerPid}, #state{schedule_reversed = SchRev, name = Name} = State) -> amoc_throttle_controller:telemetry_event(Name, request), {noreply, State#state{schedule_reversed = [RunnerPid | SchRev]}, {continue, maybe_run_fn}}; -handle_cast({update, Interval, Rate}, #state{name = Name} = State) -> - NewState = merge_state(initial_state(Name, Interval, Rate), State), +handle_cast({update, MaxN, Delay}, #state{name = Name} = State) -> + NewState = merge_state(initial_state(Name, MaxN, Delay), State), internal_event(<<"state update">>, NewState), {noreply, NewState, {continue, maybe_run_fn}}. --spec handle_call(term(), term(), state()) -> - {reply, {error, not_implemented} | state(), state(), {continue, maybe_run_fn}}. -handle_call(get_state, _, State) -> - {reply, printable_state(State), State, {continue, maybe_run_fn}}; +-spec handle_call(any(), gen_server:from(), state()) -> + {reply, {error, not_implemented}, state(), {continue, maybe_run_fn}}. handle_call(_, _, State) -> {reply, {error, not_implemented}, State, {continue, maybe_run_fn}}. @@ -143,46 +100,39 @@ handle_continue(maybe_run_fn, State) -> NewState = maybe_run_fn(State), {noreply, NewState, timeout(NewState)}. +-spec terminate(term(), state()) -> ok. +terminate(_, State) -> %% Flush all pending actions + maybe_run_fn(State#state{can_run_fn = true, max_n = infinity}), + ok. + -spec format_status(gen_server:format_status()) -> gen_server:format_status(). -format_status(#{state := #state{} = State} = FormatStatus) -> - ScheduleLen = length(State#state.schedule), - ScheduleRevLen = length(State#state.schedule_reversed), - State1 = setelement(#state.schedule, State, ScheduleLen), - State2 = setelement(#state.schedule_reversed, State1, ScheduleRevLen), - FormatStatus#{state := State2}. +format_status(#{state := State} = FormatStatus) -> + FormatStatus#{state := printable_state(State)}. %%------------------------------------------------------------------------------ %% internal functions %%------------------------------------------------------------------------------ -initial_state(Name, Interval, Rate) when Rate > 0 -> - NewRate = case Rate < 5 of - true -> - Msg = <<"too low rate, please reduce NoOfProcesses">>, - internal_error(Msg, Name, Rate, Interval), - Rate; - false -> - Rate - end, - Delay = case {Interval, Interval div NewRate, Interval rem NewRate} of - {0, _, _} -> 0; %% limit only No of simultaneous executions - {_, I, _} when I < 10 -> - Message = <<"too high rate, please increase NoOfProcesses">>, - internal_error(Message, Name, Rate, Interval), - 10; - {_, DelayBetweenExecutions, 0} -> DelayBetweenExecutions; - {_, DelayBetweenExecutions, _} -> DelayBetweenExecutions + 1 - end, - #state{interval = Interval, n = NewRate, max_n = NewRate, delay_between_executions = Delay}. - -merge_state(#state{interval = I, delay_between_executions = D, n = N, max_n = MaxN}, - #state{n = OldN, max_n = OldMaxN} = OldState) -> +%% - If `Delay' is infinity, we mean to pause the process, see how at `maybe_start_timer/1' +%% a delay of infinity will set `can_run_fn = false'. +%% +%% - If `MaxN' is infinity and `Delay' is a number, we mean no limits to throttling, +%% see how `maybe_start_timer/1' will not actually start any timer +%% and `maybe_run_fn/1' with `max_n = infinity' will loop without pause. +%% +%% - If both `MaxN' and `Delay' are numbers, this will be the actual rate/interval. +%% Note however that if delay is zero, we effectively limit parallelism to `MaxN'. +-spec initial_state(Name :: atom(), MaxN :: amoc_throttle:rate(), Delay :: timeout()) -> state(). +initial_state(Name, MaxN, Delay) -> + #state{name = Name, max_n = MaxN, delay_between_executions = Delay}. + +merge_state(#state{delay_between_executions = D, max_n = MaxN}, #state{} = OldState) -> maybe_stop_timer(OldState), - NewN = N - (OldMaxN - OldN), - NewState = OldState#state{interval = I, delay_between_executions = D, n = NewN, - max_n = MaxN, tref = undefined}, + NewState = OldState#state{delay_between_executions = D, max_n = MaxN, tref = undefined}, maybe_start_timer(NewState). +maybe_start_timer(#state{delay_between_executions = infinity, tref = undefined} = State) -> + State#state{can_run_fn = false}; maybe_start_timer(#state{delay_between_executions = 0, tref = undefined} = State) -> State#state{can_run_fn = true}; maybe_start_timer(#state{delay_between_executions = D, tref = undefined} = State) -> @@ -195,6 +145,11 @@ maybe_stop_timer(#state{tref = TRef}) -> {ok, cancel} = timer:cancel(TRef), consume_all_timer_ticks(delay_between_executions). +timeout(#state{delay_between_executions = infinity}) -> + infinity; +timeout(#state{delay_between_executions = Delay}) -> + Delay + ?DEFAULT_MSG_TIMEOUT. + consume_all_timer_ticks(Msg) -> receive Msg -> consume_all_timer_ticks(Msg) @@ -207,10 +162,10 @@ maybe_run_fn(#state{schedule = [], schedule_reversed = SchRev} = State) -> NewSchedule = lists:reverse(SchRev), NewState = State#state{schedule = NewSchedule, schedule_reversed = []}, maybe_run_fn(NewState); -maybe_run_fn(#state{interval = 0, pause = false, n = N} = State) when N > 0 -> +maybe_run_fn(#state{can_run_fn = true, max_n = infinity} = State) -> NewState = run_fn(State), maybe_run_fn(NewState); -maybe_run_fn(#state{can_run_fn = true, pause = false, n = N} = State) when N > 0 -> +maybe_run_fn(#state{can_run_fn = true, n = N, max_n = MaxN} = State) when N < MaxN -> NewState = run_fn(State), NewState#state{can_run_fn = false}; maybe_run_fn(State) -> @@ -220,23 +175,16 @@ run_fn(#state{schedule = [RunnerPid | T], name = Name, n = N} = State) -> erlang:monitor(process, RunnerPid), amoc_throttle_runner:run(RunnerPid), amoc_throttle_controller:telemetry_event(Name, execute), - State#state{schedule = T, n = N - 1}. + State#state{schedule = T, n = N + 1}. -timeout(State) -> - State#state.interval + ?DEFAULT_MSG_TIMEOUT. - -inc_n(#state{name = Name, n = N, max_n = MaxN} = State) -> - NewN = N + 1, - case MaxN < NewN of - true -> - PrintableState = printable_state(State), - Msg = <<"throttle proccess has invalid N">>, - amoc_telemetry:execute_log( - error, [throttle, process], #{name => Name, n => NewN, state => PrintableState}, Msg), - State#state{n = MaxN}; - false -> - State#state{n = NewN} - end. +dec_n(#state{name = Name, n = 0} = State) -> + PrintableState = printable_state(State), + Msg = <<"throttle proccess has invalid N">>, + Metadata = #{name => Name, n => 0, state => PrintableState}, + amoc_telemetry:execute_log(error, [throttle, process], Metadata, Msg), + State; +dec_n(#state{n = N} = State) -> + State#state{n = N - 1}. -spec internal_event(binary(), state()) -> any(). internal_event(Msg, #state{name = Name} = State) -> @@ -244,11 +192,6 @@ internal_event(Msg, #state{name = Name} = State) -> amoc_telemetry:execute_log( debug, [throttle, process], #{self => self(), name => Name, state => PrintableState}, Msg). --spec internal_error(binary(), atom(), amoc_throttle:rate(), amoc_throttle:interval()) -> any(). -internal_error(Msg, Name, Rate, Interval) -> - amoc_telemetry:execute_log( - error, [throttle, process], #{name => Name, rate => Rate, interval => Interval}, Msg). - printable_state(#state{} = State) -> Fields = record_info(fields, state), [_ | Values] = tuple_to_list(State#state{schedule = [], schedule_reversed = []}), diff --git a/src/throttle/amoc_throttle_runner.erl b/src/throttle/amoc_throttle_runner.erl index 358d2f64..51726383 100644 --- a/src/throttle/amoc_throttle_runner.erl +++ b/src/throttle/amoc_throttle_runner.erl @@ -17,7 +17,7 @@ run(RunnerPid) -> -spec throttle(amoc_throttle:name(), action()) -> ok | {error, any()}. throttle(Name, Action) -> - case amoc_throttle_process:get_throttle_process(Name) of + case amoc_throttle_controller:get_throttle_process(Name) of {ok, ThrottlerPid} -> Args = [Name, self(), ThrottlerPid, Action], RunnerPid = erlang:spawn_link(?MODULE, async_runner, Args), @@ -38,7 +38,7 @@ maybe_wait(wait, RunnerPid) -> maybe_wait(_, _) -> ok. --spec async_runner(amoc_throttle:name(), pid(), pid(), term()) -> no_return(). +-spec async_runner(amoc_throttle:name(), pid(), pid(), action()) -> true. async_runner(Name, Caller, ThrottlerPid, Action) -> ThrottlerMonitor = erlang:monitor(process, ThrottlerPid), amoc_throttle_controller:raise_event_on_slave_node(Name, request), diff --git a/src/throttle/amoc_throttle_sup.erl b/src/throttle/amoc_throttle_sup.erl index 01b182bc..3e78d91e 100644 --- a/src/throttle/amoc_throttle_sup.erl +++ b/src/throttle/amoc_throttle_sup.erl @@ -25,11 +25,9 @@ -behaviour(supervisor). --define(PG_SCOPE, amoc_throttle). - -export([start_link/0, init/1]). --spec start_link() -> {ok, Pid :: pid()}. +-spec start_link() -> supervisor:startlink_ret(). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -37,7 +35,7 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one}, Pg = #{id => pg, - start => {pg, start_link, [?PG_SCOPE]}, + start => {pg, start_link, [amoc_throttle_controller:pg_scope()]}, type => worker, shutdown => timer:seconds(5), restart => permanent, diff --git a/test/throttle_SUITE.erl b/test/throttle_SUITE.erl index 9eb65c9e..66c4ef94 100644 --- a/test/throttle_SUITE.erl +++ b/test/throttle_SUITE.erl @@ -1,16 +1,17 @@ -module(throttle_SUITE). --include_lib("eunit/include/eunit.hrl"). +-include_lib("proper/include/proper.hrl"). +-include_lib("stdlib/include/assert.hrl"). -compile([export_all, nowarn_export_all]). --define(DEFAULT_NO_PROCESSES, 10). -define(DEFAULT_INTERVAL, 60000). %% one minute -define(RECV(Msg, Timeout), receive Msg -> ok after Timeout -> {error, not_received_yet} end). all() -> [ - {group, api} + {group, api}, + {group, properties} ]. groups() -> @@ -18,21 +19,49 @@ groups() -> {api, [parallel], [ start, - rate_zero_is_not_accepted, - low_rate_gets_remapped, - low_interval_get_remapped, + start_descriptive, + start_interarrival, + start_interarrival_zero, + start_interarrival_infinity, + start_rate_zero, + start_rate_infinity, + start_interval_zero, + low_rate_does_not_get_remapped, + low_interval_does_not_get_remapped, start_and_stop, change_rate, + interval_equal_zero_limits_parallelism, + change_rate_to_interval_zero_limits_parallelism, + change_rate_triggers_parallelism, change_rate_gradually, - send_and_wait, + change_interarrival_gradually, + change_rate_gradually_verify_descriptions, just_wait, wait_for_process_to_die_sends_a_kill, async_runner_dies_while_waiting_raises_exit, async_runner_dies_when_throttler_dies, - run_with_interval_zero_limits_only_number_of_parallel_executions, pause_and_resume, get_state - ]} + ]}, + {properties, [], + [ + change_rate_gradually_verify_descriptions_properties, + % Note that the smallest delay possible for a process is 1ms (receive operations), + % hence if we give for example 10 workers 1ms delays, we get 600_000 ticks per minute. + % and if we give for example 48 workers 1ms delays, we get 2_880_000 ticks per minute. + % That means, that is realistically the maximum rate we could possibly manage + % with a static pool of such number of workers. + pool_config_is_precise_for_rates_1, + pool_config_is_precise_for_rates_2, + pool_config_is_precise_for_rates_3, + pool_config_is_precise_for_rates_4, + pool_config_is_precise_for_rates_5, + pool_config_is_precise_for_rates_6, + pool_config_is_precise_for_rates_7, + pool_config_is_precise_for_rates_8, + pool_config_is_precise_for_rates_9, + pool_config_is_precise_for_rates_10 + ]} ]. init_per_suite(Config) -> @@ -47,6 +76,18 @@ end_per_suite(_) -> telemetry_helpers:stop(), ok. +init_per_group(properties, Config) -> + meck:new(amoc_throttle_config, [passthrough, non_strict, no_link]), + meck:expect(amoc_throttle_config, no_of_processes, [], 100), + Config; +init_per_group(_, Config) -> + Config. + +end_per_group(properties, _Config) -> + meck:unload(amoc_throttle_config); +end_per_group(_, _Config) -> + ok. + init_per_testcase(_, Config) -> Config. @@ -64,31 +105,68 @@ start(_) -> ?assertMatch({ok, already_started}, amoc_throttle:start(?FUNCTION_NAME, 100)), ?assertMatch({error, wrong_reconfiguration}, - amoc_throttle:start(?FUNCTION_NAME, 100, ?DEFAULT_INTERVAL + 1)), - ?assertMatch({error, wrong_no_of_procs}, - amoc_throttle:start(?FUNCTION_NAME, 100, ?DEFAULT_INTERVAL, - ?DEFAULT_NO_PROCESSES + 1)). - -rate_zero_is_not_accepted(_) -> - ?assertMatch({error, invalid_throttle}, amoc_throttle:start(?FUNCTION_NAME, 0, 100, 1)). - -low_rate_gets_remapped(_) -> - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 2, 100, 1)), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - interval := 100, - delay_between_executions := 50}, - State), - assert_telemetry_event([amoc, throttle, process], error, ?FUNCTION_NAME, 2, 100). - -low_interval_get_remapped(_) -> - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1, 1, 1)), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - interval := 1, - delay_between_executions := 10}, - State), - assert_telemetry_event([amoc, throttle, process], error, ?FUNCTION_NAME, 1, 1). + amoc_throttle:start(?FUNCTION_NAME, 101)). + +start_descriptive(_) -> + %% Starts successfully + Description = #{rate => 100, interval => 5000}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)). + +start_interarrival(_) -> + %% Starts successfully + Description = #{interarrival => 50}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 60000, rate := 1200}, State). + +start_interarrival_zero(_) -> + %% Starts successfully + Description = #{interarrival => 0}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 60000, rate := infinity}, State). + +start_interarrival_infinity(_) -> + %% Starts successfully + Description = #{interarrival => infinity}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 60000, rate := 0}, State). + +start_rate_zero(_) -> + %% Starts successfully + Description = #{rate => 0}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 60000, rate := 0}, State). + +start_rate_infinity(_) -> + %% Starts successfully + Description = #{rate => infinity}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 60000, rate := infinity}, State). + +start_interval_zero(_) -> + %% Starts successfully + Description = #{rate => 60, interval => 0}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 0, rate := 60}, State). + +low_rate_does_not_get_remapped(_) -> + ?assertMatch({ok, started}, + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 2, interval => 100})), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 100, rate := 2}, State). + +low_interval_does_not_get_remapped(_) -> + ?assertMatch({ok, started}, + amoc_throttle:start(?FUNCTION_NAME, + #{rate => 1, interval => 1})), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{interval := 1, rate := 1}, State). start_and_stop(_) -> %% Starts successfully @@ -102,45 +180,251 @@ start_and_stop(_) -> change_rate(_) -> ?assertMatch({error, {no_throttle_by_name, ?FUNCTION_NAME}}, - amoc_throttle:change_rate(?FUNCTION_NAME, 100, ?DEFAULT_INTERVAL)), + amoc_throttle:change_rate(?FUNCTION_NAME, #{rate => 100})), ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100)), - ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, 100, ?DEFAULT_INTERVAL)), - ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, 100, ?DEFAULT_INTERVAL + 1)). + ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, 101)), + R1 = #{rate => 100, interval => ?DEFAULT_INTERVAL + 1}, + ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, R1)), + R2 = #{rate => 10000, interval => 100}, + ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, R2)), + amoc_throttle:send(?FUNCTION_NAME, receive_this), + ?assertMatch(ok, ?RECV(receive_this, 100)), + R3 = #{rate => 0}, + ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, R3)), + amoc_throttle:send(?FUNCTION_NAME, receive_this), + ?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)), + R4 = #{rate => infinity}, + ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, R4)), + ?assertMatch(ok, ?RECV(receive_this, 100)), + I1 = #{interarrival => 1}, + ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, I1)), + amoc_throttle:send(?FUNCTION_NAME, receive_this), + ?assertMatch(ok, ?RECV(receive_this, 100)), + I2 = #{interarrival => 30000}, + ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, I2)), + amoc_throttle:send(?FUNCTION_NAME, receive_this), + ?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)). + +interval_equal_zero_limits_parallelism(_) -> + E1 = #{rate => 36, interval => 0}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, E1)), + #{pool_config := Config0} = get_throttle_info(?FUNCTION_NAME), + ActiveWorkers = maps:filter(fun(_, #{status := S}) -> S =:= active end, Config0), + ?assertEqual(1, map_size(ActiveWorkers)), + ?assertMatch([#{max_n := 36, delay := 0}], maps:values(ActiveWorkers)). + +change_rate_to_interval_zero_limits_parallelism(_) -> + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100)), + E1 = #{rate => 100, interval => 0}, + ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, E1)), + #{pool_config := Config0} = get_throttle_info(?FUNCTION_NAME), + ?assertEqual(1, map_size(maps:filter(fun(_, #{status := S}) -> S =:= active end, Config0))). + +change_rate_triggers_parallelism(_) -> + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1)), + #{pool_config := Config0} = get_throttle_info(?FUNCTION_NAME), + ?assertEqual(1, map_size(maps:filter(fun(_, #{status := S}) -> S =:= active end, Config0))), + E1 = #{rate => 60000}, + ?assertMatch(ok, amoc_throttle:change_rate(?FUNCTION_NAME, E1)), + #{pool_config := Config1} = get_throttle_info(?FUNCTION_NAME), + ?assertNotEqual(1, map_size(maps:filter(fun(_, #{status := S}) -> S =:= active end, Config1))). change_rate_gradually(_) -> + C1 = #{throttle => #{from_rate => 100, to_rate => 200, interval => 1}, + plan => #{step_interval => 1, step_count => 1}}, ?assertMatch({error, {no_throttle_by_name, ?FUNCTION_NAME}}, - amoc_throttle:change_rate_gradually(?FUNCTION_NAME, 100, 200, 1, 1, 1)), + amoc_throttle:change_rate_gradually(?FUNCTION_NAME, C1)), ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100)), - ?assertMatch(ok, amoc_throttle:change_rate_gradually(?FUNCTION_NAME, 10, 3000, 1, 100, 300)), + C2 = #{throttle => #{from_rate => 10, to_rate => 3000, interval => 1}, + plan => #{step_interval => 100, step_count => 300}}, + ?assertMatch(ok, amoc_throttle:change_rate_gradually(?FUNCTION_NAME, C2)), %% We cannot change rate while a current gradual change is already running. + C3 = #{throttle => #{from_rate => 50, to_rate => 200, interval => 1}, + plan => #{step_interval => 1, step_count => 1}}, ?assertMatch({error, cannot_change_rate}, - amoc_throttle:change_rate_gradually(?FUNCTION_NAME, 50, 200, 1, 1, 1)), - ?assertMatch({error, cannot_change_rate}, - amoc_throttle:change_rate(?FUNCTION_NAME, 100, ?DEFAULT_INTERVAL + 1)). + amoc_throttle:change_rate_gradually(?FUNCTION_NAME, C3)), + E1 = #{rate => 100, interval => ?DEFAULT_INTERVAL + 1}, + ?assertMatch({error, cannot_change_rate}, amoc_throttle:change_rate(?FUNCTION_NAME, E1)). -send_and_wait(_) -> - %% it failts if the throttle wasn't started yet - ?assertMatch({error, no_throttle_process_registered}, - amoc_throttle:send_and_wait(?FUNCTION_NAME, receive_this)), - %% Start 100-per-10ms throttle with a single process - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)), - %% send_and_wait passes fine - ?assertMatch(ok, amoc_throttle:send_and_wait(?FUNCTION_NAME, receive_this)), - %% One message is received sufficiently fast - amoc_throttle:send(?FUNCTION_NAME, receive_this), - ?assertMatch(ok, ?RECV(receive_this, 100)), - %% If someone else fills the throttle heavily, - %% it will take proportionally so long to execute for me - fill_throttle(?FUNCTION_NAME, 100 * 10), - amoc_throttle:send(?FUNCTION_NAME, receive_this), - ?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)). +change_interarrival_gradually(_) -> + C1 = #{throttle => #{from_interarrival => 100, to_interarrival => 200}, + plan => #{step_interval => 1, step_count => 1}}, + ?assertMatch({error, {no_throttle_by_name, ?FUNCTION_NAME}}, + amoc_throttle:change_rate_gradually(?FUNCTION_NAME, C1)), + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100)), + C2 = #{throttle => #{from_interarrival => 10, to_interarrival => 3000}, + plan => #{step_interval => 10, step_count => 1000}}, + ?assertMatch(ok, amoc_throttle:change_rate_gradually(?FUNCTION_NAME, C2)), + %% We cannot change rate while a current gradual change is already running. + C3 = #{throttle => #{from_interarrival => 50, to_interarrival => 200}, + plan => #{step_interval => 1, step_count => 1}}, + ?assertMatch({error, cannot_change_rate}, + amoc_throttle:change_rate_gradually(?FUNCTION_NAME, C3)), + E1 = #{rate => 100, interval => ?DEFAULT_INTERVAL + 1}, + ?assertMatch({error, cannot_change_rate}, amoc_throttle:change_rate(?FUNCTION_NAME, E1)). + +change_rate_gradually_verify_descriptions(_) -> + %%% Using step_interval and step_count + %% Condition 1: increment, explicit interval + D1 = #{throttle => #{from_rate => 80, to_rate => 5000, interval => 15000}, + plan => #{step_interval => 50, step_count => 984}}, + R1 = amoc_throttle_config:verify_gradual_config(D1), + ?assertMatch( + #{rates := Rates, interval := 15000, step_interval := 50} when 985 =:= length(Rates), R1), + ?assertEqual(lists:sort(maps:get(rates, R1)), maps:get(rates, R1)), + %% Condition 2: decrement, explicit interval + D2 = #{throttle => #{from_rate => 5000, to_rate => 80, interval => 1000}, + plan => #{step_interval => 10, step_count => 4920}}, + R2 = amoc_throttle_config:verify_gradual_config(D2), + ?assertMatch( + #{rates := Rates, interval := 1000, step_interval := 10} when 4921 =:= length(Rates), R2), + ?assertEqual(lists:reverse(lists:sort(maps:get(rates, R2))), maps:get(rates, R2)), + %% Condition 3: increment, default interval + D3 = #{throttle => #{from_rate => 1200, to_rate => 4000}, + plan => #{step_interval => 10, step_count => 100}}, + R3 = amoc_throttle_config:verify_gradual_config(D3), + ?assertMatch( + #{rates := Rates, interval := 60000, step_interval := 10} when 101 =:= length(Rates), R3), + ?assertEqual(lists:sort(maps:get(rates, R3)), maps:get(rates, R3)), + %% Condition 4: decrement, default interval + D4 = #{throttle => #{from_rate => 4000, to_rate => 1200}, + plan => #{step_interval => 10, step_count => 100}}, + R4 = amoc_throttle_config:verify_gradual_config(D4), + ?assertMatch( + #{rates := Rates, interval := 60000, step_interval := 10} when 101 =:= length(Rates), R4), + ?assertEqual(lists:reverse(lists:sort(maps:get(rates, R4))), maps:get(rates, R4)), + %% Condition 5: increment, interarrival + D5 = #{throttle => #{from_interarrival => 1000, to_interarrival => 100}, + plan => #{step_interval => 10, step_count => 100}}, + R5 = amoc_throttle_config:verify_gradual_config(D5), + ?assertMatch( + #{rates := Rates, interval := 60000, step_interval := 10} when 101 =:= length(Rates), R5), + ?assertEqual(lists:sort(maps:get(rates, R5)), maps:get(rates, R5)), + %% Condition 6: decrement, interarrival + D6 = #{throttle => #{from_interarrival => 100, to_interarrival => 1000}, + plan => #{step_interval => 10, step_count => 100}}, + R6 = amoc_throttle_config:verify_gradual_config(D6), + ?assertMatch( + #{rates := Rates, interval := 60000, step_interval := 10} when 101 =:= length(Rates), R6), + ?assertEqual(lists:reverse(lists:sort(maps:get(rates, R6))), maps:get(rates, R6)), + + %%% Using step_interval and step_count + %% Condition 7: increment, explicit interval + D7 = #{throttle => #{from_rate => 80, to_rate => 5000, interval => 15000}, + plan => #{duration => timer:minutes(10)}}, + R7 = amoc_throttle_config:verify_gradual_config(D7), + ?assertMatch( + #{rates := Rates, interval := 15000, step_interval := 100} when 6001 =:= length(Rates), R7), + ?assertEqual(lists:sort(maps:get(rates, R7)), maps:get(rates, R7)), + %% Condition 8: decrement, explicit interval + D8 = #{throttle => #{from_rate => 5000, to_rate => 80, interval => 1000}, + plan => #{duration => timer:minutes(10)}}, + R8 = amoc_throttle_config:verify_gradual_config(D8), + ?assertMatch( + #{rates := Rates, interval := 1000, step_interval := 100} when 6001 =:= length(Rates), R8), + ?assertEqual(lists:reverse(lists:sort(maps:get(rates, R8))), maps:get(rates, R8)), + %% Condition 9: increment, default interval + D9 = #{throttle => #{from_rate => 1200, to_rate => 4000}, + plan => #{duration => timer:minutes(30)}}, + R9 = amoc_throttle_config:verify_gradual_config(D9), + ?assertMatch( + #{rates := Rates, interval := 60000, step_interval := 100} when 18001 =:= length(Rates), R9), + ?assertEqual(lists:sort(maps:get(rates, R9)), maps:get(rates, R9)), + %% Condition 10: decrement, default interval + D10 = #{throttle => #{from_rate => 4000, to_rate => 1200}, + plan => #{duration => timer:minutes(10)}}, + R10 = amoc_throttle_config:verify_gradual_config(D10), + ?assertMatch( + #{rates := Rates, interval := 60000, step_interval := 100} when 6001 =:= length(Rates), R10), + ?assertEqual(lists:reverse(lists:sort(maps:get(rates, R10))), maps:get(rates, R10)), + %% Condition 11: increment, interarrival + D11 = #{throttle => #{from_interarrival => 1000, to_interarrival => 100}, + plan => #{duration => timer:minutes(10)}}, + R11 = amoc_throttle_config:verify_gradual_config(D11), + ?assertMatch( + #{rates := Rates, interval := 60000, step_interval := 100} when 6001 =:= length(Rates), R11), + ?assertEqual(lists:sort(maps:get(rates, R11)), maps:get(rates, R11)), + %% Condition 12: decrement, interarrival + D12 = #{throttle => #{from_interarrival => 100, to_interarrival => 1000}, + plan => #{duration => timer:minutes(10)}}, + R12 = amoc_throttle_config:verify_gradual_config(D12), + ?assertMatch( + #{rates := Rates, interval := 60000, step_interval := 100} when 6001 =:= length(Rates), R12), + ?assertEqual(lists:reverse(lists:sort(maps:get(rates, R12))), maps:get(rates, R12)), + + %% Error + E1 = #{throttle => #{from_rate => 100, to_rate => 10}, + plan => #{}}, + ?assertMatch( + {error, _}, + amoc_throttle_config:verify_gradual_config(E1)). + +change_rate_gradually_verify_descriptions_properties(_) -> + Fun = fun(From, To, Interval, StepInterval, StepCount) -> + D1 = #{throttle => #{from_rate => From, to_rate => To, interval => Interval}, + plan => #{step_interval => StepInterval, step_count => StepCount}}, + R1 = amoc_throttle_config:verify_gradual_config(D1), + ?assertMatch(#{rates := Rates, + interval := Interval, + step_interval := StepInterval} + when 1 + StepCount =:= length(Rates), R1), + Rates = maps:get(rates, R1), + SortedRates = lists:sort(Rates), + From =:= lists:nth(1, Rates) andalso + To =:= lists:last(Rates) andalso + (From =< To andalso SortedRates =:= Rates + orelse From > To andalso lists:reverse(SortedRates) =:= Rates) + end, + Prop = ?FORALL({From, To, Interval, StepInterval, StepCount}, + {integer(1, 1 bsl 61), + integer(1, 1 bsl 61), + integer(1, 1 bsl 24), + integer(1, 1 bsl 8), + integer(1, 1 bsl 24)}, + Fun(From, To, Interval, StepInterval, StepCount)), + run_prop(?FUNCTION_NAME, Prop, 1 bsl 16, 1). + +pool_config_is_precise_for_rates_1(_) -> + pool_config_property_tests(lists:seq(1, 100_000), timer:minutes(1)). + +pool_config_is_precise_for_rates_2(_) -> + pool_config_property_tests(lists:seq(100_000, 200_000), timer:minutes(1)). + +pool_config_is_precise_for_rates_3(_) -> + pool_config_property_tests(lists:seq(200_000, 300_000), timer:minutes(1)). + +pool_config_is_precise_for_rates_4(_) -> + pool_config_property_tests(lists:seq(300_000, 400_000), timer:minutes(1)). + +pool_config_is_precise_for_rates_5(_) -> + pool_config_property_tests(lists:seq(400_000, 500_000), timer:minutes(1)). + +pool_config_is_precise_for_rates_6(_) -> + pool_config_property_tests(lists:seq(500_000, 600_000), timer:minutes(1)). + +pool_config_is_precise_for_rates_7(_) -> + pool_config_property_tests(lists:seq(600_000, 700_000), timer:minutes(1)). + +pool_config_is_precise_for_rates_8(_) -> + pool_config_property_tests(lists:seq(700_000, 800_000), timer:minutes(1)). + +pool_config_is_precise_for_rates_9(_) -> + pool_config_property_tests(lists:seq(800_000, 900_000), timer:minutes(1)). + +pool_config_is_precise_for_rates_10(_) -> + pool_config_property_tests(lists:seq(900_000, 1_000_000), timer:minutes(1)). + +% TODO: introduce dynamically sized pools in order to manage higher rates. +pool_config_is_precise_for_high_rates(_) -> + pool_config_property_tests(lists:seq(1 bsl 24, 1 bsl 32), timer:minutes(1)). just_wait(_) -> - %% it failts if the throttle wasn't started yet + %% it fails if the throttle wasn't started yet ?assertMatch({error, no_throttle_process_registered}, amoc_throttle:wait(?FUNCTION_NAME)), - %% Start 100-per-10ms throttle with a single process - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)), + %% Start 100-per-10ms throttle, that is, 600k per minute + Description = #{rate => 100, interval => 10}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), %% wait passes fine ?assertMatch(ok, amoc_throttle:wait(?FUNCTION_NAME)), %% One message is received sufficiently fast @@ -148,45 +432,40 @@ just_wait(_) -> ?assertMatch(ok, ?RECV(receive_this, 100)), %% If someone else fills the throttle heavily, %% it will take proportionally so long to execute for me - fill_throttle(?FUNCTION_NAME, 100 * 10), + %% TODO + fill_throttle(?FUNCTION_NAME, 100 * 100), amoc_throttle:send(?FUNCTION_NAME, receive_this), ?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)). wait_for_process_to_die_sends_a_kill(_) -> erlang:process_flag(trap_exit, true), - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)), + Description = #{rate => 100, interval => 10}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), amoc_throttle:run(?FUNCTION_NAME, fun() -> exit(?FUNCTION_NAME) end), ?assertMatch(ok, ?RECV({'EXIT', _, ?FUNCTION_NAME}, 100)). async_runner_dies_while_waiting_raises_exit(_) -> - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1, 1, 1)), + Description = #{rate => 1, interval => 1}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), find_new_link_and_kill_it(self()), ?assertExit({throttle_wait_died, _, killed}, amoc_throttle:wait(?FUNCTION_NAME)). async_runner_dies_when_throttler_dies(_) -> erlang:process_flag(trap_exit, true), {links, OriginalLinks} = erlang:process_info(self(), links), - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 1, 60000, 1)), + Description = #{rate => 1, interval => 60000}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), wait_until_one_throttle_worker(?FUNCTION_NAME), amoc_throttle:send(?FUNCTION_NAME, receive_this), wait_until_one_async_runner(self(), OriginalLinks), amoc_throttle:stop(?FUNCTION_NAME), ?assertMatch(ok, ?RECV({'EXIT', _, {throttler_worker_died, _, _}}, 100)). -run_with_interval_zero_limits_only_number_of_parallel_executions(_) -> - %% Start 10 actions at once in 10 processes - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 10, 0, 1)), - %% If someone else fills the throttle heavily, - %% it will take proportionally so long to execute for me - fill_throttle(?FUNCTION_NAME, 100), - amoc_throttle:send(?FUNCTION_NAME, receive_this), - ?assertMatch(ok, ?RECV(receive_this, 200)). - pause_and_resume(_) -> - %% Start 100-per-10ms throttle with a single process - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 10, 1)), - %% send_and_wait passes fine - ?assertMatch(ok, amoc_throttle:send_and_wait(?FUNCTION_NAME, receive_this)), + %% Start a 10-per-ms throttle + Description = #{rate => 600000}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Description)), + ?assertMatch(ok, amoc_throttle:wait(?FUNCTION_NAME)), %% pauses runs correctly ?assertMatch(ok, amoc_throttle:pause(?FUNCTION_NAME)), %% It is paused, so messages aren't received @@ -194,15 +473,15 @@ pause_and_resume(_) -> ?assertMatch({error, not_received_yet}, ?RECV(receive_this, 200)), %% After resume the message is then received ?assertMatch(ok, amoc_throttle:resume(?FUNCTION_NAME)), - ?assertMatch(ok, ?RECV(receive_this, 200)). + ?assertMatch(ok, ?RECV(receive_this, 200)), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{rate := 600000}, State). get_state(_) -> - ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, 100, 60000, 1)), - State = get_state_of_one_process(?FUNCTION_NAME), - ?assertMatch(#{name := ?FUNCTION_NAME, - interval := 60000, - delay_between_executions := 600}, - State). + Config = #{rate => 100, interval => 60000}, + ?assertMatch({ok, started}, amoc_throttle:start(?FUNCTION_NAME, Config)), + State = get_throttle_info(?FUNCTION_NAME), + ?assertMatch(#{rate := 100, interval := 60000, active := true}, State). %% Helpers @@ -217,6 +496,39 @@ assert_telemetry_event(Name, Measurement, Throttle, Rate, Interval) -> end, ?assert(lists:any(IsLowRateEventFn, TelemetryEvents)). +pool_config_property_tests(RateGen, IntervalGen) -> + Fun = fun(Rate, Interval) -> + R1 = amoc_throttle_config:pool_config(Rate, Interval), + accumulated_is_requested(Rate, Interval, R1) + end, + [ Fun(Rate, IntervalGen) || Rate <- RateGen]. + +accumulated_is_requested(Rate, Interval, Res) -> + Fold = fun(_N, #{status := active, delay := D}, Acc) -> + Acc + (1 / D); + (_, _, Acc) -> + Acc + end, + NumberOfActionsPerMs = maps:fold(Fold, +0.0, Res), + Expected = Rate / Interval, + %% This is checking that the difference is less than 1% by multipliting + %% by a 100 and then checking it is smaller. + Error = (abs(NumberOfActionsPerMs - Expected) * 100), + Expected >= Error + orelse throw(#{throttle => #{rate => Rate, interval => Interval}, + expected_rate_per_minute => Expected, + returned_aggregated_rate_per_minute => NumberOfActionsPerMs, + error_percentage => Error/Expected, + config => filter_only_actives(Res)}). + +filter_only_actives(Res) -> + maps:filter(fun(_, #{status := Status}) -> Status =:= active end, Res). + +run_prop(PropName, Property, NumTests, Workers) -> + Opts = [noshrink, {start_size, 1}, {numtests, NumTests}, {numworkers, Workers}], + Res = proper:counterexample(proper:conjunction([{PropName, Property}]), Opts), + ?assertEqual(true, Res). + get_throttle_workers(Name) -> pg:get_members(amoc_throttle, Name). @@ -224,11 +536,10 @@ get_number_of_workers(Name) -> Processes = get_throttle_workers(Name), length(Processes). -get_state_of_one_process(Name) -> - Processes = get_throttle_workers(Name), - ?assertMatch([_ | _], Processes), - [Process | _] = Processes, - amoc_throttle_process:get_state(Process). +get_throttle_info(Name) -> + Info = amoc_throttle_controller:get_info(Name), + ?assert(is_map(Info)), + Info. wait_until_one_throttle_worker(Name) -> GetWorkers = fun() -> get_throttle_workers(Name) end,