Skip to content

Commit fd02056

Browse files
committed
Asynchronous pruning for RubyThreadPoolExecutor
1 parent 2aa6f64 commit fd02056

8 files changed

+162
-111
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
module Concurrent
2+
module Collection
3+
# @!visibility private
4+
# @!macro ruby_timeout_queue
5+
class RubyTimeoutQueue < ::Queue
6+
def initialize(*args)
7+
if RUBY_VERSION >= '3.2'
8+
raise "#{self.class.name} is not needed on Ruby 3.2 or later, use ::Queue instead"
9+
end
10+
11+
super(*args)
12+
13+
@mutex = Mutex.new
14+
@cond_var = ConditionVariable.new
15+
end
16+
17+
def push(obj)
18+
@mutex.synchronize do
19+
super(obj)
20+
@cond_var.signal
21+
end
22+
end
23+
alias_method :enq, :push
24+
alias_method :<<, :push
25+
26+
def pop(non_block = false, timeout: nil)
27+
if non_block && timeout
28+
raise ArgumentError, "can't set a timeout if non_block is enabled"
29+
end
30+
31+
if non_block
32+
super(true)
33+
elsif timeout
34+
@mutex.synchronize do
35+
while empty? && timeout.positive? # handle spurious wakeups
36+
timed_out, timeout = ns_timed_out?(timeout) { @cond_var.wait(@mutex, timeout) }
37+
return nil if timed_out
38+
end
39+
40+
super(true) rescue nil
41+
end
42+
else
43+
super(false)
44+
end
45+
end
46+
alias_method :deq, :pop
47+
alias_method :shift, :pop
48+
49+
private
50+
51+
def ns_timed_out?(timeout, &cond_var_wait)
52+
# https://github.com/ruby/ruby/pull/4256
53+
timed_out = if RUBY_VERSION >= '3.1'
54+
start_time = Concurrent.monotonic_time
55+
result = cond_var_wait.call.nil?
56+
end_time = Concurrent.monotonic_time
57+
result
58+
else
59+
start_time = Concurrent.monotonic_time
60+
deadline = start_time + timeout
61+
cond_var_wait.call
62+
end_time = Concurrent.monotonic_time
63+
end_time >= deadline
64+
end
65+
66+
elapsed = end_time - start_time
67+
new_timeout = [timeout - elapsed, 0].max
68+
69+
[timed_out, new_timeout]
70+
end
71+
end
72+
end
73+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
module Concurrent
2+
module Collection
3+
# @!visibility private
4+
# @!macro internal_implementation_note
5+
TimeoutQueueImplementation = if RUBY_VERSION >= '3.2'
6+
::Queue
7+
else
8+
require 'concurrent/collection/ruby_timeout_queue'
9+
RubyTimeoutQueue
10+
end
11+
private_constant :TimeoutQueueImplementation
12+
13+
# @!visibility private
14+
# @!macro timeout_queue
15+
class TimeoutQueue < TimeoutQueueImplementation
16+
end
17+
end
18+
end

lib/concurrent-ruby/concurrent/executor/fixed_thread_pool.rb

-14
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,6 @@ module Concurrent
7575
# @return [Integer] Number of tasks that may be enqueued before reaching `max_queue` and rejecting
7676
# new tasks. A value of -1 indicates that the queue may grow without bound.
7777

78-
# @!macro thread_pool_executor_method_prune_pool
79-
# Prune the thread pool of unneeded threads
80-
#
81-
# What is being pruned is controlled by the min_threads and idletime
82-
# parameters passed at pool creation time
83-
#
84-
# This is a no-op on some pool implementation (e.g. the Java one). The Ruby
85-
# pool will auto-prune each time a new job is posted. You will need to call
86-
# this method explicitly in case your application post jobs in bursts (a
87-
# lot of jobs and then nothing for long periods)
88-
8978
# @!macro thread_pool_executor_public_api
9079
#
9180
# @!macro abstract_executor_service_public_api
@@ -122,9 +111,6 @@ module Concurrent
122111
#
123112
# @!method can_overflow?
124113
# @!macro executor_service_method_can_overflow_question
125-
#
126-
# @!method prune_pool
127-
# @!macro thread_pool_executor_method_prune_pool
128114

129115

130116

lib/concurrent-ruby/concurrent/executor/java_thread_pool_executor.rb

-4
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,6 @@ def running?
9898
super && !@executor.isTerminating
9999
end
100100

101-
# @!macro thread_pool_executor_method_prune_pool
102-
def prune_pool
103-
end
104-
105101
private
106102

107103
def ns_initialize(opts)

lib/concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb

+40-36
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require 'concurrent/concern/logging'
44
require 'concurrent/executor/ruby_executor_service'
55
require 'concurrent/utility/monotonic_time'
6+
require 'concurrent/collection/timeout_queue'
67

78
module Concurrent
89

@@ -95,8 +96,16 @@ def remaining_capacity
9596
end
9697

9798
# @!visibility private
98-
def remove_busy_worker(worker)
99-
synchronize { ns_remove_busy_worker worker }
99+
def prunable_capacity
100+
synchronize { ns_prunable_capacity }
101+
end
102+
103+
# @!visibility private
104+
def remove_worker(worker)
105+
synchronize do
106+
ns_remove_ready_worker worker
107+
ns_remove_busy_worker worker
108+
end
100109
end
101110

102111
# @!visibility private
@@ -114,11 +123,6 @@ def worker_task_completed
114123
synchronize { @completed_task_count += 1 }
115124
end
116125

117-
# @!macro thread_pool_executor_method_prune_pool
118-
def prune_pool
119-
synchronize { ns_prune_pool }
120-
end
121-
122126
private
123127

124128
# @!visibility private
@@ -146,9 +150,6 @@ def ns_initialize(opts)
146150
@largest_length = 0
147151
@workers_counter = 0
148152
@ruby_pid = $$ # detects if Ruby has forked
149-
150-
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
151-
@next_gc_time = Concurrent.monotonic_time + @gc_interval
152153
end
153154

154155
# @!visibility private
@@ -162,12 +163,10 @@ def ns_execute(*args, &task)
162163

163164
if ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
164165
@scheduled_task_count += 1
166+
nil
165167
else
166-
return fallback_action(*args, &task)
168+
fallback_action(*args, &task)
167169
end
168-
169-
ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
170-
nil
171170
end
172171

173172
# @!visibility private
@@ -218,7 +217,7 @@ def ns_assign_worker(*args, &task)
218217
# @!visibility private
219218
def ns_enqueue(*args, &task)
220219
return false if @synchronous
221-
220+
222221
if !ns_limited_queue? || @queue.size < @max_queue
223222
@queue << [task, args]
224223
true
@@ -265,7 +264,7 @@ def ns_ready_worker(worker, last_message, success = true)
265264
end
266265
end
267266

268-
# removes a worker which is not in not tracked in @ready
267+
# removes a worker which is not tracked in @ready
269268
#
270269
# @!visibility private
271270
def ns_remove_busy_worker(worker)
@@ -274,23 +273,19 @@ def ns_remove_busy_worker(worker)
274273
true
275274
end
276275

277-
# try oldest worker if it is idle for enough time, it's returned back at the start
278-
#
279-
# @!visibility private
280-
def ns_prune_pool
281-
now = Concurrent.monotonic_time
282-
stopped_workers = 0
283-
while !@ready.empty? && (@pool.size - stopped_workers > @min_length)
284-
worker, last_message = @ready.first
285-
if now - last_message > self.idletime
286-
stopped_workers += 1
287-
@ready.shift
288-
worker << :stop
289-
else break
290-
end
276+
def ns_remove_ready_worker(worker)
277+
if index = @ready.index { |rw, _| rw == worker }
278+
@ready.delete_at(index)
291279
end
280+
true
281+
end
292282

293-
@next_gc_time = Concurrent.monotonic_time + @gc_interval
283+
def ns_prunable_capacity
284+
if running?
285+
[@pool.size - @min_length, @ready.size].min
286+
else
287+
@pool.size
288+
end
294289
end
295290

296291
def ns_reset_if_forked
@@ -312,7 +307,7 @@ class Worker
312307

313308
def initialize(pool, id)
314309
# instance variables accessed only under pool's lock so no need to sync here again
315-
@queue = Queue.new
310+
@queue = Collection::TimeoutQueue.new
316311
@pool = pool
317312
@thread = create_worker @queue, pool, pool.idletime
318313

@@ -338,17 +333,26 @@ def kill
338333
def create_worker(queue, pool, idletime)
339334
Thread.new(queue, pool, idletime) do |my_queue, my_pool, my_idletime|
340335
catch(:stop) do
341-
loop do
336+
prunable = true
342337

343-
case message = my_queue.pop
338+
loop do
339+
timeout = prunable && my_pool.running? ? my_idletime : nil
340+
case message = my_queue.pop(timeout: timeout)
341+
when nil
342+
if my_pool.prunable_capacity.positive?
343+
my_pool.remove_worker(self)
344+
throw :stop
345+
end
346+
347+
prunable = false
344348
when :stop
345-
my_pool.remove_busy_worker(self)
349+
my_pool.remove_worker(self)
346350
throw :stop
347-
348351
else
349352
task, args = message
350353
run_task my_pool, task, args
351354
my_pool.ready_worker(self, Concurrent.monotonic_time)
355+
prunable = true
352356
end
353357
end
354358
end

spec/concurrent/executor/cached_thread_pool_spec.rb

+15-15
Original file line numberDiff line numberDiff line change
@@ -152,15 +152,13 @@ module Concurrent
152152

153153
context 'garbage collection' do
154154

155-
subject { described_class.new(idletime: 0.1, max_threads: 2, gc_interval: 0) }
155+
subject { described_class.new(idletime: 0.1, max_threads: 2) }
156156

157157
it 'removes from pool any thread that has been idle too long' do
158158
latch = Concurrent::CountDownLatch.new(4)
159159
4.times { subject.post { sleep 0.1; latch.count_down } }
160+
sleep 0.4
160161
expect(latch.wait(1)).to be true
161-
sleep 0.2
162-
subject.post {}
163-
sleep 0.2
164162
expect(subject.length).to be < 4
165163
end
166164

@@ -197,25 +195,27 @@ module Concurrent
197195
expect(subject.length).to be >= 5
198196
3.times { subject << proc { sleep(1) } }
199197
sleep(0.1)
200-
expect(subject.length).to be >= 5
198+
expect(subject.length).to be >= 3
201199
end
202200
end
203201
end
204202

205203
context 'stress' do
206204
configurations = [
207-
{ min_threads: 2,
208-
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
209-
idletime: 0.1, # 1 minute
210-
max_queue: 0, # unlimited
205+
{
206+
min_threads: 2,
207+
max_threads: ThreadPoolExecutor::DEFAULT_MAX_POOL_SIZE,
208+
idletime: 60, # 1 minute
209+
max_queue: 0, # unlimited
211210
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
212-
gc_interval: 0.1 },
213-
{ min_threads: 2,
214-
max_threads: 4,
215-
idletime: 0.1, # 1 minute
216-
max_queue: 0, # unlimited
211+
},
212+
{
213+
min_threads: 2,
214+
max_threads: 4,
215+
idletime: 60, # 1 minute
216+
max_queue: 0, # unlimited
217217
fallback_policy: :caller_runs, # shouldn't matter -- 0 max queue
218-
gc_interval: 0.1 }
218+
}
219219
]
220220

221221
configurations.each do |config|

spec/concurrent/executor/java_thread_pool_executor_spec.rb

-7
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,6 @@ module Concurrent
2626

2727
it_should_behave_like :thread_pool_executor
2828

29-
context :prune do
30-
it "is a no-op, pruning is handled by the JVM" do
31-
executor = JavaThreadPoolExecutor.new
32-
executor.prune_pool
33-
end
34-
end
35-
3629
context '#overload_policy' do
3730

3831
specify ':abort maps to AbortPolicy' do

0 commit comments

Comments
 (0)