我有一个函数all\u purch\u spark(),它为五个不同的表设置spark上下文和sql上下文。然后,相同的函数成功地对aws redshift db运行sql查询。效果很好。我包括下面的整个功能(当然去除敏感数据)。请原谅它的长度,但我想表明它是鉴于我所面临的问题。
我的问题是第二个函数reprch\u prep()以及它如何调用第一个函数all\u purch\u spark()。我不知道如何避免这样的错误:nameerror:name'sqlcontext'未定义
我将在下面展示这两个函数和错误。
这里是第一个函数all\u purch\u spark()。我再次把整个函数放在这里作为参考。我知道它很长,但不确定我能不能把它简化成一个有意义的例子。
def all_purch_spark():
config = {
'redshift_user': 'tester123',
'redshift_pass': '*****************',
'redshift_port': "5999",
'redshift_db': 'my_database',
'redshift_host': 'redshift.my_database.me',
}
from pyspark import SparkContext, SparkConf, SQLContext
jars = [
"/home/spark/SparkNotebooks/src/service/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"
]
conf = (
SparkConf()
.setAppName("S3 with Redshift")
.set("spark.driver.extraClassPath", ":".join(jars))
.set("spark.hadoop.fs.s3a.path.style.access", True)
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.set("com.amazonaws.services.s3.enableV4", True)
.set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('region')}.amazonaws.com")
.set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
.set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
)
sc = SparkContext(conf=conf).getOrCreate()
sqlContext = SQLContext(sc)
##Set Schema and table to query
schema1 = 'production'
schema2 = 'X4production'
table1 = 'purchases'
table2 = 'customers'
table3 = 'memberships'
table4 = 'users' #set as users table in both schemas
purchases_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table1}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
customers_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table2}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
memberships_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table3}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
users_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema1}.{table4}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
cusers_df = sqlContext.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
.option("dbtable", f"{schema2}.{table4}") \
.option("user", config.get('redshift_user')) \
.option("password", config.get('redshift_pass')) \
.load()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('fc_purchases').getOrCreate()
purchases_df.createOrReplaceTempView('purchases')
customers_df.createOrReplaceTempView('customers')
memberships_df.createOrReplaceTempView('memberships')
users_df.createOrReplaceTempView('users')
cusers_df.createOrReplaceTempView('cusers')
all_purch = spark.sql("SELECT \
p_paid.customer_id AS p_paid_user_id \
,p_trial.created_at AS trial_start_date \
,p_paid.created_at \
,cu.graduation_year \
,lower(cu.student_year) AS student_year \
,lower(p_paid.description) as product \
,u.email \
,u.id AS u_user_id \
,cu.id AS cu_user_id \
FROM \
purchases AS p_paid \
INNER JOIN purchases AS p_trial ON p_trial.customer_id = p_paid.customer_id \
INNER JOIN customers AS c on c.id = p_paid.customer_id \
INNER JOIN memberships AS m on m.id = c.membership_id \
INNER JOIN users AS u on u.id = m.user_id \
INNER JOIN cusers AS cu on cu.id = u.id \
WHERE \
p_trial.created_at >= '2018-03-01' \
AND p_paid.created_at >= '2018-03-01' \
AND u.institution_contract = false \
AND LOWER(u.email) not like '%hotmail.me%' \
AND LOWER(u.email) not like '%gmail.com%' \
AND p_trial.description like '% Day Free Trial' \
AND p_paid.status = 'paid' \
GROUP BY \
p_paid_user_id \
,trial_start_date \
,p_paid.created_at \
,u.email \
,cu.graduation_year \
,student_year \
,product \
,cu_user_id \
,u_user_id \
ORDER BY p_paid_user_id")
all_purch.registerTempTable("all_purch_table")
return all_purch
下面是调用上述函数的第二个函数。应根据上述函数中设置的已注册表视图进行选择:
def repurch_prep():
all_purch_spark()
all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
(SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
ORDER BY p_paid_user_id ASC")
return all_repurch
当我运行reprch\u prep()时,它会抛出以下异常,即使sql上下文是在上面的函数中定义的。我已尝试返回上面的值,但不知道如何使其工作:
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
in
----> 1 repurch_prep()
~/spark/SparkNotebooks/firecracker/utils_prod_db_spark.py in repurch_prep()
735 #sc = SparkContext().getOrCreate()
736 #sqlContext = SQLContext()
--> 737 all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
738 (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
739 ORDER BY p_paid_user_id ASC")
NameError: name 'sqlContext' is not defined
非常感谢您的帮助。
1条答案
按热度按时间wnvonmuf1#
per@lamanus的解决方案是将变量放在函数外部,使它们成为全局变量,而不是像我那样将它们存储在一个函数中,然后从另一个函数调用该函数。