FireDrago

[채팅 감정 분석] 4. 실시간 분석 데이터 파이프라인 설계 본문

프로젝트

[채팅 감정 분석] 4. 실시간 분석 데이터 파이프라인 설계

화이용 2025. 12. 21. 17:09

(이 포스팅의 설계는 변경되었습니다. 변경된 설계 참고해주세요)

채팅 감정 분석 아키텍처

웹소켓을 기반으로 한 실시간 시스템은 필연적으로 비대칭적 처리 속도를 해결해야 한다.

수집단에서 쏟아지는 데이터를 분석단이 제때 소화하지 못할 때 발생하는 데이터 유실을 막고,

시스템의 전체적인 처리량(Throughput)을 위한 아키텍처 설계 과정을 정리했다.

이전 포스팅 웹소켓 클라이언트와 스레드 참고하자

1단계 : 수집

데이터 수집의 핵심은 도메인 보호와 책임의 분리다.

외부 플랫폼(치지직)의 데이터 구조에 시스템이 종속되지 않게 설계했다.

 

1. 매퍼 클래스와 표준 도메인의 도입

ChatMessage 공통 dto와 변환을 위한 ChzzkMessageMapper 추가

외부에서 유입되는 JSON 데이터는 변경 가능성이 높고 비즈니스 로직에 부적합하다.

이를 위해 전용 매퍼를 두어

시스템 내부에서 사용하는 표준 도메인 객체(ChatMessage)로 즉시 변환하도록 설계했다.

 

외부 API 명세가 변경되더라도 매퍼 클래스만 수정하면 되므로,

핵심 분석 로직의 오염을 방지하는 어댑터 패턴을 도입했다.

불변 객체인 Record를 사용하여 멀티스레드 환경에서의 안정성을 확보했다.

 

2. ChatBuffer를 통한 큐 도입

@Component
public class ChatBuffer {

    private final BlockingDeque<ChatMessage> queue = new LinkedBlockingDeque<>();

    public void produce(ChatMessage chatMessage) {
        queue.offer(chatMessage);
    }
    // 최대 maxSize개, 혹은 timeoutMs 동안 기다려서 채팅 데이터를 가져오는 메서드
    public List<ChatMessage> drainBatch(int maxSize, long timeoutMs) throws InterruptedException {
        List<ChatMessage> tempBatch = new ArrayList<>();
        tempBatch.add(queue.take());

        long deadLine = System.currentTimeMillis() + timeoutMs;
        while (tempBatch.size() < maxSize) {
            long remaining = deadLine - System.currentTimeMillis();
            if (remaining < 0) break;

            ChatMessage next = queue.poll();
            if (next == null) break;
            tempBatch.add(next);
            if (!queue.isEmpty()) {
                queue.drainTo(tempBatch, maxSize);
            }
        }
        return List.copyOf(tempBatch);
    }
}

버퍼는 수집단(Producer)과 분석단(Consumer) 사이의

속도 차이를 완충하고, 효율적인 처리를 위한 배치 처리를 위한 임시 저장소의 역할도 한다.

 

1. `LinkedBlockingDeque`를 선택한 이유

단순한 리스트가 아닌 스레드 안전한 큐를 사용하여,

서로 다른 라이프사이클을 가진 스레드들 사이에서 데이터 정합성을 보장한다.

별도의 락(Lock)을 직접 구현하지 않고도 Java 표준 라이브러리를 통해 Non-blocking 데이터 접근을 할 수 있게 되었다.

향후 서비스가 여러 채팅을 동시에 분석해야 할때, 
`kafka`등 외부 메시징 큐와 연동하는 구조로 변경하기 쉽도록 별도의 클래스로 분리했다.

 

2. 배치 추출 전략 `drainBatch()`

이 메서드는 단순히 데이터를 꺼내는 것이 아니라, CPU 효율과 실시간 응답을 고려했다.

  • take(): 데이터 부재 시 스레드를 대기(Waiting) 상태로 두어 불필요한 CPU 점유를 방지한다.
  • poll(timeout): 배치 사이즈가 차지 않더라도 설정한 시간이 지나면 즉시 분석을 시작하여 지연 시간을 보장한다.
  • drainTo(): 큐에 쌓인 대량의 데이터를 한 번에 원자적으로 이동시켜 컨텍스트 스위칭 비용을 최소화한다.

2단계 : 매니저 (배치 처리)

 

1. 매니저단의 비동기 처리

@Service
@RequiredArgsConstructor
public class ChatAnalyzeService implements CommandLineRunner {

    private final ChatBuffer chatBuffer;
    private final ChatEmotionAnalyzer chatEmotionAnalyzer;

    @Override
    public void run(String... args) throws Exception {
        analyze();
    }
    
    // 단일 스레드 풀
    @Async("chatManagerThreadPoolTaskExecutor")
    public void analyze() throws InterruptedException {
        while (!Thread.interrupted()) {
            // 버퍼에서 배치처리 호출만 하면 된다.
            List<ChatMessage> chatMessages = chatBuffer.drainBatch(30, 1000);
            // 분석단으로 넘기기
            chatEmotionAnalyzer.analyze(chatMessages);
        }
    }
}
// AppConfig.class

@Bean
public ThreadPoolTaskExecutor chatManagerThreadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(1);
    executor.setMaxPoolSize(1);
    executor.setQueueCapacity(150);
    executor.setThreadNamePrefix("chat-manager");
    executor.initialize();
    return executor;
}

매니저는 시스템 전체의 흐름을 관장하는 'Dispatcher' 역할을 수행한다.

 

1. 단일 스레드 모델 기반의 비동기 처리

분석을 관리하는 (chat-manager)를 단일 스레드 풀로 구성했다.

배치를 구성하는 로직에서 경합이 발생하면 오히려 성능이 저하된다.

1개의 스레드가 배치를 만들어 워커들에게 작업을 전달하는 것이 더 효율적이라고 판단했다.

 

`@Async`와 `CommandLineRunner`를 조합하여

앱 기동과 동시에 백그라운드에서 분석 파이프라인이 독립적으로 가동되도록 구현했다.

 

3단계 : 분석

// AppConfig.class
// 본격적인 분석 작업을 실행하는 워커 스레드
@Bean
public ThreadPoolTaskExecutor chatWorkerThreadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(150);
    executor.setThreadNamePrefix("chat-worker");
    executor.initialize();
    return executor;
}
@Component
public class ChatEmotionAnalyzer {

    @Async("chatWorkerThreadPoolTaskExecutor")
    public void analyze(List<ChatMessage> buffer) {
        // llm과 연계하여 본격적인 감정분석 구현
    }
}

실제 무거운 분석 작업은 여러 개의 워커 스레드(chat-worker)들이

나누어 처리하여 전체 시스템의 처리량(Throughput)을 효율적으로 최적화 한다.

 

1. I/O Bound 작업에 최적화된 설계

LLM 분석은 외부 연동이 포함된 무거운 작업이다.

매니저가 작업을 던져주는 즉시 워커들이 병렬로 실행되므로,

매니저는 지체 없이 다음 배치를 구성하러 돌아갈 수 있다.

 

분석 로직을 별도의 워커 스레드 풀로 격리하여,

특정 분석 작업의 지연이 전체 시스템의 데이터 수집을 방해하지 않는

유연한 구조로 설계했다.

더 고민해야할 지점들

1. 데이터파이프라인 안정성

과부하로 인해 큐가 가득 찼을 때의 처리 전략(Rejection Policy)을 고민해볼 필요가 있다.

큐가 꽉 찼을 때 가장 오래된 데이터를 버릴 것인지(DiscardOldestPolicy),

아니면 호출한 스레드가 직접 처리하게 해서 속도를 늦출 것인지(CallerRunsPolicy) 등

모니터링 환경 구축 후, 실제 유입 데이터를 분석하여 시스템 안정성을 극대화할 수 있는 정책을 결정해야 한다.

 

2. 배치 사이즈와 타임아웃의 트레이드 오프

현재 설정된 '30개'와 '1000ms'라는 수치는 어떻게 최적화할 수 있을까?

배치를 크게 잡으면 처리량(Throughput)은 좋아지지만 지연 시간(Latency)이 늘어난다.

반대로 작게 잡으면 LLM 호출 비용이 급증한다.

향후 트래픽 패턴을 분석하여 비용 최적화와 실시간성 사이의 최적의 지점을 찾아가는 과정이 필요하다.

 

3. 시스템의 예외발생시 회복

단일 예외가 전체 파이프라인 중단으로 이어지지 않도록 예외 격리 및 재시도 전략이 필요하다.