akka websocket客户端到kafka制作人

clj7thdc  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(249)

我正在连接到一个websocket(股票市场)并在Flume中从中获取交易,我想将这些交易发布给Kafka。我可以发送一些测试消息(字符串)到Kafka,但无法连接到Kafka出版商的实际交易
这里我有actorsystem,actor materializer,system dispatcher,然后是9092(端口)的React式kafka生产者。然后我在控制台上打印交易,这是好的。而不是打印到控制台,我想把这些交易Kafka生产者。

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher

val kafka = new ReactiveKafka()

val producer = ReactiveKafkaProducer[Array[Byte], String](ProducerProperties(
  bootstrapServers = "localhost:9092",
  topic = "binance",
  valueSerializer = new StringSerializer()
))

val flow: Flow[Message, Message, Promise[Option[Message]]] =
  Flow.fromSinkAndSourceMat(
    Sink.foreach(println),
    Source.maybe[Message])(Keep.right)

// Test messages to Kafka Producer is working fine
producer.producer.send(new ProducerRecord("binance","foo"))
producer.producer.send(new ProducerRecord("binance","bar"))

val (upgradeResponse, promise) =
  Http().singleWebSocketRequest(
    WebSocketRequest("wss://stream.binance.com:9443/ws/bnbbtc@trade"),
    flow)
val connected = upgradeResponse.map { upgrade =>
  if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
    Done
  } else {
    throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
  }
}
connected.onComplete(println)

控制台上打印的交易:success(done)textmessage.strict({“e”:“trade”,“e”:1518536267285,“s”:“bnbbtc”,“t”:9161710,“p”:“0.00106130”,“q”:“7.43000000”,“b”:23819006,“a”:23819013,“t”:1518536267283,“m”:true,“m”:true})textmessage.strict({“e”:“trade”,“e”:1518536267920,“s”:“bnbbtc”,“t”:9161711,“p”:“0.00106210”,“q”:“20.00000000”,“b”:23819014,“a”:23819010,“t”:1518536267917,“m”:false,“m”:true})文本信息。严格({“e”:“trade”,“e”:1518536272108,“s”:“bnbbtc”,“t”:9161712,“p”:“0.00106150”,“q”:“47.03000000”,“b”:23819019,“a”:23819020,“t”:1518536272104,“m”:true,“m”:true})文本信息。严格({“e”:“trade”,“e”:1518536276145,“s”:“bnbbtc”,“t”:9161713,“p”:“0.00106180”,“q”:“1.29000000”,“b”:23819028,“a”:23819027,“t”:1518536276142,“m”:假,“m”:真})
也请让我知道如何处理的消息,因为它是json和发送给Kafka生产者

lstz6jyr

lstz6jyr1#

它和这段代码一起工作

val flow: Flow[Message, Message, Promise[Option[Message]]] =
  Flow.fromSinkAndSourceMat(
    Sink.foreach[Message](record=>producer.producer.send(new ProducerRecord[Array[Byte],String]("binance",record.toString()))),
    Source.maybe[Message])(Keep.right)

相关问题