spark结构化流式处理使用pyspark将空值输出到配置单元

ghg1uchk  于 2021-05-27  发布在  Spark
关注(0)|答案(0)|浏览(264)

我在从dstream移植到结构化流媒体时遇到了问题,我已经设置了代码,将数据从比特币价格api输出到hive中。
数据是这样的

[{"time": 1599859680, "open": "10328.0", "high": "10330.8","low": "10328.0", "close": "10330.8", "vwap": "10330.8", "volume": "0.00321565", "count": 1}, {"time": 1599859740, "open": "10330.8", "high": "10331.9", "low": "10330.8", "close": "10331.0", "vwap": "10331.5", "volume": "0.92199459", "count": 12}, {"time": 1599859800, "open": "10331.0", "high": "10331.0", "low": "10331.0", "close": "10331.0", "vwap": "0.0", "volume": "0.00000000", "count": 0}]

我的设置包括以下内容

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

bootstrapServers = "localhost:9099"
topics = "kraken"
subscribeType = "subscribe"

spark = SparkSession\
    .builder\
    .appName("StructuredBitcoin")\
    .config("spark.sql.warehouse.dir","/user/hive/warehouse")\
    .config("hive.metastore.uris","thrift://localhost:9083")\
    .enableHiveSupport()\
    .getOrCreate()

data = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers",bootstrapServers)\
    .option(subscribeType, topics)\
    .load()

我对模式的定义如下

schema = StructType([
    StructField("time",TimestampType()),
    StructField("open",IntegerType()),
    StructField("high",IntegerType()),
    StructField("low",IntegerType()),
    StructField("close",IntegerType()),
    StructField("vwap",IntegerType()),
    StructField("volume",IntegerType()),
    StructField("count",IntegerType())
    ])

然后我选择数据(这是我认为我出错的地方)

df = data.select(from_json(col("value").cast("string"), schema)\
    .alias("bitcoin")).selectExpr("bitcoin.*")

给Hive写信

def write(df,epoch_id):
    df.show()
    df.write.mode("append").saveAsTable("bitcoin.data")

之后,我写流并等待终止

query = df.writeStream.foreachBatch(write).start()

query.awaitTermination()

结果输出只是一堆空值,这让我相信数据没有被正确读取。

NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL
NULL    NULL    NULL    NULL    NULL    NULL    NULL    NULL

我哪里做错了?
感谢你的帮助

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题