演员的Spark流

nhhxz33t  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(271)

我想有一个消费者演员订阅Kafka主题和流数据与Spark流消费者以外的进一步处理。为什么是演员?因为我读到它的主管策略是处理Kafka失败(例如,失败后重新启动)的好方法。
我找到了两个选择:
java KafkaConsumer 类别:its poll() 方法返回 Map[String, Object] . 我想要一个 DStream 被归还就像 KafkaUtils.createDirectStream 会的,我不知道如何从演员的外边获取流。
扩展 ActorHelper 特性和用途 actorStream() 如本例所示。后一个选项不显示到主题的连接,而是显示到套接字的连接。
有人能给我指出正确的方向吗?

dgiusagp

dgiusagp1#

为了处理kafka故障,我使用了apache curator框架和以下解决方法:

val client: CuratorFramework = ... // see docs
val zk: CuratorZookeeperClient = client.getZookeeperClient

/**
  * This method returns false if kafka or zookeeper is down.
  */ 
def isKafkaAvailable:Boolean = 
   Try {
      if (zk.isConnected) {
        val xs = client.getChildren.forPath("/brokers/ids")
        xs.size() > 0
      }
      else false
    }.getOrElse(false)

对于Kafka主题,我使用 com.softwaremill.reactivekafka 图书馆。例如:

class KafkaConsumerActor extends Actor {
   val kafka = new ReactiveKafka()
   val config: ConsumerProperties[Array[Byte], Any] = ... // see docs

   override def preStart(): Unit = {
      super.preStart()

      val publisher = kafka.consume(config)
      Source.fromPublisher(publisher)
            .map(handleKafkaRecord)
            .to(Sink.ignore).run()
   }

   /**
     * This method will be invoked when any kafka records will happen.
     */
   def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]) = {
      // handle record
   }
}

相关问题