在spark中处理数据并保存结果

1hdlvixo  于 2021-06-28  发布在  Hive
关注(0)|答案(0)|浏览(228)

我有三个表,每个表有2gb的数据。
dc-第一个表包含人员的数据(姓名、genre、passport等)。
ec-第二个表包含这些人的地址。
第三个表包含这些人的账单地址。
我需要迭代第二个表并在第三个表中按passport进行搜索,以生成最佳地址的得分(多个算法)。
如何处理这些数据并将结果保存在表中?
注意:我正在处理Parquet文件
更新(@cheseaux):我已经完成了
我有3个文件:dc.txt,ec.txt和eck.txt
导入数据:

val DC = sqlContext.read
          .format("com.databricks.spark.csv")
          .option("header", "false")
          .option("inferSchema", "false")
          .option("mode", "DROPMALFORMED")
          .option("delimiter", "|")
          .load("/DC.txt")

并保存到配置单元上下文:

DC.select("*").write
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "false")
    .option("mode", "DROPMALFORMED")
    .option("delimiter", "|")
    .option("quote", " ")
    .save("hdfs://localhost:8020/project/DC")

创建外部表:

CREATE EXTERNAL TABLE EC(
        some     String, 
        var   String, 
        here     String
    )
ROW FORMAT
DELIMITED FIELDS TERMINATED BY '\|'
STORED AS TEXTFILE 
LOCATION 'hdfs://localhost:8020/project/DC'

创建Parquet地板桌:

CREATE TABLE 
    EC_PARQUET(
        some     String, 
        var   String, 
        here     String
  )
ROW FORMAT
    SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
    INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'

将所有数据插入Parquet地板表:

INSERT INTO EC_PARQUET SELECT * FROM EC

现在我试着处理这些数据:

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val ec = hc.sql("SELECT * FROM EC_PARQUET")

并尝试:

import org.apache.spark.sql.Row;

val test = ec.take(100000).map(row => {
    val row1 = row.getAs[String](1)
    val make = if (row1.toLowerCase == "cualquiercosa") "S" else row1
    Row(row(0),make,row(3))
  }).registerTempTable("teste2")

如何创建具有此修改列的表?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题