본문 바로가기

개발일지/TIL

[230830] 서버간 통신을 위해 Kafka 적용

Kafka 도입한 배경

💬 메인 서버와 SSE 서버가 분리되면서 서버 간 데이터를 전송하고 받을 수 있는 수단이 필요했습니다. 후보군으로 나왔던 것이 Redis와 Kafka였습니다. 

 

✔ 결정 기준

아래와 같은 이유로 Kafka를 사용하기로 했습니다.
1. 입찰 데이터를 전송이 실패할 경우 재시도가 가능해야 했기에 데이터가 보존되어야 했습니다.
    ➡ Redis의 경우 데이터 전송 후 사라집니다.
2. 비동기식으로 처리가 되어야 하며, 읽기/쓰기가 빨라야 했습니다.
    ➡ Redis의 경우 다른 소비자에게 데이터를 보내기 전에 응답을 기다려야 합니다.
3. 이후에 Scale-out을 고려했을 때 병렬적으로 데이터를 가져갈 수 있어야 했습니다.
    ➡ Redis의 경우 병렬성을 제공하지 않는다고 합니다.
 

Redis와 Kafka 비교 - 게시/구독 메시징 시스템 간의 차이점 - AWS

Apache Kafka는 대규모 데이터 세트를 스트리밍하고 높은 복구 성능이 필요한 애플리케이션을 구축하는 데 더 적합합니다. 전달되는 수조 개의 메시지를 처리할 수 있는 단일 분산 데이터 파이프라

aws.amazon.com

Kafka 도입하기

💬 우리 프로젝트에 Kafka를 사용해 Producer/Consumer 형태로 구현하기로 했습니다. 기존 서버 Producer, Broker 서버, SSE 서버 Consumer로 구성했습니다. 

 

✔ Broker 서버 구축

1. Kafka를 사용하기 위해 Java가 필요합니다.
    ➡ Kafka 최신버전에 지원하는 17 버전으로 설치
    ➡ sudo apt-get install openjdk-17-jdk
2. Kafka 다운로드 및 압축 풀기
    ➡ wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
    ➡ tar xvf kafka_2.13-3.5.1.tgz
3. zookeeper 실행
     ➡ cd /home/ubuntu/kafka_2.13-3.5.1
     ➡ ./bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.5.1/config/zookeeper.properties
4. kafka 실행
     ➡ ./bin/kafka-server-start.sh -daemon ./config/server.properties

 

✔ Producer 설정

spring :
    kafka: bootstrap-servers: {Kafka Server IP}
    producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer

 

✔ Producer 코드

 

@Service
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

 

✔ Consumer 설정

spring :
    kafka: bootstrap-servers: {Kafka Server IP}
    consumer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        auto-offset-reset: latest
        group-id: sse_group

 

✔ Consumer 코드

 

@Service
@RequiredArgsConstructor
public class KafkaConsumer {

    private final SseService sseService;

    @KafkaListener(topics = {"bid", "notice"} )
    public void listenBid(String message) {
        SseEvent sseEvent = SseEvent.valueOf(message.split("#")[0]);
        String content = message.split("#")[1];
        Long id = Long.valueOf(message.split("#")[2]);

        sseService.send(sseEvent.getSseConnect(), sseEvent, id, content);
    }
}