Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

2023-11-16

一、案例说明

现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“\t”键分割,数据内容及数据格式如下:
在这里插入图片描述

二、前置准备工作

项目环境说明
Linux Ubuntu 16.04

jdk-7u75-linux-x64

scala-2.10.4

kafka_2.10-0.8.2.2

spark-1.6.0-bin-hadoop2.6

开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。

/apps/zookeeper/bin/zkServer.sh start 
cd /apps/kafka  
bin/kafka-server-start.sh config/server.properties &  
cd /apps/kafka  
bin/kafka-topics.sh \  
--create \  
--zookeeper localhost:2181 \  
--replication-factor 1 \  
--topic kafkasendspark \  
--partitions 1

三、编写程序代码创建kafka的producer

1、新创一个文件folder命名为lib,并将jar包添加进来。(可以从我的博客主页资源里面下载)
2、进入以下界面,移除Scala Library。

在这里插入图片描述

3、操作完成后,再点击Add Library选项

在这里插入图片描述

4、进入以下界面

在这里插入图片描述

5、点击完成即可
6、最后创建如下项目结构的文件

在这里插入图片描述

四、编写代码,运行程序

编写生产者代码

package my.kafka;  
import java.io.BufferedReader;  
import java.io.File;  
import java.io.FileNotFoundException;  
import java.io.FileReader;  
import java.io.IOException;  
import java.util.Properties;  
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
public class KafkaSend {  
    private final Producer<String, String> producer;  
  
    public final static String TOPIC = "kafkasendspark";  
  
    public KafkaSend(){  
        Properties props = new Properties();  
        // 此处配置的是kafka的端口  
        props.put("metadata.broker.list", "localhost:9092");  
        // 配置value的序列化类  
        props.put("serializer.class", "kafka.serializer.StringEncoder");  
        // 配置key的序列化类  
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");  
        props.put("request.required.acks", "-1");  
        producer = new Producer<String, String>(new ProducerConfig(props));  
    }  
  
    void produce() {  
        int lineNo = 1;  
        File file = new File("/data/case6/buyer_favorite1");  
        BufferedReader reader = null;  
        try {  
            reader = new BufferedReader(new FileReader(file));  
            String tempString = null;  
  
            while ( (tempString = reader.readLine()) != null ) {  
                String key = String.valueOf(lineNo);  
                String data = tempString;  
                producer.send(new KeyedMessage<String, String>(TOPIC, key, data));  
                System.out.println(data);  
                lineNo++;  
  
                Thread.sleep(100);  
  
            }  
        } catch (FileNotFoundException e) {  
            System.err.println(e.getMessage());  
        } catch (IOException e) {  
            System.err.println(e.getMessage());  
        } catch (InterruptedException e) {  
            System.err.println(e.getMessage());  
        }  
    }  
    public static void main(String[] args) {  
        System.out.println("start");  
        new KafkaSend().produce();  
        System.out.println("finish");  
    }  
}  

编写消费者代码

package my.scala  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.StreamingContext  
import org.apache.spark.streaming.Seconds  
import scala.collection.immutable.Map  
import org.apache.spark.streaming.kafka.KafkaUtils  
import kafka.serializer.StringDecoder  
import kafka.serializer.StringDecoder  
object SparkReceive {  
  def main(args: Array[String]) {  
  
    val sparkConf = new SparkConf().setAppName("countuser").setMaster("local")  
    val ssc = new StreamingContext(sparkConf, Seconds(2))  
    ssc.checkpoint("checkpoint")  
    val topics = Set("kafkasendspark")  
    val brokers = "localhost:9092"  
    val zkQuorum = "localhost:2181"  
  
    val kafkaParams = Map[String, String](  
        "metadata.broker.list" -> brokers,  
        "serializer.class" -> "kafka.serializer.StringEncoder"  
    )  
  
  
    val lines = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)  
    val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {  
      //通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和  
      val currentCount = currValues.sum  
      // 已累加的值  
      val previousCount = prevValueState.getOrElse(0)  
      // 返回累加后的结果,是一个Option[Int]类型  
      Some(currentCount + previousCount)  
    }  
    val result=lines.map(line => (line._2.split("\t")) ).map( row => (row(0),1) ).updateStateByKey[Int](addFunc).print()  
  
    ssc.start();  
    ssc.awaitTermination()  
  }  
}  

五、运行程序

在Eclipse的SparkReceive类中右键并点击==>Run As==>Scala Application选项。

然后在KafkaSend类中:右键点击==>Run As==>Jave Application选项。

即可在控制窗口Console中查看输出结果为:
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作 的相关文章

随机推荐

  • dword ptr指令讲解

    dword ptr指令讲解 8086CPU的指令 可以处理两种尺寸的数据 byte和word 所以在机器指令中要指明 指令进行的是字操作还是字节操作 对于这个问题 汇编语言中用一下方法处理 1 通过寄存器名指明要处理的数据的尺寸 例如 下面
  • linux配置交换内存(虚拟内存)

    虚拟内存 Virtual Memory 是操作系统内存管理的一种技术 它将主存虚拟化 使得程序可以获得更大的可用内存空间 虚拟内存的主要优点有 提高内存利用率 可以加载更大的程序到内存中执行 提供了内存保护 避免程序间相互干扰 实现了懒加载
  • 【FPGA多周期约束】

    多周期约束及语法 一 什么时候需要用到多周期约束 Vivado TimeQuest等时序引擎默认是按照单周期关系分析数据关系的 即数据在发起沿发送 在捕获被捕获 发起沿和捕获沿相差一个周期 但是很多情况是 数据路径逻辑较为复杂 导致延时较大
  • 朴素贝叶斯基本原理和预测过程、先验概率、后验概率、似然概率概念

    贝叶斯原理是英国数学家托马斯 贝叶斯提出的 贝叶斯原理 建立在主观判断的基础上 在我们不了解所有客观事实的情况下 同样可以先估计一个值 然后根据实际结果不断进行修正 举例 一个袋子里有10个球 其中6个黑球 4个白球 那么随机抓一个黑球的概
  • 关于电商秒杀系统中防超卖、以及高性能下单的处理方案简述

    秒杀抢购系统的成功平稳运行 有一些需要注意的知识点 1 高并发 以及刷接口等黑客请求对服务端的负载冲击 2 高并发时带来的超卖 即商品数量的控制 3 高负载下 下单的速度和成功率的保证 4 其他 以秒杀单品为例 如抢小米手机 解决方案探讨
  • 大型网站架构核心要素之可用性:高可用架构

    前言 上节我们讲了网站核心要素之性能 这节我们接着讲第二个核心要素可用性 网站的可用性 描述的是一个网站是否可以正常使用的特性 这个特性是比较关键的 直接影响公司形象和利益 因此也有很多大公司把这点作为技术人员的绩效考核之一 既然那么重要
  • Springboot毕设项目地铁站自动售票系统77x9w(java+VUE+Mybatis+Maven+Mysql)

    项目运行 环境配置 Jdk1 8 Tomcat8 5 Mysql HBuilderX Webstorm也行 Eclispe IntelliJ IDEA Eclispe MyEclispe Sts都支持 项目技术 Springboot myb
  • Plotly Express 详细使用指南,20组案例从入门到进阶(附源代码)

    作者 阳哥 出品 Python数据之道 ID PyDataLab 大家好 我是阳哥 今天跟大家分享的是 Plotly Express 的详细使用教程 Plotly Express 是 Python 交互式可视化库 Plotly 的高级组件
  • 【Deepin-15.11】下【Datax】使用【插件】进行【csv文件读写】

    接上 1 将Downloads目录下的压缩包放到指定文件夹下 题目要求 2 按照要求创建文件夹 题目要求 3 Github Datax txtfilereader模板官网将模板copy下来 写入文本文档并修改后缀名 复制到job 题目指定
  • Android Studio 中如何添加ViewModelProviders依赖?

    我的做法是在class类中直接导入文件 import androidx lifecycle ViewModelProvidels 一个小白 就当是记录一下啦
  • 86-信号和槽-信号与槽的参数

    信号与槽的参数 上节介绍了信号与槽的基本使用方法 本节介绍其参数传递的情况 通过为槽函数传递特定的参数 可以实现更复杂的功能 既可以传递 Qt 的内置参数 也可以传递自定义参数 当然 内置参数和自定义参数也可以放在一起传递 自定义参数既可以
  • 不习惯的 Vue3 起步六 の Echarts绘制下钻地图

    序 看过一些可视化大屏展示地图的例子 准备动手做做 既然要开始比制作 那么先把目标定好 做一个展示中国城市的下钻地图 使用 Vue3 Vite Typescript echarts 实现效果 准备工作 创建项目 因为准备使用Vue3 Vit
  • Vue——自定义指令

    自定义全局指令 注 使用指令时必须在指名名称前加前缀v 即v 指令名称 Vue directive hello bind 常用 alert 指令第一次绑定到元素上时调用 只调用一次 可执行初始化操作 inserted alert 被绑定元素
  • 【上位机】通过QTCreator编写WIFI上位机与网络调试助手通信绘制曲线

    文章目录 前言 一 使用QT Creator编写上位机 二 上位机与网络调试助手联调 三 总结 前言 17年电赛H题中要求编写WIFI上位机实现远程幅频特性曲线显示 以下是本人在近期摸索出来的一些心得及体会 一 使用QT Creator编写
  • 目前有哪些好用的测试管理工具?

    PingCode Testhub Zephyr for jira 禅道等都是当下不错的测试管理工具 其实就测试用例管理工具或Bug管理工具来说 当前市场上种类并不少 功能也各有特色 我们在工具选型过程中最大的问题并不是不知道有哪些好的工具
  • FastDFS单机部署安装

    FastDFS单机部署安装 文章目录 FastDFS单机部署安装 前言 1 服务器规划 2 安装包 3 所有tracker和storage节点都执行如下操作 3 1 安装所需的依赖包 3 2 安装libfatscommon 3 3 安装Fa
  • mac电脑屏幕录制Berrycast Mac屏幕录制软件

    Berrycast是一款为Mac设计的优秀屏幕录制软件 它让屏幕录制变得简单而高效 以下是Berrycast的一些主要特点 简单的用户界面 Berrycast拥有直观和简洁的用户界面 使得用户可以轻松上手 高质量的视频输出 Berrycas
  • Vue2开发插件并发布到npm

    Vue3 TS Vite开发插件并发布到npm 目标 创建vue amazing selector下拉框组件 并发布到npm 效果如下图 默认时样式 禁用时样式 创建vue项目 vue create vue amazing selector
  • 指针和引用的区别

    从概念上讲 指针从本质上讲就是存放变量地址的一个变量 在逻辑上是独立的 它可以被改变 包括其所指向的地址的改变和其指向的地址中所存放的数据的改变 而引用是一个别名 它在逻辑上不是独立的 它的存在具有依附性 所以引用必须在一开始就被初始化 而
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    一 案例说明 现有一电商网站数据文件 名为buyer favorite1 记录了用户对商品的收藏数据 数据以 t 键分割 数据内容及数据格式如下 二 前置准备工作 项目环境说明 Linux Ubuntu 16 04 jdk 7u75 lin