Spring/Kafka

[Kafka] Kafka 메시지 전송, 소비 실습

챛채 2025. 4. 5. 02:37

이번 글에서는 Kafka를 직접 구성하고 실습해서 메시지를 어떻게 발행하고, 소비하고, 어떤 흐름으로 구성되는지 알아보려고 한다! 

 

1. Kafka 환경 설정

설치는 도커 컴포즈를 사용하여 kafka 컨테이너를 생성하였다.

version: '3.8'
services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    platform: linux/amd64
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ALLOW_ANONYMOUS_LOGIN: "yes"

  kafka:
    image: bitnami/kafka:3.6
    platform: linux/amd64
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    platform: linux/amd64
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_READONLY: "false"

 

2. Kafka Producer 만들기

1. 구조 요약

- 'ProoducerService' -> Kafka로 메시지 전송

- 'ProducerController' -> 메시지 전송 API 제공

- 'ProducerApplicationKafkaConfig' -> Kafka 설정

 

2. Kafka 설정 (application.properties)

spring.application.name=producer
server.port=8090

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  • bootstrap-servers: Kafka 서버의 주소
  • key/value-serializer: 메시지를 Kafka에 보내기 전 직렬화 방식 (여기서는 String)

 

3. Kafka 설정 클래스 (ProducerApplicationKafkaConfig.java)

@Configuration
public class ProducerApplicationKafkaConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
  • Kafka 메시지를 보내기 위해 KafkaTemplate을 사용한다.
  • 이 설정은 ProducerFactory를 만들어서 KafkaTemlplate에 주입하는 방식이다.
  • 메시지를 String으로 전송하기 위해 StringSerializer를 사용했다.

4. 메시지 전송 로직 (ProducerService.java)

@Service
@RequiredArgsConstructor
public class ProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic , String key, String message) {
        for (int i = 0; i < 10; i++) {
            kafkaTemplate.send(topic, key, message + " " + i);
        }
    }
}
  • KafkaTemplate.send(topic, key, message)를 통해 메시지를 Kafka에 전송한다.
  • 반복문으로 10개의 메시지를 전송함으로써 Kafka의 메시지 처리 확인을 쉽게 할 수 있다.
  • key를 사용하면 Kafka 내부에서 같은 파티션에 할당되어 순서 유지도 가능

5. 메시지 전송 API(ProducerControlelr.java)

@RestController
@RequiredArgsConstructor
public class ProducerController {

    private final ProducerService producerService;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("topic") String topic,
                              @RequestParam("key") String key,
                              @RequestParam("message") String message) {
        producerService.sendMessage(topic, key, message);
        return "Message sent to Kafka topic";
    }
}

 

  • 테스트 예시
GET http://localhost:8090/send?topic=topic1&key=user1&message=Hello Kafka
  • 이 API 호출하면 지정한 topic, key, message로 메시지를 Kafka에 전송한다.

3. Kafka Consumer 만들기

1. Kafka Consumer 설정 (application.properties)

spring.application.name=consumer
server.port=8091

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • Kafka 브로커 주소 : localhost:9092
  • Key/Value 역직렬화 : Kafka가 메시지를 문자열로 읽을 수 있게 해주는 설정
  • application.name과 server.port는 Consumer 애플리케이션 자체 설정

2. Kafka 설정 클래스(ConsumerApplicationKafkaConfig.java)

// 이 클래스는 Kafka 컨슈머 설정을 위한 Spring 설정 클래스
@EnableKafka // Kafka 리스너를 활성화하는 어노테이션입니다.
@Configuration // Spring 설정 클래스로 선언하는 어노테이션
public class ConsumerApplicationKafkaConfig {

    // Kafka 컨슈머 팩토리를 생성하는 빈을 정의
    // ConsumerFactory는 Kafka 컨슈머 인스턴스를 생성하는 데 사용   
    // 각 컨슈머는 이 팩토리를 통해 생성된 설정을 기반으로 작동     
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // 컨슈머 팩토리 설정을 위한 맵을 생성
        Map<String, Object> configProps = new HashMap<>();
        // Kafka 브로커의 주소를 설정합니다.
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 메시지 키의 디시리얼라이저 클래스를 설정
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 메시지 값의 디시리얼라이저 클래스를 설정
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 설정된 프로퍼티로 DefaultKafkaConsumerFactory를 생성하여 반환
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // Kafka 리스너 컨테이너 팩토리를 생성하는 빈을 정의
    // ConcurrentKafkaListenerContainerFactory는 Kafka 메시지를 비동기적으로 수신하는 리스너 컨테이너를 생성하는 데 사용
    // 이 팩토리는 @KafkaListener 어노테이션이 붙은 메서드들을 실행할 컨테이너를 제공
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // ConcurrentKafkaListenerContainerFactory를 생성
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 컨슈머 팩토리를 리스너 컨테이너 팩토리에 설정
        factory.setConsumerFactory(consumerFactory());
        // 설정된 리스너 컨테이너 팩토리를 반환
        return factory;
    }
}

 

 

3. 메시지 소비 서비스(ConsumerEndpoint.java)

@Slf4j
@Service
public class ConsumerEndpoint {

    // 이 메서드는 Kafka에서 메세지를 받아서 처리하는 역할
    // @KafkaListener 어노테이션은 이 메서드를 Kafka 리스너로 설정
    @KafkaListener(groupId = "group_a", topics = "topic1")
    // Kafka 토픽 "test-topic"에서 메시지를 수신하면 이 메서드가 호출
    // groupId는 컨슈머 그룹을 지정하여 동일한 그룹에 속한 다른 컨슈머와 메시지를 분배
    public void consumeFromGroupA(String message) {
        log.info("Group A consumed message from topic1: " + message);
    }

    // 동일한 토픽을 다른 그룹 ID로 소비하는 또 다른 리스너 메서드
    @KafkaListener(groupId = "group_b", topics = "topic1")
    public void consumeFromGroupB(String message) {
        log.info("Group B consumed message from topic1: " + message);
    }

    // 다른 토픽을 다른 그룹 ID로 소비하는 리스너 메서드
    @KafkaListener(groupId = "group_c", topics = "topic2")
    public void consumeFromTopicC(String message) {
        log.info("Group C consumed message from topic2: " + message);
    }

    // 다른 토픽을 다른 그룹 ID로 소비하는 리스너 메서드
    @KafkaListener(groupId = "group_c", topics = "topic3")
    public void consumeFromTopicD(String message) {
        log.info("Group C consumed message from topic3: " + message);
    }

    @KafkaListener(groupId = "group_d", topics = "topic4")
    public void consumeFromPartition0(String message) {
        log.info("Group D consumed message from topic4: " + message);
    }
}

 

  • 테스트 예시
GET http://localhost:8090/send?topic=topic1&key=user1&message=Hello Kafka

 

4. 확인

  • 두 애플리케이션을 실행한 후 Kafka ui를 확인해보면 Topic 탭과 Consumers 탭을 확인해보면 잘 생성이 된 것을 볼 수 있다.

 

  • topic을 test-topic으로 지정하고 요청해보고 kafka ui > Topics > Messages에 접속하면 방금 요청한 토픽으로 발행된 메시지를 볼 수 있다.

  • topic을 topic1으로 지정하고 요청하고 컨슈머 애플리케이션의 로그를 보면 GroupA와 GroupB가 메시지를 수신한 것을 볼 수 있다. 따라서 같은 토픽을 가지고 그룹이 다르면 메시지를 각 그룹마다 수신한다는 것을 알 수 있다.

 

  • topic을 topic2으로 지정하고 요청한 후 컨슈머 어플리케이션의 로그를 보면 GroupC이고 topic이 2인 리스너가 메시지를 수신한 것을 볼 수 있다.

 

  • topic을 topic3로 지정하고 요청한 후 컨슈머 어플리케이션의 로그를 보면 GroupC이고 topic이 3인 리스너가 메시지를 수신한 것을 볼 수 있다.

 

  • topic을 topic4로 지정하고 요청한 후 컨슈머 어플리케이션의 로그를 보면 GroupD이고 topic이 4인 리스너가 메시지를 수신한 것을 볼 수 있다.