Twitter + Apache Kafka + Spark Structured Streaming not working

isr3a4wc  posted on 2021-05-27  in Spark

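I am trying to run some sample code from GitHub (https://github.com/kaantas/spark-twitter-sentiment-analysis). I followed these steps:
Started the ZooKeeper server (zkserver)
Started Kafka 2.5.0 (I am also using Apache Spark 3.0.0 and JDK 8)
Started tweet_listener.py (tweets begin streaming and I can see them in the cmd window)
Opened twitter_topic_avg_sentiment_val.py in Spyder and ran it; it only shows the text below
Note: I know nothing about JARs. If I need to use an external JAR, please explain how. Thanks.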

Traceback (most recent call last):

  File "C:\Users\merha\Desktop\spark-twitter-sentiment-analysis-master\twitter_topic_avg_sentiment_val.py", line 40, in <module>
    query.awaitTermination()

  File "C:\Anaconda3\lib\site-packages\pyspark\sql\streaming.py", line 103, in awaitTermination
    return self._jsq.awaitTermination()

  File "C:\Anaconda3\lib\site-packages\py4j\java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)

  File "C:\Anaconda3\lib\site-packages\pyspark\sql\utils.py", line 137, in deco
    raise_from(converted)

  File "<string>", line 3, in raise_from

StreamingQueryException: org/apache/spark/kafka010/KafkaConfigUpdater
=== Streaming Query ===
Identifier: [id = f5dd9cb5-fcea-42ec-a20e-93a2ad233e1f, runId = 6cffdd89-3792-4500-a508-e4abc76425fb]
Current Committed Offsets: {}
Current Available Offsets: {}

Current State: INITIALIZING
Thread State: RUNNABLE

jhkqcmku1#


```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, from_json, udf
from pyspark.sql.types import DoubleType, StringType, StructField, StructType


def fun(avg_senti_val):
    """Map an average sentiment score to a status label."""
    try:
        if avg_senti_val < 0:
            return 'NEGATIVE'
        elif avg_senti_val == 0:
            return 'NEUTRAL'
        else:
            return 'POSITIVE'
    except TypeError:
        # Raised by the comparison when avg_senti_val is not a number (e.g. None)
        return 'NEUTRAL'


if __name__ == "__main__":
    # Schema of the JSON messages produced by tweet_listener.py
    schema = StructType([
        StructField("text", StringType(), True),
        StructField("senti_val", DoubleType(), True),
    ])

    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()

    # Read the raw tweet stream from the "twitter3" Kafka topic
    kafka_df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "twitter3")
        .option("startingOffsets", "earliest")
        .load()
    )

    kafka_df_string = kafka_df.selectExpr("CAST(value AS STRING)")

    # Parse the JSON payload and average the sentiment values
    tweets_table = kafka_df_string.select(from_json(col("value"), schema).alias("data")).select("data.*")
    sum_val_table = tweets_table.select(avg('senti_val').alias('avg_senti_val'))

    # udf = USER DEFINED FUNCTION
    udf_avg_to_status = udf(fun, StringType())

    # Average of the senti_val column mapped to a status column
    new_df = sum_val_table.withColumn("status", udf_avg_to_status("avg_senti_val"))

    # Note: this query prints the raw Kafka strings; new_df is computed but never written
    query = kafka_df_string.writeStream.format("console").option("truncate", "false").start()

    query.awaitTermination()
```

mo49yndu2#

--<<<twitter_topic_avg_sentiment_val.py>>>>>


iyzzxitl3#

--------<tweet_listener.py>>>


huwehgph4#


```python
import json

import pykafka
from afinn import Afinn
# Requires tweepy < 4.0, which still provides StreamListener
from tweepy import OAuthHandler, Stream
from tweepy.streaming import StreamListener


class TweetListener(StreamListener):
    def __init__(self):
        super().__init__()
        # Producer for the "twitter3" topic on the local Kafka broker
        self.client = pykafka.KafkaClient("localhost:9092")
        self.producer = self.client.topics[bytes('twitter3', 'ascii')].get_producer()

    def on_data(self, data):
        try:
            json_data = json.loads(data)

            # Keep only the tweet text and its AFINN sentiment score
            json_send_data = {
                'text': json_data['text'],
                'senti_val': afinn.score(json_data['text']),
            }

            print(json_send_data['text'], " >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ", json_send_data['senti_val'])

            self.producer.produce(bytes(json.dumps(json_send_data), 'ascii'))
            return True
        except KeyError:
            # Skip stream messages that carry no 'text' field
            return True

    def on_error(self, status):
        print(status)
        return True


consumer_key = "xxxxxxxxxx"
consumer_secret = "xxxxxxxxxxx"
access_token = "xxxxxxxxxxxx"
access_secret = "xxxxxxxxxx"

auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)

# Create the AFINN object used for sentiment analysis
afinn = Afinn()

twitter_stream = Stream(auth, TweetListener())
twitter_stream.filter(languages=['en'], track=["big data"])
```

3j86kqsm5#

After I downloaded the spark-token-provider-kafka-0-10 jar file and copied it into the Spark jars folder (or added it to SPARK_CLASSPATH), my problem was solved.
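As an alternative to copying the jar by hand, the missing class can usually be pulled in through Maven coordinates. The following is a minimal sketch, not what the poster did: it assumes Spark 3.0.0 built against Scala 2.12, and relies on the `spark-sql-kafka-0-10` package bringing in `spark-token-provider-kafka-0-10` as a transitive dependency. The helper names `kafka_package` and `build_session` are hypothetical and exist only for illustration.

```python
def kafka_package(spark_version="3.0.0", scala_version="2.12"):
    """Return the Maven coordinate of the Spark Kafka source.

    Assumption: the versions must match the installed Spark build;
    the defaults correspond to the Spark 3.0.0 setup in the question.
    """
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"


def build_session(app_name="TwitterSentimentAnalysis"):
    """Create a SparkSession that asks Spark to download the Kafka package.

    pyspark is imported here so the helper above works without it.
    """
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder.appName(app_name)
        # Resolved from Maven when the JVM starts, together with its
        # transitive dependencies
        .config("spark.jars.packages", kafka_package())
        .getOrCreate()
    )


if __name__ == "__main__":
    # Print the coordinate, e.g. for use with spark-submit --packages
    print(kafka_package())
```

The `spark.jars.packages` setting only takes effect if it is set before the Spark JVM starts, so in an IDE such as Spyder the console may need to be restarted first. The same coordinate can also be passed on the command line with `spark-submit --packages`.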
