pyspark java.lang.ClassNotFoundException: Failed to find data source: iceberg, when using a Java custom catalog

fae0ux8s · asked 5 months ago · in Spark

We are using a Java custom catalog with Iceberg. The table is created correctly, but we run into a problem when we insert data.

public String createCustomTable(String tableName) {
        try {
            TableIdentifier tableIdentifier = TableIdentifier.of(name(), tableName);
            Schema schema = readSchema(tableIdentifier);
            Map<String, String> properties = ImmutableMap.of(
                    TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.name()
            );
            PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
                    .identity(getPartitionKeyfromSchema(tableIdentifier.name()))
                    .build();
            String tableLocation = defaultLocation + tableIdentifier.namespace().toString() + "/" + tableIdentifier.name();
            catalog.createTable(tableIdentifier, schema, partitionSpec, tableLocation, properties);
            catalog.loadTable(TableIdentifier.of(name(), tableName));
            return "Table created";
        } catch (Exception e) {
            return e.getMessage();
        }
    }

    public String insertData(String tableName, String csvPath) throws IOException {
            Table icebergTable = catalog.loadTable(TableIdentifier.of(name(), tableName));
            SparkSession spark = SparkSession.builder()
                    .config("spark.master", "local")
                    .getOrCreate();
            String headerJson = readHeaderJson(tableName);
            LOGGER.info("Header JSON for {}: {}", tableName, headerJson);

            String[] columns = headerJson.split(",");
            Dataset<Row> df = spark.read()
                    .option("header", "false")
                    .option("inferSchema", "false")
                    .option("comment", "#")
                    .option("sep", "|")
                    .csv(csvPath)
                    .toDF(columns);
            LOGGER.info("Actual columns: {}", Arrays.toString(df.columns()));

            for (String col : df.columns()) {
                df = df.withColumn(col, df.col(col).cast("string"));
            }
            df.write().format("iceberg").mode(SaveMode.Append).save(icebergTable.location());
            LOGGER.info("Data inserted successfully into table: {}", tableName);
            return "Data inserted into table: " + tableName;
    }

When we execute this through a main program in Java, it works perfectly.
However, when we build a jar and call it from PySpark, it gives us this error:

ERROR:root:Error: An error occurred while calling o0.insertData.
: java.lang.ClassNotFoundException:
Failed to find data source: iceberg. Please find packages at
http://spark.apache.org/third-party-projects.html

        at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:443)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:670)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:720)
        at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:852)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
        at com.xyz.catalog.CustomCatalog.insertData(CustomCatalog.java:178)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)     
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: iceberg.DefaultSource
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:656)     
        at scala.util.Try$.apply(Try.scala:210)
        at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:656)     
        at scala.util.Failure.orElse(Try.scala:221)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:656)
        ... 16 more


Any help would be appreciated.
We are trying to create and insert data into Iceberg tables using a custom catalog, and we expect the insert to work:

df.write().format("iceberg").mode(SaveMode.Append).save(icebergTable.location());


However, we get the error shown above.
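As an aside, an alternative to the path-based write above is to write through a catalog/table identifier with Spark's `DataFrameWriterV2` API. This is only a sketch, not the original code: it assumes the custom catalog has been registered with Spark (e.g. via `spark.sql.catalog.my_catalog = org.apache.iceberg.spark.SparkCatalog`), and the names `my_catalog`, `db`, and `events` are placeholders. Note that this still requires the Iceberg runtime on Spark's classpath, so it does not by itself fix the `ClassNotFoundException`.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;

public class IcebergAppendSketch {
    // Sketch: append via a catalog identifier instead of a filesystem path.
    // "my_catalog.db.events" is a placeholder identifier; it must match a
    // catalog registered in the Spark session's configuration.
    public static void appendToTable(Dataset<Row> df) throws NoSuchTableException {
        // writeTo(...) resolves the identifier through the registered catalog,
        // so Spark locates the Iceberg table implementation for us.
        df.writeTo("my_catalog.db.events").append();
    }
}
```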


2sbarzqh1#

This error indicates that the data source you are trying to use (iceberg) is not on Spark's runtime classpath. This means you most likely did not include the iceberg-spark-runtime package in your job submission.
An example of how to do this is shown in the Iceberg Getting Started guide:

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.1

This adds the dependency to both the driver and the executors of the Spark job.
A similar packages argument should be added to your Spark job submission, or to the corresponding property of however you run Spark. See spark.jars.packages in the Spark docs for more information.


omvjsjqw2#

You are creating a new session instead of using the existing one.

SparkSession spark = SparkSession.builder()
                    .config("spark.master", "local")
                    .getOrCreate();

Either replace it with:

SparkSession spark = SparkSession.active();


Or, if for whatever reason you need a new session, create it with the correct options:

SparkSession spark = SparkSession.builder()
                    .config("spark.master", "local")
                    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.1")
                    .getOrCreate();
