如何使用pyspark加速从sparkDataframe写入sqlserver?

k7fdbhmy  于 2021-07-14  发布在  Spark
关注(0)|答案(0)|浏览(285)

在mssqlserver表中插入一个500mb、100000行的ndjson文件大约需要15分钟。我在一台规格很好的机器上运行spark,32gbram,i9-10885h,8核cpu。我怀疑这台机器是否已被充分利用了。这就是我正在尝试的。

master = "local[16]"

conf = SparkConf() \
    .setAppName(appName) \
    .set("spark.driver.memory", "16g") \
    .set("spark.executor.memory", "1g") \
    .set('spark.executor.cores', '5') \
    .set("spark.driver.extraClassPath","./mssql-jdbc-9.2.1.jre11.jar") \
    .setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

def insert_into_ss(start):
    for i in range(start, len(files)):
        item = files[i]
        print(item)

        start = datetime.now()

        spark_df = sqlContext.read.json(upload_dir + '/' +item)
        spark_df = spark_df.select([col(c).cast("string") for c in spark_df.columns])

        print('Casting time', datetime.now() - start)

        spark_df.write.mode("append") \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", table) \
        .option("batchsize", 20000) \
        .option("reliabilityLevel", 'NO_DUPLICATES') \
        .option("tableLock", 'true') \
        .option("numPartitions", 16) \
        .option("bulkCopyTimeout", 600000) \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .save()
        end = datetime.now()
        print(end-start)
insert_into_ss()

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题