Here are some tips, including the required configurations and code samples, to help you implement a Kafka consumer in Java.
Required dependencies
Ensure that you have the following dependencies in your project:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.strimzi</groupId>
    <artifactId>kafka-oauth-client</artifactId>
    <version>0.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.5.1</version>
</dependency>
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.1.13</version>
</dependency>
Configuration
Add the following configuration to your application.yml file (or the equivalent flattened keys in application.properties):
spring:
  kafka:
    bootstrap-servers: localhost:9092
    security.protocol: "PLAINTEXT"
    consumer:
      # Consumers read records, so they are configured with deserializer classes.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # The listener parses the JSON payload itself, so the value is read as a plain string.
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        max:
          partition:
            fetch:
              bytes: ${KAFKA_MESSAGE_MAX_BYTES:52428800}
kafka:
  consumer:
    group-id:
      ...
    threads:
      ...
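If you build the consumer with the plain kafka-clients API instead of Spring Boot auto-configuration, the settings above map onto raw Kafka consumer property names. The following is a minimal sketch of that mapping, not a definitive implementation: the class ConsumerProps, its build method, and the groupId parameter are hypothetical names introduced for illustration, and the env map stands in for System.getenv(). It assumes string keys and values, and it mimics the ${KAFKA_MESSAGE_MAX_BYTES:52428800} placeholder by falling back to 52428800 bytes (50 MiB) when the variable is not set.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Spring or kafka-clients.
final class ConsumerProps {

    // 52428800 bytes = 50 MiB, the fallback value used in the YAML placeholder.
    static final String DEFAULT_MAX_FETCH_BYTES = "52428800";

    // Builds raw Kafka consumer properties corresponding to the YAML keys.
    // 'env' stands in for System.getenv(); 'groupId' is supplied by the caller
    // because the group id is not specified in the configuration.
    static Properties build(Map<String, String> env, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("security.protocol", "PLAINTEXT");
        // Consumers use deserializers; both key and value are read as strings here.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Use the environment variable when present, otherwise the default.
        props.setProperty("max.partition.fetch.bytes",
                env.getOrDefault("KAFKA_MESSAGE_MAX_BYTES", DEFAULT_MAX_FETCH_BYTES));
        props.setProperty("group.id", groupId);
        return props;
    }
}
```

The resulting Properties object can be passed to the KafkaConsumer constructor, for example with ConsumerProps.build(System.getenv(), "my-group"), where "my-group" is a placeholder group id.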
Code sample for a Kafka Listener
Here’s an example of a Kafka listener method:
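@KafkaListener(topics = "TOPIC_NAME_HERE")
public void listen(ConsumerRecord<String, String> record) throws JsonProcessingException {
    // record.value() holds the raw JSON payload; objectMapper is an injected Jackson ObjectMapper.
    SomeDTO request = objectMapper.readValue(record.value(), SomeDTO.class);
    // Process the request here.
}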