scala fs2:流启动后如何执行操作(“doOnSubscribe”)?

sxissh06  于 5个月前  发布在  Scala
关注(0)|答案(1)|浏览(75)

我尝试在猫效果IO-App的上下文中使用不纯(“java”)API。不纯API看起来有点像这样:

import  io.reactivex.Flowable
import java.util.concurrent.CompletableFuture

trait ImpureProducer[A] {
  /** the produced output - must be subscribed to before calling startProducing() */
  def output: Flowable[A]

  /** Makes the producer start publishing its output to any subscribers of `output`. */
  def startProducing(): CompletableFuture[Unit]
}

字符串
(Of当然,还有更多的方法,包括stopProducing(),但这些与我的问题无关。
我的(也许是天真的)适应API的方法如下所示(利用Flowableorg.reactivestreams.Publisher的事实):

import cats.effect.IO
import fs2.Stream
import fs2.interop.reactivestreams.*

class AdaptedProducer[A](private val underlying: ImpureProducer[A]) {
  def output: Stream[IO, A] =
    underlying.output.toStreamBuffered(1)
  def startProducing: IO[Unit] = 
    IO.fromCompletableFuture(IO(underlying.startProducing()))
}


我的问题是:如何确保在评估startProducing之前订阅了output-流?
例如,我如何修复以下尝试获取生成的第一个项目的IO:

import cats.Parallel
import cats.effect.IO

def firstOutput[A](producer: AdaptedProducer[A]): IO[A] = {
  val firstOut: IO[A] = producer.output.take(1).compile.onlyOrError
  // this introduces a race condition: it is not ensured that the output-stream
  // will already be subscribed to when startProducing is evaluated.
  Parallel[IO].parProductL(firstOut)(producer.startProducing)
}

tyky79it

tyky79it1#

这可以使用新的Javaflow互操作来完成。
请注意,reactive-streamsintoop * 在添加flow后 * 已被弃用 *。API经过重新设计以处理更多情况 (如本例),并且从头开始重新实现以提高效率。
如果您正在使用的API没有基于flow的版本,则可以使用FlowAdapters Package 它。
代码如下所示:

import org.reactivestreams.FlowAdapters

final class AdaptedProducer[A](underlying: ImpureProducer[A], chunkSize: Int) {
  val run: Stream[IO, A] =
    fs2.interop.flow.fromPublisher(chunkSize) { subscriber =>
       IO(
         underlying.output.subscribe(
            FlowAdapters.toFlowSubscriber(
             subscriber
            )
         )
       ) >>
       IO.fromCompletableFuture(IO(underlying.startProducing()))
    }    
}

字符串
这确保了只有当您实际开始使用Stream并在调用subscribe之后,才调用startProducing
希望能帮到你:D

相关问题