-
Notifications
You must be signed in to change notification settings - Fork 220
/
Copy pathAvroProducer.java
35 lines (28 loc) · 1.14 KB
/
AvroProducer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import java.util.*;
import org.apache.kafka.clients.producer.*;
public class AvroProducer {
public static void main(String[] args) throws Exception{
String topicName = "AvroClicks";
String msg;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
Producer<String, ClickRecord> producer = new KafkaProducer <>(props);
ClickRecord cr = new ClickRecord();
try{
cr.setSessionId("10001");
cr.setChannel("HomePage");
cr.setIp("192.168.0.1");
producer.send(new ProducerRecord<String, ClickRecord>(topicName,cr.getSessionId().toString(),cr)).get();
System.out.println("Complete");
}
catch(Exception ex){
ex.printStackTrace(System.out);
}
finally{
producer.close();
}
}
}