본문 바로가기
Android 개발/Coroutine , Flow, Channel

SharedFlow 에 대한 총정리 # Buffer Replay tryEmit Kotlin Coroutine

by Developer88 2023. 5. 4.
반응형

오늘은 Kotlin Coroutine의 SharedFlow 에 대해서 정리해 보도록 하겠습니다.

 

1. SharedFlow

SharedFlow 는 이름에서 알 수 있듯이,

Collector 가 여러개인 경우,

Collector 들이 emit 된 값들을 동시에 consume 할 수 있도록,

Share(공유)되는 Flow 의 API 입니다.

 

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 1)
    launch {
        for (i in 1..5) {
            sharedFlow.emit(i)
            println("Emitted: $i")
        }
    }

    launch {
        sharedFlow.collect { value ->
            println("Collector 1에서 받은 값: $value")
        }
    }

    launch {
        sharedFlow.collect { value ->
            println("Collector 2에서 받은 값: $value")
        }
    }
}

 

 

2. Hot Stream 과 Buffer

SharedFlow는 HotFlow인데요.

데이터를 Consume 하는 곳과 무관하게,

데이터를 흘려보내는 API라는 것 입니다.

그래서 값이 생성되면 바로 emit 해 버립니다.

 

Hot Stream 과 Cold Stream에 대해서는 아래 글의 1-2를 참조해 주세요.

>> Kotlin Coroutine Flow 총정리 part3 # launchIn

 

그래서 collector 들이 준비가 안되어 있는데,

데이터가 생성되면 아무것도 받지 못하게 됩니다.

 

2-1. Buffer

그런데 SharedFlow에는 Buffer라는 것이 있어서,

만약 Collector 가 준비가 않되어서, 데이터를 받지 못하는 것을 방지할 수 있습니다.

아래 코드의 경우,

2번째 launch블록의 코드는 delay가 첫번째 launch 블록보다 더 길기 때문에,

원래는 Hot Stream 인 sharedFlow의 값을 받지 못합니다.

하지만, MutableSharedFlow에,

아래와 같이 extraBufferCapacity를 설정해 줄 경우는 받을 수 있게 됩니다.

 

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = 3)
    launch {
        for (i in 1..5) {
            delay(200) 
            sharedFlow.emit(i)
            println("방출된 값: $i")
        }
    }

    launch {
        sharedFlow.collect { value ->
            delay(500) // 위에 방출되는 emit보다 더 느림
            println("Collect에서 받은 값: $value")
        }
    }
}

 

 

위의 결과는 아래와 같습니다.

원래는 흘러가서 못 받아야 했던 값을 받아낸 것을 볼 수 있습니다.

 

방출된 값: 1
방출된 값: 2
방출된 값: 3
방출된 값: 4
Collect에서 받은 값: 1
방출된 값: 5
Collect에서 받은 값: 2
Collect에서 받은 값: 3
Collect에서 받은 값: 4
Collect에서 받은 값: 5

 

참고로, Buffer의 개수가 부족할 경우에는, 

기본적으로 emit 을 잠시 suspend 하게 됩니다.

위에서 값을 다 받을 수 있었던 것도, 버퍼가 3개였는데, 다 찼기 때문에 emit을 멈추어 주었기 때문입니다.

 

 

다만, 데이터의 실시간성이 중요한 경우는 이렇게 사용하면,

Buffer를 사용하는 것이 데이터의 시간차가 발생하므로 무조건 좋다고 만은 할 수 없습니다.

 

3. Replay

위에서 Buffer가 Collector들이 준비가 안되면,

데이터를 Buffer에 잠시 준비시켰다가,

준비가 되면 emit을 해 주었는데요.

 

Replay는 가장 최근의 데이터를 새로운 Collector 가 Collect를 시작할 때 보내줍니다.

위에서도 잠깐 보았지만, default값은 0으로 되어 있는데요.

새로운 Collector가 나타나면 최근 아이템을 말 그대로 Replay 해줄 수 있습니다.

 

val sharedFlow = MutableSharedFlow<Int>(replay = 2)

 

만약, Replay 공간이 꽉차게 되면 어떻게 될까요?

가장 오래된 값이 제거 되도록 되어 있습니다.

 

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)
    launch {
        for (i in 1..3) {
            sharedFlow.emit(i)
            println("방출된 값: $i")
            delay(300)
        }
    }

    // 일부로 기다리게 하기
    delay(800)

    launch {
        sharedFlow.collect { value ->
            println("Collector가 받은 값: $value")
        }
    }
}

 

 

아래 보이는 것처럼, 

replay 없이는, delay 때문에 원래 3번째 아이템만 받았어야 하는데요.

replay덕분에, 최근의 2개의 받을 수 있게 되었습니다.

 

방출된 값: 1
방출된 값: 2
방출된 값: 3
Collector가 받은 값: 2
Collector가 받은 값: 3

 

위에서 모든 값을 받을 수 있었던,

Buffer를 사용한 것과는 다른 것을 볼 수 있습니다.

 

4. asSharedFlow

이 API는 캡슐화 하기 위해서 사용하는 확장함수인데요.

아래와 같이, MutableSharedFlow로 선언하고,

asSharedFlow를 사용해서 readOnly 타입의 SharedFlow로부터 Collector들이 값을 Collect 하도록 해 줍니다.

실제로는 이렇게 SharedFlow를 사용해야 하겠지요.

 

private val _sharedFlow = MutableSharedFlow<Int>(replay = 1)
val sharedFlow: SharedFlow<Int> = _sharedFlow.asSharedFlow()

 

 

5. tryEmit()

tryEmit 은 코루틴의 suspend 함수 없이도 값을 방출할 수 있게 해 줍니다.

Collect 하는 곳에서 Coroutine을 사용하기 어려운 경우도 있기 때문인데요.

SharedFlow 처럼 여러곳에서 Collect 해서 사용하는 API 라면 필요성을 느낄 법합니다.

 

emit() 과는 다르게, emit을 시도해서 성공하면 true 를 리턴해 줍니다.

 

 

그런데, emit을 하면 하는 것이지, 왜 false 하는 경우가 생기게 될까요?

아래에서는 buffer 사이즈가 꽉 차게 되서 실패를 할 수 있습니다.

 

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(extraBufferCapacity = 2)

    launch {
        for (i in 1..5) {
            val result = sharedFlow.tryEmit(i)
            if (result) {
                println("방출 성공: $i")
            } else {
                println("값 방출 실패: $i")
            }
            delay(200)
        }
    }

    launch {
        sharedFlow.collect { value ->
            delay(500) // 시간 지연시키기
            println("Collector에서 받은 값: $value")
        }
    }
}

 

 

위 코드에서는 4,5는 값이 유실되었습니다.

버퍼가 2개로 꽉 차있었기 때문입니다.

원래는 버퍼가 꽉차면 emit을 suspend 한다고 나왔었는데요.

이 API는 suspend 가 작동하지 않습니다.

 

emit 은 suspend 함수인 반면, tryEmit은 아니라는 것을 알 수 있습니다.

 

 

무조건, Coroutine 없이도 값을 방출할 수 있게(emit) 해주는 함수라고 외우면 안되는 이유입니다.

위 예제 코드의 결과도 아래와 같이 2개의 값이 유실된 것을 볼 수 있습니다.

 

방출된 값: 1
방출된 값: 2
방출된 값: 3
방출실패: 4
방출실패: 5
Collector에서 받은 값: 1
Collector에서 받은 값: 2
Collector에서 받은 값: 3

 

SharedFlow는 Hot Stream 이기 때문에,

비동기 코드에서 다루기가 어려울 수 있는데요.

그래서 Buffer나 Replay 가 존재하는 것 이겠지요.

그런데, tryEmit을 사용하면 suspend가 동작하지 않게 되면서,

좀 더 생각할 것이 많아지게 됩니다.

 

이상으로 SharedFlow 에 대해서 정리해 보았습니다.

728x90

댓글