Download the latest stable ZooKeeper release from an Apache mirror: http://apachemirror.wuchna.com/zookeeper/stable/
In the conf directory, rename zoo_sample.cfg to zoo.cfg, then change the dataDir=/tmp/zookeeper entry in zoo.cfg to dataDir=../logs.
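After the change, zoo.cfg should look roughly like this (the remaining values are the zoo_sample.cfg defaults):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=../logs
clientPort=2181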
Set the Java path and run zkServer from the bin directory:
set JAVA="C:\Program Files\Java\jdk1.8.0_144\bin\java.exe"
zkServer
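To confirm ZooKeeper is up (it listens on port 2181 by default), you can connect with the CLI client that ships in the same bin folder:

zkCli.cmd -server 127.0.0.1:2181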
Download Kafka and go to its bin\windows folder. If the startup script fails in cmd, try running it in PowerShell.
cd kafka\bin\windows
.\kafka-server-start.bat ..\..\config\server.properties
If you see the error "The input line is too long", refer to this question:
https://stackoverflow.com/questions/48834927/the-input-line-is-too-long-when-starting-kafka
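Once the broker is up, you can pre-create the topic this post publishes to. kafka-topics.bat lives in the same windows folder; on Kafka 2.2+ you can pass --bootstrap-server localhost:9092 instead of --zookeeper:

.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic eventTopic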
Next, create a Spring Boot project with the spring-kafka dependency in pom.xml:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.9.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
Then configure a producer factory and a KafkaTemplate. The value serializer points at the custom JavaSerializer defined in the next section:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    private static final String BOOTSTRAP_ADDRESS = "localhost:9092";

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // JavaSerializer is the custom value serializer defined below
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JavaSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Serialize Java Object to Kafka
The serializers Kafka ships with only cover simple types such as String, Integer, and byte[], so to send an arbitrary Java object we need a custom serializer.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class JavaSerializer<T> implements Serializer<T> {

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, T object) {
        // standard Java serialization; the object must implement java.io.Serializable
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(object);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
            return new byte[0];
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}
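Java serialization only works if the object being written implements java.io.Serializable. The AggregateEvent wrapper used by the publisher and consumer below could look roughly like this (a minimal sketch; only getEventId() and getPayload() are implied by the code, the field names and types are assumptions):

import java.io.Serializable;

// Minimal sketch of the event wrapper; the payload type T must itself be Serializable.
public class AggregateEvent<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String eventId;  // assumed type; could equally be a UUID
    private final T payload;

    public AggregateEvent(String eventId, T payload) {
        this.eventId = eventId;
        this.payload = payload;
    }

    public String getEventId() { return eventId; }

    public T getPayload() { return payload; }
}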
A thin publisher then wraps the KafkaTemplate and records the event status (EventCacheHandler and Status come from the surrounding application):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class EventPublisher<T> {

    private final Logger logger = LoggerFactory.getLogger(EventPublisher.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private EventCacheHandler eventCacheHandler;

    /** @param event the event to publish to eventTopic */
    public void publish(AggregateEvent<T> event) {
        // send the whole event (not just the payload) so the consumer can
        // read the event id back out of the message
        this.kafkaTemplate.send("eventTopic", event);
        eventCacheHandler.updateStatus(event.getEventId(), Status.NEW);
    }
}
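To try the publisher end to end, you could fire an event at startup. This is a hypothetical runner (the application class name, the "order-1" event id, and the String payload are all illustrative; it assumes an EventCacheHandler bean exists so the publisher can be created):

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }

    @Bean
    public CommandLineRunner demo(EventPublisher<String> publisher) {
        // publish one event once the context is up
        return args -> publisher.publish(new AggregateEvent<>("order-1", "hello kafka"));
    }
}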
On the consuming side, a listener picks the event up and marks it as published:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class EventConsumer<T> {
    private final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private EventCacheHandler eventCacheHandler;

    @KafkaListener(topics = "eventTopic", groupId = "group_id")
    public void consume(AggregateEvent<T> event) {
        logger.info("$$ -> Consumed Message -> {}", event);
        eventCacheHandler.updateStatus(event.getEventId(), Status.PUBLISHED);
    }
}
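For the listener to actually receive AggregateEvent objects, the consumer needs a deserializer that mirrors JavaSerializer; spring-kafka does not ship one for plain Java serialization. Below is a minimal sketch of such a counterpart plus the matching consumer configuration (the bootstrap address and group id mirror the values used above; the class names are illustrative):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class JavaDeserializer<T> implements Deserializer<T> {

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // no configuration needed
    }

    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(String topic, byte[] bytes) {
        if (bytes == null) {
            return null;  // Kafka passes null for tombstone records
        }
        // mirror of JavaSerializer: read the object back with standard Java serialization
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new SerializationException("Failed to deserialize message", e);
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JavaDeserializer<>());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}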