Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous pruning for RubyThreadPoolExecutor #1082

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions lib/concurrent-ruby/concurrent/collection/ruby_timeout_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
module Concurrent
module Collection
# @!visibility private
# @!macro ruby_timeout_queue
class RubyTimeoutQueue < ::Queue
def initialize(*args)
if RUBY_VERSION >= '3.2'
raise "#{self.class.name} is not needed on Ruby 3.2 or later, use ::Queue instead"
end

super(*args)

@mutex = Mutex.new
@cond_var = ConditionVariable.new
end

def push(obj)
@mutex.synchronize do
super(obj)
@cond_var.signal
end
end
alias_method :enq, :push
alias_method :<<, :push

def pop(non_block = false, timeout: nil)
if non_block && timeout
raise ArgumentError, "can't set a timeout if non_block is enabled"
end

if non_block
super(true)
elsif timeout
@mutex.synchronize do
deadline = Concurrent.monotonic_time + timeout
while (now = Concurrent.monotonic_time) < deadline && empty?
@cond_var.wait(@mutex, deadline - now)
end
begin
return super(true)
rescue ThreadError
# still empty
nil
end
end
else
super(false)
end
end
alias_method :deq, :pop
alias_method :shift, :pop
end
private_constant :RubyTimeoutQueue
end
end
18 changes: 18 additions & 0 deletions lib/concurrent-ruby/concurrent/collection/timeout_queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
module Concurrent
  module Collection
    # @!visibility private
    # @!macro internal_implementation_note
    #
    # Ruby 3.2 added native +timeout:+ support to ::Queue#pop, so the
    # RubyTimeoutQueue backport is only needed on older Rubies. Compare with
    # Gem::Version rather than String#>= so that a future two-digit minor
    # (e.g. "3.10") is not lexicographically ordered before "3.2".
    TimeoutQueueImplementation = if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.2')
                                   ::Queue
                                 else
                                   require 'concurrent/collection/ruby_timeout_queue'
                                   RubyTimeoutQueue
                                 end
    private_constant :TimeoutQueueImplementation

    # @!visibility private
    # @!macro timeout_queue
    #
    # A queue whose #pop accepts a +timeout:+ keyword (in seconds) and
    # returns nil when the timeout elapses without an element arriving.
    class TimeoutQueue < TimeoutQueueImplementation
    end
  end
end
6 changes: 2 additions & 4 deletions lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,8 @@ module Concurrent
# What is being pruned is controlled by the min_threads and idletime
# parameters passed at pool creation time
#
# This is a no-op on some pool implementation (e.g. the Java one). The Ruby
# pool will auto-prune each time a new job is posted. You will need to call
# this method explicitly in case your application post jobs in bursts (a
# lot of jobs and then nothing for long periods)
# This is a no-op on all pool implementations as they prune themselves
# automatically, and has been deprecated.

# @!macro thread_pool_executor_public_api
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Concurrent
# @!macro thread_pool_options
# @!visibility private
class JavaThreadPoolExecutor < JavaExecutorService
include Concern::Deprecation

# @!macro thread_pool_executor_constant_default_max_pool_size
DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647
Expand Down Expand Up @@ -100,6 +101,7 @@ def running?

# @!macro thread_pool_executor_method_prune_pool
def prune_pool
deprecated "#prune_pool has no effect and will be removed in the next release."
end

private
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
require 'concurrent/concern/logging'
require 'concurrent/executor/ruby_executor_service'
require 'concurrent/utility/monotonic_time'
require 'concurrent/collection/timeout_queue'

module Concurrent

# @!macro thread_pool_executor
# @!macro thread_pool_options
# @!visibility private
class RubyThreadPoolExecutor < RubyExecutorService
include Concern::Deprecation

# @!macro thread_pool_executor_constant_default_max_pool_size
DEFAULT_MAX_POOL_SIZE = 2_147_483_647 # java.lang.Integer::MAX_VALUE
Expand Down Expand Up @@ -94,9 +96,28 @@ def remaining_capacity
end
end

# removes the worker if it can be pruned
#
# @return [true, false] if the worker was pruned
#
# @!visibility private
def remove_busy_worker(worker)
synchronize { ns_remove_busy_worker worker }
def prune_worker(worker)
synchronize do
if ns_prunable_capacity > 0
remove_worker worker
true
else
false
end
end
end

# @!visibility private
def remove_worker(worker)
synchronize do
ns_remove_ready_worker worker
ns_remove_busy_worker worker
end
end

# @!visibility private
Expand All @@ -116,7 +137,7 @@ def worker_task_completed

# @!macro thread_pool_executor_method_prune_pool
def prune_pool
synchronize { ns_prune_pool }
deprecated "#prune_pool has no effect and will be removed in the next release, see https://github.com/ruby-concurrency/concurrent-ruby/pull/1082."
end

private
Expand Down Expand Up @@ -146,9 +167,6 @@ def ns_initialize(opts)
@largest_length = 0
@workers_counter = 0
@ruby_pid = $$ # detects if Ruby has forked

@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
@next_gc_time = Concurrent.monotonic_time + @gc_interval
end

# @!visibility private
Expand All @@ -162,12 +180,10 @@ def ns_execute(*args, &task)

if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
@scheduled_task_count += 1
nil
else
return fallback_action(*args, &task)
fallback_action(*args, &task)
end

ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
nil
end

# @!visibility private
Expand Down Expand Up @@ -218,7 +234,7 @@ def ns_assign_worker(*args, &task)
# @!visibility private
def ns_enqueue(*args, &task)
return false if @synchronous

if !ns_limited_queue? || @queue.size < @max_queue
@queue << [task, args]
true
Expand Down Expand Up @@ -265,7 +281,7 @@ def ns_ready_worker(worker, last_message, success = true)
end
end

# removes a worker which is not in not tracked in @ready
# removes a worker which is not tracked in @ready
#
# @!visibility private
def ns_remove_busy_worker(worker)
Expand All @@ -274,25 +290,27 @@ def ns_remove_busy_worker(worker)
true
end

# try oldest worker if it is idle for enough time, it's returned back at the start
#
# @!visibility private
def ns_prune_pool
now = Concurrent.monotonic_time
stopped_workers = 0
while [email protected]? && (@pool.size - stopped_workers > @min_length)
worker, last_message = @ready.first
if now - last_message > self.idletime
stopped_workers += 1
@ready.shift
worker << :stop
else break
end
def ns_remove_ready_worker(worker)
if index = @ready.index { |rw, _| rw == worker }
@ready.delete_at(index)
end
true
end

@next_gc_time = Concurrent.monotonic_time + @gc_interval
# @return [Integer] number of excess idle workers which can be removed without
# going below min_length, or all workers if not running
#
# @!visibility private
def ns_prunable_capacity
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain the logic here? Is it how many workers can be removed at the current time?

Copy link
Author

@joshuay03 joshuay03 Apr 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if running?
[@pool.size - @min_length, @ready.size].min
else
@pool.size
end
end

# @!visibility private
def ns_reset_if_forked
if $$ != @ruby_pid
@queue.clear
Expand All @@ -312,7 +330,7 @@ class Worker

def initialize(pool, id)
# instance variables accessed only under pool's lock so no need to sync here again
@queue = Queue.new
@queue = Collection::TimeoutQueue.new
@pool = pool
@thread = create_worker @queue, pool, pool.idletime

Expand All @@ -338,17 +356,22 @@ def kill
def create_worker(queue, pool, idletime)
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
catch(:stop) do
loop do
prunable = true

case message = my_queue.pop
loop do
timeout = prunable && my_pool.running? ? my_idletime : nil
case message = my_queue.pop(timeout: timeout)
when nil
throw :stop if my_pool.prune_worker(self)
prunable = false
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it impossible if prunable_capacity is <= 0 that a worker would need to be prunable again?
i.e. what if prunable_capacity is 0 because all workers are busy but later on the pool is no longer running? and then we should prune/finish all workers?

Copy link
Author

@joshuay03 joshuay03 Apr 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e. what if prunable_capacity is 0 because all workers are busy but later on the pool is no longer running? and then we should prune/finish all workers?

If a worker is busy they will be marked as ready when the task has completed, during which we stop them (in #ns_ready_worker) if the pool is no longer running. Even if the pool is only stopped after this point, we also mark that worker as prune-able again, which means they will eventually time out (cause work is no longer being accepted) and prune themselves.

when :stop
my_pool.remove_busy_worker(self)
my_pool.remove_worker(self)
throw :stop

else
task, args = message
run_task my_pool, task, args
my_pool.ready_worker(self, Concurrent.monotonic_time)
prunable = true
end
end
end
Expand Down
30 changes: 15 additions & 15 deletions spec/concurrent/executor/cached_thread_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,13 @@ module Concurrent

context 'garbage collection' do

subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }
subject { described_class.new(idletime: 0.1, max_threads: 2) }

it 'removes from pool any thread that has been idle too long' do
latch = Concurrent::CountDownLatch.new(4)
4.times { subject.post { sleep 0.1; latch.count_down } }
sleep 0.4
expect(latch.wait(1)).to be true
sleep 0.2
subject.post {}
sleep 0.2
expect(subject.length).to be < 4
end

Expand Down Expand Up @@ -197,25 +195,27 @@ module Concurrent
expect(subject.length).to be >= 5
3.times { subject << proc { sleep(1) } }
sleep(0.1)
expect(subject.length).to be >= 5
expect(subject.length).to be >= 3
end
end
end

context 'stress' do
configurations = [
{ min_threads: 2,
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
idletime: 0.1, # 1 minute
max_queue: 0, # unlimited
{
min_threads: 2,
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
gc_interval: 0.1 },
{ min_threads: 2,
max_threads: 4,
idletime: 0.1, # 1 minute
max_queue: 0, # unlimited
},
{
min_threads: 2,
max_threads: 4,
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
gc_interval: 0.1 }
}
]

configurations.each do |config|
Expand Down
7 changes: 0 additions & 7 deletions spec/concurrent/executor/java_thread_pool_executor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ module Concurrent

it_should_behave_like :thread_pool_executor

context :prune do
it "is a no-op, pruning is handled by the JVM" do
executor = JavaThreadPoolExecutor.new
executor.prune_pool
end
end

context '#overload_policy' do

specify ':abort maps to AbortPolicy' do
Expand Down
Loading