Converting an RDD to a DataFrame in Spark/Scala

w3nuxt5m posted on 2021-05-29 in Hadoop

The RDD has been created as an Array[Array[String]] with the following values:

val rdd : Array[Array[String]] = Array(
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1"))

I want to create a DataFrame with the following schema:

val schemaString = "callId oCallId callTime duration calltype swId"

Next I did:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)

I get the following error:

<console>:45: error: overloaded method value createDataFrame with alternatives:
     (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
    (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
    (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
    (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
    cannot be applied to (org.apache.spark.rdd.RDD[Array[String]],   
    org.apache.spark.sql.types.StructType)
       val calDF = sqlContext.createDataFrame(rowRDD, schema)

idv4meu81#

Using spark 1.6.1 with scala 2.10 I got the same error (error: overloaded method value createDataFrame with alternatives:). For me, the gotcha was the signature of createDataFrame: I tried to use val rdd : List[Row], but it failed because java.util.List[org.apache.spark.sql.Row] and scala.collection.immutable.List[org.apache.spark.sql.Row] are not the same.
The workaround I found was to turn val rdd : Array[Array[String]] into an RDD[Row] via List[Array[String]]. I find this the closest to the documentation:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType};
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val rdd_original : Array[Array[String]] = Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd : List[Array[String]] = rdd_original.toList

val schemaString = "callId oCallId callTime duration calltype swId"

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD to Rows.
val rowRDD = rdd.map(p => Row(p: _*)) // using splat is easier
// val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5))) // this also works

val df = sqlContext.createDataFrame(sc.parallelize(rowRDD:List[Row]), schema)
df.show

0mkxixxg2#

I assume your schema is, following the Spark Programming Guide, as follows:

val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

If you look at the signatures of createDataFrame, here is the one that accepts a StructType as its second argument (in Scala):

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Creates a DataFrame from an RDD containing Rows using the given schema.

So it takes an RDD[Row] as its first argument. What you have in rowRDD is an RDD[Array[String]], hence the mismatch.
Do you really need an RDD[Array[String]]?
Otherwise you can use the following to create your DataFrame:

val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5).trim))
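
A minimal end-to-end sketch of that fix, assuming the schema built from schemaString as shown above and an actual RDD[Array[String]] obtained by parallelizing the question's array:

import org.apache.spark.sql.Row

// Parallelize the sample data from the question into a real RDD[Array[String]]
val rdd = sc.parallelize(Array(
  Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
  Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1")))

// Each Array[String] becomes a Row, so the (RDD[Row], StructType) overload now applies
val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5).trim))

val calDF = sqlContext.createDataFrame(rowRDD, schema)
calDF.show()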

zrfyljdw3#

Just paste this into the spark-shell:

val a = 
  Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd = sc.makeRDD(a)

case class X(callId: String, oCallId: String, 
  callTime: String, duration: String, calltype: String, swId: String)

Then map() over the RDD to create instances of the case class, and build the DataFrame with toDF():

scala> val df = rdd.map { 
  case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
df: org.apache.spark.sql.DataFrame = 
  [callId: string, oCallId: string, callTime: string, 
    duration: string, calltype: string, swId: string]

This infers the schema from the case class.
Then you can proceed with:

scala> df.printSchema()
root
 |-- callId: string (nullable = true)
 |-- oCallId: string (nullable = true)
 |-- callTime: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- calltype: string (nullable = true)
 |-- swId: string (nullable = true)

scala> df.show()
+----------+-------+-------------------+--------+--------+----+
|    callId|oCallId|           callTime|duration|calltype|swId|
+----------+-------+-------------------+--------+--------+----+
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
+----------+-------+-------------------+--------+--------+----+

If you want to use toDF() in a standalone program (and not in the spark-shell), make sure (quoting from here) to:
import sqlContext.implicits._ right after creating the SQLContext
define your case class outside of the method that uses toDF()
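
A minimal sketch of what that looks like in a standalone program; the object name CallsToDF and the SparkConf setup are illustrative assumptions, not part of the original answer:

object CallsToDF {
  // Case class defined outside the method that calls toDF(), as advised above
  case class X(callId: String, oCallId: String,
    callTime: String, duration: String, calltype: String, swId: String)

  def main(args: Array[String]): Unit = {
    val conf = new org.apache.spark.SparkConf().setAppName("calls")
    val sc = new org.apache.spark.SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._  // right after creating the SQLContext

    val rdd = sc.makeRDD(Array(
      Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")))
    val df = rdd.map { case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
    df.show()
  }
}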


5gfr0r5j4#

You have to first convert your Array into a Row and then define the schema. I assumed most of your fields are Long:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

val rdd: RDD[Array[String]] = ???

// Pattern-match each six-element Array[String] and convert the numeric fields to Long
val rows: RDD[Row] = rdd map {
  case Array(callId, oCallId, callTime, duration, calltype, swId) =>
    Row(callId.toLong, oCallId, callTime, duration.toLong, calltype, swId.toLong)
}

object schema {
  val callId = StructField("callId", LongType)
  val oCallId = StructField("oCallId", StringType)
  val callTime = StructField("callTime", StringType)
  val duration = StructField("duration", LongType)
  val calltype = StructField("calltype", StringType)
  val swId = StructField("swId", LongType)

  val struct = StructType(Array(callId, oCallId, callTime, duration, calltype, swId))
}

sqlContext.createDataFrame(rows, schema.struct)
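
The ??? above stands for wherever your data actually comes from; as a purely illustrative assumption, it could be parallelized from the question's array:

val rdd: RDD[Array[String]] = sc.parallelize(Array(
  Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
  Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1")))

With that in place, sqlContext.createDataFrame(rows, schema.struct).printSchema() would report callId, duration and swId as long and the remaining columns as string.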
