我用spark流python读kafka写hbase,我发现saveasnewapihadoopdataset阶段的作业很容易被阻塞。如下图所示:你会发现这个舞台的持续时间是8小时。spark是通过hbase api写数据还是直接通过hdfs api写数据?
sqserrrh1#
有点晚了,但下面是一个将rdd保存到hbase的类似示例:考虑一个包含一行的rdd:
{"id":3,"name":"Moony","color":"grey","description":"Monochrome kitty"}
转换rdd我们需要将rdd转换为(键,值)对,其内容如下:(行键,[行键,列族,列名,值])
datamap = rdd.map(lambda x: (str(json.loads(x)["id"]),[str(json.loads(x)["id"]),"cfamily","cats_json",x]))
保存到hbase我们可以利用 RDD.saveAsNewAPIHadoopDataset 本例中使用的函数:pyspark hbase example将rdd保存到hbase?
RDD.saveAsNewAPIHadoopDataset
datamap.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
您可以参考我的博客:pyspark sparkstreaming hbase来获取工作示例的完整代码。
1条答案
按热度按时间sqserrrh1#
有点晚了,但下面是一个将rdd保存到hbase的类似示例:
考虑一个包含一行的rdd:
转换rdd
我们需要将rdd转换为(键,值)对,其内容如下:
(行键,[行键,列族,列名,值])
保存到hbase
我们可以利用
RDD.saveAsNewAPIHadoopDataset
本例中使用的函数:pyspark hbase example将rdd保存到hbase?您可以参考我的博客:pyspark sparkstreaming hbase来获取工作示例的完整代码。