使用 slick/scala 进行流式传输

2024-04-09

我正在研究 scala/slick 流,并试图了解它是如何工作的。这是我的测试代码

    val bigdata = TableQuery[BigData] 
    val x = db.stream(bigdata.result.transactionally.withStatementParameters(fetchSize = 100)).foreach {
      (tuple: (Int, UUID)) =>
        println(tuple._1 + " " + tuple._2)
        Thread.sleep(50)//emulating slow consumer.
    }

    Await.result(x, 100000 seconds)

当代码运行时,我启用了 postgresql 查询日志来了解幕后发生的事情。我发现每 100 个元素就会发生一次重新查询

2015-11-06 15:03:24 IST [24379-3] postgres@scala_test 日志:从 S_2/C_3 执行获取:从“bigdata”x2 选择 x2."id", x2."data" 2015-11-06 15:03:29 IST [24379-4] postgres@scala_test 日志:执行

fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:34 IST [24379-5] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:39 IST [24379-6] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:44 IST [24379-7] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2
2015-11-06 15:03:49 IST [24379-8] postgres@scala_test LOG:  execute fetch from S_2/C_3: select x2."id", x2."data" from "bigdata" x2

然而,它看起来像是在获取整个数据集。我期待一个带有偏移量的查询。

ie SELECT * FROM bigdata  LIMIT 100 OFFSET 500

看起来一切都被查询并且发送数据是部分发送的。

然后,当上面的流正在运行时,我将新的数据集插入到同一个表中。

串流之前

SELECT count(*) FROM bigdata -> 500

然后插入几行

SELECT count(*) FROM bigdata  -> 700

但流在 500 处停止。这似乎表明新数据从未被提取和流回。关于流媒体如何在 slick 中工作的任何想法。


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 slick/scala 进行流式传输 的相关文章

  • Scala 如何将 Map 转换为元组的可变参数?

    在 Scala Play 2 2 x 测试的背景下 我有一个Map String String 我需要将其传递给接受的函数 String String 即一个可变参数 String String tuple e g val data Map
  • 如何在 Apache Spark 中基于列的子集实现“ except ”?

    我正在 Spark 中使用两个模式 table1 and table2 scala gt table1 printSchema root user id long nullable true item id long nullable tr
  • 压缩 HList 的函数的推断类型

    谢谢https github com milessabin shapeless wiki Feature overview shapeless 2 0 0 https github com milessabin shapeless wiki
  • 结构化 scala 案例类的自定义 json 序列化

    我有一些用于往返 scala 案例类的工作 jackson scala 模块代码 Jackson 对于平面案例类非常有用 但是当我制作一个包含其他案例类列表的案例时 我似乎需要很多代码 考虑 abstract class Message c
  • 案例类和案例对象之间的区别?

    我正在学习 Scala 和 Akka 并且在最近的查找中solution https stackoverflow com questions 22770927 waiting for multiple results in akka 我发现
  • Scala SBT 和 JNI 库

    我正在编写一个简单的应用程序Scala通过以下方式使用 leveldb 数据库leveldbjni图书馆 我的build sbt文件看起来像这样 name Whatever version 1 0 scalaVersion 2 10 2 l
  • Twitter Future 与 Scala Future 相比有何优势?

    我知道 Scala Future 变得更好的很多原因 有什么理由改用 Twitter Future 吗 除了 Finagle 使用它这一事实之外 免责声明 我在 Twitter 负责 Future 的实施 一点背景知识 在 Scala 有一
  • Spark Scala:按小时或分钟计算两列的 DateDiff

    我在数据框中有两个时间戳列 我想获取它们的分钟差异 或者小时差异 目前我可以通过四舍五入获得日差 val df2 df1 withColumn time datediff df1 ts1 df1 ts2 但是 当我查看文档页面时https
  • Scala repl 抛出错误

    当我打字时scala在终端上启动 repl 它会抛出此错误 scala gt init error error while loading AnnotatedElement class file usr lib jvm java 8 ora
  • 用惯用的 Scala 更新大型数据结构

    我已经尝试 Scala 一段时间了 并且经常遇到支持不可变数据结构的建议 但是当你有一个像这样的数据结构时3D 场景图 大型神经网络或任何具有大量需要频繁更新的对象的东西 对场景中的对象进行动画处理 训练神经网络 这似乎是 运行时效率极低
  • Scala REPL 中的递归重载语义 - JVM 语言

    使用 Scala 的命令行 REPL def foo x Int Unit def foo x String Unit println foo 2 gives error type mismatch found Int 2 required
  • 如何以最佳方式传递元组参数?

    如何以最佳方式传递元组参数 Example def foo Int Int def bar a Int b Int 现在我想传递的输出foo to bar 这可以通过以下方式实现 val fooResult foo bar fooResul
  • 如何将 scala 列表转换为 javascript 数组?

    有更简单的方法吗 document ready function var jsArray if scalaList null for id lt scalaList jsArray push id 很简单 如下所示 import play
  • 在 Akka 中配置嵌套 Router

    我有一些嵌套的路由器 应创建它FromConfig 我想要的是这样的 test akka actor deployment worker router round robin nr of instances 5 slave router b
  • 如何在 Apache Spark 中通过 DStream 使用特征提取

    我有通过 DStream 从 Kafka 到达的数据 我想进行特征提取以获得一些关键词 我不想等待所有数据的到达 因为它是可能永远不会结束的连续流 所以我希望以块的形式执行提取 如果准确性会受到一点影响 对我来说并不重要 到目前为止 我整理
  • Scala 宏的位置怎么了?

    我试图获取宏参数的原始输入字符串 但返回的位置似乎有点偏离 考虑这个宏 例如 object M import scala reflect macros Context import language experimental macros
  • HashPartitioner 是如何工作的?

    我阅读了文档HashPartitioner http spark apache org docs 1 3 1 api java index html org apache spark HashPartitioner html 不幸的是 除了
  • 模拟 BlazeClientBuilder[IO] 以返回模拟客户端[IO]

    我正在使用BlazeClientBuilder IO resource方法得到Client IO 现在 我想模拟客户端进行单元测试 但不知道该怎么做 有没有一个好的方法来嘲笑这个 我会怎么做 class ExternalCall val r
  • Spark SQL 失败,因为“常量池已超过 JVM 限制 0xFFFF”

    我在 EMR 4 6 0 Spark 1 6 1 上运行此代码 val sqlContext SQLContext getOrCreate sc val inputRDD sqlContext read json input try inp
  • 规范化且不可变的数据模型

    Haskell如何解决 规范化不可变数据结构 问题 例如 让我们考虑一个表示前女友 男友的数据结构 data Man Man name String exes Woman data Woman Woman name String exes

随机推荐

  • 在android中使用Google Drive api获取在Google Drive上创建的文件的大小

    我创建了应用程序 使用该应用程序用户将能够从 Google 驱动器获取列表中的所有文件并能够下载它 现在 用户可以下载已上传到驱动器的文件 并带有进度计数 但我无法获取在 Goolge 驱动器上创建的文件的大小 例如文档 演示文稿 电子表格
  • Android UI 测试期间“未找到测试”

    如果我有时想通过右键单击测试然后选择运行来运行单个测试 测试结果将显示 未找到测试 并显示与 线程 main java lang NoClassDefFoundError 中的异常 相关的错误 我发现这种情况只发生过几次 为什么会发生这种情
  • C++迭代器和反向迭代器

    我正在写一个iterator 实际上是const iterator对于我当前的对象 我还想创建一个reverse const iterator also 我环顾四周 想看看如何做到这一点 然后我偶然发现this http www cplus
  • 在jsp页面中使用log4j的正确方法是什么

    我的意思是 我希望记录器名称反映 source jsp 文件 无论它是否包含在另一个文件中或编译为类或其他文件 首先 导入所需的包 即 then the jsppagename jsp根据您使用的服务器 可能会发生变化 然后 在 jsp 内
  • GZip 算法如何工作? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 GZip 文件压缩算法如何工作 如果有人有这方面的任何文档 我想阅读它 如果您仍在寻找更详细的概述 我已经在以下位置编写了 gzip de
  • Django 对象.update_or_create

    我有一个在 celery 中运行的 period task 来查询最新的加密货币价格 但由于某种原因 每次想要显示数据时 我没有得到更新的记录 我只是得到新的记录 而旧的记录由于某种原因被保留 tasks py periodic task
  • NavigationView如何处理动态标题内容

    我有一个非常标准的 NavigationView 当我在标题中使用静态布局 如下所示 时 效果非常好
  • 约束布局 - 具有最大宽度的两个视图

    我想创建一个布局 使用约束布局 如下所示 在不同的语言中 Button1 可能比 Button2 大 我怎样才能做到这一点 我只能在包含两个按钮的约束内使用 LinearLayout 来实现此目的 但我尝试仅使用布局 Thanks Upda
  • 如果主体参数以“@”开头,则发出 PowerShell POST 请求

    我想在 PowerShell 中发出 POST 请求 以下是 Postman 中的正文详细信息 type login username email protected cdn cgi l email protection password
  • 生成数字数组中有效的数字组合

    我正在尝试从数字数组中生成所有有效的数字组合 假设我们有以下内容 let arr 1 2 9 4 7 我们需要输出这样的内容 1 2 9 4 7 1 2 9 47 1 2 94 7 1 2 947 1 29 4 7 1 29 47 1 29
  • 我无法在我的 Visual C Express Edition 2008 中汇编电影 (MMX) 指令

    当我尝试编译时movd指令显示错误为 error A2085 instruction or register not accepted in current CPU mode 我的代码如下 386 model flat c code add
  • 我怎样才能让我的verilog移位器更通用?

    这里我有一个移位器 但现在它最多只能工作 3 位 我一直在寻找 但不知道如何让它工作最多 8 位 module shifter a b out input 7 0 a b output 7 0 out wire 7 0 out1 out2
  • 扩展 Eloquent 的类的构造函数

    我刚刚启动了一个新网站 我想使用 Eloquent 在为数据库播种的过程中 我注意到 如果我在扩展 eloquent 的模型上包含任何类型的构造函数 则会添加空行 例如 运行此播种器
  • 如何解决三向多态关联?

    首先我要说我正在使用 MySQL 不是事务型 并且这是无法更改的 另外 为了简洁和清晰起见 我简化了此处的表格 在此示例中 课程 由其内部属性和外部属性及其自己的属性 阅读 组成 阅读 有其自己的关键依赖属性和三个不同的外部属性 阅读源 我
  • 如何在 SQL Server 2008 中存储特定列的列值?

    基本上我正在映射字段 正如你所看到的GridView 2我选择了特定的列名称 让我们考虑第一条记录 即1 id Column0 For id我已选择Column0 所以在数据库中我想在 id 列下存储列值 1 2 3 4 像下面这样 id
  • 使用属性和不访问 ivars 之间的区别

    使用属性或直接访问 ivars 的特定性能和行为差异 对于全局变量 使用它有什么区别 interface myClass UIImageView myView void loadView super loadView myView UIIm
  • 如何动态添加 mixin 作为基类而不出现 MRO 错误?

    说我有课A B and C Class A and B都是 Class 的 mixin 类C class A object pass class B object pass class C object A B pass 这在实例化 C 类
  • 在angularjs中克隆html元素

    我正在尝试在 angularjs 中实现拖放系统 我希望在拖动开始时克隆拖动的对象 但是我不知道如何在 angularjs 中克隆元素及其范围和链接控制器 有什么建议么 不建议使用 Angular 来克隆 DOM 元素 通常是通过拖放完成的
  • 如何比较两个 pandas 数据帧并删除一个文件上的重复项而不附加其他文件中的数据[重复]

    这个问题在这里已经有答案了 我正在尝试使用 pandas 数据框比较两个 csv 文件 其中一个是每天都会附加数据的主表 test master csv 第二个是每日报告 test daily csv 其中包含我想要附加到 test mas
  • 使用 slick/scala 进行流式传输

    我正在研究 scala slick 流 并试图了解它是如何工作的 这是我的测试代码 val bigdata TableQuery BigData val x db stream bigdata result transactionally