![[이길어때] SSE 방식의 실시간 알림 구현하기](https://img1.daumcdn.net/thumb/R750x0/?scode=mtistory2&fname=https%3A%2F%2Fblog.kakaocdn.net%2Fdn%2FcAIn9r%2FbtsFQNOls4h%2FCVelFHmolg1JUzrkcp2Bzk%2Fimg.png)
이길어때가 더 궁금하다면?
[이길어때] 목차
프로젝트 이길어때의 개발 과정을 정리합니다. 서비스 주소 📍 https://yigil.co.kr/ 이길로그 나의 너무 멋진 장소 yigil.co.kr 깃허브 주소 🐈⬛ https://github.com/Kernel360/f1-Yigil GitHub - Kernel360/f1-Yigil Con
stonehee99.tistory.com
제가 진행하고 있는 이길어때 프로젝트는 점차 기능을 갖춰가면서 하나의 SNS 서비스의 형태로 발전해나가고 있답니다 :)
SNS하면 또 실시간으로 사용자들의 반응을 확인하고 싶은 요구가 생기기 마련인데요, 그에 맞추어 저희 서비스에서도 실시간 알림 기능을 기획하였답니다.
오늘은 프로젝트를 진행하면서 SSE
를 통한 알림 기능을 구현한 경험에 대해 글을 작성해보겠습니다!
SSE (Server-Sent Events)
먼저 실시간 알림을 구현하는 방식은 여러가지가 있습니다.
클라이언트 사이드에서의 주기적인 요청을 통해 구현하는 폴링 방식
그리고 효율적인 양방향 실시간 통신을 위해 새로운 프로토콜을 사용하여 통신하는 웹소켓 방식
그리고 저희가 실시간 알림을 구현하기위해 사용한 SSE 방식이 있습니다.
이 방식들을 모두 다뤄보면 좋겠지만! 오늘은 간단히만 알아보고 나중에 기회가되면 전부 구현해보고 비교해보는 시간을 가지겠습니다!
우선, 저희가 SSE를 선택한 이유를 알기 위해선 먼저 SSE에 대해 알고 있어야 합니다.
SSE(Server-Sent Events)는 이름 그대로 서버가 클라이언트로 실시간 정보를 자동으로 보낼 수 있도록 하는 기술입니다.
먼저 SSE는 서버측에서 클라이언트로 데이터를 전송하는 단방향 통신 방법입니다.
최초에 클라이언트에서 서버에 SSE 연결을 요청하면 서버와 연결을 맺게 됩니다.
이후 연결이 성립하면, 클라이언트는 마치 서버를 구독한 것과 같은 상태가 됩니다. 따라서 서버는 연결이 맺어져있는 시간동안 실시간으로 이벤트가 발생할 때 실시간으로 알림을 전송할 수 있게 됩니다.
이 방식은 주기적으로 요청을 보내야하는 폴링 방식에 비해 훨씬 효율적입니다. 폴링 방식을 사용하면 빠른 반응성을 위해 더욱 자주 서버로 불필요한 요청을 보내야 하는 반면, SSE 방식을 채택하면 한번 연결을 맺으면 연결이 끊길 때 까지 서버는 이벤트를 기다리기만 하면 됩니다.
그리고 웹소켓과 같은 방식에 비해 훨씬 간단하게 구현이 가능합니다. 물론 실시간으로 양 방향 통신이 필요한 상황이라면 다른 기술을 채택하는 것이 좋을 수 있지만, 알림 기능의 경우 서버에서의 특정 동작이 일어나면 해당 동작을 통해 사용자에게 필요한 알림을 전송하기만 하면 되므로, 단방향 통신을 사용하는 것이 리소스도 절약되면서 더 간단하게 알림기능을 구현할 수 있게 됩니다.
Spring 에서 SSE 구현 방법
전통적인 Spring Web MVC Framework
에서는 SseEmitter
클래스를 통해 간단하게 SSE를 구현할 수 있습니다.
따라서 Spring Web Starter 의존성만 있으면 해당 클래스를 사용할 수 있습니다.
build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web-services'
}
그러면 먼저 사용자의 요청을 받아 SSE 구독을 받는 컨트롤러를 만들어 보겠습니다.
@RestController
@RequiredArgsConstructor
public class NotificationController {
private final NotificationService notificationService;
@GetMapping(path = "/api/v1/notifications/stream")
public SseEmitter streamNotifications(@AuthenticationPrinciple User user) {
return notificationService.createEmitter(user.getId());
}
}
위와 같이, SseEmitter를 반환하는 컨트롤러를 만들었습니다. 요청을 통해 사용자의 정보를 받아 연결을 맺습니다.
(이 부분은 각자의 프로젝트에 맞게 적용하시면 됩니다.)
그러면, 이제 서비스를 통해 실제 연결을 맺는 코드를 보겠습니다.
@Service
@RequiredArgsConstructor
public class NotificationService {
private final Map<Long, SseEmitter> userEmitters = new ConcurrentHashMap<>();
public SseEmitter createEmitter(Long userId) {
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
userEmitters.put(userId, emitter);
emitter.onCompletion(() -> userEmitters.remove(userId));
emitter.onTimeout(() -> userEmitters.remove(userId));
emitter.onError((e) -> userEmitters.remove(userId));
return emitter;
}
}
각 사용자의 고유정보를(여기서는 Long 타입으로 채번된 id) key로 두고 value에 SseEmitter 객체를 저장하는 Map 객체를 만듭니다.
이 때 동시성 이슈가 있을 수 있으므로 ConcurrentHashMap
타입으로 작성했습니다.
그리고, 만약 연결 요청이 오면 새로운 SseEmitter 객체를 만들고 (이 때, 생성자의 파라미터에는 SseEmitter의 유효시간이 들어갑니다)
그리고 해당 사용자에 맞는 emitter를 map에 저장하고, 연결이 끝나거나 / 만료되거나 / 에러가 발생했을 때에는 해당 맵에서 삭제합니다.
이렇게 하면 SSE 연결을 맺을 모든 준비가 완료됩니다.
그렇다면 실시간 알림을 전송하는건 어떻게 가능할까요?
@Entity
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Notification {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "user_id")
private User receiver
private String message;
private boolean read;
@CreateDate
@Column(updatable = false, columnDefinition = "TIMESTAMP")
private LocalDateTime createdAt;
@LastModifiedDate
@Column(columnDefinition = "TIMESTAMP")
private LocalDateTime modifiedAt;
}
먼저, 전송된 알림은 데이터베이스에 저장되어야 하므로 알림을 나타내는 엔티티 객체를 선언합니다.
이후 서비스 클래스에서 알림을 전송하는 로직을 작성해보겠습니다.
@Service
@RequiredArgsConstructor
public class NotificationService {
private final Map<Long, SseEmitter> userEmitters = new ConcurrentHashMap<>();
private final ExecurotService executor = Executors.newSingleThreadExecutor();
private final NotificationRepository notificationRepository;
@Transactional
public void sendNotification(Long receiverId) {
Notification notification = new Notification("알림 왔숑", receiverId);
Notification savedNotification = notificationRepository.save(notification)
sendRealTimeNotification(savedNotification);
}
private void sendRealTimeNotification(Notification notification) {
SseEmitter emitter = userEmitters.get(notification.getUserId());
if (emitter != null) {
excutor.excute(() -> {
try {
emitter.send(SseEmitter.event().name("notification").data(notification.getMessage()));
} catch (Exception e) {
// 예외 처리
}
});
}
}
}
비즈니스 로직에 맞게 Notification 객체를 생성, 저장한 후 쓰레드 풀에서 쓰레드를 생성해서 알림 이벤트를 전송하는 간단한 로직입니다.
이렇게만 구성하면 컨트롤러를 통해 연결을 맺은 클라이언트에게 알림을 전송할 수 있게됩니다!
마주하게된 또다른 문제점
이렇게 코드를 작성하고 알림 기능의 동작은 확인할 수 있었지만 또다른(?) 문제를 마주하게 되었습니다.
바로 저희 서버는 다중 서버 환경에서 동작하고 있다는 것이었습니다.
저희는 EC2
인스턴스 한 대에 NginX
를 프록시 서버로 두고, 두 대의 WAS
로 요청을 로드밸런싱
하는 형태였습니다.
이러한 경우, 단일 서버와 구독을 맺는 구조가 아니기 때문에, SSE연결을 한대의 WAS로 강제하거나 혹은 서로 상태정보를 공유할 수 있어야 했습니다.
해당 문제를 해결하기 위해, 스티키 세션
을 사용할 수도 있습니다.
스티키 세션은 클라이언트의 모든 요청을 특정 서버로만 라우팅 하도록 합니다. 하지만 이경우 서버의 부하 분산을 최적화하기 어렵고 서버 장애시 모든 연결이 끊기게 되는 문제가 있었습니다.
무엇보다 저희 서비스는 세션 저장소로 Redis
를 이미 사용하고 있는 상황이었습니다. 따라서 이미 세션 정보가 공유되므로, 굳이 스티키 세션을 구현할 필요도가 더 떨어졌죠.
그래서 저희는 메시지 브로커
를 활용한 상태 공유가 필요하다고 생각하였습니다. 메시지 브로커로 Kafka
, RabbitMQ
등을 사용할 수 있지만, 저희는 AWS에서 잘 관리되는 서비스인 ElasticCache For Redis
서비스를 적용하고 있었으므로, Redis의 Pub/Sub 모델
을 활용하여 해당 문제를 해결하였습니다.
Redis Pub/Sub 모델을 사용한 SSE 스케일링
Redis의 Pub/Sub 시스템은 레디스 내의 메시징 시스템으로서, 메시지의 발행(publish)과 구독(subscribe)를 통해 실시간 메시징 구현이 가능합니다.
- 먼저 클라이언트가 두 웹 어플리케이션 중 하나의 서버에 요청을 보내 연결을 맺습니다.
- 이후 특정 동작을 통해 알림이 생성됩니다. (이때 이 알림은 1번 서버, 2번 서버 중 어느 곳에서든 생성될 수 있습니다)
서버는 생성된 알림을 Redis에 발행합니다. - 모든 서버 인스턴스는 기본적으로 Redis의 해당 채널을 구독하고 있습니다.
SUBSCRIBE 명령어를 통해 레디스 채널에 메시지가 도착하면 각 서버는 해당 메시지를 받아 처리합니다. - 각 서버는 Redis로 부터 메시지를 전달받고 연결된 클라이언트에게 SSE를 통한 알림을 전송합니다.
이러한 방식으로 상태정보를 공유하고 해결할 수 있습니다.
Spring에서 Redis를 적용하여 SSE 개선하기
먼저 Redis를 설치하고 실행시킵니다. Redis가 잘 동작한다면 서버와 연결시킵니다.
서버 설정에 Redis와의 연결정보를 입력하고 필요한 의존성을 추가합니다.
application.yml
spring:
data:
redis:
host: localhost
port: 6379
build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
}
이후 Redis 관련 설정을 작성해야 합니다.
@Configuration
public class RedisConfig {
@Value("${spring.data.redis.host}")
private String host;
@Value("${spring.data.redis.port}")
private int port;
@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory(host, port);
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplace<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
return redisTemplate;
}
}
완료되었다면, 알림 전송 로직을 Redis Pub/Sub 모델을 사용하도록 리팩토링 해보도록 하겠습니다.
@Service
@RequiredArgsConstructor
public NotificationService {
private final RedisTemplate<Stirng, Object> redisTemplate;
private final ObjectMapper objectMapper = new ObjectMapper();
private final Map<Long, SseEmitter> userEmitters = new ConcurrentHashMap<>();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final NotificationRepository notificationRepository;
@Transactional
public void sendNotification(Long receiverId) {
Notification notification = new Notification("알림 왔숑", receiverId);
Notification savedNotification = notificationRepository.save(notification)
publishNotification(savedNotification);
}
private void publishNotification(Notification notification) {
redisTemplate.convertAndSend("notificationTopic", notification);
}
private void sendRealTimeNotification(Notification notification) {
SseEmitter emitter = userEmitters.get(notification.getUserId());
if (emitter != null) {
excutor.excute(() -> {
try {
emitter.send(SseEmitter.event().name("notification").data(notification.getMessage()));
} catch (Exception e) {
// 예외 처리
}
});
}
}
@Bean
RedisMessageListenerContainer redisContainer(RedisConnectionFacotry redisConnectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener((message, pattern) -> {
String body = redisTemplate.getStringSerializer().deserialize(message.getBody());
Notification notification = null;
try {
notification = objectMapper.readValue(body, Notification.class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
assert notification != null;
sendRealTimeNotification(notification);
}, new PatternTopic("notificationTopic"));
return container;
}
}
기존 방식은 DB에 알림을 저장하고 바로 해당 이벤트를 클라이언트로 전달하는 형식이었다면,
지금은 Redis의
그리고 해당 채널을 Subscribe하고있는 RedisMessageListenerContainer
는 해당 이벤트를 받아서 전송합니다.
이렇게 레디스를 통해서 알림 상태 정보를 두 대의 WAS가 공유할 수 있게 됩니다.
결론
오늘은 프로젝트를 진행하면서 SSE를 구현한 경험을 되돌아 보았습니다.
실제 프로젝트에 적용하실 때에는 Redis를 사용한 경우 Pub/Sub 모델이 메시지 순서를 보장하지는 않으므로, 해당 부분을 고려해야 합니다.
또한, SSE는 MTM 공격
을 막기 위해 HTTPS 통신을 통해서만 이루어져야 하며, 또 서버에서 클라이언트로 악의적인 메시지를 보내지 못하도록 데이터 새니타이징을 구현하는 등 추가적인 작업이 필요하답니다.
'회고 > 프로젝트 🖥️' 카테고리의 다른 글
[이길어때] Redis 캐싱으로 API 성능 개선하기 (0) | 2024.04.01 |
---|---|
[이길어때] 우당탕탕 DB 레플리케이션 적용기 (0) | 2024.02.07 |
[이길어때] 이벤트 기반으로 파일 업로드 기능 구현하기 (0) | 2024.02.07 |
[이길어때] 전략 패턴으로 확장성있게 소셜 로그인 설계하기 (0) | 2024.02.07 |
[이길어때] 위치 정보 관리를 위한 PostGIS 적용기 (0) | 2024.02.07 |
안녕하세요, 저는 주니어 개발자 박석희 입니다. 언제든 하단 연락처로 연락주세요 😆