[RxJava] Chap07. 디버깅 - 흐름 제어
흐름 제어
- 흐름 제어는 Observable이 데이터를 발행하는 속도와 옵저버가 데이터를 받아서 처리하는 속도 사이의 차이가 발생할 때 사용하는 함수이다.
- RxJava는 Observable이 데이터의 흐름을 push하는 방식으로 동작하기 때문에 위의 문제에 대해서 대처할 수 있어야 한다.
sample()
- 특정한 시간 동안 가장 최근에 발행한 데이터만 걸러준다. 즉, 최근에 발행된 데이터만 넘겨주고 나머지는 무시한다.
- 해당 시간에는 아무리 많은 데이터가 들어와도 해당 구간의 마지막 데이터만 발행하고 나머지는 무시한다.
1 | (SchedulerSupport.COMPUTATION) |
emitLast
인자는 sample() 함수의 데이터 발행이 완료되지 않고 마지막에 데이터가 남아 있을 때, 해당 데이터를 발행할 것인지 결정한다. true로 설정하면 마지막 데이터를 발행한다.
1 | public class sampleTest { |
- 먼저, 100ms 간격으로 data 배열에 있는 데이터 4개를 발행한다. 그리고 마지막 데이터인 6을 300ms 후에 발행한다.
- 또한 내가 원하는 특정 시간 후에 발행하기 위해 concat() 함수를 호출해 2개의 데이터 흐름(Observable)을 결합했다. 이렇게 전체 데이터 흐름을 세부 데이터 흐름으로 나누면 코드의 가독성이 좋아진다.
- sample() 함수는 300ms 간격으로 수행한다. 매 300ms 마다 가장 최근에 들어온 값만 최종적으로 발행한다.
- 처음에 데이터를 발행하기 위해 약간의 지연 시간이 있어서(100ms+a) 다이어그램을 보면 시작할 때 약간의 간격이 있음을 볼 수 있다. 다음 데이터 발행이 3XXms이기 때문에 300ms일 때는 가장 최근 데이터가 이전에 발행했던 7이 되는 것이다.(처음 기준!)
- 마지막 인자를 true로 설정하면 마지막 데이터를 발행한다. 기본값이 false이다.
buffer()
- 일정 시간 동안 데이터를 모아두었다가 한꺼번에 발행해준다.
- 따라서 넘치는 데이터 흐름을 제어할 필요가 있을 때 활용한다. 컴퓨터의 버퍼 같은 기능을 한다.
- 처음에 빨,노,초 원을 발행하면 그것을 모아서 List 객체에 전달해준다. 그 다음 다시 하늘,파,보 원이 모이면 그것을 모아서 한 번에 발행해준다. 매우 단순한 로직이다.
- 함수의 원형은 다음과 같다.
- 기본적으로 스케줄러 없이 현재 스레드에서 동작하며 입력되는 값을 count에 저장된 수만큼 모아서 List
에 한꺼번에 발행한다.
- 기본적으로 스케줄러 없이 현재 스레드에서 동작하며 입력되는 값을 count에 저장된 수만큼 모아서 List
1 | (SchedulerSupport.NONE) |
1 | public class bufferSample { |
- buffer(3)는 데이터를 3개씩 모았다가 List
에 채운 후 값을 한꺼번에 발행해준다. 그래서 위의 결과를 보면 확인 가능하다. - buffer() 함수에는 모으거나(count) 무시할(skip) 데이터 개수를 입력할 수 있다.
- skip 변수는 count보다 값이 커야 한다. count가 2이고 skip이 3이면 2개 데이터를 모으고 3번째 데이터 1개는 스킵한다.
- 코드는 위와 같으면
buffer(2,3)
으로 호출하면 아래와 같은 결과를 얻을 수 있다.
1 | RxComputationThreadPool-1 | 562 | value = [1, 2] |
- Observable에서 onNext 이벤트가 발생하면 내부 데이터는 3개가 아니라 2개 값을 모아 바로 List
에 채운 후 구독자에게 데이터를 발행한다.
throttleFirst(), throttleLast()
- throttle는 조절판이라는 뜻이다. 그것에 맞게 throttleFirst() 함수는 주어진 조건에서 가장 먼저 입력된 값을 발행한다. throttleLast() 함수는 주어진 조건에서 가장 마지막에 입력된 값을 발행한다.
- throttleFirst()와 throttleLast()는 정반대의 의미가 아니다. throttleFirst() 함수는 어떤 데이터가 입력된 후 일정 시간 동안 다른 데이터가 발행되지 못하도록 방지하지만, throttleLast() 함수는 sample() 함수처럼 고정된 시간 간격안에서 마지막 데이터만 발행한다.
- throttleFirst() : sample() 함수와 비슷하지만 다르다. sample() 함수가 주어진 시간 동안 입력된 마지막 값을 발행한다면
throttleFirst()
함수는 어떤 데이터를 발행하면 지정된 시간 동안 다른 데이터를 발행하지 않도록 막는다. - throttleLast() : sample() 함수와 기본 개념은 동일하다. 주어진 시간 동안 입력된 값 중 마지막 값을 발행한다.
- 함수 원형은 다음과 같다.
- 계산 스케줄러에서 실행한다. 즉, 비동기로 동작하도록 설계된 함수다.
- windowDuration는 시간 간격을 지정하며 unit은 시간의 단위다.
1 | (SchedulerSupport.COMPUTATION) |
1 | public class throttleFirstSample { |
- 처음 100ms가 지난 후에 1을 발행한 후, 300ms 동안 기다린 다음 2를 발행한다. 그리고 100ms 간격으로 나머지 값들을 발행한다. 마지막으로 throttleFirst() 함수를 호출해 200ms 간격으로 타임 윈도에 맨 먼저 입력된 값을 발행한다.
- 위에서는 1,2,4,6이 다운 스트림으로 발행된다.
window()
- groupBy() 함수와 개념적으로 비슷하다.
- throttleFirst()나 sample() 함수처럼 내가 처리할 수 있는 일부의 값들만 받아들일 수 있다. 흐름 제어 기능에 groupBy() 함수와 비슷한 별도의 Observable 분리 기능을 모두 갖추었다고 생각하면 된다.
- count를 인자로 받는다. 예를 들어, 3을 인자로 받으면 앞으로 데이터 3개가 발행될 때마다 새로운 Observable을 생성하겠다는 뜻이다.
- 함수의 원형은 다음과 같다.
- 현재 스레드를 그대로 활용한다. 왜 그런지 window() 함수의 다른 변형을 비교하면 알 수 있다.
1 | (SchedulerSupport.NONE) |
1 | (SchedulerSupport.COMPUTATION) |
- count만을 인자로 갖는 window() 함수는 입력된 값을 그대로 발행하기 때문에 비동기 작업이라고 보기 어렵다.
- 위의 함수 원형에는 timespan이라는 시간 동안 입력된 값 중에서 일부를 무시하는 기능을 포함한다.
- 어떤 필터링 작업을 해줘야 하기 때문에 계산 스케줄러를 활용하게 된다.
1 | public class windowSample { |
- window() 함수의 인자로 3을 넣었다. 처음에 Observable을 생성하고 3개의 데이터를 전달받으면 새로운 Observable을 다시 생성하여 값을 발행한다.
- 1 값을 발행할 때와 4 값을 발행할 때 각각 새로운 Observable이 생성되었다.
debounce()
- 빠르게 연속 이벤트를 처리하는 흐름 제어 함수다.
- 안드로이드와 같은 UI 기반의 프로그래밍에서는 유용하게 활용할 수 있다.
- 예를 들어, 버튼을 빠르게 누르는 상황에서 마지막에 누른 이벤트만 처리해야할 때 간단하게 적용할 수 있다. RxJava를 이용하지 않는다면 마지막에 버튼을 누른 시간을 멤버 변수에 저장하고 일정 시간 동안 if문으로 예외 처리해야 하기 때문에 매우 번거롭고 실수할 가능성도 크다.
- 첫 번째 원은 지정한 시간 간격 안에 들어왔고 다른 이벤트는 없어서 그대로 발행되었다. 두 번째 원의 경우 시간 간격 안에 세 번째 원이 다시 들어왔으므로 두 번째가 아닌 세 번째 원을 발행한다. 마지막도 마찬가지다.
- 함수의 원형은 다음과 같다.
- 계산 스케줄러에서 동작한다.
- 어떤 이벤트가 입력되고 timeout에서 지정한 시간 동안 추가 이벤트가 발생하지 않으면 마지막 이벤트를 최종적으로 발행한다.
1 | (SchedulerSupport.COMPUTATION) |
1 | public class debounceSample { |
- 데이터를 발행하는 부분이 특이하다. 각각의 시간 간격이 서로 다르기 때문에 concat() 함수를 활용해 각가 데이터를 발행했다.
- timer() 함수는 이벤트를 한 번만 발생시키고 완료하기 때문에 concat()과 timer() 함수의 조합은 유용하다.
- debounce()를 활용해 어떤 이벤트가 입력되고 지정된 timeout인 200ms 안에 더 이상의 이벤트가 없으면 마지막에 입력된 값을 발행한다.
- 여기서 이해가 잘 안되었던 부분이 있었다. 결과를 예상해봤을 때, 1,2,5라고 생각했지만 아니었다. 왜냐면 정의 자체가 어떤 이벤트가 입력되고 나서 timout 내에 더 이상의 이벤트가 없으면 마지막 이벤트를 발행하는 것이다. 그러니까 어떤 이벤트가 입력되고 나서 timout 내에 이벤트가 있는지 찾는 것이다. 그래서 결과는 1,3,5가 맞다.