Flow
는 요청이 안쪽 방향으로 흐르고, 요청에 의해 성된 값이 다른 방향으로 흐르는 파이프라 생각할 수 있습니다. 모든 정보가 Flow
로 전달되므로 값, 예외 및 다른 특정 이벤트를 감지할 수 있습니다.
Flow
의 값을 하나씩 받기 위해 onEach 함수를 사용합니다.
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
action(value)
return@transform emit(value)
}
원소는 순서대로 처리됩니다.
fun main() = runBlocking {
flowOf(1, 2, 3)
.onEach { println("onEach: $it") }
.collect { println("collect: $it") }
}
// onEach: 1
// collect: 1
// onEach: 2
// collect: 2
// onEach: 3
// collect: 3
onStart 함수는 Flow
가 시작되는 경우, 즉 첫 번째 원소를 요청했을 때 호출되는 함수입니다.
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
try {
safeCollector.action()
} finally {
safeCollector.releaseIntercepted()
}
collect(this) // directly delegate
}
onCompletion 메서드는 Flow
가 완료되었을 때, 호출되는 함수입니다.
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
try {
collect(this)
} catch (e: Throwable) {
/*
* Use throwing collector to prevent any emissions from the
* completion sequence when downstream has failed, otherwise it may
* lead to a non-sequential behaviour impossible with `finally`
*/
ThrowingCollector(e).invokeSafely(action, e)
throw e
}
// Normal completion
val sc = SafeCollector(this, currentCoroutineContext())
try {
sc.action(null)
} finally {
sc.releaseIntercepted()
}
}
<aside> 💡
onCompletion을 사용할 때, 주의해야할 것은 Hot Flow에서 사용할 경우에 Flow가 완료되지 않기에 영원히 호출되지 않을 수 있다는 점입니다.
</aside>
onEmpty는 Flow
의 요소가 하나도 방출되지 않았을 때, 주어진 작업을 실행하는 연산자입니다. Flow
는 예기치 않은 이벤트가 발생하면 값을 하나도 내보내기 전에 완료될 수 있기에 이런 경우에 사용될 수 있습니다.