결합 연산자

생성 연산자와 변환 연산자는 1개의 데이터 흐름(Observable)을 다뤘다. 결합 연산자는 다수의 Observable을 하나로 합하는 방법을 제공한다. flatMap(), groupBy() 함수 등은 1개의 Observable을 확장해주는 반면, 결합 연산자들은 여러 개의 Observable을 내가 원하는 Observable로 결합해준다.

1. zip()

  • 각각의 Observable을 모두 활용해 2개 혹은 그 이상의 Observable을 결합한다.
  • 예를 들어, A, B 두 개의 Observable을 결합한다면 2개의 Observable에서 모두 데이터를 발행해야 결합할 수 있다. 그전까지는 발행을 기다린다.

zip() 함수는 최대 9개의 Observable을 결합할 수 있지만 보통 2개 혹은 3개면 충분하다. 다음은 간단한 zip() 함수의 사용 예제이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

public static void main(String[] args){
executeZipInteger();
}
private static void executeZipInteger() {
Observable.zip(
Observable.just(100, 200, 300),
Observable.just(10, 20, 30),
Observable.just(1, 2, 3),
(a, b, c) -> a + b + c
).subscribe(Log::i);
}
// 결과
main | value = 111
main | value = 222
main | value = 333

2. zipWith()

zipWith() 함수는 zip() 함수와 동일하지만 Observable을 다양한 함수와 조합하면서 틈틈이 호출할 수 있는 장점이 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ZipWithSample {
public static void main(String[] args) {
Observable<Integer> source = Observable.zip(
Observable.just(100, 200, 300),
Observable.just(10, 20, 30),
(a, b) -> a + b)
.zipWith(Observable.just(1, 2, 3), (ab, c) -> ab * c);
source.subscribe(Log::i);
}
}
// 결과
main | value = 110
main | value = 440
main | value = 990

두 Observable을 zip() 함수로 묶고 세 번째 Observable을 다시 zipWith() 함수로 결합했다. zipWith() 함수를 호출할 때는 앞서 a와 b를 결합했기 때문에 ab로 명명했다.

3. combinLatest()

2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 함수이다. 마지막 인자로 combiner가 들어가는데 그것이 각 Observable을 결합하여 어떤 결과를 만들어주는 역할을 하는 함수이다. zip() 함수의 zipper 인자와 동일하다.

예를 들어, 첫 번째 Observable과 두 번째 Observable을 결합하는 기능을 만든다고 하면 첫 번째 Observable의 값 혹은 두 번째 Observable의 값이 변경되었을 때 그 값을 자동으로 갱신해준다.

첫 번째 Observable에서만 데이터를 발행하거나 두 번째 Observable의 데이터 흐름만 있으면 구독자에게 어떤 데이터도 발행하지 않는다. 하지만 두 Observable 모두 값을 발행하면 그때는 결과값이 나온다. 그 다음부터는 둘 중에 어떤 것이 갱신되던지 최신 결과값을 보여준다. -> 이 부분이 zip() 함수와 다른 점이다.

zip() 함수처럼 결합하고자 하는 첫 번째와 두 번쨰 Observable을 넣고 마지막으로 그것을 결합하는 combiner() 함수를 넣어주면 된다. 입력할 수 있는 Observable 인자의 개수는 9개이다.

4. merge()

  • zip() 함수나 combineLatest() 함수와 비교하면 가장 단순한 결합 함수이다.
  • 입력 Observable의 순서와 모든 Observable이 데이터를 발행하는지 등에 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행한다.

5. concat()

  • 2개 이상의 Observable을 이어 붙여주는 함수이다.
  • 첫 번째 Observable에 onComplete 이벤트가 발생해야 두 번째 Observable을 구독한다. 스레드를 활용한 일반적이 코드로 이와 같은 내용을 구현하기는 복잡하다.
  • 결합할 수 있는 Observable은 최대 4개이다.

첫 번째 Observable에 onComplete 이벤트가 발생하지 않게 되면 두 번째 Observable은 영원히 대기한다. 이는 잠재적인 메모리 누수의 위험을 내포한다. 따라서 입력 Observable이 반드시 완료(onComplete 이벤트)될 수 있게 해야 한다.

Observable의 중간 상태를 확인하는 방법

리액티브 프로그래밍을 할 때는 중간에 로그를 출력하는 것이 낯설게 느껴진다. 특히 함수형 프로그래밍 패러다임을 배우면서 "로그나 화면 출력하는 등을 부수 효과를 발생시킨다"라는 내용을 접하면 부수 효과를 최소화하려고 하는 경향이 생긴다.

하지만 부수 효과를 감내하고서라도 적절한 로그는 유지 보수성을 확보하기 위해 꼭 필요하다. RxJava에서는 Observable의 중간 결과를 간편하게 확인할 수 있는 함수들을 제공한다. 확실하지 않은 코드나 예제 코드를 실행할 때 찜찜한 부분이 있다면 doOnNext(), doOnComplete(), doOnError() 함수를 추가해보자.