Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18870 Implement describeDelegationToken for controller #19306

Open
wants to merge 7 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3429,7 +3429,7 @@ public DescribeDelegationTokenResult describeDelegationToken(final DescribeDeleg
final KafkaFutureImpl<List<DelegationToken>> tokensFuture = new KafkaFutureImpl<>();
final long now = time.milliseconds();
runnable.call(new Call("describeDelegationToken", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
new LeastLoadedBrokerOrActiveKController()) {

@Override
DescribeDelegationTokenRequest.Builder createRequest(int timeoutMs) {
Expand Down
41 changes: 39 additions & 2 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{AlterConfigOp, EndpointType}
import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, CREATE_TOKENS, DELETE, DESCRIBE, DESCRIBE_CONFIGS}
import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, CREATE_TOKENS, DELETE, DESCRIBE, DESCRIBE_CONFIGS, DESCRIBE_TOKENS}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{ApiException, ClusterAuthorizationException, InvalidRequestException, TopicDeletionDisabledException, UnsupportedVersionException}
import org.apache.kafka.common.internals.FatalExitError
Expand All @@ -47,7 +47,7 @@ import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, USER}
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, DELEGATION_TOKEN, GROUP, TOPIC, USER}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.Uuid
import org.apache.kafka.controller.ControllerRequestContext.requestTimeoutMsToDeadlineNs
Expand All @@ -56,6 +56,7 @@ import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
Expand All @@ -76,6 +77,7 @@ class ControllerApis(
val config: KafkaConfig,
val clusterId: String,
val registrationsPublisher: ControllerRegistrationsPublisher,
val tokenManager: DelegationTokenManager,
val apiVersionManager: ApiVersionManager,
val metadataCache: KRaftMetadataCache
) extends ApiRequestHandler with Logging {
Expand Down Expand Up @@ -116,6 +118,7 @@ class ControllerApis(
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateDelegationTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewDelegationTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireDelegationTokenRequest(request)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeDelegationTokenRequest(request)
case ApiKeys.ENVELOPE => handleEnvelopeRequest(request, requestLocal)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
Expand Down Expand Up @@ -1009,6 +1012,40 @@ class ControllerApis(
}
}

private def handleDescribeDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val describeTokenRequest = request.body[DescribeDelegationTokenRequest]

// the callback for sending a describe token response
def sendResponseCallback(error: Errors, tokenDetails: util.List[DelegationToken]): CompletableFuture[Unit] = {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new DescribeDelegationTokenResponse(request.context.requestVersion(), requestThrottleMs, error, tokenDetails))
trace("Sending describe token response for correlation id %d to client %s."
.format(request.header.correlationId, request.header.clientId))
CompletableFuture.completedFuture[Unit](())
}

if (!allowTokenRequests(request))
sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, Collections.emptyList)
else if (!config.tokenAuthEnabled)
sendResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, Collections.emptyList)
else {
val requestPrincipal = request.context.principal

if (describeTokenRequest.ownersListEmpty()) {
sendResponseCallback(Errors.NONE, Collections.emptyList)
}
else {
val owners = Option(describeTokenRequest.data.owners)
.map(_.asScala.map(p => new KafkaPrincipal(p.principalType, p.principalName)).toList)
def authorizeToken(tokenId: String) = authHelper.authorize(request.context, DESCRIBE, DELEGATION_TOKEN, tokenId)
def authorizeRequester(owner: KafkaPrincipal) = authHelper.authorize(request.context, DESCRIBE_TOKENS, USER, owner.toString)
def eligible(token: TokenInformation) = DelegationTokenManager.filterToken(requestPrincipal, owners, token, authorizeToken, authorizeRequester)
val tokens = tokenManager.getTokens(eligible)
sendResponseCallback(Errors.NONE, tokens)
}
}
}

def handleListPartitionReassignments(request: RequestChannel.Request): CompletableFuture[Unit] = {
val listRequest = request.body[ListPartitionReassignmentsRequest]
authHelper.authorizeClusterOperation(request, DESCRIBE)
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ControllerServer(
var controller: Controller = _
var quotaManagers: QuotaManagers = _
var clientQuotaMetadataManager: ClientQuotaMetadataManager = _
var tokenManager: DelegationTokenManager = _
var controllerApis: ControllerApis = _
var controllerApisHandlerPool: KafkaRequestHandlerPool = _
def kafkaYammerMetrics: KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
Expand Down Expand Up @@ -269,6 +270,7 @@ class ControllerServer(
time,
s"controller-${config.nodeId}-", ProcessRole.ControllerRole.toString)
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
tokenManager = new DelegationTokenManager(config, tokenCache, time)
controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
authorizer,
quotaManagers,
Expand All @@ -278,6 +280,7 @@ class ControllerServer(
config,
clusterId,
registrationsPublisher,
tokenManager,
apiVersionManager,
metadataCache)
controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class ControllerApisTest {
new KafkaConfig(props),
"JgxuGe9URy-E-ceaL04lEw",
new ControllerRegistrationsPublisher(),
null,
new SimpleApiVersionManager(
ListenerType.CONTROLLER,
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ default Set<GroupProtocol> supportedGroupProtocols() {

//---------------------------[wait]---------------------------//

default void waitForToken() throws InterruptedException {
Collection<KafkaBroker> brokers = aliveBrokers().values();
TestUtils.waitForCondition(() -> brokers.stream()
.noneMatch(broker -> broker.tokenCache().tokens().isEmpty()),
60000L, "Token not propagated after 60000 ms");
}

default void waitTopicDeletion(String topic) throws InterruptedException {
waitForTopic(topic, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
Expand Down Expand Up @@ -181,12 +182,15 @@ public static List<DelegationToken> describeToken(Admin adminClient, DelegationT

private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException {
Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
CommandLineUtils.initializeBootstrapProperties(props,
Optional.ofNullable(opts.options.valueOf(opts.bootstrapServerOpt)),
Optional.ofNullable(opts.options.valueOf(opts.bootstrapControllerOpt)));
return Admin.create(props);
}

static class DelegationTokenCommandOptions extends CommandDefaultOptions {
public final OptionSpec<String> bootstrapServerOpt;
public final OptionSpec<String> bootstrapControllerOpt;
public final OptionSpec<String> commandConfigOpt;
public final OptionSpec<Void> createOpt;
public final OptionSpec<Void> renewOpt;
Expand All @@ -202,14 +206,20 @@ static class DelegationTokenCommandOptions extends CommandDefaultOptions {
public DelegationTokenCommandOptions(String[] args) {
super(args);

String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping.";
String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping. When the --bootstrap-controller argument is used --bootstrap-servers must not be specified.";
String commandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
" operations are allowed in secure mode only. This config file is used to pass security related configs.";
String bootstrapControllerDoc = "REQUIRED: A comma-separated list of bootstrap.controllers that can be supplied instead of bootstrap-servers."
+ " This is useful for administrators who wish to bypass the brokers.";

this.bootstrapServerOpt = parser.accepts("bootstrap-server", bootstrapServerDoc)
.withRequiredArg()
.ofType(String.class);

this.bootstrapControllerOpt = parser.accepts("bootstrap-controller", bootstrapControllerDoc)
.withRequiredArg()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You cannot specify both --bootstrap-server and --bootstrap-controller as required. This makes result like:

>  ./bin/kafka-delegation-tokens.sh --bootstrap-controller localhost:9093 --describe
Missing required argument "[bootstrap-server]"

Please using withOptionalArg and do the check in checkArgs.

.ofType(String.class);

this.commandConfigOpt = parser.accepts("command-config", commandConfigDoc)
.withRequiredArg()
.ofType(String.class);
Expand Down Expand Up @@ -284,7 +294,7 @@ public String hmac() {

public void checkArgs() {
// check required args
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt, commandConfigOpt);
CommandLineUtils.checkRequiredArgs(parser, options, commandConfigOpt);

if (options.has(createOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, maxLifeTimeOpt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,32 @@
package org.apache.kafka.tools;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.SecurityUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -81,6 +96,36 @@ public void testDelegationTokenRequests() throws ExecutionException, Interrupted

}

@Test
public void testSetBootstrapServerAndBootstrapController(@TempDir Path configPath) throws IOException {
Path testFile = Files.createFile(configPath.resolve("testfile"));
assertThrows(RuntimeException.class,
() -> DelegationTokenCommand.execute("--bootstrap-server", "localhost:9092",
"--bootstrap-controller", "localhost:9092",
"--command-config", testFile.toString(), "--describe"));
}

@ClusterTest(types = { Type.KRAFT },
brokerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
controllerSecurityProtocol = SecurityProtocol.SASL_PLAINTEXT,
serverProperties = {
@ClusterConfigProperty(key = DELEGATION_TOKEN_SECRET_KEY_CONFIG, value = "key")
})
public void testDescribeDelegationTokenWithBootstrapController(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test case is about Admin. We need another test case to test DelegationTokenCommand directly.

try (Admin brokerAdmin = clusterInstance.admin();
Admin controllerAdmin = clusterInstance.admin(Map.of(), true)) {
CreateDelegationTokenOptions ops = new CreateDelegationTokenOptions()
.renewers(List.of(SecurityUtils.parseKafkaPrincipal("User:user1")));
CreateDelegationTokenResult createResult = brokerAdmin.createDelegationToken(ops);
DelegationToken expected = createResult.delegationToken().get();
clusterInstance.waitForToken();

DescribeDelegationTokenResult describeResult = controllerAdmin.describeDelegationToken();
DelegationToken actual = describeResult.delegationTokens().get().get(0);
assertEquals(expected, actual);
}
}

private DelegationTokenCommand.DelegationTokenCommandOptions getCreateOpts(String renewer) {
String[] args = {"--bootstrap-server", "localhost:9092", "--max-life-time-period", "-1", "--command-config", "testfile", "--create", "--renewer-principal", renewer};
return new DelegationTokenCommand.DelegationTokenCommandOptions(args);
Expand Down