지금까지 공부했던 예제의 공통점은 대부분의 동작이 현재 즉, main 스레드에서 동작한다는 것이었다. 하지만, 실무에서는 요구사항에 맞게 비동기로 동작할 수 있도록 이를 바꿔야 한다. 이때 스케줄러를 이용한다.

스케줄러는 스레드를 지정할 수 있게 해준다. 단순히 새로운 스레드를 생성하거나 기존의 Executors를 활용하는 것을 넘어 새로운 방식으로 볼 수 있다. 그동안 어렵게 다뤄야 했단 비동기 프로그래밍이 간결한 코드로 구성될 수 있다.

위의 그림에서 시간 표시줄에 주목해야 한다. 마블 다이어그램을 코드로 표현하면 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FlipSample {
public static void main(String[] args) {
String[] objs = {"1-T", "2-S", "3-P"};
Observable<String> source = Observable.fromArray(objs)
.doOnNext(data -> Log.d("Original data = " + data))
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map(Shape::flip);

source.subscribe(Log::i);
CommonUtils.sleep(500);
}
}
// 결과
RxNewThreadScheduler-1 | debug = Original data = 1-T
RxNewThreadScheduler-1 | debug = Original data = 2-S
RxNewThreadScheduler-1 | debug = Original data = 3-P
RxNewThreadScheduler-2 | value = (flipped)1-T
RxNewThreadScheduler-2 | value = (flipped)2-S
RxNewThreadScheduler-2 | value = (flipped)3-P
  • doOnNext() : Observable 에서 onNext 이벤트가 발생하면 실행되며, 여기에서는 원래의 데이터 값을 확인한다.
  • subscribeOn() : 구독자가 Observable 에 subscribe() 함수를 호출하여 구독할 때 실행되는 스레드를 지정한다. -> 해당 작업을 어느 쓰레드에서 실행할 것인가?!
  • observeOn() : Observable 에서 생성한 데이터 흐름이 여기저기 함수를 거치며 처리될 때, 동작이 어느 쓰레드에서 일어나는지 지정할 수 있다. -> 받은 결과를 어느 쓰레드에서 수행할지?!

결과를 보면 최초의 데이터 흐름이 발생하는 스레드와 flip() 함수를 거쳐서 구독자에게 전달되는 스레드가 다르다. 보통 우리는 새로운 스레드를 생성하거나 Runnable 혹은 Callable 객체를 생성하는데 우리는 전달한 적이 없다. 단지 subscribeOn()observeOn() 함수에 어떤 스케줄러를 지정했을 뿐이다.

이처럼 스케줄러를 활용하는 비동기 프로그래밍의 핵심은 바로 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 전달하는 스레드를 분리할 수 있다는 것이다.

위의 코드에서 observeOn() 함수 호출 부분을 제거해보면 어떤 결과가 나올까? 결과는 아래에서 확인할 수 있다.

1
2
3
4
5
6
7
// 결과
RxNewThreadScheduler-1 | debug = Original data = 1-T
RxNewThreadScheduler-1 | value = (flipped)1-T
RxNewThreadScheduler-1 | debug = Original data = 2-S
RxNewThreadScheduler-1 | value = (flipped)2-S
RxNewThreadScheduler-1 | debug = Original data = 3-P
RxNewThreadScheduler-1 | value = (flipped)3-P

observeOn() 함수를 지정하지 않으면 subscribeOn() 함수로 지정한 스레드에서 모든 로직을 실행한다.

지금까지 배운 내용을 간단하게 정리하면 아래와 같다.

  1. 스케줄러는 RxJava 코드를 어느 스레드에서 실행할지 지정할 수 있다.
  2. subscribeOn() 함수와 observeOn() 함수를 모두 지정하면 Observable에서 데이터 흐름이 발생하는 스레드와 처리된 결과를 구독자에게 발행하는 스레드를 분리할 수 있다.
  3. subscribeOn() 함수만 호출하면 Observable의 모든 흐름이 동일한 스레드에서 실행된다.(observeOn() 함수를 생략했을 경우!)
  4. 스케줄러를 별도로 지정하지 않으면 현재(main) 스레드에서 동작을 실행한다.

스케줄러의 종류

  • 특정 스케줄러를 사용하다가 다른 스케줄러로 변경하기 쉽다는 특징을 가지고 있다.
  • 마치 map() 함수를 한 번 더 호출하는 것처럼 새롭게 스케줄러를 추가하거나 기존의 스케줄러를 다른 것으로 교체할 수 있다.

1. 뉴 스레드 스케줄러

이름처럼 새로운 스레드를 생성한다. 새로운 스레드를 만들어 동작을 실행하고 싶을 때 Schedulers.newThread()를 인자로 넣어주면 된다. 그럼 뉴 스레드 스케줄러는 요청을 받을 때마다 새로운 스레드를 생성한다.

뉴 스레드 스케줄러는 새로운 스레드를 생성하여 내가 원하는 동작을 처리하는 방법이다. 하지만 적극적으로 추천하는 방법은 아니다. RxJava에는 뉴 스레드 스케줄러보다 활용도가 높은 계산 스케줄러와 IO 스케줄러와 같은 다른 스케줄러를 제공하기 때문이다.

2. 계산 스케줄러

4장에서 봤던 interval() 함수는 기본적으로 계산 스케줄러에서 동작한다. 물론 내가 원하는 스케줄러에서 동작하도록 변경할 수도 있다.

1
2
3
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> interval(
long period, TimeUnit unit, Scheduler scheduler)

CUSTOM은 원하는 스케줄러를 지정할 수 있다는 의미이다. 리액티브 함수 대부분은 마지막 인자로 스케줄러를 지정할 수 있다. flatMap()이나 scan() 함수 등은 대표적인 연산자이지만 스케줄러를 인자로 받지 않는 경우도 있다.

계산 스케줄러는 CPU에 대응하는 계산용 스케줄러이다. 계산 작업(입출력(I/O) 작업을 하지 않는)을 할 때는 대기 시간 없이 빠르게 결과를 도출하는 것이 중요하다. 내부적으로 스레드 풀을 생성하며 스레드 개수는 기본적으로 프로세서 개수와 동일하다.

3. IO 스케줄러

IO 스케줄러는 네트워크상의 요청을 처리하거나 각종 입,출력 작업을 실행하기 위한 스케줄러이다. 계산 스케줄러와 다른 점은 기본적으로 생성되는 스레드 개수가 다르다는 것이다.

즉, 계산 스케줄러는 CPU 개수만큼 스레드를 생성하지만, IO 스케줄러는 필요할 때마다 스레드를 계속 생성한다. 입,출력 작업은 비동기로 실행되지만 결과를 얻기까지 대기 시간이 길다.

두 스케줄러의 비교

  • 계산 스케줄러 : 일반적인 계산 작업
  • IO 스케줄러 : 네트워크상의 요청, 파일 입출력, DB 쿼리 등

4. 트램펄린 스케줄러

트램펄린 스케줄러는 새로운 스레드를 생성하지 않고 현재 스레드에 무한한 크기의 대기 행렬을 생성하는 스케줄러이다. RxJava 1.x에서는 repeat() 함수와 retry() 함수의 기본 스케줄러였으나 RxJava 2.x에서는 이러한 제약이 사라졌다.

새로운 스레드를 생성하지 않는다는 것과 대기 행렬을 자동으로 만들어준다는 것이 뉴 스레드 스케줄러, 계산 스케줄러, IO 스케줄러와 다른 점이다.

5. 싱글 스레드 스케줄러

싱글 스레드 스케줄러는 RxJava 내부에서 단일 스레드를 별도로 생성하여 구독 작업을 처리한다. 단, 생성된 스레드는 여러 번 구독 요청이 와도 공통으로 사용한다.

리액티브 프로그래밍이 비동기 프로그래밍을 지향하기 때문에 싱글 스레드 스케줄러를 활용할 확률은 낮다.

트팸펄린 스케줄러 예제와 비교해보면 실행 스레드가다르다는 사실을 알 수 있다.

  • 트램펄린 스케줄러 : 메인 스레드
  • 싱글 스레드 스케줄러 : RxSingleScheduler-1

뒤에 -1과 같이 번호가 붙있지만 결국 단일 스레드만 사용한다는 사실도 확인할 수 있다.

스케줄러를 활용하여 콜백 지옥 벗어나기

안드로이드 개발을 한다면 가장 쉽게 접근할 수 있고 유용하게 사용할 수 있는 부분이다. 서버와 통신하는 네트워크 프로그래밍을 할 때 마주치는 콜백 지옥(Callback Hell)을 해결하는 것에 집중해보자.

RxJava의 스케줄러를 활용하면 비동기 프로그래밍 방식이 달라진다. 계산 스케줄러나 IO 스케줄러의 예제에서도 살펴봤듯이 스레드를 생성하거나 Callable, Runnable 객체를 실행하는 코드가 사라진다. 리액티브 프로그래밍은 서버와 연동하는 비동기 프로그래밍을 작성할 때 큰 힘을 발휘한다.

observeOn() 함수의 활용

RxJava 스케줄러의 핵심은 결국 제공되는 스케줄러의 종류를 선택한 후 subscribeOn()과 observeOn() 함수를 호출하는 것이다.

  • subscribeOn() : Observable에서 구독자가 subscribe() 함수를 호출했을 때 데이터 흐름을 발행하는 스레드를 지정한다.(즉, 작업 스레드를 지정한다.)
  • observeOn() : 처리된 결과를 구독자에게 전달하는 스레드를 지정한다.(UI 갱신을 위한 스레드를 지정한다.)

또한, subscribeOn() 함수는 처음 지정한 스레드를 고정시키므로 다시 subscribeOn() 함수를 호출해도 무시한다. 하지만, observeOn() 함수는 다르다.

  • subscribeOn(A)를 호출했을 때는 데이터를 발행하는 첫 줄이 스레드 A에서 실행된다. 이후에는 observeOn() 함수가 호출될 때까지 스레드 A에서 실행된다.
  • observeOn(B)를 호출하면 그 다음인 두 번째 줄부터는 스레드 B에서 실행된다.
  • map(o–>D) 함수는 스레드 변겨와는 상관없으므로 세 번째 줄은 계속 스레드 B에서 실행된다.
  • 이제 observeOn© 함수를 호출하면 그 다음 데이터 흐름은 스레드 C에서 실행된다.

요약하면 다음과 같다.

  1. subscribeOn() 함수는 한번 호출했을 때 결정한 스레드를 고정하며 이후에는 다시 호출해도 스레드가 바뀌지 않는다.
  2. observeOn() 함수는 여러 번 호출할 수 있으며 그 다음부터 동작하는 스레드를 바꿀 수 있다.

전통적인 스레드 프로그래밍에서는 일일이 스레드를 만들어야 하고 스레드가 늘어날 때마다 동기화하는 것이 매우 부담스럽기 때문에 이러한 로직을 구현하는 것이 매우 힘들다. 하지만 observeOn() 함수는 스레드 변경이 쉬우므로 활용할 수 있는 범위가 매우 넓다.

책이 있다면 책에 나와있는 openWeatherMap 예제를 실행해보는 것을 추천한다.