Cassandra 中的异步写入似乎被破坏

2023-11-24

在将 900 万行批量写入 12 节点 cassandra (2.1.2) 集群时,我遇到了 Spark-cassandra-connector (1.0.4、1.1.0) 的问题。我以一致性 ALL 写入并以一致性 1 读取,但每次读取的行数都不同于 900 万(8.865.753、8.753.213 等)。

我检查了连接器的代码,没有发现任何问题。然后,我决定编写自己的应用程序,独立于 Spark 和连接器,来调查问题(唯一的依赖项是 datastax-driver-code 版本 2.1.3)。

完整的代码、启动脚本和配置文件现在可以在github上找到的.

在伪代码中,我编写了两个不同版本的应用程序,即同步版本:

try (Session session = cluster.connect()) {

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for(String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        session.execute(bound);
    }

}

还有异步的:

try (Session session = cluster.connect()) {

    List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();

    String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
    PreparedStatement pstm = session.prepare(cql);

    for(String partitionKey : keySource) {
        // keySource is an Iterable<String> of partition keys

        while(futures.size()>=10 /* Max 10 concurrent writes */) {
            // Wait for the first issued write to terminate
            ResultSetFuture future = futures.get(0);
            future.get();
            futures.remove(0);
        }

        BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
        bound.setConsistencyLevel(ConsistencyLevel.ALL);

        futures.add(session.executeAsync(bound));
    }

    while(futures.size()>0) {
        // Wait for the other write requests to terminate
        ResultSetFuture future = futures.get(0);
        future.get();
        futures.remove(0);
    }
}

最后一种与连接器在非批量配置情况下使用的类似。

应用程序的两个版本在所有情况下都工作相同,负载较高时除外。

例如,当在 9 台机器(45 个线程)上运行具有 5 个线程的同步版本时,向集群写入 900 万行,我在后续读取中找到了所有行(使用 Spark-cassandra-connector)。

如果我运行每台机器 1 个线程(9 个线程)的异步版本,执行速度会快得多,但我无法在后续读取中找到所有行(与 Spark-cassandra-connector 出现的问题相同)。

代码在执行过程中没有抛出异常。

问题的原因可能是什么?

我添加了一些其他结果(感谢您的评论):

  • 9 台机器上有 9 个线程的异步版本,每个线程有 5 个并发写入器(45 个并发写入器):没有问题
  • 9 台机器上 90 个线程的同步版本(每个 JVM 实例 10 个线程):没有问题

异步写入和并发写入器数量 > 45 且

  • 将 ResultSetFuture 的“get”方法替换为 “getUninterruptible”:同样的问题。
  • 异步版本,9 台机器上 18 个线程,5 个并发 每个线程的写入者(90 个并发写入者):没有问题.

最后的发现表明,大量的并发写入者 (90) 并不像第一次测试中预期的那样是一个问题。问题是使用同一会话进行大量异步写入。

如果同一会话上有 5 个并发异步写入,则不存在该问题。如果我将并发写入数量增加到 10,某些操作会在没有通知的情况下丢失。

如果您在同一会话上同时发出多个 (>5) 写入,Cassandra 2.1.2(或 Cassandra Java 驱动程序)中的异步写入似乎会被破坏。


尼古拉和我这个周末通过电子邮件进行了交流,并认为我应该在这里提供我当前理论的更新。我看了一下github项目Nicola 分享并试验了 EC2 上的 8 节点集群。

我能够使用 2.1.2 重现该问题,但确实观察到在一段​​时间后我可以重新执行 Spark 作业并返回所有 900 万行。

我似乎注意到,当节点处于压缩状态时,我并没有获得全部 900 万行。一时兴起我看了一眼2.1 的更改日志并观察到一个问题CASSANDRA-8429 - “压缩期间某些键无法读取”这或许可以解释这个问题。

看到问题已在 2.1.3 中得到解决,我重新运行了针对 cassandra-2.1 分支的测试,并在压缩活动发生时运行了计数作业,并返回了 900 万行。

我想对此进行更多实验,因为我对 cassandra-2.1 分支的测试相当有限,并且压缩活动可能纯粹是巧合,但我希望这可以解释这些问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Cassandra 中的异步写入似乎被破坏 的相关文章

随机推荐