在将 900 万行批量写入 12 节点 cassandra (2.1.2) 集群时,我遇到了 Spark-cassandra-connector (1.0.4、1.1.0) 的问题。我以一致性 ALL 写入并以一致性 1 读取,但每次读取的行数都不同于 900 万(8.865.753、8.753.213 等)。
我检查了连接器的代码,没有发现任何问题。然后,我决定编写自己的应用程序,独立于 Spark 和连接器,来调查问题(唯一的依赖项是 datastax-driver-code 版本 2.1.3)。
完整的代码、启动脚本和配置文件现在可以在github上找到的.
在伪代码中,我编写了两个不同版本的应用程序,即同步版本:
try (Session session = cluster.connect()) {
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
session.execute(bound);
}
}
还有异步的:
try (Session session = cluster.connect()) {
List<ResultSetFuture> futures = new LinkedList<ResultSetFuture>();
String cql = "insert into <<a table with 9 normal fields and 2 collections>>";
PreparedStatement pstm = session.prepare(cql);
for(String partitionKey : keySource) {
// keySource is an Iterable<String> of partition keys
while(futures.size()>=10 /* Max 10 concurrent writes */) {
// Wait for the first issued write to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
BoundStatement bound = pstm.bind(partitionKey /*, << plus the other parameters >> */);
bound.setConsistencyLevel(ConsistencyLevel.ALL);
futures.add(session.executeAsync(bound));
}
while(futures.size()>0) {
// Wait for the other write requests to terminate
ResultSetFuture future = futures.get(0);
future.get();
futures.remove(0);
}
}
最后一种与连接器在非批量配置情况下使用的类似。
应用程序的两个版本在所有情况下都工作相同,负载较高时除外。
例如,当在 9 台机器(45 个线程)上运行具有 5 个线程的同步版本时,向集群写入 900 万行,我在后续读取中找到了所有行(使用 Spark-cassandra-connector)。
如果我运行每台机器 1 个线程(9 个线程)的异步版本,执行速度会快得多,但我无法在后续读取中找到所有行(与 Spark-cassandra-connector 出现的问题相同)。
代码在执行过程中没有抛出异常。
问题的原因可能是什么?
我添加了一些其他结果(感谢您的评论):
- 9 台机器上有 9 个线程的异步版本,每个线程有 5 个并发写入器(45 个并发写入器):没有问题
- 9 台机器上 90 个线程的同步版本(每个 JVM 实例 10 个线程):没有问题
异步写入和并发写入器数量 > 45 且
- 将 ResultSetFuture 的“get”方法替换为
“getUninterruptible”:同样的问题。
- 异步版本,9 台机器上 18 个线程,5 个并发
每个线程的写入者(90 个并发写入者):没有问题.
最后的发现表明,大量的并发写入者 (90) 并不像第一次测试中预期的那样是一个问题。问题是使用同一会话进行大量异步写入。
如果同一会话上有 5 个并发异步写入,则不存在该问题。如果我将并发写入数量增加到 10,某些操作会在没有通知的情况下丢失。
如果您在同一会话上同时发出多个 (>5) 写入,Cassandra 2.1.2(或 Cassandra Java 驱动程序)中的异步写入似乎会被破坏。