Kafka流到Spark不减少计数

s2j5cfk0  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(248)

我正在尝试一个从Kafka流到spark的基本示例。我是个新手,没有什么经验。
我的程序如下(复制自apachespark中的示例):

if (args.length < 4) {
        System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
        System.exit(1);
    }

    String zkQuorum = args[0];
    String groupId = args[1];
    String topicsToListen = args[2];
    String numOfThread = args[3];

    StreamingExamples.setStreamingLogLevels();
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    // Create the context with 2 seconds batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    int numThreads = Integer.parseInt(numOfThread);
    Map<String, Integer> topicMap = new HashMap<>();
    String[] topics = topicsToListen.split(",");
    for (String topic : topics) {
        topicMap.put(topic, numThreads);
    }

    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, zkQuorum, groupId, topicMap);

    JavaDStream<String> lines = messages.map(Tuple2::_2);

    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());

    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
            .reduceByKey((i1, i2) -> i1 + i2);

    wordCounts.print();
    jssc.start();
    jssc.awaitTermination();

然后启动kafka代理并通过生成以下命令来运行构建的jar:
$spark\u home/bin/spark submit--class“javakafkawordcount”--master local[2]指向\u jar/kafka-spark-streaming-1.0-snapshot-jar-with-dependencies.jar的路径localhost:2181 test-consumer-group 测试1
当我从Kafka制作人那里制作一些文字时,我希望一个文字的出版数量会增加很多倍,但我看到的只是文字,而且每出版一本新的书就印一本:
(你好,1)
当我不止一次发表同一个词的时候,我还以为这个数字会增加,
(你好,2)
但那不会发生。我到底理解错了什么?这和我对工作的看法有关,还是工作的目的有关?
有人能给我一些见解吗?
谢谢谢比尔

l2osamch

l2osamch1#

在读了几遍代码之后,我设法找出了为什么我总是把每个单词的计数设为1,而不是合计总数。
在下面的行中:

// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

我将流中连续读取的间隔设置为2秒。我意识到,在producer端的这个间隔(2秒)内,我没有生成足够的同一字符串内容来获得聚合结果。
但是,当我将这个间隔增加到10000毫秒(10秒)时,我就可以从Kafka生成器生成多行数据。这些行由作业适当地处理,类似的字符串计数在特定的时间间隔内很好地聚合。
(你好,4)
(世界,6)
非常感谢谢比尔

相关问题