흐름 제어

  • 흐름 제어는 Observable이 데이터를 발행하는 속도와 옵저버가 데이터를 받아서 처리하는 속도 사이의 차이가 발생할 때 사용하는 함수이다.
  • RxJava는 Observable이 데이터의 흐름을 push하는 방식으로 동작하기 때문에 위의 문제에 대해서 대처할 수 있어야 한다.

sample()

  • 특정한 시간 동안 가장 최근에 발행한 데이터만 걸러준다. 즉, 최근에 발행된 데이터만 넘겨주고 나머지는 무시한다.
  • 해당 시간에는 아무리 많은 데이터가 들어와도 해당 구간의 마지막 데이터만 발행하고 나머지는 무시한다.
1
2
3
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> sample(long period, TimeUnit unit)
public final Observable<T> sample(long period, TimeUnit unit, boolean emitLast)
  • emitLast 인자는 sample() 함수의 데이터 발행이 완료되지 않고 마지막에 데이터가 남아 있을 때, 해당 데이터를 발행할 것인지 결정한다. true로 설정하면 마지막 데이터를 발행한다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class sampleTest {
public static void main(String[] args) {
String[] data = {"1", "7", "2", "3", "6"};

// 시간 측정용.
CommonUtils.exampleStart();

// 앞의 4개의 데이터는 100ms 간격으로 발행.
Observable<String> earlySource = Observable.fromArray(data)
.take(4)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 마지막 데이터는 300ms 후에 발행.
Observable<String> lateSource = Observable.just(data[4])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 2개의 Observable 을 결합하고 300ms 로 샘플링.
Observable<String> source = Observable.concat(earlySource, lateSource)
.sample(300L, TimeUnit.MILLISECONDS);

source.subscribe(Log::it);

CommonUtils.sleep(1000);
}
}
// 결과
RxComputationThreadPool-1 | 552 | value = 7
RxComputationThreadPool-1 | 849 | value = 3
  • 먼저, 100ms 간격으로 data 배열에 있는 데이터 4개를 발행한다. 그리고 마지막 데이터인 6을 300ms 후에 발행한다.
  • 또한 내가 원하는 특정 시간 후에 발행하기 위해 concat() 함수를 호출해 2개의 데이터 흐름(Observable)을 결합했다. 이렇게 전체 데이터 흐름을 세부 데이터 흐름으로 나누면 코드의 가독성이 좋아진다.
  • sample() 함수는 300ms 간격으로 수행한다. 매 300ms 마다 가장 최근에 들어온 값만 최종적으로 발행한다.
  • 처음에 데이터를 발행하기 위해 약간의 지연 시간이 있어서(100ms+a) 다이어그램을 보면 시작할 때 약간의 간격이 있음을 볼 수 있다. 다음 데이터 발행이 3XXms이기 때문에 300ms일 때는 가장 최근 데이터가 이전에 발행했던 7이 되는 것이다.(처음 기준!)
  • 마지막 인자를 true로 설정하면 마지막 데이터를 발행한다. 기본값이 false이다.

buffer()

  • 일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해준다.
  • 따라서 넘치는 데이터 흐름을 제어할 필요가 있을 때 활용한다. 컴퓨터의 버퍼 같은 기능을 한다.
  • 처음에 빨,노,초 원을 발행하면 그것을 모아서 List 객체에 전달해준다. 그 다음 다시 하늘,파,보 원이 모이면 그것을 모아서 한 번에 발행해준다. 매우 단순한 로직이다.
  • 함수의 원형은 다음과 같다.
    • 기본적으로 스케줄러 없이 현재 스레드에서 동작하며 입력되는 값을 count에 저장된 수만큼 모아서 List에 한꺼번에 발행한다.
1
2
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<List<T>> buffer(int count)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class bufferSample {
public static void main(String[] args) {
String[] data = {"1", "2", "3", "4", "5", "6"};
CommonUtils.exampleStart();

// 앞의 3개는 100ms 간격으로 발행.
Observable<String> earlySource = Observable.fromArray(data)
.take(3)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 가운데 1개는 300ms 후에 발행.
Observable<String> middleSource = Observable.just(data[3])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 마지막 2개는 100ms 후에 발행.
Observable<String> lataSource = Observable.just(data[4], data[5])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);


Observable<List<String>> source = Observable.concat(earlySource, middleSource, lataSource)
.buffer(3);

source.subscribe(Log::it);
CommonUtils.sleep(1000);
}
}
// 결과
RxComputationThreadPool-1 | 562 | value = [1, 2, 3]
RxComputationThreadPool-3 | 1067 | value = [4, 5, 6]
  • buffer(3)는 데이터를 3개씩 모았다가 List에 채운 후 값을 한꺼번에 발행해준다. 그래서 위의 결과를 보면 확인 가능하다.
  • buffer() 함수에는 모으거나(count) 무시할(skip) 데이터 개수를 입력할 수 있다.
  • skip 변수는 count보다 값이 커야 한다. count가 2이고 skip이 3이면 2개 데이터를 모으고 3번째 데이터 1개는 스킵한다.
  • 코드는 위와 같으면 buffer(2,3)으로 호출하면 아래와 같은 결과를 얻을 수 있다.
1
2
RxComputationThreadPool-1 | 562 | value = [1, 2]
RxComputationThreadPool-3 | 1067 | value = [4, 5]
  • Observable에서 onNext 이벤트가 발생하면 내부 데이터는 3개가 아니라 2개 값을 모아 바로 List에 채운 후 구독자에게 데이터를 발행한다.

throttleFirst(), throttleLast()

  • throttle는 조절판이라는 뜻이다. 그것에 맞게 throttleFirst() 함수는 주어진 조건에서 가장 먼저 입력된 값을 발행한다. throttleLast() 함수는 주어진 조건에서 가장 마지막에 입력된 값을 발행한다.
  • throttleFirst()와 throttleLast()는 정반대의 의미가 아니다. throttleFirst() 함수는 어떤 데이터가 입력된 후 일정 시간 동안 다른 데이터가 발행되지 못하도록 방지하지만, throttleLast() 함수는 sample() 함수처럼 고정된 시간 간격안에서 마지막 데이터만 발행한다.
  • throttleFirst() : sample() 함수와 비슷하지만 다르다. sample() 함수가 주어진 시간 동안 입력된 마지막 값을 발행한다면 throttleFirst() 함수는 어떤 데이터를 발행하면 지정된 시간 동안 다른 데이터를 발행하지 않도록 막는다.
  • throttleLast() : sample() 함수와 기본 개념은 동일하다. 주어진 시간 동안 입력된 값 중 마지막 값을 발행한다.
  • 함수 원형은 다음과 같다.
    • 계산 스케줄러에서 실행한다. 즉, 비동기로 동작하도록 설계된 함수다.
    • windowDuration는 시간 간격을 지정하며 unit은 시간의 단위다.
1
2
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> throttleFirst(long windowDuration, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class throttleFirstSample {
public static void main(String[] args) {
String[] data = {"1", "2", "3", "4", "5", "6"};
CommonUtils.exampleStart();

// 앞의 1개는 100ms 간격으로 발행.
Observable<String> earlySource = Observable.just(data[0])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 다음 1개는 300ms 후에 발행.
Observable<String> middlerSource = Observable.just(data[1])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 마지막 4개는 100ms 후에 발행.
Observable<String> lateSource = Observable.just(data[2], data[3], data[4], data[5])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a)
.doOnNext(Log::dt); // 디버깅 정보 출력.

Observable<String> source = Observable.concat(earlySource, middlerSource, lateSource)
.throttleFirst(200L, TimeUnit.MILLISECONDS);

source.subscribe(Log::it);
CommonUtils.sleep(1000);
}
}
// 결과
RxComputationThreadPool-1 | 371 | value = 1
RxComputationThreadPool-3 | 673 | value = 2
RxComputationThreadPool-4 | 779 | debug = 3
RxComputationThreadPool-4 | 876 | debug = 4
RxComputationThreadPool-4 | 975 | debug = 5
RxComputationThreadPool-4 | 975 | value = 5
RxComputationThreadPool-4 | 1077 | debug = 6
  • 처음 100ms가 지난 후에 1을 발행한 후, 300ms 동안 기다린 다음 2를 발행한다. 그리고 100ms 간격으로 나머지 값들을 발행한다. 마지막으로 throttleFirst() 함수를 호출해 200ms 간격으로 타임 윈도에 맨 먼저 입력된 값을 발행한다.
  • 위에서는 1,2,4,6이 다운 스트림으로 발행된다.

window()

  • groupBy() 함수와 개념적으로 비슷하다.
  • throttleFirst()나 sample() 함수처럼 내가 처리할 수 있는 일부의 값들만 받아들일 수 있다. 흐름 제어 기능에 groupBy() 함수와 비슷한 별도의 Observable 분리 기능을 모두 갖추었다고 생각하면 된다.
  • count를 인자로 받는다. 예를 들어, 3을 인자로 받으면 앞으로 데이터 3개가 발행될 때마다 새로운 Observable을 생성하겠다는 뜻이다.
  • 함수의 원형은 다음과 같다.
    • 현재 스레드를 그대로 활용한다. 왜 그런지 window() 함수의 다른 변형을 비교하면 알 수 있다.
1
2
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Observable<T>> window(long count)
1
2
3
4
5
6
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<Observable<T>> window(
long timespan, long timeskip, TimeUnit unit
){
// 생략.
}
  • count만을 인자로 갖는 window() 함수는 입력된 값을 그대로 발행하기 때문에 비동기 작업이라고 보기 어렵다.
  • 위의 함수 원형에는 timespan이라는 시간 동안 입력된 값 중에서 일부를 무시하는 기능을 포함한다.
  • 어떤 필터링 작업을 해줘야 하기 때문에 계산 스케줄러를 활용하게 된다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class windowSample {
public static void main(String[] args) {
String[] data = {"1", "2", "3", "4", "5", "6"};

CommonUtils.exampleStart();

// 앞의 3개는 100ms 간격으로 발행.
Observable<String> earlySource = Observable.fromArray(data)
.take(3)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 가운데 1개는 300ms 후에 발행.
Observable<String> middleSource = Observable.just(data[3])
.zipWith(Observable.interval(300L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 마지막 2개는 100ms 후에 발행.
Observable<String> lateSource = Observable.just(data[4], data[5])
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(a, b) -> a);

// 데이터 3개씩 모아서 새로운 Observable 생성.
Observable<Observable<String>> source = Observable.concat(earlySource, middleSource, lateSource)
.window(3);

source.subscribe(observable -> {
Log.dt("New Observable Started!!");
observable.subscribe(Log::it);
});

CommonUtils.sleep(1000);
CommonUtils.exampleComplete();

}
}
// 결과
RxComputationThreadPool-1 | 365 | debug = New Observable Started!!
RxComputationThreadPool-1 | 366 | value = 1
RxComputationThreadPool-1 | 461 | value = 2
RxComputationThreadPool-1 | 560 | value = 3
RxComputationThreadPool-2 | 861 | debug = New Observable Started!!
RxComputationThreadPool-2 | 861 | value = 4
RxComputationThreadPool-3 | 963 | value = 5
RxComputationThreadPool-3 | 1062 | value = 6
  • window() 함수의 인자로 3을 넣었다. 처음에 Observable을 생성하고 3개의 데이터를 전달받으면 새로운 Observable을 다시 생성하여 값을 발행한다.
  • 1 값을 발행할 때와 4 값을 발행할 때 각각 새로운 Observable이 생성되었다.

debounce()

  • 빠르게 연속 이벤트를 처리하는 흐름 제어 함수다.
  • 안드로이드와 같은 UI 기반의 프로그래밍에서는 유용하게 활용할 수 있다.
  • 예를 들어, 버튼을 빠르게 누르는 상황에서 마지막에 누른 이벤트만 처리해야할 때 간단하게 적용할 수 있다. RxJava를 이용하지 않는다면 마지막에 버튼을 누른 시간을 멤버 변수에 저장하고 일정 시간 동안 if문으로 예외 처리해야 하기 때문에 매우 번거롭고 실수할 가능성도 크다.
  • 첫 번째 원은 지정한 시간 간격 안에 들어왔고 다른 이벤트는 없어서 그대로 발행되었다. 두 번째 원의 경우 시간 간격 안에 세 번째 원이 다시 들어왔으므로 두 번째가 아닌 세 번째 원을 발행한다. 마지막도 마찬가지다.
  • 함수의 원형은 다음과 같다.
    • 계산 스케줄러에서 동작한다.
    • 어떤 이벤트가 입력되고 timeout에서 지정한 시간 동안 추가 이벤트가 발생하지 않으면 마지막 이벤트를 최종적으로 발행한다.
1
2
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> debounce(long timeout, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class debounceSample {
public static void main(String[] args) {
String[] data = {"1", "2", "3", "5"};
Observable<String> source = Observable.concat(
Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> data[0]),
Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> data[1]),
Observable.timer(100L, TimeUnit.MILLISECONDS).map(i -> data[2]),
Observable.timer(300L, TimeUnit.MILLISECONDS).map(i -> data[3])
).debounce(200L, TimeUnit.MILLISECONDS);

source.subscribe(Log::i);
CommonUtils.sleep(1000);
}
}
// 결과
RxComputationThreadPool-2 | value = 1
RxComputationThreadPool-2 | value = 3
RxComputationThreadPool-2 | value = 5
  • 데이터를 발행하는 부분이 특이하다. 각각의 시간 간격이 서로 다르기 때문에 concat() 함수를 활용해 각가 데이터를 발행했다.
  • timer() 함수는 이벤트를 한 번만 발생시키고 완료하기 때문에 concat()과 timer() 함수의 조합은 유용하다.
  • debounce()를 활용해 어떤 이벤트가 입력되고 지정된 timeout인 200ms 안에 더 이상의 이벤트가 없으면 마지막에 입력된 값을 발행한다.
  • 여기서 이해가 잘 안되었던 부분이 있었다. 결과를 예상해봤을 때, 1,2,5라고 생각했지만 아니었다. 왜냐면 정의 자체가 어떤 이벤트가 입력되고 나서 timout 내에 더 이상의 이벤트가 없으면 마지막 이벤트를 발행하는 것이다. 그러니까 어떤 이벤트가 입력되고 나서 timout 내에 이벤트가 있는지 찾는 것이다. 그래서 결과는 1,3,5가 맞다.