我尝试在猫效果IO-App的上下文中使用不纯(“java”)API。不纯API看起来有点像这样:
import io.reactivex.Flowable
import java.util.concurrent.CompletableFuture
trait ImpureProducer[A] {
/** the produced output - must be subscribed to before calling startProducing() */
def output: Flowable[A]
/** Makes the producer start publishing its output to any subscribers of `output`. */
def startProducing(): CompletableFuture[Unit]
}
字符串
(Of当然,还有更多的方法,包括stopProducing(),但这些与我的问题无关。
我的(也许是天真的)适应API的方法如下所示(利用Flowable
是org.reactivestreams.Publisher
的事实):
import cats.effect.IO
import fs2.Stream
import fs2.interop.reactivestreams.*
class AdaptedProducer[A](private val underlying: ImpureProducer[A]) {
def output: Stream[IO, A] =
underlying.output.toStreamBuffered(1)
def startProducing: IO[Unit] =
IO.fromCompletableFuture(IO(underlying.startProducing()))
}
型
我的问题是:如何确保在评估startProducing
之前订阅了output
-流?
例如,我如何修复以下尝试获取生成的第一个项目的IO:
import cats.Parallel
import cats.effect.IO
def firstOutput[A](producer: AdaptedProducer[A]): IO[A] = {
val firstOut: IO[A] = producer.output.take(1).compile.onlyOrError
// this introduces a race condition: it is not ensured that the output-stream
// will already be subscribed to when startProducing is evaluated.
Parallel[IO].parProductL(firstOut)(producer.startProducing)
}
型
1条答案
按热度按时间tyky79it1#
这可以使用新的Java
flow
互操作来完成。请注意,reactive-streamsintoop * 在添加
flow
后 * 已被弃用 *。API经过重新设计以处理更多情况 (如本例),并且从头开始重新实现以提高效率。如果您正在使用的API没有基于
flow
的版本,则可以使用FlowAdapters
Package 它。代码如下所示:
字符串
这确保了只有当您实际开始使用
Stream
并在调用subscribe
之后,才调用startProducing
。希望能帮到你:D