我希望根据一个字段对所有文档进行分组,但要限制 for each 值分组的文档数量.

每条消息都有一个CONVERSACTION_ID.我需要 for each CONTACTIONS_ID获取10条或更少的消息.

我可以根据以下命令进行分组,但不知道如何限制 除切片结果外的分组文档数 Message.aggregate({'$group':{_id:'$conversation_ID',msgs:{'$push':{msgid:'$_id'}}}})

如何将每个CONVERSACTION_ID的消息数组长度限制为10?

推荐答案

现代的

从MongoDB3.6开始,有一种"新颖"的方法,使用$lookup来执行"自连接",其方式与下面演示的原始游标处理大致相同.

由于在此版本中,您可以将"pipeline"参数指定为$lookup作为"Join"的源,这实质上意味着您可以使用$match$limit来收集和"限制"数组的条目:

db.messages.aggregate([
  { "$group": { "_id": "$conversation_ID" } },
  { "$lookup": {
    "from": "messages",
    "let": { "conversation": "$_id" },
    "pipeline": [
      { "$match": { "$expr": { "$eq": [ "$conversation_ID", "$$conversation" ] } }},
      { "$limit": 10 },
      { "$project": { "_id": 1 } }
    ],
    "as": "msgs"
  }}
])

您可以 Select 在$lookup后面添加额外的投影,以便使数组项成为简单的值,而不是具有_id键的文档,但只需执行上述操作就可以得到基本结果.

仍然有未完成的SERVER-9277个实际要求直接"限制推动",但在过渡期间,以这种方式使用$lookup是一个可行的 Select .

NOTE:还有$slice个是在写了原始答案后介绍的,在原始内容中被"杰出的吉拉问题"提到.虽然使用小的结果集可以得到相同的结果,但它仍然需要将所有内容"推入"数组,然后将最终数组输出限制为所需的长度.

这就是主要的区别,也是为什么对于大的结果来说,$slice是不实际的.但当然,也可以在需要的情况下交替使用.

关于这两种替代用法,mongodb group values by multiple fields还有更多的细节.


原创

如前所述,这不是不可能的,但肯定是一个可怕的问题.

实际上,如果您主要关心的是结果数组会非常大,那么最好的方法是将每个不同的"CONVERSACTION_ID"作为单独的查询提交,然后组合您的结果.非常符合MongoDB 2.6语法,可能需要根据您的语言实现实际情况进行一些调整:

var results = [];
db.messages.aggregate([
    { "$group": {
        "_id": "$conversation_ID"
    }}
]).forEach(function(doc) {
    db.messages.aggregate([
        { "$match": { "conversation_ID": doc._id } },
        { "$limit": 10 },
        { "$group": {
            "_id": "$conversation_ID",
            "msgs": { "$push": "$_id" }
        }}
    ]).forEach(function(res) {
        results.push( res );
    });
});

但这完全取决于这是否是你试图避免的.因此,让我们来看看真正的答案:


这里的第一个问题是,没有函数来"限制"被"推入"到数组中的项数.这当然是我们想要的,但是功能目前还不存在.

第二个问题是,即使在将所有项推入数组时,也不能在聚合管道中使用$slice或任何类似的运算符.因此,目前还没有办法通过简单的操作从生成的数组中仅获得"前10个"结果.

但是您实际上可以生成一组操作来有效地对您的分组边界进行"切片".它相当复杂,例如,在这里我将把"分片"的数组元素减少到"6".这里的主要原因是演示该过程,并说明如何在不 destruct 不包含要"切片"到的总数的数组的情况下完成此操作.

给出一个文档样本:

{ "_id" : 1, "conversation_ID" : 123 }
{ "_id" : 2, "conversation_ID" : 123 }
{ "_id" : 3, "conversation_ID" : 123 }
{ "_id" : 4, "conversation_ID" : 123 }
{ "_id" : 5, "conversation_ID" : 123 }
{ "_id" : 6, "conversation_ID" : 123 }
{ "_id" : 7, "conversation_ID" : 123 }
{ "_id" : 8, "conversation_ID" : 123 }
{ "_id" : 9, "conversation_ID" : 123 }
{ "_id" : 10, "conversation_ID" : 123 }
{ "_id" : 11, "conversation_ID" : 123 }
{ "_id" : 12, "conversation_ID" : 456 }
{ "_id" : 13, "conversation_ID" : 456 }
{ "_id" : 14, "conversation_ID" : 456 }
{ "_id" : 15, "conversation_ID" : 456 }
{ "_id" : 16, "conversation_ID" : 456 }

您可以在那里看到,当按条件分组时,您将得到一个包含10个元素的数组和另一个包含"5"元素的array.这里要做的是将这两个元素都减少到前"六个",而不" destruct "只与"五个"元素匹配的array.

以及以下查询:

db.messages.aggregate([
    { "$group": {
        "_id": "$conversation_ID",
        "first": { "$first": "$_id" },
        "msgs": { "$push": "$_id" },
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "seen": { "$eq": [ "$first", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "seen": { "$eq": [ "$second", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "seen": { "$eq": [ "$third", "$msgs" ] },
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "forth": 1,
        "seen": { "$eq": [ "$forth", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$forth" },
        "fifth": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "forth": 1,
        "fifth": 1,
        "seen": { "$eq": [ "$fifth", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$forth" },
        "fifth": { "$first": "$fifth" },
        "sixth": { "$first": "$msgs" },
    }},
    { "$project": {
         "first": 1,
         "second": 1,
         "third": 1,
         "forth": 1,
         "fifth": 1,
         "sixth": 1,
         "pos": { "$const": [ 1,2,3,4,5,6 ] }
    }},
    { "$unwind": "$pos" },
    { "$group": {
        "_id": "$_id",
        "msgs": {
            "$push": {
                "$cond": [
                    { "$eq": [ "$pos", 1 ] },
                    "$first",
                    { "$cond": [
                        { "$eq": [ "$pos", 2 ] },
                        "$second",
                        { "$cond": [
                            { "$eq": [ "$pos", 3 ] },
                            "$third",
                            { "$cond": [
                                { "$eq": [ "$pos", 4 ] },
                                "$forth",
                                { "$cond": [
                                    { "$eq": [ "$pos", 5 ] },
                                    "$fifth",
                                    { "$cond": [
                                        { "$eq": [ "$pos", 6 ] },
                                        "$sixth",
                                        false
                                    ]}
                                ]}
                            ]}
                        ]}
                    ]}
                ]
            }
        }
    }},
    { "$unwind": "$msgs" },
    { "$match": { "msgs": { "$ne": false } }},
    { "$group": {
        "_id": "$_id",
        "msgs": { "$push": "$msgs" }
    }}
])

您将获得数组中的最高结果,最多六个条目:

{ "_id" : 123, "msgs" : [ 1, 2, 3, 4, 5, 6 ] }
{ "_id" : 456, "msgs" : [ 12, 13, 14, 15 ] }

如你所见,这里充满了乐趣.

初始分组后,基本上希望从堆栈中"弹出"$first值以获得数组结果.为了简化这个过程,我们实际上是在初始操作中这样做的.因此,这个过程变成:

  • $unwind数组
  • $eq相等匹配时已经看到的值进行比较
  • $sort将结果"浮点"false个看不见的值到顶部(这仍然保留顺序)
  • $group再次返回,并将看不见的$first值"弹出"为堆栈中的下一个成员.此外,它还使用$cond运算符将数组堆栈中的"see"值替换为false,以帮助计算.

$cond的最后一个动作是确保将来的迭代不会在"切片"计数大于数组成员的地方一遍又一遍地添加数组的最后一个值.

整个过程需要对你想要"切片"的项目进行重复.因为我们已经在初始分组中找到了"第一个"项,这意味着需要n-1次迭代才能得到所需的切片结果.

最后几个步骤实际上只是将所有内容转换回数组的可选示例,以获得最终显示的结果.所以实际上只是根据匹配位置有条件地将Items或false推回,最后"过滤"出所有false个值,这样结束数组就分别有"6"和"5"个成员.

因此,没有标准操作符来适应这一点,并且您不能仅将推送"限制"为数组中的5或10项或其他任何项.但是如果你真的必须这么做,那么这是你最好的办法.


您可以使用mapReduce来实现这一点,同时放弃聚合框架.我将采取的方法(在合理的限制范围内)是在服务器上有效地拥有内存中的哈希映射,并将数组累积到该映射,同时使用JavaScript切片来"限制"结果:

db.messages.mapReduce(
    function () {

        if ( !stash.hasOwnProperty(this.conversation_ID) ) {
            stash[this.conversation_ID] = [];
        }

        if ( stash[this.conversation_ID.length < maxLen ) {
            stash[this.conversation_ID].push( this._id );
            emit( this.conversation_ID, 1 );
        }

    },
    function(key,values) {
        return 1;   // really just want to keep the keys
    },
    { 
        "scope": { "stash": {}, "maxLen": 10 },
        "finalize": function(key,value) {
            return { "msgs": stash[key] };                
        },
        "out": { "inline": 1 }
    }
)

这样就基本上建立了"内存中"对象,该对象与发出的"键"匹配,数组的大小永远不会超过您希望从结果中获取的最大值.此外,当达到最大堆栈时,这甚至不需要"发出"项目.

reduce部分实际上没有做什么,只不过本质上只是将其简化为"key"和一个值.因此,为了防止我们的reducer没有被调用,如果一个键只有一个值,那么finalize函数负责将"stash"键映射到最终输出.

这种方法的有效性取决于输出的大小,JavaScript判断当然不快,但可能比在管道中处理大型数组更快.


JIRA issues向上投票,以实际拥有一个"切片"运算符,甚至"$PUSH"和"$addToSet"的"限制",这两个操作都很方便.个人希望至少可以对$map运算符进行一些修改,以便在处理时expose "当前索引"的值.这将有效地允许"切片"和其他操作.

你真的想把它编码成"生成"所有需要的迭代.如果这里的答案得到了足够的爱和/或我在tuits中等待的其他时间,那么我可能会添加一些代码来演示如何做到这一点.这已经是一个相当长的回应.


生成管道的代码:

var key = "$conversation_ID";
var val = "$_id";
var maxLen = 10;

var stack = [];
var pipe = [];
var fproj = { "$project": { "pos": { "$const": []  } } };

for ( var x = 1; x <= maxLen; x++ ) {

    fproj["$project"][""+x] = 1;
    fproj["$project"]["pos"]["$const"].push( x );

    var rec = {
        "$cond": [ { "$eq": [ "$pos", x ] }, "$"+x ]
    };
    if ( stack.length == 0 ) {
        rec["$cond"].push( false );
    } else {
        lval = stack.pop();
        rec["$cond"].push( lval );
    }

    stack.push( rec );

    if ( x == 1) {
        pipe.push({ "$group": {
           "_id": key,
           "1": { "$first": val },
           "msgs": { "$push": val }
        }});
    } else {
        pipe.push({ "$unwind": "$msgs" });
        var proj = {
            "$project": {
                "msgs": 1
            }
        };
        
        proj["$project"]["seen"] = { "$eq": [ "$"+(x-1), "$msgs" ] };
       
        var grp = {
            "$group": {
                "_id": "$_id",
                "msgs": {
                    "$push": {
                        "$cond": [ { "$not": "$seen" }, "$msgs", false ]
                    }
                }
            }
        };

        for ( n=x; n >= 1; n-- ) {
            if ( n != x ) 
                proj["$project"][""+n] = 1;
            grp["$group"][""+n] = ( n == x ) ? { "$first": "$msgs" } : { "$first": "$"+n };
        }

        pipe.push( proj );
        pipe.push({ "$sort": { "seen": 1 } });
        pipe.push(grp);
    }
}

pipe.push(fproj);
pipe.push({ "$unwind": "$pos" });
pipe.push({
    "$group": {
        "_id": "$_id",
        "msgs": { "$push": stack[0] }
    }
});
pipe.push({ "$unwind": "$msgs" });
pipe.push({ "$match": { "msgs": { "$ne": false } }});
pipe.push({
    "$group": {
        "_id": "$_id",
        "msgs": { "$push": "$msgs" }
    }
}); 

这就构建了基本的迭代方法,步骤从$unwind$group,最高可达maxLen步.还嵌入了所需的最终投影的详细信息和"嵌套"的条件语句.最后一个基本上是这个问题的做法:

Does MongoDB's $in clause guarantee order?

Database相关问答推荐

如何在 C# 控制台应用程序中执行 CMD 命令?

MySQL FIND_IN_SET 的对面

一个强大的 MySQL 管理工具,具有与 SQL Server Management Studio 类似的功能

什么是非规范化 mysql 数据库的好方法?

MongoDB 单文档大小限制为 16MB

查询最后一天、上周、上个月SQLite

如何识别 DB2 端口号

怎样有效存储 7.300.000.000 行?

在默认路径下使用脚本创建数据库?

为数据库应用程序留下审计跟踪/更改历史的有效策略?

在 Heroku 生产站点上清除 Rails 应用程序数据库

PostgreSQL 哈希索引

MySQL 中 NOW()、SYSDATE() 和 CURRENT_DATE() 之间的区别

获取数据库路径

postgreSQL 同时将列类型从 int 更改为 bigint

如何在数据库中获取原始的created_at值(不是转换为 ActiveSupport::TimeWithZone 的对象)

Web 应用程序的文件存储:文件系统、数据库和 NoSQL 引擎

NoSQL 数据库是否使用或需要索引?

包含 8000 万条记录并添加索引的表需要超过 18 小时(或永远)!怎么办?

为什么 Rails 迁移在应用程序中定义外键而不在数据库中定义外键?