Spark Streaming输出模式仅处理新消息

0g0grzrc  于 6个月前  发布在  Apache
关注(0)|答案(1)|浏览(55)

我使用nats-spark-connector(https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced)连接到NATS Jetstream并使用Spark Java代码消费消息和流程。

private static void sparkNatsTester() {
        SparkSession spark = SparkSession.builder()
                .appName("spark-with-nats")
                .master("local")
//              .config("spark.logConf", "false")
                  .config("spark.jars",
                  "libs/nats-spark-connector-balanced_2.12-1.1.4.jar,"+"libs/jnats-2.17.1.jar"
                  )
//                .config("spark.executor.instances", "2")
//                .config("spark.cores.max", "4")
//                .config("spark.executor.memory", "2g")
                  .getOrCreate();
        System.out.println("sparkSession : "+ spark);
        Dataset<Row> df = spark.readStream()
                .format("nats")
                .option("nats.host", "localhost")
                .option("nats.port", 4222)
                .option("nats.stream.name", "my_stream3")
                .option("nats.stream.subjects", "mysub3")
                // wait 90 seconds for an ack before resending a message
                .option("nats.msg.ack.wait.secs", 90)
                //.option("nats.num.listeners", 2)
                // Each listener will fetch 10 messages at a time
               // .option("nats.msg.fetch.batch.size", 10)
                .load();
        System.out.println("Successfully read nats stream !");
        
        StreamingQuery query;
        try {
            query = df.writeStream()
                      .outputMode("append")
                      .format("console")
                      .option("truncate", false)
                      .start();
            query.awaitTermination();
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }

字符串
根据nats-io jetstream指南(https://docs.nats.io/nats-concepts/jetstream/js_walkthrough),使用以下命令将消息发布到流(主题名称:mysub 3)

nats pub foo --count=1000 --sleep 1s "publication #{{Count}} @ {{TimeStamp}}"


在将消息发布到nats流之后,代码的输出是:

Successfully read nats stream !
Status change nats: connection opened
-------------------------------------------
Batch: 0
-------------------------------------------
+-------+--------+-------+
|subject|dateTime|content|
+-------+--------+-------+
+-------+--------+-------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+--------+-------+
|subject|dateTime|content|
+-------+--------+-------+
+-------+--------+-------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime                   |content                                   |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
+-------+---------------------------+------------------------------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime                   |content                                   |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
+-------+---------------------------+------------------------------------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime                   |content                                   |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
+-------+---------------------------+------------------------------------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime                   |content                                   |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
|mysub3 |10/31/2023 - 18:04:35 +0530|publication #5 @ 2023-10-31T18:04:35+05:30|
+-------+---------------------------+------------------------------------------+

-------------------------------------------
Batch: 6
-------------------------------------------
+-------+---------------------------+------------------------------------------+
|subject|dateTime                   |content                                   |
+-------+---------------------------+------------------------------------------+
|mysub3 |10/31/2023 - 18:04:31 +0530|publication #1 @ 2023-10-31T18:04:31+05:30|
|mysub3 |10/31/2023 - 18:04:32 +0530|publication #2 @ 2023-10-31T18:04:32+05:30|
|mysub3 |10/31/2023 - 18:04:33 +0530|publication #3 @ 2023-10-31T18:04:33+05:30|
|mysub3 |10/31/2023 - 18:04:34 +0530|publication #4 @ 2023-10-31T18:04:34+05:30|
|mysub3 |10/31/2023 - 18:04:35 +0530|publication #5 @ 2023-10-31T18:04:35+05:30|
|mysub3 |10/31/2023 - 18:04:36 +0530|publication #6 @ 2023-10-31T18:04:36+05:30|
|mysub3 |10/31/2023 - 18:04:37 +0530|publication #7 @ 2023-10-31T18:04:37+05:30|
+-------+---------------------------+------------------------------------------+


它继续为每个批次打印相同的消息集。由于我使用outputMode("append"),我希望每个批次只打印新发布的消息。但所有消息,包括在前一批次打印的消息,都包含在后续批次中。我也尝试了outputMode("update")。它给出了与append相同的输出。你能帮助我如何确保每个批次都有新的消息吗?批处理只打印/接收新推送的消息?

wmtdaxz3

wmtdaxz31#

找到了这个场景的解决方案!消息在每一批中都重复,因为在消费它之后没有向NATS确认消息。必须使用持久消费者并将消费者名称作为spark代码中的选项传递。
创建持久消费者的说明:https://docs.nats.io/nats-concepts/jetstream/js_walkthrough#3.-creating-a-consumer
耐用消费品相关信息:https://github.com/nats-io/nats-spark-connector/tree/main/load_balanced

  • ”nats.durable.name“持久订阅允许客户端在创建订阅时为其分配持久名称。这样做会导致NATS流服务器跟踪该clientID +持久名称的最后一条确认消息,以便只有自最后一条确认消息以来的消息才会被传递到客户端。强制配置。*

通过使用键“nats.durable.name"将此消费者名称作为选项传递:

Dataset<Row> df = spark.readStream()
                .format("nats")
                .option("nats.host", "localhost")
                .option("nats.port", 4222)
                .option("nats.stream.name", "my_stream3")
                .option("nats.stream.subjects", "mysub3")
                // wait 90 seconds for an ack before resending a message
                .option("nats.msg.ack.wait.secs", 90)
                //.option("nats.num.listeners", 2)
                // Each listener will fetch 10 messages at a time
               // .option("nats.msg.fetch.batch.size", 10)
                  .option("nats.durable.name", "my_consumer")
                .load();

字符串
这将确保只有最后一条确认消息之后的消息才会被传递到客户端。

相关问题