Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Bus in Native Java #324

Closed
wants to merge 16 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixed issues with type identification
taldekar committed Jan 17, 2025
commit b915d7641a7b423bb0eb454c3e2f5d4577e03a5e
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.RejectedExecutionException;
@@ -22,6 +23,7 @@

import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.observers.StreamObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;

public final class EventBroker {

@@ -56,10 +58,10 @@ public void rejectedExecution(final Runnable runnable, final ThreadPoolExecutor

private class OrderedThreadPoolExecutor {

private final Map<Class<?>, BlockingQueue<?>> typedEventQueue;
private final Map<Class<?>, AtomicBoolean> typedJobStatus;
private final Map<Class<?>, ReentrantLock> typedJobLock;
private final Map<Class<?>, TypedCallable<?>> typedCallback;
private final Map<String, BlockingQueue<?>> typedEventQueue;
private final Map<String, AtomicBoolean> typedJobStatus;
private final Map<String, ReentrantLock> typedJobLock;
private final Map<String, TypedCallable<?>> typedCallback;

private final BlockingQueue<Runnable> workQueue;
private final ThreadPoolExecutor executor;
@@ -79,67 +81,80 @@ private class OrderedThreadPoolExecutor {
workQueue, Executors.defaultThreadFactory(), new BlockingCallerRunsPolicy(workQueue));
}

public <T, R> void registerCallback(final Class<T> interestType, final TypedCallable<R> callback) {
typedCallback.putIfAbsent(interestType, callback);
public <T, R> void registerCallback(final String interestId, final TypedCallable<R> callback) {
typedCallback.putIfAbsent(interestId, callback);
}

public <T> boolean hasRegisteredCallback(final Class<T> interestType) {
public <T> boolean hasRegisteredCallback(final String interestType) {
return typedCallback.containsKey(interestType);
}

@SuppressWarnings("unchecked")
public <T, R> void submit(final Class<T> interestType, final R event) {
BlockingQueue<R> eventQueue = (BlockingQueue<R>) typedEventQueue.computeIfAbsent(interestType,
k -> new ArrayBlockingQueue<>(eventQueueCapacity));
eventQueue.offer(event);
public <T, R> void submit(final String interestId, final R event) {
BlockingQueue<R> eventQueue = (BlockingQueue<R>) typedEventQueue.computeIfAbsent(interestId,
k -> new ArrayBlockingQueue<>(eventQueueCapacity, true));
try {
eventQueue.put(event);
} catch (InterruptedException e) {
e.printStackTrace();
}

handleScheduling(interestType, event.getClass());
handleScheduling(interestId, (Class<R>) event.getClass(), eventQueue);
}

public <T, R> void handleScheduling(final Class<T> interestType, final Class<R> eventType) {
AtomicBoolean jobStatus = typedJobStatus.computeIfAbsent(interestType, k -> new AtomicBoolean(false));
ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestType, k -> new ReentrantLock(true));

if (!jobStatus.get()) {
jobLock.lock();
public <T, R> void handleScheduling(final String interestId, final Class<R> eventType,
final BlockingQueue<R> eventQueue) {
AtomicBoolean jobStatus = typedJobStatus.computeIfAbsent(interestId, k -> new AtomicBoolean(false));
ReentrantLock jobLock = typedJobLock.computeIfAbsent(interestId, k -> new ReentrantLock(true));

try {
jobLock.lock();
try {
if (!jobStatus.get() && !eventQueue.isEmpty()) {
if (jobStatus.compareAndSet(false, true)) {
executor.submit(() -> processEventQueue(interestType, eventType));
executor.submit(() -> processEventQueue(interestId, eventType,
eventQueue, jobStatus, jobLock));
}
} finally {
jobLock.unlock();
}
} finally {
jobLock.unlock();
}
}

@SuppressWarnings("unchecked")
public <T, R> void processEventQueue(final Class<T> interestType, final Class<R> eventType) {
TypedCallable<R> eventCallback = (TypedCallable<R>) typedCallback.get(interestType);
if (eventCallback == null) {
return;
}

AtomicBoolean jobStatus = typedJobStatus.get(interestType);
ReentrantLock jobLock = typedJobLock.get(interestType);
BlockingQueue<R> eventQueue = (BlockingQueue<R>) typedEventQueue.get(interestType);

public <T, R> void processEventQueue(final String interestId, final Class<R> eventType,
final BlockingQueue<R> eventQueue, final AtomicBoolean jobStatus, final ReentrantLock jobLock) {
if (jobStatus == null || jobLock == null || eventQueue == null) {
throw new NullPointerException("ThreadPoolExecutor in unexpected state");
}

jobLock.lock();
try {
TypedCallable<R> eventCallback = (TypedCallable<R>) typedCallback.get(interestId);
if (eventCallback == null) {
return;
}

while (!eventQueue.isEmpty()) {
R newEvent = eventQueue.poll();

if (newEvent != null) {
eventCallback.call(newEvent);
while (!eventQueue.isEmpty()) {
try {
R newEvent = eventQueue.take();
if (newEvent != null) {
try {
eventCallback.call(newEvent);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
try {
jobStatus.set(false);
} finally {
jobLock.unlock();
}
}

jobStatus.set(false);
jobLock.unlock();
}
}

@@ -155,8 +170,8 @@ public <T, R> void processEventQueue(final Class<T> interestType, final Class<R>
private EventBroker() {
publishers = new ConcurrentHashMap<>();

emissionExecutor = new OrderedThreadPoolExecutor(10, 10, 10, 10, 10);
consumptionExecutor = new OrderedThreadPoolExecutor(10, 10, 10, 10, 10);
emissionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 10, 100000000);
consumptionExecutor = new OrderedThreadPoolExecutor(5, 30, 30, 10, 100000000);
}

public static EventBroker getInstance() {
@@ -170,23 +185,23 @@ public <T> void post(final T event) {
}

SubmissionPublisher<T> publisher = getPublisher((Class<T>) event.getClass());
if (!emissionExecutor.hasRegisteredCallback(event.getClass())) {
registerPublisherCallback(publisher, (Class<T>) event.getClass());
if (!emissionExecutor.hasRegisteredCallback((event.getClass().getName()))) {
registerPublisherCallback(publisher, event.getClass().getName());
}

emissionExecutor.submit(event.getClass(), event);
emissionExecutor.submit(event.getClass().getName(), event);
}

public <T> Subscription subscribe(final EventObserver<T> observer) {
SubmissionPublisher<T> publisher = getPublisher(observer.getEventType());
AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
Class<?> subscriberToken = createUniqueSubscriberClassToken();
String subscriberId = UUID.randomUUID().toString();

registerSubscriberCallback(observer, subscriberToken);
registerSubscriberCallback(observer, subscriberId);

Subscriber<T> subscriber = new Subscriber<>() {

private Subscription subscription;
private volatile Subscription subscription;

@Override
public void onSubscribe(final Subscription subscription) {
@@ -198,7 +213,7 @@ public void onSubscribe(final Subscription subscription) {

@Override
public void onNext(final T event) {
consumptionExecutor.submit(subscriberToken, event);
consumptionExecutor.submit(subscriberId, event);
this.subscription.request(1);
}

@@ -221,13 +236,13 @@ public void onComplete() {
public <T> Subscription subscribe(final StreamObserver<T> observer) {
SubmissionPublisher<T> publisher = getPublisher(observer.getEventType());
AtomicReference<Subscription> subscriptionReference = new AtomicReference<>();
Class<?> subscriberToken = createUniqueSubscriberClassToken();
String subscriberId = UUID.randomUUID().toString();

registerSubscriberCallback(observer, subscriberToken);
registerSubscriberCallback(observer, subscriberId);

Subscriber<T> subscriber = new Subscriber<>() {

private Subscription subscription;
private volatile Subscription subscription;

@Override
public void onSubscribe(final Subscription subscription) {
@@ -239,7 +254,7 @@ public void onSubscribe(final Subscription subscription) {

@Override
public void onNext(final T event) {
consumptionExecutor.submit(subscriberToken, event);
consumptionExecutor.submit(subscriberId, event);
this.subscription.request(1);
}

@@ -262,33 +277,28 @@ public void onComplete() {
@SuppressWarnings("unchecked")
private <T> SubmissionPublisher<T> getPublisher(final Class<T> eventType) {
return (SubmissionPublisher<T>) publishers.computeIfAbsent(eventType,
key -> new SubmissionPublisher<>());
}

private Class<?> createUniqueSubscriberClassToken() {
return new Object() {
private static final String ID = UUID.randomUUID().toString();
}.getClass();
key -> new SubmissionPublisher<>(Runnable::run, Flow.defaultBufferSize()));
}

private <T> void registerSubscriberCallback(final EventObserver<T> subscriber, final Class<?> subscriberToken) {
private <T> void registerSubscriberCallback(final EventObserver<T> subscriber, final String subscriberId) {
Activator.getLogger().info(subscriberId);
TypedCallable<T> eventCallback = new TypedCallable<>() {
@Override
public void call(final T event) {
subscriber.onEvent(event);
}
};
consumptionExecutor.registerCallback(subscriberToken, eventCallback);
consumptionExecutor.registerCallback(subscriberId, eventCallback);
}

private <T> void registerPublisherCallback(final SubmissionPublisher<T> publisher, final Class<T> eventType) {
private <T> void registerPublisherCallback(final SubmissionPublisher<T> publisher, final String eventId) {
TypedCallable<T> eventCallback = new TypedCallable<>() {
@Override
public void call(final T event) {
publisher.submit(event);
}
};
emissionExecutor.registerCallback(eventType, eventCallback);
emissionExecutor.registerCallback(eventId, eventCallback);
}

}
Original file line number Diff line number Diff line change
@@ -5,13 +5,19 @@

public final class TestEvent {
private final String message;
private final int sequenceNumber;

public TestEvent(final String message) {
public TestEvent(final String message, final int sequenceNumber) {
this.message = message;
this.sequenceNumber = sequenceNumber;
}

public String getMessage() {
return message;
}

public int getSequenceNumber() {
return sequenceNumber;
}

}
Original file line number Diff line number Diff line change
@@ -3,22 +3,27 @@

package software.aws.toolkits.eclipse.amazonq.plugin;

import java.util.ArrayList;
import java.util.List;

import org.eclipse.ui.plugin.AbstractUIPlugin;
import org.osgi.framework.BundleContext;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.chat.ChatStateManager;
import software.aws.toolkits.eclipse.amazonq.configuration.DefaultPluginStore;
import software.aws.toolkits.eclipse.amazonq.configuration.PluginStore;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.DefaultLoginService;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.LoginService;
import software.aws.toolkits.eclipse.amazonq.providers.LspProvider;
import software.aws.toolkits.eclipse.amazonq.providers.LspProviderImpl;
import software.aws.toolkits.eclipse.amazonq.subscriber.TestSubscribers;
import software.aws.toolkits.eclipse.amazonq.telemetry.service.DefaultTelemetryService;
import software.aws.toolkits.eclipse.amazonq.telemetry.service.TelemetryService;
import software.aws.toolkits.eclipse.amazonq.util.PluginLogger;
import software.aws.toolkits.eclipse.amazonq.chat.ChatStateManager;
import software.aws.toolkits.eclipse.amazonq.util.CodeReferenceLoggingService;
import software.aws.toolkits.eclipse.amazonq.util.DefaultCodeReferenceLoggingService;
import software.aws.toolkits.eclipse.amazonq.util.LoggingService;
import software.aws.toolkits.eclipse.amazonq.util.PluginLogger;

public class Activator extends AbstractUIPlugin {

@@ -44,6 +49,14 @@ public Activator() {
.initializeOnStartUp()
.build();
codeReferenceLoggingService = DefaultCodeReferenceLoggingService.getInstance();

List<TestSubscribers> testSubscriberList = new ArrayList<>(3);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect Activator to be the source of truth for event broker subscriptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this was just to test that the event bus is working. Subscriptions can come from anywhere in the codebase.


for (int i = 0; i < 3; ++i) {
TestSubscribers testSubsciber = new TestSubscribers();
testSubscriberList.add(testSubsciber);
EventBroker.getInstance().subscribe(testSubsciber);
}
}

@Override
Original file line number Diff line number Diff line change
@@ -14,8 +14,8 @@ public TestPublisher() {
Thread.sleep(5000);
EventBroker eventBroker = EventBroker.getInstance();

for (int i = 0; i < 10; i++) {
eventBroker.post(new TestEvent("Test Event " + i));
for (int i = 0; i < 100000; i++) {
eventBroker.post(new TestEvent("Test Event " + i, i));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package software.aws.toolkits.eclipse.amazonq.subscriber;

import software.aws.toolkits.eclipse.amazonq.events.TestEvent;
import software.aws.toolkits.eclipse.amazonq.observers.EventObserver;
import software.aws.toolkits.eclipse.amazonq.plugin.Activator;

public final class TestSubscribers implements EventObserver<TestEvent> {

private int previousSequenceNumber = -1;

public TestSubscribers() {

}

@Override
public void onEvent(final TestEvent event) {
Activator.getLogger().info(event.getMessage());

if (event.getSequenceNumber() - previousSequenceNumber != 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am unclear on the significance of the sequence number here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sequence numbers are more for testing the POC on my end. They helped ensure that the events arrived in the order in which they were published to all subscribers.

Activator.getLogger().info("OUT OF ORDER: " + event.getSequenceNumber() + " " + previousSequenceNumber);
}

previousSequenceNumber = event.getSequenceNumber();
}
}
Original file line number Diff line number Diff line change
@@ -9,7 +9,6 @@
import org.eclipse.swt.widgets.Display;
import org.eclipse.ui.part.ViewPart;

import software.aws.toolkits.eclipse.amazonq.broker.EventBroker;
import software.aws.toolkits.eclipse.amazonq.controllers.AmazonQViewController;
import software.aws.toolkits.eclipse.amazonq.events.TestEvent;
import software.aws.toolkits.eclipse.amazonq.lsp.auth.AuthStatusChangedListener;
@@ -30,7 +29,7 @@ public abstract class AmazonQView extends ViewPart implements AuthStatusChangedL
protected AmazonQView() {
this.viewController = new AmazonQViewController();
new TestPublisher();
EventBroker.getInstance().subscribe(this);
// EventBroker.getInstance().subscribe(this);
}

public final Browser getBrowser() {