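I'm building this application to monitor a database and trigger actions (such as sending an email) when documents are added to it. However, the application may not be running when the database is first populated. How can I manually create a ResumeToken that points to the first document added to the collection, so that on the first run I can start from the beginning and iterate through the changes until the end? I know I need to store the ResumeToken from lastChangeStreamDocument for future restarts, but I'm interested in the "first run" scenario. I thought enumerator.Reset(); would be the right option, but it throws an exception saying it isn't supported.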

I followed the tests at https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs and successfully configured a change stream with the following code:

mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");

var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");

try
{
    var cursor = collection.Watch();
    using (var enumerator = cursor.ToEnumerable().GetEnumerator())   //Dispose the enumerator even if an exception is thrown
    {
        enumerator.MoveNext();  //Blocks until the next change (e.g. an update) is made to the collection
        var lastChangeStreamDocument = enumerator.Current;
        //lastChangeStreamDocument.FullDocument.Should().Be(document);
    }
}
catch (Exception ex)
{
    Logger.WriteException(ex);
}

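However, with this code the enumerator.MoveNext() line blocks until a document is changed (for example, updated), so I can only get references to documents that changed after the change stream was set up.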

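I tried searching the local.oplog.rs collection for the UUID of the first document inserted into the collection, and that worked. However, I don't see a way to turn this reference into a ResumeToken object that I can pass to the Watch method.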


Update:

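The ResumeToken appears to be stored as Base64 and contains the timestamp, the o._id ObjectId, and the ui UUID from the oplog entry. I need to step through the code further, but the source code (https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp) suggests that ResumeTokens come in different formats. With this information, I hope to build my own resume token that matches the format the database expects.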


Update #2:

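After more research, I stumbled on the code that parses key_string in mongo. That file contains the definition of CType. I decoded the Base64 into a byte array and, using the CType enum definitions, was able to learn more about how to build my own ResumeToken.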
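For reference while reading the bytes, here is a partial C# mirror of the CType values involved. This is a sketch: the names KeyStringCType and KeyStringBytes are hypothetical, and the numeric values are assumed to match the CType enum and the kEnd constant in the server's key_string.cpp for the 3.6-era format. Verify them against the source for your server version.

```csharp
using System;

// Partial mirror of the CType enum in the server's key_string.cpp (3.6-era format).
// Assumption: values transcribed for illustration only; check them against your server's source.
public enum KeyStringCType : byte
{
    MinKey = 10,
    Nullish = 20,
    StringLike = 60,
    Object = 70,      // 0x46
    Array = 80,       // 0x50
    BinData = 90,     // 0x5A
    OID = 100,        // 0x64, which is 'd' in ASCII
    Bool = 110,
    Date = 120,
    Timestamp = 130,  // 0x82
    MaxKey = 240,
}

public static class KeyStringBytes
{
    // Assumed to correspond to kEnd in key_string.cpp: written once after the last top-level value.
    public const byte End = 0x04;

    // Returns the CType name for a byte, or null when the byte is not one of the values above.
    public static string Describe(byte b) =>
        Enum.IsDefined(typeof(KeyStringCType), b) ? ((KeyStringCType)b).ToString() : null;
}
```

With these values, 0x82 reads as Timestamp, 0x46 as Object, 0x64 as OID and 0x5A as BinData, which lines up with the tag bytes in the example token.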

Consider the following example:

glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA==

This decodes to the byte array:

82 5a 7d ce c8 00 00 00 01 46 64 5f 69 64 00 64 5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc 00 5a 10 04 04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e 04
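The conversion from the Base64 string to this byte listing uses only the standard Convert.FromBase64String call. ResumeTokenHex below is a hypothetical helper name, not part of the MongoDB driver.

```csharp
using System;
using System.Linq;

// Hypothetical helper (not a driver API): turns the Base64 form of a resume token's
// "_data" value into bytes, and bytes into a space-separated lowercase hex string.
public static class ResumeTokenHex
{
    public static byte[] Decode(string base64Data) => Convert.FromBase64String(base64Data);

    public static string ToHex(byte[] bytes) =>
        string.Join(" ", bytes.Select(b => b.ToString("x2")));
}
```

Applied to the token above, ToHex(Decode(token)) yields exactly the 49 bytes listed above.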

I decoded it as:

//Timestamp (of oplog entry??)
82    //CType::TimeStamp
5a 7d ce c8 00 00 00 01   //It appears to be expecting a 64b number
//I'm not sure why the last byte is 0x01, unless it has something to do with little/big endian
//Matching oplog doc has { ts: TimeStamp(1518194376, 1) }
//  that integer converts to 0x5A7DCEC8

//Unknown Object
46    //CType::Object
64 5f 69 64     //Either expecting a 32b value or null terminated
00    //Null terminator or divider

//Document ID
64    //CType::OID
5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc  //o._id value from oplog entry
00    //OID expecting null terminated

//UUID
5a    //CType::BinData
10    //Length (16b)
04    //BinDataType of newUUID (from bsontypes.h)
04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e  //UUID value from oplog entry
04    //Unknown byte. Perhaps end of ResumeToken, or end of UUID mark?
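The listing above can be checked mechanically. The hypothetical ResumeTokenLayout below (not a driver API) reads the fields at the fixed offsets of this one 49-byte example and rejects anything else. Reading the 8 timestamp bytes as two big-endian 32-bit halves gives 1518194376 and 1, i.e. exactly the seconds and increment of the matching oplog entry's Timestamp(1518194376, 1), which suggests the trailing 0x01 is the increment rather than an endianness artifact.

```csharp
using System;
using System.Linq;

// Hypothetical parser for the one 49-byte layout decoded above (ObjectId _id, 16-byte UUID):
// [0] 0x82 | [1..8] timestamp | [9] 0x46 [10] 0x64 [11..14] "_id\0" [15] 0x64 [16..27] ObjectId [28] 0x00
// | [29] 0x5A [30] length [31] subtype [32..47] UUID | [48] 0x04
// Any other document key shape is rejected rather than guessed at.
public static class ResumeTokenLayout
{
    public static (uint Seconds, uint Increment, string ObjectIdHex, string UuidHex) Parse(byte[] t)
    {
        if (t.Length != 49 || t[0] != 0x82 || t[9] != 0x46 || t[15] != 0x64
            || t[29] != 0x5A || t[30] != 0x10 || t[48] != 0x04)
            throw new FormatException("Token does not match the expected layout.");

        // Timestamp bytes are big-endian: first 4 = seconds, last 4 = increment.
        uint seconds = ReadUInt32BigEndian(t, 1);
        uint increment = ReadUInt32BigEndian(t, 5);

        string oid = Hex(t, 16, 12);   // the 12 ObjectId bytes after the second 0x64
        string uuid = Hex(t, 32, 16);  // t[31] is the BinData subtype (0x04 = UUID)
        return (seconds, increment, oid, uuid);
    }

    private static uint ReadUInt32BigEndian(byte[] b, int offset) =>
        ((uint)b[offset] << 24) | ((uint)b[offset + 1] << 16) | ((uint)b[offset + 2] << 8) | b[offset + 3];

    private static string Hex(byte[] b, int offset, int count) =>
        string.Concat(b.Skip(offset).Take(count).Select(x => x.ToString("x2")));
}
```

For the example token this returns seconds 1518194376, increment 1, ObjectId 5a7dcc7e343f269b151c01fc and UUID 044b0d587b184bf7aee728b5ec4f967e.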

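My problem now is this: I create many oplog entries for a collection and build my own ResumeToken from the ts, ui and o._id values of the first entry in the oplog (hard-coding the unknown 0x4664 5f69 6400 block and the trailing 0x04 byte). The server accepts it as a valid ResumeToken when setting up collection.Watch. However, the document returned by the enumerator.MoveNext() call is always the third oplog entry, not the second!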

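I'm nervous about using this in production, because I don't know the purpose of that block (0x4664 5f69 6400, 12 hex digits), nor why I end up at the third entry instead of the second.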


Update #3:

The byte block in question:

46 64 5f 69 64 00

0x46 = CType::Object
0x64 = d
0x5F = _
0x69 = i
0x64 = d
0x00 = NULL

This byte block appears to describe the ObjectId of the affected document, or rather its "_id" key. So what is the significance of the "d" character?
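One reading that fits these bytes, offered as an assumption rather than a confirmed spec: inside an embedded document each field may be written as (type byte, NUL-terminated field name, value), and the value then writes its own type byte again. Under that reading the "d" is just 0x64, the ObjectId type of the "_id" field, the second 0x64 belongs to the value, and the 0x00 after the 12 ObjectId bytes closes the embedded document. The hypothetical DocumentKeyReader below parses the block that way and handles only the single-ObjectId case from this example.

```csharp
using System;
using System.Text;

// Hypothetical sketch: walks an embedded document inside the token, assuming the layout
// 0x46, then per field (type byte, NUL-terminated name, value starting with its own type byte),
// then 0x00. Only a single ObjectId field is handled, because that is the only case in the example.
public static class DocumentKeyReader
{
    public static (byte FieldType, string FieldName, string ObjectIdHex) ReadSingleOidField(byte[] b, int offset)
    {
        if (b[offset] != 0x46) throw new FormatException("Expected 0x46 (object)");
        int pos = offset + 1;
        byte fieldType = b[pos++];                        // 0x64 ('d'): type of the field's value
        int nul = Array.IndexOf(b, (byte)0x00, pos);      // field name ends at the next NUL
        string name = Encoding.ASCII.GetString(b, pos, nul - pos);
        pos = nul + 1;
        if (b[pos++] != 0x64) throw new FormatException("Expected 0x64 (ObjectId) value");
        string oid = BitConverter.ToString(b, pos, 12).Replace("-", "").ToLowerInvariant();
        pos += 12;
        if (b[pos] != 0x00) throw new FormatException("Expected 0x00 (end of object)");
        return (fieldType, name, oid);
    }
}
```

Called on the decoded example token at offset 9, it returns field type 0x64 ('d'), field name "_id" and ObjectId 5a7dcc7e343f269b151c01fc.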

Recommended answer

While working through this problem I kept updating the question with additional information. I have now managed to piece it all together so that it works.

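Here is the code I created, which:

  1. Locates the first entry for a namespace in the local.oplog.rs collection
  2. Generates a ResumeToken from that oplog document (so that we resume at the second entry)
  3. Demonstrates these functions with an example.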

Hopefully this code helps others who are trying to do the same thing.

/// <summary>
/// Locates the first document for the given namespace in the local.oplog collection
/// </summary>
/// <param name="docNamespace">Namespace to search for</param>
/// <returns>First Document found in the local.oplog collection for the specified namespace</returns>
internal static BsonDocument GetFirstDocumentFromOpLog(string docNamespace)
{
    mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
    IMongoDatabase localDB = mongoClient.GetDatabase("local");
    var collection = localDB.GetCollection<BsonDocument>("oplog.rs");

    //Find the "insert" entries for the specified namespace (DatabaseName.CollectionName); the first entry for a collection must always be an insert
    //Building the filter as a BsonDocument avoids splicing the namespace into a JSON string
    var filter = new BsonDocument { { "ns", docNamespace }, { "op", "i" } };

    BsonDocument retDoc = null;
    try //to get the first document from the oplog entries
    {
        retDoc = collection.Find(filter).FirstOrDefault();   //null when no matching entry exists
    }
    catch (Exception ex) { /*Logger.WriteException(ex);*/ }
    return retDoc;
}

/// <summary>
/// Takes a document from the OpLog and generates a ResumeToken
/// </summary>
/// <param name="firstDoc">BsonDocument from the local.oplog collection to base the ResumeToken on</param>
/// <returns>A ResumeToken that can be provided to a collection watch (ChangeStream) that points to the firstDoc provided</returns>
private static BsonDocument GetResumeTokenFromOpLogDoc(BsonDocument firstDoc)
{
    List<byte> hexVal = new List<byte>(49);   //A token with an ObjectId _id and a UUID is 49 bytes

    //Insert Timestamp of document: the tag, then 8 big-endian bytes (4 for the seconds, 4 for the increment)
    hexVal.Add(0x82);   //TimeStamp Tag
    BsonTimestamp docTimeStamp = firstDoc["ts"].AsBsonTimestamp;
    byte[] secondsByteArr = BitConverter.GetBytes(docTimeStamp.Timestamp);    //Seconds part of the oplog ts
    byte[] incrementByteArr = BitConverter.GetBytes(docTimeStamp.Increment);  //Increment part of the oplog ts
    if (BitConverter.IsLittleEndian) { Array.Reverse(secondsByteArr); Array.Reverse(incrementByteArr); }
    hexVal.AddRange(secondsByteArr);
    hexVal.AddRange(incrementByteArr);  //This is the trailing 0x00000001 seen in observed ResumeTokens whose ts increment was 1

    //Start of the document key object observed in a ResumeToken:
    //0x46 = CType::Object, then 0x64 (CType::OID, 'd' in ASCII; apparently the type of the "_id" field),
    //then the field name "_id" with a NULL terminator
    hexVal.AddRange(new byte[] { 0x46, 0x64, 0x5F, 0x69, 0x64, 0x00 });

    //Insert OID (from o._id)
    hexVal.Add(0x64);   //OID Tag
    byte[] docByteArr = firstDoc["o"]["_id"].AsObjectId.ToByteArray();
    hexVal.AddRange(docByteArr);
    hexVal.Add(0x00);   //Appears to close the object opened by 0x46 above

    //Insert UUID (from ui) as BinData
    hexVal.AddRange(new byte[] { 0x5a, 0x10, 0x04 });   //0x5A = BinData, 0x10 is Length (16 bytes), 0x04 is BinDataType (newUUID)
    hexVal.AddRange(firstDoc["ui"].AsByteArray);

    hexVal.Add(0x04);   //End-of-key marker (0x04 matches kEnd in the server's key_string.cpp)

    //Package the binary data into a BsonDocument with the key "_data"; the value is BinData, which tools display as Base64
    BsonDocument retDoc = new BsonDocument("_data", new BsonBinaryData(hexVal.ToArray()));
    return retDoc;
}


/// <summary>
/// Example code that sets up a change stream resuming after the first document, so the second document is returned first
/// </summary>
internal static void MonitorChangeStream()
{
    mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
    IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");
    var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");

    var options = new ChangeStreamOptions();
    options.FullDocument = ChangeStreamFullDocumentOption.UpdateLookup;

    try
    {
        var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update' ] } }");  //Works

        //Build ResumeToken from the first document in the oplog collection
        BsonDocument resumeTokenRefDoc = GetFirstDocumentFromOpLog(collection.CollectionNamespace.ToString());
        if (resumeTokenRefDoc != null)
        {
            BsonDocument docResumeToken = GetResumeTokenFromOpLogDoc(resumeTokenRefDoc);
            options.ResumeAfter = docResumeToken;
        }

        //Setup the ChangeStream/Watch Cursor
        var cursor = collection.Watch(pipeline, options);
        var enumerator = cursor.ToEnumerable().GetEnumerator();

        enumerator.MoveNext();  //Blocks until a record is UPDATEd, REPLACEd or INSERTed in the database (thanks to the pipeline arg), or returns the second entry (thanks to the ResumeToken that points to the first entry)

        ChangeStreamDocument<BsonDocument> lastChangeStreamDocument = enumerator.Current;
        //lastChangeStreamDocument is now pointing to the second entry in the oplog, or the just received entry
        //A loop can be setup to call enumerator.MoveNext() to step through each entry in the oplog history and to also receive new events

        enumerator.Dispose();   //Be sure to dispose of the enumerator when finished.
    }
    catch (Exception ex)
    {
        //Logger.WriteException(ex);
    }
}
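To check the byte layout that GetResumeTokenFromOpLogDoc assembles without a running server, the same assembly can be done in plain .NET. The hypothetical ResumeTokenBuilder below (not part of the driver) takes the oplog ts seconds and increment, the 12-byte o._id and the 16-byte ui, and assumes the 3.6 token layout decoded in the question with a document key of exactly { _id: ObjectId }. It writes the oplog entry's real increment into the last 4 timestamp bytes rather than a constant 0x00000001; the two differ whenever the increment is not 1, e.g. when several oplog entries share the same second.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, driver-independent sketch of the resume token byte layout,
// assuming the 3.6 format and a document key of exactly { _id: ObjectId }.
public static class ResumeTokenBuilder
{
    public static byte[] Build(uint seconds, uint increment, byte[] objectId, byte[] uuid)
    {
        if (objectId.Length != 12) throw new ArgumentException("ObjectId must be 12 bytes", nameof(objectId));
        if (uuid.Length != 16) throw new ArgumentException("UUID must be 16 bytes", nameof(uuid));

        var bytes = new List<byte>(49);
        bytes.Add(0x82);                                   // Timestamp type byte
        bytes.AddRange(BigEndian(seconds));               // high 32 bits: seconds
        bytes.AddRange(BigEndian(increment));             // low 32 bits: increment
        bytes.AddRange(new byte[] { 0x46, 0x64, 0x5F, 0x69, 0x64, 0x00 }); // object, OID-typed field "_id"
        bytes.Add(0x64);                                   // ObjectId type byte of the value
        bytes.AddRange(objectId);
        bytes.Add(0x00);                                   // end of the embedded document
        bytes.AddRange(new byte[] { 0x5A, 0x10, 0x04 });   // BinData, length 16, subtype 4 (UUID)
        bytes.AddRange(uuid);
        bytes.Add(0x04);                                   // end-of-key marker
        return bytes.ToArray();
    }

    private static byte[] BigEndian(uint value)
    {
        byte[] b = BitConverter.GetBytes(value);
        if (BitConverter.IsLittleEndian) Array.Reverse(b);
        return b;
    }

    // Parses a hex string such as "5a7dcc7e..." into bytes.
    public static byte[] HexToBytes(string hex)
    {
        var result = new byte[hex.Length / 2];
        for (int i = 0; i < result.Length; i++)
            result[i] = Convert.ToByte(hex.Substring(i * 2, 2), 16);
        return result;
    }
}
```

Fed the question's example values (seconds 1518194376, increment 1, o._id 5a7dcc7e343f269b151c01fc, ui 044b0d587b184bf7aee728b5ec4f967e), Convert.ToBase64String(ResumeTokenBuilder.Build(...)) reproduces the example token byte for byte.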

If anyone has suggestions for improving the code, please share them. I'm still learning.
