如何在第一个文档处恢复 MongoDB ChangeStream,而不仅仅是开始监听后的更改

2024-01-31

我对此应用程序的目标是创建监视数据库的逻辑,并在将文档添加到数据库时触发操作(例如发送电子邮件)。但是,由于首次填充数据库时可能无法启动此应用程序,因此我如何手动创建一个指向添加到集合中的第一个文档的 ResumeToken,以便在第一次运行时,我可以从头开始并迭代更改,直到到达终点。我认识到我需要存储来自 lastChangeStreamDocument 的 ResumeToken 以便将来重新启动,但我对“首次运行”场景感兴趣。我虽然enumerator.Reset();是正确的选项,但它引发了一个异常,表明它不受支持。

我已经按照中提供的测试进行了操作https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs并已使用以下代码成功配置了更改流

mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");

var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");

try
{
    var cursor = collection.Watch();
    var enumerator = cursor.ToEnumerable().GetEnumerator();

    enumerator.MoveNext();  //Blocks until a record is UPDATED in the database
    var lastChangeStreamDocument = enumerator.Current;
    enumerator.Dispose();
    //lastChangeStreamDocument.FullDocument.Should().Be(document);

}
catch( Exception ex)
{
    Logger.WriteException(ex);
}

但是,使用此代码, enumerator.MoveNext() 行会阻塞,直到文档被更新,因此我只能在设置更改流后获取对更新文档的引用。

我的想法是搜索 local.oplog 数据库并获取插入到集合中的第一个文档的 UUID,并且成功了,但是,我没有找到一种方法可以将此引用转换为我可以提供的 ResumeToken 对象观看方法。


Update:

ResumeToken 似乎存储为 Base64,其中包含时间戳、o._id ObjectID 以及 oplog 条目中的 ui UUID。我需要再遍历一下代码,但从这个源代码中可以看出(https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp)有不同格式的简历令牌。有了这些信息,希望我可以构建自己的简历令牌以匹配数据库期望的格式。


更新#2:

经过更多研究,我偶然发现了解析 a 的代码key_string在蒙戈github.com/mongodb/mongo/src/mongo/db/storage/key_string.cpp https://github.com/mongodb/mongo/blob/b58afd1e34aedcd1c1df4f2b5613c60668eaaad4/src/mongo/db/storage/key_string.cpp。该文件包含 CType 的定义。我将 Base64 解码为字节数组,然后通过 CType 枚举定义,我能够更多地了解如何构建自己的 ResumeToken。

考虑以下示例: 更新文档后,我在 ChangeStream 上捕获了 ResumeToken。

glp9zsgAAAABRmRfaWQAZFp9zH40PyabFRwB/ABaEAQESw1YexhL967nKLXsT5Z+BA==

解码为字节数组:

82 5a 7d ce c8 00 00 00 01 46 64 5f 69 64 00 64 5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc 00 5a 10 04 04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e 04

我决定成为:

//Timestamp (of oplog entry??)
82    //CType::TimeStamp
5a 7d ce c8 00 00 00 01   //It appears to be expecting a 64b number
//I'm not sure why the last byte 0x01 unless it has something to do with little/bit endian
//Matching oplog doc has { ts: TimeStamp(1518194376, 1) }
//  that integer converts to 0x5A7DCEC8

//Unknown Object
46    //CType::Object
64 5f 69 64     //Either expecting a 32b value or null terminated
00    //Null terminator or divider

//Document ID
64    //CType::OID
5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc  //o._id value from oplog entry
00    //OID expecting null terminated

//UUID
5a    //CType::BinData
10    //Length (16b)
04    //BinDataType of newUUID (from bsontypes.h)
04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e  //UUID value from oplog entry
04    //Unknown byte. Perhaps end of ResumeToken, or end of UUID mark?

我现在遇到的问题是,如果我有很多 oplog 条目作为一个集合,并且我使用 oplog 中第一个条目中的 ts、ui 和 o._id 值来构建我自己的 ResumeToken(对未知值进行硬编码)0x4664 5f69 6400块和结局0x04字节,然后服务器在设置时接受它作为有效的 ResumeTokencollection.Watch。但是, enumerator.moveNext() 调用返回的文档始终返回第三个 oplog 条目,而不是第二个!

在不知道 12Byte 块的用途,也不知道为什么我指向第三个而不是第二个条目的情况下,我在生产中依赖于此感到紧张。


更新#3:

这些有问题的字节块:

46 64 5f 69 64 00

0x46 = CType::Object
0x64 = d
0x5F = _
0x69 = i
0x64 = d
0x00 = NULL

以下字节块描述了受影响文档的 ObjectId,或者它的“_id”键。那么“d”字符的意义是什么?


在解决这个问题时,我一直在用附加信息更新这个问题,现在我已经成功地将它拼凑起来,这样它就可以工作了。

下面是我创建的代码:

  1. 在 local.oplog 集合中查找命名空间的第一个条目
  2. 从该 oplog 文档生成 ResumeToken(因此我们在第二个条目上恢复)
  3. 测试这些功能的示例。

希望此代码对尝试执行相同操作的其他人有所帮助。

/// <summary>
/// Locates the first document for the given namespace in the local.oplog collection
/// </summary>
/// <param name="docNamespace">Namespace to search for</param>
/// <returns>First Document found in the local.oplog collection for the specified namespace</returns>
internal static BsonDocument GetFirstDocumentFromOpLog(string docNamespace)
{
    mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
    IMongoDatabase localDB = mongoClient.GetDatabase("local");
    var collection = localDB.GetCollection<BsonDocument>("oplog.rs");

    //Find the documents from the specified namespace (DatabaseName.CollectionName), that have an operation type of "insert" (The first entry to a collection must always be an insert)
    var filter = MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonDocument>("{ $and: [ { 'ns': '" + docNamespace + "'}, { 'op': 'i'}] }");

    BsonDocument retDoc = null;
    try //to get the first document from the oplog entries
    {       
        retDoc = collection.Find<BsonDocument>(filter).First();
    }
    catch(Exception ex) { /*Logger.WriteException(ex);*/ }
    return retDoc;
}

/// <summary>
/// Takes a document from the OpLog and generates a ResumeToken
/// </summary>
/// <param name="firstDoc">BsonDocument from the local.oplog collection to base the ResumeToken on</param>
/// <returns>A ResumeToken that can be provided to a collection watch (ChangeStream) that points to the firstDoc provided</returns>
private static BsonDocument GetResumeTokenFromOpLogDoc(BsonDocument firstDoc)
{
    List<byte> hexVal = new List<byte>(34);

    //Insert Timestamp of document
    hexVal.Add(0x82);   //TimeStamp Tag
    byte[] docTimeStampByteArr = BitConverter.GetBytes(firstDoc["ts"].AsBsonTimestamp.Timestamp); //Timestamp is an integer, so we need to reverse it
    if (BitConverter.IsLittleEndian) { Array.Reverse(docTimeStampByteArr); }
    hexVal.AddRange(docTimeStampByteArr);

    //Expecting UInt64, so make sure we added 8 bytes (likely only added 4)
    hexVal.AddRange(new byte[] { 0x00, 0x00, 0x00, 0x01 }); //Not sure why the last bytes is a 0x01, but it was present in observed ResumeTokens

    //Unknown Object observed in a ResumeToken
    //0x46 = CType::Object, followed by the string "d_id" NULL
    //This may be something that identifies that the following value is for the "_id" field of the ObjectID given next
    hexVal.AddRange(new byte[] { 0x46, 0x64, 0x5F, 0x69, 0x64, 0x00 }); //Unknown Object, expected to be 32 bits, with a 0x00 terminator

    //Insert OID (from 0._id.ObjectID)
    hexVal.Add(0x64);   //OID Tag
    byte[] docByteArr = firstDoc["o"]["_id"].AsObjectId.ToByteArray();
    hexVal.AddRange(docByteArr);
    hexVal.Add(0x00);   //End of OID

    //Insert UUID (from ui) as BinData
    hexVal.AddRange(new byte[] { 0x5a, 0x10, 0x04 });   //0x5A = BinData, 0x10 is Length (16 bytes), 0x04 is BinDataType (newUUID)
    hexVal.AddRange(firstDoc["ui"].AsByteArray);

    hexVal.Add(0x04);   //Unknown marker (maybe end of resumeToken since 0x04 == ASCII 'EOT')

    //Package the binary data into a BsonDocument with the key "_data" and the value as a Base64 encoded string
    BsonDocument retDoc = new BsonDocument("_data", new BsonBinaryData(hexVal.ToArray()));
    return retDoc;
}


/// <summary>
/// Example Code for setting up and resuming to the second doc
/// </summary>
internal static void MonitorChangeStream()
{
    mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
    IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");
    var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");

    var options = new ChangeStreamOptions();
    options.FullDocument = ChangeStreamFullDocumentOption.UpdateLookup;

    try
    {
        var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update' ] } }");  //Works

        //Build ResumeToken from the first document in the oplog collection
        BsonDocument resumeTokenRefDoc = GetFirstDocumentFromOpLog(collection.CollectionNamespace.ToString());
        if (resumeTokenRefDoc != null)
        {
            BsonDocument docResumeToken = GetResumeTokenFromOpLogDoc(resumeTokenRefDoc);
            options.ResumeAfter = docResumeToken;
        }

        //Setup the ChangeStream/Watch Cursor
        var cursor = collection.Watch(pipeline, options);
        var enumerator = cursor.ToEnumerable().GetEnumerator();

        enumerator.MoveNext();  //Blocks until a record is UPDATEd, REPLACEd or INSERTed in the database (thanks to the pipeline arg), or returns the second entry (thanks to the ResumeToken that points to the first entry)

        ChangeStreamDocument<BsonDocument> lastChangeStreamDocument = enumerator.Current;
        //lastChangeStreamDocument is now pointing to the second entry in the oplog, or the just received entry
        //A loop can be setup to call enumerator.MoveNext() to step through each entry in the oplog history and to also receive new events

        enumerator.Dispose();   //Be sure to dispose of the enumerator when finished.
    }
    catch( Exception ex)
    {
        //Logger.WriteException(ex);
    }
}

如果有人对代码改进有任何建议,请提出建议。我还在学习。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在第一个文档处恢复 MongoDB ChangeStream,而不仅仅是开始监听后的更改 的相关文章

  • 为什么我应该使用内联代码? [复制]

    这个问题在这里已经有答案了 我是一名 C C 开发人员 这里有几个始终困扰我的问题 常规 代码和内联代码之间有很大区别吗 主要区别是什么 内联代码只是宏的一种 形式 吗 选择内联代码时必须进行什么样的权衡 Thanks 表现 正如之前的答案
  • C语言实现延时函数

    我想使用空循环实现延迟函数 但是完成一次循环所需的时间取决于编译器和机器 我希望我的程序自行确定时间并将程序延迟指定的时间 谁能给我任何想法如何做到这一点 注意 有一个名为delay 的函数可以将系统暂停指定的毫秒 是否可以在不使用此功能的
  • 值类型如何实现引用类型

    我遇到了一个值类型正在实现 ref 的场景 类型 只是想知道这怎么可能 幕后发生了什么 结构体是值类型 接口是引用 类型但结构可以实现接口而不会出现任何错误 有什么想法吗 提前致谢 实际上 它同时以两种不同的方式进行 首先 任何值类型都可以
  • C++ 并行任务的开销

    我有以下简单的功能 include
  • C 链表销毁函数

    我正在尝试学习 C 和很多人一样 我对指针有点困惑 无论如何 我创建了一个递归函数来销毁我的链表 但是正如我调试的那样 当我从函数返回时 列表的头部不应该为空 所以我猜这是对指针的一些基本误解 这是函数 void destroy struc
  • _MM_TRANSPOSE4_PS 在 GCC 中导致编译器错误?

    我第一次在 GCC 而不是 MSVC 中编译我的数学库 并经历了所有的小错误 我遇到了一个根本没有意义的错误 Line 284 error lvalue required as left operand of assignment 284号
  • 编译器消息“警告:格式‘%s’需要类型‘char *’,但参数 2 具有类型‘char (*)’”

    我正在尝试运行一个简单的 C 程序 但收到此错误 警告 格式 s 需要类型 char 但参数 2 的类型为 char 20 我在跑步Mac OS X v10 8 https en wikipedia org wiki OS X Mounta
  • Cookie 在 ASP.net 中失去价值

    我有以下设置 cookie 的代码 string locale DropDownList this LoginUser FindControl locale SelectedValue HttpCookie cookie new HttpC
  • FFplay成功移入我的Winform中,如何设置它无边框?

    用这个代码 在 C 应用程序中显示 tcp 视频流 来自 FFPLAY FFMPEG https stackoverflow com questions 14201894 show a tcp video stream from ffpla
  • 使用 FromBase64Transform 解码 base64 文件流

    The example https msdn microsoft com en us library system security cryptography frombase64transform 28v vs 110 29 aspx从M
  • 将旧的 Unity 代码升级到 Unity 5

    在触发按钮上播放动画的代码似乎不起作用 我在 Youtube 上看到了一个视频 内容很简单animation Play 它可以在该视频上运行 但我无法让它在我的计算机上运行 我做错了什么还是团结改变了它 请帮助我在网上找不到解决方案 所有
  • WCF 服务中的缓冲区大小

    我们有一个 WCF 服务 它执行某些存储过程并将结果返回给 silverlight 客户端 某些存储过程最多返回 80K 行 下面给出的是 web config 中服务的设置
  • 我的代码哪里有泄漏?

    下面是我的代码 它打开一个 XML 文件 old xml 过滤无效字符并写入另一个 XML 文件 abc xml 最后 我将再次加载 XML abc xml 当执行以下行时 出现异常 表示 xml 文件被另一个进程使用 xDoc Load
  • Rx 在不同的线程上生产和消费

    我试图通过此处的示例代码来简化我的问题 我有一个生产者线程不断地输入数据 并且我尝试在批次之间添加时间延迟来对其进行批处理 以便 UI 有时间渲染它 但结果并不如预期 生产者和消费者似乎在同一个线程上 我不希望批处理缓冲区在正在生成的线程上
  • 使用 DataGridViewCheckboxCell 真正禁用 DataGridView 中的复选框

    有谁知道如何使用 DataGridViewCheckboxCell 禁用 DataGridView 中的复选框 我可以将其设置为只读 并设置背景颜色 但我无法让复选框本身显示为禁用状态 有什么想法吗 Guess 你必须自己画 http so
  • 小数精度

    我使用小数类型进行高精度计算 货币 但我今天遇到了这个简单的划分 1 1 37 这应该再次得到 37 http www wolframalpha com input i 1 2F 281 2F37 29 http www wolframal
  • SMTP 客户端在 C# 应用程序中显示错误“未采取请求的操作”

    我正在尝试使用 hotmail 帐户设置电子邮件发送应用程序 代码如下所示 MailMessage mail new MailMessage from to mail Subject Proba email mail Attachments
  • 如何使用实体框架设置连接字符串

    我将 EF6 与 MySQL 结合使用 并有一个用于多个数据库的模型 我希望能够在我的表单中设置连接设置 如何以编程方式设置模型的连接字符串 你应该使用EntityConnectionFactory这就是您所需要的 public strin
  • 为什么 Mongohint 可以使查询运行速度提高 10 倍?

    如果我使用explain 从shell运行mongo查询 获取所使用的索引的名称 然后再次运行相同的查询 但使用hint 指定要使用的相同索引 解释计划中的 millis 字段是显着下降 例如 没有提供任何提示 gt gt db event
  • 从其对象获取结构体字段的名称和类型

    例如 我有一个类似这样的结构 struct Test int i float f char ch 10 我有一个该结构的对象 例如 Test obj 现在 我想以编程方式获取字段名称和类型obj 是否可以 顺便说一句 这是 C 你正在要求C

随机推荐