오늘은 Kotlin Coroutine의 SharedFlow 에 대해서 정리해 보도록 하겠습니다.
1. SharedFlow
SharedFlow 는 이름에서 알 수 있듯이,
Collector 가 여러개인 경우,
Collector 들이 emit 된 값들을 동시에 consume 할 수 있도록,
Share(공유)되는 Flow 의 API 입니다.
fun main() = runBlocking {
val sharedFlow = MutableSharedFlow<Int>(replay = 1)
launch {
for (i in 1..5) {
sharedFlow.emit(i)
println("Emitted: $i")
}
}
launch {
sharedFlow.collect { value ->
println("Collector 1에서 받은 값: $value")
}
}
launch {
sharedFlow.collect { value ->
println("Collector 2에서 받은 값: $value")
}
}
}
2. Hot Stream 과 Buffer
SharedFlow는 HotFlow인데요.
데이터를 Consume 하는 곳과 무관하게,
데이터를 흘려보내는 API라는 것 입니다.
그래서 값이 생성되면 바로 emit 해 버립니다.
Hot Stream 과 Cold Stream에 대해서는 아래 글의 1-2를 참조해 주세요.
>> Kotlin Coroutine Flow 총정리 part3 # launchIn
그래서 collector 들이 준비가 안되어 있는데,
데이터가 생성되면 아무것도 받지 못하게 됩니다.
2-1. Buffer
그런데 SharedFlow에는 Buffer라는 것이 있어서,
만약 Collector 가 준비가 않되어서, 데이터를 받지 못하는 것을 방지할 수 있습니다.
아래 코드의 경우,
2번째 launch블록의 코드는 delay가 첫번째 launch 블록보다 더 길기 때문에,
원래는 Hot Stream 인 sharedFlow의 값을 받지 못합니다.
하지만, MutableSharedFlow에,
아래와 같이 extraBufferCapacity를 설정해 줄 경우는 받을 수 있게 됩니다.
fun main() = runBlocking {
val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = 3)
launch {
for (i in 1..5) {
delay(200)
sharedFlow.emit(i)
println("방출된 값: $i")
}
}
launch {
sharedFlow.collect { value ->
delay(500) // 위에 방출되는 emit보다 더 느림
println("Collect에서 받은 값: $value")
}
}
}
위의 결과는 아래와 같습니다.
원래는 흘러가서 못 받아야 했던 값을 받아낸 것을 볼 수 있습니다.
방출된 값: 1
방출된 값: 2
방출된 값: 3
방출된 값: 4
Collect에서 받은 값: 1
방출된 값: 5
Collect에서 받은 값: 2
Collect에서 받은 값: 3
Collect에서 받은 값: 4
Collect에서 받은 값: 5
참고로, Buffer의 개수가 부족할 경우에는,
기본적으로 emit 을 잠시 suspend 하게 됩니다.
위에서 값을 다 받을 수 있었던 것도, 버퍼가 3개였는데, 다 찼기 때문에 emit을 멈추어 주었기 때문입니다.
다만, 데이터의 실시간성이 중요한 경우는 이렇게 사용하면,
Buffer를 사용하는 것이 데이터의 시간차가 발생하므로 무조건 좋다고 만은 할 수 없습니다.
3. Replay
위에서 Buffer가 Collector들이 준비가 안되면,
데이터를 Buffer에 잠시 준비시켰다가,
준비가 되면 emit을 해 주었는데요.
Replay는 가장 최근의 데이터를 새로운 Collector 가 Collect를 시작할 때 보내줍니다.
위에서도 잠깐 보았지만, default값은 0으로 되어 있는데요.
새로운 Collector가 나타나면 최근 아이템을 말 그대로 Replay 해줄 수 있습니다.
val sharedFlow = MutableSharedFlow<Int>(replay = 2)
만약, Replay 공간이 꽉차게 되면 어떻게 될까요?
가장 오래된 값이 제거 되도록 되어 있습니다.
fun main() = runBlocking {
val sharedFlow = MutableSharedFlow<Int>(replay = 2)
launch {
for (i in 1..3) {
sharedFlow.emit(i)
println("방출된 값: $i")
delay(300)
}
}
// 일부로 기다리게 하기
delay(800)
launch {
sharedFlow.collect { value ->
println("Collector가 받은 값: $value")
}
}
}
아래 보이는 것처럼,
replay 없이는, delay 때문에 원래 3번째 아이템만 받았어야 하는데요.
replay덕분에, 최근의 2개의 받을 수 있게 되었습니다.
방출된 값: 1
방출된 값: 2
방출된 값: 3
Collector가 받은 값: 2
Collector가 받은 값: 3
위에서 모든 값을 받을 수 있었던,
Buffer를 사용한 것과는 다른 것을 볼 수 있습니다.
4. asSharedFlow
이 API는 캡슐화 하기 위해서 사용하는 확장함수인데요.
아래와 같이, MutableSharedFlow로 선언하고,
asSharedFlow를 사용해서 readOnly 타입의 SharedFlow로부터 Collector들이 값을 Collect 하도록 해 줍니다.
실제로는 이렇게 SharedFlow를 사용해야 하겠지요.
private val _sharedFlow = MutableSharedFlow<Int>(replay = 1)
val sharedFlow: SharedFlow<Int> = _sharedFlow.asSharedFlow()
5. tryEmit()
tryEmit 은 코루틴의 suspend 함수 없이도 값을 방출할 수 있게 해 줍니다.
Collect 하는 곳에서 Coroutine을 사용하기 어려운 경우도 있기 때문인데요.
SharedFlow 처럼 여러곳에서 Collect 해서 사용하는 API 라면 필요성을 느낄 법합니다.
emit() 과는 다르게, emit을 시도해서 성공하면 true 를 리턴해 줍니다.
그런데, emit을 하면 하는 것이지, 왜 false 하는 경우가 생기게 될까요?
아래에서는 buffer 사이즈가 꽉 차게 되서 실패를 할 수 있습니다.
fun main() = runBlocking {
val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = 2)
launch {
for (i in 1..5) {
val result = sharedFlow.tryEmit(i)
if (result) {
println("방출 성공: $i")
} else {
println("값 방출 실패: $i")
}
delay(200)
}
}
launch {
sharedFlow.collect { value ->
delay(500) // 시간 지연시키기
println("Collector에서 받은 값: $value")
}
}
}
위 코드에서는 4,5는 값이 유실되었습니다.
버퍼가 2개로 꽉 차있었기 때문입니다.
원래는 버퍼가 꽉차면 emit을 suspend 한다고 나왔었는데요.
이 API는 suspend 가 작동하지 않습니다.
emit 은 suspend 함수인 반면, tryEmit은 아니라는 것을 알 수 있습니다.
무조건, Coroutine 없이도 값을 방출할 수 있게(emit) 해주는 함수라고 외우면 안되는 이유입니다.
위 예제 코드의 결과도 아래와 같이 2개의 값이 유실된 것을 볼 수 있습니다.
방출된 값: 1
방출된 값: 2
방출된 값: 3
방출실패: 4
방출실패: 5
Collector에서 받은 값: 1
Collector에서 받은 값: 2
Collector에서 받은 값: 3
SharedFlow는 Hot Stream 이기 때문에,
비동기 코드에서 다루기가 어려울 수 있는데요.
그래서 Buffer나 Replay 가 존재하는 것 이겠지요.
그런데, tryEmit을 사용하면 suspend가 동작하지 않게 되면서,
좀 더 생각할 것이 많아지게 됩니다.
이상으로 SharedFlow 에 대해서 정리해 보았습니다.
'Android 개발 > Coroutine , Flow, Channel' 카테고리의 다른 글
StateFlow vs SharedFlow 를 비교해보자 #이벤트 핸들링 (1) | 2023.05.06 |
---|---|
Flow 결합연산자 combine , zip , merge 비교 총정리 # Kotlin Coroutine (0) | 2023.05.03 |
onEach vs onStart 비교 정리 # Kotlin Coroutine Flow (0) | 2023.05.03 |
Coroutine suspend 동작에 관한 좋은 예와 잘못된 예 # 비동기 (0) | 2023.04.18 |
flatMapLatest 이용해서 값이 들어오는 것을 기다리기 # Coroutine (0) | 2023.04.18 |
함수 실행 시간 측정 후 Delay 사용하기 (0) | 2023.04.14 |
MutableStateFlow 이용한 로딩 후 로딩 완료 기다리기 구현 방법 (0) | 2023.04.08 |
StateFlow 정리 # Android Kotlin Coroutine getStateFlow StateIn (0) | 2022.10.12 |
Kotlin Coroutine Flow 총정리 part3 # launchIn (0) | 2022.10.10 |
Kotlin Coroutine 총정리 part2 # Cancellation Exception Handling (0) | 2022.10.09 |
댓글