我在用ApacheBeam和directrunner。我有5个dofns包裹与pardos是一个接一个应用。当管道运行()时,第一个dofn处理它的所有工作,然后是第二个,然后是第三个。我希望第二个dofn在第一个dofn发出输出后立即开始工作,这样处理将是并行的,因为目前最多有一个dofn在任何给定时间工作(相关:apachebeam中dofn的线程同步)。
val pipelineOptions = PipelineOptionsFactory
.fromArgs("--streaming", "--experiments=use_runner_v2")
.withValidation()
.create()
.`as`(DirectOptions::class.java)
pipelineOptions.isBlockOnRun = true
pipelineOptions.isEnforceEncodability = true
pipelineOptions.isEnforceImmutability = true
pipelineOptions.targetParallelism = Runtime.getRuntime().availableProcessors() * 2
pipelineOptions.appName = name
val pipeline = Pipeline.create(pipelineOptions)
pipeline
.apply(...)
.apply(...)
.apply(...)
.apply(...)
.apply(...)
.apply(...)
pipeline
.run()
.waitUntilFinish()
暂无答案!
目前还没有任何答案,快来回答吧!