org.apache.hadoop.fs.ParentNotDirectoryException - is not a directory

ac1kyiln · asked 5 months ago · in Hadoop

I am trying to append new records to an existing CSV file using Spark SQL.
I am getting Exception in thread "main" org.apache.hadoop.fs.ParentNotDirectoryException: /tmp/spark/input/csv/orders.csv (is not a directory) because the input is a file rather than a directory.
The Spark (Java) code and the exception stack trace are shown below.
How can I fix this?

Environment information - macOS

MyiCloud@MyMC someDir % uname -a
Darwin MyMC.local 21.6.0 Darwin Kernel Version 21.6.0: Fri Sep 15 16:17:23 PDT 2023; root:xnu-8020.240.18.703.5~1/RELEASE_X86_64 x86_64


Hadoop environment information

MyiCloud@MyMC someDir % hadoop version

Hadoop 3.3.6
Source code repository https://github.com/apache/hadoop.git -r 1be78238728da9266a4f88195058f08fd012bf9c
Compiled by ubuntu on 2023-06-18T08:22Z
Compiled on platform linux-x86_64
Compiled with protoc 3.7.1
From source with checksum 5652179ad55f76cb287d9c633bb53bbd
This command was run using /usr/local/Cellar/hadoop/3.3.6/libexec/share/hadoop/common/hadoop-common-3.3.6.jar

File permissions - HDFS

MyiCloud@MyMC sbin % hdfs dfs -ls /tmp/spark/input/csv/
2023-11-20 01:35:51,752 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rwxr-xr-x   1 MyiCloud supergroup        268 2023-11-20 00:58 /tmp/spark/input/csv/orders.csv

Input file - orders.csv

firstName,lastName,state,quantity,revenue,timestamp
Jean Georges,Perrin,NC,1,300,1551903533
Jean Georges,Perrin,NC,2,120,1551903567
Jean Georges,Perrin,CA,4,75,1551903599
Holden,Karau,CA,6,37,1551904299
Ginni,Rometty,NY,7,91,1551916792
Holden,Karau,CA,4,153,1552876129

Apache Spark code (Java)

package org.example.ch11;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;

public class InsertApp {

    public static void main(String[] args) {
        InsertApp inApp = new InsertApp();
        inApp.append();
    }

    private void append() {
        SparkSession spark = SparkSession.builder()
                .appName("InsertProcessor")
                .master("local[*]").getOrCreate();
        Dataset<Row> df = spark.read().format("csv")
                .option("inferSchema", true)
                .option("header", true)
                .load("hdfs://localhost:8020/tmp/spark/input/csv/orders.csv");
        System.out.println("Printing csv content..");
        df.show();

        df.createOrReplaceTempView("orders_view");
        try(JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {
            Dataset<Row> newDf = spark.createDataFrame(
                    jsc.parallelize(
                            Collections.singletonList(RowFactory.create("V", "S", "TN", 10, 20, 1551904299))
                    ),
                    df.schema()
            );
            newDf.createOrReplaceTempView("new_order_view");

            System.out.println("Inserting data into orders_view view from new_order_view view...");
            Dataset<Row> mergedDf = spark.sql("INSERT INTO orders_view SELECT * FROM new_order_view");
            mergedDf.show();
            System.out.println("Completed...");
        }
    }
}

Exception - stack trace

Printing csv content..
23/11/20 01:38:10 INFO FileSourceStrategy: Pushed Filters: 
23/11/20 01:38:10 INFO FileSourceStrategy: Post-Scan Filters: 
23/11/20 01:38:10 INFO FileSourceStrategy: Output Data Schema: struct<firstName: string, lastName: string, state: string, quantity: int, revenue: int ... 1 more field>
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 22.283375 ms
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 302.5 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 27.3 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.0.6:49627 (size: 27.3 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO SparkContext: Created broadcast 4 from show at InsertApp.java:27
23/11/20 01:38:10 INFO FileSourceScanExec: Planning scan with bin packing, max size: 4194304 bytes, open cost is considered as scanning 4194304 bytes.
23/11/20 01:38:10 INFO SparkContext: Starting job: show at InsertApp.java:27
23/11/20 01:38:10 INFO DAGScheduler: Got job 2 (show at InsertApp.java:27) with 1 output partitions
23/11/20 01:38:10 INFO DAGScheduler: Final stage: ResultStage 2 (show at InsertApp.java:27)
23/11/20 01:38:10 INFO DAGScheduler: Parents of final stage: List()
23/11/20 01:38:10 INFO DAGScheduler: Missing parents: List()
23/11/20 01:38:10 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[13] at show at InsertApp.java:27), which has no missing parents
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 14.4 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 6.8 KiB, free 2003.6 MiB)
23/11/20 01:38:10 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.0.6:49627 (size: 6.8 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1388
23/11/20 01:38:10 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (MapPartitionsRDD[13] at show at InsertApp.java:27) (first 15 tasks are for partitions Vector(0))
23/11/20 01:38:10 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks resource profile 0
23/11/20 01:38:10 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2) (192.168.0.6, executor driver, partition 0, NODE_LOCAL, 4877 bytes) taskResourceAssignments Map()
23/11/20 01:38:10 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
23/11/20 01:38:10 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.0.6:49627 in memory (size: 7.7 KiB, free: 2004.5 MiB)
23/11/20 01:38:10 INFO FileScanRDD: Reading File path: hdfs://localhost:8020/tmp/spark/input/csv/orders.csv, range: 0-268, partition values: [empty row]
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 19.008744 ms
23/11/20 01:38:10 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1789 bytes result sent to driver
23/11/20 01:38:10 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 115 ms on 192.168.0.6 (executor driver) (1/1)
23/11/20 01:38:10 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
23/11/20 01:38:10 INFO DAGScheduler: ResultStage 2 (show at InsertApp.java:27) finished in 0.141 s
23/11/20 01:38:10 INFO DAGScheduler: Job 2 is finished. Cancelling potential speculative or zombie tasks for this job
23/11/20 01:38:10 INFO TaskSchedulerImpl: Killing all running tasks in stage 2: Stage finished
23/11/20 01:38:10 INFO DAGScheduler: Job 2 finished: show at InsertApp.java:27, took 0.150319 s
23/11/20 01:38:10 INFO CodeGenerator: Code generated in 40.368014 ms
+------------+--------+-----+--------+-------+----------+
|   firstName|lastName|state|quantity|revenue| timestamp|
+------------+--------+-----+--------+-------+----------+
|Jean Georges|  Perrin|   NC|       1|    300|1551903533|
|Jean Georges|  Perrin|   NC|       2|    120|1551903567|
|Jean Georges|  Perrin|   CA|       4|     75|1551903599|
|      Holden|   Karau|   CA|       6|     37|1551904299|
|       Ginni| Rometty|   NY|       7|     91|1551916792|
|      Holden|   Karau|   CA|       4|    153|1552876129|
+------------+--------+-----+--------+-------+----------+

Inserting data into orders_view view from new_order_view view...
23/11/20 01:38:10 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
23/11/20 01:38:10 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
23/11/20 01:38:10 INFO SQLHadoopMapReduceCommitProtocol: Using output committer class org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
23/11/20 01:38:10 INFO SparkUI: Stopped Spark web UI at http://192.168.0.6:4040
23/11/20 01:38:10 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/11/20 01:38:10 INFO MemoryStore: MemoryStore cleared
23/11/20 01:38:10 INFO BlockManager: BlockManager stopped
23/11/20 01:38:10 INFO BlockManagerMaster: BlockManagerMaster stopped
23/11/20 01:38:10 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/11/20 01:38:10 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.hadoop.fs.ParentNotDirectoryException: /tmp/spark/input/csv/orders.csv (is not a directory)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkIsDirectory(FSPermissionChecker.java:745)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkSimpleTraverse(FSPermissionChecker.java:736)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:711)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1892)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1910)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.resolvePath(FSDirectory.java:727)
    at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:51)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3441)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1167)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:742)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
    at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2426)
    at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2400)
    at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1324)
    at org.apache.hadoop.hdfs.DistributedFileSystem$27.doCall(DistributedFileSystem.java:1321)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:1338)
    at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:1313)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:178)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:120)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
    at org.example.ch11.InsertApp.append(InsertApp.java:40)
    at org.example.ch11.InsertApp.main(InsertApp.java:15)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.ParentNotDirectoryException): /tmp/spark/input/csv/orders.csv (is not a directory)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkIsDirectory(FSPermissionChecker.java:745)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkSimpleTraverse(FSPermissionChecker.java:736)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:711)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1892)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkTraverse(FSDirectory.java:1910)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.resolvePath(FSDirectory.java:727)
    at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:51)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:3441)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1167)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:742)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
    at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1227)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1094)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1017)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3048)

    at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1511)
    at org.apache.hadoop.ipc.Client.call(Client.java:1457)
    at org.apache.hadoop.ipc.Client.call(Client.java:1367)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:228)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
    at com.sun.proxy.$Proxy18.mkdirs(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:656)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
    at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
    at com.sun.proxy.$Proxy19.mkdirs(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2424)
    ... 31 more
23/11/20 01:38:11 INFO ShutdownHookManager: Shutdown hook called
23/11/20 01:38:11 INFO ShutdownHookManager: Deleting directory /private/var/folders/6w/v2bcqfkd6kl5ylt9k5_q0lb80000gn/T/spark-cf226489-3921-439a-9653-b33159d3a230

Process finished with exit code 1

Answer 1 (oymdgrw7):

Update your logic:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Collections;

public class InsertApp {

    public static void main(String[] args) {
        InsertApp inApp = new InsertApp();
        inApp.append();
    }

    private void append() {
        SparkSession spark = SparkSession.builder()
                .appName("InsertProcessor")
                .master("local[*]").getOrCreate();
        Dataset<Row> df = spark.read()
                .option("inferSchema", true)
                .option("header", true)
                .csv("hdfs://localhost:8020/tmp/spark/input/csv/orders.csv");

        df.createOrReplaceTempView("order_view");
        try(JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext())) {
            Dataset<Row> newDf = spark.createDataFrame(
                    jsc.parallelize(
                            Collections.singletonList(RowFactory.create("V", "S", "TN", 10, 20, 1551904299))
                    ),
                    df.schema()
            );
            newDf.createOrReplaceTempView("new_order_view");
            Dataset<Row> df1 = spark.sql("SELECT * from order_view");
            System.out.println("Data From Order View:");
            df1.show();
            Dataset<Row> df2 = spark.sql("SELECT * from new_order_view");
            System.out.println("Data From New Order View:");
            df2.show();
            Dataset<Row> mergedDf = df1.union(df2).distinct();
            mergedDf.createOrReplaceTempView("order_view");
            Dataset<Row> df3 = spark.sql("SELECT * from order_view");
            System.out.println("Data From Order View After Merging:");
            df3.show();
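            // Overwrite the original HDFS path with the merged result; Spark writes
            // it back as a directory of part files at .../orders.csv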
            df3.write().mode(SaveMode.Overwrite)
                .option("header", true)
                .csv("hdfs://localhost:8020/tmp/spark/input/csv/orders.csv");
        }
    }
}

Output:

Data From Order View:
+------------+--------+-----+--------+-------+----------+
|   firstName|lastName|state|quantity|revenue| timestamp|
+------------+--------+-----+--------+-------+----------+
|Jean Georges|  Perrin|   NC|       1|    300|1551903533|
|Jean Georges|  Perrin|   NC|       2|    120|1551903567|
|Jean Georges|  Perrin|   CA|       4|     75|1551903599|
|      Holden|   Karau|   CA|       6|     37|1551904299|
|       Ginni| Rometty|   NY|       7|     91|1551916792|
|      Holden|   Karau|   CA|       4|    153|1552876129|
+------------+--------+-----+--------+-------+----------+

Data From New Order View:
+---------+--------+-----+--------+-------+----------+
|firstName|lastName|state|quantity|revenue| timestamp|
+---------+--------+-----+--------+-------+----------+
|        V|       S|   TN|      10|     20|1551904299|
+---------+--------+-----+--------+-------+----------+

Data From Order View After Merging:
+------------+--------+-----+--------+-------+----------+
|   firstName|lastName|state|quantity|revenue| timestamp|
+------------+--------+-----+--------+-------+----------+
|Jean Georges|  Perrin|   CA|       4|     75|1551903599|
|       Ginni| Rometty|   NY|       7|     91|1551916792|
|Jean Georges|  Perrin|   NC|       1|    300|1551903533|
|      Holden|   Karau|   CA|       4|    153|1552876129|
|Jean Georges|  Perrin|   NC|       2|    120|1551903567|
|      Holden|   Karau|   CA|       6|     37|1551904299|
|           V|       S|   TN|      10|     20|1551904299|
+------------+--------+-----+--------+-------+----------+


A few points to note:

  • Use the csv method to load the .csv file instead of the load method (see the sketch after this list).
  • I use the union method to combine the rows from the two views, apply distinct to remove duplicates, and then replace the existing order_view data with the newly merged data to get the job done.
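
On the first bullet: a minimal sketch showing the two read forms side by side, using the HDFS path from the question; both accept the same header/inferSchema options, and the csv() call is simply shorthand for format("csv") followed by load(). The class name ReadCsvSketch is only for illustration.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadCsvSketch {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("ReadCsvSketch")
                .master("local[*]")
                .getOrCreate();

        String path = "hdfs://localhost:8020/tmp/spark/input/csv/orders.csv";

        // Shorthand used in the answer: csv() bundles format("csv") and load()
        Dataset<Row> viaCsv = spark.read()
                .option("header", true)
                .option("inferSchema", true)
                .csv(path);

        // Long form used in the question: explicit format("csv") followed by load()
        Dataset<Row> viaLoad = spark.read()
                .format("csv")
                .option("header", true)
                .option("inferSchema", true)
                .load(path);

        // Both reads should return the same six rows from orders.csv
        viaCsv.show();
        viaLoad.show();

        spark.stop();
    }
}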

Hadoop UI output (containing the newly merged data):





I think this is a more approachable way to do it. See if it helps.
