【clickhouse】使用waterdrop将Hive中的数据导入ClickHouse

x33g5p2x  于2021-12-05 转载在 ClickHouse  
字(3.4k)|赞(0)|评价(0)|浏览(806)

1.概述

转载:使用waterdrop将Hive中的数据导入ClickHouse
这里仅仅自己学习用。

前言
最近有一个需求需要把hive的数据同步到clickhouse,而且数据量还比较大,所以使用导出csv再导入clickhouse的那种方式并不适合。由于公司使用的服务器是某云服务器,Hadoop的底层不是使用的是原生的hdfs,导致没法使用datax。

waterdrop
waterdrop的官方地址:https://interestinglab.github.io/waterdrop-docs/#/zh-cn/v1/

Waterdrop 是一个非常易用,高性能、支持实时流式和离线批处理的海量数据处理产品,架构于Apache Spark 和 Apache Flink之上。

1)准备环境
使用 Waterdrop前请先准备好Spark和Java运行环境。java版本为jdk1.8

2)waterdrop下载和解压

下载地址:https://github.com/InterestingLab/waterdrop/releases

目前1.x的稳定版为v1.5.1。spark >= 2.3 下载 waterdrop-1.5.1.zip, spark < 2.3 下载waterdrop-1.5.1-with-spark.zip。

如果Github下载速度慢,可通过百度云(链接:https://pan.baidu.com/s/19GUwZPC2YBG9Pt7iuF9TNw 密码:upeb) 直接下载。

解压:unzip waterdrop-1.5.1.zip

3)更改启动目录
cd到waterdrop的config目录下,找到waterdrop-env.sh并打开,找到如果spark_home没有配置环境变量,需要把spark_home改了,如果配置有环境变量就不用管。

hive2clickhouse
1)通用配置
一个完整的Waterdrop配置包含spark, input, filter, output, 即:

spark {
    ...
}

input {
    ...
}

filter {
    ...
}

output {
    ...
}

spark是spark相关的配置,可配置的spark参数见: Spark Configuration, 其中master, deploy-mode两个参数不能在这里配置,需要在Waterdrop启动脚本中指定。
input可配置任意的input插件及其参数,具体参数随不同的input插件而变化。
filter可配置任意的filter插件及其参数,具体参数随不同的filter插件而变化。
output可配置任意的output插件及其参数,具体参数随不同的output插件而变化。filter处理完的数据,会发送给output中配置的每个插件

2)编写脚本

spark {
  spark.sql.catalogImplementation = "hive"
  spark.app.name = "Waterdrop"
  spark.executor.instances = 30
  spark.executor.cores = 2
  spark.executor.memory = "2g"
}
input {
    hive {
        pre_sql = "select id,user_id,grade,money_type_id,money,shop,amount,balance,bi_platform_id,server_id,act_type,act_method,act_method2,record_time,bi_sid,gather_time,if(record_time is null,dateline,from_unixtime(record_time,'yyyy-MM-dd')) as dateline,bi_pid from sgz_big_2017.ods_bi_money_list where dateline='2021-01-01'"
        table_name = "ods_bi_money_list"
    }
}

filter {}

output {
    clickhouse {
        host = "xxx.xx.xxx.xx:8123"
        database = "big_sgz_2017"
        table = "bi_money_list"
        fields = [ "id","user_id","grade","money_type_id","money","shop","amount","balance","bi_platform_id","server_id","act_type","act_method","act_method2","record_time","bi_sid","gather_time","dateline","bi_pid"]
        username = "default"
        password = "m8yjvWQ+"
    }
}

相关配置参考clickhouse输出配置

3)部署和运行
在本地以local方式运行Waterdrop

./bin/start-waterdrop.sh --master local[4] --deploy-mode client --config ./config/application.conf

在Spark Standalone集群上运行Waterdrop

# client 模式
./bin/start-waterdrop.sh --master spark://207.184.161.138:7077 --deploy-mode client --config ./config/application.conf

# cluster 模式
./bin/start-waterdrop.sh --master spark://207.184.161.138:7077 --deploy-mode cluster --config ./config/application.conf

在Yarn集群上运行Waterdrop

# client 模式
./bin/start-waterdrop.sh --master yarn --deploy-mode client --config ./config/application.conf

# cluster 模式
./bin/start-waterdrop.sh --master yarn --deploy-mode cluster --config ./config/application.conf

在Mesos上运行Waterdrop

# cluster 模式
./bin/start-waterdrop.sh --master mesos://207.184.161.138:7077 --deploy-mode cluster --config ./config/application.conf

cluster、client、local模式下必须把hive-site.xml置于提交任务节点的HADOOP_CONF目录下(或者放在$SPARK_HOME/conf下面),IDE本地调试将其放在resources目录

4)遇到的问题
max concurrent queries
在设置spark的时候并行度设置过高,导致连接数超过ck配置的最大连接数。
解决办法:降低spark的并行度,增加ck的连接数(max_concurrent_queries)
connection reset

目前暂时没有找到是什么原因导致的超时连接,但是减少并行线程就不会报这个错误。
解决办法:
1)减少spark的并行度
2)在clickhouse的output插件中增加clickhouse.socket_timeout参数

相关文章

微信公众号

最新文章

更多