Spark通过Hive创建DataFrame

x33g5p2x  于2020-11-26 发布在 Spark  
字(3.6k)|赞(0)|评价(0)|浏览(1336)

在spark 2.0中,您可以轻松地从hive数据仓库读取数据,也可以向hive表中写入和追加数据。
本文将使用python编程语言实现:

从现有的hive表中创建DataFrame
将dataframe保存到一个新的hive表
通过insert语句和append write模式向现有的hive表追加数据。

首先,创建一个支持hive的sparksession,运行以下代码创建一个支持hive的spark会话:

from pyspark.sql import SparkSession
appName = "PySpark Hive Example"
master = "local"
# Create Spark session with Hive supported.
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.enableHiveSupport() \
.getOrCreate()

然后从hive读取数据,现在我们可以使用sparksession对象从hive数据库读取数据:

# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()

我使用derby作为hive metastore,并且我已经在test_db数据库上创建了一个名为test_table的表。在表内部,有两条记录。

结果如下所示:

+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  DEF|
+---+-----+

添加一个新列

# Let's add a new column
df = df.withColumn("NewColumn",lit('Test'))
df.show()

结果如下:

+---+-----+---------+
| id|value|NewColumn|
+---+-----+---------+
|  1|  ABC|     Test|
|  2|  DEF|     Test|
+---+-----+---------+

将Dataframe保存为一个新的hive表

使用下面的代码将Dataframe保存到一个新的名为test_table2的hive表中:

# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()

在日志中,我可以看到新表默认保存为parquet:

使用catalyst模式初始化拼花writesupport:

{
   "type" : "struct",
   "fields" : [ {
     "name" : "id",
     "type" : "long",
     "nullable" : true,
     "metadata" : { }
   }, {
     "name" : "value",
     "type" : "string",
     "nullable" : true,
     "metadata" : {
       "HIVE_TYPE_STRING" : "varchar(100)"
     }
   }, {
     "name" : "NewColumn",
     "type" : "string",
     "nullable" : false,
     "metadata" : { }
   } ]
}

对应的拼花消息类型:

message spark_schema {
   optional int64 id;
   optional binary value (UTF8);
   required binary NewColumn (UTF8);
}

将数据追加到现有的hive表

您还可以通过“insert sql statement”或“append”写入模式向现有的hive表追加数据。

# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()
# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()

这两条记录都被成功插入到表中,如下面的输出所示:

+---+-----+--------------------+
| id|value|           NewColumn|
+---+-----+--------------------+
|  4|  JKL|Spark Write Appen...|
|  1|  ABC|                Test|
|  2|  DEF|                Test|
|  3|  GHI|          SQL INSERT|
+---+-----+--------------------+

完整的代码如下: example.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

appName = "PySpark Hive Example"
master = "local"

# Create Spark session with Hive supported.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()

# Let's add a new column
df = df.withColumn("NewColumn",lit('Test'))
df.show()

# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()

# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()

# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()

在spark玩得开心!

相关文章

微信公众号

最新文章

更多