Skip to content

Commit

Permalink
improved getOrCreateRlqsEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiitk committed Sep 28, 2024
1 parent 2ba4f46 commit 7432065
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public class RlqsBucket {

private final RateLimitStrategy noAssignmentStrategy;
private final RateLimitStrategy expiredAssignmentStrategy;
private final DenyResponse denyResponse;

// TODO(sergiitk): [impl] consider AtomicLongFieldUpdater
private final AtomicLong lastSnapshotTimeNanos = new AtomicLong(-1);
private final AtomicLong numRequestsAllowed = new AtomicLong();
private final AtomicLong numRequestsDenied = new AtomicLong();
private final DenyResponse denyResponse;

// TODO(sergiitk): [impl] consider AtomicReferenceFieldUpdater
@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ final class RlqsBucketCache {
private final ConcurrentMap<Long, Set<RlqsBucket>> bucketsPerInterval = new ConcurrentHashMap<>();
private final ConcurrentMap<RlqsBucketId, RlqsBucket> buckets = new ConcurrentHashMap<>();

RlqsBucket getOrCreate(
public RlqsBucket getOrCreate(
RlqsBucketId bucketId, RlqsBucketSettings bucketSettings, Consumer<RlqsBucket> onCreate) {
// read synchronize trick
RlqsBucket bucket = buckets.get(bucketId);
Expand All @@ -49,7 +49,7 @@ RlqsBucket getOrCreate(
}
}

void deleteBucket(RlqsBucketId bucketId) {
public void deleteBucket(RlqsBucketId bucketId) {
RlqsBucket bucket = buckets.get(bucketId);
if (bucket == null) {
return;
Expand All @@ -63,7 +63,8 @@ void deleteBucket(RlqsBucketId bucketId) {
}
}

void updateBucket(RlqsBucketId bucketId, RateLimitStrategy rateLimitStrategy, long ttlMillis) {
public void updateBucket(
RlqsBucketId bucketId, RateLimitStrategy rateLimitStrategy, long ttlMillis) {
RlqsBucket bucket = buckets.get(bucketId);
bucket.updateAction(rateLimitStrategy, ttlMillis);
}
Expand Down
45 changes: 14 additions & 31 deletions xds/src/main/java/io/grpc/xds/internal/rlqs/RlqsCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.ChannelCredentials;
import io.grpc.InsecureChannelCredentials;
import io.grpc.SynchronizationContext;
import io.grpc.xds.RlqsFilterConfig;
import io.grpc.xds.client.Bootstrapper.RemoteServerInfo;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -80,34 +77,20 @@ public void shutdownRlqsEngine(RlqsFilterConfig oldConfig) {
// TODO(sergiitk): shutdown one
}

public RlqsEngine getOrCreateRlqsEngine(RlqsFilterConfig config) {
final String configHash = hashRlqsFilterConfig(config);
if (enginePool.containsKey(configHash)) {
return enginePool.get(configHash);
}
public RlqsEngine getOrCreateRlqsEngine(final RlqsFilterConfig config) {
String configHash = hashRlqsFilterConfig(config);
return enginePool.computeIfAbsent(configHash, k -> newRlqsEngine(k, config));
}

final SettableFuture<RlqsEngine> future = SettableFuture.create();
syncContext.execute(() -> {
// TODO(sergiitk): [IMPL] get channel creds from the bootstrap.
RemoteServerInfo rlqsServer = RemoteServerInfo.create(config.rlqsService().targetUri(),
InsecureChannelCredentials.create());
RlqsEngine rlqsEngine = new RlqsEngine(
rlqsServer,
config.domain(),
config.bucketMatchers(),
configHash,
scheduler);

enginePool.put(configHash, rlqsEngine);
future.set(enginePool.get(configHash));
});
try {
// TODO(sergiitk): [IMPL] clarify time
return future.get(1, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// TODO(sergiitk): [IMPL] handle properly
throw new RuntimeException(e);
}
private RlqsEngine newRlqsEngine(String configHash, RlqsFilterConfig config) {
// TODO(sergiitk): [IMPL] get channel creds from the bootstrap.
ChannelCredentials creds = InsecureChannelCredentials.create();
return new RlqsEngine(
RemoteServerInfo.create(config.rlqsService().targetUri(), creds),
config.domain(),
config.bucketMatchers(),
configHash,
scheduler);
}

private String hashRlqsFilterConfig(RlqsFilterConfig config) {
Expand Down

0 comments on commit 7432065

Please sign in to comment.