大数据毕设分享 flink大数据淘宝用户行为数据实时分析与可视化

2024-01-04

文章目录


0 前言

???? 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。

为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是

???? flink大数据淘宝用户行为数据实时分析与可视化

????学长这里给一个题目综合评分(每项满分5分)

  • 难度系数:3分
  • 工作量:3分
  • 创新点:4分

1、环境准备

1.1 flink 下载相关 jar 包

flink-sql 连接外部系统时,需要依赖特定的 jar 包,所以需要事先把这些 jar 包准备好。 说明与下载入口

本项目使用到了以下的 jar 包 ,下载后直接放在了 flink/lib 里面。

需要注意的是 flink-sql 执行时,是转化为 flink-job 提交到集群执行的,所以 flink 集群的每一台机器都要添加以下的 jar 包。

外部 版本 jar
kafka 4.1 flink-sql-connector-kafka_2.11-1.10.2.jar
flink-json-1.10.2-sql-jar.jar
elasticsearch 7.6 flink-sql-connector-elasticsearch7_2.11-1.10.2.jar
mysql 5.7 flink-jdbc_2.11-1.10.2.jar
mysql-connector-java-8.0.11.jar

1.2 生成 kafka 数据

用户行为数据来源: 阿里云天池公开数据集

网盘:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取码:gja5

商品类目纬度数据来源: category.sql

数据生成器: datagen.py

有了数据文件之后,使用 python 读取文件数据,然后并发写入到 kafka。

修改生成器中的 kafka 地址配置,然后运行 以下命令,开始不断往 kafka 写数据

# 5000 并发
nohup python3 datagen.py 5000 &                  

1.3 开发前的三个小 tip

  • 生成器往 kafka 写数据,会自动创建主题,无需事先创建

  • flink 往 elasticsearch 写数据,会自动创建索引,无需事先创建

  • Kibana 使用索引模式从 Elasticsearch 索引中检索数据,以实现诸如可视化等功能。

使用的逻辑为:创建索引模式 》Discover (发现) 查看索引数据 》visualize(可视化)创建可视化图表》dashboards(仪表板)创建大屏,即汇总多个可视化的图表

2、flink-sql 客户端编写运行 sql

# 进入 flink-sql 客户端, 需要指定刚刚下载的 jar 包目录
./bin/sql-client.sh embedded -l lib

2.1 创建 kafka 数据源表

-- 创建 kafka 表, 读取 kafka 数据
CREATE TABLE user_behavior (
    user_id BIGINT,
    item_id BIGINT,
    category_id BIGINT,
    behavior STRING,
    ts TIMESTAMP(3),
    proctime as PROCTIME(),
    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  
) WITH (
    'connector.type' = 'kafka', 
    'connector.version' = 'universal',  
    'connector.topic' = 'user_behavior',  
    'connector.startup-mode' = 'earliest-offset', 
    'connector.properties.zookeeper.connect' = '172.16.122.24:2181', 
    'connector.properties.bootstrap.servers' = '172.16.122.17:9092', 
    'format.type' = 'json'  
);
SELECT * FROM user_behavior;

2.2 指标统计:每小时成交量

2.2.1 创建 es 结果表, 存放每小时的成交量

CREATE TABLE buy_cnt_per_hour (
    hour_of_day BIGINT,
    buy_cnt BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'buy_cnt_per_hour',
    'connector.document-type' = 'user_behavior',
    'connector.bulk-flush.max-actions' = '1',
    'update-mode' = 'append',
    'format.type' = 'json'
);

2.2.2 执行 sql ,统计每小时的成交量

INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);

2.3 指标统计:每10分钟累计独立用户数

2.3.1 创建 es 结果表,存放每10分钟累计独立用户数

CREATE TABLE cumulative_uv (
    time_str STRING,
    uv BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'cumulative_uv',
    'connector.document-type' = 'user_behavior',    
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.3.2 创建视图

CREATE VIEW uv_per_10min AS
SELECT
  MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
  COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);

2.3.3 执行 sql ,统计每10分钟的累计独立用户数

INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;

2.4 指标统计:商品类目销量排行

2.4.1 创建商品类目维表

先在 mysql 创建一张商品类目的维表,然后配置 flink 读取 mysql。

CREATE TABLE category_dim (
    sub_category_id BIGINT,
    parent_category_name STRING
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink',
    'connector.table' = 'category',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = 'root',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);

2.4.1 创建 es 结果表,存放商品类目排行表

CREATE TABLE top_category  (
    category_name  STRING,
    buy_cnt  BIGINT
) WITH (
    'connector.type' = 'elasticsearch', 
    'connector.version' = '7',  
    'connector.hosts' = 'http://172.16.122.13:9200',  
    'connector.index' = 'top_category',
    'connector.document-type' = 'user_behavior',
    'update-mode' = 'upsert',
    'format.type' = 'json'
);

2.4.2 创建视图

CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;

2.4.3 执行 sql , 统计商品类目销量排行

INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;

3、最终效果与体验心得

3.1 最终效果

整个开发过程,只用到了 flink-sql ,无需写 java 或者其它代码,就完成了这样一个实时报表。

image-20201201175438743

3.2 体验心得

3.2.1 执行

  • flink-sql 的 ddl 语句不会触发 flink-job , 同时创建的表、视图仅在会话级别有效。

  • 对于连接表的 insert、select 等操作,则会触发相应的流 job, 并自动提交到 flink 集群,无限地运行下去,直到主动取消或者 job 报错。

  • flink-sql 客户端关闭后,对于已经提交到 flink 集群的 job 不会有任何影响。

本次开发,执行了 3 个 insert , 因此打开 flink 集群面板,可以看到有 3 个无限的流 job 。即使 kafka 数据全部写入完毕,关闭 flink-sql 客户端,这个 3 个 job 都不会停止。
image-20201201175523916

3.2.2 存储

  • flnik 本身不存储业务数据,只作为流批一体的引擎存在,所以主要的用法为读取外部系统的数据,处理后,再写到外部系统。

  • flink 本身的元数据,包括表、函数等,默认情况下只是存放在内存里面,所以仅会话级别有效。但是,似乎可以存储到 Hive Metastore 中,关于这一点就留到以后再实践。

4 最后

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

大数据毕设分享 flink大数据淘宝用户行为数据实时分析与可视化 的相关文章

  • Tkinter 菜单删除项

    如何删除任何菜单项 例如我想删除 播放 self menubar Menu self root self root config menu self menubar self filemenu2 Menu self menubar self
  • 如何删除 PyCharm 中的项目?

    如果我关闭一个项目 然后删除该项目文件夹 则在 PyCharm 重新启动后 会再次创建一个空的项目文件夹 只需按顺序执行以下步骤即可 他们假设您当前在 PyCharm 窗口中打开了该项目 单击 文件 gt 关闭项目 关闭项目 在 PyCha
  • 从字典的元素创建 Pandas 数据框

    我正在尝试从字典创建一个 pandas 数据框 字典设置为 nvalues y1 1 2 3 4 y2 5 6 7 8 y3 a b c d 我希望数据框仅包含 y1 and y2 到目前为止我可以使用 df pd DataFrame fr
  • 定义Python源代码编码的正确方法

    PEP 263 http www python org dev peps pep 0263 定义如何声明Python源代码编码 通常 Python 文件的前两行应以以下内容开头 usr bin python coding
  • 检查 python 中命令行参数的数量

    我是蟒蛇新手 还是把脚弄湿了 我正在尝试做这样的事情 import sys if len sys argv lt 3 or lt len sys argv gt 3 print This script will compare two fi
  • 如何获取numpy.random.choice的索引? - Python

    是否可以修改 numpy random choice 函数以使其返回所选元素的索引 基本上 我想创建一个列表并随机选择元素而不进行替换 import numpy as np gt gt gt a 1 4 1 3 3 2 1 4 gt gt
  • 使用 Pytest 的参数化添加测试功能的描述

    当其中一个测试失败时 可以在测试正在测试的内容的参数化中添加描述 快速了解测试失败的原因 有时您不知道测试失败的原因 您必须查看代码 通过每个测试的描述 您就可以知道 例如 pytest mark parametrize num1 num2
  • python celery -A 的无效值无法加载应用程序

    我有一个以下项目目录 azima init py main py tasks py task py from main import app app task def add x y return x y app task def mul
  • 如何使用 opencv python 计算乐高积木上的孔数?

    我正在开发我的 python 项目 我需要计算每个乐高积木组件中有多少个孔 我将从输入 json 文件中获取有关需要计算哪个程序集的信息 如下所示 img 001 red 0 blue 2 white 1 grey 1 yellow 1 r
  • 在 Mac OSX 上从 Python 3.6 运行 wine 命令

    我正在尝试用 Python 编写一个打开的脚本wine然后发送代码到wine终端打开一个 exe程序 这 exe程序也是命令驱动的 我可以打开wine 但我无法进一步 import shlex subprocess line usr bin
  • 更改QLineEdit的ClearButton图标

    我想在Windows 10 1909 64位 上的Python 3 8和PyQt5 5 15 0 上更改我的QLineEdit的ClearButton图标 稍后我想在Linux上运行代码 我尝试应用此处找到的代码 如何在 QLineEdit
  • 在 Mac OS X 上安装 libxml2 时出现问题

    我正在尝试在我的 Mac 操作系统 10 6 4 上安装 libxml2 我实际上正在尝试在 Python 中运行 Scrapy 脚本 这需要我安装 Twisted Zope 现在还需要安装 libxml2 我已经下载了最新版本 2 7 7
  • 为什么在Python解释器中输入_会返回True? [复制]

    这个问题在这里已经有答案了 我的翻译行为非常奇怪 gt gt gt True gt gt gt type True
  • 为什么这个 if 语句会导致语法错误

    我正在尝试设置一个 elif 语句 如果用户按下 Enter 键 代码将继续 但是我不断遇到语法错误 GTIN 0 while True try GTIN int input input your gtin 8 number if len
  • 使用 suds SOAP 库进行 HTTP 身份验证的奇怪行为

    我有一个正在运行的 python 程序 它使用 suds 通过 SOAP 获取大量数据 Web服务是通过分页功能实现的 这样我就可以抓取nnn每个 fetch 调用的行并获取下一个nnn与后续的电话 如果我使用如下代码向 HTTP 服务器进
  • 在 numpy 中连接维度

    我有x 1 2 3 4 5 6 7 8 9 10 11 12 shape 2 2 3 I want 1 2 3 4 5 6 7 8 9 10 11 12 shape 2 6 也就是说 我想连接中间维度的所有项目 在这种特殊情况下我可以得到这
  • 在matlab中,如何读取python pickle文件?

    在 python 中 我生成了一个 p 数据文件 pickle dump allData open myallData p wb 现在我想在Matlab中读取myallData p 我的Matlab安装在Windows 8下 其中没有Pyt
  • Jupyter Notebook:带有小部件的交互式绘图

    我正在尝试生成一个依赖于小部件的交互式绘图 我遇到的问题是 当我使用滑块更改参数时 会在前一个绘图之后完成一个新绘图 而我预计只有一个绘图会根据参数发生变化 Example from ipywidgets import interact i
  • Python 中的 Unix cat 函数 (cat * > merged.txt)? [复制]

    这个问题在这里已经有答案了 一旦建立了目录 有没有办法在Python中使用Unix中的cat函数或类似的函数 我想将 files 1 3 合并到 merged txt 我通常会在 Unix 中找到该目录 然后运行 cat gt merged
  • 如何通过点击复制 folium 地图上的标记位置?

    I am able to print the location of a given marker on the map using folium plugins MousePosition class GeoMap def update

随机推荐

  • 振弦采集仪在地基沉降监测中的应用研究

    振弦采集仪在地基沉降监测中的应用研究 振弦采集仪是一种专门用于测量地基沉降的仪器 它采用振弦原理来测量地基的沉降情况 振弦采集仪通过在地基上安装一根细长的弹性振弦 并测量振弦的变形来获得地基沉降的数据 在地基沉降监测中 振弦采集仪可以提供精
  • promethues grafana 安装和使用

    文章目录 1 promethues安装 2 node exporter安装 3 grafana安装 4 配置promethues监控node节点 5 grafana操作 外传 Docker 镜像下载地址 https hub docker c
  • uniapp微信小程序投票系统实战 (SpringBoot2+vue3.2+element plus ) -小程序端TabBar搭建

    锋哥原创的uniapp微信小程序投票系统实战 uniapp微信小程序投票系统实战课程 SpringBoot2 vue3 2 element plus 火爆连载更新中 哔哩哔哩 bilibili uniapp微信小程序投票系统实战课程 Spr
  • LONGQLORA: EFFICIENT AND EFFECTIVE METHOD TO EXTEND CONTEXT LENGTH OF LARGE LANGUAGE MODELS

    本文是LLM系列文章 针对 LONGQLORA EFFICIENT AND EFFECTIVE METHOD TO EXTEND CONTEXT LENGTH OF LARGE LANGUAGE MODELS 的翻译 LONGQLORA 扩
  • 新规施行推动数据资产化迈出关键一步

    经济参考报 1月2日刊发文章 新规施行推动数据资产化迈出关键一步 文章称 2024年1月1日起 企业数据资源相关会计处理暂行规定 下称 暂行规定 正式施行 暂行规定 明确数据资源的确认范围和会计处理适用准则等 业内人士表示 这是推动数据资产
  • PD SINK协议芯片系列产品介绍对比-ECP5701、FS312A、CH221K、HUSB238、AS225KL

    目录 一 ECP5701 二 FS312A 三 CH221K 四 HUSB238 五 AS225KL 在如今快节奏生活不断蔓延的背景下 人们对各种事情的处理也渐渐地开始要求在保证质量的情况下 不断加快 手机快充就是一个典型的例子 从开始的1
  • 智康护智慧养老院建设之智慧视频联动解决方案

    智康护智慧养老院建设之智慧视频联动解决方案是一种基于智能技术和视频监控的创新解决方案 旨在提升养老院的安全管理和服务质量 通过视频联动实现智能化的监控和应急响应 建设背景 随着人口老龄化问题的日渐严重 养老院作为提供长期照护和安居服务的场所
  • 程序员思维——四个思考原则

    一 什么是四个思考原则 以终为始 确定好真实目标 任务分解 找到实施路径 沟通反馈 解决与人打交道出现的问题 自动化 解决与机器打交道出现的问题 二 如何运用思考框架 运用这个思考框架 我们需要问自己一些问题 Where are we 我们
  • Qt学习_17_一些关于QTableWidget的记录

    1 QTableWidget clear 程序异常退出 近日 项目中使用到QTableWidget 遇到一个问题 项目需要清空这个表格 但是无论调用clear clearContents 程序都报 程序异常退出 而且项目程序还比较多 最开始
  • prometheus grafana mysql监控配置使用

    文章目录 前传 bitnami mysqld exporter 0 15 1镜像 出现了问题 my cnf 可以用这个 prom mysqld exporter v0 15 0 镜像 重要的事情 mysql监控效果 外传 前传 promet
  • 第九章 1 面向对象程序设计

    两大编程思想 面向过程和对象 p108 面向过程 功能上的封装 面向对象 属性和行为上的封装 面向过程 面向对象 区别 事物比较简单 可以用线性的思维去解决 事物比较复杂 使用简单的线性思维无法解决 共同点 1 面向过程和面向对象都是解决实
  • Android跨进程渲染

    文章目录 背景 实现步骤 服务端 客户端 参考代码
  • Python+Selenium键盘鼠标模拟事件操作详解

    当我们定位到具体的一个元素的时候就可以对这个元素进行具体的操作 比如之前章节所执行的 click 操作 这是最简单的操作 webdriver 还有其他的操作 比如元素的基本操作 点击 输入 清除 还有一些高级操作如鼠标键盘模拟事件 弹出框处
  • 服务器3M固定带宽什么意思?够用吗?

    云服务器3M固定带宽是什么意思 速度快吗 3M固定带宽是指云服务器的公网带宽 用于在外网提供服务的 3M带宽的下载速度是384KB 秒 上传速度是1280KB 秒 对于个人博客或流量不多的企业官网速度还是挺快的 阿里云服务器网aliyunf
  • thinkadmin安装步骤

    一 先cmd运行安装命令 创建项目 需要在英文目录下面执行 composer create project zoujingli thinkadmin 二 在confing中的database php配置数据库 三 将仓库的data复制到ap
  • 亚马逊自养号测评防关联技巧分享,亚马逊自养号怎么养?

    我们做亚马逊的都知道 想要做好亚马逊 测评是免不了的 很多卖家选择自养号这种方式 但是亚马逊养号并不是一件容易的事 需要我们提高养号的技术和掌握相应的技巧 而且随着平台审查力度的加强 自养号的账号关联问题也给卖家们带来许多困扰 那么什么是自
  • VUE+Springboot实现生成二维码及二维码下载功能

    一 Springboot相关 1 pom依赖引入
  • Python selenium模块的安装和配置教程

    一 selenium的安装以及简单应用 我们以谷歌浏览器的chromedriver为例 1 在Python虚拟环境中安装selenium模块 pip pip3 install selenium 2 下载版本符合的webdriver 以chr
  • 山西电力市场日前价格预测【2024-01-05】

    日前价格预测 预测说明 如上图所示 预测明日 2024 01 05 山西电力市场全天平均日前电价为259 10元 MWh 其中 最高日前电价为363 99元 MWh 预计出现在18 00 最低日前电价为0 00元 MWh 预计出现在11 1
  • 大数据毕设分享 flink大数据淘宝用户行为数据实时分析与可视化

    文章目录 0 前言 1 环境准备 1 1 flink 下载相关 jar 包 1 2 生成 kafka 数据 1 3 开发前的三个小 tip 2 flink sql 客户端编写运行 sql 2 1 创建 kafka 数据源表