I'm struggling to create a 'takeUntilSignal' operator for a Flow - an extension method that will cancel a flow when another flow generates an output.

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T>

我最初的努力是try 在与主流收集相同的协同路由范围内启动信号流收集,并取消协同路由范围:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
    kotlinx.coroutines.withContext(coroutineContext) {
        launch {
            signal.take(1).collect()
            println("signalled")
            cancel()
        }
        collect {
            emit(it)
        }
    }
}

But this isn't working (and uses the forbidden "withContext" method that is expressly stubbed out by Flow to prevent usage).

edit I've kludged together the following abomination, which doesn't quite fit the definition (resulting flow will only cancel after first emission from primary flow), and I get the feeling there's a far better way out there:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
    combine(
        signal.map { it as Any? }.onStart { emit(null) }
    ) { x, y -> x to y }
        .takeWhile { it.second == null }
        .map { it.first }

edit2

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
    channelFlow {
        launch {
            signal.take(1).collect()
            println("hello!")
            close()
        }
        collect { send(it) }
        close()
    }

推荐答案

Use couroutineScope and start the new coroutine inside:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
    try {
        coroutineScope {
            launch {
                signal.take(1).collect()
                println("signalled")
                this@coroutineScope.cancel()
            }

            collect {
                emit(it)
            }
        }

    } catch (e: CancellationException) {
        //ignore
    }
}

Kotlin相关问答推荐

Groovy Gradle文件的Kotlin类似功能

Kotlin编译器如何决定是否可以在任何给定点调用Suspend方法?

在 Kotlin 中将两个字节转换为 UIn16

将 java Optional 转换为 Kotlin Arrow Option

区分函数和扩展

Kotlin Path.useLines { } - 如何不获取 IOException("Stream closed")?

Kotlin:使用另一个列表和字母顺序对列表进行排序的有效方法

如何为 Kotlin 中的每个循环设置以避免越界异常

从 HashMap 检索时的 NPE,即使 containsKey() 在多线程环境中返回 true

Mixin 在 Jackson 中添加 defaultImpl 不起作用

interface扩展

IntentService (kotlin) 的默认构造函数

Kotlinwhen表达式在使用主题时是否支持复合布尔表达式?

Kotlin - mutableMapOf() 会保留我输入的顺序

Kotlin DataBinding 将静态函数传递到布局 xml

作为 Kotlin 中的函数的结果,如何从 Firestore 数据库返回列表?

哪里可以找到aapt2日志(log)?

面临一些未知问题一些后端jvm内部错误

任何处理器都无法识别以下选项:'[kapt.kotlin.generated, room.incremental]'

kotlin中密封类和密封接口的区别是什么