接着clickhouse原理篇,下面来介绍他的具体使用场景,包括数据导入,更新等
1. 数据导入
根据官方介绍,Clickhouse的分布式表数据导入有两种形式,一种是在外部将数据划分好分片,分别并行的加载到各个节点的本地表,这种速度是最快的,同时也不需要分布式引擎处理网络发送各个节点数据;另外一种是直接将数据批量插入分布式表,由分布式表引擎处理数据到各节点的写入,需要节点间的网络通信,速度稍慢,但是在实际观察中,也还好,更重要的是操作方便,重跑容易;
调研
目前见过的几种写入形式:
1. 这里我参考过一些大厂的方法,其中我感觉最有效的是,k8s动态创建出一个新的clickhouse 集群,写入数据利用这个集群生成对应的分区文件,然后直接将文件attach到生产集群上的clickhouse各节点,并销毁用来生成分区文件的clickhouse集群。这才是终极操作,直接加载目的文件!
2. 使用阿里云的产品的话,他们有提供专门的spark + waterdrop 对接clickhouse, 这里他们使用虚拟ip 加上spark多进程写入clickhouse集群
3. 自己使用jdbc 写入,如果是向单点分布式表写入再由它分发的话,速度可想而知了,不过也可以自己整多线程 + vip写入集群,速度理论和2 差不多
我这里使用的是腾讯云的Ck产品,由于才刚上不久缺乏对应完善的配套,导入数据就比较麻烦了,由于需要替换greenplum,自然想到继承原来的集群间数据中转方式: 腾讯云对象存储cos。这里我们将在gp中计算好的数据通过外表方式写入到cos文件系统中以csv存储;然后下一步就需要clickhouse 中导入cos数据了,这里我测试了clickhouse 集成的COSN表引擎,从该表读取数据向分布式表插入, 但是这么做,首先是单点写入,性能不足,其次,cosn引擎实现的并不好,insert into xx select * from cosn_tb时,需要将cos指定的全部文件载入内存,轻易就OOM了,根本无法使用,腾讯云那边似乎也拿不出解决方式。
计划
由于时间拖的非常久了,领导说数据导入的事情该要了结了。。
此时我这里就只能用上老本行spark了,花了一天的时间搭建和调试,在买来的cvm搭建了5个节点的hadoop + spark 集群(这里不敢尝试太高版本,hadoop 2.7.6 + spark 2.4.7 ),然后参考了阿里云那边spark 导入数据到clickhouse 的代码,通过自己调节并行度,+ 虚拟ip 直接使用spark-shell 粘贴代码做不同并行度的写入测试。
数据源: cos上的csv
schema: jdbc读取pg表后df.schema
数据目的地:clickhouse集群的分布式表(主要是因为vip进入的节点不可控,不如让分布式表自己根据规则分配数据所在分片)
实施
1. cos文件系统集成
为了让spark能读取cos上的csv文件,需要给hadoop集成cos文件系统,类似于hdfs:// ,cos支持 cosn://, 这里参考hadoop集成cos文件系统
除了以上配置,还需要将spark配置中加上hadoop中存放这些jar包的类路径,以便spark也能访问cos文件系统
2. 编码获取
val table_schema = spark.jdbc("jdbc:postgres://xxx/xxx",table_name,pg_perproties).schema
val df = spark
.read
.schema(schema_df.schema)
.csv(s"cosn://ad-cdwxxxxx")
val properties = new Properties()
properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")
properties.put("user", "<your-user-name>")
properties.put("password", "<your-password>")
properties.put("batchsize","100000")
properties.put("socket_timeout","300000")
properties.put("numPartitions","8")
properties.put("rewriteBatchedStatements","true")
df.write.mode(SaveMode.Append)
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 100000)
.jdbc(url, table, properties)
我们当前的4节点集群(2分片 ,每个分片2副本)经过测试并行度12写入是最稳定的,并行度越高速度是会越快,但是有一定几率会使得clickhouse返回内存不足的错误,任务失败了重跑又会耽误一倍的时间,且会导致数据重复,需要依赖于表的特性来去重;
2. 数据更新和使用
由于各种不可预料的情况,任务失败重试等,会导致数据的重复写入,以及每15分钟的数据刷新等,此处为了用户使用的连续性,也不能删除数据再写入,而是直接写入数据并去重更新数据,此时依赖表引擎的去重设计就比较重要了。
这里介绍我们线上将要使用的
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/local_table_name', '{replica}')
PARTITION BY toYYYYMMDD(stat_datetime)
PRIMARY KEY field1
ORDER BY (field1, field2,field3,...)
其中副本机制依靠zookeeper实现,这里不介绍,主要介绍ReplacingMergeTree表引擎,除了拥有MergeTree那些共有特性,它还有个额外功能,根据Order by指定的字段,在单机器,单分区内部去重。(为什么说单机器呢,如果一个数据写入了两个节点的同一个分区,那也是没办法去重的,所以数据根据规则额每次落于固定的分片相当重要)。
数据完全写入后,需要执行optimize进行去重,如果数据量很大,且集群负载较高不能执行的话,需要对有重复数据的分区进行final查询,然后union上其他分区的数据在进行运算,形如
select
k1,k2,k3
sum(xxx),
count(xxx)
from (
select * from report_table_name where stat_datetime <={old_statime}
union all
select * from report_table_name FINAL where stat_datetime > {old_stattime}
) raw
group by k1,k2,k3
CREATE VIEW IF NOT EXISTS dn_name.view_tablename_update_1day on cluster default_cluster
as
select * from dn_name.tablename
where stat_datetime <= yesterday()
union all
select * from dn_name.tablename final where stat_datetime = today()
;
以这种方式,即使更新数据的分去重复,也能得出正确的结果,就是会比普通查询会慢些,而且有oom风险,具体还要看使用情况。
以上就是我对于clickhouse目前使用上的理解,后续有新的内容会再补充
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)