我正在使用Redis来传输数据.我有多个生产者实例生成相同的数据,旨在实现事件一致性.

现在,生产商生成的交易具有1到2之间的随机交易id.我想要一个重复数据消除服务或基于交易id的东西来分发重复数据.我该怎么做?

消费者

using System.Text.Json;
using Shared;
using StackExchange.Redis;

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var muxer = ConnectionMultiplexer.Connect("localhost:6379");
var db = muxer.GetDatabase();

const string streamName = "positions";
const string groupName = "avg";

if (!await db.KeyExistsAsync(streamName) ||
    (await db.StreamGroupInfoAsync(streamName)).All(x => x.Name != groupName))
{
    await db.StreamCreate消费者GroupAsync(streamName, groupName, "0-0");
}

var consumerGroupReadTask = Task.Run(async () =>
{
    var id = string.Empty;
    while (!token.IsCancellationRequested)
    {
        if (!string.IsNullOrEmpty(id))
        {
            await db.StreamAcknowledgeAsync(streamName, groupName, id);
            id = string.Empty;
        }

        var result = await db.StreamReadGroupAsync(streamName, groupName, "avg-1", ">", 1);
        if (result.Any())
        {
            id = result.First().Id;
            var dict = ParseResult(result.First());

            var trade = JsonSerializer.Deserialize<Trade>(dict["trade"]);

            Console.WriteLine($"Group read result: trade: {dict["trade"]}, time: {dict["time"]}");
        }

        await Task.Delay(1000);
    }
});

Console.ReadLine();

static Dictionary<string, string> ParseResult(StreamEntry entry)
{
    return entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
}

制作人

using System.Text.Json;
using Shared;
using StackExchange.Redis;

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var muxer = ConnectionMultiplexer.Connect("localhost:6379");
var db = muxer.GetDatabase();

const string streamName = "positions";

var producerTask = Task.Run(async () =>
{
    var random = new Random();
    while (!token.IsCancellationRequested)
    {
        var trade = new Trade(random.Next(1, 3), "btcusdt", 25000, 2);

        var entry = new List<NameValueEntry>
        {
            new("trade", JsonSerializer.Serialize(trade)),
            new("time", DateTimeOffset.Now.ToUnixTimeSeconds())
        };

        await db.StreamAddAsync(streamName, entry.ToArray());

        await Task.Delay(2000);
    }
});

Console.ReadLine();

推荐答案

根据需要的分发级别和处理流中传入的丢失消息的程度,您可以在这里使用两种策略.以下是使用Redis的两个可行解决方案:

当您可以容忍事件中1%的未命中率时,请使用Bloom过滤器

您可以在Redis中使用BloomFilter,这将是一种非常紧凑、非常快速的方法来确定特定记录是否尚未被记录.如果您运行:

var hasBeenAdded = ((int)await db.ExecuteAsync("BF.ADD", "bf:trades",dict["trade"])) == 1;

如果hasBeenAdded为真,则可以肯定地说记录不是重复记录,如果为假,则概率取决于如何将bloom过滤器设置为BF.RESERVE

如果你想使用Bloom过滤器,你需要在Redis实例中加载RedisBloom,或者只需使用Redis Stack

当未命中不可接受时,使用排序集

如果你的应用程序不能容忍遗漏,你可能更明智地使用集合或排序集,一般来说,我建议你使用集合,因为它们更容易清理.

基本上,如果你使用的是一个排序集,你会用ZSCORE zset:trades trade-id判断一条记录是否已经记录在你的平均值中,如果分数回来,你知道这些记录已经被使用了,否则你可以将其添加到排序集.重要的是,由于排序集呈线性增长,因此您需要定期清理它,因此如果您将消息id的时间戳设置为分数,则可能可以确定一些可行的时间间隔以返回并执行ZREMRANGEBYSCORE以清除旧记录.

Csharp相关问答推荐

为什么这个Reflection. Emit代码会导致一个DDL ViolationException?

"virtual"修饰符对接口成员有什么影响?

. NET Core DB vs JSON模型设计

为什么Blazor值在更改后没有立即呈现?

如何在NodaTime中为Instant添加一年?

C#方法从AJAX调用接收NULL

如何忽略API JSON响应中的空字符串?

System.Net.Http.HttpClient.SendAsync(request)在docker容器内的POST方法30秒后停止

尽管保证密钥不同,但已添加相同密钥的项(&Q;)

如何在毛伊岛应用程序中完美地同步视图模型和视图的加载?

BlockingCollection T引发意外InvalidOperationException

如何在C#中创建VS代码中的控制台应用程序时自动生成Main方法

Selify只更改第一个下拉菜单,然后忽略REST-C#

是否有必要在ASP.NET Core中注册可传递依赖项?

基于C#和ANGING的SignalR实时聊天流媒体应用

MudBlazor Textfield已禁用,但其验证工作正常

用于ASP.NET核心的最小扩展坞

C#Microsoft.CodeAnalysis.CSharp.Scriiting不等待并行.对于

根据运行时值获取泛型类型的字典

在SQL中删除少于24小时的令牌