How can I get the offset reader in a SourceConnector rather than in the source task?

aelbi1ox  posted on 2021-06-08  in  Kafka

I need to use the offset reader inside the SourceConnector.taskConfigs() method. Is there a way to do this?

46scxncf  #1

There is a ticket and a PR for this:
https://issues.apache.org/jira/browse/kafka-4794
For now, I managed to get the OffsetStorageReader using reflection (jOOR):

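import static org.joor.Reflect.on;

import org.apache.kafka.connect.storage.OffsetStorageReader;

// Workaround: walks private fields of the Connect runtime via jOOR reflection.
// These field names are internal details and may differ between Kafka versions.
private OffsetStorageReader getOffsetStorageReader() {

    try {
        // The connector's context is an inner class; "this$0" is its enclosing instance.
        Object innerContext = on(context).get("this$0");
        Object ctx = on(innerContext).get("ctx");

        // From the herder and worker, collect what the reader's constructor needs.
        Object herder = on(ctx).get("herder");
        String connectorName = on(ctx).get("connectorName");
        Object worker = on(herder).get("worker");
        Object internalKeyConverter = on(worker).get("internalKeyConverter");
        Object internalValueConverter = on(worker).get("internalValueConverter");
        Object offsetBackingStore = on(worker).get("offsetBackingStore");

        // Instantiate the runtime's reader implementation reflectively.
        return (OffsetStorageReader)
            on(Class.forName("org.apache.kafka.connect.storage.OffsetStorageReaderImpl"))
                .create(offsetBackingStore, connectorName, internalKeyConverter, internalValueConverter)
                .get();

    } catch (Exception e) {
        throw new RuntimeException("Could not obtain OffsetStorageReader via reflection", e);
    }
}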
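
Once a reader is available, the lookup inside taskConfigs() could look roughly like the sketch below. It is only an illustration under stated assumptions: OffsetLookup is a hypothetical stand-in that mirrors the shape of OffsetStorageReader.offset(partition) so the example runs without Kafka on the classpath, and the "filename", "position" and "start.position" keys as well as the buildTaskConfigs name are made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigSketch {

    /** Hypothetical stand-in mirroring the shape of OffsetStorageReader.offset(partition). */
    public interface OffsetLookup {
        Map<String, Object> offset(Map<String, String> partition);
    }

    /**
     * Builds one task config per file and seeds it with the last stored position.
     * Assumes the lookup returns null when nothing has been stored for a partition.
     */
    public static List<Map<String, String>> buildTaskConfigs(OffsetLookup reader, List<String> files) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (String file : files) {
            // The source partition identifies which stream of data the offset belongs to.
            Map<String, String> partition = Collections.singletonMap("filename", file);
            Map<String, Object> stored = reader.offset(partition);
            Object position = (stored == null) ? null : stored.get("position");

            Map<String, String> config = new HashMap<>();
            config.put("filename", file);
            // Fall back to 0 when no offset has been committed yet.
            config.put("start.position", position == null ? "0" : String.valueOf(position));
            configs.add(config);
        }
        return configs;
    }
}
```

In a real connector, the reader returned by getOffsetStorageReader() above would take the place of the OffsetLookup argument, and the partition and offset keys would have to match whatever the tasks write in their source records.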
