[FEATURE] Add Support for querying Avro Data in Kafka Connector (#1958)
Jithendar12 authored Jun 13, 2024
1 parent 2eb357e commit 79a1c6b
Showing 11 changed files with 444 additions and 58 deletions.
5 changes: 5 additions & 0 deletions athena-kafka/athena-kafka.yaml
@@ -28,6 +28,10 @@ Parameters:
KafkaEndpoint:
Description: 'Kafka cluster endpoint'
Type: String
SchemaRegistryUrl:
Description: 'Schema Registry URL. Applicable for Avro-formatted topics. (syntax: http://<endpoint>:<port>)'
Type: String
Default: ""
LambdaFunctionName:
Description: 'This is the name of the lambda function that will be created. This name must satisfy the pattern ^[a-z0-9-_]{1,64}$'
Type: String
@@ -94,6 +98,7 @@ Resources:
secrets_manager_secret: !Ref SecretNamePrefix
certificates_s3_reference: !Ref CertificatesS3Reference
kafka_endpoint: !Ref KafkaEndpoint
schema_registry_url: !Ref SchemaRegistryUrl
auth_type: !Ref AuthType
FunctionName: !Ref LambdaFunctionName
Handler: "com.amazonaws.athena.connectors.kafka.KafkaCompositeHandler"
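For orientation, a minimal sketch (not part of this commit) of how the schema_registry_url variable set by this template might be read inside the connector. The helper class, the configOptions map (the usual way Athena connectors surface Lambda environment variables), and the validation are illustrative assumptions:

import java.util.Map;

// Illustrative sketch: resolving the schema registry endpoint that the
// template passes to the Lambda as "schema_registry_url".
public final class SchemaRegistryUrlSketch
{
    private SchemaRegistryUrlSketch() {}

    public static String resolveRegistryUrl(Map<String, String> configOptions)
    {
        // The template defaults the parameter to "", since only Avro-formatted
        // topics need a registry.
        String url = configOptions.getOrDefault("schema_registry_url", "");
        if (!url.isEmpty() && !url.startsWith("http")) {
            throw new IllegalArgumentException(
                    "schema_registry_url must follow the syntax http://<endpoint>:<port>");
        }
        return url;
    }
}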
16 changes: 16 additions & 0 deletions athena-kafka/pom.xml
@@ -26,6 +26,16 @@
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.3</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
@@ -202,4 +212,10 @@
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>
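Taken together, the two new dependencies cover both halves of the Avro path: org.apache.avro supplies GenericRecord, and io.confluent:kafka-avro-serializer supplies KafkaAvroDeserializer. A minimal consumer sketch under those assumptions (this class is not part of the commit, and the group id is a placeholder):

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

// Illustrative sketch: a consumer that resolves Avro-encoded values through a
// Confluent-compatible schema registry, as the connector must for Avro topics.
public final class AvroConsumerSketch
{
    private AvroConsumerSketch() {}

    public static KafkaConsumer<String, GenericRecord> buildConsumer(String bootstrapServers, String schemaRegistryUrl)
    {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-sketch");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        // Standard Confluent client property, fed from the new SchemaRegistryUrl
        // template parameter (e.g. http://<endpoint>:<port>).
        props.put("schema.registry.url", schemaRegistryUrl);
        return new KafkaConsumer<>(props);
    }
}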
athena-kafka/src/main/java/com/amazonaws/athena/connectors/kafka/GlueRegistryReader.java
@@ -72,4 +72,9 @@ public <T> T getGlueSchema(String glueRegistryName, String glueSchemaName, Class
GetSchemaVersionResult result = getSchemaVersionResult(glueRegistryName, glueSchemaName);
return objectMapper.readValue(result.getSchemaDefinition(), clazz);
}
public String getGlueSchemaType(String glueRegistryName, String glueSchemaName)
{
GetSchemaVersionResult result = getSchemaVersionResult(glueRegistryName, glueSchemaName);
return result.getDataFormat();
}
}
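A short usage sketch for the new accessor (registry and schema names are placeholders):

GlueRegistryReader registryReader = new GlueRegistryReader();
// Glue reports the schema's data format as a string such as "AVRO" or "JSON",
// so callers compare case-insensitively, as doGetSplits() does below.
String dataFormat = registryReader.getGlueSchemaType("my_registry", "my_schema");
if (dataFormat.equalsIgnoreCase("avro")) {
    // Avro path: the Glue schema name doubles as the Kafka topic name.
}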
athena-kafka/src/main/java/com/amazonaws/athena/connectors/kafka/KafkaConstants.java
@@ -43,6 +43,10 @@ public class KafkaConstants
* This is kafka node details
*/
public static final String ENV_KAFKA_ENDPOINT = "kafka_endpoint";
/**
* This is the schema registry URL
*/
public static final String KAFKA_SCHEMA_REGISTRY_URL = "schema_registry_url";
/**
* This is the type of authentication client has set for the cluster
*/
@@ -64,6 +68,8 @@ public class KafkaConstants

public static final int MAX_RECORDS_IN_SPLIT = 10_000;

public static final String AVRO_DATA_FORMAT = "avro";

private KafkaConstants()
{
}
athena-kafka/src/main/java/com/amazonaws/athena/connectors/kafka/KafkaMetadataHandler.java
@@ -37,6 +37,7 @@
import com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest;
import com.amazonaws.athena.connector.lambda.metadata.ListTablesResponse;
import com.amazonaws.athena.connector.util.PaginatedRequestIterator;
import com.amazonaws.athena.connectors.kafka.dto.AvroTopicSchema;
import com.amazonaws.athena.connectors.kafka.dto.SplitParameters;
import com.amazonaws.athena.connectors.kafka.dto.TopicPartitionPiece;
import com.amazonaws.athena.connectors.kafka.dto.TopicSchema;
@@ -68,6 +69,7 @@
import java.util.stream.Stream;

import static com.amazonaws.athena.connector.lambda.metadata.ListTablesRequest.UNLIMITED_PAGE_SIZE_VALUE;
import static com.amazonaws.athena.connectors.kafka.KafkaConstants.AVRO_DATA_FORMAT;
import static com.amazonaws.athena.connectors.kafka.KafkaConstants.MAX_RECORDS_IN_SPLIT;

public class KafkaMetadataHandler extends MetadataHandler
@@ -256,7 +258,7 @@ private String findGlueSchemaNameIgnoringCasing(String glueRegistryNameIn, Strin
public GetTableResponse doGetTable(BlockAllocator blockAllocator, GetTableRequest getTableRequest) throws Exception
{
LOGGER.info("doGetTable request: {}", getTableRequest);
- Schema tableSchema = null;
+ Schema tableSchema;
try {
tableSchema = getSchema(getTableRequest.getTableName().getSchemaName(), getTableRequest.getTableName().getTableName());
}
@@ -323,9 +325,16 @@ public GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest
// returning a single partition.
String glueRegistryName = request.getTableName().getSchemaName();
String glueSchemaName = request.getTableName().getTableName();
+ String topic;
GlueRegistryReader registryReader = new GlueRegistryReader();
- TopicSchema topicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, TopicSchema.class);
- String topic = topicSchema.getTopicName();
+ if (registryReader.getGlueSchemaType(glueRegistryName, glueSchemaName).equalsIgnoreCase(AVRO_DATA_FORMAT)) {
+ // If the schema type is Avro, the topic name is the Glue schema name.
+ topic = glueSchemaName;
+ }
+ else {
+ TopicSchema topicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, TopicSchema.class);
+ topic = topicSchema.getTopicName();
+ }

LOGGER.info("Retrieved topicName: {}", topic);

@@ -419,29 +428,49 @@ private Schema getSchema(String glueRegistryName, String glueSchemaName) throws

// Get topic schema json from Glue registry as translated to the matching pojo
GlueRegistryReader registryReader = new GlueRegistryReader();
- TopicSchema topicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, TopicSchema.class);
-
- // Creating ArrowType for each fields in the topic schema.
- // Also putting the additional column level information
- // into the metadata in ArrowType field.
- topicSchema.getMessage().getFields().forEach(it -> {
- FieldType fieldType = new FieldType(
- true,
- KafkaUtils.toArrowType(it.getType()),
- null,
- com.google.common.collect.ImmutableMap.of(
- "mapping", it.getMapping(),
- "formatHint", it.getFormatHint(),
- "type", it.getType()
- )
- );
- Field field = new Field(it.getName(), fieldType, null);
- schemaBuilder.addField(field);
- });
-
- // Putting the additional schema level information into the metadata in ArrowType schema.
- schemaBuilder.addMetadata("dataFormat", topicSchema.getMessage().getDataFormat());

+ if (registryReader.getGlueSchemaType(glueRegistryName, glueSchemaName).equalsIgnoreCase(AVRO_DATA_FORMAT)) {
+ AvroTopicSchema avroTopicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, AvroTopicSchema.class);
+ // Create an ArrowType for each field in the topic schema,
+ // and put the additional column-level information
+ // into the metadata of the ArrowType field.
+ avroTopicSchema.getFields().forEach(it -> {
+ FieldType fieldType = new FieldType(
+ true,
+ KafkaUtils.toArrowType(it.getType()),
+ null,
+ com.google.common.collect.ImmutableMap.of(
+ "name", it.getName(),
+ "formatHint", it.getFormatHint(),
+ "type", it.getType()
+ )
+ );
+ Field field = new Field(it.getName(), fieldType, null);
+ schemaBuilder.addField(field);
+ });
+ schemaBuilder.addMetadata("dataFormat", AVRO_DATA_FORMAT);
+ }
+ else {
+ TopicSchema topicSchema = registryReader.getGlueSchema(glueRegistryName, glueSchemaName, TopicSchema.class);
+ // Create an ArrowType for each field in the topic schema,
+ // and put the additional column-level information
+ // into the metadata of the ArrowType field.
+ topicSchema.getMessage().getFields().forEach(it -> {
+ FieldType fieldType = new FieldType(
+ true,
+ KafkaUtils.toArrowType(it.getType()),
+ null,
+ com.google.common.collect.ImmutableMap.of(
+ "mapping", it.getMapping(),
+ "formatHint", it.getFormatHint(),
+ "type", it.getType()
+ )
+ );
+ Field field = new Field(it.getName(), fieldType, null);
+ schemaBuilder.addField(field);
+ });
+ // Put the additional schema-level information into the metadata of the ArrowType schema.
+ schemaBuilder.addMetadata("dataFormat", topicSchema.getMessage().getDataFormat());
+ }
// NOTE: these values are being shoved in here for usage later in the calling context
// of doGetTable() since Java doesn't have tuples.
schemaBuilder.addMetadata("glueRegistryName", glueRegistryName);
(Diffs for the remaining changed files in this commit did not load and are not shown here.)
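Among those unshown files is the new AvroTopicSchema DTO. Judging from how getSchema() consumes it above (getFields(), with per-field getName(), getType(), and getFormatHint()), a plausible shape is the following sketch; the class body is inferred, not copied from the commit:

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

// Inferred sketch of the AvroTopicSchema DTO that getSchema() binds the Glue
// Avro schema definition to. Only the accessors used in the diff are modeled.
@JsonIgnoreProperties(ignoreUnknown = true)
public class AvroTopicSchema
{
    @JsonProperty("fields")
    private List<AvroField> fields;

    public List<AvroField> getFields()
    {
        return fields;
    }

    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class AvroField
    {
        @JsonProperty("name")
        private String name;

        @JsonProperty("type")
        private String type;

        @JsonProperty("formatHint")
        private String formatHint;

        public String getName()
        {
            return name;
        }

        public String getType()
        {
            return type;
        }

        public String getFormatHint()
        {
            return formatHint;
        }
    }
}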
