SparkStreaming从kafka消费数据

2023-11-04

val spark = SparkSession.builder().master("local[*]").appName("myKafka"),getOrCreate()
//5秒一个窗口
val ssc=new StreamingContext(spark.sparkContext,Seconds(5))
val kafkaParams = Map(
ConsumerConfig.BOOTSTRAP_SERVER_CONFIG->"192.168.30.182:9092",  //写自己的kafka所在虚拟机ip
ConsumerConfig.GROUP_ID_CONFIG->"cmoo1",    //消费者组
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer].getName,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer].getName,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG->"earliest"
)
//从Kafka消费数据
val ds=KafkaUtils.createDirectStream(ssc,LocationStrategies.preferConsistent,
ConsumerStrategies.Subscribe[String,String](Set("mydemo"),kafkaParams)
ds.mapPartitions(itercr=>{
val lb =ListBuffer[String]()
itercr.foreach(cr=>lb.append(cr.value()))
lb.iterator
}).foreachRDD(line=>println(line.collect().mkString("\n")))
ssc.start()
ssc.awaitTermination()

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SparkStreaming从kafka消费数据 的相关文章

  • 如何查看Pocketsphinx词典中是否存在该单词?

    我只是想看看字典文件中是否存在字符串 字典文件位于问题底部 我想检查语音识别器是否可以识别单词 例如 识别器将无法识别字符串ahdfojakdlfafiop 因为字典中没有定义 所以 我可以检查某个单词是否在 pocktsphinx 词典中
  • HTTP 状态 404 - 请求的资源不可用

    在使用 MyEclipse IDE 中的 Tomcat 服务器和 Struts 2 框架时 我遇到了反复出现的问题 我将我的程序作为服务器应用程序运行 当它运行时 默认的index jsp 文件将成功打开 但应用程序的其他过去都不起作用 当
  • java中的csv到pdf文件

    我正在尝试获得一个csv文件解析为pdf 到目前为止我所拥有的内容附在下面 我的问题是这段代码最终出现在 pdf 中的文件在 csv 文件的第一行被截断 我不明白为什么 附示例 本质上我想要一个没有任何操作的 csv 文件的 pdf 版本
  • 由于连接超时,无法通过 ImageIO.read(url) 获取图像

    下面的代码似乎总是失败 URL url new URL http userserve ak last fm serve 126 8636005 jpg Image img ImageIO read url System out printl
  • JTree 节点不会被直观地选择

    不知何故 我无法为我的 JTree 节点启用 选择突出显示 我正在我的项目中使用自定义单元格渲染器 这很可能导致此问题 这是完整的渲染器类代码 protected class ProfessionTreeCellRenderer exten
  • 如何对 IntStream 进行逆序排序

    我正在使用 txt 文件读取数字BufferedReader 我想颠倒该流中元素的顺序 以便在收集它们时 它们将从最高到最低排列 我不想在构建数组后进行排序 因为我不知道其中可能有多少元素 我只需要最高的 N 个元素 in new Buff
  • 使用 Spring 时实例化对象,用于测试与生产

    使用 Spring 时 应该使用 Spring 配置 xml 来实例化生产对象 并在测试时直接实例化对象 这样的理解是否正确 Eg MyMain java package org world hello import org springf
  • Spring Stomp over Websocket:流式传输大文件

    我的SockJs客户端在网页中 发送帧大小为16K的消息 消息大小限制决定了我可以传输的文件的最大大小 以下是我在文档中找到的内容 Configure the maximum size for an incoming sub protoco
  • 服务器到 Firebase HTTP POST 结果为响应消息 200

    使用 Java 代码 向下滚动查看 我使用 FCM 向我的 Android 发送通知消息 当提供正确的服务器密钥令牌时 我收到如下所示的响应消息 之后从 FCM 收到以下响应消息 Response 200 Success Message m
  • 使用 Java 在浏览器中下载 CSV 文件

    我正在尝试在 Web 应用程序上添加一个按钮 单击该按钮会下载一个 CSV 文件 该文件很小 大小仅约 4KB 我已经制作了按钮并附加了一个侦听器 文件也准备好了 我现在唯一需要做的就是创建单击按钮时下载 csv 文件的实际事件 假设 fi
  • 所有junit测试后的清理

    在我的项目中 我必须在所有测试之前进行一些存储库设置 这是使用一些棘手的静态规则来完成的 然而 在所有测试之后我不知道如何进行清理 我不想保留一些神奇的静态数字来引用所有测试方法的数量 我应该一直维护它 最受赞赏的方法是添加一些侦听器 该侦
  • 想要开发像 Facebook 这样的网站 - 处理数百万个请求 - 高性能 [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我想用 Java 开发一个像 Fac
  • tomcat 过滤所有 web 应用程序

    问题 我想对所有网络应用程序进行过滤 我创建了一个过滤器来监视对 apache tomcat 服务器的请求 举例来说 它称为 MyFilter 我在 netbeans 中创建了它 它创建了 2 个独立的目录 webpages contain
  • 在 Selenium WebDriver 上如何从 Span 标签获取文本

    在 Selenium Webdriver 上 如何从 span 标记检索文本并打印 我需要提取文本UPS Overnight Free HTML代码如下 div id customSelect 3 class select wrapper
  • 如何在android sdk上使用PowerMock

    我想为我的 android 项目编写一些单元测试和仪器测试 然而 我遇到了一个困扰我一段时间的问题 我需要模拟静态方法并伪造返回值来测试项目 经过一些论坛的调查 唯一的方法是使用PowerMock来模拟静态方法 这是我的 gradle 的一
  • struts 教程或示例

    我正在尝试在 Struts 中制作一个登录页面 这个想法是验证用户是否存在等 然后如果有错误 则返回到登录页面 错误显示为红色 典型的登录或任何表单页面验证 我想知道是否有人知道 Struts 中的错误管理教程 我正在专门寻找有关的教程 或
  • 重写Object类的finalize()方法有什么用?

    据我所知 在java中如果我们想手动调用垃圾收集器 我们可以执行System gc 1 我们在重写的finalize 方法中做了哪些操作 2 如果我们想手动调用JVM垃圾收集器 是否需要重写finalize 方法 我们在重写的 Finali
  • Java 编码风格、局部变量与重复方法调用

    我更喜欢使用局部变量而不是多次调用同一方法 I prefer this Vehicle vehicle person getVehicle if vehicle instanceof Car Car car Car vehicle car
  • MongoDB Java 驱动程序:MongoCore 驱动程序与 MongoDB 驱动程序与 MongoDB 异步驱动程序

    MongoDB Java 驱动程序有三种不同的驱动程序选项 核心驱动 MongoDB 驱动程序 MongoDB 异步驱动程序 The 驱动程序描述页面 https docs mongodb org ecosystem drivers jav
  • Java 推断泛型类型

    我正在寻找类似的推断捕获泛型类型的概念 类似于以下方法片段 但不是捕获泛型类型的类 public

随机推荐

  • 框架分析(4)-Spring

    框架分析 4 Spring 专栏介绍 Spring 核心特点 控制反转 IoC 面向切面编程 AOP 组件化 集成 简化开发 总结 优缺点 优点 高度可扩展 控制反转 IoC 面向切面编程 AOP 集成支持 轻量级 测试友好 社区活跃 缺点
  • props和state的区别

    区别 1 props是传递给组件的 类似于函数的形参 而state是在组件内部被组件自己管理的 类似于在一个函数内声明的变量 2 props是不可以被修改的 state是多变的 可被修改的 开发react组件 最常用到的两个引起组件渲染的可
  • springboot 日志配置

    logback xml与logback spring xml 配置文件的加载顺序 logback xml gt application properties gt logback spring xml 如果同时存在logback xml和l
  • C#字符串数值前加0将1转化成01

    定义两个数值字符串 string str1 1 string str2 01 在我们的主观感受里这两个在进行数值比较时都是1 应该是等价的 但进行字符比对时则不尽然 转化处理 str1 Convert ToDouble str1 ToStr
  • oracle下载页面改版后的 JDK 下载、安装及其环境配置

    oracle下载页面改版后的 JDK 下载 安装及其环境配置 时间 2020 06 10 下载链接 https www oracle com java technologies javase downloads html 变量名 JAVA
  • Onnx推理框架

    Onnx推理框架 ONNX 即 Open Neural Network Exchange 当我们使用Pytorch或者TensorFlow训练完成后 通常会将其模型转化为ONNX模型 ONNX模型一般用于中间部署阶段 然后再拿转化后的ONN
  • Java的快速排序代码

    public static void quickSort int arr int start int end if start lt end int partitionIndex partition arr start end quickS
  • ETL学习心得:探求数据仓库关键环节ETL的本质

    原文链接 http hi baidu com horsewhite blog item b167f81f6924ef0a304e15a0 html 做数据仓库系统 ETL是关键的一环 说大了 ETL是数据整合解决方案 说小了 就是倒数据的工
  • spring系统架构

    一 ioc控制反转 业务层每次都要new一个新的对象来实现对实例的创建 耦合度偏高 使用IOC的最终目的是解除多个类之间的耦合性 降低代码的耦合度 IOC核心概念 ioc容器 spring容器 负责对象的创建 初始化等一系列工作 在ioc容
  • 【读书笔记】游戏开发原理

    游戏开发原理读书笔记 Contents 游戏开发原理读书笔记 一 游戏与游戏设计 1 游戏类型与平台 1 1 类型和子类型 1 2 出品类型 1 3 平台 1 4 图形类型 1 5 交付方式 1 6 视角 2 video game剖析 2
  • Unity3D之ForceMode模式

    ForceMode是一种在物理引擎中使用的模式 用于模拟对象之间的力和运动 它常用于游戏开发 虚拟现实和机器人学等领域 ForceMode通常应用于刚体 Rigidbody 对象 通过施加力来影响物体的运动 它提供了不同的模式 可以根据需求
  • gitee使用教程--初学者【超易懂】

    将本地文件上传至git 1 进入gitee官网 注册并登录 工作台 Gitee com 2 点击右上角加号 选择 新建仓库 3 给自己的仓库添加名称与项目描述 4 创建好之后会看到自己的仓库地址 5 在本地新建一个文件夹用来存放需要上传到g
  • 常量和标识符

    常量 常量是固定值 在程序执行期间不会改变 这些固定的值 又叫做字面量 常量可以是任何的基本数据类型 可分为整型数字 浮点数字 字符 字符串和布尔值 常量就像是常规的变量 只不过常量的值在定义后不能进行修改 在 C 中 有两种简单的定义常量
  • 用户端APP自动化测试_L2

    目录 appium server 环境安装 capability 进阶用法 元素定位工具 高级定位技巧 xpath 定位 高级定位技巧 css 定位与原生定位 特殊控件 toast 识别 显式等待高级使用 高级控件交互方法 设备交互api
  • sass中变量引入html,Sass变量、嵌套_html/css_WEB-ITnose

    声明变量 定义变量的语法 Sass 的变量包括三个部分 声明变量的符号 变量名称 赋予变量的值 简单的示例 假设你的按钮颜色可以给其声明几个变量 1 brand primary darken 428bca 6 5 default 337ab
  • rust使用rhai和actix实现web接口

    初始化项目 cargo new acix rhai web 依赖 Cargo toml package name actix sim yt version 0 1 0 edition 2021 See more keys and their
  • flutter图片显示

    图片显示 本地图片显示 首先项目根目录下创建一个用于放置图片的文件夹 将要显示的图片放进去 如下图 然后在项目根目录的pubspec yaml文件中的assets下添加图片路径 如下图 在需要显示图片的地方使用Image asset 进行加
  • 2021CCPC河南省省赛

    文章目录 1001 收集金币 1002 使用技能 1003 欢度佳节 1005 闯关游戏 1010 小凯的书架 1001 收集金币 题目链接 dp i 0 表示前i个事件都没有选择使用技能 dp i 1 表示前i个事件已经选择使用技能了 i
  • 关于Qt下中静态变量的使用

    需求是这样的 在主窗口类Widget中启动一个子线程去执行录音操作 然后使用共享的静态变量来结束录音 在Widget类中发出停止命令 MyThread类则停止录音操作 status定义 class MyThread public QObje
  • SparkStreaming从kafka消费数据

    val spark SparkSession builder master local appName myKafka getOrCreate 5秒一个窗口 val ssc new StreamingContext spark sparkC