Spark Streaming returns results with a local master, but not on YARN

Asked by 68de4m5k on 2021-06-07, in Kafka

I am using Cloudera's VM with CDH 5.12, Spark v1.6, Kafka v0.10 (installed via yum), and Python 2.6.6. I am following this link.
Spark setup: (screenshot omitted)

Below is the simple Spark application I am running. It consumes events from Kafka and prints them after a map-reduce.

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    zkQuorum, topic = sys.argv[1:]
    # Receiver-based stream: yields (key, message) pairs from the topic
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

Note that when I submit the above code with the following command (local), it runs fine:

spark-submit --master local[2] --jars /usr/lib/spark/lib/spark-examples.jar testfile.py <ZKhostname>:2181 <kafka-topic>

When I submit the same code with the following command (YARN), it does not work:

spark-submit --master yarn --deploy-mode client --jars /usr/lib/spark/lib/spark-examples.jar testfile.py <ZKhostname>:2181 <kafka-topic>
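
A side note on the comparison above: with the receiver-based KafkaUtils.createStream, the receiver permanently occupies one core, so the application needs at least two cores in total (one to receive, one to process batches). local[2] satisfies this implicitly; on YARN the available executor cores depend on cluster defaults. A sketch of the same submit with resources requested explicitly (the flag values are illustrative assumptions, not from the original post):

spark-submit --master yarn --deploy-mode client \
  --num-executors 2 --executor-cores 2 --executor-memory 1g \
  --jars /usr/lib/spark/lib/spark-examples.jar \
  testfile.py <ZKhostname>:2181 <kafka-topic>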

Here are the logs generated when running on YARN (truncated; the settings in the logs may differ from the Spark setup above):

INFO Client: 
client token: N/A
diagnostics: N/A
ApplicationMaster host: 192.168.134.143
ApplicationMaster RPC port: 0
queue: root.cloudera
start time: 1515766709025
final status: UNDEFINED
tracking URL: http://quickstart.cloudera:8088/proxy/application_1515761416282_0010/
user: cloudera

40 INFO YarnClientSchedulerBackend: Application application_1515761416282_0010 has started running.
40 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53694.
40 INFO NettyBlockTransferService: Server created on 53694
53 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
54 INFO BlockManagerMasterEndpoint: Registering block manager quickstart.cloudera:56220 with 534.5 MB RAM, BlockManagerId(1, quickstart.cloudera, 56220)
07 INFO ReceiverTracker: Starting 1 receivers
07 INFO ReceiverTracker: ReceiverTracker started
07 INFO PythonTransformedDStream: metadataCleanupDelay = -1
07 INFO KafkaInputDStream: metadataCleanupDelay = -1
07 INFO KafkaInputDStream: Slide time = 10000 ms
07 INFO KafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
07 INFO KafkaInputDStream: Checkpoint interval = null
07 INFO KafkaInputDStream: Remember duration = 10000 ms
07 INFO KafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.KafkaInputDStream@7137ea0e
07 INFO PythonTransformedDStream: Slide time = 10000 ms
07 INFO PythonTransformedDStream: Storage level = StorageLevel(false, false, false, false, 1)
07 INFO PythonTransformedDStream: Checkpoint interval = null
07 INFO PythonTransformedDStream: Remember duration = 10000 ms
07 INFO PythonTransformedDStream: Initialized and validated org.apache.spark.streaming.api.python.PythonTransformedDStream@de77734

10 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 5.8 KB, free 534.5 MB)
10 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.5 KB, free 534.5 MB)
20 INFO JobScheduler: Added jobs for time 1515766760000 ms
30 INFO JobScheduler: Added jobs for time 1515766770000 ms
40 INFO JobScheduler: Added jobs for time 1515766780000 ms

After this, the job just keeps repeating the "Added jobs for time ..." lines above (at the batch interval set in the streaming context) and never prints the Kafka stream, whereas the job on master local does, with exactly the same code.

Interestingly, it prints one such line every time a Kafka event occurs (screenshot omitted; it showed the increased Spark memory settings).

Please note:
Data is arriving in Kafka, and I can see it in the console consumer.
I have also tried increasing the executor memory (3g) and the network timeout (800s), but without success.
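
For reference, those two overrides would typically be passed at submit time; a sketch assuming they were set through spark-submit (the values come from the note above):

spark-submit --master yarn --deploy-mode client \
  --executor-memory 3g \
  --conf spark.network.timeout=800s \
  --jars /usr/lib/spark/lib/spark-examples.jar \
  testfile.py <ZKhostname>:2181 <kafka-topic>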

No answers yet.

