Structured Streaming DataFrame is empty

ui7jx7zq posted on 2021-07-12 in Spark

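I have a Spark cluster running, and a Twitter streaming API socket listening on port 9005. I then tried the following code to display the tweets. For debugging purposes, I want to display the DataFrame.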

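import time

from IPython.display import clear_output, display
from pyspark.sql import SparkSession

if __name__ == "__main__":

    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()

    # The socket source connects to 127.0.0.1:9005 as a client and yields
    # one row per received line, in a single string column named "value".
    tweet_df = spark \
        .readStream \
        .format("socket") \
        .option("host", "127.0.0.1") \
        .option("port", 9005) \
        .load()

    # Append new rows to an in-memory table that can be queried with
    # spark.sql() under the name given to queryName().
    query = tweet_df \
        .writeStream \
        .outputMode("append") \
        .format("memory") \
        .queryName("tweet_count") \
        .trigger(processingTime='20 seconds') \
        .start()

    # Poll the in-memory table once per second while the query is running.
    while query.isActive:
        clear_output(wait=True)
        display(query.status)
        # show() prints the table itself and returns None, so it is not
        # wrapped in display() (that is what printed the "None" lines below).
        spark.sql('SELECT * FROM tweet_count').show()
        time.sleep(1)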
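Spark's socket source acts as a TCP client: it connects to the configured host and port and turns each newline-terminated line of UTF-8 text into one row of the `value` column. A sender that never writes `\n` can therefore leave the memory table empty even though bytes are flowing. The sketch below is a minimal stand-in for the tweet sender, useful for checking the Spark side in isolation. It assumes the real sender is a plain TCP server on 127.0.0.1:9005; `open_server`, `send_tweets` and `frame_tweet` are hypothetical names, not part of Spark or any Twitter library.

```python
import socket


def frame_tweet(text: str) -> bytes:
    """Encode one tweet as exactly one newline-terminated UTF-8 line.

    Embedded line breaks are replaced with spaces so that one tweet becomes
    one row on the Spark side instead of several.
    """
    one_line = text.replace("\r", " ").replace("\n", " ")
    return (one_line + "\n").encode("utf-8")


def open_server(host: str = "127.0.0.1", port: int = 9005) -> socket.socket:
    """Create a listening TCP socket; port=0 lets the OS pick a free port."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    return server


def send_tweets(server: socket.socket, tweets) -> None:
    """Accept one client (e.g. Spark's socket source) and send every tweet.

    The connection is closed after the last tweet; a real sender would
    normally keep it open and keep writing.
    """
    conn, _addr = server.accept()
    with conn:
        for tweet in tweets:
            conn.sendall(frame_tweet(tweet))
```

To try it, run `with open_server() as server: send_tweets(server, ["first tweet", "second tweet"])` in a separate process before calling `start()` on the streaming query, since the socket source needs a listening server to connect to.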

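I get no errors, but the output is empty, as shown below (output from a few iterations of the while loop). In the notebook that runs the listening socket, I can see the tweets being sent, which makes me think the problem is not on that side.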

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.1.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/03/11 01:48:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/03/11 01:49:00 INFO SparkContext: Running Spark version 3.1.1
21/03/11 01:49:00 INFO ResourceUtils: ==============================================================
21/03/11 01:49:00 INFO ResourceUtils: No custom resources configured for spark.driver.
21/03/11 01:49:00 INFO ResourceUtils: ==============================================================
21/03/11 01:49:00 INFO SparkContext: Submitted application: TwitterSentimentAnalysis
21/03/11 01:49:00 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/03/11 01:49:00 INFO ResourceProfile: Limiting resource is cpu
21/03/11 01:49:00 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/03/11 01:49:00 INFO SecurityManager: Changing view acls to: delalma
21/03/11 01:49:00 INFO SecurityManager: Changing modify acls to: delalma
21/03/11 01:49:00 INFO SecurityManager: Changing view acls groups to: 
21/03/11 01:49:00 INFO SecurityManager: Changing modify acls groups to: 
21/03/11 01:49:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(delalma); groups with view permissions: Set(); users  with modify permissions: Set(delalma); groups with modify permissions: Set()
21/03/11 01:49:01 INFO Utils: Successfully started service 'sparkDriver' on port 43057.
21/03/11 01:49:01 INFO SparkEnv: Registering MapOutputTracker
21/03/11 01:49:01 INFO SparkEnv: Registering BlockManagerMaster
21/03/11 01:49:01 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/03/11 01:49:01 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/03/11 01:49:01 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/03/11 01:49:01 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-d524cd27-d32b-468b-a325-a1def82f6038
21/03/11 01:49:01 INFO MemoryStore: MemoryStore started with capacity 413.9 MiB
21/03/11 01:49:01 INFO SparkEnv: Registering OutputCommitCoordinator
21/03/11 01:49:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
21/03/11 01:49:02 INFO Utils: Successfully started service 'SparkUI' on port 4041.
21/03/11 01:49:02 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://pd:4041
21/03/11 01:49:03 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.56.104:7077...
21/03/11 01:49:03 INFO TransportClientFactory: Successfully created connection to /192.168.56.104:7077 after 79 ms (0 ms spent in bootstraps)
21/03/11 01:49:03 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20210311014903-0020
21/03/11 01:49:03 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34633.
21/03/11 01:49:03 INFO NettyBlockTransferService: Server created on pd:34633
21/03/11 01:49:03 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/03/11 01:49:03 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, pd, 34633, None)
21/03/11 01:49:03 INFO BlockManagerMasterEndpoint: Registering block manager pd:34633 with 413.9 MiB RAM, BlockManagerId(driver, pd, 34633, None)
21/03/11 01:49:03 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, pd, 34633, None)
21/03/11 01:49:03 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, pd, 34633, None)
21/03/11 01:49:04 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
21/03/11 01:49:05 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/delalma/windowsshare/spark-warehouse').
21/03/11 01:49:05 INFO SharedState: Warehouse path is 'file:/home/delalma/windowsshare/spark-warehouse'.
21/03/11 01:49:07 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
21/03/11 01:49:11 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
21/03/11 01:49:11 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/03/11 01:49:11 INFO MicroBatchExecution: Checkpoint root /tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727 resolved to file:/tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727.
21/03/11 01:49:11 INFO CheckpointFileManager: Writing atomically to file:/tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727/metadata using temp file file:/tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727/.metadata.681b8a15-a356-4106-af11-931957fb494f.tmp
21/03/11 01:49:11 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727/.metadata.681b8a15-a356-4106-af11-931957fb494f.tmp to file:/tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727/metadata
21/03/11 01:49:11 INFO MicroBatchExecution: Starting tweet_count [id = c98f34a0-899b-4268-9f77-1a25c3fce270, runId = 84e5f4b9-6561-4ba9-b984-019ef7406b72]. Use file:/tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727 to store the query checkpoint.
21/03/11 01:49:11 INFO MicroBatchExecution: Reading table [org.apache.spark.sql.execution.streaming.sources.TextSocketTable@1c9e3c7c] from DataSourceV2 named 'socket' [org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider@28a6f9d9]
21/03/11 01:49:11 INFO MicroBatchExecution: Starting new streaming query.
21/03/11 01:49:11 INFO MicroBatchExecution: Stream started from {}
21/03/11 01:49:11 INFO CheckpointFileManager: Writing atomically to file:/tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727/offsets/0 using temp file file:/tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727/offsets/.0.271908c7-dbfa-4e8b-b443-aa1d16ea90a8.tmp
21/03/11 01:49:12 INFO CheckpointFileManager: Renamed temp file file:/tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727/offsets/.0.271908c7-dbfa-4e8b-b443-aa1d16ea90a8.tmp to file:/tmp/temporary-5b513dbd-ff79-4765-b0ef-505461138727/offsets/0
21/03/11 01:49:12 INFO MicroBatchExecution: Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1615427351898,Map(spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider, spark.sql.streaming.join.stateFormatVersion -> 2, spark.sql.streaming.stateStore.compression.codec -> lz4, spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion -> 2, spark.sql.streaming.multipleWatermarkPolicy -> min, spark.sql.streaming.aggregation.stateFormatVersion -> 2, spark.sql.shuffle.partitions -> 200))
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
+-----+
|value|
+-----+
+-----+

None
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
+-----+
|value|
+-----+
+-----+

None
21/03/11 01:49:16 INFO CodeGenerator: Code generated in 1174.33437 ms
21/03/11 01:49:16 INFO WriteToDataSourceV2Exec: Start processing data source write support: org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@63744bdb. The input RDD has 2 partitions.
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
21/03/11 01:49:16 INFO SparkContext: Starting job: start at NativeMethodAccessorImpl.java:0
21/03/11 01:49:16 INFO DAGScheduler: Got job 0 (start at NativeMethodAccessorImpl.java:0) with 2 output partitions
21/03/11 01:49:16 INFO DAGScheduler: Final stage: ResultStage 0 (start at NativeMethodAccessorImpl.java:0)
21/03/11 01:49:16 INFO DAGScheduler: Parents of final stage: List()
21/03/11 01:49:16 INFO DAGScheduler: Missing parents: List()
21/03/11 01:49:17 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at start at NativeMethodAccessorImpl.java:0), which has no missing parents
+-----+
|value|
+-----+
+-----+

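One way to check, from Spark's point of view, that the tweets really arrive is a plain Python client that connects to port 9005 the same way the socket source does and reads line by line. This is a hypothetical debugging sketch (`read_lines` and `probe_lines` are illustrative names), assuming the sender is a TCP server on 127.0.0.1:9005. If the sender accepts only one connection, run the probe instead of the Spark query, not alongside it.

```python
import socket


def read_lines(conn: socket.socket, max_lines: int = 5) -> list:
    """Read up to max_lines newline-terminated lines from a connected socket.

    Stops early if the peer closes the connection. If the peer keeps the
    connection open but never sends a newline, readline() blocks until the
    socket timeout expires (or forever if no timeout is set).
    """
    lines = []
    with conn.makefile("r", encoding="utf-8") as reader:
        while len(lines) < max_lines:
            line = reader.readline()
            if not line:  # "" means the sender closed the connection
                break
            lines.append(line.rstrip("\r\n"))
    return lines


def probe_lines(host: str = "127.0.0.1", port: int = 9005,
                max_lines: int = 5, timeout: float = 30.0) -> list:
    """Connect as a TCP client, the same role Spark's socket source plays."""
    with socket.create_connection((host, port), timeout=timeout) as conn:
        return read_lines(conn, max_lines)
```

For example, `print(probe_lines())` with the tweet sender running. A `socket.timeout` while the sender claims to be sending suggests the tweets are not newline-terminated, which would also keep the `tweet_count` table empty.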
Any help understanding what could be the source of the problem would be appreciated.
