我正在使用Tokio tokio = { version = "1.17.0", features = ["full"] } unbounded_channel通道发送一些SSE消息,因为我需要从不同的位置发送消息,所以我定义了如下的Send Message函数(这是显示错误的最小再现演示):

use std::sync::{Arc, Mutex};
use tokio::{
    sync::mpsc::{UnboundedReceiver, UnboundedSender},
    task,
};

#[tokio::main]
async fn main() {
    let (tx, rx): (UnboundedSender<String>, UnboundedReceiver<String>) =
        tokio::sync::mpsc::unbounded_channel();
    task::spawn_blocking(move || {
        let shared_tx = Arc::new(Mutex::new(tx));
        shared_tx.lock().unwrap().send("l".to_string());
    });
    tx.send("d".to_string());
}

编译器显示错误:

   > cargo build
   Compiling rust-learn v0.1.0 (/Users/xiaoqiangjiang/source/reddwarf/backend/rust-learn)
warning: unused variable: `rx`
 --> src/main.rs:9:14
  |
9 |     let (tx, rx): (UnboundedSender<String>, UnboundedReceiver<String>) =
  |              ^^ help: if this is intentional, prefix it with an underscore: `_rx`
  |
  = note: `#[warn(unused_variables)]` on by default

error[E0382]: borrow of moved value: `tx`
  --> src/main.rs:15:5
   |
9  |     let (tx, rx): (UnboundedSender<String>, UnboundedReceiver<String>) =
   |          -- move occurs because `tx` has type `UnboundedSender<String>`, which does not implement the `Copy` trait
10 |         tokio::sync::mpsc::unbounded_channel();
11 |     task::spawn_blocking(move || {
   |                          ------- value moved into closure here
12 |         let shared_tx = Arc::new(Mutex::new(tx));
   |                                             -- variable moved due to use in closure
...
15 |     tx.send("d".to_string());
   |     ^^^^^^^^^^^^^^^^^^^^^^^^ value borrowed here after move

For more information about this error, try `rustc --explain E0382`.
warning: `rust-learn` (bin "rust-learn") generated 1 warning
error: could not compile `rust-learn` due to previous error; 1 warning emitted

我已经try 为shared_tx添加.as_ref(),但仍然没有解决这个问题.我应该做些什么来解决这个问题?这就是Cargo .

[package]
name = "rust-learn"
version = "0.1.0"
edition = "2018"

[dependencies]
tokio = { version = "1.17.0", features = ["full"] }
serde = { version = "1.0.64", features = ["derive"] }
serde_json = "1.0.64"

推荐答案

MPSC代表Multiple Producers,单一消费者.你不需要把发件人包装成Arc<Mutex>个,你只需要clone()个即可.

#[tokio::main]
async fn main() {
    let (tx, rx): (UnboundedSender<String>, UnboundedReceiver<String>) =
        tokio::sync::mpsc::unbounded_channel();
    task::spawn_blocking({
        let tx = tx.clone();
        move || {
            let shared_tx = Arc::new(Mutex::new(tx));
            shared_tx.lock().unwrap().send("l".to_string()).unwrap();
        }
    });
    tx.send("d".to_string()).unwrap();
}

Rust相关问答推荐

计算具有相邻调换且没有插入或删除的序列的距离

使用 struct 外部的属性来改变 struct 的原始方式

如何在 struct 的自定义序列化程序中使用serde序列化_WITH

如何为 struct 字段设置新值并在Ruust中的可变方法中返回旧值

有没有可能让泛型Rust T总是堆分配的?

如何防止Cargo 单据和Cargo 出口发布( crate )项目

如何获取光标下的像素 colored颜色 ?

将特征与具有生命周期的关联类型一起使用时的生命周期方差问题

如何重命名 clap_derive 中的子命令占位符?

`UnsafeCell` 在没有锁定的情况下跨线程共享 - 这可能会导致 UB,对吗?

需要一个有序向量来进行 struct 初始化

为什么需要同时为值和引用实现`From`?方法不应该自动解引用或borrow 吗?(2023-06-16)

一个函数调用会产生双重borrow 错误,而另一个则不会

使用自定义 struct 收集 Vec

产生拥有值的迭代器的 Rust 可变borrow 在循环中失败

更好的方法来模式匹配带有 Rust 的窥视孔装配说明窗口?

在 RefCell 上borrow

没有通用参数的通用返回

如何在 Rust 的内置函数上实现特征?

为什么我返回的 impl Trait 的生命周期限制在其输入的生命周期内?