pyspark: How can I merge multiple JSON streams into a single DataFrame based on a condition?

Posted by qc6wkl3g on 2021-05-18 in Spark

I have the following JSON events coming from different streaming topics:

customer= [

    {
        "Customer_id": "103",
        "Customer_name": "Hari",
        "email_address": "hari@gmail.com"
    }]

product = [
  {
    "Customer_id": "103",
    "product_id": " 205",
    "product_name": "Books",
    "product_Category": "Stationary"
  }]

Sales= [
  {
    "customer_id": "103",
    "line": {
      "product_id": "205",
      "purchase_time": "2017-08-19 12:17:55-0400",
      "quantity": "2",
      "unit_price": "25000"
    },
    "shipping_address": "Chennai"
  }]

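The streams above are separate topics, one per use case:
When a user logs in to the e-portal: customer.json
When a user searches for a product: product.json
When a user checks out a product: sales.json
I created the DataFrames below for these use cases: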

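from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

# Assumed: `session` is a SparkSession (its creation was not shown originally)
session = SparkSession.builder.getOrCreate()

# Target schema: customer, product and sales fields combined in one row
sales_schema = StructType([
    StructField("Customer_id", StringType(), True),
    StructField("Customer_name", StringType(), True),
    StructField("email_address", StringType(), True),
    StructField("product_Category", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("purchase_time", StringType(), True),
    StructField("quantity", StringType(), True),
    StructField("unit_price", StringType(), True),
    StructField("shipping_address", StringType(), True)
])

# Customer events; fields missing from the dicts come out as null
cus_Topic = session.sparkContext.parallelize(customer)
sales_df = session.createDataFrame(cus_Topic, sales_schema)

# Product events; the schema is inferred from the dicts
product_topic = session.sparkContext.parallelize(product)
productDF = session.createDataFrame(product_topic)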

sales_df.show()

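+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+
|Customer_id|Customer_name|   email_address|product_Category|product_id|product_name|purchase_time|quantity|unit_price|shipping_address|
+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+
|        103|         Hari|  hari@gmail.com|            null|      null|        null|         null|    null|      null|            null|
|        104|        Umesh|Umesh3@gmail.com|            null|      null|        null|         null|    null|      null|            null|
+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+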

productDF.show()

+-----------+----------------+----------+------------+
|Customer_id|product_Category|product_id|product_name|
+-----------+----------------+----------+------------+
|        103|      Stationary|       205|       Books|
|        104|     Electronics|       206|      Mobile|
+-----------+----------------+----------+------------+

Now I want to merge these DataFrames on the customer id:

product_search_DF = sales_df.join(productDF, [sales_df.Customer_id==productDF.Customer_id], 'left_outer')
product_search_DF.show()

+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+-----------+----------------+----------+------------+
|Customer_id|Customer_name|   email_address|product_Category|product_id|product_name|purchase_time|quantity|unit_price|shipping_address|Customer_id|product_Category|product_id|product_name|
+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+-----------+----------------+----------+------------+
|        104|        Umesh|Umesh3@gmail.com|            null|      null|        null|         null|    null|      null|            null|        104|     Electronics|       206|      Mobile|
|        103|         Hari|  hari@gmail.com|            null|      null|        null|         null|    null|      null|            null|        103|      Stationary|       205|       Books|
+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+-----------+----------------+----------+------------+

But this duplicates the columns.
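Also, what I am aiming for is that, as events arrive on the live topics, all of the customer, product and sales data ends up merged into a single DataFrame.
I would also like to know the right way to achieve this.
Thanks for your help.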
