Skip to content

Commit

Permalink
Add utility function to check if an executor is direct or not
Browse files Browse the repository at this point in the history
Don't move futures to different executor if it's a direct executor
  • Loading branch information
krka committed Sep 26, 2019
1 parent b7395b3 commit e4f772b
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class DefaultRawMemcacheClient extends AbstractRawMemcacheClient {
private final BatchFlusher flusher;
private final HostAndPort address;
private final Executor executor;
private final boolean isDirectExecutor;
private final long timeoutMillis;
private final Metrics metrics;
private final int maxSetLength;
Expand Down Expand Up @@ -194,6 +195,7 @@ private DefaultRawMemcacheClient(
int maxSetLength) {
this.address = address;
this.executor = executor;
this.isDirectExecutor = executor == null || Utils.isDirectExecutor(executor);
this.timeoutMillis = timeoutMillis;
this.metrics = metrics;
this.maxSetLength = maxSetLength;
Expand Down Expand Up @@ -247,7 +249,11 @@ public <T> CompletionStage<T> send(final Request<T> request) {
}

private <T> CompletionStage<T> onExecutor(CompletionStage<T> future) {
return Utils.onExecutor(future, executor);
if (isDirectExecutor) {
return future;
} else {
return Utils.onExecutor(future, executor);
}
}

/**
Expand Down
41 changes: 41 additions & 0 deletions folsom/src/main/java/com/spotify/folsom/client/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.concurrent.Executor;
import java.util.function.Function;

import static java.util.Objects.requireNonNull;

public final class Utils {

private static final int TTL_CUTOFF = 60 * 60 * 24 * 30;
Expand Down Expand Up @@ -72,4 +74,43 @@ static <T> CompletionStage<T> onExecutor(CompletionStage<T> future, Executor exe
}
return future.whenCompleteAsync((t, throwable) -> {}, executor);
}

/**
* Checks if an executor is direct or not. Direct means that the executor executes work on the
* same thread as the work was submitted on.
*
* @param executor may not be null
* @return true if the executor is a direct executor, otherwise false
*/
static boolean isDirectExecutor(Executor executor) {
requireNonNull(executor);
RunnableWithThread runnableWithThread = new RunnableWithThread();
try {
executor.execute(runnableWithThread);

// We have the following cases
// 1) It is a direct executor, so runnableWithThread.thread will be set to the current thread.
// We will correctly return true
// 2) It is not a direct executor and runnableWithThread.thread is still null when we check
// it.
// We correctly return false
// 3) It is not a direct executor but the runnableWithThread.thread somehow managed to get set
// before we check it. In any case, it can't be referencing the same thread we are in
// We correctly return false

return runnableWithThread.thread == Thread.currentThread();
} catch (Exception e) {
// If the executor throws any exception, it can not be a direct executor
return false;
}
}

private static class RunnableWithThread implements Runnable {
private Thread thread;

@Override
public void run() {
thread = Thread.currentThread();
}
}
}
23 changes: 23 additions & 0 deletions folsom/src/test/java/com/spotify/folsom/client/UtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

import static org.junit.Assert.assertEquals;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import org.junit.Test;

public class UtilsTest {
Expand Down Expand Up @@ -70,4 +73,24 @@ public void testOnExecutorException() throws ExecutionException, InterruptedExce
});
assertEquals("thread-B-0", future2.get());
}

@Test
public void testIsDirectExecutor() {
assertDirect(MoreExecutors.directExecutor(), true);
assertDirect(MoreExecutors.newDirectExecutorService(), true);

assertDirect(ForkJoinPool.commonPool(), false);
assertDirect(Executors.newFixedThreadPool(1), false);
assertDirect(Executors.newFixedThreadPool(10), false);
assertDirect(Executors.newSingleThreadExecutor(), false);
assertDirect(Executors.newCachedThreadPool(), false);
assertDirect(Executors.newWorkStealingPool(), false);
assertDirect(Executors.newWorkStealingPool(10), false);
}

private void assertDirect(Executor executor, boolean expected) {
for (int i = 0; i < 1000000; i++) {
assertEquals(expected, Utils.isDirectExecutor(executor));
}
}
}

0 comments on commit e4f772b

Please sign in to comment.