spark结构化流在读取hdfs时不写入数据

ddrv8njm  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(280)

我正在开发一个流脚本,它应该在文件到达hdfs后立即提取文件,聚合它们并将它们写到其他地方。
在这里,我无法让写入工作-它创建了元数据文件夹,但没有实际的写入发生。在10多个文件(结构都相同)中,只有一个是编写的,我不知道为什么
有人能帮我吗?

from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
import argparse, sys
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.functions import udf, input_file_name, lower
from pyspark.streaming import StreamingContext
import sys
reload(sys)

sys.setdefaultencoding('utf-8')

now = datetime.now()

# create a contexit that supports hive

def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session

### START MAIN ###

if __name__ == '__main__':
    spark_session = create_session('streaming_monitor')
    ssc = StreamingContext(spark_session, 1)
    print('start')
    print(datetime.now())

    myschema = StructType([
      StructField('text', StringType())
    ])

    #only files after stream starts
    df = spark_session\
        .readStream\
        .option('newFilesOnly', 'true')\
        .option('header', 'true')\
        .schema(myschema)\
        .text('hdfs://nameservice/user/user1/streamtest/')\
        .withColumn("FileName", input_file_name())

    output = df.createOrReplaceTempView('log')
    #hive_dump = spark_session.sql("select '" + str(now) + "' as timestamp, FileName, did_it_error, solution, text from log")

    output = df\
    .writeStream\
    .format("csv")\
    .queryName('logsmonitor')\
    .option("checkpointLocation", "file:///home/user1/analytics/logs/chkpoint_dir")\
    .start('hdfs://nameservice/user/user1/streamtest/output/')\
    .awaitTermination()
4si2a6ki

4si2a6ki1#

您在这里看到的是,spark streaming读取的文件必须以原子方式放入源文件夹。否则,文件将在创建后立即读取(并且没有任何内容)。spark不会对文件中的更新数据执行操作,而是只查看一次文件。
如果您
停止流媒体作业
删除检查点目录(或将所有输入文件重命名为新的唯一名称)
将所有文件移到源文件夹中
等待移动结束
启动流媒体应用程序
当然,如果您想让此作业连续运行并添加越来越多的文件,这将不是一个解决方案,但真正的秘密在于将文件以原子方式一次放入文件夹中。
我并不完全熟悉hdfs,但通常这种原子性可以通过将数据写入另一个文件夹,然后将其移动到源文件夹中来实现。
以下是有关输入源的文档中的参考:
“文件源-读取作为数据流写入目录的文件。文件将按文件修改时间的顺序进行处理。如果设置了latestfirst,则顺序将颠倒。支持的文件格式有text、csv、json、orc、parquet。请参阅datastreamreader接口的文档以获取更为最新的列表,以及每种文件格式支持的选项。请注意,文件必须以原子方式放置在给定的目录中,在大多数文件系统中,这可以通过文件移动操作实现。”

相关问题