-
采用谓词下推的技术,提早进行过滤有可能减少必须在数据库分区之间传递的数据量
- 所谓谓词下推就是通过嵌套的方式,将底层查询语句尽量推到数据底层去过滤,这样在上层应用中就可以使用更少的数据量来查询,这种SQL技巧被称为谓词下推(Predicate pushdown)
聚合类group by操作,发生数据倾斜
-
map段部分聚合
- 开启Map端聚合参数设置set hive.map.aggr=true
- 在Map端进行聚合操作的条目数目set hive.grouby.mapaggr.checkinterval=100000
-
有数据倾斜的时候进行负载均衡(默认是false)
-
set hive.groupby.skewindata = true
-
阶段拆分-两阶段聚合 需要聚合的key前加一个随机数的前后缀,这样就均匀了,之后再按照原始的key聚合一次
-
生成的查询计划有两 个 MapReduce 任务。在第一个 MapReduce 中,map 的输出结果集合会随机分布到 reduce 中, 每个 reduce 做部分聚合操作,并输出结果。相同的 GroupB Key 有可能分发到不同的 reduce 中,从而达到负载均衡的目的;第二个 MapReduce 任务再根据预处 理的数据结果按照 Group By Key 分布到 reduce 中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce 中),最后完成最终的聚合操作。
-
假设 key = 水果
select count(substr(a.tmp,1,2)) as key
from(
select concat(key,'_',cast(round(10*rand())+1 as string)) tmp
from table
group by tmp
)a
group by key
Join 优化
-
build table(小表)前置
- Hive在解析带join的SQL语句时,会默认将最后一个表作为probe table,将前面的表作为build table并试图将它们读进内存。如果表顺序写反,probe table在前面,引发OOM的风险就高了。
- 在维度建模数据仓库中,事实表就是probe table,维度表就是build table。假设现在要将日历记录事实表和记录项编码维度表来join 维度表在前,事实表在后
-
Reduce join 改为Map join
- 在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理
-
适用于小表和大表 join,将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD 的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
- 设置自动选择MapJoin set hive.auto.convert.join = true;默认为true
设置合理的map reduce的task数量
-
map阶段优化。使单个map任务处理合适的数据量;
-
map的数量不是越多越好,如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费 。而且,同时可执行的map数是受限的
-
mapred.min.split.size: 指的是数据的最小分割单元大小;min的默认值是1B
-
mapred.max.split.size: max的默认值是256MB
-
小文件问题:
- 如果一个任务有很多小文件(远远小于块大小128M),则每个小文件也会当做一个块,用一个map任务来完成。
- 而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,比如有一个127M的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
-
当input的文件任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
-
reduce阶段优化
-
启动和初始化reduce也会消耗时间和资源;
-
另外,有多少个reduce,就会有个多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
-
如果Reduce设置的过小,那么单个Reduce处理的数据将会加大,很可能会引起OOM异常
-
处理大数据量利用合适的Reduce数;使单个Reduce任务处理数据量大小要合适;
-
set mapred.reduce.tasks=10; 就是10个 如果是-1 就会估算
- hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)
- hive.exec.reducers.max(每个任务最大的reduce数,默认为999)
- 计算reducer数的公式很简单N=min(参数2,总输入数据量/参数1)
-
调整hive.exec.reducers.bytes.per.reducer参数的值 每个reduce处理数据量;
-
什么情况下只有一个reduce;
- 没有group by的汇总,
- 用了Order by
- 有笛卡尔积。
sort by代替order by
- HiveSQL中的order by与其他SQL方言中的功能一样,就是将结果按某字段全局排序,这会导致所有map端数据都进入一个reducer中,在数据量大时可能会长时间计算不完。
- 如果使用sort by,那么还是会视情况启动多个reducer进行排序,并且保证每个reducer内局部有序。为了控制map端数据分配到reducer的key,往往还要配合distribute by一同使用。如果不加distribute by的话,map端数据就会随机分配到reducer。
- 举个例子,假如要以UID为key,以上传时间倒序、记录类型倒序输出记录数据:
select uid,upload_time,event_type,record_data
from calendar_record_log
where pt_date >= 20190201 and pt_date <= 20190224
distribute by uid
sort by upload_time desc,event_type desc;
group by代替distinct
-
原因:distinct会将列中所有的数据保存到内存中 ,极有可能发生内存溢出
-
采用sum() group by的方式来替换count(distinct) 完成计算。
-
解决方案 :可以考虑使用Group By 或者 ROW_NUMBER() OVER(PARTITION BY col) 方式代替COUNT(DISTINCT col)
-
select count(distinct a) from calendar_record_log
where pt_date >= 20190101;
– 但是这样写会启动两个MR job(单纯distinct只会启动一个),
– 所以要确保数据量大到启动job的overhead远小于计算耗时,才考虑这种方法。
select count(1) from (
select uid from calendar_record_log
where pt_date >= 20190101
group by uid
) t;
– 用group by方式同时统计多个列?下面是解决方法:
select t.a,sum(t.b),count(t.c),count(t.d) from (
select a,b,null c,null d from some_table
union all
select a,0 b,c,null d from some_table group by a,c
union all
select a,0 b,null c,d from some_table group by a,d
) t;
优化SQL处理join数据倾斜
-
处理掉字段中带有空值的数据
-
原因:一个表内有许多空值时会导致MapReduce过程中,空成为一个key值,对应的会有大量的value值, 而一个key的value会一起到达reduce造成内存不足
-
1.在查询的时候,过滤掉所有为NULL的数据,比如:
create table res_tbl as
select n.* from
(select * from res where id is not null ) n
left join org_tbl o on n.id = o.id;
2.查询出空值并给其赋上随机数,避免了key值为空(数据倾斜中常用的一种技巧)
create table res_tbl as
select n.* from res n
full join org_tbl o
on case when n.id is null then concat(‘hive’, rand()) else n.id end = o.id;
选择使用Tez引擎
- Tez: 是基于Hadoop Yarn之上的DAG(有向无环图,Directed Acyclic Graph)计算框架。它把Map/Reduce过程拆分成若干个子过程,同时可以把多个Map/Reduce任务组合成一个较大的DAG任务,减少了Map/Reduce之间的文件存储。同时合理组合其子过程,也可以减少任务的运行时间
- 设置 hive.execution.engine = tez;通过上述设置,执行的每个HIVE查询都将利用Tez, 当然,也可以选择使用spark作为计算引擎
本地执行 和 并发执行
严格模式 (hive.mapred.mode = strict)
- 对于分区表,用户不允许扫描所有分区
- 使用了order by语句的查询,要求必须使用limit语句
- 限制笛卡尔积的查询
参考:博客1 博客2
Hive数据倾斜如何定位 + 怎么解决
-
Hive 中数据倾斜的基本表现
- 一般都发生在 Sql 中 group by 和 join on 上,而且和数据逻辑绑定比较深。
- 任务进度长时间维持在99%(或100%),查看任务监控页面**,发现只有少量(1个或几个)reduce子任务未完成**。因为其处理的数据量和其他reduce差异过大
-
如何产生
- key的分布不均匀或者说某些key太集中
- 业务数据自身的特性,例如不同数据类型关联产生数据倾斜
- SQL语句导致的数据倾斜
-
如何解决
- 开启map端combiner set hive.map.aggr = true
- 开启数据倾斜时负载均衡 set hive.groupby.skewindata = true
-
控制空值分布 将为空的key转变为字符串加随机数或纯随机数,将因空值而造成倾斜的数据分配到多个Reducer
- SQL语句调整
- 选用join key 分布最均匀的表作为驱动表。做好列裁剪和filter操作,以达到两表join的时候,数据量相对变小的效果。
- 大小表Join:使用map join让小的维度表(1000条以下的记录条数)先进内存。在Map端完成Reduce
- 大表Join大表:把空值的Key变成一个字符串加上一个随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终的结果
- count distinct大量相同特殊值:count distinct 时,将值为空的情况单独处理,如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
Hive中MR(map reduce)、Tez和Spark执行引擎对比
- MapReduce是一种编程模型,用于大规模数据集,分为映射 和 归约 ,大数据量下优势明显,读写HDFS次数多
- Tez是Apache开源的支持DAG(有向图)作业的计算框架,它直接源于MapReduce框架,核心思想是将Map和Reduce两个操作进一步拆分,把多个MR任务组合成一个较大的DAG任务,减少文件存储并可以优化子过程
- Spark基于map reduce算法实现的分布式计算,Job中间输出和结果可以保存在内存中,不需要读写HDFS,以DAG方式处理数据,数据量比较大的时候比较吃内存
-
Spark和Tez的区别:
- Spark与Tez都是以DAG方式处理数据
- Spark更像是一个通用的计算引擎,可以同时作为批式和流式的处理引擎,提供内存计算,实时流处理,机器学习等多种计算方式,适合迭代计算。tez作为一个框架工具,特定为hive和pig提供批量计算
- Spark属于内存计算,支持多种运行模式,可以跑在standalone,yarn上;而Tez只能跑在yarn上;
- Tez能够及时的释放资源,重用container,节省调度时间,对内存的资源要求率不高; 而spark如果存在迭代计算时,container一直占用资源;
- 使用场景:
- 如果数据需要快速处理而且资源充足,则可以选择Spark;如果资源是瓶颈,则可以使用Tez;
为什么任务执行的时候只有一个reduce?
-
原因:
- 使用了Order by (Order By是会进行全局排序)
-
直接COUNT(1),没有加GROUP BY,比如:有笛卡尔积操作 SELECT COUNT(1) FROM tbl WHERE pt=’201909’
-
解决方案:
- 避免使用全局排序,可以使用sort by进行局部排序
- 使用GROUP BY进行统计,不会进行全局排序,比如:SELECT pt,COUNT(1) FROM tbl WHERE pt=’201909’ group by pt;
-
Hive有索引么
-
Hive支持索引,但不支持主键或者外键。Hive索引可以建立在表中的某些列上,以提升一些操作的效率,例如减少MapReduce任务中需要读取的数据块的数量。
- 适用场景:适用于不更新的静态字段。以免总是重建索引数据。每次建立、更新数据后,都要重建索引以构建索引表
- hive在指定列上建立索引,会产生一张索引表(Hive的一张物理表),里面的字段包括,索引列的值、该值对应的HDFS文件路径、该值在文件中的偏移量
- 很少用索引
Hive为什么有分区
- 随着系统运行时间增长,表的数据量越来越大,使用分区技术可以指定条件,缩小数据扫描的范围,避免hive全表扫描,提升查询效率
- 可以将用户的整个表的数据 划分到多个子目录,
- 根据业务,通常按照年月日、地区等分区
如何使用分区
- PARTITION BY(col_name data_type)
- hive的分区字段使用的是表外字段。而mysql使用的是表内字段。
- hive的分区名区分大小写
- hive的分区本质是在表目录下面创建目录,但是该分区字段是一个伪列,不真实存在于数据中
- 一张表可以有一个或者多个分区,分区下面也可以有一个或者多个分区
- 双分区partitioned by (date_time string,type string),在文件系统中的表现为date_time为一个文件夹,type为date_time的子文件夹。
- 动态分区列必须在 SELECT 语句中的最后一个列中指定,且顺序与它们在 PARTITION() 子句中出现的顺序相同。
-
动态分区需要开启 set hive.exec.dynamic.partition = true; hive.exec.dynamic.parition.mode=nonstrict;
-- 创建静态分区 数据加载到指定的分区
create table if not exists part1(
uid int,
uname string,
uage int
)PARTITION BY (country string)
row format delimiterd fileds terminated by ',';
(stored as ORC| SequenceFile) ORC、 SequenceFile都是存储方式
(loacation 地址)
– 导入数据 需要指定分区 数据未知,根据分区值确定创建分区
load data local inpath ‘/usr/loacl/xxx’
into table part1 partition(country=‘China’);
– 开启动态分区 默认为false,不开启
set hive.exec.dynamic.partition=true;
hive.exec.dynamic.parition.mode=nonstrict;
– 创建动态双分区
create table if not exists dt_part1(
uid int,
uname string,
uage int
)
PARTITIONED BY (year string,month string)
row format delimited fields terminated by ‘,’;
– 在文件系统中的表现为date_time为一个文件夹,type为date_time的子文件夹。
– 追加写入数据
insert into dy_part1 partition(year,month)
select from part_tmp;
– 覆盖写入数据
insert overwrite dy_part1 partition(year,month)
select from part_tmp;
– 混合分区
create table if not exists dy_part2(
uid int,
uname string,
uage int
)
PARTITIONED BY (year string,month string)
row format delimited fields terminated by ‘,’;
– 插入数据
insert into dy_part2 partition(year=‘2018’,month)
select uid,uname,uage,month from part_tmp;
– 多个范围分区键
create table test_demo (value int)
partitioned by range (id1 INT, id2 INT, id3 INT)
(
– id1在(–∞,5]之间,id2在(-∞,105]之间,id3在(-∞,205]之间
partition p5_105_205 VALUES LESS THAN (5, 105, 205),
– id1在(–∞,5]之间,id2在(-∞,115]之间,id3在(-∞,+∞]之间
partition p5_115_max VALUES LESS THAN (5, 115, MAXVALUE)
)
– 查看分区数据
select * from part1 where country = ‘China’;
– 显示分区
show partitions part1;
– 增加分区
alter table part1 add partition(country = ‘india’) partition(country = ‘America’);
– 增加分区并设置数据
alter table part1 add partition(country = ‘xxx’)
location ‘user/hive/warehouse/xxx’
– 修改分区的存储路径 hdfs路径必须是全路径
alter table part1 partition(country=‘Vietnam’)
set location ‘hdfs://hadoop01:9000/user/hive/warehouse/brz.db/part1/country=Vietnam’
– 删除分区
alter table part1 drop partition(country = ‘india’)
– 手动向hdfs中创建分区目录,添加数据,创建好hive的外表之后,无法加载数据,
– 元数据中没有相应的记录
msck repair table tablename
分区注意事项
- hive的分区使用的表外字段,分区字段是一个伪列但是可以查询过滤。
- 分区使用的是表外字段,分桶使用的是表内字段
- 分区字段不建议使用中文
- 不太建议使用动态分区。因为动态分区将会使用mapreduce来查询数据,如果分区数量过多将导致namenode和yarn的资源瓶颈。所以建议动态分区前也尽可能之前预知分区数量。
- 分区属性的修改均可以使用手动元数据和hdfs的数据内容
- 在hive中的数据是存储在hdfs中的,我们知道hdfs中的数据是不允许修改只能追加的,那么在hive中执行数据修改的命令时,就只能先找到对应的文件,读取后执行修改操作,然后重新写一份文件。如果文件比较大,就需要大量的IO读写。在hive中采用了分桶的策略,只需要找到文件存放对应的桶,然后读取再修改写入即可。
为什么要分桶?
- 单个分区或者表中的数据量越来越大,当分区不能更细粒的划分数据时,所以会采用分桶技术将数据更细粒度的划分和管理。
分桶的意义
-
分桶是更细粒度的划分、管理数据,更多用来做数据抽样、JOIN操作
- 大表在JOIN的时候,效率低下。如果对两个表先分别按id分桶,那么相同id都会归入一个桶。那么此时再进行JOIN的时候是按照桶来JOIN的,那么大大减少了JOIN的数量。
- 对数据抽样的时候,也不需要扫描整个文件。只需要对每个分区按照相同规则抽取一部分数据即可。
- 原始数据中加入一些额外的结构,这些结构可以用于高效的查询,例如,基于ID的分桶可以使得用户的查询非常的块。
如何使用分桶
--创建一个分桶表 并且指定排序字段及排序规则
create table if not exists buc1(
uid int,
uname string,
uage int
)
distribute by (uid)
sorted by(uid desc) into 4 buckets
row format delimited fields terminated by ',';
– cluster by (uid)指定getPartition以哪个字段来进行hash散列,并且排序字段也是指定的字段,默认以正序进行排序
– distribute by(uid) – 指定getPartition以哪个字段来进行hash散列
– 加载数据 方式1
– 打开enforce bucketing开关,设置强制分桶属性
set hive.enforce.bucketing=true
set mapred.reduce.tasks = -1
insert overwrite table buc1
select uid,uname,uage from buc_temp
sort by (uid);
– 加载数据 方式2
– 将reducer个数设置为目标表的桶数,并在 SELECT 语句中用 DISTRIBUTE BY <bucket_key>
– 对查询结果按目标表的分桶键分进reducer中。
set hive.enforce.bucketing = false
set mapred.reduce.tasks = num_buckets
insert into table buc1
select uid,uname,uage from buc_temp
distribute by (uid) sort by (uage desc);
– 查看表结构
desc formatted tablename;
– 分桶查询结果
select from buc1 cluster by (uid);
‘’’
采样 TABLESAMPLE(BUCKET x OUT OF y)
x:表示从哪个 bucket 开始抽取数据 y:必须为该表总 bucket 数的倍数或因子
‘’’
– 查询第几桶 取出 uid % 4 == 0的数据
select from buc1 tablesample(bucket 1 out of 4 on uid);
– 查询uid 为奇数
select from buc1 tablesample(bucket 2 out of 2 on uid)
– 随机查询三条数据
select from part_tmp order by rand() limit 3;
select * from part_tmp tablesample(0.1 percent) ;
分区分桶表举例
- 例子也可参考 https://www.studytime.xin/article/hive-partition-and-bucket.html
-- 按照性别进行分区(1男2女),在分区中按照uid的奇偶进行分桶:
-- 分区使用的是表外字段,分桶使用的是表内字段
1 gyy1 1
2 gyy2 2
3 gyy3 2
4 gyy4 1
5 gyy5 2
6 gyy6 1
7 gyy7 1
8 gyy8 2
9 gyy9 1
10 gyy10 1
11 gyy11 2
12 gyy12 1
– 创建带有分区的分桶表
create table if not exists stus(
uid int,
uname string
)
partitioned by(sex int)
clustered by(uid) into 2 buckets
row format delimited filed terminated by ’ ‘;
– 创建临时表
create table if not exists stu_temp(
uid int,
uname string,
usex int
)
row format delimited fields terminated by ’ ';
– 临时表中添加数据
load data local inpath ‘/usr/local/hivedata/stu.dat’ into table stu_temp
– 分桶表中加数据
insert into table stus partition(sex)
select uid,uname,usex from stu_temp
cluster by (uid);
– 查询性别为女性的、并且学号为奇数的学生:
select * from stus tablesample(bucket 2 out of 2 on uid)
where sex=2;
Hive函数
-
关系函数: <= 、 >= 、 IS NULL 、IS NOT NULL、LIKE
-
日期函数: to_date、 year 、month 、second 、weekofyear、 datediff时间比较
-
条件函数: IF CASE
-
字符串函数:length、 reverse、 substr 截取字符串、lower、 trim去空格、CONCAT 字符串拼接
-
统计函数:
-
Hive的SQL还可以通过用户定义的函数(UDF),用户定义的聚合(UDAF)和用户定义的表函数(UDTF)进行扩展。
-
UDF、UDAF、UDTF的区别:
- UDF(User-Defined-Function)一进一出
- UDAF(User-Defined Aggregation Funcation)聚集函数,多进一出
- UDTF(User-Defined Table-Generating Functions)一进多出,如lateral view explore()
hive中split、coalesce及collect_list函数的用法
split将字符串转化为数组,即:split(‘a,b,c,d’ , ‘,’) ==> [“a”,“b”,“c”,“d”]。
coalesce(T v1, T v2, …) 返回参数中的第一个非空值;如果所有值都为 NULL,那么返回NULL。
collect_list列出该字段所有的值,不去重 select collect_list(id) from table。
使用过Hive解析JSON串吗
- hive 处理json数据总体来说有两个方向的路走
- 将json以字符串的方式整个入Hive表,然后通过使用UDF函数解析已经导入到hive中的数据,比如使用LATERAL VIEW json_tuple的方法,获取所需要的列名。
- 在导入之前将json拆成各个字段,导入Hive表的数据是已经解析过得。这将需要使用第三方的SerDe。