JSON data

xe55xuns posted on 2021-06-21 in Flink

I am trying to fetch data from Postgres with Flink. The code is as follows:

dbData = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername(Utils.properties_fetch("drivername"))
    .setDBUrl(Utils.properties_fetch("dbURL"))
    .setUsername(Utils.properties_fetch("username"))
    .setPassword(Utils.properties_fetch("password"))
    .setQuery(sourcequery)
    .setRowTypeInfo(new RowTypeInfo(
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO, // the jsonb column
        BasicTypeInfo.DATE_TYPE_INFO,
        BasicTypeInfo.DATE_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO))
    .finish());

The third BasicTypeInfo.STRING_TYPE_INFO corresponds to a column of type jsonb in Postgres.
I get the following error:

06/28/2018 14:02:09 Job execution switched to status FAILING.
java.lang.ClassCastException: org.postgresql.util.PGobject cannot be cast to java.lang.String
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:93)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:114)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:168)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

vecaoik1 #1

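One of the fields returned by the query appears to be a PGobject, where Flink expects a String.
You can change BasicTypeInfo.STRING_TYPE_INFO for this field to TypeInformation.of(PGobject.class). Later you can add a map function that calls PGobject#getValue() to get the underlying String value of this field.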
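
The per-record logic of such a map function can be sketched without Flink or the Postgres driver on the classpath. This is only an illustration under stated assumptions: `FakePgObject` is a hypothetical stand-in for `org.postgresql.util.PGobject` (it mirrors only its `getType()` and `getValue()` accessors), a plain `Object[]` stands in for a Flink `Row`, and `unwrapField` is a made-up helper name. In a real job the same body would presumably live inside a `MapFunction<Row, Row>` applied after the input format, with the jsonb field declared as `TypeInformation.of(PGobject.class)`.

```java
import java.util.Arrays;

// Stdlib-only sketch. FakePgObject is a hypothetical stand-in for
// org.postgresql.util.PGobject so the example compiles without the driver.
class JsonbFieldSketch {

    /** Hypothetical stand-in mirroring PGobject's getType()/getValue() accessors. */
    static final class FakePgObject {
        private final String type;
        private final String value;

        FakePgObject(String type, String value) {
            this.type = type;
            this.value = value;
        }

        String getType() { return type; }
        String getValue() { return value; }
    }

    /**
     * Returns a copy of the row in which the field at the given index is
     * replaced by its String value if it is a FakePgObject. This is the
     * work a map function would do for each record.
     */
    static Object[] unwrapField(Object[] row, int index) {
        Object[] copy = Arrays.copyOf(row, row.length);
        if (copy[index] instanceof FakePgObject) {
            copy[index] = ((FakePgObject) copy[index]).getValue();
        }
        return copy;
    }

    public static void main(String[] args) {
        // The third field (index 2) plays the role of the jsonb column.
        Object[] row = {"id-1", "name", new FakePgObject("jsonb", "{\"k\": 1}")};

        // Declaring the field as a String leads to a blind cast like this one.
        try {
            String s = (String) row[2];
            System.out.println("unexpected: " + s);
        } catch (ClassCastException e) {
            System.out.println("cast failed, as in the stack trace");
        }

        // After unwrapping, the field is a plain String.
        Object[] fixed = unwrapField(row, 2);
        String json = (String) fixed[2];
        System.out.println(json);
    }
}
```

The helper copies the row rather than mutating it, so the original record stays untouched. Whether that matters in a real Flink job depends on object-reuse settings, which this sketch does not model.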
