Skip to content

Commit 89b67f4

Browse files
committed
Better pruning for RubyThreadPoolExecutor
1 parent 6e2bd8a commit 89b67f4

8 files changed

+140
-106
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
module Concurrent
2+
module Collection
3+
# @!visibility private
4+
# @!macro ruby_timeout_queue
5+
class RubyTimeoutQueue < ::Queue
6+
def initialize(*args)
7+
if RUBY_VERSION >= '3.2'
8+
raise 'RubyTimeoutQueue is not needed on Ruby 3.2 or later, use ::Queue instead'
9+
end
10+
11+
super(*args)
12+
13+
@mutex = Mutex.new
14+
@cond_var = ConditionVariable.new
15+
end
16+
17+
def push(obj)
18+
super(obj).tap { @mutex.synchronize { @cond_var.signal } }
19+
end
20+
alias_method :enq, :push
21+
alias_method :<<, :push
22+
23+
def pop(non_block = false, timeout: nil)
24+
if non_block && timeout
25+
raise ArgumentError, "can't set a timeout if non_block is enabled"
26+
end
27+
28+
if non_block
29+
super(true)
30+
elsif empty? && timed_out?(timeout) { @mutex.synchronize { @cond_var.wait(@mutex, timeout) } }
31+
nil
32+
else
33+
super(false)
34+
end
35+
end
36+
alias_method :deq, :pop
37+
alias_method :shift, :pop
38+
39+
private
40+
41+
def timed_out?(timeout)
42+
return unless timeout
43+
44+
# https://github.com/ruby/ruby/pull/4256
45+
if RUBY_VERSION >= '3.1'
46+
yield.nil?
47+
else
48+
deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
49+
yield
50+
Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
51+
end
52+
end
53+
end
54+
end
55+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
module Concurrent
2+
module Collection
3+
# @!visibility private
4+
# @!macro internal_implementation_note
5+
TimeoutQueueImplementation = if RUBY_VERSION >= '3.2'
6+
::Queue
7+
else
8+
require 'concurrent/collection/ruby_timeout_queue'
9+
RubyTimeoutQueue
10+
end
11+
private_constant :TimeoutQueueImplementation
12+
13+
# @!visibility private
14+
# @!macro timeout_queue
15+
class TimeoutQueue < TimeoutQueueImplementation
16+
end
17+
end
18+
end

lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb

-14
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,6 @@ module Concurrent
7575
# @return [Integer] Number of tasks that may be enqueued before reaching `max_queue` and rejecting
7676
# new tasks. A value of -1 indicates that the queue may grow without bound.
7777

78-
# @!macro thread_pool_executor_method_prune_pool
79-
# Prune the thread pool of unneeded threads
80-
#
81-
# What is being pruned is controlled by the min_threads and idletime
82-
# parameters passed at pool creation time
83-
#
84-
# This is a no-op on some pool implementation (e.g. the Java one). The Ruby
85-
# pool will auto-prune each time a new job is posted. You will need to call
86-
# this method explicitly in case your application post jobs in bursts (a
87-
# lot of jobs and then nothing for long periods)
88-
8978
# @!macro thread_pool_executor_public_api
9079
#
9180
# @!macro abstract_executor_service_public_api
@@ -122,9 +111,6 @@ module Concurrent
122111
#
123112
# @!method can_overflow?
124113
# @!macro executor_service_method_can_overflow_question
125-
#
126-
# @!method prune_pool
127-
# @!macro thread_pool_executor_method_prune_pool
128114

129115

130116

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

-4
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,6 @@ def running?
9898
super && !@executor.isTerminating
9999
end
100100

101-
# @!macro thread_pool_executor_method_prune_pool
102-
def prune_pool
103-
end
104-
105101
private
106102

107103
def ns_initialize(opts)

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

+36-31
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require 'concurrent/concern/logging'
44
require 'concurrent/executor/ruby_executor_service'
55
require 'concurrent/utility/monotonic_time'
6+
require 'concurrent/collection/timeout_queue'
67

78
module Concurrent
89

@@ -104,6 +105,11 @@ def ready_worker(worker, last_message)
104105
synchronize { ns_ready_worker worker, last_message }
105106
end
106107

108+
# @!visibility private
109+
def remove_ready_worker(worker)
110+
synchronize { ns_remove_ready_worker worker }
111+
end
112+
107113
# @!visibility private
108114
def worker_died(worker)
109115
synchronize { ns_worker_died worker }
@@ -114,9 +120,9 @@ def worker_task_completed
114120
synchronize { @completed_task_count += 1 }
115121
end
116122

117-
# @!macro thread_pool_executor_method_prune_pool
118-
def prune_pool
119-
synchronize { ns_prune_pool }
123+
# @!visibility private
124+
def prunable_capacity
125+
synchronize { ns_prunable_capacity }
120126
end
121127

122128
private
@@ -146,9 +152,6 @@ def ns_initialize(opts)
146152
@largest_length = 0
147153
@workers_counter = 0
148154
@ruby_pid = $$ # detects if Ruby has forked
149-
150-
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
151-
@next_gc_time = Concurrent.monotonic_time + @gc_interval
152155
end
153156

154157
# @!visibility private
@@ -162,12 +165,10 @@ def ns_execute(*args, &task)
162165

163166
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
164167
@scheduled_task_count += 1
168+
nil
165169
else
166-
return fallback_action(*args, &task)
170+
fallback_action(*args, &task)
167171
end
168-
169-
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
170-
nil
171172
end
172173

173174
# @!visibility private
@@ -218,7 +219,7 @@ def ns_assign_worker(*args, &task)
218219
# @!visibility private
219220
def ns_enqueue(*args, &task)
220221
return false if @synchronous
221-
222+
222223
if !ns_limited_queue? || @queue.size < @max_queue
223224
@queue << [task, args]
224225
true
@@ -265,7 +266,7 @@ def ns_ready_worker(worker, last_message, success = true)
265266
end
266267
end
267268

268-
# removes a worker which is not in not tracked in @ready
269+
# removes a worker which is not tracked in @ready
269270
#
270271
# @!visibility private
271272
def ns_remove_busy_worker(worker)
@@ -274,23 +275,15 @@ def ns_remove_busy_worker(worker)
274275
true
275276
end
276277

277-
# try oldest worker if it is idle for enough time, it's returned back at the start
278-
#
279-
# @!visibility private
280-
def ns_prune_pool
281-
now = Concurrent.monotonic_time
282-
stopped_workers = 0
283-
while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
284-
worker, last_message = @ready.first
285-
if now - last_message > self.idletime
286-
stopped_workers += 1
287-
@ready.shift
288-
worker << :stop
289-
else break
290-
end
278+
def ns_remove_ready_worker(worker)
279+
if index = @ready.index { |rw, _| rw == worker }
280+
@ready.delete_at(index)
291281
end
282+
true
283+
end
292284

293-
@next_gc_time = Concurrent.monotonic_time + @gc_interval
285+
def ns_prunable_capacity
286+
[@pool.size - @min_length, @ready.size].min
294287
end
295288

296289
def ns_reset_if_forked
@@ -312,7 +305,7 @@ class Worker
312305

313306
def initialize(pool, id)
314307
# instance variables accessed only under pool's lock so no need to sync here again
315-
@queue = Queue.new
308+
@queue = Collection::TimeoutQueue.new
316309
@pool = pool
317310
@thread = create_worker @queue, pool, pool.idletime
318311

@@ -338,17 +331,29 @@ def kill
338331
def create_worker(queue, pool, idletime)
339332
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
340333
catch(:stop) do
341-
loop do
334+
not_prunable = false
342335

343-
case message = my_queue.pop
336+
loop do
337+
timeout = not_prunable ? nil : my_idletime
338+
case message = my_queue.pop(timeout: timeout)
339+
when nil
340+
unless my_pool.prunable_capacity.positive?
341+
not_prunable = true
342+
next
343+
end
344+
345+
my_pool.remove_ready_worker(self)
346+
my_pool.remove_busy_worker(self)
347+
throw :stop
344348
when :stop
349+
my_pool.remove_ready_worker(self)
345350
my_pool.remove_busy_worker(self)
346351
throw :stop
347-
348352
else
349353
task, args = message
350354
run_task my_pool, task, args
351355
my_pool.ready_worker(self, Concurrent.monotonic_time)
356+
not_prunable = false
352357
end
353358
end
354359
end

spec/concurrent/executor/cached_thread_pool_spec.rb

+15-15
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,13 @@ module Concurrent
152152

153153
context 'garbage collection' do
154154

155-
subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }
155+
subject { described_class.new(idletime: 0.1, max_threads: 2) }
156156

157157
it 'removes from pool any thread that has been idle too long' do
158158
latch = Concurrent::CountDownLatch.new(4)
159159
4.times { subject.post { sleep 0.1; latch.count_down } }
160+
sleep 10
160161
expect(latch.wait(1)).to be true
161-
sleep 0.2
162-
subject.post {}
163-
sleep 0.2
164162
expect(subject.length).to be < 4
165163
end
166164

@@ -197,25 +195,27 @@ module Concurrent
197195
expect(subject.length).to be >= 5
198196
3.times { subject << proc { sleep(1) } }
199197
sleep(0.1)
200-
expect(subject.length).to be >= 5
198+
expect(subject.length).to be >= 3
201199
end
202200
end
203201
end
204202

205203
context 'stress' do
206204
configurations = [
207-
{ min_threads: 2,
208-
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
209-
idletime: 0.1, # 1 minute
210-
max_queue: 0, # unlimited
205+
{
206+
min_threads: 2,
207+
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
208+
idletime: 60, # 1 minute
209+
max_queue: 0, # unlimited
211210
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
212-
gc_interval: 0.1 },
213-
{ min_threads: 2,
214-
max_threads: 4,
215-
idletime: 0.1, # 1 minute
216-
max_queue: 0, # unlimited
211+
},
212+
{
213+
min_threads: 2,
214+
max_threads: 4,
215+
idletime: 60, # 1 minute
216+
max_queue: 0, # unlimited
217217
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
218-
gc_interval: 0.1 }
218+
}
219219
]
220220

221221
configurations.each do |config|

spec/concurrent/executor/java_thread_pool_executor_spec.rb

-7
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ module Concurrent
2626

2727
it_should_behave_like :thread_pool_executor
2828

29-
context :prune do
30-
it "is a no-op, pruning is handled by the JVM" do
31-
executor = JavaThreadPoolExecutor.new
32-
executor.prune_pool
33-
end
34-
end
35-
3629
context '#overload_policy' do
3730

3831
specify ':abort maps to AbortPolicy' do

0 commit comments

Comments
 (0)