scala—如何将http管理到akka流中并将消息发送给kafka?

nlejzf6q  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(324)

我从akka streams开始,我想构建一个服务器作为stream来接收 Http.IncomingConnection 并将收到的信息作为普通接收器发送给Kafka。
我声明我的来源如下:

val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http().bind(interface = "localhost", port = "8080")

然后,我想从httprequest的主体中提取消息(字符串),最后将字符串发送给kafka。流程如下所示:

val bindingFuture: Future[Http.ServerBinding] = serverSource
     .map(???) //Here, I need to extract the message
     .map(message => new ProducerRecord[String, String](topic, message.result(2 seconds)))
     .runWith(akka.kafka.scaladsl.Producer.plainSink(producerSettings))

但是,我不知道如何提取信息。我想这样做:

val requestHandler: HttpRequest => HttpResponse = {
    case HttpRequest(POST, Uri.Path("/publish"), _, _, _) => {
      HttpResponse(202, entity = "Message sent to Kafka!")
    }
    case r: HttpRequest =>
      r.discardEntityBytes() // important to drain incoming HTTP Entity stream
      HttpResponse(404, entity = "Unknown resource!")
  }

但是,使用 connection handleWithSyncHandler requestHandler 我无法获取流进程所遵循的消息。同时,我也希望能收到你的任何请求 /publish uri,或者在流内部返回404。
有可能这样做吗?

4bbkushb

4bbkushb1#

改用指令
路由dsl将比试图处理数据更容易使用 HttpRequest 手工操作:

val route : Route = 
  post {
    path("publish") {
      extractRequestEntity { entity =>
        onComplete(entity.toStrict(10.seconds).map(_.data.utf8String){ message =>
          Producer.plainSink(producerSettings)(
            new ProducerRecord[String, String](topic, message.result(2 seconds))
          )
          complete(StatusCodes.OK)
        } 
      }
    }
  }

现在可以将其传入以处理传入的请求:

Http().bindAndHandle(
  route,
  "localhost",
  8080
)

相关问题