在saveasnewapihadoopdataset上阻止使用python的spark流式写入数据到hbase

n3schb8v  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(340)

我用spark流python读kafka写hbase,我发现saveasnewapihadoopdataset阶段的作业很容易被阻塞。如下图所示:你会发现这个舞台的持续时间是8小时。spark是通过hbase api写数据还是直接通过hdfs api写数据?

sqserrrh

sqserrrh1#

有点晚了,但下面是一个将rdd保存到hbase的类似示例:
考虑一个包含一行的rdd:

{"id":3,"name":"Moony","color":"grey","description":"Monochrome kitty"}

转换rdd
我们需要将rdd转换为(键,值)对,其内容如下:
(行键,[行键,列族,列名,值])

datamap = rdd.map(lambda x: (str(json.loads(x)["id"]),[str(json.loads(x)["id"]),"cfamily","cats_json",x]))

保存到hbase
我们可以利用 RDD.saveAsNewAPIHadoopDataset 本例中使用的函数:pyspark hbase example将rdd保存到hbase?

datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)

您可以参考我的博客:pyspark sparkstreaming hbase来获取工作示例的完整代码。

相关问题