-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathsr.js
126 lines (110 loc) · 3.44 KB
/
sr.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
// Note: the @confluentinc/schemaregistry package must be installed separately to run this example,
// as it isn't a dependency of confluent-kafka-javascript.
const { SchemaRegistryClient, SerdeType, AvroSerializer, AvroDeserializer} = require('@confluentinc/schemaregistry');
// Schema Registry client; it is handed to the Avro serializer/deserializer and
// used directly to register the example schemas. Replace '<fill>' with a real URL.
const registryConfig = { baseURLs: ['<fill>'] };
const registry = new SchemaRegistryClient(registryConfig);

// Cluster connection settings (KafkaJS-style keys). Every '<fill>' is a
// placeholder that must be replaced before the example can run.
const connectionConfig = {
  brokers: ['<fill>'],
  ssl: true,
  sasl: {
    mechanism: 'plain',
    username: '<fill>',
    password: '<fill>',
  },
};
const kafka = new Kafka({ kafkaJS: connectionConfig });

// `consumer` and `producer` are declared with `let` because run() assigns null
// to each one after disconnecting it, and the top-level error handler only
// disconnects the ones that are still non-null.
const consumerConfig = {
  groupId: "test-group",
  fromBeginning: true,
};
let consumer = kafka.consumer({ kafkaJS: consumerConfig });
let producer = kafka.producer();
// Avro record `test.B`: a single int field named `id`.
// Declared first because schemaA refers to it by its full name.
const schemaB = {
  type: 'record',
  namespace: 'test',
  name: 'B',
  fields: [
    { name: 'id', type: 'int' },
  ],
};

// Avro record `test.A`: an int `id` plus a field `b` whose type is the
// full name of schemaB ('test.B').
const schemaA = {
  type: 'record',
  namespace: 'test',
  name: 'A',
  fields: [
    { name: 'id', type: 'int' },
    { name: 'b', type: 'test.B' },
  ],
};
// Topic the example produces to and consumes from.
const topicName = 'test-topic';
// Subject under which schemaA is registered: the topic name plus a "-value" suffix.
const subjectName = `${topicName}-value`;
/**
 * Runs the example end to end:
 *   1. registers schemaB under the subject 'avro-b',
 *   2. registers schemaA under `subjectName` with a reference to that schemaB version,
 *   3. produces one message to `topicName` serialized with an AvroSerializer,
 *   4. consumes from `topicName`, decodes each message with an AvroDeserializer
 *      and logs it, then disconnects once a message has been handled.
 *
 * Uses the module-level `registry`, `producer` and `consumer`. `producer` and
 * `consumer` are set to null after a successful disconnect so that the
 * top-level error handler does not try to disconnect them again.
 *
 * @returns {Promise<void>} Resolves once a message has been consumed and the
 *   consumer has been disconnected.
 * @throws Rejects with any error raised by the registry, producer or consumer
 *   calls, or with the error raised while decoding/logging a consumed message.
 */
const run = async () => {
  const referencedSubject = 'avro-b';

  // Register schemaB.
  await registry.register(referencedSubject, {
    schemaType: 'AVRO',
    schema: JSON.stringify(schemaB),
  });
  // The reference below needs the version under which schemaB is registered.
  const { version } = await registry.getLatestSchemaMetadata(referencedSubject);

  // Register schemaA, which references schemaB.
  await registry.register(subjectName, {
    schemaType: 'AVRO',
    schema: JSON.stringify(schemaA),
    references: [
      {
        name: 'test.B',
        subject: referencedSubject,
        version,
      },
    ],
  });

  // Create an Avro serializer.
  const ser = new AvroSerializer(registry, SerdeType.VALUE, { useLatestVersion: true });

  // Produce a message with schemaA.
  await producer.connect();
  const outgoingMessage = {
    key: 'key',
    value: await ser.serialize(topicName, { id: 1, b: { id: 2 } }),
  };
  await producer.send({
    topic: topicName,
    messages: [outgoingMessage],
  });
  console.log("Producer sent its message.");
  await producer.disconnect();
  producer = null;

  // Create an Avro deserializer.
  const deser = new AvroDeserializer(registry, SerdeType.VALUE, {});

  await consumer.connect();
  await consumer.subscribe({ topic: topicName });

  let messageRcvd = false;
  // Holds `{ error }` when handling a message failed. Wrapped in an object so
  // that even a falsy thrown value is detected.
  let handlerFailure = null;
  await consumer.run({
    eachMessage: async ({ message }) => {
      try {
        const decodedMessage = {
          ...message,
          value: await deser.deserialize(topicName, message.value),
        };
        console.log(`Consumer received message.\nBefore decoding: ${JSON.stringify(message)}\nAfter decoding: ${JSON.stringify(decodedMessage)}`);
        messageRcvd = true;
      } catch (e) {
        // Without this, a failure here would leave messageRcvd false and the
        // wait loop below could spin forever. Record it for the loop, and
        // rethrow so the consumer still sees the handler as failed.
        handlerFailure = { error: e };
        throw e;
      }
    },
  });

  // Wait around until we get a message (or a handler failure), and then disconnect.
  while (!messageRcvd) {
    if (handlerFailure) {
      throw handlerFailure.error;
    }
    await new Promise((resolve) => setTimeout(resolve, 100));
  }
  await consumer.disconnect();
  consumer = null;
};
// Entry point: run the example. On failure, log the error, disconnect whichever
// clients are still non-null (run() nulls them after a clean disconnect), and
// exit with status 1.
run().catch(async (e) => {
  console.error(e);
  // Disconnect each remaining client independently, consumer first, so that a
  // failing disconnect neither skips the other client nor prevents
  // process.exit(1) from being reached.
  for (const client of [consumer, producer]) {
    if (!client) {
      continue;
    }
    try {
      await client.disconnect();
    } catch (disconnectError) {
      console.error(disconnectError);
    }
  }
  process.exit(1);
});