kafka streams应用程序不能在一台机器上扩展更多cpu内核和线程

brjng4g3  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(142)

我正在编写一个kafka流应用程序,它基本上从avro记录中提取两种类型的密钥,并在指定的窗口中对它们进行计数。它应该每秒处理约6k个事件。
我面临的问题是:
1 c4.8xlarge 示例 num.stream.threads = 20 线程(输入主题的分区数)每秒仅消耗约2.5k个事件
与相同的示例 num.stream.threads = 10 线程以相同的速率使用事件
4 c4.2xlarge 示例 num.stream.threads = 5 每秒最多消耗10-25k个事件
我从未见过任何一个内核的cpu利用率高于70%。网络信息技术也未得到充分利用。
这是我的配置:

kafka.streaming {  
  compression.type = "lz4"
  acks = 1
  retries = 1

  // I care about throughput more than about latency
  max.poll.records = 6000
  fetch.min.bytes = 3300000 // 6000 * 550 (average record size)
  fetch.max.wait.ms = 1000 // we get 6000 records in 1 second
  batch.size = 165000 // (6000 / 20) * 550
  linger.ms = 1000
}

代理版本:0.10.2.1
Kafka流版本:1.1.1
这似乎令人惊讶,因为我认为只要有足够的分区,我就可以线性地扩展kafka处理,不管用户位于何处、在一台机器上还是在多台机器上。
许多ec2示例可以解决可伸缩性问题,但我希望在单个示例上运行我的应用程序,因为聚合必须通过交互式查询公开,而且我不想开发rpc层。
upd:流定义

signalStream
  .map[EventDetailsGroup, java.lang.Short]((_, v) => new KeyValue(extractEventDetailsGroup(v), Short.box(1)))
  .groupByKey(Serialized.`with`(eventDetailsSerde, Serdes.Short()))   
  .windowedBy(TimeWindows.of(30 * 60 * 1000).advanceBy(60 * 1000))   
  .count(Materialized.as("store-name").withCachingDisabled().withLoggingDisabled())

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题