FireDrago
[트러블 슈팅] 비동기 코드가 동기적으로 처리되는 이유 (Java Stream, CompletableFuture) 본문
문제 상황
@Scheduled(fixedRate = 3000)
public void monitorHighlights() {
// 추적중인 방송id 목록을 조회
List<String> activeStreamIds = streamProvider.getActiveStreamIds();
// 스트림을 사용하여 방송분석 로직을 비동기로 호출하기 위한 코드
// CompletableFuture을 통해 비동기 호출한뒤
// join을 통해 결과를 조합
List<AnalysisSignal> signals = activeStreamIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> processStream(id), virtualThreadExecutor))
.map(CompletableFuture::join)
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
if (!signals.isEmpty()) {
// 분석결과를 api-server로 https 메시지 발송
signalClient.send(signals);
}
}
private Optional<AnalysisSignal> processStream(String streamId) {
// 채팅 빈도를 조회하여 NORMAL, PEAK 상태를 감지하는 분석로직 호출
// RedisTimeSeries 호출하는 I/O 로직이 실행된다.
}

치지직 채팅 데이터를 통해 하이라이트 상태를 감지하는 로직을 짰다.
`CompletableFuture`와 `Virtual ThreadExecutor`를 썼으니 당연히 비동기 처리가 될것으로 생각했다.
하지만 코드래빗의 리뷰를 통해 위 코드가 동기적으로 작동한다는 사실을 알게됐다. 이유가 뭘까?
원인 분석
문제의 원인은 `CompletableFuture.join()`의 성격과 Stream의 동작 방식이 충돌하기 때문이다.
CompletableFuture. join() 메서드는 Future가 완료될 때까지 현재 스레드를 대기 상태(Blocking)로 만든다.
즉, "미래의 결과"를 "현재의 값"으로 바꾸기 위해 흐름을 강제로 끊는다.
Java의 Stream은 기본적으로 Loop(반복문)다. 앞의 요소가 파이프라인을 완전히 통과해야 다음 요소가 진입할 수 있다.
이 둘이 만나면 아래와 같은 직렬화(Serialization) 현상이 발생한다.
stream.map(id -> CompletableFuture.supplyAsync(...)) // 1. 비동기 작업 시작 (좋음)
.map(CompletableFuture::join) // 2. 즉시 대기 (병목 발생)
- Task A 시작 (비동기 처리) ➔ join()을 만나서 스레드 정지.
- Task A가 끝날 때까지 Task B는 시작조차 못 함. (스트림의 특성)
- Task A 완료 ➔ 그제야 Task B 시작 ➔ 다시 join() 만나서 정지. (결과적 동기적 작동)
결국 비동기 작업을 시키자마자 결과를 내놓으라고 멱살 잡고 기다리는 꼴이 되어, 멀티 스레드 환경에서도 단일 스레드처럼 순차적으로 실행되는 결과를 낳게 된것이다.
해결 방법 : 요청과 대기의 시점 분리
public void monitorHighlights() {
List<String> activeStreamIds = streamProvider.getActiveStreamIds();
// [Step 1] 비동기 작업 시작 (Start)
// Stream을 순회하며 작업을 스레드 풀(Virtual Thread)에 전부 던진다.
// join()을 호출하지 않고, Future만 받아 리스트로 먼저 모아둔다.
List<CompletableFuture<Optional<AnalysisSignal>>> futures = activeStreamIds.stream()
.map(id -> CompletableFuture.supplyAsync(() -> processStream(id), virtualThreadExecutor))
.toList(); // 즉시 모든 작업이 병렬로 시작됨
// [Step 2] 전체 대기 (Wait)
// 모든 작업이 끝날 때까지 메인 스레드는 여기서 딱 한 번만 멈춘다.
// 가장 오래 걸리는 작업 하나만큼의 시간만 소요된다.
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// [Step 3] 결과 수집 (Collect)
// 이미 모든 작업이 완료되었으므로, 여기서의 join()은 대기 시간 없이 즉시 값을 반환한다.
List<AnalysisSignal> signals = futures.stream()
.map(CompletableFuture::join)
.filter(Optional::isPresent)
.map(Optional::get)
.toList();
if (!signals.isEmpty()) {
signalClient.send(signals);
}
}
원인을 파악했으니 해결책은 모든 작업을 먼저 던져놓은 뒤(Non-blocking) 마지막에 한 번만 기다리는(Blocking) 구조로 변경했다.
1. toList()로 Future 수집: Stream은 지연 연산(Lazy Evaluation)을 수행하지만, toList()를 호출하는 순간 모든 요소에 대해 터미널 연산이 수행된다. 즉, 이 시점에 이미 모든 비동기 요청이 virtualThreadExecutor로 전달되어 병렬 실행이 시작된다.
2.CompletableFuture.allOf(...).join(): 여러 개의 Future 중 하나라도 끝나지 않았다면 넘어가지 않도록 막아주는 '동기화 장벽(Barrier)' 역할을 한다. 개별적으로 기다리는 것이 아니라, 전체를 묶어서 한 번만 기다리기 때문에 전체 소요 시간은 개별 작업 중 가장 오래걸리는 시간만큼으로 단축된다.
====== [실험 1] 기존 코드 (순차 처리) ======
VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1 분석 시작: 방송1
VirtualThread[#21]/runnable@ForkJoinPool-1-worker-1 분석 완료: 방송1
VirtualThread[#25]/runnable@ForkJoinPool-1-worker-2 분석 시작: 방송2
VirtualThread[#25]/runnable@ForkJoinPool-1-worker-2 분석 완료: 방송2
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-3 분석 시작: 방송3
VirtualThread[#27]/runnable@ForkJoinPool-1-worker-2 분석 완료: 방송3
👉 총 소요 시간: 3027ms (기대값: 약 3000ms)
====== [실험 2] 개선된 코드 (병렬 처리) ======
VirtualThread[#28]/runnable@ForkJoinPool-1-worker-2 분석 시작: 방송1
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-3 분석 시작: 방송2
VirtualThread[#30]/runnable@ForkJoinPool-1-worker-1 분석 시작: 방송3
VirtualThread[#28]/runnable@ForkJoinPool-1-worker-2 분석 완료: 방송1
VirtualThread[#29]/runnable@ForkJoinPool-1-worker-1 분석 완료: 방송2
VirtualThread[#30]/runnable@ForkJoinPool-1-worker-3 분석 완료: 방송3
👉 총 소요 시간: 1005ms (기대값: 약 1000ms)
기존 코드와 개선된 코드를 테스트 코드를 통해 로직 실행시간을 측정했다.
동기적 처리 (3000ms) 에서 비동기 처리 (1000ms) 만큼 단축된것을 확인했다.
결론 : 비동기 키워드에 속지 말자
1. 스트림의 함정
java stream은 기본적으로 순차적 파이프라인이다. Stream 내부(`map`, `forEach`)에서 `join()` 이나 `get()` 같은 블로킹 메서드를 호출하는것은 병목의 원인이 될 수 있다. 비동기 작업을 시작했다면, 결과를 기다리는 시점은 반드시 스트림 루프 밖이어야 한다.
2. 병렬 처리의 정석 패턴
`CompleatbleFuture`를 리스트 처리할 때는 항상 3단계 패턴을 따라야 한다.
2.1 Request(비동기 처리 시작) : `Stream.map(async).toList()`로 Future 객체를 먼저 확보하여 모든 작업을 실행한다.
2.2 Wait (전체 대기) : `CompletableFuture.allOf(...).join()`을 사용하여 병렬로 실행된 작업이 모두 끝날때 까지 한번만 대기한다.
2.3 Process (결과 처리) : 이미 완료된 Future들에서 `join()`으로 값을 즉시 꺼낸다.
3. VirtualThread의 역할
가상스레드는 스레드 생성 비용을 낮추고 블로킹 시 자원 효율을 높여주는 도구이지, 잘못된 순차 로직을 병렬로 바꿔주는 것은 아니다. 로직자체가 직렬로 작성되어 있다면, 가상 스레드 위에서도 직렬로 동작한다. 올바른 비동기 설계가 있어야 스레드의 높은 처리량을 사용할 수 있다.
+) 현재 jdk25 버전에서는 `CompletableFuture`가 비동기 요청을 담당하는 클래스로 여전히 사용되고 있으나, 메서드 체이닝이 길어지고 내가 겪은 실수를 하는 경우가 많아 `StructuredTaskScope` 라는 새로운 객체를 도입중에 있다. 현재는 프리뷰 단계라 사용하지 않았다.
자세한 내용은 아래의 openjdk 공식 발표를 참고하자
https://openjdk.org/jeps/525
'프로젝트' 카테고리의 다른 글
| [Kafka] Kafka를 사용할때 살펴봐야할 6가지 (2) (0) | 2026.03.03 |
|---|---|
| [Kafka] Kafka를 사용할때 살펴봐야할 6가지 (1) (0) | 2026.02.28 |
| Spring Boot 4.0 & Kafka 4.x NoClassDefFoundError 해결하기 (0) | 2026.02.07 |
| [채팅 분석] 7. 안정적인 대규모 실시간 채팅 수집 완성 (2) (0) | 2026.02.01 |
| [채팅 분석] 6. 가상 스레드로 구축하는 대규모 실시간 채팅 수집 (1) (0) | 2026.01.23 |
