Add Support for querying Protobuf Data in Kafka Connector (#2020)
Jithendar12 authored Jun 24, 2024
1 parent fcf55b3 commit 22d0e83
Showing 8 changed files with 264 additions and 6 deletions.
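
For orientation, here is a minimal, hypothetical sketch (not part of this commit) of registering a protobuf schema in the AWS Glue Schema Registry so the connector can resolve it. The registry name, schema name, and .proto definition are made up, and, per the change below, the Glue schema name is expected to match the Kafka topic name.

```java
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.AWSGlueClientBuilder;
import com.amazonaws.services.glue.model.CreateSchemaRequest;
import com.amazonaws.services.glue.model.CreateSchemaResult;
import com.amazonaws.services.glue.model.RegistryId;

public class RegisterProtobufSchemaSketch
{
    public static void main(String[] args)
    {
        AWSGlue glue = AWSGlueClientBuilder.defaultClient();

        // Hypothetical .proto definition for the topic's messages.
        String protoDefinition = "syntax = \"proto3\";\n"
                + "message Customer {\n"
                + "  int32 id = 1;\n"
                + "  string name = 2;\n"
                + "}\n";

        // Hypothetical registry and schema names; per the connector change below,
        // the schema name should be the same as the Kafka topic name.
        CreateSchemaResult result = glue.createSchema(new CreateSchemaRequest()
                .withRegistryId(new RegistryId().withRegistryName("my-kafka-registry"))
                .withSchemaName("customer")
                .withDataFormat("PROTOBUF")
                .withCompatibility("BACKWARD")
                .withSchemaDefinition(protoDefinition));

        System.out.println("Registered schema ARN: " + result.getSchemaArn());
    }
}
```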
2 changes: 1 addition & 1 deletion athena-kafka/athena-kafka.yaml
@@ -29,7 +29,7 @@ Parameters:
Description: 'Kafka cluster endpoint'
Type: String
SchemaRegistryUrl:
Description: 'Schema Registry URL. Applicable for Avro Data Formatted Topics. (syntax. http://<endpoint>:<port>)'
Description: 'Schema Registry URL. Applicable for Avro/Protobuf Formatted Topics. (syntax. http://<endpoint>:<port>)'
Type: String
Default: ""
LambdaFunctionName:
5 changes: 5 additions & 0 deletions athena-kafka/pom.xml
@@ -36,6 +36,11 @@
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
GlueRegistryReader.java
@@ -77,4 +77,9 @@ public String getGlueSchemaType(String glueRegistryName, String glueSchemaName)
GetSchemaVersionResult result = getSchemaVersionResult(glueRegistryName, glueSchemaName);
return result.getDataFormat();
}
public String getSchemaDef(String glueRegistryName, String glueSchemaName)
{
GetSchemaVersionResult result = getSchemaVersionResult(glueRegistryName, glueSchemaName);
return result.getSchemaDefinition();
}
}
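
The new getSchemaDef method exposes the raw schema definition (for protobuf, the .proto text) through the existing getSchemaVersionResult helper, which is not shown in this diff. As a rough, hypothetical illustration of the underlying Glue call, with made-up registry and schema names:

```java
import com.amazonaws.services.glue.AWSGlue;
import com.amazonaws.services.glue.AWSGlueClientBuilder;
import com.amazonaws.services.glue.model.GetSchemaVersionRequest;
import com.amazonaws.services.glue.model.GetSchemaVersionResult;
import com.amazonaws.services.glue.model.SchemaId;
import com.amazonaws.services.glue.model.SchemaVersionNumber;

public class GlueSchemaLookupSketch
{
    public static void main(String[] args)
    {
        AWSGlue glue = AWSGlueClientBuilder.defaultClient();

        // Fetch the latest version of a hypothetical schema from a hypothetical registry.
        GetSchemaVersionResult result = glue.getSchemaVersion(new GetSchemaVersionRequest()
                .withSchemaId(new SchemaId()
                        .withRegistryName("my-kafka-registry")
                        .withSchemaName("customer"))
                .withSchemaVersionNumber(new SchemaVersionNumber().withLatestVersion(true)));

        // getDataFormat() would return "PROTOBUF" for a protobuf schema;
        // getSchemaDefinition() returns the raw .proto text that getSchemaDef() passes on.
        System.out.println(result.getDataFormat());
        System.out.println(result.getSchemaDefinition());
    }
}
```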
KafkaConstants.java
@@ -69,7 +69,7 @@ public class KafkaConstants
public static final int MAX_RECORDS_IN_SPLIT = 10_000;

public static final String AVRO_DATA_FORMAT = "avro";

public static final String PROTOBUF_DATA_FORMAT = "protobuf";
private KafkaConstants()
{
}
KafkaMetadataHandler.java
@@ -51,6 +51,8 @@
import com.amazonaws.services.glue.model.RegistryId;
import com.amazonaws.services.glue.model.RegistryListItem;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -71,6 +73,7 @@
import static com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE;
import static com.amazonaws.athena.connectors.kafka.KafkaConstants.AVRO_DATA_FORMAT;
import static com.amazonaws.athena.connectors.kafka.KafkaConstants.MAX_RECORDS_IN_SPLIT;
import static com.amazonaws.athena.connectors.kafka.KafkaConstants.PROTOBUF_DATA_FORMAT;

public class KafkaMetadataHandler extends MetadataHandler
{
@@ -327,8 +330,9 @@ public GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest
String glueSchemaName = request.getTableName().getTableName();
String topic;
GlueRegistryReader registryReader = new GlueRegistryReader();
if (registryReader.getGlueSchemaType(glueRegistryName, glueSchemaName).equalsIgnoreCase(AVRO_DATA_FORMAT)) {
//if schema type is avro, then topic name should be glue schema name
String dataFormat = registryReader.getGlueSchemaType(glueRegistryName, glueSchemaName);
if (dataFormat.equalsIgnoreCase(AVRO_DATA_FORMAT) || dataFormat.equalsIgnoreCase(PROTOBUF_DATA_FORMAT)) {
//if schema type is avro/protobuf, then topic name should be glue schema name
topic = glueSchemaName;
}
else {
@@ -428,7 +432,8 @@ private Schema getSchema(String glueRegistryName, String glueSchemaName) throws

// Get topic schema json from Glue registry as translated to TopicSchema pojo
GlueRegistryReader registryReader = new GlueRegistryReader();
if (registryReader.getGlueSchemaType(glueRegistryName, glueSchemaName).equalsIgnoreCase(AVRO_DATA_FORMAT)) {
String dataFormat = registryReader.getGlueSchemaType(glueRegistryName, glueSchemaName);
if (dataFormat.equalsIgnoreCase(AVRO_DATA_FORMAT)) {
AvroTopicSchema avroTopicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, AvroTopicSchema.class);
// Creating ArrowType for each field in the topic schema.
// Also putting the additional column-level information
@@ -449,6 +454,21 @@
});
schemaBuilder.addMetadata("dataFormat", AVRO_DATA_FORMAT);
}
else if (dataFormat.equalsIgnoreCase(PROTOBUF_DATA_FORMAT)) {
String glueSchema = registryReader.getSchemaDef(glueRegistryName, glueSchemaName);
ProtobufSchema protobufSchema = new ProtobufSchema(glueSchema);
Descriptors.Descriptor descriptor = protobufSchema.toDescriptor();
for (Descriptors.FieldDescriptor fieldDescriptor : descriptor.getFields()) {
FieldType fieldType = new FieldType(
true,
KafkaUtils.toArrowType(fieldDescriptor.getType().toString()),
null
);
Field field = new Field(fieldDescriptor.getName(), fieldType, null);
schemaBuilder.addField(field);
}
schemaBuilder.addMetadata("dataFormat", PROTOBUF_DATA_FORMAT);
}
else {
TopicSchema topicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, TopicSchema.class);
// Creating ArrowType for each field in the topic schema.
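
The new protobuf branch above parses the Glue schema definition with Confluent's ProtobufSchema and builds one nullable Arrow field per protobuf field. Below is a self-contained sketch of that mapping with a hypothetical message and a simplified stand-in for KafkaUtils.toArrowType (the connector's actual type mapping lives in KafkaUtils):

```java
import java.util.ArrayList;
import java.util.List;

import com.google.protobuf.Descriptors;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;

public class ProtobufToArrowSketch
{
    // Simplified stand-in for KafkaUtils.toArrowType; only a few protobuf scalar types are handled.
    static ArrowType toArrowType(Descriptors.FieldDescriptor.Type type)
    {
        switch (type) {
            case BOOL:
                return new ArrowType.Bool();
            case INT32:
                return Types.MinorType.INT.getType();
            case INT64:
                return Types.MinorType.BIGINT.getType();
            case DOUBLE:
                return Types.MinorType.FLOAT8.getType();
            default:
                return Types.MinorType.VARCHAR.getType();
        }
    }

    public static void main(String[] args)
    {
        // Hypothetical .proto definition, as it would come back from getSchemaDef().
        String schemaDefinition = "syntax = \"proto3\";\n"
                + "message Order { int64 order_id = 1; string sku = 2; bool shipped = 3; }";

        Descriptors.Descriptor descriptor = new ProtobufSchema(schemaDefinition).toDescriptor();

        // Build one nullable Arrow field per protobuf field, mirroring the loop in getSchema().
        List<Field> fields = new ArrayList<>();
        for (Descriptors.FieldDescriptor fd : descriptor.getFields()) {
            fields.add(new Field(fd.getName(), new FieldType(true, toArrowType(fd.getType()), null), null));
        }
        System.out.println(new Schema(fields));
    }
}
```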
KafkaRecordHandler.java
@@ -34,6 +34,8 @@
import com.amazonaws.services.secretsmanager.AWSSecretsManager;
import com.amazonaws.services.secretsmanager.AWSSecretsManagerClientBuilder;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
@@ -49,6 +51,7 @@
import java.util.Map;

import static com.amazonaws.athena.connectors.kafka.KafkaConstants.AVRO_DATA_FORMAT;
import static com.amazonaws.athena.connectors.kafka.KafkaConstants.PROTOBUF_DATA_FORMAT;

public class KafkaRecordHandler
extends RecordHandler
@@ -89,7 +92,8 @@ public void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recordsR
Collection<TopicPartition> partitions = com.google.common.collect.ImmutableList.of(partition);
GlueRegistryReader registryReader = new GlueRegistryReader();

if (registryReader.getGlueSchemaType(recordsRequest.getTableName().getSchemaName(), recordsRequest.getTableName().getTableName()).equalsIgnoreCase(AVRO_DATA_FORMAT)) {
String dataFormat = registryReader.getGlueSchemaType(recordsRequest.getTableName().getSchemaName(), recordsRequest.getTableName().getTableName());
if (dataFormat.equalsIgnoreCase(AVRO_DATA_FORMAT)) {
try (Consumer<String, GenericRecord> kafkaAvroConsumer = KafkaUtils.getAvroKafkaConsumer(configOptions)) {
// Assign the topic and partition into this consumer.
kafkaAvroConsumer.assign(partitions);
@@ -115,6 +119,32 @@ public void readWithConstraint(BlockSpiller spiller, ReadRecordsRequest recordsR
avroConsume(spiller, recordsRequest, queryStatusChecker, splitParameters, kafkaAvroConsumer);
}
}
else if (dataFormat.equalsIgnoreCase(PROTOBUF_DATA_FORMAT)) {
try (Consumer<String, DynamicMessage> kafkaProtobufConsumer = KafkaUtils.getProtobufKafkaConsumer(configOptions)) {
// Assign the topic and partition into this consumer.
kafkaProtobufConsumer.assign(partitions);

// Seek to the start offset from which we want to read data from the topic partition.
// This start offset was configured when the split was created in the MetadataHandler.
kafkaProtobufConsumer.seek(partition, splitParameters.startOffset);

// If the end offset is 0, the partition has no data; close the consumer and exit.
Map<TopicPartition, Long> endOffsets = kafkaProtobufConsumer.endOffsets(partitions);
if (endOffsets.get(partition) == 0) {
LOGGER.debug("[kafka] topic does not have data, closing consumer {}", splitParameters);
kafkaProtobufConsumer.close();

// For debug insight
splitParameters.info = "endOffset is 0 i.e partition does not have data";
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(splitParameters.debug());
}
return;
}
// Consume topic data
protobufConsume(spiller, recordsRequest, queryStatusChecker, splitParameters, kafkaProtobufConsumer);
}
}
else {
// Initiate new KafkaConsumer that MUST not belong to any consumer group.
try (Consumer<String, TopicResultSet> kafkaConsumer = KafkaUtils.getKafkaConsumer(recordsRequest.getSchema(), configOptions)) {
@@ -331,4 +361,86 @@ private void avroExecute(
return 1;
});
}

private void protobufConsume(
BlockSpiller spiller,
ReadRecordsRequest recordsRequest,
QueryStatusChecker queryStatusChecker,
SplitParameters splitParameters,
Consumer<String, DynamicMessage> kafkaProtobufConsumer)
{
LOGGER.info("[kafka] {} Polling for data", splitParameters);
int emptyResultFoundCount = 0;
try (Consumer<String, DynamicMessage> protobufConsumer = kafkaProtobufConsumer) {
while (true) {
if (!queryStatusChecker.isQueryRunning()) {
LOGGER.debug("[kafka]{} Stopping and closing consumer due to query execution terminated by athena", splitParameters);
splitParameters.info = "query status is false i.e no need to work";
return;
}

// Call poll on the consumer to fetch data from the Kafka server.
// poll returns records in batches; the batch size is configurable.
ConsumerRecords<String, DynamicMessage> records = protobufConsumer.poll(Duration.ofSeconds(1L));
LOGGER.debug("[kafka] {} polled records size {}", splitParameters, records.count());

// For debug insight
splitParameters.pulled += records.count();

// Keep track of how many times polling has returned an empty result.
if (records.count() == 0) {
emptyResultFoundCount++;
}

// Close the KafkaConsumer if we keep getting empty results.
// We compare against a max threshold (MAX_EMPTY_RESULT_FOUND_COUNT) to stop polling.
if (emptyResultFoundCount >= MAX_EMPTY_RESULT_FOUND_COUNT) {
LOGGER.debug("[kafka] {} Closing consumer due to getting empty result from broker", splitParameters);
splitParameters.info = "always getting empty data i.e leaving from work";
return;
}

for (ConsumerRecord<String, DynamicMessage> record : records) {
// Pass the batch records one by one to protobufExecute, which keeps
// data filtering and writing to the spiller separate.
protobufExecute(spiller, recordsRequest, queryStatusChecker, splitParameters, record);

// If we have reached the end offset of the partition, we stop polling.
if (record.offset() >= splitParameters.endOffset) {
LOGGER.debug("[kafka] {} Closing consumer due to reach at end offset (current record offset is {})", splitParameters, record.offset());

// For debug insight
splitParameters.info = String.format(
"reached at the end offset i.e no need to work: condition [if(record.offset() >= splitParameters.endOffset) i.e if(%s >= %s)]",
record.offset(),
splitParameters.endOffset
);
return;
}
}
}
}
}
private void protobufExecute(
BlockSpiller spiller,
ReadRecordsRequest recordsRequest,
QueryStatusChecker queryStatusChecker,
SplitParameters splitParameters,
ConsumerRecord<String, DynamicMessage> record)
{
spiller.writeRows((Block block, int rowNum) -> {
for (Descriptors.FieldDescriptor next : record.value().getAllFields().keySet()) {
boolean isMatched = block.offerValue(next.getName(), rowNum, record.value().getField(next));
if (!isMatched) {
LOGGER.debug("[FailedToSpill] {} Failed to spill record, offset: {}", splitParameters, record.offset());
return 0;
}
}
// For debug insight
splitParameters.spilled += 1;
return 1;
});
}
}
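
The protobuf read path added above amounts to a Kafka consumer whose value deserializer is Confluent's KafkaProtobufDeserializer; with no specific record class configured, it yields DynamicMessage values, so no generated classes are needed. A minimal standalone sketch with hypothetical broker, schema registry, and topic names, using assign/seek (no consumer group) in the same spirit as the connector:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import com.google.protobuf.DynamicMessage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ProtobufConsumerSketch
{
    public static void main(String[] args)
    {
        // Hypothetical endpoints; replace with your broker and schema registry.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaConsumer<String, DynamicMessage> consumer = new KafkaConsumer<>(props)) {
            // Assign a specific topic partition and read from the beginning,
            // similar to the connector's assign/seek flow with no consumer group.
            TopicPartition partition = new TopicPartition("customer", 0);
            consumer.assign(Collections.singletonList(partition));
            consumer.seekToBeginning(Collections.singletonList(partition));

            ConsumerRecords<String, DynamicMessage> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, DynamicMessage> record : records) {
                // DynamicMessage exposes fields via descriptors, much like protobufExecute above.
                record.value().getAllFields().forEach((field, value) ->
                        System.out.println(field.getName() + " = " + value));
            }
        }
    }
}
```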
KafkaUtils.java
@@ -39,6 +39,7 @@
import com.amazonaws.services.secretsmanager.model.GetSecretValueResult;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.DynamicMessage;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -164,6 +165,14 @@ public static Consumer<String, GenericRecord> getAvroKafkaConsumer(java.util.Map
return new KafkaConsumer<>(properties);
}

public static Consumer<String, DynamicMessage> getProtobufKafkaConsumer(java.util.Map<String, String> configOptions) throws Exception
{
Properties properties = getKafkaProperties(configOptions);
properties.setProperty(KAFKA_VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer.class.getName());
properties.setProperty(KAFKA_SCHEMA_REGISTRY_URL, getRequiredConfig(KafkaConstants.KAFKA_SCHEMA_REGISTRY_URL, configOptions));
return new KafkaConsumer<>(properties);
}

/**
* Creates the required settings for kafka consumer.
*
@@ -417,16 +426,19 @@ public static ArrowType toArrowType(String dataType)
{
switch (dataType.trim().toUpperCase()) {
case "BOOLEAN":
case "BOOL":
return new ArrowType.Bool();
case "TINYINT":
return Types.MinorType.TINYINT.getType();
case "SMALLINT":
return Types.MinorType.SMALLINT.getType();
case "INT":
case "INT32":
case "INTEGER":
return Types.MinorType.INT.getType();
case "LONG":
case "BIGINT":
case "INT64":
return Types.MinorType.BIGINT.getType();
case "FLOAT":
return Types.MinorType.FLOAT4.getType();