How do I display a streaming DataFrame (show() fails with an AnalysisException)?

qfe3c7zg · published 2021-06-08 in Kafka

I have some data in a Kafka topic that I am streaming into a DataFrame, and I want to display the data inside that DataFrame:

import os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, DataFrame
import time
from datetime import datetime, timedelta

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 pyspark-shell'

topic_name = "my-topic"
kafka_broker = "localhost:9092"

producer = KafkaProducer(bootstrap_servers = kafka_broker)
spark = SparkSession.builder.getOrCreate()
terminate = datetime.now() + timedelta(seconds=30)

while datetime.now() < terminate:
    producer.send(topic = topic_name, value = str(datetime.now()).encode('utf-8'))
    time.sleep(1)

readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", topic_name) \
    .load()
readDF = readDF.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

readDF.writeStream.format("console").start()
readDF.show()

producer.close()

However, I keep getting this error:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spark/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a,**kw)
  File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.showString.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
...
Traceback (most recent call last):
      File "test2.py", line 30, in <module>
        readDF.show()
      File "/home/spark/spark/python/pyspark/sql/dataframe.py", line 336, in show
        print(self._jdf.showString(n, 20))
      File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/home/spark/spark/python/pyspark/sql/utils.py", line 69, in deco
        raise AnalysisException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

I don't understand why there is an exception, since I call writeStream.start() right before show(). I tried getting rid of selectExpr(), but that made no difference. Does anyone know how to display a DataFrame that originates from a stream? I am using Python 3.6.1, Kafka 0.10.2.1 and Spark 2.2.0.

inkz8wg9 1#

A streaming DataFrame does not support the show() method directly, but there is a way to view the data: sleep for a while so the background streaming thread can process some records, and then call show() on the temp table created by the memory sink. I can help you with a PySpark approach that lets you use show(); please refer to my answer.
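A minimal sketch of that approach, reusing spark and readDF from the question; the query name my_query is an assumption (any name works, as long as the spark.sql() call uses the same one):

# Write the stream to the in-memory sink; the memory sink registers the
# results as a temp table named after queryName().
query = readDF.writeStream \
    .format("memory") \
    .queryName("my_query") \
    .outputMode("append") \
    .start()

import time
time.sleep(10)  # give the background streaming thread time to pull some records

# The temp table is an ordinary batch table, so show() works on it.
spark.sql("SELECT * FROM my_query").show(truncate=False)

query.stop()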

wgmfuz8q 2#

A streaming DataFrame does not support the show() method. When you call start(), it launches a background thread that streams the input data to the sink, and since you are using the console sink, the data is printed to the console; you don't need to call show().
Remove readDF.show() and add a sleep after start(); then you should be able to see the data in the console, for example:

query = readDF.writeStream.format("console").start()
import time
time.sleep(10) # sleep 10 seconds
query.stop()

You also need to set startingOffsets to earliest; otherwise the Kafka source starts from the latest offset only and, in your case, fetches nothing:

readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("startingOffsets", "earliest") \
    .option("subscribe", topic_name) \
    .load()
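
Putting both points together, a corrected sketch of the streaming part of the original script (it reuses spark, kafka_broker, topic_name, producer and the time import from the question's code; the 10-second wait is an arbitrary choice):

readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("startingOffsets", "earliest") \
    .option("subscribe", topic_name) \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Let the console sink print the micro-batches instead of calling show().
query = readDF.writeStream.format("console").start()
time.sleep(10)
query.stop()

producer.close()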
