Flink从入门到放弃(十二)-企业实战之事件驱动型场景踩坑(一)

2023-11-06

需求背景

某日,小明早上10点打卡到公司,先来一杯热水润润嗓子,打开音乐播放器带上心爱的降噪耳机看看新闻,静静等待11点半吃午饭。突然消息框亮了起来,这个时候小明心想要么来需求了,要么数据就有问题了。这个时候运营A部的同学发消息过来说想要分析下每个渠道当日的实时流量情况,以看板的方式提供就行。 小明看到这种需求,心想这还不简单,立马答应了下来,并许诺下班前完成。

方案设计

小明基于公司现有的Flink1.12.0 SQL接入Kafka来读取数据实现统计,通过渠道维度数据来关联,并将最终结果写入Mysql中,通过Superset来进行展示。整个数据流向也非常简单。图片

工程实践

这里的维度关联采用的是cdc模式,因为运营同学想看分析所有的渠道流量情况,而如果使用Temporal Join 就有可能会丢失部分渠道的数据(比如新接入一个渠道c,而且C渠道没有流量转换,那么就无法统计到C渠道的pv=0,uv=0)

-- Kafka Source
create table real_dwd_flow_info_from_kafka(
    visit_time timestamp,
    channel_code string,
    user_id string,
    url string,
    device_id string
    primary key(unique_id) not enforced
)with( 
  'connector' = 'upsert-kafka',
  'topic' = 'real_dwd_flow_info',
  'properties.bootstrap.servers' = 'bootstrap:9092',
  'key.format' = 'json',
  'value.format' = 'json'
 );
 
--Mysql Sink
create table real_dw_flow_index_info(
   channel string,
   pv int,
   uv int,
   primary key(channel) not enforced
)with(
  'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai',
 'table-name' = 'flow_index_info',
 'username' = 'user_name',
 'password' = 'password'
)

--Dim Data
 create table real_dim_channel_code_from_mysql(
  channel_code string,
  channel_name string,
  primary key(channel_code) not enforced
) WITH(
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'user_name',
  'password' = 'password',
  'database-name' = 'test',
  'table-name' = 'dim_channel',
  'debezium.event.processing.failure.handling.mode' = 'warn',
  'debezium.snapshot.locking.mode' = 'none'
);

--统计指标,这里采用Regular Join
insert into real_dw_flow_index_info
select 
  t1.channel_code,
  sum(case when substr(cast(visit_time as string),1,10) = substr(cast(LOCALTIMESTAMP as string,1,10)) then 1 else 0 end) as pv,
  count(distinct case when substr(cast(visit_time as string),1,10) = substr(cast(LOCALTIMESTAMP as string,1,10)) then user_id end) as uv
from real_dim_channel_code_from_mysql t1
left join real_dwd_flow_info_from_kafka t2
on t1.channel_code  = t2.channel_code 
group by t1.channel_code

如上面的逻辑处理,可以实时统计出当日的实时流量情况。图片如上图,当运营同学通过配置接入一个新渠道G时,可以立刻在看板中反应出来。

踩坑填坑

入坑:
   小明下班前将该需求交付给运营部门后,开心下班了。不幸的是第二天一早上班,运营同学就来反馈说数据不对,渠道F数据并未发生变化,仍然停留在昨日的统计值上。。作为一名数据人,最害怕的就是别人说数据不对,经过定位排查发现渠道F由于各种因素出现故障,所以一直没有流量进入。
   这里结合Flink的事件驱动特性可以很容易理解,由于渠道F并没有任何事件传输过来,所以Flink本身不会对渠道F进行计算并做初始化的动作,因此结果值仍停留在上次事件发生时的统计状态
填坑:
    既然定位到原因,那么就需要人为或定时驱动事件产生触发计算。因此整个数据流向调整为下图:图片即在原来的流向中融入离线部分,凌晨定时抽取维度表数据并再次更新到维度表中,这样可以通过CDC模式触发一次计算。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Flink从入门到放弃(十二)-企业实战之事件驱动型场景踩坑(一) 的相关文章

随机推荐

  • 今日头条号问答微头条原创收益向百粉开放

    今日头条号 问答和微头条原创收益向百粉作者开放了 以前没有还可以开通的作者 这次可以抓紧时间去申请开通 作者宋九九 头条号发布公告称 自2021 年 1 月 13 日起 微头条创作收益和问答创作收益权益面向粉丝数不低于100作者开放 只要你
  • Spring配置文件报错 application context not configured for this file

    Spring配置文件报错 application context not configured for this file 解决方法一 点击Configure application context 选择Create new applica
  • FileZilla Server 下载、安装、配置教程

    下载filezilla server filezilla server官网 FileZilla The free FTP solution https filezilla project org FileZilla Server下载 Fil
  • Python模块导入时全局变量"__all__"的作用

    Python中一个py文件就是一个模块 all 变量是一个特殊的变量 可以在py文件中 也可以在包的 init py中出现 1 在普通模块中使用时 表示一个模块中允许哪些属性可以被导入到别的模块中 如 全局变量 函数 类 如下 test1
  • 抓住czx【最短路】

    题目链接 首先 做这样的处理 把每个点的时间分割为几个区间 说明在这个区间内的时候 人在这个点内 那么 我们就有这样的选择 如果在这个区间内 或者区间之前抵达 就说明是可以碰见的 如果在这个区间之后抵达 就说明是见不到的了 所以跑最短路 如
  • Spring IOC容器初始化过程及其原理(源码层面)

    Spring大家族在Java技术生态体系中占有重要地位 其中Spring更是其中的佼佼者 它极大的简化了我们的代码开发量 提高我们的工作效率 其中Spring两大特性中的IOC特性是至关重要的 今天来从底层看一看Spring的容器的初始化过
  • USB描述符 包括bushound抓包

    USB描述符 USB描述符信息存储在USB设备中 在枚举过程中 USB主机会向USB设备发送GetDescriptor请求 USB设备在收到这个请求之后 会将USB描述符信息返回给USB主机 USB主机分析返回来的数据 判断出该设备是哪一种
  • 如何理解等错误率(EER, Equal Error Rate)?

    在语音vad和KWS任务中 经常用到EER 怎么正确理解EER FR定义 在一批本该全部正确 TRUE 的列表中出现几个没识别出正确的语音 这个就是错误拒识FR False Rejection 是Miss的 FA定义 在一批本该全部错误 F
  • 6、一个简单的新氧的小爬虫

    from bs4 import BeautifulSoup import requests import math url hos for i in range 1 15 url source http y soyoung com hosp
  • 输出二叉树的所有路径

    给你一个二叉树的根节点 root 按 任意顺序 返回所有从根节点到叶子节点的路径 叶子节点 是指没有子节点的节点 输入 root 1 2 3 null 5 输出 1 gt 2 gt 5 1 gt 3 解法一 深度优先搜索 递归 迭代也可以实
  • 聊聊cglib动态代理遇到的坑

    简介 cglib是另外一种动态代理的方法 他和jdk动态代理的实现是有区别的 我们在之前见过jdk动态代理类是必须实现了接口的 而cglib不需要实现接口 但是必须保证类不含有final关键字 否则是无法代理的 本文是从个人不小心遇到的cg
  • # SpringCloud集成 报错 An attempt was made to call a method that does not exist. The attempt was made

    SpringCloud集成 报错 An attempt was made to call a method that does not exist The attempt was made from the following locati
  • 【Git系列】Git概述

    Git概述 1 Git发展历史 2 Git与SVN的区别 3 Git本地结构 4 代码托管中心 4 1 代码托管中心是什么 4 2 托管中心种类 其他系列 Git最详细的体系化教程 1 Git发展历史 Git的发展历史可以追溯到2005年
  • Linux系统终端窗口ctrl+c,ctrl+z,ctrl+d的区别

    时常在Linux系统上 执行某命令停不下来 就这几个ctrl组合键按来按去 今天稍微总结下具体差别 便于以后linux系统运维操作 1 ctrl c强制中断程序 相应进程会被杀死 中断进程任务无法恢复执行 2 ctrl z暂停正在执行的进程
  • Oracle——基础知识汇总

    注意 在Oracle数据库的SQL命令中 关键字 表名和字段名都不区分大小写 语法是标准的 sql写法 参考链接 一 字符串类型 char 固定长度字符串 会用空格填充来达到最大长度 varchar2 变长度字符串 不补充空格 可以存储32
  • Ubuntu20.04无法开机/左上角小横杠闪烁/升级系统内核后与显卡驱动不匹配的问题

    问题描述 今天也不知道是因为升级了系统内核还是什么原因 导致系统开机后只有左上角一个小白杠一直闪烁 百度了以下确认了是nvidia显卡驱动和linux系统内核不匹配的问题 解决方法 1 安装nvidia显卡驱动 重装CUDA 左上角有小白杠
  • 线程安全和线程同步

    1 线程安全 每次执行的结果都是不确定的 因为线程的执行顺序是不可预见的 这是java同步产生的根源 synchronized 关键字保证了多个线程对于同步块是互斥的 synchronized作为一种同步手段 解决java多线程的执行有序性
  • 15 个最重要的 Java 多线程面试题及回答

    前言 在任何Java面试当中多线程和并发方面的问题都是必不可少的一部分 如果你想获得任何股票投资银行的前台资讯职位 那么你应该准备很多关于多线程的问题 在投资银行业务中多线程和并发是一个非常受欢迎的话题 特别是电子交易发展方面相关的 他们会
  • http://mail.163.com/help/help_spam_16.htm?ip=118.186.207.7&hostid=smtp5&time=1358341921

    0INRMumLsiQwT0xVgvYVmNCBWS7mV8LzSeLOZGHzflL3ziBSx iej3G1syAeYvPZxqagQ0P7mgdX qgnEWWuIcv4cTR6ZI5QNmqULAGtRkCtCNsphAD7cLBi
  • Flink从入门到放弃(十二)-企业实战之事件驱动型场景踩坑(一)

    需求背景 某日 小明早上10点打卡到公司 先来一杯热水润润嗓子 打开音乐播放器带上心爱的降噪耳机看看新闻 静静等待11点半吃午饭 突然消息框亮了起来 这个时候小明心想要么来需求了 要么数据就有问题了 这个时候运营A部的同学发消息过来说想要分