Run and Connect to Kafka on Windows

Run Zookeeper on Windows

Download the latest ZooKeeper from Apache Mirror. http://apachemirror.wuchna.com/zookeeper/stable/

rename zoo_sample.cfg zoo.cfg

Change The dataDir=/tmp/zookeeper field in zoo.cfg to dataDir=../logs

Set the java path and run zkServer from bin

set JAVA="C:\Program Files\Java\jdk1.8.0_144\bin\java.exe"
zkServer

Run Kafka

Download Kafka and go to its windows folder. Run kafka in powershell if it doesn't open in cmd.

cd kafka\bin\windows
.\kafka-server-start.bat ..\..\config\server.properties

If you see the error, line is too long refer to this question.

https://stackoverflow.com/questions/48834927/the-input-line-is-too-long-when-starting-kafka

Spring Boot Kafka Configuration


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.9.RELEASE</version>
    </parent>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

Connecting to Kafka in Java

@Configuration
public class KafkaProducerConfig {

    private final String BOOTSTRAP_ADDRESS = "localhost:9092";

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                BOOTSTRAP_ADDRESS);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JavaSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Serialize Java Object to Kafka

The provided serializers only work for primitive types. So we need to create a custom serializer.

public class JavaSerializer<T> implements Serializer<T> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String s, T object) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream out = null;
        try {
            out = new ObjectOutputStream(bos);
            out.writeObject(object);
            out.flush();
            byte[] yourBytes = bos.toByteArray();
            return yourBytes;
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                bos.close();
            } catch (IOException ex) {
                // ignore close exception
            }
        }
        return new byte[0];
    }

    @Override
    public void close() {

    }
}

Creating Producer


@Repository
public class EventPublisher<T> {

    private final Logger logger = LoggerFactory.getLogger(EventPublisher.class);

    @Autowired
    private KafkaTemplate<String, T> kafkaTemplate;

    /**
     *
     * @param event
     */
    public void publish(AggregateEvent<T> event) {
        this.kafkaTemplate.send("eventTopic", event.getPayload());
        eventCacheHandler.updateStatus(event.getEventId(), Status.NEW);
    }

}

Creating Consumer


public class EventConsumer<T> {

    private final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @KafkaListener(topics = "eventTopic", groupId = "group_id")
    public void consume(AggregateEvent<T> event){
        logger.info(String.format("$$ -> Consumed Message -> %s",message));
        eventCacheHandler.updateStatus(event.getEventId(), Status.PUBLISHED);
    }

}