Android 개발/Coroutine , Flow, Channel

SharedFlow 에 대한 총정리 # Buffer Replay tryEmit Kotlin Coroutine

Developer88 2023. 5. 4. 00:01
반응형

오늘은 Kotlin Coroutine의 SharedFlow 에 대해서 정리해 보도록 하겠습니다.

 

1. SharedFlow

SharedFlow 는 이름에서 알 수 있듯이,

Collector 가 여러개인 경우,

Collector 들이 emit 된 값들을 동시에 consume 할 수 있도록,

Share(공유)되는 Flow 의 API 입니다.

 

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 1)
    launch {
        for (i in 1..5) {
            sharedFlow.emit(i)
            println("Emitted: $i")
        }
    }

    launch {
        sharedFlow.collect { value ->
            println("Collector 1에서 받은 값: $value")
        }
    }

    launch {
        sharedFlow.collect { value ->
            println("Collector 2에서 받은 값: $value")
        }
    }
}

 

 

2. Hot Stream 과 Buffer

SharedFlow는 HotFlow인데요.

데이터를 Consume 하는 곳과 무관하게,

데이터를 흘려보내는 API라는 것 입니다.

그래서 값이 생성되면 바로 emit 해 버립니다.

 

Hot Stream 과 Cold Stream에 대해서는 아래 글의 1-2를 참조해 주세요.

>> Kotlin Coroutine Flow 총정리 part3 # launchIn

 

그래서 collector 들이 준비가 안되어 있는데,

데이터가 생성되면 아무것도 받지 못하게 됩니다.

반대로 지나간 일회성 이벤트를 다시 받아서 처리할 필요가 없을 때는,

값을 유지하고 있는 StateFlow가 아니라,

SharedFlow를 선택해 주어야 합니다.

 

2-1. Buffer

그런데 SharedFlow에는 Buffer라는 것이 있어서,

만약 Collector 가 준비가 않되어서, 데이터를 받지 못하는 것을 방지할 수 있습니다.

아래 코드의 경우,

2번째 launch블록의 코드는 delay가 첫번째 launch 블록보다 더 길기 때문에,

원래는 Hot Stream 인 sharedFlow의 값을 받지 못합니다.

하지만, MutableSharedFlow에,

아래와 같이 extraBufferCapacity를 설정해 줄 경우는 받을 수 있게 됩니다.

 

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = 3)
    launch {
        for (i in 1..5) {
            delay(200) 
            sharedFlow.emit(i)
            println("방출된 값: $i")
        }
    }

    launch {
        sharedFlow.collect { value ->
            delay(500) // 위에 방출되는 emit보다 더 느림
            println("Collect에서 받은 값: $value")
        }
    }
}

 

 

위의 결과는 아래와 같습니다.

원래는 흘러가서 못 받아야 했던 값을 받아낸 것을 볼 수 있습니다.

 

방출된 값: 1
방출된 값: 2
방출된 값: 3
방출된 값: 4
Collect에서 받은 값: 1
방출된 값: 5
Collect에서 받은 값: 2
Collect에서 받은 값: 3
Collect에서 받은 값: 4
Collect에서 받은 값: 5

 

참고로, Buffer의 개수가 부족할 경우에는, 

기본적으로 emit 을 잠시 suspend 하게 됩니다.

위에서 값을 다 받을 수 있었던 것도, 버퍼가 3개였는데, 다 찼기 때문에 emit을 멈추어 주었기 때문입니다.

 

 

다만, 데이터의 실시간성이 중요한 경우는 이렇게 사용하면,

Buffer를 사용하는 것이 데이터의 시간차가 발생하므로 무조건 좋다고 만은 할 수 없습니다.

 

3. Replay

위에서 Buffer가 Collector들이 준비가 안되면,

데이터를 Buffer에 잠시 준비시켰다가,

준비가 되면 emit을 해 주었는데요.

 

Replay는 가장 최근의 데이터를 새로운 Collector 가 Collect를 시작할 때 보내줍니다.

위에서도 잠깐 보았지만, default값은 0으로 되어 있는데요.

새로운 Collector가 나타나면 최근 아이템을 말 그대로 Replay 해줄 수 있습니다.

 

val sharedFlow = MutableSharedFlow<Int>(replay = 2)

 

만약, Replay 공간이 꽉차게 되면 어떻게 될까요?

가장 오래된 값이 제거 되도록 되어 있습니다.

 

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)
    launch {
        for (i in 1..3) {
            sharedFlow.emit(i)
            println("방출된 값: $i")
            delay(300)
        }
    }

    // 일부로 기다리게 하기
    delay(800)

    launch {
        sharedFlow.collect { value ->
            println("Collector가 받은 값: $value")
        }
    }
}

 

 

아래 보이는 것처럼, 

replay 없이는, delay 때문에 원래 3번째 아이템만 받았어야 하는데요.

replay덕분에, 최근의 2개의 받을 수 있게 되었습니다.

 

방출된 값: 1
방출된 값: 2
방출된 값: 3
Collector가 받은 값: 2
Collector가 받은 값: 3

 

위에서 모든 값을 받을 수 있었던,

Buffer를 사용한 것과는 다른 것을 볼 수 있습니다.

 

4. asSharedFlow

이 API는 캡슐화 하기 위해서 사용하는 확장함수인데요.

아래와 같이, MutableSharedFlow로 선언하고,

asSharedFlow를 사용해서 readOnly 타입의 SharedFlow로부터 Collector들이 값을 Collect 하도록 해 줍니다.

실제로는 이렇게 SharedFlow를 사용해야 하겠지요.

 

private val _sharedFlow = MutableSharedFlow<Int>(replay = 1)
val sharedFlow: SharedFlow<Int> = _sharedFlow.asSharedFlow()

 

 

5. tryEmit()

tryEmit 은 코루틴의 suspend 함수 없이도 값을 방출할 수 있게 해 줍니다.

Collect 하는 곳에서 Coroutine을 사용하기 어려운 경우도 있기 때문인데요.

SharedFlow 처럼 여러곳에서 Collect 해서 사용하는 API 라면 필요성을 느낄 법합니다.

 

emit() 과는 다르게, emit을 시도해서 성공하면 true 를 리턴해 줍니다.

 

 

그런데, emit을 하면 하는 것이지, 왜 false 하는 경우가 생기게 될까요?

아래에서는 buffer 사이즈가 꽉 차게 되서 실패를 할 수 있습니다.

 

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = 2)

    launch {
        for (i in 1..5) {
            val result = sharedFlow.tryEmit(i)
            if (result) {
                println("방출 성공: $i")
            } else {
                println("값 방출 실패: $i")
            }
            delay(200)
        }
    }

    launch {
        sharedFlow.collect { value ->
            delay(500) // 시간 지연시키기
            println("Collector에서 받은 값: $value")
        }
    }
}

 

 

위 코드에서는 4,5는 값이 유실되었습니다.

버퍼가 2개로 꽉 차있었기 때문입니다.

원래는 버퍼가 꽉차면 emit을 suspend 한다고 나왔었는데요.

이 API는 suspend 가 작동하지 않습니다.

 

emit 은 suspend 함수인 반면, tryEmit은 아니라는 것을 알 수 있습니다.

 

 

무조건, Coroutine 없이도 값을 방출할 수 있게(emit) 해주는 함수라고 외우면 안되는 이유입니다.

위 예제 코드의 결과도 아래와 같이 2개의 값이 유실된 것을 볼 수 있습니다.

 

방출된 값: 1
방출된 값: 2
방출된 값: 3
방출실패: 4
방출실패: 5
Collector에서 받은 값: 1
Collector에서 받은 값: 2
Collector에서 받은 값: 3

 

SharedFlow는 Hot Stream 이기 때문에,

비동기 코드에서 다루기가 어려울 수 있는데요.

그래서 Buffer나 Replay 가 존재하는 것 이겠지요.

그런데, tryEmit을 사용하면 suspend가 동작하지 않게 되면서,

좀 더 생각할 것이 많아지게 됩니다.

 

이상으로 SharedFlow 에 대해서 정리해 보았습니다.

728x90