Skip to content

Commit b2dbb58

Browse files
authored
fixes #107 and adds support for multi-level nesting of arrays (#114)
1 parent 2e36deb commit b2dbb58

File tree

2 files changed

+28
-7
lines changed

2 files changed

+28
-7
lines changed

src/main/java/at/grahsl/kafka/connect/mongodb/converter/AvroJsonSchemafulRecordConverter.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.bson.BsonArray;
2929
import org.bson.BsonDocument;
3030
import org.bson.BsonNull;
31+
import org.bson.BsonValue;
3132
import org.slf4j.Logger;
3233
import org.slf4j.LoggerFactory;
3334

@@ -119,7 +120,7 @@ private void processField(BsonDocument doc, Struct struct, Field field) {
119120
handleStructField(doc, (Struct)struct.get(field), field);
120121
break;
121122
case ARRAY:
122-
handleArrayField(doc, (List)struct.get(field), field);
123+
doc.put(field.name(),handleArrayField((List)struct.get(field), field));
123124
break;
124125
case MAP:
125126
handleMapField(doc, (Map)struct.get(field), field);
@@ -157,31 +158,34 @@ private void handleMapField(BsonDocument doc, Map m, Field field) {
157158
final List list = (List)m.get(key);
158159
logger.trace("adding array values to {} of type valueSchema={} value='{}'",
159160
elementField.name(), elementField.schema().valueSchema(), list);
160-
handleArrayField(bd, list, elementField);
161+
bd.put(key, handleArrayField(list, elementField));
161162
} else {
162163
bd.put(key, toBsonDoc(field.schema().valueSchema(), m.get(key)));
163164
}
164165
}
165166
doc.put(field.name(), bd);
166167
}
167168

168-
private void handleArrayField(BsonDocument doc, List list, Field field) {
169+
private BsonValue handleArrayField(List list, Field field) {
169170
logger.trace("handling complex type 'array' of types '{}'",
170171
field.schema().valueSchema().type());
171172
if(list==null) {
172173
logger.trace("no array -> adding null");
173-
doc.put(field.name(), BsonNull.VALUE);
174-
return;
174+
return BsonNull.VALUE;
175175
}
176176
BsonArray array = new BsonArray();
177+
Schema.Type st = field.schema().valueSchema().type();
177178
for(Object element : list) {
178-
if(field.schema().valueSchema().type().isPrimitive()) {
179+
if(st.isPrimitive()) {
179180
array.add(getConverter(field.schema().valueSchema()).toBson(element,field.schema()));
181+
} else if(st == Schema.Type.ARRAY) {
182+
Field elementField = new Field("first", 0, field.schema().valueSchema());
183+
array.add(handleArrayField((List)element,elementField));
180184
} else {
181185
array.add(toBsonDoc(field.schema().valueSchema(), element));
182186
}
183187
}
184-
doc.put(field.name(), array);
188+
return array;
185189
}
186190

187191
private void handleStructField(BsonDocument doc, Struct struct, Field field) {

src/test/java/at/grahsl/kafka/connect/mongodb/converter/RecordConverterTest.java

+17
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public static void initializeTestData() {
6161
"\"mySubDoc1\":{\"myString\":\"hello json\"}," +
6262
"\"myArray1\":[\"str_1\",\"str_2\",\"...\",\"str_N\"]," +
6363
"\"myArray2\":[{\"k\":\"a\",\"v\":1},{\"k\":\"b\",\"v\":2},{\"k\":\"c\",\"v\":3}]," +
64+
"\"myArray3\":[[[1],[],[2,3],[4,5,6]]]," +
6465
"\"mySubDoc2\":{\"k1\":9,\"k2\":8,\"k3\":7}," +
6566
"\"myMapOfStrings\":{\"k1\": [ \"v1-a\", \"v1-b\" ],\"k2\": [ \"v2-a\" ],\"k3\":[ \"v3-a\", \"v3-b\", \"v3-c\" ]}," +
6667
"\"myMapOfInts\":{\"k1\": [ 11, 12 ],\"k2\": [ 21 ],\"k3\":[ 31, 32, 33 ]}," +
@@ -85,6 +86,7 @@ public static void initializeTestData() {
8586
.field("v",Schema.INT32_SCHEMA)
8687
.build())
8788
)
89+
.field("myArray3", SchemaBuilder.array(SchemaBuilder.array(SchemaBuilder.array(Schema.INT32_SCHEMA))))
8890
.field("mySubDoc2", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build())
8991
.field( "myMapOfStrings", SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.array(Schema.STRING_SCHEMA).build()).build())
9092
.field( "myMapOfInts", SchemaBuilder.map(Schema.STRING_SCHEMA, SchemaBuilder.array(Schema.INT32_SCHEMA).build()).build())
@@ -113,6 +115,9 @@ public static void initializeTestData() {
113115
.put("k","c").put("v",3)
114116
)
115117
)
118+
.put("myArray3", Arrays.asList(
119+
Arrays.asList(Arrays.asList(1),Arrays.asList(),Arrays.asList(2,3),Arrays.asList(4,5,6))
120+
))
116121
.put("mySubDoc2",new HashMap<String,Integer>(){{ put("k1",9); put("k2",8); put("k3",7);}})
117122
.put("myMapOfStrings", new HashMap<String, List<String>>(){{
118123
put("k1", Arrays.asList("v1-a", "v1-b"));
@@ -152,6 +157,10 @@ public static void initializeTestData() {
152157
new HashMap<Object,Object>(){{put("k","c");put("v",3);}}
153158
)
154159
);
160+
OBJ_MAP_1.put("myArray3",Arrays.asList(
161+
Arrays.asList(Arrays.asList(1), Arrays.asList(), Arrays.asList(2,3), Arrays.asList(4,5,6))
162+
)
163+
);
155164
OBJ_MAP_1.put("mySubDoc2",new HashMap<String,Integer>(){{ put("k1",9); put("k2",8); put("k3",7);}});
156165
OBJ_MAP_1.put("myMapOfStrings",new HashMap<String,List<String>>(){{
157166
put("k1",Arrays.asList("v1-a", "v1-b"));
@@ -195,6 +204,14 @@ public static void initializeTestData() {
195204
new BsonDocument("k", new BsonString("b")).append("v", new BsonInt32(2)),
196205
new BsonDocument("k", new BsonString("c")).append("v", new BsonInt32(3))))
197206
)
207+
.append("myArray3", new BsonArray(Arrays.asList(
208+
new BsonArray(Arrays.asList(
209+
new BsonArray(Arrays.asList(new BsonInt32(1))),
210+
new BsonArray(),
211+
new BsonArray(Arrays.asList(new BsonInt32(2),new BsonInt32(3))),
212+
new BsonArray(Arrays.asList(new BsonInt32(4),new BsonInt32(5),new BsonInt32(6)))
213+
))))
214+
)
198215
.append("mySubDoc2", new BsonDocument("k1", new BsonInt32(9))
199216
.append("k2", new BsonInt32(8))
200217
.append("k3", new BsonInt32(7))

0 commit comments

Comments
 (0)