 
Download the latest stable ZooKeeper release from an Apache mirror: http://apachemirror.wuchna.com/zookeeper/stable/
rename zoo_sample.cfg zoo.cfg
Change the dataDir=/tmp/zookeeper field in zoo.cfg to dataDir=../logs
Set the Java path and run zkServer from the bin folder:
set JAVA="C:\Program Files\Java\jdk1.8.0_144\bin\java.exe"
zkServer
Download Kafka and go to its windows folder. Run Kafka in PowerShell if it doesn't open in cmd.
cd kafka\bin\windows
.\kafka-server-start.bat ..\..\config\server.properties
If you see the error "The input line is too long", refer to this question:
https://stackoverflow.com/questions/48834927/the-input-line-is-too-long-when-starting-kafka
    <!-- Inherit from the Spring Boot 2.1.9.RELEASE parent POM. The dependencies
         declared below carry no <version> of their own; presumably their
         versions are managed by this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.9.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

/**
 * Spring configuration for the Kafka producer side.
 *
 * <p>Declares a {@link ProducerFactory} that connects to {@link #BOOTSTRAP_ADDRESS},
 * serializes keys with {@code StringSerializer} and values with {@code JavaSerializer},
 * and a {@link KafkaTemplate} built on that factory.
 */
@Configuration
public class KafkaProducerConfig {

    /** Address ({@code host:port}) of the Kafka broker the producer bootstraps from. */
    private static final String BOOTSTRAP_ADDRESS = "localhost:9092";

    /**
     * Builds the producer factory used for every outgoing record.
     *
     * @return a factory configured with the bootstrap address and the key/value serializers
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JavaSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Exposes a {@link KafkaTemplate} backed by {@link #producerFactory()}.
     *
     * @return the template used to send records with {@code String} keys
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Serialize Java Object to Kafka
The serializers that ship with Kafka only handle simple types such as strings, byte arrays and numbers, so we need to create a custom serializer for arbitrary Java objects.
/**
 * Kafka {@link Serializer} that turns a value into bytes using standard Java
 * object serialization ({@link ObjectOutputStream#writeObject(Object)}).
 *
 * @param <T> type of the values to serialize
 */
public class JavaSerializer<T> implements Serializer<T> {

    /** No configuration is needed; the arguments are ignored. */
    @Override
    public void configure(Map<String, ?> map, boolean b) {
    }

    /**
     * Serializes {@code object} with Java object serialization.
     *
     * @param s      topic the record is being sent to (used only in the error message)
     * @param object value to serialize
     * @return the serialized bytes
     * @throws java.io.UncheckedIOException if the value cannot be written, e.g. because it
     *         (or something it references) is not {@code Serializable}
     */
    @Override
    public byte[] serialize(String s, T object) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // try-with-resources closes the object stream (and with it the buffer) on every path.
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(object);
            // Push any bytes still held by the object stream into the buffer before reading it.
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            // Fail loudly: returning an empty array here would publish an empty record
            // and hide the fact that the value was never serialized.
            throw new java.io.UncheckedIOException(
                    "Failed to serialize value for topic " + s, e);
        }
    }

    /** Nothing to release; this serializer holds no resources between calls. */
    @Override
    public void close() {
    }
}
/**
 * Publishes the payload of an {@link AggregateEvent} to the {@code eventTopic}
 * Kafka topic and then records the event's status through {@code eventCacheHandler}.
 *
 * @param <T> payload type carried by the published events
 */
@Repository
public class EventPublisher<T> {
    private static final Logger logger = LoggerFactory.getLogger(EventPublisher.class);

    /** Topic every event payload is sent to. */
    private static final String TOPIC = "eventTopic";

    @Autowired
    private KafkaTemplate<String, T> kafkaTemplate;

    // NOTE(review): eventCacheHandler is used in publish() but is not declared in the
    // visible part of this class - confirm where it is defined/injected.

    /**
     * Sends the event's payload to Kafka and marks the event as {@code Status.NEW}.
     *
     * <p>The send is asynchronous; a failure is logged from the returned future's
     * callback. The status update runs right after the send is issued and does not
     * wait for that result.
     *
     * @param event event whose payload is published and whose id is used for the status update
     */
    public void publish(AggregateEvent<T> event) {
        this.kafkaTemplate.send(TOPIC, event.getPayload())
                .addCallback(
                        result -> { },
                        ex -> logger.error("Failed to publish event {} to topic {}",
                                event.getEventId(), TOPIC, ex));
        eventCacheHandler.updateStatus(event.getEventId(), Status.NEW);
    }
}
/**
 * Listens on the {@code eventTopic} Kafka topic (consumer group {@code group_id}),
 * logs each received event and records its status through {@code eventCacheHandler}.
 *
 * @param <T> payload type carried by the consumed events
 */
public class EventConsumer<T> {
    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    // NOTE(review): eventCacheHandler is used in consume() but is not declared in the
    // visible part of this class - confirm where it is defined/injected.

    /**
     * Handles one record from {@code eventTopic}: logs it and marks the event
     * as {@code Status.PUBLISHED}.
     *
     * @param event the received event; its id is used for the status update
     */
    @KafkaListener(topics = "eventTopic", groupId = "group_id")
    public void consume(AggregateEvent<T> event) {
        logger.info("$$ -> Consumed Message -> {}", event);
        eventCacheHandler.updateStatus(event.getEventId(), Status.PUBLISHED);
    }
}