Spark Structured Streaming takes too long to compute aggregation queries

Asked by au9on6nz on 2021-06-08, tagged Kafka

I am running a simple Spark Structured Streaming (v2.2.0) program that reads a small amount of data from Kafka and runs a few aggregation queries on it (using the "update" output mode). However, after printing the results of one or two batches to the console, the program gets stuck for a long time. I have tried this many times, and I have written the program in both Scala and Python. In the run shown below, for example, I stopped the program that feeds Kafka as soon as the query got stuck, and Spark Streaming only resumed five minutes later. Can anyone explain why Spark Structured Streaming cannot handle even such a very small amount of data?
I am running ZooKeeper and Kafka (v0.11.0.0) with their default configurations on the same machine (a mid-2015 15-inch MacBook Pro), and I feed Kafka 5 strings per second (each string at most 100 characters).
Here is the Spark Structured Streaming code in Scala:
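For reference, the feeder side is roughly equivalent to the following sketch (a minimal stand-in using the plain kafka-clients producer; the concrete field values and the 200 ms sleep are illustrative assumptions chosen to match the rate and record format described above, not the actual feeder code):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Feeder {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    var i = 0
    while (true) {
      // one comma-separated record per message, matching the columns parsed by the streaming job below
      val now = System.currentTimeMillis()
      val line = s"10.0.0.${i % 5},10.0.1.${i % 3},80,tcp,$now,$now,${i % 10}"
      producer.send(new ProducerRecord[String, String]("mytopic", line))
      i += 1
      Thread.sleep(200) // roughly 5 records per second
    }
  }
}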

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.split

val spark: SparkSession = SparkSession
  .builder
  .appName("StructuredNetworkFeatureExtractor")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

// Read raw records from the Kafka topic
val input = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  .load

// Split each comma-separated value into named columns, then count rows per 4-tuple
val result = input.withColumn("_tmp", split($"value", ",")).select(
    $"_tmp".getItem(0).as("src_ip"),
    $"_tmp".getItem(1).as("dst_ip"),
    $"_tmp".getItem(2).as("dst_port"),
    $"_tmp".getItem(3).as("protocol"),
    $"_tmp".getItem(4).as("start_time"),
    $"_tmp".getItem(5).as("end_time"),
    $"_tmp".getItem(6).as("packets")
  ).drop("_tmp").groupBy("src_ip", "dst_ip", "dst_port", "protocol").count()

// Print the updated counts to the console after every micro-batch
val query = result.writeStream
  .outputMode("update")
  .format("console")
  .start()

query.awaitTermination()

The program runs fine in "append" output mode, and in "update" output mode it also works for the first batch or two.
Here are some of the Spark streaming logs from just before it got stuck:

17/08/15 12:41:08 INFO TaskSetManager: Finished task 186.0 in stage 11.0 (TID 605) in 43 ms on localhost (executor driver) (186/200)
17/08/15 12:41:08 INFO Executor: Running task 193.0 in stage 11.0 (TID 612)
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=192), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/192] for update
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=192), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/192] for update
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=193), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/193] for update
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=193), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/193] for update
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=189), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/189] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/189/3.delta
17/08/15 12:41:08 INFO Executor: Finished task 189.0 in stage 11.0 (TID 608). 3952 bytes result sent to driver
17/08/15 12:41:08 INFO TaskSetManager: Starting task 194.0 in stage 11.0 (TID 613, localhost, executor driver, partition 194, PROCESS_LOCAL, 6685 bytes)
17/08/15 12:41:08 INFO Executor: Running task 194.0 in stage 11.0 (TID 613)
17/08/15 12:41:08 INFO TaskSetManager: Finished task 189.0 in stage 11.0 (TID 608) in 38 ms on localhost (executor driver) (187/200)
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=190), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/190] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/190/3.delta
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=194), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/194] for update
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=194), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/194] for update
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/08/15 12:41:08 INFO Executor: Finished task 190.0 in stage 11.0 (TID 609). 3952 bytes result sent to driver
17/08/15 12:41:08 INFO TaskSetManager: Starting task 195.0 in stage 11.0 (TID 614, localhost, executor driver, partition 195, PROCESS_LOCAL, 6685 bytes)
17/08/15 12:41:08 INFO Executor: Running task 195.0 in stage 11.0 (TID 614)
17/08/15 12:41:08 INFO TaskSetManager: Finished task 190.0 in stage 11.0 (TID 609) in 33 ms on localhost (executor driver) (188/200)
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=191), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/191] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/191/3.delta
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=195), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/195] for update
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=195), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/195] for update
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/08/15 12:41:08 INFO Executor: Finished task 191.0 in stage 11.0 (TID 610). 3952 bytes result sent to driver
17/08/15 12:41:08 INFO TaskSetManager: Starting task 196.0 in stage 11.0 (TID 615, localhost, executor driver, partition 196, PROCESS_LOCAL, 6685 bytes)
17/08/15 12:41:08 INFO TaskSetManager: Finished task 191.0 in stage 11.0 (TID 610) in 16 ms on localhost (executor driver) (189/200)
17/08/15 12:41:08 INFO Executor: Running task 196.0 in stage 11.0 (TID 615)
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=192), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/192] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/192/3.delta
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=193), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/193] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/193/3.delta
17/08/15 12:41:08 INFO Executor: Finished task 192.0 in stage 11.0 (TID 611). 3952 bytes result sent to driver
17/08/15 12:41:08 INFO Executor: Finished task 193.0 in stage 11.0 (TID 612). 3952 bytes result sent to driver
17/08/15 12:41:08 INFO TaskSetManager: Starting task 197.0 in stage 11.0 (TID 616, localhost, executor driver, partition 197, PROCESS_LOCAL, 6685 bytes)
17/08/15 12:41:08 INFO Executor: Running task 197.0 in stage 11.0 (TID 616)
17/08/15 12:41:08 INFO TaskSetManager: Starting task 198.0 in stage 11.0 (TID 617, localhost, executor driver, partition 198, PROCESS_LOCAL, 6685 bytes)
17/08/15 12:41:08 INFO TaskSetManager: Finished task 192.0 in stage 11.0 (TID 611) in 16 ms on localhost (executor driver) (190/200)
17/08/15 12:41:08 INFO Executor: Running task 198.0 in stage 11.0 (TID 617)
17/08/15 12:41:08 INFO TaskSetManager: Finished task 193.0 in stage 11.0 (TID 612) in 14 ms on localhost (executor driver) (191/200)
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=196), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/196] for update
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=196), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/196] for update
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=198), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/198] for update
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=198), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/198] for update
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=197), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/197] for update
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=197), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/197] for update
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=194), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/194] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/194/3.delta
17/08/15 12:41:08 INFO Executor: Finished task 194.0 in stage 11.0 (TID 613). 3952 bytes result sent to driver
17/08/15 12:41:08 INFO TaskSetManager: Starting task 199.0 in stage 11.0 (TID 618, localhost, executor driver, partition 199, PROCESS_LOCAL, 6685 bytes)
17/08/15 12:41:08 INFO TaskSetManager: Finished task 194.0 in stage 11.0 (TID 613) in 15 ms on localhost (executor driver) (192/200)
17/08/15 12:41:08 INFO Executor: Running task 199.0 in stage 11.0 (TID 618)
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=195), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/195] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/195/3.delta
17/08/15 12:41:08 INFO Executor: Finished task 195.0 in stage 11.0 (TID 614). 3952 bytes result sent to driver
17/08/15 12:41:08 INFO TaskSetManager: Finished task 195.0 in stage 11.0 (TID 614) in 16 ms on localhost (executor driver) (193/200)
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=199), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/199] for update
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Retrieved version 2 of HDFSStateStoreProvider[id = (op=0, part=199), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/199] for update
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
17/08/15 12:41:08 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=197), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/197] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/197/3.delta
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=196), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/196] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/196/3.delta
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=198), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/198] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/198/3.delta
17/08/15 12:41:08 INFO Executor: Finished task 196.0 in stage 11.0 (TID 615). 3952 bytes result sent to driver
17/08/15 12:41:08 INFO Executor: Finished task 197.0 in stage 11.0 (TID 616). 3952 bytes result sent to driver
17/08/15 12:41:08 INFO Executor: Finished task 198.0 in stage 11.0 (TID 617). 3952 bytes result sent to driver
17/08/15 12:41:08 INFO TaskSetManager: Finished task 196.0 in stage 11.0 (TID 615) in 17 ms on localhost (executor driver) (194/200)
17/08/15 12:41:08 INFO TaskSetManager: Finished task 197.0 in stage 11.0 (TID 616) in 15 ms on localhost (executor driver) (195/200)
17/08/15 12:41:08 INFO TaskSetManager: Finished task 198.0 in stage 11.0 (TID 617) in 15 ms on localhost (executor driver) (196/200)
17/08/15 12:41:08 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=199), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/199] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/199/3.delta
17/08/15 12:41:08 INFO Executor: Finished task 199.0 in stage 11.0 (TID 618). 3952 bytes result sent to driver
17/08/15 12:41:08 INFO TaskSetManager: Finished task 199.0 in stage 11.0 (TID 618) in 15 ms on localhost (executor driver) (197/200)

It got stuck at 12:41:08 and only continued five minutes later:

17/08/15 12:46:07 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=77), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/77] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/77/3.delta
17/08/15 12:46:07 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=27), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/27] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/27/3.delta
17/08/15 12:46:07 INFO Executor: Finished task 77.0 in stage 11.0 (TID 496). 4025 bytes result sent to driver
17/08/15 12:46:07 INFO Executor: Finished task 27.0 in stage 11.0 (TID 446). 4116 bytes result sent to driver
17/08/15 12:46:07 INFO TaskSetManager: Finished task 77.0 in stage 11.0 (TID 496) in 300007 ms on localhost (executor driver) (198/200)
17/08/15 12:46:07 INFO TaskSetManager: Finished task 27.0 in stage 11.0 (TID 446) in 300649 ms on localhost (executor driver) (199/200)
17/08/15 12:46:07 INFO HDFSBackedStateStoreProvider: Committed version 3 for HDFSStateStore[id = (op=0, part=78), dir = checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/78] to file checkpoint/8cc5a2fc-4245-40ef-a66f-113d42abc50c/state/0/78/3.delta
17/08/15 12:46:07 INFO Executor: Finished task 78.0 in stage 11.0 (TID 497). 4025 bytes result sent to driver
17/08/15 12:46:07 INFO TaskSetManager: Finished task 78.0 in stage 11.0 (TID 497) in 300012 ms on localhost (executor driver) (200/200)
17/08/15 12:46:07 INFO TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool 
17/08/15 12:46:07 INFO DAGScheduler: ResultStage 11 (start at FeatureExtractor.scala:97) finished in 302.767 s
17/08/15 12:46:07 INFO DAGScheduler: Job 8 finished: start at FeatureExtractor.scala:97, took 302.903295 s
17/08/15 12:46:07 INFO SparkContext: Starting job: start at FeatureExtractor.scala:97
17/08/15 12:46:07 INFO DAGScheduler: Got job 9 (start at FeatureExtractor.scala:97) with 1 output partitions
17/08/15 12:46:07 INFO DAGScheduler: Final stage: ResultStage 12 (start at FeatureExtractor.scala:97)
17/08/15 12:46:07 INFO DAGScheduler: Parents of final stage: List()
17/08/15 12:46:07 INFO DAGScheduler: Missing parents: List()
17/08/15 12:46:07 INFO DAGScheduler: Submitting ResultStage 12 (MapPartitionsRDD[47] at start at FeatureExtractor.scala:97), which has no missing parents
17/08/15 12:46:07 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 9.6 KB, free 2003.5 MB)
17/08/15 12:46:07 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 4.8 KB, free 2003.5 MB)
17/08/15 12:46:07 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on 10.249.202.140:50853 (size: 4.8 KB, free: 2004.4 MB)
17/08/15 12:46:07 INFO SparkContext: Created broadcast 18 from broadcast at DAGScheduler.scala:996
17/08/15 12:46:07 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 12 (MapPartitionsRDD[47] at start at FeatureExtractor.scala:97)
17/08/15 12:46:07 INFO TaskSchedulerImpl: Adding task set 12.0 with 1 tasks
17/08/15 12:46:07 INFO TaskSetManager: Starting task 0.0 in stage 12.0 (TID 619, localhost, executor driver, partition 0, PROCESS_LOCAL, 7995 bytes)
17/08/15 12:46:07 INFO Executor: Running task 0.0 in stage 12.0 (TID 619)
17/08/15 12:46:07 INFO Executor: Finished task 0.0 in stage 12.0 (TID 619). 1304 bytes result sent to driver
17/08/15 12:46:07 INFO TaskSetManager: Finished task 0.0 in stage 12.0 (TID 619) in 5 ms on localhost (executor driver) (1/1)
17/08/15 12:46:07 INFO TaskSchedulerImpl: Removed TaskSet 12.0, whose tasks have all completed, from pool 
17/08/15 12:46:07 INFO DAGScheduler: ResultStage 12 (start at FeatureExtractor.scala:97) finished in 0.005 s
17/08/15 12:46:07 INFO DAGScheduler: Job 9 finished: start at FeatureExtractor.scala:97, took 0.010624 s
17/08/15 12:46:07 INFO SparkContext: Starting job: start at FeatureExtractor.scala:97
17/08/15 12:46:07 INFO DAGScheduler: Got job 10 (start at FeatureExtractor.scala:97) with 4 output partitions
17/08/15 12:46:07 INFO DAGScheduler: Final stage: ResultStage 13 (start at FeatureExtractor.scala:97)
17/08/15 12:46:07 INFO DAGScheduler: Parents of final stage: List()
17/08/15 12:46:07 INFO DAGScheduler: Missing parents: List()
17/08/15 12:46:07 INFO DAGScheduler: Submitting ResultStage 13 (MapPartitionsRDD[47] at start at FeatureExtractor.scala:97), which has no missing parents
17/08/15 12:46:07 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 9.6 KB, free 2003.5 MB)
17/08/15 12:46:07 INFO MemoryStore: Block broadcast_19_piece0 stored as bytes in memory (estimated size 4.8 KB, free 2003.5 MB)
17/08/15 12:46:07 INFO BlockManagerInfo: Added broadcast_19_piece0 in memory on 10.249.202.140:50853 (size: 4.8 KB, free: 2004.4 MB)
17/08/15 12:46:07 INFO SparkContext: Created broadcast 19 from broadcast at DAGScheduler.scala:996
17/08/15 12:46:07 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 13 (MapPartitionsRDD[47] at start at FeatureExtractor.scala:97)
17/08/15 12:46:07 INFO TaskSchedulerImpl: Adding task set 13.0 with 4 tasks
17/08/15 12:46:07 INFO TaskSetManager: Starting task 0.0 in stage 13.0 (TID 620, localhost, executor driver, partition 1, PROCESS_LOCAL, 7995 bytes)
17/08/15 12:46:07 INFO TaskSetManager: Starting task 1.0 in stage 13.0 (TID 621, localhost, executor driver, partition 2, PROCESS_LOCAL, 8000 bytes)
17/08/15 12:46:07 INFO TaskSetManager: Starting task 2.0 in stage 13.0 (TID 622, localhost, executor driver, partition 3, PROCESS_LOCAL, 7997 bytes)
17/08/15 12:46:07 INFO TaskSetManager: Starting task 3.0 in stage 13.0 (TID 623, localhost, executor driver, partition 4, PROCESS_LOCAL, 7987 bytes)
17/08/15 12:46:07 INFO Executor: Running task 0.0 in stage 13.0 (TID 620)
17/08/15 12:46:07 INFO Executor: Running task 1.0 in stage 13.0 (TID 621)
17/08/15 12:46:07 INFO Executor: Running task 2.0 in stage 13.0 (TID 622)
17/08/15 12:46:07 INFO Executor: Running task 3.0 in stage 13.0 (TID 623)
17/08/15 12:46:07 INFO Executor: Finished task 0.0 in stage 13.0 (TID 620). 1289 bytes result sent to driver
17/08/15 12:46:07 INFO Executor: Finished task 1.0 in stage 13.0 (TID 621). 1305 bytes result sent to driver
17/08/15 12:46:07 INFO Executor: Finished task 3.0 in stage 13.0 (TID 623). 1297 bytes result sent to driver
17/08/15 12:46:07 INFO Executor: Finished task 2.0 in stage 13.0 (TID 622). 1308 bytes result sent to driver
17/08/15 12:46:07 INFO TaskSetManager: Finished task 0.0 in stage 13.0 (TID 620) in 4 ms on localhost (executor driver) (1/4)
17/08/15 12:46:07 INFO TaskSetManager: Finished task 3.0 in stage 13.0 (TID 623) in 4 ms on localhost (executor driver) (2/4)
17/08/15 12:46:07 INFO TaskSetManager: Finished task 1.0 in stage 13.0 (TID 621) in 4 ms on localhost (executor driver) (3/4)
17/08/15 12:46:07 INFO TaskSetManager: Finished task 2.0 in stage 13.0 (TID 622) in 4 ms on localhost (executor driver) (4/4)
17/08/15 12:46:07 INFO TaskSchedulerImpl: Removed TaskSet 13.0, whose tasks have all completed, from pool 
17/08/15 12:46:07 INFO DAGScheduler: ResultStage 13 (start at FeatureExtractor.scala:97) finished in 0.005 s
17/08/15 12:46:07 INFO DAGScheduler: Job 10 finished: start at FeatureExtractor.scala:97, took 0.006888 s
17/08/15 12:46:07 INFO SparkContext: Starting job: start at FeatureExtractor.scala:97
17/08/15 12:46:07 INFO DAGScheduler: Got job 11 (start at FeatureExtractor.scala:97) with 3 output partitions
17/08/15 12:46:07 INFO DAGScheduler: Final stage: ResultStage 14 (start at FeatureExtractor.scala:97)
17/08/15 12:46:07 INFO DAGScheduler: Parents of final stage: List()
17/08/15 12:46:07 INFO DAGScheduler: Missing parents: List()
17/08/15 12:46:07 INFO DAGScheduler: Submitting ResultStage 14 (MapPartitionsRDD[47] at start at FeatureExtractor.scala:97), which has no missing parents
17/08/15 12:46:07 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 9.6 KB, free 2003.5 MB)
17/08/15 12:46:07 INFO MemoryStore: Block broadcast_20_piece0 stored as bytes in memory (estimated size 4.8 KB, free 2003.5 MB)
17/08/15 12:46:07 INFO BlockManagerInfo: Added broadcast_20_piece0 in memory on 10.249.202.140:50853 (size: 4.8 KB, free: 2004.4 MB)
17/08/15 12:46:07 INFO SparkContext: Created broadcast 20 from broadcast at DAGScheduler.scala:996
17/08/15 12:46:07 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 14 (MapPartitionsRDD[47] at start at FeatureExtractor.scala:97)
17/08/15 12:46:07 INFO TaskSchedulerImpl: Adding task set 14.0 with 3 tasks
17/08/15 12:46:07 INFO TaskSetManager: Starting task 0.0 in stage 14.0 (TID 624, localhost, executor driver, partition 5, PROCESS_LOCAL, 7993 bytes)
17/08/15 12:46:07 INFO TaskSetManager: Starting task 1.0 in stage 14.0 (TID 625, localhost, executor driver, partition 6, PROCESS_LOCAL, 7994 bytes)
17/08/15 12:46:07 INFO TaskSetManager: Starting task 2.0 in stage 14.0 (TID 626, localhost, executor driver, partition 7, PROCESS_LOCAL, 7992 bytes)
17/08/15 12:46:07 INFO Executor: Running task 0.0 in stage 14.0 (TID 624)
17/08/15 12:46:07 INFO Executor: Running task 1.0 in stage 14.0 (TID 625)
17/08/15 12:46:07 INFO Executor: Running task 2.0 in stage 14.0 (TID 626)

17/08/15 12:46:07 INFO Executor: Finished task 1.0 in stage 14.0 (TID 625). 1298 bytes result sent to driver
17/08/15 12:46:07 INFO Executor: Finished task 0.0 in stage 14.0 (TID 624). 1298 bytes result sent to driver
17/08/15 12:46:07 INFO Executor: Finished task 2.0 in stage 14.0 (TID 626). 1312 bytes result sent to driver
17/08/15 12:46:07 INFO TaskSetManager: Finished task 1.0 in stage 14.0 (TID 625) in 3 ms on localhost (executor driver) (1/3)
17/08/15 12:46:07 INFO TaskSetManager: Finished task 0.0 in stage 14.0 (TID 624) in 4 ms on localhost (executor driver) (2/3)
17/08/15 12:46:07 INFO TaskSetManager: Finished task 2.0 in stage 14.0 (TID 626) in 3 ms on localhost (executor driver) (3/3)
17/08/15 12:46:07 INFO TaskSchedulerImpl: Removed TaskSet 14.0, whose tasks have all completed, from pool 
17/08/15 12:46:07 INFO DAGScheduler: ResultStage 14 (start at FeatureExtractor.scala:97) finished in 0.004 s
17/08/15 12:46:07 INFO DAGScheduler: Job 11 finished: start at FeatureExtractor.scala:97, took 0.008861 s
17/08/15 12:46:07 INFO StreamExecution: Streaming query made progress: {
  "id" : "c84a0d40-ac3e-4a55-98c4-c63c14abcc12",
  "runId" : "ddadecb7-b3b9-4c1b-b742-90638b43e5a0",
  "name" : null,
  "timestamp" : "2017-08-15T19:41:03.971Z",
  "numInputRows" : 20,
  "inputRowsPerSecond" : 4.163197335553705,
  "processedRowsPerSecond" : 0.06597459318416478,
  "durationMs" : {
    "addBatch" : 303093,
    "getBatch" : 4,
    "getOffset" : 1,
    "queryPlanning" : 12,
    "triggerExecution" : 303147,
    "walCommit" : 32
  },
  "stateOperators" : [ {
    "numRowsTotal" : 28,
    "numRowsUpdated" : 16
  } ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[mytopic]]",
    "startOffset" : {
      "mytopic" : {
        "0" : 101830
      }
    },
    "endOffset" : {
      "mytopic" : {
        "0" : 101850
      }
    },
    "numInputRows" : 20,
    "inputRowsPerSecond" : 4.163197335553705,
    "processedRowsPerSecond" : 0.06597459318416478
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6ceb7741"
  }
}
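(The same per-batch metrics that appear in this progress JSON can also be collected programmatically by registering a StreamingQueryListener before query.start(); a minimal sketch, using only the standard Spark 2.2 listener API:)

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // durationMs holds the same addBatch / triggerExecution timings shown in the JSON above
    println(s"batch=${event.progress.batchId} durationMs=${event.progress.durationMs}")
  }
})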

Meanwhile, these are the Kafka logs (from the console):

[2017-08-15 12:40:54,327] INFO [GroupCoordinator 0]: Preparing to rebalance group spark-kafka-source-d7625848-02f2-4af7-9285-1ddd2b0906cf-1499473642-driver-0 with old generation 0 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2017-08-15 12:40:54,328] INFO [GroupCoordinator 0]: Stabilized group spark-kafka-source-d7625848-02f2-4af7-9285-1ddd2b0906cf-1499473642-driver-0 generation 1 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2017-08-15 12:40:54,334] INFO [GroupCoordinator 0]: Assignment received from leader for group spark-kafka-source-d7625848-02f2-4af7-9285-1ddd2b0906cf-1499473642-driver-0 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2017-08-15 12:40:54,335] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: __consumer_offsets-6. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2017-08-15 12:43:08,939] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2017-08-15 12:46:03,978] INFO [GroupCoordinator 0]: Preparing to rebalance group spark-kafka-source-d7625848-02f2-4af7-9285-1ddd2b0906cf-1499473642-driver-0 with old generation 1 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2017-08-15 12:46:03,980] INFO [GroupCoordinator 0]: Group spark-kafka-source-d7625848-02f2-4af7-9285-1ddd2b0906cf-1499473642-driver-0 with generation 2 is now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2017-08-15 12:46:07,120] INFO [GroupCoordinator 0]: Preparing to rebalance group spark-kafka-source-d7625848-02f2-4af7-9285-1ddd2b0906cf-1499473642-driver-0 with old generation 2 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2017-08-15 12:46:07,122] INFO [GroupCoordinator 0]: Stabilized group spark-kafka-source-d7625848-02f2-4af7-9285-1ddd2b0906cf-1499473642-driver-0 generation 3 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)

Update:
I looked at the Spark application UI, and the time-consuming part appears to be the "executor computing time" of a handful of tasks.
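(As an aside, the stages in the logs above each run 200 tasks, which matches Spark's default spark.sql.shuffle.partitions of 200. Purely to illustrate where that number comes from, and not as a confirmed fix for this question, the setting can be lowered for a tiny local test before starting the query:)

// the default of 200 explains the 200 tasks per stage seen in the logs above;
// a small value avoids scheduling ~200 near-empty state-store tasks per micro-batch
spark.conf.set("spark.sql.shuffle.partitions", "8")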


a1o7rhls1#

It seems like you are moving too much data around. Try partitioning the Kafka stream and using the partition key for the aggregation. Just a thought, not an answer.
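One way to read that suggestion, as a rough sketch only (the repartition columns are an assumption based on the grouping keys in the question, and "parsed" is a hypothetical name for the DataFrame produced by the question's select/drop step; the answer does not spell out either):

// assumes the question's `import spark.implicits._` is in scope
// cluster rows by the grouping keys before the stateful aggregation,
// so that rows for the same key land in the same partition
val byKey = parsed
  .repartition($"src_ip", $"dst_ip", $"dst_port", $"protocol")
  .groupBy("src_ip", "dst_ip", "dst_port", "protocol")
  .count()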
