数据流中的valueerror:无效的gcs位置:无

pprl5pva  于 2021-09-08  发布在  Java
关注(0)|答案(0)|浏览(172)

我正在尝试从gcs bucket加载数据,并将内容发布到pubsub和bigquery。以下是我的管道选项:

options = PipelineOptions(
      project = project,
      temp_location = "gs://dataflow-example-bucket6721/temp21/",
      region = 'us-east1',
      job_name = "dataflow2-pubsub-09072021",
      machine_type = 'e2-standard-2',
   )

这是我的管道

data = p | 'CreateData' >> beam.Create(sum([fileName()], []))

jsonFile =  data | "filterJson" >> beam.Filter(filterJsonfile)

JsonData = jsonFile | "JsonData" >> beam.Map(readFromJson)

split_data = JsonData | 'Split Data' >> ParDo(CheckForValidData()).with_outputs("ValidData", "InvalidData")

ValidData = split_data.ValidData
InvalidData = split_data.InvalidData
data_ = split_data[None]

publish_data = ValidData | "Publish msg" >> ParDo(publishMsg())

ToBQ = ValidData | "To BQ" >> beam.io.WriteToBigQuery(
            table_spec,
            #schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

数据在interactiverunner中流动良好,但在dataflowrunner中显示错误,如
valueerror:无效的gcs位置:无。使用文件加载方法写入bigquery需要提供一个gcs位置来写入要加载到bigquery中的文件。请通过writetobigquery构造函数中的自定义\u gcs\u temp\u位置或回退选项--temp\u location提供gcs存储桶,或将method=“streaming\u inserts”传递给writetobigquery[运行“[15]:到bq/bigquerybatchfileloads/generatefileprefix”]
显示地面军事系统位置错误,建议添加临时位置。但我已经添加了临时位置。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题