java—不允许在ApacheFlink表api中查询pojo数据集的超类型吗

fslejnso  于 2021-06-25  发布在  Flink
关注(0)|答案(1)|浏览(302)

我尝试在Windows10上用ApacheFlink1.3.2和Java1.8.0实现一个日志分析器。
上下文:
日志消息有多种类型。
为每种类型创建pojo。
为每种类型创建pojo类型的数据集示例。
然后使用表api进行查询,如下所示。
这个很好用。

DataSet<String> rawLogs = env.readTextFile(input);// input is the data file path
DataSet<FirstBackupMessage> logMsgPOJODataSet = rawLogs.map(new LogMapFunction());
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); 
Table LogMessageTable = tableEnv.fromDataSet(logMsgPOJODataSet);
Table result = tableEnv .sql("Select taskId from " + LogMessageTable);
tableEnv.toDataSet(result, Row.class).print();

要求:我试图用工厂模型来概括这个实现。为了做到这一点,我尝试将pojo类推广到logmessage接口。在上述情况下:

public class FirstBackupMessage implements LogMessage
similarly 
public class SecondBackupMessage implements LogMessage
public class ThirdBackupMessage implements LogMessage

在mapfunction实现中,我填充了特定的类示例,但map函数的输出被Map到泛型引用,即logmessage,在上述情况下,它将是

DataSet<LogMessage> logMsgPOJODataSet = rawLogs.map(new LogMapFunction());  
//the LogMapFunction.map method is populating FirstBackupMessage

在此之后,如果我尝试查询pojo firstbackupmessage中存在的字段,但现在参考接口(即logmessage),它会抛出异常,表示找不到我要查询的字段。
但是
奇怪的是,如果我使用通用引用(即logmsgpogodataset.print())打印数据集,它会打印特定pojo中的所有字段(在本例中为firstbackupmessage)。
问题:在flink表API中,这种对数据集的泛型引用的转换是不允许/不可用的吗?

hpxqektj

hpxqektj1#

表api/sql库对关系表进行操作。通过呼叫 TableEnvironment.fromDataSet(logMsgPOJODataSet) ,的
DataSet logMsgPOJODataSet 逻辑上转换为表。在这个过程中,新表的模式需要根据
logMsgPOJODataSet DataSet . flink的数据集api使用 TypeInformation 确定 DataSet .
因为
logMsgPOJODataSet DataSetLogMessage ,表api不知道它的任何子类型。因此所有的领域 LogMessage 包含,但没有子类型字段。
在任何情况下,都不可能在同一个表中处理不同类型的行。所有行必须具有相同的架构。处理这种情况的两种方法是:
使架构成为所有子类型的超集,并为不支持的类型提供空值。可以添加另一个表示子类型的字段。
添加泛型 Map<String, String> 保存所有子类型数据的字段。
在这两种情况下,转换都需要使用dataset api完成,例如使用 MapFunction .

相关问题