본문 바로가기
프로젝트 기록/Spring

[Spring / Redis] 이중화 서버에서의 웹 소켓 채팅 + SSE 알림 구현기

by clean01 2024. 9. 7.

잊기 전에 기록하자, Redis Pub/Sub을 활용한 STOMP 채팅 + SSE 알림

 

약 한달간의 사이디(Si-D) 프로젝트가 끝났습니다.

사이디.. 잘가

 

 

 

사이디 프로젝트에서는 STOMP를 활용한 1대1 채팅 기능이 있었는데, 채팅이 오면 Server Sent Event 기능으로 즉시 알림을 쏴줄 수 있게 했습니다.

오늘은 그 기능을 구현하는 방법과 구현하며 레디스에 대해 알게된 사실을 정리해보려고 합니다.

 

 

이중화 서버에서의 웹 소켓 통신과 SSE 알림을 처리하는 방법

 

사이디의 서버 배포 아키텍처

 

일단 냅다 아키텍처 사진을 보여드린 이유는...

사이디의 운영 서버는  쿠버네티스에서 파드를 2개 띄워서 두 대의 서버가 요청을 처리해주는 구조로 동작하고 있다는 것을 알려드리기 위함입니다.

 

그런데 Web Socket과 SSE 모두 클라이언트에 데이터를 쏴주는 emitter를 서버의 메모리에서 관리를 하고 있기 때문에 이중화된 서버에서 단일 서버일 때와 똑같이 채팅과 알림을 구현한다면 다음과 같은 문제가 생길 수 있습니다.

 

위 사진을 보면, 유저 1의 웹소켓은 server1에서, 유저 2의 웹소켓은 server2에서 관리되고 있는데요.

이렇게 되면 유저2에게 채팅이 전송되지 않는 문제가 발생합니다.

이는 SSE 방식의 알림에서도 마찬가지입니다.

SSE Emitter를 서버의 메모리에서 관리해주고 있기 때문이죠.

 

물론 DB에 채팅 내용을 저장해주고 있기 때문에 새로고침하면 채팅이 뜨기야 하겠지만 실시간성이 아니기 때문에, 이런 경우에도 채팅이 바로 보일 수 있도록 개선이 필요합니다.

여기서 레디스의 펍섭을 활용할 수 있습니다!

 

 

두 서버가 모두 "chat"이라는 토픽을 구독하고 있고, 채팅이 들어왔을 때 무조건 레디스로 publish 하면

그 publish한 메시지를 두 서버가 모두 읽어가게 됩니다.

 

그러면 각기 다른 서버에서 유저의 웹 소켓을 관리하더라도 채팅이 제대로 가게됩니다.

SSE 알림도 마찬가지로, 유저가 로그인 했을때 그 요청을 처리한 서버가 redis의 "member_{member_id}" 이런 토픽을 구독하고,

알림을 보내줄 때 "member_{member_id}" 토픽에 publish하면 그 서버가 읽어가서 클라이언트에게 이벤트를 전송할 수 있게 됩니다.

 

사이디는 채팅이 올때마다 클라이언트에게 SSE로 알림을 보내주고 있으므로,

채팅과 알림의 과정을 모두 그림에 나타내면 아래와 같습니다.

 

그림이 이해가 잘가게 그린 것은 아닌 것 같지만... 구조가 복잡해서 최선을 다해 그린 것입니다,,,

글로 설명해보자면

1. 1번 유저가 채팅 전송

2. 채팅이 레디스 "chat" 토픽에 publish, 그리고 채팅을 받는 회원(2번 유저)의 멤버아이디 토픽에 알림 내용을 publish

3. "chat"을 구독하고 있던 두 서버가 채팅을 읽어가게되고, 두 유저의 화면에 채팅이 뿌려진다.

4. 채팅을 받는 회원의 멤버아이디 토픽을 subscribe하고 있던 서버가 알림을 읽어가고 그게 해당 멤버(2번 유저)의 헤더에 뿌려지게 된다. (즉 알림 숫자가 올라감)

 

2개 이상의 토픽을 쓸 때 Redis Pub/Sub과 관련한 이슈

위 그림을 통해 알 수 있듯이,

채팅과 알림 모두 Redis pub/sub 기능을 사용하고 있습니다.

 

Redis에는 0~15까지 총 16개의 독립된 공간이 있는데, 이 공간을 각기 다른 용도로 사용할 수 있습니다.

예를 들면 0번은 리프레시 토큰을 관리하고, 1번은 글의 조회수를 관리하고~ 이런 식으로요.

 

spring data redis에서 여러개의 방을 활용하기 위해서는 RedisConnectionFactory와 RedisTemplate 스프링 빈을 방마다 등록해 주어야 하는데요,

같은 타입의 빈을 여러개 등록하여 주입할 때 구분 짓는 것은 @Qualifier 어노테이션으로 할 수 있습니다.

이렇게 말이죠!

 

예시)

    @Bean
    @Qualifier("ssePubSub")
    public RedisConnectionFactory ssePubSubFactory() {
        RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
        configuration.setHostName(host);
        configuration.setPort(port);
        configuration.setDatabase(13);
        return new LettuceConnectionFactory(configuration);
    }

    @Bean
    @Qualifier("ssePubSub")
    public RedisTemplate<String, Object> ssePubSubTemplate(@Qualifier("ssePubSub") RedisConnectionFactory ssePubSubFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());

        //== 객체 안의 객체 직렬화  이슈로 인해 아래와 같이 serializer 커스텀
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        serializer.setObjectMapper(objectMapper);
        redisTemplate.setValueSerializer(serializer);

        redisTemplate.setConnectionFactory(ssePubSubFactory);
        return redisTemplate;
    }
    
    //== 5번 방 ==//
    @Bean
    @Qualifier("chatPubSub")
    public RedisConnectionFactory chatPubSubFactory() {
        RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
        configuration.setHostName(host);
        configuration.setPort(port);
        configuration.setDatabase(5);
        return new LettuceConnectionFactory(configuration);
    }

    
    @Bean
    @Qualifier("chatPubSub")
    public RedisTemplate<String, Object> chatPubSubTemplate(@Qualifier("chatPubSub") RedisConnectionFactory chatPubSubFactory) {
        // Object에는 주로 json이 들어옴
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());

        //== 객체 안의 객체 직렬화  이슈로 인해 아래와 같이 serializer 커스텀
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        serializer.setObjectMapper(objectMapper);
        redisTemplate.setValueSerializer(serializer);

        redisTemplate.setConnectionFactory(chatPubSubFactory);
        return redisTemplate;
    }

 

@Qualifier 어노테이션을 통해 5번 방은 채팅을 위해 쓰는 공간, 13번 방은 알림을 위해 쓰는 공간으로 지정한 것입니다.

이 bean을 주입받는 것도 @Qualifier로 하면 됩니다.

 

예시)

    private final RedisTemplate<String, String> viewRedisTemplate;
    private final RedisTemplate<String, Object> scrapRedisTemplate;

    @Autowired
    public LaunchedProjectScheduler(LaunchedProjectRepository launchedProjectRepository,
                                    LaunchedProjectScrapRepository launchedProjectScrapRepository,
                                    MemberRepository memberRepository,
                                    @Qualifier("launchedProjectView") RedisTemplate<String, String> viewRedisTemplate,
                                    @Qualifier("launchedProjectScrap") RedisTemplate<String, Object> scrapRedisTemplate
    ) {
        this.launchedProjectRepository = launchedProjectRepository;
        this.launchedProjectScrapRepository = launchedProjectScrapRepository;
        this.memberRepository = memberRepository;
        this.viewRedisTemplate = viewRedisTemplate;
        this.scrapRedisTemplate = scrapRedisTemplate;
    }

 

그런데 Spring Data Redis로 pub/sub 기능을 쓰기 위해서는 두가지 빈을 더 추가해주어야 하는데요.

바로 RedisListenerAdapter와 RedisMessageListenerContainer입니다.

저희는 처음에 Redis pub/sub은 특정 방에 종속되는 것이라고 생각했습니다.

 

즉, 5번 방의 "chat" 토픽을 구독하면 5번방의 "chat" 토픽에 메시지가 들어왔을 때만 읽어가고,

13번 방에 "chat" 토픽 메시지가 들어왔을 때는 5번 방과 무관하므로 읽어가지 않는 식으로 말이죠.

따라서 RedisMessageListenerContainer 빈도 각각 존재해야한다고 생각했습니다.

 

그래서 아래와 같이, 채팅과 관련한 펍섭과 알림과 관련한 펍섭을 방을 구분하기 위해 RedisMessageListenerContainer 빈을 2개 등록해줬으며, @Qualifier로 구분지어줬습니다.

//== 5번 방 ==//
    @Bean
    @Qualifier("chatPubSub")
    public RedisConnectionFactory chatPubSubFactory() {
        RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
        configuration.setHostName(host);
        configuration.setPort(port);
        configuration.setDatabase(5);
        return new LettuceConnectionFactory(configuration);
    }

    // RedisTemplate은 redis와 상호작용할 때 redis key, value의 형식을 지정한다.
    @Bean
    @Qualifier("chatPubSub")
    public RedisTemplate<String, Object> chatPubSubTemplate(@Qualifier("chatPubSub") RedisConnectionFactory chatPubSubFactory) {
        // Object에는 주로 json이 들어옴
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());

        //== 객체 안의 객체 직렬화  이슈로 인해 아래와 같이 serializer 커스텀
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        serializer.setObjectMapper(objectMapper);
        redisTemplate.setValueSerializer(serializer);

        redisTemplate.setConnectionFactory(chatPubSubFactory);
        return redisTemplate;
    }
    
    @Bean
    @Qualifier("chatMessageListenerAdapter")
    public MessageListenerAdapter messageListenerAdapter(RedisSubscriber redisSubscriber) {
        return new MessageListenerAdapter(redisSubscriber);
    }


    @Bean
    @Qualifier("chatPubSub")
    public RedisMessageListenerContainer redisMessageListenerContainer(@Qualifier("chatPubSub") RedisConnectionFactory chatPubSubFactory,
                                                                       @Qualifier("chatMessageListenerAdapter") MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(chatPubSubFactory);
        container.addMessageListener(listenerAdapter, topic());

        return container;
    }

    @Bean
    @Qualifier("chatTopic")
    public ChannelTopic topic() {
        return new ChannelTopic("chat");
    }
    
    //== 13번 ==//
    @Bean
    @Qualifier("ssePubSub")
    public RedisConnectionFactory ssePubSubFactory() {
        RedisStandaloneConfiguration configuration = new RedisStandaloneConfiguration();
        configuration.setHostName(host);
        configuration.setPort(port);
        configuration.setDatabase(13);
        return new LettuceConnectionFactory(configuration);
    }

    @Bean
    @Qualifier("ssePubSub")
    public RedisTemplate<String, Object> ssePubSubTemplate(@Qualifier("ssePubSub") RedisConnectionFactory ssePubSubFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());

        //== 객체 안의 객체 직렬화  이슈로 인해 아래와 같이 serializer 커스텀
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        serializer.setObjectMapper(objectMapper);
        redisTemplate.setValueSerializer(serializer);

        redisTemplate.setConnectionFactory(ssePubSubFactory);
        return redisTemplate;
    }

    @Bean
    @Qualifier("ssePubSub")
    public RedisMessageListenerContainer redisMessageListenerContainer(@Qualifier("ssePubSub") RedisConnectionFactory sseFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(sseFactory);
        return container;
    }

 

 

그리고 이 빈을 사용하는 SseService에서 이렇게 @Qualifier로 빈을 주입받으려고 했는데...!

 

 

nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.data.redis.listener.RedisMessageListenerContainer' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: {@org.springframework.beans.factory.annotation.Qualifier("ssePubSub")}

 

적어도 그런 빈이 1개는 필요한데, 적절한 빈이 존재하지 않는다는 에러가 뜨며 빌드자체가 되지 않는 것을 볼 수 있었습니다.

 

분명 빈은 2개 존재하고, @Qualifier로 구분까지 해줬는데 무슨일일까요..

 

 

Redis Pub/Sub은 방에 종속되지 않는다

그 이유는 ... Redis pub/sub은 방에 종속되지 않고, 레디스 전체에서 관리되기 때문이라고 예상했습니다.

 

그렇게 생각한 이유는 일종의 펍섭 테스트를 진행해본 결과 때문이었는데요

 

SseService에서 RedisMessageListenerContainer를 주입받는 부분에서 Qualifier를 제거하니 어쨌든 실행은 돼서 그 상태로 Redis pubsub이 정상적으로 동작하는지 테스트해봤습니다.

터미널을 열고 레디스에 직접 접속해서 "subscribe 토픽" 명령어를 통해 메시지가 잘 전달되는지 확인해보았는데요

 

그 결과 5번방 또는 13번 방이 아닌, 아무 방이나 select로 들어가서 subscribe를 하더라도 메시지가 잘 전달되는 것을 볼 수 있었습니다.

SSE바오님의 열혈 테스트

 

Redis Pub/Sub의 공식문서를 보면!

ㅋㅋDatabase랑은 상관이 없다내

 

펍섭은 Database에 종속되는게 아닌, 레디스 전체에서 관리되는 것이 맞았습니다.

아마도 RedisMessageListenerContainer에 @Qualifier가 동작하지 않았던 것은, RedisMessageListener가 Redis pub/sub에 관련한 빈이므로 스프링 컨테이너 전체에서 한개만 존재해야하기에 빈으로 등록하는 부분에서 @Qualifier가 붙든 안붙든 인식을 하지 않는 것은 아닌지..

그래서 다른 빈에서 주입받으려고 할때 @Qualifier("ssePubSub")하니까 그런 빈은 없다고 한게 아닌지 예상해봅니다.

 

이부분은 정확한 레퍼런스를 찾아보려고 했지만 아직 찾지 못했습니다.

RedisMessageListenerContainer 클래스의 문서를 읽어봐도 해당 내용은 없는 것 같습니다. 아니면 제가 이해를 못한건지...

 

아무튼! Si-D의 채팅, SSE의 구조와 구현하며 만났던 Redis Pub/Sub과 관련한 이슈를 정리해보았습니다.

 

구현 코드를 보고 싶으시다면 아래 레포지토리에서 다음 클래스들을 참고해주시면 좋을 것 같습니다

(아 스파게티 코드라 너무 부끄러워)

- RedisConfig

- WebSocketService

- ChatService

- WebSocketController

- ChatController

- StompHandler

- SseService

- SseController 

https://github.com/devjeans-si-d/Si-D-server

 

GitHub - devjeans-si-d/Si-D-server: Si-D(사이디)의 서버 리포지토리 (섭종)

Si-D(사이디)의 서버 리포지토리 (섭종). Contribute to devjeans-si-d/Si-D-server development by creating an account on GitHub.

github.com

 

Reference

https://redis.io/docs/latest/develop/interact/pubsub/