I'm running into a problem while porting from DStreams to Structured Streaming. I have set up code that reads data from a Bitcoin price API and writes it into Hive.
The data looks like this:
[{"time": 1599859680, "open": "10328.0", "high": "10330.8","low": "10328.0", "close": "10330.8", "vwap": "10330.8", "volume": "0.00321565", "count": 1}, {"time": 1599859740, "open": "10330.8", "high": "10331.9", "low": "10330.8", "close": "10331.0", "vwap": "10331.5", "volume": "0.92199459", "count": 12}, {"time": 1599859800, "open": "10331.0", "high": "10331.0", "low": "10331.0", "close": "10331.0", "vwap": "0.0", "volume": "0.00000000", "count": 0}]
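For reference, a quick check with plain Python confirms the shape of this payload: the top level is a JSON array, `time` is epoch seconds, and the price fields arrive as JSON strings rather than numbers.

```python
import json

# First element of the sample payload from the feed above
payload = '''[{"time": 1599859680, "open": "10328.0", "high": "10330.8",
"low": "10328.0", "close": "10330.8", "vwap": "10330.8",
"volume": "0.00321565", "count": 1}]'''

records = json.loads(payload)
print(type(records))             # <class 'list'> -> the top level is an array
print(type(records[0]["time"]))  # <class 'int'>  -> epoch seconds
print(type(records[0]["open"]))  # <class 'str'>  -> prices are strings
```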
My setup includes the following:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *
bootstrapServers = "localhost:9099"
topics = "kraken"
subscribeType = "subscribe"
spark = SparkSession \
    .builder \
    .appName("StructuredBitcoin") \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://localhost:9083") \
    .enableHiveSupport() \
    .getOrCreate()
data = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrapServers) \
    .option(subscribeType, topics) \
    .load()
I defined the schema as follows:
schema = StructType([
    StructField("time", TimestampType()),
    StructField("open", IntegerType()),
    StructField("high", IntegerType()),
    StructField("low", IntegerType()),
    StructField("close", IntegerType()),
    StructField("vwap", IntegerType()),
    StructField("volume", IntegerType()),
    StructField("count", IntegerType())
])
Then I select the data (this is where I think I'm going wrong):
df = data.select(from_json(col("value").cast("string"), schema)
                 .alias("bitcoin")).selectExpr("bitcoin.*")
Writing to Hive:
def write(df, epoch_id):
    df.show()
    df.write.mode("append").saveAsTable("bitcoin.data")
After that, I start the stream and await termination:
query = df.writeStream.foreachBatch(write).start()
query.awaitTermination()
The resulting output is just a bunch of nulls, which leads me to believe the data is not being read correctly:
NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL
NULL NULL NULL NULL NULL NULL NULL NULL
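My working hypothesis (not confirmed) is a type mismatch: the price fields are strings like "10328.0", which cannot be parsed as integers, and `from_json` returns null for any field it cannot cast to the declared type. A rough analogy in plain Python:

```python
# "10328.0" is not a valid integer literal, which mirrors why
# IntegerType fields in the schema would come back as null:
try:
    int("10328.0")
except ValueError as e:
    print("cannot parse as int:", e)

# The same string parses fine as a float, so DoubleType (or StringType
# with a later cast) would match the data better:
print(float("10328.0"))  # 10328.0
```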
Where am I going wrong?
Thanks for any help.