Skip to content

Commit f90d46d

Browse files
committed
Better pruning for RubyThreadPoolExecutor
1 parent 6e2bd8a commit f90d46d

8 files changed

+127
-107
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
module Concurrent
2+
module Collection
3+
# @!visibility private
4+
# @!macro ruby_timeout_queue
5+
class RubyTimeoutQueue < ::Queue
6+
def initialize(*args)
7+
if RUBY_VERSION >= '3.2'
8+
raise 'RubyTimeoutQueue is not needed on Ruby 3.2 or later, use ::Queue instead'
9+
end
10+
11+
super(*args)
12+
13+
@mutex = Mutex.new
14+
@cond_var = ConditionVariable.new
15+
end
16+
17+
def push(obj)
18+
super(obj).tap { @mutex.synchronize { @cond_var.signal } }
19+
end
20+
alias_method :enq, :push
21+
alias_method :<<, :push
22+
23+
def pop(non_block = false, timeout: nil)
24+
if non_block && timeout
25+
raise ArgumentError, "can't set a timeout if non_block is enabled"
26+
end
27+
28+
if non_block
29+
super(true)
30+
elsif timeout && empty?
31+
if @mutex.synchronize { @cond_var.wait(@mutex, timeout) }
32+
super(false)
33+
else
34+
nil
35+
end
36+
else
37+
super(false)
38+
end
39+
end
40+
alias_method :deq, :pop
41+
alias_method :shift, :pop
42+
end
43+
end
44+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
module Concurrent
2+
module Collection
3+
# @!visibility private
4+
# @!macro internal_implementation_note
5+
TimeoutQueueImplementation = if RUBY_VERSION >= '3.2'
6+
::Queue
7+
else
8+
require 'concurrent/collection/ruby_timeout_queue'
9+
RubyTimeoutQueue
10+
end
11+
private_constant :TimeoutQueueImplementation
12+
13+
# @!visibility private
14+
# @!macro timeout_queue
15+
class TimeoutQueue < TimeoutQueueImplementation
16+
end
17+
end
18+
end

lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb

-14
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,6 @@ module Concurrent
7575
# @return [Integer] Number of tasks that may be enqueued before reaching `max_queue` and rejecting
7676
# new tasks. A value of -1 indicates that the queue may grow without bound.
7777

78-
# @!macro thread_pool_executor_method_prune_pool
79-
# Prune the thread pool of unneeded threads
80-
#
81-
# What is being pruned is controlled by the min_threads and idletime
82-
# parameters passed at pool creation time
83-
#
84-
# This is a no-op on some pool implementation (e.g. the Java one). The Ruby
85-
# pool will auto-prune each time a new job is posted. You will need to call
86-
# this method explicitly in case your application post jobs in bursts (a
87-
# lot of jobs and then nothing for long periods)
88-
8978
# @!macro thread_pool_executor_public_api
9079
#
9180
# @!macro abstract_executor_service_public_api
@@ -122,9 +111,6 @@ module Concurrent
122111
#
123112
# @!method can_overflow?
124113
# @!macro executor_service_method_can_overflow_question
125-
#
126-
# @!method prune_pool
127-
# @!macro thread_pool_executor_method_prune_pool
128114

129115

130116

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

-4
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,6 @@ def running?
9898
super && !@executor.isTerminating
9999
end
100100

101-
# @!macro thread_pool_executor_method_prune_pool
102-
def prune_pool
103-
end
104-
105101
private
106102

107103
def ns_initialize(opts)

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

+35-32
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require 'concurrent/concern/logging'
44
require 'concurrent/executor/ruby_executor_service'
55
require 'concurrent/utility/monotonic_time'
6+
require 'concurrent/collection/timeout_queue'
67

78
module Concurrent
89

@@ -104,6 +105,11 @@ def ready_worker(worker, last_message)
104105
synchronize { ns_ready_worker worker, last_message }
105106
end
106107

108+
# @!visibility private
109+
def remove_ready_worker(worker)
110+
synchronize { ns_remove_ready_worker worker }
111+
end
112+
107113
# @!visibility private
108114
def worker_died(worker)
109115
synchronize { ns_worker_died worker }
@@ -114,9 +120,9 @@ def worker_task_completed
114120
synchronize { @completed_task_count += 1 }
115121
end
116122

117-
# @!macro thread_pool_executor_method_prune_pool
118-
def prune_pool
119-
synchronize { ns_prune_pool }
123+
# @!visibility private
124+
def prunable_capacity
125+
synchronize { ns_prunable_capacity }
120126
end
121127

122128
private
@@ -146,9 +152,6 @@ def ns_initialize(opts)
146152
@largest_length = 0
147153
@workers_counter = 0
148154
@ruby_pid = $$ # detects if Ruby has forked
149-
150-
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
151-
@next_gc_time = Concurrent.monotonic_time + @gc_interval
152155
end
153156

154157
# @!visibility private
@@ -162,12 +165,10 @@ def ns_execute(*args, &task)
162165

163166
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
164167
@scheduled_task_count += 1
168+
nil
165169
else
166-
return fallback_action(*args, &task)
170+
fallback_action(*args, &task)
167171
end
168-
169-
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
170-
nil
171172
end
172173

173174
# @!visibility private
@@ -218,7 +219,7 @@ def ns_assign_worker(*args, &task)
218219
# @!visibility private
219220
def ns_enqueue(*args, &task)
220221
return false if @synchronous
221-
222+
222223
if !ns_limited_queue? || @queue.size < @max_queue
223224
@queue << [task, args]
224225
true
@@ -265,7 +266,7 @@ def ns_ready_worker(worker, last_message, success = true)
265266
end
266267
end
267268

268-
# removes a worker which is not in not tracked in @ready
269+
# removes a worker which is not tracked in @ready
269270
#
270271
# @!visibility private
271272
def ns_remove_busy_worker(worker)
@@ -274,23 +275,13 @@ def ns_remove_busy_worker(worker)
274275
true
275276
end
276277

277-
# try oldest worker if it is idle for enough time, it's returned back at the start
278-
#
279-
# @!visibility private
280-
def ns_prune_pool
281-
now = Concurrent.monotonic_time
282-
stopped_workers = 0
283-
while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
284-
worker, last_message = @ready.first
285-
if now - last_message > self.idletime
286-
stopped_workers += 1
287-
@ready.shift
288-
worker << :stop
289-
else break
290-
end
291-
end
278+
def ns_remove_ready_worker(worker)
279+
@ready.delete_if { |rw, _| rw == worker }
280+
true
281+
end
292282

293-
@next_gc_time = Concurrent.monotonic_time + @gc_interval
283+
def ns_prunable_capacity
284+
[@pool.size - @min_length, @ready.size].min
294285
end
295286

296287
def ns_reset_if_forked
@@ -312,7 +303,7 @@ class Worker
312303

313304
def initialize(pool, id)
314305
# instance variables accessed only under pool's lock so no need to sync here again
315-
@queue = Queue.new
306+
@queue = Collection::TimeoutQueue.new
316307
@pool = pool
317308
@thread = create_worker @queue, pool, pool.idletime
318309

@@ -338,17 +329,29 @@ def kill
338329
def create_worker(queue, pool, idletime)
339330
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
340331
catch(:stop) do
341-
loop do
332+
not_prunable = false
342333

343-
case message = my_queue.pop
334+
loop do
335+
timeout = not_prunable ? nil : my_idletime
336+
case message = my_queue.pop(timeout: timeout)
337+
when nil
338+
unless my_pool.prunable_capacity.positive?
339+
not_prunable = true
340+
next
341+
end
342+
343+
my_pool.remove_ready_worker(self)
344+
my_pool.remove_busy_worker(self)
345+
throw :stop
344346
when :stop
347+
my_pool.remove_ready_worker(self)
345348
my_pool.remove_busy_worker(self)
346349
throw :stop
347-
348350
else
349351
task, args = message
350352
run_task my_pool, task, args
351353
my_pool.ready_worker(self, Concurrent.monotonic_time)
354+
not_prunable = false
352355
end
353356
end
354357
end

spec/concurrent/executor/cached_thread_pool_spec.rb

+14-15
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,12 @@ module Concurrent
152152

153153
context 'garbage collection' do
154154

155-
subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }
155+
subject { described_class.new(idletime: 0.1, max_threads: 2) }
156156

157157
it 'removes from pool any thread that has been idle too long' do
158158
latch = Concurrent::CountDownLatch.new(4)
159159
4.times { subject.post { sleep 0.1; latch.count_down } }
160160
expect(latch.wait(1)).to be true
161-
sleep 0.2
162-
subject.post {}
163-
sleep 0.2
164161
expect(subject.length).to be < 4
165162
end
166163

@@ -197,25 +194,27 @@ module Concurrent
197194
expect(subject.length).to be >= 5
198195
3.times { subject << proc { sleep(1) } }
199196
sleep(0.1)
200-
expect(subject.length).to be >= 5
197+
expect(subject.length).to be >= 3
201198
end
202199
end
203200
end
204201

205202
context 'stress' do
206203
configurations = [
207-
{ min_threads: 2,
208-
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
209-
idletime: 0.1, # 1 minute
210-
max_queue: 0, # unlimited
204+
{
205+
min_threads: 2,
206+
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
207+
idletime: 60, # 1 minute
208+
max_queue: 0, # unlimited
211209
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
212-
gc_interval: 0.1 },
213-
{ min_threads: 2,
214-
max_threads: 4,
215-
idletime: 0.1, # 1 minute
216-
max_queue: 0, # unlimited
210+
},
211+
{
212+
min_threads: 2,
213+
max_threads: 4,
214+
idletime: 60, # 1 minute
215+
max_queue: 0, # unlimited
217216
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
218-
gc_interval: 0.1 }
217+
}
219218
]
220219

221220
configurations.each do |config|

spec/concurrent/executor/java_thread_pool_executor_spec.rb

-7
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ module Concurrent
2626

2727
it_should_behave_like :thread_pool_executor
2828

29-
context :prune do
30-
it "is a no-op, pruning is handled by the JVM" do
31-
executor = JavaThreadPoolExecutor.new
32-
executor.prune_pool
33-
end
34-
end
35-
3629
context '#overload_policy' do
3730

3831
specify ':abort maps to AbortPolicy' do

0 commit comments

Comments
 (0)