我对此应用程序的目标是创建监视数据库的逻辑,并在将文档添加到数据库时触发操作(例如发送电子邮件)。但是,由于首次填充数据库时可能无法启动此应用程序,因此我如何手动创建一个指向添加到集合中的第一个文档的 ResumeToken,以便在第一次运行时,我可以从头开始并迭代更改,直到到达终点。我认识到我需要存储来自 lastChangeStreamDocument 的 ResumeToken 以便将来重新启动,但我对“首次运行”场景感兴趣。我虽然enumerator.Reset();
是正确的选项,但它引发了一个异常,表明它不受支持。
我已经按照中提供的测试进行了操作https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs并已使用以下代码成功配置了更改流
mongoClient = mongoClient ?? new MongoClient(ConnectionString); //Create client object if it is null
IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");
var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");
try
{
var cursor = collection.Watch();
var enumerator = cursor.ToEnumerable().GetEnumerator();
enumerator.MoveNext(); //Blocks until a record is UPDATED in the database
var lastChangeStreamDocument = enumerator.Current;
enumerator.Dispose();
//lastChangeStreamDocument.FullDocument.Should().Be(document);
}
catch( Exception ex)
{
Logger.WriteException(ex);
}
但是,使用此代码, enumerator.MoveNext() 行会阻塞,直到文档被更新,因此我只能在设置更改流后获取对更新文档的引用。
我的想法是搜索 local.oplog 数据库并获取插入到集合中的第一个文档的 UUID,并且成功了,但是,我没有找到一种方法可以将此引用转换为我可以提供的 ResumeToken 对象观看方法。
Update:
ResumeToken 似乎存储为 Base64,其中包含时间戳、o._id ObjectID 以及 oplog 条目中的 ui UUID。我需要再遍历一下代码,但从这个源代码中可以看出(https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp)有不同格式的简历令牌。有了这些信息,希望我可以构建自己的简历令牌以匹配数据库期望的格式。
更新#2:
经过更多研究,我偶然发现了解析 a 的代码key_string
在蒙戈github.com/mongodb/mongo/src/mongo/db/storage/key_string.cpp https://github.com/mongodb/mongo/blob/b58afd1e34aedcd1c1df4f2b5613c60668eaaad4/src/mongo/db/storage/key_string.cpp。该文件包含 CType 的定义。我将 Base64 解码为字节数组,然后通过 CType 枚举定义,我能够更多地了解如何构建自己的 ResumeToken。
考虑以下示例:
更新文档后,我在 ChangeStream 上捕获了 ResumeToken。
glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA==
解码为字节数组:
82 5a 7d ce c8 00 00 00 01 46 64 5f 69 64 00 64 5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc 00 5a 10 04 04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e 04
我决定成为:
//Timestamp (of oplog entry??)
82 //CType::TimeStamp
5a 7d ce c8 00 00 00 01 //It appears to be expecting a 64b number
//I'm not sure why the last byte 0x01 unless it has something to do with little/bit endian
//Matching oplog doc has { ts: TimeStamp(1518194376, 1) }
// that integer converts to 0x5A7DCEC8
//Unknown Object
46 //CType::Object
64 5f 69 64 //Either expecting a 32b value or null terminated
00 //Null terminator or divider
//Document ID
64 //CType::OID
5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc //o._id value from oplog entry
00 //OID expecting null terminated
//UUID
5a //CType::BinData
10 //Length (16b)
04 //BinDataType of newUUID (from bsontypes.h)
04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e //UUID value from oplog entry
04 //Unknown byte. Perhaps end of ResumeToken, or end of UUID mark?
我现在遇到的问题是,如果我有很多 oplog 条目作为一个集合,并且我使用 oplog 中第一个条目中的 ts、ui 和 o._id 值来构建我自己的 ResumeToken(对未知值进行硬编码)0x4664 5f69 6400
块和结局0x04
字节,然后服务器在设置时接受它作为有效的 ResumeTokencollection.Watch
。但是, enumerator.moveNext() 调用返回的文档始终返回第三个 oplog 条目,而不是第二个!
在不知道 12Byte 块的用途,也不知道为什么我指向第三个而不是第二个条目的情况下,我在生产中依赖于此感到紧张。
更新#3:
这些有问题的字节块:
46 64 5f 69 64 00
0x46 = CType::Object
0x64 = d
0x5F = _
0x69 = i
0x64 = d
0x00 = NULL
以下字节块描述了受影响文档的 ObjectId,或者它的“_id”键。那么“d”字符的意义是什么?