apachebeam管道自动删除它生成的输出文件

4sup72z8  于 2021-05-19  发布在  Spark
关注(0)|答案(0)|浏览(393)

长话短说:当使用beam管道简单地读取tf记录文件并将其写入另一个文件时,它可以工作。但是,当使用它读取匹配多个tf记录文件的输入文件模式并将内容写入单个输出文件时,它会自动删除输出文件,并在运行结束时发出警告。我是新来的Apacheè束和Spark,所以任何帮助是感激的。
具体情况如下
我试着在spark上运行一个简单的beam脚本。spark设置如下:

./sbin/start-master.sh -h localhost -p 1313
./sbin/start-slave.sh spark://localhost:1313 -c 1 -m 128G

我设立了一个就业服务如下(以下https://beam.apache.org/documentation/runners/spark/):

docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:1313

beam python脚本如下(它只读取一个或多个tf记录并将其写入输出文件):

import apache_beam as beam
from apache_beam import coders
from apache_beam.options.pipeline_options import PipelineOptions
import sys

input_file = sys.argv[1]
output_file = sys.argv[2]

options = PipelineOptions(
[
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK",
]
)

with beam.Pipeline(options=options) as pipeline:
    filepattern = input_file
    readers = []
    readers.append(pipeline
                   | 'ReadTFRecordFiles_{}[{}]'.format(0, filepattern) >> beam
                   .io.ReadFromTFRecord(filepattern, coder=coders.BytesCoder()))
    input_data = readers | 'Flatten' >> beam.Flatten()
    _ = input_data | beam.io.WriteToTFRecord(
        file_path_prefix=output_file,
        file_name_suffix='.tfrecord.gz',
        coder=coders.BytesCoder())

我使用spark submit以以下方式运行它: ./bin/spark-submit --master spark://localhost:1313 --driver-memory 128G --executor-memory 256G ~/test_spark_writer.py $input_file /tmp/portable 当我设置 $input_file 到单个文件(例如, $FILEPATH/training_set.with_label.tfrecord-00000-of-00030.gz ),输出文件可用。但是,当我 $input_file 文件模式(例如, $FILEPATH/training_set.with_label.tfrecord-?????-of-00030.gz ),我看到以下警告,并且没有可用的输出文件 WARNING:apache_beam.io.filebasedsink:Deleting 1 existing files in target path matching: -*-of-%(num_shards)05d 我确保在每次操作之前清除输出目录中以前的任何输出文件 spark-submit .
注意:使用directrunner而不是portablerunner时的行为是相同的,portablerunner不需要spark设置。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题