FireDrago
[SSE] SseEmitter로 실시간 주문 상태 업데이트 구현하기 본문

구현 코드는 여기서 확인 할 수 있다.
백엔드 로직은 학습후에 직접 작성했다.
프론트 코드는 gemini가 100% 작성했다. (당당)
Short Polling 과 Long Polling
배달앱의 실시간 주문상태 업데이트나 서비스의 라이브 알림 기능은 어떻게 구현될까?
일반적인 HTTP 요청-응답 모델로는 이런 기능을 만들기 어렵다.
클라이언트가 먼저 요청해야만 서버가 응답할 수 있기 때문이다
Short Polling

위의 그림 처럼 Short Polling 방식을 사용할 수 있다.
클라이언트가 아주 짧은 주기로 서버에 계속해서 "새로운 데이터 있어?"라고 물어보는 방식이다.
하지만 이는 데이터가 없어도 계속 요청을 보내므로 매우 비효율적이다.
Long Polling

이를 개선한 것이 Long Polling이다.
클라이언트가 요청을 보내면, 서버는 바로 응답하지 않고
새로운 데이터가 생길 때까지 연결을 열어두고 기다린다.
데이터가 발생하면 그제야 응답을 보내 연결을 종료한다.
클라이언트는 응답을 받은 후 즉시 다음 요청을 보내 대기 상태에 들어간다.
GitHub Actions의 self-hosted runner가 새 작업을 할당받기 위해 이와 유사한 방식을 사용했다.
(현재는 공식문서에서 Long Polling에 대한 언급이 없다. 과거 공식문서를 인용한다.)
A self-hosted runner connects to your GitHub Enterprise Server instance to receive job assignments and to download new versions of the runner application. The self-hosted runner uses an HTTP(S) long poll that opens a connection to GitHub for 50 seconds, and if no response is received, it then times out and creates a new long poll.
하지만 Long Polling도 완벽한 해결책은 아니다.
연결을 유지하는 동안 서버 자원을 계속 소모하고, (스레드, 프로세스 등)
연결이 끊어지고 다시 맺힐 때마다 불필요한 HTTP 헤더 데이터가 오고 간다.
무엇보다 가장 큰 한계는 이것이 진정한 서버 푸시(Server Push)가 아니라는 점이다.
어디까지나 클라이언트의 요청에 대한 '지연된 응답'일 뿐,
서버가 필요할 때 능동적으로 클라이언트에 데이터를 보낼 수는 없다.
이러한 Polling 방식들의 근본적인 한계 때문에, 더 효율적인 대안으로 SSE(Server-Sent Events)가 등장했다.
SSE (Server-Sent Events)

SSE(Server-Sent Events)는 서버가 클라이언트로 데이터를 일방적, 지속적으로 밀어 넣어주는(Push) 기술이다.
클라이언트는 한번 연결을 맞추고 나면, 서버가 보내주는 새로운 소식을 계속해서 수신할 수 있다.
Long Polling이 가진 자원 소모, 불필요한 데이터 낭비,
그리고 근본적인 통신 방향의 한계. 이러한 문제들을 해결하기 위해 등장한 기술이다.
이름에서 알 수 있듯이, 서버가 보내는 이벤트라는 의미를 담고 있다.
효율적인 자원 사용
SSE 연결은 서버에 맺어진 채로 특별한 활동 없이 대기한다.
이 상태에서는 전담 스레드를 전혀 차지하지 않고,
단지 운영체제가 관리하는 수많은 연결 목록 중 하나로 존재한다.
서버에서 해당 클라이언트에게 보낼 데이터(이벤트)가 발생하면,
이벤트 루프가 이를 감지하고 스레드 풀에서 스레드를 잠깐 빌려와 데이터를 전송한다.
데이터 전송이 끝나면 스레드는 즉시 반납되고, SSE 연결은 다시 전담 스레드 없이 대기 상태로 돌아간다.
따라서 수천, 수만 개의 SSE 연결이 유지되더라도 서버는 소수의 스레드만으로 효율적인 처리가 가능하다
.
이제 주문 상태 변경을 스프링부트와 SSE로 구현해보자
스프링으로 SSE 구현하기 - 1. 최초 연결
SSE를 사용하기 위해서는 3가지 단계가 필요하다.
1. 최초 연결 (클라이언트/ 서버)
2. 연결 유지 및 데이터 전송 (서버)
3. 이벤트 수신 (클라이언트)
최초 연결 코드부터 살펴보자
// 클라이언트는 최초연결시 미디어 타입 "text/event-stream" 명시
@GetMapping(value = "/connect", produces = "text/event-stream")
public SseEmitter connect(
@RequestParam String userId,
HttpServletResponse response
) {
// Nginx 프록시 환경에서 버퍼링 방지
response.setHeader("X-Accel-Buffering", "no");
return sseNotificationService.subscribe(userId);
}
@GetMapping(value = "/connect", produces = "text/event-stream")
여기서 가장 중요한 부분은 produces = "text/event-stream"이다.
이 요청의 응답 미디어 타입을 SSE의 표준인 text/event-stream으로 지정하는 것으로,
일반적인 HTTP 응답이 아닌 이벤트 스트림 응답임을 클라이언트와 서버 양쪽에 공식적으로 알리는 역할을 한다.
이벤트 스트림이란 최초 한 번의 요청으로 연결된 후에는 클라이언트가 추가로 요청하지 않아도
서버가 계속해서 이벤트 데이터를 보낼 수 있는 통신 방식을 말한다.
response.setHeader("X-Accel-Buffering", "no")
만약 서버 앞에 Nginx 같은 리버스 프록시가 있다면,
프록시는 데이터를 일정량 모았다가 한 번에 보내는 '버퍼링'을 할 수 있다.
실시간 통신에서 이런 버퍼링은 지연을 유발하므로,
이 설정을 통해 버퍼링 없이 데이터가 생기는 즉시 바로 보내라고 명시해주는 것이다.
@Service
@RequiredArgsConstructor
public class SseNotificationService {
private static final Long EMITTER_TIMEOUT = 10 * 60 * 1000L; // 10분
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper;
private final OrderSimulationQueue simulationQueue; // 실제 서버에서는 db 사용
public SseEmitter subscribe(String userId) {
SseEmitter emitter = new SseEmitter(EMITTER_TIMEOUT);
if (emitters.containsKey(userId)) {
emitters.get(userId).complete();
}
emitters.put(userId, emitter);
emitter.onCompletion(() -> emitters.remove(userId));
emitter.onTimeout(() -> emitters.remove(userId));
emitter.onError(e -> emitters.remove(userId));
// 시뮬레이션 큐에 userId 추가
simulationQueue.addUser(userId);
sendEventToClient(emitter, "CONNECT", "Connection successful. Your order will be processed soon.");
return emitter;
}
@Scheduled(fixedRate = 15_000)
public void sendHeartbeat() {
if (emitters.isEmpty()) {
return;
}
String heartbeatPayload = "tick " + System.currentTimeMillis();
emitters.forEach((userId, emitter) -> {
sendEventToClient(emitter, "HEARTBEAT", heartbeatPayload);
});
}
public void sendToClient(String userId, String eventType, Object payload) {
SseEmitter emitter = emitters.get(userId);
if (emitter != null) {
sendEventToClient(emitter, eventType, payload);
} else {
log.warn("No active emitter found for userId: {}", userId);
}
}
private void sendEventToClient(SseEmitter emitter, String eventType, Object payload) {
try {
Map<String, Object> eventData = Map.of("type", eventType, "payload", payload);
String jsonEventData = objectMapper.writeValueAsString(eventData);
emitter.send(SseEmitter.event().data(jsonEventData));
} catch (IOException e) {
log.error("Failed to send SSE event to client: {}", e.getMessage());
}
}
}
1. 새로운 구독자 처리 (subscribe 메서드)
스프링은 SSE 요청을 보내기위한 여러 기능들을 SseEmitter 객체에 이미 만들어 두었다.
SseEmitter 객체는 서버에서 클라이언트로 데이터를 보내기 위한 통신 채널의 역할을 한다.
클라이언트가 최초로 구독을 요청할 때 클라이언트 하나당 SseEmitter 객체 하나가 생성된다.
ConcurrentHashMap을 사용해서 SseEmitter 연결을 저장한다. (userId가 key)
원하는 데이터를 SseEmitter를 통해 클라이언트에게 전송 할 수 있다.
emitter.onCompletion(() -> ...), emitter.onTimeout(() -> ...), emitter.onError(() -> ...)
연결이 정상적으로 완료되거나, 시간이 초과되거나,
에러가 발생해서 연결이 끊어질 때 emitters 맵에서
해당 클라이언트의 정보를 반드시 삭제하도록 콜백을 등록한다.
문제있는 연결을 해제하는 것이라고 생각하면 쉽다.
스프링으로 SSE 구현하기 - 2. 연결 유지 및 데이터 전송
@Scheduled(fixedRate = 15_000)
public void sendHeartbeat() {
if (emitters.isEmpty()) {
return;
}
String heartbeatPayload = "tick " + System.currentTimeMillis();
emitters.forEach((userId, emitter) -> {
sendEventToClient(emitter, "HEARTBEAT", heartbeatPayload);
});
}
1. 연결 유지 (sendHeartbeat 메서드)
서버와 클라이언트 사이에 있는 방화벽이나 프록시 서버는
오랫동안 데이터 교환이 없는 연결을 유휴 상태(idle)로 보고
강제로 끊어버릴 수 있다. 이를 방지하기 위해
주기적으로 연결되어 있다는 의미의 작은 데이터(Heartbeat)를 보내서
연결이 끊어지지 않게 유지한다.
public void sendToClient(String userId, String eventType, Object payload) {
SseEmitter emitter = emitters.get(userId);
if (emitter != null) {
sendEventToClient(emitter, eventType, payload);
} else {
log.warn("No active emitter found for userId: {}", userId);
}
}
private void sendEventToClient(SseEmitter emitter, String eventType, Object payload) {
try {
Map<String, Object> eventData = Map.of("type", eventType, "payload", payload);
String jsonEventData = objectMapper.writeValueAsString(eventData);
emitter.send(SseEmitter.event().data(jsonEventData));
} catch (IOException e) {
log.error("Failed to send SSE event to client: {}", e.getMessage());
}
}
2. 데이터 전송 ( sendToClient 메서드 )
실제로 클라이언트에게 데이터를 전송하는 로직이다.
전달할 데이터를 Map으로 구조화하고,
ObjectMapper를 사용해 JSON 문자열로 변환한 뒤
emitter.send()를 통해 클라이언트로 전송한다.
try-catch로 감싸서 클라이언트 연결이 이미 끊긴 경우에도
서버에 에러가 발생하지 않도록 안전하게 처리한다.
스프링으로 SSE 구현하기 - 3. SSE 호출 하기
@Component
@RequiredArgsConstructor
public class OrderStatusSimulator {
private final SseNotificationService notificationService;
private final OrderSimulationQueue simulationQueue;
// 각 사용자별로 현재 어떤 주문 상태인지 추적
private final Map<String, Integer> userStatusTracker = new ConcurrentHashMap<>();
private final List<OrderStatus> statusFlow = Arrays.asList(
OrderStatus.ORDER_RECEIVED,
OrderStatus.COOKING,
OrderStatus.OUT_FOR_DELIVERY,
OrderStatus.DELIVERED
);
@Scheduled(fixedRate = 3_000) // 3초마다 실행
public void processNextOrderInQueue() {
// 대기열에서 다음 사용자 가져오기
Optional<String> userIdOptional = simulationQueue.getNextUserId();
if (userIdOptional.isPresent()) {
String userId = userIdOptional.get();
userStatusTracker.put(userId, 0);
sendStatusUpdate(userId);
}
}
@Scheduled(fixedRate = 3_000) // 3초마다
public void advanceOrderStatusForActiveUsers() {
if (userStatusTracker.isEmpty()) {
return;
}
userStatusTracker.forEach((userId, statusIndex) -> {
// 마지막 상태(DELIVERED)가 아니면 다음 단계로 진행
if (statusIndex < statusFlow.size() - 1) {
int nextIndex = statusIndex + 1;
userStatusTracker.put(userId, nextIndex);
sendStatusUpdate(userId);
} else {
// 배달 완료된 주문은 추적 맵에서 제거
userStatusTracker.remove(userId);
}
});
}
private void sendStatusUpdate(String userId) {
Integer statusIndex = userStatusTracker.get(userId);
if (statusIndex != null) {
OrderStatus currentStatus = statusFlow.get(statusIndex);
notificationService.sendToClient(userId, "STATUS_UPDATE", currentStatus);
}
}
}
앞서 만든 SseNotificationService를 이용해서,
실시간으로 배달 주문 상태가 변하는 상황을 시뮬레이션하는
OrderStatusSimulator 클래스다.
1. 새로운 주문 접수 (processNextOrderInQueue 메서드)
주문이 있으면, userStatusTracker 맵에 (사용자 ID, 0)을 기록해서
사용자의 주문이 '접수' 상태로 등록한다.
sendStatusUpdate를 호출해서 클라이언트에게 "주문 접수됨!"이라는 첫 알림을 보낸다.
2. 주문 상태 진행 (advanceOrderStatusForActiveUsers 메서드)
userStatusTracker에 등록된 모든 진행 중인 주문들을 하나씩 확인한다.
주문의 현재 상태가 마지막 단계('배달 완료')가 아니라면, 상태를 다음 단계로 하나씩 진전시킨다.
상태가 바뀔 때마다 sendStatusUpdate를 호출해서 클라이언트에게 변경된 상태를 알림으로 보낸다.
배달 완료 상태에 도달하면, 해당 주문은 끝난 거니까 userStatusTracker 맵에서 제거한다.
3. 알림 발송 (sendStatusUpdate 메서드)
사용자의 현재 주문 상태(OrderStatus)를 가져와.
notificationService.sendToClient()를 호출해서,
해당 사용자에게 "STATUS_UPDATE"라는 타입으로
현재 주문 상태 데이터를 SSE 이벤트로 전송한다.
'프로그래밍 > SpringBoot' 카테고리의 다른 글
| [SpringBoot] 다양한 외부 설정 조회방법 (0) | 2024.05.31 |
|---|---|
| [SpringBoot] 외부설정 (properties) (0) | 2024.05.30 |
| [SpringBoot] 스프링부트와 내장톰캣 (0) | 2024.05.23 |