我开发这个应用程序的目的是创建监控数据库的逻辑,并在将文档添加到数据库时触发操作(如发送邮箱).然而,由于这个应用程序可能不会在首次填充数据库时启动,我如何手动创建一个ResumeToken,指向添加到集合中的第一个文档,这样在第一次运行时,我就可以从头开始,迭代更改,直到结束.我知道我需要存储lastChangeStreamDocument中的ResumeToken,以便将来重新启动,但我对"首次运行"场景感兴趣.我认为enumerator.Reset();
是正确的选项,但它抛出了一个异常,表明它不受支持.
我遵循了https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs中提供的测试,并用以下代码成功地配置了一个变更流
mongoClient = mongoClient ?? new MongoClient(ConnectionString); //Create client object if it is null
IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");
var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");
try
{
var cursor = collection.Watch();
var enumerator = cursor.ToEnumerable().GetEnumerator();
enumerator.MoveNext(); //Blocks until a record is UPDATED in the database
var lastChangeStreamDocument = enumerator.Current;
enumerator.Dispose();
//lastChangeStreamDocument.FullDocument.Should().Be(document);
}
catch( Exception ex)
{
Logger.WriteException(ex);
}
然而,使用这段代码,枚举器.MoveNext()行会一直阻塞,直到文档被更新,因此我只能在设置更改流后获得对更新文档的引用.
我想搜索一下当地人.oplog数据库并获取插入到集合中的第一个文档的UUID,并且成功了,但是,我没有看到将此引用转换为ResumeToken对象的方法,我可以为watch方法提供数据.
Update:
ResumeToken似乎存储为Base64,其中包含时间戳、o._idobjectid以及来自oplog条目的ui UUID.我需要进一步遍历代码,但从源代码(https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp)中可以看出,ResumeToken有不同的格式.有了这些信息,我希望能构建自己的简历令牌,以匹配数据库所期望的格式.
Update #2:
经过更多的研究,我偶然发现了在mongo中解析key_string
的代码.此文件包含CType的定义.我将Base64解码为字节数组,然后通过CType enum定义,我能够更多地了解如何构建自己的ResumeToken.
考虑下面的例子:
glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA==
这被解码到字节数组:
82 5a 7d ce c8 00 00 00 01 46 64 5f 69 64 00 64 5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc 00 5a 10 04 04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e 04
我解码为:
//Timestamp (of oplog entry??)
82 //CType::TimeStamp
5a 7d ce c8 00 00 00 01 //It appears to be expecting a 64b number
//I'm not sure why the last byte 0x01 unless it has something to do with little/bit endian
//Matching oplog doc has { ts: TimeStamp(1518194376, 1) }
// that integer converts to 0x5A7DCEC8
//Unknown Object
46 //CType::Object
64 5f 69 64 //Either expecting a 32b value or null terminated
00 //Null terminator or divider
//Document ID
64 //CType::OID
5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc //o._id value from oplog entry
00 //OID expecting null terminated
//UUID
5a //CType::BinData
10 //Length (16b)
04 //BinDataType of newUUID (from bsontypes.h)
04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e //UUID value from oplog entry
04 //Unknown byte. Perhaps end of ResumeToken, or end of UUID mark?
我现在的问题是,如果我为一个集合创建了许多oplog条目,并且使用了ts,ui和o._id值从oplog中的第一个条目开始,构建我自己的ResumeToken(硬编码未知的0x4664 5f69 6400
块和结尾的0x04
字节,然后服务器在设置collection.Watch
时将其作为有效的ResumeToken接受).但是,枚举器返回的文档.moveNext()调用始终返回第三个oplog条目,而不是第二个!
在生产过程中,我很紧张,因为我不知道12字节块的用途,也不知道为什么我会指向第三个条目,而不是第二个条目.
Update #3:
有问题的字节块:
46 64 5f 69 64 00
0x46 = CType::Object
0x64 = d
0x5F = _
0x69 = i
0x64 = d
0x00 = NULL
下面的字节块描述了受影响文档的ObjectId,或者它的"_id"键.那么"d"字符的意义是什么呢?