Converting an RDD to a DataFrame: AttributeError: 'RDD' object has no attribute 'toDF' with PySpark

roejwanj · posted 2021-05-27 · in Spark

I am trying to convert an RDD to a DataFrame using PySpark. Below is my code.

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

conf = SparkConf().setMaster("local").setAppName("Dataframe_examples")
sc = SparkContext(conf=conf)

def parsedLine(line):
    fields = line.split(',')
    movieId = fields[0]
    movieName = fields[1]
    genres = fields[2]
    return movieId, movieName, genres

movies = sc.textFile("file:///home/ajit/ml-25m/movies.csv")
parsedLines = movies.map(parsedLine)
print(parsedLines.count())

dataFrame = parsedLines.toDF(["movieId"])
dataFrame.printSchema()
```

I am running this code in the PyCharm IDE.
I am getting this error:

File "/home/ajit/PycharmProjects/pythonProject/Dataframe_examples.py", line 19, in <module>
    dataFrame = parsedLines.toDF(["movieId"])
AttributeError: 'PipelinedRDD' object has no attribute 'toDF'

As I am new to Spark, can you let me know what I am missing?

a14dhokn #1

Initialize a SparkSession by passing it your SparkContext. The `toDF` method is not defined on the RDD class itself; it only gets attached to RDDs once a SparkSession (or SQLContext) has been created, which is why the attribute is missing in your code. Example:

```python
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

conf = SparkConf().setMaster("local").setAppName("Dataframe_examples")
sc = SparkContext(conf=conf)

# Creating the session is what makes toDF available on RDDs.
spark = SparkSession(sc)

def parsedLine(line):
    fields = line.split(',')
    movieId = fields[0]
    movieName = fields[1]
    genres = fields[2]
    return movieId, movieName, genres

movies = sc.textFile("file:///home/ajit/ml-25m/movies.csv")
```

or using `spark.sparkContext`:

```python
movies = spark.sparkContext.textFile("file:///home/ajit/ml-25m/movies.csv")

parsedLines = movies.map(parsedLine)
print(parsedLines.count())

dataFrame = parsedLines.toDF(["movieId"])
dataFrame.printSchema()
```
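
For reference, if you name all three columns (the extra names below are an assumption about the intended schema; the question only passes `"movieId"`, in which case Spark should fall back to default names such as `_2` and `_3` for the remaining columns), `printSchema()` prints something like:

```python
dataFrame = parsedLines.toDF(["movieId", "movieName", "genres"])
dataFrame.printSchema()
# Expected output (every column is inferred as a string,
# since parsedLine returns plain string fields):
# root
#  |-- movieId: string (nullable = true)
#  |-- movieName: string (nullable = true)
#  |-- genres: string (nullable = true)
```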

chy5wohz #2

Use a SparkSession to turn the RDD into a DataFrame, as follows:

```python
movies = sc.textFile("file:///home/ajit/ml-25m/movies.csv")
parsedLines = movies.map(parsedLine)
print(parsedLines.count())

spark = SparkSession.builder.getOrCreate()
# DataFrame.toDF expects one name per column, passed as separate arguments
dataFrame = spark.createDataFrame(parsedLines).toDF("movieId", "movieName", "genres")
dataFrame.printSchema()
```
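
Alternatively, a minimal sketch that hands `createDataFrame` an explicit schema instead of a name list (the three string fields are my assumption about the columns of `movies.csv`):

```python
from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema: no inference pass over the data, and every
# column is named and typed up front.
schema = StructType([
    StructField("movieId", StringType(), True),
    StructField("movieName", StringType(), True),
    StructField("genres", StringType(), True),
])
dataFrame = spark.createDataFrame(parsedLines, schema)
dataFrame.printSchema()
```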

Or create the session first and take the SparkContext from it:

```python
spark = SparkSession.builder.master("local").appName("Dataframe_examples").getOrCreate()
sc = spark.sparkContext
```
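
As a side note, if the end goal is just a DataFrame from the CSV file, the DataFrame reader can skip the RDD and the manual parsing entirely. A sketch, assuming the ml-25m `movies.csv` has a header row (`movieId,title,genres`):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Dataframe_examples").getOrCreate()

# header=True takes column names from the first CSV line; quoted
# fields (e.g. movie titles containing commas) are handled for us,
# unlike a plain line.split(',').
moviesDF = spark.read.csv("file:///home/ajit/ml-25m/movies.csv", header=True)
moviesDF.printSchema()
```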
