如何在hadoop集群中将数据从sparkr插入hbase

wsewodh2  于 2021-06-01  发布在  Hadoop
关注(0)|答案(1)|浏览(345)

我正在寻求帮助,使sparkr数据直接加载到hbase。read函数正在工作,我可以使用sparkr(sparkr.session)从配置单元外部表读取数据
执行的步骤:
创建了一个hbase表(hbase\u test1)
在配置单元中创建了一个外部表来Map配置单元中的hbase表(test1)
代码:

library(SparkR)

sc <- sparkR.session(master = "local",sparkEnvir = list(spark.driver.memory="2g",enableHiveSupport=TRUE))
sqlContext <- sparkR.session(sc)

df <- sql("show tables")
collect(df)

sdf <- sql("SELECT * from test1")

这就是我的立场。
我可以直接从sparkr将数据写入hbase吗?仅供参考:我需要使用sparkr的某些ml代码。结果需要保存回hbase。请注意,我使用的是所有开源工具。

wwtsj6pe

wwtsj6pe1#

不需要额外的部署,就可以使用apachespark-apachehbase连接器。
首先你必须包括包裹。可以通过以下选项来完成*

spark.jars.packages  com.hortonworks:shc-core:1.1.1-2.1-s_2.11
spark.jars.repositories http://repo.hortonworks.com/content/groups/public/

在你的 spark-defaults.conf 或等效的命令行参数 spark-submit / SparkR ```
--packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11
--repositories http://repo.hortonworks.com/content/groups/public/

版本( `s_2.11` 包的版本必须匹配用于构建spark的scala版本。
现在假设您将表定义为

create 'FooBar', 'Foo', 'Bar'

您希望sparkr insert等效于:

put 'FooBar', '1000', 'Foo:Value', 'x1'
put 'FooBar', '1000', 'Bar:Value', 'y1'
put 'FooBar', '2000', 'Foo:Value', 'x2'
put 'FooBar', '2000', 'Bar:Value', 'y2'

必须提供目录Map:

catalog = '{
"table":{"namespace":"default", "name":"FooBar"},
"rowkey":"key",
"columns":{
"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
"foo_value":{"cf":"Foo", "col":"Value", "type":"string"},
"bar_value":{"cf":"Bar", "col":"Value", "type":"string"}
}
}'

以及输入表:

df <- createDataFrame(data.frame(
rowkey = c("1000", "2000"), foo_value = c("x1", "x2"), bar_value = c("y1", "y2")
))

最后你可以申请了 `write.ml` 使用以下选项:

write.df(df,
source = "org.apache.spark.sql.execution.datasources.hbase",
mode = "append", catalog = catalog)

详情请参阅官方连接器文档。
如果不介意其他依赖项,可以部署apachephoenix,Maphbase表(例如phoenix-447),然后使用官方连接器或内置jdbc源来编写数据。
以额外的成本,它将提供更好的用户体验。例如,如果将phoenix表定义为:

CREATE TABLE foobar (
id VARCHAR NOT NULL PRIMARY KEY,
foo INTEGER,
bar VARCHAR
);

你可以的

SparkR:::callJStatic(
"java.lang.Class", "forName",
"org.apache.phoenix.jdbc.PhoenixDriver"
)

df <- createDataFrame(data.frame(
id = c("1000", "2000"), foo = c(1, 2), bar = c("x", "y")
))

write.df(
dfr, source = "org.apache.phoenix.spark",

Note that the only supported mode is overwrite,

which in fact works like UPSERT

mode = "overwrite",
table = "FooBar",

ZooKeeper URL

zkUrl = "host:port"
)

与第一个选项类似,您必须包含相应的连接器。但是,与hbase连接器不同的是,它不是自给自足的,需要phoenix core和客户端jar `CLASSPATH` .

* 以后别忘了调整软件包版本。

相关问题