|
17 | 17 |
|
18 | 18 | package kafka.server
|
19 | 19 |
|
20 |
| -import java.net.{InetAddress, UnknownHostException} |
21 | 20 | import java.util.{Collections, Properties}
|
22 | 21 | import kafka.log.UnifiedLog
|
23 |
| -import kafka.network.ConnectionQuotas |
24 | 22 | import kafka.server.QuotaFactory.QuotaManagers
|
25 | 23 | import kafka.utils.Logging
|
26 | 24 | import org.apache.kafka.server.config.{QuotaConfig, ZooKeeperInternals}
|
27 |
| -import org.apache.kafka.common.metrics.Quota |
28 | 25 | import org.apache.kafka.common.metrics.Quota._
|
29 |
| -import org.apache.kafka.common.utils.Sanitizer |
30 | 26 | import org.apache.kafka.coordinator.group.GroupCoordinator
|
31 | 27 | import org.apache.kafka.server.ClientMetricsManager
|
32 | 28 | import org.apache.kafka.server.common.StopPartition
|
@@ -142,60 +138,6 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
|
142 | 138 | }
|
143 | 139 | }
|
144 | 140 |
|
145 |
| - |
146 |
| -/** |
147 |
| - * Handles <client-id>, <user> or <user, client-id> quota config updates in ZK. |
148 |
| - * This implementation reports the overrides to the respective ClientQuotaManager objects |
149 |
| - */ |
150 |
| -class QuotaConfigHandler(private val quotaManagers: QuotaManagers) { |
151 |
| - |
152 |
| - def updateQuotaConfig(sanitizedUser: Option[String], sanitizedClientId: Option[String], config: Properties): Unit = { |
153 |
| - val clientId = sanitizedClientId.map(Sanitizer.desanitize) |
154 |
| - val producerQuota = |
155 |
| - if (config.containsKey(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG)) |
156 |
| - Some(new Quota(config.getProperty(QuotaConfig.PRODUCER_BYTE_RATE_OVERRIDE_CONFIG).toLong.toDouble, true)) |
157 |
| - else |
158 |
| - None |
159 |
| - quotaManagers.produce.updateQuota(sanitizedUser, clientId, sanitizedClientId, producerQuota) |
160 |
| - val consumerQuota = |
161 |
| - if (config.containsKey(QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG)) |
162 |
| - Some(new Quota(config.getProperty(QuotaConfig.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG).toLong.toDouble, true)) |
163 |
| - else |
164 |
| - None |
165 |
| - quotaManagers.fetch.updateQuota(sanitizedUser, clientId, sanitizedClientId, consumerQuota) |
166 |
| - val requestQuota = |
167 |
| - if (config.containsKey(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG)) |
168 |
| - Some(new Quota(config.getProperty(QuotaConfig.REQUEST_PERCENTAGE_OVERRIDE_CONFIG).toDouble, true)) |
169 |
| - else |
170 |
| - None |
171 |
| - quotaManagers.request.updateQuota(sanitizedUser, clientId, sanitizedClientId, requestQuota) |
172 |
| - val controllerMutationQuota = |
173 |
| - if (config.containsKey(QuotaConfig.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG)) |
174 |
| - Some(new Quota(config.getProperty(QuotaConfig.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG).toDouble, true)) |
175 |
| - else |
176 |
| - None |
177 |
| - quotaManagers.controllerMutation.updateQuota(sanitizedUser, clientId, sanitizedClientId, controllerMutationQuota) |
178 |
| - } |
179 |
| -} |
180 |
| - |
181 |
| -class IpConfigHandler(private val connectionQuotas: ConnectionQuotas) extends ConfigHandler with Logging { |
182 |
| - |
183 |
| - def processConfigChanges(ip: String, config: Properties): Unit = { |
184 |
| - val ipConnectionRateQuota = Option(config.getProperty(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG)).map(_.toInt) |
185 |
| - val updatedIp = { |
186 |
| - if (ip != ZooKeeperInternals.DEFAULT_STRING) { |
187 |
| - try { |
188 |
| - Some(InetAddress.getByName(ip)) |
189 |
| - } catch { |
190 |
| - case _: UnknownHostException => throw new IllegalArgumentException(s"Unable to resolve address $ip") |
191 |
| - } |
192 |
| - } else |
193 |
| - None |
194 |
| - } |
195 |
| - connectionQuotas.updateIpConnectionRateQuota(updatedIp, ipConnectionRateQuota) |
196 |
| - } |
197 |
| -} |
198 |
| - |
199 | 141 | /**
|
200 | 142 | * The BrokerConfigHandler will process individual broker config changes in ZK.
|
201 | 143 | * The callback provides the brokerId and the full properties set read from ZK.
|
|
0 commit comments