코딩하는 도중에 로그를 넣는 이유는 잘못되었을 때를 대처하기 위함이다. 하지만 RxJava 코드는 로그를 넣을 수 있는 공간이 없다. Observable로 시작하는 업스트림(upstream)과 그것을 받아서 처리하는 다운스트림(downstream)이 동일한 문장으로 이루어져 있기 때문이다. 즉, 전체 동작을 선언적으로 만들 수 있으므로 전체 맥락에 대한 가독성은 높아지지만 예외 코드를 어떻게 넣어야 하는지에 대한 어려움이 있다.
원래 함수형 프로그래밍은 함수의 부수 효과를 없도록 하는 것이 원칙이지만 doOnXXX() 계열 함수는 오히려 부수 효과를 일으켜서 내가 작성하는 코드가 문제없는지 알아볼 수 있게 도와준다.
이번 Chapter에서 알아보자.
doOnXXX() 함수
doOnNext(), doOnComplete(), doOnError() 3가지 함수는 Observable의 알림 이벤트에 해당한다. Observable에서 어떤 데이터를 발행할 때는 onNext, 중간에 에러가 발생하면 onError, 모든 데이터를 발행하면 onComplete 이벤트가 발생한다. 어떻게 보면 이 알림 이벤트를 위의 함수가 가로채서 디버깅을 할 수 있도록 도와주는 것이다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
publicclassdoOnXXX{ publicstaticvoidmain(String[] args){ Observable.just("1", "3", "5") .doOnNext(data -> Log.d("onNext()", data)) .doOnComplete(() -> Log.d("onComplete()")) .doOnError(error -> Log.e("onError", error)) .subscribe(Log::i); } } // 결과 main | onNext() | debug = 1 main | value = 1 main | onNext() | debug = 3 main | value = 3 main | onNext() | debug = 5 main | value = 5 main | debug = onComplete()
결과는 위와 같이 나온다. doOnNext(), doOnComplete(), doOnError() 함수를 사용해 로그를 출력해봤다. 모두 main 스레드에서 실행되었고, 실제로 Observable이 구독자에게 발행한 데이터는 value로 표시하였다.
하지만, doOnError() 함수의 동작을 보지 못했다. 다른 예제를 통해서 확인해보자.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
publicclassdoOnXXX{ publicstaticvoidmain(String[] args){ Observable.just(10, 5, 0) .map(divider -> 1000 / divider) .doOnNext(data -> Log.d("onNext()", data)) .doOnComplete(() -> Log.d("onComplete()")) .doOnError(error -> Log.e("onError()", error.getMessage())) .subscribe(Log::i); } } // 결과 main | onNext() | debug = 100 main | value = 100 main | onNext() | debug = 200 main | value = 200 main | onError() | error = / by zero io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading
어떤 수를 0으로 나누려고 하기 때문에 0 데이터가 발행될 때 에러가 발생하는 것을 볼 수 있다.
doOnEach() 함수
onNext, onComplete, onError 이벤트를 각각 처리하는 것이 아니라 한 번에 처리할 수 있기 때문에 편리하다.
if (noti.isOnComplete()) { Log.d("onComplete()"); }
if (noti.isOnError()) { Log.d("onError()", noti.getError().getMessage()); } }) .subscribe(Log::i); } } // 결과 main | onNext() | debug = ONE main | value = ONE main | onNext() | debug = TWO main | value = TWO main | onNext() | debug = THREE main | value = THREE main | debug = onComplete()
Notification 객체는 발생한 이벤트의 종류를 알 수 있는 boolean 타입의 isOnNext(), isOnComplete(), isOnError() 함수를 제공한다. onNext()의 경우는 getValue() 함수를 호출하면 발행된 값을 얻을 수 있다. onError() 함수의 경우 getError() 함수를 호출하면 Throwable 객체를 얻을 수 있다.
doOnEach() 함수는 오직 onNext, onComplete, onNext 이벤트만 처리한다. 그리고 람다식을 잘 활용하여 간결하 코드를 유지하도록 한다.
doOnSubscribe(), doOnDispose(), 기타 함수
Observable의 알림 이벤트 중에는 onSubscribe와 onDispose 이벤트도 있다. 각각 Observable을 구독했을 때와 구독 해지했을 때의 이벤트를 처리할 수 있다.
doOnSubscribe() : Observable을 구독했을 때 어떤 작업을 할 수 있다. 람다 표현식의 인자로는 구독의 결과로 나오는 Disposable 객체가 제공된다.
doOnDispose() : Observable의 구독을 해지했을 때 호출되며 인자는 Action 객체이다. 여러 스레드에서 Observable을 참조할 수 있기 때문에 Action 객체는 Thread-Safe하게 동작해야 한다.
CommonUtils.sleep(200); d.dispose(); CommonUtils.sleep(300); } } // 결과 main | debug = onSubscribe() RxComputationThreadPool-1 | value = 1 RxComputationThreadPool-1 | value = 3 main | debug = onDispose
한편 doOnSubscribe()와 doOnDispose() 함수를 각각 호출하지 않고 한번에 호출하는 함수인 doOnLifeCycle() 함수가 존재한다. 위의 코드에서 doOnSubscribe()와 doOnDispose() 함수를 빼고 doOnLifeCycle() 함수를 사용하면 된다. 결과는 같다.
또한, doOnTerminate() 함수는 Observable이 끝나는 조건이 onComplete 혹은 onError 이벤트가 발생했을 때 실행하는 함수이다. 정확하게는 onComplete() 혹은 onError() 이벤트 발생 직전에 호출된다.
source.subscribe(data -> { if (data < 0) { Log.e("Wrong Data found!!"); return; }
Log.i("Grade is " + data); }); } }
에러를 onErrorReturn() 함수에서 처리하며 NumberFormatException 발생 시 -1을 리턴한다. subscribe() 함수는 성적 데이터를 처리하므로 0보다 커야 하낟. onErrorReturn() 함수에서 예외 발생 시 음수 값을 리턴했으므로 data가 0보다 작으면 에러 발생 여부를 판단하고 에러 로그를 출력한다.
onError 이벤트에서 예외를 처리하는 것과 다른 점
예외 발생이 예상되는 부분을 선언적으로 처리할 수 있다.
Observable을 생성하는 측과 구독하는 측이 서로 다를 수 있다는 점이다.
구독자는 Observable에서 발생할 수 있는 예외를 구독한 이후에 파악하는 것이 어렵다.
다시 말하면 Observable에서 에러 가능성을 명시하지 않았는데 구독자가 필요한 예외 처리를 빠짐없이 하는 것은 어렵다는 뜻이다. 이럴때 Observable을 생성하는 측에서 발생하는 예외 처리를 미리 해두면 구독자는 선언된 예외 상황을 보고 그에 맞는 처리를 할 수 있다.
onErrorReturnItem() : onErrorReturn() 함수와 동일하지만 Throwable 객체를 인자로 전달하지 않기 때문에 코드는 좀 더 간결해진다. 즉, 가독성이 좋아진다.
source.subscribe(data -> { if (data < 0) { Log.e("Wrong Data Found!"); return; }
Log.i("Sales data: " + data); }); } } // 결과 main | value = Sales data: 100 main | value = Sales data: 200 RxCachedThreadScheduler-1 | debug = send email to administrator RxCachedThreadScheduler-1 | error = Wrong Data Found!
이처럼 에러가 발생했을 때 관리자에게 이메일을 보내고 '-1’이라는 데이터를 발행하는 Observable로 대체한다.
onParseError 변수는 subscribeOn() 함수를 호출하여 IO 스케줄러에서 실행한다. 이처럼 내가 원하는 코드를 실행하는 스케줄러를 선언적으로 지정할 수 있어 활용범위가 넓다.
retry() 함수
예외 처리의 다른 방법은 재시도이다.
예를 들어 서버와 통신할 때 인터넷이 일시적으로 안되거나 서버에 일시적인 장애가 발생하면 클라이언트에서는 일정 시간 후에 다시 통신을 요청하는 것이 필요하다. 이때 1개의 API가 아닌 다수의 API를 연속해서 호출해야 하는 경우 재시도하는 시나리오가 복잡해질 수도 있다.
이런 것을 단순하게 처리할 수 있는 retry() 함수를 제공한다. onError 이벤트 발생 시 해당 처리를 재시도한다.
retry() 함수의 실행횟수는 5회로 지정한다. 마지막으로 에러 발생시 ERROR_CODE를 반환한다. 재시도 동작을 확인하기 위해서는 인터넷 환경을 끊은 상태에서 테스트를 진행해야 한다. 결과는 다음과 같다.
1 2 3 4 5 6 7
main | 645 | error = api.github.com: nodename nor servname provided, or not known main | 646 | error = api.github.com main | 646 | error = api.github.com main | 647 | error = api.github.com main | 647 | error = api.github.com main | 647 | error = api.github.com main | 647 | value = result: -500
5회의 재시도 후 최종 요청이 실패 처리되었다. getT() 함수를 통해서 api 접속을 시도하지만 예외가 발생해서 에러 로그를 찍는 부분으로 빠진다. 요청을 5번 시도하면서 계속 에러 로그를 찍고 그 후에 onErrorReturn() 함수에서 에러 코드를 반환하고 종료한다.
위에서 실행 시간이 문제가 있다. 재시도를 할 때 지연 시간이 없이 바로 재시도하기 때문에 도움이 되지 않는다. 지연 시간을 설정해서 재시도를 해보자.
재시도 횟수는 5회이고 지연 시간 간격은 1000ms이다. 재시도할 때 CommonUtils.sleep() 함수를 호출해 1000ms 동안 대기한다.
api 호출을 하고 인터넷 연결이 되어 있지 않다면 재시도를 하게 된다. 5번까지 재시도를 하고 1000ms 간격으로 시도를 하면서 재시도 횟수를 로그를 통해 기록한다. 재시도 횟수가 5회 이하일 때는 true를 이후에는 false를 반환한다.
결과는 다음과 같다.
1 2 3 4 5 6 7 8 9 10 11
main | 716 | error = api.github.com: nodename nor servname provided, or not known main | error = retryCount: 1 main | 1721 | error = api.github.com main | error = retryCount: 2 main | 2726 | error = api.github.com main | error = retryCount: 3 main | 3726 | error = api.github.com main | error = retryCount: 4 main | 4728 | error = api.github.com main | error = retryCount: 5 main | 5732 | value = result: -500
retryUntil() 함수의 인자인 람다 표현식에는 먼저 CommonUtils.isNetworkAvailable()를 호출해 네트워크가 사용 가능한 상태인지 확인한다. 만약, true를 반환하면 재시도를 중단하도록 true를 반환한다. 네트워크를 사용할 수 없는 상태라면 1000ms를 쉬고 재시도(재구독)한다. 이때 람다 표현식은 false를 반환한다.
결국 retryUntil 함수의 인자인 람다 표현식이 true를 반환해야 재시도를 중단하게 된다. false를 반환하면 재시도를 계속하게 된다.
결과는 다음과 같다. Process가 종료된 것은 인터넷에 연결되어 재시도가 끝났다는 것으로 해석할 수 있다.
1 2 3 4 5 6 7 8 9 10 11 12
RxCachedThreadScheduler-1 | 680 | error = api.github.com: nodename nor servname provided, or not known RxCachedThreadScheduler-1 | Network is not available RxCachedThreadScheduler-2 | 1685 | error = api.github.com RxCachedThreadScheduler-2 | Network is not available RxCachedThreadScheduler-1 | 2691 | error = api.github.com RxCachedThreadScheduler-1 | Network is not available RxCachedThreadScheduler-2 | 3695 | error = api.github.com RxCachedThreadScheduler-2 | Network is not available RxCachedThreadScheduler-1 | 4698 | error = api.github.com RxCachedThreadScheduler-1 | Network is not available