TestObserver 클래스

  • RxJava에서 제공하는 TestObserver 클래스다.
  • JUnit 기반의 테스트 코드와 다른 점은 test()와 assertResult() 함수다.
  • 코드는 다음과 같다.
1
2
3
4
5
6
7
8
9
10
11
12
13
public class JUnitBasic {
@Test
public void testGetShapeObservable() {
String[] data = {"1", "2-R", "3-T"};
// source : 실제 결과.
Observable<String> source = Observable.fromArray(data)
.map(Shape::getShape);

// expected : 예상 결과.
String[] expected = {Shape.BALL, Shape.RECTANGLE, Shape.TRIANGLE};
source.test().assertResult(expected);
}
}
  • assertResult() : 예상된 결과와 실제 결과를 비교하는 메소드. JUnit의 assertEquals() 메소드와 같다.
  • assertFailure() : Observable에서 기대했던 에러가 발생하는지 확인하는 코드다. 만약, 기대했던 에러가 발생하지 않으면 테스트 코드 실행은 실패한다.
    • 총 3개의 값을 넣어 앞 두 번째 값까지는 정상적으로 발행하고 마지막 값에서 기대했던 예외가 발생하는지 확인한다.
    • 세번째 데이터는 %를 붙여서 Integer.parseInt()에서 변환이 안되기 때문에 NumberFormatException이 발생하고 onError 이벤트로 종료된다.
1
2
3
4
5
6
7
8
@Test
public void assertFailureExample() {
String[] data = {"100", "200", "%300"};
Observable<Integer> source = Observable.fromArray(data)
.map(Integer::parseInt);

source.test().assertFailure(NumberFormatException.class, 100, 200);
}
  • assertFailureAndMessage() : 기대했던 에러 발생시 에러 메시지까지 확인할 수 있다.
    • 에러 메시지를 확인하기 위한 message 인자가 추가되었다.
    • 에러가 발생했을 때 메시지를 확인하기 위해 아래와 같은 구문을 추가한다.
1
2
3
4
5
6
7
8
9
@Test
public void assertFailureAndMessage() {
String[] data = {"100", "200", "%300"};
Observable<Integer> source = Observable.fromArray(data)
.map(Integer::parseInt);

source.test().assertFailureAndMessage(NumberFormatException.class,
"For input string : \"%300\"", 100, 200);
}
  • awaitDone() : interval() 함수처럼 비동기로 동작하는 Observable 코드를 테스트할 수 있다.
  • assertComplete() : Observable을 정상적으로 완료했는지(onComplete 이벤트) 확인한다.

비동기 코드 테스트

  • RxJava는 다양한 상황에서 비동기 코드를 직관적으로 작성할 수 있다. 하지만, 비동기 코드를 테스트하는 것은 어려우므로 RxJava는 비동기로 동작하는 코드를 테스트할 방법을 제공한다.
  • Observable.interval() 메소드는 main 스레드가 아닌 계산 스케줄러에서 실행되기 때문에 비동기 코드를 테스트할 필요가 있다. awaitDone() 함수를 사용하면 된다.
  • awaitDone() 함수는 test() 함수가 실행되는 스레드에서 onComplete() 함수를 호출할 때까지 기다려준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TestAsync {
@Test
public void testInterval() {
Observable<Integer> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
.take(5)
.map(Long::intValue);

source.doOnNext(Log::d)
.test()
.awaitDone(1L, TimeUnit.SECONDS)
.assertResult(0, 1, 2, 3, 4);

}
}

HTTP 서버와 통신하는 코드를 테스트하는 것은 소스 코드만 봐도 충분히 이해할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void testHttp() {
final String url = "http://api.github.com/users/yudong80";
Observable<String> source = Observable.just(url)
.subscribeOn(Schedulers.io())
.map(OkHttpHelper::get)
.doOnNext(Log::d) // json 을 로그로 찍는다.
.map(json -> GsonHelper.parseValue(json, "name"))
.observeOn(Schedulers.newThread());

// json 중에서 name 만 뽑은 것을 로그로 찍는다.
String expected = "Dong Hwan Yu";
source.doOnNext(Log::i)
.test()
.awaitDone(3, TimeUnit.SECONDS)
.assertResult(expected);
}
  • HTTP 호출은 IO 스케줄러에서 실행되었고 JSON 파싱 결과는 뉴 스레드 스케줄러에서 출력한다. UI 프로그래밍을 할 때는 뉴 스레드 스케줄러 대신 UI 스레드로 변경하면 된다.

Flowable 클래스

  • 배압 이슈를 위해 별도로 분리한 클래스다.
  • Flowable 클래스를 도입한 이유는 Observable 클래스의 성능을 향상시키기 위해서다.
  • 기존의 Observable 클래스(배압 관련 함수들을 포함했었다.)는 배압에 관한 처리가 불필요한 경우에는 초기 로딩 때문에 약간의 오버헤드가 있었지만, RxJava 2.X의 Observable 클래스에는 배압으로 인한 성능 오버헤드가 사라졌다.
  • Flowable -> Observable로 변환하는 것뿐만 아니라 반대도 어렵지 않다.

Observabler과 Flowable의 선택 기준

1. Observable을 사용해야 할 때

  • 최대 1000개 미만의 데이터 흐름.
  • 예를 들어, 응용 프로그램에서 OOM이 발생할 확률이 거의 없는 경우다
  • 마우스 이벤트나 터치 이벤트를 다루는 GUI 프로그래밍. 이 경우에는 배압의 이슈가 거의 발생하지 않는다. Observable로는 초당 1000회 이하의 이벤트를 다루는데 이때 sample()이나 debounce() 같은 흐름 제어 함수를 활용하면 된다.
  • 데이터 흐름이 본질적으로 동기 방식이지만, 프로젝트에서 사용하는 플랫폼이 자바 Stream API나 그에 준하는 기능을 제공하지 않을 때, Observable은 보통 Flowable과 비교했을 때 성능 오버헤드가 낮다.

2. Flowable을 사용해야 할 때

  • 특정 방식으로 생성된 1000개 이상의 데이터를 처리하는 경우. 이때 메소드 체인에서 데이터 소스에 데이터 개수 제한을 요청해야 한다.
  • 디스크에서 파일을 읽어 들일 경우, 본질적으로 블로킹 I/O 방식을 활용하고 내가 원하는 만큼 가져오는 방식(pull-based)으로 처리해야 하기 때문이다. 예를 들면, 특정 단위로 잘라 몇 행씩 가져오도록 제어할 수 있다.
  • JDBC를 활용해 데이터베이스의 쿼리 결과를 가져오는 경우, 블로킹 방식을 이용하므로 ResultSet.next()를 호출하는 방식으로 쿼리의 결과를 읽어오도록 제어할 수 있다.
  • 네트워크 I/O를 실행하는 경우, 네트워크나 프로토콜을 통해 서버에서 가져오길 원하는 만큼의 데이터양을 요청할 수 있을 때이다.
  • 다수의 블로킹 방식을 사용하거나 가져오는 방식(pull-based)의 데이터 소스가 미래에는 논 블로킹 방식의 리액티브 API나 드라이버를 제공할 수도 있는 경우다.

디스크에서 파일 읽기, JDBC를 활용한 데이터베이스 쿼리하기, 네트워크 I/O 등은 차가운 Observable(구독자가 구독하면 데이터의 처음부터 모두 발행하는 Observable)에 해당한다. 이는 결과 데이터를 처리할 수 있는만큼 조금씩 가져오는 것이 아니라 한 번에 모두 가져온다. 따라서 이 경우에는 반드시 Flowable을 활용해야 하는 것은 아니다.

업스트림에서 발생하는 데이터의 속도와 다운스트림에서 처리하는 속도의 차이가 작다면 Observable을 활용해도 된다. 즉, 데이터 발행과 처리 속도가 차이나더라도 먼저 sample(), throttle(), debounce() 같은 흐름 제어 함수를 활용해 해결하는 것이 좋다. 이러한 함수로도 해결하기 어려울 때는 Flowable 클래스로 전환하면 된다.

Flowable을 활용한 배압 이슈 대응

  • onBackpressureBuffer() : 배압 이슈가 발생했을 때 별도의 버퍼에 저장한다. Flowable 클래스는 기본적으로 128개의 버퍼가 있다.
  • onBackpressureDrop() : 배압 이슈가 발생했을 때 해당 데이터를 무시한다.
  • onBackpressureLatest() : 처리할 수 없어서 쌓이는 데이터를 무시하면서 최신의 데이터만 유지한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FlowableSample {
public static void main(String[] args) {
CommonUtils.exampleStart();

PublishSubject<Integer> subject = PublishSubject.create();
subject.observeOn(Schedulers.computation())
.subscribe(data -> {
CommonUtils.sleep(100); // 100ms 후에 데이터 처리.
Log.it(data);
}, err -> Log.e(err.toString()));

// 뜨거운 Observable 로 50,000,000개의 데이터를 연속으로 발행함.
for (int i = 0; i < 50000000; i++) {
subject.onNext(i);
}

subject.onComplete();
}
}

PublishSubject 객체를 생성한 후, 처리 결과는 계산 스케줄러로 전달한다. subscribe() 함수를 호출한 후 Subject 객체가 발행한 데이터는 100ms 후에 로그를 찍는다.

한편 PublishSubject 객체는 뜨거운 Observable이다. 데이터를 발행하는 속도와 데이터를 처리하는 속도의 차이가 발생했을 때 어떠한 보호 장치도 없다. 결과는 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
RxComputationThreadPool-1 | 604 | value = 0
RxComputationThreadPool-1 | 742 | value = 1
RxComputationThreadPool-1 | 3172 | value = 2
RxComputationThreadPool-1 | 5158 | value = 3
RxComputationThreadPool-1 | 7426 | value = 4
RxComputationThreadPool-1 | 7528 | value = 5
RxComputationThreadPool-1 | 8017 | value = 6
RxComputationThreadPool-1 | 8570 | value = 7
RxComputationThreadPool-1 | 9180 | value = 8
RxComputationThreadPool-1 | 15089 | value = 9
RxComputationThreadPool-1 | 15675 | value = 10

처리 결과를 보면 100ms 간격보다 상당히 느리게 데이터를 처리한다. 그리고 데이터는 반복문을 통해서 PublishSubject 객체에서 매우 빠르게 발행되는데 데이터는 겨우 10개만 처리되었다. 만약, 발행하는 데이터의 개수가 훨씬 많아지면 JVM은 곧 OOM 예외를 발생하고 실행을 중단할 것이다. 이런 배압 이슈가 발생했을 때 Flowable 클래스를 활용한다.

위와 같은 배압 이슈에 대응하기 위해서 첫 번째 방법은 사용해보자.

  • 버퍼 만들기
  • onBackpressureBuffer() 함수에는 다음과 같은 오버로딩이 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 기본값(128)의 버퍼 개수가 있다.
public final Flowable<T> onBackpressureBuffer()

// delayError 여부를 지정할 수 있다.
// true : 예외가 발생했을 때 버퍼에 쌓인 데이터를 모두 처리할 때가지 예외를 던지지 않는다.
// false : 예외가 발생했을 때 바로 다운스트림에 예외를 던진다.
// 기본값은 false다.
public final Flowable<T> onBackpressureBuffer(boolean delayError)

// capacity 인자로 버퍼의 개수를 지정한다.
// onOverflow 인자에 버퍼가 넘쳤을 때 실행할 동작을 지정한다.
public final Flowable<T> onBackpressureBuffer(int capacity, Action onOverflow)

// 버퍼가 가득찼을 때 추가로 실행하는 전략을 지정할 수 있다.
public final Flowable<T> onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy)

지정할 수 있는 전략은 아래와 같다.

  • ERROR : MissingBackpressureException 예외를 던지고 데이터 흐름을 중단한다.
  • DROP_LATEST : 버퍼에 쌓여있는 최근 값을 제거한다.
  • DROP_OLDEST : 버퍼에 쌓여있는 가장 오래된 값을 제거한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class onBackPressureBufferSample {
public static void main(String[] args) {
CommonUtils.exampleStart();

Flowable.range(1, 50000000)
.onBackpressureBuffer(128, () -> {
}, BackpressureOverflowStrategy.DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(data -> {
CommonUtils.sleep(100);
Log.it(data);
}, error -> Log.e(error.getMessage()));
}
}

Flowable.range() 함수를 활용해 동일한 개수의 데이터를 발행한다. 그리고 128개의 버퍼를 생성한 후 버퍼가 넘치면 버퍼의 가장 오래된 데이터를 버리도록 지정한다. 결과는 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
RxComputationThreadPool-1 | 356 | value = 1
RxComputationThreadPool-1 | 459 | value = 2
RxComputationThreadPool-1 | 561 | value = 3
RxComputationThreadPool-1 | 662 | value = 4
RxComputationThreadPool-1 | 766 | value = 5
RxComputationThreadPool-1 | 869 | value = 6
RxComputationThreadPool-1 | 972 | value = 7
RxComputationThreadPool-1 | 1072 | value = 8
RxComputationThreadPool-1 | 1176 | value = 9
RxComputationThreadPool-1 | 1276 | value = 10
RxComputationThreadPool-1 | 1380 | value = 11
RxComputationThreadPool-1 | 1485 | value = 12
RxComputationThreadPool-1 | 1586 | value = 13
RxComputationThreadPool-1 | 1688 | value = 14
RxComputationThreadPool-1 | 1793 | value = 15
RxComputationThreadPool-1 | 1895 | value = 16

이처럼 버퍼를 활용해 데이터를 훨씬 빠르게 다운스트림으로 발행하는 것을 알 수 있다. 거의 10배의 속도이다. 발행하는 속도도 이전보다 빨라졌고 더 많은 데이터를 발행한다. 데이터의 발행 속도가 워낙 빠르기 때문에 128개의 버퍼로는 모두 대응하기 어렵다.

배압 이슈에 대응하는 두 번째 방법은 onBackpressureDrop() 함수를 활용하는 것이다. onBackpressureBuffer() 함수가 버퍼를 만들어 쌓아 두었다가 처리하는 방식이라면, onBackpressureDrop() 함수는 버퍼가 가득 찼을 때, 이후 데이터를 그냥 무시한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class onBackPressureDropSample {
public static void main(String[] args) {
CommonUtils.exampleStart();

Flowable.range(1, 50000000)
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.subscribe(data -> {
CommonUtils.sleep(100);
Log.it(data);
}, error -> Log.e(error.getMessage()));

CommonUtils.sleep(20000);
}
}
// 결과
... 생략
RxComputationThreadPool-1 | 13004 | value = 124
RxComputationThreadPool-1 | 13105 | value = 125
RxComputationThreadPool-1 | 13207 | value = 126
RxComputationThreadPool-1 | 13309 | value = 127
RxComputationThreadPool-1 | 13411 | value = 128

버퍼에 128개의 데이터가 가득 찼을 때, 데이터를 계산 스케줄러에서 출력하기도 전에 예제가 끝난다. 따라서 계산 스케줄러에서 데이터를 다운스트림으로 발행할 수 있도록 충분한 시간(여기서는 20초)을 기다려줘야 한다. UI 프로그래밍(안드로이드)에서는 이와 같은 기다림이 필요하지 않다.

기본 버퍼 개수만큼만 버퍼에 저장하고 나머지는 모두 무시했기 때문에 128개의 데이터만 출력하고 종료한다.

마지막 방법은 onBackpressureLatest() 함수를 활용하는 것이다. 위의 두 함수의 기능을 섞을 것으로 데이터가 많이 쌓이면 무시하면서 최신의 데이터 즉, 마지막 데이터를 유지하는 것이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class onBackPressureLatest {
public static void main(String[] args) {
CommonUtils.exampleStart();

Flowable.range(1, 50000000)
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(data -> {
CommonUtils.sleep(100);
Log.it(data);
}, error -> Log.e(error.getMessage()));

CommonUtils.sleep(20000);
}
}
// 결과
RxComputationThreadPool-1 | 13416 | value = 127
RxComputationThreadPool-1 | 13518 | value = 128
RxComputationThreadPool-1 | 13622 | value = 50000000

함수만 교체해주었고, 결과는 위와 같다. 버퍼가 꽉찼을 때, 데이터를 무시하면서 마지막 데이터를 다운 스트림으로 발행하는 것을 확인할 수 있다.