
Commit 4deb3af

Fix the NullPointerException thrown by the Kafka connector when the entire record is null. (#2575)
Co-authored-by: ejeffrli <[email protected]>
1 parent d6e5530 · commit 4deb3af

2 files changed (+29, -11 lines)

athena-kafka/src/main/java/com/amazonaws/athena/connectors/kafka/KafkaRecordHandler.java (+12 lines)
```diff
@@ -262,6 +262,10 @@ private void execute(
             ConsumerRecord<String, TopicResultSet> record)
     {
         final boolean[] isExecuted = {false};
+        if (record == null || record.value() == null) {
+            LOGGER.warn("[NullRecord] {} Received a null record or record value, offset: {}", splitParameters, record != null ? record.offset() : "unknown");
+            return;
+        }
         spiller.writeRows((Block block, int rowNum) -> {
             for (KafkaField field : record.value().getFields()) {
                 boolean isMatched = block.offerValue(field.getName(), rowNum, field.getValue());
@@ -345,6 +349,10 @@ private void avroExecute(
             SplitParameters splitParameters,
             ConsumerRecord<String, GenericRecord> record)
     {
+        if (record == null || record.value() == null) {
+            LOGGER.warn("[NullRecord] {} Received a null record or record value, offset: {}", splitParameters, record != null ? record.offset() : "unknown");
+            return; // Skip processing this record
+        }
         spiller.writeRows((Block block, int rowNum) -> {
             for (Schema.Field next : record.value().getSchema().getFields()) {
                 boolean isMatched = block.offerValue(next.name(), rowNum, record.value().get(next.name()));
@@ -427,6 +435,10 @@ private void protobufExecute(
             SplitParameters splitParameters,
             ConsumerRecord<String, DynamicMessage> record)
     {
+        if (record == null || record.value() == null) {
+            LOGGER.warn("[NullRecord] {} Received a null record or record value, offset: {}", splitParameters, record != null ? record.offset() : "unknown");
+            return; // Skip processing this record
+        }
         spiller.writeRows((Block block, int rowNum) -> {
             for (Descriptors.FieldDescriptor next : record.value().getAllFields().keySet()) {
                 boolean isMatched = block.offerValue(next.getName(), rowNum, record.value().getField(next));
```
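
The guard is identical in all three deserializer paths: log a warning with the split parameters and the record offset (or "unknown" when the whole record is null), then return without spilling anything. For background, Kafka legitimately delivers records whose value is null, for example tombstone records written to mark deletions under log compaction, so dereferencing record.value() unconditionally is what triggered the NullPointerException. Below is a minimal standalone sketch of the failure mode and the guard, using the plain kafka-clients API; the class name is hypothetical, and the topic/key values mirror the ones in the test diff:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class NullRecordGuardSketch {
    public static void main(String[] args) {
        // A tombstone-style record: Kafka represents deletions under log
        // compaction as records whose value is null.
        ConsumerRecord<String, String> tombstone =
                new ConsumerRecord<>("myTopic", 0, 2L, "k3", null);

        // Without a guard, dereferencing the value throws the NPE fixed here:
        // tombstone.value().length();  // NullPointerException

        // The commit's guard logs and skips such records instead:
        if (tombstone.value() == null) {
            System.out.printf("[NullRecord] skipping offset %d%n", tombstone.offset());
            return;
        }
        System.out.println(tombstone.value());
    }
}
```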

athena-kafka/src/test/java/com/amazonaws/athena/connectors/kafka/KafkaRecordHandlerTest.java (+17, -11 lines)
```diff
@@ -122,21 +122,27 @@ public void setUp() throws Exception {
         MockitoAnnotations.initMocks(this);

         consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        ConsumerRecord<String, TopicResultSet> record1 = createConsumerRecord("myTopic", 0, "k1", createTopicResultSet("myTopic"));
-        ConsumerRecord<String, TopicResultSet> record2 = createConsumerRecord("myTopic", 0, "k2", createTopicResultSet("myTopic"));
+        ConsumerRecord<String, TopicResultSet> record1 = createConsumerRecord("myTopic", 0, 0, "k1", createTopicResultSet("myTopic"));
+        ConsumerRecord<String, TopicResultSet> record2 = createConsumerRecord("myTopic", 0, 1, "k2", createTopicResultSet("myTopic"));
+        ConsumerRecord<String, TopicResultSet> nullValueRecord = createConsumerRecord("myTopic", 0, 2, "k3", null);
         consumer.schedulePollTask(() -> {
             consumer.addRecord(record1);
             consumer.addRecord(record2);
+            consumer.addRecord(nullValueRecord);
         });
         avroConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        ConsumerRecord<String, GenericRecord> avroRecord = createAvroConsumerRecord("greetings", 0, "k1", createGenericRecord("greetings"));
+        ConsumerRecord<String, GenericRecord> avroRecord = createAvroConsumerRecord("greetings", 0, 0, "k1", createGenericRecord("greetings"));
+        ConsumerRecord<String, GenericRecord> avroNullValueRecord = createAvroConsumerRecord("greetings", 0, 1, "k2", null);
         avroConsumer.schedulePollTask(() -> {
             avroConsumer.addRecord(avroRecord);
+            avroConsumer.addRecord(avroNullValueRecord);
         });
         protobufConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        ConsumerRecord<String, DynamicMessage> protobufRecord = createProtobufConsumerRecord("protobuftest", 0, "k1", createDynamicRecord());
+        ConsumerRecord<String, DynamicMessage> protobufRecord = createProtobufConsumerRecord("protobuftest", 0, 0, "k1", createDynamicRecord());
+        ConsumerRecord<String, DynamicMessage> protobufNullValueRecord = createProtobufConsumerRecord("protobuftest", 0, 1, "k2", null);
         protobufConsumer.schedulePollTask(() -> {
             protobufConsumer.addRecord(protobufRecord);
+            protobufConsumer.addRecord(protobufNullValueRecord);
         });
         spillConfig = SpillConfig.newBuilder()
                 .withEncryptionKey(encryptionKey)
@@ -173,7 +179,7 @@ public void testForConsumeDataFromTopic() throws Exception {
         offsets.put(new TopicPartition("myTopic", 0), 1L);
         consumer.updateEndOffsets(offsets);

-        SplitParameters splitParameters = new SplitParameters("myTopic", 0, 0, 1);
+        SplitParameters splitParameters = new SplitParameters("myTopic", 0, 0, 2);
         Schema schema = createSchema(createCsvTopicSchema());

         mockedKafkaUtils.when(() -> KafkaUtils.getKafkaConsumer(schema, com.google.common.collect.ImmutableMap.of())).thenReturn(consumer);
@@ -351,16 +357,16 @@ private ReadRecordsRequest createReadRecordsRequest(Schema schema) {
                 0);
     }

-    private ConsumerRecord<String, TopicResultSet> createConsumerRecord(String topic, int partition, String key, TopicResultSet data) throws Exception {
-        return new ConsumerRecord<>(topic, partition, 0, key, data);
+    private ConsumerRecord<String, TopicResultSet> createConsumerRecord(String topic, int partition, long offset, String key, TopicResultSet data) throws Exception {
+        return new ConsumerRecord<>(topic, partition, offset, key, data);
     }

-    private ConsumerRecord<String, GenericRecord> createAvroConsumerRecord(String topic, int partition, String key, GenericRecord data) throws Exception {
-        return new ConsumerRecord<>(topic, partition, 0, key, data);
+    private ConsumerRecord<String, GenericRecord> createAvroConsumerRecord(String topic, int partition, long offset, String key, GenericRecord data) throws Exception {
+        return new ConsumerRecord<>(topic, partition, offset, key, data);
     }

-    private ConsumerRecord<String, DynamicMessage> createProtobufConsumerRecord(String topic, int partition, String key, DynamicMessage data) throws Exception {
-        return new ConsumerRecord<>(topic, partition, 0, key, data);
+    private ConsumerRecord<String, DynamicMessage> createProtobufConsumerRecord(String topic, int partition, long offset, String key, DynamicMessage data) throws Exception {
+        return new ConsumerRecord<>(topic, partition, offset, key, data);
     }

     private TopicResultSet createTopicResultSet(String topic) {
```
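
The setUp() changes queue one extra null-valued record per consumer and give every mock record a distinct offset, which the new long offset parameter on the create*ConsumerRecord helpers makes possible; testForConsumeDataFromTopic then widens its SplitParameters range from 1 to 2 so the split covers the null-value record. Below is a minimal standalone sketch of the same MockConsumer pattern, assuming plain String values in place of the connector's TopicResultSet and a hypothetical class name:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class NullValueRecordTestSketch {
    public static void main(String[] args) {
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("myTopic", 0);
        consumer.assign(List.of(tp));

        // MockConsumer needs beginning offsets before it can serve a poll.
        Map<TopicPartition, Long> beginning = new HashMap<>();
        beginning.put(tp, 0L);
        consumer.updateBeginningOffsets(beginning);

        // Queue one normal record and one null-valued record at distinct
        // offsets, mirroring the setUp() changes above.
        consumer.schedulePollTask(() -> {
            consumer.addRecord(new ConsumerRecord<>("myTopic", 0, 0L, "k1", "hello"));
            consumer.addRecord(new ConsumerRecord<>("myTopic", 0, 1L, "k2", null));
        });

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
        for (ConsumerRecord<String, String> record : records) {
            if (record.value() == null) {
                // Analogous to the connector's early return for null values.
                System.out.printf("skipping null value at offset %d%n", record.offset());
                continue;
            }
            System.out.println(record.value());
        }
    }
}
```

Giving each mock record its own offset also lets the split's start and end offsets (0 and 2 in the updated test) address each record unambiguously.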
