diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java index 3cfc522b2ae..7c7e9ea5ff2 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/DefaultHealthCheckerContext.java @@ -186,7 +186,7 @@ public void updateHealth(double health) { } @Override - public void updateHealth(double health, ClientRequestContext ctx, + public void updateHealth(double health, @Nullable ClientRequestContext ctx, @Nullable ResponseHeaders headers, @Nullable Throwable cause) { final boolean isHealthy = health > 0; if (headers != null && headers.contains("x-envoy-degraded")) { diff --git a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupBuilder.java b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupBuilder.java index 5e950cc3ca7..e055377cbfb 100644 --- a/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupBuilder.java +++ b/core/src/main/java/com/linecorp/armeria/client/endpoint/healthcheck/HealthCheckedEndpointGroupBuilder.java @@ -23,7 +23,7 @@ import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.endpoint.EndpointGroup; import com.linecorp.armeria.common.util.AsyncCloseable; -import com.linecorp.armeria.internal.client.endpoint.healthcheck.HttpHealthChecker; +import com.linecorp.armeria.internal.client.endpoint.healthcheck.DefaultHttpHealthChecker; /** * A builder for creating a new {@link HealthCheckedEndpointGroup} that sends HTTP health check requests. @@ -73,8 +73,8 @@ private static class HttpHealthCheckerFactory implements Function 0) { + headers = builder.add(HttpHeaderNames.IF_NONE_MATCH, + wasHealthy ? "\"healthy\"" : "\"unhealthy\"") + .add(HttpHeaderNames.PREFER, "wait=" + maxLongPollingSeconds) + .build(); + } else { + headers = builder.build(); + } + + try (ClientRequestContextCaptor reqCtxCaptor = Clients.newContextCaptor()) { + lastResponse = webClient.execute(headers); + final ClientRequestContext reqCtx = reqCtxCaptor.get(); + lastResponse.subscribe(new HealthCheckResponseSubscriber(reqCtx, lastResponse), + reqCtx.eventLoop().withoutContext(), + SubscriptionOption.WITH_POOLED_OBJECTS); + } + } finally { + unlock(); + } + } + + @Override + public CompletableFuture closeAsync() { + return closeable.closeAsync(); + } + + private synchronized void closeAsync(CompletableFuture future) { + lock(); + try { + if (lastResponse == null) { + // Called even before the first request is sent. + future.complete(null); + } else { + lastResponse.abort(); + lastResponse.whenComplete().handle((unused1, unused2) -> future.complete(null)); + } + } finally { + unlock(); + } + } + + @Override + public void close() { + closeable.close(); + } + + private final class ResponseTimeoutUpdater extends SimpleDecoratingHttpClient { + ResponseTimeoutUpdater(HttpClient delegate) { + super(delegate); + } + + @Override + public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception { + if (maxLongPollingSeconds > 0) { + ctx.setResponseTimeoutMillis(TimeoutMode.EXTEND, + TimeUnit.SECONDS.toMillis(maxLongPollingSeconds)); + } + return unwrap().execute(ctx, req); + } + } + + private class HealthCheckResponseSubscriber implements Subscriber { + + private final ClientRequestContext reqCtx; + private final HttpResponse res; + @Nullable + private Subscription subscription; + @Nullable + private ResponseHeaders responseHeaders; + private boolean isHealthy; + private boolean receivedExpectedResponse; + private boolean updatedHealth; + + @Nullable + private ScheduledFuture pingCheckFuture; + private long lastPingTimeNanos; + + HealthCheckResponseSubscriber(ClientRequestContext reqCtx, HttpResponse res) { + this.reqCtx = reqCtx; + this.res = res; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + maybeSchedulePingCheck(); + } + + @Override + public void onNext(HttpObject obj) { + assert subscription != null; + + if (closeable.isClosing()) { + subscription.cancel(); + return; + } + + try { + if (!(obj instanceof ResponseHeaders)) { + PooledObjects.close(obj); + return; + } + + final ResponseHeaders headers = (ResponseHeaders) obj; + responseHeaders = headers; + updateLongPollingSettings(headers); + + final HttpStatus status = headers.status(); + final HttpStatusClass statusClass = status.codeClass(); + switch (statusClass) { + case INFORMATIONAL: + maybeSchedulePingCheck(); + break; + case SERVER_ERROR: + receivedExpectedResponse = true; + break; + case SUCCESS: + isHealthy = true; + receivedExpectedResponse = true; + break; + default: + if (status == HttpStatus.NOT_MODIFIED) { + isHealthy = wasHealthy; + receivedExpectedResponse = true; + } else { + // Do not use long polling on an unexpected status for safety. + maxLongPollingSeconds = 0; + + if (statusClass == HttpStatusClass.CLIENT_ERROR) { + logger.warn("{} Unexpected 4xx health check response: {} A 4xx response " + + "generally indicates a misconfiguration of the client. " + + "Did you happen to forget to configure the {}'s client options?", + reqCtx, headers, HealthCheckedEndpointGroup.class.getSimpleName()); + } else { + logger.warn("{} Unexpected health check response: {}", reqCtx, headers); + } + } + } + } finally { + subscription.request(1); + } + } + + @Override + public void onError(Throwable t) { + updateHealth(t); + } + + @Override + public void onComplete() { + updateHealth(null); + } + + private void updateLongPollingSettings(ResponseHeaders headers) { + final String longPollingSettings = headers.get(ARMERIA_LPHC); + if (longPollingSettings == null) { + maxLongPollingSeconds = 0; + pingIntervalSeconds = 0; + return; + } + + final int commaPos = longPollingSettings.indexOf(','); + int maxLongPollingSeconds = 0; + int pingIntervalSeconds = 0; + try { + maxLongPollingSeconds = Integer.max( + 0, Integer.parseInt(longPollingSettings.substring(0, commaPos).trim())); + pingIntervalSeconds = Integer.max( + 0, Integer.parseInt(longPollingSettings.substring(commaPos + 1).trim())); + } catch (Exception e) { + // Ignore malformed settings. + } + + DefaultHttpHealthChecker.this.maxLongPollingSeconds = maxLongPollingSeconds; + if (maxLongPollingSeconds > 0 && pingIntervalSeconds < maxLongPollingSeconds) { + DefaultHttpHealthChecker.this.pingIntervalSeconds = pingIntervalSeconds; + } else { + DefaultHttpHealthChecker.this.pingIntervalSeconds = 0; + } + } + + // TODO(trustin): Remove once https://github.com/line/armeria/issues/1063 is fixed. + private void maybeSchedulePingCheck() { + lastPingTimeNanos = System.nanoTime(); + + if (pingCheckFuture != null) { + return; + } + + final int pingIntervalSeconds = DefaultHttpHealthChecker.this.pingIntervalSeconds; + if (pingIntervalSeconds <= 0) { + return; + } + + final long pingTimeoutNanos = TimeUnit.SECONDS.toNanos(pingIntervalSeconds) * 2; + pingCheckFuture = reqCtx.eventLoop().withoutContext().scheduleWithFixedDelay(() -> { + if (System.nanoTime() - lastPingTimeNanos >= pingTimeoutNanos) { + // Did not receive a ping on time. + final ResponseTimeoutException cause = ResponseTimeoutException.get(); + res.abort(cause); + isHealthy = false; + receivedExpectedResponse = false; + updateHealth(cause); + } + }, 1, 1, TimeUnit.SECONDS); + } + + private void updateHealth(@Nullable Throwable cause) { + if (pingCheckFuture != null) { + pingCheckFuture.cancel(false); + } + + if (closeable.isClosing() || updatedHealth) { + return; + } + + updatedHealth = true; + + ctx.updateHealth(isHealthy ? 1 : 0, reqCtx, responseHeaders, cause); + wasHealthy = isHealthy; + + final ScheduledExecutorService executor = ctx.executor(); + try { + // Send a long polling check immediately if: + // - Server has long polling enabled. + // - Server responded with 2xx or 5xx. + if (maxLongPollingSeconds > 0 && receivedExpectedResponse) { + executor.execute(DefaultHttpHealthChecker.this::check); + } else { + executor.schedule(DefaultHttpHealthChecker.this::check, + ctx.nextDelayMillis(), TimeUnit.MILLISECONDS); + } + } catch (RejectedExecutionException ignored) { + // Can happen if the Endpoint being checked has been disappeared from + // the delegate EndpointGroup. See HealthCheckedEndpointGroupTest.disappearedEndpoint(). + } + } + } + + private void lock() { + lock.lock(); + } + + private void unlock() { + lock.unlock(); + } +} diff --git a/core/src/main/java/com/linecorp/armeria/internal/client/endpoint/healthcheck/HttpHealthChecker.java b/core/src/main/java/com/linecorp/armeria/internal/client/endpoint/healthcheck/HttpHealthChecker.java index 6c9f7a4c220..831a09810fd 100644 --- a/core/src/main/java/com/linecorp/armeria/internal/client/endpoint/healthcheck/HttpHealthChecker.java +++ b/core/src/main/java/com/linecorp/armeria/internal/client/endpoint/healthcheck/HttpHealthChecker.java @@ -13,342 +13,10 @@ * License for the specific language governing permissions and limitations * under the License. */ -package com.linecorp.armeria.internal.client.endpoint.healthcheck; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +package com.linecorp.armeria.internal.client.endpoint.healthcheck; -import com.linecorp.armeria.client.ClientRequestContext; -import com.linecorp.armeria.client.ClientRequestContextCaptor; -import com.linecorp.armeria.client.Clients; -import com.linecorp.armeria.client.Endpoint; -import com.linecorp.armeria.client.HttpClient; -import com.linecorp.armeria.client.ResponseTimeoutException; -import com.linecorp.armeria.client.SimpleDecoratingHttpClient; -import com.linecorp.armeria.client.WebClient; -import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup; -import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext; -import com.linecorp.armeria.common.HttpHeaderNames; -import com.linecorp.armeria.common.HttpMethod; -import com.linecorp.armeria.common.HttpObject; -import com.linecorp.armeria.common.HttpRequest; -import com.linecorp.armeria.common.HttpResponse; -import com.linecorp.armeria.common.HttpStatus; -import com.linecorp.armeria.common.HttpStatusClass; -import com.linecorp.armeria.common.RequestHeaders; -import com.linecorp.armeria.common.RequestHeadersBuilder; -import com.linecorp.armeria.common.ResponseHeaders; -import com.linecorp.armeria.common.SessionProtocol; -import com.linecorp.armeria.common.annotation.Nullable; -import com.linecorp.armeria.common.stream.SubscriptionOption; import com.linecorp.armeria.common.util.AsyncCloseable; -import com.linecorp.armeria.common.util.AsyncCloseableSupport; -import com.linecorp.armeria.common.util.TimeoutMode; -import com.linecorp.armeria.internal.common.util.ReentrantShortLock; -import com.linecorp.armeria.unsafe.PooledObjects; - -import io.netty.util.AsciiString; -import io.netty.util.concurrent.ScheduledFuture; - -public final class HttpHealthChecker implements AsyncCloseable { - - private static final Logger logger = LoggerFactory.getLogger(HttpHealthChecker.class); - - private static final AsciiString ARMERIA_LPHC = HttpHeaderNames.of("armeria-lphc"); - - private final ReentrantLock lock = new ReentrantShortLock(); - private final HealthCheckerContext ctx; - private final WebClient webClient; - private final String authority; - private final String path; - private final boolean useGet; - private boolean wasHealthy; - private int maxLongPollingSeconds; - private int pingIntervalSeconds; - @Nullable - private HttpResponse lastResponse; - private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync); - - public HttpHealthChecker(HealthCheckerContext ctx, Endpoint endpoint, String path, boolean useGet, - SessionProtocol protocol, @Nullable String host) { - this.ctx = ctx; - webClient = WebClient.builder(protocol, endpoint) - .options(ctx.clientOptions()) - .decorator(ResponseTimeoutUpdater::new) - .build(); - authority = host != null ? host : endpoint.authority(); - this.path = path; - this.useGet = useGet; - } - - public void start() { - check(); - } - - private void check() { - lock(); - try { - if (closeable.isClosing()) { - return; - } - - final RequestHeaders headers; - final RequestHeadersBuilder builder = - RequestHeaders.builder(useGet ? HttpMethod.GET : HttpMethod.HEAD, path) - .authority(authority); - if (maxLongPollingSeconds > 0) { - headers = builder.add(HttpHeaderNames.IF_NONE_MATCH, - wasHealthy ? "\"healthy\"" : "\"unhealthy\"") - .add(HttpHeaderNames.PREFER, "wait=" + maxLongPollingSeconds) - .build(); - } else { - headers = builder.build(); - } - - try (ClientRequestContextCaptor reqCtxCaptor = Clients.newContextCaptor()) { - lastResponse = webClient.execute(headers); - final ClientRequestContext reqCtx = reqCtxCaptor.get(); - lastResponse.subscribe(new HealthCheckResponseSubscriber(reqCtx, lastResponse), - reqCtx.eventLoop().withoutContext(), - SubscriptionOption.WITH_POOLED_OBJECTS); - } - } finally { - unlock(); - } - } - - @Override - public CompletableFuture closeAsync() { - return closeable.closeAsync(); - } - - private synchronized void closeAsync(CompletableFuture future) { - lock(); - try { - if (lastResponse == null) { - // Called even before the first request is sent. - future.complete(null); - } else { - lastResponse.abort(); - lastResponse.whenComplete().handle((unused1, unused2) -> future.complete(null)); - } - } finally { - unlock(); - } - } - - @Override - public void close() { - closeable.close(); - } - - private final class ResponseTimeoutUpdater extends SimpleDecoratingHttpClient { - ResponseTimeoutUpdater(HttpClient delegate) { - super(delegate); - } - - @Override - public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception { - if (maxLongPollingSeconds > 0) { - ctx.setResponseTimeoutMillis(TimeoutMode.EXTEND, - TimeUnit.SECONDS.toMillis(maxLongPollingSeconds)); - } - return unwrap().execute(ctx, req); - } - } - - private class HealthCheckResponseSubscriber implements Subscriber { - - private final ClientRequestContext reqCtx; - private final HttpResponse res; - @Nullable - private Subscription subscription; - @Nullable - private ResponseHeaders responseHeaders; - private boolean isHealthy; - private boolean receivedExpectedResponse; - private boolean updatedHealth; - - @Nullable - private ScheduledFuture pingCheckFuture; - private long lastPingTimeNanos; - - HealthCheckResponseSubscriber(ClientRequestContext reqCtx, HttpResponse res) { - this.reqCtx = reqCtx; - this.res = res; - } - - @Override - public void onSubscribe(Subscription subscription) { - this.subscription = subscription; - subscription.request(1); - maybeSchedulePingCheck(); - } - - @Override - public void onNext(HttpObject obj) { - assert subscription != null; - - if (closeable.isClosing()) { - subscription.cancel(); - return; - } - - try { - if (!(obj instanceof ResponseHeaders)) { - PooledObjects.close(obj); - return; - } - - final ResponseHeaders headers = (ResponseHeaders) obj; - responseHeaders = headers; - updateLongPollingSettings(headers); - - final HttpStatus status = headers.status(); - final HttpStatusClass statusClass = status.codeClass(); - switch (statusClass) { - case INFORMATIONAL: - maybeSchedulePingCheck(); - break; - case SERVER_ERROR: - receivedExpectedResponse = true; - break; - case SUCCESS: - isHealthy = true; - receivedExpectedResponse = true; - break; - default: - if (status == HttpStatus.NOT_MODIFIED) { - isHealthy = wasHealthy; - receivedExpectedResponse = true; - } else { - // Do not use long polling on an unexpected status for safety. - maxLongPollingSeconds = 0; - - if (statusClass == HttpStatusClass.CLIENT_ERROR) { - logger.warn("{} Unexpected 4xx health check response: {} A 4xx response " + - "generally indicates a misconfiguration of the client. " + - "Did you happen to forget to configure the {}'s client options?", - reqCtx, headers, HealthCheckedEndpointGroup.class.getSimpleName()); - } else { - logger.warn("{} Unexpected health check response: {}", reqCtx, headers); - } - } - } - } finally { - subscription.request(1); - } - } - - @Override - public void onError(Throwable t) { - updateHealth(t); - } - - @Override - public void onComplete() { - updateHealth(null); - } - - private void updateLongPollingSettings(ResponseHeaders headers) { - final String longPollingSettings = headers.get(ARMERIA_LPHC); - if (longPollingSettings == null) { - maxLongPollingSeconds = 0; - pingIntervalSeconds = 0; - return; - } - - final int commaPos = longPollingSettings.indexOf(','); - int maxLongPollingSeconds = 0; - int pingIntervalSeconds = 0; - try { - maxLongPollingSeconds = Integer.max( - 0, Integer.parseInt(longPollingSettings.substring(0, commaPos).trim())); - pingIntervalSeconds = Integer.max( - 0, Integer.parseInt(longPollingSettings.substring(commaPos + 1).trim())); - } catch (Exception e) { - // Ignore malformed settings. - } - - HttpHealthChecker.this.maxLongPollingSeconds = maxLongPollingSeconds; - if (maxLongPollingSeconds > 0 && pingIntervalSeconds < maxLongPollingSeconds) { - HttpHealthChecker.this.pingIntervalSeconds = pingIntervalSeconds; - } else { - HttpHealthChecker.this.pingIntervalSeconds = 0; - } - } - - // TODO(trustin): Remove once https://github.com/line/armeria/issues/1063 is fixed. - private void maybeSchedulePingCheck() { - lastPingTimeNanos = System.nanoTime(); - - if (pingCheckFuture != null) { - return; - } - - final int pingIntervalSeconds = HttpHealthChecker.this.pingIntervalSeconds; - if (pingIntervalSeconds <= 0) { - return; - } - - final long pingTimeoutNanos = TimeUnit.SECONDS.toNanos(pingIntervalSeconds) * 2; - pingCheckFuture = reqCtx.eventLoop().withoutContext().scheduleWithFixedDelay(() -> { - if (System.nanoTime() - lastPingTimeNanos >= pingTimeoutNanos) { - // Did not receive a ping on time. - final ResponseTimeoutException cause = ResponseTimeoutException.get(); - res.abort(cause); - isHealthy = false; - receivedExpectedResponse = false; - updateHealth(cause); - } - }, 1, 1, TimeUnit.SECONDS); - } - - private void updateHealth(@Nullable Throwable cause) { - if (pingCheckFuture != null) { - pingCheckFuture.cancel(false); - } - - if (closeable.isClosing() || updatedHealth) { - return; - } - - updatedHealth = true; - - ctx.updateHealth(isHealthy ? 1 : 0, reqCtx, responseHeaders, cause); - wasHealthy = isHealthy; - - final ScheduledExecutorService executor = ctx.executor(); - try { - // Send a long polling check immediately if: - // - Server has long polling enabled. - // - Server responded with 2xx or 5xx. - if (maxLongPollingSeconds > 0 && receivedExpectedResponse) { - executor.execute(HttpHealthChecker.this::check); - } else { - executor.schedule(HttpHealthChecker.this::check, - ctx.nextDelayMillis(), TimeUnit.MILLISECONDS); - } - } catch (RejectedExecutionException ignored) { - // Can happen if the Endpoint being checked has been disappeared from - // the delegate EndpointGroup. See HealthCheckedEndpointGroupTest.disappearedEndpoint(). - } - } - } - - private void lock() { - lock.lock(); - } - private void unlock() { - lock.unlock(); - } +public interface HttpHealthChecker extends AsyncCloseable { } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/StaticHttpHealthChecker.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/StaticHttpHealthChecker.java new file mode 100644 index 00000000000..36d2753f3eb --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/StaticHttpHealthChecker.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import java.util.concurrent.CompletableFuture; + +import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext; +import com.linecorp.armeria.common.util.UnmodifiableFuture; +import com.linecorp.armeria.internal.client.endpoint.healthcheck.HttpHealthChecker; + +final class StaticHttpHealthChecker implements HttpHealthChecker { + + public static HttpHealthChecker of(HealthCheckerContext ctx, double healthy) { + return new StaticHttpHealthChecker(ctx, healthy); + } + + private StaticHttpHealthChecker(HealthCheckerContext ctx, double healthy) { + ctx.updateHealth(healthy, null, null, null); + } + + @Override + public CompletableFuture closeAsync() { + return UnmodifiableFuture.completedFuture(null); + } + + @Override + public void close() { + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsHealthCheckedEndpointGroupBuilder.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsHealthCheckedEndpointGroupBuilder.java index dadc53091dc..ef12afd7b00 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsHealthCheckedEndpointGroupBuilder.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsHealthCheckedEndpointGroupBuilder.java @@ -30,7 +30,7 @@ import com.linecorp.armeria.common.HttpMethod; import com.linecorp.armeria.common.SessionProtocol; import com.linecorp.armeria.common.util.AsyncCloseable; -import com.linecorp.armeria.internal.client.endpoint.healthcheck.HttpHealthChecker; +import com.linecorp.armeria.internal.client.endpoint.healthcheck.DefaultHttpHealthChecker; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.core.v3.HealthCheck.HttpHealthCheck; @@ -57,13 +57,17 @@ final class XdsHealthCheckedEndpointGroupBuilder return ctx -> { final LbEndpoint lbEndpoint = EndpointUtil.lbEndpoint(ctx.originalEndpoint()); final HealthCheckConfig healthCheckConfig = lbEndpoint.getEndpoint().getHealthCheckConfig(); + if (healthCheckConfig.getDisableActiveHealthCheck()) { + // health check is disabled, so assume the endpoint is healthy + return StaticHttpHealthChecker.of(ctx, 1.0); + } final String path = httpHealthCheck.getPath(); final String host = Strings.emptyToNull(httpHealthCheck.getHost()); - final HttpHealthChecker checker = - new HttpHealthChecker(ctx, endpoint(healthCheckConfig, ctx.originalEndpoint()), - path, httpMethod(httpHealthCheck) == HttpMethod.GET, - protocol(cluster), host); + final DefaultHttpHealthChecker checker = + new DefaultHttpHealthChecker(ctx, endpoint(healthCheckConfig, ctx.originalEndpoint()), + path, httpMethod(httpHealthCheck) == HttpMethod.GET, + protocol(cluster), host); checker.start(); return checker; }; diff --git a/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java b/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java index 4c319796d97..c5d395ef3a9 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java @@ -52,6 +52,7 @@ import io.envoyproxy.envoy.config.core.v3.TransportSocket; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.endpoint.v3.Endpoint; +import io.envoyproxy.envoy.config.endpoint.v3.Endpoint.HealthCheckConfig; import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; import io.envoyproxy.envoy.config.listener.v3.ApiListener; @@ -105,6 +106,11 @@ public static LbEndpoint endpoint(String address, int port, Metadata metadata) { public static LbEndpoint endpoint(String address, int port, Metadata metadata, int weight, HealthStatus healthStatus) { + return endpoint(address, port, metadata, weight, healthStatus, HealthCheckConfig.getDefaultInstance()); + } + + public static LbEndpoint endpoint(String address, int port, Metadata metadata, int weight, + HealthStatus healthStatus, HealthCheckConfig healthCheckConfig) { return LbEndpoint .newBuilder() .setLoadBalancingWeight(UInt32Value.of(weight)) @@ -112,6 +118,7 @@ public static LbEndpoint endpoint(String address, int port, Metadata metadata, i .setHealthStatus(healthStatus) .setEndpoint(Endpoint.newBuilder() .setAddress(address(address, port)) + .setHealthCheckConfig(healthCheckConfig) .build()).build(); } diff --git a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/HealthCheckedTest.java b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/HealthCheckedTest.java index ece7115fd88..ddae44adc8f 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/HealthCheckedTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/HealthCheckedTest.java @@ -35,6 +35,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import com.google.common.collect.ImmutableList; @@ -59,8 +60,10 @@ import io.envoyproxy.envoy.config.core.v3.HealthCheck.HttpHealthCheck; import io.envoyproxy.envoy.config.core.v3.HealthStatus; import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.core.v3.Metadata; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; +import io.envoyproxy.envoy.config.endpoint.v3.Endpoint.HealthCheckConfig; import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; import io.envoyproxy.envoy.config.listener.v3.Listener; import io.envoyproxy.envoy.type.v3.Percent; @@ -222,6 +225,68 @@ void panicCase(double panicThreshold, boolean panicMode) { } } + @ParameterizedTest + @EnumSource(value = HealthStatus.class, names = {"UNKNOWN", "UNHEALTHY", "HEALTHY"}) + void disabled(HealthStatus healthStatus) { + final Listener listener = staticResourceListener(); + final HealthCheckConfig disabledConfig = HealthCheckConfig.newBuilder() + .setDisableActiveHealthCheck(true).build(); + + final List healthyEndpoints = + server.server().activePorts().keySet() + .stream().map(addr -> testEndpoint(addr, healthStatus, disabledConfig)) + .collect(Collectors.toList()); + assertThat(healthyEndpoints).hasSize(3); + final List unhealthyEndpoints = + noHealthCheck.server().activePorts().keySet() + .stream().map(addr -> testEndpoint(addr, healthStatus, disabledConfig)) + .collect(Collectors.toList()); + assertThat(unhealthyEndpoints).hasSize(3); + final List allEndpoints = ImmutableList.builder() + .addAll(healthyEndpoints) + .addAll(unhealthyEndpoints).build(); + + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), allEndpoints)) + .setPolicy(Policy.newBuilder().setWeightedPriorityHealth(true)) + .build(); + final HttpHealthCheck httpHealthCheck = HttpHealthCheck.newBuilder() + .setPath("/monitor/healthcheck") + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder() + .addHealthChecks(HealthCheck.newBuilder().setHttpHealthCheck(httpHealthCheck)) + .setCommonLbConfig(CommonLbConfig.newBuilder() + .setHealthyPanicThreshold(Percent.newBuilder().setValue(0))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap); + EndpointGroup endpointGroup = XdsEndpointGroup.of("listener", xdsBootstrap)) { + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + final Endpoint endpoint = endpointGroup.selectNow(ctx); + + // The healthStatus set to the endpoint overrides + if (healthStatus == HealthStatus.HEALTHY || healthStatus == HealthStatus.UNKNOWN) { + assertThat(endpoint).isNotNull(); + } else { + assertThat(healthStatus).isEqualTo(HealthStatus.UNHEALTHY); + assertThat(endpoint).isNull(); + } + } + } + + private static LbEndpoint testEndpoint(InetSocketAddress address, HealthStatus healthStatus, + HealthCheckConfig config) { + return endpoint(address.getAddress().getHostAddress(), address.getPort(), + Metadata.getDefaultInstance(), 1, healthStatus, config); + } + private static List ports(ServerExtension server) { return server.server().activePorts().keySet().stream() .map(InetSocketAddress::getPort).collect(Collectors.toList());