多次调用kafka streams streambuilder.build()可以吗

mznpcxlj  于 2021-07-07  发布在  Java
关注(0)|答案(1)|浏览(278)

我们使用的是micronaut/kafka流。使用此框架来创建streams应用程序,您可以构建如下内容:

@Factory
public class FooTopologyConfig {
  @Singleton
  @Named
  public KStream<String, FooPojo> configureTopology {
    return builder.stream("foo-topic-in")
           .peek((k,v) -> System.out.println(String.format("key %s, value: %s", k,v))
           .to("foo-topic-out");
  }
}

这是:
收到 ConfiguredStreamBuilder (包裹得很轻 StreamsBuilder )
构建并返回流(我们实际上不确定返回流有多重要,但这是另一个问题)。 ConfiguredStreamBuilder::build() (在 StreamsBuilder )稍后由框架调用并返回 Topology 不能通过micronaut进行注射。
我们想要 Topology bean来记录拓扑的描述(通过 Topology::describe ).
这样做安全吗?
呼叫 ConfiguredStreamBuilder::build (因此 StreamsBuilder::build )并使用 Topology 打印可读的描述。
允许框架调用 ConfiguredStreamBuilder::build 然后,使用返回拓扑的第二个示例来构建应用程序。

ej83mcc0

ej83mcc01#

打电话应该没问题 build() 多次。这在流的内部代码以及测试中都很常见。
回答你的另一个问题。你只需要从 builder.stream() 如果以后要在拓扑的该分支上展开,请执行以下操作。

相关问题