对 DStream 进行类型参数化

2024-02-28

Can a DStream have type parameters?

如果是,怎么办?

当我尝试时lazy val qwe = mStream.mapWithState(stateSpec) on myDStream: DStream[(A, B)](类参数),我得到:

value mapWithState is not a member of org.apache.spark.streaming.dstream.DStream[(A, B)]
    lazy val qwe = mStream.mapWithState(stateSpec)

Spark API 的大量子集需要隐式ClassTags (see Scala:什么是 TypeTag 以及如何使用它? https://stackoverflow.com/q/12218641/6910411) and PairDStreamFunctions.mapWithState没有什么不同。查看类定义 https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L37-L38:

class PairDStreamFunctions[K, V](self: DStream[(K, V)])
  (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])

and https://github.com/apache/spark/blob/f830bb9170f6b853565d9dd30ca7418b93a54fe3/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L380-L381:

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType]
  ): MapWithStateDStream[K, V, StateType, MappedType] = {
  ...
}

如果想创建一个对通用对流进行操作并使用的函数mapWithState你至少应该提供ClassTags for KeyType and ValueType types:

def foo[T : ClassTag, U : ClassTag](
  stream: DStream[(T, U)], f: StateSpec[T, U, Int, Int]) = stream.mapWithState(f)

If StateType and MappedType也被参数化了,你需要ClassTags对于这些也:

def bar[T : ClassTag, U : ClassTag, V : ClassTag,  W : ClassTag](
  stream: DStream[(T, U)], f: StateSpec[T, U, V, W]) = stream.mapWithState(f)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

对 DStream 进行类型参数化 的相关文章

  • kafka消费端Offsets的一致性

    我有复制因子为 3 的卡夫卡主题min insync replicas 2 一个向该主题发送 X 条消息的生产者acks all 一段时间后 1 分钟内 在所有消息发送到主题后 将使用 java kafka 客户端为此主题创建新的消费者 使
  • ';'预期但发现“导入” - Scala 和 Spark

    我正在尝试使用 Spark 和 Scala 来编译一个独立的应用程序 我不知道为什么会收到此错误 topicModel scala 2 expected but import found error import org apache sp
  • Spark:出现心跳错误后丢失数据

    我有一个在 Spark 集群上运行的 Python 程序 有四个工作线程 它处理一个包含大约 1500 万条记录的巨大 Oracle 表 检查结果后发现大约有600万条记录没有插入 我的写入功能如下 df write format jdbc
  • 使用 pyspark 计算所有可能的单词对

    我有一个文本文档 我需要找到整个文档中重复单词对的可能数量 例如 我有下面的word文档 该文档有两行 每行用 分隔 文档 My name is Sam My name is Sam My name is Sam My name is Sa
  • 通过过滤对 Pyspark Dataframe 进行分组

    我有一个数据框如下 cust id req req met 1 r1 1 1 r2 0 1 r2 1 2 r1 1 3 r1 1 3 r2 1 4 r1 0 5 r1 1 5 r2 0 5 r1 1 我必须观察客户 看看他们有多少要求 看看
  • 如何以最佳方式传递元组参数?

    如何以最佳方式传递元组参数 Example def foo Int Int def bar a Int b Int 现在我想传递的输出foo to bar 这可以通过以下方式实现 val fooResult foo bar fooResul
  • 在 Akka/Scala 中使用带有 future 的 mapTo

    我最近开始使用 Akka Scala 编码 遇到了以下问题 通过范围内的隐式转换 例如 implicit def convertTypeAtoTypeX a TypeA TypeX TypeX just some kinda convers
  • 在 Akka 中配置嵌套 Router

    我有一些嵌套的路由器 应创建它FromConfig 我想要的是这样的 test akka actor deployment worker router round robin nr of instances 5 slave router b
  • 如何在 Apache Spark 中通过 DStream 使用特征提取

    我有通过 DStream 从 Kafka 到达的数据 我想进行特征提取以获得一些关键词 我不想等待所有数据的到达 因为它是可能永远不会结束的连续流 所以我希望以块的形式执行提取 如果准确性会受到一点影响 对我来说并不重要 到目前为止 我整理
  • 具有两个通用参数的上下文边界

    在 Scala 中 我可以使用上下文边界 def sort T Ordered t Seq T 与以下意思相同 def sort T t Seq T implicit def Ordered T 如果我有一个带有两个泛型参数的类怎么办 IE
  • HashPartitioner 是如何工作的?

    我阅读了文档HashPartitioner http spark apache org docs 1 3 1 api java index html org apache spark HashPartitioner html 不幸的是 除了
  • Spark问题中读取大文件 - python

    我已经使用 python 在本地安装了 Spark 并在运行以下代码时 data sc textFile C Users xxxx Desktop train csv data first 我收到以下错误 Py4JJavaError Tra
  • 可选择将项目添加到 Scala 映射

    我正在寻找这个问题的惯用解决方案 我正在构建一个valScala 不可变 Map 并希望有选择地添加一项或多项 val aMap Map key1 gt value1 key2 gt value2 if condition key3 gt
  • 具有上限的联合类型

    我正在遵循这个问题的公认答案中提出的技术如何定义 类型析取 联合类型 https stackoverflow com questions 3508077 does scala have type disjunction union type
  • Spark:如何使用crossJoin

    我有两个数据框 df1有 100000 行并且df2有 10000 行 我想创建一个df3这是两者的交叉连接 val df3 df1 crossJoin df2 这将产生 10 亿行 尝试在本地运行它 但似乎需要很长时间 您认为本地可以实现
  • Play框架:单属性案例类的JSON读取

    我正在尝试为包含单个属性的案例类创建隐式 JSON Reads 但收到错误 Reads Nothing 不符合预期类型 这是代码 import play api libs functional syntax import play api
  • 使用 Scala 获取 Spark 数据集中最新时间戳对应的行

    我对 Spark 和 Scala 比较陌生 我有一个具有以下格式的数据框 Col1 Col2 Col3 Col 4 Col 5 Col TS Col 7 1234 AAAA 1111 afsdf ewqre 1970 01 01 00 00
  • 如何将 Pyspark Dataframe 标题设置到另一行?

    我有一个如下所示的数据框 col1 col2 col3 id name val 1 a01 X 2 a02 Y 我需要从中创建一个新的数据框 使用 row 1 作为新的列标题并忽略或删除 col1 col2 等行 新表应如下所示 id na
  • scala中的反引号有什么用[重复]

    这个问题在这里已经有答案了 我在一本书上找到了以下代码 val list List 5 4 3 2 1 val result 0 list running total next element running total next elem
  • 解决“Show”类型类实例的隐式问题

    我正在努力使Gender实施Show类型类 scala gt trait Gender extends Show Gender defined trait Gender scala gt case object Male extends G

随机推荐