Kafka流消息在聚合后不通过

zbwhf8kr  于 5个月前  发布在  Apache
关注(0)|答案(1)|浏览(76)

我有一个Kafka Streams应用程序,它从一个主题中读取,从记录值中选择key作为非空字段,对所选键执行groupbyKey,然后聚合具有相同键的所有值,字符串初始值为“”(空字符串)。我的想法是收集所有具有相同键的值作为单个大字符串并写入输出主题。(我更改应用程序ID以从最早的输入主题开始消费)最终得到不同大小的输出主题。我的输入中有1.28亿条记录,但输出主题的范围从130万到4500万条记录。是我的物化.with,还是我键入数据的方式?!
有人能帮我理解一下吗?这里是我的代码,我使用一个Kafka Streams绑定器(Spring Cloud Stream Kafka binder)读取所有输入数据,然后做下面的key-ing,groupBy和聚合,然后使用.toStream()并将这些数据放在主题中。

public class PRWSLoader {

    private String initStr = "";

    @StreamListener
    public void process(@Input("prws_input_channel") KStream<String, PRWSREPLICA> prwsKStream
    ) {

        prwsKStream
                .peek(((key, value) -> log.info(">>> input key: {} prwhn: {} VALUE: {} ", key,value.getPRWHn(),value)))
                .selectKey(((key, value) -> String.valueOf(value.getPRWHn()))) //convert Integer to String key
                .groupByKey()
                .aggregate(() -> initStr,
                        (key, value, agg) -> {

                            return agg + "::" + value.getSequence() + "|" + value.getName()+"|"+value.getCae()+"|"+value.getSoc()+
                            "|"+value.getSocietyMech()+"|"+value.getCdCat()+"|"+value.getPRWSn()+"|"+value.getContr()+"|"+
                                value.getPePerf()+"|"+value.getPeMech()+"|"+value.getPeSy()+"|"+value.getCdTer()+"|"
                                    +value.getCdSy()+"|"+value.getCdCon();
                        },
                        Materialized.with(Serdes.String(), Serdes.String()))
                .toStream()
                .peek(((key, value) -> log.info(">>> output key: {} VALUE: {} ", key,value)))
                .to("APRADB_PRWS_MERGED");

    }

}

字符串
有趣的是,我看到在一个健康运行我的输出主题和更新日志主题计数是相似的,但在一个异常运行如下面所附的图片,我看到输出主题有相当少的消息比更新日志。这是一个从AKHQ的视图,我的输出主题,更新日志,重新划分主题。我还看到一个奇怪的事情发生了,我的输出主题开始减少,因为某种原因,就像它达到了250万,并开始减少。我所有的主题都有无限的保留,我将retention.ms设置为-1用于输出和更新日志主题。我们运行在AWS MSK Kafka上,有3个代理,所有主题都有3个分区和2个同步副本。任何帮助都将非常感谢。TIA
x1c 0d1x的数据
这里是我的应用程序配置应用程序。yaml,我应该在这里设置的东西可能是我错过了?

spring:
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: https://kafka.internal.org:8081
      bindings:
        prws_input_channel:
          destination: APRADB_VW_PRWSREPLICA

      kafka:
        streams:
          binder:
            applicationId: ${APP_ID}
            brokers: ${CONNECT_BOOTSTRAP_SERVERS}
            configuration:
              max.request.size: 5242880
              schema.registry.url: ${SCHEMA_REGISTRY_URL}
              security:
                protocol: ${CONNECT_SECURITY_PROTOCOL}
              sasl:
                mechanism: ${CONNECT_SASL_MECHANISM}
                jaas:
                  config: ${CONNECT_SASL_JAAS_CONFIG}
              commit.interval.ms: ${COMMIT_INTERVAL_MS}
              state.dir: efs/${APP_NAME}-state-store
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
          bindings:
            prws_input_channel:
              consumer:
                keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
                startOffset: earliest

相关问题