Searching one PySpark DataFrame for strings from another PySpark DataFrame

cld4siwp  posted on 2021-05-18  in  Spark

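I have a PySpark DataFrame with a url column that contains some URLs, and another PySpark DataFrame that also contains URLs and ids, but those URLs are inlinks: for example, the first has .com and the second has .com/contact. I want to collect, in a new column of the first DataFrame, the ids of all inlinks that belong to a given domain. This is what I am doing right now: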

url_list = df1.select('url').collect()
all_rows = df2.collect()
ids = list()
urls = list()
for row in all_rows:
  ids.append(row.id)
  urls.append(row.url)

dict_ids = dict([(i.website,"") for i in url_list])

for url,id in zip(urls, ids):
  res = [ele.website for ele in url_list if(ele.website in url)]
  if len(res)>0:
    print(res)
    dict_ids[res[0]]+=('\n\n\n'+id+'\n\n\n')

This takes a lot of time, and I want to do the processing in Spark, so I also tried this:

def add_id(url, id):
  for i in url_list:
    if i.website in url:
      dict_ids[i.website]+=id

add_id_udf=udf(add_id,StringType())

test = df_crawled_2.withColumn("Test", add_id_udf(df2['url'],df2['id']))
display(test)

input:
df1::
url
http://example.com
http://example2.com/index.html

df2::
url,id
http://example.com/contact, 12
http://example2.com/index.html/pif, 45
http://example.com/about, 68
http://example2.com/index.html/juk/er, 96

expected output:
df1::
url,id
http://example.com, [12,68]
http://example2.com/index.html, [45,96]

A dictionary with the URLs as keys and the ids as values would also be fine.

But in the second case, dict_ids stays empty. Can anyone help me?

mmvthczy1#

I was able to get your example working, with two caveats: I use crossJoin on the two DataFrames, and I match the URLs with contains.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list

# Local session for the demo. The crossJoin setting from the original answer
# is kept; it mainly matters for implicit cartesian products on Spark 2.x.
spark = (SparkSession.builder
         .master("local")
         .config("spark.sql.crossJoin.enabled", "true")
         .getOrCreate())

# df1: the base URLs to search for
data1 = [("http://example.com",),
         ("http://example2.com/index.html",)]
df1 = spark.createDataFrame(data=data1, schema=["url_first"])
df1.show(truncate=False)

# df2: the inlinks and their ids
data2 = [
    ("http://example.com/contact", 12),
    ("http://example2.com/index.html/pif", 45),
    ("http://example.com/about", 68),
    ("http://example2.com/index.html/juk/er", 96)
      ]
df2 = spark.createDataFrame(data=data2, schema=["url_second", "id"])
df2.show(truncate=False)

# Pair every inlink with every base URL
joined_df = df2.crossJoin(df1)
joined_df.show(truncate=False)

# Flag the pairs where the inlink contains the base URL
inter_result = joined_df.withColumn("myjoin", col("url_second").contains(col("url_first")))
inter_result.show(n=200, truncate=False)

# Keep only the matching pairs and collect the ids per base URL
final_result = inter_result.filter(col("myjoin")).groupBy("url_first").agg(collect_list(col("id")).alias("id_list"))
final_result.show(n=200, truncate=False)

The output is as follows.

+------------------------------+
|url_first                     |
+------------------------------+
|http://example.com            |
|http://example2.com/index.html|
+------------------------------+

+-------------------------------------+---+
|url_second                           |id |
+-------------------------------------+---+
|http://example.com/contact           |12 |
|http://example2.com/index.html/pif   |45 |
|http://example.com/about             |68 |
|http://example2.com/index.html/juk/er|96 |
+-------------------------------------+---+

+-------------------------------------+---+------------------------------+
|url_second                           |id |url_first                     |
+-------------------------------------+---+------------------------------+
|http://example.com/contact           |12 |http://example.com            |
|http://example.com/contact           |12 |http://example2.com/index.html|
|http://example2.com/index.html/pif   |45 |http://example.com            |
|http://example2.com/index.html/pif   |45 |http://example2.com/index.html|
|http://example.com/about             |68 |http://example.com            |
|http://example.com/about             |68 |http://example2.com/index.html|
|http://example2.com/index.html/juk/er|96 |http://example.com            |
|http://example2.com/index.html/juk/er|96 |http://example2.com/index.html|
+-------------------------------------+---+------------------------------+

+-------------------------------------+---+------------------------------+------+
|url_second                           |id |url_first                     |myjoin|
+-------------------------------------+---+------------------------------+------+
|http://example.com/contact           |12 |http://example.com            |true  |
|http://example.com/contact           |12 |http://example2.com/index.html|false |
|http://example2.com/index.html/pif   |45 |http://example.com            |false |
|http://example2.com/index.html/pif   |45 |http://example2.com/index.html|true  |
|http://example.com/about             |68 |http://example.com            |true  |
|http://example.com/about             |68 |http://example2.com/index.html|false |
|http://example2.com/index.html/juk/er|96 |http://example.com            |false |
|http://example2.com/index.html/juk/er|96 |http://example2.com/index.html|true  |
+-------------------------------------+---+------------------------------+------+

+------------------------------+--------+
|url_first                     |id_list |
+------------------------------+--------+
|http://example.com            |[12, 68]|
|http://example2.com/index.html|[45, 96]|
+------------------------------+--------+
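
The question also mentions that a dictionary with URLs as keys and ids as values would be acceptable. As a rough sketch of what the crossJoin / contains / collect_list steps compute, here is the same logic in plain Python on the sample data. The function name collect_inlink_ids and the variable names are made up for this illustration, and this version runs on the driver only, so it is meant for checking results on a small sample rather than as a replacement for the Spark job.

```python
from collections import defaultdict


def collect_inlink_ids(base_urls, links):
    """Map each base URL to the ids of the links that contain it.

    Hypothetical helper for illustration; mirrors crossJoin + contains + collect_list.
    """
    ids_by_url = defaultdict(list)
    for link_url, link_id in links:        # every inlink ...
        for base_url in base_urls:         # ... paired with every base URL (the cross join)
            if base_url in link_url:       # substring test, the plain-Python analogue of contains
                ids_by_url[base_url].append(link_id)
    return dict(ids_by_url)


# Sample data taken from the question
base_urls = ["http://example.com", "http://example2.com/index.html"]
links = [
    ("http://example.com/contact", 12),
    ("http://example2.com/index.html/pif", 45),
    ("http://example.com/about", 68),
    ("http://example2.com/index.html/juk/er", 96),
]

result = collect_inlink_ids(base_urls, links)
print(result)
```

With Spark, a comparable dictionary could probably be built from the aggregated result with {row.url_first: row.id_list for row in final_result.collect()}, assuming the grouped output is small enough to bring back to the driver. Note that a substring test also matches a base URL that appears anywhere inside a link, not only at its start; if that is too loose, a prefix test (str.startswith here, or the Column startswith method in Spark) may be a better fit.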
