Asynchronous pruning for RubyThreadPoolExecutor #1082
base: master
Conversation
Finally taking a look at this, sorry for the delay
The Queue stuff looks ready to me, i.e. I reviewed and approve that part.
The pool changes are harder for me to judge because I haven't had time to understand the implementation in detail (and likely won't have time for that soon).
It would be great if another concurrent-ruby maintainer could help review that part (and they should feel free to merge this PR after they approve).
throw :stop
end

prunable = false
Is it impossible, if prunable_capacity is <= 0, that a worker would need to be prunable again?
i.e. what if prunable_capacity is 0 because all workers are busy, but later on the pool is no longer running, and then we should prune/finish all workers?
> i.e. what if prunable_capacity is 0 because all workers are busy but later on the pool is no longer running? and then we should prune/finish all workers?

If a worker is busy, they will be marked as ready when the task has completed, at which point we stop them (in #ns_ready_worker) if the pool is no longer running. Even if the pool is only stopped after this point, we also mark that worker as prunable again, which means they will eventually time out (because work is no longer being accepted) and prune themselves.
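For readers following along, here is a minimal sketch of the prune-on-timeout loop this reply describes, pieced together from the diff fragments quoted in this PR. The `worker_loop` wrapper, the task-handling branch, and the `:stop` message are illustrative assumptions, not the actual implementation:

```ruby
# Sketch only: a worker waits with a timeout while it is prunable and the
# pool is running; a nil result means it idled past my_idletime and may
# prune itself if the pool can spare it.
def worker_loop(my_pool, my_queue, my_idletime)
  prunable = true
  catch(:stop) do
    loop do
      # Wait with a timeout only while this worker may prune itself and the
      # pool is still running; otherwise block until a message arrives.
      timeout = prunable && my_pool.running? ? my_idletime : nil
      case message = my_queue.pop(timeout: timeout)
      when nil
        # Idled with no work: stop if the pool can spare this worker,
        # otherwise stay alive and stop trying to prune for now.
        throw :stop if my_pool.prunable_capacity.positive?
        prunable = false
      when :stop
        throw :stop
      else
        task, args = message
        task.call(*args)
        # After finishing a task the worker reports back as ready; the pool
        # can then stop it (if no longer running) or mark it prunable again.
        prunable = true
      end
    end
  end
end
```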
timeout = prunable && my_pool.running? ? my_idletime : nil
case message = my_queue.pop(timeout: timeout)
when nil
  if my_pool.prunable_capacity.positive?
Suggested change:
- if my_pool.prunable_capacity.positive?
+ if my_pool.prunable_capacity > 0

I think this is clearer
@@ -95,8 +97,16 @@ def remaining_capacity
end

# @!visibility private
def remove_busy_worker(worker)
  synchronize { ns_remove_busy_worker worker }
def prunable_capacity
Could you add a comment documenting what this returns?
Added to the ns_* method, similar to the others here.
def remove_busy_worker(worker)
  synchronize { ns_remove_busy_worker worker }

def prunable_capacity
  synchronize { ns_prunable_capacity }
Isn't there a risk the value is outdated by the time it's used, since it will be used outside of this synchronize block?
Oh, good point. We should synchronize the entire pruning process, otherwise a worker that just became idle could be assigned work between when it thinks it should prune itself and when it actually does...
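One way to read that suggestion, as a rough sketch only (the `prune_worker` and `ns_worker_idle?` names are hypothetical, not part of this PR): the capacity check and the worker removal would happen under a single lock, so an idle worker cannot be handed new work between deciding to prune and actually pruning.

```ruby
# Hypothetical sketch of synchronizing the whole pruning decision.
# A worker asks the pool whether it may stop; the pool answers while
# holding its lock, so the answer cannot go stale before it is acted on.
def prune_worker(worker)
  synchronize do
    # ns_worker_idle? is a made-up helper standing in for "this worker has
    # not been handed new work"; ns_prunable_capacity is from this PR.
    if ns_prunable_capacity.positive? && ns_worker_idle?(worker)
      ns_remove_busy_worker(worker) # same bookkeeping remove_busy_worker wraps
      true  # caller may stop its loop
    else
      false # capacity changed or work arrived; keep running
    end
  end
end
```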
@next_gc_time = Concurrent.monotonic_time + @gc_interval

def ns_prunable_capacity
Could you explain the logic here? Is it how many workers can be removed at the current time?
Closes #1066
Closes #1075
Alternative to #1079
Implementation is based on the discussion in the linked issues.