Scala、ZIO、Kafka -如何将Producer层添加到范围

myzjeezk  于 6个月前  发布在  Scala
关注(0)|答案(1)|浏览(82)

我有一个类,它向Kafka生成消息(使用zio-kafka):

case class MyObject(name: String, otherObject: OtherObject)

class MyProducer(config: KafkaConfiguration) {
   val layer: ZLayer[Any, Throwable, Producer] = config.layer() //it creates layer

  def produceEvent(e: MyObject): RIO[Producer, Unit] = {
    val message = new ProducerRecord(topic, e)
Producer.produce(message, Serde.int, Serde.Json).unit
}

}

object MyProducer {
  val layer: ZLayer[SomeConfig, Throwable, MyProducer] = ZLayer(for { ... } yield new MyProducer(config)
}

字符串
现在我想在其他地方使用这个produceEvent方法:

class A(producer: MyProducer) {
  def doSmth(): ZIO[Producer, Throwable, Unit] {

   for {
    // some logic 
    _ <- producer.produceEvent(event)

   } yield ()
   
  } 
}


问题是在运行时我得到一个错误:

Defect in zio.Environment: Could not find Producer inside Environment


我不知道如何向scope提供Producer。我正确地提供了MyProducer,但是我需要返回ZIO[Producer, Throwable, Unit]。也许有其他解决方案可以从环境中“剪切”这个Producer,然后返回Task[Unit]

4uqofj5v

4uqofj5v1#

实际上,将Producer保持在MyProducer本地是有意义的。
您可以立即“提供”:

def produceEvent(e: MyObject): IO[Unit] = {
  val message = new ProducerRecord(topic, e)
  Producer.produce(message, Serde.int, Serde.Json)
    .unit
    .provide(layer)
}

字符串

相关问题