我正在实现一个通知系统,其中我有一个具有以下架构的notifications集合:

{
    user_id: Number,
    message: String,
    count: Number,
    timestamp: Date
}

每当同一用户有2个或更多相同message的连续通知时,我想要防止存储所有这些通知.相反,我只想将count加1.

以下是使用上述方法处理插入新通知的node.js代码:

async function createNotification(user_id, message) {
    const notifications = db.collection('notifications');
    const [lastNotification] = await notifications.find({ user_id })
        .sort({ timestamp: -1 }).limit(1).toArray();

    if(lastNotifcation?.message === message) {
        await notifications.updateOne(
            { _id: lastNotifcation._id },
            { count: lastNotifcation.count+1 },
        )
    } else {
        await notifications.insertOne({ user_id, message, count: 1, timestamp: new Date() });
    }
}

虽然上面的代码可以工作,但它不是原子的(它不是单个操作). 我如何用一次操作而不是两次操作来实现上述逻辑?

推荐答案

虽然我认为将您的现有工作包装在transactions中应该是涉及最少更改的选项,但您可以 Select 为所有工作利用聚合管道.

  1. $match以判断现有数据
  2. $sort+$limit: 1获得最后一项记录
  3. $set+$add以递增计数器
  4. $setUnion与您的默认数据"插入模板"
  5. $setWindowFields to compute $rank for the user_id partition
    • sortBy timestamp到计算机rank
    • 如果有来自步骤2,3的现有数据,它将具有等级:1,因为它的时间戳将小于$$NOW当前时间戳"插入模板"
    • 如果不存在现有数据,则"插入模板"的排名为:1
  6. $match用于仅提取rank: 1数据,$unset用于清理rank字段
  7. $merge以插入回集合中
db.notifications.aggregate([
  {
    "$match": {
      // your input user_id and message to upsert here
      "user_id": 1,
      "message": "update case - count should be incremented"
    }
  },
  {
    "$sort": {
      "timestamp": -1
    }
  },
  {
    "$limit": 1
  },
  {
    "$set": {
      "count": {
        "$add": [
          "$count",
          1
        ]
      }
    }
  },
  {
    "$unionWith": {
      "coll": "notifications",
      "pipeline": [
        {
          "$documents": [
            // document to insert if not found
            {
              "user_id": 1,
              "message": "msg to insert",
              "count": 1,
              "timestamp": "$$NOW"
            }
          ]
        }
      ]
    }
  },
  {
    "$setWindowFields": {
      "partitionBy": "$user_id",
      "sortBy": {
        "timestamp": 1
      },
      "output": {
        "rank": {
          "$rank": {}
        }
      }
    }
  },
  {
    "$match": {
      "rank": 1
    }
  },
  {
    "$unset": "rank"
  },
  {
    "$merge": {
      "into": "notifications",
      "on": [
        "user_id",
        "message"
      ]
    }
  }
])

Mongo Playground for matched / update count case
Mongo Playground for unmatched / insert case
Mongo Playground for no record / insert case

Node.js相关问答推荐

使用NodeJS在S3上传文件时的格式问题

postgresql层与应用层的序列化

仅当所需文档是最后一个文档时才更新MongoDB,否则插入

FireStore事务似乎已允许并发编辑?

使用NodeJS的DynamoDB中的BatchGetItem出现MultipleValidationError

如何修复PostgreSQL和NodeJS/NestJS应用程序之间的日期时间和时区问题?

使用AWS SDK for JavaScript V3将图像从node.js上传到s3 bucket

使用单个 MongoDB 查询更新多个元素

Gulp 能否向 Docker 发出增量构建的第一次迭代完成的信号?

$not 的聚合版本是什么?

express cors request.body formData显示undefined

将文件传递到 AWS lambdas(nodejs) + API 网关后重新上传文件

为什么要加密 CSRF 令牌?

如何刷新 youtube-data-api v3 的访问令牌

AWS EC2 npm install 突然很慢

postman 发送请求……永远

自定义 Docker 容器 Github 操作无法在 /github/workspace 中找到 Node 脚本

Cassandra node.js 驱动程序有替代品吗?

提供静态文件到底是什么意思?

Node.js 变量声明和范围