'sqlContext' is not defined when my function calls another function

jecbmhm3 · posted 2021-05-27 in Spark

I have a function all_purch_spark() that sets up a Spark context and SQL context for five different tables. The same function then successfully runs a SQL query against an AWS Redshift DB. That part works fine. I've included the entire function below (with sensitive data removed, of course). Please excuse its length, but I wanted to show it in full given the problem I'm facing.
My problem is with the second function, repurch_prep(), and how it calls the first function, all_purch_spark(). I can't figure out how to avoid this error: NameError: name 'sqlContext' is not defined
I'll show both functions and the error below.
Here is the first function, all_purch_spark(). Again, I'm including the whole thing for reference. I know it's long, but I'm not sure I can reduce it to a meaningful minimal example.

def all_purch_spark():
    config = {
    'redshift_user': 'tester123',
    'redshift_pass': '*****************',
    'redshift_port': "5999",
    'redshift_db': 'my_database',
    'redshift_host': 'redshift.my_database.me',
    }

    from pyspark import SparkContext, SparkConf, SQLContext
    jars = [
    "/home/spark/SparkNotebooks/src/service/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"
    ]
    conf = (
        SparkConf()
        .setAppName("S3 with Redshift")
        .set("spark.driver.extraClassPath", ":".join(jars))
        .set("spark.hadoop.fs.s3a.path.style.access", True)
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set("com.amazonaws.services.s3.enableV4", True)
        .set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('region')}.amazonaws.com")
        .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
        .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    )
    sc = SparkContext(conf=conf).getOrCreate()
    sqlContext = SQLContext(sc)

    ##Set Schema and table to query
    schema1 = 'production'
    schema2 = 'X4production'
    table1 = 'purchases'
    table2 = 'customers'
    table3 = 'memberships'
    table4 = 'users'  #set as users table in both schemas

    purchases_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table1}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    customers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table2}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    memberships_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table3}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    users_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    cusers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema2}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('fc_purchases').getOrCreate()

    purchases_df.createOrReplaceTempView('purchases')
    customers_df.createOrReplaceTempView('customers')
    memberships_df.createOrReplaceTempView('memberships')
    users_df.createOrReplaceTempView('users')
    cusers_df.createOrReplaceTempView('cusers')

    all_purch = spark.sql("SELECT \
    p_paid.customer_id AS p_paid_user_id \
    ,p_trial.created_at AS trial_start_date \
    ,p_paid.created_at \
    ,cu.graduation_year \
    ,lower(cu.student_year) AS student_year \
    ,lower(p_paid.description) as product \
    ,u.email \
    ,u.id AS u_user_id \
    ,cu.id AS cu_user_id \
    FROM \
    purchases AS p_paid \
    INNER JOIN purchases AS p_trial ON p_trial.customer_id = p_paid.customer_id \
    INNER JOIN customers AS c on c.id = p_paid.customer_id \
    INNER JOIN memberships AS m on m.id = c.membership_id \
    INNER JOIN users AS u on u.id = m.user_id \
    INNER JOIN cusers AS cu on cu.id = u.id \
    WHERE \
    p_trial.created_at >= '2018-03-01' \
    AND p_paid.created_at >= '2018-03-01' \
    AND u.institution_contract = false \
    AND LOWER(u.email) not like '%hotmail.me%' \
    AND LOWER(u.email) not like '%gmail.com%' \
    AND p_trial.description like '% Day Free Trial' \
    AND p_paid.status = 'paid' \
    GROUP BY \
    p_paid_user_id \
    ,trial_start_date \
    ,p_paid.created_at \
    ,u.email \
    ,cu.graduation_year \
    ,student_year \
    ,product \
    ,cu_user_id \
    ,u_user_id \
    ORDER BY p_paid_user_id") 
    all_purch.registerTempTable("all_purch_table")

    return all_purch

Here is the second function, which calls the one above. It is supposed to select from the temp view registered in that first function:

def repurch_prep():
        all_purch_spark()
        all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
        (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
        ORDER BY p_paid_user_id ASC")
        return all_repurch

When I run repurch_prep(), it throws the following exception, even though the SQL context is defined in the function above. I've tried returning values from that function, but I can't figure out how to make this work:

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input> in <module>
----> 1 repurch_prep()

~/spark/SparkNotebooks/firecracker/utils_prod_db_spark.py in repurch_prep()
    735     #sc = SparkContext().getOrCreate()
    736     #sqlContext = SQLContext()
--> 737     all_repurch = sqlContext.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
    738     (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
    739     ORDER BY p_paid_user_id ASC")

NameError: name 'sqlContext' is not defined

Any help is greatly appreciated.

wnvonmuf1#

The solution, per @lamanus, was to move the variables outside the function so they become globals, rather than keeping them inside one function and then calling that function from another, as I had been doing.
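
The underlying cause is ordinary Python scoping: sqlContext was a local name inside all_purch_spark(), so it never exists inside repurch_prep() at all. A minimal, Spark-free sketch of the same behavior (the names here are only for illustration):

def setup():
    helper = "only visible inside setup()"   # local variable, gone once setup() returns
    return helper

def use():
    setup()            # return value is discarded; 'helper' is not in scope here
    print(helper)      # NameError: name 'helper' is not defined

use()

The reworked code below applies the fix by hoisting the config and the SparkContext to module level: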


############### SPARK REDSHIFT GLOBAL CONFIG #####################

config = {
    'redshift_user': 'tester123',
    'redshift_pass': '*****************',
    'redshift_port': "5999",
    'redshift_db': 'my_database',
    'redshift_host': 'redshift.my_database.me',
}

from pyspark import SparkContext, SparkConf, SQLContext

jars = [
    "/home/spark/SparkNotebooks/src/service/RedshiftJDBC42-no-awssdk-1.2.41.1065.jar"
]
conf = (
    SparkConf()
    .setAppName("S3 with Redshift")
    .set("spark.driver.extraClassPath", ":".join(jars))
    .set("spark.hadoop.fs.s3a.path.style.access", True)
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("com.amazonaws.services.s3.enableV4", True)
    .set("spark.hadoop.fs.s3a.endpoint", f"s3-{config.get('region')}.amazonaws.com")
    .set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
    .set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.services.s3.enableV4=true")
)
sc = SparkContext(conf=conf).getOrCreate()

############################################################### 

def all_purch_spark():
    sqlContext = SQLContext(sc)

    ##Set Schema and table to query
    schema1 = 'production'
    schema2 = 'X4production'
    table1 = 'purchases'
    table2 = 'customers'
    table3 = 'memberships'
    table4 = 'users'  #set as users table in both schemas

    purchases_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table1}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    customers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table2}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    memberships_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table3}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    users_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema1}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    cusers_df = sqlContext.read \
               .format("jdbc") \
               .option("url", f"jdbc:postgresql://{config.get('redshift_host')}:{config.get('redshift_port')}/{config.get('redshift_db')}") \
               .option("dbtable", f"{schema2}.{table4}") \
               .option("user", config.get('redshift_user')) \
               .option("password", config.get('redshift_pass')) \
               .load()

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName('fc_purchases').getOrCreate()

    purchases_df.createOrReplaceTempView('purchases')
    customers_df.createOrReplaceTempView('customers')
    memberships_df.createOrReplaceTempView('memberships')
    users_df.createOrReplaceTempView('users')
    cusers_df.createOrReplaceTempView('cusers')

    all_purch = spark.sql("SELECT \
    p_paid.customer_id AS p_paid_user_id \
    ,p_trial.created_at AS trial_start_date \
    ,p_paid.created_at \
    ,cu.graduation_year \
    ,lower(cu.student_year) AS student_year \
    ,lower(p_paid.description) as product \
    ,u.email \
    ,u.id AS u_user_id \
    ,cu.id AS cu_user_id \
    FROM \
    purchases AS p_paid \
    INNER JOIN purchases AS p_trial ON p_trial.customer_id = p_paid.customer_id \
    INNER JOIN customers AS c on c.id = p_paid.customer_id \
    INNER JOIN memberships AS m on m.id = c.membership_id \
    INNER JOIN users AS u on u.id = m.user_id \
    INNER JOIN cusers AS cu on cu.id = u.id \
    WHERE \
    p_trial.created_at >= '2018-03-01' \
    AND p_paid.created_at >= '2018-03-01' \
    AND u.institution_contract = false \
    AND LOWER(u.email) not like '%hotmail.me%' \
    AND LOWER(u.email) not like '%gmail.com%' \
    AND p_trial.description like '% Day Free Trial' \
    AND p_paid.status = 'paid' \
    GROUP BY \
    p_paid_user_id \
    ,trial_start_date \
    ,p_paid.created_at \
    ,u.email \
    ,cu.graduation_year \
    ,student_year \
    ,product \
    ,cu_user_id \
    ,u_user_id \
    ORDER BY p_paid_user_id") 
    all_purch.registerTempTable("all_purch_table")

    return all_purch
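
With the Spark context created at module level, repurch_prep() no longer needs a name called sqlContext at all: it can grab the (singleton) SparkSession and query the all_purch_table temp view that all_purch_spark() registered. A minimal sketch of what that could look like with this setup (assuming the module-level sc above):

from pyspark.sql import SparkSession

def repurch_prep():
    all_purch_spark()   # registers the all_purch_table temp view on the active session
    spark = SparkSession.builder.appName('fc_purchases').getOrCreate()
    all_repurch = spark.sql("SELECT * FROM all_purch_table WHERE p_paid_user_id IN \
        (SELECT p_paid_user_id FROM all_purch_table GROUP BY p_paid_user_id HAVING COUNT(*) > 1) \
        ORDER BY p_paid_user_id ASC")
    return all_repurch

Temp views are scoped to the SparkSession, and getOrCreate() returns the same session that all_purch_spark() used, so the view is visible here without passing any context between the functions.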
