Reading:
Run and Connect to Kafka on Windows

Run and Connect to Kafka on Windows

Metamug
Run and Connect to Kafka on Windows

Run Zookeeper on Windows

Download the latest ZooKeeper from Apache Mirror. http://apachemirror.wuchna.com/zookeeper/stable/

rename zoo_sample.cfg zoo.cfg

Change the dataDir=/tmp/zookeeper setting in zoo.cfg to dataDir=../logs

Set the Java path and run zkServer from the bin folder

set JAVA="C:\Program Files\Java\jdk1.8.0_144\bin\java.exe"
zkServer

Run Kafka

Download Kafka and go to its bin\windows folder. If the start script doesn't run from cmd, run it in PowerShell instead.

cd kafka\bin\windows
.\kafka-server-start.bat ..\..\config\server.properties

If you see the error "The input line is too long", refer to this question:

https://stackoverflow.com/questions/48834927/the-input-line-is-too-long-when-starting-kafka

Spring Boot Kafka Configuration


    <!-- Inherit build defaults from the Spring Boot starter parent (2.1.9.RELEASE). -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.9.RELEASE</version>
    </parent>

    <!-- No <version> is given on the dependencies below; presumably the parent's
         dependency management supplies them (confirm against the full pom.xml). -->
    <dependencies>

        <!-- Core Spring Boot starter. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Spring for Apache Kafka. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

Connecting to Kafka in Java

/**
 * Spring configuration for the Kafka producer side: declares a
 * {@link ProducerFactory} and the {@link KafkaTemplate} built on top of it.
 */
@Configuration
public class KafkaProducerConfig {

    /** Broker address passed as {@code ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}. */
    private static final String BOOTSTRAP_ADDRESS = "localhost:9092";

    /**
     * Creates the producer factory. Keys are serialized with
     * {@code StringSerializer}, values with {@code JavaSerializer}.
     *
     * @return a factory configured with the bootstrap address and both serializers
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JavaSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Creates the template used to send records.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Serialize Java Object to Kafka

The serializers that ship with Kafka only cover simple types such as strings, numbers and byte arrays, so we need to create a custom serializer for arbitrary Java objects.

/**
 * Kafka {@code Serializer} that encodes values with Java's built-in object
 * serialization ({@link ObjectOutputStream}).
 *
 * @param <T> type of the values handed to {@link #serialize(String, Object)}
 */
public class JavaSerializer<T> implements Serializer<T> {

    /** No configuration is read; the arguments are ignored. */
    @Override
    public void configure(Map<String, ?> map, boolean b) {
        // Nothing to configure.
    }

    /**
     * Writes {@code object} to an in-memory stream and returns the resulting bytes.
     *
     * @param s      topic name; used only in the failure message
     * @param object value to serialize
     * @return the Java-serialized form of {@code object}
     * @throws java.io.UncheckedIOException if the object cannot be written
     *         (for example because it is not serializable); the original
     *         {@link IOException} is kept as the cause
     */
    @Override
    public byte[] serialize(String s, T object) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        // try-with-resources closes the object stream (and with it the byte buffer).
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(object);
            // Flush before reading the buffer so all bytes are present.
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            // Fail loudly instead of handing back an empty payload: an empty
            // record would be indistinguishable from real data downstream.
            throw new java.io.UncheckedIOException(
                    "Failed to serialize value for topic " + s, e);
        }
    }

    /** Holds no resources, so there is nothing to release. */
    @Override
    public void close() {
        // Nothing to close.
    }
}

Creating Producer


/**
 * Sends event payloads to the "eventTopic" Kafka topic.
 *
 * NOTE(review): {@code eventCacheHandler} is used below but is not declared in
 * this class as shown; presumably it is an injected collaborator. Confirm.
 *
 * @param <T> payload type carried by the events
 */
@Repository
public class EventPublisher<T> {

    private final Logger logger = LoggerFactory.getLogger(EventPublisher.class);

    @Autowired
    private KafkaTemplate<String, T> kafkaTemplate;

    /**
     * Sends the event's payload to "eventTopic", then sets the event's status
     * to {@code Status.NEW} through the cache handler.
     *
     * @param event event whose payload is sent and whose id is used for the status update
     */
    public void publish(AggregateEvent<T> event) {
        final T payload = event.getPayload();
        kafkaTemplate.send("eventTopic", payload);
        eventCacheHandler.updateStatus(event.getEventId(), Status.NEW);
    }

}

Creating Consumer


/**
 * Listener for the "eventTopic" Kafka topic, consumer group "group_id".
 *
 * NOTE(review): {@code eventCacheHandler} is used below but is not declared in
 * this class as shown; presumably it is an injected collaborator. Confirm.
 * NOTE(review): no Spring stereotype annotation is visible on this class, so
 * it must be registered as a bean elsewhere for the listener to be picked up.
 * Verify.
 *
 * @param <T> payload type carried by the events
 */
public class EventConsumer<T> {

    private final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    /**
     * Logs the received event and sets its status to {@code Status.PUBLISHED}.
     *
     * @param event event delivered by the listener container
     */
    @KafkaListener(topics = "eventTopic", groupId = "group_id")
    public void consume(AggregateEvent<T> event) {
        // Parameterized logging: the message is only built when INFO is enabled.
        logger.info("$$ -> Consumed Message -> {}", event);
        eventCacheHandler.updateStatus(event.getEventId(), Status.PUBLISHED);
    }

}


Icon For Arrow-up
Comments

Post a comment