org.apache.kafka.connect.errors.dataexception:属性不是有效的字段名

x7yiwoj4  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(390)

如果有任何引用项目或代码使用开放源码将kafka消息发送到jms队列,这将非常有用。我试着在junit中调用下面的代码,

private TextMessage getTextMessage(SinkRecord sinkRecord) {
    try {
        System.out.println("Processing Record: key={} value={}"+sinkRecord.key()+""+sinkRecord.value());
        final String payload = sinkRecord.value().toString();
        final TextMessage textMessage = webLogicJmsSession.session().createTextMessage(payload);
        textMessage.setStringProperty("KafkaTopic", sinkRecord.topic());
        textMessage.setIntProperty("KafkaPartition", sinkRecord.kafkaPartition());
        textMessage.setLongProperty("KafkaOffset", sinkRecord.kafkaOffset());
        Object key = sinkRecord.key();
        try {
            Schema keySchema = sinkRecord.keySchema();
            Struct keyStruct = (Struct) key;
            //Struct jmsStruct = keyStruct.getStruct("jms");
            Struct jmsProperties = keyStruct.getStruct("properties");
            keySchema
                .field("properties")
                .schema()
                .fields()
                .forEach(field -> {
                    String value = jmsProperties.getString(field.name());
                    try {
                        textMessage.setStringProperty(field.name(), value);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                });
            Struct jmsHeaders = keyStruct.getStruct("headers");
            textMessage.setJMSCorrelationID(jmsHeaders.getString("JMSCorrelationID"));
            final String jmsReplyTo = jmsHeaders.getString("JMSReplyTo");
            if (jmsReplyTo != null)
                textMessage.setJMSReplyTo(webLogicJmsSession.session().createQueue(jmsReplyTo));
            textMessage.setJMSCorrelationID(jmsHeaders.getString("JMSCorrelationID"));
            textMessage.setJMSType(jmsHeaders.getString("JMSType"));

        } catch (Exception e) {
            e.printStackTrace();
        }
        return textMessage;
    } catch (JMSException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }
}

下面是junit代码

Schema dowSchema = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
              Schema airingToItemSchema = SchemaBuilder.struct()
                      .field("dow", dowSchema)
                      .build();
              Schema airingToSchema = SchemaBuilder.array(airingToItemSchema).build();
              Schema rootSchema = SchemaBuilder.struct()
                      .field("airingTo", airingToSchema).build();

              Struct item = new Struct(airingToItemSchema)
              .put("dow", Collections.singletonList("SATURDAY"));
              Struct rootStruct = new Struct(rootSchema)
              .put("airingTo", Collections.singletonList(item));

              SinkRecord sinkRecord4 =  new SinkRecord("test",0,dowSchema,rootStruct,schema,struct,1234L);
              webLogicJmsSinkTask.put(Arrays.asList(sinkRecord4));

但它会抛出错误,
处理记录:key={}value={}struct{airingto=[struct{dow=[saturday]}}struct{first\u name=,last\u name=x,电子邮件地址=}org.apache.kafka.connect.errors.dataexception:属性不是有效的字段名

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题