해당 글은 직접 책을 구매하여 공부의 목적으로 정리하는 글임을 알려드립니다.

이번에는 리액티브 연산자를 카테고리별로 알아보도록 하겠다. 이유는 연산자의 종류가 많기도 하고 카테고리로 쓰임새를 어느 정도 짐작할 수 있기 때문이다.

생성 연산자는 Observable로 데이터 흐름을 만들고 변환 연산자와 필터 연산자는 데이터 흐름을 내가 원하는 방식으로 변형한다. 결합 연산자는 1개의 Observable이 아니라 여러 개의 Observable을 조합할 수 있도록 해준다.

생성 연산자

  • 생성 연산자의 역할은 데이터 흐름을 만드는 것이다.
  • Observable(Observable, Single, Maybe 객체 등)을 만든다고 생각하면 된다.
  • 앞선 챕터에서는 just(), fromXXX(), create() 함수 등을 봤고, 이번에는 다른 연산자들을 확인해보도록 하겠다.

1. interval()

  • 일정 시간 간격으로 데이터 흐름을 생성한다.
  • 주어진 시간 간격으로 0부터 1씩 증가하는 Long 객체를 발행한다.
  • 두 가지의 함수 원형을 가지고 있으며, 최초 지연 시간을 조절하는 함수와 그렇지 않은 함수가 존재한다.
  • period(일정 시간)동안 쉬었다가 데이터를 발행한다.
  • 기본적으로 영원히 지속되기 때문에 폴링 용도로 많이 사용한다.
  • 함수의 동작이 현재 스레드가 아닌 계산을 위한 별도의 스케줄러(스레드)에서 동작한다.
1
2
3
4
// 함수 원형
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public static Observable<Long> interval(long period, TimeUnit unit)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private fun executeInterval() {

/*
* 시작 시간을 표시하는 유틸리티 메소드이다.
* RxJava - 비동기 프로그래밍이기 때문에 시간에 대한 이해가 중요하다.
* 시작 시간을 기준으로 RxJava 각 함수의 실행 시간을 측정하기 위함이다.
* sleep()을 호출하는 이유는 다른 스레드에서 실행이 완료될 때까지 기다려야 하기 때문이다.
* 해당 문장을 주석 처리하게 되면 기다리지 않고 바로 프로그램이 종료되는 것을 확인할 수 있다.
* 이유는 메인 스레드에서 할 일이 없기 때문이다.
*
* */

CommonUtils.exampleStart()
val source = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { data ->
(data + 1) * 100
}
.take(5)

source.subscribe(Log::it)
CommonUtils.sleep(TIME)
CommonUtils.exampleComplete()
}
// 결과
RxComputationThreadPool-1 | 247 | value = 100
RxComputationThreadPool-1 | 347 | value = 200
RxComputationThreadPool-1 | 445 | value = 300
RxComputationThreadPool-1 | 545 | value = 400
RxComputationThreadPool-1 | 645 | value = 500
-----------------------

2. timer()

  • interval() 함수와 유사하지만 time() 함수는 한 번만 실행되는 함수이다.
  • 일정 시간이 지난 후 한 개의 데이터를 발행하고 onComplete() 이벤트가 발생한다.
  • 전반적으로 interval() 함수와 유사하다. 계산 스케줄러에서 실행되며 함수의 발행되는 데이터도 interval() 함수의 첫 번째 값인 0L이다.
  • 보통 일정 시간이 지난 후 어떤 동작을 실행할 때 활용한다. 우리가 사용하는 타이머를 맞춘다고 생각하면 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private fun executeTimer() {
CommonUtils.exampleStart()
val source = Observable.timer(500L, TimeUnit.MILLISECONDS)
.map { notUsed ->
SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
.format(Date())
}
source.subscribe(Log::it)
CommonUtils.sleep(1000)
CommonUtils.exampleComplete()
/*
* timer() 함수도 메인 스케줄러가 아닌 계산 스케줄러에서 실행되기 때문에
* 계산 스케줄러가 완료될 때까지 기다리기 위해서 sleep() 함수를 호출하여 준다.
* 즉, 계산 스케줄러의 동작이 완료될 때까지 메인 스케줄러가 기다리는 것이다.
* 그렇지 않으면 메인 스케줄러에서 할 일이 없기 때문에 프로그램이 바료 종료된다.
* */
}
// 결과
RxComputationThreadPool-1 | 708 | value = 2019/06/03 21:26:26
-----------------------

3. range()

  • 주어진 값(n)부터 m개의 Integer 객체를 발행한다.
  • interval(), timer() 함수는 Long 객체를 발행했지만, range() 함수는 Integer 객체를 발행한다.
  • 특정한 스케줄러에서 실행되지 않는다. 즉, 현재 스레드에서 실행한다.
  • 반복문(for, while문)을 대체할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
* range() 함수로 1부터 10까지 숫자를 생성한다.
* 그리고 filter() 함수를 이용해 짝수만 걸러낸다.
* 현재 쓰레드에서 실행된다.
* */
private fun executeRange() {
Observable.range(1, 10)
.filter { num ->
num % 2 == 0
}
.subscribe(Log::i)
}
// 결과
main | value = 2
main | value = 4
main | value = 6
main | value = 8
main | value = 10

4. intervaleRange()

  • interval()과 range() 함수를 혼합해놓은 함수이다.
  • interval() 함수처럼 일정한 시간 간격으로 값을 출력하지만, range() 함수처럼 시작 숫자(n)로부터 m개만큼의 값만 생성하고 onComplete 이벤트가 발생한다.
  • interval() 함수처럼 무한히 데이터 흐름을 발행하지 않는다. 반환 타입은 Long 타입이다.
  • 계산 스케줄러에서 실행된다.

사실 intervalRange() 함수는 interval() 함수와 다른 함수를 조합해서 만들 수 있다. 이유는 intervalRange() 함수가 직관적이지 않기 때문이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/*
* main 스레드에서 실행되는 것이 아니기 때문에 sleep() 함수를 호출한다.
* */
private fun executeIntervalRange() {
Observable.intervalRange(1,
5,
100L,
100L,
TimeUnit.MILLISECONDS)
.subscribe(Log::i)
CommonUtils.sleep(1000)
}
// 결과
RxComputationThreadPool-1 | value = 1
RxComputationThreadPool-1 | value = 2
RxComputationThreadPool-1 | value = 3
RxComputationThreadPool-1 | value = 4
RxComputationThreadPool-1 | value = 5

interval() 함수 -> intervalRange() 함수 만들기

  • interval() 함수 사용
  • map() 함수 사용
  • take() 함수 사용
1
2
3
4
5
6
7
8
9
private fun executeIntervalRangeUsingIntervalAndMapAndTake() {
Observable.interval(100L, TimeUnit.MILLISECONDS)
.map { data ->
data + 1
}
.take(5)
.subscribe(Log::i)
CommonUtils.sleep(1000)
}

5. defer()

  • 데이터 흐름 생성을 구독자가 subscribe() 함수를 호출할 때까지 미룰 수 있다. 이때 새로운 Observable이 생성된다.
  • Observable의 생성이 구독할 때까지 미뤄지기 때문에 최신 데이터를 얻을 수 있다.
  • 현재 스레드에서 실행되며, 인자로는 Callable<Observable>를 받는다. Callable 객체이므로 구독자가 subscribe()를 호출할 때까지 call() 메소드의 호출을 미룰 수 있다.

defer() 함수는 구독자가 구독할 때까지 Observable의 데이터 발행을 미루는 역할을 한다. 따라서 구독자가 subscribe()를 호출하는 시점에 최신의 데이터를 받을 수 있다. defer() 함수를 사용하지 않은 상황에서 구독자 두명이 그대로 구독을 하게 되면 같은 5에 대한 데이터를 발행하고 구독자가 받게 된다.

개념이 조금 어렵다;; 천천히 다시 볼 필요가 있다.

여기에서 다룬 Observable은 모두 차가운 Observable이다. Observable을 생성할 때 입력값이 결정되고 구독자가 subscribe() 함수를 호출하면 그때 해당 데이터 흐름을 그대로 발행한다. 즉, defer() 함수를 활용하면 subscribe() 함수를 호출할 때의 상황을 반영하여 데이터 흐름의 생성을 지연하는 효과를 보여준다. 내부적으로 구독자가 subscribe() 함수를 호출하면 그때 supplier의 call() 메소드를 호출한다.

6. repeat()

  • 단순히 반복 실행을 하는 함수이다.
  • 서버와 통신을 할 때 해당 서버가 살아있는지 확인(이 확인 과정을 보통 ping 혹은 heart beat라고 한다.)하는 코드를 작성할 때 주로 사용한다.
  • 인자를 입력하지 않으면 영원히 실행된다. 따라서 반복하길 원하는 숫자만큼 인자로 전달하는게 좋다.

예제로 heart beat를 간단하게 구현해보기

서버와 연동하는 앱을 작성하다 보면 통신하는 서버가 동작하는지 확인하는 코드가 필요하다. 지속적인 통신을 해야 하는 서버의 경우 명세서에 동작 확인 코드를 작성할 것을 명시하기도 한다.

보통 일정 시간 안데 heart beat 패킷을 보내지 않으면 서버는 클라이언트와의 연결이 종료된 것으로 판단하고 연결을 해제한다. (보통 30초 간격으로 heart beat 신호를 보낸다.) 이럴 때 repeat() 함수를 활용하면 heart beat 패킷을 보내는 프로그램을 간단하게 구현할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
fun main(args: Array<String>){
CommonUtils.exampleStart()
val serverUrl: String = "https://api.github.com/zen"

Observable.timer(2, TimeUnit.SECONDS)
.map { it ->
serverUrl
}
.map(OkHttpHelper::get)
.repeat()
.subscribe({
Log.it("Ping result: $it")
}, {
Log.it("Ping result fail: ${it.message}")
})
CommonUtils.sleep(1000)
}

// 아래는 OkHttpHelper 클래스이다.
public class OkHttpHelper {
private static OkHttpClient client = new OkHttpClient();
public static String ERROR = "ERROR";

public static String get(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
try {
Response res = client.newCall(request).execute();
return res.body().string();
} catch (IOException e) {
Log.e(e.getMessage());
throw e;
}
}

public static String getT(String url) throws IOException {
Request request = new Request.Builder()
.url(url)
.build();
try {
Response res = client.newCall(request).execute();
return res.body().string();
} catch (IOException e) {
Log.et(e.getMessage());
throw e;
}
}

public static String getWithLog(String url) throws IOException {
Log.d("OkHttp call URL = " + url);
return get(url);
}
}

timer() 함수를 사용해 2초마다 반복 실행되도록 했다. 약 2초 간격으로 실행된다. 원래 timer() 함수는 한 번 호출된 후에는 종료된다. 그런데 계속 반복해서 실행되는 것을 볼 수 있다.

이유는 repeat() 함수 때문이다. repeat() 함수는 동작이 한 번 끝난 다음에 다시 구독하는 방식으로 동작한다. 그리고 다시 구독할 때마다 동작하는 스레드의 번호가 달라진다.

만약 동작하는 스레드를 동일하게 맞추고 싶다면 timer()와 repeat() 함수를 빼고 interval() 함수를 대신 넣어 호출하면 된다.