Streaming tweets to the console using Flask

Posted by lyr7nygr on 2021-06-13 in ElasticSearch

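Using Flask, I am trying to:
Get user input
Filter the data stream based on that input
Process the filtered data
Redirect the result to a Kibana dashboard for visualization
In streamer.py, I have a method stream_tweets that streams tweets based on the input.
When I run streamer.py with Python, I get the tweets in the terminal and can view the visualization on the Kibana dashboard.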

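from tweepy import Stream

from listener import TwitterListener
# TwitterAuthenticator is defined elsewhere in the project (not shown here).

class Streamer():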

    def __init__(self):
        self.twitter_authenticator = TwitterAuthenticator()

    def stream_tweets(self, hash_tag_list):
        listener = TwitterListener()
        auth = self.twitter_authenticator.authenticate_twitter_app()
        stream = Stream(auth, listener)
        stream.filter(track=hash_tag_list, is_async=True)

def main():

    tweet_streamer = Streamer()
    hash_tag = ['Arsenal']
    tweet_streamer.stream_tweets(hash_tag)

if __name__ == '__main__':
    main()

This file calls listener.py, which processes the data using the TextBlob library.

import json

from tweepy.streaming import StreamListener
from textblob import TextBlob
from elasticsearch import Elasticsearch

class TwitterListener(StreamListener):

    def __init__(self):
        self.es = Elasticsearch()
        self.es.indices.create(index='twitter', ignore=400)

    def on_data(self, data):
        try:
            dict_data = json.loads(data)
            tweet = TextBlob(dict_data["text"])
            print(tweet.sentiment.polarity)

            if (tweet.sentiment.polarity == 0):
                sentiment = "neutral"
            elif (tweet.sentiment.polarity > 0 and tweet.sentiment.polarity <= 0.3):
                sentiment = "weak positive"
            elif (tweet.sentiment.polarity > 0.3 and tweet.sentiment.polarity <= 0.6):
                sentiment = "positive"
            elif (tweet.sentiment.polarity > 0.6 and tweet.sentiment.polarity <= 1):
                sentiment = "strong positive"
            elif (tweet.sentiment.polarity > -0.3 and tweet.sentiment.polarity <= 0):
                sentiment = "weak negative"
            elif (tweet.sentiment.polarity > -0.6 and tweet.sentiment.polarity <= -0.3):
                sentiment = "negative"
            else: # (tweet.sentiment.polarity > -1 and tweet.sentiment.polarity <= -0.6):
                sentiment = "strong negative"

            print(sentiment)

            self.es.index(
                index = "twitter",
                doc_type = "test-type",
                body = {
                    "author": dict_data["user"]["screen_name"],
                    "date": dict_data["created_at"],
                    "message": dict_data["text"],
                    "polarity": tweet.sentiment.polarity,
                    "subjectivity": tweet.sentiment.subjectivity,
                    "sentiment": sentiment
                })

            return True

        except Exception as e:
            print("Data Error:", str(e))
        return False

    def on_error(self, status):
        """Repeated access after rate limit will lock developer account."""
        if status == 420:
            return False
        print(status)
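The threshold chain in on_data could be pulled out into a small pure function, which would make the bucket boundaries checkable without a live stream. This is only a sketch: label_sentiment is a hypothetical helper name that does not exist in listener.py, and it assumes the same boundaries as the listener above.

```python
def label_sentiment(polarity: float) -> str:
    """Map a TextBlob polarity in [-1, 1] to the labels used in on_data.

    Hypothetical helper; the thresholds are copied from the listener above.
    """
    if polarity == 0:
        return "neutral"
    if polarity > 0:
        if polarity <= 0.3:
            return "weak positive"
        if polarity <= 0.6:
            return "positive"
        return "strong positive"
    # polarity is negative from here on
    if polarity > -0.3:
        return "weak negative"
    if polarity > -0.6:
        return "negative"
    return "strong negative"
```

Inside on_data this would be used as sentiment = label_sentiment(tweet.sentiment.polarity).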

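But when I call the same method from Flask, the result template is rendered, yet tweets are streamed only for a very short time, after which I get Data Error: 'text'.
Here is the Flask microservice: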

@app.route("/dynamic-analysis/", methods=["GET", "POST"])
def dynamic_analysis():
    tweet_streamer = Streamer()
    if request.method == "POST":
        hash_tag = request.form['dynamic_hashtag']
        tweet_streamer.stream_tweets(hash_tag)
        return render_template("query-dynamic-analysis.html")    # contains URL to Kibana dashboard
    else: # request.method == "GET":
        return render_template("dynamic-analysis.html")          # form to get user input
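One difference between the two call paths may matter: main() passes the hashtag as a list (['Arsenal']), while request.form['dynamic_hashtag'] is a plain string. Assuming tweepy 3.x, where Stream.filter builds the track parameter by joining its argument with commas, a bare string would be split into single characters. The sketch below shows that effect with plain Python and a hypothetical helper, parse_hashtags, that turns the form value into a list; the comma-separated input format is an assumption.

```python
def parse_hashtags(raw: str) -> list:
    """Split a comma-separated form value into a list of non-empty terms."""
    return [term.strip() for term in raw.split(",") if term.strip()]

# Joining a string iterates over its characters.
as_string = ",".join("Arsenal")

# Joining a list keeps each term whole.
as_list = ",".join(parse_hashtags("Arsenal, Chelsea"))

print(as_string)
print(as_list)
```

In the route, the call would then read tweet_streamer.stream_tweets(parse_hashtags(request.form['dynamic_hashtag'])).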

I tried calling render_template with an extra data attribute, tweet_streamer.stream_tweets(hash_tag), but I still get the same error.
How can I solve this?
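The message Data Error: 'text' looks like a KeyError raised by dict_data["text"], which would happen whenever the stream delivers a message that is not a tweet (the streaming API can also send notices, such as limit or delete messages). Because on_data returns False after any exception, and assuming tweepy 3.x treats a False return as a request to stop, a single such message would end the stream. Below is a sketch of a more tolerant parsing step; extract_text is a hypothetical helper and the sample payloads are made up for illustration.

```python
import json
from typing import Optional

def extract_text(raw: str) -> Optional[str]:
    """Return the tweet text, or None for messages without a "text" key."""
    try:
        message = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(message, dict):
        return None
    return message.get("text")

# Made-up sample payloads: one tweet-like message and one notice.
tweet = '{"text": "Come on Arsenal!", "user": {"screen_name": "someone"}}'
notice = '{"limit": {"track": 10}}'

print(extract_text(tweet))
print(extract_text(notice))
```

In on_data, a None result could be skipped with return True so that the stream keeps running instead of stopping on the first non-tweet message.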
Update:
I also tried using gunicorn and foreman with the following configuration. gunicorn.conf.py:

workers = 4
worker_class = 'sync'
loglevel = 'debug'
accesslog = '-'
errorlog = '-'

Procfile:

flask_dynamic_analysis: gunicorn --bind 127.0.0.1:$PORT --config gunicorn.conf.py twitter_sentiment_analysis:app
microservices_routes: gunicorn --bind 127.0.0.1:$PORT --config gunicorn.conf.py app.routes:app

.foreman:

formation: flask_dynamic_analysis=2, microservices_routes=2
port: 3000

But I still get the same error.
