FireDrago
[채팅 분석] 7. 안정적인 대규모 실시간 채팅 수집 완성 (2) 본문
지난 1편에서는 수백 개의 동시 웹소켓 연결을 효율적으로 관리하기 위해 스프링 이벤트와 자바 가상 스레드를 도입한 과정을 소개했다. 이번 2편에서는 시스템의 안정성과 데이터 파이프라인의 완성도를 높이는 실전 구현 디테일을 다룬다. 불안정한 네트워크 환경에서도 데이터를 안정적으로 수집하고, 수집된 데이터를 안전하게 다음 단계로 전달하는 과정을 살펴보자.
1. 안정적인 연결 생명주기: 지수 백오프(Exponential Backoff) 재연결 전략
실시간 데이터를 수집하는 시스템에서 가장 중요한 것은 '연결의 안정성'이다. 웹소켓 연결은 다양한 이유(일시적인 네트워크 단절, 서버 재시작 등)로 예기치 않게 끊어질 수 있다. 이때, 무작정 즉시 재연결을 시도하면 오히려 서버에 부담을 주거나 계속 실패할 수 있다. 이런 문제를 해결하기 위해 지수 백오프 전략을 도입했다. 핵심은 재연결에 실패할수록 다음 재시도까지의 대기 시간을 점차 늘리는 것이다.
// 지수 백오프의 기본 로직
retryCount = 0;
while (isNotConnected) {
try {
connect();
} catch (Exception e) {
retryCount++;
long delay = calculateDelay(retryCount); // 1s, 2s, 4s, 8s...
sleep(delay);
}
}
`ChatConnectionManager` 클래스는 이 전략을 구현하여 개별 웹소켓 연결의 생명주기를 관리하고, 불안정한 연결을 복구하는 역할을 담당한다. 연결이 끊어지거나 오류가 발생하면 `scheduleReconnect`메서드(재연결)가 호출된다.
// ChatConnectionManager 클래스
private void scheduleReconnect() {
// 1. 중복 실행 방지 및 상태 체크
if (isManualDisconnect || !isReconnecting.compareAndSet(false, true)) return;
// 2. 백오프 계산 및 횟수 증가
long delayMillis = calculateBackoffDelay();
retryCount++;
// 3. 가상 스레드로 비동기 재연결 (핵심!)
Thread.ofVirtual().name("reconnect-" + chatChannelId).start(() -> {
try {
log.info("[{}] {}ms 후 재연결 시도", chatChannelId, delayMillis);
Thread.sleep(delayMillis); // 가상 스레드라 블로킹 부담 없음
connect();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
isReconnecting.set(false);
}
});
}
private long calculateBackoffDelay() {
return Math.min(1000L * (1L << Math.min(retryCount, 5)), 30000L);
}
이 코드는 `calculateBackoffDelay`를 통해 재시도 횟수가 늘어날수록 대기 시간을 1초, 2초, 4초, ... 와 같이 지수적으로 늘려 최대 30초까지 대기한다. 덕분에 일시적인 문제 발생 시 시스템에 과도한 부하를 주지 않고 안정적으로 연결을 복구할 수 있다.
특히 `Thread.sleep()`으로 대기하는 동안 플랫폼 스레드를 낭비하지 않도록 재연결 로직 전체를 가상 스레드 위에서 실행한 점이 핵심이다. 수백 개의 채널이 동시에 재연결을 시도하더라도 시스템 리소스를 효율적으로 사용할 수 있는 이유다.
2. 가상 스레드 전용 HttpClient: 수천 개의 태스크를 가볍게 처리하기
웹소켓 연결뿐만 아니라, 방송 정보를 얻기 위한 API 호출 등 백그라운드에서 발생하는 수많은 HTTP 요청 또한 가상 스레드로 처리해야 시스템 전체의 성능을 극대화할 수 있다. 기존의 HttpClient는 내부적으로 고정된 개수의 플랫폼 스레드 풀을 사용하는 경우가 많아, 가상 스레드 환경에서는 오히려 병목점이 될 수 있다. 이 문제를 해결하기 위해, 모든 HTTP 요청마다 새로운 가상 스레드를 할당하는 전용 `ExecutorService`를 생성하고, 이를 `HttpClient`에 주입했다.
// HttpClientConfig (가상 스레드 실행기 설정)
@Configuration
public class HttpClientConfig {
@Bean
public ExecutorService virtualThreadExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
}
@Bean
public HttpClient httpClient(ExecutorService executorService) {
return HttpClient.newBuilder()
.executor(executorService)
.build();
}
}
`Executors.newVirtualThreadPerTaskExecutor()`는 각 태스크를 새로운 가상 스레드에서 실행하는 `ExecutorService`를 생성한다. 이 Executor를 HttpClient의 기본 실행기로 설정함으로써, HttpClient를 통해 발생하는 모든 비동기 작업(API 호출, 웹소켓 연결 등)이 가상 스레드 위에서 가볍게 처리된다. 이제 I/O 대기가 긴 작업이 플랫폼 스레드를 점유하며 시스템 전체를 느리게 만드는 현상을 원천적으로 방지할 수 있다.
3. 데이터 파이프라인 완성: Kafka로 안전하게 데이터 전송하기
이제 안정적으로 웹소켓 연결을 맺고, 수집한 채팅 메시지를 다음 분석 단계로 전달할 차례다.
여기서 Kafka를 도입하여 Producer(수집기)와 Consumer(분석기) 사이의 의존성을 완전히 분리했다. 분석 로직이 고도화됨에 따라 새로운 모듈이 추가되더라도 기존 수집 로직은 전혀 수정할 필요가 없는 유연한 확장성을 확보하고 싶었기 때문이다. 또한, 실시간으로 폭주하는 채팅 데이터의 수집 속도와 상대적으로 무거운 분석 처리 속도 사이의 불균형을 해결하기 위한 완충 지대로서 Kafka가 최적의 선택지라고 판단했다.
`ChzzkChatCollector`는 `ChatConnectionManager`로부터 채팅 메시지를 전달받아 Kafka로 전송하는 최종 관문 역할을 한다.
// ChzzkChatCollector.java (Kafka 전송 로직)
@Slf4j
@RequiredArgsConstructor
public class ChzzkChatCollector implements ChatMessageListener {
private final String streamId;
private final KafkaTemplate<String, ChatMessage> kafkaTemplate;
@Override
public void onMessages(List<ChatMessage> messages) {
messages.forEach(msg -> {
kafkaTemplate.send("chat-messages", streamId, msg);
log.debug("[{}] kafka 전송 완료: {}", streamId, msg.message());
});
}
// ... (기타 콜백 메서드)
}
수집된 메시지(ChatMessage)는 Spring의 KafkaTemplate을 통해 chat-messages라는 토픽으로 전송된다. 메시지 키로는 streamId를 사용하여, 특정 방송의 메시지들이 동일한 파티션에 순서대로 저장되도록 보장했다. 이를 통해 후속 컨슈머들이 각 방송별 채팅 흐름을 순서대로 처리할 수 있게 되었다.
지금까지 2편에 걸쳐 이벤트 기반 아키텍처, 가상 스레드, 지수 백오프, 그리고 Kafka를 활용하여 대규모 실시간 채팅 데이터를 안정적으로 수집하는 시스템을 구축하는 과정을 살펴보았다.
- 모듈 분리: 스프링 이벤트를 통해 모듈 간 결합도를 낮춰 유연한 아키텍처를 구현했다.
- 동시성 확보: 가상 스레드를 전면적으로 도입하여 최소한의 리소스로 수백 개의 동시 I/O 작업을 처리했다.
- 안정성 강화: 지수 백오프 재연결 전략으로 예기치 않은 연결 끊김에 효과적으로 대응했다.
- 데이터 파이프라인 구축: Kafka를 이용해 수집한 데이터를 후속 분석 시스템으로 안정적으로 전달하는 통로를 마련했다.
이제 데이터 수집의 파이프라인이 완성되었으므로, 다음 단계는 Kafka로 들어온 채팅 메시지를 실시간으로 분석하여 의미 있는 하이라이트를 추출하는 것이다. 다음 시리즈에서는 Redis TimeSeries를 활용한 실시간 채팅 화력 분석과 하이라이트 감지 로직 구현에 대해 살펴보자
'프로젝트' 카테고리의 다른 글
| [트러블 슈팅] 비동기 코드가 동기적으로 처리되는 이유 (Java Stream, CompletableFuture) (0) | 2026.02.14 |
|---|---|
| Spring Boot 4.0 & Kafka 4.x NoClassDefFoundError 해결하기 (0) | 2026.02.07 |
| [채팅 분석] 6. 가상 스레드로 구축하는 대규모 실시간 채팅 수집 (1) (0) | 2026.01.23 |
| [채팅 감정 분석] 5. MVP 완성 및 트러블 슈팅 (설계 변경) (0) | 2025.12.25 |
| [채팅 감정 분석] 4. 실시간 분석 데이터 파이프라인 설계 (0) | 2025.12.21 |
