Skip to content

Commit

Permalink
Allow XdsEndpointGroup to disable health checking per Endpoint (#…
Browse files Browse the repository at this point in the history
…5879)

Motivation:

While working on documentation, I realized that
`disable_active_health_check` is not supported.
ref:
https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/endpoint/v3/endpoint_components.proto#config-endpoint-v3-endpoint-healthcheckconfig

This can be useful when users would like to force certain endpoints to
be healthy without health checking.

Modifications:

- Renamed `HttpHealthChecker` to `DefaultHttpHealthChecker`
- Introduced the `HttpHealthChecker` interface
- Introduced `StaticHttpHealthChecker` which sets the health status
immediately
- Refer to `HealthCheckConfig#getDisableActiveHealthCheck` and decide
whether to return a `StaticHttpHealthChecker`

Result:

- `disable_active_health_check` is now supported

<!--
Visit this URL to learn more about how to write a pull request
description:

https://armeria.dev/community/developer-guide#how-to-write-pull-request-description
-->
  • Loading branch information
jrhee17 authored Nov 7, 2024
1 parent 90258fe commit 011741d
Show file tree
Hide file tree
Showing 9 changed files with 484 additions and 344 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void updateHealth(double health) {
}

@Override
public void updateHealth(double health, ClientRequestContext ctx,
public void updateHealth(double health, @Nullable ClientRequestContext ctx,
@Nullable ResponseHeaders headers, @Nullable Throwable cause) {
final boolean isHealthy = health > 0;
if (headers != null && headers.contains("x-envoy-degraded")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.util.AsyncCloseable;
import com.linecorp.armeria.internal.client.endpoint.healthcheck.HttpHealthChecker;
import com.linecorp.armeria.internal.client.endpoint.healthcheck.DefaultHttpHealthChecker;

/**
* A builder for creating a new {@link HealthCheckedEndpointGroup} that sends HTTP health check requests.
Expand Down Expand Up @@ -73,8 +73,8 @@ private static class HttpHealthCheckerFactory implements Function<HealthCheckerC

@Override
public AsyncCloseable apply(HealthCheckerContext ctx) {
final HttpHealthChecker checker = new HttpHealthChecker(ctx, ctx.endpoint(), path, useGet,
ctx.protocol(), null);
final DefaultHttpHealthChecker checker =
new DefaultHttpHealthChecker(ctx, ctx.endpoint(), path, useGet, ctx.protocol(), null);
checker.start();
return checker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public interface HealthCheckerContext {
* @param cause the cause of the failed health check request.
* {@code null} if the health checked request received the {@code headers}.
*/
default void updateHealth(double health, ClientRequestContext ctx,
default void updateHealth(double health, @Nullable ClientRequestContext ctx,
@Nullable ResponseHeaders headers, @Nullable Throwable cause) {
// TODO(ikhoon): Make this method abstract in Armeria 2.0
updateHealth(health);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,353 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.linecorp.armeria.internal.client.endpoint.healthcheck;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.ClientRequestContextCaptor;
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.ResponseTimeoutException;
import com.linecorp.armeria.client.SimpleDecoratingHttpClient;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup;
import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckerContext;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpObject;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.HttpStatusClass;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.common.RequestHeadersBuilder;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.stream.SubscriptionOption;
import com.linecorp.armeria.common.util.AsyncCloseableSupport;
import com.linecorp.armeria.common.util.TimeoutMode;
import com.linecorp.armeria.internal.common.util.ReentrantShortLock;
import com.linecorp.armeria.unsafe.PooledObjects;

import io.netty.util.AsciiString;
import io.netty.util.concurrent.ScheduledFuture;

public final class DefaultHttpHealthChecker implements HttpHealthChecker {

private static final Logger logger = LoggerFactory.getLogger(DefaultHttpHealthChecker.class);

private static final AsciiString ARMERIA_LPHC = HttpHeaderNames.of("armeria-lphc");

private final ReentrantLock lock = new ReentrantShortLock();
private final HealthCheckerContext ctx;
private final WebClient webClient;
private final String authority;
private final String path;
private final boolean useGet;
private boolean wasHealthy;
private int maxLongPollingSeconds;
private int pingIntervalSeconds;
@Nullable
private HttpResponse lastResponse;
private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync);

public DefaultHttpHealthChecker(HealthCheckerContext ctx, Endpoint endpoint, String path, boolean useGet,
SessionProtocol protocol, @Nullable String host) {
this.ctx = ctx;
webClient = WebClient.builder(protocol, endpoint)
.options(ctx.clientOptions())
.decorator(ResponseTimeoutUpdater::new)
.build();
authority = host != null ? host : endpoint.authority();
this.path = path;
this.useGet = useGet;
}

public void start() {
check();
}

private void check() {
lock();
try {
if (closeable.isClosing()) {
return;
}

final RequestHeaders headers;
final RequestHeadersBuilder builder =
RequestHeaders.builder(useGet ? HttpMethod.GET : HttpMethod.HEAD, path)
.authority(authority);
if (maxLongPollingSeconds > 0) {
headers = builder.add(HttpHeaderNames.IF_NONE_MATCH,
wasHealthy ? "\"healthy\"" : "\"unhealthy\"")
.add(HttpHeaderNames.PREFER, "wait=" + maxLongPollingSeconds)
.build();
} else {
headers = builder.build();
}

try (ClientRequestContextCaptor reqCtxCaptor = Clients.newContextCaptor()) {
lastResponse = webClient.execute(headers);
final ClientRequestContext reqCtx = reqCtxCaptor.get();
lastResponse.subscribe(new HealthCheckResponseSubscriber(reqCtx, lastResponse),
reqCtx.eventLoop().withoutContext(),
SubscriptionOption.WITH_POOLED_OBJECTS);
}
} finally {
unlock();
}
}

@Override
public CompletableFuture<?> closeAsync() {
return closeable.closeAsync();
}

private synchronized void closeAsync(CompletableFuture<?> future) {
lock();
try {
if (lastResponse == null) {
// Called even before the first request is sent.
future.complete(null);
} else {
lastResponse.abort();
lastResponse.whenComplete().handle((unused1, unused2) -> future.complete(null));
}
} finally {
unlock();
}
}

@Override
public void close() {
closeable.close();
}

private final class ResponseTimeoutUpdater extends SimpleDecoratingHttpClient {
ResponseTimeoutUpdater(HttpClient delegate) {
super(delegate);
}

@Override
public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception {
if (maxLongPollingSeconds > 0) {
ctx.setResponseTimeoutMillis(TimeoutMode.EXTEND,
TimeUnit.SECONDS.toMillis(maxLongPollingSeconds));
}
return unwrap().execute(ctx, req);
}
}

private class HealthCheckResponseSubscriber implements Subscriber<HttpObject> {

private final ClientRequestContext reqCtx;
private final HttpResponse res;
@Nullable
private Subscription subscription;
@Nullable
private ResponseHeaders responseHeaders;
private boolean isHealthy;
private boolean receivedExpectedResponse;
private boolean updatedHealth;

@Nullable
private ScheduledFuture<?> pingCheckFuture;
private long lastPingTimeNanos;

HealthCheckResponseSubscriber(ClientRequestContext reqCtx, HttpResponse res) {
this.reqCtx = reqCtx;
this.res = res;
}

@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
maybeSchedulePingCheck();
}

@Override
public void onNext(HttpObject obj) {
assert subscription != null;

if (closeable.isClosing()) {
subscription.cancel();
return;
}

try {
if (!(obj instanceof ResponseHeaders)) {
PooledObjects.close(obj);
return;
}

final ResponseHeaders headers = (ResponseHeaders) obj;
responseHeaders = headers;
updateLongPollingSettings(headers);

final HttpStatus status = headers.status();
final HttpStatusClass statusClass = status.codeClass();
switch (statusClass) {
case INFORMATIONAL:
maybeSchedulePingCheck();
break;
case SERVER_ERROR:
receivedExpectedResponse = true;
break;
case SUCCESS:
isHealthy = true;
receivedExpectedResponse = true;
break;
default:
if (status == HttpStatus.NOT_MODIFIED) {
isHealthy = wasHealthy;
receivedExpectedResponse = true;
} else {
// Do not use long polling on an unexpected status for safety.
maxLongPollingSeconds = 0;

if (statusClass == HttpStatusClass.CLIENT_ERROR) {
logger.warn("{} Unexpected 4xx health check response: {} A 4xx response " +
"generally indicates a misconfiguration of the client. " +
"Did you happen to forget to configure the {}'s client options?",
reqCtx, headers, HealthCheckedEndpointGroup.class.getSimpleName());
} else {
logger.warn("{} Unexpected health check response: {}", reqCtx, headers);
}
}
}
} finally {
subscription.request(1);
}
}

@Override
public void onError(Throwable t) {
updateHealth(t);
}

@Override
public void onComplete() {
updateHealth(null);
}

private void updateLongPollingSettings(ResponseHeaders headers) {
final String longPollingSettings = headers.get(ARMERIA_LPHC);
if (longPollingSettings == null) {
maxLongPollingSeconds = 0;
pingIntervalSeconds = 0;
return;
}

final int commaPos = longPollingSettings.indexOf(',');
int maxLongPollingSeconds = 0;
int pingIntervalSeconds = 0;
try {
maxLongPollingSeconds = Integer.max(
0, Integer.parseInt(longPollingSettings.substring(0, commaPos).trim()));
pingIntervalSeconds = Integer.max(
0, Integer.parseInt(longPollingSettings.substring(commaPos + 1).trim()));
} catch (Exception e) {
// Ignore malformed settings.
}

DefaultHttpHealthChecker.this.maxLongPollingSeconds = maxLongPollingSeconds;
if (maxLongPollingSeconds > 0 && pingIntervalSeconds < maxLongPollingSeconds) {
DefaultHttpHealthChecker.this.pingIntervalSeconds = pingIntervalSeconds;
} else {
DefaultHttpHealthChecker.this.pingIntervalSeconds = 0;
}
}

// TODO(trustin): Remove once https://github.com/line/armeria/issues/1063 is fixed.
private void maybeSchedulePingCheck() {
lastPingTimeNanos = System.nanoTime();

if (pingCheckFuture != null) {
return;
}

final int pingIntervalSeconds = DefaultHttpHealthChecker.this.pingIntervalSeconds;
if (pingIntervalSeconds <= 0) {
return;
}

final long pingTimeoutNanos = TimeUnit.SECONDS.toNanos(pingIntervalSeconds) * 2;
pingCheckFuture = reqCtx.eventLoop().withoutContext().scheduleWithFixedDelay(() -> {
if (System.nanoTime() - lastPingTimeNanos >= pingTimeoutNanos) {
// Did not receive a ping on time.
final ResponseTimeoutException cause = ResponseTimeoutException.get();
res.abort(cause);
isHealthy = false;
receivedExpectedResponse = false;
updateHealth(cause);
}
}, 1, 1, TimeUnit.SECONDS);
}

private void updateHealth(@Nullable Throwable cause) {
if (pingCheckFuture != null) {
pingCheckFuture.cancel(false);
}

if (closeable.isClosing() || updatedHealth) {
return;
}

updatedHealth = true;

ctx.updateHealth(isHealthy ? 1 : 0, reqCtx, responseHeaders, cause);
wasHealthy = isHealthy;

final ScheduledExecutorService executor = ctx.executor();
try {
// Send a long polling check immediately if:
// - Server has long polling enabled.
// - Server responded with 2xx or 5xx.
if (maxLongPollingSeconds > 0 && receivedExpectedResponse) {
executor.execute(DefaultHttpHealthChecker.this::check);
} else {
executor.schedule(DefaultHttpHealthChecker.this::check,
ctx.nextDelayMillis(), TimeUnit.MILLISECONDS);
}
} catch (RejectedExecutionException ignored) {
// Can happen if the Endpoint being checked has been disappeared from
// the delegate EndpointGroup. See HealthCheckedEndpointGroupTest.disappearedEndpoint().
}
}
}

private void lock() {
lock.lock();
}

private void unlock() {
lock.unlock();
}
}
Loading

0 comments on commit 011741d

Please sign in to comment.