kinesis spark流媒体集成-无法输出数据流内容

6jjcrrmo  于 2021-05-18  发布在  Spark
关注(0)|答案(0)|浏览(336)

使用python,我想创建一个简单的体系结构来打印在kinesis中的数据流,然后发送给spark stream对象。我在ec2示例中运行所有的东西。
我的数据生产者是一个运动代理监控 /var/documents/ 目录。代理日志文件似乎正在解析记录并将它们发送到目标,但不知何故,当我打印dstream对象时,什么都没有显示。
我的源代码:

import boto3, random, time
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

conf = SparkConf().setAppName("KinesisSparkBigDataPipeline")

sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 2)

def createStream():
    """
    Function that creates a DStream Object coming from Kinesis Stream.

    Returns:
        sparkDStream => DStream object created from records in the Kinesis Stream.
    """
    kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
    sparkDStream = KinesisUtils.createStream(
            ssc,
            kinesisAppName,
            "EntryPoints",
            "https://kinesis.eu-central-1.amazonaws.com",
            "eu-central-1",
            InitialPositionInStream.LATEST,
            2
    )
    return sparkDStream

if __name__ == "__main__":
    try:
        kinesisStream = createStream()
        kinesisStream.pprint()

        ssc.start()
        time.sleep(60)
        ssc.stop()
        # ssc.awaitTermination()
    except Exception as e:
        print(e)

运行命令时的输出: spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 poc_bigdata_pipeline.py 是:

-------------------------------------------
Time: 2020-11-03 11:09:52
-------------------------------------------

-------------------------------------------
Time: 2020-11-03 11:09:54
-------------------------------------------

...

我做错什么了吗?如果我忘记了我的问题的任何重要信息,请原谅我,我是一个新手。
谢谢你看这个。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题