조건 연산자

조건 연산자는 Observable의 흐름을 제어하는 역할을 한다. filter 연산자가 발행된 값을 채택하느냐 기각하느냐 여부에 초점을 맞춘다면, 조건 연산자는 지금까지의 흐름을 어떻게 제어할 것인지에 초점을 맞춘다.

다음과 같은 연산자가 있다.

  • amb()
  • takeUntil()
  • skipUntil()
  • all()

1. amb()

amb는 ambiguous(모호한)라는 영어 단어의 줄임말이다. 여러 개의 Observable 중에서 1개의 Observable을 선택하는데, 선택 기준은 가장 먼저 데이터를 발행하는 발행하는 Observable이다. 이후에 나머지 Observable에서 발행하는 데이터는 모두 무시한다.

List 인터페이스처럼 Iterable<Observable> 객체를 인자로 넣으면 그 중에서 가장 먼저 데이터를 발행하는 Observable만 선택해서 계속 값을 발행하도록 해준다.

2. takeUntil()

takeUntil() 함수는 take() 함수에 조건을 설정할 수 있다. 구체적으로 살펴보면 인자로 받은 Observable에서 어떤 값을 발행하면 현재 Observable의 데이터 발행을 중단하고 즉시 완료(onComplete 이벤트 발생)한다. 즉, take() 함수처럼 일정 개수만 값을 발행하되 완료 기준을 다른 Observable에서 값을 발행하는지로 판단하는 것이다.

3. skipUntil()

takeUntil()과 정반대의 함수이다. other Observable을 인자로 받는다는 점은 같지만 Observable에서 데이터를 발행할 때까지 값을 건너뛴다.

takeUntil() 함수와는 다르게 other Observable에서 화살표가 나올 때까지는 값을 발행하지 않고 건너뛰다가 other Observable에서 값을 발행하는 순간부터 원래 Observable에서 값을 정상적으로 발행하기 시작한다.

4. all()

all() 함수는 단순하다. 주어진 조건에 100% 맞을 때만 true 값을 발행하고 조건에 맞지 않는 데이터가 발행되면 바로 false 값을 발행한다.

위의 마블 다이어그램은 ‘1’ 원부터 ‘6’ 원까지 모두 ‘원’ 모양이어야만 true를 발행한다.
all() 함수의 원형은 다음과 같다.

1
2
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<Boolean> all(Predicate<? super T> predicate)

predicate 인자는 filter() 함수의 인자와 동일하다. 주어진 람다 표현식이 true인지 false인지를 판정해주어야 한다.

수학 및 기타 연산자

max(), sum()과 같은 수학 함수와 기타 분류에 해당하는 함수가 있다.

RxJavaMath가 있지만, RxJava 2.x를 지원하지 않으므로 다른 라이브러리를 사용한다. Rxjava2Extensions 라이브러리를 활용해 간단한 수학 함수 및 집합 함수의 활용법을 공부해보자.

1. 수학 함수

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class MathSample {
public static void main(String[] args) {
executeMath();
}

private static void executeMath() {
Integer[] data = {1, 2, 3, 4};

// 1. count
Single<Long> source = Observable.fromArray(data)
.count();
source.subscribe(count -> Log.i("count is " + count));

// 2. max
// MathFlowable 클래스의 max() 함수를 호출해 구현한다.
// to() 함수는 다른 타입으로 변환해주기 위한 함수.
Flowable.fromArray(data)
.to(MathFlowable::max)
.subscribe(max -> Log.i("max is " + max));

// 3. min
Flowable.fromArray(data)
.to(MathFlowable::min)
.subscribe(min -> Log.i("min is " + min));

// 4. sum
Flowable<Integer> flowable = Flowable.fromArray(data)
.to(MathFlowable::sumInt);
flowable.subscribe(sum -> Log.i("sum is " + sum));

// 5. average
Flowable<Double> flowable1 = Observable.fromArray(data)
.toFlowable(BackpressureStrategy.BUFFER)
.to(MathFlowable::averageDouble);
flowable1.subscribe(avg -> Log.i("avg is " + avg));
}
}
// 결과
main | value = count is 4
main | value = max is 4
main | value = min is 1
main | value = sum is 10
main | value = avg is 2.5

2. delay()

delay() 함수는 시간을 인자로 받는다. 앞에서 봤던 시간과 관련된 함수들(interval, timer, defer)이 Observable을 생성하는 역할이라면 delay() 함수는 유틸리티 연산자로서 보조 역할을 한다.

delay() 함수의 원형

1
2
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> delay(long delay, TimeUnit unit)

인자로 delay 변수와 시간 단위(ms 등)을 받는다. 그리고 intervale() 함수와 마찬가지로 계산 스케줄러에서 실행한다. 즉, main 스레드가 아닌 계산을 위한 별도 스레드 풀에서 실행하는 것이다.

3. timeInterval()

어떤 값을 발행했을 때 이전 값을 발행한 이후 얼마나 시간이 흘렀는지를 알려준다.

timeInterval() 함수의 원형

1
2
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Timed<T>> timeIntervale()

Timed 객체에는 다음처럼 시간을 얻어오거나 Observable의 데이터를 얻을 수 있는 메소드를 제공할 수 있다.

1
2
3
4
public T value()
public TimeUnit unit()
public long time()
public long time(TimeUnit unit)