我有一些任务需要连同三个步骤一起执行,第一步和第三步必须顺序运行,只有第二步同时运行,给出了Kotlin Channel遵循先进先出的顺序,我不能总是让Step 3顺序运行,有没有任何实现可以满足我的目的,或者另一种数据 struct ?

以下是我的演示代码:

import android.util.Log
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlin.random.Random

class SortingReceiveChannelTest(
    private val coroutineScope: CoroutineScope
) {
    private val tasks = List(30) { Task(it) }

    private val randomDelay = Random(Int.MAX_VALUE)
    private val step2Channel = Channel<Task>(2)
    private val step3Channel = Channel<Task>()

    fun start() {
        coroutineScope.launch(Dispatchers.IO) {
            for (task in tasks) {
                task.step1()
                step2Channel.send(task)
            }
        }

        coroutineScope.launch(Dispatchers.IO) {
            repeat(tasks.size) {
                val task = step2Channel.receive()
                coroutineScope.launch(Dispatchers.IO) {
                    task.step2()
                    step3Channel.send(task)
                }
            }
        }

        coroutineScope.launch(Dispatchers.IO) {
            repeat(tasks.size) {
                val task = step3Channel.receive()
                task.step3()
            }
        }
    }

    inner class Task(val index: Int) {
        fun step1() {
            Log.i(TAG, "execute step1[$index] sequential")
            // sleep to simulating a long run task
            Thread.sleep(randomDelay.nextLong(100, 200))
//            Log.i(TAG, "step1[$index] executed")
        }

        fun step2() {
            Log.i(TAG, "execute step2[$index] concurrently")
            if (index == 2 || index == 5 || index == 8 || index == 16) {
                // make specific tasks run longer to mess the step3's order
                Thread.sleep(randomDelay.nextLong(2000, 5000))
            } else {
                Thread.sleep(randomDelay.nextLong(100, 300))
            }
//            Log.i(TAG, "step2[$index] executed")
        }

        fun step3() {
            Log.i(TAG, "execute step3[$index] sequential")
            Thread.sleep(randomDelay.nextLong(100, 300))
//            Log.i(TAG, "step3[$index] executed")
        }
    }

    companion object {
        private const val TAG = "SortChannel"
    }
}

推荐答案

如果我理解正确的话:

  • 步骤1、2和3必须按顺序进行
  • 对于任务列表中的每个项,步骤1和步骤3必须按照它们在源列表中的相同顺序进行处理.

要做到这一点,一种方法是取消中间通道,而将次要作业(job)和任务一起发送到最终通道:

class SortingReceiveChannelTest(
    private val coroutineScope: CoroutineScope
) {
    private val tasks = List(30) { Task(it) }

    private val randomDelay = Random(Int.MAX_VALUE)
    private val step3Channel = Channel<Pair<Job, Task>>(capacity = tasks.size)

    fun start() {
        coroutineScope.launch(Dispatchers.IO) {
            for (task in tasks) {
                task.step1()
                val step2Job = coroutineScope.launch { task.step2() }
                step3Channel.send(step2Job to task)
            }
        }

        coroutineScope.launch(Dispatchers.IO) {
            repeat(tasks.size) {
                val (step2Job, task) = step3Channel.receive()
                step2Job.join()
                task.step3()
            }
        }
    }

    //...
}

Android相关问答推荐

如何允许我的应用程序在Android 10上运行,同时目标是API 33

当我在Android上运行应用程序时,组件会随机调整大小和移动

在Jetpack Compose中,material 3 Textfield上的底部边框 colored颜色 是如何更改的?

每次重启Android时预填入Room数据库

穿戴与iPhone连接的安卓操作系统

Android 11:在try 获取文件的永久权限后,仍然没有读写权限

如何在使用带有底部导航组件的片段管理器时更改片段工具栏的标签

Kotlin为多个控件设置一个侦听器

了解 CoroutineScope(Job() + Dispatchers.Main) 语法

面向Jetpack Compose的可组合放置问题

如何仅同步 local_manifest.xml?

判断 AAR 元数据时发现 Android 问题:androidx.core:core:1.12.0-alpha01 和 androidx.core:core-ktx:1.12.0-alpha01

CoroutineScope 与挂起函数

如何使用 Jetpack Compose 制作两个圆圈

单击过go 的文章时 NewsApp 崩溃

Android Transitions API - 在 24-48 小时后停止接收任何更新

状态值更改时屏幕未重新组合 - Jetpack Compose

我不能在 jetpack Compose 中使用 TextField()(material 3)

compose FontFamily 错误:必须初始化变量

compose :为什么以记住启动的列表触发方式与快照不同