如何转换/分叉Kafka流并将其发送到特定主题?

20jt8wwn  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(389)

我正在尝试使用函数“mapvalues”将原始流“textlines”中获得的字符串值转换为jsonobject消息。然后将newstream中的所有内容流到一个名为“testoutput”的主题上。但是每次消息实际通过转换块时,我都会得到一个nullpointerexception,错误只指向kafka流库。不知道发生了什么:((
p、 当我从原始流派生/创建一个新的Kafka流时,新的流是否属于原始生成器?因为我需要生成器来创建kafkastreams对象并开始流式处理,所以我不确定是否需要对新的流进行其他处理,而不仅仅是指定它的去向(“topic”)

//Testing a Kafka Stream Application
public class testStream {

public static void main(String[] args) throws Exception {
    //Configurations
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-teststream");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxx:xxxx");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    //Building Stream
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> textlines = builder.stream("mytest2"); 

    //Printout The Inputs just for testing purposes
    textlines.foreach(new ForeachAction<String, String>(){
        public void apply(String key, String value){
            for(int y=0; y<value.length(); y++){
                System.out.print(value.charAt(y));
            }
            System.out.print("\n");
        }
    });

    //Transform String Records to JSON Objects
    KStream<String, JSONObject> newStream = textlines.mapValues(new ValueMapper<String,JSONObject>(){
        @Override
        public JSONObject apply(String value) {

            JSONObject jsnobj = new JSONObject();

            //If the first 4 letters of the message is "xxxx" then parse it to a 
            //JSON Object, otherwise create a dummy
            if(value.substring(0, 4).equals("xxxx")){               
                jsnobj.put("Header_Title", value.substring(0, 4));
                jsnobj.put("Data_Part", value.substring(4));
            }else{
                jsnobj.put("Header_Title", "Not xxxx");
                jsnobj.put("Data_Part", "None");
            }
            return jsnobj;
        }
    });

    //Specify target
    newStream.to("testoutput");

    //Off you go
    KafkaStreams streams=new KafkaStreams(builder, props);
    streams.start();

  }
}
u4dcyp6a

u4dcyp6a1#

@迈克尔:我根据你的建议修改了密码。谢谢。我的目标是将读取字符串解析为json值。

KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> textLines = builder.stream("input-topic-name");
    // do stuff

    Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
    Serializer<JsonNode> jsonSerializer = new JsonSerializer();

    Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

    KStream<String, JsonNode> newStream = textLines.mapValues(new ValueMapper<String,JsonNode>(){
        @Override
        public JsonNode apply(String value) {

            JSONObject jsnObj = new JSONObject();

            //If the first 4 letters of the message is "xxxx" then parse it to a 
            //JSON Object, otherwise create a dummy
                jsnObj.put("Header_Title", value.toString());

                    ObjectMapper objectMapper = new ObjectMapper();

                    JsonNode json_value = null;
                    try {
                        json_value = objectMapper.readTree(jsnObj.toString());
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }

            return json_value;
        }

    });

    newStream.to(Serdes.String(), jsonSerde, "json-output");
ia2d9nvy

ia2d9nvy2#

据我所知,你的问题是:

newStream.to("testoutput");
``` `newStream` 具有类型 `KStream<String, JSONObject>` .
但是,默认情况下,应用程序配置为使用 `String` serde以序列化/反序列化记录键和记录值:

props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

这意味着,如果在 `to()` 打电话,Kafka将尝试写你的 `newStream` 作为 `KStream<String, String>` (而不是 `KStream<String, JSONObject>` )回到Kafka。
您需要做的是在 `to()` 电话:

// Sth like this
newStream.to(Serdes.String(), myJsonSerde, "testoutput");

不幸的是,kafka还没有包含现成的json serde(这是计划中的)。幸运的是,您可以查看(并复制)kafka自己的kafka streams api演示应用程序中包含的json serde示例:https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview

相关问题