변환 연산자

위에서 데이터 흐름(Observable)을 만들어내는 생성 연산자를 보았다면 이번에는 데이터 흐름을 원하는대로 변형할 수 있는 변환 연산자를 알아보자.

1. concatMap()

flatMap() 함수와 매우 비슷하다. 하지만, concatMap() 함수는 먼저 들어온 데이터 순서대로 처리해서 결과를 낼 수 있도록 보장한다. 즉, 데이터의 순서를 보장한다. flatMap() 함수는 순서를 보장하지 않는다.

flatMap() 함수는 먼저 들어온 데이터를 처리하는 도중에 새로운 데이터가 들어오면 나중에 들어온 데이터의 처리 결과가 먼저 출력될 수도 있다. 이를 인터리빙(끼어들기)라고 한다.

2.switchMap()

concatMap() 함수가 인터리빙이 발생할 수 있는 상황에서 동작의 순서를 보장해준다면 switchMap() 함수는 순서를 보장하기 위해 기존에 진행중이던 작업을 바로 중단한다. 그리고 여러 개의 값이 발행되었을 때 마지막에 들어온 값만 처리하고 싶을 때 사용한다. 중간에 끊기더라도 마지막 데이터의 처리는 보장하기 때문이다.

마블 다이어그램이 조금 복잡하지만, 시간이 겹치지 않는다는 것을 유의하면 된다.
빨간색 도형의 경우 정상적으로 처리했지만,
초록색 도형을 처리하는 도중에 파란색 도형이 들어왔으므로 초록색 도형의 처리는 중단하고 파란색으로 도형을 처리한다.

switchMap() 함수는 센서 등의 값을 얻어와서 동적으로 처리하는 경우에 매우 유용하다. 센서 값을 중간값보다는 최종적인 값으로 결과를 처리하는 경우가 많기 때문이다. 이럴 때는 flatMap() 함수로 매번 새로운 결과가 나왔는지 검사하지 말고 손쉽게 switchMap() 함수를 사용하자.

3. groupBy()

어떤 기준(KeySelector 인자)으로 단일 Observable을 여러 개로 이루어진 Observable 그룹(GroupedObservable)으로 만든다.

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args){
String[] objs = {PUPPLE, SKY, triangle(YELLOW), YELLOW, triangle(PUPPLE), triangle(SKY)};
Observable<GroupedObservable<String, String>> source =
Observable.fromArray(objs)
.groupBy(Shape::getShape);

source.subscribe(obj -> {
obj.subscribe(val ->
System.out.println("GROUP:" + obj.getKey() + "\t Value:" + val));
});
CommonUtils.exampleComplete();
}
  • 원래 코틀린으로 해결하려고 했는데, 어떻게 짜야될지 몰라서 자바로 진행해본다.
    코드가 조금 복잡하다. GroupedObservable 클래스는 Observable과 동일하지만 getKey() 메소드를 제공하여 구분된 그룹을 알 수 있게 해준다. source는 objs[] 배열에서 입력 데이터를 가져온다. 그룹을 구별하기 위해서 Shape.getShape() 함수를 사용한다.

getShape() 함수의 내용은 다음과 같다.

1
2
3
4
5
6
7
8
9
10
11
public static String getShape(String obj) {
if (obj == null || obj.equals("")) return NO_SHAPE;
if (obj.endsWith("-H")) return HEXAGON;
if (obj.endsWith("-O")) return OCTAGON;
if (obj.endsWith("-R")) return RECTANGLE;
if (obj.endsWith("-T")) return TRIANGLE;
if (obj.endsWith("<>")) return DIAMOND;
if (obj.endsWith("-P")) return PENTAGON;
if (obj.endsWith("-S")) return STAR;
return "BALL";
}

source.subscribe()에 전달하는 obj는 GroupedObservable 객체이다. 그룹별로 1개씩 생성되므로 생성된 obj 별로 다시 subscribe() 함수를 호출해야 한다. val은 그룹 안에서 각 Observable이 발행한 데이터를 의미한다.(즉, GroupedObservable이 발행한 데이터 ex. “BALL - 6”)

  • 만약, 모든 그룹을 처리하고 싶은게 아니라 특정 그룹만 처리하고 싶다면 filter() 함수를 이용해 조건을 추가해주면 된다.
  • getKey() : 메소드는 그룹의 구분자 값을 리턴한다.(즉, Key 값)

map(), flatMap(), groupBy() 비교

  • map() : 함수는 1개의 데이터를 다른 값이나 타입으로 변환해준다.
  • flatMap() : 함수는 1개의 값을 받아서 여러 개의 데이터(Observable)로 확장해준다.
  • groupBy() : 함수는 값들을 받아서 어떤 기준에 맞는 새로운 Observable 다수를 생성한다.

4. scan()

  • reduce() 함수와 비슷하다.
  • reduce() : Observable에서 모든 데이터가 입력된 후 그것을 종합하여 마지막 1개의 데이터만을 구독자에게 발행한다.

반면, scan() 함수는 실행할 때마다 입력값에 맞는 중간 결과 및 최종 결과를 구독자에게 발행한다.

1
2
3
4
5
6
7
8
9
10
private fun executeScan(){
val source = Observable.just("1", "3", "5")
.scan { ball1: String, ball2: String -> "$ball2 ($ball1)" }

source.subscribe(Log::i)
}
// 결과
main | value = 1
main | value = 3 (1)
main | value = 5 (3 (1))

reduce()와 다른점이 있다. 첫 번째는 source의 타입이 Maybe이 아니라 Observable이라는 것이다. reduce() 함수의 경우 마지막 값이 입력되지 않거나 onComplete 이벤트가 발생하지 않으면 구독자에게 값을 발행하지 않는다. 최악의 경우에는 값을 전혀 발해아지 않고 종료할 수도 있기 때문에 Maybe 클래스 타입으로 정의했다.

반면, scan() 함수는 값이 입력될 때마다 구독자에게 값을 발행한다. 따라서 Maybe가 아니라 Observable이다. 그리고 출력된 결과를 확인하면 main 스레드에서 실행되며 값이 입력될 때마다 발행하는 것을 확인할 수 있다.