I'm struggling to create a 'takeUntilSignal' operator for a Flow: an extension method that cancels a flow when another flow emits a value.
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T>
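My initial effort was to try to launch the signal flow collection in the same coroutine scope as the primary flow collection, and cancel the coroutine scope: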
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
    kotlinx.coroutines.withContext(coroutineContext) {
        launch {
            signal.take(1).collect()
            println("signalled")
            cancel()
        }
        collect {
            emit(it)
        }
    }
}
But this isn't working (and uses the forbidden "withContext" method that is expressly stubbed out by Flow to prevent usage).
Edit: I've kludged together the following abomination, which doesn't quite fit the definition (the resulting flow will only cancel after the first emission from the primary flow), and I get the feeling there's a far better way out there:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
    combine(
        signal.map { it as Any? }.onStart { emit(null) }
    ) { x, y -> x to y }
        .takeWhile { it.second == null }
        .map { it.first }
Edit 2: another attempt, using channelFlow:
fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
    channelFlow {
        launch {
            signal.take(1).collect()
            println("hello!")
            close()
        }
        collect { send(it) }
        close()
    }
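One possible direction, offered as a minimal sketch rather than a definitive answer: keep channelFlow, but collect the primary flow in its own child job and cancel that job when the signal arrives, instead of closing the channel while the collection is still running. The names `upstream` and `watcher` are illustrative. The sketch assumes that a signal flow which completes without emitting should not stop the primary flow, and that values already buffered in the channel may still be delivered after the signal.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = channelFlow {
    // Collect the primary flow in a separate child job so it can be
    // cancelled without failing the whole channelFlow block.
    val upstream = launch {
        this@takeUntilSignal.collect { send(it) }
    }

    // Wait for the first signal value; only cancel the upstream job if the
    // signal actually emitted (firstOrNull returns null for an empty flow).
    val watcher = launch {
        if (signal.firstOrNull() != null) {
            upstream.cancel()
        }
    }

    // join() returns normally whether upstream completed or was cancelled.
    upstream.join()
    // If the primary flow finished first, stop waiting for the signal.
    watcher.cancel()
}
```

With this shape, something like `ticks.takeUntilSignal(stop).collect { println(it) }` (where `ticks` and `stop` are hypothetical flows) should complete normally once `stop` emits, even if `ticks` never emits again. Because cancelling a child job does not cancel its parent, no CancellationException needs to be caught.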