연산자

ReactiveX의 연산자는 꽤 많다. 이 연산자들을 모두 안다고 하더라도 기억하기는 어렵다. 하지만, 이름을 보고 내용을 짐작할 수 있고 마블 다이어그램이 도움이 된다. 필요할 때 찾아보자.

대신, 여기서 소개하는 연산자는 자주 사용되니 꼭 알아두자!

1.map()

  • 입력값을 어떤 함수에 넣어서 원하는 값으로 변환하는 함수이다.
  • 입력 데이터가 있고 그것을 변환해줄 중개업자가 있다고 생각하면 좋다.
  • map() 함수는 반환값을 확인한다. 또한, 스케줄러를 지원하지 않으므로 현재 스레드에서 실행된다.
  • 일대일 함수
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
fun main(args: Array<String>) {

// map을 통해 10을 곱해준 값을 반환.
Observable.just(1, 2, 3, 4, 5)
.map { item -> item * 10 }
.subscribe({
println(it)
}, {
println(it.message)
})


// 메소드 참조도 가능.
Observable.just("RED", "BLUE", "YELLOW","BLACK")
.map(Test::ballToIndex)
.subscribe({
println(it)
}, {
println(it.message)
})
}

object Test {
fun ballToIndex(color: String): Int = when (color) {
"RED" -> {
1
}
"YELLOW" -> {
2
}
"BLUE" -> {
3
}
else -> {
-1
}
}
}

2.flatMap()

  • map() 함수와 동일한 기능을 하지만 결과가 Observable로 나온다.
  • 결과값이 Observable이므로 여러 개의 데이터를 발행할 수 있다.
  • 일대다 함수 혹은 일대일 Observable 함수이다.
  • 1개를 발행할 수도 여러 개를 발행할 수도 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
fun main(args: Array<String>) {

// Function 인터페이스를 통해 제네릭 타입을 선언.
val getDoubleDiamonds : (String) -> Observable<String> = { ball: String ->
Observable.just<String>("$ball<>", "$ball<>")
}

// 3개의 데이터를 넣었는데 6개를 발행한다.
// 일대다이고, Observable 을 반환한다.
// 위에서 정의한 Function 인터페이스 사용.
Observable.just("1", "2", "3")
.flatMap(getDoubleDiamonds)
.subscribe({
println("성공 : $it")
}, {
println("실패 : ${it.message}")
})

// 인라인을 사용.
Observable.just("1","2","3")
.map { ball ->
Observable.just("$ball<>","$ball<>")
}
.subscribe{
println(it)
}

}
  • Function<T,R> : T는 입력값을 의미하고 R은 결과 함수이다.
  • String을 넣으면 여러 개의 String을 발행하는 Observable이 나온다.
  • 여러 개의 데이터를 발행하는 방법은 Observable 뿐이다.

3.filter() 함수

  • Observable에서 원하는 데이터만 걸러내는 역할을 한다.
  • 즉, 필요없는 데이터는 제거하고 관심있는 데이터만 filter() 함수를 통과한다.
  • 간단한 수식을 적용하는 것과 원하는 조건을 작성할 수도 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fun main(args: Array<String>) {
// 짝수만 필터링.
Observable.just(1, 2, 3, 4, 5, 6, 7)
.filter { number ->
number % 2 == 0
}
.subscribe({
println("result : $it")
}, {
println("error : ${it.message}")
})
}
// 결과
result : 2
result : 4
result : 6

이외에도 filter() 함수와 비슷한 함수들이 존재한다. 이름만 보고 어떤 기능을 할지 짐작이 가능하다.

  • first(default) : Observable의 첫 번째 값을 필터한다. 만약 값 없이 완료되면 기본값을 반환한다.
  • last(default) : Observable의 마지막 값을 필터한다. 만약 값 없이 완료되면 기본값을 반환한다.
  • take(N) : 최초 N개 값만 가져온다.
  • takeLast(N) : 마지막 N개 값만 필터한다.
  • skip(N) : 최초 N개 값을 건너뛴다.
  • skipLast(N) : 마지막 N개 값을 건너뛴다.

사용 예제는 아래와 같다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
fun main(args: Array<String>) {
val source = Observable.just(100, 200, 300, 400, 500, 600)
var single: Single<Int>

// 1. first
single = source.first(-1)
single.subscribe({
println("first result : $it")
}, {
println("error : ${it.message}")
})

// 2. last
single = source.last(-1)
single.subscribe({
println("last result : $it")
}, {
println("error : ${it.message}")
})

// 3. take
source.take(3)
.subscribe({
println("take result : $it")
}, {
println("error : ${it.message}")
})

// 4. takeLast
source.takeLast(3)
.subscribe({
println("takeLast result : $it")
}, {
println("error : ${it.message}")
})

// 5. skip -> 300,400,500,600
source.skip(2)
.subscribe({
println("skip result : $it")
}, {
println("error : ${it.message}")
})

// 6. skipLast -> 100,200,300
source.skipLast(3)
.subscribe({
println("skipLast result : $it")
}, {
println("error : ${it.message}")
})
}
// 결과
first result : 100
last result : 600
take result : 100
take result : 200
take result : 300
takeLast result : 400
takeLast result : 500
takeLast result : 600
skip result : 300
skip result : 400
skip result : 500
skip result : 600
skipLast result : 100
skipLast result : 200
skipLast result : 300

4.reduce()

  • 발행한 데이터를 모두 사용하여 어떤 최종 결과 데이터를 합성할 때 활용한다.
  • 함수형 프로그래밍의 가장 기본 연산자인 map/filter/reduce 패턴을 이루는 마지막 필수 함수이다.
  • Observable을 이용해 들어오는 데이터를 1개씩 모아서 최종 결과를 만들어야 할 때 사용한다고 생각하면 된다. 주로 수치와 관련된 계산 문제에서 활용하면 좋다.

보통 Observable에 입력된 데이터를 필요한 map() 함수로 매핑하고, 원하는 데이터만 추출할 때는 불필요한 데이터를 걸러내는 filter() 함수를 사용한다. 또한 상황에 따라 발행된 데이터를 취합하여 어떤 결과를 만들어낼 때는 reduce 계열의 함수를 사용한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun main(args: Array<String>) {
val source: Maybe<String> = Observable.just("1", "3", "5")
.reduce { ball1, ball2 ->
"$ball2($ball1)"
}
source.subscribe({
println("reduce 결과 : $it")
}, {
println("error : ${it.message}")
})

// 람다 표현식을 별도 함수로 분리.
Observable.just("1","3","5")
.reduce(mergeBalls)
.subscribe({
println("reduce 함수로 분리한 결과 : $it")
},{
println("error : ${it.message}")
})
}

reduce() 함수를 호출하면 인자로 넘긴 람다 표현식에 의해 결과 없이 완료될 수도 있다. 따라서 Observable이 아니라 결과가 반환할 수도 아닐 수도 있는 Maybe 객체로 리턴된다.

예제

다음과 같은 예제를 한번 작성해보자.

  1. 전체 매출 데이터를 입력한다.
  2. 매출 데이터 중 TV 매출을 필터링한다.
  3. TV 매출의 합을 구한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
fun main(args: Array<String>) {

// 1. 데이터 입력.
// first : 상품 이름, second : 매출액.
var sales = mutableListOf<Pair<String, Int>>()
sales.add("TV" to 2500)
sales.add("Camera" to 300)
sales.add("TV" to 1600)
sales.add("Phone" to 800)
sales.add("Sofa" to 10000)
sales.add(Pair("TV", 1000))

val source: Maybe<Int> = Observable.fromIterable(sales)
// 2. 매출 데이터 중 TV 매출을 필터링한다.
.filter { sale ->
sale.first == "TV"
}
// map 을 통해 sale 에서 매출액만 뽑는다.
.map { sale ->
sale.second
}
// 3. reduce 를 통해 매출의 합을 구한다.
.reduce { sale1, sale2 ->
sale1 + sale2
}
// reduce 를 호출하기 때문에 Observable 이 아니라 Maybe 를 사용한다.

source.subscribe({ total ->
println("TV Sale: $ $total")
}, {
println("error : ${it.message}")
})
}
// 결과
TV Sale: $ 4100