(1)示例数据:`people.txt`
```
Michael,29
Andy,30
Justin,19
```
(2)示例代码
```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object RDDtoDataFrame {

  /** One record of people.txt, whose lines look like "name,age". */
  case class People(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .appName(this.getClass.getName)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    // Brings in the RDD -> DataFrame conversions (e.g. toDF()).
    import spark.implicits._

    try {
      /* ===== Approach 1: split the RDD, bind it to a case class, call toDF() ===== */
      val peopleRDD: RDD[String] = sc.textFile("file:///E:\\hadoop\\input\\people.txt")

      // Split each line and map it onto the People case class.
      // Blank lines are skipped and fields are trimmed so that a line such as
      // "Andy, 30" no longer makes `toInt` throw NumberFormatException.
      val peopleDF: DataFrame = peopleRDD
        .filter(_.trim.nonEmpty)
        .map(_.split(","))
        .map(x => People(x(0).trim, x(1).trim.toInt))
        .toDF()
      peopleDF.show()
      // +-------+---+
      // |   name|age|
      // +-------+---+
      // |Michael| 29|
      // |   Andy| 30|
      // | Justin| 19|
      // +-------+---+

      // Register a temporary view so the DataFrame is queryable with SQL.
      peopleDF.createOrReplaceTempView("people")
      spark.sql("select * from people where name='Andy'").show()
      // +----+---+
      // |name|age|
      // +----+---+
      // |Andy| 30|
      // +----+---+

      /* ===== Approach 2: attach an explicit schema to an RDD[Row] ===== */
      // 1. Build the schema with StructType.
      //    StructField(fieldName, fieldType, nullable) — nullable defaults to true.
      val schema = StructType(Array(
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      ))

      // 2. Split every line into an Array and convert it to an RDD[Row],
      //    whose column order must match the schema above.
      val peopleRowRDD: RDD[Row] = peopleRDD
        .filter(_.trim.nonEmpty)
        .map(_.split(","))
        .map(x => Row(x(0).trim, x(1).trim.toInt))

      // 3. Combine the Row RDD with the schema to create a DataFrame.
      val df: DataFrame = spark.createDataFrame(peopleRowRDD, schema)
      df.createOrReplaceTempView("people2")
      spark.sql("select * from people2").show()
      // +-------+---+
      // |   name|age|
      // +-------+---+
      // |Michael| 29|
      // |   Andy| 30|
      // | Justin| 19|
      // +-------+---+
    } finally {
      // Always release the SparkSession (and its SparkContext) on exit.
      spark.stop()
    }
  }
}
```
内容来源于网络,如有侵权,请联系作者删除!