Kafka练习

2023-11-11

需求:写一个生产者,不断的去生产用户行为数据,写入到kafka的一个topic中

生产的数据格式: 造数据

{"guid":1,"eventId":"pageview","timestamp":1637868346789} isNew = 1

{"guid":1,"eventId":"addcard","timestamp":1637868347625} isNew = 0

{"guid":2,"eventId":"collect","timestamp":16378683463219}

{"guid":3,"eventId":"paid","timestamp":16378683467829}

......

再写一个消费者,不断的从kafka中消费上面的用户行为数据,做一个统计

1.每5s输出一次当前来了多少用户(去重) uv

2.将每条数据添加一个字段来标识,如果这个用户的id是第一次出现,那么就标注1,否则就是0

生产者代码示例: 

package com.doit.kafaka;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

//需求:写一个生产者,不断的去生产用户行为数据,写入到kafka的一个topic中  ==>先造一部分数据,然后不断的往kafka中写数据(生产者)
//        * 生产的数据格式:  造数据
//        * {"guid":1,"eventId":"pageview","timestamp":1637868346789}  isNew = 1  ==》 fastjson  ==》javabean 创建对象  格式化成json串
//        * {"guid":1,"eventId":"addcard","timestamp":1637868347625}   isNew = 0
//        * {"guid":2,"eventId":"collect","timestamp":16378683463219}
//        * {"guid":3,"eventId":"paid","timestamp":16378683467829}
//        * ......
//        * 再写一个消费者,不断的从kafka中消费上面的用户行为数据,做一个统计  ==》poll for(具体的逻辑)
//        * 1.每5s输出一次当前来了多少用户(去重)  uv   每5s输出一次==》 任务调度器 Timer
//        * 2.将每条数据添加一个字段来标识,如果这个用户的id是第一次出现,那么就标注1,否则就是0  ==》 判断这个数之前有没有出现过
//        */
public class _Producer_uv {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        props.setProperty("value.serializer",StringSerializer.class.getName());

        KafkaProducer<String,String> producer = new KafkaProducer<>(props);
        EventLog eventLog = new EventLog();

        while(true){
            eventLog.setEventId(RandomStringUtils.randomAlphabetic(10));
            eventLog.setGuid(RandomUtils.nextInt(10000,100000));
            eventLog.setTimestamp(System.currentTimeMillis());
            String jsonString = JSON.toJSONString(eventLog);
            ProducerRecord<String,String> record = new ProducerRecord<>("event-log",jsonString);
            producer.send(record);
            producer.flush();
            Thread.sleep(RandomUtils.nextInt(10,200));

        }
    }
}

 消费者代码示例: 用hashset来实现:

package com.doit.kafaka;

import com.alibaba.fastjson.JSON;
import com.doit.demo.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.*;

//再写一个消费者,不断的从kafka中消费上面的用户行为数据,做一个统计  ==》poll for(具体的逻辑)
//        * 1.每5s输出一次当前来了多少用户(去重)  uv   每5s输出一次==》 任务调度器 Timer
//        * 2.将每条数据添加一个字段来标识,如果这个用户的id是第一次出现,那么就标注1,否则就是0  ==》 判断这个数之前有没有出现过
public class _Consumer_uv {
    public static void main(String[] args) {
      HashSet<Integer> set = new HashSet<>();
      new Thread(new SetTask(set)).start();

        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("截止到现在的uv数:"+set.size()+",当前时间是:"+System.currentTimeMillis());

            }
        },1000,5000);
    }
}
class SetTask implements Runnable{
private HashSet<Integer> set;
private KafkaConsumer<String,String> consumer;

    public SetTask(HashSet<Integer> set) {

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"group02");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,"true");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"10000");

        consumer = new KafkaConsumer<String,String>(props);
        this.set = set;
    }

    @Override
    public void run() {

        consumer.subscribe(Arrays.asList("event-log"));
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String value = record.value();
                EventLog eventLog = JSON.parseObject(value, EventLog.class);
                int guid = eventLog.getGuid();
                set.add(guid);
            }
        }

    }
}

用hashset来实现很显然会出问题,如果数据量一直往上增长,会出现oom的问题,而且占用资源越来越多,影响电脑性能!!!

方案二:将HashSet改成bitMap来计数,就很完美,大逻辑不变,小逻辑就是将HashMap改成bitMap

package com.doit.kafaka;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.roaringbitmap.RoaringBitmap;

import java.time.Duration;
import java.util.*;

//再写一个消费者,不断的从kafka中消费上面的用户行为数据,做一个统计  ==》poll for(具体的逻辑)
//        * 1.每5s输出一次当前来了多少用户(去重)  uv   每5s输出一次==》 任务调度器 Timer
//        * 2.将每条数据添加一个字段来标识,如果这个用户的id是第一次出现,那么就标注1,否则就是0  ==》 判断这个数之前有没有出现过
public class _Consumer_uv2 {

    public static void main(String[] args) {
//      HashSet<Integer> set = new HashSet<>();
        RoaringBitmap bitmap = new RoaringBitmap();
      new Thread(new BitMapTask(bitmap)).start();

        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("截止到现在的uv数:"+bitmap.getCardinality()+",当前时间是:"+System.currentTimeMillis());

            }
        },1000,5000);
    }
}
class BitMapTask implements Runnable{
private RoaringBitmap bitmap;
private KafkaConsumer<String,String> consumer;

    public BitMapTask(RoaringBitmap bitmap) {

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"group02");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,"true");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"10000");

        consumer = new KafkaConsumer<String,String>(props);
        this.bitmap = bitmap;
    }

    @Override
    public void run() {

        consumer.subscribe(Arrays.asList("event-log"));
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String value = record.value();
                EventLog eventLog = JSON.parseObject(value, EventLog.class);
                int guid = eventLog.getGuid();
                bitmap.add(guid);
            }
        }

    }
}

需求二:判断来没来过的问题,可以用bitmap来搞,当然还可以用布隆过滤器来搞

 

package com.doit.kafaka;

import com.alibaba.fastjson.JSON;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.checkerframework.checker.nullness.qual.Nullable;
import redis.clients.jedis.Jedis;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;

//再写一个消费者,不断的从kafka中消费上面的用户行为数据,做一个统计  ==》poll for(具体的逻辑)
//        * 1.每5s输出一次当前来了多少用户(去重)  uv   每5s输出一次==》 任务调度器 Timer
//        * 2.将每条数据添加一个字段来标识,如果这个用户的id是第一次出现,那么就标注1,否则就是0  ==》 判断这个数之前有没有出现过
public class _Consumer_uv4 {
    public static void main(String[] args) {
        BloomFilter<Long> bloom = BloomFilter.create(Funnels.longFunnel(), 1000000);
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"group02");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,"true");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"10000");

        KafkaConsumer<String,String>  consumer = new KafkaConsumer<String,String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String value = record.value();
                EventLog eventLog = JSON.parseObject(value, EventLog.class);
                boolean flag = bloom.mightContain((long) eventLog.getGuid());
                if (!flag){
                    eventLog.setIsNew(1);
                    bloom.put((long) eventLog.getGuid());
                }else {
                    eventLog.setIsNew(0);
                }
                System.out.println(JSON.toJSONString(eventLog));


            }
        }
    }
}


 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka练习 的相关文章

随机推荐

  • pytorch测试模型时根据不同列别的概率值得到具体的分类

    pytorch 分类任务的教程 https pytorch org tutorials beginner blitz cifar10 tutorial html 主要使用的是 predict torch max out data 1 最后的
  • best ajax lib,BEST Currency Converter

    想提升客户的购物体验 以当地货币显示价格可以省去他们很多不必要的时间 也能提升客户与平台的粘度 该插件具备如下优势 1 轻松添加多种货币 按下按钮即可添加160多种货币 像专业人士一样开始国际销售 并鼓励客户购买 2 自动转换价格 价格会根
  • node.js 读取文件的时候 cmd执行脚本,中文(汉字)打印不出来

    node js 读取文件的时候 cmd执行脚本 中文 汉字 打印不出来 文本详情 输出结果 问题原因 txt编码格式不是UTF 8 解决办法 打开TXT文件 点击 文件 gt 另存为 gt 编码改为UTF 8 保存替换 问题解决
  • 【大数据】Flink 详解(五):核心篇 Ⅳ

    本系列包含 大数据 Flink 详解 一 基础篇 大数据 Flink 详解 二 核心篇 大数据 Flink 详解 三 核心篇 大数据 Flink 详解 四 核心篇 大数据 Flink 详解 五 核心篇 大数据 Flink 详解 六 源码篇
  • 通俗易懂的教你编写自己的webpack loader与plugin

    前言 webpack几乎是目前前端开发者无人不知的打包框架 毕竟无论使用什么开发库 都会想到要使用webpack打包 包括各种脚手架cli工具 大部分也采用了webpack作为其打包工具 本文试图用最简单的代码 仅仅使用命令行工具 代码足够
  • spring data jpa使用limit时,抛QuerySyntaxException unexpected token: limit

    异常重现 jpql语句如下 select g from Entity g where g codeUrl codeUrl ORDER BY g createTime DESC limit 1异常原因 limit是特定于某些数据库 例如 my
  • IDEA设置为中文

    按照如下步骤操作即可 下载对应的语言包 中文语言包下载地址 注意此处下载的版本只能是IDEA版本之前的语言包 下载之后的会报错 将下载好的jar包 放在IDEA目录下的lib目录下 点击File Settings 点击Plugins 然后点
  • matlab相关性分析(皮尔逊,肯德尔,斯皮尔曼)

    代码 clc clear load CRO C3 mat data GPP DT VUT REF EVI NDVI NIRv kNDVI LSWI FPAR TA F VPD F SW IN F rho corr data type pea
  • LeetCode题目笔记——1658. 将 x 减到 0 的最小操作数

    文章目录 题目描述 题目难度 中等 方法一 反向思考 双指针求最长子数组 代码 Python 代码 C 方法二 滑动窗口 代码 总结 我把这篇也归到面试题那一栏 因为觉得这题的思路和思考方式还挺好的 或许能用到其他题上 题目描述 给你一个整
  • [创业之路-74] - 感悟 - 创业是所有因素的机缘组合,缺一不可; 舰船思维 VS 城堡思维.

    感悟 方向 趋势 路径 资助 船只 船长 大副 水手 船员 装备 配套 路径 一个都不能少 只看对方向与趋势 一样葬身在趋势的洪流中 看不对方向与趋势 亦会老死在寂寞孤冷之中 在所有因素中 船只 装配 配套是最表象和最容易触发感官体验的 目
  • 服务器与虚拟技术,云服务器与虚拟化服务器的区别

    虚拟化服务器是让一台服务器变成几台甚至上百台相互隔离的虚拟服务器 不再受限于物理上的界限 而是让CPU 内存 磁盘 I O等硬件变成可以动态管理的 资源池 从而提高资源的利用率 简化系统管理 服务器虚拟化的种类 主要有 一虚多 多虚一 和
  • c++ 之 shared_ptr

    shared ptr shared ptr 是一种智能指针 smart pointer 作用有如同指针 但会记录有多少个 shared ptrs 共同指向一个对象 这便是所谓的引用计数 reference counting 一旦最后一个这样
  • oracle字符串生成唯一数字,在C#中生成唯一的字符串和数字【GUID】转

    当我们想要获得一个唯一的key的时候 通常会想到GUID 这个key非常的长 虽然我们在很多情况下这并不是个问题 但是当我们需要将这个36个字符的字符串放在URL中时 会使的URL非常的丑陋 想要缩短GUID的长度而不牺牲它的唯一性是不可能
  • Spark常见错误剖析与应对策略

    问题一 日志中出现 org apache spark shuffle MetadataFetchFailedException Missing an output location for shuffle 0 原因分析 shuffle分为s
  • 第2章 PyTorch基础(1/2)

    第2章 PyTorch基础 PyTorch是Facebook团队于2017年1月发布的一个深度学习框架 虽然晚于TensorFlow Keras等框架 但自发布之日起 其关注度就在不断上升 目前在GitHub上的热度已超过Theano Ca
  • iterator 怎么使用甀_Iterator的理解和使用

    es6成员之一的Iterator 遍历器 Iterator 它是一种接口 为各种不同的数据结构提供统一的访问机制 任何数据结构只要部署Iterator接口 就可以完成遍历操作 即依次处理该数据结构的所有成员 Iterator 的作用有三个
  • 记一次edusrc的漏洞挖掘

    一 前言 在fofa上闲逛的时候发现这个系统 其实之前也碰到过这个系统 当时可能觉得没什么漏洞点就没有管 正好闲着没事又碰到了这个系统 然后就拿过来简单的测试了一下 二 漏洞挖掘 1 信息收集 由于我是在fofa上发现的这个系统 所以也谈不
  • 软件系统设计-15-架构设计

    1 设计架构 Design Architecture 1 1 设计策略 Design Strategies Abstraction Generate Test Decomposition Reusable Elements Iteratio
  • python(数据分析)第5天:图例

    图例 plt legend import matplotlib pyplot as plt import random import matplotlib from matplotlib import cycler from matplot
  • Kafka练习

    需求 写一个生产者 不断的去生产用户行为数据 写入到kafka的一个topic中 生产的数据格式 造数据 guid 1 eventId pageview timestamp 1637868346789 isNew 1 guid 1 even