使用python,我想创建一个简单的体系结构来打印在kinesis中的数据流,然后发送给spark stream对象。我在ec2示例中运行所有的东西。
我的数据生产者是一个运动代理监控 /var/documents/
目录。代理日志文件似乎正在解析记录并将它们发送到目标,但不知何故,当我打印dstream对象时,什么都没有显示。
我的源代码:
import boto3, random, time
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
conf = SparkConf().setAppName("KinesisSparkBigDataPipeline")
sc = SparkContext(conf = conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 2)
def createStream():
"""
Function that creates a DStream Object coming from Kinesis Stream.
Returns:
sparkDStream => DStream object created from records in the Kinesis Stream.
"""
kinesisAppName = ("KinesisStreamTests-%d" % abs(random.randint(0, 10000000)))
sparkDStream = KinesisUtils.createStream(
ssc,
kinesisAppName,
"EntryPoints",
"https://kinesis.eu-central-1.amazonaws.com",
"eu-central-1",
InitialPositionInStream.LATEST,
2
)
return sparkDStream
if __name__ == "__main__":
try:
kinesisStream = createStream()
kinesisStream.pprint()
ssc.start()
time.sleep(60)
ssc.stop()
# ssc.awaitTermination()
except Exception as e:
print(e)
运行命令时的输出: spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 poc_bigdata_pipeline.py
是:
-------------------------------------------
Time: 2020-11-03 11:09:52
-------------------------------------------
-------------------------------------------
Time: 2020-11-03 11:09:54
-------------------------------------------
...
我做错什么了吗?如果我忘记了我的问题的任何重要信息,请原谅我,我是一个新手。
谢谢你看这个。
暂无答案!
目前还没有任何答案,快来回答吧!