Spring

[Spring WebFlux] 2. 리액티브 스트림즈(Reactive Streams)

Karla Ko 2024. 6. 13. 12:27
728x90

리액티브 스트림즈 (Reactive Streams)

  • 리액티브 라이브로리를 어떻게 구현할지 정의해 놓은 별도의 표준 사양
  • 데이터 스트림을 Non-Blocking이면서 비동기적인 방식으로 처리하깊 위한 리액티브 라이브러리의 표준 사양
  • RxJava, Reactor(Spring Framework와 가장 궁합이 잘맞음), Akka Streams, Java 9 Flow API 등

 

리액티브 스트림즈 구성요소

컴포넌트 설명
Publisher  데이터를 생성하고 통지(발행, 게시, 방출)하는 역할
Subscriber 구독한 Publisher로부터 통지(발행, 게시, 방출)된 데이터를 전달받아서 처리하는 역할
Subscription Publisher에 요청할 데이터의 개수를 지정하고, 데이터의 구독을 취소하는 역할
Processor Publisher와 Subscriber의 기능을 모두 가지고 있음 
Subscriber로서 다른 Publisher를 구독할 수 있고, Publisher로서 다른 Subscriber가 구독할 수 있음

 

Publisher와 Subscriber 동작과정

진행 과정

  1. 먼저 Subscriber는 전달받을 데이터를 구독합니다. (subscribe)
  2. 다음으로 Publisher는 데이터를 통지할 준비가 되었음을 Subscriber에게 알립니다. (onSubscribe)
  3. Publisher가 데이터를 통지할 준비가 되었다는 알림은 받은 Subscriber는 전달받기를 원하는 데이터의 개수를 Publisher에게 요청합니다. (Subscription.request)
  4. 다음으로 Publisher는 Subscriber로부터 요청받은 데이터를 통지합니다. (onNext)
  5. 이렇게 Publisher와 Subscriber 간에 데이터 통지, 데이터 수신, 데이터 요청의 과정을 반복하다가 Publisher가 모든 데이터를 통지하면 마지막으로 데이터 전송이 완료되었음을 Subscriber에게 알립니다.(onComplete) 만약 Publisher가 데이터를 처리하는 과정에서 에러가 발생하면 에러가 발생했음을 Subscriber에게 알림니다 (onError)
Publisher가 Subscription.request를 통해 데이터의 요청 개수를 지정하는 이유

- 실제로 Publisher와 Subscriber는 각각 다른 스레드에서 비동기적으로 상호작용 하는 경우가 대부분
- 이럴 경우 Publisher가 통지하는 속도가 Publisher로 부터 통지된 데이터를 Subscriber가 처리하는 속도보다 더 빠르면 처리를 기다리는 데이터는 쌓이게 되고, 이는 결과적으로 시스템 부하가 커짐
- 이러한 문제를 방지하기 위해 데이터 개수 제어

 

Publisher

public interface Publisher<T>{
	public voikd subscribe(Subscriber<? super T> s);
}
  • 파라미터로 전달받은 Subscriber를 등록하는 역할
Publisher인데 왜 subscribe 메서드가..?
- 리액티브 스트림즈에서의 Publisher/Subscriber는 Kafka의 Publisher/Subscriber랑 의미가 다름
- Kafka에서의 Publisher/Subscriber는 각각 브로커 내의 토픽만 바라보면 되기 때문에 Publisher는 특정 토픽으로 메시 데이터를 전송하기만 하면 되고, Subscriber는 특정 토픽을 구독하고 해당 토픽에 전달되는 메시지 데이터를 전달 받기만 하면됨(느슨한 결합 구조)
- 리액티브 스트림즈에서의 Publisher와 Subscriber는 개념상으로는 Subscriber가 구독하는 것이 맞는데, 실제 코드상에서는 Publisherrk subscribe 메서드의 파라미터인 Subscriber를 등록하는 형태로 구독이 이루어짐

Subscriber

public interface Subscriber<T> {
    public void onSubscribe(Subscrition s);
    public void onNext(T t);
    public void onError(Throwable s);
    public void onComplete();
    }
  • onSubscribe
    • 구독 시작 시점에 어떤 처리(요청할 데이터의 개수를 지정하거나 구독을 해지하는 것)를 하는 역할
    • 파라미터 객체를 통해서 이루어짐
  • onNext
    • Publisher가 통지한 데이터를 처리하는 역할
  • onError
    • Publisher가 데이터 통지를 위한 처리 과정에서 에러가 발생했을 때 해당 에러를 처리하는 역할
  • onComplete
    • Publisher가 데이터 통지를 완료했음을 알릴 때 호출된느 메서드
    • 데이터 통지가 정상적으로 완료될 경우에 어떤 후처리를 해야한다면 onComplete메서드에서 처리 코드 작성

Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}
  • Subscriber가 구독한 데이터의 개수를 요청하거나 또는 데이터 요청 취소(구독 해지)하는 역할

Processor

public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

 

 

 

728x90