pyspark结构化流数据写入cassandra而不是填充数据

suzh9iv8  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(267)

我想把spark结构化流数据写入cassandra。我的spark版本是2.4.0。
我来自kafka的输入源带有json,因此在写入控制台时,这是可以的,但是当我在cqlsh cassandra中查询时,没有记录附加到表中。你能告诉我怎么了吗?

schema = StructType() \
            .add("humidity", IntegerType(), True) \
            .add("time", TimestampType(), True) \
            .add("temperature", IntegerType(), True) \
            .add("ph", IntegerType(), True) \
            .add("sensor", StringType(), True) \
            .add("id", StringType(), True)

def writeToCassandra(writeDF, epochId):
    writeDF.write \
        .format("org.apache.spark.sql.cassandra") \
        .mode('append') \
        .options("spark.cassandra.connection.host", "cassnode1, cassnode2") \
        .options(table="sensor", keyspace="sensordb") \
        .save()

# Load json format to dataframe

df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "kafkanode") \
      .option("subscribe", "iot-data-sensor") \
      .load() \
      .select([
            get_json_object(col("value").cast("string"), "$.{}".format(c)).alias(c)
            for c in ["humidity", "time", "temperature", "ph", "sensor", "id"]])

df.writeStream \
    .foreachBatch(writeToCassandra) \
    .outputMode("update") \
    .start()
oknwwptz

oknwwptz1#

我在Pypark也有同样的问题。尝试以下步骤
首先,验证它是否连接到cassandra。您可以指向一个不可用的表,查看它是否因为“找不到表”而失败
尝试如下所示的writestream(在调用cassandra update之前包括触发器和输出模式) df.writeStream \ .trigger(processingTime="10 seconds") \ .outputMode("update") \ .foreachBatch(writeToCassandra) \

相关问题