Influxdb Alpakka连接器未写入数据库

ocebsuys  于 2022-11-23  发布在  InfluxDB
关注(0)|答案(1)|浏览(173)

我正在尝试写入Influxdb(在2.0版本的docker容器中运行)。我使用Scala和Reactive Streams。因此使用Alpakka连接器(https://doc.akka.io/docs/alpakka/current/influxdb.html),因为Scala Reactive Client(https://github.com/influxdata/influxdb-client-java/tree/master/client-scala)不支持写入数据库。
无论我如何尝试写入数据库,数据都不会写入其中。

Source
    .tick(1.seconds, 1.seconds,
      Seq(
        InfluxDbWriteMessage.create(
          Point
            .measurement("cpu_load_short")
            .addField("host", "server01")
            .addField("value", 0.64)
            .tag("region", "us-west")
            .time(DateTime.now.getMillis, java.util.concurrent.TimeUnit.MILLISECONDS)
            .build,
        ).withDatabaseName("database"),
      )
    )
    .toMat(
      InfluxDbSink.create()(
        InfluxDBFactory.connect("http://localhost:9091", "admin", "admin123")
      )
    )(Keep.right)
    .run.andThen { case Success(posts) => print("done") }

此外,“完成”是从来没有打印,所以我假设未来是永远不会完成的,因此某处是一个问题。
唯一被打印出来的是

Pong{version=2.1.1, responseTime=68}

我遗漏了什么,以至于无法编写。是因为Alpakka连接器是为InfluxDB之前的版本2编写的,因此无法工作吗?

js5cn81o

js5cn81o1#

虽然我自己没有尝试过,但官方的InfluxDB Alpakka connector可能无法将记录写入InfluxDB 2.x,所以我猜您的观察是正确的。
对我有用的是:
最新的 influxdb 2.x Docker映像和sbt导入:

"org.influxdb" % "influxdb-java" % "2.22",
  "com.influxdb" %% "influxdb-client-scala" % "4.3.0",
  "com.influxdb" % "flux-dsl" % "4.3.0",

对于从influxdb-client-java编写同步Java API WriteApiBlocking,由于此discussion about write performance,类似于:

public CompletionStage<Done> writeTestPoints(int nPoints, String sensorID) {
    List<Integer> range = IntStream.rangeClosed(1, nPoints).boxed().collect(Collectors.toList());
    Source<Integer, NotUsed> source = Source.from(range);
    CompletionStage<Done> done = source
            .groupedWithin(10, Duration.ofMillis(100))
            .mapAsyncUnordered(10, each -> this.eventHandlerPointBatch(each, influxDBClient.getWriteApiBlocking(), nPoints, sensorID))
            .runWith(Sink.ignore(), system);
    return done;
}

private CompletionStage<Done> eventHandlerPointBatch(List<Integer> hrs, WriteApiBlocking writeApi, int nPoints, String sensorID) {
    LOGGER.info("Writing points: {}-{} ", sensorID, hrs);
    List<Point> points = hrs.stream().map(each -> createPoint(nPoints, sensorID, System.nanoTime(), each)).collect(Collectors.toList());
    writeApi.writePoints(points);
    return CompletableFuture.completedFuture(Done.done());
}

完整示例:InfluxdbWriter
用于阅读您提到的“ScalaReact式客户端”influxdb-client-scala
完整示例:InfluxdbReader
集成测试InfluxdbIT通过测试容器引导InfluxDB 2.x Docker映像,并运行上述类。
希望这对你有帮助
保禄

相关问题