We are using a custom catalog with Iceberg in Java. The table is created correctly, but we run into a problem when we insert data.
public String createCustomTable(String tableName) {
    try {
        TableIdentifier tableIdentifier = TableIdentifier.of(name(), tableName);
        Schema schema = readSchema(tableIdentifier);
        Map<String, String> properties = ImmutableMap.of(
                TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()
        );
        PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
                .identity(getPartitionKeyfromSchema(tableIdentifier.name()))
                .build();
        String tableLocation = defaultLocation + tableIdentifier.namespace().toString() + "/" + tableIdentifier.name();
        catalog.createTable(tableIdentifier, schema, partitionSpec, tableLocation, properties);
        catalog.loadTable(TableIdentifier.of(name(), tableName));
        return "Table created";
    } catch (Exception e) {
        return e.getMessage();
    }
}
public String insertData(String tableName, String csvPath) throws IOException {
    Table icebergTable = catalog.loadTable(TableIdentifier.of(name(), tableName));
    SparkSession spark = SparkSession.builder()
            .config("spark.master", "local")
            .getOrCreate();
    String headerJson = readHeaderJson(tableName);
    LOGGER.info("Header JSON for {}: {}", tableName, headerJson);
    String[] columns = headerJson.split(",");
    Dataset<Row> df = spark.read()
            .option("header", "false")
            .option("inferSchema", "false")
            .option("comment", "#")
            .option("sep", "|")
            .csv(csvPath)
            .toDF(columns);
    LOGGER.info("Actual columns: {}", Arrays.toString(df.columns()));
    for (String col : df.columns()) {
        df = df.withColumn(col, df.col(col).cast("string"));
    }
    df.write().format("iceberg").mode(SaveMode.Append).save(icebergTable.location());
    LOGGER.info("Data inserted successfully into table: {}", tableName);
    return "Data inserted"; // the method declares String but had no return statement
}
When we execute it through a main program in Java, it works perfectly.
However, when we build a jar and invoke it from Spark, it gives us this error:
ERROR:root:Error: An error occurred while calling o0.insertData.
: java.lang.ClassNotFoundException:
Failed to find data source: iceberg. Please find packages at
http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at com.xyz.catalog.CustomCatalog.insertData(CustomCatalog.java:178)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: iceberg.DefaultSource
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)
at scala.util.Try$.apply(Try.scala:210)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:656)
at scala.util.Failure.orElse(Try.scala:221)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:656)
... 16 more
Any help would be greatly appreciated.
We are trying to create and insert data into an Iceberg table using a custom catalog, and we expect this insert to work:
df.write().format("iceberg").mode(SaveMode.Append).save(icebergTable.location());
However, we get the error shown above.
2 Answers

Answer 1 (2sbarzqh1):
This error indicates that the data source you are trying to use (iceberg) is not on Spark's runtime classpath. This means you probably did not include the iceberg-spark-runtime package in your job submission; an example of how to do this is shown in the Iceberg Getting Started guide.
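The answer's original snippet did not survive extraction. On the command line, the runtime jar is typically supplied with something like `spark-submit --packages org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.3 ...`; the same coordinates can also be requested programmatically when the session is built. A minimal sketch, assuming Spark 3.3 with Scala 2.12 (the artifact name and version are assumptions, match them to your own build):

```java
import org.apache.spark.sql.SparkSession;

public class IcebergSubmitExample {
    public static void main(String[] args) {
        // Pull the Iceberg runtime onto the driver and executor classpath.
        // The Maven coordinates are an assumption -- pick the
        // iceberg-spark-runtime artifact matching your Spark/Scala versions.
        SparkSession spark = SparkSession.builder()
                .appName("iceberg-insert")
                .config("spark.jars.packages",
                        "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.3")
                .getOrCreate();
        spark.stop();
    }
}
```

Note that `spark.jars.packages` is only honored when the SparkContext is created, so it must be set before the first `getOrCreate()` call in the JVM.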
This adds the dependency to the Spark job's driver and executors.
A similar --packages argument should be added to your Spark job submission, or to the corresponding property of whatever mechanism you use to run Spark. For more information, see spark.jars.packages in the Spark docs.

Answer 2 (omvjsjqw2):
You are creating a new session instead of reusing the existing one:
SparkSession spark = SparkSession.builder()
        .config("spark.master", "local")
        .getOrCreate();
Either replace it with a lookup of the session that is already running:
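The answer's snippet was lost in extraction. A minimal sketch of what it likely showed: when the jar is driven from an existing Spark job (here via py4j, i.e. PySpark), a session already exists in the JVM and can simply be looked up instead of rebuilt:

```java
import org.apache.spark.sql.SparkSession;

public class SessionReuseExample {
    // Reuse the session that the hosting Spark job already created,
    // rather than configuring a fresh one inside the library code.
    static SparkSession currentSession() {
        // SparkSession.active() (Spark 2.4+) returns the session bound to
        // the current thread, falling back to the default session; it
        // throws if no session exists, which is fine here because the
        // caller is itself a running Spark job.
        return SparkSession.active();
    }
}
```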
Or, if for whatever reason you really do need a new session, create it with the correct options:
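This snippet was also lost in extraction. A sketch of a session configured for Iceberg follows; the catalog name (`my_catalog`), catalog type, warehouse path, and artifact version are all assumptions to be replaced with the settings of your custom catalog:

```java
import org.apache.spark.sql.SparkSession;

public class IcebergSessionExample {
    // Sketch of a session with the Iceberg runtime and catalog wired in.
    // Every value below except the config keys is a placeholder.
    static SparkSession icebergSession() {
        return SparkSession.builder()
                .appName("iceberg-writer")
                .config("spark.jars.packages",
                        "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.3")
                .config("spark.sql.extensions",
                        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .config("spark.sql.catalog.my_catalog",
                        "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.my_catalog.type", "hadoop")
                .config("spark.sql.catalog.my_catalog.warehouse",
                        "/tmp/iceberg-warehouse")
                .getOrCreate();
    }
}
```

With a named catalog configured this way, the write can also target the table by identifier (e.g. `df.writeTo("my_catalog.db.table").append()`) instead of by file location.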