spark结构化流媒体-

zqdjd7g9  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(531)

我正在尝试从intellij idea运行以下代码,以便将kafka的消息打印到console。但它抛出以下错误-

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

stacktrace开始于 Dataset.checkpoint 再往上走。如果我移除 .checkpoint() 然后我得到了一些与权限相关的错误

17/08/02 12:10:52 ERROR StreamMetadata: Error writing stream metadata StreamMetadata(4e612f22-efff-4c9a-a47a-a36eb533e9d6) to C:/Users/rp/AppData/Local/Temp/temporary-2f570b97-ad16-4f00-8356-d43ccb7660db/metadata
java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\rp\AppData\Local\Temp\temporary-2f570b97-ad16-4f00-8356-d43ccb7660db\metadata

资料来源:

def main(args : Array[String]) = {
 val spark = SparkSession.builder().appName("SparkStreaming").master("local[*]").getOrCreate()
  val canonicalSchema = new StructType()
                          .add("cid",StringType)
                          .add("uid",StringType)
                          .add("sourceSystem",
                              new StructType().add("id",StringType)
                                              .add("name",StringType))
                          .add("name", new StructType()
                                        .add("firstname",StringType)
                                        .add("lastname",StringType))

val messages = spark
                    .readStream
                    .format("kafka")
                    .option("kafka.bootstrap.servers","localhost:9092")
                    .option("subscribe","c_canonical")
                    .option("startingOffset","earliest")
                    .load()
                    .checkpoint()
.select(from_json(col("value").cast("string"),canonicalSchema))
.writeStream.outputMode("append").format("console").start.awaitTermination

 }

有人能帮我理解我做错了什么吗?

chy5wohz

chy5wohz1#

结构化流媒体不支持 Dataset.checkpoint() . 有一个开放的票证可以提供更好的信息,或者干脆忽略它:https://issues.apache.org/jira/browse/spark-20927
ioexception可能是因为您没有在windows上安装cygwin。

相关问题