当使用channel时,假设在给定时刻,给定接收方的队列中有以下消息:

abc
blah
abc
something
something
blah
something

通常,消息的处理方式如下:

let (sender, receiver) = mpsc::channel::<Msg>();
for msg in receiver {
   // Process a message...
}

对已经堆积的邮件进行重复数据删除最简单的方法是什么?

在上面的例子中,我只想处理3条(而不是7条)消息:

abc
blah
something

注意:在处理完一条消息abc之后,以后可以再次接收(并处理)另一条消息abc.其目标是仅对已在队列中堆积的邮件进行重复数据消除.

推荐答案

解决方案1

以下是一个解决方案示例:

loop {
    let deduped_msgs : HashSet<Msg> = HashSet::from_iter(receiver.try_iter());
    for msg in deduped_msgs {
        // Process a message...                
    }
    thread::sleep(time::Duration::from_millis(10));
}

上面使用的是非阻塞的try_iter方法.我添加了一个短sleep(),以避免对CPU造成过大的压力.

解决方案2

根据Chayim的建议,这里有一个类似的解决方案,它避免了每X毫秒进行一次定期判断的需要:

let mut deduped_msgs = HashSet::new();
loop {
    deduped_msgs.insert(receiver.recv().unwrap());
    deduped_msgs.extend(receiver.try_iter());               
    for msg in deduped_msgs.drain() {
        // Process a message... 
    }
}

处理顺序

如果处理消息的顺序很重要,那么HashSet(单独)可能不是最佳 Select .

  • 要按照消息到达的顺序处理消息(只考虑每条消息第一次出现的情况),例如可以使用IndexSet.
  • 在某些情况下,消息的自然顺序可能更有意义,然后BTreeSet可能是最简单的 Select .
  • 要应用某种自定义优先级顺序,可以为 例如,使用一个向量和sort it个VIA sort_unstable_by 或 一百零二 (首先用HashSet对他们进行重复数据删除可能更简单,但另一个 选项可能是 dedup 已经排序的向量).

Rust相关问答推荐

有没有方法处理rust中嵌套的ok_or()?

收集RangeInclusive T到Vec T<><>

关联类型(类型参数)命名约定

使用模块中的所有模块,但不包括特定模块

什么时候铁 rust FFI边界上的panic 是未定义的行为?

什么时候和为什么S最好是按值或引用传递简单类型

对于已经被认为是未定义行为的相同数据,纯粹存在`&;[u32]`和`&;mut[u32]`吗?

如何将映射反序列化为具有与键匹配的字段的定制 struct 的向量?

根据填充系数以相对大小在给定空间中布局项目

解析程序无法在Cargo 发布中 Select 依赖版本

为什么 tokio 在以奇怪的方式调用时只运行 n 个任务中的 n-1 个?

RUST 中的读写器锁定模式

Rust 中的复合 `HashSet` 操作或如何在 Rust 中获得 `HashSet` 的显式差异/并集

仅发布工作区的二进制 crate

bcrypt 有长度限制吗?

Rust/Serde/HTTP:序列化`Option`

BigUint 二进制补码

为什么 Bevy 的 Trait 边界不满足 Rapier 物理插件?

来自外部函数的future 内部可变引用

函数参数的 Rust 功能标志