kafka流在分组和聚合时使用ktable转换为字符串问题

kulphzqa  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(443)

我有一个Kafka流与传入的消息,看起来像 sensor_code: x, time: 1526978768, address: Y 我想创建一个ktable,在每个传感器代码中存储每个唯一的地址。
K表

KTable<String, Long> numCount = streams
            .map(kvm1)
            .groupByKey(Serialized.with(stringSerde, stringSerde))
            .count()
            .groupBy(kvm2, Serialized.with(stringSerde, longSerde))
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("StateStore"));

哪里 kvm1 以及 kvm2 是我自己的 KeyValueMappers . 我的想法是用 sensor_code=x, address=y ,执行 groupByKey() 以及 count() . 然后是另一个 groupBy(kvm2, Serialized.with(stringSerde, longSerde)) 哪里 kvm2 修改现有的 key 控制 sensor_code 然后这个值就是它的计数。但既然不起作用,也许我做错了。。。它试图将其转换为long并抛出异常,因为它正在查找字符串。我要伯爵 Long ,对吧?
这是第一个 KeyValueMapper 我使用它各自的帮助功能:

private static String getKeySensorIdAddress(String o) {
    String x = "sensor_id=\"x\", address=\"y\""; 
    try {
        WifiStringEvent event = mapper.readValue(o, WifiStringEvent.class);
        x = x.replace("x", event.getSensor_code());
        x = x.replace("y", event.getAddress());
        return x;
    } catch(Exception ex) {
        System.out.println("Error... " + ex);
        return "Error";
    }
}
        //KeyValueMapper1
KeyValueMapper<String, String, KeyValue<String, String>> kvm1 = 
    new KeyValueMapper<String, String, KeyValue<String, String>>() {
         public KeyValue<String, String> apply(String key, String value) {
             return new KeyValue<>(getKeySensorIdAddress(value), value);
         }
    };

这是第二个 KeyValueMapper 以及它的帮助功能。

private static String getKeySensorId(String o) {
    int a = o.indexOf(",");
    return o.substring(0,a);
}

        //KeyValueMapper2 
    KeyValueMapper<String, Long, KeyValue<String, Long>> kvm2 = 
    new KeyValueMapper<String, Long, KeyValue<String, Long>>() {
         public KeyValue<String, Long> apply(String key, Long value) {
             return new KeyValue<>(getKeySensorId(key), value);
         }
    };

下面是我尝试运行代码时返回的异常和错误。
[2018-05-29 15:28:40,119]错误流线程[testuniqueaddresses-ed48daf8-fff0-42e4-bb5a-687584734b45-streamthread-1]由于以下错误无法处理流任务2\u 0:(org.apache.kafka.streams.processor.internals.assignedstr)eamstasks:105)java.lang.classcastexception:java.lang.long不能在org.apache.kafka.common.serialization.stringserializer.serialize(stringserializer。java:28)在org.apache.kafka.streams.state.stateserdes.rawvalue(stateserdes。java:178)位于org.apache.kafka.streams.state.internals.meteredkeyvaluebytesstore$1.innervalue(meteredkeyvaluebytesstore)。java:66)在org.apache.kafka.streams.state.internals.meteredkeyvaluebytesstore$1.innervalue(meteredkeyvaluebytesstore)。java:57)位于org.apache.kafka.streams.state.internals.innermeteredkeyvaluestore.put(innermeteredkeyvaluestore。java:198)在org.apache.kafka.streams.state.internals.meteredkeyvaluebytesstore.put(meteredkeyvaluebytesstore。java:117)在org.apache.kafka.streams.kstream.internals.ktableaggregate$ktableaggregateprocessor.process(ktableaggregate)。java:95)在org.apache.kafka.streams.kstream.internals.ktableaggregate$ktableaggregateprocessor.process(ktableaggregate)中。java:56)
注意 java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String 错误。
你知道我为什么会出现这个错误,以及如何修复它,或者建议我如何编辑代码以达到我前面提到的预期输出吗?
非常感谢!
编辑:由于我放弃了其中一种方法,所以对我的问题进行了大的修改。

mu0hgdu0

mu0hgdu01#

在第一种情况下,如果要使用hashmap作为值类型,则需要为其定义一个自定义serde,并使用materialized.withvalueserde传递它。
在第二种情况下,我不能不看到keyvaluemappers的返回类型和确切的错误消息:它是试图将字符串转换为long还是相反?
编辑:谢谢分享额外的信息。
我认为在第二种情况下,还需要在第二个count操作中指定serde值。kgroupedstream上的count()和kgroupedtable之间似乎存在不一致,前者会自动将值serde设置为lonserde:
https://github.com/apache/kafka/blob/1.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/kgroupedstreamimpl.java#l281-l283型
但是kgroupedtable没有:
https://github.com/apache/kafka/blob/1.1/streams/src/main/java/org/apache/kafka/streams/kstream/internals/kgroupedtableimpl.java#l253
似乎已经固定在后备箱上,但尚未释放:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/kgroupedtableimpl.java#l158-160磅

相关问题