我对此应用程序的目标是创建监视数据库的逻辑,并在将文档添加到数据库时触发操作(例如发送电子邮件)。但是,由于首次填充数据库时可能无法启动此应用程序,因此我如何手动创建一个指向添加到集合中的第一个文档的 ResumeToken,以便在第一次运行时,我可以从头开始并迭代更改,直到到达终点。我认识到我需要存储来自 lastChangeStreamDocument 的 ResumeToken 以便将来重新启动,但我对“首次运行”场景感兴趣。我虽然enumerator.Reset();是正确的选项,但它引发了一个异常,表明它不受支持。

我已经按照中提供的测试进行了操作https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs https://github.com/mongodb/mongo-csharp-driver/blob/master/tests/MongoDB.Driver.Examples/ChangeStreamExamples.cs并已使用以下代码成功配置了更改流

mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");

var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");

    var cursor = collection.Watch();
    var enumerator = cursor.ToEnumerable().GetEnumerator();

    enumerator.MoveNext();  //Blocks until a record is UPDATED in the database
    var lastChangeStreamDocument = enumerator.Current;

catch( Exception ex)

但是,使用此代码, enumerator.MoveNext() 行会阻塞,直到文档被更新,因此我只能在设置更改流后获取对更新文档的引用。

我的想法是搜索 local.oplog 数据库并获取插入到集合中的第一个文档的 UUID,并且成功了,但是,我没有找到一种方法可以将此引用转换为我可以提供的 ResumeToken 对象观看方法。


ResumeToken 似乎存储为 Base64,其中包含时间戳、o._id ObjectID 以及 oplog 条目中的 ui UUID。我需要再遍历一下代码,但从这个源代码中可以看出(https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp https://github.com/mongodb/mongo/blob/c906f6357d22f66d58e3334868025069c62bd97b/src/mongo/db/pipeline/resume_token_test.cpp)有不同格式的简历令牌。有了这些信息,希望我可以构建自己的简历令牌以匹配数据库期望的格式。


经过更多研究,我偶然发现了解析 a 的代码key_string在蒙戈github.com/mongodb/mongo/src/mongo/db/storage/key_string.cpp https://github.com/mongodb/mongo/blob/b58afd1e34aedcd1c1df4f2b5613c60668eaaad4/src/mongo/db/storage/key_string.cpp。该文件包含 CType 的定义。我将 Base64 解码为字节数组,然后通过 CType 枚举定义,我能够更多地了解如何构建自己的 ResumeToken。

考虑以下示例: 更新文档后,我在 ChangeStream 上捕获了 ResumeToken。



82 5a 7d ce c8 00 00 00 01 46 64 5f 69 64 00 64 5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc 00 5a 10 04 04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e 04


//Timestamp (of oplog entry??)
82    //CType::TimeStamp
5a 7d ce c8 00 00 00 01   //It appears to be expecting a 64b number
//I'm not sure why the last byte 0x01 unless it has something to do with little/bit endian
//Matching oplog doc has { ts: TimeStamp(1518194376, 1) }
//  that integer converts to 0x5A7DCEC8

//Unknown Object
46    //CType::Object
64 5f 69 64     //Either expecting a 32b value or null terminated
00    //Null terminator or divider

//Document ID
64    //CType::OID
5a 7d cc 7e 34 3f 26 9b 15 1c 01 fc  //o._id value from oplog entry
00    //OID expecting null terminated

5a    //CType::BinData
10    //Length (16b)
04    //BinDataType of newUUID (from bsontypes.h)
04 4b 0d 58 7b 18 4b f7 ae e7 28 b5 ec 4f 96 7e  //UUID value from oplog entry
04    //Unknown byte. Perhaps end of ResumeToken, or end of UUID mark?

我现在遇到的问题是,如果我有很多 oplog 条目作为一个集合,并且我使用 oplog 中第一个条目中的 ts、ui 和 o._id 值来构建我自己的 ResumeToken(对未知值进行硬编码)0x4664 5f69 6400块和结局0x04字节,然后服务器在设置时接受它作为有效的 ResumeTokencollection.Watch。但是, enumerator.moveNext() 调用返回的文档始终返回第三个 oplog 条目,而不是第二个!

在不知道 12Byte 块的用途,也不知道为什么我指向第三个而不是第二个条目的情况下,我在生产中依赖于此感到紧张。



46 64 5f 69 64 00

0x46 = CType::Object
0x64 = d
0x5F = _
0x69 = i
0x64 = d
0x00 = NULL

以下字节块描述了受影响文档的 ObjectId,或者它的“_id”键。那么“d”字符的意义是什么?



  1. 在 local.oplog 集合中查找命名空间的第一个条目
  2. 从该 oplog 文档生成 ResumeToken(因此我们在第二个条目上恢复)
  3. 测试这些功能的示例。


/// <summary>
/// Locates the first document for the given namespace in the local.oplog collection
/// </summary>
/// <param name="docNamespace">Namespace to search for</param>
/// <returns>First Document found in the local.oplog collection for the specified namespace</returns>
internal static BsonDocument GetFirstDocumentFromOpLog(string docNamespace)
    mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
    IMongoDatabase localDB = mongoClient.GetDatabase("local");
    var collection = localDB.GetCollection<BsonDocument>("oplog.rs");

    //Find the documents from the specified namespace (DatabaseName.CollectionName), that have an operation type of "insert" (The first entry to a collection must always be an insert)
    var filter = MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonDocument>("{ $and: [ { 'ns': '" + docNamespace + "'}, { 'op': 'i'}] }");

    BsonDocument retDoc = null;
    try //to get the first document from the oplog entries
        retDoc = collection.Find<BsonDocument>(filter).First();
    catch(Exception ex) { /*Logger.WriteException(ex);*/ }
    return retDoc;

/// <summary>
/// Takes a document from the OpLog and generates a ResumeToken
/// </summary>
/// <param name="firstDoc">BsonDocument from the local.oplog collection to base the ResumeToken on</param>
/// <returns>A ResumeToken that can be provided to a collection watch (ChangeStream) that points to the firstDoc provided</returns>
private static BsonDocument GetResumeTokenFromOpLogDoc(BsonDocument firstDoc)
    List<byte> hexVal = new List<byte>(34);

    //Insert Timestamp of document
    hexVal.Add(0x82);   //TimeStamp Tag
    byte[] docTimeStampByteArr = BitConverter.GetBytes(firstDoc["ts"].AsBsonTimestamp.Timestamp); //Timestamp is an integer, so we need to reverse it
    if (BitConverter.IsLittleEndian) { Array.Reverse(docTimeStampByteArr); }

    //Expecting UInt64, so make sure we added 8 bytes (likely only added 4)
    hexVal.AddRange(new byte[] { 0x00, 0x00, 0x00, 0x01 }); //Not sure why the last bytes is a 0x01, but it was present in observed ResumeTokens

    //Unknown Object observed in a ResumeToken
    //0x46 = CType::Object, followed by the string "d_id" NULL
    //This may be something that identifies that the following value is for the "_id" field of the ObjectID given next
    hexVal.AddRange(new byte[] { 0x46, 0x64, 0x5F, 0x69, 0x64, 0x00 }); //Unknown Object, expected to be 32 bits, with a 0x00 terminator

    //Insert OID (from 0._id.ObjectID)
    hexVal.Add(0x64);   //OID Tag
    byte[] docByteArr = firstDoc["o"]["_id"].AsObjectId.ToByteArray();
    hexVal.Add(0x00);   //End of OID

    //Insert UUID (from ui) as BinData
    hexVal.AddRange(new byte[] { 0x5a, 0x10, 0x04 });   //0x5A = BinData, 0x10 is Length (16 bytes), 0x04 is BinDataType (newUUID)

    hexVal.Add(0x04);   //Unknown marker (maybe end of resumeToken since 0x04 == ASCII 'EOT')

    //Package the binary data into a BsonDocument with the key "_data" and the value as a Base64 encoded string
    BsonDocument retDoc = new BsonDocument("_data", new BsonBinaryData(hexVal.ToArray()));
    return retDoc;

/// <summary>
/// Example Code for setting up and resuming to the second doc
/// </summary>
internal static void MonitorChangeStream()
    mongoClient = mongoClient ?? new MongoClient(ConnectionString);  //Create client object if it is null
    IMongoDatabase sandboxDB = mongoClient.GetDatabase("SandboxDB");
    var collection = sandboxDB.GetCollection<BsonDocument>("CollectionToMonitor");

    var options = new ChangeStreamOptions();
    options.FullDocument = ChangeStreamFullDocumentOption.UpdateLookup;

        var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match("{ operationType: { $in: [ 'replace', 'insert', 'update' ] } }");  //Works

        //Build ResumeToken from the first document in the oplog collection
        BsonDocument resumeTokenRefDoc = GetFirstDocumentFromOpLog(collection.CollectionNamespace.ToString());
        if (resumeTokenRefDoc != null)
            BsonDocument docResumeToken = GetResumeTokenFromOpLogDoc(resumeTokenRefDoc);
            options.ResumeAfter = docResumeToken;

        //Setup the ChangeStream/Watch Cursor
        var cursor = collection.Watch(pipeline, options);
        var enumerator = cursor.ToEnumerable().GetEnumerator();

        enumerator.MoveNext();  //Blocks until a record is UPDATEd, REPLACEd or INSERTed in the database (thanks to the pipeline arg), or returns the second entry (thanks to the ResumeToken that points to the first entry)

        ChangeStreamDocument<BsonDocument> lastChangeStreamDocument = enumerator.Current;
        //lastChangeStreamDocument is now pointing to the second entry in the oplog, or the just received entry
        //A loop can be setup to call enumerator.MoveNext() to step through each entry in the oplog history and to also receive new events

        enumerator.Dispose();   //Be sure to dispose of the enumerator when finished.
    catch( Exception ex)



