我希望我的Spark应用程序从DynamoDB读取一个表,做一些事情,然后将结果写入DynamoDB。
将表读入DataFrame
现在,我可以将DynamoDB中的表作为hadoopRDD
读取到Spark中,并将其转换为DataFrame。然而,我必须使用正则表达式从AttributeValue
中提取值。有更好/更优雅的方法吗?在AWS API中找不到任何东西。
package main.scala.util
import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import scala.util.matching.Regex
import java.util.HashMap
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.io.Text;
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
/* Importing DynamoDBInputFormat and DynamoDBOutputFormat */
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.io.LongWritable
object Tester {
// {S: 298905396168806365,}
def extractValue : (String => String) = (aws:String) => {
val pat_value = "\\s(.*),".r
val matcher = pat_value.findFirstMatchIn(aws)
matcher match {
case Some(number) => number.group(1).toString
case None => ""
}
}
def main(args: Array[String]) {
val spark = SparkSession.builder().getOrCreate()
val sparkContext = spark.sparkContext
import spark.implicits._
// UDF to extract Value from AttributeValue
val col_extractValue = udf(extractValue)
// Configure connection to DynamoDB
var jobConf_add = new JobConf(sparkContext.hadoopConfiguration)
jobConf_add.set("dynamodb.input.tableName", "MyTable")
jobConf_add.set("dynamodb.output.tableName", "MyTable")
jobConf_add.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
jobConf_add.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
// org.apache.spark.rdd.RDD[(org.apache.hadoop.io.Text, org.apache.hadoop.dynamodb.DynamoDBItemWritable)]
var hadooprdd_add = sparkContext.hadoopRDD(jobConf_add, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
// Convert HadoopRDD to RDD
val rdd_add: RDD[(String, String)] = hadooprdd_add.map {
case (text, dbwritable) => (dbwritable.getItem().get("PIN").toString(), dbwritable.getItem().get("Address").toString())
}
// Convert RDD to DataFrame and extract Values from AttributeValue
val df_add = rdd_add.toDF()
.withColumn("PIN", col_extractValue($"_1"))
.withColumn("Address", col_extractValue($"_2"))
.select("PIN","Address")
}
}
字符串
将DataFrame写入DynamoDB
stackoverflow和其他地方的许多答案都只指向blog post和emr-dynamodb-hadoop github。这些资源中没有一个实际演示如何写入DynamoDB。
I tried converting我的DataFrame
到RDD[Row]
不成功。
df_add.rdd.saveAsHadoopDataset(jobConf_add)
型
将此DataFrame写入DynamoDB的步骤是什么?(如果您告诉我如何控制overwrite
vs putItem
,则会获得额外积分;)
注意:df_add
与DynamoDB中的MyTable
具有相同的模式。
编辑:我遵循this answer的建议,该建议指向Using Spark SQL for ETL上的这篇文章:
// Format table to DynamoDB format
val output_rdd = df_add.as[(String,String)].rdd.map(a => {
var ddbMap = new HashMap[String, AttributeValue]()
// Field PIN
var PINValue = new AttributeValue() // New AttributeValue
PINValue.setS(a._1) // Set value of Attribute as String. First element of tuple
ddbMap.put("PIN", PINValue) // Add to HashMap
// Field Address
var AddValue = new AttributeValue() // New AttributeValue
AddValue.setS(a._2) // Set value of Attribute as String
ddbMap.put("Address", AddValue) // Add to HashMap
var item = new DynamoDBItemWritable()
item.setItem(ddbMap)
(new Text(""), item)
})
output_rdd.saveAsHadoopDataset(jobConf_add)
型
然而,现在我得到java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.hadoop.io.Text
尽管以下的文档.你有什么建议?
编辑2:阅读更仔细这篇文章在Using Spark SQL for ETL:
获得DataFrame后,执行转换以获得与DynamoDB自定义输出格式知道如何编写的类型相匹配的RDD。自定义输出格式需要包含Text和DynamoDBItemWritable
类型的元组。
考虑到这一点,下面的代码正是AWS博客文章所建议的,除了我将output_df
转换为rdd,否则saveAsHadoopDataset
无法工作。现在,我得到了Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceAudience
。我已经无计可施了!
// Format table to DynamoDB format
val output_df = df_add.map(a => {
var ddbMap = new HashMap[String, AttributeValue]()
// Field PIN
var PINValue = new AttributeValue() // New AttributeValue
PINValue.setS(a.get(0).toString()) // Set value of Attribute as String
ddbMap.put("PIN", PINValue) // Add to HashMap
// Field Address
var AddValue = new AttributeValue() // New AttributeValue
AddValue.setS(a.get(1).toString()) // Set value of Attribute as String
ddbMap.put("Address", AddValue) // Add to HashMap
var item = new DynamoDBItemWritable()
item.setItem(ddbMap)
(new Text(""), item)
})
output_df.rdd.saveAsHadoopDataset(jobConf_add)
型
3条答案
按热度按时间9rnv2umw1#
我在“Using Spark SQL for ETL”链接下,发现了同样的“非法循环引用”异常。该异常的解决方案非常简单(但我花了2天时间才弄清楚),如下所示。关键是在框架的RDD上使用map函数,而不是框架本身。
字符串
gzjq41n42#
我希望我不会太晚参加聚会,以保存其他人的时间和精力。我一直在努力与这个Spark-> dynamodb的东西使用emr-dynamodb-connector(主要是依赖项冲突问题,如缺少类或方法)我花了几个小时才发现,由于其他AWS包的版本限制,我需要在pom.xml中包含
aws-java-sdk-dynamodb
包。doc关于它,但我发现它相当肤浅(步骤3缺少一些变量声明,步骤2在我的情况下是不必要的),它没有告诉你如何选择与Spark和Hadoop兼容的版本)。pvcm50d13#
这是一个比较简单的工作示例。
例如,使用Hadoop RDD从Kinesis Stream写入DynamoDB:-
https://github.com/kali786516/Spark2StructuredStreaming/blob/master/src/main/scala/com/dataframe/part11/kinesis/consumer/KinesisSaveAsHadoopDataSet/TransactionConsumerDstreamToDynamoDBHadoopDataSet.scala
用于使用Hadoop RDD和不带正则表达式的spark SQL从DynamoDB进行阅读。
字符串