디버깅

코딩하는 도중에 로그를 넣는 이유는 잘못되었을 때를 대처하기 위함이다. 하지만 RxJava 코드는 로그를 넣을 수 있는 공간이 없다. Observable로 시작하는 업스트림(upstream)과 그것을 받아서 처리하는 다운스트림(downstream)이 동일한 문장으로 이루어져 있기 때문이다. 즉, 전체 동작을 선언적으로 만들 수 있으므로 전체 맥락에 대한 가독성은 높아지지만 예외 코드를 어떻게 넣어야 하는지에 대한 어려움이 있다.

원래 함수형 프로그래밍은 함수의 부수 효과를 없도록 하는 것이 원칙이지만 doOnXXX() 계열 함수는 오히려 부수 효과를 일으켜서 내가 작성하는 코드가 문제없는지 알아볼 수 있게 도와준다.

이번 Chapter에서 알아보자.

doOnXXX() 함수

doOnNext(), doOnComplete(), doOnError() 3가지 함수는 Observable의 알림 이벤트에 해당한다. Observable에서 어떤 데이터를 발행할 때는 onNext, 중간에 에러가 발생하면 onError, 모든 데이터를 발행하면 onComplete 이벤트가 발생한다. 어떻게 보면 이 알림 이벤트를 위의 함수가 가로채서 디버깅을 할 수 있도록 도와주는 것이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class doOnXXX {
public static void main(String[] args) {
Observable.just("1", "3", "5")
.doOnNext(data -> Log.d("onNext()", data))
.doOnComplete(() -> Log.d("onComplete()"))
.doOnError(error -> Log.e("onError", error))
.subscribe(Log::i);
}
}
// 결과
main | onNext() | debug = 1
main | value = 1
main | onNext() | debug = 3
main | value = 3
main | onNext() | debug = 5
main | value = 5
main | debug = onComplete()

결과는 위와 같이 나온다. doOnNext(), doOnComplete(), doOnError() 함수를 사용해 로그를 출력해봤다. 모두 main 스레드에서 실행되었고, 실제로 Observable이 구독자에게 발행한 데이터는 value로 표시하였다.

하지만, doOnError() 함수의 동작을 보지 못했다. 다른 예제를 통해서 확인해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class doOnXXX {
public static void main(String[] args) {
Observable.just(10, 5, 0)
.map(divider -> 1000 / divider)
.doOnNext(data -> Log.d("onNext()", data))
.doOnComplete(() -> Log.d("onComplete()"))
.doOnError(error -> Log.e("onError()", error.getMessage()))
.subscribe(Log::i);
}
}
// 결과
main | onNext() | debug = 100
main | value = 100
main | onNext() | debug = 200
main | value = 200
main | onError() | error = / by zero
io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading

어떤 수를 0으로 나누려고 하기 때문에 0 데이터가 발행될 때 에러가 발생하는 것을 볼 수 있다.

doOnEach() 함수

onNext, onComplete, onError 이벤트를 각각 처리하는 것이 아니라 한 번에 처리할 수 있기 때문에 편리하다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class doOnEach {
public static void main(String[] args) {
Observable.just("ONE", "TWO", "THREE")
.doOnEach(noti -> {
if (noti.isOnNext()) {
Log.d("onNext()", noti.getValue());
}

if (noti.isOnComplete()) {
Log.d("onComplete()");
}

if (noti.isOnError()) {
Log.d("onError()", noti.getError().getMessage());
}
})
.subscribe(Log::i);
}
}
// 결과
main | onNext() | debug = ONE
main | value = ONE
main | onNext() | debug = TWO
main | value = TWO
main | onNext() | debug = THREE
main | value = THREE
main | debug = onComplete()

Notification 객체는 발생한 이벤트의 종류를 알 수 있는 boolean 타입의 isOnNext(), isOnComplete(), isOnError() 함수를 제공한다. onNext()의 경우는 getValue() 함수를 호출하면 발행된 값을 얻을 수 있다. onError() 함수의 경우 getError() 함수를 호출하면 Throwable 객체를 얻을 수 있다.

doOnEach() 함수는 오직 onNext, onComplete, onNext 이벤트만 처리한다. 그리고 람다식을 잘 활용하여 간결하 코드를 유지하도록 한다.

doOnSubscribe(), doOnDispose(), 기타 함수

Observable의 알림 이벤트 중에는 onSubscribe와 onDispose 이벤트도 있다. 각각 Observable을 구독했을 때와 구독 해지했을 때의 이벤트를 처리할 수 있다.

  • doOnSubscribe() : Observable을 구독했을 때 어떤 작업을 할 수 있다. 람다 표현식의 인자로는 구독의 결과로 나오는 Disposable 객체가 제공된다.
  • doOnDispose() : Observable의 구독을 해지했을 때 호출되며 인자는 Action 객체이다. 여러 스레드에서 Observable을 참조할 수 있기 때문에 Action 객체는 Thread-Safe하게 동작해야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class doOnSubscribe {
public static void main(String[] args) {
Observable<String> source = Observable.just("1", "3", "5", "2", "6")
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (a, b) -> a)
.doOnSubscribe(data -> Log.d("onSubscribe()"))
.doOnDispose(() -> Log.d("onDispose"));

Disposable d = source.subscribe(Log::i);

CommonUtils.sleep(200);
d.dispose();
CommonUtils.sleep(300);
}
}
// 결과
main | debug = onSubscribe()
RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-1 | value = 3
main | debug = onDispose

한편 doOnSubscribe()와 doOnDispose() 함수를 각각 호출하지 않고 한번에 호출하는 함수인 doOnLifeCycle() 함수가 존재한다. 위의 코드에서 doOnSubscribe()와 doOnDispose() 함수를 빼고 doOnLifeCycle() 함수를 사용하면 된다. 결과는 같다.

또한, doOnTerminate() 함수는 Observable이 끝나는 조건이 onComplete 혹은 onError 이벤트가 발생했을 때 실행하는 함수이다. 정확하게는 onComplete() 혹은 onError() 이벤트 발생 직전에 호출된다.

다음 코드를 통해서 결과를 확인해보자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class doOnTerminate {

public static void main(String[] args) {
Observable.just("1", "3", "5", "7")
.doOnTerminate(() -> Log.d("onTerminate()"))
.doOnNext(data -> Log.d("onNext()", data))
.doOnComplete(() -> Log.d("onComplete()"))
.doOnError(error -> Log.d("onError()", error.getMessage()))
.subscribe(Log::i);
}
}
// 결과
main | onNext() | debug = 1
main | value = 1
main | onNext() | debug = 3
main | value = 3
main | onNext() | debug = 5
main | value = 5
main | onNext() | debug = 7
main | value = 7
main | debug = onTerminate()
main | debug = onComplete()

onComplete 이벤트가 발생하기 직전에 doOnTerminate() 함수가 호출되는 것을 확인할 수 있다.

예외 처리

자바에서는 예외를 처리할 때 try-catch문을 사용했지만, RxJava에서는 사용할 수 없다.
사용한다면 다음과 같은 에러를 만나게 된다.

1
OnErrorNotImplementedException

RxJava 내부에서 onError를 함수의 인자로 넘긴다. 따라서 try-catch문을 활용할 수가 없다. 추가로 함수 체인이나 Observable 내부에서 예외가 발생해도 onError 이벤트가 발생하고 try-catch 문으로는 해결할 수 없다.

onErrorRetrun() 함수

  • 에러도 어떠한 데이터로 보는 것이 적절하다.
  • 예외가 발생했을 때 에러를 의미하는 다른 데이터로 대체한다.
  • onError 이벤트는 데이터 흐름이 바로 중단되므로 subscribe() 함수를 호출할 때, onError 이벤트를 처리하는 것은 OOM 같은 중대한 에러가 발생했을 때만 활용한다.
  • 에러가 발생했을 때 내가 원하는 데이터로 대체할 수 있다.

위의 그림에서 앞의 3개의 데이터가 정상적으로 발행되고 마지막 데이터에서 에러가 발생하는 경우, onErrorReturn() 함수는 인자로 넘겼던 기본ㄱ밧을 대신 발행하고 onComplete 이벤트가 발생한다. onError() 이벤트는 발생하지 않는다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class onErrorReturn {
public static void main(String[] args) {
String[] grades = {"70", "60", "$100", "93", "83"};
Observable<Integer> source = Observable.fromArray(grades)
.map(data -> Integer.parseInt(data))
.onErrorReturn(e -> {
if (e instanceof NumberFormatException) {
e.printStackTrace();
}
return -1;
});

source.subscribe(data -> {
if (data < 0) {
Log.e("Wrong Data found!!");
return;
}

Log.i("Grade is " + data);
});
}
}
  • 에러를 onErrorReturn() 함수에서 처리하며 NumberFormatException 발생 시 -1을 리턴한다. subscribe() 함수는 성적 데이터를 처리하므로 0보다 커야 하낟. onErrorReturn() 함수에서 예외 발생 시 음수 값을 리턴했으므로 data가 0보다 작으면 에러 발생 여부를 판단하고 에러 로그를 출력한다.

onError 이벤트에서 예외를 처리하는 것과 다른 점

  1. 예외 발생이 예상되는 부분을 선언적으로 처리할 수 있다.
  2. Observable을 생성하는 측과 구독하는 측이 서로 다를 수 있다는 점이다.
    • 구독자는 Observable에서 발생할 수 있는 예외를 구독한 이후에 파악하는 것이 어렵다.
    • 다시 말하면 Observable에서 에러 가능성을 명시하지 않았는데 구독자가 필요한 예외 처리를 빠짐없이 하는 것은 어렵다는 뜻이다. 이럴때 Observable을 생성하는 측에서 발생하는 예외 처리를 미리 해두면 구독자는 선언된 예외 상황을 보고 그에 맞는 처리를 할 수 있다.
  • onErrorReturnItem() : onErrorReturn() 함수와 동일하지만 Throwable 객체를 인자로 전달하지 않기 때문에 코드는 좀 더 간결해진다. 즉, 가독성이 좋아진다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class onErrorReturn {
public static void main(String[] args) {
String[] grades = {"70", "60", "$100", "93", "83"};
Observable<Integer> source = Observable.fromArray(grades)
.map(data -> Integer.parseInt(data))
.onErrorRetrunItem(-1)

source.subscribe(data -> {
if (data < 0) {
Log.e("Wrong Data found!!");
return;
}

Log.i("Grade is " + data);
});
}
}

onErrorResumeNext() 함수

  • onErrorReturn(), onErrorReturnItem()은 에러가 발생한 시점에 특정 값으로 대체.
  • 에러가 발생했을 때, 내가 원하는 Observable로 대체하는 방법이다.
  • Observable로 대체한다는 것은 에러 발생 시 데이터를 교체하는 것뿐만 아니라 관리자에게 이메일을 보낸다던가 자원을 해제하는 등의 추가 작업을 해야할 때 유용하다.

에러가 발생했을 때, 특정 값을 원하는 Observable로 설정할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class onErrorResumeNext {
public static void main(String[] args) {
String[] salesData = {"100", "200", "A300"};
Observable<Integer> onParseError = Observable.defer(() -> {
Log.d("send email to administrator");
return Observable.just(-1);
}).subscribeOn(Schedulers.io());

Observable<Integer> source = Observable.fromArray(salesData)
.map(Integer::parseInt)
.onErrorResumeNext(onParseError);

source.subscribe(data -> {
if (data < 0) {
Log.e("Wrong Data Found!");
return;
}

Log.i("Sales data: " + data);
});
}
}
// 결과
main | value = Sales data: 100
main | value = Sales data: 200
RxCachedThreadScheduler-1 | debug = send email to administrator
RxCachedThreadScheduler-1 | error = Wrong Data Found!
  • 이처럼 에러가 발생했을 때 관리자에게 이메일을 보내고 '-1’이라는 데이터를 발행하는 Observable로 대체한다.
  • onParseError 변수는 subscribeOn() 함수를 호출하여 IO 스케줄러에서 실행한다. 이처럼 내가 원하는 코드를 실행하는 스케줄러를 선언적으로 지정할 수 있어 활용범위가 넓다.

retry() 함수

  • 예외 처리의 다른 방법은 재시도이다.
  • 예를 들어 서버와 통신할 때 인터넷이 일시적으로 안되거나 서버에 일시적인 장애가 발생하면 클라이언트에서는 일정 시간 후에 다시 통신을 요청하는 것이 필요하다. 이때 1개의 API가 아닌 다수의 API를 연속해서 호출해야 하는 경우 재시도하는 시나리오가 복잡해질 수도 있다.
  • 이런 것을 단순하게 처리할 수 있는 retry() 함수를 제공한다. onError 이벤트 발생 시 해당 처리를 재시도한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class RetrySample {
public static void main(String[] args) {
CommonUtils.exampleStart();

String url = "https://api.github.com/zen";
Observable<String> source = Observable.just(url)
.map(OkHttpHelper::getT)
.retry(5)
.onErrorReturn(e -> CommonUtils.ERROR_CODE);

source.subscribe(data -> {
Log.it("result: " + data);
});
}
}

retry() 함수의 실행횟수는 5회로 지정한다. 마지막으로 에러 발생시 ERROR_CODE를 반환한다. 재시도 동작을 확인하기 위해서는 인터넷 환경을 끊은 상태에서 테스트를 진행해야 한다. 결과는 다음과 같다.

1
2
3
4
5
6
7
main | 645 | error = api.github.com: nodename nor servname provided, or not known
main | 646 | error = api.github.com
main | 646 | error = api.github.com
main | 647 | error = api.github.com
main | 647 | error = api.github.com
main | 647 | error = api.github.com
main | 647 | value = result: -500
  • 5회의 재시도 후 최종 요청이 실패 처리되었다. getT() 함수를 통해서 api 접속을 시도하지만 예외가 발생해서 에러 로그를 찍는 부분으로 빠진다. 요청을 5번 시도하면서 계속 에러 로그를 찍고 그 후에 onErrorReturn() 함수에서 에러 코드를 반환하고 종료한다.
  • 위에서 실행 시간이 문제가 있다. 재시도를 할 때 지연 시간이 없이 바로 재시도하기 때문에 도움이 되지 않는다. 지연 시간을 설정해서 재시도를 해보자.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RetryDelaySample {
public static void main(String[] args) {
final int RETRY_MAX = 5;
final int RETRY_DELAY = 1000;

CommonUtils.exampleStart();

String url = "https://api.github.com/zen";
Observable<String> source = Observable.just(url)
.map(OkHttpHelper::getT)
.retry((retryCount, e) -> {
Log.e("retryCount: " + retryCount);
CommonUtils.sleep(RETRY_DELAY);

return retryCount < RETRY_MAX ? true : false;
})
.onErrorReturn(e -> CommonUtils.ERROR_CODE);

source.subscribe(data -> {
Log.it("result: " + data);
});
}
}
  • 재시도 횟수는 5회이고 지연 시간 간격은 1000ms이다. 재시도할 때 CommonUtils.sleep() 함수를 호출해 1000ms 동안 대기한다.
  • api 호출을 하고 인터넷 연결이 되어 있지 않다면 재시도를 하게 된다. 5번까지 재시도를 하고 1000ms 간격으로 시도를 하면서 재시도 횟수를 로그를 통해 기록한다. 재시도 횟수가 5회 이하일 때는 true를 이후에는 false를 반환한다.
  • 결과는 다음과 같다.
1
2
3
4
5
6
7
8
9
10
11
main | 716 | error = api.github.com: nodename nor servname provided, or not known
main | error = retryCount: 1
main | 1721 | error = api.github.com
main | error = retryCount: 2
main | 2726 | error = api.github.com
main | error = retryCount: 3
main | 3726 | error = api.github.com
main | error = retryCount: 4
main | 4728 | error = api.github.com
main | error = retryCount: 5
main | 5732 | value = result: -500

retryUntil()

  • retry() 함수는 재시도를 지속할 조건이 없을 때 재시도를 중단한다.
  • 재시도를 중단할 조건이 발생할 때까지 재시도 한다.
  • 함수 원형은 다음과 같다.
1
public final Observable<T> retryUntil(final BooleanSupplier stop)
  • BooleanSupplier 객체는 인자는 없고 Boolean 값을 리턴하는 함수형 인터페이스다.
  • 즉, retryUntil() 함수의 인자로 Boolean 값을 리턴하는 구문이 들어가야 한다. 이게 재시도를 중단할 조건을 의미한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class RetryUntil {
public static void main(String[] args) {
CommonUtils.exampleStart();

String url = "https://api.github.com/zen";
Observable<String> source = Observable.just(url)
.map(OkHttpHelper::getT)
.subscribeOn(Schedulers.io())
.retryUntil(() -> {
if (CommonUtils.isNetworkAvailable()) {
return true; // 중지.
}

CommonUtils.sleep(1000);
return false; // 계속 진행.
})
.onErrorReturn(e -> CommonUtils.ERROR_CODE);

source.subscribe(Log::i);

// IO 스케줄러에서 실행되기 때문에 sleep 함수가 필요함.
CommonUtils.sleep(5000);
}
}
  • 보통 재시도 로직은 별도의 스레드에서 동작하기 때문에 IO 스케줄러를 활용한다.
  • retryUntil() 함수의 인자인 람다 표현식에는 먼저 CommonUtils.isNetworkAvailable()를 호출해 네트워크가 사용 가능한 상태인지 확인한다. 만약, true를 반환하면 재시도를 중단하도록 true를 반환한다. 네트워크를 사용할 수 없는 상태라면 1000ms를 쉬고 재시도(재구독)한다. 이때 람다 표현식은 false를 반환한다.
  • 결국 retryUntil 함수의 인자인 람다 표현식이 true를 반환해야 재시도를 중단하게 된다. false를 반환하면 재시도를 계속하게 된다.
  • 결과는 다음과 같다. Process가 종료된 것은 인터넷에 연결되어 재시도가 끝났다는 것으로 해석할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
RxCachedThreadScheduler-1 | 680 | error = api.github.com: nodename nor servname provided, or not known
RxCachedThreadScheduler-1 | Network is not available
RxCachedThreadScheduler-2 | 1685 | error = api.github.com
RxCachedThreadScheduler-2 | Network is not available
RxCachedThreadScheduler-1 | 2691 | error = api.github.com
RxCachedThreadScheduler-1 | Network is not available
RxCachedThreadScheduler-2 | 3695 | error = api.github.com
RxCachedThreadScheduler-2 | Network is not available
RxCachedThreadScheduler-1 | 4698 | error = api.github.com
RxCachedThreadScheduler-1 | Network is not available

Process finished with exit code 0

retryWhen()

  • 재시도와 관련된 함수 중 가장 복잡하다.
  • 주로 재시도 조건을 동적으로 설정해야 하는 복잡한 로직을 구현할 때 사용한다.