Is there a way to use "set *" in a Spark/Databricks merge query when not all columns are in the source?

Posted by 3duebb1j on 2021-07-12 in Spark

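I am merging data from one table into another in Spark/Databricks. I can use update set * when the source query selects every column of the destination table, but the merge fails when it does not (for example, if the dst table in the query below also has a col9). Is there a way to do this merge, matching columns by name against the source query, without repeating the column list in both the when matched and when not matched parts?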

merge into demo dst
using (
select distinct
  col1,
  col2,
  col3,
  col4,
  col5,
  col6,
  col7,
  col8
from
  demo_raw
where 1=1
  and col1 = 'ac'
  and col2 is not null
) src
on src.col1 = dst.col1
when matched then
  update set *
when not matched then
  insert *
;

raogr8fs1#

Here is a Python solution that I'm fairly happy with.
Function definition:

%python
def merge(srcTableName, dstTableName, whereStr, joinStr, cols, selectCols, debug=False):
  # cols: target columns to update/insert
  # selectCols: select expressions for the source subquery, in the same
  #   order as cols and aliased to the same names

  # comma-separated fragments, one column per line
  selectList = ",\n".join("  " + c for c in selectCols)
  updateList = ",\n".join("  " + c + " = s." + c for c in cols)
  insertList = ",\n".join("  " + c for c in cols)
  valuesList = ",\n".join("  s." + c for c in cols)

  # create the sql string
  sqlString = "\n".join([
    "merge into " + dstTableName + " t",
    "using (",
    "select",
    selectList,
    "from " + srcTableName,
    whereStr.strip(),
    ") s",
    "on " + joinStr,
    "when matched then update set",
    updateList,
    "when not matched then insert (",
    insertList,
    ")",
    "values (",
    valuesList,
    ")",
  ])

  if debug:
    print(sqlString)

  # spark is the SparkSession that a Databricks notebook provides
  spark.sql(sqlString)
  return sqlString
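
The same column list can also feed the Delta Lake Python API instead of a generated SQL string. The sketch below is an alternative to the function above, not part of the original answer: build_merge_maps is a hypothetical helper, and the commented-out usage assumes the delta-spark package, a notebook spark session, and a source DataFrame named src_df.

```python
def build_merge_maps(cols, src_alias="s"):
    """Return (set_map, values_map) for whenMatchedUpdate / whenNotMatchedInsert.

    Each map goes from a target column name to a SQL expression string that
    reads the same-named column from the aliased source.
    """
    set_map = {c: f"{src_alias}.{c}" for c in cols}
    # Same mapping for inserts; copied so one can be edited without the other.
    values_map = dict(set_map)
    return set_map, values_map


set_map, values_map = build_merge_maps(["org", "patient_id", "sex"])
print(set_map)

# Usage sketch for a Databricks notebook (assumption: delta-spark is
# installed and src_df holds the filtered source rows):
#
# from delta.tables import DeltaTable
# (DeltaTable.forName(spark, "demo").alias("t")
#    .merge(src_df.alias("s"), "t.patient_id = s.patient_id")
#    .whenMatchedUpdate(set=set_map)
#    .whenNotMatchedInsert(values=values_map)
#    .execute())
```

Building the maps from one list keeps the update and insert branches in sync, which is the same goal the SQL-string version pursues.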

Example call:


# define the tables (FROM MY_PROJECT.DEMO TO DEMO)

srcTableName = "my_project.demo"
dstTableName = "demo"

# define the where clause (ONLY AC DATA)

whereStr = """
where 1=1
  and org = 'ac'
  and org_patient_id is not null
"""

# define the join (MATCH ON PATIENT_ID)

joinStr = "t.patient_id = s.patient_id"

# define the columns (JUST THESE COLUMNS)

cols = [
  'org',
  'data_lot',
  'raw_table',
  'org_patient_id',
  'patient_id',
  'sex',
  'sexual_orientation',
  'year_of_birth',
  'year_of_death',
  'race',
  'ethnicity',
  'city',
  'state',
  'zip'  
]

# create any needed aliases for query string

selectCols = cols
selectCols = [x if x != 'year_of_birth' else '(2020 - age) as year_of_birth' for x in selectCols]

# do the merge

merge(srcTableName, dstTableName, whereStr, joinStr, cols, selectCols, debug=False)
print("Done.")
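
To avoid maintaining cols by hand, the list could be derived from the columns the two tables share. This is a minimal sketch, assuming column names are compared case-insensitively (Spark's default setting); shared_columns is a hypothetical helper, and the literal lists stand in for what spark.table(...).columns would return in a notebook.

```python
def shared_columns(dst_cols, src_cols):
    """Return the destination columns that also exist in the source,
    keeping the destination's order and spelling."""
    src_lower = {c.lower() for c in src_cols}
    return [c for c in dst_cols if c.lower() in src_lower]


# Stand-in column lists; in a Databricks notebook these could come from
# spark.table("demo").columns and spark.table("my_project.demo").columns.
dst_cols = ["col1", "col2", "col3", "col9"]
src_cols = ["COL1", "col2", "col3"]

cols = shared_columns(dst_cols, src_cols)
print(cols)
```

A derived column such as year_of_birth in the call above does not exist in the source table, so it would still have to be appended to cols by hand.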
