What is the right combination of Kafka topic partitions, Spark batch size, and threads per topic?

643ylb08 · posted 2021-06-07 in Kafka

The Kafka topic has 300 partitions. When I run this job, the JVM crashes with the error shown below.
Is there something wrong with the batch duration or the number of consumer threads I am using?
I have tried the following:
batch durations: 500, 1000, 2000, and 5000 ms
thread counts: 1, 10, 50, 100, 200, 500, and 1000
but I still get the error listed below (see the note on heap sizing just after this list).
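
Since the job runs with master local[*], the receivers, the fetcher threads, and all batch processing share a single driver JVM, whose heap defaults to 1 GB. Below is a minimal sketch of the rate-limiting settings involved; the values are illustrative assumptions, not recommendations, and the heap size itself can only be raised at launch (e.g. spark-submit --driver-memory 4g), not from inside the running JVM.

    // Sketch only; values are illustrative assumptions, not recommendations.
    SparkConf sparkConf = new SparkConf()
            .setAppName("JavaKafkaWordCount11")
            .setMaster("local[*]")
            // Cap records/sec per receiver so the Kafka fetchers cannot fill
            // the heap faster than batches drain it (0 means unlimited).
            .set("spark.streaming.receiver.maxRate", "1000")
            // Let Spark adapt the ingest rate to the observed processing rate.
            .set("spark.streaming.backpressure.enabled", "true");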
Here is the Java Spark job I wrote:

public static void main(String[] args) throws Exception {

        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]");
        sparkConf.set("spark.streaming.concurrentJobs", "300");

        // Create the context with a 5-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));

        // topic name -> number of receiver threads for that topic
        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("PartWithTopic01Queue", 500);
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, "x.xx.xxx.xxx:2181", "1",
                topicMap);
        // Keep only the message value from each (key, value) pair
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });
        lines.foreachRDD(rdd -> {
            rdd.foreachPartition(p -> {
                while (p.hasNext()) {
                    p.next(); // advance the iterator, otherwise this loop never terminates
                    System.out.println("file data");
                }
            });
        });

        // Start the streaming context and block until it is stopped
        jssc.start();
        jssc.awaitTermination();
}
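
As an aside, the fetch buffers that appear in the stack trace below are sized by the old consumer's fetch settings. The createStream overload in spark-streaming-kafka-0-8 that takes an explicit kafkaParams map lets you bound them; a minimal sketch reusing jssc and topicMap from the snippet above, with illustrative values:

    // Requires, in addition to the imports used above:
    //   import kafka.serializer.StringDecoder;
    //   import org.apache.spark.storage.StorageLevel;
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("zookeeper.connect", "x.xx.xxx.xxx:2181"); // same quorum as above
    kafkaParams.put("group.id", "1");
    // Each fetcher thread allocates up to roughly this many bytes per partition
    // it owns, so smaller values bound heap use (1 MB here is an assumed value).
    kafkaParams.put("fetch.message.max.bytes", "1048576");
    kafkaParams.put("socket.receive.buffer.bytes", "1048576");

    JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
            jssc,
            String.class, String.class,
            StringDecoder.class, StringDecoder.class,
            kafkaParams, topicMap,
            StorageLevel.MEMORY_AND_DISK_SER()); // spill received blocks to disk instead of OOMing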

The error output:

16/11/22 05:08:27 INFO DAGScheduler: Job 4 finished: foreachPartition at KafkaConsumer.java:56, took 0.000055 s
16/11/22 05:08:27 INFO JobScheduler: Finished job streaming job 1479809307000 ms.0 from job set of time 1479809307000 ms
16/11/22 05:08:27 INFO JobScheduler: Total delay: 0.025 s for time 1479809307000 ms (execution: 0.015 s)
16/11/22 05:08:27 INFO MapPartitionsRDD: Removing RDD 6 from persistence list
16/11/22 05:08:27 INFO BlockManager: Removing RDD 6
16/11/22 05:08:27 INFO BlockRDD: Removing RDD 5 from persistence list
16/11/22 05:08:27 INFO BlockManager: Removing RDD 5
16/11/22 05:08:27 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[5] at createStream at KafkaConsumer.java:46 of time 1479809307000 ms
16/11/22 05:08:27 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1479809304000 ms)
16/11/22 05:08:27 INFO InputInfoTracker: remove old batch metadata: 1479809304000 ms
JVMDUMP039I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" at 2016/11/22 05:08:28 - please wait.
JVMDUMP032I JVM requested System dump using 'C:\dev\workspace\SparkDemo\core.20161122.050828.7080.0001.dmp' in response to an event
JVMDUMP010I System dump written to C:\dev\workspace\SparkDemo\core.20161122.050828.7080.0001.dmp
JVMDUMP032I JVM requested Heap dump using 'C:\dev\workspace\SparkDemo\heapdump.20161122.050828.7080.0002.phd' in response to an event
JVMDUMP010I Heap dump written to C:\dev\workspace\SparkDemo\heapdump.20161122.050828.7080.0002.phd
JVMDUMP032I JVM requested Java dump using 'C:\dev\workspace\SparkDemo\javacore.20161122.050828.7080.0003.txt' in response to an event
JVMDUMP010I Java dump written to C:\dev\workspace\SparkDemo\javacore.20161122.050828.7080.0003.txt
JVMDUMP032I JVM requested Snap dump using 'C:\dev\workspace\SparkDemo\Snap.20161122.050828.7080.0004.trc' in response to an event
16/11/22 05:08:32 INFO ClientCnxn: Client session timed out, have not heard from server in 4742ms for sessionid 0x1581fe095f40bc9, closing socket connection and attempting reconnect
JVMDUMP010I Snap dump written to C:\dev\workspace\SparkDemo\Snap.20161122.050828.7080.0004.trc
JVMDUMP013I Processed dump event "systhrow", detail "java/lang/OutOfMemoryError".
16/11/22 05:08:32 INFO SimpleConsumer: Reconnect due to error:
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:86)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
    at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:108)
    at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:29)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
JVMDUMP039I Processing dump event "systhrow", detail "java/lang/OutOfMemoryError" at 2016/11/22 05:08:33 - please wait.
JVMDUMP032I JVM requested Heap dump using 'C:\dev\workspace\SparkDemo\heapdump.20161122.050833.7080.0005.phd' in response to an event
JVMDUMP010I Heap dump written to C:\dev\workspace\SparkDemo\heapdump.20161122.050833.7080.0005.phd
JVMDUMP032I JVM requested Java dump using 'C:\dev\workspace\SparkDemo\javacore.20161122.050833.7080.0006.txt' in response to an event
JVMDUMP010I Java dump written to C:\dev\workspace\SparkDemo\javacore.20161122.050833.7080.0006.txt
JVMDUMP032I JVM requested Snap dump using 'C:\dev\workspace\SparkDemo\Snap.20161122.050833.7080.0007.trc' in response to an event
16/11/22 05:08:34 INFO JobScheduler: Added jobs for time 1479809308500 ms
JVMDUMP010I Snap dump written to C:\dev\workspace\SparkDemo\Snap.20161122.050833.7080.0007.trc
JVMDUMP013I Processed dump event "systhrow", detail "java/lang/OutOfMemoryError".
16/11/22 05:08:34 INFO ZkClient: zookeeper state changed (Disconnected)
16/11/22 05:08:34 WARN ConsumerFetcherThread: [ConsumerFetcherThread-1_wrauser07-1479809302517-dff0431d-0-1], Error in fetch kafka.consumer.ConsumerFetcherThread$FetchRequest@cc2189ae
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130)
    at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:108)
    at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:29)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
16/11/22 05:08:35 INFO JobScheduler: Starting job streaming job 1479809308500 ms.0 from job set of time 1479809308500 ms
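
For comparison: the receiver-less direct integration (also in spark-streaming-kafka-0-8) removes the fetcher threads and the per-topic thread count entirely, and maps each of the 300 Kafka partitions to one RDD partition. A minimal sketch, with a placeholder broker address (note it takes a broker list, not ZooKeeper):

    // Requires: import java.util.*; import kafka.serializer.StringDecoder;
    //           import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "broker1:9092"); // placeholder host:port
    Set<String> topics = new HashSet<>(Arrays.asList("PartWithTopic01Queue"));

    // One RDD partition per Kafka partition; no receiver threads to tune.
    JavaPairInputDStream<String, String> directStream = KafkaUtils.createDirectStream(
            jssc,
            String.class, String.class,
            StringDecoder.class, StringDecoder.class,
            kafkaParams, topics);

Per-partition ingest can then be capped with spark.streaming.kafka.maxRatePerPartition.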
