Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: RLQS Prototype #11456

Draft
wants to merge 47 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
12e58b4
RlqsFilter WIP
sergiitk Feb 7, 2024
a4de284
Basic GrpcService type
sergiitk Feb 7, 2024
0709108
Basic GrpcService
sergiitk Feb 7, 2024
63e2fb1
Basic interceptor
sergiitk Feb 7, 2024
1d225e2
Notes from the sync with Eric
sergiitk Feb 12, 2024
8f4f02c
post-rebase fix
sergiitk Mar 19, 2024
962a4cc
RlqsClientPool, RlqsClient, working on shutdown
sergiitk Mar 26, 2024
3b70775
another note
sergiitk Mar 26, 2024
ef9b4f7
categorize todos
sergiitk Mar 26, 2024
caa1d13
Basic RlqsBucketSettings and Matcher parsing
sergiitk Mar 26, 2024
b25efa9
Minimal CelMatcher
sergiitk Mar 27, 2024
9fd26b7
basic cel-java integration/test
sergiitk Mar 27, 2024
b870c02
Implement GrpcCelEnvironment and MetadataHelper
sergiitk Sep 4, 2024
dfca019
Use dev.cel:runtime in the prod code
sergiitk Sep 6, 2024
3963469
Add RlqsClientPool/RlqsClient/RlqsApiClient classes
sergiitk Sep 16, 2024
e8691ef
Draft bucket processing logic
sergiitk Sep 24, 2024
2f8fe96
Filter chain lifecycle bookmarks - filter provider refactoring TBD
sergiitk Sep 24, 2024
fabefe0
Draft reports and timers
sergiitk Sep 25, 2024
0273132
RlqsClient -> RlqsEngine
sergiitk Sep 25, 2024
9a05cb8
Remove periodic cleanup logic from RlqsClientPool
sergiitk Sep 25, 2024
f9758a4
RlqsClientPool -> RlqsCache
sergiitk Sep 25, 2024
e1f289e
RlqsApiClient -> RlqsClient
sergiitk Sep 25, 2024
bf7c410
More class drafting
sergiitk Sep 25, 2024
92169c6
Create proper RateLimitResult
sergiitk Sep 26, 2024
725d7b8
Draft Bucket: Usage Reports, RateLimitStrategy, TTLs
sergiitk Sep 26, 2024
5588d8b
Improve method names
sergiitk Sep 27, 2024
04f0d02
RateLimitResult -> RlqsRateLimitResult
sergiitk Sep 27, 2024
748dfcf
More API improvements
sergiitk Sep 27, 2024
03bde29
getOrCreate pattern for bucket cache and timers
sergiitk Sep 27, 2024
00ea545
RlqsClient doesn't know about the bucket cache anymore; uses callbacks
sergiitk Sep 28, 2024
c567c46
Minor renames
sergiitk Sep 28, 2024
831cd88
improved getOrCreateRlqsEngine
sergiitk Sep 28, 2024
c318d2f
Handle special case
sergiitk Sep 28, 2024
9b0b8eb
Dynamic bucket id builder processing initial logic.
sergiitk Oct 3, 2024
9d9e279
hash config to a long
sergiitk Oct 4, 2024
3d9c5cb
Remove outdated note
sergiitk Oct 8, 2024
e1e8878
add Filter.isEnabled()
sergiitk Oct 15, 2024
0848a63
add GRPC_EXPERIMENTAL_RLQS_DRY_RUN
sergiitk Oct 15, 2024
d76d24b
XdsTestServer: add --xds_server_mode
sergiitk Oct 17, 2024
e5d6557
convert logid to local
sergiitk Oct 17, 2024
1d222e0
PSM e2e: works!
sergiitk Oct 17, 2024
877dfc0
RlqsEngine -> RlqsFilterState
sergiitk Oct 17, 2024
921a502
Add CEL types to the message printer
sergiitk Oct 21, 2024
7af3fcc
Add CEL macro verifications
sergiitk Oct 24, 2024
e56c032
Add CelMatcher.fromEnvoyProto - just to import dev.cel.expr
sergiitk Oct 25, 2024
29f6fe7
LongAdder note
sergiitk Oct 31, 2024
0656459
CEL variable resolver
sergiitk Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ commons-math3 = "org.apache.commons:commons-math3:3.6.1"
conscrypt = "org.conscrypt:conscrypt-openjdk-uber:2.5.2"
cronet-api = "org.chromium.net:cronet-api:119.6045.31"
cronet-embedded = "org.chromium.net:cronet-embedded:119.6045.31"
dev-cel-compiler = "dev.cel:compiler:0.6.0"
dev-cel-runtime = "dev.cel:runtime:0.6.0"
# error-prone 2.31.0+ blocked on https://github.com/grpc/grpc-java/issues/10152
# It breaks Bazel (ArrayIndexOutOfBoundsException in turbine) and Dexing ("D8:
# java.lang.NullPointerException"). We can trivially upgrade the Bazel CI to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.grpc.ServerCredentials;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.gcp.csm.observability.CsmObservability;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
Expand Down Expand Up @@ -83,6 +84,7 @@ public final class XdsTestServer {
private int port = 8080;
private int maintenancePort = 8080;
private boolean secureMode = false;
private boolean xdsServerMode = false;
private boolean enableCsmObservability;
private String serverId = "java_server";
private HealthStatusManager health;
Expand Down Expand Up @@ -143,7 +145,10 @@ void parseArgs(String[] args) {
maintenancePort = Integer.valueOf(value);
} else if ("secure_mode".equals(key)) {
secureMode = Boolean.parseBoolean(value);
} else if ("enable_csm_observability".equals(key)) {
} else if ("xds_server_mode".equals(key)) {
xdsServerMode = Boolean.parseBoolean(value);
}
else if ("enable_csm_observability".equals(key)) {
enableCsmObservability = Boolean.valueOf(value);
} else if ("server_id".equals(key)) {
serverId = value;
Expand All @@ -164,6 +169,9 @@ void parseArgs(String[] args) {
+ maintenancePort);
usage = true;
}
if (secureMode) {
xdsServerMode = true;
}

if (usage) {
XdsTestServer s = new XdsTestServer();
Expand All @@ -180,6 +188,9 @@ void parseArgs(String[] args) {
+ " port and maintenance_port should be different for secure mode."
+ "\n Default: "
+ s.secureMode
+ "\n --xds_server_mode=BOOLEAN Start in xDS Server mode."
+ "\n Default: "
+ s.xdsServerMode
+ "\n --enable_csm_observability=BOOL Enable CSM observability reporting. Default: "
+ s.enableCsmObservability
+ "\n --server_id=STRING server ID for response."
Expand Down Expand Up @@ -213,66 +224,72 @@ void start() throws Exception {
throw new RuntimeException(e);
}
health = new HealthStatusManager();
ServerServiceDefinition testServiceInterceptor = ServerInterceptors.intercept(
new TestServiceImpl(serverId, host),
new TestInfoInterceptor(host));
ServerCredentials insecureServerCreds = InsecureServerCredentials.create();

@SuppressWarnings("deprecation")
BindableService oldReflectionService = ProtoReflectionService.newInstance();
if (secureMode) {
if (addressType != Util.AddressType.IPV4_IPV6) {
throw new IllegalArgumentException("Secure mode only supports IPV4_IPV6 address type");
}
maintenanceServer =
Grpc.newServerBuilderForPort(maintenancePort, InsecureServerCredentials.create())
Grpc.newServerBuilderForPort(maintenancePort, insecureServerCreds)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(oldReflectionService)
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
maintenanceServer.start();
server =
XdsServerBuilder.forPort(
port, XdsServerCredentials.create(InsecureServerCredentials.create()))
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
server = XdsServerBuilder.forPort(port, XdsServerCredentials.create(insecureServerCreds))
.addService(testServiceInterceptor)
.build();
server.start();
} else {
ServerBuilder<?> serverBuilder;
ServerCredentials insecureServerCreds = InsecureServerCredentials.create();
switch (addressType) {
case IPV4_IPV6:
serverBuilder = Grpc.newServerBuilderForPort(port, insecureServerCreds);
break;
case IPV4:
SocketAddress v4Address = Util.getV4Address(port);
InetSocketAddress localV4Address = new InetSocketAddress("127.0.0.1", port);
serverBuilder = NettyServerBuilder.forAddress(
localV4Address, insecureServerCreds);
if (v4Address != null && !v4Address.equals(localV4Address) ) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
InetSocketAddress localV6Address = new InetSocketAddress("::1", port);
serverBuilder = NettyServerBuilder.forAddress(localV6Address, insecureServerCreds);
for (SocketAddress address : v6Addresses) {
if (!address.equals(localV6Address)) {
((NettyServerBuilder) serverBuilder).addListenAddress(address);
}
health.setStatus("", ServingStatus.SERVING);
return;
}

ServerBuilder<?> serverBuilder;
switch (addressType) {
case IPV4_IPV6:
serverBuilder = Grpc.newServerBuilderForPort(port, insecureServerCreds);
break;
case IPV4:
SocketAddress v4Address = Util.getV4Address(port);
InetSocketAddress localV4Address = new InetSocketAddress("127.0.0.1", port);
serverBuilder = NettyServerBuilder.forAddress(
localV4Address, insecureServerCreds);
if (v4Address != null && !v4Address.equals(localV4Address) ) {
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
}
break;
case IPV6:
List<SocketAddress> v6Addresses = Util.getV6Addresses(port);
InetSocketAddress localV6Address = new InetSocketAddress("::1", port);
serverBuilder = NettyServerBuilder.forAddress(localV6Address, insecureServerCreds);
for (SocketAddress address : v6Addresses) {
if (!address.equals(localV6Address)) {
((NettyServerBuilder) serverBuilder).addListenAddress(address);
}
break;
default:
throw new AssertionError("Unknown address type: " + addressType);
}
break;
default:
throw new AssertionError("Unknown address type: " + addressType);
}

if (xdsServerMode) {
if (addressType != Util.AddressType.IPV4_IPV6) {
throw new IllegalArgumentException("xDS Server mode only supports IPV4_IPV6 address type");
}

logger.info("Starting server on port " + port + " with address type " + addressType);

server =
serverBuilder
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(serverId, host), new TestInfoInterceptor(host)))
.addService(testServiceInterceptor)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(oldReflectionService)
Expand All @@ -281,7 +298,23 @@ void start() throws Exception {
.build();
server.start();
maintenanceServer = null;
health.setStatus("", ServingStatus.SERVING);
return;
}

logger.info("Starting server on port " + port + " with address type " + addressType);

server =
serverBuilder
.addService(testServiceInterceptor)
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(oldReflectionService)
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
server.start();
maintenanceServer = null;
health.setStatus("", ServingStatus.SERVING);
}

Expand Down
4 changes: 3 additions & 1 deletion xds/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ dependencies {
project(':grpc-services'),
project(':grpc-auth'),
project(path: ':grpc-alts', configuration: 'shadow'),
libraries.dev.cel.runtime,
libraries.guava,
libraries.gson,
libraries.re2j,
Expand All @@ -70,7 +71,8 @@ dependencies {
compileOnly libraries.netty.transport.epoll

testImplementation project(':grpc-testing'),
project(':grpc-testing-proto')
project(':grpc-testing-proto'),
libraries.dev.cel.compiler
testImplementation (libraries.netty.transport.epoll) {
artifact {
classifier = "linux-x86_64"
Expand Down
19 changes: 19 additions & 0 deletions xds/src/main/java/io/grpc/xds/Filter.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
*/
String[] typeUrls();

default boolean isEnabled() {
return true;
}

/**
* Parses the top-level filter config from raw proto message. The message may be either a {@link
* com.google.protobuf.Any} or a {@link com.google.protobuf.Struct}.
Expand All @@ -50,6 +54,12 @@
*/
ConfigOrError<? extends FilterConfig> parseFilterConfigOverride(Message rawProtoMessage);

default void shutdown() {
// Implement as needed.
// TODO(sergiitk): [DESIGN] important to cover and discuss in the design.
// TODO(sergiitk): [QUESTION] should it be in ServerInterceptorBuilder?
}

Check warning on line 61 in xds/src/main/java/io/grpc/xds/Filter.java

View check run for this annotation

Codecov / codecov/patch

xds/src/main/java/io/grpc/xds/Filter.java#L61

Added line #L61 was not covered by tests

/** Represents an opaque data structure holding configuration for a filter. */
interface FilterConfig {
String typeUrl();
Expand All @@ -68,6 +78,15 @@
@Nullable
ServerInterceptor buildServerInterceptor(
FilterConfig config, @Nullable FilterConfig overrideConfig);

@Nullable
default ServerInterceptor buildServerInterceptor(
FilterConfig config,
@Nullable FilterConfig overrideConfig,
ScheduledExecutorService scheduler) {
return buildServerInterceptor(config, overrideConfig);
}

}

/** Filter config with instance name. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
drainGraceTime = drainGraceNanosObj;
drainGraceTimeUnit = TimeUnit.NANOSECONDS;
}
// TODO(sergiitk): [design] drains connections on LDS update.
FilterChainSelectorManager.Closer closer = new FilterChainSelectorManager.Closer(
new GracefullyShutdownChannelRunnable(ctx.channel(), drainGraceTime, drainGraceTimeUnit));
FilterChainSelector selector = filterChainSelectorManager.register(closer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void updateSelector(FilterChainSelector newSelector) {
closers = new TreeSet<Closer>(closers.comparator());
selector = newSelector;
}
// TODO(sergiitk): [design] calls the closer of FilterChainMatchingNegotiatorServerFactory
for (Closer closer : oldClosers) {
closer.closer.run();
}
Expand Down
6 changes: 5 additions & 1 deletion xds/src/main/java/io/grpc/xds/FilterRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ static synchronized FilterRegistry getDefaultRegistry() {
instance = newRegistry().register(
FaultFilter.INSTANCE,
RouterFilter.INSTANCE,
RbacFilter.INSTANCE);
RbacFilter.INSTANCE,
RlqsFilter.INSTANCE);
}
return instance;
}
Expand All @@ -50,6 +51,9 @@ static FilterRegistry newRegistry() {
@VisibleForTesting
FilterRegistry register(Filter... filters) {
for (Filter filter : filters) {
if (!filter.isEnabled()) {
continue;
}
for (String typeUrl : filter.typeUrls()) {
supportedFilters.put(typeUrl, filter);
}
Expand Down
9 changes: 9 additions & 0 deletions xds/src/main/java/io/grpc/xds/MessagePrinter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.grpc.xds;

import com.github.xds.type.matcher.v3.CelMatcher;
import com.github.xds.type.matcher.v3.HttpAttributesCelMatchInput;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
Expand All @@ -28,6 +30,8 @@
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig;
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaFilterConfig;
import io.envoyproxy.envoy.extensions.filters.http.rate_limit_quota.v3.RateLimitQuotaOverride;
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBAC;
import io.envoyproxy.envoy.extensions.filters.http.rbac.v3.RBACPerRoute;
import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router;
Expand Down Expand Up @@ -58,6 +62,11 @@ private static JsonFormat.Printer newPrinter() {
.add(RBAC.getDescriptor())
.add(RBACPerRoute.getDescriptor())
.add(Router.getDescriptor())
// RLQS
.add(RateLimitQuotaFilterConfig.getDescriptor())
.add(RateLimitQuotaOverride.getDescriptor())
.add(HttpAttributesCelMatchInput.getDescriptor())
.add(CelMatcher.getDescriptor())
// UpstreamTlsContext and DownstreamTlsContext in v3 are not transitively imported
// by top-level resource types.
.add(UpstreamTlsContext.getDescriptor())
Expand Down
Loading
Loading