개발/TIL

java에서 비동기 처리하기 Mono와 Flux ( reactor.core 라이브러리)

ebang 2025. 3. 4. 22:13

📌 Reactor의 Publisher 개념 이해하기

Reactor에서 Publisher는 비동기 데이터 스트림을 제공하는 핵심 인터페이스이다.

Publisher는 데이터를 생산하고, Subscriber는 데이터를 소비하는 방식으로 동작하며, Reactive Streams 표준을 따른다.

 

링크 에 설명되어있는 바와 같이 CorePublisher 를 Implement 하고 있는 게 보인다. 


1. Publisher란?

Publisher<T>는 비동기적으로 데이터를 방출하는 개체이다. Reactor의 Mono와 Flux는 이 Publisher 인터페이스를 구현하고 있다.

 

 

 

public interface Publisher<T> {
    void subscribe(Subscriber<? super T> subscriber);
}

 

 Publisher의 역할

  • 데이터를 제공하는 역할: Flux 또는 Mono는 Publisher의 구체적인 구현체로, 데이터 스트림을 비동기적으로 발행한다.
  • 구독자(Subscriber)와의 협력: Publisher는 Subscriber가 요청한 만큼 데이터를 전달한다. (Backpressure 처리)
  • 데이터 스트림의 종료를 알림: 정상적으로 완료되거나, 에러가 발생했음을 Subscriber에게 통지한다.

2. Publisher의 대표적인 구현체

Reactor에서는 두 가지 주요 Publisher가 제공된다 : Mono 와 Flux가 그 예시이다. 

 Mono<T> (최대 1개)

  • 0개 또는 1개의 값을 발행하는 Publisher
  • 단일 값 처리에 적합
  • 주요 연산
    • Mono.just(T data): 단일 값을 가진 Mono 생성
    • Mono.empty(): 빈 Mono 생성
    • Mono.error(Throwable e): 에러를 발생시키는 Mono 생성
    • Mono.fromCallable(Callable<T>): 호출 가능한 값으로 Mono 생성
    • Mono.zip(mono1, mono2, (data1, data2) -> ...): 두 개 이상의 Mono 결과를 조합

 

 

 Flux<T> (0개 이상)

  • 0개 이상의 값을 발행하는 Publisher
  • 여러 개의 값을 스트리밍하는 경우 사용
  • 주요 연산
    • Flux.just(T... data): 여러 개의 값을 가진 Flux 생성
    • Flux.fromIterable(List<T>): 리스트 등의 컬렉션에서 Flux 생성
    • Flux.range(start, count): 특정 범위의 숫자 스트림 생성
    • Flux.interval(Duration.ofSeconds(n)): 일정 간격으로 값을 방출하는 Flux 생성
    • Flux.merge(flux1, flux2): 여러 Flux를 병합
    • Flux.concat(flux1, flux2): 여러 Flux를 순차적으로 연결
    • Flux.zip(flux1, flux2, (data1, data2) -> ...): 여러 Flux를 결합하여 새로운 데이터 생성

 

Flux와 Mono의 예외 처리

  • onErrorReturn(): 예외 발생 시 기본값 반환
  • onErrorResume(): 예외 발생 시 대체 Mono 또는 Flux 반환
  • doOnError(): 예외 발생 시 로깅 또는 추가 처리

예외처리 관련은 따로 또 정리할 예정이다.. 

 

 

 3. Publisher와 Subscriber의 관계

Publisher는 데이터를 생성하고, Subscriber는 데이터를 소비하는 구조이다. Publisher.subscribe(Subscriber)를 호출하면 스트림이 시작된다. 

Flux<Integer> flux = Flux.range(1, 5);

flux.subscribe(
    data -> System.out.println("Received: " + data),  // onNext
    error -> System.err.println("Error: " + error),   // onError
    () -> System.out.println("Complete!")            // onComplete
);

출력

Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Complete!

Subscriber의 주요 메서드

public interface Subscriber<T> {
    void onSubscribe(Subscription s); // 구독이 시작될 때 호출
    void onNext(T t);                 // 데이터를 받을 때 호출
    void onError(Throwable t);        // 에러가 발생했을 때 호출
    void onComplete();                // 스트림이 끝났을 때 호출
}

4. Backpressure (역압)

  • Backpressure이란 Subscriber가 Publisher로부터 받을 데이터를 조절하는 기능이다. Publisher가 너무 빠르게 데이터를 방출하면 Subscriber가 처리 속도를 맞추지 못해 문제가 발생할 수 있기 때문에 적당히 조절하는 것이 중요하다. 

Backpressure 전략

  • Subscriber가 Subscription.request(n)을 통해 원하는 데이터 개수를 요청할 수 있음.
  • Flux나 Mono는 onBackpressureBuffer(), onBackpressureDrop() 등을 통해 Backpressure를 제어 가능함. 
Flux.range(1, 100)
    .onBackpressureBuffer(10) // 최대 10개까지 버퍼링
    .subscribe(
        data -> System.out.println("Received: " + data),
        err -> System.err.println("Error: " + err),
        () -> System.out.println("Complete!")
    );

 

 

 

5. Publisher의 주요 연산 (Operators)

 1) map() 

stream.map 처럼 데이터를 순회하면서 변환한다.

2) flatMap() 

map과 비슷하지만 비동기 라는 점이 다르다.  때문에 순서가 보장되지 않는다!!!  순서를 보장하고 싶다면 3)concatMap 을 사용해야 한다. 

java
복사편집
Flux<Integer> flux = Flux.range(1, 10)
    .filter(n -> n % 2 == 0);

flux.subscribe(System.out::println);
// 출력: 2, 4, 6, 8, 10

 

 

3) concatMap()

순서를 보장한다. 

Flux<Integer> flux = Flux.range(1, 5)
    .concatMap(n -> Mono.just(n * 10)
        .delayElement(Duration.ofMillis(500))); // 순차적으로 실행

flux.subscribe(System.out::println);
Thread.sleep(3000);

 

4) zip() - 여러 Publisher 결합

Flux<String> names = Flux.just("Alice", "Bob", "Charlie");
Flux<Integer> ages = Flux.just(25, 30, 35);

Flux<String> zippedFlux = Flux.zip(names, ages, (name, age) -> name + " is " + age);

zippedFlux.subscribe(System.out::println);
// 출력:
// Alice is 25
// Bob is 30
// Charlie is 35

 

 

 

 

 

 

 

실제 예제

 비동기로 데이터를 가공하고 싶을 때, 이런식으로 사용한다. 실제로 사용한 코드이다. 

Flux.fromIterable(request.getData())
			.parallel(API_PARALLEL_SIZE)
			.runOn(Schedulers.boundedElastic())
			.flatMap(dataId -> processData(dataId, entity))
			.sequential()
			.then()
			.block(); // 비동기 작업 완료 대기

- fromIterable :

  • 컬렉션(리스트, 셋 등)을 Flux 스트림으로 변환하는 연산자
  • iterable 객체에 대해 각각 비동기 데이터로 변환하여, Flux 를 만들어 반환한다

- parallel

  • 여러 개의 스레드를 활용해 작업을 병렬로 수행
  • runOn 과 함께 사용. (스케줄러 역할)
  • flux 가 아니라 parallelFlux 를 반환한다. ( 주의 - .subscribe 은 Flux<T> 에서만 사용가능함. 그래서 아래에서 sequential 을 수행)

-runOn 

  • parallel 을 하면 데이터가 단순히 쪼개질 뿐 병렬로 수행되지는 않는다. runOn 을 이용해야만 병렬로 스레드가 여러개 생성되어 수행된다. 
  • 병렬 처리를 위한 스레드 풀을 제공하는 특정 스케줄러에서 실행되도록 설정한다.
  • 네트워크 요청, 데이터베이스 호출, 파일 입출력 같은 작업은 Schedulers.boundedElastic()이 적합하다고 한다. 
  • 그 외에 Schedulers.parallel()은 cpu바운드 작업에 적합하다고 한다.  그 외에도 여럿 있다. 

 

- flatMap

  • 이전에 나왔지만 다시 설명하자면 Flux 데이터 (기본적으로 여러 개) 를 펼쳐서 처리하는 연산자이다. 

- sequential

  • 병렬로 처리한 데이터를 다시 하나의 Flux 스트림으로 합치는 연산자. 
  • ParallelFlux → Flux 변환 역할.

- then()

  • Flux의 모든 요소를 처리한 후 실행된다. 
  • 원본 Flux의 결과는 무시하고 새로운 Mono를 반환한다. 
  • 이전 Flux가 완료되었을 때 후속 작업을 수행할 때 유용하다! 
  • 이외에도 thenMany(Flux 반환),  thenEmpty(Mono<Void> 반환) 등이 있다. 

 

- block()

  • 이 코드를 사용하면 코드가 비동기적이 아니라 동기적으로 실행하게 된다. 
  • 비동기를 사용하려는 의미가 조금 달라져서 지양된다고는 한다. 

대안 1. .subscribe() 를 사용

Flux.fromIterable(request.getData())
    .parallel(API_PARALLEL_SIZE)
    .runOn(Schedulers.boundedElastic())
    .flatMap(dataId -> processData(dataId, entity))
    .sequential()
    .then()
    .subscribe(
        result -> System.out.println("완료!"),  // onNext (Mono<Void>라 호출 안됨)
        error -> System.err.println("오류 발생: " + error),  // onError
        () -> System.out.println("모든 작업 완료!")  // onComplete
    );

요런 경우에는 메소드가 반환된 후에도 백그라운드에서 계속 실행되기 때문에, 결과가 필요한 경우엔 사용하면 안된다. 

 

 

대안 2 - Future 사용

CompletableFuture<Void> future = Flux.fromIterable(request.getData())
    .parallel(API_PARALLEL_SIZE)
    .runOn(Schedulers.boundedElastic())
    .flatMap(dataId -> processData(dataId, entity))
    .sequential()
    .then()
    .toFuture();

future.get();  // 블로킹하지만 Future 사용

block 대신 future 로 동기 실행하는 방식이다. 여전히 동기적인 방식이다. 

 

대안 3 - when 사용

Mono.when(
    Flux.fromIterable(request.getData())
        .parallel(API_PARALLEL_SIZE)
        .runOn(Schedulers.boundedElastic())
        .flatMap(dataId -> processData(dataId, entity))
        .sequential()
).subscribe(
    unused -> System.out.println("모든 비동기 작업 완료!"),
    error -> System.err.println("오류 발생: " + error)
);
  • 모든 작업이 완료될 때만 후속 작업을 실행한다. 
  • block()을 사용하지 않으면서도 비동기 완료를 보장할 수 있다!

 

대안 4. CountDownLatch 사용 (Java 표준 스레드 동기화)

CountDownLatch를 사용하면 모든 작업이 완료될 때까지 await()로 대기 가능하다. 

  • CountDownLatch는 모든 작업이 끝날 때까지 await()로 대기 가능한 동기화 도구이다. 
  • Reactive 흐름을 유지하면서도, 모든 subscribe() 작업이 끝날 때까지 기다릴 수 있다. 
CountDownLatch latch = new CountDownLatch(1); // 모든 작업이 완료될 때까지 대기

Flux.fromIterable(request.getPlateIds())
    .parallel()
    .runOn(Schedulers.boundedElastic())
    .flatMap(plateId -> processPlate(plateId, entity, algoPackInfo, ctType, reqSettingValue, preAnalysisDto))
    .sequential()
    .doFinally(signalType -> latch.countDown()) // 모든 작업이 끝나면 countDown()
    .subscribe();

try {
    latch.await(); // 모든 작업이 끝날 때까지 대기
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

 

 

 

다음 번에 기회가 된다면 Mono, Flux 의 운영체제 관점에서의 매커니즘, 그리고 에러가 발생시에는 어떻게 처리하는지에 대해서도 정리해보도록 하겠다. 

 

 

*Mono, Flux 설명은 공식 문서에도 매우 잘 설명되어있다. 

https://docs.spring.io/projectreactor/reactor-core/docs/current-SNAPSHOT/reference/html/coreFeatures/simple-ways-to-create-a-flux-or-mono-and-subscribe-to-it.html

 

Simple Ways to Create a Flux or Mono and Subscribe to It :: Reactor Core Reference Guide

When implementing backpressure in Reactor, the way consumer pressure is propagated back to the source is by sending a request to the upstream operator. The sum of current requests is sometimes referenced to as the current “demand”, or “pending reques

docs.spring.io