我正在尝试一个从Kafka流到spark的基本示例。我是个新手,没有什么经验。
我的程序如下(复制自apachespark中的示例):
if (args.length < 4) {
System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
System.exit(1);
}
String zkQuorum = args[0];
String groupId = args[1];
String topicsToListen = args[2];
String numOfThread = args[3];
StreamingExamples.setStreamingLogLevels();
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
// Create the context with 2 seconds batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
int numThreads = Integer.parseInt(numOfThread);
Map<String, Integer> topicMap = new HashMap<>();
String[] topics = topicsToListen.split(",");
for (String topic : topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, zkQuorum, groupId, topicMap);
JavaDStream<String> lines = messages.map(Tuple2::_2);
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.print();
jssc.start();
jssc.awaitTermination();
然后启动kafka代理并通过生成以下命令来运行构建的jar:
$spark\u home/bin/spark submit--class“javakafkawordcount”--master local[2]指向\u jar/kafka-spark-streaming-1.0-snapshot-jar-with-dependencies.jar的路径localhost:2181 test-consumer-group 测试1
当我从Kafka制作人那里制作一些文字时,我希望一个文字的出版数量会增加很多倍,但我看到的只是文字,而且每出版一本新的书就印一本:
(你好,1)
当我不止一次发表同一个词的时候,我还以为这个数字会增加,
(你好,2)
但那不会发生。我到底理解错了什么?这和我对工作的看法有关,还是工作的目的有关?
有人能给我一些见解吗?
谢谢谢比尔
1条答案
按热度按时间l2osamch1#
在读了几遍代码之后,我设法找出了为什么我总是把每个单词的计数设为1,而不是合计总数。
在下面的行中:
我将流中连续读取的间隔设置为2秒。我意识到,在producer端的这个间隔(2秒)内,我没有生成足够的同一字符串内容来获得聚合结果。
但是,当我将这个间隔增加到10000毫秒(10秒)时,我就可以从Kafka生成器生成多行数据。这些行由作业适当地处理,类似的字符串计数在特定的时间间隔内很好地聚合。
(你好,4)
(世界,6)
非常感谢谢比尔