Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not block other threads from health checking if a stream is blocked #34283

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
Expand Down Expand Up @@ -83,6 +84,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
private final String backendWorkerToken;
private final ResettableThrowingStreamObserver<RequestT> requestObserver;
private final StreamDebugMetrics debugMetrics;
private final AtomicBoolean isHealthCheckScheduled;

@GuardedBy("this")
protected boolean clientClosed;
Expand Down Expand Up @@ -115,6 +117,7 @@ protected AbstractWindmillStream(
this.clientClosed = false;
this.isShutdown = false;
this.started = false;
this.isHealthCheckScheduled = new AtomicBoolean(false);
this.finishLatch = new CountDownLatch(1);
this.logger = logger;
this.requestObserver =
Expand Down Expand Up @@ -236,13 +239,35 @@ protected final void executeSafely(Runnable runnable) {
}
}

public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) {
try {
sendHealthCheck();
} catch (Exception e) {
logger.debug("Received exception sending health check.", e);
}
/**
* Schedule an application level keep-alive health check to be sent on the stream.
*
* @implNote This is sent asynchronously via an executor to minimize blocking. Messages are sent
* serially. If we recently sent a message before we attempt to schedule the health check, the
* stream has been restarted/closed, there is a scheduled health check that hasn't completed
* or there was a more recent send by the time we enter the synchronized block, we skip the
* attempt to send the scheduled health check.
*/
public final void maybeScheduleHealthCheck(Instant lastSendThreshold) {
if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()
&& isHealthCheckScheduled.compareAndSet(false, true)) {
// Don't block other streams when sending health check.
executeSafely(
() -> {
synchronized (this) {
if (!clientClosed
&& debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) {
try {
sendHealthCheck();
} catch (Exception e) {
logger.debug("Received exception sending health check.", e);
}
}

// Ready to send another health check after we attempt the scheduled health check.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like it would be a little safer to move the try to cover all of this lambda
so that we are sure to set it back to false.

try {
synchronized () { ... maybe send }
} catch {
log
} finally {
isHealthCheckScheduled.set(false);
}

isHealthCheckScheduled.set(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about showing if health check is scheduled in the appendSummaryHtml?

}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ protected boolean hasPendingRequests() {
}

@Override
public void sendHealthCheck() throws WindmillStreamShutdownException {
protected void sendHealthCheck() throws WindmillStreamShutdownException {
if (hasPendingRequests()) {
StreamingCommitWorkRequest.Builder builder = StreamingCommitWorkRequest.newBuilder();
builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public void appendSpecificHtml(PrintWriter writer) {
}

@Override
public void sendHealthCheck() throws WindmillStreamShutdownException {
protected void sendHealthCheck() throws WindmillStreamShutdownException {
trySend(HEALTH_CHECK_REQUEST);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp
}

@Override
public void sendHealthCheck() throws WindmillStreamShutdownException {
protected void sendHealthCheck() throws WindmillStreamShutdownException {
if (hasPendingRequests()) {
trySend(HEALTH_CHECK_REQUEST);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void appendSpecificHtml(PrintWriter writer) {
}

@Override
public void sendHealthCheck() throws WindmillStreamShutdownException {
protected void sendHealthCheck() throws WindmillStreamShutdownException {
trySend(HEALTH_CHECK);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void run() {
Instant reportThreshold =
Instant.now().minus(Duration.millis(healthCheckIntervalMillis));
for (AbstractWindmillStream<?, ?> stream : streamFactory.streamRegistry) {
stream.maybeSendHealthCheck(reportThreshold);
stream.maybeScheduleHealthCheck(reportThreshold);
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
Expand All @@ -61,42 +64,9 @@ private TestStream newStream(

@Test
public void testShutdown_notBlockedBySend() throws InterruptedException, ExecutionException {
CountDownLatch sendBlocker = new CountDownLatch(1);
TestCallStreamObserver callStreamObserver = new TestCallStreamObserver(/* waitForSend= */ true);
Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
ignored ->
new CallStreamObserver<Integer>() {
@Override
public void onNext(Integer integer) {
try {
sendBlocker.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
public void onError(Throwable throwable) {}

@Override
public void onCompleted() {}

@Override
public boolean isReady() {
return false;
}

@Override
public void setOnReadyHandler(Runnable runnable) {}

@Override
public void disableAutoInboundFlowControl() {}

@Override
public void request(int i) {}

@Override
public void setMessageCompression(boolean b) {}
};
ignored -> callStreamObserver;

TestStream testStream = newStream(clientFactory);
testStream.start();
Expand All @@ -110,12 +80,67 @@ public void setMessageCompression(boolean b) {}
// Sleep a bit to give sendExecutor time to execute the send().
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);

sendBlocker.countDown();
callStreamObserver.unblockSend();
assertThat(sendFuture.get()).isInstanceOf(WindmillStreamShutdownException.class);
}

@Test
public void testMaybeScheduleHealthCheck() {
TestCallStreamObserver callStreamObserver =
new TestCallStreamObserver(/* waitForSend= */ false);
Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
ignored -> callStreamObserver;

TestStream testStream = newStream(clientFactory);
testStream.start();
Instant reportingThreshold = Instant.now().minus(Duration.millis(1));

testStream.maybeScheduleHealthCheck(reportingThreshold);
testStream.waitForHealthChecks(1);
assertThat(testStream.numHealthChecks.get()).isEqualTo(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if this is going to be flaky since lambda queued by trySend could possibly not have actually executed yet. Seems like it could be safer to loop while it is not yet 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

testStream.shutdown();
}

@Test
public void testMaybeSendHealthCheck_doesNotSendIfLastScheduleLessThanThreshold() {
TestCallStreamObserver callStreamObserver =
new TestCallStreamObserver(/* waitForSend= */ false);
Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
ignored -> callStreamObserver;

TestStream testStream = newStream(clientFactory);
testStream.start();

try {
testStream.trySend(1);
} catch (WindmillStreamShutdownException e) {
throw new RuntimeException(e);
}

// Sleep a bit to give sendExecutor time to execute the send().
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't trySend send inline? can this be removed?

Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);

// Set a really long reporting threshold.
Instant reportingThreshold = Instant.now().minus(Duration.standardHours(1));

// Should not send health checks since we just sent the above message.
testStream.maybeScheduleHealthCheck(reportingThreshold);
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
testStream.maybeScheduleHealthCheck(reportingThreshold);
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);

callStreamObserver.waitForSend();
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Sleep just to ensure an async health check doesn't show up


assertThat(testStream.numHealthChecks.get()).isEqualTo(0);
testStream.shutdown();
}

private static class TestStream extends AbstractWindmillStream<Integer, Integer> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStreamTest.class);

private final AtomicInteger numStarts = new AtomicInteger();
private final AtomicInteger numHealthChecks = new AtomicInteger();

private TestStream(
Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory,
Expand Down Expand Up @@ -148,19 +173,90 @@ protected boolean hasPendingRequests() {
@Override
protected void startThrottleTimer() {}

public void testSend(Integer i)
throws ResettableThrowingStreamObserver.StreamClosedException,
WindmillStreamShutdownException {
public void testSend(Integer i) throws WindmillStreamShutdownException {
trySend(i);
}

@Override
protected void sendHealthCheck() {}
protected void sendHealthCheck() {
numHealthChecks.incrementAndGet();
}

/**
 * Blocks the calling thread until at least {@code expectedHealthChecks} health checks have been
 * recorded in {@code numHealthChecks}, polling every 100ms and logging progress on each pass.
 *
 * @param expectedHealthChecks minimum number of health checks to wait for.
 */
private void waitForHealthChecks(int expectedHealthChecks) {
  int waitedMillis = 0;
  while (numHealthChecks.get() < expectedHealthChecks) {
    // Argument order matches the placeholders: elapsed time, expected count, current count.
    LOG.info(
        "Waited for {}ms for {} health checks. Current health check count is {}.",
        waitedMillis,
        expectedHealthChecks,
        numHealthChecks.get());
    Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
    // Track elapsed time so the next log line reports how long we have actually waited.
    waitedMillis += 100;
  }
}

@Override
protected void appendSpecificHtml(PrintWriter writer) {}

@Override
protected void shutdownInternal() {}
}

private static class TestCallStreamObserver extends CallStreamObserver<Integer> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStreamTest.class);
private final CountDownLatch sendBlocker = new CountDownLatch(1);

private final boolean waitForSend;

private TestCallStreamObserver(boolean waitForSend) {
this.waitForSend = waitForSend;
}

/**
 * Counts down {@code sendBlocker}, releasing any thread currently parked in {@link
 * #waitForSend()}.
 */
private void unblockSend() {
sendBlocker.countDown();
}

private void waitForSend() {
try {
int waitedMillis = 0;
while (!sendBlocker.await(100, TimeUnit.MILLISECONDS)) {
waitedMillis += 100;
LOG.info("Waiting from send for {}ms", waitedMillis);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think sendBlocker is being used two different ways and is a little confusing.

How about renaming this method to waitForSendUnblocked and logging here that you are waiting for send to be unblocked?
Then below in onNext, instead of decrementing sendBlocker, you could increment a count of messages received and add a new method to wait for N messages to be sent.

}
} catch (InterruptedException e) {
LOG.error("Interrupted waiting for send().");
}
}

/**
 * Handles an outgoing message. When this observer was created with {@code waitForSend == true},
 * the call parks in {@link #waitForSend()} until the latch is released (or the wait is
 * interrupted); otherwise it counts down {@code sendBlocker} to signal that a send happened.
 */
@Override
public void onNext(Integer integer) {
  if (!waitForSend) {
    // Non-blocking mode: just record that a message went through.
    sendBlocker.countDown();
    return;
  }
  waitForSend();
}

@Override
public void onError(Throwable throwable) {}

@Override
public void onCompleted() {}

@Override
public boolean isReady() {
return true;
}

@Override
public void setOnReadyHandler(Runnable runnable) {}

@Override
public void disableAutoInboundFlowControl() {}

@Override
public void request(int i) {}

@Override
public void setMessageCompression(boolean b) {}
}
}
Loading