TestObserver 클래스

  • RxJava에서 제공하는 TestObserver 클래스다.
  • JUnit 기반의 테스트 코드와 다른 점은 test()와 assertResult() 함수다.
  • 코드는 다음과 같다.
1
2
3
4
5
6
7
8
9
10
11
12
13
public class JUnitBasic {
@Test
public void testGetShapeObservable() {
String[] data = {"1", "2-R", "3-T"};
// source : 실제 결과.
Observable<String> source = Observable.fromArray(data)
.map(Shape::getShape);

// expected : 예상 결과.
String[] expected = {Shape.BALL, Shape.RECTANGLE, Shape.TRIANGLE};
source.test().assertResult(expected);
}
}
  • assertResult() : 예상된 결과와 실제 결과를 비교하는 메소드. JUnit의 assertEquals() 메소드와 같다.
  • assertFailure() : Observable에서 기대했던 에러가 발생하는지 확인하는 코드다. 만약, 기대했던 에러가 발생하지 않으면 테스트 코드 실행은 실패한다.
    • 총 3개의 값을 넣어 앞 두 번째 값까지는 정상적으로 발행하고 마지막 값에서 기대했던 예외가 발생하는지 확인한다.
    • 세번째 데이터는 %를 붙여서 Integer.parseInt()에서 변환이 안되기 때문에 NumberFormatException이 발생하고 onError 이벤트로 종료된다.
1
2
3
4
5
6
7
8
@Test
public void assertFailureExample() {
String[] data = {"100", "200", "%300"};
Observable<Integer> source = Observable.fromArray(data)
.map(Integer::parseInt);

source.test().assertFailure(NumberFormatException.class, 100, 200);
}
  • assertFailureAndMessage() : 기대했던 에러 발생시 에러 메시지까지 확인할 수 있다.
    • 에러 메시지를 확인하기 위한 message 인자가 추가되었다.
    • 에러가 발생했을 때 메시지를 확인하기 위해 아래와 같은 구문을 추가한다.
1
2
3
4
5
6
7
8
9
@Test
public void assertFailureAndMessage() {
String[] data = {"100", "200", "%300"};
Observable<Integer> source = Observable.fromArray(data)
.map(Integer::parseInt);

source.test().assertFailureAndMessage(NumberFormatException.class,
"For input string : \"%300\"", 100, 200);
}
  • awaitDone() : interval() 함수처럼 비동기로 동작하는 Observable 코드를 테스트할 수 있다.
  • assertComplete() : Observable을 정상적으로 완료했는지(onComplete 이벤트) 확인한다.

비동기 코드 테스트

  • RxJava는 다양한 상황에서 비동기 코드를 직관적으로 작성할 수 있다. 하지만, 비동기 코드를 테스트하는 것은 어려우므로 RxJava는 비동기로 동작하는 코드를 테스트할 방법을 제공한다.
  • Observable.interval() 메소드는 main 스레드가 아닌 계산 스케줄러에서 실행되기 때문에 비동기 코드를 테스트할 필요가 있다. awaitDone() 함수를 사용하면 된다.
  • awaitDone() 함수는 test() 함수가 실행되는 스레드에서 onComplete() 함수를 호출할 때까지 기다려준다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class TestAsync {
@Test
public void testInterval() {
Observable<Integer> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
.take(5)
.map(Long::intValue);

source.doOnNext(Log::d)
.test()
.awaitDone(1L, TimeUnit.SECONDS)
.assertResult(0, 1, 2, 3, 4);

}
}

HTTP 서버와 통신하는 코드를 테스트하는 것은 소스 코드만 봐도 충분히 이해할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void testHttp() {
final String url = "http://api.github.com/users/yudong80";
Observable<String> source = Observable.just(url)
.subscribeOn(Schedulers.io())
.map(OkHttpHelper::get)
.doOnNext(Log::d) // json 을 로그로 찍는다.
.map(json -> GsonHelper.parseValue(json, "name"))
.observeOn(Schedulers.newThread());

// json 중에서 name 만 뽑은 것을 로그로 찍는다.
String expected = "Dong Hwan Yu";
source.doOnNext(Log::i)
.test()
.awaitDone(3, TimeUnit.SECONDS)
.assertResult(expected);
}
  • HTTP 호출은 IO 스케줄러에서 실행되었고 JSON 파싱 결과는 뉴 스레드 스케줄러에서 출력한다. UI 프로그래밍을 할 때는 뉴 스레드 스케줄러 대신 UI 스레드로 변경하면 된다.

Flowable 클래스

  • 배압 이슈를 위해 별도로 분리한 클래스다.
  • Flowable 클래스를 도입한 이유는 Observable 클래스의 성능을 향상시키기 위해서다.
  • 기존의 Observable 클래스(배압 관련 함수들을 포함했었다.)는 배압에 관한 처리가 불필요한 경우에는 초기 로딩 때문에 약간의 오버헤드가 있었지만, RxJava 2.X의 Observable 클래스에는 배압으로 인한 성능 오버헤드가 사라졌다.
  • Flowable -> Observable로 변환하는 것뿐만 아니라 반대도 어렵지 않다.

Observabler과 Flowable의 선택 기준

1. Observable을 사용해야 할 때

  • 최대 1000개 미만의 데이터 흐름.
  • 예를 들어, 응용 프로그램에서 OOM이 발생할 확률이 거의 없는 경우다
  • 마우스 이벤트나 터치 이벤트를 다루는 GUI 프로그래밍. 이 경우에는 배압의 이슈가 거의 발생하지 않는다. Observable로는 초당 1000회 이하의 이벤트를 다루는데 이때 sample()이나 debounce() 같은 흐름 제어 함수를 활용하면 된다.
  • 데이터 흐름이 본질적으로 동기 방식이지만, 프로젝트에서 사용하는 플랫폼이 자바 Stream API나 그에 준하는 기능을 제공하지 않을 때, Observable은 보통 Flowable과 비교했을 때 성능 오버헤드가 낮다.

2. Flowable을 사용해야 할 때

  • 특정 방식으로 생성된 1000개 이상의 데이터를 처리하는 경우. 이때 메소드 체인에서 데이터 소스에 데이터 개수 제한을 요청해야 한다.
  • 디스크에서 파일을 읽어 들일 경우, 본질적으로 블로킹 I/O 방식을 활용하고 내가 원하는 만큼 가져오는 방식(pull-based)으로 처리해야 하기 때문이다. 예를 들면, 특정 단위로 잘라 몇 행씩 가져오도록 제어할 수 있다.
  • JDBC를 활용해 데이터베이스의 쿼리 결과를 가져오는 경우, 블로킹 방식을 이용하므로 ResultSet.next()를 호출하는 방식으로 쿼리의 결과를 읽어오도록 제어할 수 있다.
  • 네트워크 I/O를 실행하는 경우, 네트워크나 프로토콜을 통해 서버에서 가져오길 원하는 만큼의 데이터양을 요청할 수 있을 때이다.
  • 다수의 블로킹 방식을 사용하거나 가져오는 방식(pull-based)의 데이터 소스가 미래에는 논 블로킹 방식의 리액티브 API나 드라이버를 제공할 수도 있는 경우다.

디스크에서 파일 읽기, JDBC를 활용한 데이터베이스 쿼리하기, 네트워크 I/O 등은 차가운 Observable(구독자가 구독하면 데이터의 처음부터 모두 발행하는 Observable)에 해당한다. 이는 결과 데이터를 처리할 수 있는만큼 조금씩 가져오는 것이 아니라 한 번에 모두 가져온다. 따라서 이 경우에는 반드시 Flowable을 활용해야 하는 것은 아니다.

업스트림에서 발생하는 데이터의 속도와 다운스트림에서 처리하는 속도의 차이가 작다면 Observable을 활용해도 된다. 즉, 데이터 발행과 처리 속도가 차이나더라도 먼저 sample(), throttle(), debounce() 같은 흐름 제어 함수를 활용해 해결하는 것이 좋다. 이러한 함수로도 해결하기 어려울 때는 Flowable 클래스로 전환하면 된다.

Flowable을 활용한 배압 이슈 대응

  • onBackpressureBuffer() : 배압 이슈가 발생했을 때 별도의 버퍼에 저장한다. Flowable 클래스는 기본적으로 128개의 버퍼가 있다.
  • onBackpressureDrop() : 배압 이슈가 발생했을 때 해당 데이터를 무시한다.
  • onBackpressureLatest() : 처리할 수 없어서 쌓이는 데이터를 무시하면서 최신의 데이터만 유지한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FlowableSample {
public static void main(String[] args) {
CommonUtils.exampleStart();

PublishSubject<Integer> subject = PublishSubject.create();
subject.observeOn(Schedulers.computation())
.subscribe(data -> {
CommonUtils.sleep(100); // 100ms 후에 데이터 처리.
Log.it(data);
}, err -> Log.e(err.toString()));

// 뜨거운 Observable 로 50,000,000개의 데이터를 연속으로 발행함.
for (int i = 0; i < 50000000; i++) {
subject.onNext(i);
}

subject.onComplete();
}
}

PublishSubject 객체를 생성한 후, 처리 결과는 계산 스케줄러로 전달한다. subscribe() 함수를 호출한 후 Subject 객체가 발행한 데이터는 100ms 후에 로그를 찍는다.

한편 PublishSubject 객체는 뜨거운 Observable이다. 데이터를 발행하는 속도와 데이터를 처리하는 속도의 차이가 발생했을 때 어떠한 보호 장치도 없다. 결과는 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
RxComputationThreadPool-1 | 604 | value = 0
RxComputationThreadPool-1 | 742 | value = 1
RxComputationThreadPool-1 | 3172 | value = 2
RxComputationThreadPool-1 | 5158 | value = 3
RxComputationThreadPool-1 | 7426 | value = 4
RxComputationThreadPool-1 | 7528 | value = 5
RxComputationThreadPool-1 | 8017 | value = 6
RxComputationThreadPool-1 | 8570 | value = 7
RxComputationThreadPool-1 | 9180 | value = 8
RxComputationThreadPool-1 | 15089 | value = 9
RxComputationThreadPool-1 | 15675 | value = 10

처리 결과를 보면 100ms 간격보다 상당히 느리게 데이터를 처리한다. 그리고 데이터는 반복문을 통해서 PublishSubject 객체에서 매우 빠르게 발행되는데 데이터는 겨우 10개만 처리되었다. 만약, 발행하는 데이터의 개수가 훨씬 많아지면 JVM은 곧 OOM 예외를 발생하고 실행을 중단할 것이다. 이런 배압 이슈가 발생했을 때 Flowable 클래스를 활용한다.

위와 같은 배압 이슈에 대응하기 위해서 첫 번째 방법은 사용해보자.

  • 버퍼 만들기
  • onBackpressureBuffer() 함수에는 다음과 같은 오버로딩이 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 기본값(128)의 버퍼 개수가 있다.
public final Flowable<T> onBackpressureBuffer()

// delayError 여부를 지정할 수 있다.
// true : 예외가 발생했을 때 버퍼에 쌓인 데이터를 모두 처리할 때가지 예외를 던지지 않는다.
// false : 예외가 발생했을 때 바로 다운스트림에 예외를 던진다.
// 기본값은 false다.
public final Flowable<T> onBackpressureBuffer(boolean delayError)

// capacity 인자로 버퍼의 개수를 지정한다.
// onOverflow 인자에 버퍼가 넘쳤을 때 실행할 동작을 지정한다.
public final Flowable<T> onBackpressureBuffer(int capacity, Action onOverflow)

// 버퍼가 가득찼을 때 추가로 실행하는 전략을 지정할 수 있다.
public final Flowable<T> onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy)

지정할 수 있는 전략은 아래와 같다.

  • ERROR : MissingBackpressureException 예외를 던지고 데이터 흐름을 중단한다.
  • DROP_LATEST : 버퍼에 쌓여있는 최근 값을 제거한다.
  • DROP_OLDEST : 버퍼에 쌓여있는 가장 오래된 값을 제거한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class onBackPressureBufferSample {
public static void main(String[] args) {
CommonUtils.exampleStart();

Flowable.range(1, 50000000)
.onBackpressureBuffer(128, () -> {
}, BackpressureOverflowStrategy.DROP_OLDEST)
.observeOn(Schedulers.computation())
.subscribe(data -> {
CommonUtils.sleep(100);
Log.it(data);
}, error -> Log.e(error.getMessage()));
}
}

Flowable.range() 함수를 활용해 동일한 개수의 데이터를 발행한다. 그리고 128개의 버퍼를 생성한 후 버퍼가 넘치면 버퍼의 가장 오래된 데이터를 버리도록 지정한다. 결과는 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
RxComputationThreadPool-1 | 356 | value = 1
RxComputationThreadPool-1 | 459 | value = 2
RxComputationThreadPool-1 | 561 | value = 3
RxComputationThreadPool-1 | 662 | value = 4
RxComputationThreadPool-1 | 766 | value = 5
RxComputationThreadPool-1 | 869 | value = 6
RxComputationThreadPool-1 | 972 | value = 7
RxComputationThreadPool-1 | 1072 | value = 8
RxComputationThreadPool-1 | 1176 | value = 9
RxComputationThreadPool-1 | 1276 | value = 10
RxComputationThreadPool-1 | 1380 | value = 11
RxComputationThreadPool-1 | 1485 | value = 12
RxComputationThreadPool-1 | 1586 | value = 13
RxComputationThreadPool-1 | 1688 | value = 14
RxComputationThreadPool-1 | 1793 | value = 15
RxComputationThreadPool-1 | 1895 | value = 16

이처럼 버퍼를 활용해 데이터를 훨씬 빠르게 다운스트림으로 발행하는 것을 알 수 있다. 거의 10배의 속도이다. 발행하는 속도도 이전보다 빨라졌고 더 많은 데이터를 발행한다. 데이터의 발행 속도가 워낙 빠르기 때문에 128개의 버퍼로는 모두 대응하기 어렵다.

배압 이슈에 대응하는 두 번째 방법은 onBackpressureDrop() 함수를 활용하는 것이다. onBackpressureBuffer() 함수가 버퍼를 만들어 쌓아 두었다가 처리하는 방식이라면, onBackpressureDrop() 함수는 버퍼가 가득 찼을 때, 이후 데이터를 그냥 무시한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class onBackPressureDropSample {
public static void main(String[] args) {
CommonUtils.exampleStart();

Flowable.range(1, 50000000)
.onBackpressureDrop()
.observeOn(Schedulers.computation())
.subscribe(data -> {
CommonUtils.sleep(100);
Log.it(data);
}, error -> Log.e(error.getMessage()));

CommonUtils.sleep(20000);
}
}
// 결과
... 생략
RxComputationThreadPool-1 | 13004 | value = 124
RxComputationThreadPool-1 | 13105 | value = 125
RxComputationThreadPool-1 | 13207 | value = 126
RxComputationThreadPool-1 | 13309 | value = 127
RxComputationThreadPool-1 | 13411 | value = 128

버퍼에 128개의 데이터가 가득 찼을 때, 데이터를 계산 스케줄러에서 출력하기도 전에 예제가 끝난다. 따라서 계산 스케줄러에서 데이터를 다운스트림으로 발행할 수 있도록 충분한 시간(여기서는 20초)을 기다려줘야 한다. UI 프로그래밍(안드로이드)에서는 이와 같은 기다림이 필요하지 않다.

기본 버퍼 개수만큼만 버퍼에 저장하고 나머지는 모두 무시했기 때문에 128개의 데이터만 출력하고 종료한다.

마지막 방법은 onBackpressureLatest() 함수를 활용하는 것이다. 위의 두 함수의 기능을 섞을 것으로 데이터가 많이 쌓이면 무시하면서 최신의 데이터 즉, 마지막 데이터를 유지하는 것이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class onBackPressureLatest {
public static void main(String[] args) {
CommonUtils.exampleStart();

Flowable.range(1, 50000000)
.onBackpressureLatest()
.observeOn(Schedulers.computation())
.subscribe(data -> {
CommonUtils.sleep(100);
Log.it(data);
}, error -> Log.e(error.getMessage()));

CommonUtils.sleep(20000);
}
}
// 결과
RxComputationThreadPool-1 | 13416 | value = 127
RxComputationThreadPool-1 | 13518 | value = 128
RxComputationThreadPool-1 | 13622 | value = 50000000

함수만 교체해주었고, 결과는 위와 같다. 버퍼가 꽉찼을 때, 데이터를 무시하면서 마지막 데이터를 다운 스트림으로 발행하는 것을 확인할 수 있다.

Comment and share

흐름 제어

  • 흐름 제어는 Observable이 데이터를 발행하는 속도와 옵저버가 데이터를 받아서 처리하는 속도 사이의 차이가 발생할 때 사용하는 함수이다.
  • RxJava는 Observable이 데이터의 흐름을 push하는 방식으로 동작하기 때문에 위의 문제에 대해서 대처할 수 있어야 한다.

sample()

  • 특정한 시간 동안 가장 최근에 발행한 데이터만 걸러준다. 즉, 최근에 발행된 데이터만 넘겨주고 나머지는 무시한다.
  • 해당 시간에는 아무리 많은 데이터가 들어와도 해당 구간의 마지막 데이터만 발행하고 나머지는 무시한다.
1
2
3
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> sample(long period, TimeUnit unit)
public final Observable<T> sample(long period, TimeUnit unit, boolean emitLast)
  • emitLast 인자는 sample() 함수의 데이터 발행이 완료되지 않고 마지막에 데이터가 남아 있을 때, 해당 데이터를 발행할 것인지 결정한다. true로 설정하면 마지막 데이터를 발행한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class sampleTest {
public static void main(String[] args) {
String[] data = {"1", "7", "2", "3", "6"};

// 시간 측정용.
CommonUtils.exampleStart();

// 앞의 4개의 데이터는 100ms 간격으로 발행.
Observable<String> earlySource = Observable.fromArray(data)
.take(4)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 마지막 데이터는 300ms 후에 발행.
Observable<String> lateSource = Observable.just(data[4])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 2개의 Observable 을 결합하고 300ms 로 샘플링.
Observable<String> source = Observable.concat(earlySource, lateSource)
.sample(300L, TimeUnit.MILLISECONDS);

source.subscribe(Log::it);

CommonUtils.sleep(1000);
}
}
// 결과
RxComputationThreadPool-1 | 552 | value = 7
RxComputationThreadPool-1 | 849 | value = 3
  • 먼저, 100ms 간격으로 data 배열에 있는 데이터 4개를 발행한다. 그리고 마지막 데이터인 6을 300ms 후에 발행한다.
  • 또한 내가 원하는 특정 시간 후에 발행하기 위해 concat() 함수를 호출해 2개의 데이터 흐름(Observable)을 결합했다. 이렇게 전체 데이터 흐름을 세부 데이터 흐름으로 나누면 코드의 가독성이 좋아진다.
  • sample() 함수는 300ms 간격으로 수행한다. 매 300ms 마다 가장 최근에 들어온 값만 최종적으로 발행한다.
  • 처음에 데이터를 발행하기 위해 약간의 지연 시간이 있어서(100ms+a) 다이어그램을 보면 시작할 때 약간의 간격이 있음을 볼 수 있다. 다음 데이터 발행이 3XXms이기 때문에 300ms일 때는 가장 최근 데이터가 이전에 발행했던 7이 되는 것이다.(처음 기준!)
  • 마지막 인자를 true로 설정하면 마지막 데이터를 발행한다. 기본값이 false이다.

buffer()

  • 일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해준다.
  • 따라서 넘치는 데이터 흐름을 제어할 필요가 있을 때 활용한다. 컴퓨터의 버퍼 같은 기능을 한다.
  • 처음에 빨,노,초 원을 발행하면 그것을 모아서 List 객체에 전달해준다. 그 다음 다시 하늘,파,보 원이 모이면 그것을 모아서 한 번에 발행해준다. 매우 단순한 로직이다.
  • 함수의 원형은 다음과 같다.
    • 기본적으로 스케줄러 없이 현재 스레드에서 동작하며 입력되는 값을 count에 저장된 수만큼 모아서 List에 한꺼번에 발행한다.
1
2
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<List<T>> buffer(int count)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class bufferSample {
public static void main(String[] args) {
String[] data = {"1", "2", "3", "4", "5", "6"};
CommonUtils.exampleStart();

// 앞의 3개는 100ms 간격으로 발행.
Observable<String> earlySource = Observable.fromArray(data)
.take(3)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 가운데 1개는 300ms 후에 발행.
Observable<String> middleSource = Observable.just(data[3])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 마지막 2개는 100ms 후에 발행.
Observable<String> lataSource = Observable.just(data[4], data[5])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);


Observable<List<String>> source = Observable.concat(earlySource, middleSource, lataSource)
.buffer(3);

source.subscribe(Log::it);
CommonUtils.sleep(1000);
}
}
// 결과
RxComputationThreadPool-1 | 562 | value = [1, 2, 3]
RxComputationThreadPool-3 | 1067 | value = [4, 5, 6]
  • buffer(3)는 데이터를 3개씩 모았다가 List에 채운 후 값을 한꺼번에 발행해준다. 그래서 위의 결과를 보면 확인 가능하다.
  • buffer() 함수에는 모으거나(count) 무시할(skip) 데이터 개수를 입력할 수 있다.
  • skip 변수는 count보다 값이 커야 한다. count가 2이고 skip이 3이면 2개 데이터를 모으고 3번째 데이터 1개는 스킵한다.
  • 코드는 위와 같으면 buffer(2,3)으로 호출하면 아래와 같은 결과를 얻을 수 있다.
1
2
RxComputationThreadPool-1 | 562 | value = [1, 2]
RxComputationThreadPool-3 | 1067 | value = [4, 5]
  • Observable에서 onNext 이벤트가 발생하면 내부 데이터는 3개가 아니라 2개 값을 모아 바로 List에 채운 후 구독자에게 데이터를 발행한다.

throttleFirst(), throttleLast()

  • throttle는 조절판이라는 뜻이다. 그것에 맞게 throttleFirst() 함수는 주어진 조건에서 가장 먼저 입력된 값을 발행한다. throttleLast() 함수는 주어진 조건에서 가장 마지막에 입력된 값을 발행한다.
  • throttleFirst()와 throttleLast()는 정반대의 의미가 아니다. throttleFirst() 함수는 어떤 데이터가 입력된 후 일정 시간 동안 다른 데이터가 발행되지 못하도록 방지하지만, throttleLast() 함수는 sample() 함수처럼 고정된 시간 간격안에서 마지막 데이터만 발행한다.
  • throttleFirst() : sample() 함수와 비슷하지만 다르다. sample() 함수가 주어진 시간 동안 입력된 마지막 값을 발행한다면 throttleFirst() 함수는 어떤 데이터를 발행하면 지정된 시간 동안 다른 데이터를 발행하지 않도록 막는다.
  • throttleLast() : sample() 함수와 기본 개념은 동일하다. 주어진 시간 동안 입력된 값 중 마지막 값을 발행한다.
  • 함수 원형은 다음과 같다.
    • 계산 스케줄러에서 실행한다. 즉, 비동기로 동작하도록 설계된 함수다.
    • windowDuration는 시간 간격을 지정하며 unit은 시간의 단위다.
1
2
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> throttleFirst(long windowDuration, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class throttleFirstSample {
public static void main(String[] args) {
String[] data = {"1", "2", "3", "4", "5", "6"};
CommonUtils.exampleStart();

// 앞의 1개는 100ms 간격으로 발행.
Observable<String> earlySource = Observable.just(data[0])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 다음 1개는 300ms 후에 발행.
Observable<String> middlerSource = Observable.just(data[1])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 마지막 4개는 100ms 후에 발행.
Observable<String> lateSource = Observable.just(data[2], data[3], data[4], data[5])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a)
.doOnNext(Log::dt); // 디버깅 정보 출력.

Observable<String> source = Observable.concat(earlySource, middlerSource, lateSource)
.throttleFirst(200L, TimeUnit.MILLISECONDS);

source.subscribe(Log::it);
CommonUtils.sleep(1000);
}
}
// 결과
RxComputationThreadPool-1 | 371 | value = 1
RxComputationThreadPool-3 | 673 | value = 2
RxComputationThreadPool-4 | 779 | debug = 3
RxComputationThreadPool-4 | 876 | debug = 4
RxComputationThreadPool-4 | 975 | debug = 5
RxComputationThreadPool-4 | 975 | value = 5
RxComputationThreadPool-4 | 1077 | debug = 6
  • 처음 100ms가 지난 후에 1을 발행한 후, 300ms 동안 기다린 다음 2를 발행한다. 그리고 100ms 간격으로 나머지 값들을 발행한다. 마지막으로 throttleFirst() 함수를 호출해 200ms 간격으로 타임 윈도에 맨 먼저 입력된 값을 발행한다.
  • 위에서는 1,2,4,6이 다운 스트림으로 발행된다.

window()

  • groupBy() 함수와 개념적으로 비슷하다.
  • throttleFirst()나 sample() 함수처럼 내가 처리할 수 있는 일부의 값들만 받아들일 수 있다. 흐름 제어 기능에 groupBy() 함수와 비슷한 별도의 Observable 분리 기능을 모두 갖추었다고 생각하면 된다.
  • count를 인자로 받는다. 예를 들어, 3을 인자로 받으면 앞으로 데이터 3개가 발행될 때마다 새로운 Observable을 생성하겠다는 뜻이다.
  • 함수의 원형은 다음과 같다.
    • 현재 스레드를 그대로 활용한다. 왜 그런지 window() 함수의 다른 변형을 비교하면 알 수 있다.
1
2
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Observable<T>> window(long count)
1
2
3
4
5
6
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<Observable<T>> window(
long timespan, long timeskip, TimeUnit unit
){
// 생략.
}
  • count만을 인자로 갖는 window() 함수는 입력된 값을 그대로 발행하기 때문에 비동기 작업이라고 보기 어렵다.
  • 위의 함수 원형에는 timespan이라는 시간 동안 입력된 값 중에서 일부를 무시하는 기능을 포함한다.
  • 어떤 필터링 작업을 해줘야 하기 때문에 계산 스케줄러를 활용하게 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class windowSample {
public static void main(String[] args) {
String[] data = {"1", "2", "3", "4", "5", "6"};

CommonUtils.exampleStart();

// 앞의 3개는 100ms 간격으로 발행.
Observable<String> earlySource = Observable.fromArray(data)
.take(3)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 가운데 1개는 300ms 후에 발행.
Observable<String> middleSource = Observable.just(data[3])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 마지막 2개는 100ms 후에 발행.
Observable<String> lateSource = Observable.just(data[4], data[5])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 데이터 3개씩 모아서 새로운 Observable 생성.
Observable<Observable<String>> source = Observable.concat(earlySource, middleSource, lateSource)
.window(3);

source.subscribe(observable -> {
Log.dt("New Observable Started!!");
observable.subscribe(Log::it);
});

CommonUtils.sleep(1000);
CommonUtils.exampleComplete();

}
}
// 결과
RxComputationThreadPool-1 | 365 | debug = New Observable Started!!
RxComputationThreadPool-1 | 366 | value = 1
RxComputationThreadPool-1 | 461 | value = 2
RxComputationThreadPool-1 | 560 | value = 3
RxComputationThreadPool-2 | 861 | debug = New Observable Started!!
RxComputationThreadPool-2 | 861 | value = 4
RxComputationThreadPool-3 | 963 | value = 5
RxComputationThreadPool-3 | 1062 | value = 6
  • window() 함수의 인자로 3을 넣었다. 처음에 Observable을 생성하고 3개의 데이터를 전달받으면 새로운 Observable을 다시 생성하여 값을 발행한다.
  • 1 값을 발행할 때와 4 값을 발행할 때 각각 새로운 Observable이 생성되었다.

debounce()

  • 빠르게 연속 이벤트를 처리하는 흐름 제어 함수다.
  • 안드로이드와 같은 UI 기반의 프로그래밍에서는 유용하게 활용할 수 있다.
  • 예를 들어, 버튼을 빠르게 누르는 상황에서 마지막에 누른 이벤트만 처리해야할 때 간단하게 적용할 수 있다. RxJava를 이용하지 않는다면 마지막에 버튼을 누른 시간을 멤버 변수에 저장하고 일정 시간 동안 if문으로 예외 처리해야 하기 때문에 매우 번거롭고 실수할 가능성도 크다.
  • 첫 번째 원은 지정한 시간 간격 안에 들어왔고 다른 이벤트는 없어서 그대로 발행되었다. 두 번째 원의 경우 시간 간격 안에 세 번째 원이 다시 들어왔으므로 두 번째가 아닌 세 번째 원을 발행한다. 마지막도 마찬가지다.
  • 함수의 원형은 다음과 같다.
    • 계산 스케줄러에서 동작한다.
    • 어떤 이벤트가 입력되고 timeout에서 지정한 시간 동안 추가 이벤트가 발생하지 않으면 마지막 이벤트를 최종적으로 발행한다.
1
2
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> debounce(long timeout, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class debounceSample {
public static void main(String[] args) {
String[] data = {"1", "2", "3", "5"};
Observable<String> source = Observable.concat(
Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> data[0]),
Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> data[1]),
Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> data[2]),
Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> data[3])
).debounce(200L, TimeUnit.MILLISECONDS);

source.subscribe(Log::i);
CommonUtils.sleep(1000);
}
}
// 결과
RxComputationThreadPool-2 | value = 1
RxComputationThreadPool-2 | value = 3
RxComputationThreadPool-2 | value = 5
  • 데이터를 발행하는 부분이 특이하다. 각각의 시간 간격이 서로 다르기 때문에 concat() 함수를 활용해 각가 데이터를 발행했다.
  • timer() 함수는 이벤트를 한 번만 발생시키고 완료하기 때문에 concat()과 timer() 함수의 조합은 유용하다.
  • debounce()를 활용해 어떤 이벤트가 입력되고 지정된 timeout인 200ms 안에 더 이상의 이벤트가 없으면 마지막에 입력된 값을 발행한다.
  • 여기서 이해가 잘 안되었던 부분이 있었다. 결과를 예상해봤을 때, 1,2,5라고 생각했지만 아니었다. 왜냐면 정의 자체가 어떤 이벤트가 입력되고 나서 timout 내에 더 이상의 이벤트가 없으면 마지막 이벤트를 발행하는 것이다. 그러니까 어떤 이벤트가 입력되고 나서 timout 내에 이벤트가 있는지 찾는 것이다. 그래서 결과는 1,3,5가 맞다.

Comment and share

해당 포스팅 글이 로컬에서만 보이는 문제로 인하여 재업로드한 글입니다.

RxJava와 관련된 내용은 RxJava 프로그래밍 책을 구매하여 공부하면서 참고하였습니다. 앞으로 작성하는 RxJava 글은 위의 책으로 공부하면서 정리한 내용입니다. 문제가 된다면 해당 게시글을 삭제하도록 하겠습니다. 아래 글은 제가 책을 보고 공부한 내용과 참고한 내용으로 작성되었으므로 정확하지 않을 수 있으니 깃헙에 있는 이메일로 연락주시면 감사하겠습니다.

리액티브 프로그래밍

리액티브 프로그래밍(Reative Programming)은 반응형 프로그래밍이라고도 한다. 데이터 의 흐름과 전달에 관한 프로그래밍 패러다임이다. 기존의 명령형 프로그래밍은 주로 컴퓨터 하드웨어를 대상으로 프로그래머가 작성한 코드가 정해진 절차에 따라 순서대로 실행된다. 리액티브 프로그래밍은 데이터 흐름을 먼저 정의하고 데이터가 변경되었을 때 연관되는 수식이나 함수가 업데이트되는 방식이다.

가장 쉽게 이해할 수 있는 예는 MS의 엑셀(즉, 스프레드 시트)이다. 엑셀에서 값을 변경했을 때 자동으로 반영되는 것이 이러한 예를 설명한다.

  • 기존의 명령형 프로그래밍 <–> 반응형 프로그래밍(즉, 리액티브 프로그래밍)
  • 명령형 프로그래밍 방식은 변경이 발생했다는 통지를 받아서 연말 매출액을 새로 계산하는 당겨오는(pull) 방식이지만, 리액티브 프로그래밍은 데이터 소스가 변경된 데이터를 밀어주는(push 방식이다. 일종의 옵저버 패턴이라고 생각하면 된다.
  • 명령형 프로그래밍의 반대말은 선언형 프로그래밍이라고도 한다.
    • ex) SQL.
    • 반응형 프로그래밍은 선언형 프로그래밍을 지향한다.

자바언어와 리액티브 프로그래밍

  • 기존의 pull 방식의 프로그래밍 개념 -> push 방식의 프로그래밍 개념으로 바뀜.
  • 함수형 프로그래밍의 지원을 받는다.

리액티브 프로그래밍에서는 데이터의 변화가 발생했을 때 변경이 발생한 곳(데이터 소스)에서 새로운 데이터를 보내(push 방식) 준다. 기존 자바 프로그래밍이 pull 방식이라면 리액티브 프로그래밍은 push 방식이다.

한편 우리가 아는 콜백이나 옵저버 패턴을 넘어서 RxJava 기반의 리액티브 프로그래밍이 되려면 함수형 프로그래밍이 필요하다. 콜백이나 옵저버 패턴은 옵저버가 1개이거나 단일 스레드 환경에서는 문제가 없지만, 멀티 스레드 환경에서는 사용시 많은 주의가 필요하다.
대표적인 예가 데드락동기화문제이다.

추가적으로 함수형 프로그래밍은 부수 효과(side effect)가 없다. 부수 효과란 콜백이나 옵저버 패턴이 스레드에 안전하지 않은 이유가 같은 자원에 여러 스레드가 경쟁 조건(race condition)에 빠지게 되었을 때 예측할 수 없는 잘못된 결과가 나오는 것을 말한다. 한 두개의 스레드가 있을 때는 잘 동작하다가 수십, 수백개의 스레드가 동시에 단일 자원에 접근하면 계산 결과가 꼬이게 되고 디버깅도 어려워진다.

함수형 프로그래밍은 부수 효과가 없는 순수 함수(pure function)를 지향한다. 따라서 멀티 스레드 환경에서도 안전하다. 자바를 사용해 리액티브 프로그래밍을 하기 위해서는 함수형 프로그래밍의 도움이 필요하다.

그럼 여기서 말하는 함수형 프로그래밍은 무엇을 말하는 걸까?

간략하게 말하면 함수형 프로그래밍 언어는 함수를 단지 호출하는 대상이 아닌 변수로도 할 수 있고 인자로도 넘길 수 있고 마음대로 지지고 볶고? 할 수 있다. (아직 개념이 정확히 잡히지 않아서. . ㅜㅜ) 어려운 말로는 일급 시민이라고 표현한다고 한다.

반응형 프로그래밍은 이런 함수형 언어의 도구들을 자유자재로 활용해야 한다. 예를 들어 Java 8에 도입된 람다 표현식은 반드시 알아야 한다.

리액티브 프로그래밍 개념 다시 잡아보자.

RxJava를 비롯해서 리액티브 프로그래밍을 공부하다 보면 새로 등장하는 개념으로 인해 많은 혼란을 겪는다고 한다. 일단, 프로그래밍 스타일이 너무 다르다. 자바는 객체 지향 언어인데 리액티브 프로그래밍은 뭔가 좀 다른 것 같다. 그리고 내가 문제를 바라보는 개념도 이전과는 달라야 하는 것 같다. 이 부분은 아직 감이 안잡히지만 천천히 잡아보도록 하겠다.

아무튼 어렵고 새로운 개념은 초반에 잘 잡아놓으면 나중에 공부할 때 도움이 많이 되기 때문에 여기서 잘 잡아서 앞으로 나아가자. 다음은 위키 백과에 나와 있는 설명 중 일부분이다.

위키 백과의 일부

  • 상호 작용 프로그램은 프로그램이 주도하는 속도로 사용자 혹은 다른 프로그램과 상호작용 한다.
  • 사용자의 관점으로 볼 때 시분할 시스템은 상호작용 프로그램이다.
  • 리액티브 프로그래밍은 주변의 환경과 끊임없이 상호작용을 하는데 프로그램이 주도하는 것이 아니라 환경이 변하면 이벤트를 받아 동작한다.
  • 상호작용 프로그램은 자신의 속도에 맞춰 일하고 대부분 통신을 담당하는 반면 리액티브 프로그램은 외부 요구에 반응에 맞춰 일하고 대부분 정확한 인터럽트 처리를 담당한다.

자바에서는 이런 RxJava와 같은 리액티브 프로그래밍을 하기 위해서 기반이 마련되야 한다. 즉, 데이터 소스를 정의할 수 있고 그것의 변경 사항을 받아서 내 프로그램에 알려줄 존재(push)가 필요하다. 이를 RxJava 라이브러리를 통해서 구현할 수 있다.

RxJava를 만든 이유가 뭘까??

RxJava는 지금 우리가 아주 아주 잘 사용하는 [넷플릭스](https://www.netflix.com/kr/)의 기술 블로그에서 처음 소개되었다. 성능 개선을 위해서 넷플릭스는 .NET 환경의 리액티브 확장 라이브러리(Rx)를 JVM에 포팅(??)하여 RxJava를 만들었다. 넷플릭스가 RxJava를 만든 핵심적인 이유는 아래와 같다.

  1. 동시성을 적극적으로 끌어안기에 자바는 번거롭다.

자바가 동시성(? 정확히 뭐지…?)을 처리하기에 번거롭기 때문에 넷플릭스는 클라이언트의 요청을 서비스 계층에서 동시성을 적극적으로 끌어안음으로 이를 해결했다. 클라이언트의 요청을 처리할 때 다수의 비동기 실행 흐름(스레드 등등)을 생성하고 그것의 결과를 취합하여 최종 리턴하는 방식으로 내부 로직을 변경했다고 한다.

  1. 자바의 Future를 조합하기 어렵다?!

약 6년 전 (시간이 벌써 2019년이라니ㅜ.ㅜ) 내가 대학교 1학년일 때이다. 이 당시에 자바 8에서 제공하는 CompletableFuture 같은 클래스가 제공되지 않았다고 한다. 그래서 비동기 흐름을 조합할 수 있는 방법이 거의 없었고, RxJava에서는 이를 해결하려고 비동기 흐름을 조합(Compose)할 수 있는 방법을 제공한다.
RxJava에서 조합하는 실행 단위를 리액티브 연산자(Operator)라고 한다.

  1. 콜백 지옥 탈출!

콜백이 콜백을 부르는 콜백 지옥(Callback Hell) 상황은 코드의 가독성을 떨어트린다. 또한, 문제 발생 시 디버깅을 어렵게 만든다. 콜백은 비동기 방식으로 동작하는 가장 대표적인 패턴이다. 이런 지옥을 탈출하고자 RxJava에서는 콜백을 사용하지 않는 방향으로 설계하려고 했다.

정리

위에서 쭉 반응형 프로그래밍(즉, 리액티브 프로그래밍)은 비동기 데이터의 흐름과 전달에 초점을 맞춘 패러다임이다. 그리고 함수형 프로그래밍 언어의 지원을 받고 이를 활용한다고 했다.

정리하자면 반응형 프로그래밍은 함수형 프로그래밍 언어의 도구들을 가지고 데이터의 흐름을 Composable(구성 가능하게?!)하게 구현하는 것이라고 할 수 있다.

참고

Comment and share

디버깅

코딩하는 도중에 로그를 넣는 이유는 잘못되었을 때를 대처하기 위함이다. 하지만 RxJava 코드는 로그를 넣을 수 있는 공간이 없다. Observable로 시작하는 업스트림(upstream)과 그것을 받아서 처리하는 다운스트림(downstream)이 동일한 문장으로 이루어져 있기 때문이다. 즉, 전체 동작을 선언적으로 만들 수 있으므로 전체 맥락에 대한 가독성은 높아지지만 예외 코드를 어떻게 넣어야 하는지에 대한 어려움이 있다.

원래 함수형 프로그래밍은 함수의 부수 효과를 없도록 하는 것이 원칙이지만 doOnXXX() 계열 함수는 오히려 부수 효과를 일으켜서 내가 작성하는 코드가 문제없는지 알아볼 수 있게 도와준다.

이번 Chapter에서 알아보자.

doOnXXX() 함수

doOnNext(), doOnComplete(), doOnError() 3가지 함수는 Observable의 알림 이벤트에 해당한다. Observable에서 어떤 데이터를 발행할 때는 onNext, 중간에 에러가 발생하면 onError, 모든 데이터를 발행하면 onComplete 이벤트가 발생한다. 어떻게 보면 이 알림 이벤트를 위의 함수가 가로채서 디버깅을 할 수 있도록 도와주는 것이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class doOnXXX {
public static void main(String[] args) {
Observable.just("1", "3", "5")
.doOnNext(data -> Log.d("onNext()", data))
.doOnComplete(() -> Log.d("onComplete()"))
.doOnError(error -> Log.e("onError", error))
.subscribe(Log::i);
}
}
// 결과
main | onNext() | debug = 1
main | value = 1
main | onNext() | debug = 3
main | value = 3
main | onNext() | debug = 5
main | value = 5
main | debug = onComplete()

결과는 위와 같이 나온다. doOnNext(), doOnComplete(), doOnError() 함수를 사용해 로그를 출력해봤다. 모두 main 스레드에서 실행되었고, 실제로 Observable이 구독자에게 발행한 데이터는 value로 표시하였다.

하지만, doOnError() 함수의 동작을 보지 못했다. 다른 예제를 통해서 확인해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class doOnXXX {
public static void main(String[] args) {
Observable.just(10, 5, 0)
.map(divider -> 1000 / divider)
.doOnNext(data -> Log.d("onNext()", data))
.doOnComplete(() -> Log.d("onComplete()"))
.doOnError(error -> Log.e("onError()", error.getMessage()))
.subscribe(Log::i);
}
}
// 결과
main | onNext() | debug = 100
main | value = 100
main | onNext() | debug = 200
main | value = 200
main | onError() | error = / by zero
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading

어떤 수를 0으로 나누려고 하기 때문에 0 데이터가 발행될 때 에러가 발생하는 것을 볼 수 있다.

doOnEach() 함수

onNext, onComplete, onError 이벤트를 각각 처리하는 것이 아니라 한 번에 처리할 수 있기 때문에 편리하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class doOnEach {
public static void main(String[] args) {
Observable.just("ONE", "TWO", "THREE")
.doOnEach(noti -> {
if (noti.isOnNext()) {
Log.d("onNext()", noti.getValue());
}

if (noti.isOnComplete()) {
Log.d("onComplete()");
}

if (noti.isOnError()) {
Log.d("onError()", noti.getError().getMessage());
}
})
.subscribe(Log::i);
}
}
// 결과
main | onNext() | debug = ONE
main | value = ONE
main | onNext() | debug = TWO
main | value = TWO
main | onNext() | debug = THREE
main | value = THREE
main | debug = onComplete()

Notification 객체는 발생한 이벤트의 종류를 알 수 있는 boolean 타입의 isOnNext(), isOnComplete(), isOnError() 함수를 제공한다. onNext()의 경우는 getValue() 함수를 호출하면 발행된 값을 얻을 수 있다. onError() 함수의 경우 getError() 함수를 호출하면 Throwable 객체를 얻을 수 있다.

doOnEach() 함수는 오직 onNext, onComplete, onNext 이벤트만 처리한다. 그리고 람다식을 잘 활용하여 간결하 코드를 유지하도록 한다.

doOnSubscribe(), doOnDispose(), 기타 함수

Observable의 알림 이벤트 중에는 onSubscribe와 onDispose 이벤트도 있다. 각각 Observable을 구독했을 때와 구독 해지했을 때의 이벤트를 처리할 수 있다.

  • doOnSubscribe() : Observable을 구독했을 때 어떤 작업을 할 수 있다. 람다 표현식의 인자로는 구독의 결과로 나오는 Disposable 객체가 제공된다.
  • doOnDispose() : Observable의 구독을 해지했을 때 호출되며 인자는 Action 객체이다. 여러 스레드에서 Observable을 참조할 수 있기 때문에 Action 객체는 Thread-Safe하게 동작해야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class doOnSubscribe {
public static void main(String[] args) {
Observable<String> source = Observable.just("1", "3", "5", "2", "6")
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a)
.doOnSubscribe(data -> Log.d("onSubscribe()"))
.doOnDispose(() -> Log.d("onDispose"));

Disposable d = source.subscribe(Log::i);

CommonUtils.sleep(200);
d.dispose();
CommonUtils.sleep(300);
}
}
// 결과
main | debug = onSubscribe()
RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-1 | value = 3
main | debug = onDispose

한편 doOnSubscribe()와 doOnDispose() 함수를 각각 호출하지 않고 한번에 호출하는 함수인 doOnLifeCycle() 함수가 존재한다. 위의 코드에서 doOnSubscribe()와 doOnDispose() 함수를 빼고 doOnLifeCycle() 함수를 사용하면 된다. 결과는 같다.

또한, doOnTerminate() 함수는 Observable이 끝나는 조건이 onComplete 혹은 onError 이벤트가 발생했을 때 실행하는 함수이다. 정확하게는 onComplete() 혹은 onError() 이벤트 발생 직전에 호출된다.

다음 코드를 통해서 결과를 확인해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class doOnTerminate {

public static void main(String[] args) {
Observable.just("1", "3", "5", "7")
.doOnTerminate(() -> Log.d("onTerminate()"))
.doOnNext(data -> Log.d("onNext()", data))
.doOnComplete(() -> Log.d("onComplete()"))
.doOnError(error -> Log.d("onError()", error.getMessage()))
.subscribe(Log::i);
}
}
// 결과
main | onNext() | debug = 1
main | value = 1
main | onNext() | debug = 3
main | value = 3
main | onNext() | debug = 5
main | value = 5
main | onNext() | debug = 7
main | value = 7
main | debug = onTerminate()
main | debug = onComplete()

onComplete 이벤트가 발생하기 직전에 doOnTerminate() 함수가 호출되는 것을 확인할 수 있다.

예외 처리

자바에서는 예외를 처리할 때 try-catch문을 사용했지만, RxJava에서는 사용할 수 없다.
사용한다면 다음과 같은 에러를 만나게 된다.

1
OnErrorNotImplementedException

RxJava 내부에서 onError를 함수의 인자로 넘긴다. 따라서 try-catch문을 활용할 수가 없다. 추가로 함수 체인이나 Observable 내부에서 예외가 발생해도 onError 이벤트가 발생하고 try-catch 문으로는 해결할 수 없다.

onErrorRetrun() 함수

  • 에러도 어떠한 데이터로 보는 것이 적절하다.
  • 예외가 발생했을 때 에러를 의미하는 다른 데이터로 대체한다.
  • onError 이벤트는 데이터 흐름이 바로 중단되므로 subscribe() 함수를 호출할 때, onError 이벤트를 처리하는 것은 OOM 같은 중대한 에러가 발생했을 때만 활용한다.
  • 에러가 발생했을 때 내가 원하는 데이터로 대체할 수 있다.

위의 그림에서 앞의 3개의 데이터가 정상적으로 발행되고 마지막 데이터에서 에러가 발생하는 경우, onErrorReturn() 함수는 인자로 넘겼던 기본ㄱ밧을 대신 발행하고 onComplete 이벤트가 발생한다. onError() 이벤트는 발생하지 않는다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class onErrorReturn {
public static void main(String[] args) {
String[] grades = {"70", "60", "$100", "93", "83"};
Observable<Integer> source = Observable.fromArray(grades)
.map(data -> Integer.parseInt(data))
.onErrorReturn(e -> {
if (e instanceof NumberFormatException) {
e.printStackTrace();
}
return -1;
});

source.subscribe(data -> {
if (data < 0) {
Log.e("Wrong Data found!!");
return;
}

Log.i("Grade is " + data);
});
}
}
  • 에러를 onErrorReturn() 함수에서 처리하며 NumberFormatException 발생 시 -1을 리턴한다. subscribe() 함수는 성적 데이터를 처리하므로 0보다 커야 하낟. onErrorReturn() 함수에서 예외 발생 시 음수 값을 리턴했으므로 data가 0보다 작으면 에러 발생 여부를 판단하고 에러 로그를 출력한다.

onError 이벤트에서 예외를 처리하는 것과 다른 점

  1. 예외 발생이 예상되는 부분을 선언적으로 처리할 수 있다.
  2. Observable을 생성하는 측과 구독하는 측이 서로 다를 수 있다는 점이다.
    • 구독자는 Observable에서 발생할 수 있는 예외를 구독한 이후에 파악하는 것이 어렵다.
    • 다시 말하면 Observable에서 에러 가능성을 명시하지 않았는데 구독자가 필요한 예외 처리를 빠짐없이 하는 것은 어렵다는 뜻이다. 이럴때 Observable을 생성하는 측에서 발생하는 예외 처리를 미리 해두면 구독자는 선언된 예외 상황을 보고 그에 맞는 처리를 할 수 있다.
  • onErrorReturnItem() : onErrorReturn() 함수와 동일하지만 Throwable 객체를 인자로 전달하지 않기 때문에 코드는 좀 더 간결해진다. 즉, 가독성이 좋아진다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class onErrorReturn {
public static void main(String[] args) {
String[] grades = {"70", "60", "$100", "93", "83"};
Observable<Integer> source = Observable.fromArray(grades)
.map(data -> Integer.parseInt(data))
.onErrorRetrunItem(-1)

source.subscribe(data -> {
if (data < 0) {
Log.e("Wrong Data found!!");
return;
}

Log.i("Grade is " + data);
});
}
}

onErrorResumeNext() 함수

  • onErrorReturn(), onErrorReturnItem()은 에러가 발생한 시점에 특정 값으로 대체.
  • 에러가 발생했을 때, 내가 원하는 Observable로 대체하는 방법이다.
  • Observable로 대체한다는 것은 에러 발생 시 데이터를 교체하는 것뿐만 아니라 관리자에게 이메일을 보낸다던가 자원을 해제하는 등의 추가 작업을 해야할 때 유용하다.

에러가 발생했을 때, 특정 값을 원하는 Observable로 설정할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class onErrorResumeNext {
public static void main(String[] args) {
String[] salesData = {"100", "200", "A300"};
Observable<Integer> onParseError = Observable.defer(() -> {
Log.d("send email to administrator");
return Observable.just(-1);
}).subscribeOn(Schedulers.io());

Observable<Integer> source = Observable.fromArray(salesData)
.map(Integer::parseInt)
.onErrorResumeNext(onParseError);

source.subscribe(data -> {
if (data < 0) {
Log.e("Wrong Data Found!");
return;
}

Log.i("Sales data: " + data);
});
}
}
// 결과
main | value = Sales data: 100
main | value = Sales data: 200
RxCachedThreadScheduler-1 | debug = send email to administrator
RxCachedThreadScheduler-1 | error = Wrong Data Found!
  • 이처럼 에러가 발생했을 때 관리자에게 이메일을 보내고 '-1’이라는 데이터를 발행하는 Observable로 대체한다.
  • onParseError 변수는 subscribeOn() 함수를 호출하여 IO 스케줄러에서 실행한다. 이처럼 내가 원하는 코드를 실행하는 스케줄러를 선언적으로 지정할 수 있어 활용범위가 넓다.

retry() 함수

  • 예외 처리의 다른 방법은 재시도이다.
  • 예를 들어 서버와 통신할 때 인터넷이 일시적으로 안되거나 서버에 일시적인 장애가 발생하면 클라이언트에서는 일정 시간 후에 다시 통신을 요청하는 것이 필요하다. 이때 1개의 API가 아닌 다수의 API를 연속해서 호출해야 하는 경우 재시도하는 시나리오가 복잡해질 수도 있다.
  • 이런 것을 단순하게 처리할 수 있는 retry() 함수를 제공한다. onError 이벤트 발생 시 해당 처리를 재시도한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class RetrySample {
public static void main(String[] args) {
CommonUtils.exampleStart();

String url = "https://api.github.com/zen";
Observable<String> source = Observable.just(url)
.map(OkHttpHelper::getT)
.retry(5)
.onErrorReturn(e -> CommonUtils.ERROR_CODE);

source.subscribe(data -> {
Log.it("result: " + data);
});
}
}

retry() 함수의 실행횟수는 5회로 지정한다. 마지막으로 에러 발생시 ERROR_CODE를 반환한다. 재시도 동작을 확인하기 위해서는 인터넷 환경을 끊은 상태에서 테스트를 진행해야 한다. 결과는 다음과 같다.

1
2
3
4
5
6
7
main | 645 | error = api.github.com: nodename nor servname provided, or not known
main | 646 | error = api.github.com
main | 646 | error = api.github.com
main | 647 | error = api.github.com
main | 647 | error = api.github.com
main | 647 | error = api.github.com
main | 647 | value = result: -500
  • 5회의 재시도 후 최종 요청이 실패 처리되었다. getT() 함수를 통해서 api 접속을 시도하지만 예외가 발생해서 에러 로그를 찍는 부분으로 빠진다. 요청을 5번 시도하면서 계속 에러 로그를 찍고 그 후에 onErrorReturn() 함수에서 에러 코드를 반환하고 종료한다.
  • 위에서 실행 시간이 문제가 있다. 재시도를 할 때 지연 시간이 없이 바로 재시도하기 때문에 도움이 되지 않는다. 지연 시간을 설정해서 재시도를 해보자.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RetryDelaySample {
public static void main(String[] args) {
final int RETRY_MAX = 5;
final int RETRY_DELAY = 1000;

CommonUtils.exampleStart();

String url = "https://api.github.com/zen";
Observable<String> source = Observable.just(url)
.map(OkHttpHelper::getT)
.retry((retryCount, e) -> {
Log.e("retryCount: " + retryCount);
CommonUtils.sleep(RETRY_DELAY);

return retryCount < RETRY_MAX ? true : false;
})
.onErrorReturn(e -> CommonUtils.ERROR_CODE);

source.subscribe(data -> {
Log.it("result: " + data);
});
}
}
  • 재시도 횟수는 5회이고 지연 시간 간격은 1000ms이다. 재시도할 때 CommonUtils.sleep() 함수를 호출해 1000ms 동안 대기한다.
  • api 호출을 하고 인터넷 연결이 되어 있지 않다면 재시도를 하게 된다. 5번까지 재시도를 하고 1000ms 간격으로 시도를 하면서 재시도 횟수를 로그를 통해 기록한다. 재시도 횟수가 5회 이하일 때는 true를 이후에는 false를 반환한다.
  • 결과는 다음과 같다.
1
2
3
4
5
6
7
8
9
10
11
main | 716 | error = api.github.com: nodename nor servname provided, or not known
main | error = retryCount: 1
main | 1721 | error = api.github.com
main | error = retryCount: 2
main | 2726 | error = api.github.com
main | error = retryCount: 3
main | 3726 | error = api.github.com
main | error = retryCount: 4
main | 4728 | error = api.github.com
main | error = retryCount: 5
main | 5732 | value = result: -500

retryUntil()

  • retry() 함수는 재시도를 지속할 조건이 없을 때 재시도를 중단한다.
  • 재시도를 중단할 조건이 발생할 때까지 재시도 한다.
  • 함수 원형은 다음과 같다.
1
public final Observable<T> retryUntil(final BooleanSupplier stop)
  • BooleanSupplier 객체는 인자는 없고 Boolean 값을 리턴하는 함수형 인터페이스다.
  • 즉, retryUntil() 함수의 인자로 Boolean 값을 리턴하는 구문이 들어가야 한다. 이게 재시도를 중단할 조건을 의미한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class RetryUntil {
public static void main(String[] args) {
CommonUtils.exampleStart();

String url = "https://api.github.com/zen";
Observable<String> source = Observable.just(url)
.map(OkHttpHelper::getT)
.subscribeOn(Schedulers.io())
.retryUntil(() -> {
if (CommonUtils.isNetworkAvailable()) {
return true; // 중지.
}

CommonUtils.sleep(1000);
return false; // 계속 진행.
})
.onErrorReturn(e -> CommonUtils.ERROR_CODE);

source.subscribe(Log::i);

// IO 스케줄러에서 실행되기 때문에 sleep 함수가 필요함.
CommonUtils.sleep(5000);
}
}
  • 보통 재시도 로직은 별도의 스레드에서 동작하기 때문에 IO 스케줄러를 활용한다.
  • retryUntil() 함수의 인자인 람다 표현식에는 먼저 CommonUtils.isNetworkAvailable()를 호출해 네트워크가 사용 가능한 상태인지 확인한다. 만약, true를 반환하면 재시도를 중단하도록 true를 반환한다. 네트워크를 사용할 수 없는 상태라면 1000ms를 쉬고 재시도(재구독)한다. 이때 람다 표현식은 false를 반환한다.
  • 결국 retryUntil 함수의 인자인 람다 표현식이 true를 반환해야 재시도를 중단하게 된다. false를 반환하면 재시도를 계속하게 된다.
  • 결과는 다음과 같다. Process가 종료된 것은 인터넷에 연결되어 재시도가 끝났다는 것으로 해석할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
RxCachedThreadScheduler-1 | 680 | error = api.github.com: nodename nor servname provided, or not known
RxCachedThreadScheduler-1 | Network is not available
RxCachedThreadScheduler-2 | 1685 | error = api.github.com
RxCachedThreadScheduler-2 | Network is not available
RxCachedThreadScheduler-1 | 2691 | error = api.github.com
RxCachedThreadScheduler-1 | Network is not available
RxCachedThreadScheduler-2 | 3695 | error = api.github.com
RxCachedThreadScheduler-2 | Network is not available
RxCachedThreadScheduler-1 | 4698 | error = api.github.com
RxCachedThreadScheduler-1 | Network is not available

Process finished with exit code 0

retryWhen()

  • 재시도와 관련된 함수 중 가장 복잡하다.
  • 주로 재시도 조건을 동적으로 설정해야 하는 복잡한 로직을 구현할 때 사용한다.

Comment and share

지금까지 공부했던 예제의 공통점은 대부분의 동작이 현재 즉, main 스레드에서 동작한다는 것이었다. 하지만, 실무에서는 요구사항에 맞게 비동기로 동작할 수 있도록 이를 바꿔야 한다. 이때 스케줄러를 이용한다.

스케줄러는 스레드를 지정할 수 있게 해준다. 단순히 새로운 스레드를 생성하거나 기존의 Executors를 활용하는 것을 넘어 새로운 방식으로 볼 수 있다. 그동안 어렵게 다뤄야 했단 비동기 프로그래밍이 간결한 코드로 구성될 수 있다.

위의 그림에서 시간 표시줄에 주목해야 한다. 마블 다이어그램을 코드로 표현하면 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FlipSample {
public static void main(String[] args) {
String[] objs = {"1-T", "2-S", "3-P"};
Observable<String> source = Observable.fromArray(objs)
.doOnNext(data -> Log.d("Original data = " + data))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map(Shape::flip);

source.subscribe(Log::i);
CommonUtils.sleep(500);
}
}
// 결과
RxNewThreadScheduler-1 | debug = Original data = 1-T
RxNewThreadScheduler-1 | debug = Original data = 2-S
RxNewThreadScheduler-1 | debug = Original data = 3-P
RxNewThreadScheduler-2 | value = (flipped)1-T
RxNewThreadScheduler-2 | value = (flipped)2-S
RxNewThreadScheduler-2 | value = (flipped)3-P
  • doOnNext() : Observable 에서 onNext 이벤트가 발생하면 실행되며, 여기에서는 원래의 데이터 값을 확인한다.
  • subscribeOn() : 구독자가 Observable 에 subscribe() 함수를 호출하여 구독할 때 실행되는 스레드를 지정한다. -> 해당 작업을 어느 쓰레드에서 실행할 것인가?!
  • observeOn() : Observable 에서 생성한 데이터 흐름이 여기저기 함수를 거치며 처리될 때, 동작이 어느 쓰레드에서 일어나는지 지정할 수 있다. -> 받은 결과를 어느 쓰레드에서 수행할지?!

결과를 보면 최초의 데이터 흐름이 발생하는 스레드와 flip() 함수를 거쳐서 구독자에게 전달되는 스레드가 다르다. 보통 우리는 새로운 스레드를 생성하거나 Runnable 혹은 Callable 객체를 생성하는데 우리는 전달한 적이 없다. 단지 subscribeOn()observeOn() 함수에 어떤 스케줄러를 지정했을 뿐이다.

이처럼 스케줄러를 활용하는 비동기 프로그래밍의 핵심은 바로 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 전달하는 스레드를 분리할 수 있다는 것이다.

위의 코드에서 observeOn() 함수 호출 부분을 제거해보면 어떤 결과가 나올까? 결과는 아래에서 확인할 수 있다.

1
2
3
4
5
6
7
// 결과
RxNewThreadScheduler-1 | debug = Original data = 1-T
RxNewThreadScheduler-1 | value = (flipped)1-T
RxNewThreadScheduler-1 | debug = Original data = 2-S
RxNewThreadScheduler-1 | value = (flipped)2-S
RxNewThreadScheduler-1 | debug = Original data = 3-P
RxNewThreadScheduler-1 | value = (flipped)3-P

observeOn() 함수를 지정하지 않으면 subscribeOn() 함수로 지정한 스레드에서 모든 로직을 실행한다.

지금까지 배운 내용을 간단하게 정리하면 아래와 같다.

  1. 스케줄러는 RxJava 코드를 어느 스레드에서 실행할지 지정할 수 있다.
  2. subscribeOn() 함수와 observeOn() 함수를 모두 지정하면 Observable에서 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 발행하는 스레드를 분리할 수 있다.
  3. subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행된다.(observeOn() 함수를 생략했을 경우!)
  4. 스케줄러를 별도로 지정하지 않으면 현재(main) 스레드에서 동작을 실행한다.

스케줄러의 종류

  • 특정 스케줄러를 사용하다가 다른 스케줄러로 변경하기 쉽다는 특징을 가지고 있다.
  • 마치 map() 함수를 한 번 더 호출하는 것처럼 새롭게 스케줄러를 추가하거나 기존의 스케줄러를 다른 것으로 교체할 수 있다.

1. 뉴 스레드 스케줄러

이름처럼 새로운 스레드를 생성한다. 새로운 스레드를 만들어 동작을 실행하고 싶을 때 Schedulers.newThread()를 인자로 넣어주면 된다. 그럼 뉴 스레드 스케줄러는 요청을 받을 때마다 새로운 스레드를 생성한다.

뉴 스레드 스케줄러는 새로운 스레드를 생성하여 내가 원하는 동작을 처리하는 방법이다. 하지만 적극적으로 추천하는 방법은 아니다. RxJava에는 뉴 스레드 스케줄러보다 활용도가 높은 계산 스케줄러와 IO 스케줄러와 같은 다른 스케줄러를 제공하기 때문이다.

2. 계산 스케줄러

4장에서 봤던 interval() 함수는 기본적으로 계산 스케줄러에서 동작한다. 물론 내가 원하는 스케줄러에서 동작하도록 변경할 수도 있다.

1
2
3
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> interval(
long period, TimeUnit unit, Scheduler scheduler)

CUSTOM은 원하는 스케줄러를 지정할 수 있다는 의미이다. 리액티브 함수 대부분은 마지막 인자로 스케줄러를 지정할 수 있다. flatMap()이나 scan() 함수 등은 대표적인 연산자이지만 스케줄러를 인자로 받지 않는 경우도 있다.

계산 스케줄러는 CPU에 대응하는 계산용 스케줄러이다. 계산 작업(입출력(I/O) 작업을 하지 않는)을 할 때는 대기 시간 없이 빠르게 결과를 도출하는 것이 중요하다. 내부적으로 스레드 풀을 생성하며 스레드 개수는 기본적으로 프로세서 개수와 동일하다.

3. IO 스케줄러

IO 스케줄러는 네트워크상의 요청을 처리하거나 각종 입,출력 작업을 실행하기 위한 스케줄러이다. 계산 스케줄러와 다른 점은 기본적으로 생성되는 스레드 개수가 다르다는 것이다.

즉, 계산 스케줄러는 CPU 개수만큼 스레드를 생성하지만, IO 스케줄러는 필요할 때마다 스레드를 계속 생성한다. 입,출력 작업은 비동기로 실행되지만 결과를 얻기까지 대기 시간이 길다.

두 스케줄러의 비교

  • 계산 스케줄러 : 일반적인 계산 작업
  • IO 스케줄러 : 네트워크상의 요청, 파일 입출력, DB 쿼리 등

4. 트램펄린 스케줄러

트램펄린 스케줄러는 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기 행렬을 생성하는 스케줄러이다. RxJava 1.x에서는 repeat() 함수와 retry() 함수의 기본 스케줄러였으나 RxJava 2.x에서는 이러한 제약이 사라졌다.

새로운 스레드를 생성하지 않는다는 것과 대기 행렬을 자동으로 만들어준다는 것이 뉴 스레드 스케줄러, 계산 스케줄러, IO 스케줄러와 다른 점이다.

5. 싱글 스레드 스케줄러

싱글 스레드 스케줄러는 RxJava 내부에서 단일 스레드를 별도로 생성하여 구독 작업을 처리한다. 단, 생성된 스레드는 여러 번 구독 요청이 와도 공통으로 사용한다.

리액티브 프로그래밍이 비동기 프로그래밍을 지향하기 때문에 싱글 스레드 스케줄러를 활용할 확률은 낮다.

트팸펄린 스케줄러 예제와 비교해보면 실행 스레드가다르다는 사실을 알 수 있다.

  • 트램펄린 스케줄러 : 메인 스레드
  • 싱글 스레드 스케줄러 : RxSingleScheduler-1

뒤에 -1과 같이 번호가 붙있지만 결국 단일 스레드만 사용한다는 사실도 확인할 수 있다.

스케줄러를 활용하여 콜백 지옥 벗어나기

안드로이드 개발을 한다면 가장 쉽게 접근할 수 있고 유용하게 사용할 수 있는 부분이다. 서버와 통신하는 네트워크 프로그래밍을 할 때 마주치는 콜백 지옥(Callback Hell)을 해결하는 것에 집중해보자.

RxJava의 스케줄러를 활용하면 비동기 프로그래밍 방식이 달라진다. 계산 스케줄러나 IO 스케줄러의 예제에서도 살펴봤듯이 스레드를 생성하거나 Callable, Runnable 객체를 실행하는 코드가 사라진다. 리액티브 프로그래밍은 서버와 연동하는 비동기 프로그래밍을 작성할 때 큰 힘을 발휘한다.

observeOn() 함수의 활용

RxJava 스케줄러의 핵심은 결국 제공되는 스케줄러의 종류를 선택한 후 subscribeOn()과 observeOn() 함수를 호출하는 것이다.

  • subscribeOn() : Observable에서 구독자가 subscribe() 함수를 호출했을 때 데이터 흐름을 발행하는 스레드를 지정한다.(즉, 작업 스레드를 지정한다.)
  • observeOn() : 처리된 결과를 구독자에게 전달하는 스레드를 지정한다.(UI 갱신을 위한 스레드를 지정한다.)

또한, subscribeOn() 함수는 처음 지정한 스레드를 고정시키므로 다시 subscribeOn() 함수를 호출해도 무시한다. 하지만, observeOn() 함수는 다르다.

  • subscribeOn(A)를 호출했을 때는 데이터를 발행하는 첫 줄이 스레드 A에서 실행된다. 이후에는 observeOn() 함수가 호출될 때까지 스레드 A에서 실행된다.
  • observeOn(B)를 호출하면 그 다음인 두 번째 줄부터는 스레드 B에서 실행된다.
  • map(o–>D) 함수는 스레드 변겨와는 상관없으므로 세 번째 줄은 계속 스레드 B에서 실행된다.
  • 이제 observeOn© 함수를 호출하면 그 다음 데이터 흐름은 스레드 C에서 실행된다.

요약하면 다음과 같다.

  1. subscribeOn() 함수는 한번 호출했을 때 결정한 스레드를 고정하며 이후에는 다시 호출해도 스레드가 바뀌지 않는다.
  2. observeOn() 함수는 여러 번 호출할 수 있으며 그 다음부터 동작하는 스레드를 바꿀 수 있다.

전통적인 스레드 프로그래밍에서는 일일이 스레드를 만들어야 하고 스레드가 늘어날 때마다 동기화하는 것이 매우 부담스럽기 때문에 이러한 로직을 구현하는 것이 매우 힘들다. 하지만 observeOn() 함수는 스레드 변경이 쉬우므로 활용할 수 있는 범위가 매우 넓다.

책이 있다면 책에 나와있는 openWeatherMap 예제를 실행해보는 것을 추천한다.

Comment and share

조건 연산자

조건 연산자는 Observable의 흐름을 제어하는 역할을 한다. filter 연산자가 발행된 값을 채택하느냐 기각하느냐 여부에 초점을 맞춘다면, 조건 연산자는 지금까지의 흐름을 어떻게 제어할 것인지에 초점을 맞춘다.

다음과 같은 연산자가 있다.

  • amb()
  • takeUntil()
  • skipUntil()
  • all()

1. amb()

amb는 ambiguous(모호한)라는 영어 단어의 줄임말이다. 여러 개의 Observable 중에서 1개의 Observable을 선택하는데, 선택 기준은 가장 먼저 데이터를 발행하는 발행하는 Observable이다. 이후에 나머지 Observable에서 발행하는 데이터는 모두 무시한다.

List 인터페이스처럼 Iterable<Observable> 객체를 인자로 넣으면 그 중에서 가장 먼저 데이터를 발행하는 Observable만 선택해서 계속 값을 발행하도록 해준다.

2. takeUntil()

takeUntil() 함수는 take() 함수에 조건을 설정할 수 있다. 구체적으로 살펴보면 인자로 받은 Observable에서 어떤 값을 발행하면 현재 Observable의 데이터 발행을 중단하고 즉시 완료(onComplete 이벤트 발생)한다. 즉, take() 함수처럼 일정 개수만 값을 발행하되 완료 기준을 다른 Observable에서 값을 발행하는지로 판단하는 것이다.

3. skipUntil()

takeUntil()과 정반대의 함수이다. other Observable을 인자로 받는다는 점은 같지만 Observable에서 데이터를 발행할 때까지 값을 건너뛴다.

takeUntil() 함수와는 다르게 other Observable에서 화살표가 나올 때까지는 값을 발행하지 않고 건너뛰다가 other Observable에서 값을 발행하는 순간부터 원래 Observable에서 값을 정상적으로 발행하기 시작한다.

4. all()

all() 함수는 단순하다. 주어진 조건에 100% 맞을 때만 true 값을 발행하고 조건에 맞지 않는 데이터가 발행되면 바로 false 값을 발행한다.

위의 마블 다이어그램은 ‘1’ 원부터 ‘6’ 원까지 모두 ‘원’ 모양이어야만 true를 발행한다.
all() 함수의 원형은 다음과 같다.

1
2
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<Boolean> all(Predicate<? super T> predicate)

predicate 인자는 filter() 함수의 인자와 동일하다. 주어진 람다 표현식이 true인지 false인지를 판정해주어야 한다.

수학 및 기타 연산자

max(), sum()과 같은 수학 함수와 기타 분류에 해당하는 함수가 있다.

RxJavaMath가 있지만, RxJava 2.x를 지원하지 않으므로 다른 라이브러리를 사용한다. Rxjava2Extensions 라이브러리를 활용해 간단한 수학 함수 및 집합 함수의 활용법을 공부해보자.

1. 수학 함수

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class MathSample {
public static void main(String[] args) {
executeMath();
}

private static void executeMath() {
Integer[] data = {1, 2, 3, 4};

// 1. count
Single<Long> source = Observable.fromArray(data)
.count();
source.subscribe(count -> Log.i("count is " + count));

// 2. max
// MathFlowable 클래스의 max() 함수를 호출해 구현한다.
// to() 함수는 다른 타입으로 변환해주기 위한 함수.
Flowable.fromArray(data)
.to(MathFlowable::max)
.subscribe(max -> Log.i("max is " + max));

// 3. min
Flowable.fromArray(data)
.to(MathFlowable::min)
.subscribe(min -> Log.i("min is " + min));

// 4. sum
Flowable<Integer> flowable = Flowable.fromArray(data)
.to(MathFlowable::sumInt);
flowable.subscribe(sum -> Log.i("sum is " + sum));

// 5. average
Flowable<Double> flowable1 = Observable.fromArray(data)
.toFlowable(BackpressureStrategy.BUFFER)
.to(MathFlowable::averageDouble);
flowable1.subscribe(avg -> Log.i("avg is " + avg));
}
}
// 결과
main | value = count is 4
main | value = max is 4
main | value = min is 1
main | value = sum is 10
main | value = avg is 2.5

2. delay()

delay() 함수는 시간을 인자로 받는다. 앞에서 봤던 시간과 관련된 함수들(interval, timer, defer)이 Observable을 생성하는 역할이라면 delay() 함수는 유틸리티 연산자로서 보조 역할을 한다.

delay() 함수의 원형

1
2
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> delay(long delay, TimeUnit unit)

인자로 delay 변수와 시간 단위(ms 등)을 받는다. 그리고 intervale() 함수와 마찬가지로 계산 스케줄러에서 실행한다. 즉, main 스레드가 아닌 계산을 위한 별도 스레드 풀에서 실행하는 것이다.

3. timeInterval()

어떤 값을 발행했을 때 이전 값을 발행한 이후 얼마나 시간이 흘렀는지를 알려준다.

timeInterval() 함수의 원형

1
2
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Timed<T>> timeIntervale()

Timed 객체에는 다음처럼 시간을 얻어오거나 Observable의 데이터를 얻을 수 있는 메소드를 제공할 수 있다.

1
2
3
4
public T value()
public TimeUnit unit()
public long time()
public long time(TimeUnit unit)

Comment and share

결합 연산자

생성 연산자와 변환 연산자는 1개의 데이터 흐름(Observable)을 다뤘다. 결합 연산자는 다수의 Observable을 하나로 합하는 방법을 제공한다. flatMap(), groupBy() 함수 등은 1개의 Observable을 확장해주는 반면, 결합 연산자들은 여러 개의 Observable을 내가 원하는 Observable로 결합해준다.

1. zip()

  • 각각의 Observable을 모두 활용해 2개 혹은 그 이상의 Observable을 결합한다.
  • 예를 들어, A, B 두 개의 Observable을 결합한다면 2개의 Observable에서 모두 데이터를 발행해야 결합할 수 있다. 그전까지는 발행을 기다린다.

zip() 함수는 최대 9개의 Observable을 결합할 수 있지만 보통 2개 혹은 3개면 충분하다. 다음은 간단한 zip() 함수의 사용 예제이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

public static void main(String[] args){
executeZipInteger();
}
private static void executeZipInteger() {
Observable.zip(
Observable.just(100, 200, 300),
Observable.just(10, 20, 30),
Observable.just(1, 2, 3),
(a, b, c) -> a + b + c
).subscribe(Log::i);
}
// 결과
main | value = 111
main | value = 222
main | value = 333

2. zipWith()

zipWith() 함수는 zip() 함수와 동일하지만 Observable을 다양한 함수와 조합하면서 틈틈이 호출할 수 있는 장점이 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class ZipWithSample {
public static void main(String[] args) {
Observable<Integer> source = Observable.zip(
Observable.just(100, 200, 300),
Observable.just(10, 20, 30),
(a, b) -> a + b)
.zipWith(Observable.just(1, 2, 3), (ab, c) -> ab * c);
source.subscribe(Log::i);
}
}
// 결과
main | value = 110
main | value = 440
main | value = 990

두 Observable을 zip() 함수로 묶고 세 번째 Observable을 다시 zipWith() 함수로 결합했다. zipWith() 함수를 호출할 때는 앞서 a와 b를 결합했기 때문에 ab로 명명했다.

3. combinLatest()

2개 이상의 Observable을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 함수이다. 마지막 인자로 combiner가 들어가는데 그것이 각 Observable을 결합하여 어떤 결과를 만들어주는 역할을 하는 함수이다. zip() 함수의 zipper 인자와 동일하다.

예를 들어, 첫 번째 Observable과 두 번째 Observable을 결합하는 기능을 만든다고 하면 첫 번째 Observable의 값 혹은 두 번째 Observable의 값이 변경되었을 때 그 값을 자동으로 갱신해준다.

첫 번째 Observable에서만 데이터를 발행하거나 두 번째 Observable의 데이터 흐름만 있으면 구독자에게 어떤 데이터도 발행하지 않는다. 하지만 두 Observable 모두 값을 발행하면 그때는 결과값이 나온다. 그 다음부터는 둘 중에 어떤 것이 갱신되던지 최신 결과값을 보여준다. -> 이 부분이 zip() 함수와 다른 점이다.

zip() 함수처럼 결합하고자 하는 첫 번째와 두 번쨰 Observable을 넣고 마지막으로 그것을 결합하는 combiner() 함수를 넣어주면 된다. 입력할 수 있는 Observable 인자의 개수는 9개이다.

4. merge()

  • zip() 함수나 combineLatest() 함수와 비교하면 가장 단순한 결합 함수이다.
  • 입력 Observable의 순서와 모든 Observable이 데이터를 발행하는지 등에 관여하지 않고 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그대로 발행한다.

5. concat()

  • 2개 이상의 Observable을 이어 붙여주는 함수이다.
  • 첫 번째 Observable에 onComplete 이벤트가 발생해야 두 번째 Observable을 구독한다. 스레드를 활용한 일반적이 코드로 이와 같은 내용을 구현하기는 복잡하다.
  • 결합할 수 있는 Observable은 최대 4개이다.

첫 번째 Observable에 onComplete 이벤트가 발생하지 않게 되면 두 번째 Observable은 영원히 대기한다. 이는 잠재적인 메모리 누수의 위험을 내포한다. 따라서 입력 Observable이 반드시 완료(onComplete 이벤트)될 수 있게 해야 한다.

Observable의 중간 상태를 확인하는 방법

리액티브 프로그래밍을 할 때는 중간에 로그를 출력하는 것이 낯설게 느껴진다. 특히 함수형 프로그래밍 패러다임을 배우면서 "로그나 화면 출력하는 등을 부수 효과를 발생시킨다"라는 내용을 접하면 부수 효과를 최소화하려고 하는 경향이 생긴다.

하지만 부수 효과를 감내하고서라도 적절한 로그는 유지 보수성을 확보하기 위해 꼭 필요하다. RxJava에서는 Observable의 중간 결과를 간편하게 확인할 수 있는 함수들을 제공한다. 확실하지 않은 코드나 예제 코드를 실행할 때 찜찜한 부분이 있다면 doOnNext(), doOnComplete(), doOnError() 함수를 추가해보자.

Comment and share

변환 연산자

위에서 데이터 흐름(Observable)을 만들어내는 생성 연산자를 보았다면 이번에는 데이터 흐름을 원하는대로 변형할 수 있는 변환 연산자를 알아보자.

1. concatMap()

flatMap() 함수와 매우 비슷하다. 하지만, concatMap() 함수는 먼저 들어온 데이터 순서대로 처리해서 결과를 낼 수 있도록 보장한다. 즉, 데이터의 순서를 보장한다. flatMap() 함수는 순서를 보장하지 않는다.

flatMap() 함수는 먼저 들어온 데이터를 처리하는 도중에 새로운 데이터가 들어오면 나중에 들어온 데이터의 처리 결과가 먼저 출력될 수도 있다. 이를 인터리빙(끼어들기)라고 한다.

2.switchMap()

concatMap() 함수가 인터리빙이 발생할 수 있는 상황에서 동작의 순서를 보장해준다면 switchMap() 함수는 순서를 보장하기 위해 기존에 진행중이던 작업을 바로 중단한다. 그리고 여러 개의 값이 발행되었을 때 마지막에 들어온 값만 처리하고 싶을 때 사용한다. 중간에 끊기더라도 마지막 데이터의 처리는 보장하기 때문이다.

마블 다이어그램이 조금 복잡하지만, 시간이 겹치지 않는다는 것을 유의하면 된다.
빨간색 도형의 경우 정상적으로 처리했지만,
초록색 도형을 처리하는 도중에 파란색 도형이 들어왔으므로 초록색 도형의 처리는 중단하고 파란색으로 도형을 처리한다.

switchMap() 함수는 센서 등의 값을 얻어와서 동적으로 처리하는 경우에 매우 유용하다. 센서 값을 중간값보다는 최종적인 값으로 결과를 처리하는 경우가 많기 때문이다. 이럴 때는 flatMap() 함수로 매번 새로운 결과가 나왔는지 검사하지 말고 손쉽게 switchMap() 함수를 사용하자.

3. groupBy()

어떤 기준(KeySelector 인자)으로 단일 Observable을 여러 개로 이루어진 Observable 그룹(GroupedObservable)으로 만든다.

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args){
String[] objs = {PUPPLE, SKY, triangle(YELLOW), YELLOW, triangle(PUPPLE), triangle(SKY)};
Observable<GroupedObservable<String, String>> source =
Observable.fromArray(objs)
.groupBy(Shape::getShape);

source.subscribe(obj -> {
obj.subscribe(val ->
System.out.println("GROUP:" + obj.getKey() + "\t Value:" + val));
});
CommonUtils.exampleComplete();
}
  • 원래 코틀린으로 해결하려고 했는데, 어떻게 짜야될지 몰라서 자바로 진행해본다.
    코드가 조금 복잡하다. GroupedObservable 클래스는 Observable과 동일하지만 getKey() 메소드를 제공하여 구분된 그룹을 알 수 있게 해준다. source는 objs[] 배열에서 입력 데이터를 가져온다. 그룹을 구별하기 위해서 Shape.getShape() 함수를 사용한다.

getShape() 함수의 내용은 다음과 같다.

1
2
3
4
5
6
7
8
9
10
11
public static String getShape(String obj) {
if (obj == null || obj.equals("")) return NO_SHAPE;
if (obj.endsWith("-H")) return HEXAGON;
if (obj.endsWith("-O")) return OCTAGON;
if (obj.endsWith("-R")) return RECTANGLE;
if (obj.endsWith("-T")) return TRIANGLE;
if (obj.endsWith("<>")) return DIAMOND;
if (obj.endsWith("-P")) return PENTAGON;
if (obj.endsWith("-S")) return STAR;
return "BALL";
}

source.subscribe()에 전달하는 obj는 GroupedObservable 객체이다. 그룹별로 1개씩 생성되므로 생성된 obj 별로 다시 subscribe() 함수를 호출해야 한다. val은 그룹 안에서 각 Observable이 발행한 데이터를 의미한다.(즉, GroupedObservable이 발행한 데이터 ex. “BALL - 6”)

  • 만약, 모든 그룹을 처리하고 싶은게 아니라 특정 그룹만 처리하고 싶다면 filter() 함수를 이용해 조건을 추가해주면 된다.
  • getKey() : 메소드는 그룹의 구분자 값을 리턴한다.(즉, Key 값)

map(), flatMap(), groupBy() 비교

  • map() : 함수는 1개의 데이터를 다른 값이나 타입으로 변환해준다.
  • flatMap() : 함수는 1개의 값을 받아서 여러 개의 데이터(Observable)로 확장해준다.
  • groupBy() : 함수는 값들을 받아서 어떤 기준에 맞는 새로운 Observable 다수를 생성한다.

4. scan()

  • reduce() 함수와 비슷하다.
  • reduce() : Observable에서 모든 데이터가 입력된 후 그것을 종합하여 마지막 1개의 데이터만을 구독자에게 발행한다.

반면, scan() 함수는 실행할 때마다 입력값에 맞는 중간 결과 및 최종 결과를 구독자에게 발행한다.

1
2
3
4
5
6
7
8
9
10
private fun executeScan(){
val source = Observable.just("1", "3", "5")
.scan { ball1: String, ball2: String -> "$ball2 ($ball1)" }

source.subscribe(Log::i)
}
// 결과
main | value = 1
main | value = 3 (1)
main | value = 5 (3 (1))

reduce()와 다른점이 있다. 첫 번째는 source의 타입이 Maybe이 아니라 Observable이라는 것이다. reduce() 함수의 경우 마지막 값이 입력되지 않거나 onComplete 이벤트가 발생하지 않으면 구독자에게 값을 발행하지 않는다. 최악의 경우에는 값을 전혀 발해아지 않고 종료할 수도 있기 때문에 Maybe 클래스 타입으로 정의했다.

반면, scan() 함수는 값이 입력될 때마다 구독자에게 값을 발행한다. 따라서 Maybe가 아니라 Observable이다. 그리고 출력된 결과를 확인하면 main 스레드에서 실행되며 값이 입력될 때마다 발행하는 것을 확인할 수 있다.

Comment and share

[RxJava] Chap04

in RxJava

해당 글은 직접 책을 구매하여 공부의 목적으로 정리하는 글임을 알려드립니다.

이번에는 리액티브 연산자를 카테고리별로 알아보도록 하겠다. 이유는 연산자의 종류가 많기도 하고 카테고리로 쓰임새를 어느 정도 짐작할 수 있기 때문이다.

생성 연산자는 Observable로 데이터 흐름을 만들고 변환 연산자와 필터 연산자는 데이터 흐름을 내가 원하는 방식으로 변형한다. 결합 연산자는 1개의 Observable이 아니라 여러 개의 Observable을 조합할 수 있도록 해준다.

생성 연산자

  • 생성 연산자의 역할은 데이터 흐름을 만드는 것이다.
  • Observable(Observable, Single, Maybe 객체 등)을 만든다고 생각하면 된다.
  • 앞선 챕터에서는 just(), fromXXX(), create() 함수 등을 봤고, 이번에는 다른 연산자들을 확인해보도록 하겠다.

1. interval()

  • 일정 시간 간격으로 데이터 흐름을 생성한다.
  • 주어진 시간 간격으로 0부터 1씩 증가하는 Long 객체를 발행한다.
  • 두 가지의 함수 원형을 가지고 있으며, 최초 지연 시간을 조절하는 함수와 그렇지 않은 함수가 존재한다.
  • period(일정 시간)동안 쉬었다가 데이터를 발행한다.
  • 기본적으로 영원히 지속되기 때문에 폴링 용도로 많이 사용한다.
  • 함수의 동작이 현재 스레드가 아닌 계산을 위한 별도의 스케줄러(스레드)에서 동작한다.
1
2
3
4
// 함수 원형
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private fun executeInterval() {

/*
* 시작 시간을 표시하는 유틸리티 메소드이다.
* RxJava - 비동기 프로그래밍이기 때문에 시간에 대한 이해가 중요하다.
* 시작 시간을 기준으로 RxJava 각 함수의 실행 시간을 측정하기 위함이다.
* sleep()을 호출하는 이유는 다른 스레드에서 실행이 완료될 때까지 기다려야 하기 때문이다.
* 해당 문장을 주석 처리하게 되면 기다리지 않고 바로 프로그램이 종료되는 것을 확인할 수 있다.
* 이유는 메인 스레드에서 할 일이 없기 때문이다.
*
* */

CommonUtils.exampleStart()
val source = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { data ->
(data + 1) * 100
}
.take(5)

source.subscribe(Log::it)
CommonUtils.sleep(TIME)
CommonUtils.exampleComplete()
}
// 결과
RxComputationThreadPool-1 | 247 | value = 100
RxComputationThreadPool-1 | 347 | value = 200
RxComputationThreadPool-1 | 445 | value = 300
RxComputationThreadPool-1 | 545 | value = 400
RxComputationThreadPool-1 | 645 | value = 500
-----------------------

2. timer()

  • interval() 함수와 유사하지만 time() 함수는 한 번만 실행되는 함수이다.
  • 일정 시간이 지난 후 한 개의 데이터를 발행하고 onComplete() 이벤트가 발생한다.
  • 전반적으로 interval() 함수와 유사하다. 계산 스케줄러에서 실행되며 함수의 발행되는 데이터도 interval() 함수의 첫 번째 값인 0L이다.
  • 보통 일정 시간이 지난 후 어떤 동작을 실행할 때 활용한다. 우리가 사용하는 타이머를 맞춘다고 생각하면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private fun executeTimer() {
CommonUtils.exampleStart()
val source = Observable.timer(500L, TimeUnit.MILLISECONDS)
.map { notUsed ->
SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
.format(Date())
}
source.subscribe(Log::it)
CommonUtils.sleep(1000)
CommonUtils.exampleComplete()
/*
* timer() 함수도 메인 스케줄러가 아닌 계산 스케줄러에서 실행되기 때문에
* 계산 스케줄러가 완료될 때까지 기다리기 위해서 sleep() 함수를 호출하여 준다.
* 즉, 계산 스케줄러의 동작이 완료될 때까지 메인 스케줄러가 기다리는 것이다.
* 그렇지 않으면 메인 스케줄러에서 할 일이 없기 때문에 프로그램이 바료 종료된다.
* */
}
// 결과
RxComputationThreadPool-1 | 708 | value = 2019/06/03 21:26:26
-----------------------

3. range()

  • 주어진 값(n)부터 m개의 Integer 객체를 발행한다.
  • interval(), timer() 함수는 Long 객체를 발행했지만, range() 함수는 Integer 객체를 발행한다.
  • 특정한 스케줄러에서 실행되지 않는다. 즉, 현재 스레드에서 실행한다.
  • 반복문(for, while문)을 대체할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
* range() 함수로 1부터 10까지 숫자를 생성한다.
* 그리고 filter() 함수를 이용해 짝수만 걸러낸다.
* 현재 쓰레드에서 실행된다.
* */
private fun executeRange() {
Observable.range(1, 10)
.filter { num ->
num % 2 == 0
}
.subscribe(Log::i)
}
// 결과
main | value = 2
main | value = 4
main | value = 6
main | value = 8
main | value = 10

4. intervaleRange()

  • interval()과 range() 함수를 혼합해놓은 함수이다.
  • interval() 함수처럼 일정한 시간 간격으로 값을 출력하지만, range() 함수처럼 시작 숫자(n)로부터 m개만큼의 값만 생성하고 onComplete 이벤트가 발생한다.
  • interval() 함수처럼 무한히 데이터 흐름을 발행하지 않는다. 반환 타입은 Long 타입이다.
  • 계산 스케줄러에서 실행된다.

사실 intervalRange() 함수는 interval() 함수와 다른 함수를 조합해서 만들 수 있다. 이유는 intervalRange() 함수가 직관적이지 않기 때문이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
* main 스레드에서 실행되는 것이 아니기 때문에 sleep() 함수를 호출한다.
* */
private fun executeIntervalRange() {
Observable.intervalRange(1,
5,
100L,
100L,
TimeUnit.MILLISECONDS)
.subscribe(Log::i)
CommonUtils.sleep(1000)
}
// 결과
RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-1 | value = 2
RxComputationThreadPool-1 | value = 3
RxComputationThreadPool-1 | value = 4
RxComputationThreadPool-1 | value = 5

interval() 함수 -> intervalRange() 함수 만들기

  • interval() 함수 사용
  • map() 함수 사용
  • take() 함수 사용
1
2
3
4
5
6
7
8
9
private fun executeIntervalRangeUsingIntervalAndMapAndTake() {
Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { data ->
data + 1
}
.take(5)
.subscribe(Log::i)
CommonUtils.sleep(1000)
}

5. defer()

  • 데이터 흐름 생성을 구독자가 subscribe() 함수를 호출할 때까지 미룰 수 있다. 이때 새로운 Observable이 생성된다.
  • Observable의 생성이 구독할 때까지 미뤄지기 때문에 최신 데이터를 얻을 수 있다.
  • 현재 스레드에서 실행되며, 인자로는 Callable<Observable>를 받는다. Callable 객체이므로 구독자가 subscribe()를 호출할 때까지 call() 메소드의 호출을 미룰 수 있다.

defer() 함수는 구독자가 구독할 때까지 Observable의 데이터 발행을 미루는 역할을 한다. 따라서 구독자가 subscribe()를 호출하는 시점에 최신의 데이터를 받을 수 있다. defer() 함수를 사용하지 않은 상황에서 구독자 두명이 그대로 구독을 하게 되면 같은 5에 대한 데이터를 발행하고 구독자가 받게 된다.

개념이 조금 어렵다;; 천천히 다시 볼 필요가 있다.

여기에서 다룬 Observable은 모두 차가운 Observable이다. Observable을 생성할 때 입력값이 결정되고 구독자가 subscribe() 함수를 호출하면 그때 해당 데이터 흐름을 그대로 발행한다. 즉, defer() 함수를 활용하면 subscribe() 함수를 호출할 때의 상황을 반영하여 데이터 흐름의 생성을 지연하는 효과를 보여준다. 내부적으로 구독자가 subscribe() 함수를 호출하면 그때 supplier의 call() 메소드를 호출한다.

6. repeat()

  • 단순히 반복 실행을 하는 함수이다.
  • 서버와 통신을 할 때 해당 서버가 살아있는지 확인(이 확인 과정을 보통 ping 혹은 heart beat라고 한다.)하는 코드를 작성할 때 주로 사용한다.
  • 인자를 입력하지 않으면 영원히 실행된다. 따라서 반복하길 원하는 숫자만큼 인자로 전달하는게 좋다.

예제로 heart beat를 간단하게 구현해보기

서버와 연동하는 앱을 작성하다 보면 통신하는 서버가 동작하는지 확인하는 코드가 필요하다. 지속적인 통신을 해야 하는 서버의 경우 명세서에 동작 확인 코드를 작성할 것을 명시하기도 한다.

보통 일정 시간 안데 heart beat 패킷을 보내지 않으면 서버는 클라이언트와의 연결이 종료된 것으로 판단하고 연결을 해제한다. (보통 30초 간격으로 heart beat 신호를 보낸다.) 이럴 때 repeat() 함수를 활용하면 heart beat 패킷을 보내는 프로그램을 간단하게 구현할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
fun main(args: Array<String>){
CommonUtils.exampleStart()
val serverUrl: String = "https://api.github.com/zen"

Observable.timer(2, TimeUnit.SECONDS)
.map { it ->
serverUrl
}
.map(OkHttpHelper::get)
.repeat()
.subscribe({
Log.it("Ping result: $it")
}, {
Log.it("Ping result fail: ${it.message}")
})
CommonUtils.sleep(1000)
}

// 아래는 OkHttpHelper 클래스이다.
public class OkHttpHelper {
private static OkHttpClient client = new OkHttpClient();
public static String ERROR = "ERROR";

public static String get(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
try {
Response res = client.newCall(request).execute();
return res.body().string();
} catch (IOException e) {
Log.e(e.getMessage());
throw e;
}
}

public static String getT(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
try {
Response res = client.newCall(request).execute();
return res.body().string();
} catch (IOException e) {
Log.et(e.getMessage());
throw e;
}
}

public static String getWithLog(String url) throws IOException {
Log.d("OkHttp call URL = " + url);
return get(url);
}
}

timer() 함수를 사용해 2초마다 반복 실행되도록 했다. 약 2초 간격으로 실행된다. 원래 timer() 함수는 한 번 호출된 후에는 종료된다. 그런데 계속 반복해서 실행되는 것을 볼 수 있다.

이유는 repeat() 함수 때문이다. repeat() 함수는 동작이 한 번 끝난 다음에 다시 구독하는 방식으로 동작한다. 그리고 다시 구독할 때마다 동작하는 스레드의 번호가 달라진다.

만약 동작하는 스레드를 동일하게 맞추고 싶다면 timer()와 repeat() 함수를 빼고 interval() 함수를 대신 넣어 호출하면 된다.

Comment and share

[RxJava] Chap03

in RxJava

연산자

ReactiveX의 연산자는 꽤 많다. 이 연산자들을 모두 안다고 하더라도 기억하기는 어렵다. 하지만, 이름을 보고 내용을 짐작할 수 있고 마블 다이어그램이 도움이 된다. 필요할 때 찾아보자.

대신, 여기서 소개하는 연산자는 자주 사용되니 꼭 알아두자!

1.map()

  • 입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 함수이다.
  • 입력 데이터가 있고 그것을 변환해줄 중개업자가 있다고 생각하면 좋다.
  • map() 함수는 반환값을 확인한다. 또한, 스케줄러를 지원하지 않으므로 현재 스레드에서 실행된다.
  • 일대일 함수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
fun main(args: Array<String>) {

// map을 통해 10을 곱해준 값을 반환.
Observable.just(1, 2, 3, 4, 5)
.map { item -> item * 10 }
.subscribe({
println(it)
}, {
println(it.message)
})


// 메소드 참조도 가능.
Observable.just("RED", "BLUE", "YELLOW","BLACK")
.map(Test::ballToIndex)
.subscribe({
println(it)
}, {
println(it.message)
})
}

object Test {
fun ballToIndex(color: String): Int = when (color) {
"RED" -> {
1
}
"YELLOW" -> {
2
}
"BLUE" -> {
3
}
else -> {
-1
}
}
}

2.flatMap()

  • map() 함수와 동일한 기능을 하지만 결과가 Observable로 나온다.
  • 결과값이 Observable이므로 여러 개의 데이터를 발행할 수 있다.
  • 일대다 함수 혹은 일대일 Observable 함수이다.
  • 1개를 발행할 수도 여러 개를 발행할 수도 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
fun main(args: Array<String>) {

// Function 인터페이스를 통해 제네릭 타입을 선언.
val getDoubleDiamonds : (String) -> Observable<String> = { ball: String ->
Observable.just<String>("$ball<>", "$ball<>")
}

// 3개의 데이터를 넣었는데 6개를 발행한다.
// 일대다이고, Observable 을 반환한다.
// 위에서 정의한 Function 인터페이스 사용.
Observable.just("1", "2", "3")
.flatMap(getDoubleDiamonds)
.subscribe({
println("성공 : $it")
}, {
println("실패 : ${it.message}")
})

// 인라인을 사용.
Observable.just("1","2","3")
.map { ball ->
Observable.just("$ball<>","$ball<>")
}
.subscribe{
println(it)
}

}
  • Function<T,R> : T는 입력값을 의미하고 R은 결과 함수이다.
  • String을 넣으면 여러 개의 String을 발행하는 Observable이 나온다.
  • 여러 개의 데이터를 발행하는 방법은 Observable 뿐이다.

3.filter() 함수

  • Observable에서 원하는 데이터만 걸러내는 역할을 한다.
  • 즉, 필요없는 데이터는 제거하고 관심있는 데이터만 filter() 함수를 통과한다.
  • 간단한 수식을 적용하는 것과 원하는 조건을 작성할 수도 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun main(args: Array<String>) {
// 짝수만 필터링.
Observable.just(1, 2, 3, 4, 5, 6, 7)
.filter { number ->
number % 2 == 0
}
.subscribe({
println("result : $it")
}, {
println("error : ${it.message}")
})
}
// 결과
result : 2
result : 4
result : 6

이외에도 filter() 함수와 비슷한 함수들이 존재한다. 이름만 보고 어떤 기능을 할지 짐작이 가능하다.

  • first(default) : Observable의 첫 번째 값을 필터한다. 만약 값 없이 완료되면 기본값을 반환한다.
  • last(default) : Observable의 마지막 값을 필터한다. 만약 값 없이 완료되면 기본값을 반환한다.
  • take(N) : 최초 N개 값만 가져온다.
  • takeLast(N) : 마지막 N개 값만 필터한다.
  • skip(N) : 최초 N개 값을 건너뛴다.
  • skipLast(N) : 마지막 N개 값을 건너뛴다.

사용 예제는 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
fun main(args: Array<String>) {
val source = Observable.just(100, 200, 300, 400, 500, 600)
var single: Single<Int>

// 1. first
single = source.first(-1)
single.subscribe({
println("first result : $it")
}, {
println("error : ${it.message}")
})

// 2. last
single = source.last(-1)
single.subscribe({
println("last result : $it")
}, {
println("error : ${it.message}")
})

// 3. take
source.take(3)
.subscribe({
println("take result : $it")
}, {
println("error : ${it.message}")
})

// 4. takeLast
source.takeLast(3)
.subscribe({
println("takeLast result : $it")
}, {
println("error : ${it.message}")
})

// 5. skip -> 300,400,500,600
source.skip(2)
.subscribe({
println("skip result : $it")
}, {
println("error : ${it.message}")
})

// 6. skipLast -> 100,200,300
source.skipLast(3)
.subscribe({
println("skipLast result : $it")
}, {
println("error : ${it.message}")
})
}
// 결과
first result : 100
last result : 600
take result : 100
take result : 200
take result : 300
takeLast result : 400
takeLast result : 500
takeLast result : 600
skip result : 300
skip result : 400
skip result : 500
skip result : 600
skipLast result : 100
skipLast result : 200
skipLast result : 300

4.reduce()

  • 발행한 데이터를 모두 사용하여 어떤 최종 결과 데이터를 합성할 때 활용한다.
  • 함수형 프로그래밍의 가장 기본 연산자인 map/filter/reduce 패턴을 이루는 마지막 필수 함수이다.
  • Observable을 이용해 들어오는 데이터를 1개씩 모아서 최종 결과를 만들어야 할 때 사용한다고 생각하면 된다. 주로 수치와 관련된 계산 문제에서 활용하면 좋다.

보통 Observable에 입력된 데이터를 필요한 map() 함수로 매핑하고, 원하는 데이터만 추출할 때는 불필요한 데이터를 걸러내는 filter() 함수를 사용한다. 또한 상황에 따라 발행된 데이터를 취합하여 어떤 결과를 만들어낼 때는 reduce 계열의 함수를 사용한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun main(args: Array<String>) {
val source: Maybe<String> = Observable.just("1", "3", "5")
.reduce { ball1, ball2 ->
"$ball2($ball1)"
}
source.subscribe({
println("reduce 결과 : $it")
}, {
println("error : ${it.message}")
})

// 람다 표현식을 별도 함수로 분리.
Observable.just("1","3","5")
.reduce(mergeBalls)
.subscribe({
println("reduce 함수로 분리한 결과 : $it")
},{
println("error : ${it.message}")
})
}

reduce() 함수를 호출하면 인자로 넘긴 람다 표현식에 의해 결과 없이 완료될 수도 있다. 따라서 Observable이 아니라 결과가 반환할 수도 아닐 수도 있는 Maybe 객체로 리턴된다.

예제

다음과 같은 예제를 한번 작성해보자.

  1. 전체 매출 데이터를 입력한다.
  2. 매출 데이터 중 TV 매출을 필터링한다.
  3. TV 매출의 합을 구한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
fun main(args: Array<String>) {

// 1. 데이터 입력.
// first : 상품 이름, second : 매출액.
var sales = mutableListOf<Pair<String, Int>>()
sales.add("TV" to 2500)
sales.add("Camera" to 300)
sales.add("TV" to 1600)
sales.add("Phone" to 800)
sales.add("Sofa" to 10000)
sales.add(Pair("TV", 1000))

val source: Maybe<Int> = Observable.fromIterable(sales)
// 2. 매출 데이터 중 TV 매출을 필터링한다.
.filter { sale ->
sale.first == "TV"
}
// map 을 통해 sale 에서 매출액만 뽑는다.
.map { sale ->
sale.second
}
// 3. reduce 를 통해 매출의 합을 구한다.
.reduce { sale1, sale2 ->
sale1 + sale2
}
// reduce 를 호출하기 때문에 Observable 이 아니라 Maybe 를 사용한다.

source.subscribe({ total ->
println("TV Sale: $ $total")
}, {
println("error : ${it.message}")
})
}
// 결과
TV Sale: $ 4100

Comment and share

Author's picture

VictoryWoo

기록을 통해 사람들과 공유하는 것을 좋아합니다.


Android Developer