This article collects Java code examples of the org.apache.spark.sql.DataFrameWriter.insertInto() method, showing how DataFrameWriter.insertInto() is used in practice. The examples are extracted from selected projects on GitHub, Stack Overflow, Maven, and similar platforms, so they should serve as useful references. Details of DataFrameWriter.insertInto():

Package: org.apache.spark.sql
Class: DataFrameWriter
Method: insertInto
Description: inserts the content of the DataFrame into the specified table. The DataFrame's schema must be compatible with the table's schema; note that insertInto resolves columns by position, not by name.
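Before the examples, the positional behavior is worth making concrete: unlike saveAsTable(), insertInto() matches DataFrame columns to table columns by position. The following Spark-free Java sketch (hypothetical helper, not part of the Spark API) mimics that positional resolution to show why several examples below reorder columns with select(...) first.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PositionalInsertDemo {

    // Hypothetical helper (not a Spark API): project a record's values into
    // the table's column order, the way a position-based insert expects them.
    static List<Object> alignToTable(List<String> tableColumns, Map<String, Object> record) {
        return tableColumns.stream()
                .map(record::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> tableColumns = Arrays.asList("id", "name", "city");

        // Record declared in a different order than the table's columns.
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("city", "Paris");
        record.put("id", 1);
        record.put("name", "Ada");

        // Without reordering, a purely positional insert would write
        // (Paris, 1, Ada) into (id, name, city); aligning first fixes that.
        System.out.println(alignToTable(tableColumns, record)); // [1, Ada, Paris]
    }
}
```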
Example source: Impetus/Kundera

@Override
public void saveDataFrame(DataFrame dataFrame, Class<?> entityClazz, Map<String, Object> properties)
{
    dataFrame.sqlContext().sql("use " + (String) properties.get(KEYSPACE));
    dataFrame.write().insertInto((String) properties.get(TABLE));
}
Example source: Impetus/Kundera

@Override
public boolean persist(List listEntity, EntityMetadata m, SparkClient sparkClient)
{
    try
    {
        Seq s = scala.collection.JavaConversions.asScalaBuffer(listEntity).toList();
        ClassTag tag = scala.reflect.ClassTag$.MODULE$.apply(m.getEntityClazz());
        JavaRDD personRDD = sparkClient.sparkContext.parallelize(s, 1, tag).toJavaRDD();
        DataFrame df = sparkClient.sqlContext.createDataFrame(personRDD, m.getEntityClazz());
        sparkClient.sqlContext.sql("use " + m.getSchema());
        if (logger.isDebugEnabled())
        {
            logger.debug("Below are the registered tables with the Hive context: ");
            sparkClient.sqlContext.sql("show tables").show();
        }
        df.write().insertInto(m.getTableName());
        return true;
    }
    catch (Exception e)
    {
        throw new KunderaException("Cannot persist object(s)", e);
    }
}
Example source: cerner/bunsen

/**
 * Writes mapping records to a table. This method ensures the columns and partitions are mapped
 * properly, and is a workaround for the problem described <a
 * href="http://stackoverflow.com/questions/35313077/pyspark-order-of-column-on-write-to-mysql-with-jdbc">here</a>.
 *
 * @param mappings a dataset of mapping records
 * @param tableName the table to write them to
 */
private static void writeMappingsToTable(Dataset<Mapping> mappings,
    String tableName) {

  // Note the last two columns here must be the partitioned-by columns
  // in order and in lower case for Spark to properly match
  // them to the partitions.
  Dataset<Row> orderedColumnDataset =
      mappings.select("sourceValueSet",
          "targetValueSet",
          "sourceSystem",
          "sourceValue",
          "targetSystem",
          "targetValue",
          "equivalence",
          "conceptmapuri",
          "conceptmapversion");

  orderedColumnDataset
      .write()
      .insertInto(tableName);
}
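The comment in the example above encodes a contract that is easy to get wrong: for a partitioned table, the projected column list must end with the partition columns, in order and lower-cased. A small Spark-free check (hypothetical helper, not part of Bunsen or Spark) makes that contract concrete:

```java
import java.util.Arrays;
import java.util.List;

public class PartitionOrderCheck {

    // Hypothetical validator: true when `columns` ends with `partitions`,
    // lower-cased and in the same order, which is what Spark's positional
    // insertInto expects for a partitioned Hive table.
    static boolean endsWithPartitions(List<String> columns, List<String> partitions) {
        int offset = columns.size() - partitions.size();
        if (offset < 0) {
            return false;
        }
        for (int i = 0; i < partitions.size(); i++) {
            if (!columns.get(offset + i).equals(partitions.get(i).toLowerCase())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Matches the select(...) order used in writeMappingsToTable above.
        List<String> projected = Arrays.asList("sourceValueSet", "targetValueSet", "sourceSystem",
                "sourceValue", "targetSystem", "targetValue", "equivalence",
                "conceptmapuri", "conceptmapversion");

        System.out.println(endsWithPartitions(projected,
                Arrays.asList("conceptMapUri", "conceptMapVersion"))); // true
    }
}
```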
Example source: com.cerner.bunsen/bunsen-core

/**
 * Writes ancestor records to a table. This method ensures the columns and partitions are mapped
 * properly, and is a workaround for the problem described <a
 * href="http://stackoverflow.com/questions/35313077/pyspark-order-of-column-on-write-to-mysql-with-jdbc">here</a>.
 *
 * @param ancestors a dataset of ancestor records
 * @param tableName the table to write them to
 */
private static void writeAncestorsToTable(Dataset<Ancestor> ancestors, String tableName) {

  Dataset<Row> orderedColumnDataset = ancestors.select("descendantSystem",
      "descendantValue",
      "ancestorSystem",
      "ancestorValue",
      "uri",
      "version");

  orderedColumnDataset.write()
      .mode(SaveMode.ErrorIfExists)
      .insertInto(tableName);
}
Example source: com.cerner.bunsen/bunsen-core

/**
 * Writes value records to a table. This method ensures the columns and partitions are mapped
 * properly, and is a workaround for the problem described <a
 * href="http://stackoverflow.com/questions/35313077/pyspark-order-of-column-on-write-to-mysql-with-jdbc">here</a>.
 *
 * @param values a dataset of value records
 * @param tableName the table to write them to
 */
private static void writeValuesToTable(Dataset<Value> values, String tableName) {

  // Note the last two columns here must be the partitioned-by columns in order and in lower case
  // for Spark to properly match them to the partitions
  Dataset<Row> orderColumnDataset = values.select("system",
      "version",
      "value",
      "valueseturi",
      "valuesetversion");

  orderColumnDataset.write()
      .mode(SaveMode.ErrorIfExists)
      .insertInto(tableName);
}
Example source: cloudera-labs/envelope

@Override
public void applyBulkMutations(List<Tuple2<MutationType, Dataset<Row>>> planned) {
  for (Tuple2<MutationType, Dataset<Row>> plan : planned) {
    MutationType mutationType = plan._1();
    Dataset<Row> mutation = (doesAlignColumns) ? alignColumns(plan._2()) : plan._2();

    DataFrameWriter<Row> writer = mutation.write();
    if (partitionColumns != null) {
      writer = writer.partitionBy(partitionColumns);
    }
    if (options != null) {
      writer = writer.options(options);
    }

    switch (mutationType) {
      case INSERT:
        writer = writer.mode(SaveMode.Append);
        break;
      case OVERWRITE:
        writer = writer.mode(SaveMode.Overwrite);
        break;
      default:
        throw new RuntimeException("Hive output does not support mutation type: " + mutationType);
    }

    writer.insertInto(tableName);
  }
}
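The switch in the example above maps Envelope's mutation types onto Spark save modes before the positional insertInto. A Spark-free sketch of that dispatch (stand-in enum and string results, not the Envelope or Spark API):

```java
public class MutationDispatchDemo {

    // Stand-in for Envelope's MutationType; DELETE illustrates the
    // unsupported branch.
    enum MutationType { INSERT, OVERWRITE, DELETE }

    // Mirrors the writer.mode(...) selection in applyBulkMutations:
    // INSERT appends, OVERWRITE replaces, anything else is rejected.
    static String saveModeFor(MutationType type) {
        switch (type) {
            case INSERT:
                return "Append";
            case OVERWRITE:
                return "Overwrite";
            default:
                throw new IllegalArgumentException(
                        "Hive output does not support mutation type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(saveModeFor(MutationType.INSERT));    // Append
        System.out.println(saveModeFor(MutationType.OVERWRITE)); // Overwrite
    }
}
```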
Example source: com.cerner.bunsen/bunsen-core

.insertInto(conceptMapTable);

Example source: cerner/bunsen

.insertInto(valueSetTable);