본문 바로가기

Android/이론

[안드로이드] Cold flows, Hot channels(번역)

728x90
반응형

 

Cold flows, Hot channels

 

Cold flows, hot channels

Kotlin coroutines were missing a primitive to represent cold asynchronous streams of data. Not anymore. Welcome Kotlin Flows.

elizarov.medium.com

 

비동기식, 장기 또는 원격 작업들은 future type을 통해 표현될 수 있으므로, 값을 반환하는 함수는 다음과 같이 구현될 수 있습니다.

fun fooAsync(p: Params): CompletableFuture<Value> = 
    CompletableFuture.supplyAsync { bar(p) } 

 

fooAsync(p)를 호출하면 미래에 값을 제공하겠다는 약속을 가지게 되고, 이 값을 계산하기 위해 backgraond에서 bar operation 이 실행되게 됩니다. 이제 이 future에 대한 참조를 잃지 않기위해 주의를 기울여야합니다. 왜냐하면 이 future는, 사실상 열린 파일과 같은 자원이기 때문입니다. 이 값이 더이상 필요하지 않은 경우 반드시 기다리거나, 취소해야합니다.

 

이를 hot data source라고 합니다. 호출 중에만 활성화되는 일반 함수와는 달리 , hot source는 해당 함수에 대한 호출 외부에서도 활성 상태이고, 함수를 호출하기 전에도 백그라운드에서 활성 상태였을 수 있으며, 여기서 보는 것 처럼 함수를 호출한 후에도 활성 상태일 수 있습니다.

 

Suspending functions

코틀린 프로그래밍 언어는 suspending functions을 제공합니다.

코틀린에서 이러한 비동기적인 작업을 나타내고 hot future 프로그래밍의 모든 위험들을 피하기 위한 관용적인 방법은 :

suspend fun foo(p: Params): Value =
    withContext(Dispatchers.Default) { bar(p) }

foo 호출자는 bar operation이 진행되는 동안 중단(연기)됩니다. background 작업에 대한 참조가 실수로 손실될 염려가 없으며 suspending function을 사용하여 작성된 코드가 일반, blocking, 동기식 코드처럼 친숙해 보입니다. 이 foo 함수 정의는 cold입니다. 이것은 호출되기 전에는 아무 작업도 하지 않으며 반환한 후에는 아무 작업도 수행하지 않습니다.

 

Collection of values

만약 operation이 변수들의 collection을 반환하면 어떨까요? 우리는 List type을 사용할수 있습니다 :

suspend fun foo(p: Params): List<Value> =
    buildList² { while (hasMore) add(nextValue) }

² : buildList는 실제 코틀린의 function은 아니지만, 제안되었다: KT-15363

이 signature는 변수의 JSON array나 유사한 종류의 RPC/RMI endpoint를 반환하는 REST endpoint의 API를 완벽하게 캡쳐합니다. 작업이 진행되는 동안 호출자를 중단하고 전체목록을 한번에 반환합니다.

 

Stream of values

그러나 streaming API를 나타내려면 어떨까요? 즉, 변수가 하나씩 도착할 때, 예를 들면 websocket 메세지나, GRPC메세지 스트림이나, RSocket과 같은 스트리밍 프로토콜과 같은 경우가 있습니다.

동기식 스트리밍을 위해 코틀린은 Sequence data type을 제공합니다:

fun foo(p: Params): Sequence<Value> =
    sequence { while (hasMore) yield(nextValue) }

그러나, 만약 스트리밍 API를 나타내기 위해 sequence를 리턴 타입으로 사용한다면, 들어오는 값을 기다리는 동안 호출자의 스레드를 차단해야 합니다. 이것은 UI 앱에 좋지 않고 확장 가능한 서버측 코드에 좋지 않습니다. 비동기 프로그래밍의 경우  우리는 코루틴을 대신 중단하길 원합니다.

 

Hot channels

우리는 동기식 stream을 나타내기 위해 kotlinx.coroutines 라이브러리의 ReceiveChannel type을 사용할 수 있습니다:

fun fooProducer(p: Params): ReceiveChannel<Value> =
    GlobalScope.produce { while (hasMore) send(nextValue) }

그러나, 우리는 future와 같은 문제에 봉착합니다. channel은 값의 hot stream을 나타냅니다. channel의 다른편에는 값을 생산하기 위해 일하는 코루틴이 있기 때문에, ReceiveChannel에 대한 참조를 분리할수 없습니다. 왜냐하면 생산자는 소비자를 기다리며, 메모리 자원을 낭비, 네트워크 연결 개방등을 하며 영원히 중단될것이기 때문입니다.

 

구조적 동시성은 문제를 다소 완화시킵니다. fooProducer가 코드의 나머지 부분과 동시에 작동하는 코루틴을 실행하는것을 관찰합시다. 우리는 CoroutineScope extension으로서 fooProducer function을 선언함으로써 이러한 동시실행을 명시적으로 만들 수 있습니다.

fun CoroutineScope.fooProducer(p: Params): ReceiveChannel<Value> =
    produce { while (hasMore) send(nextValue) }

그러나, 이것은 문제를 완벽히 해결하지는 않습니다. 단지 우리의 버그의 영향을 바꿀 뿐입니다. 구조화된 동시실행이 없이는 lost channels는 lost futures와 같습니다. 그들은 조용히 리소스 누출을 발생시킵니다.

구조화된 동시실행과 함께 하면 lost channels는 외부 코루틴 스코프의 완성를 방지하고, 진행중인 작업을 효과적으로 "hanging"합니다. 후자는 시험중에 더 분명히 알 수 있는 효과이지만, 어쨌든 나쁩니다. 우리는 여전히 다음과 같은것을 쓸수 없습니다:

val values: ReceiveChannel<Value> = fooProducer(p)
if (someCondition) return anotherResult // Oops! Leaked values
// ... do further work with values ...

대체로 channel과의 작업은 suspending 함수를 사용한 하나의 변수와 작업하거나 동기식 값들의 Sequence와 작업하는 것처럼 간단하지 않으며, 동시실행으로 인한 미묘한 문제와 관습이 수반됩니다.

 

channel은 본질적으로 hot한 데이터 소스와 애플리케이션의 요청 없이 존재하는 데이터소스(수신 네트워크 연결, 이벤트 스트림 등)를 모델링하는데 매우 적합합니다.

 

channel은 future와 마찬가지로 동기화 기본 요소 입니다. 한 코루틴에서 같거나 다른 프로세스에 있는 다른 코루틴으로 데이터를 전송해야할 때 채널을 사용해야 합니다. 왜냐하면 다른 코루틴들이 공존하고 동시성이 있는 데이터와 작업하려면 동기화가 필요하기 때문입니다.

그러나, 동기화는 항상 성능 비용을 부담합니다.

 

cold flows

그러나 동시실행과 동기화가 필요하지 않은, non-blocking data stream이 필요하다면 어떨까요?

우리는 지금까지 그런 type이 없었으므로, kotlinx.coroutines version 1.2.0-alpha-2에서 이용가능하게 된 코틀린 Flow type을 환영합니다 :

fun foo(p: Params): Flow<Value> =
    flow { while (hasMore) emit(nextValue) }

sequence처럼, flow는 값의 cold stream을 나타냅니다. foo 호출자는 flow instance에 대한 참조를 갖지만, flow{...} 안에 있는 코드는 map, filter등과 같은 다양한 operator를 사용하여 변형될 수 있습니다.

sequence와 달리, flow는 비동기이고 builder와 operator 내부의 모든 곳에서 suspending function을 허용합니다.

예를들어, 다음의 코드는 10개의 정수 앞에 각각 100ms의 delay를 가지는 flow를 정의합니다:

val ints: Flow<Int> = flow { 
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

flow의 Terminal operator는 해당하는 작동기간의 flow code만 활성화하여, flow에 의해 방출된 모든 값을 수집합니다. 이것이 flow를 cold로 만듭니다.

terminal operation 호출 전에는 활성되지 않고, 이후에도 활성되지 않으며, 호출에서 돌아오기 전에 모든 리소스를 해제합니다.

가장 기본적인 terminal operation을 collect라고 합니다. 이것은 flow가 collect되는 동안 호출 코루틴을 일시 중단하는 suspending function입니다 :

ints.collect { println(it) } // takes 1 second, prints 10 ints

 

Conclusion

Flow가 preview상태이므로 귀하의 피드백을 환영합니다. 우리는 여전히 API와 구현을 조정할 수 있습니다.

channel과 다르게, flow는 본질적으로 어떤 동시수행을 수반하지 않습니다. non-blocking이지만, 순차적입니다.

flow의 목표는 비동기 데이터 스트림을 위한 편리하고 안전하며, 배우기 쉽고 사용하기 쉬운 비동기 operation을 대표하는 suspending function이 되는 것입니다.

아직 코드에 성능 최적화가 적용되지 않았기 때문에 flow의 성능을 논하기는 이르지만, 동기화 기본요소의 성능을 상당히 초과할 것을 약속합니다. 향후 업데이트에서 이러한 측면을 살펴보겠습니다.

 

 

728x90
반응형