KAFKA-2489: add benchmark for new consumer
The changes here are smaller than they look - mostly refactoring/cleanup.

- ConsumerPerformanceService: added a new_consumer flag and exposed more command-line settings
- benchmark.py: refactored to use `parametrize` and `matrix`, which removes a fair amount of repeated code (see the sketch after this list)
- benchmark.py: added consumer performance tests with new consumer (using `parametrize`)
- benchmark.py: added more detailed test descriptions
- performance.py: split into separate files
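
For context, `parametrize` and `matrix` are ducktape annotations that turn one test method into several runs with different arguments. A minimal sketch of the pattern, assuming the standard ducktape imports (the class, methods, and argument values below are illustrative, not taken from benchmark.py):

```python
# Illustrative sketch of the ducktape parametrize/matrix pattern used by the
# refactored benchmark.py; the class, methods, and values are hypothetical.
from ducktape.mark import matrix, parametrize
from ducktape.tests.test import Test


class ExampleBenchmarkTest(Test):

    @parametrize(acks=1, message_size=100)
    @parametrize(acks=-1, message_size=100)
    def test_producer_throughput(self, acks, message_size):
        # Each @parametrize line schedules one run of this method with the
        # given keyword arguments.
        self.logger.info("acks=%s, message_size=%d", acks, message_size)

    @matrix(new_consumer=[True, False], num_consumers=[1, 3])
    def test_consumer_throughput(self, new_consumer, num_consumers):
        # @matrix schedules one run per element of the cross product of the
        # listed values, so this method runs four times.
        self.logger.info("new_consumer=%s", new_consumer)
```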

Author: Geoff Anderson <[email protected]>

Reviewers: Ewen Cheslack-Postava, Jason Gustafson, Gwen Shapira

Closes apache#179 from granders/KAFKA-2489-benchmark-new-consumer
Geoff Anderson authored and gwenshap committed Sep 9, 2015
1 parent f890298 commit b8b1bca
Showing 12 changed files with 494 additions and 338 deletions.
@@ -155,7 +155,7 @@
* called <i>test</i> as described above.
* <p>
* The broker will automatically detect failed processes in the <i>test</i> group by using a heartbeat mechanism. The
- * consumer will automatically ping the cluster periodically, which let's the cluster know that it is alive. As long as
+ * consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. As long as
* the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
* to it. If it stops heartbeating for a period of time longer than <code>session.timeout.ms</code> then it will be
* considered dead and its partitions will be assigned to another process.
44 changes: 35 additions & 9 deletions core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -17,11 +17,15 @@

package kafka.tools

+ import java.util
+
+ import org.apache.kafka.common.TopicPartition
+
import scala.collection.JavaConversions._
import java.util.concurrent.atomic.AtomicLong
import java.nio.channels.ClosedByInterruptException
import org.apache.log4j.Logger
- import org.apache.kafka.clients.consumer.KafkaConsumer
+ import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import kafka.utils.CommandLineUtils
import java.util.{ Random, Properties }
@@ -58,7 +62,7 @@ object ConsumerPerformance {
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
consumer.subscribe(List(config.topic))
startMs = System.currentTimeMillis
- consume(consumer, config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
+ consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
endMs = System.currentTimeMillis
consumer.close()
} else {
@@ -93,18 +97,40 @@ object ConsumerPerformance {
}
}

- def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
+ def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
var bytesRead = 0L
var messagesRead = 0L
- val startMs = System.currentTimeMillis
- var lastReportTime: Long = startMs
var lastBytesRead = 0L
var lastMessagesRead = 0L
- var lastConsumed = System.currentTimeMillis
- while(messagesRead < count && lastConsumed >= System.currentTimeMillis - timeout) {

+ // Wait for group join, metadata fetch, etc
+ val joinTimeout = 10000
+ val isAssigned = new AtomicBoolean(false)
+ consumer.subscribe(topics, new ConsumerRebalanceListener {
+ def onPartitionsAssigned(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
+ isAssigned.set(true)
+ }
+ def onPartitionsRevoked(consumer: org.apache.kafka.clients.consumer.Consumer[_, _], partitions: util.Collection[TopicPartition]) {
+ isAssigned.set(false)
+ }})
+ val joinStart = System.currentTimeMillis()
+ while (!isAssigned.get()) {
+ if (System.currentTimeMillis() - joinStart >= joinTimeout) {
+ throw new Exception("Timed out waiting for initial group join.")
+ }
+ consumer.poll(100)
+ }
+ consumer.seekToBeginning()
+
+ // Now start the benchmark
+ val startMs = System.currentTimeMillis
+ var lastReportTime: Long = startMs
+ var lastConsumedTime = System.currentTimeMillis
+
+ while(messagesRead < count && System.currentTimeMillis() - lastConsumedTime <= timeout) {
val records = consumer.poll(100)
if(records.count() > 0)
- lastConsumed = System.currentTimeMillis
+ lastConsumedTime = System.currentTimeMillis
for(record <- records) {
messagesRead += 1
if(record.key != null)
@@ -121,6 +147,7 @@ object ConsumerPerformance {
}
}
}

totalMessagesRead.set(messagesRead)
totalBytesRead.set(bytesRead)
}
@@ -255,5 +282,4 @@ object ConsumerPerformance {
}

}

}
163 changes: 0 additions & 163 deletions tests/kafkatest/services/performance.py

This file was deleted.

19 changes: 19 additions & 0 deletions tests/kafkatest/services/performance/__init__.py
@@ -0,0 +1,19 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from performance import PerformanceService
from end_to_end_latency import EndToEndLatencyService
from producer_performance import ProducerPerformanceService
from consumer_performance import ConsumerPerformanceService
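
With performance.py split into a package, a system test imports the services from `kafkatest.services.performance` and can toggle the new consumer through the `new_consumer` flag mentioned above. A rough usage sketch (the constructor arguments and the `results` field are assumptions based on this description, not the exact kafkatest API):

```python
# Hypothetical driver code; constructor arguments and result handling are
# assumptions, not copied from the actual kafkatest services.
from kafkatest.services.performance import ConsumerPerformanceService

def run_consumer_benchmark(test_context, kafka_service, use_new_consumer):
    perf = ConsumerPerformanceService(
        test_context, num_nodes=1, kafka=kafka_service,
        topic="test-topic", messages=10 * 1000 * 1000,
        new_consumer=use_new_consumer)  # flag added by this patch
    perf.run()           # ducktape services: start workers, wait, stop
    return perf.results  # per-node stats collected by the service
```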