I have the following JSON events coming from different streaming topics:
customer= [
{
"Customer_id": "103",
"Customer_name": "Hari",
"email_address": "hari@gmail.com"
}]
product = [
{
"Customer_id": "103",
"product_id": " 205",
"product_name": "Books",
"product_Category": "Stationary"
}]
Sales= [
{
"customer_id": "103",
"line": {
"product_id": "205",
"purchase_time": "2017-08-19 12:17:55-0400",
"quantity": "2",
"unit_price": "25000"
},
"shipping_address": "Chennai"
}]
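The streams above are separate topics, one per use case:
When the user logs in to the e-portal - customer.json
When the user searches for a product - product.json
When the user checks out a product - sales.json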
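I created the following DataFrames for the use cases above:
from pyspark.sql.types import StructType, StructField, StringType

sales_schema = StructType([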
StructField("Customer_id", StringType(), True),
StructField("Customer_name", StringType(), True),
StructField("email_address", StringType(), True),
StructField("product_Category", StringType(), True),
StructField("product_id", StringType(), True),
StructField("product_name", StringType(), True),
StructField("purchase_time", StringType(), True),
StructField("quantity", StringType(), True),
StructField("unit_price", StringType(), True),
StructField("shipping_address", StringType(), True)
]
)
# `session` is an existing SparkSession.
cus_Topic = session.sparkContext.parallelize(customer)
sales_df = session.createDataFrame(cus_Topic, sales_schema)
product_topic = session.sparkContext.parallelize(product)
productDF = session.createDataFrame(product_topic)
sales_df.show()
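+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+
|Customer_id|Customer_name|   email_address|product_Category|product_id|product_name|purchase_time|quantity|unit_price|shipping_address|
+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+
|        103|         Hari|  hari@gmail.com|            null|      null|        null|         null|    null|      null|            null|
|        104|        Umesh|Umesh3@gmail.com|            null|      null|        null|         null|    null|      null|            null|
+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+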
productDF.show()
+-----------+----------------+----------+------------+
|Customer_id|product_Category|product_id|product_name|
+-----------+----------------+----------+------------+
|        103|      Stationary|       205|       Books|
|        104|     Electronics|       206|      Mobile|
+-----------+----------------+----------+------------+
Now I want to merge these DataFrames on the customer ID:
product_search_DF = sales_df.join(productDF, [sales_df.Customer_id==productDF.Customer_id], 'left_outer')
product_search_DF.show()
+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+-----------+----------------+----------+------------+
|Customer_id|Customer_name|   email_address|product_Category|product_id|product_name|purchase_time|quantity|unit_price|shipping_address|Customer_id|product_Category|product_id|product_name|
+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+-----------+----------------+----------+------------+
|        104|        Umesh|Umesh3@gmail.com|            null|      null|        null|         null|    null|      null|            null|        104|     Electronics|       206|      Mobile|
|        103|         Hari|  hari@gmail.com|            null|      null|        null|         null|    null|      null|            null|        103|      Stationary|       205|       Books|
+-----------+-------------+----------------+----------------+----------+------------+-------------+--------+----------+----------------+-----------+----------------+----------+------------+
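But the join duplicates the columns: Customer_id, product_Category, product_id and product_name each appear twice in the result.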
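Also, what I am aiming for is that all the data arriving on the live customer, product and sales topics ends up merged into a single DataFrame.
I would also like to know the right way to achieve this.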
Thanks for your help.