Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add utility function to check if an executor is direct or not #143

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class DefaultRawMemcacheClient extends AbstractRawMemcacheClient {
private final BatchFlusher flusher;
private final HostAndPort address;
private final Executor executor;
private final boolean isDirectExecutor;
private final long timeoutMillis;
private final Metrics metrics;
private final int maxSetLength;
Expand Down Expand Up @@ -194,6 +195,7 @@ private DefaultRawMemcacheClient(
int maxSetLength) {
this.address = address;
this.executor = executor;
this.isDirectExecutor = executor == null || Utils.isDirectExecutor(executor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpicking here, but given isDirectExecutor really meaning "is a direct executor or null", how about rather naming it something like "wrapWithExecutor"? (I'm terrible at naming so I'm sure an even better name would be available)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that's a good idea

this.timeoutMillis = timeoutMillis;
this.metrics = metrics;
this.maxSetLength = maxSetLength;
Expand Down Expand Up @@ -247,7 +249,11 @@ public <T> CompletionStage<T> send(final Request<T> request) {
}

private <T> CompletionStage<T> onExecutor(CompletionStage<T> future) {
return Utils.onExecutor(future, executor);
if (isDirectExecutor) {
return future;
} else {
return Utils.onExecutor(future, executor);
}
}

/**
Expand Down
41 changes: 41 additions & 0 deletions folsom/src/main/java/com/spotify/folsom/client/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.spotify.folsom.client;

import static java.util.Objects.requireNonNull;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.List;
Expand Down Expand Up @@ -72,4 +74,43 @@ static <T> CompletionStage<T> onExecutor(CompletionStage<T> future, Executor exe
}
return future.whenCompleteAsync((t, throwable) -> {}, executor);
}

/**
* Checks if an executor is direct or not. Direct means that the executor executes work on the
* same thread as the work was submitted on.
*
* @param executor may not be null
* @return true if the executor is a direct executor, otherwise false
*/
static boolean isDirectExecutor(Executor executor) {
requireNonNull(executor);
RunnableWithThread runnableWithThread = new RunnableWithThread();
try {
executor.execute(runnableWithThread);

// We have the following cases
// 1) It is a direct executor, so runnableWithThread.thread will be set to the current thread.
// We will correctly return true
// 2) It is not a direct executor and runnableWithThread.thread is still null when we check
// it.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpicking: wrong indentation

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops

// We correctly return false
// 3) It is not a direct executor but the runnableWithThread.thread somehow managed to get set
// before we check it. In any case, it can't be referencing the same thread we are in
// We correctly return false

return runnableWithThread.thread == Thread.currentThread();
} catch (Exception e) {
// If the executor throws any exception, it can not be a direct executor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this as safe assumption? Looking at the Guava DirectExecutorService, it can throw exceptions here, but only if the executor is shutting down, which is probably fine either way. But, would we want to assume other direct executors would behave the same way?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, probably not a safe assumption. It depends a bit on what the function is used for. If it's only for optimization purposes and sanity checks it may still be good enough

return false;
}
}

private static class RunnableWithThread implements Runnable {
private Thread thread;

@Override
public void run() {
thread = Thread.currentThread();
}
}
}
23 changes: 23 additions & 0 deletions folsom/src/test/java/com/spotify/folsom/client/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import static org.junit.Assert.assertEquals;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import org.junit.Test;

public class UtilsTest {
Expand Down Expand Up @@ -70,4 +73,24 @@ public void testOnExecutorException() throws ExecutionException, InterruptedExce
});
assertEquals("thread-B-0", future2.get());
}

@Test
public void testIsDirectExecutor() {
assertDirect(MoreExecutors.directExecutor(), true);
assertDirect(MoreExecutors.newDirectExecutorService(), true);

assertDirect(ForkJoinPool.commonPool(), false);
assertDirect(Executors.newFixedThreadPool(1), false);
assertDirect(Executors.newFixedThreadPool(10), false);
assertDirect(Executors.newSingleThreadExecutor(), false);
assertDirect(Executors.newCachedThreadPool(), false);
assertDirect(Executors.newWorkStealingPool(), false);
assertDirect(Executors.newWorkStealingPool(10), false);
}

private void assertDirect(Executor executor, boolean expected) {
for (int i = 0; i < 1000000; i++) {
assertEquals(expected, Utils.isDirectExecutor(executor));
}
}
}