Android 개발/Coroutine , Flow, Channel

Kotlin Coroutine Flow 총정리 part3 # launchIn

Developer88 2022. 10. 10. 00:01
반응형

지난 글에 이어서 part3에서는 Coroutine의 Flow에 대해서 정리해 보도록 하겠습니다.

지난 part1과 part2는 아래 링크를 참조해주세요.

>> Kotlin Coroutine 총정리 part1 # launch, async, Context, Job, CoroutineScope

 

>> Kotlin Coroutine 총정리 part2 # Cancellation, ExceptionHandling

 

1. Flow

1-1. Flow

비동기 작업을 하는 Coroutine의 suspend 함수는,

일반적으로 하나의 값을 반환(return)하거나 완료 시 종료됩니다.

스트림이 아닌 단일 값 처리에 적합하다는 의미이지요.

 

비동기적으로 데이터를 지속적으로 방출하거나,

한 번에 여러 개를 처리하는 데 적합한 API가 있는데요.

Coroutine의 비동기 스트림 API인 flow 입니다.

 

실제 코드를 통해서 flow를 볼까요?

flow는 아래와 같이,

flow builder를 통해 생성할 수 있습니다.

아래는 for문에서 emit()함수를 사용하고 있는데요.

emit은 방출하다는 뜻인데, 스트림에서 아이템들을 stream으로 흘려보내는 역할을 하는 함수입니다.

 

 

 

아이템들이 흘러나오면 이것들을 수집해서유용하게 써야하는데요.

Collect()함수를 사용해, 수집할 수 있습니다.

 

 

실행해보면 100ms의 딜레이를 가지고 아이템들이 흘러나오는 것을 볼 수 있습니다.

 

 

1-2. Cold Stream vs Hot Stream

이런 stream api를 사용하면 항상 나오는 단어가 있는데요.

cold 한 지 hot 한 지입니다.

이 둘의 의미를 구분해 보도록 하겠습니다.

 

구분 의미
Hot 데이터를 소비하는 곳과 무관하게 아이템들을 생산 List, Set 같은 Collections
Channel
Cold lazy라는 표현을 많이 하는데요.
데이터를 소비하는 곳에서 필요하다고 하기 전에는 동작하지 않다가,
필요하다고 할 때 동작하게 됩니다.

필요하다고 하기 전까지는 시작하지 않는 API입니다.
kotlin에서는 lateinit var로 선언하는 변수는 향후 lazy하게 생성되어 들어올 변수를 의미하는데요. 이러한 개념입니다.

필요하지 않을 때 동작하지 않으므로,
메모리도 필요할 때 사용
하게 됩니다.
Flow,
Sequence

 

 

Flow는 Cold 한 스트림 API인데요.

스트림은 데이터 소비가 시작될 때(즉, collect()가 호출될 때)마다,

새롭게 데이터 흐름을 생성해 줍니다.

 

재미있는 점은,

Flow는 Coroutine API이지만,

suspend 키워드를 필요로 하지 않습니다.

코루틴을 기반으로 비동기 스트림을 정의하지만,

flow가 cold stream이어서,

collect()함수가 호출되기 전에는 실행되지 않기 때문입니다.

 

아래에서도 simple함수에는 suspend 키워드가 붙어있지 않음을 알 수 있습니다.

collect()함수만 Coroutine 블록안에서 실행되면 되는 것 이지요.

 

 

Flow 의 모든 API가 다 Cold Stream은 아니구요.

Cold Stream 과 Hot Stream 모두 다 존재합니다.

실제로 비동기상황에서 Cold Stream을 다루어보면,

데이터가 collect 되지 안을 수 있어 어려움을 겪게 되는 경우가 있습니다.

 

1-3. Upstream 과 DownSteram

자신을 기준으로 위에서 흘러들어오는 Stream을 upstream이라고 하고,

아래로 흘러내려가는 Stream을 downStream이라고 합니다.

특정 operator같은 것들이 영향을 미치는 범위를 설명할 때 주로 사용합니다.

예를들어, context() 오퍼레이터는 자신보다 위의 연산자들,

upstream에만 영향을 줍니다.

 

2. Flow Cancellation (취소)

2-1. withTimeoutOrNull

flow는 withTimeoutOrNull을 이용하면 쉽게 취소가 가능한데요.

아래코드는 stream을 소비하는 withTimeoutOrNull()함수가 250ms뒤에 timeout을 하게 됩니다.

 

 

 

실행하면 다음과 같은 결과를 볼 수 있습니다.

250ms안에 2개의 아이템이 흘러들어와서 print를 하고 종료됩니다.

 

 

2-2. flow cancellation 체크

flow빌더는 각각의 흘러나가는 밸류들에 대해서 ensureActive를 실행하는데요.

ensureActive는 CoroutineScope의 확장함수로서,

isActive가 false로 나오면 CancellationException을 throw 해서, Coroutine을 cancel 시킵니다.

 

 

아래 코드를 실행해 보고 정리해 보도록 하겠습니다.

 

 

결과는 다음과 같은데요.

4를 흘려보내다가, cancellationException이 전파되어서, 아래와 같이 Exception이 나게 되는 것을 볼 수 있습니다.

 

 

이러한 Cancellation체크는 flow빌더에서만 가능한 것이고요.

아래와 같이, asFlow 같은 확장함수를 사용한 경우는 동작하지 않습니다.

 

 

실행해 보면 1부터 5까지 동작하고 나서 Exception이 전파되는 것을 볼 수 있습니다.

 

 

3. Flow 빌더들

위에서는 flow블록을 통해서 flow를 생성했었는데요.

flow를 선언할 수 있는 flow 빌더들이 여러 가지 있습니다.

 

3-1. flowOf

flowOf는 정해진 값의 set을 내부적으로 반복문을 돌려서 emit 해 줍니다.

 

 

다음 코드는 emit을 4개 한 것과 같은 코드를 심플하게 작성할 수 있습니다.

 

 

3-2. asFlow

asFlow는 코틀린의 컬렉션이나 시퀀스를 Flow로 바로 변환해 주는 함수입니다.

이를 이용해서 아래와 같이 쉽게 Stream을 내보내고 Collect 할 수 있습니다.

 

 

 

내부 코드는 아래와 같은데요.

내부적으로는 forEach로 아이템들을 emit 해 줍니다.

 

 

range에 asFlow함수를 이용하는 것도 좋습니다.

 

 

4.  중간 연산자들(Operators)

4-1. map

stream에서 흘러나온 데이터에 어떤 변경을 할 때, map 연산자를 사용해 줍니다.

 

 

 

4-2. onEach

onEach는 주어진 action을 하는 flow를 return 해 줍니다.

 

 

map과는 다른 점이라면, if문을 써서 해당하는 부분만 가공해도,

upstream에서 흘러나온 아이템들은 그대로 흘려내 보네 줍니다.

(map은 if문만 써버리면 else가 없을 경우, 아무 아이템도 흘려보내주지 않습니다.)

 

 

 

실행해 보면 다음과 같은 결과가 나옵니다.

 

 

4-3. filter 와 filterNot

filter는 조건에 맞는 아이템만 흘러나가도록 하는 역할을 합니다.

다음에서는 짝수만 흘러나가도록 하였습니다.

 

 

실행해 보면 짝수만 흘러나가게 되는 것을 볼 수 있습니다.

 

 

filter와는 반대로 주어진 조건의 반대인 경우만 흘러나가게 하는 것이 filterNot 입니다.

 

filter와 fitlerNot의 코드를 보면 다음과 같이 inline함수로 되어있는데요.

주어진 조건에 대한 서술(predicate)이 맞으면 emit을 하도록 되어있습니다.

 

 

아래에서는 이제 홀수의 아이템만 흘려보내줍니다.

 

 

4-3. transform

transform은 가장 기본적이면서도 유연한 operator 인데요.

흘러온 아이템을 변형시키거나,

skip 도 할 수 있고요.

같은 아이템에 대해 다른 형태로 여러 번 흘려보낼 수도 있습니다.

 

아래에서는 하나의 아이템이 흘러나오면,

transform에서 2개의 아이템을 만들어서 흘려보냅니다.

 

 

 

결과는 다음과 같습니다.

1,2,3이 흘러들어왔지만, 아래와 같이 6개의 아이템이 transform을 통해서 방출되었습니다.

 

 

4-4. take

몇 개의 값만 취할지를 결정해 주는 것이 take입니다.

아래 코드를 실행해 보겠습니다.

 

 

결과값은 아래와 같습니다.

 

 

4-5. takeWhile

위의 take와는 다르게 개수가 아니라, 

조건을 만족할 경우만 취하도록 하는 것입니다.

 

 

 

실행하면 다음과 같은 결과를 볼 수 있습니다.

 

 

4-6. drop 과 dropWhile

drop은 처음 몇개의 스트림을 drop해 줍니다.

 

 

실행하면 다음과 같은 결과를 볼 수 있습니다.

 

 

dropWhile을 사용하면, 개수기준이 아닌,

조건을 만족하는 첫번째 요소들을 제외한 아이템을 흘려보냅니다.

 

 

아래코드를 실행해 보겠습니다.

3보다 작은 1과 2까지는 drop되겠지요.

 

 

실행하면 아래와 같은 결과를 볼 수 있습니다.

 

 

5. Terminal Operators

위에서 봤었던 collect()함수는 아이템들을 수집하기 시작하는 Terminal Operator입니다.

cold한 flow는 collect함수같은 terminal 연산자가 있어야,

아이템을 흘려보내기 시작합니다.

map이나 transform같은 중간연산자들이 중간에서 흘러나온 데이터들을 변경해 주는 역할을 하는데,

이들과는 조금 다른 역할을 하는 것 이지요.

 

collect는 가장 기본적인 terminal operator 인데요.

이외에도  toList, toSet, first, single, reduce, fold, 같은 terminal Operator들이 있습니다.

 

5-1. toList 와 toSet

api이름 자체로 무슨 연산을 하는지 알수 있는데요.

아래와 같이 list나 set으로 만들 수 있습니다.

 

 

실행결과는 다음과 같습니다.

 

 

5-2. first 와 single

first는 말 그대로 가장 첫 아이템을 흘려보내주는 terminal operator 이구요.

 

 

실행결과는 가장 첫 아이템이 나오게 됩니다.

 

 

single은 조금 특이할 수 있는데요.

하나의 아이템만 흘러나올 때만 실행해주고, 그 외에는 Exceptin처리를 합니다.

 

 

single은 익셉션처리를 동반하게 되므로, 

아무래도 single보다는 singleOrNull 연산자를 더 많이 쓸 것 같습니다.

 

 

5-3. reduce

reduce는 연산된 결과 값을 이후의 첫 번째 인자로 넣어서 계속 누적으로 계산하는 방식인데요.

설명이 조금 추상적이라서, 코드를 보면서 이해해 보겠습니다.

 

먼저 아래와 같이 더해주는 reduce연산을 한다고 가정해 보겠습니다.

reduce에 대해 이해하기 위해서 더하기 연산을 하도록 하였습니다.

 

 

 

이를 실행해 보면 결과 값은 다음과 같은데요.

 

 

이 값이 어떻게 나온 것인지 이해해 보도록 하겠습니다.

이 21이라는 값은 1부터 6까지의 값이,

reduce를 통해 누적으로 더해져서 나온 값입니다.

구체적으로는 다음과 같은 순서로 누적되어 실행이 된 것인데요.

  • 1 + 2(첫 2개의 아이템은 그대로 인자에 들어감)
  • 3 + 3(1 + 2의 값이 accumulator에 들어오고, 3번째 아이템인 3이 더해짐)
  • 6 + 4(3 + 3의 값이 accumulator에 들어오고, 4번째 아이템은 4가 더해짐)
  • 10 + 5
  • 15 + 6 = 21 반환하고 종료

처음에 연산된 값이 accumulator에 들어오는 것이 reduce의 특징이라는 것입니다.

복잡해 보이지만, 결론적으로는 1 부터 6까지 더한 것과 같은 결과를 보게 됩니다.

 

map이 Stream에서 하나의 아이템들을 가공해서 흘려보내고 있었다면,

reduce는 누적으로 가공해 하나의 아이템으로 내보낸다고 보면 될 것 같습니다.

 

5-4. fold

reduce와 비슷하지만, 초기 값을 인자로 넣어주어야 하고, 

그 값에 처음으로 흘러들어온 아이템을 더하면서 누적으로 연산하게 됩니다.

그래서 시작이 8 + 1이 되겠고요.

다음은 9 + 2와 같은 식으로 계속 누적되어 연산됩니다.

 

 

5-5. count

연산자명 그대로 아이템들의 개수를 세어줍니다.

 

 

 

조건을 서술하여서 조건에 해당하는 것의 개수만 셀 수도 있습니다.

 

 

결과는 3이 나오는 것을 알 수 있습니다.

 

 

5-6. launchIn 과 onEach

launchIn은 조금 특수한 terminal 연산자 인데요.

이것은 별도의 코루틴에 플로우를 론치시켜서, 코드의 실행이 즉시 계속되게 해줍니다.

갑자기 이게 무슨 말일까 할 수 있는데요.

 

예를 들어서, 게임같은 곳에서 유저가 마우스를 클릭한다든지, 특정 유아이를 터치한다든지,

연속적인 인터랙션이 일어난다고 가정해 보겠습니다.

해당 터치에 대해서 즉시 반응해 주고, 네트워크리퀘스트가 필요한 경우 해준다음 반응을 결과값을 받아서 표시해주고 해야 하는데요.

flow는 기본적으로 이런 반응을 하기에는 위에서 언급한 것처럼 cold한 stream입니다.

collect함수 같은 terminal 연산자가 붙어주기 전까지는 start 하지 않는 API인 것이지요.

그래서 이런 점을 보완하기 위한 API로서 launchIn이 있습니다.

별도의 코루틴에 플로우를 론치 시켜, 연속적인 이벤트에 반응해서 바로바로 스트림을 내려보낼 수 있도록 하고 있지요.

 

 

공식문서에는 이러한 launchIn과 onEach를 같이 사용하면,

addEventListener가 동작하듯 할 수 있다고 나와있습니다.

게다가, Cancellatoin이나 Strucutred Concurrency의 구조 덕분에 removeEventListener 같은 기능은 필요 없고요.

네트워크동작이나, 유저인풋에 따른 유아이 변경에도 적용해 볼 수 있겠지요.

 

실제 코드를 보면서 정리해 보도록 하겠습니다.

events함수는 연속적으로 100ms간격으로 실행되는 UI이벤트가 flow를 통해서 계속 들어온다고 생각하면 되고요.

 

launchIn 연산자를 사용하였기 때문에 계속해서 데이터를 받아들이고,

onEach에서 로그를 찍고 있습니다.

참고로 launchIn에 인자로 들어가는 것은 CoroutineScope로 여기서는 this가 됩니다.

 

 

실행해 보면, 일정간격으로 실행되며 출력되는 것을 볼 수 있습니다.

 

 

6. Context 변경

Flow는 Flow가 호출된 Coroutine의 Context 에서 실행이 됩니다.

좀 더 구체적으로는 terminal 오퍼레이터가 호출된 Context에서 실행이 되는데요.

여기서는 이 Context를 변경하는 방법에 대해서 정리해 보겠습니다.

 

 

 

보통 Coroutine에서는 withContext를 이용해서 다른 Dispatcher를 이용해서 Context를 변경해서 사용하는데요.

CPU자원을 많이 소모하는 연산에서는 Dispatchers.Default를, UI를 업데이트해야 하는 곳에서는 Dispatchers.Main을 이용하였습니다. 

다만, flow내부에서 기존 Coroutine에서와 같이 withContext 등으로 Dispatcher를 지정할 경우,

illegalStateException을 보게 됩니다.

 

그래서, flow에서는 context를 바꾸는 다른 방법을 사용해야 합니다.

아래에서는 이때 사용해야 할 flowOn연산자에 대해서 알아보도록 하겠습니다.

 

5-1. flowOn 

flowOn은 upStream 대상을 특정한 콘텍스트에서 실행하도록 할 수 있습니다.

아래 공식문서에 나온 것처럼, flowOn에 인자로 들어간 context가 위의 upstream에서 실행되도록 해 줍니다.

(참고로 기준점의 위에서 흘러나오는 부분을 upstream, 기준점 아래로 흘러내려가는 부분을 downStream이라고 합니다.)

Dispatchers.IO라고 인자로 너어지면, 그 위의 연산자들은 IO에서 실행되는 것이지요.

 

 

아래 코드를 보도록 하겠습니다.

upstream에 대해서는 Dispatchers.Default 에서 실행되도록 하고 있습니다.

하지만 collect 하는 쪽에서는 기존의 스레드인 Main에서 실행될 텐데요.

실행해 보겠습니다.

 

 

 

결과는 아래와 같이 upstream에서는 Dispatchers.Default에서,

collect를 실행한 곳에서는 Main에서 실행되었습니다.

핵심은 flowOn연산자는 upstream에만 영향을 끼친다는 것입니다.

 

 

만약 아래와 같이 flowOn2번 적용되면 어떻게 될까요?

이럴 경우 upstream은 처음에 적용했던 Dispatcher가 그대로 적용됩니다.

공식문서의 설명에 의하면,

flowOn 연산자는 Context가 지정되지 않은 연산자에만 영향을 끼치기 때문입니다.

이미 IO가 적용되어 버리면, 나중에 나온 Default가 보면 이미 Context가 지정되어 있으므로 변경할 수 없습니다.

 

 

6. conflate 와 collectLatest

6-1. conflate

conflate의 원래 뜻은 여러 개를 하나로 합친다는 것 인데요. 

사실 API자체의 기능은 이 뜻과 정확하게 대응되지는 않습니다.

단순히 여러개를 하나로 합치는 것이 아니라,

중간에 지연이 생기는 경우,

collector가 가장 최근의 아이템을 받을 수 있도록,

지연된 아이템을 무시하고 최근의 아이템을 흘려보내는 역할을 합니다.

특정한 이유로, 지연되는 아이템에 누락이 발생해도 상관이 없는 경우에 사용해야 합니다.

 

 

코드를 보도록 하겠습니다.

아래와 같은 flow가 존재한다고 가정해 보겠습니다.

아이템들은 1부터 3까지 100ms간격으로 흘러나옵니다.

이것을 collect 하는데요.

처음수를 collect 하고 처리하는데, 300ms의 시간이 걸립니다.

그동안 2번째, 3번째 아이템은 이미 흘러나와버립니다.

그럼, 2번째 아이템은 skip 되어버리고, 가장 최근은 3번만 collect 합니다.

 

 

실행해 보면 위에서 언급한 데로, 1과 3 아이템만 출력되는 것을 볼 수 있습니다.

 

 

6-2. collectLatest

눈치 빠르신 분들은 API이름만 보고도 무슨 내용인 줄 아시겠지요.

가장 최근의 것만 collect 해 줍니다.

 

한 가지 주의할 점은, collectLatest는 마지막 아이템이 흘러나올 때까지 기다렸다가 그 값을 collect 하는 것이 아니라는 점입니다.

이 API는 첫 번째 아이템이 흘러나오는 도중에 2번째 아이템이 흘러나오면, 

첫번째 아이템을 cancel 시켜버립니다.

마지막 값을 취하기는 하지만, 그 과정이 확연히 다르므로 잘 알아둘 필요가 있습니다.

 

conflate가 중간값을 drop 시켜서 처리하는 api이라면,

collectLatest는 느린 collector들을 취소시키고,

매번 다시 새로운 밸류를 받도록 emit을 해서 처리를 합니다.

API이름에 xxxLatest가 붙는다면, 다 이런 식의 처리를 가리킵니다.

이름만 봐서는 중간값을 drop 할 것 같지만, 실제로는 취소시켜 버린다는 것이 가장 큰 차이입니다.

 

다음 예제를 실행해 보겠습니다.

 

 

 

실행해 보면 다음과 같은 결과가 나오는 것을 볼 수 있는데요.

collectLatest블록 안에서 collect를 하는데,

Collecting 1이라는 로그가 찍히고,

300ms의 delay가 발생합니다.

이동안 2번째, 3번째 아이템들이 흘러나옵니다.

그동안 1번째, 2번째 아이템은 취소되어 버리고요.

Done3로 로그가 찍히게 되는 것이지요.

블록 안에서의 delay동안 새로운 아이템이 흘러나와버리면,

기존 것들은 다 취소시켜 버리는 것이지요.

 

 

7. flow 조합하기

7-1. zip

RxJava 등을 써보신 분이라면 매우 익숙한 API일 텐데요.

아래 공식문서의 예에 나오는 것처럼, 두 개의 flow를 결합해서 아이템들을 받을 수 있습니다.

 

실제로 아래와 같은 코드를 실행해 보도록 하겠습니다.

 

 

 

결과는 다음과 같이 합쳐져서 나오는 것을 알 수  있습니다.

 

 

7-2. combine

합치긴 합치는 데, 양쪽 아이템 중 하나가 흘러나올 때 결합을 시키는 방식을 취합니다.

zip으로 합칠 때는 두 개의 데이터가 준비가 되었을 때만 결합이 이루어졌었는데요.

combine은 하나만 준비가 되어도 동작을 하게 됩니다.

 

 

무슨 말인지 예제를 보면서 이해해 보겠습니다.

nums는 300ms간격으로 아이템들을 흘려보내고요.

strs는 400ms간격으로 아이템들을 흘려보냅니다.

nums가 strs보다 더 빨리 나오게 되는데요.

zip에서는 이 둘이 아이템이 나올 때까지 가다려준반면,

combine은 기다려주지 않고, 둘 중 하나만 나오면 바로 collect 해 버립니다.

 

 

실행해 보면 다음과 같은 결과를 보여줍니다.

nums에 2가 나왔을 때도,  strs에서는 아직 아이템이 흘러나오지 않았으므로,

기존의 "one"을 이용해 합쳐버리기 때문에 다음과 같은 결과가 나오는 것입니다.

 

 

2개 flow의 아이템이 준비될 때까지 기다리는 zip과는 확연한 차이가 있습니다.

 

8. Flat 시키기

flow에서 흘러나온 아이템을 바탕으로,

또 다른 비동기 리퀘스트를 해서 flow를 만들어야 하는 경우가 발생할 수 있습니다.

 

아래에서 흘러나온 아이템을 가지고

map중간 연산자로 다시 requestFlow()를 실행해 flow를 방출하였는데요.

이렇게 되면 flow로 wrap을 하는 결과가 되어버립니다.

이렇게 중첩된 flow를 사용할 필요 없이 flat 하게 펴서 사용할 수 있는 연산자가 필요할 텐데요.

이럴 때 사용하는 flat 시켜주는 api에 대해서 알아보도록 하겠습니다.

 

 

8-1. flatMapConcat

concat은 여러 개를 연결하는 것을 말하는데요.

 

코드를 실행해 보면서, 이해해 보도록 하겠습니다.

먼저 아래와 같이 delay가 포함된 flow를 정의해 줍니다.

request함수 안에 emit이 2개가 들어있는데,

이것은 다른 flatMap Api들과 비교를 위한 것이니, 지금 신경 쓸 필요는 없습니다.

 

 

 

다음으로 flatMapConcat을 아래와 같이 사용해 주겠습니다.

 

 

결과 값은 다음과 같습니다.

flatMapConcat에 it으로 넘어온 값과 requestFlow의 flow연산이 합쳐져서 아래와 같이 나옵니다.

그냥 map으로 했다면, flow안에 flow가 들어있는 중첩된 아이템들이 나왔겠지만,

flatMap으로 한번 펼쳐져서 아래와 같이 나옵니다.

onEach의 아이템들과 그를 기반으로 한 requestFlow의 아이템들이 zip 되어서 나오는 것과 유사합니다.

 

 

flatMapConcat을 실행해 보면, 순서대로 아이템들을 결합한다는 것을 알 수 있습니다.

 

8-2. flatMapMerge

또 다른 flat api로 flatMapMerge 가 있는데요.

Concat이 연결한다는 뜻이고, Merge는 합친다는 뜻인데요.

flatMapMerge는 첫아이템들을 flat 시키고 나서 다음요소를 flat 시킵니다.

flat을 하는 것은 동일하지만, 그 방식에 있어서 약간 다른 모습을 보이는 것이지요.

위에서 사용하였던 requestFlow()를 그대로 사용하겠습니다.

 

flatMapMerge를 아래와 같이 넣어주고요.

 

 

실행해 보면, 다음과 같은데요.

 

 

 

바로 아래가 flatMapConcat을 실행시키면 나오는 결과인데,

flatMapMerge 와의 차이가 보이시나요?

flatMapConcat은 1이 들어간 requestFlow() 함수 블록 끝까지 실행이 될 때까지,

다음 2가 들어가는 것을 기다리는데요.

flatMapMerge는 1이 들어가고 나서 delay 되는 동안, 2가 흘러들어오면,

기다리지 않고 2를 처리합니다.

비동기적으로 동작한다고 볼 수 있겠지요.

 

 

이러 철 기다림 없이 비동기코드처럼 바로 다음인자가 들어와서 실행해 버리는 것이,

flatMapMerge의 특징입니다.

 

8-3. flatMapLatest

flatMapLatest도 있는데요.

collectLatest에서도 본 것처럼, Latest가 붙은 API는 가장 마지막 것만 취하는 것이 아니라,

이전에 진행 중인 것은 취소해 버리는 과정을 통해서, 마지막 것을 취합니다.

 

이번에도 코드를 실행결과를 보면서 이해해 보도록 하겠습니다.

requestFlow() 함수는 그대로 이용하고요. flapMapLatest()로 바꾸어보기만 하겠습니다.

 

 

위 코드의 실행결과는 아래와 같은데요.

Second를 출력할 수 있는 아이템은 3 뿐입니다.

1과 2는 delay 되는 동안, 3번 아이템이 들어오기 때문에, 

cancel 되어버립니다.

 

 

flatMapLatest는 최근 아이템이 들어오면, 기존 아이템들을 취소해 버리기 때문에,

다음과 같은 순서로 동작하게 됩니다.

  • 1이 들어와서 실행이 되고,
  • 2가 들어와서 실행될 때, 1이 실행되는 것은 취소가 되어버리고요.
  • 3이 들어와서 실행될 때, 2가 실행되던 것도 취소가 되어버립니다.
  • 마지막 아이템인 3이 들어와서 실행될 때만, 마지막 라인까지 실행되어 완료됩니다.

 

8-4. 정리

flatMapConcat은 각각의 실행 후 완료된 결과를 완료시켜 주었고요.

flatMapMerge는 각각 실행이 되는데, 완료와 상관없이,

계속 다음다음 아이템을 실행시켜 비동기적으로 Merge 시켜버렸습니다.

flatMapLatest는 하나가 실행되고, 다음 아이템이 실행될 때,

기존 실행되던 것을 취소시켜 버리는 방식을 취합니다.

새로운 아이템이 들어오면 기존에 실행되던 것은 취소시켜버려야 할 때 사용하면 되겠지요.

 

9. Exception 처리

9-1. catch 

flow에서는 catch 연산자를 사용해서 Exception을 처리할 수 있습니다.

 

upstream에서 발생하는 Exception을 catch연산자로 잡아내는 것인데요.

(flowOn과 영향범위가 같습니다. upStream에만 영향을 주는 것이지요)

downStream(자신기준 아래의 연산자들)에는 영향이 가지 않으므로 주의해야 합니다.

 

downStream의 Exception은 처리할 수 없으므로,

collector 블록의 코드 부분에서 발생하는 익셉션만 try.. catch로 처리해 주면 됩니다.

 

 

아래와 같은 코드를 실행해 보겠습니다.

onEach에서 만약 3이면 IllegalStateException이 나도록 해주는 check함수를 사용해 주었습니다.

 

 

 

결과는 다음과 같이 crash가 나지 않고, exception 이 잡히는 것을 볼 수 있습니다.

 

 

참고로 위에서 사용했던 check함수는 다음과 같습니다.

inline함수인데요. 조건이 false이면 IllegalStateException을 내줍니다.

 

 

catch블록에서 다른 값을 emit 하는 것도 가능하고요.

 

 

실행해 보면, Exception을 catch 한 다음다음과 같이 888을 흘려보내줍니다.

 

 

특정 Exception을 다시 throw 해서 다음 catch에서 잡히도록 할 수도 있습니다.

 

 

10. Complete

flow에서도 network나 디스크 IO를 해야 할 때처럼,

Exception이 일어나더라도 리소스를 close 해 주어야 할 수 있습니다.

이럴 때, flow를 try.. finally로 감싸서 처리할 수도 있고요.

flow의 API인 onCompletion을 사용해 줄수도 있습니다.

 

10-1. onCompletion

onCompletion연산자를 이용하면,

아래와 같이 간단하게 Complete을 처리할 수 있습니다.

 

 

결과는 아래와 같이 collecting이 끝나고, "Done"이 찍히는 것을 볼 수 있습니다.

 

 

Exception이 발생하더라도, onCompletion은 항상 실행되는 것을 확인해 볼 수 있는데요.

 

 

 

실행해 보면 다음과 같이 Exception이 발생했음에도 "Done"이 찍힌 것을 볼 수 있습니다.

 

 

10-2. finally 블록의 사용

위와 같이 declative 하게 onCompletion 함수를 사용할 수도 있고요.

try.. finally를 사용하여 imperative 하게 사용할 수도 있습니다.

 

 

실행하면 다음과 같이 나오는 것을 볼 수 있습니다.

 

 

11. 정리

이상으로 Coroutine 중 part3 Flow에 대해서 정리해 보았습니다.

다음 글인 part4에서는 Channel에 대해서 정리해 보도록 하겠습니다.

 

728x90