-
Notifications
You must be signed in to change notification settings - Fork 416
/
Copy pathjava_thread_pool_executor.rb
147 lines (115 loc) · 4.87 KB
/
java_thread_pool_executor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
if Concurrent.on_jruby?
require 'concurrent/executor/java_executor_service'
module Concurrent
# @!macro thread_pool_executor
# @!macro thread_pool_options
# @!visibility private
class JavaThreadPoolExecutor < JavaExecutorService
  include Concern::Deprecation

  # @!macro thread_pool_executor_constant_default_max_pool_size
  DEFAULT_MAX_POOL_SIZE = java.lang.Integer::MAX_VALUE # 2147483647

  # @!macro thread_pool_executor_constant_default_min_pool_size
  DEFAULT_MIN_POOL_SIZE = 0

  # @!macro thread_pool_executor_constant_default_max_queue_size
  DEFAULT_MAX_QUEUE_SIZE = 0

  # @!macro thread_pool_executor_constant_default_thread_timeout
  DEFAULT_THREAD_IDLETIMEOUT = 60

  # @!macro thread_pool_executor_constant_default_synchronous
  DEFAULT_SYNCHRONOUS = false

  # `max_length` is intentionally not declared with `attr_reader`: it is an
  # explicit method below that queries the Java executor, and declaring both
  # would only produce a "method redefined" warning.

  # @!macro thread_pool_executor_attr_reader_max_queue
  attr_reader :max_queue

  # @!macro thread_pool_executor_attr_reader_synchronous
  attr_reader :synchronous

  # @!macro thread_pool_executor_method_initialize
  #
  # Forwards `opts` unchanged to the superclass initializer; the options
  # themselves are interpreted in {#ns_initialize}.
  def initialize(opts = {})
    super(opts)
  end

  # @!macro executor_service_method_can_overflow_question
  #
  # @return [Boolean] true when a non-zero `max_queue` was configured
  def can_overflow?
    @max_queue != 0
  end

  # @!macro thread_pool_executor_attr_reader_min_length
  #
  # @return [Integer] the Java executor's core pool size
  def min_length
    @executor.getCorePoolSize
  end

  # @!macro thread_pool_executor_attr_reader_max_length
  #
  # @return [Integer] the Java executor's maximum pool size
  def max_length
    @executor.getMaximumPoolSize
  end

  # @!macro thread_pool_executor_attr_reader_length
  #
  # @return [Integer] the Java executor's current pool size
  def length
    @executor.getPoolSize
  end

  # @!macro thread_pool_executor_attr_reader_largest_length
  #
  # @return [Integer] the Java executor's largest recorded pool size
  def largest_length
    @executor.getLargestPoolSize
  end

  # @!macro thread_pool_executor_attr_reader_scheduled_task_count
  #
  # @return [Integer] the Java executor's task count
  def scheduled_task_count
    @executor.getTaskCount
  end

  # @!macro thread_pool_executor_attr_reader_completed_task_count
  #
  # @return [Integer] the Java executor's completed task count
  def completed_task_count
    @executor.getCompletedTaskCount
  end

  # @!macro thread_pool_executor_method_active_count
  #
  # @return [Integer] the Java executor's active thread count
  def active_count
    @executor.getActiveCount
  end

  # @!macro thread_pool_executor_attr_reader_idletime
  #
  # @return [Integer] the Java executor's keep-alive time, expressed in seconds
  def idletime
    @executor.getKeepAliveTime(java.util.concurrent.TimeUnit::SECONDS)
  end

  # @!macro thread_pool_executor_attr_reader_queue_length
  #
  # @return [Integer] number of entries currently in the executor's work queue
  def queue_length
    @executor.getQueue.size
  end

  # @!macro thread_pool_executor_attr_reader_remaining_capacity
  #
  # @return [Integer] -1 when `max_queue` is 0, otherwise the remaining
  #   capacity reported by the executor's work queue
  def remaining_capacity
    @max_queue == 0 ? -1 : @executor.getQueue.remainingCapacity
  end

  # @!macro executor_service_method_running_question
  #
  # @return [Boolean] true only when the superclass reports the service as
  #   running AND the Java executor is not in the process of terminating
  def running?
    super && !@executor.isTerminating
  end

  # @!macro thread_pool_executor_method_prune_pool
  #
  # Only emits a deprecation notice; no pool state is touched here.
  def prune_pool
    deprecated "#prune_pool has no effect and will be removed in the next release."
  end

  private

  # Validates the options and builds the backing
  # `java.util.concurrent.ThreadPoolExecutor`.
  #
  # @param opts [Hash] configuration options
  # @option opts [#to_i] :min_threads (DEFAULT_MIN_POOL_SIZE) core pool size
  # @option opts [#to_i] :max_threads (DEFAULT_MAX_POOL_SIZE) maximum pool size
  # @option opts [#to_i] :idletime (DEFAULT_THREAD_IDLETIMEOUT) keep-alive time in seconds
  # @option opts [#to_i] :max_queue (DEFAULT_MAX_QUEUE_SIZE) queue bound; 0 selects
  #   an unbounded queue (or a synchronous hand-off queue when `:synchronous` is set)
  # @option opts [Boolean] :synchronous (DEFAULT_SYNCHRONOUS) use a `SynchronousQueue`;
  #   only allowed together with `max_queue` of 0
  # @option opts [Symbol] :fallback_policy (:abort) must be a member of
  #   `FALLBACK_POLICY_CLASSES`
  # @raise [ArgumentError] when any of the checks below fails
  def ns_initialize(opts)
    min_length       = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
    max_length       = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
    idletime         = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i
    @max_queue       = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i
    @synchronous     = opts.fetch(:synchronous, DEFAULT_SYNCHRONOUS)
    @fallback_policy = opts.fetch(:fallback_policy, :abort)

    raise ArgumentError.new("`synchronous` cannot be set unless `max_queue` is 0") if @synchronous && @max_queue > 0
    raise ArgumentError.new("`max_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if max_length < DEFAULT_MIN_POOL_SIZE
    raise ArgumentError.new("`max_threads` cannot be greater than #{DEFAULT_MAX_POOL_SIZE}") if max_length > DEFAULT_MAX_POOL_SIZE
    raise ArgumentError.new("`min_threads` cannot be less than #{DEFAULT_MIN_POOL_SIZE}") if min_length < DEFAULT_MIN_POOL_SIZE
    raise ArgumentError.new("`min_threads` cannot be more than `max_threads`") if min_length > max_length
    # Interpolate the ivar assigned above rather than relying on a reader
    # method that is not defined in this class.
    raise ArgumentError.new("#{@fallback_policy} is not a valid fallback policy") unless FALLBACK_POLICY_CLASSES.include?(@fallback_policy)

    # Pick the work queue: bounded when max_queue is non-zero; otherwise a
    # direct hand-off queue if synchronous, else an unbounded linked queue.
    queue =
      if @max_queue != 0
        java.util.concurrent.LinkedBlockingQueue.new(@max_queue)
      elsif @synchronous
        java.util.concurrent.SynchronousQueue.new
      else
        java.util.concurrent.LinkedBlockingQueue.new
      end

    # NOTE(review): DaemonThreadFactory, ns_auto_terminate? and
    # FALLBACK_POLICY_CLASSES are defined outside this class (presumably in
    # JavaExecutorService or its ancestors) - confirm there.
    @executor = java.util.concurrent.ThreadPoolExecutor.new(
      min_length,
      max_length,
      idletime,
      java.util.concurrent.TimeUnit::SECONDS,
      queue,
      DaemonThreadFactory.new(ns_auto_terminate?),
      FALLBACK_POLICY_CLASSES[@fallback_policy].new
    )
  end
end
end
end