我正在尝试弄清楚如何使用流畅的流媒体。我使用 slick 3.0.0 和 postgres 驱动程序
情况如下:服务器必须向客户端提供按大小(以字节为单位)限制的数据块分割的数据序列。因此,我编写了以下巧妙的查询:
val sequences = TableQuery[Sequences]
def find(userId: Long, timestamp: Long) = sequences.filter(s ⇒ s.userId === userId && s.timestamp > timestamp).sortBy(_.timestamp.asc).result
val seq = db.stream(find(0L, 0L))
我将 seq 与 akka-streams 结合起来Source
,写自定义PushPullStage
,限制数据的大小(以字节为单位)并在达到大小限制时完成上游。它工作得很好。问题是 - 当我查看 postgres 日志时,我看到这样的查询select * from sequences where user_id = 0 and timestamp > 0 order by timestamp;
因此,乍一看,似乎正在进行大量(且不必要的)数据库查询,只是在每个查询中使用几个字节。使用 Slick 进行流式传输以最大程度地减少数据库查询并充分利用每个查询中传输的数据的正确方法是什么?
使用 Slick 和 Postgres 进行流式传输的“正确方法”包括三件事:
必须使用 db.stream()
必须禁用autoCommit
在 JDBC 驱动程序中。一种方法是通过后缀使查询在事务中运行.transactionally
.
必须设置fetchSize
必须是 0 以外的值,否则 postgres 会将整个结果集一次性推送到客户端。
Ex:
DB.stream(
find(0L, 0L)
.transactionally
.withStatementParameters(fetchSize = 1000)
).foreach(println)
有用的链接:
https://github.com/slick/slick/issues/1038 https://github.com/slick/slick/issues/1038
https://github.com/slick/slick/issues/809 https://github.com/slick/slick/issues/809
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)