kafka连接转换器vs转换

yacmzcpb  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(455)

我正在尝试创建以下工作流
nginx日志由kafka连接器获取并上传到主题
hdfs同步连接器然后将这些日志放入hdfs中
hive用于分析hdfs数据(例如,按ip地址分组的访问数等)
虽然我可以按照配置单元元存储所需的格式(仅限空格或逗号分隔的必填字段)排列nginx日志,但我想知道是否可以在不接触nginx日志格式的情况下通过
使用类似于org.apache.kafka.connect.json.jsonconverter的转换器
使用单个消息转换
这两种方法都需要一个定制的实现,关于如何实现这一点的文档很少。
哪一种方法是实现这一目标的正确方法?在使用kafka connect将nginx日志输出/任何源数据写入主题时,是否有任何示例可用于解析nginx日志输出/任何源数据。我使用的是独立文件连接器。

8nuwlpux

8nuwlpux1#

kafka connect源连接器负责将消息从源中的表示(例如nginx日志消息格式)转换为内存中的表示调用 SourceRecord 使用Kafka的连接 Struct 以及 Schema 数据结构。然后,kafka connect使用它的转换器将内存中的记录表示形式转换为 byte[] 写给Kafka的陈述。
这种职责分离非常重要,因为它允许您混合和匹配功能。写入主题的序列化消息的确切性质可以独立于连接器进行更改。例如,一些开发人员更喜欢使用json编写数据。许多其他人更喜欢使用公共模式注册表用avro序列化消息,两者的结合可以确保所有消息都使用一个特定的模式,同时让该模式以向后兼容的方式随着时间的推移而发展,这样生产者就可以发展到该模式的新版本,而消费者就可以在稍后的某个时间点适应该模式。使用avro模式和模式注册提供了巨大的好处。
底线:不要创建了解上游数据源的自定义转换器。你会把自己困在太多的耦合里,因为 byte[] 表示将是定制的,并且仅由也知道此特定表示的消费者和应用程序使用。
相反,如果需要稍微调整内存中的消息结构,请使用现有的源连接器和单个消息转换。在这种情况下,甚至最好创建一个自定义源连接器(可能专门化现有的基于文件的源连接器),将nginx日志消息格式调整为结构化内存表示形式。

相关问题