이번에는 Observable의 개념에 대해서 알아보도록 하겠다. 책을 보면 RxJava는 Observable에서 시작해서 Observable로 끝난다고 해도 과언이 아니라고 한다. 그렇다면 과연 어떤 것인지 공부해보자.

Observable 소개

Observable은 데이터 흐름에 맞게 알림을 보내 구독자가 데이터를 처리할 수 있도록 한다. RxJava에서는 Observable이 중요한 개념이다.

  • Maybe 클래스 : reduce(), firstElement() 함수와 같이 데이터가 발행될 수 있거나 혹은 발행되지 않고도 완료되는 경우를 의미한다.
  • Flowable 클래스 : 데이터가 발행되는 속도가 구독자가 처리하는 속도보다 현저하게 빠른 경우 발생하는 배압(Back Pressure) 이슈에 대응하는 기능을 추가로 제공한다.

위와 같은 클래스들은 이번 Chapter에서는 공부하지 않고 다음 Chapter에서 공부할 예정이다. 이번에는 Observable에 대해서 파헤쳐보자.

Observable 클래스

Observable은 옵저버 패턴을 구현한다. 옵저버 패턴이라 함은 객체의 상태 변화를 관찰하는 관찰자(옵저버) 목록을 객체에 등록한다. 그리고 상태 변화가 있을 때마다 메소드를 호출하여 객체가 직접 목록의 각 옵저버에게 변화를 알려준다. 라이프 사이클은 존재하지 않으며 보통 단일 함수를 통해 변화만 알린다.

옵저버 패턴의 대표적인 예로는 사용자가 버튼을 누르면 버튼에 미리 등록해 둔 onClick() 메소드를 호출해 원하는 처리를 하는 것이다. 안드로이드에서는 onClick() 이벤트를 처리하는 것이 딱 이 예와 같다고 볼 수 있다.

RxJava의 Observable은 세 가지의 알림을 구독자에게 전달한다.

  • onNext() : Observable이 데이터의 발행을 알린다. 기존의 옵저버 패턴과 동일하다.
  • onComplete() : 모든 데이터의 발행을 완료했음을 알린다. onComplete 이벤트는 단 한번만 발생하며, 발생한 후에는 onNext 이벤트가 발생해서는 안된다.
  • onError() : Observable에서 어떠한 이유로 에러가 발생했음을 알린다. onError 이벤트가 발생하면 이후에 onNext나 onComplete 이벤트가 발생하지 않는다. 즉, Observable의 실행을 종료한다.

Observable을 생성할 때는 직접 인스턴스를 만들지 않고 정적 팩토리 함수를 호출한다. 여기서 말하는 정적 팩토리 함수가 뭘까…??

옵저버 패턴

여기쯤에 옵저버 패턴 공부 좀 하고 넣으면 될 것 같다. 아니면 새로운 글을 만들어서 디자인 패턴의 옵저버 패턴을 공부하면 괜찮을 듯 하다.

# 정적팩토리 함수

# 팩토리 함수들

RxJava에서 제공하는 팩토리 함수들을 살펴볼 예정이다. 알아보고 직접 사용해보면서 사용법을 익혀보자.

  • Observable을 만드는 함수들이다.

1. just() 함수

  • 데이터를 발행하는 가장 쉬운 방법은 기존의 자료구조를 사용하는 것이다.
  • just() 함수는 인자로 넣은 데이터를 차례로 발행하려고 Observable을 생성한다.
  • 실제 데이터의 발행은 subscribe() 함수를 호출해야 시작한다.
  • 한 개의 값을 넣을 수도 있고 인자로 여러 개의 값을 넣을 수도 있다.
    • 주의해야 할 점은 최대 10개까지의 데이터를 넣을 수 있다는 것이다.
    • 그리고 타입은 모두 같아야 한다.
    • 데이터 내용을 바꾸지 않고 그대로 발행한다.

2. subscribe() 함수와 Disposable 객체

  • RxJava는 내가 동작시키기 원하는 것을 사전에 정의해둔 다음 실제 그것이 실행되는 시점을 조절할 수 있다. 이때 사용하는 것이 subscribe() 함수이다.
  • Observable은 just() 등의 팩토리 함수로 데이터 흐름을 정의한 후 subscribe() 함수를 호출해야 실제로 데이터를 발행한다.

RxJava는 선언형 프로그래밍

앞선 Chap01에서도 설명했지만 한 번 더 보도록 하겠다. RxJava는 선언형 프로그래밍을 지향한다. 선언형 프로그래밍은 명령형 프로그래밍의 반대말이다. 즉, 어떤 방법(How)으로 동작하는지가 아니라 프로그래밍할 대상이 무엇(What)인지 알려주는 것을 의미한다.

예를 들어 명령형 프로그래밍 언어에서는 실행할 알고리즘과 동작을 구체적으로 명시한다. 하지만 선언형 프로그래밍은 목표를 명시할 뿐 실행할 알고리즘을 명시하지 않는다.

다시 subscribe() 함수에 관한 이야기로 돌아와보자. subscribe() 함수의 주요 원형 중 하나를 설명하겠다.

1
Disposable subscribe()

위의 인자가 없는 subscribe() 함수는 onNext()와 onComplete() 이벤트를 무시하고 onError 이벤트가 발생했을 때만 OnErrorNotImplementedException를 던진다.(throw) 따라서 Observable로 작성한 코드를 테스트하거나 디버깅할 때 활용한다.

이외에도 함수의 원형은 여러 가지 존재한다. 관련된 부분은 문서를 참고하길 바란다. 모든 함수 원형은 Disposable 인터페이스의 객체를 리턴한다. Disposable은 RxJava의 구독 객체에 해당한다.

Disposable 인터페이스의 함수

dispose()는 Observable에게 더 이상 데이터를 발행하지 않도록 구독을 해지하는 함수이다. Observable 계약(Observable Contract)에 따르면 Observable이 onComplete 알림을 보냈을 때 자동으로 dispose()를 호출해 Observable과 구독자의 관계를 끊는다.

따라서 onComplete 이벤트가 정상적으로 발생했다면 구독자가 별도로 dispose()를 호출할 필요가 없다.

  • isDisposed() : Observable이 데이터를 발행하지 않는지(구독을 해지했는지) 확인하는 함수이다.
    • 위에서 언급했듯, onComplete 이벤트가 정상적으로 발생했다면 구독자가 별도로 dispose() 하지 않아도 구독이 해제된다.

3. create()

  • just()는 데이터를 인자로 넣으면 자동으로 알림 이벤트가 발생하지만, create() 함수는 onNext, onComplete, onError 같은 알림을 개발자가 직접 호출해야 된다.
  • 개발자가 무언가를 하는 느낌이 강하다.
  • 데이터 발행을 위해서 onNext()를 호출해야 한다.
  • 모든 데이터를 발행한 후 onComplete()를 호출해야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
val sourceInt : Observable<Int> = Observable.create {
it.onNext(100)
it.onNext(200)
it.onNext(300)
it.onNext(400)
it.onComplete() // 데이터 발행 완료.
}

sourceInt.subscribe(System.out::println)
// 람다식 활용.
sourceInt.subscribe {
println("data : $it")
}
  • onNext() 함수를 이용해 차례로 데이터를 발행했다. 그리고 onComplete() 함수를 호출하여 데이터 발행을 완료한다.
  • 람다식을 활용하면 메소드의 원형을 알 필요도 없고 가독성도 훨씬 높아진다.
  • subscribe() 함수를 호출하지 않으면 데이터 발행이 안된다.
  • 메소드 레퍼런스와 람다 표현식을 사용하면 좋다. 이유는 함수의 원형을 알지 않아도 되므로 가독성이 높아진다.

4. fromArray()

  • just(), create()는 단일 데이터를 다룬다.
  • 단일 데이터가 아닐 때는 fromXXX() 함수를 사용하면 된다.
  • 배열에 들어있는 데이터를 세분화할 때 fromArray() 함수를 사용하면 된다.
  • 숫자 뿐 아니라 사용자 정의 클래스 객체도 넣을 수 있다.

기본 타입인 int[] 배열을 RxJava에서 인식시키기 위해서는 Integer[] 배열로 변환해야 한다. 자바 8의 Stream API에서 제공하는 방법을 사용하면 된다.

toIntegerArray() 함수는 int[] 배열 각각의 요소를 Integer로 변환해 Integer[] 배열의 스트림으로 만들어 반환한다. 그리고 최종적으로 스트림을 Integer[] 배열로 만들어준다.

1
2
3
4
5
6
7
8
9
fun main(args : Array<String>){
val array : IntArray = intArrayOf(1,2,3,4)
executeIntFromArray(array)
}
private fun executeIntFromArray(array: IntArray) = Observable.fromArray(IntegerArray(array)).subscribe {
for (i in array) {
println("array : $i")
}
}

5. fromIterable()

  • Iterable 인터페이스는 반복자를 반환한다.
  • 이터레이터 패턴을 구현한 것으로 다음에 어떤 데이터가 있는지와 그 값을 얻어오는 것만 관여할 뿐 특정 데이터 타입에 의존하지 않는 장점이 있다.
  • hasNext(), next() 메소드가 있다.
  • 자바의 많은 컬렉션 클래스가 Iterable 인터페이스를 구현한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private fun executeFromIterable() {
val list: MutableList<String> = ArrayList()
list.add("Lee")
list.add("Park")
list.add("Kim")

Observable.fromIterable(list).subscribe {
println(it)
}

val orderQueue: BlockingQueue<Order> = ArrayBlockingQueue(100);
orderQueue.add(Order("ORD-1"))
orderQueue.add(Order("ORD-2"))
orderQueue.add(Order("ORD-3"))

Observable.fromIterable(orderQueue).subscribe { order ->
println(order.mid)
}
}
// Order 데이터 클래스.
data class Order(val mid: String)

6. fromCallable()

  • 동시성 API인 Callable 인터페이스
  • Callable 객체와 fromCallable() 함수를 이용해 Observable을 만들 수 있다.
  • 실행 결과를 리턴한다.
  • Executor 인터페이스의 인자로 활용되기 때문에 잠재적으로 다른 스레드에서 실행되는 것을 의미한다.

7. fromFuture()

  • Future 인터페이스도 동시성 API로 비동기 계산의 결과를 구할 때 사용한다.
  • 잘 모르는 내용이기 때문에 더 공부하고 추가하도록 하겠다… ^^;

8. fromPublishe()

  • fromXXX() 계열의 마지막 함수.
  • 자바 9의 표준인 Flow API의 일부.
  • 기존의 RxJava와 비교했을 때 패키지 이름이 다르다.
  • Observable.create()와 마찬가지로 onNext()와 onComplete() 함수를 호출할 수 있다.

# Single 클래스

  • Single 클래스는 오직 1개의 데이터만 발행하도록 한정한다.
  • 결과가 유일한 서버 API를 호출할 때 유용하게 사용할 수 있다.
  • 데이터 하나가 발행과 동시에 종료(onSuccess) 된다는 점이다.
  • onNext()와 onComplete() 함수가 onSuccess() 함수로 통합된 것으로 보면 된다.
  • 따라서 Single 클래스의 라이프 라이클 함수.
    • onSuccess()
    • onError()

1. just() 함수

  • Observable과 거의 같은 방법으로 활용할 수 있다.
  • Single 클래스는 하나의 데이터만 발행하기 때문에 조심해서 사용해야 한다.
  • 예제
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private fun executeFromObservableToSingle() {
println("==Single 예제==")
// 1. 기존 Observable 에서 Single 객체로 변환하기.
val source: Observable<String> = Observable.just("Hello Single")
Single.fromObservable(source).subscribe { it ->
println(it)
}

// 2. Single() 함수를 호출해 Single 객체 생성하기.
// Observable 에서 값이 발행되지 않을 때, 인자로 넣은 기본 값을 대신 발행한다.
Observable.just("Hi Single")
.single("default value")
.subscribe { it ->
println(it)
}


// 3. first() 함수를 호출해 Single 객체 생성하기.
// 하나 이상의 데이터를 발행하더라도 첫 번째 데이터 발행 후 onSuccess 이벤트가 발생한다.
val colors: Array<String> = arrayOf("Red", "Blue", "White")
Observable.fromArray(colors).map {
return@map it[0]
}
.first("d")
.subscribe { it ->
println(it)
}

// 4. empty Observable 에서 Single 객체 생성하기.
// 첫 번째 데이터 발행 후 onSuccess 이벤트가 발생.
// Observable 에서 값이 발행되지 않을 때도 기본값을 갖는 single 객체로 변환할 수 있다.
Observable.empty<String>()
.single("default value")
.subscribe { it ->
println(it)
}


// 5. take() 함수에서 single 객체 생성하기.
// Single 이기 때문에 take() 함수에 인자로 1보다 큰 값을 넣으면 에러 발생.
Observable.just(Order("ORD-5"), Order("ORD-6"))
.take(1)
.single(Order("default value"))
.subscribe { it ->
println(it)
}
}

data class Order(val mid: String) {
// toString 오버라이드.
override fun toString(): String {
return "Order ID : $mid"
}
}

# Maybe 클래스

  • 최대 데이터 하나를 가질 수 있지만 데이터 발행 없이 바로 데이터 발생을 완료할 수도 있다.
    • Single 클래스 : 1개 완료
    • Maybe 클래스 : 0 혹은 1개 완료
  • Maybe 클래스는 Single 클래스에 onComplete 이벤트가 추가된 형태이다.
  • 차례대로 onSuccess, onError, onComplete 이벤트에 해당한다.
  • Observable의 특정 연산자를 통해 생성할 때가 많다.

# 뜨거운 Observable

Observable에는 뜨거운 Observable과 차가운 Observable이 있다.

  • 차가운 Observable

    • just(), fromIterable() 함수를 호출해도 옵서버가 subscribe() 함수를 호출하여 구독하지 않으면 데이터를 발행하지 않는다.
    • 예) 웹 요청, 데이터베이스 쿼리와 파일 읽기 등.
    • 원하는 URL이나 데이터를 지정하면 그때부터 서버나 데이터베이스 요청을 보내고 결과를 받는다.
    • 앞서 봤던 것들이 차가운 Observable이다.
    • 앞으로 공부함에 있어서 별도의 언급이 없으면 차가운 Observable!
  • 뜨거운 Observable

    • 구독자의 존재 여부와 관계없이 데이터를 발행한다.
    • 따라서 여러 구독자를 고려할 수 있다.
    • 구독자로서는 Observable에서 발행하는 데이터를 처음부터 모두 수신할 것으로 보장할 수 없다.
    • 예) 마우스 이벤트, 키보드 이벤트, 시스템 이벤트, 센서 데이터와 주식 가격 등.
    • 온도, 습도 센서의 데이터를 처리하는 앱이라면 최근의 온도, 습도 정보만 사용자에게 표시하면 된다.

요약하면, 차가운 Observable은 구독자가 구독하면 준비된 데이터를 처음부터 발행한다. 하지만 뜨거운 Observable은 구독한 시점부터 Observable에서 발행한 값을 받는다.

구독자가 여러명?

위에서 뜨거운 Observable은 여러 구독자를 고려할 수 있다고 한다. 무슨 뜻일까?
예를 들어 서버에 요청한 결과로 반환된 JSON 문서를 파싱해 원하는 속성을 추추란다고 해보자.
날씨 정보, 지역 정보, 시간 정보를 반환하는 경우 RxJava에서는 위의 세 가지 정보를 구독자라고 생각하면 편리하다.

데이터의 원천은 한 곳이지만 내가 최종적으로 원하는 결과 데이터가 여러 종류일 때는 각각을 구독자로 생각하면 좋다.

# Subject 클래스

  • 차가운 Observable -> 뜨거운 Observable
  • Observable의 속성과 구독자의 속성이 모두 있다.
    • Observable처럼 데이터를 발행할 수도 있고 구독자처럼 발행된 데이터를 바로 처리할 수도 있다.

1. AsyncSubject 클래스

  • Observable에서 발행한 마짐막 데이터를 얻어올 수 있는 Subject.
  • 즉, 완료되기 전 마지막 데이터에만 관심이 있으며 이전 데이터는 무시한다.
  1. 처음 구독자가 subscribe() 함수를 호출한다.
  2. 이후에 Red, Green 원이 발행된 후 두 번째 구독자가 subscribe() 함수를 호출한다.
  3. 마지막으로 Blue 원이 발행되고 데이터 발행을 완료한다.(onComplete 이벤트)

이때 완료되기 전까지는 구독자에게 데이터를 전달하지 안하가 완료됨과 동시에 첫 번째와 두 번째 구독자에게 마지막 데이터를 발행하고 종료한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private fun executeAsyncSubject(){
val subject : AsyncSubject<String> = AsyncSubject.create()
// 정적 팩토리 함수인 create()로 객체 생성.
subject.subscribe{
data -> println("Subscriber #1 => $data")
}
// 구독.

subject.onNext("1")
subject.onNext("3")
// onNext()를 호출하여 데이터 발행.

subject.subscribe{
data -> println("Subscriber #2 => $data")
}
// 두 번째 구독.

subject.onNext("5")
subject.onComplete()
// 데이터를 발행한 후에는 마지막으로 onComplete() 호출.

// onComplete() 함수를 호출한 후에 구독했을 경우.
// Observable과 마찬가지로 onComplete 함수 호출 이후에 onNext 이벤트를 무시한다.
subject.onNext("12")
subject.subscribe{
data -> println("Subscriber #3 => $data")
}
subject.subscribe{
data -> println("Subscriber #4 => $data")
}
}
// 결과
Subscriber #1 => 5
Subscriber #2 => 5
Subscriber #3 => 5
Subscriber #4 => 5
  • 또한, AsyncSubject 클래스는 구독자로도 동작할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
private fun executeAsyncSubject2(){
val source : Observable<Float> = Observable.just(10.1f,13.4f,12.5f)

val subject : AsyncSubject<Float> = AsyncSubject.create()
subject.subscribe{
data -> println("Subscriber # => $data")
}

source.subscribe(subject)
// subject가 source 즉, Observable을 구독.
}
// 결과
Subscriber # => 12.5

2. BehaviorSubject 클래스

  • 구독자가 구독을 하면 가장 최근 값 혹은 기본값을 넘겨주는 클래스이다.
  • 예를 들어 온도 센서에서 값을 받아온다면 가장 최근의 온도 값을 받아오는 동작을 구현할 수 있으며 처음 온도를 얻을 때는 초깃값(0)을 반환하기도 한다.
  • 마블 다이어그램처럼 처음에 발행한 값이 없다면 기본값을 발행해서 구독자가 받게 된다.
  • 그 다음부터는 발행한 최근 값을 받는다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private fun executeBehaviorSubject() {
val subject: BehaviorSubject<String> = BehaviorSubject.createDefault("6")
subject.subscribe { data ->
println("Subscriber #1 => $data")
}
subject.onNext("1")
subject.onNext("3")
subject.subscribe { data ->
println("Subscriber #2 => $data")
}
subject.onNext("5")
subject.onComplete()
}
// 결과
Subscriber #1 => 6
Subscriber #1 => 1
Subscriber #1 => 3
Subscriber #2 => 3
Subscriber #1 => 5
Subscriber #2 => 5

3. PublishSubject 클래스

  • 구독자가 subscribe() 함수를 호출하면 값을 발행하기 시작한다.
  • 가장 평범한 Subject 클래스.
  • 오직 해당 시간에 발생한 데이터를 그대로 구독자에게 전달한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private fun executePublishSubject() {
val subject: PublishSubject<String> = PublishSubject.create()
subject.subscribe { data ->
println("Subscriber #1 => $data")
}
subject.onNext("1")
subject.onNext("3")
subject.subscribe { data ->
println("Subscriber #2 => $data")
}
subject.onNext("5")
subject.onComplete()
}
// 결과
Subscriber #1 => 1
Subscriber #1 => 3
Subscriber #1 => 5
Subscriber #2 => 5
  • 마블 다이어그램과 코드를 함께 보면 이해가 조금 더 수월하다.
  • 첫 번째 구독자가 subscribe() 함수를 호출하면 Subject가 발행한 1,3 이라는 데이터를 전달받는다.
  • 이 후 두 번째 구독자가 subscribe() 함수를 호출하고 Subject가 발행한 5라는 데이터는 두 구독자 모두가 전달받는다.
    • 두 번째 구독자는 구독한 이후에 발행된 데이터인 5만 전달받는다.

4. ReplaySubject 클래스

  • Subject 클래스의 목적은 뜨거운 Observable을 활용하는 것인데 차가운 Observable 처럼 동작하기 때문이다. -> 아직 무슨 말인지 정확히 이해가 가지 않는다…ㅜ
  • 구독자가 새로 생기면 항상 데이터의 처음부터 끝까지 발행하는 것을 보장해준다.
  • 그러므로 모든 데이터 내용을 저장해두는 과정 중 메모리 누수가 발생할 가능성을 염두에 두고 사용할 때 주의해야 한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private fun exectueReplaySubject(){
val subject : ReplaySubject<String> = ReplaySubject.create()
subject.subscribe {
data-> println("Subscriber #1 => $data")
}

subject.onNext("1")
subject.onNext("3")
subject.subscribe {
data -> println("Subscriber #2 => $data")
}
subject.onNext("5")
subject.onComplete()
}
//결과
Subscriber #1 => 1
Subscriber #1 => 3
Subscriber #2 => 1
Subscriber #2 => 3
Subscriber #1 => 5
Subscriber #2 => 5
  • 첫 번째 구독자는 Observable을 구독한 이후에 발행한 1과 3을 전달받는다.
  • 두 번째 구독자는 subscribe() 함수를 호출하면 지금까지 발행된 1과 3을 바로 전달받는다. 그리고 마지막으로 Subject 클래스가 5를 발행하면 두 구독자 모두 해당 값을 전달받는다.

# ConnectableObservable 클래스

  • 차가운 Observable을 뜨거운 Observable로 변환한다.(Subject 처럼)
  • Observable을 여러 구독자에게 공유할 수 있으므로 원 데이터 하나를 여러 구독자에게 동시에 전달할 때 사용한다.
  • subscribe() 함수를 호출해도 아무 동작이 일어나지 않는다.
  • connect() 함수는 호출한 시점부터 subscribe() 함수를 호출한 구독자에게 데이터를 발행한다.
  • publish() : Observable을 ConnectableObservable로 변환.
  • 여러 구독자에게 데이터를 발행하기 위해 connect() 함수를 호출하기 전까지 데이터 발행을 유예할 수 있다.
  • connect() 함수를 호출해야 그때까지 구독했던 구독자 모두에게 데이터를 발행한다. connect() 함수를 호출한 이후에 구독한 구독자에게는 구독 이후에 발생한 데이터부터 발행한다.

ConnectableObservable로 바꿔놓으면

  1. 구독자를 동시에 대기시킬 수 있다.
    -> 구독자 1,2,3을 subscribe() 해놓고 마지막에 connect()를 하면 동시에 같은 데이터를 받을 수 있다. 이것이 뜨거운 Observable의 성격!

  2. connect()한 이후에 subscribe()를 하게 되면 앞서 발행되었던 데이터를 받을 수 없다.
    -> 이도 마찬가지로 뜨거운 Observable의 성격이다. 최신의 데이터만 필요하거나 앞서 발행되었던 데이터는 불필요한 경우에 활용될 수 있다.

# 참고