如何将avro中的java bytes列(logicaltype为decimal)转换为float/number格式并写入parquet

cclgggtu  于 2021-07-13  发布在  Spark
关注(0)|答案(0)|浏览(329)

我想从hdfs读取一个avro文件,并将其作为parquet重新连接到hdfs。问题是有些字段有typ字节和逻辑类型decimal。我想wirte作为浮点数或最佳拟合的数字格式,以使用与配置单元Parquet文件或可以很容易地使用Spark功能,如总和的列。
avro模式示例:

{
    "name" : "Field1",
    "type" : [ "null", {
      "type" : "bytes",
      "scale" : 0,
      "precision" : 64,
      "connect.version" : 1,
      "connect.parameters" : {
        "scale" : "0"
      },
      "connect.name" : "org.apache.kafka.connect.data.Decimal",
      "logicalType" : "decimal"
    } ],
    "default" : null
  },

读取avro和写入parquet的代码:

SparkContext context = new SparkContext(new SparkConf().setAppName("spark-ml").setMaster("local[*]")
        .set("dfs.client.use.datanode.hostname", "true")
        .set("spark.hadoop.fs.default.name", "hdfs://sandbox-hdp.hortonworks.com:8020")
        .set("spark.hadoop.fs.defaultFS", "hdfs://sandbox-hdp.hortonworks.com:8020")
        .set("spark.hadoop.fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName())
        .set("spark.hadoop.fs.hdfs.server", org.apache.hadoop.hdfs.server.namenode.NameNode.class.getName())
        .set("spark.hadoop.conf", org.apache.hadoop.hdfs.HdfsConfiguration.class.getName()));

SparkSession spark = SparkSession
          .builder()                 
           .sparkContext(context)
          .enableHiveSupport()
          .getOrCreate();

Dataset<Row> df = spark.read().format("avro")           
        .load("/tmp/partition=0/tmp.avro");

df.show();
df.printSchema();

 df.write().mode(SaveMode.Overwrite)
 .parquet("/tmp/parquet/test.parquet");

我读了很多关于对话的文章,但没有什么真正有效的。有人有一个简单的方法来转换avro类型字节和逻辑类型十进制的数字格式,如浮点数?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题