오늘은 Kotlin Coroutine의 Flow API 중,
flow들을 결합해 주는 오퍼레이터인,
combine과 zip 그리고 merge에 대해서 총정리해 보겠습니다.
1. combine
Combine 은 2개 이상의 스트림 되는 flow 데이터들을,
합쳐서 하나의 flow로 흘려보내 주는 API입니다.
2개의 스트림을 합쳐주는 것은 알겠는데,
각각의 데이터들이 다른 시점에 나올 때는 어떻게 되는 것일까요?
아래 공식문서의 설명에 따르면,
둘 중 가장 최근에 방출되는 값이 있으면,
그 값을 기준으로 합쳐진 flow 를 방출해 줍니다.
단순히 2개의 flow를 합쳐준다고만 이해해서는 안 되고요.
2개의 flow 중 어느 하나에서 아이템이 방출되면,
최근 2개의 flow를 합쳐서 방출해 줍니다.
좀 더 이해해 보기 위해서 코드를 보도록 하겠습니다.
flowA는 1, 2,3 이 지연 없이 그대로 방출(emit) 되고요.
flowB는 emit(방출)될 때마다, 1초 의 지연이 발생합니다.
combine을 사용하면 어떻게 나올까요?
fun main() = runBlocking<Unit> {
val flowA = flowOf(1, 2, 3)
val flowB = flowOf(4, 5, 6).onEach { delay(1 * 1000) }
flowA.combine(flowB) { a, b ->
"a: $a, b: $b"
}.collect { println(it) }
}
결과는 다음과 같습니다.
둘 중 하나만 방출되면, 최근의 2개를 합쳐서 방출한다고 하였는데,
1,2는 합쳐서 방출되지 않은 이유가 무엇일까요?
Combine API는 합쳐서 방출해주기는 하지만,
2개의 flow 모두 최소 1개씩의 아이템들을 방출해 주어야 합쳐서 방출(emit)을 해 줍니다.
flowA에서 1,2 가 흘러나올 때, flowB는 delay가 되어 suspend 되고 있었습니다.
flowB에서 4가 흘러나오기 시작하자,
flowA에서 이미 흘러나온 최근값인 3과 합쳐서 다음 3개의 아이템이 흘러나온 것 입니다.
a: 3, b: 4
a: 3, b: 5
a: 3, b: 6
2. Zip
2-1. zip
combine과 아주 유사한 API로 zip을 들 수 있습니다.
하나의 기존 flow가 데이터를 방출하고 있는데,
이것에 다른 flow를 더해서 새로운 합쳐진 flow를 방출해 주는 API입니다.
아래 코드에서 보는 것 처럼,
API사용방법은 combine 과 다르지 않습니다.
fun main() = runBlocking<Unit> {
val flowA = flowOf(1, 2, 3)
val flowB = flowOf("A", "B", "C")
flowA.zip(flowB) { a, b ->
"a: $a, b: $b"
}.collect { println(it) }
}
결과는 다음과 같습니다.
a: 1, b: A
a: 2, b: B
a: 3, b: C
combine과 많이 유사한 듯한데요.
이들에 대한 비교는 아래 4번에서 구체적으로 보도록 하겠습니다.
2-2. 3가지 아이템 합치기
3가지 아이템을 zip해서 사용한다면 어떻게 해야할까요?
이것을 한번에 하지는 못하구요.
1번 zip을 한 후에,
그 결과값에 아래와 같이 zip을 해 주면 됩니다.
fun main() = runBlocking {
val flow1: Flow<Int> = (1..3).asFlow()
val flow2: Flow<String> = listOf("A", "B", "C").asFlow()
val flow3: Flow<Double> = listOf(1.1, 2.2, 3.3).asFlow()
flow1.zip(flow2) { value1, value2 -> Pair(value1, value2) }
.zip(flow3) { (value1, value2), value3 -> Triple(value1, value2, value3) }
.collect { value -> println("결과: $value") }
}
위 코드의 실행결과는 다음과 같습니다.
결과: (1, A, 1.1)
결과: (2, B, 2.2)
결과: (3, C, 3.3)
3. Merge
merge API는 말 그대로 심플하게 flow를 비동기적으로 합쳐서 흘려보내줍니다.
combine과 zip과는 조금은 결이 다른 셈입니다.
코드를 보면서 이해해 보겠습니다.
아래와 같이 1개의 flow에 다른 하나의 flow를 합쳐서,
같이 방출되도록 할 수 있습니다.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val flowA = flowOf(1, 3, 5, 7, 9)
val flowB = flowOf(2, 4, 6, 8, 10)
flowA.merge(flowB).collect { println(it) }
}
만약, 서로 다른 타입의 아이템인 경우는 아래와 가공을 해 주어야 합니다.
fun main() = runBlocking<Unit> {
val flowA = flowOf(1, 2, 3)
val flowB = flowOf("a", "b", "c", "d")
flowA.map { it.toString() }
.merge(flowB)
.collect { println(it) }
}
결과는 아래와 같습니다.
아래의 순서는 API설명에 나온 것처럼 보장받지는 못합니다.
그래서 이 API를 사용할 때는,
비동기적으로 순서를 보장받지 못한다는 것을 유념하고 있어야 합니다.
1
a
2
b
3
c
d
4. Combine, Zip, Merge 정리
셋다 합쳐주는 연산자인 것은 알겠는데,
어떤 차이가 있는 것일까요.
하나의 표로 보면서 이해해 보도록 하겠습니다.
위에서도 언급하였지만, merge는 combine이나 zip과는 차이가 크게 있어서,
구별하는데 어려움은 없습니다.
다만, merge에서는 위에서도 언급하였지만, 아래표에 나오는 순서는 보장받지 못합니다.
비동기적으로 둘을 합쳐서 흘려보내주기 때문입니다.
combine과 zip을 중심으로 보는 것이 좋습니다.
combine 은 둘 중 하나에서만 데이터가 흘러나오면 A,B의 데이터를 계속 합쳐서 방출해 주고요.
ZIP은 A,B 둘 각각의 최신 데이터만 합쳐서 흘려보내줍니다.
A는 A3이후에는 흘러나온 데이터가 없으므로, B가 아무리 B4, B5를 흘려보내도, ZIP은 아무것도 흘려보내주지 않습니다.
Combine과 Zip 둘의 차이를 비교해 보면 다음과 같습니다.
- Combine 은 둘 중 하나의 아이템이 방출될 때마다 최근데이터를 기준으로 합쳐서 방출
- b5까지 모든 아이템들이 방출됨
- Zip은 둘 다 방출이 될 때마다 합쳐서 방출
- zip에서는 A3이후에 4번째 데이터가 방출되지 않아, b4, b5는 합쳐서 방출되지 못하게 됨
그래서, combine과 zip을 사용할 때는,
각각의 상황에 맞추어서 최적의 것을 선택해 주는 것이 좋습니다.
예를 들어, 네트워크 데이터의 경우 2개의 네트워크에서 데이터를 받아올 경우,
1개만 최신화 되어있을 경우 문제가 될 수 있습니다.
둘 다 호출 이후 흘러나온 최신 데이터일 경우만 사용해야 하는데,
이럴 경우는 ZIP을 사용해 주어야 하겠지요.
5. launchIn
글을 마무리하기 전에,
위의 결합연산자들과 함께 사용하기 좋은 API인 launchIn 함수도 정리해 보겠습니다.
이것은 Coroutine을 launch 해서 collect() 해주는 함수입니다.
아래는 viewModel에서 사용하는 예인데요.
아래와 같이 combine을 쉽고 간결하게 사용할 수 있습니다.
val flowA = flowOf("A1", "A2", "A3").onEach { delay(200) }
val flowB = flowOf("B1", "B2", "B3").onEach { delay(100) }
flowA.combine(flowB) { a, b -> a to b }
.onEach { value -> println(value) }
.launchIn(viewModelScope)
사실 이것은 아래의 예와 같이 사용하는 것과 같은 의미를 가지는데요.
위와 같이 launchIn을 사용하는 것이 훨씬 간결하고 보기도 좋습니다.
val flowA = flowOf("A1", "A2", "A3").onEach { delay(200) }
val flowB = flowOf("B1", "B2", "B3").onEach { delay(100) }
viewModelScope.launch {
flowA.combine(flowB) { a, b -> a to b }
.onEach { value -> println(value) }
.collect()
}
이상으로 Flow 결합연산자 combine , zip , merge에 대해서 비교해 보았습니다.
'Android 개발 > Coroutine , Flow, Channel' 카테고리의 다른 글
StateFlow vs SharedFlow 를 비교해보자 #이벤트 핸들링 (1) | 2023.05.06 |
---|---|
SharedFlow 에 대한 총정리 # Buffer Replay tryEmit Kotlin Coroutine (2) | 2023.05.04 |
onEach vs onStart 비교 정리 # Kotlin Coroutine Flow (0) | 2023.05.03 |
Coroutine suspend 동작에 관한 좋은 예와 잘못된 예 # 비동기 (0) | 2023.04.18 |
flatMapLatest 이용해서 값이 들어오는 것을 기다리기 # Coroutine (0) | 2023.04.18 |
함수 실행 시간 측정 후 Delay 사용하기 (0) | 2023.04.14 |
MutableStateFlow 이용한 로딩 후 로딩 완료 기다리기 구현 방법 (0) | 2023.04.08 |
StateFlow 정리 # Android Kotlin Coroutine getStateFlow StateIn (0) | 2022.10.12 |
Kotlin Coroutine Flow 총정리 part3 # launchIn (0) | 2022.10.10 |
Kotlin Coroutine 총정리 part2 # Cancellation Exception Handling (0) | 2022.10.09 |
댓글