📌 Reactor의 Publisher 개념 이해하기
Reactor에서 Publisher는 비동기 데이터 스트림을 제공하는 핵심 인터페이스이다.
Publisher는 데이터를 생산하고, Subscriber는 데이터를 소비하는 방식으로 동작하며, Reactive Streams 표준을 따른다.
링크 에 설명되어있는 바와 같이 CorePublisher 를 Implement 하고 있는 게 보인다.
1. Publisher란?
Publisher<T>는 비동기적으로 데이터를 방출하는 개체이다. Reactor의 Mono와 Flux는 이 Publisher 인터페이스를 구현하고 있다.
public interface Publisher<T> {
void subscribe(Subscriber<? super T> subscriber);
}
Publisher의 역할
- 데이터를 제공하는 역할: Flux 또는 Mono는 Publisher의 구체적인 구현체로, 데이터 스트림을 비동기적으로 발행한다.
- 구독자(Subscriber)와의 협력: Publisher는 Subscriber가 요청한 만큼 데이터를 전달한다. (Backpressure 처리)
- 데이터 스트림의 종료를 알림: 정상적으로 완료되거나, 에러가 발생했음을 Subscriber에게 통지한다.
2. Publisher의 대표적인 구현체
Reactor에서는 두 가지 주요 Publisher가 제공된다 : Mono 와 Flux가 그 예시이다.
Mono<T> (최대 1개)
- 0개 또는 1개의 값을 발행하는 Publisher
- 단일 값 처리에 적합
- 주요 연산
- Mono.just(T data): 단일 값을 가진 Mono 생성
- Mono.empty(): 빈 Mono 생성
- Mono.error(Throwable e): 에러를 발생시키는 Mono 생성
- Mono.fromCallable(Callable<T>): 호출 가능한 값으로 Mono 생성
- Mono.zip(mono1, mono2, (data1, data2) -> ...): 두 개 이상의 Mono 결과를 조합
Flux<T> (0개 이상)
- 0개 이상의 값을 발행하는 Publisher
- 여러 개의 값을 스트리밍하는 경우 사용
- 주요 연산
- Flux.just(T... data): 여러 개의 값을 가진 Flux 생성
- Flux.fromIterable(List<T>): 리스트 등의 컬렉션에서 Flux 생성
- Flux.range(start, count): 특정 범위의 숫자 스트림 생성
- Flux.interval(Duration.ofSeconds(n)): 일정 간격으로 값을 방출하는 Flux 생성
- Flux.merge(flux1, flux2): 여러 Flux를 병합
- Flux.concat(flux1, flux2): 여러 Flux를 순차적으로 연결
- Flux.zip(flux1, flux2, (data1, data2) -> ...): 여러 Flux를 결합하여 새로운 데이터 생성
Flux와 Mono의 예외 처리
- onErrorReturn(): 예외 발생 시 기본값 반환
- onErrorResume(): 예외 발생 시 대체 Mono 또는 Flux 반환
- doOnError(): 예외 발생 시 로깅 또는 추가 처리
예외처리 관련은 따로 또 정리할 예정이다..
3. Publisher와 Subscriber의 관계
Publisher는 데이터를 생성하고, Subscriber는 데이터를 소비하는 구조이다. Publisher.subscribe(Subscriber)를 호출하면 스트림이 시작된다.
Flux<Integer> flux = Flux.range(1, 5);
flux.subscribe(
data -> System.out.println("Received: " + data), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Complete!") // onComplete
);
출력
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Complete!
Subscriber의 주요 메서드
public interface Subscriber<T> {
void onSubscribe(Subscription s); // 구독이 시작될 때 호출
void onNext(T t); // 데이터를 받을 때 호출
void onError(Throwable t); // 에러가 발생했을 때 호출
void onComplete(); // 스트림이 끝났을 때 호출
}
4. Backpressure (역압)
- Backpressure이란 Subscriber가 Publisher로부터 받을 데이터를 조절하는 기능이다. Publisher가 너무 빠르게 데이터를 방출하면 Subscriber가 처리 속도를 맞추지 못해 문제가 발생할 수 있기 때문에 적당히 조절하는 것이 중요하다.
Backpressure 전략
- Subscriber가 Subscription.request(n)을 통해 원하는 데이터 개수를 요청할 수 있음.
- Flux나 Mono는 onBackpressureBuffer(), onBackpressureDrop() 등을 통해 Backpressure를 제어 가능함.
Flux.range(1, 100)
.onBackpressureBuffer(10) // 최대 10개까지 버퍼링
.subscribe(
data -> System.out.println("Received: " + data),
err -> System.err.println("Error: " + err),
() -> System.out.println("Complete!")
);
5. Publisher의 주요 연산 (Operators)
1) map()
stream.map 처럼 데이터를 순회하면서 변환한다.
2) flatMap()
map과 비슷하지만 비동기 라는 점이 다르다. 때문에 순서가 보장되지 않는다!!! 순서를 보장하고 싶다면 3)concatMap 을 사용해야 한다.
java
복사편집
Flux<Integer> flux = Flux.range(1, 10)
.filter(n -> n % 2 == 0);
flux.subscribe(System.out::println);
// 출력: 2, 4, 6, 8, 10
3) concatMap()
순서를 보장한다.
Flux<Integer> flux = Flux.range(1, 5)
.concatMap(n -> Mono.just(n * 10)
.delayElement(Duration.ofMillis(500))); // 순차적으로 실행
flux.subscribe(System.out::println);
Thread.sleep(3000);
4) zip() - 여러 Publisher 결합
Flux<String> names = Flux.just("Alice", "Bob", "Charlie");
Flux<Integer> ages = Flux.just(25, 30, 35);
Flux<String> zippedFlux = Flux.zip(names, ages, (name, age) -> name + " is " + age);
zippedFlux.subscribe(System.out::println);
// 출력:
// Alice is 25
// Bob is 30
// Charlie is 35
실제 예제
비동기로 데이터를 가공하고 싶을 때, 이런식으로 사용한다. 실제로 사용한 코드이다.
Flux.fromIterable(request.getData())
.parallel(API_PARALLEL_SIZE)
.runOn(Schedulers.boundedElastic())
.flatMap(dataId -> processData(dataId, entity))
.sequential()
.then()
.block(); // 비동기 작업 완료 대기
- fromIterable :
- 컬렉션(리스트, 셋 등)을 Flux 스트림으로 변환하는 연산자
- iterable 객체에 대해 각각 비동기 데이터로 변환하여, Flux 를 만들어 반환한다
- parallel
- 여러 개의 스레드를 활용해 작업을 병렬로 수행
- runOn 과 함께 사용. (스케줄러 역할)
- flux 가 아니라 parallelFlux 를 반환한다. ( 주의 - .subscribe 은 Flux<T> 에서만 사용가능함. 그래서 아래에서 sequential 을 수행)
-runOn
- parallel 을 하면 데이터가 단순히 쪼개질 뿐 병렬로 수행되지는 않는다. runOn 을 이용해야만 병렬로 스레드가 여러개 생성되어 수행된다.
- 병렬 처리를 위한 스레드 풀을 제공하는 특정 스케줄러에서 실행되도록 설정한다.
- 네트워크 요청, 데이터베이스 호출, 파일 입출력 같은 작업은 Schedulers.boundedElastic()이 적합하다고 한다.
- 그 외에 Schedulers.parallel()은 cpu바운드 작업에 적합하다고 한다. 그 외에도 여럿 있다.
- flatMap
- 이전에 나왔지만 다시 설명하자면 Flux 데이터 (기본적으로 여러 개) 를 펼쳐서 처리하는 연산자이다.
- sequential
- 병렬로 처리한 데이터를 다시 하나의 Flux 스트림으로 합치는 연산자.
- ParallelFlux → Flux 변환 역할.
- then()
- Flux의 모든 요소를 처리한 후 실행된다.
- 원본 Flux의 결과는 무시하고 새로운 Mono를 반환한다.
- 이전 Flux가 완료되었을 때 후속 작업을 수행할 때 유용하다!
- 이외에도 thenMany(Flux 반환), thenEmpty(Mono<Void> 반환) 등이 있다.
- block()
- 이 코드를 사용하면 코드가 비동기적이 아니라 동기적으로 실행하게 된다.
- 비동기를 사용하려는 의미가 조금 달라져서 지양된다고는 한다.
대안 1. .subscribe() 를 사용
Flux.fromIterable(request.getData())
.parallel(API_PARALLEL_SIZE)
.runOn(Schedulers.boundedElastic())
.flatMap(dataId -> processData(dataId, entity))
.sequential()
.then()
.subscribe(
result -> System.out.println("완료!"), // onNext (Mono<Void>라 호출 안됨)
error -> System.err.println("오류 발생: " + error), // onError
() -> System.out.println("모든 작업 완료!") // onComplete
);
요런 경우에는 메소드가 반환된 후에도 백그라운드에서 계속 실행되기 때문에, 결과가 필요한 경우엔 사용하면 안된다.
대안 2 - Future 사용
CompletableFuture<Void> future = Flux.fromIterable(request.getData())
.parallel(API_PARALLEL_SIZE)
.runOn(Schedulers.boundedElastic())
.flatMap(dataId -> processData(dataId, entity))
.sequential()
.then()
.toFuture();
future.get(); // 블로킹하지만 Future 사용
block 대신 future 로 동기 실행하는 방식이다. 여전히 동기적인 방식이다.
대안 3 - when 사용
Mono.when(
Flux.fromIterable(request.getData())
.parallel(API_PARALLEL_SIZE)
.runOn(Schedulers.boundedElastic())
.flatMap(dataId -> processData(dataId, entity))
.sequential()
).subscribe(
unused -> System.out.println("모든 비동기 작업 완료!"),
error -> System.err.println("오류 발생: " + error)
);
- 모든 작업이 완료될 때만 후속 작업을 실행한다.
- block()을 사용하지 않으면서도 비동기 완료를 보장할 수 있다!
대안 4. CountDownLatch 사용 (Java 표준 스레드 동기화)
CountDownLatch를 사용하면 모든 작업이 완료될 때까지 await()로 대기 가능하다.
- CountDownLatch는 모든 작업이 끝날 때까지 await()로 대기 가능한 동기화 도구이다.
- Reactive 흐름을 유지하면서도, 모든 subscribe() 작업이 끝날 때까지 기다릴 수 있다.
CountDownLatch latch = new CountDownLatch(1); // 모든 작업이 완료될 때까지 대기
Flux.fromIterable(request.getPlateIds())
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(plateId -> processPlate(plateId, entity, algoPackInfo, ctType, reqSettingValue, preAnalysisDto))
.sequential()
.doFinally(signalType -> latch.countDown()) // 모든 작업이 끝나면 countDown()
.subscribe();
try {
latch.await(); // 모든 작업이 끝날 때까지 대기
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
다음 번에 기회가 된다면 Mono, Flux 의 운영체제 관점에서의 매커니즘, 그리고 에러가 발생시에는 어떻게 처리하는지에 대해서도 정리해보도록 하겠다.
*Mono, Flux 설명은 공식 문서에도 매우 잘 설명되어있다.
Simple Ways to Create a Flux or Mono and Subscribe to It :: Reactor Core Reference Guide
When implementing backpressure in Reactor, the way consumer pressure is propagated back to the source is by sending a request to the upstream operator. The sum of current requests is sometimes referenced to as the current “demand”, or “pending reques
docs.spring.io
'개발 > TIL' 카테고리의 다른 글
[SQL] 복합인덱스를 사용할 때 주의할 점 (0) | 2025.03.07 |
---|---|
[SQL] spring JPA 에서 자주 등장하는 ':1' 은? (feat 파라미터 바인딩) (0) | 2025.03.06 |
[SQL 튜닝] SQL 처리과정과 IO (1) | 2025.03.03 |
java 개선된 switch 문 사용하기 (2) | 2025.03.01 |
Validation과 Exception Handling (4) | 2025.03.01 |