Merging multiple Parquet files into one larger Parquet file in S3 using AWS Glue

nfg76nw0  posted on 2021-07-13  in Spark

I am trying to merge multiple Parquet files using an AWS Glue job. I am aware of the similar question mentioned here and its possible solutions; I have tried them, but they don't seem to work. Here is my sample code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

job.init(args['JOB_NAME'], args)

df = glueContext.create_dynamic_frame.from_options(connection_type="parquet", connection_options={'paths': ["s3://bucket-name/parquet/2021/02/15/15/"]})
partitioned_df=df.toDF().repartition(1)
partitioned_dynamic_df=DynamicFrame.fromDF(partitioned_df,glueContext,"partitioned_df")

datasink0=glueContext.write_dynamic_frame.from_options(frame=partitioned_dynamic_df,connection_type="s3", connection_options={'paths':["s3://bucket-name/output/"]}, format="parquet")

job.commit()

I have printed partitioned_dynamic_df and it is indeed the combination of all the Parquet files. But I keep getting the error message below and can't work out how to fix it.

Traceback (most recent call last):
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o76.pyWriteDynamicFrame.
: java.lang.IllegalArgumentException: Expected exactly one path to be specified, but got:
 at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:427)
 at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:524)
 at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
 at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
 at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame$1.apply(DataSink.scala:535)
 at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame$1.apply(DataSink.scala:522)
 at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
 at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
 at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:58)
 at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:66)
 at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:521)
 at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:63)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at py4j.Gateway.invoke(Gateway.java:282)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:238)
 at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/job-name", line 33, in <module>
    datasink0=glueContext.write_dynamic_frame.from_options(frame=partitioned_dynamic_df,connection_type="s3", connection_options={'paths':["s3://bucket-name/output/"]}, format="parquet")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
    format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
    format, format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 79, in deco
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: 'Expected exactly one path to be specified, but got: '

The data looks like this:

{"lead:objectid": 21345, "contactdetails:name": "Ruben Dias", "contactdetails:contactnumber": 9838942345}
{"lead:objectid": 41335, "contactdetails:name": "Nick Pope", "contactdetails:contactnumber": 9228672345}
{"lead:objectid": 4132, "contactdetails:name": "Edison Cavani", "contactdetails:contactnumber": 9228633345}
{"lead:objectid": 21335, "contactdetails:name": "James Justin", "contactdetails:contactnumber": 9838672345}
{"lead:objectid": null, "contactdetails:name": "James Maddison", "contactdetails:contactnumber": null}
{"lead:objectid": null, "contactdetails:name": "Jack Grealish", "contactdetails:contactnumber": null}
{"lead:objectid": 3214, "contactdetails:name": "Harry Kane", "contactdetails:contactnumber": null}
{"lead:objectid": 34143, "contactdetails:name": null, "contactdetails:contactnumber": null}

Any help/suggestions?

qmelpv7a1#

Found the problem. I had written:

datasink0=glueContext.write_dynamic_frame.from_options(frame=partitioned_dynamic_df,connection_type="s3", connection_options={'paths':["s3://bucket-name/output/"]}, format="parquet")

which passes a list as 'paths' in connection_options. Only a single 'path' (not a list) should be provided for the sink, so it should be:

datasink0=glueContext.write_dynamic_frame.from_options(frame=partitioned_dynamic_df,connection_type="s3", connection_options={'path':"s3://bucket-name/output/"}, format="parquet")
