仿牛客社区项目(第五章)(上)

2023-11-02

第三章:Kafka,构建TB级异步消息系统

一、阻塞队列

  • BlockingQueue

    • 解决线程通信的问题。
    • 阻塞方法:puttake
  • 生产者消费者模式

    • 生产者:产生数据的线程。
    • 消费者:使用数据的线程。
  • 实现类

    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue、SynchronousQueue、DelayQueue等。

1. 阻塞队列测试方法

test 中添加 BlockingQueueTests类,来表示阻塞队列的测试方法,代码如下:

public class BlockingQueueTests {
    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

class Producer implements Runnable{
    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }
    @Override
    public void run(){
        try{
            for(int i = 0; i < 20; i ++ ) {
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName()+"生产:"+ queue.size());
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

class Consumer implements Runnable{
    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true){
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName()+"消费:"+ queue.size());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2. 测试结果

Thread-0生产:1
Thread-0生产:2
Thread-0生产:3
Thread-0生产:4
Thread-0生产:5
Thread-0生产:6
Thread-0生产:7
Thread-0生产:8
Thread-0生产:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-1消费:8
Thread-2消费:7
Thread-1消费:6
Thread-2消费:5
Thread-1消费:4
Thread-3消费:3
Thread-3消费:2
Thread-2消费:1
Thread-3消费:0

二、Kafka入门

  • Kafka简介

    • Kafka是一个分布式的流媒体平台。
    • 应用:消息系统、日志收集、用户行为追踪、流式处理。
  • Kafka特点

    • 高吞吐量、消息持久化、高可靠性、高扩展性。
  • Kafka术语

    • BrokerZookeeper
    • TopicPartitionOffset
    • Leader ReplicaFollower Replica

1. Kafka下载

Kafka官网: https://kafka.apache.org/
在这里插入图片描述

2. Kafka安装与配置

下载Kafka的安装包后进行解压,就相当于安装成功了。
需要进行以下配置:
修改 config包下的 zookeeper.properties:
在这里插入图片描述
修改 config包下的 server.properties:
在这里插入图片描述
 

3. Kafka的启动

首先在命令行中启动 Zookeeper:

C:\Users\Andrew> d:
D:\> cd d:\kafka_2.13-3.2.3
d:\kafka_2.13-3.2.3>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

启动成功后不关闭此窗口,重新打开一个新的命令窗口,用于启动 kafka

C:\Users\Andrew> d:
D:\> cd d:\kafka_2.13-3.2.3
d:\kafka_2.13-3.2.3>bin\windows\kafka-server-start.bat config\server.properties

注意: 当遇到“‘wmic’不是内部或外部命令,也不是可运行程序”。
在C盘下找到wbem文件夹,且里面包含WMIC.exe,将其添加到系统变量path中去。
比如我的路径是:C:\Windows\System32\wbem,在系统变量path中新建该路径。就可以正常启动Kafka了。

4. Kafka使用

  • 创建主题
    cd到 …\kafka_2.13-2.8.0\bin\windows这里,然后输入 kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    创建服务器端口号为9092(Kafka默认端口号)的topic,指生产者发布消息存储的位置在该服务器上localhost:9092--replication-factor 1 指1个副本。--partitions 1 指1个分区。--topic test 指该主题名为 test
    在这里插入图片描述

  • 以生产者身份发送消息
    输入:kafka-console-producer.bat --broker-list localhost:9092 --topic test
    生产者身份打开服务器列表中为localhost:9092的服务器上的test主题。--broker-list 指服务器列表。
    并且输入要发送的消息:
    在这里插入图片描述

  • 以消费者身份读取消息
    新打开一个命令行窗口,且cd到…\kafka_2.13-2.8.0\bin\windows,并输入:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
    可以看到生产者发送的消息。并且这个消息队列中可以实时传送消息。
    比如在生产者的命令行中继续输入信息,很快在消费者这边也能得到消息。
    在这里插入图片描述
     

三、Spring整合Kafka

  • 引入依赖

    • spring-kafka
  • 配置Kafka

    • 配置 serverconsumer
  • 访问Kafka

    • 生产者:
      kafkaTemplate.send(topic, data);
    • 消费者:
      @KafkaListener(topics = {"test"}
      public void handleMessage(ConsumerRecord record) {}

1. 引入依赖

pom.xml 添加相关的依赖:

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.8.6</version>
		</dependency>	

2. 配置Kafka

#KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000

3. 测试

test 包下添加 KafkaTests 类,代码如下:

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() {
        kafkaProducer.sendMessage("test", "你好");
        kafkaProducer.sendMessage("test", "在吗");

        try {
            Thread.sleep(1000 * 20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }

}

@Component
class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }
}

 

4. 测试结果

在这里插入图片描述
 
 
创作不易,如果有帮助到你,请给题解点个赞和收藏,让更多的人看到!!!
关注博主不迷路,内容持续更新中。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

仿牛客社区项目(第五章)(上) 的相关文章

  • 如何将 .cer 证书导入 java 密钥库?

    在开发 Java Web 服务客户端期间 我遇到了一个问题 Web 服务的身份验证使用客户端证书 用户名和密码 我从网络服务背后的公司收到的客户端证书位于 cer格式 当我使用文本编辑器检查该文件时 它具有以下内容 BEGIN CERTIF
  • Spring Data:限制自定义查询的结果

    在我的 Spring 数据存储库中 我 必须 使用自定义查询 Query注解 我知道我可以限制这样的命名查询中的结果数量 Iterable
  • 如何将列表转换为地图?

    最近我和一位同事讨论了转换的最佳方式是什么List to Map在 Java 中 这样做是否有任何具体的好处 我想知道最佳的转换方法 如果有人可以指导我 我将非常感激 这是个好方法吗 List
  • 从 Bitmap 类创建 .bmp 图像文件

    我创建了一个使用套接字的应用程序 客户端在其中接收图像并将图像数据存储在 Bitmap 类中 谁能告诉我如何创建一个名为我的图像 png or 我的图像 bmp来自此 Bitmap 对象 String base64Code dataInpu
  • 使用 java 的 RAR 档案 [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi
  • 适用于 Solaris 的 Java 8 中缺少 javaws

    看起来 Oracle 从 Java 8 for Solaris 中删除了 Java Web Start javaws 在 Java 8u51 中不再可用 来自兼容性指南 http www oracle com technetwork jav
  • Jodatime 日期格式

    是否可以格式化 JodaTime 日期 这是代码 private static LocalDate priorDay LocalDate date1 do date1 date1 plusDays 1 while date1 getDayO
  • 总结二维数组

    鉴于我当前的程序 我希望它在用户输入所有值后计算每列和每行的总和 我当前的代码似乎只是将数组的值加倍 这不是我想要做的 例如 如果用户输入具有以下值 1 2 3 2 3 4 3 4 5 的 3x3 矩阵 则看起来就像我在下面的程序中对其进行
  • Java 反射:如何检索匿名内部类?

    我在另一个类中有一个匿名内部类 SomeClass Both SomeClass class getClasses and SomeClass class getDeclaredClasses 返回空数组 我在中找不到一些关于此的提示Cla
  • 将 EditText 聚焦在设备上运行的 PopupWindow 中时出现异常

    我正在为 Android 开发一个弹出窗口 它正在工作 我在上面添加了一个 EditText 和一个按钮 当在 ADV 上运行时 它可以正常工作 而在设备上运行时 当我专注于 EditText 时 这会抛出一个奇怪的异常 android v
  • 在Java中使用BufferedWriter写入文件时监视文件大小?

    我正在将一个可能很长的项目列表写入文件 我正在写的项目的长度是可变的 如果生成的文件大小大于10M 则应将其分成多个文件 为了提高性能 我目前使用 BufferedWriter 如下所示 final FileOutputStream fos
  • 在 JavaFX 中拖动未装饰的舞台

    我希望将舞台设置为 未装饰 使其可拖动且可最小化 问题是我找不到这样做的方法 因为我遇到的示例是通过插入到主方法中的方法来实现的 我想通过控制器类中声明的方法来完成此操作 就像我如何使用下面的 WindowClose 方法来完成此操作 这是
  • Akka 和 spring 配置

    我正在尝试将 akka 与 spring 结合起来 但没有成功 基本上 我的应用程序似乎不习惯读取 akka 模式 具有架构的 service context xml 的一部分
  • 在Spring-Boot中,我们如何在同一个项目中连接两个数据库(Mysql数据库和MongoDB)?

    我正在尝试创建一个 Spring Boot 项目 其中我有一个要求 我想连接到不同的数据库 MySql 和 MongoDB 我是否需要做一些特殊的事情来连接到这两个数据库 或者 spring boot 会自动计算出自己连接到这两个数据库 我
  • java Runtime.getRunTime().exec 和通配符?

    我正在尝试使用删除垃圾文件 Process p Runtime getRuntime exec 只要我不使用通配符 它 就可以正常工作 即 Process p Runtime getRuntime exec bin rm f specifi
  • 处理照片上传的最佳方式是什么?

    我正在为一个家庭成员的婚礼制作一个网站 他们要求的一个功能是一个照片部分 所有客人都可以在婚礼结束后前往并上传他们的照片 我说这是一个很棒的想法 然后我就去实现它 那么只有一个问题 物流 上传速度很慢 现代相机拍摄的照片很大 2 5 兆 我
  • Apache Kafka 是否提供异步订阅回调 API?

    我的项目正在将 Apache Kafka 视为老化的基于 JMS 的消息传递方法的潜在替代品 为了让这个过渡尽可能的顺利 如果替代的排队系统 Kafka 有一个异步订阅机制那就更理想了 类似于我们当前项目使用的JMS机制MessageLis
  • 如何配置嵌入式 MongoDB 以在 Spring Boot 应用程序中进行集成测试?

    我有一个相当简单的 Spring Boot 应用程序 它公开一个小型 REST API 并从 MongoDB 实例检索数据 对 MongoDB 实例的查询通过基于 Spring Data 的存储库 下面的一些关键代码 Main applic
  • Python 可以替代 Java 小程序吗?

    除了制作用于物理模拟 如抛射运动 重力等 的教育性 Java 小程序之外 还有其他选择吗 如果你想让它在浏览器中运行 你可以使用PyJamas http pyjs org 这是一个 Python 到 Javascript 的编译器和工具集
  • spring data jpa 过滤 @OneToMany 中的子项

    我有一个员工测试实体是父实体并且FunGroup信息子实体 这两个实体都是通过employeeId映射 我需要一种方法来过滤掉与搜索条件匹配的子实体 以便结果仅包含父实体和子实体 满足要求 员工测试类 Entity name Employe

随机推荐

  • Node.js到底是个啥?干什么用的?优缺点是什么?

    Nodejs简介 Node js是一个Javascript运行环境 runtime 是一个可以快速构建网络服务及应用的平台 是用Javascript语言构建的服务平台 可用于后端建立服务器 Node js与Javascript的区别 nod
  • error: cannot call member function ‘void me::sendMessage()‘ without object

    error cannot call member function void me sendMessage without object 原因分析 解决方案 原因分析 在connect中 传递函数地址不用带括号 参考函数指针的赋值 incl
  • 将tensorflow模型部署到服务器上

    基本思路 利用tensorflow官方提供的tensorflow serving进行部署 同时 为了免去环境配置等麻烦操作 可借助docker容器 一 服务器环境选择 首先肯定要去租一个服务器 例如阿里云 一开始选了window serve
  • C++ 如何调用 通过Boost.python 封装的python函数(安装与配置注意事项)

    一 下载好相匹配的版本python与boost 1 建议使用新版的比较方便 也没有太多的bug 2 我使用的Boost 库是1 82 0 点击下载 Boost 1 82 0 3 使用的是python 3 10 11版本 点击下载 pytho
  • 【代码扫描修复】绝对路径遍历(高危)

    漏洞描述 摘要 允许用户输入控制文件系统操作所用的路径会导致攻击者能够访问或修改其他受保护的系统资源 缺陷描述 当满足以下两个条件时 就会产生路径遍历错误 攻击者可以指定某一文件系统操作中所使用的路径 攻击者可以通过指定特定资源来获取某种权
  • SpringMVC架构浅析

    SpringMVC概述 Spring的web框架围绕DispatcherServlet设计 DispatcherServlet的作用是将请求分发到不同的处理器 Spring的web框架包括可配置的处理器 handler 映射 视图 view
  • 服装销售管理系统---课程设计(C/C++简易版)

    目录 基于大一学期对C C 的学习做的一个关于实现一个服装销售管理系统的课程设计 强化自己关于面向对象 OOP 编程思想 耗时4 5天左右 功能大抵实现 流程图 源代码 总结 基于大一学期对C C 的学习做的一个关于实现一个服装销售管理系统
  • 腾讯安全技术类面试

    初试 2014 09 23 被分配到 安全技术类 在海航威斯汀酒店 五楼签到 面试房间2009 时间14 30 因为没有把握 反正要被刷掉的 于是我随便穿了一件红色的短衬衫和黑色的小短裤就去了 发现好冷 面试的时候又很饿 那个房间关门了 按
  • STM32F103芯片的基本硬件设计:下载、复位、启动设置、晶振

    1 下载口 一般情况下我们都是用SWD方式下载 一般有两种接线方式 一种4线 VCC GND SWDIO接10K上拉 SWCLK接10K下拉 一种是5线的 在4线的基础上增加了一个NRST上拉10K 但其实没必要 因为NRST是复位脚 电路
  • 变态青蛙跳台阶的两种典型分析方法

    变态青蛙跳台阶的两种典型分析方法 最近看到递归相关的算法 有个变态青蛙跳台阶的延伸问题还蛮有趣的 题目如下 拿出来分析一下 一只青蛙一次可以跳上1级台阶 也可以跳上2级 它也可以跳上n级 求该青蛙跳上一个n级的台阶总共有多少种跳法 方法一
  • python爬虫招聘网站(智联)

    2021年10月7日爬取 爬虫代码不知道是否失效 文章目录 爬虫目标 具体过程 源码 爬虫目标 要求 搜索 大数据 专业 爬相关公司的招聘信息 列数不少于10列 行数不少于3000 目标 搜索 大数据 爬取智联招聘 北京上海广州深圳天津武汉
  • maven查看jar的pom引入来源

    从idea中点击 Maven Projects 后点击Show Dependencies 如图所示 得到依赖关系图 如下 在页面进行 Ctrl F 搜索需要的 Jar 名称 例 查找 spring beans 双击框定的地方 就能进入到对应
  • 一分钟快速利用ChatGPT生成PPT

    目标 让AI给我们生成一篇PPT报告 首先介绍一下什么是ChatGPT ChatGPT是一种基于自然语言处理技术的人工智能应用 它使用OpenAI的GPT模型来自动生成自然语言的回复 可以作为虚拟助手 客服机器人等方面的应用 与其他机器学习
  • SOC芯片中VIP和IP之间的路由关系

    通用PAD是双向端口 inout 这就意味着每个通用PAD可以根据需要被配置成输入或输出 如图1所示 图1 ind是输入端口 do是输出端口 obe是输出使能信号 当obe为低电平时 PAD作为输入端口使用 三态门关闭 do高阻 片外数据通
  • nm 命令显示

    用途 显示关于对象文件 可执行文件以及对象文件库里的符号信息 语法 nm A C X 32 64 32 64 f h l p r T v B P e g u d o x t Format File 描述 nm 命令显示关于指定 File 中
  • FISCO BCOS工程师常用的性能分析工具推荐

    FISCO BCOS是完全开源的联盟区块链底层技术平台 由金融区块链合作联盟 深圳 简称金链盟 成立开源工作组通力打造 开源工作组成员包括博彦科技 华为 深证通 神州数码 四方精创 腾讯 微众银行 亦笔科技和越秀金科等金链盟成员机构 代码仓
  • Hibernate学习笔记 查询简介

    创建实体类 在介绍Hibernate查询语言之前 首先我们来建立一下数据库 这里直接使用了MySQL自带的样例数据库world 如果你没有安装MySQL那么需要安装一下 并且在安装的时候选择安装样例数据库 安装完成之后 应该能在MySQL中
  • 《区块链技术与应用》学习笔记13——ETH权益证明

    矿工挖矿是为了取得出块奖励 获取收益 而系统给予出块奖励的目的是激励矿工参与区块链系统维护 进行记账 而挖矿本质上是看矿工投入资金来决定的 投入资金买设备 gt 设备决定算力 gt 算力比例决定收益 那么 为什么不直接拼 钱 呢 现状是用钱
  • getchar与scanf的区别

    getchar getchar先读取一个字符放到ch里面去 如果这个字符不等于EOF 就进入循环 打印这个字符 当getchar读到文件末尾或者结束时 它会返回一个EOF 此时结束循环 输入A 输出A 输入b 输出b 当我们想要结束时 输入
  • 仿牛客社区项目(第五章)(上)

    文章目录 第三章 Kafka 构建TB级异步消息系统 一 阻塞队列 1 阻塞队列测试方法 2 测试结果 二 Kafka入门 1 Kafka下载 2 Kafka安装与配置 3 Kafka的启动 4 Kafka使用 三 Spring整合Kafk