ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Android] Kotlin Coroutines Flow 2주차 (1/2)
    Android 2021. 8. 11. 17:58
    반응형

    Asynchronous Flow

    suspend 함수는 비동기적으로 동작하고 단일 값을 반환합니다.

    Flow는 비동기적으로 여러 개의 값을 반환받고 싶을 때 사용하는 코루틴 빌더입니다.

    Representing multiple values

    Collections을 사용하여 여러 개의 값을 반환할 수 있습니다.

    fun simple(): List<Int> = listOf(1, 2, 3)
     
    fun main() {
        simple().forEach { value -> println(value) } 
    }
    
    // 1
    // 2
    // 3

    Sequences

    kotlin builder인 sequence를 사용하여 100ms 간격으로 숫자를 찍습니다.

    Thread.sleep을 사용하기 때문에 Thread가 blocking 됩니다.

    fun simple(): Sequence<Int> = sequence { // sequence builder
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it
            yield(i) // yield next value
        }
    }
    
    fun main() {
        simple().forEach { value -> println(value) } 
    }

    Suspending functions

    suspend 키워드를 사용하여 스레드가 차단되는 걸 방지합니다.

    suspend fun simple(): List<Int> {
        delay(1000) // pretend we are doing something asynchronous here
        return listOf(1, 2, 3)
    }
    
    fun main() = runBlocking<Unit> {
        simple().forEach { value -> println(value) } 
    }

    Flows

    List<Int>는 모든 계산이 끝난 다음에 한 번에 결과를 반환하지만

    Flow는 비동기적으로 계산하고 끝날 때 마다 하나씩 stream형태로 값을 반환합니다.

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*
    
                  
    fun simple(): Flow<Int> = flow { // flow builder
        for (i in 1..3) {
            delay(100) // pretend we are doing something useful here
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> {
        // Launch a concurrent coroutine to check if the main thread is blocked
        launch {
            for (k in 1..3) {
                println("I'm not blocked $k")
                delay(100)
            }
        }
        // Collect the flow
        simple().collect { value -> println(value) } 
    }
    
    // I'm not blocked 1
    // 1
    // I'm not blocked 2
    // 2
    // I'm not blocked 3
    // 3

    스레드가 blocking 되지 않아서 launch 블록과 flow 블록이 번갈아가면서 출력되는 것을 확인할 수 있습니다.

    • Flow 타입의 빌더 함수를 flow라고 부릅니다.
    • flow 블록 내의 코드는 일시중지 될 수 있습니다.
    • 더 이상 suspend 키워드를 안 써도 됩니다.
    • emit 함수를 사용하여 flow를 방출합니다.
    • collect 함수를 사용하여 flow를 수집합니다.

    Flows are cold

    Flowcold stream 입니다.

    flow builder 내부의 코드는 flow가 collect 될 때까지 실행되지 않고, 실행되면 처음부터 끝까지 값을 전부 방출합니다.

    Flow는 내부적으로 코 루틴을 사용하고 있기 때문에 suspend 키워드를 안 붙이고 사용할 수 있습니다.

    fun simple(): Flow<Int> = flow { 
        println("Flow started")
        for (i in 1..3) {
            delay(100)
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        println("Calling simple function...")
        val flow = simple()
        println("Calling collect...")
        flow.collect { value -> println(value) } 
        println("Calling collect again...")
        flow.collect { value -> println(value) } 
    }
    
    // Calling simple function...
    // Calling collect...
    // Flow started
    // 1
    // 2
    // 3
    // Calling collect again...
    // Flow started
    // 1
    // 2
    // 3
    simple()은 빠르게 반환하고 아무것도 기다리지 않습니다.
    
    flow는 collect 할때 마다 시작됩니다.

    Flow cancellation basics

    Flow는 coroutine cancellation을 따릅니다.

    Flow collection은 delay 같은 cancel이 가능한 일시 중지 함수가 호출되면 일시 중지되면 취소될 수 있습니다.

    launch 나 async 같이 연속적인 작업 같은 경우에는 취소되지 않습니다.

    fun simple(): Flow<Int> = flow { 
        for (i in 1..3) {
            delay(100)          
            println("Emitting $i")
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        withTimeoutOrNull(250) { // Timeout after 250ms 
            simple().collect { value -> println(value) } 
        }
        println("Done")
    }
    
    // Emitting 1
    // 1
    // Emitting 2
    // 2
    // Done
    
    250ms 뒤에 타임 아웃돼서 2까지만 출력하고 끝이 난다.

    Flow builders

    • 값이 고정되어 있으면 flowOf builder를 이용합니다.
    • 여러 Collection들은. asFlow()를 사용하여 flow로 변경이 가능합니다.
      (1..3).asFlow().collect { value -> println(value) }​

    Intermediate flow operators

    연산자를 사용하여 flow를 변환할 수 있습니다.

    중간 연산자는 다음과 같은 특징이 있습니다.

    • upstream flow에 적용되고 downstream flow를 반환합니다.
    • cold 하게 동작합니다.
    • suspend 함수가 아닙니다.
    • 매우 빠르게 작동하고 새로운 flow를 반환합니다.
    • map과 filter와 같이 collection과 동일하지만 가장 큰 차이점은 연산자 내부 코드 블록에서 suspend 함수를
      호출할 수 있다는 점입니다.
    suspend fun performRequest(request: Int): String {
        delay(1000) // imitate long-running asynchronous work
        return "response $request"
    }
    
    fun main() = runBlocking<Unit> {
        (1..3).asFlow() // a flow of requests
            .map { request -> performRequest(request) }
            .collect { response -> println(response) }
    }
    
    // response 1
    // response 2
    // response 3
    
    delay 함수가 map 안에서 적용돼서 1초에 한번씩 출력합니다.

    Transform operator

    flow 변환 연산자 중 transform 함수는 map이나 filter와 같이 간단하게 값을 변환할 수 있고,
    복잡한 변환을 수행할 수도 있습니다.

    transform 연산자를 사용하여 임의의 값을 여러 번 반복하여 방출할 수도 있습니다.

    suspend fun performRequest(request: Int): String {
        delay(1000) // imitate long-running asynchronous work
        return "response $request"
    }
    
    fun main() = runBlocking<Unit> {
        (1..3).asFlow() // a flow of requests
            .transform { request ->
                emit("Making request $request") 
                emit(performRequest(request)) 
            }
            .collect { response -> println(response) }
    }
    
    // Making request 1
    // response 1
    // Making request 2
    // response 2
    // Making request 3
    // response 3

    Size-limiting operators

    take는 제한된 개수까지만 flow를 실행하고 cancel 합니다.

    Cancellation은 항상 예외를 던져 수행되므로 모든 리소스 관리 기능 (ex : try catch문) 들은

    cancel시 정상적으로 작동합니다.

    fun numbers(): Flow<Int> = flow {
        try {                          
            emit(1)
            emit(2) 
            println("This line will not execute")
            emit(3)    
        } finally {
            println("Finally in numbers")
        }
    }
    
    fun main() = runBlocking<Unit> {
        numbers() 
            .take(2) // take only the first two
            .collect { value -> println(value) }
    }
    
    // 1
    // 2
    // Finally in numbers
    
    take 함수로 개수를 2개로 제한했으므로 1,2를 방출하고 cancel 됩니다.
    cancel 시켜도 try catch문은 정상적으로 작동하므로 finally 블록이 실행 됩니다.

    Terminal flow operators

    Terminal 연산자는 collection을 시작시키는 suspend 함수입니다.

    collect는 가장 기본적인 연산자고 그 외에 다른 Terminal 연산자는 다음과 같습니다.

    • toList 또는 toSet은 flow를 MutableList 나 MutableSet으로 변환합니다.
    • first는 flow의 첫 번째 값만 방출 하고 나머지는 cancel 시킵니다.
    • reduce는 첫번째 값에 주어진 연산자를 이용하여 누적시켜 최종 값을 방출합니다.
    • fold는 초기값을 입력받아 주어진 연산자를 이용하여 누적시켜 최종값을 방출합니다.
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5                           
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
    
    // 55
    
    map을 사용하여 flow의 값이 {1, 4, 9, 16, 25}로 바뀌고 reduce를 사용하여
    ((((1 + 4) + 9) + 16) + 25)를 하여 55를 방출한다.

    Flows are sequential

    flow는 multiple flow로 동작하게 하는 연산자를 사용하지 않는 이상 순차적으로 동작합니다.

    일반적으로 terminal 연산자를 호출하는 코루틴에서 바로 수행되고, 새로운 코 루틴을 생성해서 사용하지 않습니다.

    각각 방출된 값은 모든 중간 연산자가 upstream에서 downstream으로 처리한 후 terminal 연산자에게 전달합니다.

    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0              
        }              
        .map { 
            println("Map $it")
            "string $it"
        }.collect { 
            println("Collect $it")
        }
    
    // Filter 1
    // Filter 2
    // Map 2
    // Collect string 2
    // Filter 3
    // Filter 4
    // Map 4
    // Collect string 4
    // Filter 5
    
    순차적으로 연산자를 처리하기 때문에
    filter를 수행하다가 true 조건이 발생하면 map으로 변환시키고 terminal 연산자인 
    collect를 통해 값을 출력하고 위와 같은 순서를 5가 나올때 까지 반복한다.

    Flow context

    flow Collection은 항상 호출하는 코루틴의 context에서 수행하는 것을 context preservation라고 합니다

    withContext(context) {
        simple().collect { value ->
            println(value) // run in the specified context
        }
    }
    fun simple(): Flow<Int> = flow {
        log("Started simple flow")
        for (i in 1..3) {
            emit(i)
        }
    }  
    
    fun main() = runBlocking<Unit> {
        simple().collect { value -> log("Collected $value") } 
    }
    
    // [main @coroutine#1] Started simple flow
    // [main @coroutine#1] Collected 1
    // [main @coroutine#1] Collected 2
    // [main @coroutine#1] Collected 3
    
    simple().collect가 main 스레드에서 호출되기 때문에 
    simple()의 내부 블록도 메인 스레드에서 호출된다.

    Wrong emission withContext

    long-running CPU-consuming 코드는 Dispatchers.Default의 context에서 실행해야 하고

    UI 업데이트는 Dispatchers.Main의 context에서 실행해야 합니다.

    일반적으로 withContext를 사용하여 코루틴 코드의 context를 변경하지만

    flow에서는 context preservation으로 인해 방출하는 context와 수신하는 context를 다르게 하지 못합니다.

    fun simple(): Flow<Int> = flow {
        // The WRONG way to change context for CPU-consuming code in flow builder
        kotlinx.coroutines.withContext(Dispatchers.Default) {
            for (i in 1..3) {
                Thread.sleep(100) // pretend we are computing it in CPU-consuming way
                emit(i) // emit next value
            }
        }
    }
    
    fun main() = runBlocking<Unit> {
        simple().collect { value -> println(value) } 
    }
    
    // Exception in thread "main" java.lang.IllegalStateException: 
    // Flow invariant is violated:Flow was collected in 
    // [CoroutineId(1), "coroutine#1":
    // BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
    // but emission happened in [CoroutineId(1), "coroutine#1":
    // DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
    // Please refer to 'flow' documentation or use 'flowOn' instead at ...
    
    simple().collect를 실행한 스레드는 main 스레드 이지만
    simple() 내부에서 withContext를 사용하여 Dispatchers.Default 스레드로 변경했기 때문에
    context 보존 법칙으로 인해 오류가 발생한다.

    flowOn operator

    withContext 대신 flowOn 연산자를 사용하여 방출하는 부분의 context를 변경해 줄 수 있습니다.

    fun simple(): Flow<Int> = flow {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            log("Emitting $i")
            emit(i) // emit next value
        }
    }.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
    
    fun main() = runBlocking<Unit> {
        simple().collect { value ->
            log("Collected $value") 
        } 
    }
    // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
    // [main @coroutine#1] Collected 1
    // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
    // [main @coroutine#1] Collected 2
    // [DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
    // [main @coroutine#1] Collected 3
    
    withContext를 썼을때와는 다르게 collect는 main 스레드에서,
    simple()은 Default에서 잘 동작하고 있는것을 볼 수 있다.

    flowOn은 기본적으로 하나의 코루틴이 방출과 수집을 순차적으로 처리하는 flow를 변경시킵니다.

    따라서 방출과 수집이 각각의 코 루틴에서 동시에 실행됩니다.

    flow를 사용할 때에는 기본적으로 새로운 코루틴을 만들어서 실행하지 않았지만

    위의 예제를 예로 들었을 때 coroutines1, coroutines2 이렇게 다른 코루틴에서 수행되는 것입니다.

    물론 스레드도 변경해주었기에 스레드도 각각 다른 곳에서 수행하고 있습니다.

    Buffering

    flow에서 방출하는 부분과 수집하는 부분 중 한쪽이나 양쪽 다 속도가 느리다면

    두 부분을 분리해서 처리하여 처리 시간을 향상할 수 있습니다.

    fun simple(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100) // pretend we are asynchronously waiting 100 ms
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> { 
        val time = measureTimeMillis {
            simple().collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
        }   
        println("Collected in $time ms")
    }
    
    숫자 하나를 처리하는데 400ms가 걸린다.

    buffer 연산자를 사용하여 순차적으로 실행하는 대신 pipeline을 만들어 방출과 수집을 동시에 처리할 수 있습니다.

    fun simple(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100) // pretend we are asynchronously waiting 100 ms
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> { 
        val time = measureTimeMillis {
            simple()
                .buffer() // buffer emissions, don't wait
                .collect { value -> 
                    delay(300) // pretend we are processing it for 300 ms
                    println(value) 
                } 
        }   
        println("Collected in $time ms")
    }
    
    방출하는 부분에 buffer를 만들어 동시에 동작하게 만들어서 처리 시간을 감소 시킨다.
    처음 데이터가 발생하는데 100ms 시간과 collect의 300ms * 3을 합쳐 1000ms 시간이 걸린다.

    flowOn 연산자도 CoroutineDispatcher를 변경할 때 buffer mechanism을 사용합니다.

    Conflation

    flow가 연산의 중간값이나 상태 값의 업데이트를 나타내는 경우 각 값을 처리할 필요 없이

    가장 최근 값만 처리할 수 있습니다.

    conflate 연산자는 collector가 너무 느리면 중간 값을 건너뛸 수 있습니다.

    fun simple(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100) // pretend we are asynchronously waiting 100 ms
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> { 
        val time = measureTimeMillis {
            simple()
                .conflate() // conflate emissions, don't process each one
                .collect { value -> 
                    delay(300) // pretend we are processing it for 300 ms
                    println(value) 
                } 
        }   
        println("Collected in $time ms")
    }
    
    // 1
    // 3
    // Collected in 746 ms
    
    1번이 처리되고 있는 동안 2번과 3번이 이미 생산되고 중간 값인 2번을 생략하고 
    가장 최근인 3번만 collector에 전달 됐다.

    Processing the latest value

    Conflation은 emitter, collector 둘 다 느릴 때 방출 값을 삭제하여 처리 속도를 높이는 방법입니다.

    느린 collector가 처리하기 전에 다른 값을 전달받으면 이런 collector를 cancel 하고 새로운 값을 처리하도록 재 시작하는 방법도 있습니다.

    collectLatest() 연산자를 사용하면 collector 동작중 새로운 값이 방출되어 전달 받으면 기존 collect()를 cancel하고
    새로운 값을 위한 collector를 재시작합니다.

    xxx로 시작하는 Latest() 연산자 들은 동일하게 기존 동작을 cancel 하고 최근 값을 처리합니다.

    fun simple(): Flow<Int> = flow {
        for (i in 1..3) {
            delay(100) // pretend we are asynchronously waiting 100 ms
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> { 
        val time = measureTimeMillis {
            simple()
                .collectLatest { value -> // cancel & restart on the latest value
                    println("Collecting $value") 
                    delay(300) // pretend we are processing it for 300 ms
                    println("Done $value") 
                } 
        }   
        println("Collected in $time ms")
    
    // Collecting 1
    // Collecting 2
    // Collecting 3
    // Done 3
    Collected in 741 ms
    
    100ms 마다 새로운 값을 보냈지만 마지막 값에 대해서만 완료된다.

    Composing multiple flows

    Zip

    두 개의 flow를 병합시켜 줍니다.

    원소의 개수가 다르다면 적은 원소의 개수에 맞춰 병합됩니다.

    앞쪽 원소부터 순서대로 병합합니다.

    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings 
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
    
    // 1 -> one
    // 2 -> two
    // 3 -> three
    {1, 2, 3} flow 와 {one, two, three} flow를 { 1 -> one } 형식의 하나의 string flow로 결합

    Combine

    flow가 conflation처럼 최신 값이나 최종 연산 값만 사용하는 형태라면,

    현재 flow에서 최신 값 기준으로 연산하는 작업을 수행합니다.

    두 flow가 서로 다른 타이밍에 방출될 때, 최신 값 기준으로 방출 값을 연산하는 것입니다.

    fun main() = runBlocking<Unit> { 
    
        val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
        val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
        val startTime = System.currentTimeMillis() // remember the start time 
        nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            } 
    }
    
    1 -> one at 426 ms from start
    2 -> two at 826 ms from start
    3 -> three at 1228 ms from start
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
    val startTime = System.currentTimeMillis() // remember the start time 
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print 
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
        }
    
    1 -> one at 433 ms from start 
    2 -> one at 636 ms from start
    2 -> two at 835 ms from start
    3 -> two at 937 ms from start
    3 -> three at 1236 ms from start
    
    300ms 1 방출
    400ms one 방출
    600ms 2 방출
    800ms two 방출
    900ms 3 방출
    1200ms three 방출
    
    첫번째 줄은 동일하게 출력된다.
    두번째 줄은 2가 방출됐을때 two는 800ms에 방출되므로 최신값은 아직 one이다
    따라서 2 -> one이 출력된다.
    800ms가 지났을때 드디어  3번째 줄인 2 -> two가 출력된다.
    900ms가 지나 3이 방출됐을때 아직 three가 방출되지 않았으므로
    4번째줄은 3 -> two이 출력된다.
    1200ms가 지났을때 5번째 줄인 3 -> three가 출력된다.

    Flattening flows

    flow는 비동기적으로 sequences 값을 전달받으므로 각 원소가 다시 다른 sequence를 요청하는 상황이 빈번합니다.

    예를 들어 Flow <Flow <String>>형태를 반환하는 상황입니다.

    fun requestFlow(i: Int): Flow<String> = flow {
        emit("$i: First")
        delay(500) // wait 500 ms
        emit("$i: Second")
    }
    // flowOf(1,2,3).map { requestFlow(it) } 동일
    (1..3).asFlow().map { requestFlow(it) } // Flow<Flow<String>>

    collection과 유사하지만 비동기적인 특성을 고려하기 위한 flatten 관련 연산자를 지원합니다.

    flatMapConcat

    flatMapConcat과 flattenConcat은 flow를 연결하는 연산자입니다.

    이 연산자들은 내부 flow가 완료되어야만 외부 collect를 수행합니다.

    fun requestFlow(i: Int): Flow<String> = flow {
        emit("$i: First") 
        delay(500) // wait 500 ms
        emit("$i: Second")    
    }
    
    fun main() = runBlocking<Unit> { 
        val startTime = System.currentTimeMillis() // remember the start time 
        (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
            .flatMapConcat { requestFlow(it) }                                                                           
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            } 
    // 1: First at 121 ms from start
    // 1: Second at 622 ms from start
    // 2: First at 727 ms from start
    // 2: Second at 1227 ms from start
    // 3: First at 1328 ms from start
    // 3: Second at 1829 ms from start
    
    (1..3) flow가 100ms 간격으로 방출된다.
    하지만 requestFlow(it)을 수행하므로 Flow<Flow<String>> 형태가 되지만
    flatMapConcat을 사용하여 Flow<String>으로 만들어준다.
    1의 flow가 실행되고 1의 내부인 requestFlow(1)이 다 실행된 다음
    2의 flow가 실행되는 형태를 띈다.

    flatMapMerge

    flatMapMerge 또는 flattenMerge는 동시에 방출 가능한 값들을 방출시키고 하나의 flow로 병합하여 수집합니다.

    매개변수로 concurrency를 받을 수 있습니다.

    concurrency는 기본 값이 DEFAULT_CONCURRENCY로 돼 있고 동시에 받을 수 있는 flow의 개수를 제한하는 값입니다.

    fun requestFlow(i: Int): Flow<String> = flow {
        emit("$i: First") 
        delay(500) // wait 500 ms
        emit("$i: Second")    
    }
    
    fun main() = runBlocking<Unit> { 
        val startTime = System.currentTimeMillis() // remember the start time 
        (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
            .flatMapMerge { requestFlow(it) }                                                                           
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            } 
    }
    // 1: First at 136 ms from start
    // 2: First at 231 ms from start
    // 3: First at 333 ms from start
    // 1: Second at 639 ms from start
    // 2: Second at 732 ms from start
    // 3: Second at 833 ms from start
    
    외부 flow와 내부 flow가 동시에 실행되면서 하나의 flow로 합친다.

    flatMapLatest

    collectLatest와 유사하게 방출이 되면 이전에 대기 중이거나 동작중인 flow를 cancel 시킵니다.

    fun requestFlow(i: Int): Flow<String> = flow {
        emit("$i: First") 
        delay(500) // wait 500 ms
        emit("$i: Second")    
    }
    
    fun main() = runBlocking<Unit> { 
        val startTime = System.currentTimeMillis() // remember the start time 
        (1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
            .flatMapLatest { requestFlow(it) }                                                                           
            .collect { value -> // collect and print 
                println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
            } 
    }
    
    // 1: First at 142 ms from start
    // 2: First at 322 ms from start
    // 3: First at 425 ms from start
    // 3: Second at 931 ms from start

    Flow exceptions

    flow collection은 emitter나 다른 operator에서 발생하는 예외로 완료 처리될 수 있습니다.

    Collector try and catch

    fun simple(): Flow<Int> = flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    
    fun main() = runBlocking<Unit> {
        try {
            simple().collect { value ->         
                println(value)
                check(value <= 1) { "Collected $value" }
            }
        } catch (e: Throwable) {
            println("Caught $e")
        } 
    }
    
    // Emitting 1
    // 1
    // Emitting 2
    // 2
    // Caught java.lang.IllegalStateException: Collected 2
    
    순서대로 처리하다가 value가 2일때 check(value <= 1)이 성립 안되므로 예외가 발생한다.

    Exception transparency

    Flow는 예외에 관하여 투명해야 하지만, try-catch문 내부에서 flow builder가 값을 방출하는 것은 Exception 투명성에
    어긋납니다. 왜냐하면 예외를 던지는 collector가 try/catch를 사용하여 항상 예외를 catch 할 수 있기 때문입니다.

    emitter는 예외 투명성을 보존하고 예외 처리의 캡슐화를 허용하는 catch 연산자를 사용할 수 있습니다.

    catch 연산자의 body는 예외를 분석하고 다음과 같은 다양한 방식으로 대응할 수 있습니다.

    • 예외를 throw를 사용하여 rethrown 할 수 있습니다.
    • catch의 body에서 emit 함수를 사용하여 값을 방출할 수 있습니다.
    • 무시하거나, 로그를 남기거나 다른 처리를 하는 코드를 삽입할 수 있습니다.
      fun simple(): Flow<String> = 
          flow {
              for (i in 1..3) {
                  println("Emitting $i")
                  emit(i) // emit next value
              }
          }
          .map { value ->
              check(value <= 1) { "Crashed on $value" }                 
              "string $value"
          }
      
      fun main() = runBlocking<Unit> {
          simple()
              .catch { e -> emit("Caught $e") } // emit on exception
              .collect { value -> println(value) }
      }
      // Emitting 1
      // string 1
      // Emitting 2
      // Caught java.lang.IllegalStateException: Crashed on 2
      
      방출하는 값이 2일때 1보다 크므로 check(value <= 1)에서 Crashed on 2가 되고
      그것을 catch하여 Caught Crashed on 2를 방출한 것을 출력한다.​

    더 이상 try/catch문이 없더라도 동일하게 작동합니다.

    Transparent catch

    예외 투명성을 지키는 catch 중간 연산자는 오직 upstream 예외만 catch 합니다.

    fun simple(): Flow<Int> = flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        simple()
            .catch { e -> println("Caught $e") } // does not catch downstream exceptions
            .collect { value ->
                check(value <= 1) { "Collected $value" }                 
                println(value) 
            }
    }
    // Emitting 1
    // 1
    // Emitting 2
    // Collected 2
    
    catch문이 collect 보다 위에 있으므로 catch보다 아래있는 downstream에서 예외가 발생해도
    catch 하지 않는다.

    Catching declaratively

    모든 예외를 catch로 다루기 위해서는 collect의 body를 onEach로 옮기고 그 이후에 catch를 연결하면 전체 구문의 외부에 try-catch를 하지 않아도 동일한 처리를 할 수 있습니다.

    fun simple(): Flow<Int> = flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i)
        }
    }
    
    fun main() = runBlocking<Unit> {
        simple()
            .onEach { value ->
                check(value <= 1) { "Collected $value" }                 
                println(value) 
            }
            .catch { e -> println("Caught $e") }
            .collect()
    }
    
    // Emitting 1
    // 1
    // Emitting 2
    // Caught java.lang.IllegalStateException: Collected 2

    Flow completion

    flow collection이 정상적으로나 예외적으로 완료되면 특정한 작업을 실행해야 할 수 있습니다.

    명령형 또는 선언형 두 가지 방법이 있습니다.

    Imperative finally block

    try-catch문 외에도 collector는 finally 블록을 사용하여 collect 완료 시 블록 내부의 작업을 실행할 수 있습니다.

    fun simple(): Flow<Int> = (1..3).asFlow()
    
    fun main() = runBlocking<Unit> {
        try {
            simple().collect { value -> println(value) }
        } finally {
            println("Done")
        }
    }
    // 1
    // 2
    // 3
    // Done

    Declarative handling

    선언형 방식으로는 flow가 종료됐을 때 onCompletion 중간 연산자를 호출하면 됩니다.

    fun simple(): Flow<Int> = (1..3).asFlow()
    
    fun main() = runBlocking<Unit> {
        simple()
            .onCompletion { println("Done") }
            .collect { value -> println(value) }
    }
    
    // 1
    // 2
    // 3
    // Done
    
    try-finally와 동일하게 출력되는 것을 볼 수 있다.

    onCompletion의 큰 장점은 람다식에서 nullable 한 Throwable을 param으로 넘겨주기 때문에 collect의 완료가
    정상적으로 되었는지 예외가 발생했는지 판단할 수 있습니다.

    fun simple(): Flow<Int> = flow {
        emit(1)
        throw RuntimeException()
    }
    
    fun main() = runBlocking<Unit> {
        simple()
            .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
            .catch { cause -> println("Caught exception") }
            .collect { value -> println(value) }
    }
    // 1
    // Flow completed exceptionally
    // Caught exception
    
    1을 방출 했을때는 정상적으로 동작했으므로
    Flow completed exceptionally을 출력했고
    그다음 예외를 던졌을때 cause가 null이 아니므로 onCompletion의 구문이 출력되지 않고
    예외를 아래 catch로 전달하여 catch구문에서 Caught exception을 출력한다.

    onCompletion은 예외를 다루지 않고 downstream으로 전달합니다.

    Successful completion

    catch 연산자와 또 다른 차이점은 onCompletion은 모든 예외를 확인하고 upstream flow가 cancell이나 failure 없이 성공적으로 완료한 경우에만 null 예외를 수신한다는 것입니다.

    fun simple(): Flow<Int> = (1..3).asFlow()
    
    fun main() = runBlocking<Unit> {
        simple()
            .onCompletion { cause -> println("Flow completed with $cause") }
            .collect { value ->
                check(value <= 1) { "Collected $value" }                 
                println(value) 
            }
    }
    
    // 1
    // Flow completed with Collected 2
    // Exception in thread "main" java.lang.IllegalStateException: Collected 2

    Imperative versus declarative

    명시적이든 암시적이든 어떤 방법이 더 좋다고 할 수 없습니다.

    따라서 본인의 코딩 스타일이다 기호에 따라 선택해서 사용하면 됩니다.

    Launching flow

    어떤 소스에서 전달되는 비동기적인 이벤트를 flow를 사용하여 쉽게 표현할 수 있습니다.

    수신하는 쪽에서 이벤트를 발생할 때마다 특정 동작을 처리하도록 하고 처리 후 다른 코드를 계속 진행되도록 하기 위해서 addEventListener 같은 기능이 필요합니다.

    onEach 연산자가 이러한 역할을 합니다.

    onEach 연산자는 중간 연산자 이므로 flow를 수집할 terminal 연산자도 필요합니다.

    terminal 연산자를 호출하지 않으면 onEach를 호출하는 것 만으로는 아무런 효과가 없습니다.

    // Imitate a flow of events
    fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
    
    fun main() = runBlocking<Unit> {
        events()
            .onEach { event -> println("Event: $event") }
            .collect() // <--- Collecting the flow waits
        println("Done")
    }
    
    // Event : 1
    // Event : 2
    // Event : 3
    // Done

    collect 함수가 호출되면 모든 작업이 끝날 때까지 대기합니다.

    이벤트 리스너를 등록하는 이유는 코드는 계속 진행되지만, 특정 이벤트가 발생하면 등록된 코드를 동작시키기 위함입니다.

    collect 대신에 launchin을 사용하면 분리된 코 루틴으로 시작하고 코드는 바로 실행되도록 할 수 있습니다.

    // Imitate a flow of events
    fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
    
    fun main() = runBlocking<Unit> {
        events()
            .onEach { event -> println("Event: $event") }
            .launchIn(this) // <--- Launching the flow in a separate coroutine
        println("Done")
    }
    
    // Done
    // Event : 1
    // Event : 2
    // Event : 3
    
    events()가 별도의 코루틴에서 실행되므로 100ms 딜레이로 인하여
    Done이 먼저 출력되고 순서대로 events가 출력된다.

    launchIn을 사용할 때에는 매개 변수로 coroutineScope를 명시적으로 넣어야 합니다.

    launchIn은 job을 리턴합니다.

    따라서 scope의 취소 없이 flow만을 취소할 수도 있고, join을 이용하여 완료될 때까지 대기시킬 수 도 있습니다.

    Flow cancellation checks

    편의를 위해서 flow builder는 방출된 값에 대한 cancel을 sureActive 검사를 수행합니다.

    flow {}에서 사용 중인 반복문을 사용하는 방출이 cancell 가능함을 의미합니다.

    fun foo(): Flow<Int> = flow { 
        for (i in 1..5) {
            println("Emitting $i") 
            emit(i) 
        }
    }
    
    fun main() = runBlocking<Unit> {
        foo().collect { value -> 
            if (value == 3) cancel()  
            println(value)
        } 
    }
    
    // Emitting 1
    // 1
    // Emitting 2
    // 2
    // Emitting 3
    // 3
    // Emitting 4
    // Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@6d7b4f4c
    
    값이 3일때 cancel 시켰으므로 4를 방출하려할때 예외가 발생한다.

    하지만 대부분 flow 연산자는 성능상의 이유로 자체적으로 additional cancellation check 하지 않습니다.

    fun main() = runBlocking<Unit> {
        (1..5).asFlow().cancellable().collect { value -> 
            if (value == 3) cancel()  
            println(value)
        } 
    }
    
    // 1
    // 2
    // 3
    // 예외

    cancellable()을 사용하면 cancel 된 것을 감지할 수 있습니다.

    반응형

    댓글

Designed by Tistory.