@@ -29,7 +29,7 @@ import org.apache.kafka.common.metrics.Metrics
29
29
import org .apache .kafka .common .metrics .stats .{Avg , CumulativeSum , Rate }
30
30
import org .apache .kafka .common .security .auth .KafkaPrincipal
31
31
import org .apache .kafka .common .utils .{Sanitizer , Time }
32
- import org .apache .kafka .server .config .{ ClientQuotaManagerConfig , ZooKeeperInternals }
32
+ import org .apache .kafka .server .config .ClientQuotaManagerConfig
33
33
import org .apache .kafka .server .quota .{ClientQuotaCallback , ClientQuotaEntity , ClientQuotaType , QuotaType , QuotaUtils , SensorAccess , ThrottleCallback , ThrottledChannel }
34
34
import org .apache .kafka .server .util .ShutdownableThread
35
35
import org .apache .kafka .network .Session
@@ -55,7 +55,7 @@ object QuotaTypes {
55
55
object ClientQuotaManager {
56
56
// Purge sensors after 1 hour of inactivity
57
57
val InactiveSensorExpirationTimeSeconds = 3600
58
-
58
+ private val DefaultName = " <default> "
59
59
val DefaultClientIdQuotaEntity : KafkaQuotaEntity = KafkaQuotaEntity (None , Some (DefaultClientIdEntity ))
60
60
val DefaultUserQuotaEntity : KafkaQuotaEntity = KafkaQuotaEntity (Some (DefaultUserEntity ), None )
61
61
val DefaultUserClientIdQuotaEntity : KafkaQuotaEntity = KafkaQuotaEntity (Some (DefaultUserEntity ), Some (DefaultClientIdEntity ))
@@ -76,13 +76,13 @@ object ClientQuotaManager {
76
76
77
77
case object DefaultUserEntity extends BaseUserEntity {
78
78
override def entityType : ClientQuotaEntity .ConfigEntityType = ClientQuotaEntity .ConfigEntityType .DEFAULT_USER
79
- override def name : String = ZooKeeperInternals . DEFAULT_STRING
79
+ override def name : String = DefaultName
80
80
override def toString : String = " default user"
81
81
}
82
82
83
83
case object DefaultClientIdEntity extends ClientQuotaEntity .ConfigEntity {
84
84
override def entityType : ClientQuotaEntity .ConfigEntityType = ClientQuotaEntity .ConfigEntityType .DEFAULT_CLIENT_ID
85
- override def name : String = ZooKeeperInternals . DEFAULT_STRING
85
+ override def name : String = DefaultName
86
86
override def toString : String = " default client-id"
87
87
}
88
88
@@ -93,7 +93,7 @@ object ClientQuotaManager {
93
93
94
94
def sanitizedUser : String = userEntity.map {
95
95
case entity : UserEntity => entity.sanitizedUser
96
- case DefaultUserEntity => ZooKeeperInternals . DEFAULT_STRING
96
+ case DefaultUserEntity => DefaultName
97
97
}.getOrElse(" " )
98
98
99
99
def clientId : String = clientIdEntity.map(_.name).getOrElse(" " )
@@ -260,7 +260,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
260
260
*/
261
261
def unrecordQuotaSensor (request : RequestChannel .Request , value : Double , timeMs : Long ): Unit = {
262
262
val clientSensors = getOrCreateQuotaSensors(request.session, request.header.clientId)
263
- clientSensors.quotaSensor.record(value * ( - 1 ) , timeMs, false )
263
+ clientSensors.quotaSensor.record(value * - 1 , timeMs, false )
264
264
}
265
265
266
266
/**
@@ -403,12 +403,15 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
403
403
* Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults
404
404
* for any of these levels.
405
405
*
406
- * @param sanitizedUser user to override if quota applies to <user> or <user, client-id>
407
- * @param clientId client to override if quota applies to <client-id> or <user, client-id>
408
- * @param sanitizedClientId sanitized client ID to override if quota applies to <client-id> or <user, client-id>
409
- * @param quota custom quota to apply or None if quota override is being removed
406
+ * @param userEntity user to override if quota applies to <user> or <user, client-id>
407
+ * @param clientEntity sanitized client entity to override if quota applies to <client-id> or <user, client-id>
408
+ * @param quota custom quota to apply or None if quota override is being removed
410
409
*/
411
- def updateQuota (sanitizedUser : Option [String ], clientId : Option [String ], sanitizedClientId : Option [String ], quota : Option [Quota ]): Unit = {
410
+ def updateQuota (
411
+ userEntity : Option [BaseUserEntity ],
412
+ clientEntity : Option [ClientQuotaEntity .ConfigEntity ],
413
+ quota : Option [Quota ]
414
+ ): Unit = {
412
415
/*
413
416
* Acquire the write lock to apply changes in the quota objects.
414
417
* This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists).
@@ -418,29 +421,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
418
421
*/
419
422
lock.writeLock().lock()
420
423
try {
421
- val userEntity = sanitizedUser.map {
422
- case ZooKeeperInternals .DEFAULT_STRING => DefaultUserEntity
423
- case user => UserEntity (user)
424
- }
425
- val clientIdEntity = sanitizedClientId.map {
426
- case ZooKeeperInternals .DEFAULT_STRING => DefaultClientIdEntity
427
- case _ => ClientIdEntity (clientId.getOrElse(throw new IllegalStateException (" Client-id not provided" )))
428
- }
429
- val quotaEntity = KafkaQuotaEntity (userEntity, clientIdEntity)
424
+ val quotaEntity = KafkaQuotaEntity (userEntity, clientEntity)
430
425
431
426
if (userEntity.nonEmpty) {
432
427
if (quotaEntity.clientIdEntity.nonEmpty)
433
428
quotaTypesEnabled |= QuotaTypes .UserClientIdQuotaEnabled
434
429
else
435
430
quotaTypesEnabled |= QuotaTypes .UserQuotaEnabled
436
- } else if (clientIdEntity .nonEmpty)
431
+ } else if (clientEntity .nonEmpty)
437
432
quotaTypesEnabled |= QuotaTypes .ClientIdQuotaEnabled
438
433
439
434
quota match {
440
435
case Some (newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound)
441
436
case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
442
437
}
443
- val updatedEntity = if (userEntity.contains(DefaultUserEntity ) || clientIdEntity .contains(DefaultClientIdEntity ))
438
+ val updatedEntity = if (userEntity.contains(DefaultUserEntity ) || clientEntity .contains(DefaultClientIdEntity ))
444
439
None // more than one entity may need updating, so `updateQuotaMetricConfigs` will go through all metrics
445
440
else
446
441
Some (quotaEntity)
@@ -452,9 +447,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
452
447
}
453
448
454
449
/**
455
- * Updates metrics configs. This is invoked when quota configs are updated in ZooKeeper
456
- * or when partitions leaders change and custom callbacks that implement partition-based quotas
457
- * have updated quotas.
450
+ * Updates metrics configs. This is invoked when quota configs are updated when partitions leaders change
451
+ * and custom callbacks that implement partition-based quotas have updated quotas.
458
452
*
459
453
* @param updatedQuotaEntity If set to one entity and quotas have only been enabled at one
460
454
* level, then an optimized update is performed with a single metric update. If None is provided,
0 commit comments