在我的测试环境中,我有 1 个 Cassandra 节点和 3 个 Spark 节点。我想迭代大约有 200k 行的明显大表,每行大约占用 20-50KB。
CREATE TABLE foo (
uid timeuuid,
events blob,
PRIMARY KEY ((uid))
)
这是在 Spark 集群中执行的 scala 代码
val rdd = sc.cassandraTable("test", "foo")
// This pulls records in memory, taking ~6.3GB
var count = rdd.select("events").count()
// Fails nearly immediately with
// NoHostAvailableException: All host(s) tried for query failed [...]
var events = rdd.select("events").collect()
Cassandra 2.0.9、Spark:1.2.1、Spark-cassandra-connector-1.2.0-alpha2
我试着只跑collect
, 没有count
- 在这种情况下,它很快就会失败NoHostAvailableException
.
问题:一次迭代大表读取和处理小批量行的正确方法是什么?
Cassandra Spark Connector 中有 2 个设置用于调整块大小(将它们放在 SparkConf 对象中):
- Spark.cassandra.input.split.size:每个 Spark 分区的行数(默认 100000)
- Spark.cassandra.input.page.row.size:每个获取页面的行数(即网络往返)(默认1000)
此外,您不应该使用collect
在您的示例中执行操作,因为它将获取驱动程序应用程序内存中的所有行,并可能引发内存不足异常。您可以使用collect
仅当您确定它会产生少量行时才采取行动。这count
动作不同,它只产生一个整数。因此,我建议您像以前一样从 Cassandra 加载数据,处理它,然后存储结果(在 Cassandra、HDFS 等中)。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)