19
19
import java .util .concurrent .TimeUnit ;
20
20
import java .util .concurrent .atomic .AtomicBoolean ;
21
21
import java .util .concurrent .atomic .AtomicReference ;
22
- import java .util .concurrent .locks .ReentrantLock ;
23
22
24
23
import software .aws .toolkits .eclipse .amazonq .observers .EventObserver ;
25
24
import software .aws .toolkits .eclipse .amazonq .observers .StreamObserver ;
26
- import software .aws .toolkits .eclipse .amazonq .plugin .Activator ;
27
25
28
26
public final class EventBroker {
29
27
30
28
@ FunctionalInterface
31
29
private interface TypedCallable <T > {
32
- void call (T event );
30
+ void callWith (T event );
33
31
}
34
32
35
- private final class BlockingCallerRunsPolicy implements RejectedExecutionHandler {
33
+ public static final class CallerRunsPolicyBlocking implements RejectedExecutionHandler {
36
34
37
35
private final BlockingQueue <Runnable > workQueue ;
38
36
39
- BlockingCallerRunsPolicy (final BlockingQueue <Runnable > workQueue ) {
37
+ CallerRunsPolicyBlocking (final BlockingQueue <Runnable > workQueue ) {
40
38
this .workQueue = workQueue ;
41
39
}
42
40
@@ -56,122 +54,98 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor
56
54
57
55
}
58
56
59
- private class OrderedThreadPoolExecutor {
57
+ public static final class OrderedThreadPoolExecutor {
60
58
61
- private final Map <String , BlockingQueue <?>> typedEventQueue ;
62
- private final Map <String , AtomicBoolean > typedJobStatus ;
63
- private final Map <String , ReentrantLock > typedJobLock ;
64
- private final Map <String , TypedCallable <?>> typedCallback ;
59
+ private final Map <String , BlockingQueue <?>> interestIdToEventQueueMap ;
60
+ private final Map <String , AtomicBoolean > interestIdToJobStatusMap ;
61
+ private final Map <String , TypedCallable <?>> interestIdToCallbackMap ;
65
62
66
- private final BlockingQueue <Runnable > workQueue ;
63
+ private final BlockingQueue <Runnable > scheduledJobsQueue ;
67
64
private final ThreadPoolExecutor executor ;
68
65
private final int eventQueueCapacity ;
69
66
70
- OrderedThreadPoolExecutor (final int coreThreadCount , final int maxThreadCount , final int queueCapacity ,
71
- final int keepAliveTime , final int eventQueueCapacity ) {
72
- workQueue = new ArrayBlockingQueue <>(queueCapacity );
73
- typedEventQueue = new ConcurrentHashMap <>();
74
- typedJobStatus = new ConcurrentHashMap <>();
75
- typedJobLock = new ConcurrentHashMap <>();
76
- typedCallback = new ConcurrentHashMap <>();
67
+ OrderedThreadPoolExecutor (final int coreThreadCount , final int maxThreadCount , final int jobQueueCapacity ,
68
+ final int eventQueueCapacity , final int keepAliveTime , final TimeUnit keepAliveTimeUnit ) {
69
+ scheduledJobsQueue = new ArrayBlockingQueue <>(jobQueueCapacity );
70
+ interestIdToEventQueueMap = new ConcurrentHashMap <>();
71
+ interestIdToJobStatusMap = new ConcurrentHashMap <>();
72
+ interestIdToCallbackMap = new ConcurrentHashMap <>();
77
73
78
74
this .eventQueueCapacity = eventQueueCapacity ;
79
75
80
- executor = new ThreadPoolExecutor (coreThreadCount , maxThreadCount , keepAliveTime , TimeUnit . MILLISECONDS ,
81
- workQueue , Executors .defaultThreadFactory (), new BlockingCallerRunsPolicy ( workQueue ));
76
+ executor = new ThreadPoolExecutor (coreThreadCount , maxThreadCount , keepAliveTime , keepAliveTimeUnit ,
77
+ scheduledJobsQueue , Executors .defaultThreadFactory (), new CallerRunsPolicyBlocking ( scheduledJobsQueue ));
82
78
}
83
79
84
- public <T , R > void registerCallback (final String interestId , final TypedCallable <R > callback ) {
85
- typedCallback .putIfAbsent (interestId , callback );
80
+ public <T , R > void registerCallbackForInterest (final String interestId , final TypedCallable <R > callback ) {
81
+ interestIdToCallbackMap .putIfAbsent (interestId , callback );
86
82
}
87
83
88
- public <T > boolean hasRegisteredCallback (final String interestType ) {
89
- return typedCallback .containsKey (interestType );
84
+ public <T > boolean isCallbackRegisteredForInterest (final String interestId ) {
85
+ return interestIdToCallbackMap .containsKey (interestId );
90
86
}
91
87
92
88
@ SuppressWarnings ("unchecked" )
93
- public <T , R > void submit (final String interestId , final R event ) {
94
- BlockingQueue <R > eventQueue = (BlockingQueue <R >) typedEventQueue .computeIfAbsent (interestId ,
89
+ public <T , R > void submitEventForInterest (final String interestId , final R event ) {
90
+ BlockingQueue <R > eventQueue = (BlockingQueue <R >) interestIdToEventQueueMap .computeIfAbsent (interestId ,
95
91
k -> new ArrayBlockingQueue <>(eventQueueCapacity , true ));
96
92
try {
97
93
eventQueue .put (event );
98
94
} catch (InterruptedException e ) {
99
95
e .printStackTrace ();
100
96
}
101
97
102
- handleScheduling (interestId , (Class <R >) event .getClass (), eventQueue );
98
+ handleJobScheduling (interestId , (Class <R >) event .getClass (), eventQueue );
103
99
}
104
100
105
- public <T , R > void handleScheduling (final String interestId , final Class <R > eventType ,
101
+ private <T , R > void handleJobScheduling (final String interestId , final Class <R > eventType ,
106
102
final BlockingQueue <R > eventQueue ) {
107
- AtomicBoolean jobStatus = typedJobStatus .computeIfAbsent (interestId , k -> new AtomicBoolean (false ));
108
- ReentrantLock jobLock = typedJobLock .computeIfAbsent (interestId , k -> new ReentrantLock (true ));
103
+ AtomicBoolean jobStatus = interestIdToJobStatusMap .computeIfAbsent (interestId , k -> new AtomicBoolean (false ));
109
104
110
- jobLock .lock ();
111
- try {
112
- if (!jobStatus .get () && !eventQueue .isEmpty ()) {
113
- if (jobStatus .compareAndSet (false , true )) {
114
- executor .submit (() -> processEventQueue (interestId , eventType ,
115
- eventQueue , jobStatus , jobLock ));
116
- }
117
- }
118
- } finally {
119
- jobLock .unlock ();
105
+ if (!jobStatus .get () && !eventQueue .isEmpty () && jobStatus .compareAndSet (false , true )) {
106
+ executor .submit (() -> processQueuedEvents (interestId , eventType , eventQueue , jobStatus ));
120
107
}
121
108
}
122
109
123
110
@ SuppressWarnings ("unchecked" )
124
- public <T , R > void processEventQueue (final String interestId , final Class <R > eventType ,
125
- final BlockingQueue <R > eventQueue , final AtomicBoolean jobStatus , final ReentrantLock jobLock ) {
126
- if (jobStatus == null || jobLock == null || eventQueue == null ) {
127
- throw new NullPointerException ("ThreadPoolExecutor in unexpected state" );
128
- }
129
-
130
- jobLock .lock ();
111
+ private <T , R > void processQueuedEvents (final String interestId , final Class <R > eventType ,
112
+ final BlockingQueue <R > eventQueue , final AtomicBoolean jobStatus ) {
131
113
try {
132
- TypedCallable <R > eventCallback = (TypedCallable <R >) typedCallback .get (interestId );
114
+ TypedCallable <R > eventCallback = (TypedCallable <R >) interestIdToCallbackMap .get (interestId );
133
115
if (eventCallback == null ) {
134
116
return ;
135
117
}
136
118
137
119
while (!eventQueue .isEmpty ()) {
138
- try {
139
- R newEvent = eventQueue .take ();
140
- if (newEvent != null ) {
141
- try {
142
- eventCallback .call (newEvent );
143
- } catch (Exception e ) {
144
- e .printStackTrace ();
145
- }
120
+ R newEvent = eventQueue .poll ();
121
+ if (newEvent != null ) {
122
+ try {
123
+ eventCallback .callWith (newEvent );
124
+ } catch (Exception e ) {
125
+ e .printStackTrace ();
146
126
}
147
- } catch (InterruptedException e ) {
148
- e .printStackTrace ();
149
127
}
150
128
}
151
129
} finally {
152
- try {
153
- jobStatus .set (false );
154
- } finally {
155
- jobLock .unlock ();
156
- }
130
+ jobStatus .set (false );
157
131
}
158
132
}
133
+
159
134
}
160
135
161
136
private static final EventBroker INSTANCE ;
162
- private final Map <Class <?>, SubmissionPublisher <?>> publishers ;
163
- private final OrderedThreadPoolExecutor emissionExecutor ;
164
- private final OrderedThreadPoolExecutor consumptionExecutor ;
137
+ private final Map <Class <?>, SubmissionPublisher <?>> eventTypeToPublisherMap ;
138
+ private final OrderedThreadPoolExecutor publisherExecutor ;
139
+ private final OrderedThreadPoolExecutor subscriberExecutor ;
165
140
166
141
static {
167
142
INSTANCE = new EventBroker ();
168
143
}
169
144
170
145
private EventBroker () {
171
- publishers = new ConcurrentHashMap <>();
172
-
173
- emissionExecutor = new OrderedThreadPoolExecutor (5 , 30 , 30 , 10 , 100000000 );
174
- consumptionExecutor = new OrderedThreadPoolExecutor (5 , 30 , 30 , 10 , 100000000 );
146
+ eventTypeToPublisherMap = new ConcurrentHashMap <>();
147
+ publisherExecutor = new OrderedThreadPoolExecutor (3 , 10 , 10 , 100 , 10 , TimeUnit .MILLISECONDS );
148
+ subscriberExecutor = new OrderedThreadPoolExecutor (3 , 10 , 10 , 100 , 10 , TimeUnit .MILLISECONDS );
175
149
}
176
150
177
151
public static EventBroker getInstance () {
@@ -184,20 +158,20 @@ public <T> void post(final T event) {
184
158
return ;
185
159
}
186
160
187
- SubmissionPublisher <T > publisher = getPublisher ((Class <T >) event .getClass ());
188
- if (!emissionExecutor . hasRegisteredCallback ((event .getClass ().getName ()))) {
189
- registerPublisherCallback ( publisher , event .getClass ().getName ());
161
+ SubmissionPublisher <T > publisher = getPublisherForEventType ((Class <T >) event .getClass ());
162
+ if (!publisherExecutor . isCallbackRegisteredForInterest ((event .getClass ().getName ()))) {
163
+ registerPublisherCallbackForInterest ( event .getClass ().getName (), publisher );
190
164
}
191
165
192
- emissionExecutor . submit (event .getClass ().getName (), event );
166
+ publisherExecutor . submitEventForInterest (event .getClass ().getName (), event );
193
167
}
194
168
195
169
public <T > Subscription subscribe (final EventObserver <T > observer ) {
196
- SubmissionPublisher <T > publisher = getPublisher (observer .getEventType ());
170
+ SubmissionPublisher <T > publisher = getPublisherForEventType (observer .getEventType ());
197
171
AtomicReference <Subscription > subscriptionReference = new AtomicReference <>();
198
172
String subscriberId = UUID .randomUUID ().toString ();
199
173
200
- registerSubscriberCallback ( observer , subscriberId );
174
+ registerSubscriberCallbackForInterest ( subscriberId , observer );
201
175
202
176
Subscriber <T > subscriber = new Subscriber <>() {
203
177
@@ -207,19 +181,18 @@ public <T> Subscription subscribe(final EventObserver<T> observer) {
207
181
public void onSubscribe (final Subscription subscription ) {
208
182
this .subscription = subscription ;
209
183
subscriptionReference .set (subscription );
210
-
211
184
this .subscription .request (1 );
212
185
}
213
186
214
187
@ Override
215
188
public void onNext (final T event ) {
216
- consumptionExecutor . submit (subscriberId , event );
217
- this . subscription .request (1 );
189
+ subscriberExecutor . submitEventForInterest (subscriberId , event );
190
+ subscription .request (1 );
218
191
}
219
192
220
193
@ Override
221
- public void onError (final Throwable throwable ) {
222
- return ;
194
+ public void onError (final Throwable error ) {
195
+ error . printStackTrace () ;
223
196
}
224
197
225
198
@ Override
@@ -234,11 +207,11 @@ public void onComplete() {
234
207
}
235
208
236
209
public <T > Subscription subscribe (final StreamObserver <T > observer ) {
237
- SubmissionPublisher <T > publisher = getPublisher (observer .getEventType ());
210
+ SubmissionPublisher <T > publisher = getPublisherForEventType (observer .getEventType ());
238
211
AtomicReference <Subscription > subscriptionReference = new AtomicReference <>();
239
212
String subscriberId = UUID .randomUUID ().toString ();
240
213
241
- registerSubscriberCallback ( observer , subscriberId );
214
+ registerSubscriberCallbackForInterest ( subscriberId , observer );
242
215
243
216
Subscriber <T > subscriber = new Subscriber <>() {
244
217
@@ -248,19 +221,18 @@ public <T> Subscription subscribe(final StreamObserver<T> observer) {
248
221
public void onSubscribe (final Subscription subscription ) {
249
222
this .subscription = subscription ;
250
223
subscriptionReference .set (subscription );
251
-
252
224
this .subscription .request (1 );
253
225
}
254
226
255
227
@ Override
256
228
public void onNext (final T event ) {
257
- consumptionExecutor . submit (subscriberId , event );
258
- this . subscription .request (1 );
229
+ subscriberExecutor . submitEventForInterest (subscriberId , event );
230
+ subscription .request (1 );
259
231
}
260
232
261
233
@ Override
262
- public void onError (final Throwable throwable ) {
263
- observer .onError (throwable );
234
+ public void onError (final Throwable error ) {
235
+ observer .onError (error );
264
236
}
265
237
266
238
@ Override
@@ -275,30 +247,31 @@ public void onComplete() {
275
247
}
276
248
277
249
@ SuppressWarnings ("unchecked" )
278
- private <T > SubmissionPublisher <T > getPublisher (final Class <T > eventType ) {
279
- return (SubmissionPublisher <T >) publishers .computeIfAbsent (eventType ,
250
+ private <T > SubmissionPublisher <T > getPublisherForEventType (final Class <T > eventType ) {
251
+ return (SubmissionPublisher <T >) eventTypeToPublisherMap .computeIfAbsent (eventType ,
280
252
key -> new SubmissionPublisher <>(Runnable ::run , Flow .defaultBufferSize ()));
281
253
}
282
254
283
- private <T > void registerSubscriberCallback (final EventObserver < T > subscriber , final String subscriberId ) {
284
- Activator . getLogger (). info ( subscriberId );
255
+ private <T > void registerSubscriberCallbackForInterest (final String interestId ,
256
+ final EventObserver < T > observer ) {
285
257
TypedCallable <T > eventCallback = new TypedCallable <>() {
286
258
@ Override
287
- public void call (final T event ) {
288
- subscriber .onEvent (event );
259
+ public void callWith (final T event ) {
260
+ observer .onEvent (event );
289
261
}
290
262
};
291
- consumptionExecutor . registerCallback ( subscriberId , eventCallback );
263
+ subscriberExecutor . registerCallbackForInterest ( interestId , eventCallback );
292
264
}
293
265
294
- private <T > void registerPublisherCallback (final SubmissionPublisher <T > publisher , final String eventId ) {
266
+ private <T > void registerPublisherCallbackForInterest (final String interestId ,
267
+ final SubmissionPublisher <T > publisher ) {
295
268
TypedCallable <T > eventCallback = new TypedCallable <>() {
296
269
@ Override
297
- public void call (final T event ) {
270
+ public void callWith (final T event ) {
298
271
publisher .submit (event );
299
272
}
300
273
};
301
- emissionExecutor . registerCallback ( eventId , eventCallback );
274
+ publisherExecutor . registerCallbackForInterest ( interestId , eventCallback );
302
275
}
303
276
304
277
}
0 commit comments