flink中是否不推荐使用jsondeserializationschema()?

1cklez4t  于 2021-06-26  发布在  Flink
关注(0)|答案(2)|浏览(391)

我是新的flink和做一些非常类似的下面的链接。
在flink 1.2中,无法看到Kafka流下沉时的消息,也无法看到打印消息
我还尝试添加jsondeserializationschema()作为没有键的kafka输入json消息的反序列化程序。
但是我发现jsondeserializationschema()不存在。
如果我做错了什么,请告诉我。

9rygscc1

9rygscc11#

为了解决从kafka读取非键json消息的问题,我使用了case类和json解析器。
下面的代码生成一个case类,并使用playapi解析json字段。

import play.api.libs.json.JsValue

object CustomerModel {

  def readElement(jsonElement: JsValue): Customer = {
    val id = (jsonElement \ "id").get.toString().toInt
    val name = (jsonElement \ "name").get.toString()
    Customer(id,name)
  }
case class Customer(id: Int, name: String)
}

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "xxx.xxx.0.114:9092")
properties.setProperty("group.id", "test-grp")

val consumer = new FlinkKafkaConsumer[String]("customer", new SimpleStringSchema(), properties)
val stream1 = env.addSource(consumer).rebalance

val stream2:DataStream[Customer]= stream1.map( str =>{Try(CustomerModel.readElement(Json.parse(str))).getOrElse(Customer(0,Try(CustomerModel.readElement(Json.parse(str))).toString))
    })

stream2.print("stream2")
env.execute("This is Kafka+Flink")

}

try方法允许您克服在解析数据时抛出的异常,并在其中一个字段中返回异常(如果需要的话),否则它可以只返回带有任何给定或默认字段的case类对象。
代码的示例输出为:

stream2:1> Customer(1,"Thanh")
stream2:1> Customer(5,"Huy")
stream2:3> Customer(0,Failure(com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
 at [Source: ; line: 1, column: 0]))

我不确定这是否是最好的方法,但它是为我工作到现在。

r1zhe5dt

r1zhe5dt2#

JSONDeserializationSchema 在Flink1.8中被删除,之前被弃用过。
建议的方法是编写一个实现 DeserializationSchema<T> . 下面是一个例子,我从flink operationsPlayground复制的:

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/**
 * A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
 *
 */
public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {

    private static final long serialVersionUID = 1L;

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public ClickEvent deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, ClickEvent.class);
    }

    @Override
    public boolean isEndOfStream(ClickEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ClickEvent> getProducedType() {
        return TypeInformation.of(ClickEvent.class);
    }
}

对于一个Kafka制作人来说 KafkaSerializationSchema<T> ,你会在同一个项目中找到这样的例子。

相关问题