본문 바로가기
JAVA

Spring에서 Notification을 구현해보자(WebFlux, Reactor Sinks, SSE)

by 열정적인 이찬형 2024. 2. 27.
주의!!

→ 알림을 구현하기 위해서 경험을 정리한 글이며 대규모 트래픽은 환경은 아니지만, 알림(포탈, SMS, Email, Kakao Alim Talk)이 발생하는 서비스를 제공한 내용을 정리한 글입니다.

 

→ 잘못된 부분이 있을 수 있으며, 해당 부분에서는 바로바로 지적해주시면 정말 감사하겠습니다.

 


[상황]

알림 서비스를 구현해야 하는 상황에 맞닥뜨렸다.

 

나는… 알림을 구현해본적이 없는데

 

먼저 구현하는 방법에 대해서 먼저 찾아보자!!

 

알림을 구현하는 방법에는 크게 4가지가 존재하며, 저는 SSE(Server-Sent Event)을 사용하였습니다.


[알림을 구현하는 방법 4가지]

1) Short-Polling

클라이언트는 설정한 주기(예:2초)로 서버에 대한 요청을 반복한다.

 

장점

  • Client와 Server의 구현이 모두 단순하다

단점

  • 지속적으로 Request를 서버에 보내기 때문에 클라이언트가 많아지면 서버의 부담이 증가하게 됩니다.
  • TCP의 Connection의 HandShake 과정 자체가 매우 무겁다.
  • 실시간으로 변화되는 빠른 정보의 응답을 기대하기는 어렵습니다.(간격이 존재하기 때문)

 

2) Long-Polling

Client가 Server에 요청을 보낸 뒤, 서버는 새로운 데이터가 나올 때까지 기다렸다가 응답을 전달합니다.

Client는 응답을 받은 뒤 즉시 요청을 다시 보내서 데이터의 응답을 기다립니다.

장점

  • Short Polling에 비해 동일한 양의 데이터를 Client에게 전송하는 HTTP 요청 수를 줄일 수 있습니다.

        ▶︎ 간격이 아닌 데이터가 새로 만들어졌을 때 응답을 받기 때문입니다.

 

  • 서버에서 응답을 순간을 정하기 때문에 실시간 메시지 전달이 가능하다.

        ▶︎ 단, 서버의 상태가 자주 변하지 않는 환경에 적합합니다.

 

단점

  • 상태가 빈번하게 바뀐다면 연결 요청도 늘어나 서버에 부담

        ▶︎ 응답이 반복됨에 따라 다시 TCP Connection HandShake가 진행되는 이유도 존재합니다.

 

3) WebSocket

첫 연결은 HTTP HandShake 과정으로 통해서 연결을 진행하고, WS 프로토콜을 변경하여 통신을 진행합니다.

WebSockets의 데이터는 Client/Server 양방향으로 통신이 진행됩니다.

장점

  • 빠른 속도을 제공합니다.

        ▶︎ Client/Server는 통신할 때마다 서로의 연결을 찾아서 다시 설정할 필요가 없습니다.

 

  • 데이터는 어느 방향으로든 즉시 안전하게 전송합니다.

        ▶︎ TCP으로 전달함으로써 메시지가 항상 순서대로 도착하도록 보장됩니다.

 

단점

  • 시스템에 상당한 복잡성을 추가하고 구현하는 데 많은 투자가 필요합니다.

        ▶︎ Polling이나 SSE가 적합하지 않은 경우에만 사용하는 것을 권장합니다.

 

  • Websocket은 HTTP 위에서 진행하는 것이 아니기 때문에 HTTP의 여러 이점을 얻을 수 없습니다.

1. HTTP 압축을 지원하지 않습니다.

2. HTTP/2 Multiplexing을 지원하지 않습니다.
   ▶︎ 동일한 호스트에 대한 여러 요청/응답들이 동일한 TCP 연결을 공유합니다

3. Websocket을 지원하지 않는 프록시에서 문제가 발생할 수 있습니다.

4. Cross-Site Websocket Hijacking으로부터 보호되지 않습니다.

   ▶︎ Origin 헤더 확인 및 승인되지 않은 도메인으로부터의 연결 차단을 직접 수행야 합니다.

 

4) SSE (Server-Sent Event)

Server와 Connection을 완료한 뒤, 일정 시간 동안 Server에서 변경이 일어날 때마다 Server에서 Client로 데이터를 전송하는 방법

HTTP의 Persistent Connections을 기반으로 하는 HTML5 표준 기술

 

장점

  • 응답마다 다시 요청을 해야 하는 Long Polling 방식보다 효율적입니다.
  • Client가 Server와 크게 통신할 필요 없이, 업데이트된 데이터만 받는 실시간 데이터 스트림을 받을 수 있습니다.

        ▶︎ Server로부터 단방향으로 데이터를 받아야하는 구현이 필요할 때 좋은 선택입니다.

  • HTTP위에서 작동하기 때문에 WebSocket에 비해 얻을 수 있는 이점이 있습니다.

       ▶︎ HTTP MultiPlexing도 지원합니다.

       ▶︎ 또한, HTTP를 통해 전송되므로 특별한 프로토콜이나 서버 구현이 필요하지 않음.

 

단점

  • HTTP를 통한 SSE(HTTP/2가 아닐 경우)는 브라우저 당 6개의 연결로 제한됩니다.

        ▶︎ 사용자가 웹 사이트의 여러 탭을 열면 첫 6개의 탭 이후에는 SSE가 작동하지 않는다.

 

SSE(Server-Sent Event)을 채택한 이유

  • 알림 기능은 Client가 Server로부터 단방향으로 데이터를 받는 서비스입니다.

        ▶︎ WebSocket의 양방향의 특성을 사용할 필요가 없었습니다.

 

  • 시간적 자원을 고려하여 WebSocket에 대한 WS 서버 구현보다는 HTTP으로 구현할 수 있는 SSE을 선택였습니다.

 

caniuse 사이트에서도 확인해보면 근소하게 더 많은 환경에서 SSE을 사용할 수 있다는 결과를 보여주고 있습니다.

 

Can I use... Support tables for HTML5, CSS3, etc

 

caniuse.com

 

 

 

SSE에 대해서 더 알아보자

 

Request Header

Accept:text/event-stream     //Text 형태의 Event-Stream으로 전달될 것이다.
Cache-Control:no-cache       //캐시 사용 X, 실시간 업데이트를 제공

 

text/event-steram

텍스트 데이터에 최적화되어 있다. ( 바이너리 처리에는 부적합하다 )

 

EventStream Response

 

1) Json형식의 Text

Data : { "name" : "홍길동", "age" : "19" }

 

2) 일반적인 Text

Data : 내 이름은 홍길동이고, 나이는 19살이야

 

3) Custom Text

Data : name=홍길동&age=19

 

Socket보다는 배터리 소모량이 적다.

 

Socket은 양방향으로써 Client에서 데이터를 주는 상황이 발생하기 때문에 백그라운드에서 지속적으로 동작하고 있어야 합니다.

또한, TCP 연결을 유지하기 위해서 일정한 주기 패킷을 교환하는 비용도 추가적으로 생깁니다.

 

SSE는 Server에서 Client로 오는 단방향이기 때문에 Server에서 오는 이벤트 데이터만 인식하고 있으면 되기 때문에 Connection을 형성한 뒤 기다리기만 하면 되기 때문입니다.

 

JavaScript에서 EventStream을 받는 EventSource에서는 PolyFill 형태도 지원하고 있어서 IE와 같은 브라우저에서도 적용이 가능합니다.

 

PolyFill : 브라우저가 지원하지 않는 JavaScript 코드를 지원 가능하도록 변환한 코드

→ 원래는 지원하지 않지만 변환되었기 때문에 최신 기능을 똑같이 구현 가능해주는 방식입니다.


[SSE 구현에 대한 고민]

흠..  WebFlux 환경에서 개발을 진행하고 있는데 어떻게 SSE을 통해서 알림을 보내지?

 

WebMvc → SseEmitter

 

⭐️ WebFlux → Reactor Sinks

 

WebFluxReactor을 이용해서 알림을 구현하는 글들을 읽었을 때 Flux.Interval()을 쓰는 것이 많았습니다.

 

Flux.Interval()을 사용하여 데이터를 주기적인 간격으로 전달하는 방법??

 

→ 원하던 구현 방향은 이벤트가 발생했을 때 데이터를 보내주는 것인데..

 

→ 주기적인 간격이 존재하면 실시간으로 이벤트를 전달하는 것이 아니고, 그럼 실시간이 아닌 것 같은데…

 

이벤트가 발생할 때!!! 알림을 발생하고 실시간이어야 하는데!!

 

Flux.Sinks, 유레카!!

 

Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics.

 

These standalone sinks expose tryEmit methods that return an Sinks.EmitResult enum, allowing to atomically fail in case the attempted signal is inconsistent with the spec and/or the state of the sink.

 

This class exposes a collection of (Sinks.Many builders and Sinks.One factories. Unless constructed through the unsafe() spec, these sinks are thread safe in the sense that they will detect concurrent access and fail fast on one of the attempts. unsafe() sinks on the other hand are expected to be externally synchronized (typically by being called from within a Reactive Streams-compliant context, like a Subscriber or an operator, which means it is ok to remove the overhead of detecting concurrent access from the sink itself).

 

[제가 번역한 것이라서… 어색한 부분이 있을 수 있습니다.](영어 잘 못해요 T^T)

 

SinksFluxMono의 의미 체계를 이용하여 Reactor Stream의 Signal들을 프로그래밍적으로 Push할 수 있게 해주는 구성체입니다.

 

독립적인 Sinks는 Sinks.EmitResult enum을 내보내는 tryEmit Method을 진행할 때, 시도한 신호가 싱크의 사양 및/또는 상태와 일치하지 않을 경우 원자적으로 실패할 수 있습니다.

 

Sinks는 Sinks.Many Builder, Sinks.One factories의 Collection으로 내보낼 수 있습니다. 안전하지 않은 사양으로 구성되지 않는 한, Sinks는 동시 접근과 여러 시도 중 한 번의 빠른 실패를 감지해서 스레드가 안전합니다. 반면 안전하지 않은 Sinks는 외부에서 동기화될 것으로 예상됩니다(일반적으로 Subscriber 또는 operator 같은 Reactive Streams-compliant context으로부터 호출되어 진행됩니다. 즉, Sinks 자체에서 동시 액세스를 감지하는 오버헤드를 제거해도 괜찮습니다.)

 

정리

Reactor-Stream으로 발생하는 Signal을 Mono, Flux으로 Push할 수 있게 진행합니다.

Sinks을 asMono, asFlux을 통해서 Subscribe을 진행하여 Reactor-Stream을 Signal을 받을 수 있습니다.

 

Sinks는 Sinks.Many, Sinks.One을 통해서 특징에 맞게 구현할 수 있으며, thread Safe하게 Signal을 제공합니다.

 

 

Sinks (reactor-core 3.6.3)

Sinks are constructs through which Reactive Streams signals can be programmatically pushed, with Flux or Mono semantics. These standalone sinks expose tryEmit methods that return an Sinks.EmitResult enum, allowing to atomically fail in case the attempted s

projectreactor.io

 

Sinks에 더 알아보자

Sinks.One한 건의 데이터를 전송하는 방법

→ 여러 건의 Emit을 진행해도 첫 Emit만 전달되고, 나머지는 전달받지 못합니다.

asMono을 통해서 전달받음으로써, Mono의 의미 체계를 사용한다고 이야기합니다.

 

Sinks.Many여러 건의 데이터를 전송하는 방법

→ 여러 건의 Emit을 진행해도 ManySpec에 따라 전달합니다.

asFlux을 통해서 전달받음으로써, Flux의 의미 체계를 사용한다고 이야기합니다.

 

⭐ Notification 서비스는 지속적으로 여러 데이터를 실시간으로 받아야 하기 때문에 Sinks.Many을 통해서 구현을 진행할 예정임으로, ManySpec에 대해서 알아보겠습니다.

 

ManySpec

 

multicast() : 다수의 Subscriber에게 broadcast 형식으로 Signal을 보내는 Sinks.Many을 만들어줍니다.

 

replay() : 다수의 Subscriber에게 broadcast 형식으로 Signal을 보내며, 조건에 따라 Subscribe 이전에 Emit 데이터를 받을 수 있는 Sinks.Many을 만들어줍니다.

all() : Subscribe 이전 Emit 데이터 모두 재전달(Replay)합니다.

limit() : Subscribe 이전 limit 조건에 만족하는 데이터 모두 재전달(Replay)합니다.

latest() : Subscribe 이전 가장 최근 데이터를 재전달(Replay)합니다.

 

unicast() : 한 명의 Subscriber에게 broadcast 형식으로 Signal을 보내는 Sinks.Many을 만들어줍니다.

 

⭐ Notification 서비스는 Web Potal에 접근한 기준으로 실시간 알림을 전달하기 때문에 이전에 받았던 알림 데이터를 재전달 할 필요가 없기 때문에 multicast()을 사용할 것입니다.

 

SInks.many().multicast()

 

directAllOrNothing()

 

direct : 요소를 생성하는 데 직접적으로 메인 스레드를 사용, 메인 스레드의 부담이 커지고, I/O 등의 블로킹 작업이 발생할 경우 전체 애플리케이션 성능에 영향을 줄 수 있습니다.

 

AllOrNothing : 요소를 처리할 수 없는 Subscriber가 존재할 경우, 해당 요소를 처리할 수 있는 다른 Subscriber에게도 전달되지 않습니다. 모두 요소를 처리해야 데이터가 전달됩니다.

directBestEffort()

 

direct : 요소를 생성하는 데 직접적으로 메인 스레드를 사용, 메인 스레드의 부담이 커지고, I/O 등의 블로킹 작업이 발생할 경우 전체 애플리케이션 성능에 영향을 줄 수 있습니다.

 

BestEffort : 요소를 처리하지 못하는 Subscriber가 존재하더라도, 처리할 수 있는 Subscriber에게는 데이터를 전달하려고 최대한 노력합니다.

onBackpressureBuffer()

 

warmup에서 생긴 데이터들을 Buffer에 저장하여 관리한 뒤, 첫 Subscriber로 의해 hot sequence 변경되어 Buffer에 있던 데이터들을 전달받습니다.

 

→hot sequence에서는 multicast 방식으로 진행되어서 모든 Subscriber에게 데이터를 전달합니다.

 

⭐ directAllOrNothing, directBestEffort는 메인 스레드로 돌아가는데 알림 서버에는 부담이 생길 수 있으며, multicast 형식으로 데이터를 전달해주면 되기 때문에 onBackpressureBuffer()으로 Sinks.Many을 구현하였습니다.

 


[구현!]

 

📌 WebFlux 환경에서 진행하였으며, WebMvc 관련해서는 SseEmitter 찾아보시는 것을 추천드립니다.

 

Controller

...
@RequiredArgsConstructor
public class ReadNotificationController {

  ...
  private final SSEProcessor sendSSEProcessor;
  
  @GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  public Flux<ServerSentEvent<Object>> sseConnect(
		@RequestHeader("userId") String userId) {
    return sendSSEProcessor.connect(userId);
  }

  @GetMapping(value = "/sse/connect")
  public Mono<ResponseEntity<Boolean>> sseSuccessConnection(
          @RequestHeader("userId") String userId
  ) {
    return sendSSEProcessor.successMessageSend(userId)
            .map(isSuccess -> new ResponseEntity<>(isSuccess, HttpStatus.OK));
  }
	...
}

 

produces = MediaType.TEXT_EVENT_STREAM_VALUE)

SSE text/event-stream 으로 처리해야 하기 때문에 적용시켜주셔야 합니다.

 

sseSuccessConnection APISSE을 연결을 확인하기 위한 Message을 전달하기 위한 API입니다.

해당 API는 맞닥뜨린 문제를 해결하기 위해 추가된 것입니다. 자세한 설명은 만난 문제와 같이 설명할 것이므로 찾아보고 싶으시면 스크롤을 내려주시고 확인해주시면 감사하겠습니다.

 

SSEProcessor

...
@Component
public class SendSSEProcessor {
  private final Map<String, Sinks.Many<ServerSentEvent<Object>>> sinks = new HashMap<>();

    /**
     * Send notification to user
     */
    public void personalSend(SSENotificationData notificationData, String userId) {
        if(sinks.containsKey(userId)){      //알림을 받을 사용자가 현재 SSE로 연결한 경우 알림 발송
            sinks.get(userId).tryEmitNext(ServerSentEvent.builder()
                    .event("message")
                    .data(notificationData)
                    .id(notificationData.getNotificationId())
                    .comment(notificationData.getMessage())
                    .build());
        }
    }

    /**
     * Send notification to users
     */
    public void groupSend(SSENotificationData notificationData, List<User> users) {
        users.forEach(user -> {     //그룹에 속한 사용자들에게 알림 발송
            personalSend(notificationData, user.getId());
        });
    }

    public Mono<Boolean> successMessageSend(String userId) {
        return Mono.just(userId)
                .flatMap(id -> {
                    if (sinks.containsKey(id)) {      //알림을 받을 사용자가 현재 SSE로 연결한 경우 알림 발송
                        sinks.get(id).tryEmitNext(ServerSentEvent.builder()
                                .event("config")
                                .data("Connected Successfully")
                                .comment("Connected Successfully")
                                .build());
                        return Mono.just(true);
                    }
                    return Mono.error(new CommonExceptions.BadRequestException("존재하지 않는 SSE 채널입니다."));
                });
    }

    /**
     * Connect to SSE
     */
    public Flux<ServerSentEvent<Object>> connect(String userId) {


        if(sinks.containsKey(userId)){  //이미 SSE 연결이 되어있는 경우
            return sinks.get(userId).asFlux();
        }

        //SSE 연결이 되어있지 않은 경우
        sinks.put(userId, Sinks.many().multicast().onBackpressureBuffer());
        return sinks.get(userId).asFlux().doOnCancel(()->{
            log.info("### SSE Notification Cancelled by client: " + userId);
            this.finish(userId);
        });
    }

    /**
     * Disconnect from SSE
     */
    public void finish(String userId) {
        sinks.get(userId).tryEmitComplete();
        sinks.remove(userId);
    }
}

 

각 코드들을 살펴보면

private final Map<String, Sinks.Many<ServerSentEvent<Object>>> sinks = new HashMap<>();

 

Map으로 Sinks.Many를 관리하는 이유

  • 각 사용자마다 Sinks을 관리를 진행해야 했습니다.

        ▶︎ 1개의 Sinks을 쓰면 모든 사용자가 같은 Sinks의 Subscriber가 되면서 개인 알림이 아닌 전체 알림만 가능하게 됩니다.

 

  • 동일한 아이디로 중복 접근이 되어야 하는 환경이기 때문에 Map으로 같은 Sinks을 바라보게 해주기 위함입니다.

    /**
     * Connect to SSE
     */
    public Flux<ServerSentEvent<Object>> connect(String userId) {


        if(sinks.containsKey(userId)){  //이미 SSE 연결이 되어있는 경우
            return sinks.get(userId).asFlux();
        }

        //SSE 연결이 되어있지 않은 경우
        sinks.put(userId, Sinks.many().multicast().onBackpressureBuffer());
        return sinks.get(userId).asFlux().doOnCancel(()->{
            log.info("### SSE Notification Cancelled by client: " + userId);
            this.finish(userId);
        });
    }

 

Map을 사용하였기 때문에 userId을 기반으로한 이미 Sinks가 존재하면 해당 Sinks을 asFlux()을 통해서 기존 Sinks에 대한 Subscriber로 만듭니다.

  if(sinks.containsKey(userId)){  //이미 SSE 연결이 되어있는 경우
            return sinks.get(userId).asFlux();
   }

 

Map에 Key값이 없는 새로운 userId일 경우 새로운 Sinks에 대해서 만들어 준 뒤 asFlux()을 통해서 첫 Subscriber로 만들어줍니다.

        //SSE 연결이 되어있지 않은 경우
        sinks.put(userId, Sinks.many().multicast().onBackpressureBuffer());
        return sinks.get(userId).asFlux().doOnCancel(()->{
            log.info("### SSE Notification Cancelled by client: " + userId);
            this.finish(userId);
        });

    /**
     * Disconnect from SSE
     */
    public void finish(String userId) {
        sinks.get(userId).tryEmitComplete();
        sinks.remove(userId);
    }

 

강제적으로 해당 userId를 가진 Sinks을 끊어주는 함수입니다.


    /**
     * Send notification to user
     */
    public void personalSend(Dto dto, String userId) {
        if(sinks.containsKey(userId)){      //알림을 받을 사용자가 현재 SSE로 연결한 경우 알림 발송
            sinks.get(userId).tryEmitNext(ServerSentEvent.builder()
                    .event("message")
                    .data(dto)
                    .id(dto.getId())
                    .comment(dto.getMessage())
                    .build());
        }
    }

    /**
     * Send notification to users
     */
    public void groupSend(Dto dto, List<String> users) {
        users.forEach(userId -> {     //그룹에 속한 사용자들에게 알림 발송
            personalSend(dto, userId);
        });
    }

 

개인 알림 전달

    /**
     * Send notification to user
     */
    public void personalSend(Dto dto, String userId) {
        if(sinks.containsKey(userId)){      //알림을 받을 사용자가 현재 SSE로 연결한 경우 알림 발송
            sinks.get(userId).tryEmitNext(ServerSentEvent.builder()
                    .event("message")
                    .data(dto)
                    .id(dto.getId())
                    .comment(dto.getMessage())
                    .build());
        }
    }

 

전달해야 하는 userId의 Sinks에 tryEmitNext()을 통해서 데이터를 Subscriber들에게 전달하였습니다.

 

Event-Stream으로 데이터를 전달할 때 ServerSentEvent으로 Wrapping하여 데이터를 전달하였습니다.

 

ServerSentEvent (Spring Framework 6.1.4 API)

Return the retry field of this event, if available.

docs.spring.io

 

Reactive Spring에 SSE에 대한 표현을 지원해줍니다.

 

FrontEnd 개발자와 Custom Event를 정의하여 데이터를 더 원활하게 사용할 수 있도록 도움을 줍니다.

  • Connection 관련에서는 Config으로 정의하였으며, 데이터를 전달할 때는 message로 정의하였습니다.
  • ServerSentEvent.id에는 notification에 대한 UUID의 값을 전달하도록 약속하였습니다.
  • data에서는 notification message뿐만 아니라 표현에 필요한 데이터(level, title …)를 전달하였습니다.
  • message에서는 Notification Message을 전달하였습니다.

그룹 알림 전달은 각 사용자들에게 똑같은 알림에 대해서 PersonalSend을 반복하는 것임으로 설명은 생략하겠습니다.


    public Mono<Boolean> successMessageSend(String userId) {
        return Mono.just(userId)
                .flatMap(id -> {
                    if (sinks.containsKey(id)) {      //알림을 받을 사용자가 현재 SSE로 연결한 경우 알림 발송
                        sinks.get(id).tryEmitNext(ServerSentEvent.builder()
                                .event("config")
                                .data("Connected Successfully")
                                .comment("Connected Successfully")
                                .build());
                        return Mono.just(true);
                    }
                    return Mono.error(new CommonExceptions.BadRequestException("존재하지 않는 SSE 채널입니다."));
                });
    }

 

SSE 연결이 완료되었음을 확인 데이터를 보내기 위함입니다.

연결 확인이기 때문에 ServerSentEvent.event는 FrontEnd와 약속했던 config으로 정의해서 보내고 있습니다.

 

Temp Dto

...
public class Dto{
		private String id;
		private String message;
		private String level;
		...
}

 

PostMan


[추가 정보]

다른 서비스에서 Notification 발생을 요청하는 공통 인터페이스 형식의 API을 만들어서 Server끼리 통신할 수 있도록 하였습니다.

 

해당 API에서 Notification 생성 및 전달까지 모두 진행한다면 다른 서비스들이 Notification Server에서 진행하는 로직을 모두 확인한 뒤에 진행하기 때문에 비효율적이라고 판단하였습니다.

 

Notification 발생 API는 전달받은 데이터를 기준으로 Notification의 데이터만 DB에 저장시키고, Kafka을 통해서 Message을 발행하여 Notification을 전달하는 과정과 분리하였습니다.

 

간단한 그림으로 표현(세부적인 로직은 제외한 간단한 흐름도)

 

Local 환경에서 위와 같이 Kafka을 사용하셔서 테스트를 진행하고 싶으시면, 아래 글을 참고하셔서 환경을 만들어주시면 됩니다.

 

[Kafka] Docker Compose 를 이용하여 Single Broker 구성하기

 

devocean.sk.com

 

Kafka Cosumer

public class NotificationEventBusListener {
  /**
   * consumeKafkaNotificationMessage : Kafka 알림 이벤트 수신 및 전송
   * @param message : Kafka 메시지
   */
  @KafkaListener(
      topics = "notification",
      groupId = "notification"
  )
  public void consumeKafkaNotificationMessage(String message) {
		...
  }

 

@KafkaListener을 통해서 Kafka Message을 Consume을 진행하여 알림 발송을 진행하였습니다.

알림 발송, 알림 공통 인터페이스(알림 생성)을 분리시켰습니다.

 

알림 채널 Email, SMS, AlimTalk에 대해서는 NCP을 이용하여 구현하였습니다.

 

NAVER CLOUD PLATFORM

cloud computing services for corporations, IaaS, PaaS, SaaS, with Global region and Security Technology Certification

www.ncloud.com

 

Email : Cloud Outbound Mailer

 

Cloud Outbound Mailer 개요

 

api.ncloud-docs.com

 

SMS, Alim Talk : SENS(Simple & Easy Notification Service)

 

SENS 개요

 

api.ncloud-docs.com


[마주한 문제]

 

1. 알림 서비스를 제공해야 하는 환경은 동일한 아이디로 중복 접근이 되어야 합니다.

 

[생각중…..🧐]

Map<userId, Sinks>로 해서 동일한 Key의 대한 접근이면 똑같은 Sinks을 주면 같은 Stream을 유지하는 방향으로 시도해보면 되지 않을까??

또한, Sinks을 구분하는 Key값은 사용자마다 구분할 수 있는 UserID(UUID)을 기준으로 정의하면 되지 않을까??

 

[구현]

  Map<String, Sinks.Many<ServerSentEvent<Object>>> sinks

 

Sinks와 Map<>을 통해서 동일한 userId(UUID)가 접근했을 때 같은 Sinks을 Subscriber로 등록하여 동일한 데이터를 전달받을 수 있도록 진행하였습니다.

⇒ 성공!!

 

2. SSE을 연결하였을 때, pending 상태 유지로 인하여 EventSource에 대한 Success, Fail 에 대한 EventDriven을 못하는 현상이 발생하였습니다.

Server 문제로 pending인지, 알림이 도착하지 않아서 pending인지 구분하지 못하는 문제가 생겼습니다.

 

[첫 번째 시도 X]

 

SinksonBackpressureBuffer()을 통해서 만들기 때문에 Sinks을 생성되었을 때 아래와 같은 Message을 보내도록 설정하였습니다.

sinks.get(id).tryEmitNext(ServerSentEvent.builder()
   .event("config")
   .data("Connected Successfully")
   .comment("Connected Successfully")
   .build());

 

위와 같이 시도했을 때, 중복된 아이디로 다수의 사람들이 접근하였을 때 첫 로그인한 회원에게만 Connected Successfully이 전달되었으며, 나머지 사람들은 알림이 발생하기 전까지 pending 상태가 유지되었습니다.

onBackpressureBuffer()첫 Subscriber에게 Buffer에 저장된 데이터들을 모두 전달하기 때문입니다.

 

[두 번째 시도 X]

 

중복된 사용자가 같은 ID에 Sinks.asFlux()로 Subscriber이 되었을 때 Connected Successfully메시지들 전달 해보자!

 

Sinks.asFlux()가 Subscriber가 되기 전에 Message을 전달해도, 이전에 존재했던 모든 Subscriber들에게 전달하고, 이번에 새로운 Subscriber에게는 전달되지 못합니다.

 

[세 번째 시도]

 

  @GetMapping(value = "/sse/connect")
  public Mono<ResponseEntity<Boolean>> sseSuccessConnection(
          @RequestHeader("userId") String userId
  ) {
    return sendSSEProcessor.successMessageSend(userId)
            .map(isSuccess -> new ResponseEntity<>(isSuccess, HttpStatus.OK));
  }

 

위와 같이 Connected Successfully을 보내는 API을 만든 뒤에 FE에서 sse 연결을 시도한 뒤에 해당 API을 전송하도록 하였습니다.

 

SSE의 pending 상태를 해당 API을 호출함으로써 Connected Successfully 메시지가 전달되고, 200으로 변경되도록 함으로써 EventSource의 Success, Fail EventDriven 제대로 작동하도록 구현하였습니다.

→ 하지만 이 방법이 옳은 방법인지는 모르겠습니다.

 

★ 이 부분에 혹시, 아는 분이 계시면 어떻게 해결해야하는 지 알려주세요. 정말 알고 싶습니다.

 

3. 현재는 알림을 제공하는 서비스에 사용자가 적기 때문에 알림 기록들을 RDB에서 관리하고 있습니다.

 

하지만, 사용자가 많아진다면??

→ 이건 진짜 큰 문제!!

 

알림 데이터는 쌓이고 해당 데이터가 RDB가 관리하지 못할정도로 채워지면 DB가 터집니다.

 

그래서 ShortTerm, LongTerm을 구분해서 관리할 수 있습니다.

 

ShortTerm → 3개월

 

LongTerm → 3개월 이후.

 

ShortTerm을 RDB에서 관리하여, 최근 데이터를 다룰 때에만 사용하며,

 

LongTerm은 NoSQL(MongoDB)으로 관리하며, 샤딩을 이용해서 데이터를 관리할 수도 있을 것 같다고 생각하였습니다.

댓글