databricks:结构化流因timeoutexception而失败

gupuwyp2  于 2021-05-19  发布在  Spark
关注(0)|答案(0)|浏览(472)

我想用kafka源代码在databricks中创建一个结构化流。我按照这里的说明做了。我的脚本似乎开始了,但是在流的第一个元素中失败了。当我使用时,流itsellf工作良好,产生结果并工作(在databricks中) confluent_kafka 因此,我似乎遗漏了另一个问题:
处理初始流后,脚本超时:

java.util.concurrent.TimeoutException: Stream Execution thread for stream [id = 80afdeed-9266-4db4-85fa-66ccf261aee4, 
runId = b564c626-9c74-42a8-8066-f1f16c7ab53d] failed to stop within 36000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.`

我尝试的是:寻找答案,包括 spark.conf.set("spark.sql.streaming.stopTimeout", 36000) 进入我的设置-这没有改变什么。
非常感谢您的任何意见!

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Define a data schema

schema = StructType() \
           .add('PARAMETERS_TEXTVALUES_070_VALUES', StringType())\
           .add('ID', StringType())\
           .add('PARAMETERS_TEXTVALUES_001_VALUES', StringType())\
           .add('TIMESTAMP', TimestampType())

df = spark \
    .readStream \
    .format("kafka") \
    .option("host", "stream.xxx.com") \
    .option("port", 12345)\
    .option('kafka.bootstrap.servers', 'stream.xxx.com:12345') \
    .option('subscribe', 'stream_test.json') \
    .option("startingOffset", "earliest") \
    .load()

df_word = df.select(F.col('key').cast('string'),
                    F.from_json(F.col('value').cast('string'), schema).alias("parsed_value"))

df_word \
      .writeStream \
      .format("parquet") \
      .option("path", "dbfs:/mnt/streamfolder/stream/") \
      .option("checkpointLocation", "dbfs:/mnt/streamfolder/check/") \
      .outputMode("append") \
      .start()

我的流输出数据如下所示:

"PARAMETERS_TEXTVALUES_070_VALUES":'something'
"ID":"47575963333908"
"PARAMETERS_TEXTVALUES_001_VALUES":12345
"TIMESTAMP": "2020-10-22T15:06:42.507+02:00"

此外, stream 以及 check 文件夹中填充了0-b文件,但 metadata ,其中包括ì从上面的错误中删除。
谢谢,注意安全。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题