Kafka 도입한 배경
💬 메인 서버와 SSE 서버가 분리되면서 서버 간 데이터를 전송하고 받을 수 있는 수단이 필요했습니다. 후보군으로 나왔던 것이 Redis와 Kafka였습니다.
✔ 결정 기준
아래와 같은 이유로 Kafka를 사용하기로 했습니다.
1. 입찰 데이터를 전송이 실패할 경우 재시도가 가능해야 했기에 데이터가 보존되어야 했습니다.
➡ Redis의 경우 데이터 전송 후 사라집니다.
2. 비동기식으로 처리가 되어야 하며, 읽기/쓰기가 빨라야 했습니다.
➡ Redis의 경우 다른 소비자에게 데이터를 보내기 전에 응답을 기다려야 합니다.
3. 이후에 Scale-out을 고려했을 때 병렬적으로 데이터를 가져갈 수 있어야 했습니다.
➡ Redis의 경우 병렬성을 제공하지 않는다고 합니다.
Redis와 Kafka 비교 - 게시/구독 메시징 시스템 간의 차이점 - AWS
Apache Kafka는 대규모 데이터 세트를 스트리밍하고 높은 복구 성능이 필요한 애플리케이션을 구축하는 데 더 적합합니다. 전달되는 수조 개의 메시지를 처리할 수 있는 단일 분산 데이터 파이프라
aws.amazon.com
Kafka 도입하기
💬 우리 프로젝트에 Kafka를 사용해 Producer/Consumer 형태로 구현하기로 했습니다. 기존 서버 Producer, Broker 서버, SSE 서버 Consumer로 구성했습니다.
✔ Broker 서버 구축
1. Kafka를 사용하기 위해 Java가 필요합니다.
➡ Kafka 최신버전에 지원하는 17 버전으로 설치
➡ sudo apt-get install openjdk-17-jdk
2. Kafka 다운로드 및 압축 풀기
➡ wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
➡ tar xvf kafka_2.13-3.5.1.tgz
3. zookeeper 실행
➡ cd /home/ubuntu/kafka_2.13-3.5.1
➡ ./bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.5.1/config/zookeeper.properties
4. kafka 실행
➡ ./bin/kafka-server-start.sh -daemon ./config/server.properties
✔ Producer 설정
spring :
kafka: bootstrap-servers: {Kafka Server IP}
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
✔ Producer 코드
@Service
public class KafkaProducerService {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
✔ Consumer 설정
spring :
kafka: bootstrap-servers: {Kafka Server IP}
consumer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
auto-offset-reset: latest
group-id: sse_group
✔ Consumer 코드
@Service
@RequiredArgsConstructor
public class KafkaConsumer {
private final SseService sseService;
@KafkaListener(topics = {"bid", "notice"} )
public void listenBid(String message) {
SseEvent sseEvent = SseEvent.valueOf(message.split("#")[0]);
String content = message.split("#")[1];
Long id = Long.valueOf(message.split("#")[2]);
sseService.send(sseEvent.getSseConnect(), sseEvent, id, content);
}
}
'개발일지 > TIL' 카테고리의 다른 글
[230913] Scale-out 환경에서 Scheduler 중복으로 실행되는 문제 (0) | 2023.09.13 |
---|---|
[230901] 간단한 화면 구현 (0) | 2023.09.01 |
[230829] SSE 서버 분리하기 (0) | 2023.08.29 |
[230828] 입찰 API 응답 Average Latency 속도 문제 (0) | 2023.08.28 |
[230827] 입찰 API 병목 현상 로직 수정 (0) | 2023.08.27 |