我正在研究 scala/slick 流,并试图了解它是如何工作的。这是我的测试代码
val bigdata = TableQuery[BigData]
val x = db.stream(bigdata.result.transactionally.withStatementParameters(fetchSize = 100)).foreach {
(tuple: (Int, UUID)) =>
println(tuple._1 + " " + tuple._2)
Thread.sleep(50)//emulating slow consumer.
}
Await.result(x, 100000 seconds)
当代码运行时,我启用了 postgresql 查询日志来了解幕后发生的事情。我发现每 100 个元素就会发生一次重新查询
2015-11-06 15:03:24 IST [24379-3] postgres@scala_test 日志:从 S_2/C_3 执行获取:从“bigdata”x2 选择 x2."id", x2."data"
2015-11-06 15:03:29 IST [24379-4] postgres@scala_test 日志:执行
fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:34 IST [24379-5] postgres@scala_test LOG: execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:39 IST [24379-6] postgres@scala_test LOG: execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:44 IST [24379-7] postgres@scala_test LOG: execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:49 IST [24379-8] postgres@scala_test LOG: execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
然而,它看起来像是在获取整个数据集。我期待一个带有偏移量的查询。
ie SELECT * FROM bigdata LIMIT 100 OFFSET 500
看起来一切都被查询并且发送数据是部分发送的。
然后,当上面的流正在运行时,我将新的数据集插入到同一个表中。
串流之前
SELECT count(*) FROM bigdata -> 500
然后插入几行
SELECT count(*) FROM bigdata -> 700
但流在 500 处停止。这似乎表明新数据从未被提取和流回。关于流媒体如何在 slick 中工作的任何想法。