JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数

2023-11-01

JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数

KafkaStream

概述

  • Kafka Stream: 提供了对存储与Kafka内的数据进行流式处理和分析的功能
  • 特点:
    • Kafka Stream提供了一个非常简单而轻量的Library, 它可以非常方便地嵌入任意Java应用中, 也可以任意方式打包和部署
    • 除了Kafka外, 无任何外部依赖
    • 通过可容错地state, store实现高效地状态操作(如windowed join和aggregation)
    • 支持基于事件时间地窗口操作, 并且可处理晚到的数据(late arrival of records)
  • 关键概念:
    • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器.
    • Sink处理器: sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题
  • KStream:
    • 数据结构类似于map, key-value键值对.
    • 一段顺序的, 无限长, 不断更新的数据集.

案例-统计单词个数

  • 依赖
    依赖中有排除部分依赖, 还是一整个放上来好了
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- kafkfa -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>connect-json</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    
  • 流式处理
    public class KafkaStreamQuickStart {
        public static void main(String[] args) {
    
            // Kafka的配置信息
            Properties prop = new Properties();
            prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");
            prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");
    
            // Stream 构建器
            StreamsBuilder streamsBuilder = new StreamsBuilder();
    
            // 流式计算
            streamProcessor(streamsBuilder);
    
            // 创建KafkaStream对象
            KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
    
            // 开启流式计算
            kafkaStreams.start();
        }
    
        /**
         * 流式计算
         * 消息的内容: hello kafka
         * @param streamsBuilder
         */
        private static void streamProcessor(StreamsBuilder streamsBuilder) {
            // 创建Kstream对象, 同时指定从哪个topic中接收消息
            KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
            /**
             * 处理消息的value
             */
            stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                @Override
                public Iterable<String> apply(String value) {
                    String[] valAry = value.split(" ");
                    return Arrays.asList(valAry);
                }
            })
                    // 按照value聚合处理
                    .groupBy((key, value)->value)
                    // 时间窗口
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    // 统计单词的个数
                    .count()
                    // 转换为KStream
                    .toStream()
                    .map((key, value)->{
                        System.out.println("key:"+key+",value:"+value);
                        return new KeyValue<>(key.key().toString(), value.toString());
                    })
                    // 发送消息
                    .to("itcast-topic-out");
        }
    }
    
  • 发送消息
    for (int i = 0; i < 5; i++) {
        ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<>("icast-topic-input", "hello kafka");
        producer.send(kvProducerRecord);
    }
    
  • 接收消息
    // 订阅主题
    consumer.subscribe(Collections.singleton("itcast-topic-out"));
    

SpringBoot集成

  • 配置
    config.java
    /**
     * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
     */
    @Setter
    @Getter
    @Configuration
    @EnableKafkaStreams
    @ConfigurationProperties(prefix="kafka")
    public class KafkaStreamConfig {
        private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
        private String hosts;
        private String group;
        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
            props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
            props.put(StreamsConfig.RETRIES_CONFIG, 10);
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            return new KafkaStreamsConfiguration(props);
        }
    }
    
    application.yml
    kafka:
      hosts: 192.168.174.133:9092
      group: ${spring.application.name}
    
    BeanConfig.java
    @Slf4j
    @Configuration
    public class KafkaStreamHelloListener {
    
        @Bean
        public KStream<String, String> KStream(StreamsBuilder streamsBuilder)
        {
            // 创建Kstream对象, 同时指定从哪个topic中接收消息
            KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
            /**
             * 处理消息的value
             */
            stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
                        @Override
                        public Iterable<String> apply(String value) {
                            String[] valAry = value.split(" ");
                            return Arrays.asList(valAry);
                        }
                    })
                    // 按照value聚合处理
                    .groupBy((key, value)->value)
                    // 时间窗口
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    // 统计单词的个数
                    .count()
                    // 转换为KStream
                    .toStream()
                    .map((key, value)->{
                        System.out.println("key:"+key+",value:"+value);
                        return new KeyValue<>(key.key().toString(), value.toString());
                    })
                    // 发送消息
                    .to("itcast-topic-out");
            return stream;
        }
    }
    

实时计算文章分值

  1. nacos: leadnews-behavior配置kafka生产者
    kafka:
      bootstrap-servers: 192.168.174.133:9092
      producer:
        retries: 10
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    
  2. 修改leadnews-behavior
    like
    public ResponseResult likesBehavior(LikesBehaviorDto dto) {
    
        ...
    
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);
    
        if(dto.getOperation() == 0){
            ...
            mess.setAdd(1);
        }else{
            ...
            mess.setAdd(-1);
        }
    
        // kafka: 发送消息, 数据聚合
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
    
        ...
    }
    
    view
    public ResponseResult readBehavior(ReadBehaviorDto dto) {
    
        ...
    
        // kafka: 发送消息, 数据聚合
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
    
        ...
    }
    
  3. leadnews-article中添加流式聚合处理
    @Slf4j
    @Configuration
    public class HotArticleStreamHandler {
    
        @Bean
        public KStream<String, String> KStream(StreamsBuilder streamsBuilder)
        {
            // 接收消息
            KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
    
            // 聚合流式处理
            stream.map((key, value)->{
                UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
                // 重置消息的key和value
                return new KeyValue<>(mess.getArticleId().toString(), mess.getType().name()+":"+mess.getAdd());
            })
                    // 根据文章id进行聚合
                    .groupBy((key, value)->key)
                    // 时间窗口
                    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                    // 自行实现聚合计算
                    .aggregate(
                            // 初始方法, 返回值是消息的value
                            new Initializer<String>() {
                                @Override
                                public String apply() {
                                    return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                                }
                            },
                            // 真正的聚合操作, 返回值是消息的value
                            new Aggregator<String, String, String>() {
                                @Override
                                public String apply(String key, String value, String aggValue) {
                                    if(StringUtils.isBlank(value)){
                                        return aggValue;
                                    }
                                    String[] aggAry = aggValue.split(",");
                                    int col=0, com=0, lik=0, vie=0;
                                    for (String agg : aggAry) {
                                        String[] split = agg.split(":");
                                        /**
                                        * 获得初始值, 也是时间窗口内计算之后的值
                                        */
                                        switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0]))
                                        {
                                            case COLLECTION:
                                                col = Integer.parseInt(split[1]);
                                                break;
                                            case COMMENT:
                                                com = Integer.parseInt(split[1]);
                                                break;
                                            case LIKES:
                                                lik = Integer.parseInt(split[1]);
                                                break;
                                            case VIEWS:
                                                vie = Integer.parseInt(split[1]);
                                                break;
                                        }
                                    }
                                    // 累加操作
                                    String[] valAry = value.split(":");
                                    switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0]))
                                    {
                                        case COLLECTION:
                                            col += Integer.parseInt(valAry[1]);
                                            break;
                                        case COMMENT:
                                            com += Integer.parseInt(valAry[1]);
                                            break;
                                        case LIKES:
                                            lik += Integer.parseInt(valAry[1]);
                                            break;
                                        case VIEWS:
                                            vie += Integer.parseInt(valAry[1]);
                                            break;
                                    }
                                    String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                                    System.out.println("文章的id "+key);
                                    System.out.println("当前时间窗口内的消息处理结果: "+formatStr);
                                    return formatStr;
                                }
                            }, Materialized.as("hot-article-stream-count-001")
                    )
                    .toStream()
                    .map((key, value)->{
                        return new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value));
                    })
                    // 发送消息
                    .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
            return stream;
        }
    
        /**
        * 格式化消息的value数据
        * @param articleId
        * @param value
        * @return
        */
        private String formatObj(String articleId, String value) {
            ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
            mess.setArticleId(Long.valueOf(articleId));
    
            // COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
            String[] valAry = value.split(",");
            for (String val : valAry) {
                String[] split = val.split(":");
                switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0]))
                {
                    case COLLECTION:
                        mess.setCollect(Integer.parseInt(split[1]));
                        break;
                    case COMMENT:
                        mess.setComment(Integer.parseInt(split[1]));
                        break;
                    case LIKES:
                        mess.setLike(Integer.parseInt(split[1]));
                        break;
                    case VIEWS:
                        mess.setView(Integer.parseInt(split[1]));
                        break;
                }
            }
            
            log.info("聚合消息处理之后的结果为: {}", JSON.toJSONString(mess));
            return JSON.toJSONString(mess);
        }
    
    }
    
    
  4. leadnews-article添加聚合数据监听器
    @Slf4j
    @Component
    public class ArticleIncrHandlerListener {
    
        @Autowired
        private ApArticleService apArticleService;
        
        @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
        public void onMessage(String mess)
        {
            if(StringUtils.isNotBlank(mess)){
                ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
                apArticleService.updateScore(articleVisitStreamMess);
                System.out.println(mess);
            }
        }
    
    }
    
  5. 替换redis中的热点数据
    /**
     * 更新 文章分值, 缓存中的热点文章数据
     * @param mess
     */
    @Override
    public void updateScore(ArticleVisitStreamMess mess) {
        // 1. 更新文章的阅读, 点赞, 收藏, 评论的数量
        ApArticle apArticle = updateArticleBehavior(mess);
    
        // 2. 计算文章的分值
        Integer score = hotArticleService.computeArticleScore(apArticle);
        score *= 3;
    
        // 3. 替换当前文章对应频道的热点数据
        replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId(), apArticle, score);
    
        // 4. 替换推荐对应的热点数据
        replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG, apArticle, score);
    }
    
    /**
     * 替换数据并且存入到redis中
     * @param HOT_ARTICLE_FIRST_PAGE
     * @param apArticle
     * @param score
     */
    private void replaceDataToRedis(String HOT_ARTICLE_FIRST_PAGE, ApArticle apArticle, Integer score) {
        String articleList = cacheService.get(HOT_ARTICLE_FIRST_PAGE);
        if(StringUtils.isNotBlank(articleList)){
            List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleList, HotArticleVo.class);
            boolean flag = true;
            // 3.1 文章已是热点文章, 则更新文章分数
            for (HotArticleVo hotArticleVo : hotArticleVoList) {
                if(hotArticleVo.getId().equals(apArticle.getId())){
                    hotArticleVo.setScore(score);
                    flag = false;
                    break;
                }
            }
            // 3.2 文章还不是热点文章, 则替换分数最小的文章
            if(flag){
                if(hotArticleVoList.size() >= 30){
                    // 热点文章超过30条
                    hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                    HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                    if(lastHot.getScore() < score){
                        hotArticleVoList.remove(lastHot);
                        HotArticleVo hotArticleVo = new HotArticleVo();
                        BeanUtils.copyProperties(apArticle, hotArticleVo);
                        hotArticleVo.setScore(score);
                        hotArticleVoList.add(hotArticleVo);
                    }
                }else{
                    // 热点文章没超过30条
                    HotArticleVo hotArticleVo = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hotArticleVo);
                    hotArticleVo.setScore(score);
                    hotArticleVoList.add(hotArticleVo);
                }
            }
            // 3.3 缓存到redis
            hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
            cacheService.set(HOT_ARTICLE_FIRST_PAGE, JSON.toJSONString(hotArticleVoList));
        }
    }
    
    /**
     * 更新文章行为数据
     * @param mess
     */
    private ApArticle updateArticleBehavior(ArticleVisitStreamMess mess) {
        ApArticle apArticle = getById(mess.getArticleId());
        apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());
        apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());
        apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());
        apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());
    
        updateById(apArticle);
    
        return apArticle;
    }
    

来源

黑马程序员. 黑马头条

Gitee

https://gitee.com/yu-ba-ba-ba/leadnews

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数 的相关文章

随机推荐

  • 【力扣】455、分发饼干

    var findContentChildren function g s g 孩子的胃口 s 饼干尺寸 let arr g sort a b gt return a b let brr s sort a b gt return a b 初始
  • 【Flutter】十八、Flutter中常用的布局容器——列表布局ListView、ListTile

    一 ListView 1 1 创建ListView的多种方式 1 1 1 ListView 1 1 2 ListView builder 1 1 3 ListView separated 1 1 4 ListView custom 二 Li
  • 【P186 20】C++ 容器快速入门 (vector、deque、list、map、set...)(超详细)

    P186 20 C 容器快速入门 一 vector容器 1 vector存放内置数据类型 1 六种遍历方式 六种遍历方式 完整代码总览 部分遍历详解 2 初始化 区别 1 默认初始化 无参 没给定数组大小 2 带参数构造初始化 给定数组大小
  • 小物体的目标检测的研究综述

    关于小目标检测算法的研究综述 小目标研究的难点 小目标研究算法的现状 自己对于小目标算法的理解 小目标检测的难点 在我们平常的数据集中 大多数都是中等和偏大的物体 小目标属性相对偏少 小目标在原始图像中一般只占0 02 0 05左右 而我们
  • Linux系统编程——文件编程(四)光标移动(lseek)

    lseek函数 lseek是一个用于改变读写一个文件时读写指针位置的一个系统函数 每个打开的文件都有一个与其相关联的 当前文件偏移量 它通常是一个非负整数 用以度量从文件开始处计算的字节数 通常 读 写操作都从当前文件偏移量处开始 并使偏移
  • 配置根目录_传奇单机架设教程:传奇单机登陆器配置全套教程

    在什么情况下需要架设单机呢 当你想要开外网 选定版本时可以将版本架设单机进行测试 当你觉得别人的服已经不够玩了 也可以自己架设单机 开自己喜欢的版本 传奇单机架设教程分享 只要学会了单机架设 就可以用本地电脑架设不同版本的传奇啦 赶快学起来
  • python 删除两个文件中没有一一对应的名称

    删除两个文件中没有一一对应的名称 针对于 jpg文件和xml文件没有对齐 usr bin python3 coding UTF 8 import os import shutil file name 1 JPEGImages 图片文件存放地
  • C语言进阶之路:如何去求任意两个数字的加减乘除问题

    提示 可以参考博主之前的文章来写的代码去寻找思路 文章目录 思考一下 一 提示 二 根据任意两个整数的求和去书写关于任意两个整数的加减乘除代码 1简单编写 2 笔者建议 总结 思考一下 提示 建议读者先进行自我思考 通过对博客的对比 不断挖
  • 未为python配置解释器_Python环境安装,解释器配置

    下载安装完Pycharm后 创建一个py文件编写代码会提示No Python interpreter configured for the project 这是提示要配置解释器 可以去官网下载安装 从官网下载https www python
  • 2021-09-04

    使用ESP8266实现STM32连网 USART实现 巴法云物联网 1 使用硬件 程序思路 基于正点原子的测试程序 在巴法云物联网创建的主题 ESP8266初始化代码 比较简陋 主函数代码 如果想用串口助手调试 接线方法如下 1 使用硬件
  • DFS判断有向图是否存在环

    st数组记录每个点的状态 0表示没访问过 1表示访问过 2表示与该点相邻的点都被访问过 dfs深搜如果遇到被标记成1的点 就说明有环 include
  • JavaWeb开发 JSP技术详解

    目录 一 JSP简介 1 1 JSP介绍 1 2 常见的视图层技术 1 3 前后端分离开发方式 二 JSP运行原理 2 1JSP技术特点 2 2 JSP与Servlet区别 三 JSP标签的使用 3 1 原始标签 3 1 1 声明标签 3
  • Win10配置环境变量path详解

    一 配置环境变量的目的 在刚刚写的代码中 只能存放在bin目录下 才能使用 javac 和 java 工具 如果我想把代码存放在任意的目录下 在任意的目录下都可以使用javac 和 java 工具该怎么办呢 就可以把 javac 和 jav
  • JavaScript基本数据类型简单转换

    JavaScript几个变量类型简单介绍 number型 数字型 string型 字符型 Boolean型 布尔型 true 1 false 0 undefine 未定义类型 null 空值 其它类型本文暂不涉及 注意 1 JavaScri
  • L298N驱动俩路电机按键控制正反转

    一个L298N是带有俩路驱动电路的 就是可以驱动俩个电机同时去转动 在硬件部分因为以前弄小车的时候已经组装好了 一路驱动去控制俩个电机 俩路驱动就是控制四个电机 做的是四轮四驱的小车 嫌麻烦就没有去拆改装了 博主主页里面有介绍过L298N的
  • Jenkins

    参考 Job artifacts GitLab 作业产物 GitLab 说明 晚点补充
  • Java程序设计--Java入门(一)

    Java入门 1 Java概述 1 1 Java语言发展史 1 2 Java语言跨平台原理 1 3 JRE和JDK 1 4 JDK的下载和安装 2 入门程序HelloWorld 2 1 常用DOS命令 2 2 Path环境变量的配置 2 3
  • IDEA 配置 openjdk,jre

    使用 idea 配置 zip 版本的 openjdk 和 jre 我使用的时 redhat 提供的 jdk 和 jre 一 下载 idea jdk 和 jre 红帽openjdk下载地址为 Red Hat build of OpenJDK
  • 零度根轨迹的推导

    根轨迹有很多种 常见有180度根轨迹 0度根轨迹和参数根轨迹 0度和180度是对比记忆的 记住一个另一个也记住了 比较常见的是180度根轨迹 而画根轨迹的目的是通过图像来反应闭环传递函数的一些根的性质从而代表了这个系统的一些性能指标 从根轨
  • JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数

    JavaWeb LeadNews Day11 KafkaStream实现实时计算文章分数 KafkaStream 概述 案例 统计单词个数 SpringBoot集成 实时计算文章分值 来源 Gitee KafkaStream 概述 Kafk