Kotlin Flow는 안드로이드 공식문서에서 다룰 정도로 Kotlin의 Coroutines에서 중요한 역할을 하는 API인데요.

오늘은 Kotlin의 Flow에 대해서 정리해 보도록 하겠습니다.

 

1. Flow

Coroutine에서 다양한 값들을 순차적으로 흘려보내줄 수 있는 API인데요.

데이터베이스의 업데이트들을 흘려보내서 계속 이 값들을 반영해 줄 수 있습니다.

단순히 순차적으로 흘려보내주기만 하는 것이 아니라,

비동기로 동작한다는 점도 매우 중요합니다.

 

2. Producer와 Consumer

데이터를 하나의 강물로 생각해 보겠습니다.

이 물을 생성해내는 Producer가 있구요.

이 물을 받아주는 Consumer가 존재합니다.

 

코드도 이러한 것이 반영되는데요.

Producer와 Consumer를 생각하며 보도록 하겠습니다.

 

2-1. Producer

데이터를 내보내주는 Producer의 간단한 코드를 보도록 하겠습니다.

아래와 같이 builder블록을 열어서 사용합니다.

1부터 8까지 Int타입을 흘려보내주고 있군요.

 

이 flow 블록은 suspend될 수 있습니다.

그런데 아래의 flow빌드타입을 이용한 함수는 suspend함수가 아니지요.

그럼에도 불구하고, suspend함수인 delay를 사용할 수 있도록 되어있습니다.

참고로 안드로이드스튜디오는 suspend함수에 대해서는 좌측 기둥에 화살표로 표시를 잘 해주고 있습니다.

 

emit함수는 데이터를 흘려보내는(방출하는)함수입니다.

 

 

좀더 복잡한 코드도 보도록 하겠습니다.

 

 

2-2. Consumer

위에서 본 코드가 stream을 생성하는 프로듀서라고 한다면,

이 Stream을 받을 컨슈머 코드가 있어야 겠지요.

아래와 같이 collect블록을 이용해서 consume할 수 있습니다.

 

 

한가지 기억할 것은 Flow는 Cold한 비동기 Stream API라는 것 입니다.

즉, flow빌더블록은 flow가 collect되기 전까지는 실행되지 않습니다.

 

아래코드의 경우, 출력되는 순서는 다음과 같은데요.

"started" -> "calling Collect" -> "flow started" -> 1,2,3

simple함수가 먼저 실행되었지만, "calling Collect"가 먼저 출력된 이유는,

Flow가 Cold하기 때문이지요.

즉, simple함수는 collect되기 전까지 실행되지 않았던 것 입니다.

 

 

2-2. BackPressure와 Buffer

컨슈머 코드에서는 2초마다 suspend되어 delay를 시키는데요.

프로듀서는 0.3초마다 delay가 되고 있습니다.

그럼 총 2.3초의 delay가 발생되게 됩니다.

 

 

원하는 결과는 처음에만 produce의 delay시간이 적용되고,

consumer가 원하는 대로 2초의 delay만 생기는 것 인데요.

실제로는 4초의 delay가 발생하는 것을 알 수 있습니다.

이것은 coroutine이  backpressure에 대한 대응을 하기위해 설계되어 있기 때문인데요.

 

원하는 결과를 얻기 위해서는 buffer오퍼레이터를 사용해주기만 하면됩니다.

이 함수를 사용하면, consumer와 다른 Coroutine(Thread가 아닌 다른 Coroutine)에서 실행을 하여서,

최초의 producer의 delay외에는 컨슈머의 delay만 적용되게 되는 것 이지요.

결과적으로 4초 ->2초 -> 2초 -> 2초 ....의 delay가 발생되는 것을 볼 수 있습니다.

 

 

물론 이 2초의 delay만으로 컨슈머코드가 제대로 동작하기 힘들다면,

Exception이 발생할 수 있으므로 주의가 필요합니다.

 

 

3. asFlow 오퍼레이터

아래와 같이도 중간자 오퍼레이터를 이용해서 Flow로 구현할 수도 있습니다.

 

 

2-2. Flow vs Channel

Flow는 Collect하기 전까지는 실행되지 않는다고 하였는데요.

이와 반대되는 것이 Channel인데요.

Channel은 Hot한 스트림입니다.

물론 Flow와 같이 비동기이구요.

 

 

4. buffer Operator

위에서도 보았던 아래와 같은 simple함수가 있다고 하겠습니다.

 

아래 코드의 measureTimeMillis블록이 simple함수를 실행하게 되는데요.

이 때 중간에 사용된 buffer오퍼레이터를 잘 보시면 좋습니다.

이 함수는

 

 

 

 

1-1. Sequence vs Flow

Coroutine의 API에는 비동기 API인 Flow와 다르게,

동기적으로 연속적인 흐름을 제어하는 Sequence가 있습니다.

아래의 결과는 1,2,3이 나오겠지요.

 

 

8. Flow와 Room

Room은 Flow API가 포함된 버전의 Coroutine을 지원합니다.

따라서, 아래와 같이 DAO클래스에 포함된 함수의 return타입을 FLOW로 한다면,

DB가 변경될 때 마다 이 것에 emit을 받을 수 있겠지요.

 

 

이것은 Room을 사용하는데 있어서 매우 중요한 feature이기도 합니다.

insert만 정상적으로 된다면,

바로바로 Noti를 받아서 ui를 업데이트 할 수 있겠지요.

 

다만 주의할 점이 있습니다.

그것은 DB의 모든 데이터가 유저에게 유의미한 UI에 반영되지 않는데,

DB가 변경될 때마다 Flow는 emit을 할 것입니다.

필요하지 않은 업데이트가 자주 발생할 수 있다는 이야기이지요.

 

그래서 이럴경우에 아래와 같이 "distinctUntilChanged" 오퍼레이터를 사용하면 효과적입니다.

 

 

 

 

 

9. Flow사용시 주의할 점

9-1. Sequential하다

producer가 suspend function이 값을 return할때까지 suspend하게 됩니다.

다음으로 넘어가지 않는 것 이지요.

 

9-2. CoroutineContext에 주의

flow producer코드는 다른 CoroutineContext에서 값을 방출할 수 없습니다.

따라서 다른 CoroutineContext또는 withContext를 이용한 다른 Coroutine에서 emit을 호출하면 않됩니다.

 

9-3. collector가 여러개인경우

flow를 각기 다른 인터벌로 collect하는 경우 같은 데이터를 여러번 fetch하게 될수도 있는데요.

flow를 공유하기 위해서는 shareIn 오퍼레이터를 사용하는 것이 효율적 입니다.

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

728x90

+ Recent posts